Flink Python Quick Start (Realtime Compute for Apache Flink) - Alibaba Cloud Help Center
import argparse
# Handles command-line arguments and options, so the program can receive parameters passed by the user on the command line
import logging
import sys
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,
                                            RollingPolicy)
# WatermarkStrategy: generates watermarks, the mechanism for handling late data in streams processed on event time.
# Encoder: defines how records are encoded into a byte sequence, typically used when serializing output data.
# Types: contains the definitions of Flink's data types, used to declare the types of the records in a stream.
# StreamExecutionEnvironment: the entry point of every Flink streaming program, used to configure and launch jobs.
# RuntimeExecutionMode: the execution mode of a job, e.g. batch mode or streaming mode.
# FileSource: reads input data from a file system.
# StreamFormat: defines the data format, e.g. CSV or JSON.
# FileSink: writes data to a file system.
# OutputFileConfig: configures the output files, such as their name prefix and suffix.
# RollingPolicy: the file rolling policy, i.e. when a new output file is started.
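# Environment note (an assumption, not part of the original page): these classes ship with the
# apache-flink package (pip install apache-flink). The flat pyflink.datastream.connectors import
# path matches older PyFlink releases (around 1.15); newer releases also expose the file
# connectors under pyflink.datastream.connectors.file_system.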
word_count_data = ["To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,",
"And by opposing end them?--To die,--to sleep,--",
"No more; and by a sleep to say we end",
"The heartache, and the thousand natural shocks",
"That flesh is heir to,--'tis a consummation",
"Devoutly to be wish'd. To die,--to sleep;--",
"To sleep! perchance to dream:--ay, there's the rub;",
"For in that sleep of death what dreams may come,",
"When we have shuffled off this mortal coil,",
"Must give us pause: there's the respect",
"That makes calamity of so long life;",
"For who would bear the whips and scorns of time,",
"The oppressor's wrong, the proud man's contumely,",
"The pangs of despis'd love, the law's delay,",
"The insolence of office, and the spurns",
"That patient merit of the unworthy takes,",
"When he himself might his quietus make",
"With a bare bodkin? who would these fardels bear,",
"To grunt and sweat under a weary life,",
"But that the dread of something after death,--",
"The undiscover'd country, from whose bourn",
"No traveller returns,--puzzles the will,",
"And makes us rather bear those ills we have",
"Than fly to others that we know not of?",
"Thus conscience does make cowards of us all;",
"And thus the native hue of resolution",
"Is sicklied o'er with the pale cast of thought;",
"And enterprises of great pith and moment,",
"With this regard, their currents turn awry,",
"And lose the name of action.--Soft you now!",
"The fair Ophelia!--Nymph, in thy orisons",
"Be all my sins remember'd."]
def word_count(input_path, output_path):
    """
    Count the frequency of each word in a text file and write the result to the given path.
    The function reads text data from the input path, computes word frequencies, and writes
    the result to the output path. If no input or output path is provided, the default data
    is used or the result is printed directly.
    Parameters:
    - input_path: path of the input text file. If None, the default data is used.
    - output_path: path for the result. If None, the result is printed directly.
    """
    # Get the stream execution environment, set streaming mode, and set the parallelism to 1
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)
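    # Note (not part of the original example): with RuntimeExecutionMode.BATCH the keyed
    # reduce below would emit only one final count per word, whereas STREAMING mode emits an
    # updated running count for every record. Parallelism 1 keeps everything in a single
    # task, so output order is deterministic and a file sink typically writes one part file.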
    # Define the data source
    if input_path is not None:
        # Read data from the file system
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                  .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        # Use the default data
        ds = env.from_collection(word_count_data)

    # Define a split function that splits each line of text into words
    def split(line):
        yield from line.split()
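    # For example, split("To be, or not to be") yields "To", "be,", "or", "not", "to", "be";
    # punctuation stays attached to words because each line is split on whitespace only.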
    # Compute word frequencies
    ds = ds.flat_map(split) \
           .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
           .key_by(lambda i: i[0]) \
           .reduce(lambda i, j: (i[0], i[1] + j[1]))
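    # Because the job runs in streaming mode, the keyed reduce emits a running total per
    # word, e.g. ("the", 1), ("the", 2), ("the", 3), ... rather than only the final count.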
    # Define the data sink
    if output_path is not None:
        # Write the result to the file system
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
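        # Note (not part of the original example): with this configuration the results land
        # under output_path in time-bucketed subdirectories (the default bucket assigner);
        # each part file name starts with "prefix" and ends with ".ext", with a
        # version-dependent uid/counter in between.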
    else:
        # Print the result directly
        ds.print()
    # Submit the job for execution (blocks until the bounded job finishes)
    env.execute()
if __name__ == '__main__':
    # Log to stdout at INFO level, formatting each log record to show only the message
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    # Create an ArgumentParser to handle command-line arguments
    parser = argparse.ArgumentParser()
    # Add optional command-line arguments for the input and output files
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')
    # Get the command-line arguments, excluding the script name
    argv = sys.argv[1:]
    print("Command line arguments: ", argv)
    # Parse the known command-line arguments and ignore any unknown ones
    known_args, _ = parser.parse_known_args(argv)
    print("known_args: ", known_args)
    # Call word_count with the input and output paths taken from the parsed arguments
    word_count(known_args.input, known_args.output)
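# Example usage (a sketch, not from the original page; adjust paths and the script name to
# your environment, assuming this file is saved as word_count.py):
#   python word_count.py --input /tmp/input.txt --output /tmp/word_count_output
# Run it without arguments to count the words of the built-in Hamlet excerpt and print the
# running counts to stdout.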