kafka集成flink api编写教程

1.引入依赖(pox.xml)

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.6</version>

        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.6</version>

        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.6</version>

        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.13.6</version>

        </dependency>
    </dependencies>

2.创建日志配置文件

把$FLINK_HOME/conf/log4j.properties 内容复制粘贴过来

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO

# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

3.flink生产者api

package com.ljr.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.ArrayList;
import java.util.Properties;

public class MyFlinkKafkaProducer {
    //输入main tab 键 即创建入main 方法
    public static void main(String[] args) throws Exception {
        //1.获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //设置的槽数与分区相等
        env.setParallelism(3);
        //2.准备数据源
        ArrayList<String> wordlist = new ArrayList<>();
        wordlist.add("zhangsan");
        wordlist.add("lisi");
        DataStreamSource<String> stream = env.fromCollection(wordlist);
        
            //创建kafka生产者
        Properties pros = new Properties();
        pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
        FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer("customers", new SimpleStringSchema(), pros);
        //3.添加数据源
        stream.addSink(kafkaProducer);
        //4.执行代码
        env.execute();
        
    }
}

运行;kafka消费者消费结果

4.flink消费者api

package com.ljr.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class MyFlinkKafkaConsumer {
    public static void main(String[] args) throws Exception {
        //1 初始化flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        //2 创建消费者
        Properties pros = new Properties();
        pros.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
        //pros.put(ConsumerConfig.GROUP_ID_CONFIG,"hh")
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("customers", new SimpleStringSchema(), pros);
        //3 关联消费者和flink流
        env.addSource(flinkKafkaConsumer).print();
        //4 执行
        env.execute();
    }
}

运行,用3中的生产者生产数据,消费结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/693612.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C# WPF入门学习主线篇(十六)—— Grid布局容器

C# WPF入门学习主线篇&#xff08;十六&#xff09;—— Grid布局容器 欢迎来到C# WPF入门学习系列的第十六篇。在前几篇文章中&#xff0c;我们已经探讨了 Canvas、StackPanel、WrapPanel 和 DockPanel 布局容器及其使用方法。本篇博客将介绍另一种功能强大且灵活的布局容器—…

MT76X8 RF定频使用方法

一、从下面网址下载QA软件包&#xff0c;然后在WIN系统下安装QA环境。https://download.csdn.net/download/zhouwu_linux/89408573?spm1001.2014.3001.5503 在WINDOWS 7系统下先安装WinPcap_4_1_3.exe。 二、硬件连接。 模块上电&#xff0c;PC机 的IP配置成为10.10.18.100&a…

GraphQL(6):认证与中间件

下面用简单来讲述GraphQL的认证示例 1 实现代码 在代码中添加过滤器&#xff1a; 完整代码如下&#xff1a; const express require(express); const {buildSchema} require(graphql); const grapqlHTTP require(express-graphql).graphqlHTTP; // 定义schema&#xff0c;…

Wireshark TS | 应用传输丢包问题

问题背景 仍然是来自于朋友分享的一个案例&#xff0c;实际案例不难&#xff0c;原因也就是互联网线路丢包产生的重传问题。但从一开始只看到数据包截图的判断结果&#xff0c;和最后拿到实际数据包的分析结果&#xff0c;却不是一个结论&#xff0c;方向有点跑偏&#xff0c;…

微服务第一轮

课程文档 目录 一、业务流程 1、登录 Controller中的接口&#xff1a; Service中的实现impl&#xff1a; Service中的实现impl所继承的接口IService&#xff08;各种方法&#xff09;&#xff1a; VO&#xff1a; DTO&#xff1a; 2、搜索商品 ​Controller中的接口&a…

牛客热题:最长公共子串

&#x1f4df;作者主页&#xff1a;慢热的陕西人 &#x1f334;专栏链接&#xff1a;力扣刷题日记 &#x1f4e3;欢迎各位大佬&#x1f44d;点赞&#x1f525;关注&#x1f693;收藏&#xff0c;&#x1f349;留言 文章目录 牛客热题&#xff1a;最长公共子串题目链接方法一&am…

【Redis学习笔记05】Jedis客户端(中)

Jedis客户端 1. 命令 1.1 String类型 1.1.1 常见命令 SET命令 语法&#xff1a;SET key value [EX seconds | PX milliseconds] [NX|XX] 说明&#xff1a;将string类型的value值设置到指定key中&#xff0c;如果之前该key存在&#xff0c;则会覆盖原先的值&#xff0c;原先…

Java--可变参数

1.JDK1.5开始&#xff0c;Java支持同类型的可变参数给一个方法 2.在方法声明之前&#xff0c;在指定参数类型后加一个省略号&#xff08;...&#xff09; 3.一个方法只能指定一个可变参数&#xff0c;它必须是方法的最后一个参数&#xff0c;任何普通的参数必须在它之前声明 …

Java----抽象类和接口

