Kafka基础入门-代码实操

   Kafka是基于发布/订阅模式的消息队列,消息的生产和消费都需要指定主题,因此,我们想要实现消息的传递,第一步必选是创建一个主题(Topic)。下面我们看下在命令行和代码中都是如何创建主题和实现消息的传递的。

使用命令行操作Kafka

使用命令行操作主题

  • 使用kafka-topics.sh脚本来实现对Topic的操作
sh kafka-topics.sh

   执行命令之后,我们可以找到到下面这行提示,REQUIRED代表必须的,就是说我们想要实现对Kafak的操作必须要带有这个参数,表示我们要连接的Kafka具体服务。

--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect  
  connect to>                              to.   

   接下来,就让我们创建一个主题吧。

# --bootstrap-server  用于指定我们连接的Kafka服务地址,9092是默认端口号  
# --topic  指定要操作的Topic名称  
# --create 表示本次是要创建一个主题  
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test  --create
# 执行结果
Created topic test.

   查看下我们的主题是否创建成功

sh kafka-topics.sh --bootstrap-server localhost:9092 --list    
# 执行结果
test

   查看某一个主题的详细信息

sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test  --describe  
# 执行结果
Topic: test	TopicId: ehyjS3R3Saq8Cx2V1x0p7g	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: test	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

使用命令行消费数据

  • 我们通过kafka-console-consumer.sh来生产消息。
 sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
 #输出
hello kafka

使用命令行生产数据

  • 我们通过kafka-console-consumer.sh来生产消息。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test  
# 输入
>hello kafka

   想了解如何启动Kafka,可以看这篇文章《Kafka基础入门》。

使用代码操作Kafka

   添加依赖包

 <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.7.1</version>
        </dependency>
    </dependencies>

生产者代码

        // 创建配置对象
        Map<String,Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");
        // 对生产的数据的K,V 进行序列化的操作
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSelection.class.getName());

        // 创建生产者对象
        //      生产者需要设定泛型:数据类型的约束
        KafkaProducer<String,String> producer = new KafkaProducer<String,String>(configMap);
        // 创建数据
        //      构建数据时,需要传递三个参数
        //          第一个参数表示主题名称,主题不存在时会自动创建
        //          第二个参数表示数据的Key
        //          第二个参数表示数据的Value
        ProducerRecord<String,String> record = new ProducerRecord<String,String>("test","key","value");
        // 通过生产者对象,将数据发送到Kafka
        producer.send(record);
        //关闭生产者对象
        producer.close();

消费者代码

        // 创建配置对象
        Map<String,Object> configMap = new HashMap<>();
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");
        // 对生产的数据的K,V 进行反序列化的操作
        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG,"com.kafka.test");
        // 创建消费者对象
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(configMap);
        //订阅主题
        kafkaConsumer.subscribe(Collections.singleton("test"));
        // 从Kafka中获取数据
        //      消费者从Kafka中拉取数据
        ConsumerRecords<String, String> datas = kafkaConsumer.poll(1000);
        datas.forEach(data ->{
            System.out.println(data);
        });
        // 关闭消费者对象
        kafkaConsumer.close();

在这里插入图片描述
点击下方名片,关注『编程青衫客』
随时随地获取最新好文章!

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/798360.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySql性能调优04-[MySql事务与锁机制原理]

MySql事务与锁机制原理 从undo与redo日志&#xff0c;理解事务底层ACID底层原理事务四大隔离级别事务底层锁机制和MVCC并发优化机制串行化底层实现机制读已提交和可重复读底层实现MVCC机制详解脏写问题(重要)读已提交&#xff1f;实现机制 BufferPool缓存与redo日志是如何提升事…

【AI大模型】李彦宏从“卷模型”到“卷应用”的深度解析:卷用户场景卷能给用户解决什么问题

文章目录 一、理解李彦宏的发言1.1 李彦宏的核心观点1.2 背景分析 二、技术发展&#xff1a;从辨别式到生成式2.1 辨别式AI技术2.2 生成式AI技术2.3 技术发展的挑战 三、“卷应用”&#xff1a;聚焦实际应用与价值3.1 应用为王3.2 技术落地的关键 四、“卷场景”&#xff1a;多…

007-端口隔离

端口隔离配置 端口隔离简介 为了实现报文之间的二层隔离&#xff0c;可以将不同的端口加入不同的VLAN&#xff0c;但会浪费有限的VLAN资源。采用端口隔离特性&#xff0c;可以实现同一VLAN内端口之间的隔离。 设备支持以下方式进行端口隔离&#xff1a; 基于隔离组的端口隔…

idea中打开静态网页端口是63342而不是8080

问题&#xff1a; 安装了tomcat 并且也配置了环境&#xff0c;但是在tomcat下运行&#xff0c;总是在63342下面显示。这也就意味着&#xff0c;并没有运行到tomcat环境下。 找了好几个教程&#xff08;中间还去学习了maven&#xff0c;因为跟的教程里面&#xff0c;没有maven,但…

LabVIEW心电信号自动测试系统

开发了一种基于LabVIEW的心电信号自动测试系统&#xff0c;通过LabVIEW开发的上位机软件&#xff0c;实现对心电信号的实时采集、分析和自动化测试。系统包括心电信号采集模块、信号处理模块和自动化测试模块&#xff0c;能够高效、准确地完成心电信号的测量与分析。 硬件系统…

EmlogPro资源博客主题assets源码_仿ripro日主题

EmlogPro资源博客主题assets源码&#xff0c;有点像ripro日主题&#xff0c;做资源网非常合适&#xff0c;首页支持幻灯片&#xff0c;分类展示&#xff0c;本月热门资源。 源码下载&#xff1a;assets.zip 需要按照安装说明使用&#xff01; 我看了一下很多地方还有点问题&…

