02. Flink 快速上手

02. Flink 快速上手

1、创建项目导入依赖

pom文件:

<properties>
	<flink.version>1.17.0</flink.version>
</properties>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>${flink.version}</version>
</dependency>

2、需求

批处理基本思路:先逐行读取文本,在根据空格进行单词拆分,最后再去统计每个单词出现的频率。

(1)数据准备

在工程目录下新建文件夹input,新建文本words.txt。

文件输入:

hello world
hello flink
hello java

2.1 批处理

代码编写(使用DataSet API实现)

package com.company.onedayflink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkBatchWords {

    public static void main(String[] args) throws Exception {
        // 1、创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2、从文件中读取数据
        DataSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");
        // 3、切分、转换
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            /**
             *
             * @param value     读取到的输入
             * @param out       返回的内容,Tuple2是一个二元分组,(字符串,个数)。
             * @throws Exception
             */
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // 3.1 切分
                for (String s : value.split(" ")) {
                    // 3.2 将单组转为二元组
                    Tuple2<String, Integer> tuple = Tuple2.of(s, 1);
                    // 3.3 将二元组发送给下游
                    out.collect(tuple);
                }

            }
        });
        // 4、按照 word 分组
        UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroup = wordAndOne.groupBy(0); // 0 表示下标为0的参数,也就是二元组的String单词
        // 5、各分组聚合
        AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroup.sum(1);//1 表示下标1的元素,即单词个数
        // 6、输出
        sum.print();
    }
}

运行结果:

image-20240519130034466

2.2 流处理

2.2.1 有界流

代码编写(使用DataStream API实现,读取文件属于有界流)

package com.company.onedayflink.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

@Slf4j
public class FlinkStreamWords {
    public static void main(String[] args) throws Exception {
        // 1、创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2、从文件中读取数据
        DataStreamSource<String> lineDS = env.readTextFile("one-day-flink/input/words.txt");
        // 3、处理数据(切换、转换、分组、聚合)
        // 3.1 切换、转换
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String s : value.split(" ")) {
                    // 构建二元组
                    Tuple2<String, Integer> tuple = Tuple2.of(s, 1);
                    // 通过采集器向下游发送数据
                    out.collect(tuple);
                }
            }
        });

        // 3.2 分组, KeySelector<IN, KEY> 中 IN 表示输入的类型,KEY 表示分组key的类型
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOne
                .keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0); // value.f0 表示二元组的第一个元素

        // 3.3 聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOneKS.sum(1);  // 1 表示二元组的第二个元素

        // 4、输出数据
        sum.print();
        // 5、执行
        env.execute();
    }
}

执行结果:

2> (java,1)
3> (hello,1)
3> (hello,2)
3> (hello,3)
6> (world,1)
8> (flink,1)

前面的编号是并行度,线程数。

2.2.2 无界流

(1)使用 netcat 监听7777端口,建立stream流

安装 netcat

brew install netcat

监听 7777 端口

nc -lk 7777

(2)代码编写(使用DataStream API实现,读取stream流属于无界流)

