07用户行为日志数据采集

用户行为数据由Flume从Kafka直接同步到HDFS,由于离线数仓采用Hive的分区表按天统计,所以目标路径要包含一层日期。具体数据流向如下图所示。
在这里插入图片描述
按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。
此处选择KafkaSource、FileChannel、HDFSSink。关键配置如下:
在这里插入图片描述

日志消费者 Flume 实操

  1. 在hadoop101 节点的Flume 的 job目录下创建 kafka_to_hdfs_log.conf,内容如下
    配置注释:

    • FileChannel优化:配置 dataDirsk可以通过逗号分隔指向多个路径,每个路径对应不同硬盘,可以增加吞吐量。
    • 新增checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
    #定义组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    #配置source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092
    a1.sources.r1.kafka.topics=topic_log
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.logan.gmall.flume.interceptor.TimestampInterceptor$Builder
    
    #配置channel
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    #配置sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = log
    a1.sinks.k1.hdfs.round = false
    
    
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    #控制输出文件类型
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip
    
    #组装 
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  2. HDFS Sink 优化

    • HDFS存入大量小文件,有什么影响?
      元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命。
    • HDFS小文件处理。
      官方默认三个参数配置写入HDFS 后会产生小文件: hdfs.rollInterval, hdfs.rollSize, hdfs.rollCount。
      本次配置的参数为hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0。意味着文件在达到128M时会滚动生成新文件,或者文件超过 3600 秒会生成新文件。
  3. 编写 Flume 拦截器

    • 解决问题
      在这里插入图片描述
    • 在com.logan.gmall.flume.interceptor包下创建TimestampInterceptor类
    package com.logan.gmall.flume.interceptor;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    
    public class TimestampInterceptor implements Interceptor {
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
            // 获取header和body数据
            Map<String, String> headers = event.getHeaders();
            String body = new String(event.getBody(), StandardCharsets.UTF_8);
    
            // 将body转换成JsonObject类型
            JSONObject jsonObject = JSONObject.parseObject(body);
    
            // 将header中的timestamp时间转换成body中的timestamp(解决数据漂移问题)
            String ts = jsonObject.getString("ts");
            headers.put("timestamp", ts);
    
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
            for (Event event : list) {
                intercept(event);
            }
            return list;
        }
    
        public static class Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new TimestampInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
        @Override
        public void close() {
    
        }
    }
    
    
  4. 将打好的包放入到hadoop101的/opt/module/flume/lib文件夹下

启动测试

  1. 启动 Zookeeper、Kafka 集群
  2. 启动 hadoop101 的消费Flume
[logan@hadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
  1. 生成模拟数据[logan@hadoop101 ~]$ vim /opt/module/applog/log/app.2023-12-14.log
{"common":{"ar":"110000","ba":"vivo","ch":"oppo","is_new":"0","md":"vivo iqoo3","mid":"mid_70997","os":"Android 11.0","uid":"776","vc":"v2.1.134"},"start":{"entry":"icon","loading_time":11968,"open_ad_id":16,"open_ad_ms":7891,"open_ad_skip_ms":0},"ts":1672503309000}
{"common":{"ar":"110000","ba":"vivo","ch":"oppo","is_new":"0","md":"vivo iqoo3","mid":"mid_70997","os":"Android 11.0","uid":"776","vc":"v2.1.134"},"displays":[{"display_type":"activity","item":"2","item_type":"activity_id","order":1,"pos_id":1},{"display_type":"activity","item":"2","item_type":"activity_id","order":2,"pos_id":1},{"display_type":"query","item":"9","item_type":"sku_id","order":3,"pos_id":1},{"display_type":"query","item":"18","item_type":"sku_id","order":4,"pos_id":4},{"display_type":"promotion","item":"35","item_type":"sku_id","order":5,"pos_id":4},{"display_type":"query","item":"35","item_type":"sku_id","order":6,"pos_id":4},{"display_type":"recommend","item":"13","item_type":"sku_id","order":7,"pos_id":5}],"page":{"during_time":14287,"page_id":"home"},"ts":1672503309000}
  1. 检查HFDS是否生成数据
  2. 当 HDFS 生成数据后,增加[logan@hadoop101 bin]$ vim f2.sh
#!/bin/bash

case $1 in
"start")
        echo " --------启动 hadoop101 日志数据flume-------"
        ssh hadoop101 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 hadoop101 日志数据flume-------"
        ssh hadoop101 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
  1. 最终 HDFS 文件
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/253039.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【学习笔记】JavaScript中的GC算法

