1、环境
JDK版本:1.8.0_412
python版本:3.10.6
apache-flink版本:1.20.0
flink版本:1.20
kafka版本:kafka_2.12-3.1.1
flink-sql-connector-kafka版本:3.3.0-1.20
2、执行python-flink脚本
从kafka的demo获取消息,并将其中的a字段存入kafka的test_kafka_topic内,并打印sum(b)的值
from pyflink.table import TableEnvironment, EnvironmentSettings
def log_processing():
# 创建流处理环境
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
# 设置 Kafka 连接器 JAR 文件的路径
# 确保 JAR 文件确实存在于指定路径,并且与 Flink 版本兼容
t_env.get_config().get_configuration().set_string(
"pipeline.jars",
"file:///home/data/flink/flink-1.20.0/lib/flink-sql-connector-kafka-3.3.0-1.20.jar"
)
# 定义源表 DDL
source_ddl = """
CREATE TABLE source_table(
a VARCHAR,
b INT -- 如果 b 字段不重要,可以考虑从源表中移除它
) WITH (
'connector' = 'kafka',
'topic' = 'demo',
'properties.bootstrap.servers' = '192.168.15.130:9092',
'properties.group.id' = 'test_3',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
# 定义目标表 DDL
sink_ddl = """
CREATE TABLE sink_table(
a VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'test_kafka_topic',
'properties.bootstrap.servers' = '192.168.15.130:9092',
'format' = 'json'
)
"""
# 执行 DDL 语句创建表
t_env.execute_sql(source_ddl)
#table = t_env.from_path("sql_source")
#table.execute().print()
table_result = t_env.execute_sql("select sum(b) sb from source_table")
table_result.print()
t_env.execute_sql(sink_ddl)
# 执行 SQL 查询并将结果插入到目标表
# 注意:wait() 方法会阻塞,直到插入操作完成(在流处理中通常是无限的)
t_env.sql_query("SELECT a FROM source_table") \
.execute_insert("sink_table").wait() # 考虑是否真的需要 wait()
if __name__ == '__main__':
log_processing()
python3 KafkaSource.py
3、启动kafka生产者
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-producer.sh --broker-list 192.168.15.130:9092 --topic demo
输入模拟数据进行测试
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 42}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
>{"a": "example_string_1672531199", "b": 4}
可以看到sum(b)值已输出
4、启动kafka消费者
查看往test_kafka_topic插入的a字段数据已被消费
/usr/local/kafka_2.12-3.1.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.15.130:9092 --from-beginning --topic test_kafka_topic