[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025

文章目录

          • Introduce
          • Project Structure
          • Declare Plugins and Modules
          • Apply Plugins and Add Dependencies
          • Sender Properties
          • Sender Application
          • Sender Controller
          • Receiver Properties
          • Receiver Application
          • Receiver Message Handler
          • Congratulations
          • Automatically Send Message By Interval
          • Type Adapter for Payload
          • Send Message Model as JSON
          • Receive JSON as Message Model

Introduce

spring-cloud-starter-stream have a great change since version 4.x

most annotations like @EnableBinding @Input @Output @StreamListener were all removed

this blog is about stream-rocketmq, but also fit for stream-kafaka

just migrate dependency from rocketmq to kafaka

Project Structure
  • stream-binder-sender : rocket message sender
  • stream-binder-receiver : rocket message receiver
Declare Plugins and Modules
pluginManagement {
    repositories {
        gradlePluginPortal()
        google()
        mavenCentral()
    }
}

dependencyResolutionManagement {
    repositoriesMode = RepositoriesMode.PREFER_SETTINGS
    repositories {
        gradlePluginPortal()
        google()
        mavenCentral()
    }
}

buildscript {
    repositories {
        gradlePluginPortal()
        google()
        mavenCentral()
    }
}

plugins {
    id("org.jetbrains.kotlin.jvm") version "2.0.21" apply false
    id("org.jetbrains.kotlin.kapt") version "2.0.21" apply false
    id("org.jetbrains.kotlin.plugin.spring") version "2.0.21" apply false
    id("org.springframework.boot") version "3.4.1" apply false
}

include("stream-binder-sender")
include("stream-binder-receiver")
Apply Plugins and Add Dependencies
plugins {
    id("org.jetbrains.kotlin.jvm")
    id("org.jetbrains.kotlin.kapt")
    id("org.jetbrains.kotlin.plugin.spring")
    id("org.springframework.boot")
}

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

dependencies {
    val springBootVersion = "3.4.1"
    val springCloudVersion = "4.2.0"
    val springCloudAlibabaVersion = "2023.0.3.2"
    // commons
    api("io.github.hellogoogle2000:kotlin-commons:1.0.19")
    // kotlin
    api("org.jetbrains.kotlin:kotlin-reflect:2.0.21")
    // spring
    api("org.springframework.boot:spring-boot-starter:$springBootVersion")
    api("org.springframework.boot:spring-boot-starter-web:$springBootVersion")
    api("org.springframework.cloud:spring-cloud-starter-bootstrap:$springCloudVersion")
    // spring cloud stream binder
    api("com.alibaba.cloud:spring-cloud-starter-stream-rocketmq:$springCloudAlibabaVersion")
}
Sender Properties

configTopicSender-out is the name for customized output binding object

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Sender Application
package x.spring.hello

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class StreamBinderSenderApplication

fun main(args: Array<String>) {
    runApplication<StreamBinderSenderApplication>(*args)
}
Sender Controller

the binding name for sending should be same with output name in properties

package x.spring.hello.controller

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

@RestController
class MessageSendController {

    @Autowired
    private lateinit var bridge: StreamBridge

    @GetMapping("send")
    fun send(): String {
        val payload = "config"
        val message = MessageBuilder.withPayload(payload).build()
        bridge.send("configTopicProducer-out", message)
        return "send successfully"
    }
}
Receiver Properties

plainTextConsumer is the name of message handler function

remember it and you should implement it by yourself

you can define multiple message handler functions, and split with ,

plainTextConsumer-in-0 is the name of input binding object

its format is constrained to format of <definition>-in-<index>

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Receiver Application
package x.spring.hello

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class StreamBinderReceiverApplication

fun main(args: Array<String>) {
    runApplication<StreamBinderReceiverApplication>(*args)
}
Receiver Message Handler

function name correspond to properties specified by spring.cloud.function.definition property

package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class MessageConsumerObject {

    @Bean("configTopicConsumer")
    fun configTopicConsumer(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message ->
            val payload = message.payload
            println("consumer receive config topic message: $payload")
        }
    }
}
Congratulations

