Kafka是什么,以及如何使用SpringBoot对接Kafka

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle
架构必备能力——kafka的选型对比及应用场景
Kafka存取原理与实现分析,打破面试难关
防止消息丢失与消息重复——Kafka可靠性分析及优化实践



在这里插入图片描述
继上一次教大家手把手安装kafka后,今天我们直接来到入门实操教程,也就是使用SpringBoot该怎么对接和使用kafka。当然,在一开始我们也会比较细致的介绍一下kafka本身。那么话不多说,马上开始今天的学习吧

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待

一、Kafka与流处理

我们先来看看比较正式的介绍:Kafka是一种流处理平台,由LinkedIn公司创建,现在是Apache下的开源项目。Kafka通过发布/订阅机制实现消息的异步传输和处理。它具有高吞吐量、低延迟、可伸缩性和可靠性等优点,使其成为了流处理和实时数据管道的首选解决方案

介绍其实是比较清晰的,如果你是第一次接触“流处理”概念,我们也可以做一点解释,流处理指的是对连续、实时产生的数据流进行实时处理、计算和分析的过程。

假设你正在玩一款在线游戏,其他玩家的动作和游戏事件会实时地传到服务器上。这些事件就形成了一条数据流。在流处理中,我们会对这条数据流进行实时处理,例如计算每个玩家的分数、监控游戏区域内的异常情况、统计玩家在线时长等等。这样,游戏管理员就可以实时地监控和管理游戏,而不需要等到游戏结束才进行操作。
类似的,流处理还可以应用在其他实时性要求比较高的场景中,例如金融交易、物联网、实时监测等。通过对数据流进行实时处理,我们可以更加精准地掌握数据变化的情况,并及时做出反应和调整,

二、Spring Boot与Kafka的整合Demo

1. 新建springboot工程

如果你没有现成的Spring boot项目,那么我们可以使用IDEA自带的Spring Initializr 来创建一个spring-boot的项目

在这里插入图片描述

此时我们可以直接选择使用Apache Kafka,另外项目还可以加个Spring Web准备让前台调用

在这里插入图片描述

2. 添加Kafka依赖

如果你不是像上述一样新建的项目,那你也可以选择在已有的Spring Boot应用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依赖:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

这里我们指定了Kafka服务器的地址和端口,并配置了消费者组的ID,关于消费者组的概念,其实就是某一些消费者具备相同的功能,因此会把他们设为同一个消费者组,这样他们就不会重复消费同一条消息了。更具体地原理,我们会在之后地篇章中介绍。

4. 创建Kafka生产者

在Kafka中,生产者是发送消息的应用程序或服务。在Spring Boot中,我们可以使用KafkaTemplate类来创建Kafka生产者

package com.zhanfu.kafkademo.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("test_topic", message);
    }
}

这里我们使用@Autowired注解来自动注入KafkaTemplate,并使用send方法将消息发送到名为“test_topic”的Kafka主题中。


5. 创建Kafka消费者

在Kafka中,消费者是接收并处理订阅主题消息的应用程序或服务。在Spring Boot中,我们可以使用@KafkaListener注解来创建Kafka消费者。

package com.zhanfu.kafkademo.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaLis {

    @KafkaListener(topics = "test_topic", groupId = "test_group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

6. 应用程序入口

现在我们已经完成了Spring Boot和Kafka的整合。我们可以启动Spring Boot应用程序,然后发送消息并消费它,以测试我们的应用程序是否正确地与Kafka集成。

package com.zhanfu.kafkademo.controller;

import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Autowired
    private KafkaService kafkaService;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        kafkaService.sendMessage(message);
        return "Message sent successfully";
    }
}

在这个例子中,我们使用@Autowired注解来自动注入KafkaProducer,并通过发送消息的方法来调用sendMessage方法。最终项目整体框架如图:

在这里插入图片描述

三、启动与验证

首先自然是启动 Kafka ,怎么启动可参考 《上手第一关,手把手教你安装kafka与可视化工具kafka-eagle》,然后是启动我们的Spring Boot项目

在这里插入图片描述

然后在浏览器中输入

http://127.0.0.1:8080/send/hello

在这里插入图片描述

最后检查我们的项目日志:

在这里插入图片描述

可以看到,整个发送和接收的流程都走通了

四、KafkaTemplate 介绍

不难看出,在Springboot中,使用kafka的关键在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生产者模版,用于向 Kafka 集群发送消息。并且把 Kafka 的生产者客户端封装成了一个 Spring Bean,提供更加方便易用的 API。

它有三个主要属性:

  • producerFactory:生产者工厂类,用于创建 KafkaProducer 实例。
  • defaultTopic:默认主题名称,如果在发送消息时没有指定主题名称,则使用该默认主题。
  • messageConverter:消息转换器,用于将消息对象转换为 Kafka ProducerRecord

