SpringBoot集成系列--Kakfa

文章目录

  • 一、代码
    • 1、添加依赖
    • 2、配置kafka
    • 3、创建生产者
    • 4、创建消费者
    • 5、测试
  • 二、遇到问题
    • 1、could not be established. Broker may not be available
    • 2、Error while fetching metadata with correlation id xxx

一、代码

1、添加依赖

在pom.xml文件中添加Kafka的依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2、配置kafka

生产者项目的application.properties文件中配置kafka

spring.kafka.bootstrap-servers=192.168.56.100:9092
spring.kafka.producer.client-id=forlan-client-id
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

参数说明:

  • spring.kafka.bootstrap-servers:指定Kafka集群的地址。这是一个以逗号分隔的服务器列表,每个服务器都以IP地址和端口号的形式表示。
  • spring.kafka.producer.client-id:指定Kafka生产者的客户端ID。这是一个用于标识生产者的唯一字符串。在诊断和监控时,这个ID可以帮助识别发送给Kafka的哪些消息是由哪个生产者发送的。
  • spring.kafka.producer.key-serializer:定义Kafka的消息的键的序列化器。
  • spring.kafka.producer.value-serializer:定义Kafka的消息的值的序列化器。

消费者项目的application.properties文件中配置kafka消费者

spring.kafka.bootstrap-servers=192.168.56.100:9092
spring.kafka.consumer.client-id=forlan-client-id
spring.kafka.consumer.group-id=forlan-group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

参数说明:

  • spring.kafka.bootstrap-servers:指定Kafka集群的地址。这是一个以逗号分隔的服务器列表,每个服务器都以IP地址和端口号的形式表示。
  • spring.kafka.consumer.client-id:指定Kafka消费者的客户端ID。这是一个用于标识消费者的唯一字符串。在诊断和监控时,这个ID可以帮助识别从Kafka接收到的哪些消息是由哪个消费者接收的。
  • spring.kafka.consumer.group-id:指定Kafka消费者的组ID。在消费者组中,成员之间可以共享消息消费进度,以便在需要时进行重新消费或进行故障转移。
  • spring.kafka.consumer.key-deserializer:指定用于反序列化从Kafka接收的消息的键的类。
  • spring.kafka.consumer.value-deserializer:指定用于反序列化从Kafka接收的消息的值的类。

3、创建生产者

设置发送消息服务类

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void testSend(String topic, String message) throws ExecutionException, InterruptedException {
        SendResult<String, String> stringStringSendResult = kafkaTemplate.send(topic, message).get();
        System.out.println(stringStringSendResult);
    }
}

4、创建消费者

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {
	@KafkaListener(topics = "forlan_topic")
	public void listen(String message) {
		System.out.println("Kafka收到消息:" + message);
	}
}

5、测试

编写测试类,发送消息

@Autowired
private KafkaProducer kafkaProducer;

@Test
public void testKafka() throws ExecutionException, InterruptedException {
	kafkaProducer.testSend("forlan_topic", "Forlan测试发送Kafka消息"+ LocalDateTime.now());
}

执行测试类,效果如下:
在这里插入图片描述
消费者后,会收到消息,如下:
在这里插入图片描述

二、遇到问题

1、could not be established. Broker may not be available

[Consumer clientId=consumer-forlan-group-id-1, groupId=forlan-group-id] Connection to node 1001 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

由于我们的kafka是装在容器内,使用下面这种方式,localhost指的是容器的ip,会有问题

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

正确的做法,应该是改为我们宿主机的ip,如下:

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.56.100:9092

其实就是把localhost设置为具体的ip,宿主机的ip。

2、Error while fetching metadata with correlation id xxx

 [Producer clientId=forlan-client-id-1] Error while fetching metadata with correlation id 3 : {forlan-topic=LEADER_NOT_AVAILABLE}

重启kafka即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/239189.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

conda环境下ImportError: libmkl_intel_lp64.so.1: cannot open shared object file问题解决

