Python 编写 Flink 应用程序经验记录(Flink1.17.1)

目录

官方API文档

提交作业到集群运行

官方示例

环境

实例处理Kafka后入库到Mysql

下载依赖

读取kafka数据

写入mysql数据


官方API文档

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/overview/

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/

提交作业到集群运行

#! /usr/bin/env python
# -*- coding: utf-8 -*-

# /opt/test_flink.py
if __name__ == "__main__":
    print("这是一个简单的测试用例")

flink 安装目录下的 examples 目录里面已经提供了一些测试案例,我们也可以直接拿它来做实验。

提交至集群

./bin/flink run -py 代码文件

通过 flink run 即可运行应用程序,由于 flink 既可运行 Java 程序、也可以运行 Python 程序,所以这里我们需要指定 -py 参数,表示运行的是 py 文件。但默认情况下解释器使用的 python2,当然如果你终端输入 python 进入的就是 python3 的话则当我没说,要是我们想指定 flink 使用 python3 解释器的话,则需要配置一个环境变量。

export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3

下面来测试一下:

./bin/flink run -py /opt/test_flink.py

很明显结果是成功的,当然这里面没有涉及到任何与 Flink 有关的内容,只是演示如何提交一个 Python 应用程序。当然 flink run 是同时支持 Java、Python 等语言的。

不管使用哪种 API 进行编程,最终客户端都会生成 JobGraph 提交到 JM 上。但毕竟 Flink 的内核是采用 Java 语言编写的,如果 Python 应用程序变成 JobGraph 对象被提交到 Flink 集群上运行的话,那么 Python 虚拟机和 Java 虚拟机之间一定有某种方式,使得 Python 可以直接动态访问 Java 中的对象、Java 也可以回调 Python 中的对象。没错,实现这一点的便是 py4j。

提交单个 py 文件知道怎么做了,但如果该文件还导入了其它文件该怎么办呢?一个项目中还会涉及到包的存在。其实不管项目里的文件有多少,启动文件只有一个,只需要把这个启动文件提交上去即可。举例说明,当然这里仍不涉及具体和 Flink 相关的内容,先把如何提交程序这一步给走通。因为不管编写的程序多复杂,提交这一步骤是不会变的。

先来看看编写的程序:

flink_test 就是主目录,里面有一个 apps 子目录和一个 main.py 文件,apps 目录里面有三个 py 文件,对应的内容分别如图所示。然后将其提交到 Flink Standalone 集群上运行,命令和提交单个文件是一样的

即使是多文件,提交方式也是相似的,输出结果表明提交成功了。

官方示例

环境

  • Java 11
  • Python 3.7, 3.8, 3.9 or 3.10
python -m pip install apache-flink==1.17.1

编写 Flink Python Table API 程序的第一步是创建 TableEnvironment。这是 Python Table API 作业的入口类。

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")

接下来,我们将介绍如何创建源表和结果表。

t_env.create_temporary_table(
    'source',
    TableDescriptor.for_connector('filesystem')
        .schema(Schema.new_builder()
                .column('word', DataTypes.STRING())
                .build())
        .option('path', input_path)
        .format('csv')
        .build())
tab = t_env.from_path('source')

t_env.create_temporary_table(
    'sink',
    TableDescriptor.for_connector('filesystem')
        .schema(Schema.new_builder()
                .column('word', DataTypes.STRING())
                .column('count', DataTypes.BIGINT())
                .build())
        .option('path', output_path)
        .format(FormatDescriptor.for_format('canal-json')
                .build())
        .build())

你也可以使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表:

my_source_ddl = """
    create table source (
        word STRING
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format(input_path)

my_sink_ddl = """
    create table sink (
        word STRING,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'canal-json',
        'path' = '{}'
    )
