Redis队列自研组件

     背景

        年初的时候设计实践过一个课题:SpringBoot+Redis实现不重复消费的队列,并用它开发了一个年夜饭下单和制作的服务。不知道大家还有没有印象。完成这个课题后,我兴致勃勃的把它运用到了项目里面,可谁曾想,运行不久后运维人员就找到了我,说我的功能有问题,而且问题很大。具体表现在用户下单之后,很久都没有收到订单完成的消息,后台服务里的日志也是残缺的,只查到了开始制作年夜饭的日志,没有年夜饭制作过程和完成的日志。

        为此,我花了大量的时间和精力去分析程序,定位问题,最终发现,是运维人员在系统上线时将生产服务和UAT服务的redis服务地址给配成了同一个,这就导致生产上的订单进入Redis队列后,被UAT服务给消费了,而UAT和生产又是不同的数据库,自然导致UAT上通过队列中的主键在数据库中找不到相关数据,从而消费失败的结果。而这些失败的信息全都记录在了UAT的服务器上,生产服务器中自然很难分析和定位到问题。

        要解决这个问题其实很简单,只需要把channelTopic改为从配置文件中获取,然后生产、UAT环境配置不同的字符串即可。

        但实际上真的只做这些就够了吗?如果下次再发生不可预料的问题,我还要花那么多的时间吭哧吭哧的去看日志,调程序,定位问题吗?答案是否定的!

      原服务整改

        结合生产环境产生的问题,痛定思痛,我决定对原来的服务进行一轮大整改,优化服务的可维护性,可测试性。在我的构想中,新的服务需要有以下功能:

        1、保证原有的“年夜饭”功能稳定正常的运行。

        2、可以查询哪些订单还未开始处理

        3、可以查询哪些订单已经处理,以及处理结果。

        4、可以清空N天以前的处理成功的订单。

        5、可以清空待处理的订单

        6、对于已经处理但处理失败的订单,可以一键重新处理

        7、待处理订单插队

        我的设想是,通过redisTemplate.opsForZSet()方法创建两个新队列:待办队列和已办队列,在下单时,插入一条数据到待办队列,在处理任务时,从待办队列中删除该数据,在处理完成后,插入一条数据到已办队列,这样,通过查询待办队列,已办队列,就可以知道哪些任务还在排队,哪些任务已经完成了。

        以下是我的程序整改过程,老粉可以对比上一篇博客看看两者的不同之处。

#以下是application.yml配置

server:
  port: 19200
  servlet:
    context-path: /leixi
  max-http-header-size: 102400
spring:
  redis:
    database: 9
    host: 127.0.0.1
    port: 6379
    password:
    jedis:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
leixi:
  redis-queue-key: NEW_YEAR_DINNER_DEV
