Flume的安装部署及常见问题解决

在这里插入图片描述

1.安装地址

(1) Flume官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/

2.安装部署

注意:前提是配置好java环境

(1)将apache-flume-1.10.1-bin.tar.gz上传到linux的/opt/package/目录下
在这里插入图片描述
(2)解压apache-flume-1.10.1-bin.tar.gz到/opt/software/目录下

[zhangflink@9wmwtivvjuibcd2e package]$ tar -zxvf apache-flume-1.10.1-bin.tar.gz -C /opt/software/

(3)修改apache-flume-1.10.1-bin的名称为flume

[zhangflink@9wmwtivvjuibcd2e software]$ mv apache-flume-1.10.1-bin/ flume

(4)修改conf目录下的log4j2.xml配置文件,配置日志文件路径

修改日志路径

<Property name="LOG_DIR">/opt/module/flume/log</Property>

在这里插入图片描述

 <AppenderRef ref="Console" />

在这里插入图片描述

编写配置文件

官网翻译成中文的网站,可以参考这个网站进行编写配置文件:https://flume.liyifeng.org/

在这里插入图片描述

(1).Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
Agent 主要有三个组成部分,Source、Channel、Sink。
(2).第一步:配置各个组件,根据你采集数据的需求进行选择对应的source,channels,sinks组件(直接去参考官网对应的组件功能选择即可)。
(3).第二步:连接各个组件,把采集端(Flume Sources),中间缓存(Flume Channels)和写入端(Flume Sinks)连接到一起。
(4).第三步:启动Agent。
bin目录下的flume-ng是Flume的启动脚本,启动时需要指定Agent的名字、配置文件的目录和配置文件的名称。

bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

-n后面就是agent的主节点,-f 后面就是配置文件的位置,其它不变。

常用案例

监听端口配置:

# example.conf: 一个单节点的 Flume 实例配置

# 配置Agent a1各个组件的名称

#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   

# 配置Agent a1的source r1的属性
#使用的是NetCat TCP Source,这里配的是别名,Flume内置的一些组件都是有别名的,没有别名填全限定类名
a1.sources.r1.type = netcat       
#NetCat TCP Source监听的hostname,这个是本机
a1.sources.r1.bind = localhost    
#监听的端口
a1.sources.r1.port = 44444        

# 配置Agent a1的sink k1的属性

# sink使用的是Logger Sink,这个配的也是别名
a1.sinks.k1.type = logger         

# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的

#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    

# 把source和sink绑定到channel上

#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1         

启动agent

 bin/flume-ng agent -n a1 -c conf -f conf/example.conf

在这里插入图片描述

监听文件写入HDFS里面

# file_chanel_hdfs.conf: 一个监听文件数据写入hdfs的实例配置

# 配置Agent a1各个组件的名称

#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   

#监听文件的source,这个source支持断点续传可靠性更高
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/software/flume/text_log/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/software/flume/text_log/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /opt/software/flume/text_log/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

# 配置Agent a1的sink k1的属性

#写入HDFS的sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://10.0.3.141:8020/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.timeZone = Asia/Shanghai


# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的

#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    



# 把source和sink绑定到channel上

#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1        

启动后可能遇到的问题及解决方法

在这里插入图片描述

原因是普通用户没有创建文件的权限,使用root权限启动即可

sudo bin/flume-ng agent -c conf -n a1 -f conf/file_chanel_hdfs.conf

在这里插入图片描述

原因是因为写入到hfds时使用到了时间戳来区分目录结构,flume的消息组件event在接受到之后在header中没有发现时间戳参数,导致该错误发生,有三种方法可以解决这个错误;
1、agent1.sources.source1.interceptors = t1
agent1.sources.source1.interceptors.t1.type = timestamp
为source添加拦截,每条event头中加入时间戳;(效率会慢一些)
2、agent1.sinks.sink1.hdfs.useLocalTimeStamp = true 为sink指定该参数为true
(如果客户端和flume集群时间不一致数据时间会不准确)
3、在向source发送event时,将时间戳参数添加到event的header中即可,header是一个map,添加时mapkey为timestamp(推荐使用)

我使用了第二种方法(如果实时链路中,一般数据中都会带有时间戳,要使用第一种方法,保证时间语义的准确性)。

在这里插入图片描述
在这里插入图片描述

遇到这个错误是sink配置语句中创建hdfs的路径报错

要和hadoop里面的core-site.xml 文件保持一致

<!-- 指定NameNode的地址 -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://flinkv1:8020</value>
</property>

