【RabbitMQ实战】邮件发送(直连交换机、手动ack)

一、实现思路

二、异常情况测试现象及解决

在这里插入图片描述

说明:本文涵盖了关于RabbitMQ很多方面的知识点, 如:
消息发送确认机制 、消费确认机制 、消息的重新投递 、消费幂等性,

二、实现思路
1.简略介绍163邮箱授权码的获取
2.编写发送邮件工具类
3.编写RabbitMQ配置文件
4.生产者发起调用
5.消费者发送邮件
6.定时任务定时拉取投递失败的消息, 重新投递
7.各种异常情况的测试验证
8.拓展: 使用动态代理实现消费端幂等性验证和消息确认(ack)

三、 代码实现

配置版本如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.atguigu.gulimall</groupId>
    <artifactId>provider-and-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>provider-and-consumer</name>
    <description>Demo project for Spring Boot</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>1.8</java.version>
        <!--        <spring-cloud.version>2021.0.4</spring-cloud.version>-->
        <spring-cloud.version>2021.0.1</spring-cloud.version>
    </properties>
    <dependencies>
        <!--joda time  ? 这个还有些问题,这个类库是做什么的-->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.10</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>
        <dependency>
            <groupId>com.atguigu.gulimall</groupId>
            <artifactId>gulimall-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <exclusions>
                <exclusion>
                    <artifactId>servlet-api</artifactId>
                    <groupId>javax.servlet</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--什么作用? -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.4.2</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/java</directory><!--所在的目录-->
                <includes><!--包括目录下的.properties,.xml文件都会扫描到-->
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
                <filtering>false</filtering>
            </resource>
        </resources>
    </build>

</project>

完整代码可以参考我的GitHub, https://gitee.com/zhai_jiahao/gulimall

代码实现
1.163邮箱授权码的获取, 如图:
在这里插入图片描述
每次启用授权码的时候,就会出现一行字符串,其实就是三方发送邮件的时候,使用的密码(该授权码就是配置文件spring.mail.password需要的密码)

项目结构
在这里插入图片描述

1、rabbitmq、邮箱配置:

server:
  port: 8023

#数据源配置
spring:
  datasource:
    url: jdbc:mysql://192.168.56.10:3306/gulimall_ums
    username: root
    password: root
    driver-class-name:  com.mysql.cj.jdbc.Driver
  #配置nacos
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
  #配置服务名称
  application:
    name: provider-and-consumer
  # 配置rabbitMq 服务器
  #spring.application.name=rabbitmq-consumer-true
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /
    publisher-returns: true  #确认消息已发送到队列(Queue)  这个在生产者模块配置 这个后期再配置,这会还用不到
    publisher-confirm-type: correlated   #确认消息已发送到交换机(Exchange) 这个在生产者模块配置 这个后期再配置,这会还用不到
    listener:  #这个在测试消费多个消息的时候,不能有下面这些配置,否则只能消费一个消息后就不继续消费了
      simple:
        acknowledge-mode: manual  #指定MQ消费者的确认模式是手动确认模式  这个在消费者者模块配置  设置手动确认(ack)
        prefetch: 1 #一次只能消费一条消息   这个在消费者者模块配置

  #配置mail
  mail:
    host: smtp.163.com
    username: 15131650119@163.com
    from: 15131650119@163.com
    password: GTMCFUFBTNZERDJA
    default-encoding: UTF-8
    properties:
      mail:
        stmp:
          auth: true
          starttls:
            enable: true
            required: true


#配置日志输出级别
logging:
  level:
    com.atguigu.gulimall: debug   #level 日志等级 指定命名空间的日志输出
  pattern:
    console: "%d %-5level %logger : %msg%n"
    file: "%d %-5level [%thread] %logger : %msg%n"
  file:
    name: d://spring/log



说明: password即授权码, username和from要一致

2、表结构

CREATE TABLE `msg_log` (
  `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
  `msg` text COMMENT '消息体, json格式化',
  `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',
  `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',
  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
  `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`msg_id`),
  UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';

select * from msg_log t order by t.create_time  desc;



说明: exchange routing_key字段是在定时任务重新投递消息时需要用到的

后面会用到的sql(设置时区使用)

#查询需要定时任务处理的数据
select msg_id, msg, exchange, routing_key, status, try_count,
next_try_time, create_time, update_time,SYSDATE(), now()  from msg_log where status = 0 and next_try_time <= now() 

#设置时区
SELECT @@global.time_zone;
SET GLOBAL time_zone = 'Asia/Shanghai';

3、启动类、服务接口、服务接口实现类

启动类ProviderAndConsumerApplication

package com.atguigu.gulimall.providerconsumer;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableScheduling;


/**
 * MQ消息发送邮件功能实战(博客地址:https://blog.csdn.net/onceing/article/details/126407845)
 */

@EnableScheduling   //设置能使用定时任务
@EnableDiscoveryClient
@SpringBootApplication
@MapperScan("com.atguigu.gulimall.providerconsumer.mapper")
public class ProviderAndConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProviderAndConsumerApplication.class, args);
    }

}

4、TestController 向队列中入消息的入口

	package com.atguigu.gulimall.providerconsumer.controller;

import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.Errors;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 *
 * 测试入库控制器类
 * @author: jd
 * @create: 2024-06-28
 */

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Autowired
    private TestService testService;


    /**
     * 发送邮件
     * @param mail 邮件对象
     * @param errors JSR303验证结果错误对象  ,(猜测是可以拿到验证的错误信息的用于返回校验的提示)
     * @return
     */
    @PostMapping("/send")
    public ServerResponse sendMail(@RequestBody @Validated Mail mail, Errors errors){
        if(errors.hasErrors()){
            String defaultMessage = errors.getFieldError().getDefaultMessage();
            return ServerResponse.error(defaultMessage);
        }
        return testService.send(mail);
    }

}

5、消息生产接口 TestService.java

package com.atguigu.gulimall.providerconsumer.service;

import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;

/**
 * 消息生产接口
 */
public interface TestService {
    ServerResponse testIdempotence();

    ServerResponse accessLimit();

    ServerResponse send(Mail mail);

}

TestServiceImpl.java

package com.atguigu.gulimall.providerconsumer.service.impl;

import com.atguigu.gulimall.providerconsumer.common.ResponseCode;
import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * 消息生产接口实现类
 * @author: jd
 * @create: 2024-06-27
 */
@Service
@Slf4j
public class TestServiceImpl  implements TestService {

    @Autowired
    private MsgLogMapper msgLogMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public ServerResponse testIdempotence() {
        return ServerResponse.success("testIdempotence: success");
    }

