Kafka学习-Java使用Kafka

文章目录

  • 前言
  • 一、Kafka
    • 1、什么是消息队列
      • offset
    • 2、高性能
      • topic
      • partition
    • 3、高扩展
      • broker
    • 4、高可用
      • replicas、leader、follower
    • 5、持久化和过期策略
    • 6、消费者组
    • 7、Zookeeper
    • 8、架构图
  • 二、安装Zookeeper
  • 三、安装Kafka
  • 四、Java中使用Kafka
    • 1、引入依赖
    • 2、生产者
    • 3、消费者
    • 4、运行效果


前言

Kafka消息中间件

一、Kafka

1、什么是消息队列

假设我们有两个服务:生产者A每秒能生产200个消息,消费者B每秒能消费100个消息。

在这里插入图片描述

那么B服务是处理不了A这么多消息的,那么怎么使B不被压垮的同时还能处理A的消息呢,我们引入一个中间件,即Kafka。(当然着并不能使消费者的处理速度上升)

在这里插入图片描述

offset

那么我们可以在B服务中加入一个队列,也就是一个链表,链表的每个节点相当于一条消息,每个节点有一个序号即offset,记录消息的位置。

在这里插入图片描述

在这里插入图片描述

但是这样也会有个问题,还没有处理的消息是存储在内存中的,如果B服务挂掉,那么消息也就丢失了。
所以我们可以把队列移出,变成一个单独的进程,即使B服务挂掉,消息也不会丢失。

在这里插入图片描述

2、高性能

B服务由于性能差,队列中未处理的消息会越来越多,我们可以增加更多的消费者来处理消息,相对的也可以增加更多的生产者来生成消息。

在这里插入图片描述

topic

但是,生产者与消费者会争抢同一个队列,没有抢到就要等待,那么怎么解决呢?
我们可以将消息进行分类,每一类消息是一个topic,生产者按消息的类型投递到不同的topic中,消费者也按照不同的topic进行消费。

在这里插入图片描述

partition

但是单个topic的消息还是有可能过多,我们可以将单个队列拆分,每段是一个partition分区,每个消费者负责一个partition

在这里插入图片描述

3、高扩展

broker

随着partition过多,所有的partition都在同一个机器上,就可能会导致单机的cpu和内存过高,影响性能,那么我们可以使用多台机器,将partition分散部署在不同的机器上。每台机器就代表一个broker
我们可以增加broker来缓解服务器的cpu过高的性能问题。

在这里插入图片描述

4、高可用

replicas、leader、follower

假如某个broker挂了, 那么其中partition中的消息也就都丢失了,那么这个问题怎么解决呢?
我们可以给partition多加几个副本,统称replicas,并将它们分为leaderfollower
leader负责生产者和消费者的读写,follower只负责同步leader的数据。假如leader挂了,也不会影响follower,随后在follower中选出一个leader,保证消息队列的高可用。

在这里插入图片描述

5、持久化和过期策略

在上面讲述了leader挂掉的情况,如果所有的broker都挂了,消息不就都丢失了?
为了解决这个问题,就不能只把数据存在内存中,也要存在磁盘中。
但是如果所有消息一直保存在磁盘中,那磁盘也会被占满,所以引入保留策略。

6、消费者组

如果我想在原有的基础上增加一个消费者,那么它只能跟着最新的offset接着消费,如果我想从某个offset开始消费呢?
我们引入消费者组,实现不同消费者维护自己的消费进度。

在这里插入图片描述

7、Zookeeper

上面介绍了很多的组件,每个组件都有自己的状态信息,那么就需要有一个组件去统一维护这些组件的状态信息,于是引入了Zookeeper组件,它会定期与broker通信,获取Kafka集群的状态,判断哪些broker挂了,消费者组消费到哪了等等。

8、架构图

在这里插入图片描述

二、安装Zookeeper

1、官网地址

https://zookeeper.apache.org/

2、下载

在这里插入图片描述

选择稳定版本下载

在这里插入图片描述

3、解压,修改配置文件

解压后,复制 zoo_sample.cfg,重命名为 zoo.cfg

在这里插入图片描述

修改数据文件目录位置

在这里插入图片描述

4、启动

我们是在windows系统下安装的,运行 bin 目录下的 zkServer.cmd

在这里插入图片描述

三、安装Kafka

1、官网地址

https://kafka.apache.org/

2、下载

在这里插入图片描述

3、解压,修改配置文件

修改 config 目录下 server.properties 文件
修改日志文件位置,其他参数(如zookeeper端口,根据需要修改)

在这里插入图片描述

4、启动

bin\windows\kafka-server-start.bat config\server.properties

在这里插入图片描述

四、Java中使用Kafka

1、引入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

