spring boot 使用 Kafka

一、Kafka作为消息队列的好处

  1. 高吞吐量:Kafka能够处理大规模的数据流,并支持高吞吐量的消息传输。

  2. 持久性:Kafka将消息持久化到磁盘上,保证了消息不会因为系统故障而丢失。

  3. 分布式:Kafka是一个分布式系统,可以在多个节点上运行,具有良好的可扩展性和容错性。

  4. 支持多种协议:Kafka支持多种协议,如TCP、HTTP、UDP等,可以与不同的系统进行集成。

  5. 灵活的消费模式:Kafka支持多种消费模式,如拉取和推送,可以根据需要选择合适的消费模式。

  6. 可配置性强:Kafka的配置参数非常丰富,可以根据需要进行灵活配置。

  7. 社区支持:Kafka作为Apache旗下的开源项目,拥有庞大的用户基础和活跃的社区支持,方便用户得到及时的技术支持。

二、springboot中使用Kafka

  1. 添加依赖:在pom.xml文件中添加Kafka的依赖,包括spring-kafka和kafka-clients。确保版本与你的项目兼容。

  2. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

  3. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

  4. 发送消息:在需要发送消息的地方,注入Kafka生产者,并使用其发送消息到指定的Kafka主题。

  5. 创建消费者:创建一个Kafka消费者类,实现Consumer接口,并使用KafkaTemplate订阅指定的Kafka主题。

  6. 配置消费者:在Spring Boot的配置文件中配置Kafka消费者的相关参数,例如group id、auto offset reset等。

  7. 接收消息:在需要接收消息的地方,注入Kafka消费者,并使用其接收消息。

  8. 处理消息:对接收到的消息进行处理,例如保存到数据库或进行其他业务逻辑处理。

三、使用Kafka

pom中填了依赖

<dependency>  
    <groupId>org.springframework.kafka</groupId>  
    <artifactId>spring-kafka</artifactId>  
    <version>2.8.1</version>  
</dependency>  
<dependency>  
    <groupId>org.apache.kafka</groupId>  
    <artifactId>kafka-clients</artifactId>  
    <version>2.8.1</version>  
</dependency>
  1. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

import org.apache.kafka.clients.producer.*;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  
  
@Component  
public class KafkaProducer {  
    @Value("${kafka.bootstrap}")  
    private String bootstrapServers;  
  
    @Value("${kafka.topic}")  
    private String topic;  
  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {  
        this.kafkaTemplate = kafkaTemplate;  
    }  
  
    public void sendMessage(String message) {  
        Producer<String, String> producer = new KafkaProducer<>(bootstrapServers, new StringSerializer(), new StringSerializer());  
        try {  
            producer.send(new ProducerRecord<>(topic, message));  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            producer.close();  
        }  
    }  
}
  1. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.kafka.core.DefaultKafkaProducerFactory;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.kafka.core.ProducerFactory;  
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;  
import org.springframework.kafka.core.ConsumerFactory;  
import org.springframework.kafka.core.ConsumerConfig;  
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;  
import org.springframework.kafka.listener.MessageListener;  
import org.springframework.context.annotation.PropertySource;  
import java.util.*;  
import org.springframework.beans.factory.*;  
import org.springframework.*;  
import org.springframework.*;expression.*;value; 																																		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	  @Value("${kafka}")   Properties kafkaProps = new Properties(); @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf){ KafkaTemplate<String, String> template = new KafkaTemplate<>(pf); template .setMessageConverter(new StringJsonMessageConverter()); template .setSendTimeout(Duration .ofSeconds(30)); return template ; } @Bean public ProducerFactory<String, String> producerFactory(){ DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaProps); factory .setBootstrapServers(bootstrapServers); factory .setKeySerializer(new StringSerializer()); factory .setValueSerializer(new StringSerializer()); return factory ; } @Bean public ConsumerFactory<String, String> consumerFactory(){ DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(consumerConfigProps); factory .setBootstrapServers(bootstrapServers); factory .setKeyDeserializer(new StringDeserializer()); factory .setValueDeserializer(new StringDeserializer()); return factory ; } @Bean public ConcurrentMessageListenerContainer<String, String> container(ConsumerFactory<String, String> consumerFactory, MessageListener listener){ ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory); container .setMessageListener(listener); container .setConcurrency(3); return container ; } @Bean public MessageListener

消费者

import org.apache.kafka.clients.consumer.*;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  
  
@Component  
public class KafkaConsumer {  
    @Value("${kafka.bootstrap}")  
    private String bootstrapServers;  
  
    @Value("${kafka.group}")  
    private String groupId;  
  
    @Value("${kafka.topic}")  
    private String topic;  
  
    private KafkaTemplate<String, String> kafkaTemplate;  
  
    public KafkaConsumer(KafkaTemplate<String, String> kafkaTemplate) {  
        this.kafkaTemplate = kafkaTemplate;  
    }  
  
