springboot集成kafka快速入门demo

一、kafka介绍

Kafka是一种基于分布式发布-订阅消息系统的开源软件。 其目标是提供高吞吐量、低延迟、可扩展性和容错能力。 Kafka中将消息存储在可配置数量的分区中,以便实现横向扩展,并且支持多个生产者和消费者,具有良好的可靠性保证机制。 除此之外,Kafka还支持数据复制、故障转移和离线数据处理等功能,并被广泛应用于网站活动跟踪、日志收集与分析、流式处理、消息队列等场景

二、环境搭建

采用docker-compose搭建测试环境,具体配置如下:

version: '3'


# 网桥 -> 方便相互通讯
networks:
  kafka:
    ipam:
      driver: default
      config:
        - subnet: "172.22.6.0/24"


services:
  zookepper:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest                    # 原镜像`bitnami/zookeeper:latest`
    container_name: zookeeper-server                        # 容器名为'zookeeper-server'
    restart: unless-stopped                                  # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
    volumes:                                         # 数据卷挂载路径设置,将本机目录映射到容器目录
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes
    ports:                                           # 映射端口
      - "2181:2181"
    networks:
      kafka:
        ipv4_address: 172.22.6.11


  kafka:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1                                # 原镜像`bitnami/kafka:3.4.1`
    container_name: kafka                                    # 容器名为'kafka'
    restart: unless-stopped                                          # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
    volumes:                                                 # 数据卷挂载路径设置,将本机目录映射到容器目录
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181                          # zookeeper地址
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092        # TODO 填写域名或主机IP -- 让客户端能够监听消息  ( host.docker.internal:自动识别主机IP,在Windows或Mac上运行Docker有效 )
    ports:                              # 映射端口
      - "9092:9092"
    depends_on:                         # 解决容器依赖启动先后问题
      - zookepper
    networks:
      kafka:
        ipv4_address: 172.22.6.12


  # kafka-map图形化管理工具
  kafka-map:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map                         # 原镜像`dushixiang/kafka-map:latest`
    container_name: kafka-map                            # 容器名为'kafka-map'
    restart: unless-stopped                                          # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
    volumes:
      - "./kafka/kafka-map/data:/usr/local/kafka-map/data"
    environment:
      DEFAULT_USERNAME: admin
      DEFAULT_PASSWORD: 123456
    ports:                              # 映射端口
      - "8080:8080"
    depends_on:                         # 解决容器依赖启动先后问题
      - kafka
    networks:
      kafka:
        ipv4_address: 172.22.6.13

启动测试环境

docker-compose -f docker-compose-kafka.yml -p kafka up -d

kafka-map

https://github.com/dushixiang/kafka-map

  • 访问:http://127.0.0.1:8080

  • 账号密码:admin/123456

添加集群adf150a6f4a65f04ad3dd642b62c2139.png创建topic43bcab1d79f24d35533e30bd7fc9ced0.png

三、代码工程

本工程主要是测试生产者发送消息到主题【testTopic】,然后消费者接收消息

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>


    <artifactId>kafaka</artifactId>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <!-- kafka依赖 begin -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- kafka依赖 end -->


    </dependencies>
</project>

属性文件

server.port=8088
#### kafka配置生产者 begin ####
#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=127.0.0.1:9092


#=============== provider  =======================
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432


#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1


# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer


#### kafka配置生产者 end ####




#### kafka配置消费者 start ####
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=test
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=1000


# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer


#### kafka配置消费者 end ####

消费者

package com.et.kafaka.listener;


import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;


/**   
 * 消费者监听topic=testTopic的消息
 *
 * @author Lynch 
 */
@Component
public class ConsumerListener {
     
    @KafkaListener(topics = "testTopic")
    public void onMessage(String message){
        //insertIntoDb(buffer);//这里为插入数据库代码
        System.out.println("message: " + message);
    }


}

生产者

package com.et.kafaka.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;


/**   
 *
 *
 * @author Lynch 
 */
@Controller
@RequestMapping("/api/kafka/")
public class KafkaController {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;