1 问题描述 conda环境下运行模型推理&#xff0c;出现如下错误&#xff1a; (retalking) [rootlocalhost video-retalking]# python inference.py --face examples/face/01.mp4 --audio examples/audio/01.wav --outfile results/01.mp4 Traceback (most recent call last):F…

论文笔记:A review on multi-label learning

一、介绍 传统的监督学习是单标签学习&#xff0c;但是现实中一个实例可能对应多个标签。这篇文章介绍了多标签分类的定义和评价指标、多标签学习的算法还有其他相关的任务。 二、问题相关定义 2.1 多标签学习任务 假设 X R d X R^d XRd&#xff0c;表示d维的输入空间&am…

阿里云cdn设置相同的域名路径访问不同的oss目录

1.设置回源配置&#xff0c;添加回源URL改写 2.设置跨域&#xff0c;cdn的跨域优先oss 3.回源设置

【智能家居】九、停车场车牌识别功能点(回调、解耦)

一、翔云 人工智能开放平台&#xff08;车牌识别&#xff09; 二、cJSON 库 三、实现代码 四、回调函数 五、人脸识别和车牌识别获取数据的区别 六、异步网络请求和同步网络请求的区别 七、解耦 一、翔云 人工智能开放平台&#xff08;车牌识别&#xff09; 翔云 人工智能开放…

写 SVG 动画必看!SVG系列文章4-微信公众号编写

1、基础设置 1.1 上传背景图素材 使用到的图片需要上传至微信后台&#xff0c;获取线上地址&#xff1a; 1.2 导入微信文章正文 新建图文消息&#xff0c;先输入好标题、作者&#xff0c;上传好封面图。然后在正文区域输入点文字&#xff0c;打开 chrome 调试工具&#xff0…

CorelDRAW软件2024版本好用吗?有哪些功能优势

CorelDRAW是一款综合性强大的专业平面设计软件&#xff0c;其功能覆盖了矢量图形设计、高级文字编辑、精细绘图以及多页文档和页面设计。该软件不仅适用于广告设计、包装设计&#xff0c;还广泛应用于出版、网页设计和多媒体制作等多个领域。下面就给大家介绍一下CorelDRAW这款…

台式扫描电镜中的扫描速度和扫描模式如何选择?

台式扫描电镜&#xff08;SEM&#xff09;是一种利用电子束扫描样品表面&#xff0c;通过检测样品反射或发射的次级电子、背散射电子、X 射线等信号&#xff0c;来获取样品的形貌、结构、组成和分布等信息的仪器。台式扫描电镜具有体积小、操作简单、样品制备方便、分辨率高、成…

论文怎么改才能降低重复率

一、引言&#xff1a;智能工具助力&#xff0c;轻松降低论文重复率 论文的重复率是学术写作中的重要问题&#xff0c;如何有效降低重复率成为了许多研究者的关注焦点。如今&#xff0c;智能工具的发展为我们提供了更多选择。本文将介绍几种实用的智能工具&#xff0c;包括快码…

PyInstaller 打包 Python 脚本为 .exe 可执行文件闪退、No Model named XXX问题

文章目录 前言.exe 可执行文件闪退No Model named XXXPython 环境问题查看当前python路径查看当前python环境使用的site-package路径 个人简介 前言 在上一篇文章中&#xff0c;我们介绍了如何将 Python 脚本打包为 .exe 可执行文件&#xff0c;但有时候打包生成的 .exe 文件会…

EasyV易知微数字孪生助力解决实际行业问题与痛点

数字孪生技术在当前多个领域得到了广泛的应用&#xff0c;特别是在航空航天、工业、城市和医学等领域&#xff0c;它被视为许多科技企业所关注的焦点。这种技术已经成为实现智能化的重要手段&#xff0c;它可以应用于项目设计、建造和运营等各个阶段&#xff0c;能够解决实际问…

总线一:I2C简介(介绍看这一篇就够啦)