1、内存管理 内存&#xff1a;由可读写单元组成&#xff0c;标识一片可操作的空间 管理&#xff1a; 认为的去操作一篇空间的申请、使用和释放 内存管理&#xff1a;开发者主动申请空间、使用空间、释放空间 管理流程&#xff1a; 申请-使用-释放 // 申请 let obj {} //使…

java代理模式

1.定义:一个对象要访问另外一个对象 通过一个中间对象&#xff0c;像一个中介 2.类图 一个抽象类 一个代理类 一个真实调用对象类 3.代理模式 4.符合开闭原则 可以新创建代理类 来满足不通的情况 例如不同等级的账号拥有的权限不同 5.总结 6.类似springAOP

遗传算法应用-- 栅格法机器人路径规划

文章目录 一、遗传算法1.1 编码与解码1.2 选择算子-轮盘赌法1.3 交叉算子1.4 变异算子1.5 遗传算法流程1.6 基于遗传算法的栅格法机器人路径规划 二、采用模拟退火算法改善适应度函数 一、遗传算法 遗传算法 (Genetic AIgorithm, 简称 GA)起源于对生物系统所进行的计算机模拟研…

DC电源模块的设计与制造技术创新

BOSHIDA DC电源模块的设计与制造技术创新 DC电源模块的设计与制造技术创新主要涉及以下几个方面&#xff1a; 1. 高效率设计&#xff1a;传统的DC电源模块存在能量转换损耗较大的问题&#xff0c;技术创新可通过采用高效率的电路拓扑结构、使用高性能的功率开关器件和优化控制…

HarmonyOS云开发基础认证考试满分答案(100分)【全网最全-不断更新】【鸿蒙专栏-29】

系列文章&#xff1a; HarmonyOS应用开发者基础认证满分答案&#xff08;100分&#xff09; HarmonyOS应用开发者基础认证【闯关习题 满分答案】 HarmonyOS应用开发者高级认证满分答案&#xff08;100分&#xff09; HarmonyOS云开发基础认证满分答案&#xff08;100分&#xf…

动态规划——OJ题(一)

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、第N个泰波那契数1、题目讲解2、思路讲解3、代码实现 二、三步问题1、题目讲解2、思路讲解…

PyCharm community 安装教程

目录 链接cudatoolkit 下载地址&#xff1a;https://www.jetbrains.com/pycharm/download/other.html 双击打开 安装路径&#xff0c;可自行更换到D盘 不导入设置 链接cudatoolkit

英伟达盒子 Jetson Xshell连接串口查看日志方法(串口日志、盒子日志)

文章目录 连接串口xshell连接串口信息 连接串口 接盒子上的A2、B2&#xff0c;以及接地线&#xff1a; 另外一头接上电脑&#xff08;我用的RS485转USB工具&#xff09;&#xff1a; xshell连接 协议选择SERIAL&#xff1a; 设置盒子厂商约定的端口号、波特率、数据位、停止位…

[Verilog] Verilog 数值表示

主页&#xff1a; 元存储博客 文章目录 前言1. 整数表示1.1 整数数据类型1.2 整数转换函数 2. 负数表示3. 实数表示4. 逻辑电平表示5. 逻辑值表示6. 字符表示法7. 字符串表示 前言 Verilog中&#xff0c;可以使用多种方式表示数值。 1. 整数表示 1.1 整数数据类型 基数格式…

相机倾斜棋盘格标定全记录 vs200+opencv安装

论文参考是这个 Geiger A, Moosmann F, Car , et al. Automatic camera and range sensor calibration using a single shot[C]//Robotics and Automation (ICRA), 2012 IEEE International Conference on. IEEE, 2012: 3936-3943. 代码是这个github 花了一上午配好了c环境。。…

