Rabbitmq消息丢失-生产者消息丢失(一)

说明:消息生产者在将数据发送到Mq的时候,可能由于网络等原因造成数据投递失败。

消息丢失大致分三种:这里说的是生产者消息丢失

分析原因:

1.有没有一种可能,我刚发送消息,消息还没有到交换机就断网了,是不是消息就没有发送成功,这个时候如果不对这种情况处理,消息是不是就丢失了

2.又有没有一种可能,我又发送了一条消息,交换机拿到消息后正要发送给某个队列,就是你,你把那个队列给删掉了,这个时候消息找不到队列,消息就也丢失了

解决方法:

1.事务:Rabbitmq提供了事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

缺点:RabbitMQ 事务机制是同步的,提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,太耗性能了

2.confirm机制:相比于事务的同步,confirm机制是异步的,你发送完这个消息之后就可以发送下一个消息,RabbitMQ 接收了之后会异步回调confirm接口通知你这个消息接收到了。一般在生产者这块解决数据丢失,建议使用 confirm 机制。

话不多说,干代码

工程图:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <artifactId>spring-boot-starter-parent</artifactId>  <!-- 被继承的父项目的构件标识符 -->
    <groupId>org.springframework.boot</groupId>  <!-- 被继承的父项目的全球唯一标识符 -->
    <version>2.2.2.RELEASE</version>  <!-- 被继承的父项目的版本 -->
  </parent>

  <groupId>MqLossDemo</groupId>
  <artifactId>MqLossDemo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>MqLossDemo Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!--spring boot核心-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--spring boot 测试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!--springmvc web-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--开发环境调试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
    </dependency>
    <!--amqp 支持-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!--redis-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <version>2.1.7.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.78</version>
    </dependency>

    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>

    <!--lombok-->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.10</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>MqLossDemo</finalName>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-war-plugin</artifactId>
          <version>3.2.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

2.application.yml

server:
  port: 8080
spring:
  rabbitmq:
    port: 5672
    host: 你的 rabbitmq IP
    username: admin
    password: admin
    virtual-host: /
    # 发送者开启 confirm 确认机制
    publisher-confirm-type: correlated
    # 发送者开启 return 确认机制
    publisher-returns: true
    template:
      #在配置文件中配置 mandatory: true 页无用,需要在RabbitTemplate中手动设置
      mandatory: true

3.RabbitMqConfig

package com.dev.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 类名称:
 *
 * @author 李庆伟
 * @date 2024年03月04日 14:12
 */
@Configuration
public class RabbitMqConfig {

    @Bean
    public ConfirmCallbackService confirmCallbackService() {
        return new ConfirmCallbackService();
    }

    @Bean
    public ReturnCallbackService returnCallbackService() {
        return new ReturnCallbackService();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(@Autowired CachingConnectionFactory factory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);

        //生产者发送消息到Mq交换机回执,手动ack回执回调处理
        //可以理解为:消息推送到server,但是在server里找不到交换机
        //如果想看效果【先清除交换机和队列】:在工程运行前注释掉RabbitMqQueueConfig类中的directExchange和bindingDirect方法
        rabbitTemplate.setConfirmCallback(confirmCallbackService());

        //生产者发送消息到Mq,交换机发送到队列回执,一定要设置手动设置Mandatory(true),配置文件中不生效
        //可以理解为:消息推送到server,但是在server里找不到队列
        //如果想看效果【先清除交换机和队列】:如果之前看过setConfirmCallback效果,先去掉RabbitMqQueueConfig类中注释
        //           在工程运行前注释掉RabbitMqQueueConfig类中的directQueue和bindingDirect方法
        rabbitTemplate.setReturnCallback(returnCallbackService());
        rabbitTemplate.setMandatory(true);

        return rabbitTemplate;
    }

    //生产者发送消息到Mq交换机回执 //可以理解为:消息推送到server,但是在server里找不到交换机
    class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                //log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
                System.out.println(correlationData);
                System.out.println(ack);
                System.out.println(cause);
                System.out.println("--------");
            } else {
                System.out.println("消息发送异常!");
                //可以进行重发等操作
                //这里可以处理失败的业务
            }
        }
    }

    //生产者发送的消息到Mq队列回执 //可以理解为:消息推送到server,但是在server里找不到队列
    class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

        //public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        @Override
        public void returnedMessage(Message message, int i, String s, String s1, String s2) {
            System.out.println(message.getMessageProperties().getMessageId());
            System.out.println(new String(message.getBody()));
            System.out.println(i);
            System.out.println(s);
            System.out.println(s1);
            System.out.println(s2);
            //可以将消息存储到一个新的位置,这里可以处理失败的业务
        }
    }

}

