【实战】Spring Cloud Stream 3.1+整合Kafka

文章目录

    • 前言
    • 新版版本优势
    • 实战演示
      • 增加maven依赖
      • 增加applicaiton.yaml配置
      • 新增Kafka通道消费者
      • 新增发送消息的接口
    • 实战测试
      • postman发送一个正常的消息
      • postman发送异常消息

前言

之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太好,直接让我们不用再关心底层MQ如何集与消息收发。但是从Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。传说是有利于使用Project Reactor提供的事件流抽象(如Flux和Mono),命令函数在每个单独的事件上触发,而reactive函数只触发一次。故今天我们分享一期Spring Cloud Stream 3.1+整合Kafka,各位看官敬请鉴赏。

在这里插入图片描述

新版版本优势

新版提倡用函数式进行发送和消费信息

定义返回类型为Supplier, Function or Consumer的bean提供消息发送和消费的bean 看看绑定名称命名规则
input - + -in- +
output - + -out- +

在配置文件中指定spring.cloud.function.definition/spring.cloud.stream.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上。

比如 inputChannel bean绑定了inputChannel-in-0通道,outputChannel bean绑定了outputChannel-out-0通道:

spring:
  kafka:
    bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
      binders:
        kafkahub:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka: ${spring.cloud.stream.kafka.binder}
      default-binder: kafkahub 
      function:
          definition: inputChannel,outputChannel
      bindings:
        inputChannel-in-0:
          binder: kafkahub
          destination: test-kafka-topic
          group: test-kafka-group
          content-type: text/plain
        outputChannel-out-0:
          binder: kafkahub
          destination: test-kafka-topic
          content-type: text/plain
          producer:
            partition-count: 3 #分区数目
  

此时消息生产者为:

@Resource
private StreamBridge streamBridge;

@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){
    boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());
    return send;
}

此时消息消费者为:

@Configuration
public class KafkaChannel {

    @Resource
    private StreamBridge streamBridge;
    /**
     * inputChannel 消费者
     * @author senfel
     * @date 2024/6/18 15:26
     * @return java.util.function.Consumer<java.lang.String>
     */
    @Bean
    public Consumer<Message<String>> inputChannel(){
        return message -> {
            System.out.println("接收到消息Payloa:" + message.getPayload());
            System.out.println("接收到消息Header:" + message.getHeaders());
        };
    }

}

实战演示

我们简单进行一下演示即可,kafka环境可以看我之前的博文搭建。
主要演示功能:

正常情况下生产者发送消息到kafka,消费者监听消息并消费成功
异常情况下消费者消费失败,立即将异常消息投递到另一个topic上,兜底topic消费者消费

本次全部采用自动ack模式,如果需要手动ack参照之前的博文配置即可,注意在消费者端加上手动ack逻辑。

增加maven依赖

 <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cce-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-demo-order</name>
<description>Demo project for Spring Boot</description>
<properties>
    <java.version>8</java.version>
    <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
       <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        <version>3.2.4</version>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

增加applicaiton.yaml配置

spring:
  #kafka
  kafka:
    bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092
  cloud:
    stream:
      kafka:  # kafka配置
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
          auto-add-partitions: true #自动分区
          auto-create-topics: true #自动创建主题
          replication-factor: 3 #副本
          min-partition-count: 3 #最小分区
        bindings:
          outputChannel-out-0:
            producer:
              # 无限制重发不产生消息丢失
              retries: Integer.MAX_VALUE
              #acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低
              #acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中
              #acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长
              #可以设置的值为:all, -1, 0, 1
              acks: all
              min:
                insync:
                  replicas: 3 #感知副本数
          inputChannel-in-0:
            consumer:
              concurrency: 1 #消费者数量
              max-concurrency: 5 #最大消费者数量
              recovery-interval: 3000  #3s 重连
              auto-rebalance-enabled: true  #主题分区消费者组成员自动平衡
              auto-commit-offset: false   #手动提交偏移量
              enable-dlq: true  # 开启 dlq队列
              dlq-name: test-kafka-topic.dlq
              deserializationExceptionHandler: sendToDlq #异常加入死信
      binders: # 与外部mq组件绑定
        kafkahub:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka: ${spring.cloud.stream.kafka.binder}

      default-binder: kafkahub #默认绑定
      function: # 定义channel名字,每个channel又可以作为生产者(in)与消费者(out)
        definition: inputChannel;outputChannel;dlqChannel
      bindings: # 通道绑定
        inputChannel-in-0:
          binder: kafkahub
          destination: test-kafka-topic
          group: test-kafka-group
          content-type: text/plain
          consumer:
            maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
            backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
            backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
            backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
        outputChannel-out-0:
          binder: kafkahub
          destination: test-kafka-topic
          content-type: text/plain
          producer:
            partition-count: 3 #分区数目
        dlqChannel-in-0:
          binder: kafkahub
          destination: test-kafka-topic.dlq
          group: test-kafka-group
          content-type: text/plain
          consumer:
            maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3
            backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行
            backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2
            backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10s
        dlqChannel-out-0:
          binder: kafkahub
          destination: test-kafka-topic.dlq
          content-type: text/plain
          producer:
            partition-count: 3 #分区数目

