Zookeeper-Zookeeper应用场景实战

1. Zookeeper Java客户端实战

ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。 可供选择的Java客户端API有:
        ZooKeeper官方的Java客户端API。
        第三方的Java客户端API,比如Curator。
ZooKeeper官方的客户端API提供了基本的操作。例如,创建会话、创建节点、读取节点、更新数据、 删除节点和检查节点是否存在等。不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具 体如下:
       1. ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册。
        2.会话超时之后没有实现重连机制。
        3.异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。
       4. 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。
       5. 创建节点时如果抛出异常,需要自行检查节点是否存在。
        6.无法实现级联删除。
总之, ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用。

1.1 Zookeeper 原生Java客户端使用

引入zookeeper client依赖
<!-- zookeeper client -->
 <dependency>
 <groupId>org.apache.zookeeper</groupId>
 <artifactId>zookeeper</artifactId>
 <version>3.8.0</version>
</dependency>
注意:保持与服务端版本一致,不然会有很多兼容性的问题。
ZooKeeper原生客户端主要使用org.apache.zookeeper.ZooKeeper这个类来使用ZooKeeper服务。
ZooKeeper常用构造器
ZooKeeper (connectString, sessionTimeout, watcher)
        1.connectString:使用逗号分隔的列表,每个ZooKeeper节点是一个host.port对,host 是机器名或者IP地址,port是ZooKeeper节点对客户端提供服务的端口号。客户端会任意选connectString 中的一个节点建立连接。
        2.sessionTimeout : session timeout时间。
        3.watcher:用于接收到来自ZooKeeper集群的事件。 使用 zookeeper 原生 API,连接zookeeper集群
public class ZkClientDemo {

 private static final String CONNECT_STR="localhost:2181";
 private final static String
CLUSTER_CONNECT_STR="192.168.65.156:2181,192.168.65.190:2181,192.168.65.200:2181";

 public static void main(String[] args) throws Exception {

 final CountDownLatch countDownLatch=new CountDownLatch(1);
 ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR,
 4000, new Watcher() {
 @Override
 public void process(WatchedEvent event) {
 if(Event.KeeperState.SyncConnected==event.getState()
 && event.getType()== Event.EventType.None){
 //如果收到了服务端的响应事件,连接成功
 countDownLatch.countDown();
 System.out.println("连接建立");
 }
 }
 });
 System.out.printf("连接中");
 countDownLatch.await();
 //CONNECTED

System.out.println(zooKeeper.getState());

//创建持久节点
 zooKeeper.create("/user","fox".getBytes(),
 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

 }

}
Zookeeper主要方法
        create(path, data, acl,createMode): 创建一个给定路径的 znode,并在 znode 保存 data[]的 数据, createMode指定 znode 的类型。
        delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。
        exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
        getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。
        setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。
        getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
        sync(path):把客户端 session 连接节点和 leader 节点进行同步。
方法特点:
        所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化。
        所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。 如果 version 为 -1,更新为无条 件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会进行更新,这样的更新是条件更新。
        所有的方法都有同步和异步两个版本。 同步版本的方法发送请求给 ZooKeeper 并等待服务器的响 应。异步版本 把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来 自服务端的响应。