4.RabbitMqQueueConfig

package com.dev.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 类名称:
 *
 * @author 李庆伟
 * @date 2024年03月04日 14:12
 */
@Configuration
public class RabbitMqQueueConfig {

    //绑定键
    public final static String QUEUE_ONE = "loss_queue";

    public final static String EXCHANGE_ONE = "loss_exchange";



    @Bean
    public Queue directQueue() {
        return new Queue(RabbitMqQueueConfig.QUEUE_ONE);
    }

    //Direct交换机 起名:directExchange
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(RabbitMqQueueConfig.EXCHANGE_ONE,true,false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:directRoutingKey
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRoutingKey");
    }


}

5.RabbitContoller

package com.dev.controller;

import com.alibaba.fastjson.JSONObject;
import com.dev.config.RabbitMqQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;


/**
 * 类名称:消息丢失问题
 *
 * @author lqw
 * @date 2024年02月27日 14:47
 */
@Slf4j
@RestController
@RequestMapping("loss")
public class RabbitController {


    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法


    /**
     * 消息丢失
     * @return
     */
    @GetMapping("/sendMessage")
    public String sendMessage() {
        String id = UUID.randomUUID().toString().replace("-","");
        Map<String,Object> map = new HashMap<>();
        map.put("id",id);
        map.put("name","张龙");
        Message msg = MessageBuilder.withBody(JSONObject.toJSONString(map).getBytes()).setMessageId(id).build();

        rabbitTemplate.convertAndSend(RabbitMqQueueConfig.EXCHANGE_ONE, "directRoutingKey", msg);
        return "ok";
    }





}

6.App

package com.dev;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 类名称:
 *
 * @author 李庆伟
 * @date 2024年03月04日 14:11
 */
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class);
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/429702.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

超全Chat GPT论文修改指令

文献综述指令润色修改指令论文选题指令论文大指令研究理论指令论文致谢指令参考文献指令论文润色整体逻辑论文整体优化提问指令 1&#xff0e;文献综述指令 请你帮我写一份关于&#xff08;研究主题&#xff09;的文献综述。我的论文选题方向是 XXXX &#xff0c;我已经找到了…

JS逆向进阶篇【去哪儿旅行登录】【下篇-逆向Bella参数JS加密逻辑Python生成】

目录&#xff1a; 每篇前言&#xff1a;引子——本篇目的1、 代码混淆和还原&#xff08;1&#xff09;单独替换&#xff1a;&#xff08;2&#xff09;整个js文件替换&#xff1a; 2、算法入口分析3、 深入分析&#xff08;0&#xff09;整体分析&#xff1a;&#xff08;1&am…

前后端分离项目Docker部署指南(上)

目录 前言 一.搭建局域网 1.搭建net-ry局域网&#xff0c;用于部署若依项目 2.注意点 二.安装redis 创建目录 将容器进行挂载 ​编辑 测试是否安装成功 ​编辑 三. 安装MySQL 创建文件夹 上传配置文件并且修改 .启动MySQL容器服务 充许远程连接 四.部署后端 使用…

linux 交叉编译curl(+openssl)

一、交叉编译openssl 参考博客&#xff1a;点击跳转 二、交叉编译curl 1、源码下载 地址&#xff1a;点击跳转 2、配置 CPPFLAGS"-I/home/gui/gui/openssl/build_arm/include" LDFLAGS"-L/home/gui/gui/openssl/build_arm/lib" LIBS"-ldl" \ …

Android之Handler原理解析与问题分享

一、Handler运行原理剖析 1.关系剖析图 如果把整个Handler交互看做一个工厂&#xff0c;Thread就是动力MessageQueue是履带Looper是转轴Loooper的loop方法就是开关&#xff0c;当调用loop方法时整个工厂开始循环工作&#xff0c;处理来自send和post提交到MessageQueue的消息&a…

使用Javassist 在android运行时生成类

序言 最近在写框架&#xff0c;有一个需求就是动态的生成一个类&#xff0c;然后查阅了相关文献&#xff0c;发现在android中动态生成一个类还挺麻烦。因次把一些内容分享出来&#xff0c;帮助大家少走弯路。 方案一 DexMaker DexMaker 是一个针对 Android 平台的库&#xf…

游戏引擎用什么语言开发上层应用

现在主流的游戏引擎包括&#xff1a; 1、Unity3D&#xff0c;C#语言&#xff0c;优点在于支持几乎所有平台 丹麦创立的一家公司&#xff0c;现已被微软收购。在中国市场占有率最高&#xff0c;也是社群很强大&#xff0c;一般解决方案也能在网上找到&#xff0c;教程丰富。物理…

