Redis --- 秒杀优化方案(阻塞队列+基于Stream流的消息队列)

下面是我们的秒杀流程:

对于正常的秒杀处理,我们需要多次查询数据库,会给数据库造成相当大的压力,这个时候我们需要加入缓存,进而缓解数据库压力。

在上面的图示中,我们可以将一条流水线的任务拆成两条流水线来做,如果我们直接将判断秒杀库存与校验一人一单放在流水线A上,剩下的放在另一条流水线B,那么如果流水线A就可以相当于服务员直接判断是否符合资格,如果符合资格那么直接生成信息给另一条流水线B去处理业务,这里的流水线就是咱们的线程,而流水线A也是基于数据库进行查询,也会压力数据库,那么这种情况我们就可以将待查询信息保存在Redis缓存中。

但是我们不能再流水线A判断完成后去直接调用流水线B,这样的效率是大打折扣的,这种情况我们需要开启独立线程去执行流水线B的操作,如何知道给哪个用户创建订单呢?这个时候就要流水线A在判断成功后去生成信息给独立线程

最后的业务就变成,用户直接访问流水线A,通过流水线A去判断,如果通过则生成信息给流水线B去创建订单,过程如下图:

那么什么样的数据结构满足下面条件:① 一个key能够保存很多值   ②唯一性:一人一单需要保证用户id不能重复。

所以我们需要使用set:

那么如何判断校验用户的购买资格呢?

 而上述判断需要保证原子性,所以我们需要使用Lua脚本进行编写:

local voucherId = ARGV[1]; -- 优惠劵id
local userId = ARGV[2]; -- 用户id

-- 库存key
local stockKey = 'seckill:stock' .. voucherId; -- 拼接
-- 订单key
local stockKey = 'seckill:stock' .. voucherId; -- 拼接
-- 判断库存是否充足
if(tonumber(redis.call('get',stockKey) <= 0)) then
    -- 库存不足,返回1
    return 1;
end;
-- 判断用户是否下单
if(redis.call('sismember',orderKey,userId)) then
    -- 存在,说明重复下单,返回2
    return 2;
end
-- 扣减库存 incrby stockKey -1
redis.call('incrby',stockKey,-1);
-- 下单(保存用户) sadd orderKey userId
redis.call('sadd',orderKey,userId);
return 0;

之后我们按照下面步骤来实现代码:

