消息中间件 --Kafka

一、 Kafka

1.kafka介绍

        Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。

生产者发送消息,多个消费者只能有一个消费者接收到消息

生产者发送消息,多个消费者都可以接收到消息 

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个 代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些 已发布的消息。 
2.kafka 下载(windows下)

ksfka下载链接如下,点击链接进入官网即可下载

kafka官网:http://kafka.apach e.org/

温馨提示:JDK版本至少需要1.8,高版本也可兼容;

第一步:将kafka文件夹解压缩,

注意:一定一定要解压到自己磁盘的根目录下,不然会出现闪退情况!!!

第二步:创建一个 data 空文件夹

后续需要用来存放日志文件,只要创建完成就可以了,kafka启动后会自动生成日志文件;

第三步: 修改 zookeeper.properties 配置文件

我们点击进入config文件夹,找到 zookeeper.properties 配置文件,双击进行修改

然后,我们找到 dataDir ,将它的值修改为我们刚才创建的 data 文件的路径,还要注意一点,在后面还要多加一个 "/zk",因为一会还要配置 server.properties ,所以要用将她们两个区分开

第四步:修改 server.properties 配置文件

和刚才一样,我们双击修改 "server.properties" 配置文件

我们修改 log.dirs 的值为刚才创建的 data 文件夹的路径,在路径末尾再添加上 "/kafka" ,用来和刚才的zk做区分,kafka 文件夹用来存放kafka的日志文件,zk 文件夹用来存放zoopeeper的日志文件;

第五步:创建 "zk.cmd" windows脚本文件

以记事本的方式打开,然后加入下面这句话,

这句话的含义就是启动 Zookeeper ,并且启动文件为 "zookeeper.properties" ;

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

第六步:创建 "kfk.cmd" windows脚本文件

仍然以记事本的方式打开,然后加入下面这句话,

这句话的含义就是启动 kafka ,并且启动文件为 "server.properties" ;

call bin/windows/kafka-server-start.bat config/server.properties

注意:先启动双击 zk.cmd 启动 zookeeper双击 kafka.cmd 启动 kafka,关闭的时候,需要先关闭 kafka,再关闭 zookeeper ;

3.kafka使用:

(1)创建kafka-demo项目,导入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

(2)生产者发送消息

/**
 * @author 生产者
 * @version 1.0
 * @since 2024/9/2
 */
public class ProducerQuickStart {
    public static void main(String[] args) {

        // 1.kafka链接配置信息
        Properties prop = new Properties();

        // kafka链接地址
        // 指定 Kafka 集群的地址。在这个例子中,Kafka 服务运行在本地主机(localhost)的 9092 端口上。
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

        // key和value的序列化
        // 配置消息的 key 和 value 序列化类。
        // Kafka 需要将 key 和 value 转换为字节数组进行传输,这里使用的是 StringSerializer,表示 key 和 value 都是字符串类型。
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        // 2.创建kafka生产者对象
        // 生产者对象依赖于前面配置的 Properties 对象来连接 Kafka 集群并发送消息。
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);

        //3.发送消息
        /**
         * 第一个参数 :topic
         * 第二个参数:消息的key
         * 第三个参数:消息的value
         */
        // topic-first:消息要发送到的 Kafka 主题(topic)。
        // key-001:消息的 key,用于分区策略或消息的唯一标识。
        // hello kafka:消息的内容(value)。
        ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");
        // 同步发送消息
        // 使用 Kafka 生产者的 send 方法将消息发送到 Kafka 集群中。
        // 这是一个异步操作,意味着消息会在后台发送,不会阻塞主线程。
        producer.send(kvProducerRecord);

        //4.关闭消息通道  必须要关闭,否则消息发送不成功
        producer.close();

    }
}

(3)消费者接收消息

/**
 * @author 甜甜
 * @version 1.0
 * @since 2024/9/2
 */
public class ConsumerQuickStart1 {

    public static void main(String[] args) {

        // 1.kafka的配置信息
        Properties prop = new Properties();

        // 链接地址
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //key和value的反序列化器
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // 设置消费者组
        // 指定消费者所属的消费者组(group1)。
        //
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");

        // 2.创建消费者对象
        // Kafka 使用消费者组来协调同一组中多个消费者的消息消费情况,确保每条消息只会被组内的一个消费者处理。
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);

