Nodejs 第七十九章(Kafka进阶)

在这里插入图片描述

kafka前置知识在上一章讲过了 不再复述

kafka进阶

1. server.properties配置文件

server.properties是Kafka服务器的配置文件,它用于配置Kafka服务的各个方面,包括网络设置、日志存储、消息保留策略、安全认证

#broker的全局唯一编号,不能重复
broker.id=0
#端口号
port=9092
#处理网络请求的线程数量
#接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3
#用来处理磁盘IO的线程数量
#消息从内存中写入磁盘是时候使用的线程数量。
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=./logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#每个topic的分区数
offsets.topic.replication.factor=1
#每个topic的副本数
transaction.state.log.replication.factor=1
#每个topic的最小副本数
transaction.state.log.min.isr=1
#日志保留时间,单位小时 168就是7天
log.retention.hours=168
#定期检查日志是否过期的间隔,单位毫秒
log.retention.check.interval.ms=300000
#日志清理器是否启用
log.cleaner.enable=true
#zookeeper地址
zookeeper.connect=localhost:2181
#zookeeper连接超时时间
zookeeper.connection.timeout.ms=18000
#zookeeper会话超时时间
group.initial.rebalance.delay.ms=0
2.producer.properties配置文件

producer.properties是Kafka生产者客户端的配置文件,用于配置Kafka生产者的行为和属性。当你使用Kafka生产者API发送消息到Kafka集群时,可以使用该配置文件哟

#配置生产者的broker列表 可以配置多个,以逗号隔开 也就是做集群的
#来获取每一个topic的分片数等元数据信息。
bootstrap.servers=localhost:9092
# 配置数据压缩方式 有none,gzip,snappy,lz4,zstd
compression.type=none
#客户端等待请求的响应的最长时间 超时时间
#request.timeout.ms=
#定期发送消息的时间间隔,一般配合batch.size使用,例如设置了50ms,那么每50ms就会发送一次消息合集
#linger.ms=
#每次发送给Kafka服务器请求消息的最大大小
#max.request.size=
#批量发送消息比如说设置了值16KB,那么消息内容凑够16KB就会被发送出去,否则就不会发送,这样可以避免单条消息太大导致的发送失败
#batch.size=
#约束producer缓存池的大小,默认是32MB,可以根据实际情况调整
#buffer.memory=
3.consumer.properties配置文件

用于配置Kafka消费者的属性。它包含了一系列用于定义消费者行为的参数和数值

#定义Kafka的Broker列表 可以配置多个,以逗号隔开 也就是做集群的
bootstrap.servers=localhost:9092
#定义消费者组的ID
group.id=test-consumer-group
#用于指定当消费者加入一个消费者组但没有可用的消费位移时的行为
#有三种选项 earliest/latest/none
#earliest:表示消费者将从最早的可用消费位移开始消费。消费者将从主题的最早消息开始消费,即使这些消息已经过期。
#latest:表示消费者将从最新的可用消费位移开始消费。消费者将从主题的最新消息开始消费,即跳过已经过期的消息。
#none:表示如果没有可用的消费位移,消费者将抛出异常。这样可以确保消费者只消费已经提交的消费位移。
#auto.offset.reset=
#心跳间隔用于保持消费者活跃状态
#session.timeout.ms
#指定消费者一次性获取最大的消息数量,如果为0表示不限制
#fetch.max.bytes=1048576
#指定消费者一次性获取的最大等待时间,如果为0表示不限制
#fetch.max.wait.ms=500

消息模式

kafka同样支持发布订阅的方式发送消息 我们来编写一下案例

官方文档 https://kafka.js.org/docs/getting-started

1. 压缩

引入CompressionTypes 选择压缩模式 GIZP LZ4 zSTD

import { Kafka,CompressionTypes } from 'kafkajs'

await producer.send({
    topic: 'xiaoman',
    compression: CompressionTypes.GZIP,
    messages: [
        {
            value: '测试数据1',
            headers: {
                'name': Buffer.from('小满')
            }
        },
        { value: Buffer.from('测试数据2') },
    ],
})
2. 标头

允许使用标头传递对象元数据,把需要传递的数据放在headers即可数据将一起被发送过去

await producer.send({
    topic: 'xiaoman',
    messages: [
        {
            value: '测试数据1',
            headers: {
                'name': Buffer.from('小满')
            }
        },
        { value: Buffer.from('测试数据2') },
    ],
})

