电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume

1、数据仓库概念

数据仓库( Data Warehouse ),是为企业制定决策,提供数据支持的。可以帮助企业,改进业务流程、提高产品质量等。
数据仓库的输入数据通常包括:业务数据、用户行为数据和爬虫数据等。
业务数据:就是各行业在处理事务过程中产生的数据。比如用户在电商网站中登录、下单、支付等过程中,需要和网站后台数据库进行增删改查交互,产生的数据就是业务数据。业务数据通常存储在MySQL、Oracle等数据库中。
用户行为数据:用户在使用产品过程中,通过埋点收集与客户端产品交互过程中产生的数据,并发往日志服务器进行保存。比如页面浏览、点击、停留、评论、点赞、收藏等。用户行为数据通常存储在日志文件中。
爬虫数据:通常是通过技术手段获取其他公司网站的数据。
在这里插入图片描述

2、项目需求及架构设计

2.1 项目需求分析

1、采集平台
(1)用户行为数据采集平台
(2)业务数据采集平台

2.2 项目架构

2.2.1 技术选型

技术选型主要考虑的因素:数据量大小、业务需求、行业内经验、技术成熟度、开发维护成本、总成本预算。

  • 数据采集传输:Flume、Kafka、DataX、MaxWell
  • 数据储存:MySQL、HDFS、Hbase、Redis
  • 数据计算:Hive、Spark、Flink
  • 数据查询:Presto、ClickHouse
  • 数据可视化:Superset、Sugar
  • 任务调度:DolphinScheduler
  • 集群监控:Zabbix、Prometheus
2.2.2 系统数据流程设计

在这里插入图片描述

2.2.3 框架版本选型

(1)Apache框架版本

框架版本
Hadoop3.1.3
Zookeeper3.5.7
MySQL5.7.16
Hive3.1.2
Flume1.9.0
Kafka3.0.0
Spark3.0.0
DataX3.0.0
Superset1.3.2
DolphinScheduler2.0.3
MaxWell1.29.2
Flink1.13.0
Redis6.0.8
Hbase2.0.5
ClickHouse20.4.5.36-2
2.2.4 测试集群资源规划设计
服务名称子服务服务器(hadoop102)服务器(hadoop103)服务器(hadoop104)
HDFSNameNode
HDFSDataNode
HDFSSecondaryNameNode
YarnResourceManager
YarnNodeManager
ZookeeperZookeeper Server
Flume(采集日志)Flume
KafkaKafka
Flume(消费kafka日志)Flume
Flume(消费kafka业务)Flume
Hive
MySQLMysQL
DataX
Spark
DolphinSchedulerApiApplicationServer
DolphinSchedulerAlertServer
DolphinSchedulerMasterServer
DolphinSchedulerWorkerServer
DolphinSchedulerLoggerServer
SupersetSuperset
Flink
ClickHouse
Redis
Hbase
服务数总计201112

3、用户行为日志

3.1 用户行为日志概述

用户行为日志的内容,主要包括用户的各项行为信息以及行为所处的环境信息。收集这些信息的主要目的是优化产品和为各项分析统计指标提供数据支撑。收集这些信息的手段通常为埋点。

目前主流的埋点方式,有代码埋点(前端/后端)、可视化埋点、全埋点等。

代码埋点是通过调用埋点SDK函数,在需要埋点的业务逻辑功能位置调用接口,上报埋点数据。例如,我们对页面中的某个按钮埋点后,当这个按钮被点击时,可以在这个按钮对应的 OnClick 函数里面调用SDK提供的数据发送接口,来发送数据。

可视化埋点只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名。圈选完毕后,这些配置会同步到各个用户的终端上,由采集 SDK 按照圈选的配置自动进行用户行为数据的采集和发送。

全埋点是通过在产品中嵌入SDK,前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。

3.2 用户行为日志内容

本项目收集和分析的用户行为信息主要有页面浏览记录、动作记录、曝光记录、启动记录和错误记录。

3.2.1 页面流量记录

页面浏览记录,记录的是访客对页面的浏览行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及页面信息等。

