kafka用java收发消息

用java客户端代码来对kafka收发消息
具体代码如下

package com.cool.interesting.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaTest {

    private static final String BOOTSTRAP_SERVERS = "192.168.47.145:9092";
    private static final String TOPIC_NAME = "test";

    public static void main(String[] args) {
        // 生产者示例
        produceMessage();

        // 消费者示例
        consumeMessage();

        //从指定偏移量消费消息
        consumeOffsetMessage();
    }
    //生产者代码
    private static void produceMessage() {
    	Properties props = new Properties();
    	//acks是保证消息的发送机制,有以下几个值
    	//acks = 0:表示生产端发送消息后立即返回,不等待broker端的响应结果。通常此时生产端吞吐量最高,消息发送的可靠性最低。
    	//acks = 1: 表示leader副本成功写入就会响应Producer,而无需等待ISR(同步副本)集合中的其他副本写入成功。这种方案提供了适当的持久性,保证了一定的吞吐量。默认值即是1。
    	//acks = all或-1: 表示不仅要等leader副本成功写入,还要求ISR中的其他副本成功写入,才会响应Producer。这种方案提供了最高的持久性,但也提供了最差的吞吐量。
    	//调优建议:建议根据实际情况设置,如果要严格保证消息不丢失,请设置为all或-1;如果允许存在丢失,建议设置为1;一般不建议设为0,除非无所谓消息丢不丢失。props.put(ProducerConfig.ACKS_CONFIG,1);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //key和value序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //其他配置参数详见org.apache.kafka.clients.producer.ProducerConfig类

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                //异步发送
                Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC_NAME,  message));
                System.out.println("Sent message: " + message);
            }
        }
    }
    //正常消费者代码
     private static void consumeMessage() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test99");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅一个topic
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while (true) {
            //设置kafak从broker拉取消息的超时时间
            // (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received_message: " + record.value());
            }
        }

    }
    //指定偏移量开始消费
    private static void consumeOffsetMessage() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        //将订阅的topic绑定到一个消费者(这个group_id 是自己定义的)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //订阅一个topic
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        //如果要指定偏移量,必须先poll一次,不然代码报错
        ConsumerRecords<String, String> poll = consumer.poll(0);
        System.out.println("poll:"+poll.isEmpty());
        //创建一个分区(参数为topic_name,和分区序号)
        TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, 0);
        // 指定要消费的偏移量
        long offset = 3;
        //从指定偏移量开始消息消息
        consumer.seek(topicPartition, offset);
        while (true) {
            //设置kafka从broker拉取消息的超时时间
            // (这意味着 poll() 方法将在等待最多 2秒的时间内尝试从 Kafka 集群拉取消息,如果在超时时间内没有拉取到消息,将返回一个空的 ConsumerRecords 对象)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received_message: " + record.value());
            }
        }
    }
 }

安装kafka的可视化工具:offset explorer
offset explorer 是一个用于查看和管理 Kafka 消费者组的工具,它允许你检查消费者组的偏移量(offset),并且可以查看每个消费者组在每个分区上的偏移量情况。这对于监控和调试 Kafka 消费者组非常有用。
下载地址为:https://www.kafkatool.com/download.html
如下图所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/630628.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MP3解码入门(基于libhelix)

主要参考资料: 【Arduino Linux】基于 Helix 解码库实现 MP3 音频播放: https://blog.csdn.net/weixin_42258222/article/details/122640413 libhelix-mp3: https://github.com/ultraembedded/libhelix-mp3/tree/master 目录 一、MP3文件二、MP3 解码库三、libhelix-mp3库3.1 …

Java 自然排序和比较器排序区别?Comparable接口和Comparator比较器区别?

注&#xff1a;如果你对排序不理解&#xff0c;请您耐心看完&#xff0c;你一定会明白的。文章通俗易懂。建议用idea运行一下案例。 1&#xff09;自然排序和比较器排序的区别&#xff1f; 自然排序是对象本身定义的排序规则&#xff0c;由对象实现 Comparable 接口&#xff…

思科模拟器--2.静态路由和默认路由配置24.5.15

首先&#xff0c;创建三个路由器和两个个人电脑。 接着&#xff0c;配置两台电脑的IP&#xff0c;子网掩码和默认网关 对Router 0&#xff0c;进行以下命令&#xff1a; 对Router进行以下命令&#xff1a; 对Router2进行以下命令&#xff1a; 本实验完成。 验证&#xff1a;PC…

Leetcode39.组合总和

文章目录 题目描述解题思路重复子集剪枝 代码 题目 参考题解 题目描述 给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target &#xff0c;找出 candidates 中可以使数字和为目标数 target 的 所有 不同组合 &#xff0c;并以列表形式返回。你可以按 任意顺序 返…

Android Studio kotlin 转 Java

一. 随笔记录 java代码可以转化成kotlin代码&#xff0c;当然 Kotlin 反过来也可以转java 在Android Studio中 可以很方便的操作 AS 环境&#xff1a;Android Studio Iguana | 2023.2.1 二. 操作步骤 1.步骤 顶部Tools ----->Kotlin ------>Show Kotlin Bytecode 步…

错误: 找不到或无法加载主类问题(已解决)

今天在虚拟机中安装了idea2023.2的版本&#xff0c;运行代码时发现错误找不到主类&#xff01; 直接说结论&#xff1a; 我先clean了一下target&#xff0c;然后重新build&#xff0c;发现maven报错了&#xff0c;idea2023.2默认使用了内置的maven&#xff0c;然后我切换了一下…