    public void consume() {  
        Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs());  
        consumer.subscribe(Collections.singletonList(topic));  
        while (true) {  
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  
            for (ConsumerRecord<String, String> record : records) {  
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  
            }  
        }  
    }  
  
    private Properties consumerConfigs() {  
        Properties props = new Properties();  
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);  
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  
        return props;  
    }  
}

四、kafka与rocketMQ比较

Kafka和RocketMQ都是开源的消息队列系统,它们具有许多相似之处,但在一些关键方面也存在差异。以下是它们在数据可靠性、性能、消息传递方式等方面的比较:

  1. 数据可靠性:
  • Kafka使用异步刷盘方式,而RocketMQ支持异步实时刷盘、同步刷盘、同步复制和异步复制。这使得RocketMQ在单机可靠性上比Kafka更高,因为它不会因为操作系统崩溃而导致数据丢失。此外,RocketMQ新增的同步刷盘机制也进一步保证了数据的可靠性。
  1. 性能:
  • Kafka和RocketMQ在性能方面各有千秋。由于Kafka的数据以partition为单位,一个Kafka实例上可能有多达上百个partition,而一个RocketMQ实例上只有一个partition。这使得RocketMQ可以充分利用IO组的commit机制,批量传输数据,从而在replication时具有更好的性能。然而,Kafka的异步replication性能理论上低于RocketMQ的replication,因为同步replication与异步replication相比,性能上会有约20%-30%的损耗。
  1. 消息传递方式:
  • Kafka和RocketMQ在消息传递方式上也有所不同。Kafka采用Producer发送消息后,broker马上把消息投递给consumer,这种方式实时性较高,但会增加broker的负载。而RocketMQ基于Pull模式和Push模式的长轮询机制,来平衡Push和Pull模式各自的优缺点。RocketMQ的消息及时性较好,严格的消息顺序得到了保证。
  1. 其他特性:
  • Kafka在单机支持的队列数超过64个队列,而RocketMQ最高支持5万个队列。队列越多,可以支持的业务就越多。

五、kafka使用场景

  1. 实时数据流处理:Kafka可以处理大量的实时数据流,这些数据流可以来自不同的源,如用户行为、传感器数据、日志文件等。通过Kafka,可以将这些数据流进行实时的处理和分析,例如进行实时数据分析和告警。
  2. 消息队列:Kafka可以作为一个消息队列使用,用于在分布式系统中传递消息。它能够处理高吞吐量的消息,并保证消息的有序性和可靠性。
  3. 事件驱动架构:Kafka可以作为事件驱动架构的核心组件,将事件数据发布到不同的消费者,以便进行实时处理。这种架构可以简化应用程序的设计和开发,提高系统的可扩展性和灵活性。
  4. 数据管道:Kafka可以用于数据管道,将数据从一个系统传输到另一个系统。例如,可以将数据从数据库或日志文件传输到大数据平台或数据仓库。
  5. 业务事件通知:Kafka可以用于通知业务事件,例如订单状态变化、库存更新等。通过订阅Kafka主题,相关的应用程序和服务可以实时地接收到这些事件通知,并进行相应的处理。
  6. 流数据处理框架集成:Kafka可以与流处理框架集成,如Apache Flink、Apache Spark等。通过集成,可以将流数据从Kafka中实时导入到流处理框架中进行处理,实现流式计算和实时分析。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/367168.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

海康威视有插件、无插件播放;webrtc直播;西瓜视频播放器;mpegts.js直播;flvjs直播

Notes 视频播放的几种方式 一、Video mp4链接直接播放 二、海康威视3.3插件版直播、云台控制&#xff0c;资源下载地址 index.html引入hk文件中的js文件双击HCWebSDKPlugin.exe安装插件前端参照文件夹hkCamera中的示例代码 三、海康威视3.2无插件版直播&#xff0c;资源下…

go_view同后端集成时的注意事项

goview是一个不错的可视化大屏配置工具;提供了丰富的功能可供调用。 官方地址和文档: https://gitee.com/dromara/go-view https://www.mtruning.club/guide/start/ 同nodejs集成可参考;https://gitee.com/qwdingyu/led (建议–后端集成有api功能,可直接配置sql)同dotne…

Leetcode—203. 移除链表元素【简单】

2024每日刷题&#xff08;一零九&#xff09; Leetcode—203. 移除链表元素 实现代码 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(n…

Python爬虫的基本原理

我们可以把互联网比作一张大网&#xff0c;而爬虫&#xff08;即网络爬虫&#xff09;便是在网上爬行的蜘蛛。把网的节点比作一个个网页&#xff0c;爬虫爬到这就相当于访问了该页面&#xff0c;获取了其信息。可以把节点间的连线比作网页与网页之间的链接关系&#xff0c;这样…

ref和reactive

看尤雨溪说&#xff1a;为什么Vue3 中应该使用 Ref 而不是 Reactive&#xff1f;

华为1.24秋招笔试题

