Spring Cloud Stream实践

概述

不同中间件,有各自的使用方法,代码也不一样。

可以使用Spring Cloud Stream解耦,切换中间件时,不需要修改代码。实现方式为使用绑定层,绑定层对生产者和消费者提供统一的编码方式,需要连接不同的中间件时,绑定层使用不同的绑定器即可,也就是把切换中间件需要做相应的修改工作交给绑定层来做。

本文的操作是在 微服务调用链路追踪 的基础上进行。

环境说明

jdk1.8

maven3.6.3

mysql8

spring cloud2021.0.8

spring boot2.7.12

idea2022

rabbitmq3.12.4

步骤

消息生产者

创建子模块stream_producer

添加依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

刷新依赖

配置application.yml

server:
  port: 7001
spring:
  application:
    name: stream_producer
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output:
          destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称
      binders:
        defaultRabbit:
          type: rabbit #配置默认的绑定器为rabbit

查看Source.class源码

编写生产者代码,发送一条消息("hello world")到rabbitmq的my-default exchange中

package org.example.stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
@SpringBootApplication
public class StreamProductApplication implements CommandLineRunner {
    @Autowired
    private MessageChannel output;

    @Override
    public void run(String... args) throws Exception {
        //发送消息
        // messageBuilder 工具类,创建消息
        output.send(MessageBuilder.withPayload("hello world").build());
    }

    public static void main(String[] args) {
        SpringApplication.run(StreamProductApplication.class, args);
    }


}

查看rabbitmq web UI

http://localhost:15672/

看到Exchanges中还没有my-default

运行StreamProductApplication

刷新rabbitmq Web UI,看到了my-dafault的exchange

消息消费者

创建子模块stream_consumer

添加依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

配置application.yml

server:
  port: 7002
spring:
  application:
    name: stream_consumer
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #内置获取消息的通道,从destination配置值的exchange中获取信息
          destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称
      binders:
        defaultRabbit:
          type: rabbit #配置默认的绑定器为rabbit

查看内置通道名称为input

编写消息消费者启动类,在启动类监听接收消息

package org.example.stream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class StreamConsumerApplication {

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("监听收到:" + message.getPayload());
    }

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication.class, args);
    }
}

运行stream_consumer消费者服务,监听消息

运行stream_producer生产者服务,发送消息

查看消费者服务控制台日志,接收到了消息

优化代码

之前把生产和消费的消息都写在启动类中了,代码耦合高。

优化思路是把不同功能的代码分开放。

消息生产者

stream_producer 代码结构如下

package org.example.stream.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 向中间件发送数据
 */
@Component
@EnableBinding(Source.class)
public class MessageSender {
    @Autowired
    private MessageChannel output;//通道

    //发送消息
    public void send(Object obj){
        output.send(MessageBuilder.withPayload(obj).build());
    }
}

修改启动类

package org.example.stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class StreamProductApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamProductApplication.class, args);
    }

}

pom.xml添加junit依赖

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <scope>test</scope>
</dependency>

刷新依赖

编写测试类

在stream_producerm模块的src/test目录下,新建org.example.stream包,再建出ProducerTest类,代码如下

package org.example.stream;

import org.example.stream.producer.MessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {
    @Autowired
    private MessageSender messageSender;//注入发送消息工具类

    @Test
    public void testSend(){
        messageSender.send("hello world");
    }
}

 

消息消费者

stream_consumer代码结构如下

添加MessageListener类获取消息

package org.example.stream.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class MessageListener {

    // 监听binding中的信息
    @StreamListener(Sink.INPUT)
    public void input(String message){
        System.out.println("获取信息:" + message);
    }
}

修改启动类

package org.example.stream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@SpringBootApplication
public class StreamConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication.class, args);
    }

}

启动consumer接收消息

执行producer单元测试类ProducerTest的testSend()方法,发送消息

查看consumer控制台输出,接收到信息了

代码解耦后,同样能成功生产消息和消费消息。

自定义消息通道

此前使用默认的消息通道outputinput。

也可以自己定义消息通道,例如:myoutputmyinput

消息生产者

org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类

