Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

章节内容

上一节完成了如下的内容:

  • 编写Agent Conf配置文件
  • 收集Hive数据
  • 汇聚到HDFS中
  • 测试效果

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。
之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。

  • 2C4G 编号 h121
  • 2C4G 编号 h122
  • 2C2G 编号 h123

在这里插入图片描述

文档推荐

除了官方文档以外,这里有一个写的很好的中文文档:
https://flume.liyifeng.org/

监控目录

业务需求

  • 想要监控指定目录 收集信息并上传到HDFS中

Source

选择 spooldir,因为 spooldir 能够保证数据不丢失,且能够进行断点续传,但是延迟较高,不能实时监控。

Channel

选择 memory

Sink

选择 HDFS

需要注意

  • 拷贝到 spool 目录下的文件 不可以再打开编辑
  • 无法监控子目录的文件夹变动
  • 被监控文件夹每500毫秒 扫描一次文件变动
  • 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

配置文件

cd /opt/wzk/flume_test
vim flume_spooldir-hdfs.conf

我们需要写入如下内容

# Name the components on this agent
a3.sources = r3
a3.channels = c3
a3.sinks = k3
# Describe/configure the source
a3.sources.r3.type = spooldir
# 注意这里的文件夹 换成自己的!!!
a3.sources.r3.spoolDir = /opt/wzk/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true

# 忽略以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 500
# Describe the sink
a3.sinks.k3.type = hdfs
# 注意修改成你自己的IP!!!
a3.sinks.k3.hdfs.path = hdfs://h121.wzk.icu:9000/flume/upload/%Y%m%d/%H%M

# 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒500个Event,flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 500
# 设置文件类型
a3.sinks.k3.hdfs.fileType = DataStream
# 60秒滚动一次
a3.sinks.k3.hdfs.rollInterval = 60
# 128M滚动一次
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件滚动与event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# 最小冗余数
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

启动Agent

$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file flume-spooldir-hdfs.conf \
-Dflume.root.logger=INFO,console

在这里插入图片描述

测试效果

Flume

cd /opt/wzk/upload
vim 1.txt

随便向其中写入一些内容,并保存,可以看到Flume已经有反应了。
在这里插入图片描述

HDFS

查看HDFS,也已经有内容了
在这里插入图片描述

采集双写

这里业务上需要:

  • Flume将数据写入本地
  • Flume将数据写入HDFS

分析实现

  • 需要多个Agent级联实现
  • Source选择taildir
  • Channel选择memory
  • 最终的Sink分别选择HDFS,file_roll

在这里插入图片描述

配置文件1

配置文件包含如下内容:

  • 1个 taildir source
  • 2个 memory channel
  • 2个 avro sink

新建文件

vim flume-taildir-avro.conf

写入如下内容

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating
# source
a1.sources.r1.type = taildir
# 记录每个文件最新消费位置
a1.sources.r1.positionFile = /root/flume/taildir_position.json
a1.sources.r1.filegroups = f1
# 备注:.*log 是正则表达式;这里写成 *.log 是错误的
a1.sources.r1.filegroups.f1 = /tmp/root/.*log
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux123
a1.sinks.k1.port = 9091
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux123
a1.sinks.k2.port = 9092
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

配置文件2

配置文件包含如下内容:

  • 1个 avro source
  • 1个 memory channel
  • 1个 hdfs sink

新建配置文件

vim flume-avro-hdfs.conf

写入如下的内容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = linux123
a2.sources.r1.port = 9091
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 500
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://linux121:8020/flume2/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 500个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 500
# 设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
# 60秒生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 0
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

配置文件3

配置文件包含如下内容:

  • 1个 avro source
  • 1个 memory channel
  • 1个 file_roll sink

新建配置文件

vim flume-avro-file.conf

写入如下的内容

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = linux123
a3.sources.r1.port = 9092
# Describe the sink
a3.sinks.k1.type = file_roll
# 目录需要提前创建好
a3.sinks.k1.sink.directory = /root/flume/output
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 10000
a3.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

启动Agent1

$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file ~/conf/flume-avro-file.conf \
-Dflume.root.logger=INFO,console &

启动Agent2

$FLUME_HOME/bin/flume-ng agent --name a2 \
--conf-file ~/conf/flume-avro-hdfs.conf \
-Dflume.root.logger=INFO,console &

启动Agent3

$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file ~/conf/flume-taildir-avro.conf \
-Dflume.root.logger=INFO,console &

Hive测试

hive -e "show databases;"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/786091.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网页提示“非私密连接”怎么办?

当网页提示“非私密连接”或“您与该网站的连接不是私密连接”,这通常意味着浏览器无法建立一个安全的HTTPS连接。HTTPS协议是HTTP协议的安全版本,通过SSL协议加密数据传输,以保护用户的数据免受中间人攻击或监听。主要有下面几个原因&#x…

vue3自自定义插件注册全局事件

一. 首先在components中定义自定义组件 二. 然后在components下建立一个index.ts文件 index.ts中的代码如下 // 引入项目中全部的全局组件 import SvgIcon from ./SvgIcon/index.vue import pagination from ./pagination/index.vue // 全局对象 const allGloablComponen…

66条AI共创文章润色秘诀,一键提升你的写作水平

猫头虎 🐯 建联猫头虎,商务合作,产品评测,产品推广,个人自媒体创作,超级个体,涨粉秘籍,一起探索编程世界的无限可能! 掌握这些提示词和指令,让你的AI创作更…

centos7停服之后换阿里云的源

