Java版Flink使用指南——从RabbitMQ中队列中接入消息流

大纲

  • 创建RabbitMQ队列
  • 新建工程
    • 新增依赖
    • 编码
      • 设置数据源配置
      • 读取、处理数据
      • 完整代码
    • 打包、上传和运行任务
    • 测试
  • 工程代码

在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而现实中,数据往往来源于外部。本文我们将尝试Flink从RabbitMQ中读取数据,然后输出到日志中。
关于RabbitMQ的知识可以参阅《RabbitMQ实践》。

创建RabbitMQ队列

我们创建一个Classic队列data.from.rbtmq。注意要选择Durable类型,这是后续用的默认连接器的限制。
具体方法见《RabbitMQ实践——在管理后台测试消息收发功能》。
在这里插入图片描述

后续我们将在后台通过默认交换器,给这个队列新增消息。

新建工程

我们在IntelliJ中新建一个工程DataFromRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java。
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-rabbitmq</artifactId>
	<version>3.0.1-1.17</version>
</dependency>

编码

设置数据源配置

String queueName = "data.from.rbtmq";
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;

// create a RabbitMQ source
RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
		.setHost(host)
		.setPort(port)
		.setUserName(username)
		.setPassword(password)
		.setVirtualHost(virtualHost)
		.build();

RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());

读取、处理数据

下面代码通过addSource添加RabbitMQ数据源。注意,不能使用fromSource方法,是因为RMQSource没有实现SourceFunction方法。

final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);

stream.print().name(username + "'s data from " + queueName);

完整代码

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.example;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/**
 * Skeleton for a Flink DataStream Job.
 *
 * <p>For a tutorial how to write a Flink application, check the
 * tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution, run
 * 'mvn clean package' on the command line.
 *
 * <p>If you change the name of the main class (with the public static void main(String[] args))
 * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
 */
public class DataStreamJob {

	public static void main(String[] args) throws Exception {
		// Sets up the execution environment, which is the main entry point
		// to building Flink applications.
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		String queueName = "data.from.rbtmq";
		String host = "172.21.112.140"; // IP of the rabbitmq server
		int port = 5672;
		String username = "admin";
		String password = "fangliang";
		String virtualHost = "/";
		int parallelism = 1;

		// create a RabbitMQ source
		RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
				.setHost(host)
				.setPort(port)
				.setUserName(username)
				.setPassword(password)
				.setVirtualHost(virtualHost)
				.build();

		RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());
		final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);

		stream.print().name(username + "'s data from " + queueName);

		env.execute("Flink Java API Skeleton");
	}
}

打包、上传和运行任务

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试

在RabbitMQ后台的默认交换器中,发布一条消息到data.from.rbtmq
在这里插入图片描述
然后使用下面指令可以看到Flink读取到消息并执行了print方法

tail log/flink-*-taskexecutor-*.out

==> flink-fangliang-taskexecutor-0-fangliang.out <==
data from http://172.21.112.140:15672/#/exchanges/%2F/amq.default

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/785985.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

74HC165芯片验证

目录 0x01 74HC165芯片介绍0x02 编程实现 0x01 74HC165芯片介绍 74HC165的引脚定义如下&#xff0c;长这个样子 ABCDEFGH是它的八个输入引脚&#xff0c;例如你可以将它连接按键&#xff0c;让它来读取8个按键值。也可以将他级联其它的74165&#xff0c;无需增加单片机GPIO引…

Nginx+Tomcat群集

一.实验环境 二.安装多台Tomcat服务器 1.在安装Tomcat之前必须先安装JDK。 JDK的全称是Java Development Kit&#xff0c;是sun公司提供的JAVA语言的软件开发工具包&#xff0c;其中包含Java虚拟机&#xff08;JVM&#xff09;&#xff0c;编写好的Java源程序经过编译可形成Ja…

bert-base-chinese模型离线使用案例

import torch import torch.nn as nn from transformers import BertModel, BertTokenizer# 通过torch.hub(pytorch中专注于迁移学的工具)获得已经训练好的bert-base-chinese模型 # model torch.hub.load(huggingface/pytorch-transformers, model, bert-base-chinese) model…

Python 定义和调用函数

在Python编程中&#xff0c;函数是组织和重用代码的一种重要方式。函数可以提高代码的可读性和维护性&#xff0c;并且可以避免重复代码。 1. 定义函数 在Python中&#xff0c;函数使用def关键字定义。一个简单的函数定义包括函数名、参数列表和函数体。以下是一个基本的函数…

[Python爬虫] 抓取京东商品数据||京东商品API接口采集

本文结构&#xff1a; 一、引言 二、代码分享 三、问题总结 引言 这两天因为一些需求&#xff0c;研究了一下如何爬取京东商品数据。最开始还是常规地使用selenium库进行商品页的商品抓取&#xff0c;后来因为想要获取优惠信息&#xff0c;只能进入到商品详情页进行抓取&#x…

苏东坡传-读书笔记十一

苏东坡对写作与风格所表示的意见最为清楚。他说做文章“大略如行云流水&#xff0c;初无定质&#xff0c;但常行于所当行&#xff0c;常止于所不可不止。文理自然&#xff0c;姿态横生。孔子曰&#xff1a;‘言之不文&#xff0c;行而不远。’又曰&#xff1a;‘辞达而已矣。’…

【Linux】:进程等待

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux进程等待的相关知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门…

