【Java】SpringBoot快速整合Kafka

目录

1.什么是Kafka?

主要特点和概念:

主要组成部分:

2.Kafka可以用来做什么?

3.SpringBoot整合Kafka步骤:

1. 添加依赖:

2. 配置 Kafka:

3. 创建 Kafka 生产者:

4. 创建 Kafka 消费者:

5. 发布消息:

6. 使用Postman进行测试:


如果你没有Kafka,可以参考这篇文章进行安装【Docker】手把手教你使用Docker搭建kafka【详细教程】_docker 安装kafka-CSDN博客

1.什么是Kafka?

        Kafka是一个开源的流式平台,用于构建实时数据流应用程序和实时数据管道。Kafka旨在处理大规模的数据流,具有高吞吐量、可扩展性、持久性和容错性的特点。

主要特点和概念:

  1. 发布-订阅模型: Kafka采用发布-订阅模型,数据生产者将消息发布到一个或多个主题(topics),而数据消费者则订阅这些主题以接收消息。

  2. 分布式架构: Kafka是一个分布式系统,允许横向扩展,通过分布式存储和分区机制来实现高吞吐量和可扩展性。

  3. 持久性存储: Kafka使用持久性存储来保留消息,可以在消息发送后保留一定的时间,确保消费者可以在需要时检索历史消息。

  4. 数据分区: 主题被划分为多个分区,每个分区可以在不同的服务器上,以实现并行处理和提高性能。

  5. 流式处理: Kafka提供了流处理功能,允许应用程序实时处理和分析数据流,执行复杂的事件处理操作。

  6. 高可用性: Kafka在集群中的多个节点之间复制数据,提高了系统的容错性和可用性。

  7. 数据保证: Kafka提供了不同级别的数据传递保证,包括至多一次、至少一次和精确一次语义。

  8. 生态系统: Kafka生态系统丰富,包括连接器(Connectors)、Kafka Streams、MirrorMaker等组件,用于与各种外部系统集成和实现各种应用场景。

主要组成部分:

  • Producer(生产者): 负责向Kafka主题发布消息。

  • Broker(代理): Kafka集群中的服务器,负责存储和管理消息。

  • Consumer(消费者): 订阅并处理Kafka主题中的消息。

  • Topic(主题): 消息的类别或标签,生产者将消息发布到主题,而消费者从主题订阅消息。

  • Partition(分区): 主题可以划分为多个分区,每个分区独立存储和处理消息。

2.Kafka可以用来做什么?

  1. 消息队列:

    场景: 在电子商务平台上,订单服务产生订单消息,并将其发布到Kafka主题。支付服务、物流服务等通过订阅相应主题,异步处理订单信息,实现订单处理的解耦和异步通信。

  2. 实时数据流处理:

    场景: 在在线广告平台上,使用Kafka Streams处理实时产生的广告点击数据。可以实时计算点击率、过滤无效点击、将数据与用户信息连接,以实现实时广告效果分析。

  3. 日志收集与分析:

    场景: 在一个大规模的云服务中,使用Kafka收集分布在不同服务器上的应用程序日志。日志分析服务通过消费Kafka主题,实时分析日志以监控系统性能、检测异常和进行故障排除。

  4. 事件溯源(Event Sourcing):

    场景: 在金融领域的交易系统中,使用Kafka追踪交易事件。每笔交易引发一个事件,将其发布到Kafka主题,以便在需要时进行审计、回溯和重新处理。

  5. 数据同步:

    场景: 在企业的分布式系统中,使用Kafka同步用户信息。用户服务在用户数据变更时将事件发布到Kafka主题,其他服务通过消费主题以保持用户数据同步。

  6. 消息广播:

    场景: 在社交媒体应用中,使用Kafka将用户发布的状态更新广播给其关注者。关注者通过订阅用户状态的Kafka主题,实现实时消息广播。

  7. 分布式应用解耦:

    场景: 在电子商务微服务架构中,购物车服务、订单服务、支付服务等通过Kafka进行异步通信。例如,购物车服务可以通过Kafka发布购物车更新的事件,订单服务通过订阅事件来处理相关订单逻辑。

  8. 大数据集成:

    场景: 在一个大数据处理流水线中,使用Kafka将产生的数据传输到Spark进行实时分析。生产者将数据发布到Kafka主题,而Spark应用程序通过订阅主题来接收实时数据。

  9. 实时推荐系统:

    场景: 在在线视频平台上,使用Kafka收集用户观看记录。推荐引擎通过消费Kafka主题,实时更新用户的个性化推荐列表,提高用户体验。

  10. 异步通信:

    场景: 在电商平台中,使用Kafka实现异步订单处理。当订单支付成功时,订单服务通过Kafka发布订单处理完成的消息,而邮件服务通过订阅该主题来异步发送订单确认邮件。

