Flume 之自定义Sink

1、简介

        前文我们介绍了 Flume 如何自定义 Source, 并进行案例演示,本文将接着前文,自定义Sink,在这篇文章中,将使用自定义 Source 和 自定义的 Sink 实现数据传输,让大家快速掌握Flume这门技术。

2、自定义Source

        自定义Source参考前文:https://blog.csdn.net/zwl2220943286/article/details/135633120

3、自定义Sink

        本文将Sink定义为mysql。

3.1、引入依赖
<dependency>
	<groupId>org.apache.flume</groupId>
	<artifactId>flume-ng-core</artifactId>
	<version>1.11.0</version>
</dependency>

<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.33</version>
</dependency>
3.2、自定义Sink
3.2.1、Sink代码
import com.weilong.flumeselfdefinition.util.MysqlConfig;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {
    private final static Logger log = LoggerFactory.getLogger(MySink.class);
    private String url;
    private String username;
    private String password;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel channel = getChannel();
        // channel 支持事务
        Transaction thx = channel.getTransaction();
        thx.begin();
        try {
            Event event = channel.take();
            String name = new String(event.getBody());
            int i = MysqlConfig.insertData(this.url, this.username, this.password, name);
            if (i > 0){
                log.info("==插入数据库成功==");
            }
            thx.commit();
            status = Status.READY;
        } catch (Exception ex){
            ex.printStackTrace();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        String url = context.getString("url");
        String username = context.getString("username");
        String password = context.getString("password");
        this.url = url;
        this.username = username;
        this.password = password;
    }
}
 3.2.2、数据库连接配置:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MysqlConfig {

    private MysqlConfig(){
    }

    static {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }

    public static Connection getConnection(String url, String username, String password) throws SQLException {
        Connection connection = DriverManager.getConnection(url, username, password);
        return connection;
    }

    public static int insertData(String url, String username,String password, String name){
        Connection connection = null;
        try{
            connection = getConnection(url, username, password);
            PreparedStatement preparedStatement = connection.prepareStatement("insert into test(`name`) values( '" + name + "')");
            boolean res = preparedStatement.execute();
            if (res){
                return 1;
            }
            return 0;
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            if (connection != null){
                try {
                    connection.close();
                }catch (Exception ex){
                    ex.printStackTrace();
                }
            }
        }
        return 0;
    }
}
 3.3、Flume 配置文件

        vim flume-self-source-sink.conf

a1.sources = r1
a1.channels = c1
a1.sinks=k1
# source
a1.sources.r1.type = com.weilong.flumeselfdefinition.MySource 
# 自定义 Source 的全限定类名
a1.sources.r1.path = http://192.168.30.3:8088/hello 
# 自定义参数
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 自定义Sink
a1.sinks.k1.type = com.weilong.flumeselfdefinition.MySink
a1.sinks.k1.url = jdbc:mysql://192.168.30.3:3306/test?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
a1.sinks.k1.username = root
a1.sinks.k1.password = 146815
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4、将jar包放入lib目录 
 4.1、将自定义jar包放入lib目录

 

4.2、将数据库驱动jar包放入lib目录

        驱动jar包下载地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java

注:mysql 驱动jar包不放进lib,会出现驱动类找不到。 

5、启动 Flume
bin/flume-ng agent -c conf/ -n a1 -f testconf/flume-self-source-sink.conf -Dflume.root.logger=INFO,console

注:启动Flume 之前,自定义 web 服务也要启动。

 6、结果

成功保存进数据库。

7、总结 

         本文结合前文完成 Flume 的 Source 和 Sink 的自定义,帮助大家能够完成各种场景下的Flume的使用。关于更高级Flume的知识,关注下面公众号。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/327828.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【PostgreSQL】安装和常用命令教程