//以下是Java程序代码:
/**
 * Redis配置
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Configuration
public class RedisConfig {

    @Value("${leixi.redis-queue-key}")
    private String REDIS_QUEUE_KEY ;


    @Bean
    public ChannelTopic topic() {
        return new ChannelTopic(REDIS_QUEUE_KEY+ Constant.WORKING_QUEUE_SUFFIX);
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter(DinnerListener listener) {
        return new MessageListenerAdapter(listener);
    }

    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory,
                                                        MessageListenerAdapter messageListenerAdapter,
                                                        ChannelTopic topic) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        container.addMessageListener(messageListenerAdapter, topic);
        return container;
    }
}


/**
 * 订单处理控制层
 * 
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@RestController
public class DinnerController {

    private int i = 0;

    @Autowired
    private DinnerService service;

    @Value("${leixi.redis-queue-key}")
    private String REDIS_QUEUE_KEY ;

    @GetMapping("/orderDinner")
    public Object orderDinner() {
        OrderEntity entity = new OrderEntity();
        entity.setOrderCode("Order" + (++i));
        entity.setCustomerName("第"+i+"位客户");
        return service.orderNewYearEveDinner(entity);
    }

    @Autowired
    private RedisTemplate<String, String> redisTemplate;


    @GetMapping("/getPendingOrder")
    public Object getPendingOrder() {
       return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
    }

    @GetMapping("/cleanPendingOrder")
    public Object cleanPendingOrder() {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
        if(set.size() > 0) {
            redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY + Constant.PENDING_SUFFIX, set.toArray());
        }
        return "待处理订单已被清空!";

    }

    @GetMapping("/getHandledOrder")
    public Object getHandledOrder() {
        return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
    }

    @GetMapping("/cleanOldSucceedOrder")
    public Object cleanHandledOrder(@RequestParam("day") Integer day) {
        Set<String> set = redisTemplate.opsForZSet().rangeByScore(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, Constant.getScoreByDate() - day);
        set.forEach(s -> {
            JSONObject obj = JSONObject.parseObject(s);
            if (obj.getString("result").equals("SUCCESS")) {
                redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
            }
        });
        return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, System.currentTimeMillis());
    }

    /**
     * 这里还有最后一个问题, 把已办里的错误的信息摘除出来,重新走请求。并且反馈哪些信息重新走了请求。
     */
    @GetMapping("/restartFailedOrder")
    public Object restartFailedOrder() {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
        StringBuilder sb = new StringBuilder();
        set.forEach(s -> {
            JSONObject obj = JSONObject.parseObject(s);
            if (!obj.getString("result").equals("SUCCESS")) {
                redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
                OrderEntity entity = JSON.parseObject(obj.getString("msg"), OrderEntity.class);
                service.orderNewYearEveDinner(entity);
                sb.append(entity.getOrderCode()).append(",");
            }
        });
        return "以下订单号被重启: "+ sb;
    }


    @GetMapping("/cutInLineJob")
    public Object cutInLineJob(@RequestParam("orderCode")  String orderCode) {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
        for (String s : set) {
            OrderEntity obj = JSONObject.parseObject(s, OrderEntity.class);
            if (obj.getOrderCode().equals(orderCode)) {
                CompletableFuture.runAsync(() -> {
                    service.doListenerWork(s);
                });
                return "订单 " + orderCode + " 插队成功!";
            }
        }
        return " 插队失败,该订单已经在制作了!";
    }
}

/**
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Data
public class OrderEntity implements Serializable {

    /**
     * 客户姓名
     */
    private String customerName;

    /**
     * 订单号
     */
    private String orderCode;

    /**
     * 菜单
     */
    List<String> menus;

    /**
     * 出餐状态
     */
    private String dinnerState;

    /**
     * 做饭开始时间
     */
    private String dinnerStartTime;

    /**
     * 做饭结束时间
     */
    private String dinnerEndTime;

    /**
     * 备注
     */
    private String remark;
}

