【Apache Flink】Flink DataStream API的基本使用

Flink DataStream API的基本使用

文章目录

  • 前言
  • 1. 基本使用方法
  • 2. 核心示例代码
  • 3. 完成工程代码
    • pom.xml
    • WordCountExample
    • 测试验证
  • 4. Stream 执行环境
  • 5. 参考文档

前言

Flink DataStream API主要用于处理无界和有界数据流 。
无界数据流是一个持续生成数据的数据源,它没有明确的结束点,例如实时的交易数据或传感器数据。这种类型的数据流需要使用Apache Flink的实时处理功能来连续地处理和分析。

有界数据流是一个具有明确开始和结束点的数据集,例如一个文件或数据库表。这种类型的数据流通常在批处理场景中使用,其中所有数据都已经可用,并可以一次性处理。

Flink的DataStream API提供了一套丰富的操作符,如map、filter、reduce、aggregations、windowing、join等,以支持各种复杂的数据处理和分析需求。此外,DataStream API还提供了容错保证,能确保在发生故障时,应用程序能从最近的检查点(checkpoint)恢复,从而实现精确一次(exactly-once)的处理语义。

1. 基本使用方法

  1. 创建执行环境:

    每一个Flink程序都需要创建一个StreamExecutionEnvironment(执行环境),它可以被用来设置参数和创建从外部系统读取数据的流。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
  2. 创建数据流:

    你可以从各种数据源中创建数据流,如本地集合,文件,socket等。下面的代码是从本地集合创建数据流的示例:

    DataStream<String> dataStream = env.fromElements("hello", "flink");
    
  3. 转换操作:

    Flink提供了丰富的转换操作,如mapfilterreduce等。以下代码首先将每个字符串映射为其长度,然后过滤出长度大于5的元素:

    DataStream<Integer> transformedStream = dataStream
        .map(s -> s.length())
        .filter(l -> l > 5);
    
  4. 数据输出:

    Flink支持将数据流输出到各种存储系统,如文件,socket,数据库等。下面的代码将数据流输出到标准输出:

    transformedStream.print();
    
  5. 执行程序:

    将上述所有步骤放在main函数中,并在最后调用env.execute()方法来启动程序。Flink程序是懒加载的,只有在调用execute方法时才会真正开始执行。

    env.execute("Flink Basic API Usage");
    

2. 核心示例代码

使用Flink DataStream API构建一个实时Word Count程序,它会从一个socket端口读取文本数据,统计每个单词的出现次数,并将结果输出到标准输出。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器
        DataStream<String> textStream = env.socketTextStream("localhost", 9000);

        // 3. 转换操作
        DataStream<Tuple2<String, Integer>> wordCountStream = textStream
                .flatMap(new LineSplitter()) // 将文本行切分为单词
                .keyBy(0) // 按单词分组
                .sum(1); // 对每个单词的计数求和

        // 4. 数据输出
        wordCountStream.print();

        // 5. 执行程序
        env.execute("Socket Word Count Example");
    }

    // 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

3. 完成工程代码

下面是一个基于Apache Flink的实时单词计数应用程序的完整工程代码,包括Pom.xml文件和所有Java类。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-wordcount-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.13.2</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

WordCountExample

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 创建数据流,从socket接收数据,需要在本地启动一个端口为9000的socket服务器
        DataStream<String> textStream = env.socketTextStream("localhost", 9000);

        // 3. 转换操作
        DataStream<Tuple2<String, Integer>> wordCountStream = textStream
                .flatMap(new LineSplitter())  // 将文本行切分为单词
                .keyBy(0)  // 按单词分组
                .sum(1);  // 对每个单词的计数求和

        // 4. 数据输出
        wordCountStream.print();

        // 5. 执行程序
        env.execute("Socket Word Count Example");
    }

    // 自定义一个FlatMapFunction,将输入的每一行文本切分为单词,并输出为Tuple2,第一个元素是单词,第二个元素是计数(初始值为1)
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

现在,你可以使用Maven编译并运行这个程序。在启动程序之前,你需要在本地启动一个端口为9000的Socket服务器。这可以通过使用Netcat工具 (nc -lk 9000) 或者其他任何能打开端口的工具实现。然后,你可以输入文本行,Flink程序会统计每个单词出现的次数,并实时打印结果。

测试验证

用py在本地启动一个socket服务器,监听9000端口,

python比较简单实现一个socket通信 。写一个Python来验证上面写的例子。

