【数仓】通过Flume+kafka采集日志数据存储到Hadoop

相关文章

  • 【数仓】基本概念、知识普及、核心技术
  • 【数仓】数据分层概念以及相关逻辑
  • 【数仓】Hadoop软件安装及使用(集群配置)
  • 【数仓】Hadoop集群配置常用参数说明
  • 【数仓】zookeeper软件安装及集群配置
  • 【数仓】kafka软件安装及集群配置
  • 【数仓】flume软件安装及配置
  • 【数仓】flume常见配置总结,以及示例

一、flume有什么作用

Apache Flume是一个分布式、可靠且可用的大数据日志采集、聚合和传输系统。它主要用于将大量的日志数据从不同的数据源收集起来,然后通过通道(Channel)进行传输,最终将数据传输到指定的目的地,如HDFS、HBase等。Flume具有高度可扩展性、容错性和灵活性,可以适应各种复杂的数据采集场景。

Flume的核心组件包括Source、Channel和Sink。Source负责从数据源中读取数据,可以是文件、网络套接字、消息队列等;Channel是数据的缓冲区,用于在Source和Sink之间传输数据;Sink负责将数据写入目标存储系统,如HDFS、HBase、Kafka等。此外,Flume还支持多种类型的Source、Channel和Sink,用户可以根据实际需求进行选择和配置。

Flume的主要作用是实现大规模数据采集和传输,实现数据的实时处理和分析,从而为企业提供更好的业务决策支持。在实际应用中,Flume可以用于日志收集、事件跟踪、数据流处理等场景。通过将数据从不同的数据源采集并传输到指定的目的地,Flume可以帮助企业实现数据的集中存储和管理,为后续的数据分析和挖掘提供基础。

此外,Flume还具有可靠性机制和故障转移和恢复机制,能够保证数据传输的可靠性和安全性。同时,Flume还支持客户扩展和自定义开发,用户可以根据自己的需求进行扩展和优化,使其更加适合特定的应用场景。

总的来说,Apache Flume是一个功能强大、灵活可靠的大数据日志采集、聚合和传输系统,它在大数据处理中起到了至关重要的作用。

二、环境准备

准备1台虚拟机

  • Hadoop131:192.168.56.131

本例系统版本 CentOS-7.8,已安装jdk1.8

关闭防火墙

systemctl stop firewalld

zookeeper、kafka 已安装,且已启动

三、flume安装配置

1、配置flume agent

1)本例演示 flume 去掉kafka数据,然后存储到hdfs中
2)完整数据通道是:log文件 > flume > kafka > flume > hdfs
3)flume 安装目录是 /data/flume
4)kafka 、Hadoop在前面已经安装过

新建配置文件 /data/flume/conf/job/kafka_to_hdfs_log.conf,内容如下:

# 定义组件
# 这里定义了Flume agent的三个主要组件:source(数据源)、channel(通道)和sink(数据接收器)。
a2.sources=r2
a2.channels=c2
a2.sinks=k2

# 配置source
# 配置数据源为Kafka,指定了Kafka的相关参数,如服务器地址、主题等。
a2.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
# 每次从Kafka拉取的数据量
a2.sources.r2.batchSize=5000
# 拉取数据的间隔时间(毫秒)
a2.sources.r2.batchDurationMillis=2000
# Kafka服务器地址列表
a2.sources.r2.kafka.bootstrap.servers = hadoop131:9092,hadoop132:9092,hadoop133:9092
# 从Kafka的哪个主题拉取数据
a2.sources.r2.kafka.topics=topic_log
# 注释掉的部分是关于拦截器的配置,拦截器可以用来对数据源的数据进行预处理。
#a2.sources.r2.interceptors=i2
#a2.sources.r2.interceptors.i2.type = com.my.flume.interceptor.TimestampInterceptor

# 配置channel
# 配置通道为文件通道,指定了通道的相关参数,如检查点目录、数据目录等。
a2.channels.c2.type = file
# 检查点目录,用于存储通道的状态信息
a2.channels.c2.checkpointDir = /data/flume/checkpoint/behaviorl
# 数据目录,用于存储通道中的数据
a2.channels.c2.dataDirs = /data/flume/data/behaviorl
# 通道中文件的最大大小(字节)
a2.channels.c2.maxFileSize = 2146435071
# 通道的容量,即可以存储的最大事件数
a2.channels.c2.capacity = 1000000
# 通道的keepalive时间(秒)
a2.channels.c2.keepalive = 6