2、生产者

public static void main(String[] args) throws InterruptedException {
    Properties prop = new Properties();

    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    prop.put(ProducerConfig.ACKS_CONFIG, "all");
    prop.put(ProducerConfig.RETRIES_CONFIG, 0);
    prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

    String topic = "hello";

    KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<String, String>(topic, Integer.toString(2), "hello kafka" + i));
        System.out.println("生产消息:" + i);
        Thread.sleep(1000);
    }
    producer.close();
}

3、消费者

public static void main(String[] args) {
    Properties prop = new Properties();

    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    prop.put(ConsumerConfig.GROUP_ID_CONFIG, "con-1");    // 消费者组
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);    //自动提交偏移量
    prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);     //自动提交时间

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
    ArrayList<String> topics = new ArrayList<>();
    //可以订阅多个消息
    topics.add("hello");
    consumer.subscribe(topics);

    try {
        while(true) {
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(10));
            for (TopicPartition topicPartition : poll.partitions()) {

                //	通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
                List<ConsumerRecord<String, String>> partitionRecords = poll.records(topicPartition);

                //	获取TopicPartition对应的主题名称
                String topic = topicPartition.topic();
                //	获取TopicPartition对应的分区位置
                int partition = topicPartition.partition();
                //	获取当前TopicPartition下的消息条数
                int size = partitionRecords.size();
                System.out.printf("--- 获取topic: %s, 分区位置:%s, 消息总数: %s%n",
                        topic,
                        partition,
                        size);

                for(int i = 0; i < size; i++) {
                    ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
                    //	实际的数据内容
                    String key = consumerRecord.key();
                    //	实际的数据内容
                    String value = consumerRecord.value();
                    //	当前获取的消息偏移量
                    long offset = consumerRecord.offset();
                    //	表示下一次从什么位置(offset)拉取消息
                    long commitOffser = offset + 1;
                    System.out.printf("消费消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s%n",
                            key, value, offset, commitOffser);
                    Thread.sleep(1500);
                }

            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.close();
    }
}

4、运行效果

生产消息

在这里插入图片描述

消费消息

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/628571.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Unity使用sherpa-onnx实现离线语音合成

sherpa-onnx https://github.com/k2-fsa/sherpa-onnx 相关dll和lib库拷进Unity&#xff0c;官方示例代码稍作修改 using SherpaOnnx; using System; using System.IO; using System.Runtime.InteropServices; using UnityEngine;public class TTS : MonoBehaviour {public st…

Google I/O 2024 干货全解读:Gemini AI 横空出世,智能未来触手可及!

Google I/O 2024 干货全解读&#xff1a;Gemini AI 横空出世&#xff0c;智能未来触手可及&#xff01; 博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》…

git 拉取指定目录

指令方式 打开 git 自带的Git Bash 工具 以拉取github中 fastjson 的 /src/test/java/oracle/sql/ 目录为例 1.创建文件夹和git 初始化 cd D:/Program\ Files mkdir fastjson cd fastjson git init 2.设置允许克隆子目录 git config core.sparsecheckout true 3.添加远程…

前端开发攻略---用代码带你走近双色球再到远离双色球

1、演示 2、玩法及规则 双色球是一种流行的彩票游戏&#xff0c;它在很多国家都有自己的版本。以下是双色球的详细玩法&#xff1a; 选择号码&#xff1a;玩家需要从1至33的红色球中选择6个号码&#xff0c;并且从1至16的蓝色球中选择1个号码&#xff0c;构成一组7个号码。 购…

使用make_blobs生成数据并使用KNN机器学习算法进行分类和预测以及可视化

生成数据 使用make_blobs生成数据并使用matplotlib进行可视化 完整代码&#xff1a; from sklearn.datasets import make_blobs # KNN 分类器 from sklearn.neighbors import KNeighborsClassifier # 画图工具 import matplotlib.pyplot as plt # 数据集拆分工具 from sklea…

Linux 第三十一章

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练&#xff0c;题解C&#xff0c;C的使用文章&#xff0c;「初学」C&#xff0c;linux &#x1f525;座右铭&#xff1a;“不要等到什么都没有了…

力扣127.单词接龙讲解

距离上一次刷题已经过去了.........嗯............我数一一下............整整十天&#xff0c;今天再来解一道算法题 由于这段时间准备简历&#xff0c;没咋写博客。。今天回来了&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&…

【二叉树】(二)二叉树的基础修改构造及属性求解1

&#xff08;二&#xff09;二叉树的基础修改构造及属性求解1 翻转二叉树递归实现迭代实现&#xff08;深度遍历&#xff09;层序实现&#xff08;广度遍历&#xff09; 对称二叉树递归实现迭代实现&#xff08;非层序遍历&#xff09; 二叉树的最大深度递归法迭代法&#xff0…

