FlinkSql读取kafka数据流的方法(scala)

我的scala版本为2.12

<scala.binary.version>2.12</scala.binary.version>

我的Flink版本为1.13.6

<flink.version>1.13.6</flink.version>

FlinkSql读取kafka数据流需要如下依赖:

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
    </dependency>

我们首先建一个kafka主题person_test。然后建立一个scala类作为kafka的生产者,示例内容如下:

package cha01

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import Util._

import scala.util.Random

object FlinkSqlKafkaConnectorProducer {
  def main(args: Array[String]): Unit = {
    val producerConf = new Properties()
    producerConf.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"single01:9092")
    producerConf.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"10")
    producerConf.setProperty(ProducerConfig.LINGER_MS_CONFIG,"50")
    producerConf.setProperty(ProducerConfig.RETRIES_CONFIG,"2")
    producerConf.setProperty(ProducerConfig.ACKS_CONFIG,"1")
    producerConf.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.IntegerSerializer")
    producerConf.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val topic = "person_test"
    val producer:KafkaProducer[Integer,String] = new KafkaProducer(producerConf);
    val rand = new Random()

    for(i <- 1 to 2000){
      val line: String = s"$i,Origami$i,${rand.nextInt(30)+18},${if (rand.nextInt(10) >=8) "Male" else "Female"}"
      val record: ProducerRecord[Integer, String] =
        new ProducerRecord[Integer, String](topic, 0, System.currentTimeMillis(), i, line)
      producer.send(record)
      Thread.sleep(50+rand.nextInt(500))
    }

    producer.flush()
    producer.close()

  }
}

此kafka生产者会随机生产出一系列类似以下内容的数据,类型为csv:

id,name,age,gender
1,Origami1,25,Female
2,Origami2,30,Male
3,Origami3,22,Female

随后再在工程中建立一个scala类,内容示例如下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object CSVKafkaSystem {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance()
      .inStreamingMode()
      .build()
    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(see)

    tabEnv.executeSql(
      """
         |CREATE TABLE person(
         |id INT,
         |name STRING,
         |age INT,
         |gender STRING
         |) WITH (
         |'connector' = 'kafka',
         |'topic'= 'person_test',
         |'properties.bootstrap.servers' = 'single01:9092',
         |'properties.group.id' = 'person_test_group',
         |'scan.startup.mode' = 'earliest-offset',
         |'format' = 'csv',
         |'csv.ignore-parse-errors' = 'true',
         |'csv.field-delimiter' = ','
         |)
         |""".stripMargin)

    tabEnv.sqlQuery("SELECT * FROM person").execute().print()
  }
}

其中的一些参数解释如下:'

connector' = 'kafka'

指定连接器类型为kafka

'topic'= 'person_test'

指定要读取的kafka主题为person_test

'properties.bootstrap.servers' = 'single01:9092'

指定kafka所在的服务器的ip地址以及端口号

'properties.group.id' = 'person_test_group'

指定 Kafka 消费者组的 ID为person_test_group

'scan.startup.mode' = 'earliest-offset'

指定了控制 Flink 从 Kafka 中读取数据时的起始位置

  • earliest-offset 表示从 Kafka 中每个分区的最早消息开始读取。
  • latest-offset 表示从 Kafka 中每个分区的最新消息开始读取。
  • group-offsets 表示使用 Kafka 消费者组的偏移量来恢复上次消费的位置

'format' = 'csv'

指定了 kafka 消息的格式为csv

'csv.ignore-parse-errors' = 'true'

指定了忽略解析异常的信息

'csv.field-delimiter' = '

指定 CSV 数据中字段的分隔符为,

可以看到最终结果如下,数据在源源不断的生成,flinkSQL也在源源不断的读取表内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/914802.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

语音 AI 革命:未来,消费者更可能倾向于与 AI 沟通,而非人工客服

