kafka入门(一):kafka消息消费

安装kafka,创建 topic:

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.10.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

kafka代码示例(一):

主要按照以下步骤:

  • 设置 broker服务器的ip和端口, 设置 消费者群组id

  • 初始化消费者

  • 消费者订阅主题

  • 消费者批量拉取消息

public class KafkaDemo1 {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";
    public static final String GROUP_ID = "group.demo";

    public static void main(String[] args) {
        consumerRecord();
    }

    public static void consumerRecord() {
        //属性配置
        Properties properties = getProperties(BROKER_LIST, GROUP_ID);
        //消费者初始化
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //消息者订阅主题
        consumer.subscribe(Collections.singletonList(TOPIC));
        //循环
        while (true) {
            //每次拉取 1千条消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("=============> 消费kafka消息:"+ record.value());
            }
        }
    }

    public static Properties getProperties(String brokerList, String groupId) {
        Properties properties = new Properties();
        //序列化
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        //broker服务器的ip和端口,多个用逗号隔开
        properties.put("bootstrap.servers", brokerList);
        //消费者群组id
        properties.put("group.id", groupId);
        return properties;
    }

}


使用文章开头安装好的 kafka,并按文章中的步骤,创建 topic ,打开一个 生产者 producer,并发送消息。
在这里插入图片描述

观察idea 控制台,可以看到 成功消费了消息:

=============> 消费kafka消息:hello kafka

参考资料:

《深入理解kafka 核心设计与实践原理》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/162500.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

联想笔记本电脑触摸板失灵了怎么办

这里写自定义目录标题 thinkbook笔记本电脑触摸板失灵 thinkbook笔记本电脑触摸板失灵 由于重装系统&#xff0c;导致笔记本的触控板失灵&#xff0c; 网上说的办法有 1、按键盘上的ctrlf6键&#xff0c;打开触控板功能&#xff1a;无效 2、设置——>设备——>触控板&am…

SQL注入1

对sql进行一个小结 还有其他的注入 其他注入:堆叠注入&#xff0c;宽字节注入&#xff0c;二次注入 首先是数值和字符 id1 and 11和id1 and 12 如果这两个语句返回的页面不一样就说明是数字型 id1 and 11#和id1 and 12# 如果这两个语句返回的页面不一样就说明是字符型 常…

【Qt开发流程】之HelloWorld程序

【Qt开发流程】之HelloWorld程序 目的编写程序新建项目文件说明及界面设计 程序运行及发布程序运行程序发布手动构建使用windeployqt进行构建 设置应用程序图标修改快捷键类型列表命令行编译程序命令行编译.ui文件自定义类项目模式及项目文件介绍项目模式项目文件 目的 从Hell…

【Java 进阶篇】深入理解 JQuery 事件绑定:标准方式

在前端开发中&#xff0c;处理用户与页面的交互是至关重要的一部分。JQuery作为一个广泛应用的JavaScript库&#xff0c;为我们提供了简便而强大的事件绑定机制&#xff0c;使得我们能够更加灵活地响应用户的行为。本篇博客将深入解析 JQuery 的标准事件绑定方式&#xff0c;为…

vue2【axios请求】

1&#xff1a;axios作用 axios&#xff08;发音&#xff1a;艾克c奥斯&#xff09;是前端圈最火的&#xff0c;专注于数据请求的库。 Axios 是一个基于 promise 的 HTTP 库&#xff0c;可以用在浏览器和 node.js 中axios的github:https://github.com/axios/axios 中文官网地址…

【C++】类和对象(7)--友元, static成员

目录 一 友元 1 友元概念 2 友元函数 3 友元类 二 static成员 1 概念 2 用法 3 static成员特性 4 例题 一 友元 1 友元概念 友元提供了一种突破封装的方式&#xff0c;有时提供了便利。但是友元会增加耦合度&#xff0c;破坏了封装&#xff0c;所以 友元不宜多用。 …

QGIS003:【05高级数字化工具栏】-要素移动、修改、合并操作

摘要&#xff1a;QGIS地图导航工具栏包括激活高级数字化工具、移动要素、旋转要素、缩放要素、简化要素、添加环、添加部件、填充环、删除环、删除部件、重塑要素、偏移曲线、反转线、裁剪/扩展要素、分割要素、分割部件、合并所选要素、合并所选要素的属性、旋转点符号等选项&…