在方法体内执行Lua脚本来原子性判断,然后判断是否能够处理并传入阻塞队列:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService seckillVoucherService;
    @Autowired
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private RedissonClient redissonClient;
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型内填入返回值类型
    static { // 静态属性要使用静态代码块进行初始化
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setResultType(Long.class);
        SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    }
    public Result seckillVoucherMax(Long voucherId) {
        // 获取用户信息
        Long userId = UserHolder.getUser().getId();
        // 1.执行Lua脚本来判断用户资格
        Long result = stringRedisTemplate.execute(
                            SECKILL_SCRIPT,
                            Collections.emptyList(), // Lua无需接受key
                            voucherId.toString(),
                            userId.toString()
                        );
        // 2.判断结果是否为0
        int r = result.intValue();
        if(r != 0) {
            // 不为0代表无资格购买
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        // 3.有购买资格则将下单信息保存到阻塞队列中
        // ... 
        return Result.ok();
    }

}

 接下来我们创建阻塞队列,线程池以及线程方法,随后使用Springboot提供的注解在@PostConstruct去给线程池传入线程方法:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService seckillVoucherService;
    @Autowired
    private RedisIdWorker redisIdWorker;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Resource
    private RedissonClient redissonClient;
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // 泛型内填入返回值类型
    static { // 静态属性要使用静态代码块进行初始化
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setResultType(Long.class);
        SECKILL_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    }
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 创建阻塞队列
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();  // 创建线程池
    // 让大类在开始初始化时就能够执行线程任务
    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
    }
    // 创建线程任务
    private class VoucherOrderTask implements Runnable {
        @Override
        public void run() {
            while(true){
                try {
                    // 获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();// 取出头部信息
                    // 创建订单
                    handleVoucherOrder(voucherOrder);
                } catch (Exception e) {
                    log.error("处理订单异常",e);
                }
            }
        }
    }
    // 创建订单
    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        RLock lock = redissonClient.getLock("lock:order:" + voucherOrder.getUserId().toString());
        boolean isLock = lock.tryLock();
        // 判断是否获取锁成功
        if (!isLock) {
            // 获取锁失败,返回错误或重试
            log.error("不允许重复下单");
            return ;
        }
        try {
            proxy.createVoucherOrderMax(voucherOrder);
        } finally {
            lock.unlock();
        }
    }
    @Override
    public void createVoucherOrderMax(VoucherOrder voucherOrder) {
        // 一人一单
        Long userId = voucherOrder.getUserId();
        // 查询订单
        int count = query().eq("user_id",userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        // 判断是否存在
        if(count > 0){
            // 用户已经购买过
            log.error("用户已经购买过");
            return ;
        }
        // CAS改进:将库存判断改成stock > 0以此来提高性能
        boolean success = seckillVoucherService.update()
                .setSql("stock= stock -1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId()).eq("stock",0) // where id = ? and stock > 0
                .update();
        if (!success) {
            //扣减库存
            log.error("库存不足!");
            return ;
        }
        //6.创建订单
        save(voucherOrder);
    }
    private IVoucherOrderService proxy; // 代理对象
    public Result seckillVoucherMax(Long voucherId) {
        // 获取用户信息
        Long userId = UserHolder.getUser().getId();
        // 1.执行Lua脚本来判断用户资格
        Long result = stringRedisTemplate.execute(
                            SECKILL_SCRIPT,
                            Collections.emptyList(), // Lua无需接受key
                            voucherId.toString(),
                            userId.toString()
                        );
        // 2.判断结果是否为0
        int r = result.intValue();
        if(r != 0) {
            // 不为0代表无资格购买
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        // 3.有购买资格则将下单信息保存到阻塞队列中
        Long orderId = redisIdWorker.nextId("order");
        // 创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        // 放入阻塞队列
        orderTasks.add(voucherOrder);
        // 4.获取代理对象(线程异步执行,需要手动在方法内获取)
        proxy = (IVoucherOrderService)AopContext.currentProxy(); // 获取当前类的代理对象  (需要引入aspectjweaver依赖,并且在实现类加入@EnableAspectJAutoProxy(exposeProxy = true)以此来暴露代理对象)
        return Result.ok();
    }

}

在上面代码中,我们使用下面代码创建了一个单线程的线程池。它保证所有提交的任务都按照提交的顺序执行,每次只有一个线程在工作。

private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

下面代码是一个常见的阻塞队列实现,具有固定大小(在这里是 1024 * 1024),它的作用是缓冲和排队任务。ArrayBlockingQueue 是一个线程安全的队列,它会自动处理线程之间的同步问题。当队列满时,调用 put() 方法的线程会被阻塞,直到队列有空间;当队列为空时,调用 take() 方法的线程会被阻塞,直到队列中有数据。

private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

在下面代码中,orderTasks 阻塞队列用于存放需要处理的订单对象,每个订单的处理逻辑都由 VoucherOrderTask 线程池中的线程异步执行:

VoucherOrder voucherOrder = orderTasks.take();
handleVoucherOrder(voucherOrder);

之后我们需要调用 Runnable 接口去实现VoucherOrderTask类以此来创建线程方法

private class VoucherOrderTask implements Runnable {
    @Override
    public void run() {
        while (true) {
            try {
                // 获取队列中的订单信息
                VoucherOrder voucherOrder = orderTasks.take(); // 获取订单
                // 创建订单
                handleVoucherOrder(voucherOrder);
            } catch (Exception e) {
                log.error("处理订单异常", e);
            }
        }
    }
}

随后将线程方法通过 submit() 方法将 VoucherOrderTask 提交到线程池中,这个任务是一个无限循环的任务,它会不断从阻塞队列中取出订单并处理,直到线程池关闭。这种方式使得订单处理任务可以异步执行,而不阻塞主线程,提高了系统的响应能力:

@PostConstruct
private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
}

但是在高并发的情况下就会产生大量订单,就会超出JVM阻塞队列的上线,并且每当服务重启或者宕机的情况发生,阻塞队列的所有订单任务就都会丢失。

所以为了解决这种情况,我们就要使用消息队列去解决这个问题:


什么是消息队列?


消息队列(Message Queue, MQ)是一种用于在应用程序之间传递消息的通信方式。它允许应用程序通过发送和接收消息来解耦,从而提高系统的可扩展性、可靠性和灵活性。消息队列通常用于异步通信、任务队列、事件驱动架构等场景。

消息队列的核心概念 :

  1. 生产者(Producer):发送消息到消息队列的应用程序。

  2. 消费者(Consumer):从消息队列中接收并处理消息的应用程序。

  3. 队列(Queue):消息的存储区域,生产者将消息发送到队列,消费者从队列中获取消息。

  4. 消息(Message):在生产者与消费者之间传递的数据单元。

  5. Broker:消息队列的服务器,负责接收、存储和转发消息。

消息队列是在JVM以外的一个独立的服务,能够不受JVM内存的限制,并且存入MQ的信息都可以做持久化存储。

详细教学可以查询下面链接:微服务架构 --- 使用RabbitMQ进行异步处理 


但是这样的方式是需要额外提供服务的,所以我们可以使用Redis提供的三种不同的方式来实现消息队列

  1. List 结构实现消息队列

  2. Pub/Sub(发布/订阅)模式

  3. Stream 结构(Redis 5.0 及以上版本)(推荐使用)(详细介绍)


使用 List 结构实现消息队列:

Redis 的 List 数据结构是一个双向链表,支持从头部或尾部插入和弹出元素。我们可以利用 LPUSH 和 BRPOP 命令实现一个简单的消息队列。

实现步骤:

  • 生产者:使用 LPUSH 将消息推入队列。

  • 消费者:使用 BRPOP 阻塞地从队列中获取消息。

生产者代码:

import redis.clients.jedis.Jedis;

public class ListProducer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
        String queueName = "myQueue";

        // 发送消息
        for (int i = 1; i <= 5; i++) {
            String message = "Message " + i;
            jedis.lpush(queueName, message); // 将消息推入队列
            System.out.println("Sent: " + message);
        }

        jedis.close(); // 关闭连接
    }
}