PostgreSQL window安装教程 window安装PostgreSQL 建表语句&#xff1a; DROP TABLE IF EXISTS student; CREATE TABLE student (id serial NOT NULL,name varchar(100) NOT NULL,sex varchar(5) NOT NULL,PRIMARY KEY (id) );INSERT INTO student (id, name, sex) VALUES (…

【电力电子】2 开、闭环单相桥式SPWM逆变仿真电路

【仅供参考】 【2022.11西南交大电力电子仿真】 目录 1 开环单相桥式SPWM逆变电路搭建及波形记录 2 闭环单相桥式SPWM逆变电路搭建及波形记录 1 开环单相桥式SPWM逆变电路搭建及波形记录 采用单极性调制法&#xff0c;按老师PPT&#xff08;如下图&#xff09;所示进行单相…

图解基础排序算法(冒泡、插入、选择)(山东大学实验二)

目录 ⚽前言&#xff1a; &#x1f3d0; 冒泡排序&#xff1a; 设定&#xff1a; 分类&#xff1a; 起源&#xff1a; 图解冒泡&#xff1a; 图中绿色&#xff1a; 图中橙色&#xff1a; 整体思路&#xff1a; 交换思路&#xff1a; 核心代码&#xff1a; &#x…

基于WebSocket双向通信技术实现-下单提醒和催单(后端)

学习复盘和总结项目亮点。 扩展&#xff1a;该功能能应用在&#xff0c;各种服务类项目中。&#xff08;例如&#xff1a;酒店、洗脚城等系ERP系中提醒类服务&#xff09; 4. 来单提醒 4.1 需求分析和设计 用户下单并且支付成功后&#xff0c;需要第一时间通知外卖商家。通…

服务网关 Gateway

服务网关 Gateway Spring Cloud Gateway 是 Spring Cloud 生态系统中的网关&#xff0c;它基于 Spring5.0 SpringBoot2.0 WebFlux&#xff08;基于高性能的 Reactor 模式响应式通信框架 Netty&#xff0c;异步非阻塞模型&#xff09;等技术开发。旨在为微服务架构提供一种简…

如何在CentOS 7 中搭建Python 3.0 环境

1、下载 通过https://www.python.org/ftp/python/下载Python安装包&#xff0c;这里下载Python-3.10.9.tgz&#xff1b; 2、上传 借助MobaXterm等工具将Python安装包上传至/opt目录&#xff1b; 3、解压 将JDK压缩文件解压至/opt目录&#xff1a;tar -xvf /opt/Python-3.1…

React Store及store持久化的使用

1.安装 npm insatll react-redux npm install reduxjs/toolkit npm install redux-persist2. 使用React Toolkit创建counterStore并配置持久化 store/modules/counterStore.ts&#xff1a; import { createSlice } from reduxjs/toolkit// 定义状态类型 interface Action {…

Python数据分析案例33——新闻文本主题多分类(Transformer, 组合模型) 模型保存

案例背景 对于海量的新闻&#xff0c;我们可能需要进行文本的分类。模型构建很重要&#xff0c;现在对于自然语言处理基本都是神经网络的方法了。 本次这里正好有一组质量特别高的新闻数据&#xff0c;涉及 教育 科技 社会 时政 财经 房产 家居 七大主题&#xff0c;基本涵盖…

编译原理1.1习题 语言处理器

图源&#xff1a;文心一言 编译原理习题整理~&#x1f95d;&#x1f95d; 作为初学者的我&#xff0c;这些习题主要用于自我巩固。由于是自学&#xff0c;答案难免有误&#xff0c;非常欢迎各位小伙伴指正与讨论&#xff01;&#x1f44f;&#x1f4a1; 第1版&#xff1a;自…

HCIP -- ospf实验

要求&#xff1a; 实现&#xff1a; R3&#xff1a;int t 0/0/0ip address 172.16.1.4 255.255.255.248 &#xff08;配置虚拟接口ip地址&#xff09;tunnel-protocol gre p2mp &#xff08;配置接口协议为p2mp&#xff09;source 43.0.0.2 &#xff08;配置源&#xff09;osp…

