使用Spring Boot和Kafka实现消息发送和订阅

文章目录

  • 一,新建Spring Boot
    • 1,Maven配置
    • 2,无法识别为SpringBoot项目
    • 3,无效的源发行版
    • 4,无法访问SpringApplication
    • 5,运行直接Finish
    • 6,服务运行成功
  • 二,安装启动Kafka
    • 1,下载
    • 2,配置
    • 3,启动
    • 4,其他命令
  • 三,生产消费消息
    • 1,加入依赖
    • 2,yam配置文件
    • 3,报错enabled mechanisms are []
    • 4,生产者生产消息
    • 5,订阅和消费消息
    • 6,接口
    • 7,测试结果
  • 四,参考博文

一,新建Spring Boot

最近忙着搞低代码开发,好久没新建spring项目了,结果今天心血来潮准备建个springboot项目
在这里插入图片描述
注意Type选Maven,java选8,其他默认
在这里插入图片描述

1,Maven配置

点下一步后完成就新建了一个spring boot项目,配置下Maven环境,主要是settings.xml文件,里面要包含阿里云仓库,不然可能依赖下载不下来
在这里插入图片描述
在这里插入图片描述

2,无法识别为SpringBoot项目

在maven配置没问题的前提下,IDEA无法识别这是一个Spring Boot项目,倒腾半天,终于发现问题原因所在=======>是Maven版本太高的原因
在这里插入图片描述
把.mvn/wrapper目录下的maven-wrapper.properties文件第一行的版本号降低,比如说降为3.5.4,然后重新点下Maven的同步按钮
在这里插入图片描述

3,无效的源发行版

接下来运行项目报错:java: 无效的源发行版: 14
在这里插入图片描述
修改pom.xml中java.version值为8,原来是17

	<properties>
        <java.version>17</java.version>
    </properties>

4,无法访问SpringApplication

继续运行,继续报错在这里插入图片描述
降低spring-boot-starter-parent版本,原来是3.1.3,改为2.7.2

5,运行直接Finish

继续运行,没报错,服务直接Finished
在这里插入图片描述
需要添加web依赖

 		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

6,服务运行成功

终于,一个空的spring boot项目成功跑起来了,喜极而泣
在这里插入图片描述

二,安装启动Kafka

1,下载

官网=>https://kafka.apache.org/downloads,下载最新版的kafka,目前是3.5.1
在这里插入图片描述

2,配置

解压到D盘Config目录下即完成安装,目录为D:\Config\kafka_2.13-3.5.1
修改配置文件
(1) server.properties

broker.id=1
log.dirs=/Config/kafka_2.13-3.5.1/logs-kafka

(2) zookeeper.properties

dataDir=/Config/kafka_2.13-3.5.1/logs-zookeeper

3,启动

先启动zookeeper

bin\windows\zookeeper-server-start.bat config\zookeeper.properties	

再启动kafka

bin\windows\kafka-server-start.bat config\server.properties

停止的时候,先停止kafka,再停止zookeeper,直接ctrl+c停止

4,其他命令

1,查看topic列表

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

2,查看topic具体信息

bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test

3,创建topic

bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

三,生产消费消息

1,加入依赖

 		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2,yam配置文件

application.yaml

spring:
  profiles:
    active: dev

application-dev.yaml

server:
  port: 8082
  servlet:
    context-path: /test-kafka

spring:
  cache:
    type: ehcache
    config: classpath:ehcache.xml
  jpa:
    database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialect
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: kafka-demo-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      value-serializer: org.apache.kafka.common.serialization.StringSerializer 
      retries: 10

3,报错enabled mechanisms are []

Connection to node -1 (activate.navicat.com/127.0.0.1:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

在这里插入图片描述
这个错误我本地测试下来是因为没把账号密码配置这块注释掉
在这里插入图片描述

4,生产者生产消息

@Slf4j
@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(String content) {
        String topic = "test_topic";
        kafkaTemplate.send(topic, content).addCallback(success -> {
            String topic = success.getRecordMetadata().topic();
            int partition = success.getRecordMetadata().partition();
            long offset = success.getRecordMetadata().offset();
            log.info("发送成功:主题:{},分区:{},偏移量:{}",topic,partition,offset);
        }, failure -> {
            log.info("发送失败:{}",failure.getMessage());
        });
        return "发送成功";
    }
}

5,订阅和消费消息

一,订阅主题
1,获取消费者

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.Properties;

/**
 * kafka消费者配置
 * @author liuxunming
 */
@Configuration
@Component
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    public KafkaConsumer<String, String> createConsumer() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        return consumer;
    }

}