3.2.2 动作记录

动作记录,记录的是用户的业务操作行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息 及动作目标对象信息等。

3.2.3 曝光记录

曝光记录,记录的是曝光行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及曝光对象信息等。

3.2.4 启动记录

启动记录,记录的是用户启动应用的行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、启动类型及开屏广告信息等。

3.2.5 错误记录

启动记录,记录的是用户在使用应用过程中的报错行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、以及可能与报错相关的页面信息、动作信息、曝光信息和动作信息。

3.3 用户行为日志格式

我们的日志结构大致可分为两类,一是页面日志,二是启动日志。

3.3.1 页面日志

页面日志,以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录,若干个用户在该页面所做的动作记录,若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{
	"common": {                     -- 环境信息
		"ar": "230000",             -- 地区编码
		"ba": "iPhone",             -- 手机品牌
		"ch": "Appstore",           -- 渠道
		"is_new": "1",              -- 是否首日使用,首次使用的当日,该字段值为1,过了24:00,该字段置为0。
		"md": "iPhone 8",           -- 手机型号
		"mid": "YXfhjAYH6As2z9Iq",  -- 设备id
		"os": "iOS 13.2.9",         -- 操作系统
		"uid": "485",               -- 会员id
		"vc": "v2.1.134"            -- app版本号
	},
	"actions": [{                   -- 动作(事件)
		"action_id": "favor_add",   -- 动作id
		"item": "3",                -- 目标id
		"item_type": "sku_id",      -- 目标类型
		"ts": 1585744376605         -- 动作时间戳
	    }
	],
	"displays": [{                  -- 曝光
			"displayType": "query", -- 曝光类型
			"item": "3",            -- 曝光对象id
			"item_type": "sku_id",  -- 曝光对象类型
			"order": 1,             -- 出现顺序
			"pos_id": 2             -- 曝光位置
		},
		{
			"displayType": "promotion",
			"item": "6",
			"item_type": "sku_id",
			"order": 2,
			"pos_id": 1
		},
		{
			"displayType": "promotion",
			"item": "9",
			"item_type": "sku_id",
			"order": 3,
			"pos_id": 3
		},
		{
			"displayType": "recommend",
			"item": "6",
			"item_type": "sku_id",
			"order": 4,
			"pos_id": 2
		},
		{
			"displayType": "query ",
			"item": "6",
			"item_type": "sku_id",
			"order": 5,
			"pos_id": 1
		}
	],
	"page": {                          -- 页面信息
		"during_time": 7648,           -- 持续时间毫秒
		"item": "3", 	               -- 目标id
		"item_type": "sku_id",         -- 目标类型
		"last_page_id": "login",       -- 上页类型
		"page_id": "good_detail",      -- 页面ID
		"sourceType": "promotion"      -- 来源类型
	},                                 
	"err": {                           --错误
		"error_code": "1234",          --错误码
		"msg": "***********"           --错误信息
	},                                 
	"ts": 1585744374423                --跳入时间戳
}
3.3.2 启动日志

启动日志以启动为单位,及一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{
  "common": {
    "ar": "370000",
    "ba": "Honor",
    "ch": "wandoujia",
    "is_new": "1",
    "md": "Honor 20s",
    "mid": "eQF5boERMJFOujcp",
    "os": "Android 11.0",
    "uid": "76",
    "vc": "v2.1.134"
  },
  "start": {   
    "entry": "icon",         --icon手机图标  notice 通知   install 安装后启动
    "loading_time": 18803,  --启动加载时间
    "open_ad_id": 7,        --广告页ID
    "open_ad_ms": 3449,    -- 广告总共播放时间
    "open_ad_skip_ms": 1989   --  用户跳过广告时点
  },
"err":{                     --错误
"error_code": "1234",      --错误码
    "msg": "***********"       --错误信息
},
  "ts": 1585744304000
}

3.4 服务器和JDK准备

3.4.1 服务器准备
第1章 Hadoop运行环境搭建
1.1 模板虚拟机环境准备
0)安装模板虚拟机,IP地址192.168.10.100、主机名称hadoop100、内存4G、硬盘50G

