SpringBoot集成Kafka

Kafka是一种基于分布式发布-订阅消息系统的开源软件。 将消息存储在可配置数量的分区中,以便实现横向扩展,并且支持多个生产者和消费者,具有良好的可靠性保证机制。
Kafka还支持数据复制、故障转移和离线数据处理等功能,并被广泛应用于网站活动跟踪、日志收集与分析、流式处理、消息队列等场景

环境搭建

采用docker-compose搭建测试环境,具体配置如下:

version: '3'

# 网桥 -> 方便相互通讯
networks:
  kafka:
    ipam:
      driver: default
      config:
        - subnet: "172.22.6.0/24"

services:
  zookepper:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latest                    # 原镜像`bitnami/zookeeper:latest`
    container_name: zookeeper-server                        # 容器名为'zookeeper-server'
    restart: unless-stopped                                  # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
    volumes:                                         # 数据卷挂载路径设置,将本机目录映射到容器目录
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes
    ports:                                           # 映射端口
      - "2181:2181"
    networks:
      kafka:
        ipv4_address: 172.22.6.11

  kafka:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1                                # 原镜像`bitnami/kafka:3.4.1`
    container_name: kafka                                    # 容器名为'kafka'
    restart: unless-stopped                                          # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
    volumes:                                                 # 数据卷挂载路径设置,将本机目录映射到容器目录
      - "/etc/localtime:/etc/localtime"
    environment:
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181                          # zookeeper地址
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092        # TODO 填写域名或主机IP -- 让客户端能够监听消息  ( host.docker.internal:自动识别主机IP,在Windows或Mac上运行Docker有效 )
    ports:                              # 映射端口
      - "9092:9092"
    depends_on:                         # 解决容器依赖启动先后问题
      - zookepper
    networks:
      kafka:
        ipv4_address: 172.22.6.12

  # kafka-map图形化管理工具
  kafka-map:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-map                         # 原镜像`dushixiang/kafka-map:latest`
    container_name: kafka-map                            # 容器名为'kafka-map'
    restart: unless-stopped                                          # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
    volumes:
      - "./kafka/kafka-map/data:/usr/local/kafka-map/data"
    environment:
      DEFAULT_USERNAME: admin
      DEFAULT_PASSWORD: 123456
    ports:                              # 映射端口
      - "8080:8080"
    depends_on:                         # 解决容器依赖启动先后问题
      - kafka
    networks:
      kafka:
        ipv4_address: 172.22.6.13

启动 : docker-compose -f docker-compose-kafka.yml -p kafka up -d

访问:http://127.0.0.1:8080 账号密码:admin/123456

①、添加集群
在这里插入图片描述

②、创建topic
在这里插入图片描述

一、依赖:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafaka</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <!-- kafka依赖 begin -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- kafka依赖 end -->

    </dependencies>
</project>

二、配置文件和启动类


server.port=8088
#### kafka配置生产者 begin ####
#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=127.0.0.1:9092

#=============== provider  =======================
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432

#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#### kafka配置生产者 end ####


#### kafka配置消费者 start ####
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=test
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=1000

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#### kafka配置消费者 end ####
@SpringBootApplication
public class DemoApplication {

   public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
   }
}

三、生产者

@Controller
@RequestMapping("/api/kafka/")
public class KafkaController{
	
	@Autowired
	private KafkaTemplate<String,Object> kafkaTemplate;

	@GetMapping("send")
	@ResponseBody
	public boolean send(@RequestParam String message){
		
		try{
			kafkaTemplate.send("testTopic", message);
            System.out.println("消息发送成功...");
		}catch(Exception e){
			e.printStackTrace();
		}

		return true;
	}

	@GetMapping("test")
    @ResponseBody
    public String test(){
        System.out.println("hello world!");
        return "ok";
    }
}

四、消费者

//消费者监听topic=testTopic的消息
@Component
public class ConsumerListener {
     