now, you have get known about basic usage of message binder

do not modify demos above, it may cause a failure, and waste lots of time

try your own ways, let them run out first

let us try some advanced way, after achieve goals above

Automatically Send Message By Interval

register a supplier object to automatically generate heartbeat message

package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.util.MimeTypeUtils
import java.util.function.Supplier

@Component
class MessageSupplierObject {

    @Bean
    fun heartPacketProducer(): Supplier<Message<String>> {
        return Supplier<Message<String>> {
            println("send heart packet message")
            val payload = "heart"
            val message = MessageBuilder
                .withPayload(payload)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .build()
            return@Supplier message
        }
    }
}

update properties of sender project, add a output binding object named heartPacketProducer

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.function.definition=heartPacketProducer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.bindings.configTopicProducer-out.consumer.concurrency=100
spring.cloud.stream.bindings.heartPacketProducer-out-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketProducer-out-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketProducer-out-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketProducer-out-0.consumer.concurrency=100
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

update properties of receiver project, add a input binding object named heartPacketConsumer

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer;heartPacketConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketConsumer-in-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

register message handler function for receiver project

package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class MessageConsumerObject {

    @Bean("heartPacketConsumer")
    fun heartPacketConsumer(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message ->
            val payload = message.payload
            println("consumer receive heart packet message: $payload")
        }
    }

    @Bean("configTopicConsumer")
    fun configTopicConsumer(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message ->
            val payload = message.payload
            println("consumer receive config topic message: $payload")
        }
    }
}
Type Adapter for Payload

this enable your auto send and receive advanced object like class/json/xml

put this adapter file into both sender project and receiver object

package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import x.kotlin.commons.serialize.JSON.fromJson
import x.kotlin.commons.serialize.JSON.toJson
import x.spring.hello.model.ConfigModel
import java.util.function.Function

@Component
class MessageModelAdapter {

    @Bean
    fun configModelConvertor1(): Function<ConfigModel, String> {
        return Function { it.toJson() }
    }

    @Bean
    fun configModelConvertor2(): Function<String, ConfigModel> {
        return Function { it.fromJson(ConfigModel::class.java) }
    }
}
Send Message Model as JSON
@GetMapping("send2")
fun send2(): String {
    val payload = ConfigModel()
    payload.username = "admin"
    payload.password = "123456"
    val message = MessageBuilder
        .withPayload(payload)
        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
        .build()
    bridge.send("configModelProducer-out", message)
    return "send successfully"
}
spring.cloud.stream.bindings.configModelProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configModelProducer-out.destination=topic-config-model
spring.cloud.stream.bindings.configModelProducer-out.content-type=application/json
spring.cloud.stream.bindings.configModelProducer-out.consumer.concurrency=100
Receive JSON as Message Model
@Bean
fun configModelConsumer(): Consumer<Message<ConfigModel>> {
    return Consumer<Message<ConfigModel>> { message ->
        val payload = message.payload.toJson()
        println("consumer receive config model message: $payload")
    }
}
spring.cloud.function.definition=configModelConsumer
spring.cloud.stream.bindings.configModelConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configModelConsumer-in-0.destination=topic-config-model
spring.cloud.stream.bindings.configModelConsumer-in-0.content-type=application/json

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/959564.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

“大模型横扫千军”背后的大数据挖掘--浅谈MapReduce

文章目录 O 背景知识1 数据挖掘2 邦费罗尼原则3 TF.IDF4 哈希函数5 分布式文件系统 一、MapReduce基本介绍1. Map 任务2. 按键分组3. Reduce 任务4. 节点失效处理5.小测验&#xff1a;在一个大型语料库上有100个map任务和若干reduce任务&#xff1a; 二、基于MapReduce的基本运…

