SpringBoot整合SpringCloudStream3.1+版本Kafka

SpringBoot整合SpringCloudStream3.1+版本Kafka

下一节直通车

SpringBoot整合SpringCloudStream3.1+版本的Kafka死信队列

为什么用SpringCloudStream3.1

  1. Springcloud架构提供,基于spring生态
  2. 能够快速切换市面上常见的MQ产品
  3. 3.1后使用配置文件的形式定义channel,不再需要3.1前的硬编码

Jar

<!--SpringCloudStream Kafka-->
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
	<version>3.2.4</version>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
	<version>3.2.4</version>
</dependency>
<!--Kafka相关-->
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.8.6</version>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
	<version>2.6.8</version>
</dependency>
<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<version>1.18.24</version>
</dependency>

配置文件yml

kafka基本配置(application-mq.yml)

server:
  port: 7105
spring:
  application:
	name: betrice-message-queue
  config:
	import:
	- classpath:application-bindings.yml
  cloud:
	stream:
	  kafka:
		binder:
		  brokers: localhost:9092
		  configuration:
			key-serializer: org.apache.kafka.common.serialization.StringSerializer
			value-serializer: org.apache.kafka.common.serialization.StringSerializer
		  consumer-properties:
			enable.auto.commit: false
	  binders:
		betrice-kafka:
		  type: kafka
		  environment:
			spring.kafka:
			  bootstrap-servers: ${spring.cloud.stream.kafka.binder.brokers}

classpath:application-bindings.yml 引入通道绑定配置文件,即消息生产、消费者的关系。

通道绑定配置(application-bindings.yml)

参数含义:
spring.cloud.stream.function.definition:定义channel名字,每个channel又可以作为生产者(in)与消费者(out)
spring.cloud.stream.bindings: 是一个map形式(具体看源码的Properties定义)
1. destination(生产/消费者通向的topic);
2. group:消费者组名;
3. binder:绑定当前使用的MQ类型(见betrice-kafka);
4. content-type:消息序列/反序列化的类型(见源码的支持的类型)

spring:
  cloud:
	stream:
	  betrice-default-binder: betrice-kafka
	  function:
		# 声明两个channel,transfer接收生产者的消息,处理完后给sink
		definition: transfer;sink;gather;gatherEcho
	  bindings:
		# 添加生产者binding,输出到destination对应的topic
		Evad05:
		  destination: Evad07
		  binder: ${spring.cloud.stream.betrice-default-binder}
		transfer-in-0:
		  destination: Evad07
		  binder: ${spring.cloud.stream.betrice-default-binder}
		transfer-out-0:
		  destination: Evad08
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  content-type: text/plain
		sink-in-0:
		  destination: Evad08
		  binder: ${spring.cloud.stream.betrice-default-binder}
		  content-type: text/plain

Controller

/**
 * @author Evad.Wu
 * @Description 消息队列 控制类
 * @date 2023-02-10
 */
@Slf4j
@RestController
@RequestMapping(value = "betriceMqController")
public class BetriceMqController {
	@Resource(name = "betriceKafkaProducer")
	private Producer<String, String> producer;
	@Resource(name = "streamBridgeUtils")
	private StreamBridge streamBridge;

	@PostMapping("send")
	public void send(String topic, String key, String message) {
		try {
			producer.send(new ProducerRecord<>(topic, key, message));
			log.info("发送消息:" + message);
		} catch (Exception e) {
			log.error("异常消息:" + e);
		}
	}

	@PostMapping("streamSend")
	public void streamSend(String topic, String message) {
		try {
			streamBridge.send(topic, message);
			log.info("发送消息:" + message);
		} catch (Exception e) {
			log.error("异常消息:" + e);
		}
	}
}

Channel(生产、消费者通道)

/**
 * @author Evad.Wu
 * @Description mq消息通道 配置类
 * @date 2023-02-11
 */
@Configuration
public class BetriceMqSubChannel {
	/**
	 * Function方式声明binding,相当于同时声明了一个Producer的Bindng和一个Consumer的Binding。
	 * transfer-in-0 表示消费者  transfer-out-0 表示生产者
	 * 其作用就在于,echo-in-0收到的消息,立即就会通过echo-out-0发送出去。
	 */
	@Bean
	public Function<String, String> transfer() {
		return message -> {
			System.out.println("transfer: " + message);
			throw new RuntimeException("死信队列测试!");
//            return "transfer:" + message;
		};
	}