        //3.订阅主题
        // 消费者订阅 topic-first 主题。
        // Collections.singletonList("topic-first") 创建了一个包含 topic-first 的列表,表示消费者只订阅一个主题。
        // 对应的生产者那边的主题
        consumer.subscribe(Collections.singletonList("topic-first"));

        //4.拉取消息
        // 无限循环,用于不断地从 Kafka 中拉取消息。
        while (true) {
            // 读取数据,读取超时时间为100ms ,即每个1000ms拉取一次
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            // ConsumerRecord 是 Kafka 中表示一条消息的对象,它包含消息的 key、value 以及一些元数据信息。
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }

}

使用情景:

  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息 (一对一)
  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息 (一对多) 
4.springboot集成kafka
4.1导入spring-kafka依赖信息
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--  kafkfa  -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.11.0-M2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
4.2在resources下创建文件application.yml
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 10 #重试的次数
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

4.3消息生产者
4.4消息消费者 

传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有 两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息
  • 接收消息

二、消息中间件对比

消息中间件对比-选择建议

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/873475.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

时序数据库 IoTDB 为什么选择 TPCx-IoT 基准测评?

IoTDB 在 TPCx-IoT 榜单的 What 与 Why 解答&#xff01; 去年&#xff0c;我们发布了 IoTDB 多项性能表现位居国际数据库性能测试排行榜 benchANT&#xff08;Time Series: DevOps&#xff09;第一名的好消息。 刚刚落幕的数据库顶级会议 VLDB 上&#xff0c;我们又收获了一则…

Python QT实现A-star寻路算法

目录 1、界面使用方法 2、注意事项 3、补充说明 用Qt5搭建一个图形化测试寻路算法的测试环境。 1、界面使用方法 设定起点&#xff1a; 鼠标左键双击&#xff0c;设定红色的起点。左键双击设定起点&#xff0c;用红色标记。 设定终点&#xff1a; 鼠标右键双击&#xf…

猴王采集——多多采集必备,关键词,类目整店采集,正在拼爆款截流采集

在日新月异的电商领域&#xff0c;数据就是商战的情报&#xff0c;而精准、高效的数据采集则成为商家们制胜的关键。今天&#xff0c;让我们一同探索一款颠覆传统、引领未来的数据采集工具 一、关键词类目整店采集&#xff1a; “猴王采集”凭借其强大的关键词搜索能力&#…

什么是 TDengine?

TDengine 是一款专为物联网、工业互联网等场景设计并优化的大数据平台&#xff0c;其核心模块是高性能、集群开源、云原生、极简的时序数据库。它能安全高效地将大量设备、数据采集器每天产生的高达 TB 甚至 PB 级的数据进行汇聚、存储、分析和分发&#xff0c;对业务运行状态进…

深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念

文章目录 文章导图RabbitMQ架构及相关概念四大核心概念名词解读 七大工作模式及四大交换机类型0、前置了解-默认交换机DirectExchange1、简单模式(Simple Queue)-默认DirectExchange2、 工作队列模式(Work Queues)-默认DirectExchange3、发布/订阅模式(Publish/Subscribe)-Fano…

探索EasyCVR与AI技术深度融合:视频汇聚平台的新增长点

随着5G、AI、边缘计算、物联网&#xff08;IoT&#xff09;、云计算等技术的快速发展&#xff0c;万物互联已经从概念逐渐转变为现实&#xff0c;AIoT&#xff08;物联网人工智能&#xff09;的新时代正在加速到来。在这一背景下&#xff0c;视频技术作为信息传输和交互的重要手…

简单实用的php全新实物商城系统

免费开源电商系统,提供灵活的扩展特性、高度自动化与智能化、创新的管理模式和强大的自定义模块,让电商用户零成本拥有安全、高效、专业的移动商城。 代码是全新实物商城系统源码版。 代码下载

c++进阶——unordered的封装

嗨喽大家好呀&#xff0c;今天阿鑫给大家带来的是c进阶——unordered的封装&#xff0c;好久不见啦&#xff0c;下面让我们进入本节博客的内容吧&#xff01; c进阶——unordered的封装 unordered系列的基本架构unordered系列迭代器的封装unordered不支持修改keyoperator[]的…

macos系统内置php文件列表 系统自带php卸载方法

在macos系统中, 自带已经安装了php, 根据不同的macos版本php的版本号可能不同, 我们可以通过 which php 命令来查看mac自带的默认php安装路径, 不过注意这个只是php的执行文件路径. 系统自带php文件列表 一下就是macos默认安装的php文件列表. macos 10.15内置PHP文件列表配置…