/**
 * 监听类
 *  
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Component
public class DinnerListener implements MessageListener {

    @Autowired
    private DinnerService service;
    private final Object lock = new Object();

    @Override
    public void onMessage(Message message, byte[] pattern)  {
        synchronized (lock) {
            service.doListenerWork(message.toString());
        }
    }
}


/**
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Slf4j
@Service
public class DinnerService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Value("${leixi.redis-queue-key}")
    private String REDIS_QUEUE_KEY ;

    /**
     * 年夜饭下单
     *
     * @param req 订单信息
     * @return
     */
    public Object orderNewYearEveDinner(OrderEntity req) {
        // 存储订单信息
        saveOrder(req);
        // 异步开始做菜
        redisTemplate.delete(JSON.toJSONString(req));
        redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, JSON.toJSONString(req), Constant.getScoreByDate());
        redisTemplate.convertAndSend(REDIS_QUEUE_KEY+ Constant.WORKING_QUEUE_SUFFIX, JSON.toJSONString(req));
        return "您已成功下单,订单号为"+ req.getOrderCode()+",后厨正在准备预制菜!";
    }
    /**
     * 这里模拟的是做年夜饭的过程方法,该方法用时较长,整个过程需要10秒,但是,这个过程中存在多种意外,该方法可能失败
     *
     * @param req 订单信息
     */
    public void doNewYearEveDinner(OrderEntity req) throws Exception {
        System.out.println("开始做订单 " + req.getOrderCode() + " 的年夜饭");
        Thread.sleep(10000);
        // 这里写个方法模拟报错的场景
        int i = new Random().nextInt(6) + 1;
        if (i ==4) {
            throw new Exception("厨师跑了");
        }
        if (i ==5) {
            throw new Exception("食物跑了");
        }
        if (i ==6) {
            throw new Exception("厨房着火了");
        }
        System.out.println("订单 " + req.getOrderCode() + " 的年夜饭已经完成");
    }

    private void saveOrder(OrderEntity req) {
        //这里假设做的是订单入库操作
        System.out.println("订单 " + req.getOrderCode() + " 已经入库, 做饭开始时间为 "+ new Date());
    }

    /**
     * 根据订单编号修改订单信息
     *
     * @param orderCode 订单编号
     * @param dinnerStatus
     * @param remark
     */
    public void updateOrder(String orderCode, String dinnerStatus, String remark) {
        // 根据订单编号修改订单的出餐结束时间,出餐状态,备注等信息。
        System.out.println("更新订单 "+ orderCode +" 信息,做饭结束时间为 "+ new Date() + ", 出餐状态为"+ dinnerStatus +", 备注为 " +remark);
    }


    public void doListenerWork(String message) {
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(message, "1", 1, TimeUnit.DAYS);
        // 加锁失败,已有消费端在此时对此消息进行处理,这里不再做处理
        if (!flag) {
            return;
        }
        redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, message);
        OrderEntity param = CastUtils.cast(JSON.parseObject(message, OrderEntity.class));
        JSONObject obj = new JSONObject();
        obj.put("msg", message);

        try {
            obj.put("server", InetAddress.getLocalHost().getHostAddress());
            this.doNewYearEveDinner(param);
            this.updateOrder(param.getOrderCode(), "SUCCESS", "成功");
            obj.put("result", "SUCCESS");
        }catch (Exception e) {
            e.printStackTrace();
            this.updateOrder(param.getOrderCode(), "FAIL", e.getMessage());
            obj.put("result", "FAIL");
            obj.put("desc", e.getMessage());
        }finally {
            obj.put("endTime", new Date());
            redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, obj.toJSONString(), Constant.getScoreByDate());
        }
    }


/**
 * 静态工具类
 * 
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
public class Constant {

    // 工作队列后缀
    public static final String WORKING_QUEUE_SUFFIX = "_QUEUE";

    //待处理工作队列后缀
    public static final String PENDING_SUFFIX = "_PENDING";

    // 已处理工作后缀
    public static final String HANDLED_SUFFIX = "_HANDLED";

    //一天的毫秒数
    private static final Integer ONE_DAY_MINI = 86400000;

    /**
     * 根据当前日期计算队列的分数
     *
     * @return
     */
    public static Integer getScoreByDate() {
        return (int)System.currentTimeMillis()/ONE_DAY_MINI;
    }
}

      接口测试

        1、年夜饭下单

        

        2、查询待处理订单

        

        3、查询已处理订单

        

        4、清空已处理且成功的订单

        

        5、清空待处理订单

        

        6、一键重启处理失败的订单

        

        7、订单插队

        

      组件化封装

        完成了以上测试,基本上我想要的功能都已经实现了。但是仔细想了下,上述的功能里除了第一个下单接口是跟业务相关的,剩下的所有接口都是业务无关的。如果我们公司主营业务变了,从年夜饭变成中秋做月饼,端午包棕子,本服务中的大部分代码都可以在调整之后复用。那么,为什么我不整理出一个与业务无关的Redis队列工具出来呢,这样可以极大的提升代码的可复用性。后面有新的业务时,直接引入这个工具包,完善业务部分即可。

        以下是我在反思之后,对代码的整改(只包含有整改或新增的代码)

