SpringBoot整合rabbitmq-重复消费问题

说明:重复消费的原因大致是生产者将信息A发送到队列中,消费者监听到消息A后开始处理业务,业务处理完成后,监听在告知rabbitmq消息A已经被消费完成途中中断,也就时说我已经处理完业务,而队列中还存在当前消息A,导致消费者服务恢复后又消费到消息A,出现重复操作的业务。

解决思路:我只要有一个地方记录了消息A已经被消费过了【这个消息必须得设置一个唯一标记】,即使消息A再次被消费时,比对一下,如果有记录则说明消息A已经被消费,如果没有说明没有被消费。

我使用redis及设置redis过期时间来解决重复消费问题。

工程图:

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>spring-boot-starter-parent</artifactId>  <!-- 被继承的父项目的构件标识符 -->
    <groupId>org.springframework.boot</groupId>  <!-- 被继承的父项目的全球唯一标识符 -->
    <version>2.2.2.RELEASE</version>  <!-- 被继承的父项目的版本 -->
  </parent>

  <groupId>RabbitmqDemoOne</groupId>
  <artifactId>RabbitmqDemoOne</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>RabbitmqDemoOne Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <!--spring boot核心-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!--spring boot 测试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <!--springmvc web-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!--开发环境调试-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <optional>true</optional>
    </dependency>

    <!--amqp 支持-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!--redis-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.78</version>
    </dependency>
    <!-- commons-lang -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.5</version>
    </dependency>

    <!--lombok-->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.10</version>
    </dependency>
  </dependencies>


  <build>
    <finalName>RabbitmqDemoOne</finalName>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-war-plugin</artifactId>
          <version>3.2.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

2.application.yml

server:
  port: 8080
spring:
  redis:
    host: 127.0.0.1
    port: 6379
  rabbitmq:
    port: 5672
    host: 192.168.199.139
    username: admin
    password: admin
    virtual-host: /

3.RabbitMqConfig

package com.dev.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 李庆伟
 * @title: RabbitMqConfig
 * @date 2024/3/3 14:12
 */
@Configuration
public class RabbitMqConfig {

    /**
     * 队列
     * @return repeatQueue队列名称 true 持久化
     */
    @Bean
    public Queue makeQueue(){
        return new Queue("repeatQueue",true);
    }

}

4.RedisTemplateConfig

package com.dev.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @author 李庆伟
 * @title: RedisTemplateConfig
 * @date 2024/3/3 14:24
 */
@Configuration
public class RedisTemplateConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        // 设置键(key)的序列化采用StringRedisSerializer。
        redisTemplate.setKeySerializer(new StringRedisSerializer());

        //redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());//设置值(value)的序列化采用jdk
        // 设置值(value)的序列化采用FastJsonRedisSerializer。
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        //redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        //redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);

        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }


}

5.RabbitRepeatController

package com.dev.controller;

import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @author 李庆伟
 * @title: RabbitRepeatContoller
 * @date 2024/3/3 14:05
 */
@RestController
@RequestMapping("repeatQueue")
public class RabbitRepeatContoller {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法


    /**
     * 测试
     * @return
     */
    @GetMapping("/sendMessage")
    public String sendMessage() {
        for (int i = 0; i < 1000; i++) {
            String id = UUID.randomUUID().toString().replace("-","");
            Map<String,Object> map = new HashMap<>();
            map.put("id",id);
            map.put("name","张龙");
            map.put("phone","123..11");
            map.put("num",i);
            String str = JSONObject.toJSONString(map);
            Message msg = MessageBuilder.withBody(str.getBytes()).setMessageId(id).build();
            rabbitTemplate.convertAndSend("", "repeatQueue", msg);
        }
        return "ok";
    }

}

6.RabbitMqListener

package com.dev.listener;

import com.alibaba.fastjson.JSON;
import com.dev.utils.RedisUtil;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 * @author 李庆伟
 * @title: RabbitMqListener
 * @date 2024/3/3 14:13
 */
@Component
public class RabbitMqListener {

    @Autowired
    private RedisUtil redisUtil;

    @RabbitListener(queues = "repeatQueue")
    @RabbitHandler
    public void process(Message msg) throws UnsupportedEncodingException {

        //获取在发送消息时设置的唯一id
        String id = msg.getMessageProperties().getMessageId();
        
        //去redis中查看是否有记录,如果有证明已经消费过了
        String val = redisUtil.get(id);

        if(StringUtils.isNotEmpty(val)){
            return;
        }
        String str = new String(msg.getBody(),"utf-8");
        if(StringUtils.isNotEmpty(str)){
            Map<String,Object> map = JSON.parseObject(str,Map.class);
            System.out.println(map.get("num")+"----"+map.get("id")+"----"+map.get("name")+"----"+map.get("phone"));
            //将消费过的消息记录到redis中,失效时间为1个小时
            redisUtil.set(id,id,3600L);
            System.out.println("----------");
        }


    }

}

7.RedisUtil

