spring-boot redis stream消息队列demo-及死信简单处理

Redis stream 是 Redis 5 引入的一种新的数据结构,它是一个高性能、高可靠性的消息队列,主要用于异步消息处理和流式数据处理。在此之前,想要使用 Redis 实现消息队列,通常可以使用例如:列表,有序集合、发布与订阅 3 种数据结构。但是 stream 相比它们具有以下的优势:

  • 支持范围查找:内置的索引功能,可以通过索引来对消息进行范围查找
  • 支持阻塞操作:避免低效的反复轮询查找消息
  • 支持 ACK:可以通过确认机制来告知已经成功处理了消息,保证可靠性
  • 支持多个消费者:多个消费者可以同时消费同一个流,Redis 会确保每个消费者都可以独立地消费流中的消息

情况

当前项目,是在window server 上 部署 rocketmq (ps 单体)使用 消息队列,使用三方exe实现 注册到 window 服务中(实现开机自启-自行搜索实现吧)。
在这里插入图片描述
在这里插入图片描述

问题

当前发现一个问题,window 被关机(主动、断电等),mq的broker无法启动,原因是 delayOffset.json文件损坏造成,文件内容变成如下图。
在这里插入图片描述
修改为,重新启动没问题了
在这里插入图片描述
每次重新都需要处理,虽然可以考虑使用脚本 处理,但是徒增成本。就想着 使用redis实现 进行替换(项目规模较小)。redis-stream 需要将 版本升级为 5(当前是4)。
Redis for Windows
redis下载 直接替换就行。

死信问题

网上看到 处理的方式有好几种。定时调度,处理pending信息(直接ack、重新消费、转移消费组等);人工运维处理 等。看了很多,结合 spring 提供的函数。
StringRedisTemplate.redisTemplate.opsForStream()返回的接口类StreamOperations 参考spring文档,可以用的方法有 acknowledgeadddeletecreateGroupdeleteConsumerdestroyGroupconsumersgroupsinfopendingsizerangereadreverseRangetrim等。
没有包含命令:转移消费 xClaim,实际在RedisStreamCommands接口中,通过代码也能解决;

redisTemplate.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands().xClaim("stream".getBytes(), "group", "consumer", Duration.ofSeconds(10), RecordId.of("streamId")))

也没有包含命令:设置消费组的起始消息 ID xgroupSetid,实际在RedisStreamAsyncCommands接口中,代码在比较底层,不能拿来就用。

RedisFuture<String> xgroupSetid(StreamOffset<K> streamOffset, K group);
当前方式

当前,实现的方式。在项目 重启 或 定时调度(暂无代码demo),通过 pending 获取消息 streamId,先复制消息add,在确认旧消息acknowledge,最后删除旧消息delete

ps:自己感觉还行吧(迷之自信O(∩_∩)O哈哈~),不要拿来就用啊(⊙o⊙)…

redis桌面管理

下载redis可视化管理工具:AnotherRedisDesktopManager
在这里插入图片描述

参考

理解 Redis 新特性:Stream
springboot + redis stream做轻量级消息队列
Redis Stream实现消息队列
redis Stream消息队列 redision redis stream消息队列pending
使用redis流和spring数据获取挂起的消息
在SpringBoot中使用RedisTemplate重新消费Redis Stream中未ACK的消息

说明

redis-stream命令

  • XADD:向流中添加新的消息。
  • XREAD:从流中读取消息。
  • XREADGROUP:从消费组中读取消息。
  • XRANGE:根据消息 ID 范围读取流中的消息。
  • XREVRANGE:与 XRANGE 类似,但以相反顺序返回结果。
  • XDEL:从流中删除消息。
  • XTRIM:根据 MAXLEN 参数修剪流的长度。
  • XLEN:获取流的长度。
  • XGROUP:管理消费组,包括创建、删除和修改。
  • XACK:确认消费组中的消息已被处理。
  • XPENDING:查询消费组中挂起(未确认)的消息。
  • XCLAIM:将挂起的消息从一个消费者转移到另一个消费者。
  • XINFO:获取流、消费组或消费者的详细信息。

pom

spring-boot 加 redis依赖,简单项目,测试用