package org.example.stream.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * 自定义的消息通道
 */
public interface MyProcessor {
    /**
     * 消息生产这的配置
     */
    String MYOUTPUT = "myoutput";

    @Output("myoutput")
    MessageChannel myoutput();

    /**
     * 消息消费者的配置
     */
    String MYINPUT = "myinput";
    @Input("myinput")
    SubscribableChannel myinput();
}

修改MessageSender

package org.example.stream.producer;

import org.example.stream.channel.MyProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 向中间件发送数据
 */
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {
    @Autowired
    private MessageChannel myoutput;//通道

    //发送消息
    public void send(Object obj){
        myoutput.send(MessageBuilder.withPayload(obj).build());
    }
}

修改application.yml

cloud:
  stream:
    bindings:
      output:
        destination: my-default #指定消息发送的目的地
      myoutput:
        destination: custom-output

消息消费者

在stream_consumer服务的org.example.stream包下新建channel包,在channel包下新建MyProcessor接口类

package org.example.stream.channel;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * 自定义的消息通道
 */
public interface MyProcessor {
    /**
     * 消息生产者的配置
     */
    String MYOUTPUT = "myoutput";

    @Output("myoutput")
    MessageChannel myoutput();

    /**
     * 消息消费者的配置
     */
    String MYINPUT = "myinput";
    @Input("myinput")
    SubscribableChannel myinput();
}

修改MessageListener

package org.example.stream.stream;

import org.example.stream.channel.MyProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {

    // 监听binding中的信息
    @StreamListener(MyProcessor.MYINPUT)
    public void input(String message){
        System.out.println("获取信息:" + message);
    }
}

修改application.yml配置

cloud:
  stream:
    bindings:
      input: #内置获取消息的通道,从destination配置值的exchange中获取信息
        destination: my-default #指定消息发送的目的地
      myinput:
        destination: custom-output

测试

启动stream_consumer

运行单元测试的testSend()方法生产消息

查看stream_consumer控制台,能看到生产的消息,如下

获取信息:hello world

消息分组

采用复制配置方式运行两个消费者

启动第一个消费者(端口为7002)

修改端口为7003,copy configuration,再启动另一个消费者

执行生产者单元测试生产消息,看到两个消费者都接收到了信息

说明:如果有两个消费者,生产一条消息后,两个消费者均能收到信息。

但当我们发送一条消息只需要其中一个消费者消费消息时,这时候就需要用到消息分组,发送一条消息消费者组内只有一个消费者消费到。

我们只需要在服务消费者端设置spring.cloud.stream.bindings.input.group 属性即可

重启两个消费者

修改端口号为7002,重新启动第一个消费者

修改端口号为7003,重新启动第二个消费者

生产者生产一条消息

查看消费者接收消息情况,只有一个消费者接收到信息。

消息分区

消息分区就是实现特定消息只往特定机器发送。

修改生产者配置

  cloud:
    stream:
      bindings:
        output:
          destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称
        myoutput:
          destination: custom-output
          producer:
            partition-key-expression: payload #分区关键字 可以是对象中的id,或对象
            partition-count: 2 #分区数量

修改消费者1的application.yml配置

server:
  port: 7002
spring:
  application:
    name: stream_consumer
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #内置获取消息的通道,从destination配置值的exchange中获取信息
          destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称
        myinput:
          destination: custom-output
          group: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息
          consumer:
            partitioned: true #开启分区支持
      binders:
        defaultRabbit:
          type: rabbit #配置默认的绑定器为rabbit
      instance-count: 2 #消费者总数
      instance-index: 0 #当前消费者的索引

启动消费者1

修改消费者2的配置

server:
  port: 7003
spring:
  application:
    name: stream_consumer
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #内置获取消息的通道,从destination配置值的exchange中获取信息
          destination: my-default #指定消息发送的目的地,值为rabbit的exchange的名称
        myinput:
          destination: custom-output
          group: group1 #消息分组,有多个消费者时,只有一个消费者接收到信息
          consumer:
            partitioned: true #开启分区支持
      binders:
        defaultRabbit:
          type: rabbit #配置默认的绑定器为rabbit
      instance-count: 2 #消费者总数
      instance-index: 1 #当前消费者的索引

