pyfink1.20版本下实现消费kafka中数据并实时计算

1、环境

JDK版本:1.8.0_412

python版本:3.10.6

apache-flink版本:1.20.0

flink版本:1.20

kafka版本:kafka_2.12-3.1.1

flink-sql-connector-kafka版本:3.3.0-1.20

2、执行python-flink脚本

从kafka的demo获取消息,并将其中的a字段存入kafka的test_kafka_topic内,并打印sum(b)的值

from pyflink.table import TableEnvironment, EnvironmentSettings

def log_processing():
    # 创建流处理环境
    env_settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(env_settings)
    
    # 设置 Kafka 连接器 JAR 文件的路径
    # 确保 JAR 文件确实存在于指定路径,并且与 Flink 版本兼容
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars", 
        "file:///home/data/flink/flink-1.20.0/lib/flink-sql-connector-kafka-3.3.0-1.20.jar"
    )

    # 定义源表 DDL
    source_ddl = """
    CREATE TABLE source_table(
        a VARCHAR,
        b INT  -- 如果 b 字段不重要,可以考虑从源表中移除它
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'demo',
        'properties.bootstrap.servers' = '192.168.15.130:9092',
        'properties.group.id' = 'test_3',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
    """

    # 定义目标表 DDL
    sink_ddl = """
    CREATE TABLE sink_table(
        a VARCHAR
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'test_kafka_topic',
        'properties.bootstrap.servers' = '192.168.15.130:9092',
        'format' = 'json'
    )
    """

    # 执行 DDL 语句创建表
    t_env.execute_sql(source_ddl)
    #table = t_env.from_path("sql_source")
    #table.execute().print()
    table_result  = t_env.execute_sql("select sum(b) sb from source_table")
    table_result.print()
    t_env.execute_sql(sink_ddl)

    # 执行 SQL 查询并将结果插入到目标表
    # 注意:wait() 方法会阻塞,直到插入操作完成(在流处理中通常是无限的)
    t_env.sql_query("SELECT a FROM source_table") \
        .execute_insert("sink_table").wait()  # 考虑是否真的需要 wait()

if __name__ == '__main__':
    log_processing()
python3 KafkaSource.py

3、启动kafka生产者

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.130:9092 --topic demo
输入模拟数据进行测试
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}

可以看到sum(b)值已输出
在这里插入图片描述

4、启动kafka消费者

查看往test_kafka_topic插入的a字段数据已被消费

/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.130:9092  --from-beginning --topic test_kafka_topic

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/937630.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python 写的《桌面时钟》屏保

原代码: # 日历式时钟 # 导入所需的库 # 作者:Hoye # 日期:2024年12月16日 # 功能:显示当前日期、星期、时间,并显示模拟时钟 import tkinter as tk from tkinter import ttk import time import math import sysdef …

51c自动驾驶~合集41

我自己的原文哦~ https://blog.51cto.com/whaosoft/12830614 #SFPNet 迈向通用Lidar分割!取代Transformer的新架构SFPNet 迈向通用激光雷达语义分割(),取代Transformer的新架构SFPNet,新数据集S.MID 论文标题&am…

前端之CSS光速入门