下面就使用SpringBoot整合kafka的发布订阅机制,实现消息的发布和订阅。

3.SpringBoot整合Kafka步骤:

1. 添加依赖:

确保在你的pom.xml文件中包含了Spring Boot和Spring Kafka的依赖。

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Kafka Starter -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
</dependencies>

2. 配置 Kafka:

在application.properties或application.yml中配置 Kafka 连接信息。

spring:
  kafka:
    bootstrap-servers: your-kafka-server:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3. 创建 Kafka 生产者:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageProducer {

    private static final String TOPIC = "admin-messages";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendAdminMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

4. 创建 Kafka 消费者:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageConsumer {

    @KafkaListener(topics = "admin-messages", groupId = "user-group")
    public void receiveAdminMessage(String message) {
        System.out.println("Received message: " + message);
        // ...
    }
}

5. 发布消息:

在管理员需要发布消息的地方调用KafkaMessageProducer的 sendAdminMessage 方法。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/message")
public class AdminController {

    @Autowired
    private KafkaMessageProducer kafkaMessageProducer;

    @GetMapping("/publish")
    public void publishAdminMessage(@RequestParam("messagemessage") String message) {
        kafkaMessageProducer.sendAdminMessage(message);
    }
}

        当调用 publishAdminMessage方法时,所有监听 admin-messages 主题的用户将会接收到相应的消息。

6. 使用Postman进行测试:

控制台输出结果:

这样就使用SpringBoot整合了Kafka并写了一个简单的案例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/270180.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【基础篇】二、字节码文件的组成 Arthas + jclasslib +javap

文章目录 1、jclasslib工具2、基础信息部分3、常量池部分4、方法部分&#xff08;从字节码指令看i&#xff09;5、三种1操作的性能对比6、javap -v命令7、jclasslib插件8、Arthas 1、jclasslib工具 字节码文件中保存的是源代码编译后的内容&#xff0c;以二进制方式存储&#…

智能优化算法应用:基于协作搜索算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于协作搜索算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于协作搜索算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.协作搜索算法4.实验参数设定5.算法结果6.…

人工智能_机器学习074_SVM支持向量机_软间隔与优化目标函数构建_C参数由来_惩罚误差点的惩罚度---人工智能工作笔记0114

然后我们接着上一节再来看一下这里我们说有个 min_faces_per_person = 0 这个可以看到如果我们写上0,就意味着要加载所有的人脸图片,就会花费的时间久对吧 我们可以试试,这里我们 min_faces_per_person = 0 改成0然后 我们等一会加载完了以后,我们用 display(X.shape,faces.sh…

OpenEuler安装内网穿透工具实现ssh连接openEuler系统

文章目录 1. 本地SSH连接测试2. openEuler安装Cpolar3. 配置 SSH公网地址4. 公网远程SSH连接5. 固定连接SSH公网地址6. SSH固定地址连接测试 本文主要介绍在openEuler中安装Cpolar内网穿透工具实现远程也可以ssh 连接openEuler系统使用. 欧拉操作系统(openEuler, 简称“欧拉”…

自媒体实战篇:剪辑软件应用与实操

剪辑软件应用与实操 剪映基础界面认识 素材面板 导入本地素材,剪映自带素材库,音频,文本等素材合集面板播放预览 预览本地素材,,剪映自带素材库以及时间线面板中的素材的实时效果时间线面板 对素材进行基础的编辑操作,调整素材轨道等素材功能面板 可对素材或者文本等精细…

【软件工程】可执行文件和数据分离

一、概述 可执行文件和数据分离是一种软件设计策略&#xff0c;旨在将程序代码和程序使用的数据分离存储。这种方法通常用于提高软件的模块化程度和灵活性&#xff0c;以及方便软件的管理和维护。 在可执行文件和数据分离中&#xff0c;程序代码通常以可执行文件的形式存储&a…

什么是数据分析思维

参考 一文学会如何做电商数据分析&#xff08;附运营分析指标框架&#xff09; 电子商务该如何做数据分析&#xff1f;如何数据分析入门&#xff08;从各项指标表象进入&#xff09; https://www.processon.com/outline/6589838c3129f1550cc69950 数据分析步骤 什么是数据分析…

C# Onnx yolov8 pokemon detection

目录 效果 模型信息 项目 代码 下载 C# Onnx yolov8 pokemon detectio 效果 模型信息 Model Properties ------------------------- date&#xff1a;2023-12-25T17:55:44.583431 author&#xff1a;Ultralytics task&#xff1a;detect license&#xff1a;AGPL-3.0 h…

uniapp创建/运行/发布项目

1、产生背景----跨平台应用框架 在移动端各大App盛行的时代&#xff0c;App之间的竞争也更加激烈&#xff0c;他们执着于让一个应用可以做多个事情 所以就应运而生了小程序&#xff0c;微信小程序、支付宝小程序、抖音小程序等等基于App本身的内嵌类程序。 但是各大App他不可…

Python能做大项目(6)Poetry -- 项目管理的诗和远方之一

[Poetry] 是一个依赖管理和打包工具。Poetry 的作者解释开发 Poetry 的初衷时说&#xff1a; 通过前面的案例&#xff0c;我们已经提出了一些问题。但不止于此。 当您将依赖加入到 requirements.txt 时&#xff0c;没有人帮你确定它是否与既存的依赖能够和平共处&#xff0c;这…

2023读书笔记57|《顾城诗选》——我们走进了夜海, 去打捞遗失的繁星

2023读书笔记57|《顾城诗选》——我们走进了夜海&#xff0c; 去打捞遗失的繁星 细雨&#xff0c;洗去空气中的浮尘&#xff0c; 薄暗里蜜酒散开阵阵醇香。 野蜂在风雨的摇荡中开始安眠&#xff0c; 带着无限甜美的梦想。 河岸边&#xff0c;开满了耀眼的冰花。 沙洲上&#x…

设计模式-生成器模式

设计模式专栏 模式介绍模式特点应用场景生成器模式和工厂模式的区别代码示例Java实现生成器模式Python实现生成器模式 生成器模式在spring中的应用 模式介绍 生成器模式是一种创建型模式&#xff0c;它的主要目的是将一个复杂对象的构建与它的表示分离&#xff0c;使得同样的构…

OCP NVME SSD规范解读-1

OCP&#xff08;Open Compute Project&#xff09;是一个由Facebook于2011年发起的开源项目。其目标是重新设计和优化数据中心的硬件&#xff0c;包括服务器、存储、网络设备等&#xff0c;以提高效率&#xff0c;降低运营成本&#xff0c;并推动技术的创新和标准化。 在OCP中&…

thinkphp6.0的workerman在PHP8.0下报错

一、我先升级了thinkphp6.0到最新版本&#xff1a; composer update topthink/framework二、结果提示我composer版本过低&#xff0c;需要升级到2&#xff0c;于是我又升级了composer composer self-update 三、我又升级了workerman: composer require topthink/think-work…

主机安全技术措施

目录 身份鉴别 进阶 访问控制 进阶 安全审计 进阶 ​编辑 剩余信息保护 入侵防范 进阶 恶意代码防范 资源控制 身份鉴别 进阶 访问控制 进阶 安全审计 进阶 剩余信息保护 入侵防范 进阶 恶意代码防范 资源控制 ~over~

【回溯】符号三角形问题Python实现

文章目录 [toc]问题描述回溯法时间复杂性Python实现 个人主页&#xff1a;丷从心 系列专栏&#xff1a;回溯法 问题描述 下图是由 14 14 14个“ ”和 14 14 14个“ − - −”组成的符号三角形&#xff0c; 2 2 2个同号下面都是” “&#xff0c; 2 2 2个异号下面都是“ −…

如何编写高效清晰的嵌入式C程序

作为嵌入式工程师&#xff0c;怎么写出效率高、思路清晰的C语言程序呢? 要用C语言的思维方式来进行程序的构架构建 要有良好的C语言算法基础&#xff0c;以此来实现程序的逻辑构架 灵活运用C语言的指针操作 虽然看起来以上的说法很抽象&#xff0c;给人如坠雾里的感觉&…

智能优化算法应用:基于厨师算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于厨师算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于厨师算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.厨师算法4.实验参数设定5.算法结果6.参考文献7.MA…

reactive和TypeScript标注数据类型-ts使用方法

一、vite项目中<script setup lang"ts"> : lang"ts" 是表明支持ts校验&#xff08;ts 全称typescript,是es6语法&#xff0c;是javascript的超集强类型编程语言&#xff0c;类似java&#xff0c;定义变量类型后&#xff0c;赋值类型不一致&#xff0…

网站管理员应该知道的:一款免费、简单、强大的 WAF(雷池社区版)

作为网站管理员&#xff0c;一定会关注网站是否安全&#xff0c;是否能够抵御黑客的攻击&#xff0c;是否能够保护数据和用户。可能已经听说过 WAF&#xff08;Web Application Firewall&#xff0c;Web 应用防火墙&#xff09;&#xff0c;一种能够在应用层对 Web 流量进行检测…