它的主要方法:

  • send(ProducerRecord<K,V> record):向指定的 Kafka 主题发送一条消息。ProducerRecord 包含了主题名称、分区编号、Key 和 Value 等信息。
  • send(String topic, V data):向指定的 Kafka 主题发送一条消息。
  • send(String topic, K key, V data):向指定的 Kafka 主题发送一条消息,并指定消息的 Key。
  • execute(ProducerCallback<K,V> callback):使用回调方式发送消息,可以自定义消息的创建过程和错误处理过程。
  • inTransaction():启用事务,多个 send 方法调用将被包装在一个事务中,保证 Kafka 事务的原子性。

除了上述方法外,KafkaTemplate 还提供了其他方法,如 sendDefault()sendOffsetsToTransaction() 等,可以根据实际需要进行选择和使用。

需要注意的是,在使用 KafkaTemplate 发送消息时应该注意消息的序列化方式、主题和分区的选择以及错误处理等问题,以保证消息的可靠性和正确性。

当然,很多同学可能还注意到一个细节,我们在上面的Demo中,我们直接将其 @Autowired进我们的代码中,这是怎么做到的呢?换句话说,这个 KafkaTemplate 为什么自己就会被spring 容器管理的呢?其实这得益于SpringBoot中对Kafka有了很多自动配置的内容。如下:

在这里插入图片描述
在这里插入图片描述

如上图,相信对Spring Boot熟悉的同学看到 ConditionalOnClassConditionalOnMissingBean 应该就明白了。其实Spring Boot 早就贴心的为我们预留了这些自动配置,只要我们引入了 spring-kafka 包,使得项目中出现了 KafkaTemplate 类,那么它就能被自动配置并存入Spring 容器内

总结

今天我们通过一个Demo讲解了在SpringBoot中如何对接Kafka,也介绍了下关键类 KafkaTemplate ,得益于Spring Boot 的自动配置,开发者要做的配置内容其实并不多,使用也主要是依赖其提供的API,相对简单,相信大家很容易也都学会了,那么在后面的过程中,我们将继续学习其使用,并且会着重讲解 Kafka 的原理与结构

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/532160.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Android输入框架

输入是一个操作系统的重要组成部分&#xff0c;没有输入&#xff0c;用户就无法向系统发送指令&#xff0c;也就没法完成人机交互。在Android系统中&#xff0c;输入系统是不可缺少的&#xff0c;下面简单介绍输入系统的整体框架&#xff0c;以下内容参考清华出版社出版的《And…

DSP笔记6-C2000的中断机制

中断Interrupt&#xff1a; 单核CPU顺序执行程序 中断源&#xff0c;引起计算机中断的时间&#xff0c;解放cpu&#xff0c;提高效率。 三个等级&#xff1a;CPU中断&#xff0c;PIE中断&#xff0c;外设中断 cpu定时器&#xff0c;EPWM&#xff0c;ADC&#xff0c;eCAP&…

计算机网友将饭卡余额改成100多万

你在学校干过最疯狂的事是什么&#xff1f; 一位学计算机的网友说&#xff0c;他改造过的水卡和饭卡都能无限使用&#xff0c;两年后在食堂刷卡&#xff0c;被食堂阿姨发现余额竟然还剩一百多万&#xff0c;虽然没有赔钱&#xff0c;但是被学校教务处处分了&#xff0c;怎么说…

03-JAVA设计模式-装饰模式

装饰模式 什么装饰模式 装饰器模式&#xff08;Decorator Pattern&#xff09;也叫包装器模式&#xff0c;是一种结构型设计模式&#xff0c;允许用户在不改变对象的情况下&#xff0c;动态地给对象增加一些额外的职责&#xff08;功能&#xff09;。装饰器模式相比生成子类更…

OSCP靶场--Hetemit

OSCP靶场–Hetemit 考点(python代码注入 systemctrl提权) 1.nmap扫描 ## ┌──(root㉿kali)-[~/Desktop] └─# nmap 192.168.173.117 -sV -sC -Pn --min-rate 2500 -p- Starting Nmap 7.92 ( https://nmap.org ) at 2024-04-10 05:52 EDT Nmap scan report for 192.168.1…

预训练的启蒙:浅谈BERT、RoBERTa、ALBERT、T5

文章目录 Transformer揭开预训练序幕为什么RNN/LSTM需要从头训练&#xff1f; BERT核心特点预训练任务架构应用和影响 RoBERTa改进点BERT和RoBERTa的MASK策略对比BERT的静态MASK策略RoBERTa的动态MASK策略效果 总结 ALBERT改进点参数共享因式分解嵌入参数和LoRa对比 总结 T5核心…

Chrome谷歌下载入口

​hello&#xff0c;我是小索奇 发现好多人说谷歌浏览器在哪里下载呀&#xff0c;哪里可以找到&#xff1f; 你可能会心想&#xff0c;一个浏览器你还不会下载啊&#xff1f; 还真是&#xff0c;有很多伙伴找不到下载入口&#xff0c;为什么呢&#xff1f; Bing进行搜索&am…

