使用微服务Spring Cloud集成Kafka实现异步通信

在微服务架构中,使用Spring Cloud集成Apache Kafka来实现异步通信是一种常见且高效的做法。Kafka作为一个分布式流处理平台,能够处理高吞吐量的数据,非常适合用于微服务之间的消息传递。

微服务之间的通信方式包括同步通信和异步通信。

1)同步通信:通常通过HTTP RESTful API或RPC(远程过程调用)实现。服务消费者通过发送HTTP请求到服务提供者,服务提供者处理请求后返回响应。这种方式简单直接,但可能会受到网络延迟和并发量的影响。

同步通信的实现代码参见博文:微服务3:微服务间接口远程调用(同步通信方式)-CSDN博客

2)异步通信:通过消息队列(如RabbitMQ、Kafka等)实现。服务消费者将消息发送到队列中,服务提供者从队列中拉取消息并进行处理。这种方式实现了服务之间的解耦,提高了系统的可扩展性和容错性。但也需要考虑消息的顺序性、一致性和可靠性等问题。

1、本文目标

本文的目标是使用微服务Spring Cloud集成Kafka实现异步通信。本文实现了一个简单的Kafka Producer微服务,连接至部署再Ubuntu系统上的Kafka Server,同时在Ubuntu通过命令行终端启动一个监听的消费者,当通过浏览器测试接口想Kafka Producer微服务发送一条消息,Kafka Producer微服务即刻将该消息发送至Ubuntu系统上的Kafka Server,同时在Kafka consumer终端上可收到并显示出该消息。具体系统架构如下图所示。

部署Kafka Server和Kafka consumer,参见博文:Ubuntu下Kafka安装及使用-CSDN博客

Eureka注册中心的实现,参见博文:

微服务1:搭建微服务注册中心(命令行简易版,不使用IDE)-CSDN博客

2、创建Kafka Producer

mvn archetype:generate -DgroupId=com.test -DartifactId=microservice-kafka -DarchetypeArtifactId=maven-archetype-quickstart

完整代码的目录如下:

编辑pom.xml,添加依赖包:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <groupId>com.test</groupId>
  <artifactId>microservice-kafka</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>microservice-kafka</name>
  <url>http://maven.apache.org</url>
  
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.0.RELEASE</version>
    <relativePath/> 
  </parent>
  
  <dependencies>
  	<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>         
	 
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>


  </dependencies>
  
    <dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>               
    </dependencies>
  </dependencyManagement>
  
  <build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
  
</project>

编辑application.yml,配置kafka:

bootstrap-servers: 192.168.23.131:9092其中192.168.23.131是Kafka Server的IP地址。

server:
  port: 8020
spring:
  application:
    name: microservice-kafka
  kafka:
    bootstrap-servers: 192.168.23.131:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8080/eureka/
  instance:
    prefer-ip-address: true            

App.java的完整代码如下:

package com.test;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;


@SpringBootApplication
@EnableDiscoveryClient
public class App 
{
    public static void main( String[] args )
    {
        System.out.println( "Hello World!" );
        SpringApplication.run(App.class, args);
    }
}

KafkaController.java的完整代码如下:

package com.test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.kafka.core.*;



@RequestMapping("/kafka")
@RestController
public class KafkaController {
	
	  @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
 
    @GetMapping("sendMsg")
    public String helloProducer(String msg){
        kafkaTemplate.send("mydemo1",msg);
        return "ok";
    }

}




启动Kafka Producer 和Eureka

mvn spring-boot:run

3、启动Kafka Server及Consumer

bin/kafka-server-start.sh config/server.properties&

创建主题

./bin/kafka-topics.sh --create --bootstrap-server demo1:9092 --replication-factor 1 --partitions 1 --topic mydemo1

在命令行终端启动消费者

bin/kafka-console-consumer.sh --bootstrap-server demo1:9092 --topic mydemo1

4、浏览器测试

在浏览器输入:

http://localhost:8020/kafka/sendMsg?msg=测试消息testmsg