/**
 * 消息承载类
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Data
public class RedisQueueMsg<T> implements Serializable {

    /**
     * 消息Id
     */
    private String id;

    /**
     * 服务名
     */
    private String serverName;

    /**
     * 数据体
     */
    private T data;
}

package com.leixi.queue.pojo;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
 * 任务处理结果封装类
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Data
public class RedisResultVo implements Serializable {

    private String status;

    private Object data;

    private String desc;

    private Date startTime;

    private Date endTime;

    private String server;

    public RedisResultVo() {
        this.startTime = new Date();
    }

    public RedisResultVo(Object data) {
        this.data = data;
        this.startTime = new Date();
    }
}


/**
 * 抽象的业务处理服务
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
public abstract class QueueBusiBasicService {

    /**
     * 处理任务的方法
     *
     * @param obj 业务类
     */
    public abstract void handle(Object obj);

    /**
     * 处理失败的回调方法
     *
     * @param obj 业务类
     * @param e
     */
    public abstract void callBack(Object obj, Exception e);
}


/**
 * Redis队列的服务层
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Slf4j
@Service
public class QueueCommonService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private Map<String, QueueBusiBasicService> serviceMap;

    @Value("${leixi.redis-queue-key}")
    private String REDIS_QUEUE_KEY ;


    /**
     * 插入消息到队列
     *
     * @param obj 业务对象
     * @param serverName  服务名
     * @return
     */
    public RedisQueueMsg sendMessage(Object obj, String serverName) {
        RedisQueueMsg msg = new RedisQueueMsg();
        msg.setId(IdUtil.fastSimpleUUID());
        msg.setServerName(serverName);
        msg.setData(obj);
        redisTemplate.delete(JSON.toJSONString(msg));
        redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, JSON.toJSONString(msg), Constant.getScoreByDate());
        redisTemplate.convertAndSend(REDIS_QUEUE_KEY+ Constant.WORKING_QUEUE_SUFFIX, JSON.toJSONString(msg));
        return msg;
    }

    /**
     * 处理队列中的工作
     *
     * @param message
     */
    public void handle(String message) {
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(message, "1", 1, TimeUnit.DAYS);
        // 加锁失败,已有消费端在此时对此消息进行处理,这里不再做处理
        if (!flag) {
            return;
        }
        redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, message);
        RedisQueueMsg param = CastUtils.cast(JSON.parseObject(message, RedisQueueMsg.class));
        RedisResultVo result = new RedisResultVo(param);
        try {
            result.setServer(InetAddress.getLocalHost().getHostAddress());
            serviceMap.get(param.getServerName()).handle(param.getData());
            result.setStatus(Constant.SUCCESS);
        }catch (Exception e) {
            e.printStackTrace();
            serviceMap.get(param.getServerName()).callBack(param.getData(), e);
            result.setStatus(Constant.FAIL);
            result.setDesc(e.getMessage());
        }finally {
            result.setEndTime(new Date());
            redisTemplate.opsForZSet().add(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, JSON.toJSONString(result), Constant.getScoreByDate());
        }
    }

    /**
     * 查询待处理任务
     *
     * @return
     */
    public Object getPendingTask() {
        return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
    }

    /**
     * 清理待处理任务
     *
     * @return
     */
    public Object cleanPendingTask() {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
        if(set.size() > 0) {
            redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY + Constant.PENDING_SUFFIX, set.toArray());
        }
        return "待处理任务已被清空!";
    }

    /**
     * 查询已处理任务
     *
     * @return
     */
    public Object getHandledTask() {
        return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
    }

    /**
     * 清理某天前的处理任务
     *
     * @param day 天数
     * @return
     */
    public Object cleanHandledTask(Integer day) {
        Set<String> set = redisTemplate.opsForZSet().rangeByScore(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, Constant.getScoreByDate() - day);
        set.forEach(s -> {
            RedisResultVo obj = JSONObject.parseObject(s, RedisResultVo.class);
            if (obj.getStatus().equals(Constant.SUCCESS)) {
                redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
            }
        });
        return redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, System.currentTimeMillis());
    }

    /**
     * 重新处理已处理任务
     */
    public String restartFailedTask() {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, 0, -1);
        StringBuilder sb = new StringBuilder();
        set.forEach(s -> {
            RedisResultVo obj = JSONObject.parseObject(s, RedisResultVo.class);
            if (!obj.getStatus().equals(Constant.SUCCESS)) {
                redisTemplate.opsForZSet().remove(REDIS_QUEUE_KEY+Constant.HANDLED_SUFFIX, s);
                RedisQueueMsg msg = JSON.parseObject(JSON.toJSONString(obj.getData()), RedisQueueMsg.class); ;
                sendMessage(msg.getData(), msg.getServerName());
                sb.append(msg.getId()).append(",");
            }
        });
        return "以下任务被重启: "+ sb;
    }

    /**
     * 任务插队
     *
     * @param msgId 要插队的消息ID
     */
    public RedisQueueMsg cutInLineTask(String msgId) {
        Set<String> set = redisTemplate.opsForZSet().range(REDIS_QUEUE_KEY+ Constant.PENDING_SUFFIX, 0, -1);
        for (String s : set) {
            RedisQueueMsg msg = JSONObject.parseObject(s, RedisQueueMsg.class);
            if (msg.getId().equals(msgId)) {
                CompletableFuture.runAsync(() -> {
                    this.handle(s);
                });
                return msg;
            }
        }
        throw new RuntimeException("未找到相关任务,该项任务已经在执行了!");
    }
}