1)hadoop100虚拟机配置要求如下(本文Linux系统全部以CentOS-7.5-x86-1804为例)
(1)使用yum安装需要虚拟机可以正常上网,yum安装前可以先测试下虚拟机联网情况
[root@hadoop100 ~]# ping www.baidu.com
PING www.baidu.com (14.215.177.39) 56(84) bytes of data.
64 bytes from 14.215.177.39 (14.215.177.39): icmp_seq=1 ttl=128 time=8.60 ms
64 bytes from 14.215.177.39 (14.215.177.39): icmp_seq=2 ttl=128 time=7.72 ms
(2)安装epel-release
注:Extra Packages for Enterprise Linux是为“红帽系”的操作系统提供额外的软件包,适用于RHEL、CentOS和Scientific Linux。相当于是一个软件仓库,大多数rpm包在官方 repository 中是找不到的)
[root@hadoop100 ~]# yum install -y epel-release
(3)注意:如果Linux安装的是最小系统版,还需要安装如下工具;如果安装的是Linux桌面标准版,不需要执行如下操作
net-tool:工具包集合,包含ifconfig等命令
[root@hadoop100 ~]# yum install -y net-tools 
vim:编辑器
[root@hadoop100 ~]# yum install -y vim
2)关闭防火墙,关闭防火墙开机自启
[root@hadoop100 ~]# systemctl stop firewalld
[root@hadoop100 ~]# systemctl disable firewalld.service
	注意:在企业开发时,通常单个服务器的防火墙时关闭的。公司整体对外会设置非常安全的防火墙
3)创建atguigu用户,并修改atguigu用户的密码
[root@hadoop100 ~]# useradd atguigu
[root@hadoop100 ~]# passwd atguigu
4)配置atguigu用户具有root权限,方便后期加sudo执行root权限的命令
[root@hadoop100 ~]# vim /etc/sudoers
修改/etc/sudoers文件,在%wheel这行下面添加一行,如下所示:
## Allow root to run any commands anywhere
root    ALL=(ALL)     ALL

## Allows people in group wheel to run all commands
%wheel  ALL=(ALL)       ALL
atguigu   ALL=(ALL)     NOPASSWD:ALL
注意:atguigu这一行不要直接放到root行下面,因为所有用户都属于wheel组,你先配置了atguigu具有免密功能,但是程序执行到%wheel行时,该功能又被覆盖回需要密码。所以atguigu要放到%wheel这行下面。
5)在/opt目录下创建文件夹,并修改所属主和所属组
(1)在/opt目录下创建module、software文件夹
[root@hadoop100 ~]# mkdir /opt/module
[root@hadoop100 ~]# mkdir /opt/software
	(2)修改module、software文件夹的所有者和所属组均为atguigu用户 
[root@hadoop100 ~]# chown atguigu:atguigu /opt/module 
[root@hadoop100 ~]# chown atguigu:atguigu /opt/software
(3)查看module、software文件夹的所有者和所属组
[root@hadoop100 ~]# cd /opt/
[root@hadoop100 opt]# ll
总用量 12
drwxr-xr-x. 2 atguigu atguigu 4096 5月  28 17:18 module
drwxr-xr-x. 2 root    root    4096 9月   7 2017 rh
drwxr-xr-x. 2 atguigu atguigu 4096 5月  28 17:18 software
6)卸载虚拟机自带的JDK
	注意:如果你的虚拟机是最小化安装不需要执行这一步。
[root@hadoop100 ~]# rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps 
rpm -qa:查询所安装的所有rpm软件包
grep -i:忽略大小写
xargs -n1:表示每次只传递一个参数
rpm -e –nodeps:强制卸载软件
7)重启虚拟机
[root@hadoop100 ~]# reboot
1.2 克隆虚拟机
1)利用模板机hadoop100,克隆三台虚拟机:hadoop102、hadoop103、hadoop104
	注意:克隆时,要先关闭hadoop100