此时在Ubuntu的Consumer终端可以看到从浏览器输入的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/888438.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【CTF Web】Pikachu CSRF(get) Writeup(CSRF+GET请求+社会工程学)

CSRF(跨站请求伪造)概述 Cross-site request forgery 简称为“CSRF”&#xff0c;在CSRF的攻击场景中攻击者会伪造一个请求&#xff08;这个请求一般是一个链接&#xff09;&#xff0c;然后欺骗目标用户进行点击&#xff0c;用户一旦点击了这个请求&#xff0c;整个攻击就完成…

vmstat命令:系统性能监控

一、命令简介 ​vmstat​ 是一种在类 Unix 系统上常用的性能监控工具&#xff0c;它可以报告虚拟内存统计信息&#xff0c;包括进程、内存、分页、块 IO、陷阱&#xff08;中断&#xff09;和 CPU 活动等。 ‍ 二、命令参数 2.1 命令格式 vmstat [选项] [ 延迟 [次数] ]2…

docker快速上手

一个轻量的虚拟机&#xff0c;让程序员不再纠结于环境部署&#xff0c;更多集中于代码编写&#xff0c;基础建设&#xff0c;开发 作用&#xff1a; 打包&#xff1a;把你软件运行所需的所有东西打包到一起 分发&#xff1a;把你打包好的“安装包”上传到一个镜像仓库&#…

渲染技术的教育普及,塑造未来视觉艺术与技术的璀璨星辰

在数字时代的浪潮中&#xff0c;渲染技术作为连接创意与现实的桥梁&#xff0c;正以前所未有的速度推动着视觉艺术与技术领域的融合与发展。从电影特效的震撼呈现到游戏世界的细腻构建&#xff0c;从广告设计的视觉冲击力到建筑设计方案的直观展示&#xff0c;渲染技术无处不在…

css 简单网页布局——浮动(一)

