kafka 快速上手

 下载 Apache Kafka

  演示window 安装

   编写启动脚本,脚本的路径根据自己实际的来

启动说明

先启动zookeeper后启动kafka,关闭是先关kafka,然后关闭zookeeper

巧记: 铲屎官(zookeeper)总是第一个到,最后一个走

启动zookeeper

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

启动kafka  

call bin/windows/kafka-server-start.bat config/server.properties

 测试脚本,主要用于创建主题 ‘test-topic’

# 创建主题(窗口1)
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic  --create

# 查看主题
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --list
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --describe

# 修改某主题的分区
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --alter --partitions 2


# 生产消息(窗口2)向test-topic主题发送消息
bin/window> kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic
>hello kafka

# 消费消息(窗口3)消费test-topic主题的消息
bin/window> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic
package com.ldj.kafka.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * User: ldj
 * Date: 2024/6/13
 * Time: 0:00
 * Description: 创建主题
 */
public class AdminTopic {

    public static void main(String[] args) {
        Map<String, Object> adminConfigMap = new HashMap<>();
        adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        NewTopic topic1 = new NewTopic("topic-01", 1, (short) 1);
        NewTopic topic2 = new NewTopic("topic-02", 2, (short) 1);

        AdminClient adminClient = AdminClient.create(adminConfigMap);
        CreateTopicsResult addResult = adminClient.createTopics(Arrays.asList(topic1, topic2));

        //DeleteTopicsResult delResult = adminClient.deleteTopics(Arrays.asList("topic-02"));
        
        adminClient.close();
    }

}
package com.ldj.kafka.producer;

import com.alibaba.fastjson.JSON;
import com.ldj.kafka.model.UserEntity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * User: ldj
 * Date: 2024/6/12
 * Time: 21:08
 * Description: 生产者
 */
public class KfkProducer {

    public static void main(String[] args) {

        //生产者配置
        Map<String, Object> producerConfigMap = new HashMap<>();
        producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        //创建生产者
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfigMap);

        //构建消息 ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
        try {
            for (int i = 0; i < 10; i++) {
                UserEntity userEntity = new UserEntity()
                        .setUserId(2436687942335620L + i)
                        .setUsername("lisi")
                        .setGender(1)
                        .setAge(18);

                ProducerRecord<String, String> record = new ProducerRecord<>(
                        "test-topic",
                        userEntity.getUserId().toString(),
                        JSON.toJSONString(userEntity));
                //发送数据到Broker

                producer.send(record, (RecordMetadata var1, Exception var2) -> {
                    if (Objects.isNull(var2)) {
                        System.out.printf("[%s]消息发送成功!", userEntity.getUserId());
                    } else {
                        System.out.printf("[%s]消息发送失败!err:%s", userEntity.getUserId(), var2.getCause());
                    }
                });
            }
        } finally {
            //关闭通道
            producer.close();
        }
    }

}
package com.ldj.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * User: ldj
 * Date: 2024/6/12
 * Time: 21:10
 * Description: 消费者
 */
public class KfkConsumer {

    public static void main(String[] args) {

        //消费者配置
        Map<String, Object> consumerConfigMap = new HashMap<>();
        consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //所属消费组
        consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test123456");

        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigMap);