package com.company.onedayflink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FlinkSteamSocketWords {
    public static void main(String[] args) throws Exception {
        // 1、创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2、读取数据(其中hostname 是需要监听的主机名称,mac电脑可以在终端使用hostname命令查看)
        DataStreamSource<String> socketDS = env.socketTextStream("zgyMacBook-Pro.local", 7777);
        // 3、数据处理(切割、转换、分组、聚合)
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    // 3.1 切分
                    for (String s : value.split(" ")) {
                        // 3.2 将单组转为二元组
                        Tuple2<String, Integer> tuple = Tuple2.of(s, 1);
                        // 3.3 将二元组发送给下游
                        out.collect(tuple);
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        // 4、输出
        sum.print();
        // 5、执行
        env.execute();
    }
}

(3)测试

在终端发送消息

hello flink
hello world

观察程序控制台打印

8> (flink,1)
3> (hello,1)
6> (world,1)
3> (hello,2)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/642720.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot 中缓存的用法

缓存&#xff08;Caching&#xff09;是提升应用性能的重要手段之一&#xff0c;通过减少不必要的数据计算和数据库访问&#xff0c;显著提高系统的响应速度。在 Spring Boot 中&#xff0c;缓存机制被集成得非常好&#xff0c;使得我们能够快速、方便地使用缓存功能。本文将介…

python-pytorch 下批量seq2seq+Bahdanau Attention实现问答1.0.000

python-pytorch 下批量seq2seq+Bahdanau Attention实现简单问答1.0.000 前言原理看图数据准备分词、index2word、word2index、vocab_size输入模型的数据构造注意力模型decoder的编写关于损失函数和优化器在预测时完整代码参考前言 前面实现了 luong的dot 、general、concat注意…

CCF20230901——坐标变换(其一)

CCF20230901——坐标变换&#xff08;其一&#xff09; #include<bits/stdc.h> using namespace std; int main() {int n,m,x[101],y[101],x1[101],y1[101];cin>>n>>m;for(int i0;i<n;i)cin>>x1[i]>>y1[i];for(int j0;j<m;j)cin>>x[…

【活动】开源与闭源大模型:探索未来趋势的双轨道路

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 开源与闭源大模型&#xff1a;探索未来趋势的双轨道路引言一、开源大模型&#…

数据库缓存 buffer pool详解

什么是buffer pool buffer pool, 又称之缓存池, 是mysql中为了提升查询性能而引入的缓存, 如果每次查询和修改都去操作磁盘的话, 性能就会很差, 从而引入 Buffer Pool包含多个缓冲页&#xff08;默认大小通常为16KB&#xff09;&#xff0c;每个缓冲页都有对应的控制信息&#…

【TB作品】stm32单片机读取DS2401程序

DS2401是由Analog Devices公司生产的一种硅序列号芯片&#xff0c;它提供了一个绝对唯一的64位ROM识别码&#xff0c;用于确保可追溯性。以下是对DS2401器件的分析&#xff1a; 特点和优势&#xff1a; 唯一性&#xff1a;每个DS2401芯片都有一个独一无二的64位注册码&#x…

Windows安装VMware(Broadcom)

1.安装前提 1.检查BIOS中是否开启了虚拟化技术。1.1 打开任务管理器&#xff0c;查看性能&#xff0c;CPU部分&#xff0c;虚拟化处于“已启用”状态。1.2 如果没有开启&#xff0c;则需要进入BIOS系统&#xff0c;将 Intel Virtualization Technology改为Enalble。2.下载VMwa…

海外CDN加速方式

随着全球化经济的进一步推进和互联网时代的到来&#xff0c;给对外贸易行业带来了巨大的商机&#xff0c;众多传统的贸易公司都纷纷建立起自已的外贸网站或服务站点等各种信息化平台&#xff0c; 相当多的贸易公司也从他们所构建的平台中得到了很高的利益&#xff0c;然而由于当…

推荐个免费天气接口

http://www.tianqiapi.com/index/doc?versionmonthhttp://www.tianqiapi.com/index/doc?versionmonth 个人博客使用足够了&#xff01;

浅析FAT32文件系统

本文通过实验测试了FAT文件系统的存储规律&#xff0c;并且探究了部分可能的文件隐藏方法。 实验背景 现有一块硬盘&#xff08;U盘&#xff09;&#xff0c;其中存在两个分区&#xff0c;分别为FAT32和NTFS文件系统分区。 在FAT分区中存在如下文件&#xff1a; 现需要阅读底…

Android Compose 九:常用组件列表 简单使用

遇事不决 先看官方文档 列表和网格 如果不需要任何滚动&#xff0c;通过Column 或 Row可以使用verticalScroll() 使Column滚动 Column(modifier Modifier.verticalScroll(rememberScrollState())) {for (i in 0..50){Text(text "条目>>${i}")}}显示大量列表…

移动硬盘难题:不显示容量与无法访问的解决策略

在使用移动硬盘的过程中&#xff0c;有时会遇到一些棘手的问题&#xff0c;比如移动硬盘不显示容量且无法访问。这种情况让人十分头疼&#xff0c;因为它不仅影响了数据的正常使用&#xff0c;还可能导致重要数据的丢失。接下来&#xff0c;我们就来详细探讨一下这个问题及其解…

弹性盒子布局,flex布局

弹性盒子布局&#xff08;Flexbox&#xff09;是CSS3引入的一种新的布局模式&#xff0c;它提供了一种更加有效的方式来设计、布局和对齐容器中的项目&#xff0c;即使容器的大小动态改变或者项目的数量未知。 弹性盒子布局的主要特点是能够轻松地在不同的屏幕大小和设备上实现…

虚拟局域网(VLAN)

关键词&#xff1a;veth、vlan、bridge、iptables、nat、tcpdump、icmp、cidr、arp、路由表、计算机网络协议栈 前言 在过去的几十年里&#xff0c;互联网发展得非常快。许多新兴技术迅速崛起&#xff0c;也有不少曾经的主流技术被淘汰。然而&#xff0c;有些技术因为其基础性…

iPhone实况照片从Windows资源管理器复制的JPG+MOV无法正常还原到iPhone

背景&#xff1a; 之前使用的iPhone 15 Pro&#xff0c;使用的Windows资源管理器当中复制导出的实况照片&#xff0c;复制出来的格式例如IMG_0001.JPG, IMG_0001.MOV。之后手机就卖掉了。现在使用的iPhone 14 Pro Max&#xff0c;想要导回之前备份的实况照片。尝试使用爱思助手…

解决vue版本不一致导致不能正常编译

解决vue版本不一致导致不能正常编译 异常现象分析原因解决方案 异常现象 项目原本运行无异常&#xff0c;但安装了一个el-table-infinite-scroll的插件后&#xff0c;编译报错&#xff0c;截图如下 分析原因 vue版本与compile版本不一致&#xff0c;应该统一起来&#xff0…

网创教程:WordPress插件网创自动采集并发布

网创教程&#xff1a;WordPress插件网创自动采集并发布 使用插件注意事项&#xff1a; 如果遇到404错误&#xff0c;请先检查并调整网站的伪静态设置&#xff0c;这是最常见的问题。需要定制化服务&#xff0c;请随时联系我。 本次更新内容 我们进行了多项更新和优化&#x…

1.存储部分

1.Flash Memory--闪速存储器&#xff08;注&#xff1a;U盘&#xff0c;SD卡就是闪存&#xff09;在EEPROM基础上发展而来的&#xff0c;断电后也能保存信息&#xff0c;且可进行多次 快速擦除重写。注意&#xff1a;由于闪存需要先擦除再写入&#xff0c;因此闪存写的速度要比…

ssm141餐厅点菜管理系统+vue

餐厅点菜管理系统的设计与实现 摘 要 网络技术和计算机技术发展至今&#xff0c;已经拥有了深厚的理论基础&#xff0c;并在现实中进行了充分运用&#xff0c;尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代&#xff0c;所以对于信息的宣传和管…

Unity-Sprite Atlas+UGUI系统的运行原理

每日一句&#xff1a;别听世俗耳语&#xff0c;看自己的风景就好 目录 SA的原理&#xff1a; SA的优点&#xff1a; SA的缺点&#xff1a; DrawCall是什么&#xff1f; 批处理是什么&#xff1f; 我们先了解一下UGUI系统的运行原理吧&#xff01; 提到图集优化&#xff0…