Nodejs 第八十章(Kafka高级)

在这里插入图片描述

kafka前置知识在前几章章讲过了 不再复述

Kafka集群操作

1.创建多个kafka服务

拷贝一份kafka完整目录改名为kafka2

修改配置文件 kafka2/config/server.properties 这个文件

broker.id=1 //唯一broker
port=9093 //切换端口
listeners=PLAINTEXT://:9093 //切换监听源

启动zooKeeper和kafka和kafka2

.\bin\windows\kafka-server-start.bat .\config\server.properties

2.客户端管理

查看集群信息和客户端对象

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

const admin = kafka.admin() //创建admin对象
await admin.connect() //连接kafka
const cluster = await admin.describeCluster() //获取集群信息

返回值 可以查看连接集群的信息比如端口id等

{
  brokers: [
    { nodeId: 0, host: '26.26.26.1', port: 9092 },
    { nodeId: 1, host: '26.26.26.1', port: 9093 }
  ],
  controller: 0,
  clusterId: 'XHa77me4TZWO8cfWSTHoaQ'
}

创建主题createTopics将解析true主题是否已成功创建或false是否已存在。如果发生错误,该方法将抛出异常

删除主题admin.deleteTopics 传入删除的主题

查看主题列表listTopics列出所有现有主题的名称,并返回一个字符串数组。如果发生错误,该方法将抛出异常`

//创建主题
await admin.createTopics({
    topics: [
        { topic: 'xiaoman', numPartitions: 1, replicationFactor: 1 },
        { topic: 'xiaoman2', numPartitions: 1, replicationFactor: 1 },
    ],
})
//删除主题
await admin.deleteTopics({ topics: ['xiaoman', 'xiaoman2'] })
//查看主题
await admin.listTopics().then(topics => {
    console.log('topics', topics)
})
3.事务

KafkaJS 提供了对 Kafka 事务的支持,可以使用它来执行具有事务特性的操作。Kafka 事务用于确保一组相关的消息要么全部成功提交要么全部回滚,从而保持数据的一致性

import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({
    clientId: 'my-app', //客户端标识
    brokers: ['localhost:9092', 'localhost:9093'], //kafka集群
})

//生产者
const producer = kafka.producer({
    transactionalId: '填写事务ID',
    maxInFlightRequests: 1, //最大同时发送请求数
    idempotent: true, //是否开启幂等提交
})
//连接服务器
await producer.connect()

const transaction = await producer.transaction()
try {
    await transaction.send({
        topic: 'xiaoman',
        messages: [{ value: '100元' }],
    })
    await transaction.commit() // 事务提交
}
catch (e) {
    console.log(e)
    await transaction.abort() // 事务提交失败,回滚
}
await admin.disconnect()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/797444.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

多表联合的查询(实例)、对于前端返回数据有很多表,可以分开操作、debug调试教程

2024.7.13 一、 对于多表的更深层的认识1. 认识2. 多表联合查询的列子:3. 对于多表查询的进一步认识4. 在实现功能的时候,原本对于省市县这样的表,对于项目的要求,是直接全部查询出来,然后开始使用,但我想着…

Elasticsearch:使用 Amazon Bedrock 的 semantic_text

作者:来自 Elastic Gustavo Llermaly 使用 semantic_text 新功能,并使用 AWS Bedrock 作为推理端点服务。 Elasticsearch 的新 semantic_text 映射类型旨在简化构建 RAG 应用程序的常见挑战。它整合了文本分块、生成嵌入以及检索嵌入的步骤。 在本文中…

C++进阶(while循环——函数应用)

知识点代码框架总结 输入n组数据 ,对n组数据里面的每一组进行处理(输出、求和 、运算、其他) int n;//几组数据cin >> n;//2while(n--){//对每组数据进行处理}看到下面的样例,肌肉型反映出上面的框架//2// 1 2 3// 4 5 6若…

机器学习筑基篇,Jupyter Notebook 精简指南

[ 知识是人生的灯塔,只有不断学习,才能照亮前行的道路 ] 0x00 Jupyter Notebook 简明指南 描述:前面我们已经在机器学习工作站(Ubuntu 24.04 Desktop Geforce RTX 4070Ti SUPER)中安装 Anaconda 工具包,其…

Oracle23ai 新特性IF [NOT] EXISTS 语法支持

Oracle23ai 新特性IF [NOT] EXISTS Syntax Support 官方文档地址 https://docs.oracle.com/en/database/oracle/oracle-database/23/lnpls/release-changes.html#GUID-9EE96980-43F9-4068-893E-C191CD83ACA6 IF [NOT] EXISTS 语法支持 CREATE、ALTER和DROP DDL语句支持IF NO…

python:绘制一元三次函数的曲线

编写 test_x3_3x.py 如下 # -*- coding: utf-8 -*- """ 绘制函数 y x^33x4 在 -3<x<3 的曲线 """ import numpy as np from matplotlib import pyplot as plt# 用于正常显示中文标题&#xff0c;负号 plt.rcParams[font.sans-serif] […

【人工智能】线性回归

目录 一、使用正规化方法计算下列样本的预测函数 1. 没有归一化之前 2. 归一化之后 二、读取ex1data2.txt中的数据&#xff0c;建立样本集&#xff0c;使用正规化法获取&#xff08;房屋面积&#xff0c;房间数量&#xff09;与房屋价格间的预测函数 1. 读取数据&#xff…

pico+unity3d项目配置

重点&#xff1a;unity编辑器版本要和pico的sdk要求一致、比如&#xff1a; 对于 Unity 2022.1.14 及以上版本&#xff0c;若同时在项目中使用 URP、Linear 色彩空间、四倍抗锯齿和OpenGL&#xff0c;会出现崩溃。该问题待 Unity 引擎解决。对于 Unity 2022&#xff0c;若同时…

Ubuntu20.04 编译安装FFmpeg,出错分析以及解决方案

最近工程上需要对FFmpeg底层源码进行修改&#xff0c;需要重新编译&#xff0c;遇见不少坑&#xff0c;出篇教程记录一下。 文章目录 1.FFmpeg源码下载地址2.编译环境配置3.编译FFmpeg4.配置FFmpeg运行环境 1.FFmpeg源码下载地址 官方下载地址:Index of /releases (ffmpeg.or…

寄存器分配

概述 寄存器位于CPU 或 GPU内部的少量高速存储器&#xff0c;通常用于保存机器指令的操作数 由于其价格昂贵&#xff0c;导致其数量有限&#xff0c;又由于存储速度快&#xff0c;使其不可或缺。因此&#xff0c;寄存器是计算机体系结构中的关键资源之一。在计算复杂表达式的…

Python爬虫速成之路(2):爬天气情况

hello hello~ &#xff0c;这里是绝命Coding——老白~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f4a5;个人主页&#xff1a;绝命Coding-CSDN博客 &a…

常见问题记录(持续更新)

备注&#xff1a; 在7月10日记录之前遇到的问题及解决方法: 一&#xff1a;常见的访问问题&#xff1a; 403 Forbidden&#xff1a;&#xff08;未有请求权限&#xff09; 表示服务器理解请求但是拒绝执行它。这通常是由于服务器上的文件或资源没有正确的读、写或执行权限&…

IDEA启动Web项目总是提示端口占用

文章目录 IDEA启动Web项目总是提示端口占用一、前言1.场景2.环境 二、正文1.场景一:真端口占用2. 场景二:假端口占用 IDEA启动Web项目总是提示端口占用 一、前言 1.场景 IDEA启动Web项目总是提示端口占用&#xff1a; 确实是端口被占用&#xff0c;比如&#xff1a;没有正常…

复制vmware虚拟机文件并改名(文件名使用python替换)得到一台新的虚拟机

文章目录 需求实验复制文件夹并重命名使用python将所有文件名“WinSer2022”字符替换成“wingetmac”修改虚拟机配置文件&#xff08;.vmx&#xff09;打开新的虚拟机成功 需求 将已有的Winser2022虚拟机复制成wingetmac并开机 实验 复制文件夹并重命名 将"WinSer2022…

使用机器学习 最近邻算法(Nearest Neighbors)进行点云分析 (scikit-learn Open3D numpy)

使用 NearestNeighbors 进行点云分析 在数据分析和机器学习领域&#xff0c;最近邻算法&#xff08;Nearest Neighbors&#xff09;是一种常用的非参数方法。它广泛应用于分类、回归和聚类分析等任务。下面将介绍如何使用 scikit-learn 库中的 NearestNeighbors 类来进行点云数…

NSSCTF_RE(一)暑期

[SWPUCTF 2021 新生赛]简单的逻辑 nss上附件都不对 没看明白怎么玩的 dnspy分析有三个 AchievePoint , game.Player.Bet - 22m; for (int i 0; i < Program.memory.Length; i) { byte[] array Program.memory; int num i; array[num] ^ 34; } Environment.SetEnvironment…

全自主巡航无人机项目思路:STM32/PX4 + ROS + AI 实现从传感融合到智能规划的端到端解决方案

1. 项目概述 本项目旨在设计并实现一款高度自主的自动巡航无人机系统。该系统能够按照预设路径自主飞行&#xff0c;完成各种巡航任务&#xff0c;如电力巡线、森林防火、边境巡逻和灾害监测等。 1.1 系统特点 基于STM32F4和PX4的高性能嵌入式飞控系统多传感器融合技术实现精…

机器学习(五) -- 监督学习(6) --逻辑回归

系列文章目录及链接 上篇&#xff1a;机器学习&#xff08;五&#xff09; -- 监督学习&#xff08;5&#xff09; -- 线性回归2 下篇&#xff1a;机器学习&#xff08;五&#xff09; -- 监督学习&#xff08;7&#xff09; --SVM1 前言 tips&#xff1a;标题前有“***”的内…

GuLi商城-商品服务-API-品牌管理-JSR303分组校验

注解:@Validated 实体类: package com.nanjing.gulimall.product.entity;import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.nanjing.common.valid.ListValue; import com.nanjing.common.valid.Updat…

[Vulnhub] Stapler wp-videos+ftp+smb+bash_history权限提升+SUID权限提升+Kernel权限提升

信息收集 IP AddressOpening Ports192.168.8.106TCP:21,22,53,80,123,137,138,139,666,3306, Using Nmap for scanning: $ nmap -p- 192.168.8.106 --min-rate 1000 -sC -sV The results are as follows: PORT STATE SERVICE VERSION 20/tcp closed ftp-data…