【大数据学习 | kafka】简述kafka的消费者consumer

1. 消费者的结构

能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。

这里面要涉及到一个动作叫做拉取。

首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的就是消息的push方式,这个方式不能够保证推送的数据消费者端一定会消费完毕,会出现数据的反压问题,这个问题很难解决,所以才出现了消息队列kafka,它可以起到一个缓冲的作用,生产者部分将数据直接全部推送到kafka,然后消费者从其中拉取数据,这边如果也采用推送的方式,那么也就在计算端会出现反压问题,所以kafka的消费者一般都是采用拉的方式pull,并不是push

1.1 消费者组

在一个topic中存在多个分区,可以分摊压力实现负载均衡,那么整体topic中的数据会很多,如果消费者只有一个的话很难全部消费其中的数据压力也会集中在一个消费者中,并且在大数据行业中几乎所有的计算架构都是分布式的集群模式,那么这个集群模式中,计算的节点也会存在多个,这些节点都是可以从kafka中拉取数据的,所有消费者不可能只有一个,一般情况下都会有多个消费者。

正因为topic存在多个分区,每个分区中的数据是独立的,那么消费者最好也是一个一个和分区进行一一对应的,所以有几个分区应该对应存在几个消费者是最好的。

这个和分蛋糕是一样的,一个蛋糕分成几块,那么有几个人吃,应该是对应关系的

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

2. 消费者实现

在实现消费者的时候我们需要知道几个消费者的配置重要参数

参数解释
bootstrap.servers集群地址
key.deserializerkey反序列化器
value.deserializervalue反序列化器
group.id消费者组id

首先创建消费者对象

消费者对象订阅相应的topic然后拉取其中的数据进行消费

整体代码如下

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class Consumer1 {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
        pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");
        //设定组id
        pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //设定key的反序列化器
        pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //设定value的反序列化器
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
        List<String> topics = Arrays.asList("topic_a","topic_b");
        //一个消费者可以消费多个分区的数据
        consumer.subscribe(topics);
        //订阅这个topic
        while (true){
            //死循环要一直消费数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            //间隔一秒钟消费一次数据,拉取一批数据过来
            Iterator<ConsumerRecord<String, String>> it = records.iterator();
            while(it.hasNext()){
                ConsumerRecord<String, String> record = it.next();
                System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
            }
        }
    }
}
[hexuan@hadoop106 datas]$ kafka-console-producer.sh --bootstrap-server hadoop106:9092 --topic topic_b

>>1
>2
>3
>4
>5
>

3. 消费者与分区之间的对应关系

一个消费者组中的消费者和分区是一一对应的关系,一个分区应该对应一个消费者,但是如果消费者多了,那么有的消费者就没有分区消费,如果消费者少了那么会出现一个消费者消费多个分区的情况。

# 首先创建topic_c 用于测试分区和消费者的对应关系
kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_c --partitions 3 --replication-factor 2
# 启动两个消费者 刚才我们写的消费者main方法运行两次
# 然后分别在不同的分区使用生产者发送数据,看数据在消费者中的打印情况

首先选择任务可以并行执行

选择任务修改配置

我们可以看到允许多实例并行执行

启动两次,这个时候我们就有了两个消费者实例

生产者线程:分别向三个分区中发送1 2 3元素

package com.hainiu.kafka.consumer;

/**
 * ClassName : test3_producer
 * Package : com.hainiu.kafka.consumer
 * Description
 *
 * @Author HeXua
 * @Create 2024/11/3 23:40
 * Version 1.0
 */

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class test3_producer {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
        ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_d", 0,null,"1");
        ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_d", 1,null,"2");
        ProducerRecord<String, String> record3 = new ProducerRecord<>("topic_d", 2,null,"3");
        producer.send(record1);
        producer.send(record2);
//        producer.send(record3);
        producer.close();
    }
}

可以看到有的消费者消费了两个分区的数据

如果启动三个消费者会发现每个人消费一个分区的数据

如果启动四个消费者

我们发现有一个消费者没有数据

3. 1 消费多topic的数据

不同组消费不同的topic或者一个组可以消费多个topic都是可以的

3.2 多个组消费一个topic

同一个topic可以由多个消费者组进行消费数据,并且相互之间是没有任何影响的

修改同一份代码的组标识不同。启动两个实例查看里面的消费信息

   pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");
   pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");
   //分别修改消费者组的id不同
