Java版Flink使用指南——分流导出

大纲

  • 新建工程
  • 编码
    • Pom.xml
    • 自定义无界流
    • 分流
  • 测试
  • 工程代码

在之前的案例中,我们一直使用的是单个Sink来做数据的输出。实际上,Flink是支持多个输出流的。本文我们就来讲解如何在Flink数据输出时做分流处理。
我们将基于《Java版Flink使用指南——自定义无界流生成器》的输入流,按生成数字的奇偶性,将其分流输出到不同的RabbitMQ队列中。

新建工程

我们新建一个名字叫MultiSinkTo的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

编码

Pom.xml

因为我们要往RabbitMQ中输出,所以需要引入相关连接组件。

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-rabbitmq</artifactId>
			<version>3.0.1-1.17</version>
		</dependency>

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
这块的代码可以见《Java版Flink使用指南——自定义无界流生成器》
它会每隔1秒钟生成一个递增的数字

package org.example.generator;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long count = 0L;
        while (isRunning) {
            Thread.sleep(1000); // Simulate delay
            ctx.collect(count++); // Emit data
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        System.out.println("UnBoundedStreamGenerator canceled");
    }
}

分流

我们通过下面的代码生成数据流

		DataStreamSource<Long> longDataStreamSource = env.addSource(new UnBoundedStreamGenerator());

然后奇数发布到odd.data.to.rbtmq队列;偶数发布到even.data.to.rbtmq。
分流主要是通过filter来区分数据,然后针对不同的数据addSink来发布到不同的队列。
如果不需要区分数据,只是将相同的数据发布到不同的目的地,则可以直接多次addSink来达成。

		String host = "172.25.103.252"; // IP of the rabbitmq server
		int port = 5672;
		String username = "admin";
		String password = "fangliang";
		String virtualHost = "/";
		RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
				.setHost(host)
				.setPort(port)
				.setUserName(username)
				.setPassword(password)
				.setVirtualHost(virtualHost)
				.build();

		int parallelism = 1;
		
		String oddSinkQueueName = "odd.data.to.rbtmq"; 
		RMQSink<String> oddRMQSink = new RMQSink<>(rmqConnectionConfig, oddSinkQueueName, new SimpleStringSchema());
		longDataStreamSource.filter(value -> value % 2 != 0).map(Object::toString).addSink(oddRMQSink).setParallelism(parallelism).name("oddSink");

		String evenSinkQueueName = "even.data.to.rbtmq";
		RMQSink<String> evenRMQSink = new RMQSink<>(rmqConnectionConfig, evenSinkQueueName, new SimpleStringSchema());
		longDataStreamSource.filter(value -> value % 2 == 0).map(Object::toString).addSink(evenRMQSink).setParallelism(parallelism).name("evenSink");

测试

执行一段时间后,我们看到两个队列相序增加
在这里插入图片描述
奇数队列
在这里插入图片描述
偶数队列
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/789173.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

人工智能(AI)在医疗行业的应用前景

人工智能&#xff08;AI&#xff09;在医疗行业的应用前景十分广阔&#xff0c;有望彻底改变医疗行业的各个方面。需要注意的是&#xff0c;AI在医疗领域的应用也存在一些潜在的风险和挑战&#xff0c;例如算法偏见、数据隐私和安全、伦理问题等。在开发和应用AI医疗产品时&…

苹果手机短信功能停用怎么恢复?一分钟快速解决!

在使用苹果手机的过程中&#xff0c;可能会遇到短信功能突然停用的情况&#xff0c;这可能导致你无法发送或接收短信&#xff0c;影响日常通讯。这个问题可能由多种原因引起&#xff0c;如网络设置、软件冲突或运营商问题。 短信功能停用怎么恢复&#xff1f;不必担心&#xf…

【Superset】dashboard 自定义URL

URL设置 在发布仪表盘&#xff08;dashboard&#xff09;后&#xff0c;可以通过修改看板属性中的SLUG等&#xff0c;生成url 举例&#xff1a; http://localhost:8090/superset/dashboard/test/ 参数设置 以下 URL 参数可用于修改仪表板的呈现方式&#xff1a;此处参考了官…

django农产品销售系统-计算机毕业设计源码65418

基于HTML5的农产品销售系统的设计与实现 摘 要 本文针对农产品销售系统存在的传统销售方式效率低下、信息交流困难等问题&#xff0c;基于HTML5进行了系统的设计与实现。首先&#xff0c;通过对当前农产品销售系统的现状和问题进行分析&#xff0c;提出了基于HTML5的系统设计方…

蓝牙人员定位精准吗?是否会对人体有伤害?

不知道大家现在使用的蓝牙人员定位系统都是什么样的呢&#xff1f;其实就出行而言&#xff0c;使用GPS定位也就是足够了的&#xff0c;而且目前的定位相对也比较精准了&#xff0c;效果还是很不错的。但是如果说是室内定位&#xff0c;很显然常规的定位系统是无法满足使用需求的…

C语言 结构体和共用体——结构体类型与结构体变量

目录 问题的提出 数组的解决方法 我们希望的内存分配图 如何声明一个结构体类型&#xff1f; 如何定义一个结构体变量&#xff1f; 用typedef给数据类型定义一个别名 如何定义一个结构体变量&#xff1f; 结构体变量的初始化 问题的提出 数组的解决方法 我们希望的内存…

PostgREST API 安装及基础使用

