参考官网中文文档,连接上3主3从的Redis Cluster。
// Client entry point: holds the configuration, the connection manager and the command executor.
// (Excerpt: closing braces and some statements are omitted in this listing.)
public class Redisson implements RedissonClient {
protected final Config config;//Redis configuration
protected final ConnectionManager connectionManager;//Redis connection manager
protected final CommandAsyncExecutor commandExecutor;//Redis command executor
// Static factory: builds a client from the given configuration.
public static RedissonClient create(Config config) {
return new Redisson(config);
// Wires up the connection manager, the command executor and the helper services.
protected Redisson(Config config) {
this.config = config;
// The connection manager is created from a copy of the configuration, not from the caller's instance.
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
// The object builder stays null unless reference support is enabled in the configuration.
RedissonObjectBuilder objectBuilder = null;
if (config.isReferenceEnabled()) {
objectBuilder = new RedissonObjectBuilder(this);
// The command executor is a CommandSyncService built on the connection manager.
commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
// Both helper services are constructed with the same command executor.
evictionScheduler = new EvictionScheduler(commandExecutor);
writeBehindService = new WriteBehindService(commandExecutor);
// Selects the ConnectionManager implementation matching the supplied configuration.
public class ConfigSupport {
public static ConnectionManager createConnectionManager(Config configCopy) {
// A fresh random UUID is generated and handed to the connection manager as its id.
UUID id = UUID.randomUUID();
// Cluster mode: a cluster servers config is present, so a ClusterConnectionManager is returned.
if (configCopy.getClusterServersConfig() != null) {
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
// Connection manager for Redis Cluster mode; extends the master/slave connection manager.
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
// Connects to the configured nodes, reads the cluster node list and registers an entry per master.
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
super(config, id);
this.natMapper = cfg.getNatMapper();
this.config = create(cfg);
Throwable lastException = null;
List<String> failedMasters = new ArrayList<String>();
// Iterate over the node addresses listed in the cluster configuration.
for (String address : cfg.getNodeAddresses()) {
RedisURI addr = new RedisURI(address);
// join() blocks the constructing thread until the connection attempt completes.
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
RedisConnection connection = connectionFuture.toCompletableFuture().join();
// Synchronously query the connected node, then parse the returned node info into partitions.
List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);
CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
Collection<ClusterPartition> partitions = partitionsFuture.join();
List<CompletableFuture<Void>> masterFutures = new ArrayList<>();
for (ClusterPartition partition : partitions) {
// NOTE(review): the handling of a failed master is elided in this excerpt — confirm against the full source.
if (partition.isMasterFail()) {
// A partition without a master address aborts initialization with an IllegalStateException.
if (partition.getMasterAddress() == null) {
throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address.");
// Register the partition's master; the result is delivered through a future.
CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);
// Combined future that completes once every future in masterFutures has completed.
CompletableFuture<Void> masterFuture = CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0]));
// Base connection manager: keeps a cache of node connections and creates Redis clients.
public class MasterSlaveConnectionManager implements ConnectionManager {
protected final String id;//initialized from a UUID
// Cache of established connections, keyed by node URI.
private final Map<RedisURI, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
protected MasterSlaveConnectionManager(Config cfg, UUID id) {
this.id = id.toString();//the value passed in is a UUID
this.cfg = cfg;
// Returns a connection to the given node, reusing an active cached connection when one exists.
protected final CompletionStage<RedisConnection> connectToNode(NodeType type, BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
RedisConnection conn = nodeConnections.get(addr);
if (conn != null) {
// NOTE(review): the body of the inactive-connection branch is elided in this excerpt.
if (!conn.isActive()) {
} else {
// Cached connection is still active: return it as an already-completed future.
return CompletableFuture.completedFuture(conn);
// Otherwise create a client for the node and connect asynchronously.
RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
CompletionStage<RedisConnection> future = client.connectAsync();
return future.thenCompose(connection -> {
if (connection.isActive()) {
// For a non-IP address, the connection is also cached under a URI built from the resolved host address and port.
if (!addr.isIP()) {
RedisURI address = new RedisURI(addr.getScheme() + "://" + connection.getRedisClient().getAddr().getAddress().getHostAddress() + ":" + connection.getRedisClient().getAddr().getPort());
nodeConnections.put(address, connection);
// Always cache under the originally requested URI.
nodeConnections.put(addr, connection);
return CompletableFuture.completedFuture(connection);
} else {
// An inactive connection is reported to the caller as an exceptionally completed future.
CompletableFuture<RedisConnection> f = new CompletableFuture<>();
f.completeExceptionally(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
return f;
// Builds the client configuration for the node and creates a RedisClient from it.
public RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {
RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
return RedisClient.create(redisConfig);
// Low-level client: owns two Bootstrap instances and opens connections through them.
public final class RedisClient {
private final Bootstrap bootstrap;
private final Bootstrap pubSubBootstrap;
// Static factory delegating to the private constructor.
public static RedisClient create(RedisClientConfig config) {
return new RedisClient(config);
private RedisClient(RedisClientConfig config) {
// NOTE(review): `copy` is not declared in this excerpt — presumably a copy of `config`; confirm against the full source.
// One bootstrap is created with Type.PLAIN and one with Type.PUBSUB.
bootstrap = createBootstrap(copy, Type.PLAIN);
pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
this.commandTimeout = copy.getCommandTimeout();
// Creates a Bootstrap with the channel initializer and socket options taken from the client config.
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
Bootstrap bootstrap = new Bootstrap()
bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
// Connect timeout, keep-alive and TCP no-delay all come from the config.
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
return bootstrap;
// Resolves the address, connects through the bootstrap and returns a future for the RedisConnection.
public RFuture<RedisConnection> connectAsync() {
CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
// The connect step runs only after address resolution has produced a result.
CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
// `r` is the promise handed back for this connection attempt.
CompletableFuture<RedisConnection> r = new CompletableFuture<>();
ChannelFuture channelFuture = bootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(final ChannelFuture future) throws Exception {
// If the bootstrap's event loop group is shutting down, an IllegalStateException is prepared.
if (bootstrap.config().group().isShuttingDown()) {
IllegalStateException cause = new IllegalStateException("RedisClient is shutdown");
if (future.isSuccess()) {
// On a successful connect, the RedisConnection is looked up from the channel.
RedisConnection c = RedisConnection.getFrom(future.channel());
// Completion of `r` is scheduled on the bootstrap's event loop group once the connection promise finishes.
c.getConnectionPromise().whenComplete((res, e) -> {
bootstrap.config().group().execute(new Runnable() {
public void run() {
if (e == null) {
// NOTE(review): the branches after r.complete(c) are elided in this excerpt.
if (!r.complete(c)) {
} else {
} else {
// NOTE(review): failure handling is elided in this excerpt; it also runs on the event loop group.
bootstrap.config().group().execute(new Runnable() {
public void run() {
return r;
// The resulting CompletableFuture is wrapped to expose it as an RFuture.
return new CompletableFutureWrapper<>(f);
// Synchronous command executor: each method calls its asynchronous counterpart and blocks on the result via get().
public class CommandSyncService extends CommandAsyncService implements CommandExecutor {
public CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT);
// Read using the connection manager's codec.
public <T, R> R read(String key, RedisCommand<T> command, Object... params) {
return read(key, connectionManager.getCodec(), command, params);
// Read using an explicit codec; blocks until the asynchronous read completes.
public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
RFuture<R> res = readAsync(key, codec, command, params);
return get(res);
// Script evaluation (read variant) using the connection manager's codec.
public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
// Script evaluation (read variant) using an explicit codec; blocks on the result.
public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RFuture<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params);
return get(res);
// Script evaluation (write variant) using the connection manager's codec.
public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
// Script evaluation (write variant) using an explicit codec; blocks on the result.
public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
RFuture<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return get(res);
// Asynchronous command executor: computes the slot for a key and hands the command to a RedisExecutor.
public class CommandAsyncService implements CommandAsyncExecutor {
final ConnectionManager connectionManager;
final RedissonObjectBuilder objectBuilder;
final RedissonObjectBuilder.ReferenceType referenceType;
public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
this.connectionManager = connectionManager;
this.objectBuilder = objectBuilder;
this.referenceType = referenceType;
// Non-blocking peek at a future: returns its value if already completed, otherwise null.
// Any exception raised by the future is swallowed and also reported as null.
public <V> V getNow(CompletableFuture<V> future) {
try {
return future.getNow(null);
} catch (Exception e) {
return null;
// Blocking read: starts the asynchronous read and waits for its result.
public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
RFuture<R> res = readAsync(key, codec, command, params);
return get(res);
// Asynchronous read: resolves the key to a node source and dispatches in read-only mode.
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
NodeSource source = getNodeSource(key);
return async(true, source, codec, command, params, false, false);
// Maps a key to a NodeSource carrying the slot computed by the connection manager.
private NodeSource getNodeSource(String key) {
int slot = connectionManager.calcSlot(key);
return new NodeSource(slot);
// Creates the result promise and a RedisExecutor for the command, and returns the promise wrapped as an RFuture.
// NOTE(review): the call that starts the executor is not shown in this excerpt.
public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
CompletableFuture<R> mainPromise = createPromise();
RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);
return new CompletableFutureWrapper<>(mainPromise);
// Blocks the calling thread until the future completes and returns its value.
public <V> V get(RFuture<V> future) {
// Blocking is refused on threads whose name starts with "redisson-netty".
if (Thread.currentThread().getName().startsWith("redisson-netty")) {
throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
try {
return future.toCompletableFuture().get();
} catch (InterruptedException e) {
// Interruption is surfaced to the caller as a RedisException wrapping the original exception.
throw new RedisException(e);
} catch (ExecutionException e) {
// Execution failures are translated by convertException before being rethrown.
throw convertException(e);
// Usage example: obtain the lock object named "myLock" from the client.
RLock lock = redisson.getLock("myLock");
// Creates a RedissonLock for the given name, sharing this client's command executor.
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name);
// Lock implementation (fields and constructor excerpt).
public class RedissonLock extends RedissonBaseLock {
// Lease time used when the caller does not pass one; initialized from the configured lock watchdog timeout.
protected long internalLockLeaseTime;
protected final LockPubSub pubSub;
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
// Read the watchdog timeout from the configuration held by the connection manager.
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
// Pub/sub helper obtained from the connection manager's subscribe service.
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
// Client configuration (excerpt showing only the lock watchdog timeout).
public class Config {
// Lock watchdog timeout in milliseconds; defaults to 30 * 1000.
private long lockWatchdogTimeout = 30 * 1000;
//This parameter is only used if the lock has been acquired without a leaseTimeout parameter.
//The lock expires after "lockWatchdogTimeout" if the watchdog didn't extend it to the next "lockWatchdogTimeout" time interval.
//This prevents locks from staying locked forever due to a Redisson client crash or any other reason the lock can't be released properly.
//Default is 30000 milliseconds
// Fluent setter: stores the new timeout and returns this Config.
public Config setLockWatchdogTimeout(long lockWatchdogTimeout) {
this.lockWatchdogTimeout = lockWatchdogTimeout;
return this;
public long getLockWatchdogTimeout() {
return lockWatchdogTimeout;
// Lock implementation (excerpt of the lock/unlock entry points).
public class RedissonLock extends RedissonBaseLock {
// Acquires the lock without an explicit lease time (-1) and in non-interruptible mode.
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
// An InterruptedException is converted into an IllegalStateException carrying no message or cause.
throw new IllegalStateException();
// Internal lock routine; the owner is identified by the calling thread's id.
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// First acquisition attempt, made with waitTime -1.
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// Releases the lock. NOTE(review): the body of the try block is elided in this excerpt.
public void unlock() {
try {
} catch (RedisException e) {
// An IllegalMonitorStateException wrapped in a RedisException is unwrapped and rethrown as-is.
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
// Acquires the lock without an explicit lease time (-1) and in non-interruptible mode.
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
// An InterruptedException is converted into an IllegalStateException carrying no message or cause.
throw new IllegalStateException();
// Internal lock routine: try once, then subscribe and retry in a loop.
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// A null ttl is treated as "lock acquired" (see the loop below). NOTE(review): the body of this branch is elided in the excerpt.
if (ttl == null) {
// Lock is held elsewhere: subscribe for this thread id before entering the wait loop.
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
// NOTE(review): how the subscription future is awaited in each mode is elided in this excerpt.
if (interruptibly) {
} else {
try {
while (true) {
// Retry the acquisition on every iteration.
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
// waiting for message
if (ttl >= 0) {
try {
// Wait on the subscription entry's latch for at most ttl milliseconds.
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// In interruptible mode the interruption propagates; otherwise the wait on the latch is issued again.
if (interruptibly) {
throw e;
commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// NOTE(review): handling of a negative ttl is elided in this excerpt.
if (interruptibly) {
} else {
} finally {
// The subscription is always released, whichever way the loop exits.
unsubscribe(commandExecutor.getNow(future), threadId);
// Blocking wrapper around tryAcquireAsync; a null result means the lock was acquired.
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
// Runs the lock script with either the caller's lease time or the internal default lease time.
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
// Explicit lease time supplied by the caller.
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// No lease time supplied: fall back to internalLockLeaseTime, expressed in milliseconds.
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// A null remaining ttl means the script acquired the lock.
if (ttlRemaining == null) {
if (leaseTime != -1) {
// Remember the caller's lease time, converted to milliseconds.
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// NOTE(review): this branch is elided in the excerpt — presumably where lease renewal is scheduled; confirm against the full source.
return ttlRemaining;
return new CompletableFutureWrapper<>(f);
// Evaluates the lock script against the lock's raw name, decoding the reply with LongCodec.
// The script returns nil when the lock is acquired or re-entered, otherwise the key's remaining ttl from pttl.
// NOTE(review): the KEYS list and the ARGV[1] argument are elided in this excerpt; ARGV[1] is the value passed to pexpire.
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// Branch 1: the key does not exist -> create field ARGV[2] with count 1, set the expiry, return nil.
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// Branch 2: field ARGV[2] already exists (same owner) -> increment the count, refresh the expiry, return nil.
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// Branch 3: the key exists without field ARGV[2] -> return the key's remaining time to live.
"return redis.call('pttl', KEYS[1]);",
getLockName(threadId)//ARGV[2]: the UUID-based id + ":" + thread id
// Common base for lock implementations: holds the client id and derives per-lock and per-thread names from it.
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
final String id;
final String entryName;
final CommandAsyncExecutor commandExecutor;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
// The id is taken from the connection manager.
this.id = commandExecutor.getConnectionManager().getId();
// Default lease time is the configured lock watchdog timeout.
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
// Entry name has the form "<id>:<lock name>".
this.entryName = id + ":" + name;
// Owner identifier for a thread, in the form "<id>:<thread id>".
protected String getLockName(long threadId) {
return id + ":" + threadId;
// Intermediate base class; the constructor shown only forwards its arguments to RedissonObject.
abstract class RedissonExpirable extends RedissonObject implements RExpirable {
// Note: the first parameter is named connectionManager but its type is CommandAsyncExecutor.
RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
// Root base class of Redisson objects: stores the codec, the command executor and the object name.
public abstract class RedissonObject implements RObject {
protected final CommandAsyncExecutor commandExecutor;
protected String name;
protected final Codec codec;
// NOTE(review): the two-argument constructor called by RedissonExpirable is not shown in this excerpt.
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
this.codec = codec;
this.commandExecutor = commandExecutor;
// A null name is rejected with a NullPointerException.
if (name == null) {
throw new NullPointerException("name can't be null");
// Blocks on the future by delegating to the command executor's get().
protected final <V> V get(RFuture<V> future) {
return commandExecutor.get(future);
// Excerpt repeated to show the blocking get() used by the lock code.
public class CommandAsyncService implements CommandAsyncExecutor {
// Blocks the calling thread until the future completes and returns its value.
public <V> V get(RFuture<V> future) {
// Blocking is refused on threads whose name starts with "redisson-netty".
if (Thread.currentThread().getName().startsWith("redisson-netty")) {
throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
try {
return future.toCompletableFuture().get();
} catch (InterruptedException e) {
// Interruption is surfaced to the caller as a RedisException wrapping the original exception.
throw new RedisException(e);
} catch (ExecutionException e) {
// Execution failures are translated by convertException before being rethrown.
throw convertException(e);
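首先通过Redis的exists命令判断key为锁名的Hash值是否存在。如果不存在(exists myLock返回0),就通过Redis的hincrby命令创建这个key为锁名的Hash值(效果等同于hset)。该Hash值的key为锁名,value是一个映射。也就是在value值中会有一个field为UUID + 线程ID,value为1的映射。比如:hincrby myLock UUID:ThreadID 1,相当于hset myLock UUID:ThreadID 1,lua脚本中的ARGV[2]就是由UUID + 线程ID组成的唯一值。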
然后通过Redis的pexpire命令设置key为锁名的Hash值的过期时间,默认值为lockWatchdogTimeout,也就是30秒(30000毫秒)。比如:pexpire myLock 30000。所以默认情况下,如果没有被续期,myLock这个锁在30秒后就会自动过期。
首先通过Redis的hexists命令判断在key为锁名的Hash值里,field为UUID + 线程ID的映射是否已经存在。
如果在key为锁名的Hash值里,field为UUID + 线程ID的映射存在,那么就通过Redis的hincrby命令,对field为UUID + 线程ID的value值进行递增1。比如:hincrby myLock UUID:ThreadID 1。也就是在key为myLock的Hash值里,把field为UUID:ThreadID的value值从1累加到2,发生这种情况的时候往往就是当前线程对锁进行了重入。接着执行:pexpire myLock 30000,再次将myLock的有效期设置为30秒。
如果在key为锁名的Hash值里,field为UUID + 线程ID的映射不存在,发生这种情况的时候往往就是锁正被其他线程持有:因为不同线程的ARGV[2]是不一样的(ARGV[2] = UUID + 线程ID),所以当前线程获取不到这把锁而产生互斥。那么就通过Redis的pttl命令,返回key为锁名的Hash值的剩余存活时间。
// Runs a write script through a batch service and fails the result if the number of synced slaves differs from the number available.
protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
// Look up the entry for this object's raw name and count its available slaves.
MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
int availableSlaves = entry.getAvailableSlaves();
CommandBatchService executorService = createCommandBatchService(availableSlaves);
RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
// When the executor already is a batch service, the result is returned without calling executeAsync() here.
if (commandExecutor instanceof CommandBatchService) {
return result;
// Otherwise execute the batch created above.
RFuture<BatchResult<?>> future = executorService.executeAsync();
CompletionStage<T> f = future.handle((res, ex) -> {
// Batch succeeded but the synced-slave count does not match: fail with an IllegalStateException.
if (ex == null && res.getSyncedSlaves() != availableSlaves) {
throw new CompletionException(new IllegalStateException("Only " + res.getSyncedSlaves() + " of " + availableSlaves + " slaves were synced"));
// Hand back the script's own result.
return result.getNow();
return new CompletableFutureWrapper<>(f);
// Returns the current executor if it is already a batch service; otherwise creates a new one with slave syncing enabled.
private CommandBatchService createCommandBatchService(int availableSlaves) {
if (commandExecutor instanceof CommandBatchService) {
return (CommandBatchService) commandExecutor;
// Batch options request syncing with `availableSlaves` slaves, with a timeout argument of 1 second.
BatchOptions options = BatchOptions.defaults().syncSlaves(availableSlaves, 1, TimeUnit.SECONDS);
return new CommandBatchService(commandExecutor, options);
// Command executor for batches, configured with BatchOptions.
public class CommandBatchService extends CommandAsyncService {
// Builds a batch service from an existing executor, reusing its connection manager and object builder.
public CommandBatchService(CommandAsyncExecutor executor, BatchOptions options) {
this(executor.getConnectionManager(), options, executor.getObjectBuilder(), RedissonObjectBuilder.ReferenceType.DEFAULT);
private CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
super(connectionManager, objectBuilder, referenceType);
this.options = options;
// Write-mode script evaluation: resolves the key to a node source and calls evalAsync with readOnlyMode=false.
public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, false, params);
//Resolve the Redis Cluster node for the key: compute its slot and wrap it in a NodeSource
private NodeSource getNodeSource(String key) {
int slot = connectionManager.calcSlot(key);
return new NodeSource(slot);
// Evaluates a script. NOTE(review): only the cached-script path is shown; the rest of the method is elided in this excerpt.
private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, boolean noRetry, Object... params) {
// With the eval cache active and an EVAL command, the command is rebuilt as EVALSHA using the script's SHA.
if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {
CompletableFuture<R> mainPromise = new CompletableFuture<>();
Object[] pps = copy(params);
CompletableFuture<R> promise = new CompletableFuture<>();
String sha1 = calcSHA(script);
RedisCommand cmd = new RedisCommand(evalCommandType, "EVALSHA");
// Argument list is presized to 2 + number of keys + number of params.
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
RedisExecutor<T, R> executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd, args.toArray(), promise, false, connectionManager, objectBuilder, referenceType, noRetry);
// Excerpt: slot calculation for cluster mode.
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public static final int MAX_SLOT = 16384;//Redis Cluster has 16384 slots by default
// Maps a key to a slot: CRC16 of the key (or of its hash tag) modulo MAX_SLOT. A null key maps to slot 0.
public int calcSlot(String key) {
if (key == null) {
return 0;
// Hash tag: when the key contains a non-empty "{...}" section, only the text between the braces is hashed.
int start = key.indexOf('{');
if (start != -1) {
// NOTE(review): indexOf('}') searches from the beginning of the key, not from `start`;
// a '}' that precedes the first '{' makes the check below fail and the whole key is hashed — confirm this is intended.
int end = key.indexOf('}');
if (end != -1 && start + 1 < end) {
key = key.substring(start + 1, end);
// NOTE(review): getBytes() without a charset argument uses the platform default charset.
int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
return result;
// Executes a single command (excerpt showing only how the connection is obtained).
public class RedisExecutor<V, R> {
NodeSource source;
// Starts execution by requesting a connection. NOTE(review): the remainder of the method is elided in this excerpt.
public void execute() {
CompletableFuture<RedisConnection> connectionFuture = getConnection().toCompletableFuture();
// Asks the connection manager for a write connection for this executor's node source and command.
protected CompletableFuture<RedisConnection> getConnection() {
connectionFuture = connectionManager.connectionWriteOp(source, command);
return connectionFuture;
// Excerpt: resolving a NodeSource to the MasterSlaveEntry that serves it.
public class MasterSlaveConnectionManager implements ConnectionManager {
// Looks up the entry for the node source. NOTE(review): acquiring the connection from the entry is elided in this excerpt.
public CompletableFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
MasterSlaveEntry entry = getEntry(source);
// Resolution order visible here: redirect address, then the entry carried by the source,
// then the source's Redis client, and finally the slot.
private MasterSlaveEntry getEntry(NodeSource source) {
if (source.getRedirect() != null) {
return getEntry(source.getAddr());
MasterSlaveEntry entry = source.getEntry();
if (source.getRedisClient() != null) {
entry = getEntry(source.getRedisClient());
// Fall back to the slot lookup only when no entry has been found yet.
if (entry == null && source.getSlot() != null) {
entry = getEntry(source.getSlot());
return entry;
// Excerpt: how the slot-to-entry table is populated and queried in cluster mode.
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
// One element per slot (MAX_SLOT in total), holding the MasterSlaveEntry registered for that slot.
private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<>(MAX_SLOT);
// Lookup from a Redis client to its entry.
private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>();
// Returns the entry registered for the given slot.
public MasterSlaveEntry getEntry(int slot) {
return slot2entry.get(slot);
// Constructor (abridged): parses the partitions and registers a master entry for each of them.
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
for (String address : cfg.getNodeAddresses()) {
CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
Collection<ClusterPartition> partitions = partitionsFuture.join();
List<CompletableFuture<Void>> masterFutures = new ArrayList<>();
for (ClusterPartition partition : partitions) {
CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);
// Connects to the partition's master, builds its entry and registers that entry for each of the partition's slots.
private CompletableFuture<Void> addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName);
connectionFuture.whenComplete((connection, ex1) -> {
// A connection failure is logged together with the master address and its slot ranges.
if (ex1 != null) {
log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
MasterSlaveServersConfig config = create(cfg);
MasterSlaveEntry entry;
// A SingleEntry is used when slave initialization is skipped; otherwise a MasterSlaveEntry is created.
if (config.checkSkipSlavesInit()) {
entry = new SingleEntry(ClusterConnectionManager.this, config);
} else {
Set<String> slaveAddresses = partition.getSlaveAddresses().stream().map(r -> r.toString()).collect(Collectors.toSet());
entry = new MasterSlaveEntry(ClusterConnectionManager.this, config);
// Set up the master on the entry; the callback below runs once that setup finishes.
CompletableFuture<RedisClient> f = entry.setupMasterEntry(new RedisURI(config.getMasterAddress()), configEndpointHostName);
f.whenComplete((masterClient, ex3) -> {
if (ex3 != null) {
log.error("Can't add master: " + partition.getMasterAddress() + " for slot ranges: " + partition.getSlotRanges(), ex3);
// Register the entry and remember the partition for each slot the partition owns.
for (Integer slot : partition.getSlots()) {
addEntry(slot, entry);
lastPartitions.put(slot, partition);
// Stores the entry for the slot; client2entry is updated only when the slot's entry actually changed.
private void addEntry(Integer slot, MasterSlaveEntry entry) {
MasterSlaveEntry oldEntry = slot2entry.getAndSet(slot, entry);
if (oldEntry != entry) {
client2entry.put(entry.getClient(), entry);