flink学习(3)——方法的使用—对流的处理(map,flatMap,filter)

map

数据

86.149.9.216 10001 17/05/2015:10:05:30 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 10002 17/05/2015:10:06:53 GET /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:06:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:07:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:08:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:09:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:10:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:16:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
10.0.0.1 10003 17/05/2015:10:26:53 POST /presentations/logstash-monitorama-2013/css/print/paper.css
package com.bigdata.day02;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @基本功能:
 * @program:flinkProject
 * @author: jinnian
 * @create:2024-11-21 16:26:15
 **/
public class _01_map {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> dataStreamSource = env.readTextFile("datas/a.log");
        //3. transformation-数据处理转换
        dataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                String[] line = s.split("\\s+");
                LogBean logBean = new LogBean();
                logBean.setIp(line[0]);
                logBean.setUserId(Integer.parseInt(line[1]));
                logBean.setMethod(line[3]);
                
                // 解析时间的方式
                
                // 17/05/2015:10:05:30
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:hh:mm:ss");
                Date date = simpleDateFormat.parse(line[2]);
                
                // 另一种方法
                Date date1 = DateUtils.parseDate(line[2],"dd/MM/yyyy:hh:mm:ss");
                
                logBean.setTimestamp(date1.getTime());
                logBean.setPath(line[4]);

                return JSONObject.toJSONString(logBean);
            }
        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

flatMap案例

张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板
package com.bigdata.day02;

public class _02_flatmap {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> dataStreamSource = env.readTextFile("datas/flatmap.log");
        //3. transformation-数据处理转换
        dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split(",");
                for (int i = 1; i < split.length; i++) {
                    collector.collect(split[0]+"有"+split[i]);
                }
            }
        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

filter案例

package com.bigdata.day02;

public class _03_filter {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> dataStreamSource = env.readTextFile("datas/a.log");
        //3. transformation-数据处理转换
        dataStreamSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                String[] line = s.split("\\s+");
                LogBean logBean = new LogBean();
                logBean.setIp(line[0]);
                logBean.setUserId(Integer.parseInt(line[1]));
                logBean.setMethod(line[3]);
                // 17/05/2015:10:05:30
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:hh:mm:ss");
                Date date = simpleDateFormat.parse(line[2]);
                // 另一种方法
                Date date1 = DateUtils.parseDate(line[2],"dd/MM/yyyy:hh:mm:ss");
                logBean.setTimestamp(date1.getTime());
                logBean.setPath(line[4]);

                return JSONObject.toJSONString(logBean);
            }
        }).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {

                return s.contains("83.149.9.216");
            }
        }).print();
        //4. sink-数据输出

        //5. execute-执行
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/924341.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[在线实验]-ActiveMQ Docker镜像的下载与部署

镜像下载 下载ActiveMQ的Docker镜像文件。通常&#xff0c;这些文件会以.tar格式提供&#xff0c;例如activemq.tar。 docker的activemq镜像资源-CSDN文库 加载镜像 下载完成后&#xff0c;您可以使用以下命令将镜像文件加载到Docker中&#xff1a; docker load --input a…

【AI绘画】Midjourney进阶:色调详解(下)

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: AI绘画 | Midjourney 文章目录 &#x1f4af;前言&#x1f4af;Midjourney中的色彩控制为什么要控制色彩&#xff1f;为什么要在Midjourney中控制色彩&#xff1f; &#x1f4af;色调纯色调灰色调暗色调 &#x1f4af…

YOLO系列论文综述(从YOLOv1到YOLOv11)【第3篇:YOLOv1——YOLO的开山之作】

YOLOv1 1 摘要2 YOLO: You Only Look Once2.1 如何工作2.2 网络架构2.3 训练2.4 优缺点 YOLO系列博文&#xff1a; 【第1篇&#xff1a;概述物体检测算法发展史、YOLO应用领域、评价指标和NMS】【第2篇&#xff1a;YOLO系列论文、代码和主要优缺点汇总】 ——————————…

数字图像处理(9):VGA接口及其时序

&#xff08;1&#xff09;特点&#xff1a;成本低、结构简单、应用灵活 VGA接口需要五个信号&#xff1a;R、G、B、Hsync、Vsync &#xff08;2&#xff09;VGA的工作原理&#xff1a; 设定一个高速时钟信号&#xff08;像素时钟&#xff09;来控制每个像素的传输速率&#…

微信小程序按字母顺序渲染城市 功能实现详细讲解

在微信小程序功能搭建中&#xff0c;按字母渲染城市会用到多个ES6的方法&#xff0c;如reduce&#xff0c;map&#xff0c;Object.entries()&#xff0c;Object.keys() &#xff0c;需要组合熟练掌握&#xff0c;才能优雅的处理数据完成渲染。 目录 一、数据分析 二、数据处理 …

DVWA靶场通过——文件上传漏洞

File Upload漏洞 它允许攻击者通过上传恶意文件来执行任意代码、窃取数据、获取服务器权限&#xff0c;甚至完全控制服务器。为了防止文件上传漏洞&#xff0c;开发者需要对文件上传过程进行严格的验证和处理。 1. 文件上传漏洞概述 文件上传漏洞发生在Web应用程序允许用户通过…

react后台管理系统(一)

&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;React篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来React篇专栏内容:react后台管理系统(一) 前言 本文档详细介绍了如何从零开始搭建一个基于 React 和 Ant Design 的…

Vue.js --- 生命周期