PostgREST是一个独立的Web服务器&#xff0c;它将PostgreSQL数据库转换为RESTful API。它提供基于基础数据库的结构自定义的API。 PostgREST安装 首先访问Releases PostgREST/postgrest (github.com)&#xff0c;根据安装平台选择下载的源码。比如我现在的设备是Mac但是我的…

man手册的安装和使用

man手册 - HQ 文章目录 man手册 - HQ[toc]man手册的使用Linux man中文手册安装man中文手册通过安装包安装通过apt安装 配置man中文手册README使用说明配置步骤 man手册的使用 首先man分为八个目录&#xff0c;每个目录用一个数字表示 1.可执行程序2.系统调用3.库函数4.特殊文…

身份证二要素API,实名认证领域的创新之选

身份证二要素API&#xff0c;是一种实名认证领域的创新解决方案。通过输入姓名和身份证号&#xff0c;该API可以通过官方权威渠道进行核查&#xff0c;实时校验二要素的一致性&#xff0c;并返回生日、性别、籍贯等详细信息。这篇博文将详细介绍身份证二要素API的使用方法&…

c语言数据结构--链表

实验内容&#xff1a; 编程实现链表下教材第二章定义的线性表的基本操作&#xff0c;最好用菜单形式对应各个操作&#xff0c;使其编程一个完整的小软件。 实验步骤&#xff1a; 1.按照实验要求编写代码&#xff0c;构造链表&#xff0c;初始化链表 2.再插入元素1 2 3 3.分别…

书生大模型实战营(暑假场)-入门岛-第一关

书生大模型实战营暑假场重磅开启&#xff01;&#xff0c;这场学习路线看起来很好玩呀&#xff0c;闯关学习既能学到知识又有免费算力可得&#xff0c;太良心啦。感兴趣的小伙伴赶快一起报名学习吧&#xff01;&#xff01;&#xff01; 关卡任务 好的&#xff0c;我们废话不多…

如何在忘记密码的情况下重置Realme手机?

欢迎阅读我们关于如何在有或没有密码的情况下重置Realme手机的综合指南。无论您是忘记了密码&#xff0c;还是只是需要将设备恢复到出厂设置&#xff0c;我们都会为您提供所需的专业提示和技术专长。 发现分步说明、专家提示和行之有效的方法&#xff0c;轻松重新控制您的 Rea…

python库(9):prettytable库快速实现ASCII表格

下面介绍一个快速制作ASCII表格库——prettytable&#xff0c;可以方便地制作简单表格。 1 安装prettytable pip install -i https://pypi.tuna.tsinghua.edu.cn/simple prettytable 结果如下&#xff1a; 2 代码实例 from prettytable import PrettyTable table PrettyTa…

How do I format markdown chatgpt response in tkinter frame python?

题意&#xff1a;怎样在Tkinter框架中使用Python来格式化Markdown格式的ChatGPT响应&#xff1f; 问题背景&#xff1a; Chatgpt sometimes responds in markdown language. Sometimes the respond contains ** ** which means the text in between should be bold and ### te…

CentOS 6.5配置国内在线yum源和制作openssh 9.8p1 rpm包 —— 筑梦之路

CentOS 6.5比较古老的版本了&#xff0c;而还是有一些古老的项目仍然在使用。 环境说明 1. 更换国内在线yum源 CentOS 6 在线可用yum源配置——筑梦之路_centos6可用yum源-CSDN博客 cat > CentOS-163.repo << EOF [base] nameCentOS-$releasever - Base - 163.com …

ChatGLM-6B入门

ChatGLM-6B ChatGLM-6B 一、介绍 ChatGLM-6B 是一个开源的、支持中英双语的对话语言模型&#xff0c;基于 General Language Model (GLM) 架构&#xff0c;具有 62 亿参数。结合模型量化技术&#xff0c;用户可以在消费级的显卡上进行本地部署&#xff08;INT4 量化级别下最…

数据结构(初阶1.复杂度)

文章目录 一、复杂度概念 二、时间复杂度 2.1 大O的渐进表示法 2.2 时间复杂度计算示例 2.2.1. // 计算Func2的时间复杂度&#xff1f; 2.2.2.// 计算Func3的时间复杂度&#xff1f; 2.2.3.// 计算Func4的时间复杂度&#xff1f; 2.2.4.// 计算strchr的时间复杂度&#xff1f; …

智能车载防窒息系统设计

摘要 随着汽车行业的快速发展&#xff0c;车辆安全问题越来越受到人们的关注。其中&#xff0c;车载防窒息系统是一项重要的安全设备。本论文基于STM32单片机&#xff0c;设计了一种智能车载防窒息系统。该系统主要包括氧气浓度检测模块、温湿度检测模块、声音检测模块、光线检…

URPF简介

定义 URPF&#xff08;Unicast Reverse Path Forwarding&#xff09;是单播逆向路径转发的简称&#xff0c;其主要功能是防止基于源IP地址欺骗的网络攻击行为。 目的 拒绝服务DoS&#xff08;Denial of Service&#xff09;攻击是一种阻止连接服务的网络攻击。DoS的攻击方式…

QT开发积累——qt中的注释和多行注释的几种方式,函数方法注释生成

目录 引出qt中的注释和多行注释方法的注释生成 总结日积月累&#xff0c;开发集锦方法参数加const和不加const的区别方法加static和不加static的区别Qt遍历list提高效率显示函数的调用使用&与不使用&qt方法的参数中使用&与不使用&除法的一个坑 项目创建相关新建…