微信小程序转盘抽奖

场景&#xff1a; 在微信小程序里面开展抽奖活动使用转盘抽奖&#xff1b;类似下图&#xff08;图片来自百度&#xff09; 方法&#xff1a; 使用lukcy-canvas组件 在 微信小程序 中使用 | 基于 Js / TS / Vue / React / 微信小程序 / uni-app / Taro 的【大转盘 & 九宫…

unipush+个推实现消息推送

1.注册个推平台的帐号个推&#xff0c;专业的数据智能服务商-为垂直领域提供数据智能解决方案 2.应用列表中选择新增应用/服务 3.填写下应用信息4.创建好应用后在manifest.json中的sdkConfigs配置上写入appid、appkey、appsecret "sdkConfigs" : {"ad" :…

hive 数据库表常用操作及相关函数讲解

创建数据库并指定hdfs存储位置 create database myhive2 location ‘/myhive2’; 使用location关键字&#xff0c;可以指定数据库在HDFS的存储路径。 Hive的库在HDFS上就是一个以.db结尾的目录 默认存储在&#xff1a; /user/hive/warehouse内 当你为Hive表指定一个LOCATION时…

二分查找详解

以力扣2529为例&#xff0c;题目要求找到正整数的个数和负整数的个数。 一次遍历数组的方法的时间复杂度为O&#xff08;n&#xff09;&#xff0c;而二分查找的时间复杂度为O&#xff08;logn&#xff09;。 使用二分查找思路&#xff1a;所给nums数组升序排列&#xff0c;找…

基于用户的协同过滤算法实现商品推荐

文章目录 简介基于协同过滤算法&#xff08;UserCF&#xff09;原理&#xff08;我的理解&#xff09;皮尔逊相关系数计算 总结 简介 最近在做关于健康商城的项目&#xff0c;在首页需要向用户展示食品推荐&#xff0c;要求采用协同过滤的方式展示推荐的食品&#xff0c;第一次…

【Python】FANUC机器人OPC UA通信并记录数据

目录 引言机器人仿真环境准备代码实现1. 导入库2. 设置参数3. 日志配置4. OPC UA通信5. 备份旧CSV文件6. 主函数 总结 引言 OPC UA&#xff08;Open Platform Communications Unified Architecture&#xff09;是一种跨平台的、开放的数据交换标准&#xff0c;常用于工业自动化…

Vue - 2( 10000 字 Vue 入门级教程)

一&#xff1a;初识 Vue 1.1 绑定样式 1.1.1 绑定 class 样式 <!DOCTYPE html> <html><head><meta charset"UTF-8" /><title>绑定样式</title><style>......</style><script type"text/javascript"…

AOF文件重写

1.2.3.AOF文件重写 因为是记录命令&#xff0c;AOF文件会比RDB文件大的多。而且AOF会记录对同一个key的多次写操作&#xff0c;但只有最后一次写操作才有意义。通过执行bgrewriteaof命令&#xff0c;可以让AOF文件执行重写功能&#xff0c;用最少的命令达到相同效果。 如图&am…

互联网产品经理必备知识详解

1. 前言 本文档全面探讨了产品经理在产品管理过程中的关键环节,包括市场调研、产品定义及设计、项目管理、产品宣介、产品市场以及产品生命周期。通过深入剖析这些方面,本文旨在帮助产品经理系统地理解和掌握产品管理的核心要素,从而提升产品开发的效率和成功率。在市场调研…

分布式锁-redission

5、分布式锁-redission 5.1 分布式锁-redission功能介绍 基于setnx实现的分布式锁存在下面的问题&#xff1a; 重入问题&#xff1a;重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中&#xff0c;可重入锁的意义在于防止死锁&#xff0c;比如HashTable这样的代码…

多维 HighCharts

1&#xff1a;showHighChart.html <!DOCTYPE html> <html lang"zh-CN"><head><meta charset"UTF-8"><!-- js脚本都是官方的,后两个是highchart脚本 --><script type"text/javascript" src"jquery1.7.1.mi…

Unity 九宫格

1. 把图片拖拽进资源文件夹 2.选中图片&#xff0c;然后设置图片 3.设置九宫格 4.使用图片&#xff0c;在界面上创建2个相同的Image,然后使用图片&#xff0c;修改Image Type 为Sliced

书生·浦语大模型第二期实战营第二课笔记和基础作业

来源&#xff1a; 作业要求:Homework - Demo 文档教程:轻松玩转书生浦语大模型趣味 Demo B站教程:轻松玩转书生浦语大模型趣味 Demo 1. 笔记 2.基础作业 2.1 作业要求 2.2 算力平台 2.3 新建demo目录&#xff0c;以及新建目录下的文件&#xff0c;下载模型参数 2.4 Intern…