0基础学习PyFlink——使用DataStream进行字数统计

大纲

  • source
  • Map
    • Splitting
    • Mapping
  • Reduce
    • Keying
    • Reducing
  • 完整代码
  • 结构
  • 参考资料

在《0基础学习PyFlink——模拟Hadoop流程》一文中,我们看到Hadoop在处理大数据时的MapReduce过程。
在这里插入图片描述
本节介绍的DataStream API,则使用了类似的结构。

source

为了方便,我们依然使用from_collection从内存中读取数据。
和使用Table API类似,我们给from_collection传递的第二参数是每行数据类型。本例中是String,即“A C B”的类型。

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

word_count_data = ["A C B",
                   "A E B",
                   "E C D"]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.STRING()
    # define the source
    source = env.from_collection(word_count_data, source_type_info)

可以使用下面指令输出source内容

    source.print()
A C B
A E B
E C D

Map

和上图一样,Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元,和生成map结构。

Splitting

    def split(line):
        for s in line.split():
            yield s
            
    splitted = source.flat_map(split) 

上述splitted的结构输出是

A
C
B
A
E
B
E
C
D

Mapping

Mapping的操作就是将之前的数组结构转换成map结构

mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))

mapped的输出值如下,可以看到它还是按我们输入数据的顺序排列的。

(A,1)
(C,1)
(B,1)
(A,1)
(E,1)
(B,1)
(E,1)
(C,1)
(D,1)

Reduce

Keying

这一步对应于上图中的Shuffling&Sorting,它会将相同key的数据进行分区,以供后面reducing操作使用。

    keyed=mapped.key_by(lambda i: i[0]) 

可以看到keyed数据已经经过排序和聚合了。

(A,1)
(A,1)
(B,1)
(B,1)
(C,1)
(C,1)
(D,1)

Reducing

 reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))

reduce的方法有如下注释

Applies a reduce transformation on the grouped data stream grouped on by the given
key position. The ReduceFunction will receive input values based on the key value.
Only input values with the same key will go to the same reducer.

特别是最后一句非常有用“Only input values with the same key will go to the same reducer”(只有相同Key的输入数据才会进入相同的Reducer中)。这句话意味着上述Keyed的数据会被分组执行,于是就不会出现计算错乱。

(A,2)
(B,2)
(C,2)
(D,1)
(E,2)

完整代码

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

word_count_data = ["A C B",
                   "A E B",
                   "E C D"]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.STRING()
    # define the source
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    def split(line):
        for s in line.split():
            yield s
            
    splitted = source.flat_map(split) 
    # splitted.print()
    mapped=splitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))
    # mapped.print()
    keyed=mapped.key_by(lambda i: i[0]) 
    # keyed.print()
    reduced=keyed.reduce(lambda i, j: (i[0], i[1] + j[1]))

    # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

结构

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/114534.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用 Docker 搭建一个“一主一从”的 MySQL 读写分离集群(超详细步骤)

目录 一、前提二、MySQL 生产安装1,拉取mysql2,查看mysql镜像3, 启动 mysql 容器4,修改mysql的中文编码5,查看验证mysql的中文编码 三、Mysql主机 mysql_master 的安装与配置1, 拷贝master容器2&#xff0c…

stable-diffusion 电商领域prompt测评集合

和GhostReivew一个思路,还是从比较好的图片或者是civitai上找一些热门的prompt,从小红书上也找到了不少的prompt,lexica.art上也有不少,主要是为了电商场景的一些测评: 小红书、civitai、Lexica、Liblib.ai、 depth o…

在钣金加工领域,迅镭激光切割机广泛使用的原因和优点何在?

激光切割工艺和激光切割设备正在被广泛的板材加工企业逐渐理解并接受,凭借其高效率的加工、高精度的加工、优质的切割断面、三维切割能力等诸多优势,逐步取代了传统的钣金切割设备。 苏州迅镭激光科技有限公司推出的激光切割设备的柔性化程度高&#xff…

降低边际成本:跨境电商的利润增长策略

在竞争激烈的跨境电商领域,降低成本是提高利润的关键。边际成本,即生产或销售一件额外商品所需的额外成本,在跨境电商中起到至关重要的作用。在本文中,我们将探讨降低边际成本的策略,以实现跨境电商的利润增长。 供应链…

centos7中实现多个python版本共存(python2.7、python3.6、python3.9等)

问题描述: 开发环境中,新项目需要在python3.9及以上版本开发,为了不影响之前运行在python3.6上的项目,就需要增加一个python3.9环境。线上直接使用docker部署就可以了。 解决办法 前提:python2.7和python3.6之前已经…

计算机网络 第五章传输层

文章目录 1 传输层的功能2 传输层两种协议:UDP和TCP3 端口和端口号4 UDP数据报特点和首部格式5 UDP校验6 TCP协议的特点7 TCP报文段首部格式8 TCP连接:三次握手建立连接9 TCP连接:四次挥手释放连接10 TCP可靠传输11 TCP流量控制12 TCP拥塞控制…

