SpringBoot整合Kafka(包含Kafka_2.12-3.3.1单节点安装,kafka可视化程序efak v3.0.1安装)

SpringBoot整合Kafka(包含Kafka_2.12-3.3.1单节点安装,kafka可视化程序efka v3.0.1安装)

kafka、efak安装包下载

kafka安装

  • 资源下载:

  • 下载tgz安装包:http://archive.apache.org/dist/kafka/

//解压
tar -zxvf /home/soft/kafka_2.12-3.3.1.tgz
//更名
mv kafka_2.12-3.3.1/ kafka 
  • 目录结构:进入 kafka目录后目录结构

  • 配置kafka内嵌的zookeeper (位置:vim ./config/zookeeper.properties )注意端口号和存储地址

dataDir=/home/hzh04/Desktop/kafka/data-zookeeper
clientPort=2181
  • 启动zookeeper:

./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

如果zookeeper需要开启kerberos认证需要给zookeeper.properties添加配置(demo中没有配置):

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

jaasLoginRenew=3600000

  • 修改kafka配置文件(位置:./config/server.properties)关于zookeeper相关配置与zookeeper保持一致:

# *** listeners & zookeeper.connect ***
listeners=PLAINTEXT://0.0.0.0:9092
zookeeper.connect=0.0.0.0:2181
advertised.listeners=PLAINTEXT://ip:9092
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/hzh04/Desktop/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
  • 启动kafka

./bin/kafka-server-start.sh ./config/server.properties
  • 查看java程序进程:jps

表示已经启动成功了

  • 订阅测试(demo中测试主题为:kafka1):

  • 消费者进入主题(topic

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka1 --from-beginning
  • 生产者进入主题(topic):

./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka1 

表示已经能正常通讯了,kafka单节点搭建成功

如果kafka不能实现topic监听,就试着以下操作:

  1. 停止kafka服务;

  1. 停止zookeeper服务;

  1. 清除kafka配置文件(server.properties)中log.dirs=''路径下的文件,或者换个地址;

  1. 清除zookeeper配置(kafka原生zookeeper配置文件是:zookeeper.properties)文件中配置的dataDir=''路径下的文件,或者换个地址;

  1. 重新启动zookeeper、kafka;

efak(kafka管理工具)安装

  • 资源下载:

  • 下载tgz安装包:http://www.kafka-eagle.org/

  • 解压完压缩包配置环境变量:

# 编辑环境变量
vim /etc/profile
export KE_HOME= /opt/software/kafka-eagle
PATH=$PATH:$KE_HOME/bin
# 刷新环境变量:
source /etc/profile
  • 修改配置(./conf/system-config.properties):

vim ./conf/system-config.properties
# Multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here
efak.zk.cluster.alias=cluster1
cluster1.zk.list=ip:2181

# Add zookeeper acl
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

# Kafka broker nodes online list
cluster1.efak.broker.size=10

# Zkcli limit -- Zookeeper cluster allows the number of clients to connect to
# If you enable distributed mode, you can set value to 4 or 8
kafka.zk.limit.size=16

# EFAK webui port -- WebConsole port access address
efak.webui.port=8048

######################################
# EFAK enable distributed
######################################
efak.distributed.enable=false
# master worknode set status to master, other node set status to slave
efak.cluster.mode.status=slave
# deploy efak server address
efak.worknode.master.host=localhost
efak.worknode.port=8085

# Kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option
cluster1.efak.offset.storage=kafka

# Whether the Kafka performance monitoring diagram is enabled
efak.metrics.charts=false

# EFAK keeps data for 30 days by default
efak.metrics.retain=30

# If offset is out of range occurs, enable this property -- Only suitable for kafka sql
efak.sql.fix.error=false
efak.sql.topic.records.max=5000

# Delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete
efak.topic.token=keadmin

# Kafka sasl authenticate
cluster1.efak.sasl.enable=false
cluster1.efak.sasl.protocol=SASL_PLAINTEXT
cluster1.efak.sasl.mechanism=SCRAM-SHA-256
cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret";
# If not set, the value can be empty
cluster1.efak.sasl.client.id=
# Add kafka cluster cgroups
cluster1.efak.sasl.cgroup.enable=false
cluster1.efak.sasl.cgroup.topics=kafka_ads01

# Default use sqlite to store data
efak.driver=org.sqlite.JDBC
# It is important to note that the '/hadoop/kafka-eagle/db' path must be exist.
efak.url=jdbc:sqlite:/home/hzh04/Desktop/kafka-eagle/kafka-eagle-web/db/ke.db
efak.username=admin
efak.password=CHENYB%@!

# (Optional) set mysql address
#efak.driver=com.mysql.jdbc.Driver
#efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
#efak.username=root
#efak.password=smartloli
  • 授权:

cd ../bin/
chmod +x ke.sh
  • 启动efak

./ke.sh start
  • 查看java程序进程 jps

web 访问地址:http://ip:8048/

账号:admin

密码:123456

表示启动成功

springboot整合

  • maven pom

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • application.yml核心配置

spring:
  kafka:
    bootstrap-servers: ip(kafka节点或者集群):9092
    # 生产者
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者
    consumer:
      group-id: kafka1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  • 主题配置(生产者最好配置指定主题,不要开放权限过大,导致脏数据存留)

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * topic 主题配置
 */
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic createTopic_kafka1(){
        return new NewTopic("kafka1",4,(short) -1);
    }

    @Bean
    public NewTopic createTopic_kafka_topic_001(){
        return new NewTopic("kafka_topic_001",4,(short) -1);
    }

    @Bean
    public NewTopic createTopic_kafka_89757(){
        return new NewTopic("kafka_89757",4,(short) -1);
    }

    @Bean
    public NewTopic createTopic_plc1(){
        return new NewTopic("plc1",4,(short) -1);
    }
}
  • 监听者配置

  • @SendTo("kafka_89757") 表示消息转发

