kafka安装说明以及在项目中使用

一、window 安装

1.1、下载安装包

  • 下载kafka 地址,其中官方版内置zk, kafka_2.12-3.4.0.tgz
  • 其中这个名称的意思是 kafka3.4.0 版本 ,所用语言 scala 版本为 2.12
    在这里插入图片描述

1.2、安装配置

1、解压刚刚下载的配置文件,解压后如下,其中 datakafka-logs 这两个文件是没有的

在这里插入图片描述

2、修改配置:进入到config目录,

  • 修改service.properties里面log.dirs路径未 log.dirs=F:\kafka\installSurround\kafka3.4.0\kafka-logs,该目录是kafka的数据存储目录

在这里插入图片描述

  • 修改zookeeper.properties里面dataDir路径为 dataDir=F:\kafka\installSurround\kafka3.4.0\data,该目录是 zookeeper存储的kafka的数据目录
    在这里插入图片描述

3、server.properties说明

属性说明
log.dirs指定Broker需要使用的若干个文件目录路径,没有默认值,必须指定。在生产环境中一定要为log.dirs配置多个路径,如果条件允许,需要保证目录被挂载到不同的物理磁盘上。优势在于,提升读写性能,多块物理磁盘同时读写数据具有更高的吞吐量;能够实现故障转移(Failover),Kafka 1.1版本引入Failover功能,坏掉磁盘上的数据会自动地转移到其它正常的磁盘上,而且Broker还能正常工作,基于Failover机制,Kafka可以舍弃RAID方案。
zookeeper.connectCS格式参数,可以指定值为zk1:2181,zk2:2181,zk3:2181,不同Kafka集群可以指定:zk1:2181,zk2:2181,zk3:2181/kafka1,chroot只需要写一次。
listeners设置内网访问Kafka服务的监听器。
advertised.listeners设置外网访问Kafka服务的监听器。
auto.create.topics.enable是否允许自动创建Topic。
unclean.leader.election.enable是否允许Unclean Leader 选举。
auto.leader.rebalance.enable是否允许定期进行Leader选举,生产环境中建议设置成false。
log.retention.{hoursminutes
log.retention.bytes指定Broker为消息保存的总磁盘容量大小。message.max.bytes:控制Broker能够接收的最大消息大小。

1.3、启动

1、 启动脚本都在bin目录的window目录下,一定要先启动 zookeeper,再启动kafka

如果是linux,不使用window下的命令即可,使用对应的 xxxx.sh 即可

在这里插入图片描述

2、首先启动zookeeper

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

在这里插入图片描述

3、在启动kafka

.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述

二、linux 安装

暂略

三、docker 安装

暂略

四、docker 安装

暂略

五、命令行使用

5.1、topic 命令

1、关于topic,这里用window 来示例

bin\windows\kafka-topics.bat

在这里插入图片描述

2、创建 first topic,五个分区,1个副本

bin\windows\kafka-topics.bat  --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 --topic first

在这里插入图片描述
3、查看当前服务器中的所有 topic

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

在这里插入图片描述

4、查看 first 主题的详情

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic first

在这里插入图片描述
5、修改分区数**(注意:分区数只能增加,不能减少)**

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic first --partitions 6

在这里插入图片描述

6、删除 topic,该操作在winodw,会出现文件授权问题,日志可以在kafka的启动命令窗口中查看,只需要修改文件权限即可,如果出现这个问题,我们需要清空之前配置的 datakafka-logs 这两个文件中的内容,再次重新启动即可。

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic first

在这里插入图片描述

5.2、生产者命令行操作

1、关于查看操作生产者命令参数,这里用window 来示例

.\bin\windows\kafka-console-producer.bat

在这里插入图片描述

2、发送消息,这里发送了2次的数据,第一次是hello,第二次是world

.\bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic first

在这里插入图片描述

5.3、消费者命令行操作

1、关于查看操作生产者命令参数,这里用window 来示例

.\bin\windows\kafka-console-consumer.bat

在这里插入图片描述
在这里插入图片描述

2、接受消息,因为前面我们在发送消息的时候,消费者没有启动,所以第一次发的数据这里是收不到的,并没有存储到topic中

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic first

在这里插入图片描述

在这里插入图片描述

3、把主题中所有的数据都读取出来(包括历史数据),可以看到我们获取到了从消费者没有上线之前到上线之后的所有数据,一共6条。

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic first

在这里插入图片描述

5.4、脚本说明

项目Value
connect-standalone.sh用于启动单节点的Standalone模式的Kafka Connect组件。
connect-distributed.sh用于启动多节点的Distributed模式的Kafka Connect组件。
kafka-acls.sh脚本用于设置Kafka权限,比如设置哪些用户可以访问Kafka的哪些TOPIC的权限。
kafka-delegation-tokens.sh用于管理Delegation Token。基于Delegation Token的认证是一种轻量级的认证机制,是对SASL认证机制的补充。
kafka-topics.sh用于管理所有TOPIC。
kafka-console-producer.sh用于生产消息。
kafka-console-consumer.sh用于消费消息。
kafka-producer-perf-test.sh用于生产者性能测试
kafka-consumer-perf-test.sh用于消费者性能测试
kafka-delete-records.sh用于删除Kafka的分区消息,由于Kafka有自己的自动消息删除策略,使用率不高。
kafka-dump-log.sh用于查看Kafka消息文件的内容,包括消息的各种元数据信息、消息体数据。
kafka-log-dirs.sh用于查询各个Broker上的各个日志路径的磁盘占用情况。
kafka-mirror-maker.sh用于在Kafka集群间实现数据镜像。
kafka-preferred-replica-election.sh用于执行Preferred Leader选举,可以为指定的主题执行更换Leader的操作。
kafka-reassign-partitions.sh用于执行分区副本迁移以及副本文件路径迁移。
kafka-run-class.sh用于执行任何带main方法的Kafka类。
kafka-server-start.sh用于启动Broker进程。
kafka-server-stop.sh用于停止Broker进程。
kafka-streams-application-reset.sh用于给Kafka Streams应用程序重设位移,以便重新消费数据。
kafka-verifiable-producer.sh用于测试验证生产者的功能。
kafka-verifiable-consumer.sh用于测试验证消费者功能。
trogdor.sh是Kafka的测试框架,用于执行各种基准测试和负载测试。
kafka-broker-api-versions.sh脚本主要用于验证不同Kafka版本之间服务器和客户端的适配性

5.5、关闭kafka

1、一定要先关闭 kafka,再关闭zookeeper,否则容易出现数据错乱

如果出现数据错错乱,最简单的方法就是清空data和kafka-logs 这两个文件下的内容,重新启动即可

2、关闭

.\bin\windows\kafka-server-stop.bat
.\bin\windows\zookeeper-server-stop.bat

在这里插入图片描述

5.6、选择分区数及kafka性能测试

1、主要工具是 kafka-producer-perf-test.batkafka-consumer-perf-test.bat 两个脚本,可以参考 kafka如何选择分区数及kafka性能测试

六、java 使用

6.1、使用原生客户端

1、依赖

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

2、发送和消费消息,具体代码如下:

public class KafkaConfig {
 
    public static void main(String[] args) {
        // 声明主题
        String topic = "first";
        // 创建消费者
        Properties consumerConfig = new Properties();
        consumerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"boot-kafka");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);
        // 订阅主题并循环拉取消息
        kafkaConsumer.subscribe(Arrays.asList(topic));
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(10000));
                    for(ConsumerRecord<String, String> record:records){
                        System.out.println(record.value());
                    }
                }
            }
        }).start();
        // 创建生产者
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094");
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG,"boot-kafka-client");
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<>(producerConfig);
        // 给主题发送消息
        producer.send(new ProducerRecord<>(topic, "hello,"+System.currentTimeMillis()));
    }
}