ThreadLocal,一次到位

一、定义 ThreadLocal是线程私有变量&#xff0c;用于保存每个线程的私有数据。 那么什么情况下需要进行线程隔离 二、源码分析 public class ThreadLocalTest01 {ThreadLocal<Integer> t new ThreadLocal<>();public void test() {t.set(1);Integer integer…

MT3036 第一节离数课后

思路&#xff1a; 这道题与之前的表达式求值题目不同的是&#xff0c;有not这个单目运算符。而且如果表达式错误&#xff0c;要输入error。 把true和false成为操作数&#xff0c;把and or not成为运算符。 考虑error的情况&#xff1a; 1.and 和 or是双目运算符&#xff0c…

文心一言指令多样化,你知道的有哪些?

文心一言的指令非常多样化&#xff0c;可以根据用户的需求和场景进行灵活调整。以下是一些常见的文心一言指令示例&#xff1a; 知识问答&#xff1a; 帮我解释一下什么是芯片&#xff1f;中国的历史上有哪些重要的朝代&#xff1f;人工智能在未来会有哪些发展趋势&#xff1f;…

表白成功率百分百的向女朋友表白网页源代码,向女友表白HTML源代码

表白成功率百分百的向女朋友表白网页源代码&#xff0c;向女友表白HTML源代码 效果&#xff1a; 完整代码下载地址&#xff1a;向女友表白HTML源代码 <!DOCTYPE html> <!--STATUS OK--> <html><head><meta http-equiv"Content-Type" c…

P8805 [蓝桥杯 2022 国 B] 机房

P8805 [蓝桥杯 2022 国 B] 机房 分析 是一道lca题目&#xff0c;可以直接套模板 前缀和处理点权 具体思路&#xff1a; 1.n台电脑用n-1条网线相连&#xff0c;任意两个节点之间有且仅有一条路径&#xff08;拆分成各自到公共祖先节点的路径——lca&#xff09;&#xff1b;…

CAD插入文字到另一图形样式变相同

CAD从一张图形复制到另外一张图形后&#xff0c;文字样式变成一样是因为两张图所用的文字样式名称一样&#xff0c;但是样式里面的使用字体样式不一样。如下图所示&#xff0c;找到工具栏中的注释 &#xff0c;点击文字样式。里面就会显示当前图形中使用的样式名称及其对应的字…

TINA 使用教程

常用功能 分析-电气规则检查&#xff1a;短路&#xff0c;断路等分析- 直流分析 交流分析 瞬态分析 视图-分离曲线 由于输出的容性负载导致的振荡 增加5欧电阻后OK 横扫参数 添加横扫曲线的电阻&#xff0c;选择R3&#xff1a;8K-20K PWL和WAV文件的支持 示例一&#xff1a;…

ubuntu在conda环境中使用 pip install -r requirements.txt但是没有安装在虚拟环境中

whereis pip pip listubuntu在conda环境中使用pip install lpips0.1.3 但是安装在了这里 Requirement already satisfied: lpips0.1.3 in /home/uriky/anaconda3/lib/python3.11/site-packages (0.1.3) 就会出现黄色波浪&#xff0c;未在虚拟环境中安装包 解决办法1&#xff1…

[NOIP2011 普及组] 瑞士轮

[NOIP2011 普及组] 瑞士轮 题目背景 在双人对决的竞技性比赛&#xff0c;如乒乓球、羽毛球、国际象棋中&#xff0c;最常见的赛制是淘汰赛和循环赛。前者的特点是比赛场数少&#xff0c;每场都紧张刺激&#xff0c;但偶然性较高。后者的特点是较为公平&#xff0c;偶然性较低…

如何使用JMeter测试导入接口/导出接口?

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号&#xff1a;互联网杂货铺&#xff0c;回复1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 今天上班&#xff0c;被开发问了一个问题&#xff1a;JM…

html基础(全)

html简介 目录 什么是网页 什么是 HTML 常用浏览器 WebE标准的构成 基本语法概述 第一个HTML页面 文档类型声明标签 lang 语言种类 字符集 标题标签 段落和换行标签 文本格式化标签 div和span标签 图像标签和路径 超链接标签 表格的主要作用 表头单元格标签 列…

[华为OD] C卷 dfs 特殊加密算法 100

题目&#xff1a; 有一种特殊的加密算法&#xff0c;明文为一段数字串&#xff0c;经过密码本查找转换&#xff0c;生成另一段密文数字串。 规则如下 1•明文为一段数字串由0-9组成 2.密码本为数字0-9组成的二维数组 3•需要按明文串的数字顺序在密码本里找到同样的数字串…

基于SpringBoot设计模式之创建型设计模式·工厂方法模式

文章目录 介绍开始架构图样例一定义工厂定义具体工厂&#xff08;上衣、下装&#xff09;定义产品定义具体生产产品&#xff08;上衣、下装&#xff09; 测试样例 总结优点缺点与抽象工厂不同点 介绍 在 Factory Method模式中&#xff0c;父类决定实例的生成方式&#xff0c;但…

用红黑树封装出map与set

目录 一、红黑树的改造 节点结构的定义 迭代器类的实现 红黑树中提供迭代器 红黑树的主要代码 二、set的实现 三、map的实现 四、测试代码 map与set的底层都是红黑树&#xff0c;所以本篇文章就分享如何用同一颗红黑树封装出map与set 所以大家可以先去看一下我的讲解红…