import com.hzh.demo.kafka.config.KafkaConnectCondition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Conditional;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(groupId = "top_group_1",topics = {"kafka_topic_001","kafka_89757"})
    public String consumerTopic(ConsumerRecord<String, String> record, Consumer consumer){
        String msg = record.value();
        System.out.println("------------------- consumerTopic 收到消息:" + msg + "-------------------");
        consumer.commitAsync();
        return "received msg is " + msg;
    }

    @KafkaListener(groupId = "top_group_1",topics = "kafka1")
    @SendTo("kafka_89757")
    public String consumerTopic2(ConsumerRecord<String, String> record, Consumer consumer){
        String msg = record.value();
        System.out.println("------------------- consumerTopic2 收到消息:" + msg + "-------------------");
        consumer.commitAsync();
        return "received msg is " + msg;
    }

    @KafkaListener(groupId = "top_group_1",topics = "kafka_89757")
    public String consumerTopic3(ConsumerRecord<String, String> record, Consumer consumer){
        String msg = record.value();
        System.out.println("------------------- consumerTopic3 收到消息:" + msg + "-------------------");
        consumer.commitAsync();
        return "received msg is " + msg;
    }

    @KafkaListener(topics = "plc1")
    public String consumerTopic_plc1(ConsumerRecord<String, String> record, Consumer consumer){
        String msg = record.value();
        System.out.println("------------------- consumerTopic_plc1 收到消息:" + msg + "-------------------");
        consumer.commitAsync();
        return "received msg is " + msg;
    }
}
  • 生产者编写

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("kafka")
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send/{msg}")
    public String send(
            @PathVariable("msg") String msg
    ){
        kafkaTemplate.send("kafka1",msg);
        return "success";
    }

    @GetMapping("/send2/{msg}")
    public String send2(
            @PathVariable("msg") String msg
    ) {
        kafkaTemplate.send("kafka_topic_001", msg);
        return "success";
    }
}
  • 测试结果

  • 增值服务

  • 生产者监听

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 消息监控
 */
@Component
public class KafkaListenerResult {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    //配置监听
    @PostConstruct
    private void listener() {
        kafkaTemplate.setProducerListener(new ProducerListener() {

            //kafka消息监控
            @Override
            public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
                System.out.println("消息监控 --- success message=" + producerRecord.value());
            }

            @Override
            public void onError(ProducerRecord producerRecord, Exception exception) {
                System.out.println("消息监控 --- error message={}" + producerRecord.value());
            }
        });
    }
}
  • 连接检查

# 需要在生产者或者监听者开启注解,程序运行时会调用
@Conditional(KafkaConnectCondition.class) // 连接检查

import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

/**
 * 检查kafka连接状态
 */
public class KafkaConnectCondition implements Condition {
    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        Environment environment = context.getEnvironment();
        String kafkaServers = environment.getProperty("spring.kafka.bootstrap-servers");
        System.out.println("获取到的kafkaServers:"+kafkaServers);
        if (null == kafkaServers || kafkaServers.equals("")){
            return false;
        }

        String serverPort = kafkaServers.split(",")[0];
        URI uri = URI.create("http://" + serverPort);