消费者获取headers 元数据

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            topic,
            partition,
            value: message.value.toString(),
            headers: message.headers?.name?.toString(),
        })
    },
})
3. 多主题派发

send 发方法换成 sendBatch 增加 topicMessages 是个数组

await producer.sendBatch({
    topicMessages: [
        {
            topic: 'xiaoman',
            messages: [
                { value: Buffer.from('测试数据1') },
            ],
        },
        {
            topic: 'xiaoman2',
            messages: [
                { value: Buffer.from('测试数据2') },
            ],
        },
    ],
})

消费多个消息时候的时候可以根据业务自由选择模式

  1. 逐条处理

  2. 批量处理(批量处理可以减少网络开销)

await consumer.subscribe({ topic: 'xiaoman', fromBeginning: true })
await consumer.subscribe({ topic: 'xiaoman2', fromBeginning: true })
//逐条处理
await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            topic,
            partition,
            value: message.value.toString(),
            headers: message.headers?.name?.toString(),
        })
    },
})
//批量处理
await consumer.run({
    eachBatch: async ({ batch }) => {
        batch.messages.forEach( (message) => {
            console.log('Received message', message.value.toString())
        })
    },
})

案例演示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/730651.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu系统如何配置通过图形界面登录root用户

Ubuntu系统中的root账号默认是锁定的,但可以通过设置密码来启用。 需要注意的是,由于root用户具有对系统完全控制的权限,因此在使用root账户时应格外小心。一个错误的命令可能会导致系统损坏,这就是为什么Ubuntu默认不启用root账户…

[SAP ABAP] 变量与常量

1.变量 定义变量的基本方式 DATA <name> TYPE <type> [VALUE <val>]. <name>&#xff1a;指定变量的名称 <type>&#xff1a;指定变量的数据类型 <val>&#xff1a;指定<name>的初始值 示例1 定义变量lv_data1和lv_data3 输出结果…

qt 简单实验 画一个等边三角形

1.概要 2.代码 2.1 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPainter>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr)…

U盘文件夹损坏0字节:现象解析、恢复方法与预防措施

在日常工作和生活中&#xff0c;U盘因其便携性和大容量成为我们存储和传输数据的重要工具。然而&#xff0c;当U盘中的文件夹突然损坏并显示为0字节时&#xff0c;我们可能会感到困惑和焦虑。本文将对U盘文件夹损坏0字节的现象进行详细描述&#xff0c;分析其可能的原因&#x…

python基础篇(3):print()补偿知识点

1 print输出不换行 默认print语句输出内容会自动换行&#xff0c;如下&#xff1a; print("hello") print(" world") 结果&#xff1a; 在print语句中&#xff0c;加上 end’’ 即可输出不换行了 print("hello",end) print(" world&quo…

pywinauto入门指南:轻松掌握Windows GUI自动化

pywinauto库概述: pywinauto是一个Python库,主要用于自动化Windows应用程序的GUI测试和操作.它提供了一组简单而强大的API,可以模拟用户与Windows应用程序的交互,包括点击按钮、输入文本、选择菜单等操作. 安装 ##pywinauto可以通过pip进行安装,打开命令行运行: pip install…

逻辑回归(Logistic Regression)及其在机器学习中的应用

&#x1f680;时空传送门 &#x1f50d;逻辑回归原理&#x1f4d5;Sigmoid函数&#x1f388;逻辑回归模型 &#x1f4d5;损失函数与优化&#x1f388;损失函数&#x1f680;优化算法 &#x1f50d;逻辑回归的应用场景&#x1f340;使用逻辑回归预测客户流失使用scikit-learn库实…

计算机网络 VLAN间路由单臂路由

一、理论知识 VLAN是一种将物理网络划分成多个逻辑网络的方法。不同的VLAN属于不同的网段&#xff0c;因此互相通信需要通过路由器进行路由。通常情况下&#xff0c;在同一VLAN内的设备可以直接通信&#xff0c;而不同VLAN之间的设备则需要通过路由器转发数据。本实验利用单臂…

HTTP性能测试工具-wrk

wrk性能测试工具详解 wrk是一款轻量级但功能强大的HTTP基准测试工具&#xff0c;主要用于在单机多核CPU环境下对HTTP服务进行性能测试。它通过利用系统自带的高性能I/O机制&#xff08;如epoll、kqueue等&#xff09;&#xff0c;结合多线程和事件模式&#xff0c;能够产生大量…

FPGA开发Vivado安装教程