    @Override
    public ServerResponse accessLimit() {
        return ServerResponse.success("accessLimit: success");
    }

    @Override
    public ServerResponse send(Mail mail) {
        // 1. 生产唯一业务标识
        String msgId = String.valueOf(UUID.randomUUID());  //业务的唯一标识
        mail.setMsgId(msgId);

        //2.记录日志
        MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);
        msgLogMapper.insertMsgLog(msgLog);// 消息入库  先记录日志

        //3.真正发送消息到MQ中
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME,
                MessageHelper.objToMsg(mail), correlationData);// 发送消息

        log.info("====================>消息已发送队列");
        //返回公共的响应结果
        return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
    }
}

MsgLogMapper.java

package com.atguigu.gulimall.providerconsumer.mapper;

import com.atguigu.gulimall.providerconsumer.batch.BatchProcessMapper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import org.apache.ibatis.annotations.Mapper;

import java.util.List;

/**
 * 日志操作mapper接口
 */
@Mapper
public interface MsgLogMapper  extends BatchProcessMapper<MsgLog> {

    /**
     * 记录消息日志
     * @param msgLog
     */
    void insertMsgLog(MsgLog msgLog);

    /**
     * 更新消息日志状态
     * @param msgLog
     */
    void updateStatus(MsgLog msgLog);

    /**
     * 查询超时消息
     * @return
     */
    List<MsgLog> selectTimeoutMsg();

    /**
     * 更新尝试的次数
     * @param msgLog
     */
    void updateTryCount(MsgLog msgLog);

    /**
     * 通过主键筛选出消息日志对象
     * @param msgId
     * @return
     */
    MsgLog selectByPrimaryKey(String msgId);

}

MsgLogMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper" >
    <resultMap id="BaseResultMap" type="com.atguigu.gulimall.providerconsumer.pojo.MsgLog" >
        <id column="msg_id" property="msgId" jdbcType="VARCHAR" />
        <result column="msg" property="msg" jdbcType="VARCHAR" />
        <result column="exchange" property="exchange" jdbcType="VARCHAR" />
        <result column="routing_key" property="routingKey" jdbcType="VARCHAR" />
        <result column="status" property="status" jdbcType="INTEGER" />
        <result column="try_count" property="tryCount" jdbcType="INTEGER" />
        <result column="next_try_time" property="nextTryTime" jdbcType="TIMESTAMP" />
        <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
        <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
    </resultMap>

    <sql id="Base_Column_List" >
        msg_id, msg, exchange, routing_key, status, try_count, next_try_time, create_time, update_time
    </sql>


    <insert id="insertMsgLog" parameterType="com.atguigu.gulimall.providerconsumer.pojo.MsgLog">
        INSERT INTO msg_log(msg_id, msg, exchange, routing_key, status, try_count, next_try_time, create_time, update_time)
        VALUES (#{msgId}, #{msg}, #{exchange}, #{routingKey}, #{status}, #{tryCount}, #{nextTryTime}, #{createTime}, #{updateTime})
    </insert>

    <update id="updateStatus" parameterType="com.atguigu.gulimall.providerconsumer.pojo.MsgLog">
        update msg_log set status = #{status}, update_time = now()
        where msg_id = #{msgId}
    </update>

    <select id="selectTimeoutMsg" resultMap="BaseResultMap">
        select <include refid="Base_Column_List"/>
        from msg_log
        where status = 0
        and next_try_time &lt;= now()
    </select>

    <update id="updateTryCount">
        update msg_log set try_count = try_count + 1, next_try_time = #{nextTryTime}, update_time = now()
        where msg_id = #{msgId}
    </update>

    <select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap">
        select
        <include refid="Base_Column_List" />
        from msg_log
        where msg_id = #{msgId,jdbcType=VARCHAR}
    </select>
</mapper>

MsgLogService.java

package com.atguigu.gulimall.providerconsumer.service;



import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;

import java.util.Date;
import java.util.List;

/**
 * 日志记录接口类
 */
public interface MsgLogService {

    void updateStatus(String msgId, Integer status);

    MsgLog selectByMsgId(String msgId);

    List<MsgLog> selectTimeoutMsg();

    void updateTryCount(String msgId, Date tryTime);
}

MsgLogServiceImpl.java 消息日志操作实现类

package com.atguigu.gulimall.providerconsumer.service.impl;

import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import com.atguigu.gulimall.providerconsumer.util.JodaTimeUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;

/**
 * 消息日志操作实现类
 * @author: jd
 * @create: 2024-06-27
 */
@Service
public class MsgLogServiceImpl implements MsgLogService {

    @Autowired
    private MsgLogMapper msgLogMapper;

    @Override
    public void updateStatus(String msgId, Integer status) {
        MsgLog msgLog = new MsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setStatus(status);
        msgLog.setUpdateTime(new Date());
        msgLogMapper.updateStatus(msgLog);
    }

    @Override
    public MsgLog selectByMsgId(String msgId) {
        return msgLogMapper.selectByPrimaryKey(msgId);
    }

    @Override
    public List<MsgLog> selectTimeoutMsg() {
        return msgLogMapper.selectTimeoutMsg();
    }

    @Override
    public void updateTryCount(String msgId, Date tryTime) {
        //获取下一次重发发送时间,上一次发送时间 加一分钟
        Date nextTryTime = JodaTimeUtil.plusMinutes(tryTime, 1);

        //构建消息对象
        MsgLog msgLog = new MsgLog();
        msgLog.setMsgId(msgId);
        msgLog.setNextTryTime(nextTryTime);  //设置下一次消息重发时间

        msgLogMapper.updateTryCount(msgLog);
    }
}

通用BatchProcessMapper.java 所有的mapper可以继承的

package com.atguigu.gulimall.providerconsumer.batch;

import java.util.List;

/**
 * 通用manpper接口
 * @param <T>
 */
public interface BatchProcessMapper<T> {
    void batchInsert(List<T> list);

    void batchUpdate(List<T> list);
}

通用manpper接口实现类 MapperProxy

package com.atguigu.gulimall.providerconsumer.batch.mapperproxy;

import com.atguigu.gulimall.providerconsumer.batch.BatchProcessMapper;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;

import java.util.List;

import static com.atguigu.gulimall.providerconsumer.common.Constant.MAX_SIZE_PER_TIME;

/**
 * 通用manpper接口实现类
 * @author: jd
 * @create: 2024-06-27
 */
public class MapperProxy<T> implements BatchProcessMapper<T> {
    private BatchProcessMapper batchProcessMapper;

    public MapperProxy(BatchProcessMapper batchProcessMapper) {
        this.batchProcessMapper = batchProcessMapper;
    }

    @Override
    public void batchInsert(List<T> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        List<List<T>> partition = Lists.partition(list, MAX_SIZE_PER_TIME);
        for (List<T> batchList : partition) {
            batchProcessMapper.batchInsert(batchList);
        }
    }

    @Override
    public void batchUpdate(List<T> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        List<List<T>> partition = Lists.partition(list, MAX_SIZE_PER_TIME);
        for (List<T> batchList : partition) {
            batchProcessMapper.batchUpdate(batchList);
        }
    }

}

常量类 Constant.java

package com.atguigu.gulimall.providerconsumer.common;

import java.util.Arrays;
import java.util.stream.Collectors;

/**
 * 常量 、枚举类
 * @author: jd
 * @create: 2024-06-27
 */
public class Constant {

    public static final int MAX_SIZE_PER_TIME = 1000;
    public static final int INDEX_ZERO = 0;
    public static final int INDEX_ONE = 1;
    public static final int INDEX_TWO = 2;
    public static final int INDEX_THREE = 3;

    public static final int NUMBER_ZERO = 0;
    public static final int NUMBER_ONE = 1;

    public static final String COLON = ":";
    public static final String COMMA = ",";
    public static final String DOUBLE_STRIGULA = "--";
    public static final String REPLACEMENT_TARGET = "-99999%";

    public static final String UNKNOWN_TYPE = "未知类型";

    public interface Redis {
        String OK = "OK";
        // 过期时间, 60s, 一分钟
        Integer EXPIRE_TIME_MINUTE = 60;
        // 过期时间, 一小时
        Integer EXPIRE_TIME_HOUR = 60 * 60;
        // 过期时间, 一天
        Integer EXPIRE_TIME_DAY = 60 * 60 * 24;
        String TOKEN_PREFIX = "token:";
        String MSG_CONSUMER_PREFIX = "consumer:";
        String ACCESS_LIMIT_PREFIX = "accessLimit:";
        String FUND_RANK = "fundRank";
        String FUND_LIST = "fundList";
    }

    public interface LogType {
        // 登录
        Integer LOGIN = 1;
        // 登出
        Integer LOGOUT = 2;
    }

    /**
     * 相较于生产者对消息的角度来设置的此项枚举值
     */
    public interface MsgLogStatus {
        // 消息投递中
        Integer DELIVERING = 0;
        // 投递成功
        Integer DELIVER_SUCCESS = 1;
        // 投递失败
        Integer DELIVER_FAIL = 2;
        // 已消费
        Integer CONSUMED_SUCCESS = 3;
    }

    public enum CalculateTypeEnum {
        ADD(1, "加"),
        SUBTRACT(2, "减"),
        MULTIPLY(3, "乘"),
        DIVIDE(4, "除")
        ;

        Integer type;
        String desc;

        CalculateTypeEnum(Integer type, String desc) {
            this.type = type;
            this.desc = desc;
        }

        public Integer getType() {
            return type;
        }

        public String getDesc() {
            return desc;
        }
    }

    public enum FundSortType {
        ASC("asc"),
        DESC("desc"),
        ;

        private String type;

        FundSortType(String type) {
            this.type = type;
        }

        public String getType() {
            return type;
        }
    }
}

公共服务响应包装类【这个一般的项目中都会用到这个公共的封装】ServerResponse.java

package com.atguigu.gulimall.providerconsumer.common;

import com.fasterxml.jackson.annotation.JsonIgnore;
import jdk.nashorn.internal.ir.annotations.Ignore;

import java.io.Serializable;

/**
 * 公共服务响应包装类【这个一般的项目中都会用到这个公共的封装】
 * @author: jd
 * @create: 2024-06-27
 */
public class ServerResponse  implements Serializable {

    private static final long serialVersionUID = 7498483649536881777L;

    private Integer status;

    private String msg;

    private Object data;

    public ServerResponse() {
    }

    public ServerResponse(Integer status, String msg, Object data) {
        this.status = status;
        this.msg = msg;
        this.data = data;
    }

    /**
     * @JsonIgnore注解在Java中主要用于处理JSON序列化和反序列化过程,其具体作用如下:
     *
     * 忽略属性:当在Java对象的某个属性或方法上使用@JsonIgnore注解时,该属性或方法对应的属性在序列化为JSON字符串时会被忽略,同样地,在将JSON字符串反序列化为Java对象时,该属性或方法对应的属性也不会被解析。
     * 当用在属性上时:表示忽略该属性的序列化和反序列化。
     * 当用在方法上时:表示忽略该方法对应的属性的序列化和反序列化。
     * 保护敏感信息:在实际应用中,@JsonIgnore注解可以用于隐藏一些敏感信息,比如密码、token等,确保这些信息不会被发送到客户端或存储在不安全的地方。
     * 减少数据大小:通过忽略一些不必要的属性,可以减少序列化后的JSON数据大小,提高数据传输效率。
     * 解决循环引用问题:当对象之间存在循环引用时,使用@JsonIgnore注解可以避免在序列化过程中出现无限递归的情况。
     * 提高程序的可维护性和安全性:通过精确控制哪些属性参与序列化和反序列化,可以使得程序更加健壮,减少潜在的安全风险。
     * 需要注意的是,@JsonIgnore注解是Jackson库提供的,因此需要确保项目中引入了Jackson库的相关依赖。同时,在使用@JsonIgnore注解时要确保被标记的属性或方法确实不需要参与序列化和反序列化,否则可能会导致意外的结果。
     *
     * 总之,@JsonIgnore注解在Java对象和JSON之间的转换过程中起到了非常重要的作用,能够帮助我们更灵活地控制序列化和反序列化的行为。
     * @return
     */
    @JsonIgnore
    public boolean isSuccess() {
        return this.status == ResponseCode.SUCCESS.getCode();
    }


    public static ServerResponse success() {
        return new ServerResponse(ResponseCode.SUCCESS.getCode(), null, null);
    }

    public static ServerResponse success(String msg) {
        return new ServerResponse(ResponseCode.SUCCESS.getCode(), msg, null);
    }

    public static ServerResponse success(Object data) {
        return new ServerResponse(ResponseCode.SUCCESS.getCode(), null, data);
    }

    public static ServerResponse success(String msg, Object data) {
        return new ServerResponse(ResponseCode.SUCCESS.getCode(), msg, data);
    }

    public static ServerResponse error(String msg) {
        return new ServerResponse(ResponseCode.ERROR.getCode(), msg, null);
    }

    public static ServerResponse error(Object data) {
        return new ServerResponse(ResponseCode.ERROR.getCode(), null, data);
    }

    public static ServerResponse error(String msg, Object data) {
        return new ServerResponse(ResponseCode.ERROR.getCode(), msg, data);
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}

服务响应状态码 大部分的服务中都会用到这个公共的状态码类 ResponseCode.java

package com.atguigu.gulimall.providerconsumer.common;

/**
 * 服务响应状态码  大部分的服务中都会用到这个公共的状态码类
 */
public enum ResponseCode {
    // 系统模块
    SUCCESS(0, "操作成功"),
    ERROR(1, "操作失败"),
    SERVER_ERROR(500, "服务器异常"),

    // 通用模块 1xxxx
    ILLEGAL_ARGUMENT(10000, "参数不合法"),
    REPETITIVE_OPERATION(10001, "请勿重复操作"),
    ACCESS_LIMIT(10002, "请求太频繁, 请稍后再试"),
    MAIL_SEND_SUCCESS(10003, "邮件发送成功"),

    // 用户模块 2xxxx
    NEED_LOGIN(20001, "登录失效"),
    USERNAME_OR_PASSWORD_EMPTY(20002, "用户名或密码不能为空"),
    USERNAME_OR_PASSWORD_WRONG(20003, "用户名或密码错误"),
    USER_NOT_EXISTS(20004, "用户不存在"),
    WRONG_PASSWORD(20005, "密码错误"),
    ;


    private Integer code;

    private String msg;

    ResponseCode(Integer code, String msg) {
        this.code = code;
        this.msg = msg;
    }

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

4、工具类

时间字符操作类 JodaTimeUtil.java

package com.atguigu.gulimall.providerconsumer.util;

import com.alibaba.cloud.commons.lang.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import java.util.Date;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/**
 * 时间字符操作类 JodaTimeUtil
 * @author: jd
 * @create: 2024-06-27
 */
@Slf4j
public class JodaTimeUtil {


    private static final String STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss";

    /**
     * date类型 -> string类型
     *
     * @param date
     * @return
     */
    public static String dateToStr(Date date) {
        return dateToStr(date, STANDARD_FORMAT);
    }

    /**
     * date类型 -> string类型
     *
     * @param date
     * @param format 自定义日期格式
     * @return
     */
    public static String dateToStr(Date date, String format) {
        if (date == null) {
            return null;
        }

        format = StringUtils.isBlank(format) ? STANDARD_FORMAT : format;
        DateTime dateTime = new DateTime(date);
        return dateTime.toString(format);
    }

    /**
     * string类型 -> date类型
     *
     * @param timeStr
     * @return
     */
    public static Date strToDate(String timeStr) {
        return strToDate(timeStr, STANDARD_FORMAT);
    }

    /**
     * string类型 -> date类型
     *
     * @param timeStr
     * @param format  自定义日期格式
     * @return
     */
    public static Date strToDate(String timeStr, String format) {
        if (StringUtils.isBlank(timeStr)) {
            return null;
        }

        format = StringUtils.isBlank(format) ? STANDARD_FORMAT : format;

        DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(format);
        DateTime dateTime;
        try {
            dateTime = dateTimeFormatter.parseDateTime(timeStr);
        } catch (Exception e) {
            log.error("strToDate error: timeStr: {}", timeStr, e);
            return null;
        }

        return dateTime.toDate();
    }

    /**
     * 判断date日期是否过期(与当前时刻比较)
     *
     * @param date
     * @return
     */
    public static Boolean isTimeExpired(Date date) {
        String timeStr = dateToStr(date);
        return isBeforeNow(timeStr);
    }

    /**
     * 判断date日期是否过期(与当前时刻比较)
     *
     * @param timeStr
     * @return
     */
    public static Boolean isTimeExpired(String timeStr) {
        if (StringUtils.isBlank(timeStr)) {
            return true;
        }

        return isBeforeNow(timeStr);
    }

    /**
     * 判断timeStr是否在当前时刻之前
     *
     * @param timeStr
     * @return
     */
    private static Boolean isBeforeNow(String timeStr) {
        DateTimeFormatter format = DateTimeFormat.forPattern(STANDARD_FORMAT);
        DateTime dateTime;
        try {
            dateTime = DateTime.parse(timeStr, format);
        } catch (Exception e) {
            log.error("isBeforeNow error: timeStr: {}", timeStr, e);
            return null;
        }
        return dateTime.isBeforeNow();
    }

    /**
     * 日期加天数
     *
     * @param date
     * @param days
     * @return
     */
    public static Date plusDays(Date date, int days) {
        return plusOrMinusDays(date, days, 0);
    }

    /**
     * 日期减天数
     *
     * @param date
     * @param days
     * @return
     */
    public static Date minusDays(Date date, int days) {
        return plusOrMinusDays(date, days, 1);
    }

    /**
     * 加减天数
     *
     * @param date
     * @param days
     * @param type 0:加天数 1:减天数
     * @return
     */
    private static Date plusOrMinusDays(Date date, int days, Integer type) {
        if (null == date) {
            return null;
        }

        DateTime dateTime = new DateTime(date);
        if (type == 0) {
            dateTime = dateTime.plusDays(days);
        } else {
            dateTime = dateTime.minusDays(days);
        }

        return dateTime.toDate();
    }

    /**
     * 日期加分钟
     *
     * @param date
     * @param minutes
     * @return
     */
    public static Date plusMinutes(Date date, int minutes) {
        return plusOrMinusMinutes(date, minutes, 0);
    }

    /**
     * 日期减分钟
     *
     * @param date
     * @param minutes
     * @return
     */
    public static Date minusMinutes(Date date, int minutes) {
        return plusOrMinusMinutes(date, minutes, 1);
    }

    /**
     * 加减分钟
     *
     * @param date
     * @param minutes
     * @param type    0:加分钟 1:减分钟
     * @return
     */
    private static Date plusOrMinusMinutes(Date date, int minutes, Integer type) {
        if (null == date) {
            return null;
        }

        DateTime dateTime = new DateTime(date);
        if (type == 0) {
            dateTime = dateTime.plusMinutes(minutes);
        } else {
            dateTime = dateTime.minusMinutes(minutes);
        }

        return dateTime.toDate();
    }

    /**
     * 日期加月份
     *
     * @param date
     * @param months
     * @return
     */
    public static Date plusMonths(Date date, int months) {
        return plusOrMinusMonths(date, months, 0);
    }

    /**
     * 日期减月份
     *
     * @param date
     * @param months
     * @return
     */
    public static Date minusMonths(Date date, int months) {
        return plusOrMinusMonths(date, months, 1);
    }

    /**
     * 加减月份
     *
     * @param date
     * @param months
     * @param type   0:加月份 1:减月份
     * @return
     */
    private static Date plusOrMinusMonths(Date date, int months, Integer type) {
        if (null == date) {
            return null;
        }

        DateTime dateTime = new DateTime(date);
        if (type == 0) {
            dateTime = dateTime.plusMonths(months);
        } else {
            dateTime = dateTime.minusMonths(months);
        }

        return dateTime.toDate();
    }

    /**
     * 判断target是否在开始和结束时间之间
     *
     * @param target
     * @param startTime
     * @param endTime
     * @return
     */
    public static Boolean isBetweenStartAndEndTime(Date target, Date startTime, Date endTime) {
        if (null == target || null == startTime || null == endTime) {
            return false;
        }

        DateTime dateTime = new DateTime(target);
        return dateTime.isAfter(startTime.getTime()) && dateTime.isBefore(endTime.getTime());
    }
}

Object 和String互转类 JsonUtil

package com.atguigu.gulimall.providerconsumer.util;

import com.alibaba.cloud.commons.lang.StringUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.extern.slf4j.Slf4j;

import java.text.SimpleDateFormat;

/**
 * Object 和String互转类
 * @author: jd
 * @create: 2024-06-27
 */
@Slf4j
public class JsonUtil {
    private static ObjectMapper objectMapper = new ObjectMapper();
    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    static {
        // 对象的所有字段全部列入
        objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
        // 取消默认转换timestamps形式
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        // 忽略空bean转json的错误
        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        // 统一日期格式
        objectMapper.setDateFormat(new SimpleDateFormat(DATE_FORMAT));
        // 忽略在json字符串中存在, 但在java对象中不存在对应属性的情况, 防止错误
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    /**
     * 将Object转化为String对象
     * @param obj
     * @param <T>
     * @return
     */
    public static <T> String objToStr(T obj) {
        if (null == obj) {
            return null;
        }

        try {
            return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj);
        } catch (Exception e) {
            log.warn("objToStr error: ", e);
            return null;
        }
    }

    /**
     * 将字符串转化成Object对象
     * @param str   待转的字符串
     * @param clazz 类名
     * @param <T>
     * @return
     */
    public static <T> T strToObj(String str, Class<T> clazz) {
        if (StringUtils.isBlank(str) || null == clazz) {
            return null;
        }

        try {
            return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz);
        } catch (Exception e) {
            log.warn("strToObj error: ", e);
            return null;
        }
    }

    public static <T> T strToObj(String str, TypeReference<T> typeReference) {
        if (StringUtils.isBlank(str) || null == typeReference) {
            return null;
        }

        try {
            return (T) (typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference));
        } catch (Exception e) {
            log.error("strToObj error", e);
            return null;
        }
    }
}

发送邮件工具类 MailUtil.java

package com.atguigu.gulimall.providerconsumer.util;

import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.MailException;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;

import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;

/**
 *
 * 发送邮件工具类
 * @author: jd
 * @create: 2024-06-27
 */

@Component
@Slf4j
public class MailUtil {

    @Value("${spring.mail.from}")    //这里从application.xml中拿不到配置信息,所以从这里直接写死了
    private String from ="15131650119@163.com";

    @Autowired
    private JavaMailSender mailSender;


    public boolean send(Mail mail) throws AddressException {
        //模拟消费成功,但是业务实际没成功,此时会重新入队列,不会造成消息丢失
//        if(true){
//            return false;
//        }
        String to = mail.getTo();// 目标邮箱
        String title = mail.getTitle();// 邮件标题
        String content = mail.getContent();// 邮件正文

        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(String.valueOf(new InternetAddress(from)));  //设置发送人
        message.setTo(to);  //设置目标账户
        message.setSubject(title); //设置邮件标题
        message.setText(content);  //设置邮件内容


        try {
            log.info("===================>开始发送邮件");
            mailSender.send(message);
            log.info("===================>邮件发送成功");
            return true;
        } catch (MailException e) {
            log.error("=============>邮件发送失败, to: {}, title: {}", to, title, e);
            return false;
        }


    }


}

SpringBeanUtil.java 获取BeanSpring容器类

package com.atguigu.gulimall.providerconsumer.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @author: jd
 * @create: 2024-06-27
 */
@Component
public class SpringBeanUtil implements ApplicationContextAware {


    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        SpringBeanUtil.applicationContext = applicationContext;
    }

    /**
     * 通过名称在spring容器中获取对象
     *
     * @param beanName
     * @return
     */
    public static Object getBean(String beanName) {
        System.out.println(applicationContext);
        return applicationContext.getBean(beanName);
    }

}