新增Kafka通道消费者

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.function.Consumer;

/**
 * KafkaCustomer
 * @author senfel
 * @version 1.0
 * @date 2024/6/18 15:22
 */
@Configuration
public class KafkaChannel {

    @Resource
    private StreamBridge streamBridge;
    /**
     * inputChannel 消费者
     * @author senfel
     * @date 2024/6/18 15:26
     * @return java.util.function.Consumer<java.lang.String>
     */
    @Bean
    public Consumer<Message<String>> inputChannel(){
        return message -> {
            System.out.println("接收到消息:" + message.getPayload());
            System.out.println("接收到消息:" + message.getHeaders());
            if(message.getPayload().contains("9")){
                boolean send = streamBridge.send("dlqChannel-out-0", MessageBuilder.withPayload("kafka异常消息发送到dlq测试:"+message).build());
                System.err.println("向dlqChannel发送消息:"+send);
            }
        };
    }

    /**
     * dlqChannel 死信消费者
     * @author senfel
     * @date 2024/6/18 15:26
     * @return java.util.function.Consumer<java.lang.String>
     */
    @Bean
    public Consumer<Message<String>> dlqChannel(){
        return message -> {
            System.out.println("死信dlqChannel接收到消息:" + message.getPayload());
            System.out.println("死信dlqChannel接收到消息:" + message.getHeaders());
        };
    }
}

新增发送消息的接口

@Resource
private StreamBridge streamBridge;

@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){
    boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());
    return send;
}

实战测试

postman发送一个正常的消息

在这里插入图片描述

postman发送异常消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/723385.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

要改Google签名?这些你足够了解吗!

大家好&#xff0c;我是小编阿文。欢迎您关注我们&#xff0c;经常分享有关Android出海&#xff0c;iOS出海&#xff0c;App市场政策实时更新&#xff0c;互金市场投放策略&#xff0c;最新互金新闻资讯等文章&#xff0c;期待与您共航世界之海。 老项目keystore签名信息包含国…

python-画三角形

[题目描述] 输入一个正整数n&#xff0c;请使用大写字母拼成一个这样的三角形图案&#xff08;参考样例输入输出&#xff09;&#xff1a;三角形图案的第1行有1个字母&#xff0c;第2行有2个字母&#xff0c;以此类推&#xff1b;在三角形图案中&#xff0c;由上至下、由左至右…

【Redis技术进阶之路】「底层源码解析」揭秘高效存储模型与数据结构底层实现(链表)

揭秘高效存储模型与数据结构底层实现 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 链表使用场景List&#xff08;列表&#xff09;和 链表的关系链表的实现链表的节点list的源码实现结构模…

零信任是对抗AI威胁的“解药”

人工智能的变革力量正在重塑众多行业的业务运营。通过机器人流程自动化&#xff08;RPA&#xff09;&#xff0c;人工智能正在将人力资源从重复的、基于规则的任务中解放出来&#xff0c;并将其重点放在战略性的、复杂的操作上。此外&#xff0c;人工智能和机器学习算法可以以前…

防爆气象仪的工作原理

TH-WFB5矿山气象传感器在矿山安全监测系统中扮演着至关重要的角色&#xff0c;它们能够及时发现异常情况&#xff0c;为矿山的安全运营提供可靠的数据支持。矿山气象传感器能够实时监测矿山环境中的风速、风向、温度、湿度和大气压力等关键气象参数。这些传感器采用先进的传感技…

BottomSheet 半模态视图

先看效果图: 越来越多的app,使用半模态视图,弹窗从底部弹窗,手动滑动收起。交互流程丝滑,体验流畅。我这一研究才发现,官方出了一个控件叫 UISheetPresentationController,使用起来及其方便,只需要关注业务逻辑就可以,着急的朋友可以直接把demo拿去。BottomSheetDemo…

项目二 OpenStack快速入门

任务一 熟悉OpenStack图形界面操作 1.1 Horizon项目 •各OpenStack服务的图形界面都是由Horizon提供的。 •Horizon提供基于Web的模块化用户界面。 •Horizon为云管理员提供一个整体的视图。 •Horizon为终端用户提供一个自主服务的门户。 •Horizon由云管理员进行管理…

2024最新版Vcpkg安装第三方库报错error: building XXXX failed with: BUILD_FAILED

很多朋友用Vcpkg安装第三方库的时候基本都会遇到报错的情况&#xff0c;而且大部分都会出现下面这个页面里面的红色报错信息&#xff0c;但是实际上真正错误应该是上面的Cmake Error提示&#xff0c;下面的红色警告只是Vcpkg官方提供给我们的一个最基础的解决方式&#xff0c;而…

