【Flume Kafaka实战】Using Kafka with Flume

一 目标

在Cloudera Manager中创建两个Flume的Agent,Agent1从local file中获取内容,写入到kafka的队列中。Agent2以Agent1的sink作为source,将数据从kafka中读取出来,写入到HDFS中。

二 实战

2.1 Kafka Sink

第一步,在Cloudera Manager中安装Flume,安装时指定两个Agent。这一步很简单。

第二步,创建一个新Role Group。默认情况下,所有的Agent都处于一个叫Agent Default Group的角色组中,处于同一角色组中的Agent共享相同的配置。但是在我们这个例子中,两个Agent要完成不同的工作,需要不同的配置。所有新建一个Role Group,并把其中一个Agent移到到这个新的Group中,如下图所示。

第三步,分别编辑两个Agent的配置文件,我的第一个Agent名字为file2Kafka,配置文件内容如下。不难看出,这个配置的source就是去tail一个本地文件,然后写入到kafka的消息队列中。

即:Kafka Sink

# Name the components on this agent
file2Kafka.sources = file2Kafka_source
file2Kafka.sinks = file2Kafka_sink
file2Kafka.channels = file2Kafka_channel

# Describe/configure the source
file2Kafka.sources.file2Kafka_source.type = exec
file2Kafka.sources.file2Kafka_source.command = tail -F /home/demo/flume-exec.txt

# Describe the sink
file2Kafka.sinks.file2Kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
# topic前不加kafka
file2Kafka.sinks.file2Kafka_sink.topic = flumetest
file2Kafka.sinks.file2Kafka_sink.kafka.bootstrap.servers= slave1:9092,slave2:9092
file2Kafka.sinks.file2Kafka_sink.kafka.flumeBatchSize= 20

# Use a channel which buffers events in memory
file2Kafka.channels.file2Kafka_channel.type = memory
file2Kafka.channels.file2Kafka_channel.capacity = 1000
file2Kafka.channels.file2Kafka_channel.transactionCapacity = 1000

# Bind the source and sink to the channel
file2Kafka.sources.file2Kafka_source.channels = file2Kafka_channel
file2Kafka.sinks.file2Kafka_sink.channel = file2Kafka_channel

2.2 Kafka Source

第二Agent的名字是kafka2Hdfs,配置文件如下。这个配置的内容就是把Agent1中写到kafka的数据读出来,然后写入到HDFS中。注意hdfs.path这个配置,由于在Cloudera Manager中,Flume知道HDFS相关的配置,所以无需去加入hdfs://my-cluster这样的协议前缀。

# Name the components on this agent
kafka2Hdfs.sources = kafka2Hdfs_source
kafka2Hdfs.sinks = kafka2Hdfs_sink
kafka2Hdfs.channels = kafka2Hdfs_channel

# Describe/configure the source
kafka2Hdfs.sources.kafka2Hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
kafka2Hdfs.sources.kafka2Hdfs_source.batchSize = 10
kafka2Hdfs.sources.kafka2Hdfs_source.batchDurationMillis = 1000
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.topics = flumetest
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.consumer.group.id = flume

# Describe the sink
kafka2Hdfs.sinks.kafka2Hdfs_sink.type = hdfs
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.path = /flume/
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.fileType = DataStream
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.filePrefix=sxt
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollCount=0
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollInterval=0

# Use a channel which buffers events in memory
kafka2Hdfs.channels.kafka2Hdfs_channel.type = memory
kafka2Hdfs.channels.kafka2Hdfs_channel.capacity = 1000
kafka2Hdfs.channels.kafka2Hdfs_channel.transactionCapacity = 100

# Bind the source and sink to the channel
kafka2Hdfs.sources.kafka2Hdfs_source.channels = kafka2Hdfs_channel
kafka2Hdfs.sinks.kafka2Hdfs_sink.channel = kafka2Hdfs_channel

整个配置完成之后,Cloudera Manager中的界面如下图:

在运行中可能会出现一些目录读写的权限问题,需要去修改hdfs中相关目录的权限。比如我的配置中,数据是写到/flume这个目录下的,这个目录我是用root用户去创建的,但flume运行是使用一个叫flume的用户名来运行的,所以用hdfs dfs -chmod 777 /flume把这个目录的读写权限放开了。

这是一个例子,主要演示如何在cloudera manager中把两个flume的agent串联在一起使用。在现实的生产中,如果需要把一个文本数据通过kakfa写入到hdfs中,更合理的做法是使用一个agent,把kafka作为channel来使用。具体可以参考https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html

2.3 Kafka Channel

# Name the components on this agent
kafkaCh.sources = src_1_file
kafkaCh.channels = ch_1_kafka
kafkaCh.sinks = sink_1_hdfs