1. 前言 在 Vue.js 中&#xff0c;生命周期是指一个 Vue 实例从创建到销毁的过程。Vue 提供了一系列的生命周期钩子&#xff08;lifecycle hooks&#xff09;&#xff0c;让开发者可以在不同的阶段执行特定的代码。了解这些生命周期钩子是构建 Vue 组件的基础&#xff0c;能够…

使用1panel一键安装Ollama WebUI连接本地Ollama使用开源ai模型

当前我的环境 设备有限只有一张3060 12gb显卡&#xff0c;平时用来轻度学习 主机&#xff1a;windows server Ollama&#xff1a;windows版&#xff08;它也有linux和mac&#xff09; 因虚拟机使用的服务器无显卡&#xff0c;只用来跑面板和WebUi 虚拟机&#xff1a;ubuntu se…

任意文件读取漏洞(CVE-2024-7928)修复

验证CVE-2024-7928问题是否存在可以使用如下方法&#xff1a; https://域名/index/ajax/lang?lang..//..//目录名/文件名&#xff08;不带后缀&#xff09; 目录名是该项目的一个目录&#xff0c;这里目录位置为nginx设置站点目录为基准&#xff0c;网上两层目录。 文件名…

房屋出租出售预约系统支持微信小程序+H5+APP

核心功能有&#xff1a;新盘销售、房屋租赁、地图找房、小区找房&#xff0c;地铁找房等方式。 地图找房&#xff1a;通过地图标注查看附近房源&#xff0c;方便用户根据地理位置查找合适的房产。二手房资讯&#xff1a;提供租房及二手房市场的相关资讯&#xff0c;帮助用户了…

设计模式:11、迭代器模式(游标)

目录 0、定义 1、迭代器模式的四种角色 2、迭代器模式的UML类图 3、示例代码 4、迭代器的next()方法与集合的get(int index)方法的效率对比&#xff08;LinkedList为例&#xff09; 0、定义 提供一种方法顺序访问一个聚合对象中的各个元素&#xff0c;而又不需要暴露该对象…

抓SystemTrace的简易方法

前言&#xff1a; Systrace是分析Android性能问题的神器。一般抓trace命令是需要在AndroidSDK下的\platform-tools\systrace执行\systrace.py&#xff0c;很奇怪我的AndroidSDK并没有systrace文件夹&#xff0c;于是CSDN单独下载了trace文件 但是我一运行.\systrace.py -b 102…

微信小程序 城市点击后跳转 并首页显示被点击城市

在微信小程序中&#xff0c;渲染出城市列表后&#xff0c;如何点击城市&#xff0c;就跳转回到首页&#xff0c;并在首页显示所点击的城市呢&#xff1f; 目录 一、定义点击城市的事件 二、首页的处理 首页&#xff1a;点击成都市会跳转到城市列表 城市列表&#xff1a;点击…

修改IDEA配置导致Spring Boot项目读取application.properties中文乱码问题

之前很多配置都是放在nacos里面&#xff0c;然后这次同事有个配置写在application.properties中&#xff0c;这个配置含有中文&#xff0c;启动之后发现拿到的中文值会乱码&#xff0c;然后就帮忙看了一下问题。 排查问题 经过不停的百度、排查发现&#xff0c;spring读取app…

STM32F103系列单片机通用和复用I/O(GPIO)

一、GPIO功能描述 每个GPI/O端口有两个32位配置寄存器(GPIOx_CRL&#xff0c; GPIOx_CRH)&#xff0c;两个32位数据寄存器(GPIOx_IDR和GPIOx_ODR)&#xff0c;一个32位置位/复位寄存器(GPIOx_BSRR)&#xff0c;一个16位复位寄存器(GPIOx_BRR)和一个32位锁定寄存器(GPIOx_LCKR)。…

从 App Search 到 Elasticsearch — 挖掘搜索的未来

作者&#xff1a;来自 Elastic Nick Chow App Search 将在 9.0 版本中停用&#xff0c;但 Elasticsearch 拥有你构建强大的 AI 搜索体验所需的一切。以下是你需要了解的内容。 生成式人工智能的最新进展正在改变用户行为&#xff0c;激励开发人员创造更具活力、更直观、更引人入…

《生成式 AI》课程 第7講:大型語言模型修練史 — 第二階段: 名師指點,發揮潛力 (兼談對 ChatGPT 做逆向工程與 LLaMA 時代的開始)

资料来自李宏毅老师《生成式 AI》课程&#xff0c;如有侵权请通知下线 Introduction to Generative AI 2024 Springhttps://speech.ee.ntu.edu.tw/~hylee/genai/2024-spring.php 摘要 这一系列的作业是为 2024 年春季的《生成式 AI》课程设计的&#xff0c;共包含十个作业。…

LabVIEW动态显示控件方案

在LabVIEW开发中&#xff0c;涉及到动态显示和控制界面的设计时&#xff0c;经常需要根据用户选择的不同参数来动态显示或隐藏相关控件。例如&#xff0c;某些能可能会根据“Type”控件的不同选择显示不同的参数&#xff0c;如“Target”、“Duration”和“EndType”等。在一个…

DVWA靶场——File Inclusion

File Inclusion&#xff08;文件包含&#xff09;漏洞 指攻击者通过恶意构造输入&#xff0c;利用应用程序错误的文件包含机制&#xff0c;导致程序包含并执行未经授权的本地或远程文件。这类漏洞广泛存在于Web应用程序中&#xff0c;尤其是在那些允许用户提供文件路径或URL的地…