消费者代码:

import redis.clients.jedis.Jedis;

public class ListConsumer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
        String queueName = "myQueue";

        while (true) {
            // 阻塞获取消息,超时时间为 0(无限等待)
            var result = jedis.brpop(0, queueName);
            String message = result.get(1); // 获取消息内容
            System.out.println("Received: " + message);
        }
    }
}
  • 优点:简单易用,适合轻量级场景。

  • 缺点不支持消息确认机制,消息一旦被消费(从队列内取出)就会从队列中删除。并且只支持单消费者(一个消息只能拿出一次)


使用 Pub/Sub 模式实现消息队列: 

Redis 的 Pub/Sub 模式是一种发布-订阅模型,生产者将消息发布到频道,消费者订阅频道以接收消息。

实现步骤:

  • 生产者:使用 PUBLISH 命令向频道发布消息。

  • 消费者:使用 SUBSCRIBE 命令订阅频道。

生产者代码:

import redis.clients.jedis.Jedis;

public class PubSubProducer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
        String channelName = "myChannel";

        // 发布消息
        for (int i = 1; i <= 5; i++) {
            String message = "Message " + i;
            jedis.publish(channelName, message); // 发布消息到频道
            System.out.println("Published: " + message);
        }

        jedis.close(); // 关闭连接
    }
}

 消费者代码:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PubSubConsumer {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis
        String channelName = "myChannel";

        // 创建订阅者
        JedisPubSub subscriber = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("Received: " + message);
            }
        };

        // 订阅频道
        jedis.subscribe(subscriber, channelName);
    }
}
  • 优点:支持一对多的消息广播。

  • 缺点:消息是即时的,如果消费者不在线,消息会丢失。