6.2、使用springBoot

1、依赖

 <!-- 不使用kafka的原始客户端,使用spring集成的,这样比较方便  -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!-- 可以不用指定,springBoot 会帮我们选择,如果有特殊需求,可以更改 -->
            <!--            <version>3.0.2</version>-->
        </dependency>

2、配置文件

server:
  port: 7280
  servlet:
    context-path: /thermal-emqx2kafka
  shutdown: graceful

spring:
  application:
    name: thermal-api-demonstration-tdengine
  lifecycle:
    timeout-per-shutdown-phase: 30s
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher  # 不然spring boot 2.6以后的版本 和 swagger 会出现 问题,可以参考 https://blog.csdn.net/qq_41027259/article/details/125747298
  kafka:
    bootstrap-servers: 127.0.0.1:9092  # 192.168.189.128:9092,92.168.189.128:9093,192.168.189.128:9094  连接的 Kafka Broker 主机名称和端口号
    #properties.key-serializer: # 用于配置客户端的附加属性,对于生产者和消费者都是通用的,。 org.apache.kafka.common.serialization.StringSerializer
    producer: # 生产者
      retries: 3 # 重试次数
      #acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      #batch-size: 16384 # 一次最多发送数据量
      #buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # 消费者
      group-id: test-consumer-group #默认的消费组ID,在Kafka的/config/consumer.properties中查看和修改
      #enable-auto-commit: true # 是否自动提交offset
      #auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
      #auto-offset-reset: latest  #earliest,latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、发送消息

