Flink Flink数据写入Kafka

一、环境准备

flink 1.14写入Kafka,首先在pom.xml文件中导入相关依赖

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.14.6</flink.version>
        <spark.version>2.4.3</spark.version>
        <hadoop.version>2.8.5</hadoop.version>
        <hbase.version>1.4.9</hbase.version>
        <hive.version>2.3.5</hive.version>
        <java.version>1.8</java.version>
        <scala.version>2.11.8</scala.version>
        <mysql.version>8.0.22</mysql.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

二、Flink将Socket数据写入Kafka(精准一次)

注意:如果要使用 精准一次 写入 Kafka,需要满足以下条件,缺一不可
1、开启 checkpoint
2、设置事务前缀
3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max 的 15 分钟

package com.flink.DataStream.Sink;


import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class flinkSinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        // 如果是精准一次,必须开启 checkpoint
        streamExecutionEnvironment.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
        /**
         * TODO Kafka Sink
         * TODO 注意:如果要使用 精准一次 写入 Kafka,需要满足以下条件,缺一不可
         * 1、开启 checkpoint
         * 2、设置事务前缀
         * 3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max 的 15 分钟
         */
        Properties properties=new Properties();
        properties.put("transaction.timeout.ms",10 * 60 * 1000 + "");
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 指定 kafka 的地址和端口
                .setBootstrapServers("localhost:9092")
                // 指定序列化器:指定Topic名称、具体的序列化(产生方需要序列化,接收方需要反序列化)
                .setRecordSerializer(KafkaRecordSerializationSchema
                        .<String>builder()
                        .setTopic("testtopic01")
                        // 指定value的序列化器
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                // 写到 kafka 的一致性级别: 精准一次、至少一次
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // 如果是精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("flinkkafkasink-")
                // 如果是精准一次,必须设置 事务超时时间: 大于 checkpoint间隔,小于 max 15 分钟
                .setKafkaProducerConfig(properties)
                //.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();
        streamSource.sinkTo(kafkaSink);
        streamExecutionEnvironment.execute();
    }
}

查看ProduceerConfig配置
在这里插入图片描述

三、启动Zookeeper、Kafka

#启动zookeeper
${ZK_HOME}/bin/zkServer.sh start
#查看zookeeper状态
${ZK_HOME}/bin/zkServer.sh status
#启动kafka
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
#查看topic
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper localhost:2181
#创建topic
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic testtopic02 --partitions 2 --replication-factor 1
#删除topic
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic testtopic02
#生产消息
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic01
#消费消息
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic01 --from-beginning

通过socket模拟数据写入Flink之后,Flink将数据写入Kafka
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/223496.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙一出,android开发处境再受重创

华为宣布其自研操作系统鸿蒙HarmonyOSNEXT开发者预览版将不再兼容安卓系统&#xff0c;这一消息引起了广泛关注和热议。这一决策标志着华为正式告别安卓&#xff0c;摆脱了外部的制约&#xff0c;开始着手打造一个全新的生态系统。 鸿蒙系统4发布一个月&#xff0c;截至目前&a…

Java LeetCode篇-深入了解二叉树经典解法(三种方式实现:获取二叉树的最大深度)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 对称二叉树 1.1 判断对称二叉树实现思路 1.2 代码实现&#xff1a;判断对称二叉树 2.0 二叉树的最大深度 2.1 使用递归实现获取二叉树的最大深度思路 2.2 代码实…

书-用数组给已知数组插入某个元素(3)

#include<stdio.h> int main(){int i ;int b[8]{99,65,87,23,44,55};//在87前面插入一个数int n6 ;// 长度for (in;i>3;i--)b[i]b[i-1]; //解释&#xff1a;原来23是第三个位置&#xff0c;但是插入数字后变成了第四个位置//就是把b[4]赋值给b[3];b[2]1000;nn1;//一定…

深入理解MySQL事务隔离级别与锁机制

表锁&#xff1a; 行锁&#xff1a; InnoDB和MyISAM最大的不同有两点&#xff1a; InnoDB支持事务&#xff08;TRANSACTION&#xff09;InnoDB支持行锁 MyISAM在执行查询SELECT前&#xff0c;会自动给涉及的所有表加读锁&#xff0c;在执行update、insert、delete操作会自动…

【多线程】线程的三种常见创建方式

文章目录 线程创建方式1——Thread线程创建方式2——Runnable线程创建方式2——匿名内部类线程创建方式3——Callable、FutureTask,带返回值 线程其实是程序中的一条执行路径。 那怎样的程序才是多线程程序呢&#xff1f; 例如12306网站就是支持多线程的&#xff0c;因为同时可…

ArcGIS Enterprise on Kubernetes 11.1安装示例

博客主页&#xff1a;https://tomcat.blog.csdn.net 博主昵称&#xff1a;农民工老王 主要领域&#xff1a;Java、Linux、K8S 期待大家的关注&#x1f496;点赞&#x1f44d;收藏⭐留言&#x1f4ac; 目录 安装前置条件基本安装解压文件生成秘钥执行安装脚本 配置DNS方法一方法…

算法通关村第二关—K个一组反转(黄金)

K个一组翻转链表 题目介绍 LeetCode25.给你一个链表&#xff0c;每k个节点一组进行翻转&#xff0c;请你返回翻转后的链表。k是一个正整数&#xff0c;它的值小于或等于链表的长度。如果节点总数不是k的整数倍&#xff0c;那么请将最后剩余的节点保持原有顺序。进阶&#xff1…