    @KafkaListener(topics = "testTopic")
    public void onMessage(String message){
        //insertIntoDb(buffer);//这里为插入数据库代码
        System.out.println("message: " + message);
    }

}

测试:

生产者:http://localhost:8088/api/kafka/send?message=aaabbbcccdddd
消费者接收到消息aaabbbcccdddd

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/437735.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【面试必看!】如何介绍项目,如何陈述工作经历?

“简单介绍一下你的工作经历”&#xff0c;这是很多面试者在面试中遇到的第一个问题。因为现在招聘单位都很看重工作经验&#xff0c;尤其是软件行业&#xff0c;所以我想有必要在这里说一下如何在面试中陈述自己的工作经历。 在“说道理”之前&#xff0c;先举几个小例子&…

备战蓝桥(模板篇)

扩展欧德里几算法 质数筛 分解质因数 LCA BFS floyd Dijkstra prime 日期是否合法 Tire异或 模拟散列表 字符哈希 Tire字符串统计

【Linux篇】gdb的使用

&#x1f49b;不要有太大压力&#x1f9e1; &#x1f49b;生活不是选择而是热爱&#x1f9e1; &#x1f49a;文章目录&#x1f49a; 1. 背景知识2. 使用 1. 背景知识 1. 程序发布的方式有两种&#xff0c;debug模式和release模式 2. Linux下&#xff0c;gcc和g编译生成的可执行…

【MybatisPlus】QueryWrapper、UpdateWrappe、LambdaQueryWrapper、LambdaUpdateWrapper

一、Wrapper简介 QueryWrapper、UpdateWrapper、LambdaQueryWrapper 和 LambdaUpdateWrapper 都是 MyBatis-Plus 框架中用于构建条件的工具类&#xff0c;它们之间的关系是继承关系。其中 QueryWrapper 和 UpdateWrapper 是基于普通的对象属性名来构建条件的&#xff0c;而 La…

mongo用命令将csv导入导出数据表

1、准备csv数据 2、新建数据库 数据库名&#xff1a;POIDB&#xff0c;用Robo数据库可视化软件操作的 3、命令行导入csv文件 命令模板 mongoimport --db mydatabase --collection mycollection --type csv --headerline --file data.csv导入kword文件命令&#xff0c;数据…

相机标定实验

相机标定 文章目录 相机标定1 ROS标定1.1安装标定程序1.2 下载标定板1.3 标定1.4 标定结果 2 Kalibr相机标定2.1 下载官方提供的标定板2.2 自定义标定板2.3 cam数据录制2.4 标定2.5 输出结果 3 MATLAB标定3.1 打开工具3.2 添加标定板图片3.3 设置标定参数3.4 生成标定结果3.5 标…

【网络原理】初识网络原理

目录 &#x1f384;网络发展史&#x1f338;独立模式&#x1f338;网络互连&#x1f33b;局域网LAN&#x1f33c;基于网线直连&#x1f33c;基于集线器组建&#x1f33c;基于交换机组建&#x1f33c;基于交换机和路由器组建 &#x1f33b;广域网WAN &#x1f333;网络通信基础&…

如何在小程序中绑定身份证

在小程序中绑定身份证信息是一项常见的需求&#xff0c;特别是在需要进行实名认证或者身份验证的场景下。通过绑定身份证信息&#xff0c;可以提高用户身份的真实性和安全性&#xff0c;同时也为小程序提供了更多的个性化服务和功能。下面就介绍一下怎么在小程序中绑定居民身份…

position定位学习

加了绝对定位的盒子不能通过margin:0 auto水平居中 脱标元素不会产生外边距合并问题

vue svelte solid 虚拟滚动性能对比

前言 由于svelte solid 两大无虚拟DOM框架&#xff0c;由于其性能好&#xff0c;在前端越来越有影响力。 因此本次想要验证&#xff0c;这三个框架关于实现表格虚拟滚动的性能。 比较版本 vue3.4.21svelte4.2.12solid-js1.8.15 比较代码 这里使用了我的 stk-table-vue(np…