<parent>
    <artifactId>spring-boot-starter-parent</artifactId>
    <groupId>org.springframework.boot</groupId>
    <version>2.7.0</version>
    <relativePath/>
</parent>
<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.7.0</spring-boot.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

配置

server:
  port: 8888
  servlet:
    context-path: /
spring:
  redis:
    msg:
      listener: false
    host: 127.0.0.1
    port: 6379
    database: 1
    client-type: lettuce
    lettuce:
      pool:
        max-active: 8

代码结构

引入spring boot的 StringRedisTemplate

    private static StringRedisTemplate redisTemplate;
    @Autowired
    public void setRedisTemplate(StringRedisTemplate redisTemplate) {
        RedisStreamConfig.redisTemplate = redisTemplate;
    }

声明变量。redis 的 key键名称,消费者 组名,消费者 客户端名。

final static String STREAM = "tsp", GROUP = "tsp-g-1", CONSUMER = "tsp-c-1";

生产者-发送消息

这块比较简单,直接调用就行

/** 生产者-发送消息 */
    public static void push(String msg){
        // 创建消息记录, 以及指定stream
        StringRecord record = StreamRecords.string(Collections.singletonMap("data", msg)).withStreamKey(STREAM);
        // 将消息添加至消息队列中 XADD stream [MAXLEN len] id field value [field value ...]
        redisTemplate.opsForStream().add(record);
        log.info("redis-消息队列-stream, {} ,send msg: {}", STREAM,msg);
    }

消费者

消费者-监听类,实际的业务逻辑处理内容。消息队列 创建 会有测试消息,需要跳过。实际业务逻辑需要考虑,挂起消息 重新消费情况(解决 死信问题的简单方法)