欢迎大家来这次博客-----抽象类和接口。 1.抽象类 1.1 抽象类概念 在Java中我们都是通过类来描述对象&#xff0c;但反过来并不是所有的类都是用来描述对象的。当一个类中没有足够的信息来描述一个具体对象&#xff0c;我们就将该类称为抽象类。 如上图中的Shape类&#xff…

支付宝订单支付和超时收单

下订单成功后&#xff0c;发送mq消息到死信队列&#xff0c;消息过期后根据死信的路由key重新路由到交换机&#xff0c;交换机根据消息的路由key把消息路由到普通队列上&#xff0c;消费者监听队列并消费。 问题&#xff0c;现在提交订单信息调用支付宝的支付页面&#xff0c;…

AI论文速读 | 2024[ICML]FlashST:简单通用的流量预测提示微调框架

题目&#xff1a; FlashST: A Simple and Universal Prompt-Tuning Framework for Traffic Prediction 作者&#xff1a;Zhonghang Li, Lianghao Xia&#xff08;夏良昊&#xff09;, Yong Xu&#xff08;徐勇&#xff09;, Chao Huang 机构&#xff1a;华南理工大学&#xf…

分享不用会员免费听歌的软件,可听付费,支持随听随下!

今天来点特别的&#xff0c;给你们带来几款全网免费听歌的神器&#xff0c;让你们的音乐之旅不再有障碍&#xff01; 现在&#xff0c;找好听的歌越来越像寻宝一样&#xff0c;动不动就得掏腰包。不过别担心&#xff0c;阿星今天就来分享几款好用的免费听歌app&#xff0c;电脑…

SQL(一)基本语法

文章目录 一、Sql 语言基本特点二、数据查询&#xff08;按执行顺序排列&#xff09;1. From & Join2. Where3. Group by4. Having5. Select6. Distinct7. Order by8. Limit/ Offset 三、功能公式1. 字符处理2. 时间处理3. 统计计算 一、Sql 语言基本特点 不区分大小写分号…

数据库(29)——子查询

概念 SQL语句中嵌套SELECT语句&#xff0c;称为嵌套查询&#xff0c;又称子查询。 SELECT * FROM t1 WHERE column1 (SELECT column1 FROM t2); 子查询外部语句可以是INSERT/UPDATE/DELETE/SELECT的任何一个。 标量子查询 子查询返回的结果是单个值&#xff08;数字&#xff…

pdf压缩文件怎么压缩最小,软件工具压缩清晰

PDF格式的文件&#xff0c;当其体积过于庞大时&#xff0c;确实在上传的过程中显得尤为不便。今天给大家分享一个压缩pdf的简单的方法&#xff0c;让大家可以轻松的压缩pdf。 浏览器打开 "轻云处理pdf官网" &#xff0c;上传pdf文件&#xff0c;文件上传完成后网站会…

ChatGPT Prompt技术全攻略-精通篇:Prompt工程技术的高级应用

系列篇章&#x1f4a5; No.文章1ChatGPT Prompt技术全攻略-入门篇&#xff1a;AI提示工程基础2ChatGPT Prompt技术全攻略-进阶篇&#xff1a;深入Prompt工程技术3ChatGPT Prompt技术全攻略-高级篇&#xff1a;掌握高级Prompt工程技术4ChatGPT Prompt技术全攻略-应用篇&#xf…

计算机网络到底是指什么?

计算机网络是信息技术领域中最为核心和复杂的一部分&#xff0c;它涵盖了众多的技术原理和应用。下面&#xff0c;我们将从技术层面深入探讨计算机网络的相关内容。 一、计算机网络的分层模型 计算机网络的分层模型是网络通信的基石&#xff0c;它将网络通信过程划分为不同的层…

万能嗅探:视频号下载神器

万能嗅探是一款比较好用资源嗅探软件&#xff0c;界面干净&#xff0c;可以抓取浏览器的网页&#xff0c;不过想必各位主要用来抓取视频号&#xff0c;下面是使用方法。 使用方法 打开万能嗅探客户端&#xff0c;然后打开浏览器&#xff0c;产生网络请求即可&#xff0c;看看…

【Linux高级IO】select、poll、epoll

【Linux高级IO】select、poll、epoll toc 作者&#xff1a;爱写代码的刚子 时间&#xff1a;2024.6.5 前言&#xff1a;本篇博客将会介绍面试重点考察的select、poll、epoll IO: input && Output read && write 应用层read&&write的时候&#xff0c…

PostgreSQL 17 Beta1 发布,酷克数据再次贡献核心力量

得益于全球的开发者贡献&#xff0c;PostgreSQL已成长为一款拥有众多全球用户和贡献者、成熟稳定的开源数据库。2024年5月23日&#xff0c;PostgreSQL全球开发组宣布&#xff0c;PostgreSQL 17的首个 Beta 版本现已开放下载。本次新版本带来了众多惊喜。值得一提的是&#xff0…