文章目录
- Redis消息队列实现异步秒杀
- 1. jvm阻塞队列问题
- 2. 什么是消息队列
- 3. Redis实现消息队列
- 1. 基于List结构模拟消息队列
- 操作
- 优缺点
- 2. 基于PubSub发布订阅的消息队列
- 操作
- 优缺点
- spring 结合redis的pubsub使用示例
- 1. 引入依赖
- 2. 配置文件
- 3. RedisConfig
- 4. CustomizeMessageListener
- 5. RedisMessageReceiver
- 6. 监听原理简析
- 7. 监听redis的key
- 修改redis.conf
- KeyspaceEventMessageListener
- KeyExpirationEventMessageListener
- 修改RedisConfig
Redis消息队列实现异步秒杀
1. jvm阻塞队列问题
java使用阻塞队列实现异步秒杀存在问题:
- jvm内存限制问题:jvm内存不是无限的,在高并发的情况下,当有大量的订单需要创建时,就有可能超出jvm阻塞队列的上限。
- 数据安全问题:jvm的内存没有持久化机制,当服务重启或宕机时,阻塞队列中的订单都会丢失。或者,当我们从阻塞队列中拿到订单任务,但是尚未处理时,如果此时发生了异常,这个订单任务就没有机会处理了,也就丢失了。
2. 什么是消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
(正常下单,我们需要将订单消息写入数据库。但由于秒杀并发访问量大,数据库本身并发处理能力不强,因此,在处理秒杀业务时,可以将部分业务在生产者这边做校验,然后将消息写入消息队列,而消费者处理该消息队列中的消息,从而实现双方解耦,更快的处理秒杀业务)
3. Redis实现消息队列
我们可以使用一些现成的mq,比如kafka,rabbitmq等等,但是呢,如果没有安装mq,我们也可以直接使用redis提供的mq方案,降低我们的部署和学习成本。Redis提供了三种不同的方式来实现消息队列:
- list结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
1. 基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
操作
命令介绍如下
优缺点
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失(如果消费者获取消息后,然后立马就宕机了,这个消息就得不到处理,等同于丢失了)
- 只支持单消费者(1个消息只能被1个消费者取走,其它消费者会收不到此消息)
2. 基于PubSub发布订阅的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
- SUBSCRIBE channel [channel] :订阅一个或多个频道
- PUBLISH channel msg :向一个频道发送消息
- PSUBSCRIBE pattern [pattern] :订阅与pattern格式匹配的所有频道
- ?匹配1个字符:
h?llo
subscribes tohello
,hallo
andhxllo
- *匹配0个或多个字符:
h*llo
subscribes tohllo
andheeeello
- []指定字符:
h[ae]llo
subscribes tohello
andhallo,
but nothillo
- ?匹配1个字符:
操作
优缺点
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化(如果发送消息时,这个消息的频道没有被任何人订阅,那这个消息就丢失了,也消息就是不会被保存)
- 无法避免消息丢失(发完了,没人收,直接就丢了)
- 消息堆积有上限,超出时数据丢失(当我们发送消息时,如果有消费者在监听,消费者会有1个缓存区去缓存这个消息数据,如果消费者处理的慢,那么客户端的缓存区中的消息会不断堆积,而这个缓存区是有大小限制的,如果超出了就会丢失)
spring 结合redis的pubsub使用示例
1. 引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.zzhua</groupId>
<artifactId>demo-redis-pubsub</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 如果使用lettuce-core作为连接redis的实现,
不引入此依赖会报错: Caused by: java.lang.ClassNotFoundException:
org.apache.commons.pool2.impl.GenericObjectPoolConfig -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2. 配置文件
spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
password:
lettuce:
pool:
min-idle: 2
max-active: 8
max-idle: 8
3. RedisConfig
spring-data-redis提供了2种处理redis消息的方法:
-
自己实现MessageListener接口
public interface MessageListener { // 处理消息的方法 // 第1个参数封装了: 消息发布到哪1个具体频道 和 消息的内容 // 第2个参数封装了: // 1. 如果当前是通过普通模式去订阅的频道, 那么收到消息时该pattern就是消息发送的具体频道 // 2. 如果当前是通过pattern通配符匹配去订阅的频道, 那么收到消息时, 该pattern就是订阅的频道 void onMessage(Message message, @Nullable byte[] pattern); }
-
指定MessageListenerAdapter适配器,该适配器指定特定对象的特定方法来处理消息(对特定的方法有参数方面的要求)
@Slf4j
@Configuration
public class RedisConfig {
@Autowired
private RedisMessageReceiver redisMessageReceiver;
@Autowired
private CustomizeMessageListener customizeMessageListener;
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 监听order.q通道(不带通配符匹配channel)
container.addMessageListener(customizeMessageListener, new ChannelTopic("order.q"));
// 监听order.*通道(带通配符匹配channel)
container.addMessageListener(listenerAdapter(), new PatternTopic("order.*"));
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter() {
// 交给receiver的receiveMessage方法, 对于这个方法的参数有如下要求:
// (2个参数: 第一个参数是Object-即消息内容(默认由RedisSerializer#deserialize处理,见MessageListenerAdapter#onMessage),
// 第二个参数是String-即订阅的通道, 详细看上面MessageListener接口中第二个参数的解释)
// (1个参数: 参数是Object-即消息内容)
return new MessageListenerAdapter(redisMessageReceiver, "receiveMessage");
}
}
4. CustomizeMessageListener
@Slf4j
@Component
public class CustomizeMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] bodyBytes = message.getBody();
byte[] channelBytes = message.getChannel();
log.info("order.q - 消息订阅频道: {}", new String(channelBytes));
log.info("order.q - 消息内容: {}", new String(bodyBytes));
log.info("order.q - 监听频道: {}", new String(channelBytes));
}
}
5. RedisMessageReceiver
@Slf4j
@Component
public class RedisMessageReceiver {
public void receiveMessage(String msg, String topic) {
log.info("order.* - 消息的订阅频道: {}", topic);
log.info("order.* - 消息的内容: {}", msg);
}
}
6. 监听原理简析
spring-data-redis的lettuce-core是基于netty的,消息监听处理过程如下:
PubSubCommandHandler(netty中的ChannelHandler处理器)->PubSubEndpoint(根据消息类型调用LettuceMessageListener 的不同方法)->LettuceMessageListener -> RedisMessageListenerContainer$DispatchMessageListener(如果是pattern,则从patternMapping中获取所有的listener;如果不是pattern,则从channelMapping中获取所有的listener。至于怎么判断是不是pattern?)->使用异步线程池对上一步获取的所有listener执行onMessage方法
至于怎么判断是不是pattern?这个是根据订阅关系来的,如果订阅的是pattern,那么如果这个向这个pattern中发送了消息,那么就会收到1次消息,并且是pattern。如果订阅的是普通channel,那么如果向这个普通channel发送了消息,那么又会收到1次消息不是pattern。如果向1个channel中发送消息,这个channel既符合订阅的pattern,也符合订阅的普通channel,那么会收到2次消息,并且这2次消息1次是pattern,1次不是pattern的
7. 监听redis的key
既然已经说到了监听redis发布消息了,那么也补充一下监听redis的key过期。因为监听redis的key过期也是通过redis的发布订阅实现的。
修改redis.conf
############################# EVENT NOTIFICATION ##############################
# Redis能够将在keyspace中发生的事件通知给 发布/订阅 客户端
# Redis can notify Pub/Sub clients about events happening in the key space.
# This feature is documented at http://redis.io/topics/notifications
# 例如:如果开启了keyspace事件通知(注意了,必须是开启了keyspace事件通知才可以,开启的方式就是添加参数K),
# 一个客户端在数据库0对一个叫'foo'的key执行了删除操作,
# 那么redis将会通过 发布订阅 机制发布2条消息
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo
# For instance if keyspace events notification is enabled, and a client
# performs a DEL operation on key "foo" stored in the Database 0, two
# messages will be published via Pub/Sub:
#
# PUBLISH __keyspace@0__:foo del
# PUBLISH __keyevent@0__:del foo
# 也可以指定一组 类名 来选择 Redis 会通知的一类事件。
# 每类事件 都通过一个字符定义
# It is possible to select the events that Redis will notify among a set
# of classes. Every class is identified by a single character:
# keySpace事件 以 __keyspace@<数据库序号>__ 为前缀 发布事件
# K Keyspace events, published with __keyspace@<db>__ prefix.
# Keyevent事件 以 __keyevent@<数据库序号>__ 为前缀 发布事件
# E Keyevent events, published with __keyevent@<db>__ prefix.
# 执行常规命令,比如del、expire、rename
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
# 执行 String 命令
# $ String commands
# 执行 List 命令
# l List commands
# 执行 Set 命令
# s Set commands
# 执行 Hash 命令
# h Hash commands 执行 Hash 命令
# 执行 ZSet 命令
# z Sorted set commands
# key过期事件(每个key失效都会触发这类事件)
# x Expired events (events generated every time a key expires)
# key驱逐事件(当key在内存满了被清除时生成)
# e Evicted events (events generated when a key is evicted for maxmemory)
# A是g$lshzxe的别名,因此AKE就意味着所有的事件
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
#
# 配置中的notify-keyspace-events这个参数由0个或多个字符组成,
# 如果配置为空字符串表示禁用通知
# The "notify-keyspace-events" takes as argument a string that is composed
# of zero or multiple characters. The empty string means that notifications
# are disabled.
#
# 比如,要开启list命令和generic常规命令的事件通知,
# 应该配置成 notify-keyspace-events Elg
# Example: to enable list and generic events, from the point of view of the
# event name, use:
#
# notify-keyspace-events Elg
#
# 比如,订阅了__keyevent@0__:expired频道的客户端要收到key失效的时间,
# 应该配置成 notify-keyspace-events Ex
# Example 2: to get the stream of the expired keys subscribing to channel name __keyevent@0__:expired use:
#
# notify-keyspace-events Ex
#
# 默认情况下,所有的通知都被禁用了,并且这个特性有性能上的开销。
# 注意,K和E必须至少指定其中一个,否则,将收不到任何事件。
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.
notify-keyspace-events "Ex"
############################### ADVANCED CONFIG ###############################
KeyspaceEventMessageListener
- 通过实现InitializingBean接口,在afterPropertiesSet方法中,调用初始化init方法,从redis中获取
notify-keyspace-events
配置项对应的值,如果未设置任何值,则改为EA
,结合上面的redis.conf节选可知,表示的是开启所有的事件通知 - 使用redisMessageListenerContainer,通过pattern通配符匹配的方式订阅
__keyevent@*
频道 - 它是个抽象类,实现了MessageListener接口,处理消息的方法是个抽象方法
- 它有1个子类KeyExpirationEventMessageListener,订阅的pattern的频道是:
__keyevent@*__:expired
,通过重写doRegister修改了订阅的频道。并且重写了处理消息的方法,通过将消息内容包装成RedisKeyExpiredEvent事件对象,然后通过事件发布器将事件发布出去。
public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {
private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");
private final RedisMessageListenerContainer listenerContainer;
private String keyspaceNotificationsConfigParameter = "EA";
/**
* Creates new {@link KeyspaceEventMessageListener}.
*
* @param listenerContainer must not be {@literal null}.
*/
public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {
Assert.notNull(listenerContainer, "RedisMessageListenerContainer to run in must not be null!");
this.listenerContainer = listenerContainer;
}
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.MessageListener#onMessage(org.springframework.data.redis.connection.Message, byte[])
*/
@Override
public void onMessage(Message message, @Nullable byte[] pattern) {
if (message == null || ObjectUtils.isEmpty(message.getChannel()) || ObjectUtils.isEmpty(message.getBody())) {
return;
}
doHandleMessage(message);
}
/**
* Handle the actual message
*
* @param message never {@literal null}.
*/
protected abstract void doHandleMessage(Message message);
/**
* Initialize the message listener by writing requried redis config for {@literal notify-keyspace-events} and
* registering the listener within the container.
*/
public void init() {
if (StringUtils.hasText(keyspaceNotificationsConfigParameter)) {
RedisConnection connection = listenerContainer.getConnectionFactory().getConnection();
try {
Properties config = connection.getConfig("notify-keyspace-events");
if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter);
}
} finally {
connection.close();
}
}
doRegister(listenerContainer);
}
/**
* Register instance within the container.
*
* @param container never {@literal null}.
*/
protected void doRegister(RedisMessageListenerContainer container) {
listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
}
/*
* (non-Javadoc)
* @see org.springframework.beans.factory.DisposableBean#destroy()
*/
@Override
public void destroy() throws Exception {
listenerContainer.removeMessageListener(this);
}
/**
* Set the configuration string to use for {@literal notify-keyspace-events}.
*
* @param keyspaceNotificationsConfigParameter can be {@literal null}.
* @since 1.8
*/
public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {
this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
}
/*
* (non-Javadoc)
* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
*/
@Override
public void afterPropertiesSet() throws Exception {
init();
}
}
KeyExpirationEventMessageListener
public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements
ApplicationEventPublisherAware {
private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
private @Nullable ApplicationEventPublisher publisher;
/**
* Creates new {@link MessageListener} for {@code __keyevent@*__:expired} messages.
*
* @param listenerContainer must not be {@literal null}.
*/
public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/*
* (non-Javadoc)
* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doRegister(org.springframework.data.redis.listener.RedisMessageListenerContainer)
*/
@Override
protected void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
}
/*
* (non-Javadoc)
* @see org.springframework.data.redis.listener.KeyspaceEventMessageListener#doHandleMessage(org.springframework.data.redis.connection.Message)
*/
@Override
protected void doHandleMessage(Message message) {
publishEvent(new RedisKeyExpiredEvent(message.getBody()));
}
/**
* Publish the event in case an {@link ApplicationEventPublisher} is set.
*
* @param event can be {@literal null}.
*/
protected void publishEvent(RedisKeyExpiredEvent event) {
if (publisher != null) {
this.publisher.publishEvent(event);
}
}
/*
* (non-Javadoc)
* @see org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher)
*/
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
}
修改RedisConfig
@Slf4j
@Configuration
public class RedisConfig {
@Autowired
private RedisMessageReceiver redisMessageReceiver;
@Autowired
private CustomizeMessageListener customizeMessageListener;
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 注意以下测试在redis.confi配置文件中设置了: notify-keyspace-events 为 AKE,
// 也可以参照KeyspaceEventMessageListener在代码中设置这个配置项
/*
redis提供的事件通知发布消息示例如下:
K => PUBLISH __keyspace@0__:foo del
E => PUBLISH __keyevent@0__:del foo
参照上述示例去写这个topic即可
*/
// 监听key删除事件
container.addMessageListener(new MessageListener() {
/*
执行命令: del order:1234
输出如下:
监听key删除事件 - 消息的发布频道: __keyevent@0__:del
监听key删除事件 - 消息内容: order:1234
监听key删除事件 - 消息的订阅频道: __keyevent@*__:del
*/
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] bodyBytes = message.getBody();
byte[] channelBytes = message.getChannel();
log.info("监听key删除事件 - 消息的发布频道: {}", new String(channelBytes));
log.info("监听key删除事件 - 消息内容: {}", new String(bodyBytes));
log.info("监听key删除事件 - 消息的订阅频道: {}", new String(pattern));
}
}, new PatternTopic("__keyevent@*__:del"));
// 监听指定前缀的key
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] bodyBytes = message.getBody();
byte[] channelBytes = message.getChannel();
/*
执行命令: set order:1234 a
输出如下:
监听指定前缀的key - 消息的发布频道: __keyspace@0__:order:1234
监听指定前缀的key - 消息内容: set
监听指定前缀的key - 消息的订阅频道: __keyspace@0__:order:*
*/
log.info("监听指定前缀的key - 消息的发布频道: {}", new String(channelBytes));
log.info("监听指定前缀的key - 消息内容: {}", new String(bodyBytes));
log.info("监听指定前缀的key - 消息的订阅频道: {}", new String(pattern));
}
}, new PatternTopic("__keyspace@0__:order:*"));
return container;
}
/* 借助了
1. 这个KeyspaceEventMessageListener的bean中的对redis的配置修改
2. 监听patter的topic
*/
@Bean
public KeyspaceEventMessageListener keyspaceEventMessageListener(RedisMessageListenerContainer container) {
return new KeyspaceEventMessageListener(container){
/* __keyevent@* */
@Override
protected void doHandleMessage(Message message) {
log.info("监听所有key命令事件, 消息内容:{}, {}",
// set name zzhua; expire name 5;
// 消息内容就是key的名称, 比如: name
new String(message.getBody()),
// 消息所发布的频道, 比如: __keyevent@0__:set, __keyevent@0__:expire等
new String(message.getChannel())
);
}
};
}
@Bean
public KeyExpirationEventMessageListener keyExpirationEventMessageListener(RedisMessageListenerContainer container) {
return new KeyExpirationEventMessageListener(container){
/* __keyevent@*__:expired */
@Override
protected void doHandleMessage(Message message) {
log.info("监听所有key失效, 消息内容:{}, {}",
// 消息内容就是key的名称, 比如: name
new String(message.getBody()),
// 消息所发布的频道, 比如: __keyevent@0__:expired
new String(message.getChannel())
);
}
};
}
}