5、RabbitMQ消费者、生产者配置类

A、MQ生产者:

TestController.java

package com.atguigu.gulimall.providerconsumer.service.impl;

import com.atguigu.gulimall.providerconsumer.common.ResponseCode;
import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * 消息生产接口实现类
 * @author: jd
 * @create: 2024-06-27
 */
@Service
@Slf4j
public class TestServiceImpl  implements TestService {

    @Autowired
    private MsgLogMapper msgLogMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public ServerResponse testIdempotence() {
        return ServerResponse.success("testIdempotence: success");
    }

    @Override
    public ServerResponse accessLimit() {
        return ServerResponse.success("accessLimit: success");
    }

    @Override
    public ServerResponse send(Mail mail) {
        // 1. 生产唯一业务标识
        String msgId = String.valueOf(UUID.randomUUID());  //业务的唯一标识
        mail.setMsgId(msgId);

        //2.记录日志
        MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);
        msgLogMapper.insertMsgLog(msgLog);// 消息入库  先记录日志

        //3.真正发送消息到MQ中
        CorrelationData correlationData = new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME,
                MessageHelper.objToMsg(mail), correlationData);// 发送消息

        log.info("====================>消息已发送队列");
        //返回公共的响应结果
        return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
    }
}

队列 交换机配置,用于消息生产者:RabbitConfig.java