同步创建节点:
@Test
public void createTest() throws KeeperException, InterruptedException {
String path = zooKeeper.create(ZK_NODE, "data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 log.info("created path: {}",path);
 }
异步创建节点:
@Test
 public void createAsycTest() throws InterruptedException {
 zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT,
        (rc, path, ctx, name) -> log.info("rc {},path {},ctx {},name
{}",rc,path,ctx,name),"context");
    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
 }
修改节点数据:
@Test
 public void setTest() throws KeeperException, InterruptedException {

 Stat stat = new Stat();
 byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
 log.info("修改前: {}",new String(data));
 zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
 byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
 log.info("修改后: {}",new String(dataAfter));
 }

1.2 Curator开源客户端使用

        Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节 开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。
Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和 可读性更强的Fluent风格的客户端API框架。
        Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案, 例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。
Guava is to Java that Curator to ZooKeeper
在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。
官网: https://curator.apache.org/
引入依赖
Curator 包含了几个包:
         curator-framework是对ZooKeeper的底层API的一些封装。
        curator-client提供了一些客户端的操作,例如重试策略等。
        curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
<!-- zookeeper client -->
 <dependency>
     <groupId>org.apache.zookeeper</groupId>
     <artifactId>zookeeper</artifactId>
     <version>3.8.0</version>
 </dependency>

 <!--curator-->
 <dependency>
     <groupId>org.apache.curator</groupId>
         <artifactId>curator-recipes</artifactId>
             <version>5.1.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.zookeeper</groupId>
                     <artifactId>zookeeper</artifactId>
                 </exclusion>
             </exclusions>
  </dependency>
创建一个客户端实例
        在使用curator-framework包操作ZooKeeper前,首先要创建一个客户端实例。这是一个
CuratorFramework类型的对象,有两种方法:
使用工厂类CuratorFrameworkFactory的静态newClient()方法。
// 重试策略
 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
 //创建客户端实例
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString,
retryPolicy);

 //启动客户端
 client.start();
使用工厂类CuratorFrameworkFactory的静态builder构造者方法。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
 CuratorFramework client = CuratorFrameworkFactory.builder()
 .connectString("192.168.128.129:2181")
 .sessionTimeoutMs(5000) // 会话超时时间
 .connectionTimeoutMs(5000) // 连接超时时间
 .retryPolicy(retryPolicy)
 .namespace("base") // 包含隔离名称
 .build();
 client.start();
connectionString:服务器地址列表 在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果 是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。
retryPolicy:重试策略 当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通 过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作 都没有问题,而 SYSTEMERROR 表示系统或服务端错误

 

超时时间: Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间 ,用 来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间 ,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。

创建节点
        创建节点的方式如下面的代码所示,回顾我们之前课程中讲到的内容,描述一个节点要包括节点的类 型,即临时节点还是持久节点、节点的数据信息、节点是否是有序节点等属性和性质。
@Test
 public void testCreate() throws Exception {
 String path = curatorFramework.create().forPath("/curator-node");
 curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curatornode","some-data".getBytes())

 log.info("curator create node :{} successfully.",path);
 }
在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化
节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用
forPath 函数来指定节点的路径和数据信息。
一次性创建带层级结构的节点
@Test
 public void testCreateWithParent() throws Exception {
 String pathWithParent="/node-parent/sub-node-1";
 String path =
curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);

 log.info("curator create node :{} successfully.",path);
 }
获取数据
 @Test
 public void testGetData() throws Exception {
 byte[] bytes = curatorFramework.getData().forPath("/curator-node");
 log.info("get data from node :{} successfully.",new String(bytes));
 }
更新节点
我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边, 通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。
@Test
 public void testSetData() throws Exception {
 curatorFramework.setData().forPath("/curator-node","changed!".getBytes());
 byte[] bytes = curatorFramework.getData().forPath("/curator-node");
 log.info("get data from node /curator-node :{} successfully.",new String(bytes));
 }
删除节点
 @Test
 public void testDelete() throws Exception {
 String pathWithParent="/node-parent";
 
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}
guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式
是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删 除其子节点,以及子节点的子节点。
异步接口
Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异
步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。
public interface BackgroundCallback
 {
 /**
 * Called when the async background operation completes
 *
 * @param client the client
 * @param event operation result details
 * @throws Exception errors
 */
 public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
 }
如上接口,主要参数为 client 客户端, 和 服务端事件 event。
inBackground 异步处理默认在EventThread中执行
@Test
 public void test() throws Exception {
 curatorFramework.getData().inBackground((item1, item2) -> {
 log.info(" background: {}", item2);
 }).forPath(ZK_NODE);

 TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
 }
指定线程池
@Test
 public void test() throws Exception {
 ExecutorService executorService = Executors.newSingleThreadExecutor();

 curatorFramework.getData().inBackground((item1, item2) -> {
 log.info(" background: {}", item2);
 },executorService).forPath(ZK_NODE);

 TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
 }