    @GetMapping("send")
    @ResponseBody
    public boolean send(@RequestParam String message) {
        try {
            kafkaTemplate.send("testTopic", message);
            System.out.println("消息发送成功...");
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        return true;
    }
    
    @GetMapping("test")
    @ResponseBody
    public String test(){
        System.out.println("hello world!");
        return "ok";
    }
}

启动类

package com.et.kafaka;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class DemoApplication {


   public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
   }
}

代码仓库

  • https://github.com/Harries/springboot-demo

四、测试

  1. 启动java应用,

  2. 生产者:http://localhost:8088/api/kafka/send?message=aaabbbcccdddd

  3. 消费者接收到消息aaabbbcccdddd

消息发送成功...
message: aaabbbcccdddd

五、引用

  • https://spring.io/projects/spring-kafka

  • https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/

  • http://www.liuhaihua.cn/archives/710233.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/412572.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【精选】Java面向对象进阶——静态内部类和局部内部类

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏 …

SpringCloud有哪些组件

什么是SpringCloud&#xff1f; Spring Cloud是基于Spring Boot的分布式系统开发工具&#xff0c;它提供了一系列开箱即用的、针对分布式系统开发的特性和组件&#xff0c;用于帮助开发人员快速构建和管理云原生应用程序。 Spring Cloud的主要目标是解决分布式系统中的常见问题…

FL Studio Fruity Edition2024中文入门版Win/Mac

FL Studio Fruity Edition2024是一款功能强大的音乐制作软件&#xff0c;适合初学者和音乐爱好者使用。它提供了丰富的音乐制作工具&#xff0c;包括音频录制、编辑、混音以及MIDI制作等功能&#xff0c;帮助用户轻松创作出动人的音乐作品。 FL Studio 21.2.3 Win-安装包下载如…

Linux之定时任务①(实施必会!!!)

文章目录 常见脚本一、 什么是crond二、crond的使用场景一、apache服务器监控三、crond服务四、命令格式五、cron格式六、定时任务备份七、数据库定时备份八、使用shell脚本发送邮件 常见脚本 [rootlocalhost ~]# vim apacheSentry.sh#!/bin/bash # author: tt # description:…

DAY34--learning English

一、积累 1.listless 2.sanction 3.inflict 4.stung 5.droplet 6.rot 7.soil 8.welfare 9.flock 10.mitigate 11.incubation 12.feces 13.urine 14.odor 15.sprinkle 16.guresome 17.slaughter 18.antibiotic 19.certify 20.tray 二、练习 1.牛津原译 Listless adj. /ˈlɪst…

【毛毛讲书】【老而不衰的科学】长寿的秘诀究竟是什么?

重磅推荐专栏&#xff1a; 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域&#xff0c;包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用&#xff0c;以及与之相关的人工智能生成内容&#xff…

用GGUF和Llama .cpp量化Llama模型

用GGUF和Llama .cpp量化Llama模型 什么是GGML如何用GGML量化llm使用GGML进行量化NF4 vs. GGML vs. GPTQ结论 由于大型语言模型&#xff08;LLMS&#xff09;的庞大规模&#xff0c;量化已成为有效运行它们的必要技术。通过降低其权重的精度&#xff0c;您可以节省内存并加快推理…

IP 电话

1 IP 电话概述 IP 电话是在互联网上传送多媒体信息。 多个英文同义词&#xff1a; VoIP (Voice over IP) Internet Telephony VON (Voice On the Net) 1.1 狭义的和广义的 IP 电话 狭义的 IP 电话&#xff1a;指在 IP 网络上打电话。 广义的 IP 电话&#xff1a;不仅仅是…

二 线性代数-向量

1、向量的表示方法&#xff1a; 其中的 i、j、k是坐标轴方向的单位向量。 2、向量的模&#xff1a; 用坐标计算的方法&#xff1a; 3、向量的运算&#xff1a; 3.1 向量的加法减法&#xff1a; 3.2 向量的数乘&#xff1a; 拉格朗日乘数法的 基础 公式。 3.3 向量的数量积&a…

分布式ID生成方案详解