package com.atguigu.gulimall.providerconsumer.config;

import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 *
 * 队列 交换机配置,用于消息生产者
 * @author: jd
 * @create: 2024-06-27
 */

@Slf4j
@Component
@Configuration
public class RabbitConfig {

    @Autowired
    private MsgLogService msgLogService;

    // 发送邮件
    public static final String MAIL_QUEUE_NAME = "mail.queue";
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";

    @Bean
    public Queue mailQueue() {
        return new Queue(MAIL_QUEUE_NAME, true);
    }

    @Bean
    public DirectExchange mailExchange() {
        return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
    }

    @Bean
    public Binding mailBinding() {
        return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
    }

//    @Autowired
//    private CachingConnectionFactory connectionFactory;

//    ConnectionFactory connectionFactory = (ConnectionFactory) SpringBeanUtil.getBean("connectionFactory");

    /**
     * 设置生产者消息确认回调函数
     *
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory  connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(converter());
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    log.info("消息成功发送到Exchange");
                    String msgId = correlationData.getId();
                    msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
                } else {
                    log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
                }
                System.out.println("ConfirmCallback回调:     "+"相关数据:"+correlationData);
                System.out.println("ConfirmCallback回调:     "+"确认情况:"+ack);
                System.out.println("ConfirmCallback回调:     "+"原因:"+cause);
            }
        });

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {

                System.out.println("ReturnCallback回调:     "+"消息:"+returnedMessage.getMessage());
                System.out.println("ReturnCallback回调:     "+"回应码:"+returnedMessage.getReplyCode());
                System.out.println("ReturnCallback回调:     "+"回应信息:"+returnedMessage.getReplyText());
                System.out.println("ReturnCallback回调:     "+"交换机:"+returnedMessage.getExchange());
                System.out.println("ReturnCallback回调:     "+"路由键:"+returnedMessage.getRoutingKey());
                log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}",
                        returnedMessage.getExchange(),
                        returnedMessage.getRoutingKey(),
                        returnedMessage.getReplyCode(),
                        returnedMessage.getReplyText(),
                        returnedMessage.getMessage());

            }
        });

        return rabbitTemplate;
    }


    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }


}

B、MQ 消费者 其实就完成了3件事: 1.保证消费幂等性, 2.发送邮件, 3.更新消息状态, 手动ack
package com.atguigu.gulimall.providerconsumer.mq.consumer;

import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import com.atguigu.gulimall.providerconsumer.util.JsonUtil;
import com.atguigu.gulimall.providerconsumer.util.MailUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.mail.internet.AddressException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;

/**
 * MQ 监听者,操作业务(发送邮件)
 * 其实就完成了3件事:
 *      1.保证消费幂等性, 2.发送邮件, 3.更新消息状态, 手动ack
 * @author: jd
 * @create: 2024-06-27
 */
@Component
@Slf4j
@RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)  //指定监听队列
public class MailConsumer {