	/**
	 * Consumer声明一个消息消费者,sink1就对应sink-in-0
	 */
	@Bean
	public Consumer<String> sink() {
		return message -> {
			System.out.println("******************");
			System.out.println("At Sink1");
			System.out.println("******************");
			System.out.println("Received message " + message);
		};
	}
}

结果

在这里插入图片描述

参考网址

SpringCloudStream实战拆解以及3.1后新版本特性分析
@EnableBinding @deprecated 自 3.1 起支持函数式编程模型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/42434.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

# Linux下替换删除文件中的颜色等控制字符的方法

Linux下替换删除文件中的颜色等控制字符的方法 文章目录 Linux下替换删除文件中的颜色等控制字符的方法1 Linux下的控制字符&#xff08;显示的文字并不是他本身&#xff09;&#xff1a;2 颜色字符范例&#xff1a;3 替换4 最后 我们在shell编程显示输出时&#xff0c;会定义文…

Linux的时间函数

2023年7月19日&#xff0c;周三下午 我今天基于GitHub搭建了自己的博客网站&#xff0c;欢迎大家来我的个人博客网站阅读我的博客 巨龙之路的GitHub个人博客 (julongzhilu.github.io) 目录 time函数原型使用方法ctime函数原型使用方法疑惑gmtime、 localtime函数原型什么是分…

WEB:FlatScience

背景知识 sql注入 SQLite数据库知识 SQLite3注入方法 题目 用dirsearch进行扫描&#xff0c;下面几个关键目录&#xff1a;robots.txt&#xff0c;login.php&#xff0c;admin.php&#xff0c;剩下的目录就是一些pdf格式的论文了 一个一个访问并查看源代码&#xff0c;在查看l…

常用API学习06(Java)

Biglnteger public BigInteger(int num, Random rnd) 获取随机大整数&#xff0c;范围&#xff1a;[0~2的num次方-1] public BigInteger(String val) 获取指定的大整数 public BigInteger(String val, int radix) 获取指定进制的大整数 public static BigInteg…

怎么给pdf文件加密?pdf文档如何加密

在数字化时代&#xff0c;保护个人和机密信息的重要性越来越受到关注。PDF&#xff08;Portable Document Format&#xff09;是一种广泛使用的文件格式&#xff0c;用于共享和存储各种类型的文档。然而&#xff0c;由于其易于编辑和复制的特性&#xff0c;保护PDF文件中的敏感…

一张表实现短视频“评论区“完整功能

前言 现如今&#xff0c;不管是哪种类型的应用&#xff0c;评论区都少不了。从工具类的到媒体信息流类的&#xff0c;评论留言都是最基本的互动环节。比如抖音短视频下&#xff0c;针对视频每个用户都可以发表自己的观点&#xff1b;而针对用户的评论&#xff0c;其他的用户又可…

Spring Cloud—GateWay之限流

RequestRateLimiter RequestRateLimiter GatewayFilter 工厂使用 RateLimiter 实现来确定是否允许当前请求继续进行。如果不允许&#xff0c;就会返回 HTTP 429 - Too Many Requests&#xff08;默认&#xff09;的状态。 这个过滤器需要一个可选的 keyResolver 参数和特定于…

Spring @RequestMapping 工作原理

Spring RequestMapping 工作原理 配置基础启动类及Controller类 SpringBootApplication public class DemoServiceApplication {public static void main(String[] args) {SpringApplication.run(DemoServiceApplication.class, args);} }RestController public class HelloC…

单元测试用例到底该如何设计?

目录 前言 使用参数方法创建测试用例 使用执行路径方法创建测试用例 总结 前言 最近一些大公司在进行去测试化的操作&#xff0c;这一切的根源大概可以从几年前微软一刀切砍掉所有内部正式的测试人员开始说起&#xff0c;当时微软内部的测试工程师有一部分转职成了开发工程…

【数据结构常见七大排序(三)上】—交换排序篇【冒泡排序】And【快速排序】

目录 前言 1.冒泡排序 1.1冒泡排序动图 1.2冒泡排序源代码 1.3冒泡排序的特性总结 2.快速排序&#x1f451; 2.1hoare版本实现思想 排序前 排序中 排序后 2.2hoare版本快排源代码 2.3分析先走 情况1&#x1f947; 情况2&#x1f948; 前言 交换类排序两个常见的排…