/**
 * 业务服务类,继承抽象类务类,实现业务逻辑
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@Service(Constant.DINNER_SERVER)
public class QueueDinnerService extends QueueBusiBasicService {

    @Autowired
    private QueueCommonService queueCommonService;

    /**
     * 年夜饭下单
     *
     * @param entity 订单信息
     * @return
     */
    public Object orderNewYearEveDinner(OrderEntity entity) {
        // 存储订单信息
        saveOrder(entity);
        queueCommonService.sendMessage(entity, Constant.DINNER_SERVER);
        // 异步开始做菜
        return "您已成功下单,订单号为"+ entity.getOrderCode()+",后厨正在准备预制菜!";
    }

    /**
     * 这里模拟的是做年夜饭的过程方法,该方法用时较长,整个过程需要10秒,但是,这个过程中存在多种意外,该方法可能失败
     *
     * @param req 订单信息
     */
    private void doNewYearEveDinner(OrderEntity req) throws Exception {
        System.out.println("开始做订单 " + req.getOrderCode() + " 的年夜饭");
        Thread.sleep(10000);
        int i = new Random().nextInt(6) + 1;
        if (i ==4) {
            throw new Exception("厨师跑了");
        }
        if (i ==5) {
            throw new Exception("食物跑了");
        }
        if (i ==6) {
            throw new Exception("厨房着火了");
        }
        System.out.println("订单 " + req.getOrderCode() + " 的年夜饭已经完成");
    }

    /**
     * 保存订单信息
     *
     * @param req
     */
    private void saveOrder(OrderEntity req) {
        //这里假设做的是订单入库操作
        System.out.println("订单 " + req.getOrderCode() + " 已经入库, 做饭开始时间为 "+ new Date());
    }

    /**
     * 根据订单编号修改订单信息
     *
     * @param orderCode 订单编号
     * @param dinnerStatus
     * @param remark
     */
    private void updateOrder(String orderCode, String dinnerStatus, String remark) {
        // 根据订单编号修改订单的出餐结束时间,出餐状态,备注等信息。
        System.out.println("更新订单 "+ orderCode +" 信息,做饭结束时间为 "+ new Date() + ", 出餐状态为"+ dinnerStatus +", 备注为 " +remark);
    }


    /**
     * 处理订单
     *
     * @param obj 业务类
     */
    @Override
    @SneakyThrows
    public void handle(Object obj) {
        OrderEntity entity = JSON.parseObject(JSON.toJSONString(obj), OrderEntity.class);
        doNewYearEveDinner(entity);
        updateOrder(entity.getOrderCode(), Constant.SUCCESS, "出餐成功");
    }

    @Override
    public void callBack(Object obj, Exception e) {
        OrderEntity entity = JSON.parseObject(JSON.toJSONString(obj), OrderEntity.class);
        System.out.println("更新订单 "+ entity.getOrderCode() +" 信息,做饭结束时间为 "+ new Date() + ", 出餐状态为FAIL, 原因为 " +e.getMessage());
    }
}

