Springboot整合Kafka消息队列服务实例

一、Kafka相关概念

1、关于Kafka的描述

Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据,并高速的处理。日志类的数据需要高吞吐量的性能要求,对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

2、关于Kafka的功能特点

  1. 通过磁盘数据结构提供消息的持久化,消息存储也能够保持长时间稳定性;
  2. 高吞吐量,即使是非常普通的硬件Kafka也可以支持每秒超高的并发量;
  3. 支持通过Kafka服务器和消费机集群来分区消息;
  4. 支持Hadoop并行数据加载;
  5. API包封装的非常好,简单易用,上手快 ;
  6. 分布式消息队列。Kafka对消息保存时根据Topic(主题)进行归类,发送消息者称为Producer(生产者),消息接受者称为Consumer(消费者);

3、Kafka消息功能

如下图所示,Kafka作为一个中间服务,代表一个broker(经纪人)角色,负责接收APP的消费与推送消息给其他相关APP。这里APP可分为Producer,Consumer。

消息的消费模式

点对点模式:点对点模式通常是一个基于拉取或者轮询的消息传递模型,消费者主动拉取数据,消息收到后从队列移除消息,这种模型不是将消息推送到客户端,而是从队列中请求消息。特点是发送到队列的消息被一个且只有一个消费者接收处理,即使有多个消费者监听队列也是如此。

发布订阅模式:订阅模式是一个基于推送的消费传送模型,消息产生后,Kafka会推送给所有订阅相关Topic的订阅者。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

4、Kafka消息队列的作用

  • 应用程序之间解耦,生产者与消费者相互独立,各自异步执行。
  • 消息数据持久化存储,直到所有消息都被消费,规避消息数据丢失的风险。
  • 流量削峰,使用Kafka消息队列可以帮助server承接访问压力,尽可能避免应用程序崩溃。
  • 降低进程间的耦合度,系统部分应用组件发生崩溃时,不会影响到整体系统的运行。
  • 保证消息顺序执行,解决特定场景业务需求。

5、Kafka相关术语介绍

  • Broker

   一台kafka服务器就是一个broker(经纪人)。一个集群由多个broker组成。一个broker可以容纳多个topic(消息主题)。

  • Producer

    消息生产者,就是向kafka broker发消息的APP客户端。

  • Consumer

    消息消费者,向kafka broker取消息的APP客户端。

  • Topic

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,可以理解为一个队列。

  • Consumer Group

     每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定group name,若不指定group name则属于默认的分组。

  • Partition

一个庞大大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。Partition是物理上的概念,方便在集群中扩展,提高并发。

二、liunx系统下搭建Kafka环境

       

--新建kafka应用目录。并下载到当前目录下
cd  /usr/local

mkdir kafka

cd kafka 
--下载

wget https://downloads.apache.org/kafka/3.7.0/kafka-3.7.0-src.tgz

--解压

tar -zxvf  kafka-3.7.0-src.tgz

--启动服务

cd kafka-3.7.0

./bin/kafka-server-start.sh    config/server.properties

--查看服务

ps -aux |grep kafka

--开放kafka地址端口

vim server.properties

--添加下面注释

advertised.listeners=PLAINTEXT://10.98.3.22:9092

三、Springboot2整合Kafka 服务

1、导入基础依赖

<!-- SpringBoot依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka 依赖 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.4.RELEASE</version>
</dependency>

2、项目目录结构

3、生产者与消费者yml文件配置

#消费者配置
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: test-consumer-group


#生产者配置

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092

4、生成消息

@RestController
@RequestMapping("/kafka")
public class ProducerController {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public HttpResult sendMsg () {
        MsgLog msgLog = new MsgLog(1,"消息生成",
                                 1,"消息日志",new Date()) ;
        String msg = JSON.toJSONString(msgLog) ;
        // 这里Topic如果不存在,会自动创建
        kafkaTemplate.send("cicada-topic", msg);
        return HttpResult.create(HttpStatus.SUCCESS,msg);
    }
}
@Component
public class ConsumerMsg {

    private static Logger LOGGER = LoggerFactory.getLogger(ConsumerMsg.class);
    