一、CSS介绍 什么是CSS? CSS(Cascading Style Sheet),层叠样式表,用于控制页面的样式. CSS能够对网页中元素位置的排版进行像素级的精确控制,实现美化页面的效果.能够做到页面的样式和结构分离.(CSS可以理解为"东方四大邪术"的化妆术.对页面展示进行化…

P8615 拼接平方数 P8699 排列数

文章目录 [蓝桥杯 2014 国 C] 拼接平方数[蓝桥杯 2019 国 B] 排列数 [蓝桥杯 2014 国 C] 拼接平方数 题目描述 小明发现 49 49 49 很有趣,首先,它是个平方数。它可以拆分为 4 4 4 和 9 9 9,拆分出来的部分也是平方数。 169 169 169 也有…

【AIGC】结构化的力量:ChatGPT 如何实现高效信息管理

博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: AIGC | ChatGPT 文章目录 💯前言💯结构化的定义 (Structuration: Definition)1. 结构化的定义2. 结构化的示例3. 技术领域中的结构化数据 💯有序的规则的重要…

如何实现日期选择窗口

文章目录 1 概念介绍2 使用方法3 示例代码我们在上一章回中介绍了TimePicker Widget相关的内容,本章回中将介绍DatePickerDialog Widget.闲话休提,让我们一起Talk Flutter吧。 1 概念介绍 我们在这里说的DatePickerDialog是一种弹出窗口,只不过窗口的内容固定显示为日期,它…

开启数字化时代心理服务新篇章:专属线上心理咨询服务小程序

在当今快节奏的社会中,心理健康问题日益受到人们的关注。然而,传统的心理咨询模式往往受限于时间和地点,使得许多人在寻求心理帮助时感到不便。与此同时,心理课程的传播也面临着诸多挑战,如何高效地触达目标客户群体&a…

ElasticSearch 简介

一、什么是 ElastcSearch? ElasticSearch 是基于 Lucene 的 Restful 的分布式实时全文搜索引擎。 1.1 ElasticSearh 的基本术语概念 index 索引 索引类似与 mysql 中的数据库,ES 中的索引是存储数据的地方,包含了一堆有相似结构的文档数据…

条件概率相关公式

条件概率 条件概率是指在事件 B 已经发生的情况下,事件 A 发生的概率,记作 P(A∣B) 。其定义公式为: ( P(B) > 0 ) 全概率公式 全概率公式用于计算由一组互斥且完备的事件构成的事件的概率。设 是一组互斥且完备…

【C++】C++11(lambda、可变参数模板、包装器、线程库)

🌈个人主页:秦jh_-CSDN博客🔥 系列专栏:https://blog.csdn.net/qinjh_/category_12575764.html?spm1001.2014.3001.5482 ​ 目录 前言 lambda表达式 C98中的一个例子 lambda表达式语法 函数对象与lambda表达式 新的类功能…

12.11数据结构-图

无向完全图:在无向图中,如果任意两个顶点之间都存在边,则称该图为无向完全图。 有向完全图:在有向图中,如果任意两个顶点之间都存在方向相反的两条弧,则称该图为有向完全图。 含有n个顶点的无向完全图有…

深度学习作业 - 作业十一 - LSTM

问题一 推导LSTM网络中参数的梯度,并的分析其避免梯度消失的效果 LSTM网络是为了解简单RNN中存在的长程依赖问题而提出的一种新型网络结构,其主要思想是通过引入门控机制来控制数据的流通,门控机制包括输入门、遗忘门与输出门,同…

Sigrity System Explorer DC IR Drop Analysis模式进行直流压降仿真分析操作指导

Sigrity System Explorer DC IR Drop Analysis模式进行直流压降仿真分析操作指导 Sigrity System Explorer DC IR Drop Analysis模式可以用于直流压降仿真分析,通过搭建简易拓扑用于前仿真分析,下面搭建一个简易的直流系统进行说明,以下图为例,准备好PCB的SPICE模型SpiceNe…

华为HarmonyOS实现跨多个子系统融合的场景化服务 -- 4 设置打开App Button

场景介绍 本章节将向您介绍如何使用Button组件打开APP功能,可调用对应Button组件打开另一个应用。 效果图展示 单击“打开APP”按钮,出现提示弹窗,单击“允许”,跳转至新的应用界面。 说明 弹窗是否弹出以及弹窗效果与跳转目标…

Spring Security 6 系列之二 - 基于数据库的用户认证和认证原理

之所以想写这一系列,是因为之前工作过程中使用Spring Security,但当时基于spring-boot 2.3.x,其默认的Spring Security是5.3.x。之后新项目升级到了spring-boot 3.3.0,结果一看Spring Security也升级为6.3.0,关键是其风…

[笔记] Ubuntu Server 24.04安装MySql8,并配置远程连接

1、MySql安装 #更新列表 sudo apt update ​ #安装mysql sudo apt install mysql-server ​ #运行状态 mysql sudo service mysql status ​ # 安装完成,已自动启动,该步可以不用 启动 mysql sudo /etc/init.d/mysql start ​ # 该步骤可以不配置&…

软件开发中 Bug 为什么不能彻底消除

在软件开发中,Bug无法彻底消除的原因主要包括:软件复杂度高、人员认知与沟通受限、需求和环境不断变化、工具与测试覆盖不足、经济与时间成本制约。其中“需求和环境不断变化”尤为关键,因为在实际开发中,业务逻辑随着市场与用户反…

使用ElasticSearch实现全文检索

文章目录 全文检索任务描述技术难点任务目标实现过程1. java读取Json文件,并导入MySQL数据库中2. 利用Logstah完成MySQL到ES的数据同步3. 开始编写功能接口3.1 全文检索接口3.2 查询详情 4. 前端调用 全文检索 任务描述 在获取到数据之后如何在ES中进行数据建模&a…

《拉依达的嵌入式\驱动面试宝典》—C/CPP基础篇(三)

《拉依达的嵌入式\驱动面试宝典》—C/CPP基础篇(三) 你好,我是拉依达。 感谢所有阅读关注我的同学支持,目前博客累计阅读 27w,关注1.5w人。其中博客《最全Linux驱动开发全流程详细解析(持续更新)-CSDN博客》已经是 Linux驱动 相关内容搜索的推荐首位,感谢大家支持。 《拉…

调用完BAPI_PO_CREATE1创建采购订单之后,如果不调用BAPI_TRANSACTION_COMMIT,数据库里面没有数

在调用完BAPI_PO_CREATE1创建采购订单之后,如果不调用BAPI_TRANSACTION_COMMIT,那么就无法生成真正的采购订单号,在数据库里面没有数 运行结果 特别注意