kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

目录

  • 1- 单播模式,只有一个消费者组
  • 2- 广播模式,多个消费者组
  • 3- Java实践

kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一般情况下partition-0存储a,c,e3个数据,partition-1存储b,d,f另外3个数据。

1- 单播模式,只有一个消费者组

topic只有1个partition,该组内有多个消费者时,此时同一个partition内的消息只能被该组中的一个consumer消费。当消费者数量多于partition数量时,多余的消费者是处于空闲状态的,如图1所示。topic,test只有一个partition,并且只有1个group,G1,该group内有多个consumer,只能被其中一个消费者消费,其他的处于空闲状态。

在这里插入图片描述
该topic有多个partition,该组内有多个消费者,比如test 有3个partition,该组内有2个消费者,那么可能就是C0对应消费p0,p1内的数据,c1对应消费p2的数据;如果有3个消费者,就是一个消费者对应消费一个partition内的数据了。图解分别如图2,图3.这种模式在集群模式下使用是非常普遍的,比如我们可以起3个服务,对应的topic设置3个partiition,这样就可以实现并行消费,大大提高处理消息的效率。

在这里插入图片描述
在这里插入图片描述

2- 广播模式,多个消费者组

如果想实现广播的模式就需要设置多个消费者组,这样当一个消费者组消费完这个消息后,丝毫不影响其他组内的消费者进行消费,这就是广播的概念。

多个消费者组,1个partition;
该topic内的数据被多个消费者组同时消费,当某个消费者组有多个消费者时也只能被一个消费者消费.

在这里插入图片描述

多个消费者组,多个partition

该topic内的数据可被多个消费者组多次消费,在一个消费者组内,每个消费者又可对应该topic内的一个或者多个partition并行消费,如图

在这里插入图片描述

3- Java实践

这里使用Java服务进行实践,模拟2个parition,然后同一个组内有2个消费者的情况:

首先创建一个发送消息的controller方法:

@ApiOperation(value = "向具有kafka-2个partition的topic发送信息")
    @RequestMapping(value = "/testSendMessage2", method = RequestMethod.POST)
    public String testSendMessage(@RequestParam("msg") String msg) {
        KafkaTemplate.send(KafkaTopicEnum.TEST_TWO_PARTITION_MSG.code,msg);
        System.out.println("发送的消息是:"+msg);
        return "2个partition的topic数据!--ok";
    }

然后再创建一个监听类监听该topic,这里的监听类即为消费者。