快速入手maven

文章目录 Maven介绍Maven安装和配置基于IDEA的Maven工程创建梳理Maven工程GAVP属性Idea构建Maven JavaSE工程Idea构建Maven JavaEE工程1. 手动创建2. 插件方式创建 Maven工程项目结构说明Maven核心功能依赖和构建管理依赖传递和冲突依赖导入失败场景和解决方案扩展构建管理和插…

时间复杂度的计算技巧-算法模型中的时间复杂度如何计算,有哪些技巧呢

大家好,我是微学AI,今天给大家介绍一下时间复杂度的计算技巧-算法模型中的时间复杂度如何计算,有哪些技巧呢,算法的时间复杂度是评估算法性能和效率的一种方式,它表示算法需要执行多少次基本操作才能完成其任务&#x…

k8s-服务网格实战-入门Istio

istio-01.png 背景 终于进入大家都比较感兴趣的服务网格系列了,在前面已经讲解了: 如何部署应用到 kubernetes服务之间如何调用如何通过域名访问我们的服务如何使用 kubernetes 自带的配置 ConfigMap 基本上已经够我们开发一般规模的 web 应用了&#xf…

app逆向入门之车智赢

声明:本文仅限学习交流使用,禁止用于非法用途、商业活动等。否则后果自负。如有侵权,请告知删除,谢谢!本教程也没有专门针对某个网站而编写,单纯的技术研究 目录 案例分析技术依赖参数分析效果展示代码分享…

电压放大器可用于什么场合

电压放大器是电子器件中常见的一种放大器类型,它可以将输入信号的电压放大到更大的幅度,以满足特定应用的需求。电压放大器广泛应用于多个领域和场合,下面将详细介绍一些使用电压放大器的场景。 音频放大器:音频放大器是电压放大器…

Spark的主要概念

文章目录 🔊博主介绍🥤本文内容🍊 1. RDD🍊 2. Spark SQL🍊 3. Spark Streaming🍊 4. MLlib🍊 5. GraphX🍊 总结 📢文章总结📥博主目标 🔊博主介绍…

linux——网络套接字编程

目录 一.简单了解TCP和UDP协议 二.网络字节序 三.socket常见的编程接口 1.介绍接口 2.sockaddr结构 四.简单的UDP网络程序 1.recvfrom和sendto 2.server.cc 3.client.cc 五.简单的TCP通信 1.client.cc 2.server.cc 一.简单了解TCP和UDP协议 此处我们先对TCP(Transm…

零日漏洞预防

零日漏洞,是软件应用程序或操作系统(OS)中的意外安全漏洞,负责修复该漏洞的一方或供应商不知道该漏洞,它们仍然未被披露和修补,为攻击者留下了漏洞,而公众仍然没有意识到风险。 零日攻击是如何…

AI“走深向实”,蚂蚁蚁盾在云栖大会发布实体产业「知识交互建模引擎」

数字化起步晚、数据分散稀疏、专业壁垒高、行业知识依赖「老师傅」,是很多传统产业智能化发展面临的难题。2023年云栖大会上,蚂蚁集团安全科技品牌蚁盾发布“知识交互建模引擎”,将实体产业知识与AI模型有机结合,助力企业最快10分…

uniapp subNvue 写的视频播放

文件下载地址 https://download.csdn.net/download/weixin_47517731/88500016https://download.csdn.net/download/weixin_47517731/88500016 1:在pages.json中配置视频播放页面 {/* 视频详情页面 */"path": "pages/detail-video/detail","style&q…

AD1255/AD1256硬件SPI开发实战与跳坑过程

AD1255/AD1256硬件SPI开发实战与跳坑过程 以上图片我们可以知道在t17阶段,数据是不能被读取的。另外最小是16个τCLKIN,具体是多少这个跟你配置的DATA_rate的设置有关系。 1.6 同步SYNC的时序 要同步SYNC,要么采用管脚SYNC,要么…

云原生环境下JAVA应用容器JVM内存如何配置?—— 筑梦之路

Docker环境下的JVM参数非定值配置 —— 筑梦之路_docker jvm设置-CSDN博客 之前简单地记录过一篇,这里在之前的基础上更加细化一下。 场景说明 使用Java开发且设置的JVM堆空间过小时,程序会出现系统内存不足OOM(Out of Memory)的…

词典查询工具django-mdict

什么是 django-mdict ? django-mdict 不是词典软件,是词典查询的脚本工具,主要目的是解决词典数量多,手机容量不足的问题,是对其他词典软件局域网在线查询功能的补充,是用 django 实现的 mdict 词典查询工具…

微信小程序上传图片和上传视频的组件失效

微信小程序上传图片和上传视频的组件失效 今天公司的小程序展示图片和视频文字的页面上传图片组件突然失效,之前用的好好的,突然所有使用都都发现用不了,以为是代码出现问题,反复查了很久。换了一个openid居然就可以了&#xff0…