flink sink kafka

接上文:一文说清flink从编码到部署上线
之前写了kafka source,现在补充kafka sink。完善kafka相关操作。

环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401;kafka_2.12-2.5.0。

1. kafka 创建 topic

topic:rv-test-sink。
在这里插入图片描述

2.添加依赖

<!--flink cdc kafka 相关依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

3.创建运行环境

package com.zl.utils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

/**
 * EnvUtil
 * @description:
 */
public class EnvUtil {
    /**
     * 设置flink执行环境
     * @param parallelism 并行度
     */
    public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {
        // System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为root
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        if (parallelism >0 ){
            //设置并行度
            env.setParallelism(parallelism);
        } else {
            env.setParallelism(1);// 默认1
        }

        // 添加重启机制
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));
        // 没有这个配置,会导致“Flink 任务没报错,但是无法同步数据到doris”。
        // 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        //rocksdb状态后端,启用增量checkpoint
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        //设置checkpoint路径
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // 同一时间只允许一个 checkpoint 进行(默认)
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //最小间隔,10*60*1000=60000
        checkpointConfig.setMinPauseBetweenCheckpoints(60000);
        // 取消任务后,checkpoint仍然保存
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //checkpoint容忍失败的次数
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        //checkpoint超时时间 默认10分钟
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
        //禁用operator chain(方便排查反压)
        env.disableOperatorChaining();
        return env;
    }

    public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        //设置时区 东八
        tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));
        Configuration configuration = tenv.getConfig().getConfiguration();
        // 开启miniBatch
        configuration.setString("table.exec.mini-batch.enabled", "true");
        // 批量输出的间隔时间
        configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
        // 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条
        configuration.setString("table.exec.mini-batch.size", "20000");
        // 开启LocalGlobal
        configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        //设置TTL API指定
        tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));

        return tenv;
    }

}

4.核心代码

package com.zl.kafka;

import com.alibaba.fastjson.JSONObject;
import com.zl.utils.EnvUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class KafkaExampleSink {
    public static void main(String[] args) throws Exception {

        // 配置运行环境,并行度1
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // 程序间隔离,每个程序单独设置
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExampleSink");

        /// ===== 构造kafka sink =====
        // 相关参数配置可以参考下面这两个文档:①https://cloud.tencent.com/developer/article/2089393
        // ②https://www.bilibili.com/opus/819228616166473783
        // kafka配置
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "10.86.97.21:9092,10.86.97.21:9093,10.86.97.21:9094");
        // 当设置为“true”时,生产者将确保流中只写入每条消息的一个副本。
        prop.setProperty("enable.idempotence", "true");
        // 指定了生产者在接收到服务器相应之前可以发送多个消息,值越高,占用的内存越大,
        // 当然也可以提升吞吐量,发生错误时,可能会造成数据的发送顺序改变,其默认值是5.
        prop.setProperty("max.in.flight.requests.per.connection", "5");
        prop.setProperty("acks", "all");
        // 在kafka中消息发送失败时,指定生产者可以重发消息的次数,默认情况下,
        // 生产者在每次重试之间默认等待100ms,可以通过参数retey.backoff.ms参数来改变这个时间间隔。retries的缺省值:0.
        prop.setProperty("retries", "5");
        // 事务超时时间
        prop.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "");

        String topic = "rv-test-sink";
        FlinkKafkaProducer<String> flinkKafkaProducer = new FlinkKafkaProducer<String>(
                topic,// topic
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String s, @Nullable Long aLong) {
                        return new ProducerRecord<>(topic, s.getBytes(StandardCharsets.UTF_8));
                    }
                },
                prop,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );

        /// ===== 构造模拟数据 =====
        JSONObject rvJsonObject = new JSONObject();
        rvJsonObject.put("dt","2024-12-20");// 日期取当天
        rvJsonObject.put("uuid","data-stream-1");
        rvJsonObject.put("report_time",1733881971621L);

        String mockJson = JSONObject.toJSONString(rvJsonObject);

        /// ===== sink kafka =====
        env.fromElements(mockJson).addSink(flinkKafkaProducer).setParallelism(3).name("kafka-sink").uid("kafka-sink");

        env.execute("kafka-sink-job");

    }// main

}

5.运行