@Component
public class RedisQueueListener implements MessageListener {
    @Autowired
    private QueueCommonService service;

    private final Object lock = new Object();

    @Override
    public void onMessage(Message message, byte[] pattern)  {
        synchronized (lock) {
            service.handle(message.toString());
        }
    }
}


/**
 * 业务控制层
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@RestController
@RequestMapping("/dinner")
public class DinnerController {

    @Autowired
    private QueueDinnerService dinnerService;

    private int i = 0;


    @GetMapping("/orderDinner")
    public Object orderDinner() {
        OrderEntity entity = new OrderEntity();
        entity.setOrderCode("Order" + (++i));
        entity.setCustomerName("第"+i+"位客户");
        return dinnerService.orderNewYearEveDinner(entity);
    }

}



/**
 * Redis工具控制器
 *
 * @author leixiyueqi
 * @since 2024/06/15 22:00
 */
@RestController
@RequestMapping("/redisQueue")
public class RedisQueueController {
    @Autowired
    private QueueCommonService service;

    @GetMapping("/getPendingTask")
    public Object getPendingTask() {
       return service.getPendingTask();
    }

    @GetMapping("/cleanPendingTask")
    public Object cleanPendingTask() {
        return service.cleanPendingTask();
    }

    @GetMapping("/getHandledTask")
    public Object getHandledTask() {
        return service.getHandledTask();
    }

    @GetMapping("/cleanOldSucceedTask")
    public Object cleanHandledTask(@RequestParam("day") Integer day) {
        return service.cleanHandledTask(day);
    }

    /**
     * 这里还有最后一个问题, 把已办里的错误的信息摘除出来,重新走请求。并且反馈哪些信息重新走了请求。
     */
    @GetMapping("/restartFailedTask")
    public Object restartFailedTask() {
        service.restartFailedTask();
        return "重启失败的服务成功";
    }

    @GetMapping("/cutInLineTask")
    public Object cutInLineTask(@RequestParam("msgId")  String msgId) {
        RedisQueueMsg msg = service.cutInLineTask(msgId);
        return "任务 "+ JSON.toJSONString(msg) + "插队成功!";
    }

}

        组件化的调整就是把属于Redis队列的操作与业务类操作完全分开,这样,以后有别的业务需要引入组件处理时,只需要写个业务服务继承QueueBusiBasicService即可,最大限度的复用了队列的这套机制和代码。

        注意,本组件有它特定的适用场景:处理任务的频度不高,每次处理任务用时较长,而且任务有一定的小概率失败,失败之后重新处理不会影响最终处理结果。

        完成这个工具研发后,我结合之前在网上查到的Redis队列的一些案例,发现用别的方案可以更简单的去实现我要的效果,比如直接用Redis队列详解(springboot实战)里的方案,仅仅是因为不想在代码里写 while(true) 这种不是优雅的代码,再加上用Listener的方式长时间没对消息进行消费时,消息会丢失,因此才额外花费了这么多的功夫来打补丁。果然方向选错了,工作量会成倍的增加,希望看到本文的读者能引以为戒,不要自误啊。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/750588.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

phpMyAdmin 4.0.10 文件包含 -> getshell

