使用SpringBoot对接Kafka

Kafka是什么,以及如何使用SpringBoot对接Kafka

一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流进行实时处理,例如计算每个玩家的分数、监控游戏区域内的异常情况、统计玩家在线时长等等。这样,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才进行操作。
类似的,流处理还可以应用在其他实时性要求比较高的场景中,例如金融交易、物联网、实时监测等。通过对数据流进行实时处理,我们可以更加精准地掌握数据变化的情况,并及时做出反应和调整,

二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以使用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者

package com.zhanfu.kafkademo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("test_topic", message);
    }
}

这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。

5. 创建Kafka消费者

在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。

package com.zhanfu.kafkademo.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaLis {

    @KafkaListener(topics = "test_topic", groupId = "test_group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

6. 应用程序入口

现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。

package com.zhanfu.kafkademo.controller;

import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private KafkaService kafkaService;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        kafkaService.sendMessage(message);
        return "Message sent successfully";
    }
}

在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:

三、启动与验证

首先自然是启动 Kafka ,然后是启动我们的Spring Boot项目

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

最后检查我们的项目日志:

可以看到,整个发送和接收的流程都走通了

四、KafkaTemplate 介绍

不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。

它有三个主要属性:

        producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
        defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。
        messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord

它的主要方法:

        send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包含了主题名称、分区编号、Key 和 Value 等信息。
        send(String topic, V data):向指定的 Kafka 主题发送一条消息。
        send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
        execute(ProducerCallback<K,V> callback):使用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
        inTransaction():启用事务,多个 send 方法调用将被包装在一个事务中,保证 Kafka 事务的原子性。

除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:

如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClass ConditionalOnMissingBean 应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内

总结
        今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/713365.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

VMware Workstation安装及使用详细教程

如何安装VMware Workstation的详细教程 一、准备工作 1. 下载VMware Workstation&#xff1a; 访问VMware官方网站&#xff0c;找到VMware Workstation的下载页面。根据您的操作系统&#xff08;Windows或macOS&#xff09;选择相应的版本进行下载。确保您的计算机满足VMwar…

牛客小白月赛96 解题报告 | 珂学家

前言 题解 A. 最少胜利题数 签到 n1 len(set(input())) n2 len(set(input()))if n1 < n2:n1, n2 n2, n1print (-1 if n1 6 else n1 - n2 1)B. 最少操作次数 思路: 分类讨论 只有-1,0,1,2这四种结果 特判 01, 10 n int(input()) s input()# 枚举 from collectio…

vue之一键部署的shell脚本和它的点.bat文件、海螺AI、ChatGPT

MENU 前言vite.config.ts的配置deploy文件夹的其他内容remote.shpwd.txtdeploy.bat 前言 1、在src同级新建deploy.bat文件&#xff1b; 2、在src同级新建deploy文件夹&#xff0c;文件夹中新建pwd.txt和remote.sh文件&#xff1b; 3、配置好后&#xff0c;直接双击deploy.bat文…

Java_FileIO流

存储数据的方案 有些数据想长久保存起来&#xff0c;咋整&#xff1f; 文件时非常重要的存储方式&#xff0c;在计算机硬盘中。 即便断电&#xff0c;或者程序终止了&#xff0c;存储在硬盘文件中的数据也不会丢失。 File File 是Java.io.包下的类&#xff0c;File类对象&…

C++ string字符串的使用和简单模拟实现

目录 前言 1. string简介 2. string的使用和简单模拟实现 2.1 string类的定义 2.2 string(),~string()和c_str() 2.2 size&#xff0c;重载符号[ ]&#xff0c;begin和end函数 2.3 push_back&#xff0c;reserve&#xff0c;append&#xff0c;运算符重载 2.4 insert和…

DDPM公式推导(三)

2 Background 扩散模型【53】是一种以 p θ ( x 0 ) : ∫ p θ ( x 0 : T ) d x 1 : T p_\theta\left(\mathbf{x}_0\right):\int p_\theta\left(\mathbf{x}_{0: T}\right) d \mathbf{x}_{1: T} pθ​(x0​):∫pθ​(x0:T​)dx1:T​ 形式的潜在变量模型&#xff0c;其中 x 1…

机器真的能思考、学习和智能地行动吗?

In this post, were going to define what machine learning is and how computers think and learn. Were also going to look at some history relevant to the development of the intelligent machine. 在这篇文章中&#xff0c;我们将定义机器学习是什么&#xff0c;以及…

BerkeleyDB练习

代码; #include <db.h> #include <stdio.h>int main() {DB *dbp;db_create(&dbp, NULL, 0);printf("Berkeley DB version: %s\n", db_version(NULL, NULL, NULL));dbp->close(dbp, 0);return 0; } 编译运行

Android studio在Ubuntu桌面上 创建桌面图标,以及导航栏图标