    @Autowired
     private MsgLogService msgLogService;

    @Autowired
    private MailUtil mailUtil;

    @RabbitHandler(isDefault = true)   //指定监听后的处理动作
    public void consume(Message message, Channel channel) throws IOException, AddressException {
        //将Message中的业务数据转化成Mail对象
        Mail mail = MessageHelper.msgToObj(message, Mail.class);
        log.info("================>消费者收到消息: {}", mail.toString());
        log.debug("=========测试debug和info有什么区别======");
        //根据ID查询Msg对象
        String msgId = mail.getMsgId();
        MsgLog msgLog = msgLogService.selectByMsgId(msgId);
        // 消费幂等性
        if (null == msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {
            log.info("===========>消费者重复消费,此时不进行消费 ,msgId: {}", msgId);
            //直接终止程序运行,程序返回
            return;
        }

        //拿到MQ中的每一条消息的唯一标识Tag
        MessageProperties properties = message.getMessageProperties();
        long tag = properties.getDeliveryTag();

        //业务操作:发送邮件
        log.info("================>准备发送邮件");
        boolean send = mailUtil.send(mail);
//
        try {
            //如果发送邮件成功,则修改消息状态为 已消费
            if(send){
                //发送成功后更新消息日志表的消息记录状态
                msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);
                //取得进程ID
                Thread t = Thread.currentThread();
                log.info("【消息队列】current request consumer success, request info: {}; thread info: {};", JsonUtil.objToStr(mail), t);
                // 消费确认,设置反馈给MQ
                channel.basicAck(tag, false);
            }else {
                log.error("【消息队列】consumer failed,, msg info: {}", JsonUtil.objToStr(mail));
                channel.basicNack(tag, false, true);  //这样会告诉rabbitmq该消息消费失败, 需要重新入队, 可以重新投递到其他正常的消费端进行消费, 从而保证消息不被丢失
            }
        } catch (Exception e) {
            //产生异常之后,则不消费,直接拒绝此消息,不进行消费;这样会导致这条失败的消息会一直存在队列里面,然后定时任务过一会在数据库中扫到这个信息之后,会再去MQ中拿这个消息进行消费
            e.printStackTrace();
            ByteArrayOutputStream bass = new ByteArrayOutputStream();
            e.printStackTrace(new PrintStream(bass));
            log.error("【消息队列】consumer error, error info: {}, msg info: {}", bass, JsonUtil.objToStr(mail));
            channel.basicNack(tag, false, true);
        }


    }

}

6、定时任务重发: ResendMsg.java (说明: 每一条消息都和exchange routingKey绑定, 所有消息重投共用这一个定时任务即可)

package com.atguigu.gulimall.providerconsumer.task;

import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * 消息重发定时任务
 * @author: jd
 * @create: 2024-06-28
 */
@Component
@Slf4j
public class ResendMsg {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 最大投递次数。第四次投递失败
    private static final int MAX_TRY_COUNT = 3;

    @Autowired
    private MsgLogService msgLogService;

    /**
     * 每30s拉取投递失败的消息, 重新投递
     */
    @Scheduled(cron = "0/30 * * * * ?")
    public void reSend(){
        log.info("开始执行定时任务(重新投递消息)");
        List<MsgLog> msgLogs = msgLogService.selectTimeoutMsg();  //查询还在投递中的消息
        msgLogs.forEach(msgLog->{
            String msgId = msgLog.getMsgId();
            //超过投递次数则不会重新投递中的消息是否需要投递
            if(msgLog.getTryCount()>=MAX_TRY_COUNT){
                //不需要重新投递
                msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
                log.info("消息ID {}超过最大的投递次数 {} 次,投递失败,需要人工查看!",msgId,MAX_TRY_COUNT);
            }else {
                //拿到消息在表中的本次重试时间,去获取下一次重试时间  同时 投递次数+1
                msgLogService.updateTryCount(msgId,msgLog.getNextTryTime());
                CorrelationData correlationData = new CorrelationData(msgId);//携带业务信息,作为业务的唯一标识
                //重新发送消息到MQ,让MQ去重新尝试消费这一条之前没有发送到MQ的消息(因为我们现在查的消息的状态是status =0 的代表是消息还是投递中的,没有变成投递成功的消息,肯定是投递有问题)
                rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,
                        RabbitConfig.MAIL_ROUTING_KEY_NAME,  //每一条消息都和exchange routingKey绑定, 所有消息重投共用这一个定时任务即可
                        MessageHelper.objToMsg(msgLog),
                        correlationData);
                log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投递消息");
            }
        });
        log.info("定时任务执行结束(重新投递消息)");  //

    }


}

