Flink Python作业快速入门

Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心


import argparse
# 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数
import logging
import sys

from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,
                                           RollingPolicy)

# WatermarkStrategy: 用于生成水印(watermarks),水印是用于处理事件时间(event time)的数据流中的延迟数据的一种机制。
# Encoder: 用于定义如何将数据编码为字节序列,通常用于数据的序列化和反序列化。
# Types: 包含了 Flink 中各种数据类型的定义,用于指定数据流中数据的类型。
# StreamExecutionEnvironment: 是所有 Flink 流处理程序的入口点,用于配置和启动流处理任务。
# RuntimeExecutionMode: 定义了流处理任务的执行模式,例如批处理模式或流处理模式。
# FileSource: 用于从文件系统中读取数据源。
# StreamFormat: 定义了数据的格式,例如 CSV、JSON 等。
# FileSink: 用于将数据写入文件系统。
# OutputFileConfig: 配置输出文件的相关设置,如前缀和后缀。
# RollingPolicy: 定义了文件滚动策略,即何时创建新的输出文件。

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    """
    计算文本文件中单词的频率,并将结果输出到指定路径。

    该函数从指定的输入路径读取文本数据,进行单词频率统计,并将结果写入指定的输出路径。
    如果没有提供输入路径或输出路径,则使用默认数据或直接打印结果。

    参数:
    - input_path: 输入文本文件的路径。如果为None,则使用默认数据。
    - output_path: 输出结果的路径。如果为None,则直接打印结果。
    """
    # 获取流处理环境并设置为流处理模式,设置并行度为1
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)

    # 定义数据源
    if input_path is not None:
        # 从文件系统中读取数据
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
            .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        # 使用默认数据
        ds = env.from_collection(word_count_data)

    # 定义分割函数,将每行文本分割成单词
    def split(line):
        yield from line.split()

    # 计算单词频率
    ds = ds.flat_map(split) \
        .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
        .key_by(lambda i: i[0]) \
        .reduce(lambda i, j: (i[0], i[1] + j[1]))

    # 定义数据汇
    if output_path is not None:
        # 将结果写入文件系统
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        # 直接打印结果
        ds.print()

    # 提交作业以执行
    env.execute()


if __name__ == '__main__':
    # 配置日志输出到标准输出,设置日志级别为INFO,并格式化日志消息以仅显示消息内容
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    # 创建一个ArgumentParser对象以处理命令行参数
    parser = argparse.ArgumentParser()
    # 添加可选的命令行参数,用于指定输入和输出文件
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='要处理的输入文件。')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='要写入结果的输出文件。')

    # 获取命令行参数,排除脚本名称
    argv = sys.argv[1:]
    print("Command line arguments: ", argv)
    # 解析已知的命令行参数,并忽略未知参数
    known_args, _ = parser.parse_known_args(argv)
    print("known_args: ", known_args)

    # 调用word_count函数,传入从解析参数中获取的输入和输出文件路径
    word_count(known_args.input, known_args.output)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/933652.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数字图像处理(15):图像平移

(1)图像平移的基本原理:计算每个像素点的移动向量,并将这些像素按照指定的方向和距离进行移动。 (2)平移向量包括水平和垂直分量,可以表示为(dx,dy)&#xff…

JAVA秋招面试题精选-第一天总结

目录 分栏简介: 问题一:订单表每天新增500W条数据,分库分表应该怎么设计? 问题难度以及频率: 问题导向: 满分答案: 举一反三: 问题总结: 问题二:解释…

Rnnoise和SpeexDsp两种降噪方式有什么区别?

在蒙以CourseMaker 7.0软件中,增加了两种降噪模式,一种是Rnnoise,一种是SpeexDsp,这两种降噪模式有什么区别呢? Rnnoise 基于神经网络。当噪声与 rnnoise 的模型训练的噪声匹配时,它的效果非常好。比如说&…

博物馆导览系统方案(一)背景需求分析与核心技术实现

维小帮提供多个场所的室内外导航导览方案,如需获取博物馆导览系统解决方案可前往文章最下方获取,如有项目合作及技术交流欢迎私信我们哦~撒花! 一、博物馆导览系统的背景与市场需求 在数字化转型的浪潮中,博物馆作为文化传承和知…

福昕PDF低代码平台

福昕PDF低代码平台简介 福昕PDF 低代码平台是一款创新的工具,旨在简化PDF处理和管理的流程。通过这个平台,用户可以通过简单的拖拽界面上的按钮,轻松完成对Cloud API的调用工作流,而无需编写复杂的代码。这使得即使没有编程经验的…

Linux —— 管理文件

一、Linux的目录结构及用途 /bin:存放最常用的命令,如ls、cat等,所有用户都可以执行的命令。/boot:包含启动Linux系统所需的核心文件,如内核文件和引导加载程序。/dev:设备文件目录,包含系统中的…