但是上面两方式都是有缺点的:

  1. 不支持消息确认机制,消息一旦被消费(从队列内取出)就会从队列中删除。并且只支持单消费者(一个消息只能拿出一次)
  2. 消息是即时的,如果消费者不在线,消息会丢失。

所以根据上面的两种方式,我们推出一款全新的方式 ->

使用 Stream 结构实现消息队列:

Redis Stream 是一种强大的数据结构,用于管理消息流。它将消息存储在 Redis 中,并允许消费者按顺序获取消息。Stream 具有以下特点:

  • 有序消息:消息按插入顺序排列。
  • 消费者组:一个消费者组可以有多个消费者,每个消费者可以独立消费不同的消息。
  • 消息 ID:每条消息都有唯一的 ID(如:1588890470850-0),ID 按时间戳生成。
  • 自动分配消息:多个消费者可以从 Stream 中并行消费消息,保证消息不会重复消费。

在 Redis Stream 中,一个队列可以有多个消费者组,每个消费者组可以独立地消费队列中的消息。每个消费者组内有多个消费者,而消费者是基于 消费者名称 进行识别的。 

消费者组的工作方式:

  • 每个消费者组拥有自己的 消费进度,也就是每个消费者组会从 自己独立的消息 ID 开始消费
  • 多个消费者组之间是相互独立的,即使它们消费的是同一个队列,它们也可以从不同的位置开始消费队列中的消息。
  • 每个消费者组都可以有多个 消费者(在同一个组内,多个消费者可以并行消费同一个队列的消息,但每个消息在消费者组内只能被一个消费者处理一次)。

假设有一个队列(Stream)mystream,可以为它创建多个消费者组:

XGROUP CREATE mystream group1 $ MKSTREAM
XGROUP CREATE mystream group2 $ MKSTREAM

这样,mystream 队列上就有了两个消费者组:group1group2。每个消费者组可以有自己的消费者并从该队列中读取消息。此时,group1group2 都在消费同一个队列 mystream,但它们的消费进度是独立的,它们各自有自己的消息 ID 记录。

每个消费者组可以有多个消费者,而每个消费者通过一个 唯一的消费者名称 来标识。


每个消费者组有独立的消费进度


每个消费者组会记录自己的消费进度,也就是它消费到队列中的 哪个消息 ID。即使多个消费者组在消费同一个消息队列,它们每个组都会从 不同的消费位置(消息 ID)开始读取消息。

例如,假设有一个队列 mystream,同时有两个消费者组 group1group2,它们都从 mystream 队列中读取消息:

  • group1mystream 队列中的消息 id1 开始消费,group1 的进度会记录在 Redis 中。
  • group2mystream 队列中的消息 id2 开始消费,group2 的进度也会记录在 Redis 中。

消费进度互不干扰,即便 group1group2 都在消费 mystream 队列,它们的消费位置是独立的。


消费者组内部的消息消费


一个消费者组内的消费者会 共享 组内的消息。即使有多个消费者,每条消息 在消费者组内部只会被 一个消费者 消费。消费者之间会并行处理消息,但每条消息只会被一个消费者处理。

举个例子:假设 group1 中有三个消费者 consumer1consumer2consumer3,如果队列 mystream 有 6 条消息,那么它们会如下消费:

  • consumer1 处理消息 12
  • consumer2 处理消息 34
  • consumer3 处理消息 56

但对于消费者组 group2,如果它有自己的消费者,group2 内的消费者也会并行消费 mystream 中的消息,而 group1group2 之间没有直接关系。

首先初始化一个消息队列:

在项目启动时,确保 Redis 中存在对应的 Stream 和消费者组。可以通过程序在启动时检查并创建(如果不存在的话)。

@Configuration
public class RedisStreamConfig {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final String STREAM_KEY = "mystream";
    private static final String GROUP_NAME = "mygroup";

    @PostConstruct
    public void init() {
        // 检查消费者组是否存在,若不存在则创建
        try {
            // 如果消费者组不存在则会抛出异常,我们捕获异常进行创建
            redisTemplate.opsForStream().groups(STREAM_KEY);
        } catch (Exception e) {
            // 创建消费者组,起始位置为 $ 表示从末尾开始消费新消息
            redisTemplate.opsForStream().createGroup(STREAM_KEY, GROUP_NAME);
        }
    }
}

注意:

  • opsForStream().groups(STREAM_KEY):查询消费者组是否已存在。
  • opsForStream().createGroup(STREAM_KEY, GROUP_NAME):如果没有消费者组,则创建一个新的组。

随后我们生产者发送消息示例:

@Service  
public class RedisStreamProducerService {  // 定义生产者服务类 RedisStreamProducerService

    private static final String STREAM_KEY = "mystream";  // 定义 Redis Stream 的名称,这里指定队列名为 "mystream"

    @Autowired  
    private StringRedisTemplate redisTemplate;

    public void sendMessage(String content) {  // 定义一个方法,发送消息到 Redis Stream,参数 content 是消息的内容
        Map<String, String> map = new HashMap<>();  // 创建一个 Map 用来存储消息内容
        map.put("content", content);  // 将消息内容添加到 Map 中,键是 "content",值是传入的内容

        // 在消息队列中添加消息,调用 StringRedisTemplate 的 opsForStream 方法
        RecordId recordId = redisTemplate.opsForStream()  // 获取操作 Redis Stream 的操作对象
                .add(StreamRecords.objectBacked(map)  // 创建一个 Stream 记录,将 Map 转化为对象记录
                .withStreamKey(STREAM_KEY));  // 设置该记录属于的 Stream(消息队列)的名称
        // 输出记录的 ID,表示消息已经成功发送
        System.out.println("消息发送成功,id: " + recordId.getValue());  // 打印消息的 ID,表明该消息已经被成功加入到 Stream 中
    }
}

RecordId 是 Spring Data Redis 中的一个类,用来表示 消息的唯一标识符。它对应 Redis Stream 中的 消息 ID,该 ID 是 Redis Stream 中每条消息的唯一标识。Redis 中的消息 ID 通常是由时间戳和序号组成的(如 1588890470850-0)。

主要功能:
  • 表示消息 IDRecordId 是一个封装类,表示 Redis Stream 中消息的 ID。
  • 用于识别和操作消息:在消费和确认消息时,RecordId 用来标识每条消息的唯一性,并帮助 Redis 确定消息是否已经被消费
使用场景:

RecordId 用来标识从 Stream 中读取到的消息,我们可以通过 RecordId 来进行消息的确认、删除或其他操作。

RecordId recordId = redisTemplate.opsForStream().add(StreamRecords.objectBacked(map).withStreamKey("mystream"));

通过 StreamRecords.objectBacked(map)map 对象作为消息内容,并用 add 方法将其写入 Stream。

在然后编写消费者服务:

使用 RedisTemplate 的 read 方法(底层执行的是 XREADGROUP 命令)从消费者组中拉取消息,并进行处理。消费者可以采用定时任务或后台线程不断轮询

@Slf4j  
@Service  
public class RedisStreamConsumerService { 
    private static final String STREAM_KEY = "mystream";  // Redis Stream 的名称,这里指定队列名为 "mystream"
    private static final String GROUP_NAME = "mygroup";  // 消费者组的名称,多个消费者可以通过组名共享消费队列
    private static final String CONSUMER_NAME = "consumer-1";  // 消费者的名称,消费者名称在同一消费者组内必须唯一

    @Autowired  
    private StringRedisTemplate redisTemplate;