import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 9000))
server_socket.listen(1)

print("Waiting for connection...")
client_socket, client_address = server_socket.accept()
print("Connected to:", client_address)

while True:
    data = input("Enter text: ")
    client_socket.sendall(data.encode())

运行Flink程序和Python socket服务器,然后在Python程序中输入文本, 会看到Flink程序实时统计每个单词出现的次数并输出到控制台。

4. Stream 执行环境

开发学习过程中,不需要关注。每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment。
在这里插入图片描述

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

Flink runtime: client, job manager, task managers
此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。

5. 参考文档

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/datastream_api/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/110534.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue引入字体

假定已经下了字体包 1、在src/assets文件夹下新建一个font文件夹&#xff0c;放入字体文件与配置文件 这个与两个字体&#xff0c;优设标题黑和DIN字体&#xff0c;font.css用于给字体取名&#xff0c;将字体引入网站。 font-face { font-family: YouSheBiaoTiHei; src: url(…

美术如何创建 skybox 贴图资源?

文章目录 目的PS手绘Panorama To CubemapPS手绘Pano2VRSkybox & Cubemap Tutorial (Maya & Photoshop)Unity 中使用 ReflectionProbe 生成 Cubemap 然后再 PS 调整PS直接手绘 cubemapBlender 导入 Panorama&#xff0c;然后烘焙到 cubemap&#xff0c;再导入unity中使用…

动态表单生成Demo(Vue+elment)

摘要&#xff1a;本文将介绍如何使用vue和elment ui组件库实现一个简单的动态表单生成的Demo。主要涉及两个.vue文件的书写&#xff0c;一个是动态表单生成的组件文件&#xff0c;一个是使用该动态表单生成的组件。 1.动态表单生成组件 这里仅集成了输入框、选择框、日期框三种…

字符集详解

常见字符集介绍 字符集基础知识&#xff1a; 计算机底层不可以直接存储字符的。 计算机中底层只能存储二进制(0、1) 。 二进制是可以转换成十进制的。 结论&#xff1a;计算机底层可以表示成十进制编号。计算机可以给人类字符进行编号存储&#xff0c;这套编号规则就是字符…

国内内卷太严重,还不考虑一下在海外接单?那这几个平台你知道吗?

作为一个程序员&#xff0c;在平台上接单赚点外快是再正常不过的事情了&#xff0c;但是现今国内各个平台都内卷比较严重&#xff0c;你是否考虑过去“外面的世界”看看&#xff1f; 如果想过&#xff0c;那么这几个外国的接单平台你都知道吗&#xff1f; 接下来就和我一起来看…

超实用的企业防范数据泄露小技巧!

超实用的企业防范数据泄露小技巧&#xff01; 小技巧1、加强员工培训&#xff0c;提高防范思想 及时向员工传达有关安全信息&#xff0c;加强员工意识、认识和执行安全措施&#xff0c;以防止数据泄露发生。 小技巧2、建立安全政策&#xff0c;明确处理流程 企业应该建立安…

Tower for Mac—Git客户端 支持M1

Tower是一款Mac OS X系统上的Git客户端软件&#xff0c;它提供了丰富的功能和工具&#xff0c;帮助用户更加方便地管理和使用Git版本控制系统。以下是Tower的一些特点&#xff1a; 1. 界面友好&#xff1a;Tower的界面友好&#xff0c;使用户能够轻松地掌握软件的使用方法。 …

MySQL扩展语句和约束方式

一、扩展语句 复制&#xff0c;通过like这个语法直接复制bbb的表结构。只是复制表结构&#xff0c;不能复制表里面的数据 把bbb表里面的数据&#xff0c;复制到test&#xff0c;两个表数据结构要一致 创建一张表&#xff0c;test1,数据从bbb来&#xff0c;表结构也是bbb delete…

数据库深入浅出,数据库介绍,SQL介绍,DDL、DML、DQL、TCL介绍

一、基础知识&#xff1a; 1.数据库基础知识 数据(Data)&#xff1a;文本信息(字母、数字、符号等)、音频、视频、图片等&#xff1b; 数据库(DataBase)&#xff1a;存储数据的仓库&#xff0c;本质文件&#xff0c;以文件的形式将数据保存到电脑磁盘中 数据库管理系统(DBMS)&…

csapp datalab

