流批一体计算引擎-10-[Flink]中的常用算子和DataStream转换

pyflink 处理 kafka数据
在这里插入图片描述

1 DataStream API 示例代码

从非空集合中读取数据,并将结果写入本地文件系统。

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink

def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    ds = env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.add_sink(StreamingFileSink
                .for_row_format('output', Encoder.simple_string_encoder())
                .build())
    env.execute("tutorial_job")

if __name__ == '__main__':
    tutorial()

(1)DataStream API应用程序首先需要声明一个执行环境
StreamExecutionEnvironment,这是流式程序执行的上下文。
后续将通过它来设置作业的属性(例如默认并发度、重启策略等)、创建源、并最终触发作业的执行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

(2)声明数据源
一旦创建了 StreamExecutionEnvironment 之后,可以使用它来声明数据源。
数据源从外部系统(如Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到Flink作业里。
为了简单起见,本次使用元素集合作为数据源。
这里从相同类型数据集合中创建数据流(一个带有 INT 和 STRING 类型字段的ROW类型)。

ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

(3)转换操作或写入外部系统
现在可以在这个数据流上执行转换操作,或者使用 sink 将数据写入外部系统。
本次使用StreamingFileSink将数据写入output文件目录中。

ds.add_sink(StreamingFileSink
   .for_row_format('output', Encoder.simple_string_encoder())
   .build())

(4)执行作业
最后一步是执行真实的 PyFlink DataStream API作业。
PyFlink applications是懒加载的,并且只有在完全构建之后才会提交给集群上执行。
要执行一个应用程序,只需简单地调用env.execute(job_name)。

env.execute("tutorial_job")

在这里插入图片描述

2 自定义转换函数的三种方式

三种方式支持用户自定义函数。

2.1 Lambda函数[简便]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())

mapped_stream.print()
env.execute("tutorial_job")

2.2 python函数[简便]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

def my_map_func(value):
   return value + 1

def main():
   env = StreamExecutionEnvironment.get_execution_environment()
   env.set_parallelism(1)

   data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
   mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())

   mapped_stream.print()
   env.execute("tutorial_job")

if __name__ == '__main__':
   main()

2.3 接口函数[复杂]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, MapFunction


class MyMapFunction(MapFunction):

   def map(self, value):
      return value + 1


def main():
   env = StreamExecutionEnvironment.get_execution_environment()
   env.set_parallelism(1)

   data_stream = env.from_collection([1, 2, 3, 41, 5], type_info=Types.INT())
   mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())

   mapped_stream.print()
   env.execute("tutorial_job")

if __name__ == '__main__':
   main()

3 常用算子

参考官网算子

3.1 map【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())

mapped_stream.print()
env.execute("tutorial_job")

在这里插入图片描述

3.2 flat_map【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
out = data_stream.flat_map(lambda x: x.split(' '), output_type=Types.STRING())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.3 filter【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件


def my_func(value):
    if value % 2 == 0:
        return value
    
    
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
filtered_stream = data_stream.filter(my_func)

filtered_stream.print()
env.execute("tutorial_job")

3.4 window_all【DataStream->AllWindowedStream】

根据某些特征(例如,最近 100毫秒秒内到达的数据)对所有流事件进行分组。
所有的元素。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
all_window_stream = data_stream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))

3.4.1 apply【AllWindowedStream->DataStream】

将通用 function 应用于整个窗口。

from typing import Iterable

from pyflink.common import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import AllWindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, TimeWindow

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

class MyAllWindowFunction(AllWindowFunction[tuple, int, TimeWindow]):
    def apply(self, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
        sum = 0
        for input in inputs:
            sum += input[0]
        yield sum


data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
all_window_stream = data_stream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
out = all_window_stream.apply(MyAllWindowFunction())

out.print()
env.execute("tutorial_job")

3.5 key_by【DataStream->KeyedStream】

需要结合reduce或window算子使用。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())

3.6 reduce【KeyedStream->DataStream】增量

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
out = key_stream.reduce(lambda a, b: (a[0]+b[0], a[1]))

out.print()
env.execute("tutorial_job")

在这里插入图片描述
在相同 key 的数据流上“滚动”执行 reduce。
将当前元素与最后一次 reduce 得到的值组合然后输出新值。

3.7 window【KeyedStream->WindowedStream】

在已经分区的 KeyedStreams 上定义 Window。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))

3.7.1 apply【WindowedStream->DataStream】

将通用 function 应用于整个窗口。

from typing import Iterable

from pyflink.common import Time, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import WindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, TimeWindow

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