    @PostConstruct  // 使用该注解能让方法在 Spring 完成依赖注入后自动调用,用于初始化任务
    @Async  // 将该方法标记为异步执行,允许它在单独的线程中运行,不会阻塞主线程,@EnableAsync 需要在配置类中启用
    public void start() {  // 启动方法,在应用启动时执行
        // 无限循环,不断从 Redis Stream 中读取消息(可以改为定时任务等方式)
        while (true) {
            try {
                // 设置 Stream 读取的阻塞超时,设置最多等待 2 秒
                StreamReadOptions options = StreamReadOptions.empty().block(Duration.ofSeconds(2));
                // 从指定的消费者组中读取消息,">" 表示只消费未被消费过的消息
                List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
                        Consumer.from(GROUP_NAME, CONSUMER_NAME),  // 指定消费者组和消费者名称
                        options,  // 设置读取选项,包含阻塞时间
                        StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed())  // 从最后消费的消息开始读取
                );
                // 如果没有消息,继续循环读取
                if (messages == null || messages.isEmpty()) {
                    continue;  
                }
                // 处理每一条读取到的消息
                for (MapRecord<String, Object, Object> message : messages) {
                    String messageId = message.getId();  // 获取消息的唯一标识符(ID)
                    Map<Object, Object> value = message.getValue();  // 获取消息内容(以 Map 形式存储)
                    log.info("接收到消息,id={},内容={}", messageId, value);  // 打印日志,记录消息 ID 和内容
                    // 在这里加入业务逻辑处理
                    // 例如处理消息并执行相应的操作
                    // ...

                    // 消息处理成功后,需要确认消息已经被消费(通过 XACK 命令)
                    redisTemplate.opsForStream().acknowledge(STREAM_KEY, GROUP_NAME, messageId);  // 确认消费的消息
                }
            } catch (Exception e) {
                log.error("读取 Redis Stream 消息异常", e);  // 异常捕获,记录错误日志
            }
        }
    }
}

MapRecord<String, Object, Object> 是 Spring Data Redis 用来表示 Redis Stream 中的 消息记录 的类。它不仅包含了消息的 ID,还包含了消息的内容(即消息数据)。在 Redis 中,每条消息都存储为一个 key-value 对。

主要功能:
  • 封装消息 ID 和消息内容MapRecord 用来封装消息的 ID 和消息的内容。
  • 消息的内容:消息的内容通常是一个 键值对Map<String, Object>),可以是任意对象的数据结构(例如,JSON、Map 或其他序列化对象)。
字段:
  • getId():返回消息的 ID(RecordId 类型)。
  • getValue():返回消息的内容,以 Map<Object, Object> 的形式。
使用场景:

MapRecord 是用来表示从 Stream 中读取到的消息,它将消息的 ID 和内容(键值对)封装在一起。你可以使用 MapRecord 来获取消息的 ID 和内容并处理。

MapRecord<String, Object, Object> message = redisTemplate.opsForStream().read(Consumer.from("mygroup", "consumer1"), options, StreamOffset.create("mystream", ReadOffset.lastConsumed()));

在这个例子中,message 是一个 MapRecord 实例,它封装了从 mystream 队列中读取到的消息。我们可以通过 message.getId() 获取消息 ID,通过 message.getValue() 获取消息内容。

在消费者中,我们使用 MapRecord<String, Object, Object> 来封装消息,获取 message.getId() 来获取消息的 ID(RecordId),以及通过 message.getValue() 获取消息的内容。 随后在处理完消息后,调用 acknowledge() 来确认消息已经被消费。

最后启动异步支持:

@SpringBootApplication
@EnableAsync // 启动异步支持
public class MyApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
}

通过这种方式,Spring Data Redis 提供了高效且类型安全的接口来操作 Redis Stream,帮助我们在分布式系统中实现高效的消息队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/964294.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Rust HashMap :当储物袋遇上物品清单