phpMyAdmin 4.0.10 文件包含 -> getshell 前言&#xff1a;这里这个漏洞相对来说审计起来不是特别难&#xff0c;但是对于初学者还是有点挑战性的&#xff0c;从zkaq web课过来的小伙伴想挑战一下自己代码审计能力的话&#xff0c;可以直接跳到最后下载源码&#xff0c;聂风…

【总结】在SpringBoot项目中如何动态切换数据源、数据库?(可直接CV)

注意&#xff1a;文章若有错误的地方&#xff0c;欢迎评论区里面指正 &#x1f36d; 前言 本文参考若依源码&#xff0c;介绍了如何在SpringBoot项目中使用AOP和自定义注解实现MySQL主从数据库的动态切换&#xff0c;当从库故障时&#xff0c;能自动切换到主库&#xff0c;确…

手写SpringMVC之ApplicationContextListener

什么是Spring MVC&#xff1f; Spring Web MVC是基于Servlet API构建的原始Web框架&#xff0c;从一开始就包含在Spring Framework中。正式名称“Spring Web MVC”来自其源模块的名称&#xff08; spring-webmvc &#xff09;&#xff0c;但它通常被称为“Spring MVC”。 手写…

SD-WAN解决多云环境的挑战

随着SD-WAN成为远程用户访问基于云的应用程序的主要途径&#xff0c;促使越来越多的部署多云环境以优化性能的企业、IT专业人员选择支持安全、低延迟且易于管理的SD-WAN技术。与此同时&#xff0c;SD-WAN供应商和云服务供应商之间的合作&#xff0c;有助于跨多个云供应商轻松管…

分别使用netty和apache.plc4x测试读取modbus协议的设备信号

记录一下常见的工业协议数据读取方法 目录 前言Modbus协议说明Netty 读取测试使用plc4x 读取测试结束语 前言 Modbus 是一种通讯协议&#xff0c;用于在工业控制系统中进行数据通信和控制。Modbus 协议主要分为两种常用的变体&#xff1a;Modbus RTU 和 Modbus TCP/IP Modbus …

基于51单片机太阳能风能风光互补路灯控制器

一.硬件方案 本设计由STC89C52单片机电路太阳能电池板电路风机发电电路锂电池充电保护电路升压电路稳压电路光敏电阻电路4位高亮LED灯电路2档拨动开关电路电源电路设计而成。 二.设计功能 &#xff08;1&#xff09;采用风机和太阳能电池板给锂电池充电&#xff0c;具有充电…

微服务开发 —— 项目环境搭建篇

环境搭建 Linux 环境搭建 Linux 环境搭建大家可以使用虚拟机 VMware、VirtualBox 等应用创建虚拟机&#xff0c;使用Vagrant也可以快捷搭建虚拟环境&#xff1b;Windows 中有 WSL2&#xff0c;Windows 中的 Docker 也对 WSL 进行了支持&#xff0c;也是一个不错的选择。或者可…

麒麟系统安装Redis

一、背景 如前文&#xff08;《麒麟系统安装MySQL》&#xff09;所述。 二、下载Redis源码 官方未提供麒麟系统的Redis软件&#xff0c;须下载源码编译。 下载地址&#xff1a;https://redis.io/downloads 6.2.14版本源码下载地址&#xff1a;https://download.redis.io/re…

构建LangChain应用程序的示例代码:46、使用 Meta-Prompt 构建自我改进代理的 LangChain 实现

Meta-Prompt 实现 摘要&#xff1a; 本文介绍了 Noah Goodman 提出的 Meta-Prompt 方法的 LangChain 实现&#xff0c;该方法用于构建能够自我反思和改进的智能代理。 核心思想&#xff1a; Meta-Prompt 的核心思想是促使代理反思自己的性能&#xff0c;并修改自己的指令。…

降低IT运营成本,提升客户体验 |LinkSLA亮相第十届CDIE