class MyWindowFunction(WindowFunction[tuple, int, int, TimeWindow]):
    def apply(self, key: int, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:
        sum = 0
        for input in inputs:
            sum += input[0]
        yield key, sum


data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.apply(MyWindowFunction())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.7.2 reduce【WindowedStream->DataStream】

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows,TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.reduce(lambda a, b: (a[0]+b[0], a[1]))

out.print()
env.execute("tutorial_job")

在这里插入图片描述
方式二

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, ReduceFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

class MyReduceFunction(ReduceFunction):
    def reduce(self, value1, value2):
        return value1[0] + value2[0], value1[1]

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.reduce(MyReduceFunction())

out.print()
env.execute("tutorial_job")

3.8 union【DataStream*->DataStream】

将两个或多个数据流联合来创建一个包含所有流中数据的新流。
注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
out = data_stream2.union(data_stream1)

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.9 connect【DataStream,DataStream->ConnectedStream】

stream_1 = ...
stream_2 = ...
connected_streams = stream_1.connect(stream_2)

3.9.1 CoMap【ConnectedStream->DataStream】

from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
connected_stream = data_stream1.connect(data_stream2)


class MyCoMapFunction(CoMapFunction):

    def map1(self, value):
        return value[0] *100, value[1]

    def map2(self, value):
        return value[0], value[1] + 'flink'

out = connected_stream.map(MyCoMapFunction())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.9.2 CoFlatMap【ConnectedStream->DataStream】

from pyflink.datastream import StreamExecutionEnvironment, CoFlatMapFunction

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件

data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
connected_stream = data_stream1.connect(data_stream2)


class MyCoFlatMapFunction(CoFlatMapFunction):

    def flat_map1(self, value):
        for i in range(value[0]):
            yield value[0]*100

    def flat_map2(self, value):
        yield value[0] + 10

out = connected_stream.flat_map(MyCoFlatMapFunction())

out.print()
env.execute("tutorial_job")

在这里插入图片描述

4 对接kafka输入json输出json

输入{“name”:“中文”}
输出{“name”:“中文结果”}

from pyflink.common import SimpleStringSchema, WatermarkStrategy, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, \
    KafkaRecordSerializationSchema
import json

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)

brokers = "IP:9092"

# 读取kafka
source = KafkaSource.builder() \
    .set_bootstrap_servers(brokers) \
    .set_topics("flink_source") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

ds1 = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds1.print()

# 处理流程
def process_fun(line):
    data_dict = json.loads(line)
    result_dict = {"result": data_dict.get("name", "无")+"结果"}
    return json.dumps(result_dict, ensure_ascii=False)

ds2 = ds1.map(process_fun, Types.STRING())
ds2.print()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/697868.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Vue】图形验证码功能

说明: 图形验证码,本质就是一个请求回来的图片用户将来输入图形验证码,用于强制人机交互,可以抵御机器自动化攻击 (例如:避免批量请求获取短信) 需求: 动态将请求回来的 base64 图片,解析渲染…

【面试干货】聚集索引和非聚集索引区别?

【面试干货】聚集索引和非聚集索引区别? 1、聚集索引(Clustered Index)1.1 特点1.2 例子 2、非聚集索引(Nonclustered Index)2.1 特点2.2 例子 3、根本区别 💖The Begin💖点点关注,收藏不迷路&…

秋招突击——6/10——复习{(树形DP)树的最长路径、}——新作{电话号码的字母组合}

文章目录 引言复习树形DP——树的最长路径思路分析参考思路求图的最长的直径的通用方法证明 树形DP分析方法问题 参考代码使用一维数组模拟邻接表存储树形结构或者稀疏图 新作电话号码的组合思路分析参考实现 总结 引言 中间面试了两天,去上海呆了一天,…

小熊家务帮day19-day21 订单模块2(取消订单,退款功能等)

目录 1 订单退款功能1.1 需求分析1.2 接口分析1.3 退款流程分析1.4 表结构设计1.5 取消未支付订单实现1.5.1 接口开发Controller层开发Service层开发 1.5.2 接口测试 1.5 取消已支付订单实现 1 订单退款功能 1.1 需求分析 用户下单成功可以取消订单,在订单的不同状…

机器视觉系统-同轴光源大小选择技巧

同轴光源多用于检测光滑平面产品上的缺陷,同样利用上述的方法计算得出光源尺寸。 实际上,同轴光源可理解为没有孔的开孔面光,因此可等效为发光面相等的面光源,如下图: 如图所示,同轴光源的效果与开孔面光的…

【Labview】通过串口通信从上位机读取和写入数据

最近博主需要通过Labview的上位机控制一个温控仪表,主要实现在上位机读取实时温度和设定的目标温度,以及通过上位机设定目标温度。这里将其中遇到的问题和心得分享给大家,博主自己也做一个记录。 由于温控仪表采用的485通讯,modb…