由于不是持续输入流,运行完会结束。
在这里插入图片描述
sink到kafka的数据如下:
在这里插入图片描述

6.完整代码

完整代码见:完整代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/942495.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【安全编码】Web平台如何设计防止重放攻击

我们先来做一道关于防重放的题&#xff0c;答案在文末 防止重放攻击最有效的方法是&#xff08; &#xff09;。 A.对用户密码进行加密存储使用 B.使用一次一密的加密方式 C.强制用户经常修改用户密码 D.强制用户设置复杂度高的密码 如果这道题目自己拿不准&#xff0c;或者…

Diagramming AI: 使用自然语言来生成各种工具图

前言 在画一些工具图时&#xff08;流程图、UML图、架构图&#xff09;&#xff0c;你还在往画布上一个个的拖拽组件来进行绘制么&#xff1f;今天介绍一款更有效率的画图工具&#xff0c;它能够通过简单的自然语言描述来完成一个个复杂的图。 首页 进入官网之后&#xff0c;我…

黑马Java面试教程_P9_MySQL

系列博客目录 文章目录 系列博客目录前言1. 优化1.1 MySQL中&#xff0c;如何定位慢查询&#xff1f;面试文稿 1.2 面试官接着问&#xff1a;那这个SQL语句执行很慢,如何分析 ( 如何优化&#xff09;呢?面试文稿 1.3 了解过索引吗?(什么是索引)1.4 继续问 索引的底层数据结构…

Windows11家庭版启动Hyper-V

Hyper-V 是微软的硬件虚拟化产品&#xff0c;允许在 Windows 上以虚拟机形式运行多个操作系统。每个虚拟机都在虚拟硬件上运行&#xff0c;可以创建虚拟硬盘驱动器、虚拟交换机等虚拟设备。使用虚拟化可以运行需要较旧版本的 Windows 或非 Windows 操作系统的软件&#xff0c;以…

第6章 图论

2024年12月25日一稿 &#x1f430;6.1 图的基本概念 6.1.1 图的定义和表示 6.1.2 图的同构 6.1.3 完全图与正则图 6.1.4 子图与补图 6.1.5 通路与回路 6.2 图的连通性 6.2.1 无向图的连通性 6.2.2 有向图的连通性 6.3 图的矩阵表示 6.3.1 关联矩阵 6.3.2 有向图的邻接矩阵…

智能网关在电力物联网中的应用

摘要 随着电力技术的快速发展&#xff0c;断路器从传统的单一保护功能演变为具备智能监控和远程管理能力的多功能设备。智能断路器作为配电系统的重要组成部分&#xff0c;集成了实时监测、远程控制和多层保护功能&#xff0c;显著提升了配电系统的安全性、稳定性和管理效率…

【论文阅读】Comprehensive Review of End-to-End Video Compression

摘要&#xff1a; 近年来&#xff0c;端到端视频压缩作为一种新兴且有前景的解决方案开始在视频压缩领域受到关注。本文对端到端视频编码和解码技术的发展与现状进行了全面的综述&#xff0c;详细介绍了传统混合编码器和端到端编码器的基本原理。本研究深入探讨了从传统视频压…

系统架构师考试 常错题记录 01

1.按照《中华人民共和国著作权法》的权利保护期&#xff08; &#xff09;受到永久保护。 A.发表权 B.修改权 C.复制权 D.发行权 正确答案&#xff1a;B 解析&#xff1a;本题考查知识产权法中的《中华人民共和著作权法》保护期限知识点。 《中华人民共和著作权法》中约定署名权…

Redis-十大数据类型

Reids数据类型指的是value的类型&#xff0c;key都是字符串 redis-server:启动redis服务 redis-cli:进入redis交互式终端 常用的key的操作 redis的命令和参数不区分大小写 &#xff0c;key和value区分 1、查看当前库所有的key keys * 2、判断某个key是否存在 exists key 3、查…

IIC驱动EEPROM

代码参考正点原子 i2c_dri:主要是三段式状态机的编写 module iic_dri#(parameter SLAVE_ADDR 7b1010000 , //EEPROM从机地址parameter CLK_FREQ 26d50_000_000, //模块输入的时钟频率parameter I2C_FREQ 18d250_000 //IIC_SCL的时钟频率)( …