2,订阅topic

 		KafkaConsumer<String, String> consumer = kafkaConfig.createConsumer();
        consumer.subscribe(Collections.singleton("traffic"));

3,拉取消息

 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
 for (ConsumerRecord<String, String> record : records) {
		String key = record.key();
		String value = record.value();
		log.info("\n收到消息key=>{}\n收到消息value=>{}",key,value);
}

4,消费位移,释放资源

// 提交消费位移
consumer.commitSync();
// 关闭消费者以释放资源
consumer.close();

二,点对点模式

@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"test_topic"})
    public void handlerMsg(String content) {
        log.info("接收到消息:消息值:{} ",content);
    }
}

6,接口

@Slf4j
@RestController
public class KafkaController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @PostMapping("/sendMessage")
    public String sendMessage(@RequestParam String content) {
        kafkaProducer.sendMessage(content);
        return "ok";
    }
}

7,测试结果

在这里插入图片描述
接收到消息
在这里插入图片描述

四,参考博文

  1. 解决IDEA无法识别SpringBoot项目
  2. SpringBoot从入门到精通(十二)SpringBoot集成Kafka
  3. Kafka的下载安装以及使用
  4. Kafka消息消费流程详解
  5. Kafka之Consumer使用与基本原理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/99632.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2023年05月 C/C++(五级)真题解析#中国电子学会#全国青少年软件编程等级考试

第1题&#xff1a;问题求解 给定一个正整数N&#xff0c;求最小的M满足比N大且M与N的二进制表示中有相同数目的1。 举个例子&#xff0c;假如给定N为78&#xff0c;二进制表示为1001110&#xff0c;包含4个1&#xff0c;那么最小的比N大的并且二进制表示中只包含4个1的数是83&a…

如何更好的设计测试用例

测试用例设计的最基本要求&#xff1a;覆盖住所要测试的功能。这是再基本不过的要求了&#xff0c;但别看只是简单的一句话&#xff0c;要能够达到切实覆盖全面&#xff0c;需要对被测试产品功能的全面了解、明确测试范围(特别是要明确哪些是不需要测试的)、具备基本的测试技术…

【爬虫】实验项目二:模拟登录和数据持久化

目录 一、实验目的 二、实验预习提示 三、实验内容 实验要求 基本要求&#xff1a; 改进要求A&#xff1a; 改进要求B&#xff1a; 四、实验过程 基本要求&#xff1a; 源码如下&#xff1a; 改进要求A: 源码如下&#xff1a; 改进要求B&#xff1a; 源码如下&…

图像扭曲之万花筒

源码&#xff1a; void kaleidoscope(cv::Mat& src,cv::Mat& dst,double angle,double radius) {dst.create(src.rows, src.cols, CV_8UC3);dst.setTo(0);int cx src.cols / 2;int cy src.rows / 2;//angle PI / 4;double angle2 PI / 4;double sides radius / 3…

C++面试题(叁)---操作系统篇

目录 操作系统篇 1 Linux中查看进程运行状态的指令、查看内存使用情况的指令、 tar解压文件的参数。 2 文件权限怎么修改 3 说说常用的Linux命令 4 说说如何以root权限运行某个程序。 5 说说软链接和硬链接的区别。 6 说说静态库和动态库怎么制作及如何使用&#xff0c;区…

【网络安全防护】上海道宁与Bitdefender帮助您构建弹性网络并降低安全运营成本

在网络的世界中 风险变得更加常见与复杂 企业需要从网络安全转向网络弹性 复杂的网络攻击已非常普遍 在面临攻击时 企业如何保持业务连续性&#xff1f; Bitdefender GravityZone将 风险分析、安全加固、威胁预防 检测和响应功能相结合 帮助您构建弹性网络 并降低安全…

windows下Mysql安装配置教程

Mysql下载 在官网下载mysql community Server https://dev.mysql.com/downloads/mysql/ 可以选择下载压缩包或者MSI安装程序 使用压缩包安装 MySQL 压缩包安装通常需要以下步骤&#xff1a; 1. 下载 MySQL 安装包 你可以从 MySQL 官网上下载适合你系统的 MySQL 安装包&am…

基于PIC单片机温度-脉搏-DS18B20温度-液晶12864显示(proteus仿真+源程序)

一、系统方案 1、上电初始化液晶第一行显示脉搏&#xff0c;第二行显示温度&#xff0c;第三行显示模式&#xff0c;第四行显示强度&#xff1b;按下K1按键可以选择模式&#xff0c;催眼模式或治疗模式。 2、治疗模块下&#xff0c;可以通过K2、K3修改强度。 二、硬件设计 原理…