【UE 材质】简单的纹理失真、溶解效果

目录 1. 失真效果 2. 溶解效果 3. 失真溶解 我们一开始有这样一个纹理 1. 失真效果 其中纹理节点“DistortTexture”的纹理为引擎自带的纹理“T_Noise01”&#xff0c;我们可以通过控制参数“失真度”来控制纹理的失真程度 2. 溶解效果 3. 失真溶解

奇安信天擎 rptsvr 任意文件上传漏洞复现

0x01 产品简介 奇安信天擎是奇安信集团旗下一款致力于一体化终端安全解决方案的终端安全管理系统(简称“天擎”)产品。通过“体系化防御、数字化运营”方法,帮助政企客户准确识别、保护和监管终端,并确保这些终端在任何时候都能可信、安全、合规地访问数据和业务。天擎基于…

微软推出付费版Copilot

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 微软已经超越苹果&#xff0c;成了全球市值最高的公司&#xff0c;其他公司都因为AI大裁员&#xff0c;而微软正好相反&#xff0c;当然这个原因很简单&#xff1a;就是微软强制把AI全面接入到系统里来了。而Copilot…

Electron+React项目打包踩坑记录

首先&#xff0c;如何打包 写下本文的时间是 2024/01/16&#xff0c;搜索了网络上 ElectronReact 的打包方式&#xff0c;中间行不通&#xff0c;本文采用的方式是记录本文时 Electron 快速入门(https://www.electronjs.org/zh/docs/latest/tutorial/quick-start)记录的打包方式…

【Java 设计模式】创建型之抽象工厂模式

文章目录 1. 定义2. 应用场景3. 代码实现4. 应用示例结语 在软件开发中&#xff0c;抽象工厂模式是一种常见的创建型设计模式&#xff0c;它提供了一种创建一系列相关或相互依赖对象的接口&#xff0c;而无需指定它们具体的类。抽象工厂模式的核心思想是将一组相关的产品组合成…

spring boot shardingsphere mybatis-plus druid mysql 搭建mysql数据库读写分离架构

spring boot shardingsphere mybatis-plus druid mysql 搭建mysql数据库读写分离架构 ##关于window mysql主从搭建简单教程 传送门 window mysql5.7 搭建主从同步环境-CSDN博客 ##父pom.xml <?xml version"1.0" encoding"UTF-8"?> <project…

北交所交易手续费标准?哪家证券公司开通北交所券商交易手续费佣金万2?

北交所&#xff08;Beijing Exchange&#xff09;是指位于中国北京的一家金融交易所。北交所是中国政府为推动金融改革和国际化市场而设立的交易场所。它提供包括股票、债券、期货、外汇等多种金融产品的交易服务。北交所的目标是促进中国金融市场的发展&#xff0c;吸引国内外…

机器人持续学习基准LIBERO系列7——计算并可视化点云

0.前置 机器人持续学习基准LIBERO系列1——基本介绍与安装测试机器人持续学习基准LIBERO系列2——路径与基准基本信息机器人持续学习基准LIBERO系列3——相机画面可视化及单步移动更新机器人持续学习基准LIBERO系列4——robosuite最基本demo机器人持续学习基准LIBERO系列5——…

git项目管理

Git工作流程图 git 基础指令 git init #创建本地仓库,创建成功后&#xff0c;当前目录会多一个.git文件夹 git status #查看修改状态 git add . #添加工作区到暂存区 git commit -m 注释内容 #提交暂存区到本地仓库&#xff08;commit&#xff09; git log …

PngToIco.java

PngToIco.java [PNG转ICO]为了解决今天项目PNG弄成ICO写的 package image;import java.awt.image.BufferedImage; import java.io.File; import java.io.IOException;import javax.imageio.ImageIO;import org.apache.commons.imaging.ImageFormats; import org.apache.common…