2)修改克隆机IP,以下以hadoop102举例说明
(1)修改克隆虚拟机的静态IP
[atguigu@hadoop100 ~]# sudo vim /etc/sysconfig/network-scripts/ifcfg-ens33
改成
DEVICE=ens33
TYPE=Ethernet
ONBOOT=yes
BOOTPROTO=static
NAME="ens33"
IPADDR=192.168.10.102
PREFIX=24
GATEWAY=192.168.10.2
DNS1=192.168.10.2
(2)查看Linux虚拟机的虚拟网络编辑器,编辑->虚拟网络编辑器->VMnet8


(3)查看Windows系统适配器VMware Network Adapter VMnet8的IP地址

(4)保证Linux系统ifcfg-ens33文件中IP地址、虚拟网络编辑器地址和Windows系统VM8网络IP地址相同。
3)修改克隆机主机名,以下以hadoop102举例说明
	(1)修改主机名称
[atguigu@hadoop100 ~]# sudo vim /etc/hostname
hadoop102
(2)配置Linux克隆机主机名称映射hosts文件,打开/etc/hosts
[atguigu@hadoop100 ~]# sudo vim /etc/hosts
添加如下内容
192.168.10.100 hadoop100
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
192.168.10.105 hadoop105
192.168.10.106 hadoop106
192.168.10.107 hadoop107
192.168.10.108 hadoop108
4)重启克隆机hadoop102 
[atguigu@hadoop100 ~]# sudo reboot
5)修改windows的主机映射文件(hosts文件)
(1)如果操作系统是window7,可以直接修改 
	①进入C:\Windows\System32\drivers\etc路径
	②打开hosts文件并添加如下内容,然后保存
192.168.10.100 hadoop100
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
192.168.10.105 hadoop105
192.168.10.106 hadoop106
192.168.10.107 hadoop107
192.168.10.108 hadoop108
(2)如果操作系统是window10,先拷贝出来,修改保存以后,再覆盖即可
①进入C:\Windows\System32\drivers\etc路径
②拷贝hosts文件到桌面
③打开桌面hosts文件并添加如下内容
192.168.10.100 hadoop100
192.168.10.101 hadoop101
192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
192.168.10.105 hadoop105
192.168.10.106 hadoop106
192.168.10.107 hadoop107
192.168.10.108 hadoop108
④将桌面hosts文件覆盖C:\Windows\System32\drivers\etc路径hosts文件
3.4.2 编写集群分发脚本xsync

1、xsync集群分发脚本
(1)需求:循环复制文件到所有节点的相同目录下
(2)需求分析
①rsync命令原始拷贝:
rsync -av /opt/module root@hadoop103:/opt/
②期望脚本:
xsync要同步的文件名称
③说明:在/home/zhm/bin这个目录下存放的脚本,zhm用户可以在系统任何地方直接执行。
(3)脚本实现
①在用的家目录/home/zhm下创建bin文件夹
②在/home/atguigu/bin目录下创建xsync文件,以便全局调用
在该文件中编写如下代码

#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi
#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
do
  echo ====================  $host  ====================
  #3. 遍历所有目录,挨个发送
  for file in $@
  do
    #4 判断文件是否存在
    if [ -e $file ]
    then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)
      #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
    else
      echo $file does not exists!
    fi
  done
done

③修改脚本xsync具有执行权限

3.4.3 SSH无密登录配置

说明:这里面只配置了hadoop102、hadoop103到其他主机的无密登录;因为hadoop102未外配置的是NameNode,hadoop103配置的是ResourceManager,都要求对其他节点无密访问。
1、hadoop102上生成公钥和私钥:

ssh-keygen -t rsa (要cd ~/.ssh)

然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)
2、将hadoop102公钥拷贝到要免密登录的目标机器上(在.ssh目录下,下面一样)

ssh-copy-id hadoop102
ssh-copy-id hadoop103
ssh-copy-id hadoop104

3、hadoop103上生成公钥和私钥:

ssh-keygen -t rsa

4、将hadoop103公钥拷贝到要免密登录的目标机器上

