在上一篇文章,分布式 ID 生成策略(一),我们讨论了基于数据库的 ID 池策略,今天来看另一种实现,基于雪花算法的分布式 ID 生成策略。
如图所示,我们用 41 位时间戳 + 12 位机器 ID + 10 位序列号,来表示一个 64 位的分布式 ID。
基于这样的雪花算法来保证 ID 的唯一性
- 时间戳是递增的,不同时刻产生的 ID 肯定是不同的;
- 机器 ID 是不同的,同一时刻不同机器产生的 ID 肯定也是不同的;
- 同一时刻同一机器上,可以轻易控制序列号。
CREATE TABLE `global_sequence_time` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`node_name` varchar(32) NOT NULL DEFAULT '' COMMENT '机器名称,通常为内网IP',
`node_id` smallint(6) NOT NULL COMMENT '机器ID,数字,最大1023',
`sn` varchar(128) NOT NULL DEFAULT '' COMMENT '业务字段名称',
`version` bigint(20) NOT NULL DEFAULT '1' COMMENT '乐观锁版本',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_name` (`sn`,`node_name`) USING BTREE,
UNIQUE KEY `uk_id` (`sn`,`node_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='全局ID生成表';
-
node_name:节点名称,默认为节点的内网IP。所以,同一个机器上,部署多个应用,他们的 node_name 是一样的;
-
sn:业务名称,根据此值分类ID的生成;
-
node_id:数字类型,最大值不能超过1024,即此算法最多支持1024个节点。
还有两个唯一约束
- 同一个业务sn值,在一个机器上不能部署2个;
- 同一个业务sn值,node_id不能重复。
package idgenerator;
/**
* Description
* 基于SnowFlake算法实现
* 将node_id保存在数据库中,根据本机IP地址作为标识.每个机器对应一个node_id.
*/
public class TimeBasedIDGenerator extends IDGenerator {
protected static final int DEFAULT_RETRY = 6;
public static final int NODE_SHIFT = 10;
public static final int SEQ_SHIFT = 12;
public static final short MAX_NODE = 1024;//最大node 个数,
public static final short MAX_SEQUENCE = 4096;//每秒最大ID个数
private short sequence;
private long referenceTime;
private DataSource dataSource;
private Map<String, Integer> cachedNodeId = new HashMap<>();
private String nodeName;
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
/**
*
*/
public TimeBasedIDGenerator() {
nodeName = getLocalAddress();
}
private synchronized Integer getNodeId(String sn) {
Integer nodeId = cachedNodeId.get(sn);
//正常,返回
if (nodeId != null) {
return nodeId;
}
int i = 0;
while (i < DEFAULT_RETRY) {
nodeId = fetch(sn);
if (nodeId > MAX_NODE) {
throw new IllegalStateException("node_id is greater than " + MAX_NODE + ",please check sn=" + sn);
}
if (nodeId > 0) {
cachedNodeId.put(sn, nodeId);
break;
}
i++;
}
return nodeId;
}
/**
* @return The next 64-bit integer.
*/
public synchronized long next(String sn) {
Integer nodeId = getNodeId(sn);
if (nodeId == null || nodeId < 0) {
throw new IllegalStateException("无法获取nodeId,sn=" + sn);
}
long currentTime = System.currentTimeMillis();
long counter;
if (currentTime < referenceTime) {
throw new RuntimeException(String.format("Last referenceTime %s is after reference time %s", referenceTime, currentTime));
} else if (currentTime > referenceTime) {
this.sequence = 0;
} else {
if (this.sequence < MAX_SEQUENCE) {
this.sequence++;
} else {
throw new RuntimeException("Sequence exhausted at " + this.sequence);
}
}
counter = this.sequence;
referenceTime = currentTime;
return currentTime << NODE_SHIFT << SEQ_SHIFT | nodeId << SEQ_SHIFT | counter;
}
/**
* 获取nodeId.
* @param sn
* @return
*/
private int fetch(String sn) {
Connection connection = null;
PreparedStatement ps = null;
try {
connection = dataSource.getConnection();
connection.setAutoCommit(true);//普通操作
connection.setReadOnly(false);
ps = connection.prepareStatement("select node_id from `global_sequence_time` where `sn` = ? and node_name = ? limit 1");
ps.setString(1, sn);
ps.setString(2, nodeName);
ResultSet rs = ps.executeQuery();
//已有数据,则直接返回
if (rs.next()) {
return rs.getInt(1);
}
ps.close();
//如果没有数据,则首先获得已有的最大node_id,然后自增
//查询已知最大id
ps = connection.prepareStatement("select MAX(node_id) AS m_id from `global_sequence_time` where `sn` = ?");
ps.setString(1, sn);
rs = ps.executeQuery();
int id = 1;
if (rs.next()) {
id = rs.getInt(1) + 1;
}
ps.close();
//新建记录
ps = connection.prepareStatement("insert into global_sequence_time(node_name,node_id,sn,create_time,update_time) VALUE (?,?,?,NOW(),NOW())");
ps.setString(1, nodeName);
ps.setInt(2, id);
ps.setString(3, sn);
int row = ps.executeUpdate();
if (row > 0) {
return id;
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
if (ps != null) {
ps.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception ex) {
//
}
}
return -1;
}
private String getLocalAddress() {
try {
Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();//一个主机有多个网络接口
while (netInterfaces.hasMoreElements()) {
NetworkInterface netInterface = netInterfaces.nextElement();
Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
return address.getHostAddress();
}
}
}
} catch (Exception e) {
throw new RuntimeException("无法获取本地IP地址", e);
}
return null;
}
}