FPGA竞赛_考试赢积分兑换专题课活动

温馨提示&#xff1a;明德扬特别组织了考试竞赛赢积分活动&#xff0c;欢迎大家积极参加考试&#xff01;我是本次活动的负责人小易老师。 一.考试兑换FPGA专题课 1积分1元.可以兑换FPGA专题课&#xff08;例如&#xff1a;拿到1000积分&#xff0c;课程售价999元&#xff0c…

第三方组件自定义扫描规则

第三方例如dubbo自定义扫描组件规则方式注入进容器。例如DubboService注解的类注入进容器中&#xff0c;实现ImportBeanDefinitionRegistrar接口&#xff0c;并通过Import注解注入。 Import除了注入ImportBeanDefinitionRegistrar类&#xff0c;还可以注入配置类Configuration和…

微信小程序音乐播放器

项目预览 项目说明 听歌音乐播放器(小程序)&#xff0c;本项目的目的是为了方便听歌用户&#xff0c;随时随地听歌&#xff0c;不需要下载APP,即用即听 运行项目时&#xff0c;微信开发者工具只需将 dist 文件夹放入即可。另&#xff0c;请将微信开发者工具中的 【不校验合法…

什么是SD-WAN?软件定义WAN是如何工作的?

下午好&#xff0c;我的网工朋友。 宽带接入以及Internet骨干网容量的持续提升&#xff0c;促使企业WAN技术变革。 在已有专线的基础上&#xff0c;SD-WAN提供了一种低成本的快捷方案&#xff0c;正受到业界的追捧。 今天就和你科普一波企业WAN技术的演进&#xff0c;再来说说…

二百一十三、Flume——Flume拓扑结构介绍

一、目的 最近在看尚硅谷的Flume资料&#xff0c;看到拓扑结构这一块&#xff0c;觉得蛮有意思&#xff0c;于是整理一下Flume的4种拓扑结构 二、拓扑结构 &#xff08;一&#xff09;简单串联 1、结构含义 这种模式是将多个flume顺序连接起来了&#xff0c;从最初的sourc…

常见的Bean工厂后置处理器

此代码在jdk11上测试通过&#xff0c;SpringBoot版本为2.7.14 1.上代码 导入坐标 <dependencies><!-- spring数据坐标 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-rest</art…

一文搞懂Git版本控制系统

1. Git简介 当涉及到软件开发或协作时&#xff0c;版本管理是一个不可或缺的概念。无论你是一个独立开发者还是一个团队成员&#xff0c;都会遇到需要跟踪和管理代码变更的情况。这时候&#xff0c;Git作为一个强大而流行的版本控制系统就发挥着重要的作用。 Git&#xff08;读…

wait notify

文章目录 1. API 介绍2. 怎么使用wait、notify2.1 sleep 和 wait 的区别2.2 sleep 和 wait 的使用模板 1. API 介绍 都属于 Object 对象的方法。必须获得此对象的锁&#xff0c;才能调用这几个方法&#xff0c;只有重量级锁才能调用wait、notify obj.wait() 让进入 object 监…

ROS小练习——话题发布

目录 一、话题与消息获取 1、话题 2、消息 二、代码编写 1、C 2、python 三、编译运行 一、话题与消息获取 打开小乌龟案例 1、话题 rqt_graph rostopic list 2、消息 获取消息类型: rostopic type /turtle1/cmd_vel 获取消息格式: rosmsg info geometry_msgs/Twi…

JAVA IO:NIO

1.阻塞 IO 模型 ​ 最传统的一种 IO 模型&#xff0c;即在读写数据过程中会发生阻塞现象。当用户线程发出 IO 请求之后&#xff0c;内核会去查看数据是否就绪&#xff0c;如果没有就绪就会等待数据就绪&#xff0c;而用户线程就会处于阻塞状态&#xff0c;用户线程交出 CPU。当…

做亚马逊需要IP代理吗?需要纯净度高的吗?

做亚马逊跨境电商的老玩家都知道&#xff0c;代理IP的作用不容小觑。通过代理IP&#xff0c;跨境电商卖家可以进行深入的市场研究&#xff0c;获取关键的数据分析&#xff0c;助力业务决策。让卖家能够安全轻松管理不同地区的账户&#xff0c;轻松防关联&#xff0c;无缝对接多…

国内AI翘楚,看看有没有你心动的offer?

科技创新争占高地&#xff0c;AI领域各显神通。从一战成名的阿尔法狗到引起轩然大波的ChatGPT&#xff0c;我们早已卷入了一场没有硝烟的革命。前方世人看到的科技日新日异、岁月静好&#xff0c;后方是各大企业的绞尽脑汁、争先恐后。人工智能时代&#xff0c;AI是挡不住的时代…

草柴返利APP领淘宝天猫优惠券拿购物返利 淘宝天猫订单如何隐藏删除订单记录?

草柴返利APP领淘宝天猫优惠券拿购物返利 手机安装「草柴」返利APP&#xff0c;复制要购买的商品链接到草柴&#xff0c;查询该商品淘宝天猫优惠券及购物返利&#xff0c;确认收货后到草柴返利APP提取返利&#xff1b; 淘宝天猫订单如何隐藏删除订单记录&#xff1f; 1、打开手…