本节主要介绍以下内容&#xff1a; I2C协议简介 STM32的I2C特性及架构 I2C初始化结构体详解 一、I2C协议简介 I2C 通讯协议(Inter&#xff0d;Integrated Circuit)是由Phiilps公司开发的&#xff0c;由于它引脚少&#xff0c;硬件实现简单&#xff0c;可扩展性强&#xff…

C/C++,动态 DP 问题的计算方法与源程序

1 文本格式 #include <bits/stdc.h> using namespace std; typedef long long LL; const int maxn 500010; const int INF 0x3f3f3f3f; int Begin[maxn], Next[maxn], To[maxn], e, n, m; int size[maxn], son[maxn], top[maxn], fa[maxn], dis[maxn], p[maxn], i…

HelpLook VS Confluence:知识管理方面谁更有优势?

多年来&#xff0c;在线协作和文档工具市场一直被Confluence所主导。Confluence由Atlassian于2004年创立&#xff0c;很迅速地成为企业寻求强大而全面的协作解决方案和知识管理的热门选择。然而&#xff0c;随着新工具如Notion和HelpLook的出现&#xff0c;市场格局发生了变化&…

OpenVINS学习3——初始化原理学习

一、OpenVINS初始化概述 VIO初始化的主要意义有&#xff1a; &#xff08;1&#xff09;对齐相机的世界坐标系和惯性系&#xff0c;因此需要估计重力方向。 &#xff08;2&#xff09;为后续的VIO算法提供较为准确的初始参数和状态&#xff08;尺度、IMU bias、初始速度&…

记录hive/spark取最新且不为null的方法

听标题可能听不懂我想表达的意思&#xff0c;我来描述一下我要做的事&#xff1a; 比如采集同学对某一网站进行数据采集&#xff0c;同一个用户每天会有很多条记录&#xff0c;所以我们要取一条这个用户最新的状态&#xff0c;比如用户改了N次昵称&#xff0c;我们只想得到最后…

C++STL之List的实现

首先我们要实现List的STL,我们首先要学会双向带头链表的数据结构。那么第一步肯定是要构建我们的节点的数据结构。 首先要有数据域&#xff0c;前后指针域即可。 再通过模板类进行模板化。 然后再写List的构造函数&#xff0c;这个地方用T&,通过引用就可以减少一次形参拷…

坑爹的奥数(枚举法)

枚举法是一种解决问题的基本方法&#xff0c;它通过列举问题的所有可能情况来找到问题的解。这种方法适用于问题的解空间相对较小&#xff0c;可以通过穷举所有可能的解来找到最优解或满足特定条件的解。 以下是枚举法的一般步骤&#xff1a; 定义问题&#xff1a; 确定问题的…

学习-面试java基础-(集合)

String 为什么不可变&#xff1f; 1线程安全 2支持hash映射和缓存。因为String的hash值经常会使用到&#xff0c;比如作为 Map 的键&#xff0c;不可变的特性使得 hash 值也不会变&#xff0c;不需要重新计算。 3出于安全考虑。网络地址URL、文件路径path、密码通常情况下都是以…

易点易动设备管理系统:助力企业高效巡检的智能选择

在现代企业管理中&#xff0c;设备巡检是确保设备正常运行和生产高效的重要环节。然而&#xff0c;传统的巡检方式常常面临着效率低下、信息不准确等问题。为了解决这些挑战&#xff0c;易点易动设备管理系统应运而生。本文将详细介绍易点易动设备管理系统如何助力企业实现高效…

红队攻防实战之DEATHNOTE

难道向上攀爬的那条路&#xff0c;不是比站在顶峰更让人热血澎湃吗 渗透过程 获取ip 使用Kali中的arp-scan工具扫描探测 端口扫描 可以看到开放了22和80端口。 访问80端口&#xff0c;重定向到 修改hosts文件&#xff0c;将该域名解析到ip 如图 修改完再次访问&#xff0…