知识点总结 1. 逻辑运算符关系 and&#xff08;与&#xff09;、or&#xff08;或&#xff09;和xor&#xff08;异或&#xff09;是逻辑运算符&#xff0c;用于对布尔值进行操作。它们可以在不同的逻辑表达式之间进行转换。下面是and、or和xor之间的转换规则&#xff1a; a…

网络协议--TCP的未来和性能

24.1 引言 TCP已经在从1200 b/s的拨号SLIP链路到以太数据链路上运行了许多年。在80年代和90年代初期&#xff0c;以太网是运行TCP/IP最主要的数据链路方式。虽然TCP在比以太网速率高的环境&#xff08;如T2电话线、FDDI及千兆比网络&#xff09;中也能够正确运行&#xff0c;但…

AcWing第 127 场周赛 - AcWing 5283. 牛棚入住+AcWing 5284. 构造矩阵 - 模拟+快速幂+数学

AcWing 5283. 牛棚入住 题目数据范围不大&#xff0c;直接暴力模拟即可 按照题目所说的意思即可。 #include <math.h> #include <stdio.h> #include <algorithm> #include <cstring> #include <iostream> using namespace std; const int N 1…

Spring Cloud之ElasticSearch的学习【详细】

目录 ElasticSearch 正向索引与倒排索引 数据库与elasticsearch概念对比 安装ES、Kibana与分词器 分词器作用 自定义字典 拓展词库 禁用词库 索引库操作 Mapping属性 创建索引库 查询索引库 删除索引库 修改索引库 文档操作 新增文档 查找文档 修改文档 全量…

使用内网穿透工具进行支付宝沙箱环境支付的SDK接口远程测试

Java支付宝沙箱环境支付&#xff0c;SDK接口远程调试【内网穿透】 1.测试环境 MavenSpring bootJdk 1.8 2.本地配置 获取支付宝支付Java SDK,maven项目可以选择maven版本,普通java项目可以在GitHub下载,这里以maven为例 SDK下载地址&#xff1a;https://doc.open.alipay.com…

删除的PPT怎么找回来?4个必备恢复方法!

“最近的期末展示需要制作一个PPT&#xff0c;我熬了几个大夜才完成了&#xff0c;但是不知道怎么的我在删除其他文件时不小心把这个PPT一起删掉了&#xff0c;有什么方法可以帮我找回这个误删的PPT吗&#xff1f;” 我们在工作和学习中&#xff0c;经常都需要使用到PPT&#x…

白票某度自媒体混剪剪辑视频素材/爬虫软件说明文档

大家好&#xff0c;我是淘小白~ 软件&#xff1a;某度自媒体混剪素材爬虫软件 语言&#xff1a;Python 说明文档&#xff1a; 1、自定义关键词采集 2、采集百度aigc视频素材&#xff0c;经过测试&#xff0c;使用剪映的文字成片某度视频素材&#xff0c;可过头条的原创检测…

杂牌行车记录仪特殊AVI结构恢复案例

最近遇到一个杂牌的行车记录仪需要恢复数据&#xff0c;其使用AVI格式&#xff0c;但是在扫描恢复的过程中却发现厂家对其AVI结构进行了“魔改”致程序无法正常识别 故障存储:16G SD卡 fat32文件系统 故障现象: 16G的SD卡&#xff0c;在发生事故后客户尝试自行接到手机上读…

Mybatis—基础操作

mybatis入门后&#xff0c;继续学习mybatis基础操作。 目录 Mybatis基础操作准备工作删除操作日志输入预编译SQLSQL注入参数占位符 新增操作基本新增添加后返回主键 更新操作查询操作根据id查询数据封装条件查询条件查询 Mybatis基础操作 准备工作 根据下面页面原型及需求&am…

【产品运营】产品需求应该如何管理

产品项目在进行时经常会有一些需求需要实现&#xff0c;需求是产品更新迭代的动力&#xff0c;需求也是从用户诉求转化而来&#xff1b;在做需求管理时&#xff0c;我们需要判断一个需求的优先级等方面&#xff0c;对产品进行优化&#xff1b; 目录&#xff1a; 一、 为什么要…

游戏在小米设备上因自适应刷新率功能,帧率减半

1&#xff09;游戏在小米设备上因自适应刷新率功能&#xff0c;帧率减半 2&#xff09;Lua在计算时出现非法值&#xff0c;开启Debugger之后不再触发 3&#xff09;如何在Unity中实现液体蔓延的效果 这是第357篇UWA技术知识分享的推送&#xff0c;精选了UWA社区的热门话题&…