华为1.24秋招笔试题 1.题目1 题目详情 - 2024.1.24-华为秋招笔试-第一题-计算积分 - CodeFun2000 1.1题解 import java.util.Scanner;class Main{public static void main(String[] args){Scanner scnew Scanner(System.in);String ssc.next();char[] chs.toCharArray();in…

记一次 Android CPU高使用率排查

文章目录 背景排查高占用的进程adb shelltoptop -b -H -n 1 | grep 29337 (打印各线程 cpu使用详情)kill -3 29337 (生成trace文件)adb pull /data/anr /Users/gerry.liang/Desktop定位问题 补充说明: 背景 测试同学反馈我们的App CPU使用率 90% 居高不下,经过一番艰难的排查后…

如何在PS5上使用金手指修改游戏

环境&#xff1a;windows PS5 问题&#xff1a;PS5 没有GodHen&#xff0c;无法使用json金手指&#xff0c;PKG金手指比较少 解决办法&#xff1a;使用MultiTrainerv从网络注入PS5&#xff0c;修改进程内存 背景&#xff1a;为了护肝&#xff0c;拒绝刷刷刷 解决过程&#xff…

网络异常案例四_IP异常

问题现象 终端设备离线&#xff0c;现场根据设备ip&#xff0c;ping不通。查看路由器。 同一个路由器显示的终端设备&#xff08;走同一个wifi模块接入&#xff09;&#xff0c;包含不同网段的ip。 现场是基于三层的无线漫游&#xff0c;多个路由器wifi配置了相同的ssid信息&a…

VUE项目导出excel

导出excel主要可分为以下两种&#xff1a; 1. 后端主导实现 流程&#xff1a;前端调用到导出excel接口 -> 后端返回excel文件流 -> 浏览器会识别并自动下载 场景&#xff1a;大部分场景都有后端来做 2. 前端主导实现 流程&#xff1a;前端获取要导出的数据 -> 把常规数…

部署实战--修改jar中的文件并重新打包成jar文件

一.jar文件 JAR 文件就是 Java Archive &#xff08; Java 档案文件&#xff09;&#xff0c;它是 Java 的一种文档格式JAR 文件与 ZIP 文件唯一的区别就是在 JAR 文件的内容中&#xff0c;多出了一个META-INF/MANIFEST.MF 文件META-INF/MANIFEST.MF 文件在生成 JAR 文件的时候…

Redis -- list列表

只有克服了情感的波动&#xff0c;才能专心致志地追求事业的成功 目录 列表 list命令 lpush lpushx rpush rpushx lrange lpop rpop lindex linsert llen lrem ltrim 阻塞命令 小结 列表 列表相当于 数组或者顺序表。 列表类型是用来存储多个有序的字符串&…

Javascript | JS如何断点测试(WebStorm)

JavaScript的断点与之前所学到的Java和python在jetbrain系列编辑器中的断点debug不太一样&#xff0c;往常我们在编写python的时候用pycharm的时候是直接断点进入debug的&#xff0c;就像下面这样 只要直接在代码中断点&#xff0c;然后运行debug功能即可 但是在WebStorm中不是…

delete、truncate和drop区别

一、从执行速度上来说 drop > truncate >> DELETE 二、从原理上讲 1、DELETE DELETE from TABLE_NAME where xxx1.1、DELETE属于数据库DML操作语言&#xff0c;只删除数据不删除表的结构&#xff0c;会走事务&#xff0c;执行时会触发trigger&#xff08; 触发器…

jsp粉丝社区系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 JSP 粉丝社区系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql5.0&…

BEV感知(2)--转换模块

目录 一、2D到3D转换模块 1、LSS 2、Pseudo LiDAR 二、3D到2D转换模块 1、Explicit mapping 2、Implicit mapping 三、transformer相关 1、VIT 2、Swin Transformer 一、2D到3D转换模块 核心目的&#xff1a;由于将2D空间转换到BEV&#xff0c;所以我们要引入一个媒…

导出pdf 加密、加水印、加页脚

1.依赖 <dependency> <groupId>com.itextpdf</groupId> <artifactId>itextpdf</artifactId> <version>5.5.10</version> </dependency> <dependency> …

2024美赛ABCDEF题成品参考论文+配套数据代码+参考文献

社区抗灾能力综合评估与决策模型研究&#xff08;其余题目都在文末&#xff09; 摘要&#xff1a;社区抗灾能力的提升对于灾害风险管理至关重要。本研究基于机器学 习方法&#xff0c;构建了社区抗灾能力预测模型&#xff0c;以评估社区在灾害事件中的表现。首先&#xff0c; 我…

20240202在WIN10下使用whisper.cpp

20240202在WIN10下使用whisper.cpp 2024/2/2 14:15 【结论&#xff1a;在Windows10下&#xff0c;确认large模式识别7分钟中文视频&#xff0c;需要83.7284 seconds&#xff0c;需要大概1.5分钟&#xff01;效率太差&#xff01;】 83.7284/4200.1993533333333333333333333333…

断路精灵:探秘Sentinel熔断策略的神奇效果

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 断路精灵&#xff1a;探秘Sentinel熔断策略的神奇效果 前言熔断策略基础&#xff1a;数字断路精灵的初见熔断策略的基本原理&#xff1a;简单示例演示熔断策略的基本用法&#xff1a; 慢调用比例熔断策…