修改端口号为7003,当前消费者的索引instance-index的值修改为1

启动消费者2

生产者发送消息,看到只有Application(2)接收到消息

再用生产者发送一次消息,也是Application(2)接收到消息

说明实现了消息分区

也可以更改发送的数据,看是否能发送到不同消费者

修改生产者,发送数据由hello world变为hello world1,同时发送5次

	public void testSend(){
        for (int i = 0; i < 5; i++) {
            messageSender.send("hello world1");
        }
    }

看到hello world1全部被Application消费

所以消息分区是根据发送的消息不同,发送到不同消费者中。

完成!enjoy it!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/169074.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

拼图游游戏代码

一.创建新项目 二.插入图片 三.游戏的主界面 1.代码 package com.itheima.ui;import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.awt.event.KeyEvent; import java.awt.event.KeyListener; import java.util.Random;import javax.swing…

贪吃蛇代码

一.准备 1.新建项目 2.放进照片 3.创建两个包放置图片类和入口类 二&#xff0c;游戏界面 package com.snake.view;import java.awt.Color; import java.awt.EventQueue; import java.awt.Font; import java.awt.Frame; import java.awt.Graphics; import java.awt.Image; i…

【Java程序员面试专栏 算法训练篇】二叉树高频面试算法题

一轮的算法训练完成后,对相关的题目有了一个初步理解了,接下来进行专题训练,以下这些题目就是二叉树相关汇总的高频题目 遍历二叉树 遍历二叉树,分为递归和迭代两种方式,递归类似于DFS,迭代类似于BFS,【算法训练-二叉树 一】【遍历二叉树】前序遍历、中序遍历、后续遍…

【高级程序设计】Week2-4Week3-1 JavaScript

一、Javascript 1. What is JS 定义A scripting language used for client-side web development.作用 an implementation of the ECMAScript standard defines the syntax/characteristics of the language and a basic set of commonly used objects such as Number, Date …

【Java 进阶篇】Ajax 实现——原生JS方式

大家好&#xff0c;欢迎来到这篇关于原生 JavaScript 中使用 Ajax 实现的博客&#xff01;在前端开发中&#xff0c;我们经常需要与服务器进行数据交互&#xff0c;而 Ajax&#xff08;Asynchronous JavaScript and XML&#xff09;是一种用于创建异步请求的技术&#xff0c;它…

多态语法详解

多态语法详解 一&#xff1a;概念1&#xff1a;多态实现条件 二:重写&#xff1a;三&#xff1a;向上转型和向下转型1:向上转型&#xff1a;1&#xff1a;直接赋值&#xff1a;2&#xff1a;方法传参3&#xff1a;返回值 2:向下转型 一&#xff1a;概念 1&#xff1a;同一个引…

Java值传递和引用传递

在Java中&#xff0c;有值传递&#xff08;Pass-by-Value&#xff09;和引用传递&#xff08;Pass-by-Reference&#xff09;两种参数传递方式。 值传递&#xff08;Pass-by-Value&#xff09;&#xff1a;当使用值传递方式时&#xff0c;方法将参数的副本传递给调用方法。这意…

Go 语言中的map和内存泄漏

map在内存中总是会增长&#xff1b;它不会收缩。因此&#xff0c;如果map导致了一些内存问题&#xff0c;你可以尝试不同的选项&#xff0c;比如强制 Go 重新创建map或使用指针。 在 Go 中使用map时&#xff0c;我们需要了解map增长和收缩的一些重要特性。让我们深入探讨这一点…

大型 APP 的性能优化思路

做客户端开发都基本都做过性能优化&#xff0c;比如提升自己所负责的业务的速度或流畅性&#xff0c;优化内存占用等等。但是大部分开发者所做的性能优化可能都是针对中小型 APP 的&#xff0c;大型 APP 的性能优化经验并不会太多&#xff0c;毕竟大型 APP 就只有那么几个&…

ESP32-BLE基础知识

