Spring Cloud Stream实现数据流处理

1.什么是Spring Cloud Stream?

我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套API,无需关心具体的消息队列实现”。 这样理解是有些不全面的,Spring Cloud Stream的核心是Stream,准确来讲Spring Cloud Stream提供了一整套数据流走向(流向)的API, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理 我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。

module

我们很容易总结出每个模块的流程:

  1. 从上一个模块拉取数据
  2. 处理数据
  3. 将处理完成的数据发给下一个模块

其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同,需要使用不同的API。 为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。

2.环境准备

采用docker-compose搭建kafaka环境

version: '3'

networks:
  kafka:
    ipam:
      driver: default
      config:
        - subnet: "172.22.6.0/24"

services:
  zookepper:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest
    container_name: zookeeper-server
    restart: unless-stopped
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes
    ports:
      - "2181:2181"
    networks:
      kafka:
        ipv4_address: 172.22.6.11

  kafka:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1
    container_name: kafka
    restart: unless-stopped
    volumes:
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092
    ports:
      - "9092:9092"
    depends_on:
      - zookepper
    networks:
      kafka:
        ipv4_address: 172.22.6.12

  kafka-map:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map
    container_name: kafka-map
    restart: unless-stopped
    volumes:
      - "./kafka/kafka-map/data:/usr/local/kafka-map/data"
    environment:
      DEFAULT_USERNAME: admin
      DEFAULT_PASSWORD: 123456
    ports:
      - "9080:8080"
    depends_on:                         
      - kafka
    networks:
      kafka:
        ipv4_address: 172.22.6.13

run

docker-compose -f docker-compose-kafka.yml -p kafka up -d

kafka-map

https://github.com/dushixiang/kafka-map

  • 访问:http://127.0.0.1:9080
  • 账号密码:admin/123456

3.代码工程

stream

 实验目标

  1. 生成UUID并将其发送到Kafka主题batch-in
  2. batch-in主题接收UUID的批量消息,移除其中的数字,并将结果发送到batch-out主题。
  3. 监听batch-out主题并打印接收到的消息。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-cloud-stream-kafaka</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Boot Starter Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

    </dependencies>

</project>

处理流

/*
 * Copyright 2023 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.et;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * @author Steven Gantz
 */
@SpringBootApplication
public class CloudStreamsFunctionBatch {

   public static void main(String[] args) {
      SpringApplication.run(CloudStreamsFunctionBatch.class, args);
   }

   @Bean
   public Supplier<UUID> stringSupplier() {
      return () -> {
         var uuid = UUID.randomUUID();
         System.out.println(uuid + " -> batch-in");
         return uuid;
      };
   }

   @Bean
   public Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {
      return idBatch -> {
         System.out.println("Removed digits from batch of " + idBatch.size());
         return idBatch.stream()
            .map(UUID::toString)
            // Remove all digits from the UUID
            .map(uuid -> uuid.replaceAll("\\d",""))
            .map(noDigitString -> MessageBuilder.withPayload(noDigitString).build())
            .toList();
      };
   }

   @KafkaListener(id = "batch-out", topics = "batch-out")
   public void listen(String in) {
      System.out.println("batch-out -> " + in);
   }

}
  • 定义一个名为stringSupplier的Bean,它实现了Supplier<UUID>接口。这个方法生成一个随机的UUID,并打印到控制台,表示这个UUID将被发送到batch-in主题。
  • 定义一个名为digitRemovingConsumer的Bean,它实现了Function<List<UUID>, List<Message<String>>>接口。这个方法接受一个UUID的列表,打印出处理的UUID数量,然后将每个UUID转换为字符串,移除其中的所有数字,最后将结果封装为消息并返回。
  • 使用@KafkaListener注解定义一个Kafka监听器,监听batch-out主题。当接收到消息时,调用listen方法并打印接收到的消息内容。

配置文件

spring:
  cloud:
    function:
      definition: stringSupplier;digitRemovingConsumer
    stream:
      bindings:
        stringSupplier-out-0:
          destination: batch-in
        digitRemovingConsumer-in-0:
          destination: batch-in
          group: batch-in
          consumer:
            batch-mode: true
        digitRemovingConsumer-out-0:
          destination: batch-out
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
          digitRemovingConsumer-in-0:
            consumer:
              configuration:
                # Forces consumer to wait 5 seconds before polling for messages
                fetch.max.wait.ms: 5000
                fetch.min.bytes: 1000000000
                max.poll.records: 10000000

参数解释