四、基本测试

OK, 目前为止, 代码准备就绪, 现在进行正常流程的测试 1.发送请求:
在这里插入图片描述
后台日志:
在这里插入图片描述
3.库消息记录:
在这里插入图片描述
状态为3, 表明已消费, 消息重试次数为0, 表明一次投递就成功了,此时就可以到目标邮箱中去查看是否接收到了这个邮件

五、异常情况测试

1.验证消息发送到Exchange失败情况下的回调, 对应上图P -> X

如何验证? 可以随便指定一个不存在的交换机名称, 请求接口, 看是否会触发回调
在这里插入图片描述
发送失败, 原因: reply-code=404, reply-text=NOT_FOUND - no exchange ‘mail.exchangeabcd’ in vhost ‘/’, 该回调能够保证消息正确发送到Exchange, 测试完成

2.验证消息从Exchange路由到Queue失败情况下的回调, 对应上图X -> Q 同理, 修改一下路由键为不存在的即可, 路由失败, 触发回调
在这里插入图片描述
发送失败, 原因: route: mail.routing.keyabcd, replyCode: 312, replyText: NO_ROUTE

3.验证在手动ack模式下, 消费端必须进行手动确认(ack), 否则消息会一直保存在队列中, 直到被消费, 对应上图Q -> C 将消费端代码channel.basicAck(tag, false);// 消费确认注释掉, 查看控制台和rabbitmq管控台
在这里插入图片描述
在这里插入图片描述
可以看到, 虽然消息确实被消费了, 但是由于是手动确认模式, 而最后又没手动确认, 所以, 消息仍被rabbitmq保存, 所以, 手动ack能够保证消息一定被消费, 但一定要记得basicAck

4.验证消费端幂等性 接着上一步, 去掉注释, 重启服务器, 由于有一条未被ack的消息, 所以重启后监听到消息, 进行消费, 但是由于消费前会判断该消息的状态是否未被消费, 发现status=3, 即已消费, 所以, 直接return, 这样就保证了消费端的幂等性, 即使由于网络等原因投递成功而未触发回调, 从而多次投递, 也不会重复消费进而发生业务异常
在这里插入图片描述

5.验证消费端发生异常消息也不会丢失 很显然, 消费端代码可能发生异常, 如果不做处理, 业务没正确执行, 消息却不见了, 给我们感觉就是消息丢失了, 由于我们消费端代码做了异常捕获, 业务异常时, 会触发: channel.basicNack(tag, false, true);, 这样会告诉rabbitmq该消息消费失败, 需要重新入队, 可以重新投递到其他正常的消费端进行消费, 从而保证消息不被丢失 测试: send方法直接返回false即可(这里跟抛出异常一个意思),因为我们向MQ插入了消息,但是实际业务消费了,但是发送邮件返回了false,这样会从新投递到MQ队列中,再进行消费,一直重复。
代码修改:
在这里插入图片描述
结果:
在这里插入图片描述

可以看到, 由于channel.basicNack(tag, false, true), 未被ack的消息(unacked)会重新入队并被消费, 这样就保证了消息不会走丢

6.验证定时任务的消息重投 实际应用场景中, 可能由于网络原因, 或者消息未被持久化MQ就宕机了, 使得投递确认的回调方法ConfirmCallback没有被执行, 从而导致数据库该消息状态一直是投递中的状态, 此时就需要进行消息重投, 即使也许消息已经被消费了 定时任务只是保证消息100%投递成功, 而多次投递的消费幂等性需要消费端自己保证 我们可以将回调和消费成功后更新消息状态的代码注释掉, 开启定时任务, 查看是否重投

这是没有异常信息的情况下,定时任务每次都不会做实际的业务:
在这里插入图片描述
当我们对一条消息,进行了实际的业务处理,而且也业务处理成功了,只是没有把状态修改成成功,这样定时任务会扫,重新入队列,但是有幂等性校验,所以一直发送到队列将这条信息,直到3次后,消息会被更新为发送失败
在这里插入图片描述

在这里插入图片描述

发送邮件其实很简单, 但深究起来其实有很多需要注意和完善的点, 一个看似很小的知识点, 也可以引申出很多问题, 甚至涉及到方方面面, 这些都需要自己踩坑, 当然我这代码肯定还有很多不完善和需要优化的点, 希望小伙伴多多提意见和建议 我的代码都是经过自测验证过的, 图也都是一点一点自己画的或认真截的, 希望小伙伴能学到一点东西, 路过的点个赞或点个关注呗, 谢谢

部分参考:springboot + rabbitmq发送邮件实战(保证消息100%投递成功并被消费)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/764886.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

8617 阶乘数字和

这是一个关于计算阶乘结果所有位上的数字之和的问题。我们可以通过以下步骤来解决这个问题&#xff1a; 1. 首先&#xff0c;我们需要一个函数来计算阶乘。由于n的范围可以达到50&#xff0c;阶乘的结果可能非常大&#xff0c;所以我们需要使用一个可以处理大整数的数据类型&a…

零成本、高效率:免费可视化工具的魅力所在

在如今这个数据驱动的时代&#xff0c;免费可视化工具越来越受到人们的欢迎。这些工具不仅降低了数据分析的门槛&#xff0c;还为用户提供了强大的功能和极高的灵活性&#xff0c;使得各行各业的人们都能够轻松地利用数据做出明智的决策。首先&#xff0c;免费可视化工具的零成…

软件鉴定测试的工作内容是什么?专业软件鉴定测试报告获取指南

软件鉴定测试是指对软件产品进行全面的检测和评估&#xff0c;以验证其是否符合规定的标准和要求。通过测试&#xff0c;能够发现软件中存在的问题和缺陷&#xff0c;并提供相应的改进建议。在不同的测试阶段&#xff0c;使用不同的测试方法和工具&#xff0c;包括功能测试、性…

智慧公厕系统改变了人们对服务区公厕的看法

在过去&#xff0c;服务区公厕常常给人留下脏乱差的印象&#xff0c;成为人们在长途旅行途中不愿停留的地方。然而&#xff0c;随着智慧科技的不断发展和应用&#xff0c;智慧公厕系统的出现改变了人们对服务区公厕的看法&#xff0c;为公共卫生设施的提升注入了新的活力。 一、…

链路全贯通,价值引领数据能力升级|爱分析报告