一、存储模式 两种存储模式&#xff1a; 大端存储&#xff1a;低地址存高字节&#xff0c;如将0x1234存成[0x12,0x34]。小端存储&#xff1a;低地址存低字节&#xff0c;如将0x1234存成[0x34,0x12]。 一般来说&#xff0c;我们看到的一些字符串形式的数字都是大端存储形式&a…

【VRTK】【VR开发】【Unity】7-配置交互能力和向量追踪

【前情提要】 目前为止,我们虽然设定了手模型和动画,还能够正确根据输入触发动作,不过还未能与任何物体互动。要互动,需要给手部设定相应的Interactor能力。 【配置Interactor的抓取功能】 在Hierarchy中选中[VRTK_CAMERA_RIGS_SETUP] ➤ Camera Rigs, Tracked Alias ➤ …

MobaXterm配置ssh端口转发(tensorboard使用)

背景&#xff1a; 我有一台本地Windows电脑&#xff0c;上面安装了MobaXterm软件。 MobaXterm通过ssh连接了一台服务器&#xff08;默认是通过22端口连&#xff0c;我这里配了一下&#xff0c;要填别的&#xff09; 现在服务器在跑模型&#xff0c;其6006端口是tensorboard端口…

8、创建第一个鸿蒙页面并实现页面跳转

一、创建页面 1、新建页面 在项目的"pages"目录上右键&#xff0c;选择”新建“——”page" 2、录入页面的名称 在“Page name”中输入页面的名称&#xff0c;并点击“Finish”完成创建 3、以下为创建的新页面 2、注册页面 新建的页面会自动在“resources”…

ArkTS - HarmonyOS服务卡片(创建)

可以参考官网文档 其中我们在已有的文件中File > New > Service Widget创建你想要的小卡片 本文章发布时目前可使用的模板就三种 有卡片后的new 最终效果

pnpm : 无法加载文件 E:\Soft\PromSoft\nodejs\node_global\pnpm.ps1,

pnpm : 无法加载文件 E:\Soft\PromSoft\nodejs\node_global\pnpm.ps1&#xff0c;因为在此系统上禁止运行脚本。有关详细信息&#xff0c;请参阅 https:/go.microsoft.com/fwlink/?LinkID135170 中 的 about_Execution_Policies。 所在位置 行:1 字符: 1pnpm -v~~~~ CategoryI…

Kotlin学习之函数

原文链接 Understanding Kotlin Functions 函数对于编程语言来说是极其重要的一个组成部分&#xff0c;函数可以视为是程序的执行&#xff0c;是真正活的代码&#xff0c;为啥呢&#xff1f;因为运行的时候你必须要执行一个函数&#xff0c;一般从主函数入口&#xff0c;开始一…

《微信小程序开发从入门到实战》学习二十二

3.3 开发创建投票页面 3.3.10 使用switch开关组件 用switch开关组件增加一个设置是否匿名投票的功能。 switch常用属性如下&#xff1a; checked 开还是关&#xff0c;默认false关 disabled 是否禁用&#xff0c;默认false不禁用&#xff0…

应试教育导致学生迷信标准答案惯性导致思维僵化-移动机器人

移动机器人课程群实践创新的困境与突围 一、引言 随着科技的快速发展&#xff0c;工程教育变得越来越重要。然而&#xff0c;传统的应试教育模式往往侧重于理论知识的传授&#xff0c;忽视了学生的实践能力和创新精神的培养。这在移动机器人课程群的教学中表现得尤为明显。本文…

win10手机投屏到电脑的操作方法

工具/原料&#xff1a; 系统版本&#xff1a;iOS 15.3,HarmonyOS 2.0.0&#xff0c;windows10系统 品牌型号&#xff1a;iPhone 13,HUAWEI Mate 40 Pro&#xff0c;联想小新air14 方法/步骤&#xff1a;方法一&#xff1a;安卓手机使用无线投屏功能投屏到win10电脑 1、保持手…

sapjco3.dll has version “721.619“, but required is at least version “721.913“

context with path [] threw exception [org.glassfish.jersey.server.ContainerException: java.lang.ExceptionInInitializerError: Native library sapjco3 is too old. Found library C:\Windows\System32\sapjco3.dll has version “721.619”, but required is at least …