Curator 监听器
/**
 * Receives notifications about errors and background events
 */
 public interface CuratorListener
 {
 /**
 * Called when a background task has completed or a watch has triggered
 *
 * @param client client
 * @param event the event
 * @throws Exception any errors
 */
 public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
针对 background 通知和错误通知。使用此监听器之后,调用inBackground 方法会异步获得监听
Curator Caches:
        Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地 缓存视图与远程 Zookeeper 视图的对比过程。 Cache 提供了反复注册的功能。 Cache 分为两类注册类型:节点监听和子节点监听。
node cache:
NodeCache 对某一个节点进行监听
public NodeCache(CuratorFramework client,
 String path)
 Parameters:
 client - the client
 path - path to cache
可以通过注册监听器来实现,对当前节点数据变化的处理
public void addListener(NodeCacheListener listener)
 Add a change listener
 Parameters:
 listener - the listener
 @Slf4j
 public class NodeCacheTest extends AbstractCuratorTest{

 public static final String NODE_CACHE="/node-cache";

 @Test
 public void testNodeCacheTest() throws Exception {

 createIfNeed(NODE_CACHE);
 NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);
 nodeCache.getListenable().addListener(new NodeCacheListener() {
 @Override
 public void nodeChanged() throws Exception {
 log.info("{} path nodeChanged: ",NODE_CACHE);
 printNodeData();
 }
 });

 nodeCache.start();
 }


public void printNodeData() throws Exception {
 byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);
 log.info("data: {}",new String(bytes));
 }
 }
path cache:
PathChildrenCache 会对子节点进行监听,但是不会对二级子节点进行监听,
public PathChildrenCache(CuratorFramework client,
 String path,
 boolean cacheData)
 Parameters:
 client - the client
 path - path to watch
 cacheData - if true, node contents are cached in addition to the stat
可以通过注册监听器来实现,对当前节点的子节点数据变化的处理
public void addListener(PathChildrenCacheListener listener)
 Add a change listener
 Parameters:
 listener - the listener
 @Slf4j
 public class PathCacheTest extends AbstractCuratorTest{

 public static final String PATH="/path-cache";

 @Test
 public void testPathCache() throws Exception {

 createIfNeed(PATH);
 PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework,
PATH, true);
 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
 @Override
 public void childEvent(CuratorFramework client, PathChildrenCacheEvent
event) throws Exception {
 log.info("event: {}",event);
 }
 });

 // 如果设置为true则在首次启动时就会缓存节点内容到Cache中
 pathChildrenCache.start(true);
 }
 }
tree cache:
        TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行
了映 射。所以TreeCache 可以监听当前节点下所有节点的事件。
public TreeCache(CuratorFramework client,
 String path,
 boolean cacheData)
 Parameters:
 client - the client
 path - path to watch
 cacheData - if true, node contents are cached in addition to the stat
可以通过注册监听器来实现,对当前节点的子节点,及递归子节点数据变化的处理
public void addListener(TreeCacheListener listener)
 Add a change listener
 Parameters:
 listener - the listener
 @Slf4j
 public class TreeCacheTest extends AbstractCuratorTest{

 public static final String TREE_CACHE="/tree-path";

 @Test
 public void testTreeCache() throws Exception {
 createIfNeed(TREE_CACHE);
 TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);
 treeCache.getListenable().addListener(new TreeCacheListener() {
 @Override
 public void childEvent(CuratorFramework client, TreeCacheEvent event)
throws Exception {
 log.info(" tree cache: {}",event);
 }
 });
 treeCache.start();
 }
 }

2. Zookeeper在分布式命名服务中的实战

        命名服务是为系统中的资源提供标识能力。 ZooKeeper的命名服务主要是利用ZooKeeper节点的树形 分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名。
哪些应用场景需要用到分布式命名服务呢?典型的有:
         分布式API目录
        分布式节点命名
        分布式ID生成器

2.1 分布式API目录

        为分布式系统中各种API接口服务的名称、链接地址,提供类似JNDI(Java命名和目录接口)中的文件 系统的功能。借助于ZooKeeper的树形分层结构就能提供分布式的API调用功能。
著名的Dubbo分布式框架就是应用了ZooKeeper的分布式的JNDI功能。在Dubbo中,使用
ZooKeeper维护的全局服务接口API的地址列表。大致的思路为:
         服务提供者(Service Provider) 在启动的时候,向ZooKeeper上的指定节
点/dubbo/${serviceName}/providers写入自己的API地址,这个操作就相当于服务的公开。
         服务消费者(Consumer) 启动的时候,订阅节点/dubbo/{serviceName}/providers下的服务提供者的URL地 址,获得所有服务提供者的API。