        //消费主题的消息  ConsumerRebalanceListener
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                //数据存储结构:Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } finally {
            //关闭消费者
            consumer.close();
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/703974.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

论文阅读ReLU-KAN和Wav-KAN

这是我读KAN系列论文的第三篇&#xff0c;今天把两篇论文放在一起写&#xff0c;分别是&#xff1a; ReLU-KAN&#xff1a; https://arxiv.org/abs/2406.02075 Wav-KAN&#xff1a; https://arxiv.org/abs/2405.12832 之所以放在一起&#xff0c;是因为这两篇论文针对KAN的…

用映射对比ab俩个数组 , 并把a的参数传递给b

项目背景 : react ant 需求 : 在项目进行表头设置时,根据aaa中的key和bbb中的name对应 , 并将sort值插入到bbb中 其中 a b 结构如下 具体实现 aaa[ { key: "orderNumber", orderNumber: "工单编号", sort: 1 } ... ]bbb [ { name: "orderNumber…

运行mvn命令打包项目jar包报错?“Fatal error compiling: 无效的目标发行版: 19 ”, 让我来看看~

最近写实验&#xff0c;要打包项目&#xff0c;但是不管是在cmd运行“mvn clean package -Dmaven.test.skiptrue”命令&#xff0c;还是在idea上去操作&#xff0c;都出现了这样的一个错误&#xff1a; [EROR] Failed to exeoute goal org.apache.maven.plugins:maven-comnpile…

Scapy——捕获网络流量(本机Win+虚拟机Ubuntu)

1、简介 Scapy 是一个强大的 Python 程序库&#xff0c;用于网络数据包处理、生成和嗅探。它提供了对网络层和传输层的深入控制&#xff0c;允许用户创建、修改、发送和接收数据包。主要功能&#xff1a; 数据包生成&#xff1a; 可以轻松生成各种类型的网络数据包&#xff0c…

C语言—数据在内存中的存储

1.整数在内存中的存储 文章回顾&#xff08;C语言—操作符详解&#xff09; 整数的2进制表示方式有三种&#xff0c;即原码、反码和补码。 有符号的整数&#xff0c;三种表示方式均有符号位和数值位两部分&#xff0c;符号位都是用0表示“正”&#xff0c;用1表示“负”&…

Salesforce‘s 爱因斯坦机器人助手引领工业聊天机器人时代

CRM的对话式人工智能助手&#xff0c;根据公司数据提供可靠的人工智能响应及日本工业聊天机器人现状 【前言】 爱因斯坦助手&#xff08;Einstein Copilot&#xff09;提供可靠的响应&#xff0c;因为它基于公司独特的数据和元数据&#xff0c;使其能够深入了解公司的业务和客…

Flutter- AutomaticKeepAliveClientMixin 实现Widget保持活跃状态

前言 在 Flutter 中&#xff0c;AutomaticKeepAliveClientMixin 是一个 mixin&#xff0c;用于给 State 类添加能力&#xff0c;使得当它的内容滚动出屏幕时仍能保持其状态&#xff0c;这对于 TabBarView 或者滚动列表中使用 PageView 时非常有用&#xff0c;因为这些情况下你…

-------------------------面试散文-----------------------------------

问题1&#xff1a;vue中动态引入图片&#xff0c;为什么使用require&#xff1f; 回答&#xff1a;因为动态添加的src 编译过后的文件地址和被编译过后的资源文件地址不一致&#xff0c;从而导致无法访问题 而使用require 返回的就是资源文件被编译后的文件地址&#xff0c;从…

[大模型]MiniCPM-2B-chat Lora Full 微调

MiniCPM-2B-chat 介绍 MiniCPM 是面壁智能与清华大学自然语言处理实验室共同开源的系列端侧大模型&#xff0c;主体语言模型 MiniCPM-2B 仅有 24亿&#xff08;2.4B&#xff09;的非词嵌入参数量。 经过 SFT 后&#xff0c;MiniCPM 在公开综合性评测集上&#xff0c;MiniCPM …

鸿蒙开发文件管理:【@ohos.securityLabel (数据标签)】

数据标签 该模块提供文件数据安全等级的相关功能&#xff1a;向应用程序提供查询、设置文件数据安全等级的JS接口。 说明&#xff1a; 本模块首批接口从API version 9开始支持。后续版本的新增接口&#xff0c;采用上角标单独标记接口的起始版本。 导入模块 import security…

pycharm终端pip安装模块成功但还是显示找不到 ModuleNotFoundError: No module named

报错信息&#xff1a; ModuleNotFoundError: No module named 但是分明已经安装过此模块&#xff1a; 在cmd运行pip list 查看所有安装过的包找到了安装过&#xff1a; 如果重新安装就是这样&#xff1a;显示已经存在了 问题排查&#xff1a; 直接根据重新安装的显示已存在的…

用 KV 缓存量化解锁长文本生成

很高兴和大家分享 Hugging Face 的一项新功能: KV 缓存量化 &#xff0c;它能够把你的语言模型的速度提升到一个新水平。 太长不看版: KV 缓存量化可在最小化对生成质量的影响的条件下&#xff0c;减少 LLM 在长文本生成场景下的内存使用量&#xff0c;从而在内存效率和生成速度…

禁渔期水域监管:EasyCVR视频智能监控方案

一、背景与需求分析 根据农业部印发的《中国渔政亮剑2024系列专项执法行动方案》&#xff0c;我国将持续推进长江十年禁渔、海洋伏季休渔、黄河等内陆重点水域禁渔等专项行动。根据四川省相关规定&#xff0c;每年3月1日至6月30日为禁渔期&#xff0c;在此期间&#xff0c;四川…

C语言 | Leetcode C语言题解之第148题排序链表

题目&#xff1a; 题解&#xff1a; struct ListNode* merge(struct ListNode* head1, struct ListNode* head2) {struct ListNode* dummyHead malloc(sizeof(struct ListNode));dummyHead->val 0;struct ListNode *temp dummyHead, *temp1 head1, *temp2 head2;while…

等级考试3-2021年12月题目

十二月&#xff1a; #include <iostream> using namespace std; int sum(int); int main(){int n;cin>>n;for(int i1;i<100000;i){for(int j1;j<i;j){if(sum(i)-j*2n){cout<<j<< <<i;return 0; }}}return 0; } int sum(int n){int sum0;fo…

大数据学习——linux操作系统(Centos)安装mysql(Hive的元数据库)

一. 准备工作 1. 打开虚拟机并连接shell工具 2. 将mysql安装包上传至虚拟机 mysql安装包 提取码&#xff1a;6666 将下载好的jar包拖至install_package目录下 3. 检查环境 rpm -qa|grep mariadb 如果上述命令返回有结果 那么进行mariadb的卸载 rpm -e --nodeps mariadb-l…

【前端项目笔记】1 登录与登出功能实现

项目笔记 ☆☆代表面试常见题 前后端分离&#xff1a;后端负责写接口&#xff0c;前端负责调接口。 登录/退出功能 登录业务流程 登录页面&#xff1a;用户名密码 调用后台接口进行验证 通过验证&#xff0c;根据后台响应状态跳到项目主页 登录业务相关技术点&#xff1…

计算机网络 —— 运输层(四次挥手)

计算机网络 —— 运输层&#xff08;四次挥手&#xff09; 四次挥手客户端到服务器的关闭第一次挥手第二次挥手 服务器到客户端连接的关闭第三次挥手第四次挥手 等待时间的必要性 我们今天来看TCP协议的四次挥手&#xff1a; 四次挥手 TCP的四次挥手&#xff08;Four-Way Han…

BigDecimal-解决java中的浮点运算

《阿里巴巴 Java 开发手册》中提到&#xff1a;“为了避免精度丢失&#xff0c;可以使用 BigDecimal 来进行浮点数的运算”。浮点数的运算竟然还会有精度丢失的风险吗&#xff1f;确实会&#xff01; 示例代码&#xff1a; float a 2.0f - 1.9f; float b 1.8f - 1.7f; Syst…

【沟通管理】项目经理《葵花宝典》之跨部门沟通

为什么每次跟其它部门的沟通总是不欢而散&#xff1f; 为什么每次想好好的就事论事的时候&#xff0c;却总是像在吵架&#xff1f; 为什么沟通总是不同频&#xff1f; 这是不是你作为项目经理在跨部门沟通时经常会遇到的问题&#xff1f; 在企业项目管理中&#xff0c;跨部门沟…