ssh-copy-id hadoop102
ssh-copy-id hadoop103
ssh-copy-id hadoop104
3.4.4 JDK准备

1、卸载现有JDK(3台节点)

sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

(1)rpm -qa:表示查询所有已经安装的软件包
(2)grep -i:表示过滤时不区分大小写
(3)xargs -n1:表示一次获取上次执行结果的一个值
(4)rpm -e --nodeps:表示卸载软件
2、用XShell工具将JDK导入到hadoop102的/opt/software文件夹下面
3、解压JDK到/opt/module目录下

tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

4、配置JDK环境变量
(1)新建/etc/profile.d/my_env.sh文件
添加如下内容,然后保存(:wq)退出

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk
export PATH=$PATH:$JAVA_HOME/bin

(2)让环境变量生效

source /etc/profile.d/my_env.sh

5、测试JDK是否安装成功

java -version

6、分发JDK

xsync /opt/module/jdk

7、分发环境变量配置文件

sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh

8、分别在hadoop103、hadoop104上执行source

3.5 模拟数据

3.5.1 使用说明

1、将application.yml、gmall2020-mock-log-2021-10-10.jar、path.json、logback.xml上传到hadoop102的/opt/module/applog目录下

资源获取
链接:https://pan.baidu.com/s/1d_sumGVy5OxYGHT1HbB7xg
提取码:zhm6

(1)创建applog路径
(2)上传文件到/opt/module/applog目录
2、配置文件
(1)application.yml文件
可以根据需求生成对应日期的用户行为日志。

vim application.yml

修改如下内容

# 外部配置打开
logging.config: "./logback.xml"
#业务日期  注意:并不是Linux系统生成日志的日期,而是生成数据中的时间
mock.date: "2020-06-14"

#模拟数据发送模式
#mock.type: "http"
#mock.type: "kafka"
mock.type: "log"

#http模式下,发送的地址
mock.url: "http://hdp1/applog"

#kafka模式下,发送的地址
mock:
  kafka-server: "hdp1:9092,hdp2:9092,hdp3:9092"
  kafka-topic: "ODS_BASE_LOG"

#启动次数
mock.startup.count: 200
#设备最大值
mock.max.mid: 500000
#会员最大值
mock.max.uid: 100
#商品最大值
mock.max.sku-id: 35
#页面平均访问时间
mock.page.during-time-ms: 20000
#错误概率 百分比
mock.error.rate: 3
#每条日志发送延迟 ms
mock.log.sleep: 10
#商品详情来源  用户查询,商品推广,智能推荐, 促销活动
mock.detail.source-type-rate: "40:25:15:20"
#领取购物券概率
mock.if_get_coupon_rate: 75
#购物券最大id
mock.max.coupon-id: 3
#搜索关键词  
mock.search.keyword: "图书,小米,iphone11,电视,口红,ps5,苹果手机,小米盒子"

(2)path.json,该文件用来配置访问路径
(3)logback配置文件
可配置日志生成路径,修改内容如下

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="/opt/module/applog/log" />
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 将某一个包下日志单独打印日志 -->
    <logger name="com.atgugu.gmall2020.mock.log.util.LogUtil"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error"  >
        <appender-ref ref="console" />
    </root>
</configuration>

(4)生成日志

  • 进入到/opt/module/applog路径,执行以下命令
java -jar gmall2020-mock-log-2021-10-10.jar
  • 在/opt/module/applog/log目录下查看生成日志
3.5.2 集群日志生成脚本

在hadoop102的/home/atguigu目录下创建bin目录,这样脚本可以在服务器的任何目录执行。
1、在/home/atguigu/bin目录下创建脚本lg.sh
2、在脚本中编写如下内容

#!/bin/bash
for i in hadoop102 hadoop103; do
    echo "========== $i =========="
    ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2021-10-10.jar >/dev/null 2>&1 &"
done 

3、修改脚本执行权限

chmod 777 lg.sh

4、将jar包及配置文件上传至hadoop103的/opt/module/applog/路径
5、启动脚本

