ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有:
- ZooKeeper官方的Java客户端API。
- 第三方的Java客户端API,比如Curator。
接下来我们将逐一学习一下这两个java客户端是如何操作zookeeper的。
1. ZooKeeper官方的Java客户端
1.1 简介
ZooKeeper官方的客户端API提供了基本的操作。例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具体如下:
- ZooKeeper的Watcher监听是一次性的,每次触发之后都需要重新进行注册。
- 会话超时之后没有实现重连机制。
- 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常。
- 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口。
- 创建节点时如果抛出异常,需要自行检查节点是否存在。
- 无法实现级联删除。
总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用。
1.2 基础使用
使用zookeeper原生客户端,需要引入zookeeper客户端的依赖。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.3</version>
</dependency>
注意:保持与服务端版本一致,不然会有很多兼容性的问题。
ZooKeeper原生客户端主要使用org.apache.zookeeper.ZooKeeper这个类来调用ZooKeeper服务的。
1.2.1 连接zk集群
ZooKeeper构造器:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException { this(connectString, sessionTimeout, watcher, false); }
connectString:使用逗号分隔的列表,每个ZooKeeper节点是一个host.port对,host 是机器名或者IP地址,port是ZooKeeper节点对客户端提供服务的端口号。客户端会任意选取connectString 中的一个节点建立连接。
sessionTimeout : session timeout时间。
watcher:用于接收到来自ZooKeeper集群的事件。
如何使用客户端构造器与服务端建立连接:
建立连接的工具类;因为zookepper建立连接时特别慢,所以采用了CountDownLatch同步工具类,等待zookeeper客户端与服务端建立完成后,继续后续操作。
public class ZooKeeperFacotry { private static final int SESSION_TIMEOUT = 5000; public static ZooKeeper create(String connectionString) throws Exception { final CountDownLatch connectionLatch = new CountDownLatch(1); ZooKeeper zooKeeper = new ZooKeeper(connectionString, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType()== Event.EventType.None && event.getState() == Watcher.Event.KeeperState.SyncConnected) { connectionLatch.countDown(); System.out.println("连接建立"); } } }); System.out.println("等待连接建立..."); connectionLatch.await(); return zooKeeper; } }
public class ZkClientDemo { private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181"; public static void main(String[] args) throws Exception { //创建zookeeper对象 ZooKeeper zooKeeper = ZooKeeperFacotry.create(CLUSTER_CONNECT_STR); //连接 System.out.println(zooKeeper.getState()); }
运行结果:
1.2.2 操作节点
以下是Zookeeper原生客户端操作服务端的一些主要API:
- create(path, data, acl,createMode): 创建一个给定路径的 znode,并在 znode 保存 data[]的 数据,createMode指定 znode 的类型。
- delete(path, version):如果给定 path 上的 znode 的版本和给定的 version 匹配, 删除 znode。
- exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
- getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。
- setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配,设置znode 数据。
- getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
- sync(path):把客户端 session 连接节点和 leader 节点进行同步。
API特点:
- 所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化。
- 所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。如果 version 为 -1,更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会进行更新,这样的更新是条件更新。
- 所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响 应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来 自服务端的响应。
接下来,我们利用这些API对zk的节点进行简单的操作
1.2.2.1 创建持久节点
代码:
public class ZkClientDemo {
private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";
public static void main(String[] args) throws Exception {
//创建zookeeper对象
ZooKeeper zooKeeper = ZooKeeperFacotry.create(CLUSTER_CONNECT_STR);
//连接
System.out.println(zooKeeper.getState());
Stat stat = zooKeeper.exists("/order",false);
if(null ==stat){
//创建持久节点
zooKeeper.create("/order","001".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
System.out.println("执行完了");
}
运行的结果:
我们在服务器的客户端查看一下数据:
有数据,创建成功了。
1.2.2.2 永久监听节点
代码:
public class ZkClientDemo {
private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";
public static void main(String[] args) throws Exception {
//创建zookeeper对象
ZooKeeper zooKeeper = ZooKeeperFacotry.create(CLUSTER_CONNECT_STR);
//连接
System.out.println(zooKeeper.getState());
Stat stat = zooKeeper.exists("/order",false);
//永久监听 addWatch -m mode
zooKeeper.addWatch("/order",new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event);
//TODO
}
},AddWatchMode.PERSISTENT);
Thread.sleep(Integer.MAX_VALUE);
}
启动程序后,在服务器的客户端修改监听的节点。
程序监听到 /order这个节点被修改了。
1.2.2.3 根据版本更新
代码:
public class ZkClientDemo {
private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";
public static void main(String[] args) throws Exception {
//创建zookeeper对象
ZooKeeper zooKeeper = ZooKeeperFacotry.create(CLUSTER_CONNECT_STR);
//连接
System.out.println(zooKeeper.getState());
Stat stat = zooKeeper.exists("/order",false);
stat = new Stat();
byte[] data = zooKeeper.getData("/order", false, stat);
System.out.println(" data: "+new String(data));
// -1: 无条件更新
//zooKeeper.setData("/user", "third".getBytes(), -1);
// 带版本条件更新
int version = stat.getVersion();
zooKeeper.setData("/order", "updateByVersion".getBytes(), version);
Thread.sleep(Integer.MAX_VALUE);
}
}
第一次打印的数据结果:
待程序执行完,在服务器客户端查询的结果:更具版本更新数据成功了。
对于zookeeper java原生客户端的使用,就简单介绍这么多,不在过多赘述,只是为了是让大家感受一下,原生客户端的使用。实际开发中并不推荐使用zk的原生客户端,过于笨重。
2. 开源的第三方客户端:Curator
2.1 简介
官网:https://curator.apache.org/
Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。
Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。
Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。
在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。