    //此注解是监听主题为cicada-topic的消息队列
    @KafkaListener(topics = "cicada-topic")
    public void listenMsg (ConsumerRecord<?,String> record) {
        String value = record.value();
        LOGGER.info("ConsumerMsg====>>"+value);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/720139.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Erlang程序设计[Part1-Part2 chapter4]

前言&#xff1a; 环境安装 Erlang Shell&#xff0c;CSDN搜教程 Erlang Shell实操 启动 erl Erlang Shell以表达式为执行单位&#xff1f; 结束标志 .加上回车 Part1 为何用 Erlang chapter 1 什么是并发 并发vs并行 并发 单核cpu运行多个进程 一次运行一个 并行 多…

css-Echarts图表柱状图,X轴横坐标值显示不完全问题

1.问题 在Echarts图表中当横坐标数值过多&#xff0c;或者数值过长时会导致部分横坐标不显示。数据量少或简单会默认显示完全&#xff0c;当放大时会导致部分横坐标隐藏。 更改第一个Mon字段名 会发现偶数横坐标显示隐藏&#xff1b; 2.解决方法 2.1 在x横坐标中添加interval…

CPU占用100%的解决办法

上课&#xff0c;先看问题 这里看有个.logrotate的进程占用CPU最高&#xff0c;这里就需要把这个清理掉 执行 ps aux | grep logrotate然后将这个进程杀掉 kill -9 3194067然后再看CPU占有率就下来了 下课

2024北京智源大会开幕,智源推出大模型全家桶及全栈开源技术基座新版图,大模型先锋集结共探AGI之路

2024年6月14日&#xff0c;第六届“北京智源大会”在中关村展示中心开幕。 北京智源大会是智源研究院主办的“AI内行顶级盛会”&#xff0c;以“全球视野、思想碰撞、前沿引领”为特色&#xff0c;汇聚海内外研究者分享研究成果、探寻前沿知识、交流实践经验。2024北京智源大会…

STM32学习和实践笔记(36):DAC数模转换实验

1.STM32F1 DAC简介 DAC&#xff08;Digital to analog converter&#xff09;即数字模拟转换器&#xff0c;它可以将数字信号转换为模拟信号。它的功能与ADC相反。在常见的数字信号系统中&#xff0c;大部分传感器信号被转化成电压信号&#xff0c;而 ADC 把电压模拟信号转换成…

哈尔滨等保测评流程

哈尔滨的等保测评程序是一项严格的、系统化的检测程序&#xff0c;其目的在于保证信息系统的安全、稳定。下面详细介绍了这个过程&#xff1a; 一、引言 随着信息技术的飞速发展&#xff0c;信息系统在各行各业中的应用越来越广泛&#xff0c;信息安全问题也日益凸显。为了保障…

【方法】如何隐藏和保护Excel表格中的敏感数据?

在工作中&#xff0c;很多人经常需要处理包含敏感信息的Excel表格。 为了确保这些数据的安全性&#xff0c;我们可以通过隐藏单元格、行和列&#xff0c;以及设置密码保护工作表的方法&#xff0c;来保护数据&#xff0c;下面一起来看看吧&#xff01; 一、隐藏数据&#xff1…

idea2020版本下载及注册

一。准备idea2020和BetterIntelliJ插件和补丁key 二、开始安装。 idea就正常安装&#xff0c;然后打开&#xff0c;选择试用30天打开即可&#xff0c;然后File - settings - plugins 点击 Install Plugin from Disk 然后选择BetterIntelliJ这个&#xff0c;这个后期不可变名称…

【数据分析】线性及逻辑回归模型和Python实现

各位大佬好 &#xff0c;这里是阿川的博客&#xff0c;祝您变得更强 个人主页&#xff1a;在线OJ的阿川 大佬的支持和鼓励&#xff0c;将是我成长路上最大的动力 阿川水平有限&#xff0c;如有错误&#xff0c;欢迎大佬指正 Python 初阶 Python–语言基础与由来介绍 Python–…

聆思CSK6大模型+AI交互多模态开源SDK介绍

视觉语音大模型 AI 开发套件( CSK6-MIX )是围绕 CSK6011A 芯片设计的具备丰富语音图像功能与硬件外设的开发板&#xff0c;采用具备丰富组件生态的 Zephyr RTOS作为操作系统&#xff0c;官方提供了十几种开源SDK&#xff0c;包含大模型语音交互、大模型拍照识图、文生图、人脸识…

冒泡排序、选择排序

冒泡排序 按照冒泡排序的思想&#xff0c;我们要把相邻的元素两两比较&#xff0c;当一个元素大于右侧相元素时&#xff0c;交换它们的位置&#xff1b;当一个元素小于或等于右侧相邻元素时&#xff0c;位置不变 大的往右丢&#xff08;往下沉&#xff09;&#xff0c;小的往…

动手学深度学习(Pytorch版)代码实践 -深度学习基础-09过拟合与欠拟合

09过拟合与欠拟合 #通过多项式拟合来探索过拟合和欠拟合 #欠拟合是指模型无法继续减少训练误差。 #过拟合是指训练误差远小于验证误差。 import math import numpy as np import torch from torch import nn from d2l import torch as d2l import liliPytorch as lp#生成数据集…

数据驱动决策:工单统计工具如何赋能企业精准运营

在当今这个数字化飞速发展的时代&#xff0c;企业对于内部运营效率的追求已经达到了前所未有的高度。你是否曾为了繁杂的工单统计管理而头疼不已&#xff1f;是否曾因为无法准确进行工单统计数据而错失商机&#xff1f;今天&#xff0c;我将向你展示一款革命性的工单统计工具&a…

AI从云端到边缘:人员入侵检测算法的技术原理和视频监控方案应用

在当今数字化、智能化的时代&#xff0c;安全已成为社会发展的重要基石。特别是在一些关键领域&#xff0c;如公共安全、智能化监管以及智慧园区/社区管理等&#xff0c;确保安全无虞至关重要。而人员入侵检测AI算法作为一种先进的安全技术&#xff0c;正逐渐在这些领域发挥着不…

MySQl基础入门⑯【操作视图】完结

上一边文章内容 表准备 CREATE TABLE Students (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(100),email VARCHAR(255),major VARCHAR(100),score int,phone_number VARCHAR(20),entry_year INT,salary DECIMAL(10, 2) );数据准备 INSERT INTO Students (id, name, ema…

DolphinScheduler日志乱码、worker日志太多磁盘报警、版本更新导致不兼容怎么办?

作者 | 刘宇星 本文作者总结了在使用Apache DolphinScheduler过程中遇见过的常见问题及其解决方案&#xff0c;包括日志出现乱码、worker日志太多磁盘报警、版本更新导致不兼容问题等&#xff0c;快来看看有没有困扰你想要的答案吧&#xff01; DolphinScheduler集群环境有多…

AI大模型的策略布局:通用广度与垂直深度的融合之道

1. 设计理念&#xff1a; 通用大模型&#xff08;GeneralPurpose Large Models&#xff09;&#xff1a;旨在处理多种类型的任务&#xff0c;不特定于某个领域或应用。它们通常具有广泛的知识和能力&#xff0c;能够理解和生成自然语言、进行图像识别、解决数学问题等。 1. 广…

FileZilla证书过期,导致FileZilla Client联不上,或者老断开的处理

1、先到服务器上去重新生成一下证书&#xff0c;并且覆盖掉老的证书。edit--settings 输入信息&#xff0c;并且确认生成新的证书&#xff1a; 2、Client连接的时候&#xff0c;弹出证书信任&#xff0c;点击确认。 这样第一次连接&#xff0c;然后访问目录全都是好的&#xff…

如何开发盲盒小程序APP——入门指南

一、前言 随着盲盒经济的兴起&#xff0c;越来越多的开发者开始关注如何开发盲盒小程序APP。盲盒小程序不仅能为用户提供新颖的购物体验&#xff0c;还能为商家带来可观的利润。本文将为大家介绍如何入门开发盲盒小程序APP。 二、需求分析 目标用户&#xff1a;明确你的目标…

视频太大了怎么缩小内存

我们在分享视频的时候&#xff0c;有时候会遇到过视频文件太大&#xff0c;无法发送或者上传的情况&#xff0c;别担心&#xff0c;今天我就来给大家分享一个简单有效的方法&#xff0c;让你的视频变得更小&#xff0c;更方便分享&#xff01; 打开 “51视频处理官网 的网站。上…