PyFlink使用教程,Flink,Python,Java

环境准备

环境要求

Java 11
Python 3.7, 3.8, 3.9 or 3.10

文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/installation/

打开 Anaconda3 Prompt

> java -version
java version "11.0.22" 2024-01-16 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.22+9-LTS-219)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.22+9-LTS-219, mixed mode)

> python --version
Python 3.8.8

> conda env list
# conda environments:
#
base                  *  d:\ProgramData\Anaconda3
tensorflow2.4            d:\ProgramData\Anaconda3\envs\tensorflow2.4

> conda create -n PyFlink-1.17.1 python==3.8.8

> conda activate PyFlink-1.17.1

> python -m pip install apache-flink==1.17.1

> conda list
# packages in environment at d:\ProgramData\Anaconda3\envs\PyFlink-1.17.1:
#
# Name                    Version                   Build  Channel
apache-beam               2.43.0                   pypi_0    pypi
apache-flink              1.17.1                   pypi_0    pypi
apache-flink-libraries    1.17.1                   pypi_0    pypi
avro-python3              1.9.2.1                  pypi_0    pypi
ca-certificates           2023.12.12           haa95532_0    defaults
certifi                   2023.11.17               pypi_0    pypi
charset-normalizer        3.3.2                    pypi_0    pypi
cloudpickle               2.2.0                    pypi_0    pypi
crcmod                    1.7                      pypi_0    pypi
dill                      0.3.1.1                  pypi_0    pypi
docopt                    0.6.2                    pypi_0    pypi
fastavro                  1.4.7                    pypi_0    pypi
fasteners                 0.19                     pypi_0    pypi
grpcio                    1.60.0                   pypi_0    pypi
hdfs                      2.7.3                    pypi_0    pypi
httplib2                  0.20.4                   pypi_0    pypi
idna                      3.6                      pypi_0    pypi
numpy                     1.21.6                   pypi_0    pypi
objsize                   0.5.2                    pypi_0    pypi
openssl                   1.1.1w               h2bbff1b_0    defaults
orjson                    3.9.12                   pypi_0    pypi
pandas                    1.3.5                    pypi_0    pypi
pip                       23.3.1           py38haa95532_0    defaults
proto-plus                1.23.0                   pypi_0    pypi
protobuf                  3.20.3                   pypi_0    pypi
py4j                      0.10.9.7                 pypi_0    pypi
pyarrow                   8.0.0                    pypi_0    pypi
pydot                     1.4.2                    pypi_0    pypi
pymongo                   3.13.0                   pypi_0    pypi
pyparsing                 3.1.1                    pypi_0    pypi
python                    3.8.0                hff0d562_2    defaults
python-dateutil           2.8.2                    pypi_0    pypi
pytz                      2023.3.post1             pypi_0    pypi
regex                     2023.12.25               pypi_0    pypi
requests                  2.31.0                   pypi_0    pypi
setuptools                68.2.2           py38haa95532_0    defaults
six                       1.16.0                   pypi_0    pypi
sqlite                    3.41.2               h2bbff1b_0    defaults
typing-extensions         4.9.0                    pypi_0    pypi
urllib3                   2.1.0                    pypi_0    pypi
vc                        14.2                 h21ff451_1    defaults
vs2015_runtime            14.27.29016          h5e58377_2    defaults
wheel                     0.41.2           py38haa95532_0    defaults
zstandard                 0.22.0                   pypi_0    pypi

下载的包存储在Anaconda3\envs\PyFlink-1.17.1\Lib\site-packages

PyFlink 案例

从Flink 1.11版本开始, PyFlink 作业支持在 Windows 系统上运行,因此您也可以在 Windows 上开发和调试 PyFlink 作业了。

打开 VSCode 切换到 PyFlink-1.17.1 环境,按照 教程 写一个 Table API 的示例