开场白&#xff1a;哈希映射的魔法本质 在Rust的奇幻世界里&#xff0c;HashMap就像魔法师的储物袋&#xff1a; 键值对存储 → 每个物品都有专属咒语&#xff08;键&#xff09;和实体&#xff08;值&#xff09;快速查找 → 念咒瞬间召唤物品动态扩容 → 自动伸展的魔法空间…

LabVIEW的智能电源远程监控系统开发

在工业自动化与测试领域&#xff0c;电源设备的精准控制与远程管理是保障系统稳定运行的核心需求。传统电源管理依赖本地手动操作&#xff0c;存在响应滞后、参数调节效率低、无法实时监控等问题。通过集成工业物联网&#xff08;IIoT&#xff09;技术&#xff0c;实现电源设备…

C# Winform制作一个登录系统

using System; using System.Collections; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms;namespace 登录 {p…

尝试把clang-tidy集成到AWTK项目

前言 项目经过一段时间的耕耘终于进入了团队开发阶段&#xff0c;期间出现了很多问题&#xff0c;其中一个就是开会讨论团队的代码风格规范&#xff0c;目前项目代码风格比较混乱&#xff0c;有的模块是驼峰&#xff0c;有的模块是匈牙利&#xff0c;后面经过讨论&#xff0c;…

Docker技术相关学习三

一、Docker镜像仓库管理 1.docker仓库&#xff1a;用于存储和分发docker镜像的集中式存储库&#xff0c;开发者可以将自己创建的镜像推送到仓库中也可以从仓库中拉取所需要的镜像。 2.docker仓库&#xff1a; 公有仓库&#xff08;docker hub&#xff09;&#xff1a;任何人都可…

挑战项目 --- 微服务编程测评系统(在线OJ系统)

一、前言 1.为什么要做项目 面试官要问项目&#xff0c;考察你到底是理论派还是实战派&#xff1f; 1.希望从你的项目中看到你的真实能力和对知识的灵活运用。 2.展示你在面对问题和需求时的思考方式及解决问题的能力。 3.面试官会就你项目提出一些问题&#xff0c;或扩展需求…

Python 与 PostgreSQL 集成:深入 psycopg2 的应用与实践

title: Python 与 PostgreSQL 集成:深入 psycopg2 的应用与实践 date: 2025/2/4 updated: 2025/2/4 author: cmdragon excerpt: PostgreSQL 作为开源关系型数据库的佼佼者,因其强大的功能与性能被广泛应用于各种项目中。而 Python 则因其简洁易用的语法、丰富的库和强大的…

计算机从何而来?计算技术将向何处发展?

计算机的前生&#xff1a;机械计算工具的演进 算盘是计算机的起点&#xff0c;它其实是一台“机械式半自动化运算器”。打算盘的“口诀”其实就是它的编程语言&#xff0c;算盘珠就是它的存储器。 第二阶段是可以做四则运算的加法器、乘法器。1642年&#xff0c;法国数学家帕斯…

【Blazor学习笔记】.NET Blazor学习笔记

我是大标题 我学习Blazor的顺序是基于Blazor University&#xff0c;然后实际内容不完全基于它&#xff0c;因为它的例子还是基于.NET Core 3.1做的&#xff0c;距离现在很遥远了。 截至本文撰写的时间&#xff0c;2025年&#xff0c;最新的.NET是.NET9了都&#xff0c;可能1…

MapReduce分区

目录 1. MapReduce分区1.1 哈希分区1.2 自定义分区 2. 成绩分组2.1 Map2.2 Partition2.3 Reduce 3. 代码和结果3.1 pom.xml中依赖配置3.2 工具类util3.3 GroupScores3.4 结果 参考 本文引用的Apache Hadoop源代码基于Apache许可证 2.0&#xff0c;详情请参阅 Apache许可证2.0。…

重生之我在异世界学编程之C语言:深入指针篇(上)

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言正文&#xff08;1&#xff09;内置数…

deep generative model stanford lecture note3 --- latent variable

1 Introduction 自回归模型随着gpt的出现取得很大的成功&#xff0c;还是有很多工程上的问题并不是很适合使用自回归模型&#xff1a; 1&#xff09;自回归需要的算力太大&#xff0c;满足不了实时性要求&#xff1a;例如在自动驾驶的轨迹预测任务中&#xff0c;如果要用纯自回…

STM32_SD卡的SDIO通信_DMA读写

本篇&#xff0c;将使用CubeMXKeil&#xff0c;创建一个SD卡的DMA读写工程。 目录 一、简述 二、CubeMX 配置 SDIO DMA 三、Keil 编辑代码 四、实验效果 实现效果&#xff0c;如下图&#xff1a; 一、简述 上篇已简单介绍了SD、SDIO&#xff0c;本篇不再啰嗦&#xff0c;…

互联网行业常用12个数据分析指标和八大模型

本文目录 前言 一、互联网线上业务数据分析的12个指标 1. 用户数据&#xff08;4个&#xff09; (1) 存量&#xff08;DAU/MAU&#xff09; (2) 新增用户 (3) 健康程度&#xff08;留存率&#xff09; (4) 渠道来源 2. 用户行为数据&#xff08;4个&#xff09; (1) 次数/频率…

【学术投稿-2025年计算机视觉研究进展与应用国际学术会议 (ACVRA 2025)】从计算机基础到HTML开发:Web开发的第一步

会议官网&#xff1a;www.acvra.org 简介 2025年计算机视觉研究进展与应用&#xff08;ACVRA 2025&#xff09;将于2025年2月28-3月2日在中国广州召开&#xff0c;将汇聚世界各地的顶尖学者、研究人员和行业专家&#xff0c;聚焦计算机视觉领域的最新研究动态与应用成就。本次…

【Unity踩坑】Unity项目管理员权限问题(Unity is running as administrator )

问题描述&#xff1a; 使用Unity Hub打开或新建项目时会有下面的提示。 解决方法&#xff1a; 打开“本地安全策略”&#xff1a; 在Windows搜索栏中输入secpol.msc并回车&#xff0c;或者从“运行”对话框&#xff08;Win R&#xff0c;然后输入secpol.msc&#xff09;启…

开发板上Qt运行的环境变量的三条设置语句的详解

在终端中运行下面三句命令用于配置开发板上Qt运行的环境变量&#xff1a; export QT_QPA_GENERIC_PLUGINStslib:/dev/input/event1 export QT_QPA_PLATFORMlinuxfb:fb/dev/fb0 export QT_QPA_FONTDIR/usr/lib/fonts/设置成功后可以用下面的语句检查设置成功没有 echo $QT_QPA…

【PyQt】pyqt小案例实现简易文本编辑器

pyqt小案例实现简易文本编辑器 分析 实现了一个简单的文本编辑器&#xff0c;使用PyQt5框架构建。以下是代码的主要功能和特点&#xff1a; 主窗口类 (MyWindow): 继承自 QWidget 类。使用 .ui 文件加载用户界面布局。设置窗口标题、状态栏消息等。创建菜单栏及其子菜单项&…

Java 数据库连接池:HikariCP 与 Druid 的对比

Java 数据库连接池&#xff1a;HikariCP 与 Druid 的对比 数据库连接池&#xff1a;HikariCP 1. 卓越的性能表现 HikariCP 在数据库连接池领域以其卓越的性能脱颖而出。 其字节码经过精心优化&#xff0c;减少了不必要的开销&#xff0c;使得连接获取和释放的速度极快。 在…

PHP实现混合加密方式,提高加密的安全性(代码解密)

代码1&#xff1a; <?php // 需要加密的内容 $plaintext 授权服务器拒绝连接;// 1. AES加密部分 $aesKey openssl_random_pseudo_bytes(32); // 生成256位AES密钥 $iv openssl_random_pseudo_bytes(16); // 生成128位IV// AES加密&#xff08;CBC模式&#xff09…