25美赛ABCDEF题详细建模过程+可视化图表+参考论文+写作模版+数据预处理

详情见该链接&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 25美国大学生数学建模如何准备&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;-CSDN博客文章浏览阅读791次&#xff0c;点赞13次&#xff0c;收藏7次。通过了解比赛基本…

Python:元组构造式和字典推导式

&#xff08;Python 元组构造式和字典推导式整理笔记&#xff09; 1. 元组构造式 1.1 创建元组 使用圆括号&#xff1a; tuple1 (1, 2.5, (three, four), [True, 5], False) print(tuple1) # 输出: (1, 2.5, (three, four), [True, 5], False) 省略圆括号&#xff1a; tup…

appium自动化环境搭建

一、appium介绍 appium介绍 appium是一个开源工具、支持跨平台、用于自动化ios、安卓手机和windows桌面平台上面的原生、移动web和混合应用&#xff0c;支持多种编程语言(python&#xff0c;java&#xff0c;Ruby&#xff0c;Javascript、PHP等) 原生应用和混合应用&#xf…

vue3组件el-table报错

传给table标签的data不是数组就会报错&#xff0c; 摁着商品管理代码找了半天也没发现哪里错了&#xff0c;而且关闭报错表格数据能正常显示&#xff0c; 。。。 最后发现我还有个订单管理页面&#xff0c;这里面的data初始化成ref( )了&#xff0c;把这个组件注释掉&#xf…

基于SpringBoot+WebSocket的前后端连接,并接入文心一言大模型API

前言&#xff1a; 本片博客只讲述了操作的大致流程&#xff0c;具体实现步骤并不标准&#xff0c;请以参考为准。 本文前提&#xff1a;熟悉使用webSocket 如果大家还不了解什么是WebSocket&#xff0c;可以参考我的这篇博客&#xff1a; rWebSocket 详解&#xff1a;全双工…

《边界感知的分而治之方法:基于扩散模型的无监督阴影去除解决方案》学习笔记

paper&#xff1a;Boundary-Aware Divide and Conquer: A Diffusion-Based Solution for Unsupervised Shadow Removal 目录 摘要 1、介绍 2、相关工作 2.1 阴影去除 2.2 去噪扩散概率模型&#xff08;Denoising Diffusion Probabilistic Models, DDPM&#xff09; 3、方…

linux-mysql在centos7安装和基础配置

1.安装mysql数据库 1.使用官网安装 1.检查是否存在mysql的分支mariadb [rootlocalhost ~]# rpm -qa |grep mariadb mariadb-libs-5.5.64-1.el7.x86_64 [rootlocalhost ~]# 2.卸载这个分支包 [rootlocalhost ~]# rpm -qa | grep mariadb mariadb-libs-5.5.64-1.el7.x86_64 …

Python!从0开始学爬虫:(一)HTTP协议 及 请求与响应

前言 爬虫需要基础知识&#xff0c;HTTP协议只是个开始&#xff0c;除此之外还有很多&#xff0c;我们慢慢来记录。 今天的HTTP协议&#xff0c;会有助于我们更好的了解网络。 一、什么是HTTP协议 &#xff08;1&#xff09;定义 HTTP&#xff08;超文本传输协议&#xff…

MySQL数据库笔记——最左前缀原则原理及其注意事项

大家好&#xff0c;这里是Good Note&#xff0c;关注 公主号&#xff1a;Goodnote&#xff0c;专栏文章私信限时Free。本文详细介绍MySQL索引的关键潜规则——最左前缀原则。 文章目录 图示单值索引和联合索引单值索引联合索引 最左前缀原则示例分析1. 全值匹配查询时2. 匹配左…

Java数据结构 (链表反转(LinkedList----Leetcode206))

1. 链表的当前结构 每个方框代表一个节点&#xff0c;每个节点包含两个部分&#xff1a; 左侧的数字&#xff1a;节点存储的值&#xff0c;例如 45、34 等。右侧的地址&#xff08;如 0x90&#xff09;&#xff1a;表示该节点 next 指针指向的下一个节点的内存地址。 例子中&a…