2.2 分布式节点的命名

        一个分布式系统通常会由很多的节点组成,节点的数量不是固定的,而是不断动态变化的。比如说, 当业务不断膨胀和流量洪峰到来时,大量的节点可能会动态加入到集群中。而一旦流量洪峰过去了, 就需要下线大量的节点。再比如说,由于机器或者网络的原因,一些节点会主动离开集群。
如何为大量的动态节点命名呢? 一种简单的办法是可以通过配置文件,手动为每一个节点命名。但
是,如果节点数据量太大,或者说变动频繁,手动命名则是不现实的,这就需要用到分布式节点的命名服务。
可用于生成集群节点的编号的方案:
         (1)使用数据库的自增ID特性,用数据表存储机器的MAC地址或者IP来维护。
        (2)使用ZooKeeper持久顺序节点的顺序特性来维护节点的NodeId编号。
在第2种方案中,集群节点命名服务的基本流程是:
         启动节点服务,连接ZooKeeper,检查命名服务根节点是否存在,如果不存在,就创建系统的根节点。
        在根节点下创建一个临时顺序ZNode节点,取回ZNode的编号把它作为分布式系统中节点的NODEID。
        [[如果临时节点太多,可以根据需要删除临时顺序ZNode节点。
2.3 分布式的ID生成器
在分布式系统中,分布式ID生成器的使用场景非常之多:
         大量的数据记录,需要分布式ID。
        大量的系统消息,需要分布式ID。
        大量的请求日志,如restful的操作记录,需要唯一标识,以便进行后续的用户行为分析和调用链路分析。
        分布式节点的命名服务,往往也需要分布式ID。
        。。。
传统的数据库自增主键已经不能满足需求。在分布式系统环境中,迫切需要一种全新的唯一ID系统, 这种系统需要满足以下需求:
        (1)全局唯一:不能出现重复ID。
        (2)高可用:ID生成系统是基础系统,被许多关键系统调用,一旦宕机,就会造成严重影响。
有哪些分布式的ID生成器方案呢? 大致如下:
          1. Java的UUID。
        2. 分布式缓存Redis生成ID:利用Redis的原子操作INCR和INCRBY,生成全局唯一的ID。
        3. Twitter的SnowFlake算法。
        4. ZooKeeper生成ID:利用ZooKeeper的顺序节点,生成全局唯一的ID。
        5. MongoDb的ObjectId:MongoDB是一个分布式的非结构化NoSQL数据库,每插入一条记录会自动生成全局唯 一的一个“_id”字段值,它是一个12字节的字符串,可以作为分布式系统中全局唯一的ID。
基于Zookeeper实现分布式ID生成器
在ZooKeeper节点的四种类型中,其中有以下两种类型具备自动编号的能力
        PERSISTENT_SEQUENTIAL持久化顺序节点。
        EPHEMERAL_SEQUENTIAL临时顺序节点。
ZooKeeper的每一个节点都会为它的第一级子节点维护一份顺序编号,会记录每个子节点创建的先后 、顺序,这个顺序编号是分布式同步的,也是全局唯一的。
可以通过创建ZooKeeper的临时顺序节点的方法,生成全局唯一的ID
@Slf4j
 public class IDMaker extends CuratorBaseOperations {

 private String createSeqNode(String pathPefix) throws Exception {
 CuratorFramework curatorFramework = getCuratorFramework();
 //创建一个临时顺序节点
 String destPath = curatorFramework.create()
 .creatingParentsIfNeeded()
 .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
 .forPath(pathPefix);
 return destPath;
 }

 public String makeId(String path) throws Exception {
 String str = createSeqNode(path);
 if(null != str){
 //获取末尾的序号
 int index = str.lastIndexOf(path);
 if(index>=0){
 index+=path.length();
 return index<=str.length() ? str.substring(index):"";
 }
 }
 return str;
 }
 }
测试
 @Test
 public void testMarkId() throws Exception {
 IDMaker idMaker = new IDMaker();
 idMaker.init();
 String pathPrefix = "/idmarker/id-";

 for(int i=0;i<5;i++){
 new Thread(()->{
 for (int j=0;j<10;j++){
 String id = null;
 try {
 id = idMaker.makeId(pathPrefix);
 log.info("{}线程第{}个创建的id为
{}",Thread.currentThread().getName(),
 j,id);
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 },"thread"+i).start();
 }

 Thread.sleep(Integer.MAX_VALUE);

 }
测试结果

基于Zookeeper实现SnowFlakeID算法

        Twitter(推特)的SnowFlake算法是一种著名的分布式服务器用户ID生成算法。SnowFlake算法所生 成的ID是一个64bit的长整型数字,如图10-2所示。这个64bit被划分成四个部分,其中后面三个部分 分别表示时间戳、工作机器ID、序列号。
SnowFlakeID的四个部分,具体介绍如下:
        (1)第一位 占用1 bit,其值始终是0,没有实际作用。
        (2)时间戳 占用41 bit,精确到毫秒,总共可以容纳约69年的时间。
        (3)工作机器id占用10 bit,最多可以容纳1024个节点。
        (4)序列号 占用12 bit。这个值在同一毫秒同一节点上从0开始不断累加,最多可以累加到4095。
        在工作节点达到1024顶配的场景下,SnowFlake算法在同一毫秒最多可以生成的ID数量为: 1024 * 4096 =4194304,在绝大多数并发场景下都是够用的。
SnowFlake算法的优点:
         生成ID时不依赖于数据库,完全在内存生成,高性能和高可用性。
        容量大,每秒可生成几百万个ID。
        ID呈趋势递增,后续插入数据库的索引树时,性能较高。
SnowFlake算法的缺点:
         依赖于系统时钟的一致性,如果某台机器的系统时钟回拨了,有可能造成ID冲突,或者ID乱序。
        在启动之前,如果这台机器的系统时间回拨过,那么有可能出现ID重复的危险。
基于zookeeper实现雪花算法:
public class SnowflakeIdGenerator {

 /**
 * 单例
 */
 public static SnowflakeIdGenerator instance =
 new SnowflakeIdGenerator();


 /**
 * 初始化单例
 *
 * @param workerId 节点Id,最大8091
 * @return the 单例
 */
 public synchronized void init(long workerId) {
 if (workerId > MAX_WORKER_ID) {
 // zk分配的workerId过大
 throw new IllegalArgumentException("woker Id wrong: " + workerId);
 }
 instance.workerId = workerId;
 }

 private SnowflakeIdGenerator() {

 }


 /**
 * 开始使用该算法的时间为: 2017-01-01 00:00:00
 */
 private static final long START_TIME = 1483200000000L;

 /**
 * worker id 的bit数,最多支持8192个节点
 */
 private static final int WORKER_ID_BITS = 13;

 /**
 * 序列号,支持单节点最高每毫秒的最大ID数1024
 */
 private final static int SEQUENCE_BITS = 10;

 /**
 * 最大的 worker id ,8091
 * -1 的补码(二进制全1)右移13位, 然后取反
 */
 private final static long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);

 /**
 * 最大的序列号,1023
 * -1 的补码(二进制全1)右移10位, 然后取反
 */
 private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);

 /**
 * worker 节点编号的移位
 */
 private final static long WORKER_ID_SHIFT = SEQUENCE_BITS;

 /**
 * 时间戳的移位
 */
 private final static long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;

 /**
 * 该项目的worker 节点 id
 */
 private long workerId;

 /**
 * 上次生成ID的时间戳
 */
 private long lastTimestamp = -1L;

 /**
 * 当前毫秒生成的序列
 */
 private long sequence = 0L;

 /**
 * Next id long.
 *
 * @return the nextId
 */
 public Long nextId() {
 return generateId();
 }

 /**
 * 生成唯一id的具体实现
 */
 private synchronized long generateId() {
 long current = System.currentTimeMillis();

 if (current < lastTimestamp) {
 // 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,出现问题返回-1
 return -1;
 }

 if (current == lastTimestamp) {
 // 如果当前生成id的时间还是上次的时间,那么对sequence序列号进行+1
 sequence = (sequence + 1) & MAX_SEQUENCE;

 if (sequence == MAX_SEQUENCE) {
 // 当前毫秒生成的序列数已经大于最大值,那么阻塞到下一个毫秒再获取新的时间戳
 current = this.nextMs(lastTimestamp);
 }
 } else {
 // 当前的时间戳已经是下一个毫秒
 sequence = 0L;
 }

 // 更新上次生成id的时间戳
 lastTimestamp = current;

 // 进行移位操作生成int64的唯一ID

 //时间戳右移动23位
 long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;

 //workerId 右移动10位
 long workerId = this.workerId << WORKER_ID_SHIFT;

 return time | workerId | sequence;
 }

 /**
 * 阻塞到下一个毫秒
 */
 private long nextMs(long timeStamp) {
 long current = System.currentTimeMillis();
 while (current <= timeStamp) {
 current = System.currentTimeMillis();
 }
 return current;
 }
 }

3. zookeeper实现分布式队列

        常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系 统,同样能实现简单的队列功能。 Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但 由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中还是比较好用的。

3.1 设计思路

        1. 创建队列根节点:在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节 点下。
        2. 实现入队操作:当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可 以包含队列元素的信息。
        3. 实现出队操作:当需要从队列中取出一个元素时,可以执行以下操作:
                 获取根节点下的所有子节点。
                找到具有最小序号的子节点。
                获取该节点的数据。
                删除该节点。
                返回节点的数据。
/**
 * 入队
 * @param data
 * @throws Exception
 */
 public void enqueue(String data) throws Exception {
 // 创建临时有序子节点
 zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),
 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
 }

 /**
 * 出队
 * @return
 * @throws Exception
 */
 public String dequeue() throws Exception {
 while (true) {
 List<String> children = zk.getChildren(QUEUE_ROOT, false);
 if (children.isEmpty()) {
 return null;
 }

 Collections.sort(children);

 for (String child : children) {
 String childPath = QUEUE_ROOT + "/" + child;
 try {
 byte[] data = zk.getData(childPath, false, null);
 zk.delete(childPath, -1);
 return new String(data, StandardCharsets.UTF_8);
 } catch (KeeperException.NoNodeException e) {
 // 节点已被其他消费者删除,尝试下一个节点
 }
 }
 }
 }
3.2 使用Apache Curator实现分布式队列
        Apache Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,包括分布式队列。
public class CuratorDistributedQueueDemo {
 private static final String QUEUE_ROOT = "/curator_distributed_queue";

 public static void main(String[] args) throws Exception {
 CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",
 new ExponentialBackoffRetry(1000, 3));
 client.start();

 // 定义队列序列化和反序列化
 QueueSerializer<String> serializer = new QueueSerializer<String>() {
 @Override
 public byte[] serialize(String item) {
 return item.getBytes();
 }

 @Override
 public String deserialize(byte[] bytes) {
 return new String(bytes);
 }
 };

 // 定义队列消费者
 QueueConsumer<String> consumer = new QueueConsumer<String>() {
 @Override
 public void consumeMessage(String message) throws Exception {
 System.out.println("消费消息: " + message);
 }

 @Override
 public void stateChanged(CuratorFramework curatorFramework, ConnectionState
 connectionState) {

 }
 };

 // 创建分布式队列
 DistributedQueue<String> queue = QueueBuilder.builder(client, consumer,
serializer, QUEUE_ROOT)
 .buildQueue();
 queue.start();

 // 生产消息
 for (int i = 0; i < 5; i++) {
 String message = "Task-" + i;
 System.out.println("生产消息: " + message);
 queue.put(message);
 Thread.sleep(1000);
 }

 Thread.sleep(10000);
queue.close();
 client.close();
 }
 }