软件工程-图书管理系统的概要设计

软件概要设计说明书 目录 软件概要设计说明书 一、引言 1.1 编写目的 1.2 背景 1.3 定义 1.3.1特定对象 1.3.2专业术语 1.4 参考资料 二、总体设计 2.1 需求规定 2.1.1信息要求 2.1.2功能要求 2.2 运行环境 2.3 基本概要设计和处理流程 2.4 体系结构设计 2.5 模…

基于微信小程序在线订餐系统

微信小程序在线订餐系统 摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了微信小程序在线订餐系统的开发全过程。通过分析微信小程序在线订餐系统管理的不足&#xff0c;创建了一个计算机管理微信小程序在线订…

【Python基础】Python函数

本文收录于 《Python编程入门》专栏&#xff0c;从零基础开始&#xff0c;分享一些Python编程基础知识&#xff0c;欢迎关注&#xff0c;谢谢&#xff01; 文章目录 一、前言二、函数的定义与调用三、函数参数3.1 位置参数3.2 默认参数3.3 可变数量参数&#xff08;或不定长参数…

Kubernetes 简介及部署方法

目录 一、Kubernetes简介 1 应用部署方式演变 1.2 容器编排应用 1.3 kubernetes 简介 1.4 K8S的设计架构 1.4.1 K8S各个组件用途 1.4.2 K8S 各组件之间的调用关系 1.4.3 K8S 的 常用名词感念 1.4.4 k8S的分层架构 二 K8S集群环境搭建 2.1 k8s中容器的管理方式 2.2 …

网络安全知识:什么是访问控制列表 (ACL)?

访问控制列表 (ACL) 是网络安全和管理的基础。它们在确定谁或什么可以访问网络内的特定资源方面发挥着重要作用。 本文深入探讨了 ACL 的复杂性&#xff0c;探索了其类型、组件、应用程序和最佳实践。我们还将比较不同操作系统的 ACL&#xff0c;并讨论它们在网络架构中的战略…

生信圆桌x生信分析平台:助力生物信息学研究的综合工具

介绍 少走弯路,高效分析;了解生信云,访问 【生信圆桌x生信专用云服务器】 : www.tebteb.cc 生物信息学的迅速发展催生了众多生信分析平台&#xff0c;这些平台通过集成各种生物信息学工具和算法&#xff0c;极大地简化了数据处理和分析流程&#xff0c;使研究人员能够更高效地…

如何使div居中?CSS居中终极指南

前言 长期以来&#xff0c;如何在父元素中居中对齐一个元素&#xff0c;一直是一个让人头疼的问题&#xff0c;随着 CSS 的发展&#xff0c;越来越多的工具可以用来解决这个难题&#xff0c;五花八门的招式一大堆&#xff0c;这篇博客&#xff0c;旨在帮助你理解不同的居中方法…

EVO进行轨迹评估

EVO进行轨迹评估 文章目录 EVO进行轨迹评估1 前言1.1 轨迹对齐1.2 尺度变换1.3 绝对轨迹误差ATE和相对轨迹误差RTE1.4 绝对姿态误差APE和相对姿态误差RPE 2 安装evo2.1 evo安装2.2 相关报错2.2.1 版本不兼容问题2.2.2 解决PATH警告 2.3 测试 3 evo指令3.1 evo_traj3.2 evo_ape3…

Spring Boot3.x 启动自动执行sql脚本

1 引言 某些项目在首次启动时&#xff0c;需要先手动创建数据库表&#xff0c;然后再手动写入初始数据才能正常使用。为了省去这个手动操作过程&#xff0c;我们可以使用Spring Boot启动时执行sql脚本的配置&#xff0c;全自动完成这个过程。 2 配置 具体配置如下&#xff1…

Python 调用手机摄像头

Python 调用手机摄像头 在手机上安装软件 这里以安卓手机作为演示&#xff0c;ISO也是差不多的 软件下载地址 注意&#xff1a;要想在电脑上查看手机摄像头拍摄的内容的在一个局域网里面(没有 WIFI 可以使用 热点 ) 安装完打开IP摄像头服务器 点击分享查看IP 查看局域网的I…

[Linux]:进程(下)

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;Linux学习 贝蒂的主页&#xff1a;Betty’s blog 1. 进程终止 1.1 进程退出的场景 进程退出只有以下三种情况&#xff1a; 代…