Flink流批一体计算(16):PyFlink DataStream API

目录

概述

Pipeline Dataflow

代码示例WorldCount.py

执行脚本WorldCount.py


概述

Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。

用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成。

Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。

当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。

 FlinkKafkaConsumer是一个Source OperatorMapKeyByTimeWindowApplyTransformation OperatorRollingSink是一个Sink Operator

Pipeline Dataflow

Flink中,程序是并行和分布式的方式运行。一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask

Flink内部有一个优化的功能,根据上下游算子的紧密程度来进行优化。

紧密度低的算子则不能进行优化,而是将每一个Operator Subtask放在不同的线程中独立执行。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度(分区总数)等于生成它的Operator的并行度。

 紧密度高的算子可以进行优化,优化后可以将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。

图中上半部分表示的是将Source和map两个紧密度高的算子优化后串成一个Operator Chain,实际上一个Operator Chain就是一个大的Operator的概念。图中的Operator Chain表示一个Operator,keyBy表示一个Operator,Sink表示一个Operator,它们通过Stream连接,而每个Operator在运行时对应一个Task,也就是说图中的上半部分有3个Operator对应的是3个Task。

图中下半部分是上半部分的一个并行版本,对每一个Task都并行化为多个Subtask,这里只是演示了2个并行度,Sink算子是1个并行度。

代码示例WorldCount.py

在本章中,你将学习如何使用 PyFlink 和 DataStream API 构建一个简单的流式应用程序。

编写一个简单的 Python DataStream 作业。

该程序读取一个 csv 文件,计算词频,并将结果写到一个结果文件中。

import argparse
import logging
import sys
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)


word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)
    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)


    def split(line):
        yield from line.split()
    # compute word count


    ds = ds.flat_map(split) \
           .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
           .key_by(lambda i: i[0]) \
           .reduce(lambda i, j: (i[0], i[1] + j[1]))
    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()



if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    word_count(known_args.input, known_args.output)

执行脚本WorldCount.py

python word_count.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/88366.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker微服务实战

文章目录 业务需求IDEA编写代码编写Dockerfile构建镜像运行容器网页端访问测试 业务需求 利用Docker部署应用服务,实现在网页端通过输入地址 ip:端口/hello/docker,页面显示hello docker ! IDEA编写代码 创建springboot项目 网上很多教程,此步骤省略……

校园跑腿小程序开发方案详解

校园跑腿小程序App的功能有哪些? 1、用户注册与登录 用户可以通过手机号、社交账号等方式进行注册和登录,以便使用跑腿服务。 2、下单与发布任务 用户可以发布各类跑腿任务,包括食品外卖、快递代收、文件送达、帮我买、帮我取、帮我送等等…

运放的分类、运放的参数

一、运放的分类 运放按功能分为通用运放与专用运放(高速运放、精密运放、低IB运放等)。 1.1通用运放 除廉价外,没有任何最优指标的运放。 例:uA741,LM324,TL06X,TL07X、TL08X等 国外知名运放…

Docker搭建elasticsearch+kibana测试

最近需要做大数据画像,所以先简单搭建一个eskibana学习使用,记录一下搭建过程和遇到的问题以及解决办法 1.拉取es和kibana镜像 在拉取镜像之前先搜索一下 elasticsearch发现是存在elasticsearch镜像的,我一般习惯性拉取最新镜像&#xff0c…

C#__自定义类传输数据和前台线程和后台线程

// 前台线程和后台线程 // 默认情况下,用Thread类创建的线程是前台线程。线程池中的线程总是后台线程。 // 用Thread类创建线程的时候,可以设置IsBackground属性,表示一个后台线程。 // 前台线程在主函数运行结束后依旧执行,后台线…

b树/b+树、时间轮、跳表、LSM-Tree

b树、b树:关系型数据库核心存储结构 1、为什么磁盘数据存储结构用B树、而不用红黑树 磁盘每次读取不是读一个节点、是返回一页数据。 红黑树每次遍历一个节点排除一半数据。 B树通常映射相邻的磁盘页数据。4K mysql索引一个节点隐射16k故而映射4倍,故…

【C语言】扫雷游戏(可展开)——超细教学

🚩纸上得来终觉浅, 绝知此事要躬行。 🌟主页:June-Frost 🚀专栏:C语言 🔥该篇将运用数组来实现 扫雷游戏。 目录: 🌟思路框架测试游戏 🌟测试部分函数实现&am…

云计算在线实训系统建设方案