前言 非常遗憾的一件事情是&#xff0c;在选修课程时我避开了FPGA&#xff0c;选择了其他方向的课程。然而&#xff0c;令我没有想到的是&#xff0c;通信项目设计的题目竟然使用FPGA&#xff0c;这简直是背刺。在仅有的半个月时间里&#xff0c;准备这个项目确实是非常紧张的…

c++里对 new 、delete 运算符的重载

&#xff08;1&#xff09;c 里 我们可以用默认的 new 和 delete 来分配对象和回收对象。 new 可以先申请内存&#xff0c;再调用对象的构造函数&#xff1b; delete 则先调用对象的析构函数&#xff0c;再回收内存。当然&#xff0c;当我们为类定义了 operator new () 和 oper…

千年古城的味蕾传奇-平凉锅盔

在甘肃平凉这片古老而神秘的土地上&#xff0c;有一种美食历经岁月的洗礼&#xff0c;依然散发着独特的魅力&#xff0c;那便是平凉锅盔。平凉锅盔&#xff0c;那可是甘肃平凉的一张美食名片。它外表金黄&#xff0c;厚实饱满&#xff0c;就像一轮散发着诱人香气的金黄月亮。甘…

高通Android 12 aapt报错问题踩坑

背景 最近因为要做多module模块&#xff0c;出现aapt报错&#xff0c;于是简单记录下&#xff0c;踩坑过程。 1、我一开始项目中三个module&#xff0c;然后在build.gradle设置androidApplication plugins {alias(libs.plugins.androidApplication) }2、运行完之后都是报下面…

当flex-direction: column时,设置flex:1不生效解决办法

当需求是: 页面纵向排列,且最后一个元素撑满剩余高度 flex:1在横向排列时是可以的,但是纵向排列会失效,此时需要给最后一个子元素设置align-self: stretch;即可撑满剩余高度 <div class"father"><div class"child child1"></div><div…

【数据库备份完整版】物理备份、逻辑备份,mysqldump、mysqlbinlog的备份方法

【数据库备份完整版】物理备份、逻辑备份&#xff0c;mysqldump、mysqlbinlog的备份方法 一、物理备份二、逻辑备份1.mysqldump和binlog备份的方式&#xff1a;2.mysqldump完整备份与恢复数据2.1 mysqldump概念2.2 mysqldump备份2.3 数据恢复2.4 **使用 Cron 自动执行备份**2.5…

客户集中度高,毛利率下滑,江苏永成的IPO之路能走通吗?

撰稿|行星 来源|贝多财经 近年来&#xff0c;汽车市场蓬勃向上&#xff0c;助推上游配套产业链进入增长热潮。 行业利好前景下&#xff0c;不少汽车上游供应商开始向资本市场进发&#xff0c;希望借助上市拓宽融资渠道&#xff0c;加速业务拓展和技术创新&#xff0c;在产业…

【单片机毕业设计选题24019】-基于STM32的安防监测灭火系统

系统功能: 1. 水泵喷水灭火功能&#xff1a;当火焰传感器监测到火焰时&#xff0c;蜂鸣器报警&#xff0c;水泵工作实现灭火。 2. 风扇功能&#xff1a;当烟雾传感器检测到CO或温度传感器检测到温度超过阈值时&#xff0c;蜂鸣器报警&#xff0c; 启动风扇进行驱散烟雾或降温…

椭圆的几何要素

椭圆的几何要素 flyfish 椭圆的方程为 x 2 a 2 y 2 b 2 1 \frac{x^2}{a^2} \frac{y^2}{b^2} 1 a2x2​b2y2​1。 长半轴 a a a&#xff08;绿色虚线&#xff09;和短半轴 b b b&#xff08;紫色虚线&#xff09;。 焦点 F 1 ( − c , 0 ) F1(-c, 0) F1(−c,0)&#…

学会python——获取文件信息(python实例八)

目录 1、认识Python 2、环境与工具 2.1 python环境 2.2 Visual Studio Code编译 3、获取文件信息 3.1 代码构思 3.2 代码示例 3.3 运行结果 4、总结 1、认识Python Python 是一个高层次的结合了解释性、编译性、互动性和面向对象的脚本语言。 Python 的设计具有很强的…

JS中的延时操作setTimeout()和setInterval()

JS中&#xff0c;给我们提供两种延时操作的内置方法setTimeout()和setInterval()。setTimeout和setInterval方法都是挂载在javascript的window对象下&#xff0c;通过两个参数控制&#xff0c;第一个参数控制运行的表达式或方法&#xff0c;第二个参数表示延时的时间&#xff0…