package com.hainiu.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class Consumer1 {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
        pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");
        pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
        List<String> topics = Arrays.asList("topic_c");
        //订阅多个topic的数据变化
        consumer.subscribe(topics);

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> it = records.iterator();
            while(it.hasNext()){
                ConsumerRecord<String, String> record = it.next();
                System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/908771.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

​Controlnet作者新作IC-light V2:基于FLUX训练,支持处理风格化图像,细节远高于SD1.5。

大家好&#xff01;今天我要向大家介绍一个超级有趣的话题——Controlnet作者的新作IC-light V2&#xff01;这个工具基于FLUX训练&#xff0c;能够支持处理风格化图像&#xff0c;并且细节表现远高于SD1.5。 想象一下&#xff0c;你有一个强大的AI助手&#xff0c;它能够根据…

危机来临前---- 力扣: 876

危机即将来临 – 链表的中间节点 描述&#xff1a; 给你单链表的头结点 head &#xff0c;请你找出并返回链表的中间结点。如果有两个中间结点&#xff0c;则返回第二个中间结点。 示例&#xff1a; 何解&#xff1f; 1、遍历找到中间节点 &#xff1a; 这个之在回文链表中找…

【AI绘画】ComfyUI - AnimateDiff基础教程和使用心得

AnimateDiff是什么&#xff1f; AnimateDiff 是一个能够将个性化的文本转换为图像的扩展模型&#xff0c;它可以在无需特定调整的情况下实现动画效果。通过这个项目&#xff0c;用户可以将他们的想象力以高质量图像的形式展现出来&#xff0c;同时以合理的成本实现这一目标。随…

【docker】docker 环境配置及安装

本文介绍基于 官方存储库 docker 的环境配置、安装、代理配置、卸载等相关内容。 官方安装文档说明&#xff1a;https://docs.docker.com/engine/install/ubuntu/ 主机环境 宿主机环境 Ubuntu 20.04.6 LTS 安装步骤 添加相关依赖 sudo apt-get update sudo apt-get install…

一二三应用开发平台自定义查询设计与实现系列3——通用化重构

通用化重构 前面我们以一个实体为目标对象&#xff0c;完成了功能开发与调试。 在此基础上&#xff0c;我们对功能进行重构&#xff0c;使其成为平台的标准化、通用化的功能。 前端重构 首先&#xff0c;先把自定义组件挪到了平台公共组件目录下&#xff0c;如下&#xff1…

国标GB28181视频平台EasyCVR私有化视频平台工地防盗视频监控系统方案

一、方案背景 在当代建筑施工领域&#xff0c;安全监管和防盗监控是保障工程顺利进行和资产安全的关键措施。随着科技进步&#xff0c;传统的监控系统已不足以应对现代工地的安全挑战。因此&#xff0c;基于国标GB28181视频平台EasyCVR的工地防盗视频监控系统应运而生&#xf…

征程 6 工具链性能分析与优化 2|模型性能优化建议

01 引言 为了应对低、中、高阶智驾场景&#xff0c;以及当前 AI 模型在工业界的应用趋势&#xff0c;地平线推出了征程 6 系列芯片。 在软硬件架构方面&#xff0c;征程 6 不仅保持了对传统 CNN 网络的高效支持能力&#xff0c;还强化了对 Transformer 类型网络的支持&#xf…

【真题笔记】16年系统架构设计师要点总结

【真题笔记】16年系统架构设计师要点总结 存储部件接口嵌入式处理器产品配置配置管理用户文档系统文档CMM&#xff08;能力成熟度模型&#xff09;螺旋模型敏捷软件开发的方法学软件工具面向对象的分析模型设计模型COP&#xff08;面向构件的编程&#xff09;构件原子构件模块S…

GR2——在大规模视频数据集上预训练且机器人数据上微调,随后预测动作轨迹和视频(含GR1详解)

前言 上个月的24年10.9日&#xff0c;我在朋友圈看到字节发了个机器人大模型GR2&#xff0c;立马去看了下其论文(当然了&#xff0c;本质是个技术报告) 那天之后&#xff0c;我就一直想解读这个GR2来着 然&#xff0c;意外来了&#xff0c;如此文《OmniH2O——通用灵巧且可全…

Autocad2018

链接: https://pan.baidu.com/s/1MTd0gc5Q5zoKFnPNwk1VXw 提取码: x15v

把握数字化新趋势,迎接生态架构新时代——The Open Group 2024生态系统架构·可持续发展年度大会参会指南

距离大会还有&#xff1a;22天 在数字化转型的浪潮中&#xff0c;如何抓住机遇&#xff0c;实现可持续发展&#xff0c;已成为各行各业关注的焦点。The Open Group 2024生态系统架构可持续发展年度大会&#xff0c;将于2024年11月22日在北京国贸大酒店隆重举行。本次大会汇聚全…

OpenGL入门006——着色器在纹理混合中的应用

本节将理解顶点和片段着色器在纹理混合中的应用 文章目录 一些概念纹理时间依赖动画 实战简介dependenciesshader.fsshader.vsteenager.pngtex.png utilswindowFactory.hshader.hRectangleModel.hRectangleModel.cpp main.cppCMakeLists.txt最终效果 一些概念 纹理 概述&…

Spring Cloud Bus快速入门Demo

1.什么是Spring Cloud Bus&#xff1f; Spring Cloud Bus 是一个用于将分布式系统的节点连接起来的框架&#xff0c;它使用了轻量级消息代理来实现节点之间的通信。Spring Cloud Bus 可以将配置变更事件、状态变更事件和其他管理事件广播到系统中的所有节点&#xff0c;以便于…

通过Wireshark抓包分析,体验HTTP请求的一次完整交互过程

目录 一、关于Wireshark 1.1、 什么是Wireshark 1.2、下载及安装 二、HTTP介绍 2.1、HTTP请求过程介绍 2.2 、TCP协议基础知识 2.2.1、概念介绍 2.2.2、TCP协议的工作原理 2.2.3、三次握手建立连接 2.3.4、四次挥手断开连接 2.3、Wireshark抓包分析过程 2.3.1、三次握…

聚观早报 | 比亚迪腾势D9登陆泰国;苹果 iOS 18.2 将发布

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 11月5日消息 比亚迪腾势D9登陆泰国 苹果 iOS 18.2 将发布 真我GT7 Pro防尘防水细节 小米15 Ultra最快明年登场 …

tomcat 开启远程debug模式

1.修改位置 CATALINA_OPTS"-Xdebug -Xrunjdwp:transportdt_socket,address*:8000,servery,suspendn"2.修改环境变量的方式 apache-tomcat-9.0.86/bin/setenv.sh export JAVA_HOME/opt/jdk1.8.0_171 export CATALINA_HOME/opt/apache-tomcat-9.0.86 export JAVA_OP…

【工具变量】中国制造2025试点城市数据集(2000-2023年)

数据简介&#xff1a;《中国制造2025》是中国ZF于2015年5月8日印发的一项战略规划&#xff0c;旨在加快制造业的转型升级&#xff0c;提升制造业的质量和效益&#xff0c;实现从制造大国向制造强国的转变。该规划是中国实施制造强国战略的第一个十年行动纲领&#xff0c;明确提…

VScode的C/C++点击转到定义,不是跳转定义而是跳转声明怎么办?(内附详细做法)

以最简单的以原子的跑马灯为例&#xff1a; 1、点击CtrlShiftP&#xff0c;输入setting&#xff0c;然后回车 2、输入Browse 3、点击下面C_Cpp > Default > Browse:Path里面添加你的工程路径 然后就可以愉快地跳转定义啦~ 希望对你有帮助&#xff0c;如果还不可以的话&a…

java常用框架介绍

1. Spring Boot 特点&#xff1a;Spring Boot是Spring家族中的一个新成员&#xff0c;它基于Spring 4.0设计&#xff0c;提供了默认配置、简化依赖管理以及内嵌式容器等特性&#xff0c;使得开发者能够快速创建独立的、生产级别的Spring应用。 用途&#xff1a;Spring Boot特别…

Leetcode328奇偶链表,Leetcode21合并两个有序链表,Leetcode206反转链表 三者综合题

题目描述 思路分析 这题的思路就和我们的标题所述一样&#xff0c;可以看作是这3个题的合并&#xff0c;但是稍微还有一点点区别 比如&#xff1a;奇偶链表这道题主要是偶数链在了奇数后面&#xff0c;字节这个的话是奇偶链表分离了 所以字节这题的大概思路就是&#xff1a; …