# 配置sink
# 配置数据接收器为HDFS,指定了HDFS的相关参数,如文件路径、文件前缀等。
a2.sinks.k2.type =hdfs
# HDFS上的文件路径,使用了时间变量来动态生成目录
a2.sinks.k2.hdfs.path = /origin_data/user/log/topic_log/%Y-%m-%d
# HDFS上的文件前缀
a2.sinks.k2.hdfs.filePrefix=log
# 是否按照时间轮转文件,这里设置为false,表示不按照时间轮转
a2.sinks.k2.hdfs.round =false
# 文件轮转的时间间隔(秒)
a2.sinks.k2.hdfs.rollInterval=10
# 文件轮转的大小阈值(字节)
a2.sinks.k2.hdfs.rollSize=134217728
# 文件轮转的事件数阈值,这里设置为0,表示不按照事件数轮转
a2.sinks.k2.hdfs.rollCount=0

# 控制输出文件类型
# 设置输出文件的类型为压缩流格式,并使用gzip压缩算法。
a2.sinks.k2.hdfs.fileType = CompressedStream
a2.sinks.k2.hdfs.codeC = gzip

# 组装
# 将数据源、通道和数据接收器组装在一起,形成一个完整的Flume agent。
a2.sources.r2.channels=c2
a2.sinks.k2.channel=c2

这个配置文件定义了一个Flume agent,它从Kafka中读取数据,通过文件通道进行缓存,并最终将数据写入到HDFS中。在写入HDFS时,使用了压缩流格式,并对输出文件进行了gzip压缩。同时,还通过一些参数对文件的轮转进行了控制。

2、启动flume

1)创建flume启动脚本f2.sh

vi /usr/bin/f2.sh
# 修改文件权限
chmod 777 /usr/bin/f2.sh

2)复制如下内容

#!/bin/bash

#1. 判断参数个数
if [ $# -lt 1 ]
then
  echo Not Enough Arguement!
  exit;
fi

case $1 in
"start")
    #遍历集群所有机器
    for host in hadoop131
    do
        echo --------------------  $host 日志收集 flume 启动 --------------------
        ssh $host "nohup /data/flume/bin/flume-ng agent -n a2 -c /data/flume/conf/ -f /data/flume/conf/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
    done
;;
"stop")
    #遍历集群所有机器
    for host in hadoop131
    do
        echo --------------------  $host 日志收集 flume 停止 --------------------
        ssh $host "ps -ef | grep kafka_to_hdfs_log | grep -v grep | awk '{print \$2}' |xargs -n1 kill 9"
    done
;;
*)
    echo "Input Args Error..."
;;
esac

3)通过集群脚本 f2.sh 操作

f2.sh start

flume启动命令说明

以下是flume启动命令的常用参数:

参数默认值说明
--name-n无默认值,必须指定指定启动的Flume Agent的名称。这个名称应该与配置文件中定义的agent的名称一致。
--conf-c无默认值,通常设置为flume配置文件的目录指定Flume配置文件的目录。这个目录下应该包含flume的配置文件。
--conf-file-f无默认值,必须指定指定具体的Flume配置文件名。这个文件应该包含了Flume Agent的配置信息。
--zkConnString-z无默认值当Flume配置使用Zookeeper进行集群管理时,指定Zookeeper的连接字符串。格式为主机名:端口号,多个节点用逗号分隔。
-Dflume.root.logger无默认值,通常设置为INFO,console设置Flume的日志级别和输出方式。例如,INFO,console表示日志级别为INFO,并输出到控制台。也可以设置为输出到日志文件。
--no-reload-conffalse如果设置为true,那么Flume将不会重新加载配置文件,即使配置文件发生了变化。
--help-h无默认值显示帮助信息,列出所有可用的启动参数。

需要注意的是,Flume的启动参数可能会因版本和具体的使用场景而有所不同。上表中的参数是最常用的,但并不是所有的参数都在所有版本的Flume中都可用。在实际使用时,建议查阅对应版本的Flume官方文档或使用flume-ng agent --help命令查看可用的参数列表。

