Flume拦截器使用-实现分表、解决零点漂移等

img

1.场景分析

使用flume做数据传输时,可能遇到将一个数据流中的多张表分别保存到各自位置的问题,同时由于采集时间和数据实际发生时间存在差异,因此需要根据数据实际发生时间进行分区保存。
鉴于此,需要设计flume拦截器配置conf文件实现上述功能,废话不多说,直接上代码。

2.配置文件

<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83_noneautotype</version>
        </dependency>
    </dependencies>

3.主程序

public class test implements Interceptor  {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //1、获取header和body的数据
        try {
            byte[] body = event.getBody();
            Map<String, String> headers = event.getHeaders();
            String log = new String(body, StandardCharsets.UTF_8);
            //2、判断字符串是否是一个合法的json,是:返回当前event;不是:返回null
            JSONObject jsonObject = JSONObject.parseObject(log);
            //3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题和历史数据同步)
            Long ts = DateFormatUtil.toTsAddTimeZone(jsonObject.getString("@timestamp"));
            String topicHeader = headers.get("topic");
//            System.out.println("topicHeader主题名称为:"+topicHeader);
            String httpUserAgent = jsonObject.getString("topicHeader");
            //数据筛选
            if("clb-healthcheck".equals(httpUserAgent) || (StringUtils.isNotEmpty(httpUserAgent) && httpUserAgent.startsWith("kube-probe"))){
//                System.out.println("过滤的事件为:"+event);
                return null;
            }else {
                jsonObject.put("timestamp",ts);
                headers.put("timestamp", ts.toString());
                if("xxx".equals(topicHeader)){
                    headers.put("table","table1");
                }else if("xxxx".equals(topicHeader)){
                    headers.put("table","table2");
                }else{
                    headers.put("table","other");
                }

                event.setBody(jsonObject.toString().getBytes(StandardCharsets.UTF_8));
//                System.out.println("传输的事件为:"+event);
                return event;
            }

        }catch (JSONException e){
//            System.out.println("格式有问题的事件为:"+event);
            return null;
        }
    }


    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }

        return list;
    }

    @Override
    public void close() {

    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new test();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

4.utils-时间处理程序

/**
 * 日期转换工具类
 * 注意:SimpleDateFormat在对日期进行转换的时候,存在线程安全的问题
 * 建议:使用JDK1.8之后提供的日期包下的相关类完成封装
 */
public class DateFormatUtil {
    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter dtf1 = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyyMMddHH");
    private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final DateTimeFormatter dtfFull1 = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss SSS");
    private static final DateTimeFormatter dtfFull2 = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    public static Long toTs(String dtStr, boolean isFull) {

        LocalDateTime localDateTime = null;
        if (!isFull) {
            dtStr = dtStr + " 00:00:00";
        }
        localDateTime = LocalDateTime.parse(dtStr, dtfFull);

        return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }

    public static Long toTsAddTimeZone(String dtStr) {

        LocalDateTime localDateTime = null;
        localDateTime = LocalDateTime.parse(dtStr, dtfFull2);

        return localDateTime.toInstant(ZoneOffset.of("+0")).toEpochMilli();
    }
    public static Long toTs1(String dtStr) {

        LocalDateTime localDateTime = null;

        localDateTime = LocalDateTime.parse(dtStr, dtfFull1);

        return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }


    public static Long toTs(String dtStr) {
        return toTs(dtStr, false);
    }

    public static String toDate(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtf.format(localDateTime);
    }

    public static String toYmdHms(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtfFull.format(localDateTime);
    }

    public static String toYmd(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtf1.format(localDateTime);
    }
    public static String toYmdH(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtf2.format(localDateTime);
    }
    public static int toYmdHInt(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return Integer.parseInt(dtf2.format(localDateTime));
    }

    public static void main(String[] args) {


        long t1= 1670833135997L;
        String s1 = toYmdH(t1);
        int i1 = toYmdHInt(t1);
        String s2 = toYmdH(1670833136000L);
//        long s3 = toTsAddTimeZone("2024-01-31 05:43:46");
        long s4 = toTsAddTimeZone("2024-01-31T06:11:54.000Z");

//        System.out.println(s3);
        System.out.println(s4);



    }

}

5.打包放入flume的lib目录

mv test.jar /data/flume-1.9.0/lib/

6.编写配置文件运行

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2

#配置source
a1.sources.r1.type= org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 2000
a1.sources.r1.kafka.consumer.group.id= xxx
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = xxx:9092
a1.sources.r1.kafka.topics = xxx,xxxx
a1.sources.r1.kafka.consumer.auto.offset.reset = latest
a1.sources.r1.interceptors = i1
#此处需要写jar包的详细reference
a1.sources.r1.interceptors.i1.type =test$Builder



#memory channel
a1.channels.c1.type = memory
#channel的event个数
a1.channels.c1.capacity = 20000
#事务event个数
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 2147483648

#配置channel
#a1.channels.c1.type = file
#a1.channels.c1.checkpointDir =/data/xxx
#a1.channels.c1.dataDirs = /data/module/xxx
#a1.channels.c1.maxFileSize = 2147483648
#a1.channels.c1.capacity = 2000000
#a1.channels.c1.transactionCapacity=20000
#a1.channels.c1.keep-alive = 6
#a1.chhannels.c1.checkpointInterval=60000
#a1.minimumRequirdSpace=26214400

#配置sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =/hadoop/dm_dw/tmp_data/log/%{table}/%Y%m%d/%H
a1.sinks.k1.hdfs.filePrefix = log1
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 360
a1.sinks.k1.hdfs.rollSize = 1174405120
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize=3000
#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip


#配置sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path =/hadoop/dm_dw/tmp_data/log/%{table}/%Y%m%d/%H
a1.sinks.k2.hdfs.filePrefix = log2
a1.sinks.k2.hdfs.round = false
a1.sinks.k2.hdfs.rollInterval = 360
a1.sinks.k2.hdfs.rollSize = 1174405120
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.batchSize=3000
#控制输出文件类型
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.codeC = gzip


#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

#k1.batchSize+k2.batchSize < c1.capacity

启动命令

 nohup /data/module/flume-1.9.0/bin/flume-ng agent -Xms1024m -Xmx2048m -n a1 -c /data/module/flume-1.9.0/conf -f /data/module/flume-1.9.0/job/test.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=36001  >/dev/null 2>&1 &

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/378907.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C#,佩尔数(Pell Number)的算法与源代码

1 佩尔数&#xff08;Pell Number&#xff09; 佩尔数&#xff08;Pell Number&#xff09;是一个自古以来就知道的整数数列&#xff0c;由递推关系定义&#xff0c;与斐波那契数类似。佩尔数呈指数增长&#xff0c;增长速率与白银比的幂成正比。它出现在2的算术平方根的近似值…

一图窥探RAG技术发展现状

2023年除了大语言模型&#xff0c;听到最多的当属RAG&#xff08;检索增强生成技术了&#xff09;&#xff0c;在实际业务场景落地过程中&#xff0c;由于大模型目前的一定局限和能力现状以及Token限制、训练成本等多种因素的影响下&#xff0c;RAG不得不成为大家选择快速试错、…

WebSocket+Http实现功能加成

WebSocketHttp实现功能加成 前言 首先&#xff0c;WebSocket和HTTP是两种不同的协议&#xff0c;它们在设计和用途上有一些显著的区别。以下是它们的主要特点和区别&#xff1a; HTTP (HyperText Transfer Protocol): 请求-响应模型&#xff1a; HTTP 是基于请求-响应模型的协…

Three.js学习8:基础贴图

一、贴图 贴图&#xff08;Texture Mapping&#xff09;&#xff0c;也翻译为纹理映射&#xff0c;“贴图”这个翻译更直观。 贴图&#xff0c;就是把图片贴在 3D 物体材质的表面&#xff0c;让它具有一定的纹理&#xff0c;来为 3D 物体添加细节的一种方法。这使我们能够添加…

TCP和UDP相关问题(重点)——7.TCP的流量控制怎么实现的?

流量控制就是在双方通信时&#xff0c;发送方的速率和接收方的速率不一定是相等的&#xff0c;如果发送方发送的太快&#xff0c;接收方就只能把数据先放到接收缓冲区中&#xff0c;如果缓冲区都满了&#xff0c;那么处理不过来就只能丢弃&#xff0c;所以需要控制发送方的速率…

【Go】三、Go并发编程

并发编程 我们主流的并发编程思路一般有&#xff1a;多进程、多线程 但这两种方式都需要操作系统介入&#xff0c;进入内核态&#xff0c;是十分大的时间开销 由此而来&#xff0c;一个解决该需求的技术出现了&#xff1a;用户级线程&#xff0c;也叫做 绿程、轻量级线程、协…

猫头虎分享已解决Bug || Spring Error: Request method ‘POST‘ not supported

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通鸿蒙》 …

06-OpenFeign-使用HtppClient连接池

默认下OpenFeign使用URLConnection 请求连接&#xff0c;每次都需要创建、销毁连接 1、添加ApacheHttpClient依赖 <!-- 使用Apache HttpClient替换Feign原生httpclient--><dependency><groupId>org.apache.httpcomponents</groupId><artifact…

springboo冬奥会科普平台源码和论文

随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理平台应运而生&#xff0c;各行各业相继进入信息管理时代&#xf…

EMC学习笔记(二十二)降低EMI的PCB设计指南(二)

降低EMI的PCB设计指南&#xff08;二&#xff09; 1.电源和地概述2.电感量3.两层板和四层板4.单层和双层设计中的微控制器接地5.信号返回地6.模拟、数字信号与大功率电源7.模拟电源引脚和模拟参考电源8.四层板电源设计参考注意事项 tips&#xff1a;资料主要来自网络&#xff0…

Apache网站部署

站点添加及linux防火墙和selinux启动和停止 apache站点添加 linux系统防火墙和selinux起停 1、防火墙firewall操作 查看防火墙的状态&#xff0c;如下&#xff08;默认开启&#xff09;&#xff1a; systemctl status firewalld 关闭服务 systemctl stop firewalld 关闭…

《向量数据库指南》——Milvus Cloud「删除」:眼见未必为实

“执行 Collection 中的 delete 操作后,再次调用 num_entities 检查集合中的数据的条数,和删除前一致, delete 不能从物理层面上删除数据吗?”“删除的数据还能被查到是为什么?”“请问下删除 collection 后,磁盘大小没有恢复,该怎么处理?”社区中关于“删除”讨论最多…

EMC学习笔记(二十一)降低EMI的PCB设计指南(一)

降低EMI的PCB设计指南&#xff08;一&#xff09; 1.概述2.射频3.连接器与过孔元件4.静态引脚和动态引脚和输入5.基本回路6.差模与共模 tips&#xff1a;资料主要来自网络&#xff0c;仅供学习使用。 1.概述 印刷电路板(PCB)的一般布局准则&#xff0c;基本上都有相对的文件进…

2024.2.5

#include<stdio.h> #include<string.h> #include<math.h> #include<stdlib.h> typedef int datatype; //定义结点结构体 typedef struct Node {datatype data;struct Node *next; }*node; //创建结点 node creat_node() {node s(node)malloc(sizeof(st…

基于SSM的网络在线考试系统(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的网络在线考试系统&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring …

(十八)springboot实战——spring securtity注解方式的授权流程源码解析

前言 在上一节内容中&#xff0c;我们介绍了如何在FilterSecurityInterceptor过滤器中处理用户的授权流程&#xff0c;并分析了其源码&#xff0c;spring security还提供了方法级别的授权方式&#xff0c;通过EnableMethodSecurity注解启用权限认证流程&#xff0c;只需要在方…

【数据结构】链表OJ面试题4(题库+解析)

1.前言 前五题在这http://t.csdnimg.cn/UeggB 后三题在这http://t.csdnimg.cn/gbohQ 给定一个链表&#xff0c;判断链表中是否有环。http://t.csdnimg.cn/Rcdyc 记录每天的刷题&#xff0c;继续坚持&#xff01; 2.OJ题目训练 10. 给定一个链表&#xff0c;返回链表开始…

【doghead】uv_loop_t的创建及线程执行

worker测试程序,类似mediasoup对uv的使用,是one loop per thread 。创建一个UVLoop 就可以创建一个uv_loop_t Transport 创建一个: 试验配置创建一个: UvLoop 封装了libuv的uv_loop_t ,作为共享指针提供 对uv_loop_t 创建并初始化

【CV论文精读】【MVDet】Multiview Detection with Feature Perspective Transformation

0.论文摘要 合并多个摄像机视图进行检测减轻了拥挤场景中遮挡的影响。在多视图检测系统中&#xff0c;我们需要回答两个重要问题。首先&#xff0c;我们应该如何从多个视图中聚合线索&#xff1f;第二&#xff0c;我们应该如何从空间上相邻的位置聚集信息&#xff1f;为了解决…

【机器学习】数据清洗之识别缺失点

&#x1f388;个人主页&#xff1a;甜美的江 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;机器学习 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步…