一、 人工智能与云计算系统概述 人工智能(Artificial Intelligence,简称AI)是一种模拟人类智能的科学和工程,通过使用计算机系统来模拟、扩展和增强人类的智能能力。人工智能涉及多个领域,包括机器学习、深度学习、自然…

Web 开发 Django 管理工具

上次为大家介绍了 Django 的模型,通过模型就可以操作数据库,从而就可以改变页面的展示内容,那问题来了,我们只能通过手动编辑模型文件来配置模型吗?当然不是,Django 为我们提供了强大的工具,可以…

优化Python代理爬虫的应用

当我们在资源受限的环境中使用Python代理爬虫时,我们需要采取一些优化措施,以确保程序的高效性和稳定性。在本文中,我将分享一些关于如何优化Python代理爬虫在资源受限环境下的应用的实用技巧。 首先我们来了解,哪些情况算是资源…

GNU make系列之介绍Makefile

一.欢迎来到我的酒馆 在本章节介绍Makefile。 目录 一.欢迎来到我的酒馆二.GNU make 预览三.一个简单的Makefile四.make程序如何处理Makefile文件 二.GNU make 预览 2.1 GNU make工具会自动决定哪些程序需要被重新编译,并且执行相应的命令来重新编译程序。在本系列博…

[MyBatis系列④]核心配置文件

目录 1、简介 2、DTD 3、typeHandlers 3.1、默认类型处理器 3.2、自定义类型处理器 4、plugins ⭐MyBatis系列①:增删改查 ⭐MyBatis系列②:两种Dao开发方式 ⭐MyBatis系列③:动态SQL 1、简介 MyBatis的核心配置文件(通常命…

whisper 语音识别项目部署

1.安装anaconda软件 在如下网盘免费获取软件: 链接:https://pan.baidu.com/s/1zOZCQOeiDhx6ebHh5zNasA 提取码:hfnd 2.使用conda命令创建python3.8环境 conda create -n whisper python3.83.进入whisper虚拟环境 conda activate whisper4.…

HQL解决连续三天登陆问题

1.背景 统计连续登录天数超过3天的用户,输出信息包括:用户id,登录天数,起始时间,结束时间; 2.准备数据 -- 建表 create table if not exists user_login_3days(user_id STRING,login_date date );--插入…

01.Django入门

1.创建项目 1.1基于终端创建Django项目 打开终端进入文件路径(打算将项目放在哪个目录,就进入哪个目录) E:\learning\python\Django 执行命令创建项目 F:\Anaconda3\envs\pythonWeb\Scripts\django-admin.exe(Django-admin.exe所…

手写模拟SpringBoot核心流程(二):实现Tomcat和Jetty的切换

实现Tomcat和Jetty的切换 前言 上一篇文章我们聊到,SpringBoot中内置了web服务器,包括Tomcat、Jetty,并且实现了SpringBoot启动Tomcat的流程。 那么SpringBoot怎样自动切换成Jetty服务器呢? 接下来我们继续学习如何实现Tomcat…

Python代理池健壮性测试 - 压力测试和异常处理

大家好!在构建一个可靠的Python代理池时,除了实现基本功能外,我们还需要进行一系列健壮性测试来确保其能够稳定运行,并具备应对各种异常情况的能力。本文将介绍如何使用压力测试工具以及合适的异常处理机制来提升Python代理池的可…

linux系统(centos、ubuntu、银河麒麟服务、uos、deepin)判断程序是否已安装,通用判断方法:使用所有应用和命令的判断

前言 项目中需要判断linux服务器中是否已经安装了某个服务 方法有很多种,但是很多都不通用, 脚本代码就不容易做成统一的 解决方案 用下面的脚本代码去进行判断 用jdk测试 脚本意思如下: 输入java -version命令,将返回的字…

微服务中间件--分布式搜索ES

分布式搜索ES 11.分布式搜索 ESa.介绍ESb.IK分词器c.索引库操作 (类似于MYSQL的Table)d.查看、删除、修改 索引库e.文档操作 (类似MYSQL的数据)1) 添加文档2) 查看文档3) 删除文档4) 修改文档 f.RestClient操作索引库1) 创建索引库2) 删除索引库/判断索引库 g.RestClient操作文…

Linux之基础IO文件系统讲解

基础IO文件系统讲解 回顾C语言读写文件读文件操作写文件操作输出信息到显示器的方法stdin & stdout & stderr总结 系统文件IOIO接口介绍文件描述符fd文件描述符的分配规则C标准库文件操作函数简易模拟实现重定向dup2 系统调用在minishell中添加重定向功能 FILE文件系统…