/**
     * @date 2020-09-24
     * 两个partition的topic,同一个组的两个消费者就可以并行的消费了,需要kafka也是集群才行,单机版并不支持
     * @param consumerRecord
     * @param acknowledgment
     */
    @KafkaListener(topics = "two-partition-msg",groupId ="serverGroup1",containerFactory = "ackContainerFactory")
    public void receiveKafkaTwoParMsg(ConsumerRecord<?,?> consumerRecord, Acknowledgment acknowledgment){
        InetAddress address = null;
        try {
            address = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        System.out.println("当前的IP地址是:"+address.getHostAddress());
        System.out.println("监听服务A-收到的消息是::");
        System.out.println(consumerRecord.value().toString());
        System.out.println("=================== end =================");
//        ack 提交掉,避免服务重启再次拉取到消息
        acknowledgment.acknowledge();
    }

然后我们给该服务起2个实例,即模拟该组内serverGroup1内的2个消费者,然后我们使用测试方法进行测试,向该topic内发送多个消息,观察2个实例的输出日志:

实例1:

发送的消息是:111
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“111=================== end =================
发送的消息是:222
发送的消息是:333
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“333=================== end =================
发送的消息是:444
发送的消息是:555
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“555=================== end =================
发送的消息是:666
发送的消息是:777
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“777=================== end =================
发送的消息是:888
发送的消息是:999
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“999-----------------------------------

实例2:

当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“222=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“444=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“666=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“888”
发现该组内的一个消费者消费到了111,333,555,777,999 ,另外一个消费者消费到了222,444,666,888,起到了均衡消费的效果。

所以在微服务的集群中,我们可以通过给topic设置多个partition,然后让每一个实例对应消费1个partition的数据,从而实现并行的处理数据,可以显著地提高处理消息的速度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/240323.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

光学遥感显著目标检测初探笔记总结

目录 观看地址介绍什么是显著性目标检测根据不同的输入会有不同的变体(显著性目标检测家族)目前这个领域的挑战 技术方案论文1(2019)论文2(2021)论文3(2022) 未来展望 观看地址 b站链接 介绍 什么是显著性目标检测 一张图片里最吸引注意力的部分就是显著性物体&#xff0c;…

【Stable Diffusion】在windows环境下部署并使用Stable Diffusion Web UI---By Conda

文章目录 一、Stable Diffusion介绍二、本地部署stable diffusion2.1 安装所需依赖环境2.1.1 安装CUDA2.1.2 安装显卡驱动2.1.3 安装Conda2.1.4 安装git工具--gitForWindows2.1.5 检查环境 2.2 配置Transformer环境变量2.3 安装SD WebUI2.4 安装SD WebUI过程中遇到的问题 三、 …

指针浅谈(三)

在指针浅谈(二)http://t.csdnimg.cn/SKAkD中我们讲到了const修饰指针、指针运算、野指针、assert断言和传址调用的内容&#xff0c;今天我们继续学习有关数组名、指针访问数组、一维数组传参的本质相关的内容&#xff0c;内容比较深入&#xff0c;如果觉得哪里讲解的不行&#…

Java EE 多线程之线程安全的集合类

文章目录 1. 多线程环境使用 ArrayList1. 1 Collections.synchronizedList(new ArrayList)1.2 CopyOnWriteArrayList 2. 多线程环境使用队列2.1 ArrayBlockingQueue2.2 LinkedBlockingQueue2.3 PriorityBlockingQueue2.4 TransferQueue 3. 多线程环境使用哈希表3.1 Hashtable3.…

QT----第三天,Visio stdio自定义封装控件

目录 第三天1 自定义控件封装 源码&#xff1a;CPP学习代码 第三天 1 自定义控件封装 新建一个QT widgetclass&#xff0c;同时生成ui,h,cpp文件 在smallWidget.ui里添加上你想要的控件并调试大小 回到mainwidget.ui&#xff0c;拖入一个widget&#xff08;因为我们封装的也…

jemeter,断言:响应断言、Json断言

一、响应断言 接口A请求正常返回值如下&#xff1a; {"status": 10013, "message": "user sign timeout"} 在该接口下创建【响应断言】元件&#xff0c;配置如下&#xff1a; 若断言成功&#xff0c;则查看结果树的接口显示绿色&#xff0c;若…

maui 开发音乐播放APP 优化(2)

界面改为&#xff1a; 音量可以调整 。同时当前状态 显示。以及播放音乐.视频有时可以自动播放有时候要手动。 界面代码 <?xml version"1.0" encoding"utf-8" ?> <ContentPage xmlns"http://schemas.microsoft.com/dotnet/2021/maui&quo…

MySQL的事务以及springboot中如何使用事务

事务的四大特性&#xff1a; 概念&#xff1a; 事务 是一组操作的集合&#xff0c;它是不可分割的工作单元。事务会把所有操作作为一个整体&#xff0c;一起向系统提交或撤销操作请求&#xff0c;即这些操作要么同时成功&#xff0c;要么同时失败。 注意&#xff1a; 默认MySQ…

最新UI酒桌喝酒游戏小程序源码,直接上传源码到开发者端即可,带流量主

源码介绍&#xff1a; 2023最新UI酒桌喝酒游戏小程序源码 娱乐小程序源码 带流量主.修改增加了广告位&#xff0c;直接上传源码到开发者端即可。 通过后改广告代码&#xff0c;然后关闭广告展示提交&#xff0c;通过后打开即可。无广告引流。 流量主版本的&#xff08;配合流…

proteus元件合集(一)

LCD LM018L​​ 绿色的LCD寻找方法&#xff1a; 直流电压源 直流电压源寻找方法&#xff1a; 滑动变阻器 滑动变阻器寻找方法&#xff1a; 注意&#xff1a;它出来之后会自动出现那两个红色的。那是电源。

崩坏:星穹铁道【V1.5攻略】五星(金)-遗器主、副词条成长数值参考

星穹铁道中五星遗器词条成长数值攻略&#xff1a; 温馨提示&#xff1a;以下数据会可能会出现一点一点误差&#xff0c;见谅... --------------------------- 一、如图&#xff1a; ----->>细节补充<<----- ①实际数值可能与游戏中不一&#xff0c;若数据出现无法忽…

详解Java中的异常体系结构(throw,throws,try-catch,finally,自定义异常)

目录 一.异常的概念 二.异常的体系结构 三.异常的处理 异常处理思路 LBYL&#xff1a;Look Before You Leap EAFP: Its Easier to Ask Forgiveness than Permission 异常抛出throw 异常的捕获 提醒声明throws try-catch捕获处理 finally的作用 四.自定义异常类 一.异…

深入理解亚信安慧AntDB-T数据库子计划的执行流程

概要&#xff1a; SQL语句在执行时会转换为执行计划&#xff0c;若其中包含了子查询或子链接并且不能被优化&#xff0c;则执行计划会生成子计划&#xff08;查看AntDB的执行计划时看到标记为SubPlan[1] 的部分即为子计划&#xff09;。在整个AntDB数据库中&#xff0c;子计划…

现代C++ 实现单例模式

传统写法有什么问题 如果你了解过单例模式&#xff0c;双重检查锁定模式&#xff08;Double-Checked Locking Pattern&#xff0c;后文简称DCLP&#xff09;的写法你一定不会陌生&#xff0c;甚至你或许认为它是最正确的代码。 class Singleton { public://获取单例Singleton…

MySQL 8.x temp空间不足问题

目录 一、系统环境 二、问题报错 三、问题回顾 四、解决问题 一、系统环境 系统Ubuntu20.04 数据库版本MySQL 8.0.21 二、问题报错 在MySQL上执行一个大的SQL查询报错Error writing file /tmp/MYfd142 (OS errno 28 - No space left on device) Exception in thread …

用C语言实现链队列的基本操作

不多解释&#xff0c;直接上代码&#xff0c;代码已经写了注释&#xff01; //队列链式结构的基本操作&#xff1a; #define _CRT_SECURE_NO_WARNINGS 1 #include<stdio.h> #include<stdlib.h> typedef int QueueElememtType; typedef struct QNode//链队的定义 {…

什么是蜘蛛池,蜘蛛池是什么蚂蚁SEO

蜘蛛池是一种通过大量模拟真实用户行为来提升网站搜索引擎排名的技术。这种技术利用大量的网络爬虫程序&#xff0c;模拟搜索引擎蜘蛛的爬行行为&#xff0c;通过大量的模拟爬行和页面抓取&#xff0c;提高网站的权重和排名。 如何联系蚂蚁seo&#xff1f; baidu搜索&#xf…

如何查看自己的文章是否被数据库收入?【查收查引】

致谢&#xff1a;特别感谢图书馆的蔡老师&#xff0c;告诉我怎么操作&#xff01; 另外&#xff0c;查收查引报告中的文章可以分开开&#xff0c;放在一起开不是必须的。&#xff08;放在一起开大概是院士工作量需要的。不是很了解。&#xff09; 如何查看自己的文章是否被数据…

杰发科技AC7840——CAN通信简介(1)

简介 7840支持4路CAN-FD Demo调试 官网下载demo&#xff0c;烧录&#xff0c;打开串口发现打印如下。原因是没有连接CAN盒子&#xff0c;总线错误。 CAN收发器端波形 CAN_L有信号&#xff0c;CAN_H没有 波形放大 GPIO端波形 有持续波形输出 波形放大查看&#xff0c;有50U…

.NET 反射优化的经验分享

比如针对 GetCustomAttributes 通过反射获取属性的优化,以下例子 // dotnet run -c Release -f net7.0 --filter "*" --runtimes net7.0 net8.0public class Tests{public object[] GetCustomAttributes() => typeof(C).GetCustomAttributes(typeof(MyAttribute…