lg.sh

6、分别在hadoop102、hadoop103的/opt/module/applog/log目录上查看生成的数据

4、用户行为数据采集模块

4.1 数据通道

用户行为日志数据通道
在这里插入图片描述

4.2 环境准备

4.2.1 集群所有进程查看脚本

1、在/home/atguigu/bin目录下创建脚本jpsall

vim jpsall

2、在脚本中编写如下内容

#! /bin/bash
 
for i in hadoop102 hadoop103 hadoop104
do
    echo --------- $i ----------
    ssh $i jps
done

3、修改脚本执行权限

chmod 777 jpsall

4、启动脚本

xcall
4.2.2 Hadoop安装
4.2.3 Zookeeper安装
4.2.4 Kafka安装
4.2.5 Flume安装

4.3 日志采集Flume

4.3.1 日志采集Flume概述

按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka。

此处可选择TaildirSource和KafkaChannel,并配置日志校验拦截器。

选择TailDirSource和KafkaChannel的原因如下:
1、TailDirSource
TailDirSource相比ExecSource、SpoolingDirectorySource的优势
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
SpoolingDirectorySource监控目录,支持断点续传。
2、KafkaChannel
采用Kafka Channel,省去了Sink,提高了效率。

日志采集Flume关键配置如下:
在这里插入图片描述

4.3.2 日志采集Flume配置实操

1、创建Flume配置文件
在hadoop102节点的Flume的job目录下创建file_to_kafka.conf

@hadoop104 flume]$ mkdir job
@hadoop104 flume]$ vim job/file_to_kafka.conf 

2、配置文件内容如下

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = org.zhm.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1

3、编写Flume拦截器
(1)创建Maven工程flume-interceptor
(2)创建包:com.atguigu.gmall.flume.interceptor
(3)在pom.xml文件中添加如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(4)在org.zhm.gmall.flume.utils包下创建JSONUtil类

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONException;

public class JSONUtil {
/*
* 通过异常判断是否是json字符串
* 是:返回true  不是:返回false
* */
    public static boolean isJSONValidate(String log){
        try {
            JSONObject.parseObject(log);
            return true;
        }catch (JSONException e){
            return false;
        }
    }
}

(5)在org.zhm.gmall.flume.interceptor包下创建ETLInterceptor类

import com.atguigu.gmall.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;


import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
		
		//1、获取body当中的数据并转成字符串
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
		//2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
        if (JSONUtil.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }

        return list;
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {

        }

    }

    @Override
    public void close() {

    }
}

(6)打包
在这里插入图片描述