电竞玩家的云端盛宴!四大云电脑平台:ToDesk、顺网云、青椒云、极云普惠云实测大比拼

本文目录 一、云电脑概念及市场需求二、云电竞性能测试2.1 ToDesk云电脑2.2 顺网云2.3 青椒云2.4 极云普惠云电脑 三、四大云电脑平台综合配置对比3.1 CPU处理器3.2 GPU显卡3.3 内存 四、总结 一、云电脑概念及市场需求 在数字化时代的推动下&#xff0c;云计算技术日益成熟&a…

JAVA 代码块介绍

一、基本介绍 代码化块又称为初始化块&#xff0c;属于类中的成员[即 是类的一部分]&#xff0c;类似于方法&#xff0c;将逻辑语句封装在方法体中&#xff0c;通过包围起来。 但和方法不同&#xff0c;没有方法名&#xff0c;没有返回&#xff0c;没有参数&#xff0c;只有方…

Java面试八股之MySQL支持哪些数据类型

MySQL支持哪些数据类型 MySQL支持多种数据类型&#xff0c;这些类型可以大致分为三大类&#xff1a;数值类型、日期/时间类型和字符串类型。下面是一些常见的数据类型及其用途&#xff1a; 数值类型 整数类型&#xff1a; TINYINT&#xff1a;通常占用1字节。 SMALLINT&am…

注册商标为什么要先查询

注册商标为什么要先查询 在知识产权日益受到重视的今天&#xff0c;商标的注册成为了许多企业和个人保护其品牌价值和市场地位的重要手段。然而&#xff0c;商标注册并非一蹴而就的过程&#xff0c;其中一个关键的步骤就是商标查询&#xff0c;也就是我们通常所说的“商标检索…

STM32CubeMX如何配置生成项目以及安装包

目录 一、STM32CubeMX介绍 二、用STM32CubeMX生成项目 1.创建项目 2.定义引脚 3.配置时钟 4.保存项目 5.生成项目 6.打开项目 一、STM32CubeMX介绍 STM32CubeMX是STM32Cube工具家族中的一员&#xff0c;专门为STM32微控制器的开发提供便利。它是一款图形化工具&#xf…

新加坡工作和生活指北:租房篇

本文首发于公众号 Keegan小钢 前段时间已经分享了工作篇&#xff0c;现在接着聊聊生活篇。因为生活这块涉及到多个方面&#xff0c;内容比较多&#xff0c;所以我再细分了一下&#xff0c;本篇先聊聊租房。 先来看看新加坡的地区分布图&#xff0c;如下&#xff1a; 上图将新加…

【C语言】指针(3):探索-不同类型指针变量

目录 一、字符指针变量 二、数组指针变量 三、二维数组传参的本质 四、函数指针变量 4.1 函数指针变量 4.2 函数指针变量的使用 4.3 函数指针变量的拓展 五、函数指针数组 六、转移表的应用 通过深入理解指针&#xff08;1&#xff09;和深入理解指针&#xff08;2&am…

【北京迅为】《i.MX8MM嵌入式Linux开发指南》-第一篇 嵌入式Linux入门篇-第十二章 Linux 权限管理

i.MX8MM处理器采用了先进的14LPCFinFET工艺&#xff0c;提供更快的速度和更高的电源效率;四核Cortex-A53&#xff0c;单核Cortex-M4&#xff0c;多达五个内核 &#xff0c;主频高达1.8GHz&#xff0c;2G DDR4内存、8G EMMC存储。千兆工业级以太网、MIPI-DSI、USB HOST、WIFI/BT…

Python 插入、替换、提取、或删除Excel中的图片

Excel是主要用于处理表格和数据的工具&#xff0c;我们也能在其中插入、编辑或管理图片&#xff0c;为工作表增添视觉效果&#xff0c;提升报告的吸引力。本文将详细介绍如何使用Python操作Excel中的图片&#xff0c;包含以下4个基础示例&#xff1a; 文章目录 Python 在Excel…

Autogen基本使用介绍

文章目录 一&#xff0c;Build1&#xff0c;Skill2&#xff0c;Models3&#xff0c;agents4,workflow 二&#xff0c;Playground 本文唯一目的就是介绍一下Autogen Studio的基本的使用。 打开这个网页以后&#xff0c;看到它有2个菜单&#xff0c;分别是&#xff1a; BuildPla…

07-7.3.2 平衡二叉树(AVL)

&#x1f44b; Hi, I’m Beast Cheng &#x1f440; I’m interested in photography, hiking, landscape… &#x1f331; I’m currently learning python, javascript, kotlin… &#x1f4eb; How to reach me --> 458290771qq.com 喜欢《数据结构》部分笔记的小伙伴可以…

使用 Qt 和 ECharts 进行数据可视化

文章目录 示例图表预览折线图散点图柱状图使用 Qt 和 ECharts 进行数据可视化一、准备工作1. 安装 Qt2. 准备 ECharts二、在 Qt 中使用 ECharts1. 创建 Qt 项目2. 配置项目文件3. 在 UI 中添加 WebEngineView4. 加载 ECharts三、创建折线图、散点图和柱状图1. 折线图2. 散点图3…

工作流之战: Flowable vs. Camunda vs. Activiti

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 &#x1f38f;&#xff1a;你只管努力&#xff0c;剩下的交给时间 &#x1f3e0; &#xff1a;小破站 工作流之战: Flowable vs. Camunda vs. Activiti 前言功能特性对比架构设计分析性能比较使用场景…