Git配置代理:fatal: unable to access*** github Failure when receiving data from

~吐槽一下 github自从被微软收购以后&#xff0c;大多数情况没点科技上网都进不去了&#xff0c;还是怀念以前随时访问的时光。 我一直都是开着系统代理的&#xff0c;但是今天拉一个项目发现拉不下来了&#xff0c;报错&#xff1a; fatal: unable to access https://githu…

try-with-resources(TWR)方式关闭流资源

使用传统的方式关闭流资源当然没问题&#xff0c;但是需要写的代码有点多&#xff0c;而try-with-resources是Java 7 之后的新语法糖&#xff0c;旨在减轻开发人员释放try块中使用的资源的压力。一时用TWR方式一时爽&#xff0c;一直用一直爽&#xff0c;但是有些情况也是不能使…

HTTP 到 HTTPS 再到 HSTS 的转变

近些年&#xff0c;随着域名劫持、信息泄漏等网络安全事件的频繁发生&#xff0c;网站安全也变得越来越重要&#xff0c;也促成了网络传输协议从 HTTP 到 HTTPS 再到 HSTS 的转变。 HTTP HTTP&#xff08;超文本传输协议&#xff09; 是一种用于分布式、协作式和超媒体信息系…

如何零基础自学AI人工智能

随着人工智能&#xff08;AI&#xff09;的快速发展&#xff0c;越来越多的有志之士被其强大的潜力所吸引&#xff0c;希望投身其中。然而&#xff0c;对于许多零基础的人来说&#xff0c;如何入门AI成了一个难题。本文将为你提供一份详尽的自学AI人工智能的攻略&#xff0c;帮…

Git企业开发级讲解(五)

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、bug 分⽀二、删除临时分支三、小结 一、bug 分⽀ 假如我们现在正在 dev2 分⽀上进⾏开发…

【数据结构与算法】线性表 - 顺序表

目录 1. 线性表2.顺序表3.顺序表的优缺点4.实现&#xff08;C语言&#xff09;4.1 头文件 seqList.h4.2 实现 seqList.c 1. 线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构&#xff0c;常见…

gRPC 的原理 介绍带你从头了解gRPC

gRPC 的原理 什么是gRPC gRPC的官方介绍是&#xff1a;gRPC是一个现代的、高性能、开源的和语言无关的通用 RPC 框架&#xff0c;基于 HTTP2 协议设计&#xff0c;序列化使用PB(Protocol Buffer)&#xff0c;PB 是一种语言无关的高性能序列化框架&#xff0c;基于 HTTP2PB 保…

Qt布局技巧

可以先把控件放置了&#xff0c;再选中所有控件右键布局 或者是点击上面的&#xff1a;

cesium雷达效果(脉冲圆)

cesium雷达效果(脉冲圆) 下面富有源码 实现思路 使用ellipse方法加载圆型,修改ellipse中‘material’方法重写glsl来实现当前效果 示例代码 index.html <!DOCTYPE html> <html lang="en"><head>

CronExpression

CronTrigger配置格式: 格式: [秒] [分] [小时] [日] [月] [周] [年]序号 说明 是否必填 允许填写的值 允许的通配符 1 秒 是 0-59 , - * / 2 分 是 0-59 , - * / 3 小时 是 0-23 , - * / 4 日 是 1-31 , - * ? / L W 5 月 是 1-12 or JA…

Matlab群体智能优化算法之海象优化算法(WO)

文章目录 一、灵感来源二、算法的初始化三、GTO的数学模型Phase1&#xff1a;危险信号和安全信号Phase2&#xff1a;迁移&#xff08;探索&#xff09;Phase3&#xff1a;繁殖&#xff08;开发&#xff09; 四、流程图五、伪代码六、算法复杂度七、WO搜索示意图八、实验分析和结…

jbase打印导出实现

上一篇实现了虚拟M层&#xff0c;这篇基于虚拟M实现打印导出。 首先对接打印层 using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; using System.Xml;namesp…

探索NLP中的核心架构:编码器与解码器的区别

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…