王学岗鸿蒙开发(北向)——————(十)子组件修改父组件的内容与 动画

子组件修改父组件的内容 使用类似Android的回调,父组件传递给子组件一个函数 import { MyComment } from ./component/MyComment import { MyContent } from ./component/MyComtent import { MyTitleComponent } from ./component/MyTitleComponentEntry Componen…

现代x86汇编-环境安装

今天端午节,独自在家,翻阅了张银奎老师编写的《现代x86汇编语言程序设计》一书,前言部分说明书中示例代码都是用微软visual C工具编写并使用微软宏汇编(著名的MASM)编译的,好久没有用微软vc了,假…

18.2 HTTP服务器-处理函数、响应404错误

1. 处理函数 处理来自客户端的请求,并回之以特定的响应,这是处理函数的主要任务。在处理函数中,我们通常会完成如下工作: 验证请求路径 http.Request.URL.Pathhttp.NotFound(...) 当请求没有对应的处理函数时,返回4…

文章解读与仿真程序复现思路——电力自动化设备EI\CSCD\北大核心《计及电-气园区综合能源系统多重不确定性的变置信区间优化调度 》

本专栏栏目提供文章与程序复现思路,具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源…

物联网概念

物联网 物联网简介物联网体系结构物联网体系结构定义物联网体系结构设计原则物联网体系结构四层物联网体系结构感知控制层数据传输层数据处理层应用决策层 物联网关键技术感知标识技术网络与通信技术云计算技术安全技术 已有物联网相关应用架构无线传感器网络的体系结构EPC/UID…

【讲解下Chrome DevTools,什么是Chrome DevTools?】

🎥博主:程序员不想YY啊 💫CSDN优质创作者,CSDN实力新星,CSDN博客专家 🤗点赞🎈收藏⭐再看💫养成习惯 ✨希望本文对您有所裨益,如有不足之处,欢迎在评论区提出…

精神建设:为什么要学C语言以及如何学习C语言

一,为什么要学习C语言 学习C语言有以下几个重要原因: 基础性:C语言是一种非常基础的编程语言,它接近计算机硬件层面,让你能够更深入地理解计算机系统如何工作,包括内存管理、指针操作等。这对于构建坚实的…

2024年6月最新开源电视影视TVAPP原生源码和后台管理平台源码及完整教程

本套源码为本人维护更新完善半年左右的还在使用开发的源码,与市面上倒卖的残次品不一样,没有可比性,向下兼容安卓4.0,向上兼容安卓13以上TV电视系统, 完全无闪退,弹窗报错,卡死、异常死循环残次…

[FreeRTOS 基础知识] 任务调度 与 链表

文章目录 任务并行的概念RTOS如何实现多任务调度? 任务并行的概念 在生活中,经常出现一心多用的情况。比如你需要一边吃饭一边手机回复信息,这里面就存在两个任务:任务一、吃饭。任务二、手机回复信息。 假如你无法一心多用&…

数据仓库核心:事实表深度解析与设计指南

文章目录 1. 引言1.1基本概念1.2 事实表定义 2. 设计原则2.1 原则一:全面覆盖业务相关事实2.2 原则二:精选与业务过程紧密相关的事实2.3 原则三:拆分不可加事实为可加度量2.4 原则四:明确声明事实表的粒度2.5 原则五:避…

Harmony中的HAP、HAR、HSP区别

Harmony中的HAP、HAR、HSP区别 想要更加合理的开发一个企业级别的Harmony应用,那么就不得不提其中的HAP、HAR、HSP了。 前言 对于普通的用户来说,可能一个普通的应用就等于一个安装文件如安卓下的APK。但是对于Harmony应用开发工程师来讲,…

单田芳mp3百度网盘,单田芳评书下载百度云百度网盘

单老的评书还注重情感的表达。他善于运用声音、语气、语调等手段,将人物的情感刻画得淋漓尽致。无论是喜怒哀乐,他都能准确地把握人物的情感变化,并通过自己的表演将其传递给听众。这种情感的传递,使得听众能够更加深入地理解故事…

构建大语言模型友好型网站

以大语言模型为代表的AI 技术迅速发展,将会影响原有信息网络的方式。其中一个明显的趋势是通过chatGPT 对话代替搜索引擎和浏览器来获取信息。 互联网时代,主要是通过网站(website)提供信息。网站主要为人类阅读的方式构建的。主要…

Vitis HLS 学习笔记--聚合与解聚-AXI主接口

目录 1. 简介 2. 用法及语法 3. 详细解读 4. 总结 1. 简介 在使用 Vitis HLS 工具进行硬件设计时,如果你在接口上使用了结构体,工具会自动把结构体里的所有元素组合成一个整体。就像把一堆零件组装成一个玩具一样。这样做的好处是,数据可…