        return this.isConnectable(uri.getHost(), uri.getPort());
    }

    /**
     * 判断kafka服务能否正常连接
     * @param host
     * @param port
     * @return
     */
    private boolean isConnectable(String host, int port) {
        boolean result = true;
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port),3000);
        } catch (IOException e) {
            System.out.println("========注意!!!!!未能连接上kafka服务,意味着kafka监听将不开启");
            result = false;
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                System.out.println("关闭kafka服务socket出错" + e.getMessage());
                result = false;
            }
        }
        System.out.println("========kafka服务能正常连接========");
        return result;
    }
}
  • postman api 测试

整合完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/2266.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

自定义类型的超详细讲解ᵎᵎ了解结构体和位段这一篇文章就够了ᵎ

目录 1.结构体的声明 1.1基础知识 1.2结构体的声明 1.3结构体的特殊声明 1.4结构体的自引用 1.5结构体变量的定义和初始化 1.6结构体内存对齐 那对齐这么浪费空间&#xff0c;为什么要对齐 1.7修改默认对齐数 1.8结构体传参 2.位段 2.1什么是位段 2.2位段的内存分配…

【java】笔试强训Day1

⛳选择题 1.在 Java 中&#xff0c;存放字符串常量的对象属于 &#xff08; &#xff09;类对象 A、Character B、String C、StringBuffer D、Vector &#x1f648;大家觉得答案是什么呢 &#x1f649;答案是…

GPT-4发布:人工智能新高度,以图生文技术震撼,短时间内挤爆OpenAI模型付费系统

“GPT-4&#xff0c;起飞&#xff01;”今日凌晨1点&#xff0c;OpenAI正式推出史上最强大的GPT-4文本生成AI系统 GPT-4&#xff1a;人工智能的新里程碑 你可能已经听说过GPT-3&#xff0c;它是一种能够生成自然语言文本的强大模型&#xff0c;可以用来回答问题、写文章、编程…

【Java SE】变量的本质

目录一. 前言二. 变量(variable)2.1 性质2.2 变量类型2.2.1 核心区别2.3 变量的使用三. 总结一. 前言 一天一个Java小知识点&#xff0c;助力小伙伴更好地入门Java&#xff0c;掌握更深层次的语法。 二. 变量(variable) 2.1 性质 变量本质上就是代表一个”可操作的存储空间”…

STL库中list的迭代器实现痛点分析

前文本篇文章准备换个模式&#xff0c;之前都是先详解模拟实现&#xff0c;但是模拟实现的基本逻辑大多数老铁都是明白的&#xff0c;所以我们这次主要讲解STL库中list的独特性&#xff0c;也就是模拟实现中的重难点文末有模拟实现的源码一&#xff0c;list实现的特殊类list实现…

【pytorch】使用deepsort算法进行目标跟踪,原理+pytorch实现

目录deepsort流程一、匈牙利算法二、卡尔曼滤波车速预测例子动态模型的概念卡尔曼滤波在deepsort中的动态模型三、预测值及测量值的含义deepsort在pytorch中的运行deepsort流程 DeepSORT是一种常用的目标跟踪算法&#xff0c;它结合了深度学习和传统的目标跟踪方法。DeepSORT的…

WireShark如何抓包,各种协议(HTTP、ARP、ICMP)的过滤或分析,用WireShark实现TCP三次握手和四次挥手

WireShark一、开启WireShark的大门二、如何抓包 搜索关键字2.1 协议过滤2.2 IP过滤2.3 过滤端口2.4 过滤MAC地址2.5 过滤包长度2.6 HTTP模式过滤三、ARP协议分析四、WireShark之ICMP协议五、TCP三次握手与四次挥手5.1 TCP三次握手实验5.2 可视化看TCP三次握手5.3 TCP四次挥手5.…

PCL 使用ICP点云拼接

一、简介 ICP算法详解——我见过最清晰的解释_负壹的博客-CSDN博客 两个点集&#xff0c;source和target&#xff0c;target不变&#xff0c;source经过旋转&#xff08;Rotation&#xff09;和平移&#xff08;Translation&#xff09;甚至加上尺度&#xff08;Scale&#x…

大聪明教你学Java | 深入浅出聊 SpringBoot 中的 starter 机制

前言 &#x1f34a;作者简介&#xff1a; 不肯过江东丶&#xff0c;一个来自二线城市的程序员&#xff0c;致力于用“猥琐”办法解决繁琐问题&#xff0c;让复杂的问题变得通俗易懂。 &#x1f34a;支持作者&#xff1a; 点赞&#x1f44d;、关注&#x1f496;、留言&#x1f4…