""".format(output_path)

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

上面的程序展示了如何创建及注册表名分别为 source 和 sink 的表。 其中,源表 source 有一列: word,该表代表了从 input_path 所指定的输入文件中读取的单词; 结果表 sink 有两列: word 和 count,该表的结果会输出到 output_path 所指定的输出文件中。

接下来,我们介绍如何创建一个作业:该作业读取表 source 中的数据,进行一些变换,然后将结果写入表 sink

最后,需要做的就是启动 Flink Python Table API 作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行。

@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):
    for s in line[0].split():
        yield Row(s)

# 计算 word count
tab.flat_map(split).alias('word') \
    .group_by(col('word')) \
    .select(col('word'), lit(1).count) \
    .execute_insert('sink') \
    .wait()

该教程的完整代码如下:

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().set("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .build())
                .option('path', input_path)
                .format('csv')
                .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format(FormatDescriptor.for_format('canal-json')
                        .build())
                .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

接下来,可以在命令行中运行作业(假设作业名为 word_count.py):

python word_count.py

上述命令会构建 Python Table API 程序,并在本地 mini cluster 中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例。

最后,你可以得到如下运行结果:

实例处理Kafka后入库到Mysql

下载依赖

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

读取kafka数据

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import logging

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

from pyflink.common import Row
from pyflink.datastream import FlatMapFunction

def read_kafka():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///D:/安技汇/运营平台/DataManage/flink-sql-connector-kafka-1.17.1.jar")

    source = KafkaSource.builder() \
        .set_bootstrap_servers("172.16.12.128:9092") \
        .set_topics("test") \
        .set_group_id("my-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()
        # 从消费组提交的位点开始消费,不指定位点重置策略
        #.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \
        # 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
        #.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \
        # 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
        #.set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \
        # 从最早位点开始消费
        #.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        # 从最末尾位点开始消费
        #.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        #.set_property("partition.discovery.interval.ms", "10000")  # 每 10 秒检查一次新分区
        #.set_property("security.protocol", "SASL_PLAINTEXT") \
        #.set_property("sasl.mechanism", "PLAIN") \
        #.set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")

    kafka_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    kafka_stream.print()

    env.execute("Source")


if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    read_kafka()

写入mysql数据

没通,待补充。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/104848.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

软件工程17-18期末试卷

2.敏捷开发提倡一个迭代80%以上的时间都在编程,几乎没有设计阶段。敏捷方法可以说是一种无计划性和纪律性的方法。错 敏捷开发是一种软件开发方法论,它强调快速响应变化、持续交付有价值的软件、紧密合作和适应性。虽然敏捷方法鼓励迭代开发和灵活性&…

腾讯云国际-如何使用对象存储COS在 CKafka 控制台创建数据异步拉取任务?腾讯云代充

操作场景 Datahub 支持接入各种数据源产生的不同类型的数据,统一管理,再分发给下游的离线/在线处理平台,构建清晰的数据通道。 本文以 COS 数据为例介绍如何在 CKafka 控制台创建数据异步拉取任务,并对任务进行修改配置&#xf…

CVE-2023-46227 Apache inlong JDBC URL反序列化漏洞

项目介绍 Apache InLong(应龙)是一站式、全场景的海量数据集成框架,同时支持数据接入、数据同步和数据订阅,提供自动、安全、可靠和高性能的数据传输能力,方便业务构建基于流式的数据分析、建模和应用。 项目地址 h…

Python 中的 Pexpect

我们将通过示例介绍Python中的Pexpect。 Python 中的 Pexpect Python 是一种非常流行的语言,用于数据科学和机器学习。 它是一种非常强大的语言,因为 Python 具有可用于不同目的的内置库。 在这篇文章中,我们将研究Python中的pexpect。 Pex…

性能测试 —— Jmeter 常用三种定时器!

1、同步定时器 位置:HTTP请求->定时器->Synchronizing Timer 当需要进行大量用户的并发测试时,为了让用户能真正的同时执行,添加同步定时器,用户阻塞线程,知道线程数达到预先配置的数值,才开始执行…

k8s客户端配置

K8s客户端安装 前提 K8s服务部署成功,如下 角色 IP地址 操作系统 主机名 Kubernetes版本 master节点 172.16.4.167 CentOS 7.9 k8s-master01 v1.28.2 工作节点1 172.16.4.168 CentOS 7.9 k8s-worker01 v1.28.2 工作节点2 172.16.4.169 CentOS 7.9…

竞赛选题 深度学习卷积神经网络垃圾分类系统 - 深度学习 神经网络 图像识别 垃圾分类 算法 小程序

文章目录 0 简介1 背景意义2 数据集3 数据探索4 数据增广(数据集补充)5 垃圾图像分类5.1 迁移学习5.1.1 什么是迁移学习?5.1.2 为什么要迁移学习? 5.2 模型选择5.3 训练环境5.3.1 硬件配置5.3.2 软件配置 5.4 训练过程5.5 模型分类效果(PC端) 6 构建垃圾…

2.7每日一题(分段函数不定积分)

1、分段函数不定积分是分段做,但分段得出原函数后必须写成C1,C2,C3;(直接全部写成C是错误的) 2、又因为原函数可导,可导必连续,所以根据原函数在分界点的连续性找到C1,C…

吃豆人C语言开发—Day2 需求分析 流程图 原型图

目录 需求分析 流程图 原型图 主菜单: 设置界面: 地图选择: 游戏界面: 收集完成提示: 游戏胜利界面: 游戏失败界面 死亡提示: 这个项目是我和朋友们一起开发的,在此声明一下…

Qt 窗口的尺寸

本文通过多个案例,详细说明关于Qt窗体尺寸的一些重要问题 默认尺寸 对于一个Qt的窗口(继承于QWidget),获取其窗体尺寸的方法size(); 以一个Qt创建Qt Widgets Application项目的默认生成代码为基础,做如下…

实现基于 Jenkins 的多服务器打包方案

实现基于 Jenkins 的多服务器打包方案 在实际项目中,我们经常会遇到需要将一个应用程序或服务部署到不同的服务器上的需求。而使用 Jenkins 可以很方便地自动化这个过程。 设置参数 首先,我们需要设置一些参数,以便在构建过程中指定要部署…

Element UI + Vue 新增和编辑共用表单校验无法清除问题(已解决)

问题描述 在新增和编辑过程中大部分情况下 两个表单是一致的,而且编辑也有回显需要,所有绝大多数情况下 都是一个表单两个用处,但是随之而来出现了一个无法清除校验的问题,在先点击编辑后再点击新增会出现校验红字: …

防止消息丢失与消息重复——Kafka可靠性分析及优化实践

系列文章目录 上手第一关,手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析,打破面试难关 防止消息丢失与消息重复——Kafka可…

【Linux】进程优先级|进程并发概念|在vim中批量化注释

文章目录 前言tips——如何在vim中批量化注释进程更深度理解一、什么是进程优先级二、 为什么要有优先级三、Linux怎么设置优先级查看进程优先级的命令PRI and NI用top命令更改已存在进程的nice: 如何根据优先级开展调度呢?五、其他概念并发(…

css四种导入方式

1 行内样式 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title> </head> <body> <h1 style"color: blue">我是标题</h1> </body> </htm…

【Objective-C】浅析Block及其捕获机制

目录 Block的基本使用Block的声明Block的实现Block的调用 Block作为形参使用Block作为属性使用给Block起别名Block的copy Block的捕获机制auto类型的局部变量__block浅析static类型的局部变量全局变量 其他问题 Block的基本使用 什么是Block&#xff1f; Block &#xff08;块…

超市商品管理系统 毕业设计 JAVA+Vue+SpringBoot+MySQL

项目下载地址 目录 项目下载地址 一、摘要1.1 简介1.2 项目录屏 二、研究内容2.1 数据中心模块2.2 超市区域模块2.3 超市货架模块2.4 商品类型模块2.5 商品档案模块 三、系统设计3.1 用例图3.2 时序图3.3 类图3.4 E-R图 四、系统实现4.1 登录4.2 注册4.3 主页4.4 超市区域管理4…

【嵌入式开发学习】__单片机中容易造成内存泄露的几个痛点

目录 前言 一、程序运行 二、什么是内存泄露&#xff1f; 三、内存泄露的严重后果&#xff01; 四、如何定位到泄露的要点&#xff1f; 五、三大痛点 1. 访问越界 2. 栈 3. 堆 六、泄露常见的场景 1. 重新赋值 2. 首先释放父块 3. 返回值的不正确处理 七、常见的…

Azure - 机器学习:创建机器学习所需资源,配置工作区

目录 一、Azure机器学习工作区与计算实例简要介绍工作区计算实例 二、创建工作区1. 登录到 Azure 机器学习工作室2. 选择“创建工作区”3. 提供以下信息来配置新工作区&#xff1a;4. 选择“创建”以创建工作区 三、创建计算实例四、工作室实战4.1 工作室快速导览4.2 从示例笔记…

常用linux命令 linux_cmd_sheet

查看文件大小 ls -al 显示每个文件的kb大小 查看系统日志 dmesg -T | tail 在 top 命令中&#xff0c;RES 和 VIRT&#xff08;或者 total-vm&#xff09;是用来表示进程内存使用的两个不同指标&#xff0c;它们之间有以下区别&#xff1a; RES&#xff08;Resident Set Size…