/** 消费者 监听 */
    public static class TspStreamListener implements StreamListener<String, ObjectRecord<String,String>> {
        @Override
        public void onMessage(ObjectRecord<String, String> message) {
            RecordId messageId = message.getId();
            // 消息的key和value
            String string = message.getValue();
            if("T".equals(string)){
                log.info("消费者-监听>>>测试消息-ack. msgId={}, stream={}, body={}", messageId, message.getStream(), string);
                redisTemplate.opsForStream().acknowledge(GROUP, message);
                return;
            }
            log.info("消费者-监听>>>get msg. msgId={}, stream={}, body={}", messageId, message.getStream(), string);
            //业务逻辑,需要考虑 被挂起的消息 重新消费情况
            try {
                log.info("消费者-监听>>>睡眠10s,模拟耗时逻辑-start");
                TimeUnit.SECONDS.sleep(10);
                log.info("消费者-监听>>>睡眠10s,模拟耗时逻辑-end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("消费者-监听>>>手动确认消息");
            redisTemplate.opsForStream().acknowledge(GROUP, message);
            log.info("消费者-监听>>>end");
        }
    }

消费者-注册,关闭自动ack,重连自动消费上次处理(已经在pending队列里面了)的下一个消息,拉取消息超时时间50s,每批数量1,使用默认线程池,传递数据类型 String,异常处理 打印错误日志。最后 start 启动消费。

    @Bean
    @ConditionalOnProperty(name = "spring.redis.msg.listener",havingValue = "true")
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> tspConsumerListener(
            RedisConnectionFactory factory){
        //spring-data-redis 2.3.1.RELEASE及更高版本中,createGroup如果不存在,则会自动使用创建流
        return streamContainer(StreamOffset.create(STREAM, ReadOffset.lastConsumed()),Consumer.from(GROUP,CONSUMER)
                ,factory,new TspStreamListener());
    }
    private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(StreamOffset<String> offset
            ,Consumer consumer, RedisConnectionFactory factory, StreamListener<String, ObjectRecord<String, String>> listener) {
        // pollTimeout 拉取消息超时时间,targetType 传递的数据类型, executor 线程池
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
                .create(factory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder().pollTimeout(Duration.ofSeconds(50)).batchSize(1).targetType(String.class).build());
        //指定消费最新的消息
        try {
            //创建消费者,接收上次处理未ACK消费的消息,指定消费者对象,autoAcknowledge 关闭自动ack确认
            container.register(StreamMessageListenerContainer.StreamReadRequest
                    .builder(offset).errorHandler((error) -> log.error(error.getMessage()))
                    .cancelOnError(e -> false).consumer(consumer).autoAcknowledge(false)
                    .build(), listener);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        container.start();
        return container;
    }

预处理

类加载完毕,执行方法。预处理操作,可以使用 spring 启动后处理类实现。本demo就先这样了。

本方法 是为了 解决,项目启动报错(无消息队列、无消费组)和程序重启死信问题(꒦_꒦)

    @PostConstruct
    private void start(){
    }

如果redis 没有 对应key 的 stream 消息队列,消费者启动后会频繁报错。如下代码会在 项目启动 发送测试消息,创建 消息队列,并限制 消息队列 长度(限制内存消耗)。

ps:我好菜呀,不知道spring是否允许消费者 自动创建 消息队列o(╥﹏╥)o

// 生产者 创建 消息队列,防止启动 消费者 报错
        if(!(Boolean.TRUE.equals(redisTemplate.hasKey(STREAM)))){
            log.info("没有key,测试发送(创建key)");
            push("T");
            //限制 消息队列 容量 XTRIM stream MAXLEN len
            redisTemplate.opsForStream().trim(STREAM,1000L);
        }

判断消息队列,是否存在消费组。没有创建消费组

// 创建 消息队列-消费者
        if (redisTemplate.opsForStream().groups(STREAM).isEmpty()) {
            log.info("redis-消息队列-stream,createGroup,{} {}",STREAM,GROUP);
            //XGROUP CREATE stream group [id|$|0] [MKSTREAM],使用 $ 表示仅消费新消息,或者使用 0 表示消费流中的所有消息
            redisTemplate.opsForStream().createGroup(STREAM,GROUP);
        }

死信处理代码

private void handlerPending(String key,String group){
        //判断是否存在挂起的消息 XPENDING stream group [start stop count] [consumer]
        PendingMessagesSummary pending = redisTemplate.opsForStream().pending(key, group);
        long size;
        if(pending == null || (size = pending.getTotalPendingMessages()) <= 0 ) {
            log.debug("redis-消息队列,{},{},暂无挂起消息pending", key, group);
            return;
        }
        //---------------------
        String minId = pending.minMessageId(),maxId = pending.maxMessageId(),id;
        // 从挂起消息开始处理 [1],minId : 1706178903044-0,maxId : 1706178903044-0
        log.info("redis-消息队列,{},{},pending-挂起消息[{}],minId : {},maxId : {}",key,group,size,minId,maxId);
        // - + 4 ---------------------获取挂起所有信息 streamId
        PendingMessages msgIds = redisTemplate.opsForStream().pending(key, group, Range.closed("-", "+"), size);
        log.info("redis-消息队列,{},{},pending-挂起消息,{}",key,group,msgIds);
        List<MapRecord<String, Object, Object>> list;
        // ---------------------循环处理,可以考虑 异步执行
        for (PendingMessage msgId : msgIds) {
            id = msgId.getId().getValue();
            //PendingMessage{id=1706178903044-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=66775, totalDeliveryCount=1}
            log.info("redis-消息队列,{},{},pending-挂起消息>>,{}",key,group,msgId);
            //XRANGE key start end [COUNT count]
            list = redisTemplate.opsForStream().range(key, Range.just(msgId.getIdAsString()));
            log.info("redis-消息队列,{},{},range-挂起消息>> {},{}",key,group,id,list);
            if(list == null || list.isEmpty()){
                continue;
            }
            // 开始 结束 id相同,只返回 一个消息结果
            MapRecord<String, Object, Object> record = list.get(0);
            //MapBackedRecord{recordId=1706233950802-0, kvMap={data=401}}
            if("1706233940011-0".equals(id)){
                MapRecord<String, Object, Object> copy = record.withId(RecordId.autoGenerate());
                log.info("redis-消息队列,{},{},range-挂起消息>0>add+++++copy {},{}",key,group,copy.getStream(),copy);
                //XADD stream-name id field value [field value]
                RecordId add = redisTemplate.opsForStream().add(copy);
                log.info("redis-消息队列,{},{},range-挂起消息>0>add+++++copy {},{}",key,group,add,copy);
                log.info("redis-消息队列,{},{},range-挂起消息>0>ack^^^^^old {},{}",key,group,record.getStream(),record);
                //XACK stream group id [id id ...]
                Long siz = redisTemplate.opsForStream().acknowledge(GROUP, record);
                log.info("redis-消息队列,{},{},range-挂起消息>0>ack^^^^^old [{}],{}",key,group,siz,record);
                log.info("redis-消息队列,{},{},range-挂起消息>0>del-----old {},{}",key,group,record.getStream(),record);
                //XDEL key ID [ID ...]
                siz = redisTemplate.opsForStream().delete(record);
                log.info("redis-消息队列,{},{},range-挂起消息>0>del-----old [{}],{}",key,group,siz,record);
            }else{
                log.info("redis-消息队列,{},{},range-挂起消息>0> {},{},{}",key,group,record.getStream(),id,record);
            }
        }
        log.info("redis-消息队列,{},{},pending-挂起消息-end",key,group);
    }

汇总

代码

package demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @author z.y.l
 * @version v1.0
 * @date 2024/1/25
 */
@Configuration
@Import (RedisAutoConfiguration.class)
public class RedisStreamConfig {
    private static final Logger log = LoggerFactory.getLogger(RedisStreamConfig.class);
    final static String STREAM = "tsp", GROUP = "tsp-g-1", CONSUMER = "tsp-c-1";
    private static StringRedisTemplate redisTemplate;
    @Autowired
    public void setRedisTemplate(StringRedisTemplate redisTemplate) {
        RedisStreamConfig.redisTemplate = redisTemplate;
    }
    /** 生产者-发送消息 */
    public static void push(String msg){
        // 创建消息记录, 以及指定stream
        StringRecord record = StreamRecords.string(Collections.singletonMap("data", msg)).withStreamKey(STREAM);
        // 将消息添加至消息队列中 XADD stream [MAXLEN len] id field value [field value ...]
        redisTemplate.opsForStream().add(record);
        log.info("redis-消息队列-stream, {} ,send msg: {}", STREAM,msg);
    }
    /** 项目启动 预处理,无消息队列(消费者-启动报错)、限制容量、手动创建消费组、处理异常挂起的消息 */
    @PostConstruct
    private void start(){
        // 生产者 创建 消息队列,防止启动 消费者 报错
        if(!(Boolean.TRUE.equals(redisTemplate.hasKey(STREAM)))){
            log.info("没有key,测试发送(创建key)");
            push("T");
            //限制 消息队列 容量 XTRIM stream MAXLEN len
            redisTemplate.opsForStream().trim(STREAM,1000L);
        }
        // 创建 消息队列-消费者
        if (redisTemplate.opsForStream().groups(STREAM).isEmpty()) {
            log.info("redis-消息队列-stream,createGroup,{} {}",STREAM,GROUP);
            //XGROUP CREATE stream group [id|$|0] [MKSTREAM],使用 $ 表示仅消费新消息,或者使用 0 表示消费流中的所有消息
            redisTemplate.opsForStream().createGroup(STREAM,GROUP);
        }else{
            // 处理 挂起消息,也可以 使用调度 定时处理
            // 当前 挂起消息处理方式:复制新消息-ack旧消息-删除旧消息
            // 也可以使用 转移 消费组
            handlerPending(STREAM,GROUP);
        }
    }
    private void handlerPending(String key,String group){
        //判断是否存在挂起的消息 XPENDING stream group [start stop count] [consumer]
        PendingMessagesSummary pending = redisTemplate.opsForStream().pending(key, group);
        long size;
        if(pending == null || (size = pending.getTotalPendingMessages()) <= 0 ) {
            log.debug("redis-消息队列,{},{},暂无挂起消息pending", key, group);
            return;
        }
        String minId = pending.minMessageId(),maxId = pending.maxMessageId(),id;
        // 从挂起消息开始处理 [1],minId : 1706178903044-0,maxId : 1706178903044-0
        log.info("redis-消息队列,{},{},pending-挂起消息[{}],minId : {},maxId : {}",key,group,size,minId,maxId);
        // - + 4 所有
        PendingMessages msgIds = redisTemplate.opsForStream().pending(key, group, Range.closed("-", "+"), size);
        log.info("redis-消息队列,{},{},pending-挂起消息,{}",key,group,msgIds);
        List<MapRecord<String, Object, Object>> list;
        for (PendingMessage msgId : msgIds) {
            id = msgId.getId().getValue();
            //PendingMessage{id=1706178903044-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=66775, totalDeliveryCount=1}
            log.info("redis-消息队列,{},{},pending-挂起消息>>,{}",key,group,msgId);
            //XRANGE key start end [COUNT count]
            list = redisTemplate.opsForStream().range(key, Range.just(msgId.getIdAsString()));
            log.info("redis-消息队列,{},{},range-挂起消息>> {},{}",key,group,id,list);
            if(list == null || list.isEmpty()){
                continue;
            }
            // 开始 结束 id相同,只返回 一个消息结果
            MapRecord<String, Object, Object> record = list.get(0);
            //MapBackedRecord{recordId=1706233950802-0, kvMap={data=401}}
            if("1706233940011-0".equals(id)){
                MapRecord<String, Object, Object> copy = record.withId(RecordId.autoGenerate());
                log.info("redis-消息队列,{},{},range-挂起消息>0>add+++++copy {},{}",key,group,copy.getStream(),copy);
                //XADD stream-name id field value [field value]
                RecordId add = redisTemplate.opsForStream().add(copy);
                log.info("redis-消息队列,{},{},range-挂起消息>0>add+++++copy {},{}",key,group,add,copy);
                log.info("redis-消息队列,{},{},range-挂起消息>0>ack^^^^^old {},{}",key,group,record.getStream(),record);
                //XACK stream group id [id id ...]
                Long siz = redisTemplate.opsForStream().acknowledge(GROUP, record);
                log.info("redis-消息队列,{},{},range-挂起消息>0>ack^^^^^old [{}],{}",key,group,siz,record);
                log.info("redis-消息队列,{},{},range-挂起消息>0>del-----old {},{}",key,group,record.getStream(),record);
                //XDEL key ID [ID ...]
                siz = redisTemplate.opsForStream().delete(record);
                log.info("redis-消息队列,{},{},range-挂起消息>0>del-----old [{}],{}",key,group,siz,record);
            }else{
                log.info("redis-消息队列,{},{},range-挂起消息>0> {},{},{}",key,group,record.getStream(),id,record);
            }
        }
        log.info("redis-消息队列,{},{},pending-挂起消息-end",key,group);
    }
    @Bean
    @ConditionalOnProperty(name = "spring.redis.msg.listener",havingValue = "true")
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> tspConsumerListener(
            RedisConnectionFactory factory){
        //spring-data-redis 2.3.1.RELEASE及更高版本中,createGroup如果不存在,则会自动使用创建流
        return streamContainer(StreamOffset.create(STREAM, ReadOffset.lastConsumed()),Consumer.from(GROUP,CONSUMER)
                ,factory,new TspStreamListener());
    }
    private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(StreamOffset<String> offset
            ,Consumer consumer, RedisConnectionFactory factory, StreamListener<String, ObjectRecord<String, String>> listener) {
        // pollTimeout 拉取消息超时时间,targetType 传递的数据类型, executor 线程池
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
                .create(factory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder().pollTimeout(Duration.ofSeconds(50)).batchSize(1).targetType(String.class).build());
        //指定消费最新的消息
        try {
            //创建消费者,接收上次处理未ACK消费的消息,指定消费者对象,autoAcknowledge 关闭自动ack确认
            container.register(StreamMessageListenerContainer.StreamReadRequest
                    .builder(offset).errorHandler((error) -> log.error(error.getMessage()))
                    .cancelOnError(e -> false).consumer(consumer).autoAcknowledge(false)
                    .build(), listener);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        container.start();
        return container;
    }
    
    /** 消费者 监听 */
    public static class TspStreamListener implements StreamListener<String, ObjectRecord<String,String>> {
        @Override
        public void onMessage(ObjectRecord<String, String> message) {
            RecordId messageId = message.getId();
            // 消息的key和value
            String string = message.getValue();
            if("T".equals(string)){
                log.info("消费者-监听>>>测试消息-ack. msgId={}, stream={}, body={}", messageId, message.getStream(), string);
                redisTemplate.opsForStream().acknowledge(GROUP, message);
                return;
            }
            log.info("消费者-监听>>>get msg. msgId={}, stream={}, body={}", messageId, message.getStream(), string);
            //业务逻辑,需要考虑 被挂起的消息 重新消费情况
            try {
                log.info("消费者-监听>>>睡眠10s,模拟耗时逻辑-start");
                TimeUnit.SECONDS.sleep(10);
                log.info("消费者-监听>>>睡眠10s,模拟耗时逻辑-end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("消费者-监听>>>手动确认消息");
            redisTemplate.opsForStream().acknowledge(GROUP, message);
            log.info("消费者-监听>>>end");
        }
    }
}

redis

在这里插入图片描述

日志

2024-01-26 10:33:54.566 - [main] INFO  o.s.b.w.s.c.ServletWebServerApplicationContext.prepareWebApplicationContext(292) - Root WebApplicationContext: initialization completed in 1162 ms
2024-01-26 10:33:58.388 - [main] INFO  demo.RedisStreamConfig.handlerPending(85) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息[3],minId : 1706178903044-0,maxId : 1706233950802-0
2024-01-26 10:33:58.396 - [main] INFO  demo.RedisStreamConfig.handlerPending(88) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息,PendingMessages{groupName='tsp-g-1', range=[--+], pendingMessages=[PendingMessage{id=1706178903044-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=57524430, totalDeliveryCount=1}, PendingMessage{id=1706233940011-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=2477103, totalDeliveryCount=1}, PendingMessage{id=1706233950802-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=1968963, totalDeliveryCount=1}]}
2024-01-26 10:33:58.397 - [main] INFO  demo.RedisStreamConfig.handlerPending(93) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息>>,PendingMessage{id=1706178903044-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=57524430, totalDeliveryCount=1}
2024-01-26 10:33:58.404 - [main] INFO  demo.RedisStreamConfig.handlerPending(95) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>> 1706178903044-0,[]
2024-01-26 10:33:58.405 - [main] INFO  demo.RedisStreamConfig.handlerPending(93) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息>>,PendingMessage{id=1706233940011-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=2477103, totalDeliveryCount=1}
2024-01-26 10:33:58.409 - [main] INFO  demo.RedisStreamConfig.handlerPending(95) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>> 1706233940011-0,[MapBackedRecord{recordId=1706233940011-0, kvMap={data=303}}]
2024-01-26 10:33:58.409 - [main] INFO  demo.RedisStreamConfig.handlerPending(104) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>add+++++copy tsp,MapBackedRecord{recordId=*, kvMap={data=303}}
2024-01-26 10:33:58.413 - [main] INFO  demo.RedisStreamConfig.handlerPending(106) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>add+++++copy 1706236438413-0,MapBackedRecord{recordId=*, kvMap={data=303}}
2024-01-26 10:33:58.413 - [main] INFO  demo.RedisStreamConfig.handlerPending(107) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>ack^^^^^old tsp,MapBackedRecord{recordId=1706233940011-0, kvMap={data=303}}
2024-01-26 10:33:58.418 - [main] INFO  demo.RedisStreamConfig.handlerPending(109) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>ack^^^^^old [1],MapBackedRecord{recordId=1706233940011-0, kvMap={data=303}}
2024-01-26 10:33:58.419 - [main] INFO  demo.RedisStreamConfig.handlerPending(110) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>del-----old tsp,MapBackedRecord{recordId=1706233940011-0, kvMap={data=303}}
2024-01-26 10:33:58.420 - [main] INFO  demo.RedisStreamConfig.handlerPending(112) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0>del-----old [1],MapBackedRecord{recordId=1706233940011-0, kvMap={data=303}}
2024-01-26 10:33:58.421 - [main] INFO  demo.RedisStreamConfig.handlerPending(93) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息>>,PendingMessage{id=1706233950802-0, consumer=tsp-g-1:tsp-c-1, elapsedTimeSinceLastDeliveryMS=1968963, totalDeliveryCount=1}
2024-01-26 10:33:58.422 - [main] INFO  demo.RedisStreamConfig.handlerPending(95) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>> 1706233950802-0,[MapBackedRecord{recordId=1706233950802-0, kvMap={data=401}}]
2024-01-26 10:33:58.422 - [main] INFO  demo.RedisStreamConfig.handlerPending(114) - redis-消息队列,tsp,tsp-g-1,range-挂起消息>0> tsp,1706233950802-0,MapBackedRecord{recordId=1706233950802-0, kvMap={data=401}}
2024-01-26 10:33:58.422 - [main] INFO  demo.RedisStreamConfig.handlerPending(117) - redis-消息队列,tsp,tsp-g-1,pending-挂起消息-end

仅供参考-请结合实际情况使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/352370.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Leetcode刷题笔记题解(C++):1114. 按序打印(多线程)

思路&#xff1a; 保证A,B,C三个线程的顺序不会变&#xff0c;即优先级顺序的问题 A,B需要资源1&#xff0c;B,C需要资源2 A先占用资源1和资源2&#xff0c;A线程完了之后释放资源1不释放资源2&#xff0c;然后B线程占用资源1&#xff0c;A线程完了之后释放资源1和资源2&…

28个炫酷的纯CSS特效动画示例(含源代码)

CSS是网页的三驾马车之一&#xff0c;是对页面布局的总管家&#xff0c;2024年了&#xff0c;这里列出28个超级炫酷的纯CSS动画示例&#xff0c;让您的网站更加炫目多彩。 文章目录 1. 涌动的弹簧效果2. 超逼真的3D篮球弹跳&#xff0c;含挤压弹起模态3. 鼠标放div上&#xff0…

Cesium加载地图-高德影像

废话不多说&#xff0c;直接上代码 整体代码 <template><div id"cesiumContainer" style"height: 100vh;"></div><div id"toolbar" style"position: fixed;top:20px;left:220px;"><el-breadcrumb><…

PyTorch 中的nn.Conv2d 类

nn.Conv2d 是 PyTorch 中的一个类&#xff0c;代表二维卷积层&#xff08;2D Convolution Layer&#xff09;。这个类广泛用于构建卷积神经网络&#xff08;CNN&#xff09;&#xff0c;特别是在处理图像数据时。 基本概念 卷积: 在神经网络的上下文中&#xff0c;卷积是一种特…

xshell无法连接linux,查询本机ip时出现<NO-CARRIER,BROADCAST,MULTICAST,UP>

在用xshell连接虚拟机VMware中的linux时&#xff0c;发现昨天还能连通的&#xff0c;今天连接不了了 我寻思应该是网卡配置出问题了&#xff0c;就去终端ip addr试了一下&#xff0c;果然发现问题&#xff0c;ip 查看网卡ens33就发现出现ens33:<NO-CARRIER,BROADCAST,MULTI…

自然语言处理:transfomer架构

介绍 transfomer是自然语言处理中的一个重要神经网络结构&#xff0c;算是在传统RNN和LSTM上的一个升级&#xff0c;接下来让我们来看看它有处理语言序列上有哪些特殊之处 模型整体架构 原论文中模型的整体架构如下&#xff0c;接下来我们将层层解析各层的作用和代码实现 该…

java中aes加密解密工具类

java中aes加密解密工具类 字符串&#xff1a;{“DATA”:{“SJH”:“17600024168”,“DLZH”:“91510104MA67FPXR5T”,“DLMM”:“jhdz123456”,“DLSF”:“5”,“NSRSBH”:“91510104MA67FPXR5T”},“JRSF”:“23”} 加密后&#xff1a;y4mzmi3jta22aXeIPfEdzu8sgA9uy3OevaIY…

LSTM的多变量时间序列预测(北京PM2.5预测)

参考博客 文章目录 LSTM简介数据集简介数据预处理多元LSTM预测模型数据准备&#xff1a;定义和拟合模型评估模型 训练多个滞后时间步 LSTM简介 LSTM&#xff08;Long Short-Term Memory&#xff09;是一种特殊类型的循环神经网络&#xff08;RNN&#xff09;&#xff0c;它在处…

LeetCode:1706. 球会落何处(Java 模拟)

目录 1706. 球会落何处 题目描述&#xff1a; 实现代码与解析&#xff1a; 原理思路&#xff1a; 1706. 球会落何处 题目描述&#xff1a; 用一个大小为 m x n 的二维网格 grid 表示一个箱子。你有 n 颗球。箱子的顶部和底部都是开着的。 箱子中的每个单元格都有一个对角线…

【C语言/数据结构】排序(直接插入排序|希尔排序)

&#x1f308;个人主页&#xff1a;秦jh__https://blog.csdn.net/qinjh_?spm1010.2135.3001.5343&#x1f525; 系列专栏&#xff1a;《数据结构》https://blog.csdn.net/qinjh_/category_12536791.html?spm1001.2014.3001.5482 ​​​​ 目录 插入排序 直接插入排序&…

【STM32】STM32学习笔记-Unix时间戳(41)

00. 目录 文章目录 00. 目录01. Unix时间戳02. UTC/GMT03. 时间戳转换04. C 标准库 <time.h>05. 时间相关函数示例5.1 time函数5.2 gmtime函数5.3 localtime函数5.4 mktime函数5.5 ctime函数5.6 asctime函数5.7 strftime函数 06. 预留07. 附录 01. Unix时间戳 •Unix 时…

GD32移植FreeRTOS+CLI过程记录

背景 之前我只在STM32F0上基于HAL库和CubeMX移植FreeRTOS&#xff0c;但最近发现国产化替代热潮正盛&#xff0c;许多项目都有国产化器件指标&#xff0c;而且国产单片机确实比意法的便宜&#xff0c;所以也买了块兆易创新的GD32F303开发板&#xff0c;试一试它的优劣。虽然GD…

HarmonyOS鸿蒙学习基础篇 - 通用事件

一、引言 HarmonyOS鸿蒙是华为推出的分布式操作系统&#xff0c;旨在为各种智能设备提供统一的操作系统。鸿蒙系统的一大特色是其强大的分布式能力&#xff0c;而通用事件则是实现这一能力的关键技术之一&#xff0c;本篇博客将介绍HarmonyOS鸿蒙中的通用事件。 二、 点击事件…

Vue深入学习4—指令和生命周期

1.Vue是怎么识别 v- 指令的&#xff1f; 首先将HTML结构解析成属性列表&#xff0c;存入到数组中&#xff0c;接着遍历数组中的每一个节点&#xff0c;获取到不同指令对应的方法。 // 将HTML看作真正的属性列表 var ndoeAttrs node.attributes; var self this; // 类数组对象…

使用chrome爬取URL数据的实战代码

大家好,我是herosunly。985院校硕士毕业,现担任算法研究员一职,热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名,CCF比赛第二名,科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的见解。曾经辅导过若干个非计算机专业的学生进入到算法…

JavaScript 执行上下文与作用域

执行上下文与作用域 ​ 执行上下文的概念在 JavaScript 中是颇为重要的。变量或函数的上下文决定了它们可以访问哪些数据&#xff0c;以及它们的行为。每个上下文都有一个关联的变量对象&#xff08;variable object&#xff09;&#xff0c; 而这个上下文中定义的所有变量和函…

Java项目:17 基于SpringBoot的在线拍卖系统

作者主页&#xff1a;源码空间codegym 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 主要功能 前台登录&#xff1a; ①首页&#xff1a;轮播图、竞拍公告、拍卖商品展示 ②拍卖商品&#xff1a;分类&#xff1a;手机、数码、电器…

Vite学习指南

那本课程都适合哪些人群呢&#xff1f; 想要学习前端工程化&#xff0c;在新项目中投入使用 Vite 构建工具的朋友 Webpack 转战到 Vite 的小伙伴 前端架构师们&#xff0c;可以充实自己的工具箱 当然如果你没有项目相关开发经验&#xff0c;也可以从本课程中受益&#xff0…

【Linux】gcc中__builtin_expect的作用

本文首发于 慕雪的寒舍 引入 代码学习的时候&#xff0c;遇到了__builtin_expect这个之前从来没有遇到过的东西&#xff0c;网上搜了一下&#xff0c;发现纯C语言实现的GCD&#xff08;Grand Central Dispatch&#xff09;中就有定义过这个宏 #define _safe_cast_to_long(x) …

2017. 圆周排列

一、题目 Problem #2017 - ECNU Online Judge 二、思路 一开始以为是全排列&#xff0b;验证的问题&#xff0c;后来超时&#xff0c;然后转向组合排列思考&#xff0c;结果AC了 首先要知道&#xff1a;n个不同元素的圆排列有(n-1)!个 证明&#xff1a;将个n 元素中的某个元素…