Vue3实现页面跳转功能

目标&#xff1a; 首页&#xff1a; 点击About后&#xff1a; 第一步&#xff1a;安装 Vue Router和创建你先 npm install vue-router4第二步&#xff1a;在router.js中设置路由 import { createRouter, createWebHistory } from vue-router; import Home from ./views/Home…

HTML超详细简介

HTML是什么 超文本标记语言&#xff08;HyperText Mark-up Language &#xff09;用来设计网页的标记语言用该语言编写的文件&#xff0c;以 .html或 .htm为后缀由浏览器解释执行不区分大小写&#xff0c;建议小写 HTML标签 HTML用于描述功能的符号成为“标签”标签都封装在…

b站小土堆pytorch学习记录—— P25-P26 网络模型的使用和修改、保存和读取

文章目录 一、修改1.方法2.代码 二、保存和读取1.方法2.代码&#xff08;1&#xff09;保存&#xff08;2&#xff09;加载 3.陷阱 一、修改 1.方法 add_module(name: str, module: Module) -> None name 是要添加的子模块的名称。 module 是要添加的子模块。 调用 add_m…

部署SpringBoot项目

方案一&#xff1a;纯手工部署 1&#xff0c;购买一台云服务器 这里我使用腾讯云&#xff0c;推荐Centos8/Centos7.6 2&#xff0c;安装springBoot项目所需要的环境 1&#xff0c;数据库单独安装在另一台服务器上&#xff0c;只需要修改IP地址即可 2&#xff0c;安装jdk yum…

泛型 --java学习笔记

什么是泛型 定义类、接口、方法时&#xff0c;同时声明了一个或者多个类型变量&#xff08;如&#xff1a;<E>&#xff09;&#xff0c;称为泛型类、泛型接口&#xff0c;泛型方法、它们统称为泛型 可以理解为扑克牌中的癞子&#xff0c;给它什么类型它就是什么类型 如…

如何将中科方德桌面操作系统加入Windows域

往期文章&#xff1a;自定义SSH客户端连接时的显示信息 | 统信UOS | 麒麟KYLINOS Hello&#xff0c;大家好啊&#xff0c;今天我非常高兴地给大家带来一篇关于如何将中科方德桌面操作系统加入Windows域的教程文章。对于使用中科方德桌面操作系统的用户来说&#xff0c;将其加入…

TSINGSEE配电房/配电站/变电站远程视频智能监管、无人值守方案

一、背景需求分析 随着社会的快速发展和科技进步&#xff0c;电力作为现代社会的核心驱动力&#xff0c;其稳定运行与安全管理变得愈发重要。特别是在配电房这一关键环节中&#xff0c;实施高效的远程视频智能监管方案&#xff0c;不仅能够有效提升电力供应的可靠性&#xff0…

模拟实现std::string类(包含完整、分文件程序)

std库中的string是一个类&#xff0c;对string的模拟实现&#xff0c;既可以复习类的特性&#xff0c;也可以加深对std::string的理解。 &#x1f308;一、搭建框架 ☀️1.新命名空间 本质上string是一个储存在库std里面的类&#xff0c;现在需要模拟实现一个string类&#…

Linux:kubernetes(k8s)探针ReadinessProbe的使用(9)

本章yaml文件是根据之前文章迭代修改过来的 先将之前的pod删除&#xff0c;然后使用下面这个yaml进行生成pod apiVersion: v1 # api文档版本 kind: Pod # 资源对象类型 metadata: # pod相关的元数据&#xff0c;用于描述pod的数据name: nginx-po # pod名称labels: # pod的标…

好物周刊#46:在线工具箱

https://github.com/cunyu1943 村雨遥的好物周刊&#xff0c;记录每周看到的有价值的信息&#xff0c;主要针对计算机领域&#xff0c;每周五发布。 一、项目 1. twelvet 一款基于 Spring Cloud Alibaba 的权限管理系统&#xff0c;集成市面上流行库&#xff0c;可以作用为快…