3.3 注意事项
        使用Curator的DistributedQueue时,默认情况下不使用锁。当调用QueueBuilder的lockPath()方法 并指定一个锁节点路径时,才会启用锁。如果不指定锁节点路径,那么队列操作可能会受到并发问题 的影响。
        在创建分布式队列时,指定一个锁节点路径可以帮助确保队列操作的原子性和顺序性。 分布式环境 中,多个消费者可能同时尝试消费队列中的消息。如果不使用锁来同步这些操作,可能会导致消息被 多次处理或者处理顺序出现混乱。当然,并非所有场景都需要指定锁节点路径。如果您的应用场景允许消息被多次处理,或者处理顺序不是关键问题,那么可以不使用锁。这样可以提高队列操作的性 能,因为不再需要等待获取锁。
// 创建分布式队列
 QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer,
"/order");
 //指定了一个锁节点路径/orderlock,用于实现分布式锁,以保证队列操作的原子性和顺序性。
 queue = builder.lockPath("/orderlock").buildQueue();
 //启动队列,这时队列开始监听ZooKeeper中/order节点下的消息。
 queue.start();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/282269.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Select工作原理

I/O多路复用是一种并发处理的机制&#xff0c;允许一个进程通过一种机制监视多个描述符&#xff0c;从而在有多个I/O操作需要处理时选择其中之一进行服务。select 函数是一种常见的实现 I/O 多路复用的系统调用&#xff0c;它允许一个进程同时监视多个文件描述符的可读性、可写…