原因: Centos7停止维护 CentOS 7 官方支持在2024年6月30日结束。如果您正在使用CentOS 7,建议迁移到另一个仍在维护的Linux发行版,如CentOS Stream、AlmaLinux、Rocky Linux或者转换到使用Debian或Ubuntu。国产的华为的:openEule…

数据结构(初阶1)

文章目录 一、复杂度概念 二、时间复杂度 2.1 大O的渐进表示法 2.2 时间复杂度计算示例 2.2.1. // 计算Func2的时间复杂度? 2.2.2.// 计算Func3的时间复杂度? 2.2.3.// 计算Func4的时间复杂度? 2.2.4.// 计算strchr的时间复杂度? …

Windows与time.windows.com同步time出错(手把手操作)

今天我来针对Windows讲解Time同步 时间问题 计算机的时间不同,过快或者过慢。(可以和自己的手机时间进行对比,手机的时间进行同步的频率会比计算机更快,因此更精准)计算机time过快和过慢,会导致使用过程中…

从零开始做题:emoji

题目 给出一张图片 解题 from PIL import Image import random # 读取txt文件 with open("rgb.txt", "r") as file: lines file.readlines() # 跳过第一行(包含尺寸信息) lines lines[1:] # 提取RGB颜色值 colors…

RK3588开发笔记(四):基于定制的RK3588一体主板升级镜像

若该文为原创文章,转载请注明原文出处 本文章博客地址:https://hpzwl.blog.csdn.net/article/details/140288662 长沙红胖子Qt(长沙创微智科)博文大全:开发技术集合(包含Qt实用技术、树莓派、三维、OpenCV…

新能源汽车充电站远程监控系统S275钡铼技术无线RTU

新能源汽车充电站的远程监控系统在现代城市基础设施中扮演着至关重要的角色,而钡铼技术的S275无线RTU作为一款先进的物联网数据监测采集控制短信报警终端,为充电站的安全运行和高效管理提供了强大的技术支持。 技术特点和功能 钡铼S275采用了基于UCOSI…

哨兵系统:一套实时灵活可配置化的业务指标监控系统

简介: 在KOO分期的线下业务中,需要对很多关键业务指标进行实时监控,并需要根据一定的数据格式,通过企微机器人发往对应的企微群,因此KOO分期技术团队在KOO业务指标库之上,搭建了一套KOO分期业务指标监控系统&#xff…

【算法】单调队列单调栈

一、单调队列 用来维护一段区间内的最大值或最小值,例如滑动窗口、区间最值等问题。 基本概念 单调队列是一种存储数据的队列,其中元素的顺序是单调递增或单调递减的。在算法竞赛中,我们一般使用两个单调队列,一个维护单调递增序…

Android约束布局的概念与属性(1)

目录 1.相对定位约束2.居中和偏移约束 约束布局(ConstraintLayout)是当前Android Studio默认的布局方式,也是最灵活的一种布局方式。约束布局推荐使用所见即所得的模式进行布局,约束布局的大部分布局可以通…

25考研,数二全程跟的张宇老师请问660(做了一半)880和张宇1000题应该怎么选择?

跟张宇老师,也可以做其他的题集,不一定非要做1000题 我当初考研复习的时候,也听了张宇老师的课程,但是我并没有做1000题 因为1000题对于我来说太难了。做了一章之后,就换成其他的题目了。 对于大家来说,…

xcode中对项目或者文件文件夹重命名操作

提起揭秘答案:选中文件后,按下回车键就可以了 如果在项目中对新建的文件夹或者文件名称不满意或者输入错误了,想要修改一下名称该怎么办?如果是在文件或文件夹上右键是没有rename选项的: 其实想要重命名,很…

网络通信、BIO、NIO

1. 涉及的网络基础知识 Socket: 操作系统提供的api,介于应用层和tcp/ip层之间的软件层,封装服务器客户端之间网络通信相关内容,方便调用 IO多路复用: (I/O Multiplexing)是一种IO操作模式&a…

Java | Leetcode Java题解之第221题最大正方形

题目: 题解: class Solution {public int maximalSquare(char[][] matrix) {int maxSide 0;if (matrix null || matrix.length 0 || matrix[0].length 0) {return maxSide;}int rows matrix.length, columns matrix[0].length;int[][] dp new in…

泰勒雷达图2

matplotlib绘制泰勒雷达图 import matplotlib.pyplot as plt import numpy as np from numpy.core.fromnumeric import shape import pandas as pd import dask.dataframe as dd from matplotlib.projections import PolarAxes import mpl_toolkits.axisartist.floating_axes a…

代码随想录day36

题目一 上边、左边初始化为1 采用求和进行dp运算 class Solution(object):def uniquePaths(self, m, n):""":type m: int:type n: int:rtype: int"""dp [[0]*n for _ in range(m)]for i in range(m):dp[i][0] 1for j in range(n):dp[0][j] 1…

python-课程满意度计算(赛氪OJ)

[题目描述] 某个班主任对学生们学习的的课程做了一个满意度调查,一共在班级内抽取了 N 个同学,对本学期的 M 种课程进行满意度调查。他想知道,有多少门课是被所有调查到的同学都喜欢的。输入格式: 第一行输入两个整数 N , M 。 接…

科普文:一文搞懂jvm实战(四)深入理解逃逸分析Escape Analysis

概叙 Java 中的对象是否都分配在堆内存中? 好了太抽象了,那具体一点,看看下面这个对象是在哪里分配内存? public void test() { Object object new Object(); }这个方法中的object对象,是在堆中分配内存么&#xff1…