嵌入式工程师常用的软件工具推荐

前言&#xff1a;常言道&#xff1a;工欲善其事&#xff0c;必先利其器。作为一名合格的嵌入式工程师&#xff0c;日常可能需要接触和处理各种奇奇怪怪的问题&#xff0c;这时候一款高适配性的工具将会令工作效率大大提升。作者根据个人的实际使用情况与粉丝的客观感受&#xf…

紧跟国家“新能源+”模式!涂鸦智慧能源方案助力夏季用电节能提效

“今天的你是几分熟&#xff1f;” 今年夏天&#xff0c;高温来得比往年更早&#xff0c;五六月份就提前开启了滚滚热浪模式&#xff0c;京津冀和山东等地最高气温也一度突破了历史极值。在提前到来的高温“烤”验下&#xff0c;全社会供电能力面临着极大挑战。 中国电力网预…

完整的电商平台后端API开发总结

对于开发一个Web项目来说&#xff0c;无论是电商还是其他品类的项目&#xff0c;注册与登录模块都是必不可少的&#xff1b;注册登录功能也是我们在日常生活中最长接触的&#xff0c;对于这个业务场景的需求与逻辑大概是没有什么需要详细介绍的&#xff0c;市面上常见的邮箱注册…

PSP - Jackhmmer 搜索 EMBL 序列数据库的相似序列

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/131817060 EMBL (European Molecular Biology Laboratory&#xff0c;欧洲分子生物实验室)&#xff1a;EMBL 数据库是一个由欧洲生物信息学研究所…

第六届字节跳动青训营报录比(宣传大使)

统计 前端基础卷&#xff1a;105 前端基础班&#xff1a;120-22(笔试不过基础班&#xff0c;宣传大使奖励进入&#xff09;98 前端进阶卷&#xff1a;77 前端进阶班&#xff1a;18-216 后端基础卷&#xff1a;151 后端基础班&#xff1a;220 后端进阶卷&#xff1a;133 后端进…

LeetCode·每日一题·1851. 包含每个查询的最小区间·优先队列(小顶堆)

题目 示例 思路 离线查询&#xff1a; 输入的结果数组queries[]是无序的。如果我们按照输入的queries[]本身的顺序逐个查看&#xff0c;时间复杂度会比较高。 于是&#xff0c;我们将queries[]数组按照数值大小&#xff0c;由小到大逐个查询&#xff0c;这种方法称之为离线查询…

《微服务架构设计模式》第十二章 部署微服务应用

内容总结自《微服务架构设计模式》 部署微服务应用 一、部署模式分类二、编程语言特定的发布包格式1、概述2、利弊 三、将服务部署为虚拟机1、概览2、利弊 四、将服务部署为容器1、概述2、利弊3、K8S部署 五、Serverless部署1、概述2、利弊3、示例 六、总结 一、部署模式分类 …

视频融合平台EasyCVR级联后上级平台播放失败的问题排查与优化

EasyCVR视频融合平台基于云边端智能协同架构&#xff0c;具有强大的数据接入、处理及分发能力&#xff0c;平台可提供视频监控直播、云端录像、云存储、录像检索与回看、智能告警、平台级联、云台控制等视频能力与服务&#xff0c;可支持多协议、多类型的海量设备接入与分发。 …

7、PHP语法要点2

1、or 和 ||&#xff0c;&& 和 and 都是逻辑运算符&#xff0c;效果一样&#xff0c;但是其优先级却不一样。&&、||的优先级在赋值运算符之前&#xff0c;or和and在赋值运算符之后。 2、字符串变量及数组可以在echo输出时双引号内、双引号外均可引用&#xff…

Meta提出全新参数高效微调方案,仅需一个RNN,Transformer模型GPU使用量减少84%!

近来&#xff0c;随着ChatGPT和GPT-4模型的不断发展&#xff0c;国内外互联网大厂纷纷推出了自家的大语言模型&#xff0c;例如谷歌的PaLM系列&#xff0c;MetaAI的LLaMA系列&#xff0c;还有国内公司和高校推出的一些大模型&#xff0c;例如百度的文心一言&#xff0c;清华的C…