【Flume】高级组件之Sink Processors及项目实践(Sink负载均衡和故障转移)

文章目录

  • 1. 组件简介
  • 2. 项目实践
    • 2.1 负载均衡
      • 2.1.1 需求
      • 2.1.2 配置
      • 2.1.3 运行
    • 2.2 故障转移
      • 2.2.1 需求
      • 2.2.2 配置
      • 2.2.3 运行

1. 组件简介

       Sink Processors类型包括这三种:Default Sink Processor、Load balancing Sink Processor和Failover Sink Processor。

  • Default Sink Processor是默认的,不用配置Sink group,就是咱们现在使用的这种最普通的形式,一个Channel后面接一个Sink的形式;
  • Load balancing Sink Processor是负载均衡处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,根据指定的算法进行轮询或者随机发送,减轻单个Sink的压力;
  • Failover Sink Processor是故障转移处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,按照Sink的优先级,默认先让优先级高的Sink来处理数据,如果这个Sink出现了故障,则用优先级低一点的Sink处理数据,可以保证数据不丢失。

2. 项目实践

2.1 负载均衡

使用Load balancing Sink Processor,即负载均衡处理器,一个Channle后面可以接多个Sink,这多个Sink属于一个Sink group,根据指定的算法进行轮询或者随机发送,减轻单个Sink的压力。其参数为:

  • processor.sinks:指定这个sink groups中有哪些sink,指定sink的名称,多个的话中间使用空格隔开即可;
  • processor.type:针对负载均衡的sink处理器,这里需要指定load_balance;
  • processor.selector:此参数的值内置支持两个,round_robin和random,round_robin表示轮询,按照sink的顺序,轮流处理数据,random表示随机。
  • processor.backoff:默认为false,设置为true后,故障的节点会列入黑名单,过一定时间才会再次发送数据,如果还失败,则等待时间是指数级增长,一直到达到最大的时间。如果不开启,故障的节点每次还会被重试发送,如果真有故障节点的话就会影响效率;
  • processor.selector.maxTimeOut:最大的黑名单时间,默认是30秒。

2.1.1 需求

采集指定端口的数据,并实现两个sink通道的负载均衡,采用轮询方式发送数据,为了展现实验效果,使用avro sink,每到一个event就写一次数据(默认是积攒接收一百个再写一次数据)。

2.1.2 配置

在这里插入图片描述
配置bigData01上的Flume Agent:

[root@bigdata01 apache-flume-1.9.0-bin]# cat conf/load-balancing.conf 
# agent的名称是a1
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 k2 
# 配置source组件 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 44444 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件,[为了方便演示效果,把batch-size设置为1] 
a1.sinks.k1.type=avro 
a1.sinks.k1.hostname=192.168.152.101 
a1.sinks.k1.port=41414 
a1.sinks.k1.batch-size = 1 
a1.sinks.k2.type=avro 
a1.sinks.k2.hostname=192.168.152.102 
a1.sinks.k2.port=41414 
a1.sinks.k2.batch-size = 1 
# 配置sink策略 
a1.sinkgroups = g1 
a1.sinkgroups.g1.sinks = k1 k2 
a1.sinkgroups.g1.processor.type = load_balance 
a1.sinkgroups.g1.processor.backoff = true 
a1.sinkgroups.g1.processor.selector = round_robin 

# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1

配置bigData02上的Flume Agent:

[root@bigdata02 apache-flume-1.9.0-bin]# cat conf/load-balancing-101.conf 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data101 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

配置bigData03上的Flume Agent:

[root@bigdata03 apache-flume-1.9.0-bin]# cat conf/load-balancing-102.conf 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data102 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

2.1.3 运行

先启动bigdata02和bigdata03上的Agent,最后启动bigdata01上的Agent:

[apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-101.conf -Dflume.root.logger=INFO,console
[apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-102.conf -Dflume.root.logger=INFO,console
apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing.conf -Dflume.root.logger=INFO,console

向指定端口发送数据,模拟输入:

[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hehe
OK
haha
OK

查看HDFS中的保存的运行结果:

[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -ls -R / 
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 00:47 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:47 /load_balance/data101.1687366028115.log.tmp
-rw-r--r--   2 root supergroup          6 2023-06-22 00:47 /load_balance/data102.1687366024769.log.tmp
[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -cat /load_balance/data101.1687366028115.log.tmp 
haha
[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -cat /load_balance/data102.1687366024769.log.tmp
hehe

2.2 故障转移

使用Failover Sink Processor,即故障转移处理器,一个channle后面可以接多个sink,这多个sink属于一个sink group,按照sink的优先级,默认先让优先级高的sink来处理数据,如果这个sink出现了故障,则用优先级低一点的sink处理数据,可以保证数据不丢失。其参数为:

  • processor.type:针对故障转移的sink处理器,使用failover;
  • processor.priority.:指定sink group中每一个sink组件的优先级,默认情况下channel中的数据会被优先级比较高的sink取走;
  • processor.maxpenalty:sink发生故障之后,最大等待时间。

2.2.1 需求

实现两个sink的故障转移。

2.2.2 配置

在这里插入图片描述
配置bigData01上的Flume Agent:

[root@bigdata01 conf]# cat failover.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 k2 
# 配置source组件 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 44444 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件,[为了方便演示效果,把batch-size设置为1] 
a1.sinks.k1.type = avro 
a1.sinks.k1.hostname = 192.168.152.101 
a1.sinks.k1.port = 41414 
a1.sinks.k1.batch-size = 1 
a1.sinks.k2.type = avro 
a1.sinks.k2.hostname = 192.168.152.102 
a1.sinks.k2.port = 41414 
a1.sinks.k2.batch-size = 1 
# 配置sink策略 
a1.sinkgroups = g1 
a1.sinkgroups.g1.sinks = k1 k2 
a1.sinkgroups.g1.processor.type = failover 
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 
a1.sinks.k2.channel = c1

配置bigData02上的Flume Agent:

[root@bigdata02 conf]# cat failover-101.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/failover 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data101 
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

配置bigData03上的Flume Agent:

[root@bigdata03 conf]# cat failover-102.conf 
# agent的名称是a1 
# 指定source组件、channel组件和Sink组件的名称 
a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 
# 配置source组件 
a1.sources.r1.type = avro 
a1.sources.r1.bind = 0.0.0.0 
a1.sources.r1.port = 41414 
# 配置channel组件 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
# 配置sink组件[为了区分两个sink组件生成的文件,修改filePrefix的值] 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/failover 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 
a1.sinks.k1.hdfs.rollInterval = 3600 
a1.sinks.k1.hdfs.rollSize = 134217728 
a1.sinks.k1.hdfs.rollCount = 0 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
a1.sinks.k1.hdfs.filePrefix = data102
a1.sinks.k1.hdfs.fileSuffix = .log 
# 把组件连接起来 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1

2.2.3 运行

  1. 先启动bigdata02和bigdata03上的Agent,最后启动bigdata01上的Agent:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover-101.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover-102.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover.conf -Dflume.root.logger=INFO,console
  1. 向指定端口发送数据,模拟输入两个数据test1test2
[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test1
OK
test2
OK
  1. 查看HDFS中的保存的运行结果:

因为bigdata03的优先级高,可以看到两个数据都是由其写入。

[root@bigdata01 hadoop-3.3.5]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 09:51 /failover
-rw-r--r--   2 root supergroup          7 2023-06-22 09:51 /failover/data102.1687398676525.log.tmp
drwxr-xr-x   - root supergroup          0 2023-06-22 00:52 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data101.1687366028115.log
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data102.1687366024769.log
[root@bigdata01 hadoop-3.3.5]# hdfs dfs -cat /failover/data102.1687398676525.log.tmp
test1
test2
  1. 关闭bigdata03,再输入测试数据test3
[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test1
OK
test2
OK
test3
OK
  1. 查看HDFS中的保存的运行结果:

关闭bigdata03后,数据就由优先度较低的bigdata02写入,保证数据不丢失,达到故障转移的目的,此时若再次开启bigdata03,则数据就又会由优限度更高的bigdata03传输。

[root@bigdata01 hadoop-3.3.5]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 09:54 /failover
-rw-r--r--   2 root supergroup          7 2023-06-22 09:54 /failover/data101.1687398846336.log.tmp
-rw-r--r--   2 root supergroup         14 2023-06-22 09:53 /failover/data102.1687398676525.log
drwxr-xr-x   - root supergroup          0 2023-06-22 00:52 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data101.1687366028115.log
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data102.1687366024769.log
[root@bigdata01 hadoop-3.3.5]# hdfs dfs -cat /failover/data101.1687398846336.log.tmp
test3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/30838.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

kotlin学习(二)泛型、函数、lambda、扩展、运算符重载

文章目录 泛型&#xff1a;in、out、where型变&#xff08;variance&#xff09;不变&#xff08;Invariant&#xff09;协变&#xff08;Covariant&#xff09;Java上界通配符<? extends T>Kotlin的关键词 outUnsafeVariance 逆变&#xff08;Contravariant&#xff09…

GBASE金融信创优秀解决方案鉴赏 · 核心业务系统数据库解决方案

为此&#xff0c;实验室特别开设金融信创优秀解决方案专栏&#xff0c;集中展示优秀成果。现在&#xff0c;让我们一起来领略下GBASE的优秀解决方案吧~可点击阅读原文 →《金融信创优秀解决方案--核心业务系统数据库解决方案》。 核心业务系统数据库解决方案 方案简介 随着技…

C++:虚函数

C面向对象的三个特性&#xff0c;封装继承多态。在继承的关系中&#xff0c;所有的东西都可以被继承下来&#xff0c;如数据可以被继承下来在内存&#xff0c;而函数的继承则是继承调用权。 虚函数主要是通过虚函数表来实现&#xff0c;每个类都有自己的虚表&#xff0c;当你创…

A fight among three “三国”混战 | 经济学人20230520版社论双语精翻

《经济学人》2023年5月20日封面&#xff08;社论&#xff09;文章精翻&#xff1a;《全球支付系统的“三国”混战》&#xff08;A fight among three&#xff09; A fight among three “三国”混战 The fight over the future of global payments 全球支付的未来之争 Digital …

【STM32】软件I2C(支持多字节)

I2C简介 I2C总线是一种串行、半双工的总线&#xff0c;主要用于近距离、低速的芯片之间的通信。I2C总线有两根双向的信号线&#xff0c;一根数据线SDA用于收发数据&#xff0c;一根时钟线SCL用于通信双方时钟的同步。 在一个i2c通讯总线中&#xff0c;可连接多个i2c通讯设备&a…

Go-unsafe详解

Go语言unsafe包 Go语言的unsafe包提供了一些底层操作的函数&#xff0c;这些函数可以绕过Go语言的类型系统&#xff0c;直接操作内存。虽然这些函数很强大&#xff0c;但是使用不当可能会导致程序崩溃或者产生不可预料的行为。因此&#xff0c;使用unsafe包时必须小心谨慎。 …

吴恩达ChatGPT《Prompt Engineering》笔记

ChatGPT 提示词工程师教程 1. 课程介绍 1.1 ChatGPT 相关术语 LLM&#xff1a;Large Language Model&#xff0c;大语言模型 Instruction Tuned LLM&#xff1a;经过指令微调的大语言模型 Prompt&#xff1a;提示词 RLHF&#xff1a;Reinforcement Learning from Human F…

机器视觉初步6:图像分割专题

图像分割是一种图像处理技术&#xff0c;它将图像划分为具有相似特征的区域。常见的图像分割方法包括阈值分割、边缘分割、区域分割、基于阈值的方法、基于边缘的方法、基于区域的方法、聚类分割、基于图论的方法、基于深度学习的方法。 文章目录 1.阈值分割2.边缘分割3.区域分…

CloFormer实战:使用CloFormer实现图像分类任务(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整算法设置混合精度&#xff0c;DP多卡&#xff0c;EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试热力图可视化展示…

kali常用ping命令探测

ping 判断目标主机网络是否畅通 ping $ip -c 1其中&#xff0c;-c 1 表示发送一个数据包 traceroute 跟踪路由 traceroute $domain ARPING 探测局域网IP ARP&#xff08;地址解析协议&#xff09;&#xff0c;将IP地址转换成MAC地址arping $ip -c 1 #!/bin/ bash######…

基于matlab使用先导校准来补偿阵列不确定性(附源码)

一、前言 此示例说明如何使用先导校准来提高天线阵列在存在未知扰动时的性能。 原则上&#xff0c;可以轻松设计理想的均匀线性阵列&#xff08;ULA&#xff09;来执行阵列处理任务&#xff0c;例如波束成形或到达方向估计。在实践中&#xff0c;没有理想的阵列。例如&#xff…

初识轻量级分布式任务调度平台 xxl-job

文章目录 前言xxl-job的目录结构项目依赖 (父 pom.xml)xxl-job-admin 启动xxl-job-executor-sample (项目使用示例)xxl-job-executor-sample-frameless : 不使用框架的接入方式案例xxl-job-executor-sample-springboot : springboot接入方案案例 xxl-job执行器器启动流程分析调…

linux_centos7.9/ubuntu20.04_下载镜像及百度网盘分享链接

1、镜像下载站点 网易开源镜像&#xff1a;http://mirrors.163.com/ 搜狐开源镜像&#xff1a;http://mirrors.sohu.com/ 阿里开源镜像&#xff1a;https://developer.aliyun.com/mirror/ 首都在线科技股份有限公司&#xff1a;http://mirrors.yun-idc.com/ 常州贝特康姆软件技…

C++【红黑树】

✨个人主页&#xff1a; 北 海 &#x1f389;所属专栏&#xff1a; C修行之路 &#x1f383;操作环境&#xff1a; Visual Studio 2019 版本 16.11.17 文章目录 &#x1f307;前言&#x1f3d9;️正文1、认识红黑树1.1、红黑树的定义1.2、红黑树的性质1.3、红黑树的特点 2、红黑…

三分钟学习一个python小知识1-----------我的对python的基本语法的理解

文章目录 一、变量定义二、数据类型三、条件语句四、循环语句五、函数定义总结 一、变量定义 在Python中&#xff0c;使用等号&#xff08;&#xff09;进行变量的定义&#xff0c;并不需要声明变量的类型&#xff0c;Python会自动根据赋值的数据类型来判断变量的类型&#xf…

chatgpt赋能python:Python构造和析构:介绍和实例

Python 构造和析构&#xff1a;介绍和实例 当你编写 Python 程序时&#xff0c;你可能会注意到一个名为构造函数和析构函数的概念。这些函数可以在创建和删除一个对象时自动执行一些操作。本文将深入介绍 Python 中的构造和析构概念。 构造函数 Python 使用一种名为 __init_…

戴尔U盘重装系统Win10步骤和详细教程

戴尔电脑深受用户们的喜欢&#xff0c;那么如何使用U盘给戴尔电脑重装Win10系统呢&#xff0c;这让很多用户都犯难了&#xff0c;以下就是小编给大家分享的戴尔U盘重装系统Win10步骤和详细教程&#xff0c;按照这个教程操作&#xff0c;就能顺利完成戴尔U盘重装Win10系统的操作…

3、互联网行业及产品经理分类

上一篇文章&#xff1a;2、产品经理的工作内容_阿杰学编程的博客-CSDN博客 1、产品经理分类 我们把产品经理划分成这样两个大的类型&#xff0c;一个是传统行业的&#xff0c;一个是互联网行业的。这个简单了解一下就行。 这个里面会发现绝大多数也是体育劳动&#xff0c;你比…

Nautilus Chain:模块化Layer3的先行者

“模块化特性的 Nautilus Chain 正在成为 Layer3 的早期定义者之一&#xff0c;并有望进一步推动区块链更广泛的应用与实践 ” 自以太坊创始人 Vitalik Buterin 在去年提出 Layer3 的概念后&#xff0c;行业始终对“Layer3”进行讨论&#xff0c;并期望推动该概念&#xff0c;从…

微服务框架

流量入口Nginx 在上图中可以看到&#xff0c;Nginx作为整个架构的流量入口&#xff0c;可以理解为一个外部的网关&#xff0c;它承担着请求的路由转发、负载均衡、动静分离等功能。作为一个核心入口点&#xff0c;Nginx肯定要采用多节点部署&#xff0c;同时通过keepalived来实…