【大数据学习 | flume】flume Sink Processors与拦截器Interceptor

1. Failover Sink Processor

故障转移处理器可以同时指定多个sink输出,按照优先级高低进行数据的分发,并具有故障转移能力。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 

a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-2
a1.sinks.k1.port = 55555

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

第二台和第三台agent编写如下:

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,由于第三台节点的优先级高,所以第三台会打印数据到控制台

如果此时第三台flume宕机,则会将数据发送到优先级略低的第二台服务器上

2. Load balancing Sink Processor

负载平衡处理器提供了在多个sink负载平衡流量的能力。支持两种模式:round_robin and random 。round_robin 可以将数据负载均衡到多个sink上,random支持随机分发到不同的sink上。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 

a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-2
a1.sinks.k1.port = 55555

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

第二台和第三台agent编写如下:

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=logger

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,第二台和第一台会随机收集数据

还支持轮询分发数据到两个sink中,这里的轮询是的是sink的轮询,不是event的轮询。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 

a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-2
a1.sinks.k1.port = 55555

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

3. Multiplexing Channel Selector

多路复用信道选择器,source是通过 event header 来决定传输到哪一个 channel。

比如:一个日志文件(多个系统的日志都在该文件中),根据日志中某个字段值,比如type=1,是系统A日志,sink to hdfs;type=2,是系统B日志,sink to kafka,此时就可以使用Flume多路复用,通过event header 来决定传输到哪个Channel

a1.sources=r1
a1.sinks=k1 k2 
a1.channels=c1  c2

a1.sources.r1.type=http
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.1= c1
a1.sources.r1.selector.mapping.2 = c2
a1.sources.r1.selector.default = c2

a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=100

a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555

a1.sinks.k2.type=avro
a1.sinks.k2.hostname = worke-2
a1.sinks.k2.port = 55555

a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

测试:

通过http协议并携带type头信息,测试type=1,type=2,type=3去往哪一台服务器

第二台服务器接收:

4. Interceptor拦截器

拦截器可以将flume收集到的event进行拦截,并使用对应的拦截器,对event进行简单修改,过滤。同时可以配置多个拦截器实现不同的功能,按照配置的先后顺序进行拦截处理。

常见的 Interceptor描述
timestamp Interceptor给event的头信息中添加时间戳
Static Interceptor给event的头信息中添加自定义键值
Host Interceptor给event的头信息中添加主机名或者ip信息
Search and Replace Interceptor拦截信息进行匹配和替换
Regex Filtering Interceptor拦截信息进行过滤

5. Timestamp Interceptor

此拦截器将插入事件标头,即它处理事件的时间(毫秒)到event中。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.142.160
a1.sources.r1.port = 22222
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=logger

#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

6. Host Interceptor

此拦截器器插入主机的主机名或IP地址。插入带有key为host标头,值是主机的主机名称或IP地址。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.142.160
a1.sources.r1.port = 22222
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=logger

#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

7. Static Interceptor

静态拦截器允许用户将带有静态值的静态标头附加到所有事件。

当前实现不允许同时指定多个标头。相反,用户可以使用多个静态拦截器,每个拦截器定义一个静态标头。

# Static Interceptor
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555

a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = name
a1.sources.r1.interceptors.i1.value = zhangsan

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=logger

#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

8. Search and Replace Interceptor

这个拦截器基于Java正则表达式提供了简单的基于字符串的搜索和替换功能

# Search and Replace Interceptor
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555

a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = [a-z]
a1.sources.r1.interceptors.i1.replaceString =*

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=logger

#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

9. Regex Filtering Interceptor

该拦截器通过将event解释为文本并将文本与配置的正则表达式匹配来选择性地过滤事件。提供的正则表达式可用于包含事件或排除事件。

# Regex Filtering Interceptor
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_filter
a1.sources.r1.interceptors.i1.regex=^jp.*
a1.sources.r1.interceptors.i1.excludeEvents=true

#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=logger

#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

10. Regex Extractor Interceptor

此拦截器使用指定的正则表达式提取正则表达式匹配组,并将匹配组作为标头附加到事件上。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1

#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555
a1.sources.r1.interceptors = i1  
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (^[a-zA-Z]*)\\s([0-9]*$)
a1.sources.r1.interceptors.i1.serializers = s1 s2
# key name
a1.sources.r1.interceptors.i1.serializers.s1.name = word
a1.sources.r1.interceptors.i1.serializers.s2.name = digital 
#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100

#定义sink
a1.sinks.k1.type=logger

#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

测试:

收到:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/917320.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

i春秋-登陆(sql盲注爆字段,.git缓存利用)

练习平台地址 竞赛中心 题目描述 先登陆再说 题目内容 就是一个登录框 测试登录 用户名:admin or 11# 密码:随便输 返回密码错误 用户名:随便输 密码:随便输 返回用户名不存在 这里就可以确定时一个bool盲注了 这里提供一个lik…

【爬虫实战】抓取某站评论

【爬虫实战】抓取某站评论 声明:本文中所有内容仅供学习交流使用,不用于其他任何目的,不提供完整代码,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关! 方式一:JS逆向request发…

【微软:多模态基础模型】(1)从专家到通用助手

欢迎关注【youcans的AGI学习笔记】原创作品 【微软:多模态基础模型】(1)从专家到通用助手 【微软:多模态基础模型】(2)视觉理解 【微软:多模态基础模型】(3)视觉生成 【微…

HarmonyOS ArkUI(基于ArkTS) 开发布局 (中)

HarmonyOS ArkUI(基于ArkTS) 开发布局 (上) 四 层叠布局 (Stack) 层叠布局(StackLayout)用于在屏幕上预留一块区域来显示组件中的元素,提供元素可以重叠的布局。层叠布局通过Stack容器组件实现位置的固定定位与层叠&…

