消费者api编写教程

1.基本属性配置

输入new Properties().var 回车

//创建属性

        Properties properties = new Properties();

       //连接集群

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");

        //反序列化

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

        //指定消费者组id

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称

//创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics

        //创建一个数组列表变量接收topics值
        ArrayList<String> topics = new ArrayList<>();
        //指定要订阅的主题
        topics.add("customers");
        //订阅主题
        kafkaConsumer.subscribe(topics);

3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

4.消费数据

//消费数据
        while (true){
            //if (flag  == true) flag 标志位置
            //break;
            //}生产中退出循环的位置;
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //将消费的信息输出到控制台
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                System.out.println(consumerRecord);
            }
        }

5.运行MyConsumer,通过生产者api发送消息

输出台上可以看到输出的都是订阅的主题/分区的信息

6.完整代码

package com.ljr.kafka.replay;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
    //创建属性
        Properties properties = new Properties();
       //连接集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");
        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        //指定消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

    //创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

    /*//订阅主题
        //创建一个数组列表变量接收topics值
        ArrayList<String> topics = new ArrayList<>();
        //指定要订阅的主题
        topics.add("customers");
        //订阅主题
        kafkaConsumer.subscribe(topics);*/

    //订阅分区
        //创建一个数组列表变量接收主题分区值
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        //指定要订阅的分区
        topicPartitions.add(new TopicPartition("customers",2));
        //订阅分区
        kafkaConsumer.assign(topicPartitions);

    //消费数据
        while (true){
            //if (flag  == true) flag 标志位置
            //break;
            //}生产中退出循环的位置;
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            //将消费的信息输出到控制台
            for (ConsumerRecord<String,String> consumerRecord : consumerRecords){
                System.out.println(consumerRecord);
            }
        }


    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/678699.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

springboot配置

springboot配置 配置文件分类 springboot提供了多种属性配置方式 application.xml(只有老的spring项目使用)application.propertiesapplication.yml(或yaml) 优先级:properties>yml>yaml 配置文件 pom.xml★ 在Spring Boot项目中&#xff0c;pom.xml文件是Maven项目…

随便用css换个渐变的太阳

来源于GPT4o&#xff1a;代码来源 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>渐变色上半圆…

javaScript垃圾回收机制(垃圾数据是如何自动回收的)

一.javaScript数据类型存储 若想了解javaScript的垃圾数据是如何自动回收的&#xff0c;首先我们应该先简单知道一下关于javascript的数据垃圾数据是如何存储的&#xff08;JavaScript的内存机制&#xff09;。 1.JavaScript的数据类型 JavaScript是一种弱类型的、动态的语言 …

基于t-SNE的泰坦尼克号数据集降维

目录 1. 作者介绍2. 算法介绍2.1 t-SNE介绍2.2.SNE基本原理2.3.拥挤问题2.4.t-SNE基本原理2.5.t-SNE算法过程 3. 泰坦尼克号数据集降维实验3.1.数据集介绍3.2 任务介绍3.3 代码实现3.4 实验结果 参考连接 1. 作者介绍 刘方星&#xff0c;男&#xff0c;西安工程大学电子信息学…

数仓建模—指标拆解和选取

数仓建模—指标拆解和选取 第一节指标体系初识介绍了什么是指标体系 第二节指标体系分类分级和评价管理介绍了指标体系管理相关的,也就是指标体系的分级分类 这一节我们看一下指标体系的拆解和指标选取,这里我们先说指标选取,其实在整个企业的数字化建设过程中我们其实最…

一条sql的执行流程

文章地址 https://blog.csdn.net/qq_43618881/article/details/118657040 连接器 请求先走到连接器&#xff0c;与客户端建立连接、获取权限、维持和管理连接 mysql缓存池 如果要查找的数据直接在mysql缓存池里面就直接返回数据 分析器 请求已经建立了连接&#xff0c;现在…

Spark Streaming 概述及入门案例

一、介绍 1. 不同的数据处理 从数据处理的方式&#xff1a; 流式数据处理(Streaming)批量数据处理(Batch) 从数据处理的延迟&#xff1a; 实时数据处理(毫秒级别)离线数据处理(小时或天级别) 2. 简介 SparkStreaming 是一个准实时(秒或分钟级别)、微批量的数据处理框架Spa…

代码随想录算法训练营第28天(py)| 回溯 | 93.复原IP地址、78.子集、90.子集II

93.复原IP地址 力扣链接 给定一个只包含数字的字符串&#xff0c;复原它并返回所有可能的 IP 地址格式。 有效的IP地址不能含有前导0&#xff0c;共有4个字段&#xff0c;且每个字段不能超过255 思路 class Solution:def restoreIpAddresses(self, s: str) -> List[str]:r…

异或炸弹(easy)(牛客小白月赛95)