1. 三种布局方式 1.1 标准流 1.2 浮动的使用 1.3 简述浮动 1.3.1 浮动三大特性 <style>.out {border: 1px red solid;width: 1000px;height: 500px;}.one {background-color: aquamarine;width: 200px;height: 100px;}.two {background-color: blueviolet;width: 200px;h…

『网络游戏』窗口基类【06】

创建脚本&#xff1a;WindowRoot.cs 编写脚本&#xff1a; 修改脚本&#xff1a;LoginWnd.cs 修改脚本&#xff1a;LoadingWnd.cs 修改脚本&#xff1a;ResSvc.cs 修改脚本&#xff1a;LoginSys.cs 运行项目 - 功能不变 本章结束

【AI知识点】批归一化(Batch Normalization)

批归一化&#xff08;Batch Normalization&#xff0c;BN&#xff09; 是一种用于加速神经网络训练并提高模型稳定性的方法&#xff0c;通过在每一层对神经网络中的激活值进行标准化&#xff0c;使得每一层的输入保持在一个稳定的分布中&#xff0c;从而缓解梯度消失和梯度爆炸…

Chromium 搜索引擎功能浅析c++

地址栏输入&#xff1a;chrome://settings/searchEngines 可以看到 有百度等数据源&#xff0c;那么如何调整其顺序呢&#xff0c;此数据又存储在哪里呢&#xff1f; 1、浏览器初始化搜索引擎数据来源在 components\search_engines\prepopulated_engines.json // Copyright …

机器学习-支撑向量机SVM

Support Vector Machine 离分类样本尽可能远 Soft Margin SVM scikit-learn中的SVM 和kNN一样&#xff0c;要做数据标准化处理&#xff01; 涉及距离&#xff01; 加载数据集 import numpy as np import matplotlib.pyplot as plt from sklearn import datasetsiris datas…

Debezium日常分享系列之:Debezium 3.0.0.Final发布

Debezium日常分享系列之&#xff1a;Debezium 3.0.0.Final发布 Debezium 核心的变化需要 Java 17基于Kafka 3.8 构建废弃的增量信号字段的删除每个表的详细指标 MariaDB连接器的更改版本 11.4.3 支持 MongoDB连接器的更改MongoDB sink connector MySQL连接器的改变MySQL 9MySQL…

【图论】迪杰特斯拉算法

文章目录 迪杰特斯拉算法主要特点基本思想算法步骤示例 实现迪杰斯特拉算法基本步骤算法思路 总结 迪杰特斯拉算法 迪杰特斯拉算法是由荷兰计算机科学家艾兹赫尔迪杰特斯拉&#xff08;Edsger W. Dijkstra&#xff09;在1956年提出的&#xff0c;用于解决单源最短路径问题的经…

命令行py脚本——Linux下方便快捷地运行*.py脚本

命令行参数传递&#xff0c;shell批指令和命令别名。 (笔记模板由python脚本于2024年10月08日 12:25:54创建&#xff0c;本篇笔记适合喜欢python和Linux的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣…

Docker:快速部署

docker安装&#xff1a; ​‌​‬&#xfeff;​&#xfeff;⁠​‍‬​‍‬‬‌​‬‬‬​&#xfeff;⁠​‍​​‌‬‌&#xfeff;​​​​​​‌​​​​⁠​‍⁠‌安装Docker - 飞书云文档 (feishu.cn) docker命令解读 docker run -d \ > --name mysql \ > -p 33…

【bug】finalshell向远程主机拖动windows快捷方式导致卡死

finalshell向远程主机拖动windows快捷方式导致卡死 问题描述 如题&#xff0c;作死把桌面的快捷方式拖到了finalshell连接的服务器面板中&#xff0c;导致finalshell没有响应&#xff08;小概率事件&#xff0c;有时会触发&#xff09; 解决 打开任务管理器查看finalshell进…

SpringBoot Jar 包加密防止反编译

今天看到了一个说明jar包加密的实现方式&#xff0c;特意试了下效果&#xff0c;并下载了插件源码及实现源码查看了下子&#xff0c;感兴趣的可以在最后得到gitee地址。 SpringBoot 程序 Jar 包加密的方式&#xff0c;通过代码加密可以实现无法反编译。应用场景就是当需要把公司…

RK3568笔记六十四:SG90驱动测试

若该文为原创文章,转载请注明原文出处。 前面有测试过PWM驱动,现在使用两种方式来产生PWM驱动SG90,实现舵机旋转任意角度 方法一:使用硬件PWM 方法二:使用高精度定时器,GPIO模拟PWM. 一、PWM子系统框架 二、SG90控制方法 舵机的控制需要MCU产生一个周期为20ms的脉冲信号…

(Linux驱动学习 - 8).信号异步通知

一.异步通知简介 1.信号简介 信号类似于我们硬件上使用的“中断”&#xff0c;只不过信号是软件层次上的。算是在软件层次上对中断的一种模拟&#xff0c;驱动可以通过主动向应用程序发送信号的方式来报告自己可以访问了&#xff0c;应用程序获取到信号以后就可以从驱动设备中…

【JavaEE】【多线程】Thread类讲解

目录 Thread构造方法Thread 的常见属性创建一个线程获取当前线程引用终止一个线程使用标志位使用自带的标志位 等待一个线程线程休眠线程状态线程安全线程不安全原因总结解决由先前线程不安全问题例子 Thread构造方法 方法说明Thread()创建线程对象Thread(Runnable target)使用…

Web3 游戏周报(9.22 - 9.28)

回顾上周的区块链游戏概况&#xff0c;查看 Footprint Analytics 与 ABGA 最新发布的数据报告。 【9.22-9.28】Web3 游戏行业动态&#xff1a; Axie Infinity 将 Fortune Slips 的冷却时间缩短至 24 小时&#xff0c;从而提高玩家的收入。 Web3 游戏开发商 Darkbright Studios…

使用sponge+dtm快速搭建一个高性能的电商系统,秒杀抢购和订单架构的设计与实现

本文将展示如何使用 Sponge 框架快速创建一个简易版高性能电商系统&#xff0c;主要实现秒杀抢购和订单功能&#xff0c;并通过分布式事务管理器 DTM 来确保数据一致性。电商系统的架构图如下&#xff1a; 这是源码示例eshop&#xff0c;目录下包括了两个一样的代码示例&#x…