Table of Contents
- Introduction
- Project Structure
- Declare Plugins and Modules
- Apply Plugins and Add Dependencies
- Sender Properties
- Sender Application
- Sender Controller
- Receiver Properties
- Receiver Application
- Receiver Message Handler
- Congratulations
- Automatically Send Message By Interval
- Type Adapter for Payload
- Send Message Model as JSON
- Receive JSON as Message Model
Introduction
spring-cloud-starter-stream changed significantly in version 4.x: most of the annotations, such as @EnableBinding, @Input, @Output and @StreamListener, were removed in favor of plain functional beans.
This post uses stream-rocketmq, but the same approach also applies to stream-kafka: just switch the binder dependency from rocketmq to kafka.
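As a rough sketch of that dependency switch, the binder line in the module build script could be replaced as below. The artifact coordinates are the standard Spring Cloud Stream Kafka starter; reusing version 4.2.0 for it is an assumption based on the springCloudVersion value used later in this post. The binder settings in the properties files would presumably change as well, for example spring.cloud.stream.default-binder=kafka and spring.cloud.stream.kafka.binder.brokers instead of the rocketmq name-server entry.

```kotlin
dependencies {
    // assumed to match the Spring Cloud version used elsewhere in this post
    val springCloudVersion = "4.2.0"
    // takes the place of com.alibaba.cloud:spring-cloud-starter-stream-rocketmq
    api("org.springframework.cloud:spring-cloud-starter-stream-kafka:$springCloudVersion")
}
```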
Project Structure
- stream-binder-sender : RocketMQ message sender
- stream-binder-receiver : RocketMQ message receiver
Declare Plugins and Modules
pluginManagement {
    repositories {
        gradlePluginPortal()
        google()
        mavenCentral()
    }
}

dependencyResolutionManagement {
    repositoriesMode = RepositoriesMode.PREFER_SETTINGS
    repositories {
        gradlePluginPortal()
        google()
        mavenCentral()
    }
}

buildscript {
    repositories {
        gradlePluginPortal()
        google()
        mavenCentral()
    }
}

plugins {
    id("org.jetbrains.kotlin.jvm") version "2.0.21" apply false
    id("org.jetbrains.kotlin.kapt") version "2.0.21" apply false
    id("org.jetbrains.kotlin.plugin.spring") version "2.0.21" apply false
    id("org.springframework.boot") version "3.4.1" apply false
}

include("stream-binder-sender")
include("stream-binder-receiver")
Apply Plugins and Add Dependencies
plugins {
    id("org.jetbrains.kotlin.jvm")
    id("org.jetbrains.kotlin.kapt")
    id("org.jetbrains.kotlin.plugin.spring")
    id("org.springframework.boot")
}

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

dependencies {
    val springBootVersion = "3.4.1"
    val springCloudVersion = "4.2.0"
    val springCloudAlibabaVersion = "2023.0.3.2"
    // commons
    api("io.github.hellogoogle2000:kotlin-commons:1.0.19")
    // kotlin
    api("org.jetbrains.kotlin:kotlin-reflect:2.0.21")
    // spring
    api("org.springframework.boot:spring-boot-starter:$springBootVersion")
    api("org.springframework.boot:spring-boot-starter-web:$springBootVersion")
    api("org.springframework.cloud:spring-cloud-starter-bootstrap:$springCloudVersion")
    // spring cloud stream binder
    api("com.alibaba.cloud:spring-cloud-starter-stream-rocketmq:$springCloudAlibabaVersion")
}
Sender Properties
configTopicProducer-out is the name of the custom output binding; the sender refers to it by this name when it sends a message.
# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Sender Application
package x.spring.hello

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class StreamBinderSenderApplication

fun main(args: Array<String>) {
    runApplication<StreamBinderSenderApplication>(*args)
}
Sender Controller
The binding name passed to bridge.send must be the same as the output binding name in the properties.
package x.spring.hello.controller

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController

@RestController
class MessageSendController {

    @Autowired
    private lateinit var bridge: StreamBridge

    @GetMapping("send")
    fun send(): String {
        val payload = "config"
        val message = MessageBuilder.withPayload(payload).build()
        bridge.send("configTopicProducer-out", message)
        return "send successfully"
    }
}
Receiver Properties
configTopicConsumer is the name of the message handler function, set by spring.cloud.function.definition.
Remember this name, because you have to implement a bean with the same name yourself.
You can define multiple message handler functions, separated by ; (as in the later sections).
configTopicConsumer-in-0 is the name of the input binding.
Its format is fixed as <definition>-in-<index>.
# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
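To make the naming rule concrete, here is a small sketch that derives binding names and property keys from a function definition string. Both helpers, bindingNames and bindingProperty, are hypothetical and exist only for this illustration; they are not part of Spring Cloud Stream, and they only cover simple functions with a single input or output.

```kotlin
// Hypothetical helper, not part of Spring Cloud Stream: it only mirrors the
// <definition>-in-<index> / <definition>-out-<index> naming rule.
// Several definitions are expected to be separated by ';'.
fun bindingNames(definition: String, direction: String, index: Int = 0): List<String> =
    definition.split(";")
        .map { it.trim() }
        .filter { it.isNotEmpty() }
        .map { name -> "$name-$direction-$index" }

// Builds the property key for one setting of a binding, for example "destination".
fun bindingProperty(bindingName: String, setting: String): String =
    "spring.cloud.stream.bindings.$bindingName.$setting"
```

For example, bindingNames("configTopicConsumer", "in") yields configTopicConsumer-in-0, and bindingProperty("configTopicConsumer-in-0", "destination") yields the destination key used in the properties above.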
Receiver Application
package x.spring.hello

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class StreamBinderReceiverApplication

fun main(args: Array<String>) {
    runApplication<StreamBinderReceiverApplication>(*args)
}
Receiver Message Handler
The bean name must match the function name specified by the spring.cloud.function.definition property.
package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class MessageConsumerObject {

    @Bean("configTopicConsumer")
    fun configTopicConsumer(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message ->
            val payload = message.payload
            println("consumer receive config topic message: $payload")
        }
    }
}
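If the headers are not needed, the handler can also be typed on the payload itself (Consumer<String> instead of Consumer<Message<String>>), which makes the handler logic easy to try without Spring or a broker. The following is only a sketch of that variant; the sink parameter is a hypothetical hook added here to observe the output, and in the receiver project the function would still be registered as a @Bean.

```kotlin
import java.util.function.Consumer

// Sketch only: same handler logic as above, but typed on the payload (String)
// instead of Message<String>. `sink` is a hypothetical hook for observing output;
// by default it prints, like the original handler.
fun configTopicConsumer(sink: (String) -> Unit = { println(it) }): Consumer<String> =
    Consumer<String> { payload ->
        sink("consumer receive config topic message: $payload")
    }
```

Calling configTopicConsumer().accept("config") runs the handler directly, without going through the binder.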
Congratulations
Now you know the basic usage of the message binder.
Do not modify the demos above at first: changes may cause failures and waste a lot of time.
Get them running as written first, then try your own variations.
Once that works, let us try some more advanced usage.
Automatically Send Message By Interval
Register a Supplier bean to generate a heartbeat message automatically; the framework polls it at a fixed interval (once per second by default).
package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.util.MimeTypeUtils
import java.util.function.Supplier

@Component
class MessageSupplierObject {

    @Bean
    fun heartPacketProducer(): Supplier<Message<String>> {
        return Supplier<Message<String>> {
            println("send heart packet message")
            val payload = "heart"
            val message = MessageBuilder
                .withPayload(payload)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .build()
            return@Supplier message
        }
    }
}
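To picture what "automatically" means here, the sketch below imitates the polling with a plain scheduled executor. It is an illustration only, not the framework's implementation: pollSupplier and its parameters are hypothetical names, and the real interval is controlled by the framework's poller settings rather than by code like this.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.function.Supplier

// Illustration only: calls the supplier on a fixed delay and hands each value
// to `publish`, then stops once `count` values have been produced.
fun pollSupplier(
    supplier: Supplier<String>,
    delayMillis: Long,
    count: Int,
    publish: (String) -> Unit
) {
    val remaining = CountDownLatch(count)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleWithFixedDelay({
        // guard so that no extra value is published while shutting down
        if (remaining.count > 0) {
            publish(supplier.get())
            remaining.countDown()
        }
    }, 0L, delayMillis, TimeUnit.MILLISECONDS)
    remaining.await()
    scheduler.shutdownNow()
}
```

For instance, pollSupplier(Supplier { "heart" }, 1000L, 3) { println(it) } would print the heartbeat payload three times, roughly one second apart.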
Update the properties of the sender project and add an output binding named heartPacketProducer-out-0.
# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.function.definition=heartPacketProducer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.bindings.configTopicProducer-out.consumer.concurrency=100
spring.cloud.stream.bindings.heartPacketProducer-out-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketProducer-out-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketProducer-out-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketProducer-out-0.consumer.concurrency=100
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Update the properties of the receiver project and add an input binding named heartPacketConsumer-in-0.
# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer;heartPacketConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketConsumer-in-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Register the message handler functions in the receiver project.
package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class MessageConsumerObject {

    @Bean("heartPacketConsumer")
    fun heartPacketConsumer(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message ->
            val payload = message.payload
            println("consumer receive heart packet message: $payload")
        }
    }

    @Bean("configTopicConsumer")
    fun configTopicConsumer(): Consumer<Message<String>> {
        return Consumer<Message<String>> { message ->
            val payload = message.payload
            println("consumer receive config topic message: $payload")
        }
    }
}
Type Adapter for Payload
This lets you send and receive richer payloads, such as model classes serialized as JSON or XML.
Put this adapter file into both the sender project and the receiver project.
package x.spring.hello.component

import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import x.kotlin.commons.serialize.JSON.fromJson
import x.kotlin.commons.serialize.JSON.toJson
import x.spring.hello.model.ConfigModel
import java.util.function.Function

@Component
class MessageModelAdapter {

    @Bean
    fun configModelConvertor1(): Function<ConfigModel, String> {
        return Function { it.toJson() }
    }

    @Bean
    fun configModelConvertor2(): Function<String, ConfigModel> {
        return Function { it.fromJson(ConfigModel::class.java) }
    }
}
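The ConfigModel class itself is not shown in this post, so the sketch below assumes a shape for it: a no-argument constructor with mutable username and password properties, which is what the sending code in the next section relies on. The two converters are hand-written stand-ins for the toJson()/fromJson() extensions from kotlin-commons, handling only this two-field model and values without quotes or backslashes; they are here just to show the round trip the adapter pair is meant to perform.

```kotlin
import java.util.function.Function

// Assumed shape of x.spring.hello.model.ConfigModel; the original class is not shown.
class ConfigModel {
    var username: String = ""
    var password: String = ""
}

// Stand-in for toJson(): writes the two fields as a flat JSON object.
val configModelToJson = Function<ConfigModel, String> { model ->
    """{"username":"${model.username}","password":"${model.password}"}"""
}

// Stand-in for fromJson(): reads the two string fields back with a simple regex.
val jsonToConfigModel = Function<String, ConfigModel> { json ->
    fun field(name: String): String =
        Regex("\"$name\":\"([^\"]*)\"").find(json)?.groupValues?.get(1) ?: ""
    ConfigModel().apply {
        username = field("username")
        password = field("password")
    }
}
```

Chaining the two with configModelToJson.andThen(jsonToConfigModel) gives back a model with the same field values, which is the property a real pair of converters should also satisfy.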
Send Message Model as JSON
@GetMapping("send2")
fun send2(): String {
    val payload = ConfigModel()
    payload.username = "admin"
    payload.password = "123456"
    val message = MessageBuilder
        .withPayload(payload)
        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
        .build()
    bridge.send("configModelProducer-out", message)
    return "send successfully"
}
spring.cloud.stream.bindings.configModelProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configModelProducer-out.destination=topic-config-model
spring.cloud.stream.bindings.configModelProducer-out.content-type=application/json
spring.cloud.stream.bindings.configModelProducer-out.consumer.concurrency=100
Receive JSON as Message Model
@Bean
fun configModelConsumer(): Consumer<Message<ConfigModel>> {
    return Consumer<Message<ConfigModel>> { message ->
        val payload = message.payload.toJson()
        println("consumer receive config model message: $payload")
    }
}
spring.cloud.function.definition=configModelConsumer
spring.cloud.stream.bindings.configModelConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configModelConsumer-in-0.destination=topic-config-model
spring.cloud.stream.bindings.configModelConsumer-in-0.content-type=application/json