AAAI中稿心得

很幸运我们的一篇工作中稿了AAAI2024&#xff0c;题目是 Self-Prompt Mechanism for Few-Shot Image Recognition. 很高兴能在研二的上学期中稿一篇a会保底&#xff0c;也是我中稿的第一篇工作&#xff0c;成为我申请博士的资本。最重要的是&#xff0c;让枯燥无味的科研&#…

linux串口数据丢失--中断绑定CPU优化

问题现象 机器在户外测试时, 出现 轮速记 丢失的现象 小概率出现 50Hz丢失1~2帧极低概率出现 0.1~0.3秒内没有底盘数据 此问题导致slam定位漂, 需要优化处理. 验证与测试 问题1: 底盘串口 一个数据帧(head–data–crc) 被分片2~3报文 解决方法: 检测到head之后, 解析data…

机器学习--归一化处理

归一化 归一化的目的 归一化的一个目的是&#xff0c;使得梯度下降在不同维度 θ \theta θ 参数&#xff08;不同数量级&#xff09;上&#xff0c;可以步调一致协同的进行梯度下降。这就好比社会主义&#xff0c;一小部分人先富裕起来了&#xff0c;先富带后富&#xff0c…

03 使用Vite开发Vue3项目

概述 要使用vite创建Vue3项目&#xff0c;有很多种方式&#xff0c;如果使用命令&#xff0c;则推荐如下命令&#xff1a; # 使用nvm将nodejs的版本切换到20 nvm use 20# 全局安装yarn npm install -g yarn# 使用yarnvite创建项目 yarn create vite不过&#xff0c;笔者更推荐…

docker小白第五天

docker小白第五天 docker的私有库 有些涉密的信息代码不能放在阿里云的镜像仓库&#xff0c;因此需要构建一个个人内网专属的私有库&#xff0c;将镜像或者容器代码进行推送保存。 下载镜像docker registry 执行代码docker pull registry&#xff0c;用于搭建私服前的准备。…

Python异常值的自动检测实战案例

概要 在数据分析和机器学习中&#xff0c;异常值的检测是一个关键步骤&#xff0c;它有助于识别数据中的异常模式和离群点。本文将介绍Python中异常值检测的实战案例&#xff0c;使用一些常见的技术和库&#xff0c;为大家提供全面的示例代码和详细解释。 异常值的定义 异常值…

虚拟机下Ubuntu上网设置

文章目录 一、虚拟机上网的两种方式1.1 NAT模式&#xff08;Network Address Translation&#xff09;1.2 桥接模式&#xff08;Bridge Mode&#xff09;1.3 简介 二、实际配置2.1 NAT模式配置2.2 桥接模式配置 之前跟着博客配了好几个也没用&#xff0c;后来自己慢慢模式实践测…

HQL优化之数据倾斜

group by导致倾斜 前文提到过&#xff0c;Hive中未经优化的分组聚合&#xff0c;是通过一个MapReduce Job实现的。Map端负责读取数据&#xff0c;并按照分组字段分区&#xff0c;通过Shuffle&#xff0c;将数据发往Reduce端&#xff0c;各组数据在Reduce端完成最终的聚合运算。…

女生想通过培训转行软件测试类可行吗?

首先&#xff0c;女生转行IT行业做软件测试是可以的&#xff0c;因为软件测试岗&#xff0c;尤其是其中的功能性测试岗&#xff0c;入行门槛并不高&#xff0c;有很多女生在做&#xff0c;且我个人认为还蛮适合女生的&#xff0c;因为女生相对来说更细心&#xff0c;文档能力也…

PVE系列-防火墙的免费安静之旅IPfire

Ventoy一款引导盘可以引导各种启动盘安装盘的工具https://www.ventoy.net/cn/index.html 在它的兼容iso的列表 中发现了Ipfirehttps://wiki.ipfire.org/ &#xff0c;本来用着openwrt也挺好&#xff0c;忍不住的虚拟机尝了尝鲜&#xff0c;发现的功能有2&#xff0c; 安全吧&a…