docker安装和使用kafka

1. 启动zookeeper

Kafka依赖zookeeper, 首先安装zookeeper
-p:设置映射端口(默认2181

docker run --name zookeeper \
	--network app-tier \
    -e ALLOW_ANONYMOUS_LOGIN=yes \
	--restart=always \
    -d bitnami/zookeeper:latest

2. 启动kafka

docker run --name kafka \
	--network app-tier \
    -p 9092:9092 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092	 \
	--restart=always \
    -d bitnami/kafka:latest
命令解释
ALLOW_PLAINTEXT_LISTENER=yes任何人可以访问
KAFKA_CFG_ZOOKEEPER_CONNECTzookeeper地址
KAFKA_CFG_ADVERTISED_LISTENERS当前kafka安装的主机地址 如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误

2. 启动kafka-map管理工具

docker run --name kafka-map \
	--network app-tier \
    -p 9001:8080 \
    -v /usr/local/kafka-map/data:/usr/local/kafka-map/data \
    -e DEFAULT_USERNAME=admin \
    -e DEFAULT_PASSWORD=admin \
    --restart=always \
	-d dushixiang/kafka-map:latest

启动成功后, 访问客户端: http://localhost:9001
账户: admin
密码: admin

在这里插入图片描述

3. springboot集成kafka

pom.xml配置

    <dependencies>
    	<!--kafka依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>        

配置application.yml

#------------------------------------spring----------------------------------
spring:
  #------------------------------------消息队列kafka配置----------------------------------
  kafka:
    #  kafka server的地址,如果有多个,使用逗号分割
    bootstrap-servers: localhost:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 1
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。32MB的批处理缓冲区
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
      properties:
        # 自定义拦截器
        interceptor.classes: com.wms.message.kafka.interceptor.CustomProducerInterceptor
        #自定义分区器
        partitioner.classes: com.wms.message.kafka.interceptor.CustomPartitioner
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        # 自定义消费者拦截器
        interceptor.classes: com.wms.message.kafka.interceptor.CustomConsumerInterceptor
      # 默认消费者组
      group-id: code-safe-group
      # 设置最大轮询间隔时间(毫秒),默认值为 300000(5分钟)
      # 如果两次 poll() 之间的时间超过此配置值,可能导致 rebalance, 消费者会被剔除 此处设置10分钟
      max-poll-interval-ms: 600000
      # 批量一次最大拉取数据量
      max-poll-records: 1000
      batch:
        # 批消费并发量,小于或等于Topic的分区数
        concurrency: 3
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false
    topics:
      # 自定义主题名称
      twsm: webSocket_send_message_dev
      group-id: group-id
      topic-name:
        - topic1

测试发送消息到kafka

/**
 * Kafka测试
 *
 * @version 1.0
 * @author: web
 * @date: 2024/1/18 15:07
 */
@Slf4j
@RestController
@RequestMapping("/message/kafkaTest")
public class KafkaTestController extends BaseController
{

    @Autowired
    private KafkaUtils kafkaUtils;

    /**
     * 生产者_推送消息到kafka
     *
     * @param msg
     * @author: web
     * @return: AjaxResult
     * @date: 2024/1/18 15:16
     */
    @PostMapping("/send")
    public AjaxResult send(@RequestBody Map<String, Object> msg)
    {
        try
        {
            String userId = msg.get("userId").toString();
            Object content = msg.get("content");
            Message message = kafkaUtils.setMessage(userId, content);
            kafkaUtils.send(KafkaUtils.TOPIC_TEST, message);
        }
        catch (Exception e)
        {
            log.error("生产者_推送消息到kafka发生异常");
        }
        return success();
    }

    /**
     * 消费者1
     *
     * @param record
     * @param ack
     * @param topic
     * @author: web
     * @return: void
     * @date: 2024/1/18 15:07
     */
    @KafkaListener(topics = KafkaUtils.TOPIC_TEST)
    public void topicTest1(ConsumerRecord<?, ?> record, Acknowledgment ack,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
    {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent())
        {
            Object msg = message.get();
            log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }

    /**
     * 消费者2
     *
     * @param record
     * @param ack
     * @param topic
     * @author: web
     * @return: void
     * @date: 2024/1/18 15:07
     */
    //    @KafkaListener(topics = KafkaUtils.TOPIC_TEST, groupId = KafkaUtils.TOPIC_GROUP2)
    //    public void topicTest2(ConsumerRecord<?, ?> record, Acknowledgment ack,
    //                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
    //    {
    //
    //        Optional message = Optional.ofNullable(record.value());
    //        if (message.isPresent())
    //        {
    //            Object msg = message.get();
    //            log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);
    //            ack.acknowledge();
    //        }
    //    }

}

KafkaUtils类

/**
 * 生产者
 *
 * @version: 1.0
 * @author: web
 * @date: 2024/1/18 10:37
 */
@Component
@Slf4j
public class KafkaUtils
{

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 自定义topic
     */
    public static final String TOPIC_TEST = "topic.code-safe";
    /**
     * 自定义消费组
     */
    public static final String TOPIC_GROUP1 = "topic.group1";
    public static final String TOPIC_GROUP2 = "topic.group2";

    // 业务相关topic

    /**
     * 主题: webSocket发送消息到客户端
     */
    public static String TOPIC_WEBSOCKET_SEND_MESSAGE;

    @Autowired
    private String[] kafkaTopicName;

    /**
     * 获取配置文件中的盐值,并设置到静态变量中
     *
     * @param topic 主题
     */
    @Value("${spring.kafka.topics.twsm}")
    private void setTwsmTopic(String topic)
    {
        TOPIC_WEBSOCKET_SEND_MESSAGE = topic;
    }

    /**
     * 发送消息
     *
     * @param topic   主题
     * @param message 消息内容
     * @author: web
     * @return: void
     * @date: 2024/1/18 10:42
     */
    public void send(String topic, Object message)
    {
        if (StringUtils.isEmpty(topic) || StringUtils.isNull(message))
        {
            throw new ServiceException("生产者发送消息到kafka_主题或消息内容不能为空!");
        }
        String obj2String = JsonUtils.toJsonString(message);
        //        log.info("准备发送消息为:{}", obj2String);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj2String);
        // 监听回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>()
        {
            @Override
            public void onFailure(Throwable throwable)
            {
                //发送失败的处理
                log.error(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult)
            {
                //成功的处理
//                log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }

    /**
     * 设置websocket发送的消息体
     *
     * @param userId 用户ID
     * @param msg    消息内容
     * @author: web
     * @return: Message 消息对象
     * @date: 2024/1/19 11:36
     */
    public Message setMessage(String userId, Object msg)
    {
        Message message = new Message();
        message.setSendUserId(userId);
        message.setSendTime(DateUtils.getTime());
        message.setSendContent(String.valueOf(msg));
        return message;
    }
}

Message类

@Data
public class Message implements Serializable
{

    private static final long serialVersionUID = -118L;

    /**
     * 发送人ID
     */
    private String sendUserId;

    /**
     * 发送人
     */
    //    private String sendUserName;

    /**
     * 发送时间
     */
    private String sendTime;

    /**
     * 发送内容
     */
    private String sendContent;
}

监听消息

/**
 * 消息接收监听器【分布式系统】
 *
 * @version: 1.0
 * @author: web
 * @date: 2024/1/19 13:44
 */
@Component
@Slf4j
public class MessageListener
{
    /**
     * 根据用户id发送消息到客户端
     *
     * @param record
     * @param ack
     * @param topic
     * @author: web
     * @return: void
     * @date: 2024/1/20 22:05
     */
    @KafkaListener(topics = "#{'${spring.kafka.topics.twsm}'}", groupId = "#{topicGroupId}")
    public void sendMessageByUserId(ConsumerRecord<String, String> record, Acknowledgment ack,
                                    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
    {
        Optional<String> optional = Optional.ofNullable(record.value());
        if (optional.isPresent())
        {
            Message message = JsonUtils.parseObject(optional.get(), Message.class);
            if (StringUtils.isNull(message))
            {
                log.error("消费者收到kafka消息的内容为空!");
                return;
            }
//            log.info("消费者收到kafka消息");
            String sendUserId = message.getSendUserId();
            String sendContent = message.getSendContent();
            // 确认收到消息
			ack.acknowledge();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/435817.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

实验七 综合实验

一&#xff0e; 下载并成功运行Anaconda,jupyter book ,spyder 输入检验(print (“hello”)) 二&#xff0e; 在jupyter prompt中安装库&#xff1a; 找到anaconda 的Scripts库&#xff0c;并复制路径以备后面安装命令 D:\Program Files\anaconda3\Scripts 进入prompt命令…

网络原理初识

一、IP地址 概念 IP 地址主要用于标识网络主机、其他网络设备&#xff08;如路由器&#xff09;的网络地址。简单说&#xff0c; IP 地址用于定位主机 的网络地址 。 就像我们发送快递一样&#xff0c;需要知道对方的收货地址&#xff0c;快递员才能将包裹送到目的地。 二、…

搭建的svn 1.14.1,拉取代码时候没输入账户密码就报错 auth failed

这边在ubuntu里面搭的svn server&#xff0c;但是拉代码的是否一直报错 auth faield&#xff0c;一开始以为是有auth cache&#xff0c;去设置里面清楚了&#xff0c;windows 里面也清楚了&#xff0c;但是还是报错 问题原因 一直排查才发现&#xff0c;我新增用户的时候&…

vue+uniapp实现图形验证码功能-插件(附源码)

一、需求背景 vueuniapp实现图形验证码功能-插件&#xff08;附源码&#xff09; 在登录系统时&#xff0c;除了密码登录&#xff0c;还需要提供验证码登录。 涉及验证码&#xff0c;为了安全&#xff0c;一般会加入图形验证码&#xff0c;然后再发短信验证码。 如图&#xff1…

为国产信创服务器提供LDAP统一身份认证方案

金融信创作为 8 大行业信创之首&#xff0c;早已成为其他行业信创建设的参考。金融行业有着极为复杂的业务场景&#xff0c;对系统有着极高的稳定可靠需求&#xff0c;因此&#xff0c;在寻找微软 AD 国产化替代方案时&#xff0c;常会涉及到更深层次的场景。例如&#xff0c;最…

计算机大数据毕业设计-基于Flask的旅游推荐可视化系统的设计与实现

基于Flask的旅游推荐可视化系统的设计与实现 编程语言&#xff1a;Python3.10 涉及技术&#xff1a;FlaskMySQL8.0Echarts 开发工具&#xff1a;PyCharm 摘要&#xff1a;以Pycharm为旅游推荐系统开发工具&#xff0c;采用B/S结构&#xff0c;使用Python语言开发旅游景点推…

电网数字孪生的开发框架

电网数字孪生的开发框架通常会综合利用多种技术和工具&#xff0c;包括数据处理、模型建立、仿真与优化等方面的工具和平台。以下是一些常用的开发框架&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。…

网络安全: Kali Linux 进行 SSH 渗透与防御

目录 一、实验 1.环境 2.nmap扫描目标主机 3.Kali Linux 进行 SSH 渗透 3.Kali Linux 进行 SSH 防御 二、问题 1.SSH有哪些安全配置 一、实验 1.环境 &#xff08;1&#xff09;主机 表1 主机 系统版本IP备注Kali Linux2022.4 192.168.204.154&#xff08;动态&…

【JavaScript 漫游】【031】window 对象总结

文章简介 本篇文章为【JavaScript 漫游】专栏的第 030 篇文章&#xff0c;记录了浏览器模型中 window 对象的相关知识点。 window 对象概述 浏览器里面&#xff0c;window 对象&#xff08;注意&#xff0c;w 为小写&#xff09;指当前的浏览器窗口。它也是当前页面的顶层对…

Java中线程操作的相关方法

当涉及到在Java中操作线程时&#xff0c;有许多内置的类和方法可供使用。下面是关于Java中线程操作的主要方法和技术的简要教程&#xff1a; 1. 创建线程 在Java中&#xff0c;有两种主要的方式来创建线程&#xff1a; - 继承 Thread 类并重写其 run() 方法。 - 实现 Runna…

深入探讨javascript的流程控制与分支结构,以及js的函数

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属的专栏&#xff1a;前端泛海 景天的主页&#xff1a;景天科技苑 文章目录 1.流程控制与分支结构1.if分支结构2.switch case 分支结构3.循环结…

C++消息队列处理提高性能的方法

1 消息队列特点 在当前多数软件系统中,处理传递消息多用消息队列机制,他具有以下优点: 1. 异步通信:消息队列支持异步通信,发送者和接收者之间的解耦程度较高。发送者将消息放入队列后即可继续执行,而不需要等待接收者的响应。这样可以提高系统的吞吐量和处理能力。 2.…

删除指定的数

删除指定的数 题目描述&#xff1a;解法思路&#xff1a;解法代码&#xff1a;运行结果&#xff1a; 题目描述&#xff1a; 先输入10个整数存放在数组中&#xff0c;再输入⼀个整数n&#xff0c;删除数组中所有等于n的数字&#xff0c;数组中剩余的数组保证数组的最前面&#…

电脑键盘快捷键,掌握这些,快速提高效率!

“我是一名电脑新手&#xff0c;在使用电脑时还有很多不懂的。想问问大家平常有什么比较好用的电脑键盘快捷键可以推荐吗&#xff1f;” 在数字化时代&#xff0c;电脑已成为我们生活与工作中不可或缺的工具。掌握一些常用的电脑键盘快捷键&#xff0c;不仅能提高我们的工作效率…

[备赛笔记]——5G大唐杯(5G考试等级考考试基础试题)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;Vir2021GKBS &#x1f43c;本文由…

学生信息管理展示-h5版(uniapp+springboot+vue)

记录一下做的第一个完整的h5业务。 一、登录 二、个人中心 三、首页&#xff08;管理员&#xff09; 四、首页&#xff08;学生&#xff09; 五、视频展示 学生信息管理展示&#xff08;h5&#xff09;完整版

如何解决由于浏览器版本升级导致脚本用不了的问题【文章底部可得就业内推码】

目录 1. 使用WebDriverManager&#xff1a; 2. 手动下载更新驱动&#xff1a; 3. 设置浏览器选项&#xff1a; 4. 使用Selenium Grid&#xff1a; 5. 参考官方文档和社区&#xff1a; 面对浏览器版本升级导致的网页自动化脚本无法正常运行的问题&#xff0c;你可以采取以下…

chromedriverUnable to obtain driver for chrome using ,selenium找不到chromedriver

1、下载chromedriver chromedriver下载网址&#xff1a;CNPM Binaries Mirror 老版本在&#xff1a;chromedriver/ 较新版本在&#xff1a;chrome-for-testing/ 2、设置了环境变量还是找不到chromedriverUnable to obtain driver for chrome using NoSuchDriverException:…

Java基础及开发环境配置教程

Java基础 Java是一种广泛使用的编程语言&#xff0c;以其“一次编写&#xff0c;到处运行”的能力而闻名。无论是开发桌面应用程序、Web应用程序还是移动应用程序&#xff0c;Java都是一个优秀的选择。本文将介绍Java的基础知识和如何配置Java开发环境。 1. Java简介 Java是…

【CSP试题回顾】201609-2-火车购票

CSP-201609-2-火车购票 解题思路 初始化座位: 首先&#xff0c;它创建了一个20行5列的二维向量 seatMap 用于表示车厢的座位情况。每个座位按顺序赋予了一个编号&#xff0c;从1到100。这部分代码通过两层循环完成&#xff0c;外层循环遍历所有的排&#xff0c;内层循环遍历每…