数据能力已经成为企业的核心竞争力。政策驱动数据产业发展加速&#xff0c;如2023年国家数据局成立&#xff1b;2024年&#xff0c;《“数据要素”三年行动计划&#xff08;2024-2026年&#xff09;》正式发布&#xff1b;并且 2024年起正式将数据资源视为资产纳入财务报表&…

一键变身!Cloud-Init让PVE镜像华丽转身,快来看看怎么做!

在虚拟化环境中&#xff0c;自动化配置虚拟机&#xff08;VM&#xff09;是提高效率的关键。Proxmox VE&#xff08;PVE&#xff09;是一款流行的开源虚拟化平台&#xff0c;而Cloud-Init是一种用于初始化云实例的工具。结合PVE和Cloud-Init&#xff0c;我们可以快速创建和配置…

红酒与露营:户外时光的好伴侣

在繁忙的都市生活中&#xff0c;人们总是渴望逃离喧嚣&#xff0c;寻找一处宁静的天地&#xff0c;与大自然亲密相拥。露营&#xff0c;作为一种返璞归真的户外生活方式&#xff0c;成为了许多人心中的理想选择。而在露营的浪漫时光里&#xff0c;一瓶雷盛红酒的陪伴&#xff0…

内网渗透学习-杀入内网

1、靶机上线cs 我们已经拿到了win7的shell&#xff0c;执行whoami&#xff0c;发现win7是administrator权限&#xff0c;且在域中 执行ipconfig发现了win7存在内网网段192.168.52.0/24 kali开启cs服务端 客户端启动cs 先在cs中创建一个监听器 接着用cs生成后门&#xff0c;记…

开源RAG个人知识库项目开发分析

前言 Hello&#xff0c;大家好&#xff0c;我是GISer Liu&#x1f601;&#xff0c;一名热爱AI技术的GIS开发者&#xff0c;这个LLM开发基础阶段已经进入尾声了&#xff0c;本文中我们不介绍更多的理论与知识点&#xff0c;而是通过的分析开源项目的解决方案来帮助各位开发者理…

Pytest--安装与入门

pytest是一个能够简化成测试系统构建、方便测试规模扩展的框架&#xff0c;它让测试变得更具表现力和可读性–模版代码不再是必需的。只需要几分钟的时间&#xff0c;就可以对你的应用开始一个简单的单元测试或者复杂的功能测试。 1. 安装pytest pip install -U pytest检查版…

计算机人说学校-北京交通大学-计算机方向

北京交通大学&#xff08;Beijing Jiaotong University&#xff0c;简称BJTU&#xff09;是一所位于中国首都北京的全国重点大学&#xff0c;隶属于中华人民共和国教育部&#xff0c;并由教育部、交通运输部、北京市人民政府和中国国家铁路集团有限公司共建。该校是国家“双一流…

为什么 [] == ![] 为 true?

&#x1f9d1;‍&#x1f4bb; 写在开头 点赞 收藏 学会&#x1f923;&#x1f923;&#x1f923; 前言 面试官问我&#xff0c;[] ![] 的结果是啥&#xff0c;我&#xff1a;蒙一个true&#xff1b; 面试官&#xff1a;你是对的&#xff1b;我&#xff1a;内心非常高兴&a…

【PyTest】玩转HTML报告:修改、汉化和优化

前言 Pytest框架可以使用两种测试报告&#xff0c;其中一种就是使用pytest-html插件生成的测试报告&#xff0c;但是报告中有一些信息没有什么用途或者显示的不太好看&#xff0c;还有一些我们想要在报告中展示的信息却没有&#xff0c;最近又有人问我pytest-html生成的报告&a…

搜索型数据库的技术发展历程与趋势前瞻

概述 随着数字科技的飞速发展和信息量的爆炸性增长&#xff0c;搜索引擎已成为我们获取信息的首选途径之一&#xff0c;典型的代表厂商如 Google。然而&#xff0c;随着用户需求的不断演变&#xff0c;传统的搜索技术已经无法满足人们对信息的实时性、个性化和多样性的需求。 …

【启明智显技术分享】SSD202D核心板方案双网口SBC2D06开发板开箱与实操全攻略上手指南

一、背景 本指南将详细介绍启明智显基于SSD202D核心板方案下的双网口-SBC2D06的开箱及实操上手应用。无论您是电子爱好者、开发者还是工程师&#xff0c;这份指南都能助您快速上手并充分利用这款双网口开发板的各项功能。 二、硬件介绍 SBC2D06双网口开发板&#xff0c;作为…

什么方法能快速分享视频给他人?视频二维码提供预览的制作技巧

现在想要分享一个或者多个视频时&#xff0c;很多人会选择将视频生成二维码的方法来展现视频内容&#xff0c;通过这种方式可以让多人同时扫码查看同一个视频&#xff0c;有效提升其他人获取内容的速度及视频传播的效率。那么视频转换成二维码的方法是什么样的呢&#xff1f; …

USB PD+TYPE -C快充电源中MOSFET选型,USB PD应用市场包含智能手机,平板电脑,笔记本电脑,游戏本,移动硬盘,数码相机,电动工具等传统领域

USB PD全称为USB Power Delivery&#xff0c;是由USB-IF组织制定的一种快速充电协议&#xff0c;也是目前市场非常看好的一种协议&#xff0c;可以支持输出功率高达100W&#xff1b;Type-C是一种接口规范&#xff0c;能够支持传输更大的电流。USB PD应用市场不仅包含智能手机&a…

【项目】个人论坛测试报告

论坛测试报告 一、项目背景二、项目功能三、测试计划一&#xff09;功能测试二&#xff09;自动化测试三&#xff09;性能测试登录测试 使用VUG创建脚本1&#xff09;编写用户进行登录的脚本2&#xff09;对脚本进行强化 使用controller创建场景使用Analysis生成测试报告测试报…

NX 二次开发-获取CAM切削层数据

int count 0;tag_t* objects;UF_UI_ONT_ask_selected_nodes(&count, &objects); //获取当前加工导航器选中的对象数量和tagif (count < 0){return 0;}UF_CUT_LEVELS_t* cut_levels_ptr_addr NULL; //读工序的切削层UF_CUT_LEVELS_load(objects[0], &…

高考失利咨询复读,银河补习班客服开挂回复

补习班的客服在高考成绩出来后&#xff0c;需要用专业的知识和足够的耐心来回复各种咨询&#xff0c;聊天宝快捷回复软件&#xff0c;帮助客服开挂回复。 ​ 前言 高考成绩出来&#xff0c;几家欢喜几家愁&#xff0c;对于高考失利的学生和家长&#xff0c;找一个靠谱的复读补…