package cn.jt.thermalemqx2kafka.kafka.controller;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年08月17日
 */
@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/mock")
    public String sendKafkaMessage() {
        Map<String, Object> data = new HashMap<>(2);
        data.put("id", 1);
        data.put("name", "gkj");
        kafkaTemplate.send("first", JSON.toJSONString(data));
        return "ok";
    }
}

4、接受消息

package cn.jt.thermalemqx2kafka.kafka.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author GXM
 * @version 1.0.0
 * @Description TODO
 * @createTime 2023年08月17日
 */
@Slf4j
@Component
public class KafkaListener {

    @org.springframework.kafka.annotation.KafkaListener(topics = "first")
    private void handler(String content) {
        log.info("consumer received: {} ", content);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/82284.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

通过安全日志读取WFP防火墙放行日志

前言 之前的文档中&#xff0c;描写了如何对WFP防火墙进行操作以及如何在防火墙日志中读取被防火墙拦截网络通讯的日志。这边文档&#xff0c;着重描述如何读取操作系统中所有被放行的网络通信行为。 读取系统中放行的网络通信行为日志&#xff0c;在win10之后的操作系统上&am…

Monitor.Analog烧机室|高温老化箱软件概要设计

Monitor.Analog产品老化试验软件概要设计&#xff1a; 1. 引言&#xff1a; 模拟量采集软件的目标是实现对模拟量信号的采集、处理和展示。该软件旨在提供一个用户友好的界面&#xff0c;允许用户配置采集参数、实时监测模拟量信号&#xff0c;并提供数据分析和导出功能。 2. 功…

LVS-DR集群(一台LVS,一台CIP,两台web,一台NFS)的构建以及LVS-DR模式工作原理和特点

一.LVS-DR工作模式原理和特点 1.工作模式 2.模式特点 二.构建环境 1.五台关闭防火墙&#xff0c;关闭selinux&#xff0c;拥有固定IP&#xff0c;部署有http服务的虚拟机&#xff0c;LVS设备下载ipvsadm工具&#xff0c;NFS 设备需要下载rpcbind和nfs-utils 2.实现功能 3…

【简单认识Docker基本管理】

文章目录 一、Docker概述1、定义2.容器化流行的原因3.Docker和虚拟机的区别4.Docker核心概念 二、安装docker三、镜像管理1.搜索镜像2.下载&#xff08;拉取&#xff09;镜像3.查看已下载镜像4.查看镜像详细信息5.修改镜像标签6.删除镜像7.导出镜像文件和拉取本地镜像文件8.上传…

ruoyi-vue-pro yudao 项目报表设计器 积木报表模块启用及相关SQL脚本

目前ruoyi-vue-pro 项目虽然开源&#xff0c;但是report模块被屏蔽了&#xff0c;查看文档却要收费 199元&#xff08;知识星球&#xff09;&#xff0c;价格有点太高了吧。 分享下如何启用 report 模块&#xff0c;顺便贴上sql相关脚本。 一、启用模块 修改根目录 pom.xml …

《安富莱嵌入式周报》第320期:键盘敲击声解码, 军工级boot设计,开源CNC运动控制器,C语言设计笔记,开源GPS车辆跟踪器,一键生成RTOS任务链表

周报汇总地址&#xff1a;嵌入式周报 - uCOS & uCGUI & emWin & embOS & TouchGFX & ThreadX - 硬汉嵌入式论坛 - Powered by Discuz! 视频版&#xff1a; https://www.bilibili.com/video/BV1Cr4y1d7Mp/ 《安富莱嵌入式周报》第320期&#xff1a;键盘敲击…

空洞卷积网络实现

代码中涉及的图片实验数据下载地址&#xff1a;https://download.csdn.net/download/m0_37567738/88235543?spm1001.2014.3001.5501 代码&#xff1a; import torch.nn as nn import numpy as npfrom matplotlib import pyplot as plt import time #from utils import get_ac…

【深入探究人工智能】:常见机器学习算法总结

文章目录 1、前言1.1 机器学习算法的两步骤1.2 机器学习算法分类 2、逻辑回归算法2.1 逻辑函数2.2 逻辑回归可以用于多类分类2.3 逻辑回归中的系数 3、线性回归算法3.1 线性回归的假设3.2 确定线性回归模型的拟合优度3.3线性回归中的异常值处理 4、支持向量机&#xff08;SVM&a…

基于ArcGis提取道路中心线

基于ArcGis提取道路中心线 文章目录 基于ArcGis提取道路中心线前言一、生成缓冲区二、导出栅格数据三、导入栅格数据四、新建中心线要素五、生成中心线总结 前言 最近遇到一个问题&#xff0c;根据道路SHP数据生成模型的时候由于下载的道路数据杂项数据很多&#xff0c;所以导…

链表之第三回

欢迎来到我的&#xff1a;世界 该文章收入栏目&#xff1a;链表 希望作者的文章对你有所帮助&#xff0c;有不足的地方还请指正&#xff0c;大家一起学习交流 ! 目录 前言第一题&#xff1a;判断是否为环形链表第二题&#xff1a;找到两条链表的相交点第三题&#xff1a;返回…

webSocket 开发

1 认识webSocket WebSocket_ohana&#xff01;的博客-CSDN博客 一&#xff0c;什么是websocket WebSocket是HTML5下一种新的协议&#xff08;websocket协议本质上是一个基于tcp的协议&#xff09;它实现了浏览器与服务器全双工通信&#xff0c;能更好的节省服务器资源和带宽…

【机器学习】— 2 图神经网络GNN

一、说明 在本文中&#xff0c;我们探讨了图神经网络&#xff08;GNN&#xff09;在推荐系统中的潜力&#xff0c;强调了它们相对于传统矩阵完成方法的优势。GNN为利用图论来改进推荐系统提供了一个强大的框架。在本文中&#xff0c;我们将在推荐系统的背景下概述图论和图神经网…

回归预测 | MATLAB实现PSO-RBF粒子群优化算法优化径向基函数神经网络多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现PSO-RBF粒子群优化算法优化径向基函数神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现PSO-RBF粒子群优化算法优化径向基函数神经网络多输入单输出回归预测&#xff08;多指标&#xff0c;多图&a…

matlab使用教程(19)—曲线拟合与一元方程求根

1.多项式曲线拟合 此示例说明如何使用 polyfit 函数将多项式曲线与一组数据点拟合。您可以按照以下语法&#xff0c;使用 polyfit 求出以最小二乘方式与一组数据拟合的多项式的系数 p polyfit(x,y,n), 其中&#xff1a; • x 和 y 是包含数据点的 x 和 y 坐标的向量 …

深入理解SSO原理,项目实践使用一个优秀开源单点登录项目(附源码)

深入理解SSO原理,项目实践使用一个优秀开源单点登录项目(附源码)。 一、简介 单点登录(Single Sign On),简称为 SSO。 它的解释是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。 ❝ 所谓一次登录,处处登录。同样一处退出,处处退出。 ❞ 二…

Axios使用CancelToken取消重复请求

处理重复请求&#xff1a;没有响应完成的请求&#xff0c;再去请求一个相同的请求&#xff0c;会把之前的请求取消掉 新增一个cancelRequest.js文件 import axios from "axios" const cancelTokens {}export const addPending (config) > {const requestKey …

时序预测 | MATLAB实现基于CNN-BiGRU卷积双向门控循环单元的时间序列预测-递归预测未来(多指标评价)

时序预测 | MATLAB实现基于CNN-BiGRU卷积双向门控循环单元的时间序列预测-递归预测未来(多指标评价) 目录 时序预测 | MATLAB实现基于CNN-BiGRU卷积双向门控循环单元的时间序列预测-递归预测未来(多指标评价)预测结果基本介绍程序设计参考资料 预测结果 基本介绍 MATLAB实现基于…

Vs code 使用中的小问题

1.Java在Vs code 中使用单元测试失败或者如何使用单元测试 创建Java项目&#xff0c;或者将要测试的文件夹添加进工作区 要出现lib包&#xff0c;并有两个测试用的jar包 编写测试文件 public class TestUnit{ public static void main(String[] args) {String str "…

Pycharm与Anaconda Python的开发环境搭建

目录 一&#xff1a;下载 二&#xff1a;安装python 三&#xff1a;设置Pycharm 一&#xff1a;下载 下载Anaconda&#xff1a; Anaconda | The World’s Most Popular Data Science Platform 安装好以后&#xff0c;设置一下环境变量&#xff1a; 打开命令行&#xff0c…

OpenCV-Python中的图像处理-图像特征

OpenCV-Python中的图像处理-图像特征 图像特征Harris角点检测亚像素级精度的角点检测Shi-Tomasi角点检测SIFT(Scale-Invariant Feature Transfrom)SURF(Speeded-Up Robust Features)FAST算法BRIEF(Binary Robust Independent Elementary Features)算法ORB (Oriented FAST and R…