package com.dev.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * @author 李庆伟
 * @title: RedisUtil
 * @date 2024/3/3 14:27
 */

@Component
public class RedisUtil {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 批量删除对应的value
     *
     * @param keys
     */
    public void remove(final String... keys) {
        for (String key : keys) {
            remove(key);
        }
    }
    /**
     * 批量删除key
     *
     * @param pattern
     */
    public void removePattern(final String pattern) {
        Set<Serializable> keys = redisTemplate.keys(pattern);
        if (keys.size() > 0)
            redisTemplate.delete(keys);
    }
    /**
     * 删除对应的value
     *
     * @param key
     */
    public void remove(final String key) {
        if (exists(key)) {
            redisTemplate.delete(key);
        }
    }
    /**
     * 判断缓存中是否有对应的value
     *
     * @param key
     * @return
     */
    public boolean exists(final String key) {
        return redisTemplate.hasKey(key);
    }
    /**
     * 读取缓存
     *
     * @param key
     * @return
     */
    public String get(final String key) {
        Object result = null;
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
        result = operations.get(key);
        if(result==null){
            return null;
        }
        return result.toString();
    }
    /**
     * 写入缓存
     *
     * @param key
     * @param value
     * @return
     */
    public boolean set(final String key, Object value) {
        boolean result = false;
        try {
            ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
            operations.set(key, value);
            result = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }
    /**
     * 写入缓存
     *
     * @param key
     * @param value
     * @return
     */
    public boolean set(final String key, Object value, Long expireTime) {
        boolean result = false;
        try {
            ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
            operations.set(key, value);
            redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
            result = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    public  boolean hmset(String key, Map<String, String> value) {
        boolean result = false;
        try {
            redisTemplate.opsForHash().putAll(key, value);
            result = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    public  Map<String,String> hmget(String key) {
        Map<String,String> result =null;
        try {
            result=  redisTemplate.opsForHash().entries(key);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * 递增
     *
     * @param key 键
     * @paramby 要增加几(大于0)
     * @return
     */
    public long incr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递增因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, delta);
    }

    /**
     * 递减
     *
     * @param key 键
     * @paramby 要减少几(小于0)
     * @return
     */
    public long decr(String key, long delta) {
        if (delta < 0) {
            throw new RuntimeException("递减因子必须大于0");
        }
        return redisTemplate.opsForValue().increment(key, -delta);
    }

    /**
     * redis zset可已设置排序(案例,热搜)
     *
     * @param key 键
     * @paramby
     * @return
     */
    public void zadd(String key ,String name) {
        BoundZSetOperations<Object, Object> boundZSetOperations = redisTemplate.boundZSetOps(key);
        //自增长后的数据
        boundZSetOperations.incrementScore(name,1);
    }


}

8.App

package com.dev;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author 李庆伟
 * @title: App
 * @date 2024/3/3 14:01
 */
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/425389.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【C++】类的默认成员函数(上)

&#x1f525;博客主页&#xff1a; 小羊失眠啦. &#x1f3a5;系列专栏&#xff1a;《C语言》 《数据结构》 《C》 《Linux》 《Cpolar》 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 文章目录 一、默认成员函数二、构造函数构造函数的概念及特性 三、析构函数析构函数的特性…

#QT(DEMO2-登录界面)

1.IDE&#xff1a;QTCreator 2.实验&#xff1a;DEMO登录 3.记录 Line Edit输入不换行 密码框输入如下设置: 运行效果 4.代码

使用Matplotlib绘制圆环图

圆环图是饼图的修改版&#xff0c;中间区域被切掉。圆环图更关注使用弧的面积来以最有效的方式表示信息&#xff0c;而不是饼图&#xff0c;饼图更关注比较切片之间的比例面积。圆环图在空间方面更有效&#xff0c;因为圆环图内部的空白空间可用于显示有关圆环图的一些附加信息…

【C++那些事儿】深入理解C++类与对象:从概念到实践(中)| 默认构造函数 | 拷贝构造函数 | 析构函数 | 运算符重载 | const成员函数

&#x1f4f7; 江池俊&#xff1a; 个人主页 &#x1f525;个人专栏&#xff1a; ✅数据结构冒险记 ✅C那些事儿 &#x1f305; 有航道的人&#xff0c;再渺小也不会迷途。 文章目录 1. 类的6个默认成员函数2. 构造函数2.1 概念2.2 特性 3. 析构函数3.1 概念3.2 特性 4. 拷贝…

flutter 文字一行显示,超出换行

因为app有多语言&#xff0c;中文和其他语言长度不一致&#xff0c;可能导致英文会很长。 中文样式 英文样式 代码 Row(mainAxisAlignment: MainAxisAlignment.end,crossAxisAlignment: CrossAxisAlignment.end,children: [Visibility(visible: controller.info.fee ! null,ch…

产品经理岗位的任职资格和职业规划

产品经理主要是商业银行以客户为导向的&#xff0c;具体负责组织银行某一金融产品线的创新设计、生产营销和管理服务的工作。这类人士主要负责应用实施工作&#xff0c;其中产品线由一系列的产品构成&#xff0c;公司的产品经理主要分为全过程产品创新设计专家、全过程产品生产…

驾驭栈和队列,首先逃不开这些(有效括号、栈和队列相互模拟、循环队列)

一.有效括号 . - 力扣&#xff08;LeetCode&#xff09; 给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。左括号必须…

力扣74. 搜索二维矩阵(二分查找)

Problem: 74. 搜索二维矩阵 文章目录 题目描述思路复杂度Code 题目描述 思路 思路1&#xff1a;映射为一维数组二分查找 1.由于题目矩阵中的元素整体是升序的&#xff0c;我们可以将其放置在一个大小为 m n m \times n mn的一维数组array中进行二分查找 2.对应的映射关系是ar…

微信小程序,h5端自适应登陆方式

微信小程序端只显示登陆(获取opid),h5端显示通过账户密码登陆 例如: 通过下面的变量控制: const isWeixin ref(false); // #ifdef MP-WEIXIN isWeixin.value true; // #endif

(C语言)qsort函数模拟实现

前言 我们需先了解qsort函数 qsort函数详解&#xff1a;http://t.csdnimg.cn/rTNv9 qsort函数可以排序多种数据类型&#xff0c;很是神奇&#xff0c;这是为什么&#xff0c;我们在里模拟实现这样的功能 目录 1. qsort函数模拟实现 2. 我们使用bubble_sort函数排序整形数…

ruoyi 图片等文件资源读取

老是忘&#xff0c;记录一下 ResourcesConfig 文件下 /** 本地文件上传路径 */ registry.addResourceHandler(Constants.RESOURCE_PREFIX "/**").addResourceLocations("file:" RuoYiConfig.getProfile() "/"); /*** 资源映射路径 前缀*/ …

【Web安全靶场】xss-labs-master 1-20

xss-labs-master 其他靶场见专栏 文章目录 xss-labs-masterlevel-1level-2level-3level-4level-5level-6level-7level-8level-9level-10level-11level-12level-13level-14level-15level-16level-17level-18level-19level-20 level-1 第一关没有进行任何限制&#xff0c;get请求…

python复盘(1)

1、变量名的命名规则 #3、变量名的命名规则&#xff1a;可以用中文作为变量名&#xff1b;其他和go语言一样 # 变量名可以用数字、字母、下划线组成&#xff0c;但是数字不能作为开头 # 变量名不能使用空格&#xff0c;不能使用函数名或关键字 # 变量名最好能表示出他的作用2、…

Qt/事件分发器/事件过滤器

事件分发器 //事件分发器bool event(QEvent* e); //事件分发器&#xff1a;&#xff1a;用途 分发事件 bool MyLabel::event(QEvent* e) {if(e->type() QEvent::MouseButtonPress){//如果是鼠标摁下 拦截事件 不向下分发QMouseEvent* ev static_cast<QMouseEvent*>…

python类型注解,多态详解

目录 1.类型注解 1.变量的类型注解 2.函数&#xff08;方法&#xff09;的类型注解 3.union类型 2.多态 抽象类 1.类型注解 1.变量的类型注解 #基础数据类型注解 import randomv1 : int 10 v2 : str "aaa" v3 : bool True#类对象类型注解 class student:p…

基于springboot+vue的公寓报修管理系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

奇舞周刊第521期:“一切非 Rust 项目均为非法”

奇舞推荐 ■ ■ ■ 拜登&#xff1a;“一切非 Rust 项目均为非法” 科技巨头要为Coding安全负责。这并不是拜登政府对内存安全语言的首次提倡。“程序员编写代码并非没有后果&#xff0c;他们的⼯作⽅式于国家利益而言至关重要。”白宫国家网络总监办公室&#xff08;ONCD&…

6020一拖二快充线:手机充电的革命性创新

在快节奏的现代生活中&#xff0c;手机已不仅仅是一个通讯工具&#xff0c;更是我们工作、学习和娱乐的得力助手。然而&#xff0c;手机的电量问题一直是困扰着我们的难题。为了解决这个问题&#xff0c;市场上出现了一种名为“一拖二快充线”的充电设备&#xff0c;它不仅具备…

用JavaScript动态提取视频中的文字

现阶段整个社会短视频&#xff0c;中视频为王&#xff0c;文字传播虽然被弱化&#xff0c;但在业务中还是有一定的传播价值&#xff0c;今天就来讲一讲如何使用js动态提取视频中的字幕。 先来看看效果&#xff1a; 屏幕录制2024-02-29 15.40.18 一&#xff0c;tesseract.js介…

SpringMVC 学习(十一)之数据校验

目录 1 数据校验介绍 2 普通校验 3 分组校验 4 参考文档 1 数据校验介绍 在实际的项目中&#xff0c;一般会有两种校验数据的方式&#xff1a;客户端校验和服务端校验 客户端校验&#xff1a;这种校验一般是在前端页面使用 JS 代码进行校验&#xff0c;主要是验证输入数据…