「未来&#xff0c;消费者更可能倾向于与 AI 沟通&#xff0c;而非人工客服&#xff0c;因为这将成为解决问题的最高效途径。」 这篇来自 Bessemer Venture Partners 的报告&#xff0c;是目前为止对语音 AI 在企业应用上最完整清晰的一次梳理。 核心要点&#xff1a; 尽管市…

【t365】基于springboot的高校疫情防控系统

摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。针对信息管理混乱&#xff0c;出错率高&#xff0c;信息安全性差&#x…

DAY27|贪心算法Part01|LeetCode:455.分发饼干、376. 摆动序列、53. 最大子序和

贪心算法 贪心的本质是选择每一阶段的局部最优&#xff0c;从而达到全局最优。 贪心算法并没有固定的套路&#xff0c;最难想的就在于如何通过局部最优去推出全局最优。在做一个题目的时候&#xff0c;靠自己手动模拟&#xff0c;如果模拟可行&#xff0c;就可以试一试贪心策略…

四万字长文SpringBoot、Spring、SpringMVC等相关面试题(注:该篇博客将会持续维护 最新维护时间:2024年11月12日)

&#x1f9f8;本篇博客重在讲解SpringBoot、Spring、SpringMVC等相关面试题&#xff0c;将会实时更新&#xff0c;欢迎大家添加作者文末联系方式交流 &#x1f4dc;JAVA面试题专栏&#xff1a;JAVA崭新面试题——2024版_dream_ready的博客-CSDN博客 &#x1f4dc;作者首页&…

免费HTML模板和CSS样式网站汇总

HTML模板&#xff1a;&#xff08;注意版权&#xff0c;部分不可商用&#xff09; 1、Tooplate&#xff0c;免费HTML模板下载 Download 60 Free HTML Templates for your websitesDownload 60 free HTML website templates or responsive Bootstrap templates instantly from T…

深入理解接口测试:实用指南与最佳实践5.0(二)

✨博客主页&#xff1a; https://blog.csdn.net/m0_63815035?typeblog &#x1f497;《博客内容》&#xff1a;.NET、Java.测试开发、Python、Android、Go、Node、Android前端小程序等相关领域知识 &#x1f4e2;博客专栏&#xff1a; https://blog.csdn.net/m0_63815035/cat…

SpringBoot技术:共享汽车行业的新动力

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了共享汽车管理系统的开发全过程。通过分析共享汽车管理系统管理的不足&#xff0c;创建了一个计算机管理共享汽车管理系统的方案。文章介绍了共享汽车管理系统的系…

Java Review - 线程池原理源码解析

文章目录 Pre为什么要用线程池线程池的优点&#xff08;1&#xff09;重复利用线程&#xff08;2&#xff09;控制线程的数量 线程池实现原理线程池ThreadPoolExecutor类关系线程池的工作流程任务队列空闲线程的存活时间参数ThreadFactory拒绝策略被拒绝后的任务如何再次执行 向…

昇思大模型平台打卡体验活动:项目4基于MindSpore实现Roberta模型Prompt Tuning

基于MindNLP的Roberta模型Prompt Tuning 本文档介绍了如何基于MindNLP进行Roberta模型的Prompt Tuning&#xff0c;主要用于GLUE基准数据集的微调。本文提供了完整的代码示例以及详细的步骤说明&#xff0c;便于理解和复现实验。 环境配置 在运行此代码前&#xff0c;请确保…

【MySQL】数据库表连接简明解释

