Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!

代码仓库

会同步代码到 GitHub
https://github.com/turbo-duck/flink-demo

在这里插入图片描述

当前章节

继续上一节的内容:https://blog.csdn.net/w776341482/article/details/139875037

上一节中,我们需要使用 nc 或者 telnet 等工具来模拟 Socket 流。这节我们写一个 ServerSocket 来模拟这些 操作,让流自动的写入不用我们手动去操作了。

POM.xml

与上一节一致,不需要修改

编写代码

还是和上一节一样的 Socket 流,这里略去其他的代码

DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");

FlinkServer

继承 Thread 启动一个线程来进行Flink的服务

package icu.wzk.demo03;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FlinkServer extends Thread {

    @Override
    public void run() {
        String ip = "0.0.0.0";
        int port = 9999;
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> textStream = streamExecutionEnvironment.socketTextStream(ip, port, "\n");
        SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = textStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
                String[] splits = s.split("\\s");
                for (String word : splits) {
                    collector.collect(Tuple2.of(word, 1L));
                }
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator
                .keyBy(new KeySelector<Tuple2<String, Long>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {
                        return stringLongTuple2.f0;
                    }
                })
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
                .sum(1);
        System.out.println("wait word print()");
        word.print();
        try {
            streamExecutionEnvironment.execute("stream!");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

NumRandom

使用 ServerSocket 实现一个持续的流输出

package icu.wzk.demo03;

import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class RandomNumClient extends Thread {

    @Override
    public void run() {
        String ip = "0.0.0.0";
        int port = 9999;
        try {
            ServerSocket serverSocket = new ServerSocket();
            InetSocketAddress address = new InetSocketAddress(ip, port);
            serverSocket.bind(address);
            Socket socket = serverSocket.accept();
            OutputStream output = socket.getOutputStream();
            PrintWriter writer = new PrintWriter(output, true);
            Random random = new Random();
            for (int i = 0; i < 500; i ++) {
                int randomNumber = random.nextInt(10) + 1;
                writer.println(randomNumber);
                System.out.println("ServerSocket Send To Flink: " + randomNumber);
                Thread.sleep(200);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

StartApp

将上述的两个类组装起来

请添加图片描述

package icu.wzk.demo03;


public class StartApp {

    public static void main(String[] args) throws Exception {
        RandomNumClient randomNumClient = new RandomNumClient();
        FlinkServer flinkServer = new FlinkServer();
        flinkServer.start();
        randomNumClient.start();
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/735036.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【python】linux下安装chromedriver

首先&#xff0c;安装selenium模块 pip3 install selenium查看系统内chrome版本&#xff1a; google-chrome --version 根据谷歌浏览器版本下载对应的浏览器驱动版本&#xff1a; wget https://storage.googleapis.com/chrome-for-testing-public/126.0.6478.114/linux64/ch…

2024年6月大众点评成都餐饮店铺POI分析22万家

2024年6月大众点评成都餐饮店铺POI共有221002家 店铺POI点位示例&#xff1a; 店铺id CACuqlcUQApLA7Ki 店铺名称 峨眉山豆腐脑(百吉街店) 十分制服务评分 7.3 十分制环境评分 7.5 十分制划算评分 7.1 人均价格 18 评价数量 38 店铺地址 百吉街86号1层 大类 美食 中类…

Day7 —— 大数据技术之Hive

Hive快速入门系列 Hive的概述什么是Hive&#xff1f;使用Hive的原因 Hive架构Hive安装Hive配置文件修改启动Hive以命令行方式启动&#xff08;在$HIVE_HOME/bin目录下&#xff09;以JDBC连接启动&#xff08;beeline方式连接&#xff09; Hive基本操作Hive数据库操作Hive表操作…

天气冷电脑不能启动找不到硬盘

https://diy.zol.com.cn/2004/0611/101994.shtml

为什么 JakeWharton 建议:App 只要用到一个 Activity ?

我们来看看这条回答都提到了哪些内容&#xff0c;对 Activity 和 Fragment 之间的爱恨情仇有何独到的见解&#xff0c;凭什么能得到 JakeWharton 本尊的青睐有加。 因为 Activity 是一个程序入口。你可以将其视为 app 的一个 main 函数。站在用户的立场上&#xff0c;通常你进入…

ARM功耗管理软件之WFIWFE

安全之安全(security)博客目录导读 思考&#xff1a;功耗管理软件栈及示例&#xff1f;WFI&WFE&#xff1f;时钟&电源树&#xff1f;DVFS&AVS&#xff1f; ARM功耗管理精讲与实战汇总参见&#xff1a;Arm功耗管理精讲与实战

IO模型详解

阻塞IO模型 假设应用程序的进程发起IO调用&#xff0c;但是如果内核的数据还没准备好的话&#xff0c;那应用程序进程就一直在阻塞等待&#xff0c;一直等到内核数据准备好了&#xff0c;从内核拷贝到用户空间&#xff0c;才返回成功提示&#xff0c;此次IO操作&#xff0c;称…

OkHttp框架源码深度剖析【Android热门框架分析第一弹】

OkHttp介绍 OkHttp是当下Android使用最频繁的网络请求框架&#xff0c;由Square公司开源。Google在Android4.4以后开始将源码中的HttpURLConnection底层实现替换为OKHttp&#xff0c;同时现在流行的Retrofit框架底层同样是使用OKHttp的。 源码传送门 优点: 支持Http1、Http…

基于Java的农机电招平台系统

你好呀&#xff0c;我是计算机学姐码农小野&#xff01;如果你对农机电招平台系统感兴趣或有相关开发需求&#xff0c;可以私信联系我。 开发语言 Java 数据库 MySQL 技术 B/S结构&#xff0c;SpringBoot框架 工具 Eclipse&#xff0c;Navicat&#xff0c;Tomcat8.0 系…

24年下半年各省自考报名时间汇总

24年下半年各省自考报名时间汇总

C语言 | Leetcode C语言题解之第174题地下城游戏

题目&#xff1a; 题解&#xff1a; int calculateMinimumHP(int** dungeon, int dungeonSize, int* dungeonColSize) {int n dungeonSize, m dungeonColSize[0];int dp[n 1][m 1];memset(dp, 0x3f, sizeof(dp));dp[n][m - 1] dp[n - 1][m] 1;for (int i n - 1; i >…

利用JAVA语言调用GLM-4接口实战指南

一、什么是API接口 API&#xff08;Application Programming Interface&#xff0c;应用程序编程接口&#xff09;是一种软件接口&#xff0c;它定义了不同应用程序之间如何相互通信、交互。API接口分为很多种&#xff0c;常见的有Web API&#xff0c;数据库API&#xff0c;操…

【非常实验】如何在移动设备上运行 Docker?

本章就从在 DevOps 中最基本但也是最强大的工具 Docker 开始。最近,我在尝试更多Termux的可能性,于是就想着试试Docker适不适合arm架构。 我用的是天玑9000芯片,而不是高通,所以显示不出来 Qualcomm。所以我决定从在手机上运行 docker 开始,但这可能吗?让我们一起来看看吧…

高性能并行计算华为云实验三:蒙特卡罗算法实验

目录 一、实验目的 二、实验说明 三、实验过程 3.1 创建蒙特卡罗算法源码 3.2 Makefile的创建与编译 3.3 主机文件配置与运行监测​​​​​​​ 四、实验结果与分析 4.1 原教程对应的实验结果 4.2 改进后的实验结果 五、实验思考与总结 5.1 实验思考 5.2 实验总结…

从零实现GPT【1】——BPE

文章目录 Embedding 的原理训练特殊 token 处理和保存编码解码完整代码 BPE&#xff0c;字节对编码 Embedding 的原理 简单来说就是查表 # 解释embedding from torch.nn import Embedding import torch# 标准的正态分布初始化 也可以用均匀分布初始化 emb Embedding(10, 32) …

探索Agent AI智能体的未来

随着人工智能&#xff08;AI&#xff09;技术的飞速发展&#xff0c;Agent AI智能体正成为一种改变世界的新力量。这些智能体不仅在当前的技术领域中发挥着重要作用&#xff0c;而且在未来将以更深远的影响改变我们的生活、工作和社会结构。本文将探讨Agent AI智能体的现状、潜…

回顾今年的618大战:除了卷低价,还有别的出路吗?

今年的618刚刚落下帷幕&#xff0c;大促期间&#xff0c;一些电商平台纷纷备足马力、迎接挑战&#xff0c;反倒是一向领跑的淘宝京东公开表示&#xff0c;今年取消了618预售制。 互联网电商20年来&#xff0c;每年618、双11轮流登场&#xff0c;“低价大战”愈演愈烈&#xff0…

【C++】类和对象2.0

俺来写笔记了&#xff0c;哈哈哈&#xff0c;浅浅介绍类和对象的知识点&#xff01; 1.类的6个默认成员函数 俺们定义一个空类&#xff1a; class N {}; 似乎这个类N里面什么都没有&#xff0c;其实不是这样子的。这个空类有6个默认的成员函数 。 默认成员函数&#xff1a…

Android 你应该知道的学习资源 进阶之路贵在坚持

coderzheaven 覆盖各种教程&#xff0c;关于Android基本时案例驱动的方式。 非常推荐 thenewcircle 貌似是个培训机构&#xff0c;多数是收费的&#xff0c;不过仍然有一些free resources值得你去挖掘。 coreservlets 虽然主打不是android&#xff0c;但是android的教程也​ 是…

【前端技术】标签页通讯localStorage、BroadcastChannel、SharedWorker的技术详解

&#x1f604; 19年之后由于某些原因断更了三年&#xff0c;23年重新扬帆起航&#xff0c;推出更多优质博文&#xff0c;希望大家多多支持&#xff5e; &#x1f337; 古之立大事者&#xff0c;不惟有超世之才&#xff0c;亦必有坚忍不拔之志 &#x1f390; 个人CSND主页——Mi…