spring:
  cloud:
    function:
      definition: stringSupplier;digitRemovingConsumer
  • spring.cloud.function.definition:定义了两个函数,stringSupplierdigitRemovingConsumer。这两个函数将在应用程序中被使用。
    stream:
      bindings:
        stringSupplier-out-0:
          destination: batch-in
  • stream.bindings.stringSupplier-out-0.destination:将stringSupplier函数的输出绑定到Kafka主题batch-in
        digitRemovingConsumer-in-0:
          destination: batch-in
          group: batch-in
          consumer:
            batch-mode: true
  • stream.bindings.digitRemovingConsumer-in-0.destination:将digitRemovingConsumer函数的输入绑定到Kafka主题batch-in
  • group: batch-in:指定消费者组为batch-in,这意味着多个实例可以共享这个组来处理消息。
  • consumer.batch-mode: true:启用批处理模式,允许消费者一次处理多条消息。
        digitRemovingConsumer-out-0:
          destination: batch-out
  • stream.bindings.digitRemovingConsumer-out-0.destination:将digitRemovingConsumer函数的输出绑定到Kafka主题batch-out

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springcloud-demo(Spring Cloud Stream)

4.测试

启动弄Spring Boot应用,可以看到控制台输出日志如下:

291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
Removed digits from batch of 5
batch-out -> eacc-ee-dfb-b-dead
batch-out -> cbae-e-f-c-acfb
batch-out -> ab-dd---adade
batch-out -> db-fb-f-bbb-bfdec
batch-out -> bdb--d-ad-bbbb

5.引用

  • https://github.com/spring-cloud/spring-cloud-stream-samples
  • Spring Cloud Stream 3.X - coderZoe的博客
  • Spring Cloud Stream实现数据流处理 | Harries Blog™

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/920484.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

i春秋-签到题

练习平台地址 竞赛中心 题目描述 题目内容 点击GUESS后会有辨识细菌的选择题 全部完成后会有弹窗提示 输入nickname后提示获得flag F12检查 元素中没有发现信息 检查后发现flag在控制台中 flag flag{663a5c95-3050-4c3a-bb6e-bc4f2fb6c32e} 注意事项 flag不一定要在元素中找&a…

无人机 PX4飞控 | CUAV 7-Nano 飞行控制器介绍与使用

无人机 PX4飞控 | CUAV 7-Nano 飞行控制器介绍与使用 7-Nano简介硬件参数接口定义模块连接供电部分遥控器电机 固件安装 7-Nano简介 7-Nano是一款针对小型化无人系统设备研发的微型自动驾驶仪。它由雷迅创新自主研发和生产&#xff0c;其创新性的采用叠层设计&#xff0c;在极…

怎么编译OpenWrt镜像?-基于Widora开发板

1.准备相应的环境&#xff0c;我使用的环境是VMware16ubuntu20.04&#xff0c;如图1所示安装编译所需的依赖包&#xff1b; sudo apt-get install build-essential asciidoc binutils bzip2 gawk gettext git libncurses5-dev libz-dev patch python3 python2.7 unzip zlib1g-…

优化表单交互:在 el-select 组件中嵌入表格显示选项

介绍了一种通过 el-select 插槽实现表格样式数据展示的方案&#xff0c;可更直观地辅助用户选择。支持列配置、行数据绑定及自定义搜索&#xff0c;简洁高效&#xff0c;适用于复杂选择场景。完整代码见GitHub 仓库。 背景 在进行业务开发选择订单时&#xff0c;如果单纯的根…

SpringBoot中设置超时30分钟自动删除元素的List和Map

简介 在 Spring Boot 中&#xff0c;你可以使用多种方法来实现自动删除超时元素的 List 或 Map。以下是两种常见的方式&#xff1a; 如果你需要简单的功能并且不介意引入外部依赖&#xff0c;可以选择 Guava Cache。如果你想要更灵活的控制&#xff0c;使用 Spring 的调度功能…

使用可视化工具kafkatool连接docker的kafka集群,查看消息内容和offset

1、下载kafkatool 下载地址Offset Explorer&#xff0c;下载对应系统的offset explorer 下载完&#xff0c;傻瓜安装即可&#xff08;建议放D盘&#xff09;&#xff0c;在开始菜单输入offset找到该应用打开 打开 2、连接kafka 点击File > add new connection Bootstrap…

Docker3:docker基础1

欢迎来到“雪碧聊技术”CSDN博客&#xff01; 在这里&#xff0c;您将踏入一个专注于Java开发技术的知识殿堂。无论您是Java编程的初学者&#xff0c;还是具有一定经验的开发者&#xff0c;相信我的博客都能为您提供宝贵的学习资源和实用技巧。作为您的技术向导&#xff0c;我将…

BLIP-2模型的详解与思考

大模型学习笔记------BLIP-2模型的详解与思考 1、BLIP-2框架概述2、BLIP-2网络结构详解3、BLIP-2的几点思考 上一篇文章上文中讲解了 BLIP&#xff08;Bootstrapping Language-Image Pretraining&#xff09;模型的一些思考&#xff0c;本文将讲述一个BLIP的升级版 BLIP-2&am…