题目链接: D-异或炸弹&#xff08;easy&#xff09;_牛客小白月赛95 (nowcoder.com) 题目&#xff1a; 题目分析&#xff1a; 一看 还以为是二维差分的题呢 到后来才发现是一维差分问题 这里的距离是 曼哈顿距离 dis abs(x - xi) abs(y - yi) 暴力的做法 就是枚举 n * n 个…

NFC隐藏功能大公开:乘车刷门禁,NFC实用无风险

手机到底集成了多少功能&#xff1f;如今恐怕已经没有人能数的清了。但是你又用到了多少呢&#xff1f;有些功能是不是就从来没打开过呢&#xff1f;花了全款却只用功能的百分之一&#xff0c;是不是感觉很不划算呢&#xff1f; 费流量、占内存、费电通常是用户浪费手机功能的…

SG7050CCN CMOS输出石英晶体振荡器适用于防盗防灾装置

爱普生晶振SG7050CCN是一款额定频率范围2.5MHz至50MHz的石英晶体振荡器(SPXO)&#xff0c;支持CMOS输出&#xff0c;具有小尺寸7.0x5.0mm四脚贴片晶振&#xff0c;体积小巧,高稳定性&#xff0c;其中爱普生的一款额定频率16.000MHz,/-50ppm晶振&#xff0c;7050封装常规有源晶振…

大量单号中如何分析出异常单号

什么情况下单号算异常单号呢&#xff0c;首先根据单号物流信息过程轨迹判断哦&#xff0c;比如某个单号已显示快递公司已揽收了&#xff0c;超过24或36、48甚至更长时间也没有看到走件信息哦&#xff0c;一般这类单号也叫揽收后无走信息&#xff0c;这类单号就只能一条揽收信息…

【智能体】文心智能体大赛第二季持续进行中,一起在智能体的海洋里发挥你的创意吧

目录 背景作文小助手AI迅哥问答程序员黄历助手比赛时间第二期赛题丰厚奖品评选说明获奖智能体推荐文章 背景 AI应用&#xff08;智能体&#xff09;&#xff0c;持续火热&#xff0c;一句话就能创建一个有趣、好玩的应用。 可以说一分钟内就能创建一个有创意的智能体。 看大多…

【第十课】空间数据基础与处理——空间范围处理

一、前言 在利用Arcgis分析中通常会研究我国局部地区的发展&#xff0c;如长江中游城市群、 某个省、长江经济带等&#xff0c;在对这类区域进行可视化时&#xff0c;经常会需要一幅局部地图&#xff0c;通常这种局部地图是很难直接获取的&#xff0c;需要通过一定的方法进行处…

2024年云计算、信号处理与网络技术国际学术会议(ICCCSPNT 2024)

2024年云计算、信号处理与网络技术国际学术会议&#xff08;ICCCSPNT 2024&#xff09; 2024 International Academic Conference on Cloud Computing, Signal Processing, and Network Technology&#xff08;ICCCSPNT 2024&#xff09; 会议简介&#xff1a; 2024年云计算、…

cesium Material的理解与使用

1.简介 材质Material可以是比较简单的&#xff0c;比如直接将一张图片赋予表面&#xff0c;或者使用条纹状、棋盘状的图案&#xff1b;也可以使用Fabric和GLSL&#xff0c;重新创建一个新的材质或者组合现有的材质。例如&#xff0c;我们可以通过程序生成的纹理(procedural bri…

【leetcode--盛水最多的容器】

给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 写出来了一半&#xff0c;想到用双指针&am…

高精度滚珠丝杆在自动化生产中的关键因素!

如今&#xff0c;自动化技术正以前所未有的速度改变着人们的生活和工作方式&#xff0c;特别是在高精度精密设备的制造与应用领域&#xff0c;提高生产效率和优化生产流程正变得越来越重要。在自动化生产中&#xff0c;滚珠丝杆的优化应用对于提高生产效率、保证产品质量至关重…

【OCPP】ocpp1.6协议第4.3章节DataTransfer的介绍及翻译

目录 4.3、DataTransfer-概述 DataTransfer 消息 数据传输请求&#xff08;DataTransfer Request&#xff09; 数据传输响应&#xff08;DataTransfer Response&#xff09; 使用场景 示例 DataTransfer 请求示例 处理 DataTransfer 响应 示例代码 可能的错误处理 总…

Java1.8语言+ springboot +mysql + Thymeleaf 全套家政上门服务平台app小程序源码

Java1.8语言 springboot mysql Thymeleaf 全套家政上门服务平台app小程序源码 家政系统是一套可以提供上门家政、上门维修、上门洗车、上门搬家等服务为一体的家政平台解决方案。它能够与微信对接、拥有用户端小程序&#xff0c;并提供师傅端app&#xff0c;可以帮助创业者在…