Java版Flink使用指南——定制RabbitMQ数据源的序列化器

大纲

  • 新建工程
    • 新增依赖
    • 数据对象
    • 序列化器
    • 接入数据源
  • 测试
    • 修改Slot个数
    • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们从RabbitMQ队列中读取了字符串型数据。如果我们希望读取的数据被自动化转换为一个对象,则需要定制序列化器。本文我们就将讲解数据源序列化器的定制方法。

新建工程

我们在IntelliJ中新建一个工程SourceSerializer。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-rabbitmq</artifactId>
			<version>3.0.1-1.17</version>
		</dependency>

新增Json库依赖

		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.17.1</version>
		</dependency>

新增lombok库,主要是为了使用它的一些注解

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.32</version>
            <scope>provided</scope>
        </dependency>

数据对象

我们新建一个简单的数据对象SampleData
src/main/java/org/example/vo/SampleData.java

package org.example.vo;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleData {
    private Long id;
    private String name;
    private int age;
    private Boolean married;
    private Double salary;

    public String toJson() throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.writeValueAsString(this);
    }

    public static SampleData fromJson(String json) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(json, SampleData.class);
    }
}

这个方法包含两个方法,一个是将SampleData 转换成字符串,另一个是将字符串转成SampleData 对象。

序列化器

我们定义的数据源序列化器要实现AbstractDeserializationSchema接口,主要是通过deserialize方法将二进制数组转换成SampleData 对象。

src/main/java/org/example/serializer/SampleDataRabbitMQSourceSerializer.java

package org.example.serializer;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.example.vo.SampleData;

import java.io.IOException;

public class SampleDataRabbitMQSourceSerializer extends AbstractDeserializationSchema<SampleData> {
    @Override
    public SampleData deserialize(byte[] message) throws IOException {
        return SampleData.fromJson(new String(message));
    }

    @Override
    public boolean isEndOfStream(SampleData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<SampleData> getProducedType() {
        return TypeInformation.of(SampleData.class);
    }
}

接入数据源

我们在《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》一文中,往data.to.rbtmq对了写入了大量SampleData 数据。这次我们将其作为数据源来做测试
这次我们在创建RMQSource时传入序列化器SampleDataRabbitMQSourceSerializer。它会将从RabbitMQ获取的数据转换成SampleData对象。
然后我们获取所有“已婚”(filter.getMarried() == true)的数据,将其打印到日志中。

		String queueName = "data.to.rbtmq";
		String host = "172.21.112.140"; // IP of the rabbitmq server
		int port = 5672;
		String username = "admin";
		String password = "fangliang";
		String virtualHost = "/";
		int parallelism = 1;

		// create a RabbitMQ source
		RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
				.setHost(host)
				.setPort(port)
				.setUserName(username)
				.setPassword(password)
				.setVirtualHost(virtualHost)
				.build();

		RMQSource<SampleData> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SampleDataRabbitMQSourceSerializer());
		final DataStream<SampleData> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);

		stream.filter(filter -> filter.getMarried() == true).print().name(username + "'s sink to stdout").setParallelism(parallelism);

测试

修改Slot个数

由于我们要运行两个流式计算任务,于是需要两个Slot。

vim conf/config.yaml 

将numberOfTaskSlots的值改成2。

打包、提交、运行