IMX6ull项目环境配置

文件解压缩&#xff1a; .tar.gz 格式解压为 tar -zxvf .tar.bz2 格式解压为 tar -jxvf 2.4版本后的U-boot.bin移植进SD卡后&#xff0c;通过串口启动配置开发板和虚拟机网络。 setenv ipaddr 192.168.2.230 setenv ethaddr 00:04:9f:…

git基础指令大全

版本控制 git管理文件夹 进入要管理的文件夹 — 进入 初始化&#xff08;提名&#xff09; git init 管理文件夹 生成版本 .git ---- git在管理文件夹时&#xff0c;版本控制的信息 生成版本 git status 检测当前文件夹下的文件状态 (检测&#xff0c;检测之后就要管理了…

[高等数学学习记录]函数的极值与最大值最小值

1 知识点 1.1 函数的极值及其求法 定义 设函数 f ( x ) f(x) f(x) 在点 x 0 x_0 x0​ 的某邻域 U ˚ ( x 0 ) \mathring{U}(x_0) U˚(x0​) 内有定义&#xff0c;如果对于去心邻域 U ˚ ( x 0 ) \mathring{U}(x_0) U˚(x0​) 内的任一 x x x&#xff0c;有 f ( x ) <…

docker 简要笔记

文章目录 一、前提内容1、docker 环境准备2、docker-compose 环境准备3、流程说明 二、打包 docker 镜像1、基础镜像2、国内镜像源3、基础的dockerfile4、打包镜像 四、构建运行1、docker 部分2、docker-compose 部分2.1、构建docker-compose.yml2.1.1、同目录构建2.1.2、利用镜…

JVM常见知识点

在《深入理解Java虚拟机》一书中&#xff0c;介绍了JVM的相关特性。 1、JVM的内存区域划分 在真实的操作系统中&#xff0c;对于地址空间进行了分区域的设计&#xff0c;由于JVM是仿照真实的机器进行设计的&#xff0c;那么也进行了分区域的设计。核心区域有四个&#xff0c;…

电脑系统bcd文件损坏修复方法:小白也会的修复方法

电脑系统bcd文件损坏怎么办?当电脑开机时出现bcd文件损坏&#xff0c;一般情况是由于电脑系统的引导坏了&#xff0c;需要进行修复。现在越来越多的小伙伴遇到电脑引导丢失或者安装后无法正常引导的问题&#xff0c;我们现在一般是pe下进行修复引导&#xff0c;那么电脑系统bc…

Flutter_学习记录_导航和其他

Flutter 的导航页面跳转&#xff0c;是通过组件Navigator 和 组件MaterialPageRoute来实现的&#xff0c;Navigator提供了很多个方法&#xff0c;但是目前&#xff0c;我只记录我学习过程中接触到的方法&#xff1a; Navigator.push(), 跳转下一个页面Navigator.pop(), 返回上一…

【架构面试】二、消息队列和MySQL和Redis

MQ MQ消息中间件 问题引出与MQ作用 常见面试问题&#xff1a;面试官常针对项目中使用MQ技术的候选人提问&#xff0c;如如何确保消息不丢失&#xff0c;该问题可考察候选人技术能力。MQ应用场景及作用&#xff1a;以京东系统下单扣减京豆为例&#xff0c;MQ用于交易服和京豆服…

Git 如何将旧仓库迁移新仓库中,但不显示旧的提交记录

一、异常错误 场景&#xff1a;我想把旧仓库迁移新仓库中&#xff0c;放进去之后&#xff0c;新仓库会显示这个项目之前的所有提交&#xff0c;如何不显示这些旧的提交&#xff1f; 二、原因 我们需要将旧仓库迁移新仓库中&#xff0c;但是又不想在新仓库中显示旧的提交记录…