# Describe/configure the source
kafkaCh.sources.src_1_file.type = exec
kafkaCh.sources.src_1_file.command = tail -F /home/demo/flume-exec.txt

# Define a kafka channel
kafkaCh.channels.ch_1_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
kafkaCh.channels.ch_1_kafka.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafkaCh.channels.ch_1_kafka.kafka.topic = kafka_channel
kafkaCh.channels.ch_1_kafka.kafka.consumer.group.id = flume-consumer

# Describe the sink
kafkaCh.sinks.sink_1_hdfs.type = hdfs
kafkaCh.sinks.sink_1_hdfs.hdfs.path = /flume/kafka/channel
kafkaCh.sinks.sink_1_hdfs.hdfs.fileType = DataStream
kafkaCh.sinks.sink_1_hdfs.hdfs.filePrefix=sxt
kafkaCh.sinks.sink_1_hdfs.hdfs.rollCount=0
kafkaCh.sinks.sink_1_hdfs.hdfs.rollInterval=0

# Bind the source and sink to the channel
kafkaCh.sources.src_1_file.channels = ch_1_kafka
kafkaCh.sinks.sink_1_hdfs.channel = ch_1_kafka

将上面两个Agent放在一个Agent中,用Kafka Channel实现。

注意:hdfs.path 必须存在,且有权限进行操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/886379.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

828华为云征文|部署多功能集成的协作知识库 AFFiNE

828华为云征文|部署多功能集成的协作知识库 AFFiNE 一、Flexus云服务器X实例介绍二、Flexus云服务器X实例配置2.1 重置密码2.2 服务器连接2.3 安全组配置2.4 Docker 环境搭建 三、Flexus云服务器X实例部署 AFFiNE3.1 AFFiNE 介绍3.2 AFFiNE 部署3.3 AFFiNE 使用 四、…

Nginx基础详解5(nginx集群、四七层的负载均衡、Jmeter工具的使用、实验验证集群的性能与单节点的性能)

续Nginx基础详解4(location模块、nginx跨域问题的解决、nginx防盗链的设计原理及应用、nginx模块化解剖)-CSDN博客 目录 14.nginx集群(前传) 14.1如何理解单节点和集群的概念 14.2单节点和集群的比较 14.3Nginx中的负载均衡…

StopWath,apache commons lang3 包下的一个任务执行时间监视器的使用

StopWath是 apache commons lang3 包下的一个任务执行时间监视器&#xff0c;与我们平时常用的秒表的行为比较类似&#xff0c;我们先看一下其中的一些重要方法&#xff1a; <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --> <dependen…

过渡到内存安全语言:挑战和注意事项

开放源代码安全基金会 ( OpenSSF )总经理 Omkhar Arasaratnam 讨论了内存安全编程语言的演变及其为应对 C 和 C 等语言的局限性而出现的现象。 内存安全问题已存在五十多年&#xff0c;它要求程序员从内存管理任务中抽离出来。 Java、Rust、Python 和 JavaScript 等现代语言通…

八大排序详解

文章目录 目录1. 排序的概念及其运用1.1 排序的概念1.2 排序的运用1.3 常见的排序算法 2. 常见排序算法的实现2.1 插入排序2.1.1 基本思想2.1.2 直接插入排序2.1.3 希尔排序 2.2 选择排序2.2.1 基本思想2.2.2 直接选择排序2.2.3 堆排序 2.3 交换排序2.3.1 基本思想2.3.2 冒泡排…

SSL VPN | Easyconnect下载安装使用 (详尽)

EasyConnect是一款远程连接工具&#xff0c;为用户提供简便、快捷的远程访问和控制解决方案。 目录 下载 安装 使用 卸载 下载 通过链接进入官网技术支持板块 深信服技术支持-简单、高效、自助化服务 (sangfor.com.cn)https://support.sangfor.com.cn/ 选择软件下载 在安…

【C语言】指针篇 | 万字笔记

写在前面 在学习C语言过程&#xff0c;总有一个要点难点离不开&#xff0c;那就是大名鼎鼎的C语言指针&#xff0c;也是应为有指针的存在&#xff0c;使得C语言一直长盛不衰。因此不才把指针所学的所有功力都转换成这个笔记。希望对您有帮助&#x1f970;&#x1f970; 学习指…

【2025】基于Hadoop短视频流量数据分析与可视化(源码+文档+调试+答疑)

文章目录 前言一、主要技术&#xff1f;二、项目内容1.整体介绍&#xff08;示范&#xff09;2.运行截图3.部分代码介绍 总结更多项目 前言 随着我国经济的高速发展与人们生活水平的日益提高&#xff0c;人们对生活质量的追求也多种多样。尤其在人们生活节奏不断加快的当下&am…

unix中的exec族函数介绍