Android studio在Ubuntu桌面上 创建桌面图标&#xff0c;以及导航栏图标 1. 下载Android studio for Lunux 免安装版本之后&#xff0c;解压 2. 通过控制台运行 ~/Documents/android-studio-2024.1.1.2-linux/android-studio/bin$ ./studio.sh 3. 选择菜单&#xff0c;Tools…

1586. 扫地机器人

问题描述 Mike同学在为扫地机器人设计一个在矩形区域中行走的算法,Mike是这样设计的:先把机器人放在出发点 (1,1)(1,1) 点上,机器人在每个点上都会沿用如下的规则来判断下一个该去的点是哪里。规则:优先向右,如果向右不能走(比如:右侧出了矩形或者右侧扫过了)则尝试向…

基于51单片机的烟雾报警器设计-ADC0809

一.硬件方案 火灾报警器采用51单片机为核心控制器&#xff0c;利用气体传感器MQ-2、ADC0809模数转换器、DS18B20温度传感器等实现基本功能。通过这些传感器和芯片&#xff0c;当环境中可燃气体浓度或温度等发生变化时系统会发出相应的灯光报警信号和声音报警信号&#xff0c;以…

28.启动与暂停程序

上一个内容&#xff1a;27.设计注入功能界面 以它 27.设计注入功能界面 的代码为基础进行修改 点击添加游戏按钮之后就把游戏启动了 CWndINJ.cpp文件中修改&#xff1a; void CWndINJ::OnBnClickedButton1() {// TODO: 在此添加控件通知处理程序代码/*ExeLst.InsertItem(0, L…

Vue I18n国际化插件

Vue I18n国际化插件 安装目录结构及文件内容./locales/lang/zh.js./locales/lang/en.js./locales/index.js main.js引入页面具体使用及语言切换&#xff08;Vue3&#xff09;刷新保存原语言&#xff0c;App.vue添加路由守卫注意点 中文文档&#xff1a; https://kazupon.githu…

69. UE5 RPG 使用Gameplay Cue 实现技能表现效果

在上一章中&#xff0c;我们实现了敌人的攻击技能的特效和音效。如果我们在多人模式下打开&#xff0c;发现&#xff0c;其它客户端看不到对应的效果。 造成这种问题的原因是因为敌人的技能是运行在服务器端的&#xff0c;它只复制到拥有它的客户端&#xff0c;而敌人的效果对于…

英伟达与斯坦福携手,打造未来全息XR眼镜:头带时代的终结

在XR(扩展现实)技术的演进过程中,一个显著的挑战在于如何平衡设备的便携性与视觉体验。传统的XR设备由于需要厚重的头带固定光学器件和显示器,不仅增加了体积,还为用户带来了社交上的不便。然而,随着英伟达与斯坦福大学戈登韦茨斯坦教授领导的研究团队的合作,这一难题似…

meilisearch的分页

Elasticsearch 做为老牌搜索引擎&#xff0c;功能基本满足&#xff0c;但复杂&#xff0c;重量级&#xff0c;适合大数据量。 MeiliSearch 设计目标针对数据在 500GB 左右的搜索需求&#xff0c;极快&#xff0c;单文件&#xff0c;超轻量。 所以&#xff0c;对于中小型项目来说…

探地雷达正演模拟,基于时域有限差分方法,四

突然发现第三章后半部分已经讲了使用接收记录成像的问题&#xff0c;所以这一章只讲解简单的数据分析。 &#xff08;均以宽角法数据为例子&#xff0c;剖面法数据处理方式都是相同的&#xff09;假设&#xff0c;我们现在已经获得了一个GPR记录&#xff0c;可以是常用的.sgy格…

DAY03 HTML

文章目录 一 表格1. 表格的语法2. 表格的可选标记3. 不规则的单元格&#xff08;合并单元格&#xff09;4. 表格的属性5. 表格的大小 二 列表1. 有序列表2. 无序列表3. 属性4. 列表的嵌套5. 定义列表【了解】 三 表单(重点)1. 表单的语法2. 表单的控件分类3. input元素4. selec…

为什么说Python 是胶水语言?

​ "Python 是胶水语言"这一说法是指它很擅长将不同的程序或代码库连接在一起&#xff0c;能够让来自不同编程语言或框架的组件无缝协作。Python 具有丰富的库和简单的语法&#xff0c;使得它可以轻松调用其他语言编写的程序或使用不同技术栈的模块。 ​ 以下是几个…

如何区分人工智能生成的图像与真实照片(下)

4 功能上的不合理性 AI 生成的图像往往会因为缺乏对现实世界物体结构和相互作用的了解&#xff0c;而产生各种功能不合理之处。这些不合理之处主要表现在以下几个方面&#xff1a; 4.1 构图不合理 物体关系不合逻辑: AI 生成的图像中&#xff0c;物体和人物之间的关系可能不符…