网络安全横向移动指南

在网络安全方面&#xff0c;了解威胁参与者的工具、技术和思维过程非常重要。 一旦对手获得对网络的初始访问权限&#xff0c;横向移动允许他们通过破坏目标组织网络中的其他主机来扩展访问权限并保持持久性。 威胁行为者可以收集有关公司用户活动和凭据、重要数据位置的信息…

Spark - 继承 FileOutputFormat 实现向 HDFS 地址追加文件

目录 一.引言 二.源码浅析 1.RDD.saveAsTextFile 2.TextOutputFormat 3.FileOutputFormat 三.源码修改 1.修改文件生成逻辑 - getRecordWriter 2.允许目录存在 - checkoutputSpecs 3.全部代码 - TextOutputFormatV2 四.追加存储代码实战 五.总结 一.引言 Output d…

关于STM32用DMA传输UART空闲中断中接收的数据时无法接收数据问题以及解决办法

一、stm32 cube ide 配置 1、DMA串口接收数据的ide配置如下图所示 串口1相关的设置及printf函数的使用&#xff0c;这里没放&#xff0c;建议先实现串口打印功能 2、相关的知识点 普通模式和循环模式的区别在于&#xff0c;普通模式下&#xff0c;DMA只会接收一次数据&#x…

微前端(无界)

前言&#xff1a;微前端已经是一个非常成熟的领域了&#xff0c;但开发者不管采用哪个现有方案&#xff0c;在适配成本、样式隔离、运行性能、页面白屏、子应用通信、子应用保活、多应用激活、vite 框架支持、应用共享等用户核心诉求都或存在问题&#xff0c;或无法提供支持。本…

DS18B20温度传感器简介和1-Wire驱动程序

目录DS18B20简介DS18B20的两种供电方式64位ROM温度传感器1-Wire Bus简介DS18B20通信时序初始化ROM相关命令(后续包含任何数据交换的操作)功能相关命令(后续包含任何数据交换的操作)单个DS18B20读取温度值驱动多个DS18B20读取温度值驱动DS18B20简介 DS18B20数字温度计提供9位到…

学习系统编程No.7【进程替换】

引言&#xff1a; 北京时间&#xff1a;2023/3/21/7:17&#xff0c;这篇博客本来昨天晚上就能开始写的&#xff0c;但是由于笔试强训的原因&#xff0c;导致时间用在了做题上&#xff0c;通过快2个小时的垂死挣扎&#xff0c;我充分意识到了自己做题能力的缺陷和运用新知识的缺…

致远OA敏感信息泄露漏洞合集(含批量检测POC)

文章目录前言敏感信息泄露A6 status.jsp 信息泄露漏洞漏洞描述漏洞影响网络测绘漏洞复现POC 批量检测getSessionList.jsp Session泄漏漏洞漏洞描述网络测绘批量检测POC致远OA 帆软组件 ReportServer 目录遍历漏洞漏洞描述漏洞影响网络测绘POC(批量检测)A6 createMysql.jsp 数据…

Java stream性能比较

环境 Ubuntu 22.04IntelliJ IDEA 2022.1.3JDK 17CPU&#xff1a;8核 ➜ ~ cat /proc/cpuinfo | egrep -ie physical id|cpu cores physical id : 0 cpu cores : 1 physical id : 2 cpu cores : 1 physical id : 4 cpu cores : 1 physical id : 6 cpu cores : 1 physical id …

浏览器工作原理

一、JavaScript 的历史 JavaScript&#xff08;简称JS&#xff09;Web前端开发的脚本语言。 它诞生1995年&#xff0c;由网景公司的 Brendan Eich 开发。最初&#xff0c;JavaScript 被设计用于在网页上嵌入动态内容和交互式功能。 1996年&#xff0c;JavaScript 1.1 成为国…

C++虚函数与多态

C虚函数与多态虚函数抽象类纯虚函数虚析构函数多态虚函数的几个问题纯虚函数和ADT虚函数 virtual修饰的成员函数就是虚函数&#xff0c; 1.虚函数对类的内存影响&#xff1a;增加一个指针类型大小&#xff08;32位和64位&#xff09; 2.无论有多少个虚函数&#xff0c;只增加一…

【ansible】模块介绍超详解(下)

目录 六&#xff0c;软件包管理 1&#xff0c;yum_repository模块 &#xff08;1&#xff09;yum_repository模块常用选项 &#xff08;2&#xff09;yum_repository模块案例 2&#xff0c;mount模块 &#xff08;1&#xff09;mount模块选项 &#xff08;2&#xff09;mount模…