未经许可,不得转载。 文章目录 表连接表连接的类型内连接与外连接结合 WHERE 条件交叉连接(cross join)表连接 在关系型数据库中,建模是数据组织的核心难点。数据库建模需要将数据关系理清,构建出适合存储和查询的结构。 所谓“模型”包括实体(entity) 和关系(relati…

离线安装GDAL与MapServer:在银河麒麟V10上的快速指南

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 ✨特色专栏&#xff1a…

17.UE5丰富怪物、结构体、数据表、构造函数

2-19 丰富怪物&#xff0c;结构体、数据表格、构造函数_哔哩哔哩_bilibili 目录 1.结构体和数据表格 2.在构造函数中初始化怪物 3.实现怪物是否游荡 1.结构体和数据表格 创建蓝图&#xff1a;结构体蓝图 在结构体蓝图中添加变量&#xff0c;如下所示&#xff0c;为了实现不…

Kafka 快速入门(一)

1.1安装部署 1.1.1 集群规划 bigdata01bigdata02bigdata03zookeeperzookeeperzookeeperkafkakafkakafka 1.1.2 集群部署 官方下载地址&#xff1a;http://kafka.apache.org/downloads.html 检查三台虚拟机的zk是否启动&#xff1a;zkServer.sh start 默认启动方式 1)解压…

零件图纸的技术要求及标注

1零件的技术要求 零件在加工、检验时的各项技术要求&#xff0c;通常是指表面粗糙度、尺寸公差、形状和位置公差&#xff0c;材料的热处理及表面处理等。 2尺寸公差与配合 1、零件的互换性&定义、作用 在按规定要求大量制造的零件或部件中&#xff0c;任取一个&#xff0…

Python 的 Pygame 库,编写简单的 Flappy Bird 游戏

Pygame 是一个用 Python 编写的开源游戏开发框架&#xff0c;专门用于编写 2D 游戏。它提供了丰富的工具和功能&#xff0c;使得开发者能够快速实现游戏中的图形渲染、声音播放、输入处理和动画效果等功能。Pygame 非常适合初学者和想要快速创建游戏原型的开发者。 Pygame 的主…

【缓存策略】你知道 Cache Aside(缓存旁路)这个缓存策略吗

&#x1f449;博主介绍&#xff1a; 博主从事应用安全和大数据领域&#xff0c;有8年研发经验&#xff0c;5年面试官经验&#xff0c;Java技术专家&#xff0c;WEB架构师&#xff0c;阿里云专家博主&#xff0c;华为云云享专家&#xff0c;51CTO 专家博主 ⛪️ 个人社区&#x…

2024版最新kali linux 新手教程(非常详细)零基础入门到精通,收藏这篇就够了

您是否有兴趣使用 Kali Linux&#xff0c;但不知道从哪里开始&#xff1f;您来对地方了。 Kali Linux 是一个强大的渗透测试和道德黑客工具&#xff0c;提供许多工具和资源。 本 Kali Linux 教程将向您展示如何下载和安装它、解释桌面并强调您应该知道的关键领域。接下来&…

Android JNI 技术入门指南

引言 在Android开发中&#xff0c;Java是一种主要的编程语言&#xff0c;然而&#xff0c;对于一些性能要求较高的场景&#xff08;如音视频处理、图像处理、计算密集型任务等&#xff09;&#xff0c;我们可能需要使用到C或C等语言来编写底层的高效代码。为了实现Java代码与C…

国标GB28181视频平台EasyCVR私有化部署视频平台对接监控录像机NVR时,录像机“资源不足”是什么原因?

EasyCVR视频融合云平台&#xff0c;是TSINGSEE青犀视频“云边端”架构体系中的“云平台”系列之一&#xff0c;是一款针对大中型项目设计的跨区域、网络化、视频监控综合管理系统平台&#xff0c;通过接入视频监控设备及视频平台&#xff0c;实现视频数据的集中汇聚、融合管理、…

HarmonyOS NEXT:模块化项目 ——修改应用图标+启动页等

涉及官方文档 应用配置文件应用/组件级配置图标资源规范 涉及到app.json5配置文件和module.json5配置文件 1、 icon和label的校验。 IDE从5.0.3.800版本开始&#xff0c;不再对module.json5中的icon和label做强制校验&#xff0c;因此module.json5与app.json5只需要选择其一…