Flink代码单词统计 ---批处理

  • flatMap:一对多转换操作,输入句子,输出分词后的每个词
  • groupBy:按Key分组,0代表选择第1列作为Key
  • sum:求和,1代表按照第2列进行累加
  • print:打印最终结果

1.WordCount代码编写

需求:统计一段文字中,每个单词出现的频次。

环境准备:在src/main/java目录下,新建一个包,命名为com.atguigu.wc。

1.1 批处理

批处理基本思路:

①.先逐行读入文件数据

②.然后将每一行文字拆分成单词

③.接着按照单词分组

④.统计每组数据的个数

⑤.就是对应单词的频次。

1.2 创建项目

1)创建工程

(1)打开IntelliJ IDEA,创建一个Maven工程。

(2)将这个Maven工程命名为Flinkdemo。

2)添加项目依赖

在项目的pom文件中,添加Flink的依赖,包括flink-java、flink-streaming-java,以及flink-clients(客户端,也可以省略)。

<properties>
        <flink.version>1.17.0</flink.version>
</properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

     <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
     </dependency>
</dependencies>

3)数据准备

(1)在工程根目录下新建一个Data文件夹,并在下面创建文本文件words.txt

(2)在words.txt中输入一些文字,例如:

hello flink
hello world
hello java

4)代码编写

(1)在com.atguigu.wc包下新建Java类Demo01_BatchProcess,在静态main方法中编写代码。具体代码实现如下:

package com.atguigu.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**as
 * Created by Smexy on 2023/9/3
 *
 *  计算的套路:
 *      ①计算的环境
 *          spark:SparkContext
 *          mr:   Driver
 *          flink:ExecutionEnvironment
 *      ②把要计算的数据封装为计算模型
 *         spark:  RDD(spark core)
 *                 DataFrame|DataSet(sparksql)
 *                 DStream(sparkstreaming)
 *         mr:     K-V
 *         flink:  DataSource
 *
 *      ③调用计算api
 *          RDD.转换算子()
 *          mr: 自己去编写Mapper,Reducer
 *          flink: DataSource.算子()
 *
 *  使用的是DataSetAPI(批处理)
 *
 *  -------------------------
 *      了解。后续不用了!
 *
 *
 */
public class Demo01_BatchProcess
{
    public static void main(String[] args) throws Exception {

        //创建支持flink计算的环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        //使用环境去读数据,封装为计算模型
        DataSource<String> source = env.readTextFile("data/words.txt");
        //调用计算api
        source
            /*
                hello hi hi hi
                    变为 (hello,1)
                         (h1,1)
                           (h1,1)
                             (h1,1)
                             输出到下游
             */
            .flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>()
        {/*
                String value: 输入的一行内容
                Collector<String> out: 输出结果的收集器。帮你把结果自动收集,输出到下游。
                    单词,1: 输出的数据是多列,此时就应该使用集合或Bean来封装。

                flink提供了Tuple的集合。用于封装多个列。
                    Tuple2: 用来封装2列
                    Tuple3: 用来封装3列
                    ....
                    Tuple25: 用来封装25列

             */
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    //Tuple2<String, Integer> data = new Tuple2<>(word, 1);
                    Tuple2<String, Integer> data = Tuple2.of(word, 1);
                    //收集要输出的数据
                    out.collect(data);
                }
            }
        })
            /*
                收到的是 (单词,1) 格式
                计算: 得到 (单词,N)
                 groupBy(int fileds): 适用于 对Tuple类型的数据进行聚合。传入
                    int N,N代表Tuple中的列的索引。
                  groupBy(String fileds): 适用于对Bean类型的数据进行聚合,传入的String就是
                    Bean中的属性名。
             */
            .groupBy(0)
            // 对tuple2分组后的第二列进行sum运算
            .sum(1)
            //在控制台打印输出
            .print();


    }
}

5).输出

(flink,1)
(world,1)
(hello,3)
(java,1)

1.3  常见问题

问题1.

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

解决方式:maven项目的 pom.xml安装依赖:

        <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.32</version>
            </dependency>

问题2.

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

解决办法:log4j没有配置日志记录的位置,需要配置log4j.properties,在src目录main目录resources文件夹下下新建log4j.properties

log4j.properties配置文件:

log4j.rootLogger=warn,CONSOLE,File
 
#Console
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n
 
#File  DailyRollingFileAppender
log4j.logger.File=info
log4j.appender.File=org.apache.log4j.DailyRollingFileAppender
log4j.appender.File.layout=org.apache.log4j.PatternLayout
log4j.appender.File.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n
log4j.appender.File.datePattern='.'yyyy-MM-dd
log4j.appender.File.Threshold = info
log4j.appender.File.append=true
log4j.appender.File.File=d://code/logs/flink/disk.log

 此时再次执行成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/410753.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

js设计模式:路由模式

作用: 业务开发时,路由这个概念无论对于前后端来说肯定是不可缺少的 前端的vue-router就是很经典的路由模式 示例: //这里模拟实现一个vueclass Vue{constructor(options){Object.keys(options).forEach(item>{this[item] options[item]})}$mount(node){console.log(挂载…

基础知识常见算法识别

特征值识别 很多常见的算法&#xff0c;如AES、DES等&#xff0c;在运算过程中会使用一些常量&#xff0c;而为了提高运算效率&#xff0c;这些常量往往被硬编码在程序中 通过识别这些特征值&#xff0c;可以对算法进行一个大致判断 算法特征值&#xff08;无特殊说明为十六…

Web3的奇迹:数字世界的新篇章

在数字化时代的潮流中&#xff0c;Web3正以其令人振奋的潜力和前景引领着我们进入一个全新的数字时代。作为互联网的下一代&#xff0c;Web3将重新定义我们对数字世界的认知和体验&#xff0c;为我们带来无限的可能性和奇迹。本文将深入探讨Web3的重要性、核心特征以及未来展望…

智能美颜引领短视频创作风潮:探秘美颜SDK技术背后的创新

美颜技术不仅改善了用户的拍摄体验&#xff0c;还推动了短视频创作的风潮。本文将深入探讨智能美颜在短视频创作中的应用&#xff0c;以及美颜SDK技术背后的创新。 一、短视频时代的美颜潮流 随着短视频应用的普及&#xff0c;用户对于视频质量的要求也越来越高。然而&#…

精品springboot科研项目工作量管理系统的设计与实现

《[含文档PPT源码等]精品基于springboot科研工作量管理系统的设计与实现[包运行成功]》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程、包运行成功&#xff01; 软件开发环境及开发工具&#xff1a; Java——涉及技术&#xff1a; 前端使用技术&…

Wagtail安装运行并结合内网穿透实现公网访问本地网站界面

文章目录 前言1. 安装并运行Wagtail1.1 创建并激活虚拟环境 2. 安装cpolar内网穿透工具3. 实现Wagtail公网访问4. 固定的Wagtail公网地址 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的 人工智能学习网站&#xff0c; 通俗易懂&#xff0c;风趣幽默&#xf…

c# 异常处理

异常类 .NET Framework 类库中的所有异常都派生于 Exception 类&#xff0c;异常包括系统异常和应用异常。 默认所有系统异常派生于 System.SystemException&#xff0c;所有的应用程序异常派生于 System.ApplicationException。 系统异常一般不可预测&#xff0c;比如内存堆…

RabbitMQ 面试八股题整理

前言&#xff1a;本文是博主网络自行收集的一些RabbitMQ相关八股文&#xff0c;还在准备暑期实习&#xff0c;后续应该会持续更新...... 参考&#xff1a;三天吃透RabbitMQ面试八股文_牛客网 目录 RabbitMQ概述 什么是 RabbitMQ&#xff1f; 说一说RabbitMQ中的AMQP 为什么…

rancher change domain name 【rancher 更改域名】

文章目录 1. 预备条件2. 准备全部集群的直连 kubeconfig 配置文件3. 准备证书4. 更新证书4.1 Rancher 单节点运行&#xff08;默认容器自动生成自签名 SSL 证书&#xff09;#4.2 Rancher 单节点运行&#xff08;外置自签名 SSL 证书&#xff09;#4.3 Rancher HA 5. 修改 Ranche…

前端解析后端返回文件流格式数据

当后端接口返回数据是一个文件流数据时&#xff0c;如下后端返回给我的是一个pdf文件流数据 methods: {gotoPri() {protocolApi().then(res > {this.createPdf(res.data,XXX协议)})},createPdf(res, name) {// Blob构造函数返回一个新的 Blob 对象并指定type类型。let blob …