我们将本例和《Java版Flink使用指南——定制RabbitMQ的Sink序列化器》中的包都提交运行
在这里插入图片描述
然后在日志中可以看到“已婚”的数据都在输出

 tail -f log/*

在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/785238.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JAVA案例ATM系统

一案例要求&#xff1a; 首先完成ATM的用户登录和用户开户两个大功能&#xff0c;用户开户有账户名&#xff0c;性别&#xff0c;账户密码&#xff0c;确认密码&#xff0c;每次取现额度&#xff0c;并且随机生成一个7位数的账号&#xff0c;用户登录功能有查询&#xff0c;存…

k8s 部署 metribeat 实现 kibana 可视化 es 多集群监控指标

文章目录 [toc]环境介绍老(来)板(把)真(展)帅(示)helm 包准备配置监控集群获取集群 uuid生成 api_key配置 values.yaml 配置 es 集群获取集群 uuid 和 api_key配置 values.yaml 查看监控 缺少角色的报错 开始之前&#xff0c;需要准备好以下场景 一套 k8s 环境 k8s 内有两套不同…

Aqara 发布多款智能照明新品,引领空间智能新时代

7月8日&#xff0c;全球 IoT 独角兽品牌 Aqara 以“光&#xff0c;重塑空间想象”为主题&#xff0c;举办了线上智能照明新品沟通会。 会上&#xff0c;Aqara 正式发布一系列引领行业的智能照明新品&#xff0c;包括银河系列轨道灯 V1 以及繁星系列妙控旋钮 V1 等&#xff0c;…

Hospital Management System v4.0 SQL 注入漏洞(CVE-2022-24263)

前言 CVE-2022-24263 是一个影响 Hospital Management System (HMS) v4.0 的 SQL 注入漏洞。这个漏洞允许攻击者通过注入恶意 SQL 代码来获取数据库的敏感信息&#xff0c;甚至可能控制整个数据库。以下是对这个漏洞的详细介绍&#xff1a; 漏洞描述 在 Hospital Management…

使用Keil 点亮LED灯 F103ZET6

1.新建项目 不截图了 2.startup_stm32f10x_hd.s Keil\Packs\Keil\STM32F1xx_DFP\2.2.0\Device\Source\ARM 搜索startup_stm32f10x_hd.s 复制到项目路径&#xff0c;双击Source Group 1 3.项目文件夹新建stm32f10x.h&#xff0c; 新建文件main.c #include "stm32f10x…

OS-HACKNOS-2.1

确定靶机IP地址 扫描靶机开放端口信息 目录扫描 访问后发现个邮箱地址 尝试爆破二级目录 确定为wordpress站 利用wpscan进行漏洞扫描 #扫描所有插件 wpscan --url http://192.168.0.2/tsweb -e ap 发现存在漏洞插件 cat /usr/share/exploitdb/exploits/php/webapps/46537.txt…

Camera Raw:裁剪

Camera Raw 的裁剪 Crop面板提供了裁剪、旋转、翻转、拉直照片等功能&#xff0c;通过它们可以更精确地调整照片的视角和范围&#xff0c;以达到最佳二次构图的视觉效果。 快捷键&#xff1a;C ◆ ◆ ◆ 使用方法与技巧 1、使用预设 选择多种裁剪预设&#xff08;如 1:1、16:…

前端传到后端的data数组中有些属性值为空

将前端输入框中的值全部放入data中传入后端&#xff0c;但是在后端查看发现后端接收到的数据有些属性值为空。 第一种情况&#xff1a;只有第一个属性为空&#xff0c;其余属性接收正常 可能原因&#xff1a;后端用来接收的 比如前端发送数据&#xff1a; 实际上前端发送的数…

防火墙详解(USG6000V)

0、防火墙组网模式 防火墙能够工作在三种模式下分别是路由模式、透明模式、旁路检测模式、混合模式 0.1、路由模式 路由模式&#xff1a;防火墙全部以第三层对外连接&#xff0c;即接口具有IP 地址。一般都用在防火墙是边界的场景下 防火墙需要的部署/配置&#xff1a; 接…

【Excel】 批量跳转图片

目录标题 1. CtrlA全选图片 → 右键 → 大小和属性2. 取消 锁定纵横比 → 跳转高度宽度 → 关闭窗口3. 最后一图拉到最后一单元格 → Alt吸附边框![](https://i-blog.csdnimg.cn/direct/d56ac1f41af54d54bb8c68339b558dd1.png)4. CtrlA全选图片 → 对齐 → 左对齐 → 纵向分布!…

C++初探究

概述 C可以追溯到1979年&#xff0c;C之父Bjarne Stroustrup在在使用C语言研发工作时发现C语言的不足&#xff0c;并想要将其改进&#xff0c;到1983年&#xff0c;Bjarne Stroustrup在C语言的基础上添加了面向对象编程的特性&#xff0c;设计出了C的雏形。 网址推荐 C官方文…

Java面试八股之MySQL主从复制机制简述

MySQL主从复制机制简述 MySQL的主从复制机制是一种数据复制方案&#xff0c;用于在多个服务器之间同步数据。此机制允许从一个服务器&#xff08;主服务器&#xff09;到一个或多个其他服务器&#xff08;从服务器&#xff09;进行数据的复制&#xff0c;从而增强数据冗余、提…

HTTP 请求走私漏洞详解

超详细的HTTP请求走私漏洞教程&#xff0c;看完还不会你来找我。 1. 简介 HTTP请求走私漏洞&#xff08;HTTP Request Smuggling&#xff09;发生在前端服务器&#xff08;也称代理服务器&#xff0c;一般会进行身份验证或访问控制&#xff09;和后端服务器在解析HTTP请求时&…

YASKAWA安川Σ-V系列伺服驱动器AC设计维护手侧

YASKAWA安川Σ-V系列伺服驱动器AC设计维护手侧

C#——序列化和反序列化概念

(1)序列化 在编程中&#xff0c;序列化是指将对象转换为可存储或传输的格式&#xff0c;例如将对象转换为 JSON 字符串或字节流。 (2)反序列化 在编程中&#xff0c;反序列化则是将存储或传输的数据转换回对象的过程。 序列化和反序列化经常用于数据的持久化、数据交换以及…

JAVA基础-----包装类,自动装箱、拆箱

一、包装类&#xff1a; Java中的数据类型总体上分为基本数据类型和引用数据类型。引用类型的数据可以通过对象的属性和方法来进行操作&#xff0c;但对于基本数据类型的数据&#xff0c;我们能不能像操作对象那样来操作呢&#xff1f;为了实现这个目标&#xff0c;Java为8种基…

WebOffice在线编微软Offfice,并以二进制流的形式打开Word文档

在日常办公场景中&#xff0c;我们经常会遇到这种场景&#xff1a;我们的合同管理系统的各种Word,excel,ppt数据都是以二进制数组的形式存储在数据库中&#xff0c;如何从数据库中读取二进制数据&#xff0c;以二进制数据作为参数&#xff0c;然后加载到浏览器的Office窗口&…

华为HCIP Datacom H12-821 卷30

1.单选题 以下关于OSPF协议报文说法错误的是? A、OSPF报文采用UDP报文封装并且端口号是89 B、OSPF所有报文的头部格式相同 C、OSPF协议使用五种报文完成路由信息的传递 D、OSPF所有报文头部都携带了Router-ID字段 正确答案&#xff1a;A 解析&#xff1a; OSPF用IP报…

每日一练全新考试模式解锁|考试升级

&#x1f64b;频繁有小伙伴咨询&#xff1a;我想举办一场历时一个月的答题活动&#xff0c;学生可以每天打开答题&#xff0c;活动完结后可以导出每天的答题成绩 此前我们都会让小伙伴创建30场考试&#xff0c;然后使用批量分享功能组合起来&#xff0c;对外分享一个链接就可以…

强化学习编程实战-2马尔可夫决策过程

2.1 从多臂赌博机到马尔可夫决策过程 如图2-1&#xff0c;图中A为多臂赌博机&#xff0c;B为一堆鸳鸯&#xff0c;其中左上角为雄性鸳鸯&#xff0c;右上角为雌性鸳鸯&#xff0c;B展示的任务是雄性鸳鸯绕过障碍物找到词性鸳鸯。跟多臂赌博机不同的是&#xff0c;雄性鸳鸯经过一…