learn_pyflink/tableAPIJob.py

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().set("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
            .schema(Schema.new_builder()
                    .column('word', DataTypes.STRING())
                    .build())
            .option('path', input_path)
            .format('csv')
            .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
            .schema(Schema.new_builder()
                    .column('word', DataTypes.STRING())
                    .column('count', DataTypes.BIGINT())
                    .build())
            .option('path', output_path)
            .format(FormatDescriptor.for_format('canal-json')
                    .build())
            .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
            .schema(Schema.new_builder()
                    .column('word', DataTypes.STRING())
                    .column('count', DataTypes.BIGINT())
                    .build())
            .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

要在PyFlink-1.17.1环境下运行

> python tableAPIJob.py


Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
+I[To, 1]
+I[be,, 1]
+I[or, 1]
+I[not, 1]
+I[to, 1]
+I[be,--that, 1]
+I[is, 1]
+I[the, 1]
+I[question:--, 1]
+I[Whether, 1]
+I['tis, 1]
+I[nobler, 1]
+I[in, 1]
-U[the, 1]
+U[the, 2]
+I[mind, 1]
-U[to, 1]
+U[to, 2]
.
.
.

提交 PyFlink 作业到 Flink

参考:https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/cli/#submitting-pyflink-jobs

我的 Flink 是安装在 WSL 上面的,因此也要准备环境。

下载 java-11-linux:https://download.oracle.com/otn/java/jdk/11.0.22%2B9/8662aac2120442c2a89b1ee9c67d7069/jdk-11.0.22_linux-x64_bin.tar.gz

> tar -zxf jdk-11.0.22_linux-x64_bin.tar.gz -C /usr/lib/jdk

# 生成 jre
> bin/jlink --module-path jmods --add-modules java.desktop --output jre

> vi /etc/profile

export JAVA_HOME=/usr/lib/jdk/jdk-11.0.22
export JRE_HOME=${JAVA_HOME}/jre    
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib    
export PATH=${JAVA_HOME}/bin:$PATH

> source /etc/profile 

> ls -l /usr/bin/python*

lrwxrwxrwx 1 root root       9 Mar 26  2019 /usr/bin/python3 -> python3.7
lrwxrwxrwx 1 root root      16 Mar 26  2019 /usr/bin/python3-config -> python3.7-config
-rwxr-xr-x 1 root root    1018 Mar  4  2018 /usr/bin/python3-jsondiff
-rwxr-xr-x 1 root root    3661 Mar  4  2018 /usr/bin/python3-jsonpatch
-rwxr-xr-x 1 root root    1342 May  2  2016 /usr/bin/python3-jsonpointer
-rwxr-xr-x 1 root root     398 Nov 22  2018 /usr/bin/python3-jsonschema
-rwxr-xr-x 2 root root 4877888 Apr  3  2019 /usr/bin/python3.7
lrwxrwxrwx 1 root root      33 Apr  3  2019 /usr/bin/python3.7-config -> x86_64-linux-gnu-python3.7-config
-rwxr-xr-x 2 root root 4877888 Apr  3  2019 /usr/bin/python3.7m
lrwxrwxrwx 1 root root      34 Apr  3  2019 /usr/bin/python3.7m-config -> x86_64-linux-gnu-python3.7m-config
lrwxrwxrwx 1 root root      10 Mar 26  2019 /usr/bin/python3m -> python3.7m
lrwxrwxrwx 1 root root      17 Mar 26  2019 /usr/bin/python3m-config -> python3.7m-config

> python --version

Command 'python' not found, but can be installed with:

apt install python3         # version 3.7.3-1, or
apt install python          # version 2.7.16-1
apt install python-minimal  # version 2.7.16-1

You also have python3 installed, you can run 'python3' instead.

> python3 --version
Python 3.7.3

# 已经安装了 python-3.7.3,创建一个软连接即可
> ln -s /usr/bin/python3.7 /usr/local/bin/python

> python --version
Python 3.7.3

# 设置镜像源,否则会非常慢
> python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip
> pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

# 启动 Flink-1.17.1
> bin/start-cluster.sh

除此之外,在 WSL 上还需要安装有此python脚本依赖的库,也就是 apache-flink 库。因为 Flink 需要调用 python 命令来解析 pytion 脚本,这里面涉及到 python 和 java 之间的通讯。这一块还只是在 Flink 客户端上面(bin/flink run ...),而 Flink 的 TaskManager 在运行此任务的时候还需要调用 python 解释器,因为上面代码中有UDF函数,这个函数在Java中是不存在的,关于 Flink 支持 Python 任务的内部原理后面再写一篇。

> python -m pip install apache-flink==1.17.1
> pip list

然后将代码中的.wait()调用删掉

tab.flat_map(split).alias('word') \
       .group_by(col('word')) \
       .select(col('word'), lit(1).count) \
       .execute_insert('sink')

提交任务

> ./bin/flink run --python /mnt/d/dev/php/magook/trunk/server/learn-python/learn_pyflink/tableAPIJob.py

Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID a18e581d16785a9872336073efdf5df0

来到 webUI 查看任务

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/358220.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

信息安全考证攻略

🔥在信息安全领域,拥有相关的证书不仅能提升自己的专业技能,更能为职业生涯增添不少光彩。下面为大家盘点了一些国内外实用的信息安全证书,让你一睹为快! 🌟国内证书(认证机构:中国信…

网工,这才是跳纤的正确姿势!

晚上好,我的网工朋友。 当你们看到下面这张图,内心是什么感想? 这时你是不是巴不得把所有线全部拔了,来重新整一遍哈哈哈哈。那话说到这,到底该如何跳纤呢?有没有什么秘诀呢?遵循什么原则&#…

GLOBALCHIP GC3909Pin to Pin兼容A3909/allegro电机驱动芯片产品参数分析,应用于摇头机,舞台灯,打印机,白色家电等

GLOBALCHIP GC3909 12V H 桥驱动器芯片替代A3909/Allegro产品概述: GC3909是一款双通道12V直流电机驱动芯片,为摄像机、消费类产品、玩具和其他低压或者电池供电的运动控制类应用提供了集成的电机驱动解决方案。芯片一般用来驱动两个直流电机或者驱动一个步进电机。…

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之DataPanel组件

鸿蒙(HarmonyOS)项目方舟框架(ArkUI)之DataPanel组件 一、操作环境 操作系统: Windows 10 专业版、IDE:DevEco Studio 3.1、SDK:HarmonyOS 3.1 二、DataPanel组件 数据面板组件,用于将多个数据占比情况使用占比图进…

网络安全全栈培训笔记(59-服务攻防-中间件安全CVE复现lSApacheTomcataNginx)

第59天 服务攻防-中间件安全&CVE复现&lS&Apache&Tomcata&Nginx 知识点: 中间件及框架列表: lIS,Apache,Nginx,Tomcat,Docker,Weblogic,JBoos,WebSphere,Jenkins, GlassFish,Jira,Struts2,Laravel,Solr,Shiro,Thinkphp,Sprng,Flask,…

Linux实验记录:使用iptables

前言: 本文是一篇关于Linux系统初学者的实验记录。 参考书籍:《Linux就该这么学》 实验环境: VmwareWorkStation 17——虚拟机软件 RedHatEnterpriseLinux[RHEL]8——红帽操作系统 备注: 防火墙作为公网与内网的屏障&#…

【linux】磁盘空间不足-常用排查和处理命令

【linux】磁盘空间不足-常用排查和处理命令 1.通查一下 df -h #查看服务器磁盘空间情况 du -hs * 2>/dev/null #列出各目录所占空间大小 或 du -h -d 1 2>/dev/null #列出各目录所占空间大小 1.1情况一 df 磁盘空间和du 目录空间占用相等&#xff0c…

C++中 this指针、构造函数、析构函数

1.this指针 我们定义一个日期类来举例子 对于上述类,有这样一个问题,Date类中有Init和Print这两个成员函数,函数体中没有关于不同对象的区分,那d1调用函数的时候,编译器是如和来确定d1而不是d2呢?C通过引入…

Linux线程安全

Linux线程安全 Linux线程互斥进程线程间的互斥相关背景概念互斥量mutex互斥量的接口 可重入VS线程安全常见锁概念死锁的四个必要条件 Linux线程同步条件变量 Linux线程互斥 进程线程间的互斥相关背景概念 临界资源和临界区 进程之间如果要进行通信我们需要先创建第三方资源&a…

虚拟机安装Centos8.5

记得看目录哦! 附件1. 新建虚拟机2. 安装Centos8.5 附件 安装包自行下载 https://mirrors.aliyun.com/centos/8/isos/x86_64/ 1. 新建虚拟机 2. 安装Centos8.5 启动虚拟机–选择第一个install Centos8.5 记得接收许可证

VUE3子表格嵌套分页查询互相干扰的问题解决

VUE3在表格中嵌套子表格子表格的分页查询互相干扰的问题解决 简单嵌套 如果不需要做子表格的分页查询,那么可以直接在主表格中嵌套子表格,有两种方式;一种是主表格加载的同时加载子表格数据,另一种是点击展开时加载子表格数据&…

(2024,初始化原型嵌入,扩散模型微调,类别特征正则化,对象特定损失)使用原型嵌入对文本到图像扩散进行对象驱动的单样本微调

Object-Driven One-Shot Fine-tuning of Text-to-Image Diffusion with Prototypical Embedding 公和众和号:EDPJ(进 Q 交流群:922230617 或加 VX:CV_EDPJ 进 V 交流群) 目录 0. 摘要 3. 方法 3.1 概述 3.2 LDM …

面试官要你介绍项目,怎么说?

🔥 交流讨论:欢迎加入我们一起学习! 🔥 资源分享:耗时200小时精选的「软件测试」资料包 🔥 教程推荐:火遍全网的《软件测试》教程 📢欢迎点赞 👍 收藏 ⭐留言 &#x1…

Golang数据结构性能优化实践

仅仅通过对struct字段重新排序,优化内存对齐方式,就可以获得明显的内存和执行效率提升。原文: How to Speed Up Your Struct in Golang Mike Pexels 如果你有Golang开发经验,一定定义过struct类型。 但可能你不知道,通过简单的重新…

数据据库八之 视图、触发器、事务

【零】数据准备 【1】创建表 (1)部门表 d_id是部门的编号d_name是部门的名字 # 确保表不存在 drop table if exists department; # 创建表 create table department( d_id int auto_increment primary key, d_name varchar(6) )auto_increment 501 …

【linux|java应用报错】Cannot allocate memory

启动一个java应用报Cannot allocate memory,并且会生产一个hs_ess_pid.log文件。 文件内容为: #内存不足,Java运行时环境无法继续。 #本机内存分配(mmap)无法映射4294967296字节以提交保留内存。 【排查】 1、尝试使…

Mysql-事务(隔离级别,事务底层原理,MVCC)

什么是事务?有哪些特性? 事务:事务指的是逻辑上的一组操作,组成这组操作的各个单元要么全都成功,要么全都失败。 事务特性: 原子性(Atomicity): 原子性是指事务是一个不…

Python tkinter (11) —— Frame控件

本文主要是Python tkinter Frame框架控件介绍及使用简单示例。 tkinter系列文章 python tkinter窗口简单实现 Python tkinter (1) —— Label标签 Python tkinter (2) —— Button标签 Python tkinter (3) —— Entry标签 Python tkinter (4) —— Text控件 Python tkint…

大健康行业千城万企信用建设工作启动大会在京召开

9月19日,为响应商务部、中宣部、国家发改委等13个部门共同举办的“诚信兴商宣传月”活动,中国国际电子商务中心所属北京国富泰信用管理有限公司联合北京华商国医堂集团及旗下东方岐黄商学院,北京华商国医堂中医药研究院举办的共筑信用月&…

Mov转MP4怎么转换?如何播放mov视频?

MOV文件格式的使用场景 MOV文件格式以其支持多种媒体数据类型的特性而闻名,包括视频、音频、文本、动画等。它常用于存储包含视频剪辑、电影、音频轨道等多媒体元素的文件。由于其在质量和编辑方面的优越性,MOV文件在电影制作、广告宣传、多媒体演示等领…