webrtc获取IceCandidate流程

在WebRTC(Web Real-Time Communication)中,ICECandidate是一个关键概念,它用于描述在建立点对点(P2P)连接时可以考虑的潜在通信端点。以下是关于WebRTC中ICECandidate的详细解释: 一、ICECandidate的定义 ICECandidate对象通常包含以下关键属性: foundation:用于唯一…

一文彻底拿捏DevEco Studio的使用小技巧

程序员Feri一名12年的程序员,做过开发带过团队创过业,擅长Java相关开发、鸿蒙开发、人工智能等,专注于程序员搞钱那点儿事,希望在搞钱的路上有你相伴&#xff01;君志所向,一往无前&#xff01; 0.安装DevEco Studio DevEco Studio面向HarmonyOS应用及元服务开发者提供的集成开…

基于openEuler22.09部署OpenStack Yoga云平台(一)

OpenStack Yoga部署 安装OpenStack 一、基础准备 基于OpenStack经典的三节点环境进行部署&#xff0c;三个节点分别是控制节点&#xff08;controller&#xff09;、计算节点&#xff08;compute&#xff09;、存储节点&#xff08;storage&#xff09;&#xff0c;其中存储…

新服务器ubuntu系统相关操作

1、查看驱动:驱动版本535.216.01能够支持cuda12.2,下面直接使用默认安装的cuda。 2、赋予用户管理员权限。 首先有超级用户(root)权限来编辑 /etc/sudoers 文件,visudo 是一个命令,用于安全地编辑 /etc/sudoers 文件。运行: sudo visudo 在 visudo 编辑器中,找到类似…

微机接口课设——基于Proteus和8086的打地鼠设计(8255、8253、8259)

原理图设计 汇编代码 ; I/O 端口地址定义 IOY0 EQU 0600H IOY1 EQU 0640H IOY2 EQU 0680HMY8255_A EQU IOY000H*2 ; 8255 A 口端口地址 MY8255_B EQU IOY001H*2 ; 8255 B 口端口地址 MY8255_C EQU IOY002H*2 ; 8255 C 口端口地址 MY8255_MODE EQU IOY003H*2 ; …

【C++数据结构——树】二叉树的遍历算法(头歌教学实验平台习题) 【合集】

目录&#x1f60b; 任务描述 相关知识 1. 二叉树的基本概念与结构定义 2. 建立二叉树 3. 先序遍历 4. 中序遍历 5. 后序遍历 6. 层次遍历 测试说明 通关代码 测试结果 任务描述 本关任务&#xff1a;实现二叉树的遍历 相关知识 为了完成本关任务&#xff0c;你需要掌…

简单园区网拓扑实验

1.实验拓扑 2.实验要求 1、按照图示的VLAN及IP地址需求&#xff0c;完成相关配置 2、要求SW1为VLAN 2/3的主根及主网关 SW2为vlan 20/30的主根及主网关 SW1和SW2互为备份 3、可以使用super vlan 4、上层通过静态路由协议完成数据通信过程 5、AR1为企业出口路由器 6、要求全网可…

USB接口实现CDC(usb转串口功能)

主控&#xff1a;stm32f429 PHY芯片&#xff1a;usb3320 Cubemx System Core-RCC connectivity-USB_OTG_HS Middleware and Software Packs-USB_DEVICE 时钟配置&#xff1a;根据自己使用的MCU工作频率设置 Generate Code Keil5 打开工程 usbd_cdc_if.c这个文件&…

C++---------动态内存管理

以下是对 C 中相关概念的详细说明及代码示例&#xff1a; 一、动态分配和堆 new 操作符&#xff1a; new 操作符用于在堆上动态分配内存。它会调用对象的构造函数&#xff08;如果是类对象&#xff09;并返回指向分配内存的指针。示例&#xff1a; #include <iostream&g…

企业该如何进行合格文件外发管理

随着信息技术的迅猛发展&#xff0c;企业间的文件交换变得越来越频繁。但是&#xff0c;如何确保文件传输的安全性与效率&#xff0c;成为企业管理者面临的一个重大挑战。镭速&#xff08;Raysync&#xff09;文件外发管理方案以其独特的优势&#xff0c;成为众多企业的首选。本…