一、前言 本文将介绍unix中exec族函数&#xff0c;包括其作用以及使用方法。当一个进程调用fork函数创建一个新进程后&#xff0c;新进程可以直接执行原本正文段的其他内容&#xff0c;但更多时候&#xff0c;我们在一个进程中调用fork创建新的进程后&#xff0c;希望新进程能…

杭州电子科技大学《2019年+2023年861自动控制原理真题》 (完整版)

本文内容&#xff0c;全部选自自动化考研联盟的&#xff1a;《杭州电子科技大学861自控考研资料》的真题篇。后续会持续更新更多学校&#xff0c;更多年份的真题&#xff0c;记得关注哦~ 目录 2019年真题 2023年真题 Part1&#xff1a;2019年2023年完整版真题 2019年真题 2…

ubuntu 开启root

sudo passwd root#输入以下命令来给root账户设置密码 sudo passwd -u root#启用root账户 su - root#要登录root账户 root 开启远程访问&#xff1a; 小心不要改到这里了&#xff1a;sudo nano /etc/ssh/ssh_config 而是&#xff1a;/etc/ssh/sshd_config sudo nano /etc/ssh…

猫猫cpu的缓存

原题过长&#xff0c;放一下题目大意 题目大意 给你 m m m 个 1 1 1 到 n n n 之间的整数&#xff0c;你要找到若干个大小为固定的 k k k 的闭区间&#xff0c;使得所有这些数都在你找到的某个区间内。你需要最小化这些区间的并集的大小&#xff0c;并输出此大小。本题里…

基于单片机的两轮直立平衡车的设计

本设计基于单片机设计的两轮自平衡小车&#xff0c;其中机械部分包括车体、车轮、直流电机、锂电池等部件。控制电路板采用STC12C5A60S2作为主控制器&#xff0c;采用6轴姿态传感器MPU6050测量小车倾角&#xff0c;采用TB6612FNG芯片驱动电机。通过模块化编程完成了平衡车系统软…

calibre-web的翻译translations

calibre-web的翻译translations Windows安装calibre-web&#xff0c;Python-CSDN博客文章浏览阅读539次&#xff0c;点赞10次&#xff0c;收藏11次。pip install calibreweb报错&#xff1a;error: Microsoft Visual C 14.0 or greater is required. Get it with "Microso…

机器学习(5):机器学习项目步骤(二)——收集数据与预处理

1. 数据收集与预处理的任务&#xff1f; 为机器学习模型提供好的“燃料” 2. 数据收集与预处理的分步骤&#xff1f; 收集数据-->数据可视化-->数据清洗-->特征工程-->构建特征集和数据集-->拆分数据集、验证集和测试集 3. 数据可视化工作&#xff1f; a. 作用&…

深入理解 C 语言中的内存操作函数:memcpy、memmove、memset 和 memcmp

目录&#xff1a; 前言一、 memcpy 函数二、 memmove 函数三、 memset 函数四、 memcmp 函数总结 前言 在 C 语言中&#xff0c;内存操作函数是非常重要的工具&#xff0c;它们允许我们对内存进行直接操作&#xff0c;从而实现高效的数据处理。本文将深入探讨四个常用的内存操…

DC00022基于ssm高校社团管理系统web社团管理系统java web+MySQL项目web程序设计

1、项目功能演示 DC00022基于ssm高校社团管理系统web社团管理系统java web项目MySQL 2、项目功能描述 社团管理系统分为普通用户、管理员 2.1 普通用户功能 01 系统登录、系统注册 02 系统首页、新闻公告、规章制度、社团活动、互动交流 03 修改密码 04 个人信息修改 05 我的…

Win10鼠标总是频繁自动失去焦点-非常有效-重启之后立竿见影

针对Win10鼠标频繁自动失去焦点的问题&#xff0c;可以尝试以下解决方案&#xff1a; 一、修改注册表&#xff08;最有效的方法-重启之后立竿见影&#xff09; 打开注册表编辑器&#xff1a; 按下WindowsR组合键&#xff0c;打开运行窗口。在运行窗口中输入“regedit”&#x…

ECP 集成字段非必填配置

导读 INTRODUCTION 非必填设置&#xff1a;ECP主数据同步的时候&#xff0c;经常遇到一个问题&#xff0c;就是ECP报错&#xff0c;但是这个字段两边的ecp顾问与sf顾问都觉得没实际意思&#xff0c;觉得没有传输的必要性&#xff0c;这个时候我们就可以考虑非必输的字段不必输…

【机器学习】集成学习——提升模型准确度的秘密武器

【机器学习】集成学习——提升模型准确度的秘密武器 1. 引言 集成学习&#xff08;Ensemble Learning&#xff09;是一种通过结合多个弱模型来提升整体预测准确性的技术。通过将多个模型的预测结果进行组合&#xff0c;集成学习在复杂任务中展现了极强的泛化能力。本文将探讨…