机器学习:贝叶斯估计在新闻分类任务中的应用

文章摘要 随着互联网的普及和发展&#xff0c;大量的新闻信息涌入我们的生活。然而&#xff0c;这些新闻信息的质量参差不齐&#xff0c;有些甚至包含虚假或误导性的内容。因此&#xff0c;对新闻进行有效的分类和筛选&#xff0c;以便用户能够快速获取真实、有价值的信息&…

全渠道客服系统推荐:选型指南与最佳实践分享

售后服务是影响客户满意度的最直接的因素。有些企业不注重产品的售后服务&#xff0c;不仅是对客户的伤害&#xff0c;更是对企业品牌的损害。所以&#xff0c;做好售后服务对于企业来讲至关重要。 企业谈到做好售后服务&#xff0c;少不了一款好用的客服系统工具。其中&#…

ARM CCA机密计算软件架构之内存加密上下文(MEC)

内存加密上下文(MEC) 内存加密上下文是与内存区域相关联的加密配置,由MMU分配。 MEC是Arm Realm Management Extension(RME)的扩展。RME系统架构要求对Realm、Secure和Root PAS进行加密。用于每个PAS的加密密钥、调整或加密上下文在该PAS内是全局的。例如,对于Realm PA…

ACW741.斐波那契额数列