✨✨ 祝屏幕前的您天天开心 &#xff0c;每天都有好运相伴。我们一起加油&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; 喔的嘛呀&#x1f388;&#x1f388; 目录 引言 一. UUID&#xff08;Universally …

mysql的增删改查(常用)

增(insert) 语法&#xff1a; insert into 表名&#xff08;字段&#xff09; values( 字段对应的值) 案例&#xff1a; 创建一个学生表 结构如下&#xff1a; create table student(id int ,name varchar(20),age int); 向表中插入2条数据 create table student(id int ,n…

设计模式-结构型模式-组合模式

组合模式&#xff08;Composite Pattern&#xff09;&#xff1a;组合多个对象形成树形结构以表示具有“部分—整体”关系的层次结构。组合模式对单个对象&#xff08;即叶子对象&#xff09;和组合对象&#xff08;即容器对象&#xff09;的使用具有一致性&#xff0c;又可以称…

24考研成绩查询时间已公布!附最全查分攻略!

2月26日早上9点起&#xff01; 2024考研初试成绩即将公布&#xff01; 考研初试成绩即将公布&#xff0c;同学们都在紧张地期待着自己的成绩。不同院校的成绩查询入口开通时间有所不同&#xff0c;具体时间请大家查看各自官网的通知。 成绩在哪查&#xff1f;怎么查&#xff1…

亚马逊巨头都在用的自养号大法,赶快get!

随着时间的推移&#xff0c;越来越多做亚马逊生意的朋友开始意识到自养号的重要性。拥有自养号意味着掌握了一手资源&#xff0c;这种自主性让人感到更安全。高权重的买家号可以享有更多的操作权限&#xff0c;也能获得更好的效果。然而&#xff0c;要想成功地养好自养号并不是…

面试经典150题【31-40】

文章目录 面试经典150题【31-40】76.最小覆盖字串36.有效的数独54.螺旋矩阵48.旋转图像73.矩阵置零289.生命游戏383.赎金信205.同构字符串290.单词规律242.有效的字母异位词 面试经典150题【31-40】 76.最小覆盖字串 基本思路很简单&#xff0c;就是先移动右边到合适位置。再移…

Java SpringBoot 获取 yml properties 自定义配置信息

Java SpringBoot 获取 yml properties 自定义配置信息 application.yml server:port: 9090servlet:context-path: /app第一种方法 HelloController package com.zhong.demo01.controller;import org.springframework.beans.factory.annotation.Value; import org.springfram…

SAP中分包后续调整应用实例二(调减)

之前己写过一篇介绍过分包后续调整功能MB04的基本应用。当时的场景是某个原材料由于各方面原因&#xff08;比如没有维护到BOM中&#xff09;&#xff0c;在委外加工模式成品收货后&#xff0c;并没有消耗或少消耗&#xff0c;这时可以用该事务功能来补充消耗。在生产报工中的M…

集团机构组网

在数字化转型的浪潮中&#xff0c;企业网络需求日益复杂化&#xff0c;尤其是对于大规模的集团机构来说&#xff0c;高效、安全且可靠的网络连接成为了业务发展的关键。传统网络架构已难以满足这些需求&#xff0c;而SD-WAN&#xff08;软件定义广域网&#xff09;技术的崛起&a…

【总第49篇】2.3深度学习开发任务实例(2)机器学习和深度学习的对比【大厂AI课学习笔记】

机器学习和深度学习都是用于图片分类任务的强大工具&#xff0c;但它们采用的方法和原理有所不同。下面我将分别解释这两种技术是如何应用于图片分类的&#xff0c;并着重讨论深度学习中的卷积概念。 机器学习在图片分类中的应用 传统的机器学习方法在进行图片分类时&#xf…

干洗行业上门预约解决方案,干洗店洗鞋店小程序开发;

互联网干洗店洗鞋店小程序,企业干洗方案,干洗行业小程序,上门取衣小程序,预约干洗小程序,校园干洗店小程序,工厂干洗店小程序,干洗店小程序开发&#xff1b; 一、干洗店洗鞋店小程序核心功能介绍: 1.(支持上门取送、送货到店、寄存网点、智能衣柜四种下单方式) 用户下单-上门取…