Vue.js+SpringBoot开发智能停车场管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容A. 车主端功能B. 停车工作人员功能C. 系统管理员功能1. 停车位模块2. 车辆模块3. 停车记录模块4. IC卡模块5. IC卡挂失模块 三、界面展示3.1 登录注册3.2 车辆模块3.3 停车位模块3.4 停车数据模块3.5 IC卡档案模块3.6 IC卡挂…

Pytorch学习(杂知识)

目录 Mini-batch 魔法函数 DataLoader与DataSet transformers的简介 torchvision简介 torch.no_grad() Softmax ReLu 随机梯度下降&#xff08;Stochastic Gradient Descent&#xff0c;SGD&#xff09; torch.nn.CrossEntropyLoss()&#xff0c;交叉损失函数 Tenso…

掌握C语言指针,轻松解锁代码高效性与灵活性

1. 指针与地址 1.1 概念 我们都知道计算机的数据必须存储在内存里&#xff0c;为了正确地访问这些数据&#xff0c;必须为每个数据都编上号码&#xff0c;就像门牌号、身份证号一样&#xff0c;每个编号是唯一的&#xff0c;根据编号可以准确地找到某个数据。而这些编号我们就…

申创贝拓电气设备邀您参观2024生物发酵展

参展企业介绍 BETTO贝拓电气成立于2017年&#xff0c;初期总部坐落于安徽合肥&#xff0c;从事工业电控机柜的设计和销售工作。2022年总部迁往上海&#xff0c;有了自己的制造基地&#xff0c;涉及制造和销售工业控制柜、操作台、IT机柜、户外机柜、人机界面、悬臂、电气安装成…

怎么把pdf转换成word?

怎么把pdf转换成word&#xff1f;Pdf和word在电脑上的使用非常广泛&#xff0c;pdf和word分别是由 Adobe和Microsoft 分别开发的电脑文件格式。PDF 文件可以在不同操作系统和设备上保持一致的显示效果&#xff0c;无论是在 Windows、Mac 还是移动设备上查看&#xff0c;都能保持…

车载测试面试:题库+项目

车载测试如何面试&#xff08;面试技巧&#xff09;https://blog.csdn.net/2301_79031315/article/details/136229809 入职车载测试常见面试题(附答案&#xff09;https://blog.csdn.net/2301_79031315/article/details/136229946 各大车企面试题汇总&#xff08;含答案&am…

基于R语言的Meta分析【全流程、不确定性分析】方法与Meta机器学习技术应用

Meta分析是针对某一科研问题&#xff0c;根据明确的搜索策略、选择筛选文献标准、采用严格的评价方法&#xff0c;对来源不同的研究成果进行收集、合并及定量统计分析的方法&#xff0c;最早出现于“循证医学”&#xff0c;现已广泛应用于农林生态&#xff0c;资源环境等方面。…

tigramite教程(一)解释与假定或发现的因果模型相关的关联

文章目录 0、生成一些具有同时依赖关系的示例过程1、估计&#xff08;马尔可夫等价类的&#xff09;因果图2、如果马尔可夫等价类有多个成员&#xff08;存在未定向的边&#xff09;&#xff0c;选择类的一个成员&#xff0c;这可以自动完成3、对从图中提取的因果父节点进行线性…

跨端轻量JavaScript引擎的实现与探索

一、JavaScript 1.JavaScript语言 JavaScript是ECMAScript的实现,由ECMA 39(欧洲计算机制造商协会39号技术委员会)负责制定ECMAScript标准。 ECMAScript发展史: 时间版本说明1997年7月ES1.0 发布当年7月&#xff0c;ECMA262 标准出台1998年6月ES2.0 发布该版本修改完全符合…

cgroup底层技术研究一、cgroup简介与cgroup命令行工具

本文参考以下文章&#xff1a; 58 | cgroup技术&#xff1a;内部创业公司应该独立核算成本 特此致谢&#xff01; 一、cgroup简介 1. cgroup是什么 cgroup&#xff08;Control Group&#xff09;是Linux内核提供的一种机制&#xff0c;用于对进程或进程组进行资源限制、优先…