(7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

4.3.3 日志采集Flume测试

1、启动Zookeeper、Kafka集群
2、启动hadoop102的日志采集Flume

 bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console

3、启动一个Kafka的Console-Consumer

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

4、生成模拟数据

lg.sh

5、观察Kafka消费者是否能消费到数据

4.3.4 日志·采集Flume启停脚本

1、分发日志采集Flume配置文件和拦截器
若上述测试通过,需将hadoop102节点的Flume的配置文件和拦截器jar包,向另一台日志服务器发送一份。

scp -r job hadoop103:/opt/module/flume/
[atguigu@hadoop102 flume]$ scp lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar hadoop103:/opt/module/flume/lib/

2、方便起见,此处编写一个日志采集Flume进程的启停脚本
在hadoop102节点的/home/atguigu/bin目录下创建脚本f1.sh

vim f1.sh

在脚本中填写如下内容

#!/bin/bash

case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

3、增加脚本执行权限

 chmod 777 f1.sh

4、f1启动

f1.sh start

5、f1停止

f1.sh stop

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/30511.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

流场粒子追踪精度数值实验

在计算流线&#xff0c;拉格朗日拟序结构等流场后处理时&#xff0c;我们常常需要计算无质量的粒子在流场中迁移时的轨迹&#xff0c;无质量意味着粒子的速度为流场当地的速度。此时&#xff0c;求解粒子的位移这个问题是一个非常简单的常微分方程问题。 假设流场中存在 i 个粒…

Java版本+企业电子招投标系统源代码之电子招投标系统建设的重点和未来趋势

计算机与网络技术的不断发展&#xff0c;推动了社会各行业信息化的步伐。时至今日&#xff0c;电子政务、电子商务已经非常普及&#xff0c;云计算、大数据、工业4.0、“互联网”等发展理念也逐步深入人心&#xff0c;如何将传统行业与互联网科技有效结合起来&#xff0c;产生1…

Vue实现元素沿着坐标数组移动,超出窗口视图时页面跟随元素滚动

一、实现元素沿着坐标数组移动 现在想要实现船沿着下图中的每个河岸移动。 实现思路&#xff1a; 1、将所有河岸的位置以 [{x: 1, y: 2}, {x: 4, y: 4}, …] 的形式保存在数组中。 data() {return {coordinateArr: [{ x: 54, y: 16 }, { x: 15, y: 31 }, { x: 51, y: 69 }…

升级Nginx

目录 前言 一、升级Nginx 1&#xff09;首先在官网下载一个新版本的Nginx 2&#xff09;首先将下载的压缩包进行解包 3&#xff09;进入已解包的目录中 4&#xff09;配置安装路径 5&#xff09;make 6&#xff09;备份原来Nginx的资源 7&#xff09;重启Nginx服务 8&#…

【2023最全教程】Web自动化测试怎么做?Web自动化测试的详细流程和步骤

一、什么是web自动化测试 自动化&#xff08;Automation&#xff09;是指机器设备、系统或过程&#xff08;生产、管理过程&#xff09;在没有人或较少人的直接参与下&#xff0c;按照人的要求&#xff0c;经过自动检测、信息处理、分析判断、操纵控制&#xff0c;实现预期的目…

毕业季Android开发面试,有哪些常见的题?

前言 对于计算机行业早已烂大街&#xff0c;随之而来的毕业季。还会有大批的程序员涌进来&#xff0c;而我们想要继续进入Android开发岗位的人员&#xff0c;最先考虑的是面试。面试题是我们决定踏进工作的重要环节。 对于刚毕业的实习生来说&#xff0c;如何在应聘中脱颖而出…

LightningChart .NET 10.5.1 Crack LightningChart 2023

LightningChart .NET v.10.5.1 已经发布&#xff01; DataCursor 和 3D TransparencyRenderMode 现在可用。 为所有 3D、Polar 和 Smith 系列启用 DataCursor 在早期阶段&#xff0c;LightningChart 提供了不同的工具&#xff0c;需要用户编写额外的代码才能启用数据跟踪功能。…

控制您的数据:Web3私有链为数据主权带来的突破性变革

在数字化时代&#xff0c;数据已经成为企业和个人最宝贵的资产之一。然而&#xff0c;随着大规模数据泄露和滥用事件的频发&#xff0c;数据主权和隐私保护成为了备受关注的问题。在这个背景下&#xff0c;Web3私有链的出现为数据主权带来了一场突破性的变革。 首先&#xff0c…

风景类Midjourney prompt提示词

稳定输出优美风景壁纸的Midjourney prompt提示词。 1\在夏夜&#xff0c;有淡蓝色的星空&#xff0c;海边&#xff0c;流星&#xff0c;烟花&#xff0c;海滩上全是蓝色的玫瑰和绿色的植物&#xff0c;由Ivan Aivazovsky和Dan Mumford&#xff0c;趋势在cgsociety&#xff0c;…

windows2022证书配置.docx

Windows证书的配置 要求两台主机&#xff0c;一台作为域&#xff0c;一台进入域 按要求来选择角色服务 确认之后安装 安装完以后配置证书服务 选择服务 按要求配置 注&#xff1a;此处不用域用户登陆无法使用企业CA 按要求来 创建新的私钥 这几处检查无误后默认即可 有效期…

AJAX概述

1.1什么是AJAX. Ajax即AsynchronousJavascript And XML&#xff1a;异步数据回调。 使用Ajax技术网页应用能够快速地将更新呈现在用户界面上&#xff0c;不需要重载&#xff08;刷新&#xff09;整个页面【只刷新局部】&#xff0c;这使得程序能够更快地回应用户的操作。、 1…

2023年5月青少年机器人技术等级考试理论综合试卷(四级)

青少年机器人技术等级考试理论综合试卷&#xff08;四级&#xff09;2023.6 分数&#xff1a; 100 题数&#xff1a; 30 一、 单选题(共 20 题&#xff0c; 共 80 分) 1.Arduino C 语言&#xff0c; 部分程序如下&#xff0c; 串口监视器输出结果是“D”时&#xff0c; 变量 i …

【集群】Haproxy搭建Web群集

文章目录 一、Haproxy 相关概念1. Haproxy 的概述2. Haproxy 的主要特性3. 常见的 Web 集群调度器4. 常见的应用分析4.1 LVS 应用4.2 Haproxy 应用4.3 LVS、Nginx、Haproxy的区别 5. Haproxy 调度算法原理5.1 roundrobin5.2 static-rr5.3 leastconn5.4 source5.5 uri5.6 url_pa…

SpringBoot + Vue前后端分离项目实战 || 二:Spring Boot后端与数据库连接

系列文章&#xff1a; SpringBoot Vue前后端分离项目实战 || 一&#xff1a;Vue前端设计 文章目录 新建Spring后台项目添加依赖 新建数据库IDEA 连接数据库IDEA 自动创建类实体定义数据传递至前端的格式 B站视频讲解&#xff1a;2023全网最简单但实用的SpringBootVue前后端分离…

RTC

文章目录 前言驱动应用程序运行 前言 RTC&#xff08;Real Time Clock&#xff0c;实时时钟&#xff09;是个常用的外设&#xff0c;通过 RTC 我们可以知道日期和时间信息&#xff0c;因此在需要记录时间的场合就需要实时时钟。 可以使用专用的实时时钟芯片来完成此功能&#…

扫雷小游戏【C语言】

目录 前言 一、基本实现逻辑 二、实现步骤 1. 我们希望在进入游戏时有一个菜单让我们选择 2. 我们希望可以重复的玩&#xff08;一把玩完了还可以接着玩&#xff09; 3. 采用多文件形式编程 4.要扫雷先得有棋盘&#xff08;创建棋盘R*N&#xff09; 5.初始化棋盘 6.打…

【网络安全】深入解析 PHP 代码审计技术与实战

前言 登录某个网站并浏览其页面时&#xff0c;注意到了一些看起来不太对劲的地方。这些迹象可能是该网站存在漏洞或被黑客入侵的标志。为了确保这个网站的安全性&#xff0c;需要进行代码审计&#xff0c;这是一项专门针对软件代码进行检查和分析的技术。在本文中&#xff0c;…

一、Docker介绍

学习参考&#xff1a;尚硅谷Docker实战教程、Docker官网、其他优秀博客(参考过的在文章最后列出) 目录 前言一、Docker是什么&#xff1f;二、Docker能干撒&#xff1f;三、容器虚拟化技术 和 虚拟机有啥区别&#xff1f;1.虚拟机2.容器虚拟化技术3.对比4.Docker为啥比VM虚拟机…

献给蓝初小白系列(二)——Liunx应急响应

1、Linux被入侵的症状​​ ​​https://blog.csdn.net/weixin_52351575/article/details/131221720​​ 2、Linux应急措施 顺序是&#xff1a;隔离主机--->阻断通信--->清除病毒--->可疑用户--->启动项和服务--->文件与后门--->杀毒、重装系统、恢复数据 …

AAC ADTS格式分析

标题 1.AAC简介2. AAC ADTS格式分析2.1 adts_fixed_header详细介绍2.2 adts_variable_header详细介绍 1.AAC简介 AAC音频格式:Advanced Audio Coding(⾼级⾳频解码)&#xff0c;是⼀种由MPEG-4标准定义的有损⾳频压缩格式&#xff0c;由Fraunhofer发展&#xff0c;Dolby, Sony…