java-贪心算法

1. 霍夫曼编码&#xff08;Huffman Coding&#xff09; 描述&#xff1a; 霍夫曼编码是一种使用变长编码表对数据进行编码的算法&#xff0c;由David A. Huffman在1952年发明。它是一种贪心算法&#xff0c;用于数据压缩。霍夫曼编码通过构建一个二叉树&#xff08;霍夫曼树&a…

RK3568平台(中断篇)ARM中断流程

一.ARM 处理器程序运行的过程 ARM芯片属于精简指令集计算机 (RISC: Reduced Instruction Set Computing),它所用的指令比较简单,有如下特点: ① 对内存只有读、写指令 ② 对于数据的运算是在CPU内部实现 ③ 使用RISC指令的CPU复杂度小一点,易于设计 比如对于 a=a+b 这…

SpringBoot与MongoDB深度整合及应用案例

SpringBoot与MongoDB深度整合及应用案例 在当今快速发展的软件开发领域&#xff0c;NoSQL数据库因其灵活性和可扩展性而变得越来越流行。MongoDB&#xff0c;作为一款领先的NoSQL数据库&#xff0c;以其文档导向的存储模型和强大的查询能力脱颖而出。本文将为您提供一个全方位…

项目实战(webshop)

一、搭建靶场webshop(www.example1.com) 二、信息收集 1、获取IP winR→cmd→ping www.example1.com→显示出ip(192.168.51.128) 注&#xff1a;TTL为IP包的生存时间&#xff0c;拿到TTL我们可以大概的判断一下目标机器的操作系统&#xff0c;但不是很准确&#xff0c;因为…

初识算法 · 分治(3)

目录 前言&#xff1a; 归并排序 题目解析 算法原理 算法编写 求逆序对总数 题目解析 算法原理 算法编写 前言&#xff1a; ​本文的主题是分治&#xff0c;通过两道题目讲解&#xff0c;一道是归并排序&#xff0c;一道是求逆序对。 链接分别为&#xff1a; 912. 排…

MyBatis——#{} 和 ${} 的区别和动态 SQL

1. #{} 和 ${} 的区别 为了方便&#xff0c;接下来使用注解方式来演示&#xff1a; #{} 的 SQL 语句中的参数是用过 ? 来起到类似于占位符的作用&#xff0c;而 ${} 是直接进行参数替换&#xff0c;这种直接替换的即时 SQL 就可能会出现一个问题 当传入一个字符串时&#xff…

学习threejs,导入FBX格式骨骼绑定模型

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️THREE.FBXLoader fbx 模型加…

【腾讯云产品最佳实践】腾讯云CVM入门技术与实践:通过腾讯云快速构建云上应用

目录 前言 什么是腾讯云CVM&#xff1f; 腾讯云CVM的技术优势 基于最佳技术实践&#xff0c;使用腾讯云CVM搭建应用 1. 开通CVM实例 2. 连接CVM实例 3. 配置Web环境 4. 部署PHP应用 腾讯云CVM行业应用案例&#xff1a;电商平台的双十一攻略 1. 弹性伸缩解决高并发问题…

mongodb多表查询,五个表查询

需求是这样的&#xff0c;而数据是从mysql导入进来的&#xff0c;由于mysql不支持数组类型的数据&#xff0c;所以有很多关联表。药剂里找药物&#xff0c;需要药剂与药物的关联表&#xff0c;然后再找药物表。从药物表里再找药物与成分关联表&#xff0c;最后再找成分表。 这里…

STL中vector实现——简单易懂版

本章内容 模拟实现 vector 的部分重要功能 1.迭代器的引入1.1 之前写法1.2 STL库中的写法 2.默认成员函数2.1构造与拷贝构造2.2拷贝赋值2.3析构函数 3.增删查改功能3.1插入3.2删除 4.为什么STL中vector没有find函数&#xff1f;5.&#x1f525;&#x1f525;迭代器失效场景&am…

Springboot + vue 健身房管理系统项目部署

1、前言 ​ 许多人在拿到 Spring Boot 项目的源码后&#xff0c;不知道如何运行。我以 Spring Boot Vue 健身房管理系统的部署为例&#xff0c;详细介绍一下部署流程。大多数 Spring Boot 项目都可以通过这种方式部署&#xff0c;希望能帮助到大家。 ​ 2、项目查看 ​ 首…

NuGet如何支持HTTP源

今天是2024年11月21号&#xff0c;最近更新了VisualStudio后发现HTTP的包源已经默认禁止使用了&#xff0c;生成时会直接报错。如下图&#xff1a; 官方也明确指出了要想使用HTTP包源的解决办法&#xff0c;这里就简单总结一下。 一、全局配置 1、全局NuGet包的配置文件路径在…