港湾周评|鼎益丰“庞氏骗局”陨落

《港湾商业观察》李镭 在坊间有着“老鼎”之称的鼎益丰迎来了全面陨落,这丝毫不出人意料,毕竟在一年前就已经暴雷了。 同样,仙风道骨般神采的鼎益丰老板隋广义也迎来人生的至暗时刻,应了无间道那句话,出来混总是要还…

创建vue插件,发布npm

开发步骤:1.创建一个vue项目,2.开发一个组件。 3.注册成插件。 4.vite和package.json配置。5.发布到npm 1.创建一个vue项目 npm create vuelatest 生成了vue项目之后,得到了以下结构。 在src下创建个plugins目录。用于存放开发的…

用paramiko与SSH交互

# 导入paramiko库用于SSH连接,以及sys库用于处理命令行参数 import paramiko import sys# 定义一个函数send_command,用于发送命令到SSH服务器并打印输出结果 def send_command(ssh_client, cmd):# 使用exec_command方法执行命令,并获取输入、…

Go语言中AES加密算法的实现与应用

一、前言 在当今的软件开发领域,数据安全至关重要。加密技术作为保护数据机密性的关键手段,被广泛应用于各个方面。AES(高级加密标准)作为一种对称加密算法,以其高效性和安全性在众多加密场景中占据重要地位。本文将详…

CSS 语法规范

基本语法结构 CSS 的基本语法结构包含 选择器 和 声明块,两者共同组成 规则集。规则集可以为 HTML 元素设置样式,使页面结构和样式实现分离,便于网页的美化和布局调整。 CSS 规则集的结构如下: selector {property: value; }选择器(Selector) 选择器用于指定需要应用…

JavaScript 变量:理解基元和引用类型

两种基本类型的数据存储在 javascript 中的变量中:基元 和 引用类型。了解这两种类型之间的区别对于内存管理以及调节数据的共享、存储和更改至关重要。本文深入探讨了它们之间的区别,提供了现实世界的示例,并研究了有效处理这两种类型的方法…

【C++】—— stack和queue的模拟实现

前言 ​ stack 和 queue使用起来都非常简单,现在来模拟实现一下,理解其底层的原理。 ​ 在实现之前,应该知道,stack 和 queue 都是容器适配器,通过看官网文件也可以看出来;其默认的容器都是deque&#xff…

MuMu模拟器安卓12安装Xposed 框架

MuMu模拟器安卓12安装Xposed 框架 当开启代理后,客户端会对代理服务器证书与自身内置证书展开检测,只要检测出两者存在不一致的情况,客户端就会拒绝连接。正是这个原因,才致使我们既没有网络,又抓不到数据包。 解决方式: 通过xposed框架和trustmealready禁掉app里面校验…

Linux网络:守护进程

Linux网络:守护进程 会话进程组会话终端 守护进程setsiddaemon 在创建一个网络服务后,往往这个服务进程是一直运行的。但是对于大部分进程来说,如果退出终端,这个终端上创建的所有进程都会退出,这就导致进程的生命周期…

丹摩征文活动|丹摩平台一日游

目录 一.引言 二.平台简介 三.体验过程 1.注册与登录 (1).注册 (2).登录 2.界面介绍 (1).主界面 (2).任务监控界面 3.功能体验 (1).数据存储与管理 (2).数据预处理 (3).模型训练 (4).模型评估与优化 4.例子 (1).创建一个实例 (2).选择类型 1.实例配置 2.选择…

计算机网络中的数据包传输机制详解

💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 计算机网络中的数据包传输机制详解 计算机网络中的数据包传输机制详解 计算机网络中的数据包传输机制详解 引言 数据包的基本概念…

普通用户切换到 root 用户不需要输入密码配置(Ubuntu20)

在 Ubuntu 系统中,允许一个普通用户切换到 root 用户而不需要输入密码,可以通过以下步骤配置 sudo 设置来实现。 步骤: 打开 sudoers 文件进行编辑: 在终端中,输入以下命令来编辑 sudoers 文件: sudo visu…

入侵检测算法平台部署LiteAIServer视频智能分析平台行人入侵检测算法:科技守护安全的新篇章

在现代化城市快速发展的背景下,安全防范已成为城市管理与社会生活中不可或缺的一环。随着人工智能、大数据、物联网等技术的飞速发展,智能化安防系统正逐步改变着传统的安全防护模式,特别是在行人入侵检测领域,视频智能分析平台Li…

20.UE5UI预构造,开始菜单,事件分发器

2-22 开始菜单、事件分发器、UI预构造_哔哩哔哩_bilibili 目录 1.UI预构造 2.开始菜单和开始关卡 2.1开始菜单 2.2开始关卡 2.3将开始菜单展示到开始关卡 3.事件分发器 1.UI预构造 如果我们直接再画布上设计我们的按钮,我们需要为每一个按钮进行编辑&#x…

GoFly框架使用vue flow流程图组件说明

Vue Flow组件库是个高度可定制化的流程图组件,可用于工作流设计、流程图及图表编辑器、系统架构展示。可以根据自己的需求,设计独特的节点和边,实现个性化的流程图展示。这不仅增强了应用的视觉效果,也使得用户交互更为直观和流畅…

小白投资理财 - 看懂随机指标 KDJ

小白投资理财 - 看懂随机指标 KDJ 什么是 KDJKDJ 的组成计算 RSV计算 K 值计算 D 值J 值KDJ 的解读 KDJ 使用方式首先是 KD 线适合超买和超卖KD 线的黄金交叉线和死亡交叉线J 线J 线捉低点 KDJ 线注意点总结 身边总会有一位朋友在做选择上总是摇摆不定,做一个选择也…