输入整数 N&#xff0c;求出斐波那契数列中的第 N项是多少。 斐波那契数列的第 0项是 0&#xff0c;第 1项是 1&#xff0c;从第 2 项开始的每一项都等于前两项之和。输入格式 第一行包含整数 T&#xff0c;表示共有T个测试数据。接下来 T行&#xff0c;每行包含一个整数 N。输…

Android长按图标展示快捷方式

if (Build.VERSION.SDK_INT > Build.VERSION_CODES.O) {new Thread(() -> {// 获取ShortcutManager实例ShortcutManager shortcutManager getSystemService(ShortcutManager.class);// 创建要添加的快捷方式ShortcutInfo.Builder shortcutBuilder new ShortcutInfo.Bui…

UGF框架中尝试加载AB资源来运行案例工程失败的解决办法

打开GameFramework场景&#xff0c;在编辑器模式下找到 表示当前资源加载模式是编辑器模式。&#xff08;个人理解是和正常开发下的资源加载模式无异&#xff09; CXK补充的内容&#xff1a;需要找到如下图的脚本&#xff0c;把资源加载的模式改为Package模式&#xff08;单机…

com.microsoft.sqlserver.jdbc.SQLServerException: 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“The

配置文件示例: # SQL Server 数据源配置 spring.datasource.dynamic.datasource.sqlserver.urljdbc:sqlserver://100.100.0.0\\shili;databaseNamecs; spring.datasource.dynamic.datasource.sqlserver.usernamesa spring.datasource.dynamic.datasource.sqlserver.password sp…

【LearnOpenGL基础入门——5】着色器

目录 一.简介 二.GLSL 三.数据类型 四.输入与输出 五.Uniform 六.更多属性 一.简介 着色器(Shader)是运行在GPU上的小程序。这些小程序为图形渲染管线的某个特定部分而运行。从基本意义上来说&#xff0c;着色器只是一种把输入转化为输出的程序。着色器也是一种非常独立…

python 1200例——【13】计算阶乘

阶乘是一个数学概念,表示为 n!(读作 n 的阶乘),表示从 1 到 n 的所有正整数的乘积。例如,5! = 5 4 3 2 1 = 120。 在 Python 中,我们可以使用多种方法来计算阶乘。以下是其中的一些方法: 方法一:使用循环 这是最基本的方法,我们通过循环从 1 到 n 依次乘起来。…

【Linux】内核编译 镜像制作

文章目录 一、Ubuntu内核编译1.1 为什么自己编译内核1.2 Ubuntu 内核源码下载1.21 内核的作用1.22 Linux内核与ubuntu内核1.23 Ubuntu内核源码获取 1.3 在Windows系统下编译ubuntu内核1.4 在Linux系统下编译ubuntu内核 二、镜像制作 一、Ubuntu内核编译 1.1 为什么自己编译内核…

拓扑排序

目录 拓扑排序 有向图的拓扑排序 拓扑排序 一个有向图&#xff0c;如果图中有入度为 0 的点&#xff0c;就把这个点删掉&#xff0c;同时也删掉这个点所连的边。 一直进行上面出处理&#xff0c;如果所有点都能被删掉&#xff0c;则这个图可以进行拓扑排序。 举例子&#…

【机器学习】人工智能概述

人工智能&#xff08;Artificial Intelligence&#xff0c;简称AI&#xff09;是一门研究如何使机器能够像人一样思考、学习和执行任务的学科。它是计算机科学的一个重要分支&#xff0c;涉及机器学习、自然语言处理、计算机视觉等多个领域。 人工智能的概念最早可以追溯到20世…

vue3框架笔记

Vue Vue 是一个渐进式的前端开发框架&#xff0c;很容易上手。Vue 目前的版本是 3.x&#xff0c;但是公司中也有很多使用的是 Vue2。Vue3 的 API 可以向下兼容 2&#xff0c;Vue3 中新增了很多新的写法。我们课程主要以 Vue3 为主 官网 我们学习 Vue 需要转变思想&#xff0…

[YoloV8目标检测与实例分割——目标检测onnx模型推理]

一、模型转换 1.onnxruntime ONNX Runtime&#xff08;ONNX Runtime或ORT&#xff09;是一个开源的高性能推理引擎&#xff0c;用于部署和运行机器学习模型。它的设计目标是优化执行使用Open Neural Network Exchange&#xff08;ONNX&#xff09;格式定义的模型&#xff0c;ON…

设备健康管理系统助力制造企业实现数字化转型

在当今快速变革的制造业环境中&#xff0c;数字化转型已成为制造企业保持竞争力和实现可持续发展的关键。在这个数字化转型的浪潮中&#xff0c;设备健康管理系统正发挥着重要的作用。设备健康管理系统通过实时监测、预测分析和智能诊断等功能&#xff0c;为制造企业提供了全面…

亚马逊云科技Amazon Q,一款基于生成式人工智能的新型助手

近日&#xff0c;亚马逊云科技宣布推出Amazon Q&#xff0c;这是一款基于生成式人工智能&#xff08;AI&#xff09;的新型助手&#xff0c;专为辅助工作而设计&#xff0c;可以根据您的业务量身定制。通过连接到公司的信息存储库、代码、数据和企业系统&#xff0c;可以使用Am…

Redis(认识NoSQL,认识redis,安装redis,redis桌面客户端,redis常见命令,redis的Java客户端)

文章目录 Redis快速入门1.初识Redis1.1.认识NoSQL1.1.1.结构化与非结构化1.1.2.关联和非关联1.1.3.查询方式1.1.4.事务1.1.5.总结 1.2.认识Redis1.3.安装Redis1.3.1.依赖库1.3.2.上传安装包并解压1.3.3.启动1.3.4.默认启动1.3.5.指定配置启动1.3.6.开机自启 1.4.Redis桌面客户端…

Apollo自动驾驶:改变交通运输的游戏规则

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 ChatGPT体验地址 文章目录 前言1. Apollo缓存层2. 本地状态管理库3. 离线同步和冲突解决4. 离线数据同步和离线优先策略结论 &#x1f4f2;&#x1f50c; 构建离线应用&#xff1a;Apollo…

【微服务】springboot整合skywalking使用详解

目录 一、前言 二、SkyWalking介绍 2.1 SkyWalking是什么 2.2 SkyWalking核心功能 2.3 SkyWalking整体架构 2.4 SkyWalking主要工作流程 三、为什么选择SkyWalking 3.1 业务背景 3.2 常见监控工具对比 3.3 为什么选择SkyWalking 3.3.1 代码侵入性极低 3.3.2 功能丰…