Day5:react函数组件与类组件

「目标」: 持续输出&#xff01;每日分享关于web前端常见知识、面试题、性能优化、新技术等方面的内容。 「主要面向群体&#xff1a;」前端开发工程师&#xff08;初、中、高级&#xff09;、应届、转行、培训、自学等同学 Day4-今日话题 react「函数组件和类组件」的区别&…

Spring-5.0.x源码下载及本地环境搭建

一、Spring源码下载 从github上下载Spring的源代码 下载地址&#xff1a;https://github.com/spring-projects/spring-framework 访问地址之后&#xff0c;打开Spring的代码页面找到你想下载的版本&#xff0c;如5.0.x&#xff0c;如下图所示&#xff1a; 下载方式一&#x…

多应用模式下,忽略项目的入口文件,重写Apache规则

多应用模式下&#xff0c;忽略项目的入口文件&#xff0c;重写Apache规则 首先&#xff0c;我的项目是具有两个应用&#xff0c;admin和index,同时给它们绑定了域名&#xff0c;但是每次访问时都需要加入项目的入口文件地址 index.php ,为了忽略这个入口文件&#xff0c;只能通…

Linux 进程

目录 进程与程序main()函数由谁调用&#xff1f;程序如何结束&#xff1f;何为进程&#xff1f;进程号 进程的环境变量应用程序中获取环境变量添加/删除/修改环境变量清空环境变量环境变量的作用 进程的内存布局进程的虚拟地址空间fork()创建子进程父、子进程间的文件共享系统调…

恢复已删除的git分支

1.打开对应项目文件夹目录,在目录下执行git命令 2.执行命令 git reflog --dateiso , 找到最后一次commit 的id 3. 执行git checkout -b 新建分支名称 commitId 就会基于commitId这次提交时工作区新建一个分支&#xff0c;就能达到我们找到删除分支的代码效果。 4.直接看ide…

HTTP协议详解:互联网通信背后的规则与秘密

个人主页&#xff1a;insist--个人主页​​​​​​ 本文专栏&#xff1a;网络基础——带你走进网络世界 本专栏会持续更新网络基础知识&#xff0c;希望大家多多支持&#xff0c;让我们一起探索这个神奇而广阔的网络世界。 目录 一、HTTP协议的基本概念 二、HTTP协议的主要特…

文件包含漏洞利用的几种方法

文章目录 安装环境启动环境漏洞花式利用蚁剑连接图片马读取敏感文件&#xff08;hosts&#xff09;读取该网站的php源码 代码审计 安装环境 安装phpstudy&#xff0c;下载MetInfo 5.0.4版本软件&#xff0c;复制到phpstudy目录下的www目录中。 打开phpstudy&#xff0c;访问浏…

【JavaSE】String类

两种创建String对象的区别 String s1 "hello"; String s2 new String("hello");s1是先查看常量池是否有 “hello” 数据空间&#xff0c;如果有就直接指向它&#xff0c;如果没有就创建然后指向它。s1最终指向的是常量池的空间地址。 s2是先在堆中创建空…

C++笔记之临时变量与临时对象与匿名对象

C笔记之临时变量与临时对象与匿名对象 code review! 文章目录 C笔记之临时变量与临时对象与匿名对象1.C中的临时变量指的是什么&#xff1f;2.C中的临时对象指的是什么&#xff1f;3.C中临时对象的作用是什么&#xff1f;什么时候要用到临时对象?4.给我列举具体的例子说明临…

问题记录:jenkins添加节点时Launch method没有Launch agents via SSH选项

jenkins问题记录 在jenkins主页&#xff0c;左侧点击Manage Jenkins&#xff0c;找到plugins选项&#xff0c;搜索如下插件安装&#xff1a; 安装完插件后&#xff0c;即可看到ssh选项出来了

NodeJS的简介以及下载和安装

本章节会带大家下载并安装NodeJs 以及简单的入门&#xff0c;配有超详细的图片&#xff0c;一步步带大家进行下载与安装 NodeJs简介关于前端与后端Node是什么&#xff1f;为什么要学习NodeNodeJS的优点&#xff1a; NodeJS的下载与安装NodeJS的下载&#xff1a; NodeJS的快速入…

浏览器端vscode docker搭建(附带python环境)

dockerfile from centos:7 #安装python环境 run yum -y install wget openssl-devel bzip2-devel expat-devel gdbm-devel readline-devel zlib-devel libffi-devel gcc make run wget https://www.python.org/ftp/python/3.9.0/Python-3.9.0.tgz run tar -xvf Python-3.9.…