Open3d入门 一文读懂三维点云

三维点云技术的发展始于20世纪60年代&#xff0c;随着激光雷达和三维扫描技术的进步&#xff0c;在建筑、考古、地理信息系统和制造等领域得到了广泛应用。20世纪90年代&#xff0c;随着计算机处理能力的提升&#xff0c;点云数据的采集和处理变得更加高效&#xff0c;推动了自…

[数仓]十二、离线数仓(Atlas元数据管理)

第1章 Atlas入门 1.1 Atlas概述 Apache Atlas为组织提供开放式元数据管理和治理功能,用以构建其数据资产目录,对这些资产进行分类和管理,并为数据分析师和数据治理团队,提供围绕这些数据资产的协作功能。 Atlas的具体功能如下: 元数据分类 支持对元数据进行分类管理,例…

深入探讨:CPU问题的深度分析与调优

引言 你是否曾经遇到过这样的情况:系统运行突然变慢,用户抱怨不断,检查后发现CPU使用率居高不下?这时候,你会如何解决?本文将详细解析CPU问题的分析与调优方法,帮助你在面对类似问题时游刃有余。 案例分析:一次CPU性能瓶颈的解决过程 某知名互联网公司在一次促销活动…

Jenkins中Node节点与构建任务

目录 节点在 Jenkins 中的主要作用 1. 分布式构建 分布式处理 负载均衡 2. 提供不同的运行环境 多平台支持 特殊环境需求 3. 提高资源利用率 动态资源管理 云端集成 4. 提供隔离和安全性 任务隔离 权限控制 5. 提高可扩展性 横向扩展 高可用性 Jenkins 主服务…

<数据集>绝缘子缺陷检测数据集<目标检测>

数据集格式&#xff1a;VOCYOLO格式 图片数量&#xff1a;2139张 标注数量(xml文件个数)&#xff1a;2139 标注数量(txt文件个数)&#xff1a;2139 标注类别数&#xff1a;8 标注类别名称&#xff1a;[insulator, broken disc, pollution-flashover, Two glass, Glassdirt…

Flexcel学习笔记

1.引用的单元 FlexCel.Core 始终需要使用的一个单元。 多系统运行时。{$IFDEF LINUX}SKIA.FlexCel.Core{$ELSE}{$IFDEF FIREMONKEY}FMX.FlexCel.Core{ $ELSE}VCL.FlexCel.Core{$ENDIF}{$ENDIF} FlexCel.XlsAdapter这是FlexCel的xls/x引擎。如果您正在处理xls或xlsx文件&#x…

软件模块的初始化

什么是初始化&#xff1f; 软件的初始化&#xff08;Initialization&#xff09;是指软件启动或重新配置时执行的一系列步骤和过程&#xff0c;旨在准备软件运行环境、加载必要的配置信息、检查系统依赖项、分配资源&#xff08;如内存、文件句柄等&#xff09;&#xff0c;以及…

小白学python(第七天)

哈哈&#xff0c;这个系列的文章也有一段时间没更新&#xff0c;主要是最近在忙c嘎嘎&#xff0c;不过没事接下来会优先更python啦&#xff0c;那么我们先进入正题吧 函数的定义及调用 函数定义 格式&#xff1a;def 函数名&#xff08;形参列表&#xff09;&#xff1a; 语…

ctfshow-web入门-php特性(web100-web103)is_numeric 函数绕过

目录 1、web100 2、web101 3、web102 4、web103 1、web100 提示&#xff1a;flag in class ctfshow&#xff0c;我们只需要构造输出 ctfshow 这个类即可。 代码分析&#xff1a; $v0is_numeric($v1) and is_numeric($v2) and is_numeric($v3); if($v0){ 虽然逻辑运算符的…

Web浏览器485通讯读取RFID卡号js JavaScript

本示例使用设备&#xff1a;485通讯液显带键盘RFID打菲计件读卡器工位机串口可二次开发编程-淘宝网 (taobao.com) <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> …

C++知识点:C和C++(自用)

C和C 1.类和结构体的关系&#xff1a;2.面向对象和面向过程3.头文件和标准命名空间4.cin和cout5. const在C中和C中的区别6.const全局作用域7 new和delete8 内联函数9 函数重载10. 函数重载的匹配 引用&#xff1a; [1]C语言中文网 1.类和结构体的关系&#xff1a; 类是结构体的…

vue3实现无缝滚动列表(大屏数据轮播场景)

实现思路 vue3目前可以通过第三方组件来实现这个需求。 下面介绍一下这个第三方滚动组件--vue3-scroll-seamless vue3-scroll-seamless 是一个用于 Vue 3 的插件&#xff0c;用于实现无缝滚动的组件。它可以让内容在水平或垂直方向上无缝滚动&#xff0c;适用于展示轮播图、新…

值得关注的数据资产入表

不错的讲解视频&#xff0c;来自&#xff1a;第122期-杜海博士-《数据资源入表及数据资产化》-大数据百家讲坛-厦门大学数据库实验室主办第122期-杜海博士-《数据资源入表及数据资产化》-大数据百家讲坛-厦门大学数据库实验室主办-20240708_哔哩哔哩_bilibili

深度学习和NLP中的注意力和记忆

深度学习和NLP中的注意力和记忆 文章目录 一、说明二、注意力解决了什么问题&#xff1f;#三、关注的代价#四、机器翻译之外的关注#五、注意力&#xff08;模糊&#xff09;记忆&#xff1f;# 一、说明 深度学习的最新趋势是注意力机制。在一次采访中&#xff0c;现任 OpenAI 研…