NanoLog起步笔记-7-log解压过程初探

nonolog起步笔记-6-log解压过程初探 再看解压过程建立调试工程修改makefile添加新的launch项 注:重新学习nanolog的README.mdPost-Execution Log Decompressor 下面我们尝试了解,解压的过程,是如何得到文件头部的meta信息的。 再看解压过程 …

处理配置System Viewer缺少SFR文件

按照网上的教程,其他的都配好 这里给几个参考 嵌入式开发--Keil MDK仿真时System Viewer不显示寄存器选项_keil system viewer不显示外设寄存器-CSDN博客 keil无法查看外设寄存器(生成SFR文件)_keil sfr文件-CSDN博客 keil5软件仿真 Logic…

网络安全中大数据和人工智能应用实践

传统的网络安全防护手段主要是通过单点的网络安全设备,随着网络攻击的方式和手段不断的变化,大数据和人工智能技术也在最近十年飞速地发展,网络安全防护也逐渐开始拥抱大数据和人工智能。传统的安全设备和防护手段容易形成数据孤岛&#xff0…

create-react-app react19 搭建项目报错

报错截图 此时运行会报错: 解决方法: 1.根据提示安装依赖法 执行npm i web-vitals然后重新允许 2.删除文件法 在index.js中删除对报错文件的引入,删除报错文件

excel如何让单元格选中时显示提示信息?

现象: 当鼠标放在单元格上,会出现提示信息: 先选中单元格选择上方的【数据】-【数据验证】图标选择【输入信息】勾上【选定单元格时显示输入信息】输入【标题】,如:最上方图中的:姓名:输入【输…

PyCharm+Selenium+Pytest配置小记

1、下载ChromeDriver: Chrome130以后的Driver下载: Chrome for Testing availabilityhttps://googlechromelabs.github.io/chrome-for-testing/ (1)查看自己Crome浏览器的版本:设置-->关于 Chrome; &…

用最小的代价解决mybatis-plus关于批量保存的性能问题

1.问题说明 问题背景说明,在使用达梦数据库时,mybatis-plus的serviceImpl.saveBatch()方法或者updateBatchById()方法的时候,随着数据量、属性字段的增加,效率越发明显的慢。 serviceImpl.saveBatch(); serviceImpl.updateBatch…

电子商务人工智能指南 4/6 - 内容理解

介绍 81% 的零售业高管表示, AI 至少在其组织中发挥了中等至完全的作用。然而,78% 的受访零售业高管表示,很难跟上不断发展的 AI 格局。 近年来,电子商务团队加快了适应新客户偏好和创造卓越数字购物体验的需求。采用 AI 不再是一…

Helm安装Mysql8主从复制集群

目录 一、Helm安装 二、安装mysql 1、拉取镜像 2、修改配置文件 3、创建mysql-secret 4、安装 一、Helm安装 这里不再赘叙,具体安装请参考官网 Helm | 快速入门指南 二、安装mysql 1、拉取镜像 #添加仓库 helm repo add bitnami https://charts.bitnami.c…

Java并发编程学习之从资本家的角度看多线程和并发性(一)

目录 前言前置知识一、单线程时代二、为什么要有多线程,多线程的优点?三、使用多线程会遇到什么问题?四、多线程和并发编程的关系总结 前言 这篇文章是打开Java多线程和并发编程的大门的开始,如标题《从老板的角度看多线程和并发…

【爬虫】selenium打开浏览器以及页面

本篇探讨如何使用 selenium 打开浏览器 selenium 基础与网页打开 selenium 是一个广泛应用于自动化测试和网页抓取的工具,它能够模拟用户在浏览器中的各种操作。首先,我们需要根据指定的浏览器类型(这里以 Chrome 为例)打开网页…

【算法练习】162. 寻找峰值

题目链接&#xff1a;162. 寻找峰值 看思路图&#xff1a; class Solution { public:int findPeakElement(vector<int>& nums) {int left 0,right nums.size()-1;while(left<right){int mid left (right-left)/2;if(nums[mid]>nums[mid1]){right mid;}els…

Android SurfaceFlinger layer层级

壁纸作为显示的最底层窗口它是怎么显示的 1. SurfaceFlinger layer层级 锁屏状态dump SurfaceFlinger &#xff0c;adb shell dumpsys SurfaceFlinger Display 0 (active) HWC layers: -----------------------------------------------------------------------------------…

SAP Ariba Approval _Email Approval

Email Approval Example 当用户成为文档审批者时,SAP Ariba会向该用户发送电子邮件通知消息。 在以下情况下,批准人可以收到电子邮件通知: 有人提交或重新提交文件以获得批准 某人撤回文件 系统升级文档 系统即将向主管升级请求 如果多个用户共享一个群组职责,他们则会收到…