3、验证日志采集通路

1)在指定的log目录中生成日志文件

cat app.log >> /data/applog/log/app_test.log

2)打开Hadoop查看数据,http://192.168.56.131:9870/

Hadoop在前面已经安装过

在这里插入图片描述

参考

  • https://flume.apache.org/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/444258.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【深度学习笔记】6_10 双向循环神经网络bi-rnn

注:本文为《动手学深度学习》开源内容,部分标注了个人理解,仅为个人学习记录,无抄袭搬运意图 6.10 双向循环神经网络 之前介绍的循环神经网络模型都是假设当前时间步是由前面的较早时间步的序列决定的,因此它们都将信…

【Leetcode打卡】递归回溯

【Leetcode打卡】递归回溯 784. 字母大小写全排列 class Solution { public:int find(string s,int pos){int ipos;while(i<s.size()){if(isalpha(s[i])){return i;}i;}return -1;}void turn(string& s,int pos){if(islower(s[pos])){s[pos]toupper(s[pos]);}else{s[po…

ChatGPT提示词工程:prompt和chatbot

ChatGPT Prompt Engineering for Developers 本文是 https://www.deeplearning.ai/short-courses/chatgpt-prompt-engineering-for-developers/ 这门课程的学习笔记。 ChatGPT提示词工程&#xff1a;prompt和chatbot 文章目录 ChatGPT Prompt Engineering for DevelopersWhat …

Linux下使用open3d进行点云可视化(.bin文件)

整个场景可视化&#xff1a; import numpy as np import open3d as o3ddef read_kitti_bin_point_cloud(bin_file):# 加载.bin文件point_cloud_np np.fromfile(bin_file, dtypenp.float32).reshape(-1, 4)# 仅使用X, Y, Z坐标&#xff0c;忽略反射率point_cloud_o3d o3d.geo…

代码训练LeetCode(6)编辑距离

代码训练(6)LeetCode之编辑距离 Author: Once Day Date: 2024年3月9日 漫漫长路&#xff0c;才刚刚开始… 全系列文章可参考专栏: 十年代码训练_Once-Day的博客-CSDN博客 参考文章: 72. 编辑距离 - 力扣&#xff08;LeetCode&#xff09;力扣 (LeetCode) 全球极客挚爱的技…

44岁「台偶一哥」成现实版「王子变青蛙」,育一子一女成人生赢家

电影《周处除三害》近日热度极高&#xff0c;男主角阮经天被大赞演技出色&#xff0c;最让人意想不到&#xff0c;因为该片在内地票房报捷&#xff0c;很多人走去恭喜另一位台湾男艺人明道&#xff0c;皆因二人出道时外貌神似&#xff0c;至今仍有不少人将两人搞混。 多年过去&…

RNN预测正弦时间点

import torch.nn as nn import torch import numpy as np import matplotlib matplotlib.use(TkAgg) from matplotlib import pyplot as plt # net nn.RNN(100,10) #100个单词&#xff0c;每个单词10个维度 # print(net._parameters.keys()) #序列时间点预测num_time_steps 50…

什么是RabbitMQ的死信队列

RabbitMQ的死信队列&#xff0c;是一种用于处理消息&#xff0c;处理失败或无法路由的消息的机制。它允许将无法被正常消费的消息重新路由到另一个队列&#xff0c;以便稍后进行进一步的处理&#xff0c;分析或排查问题。 当消息队列里面的消息出现以下几种情况时&#xff0c;就…

ChatGPT 控制机器人的基本框架

过去的一年&#xff0c;OpenAI的chatGPT将自然语言的大型语言模型&#xff08;LLM&#xff09;推向了公众的视野&#xff0c;人工智能AI如一夜春风吹遍了巴黎&#xff0c;全世界都为AI而疯狂。 OpenAI ChatGPT是一个使用人类反馈进行微调的预训练生成文本模型。不像以前的模型主…

每日一练:LeeCode-209、长度最小的子数组【滑动窗口+双指针】

每日一练&#xff1a;LeeCode-209、长度最小的子数组【滑动窗口双指针】 思路暴⼒解法滑动窗口 本文是力扣 每日一练&#xff1a;LeeCode-209、长度最小的子数组【滑动窗口双指针】 学习与理解过程&#xff0c;本文仅做学习之用&#xff0c;对本题感兴趣的小伙伴可以出门左拐 L…

《剑指 Offer》专项突破版 - 面试题 76 : 数组中第 k 大的数字(C++ 实现)

目录 详解快速排序 面试题 76 : 数组中第 k 大的数字 详解快速排序 快速排序是一种非常高效的算法&#xff0c;从其名字可以看出这种排序算法最大的特点是快。当表现良好时&#xff0c;快速排序的速度比其他主要对手&#xff08;如归并排序&#xff09;快 2 ~ 3 倍。 快速排…

力扣---腐烂的橘子

题目&#xff1a; bfs思路&#xff1a; 感觉bfs还是很容易想到的&#xff0c;首先定义一个双端队列&#xff08;队列也是可以的~&#xff09;&#xff0c;如果值为2&#xff0c;则入队列&#xff0c;我这里将队列中的元素定义为pair<int,int>。第一个int记录在数组中的位…

嵌入式驱动学习第二周——使用perf进行性能优化

前言 这篇博客来聊一聊如何使用perf进行性能优化。 嵌入式驱动学习专栏将详细记录博主学习驱动的详细过程&#xff0c;未来预计四个月将高强度更新本专栏&#xff0c;喜欢的可以关注本博主并订阅本专栏&#xff0c;一起讨论一起学习。现在关注就是老粉啦&#xff01; 目录 前言…

考研复习C语言初阶(4)+标记和BFS展开的扫雷游戏

目录 1. 一维数组的创建和初始化。 1.1 数组的创建 1.2 数组的初始化 1.3 一维数组的使用 1.4 一维数组在内存中的存储 2. 二维数组的创建和初始化 2.1 二维数组的创建 2.2 二维数组的初始化 2.3 二维数组的使用 2.4 二维数组在内存中的存储 3. 数组越界 4. 冒泡…

HarmonyOS NEXT应用开发之MpChart图表实现案例

介绍 MpChart是一个包含各种类型图表的图表库&#xff0c;主要用于业务数据汇总&#xff0c;例如销售数据走势图&#xff0c;股价走势图等场景中使用&#xff0c;方便开发者快速实现图表UI。本示例主要介绍如何使用三方库MpChart实现柱状图UI效果。如堆叠数据类型显示&#xf…

FPGA的配置状态字寄存器Status Register

目录 简介 状态字定义 Unknown Device/Many Unknow Devices 解决办法 一般原因 简介 Xilinx的FPGA有多种配置接口&#xff0c;如SPI&#xff0c;BPI&#xff0c;SeletMAP&#xff0c;Serial&#xff0c;JTAG等&#xff1b;如果从时钟发送者的角度分&#xff0c;还可以…

2024/3/10周报

文章目录 摘要Abstract文献阅读题目问题创新点方法Section1&#xff1a;运动员检测Section2&#xff1a;行为识别输入层隐藏层输出层 实验实验数据评估指标模型设置实验结果 深度学习模糊逻辑系统概念模糊化模糊规则解模糊 总结 摘要 本周阅读了一篇关于基于YOLO和深度模糊LST…

131.分割回文串

// 定义一个名为Solution的类 class Solution {// 声明一个成员变量&#xff0c;用于存储所有满足条件的字符串子序列划分结果List<List<String>> lists new ArrayList<>(); // 声明一个成员变量&#xff0c;使用LinkedList实现的双端队列&#xff0c;用于临…

【Objective -- C】—— 自引用计数

【Objective -- C】—— 自引用计数 一. 内存管理/自引用计数1.自引用计数2.内存管理的思考方式自己生成的对象&#xff0c;自己持有非自己生成的对象&#xff0c;自己也能持有不再需要自己持有的对象时释放无法释放非自己持有的对象 3.alloc/retain/release/dealloc实现4. aut…

力扣--滑动窗口438.找到字符串中所有字母异位词

思路分析&#xff1a; 使用两个数组snum和pnum分别记录字符串s和p中各字符出现的次数。遍历字符串p&#xff0c;统计其中各字符的出现次数&#xff0c;存储在pnum数组中。初始化snum数组&#xff0c;统计s的前m-1个字符的出现次数。从第m个字符开始遍历s&#xff0c;通过滑动窗…