6月25-26日&#xff0c;中国数字化创新博览会&#xff08;CDIE 2024&#xff09;在上海张江科学会堂举行。本届展览主题为“AI创新&#xff0c;引领商业增长新格局”&#xff0c;旨在交流企业在数字化时代&#xff0c;如何以科技为驱动&#xff0c;在转型中如何把握机遇&#x…

文本编辑命令和正则表达式

一、 编辑文本的命令 正则表达式匹配的是文本内容&#xff0c;Linux的文本三剑客&#xff0c;都是针对文本内容。 文本三剑客 grep&#xff1a;过滤文本内容 sed&#xff1a;针对文本内容进行增删改查 &#xff08;本文不相关&#xff09; awk&#xff1a;按行取列 &#x…

Web服务器与Apache(虚拟主机基于ip、域名和端口号)

一、Web基础 1.HTML概述 HTML&#xff08;Hypertext Markup Language&#xff09;是一种标记语音,用于创建和组织Web页面的结构和内容&#xff0c;HTML是构建Web页面的基础&#xff0c;定义了页面的结构和内容&#xff0c;通过标记和元素来实现 2.HTML文件结构 <html>…

Transformer教程之什么是Transformer

在过去的几年里&#xff0c;Transformer 模型已经成为了自然语言处理&#xff08;NLP&#xff09;领域的主流技术。无论是机器翻译、文本生成还是语音识别&#xff0c;Transformer 都表现出了非凡的性能。那么&#xff0c;什么是 Transformer&#xff1f;它是如何工作的&#x…

LeetCode 剑指 Offer 40

// void help(int[] a,int l,int r,int k){ // if(k0) return; // if(r-l1 < k){ // for(int il;i<r;i){ // ans[cnt] a[i]; // } // return; // } // // 快排的基准值 // int base a[l]; // int i l, j r; // while(i<j){ // while(i<j &&…

1961 Springboot自习室预约系统idea开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 springboot 自习室预约管理系统是一套完善的信息系统&#xff0c;结合springboot框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用springboot框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代码和数据库…

鸿蒙开发系统基础能力:【Timer (定时器)】

定时器 setTimeout setTimeout(handler[,delay[,…args]]): number 设置一个定时器&#xff0c;该定时器在定时器到期后执行一个函数。 参数 参数名类型必填说明handlerFunction是定时器到期后执行函数。delaynumber否延迟的毫秒数&#xff0c;函数的调用会在该延迟之后发生…

谷歌SEO在外贸推广中的应用效果如何?

谷歌SEO在外贸推广中非常有效。通过优化网站&#xff0c;可以提高在搜索结果中的排名&#xff0c;这意味着更多的潜在客户会看到你的产品和服务。 一个高排名的网站能带来更多自然流量&#xff0c;不需要花费广告费用。这种流量通常质量较高&#xff0c;因为用户是主动搜索相关…

Java 流式编程的7个技巧,必学!

作为Java开发者&#xff0c;我们还没有完全掌握Java Streams这个多功能工具的威力。在这里&#xff0c;你将发现一些有价值的技巧&#xff0c;可以作为参考并应用到你的下一个项目中。 Java Streams在很多年前就被引入了&#xff0c;但作为Java开发者&#xff0c;我们还没有完…

2.4G特技翻斗车方案定制

遥控翻斗车不仅能够提供基本的前进、后退、左转和右转功能&#xff0c;还设计有多种特技动作和互动模式&#xff0c;以增加娱乐性和互动性。 1、无线遥控&#xff1a;玩具翻斗车一般通过2.4G无线遥控器进行控制&#xff0c;允许操作者在一定距离内远程操控车辆。 2、炫彩灯光…

安装VEX外部编辑器

Houdini20配置VEX外部编辑器方法_哔哩哔哩_bilibili 下载并安装Visual Studio Code软件&#xff1a;Download Visual Studio Code - Mac, Linux, Windows 在Visual Studio Code软件内&#xff0c;安装相关插件&#xff0c;如&#xff1a; 中文汉化插件vex插件 安装Houdini Expr…