你了解黑龙江等保测评么?

等保测评的全称是信息安全等级保护测评&#xff0c;是信息安全等级保护工作中的一项重要内容。 具体来说&#xff0c;等保测评是指按照国家相关标准和技术规范&#xff0c;对信息系统安全等级保护状况进行检测评估的活动。 其主要目的是发现信息系统存在的安全隐患和不足&…

学会给文件夹加密,保密措施不可或缺!

我们的个人信息、工作文件和其他重要数据都存储在各种设备和文件夹中&#xff0c;如何保证这些数据的安全&#xff0c;防止未经授权的访问和泄露&#xff0c;成为了一个不容忽视的问题。本文将探讨给文件夹加密的必要性&#xff0c;以及如何在手机和电脑上进行文件夹加密。 操作…

使用KNN预测一个新的点,以及将这个点用五角星进行matplotlib可视化展示

概述 基于之前的KNN案例继续做一些操作。 之前的完整代码如下&#xff1a; from sklearn.datasets import make_blobs # KNN 分类器 from sklearn.neighbors import KNeighborsClassifier # 画图工具 import matplotlib.pyplot as plt # 数据集拆分工具 from sklearn.model_…

DiskGenius帮你恢复系统无法识别的U盘数据

场景还原 前两天早上U盘复制文件卡死后&#xff0c;强行断开U盘&#xff0c;再次使用直接无法访问&#xff0c;心拔凉拔凉&#xff01;&#xff01; 使用驱动器G:中的光盘之前需要将其格式化 位置不可用-无法访问U盘 常规科普 一、U盘无法识别 1、检查U盘是否插入正确&…

【汇编语言】多文件组织

【汇编语言】多文件组织 文章目录 【汇编语言】多文件组织前言一、8086拓展1.子程序的另外一种写法2.程序的多文件组织 总结 前言 本篇文章将讲到子程序的另一种写法&#xff0c;以及程序的多文件组织。 一、8086拓展 1.子程序的另外一种写法 初始的程序 在这里我们对比一下…

6款电脑精选工具软件推荐!

AI视频生成&#xff1a;小说文案智能分镜智能识别角色和场景批量Ai绘图自动配音添加音乐一键合成视频https://aitools.jurilu.com/ 1.IP地址查看工具——纯真ip数据库 纯真IP数据库是一个易于操作的IP地址查询工具&#xff0c;它允许用户通过输入IP地址来查询其对应的地理位置…

每天五分钟玩转深度学习pytorch:pytorch中的张量类型

本文重点 和numpy一样,pytorch中也有自己的类型,本节课程我们将对它的类型进行介绍,并且学习不同的类型之间的转换,这是pytorch的基础。 基本类型 pytorch的基本变量称为张量Tensor,这张表是pytorch中的类型,Tensor有不同的类型,他和很多编程语言中的类型相似,它有 32…

ROS学习笔记(15)小车巡墙驾驶

0.前提 前一章我讲解了拉氏变换和PID&#xff0c;这一章我来讲解一下小车巡墙驾驶的理论和部分代码。 1.前情回顾 1.拉氏变换 拉普拉斯变换是要将时域问题转换成频域问题来处理。 2.PID控制器 转向角&#xff1a; 误差牺牲&#xff1a; 3.具体参看上一篇文章 2.巡墙驾驶…

AI 一键生成高清短视频,视频 UP 主们卷起来...

现在短视频越来越火&#xff0c;据统计&#xff0c;2023年全球短视频用户数量已达 10 亿&#xff0c;预计到2027年将突破 24 亿。对于产品展示和用户营销来说&#xff0c;短视频已经成为重要阵地&#xff0c;不管你喜不喜欢它&#xff0c;你都得面对它&#xff0c;学会使用它。…

Proxy和Reflect,打造灵活的JS代理机制 (代码示例)

在 JavaScript 中&#xff0c;代理&#xff08;Proxy&#xff09;和反射&#xff08;Reflect&#xff09;是 ES6 引入的两个新特性。Proxy用于创建一个对象的代理&#xff0c;从而实现对这个对象的操作的拦截、转换或扩展&#xff1b;而Reflect则提供了一系列与 JavaScript 运行…

线上3D博物馆搭建简单吗?有何优势?有哪些应用场景?

随着科技的飞速发展&#xff0c;传统的博物馆参观方式正在经历一场前所未有的变革&#xff0c;在科技的“加持”下&#xff0c;不少博物馆凭借强大的技术、创意和美学实践&#xff0c;频频“出圈”&#xff0c;线上3D博物馆逐渐崛起&#xff0c;这不仅丰富了人们的文化体验&…