.md转pdf

1、使用vscode安装Markdown PDF Markdown PDF 打开预览转pdf,同目录下自动生成pdf文件

稀碎从零算法笔记Day5-LeetCode:多数元素

题型&#xff1a;数组、计数、排序、STL函数、查找众数 链接&#xff1a;169. 多数元素 - 力扣&#xff08;LeetCode&#xff09; 来源&#xff1a;LeetCode 著作权归作者所有。商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处。 题目描述 给定一个大小为 n …

30分钟做200多张报表的金蝶云星空BI方案来了

曾经一张报表都要做好久&#xff0c;但现在&#xff0c;200多张的BI数据分析报表只需30分钟就能完成&#xff01;BI智能数据分析的高效性在这一刻具象化了。奥威-金蝶云星空BI方案&#xff0c;一套注册、下载、执行&#xff0c;即见效果的标准化BI数据分析方案。 30分钟&#…

【S32DS报错】-7-程序进入HardFault_Handler,无法正常运行

【S32K3_MCAL从入门到精通】合集&#xff1a; S32K3_MCAL从入门到精通https://blog.csdn.net/qfmzhu/category_12519033.html 问题背景&#xff1a; 在S32DS IDE中使用PEmicro&#xff08;Multilink ACP&#xff0c;Multilink Universal&#xff0c;Multilink FX&#xff09…

智能驾驶规划控制理论学习06-基于优化的规划方法之数值优化基础

目录 一、优化概念 1、一般优化问题 2、全局最优和局部最优 二、无约束优化 1、无约束优化概述 2、梯度方法 通用框架 线性搜索 回溯搜索 3、梯度下降 基本思想 实现流程 ​4、牛顿法 基本思想 实现流程 5、高斯牛顿法 6、LM法&#xff08;Le…

java数据结构与算法刷题-----LeetCode637. 二叉树的层平均值

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 广度优先2. 深度优先 解题思路&#xff1a;时间复杂度O(n)&am…

网络基础(二)

目录 再谈"协议" 序列化 JSON 网络版计算器 HTTP协议 认识URL urlencode和urldecode HTTP协议格式 telnet指令 stat函数 struct stat类型 stringstream类型 wget指令 HTTP的方法 HTTP的状态码 传输层 再谈端口号 端口号范围划分 认识知名端口号(W…

深度学习_16_权重衰退调整过拟合

所谓过拟合即模型复杂度较高&#xff0c;但用于训练数据集过于简单&#xff0c;最后导致模型将过多无用渣质作为学习对象 这个在上篇 深度学习_15_过拟合&欠拟合 已经详细介绍&#xff0c;以下便不再赘述。 上篇提到要想解决过拟合现象可以试着降低模型复杂度&#xff0c…

Python web框架fastapi中间件与CORS详细教学

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;Fastapi 景天的主页&#xff1a;景天科技苑 文章目录 fastapi中间件与CORS1、中间件1.创建中间件方法2.中间件里面添加响应头…

抖音视频评论批量采集软件|视频下载工具

《轻松搞定&#xff01;视频评论批量采集软件&#xff0c;助您高效工作》 在短视频这个充满活力和创意的平台上&#xff0c;了解用户评论是了解市场和观众心声的重要途径之一。为了帮助您快速获取大量视频评论数据&#xff0c;我们推出了一款操作便捷、功能强大的软件&#xff…

第一弹:Flutter安装和配置

目标&#xff1a; 1&#xff09;配置Flutter开发环境 2&#xff09;创建第一个Flutter Demo项目 Flutter中文开发者网站&#xff1a; https://flutter.cn/ 一、配置Flutter开发环境 Flutter开发环境已经提供集成IDE开发环境&#xff0c;因此需要配置开发环境的时候&#xf…

【STM32】STM32学习笔记-读写内部FLASH 读取芯片ID(49)

00. 目录 文章目录 00. 目录01. FLASH概述02. 读写内部FLASH接线图03. 读写内部FLASH相关API04. 读写内部FLASH程序示例05. 读写芯片ID接线图06. 读写芯片ID程序示例07. 程序示例下载08. 附录 01. FLASH概述 STM32F10xxx内嵌的闪存存储器可以用于在线编程(ICP)或在程序中编程(…

Spark中读parquet文件是怎么实现的

背景 最近在整理了一下 spark对Parquet的写文件的过程&#xff0c;也是为了更好的理解和调优Spark相关的任务&#xff0c; 因为对于Spark来说&#xff0c;任何一个事情都不是独立的存在的&#xff0c;比如说parquet文件的rowgroup设置的大小对读写的影响&#xff0c;以及parqu…