在这里插入图片描述
此问题是由于操作hdfs的文件权限不足,修改hdfs文件权限即可。

[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -ls /
Found 1 items
drwxr-xr-x   - zhangflink supergroup          0 2023-11-19 11:04 /flume
[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -chmod 777 /flume
[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -ls /
Found 1 items
drwxrwxrwx   - zhangflink supergroup          0 2023-11-19 11:04 /flume

启动成功数据写入

在这里插入图片描述
在这里插入图片描述

监听文件写入kafka里面

首先创建kafka的topic

[zhangflink@9wmwtivvjuibcd2e kafka]$ bin/kafka-topics.sh --bootstrap-server flinkv1:9092 --create --partitions 1 --replication-factor 3 --topic flumeData

编写配置文件:

# file_memory_kafka.conf: 一个监听文件数据写入hdfs的实例配置

# 配置Agent a1各个组件的名称

#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   

#监听文件的source,这个source支持断点续传可靠性更高
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/software/flume/text_log/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/software/flume/text_log/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /opt/software/flume/text_log/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

# 配置Agent a1的sink k1的属性

#写入kafka的sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumeData
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1


# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的

#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    

# 把source和sink绑定到channel上

#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1     

消费对应topic测试数据是否写入

[zhangflink@9wmwtivvjuibcd2e kafka]$ bin/kafka-console-consumer.sh --bootstrap-server flinkv1:9092 --from-beginning --topic flumeData

监听成功
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/164734.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

利用SD存储介质扩展MAXQ20000的非易失性数据存储空间

SD存储卡是一种可移动存储介质&#xff0c;通常用于相机、手机、平板电脑等设备中存储照片、视频、音乐等数据。SD存储卡的全称为Secure Digital Memory Card&#xff0c;是由SD Card Association制定的一种标准格式。它具有体积小、存储容量大、读写速度快、价格低廉等优点。目…

在线随机字符串生成工具

具体请前往&#xff1a;在线随机字符串生成器--通过该工具生成动态复杂随机密码,随机字符串等&#xff0c;加密盐等

PC业务校验(已有该名称,已有该编码)

rules: {name: [{ required: true, message: "部门名称不能为空", trigger: "blur" },{min: 2,max: 10,message: "部门名称的长度为2-10个字符",trigger: "blur",},{trigger: "blur",validator: async (rule, value, callba…

命令执行相关函数及各类命令执行绕过技巧

相关函数 &#xff08;命令注入&#xff09; 命令执行的绕过

DSP2335的LED工程笔记

首先是确定时钟 在技术参考中&#xff0c;找到时钟章节 只能观察每个寄存器&#xff0c;才能看到寄存器控制那个外设的时钟 第二找到对应GPIO以及寄存器&#xff1b; 在我板子里面的原理图是 但是TI的提供的库函数是分ABC的&#xff0c;刚开始就不知道怎麽分。GPIO68到GPIO6…

4.Pod详解【四】

文章目录 4. Pod详解4.1 Pod介绍4.1.1 Pod结构4.1.2 Pod定义 4.2 Pod配置4.2.1 基本配置4.2.2 镜像拉取4.2.3 启动命令4.2.4 环境变量4.2.5 端口设置4.2.6 资源配额 4.3 Pod生命周期4.3.1 创建和终止4.3.2 初始化容器4.3.3 钩子函数4.3.4 容器探测4.3.5 重启策略 4.4 Pod调度4.…

支持4KHz回报还能无线充电,简约不简单的雷柏VT3S游戏鼠标上手

这两年国产鼠标的表现很让人惊喜&#xff0c;不仅外观做工越来越精细&#xff0c;配置也越来越强大&#xff0c;当然价格依然亲民。现在很容易找到一款搭载高端传感器、响应速度快、电池续航时间长&#xff0c;并且还支持无线充电的全能型鼠标。 我之前用雷柏的鼠标比较多&…

Hive 定义变量 变量赋值 引用变量

Hive 定义变量 变量赋值 引用变量 变量 hive 中变量和属性命名空间 命名空间权限描述hivevar读写用户自定义变量hiveconf读写hive相关配置属性system读写java定义额配置属性env只读shell环境定义的环境变量 语法 Java对这个除env命名空间内容具有可读可写权利&#xff1b; …

【STM32】ADC(模拟/数字转换)

一、ADC的简介 1.什么是ADC 1&#xff09;将【电信号】-->【电压】-->【数字量】 2&#xff09;ADC可以将引脚上连续变化的模拟电压转换为内存中存储的数字量&#xff0c;建立模拟电路到数字电路的桥梁。 3&#xff09;12位逐次逼近型ADC&#xff0c;1us转换时间&#xf…

内容运营策略:个性化推荐

一、推荐系统流程 典型的推荐系统包括3个部分&#xff0c;即召回层&#xff08; Recall )、排序层&#xff08; Rank )和重排层&#xff08; ReRank )。 1&#xff0e;召回层&#xff08; Recall ) 召回层主要是从全量库中首先获取用户可能感兴趣的候选集&#xff0c;是推荐系…

【Qt开发流程之】窗口部件

qt类关系图 创建Qt项目时&#xff0c;发现提供的窗体默认 基类有&#xff1a;QMainWindow、QDialog、QWidget这三种。 之后&#xff0c;你会发现&#xff0c;这3中窗体在UI交互中&#xff0c;用的也是最多的。 以下是Qt类关系图&#xff1a; 基础窗口控件QWidget 由上图可以…

Swin Transformer

Swin Transformer 简介 下采样的层级设计&#xff0c;能够逐渐增大感受野。采用window进行注意力计算&#xff0c;极大降低了内存消耗&#xff0c;避免了整张图像尺寸大小的qkv矩阵滑窗操作包括不重叠的 local window&#xff0c;和重叠的 cross-window。不重叠的local window…

volatile 无法保证原子性 案例展示

volatile 无法保证原子性 在 Java 中&#xff0c;原子性是指一个操作是不可中断的&#xff0c;要么都执行要么都不执行。 但是 volatile 修饰的变量&#xff0c;只是保证了从主内存加载到工作内存的值是最新的&#xff0c;并不能保证对变量的操作是原子性的 变量的写操作和读…

关于缓存和数据库一致性问题的深入研究

如何保证缓存和数据库一致性&#xff0c;这是一个老生常谈的话题了。 但很多人对这个问题&#xff0c;依旧有很多疑惑&#xff1a; 到底是更新缓存还是删缓存&#xff1f;到底选择先更新数据库&#xff0c;再删除缓存&#xff0c;还是先删除缓存&#xff0c;再更新数据库&…

Spring Boot中实现支付宝、微信和银联支付的功能

Spring Boot中实现支付宝、微信和银联支付的功能 在Spring Boot中实现支付宝、微信和银联支付的功能&#xff0c;通常需要使用它们各自的SDK&#xff08;Software Development Kit&#xff09;。以下是一个简单的示例代码&#xff0c;演示了如何在Spring Boot项目中集成支付宝…

操作系统:操作系统教程第六版(骆斌、葛季栋、费翔林)习题一计算机操作系统概述

目录 前言1. 思考题2. 应用题 前言 本系列文章是针对操作系统教程第六版&#xff08;骆斌、葛季栋、费翔林&#xff09;的习题解答&#xff0c;其中简答题部分为博主自己搜索整理的&#xff0c;错漏之处在所难免。应用题部分有答案为依据。 1. 思考题 &#xff08;1&#xf…

vscode设置代码模板

一键生成vue3模板代码 效果演示 输入vue3 显示快捷键 按回车键 一键生成自定义模板 实现方法 进入用户代码片段设置 选择片段语言 vue.json输入自定义的代码片段 prefix是触发的内容&#xff0c;按自己的喜好来就行&#xff1b; body是模板代码&#xff0c;写入自己需要的…

java springboot 在测试类中声明临时Bean对象

上文 java springboot在当前测试类中添加临时属性 不影响application和其他范围 中 我们讲了怎么在测试类中设置临时属性 但是 如果我们想设置临时的Bean呢&#xff1f; 其实做过几个项目的人都会理解 我们很多功能 需要一些第三方bean才能完成 那么 我们可能存在需要用第三方b…

MySQL 的执行原理(四)

5.5. MySQL 的查询重写规则 对于一些执行起来十分耗费性能的语句&#xff0c;MySQL 还是依据一些规则&#xff0c;竭尽全力的把这个很糟糕的语句转换成某种可以比较高效执行的形式&#xff0c;这个过程也可以 被称作查询重写。 5.5.1. 条件化简 我们编写的查询语句的搜索条件…

SpringMVC总结

SpringMVC简介 简介 SpringMVC是一款基于Servlet API构建的原始Web框架&#xff0c;从一开始就包含在Spring Framework中。正式名称“Spring Web MVC”来自其源模块的名称&#xff08; spring-webmvc &#xff09;&#xff0c;但它通常被称为“Spring MVC”。 调用流程 接收数…