用户行为日志采集平台搭建
1、用户行为日志概述
用户行为日志的内容,主要包括用户的各项行为信息以及行为所处的环境信息。收集这些信息的主要目的是优化产品和为各项分析统计指标提供数据支撑。收集这些信息的手段通常为埋点。
目前主流的埋点方式,有代码埋点(前端/后端)、可视化埋点、全埋点等。
- 代码埋点是通过调用埋点SDK函数(JavaScript 技术 ),在需要埋点的业务逻辑功能位置调用接口,上报埋点数据。例如,我们对页面中的某个按钮埋点后,当这个按钮被点击时,可以在这个按钮对应的 OnClick 函数里面调用SDK提供的数据发送接口,来发送数据(把用户的行为以 http请求的形式发送到后端)。
- 可视化埋点只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名。圈选完毕后,这些配置会同步到各个用户的终端上,由采集 SDK 按照圈选的配置自动进行用户行为数据的采集和发送。
- 全埋点是通过在产品中嵌入SDK(比如使用神策的埋点SDK),前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。
目前用的比较多的是全埋点,但是毕竟使用第三方的 SDK 可能出现数据泄露的风险。
2、用户行为日志内容
本项目收集和分析的用户行为信息主要有页面浏览记录、动作记录、曝光记录、启动记录和错误记录。
2.1、页面浏览记录
页面浏览记录,记录的是访客对页面的浏览行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及页面信息等。
- 用户 id:每个账号分配一个唯一的 id
- 时间信息:用户跳入页面的时间和跳出的时间,以此来评估用户对该商品的感兴趣程度等等指标
- 设备信息:获取用户的设备信息,可以用来做广告推荐等
2.2、动作记录
动作记录,记录的是用户的业务操作行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息 及动作目标对象信息等。
- 动作目标对象相关信息,比如用户点击了哪个模块 (优惠券?哪个优惠价)
2.3、曝光记录
曝光记录,记录的是曝光行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及曝光对象信息等。
- 曝光相关信息:曝光指的是在当前界面展示过的内容
2.4、启动记录
启动记录,记录的是用户启动应用的行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、启动类型及开屏广告信息等。
- 启动类型:用户是点击软件按钮启动还是推送通知启动
- 开屏广告信息:打开软件时广告界面的广告id
2.5、错误记录
启动记录,记录的是用户在使用应用过程中的报错行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、以及可能与报错相关的页面信息、动作信息、曝光信息和动作信息。
3、用户行为日志格式
上面我们知道了各种用户行为日志的类型,我们可以发现,有很多记录其实是重复的,比如用户信息、设别信息、渠道信息 ... 存储重复的记录不仅没有意义而且占用磁盘空间,那么应该怎样存储呢?
我们的日志结构大致可分为两类,一是页面日志,二是启动日志。
3.1、页面日志
页面日志,以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录,若干个用户在该页面所做的动作记录,若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。
{
"common": { -- 环境信息
"ar": "230000", -- 地区编码
"ba": "iPhone", -- 手机品牌
"ch": "Appstore", -- 渠道
"is_new": "1", -- 是否首日使用,首次使用的当日,该字段值为1,过了24:00,该字段置为0。
"md": "iPhone 8", -- 手机型号
"mid": "YXfhjAYH6As2z9Iq", -- 设备id
"os": "iOS 13.2.9", -- 操作系统
"uid": "485", -- 会员id
"vc": "v2.1.134" -- app版本号
},
"actions": [{ -- 动作(事件)
"action_id": "favor_add", -- 动作id
"item": "3", -- 目标id
"item_type": "sku_id", -- 目标类型
"ts": 1585744376605 -- 动作时间戳
}
],
"displays": [{ -- 曝光
"displayType": "query", -- 曝光类型
"item": "3", -- 曝光对象id
"item_type": "sku_id", -- 曝光对象类型
"order": 1, -- 出现顺序
"pos_id": 2 -- 曝光位置
},
{
"displayType": "promotion",
"item": "6",
"item_type": "sku_id",
"order": 2,
"pos_id": 1
},
{
"displayType": "promotion",
"item": "9",
"item_type": "sku_id",
"order": 3,
"pos_id": 3
},
{
"displayType": "recommend",
"item": "6",
"item_type": "sku_id",
"order": 4,
"pos_id": 2
},
{
"displayType": "query ",
"item": "6",
"item_type": "sku_id",
"order": 5,
"pos_id": 1
}
],
"page": { -- 页面信息
"during_time": 7648, -- 持续时间毫秒
"item": "3", -- 目标id
"item_type": "sku_id", -- 目标类型
"last_page_id": "login", -- 上页类型
"page_id": "good_detail", -- 页面ID
"sourceType": "promotion" -- 来源类型
},
"err": { --错误
"error_code": "1234", --错误码
"msg": "***********" --错误信息
},
"ts": 1585744374423 --跳入时间戳
}
3.2、启动日志
启动日志以启动为单位,及一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。
{
"common": {
"ar": "370000",
"ba": "Honor",
"ch": "wandoujia",
"is_new": "1",
"md": "Honor 20s",
"mid": "eQF5boERMJFOujcp",
"os": "Android 11.0",
"uid": "76",
"vc": "v2.1.134"
},
"start": {
"entry": "icon", --icon手机图标 notice 通知 install 安装后启动
"loading_time": 18803, --启动加载时间
"open_ad_id": 7, --广告页ID
"open_ad_ms": 3449, -- 广告总共播放时间
"open_ad_skip_ms": 1989 -- 用户跳过广告时点
},
"err":{ --错误
"error_code": "1234", --错误码
"msg": "***********" --错误信息
},
"ts": 1585744304000 --日志产生的时间
}
4、模拟数据
模拟数据我们直接使用封装好的日志生成 jar 包:
java -jar gmall2020-mock-log-2021-10-10.jar
我们需要在 hadoop102 和 hadoop103 上都配置好日志数据源,所以为了方便还需要写一个脚本:
#!/bin/bash
for i in hadoop102 hadoop103; do
echo "========== $i =========="
ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2021-10-10.jar >/dev/null 2>&1 &"
done
我们在 hadoop102 和 hadoop103 的 /opt/module/applog/log 看到最新生成的日志,这个日志文件是按照时间滚动的
用户行为日志数据采集模块
1、数据通道
现在,我们需要把用户行为日志数据通过 Flume 传输到 Kafka 的日志 topic 中。
上面我们的 hadoop102 和 hadoop103 已经具备了生成日志的能力,之前我们说 flume 是但节点安装,确实没错,但是这里因为我们是两个地方产生日志,如果跨节点传输日志那样效率太低了,所以索性就直接安装两个 flume 即可。
这里我们回顾一下之前学习 flume 的时候讲到的 source ,有
- taildir:适合用于监听多个实时追加的文件(支持不同目录的不同文件),并且能够实现断点续传
- netcat:监听 netcat 端口发生过来的内容
- exec:适用于监控一个实时追加的文件,不能实现断点续传
- spooling:适合用于同步新文件,支持断点续传,但是它监听的是文件夹而不是文件,不适合对实时追加日志的文件进行监听并同步
- avro:当需要多个 flume 节点之间传输日志时,之间必须使用 avro sourcce 和 avro sink
- kafka:kafka source 其实就是一个 Kafka 消费者,从Kafka 的 topic 中读取数据:
还有 channel:
- file :日志数据缓存在本地磁盘,并且在内存中保存有索引,磁盘中同样保存两份索引来保证读取的速度
- memory:日志缓存在内存中,读取速度快,但是不安全
- kafka:实际使用最多的
kafka channel 其实就是把 Event 保存到了 Kafka 集群当中,它支持 3 种结构:
1. 结合 source 和 sink
2. 只结合 source
3. 只结合 sink
还有 sink:
- hdfs:写出到 hdfs
- kafka:写出到 kafka,相当于 kafka 消费者
- avro:写出到 avro 系统(avro 是一个 rpc 数据序列化系统)
为了保证数据的可靠性(以及 Kafak 具有削峰解耦的功能),我们当然是选择 Kafak Channel 作为 channel ,那么对于 Kafak Channel 的三种结构,我们应该如何选择呢?
首先第一种结构 - 结合 source 和 sink,这种结构不可取。因为 Kafka Channel 有一个参数 parseAsFlumeEvent(默认为 true) ,它的意思是把日志数据以 Flume Event 的格式存储到 Kafka,这就包括了 Event Header 和 Event Body。对于离线数仓而言,因为它本来 sink 用的就是 flume 的 sink 从 kafka channel 中读取,它可以把 event 中的 body 解析出来。但是对于实时数仓,flink 是从 kafka 中直接读取数据的,所以它读到的就是包含 header 和 body 的完整 event 数据,而 header 对于实时数仓来说根本没有,所以没有必要存储。
既然 header 不需要存储我们能不能把 parseAsFlumeEvent 设为 false 然后继续使用这种结构呢,我们知道拦截器是在 source 和 channel 之间起作用的,如果不使用 flume event 的形式存储数据就不能使用拦截器,而 channel 和 sink 之间又不能设置拦截器。所以最好的办法就是使用第二种结构 - Kafak Channel 结合 Source 使用,同时设置 parseAsFlumeEvent 参数为 false ,这样存储进 Kafka 的数据就只有 body,虽然没有了 sink,但是我们可以把存储在 Kafak 中的数据当做一个数据源,下游再用一个 Source 读取,这样我们就可以再设置拦截器了。
2、作业脚本配置
编写 flume 配置文件
file_to_kafka.conf
#定义组件
a1.sources = r1
a1.channels = c1
#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#组装
a1.sources.r1.channels = c1
测试
# 启动 zookeeper 和 kafka
zk start
kf.sh start
# 开启flume作业
bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/file_to_kafka.conf -Dflume.root.logger=INFO,console
# 开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log
# 开启日志生成器
mklog.sh
运行结果:
至此,flume 的 source -> kafka channel 配置完毕,接下来我们需要在 flume 这里进行一个简单的 ETL 数据清洗,把一些脏数据去除掉,而且这里不能做复杂的拦截器,比如添加多个拦截器,因为flume 毕竟只是一个传递数据的管道,如果这里的 ETL工作太复杂会导致数据堆积在拦截器这里,影响效率。
添加拦截器
引入依赖和打包插件:
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Json 工具类:
package com.lyh.gmall.utils;
import com.alibaba.fastjson.JSONObject;
public class JSONUtil {
// 通过异常捕获来校验json是否合法
public static boolean isJsonValidate(String body) {
try {
JSONObject.parseObject(body);
return true;
}catch (Exception e){
return false;
}
}
}
自定义拦截器:
package com.lyh.gmall.interceptor;
import com.lyh.gmall.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class ETLInterceptor implements Interceptor {
@Override
public void initialize() {
}
// 单个事件的拦截器
@Override
public Event intercept(Event event) {
// 1. 获取body当中的数据
String body = new String(event.getBody(), StandardCharsets.UTF_8);
// 2. 判断数据是否合法,如果合法直接返回,否则返回 null
return JSONUtil.isJsonValidate(body)?event:null;
}
// 多个事件的拦截器
@Override
public List<Event> intercept(List<Event> list) {
for (int i = 0; i < list.size(); i++) {
if (intercept(list.get(i)) == null){
list.remove(i);
i--;
}
}
return list;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new ETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
打包后会生成两个jar包,选择带依赖的放到 flume 的 lib 目录下,在 flume 作业配置文件中添加以下配置来添加拦截器:
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lyh.gmall.interceptor.ETLInterceptor$Builder
测试:
因为 flume 监听的是 applog/log/app-log-xxx.log 文件,所以我们直接 echo 写入来模拟错误日志(不完整的 json 信息):
可以看到,不合法的 json 数据被过滤掉了,至此,我们的数据采集通道可用了。根据我们的项目架构图可以看到,我们需要在 hadoop102 和 hadoop103 上采集,所以 hadoop103 也许要配置:
flume 采集脚本
#!/bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103
do
echo " -------- $i flume开始采集 -------"
ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/job/warehouse/file_to_kafka.conf >/dev/null 2>&1 &"
done
};;
"stop"){
for i in hadoop102 hadoop103
do
echo " -------- $i flume停止采集-------"
ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done
};;
esac
总结
至此,用户行为日志的采集环境搭建好了,只要输入命令 f1.sh start 就可以让在 hadoop102 和 hadoop103 上的 flume开启采集日志了。接下来就是业务数据的采集了。
这一部分还算简单,对于 flume 的操作无非就是拦截器的编写,逻辑也没多复杂,flume 的 source 使用了 taildir 监听由脚本不断生成的日志文件,channel 使用了 Kafka source ,sink 估计不会采用 flume 的 sink。