『啥叫不可测试代码』

最近写单元测试&#xff0c;知道 golang的单测覆盖率提高会比较难&#xff0c;没想到这么难。当提高到一定程度&#xff0c;有的 case就无法成型了&#xff0c;也就无从增加覆盖率。为何呢&#xff1f;思考许久&#xff0c;究其原因&#xff0c;还是被测代码属于“不可测代码”…

NSSCTF-Web题目10

目录 [强网杯 2019]随便注 1、题目 2、知识点 3、思路 [GXYCTF 2019]BabyUpload 1、题目 2、知识点 3、思路 [强网杯 2019]随便注 1、题目 2、知识点 数据库注入&#xff0c;堆叠注入&#xff0c;更改表名 3、思路 正常提交查询&#xff0c;看看数据回显 加入单引号…

数学建模基础:非线性模型

目录 前言 一、非线性方程组 二、非线性规划 三、微分方程模型 四、非线性模型的应用 五、实例示范&#xff1a;传染病传播模型 实例总结 五、总结 前言 非线性模型用于描述变量之间的非线性关系&#xff0c;相比线性模型&#xff0c;其数学形式更为复杂&#xff0c;但…

Window常用的脚本有哪些?快来看看有哪些是你正在用的!(欢迎评论补充~)

前言 在日常开发中&#xff0c;如果能熟练掌握以下这些使用频率很高的脚本&#xff0c;那工作起来真的是手拿把攥&#xff0c;事半功倍&#xff0c;接下来给大家介绍一些我们日常使用率很高的一些脚本&#xff01; 常用脚本(Batchfile & VBScript) 1.一键启动.bat 一次…

Android device/xxx/system/common/overlay编译产物

MTK 如下代码编译的产物在 framework-res.apk 编译配置文件在device/mediatek/system/common/目录下的Android.bp device/mediatek/system/common/overlay/telephony/frameworks/base/core/res/res/values-mcc655-mnc01/config.xml 在Android U上面还在overlay目录中进行了产…

pytets测试框架中如果需要运行多个测试套件时pytest.ini文件设置

pytets测试框架中如果需要运行多个测试套件时pytest.ini文件设置方法 testpaths testcases/fenmi testcases/weixin testcases/Zgen

【数据结构与算法】对称矩阵,三角矩阵 详解

给出对称矩阵、三角矩阵的节省内存的存贮结构并写出相应的输入、输出算法。 对称矩阵和三角矩阵可以通过特殊的存储结构来节省内存。这种存储结构只存储矩阵的一部分元素&#xff0c;而不是全部元素。 对称矩阵&#xff1a;对于一个n阶对称矩阵&#xff0c;我们只需要存储主对…

【单片机毕业设计选题24014】-基于Arduino的瓜果采摘机构设计

系统功能: 使用MeArm机械臂, 上电后四路舵机处于初始90度位置&#xff0c;通过APP"蓝牙调试器"连接手机后可通过手机端控制四路舵机执行单独或连续的动作&#xff0c;并读取颜色传感器的RGB值。 主要功能模块原理图: 资料获取地址 系统主要功能模块代码 //存储电机…

备忘录文字颜色怎么改 备忘录改变字体颜色方法

在日常的工作和生活中&#xff0c;备忘录已经成为我不可或缺的好帮手。但是&#xff0c;面对满满当当的备忘录&#xff0c;有时候不同的任务和信息都混在一起&#xff0c;让人眼花缭乱。我常常想&#xff0c;如果能改变备忘录中的文字颜色&#xff0c;用以区分不同类别的事项&a…

如何打开pak文件-翻译pak语言包

最近碰到一些程序的语言包是pak格式&#xff0c;用Notepad打开全是乱码&#xff0c;百度搜索了一下&#xff0c;pak是一种少见的压缩文件格式&#xff0c;是pak Quake系列游戏所采用的一种特殊压缩包格式&#xff0c;由Quake游戏公司开发&#xff0c;用高版本的winrar可以打开&…

哪个城市的Delphier最多?Delphier平均年龄多大了?

先来看看哪个城市的Delphier最多&#xff1a; 北上广深不是白叫的&#xff0c; 大家想换工作&#xff0c;就去这些大城市&#xff0c;机会多。 有人会觉得奇怪&#xff0c;怎么才这么几个人&#xff1f; 因为以上数据统计基数为2000人&#xff0c; 根据微信公众号和QQ群得出…

好用耐用充电宝有哪些牌子?公认性能超全充电宝品牌

在共享充电宝遍布大街小巷的今天&#xff0c;许多人可能会觉得拥有一款自己的充电宝已经不再必要。然而&#xff0c;共享充电宝的高昂费用和不够便携的特点&#xff0c;让越来越多的消费者开始重新考虑购买一款属于自己的充电宝。那么&#xff0c;在众多充电宝品牌中&#xff0…