1.Zookeeper系统模型
1)Zookeeper数据模型之ZNode
在Zookeeper中,数据信息被保存在一个个数据节点上,这些节点被称为ZNode。ZNode是Zookeeper中最小数据单位,在ZNode下面又可以再挂ZNode,这样一层层下去就形成了一个层次化命名空间ZNode树,我们称为ZNode Tree,它采用了类似文件系统的层级树状结构进行管理。
在Zookeeper中,每一个数据节点都是一个ZNode,上图根目录下有两个节点,分别是:app1和app2,其中app1下面又有三个子节点,所有ZNode按层次化进行组织,形成这么一棵树,ZNode的节点路径标识方式和Unix文件系统路径非常相似,都是由一系列使用斜杠(/)进行分割的路径表示,开发人员可以向这个节点写入数据,也可以在这个节点下面创建子节点
2)ZNode的类型
Zookeeper节点类型可以分为三大类:
- 持久性节点(Persistent)
- 临时性节点(Ephemeral)
- 顺序性节点(Sequential)
在开发中在创建节点的时候通过组合可以生成以下四种节点类型:持久节点、持久顺序节点、临时节点、临时顺序节点。不同类型的节点则会有不同的生命周期。
(1)持久节点
是Zookeeper中最常见的一种节点类型,所谓持久节点,就是指节点被创建后会一直存在服务器,直到删除操作主动清除
(2)持久顺序节点
就是顺序的持久节点,节点特性和持久节点是一样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后面加上一个数字后缀,来表示其顺序
(3)临时节点
就是会被自动清理掉的节点,它的生命周期和客户端会话绑在一起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建子节点
(4)临时顺序节点
就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后面加上数字后缀
3)事务ID
事务是对物理和抽象的应用状态上的操作集合。往往在现在的概念中,狭义上的事务通常指的是数据库事务,一般包含了一系列对数据库有序的读写操作,这些数据库事务具有所谓的ACID特性,即原子性(Atomic)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)
而在Zookeeper中,事务是指能够改变Zookeeper服务器状态的操作,我们也称之为事务操作或更新操作,一般包括数据节点创建与删除、数据节点内容更新等操作。对于每一个事务请求,Zookeeper都会为其分配一个全局唯一的事务ID,用ZXID来表示,通常是一个64位的数字。每一个ZXID对应一次更新操作,从这些ZXID中可以间接地识别出Zookeeper处理这些更新操作请求的全局顺序。
4)ZNode的状态信息
整个ZNode节点内容包括两部分:节点数据内容和节点状态信息。图中的quota是数据内容,其他的属于状态信息。那么这些状态信息都有什么含义呢?
- cZxid:Create ZXID,表示节点被创建时的事务ID
- ctime:Create Time,表示节点创建时间
- mZxid:Modified ZXID,表示节点最后一次被修改时的事务ID
- mtime:Modified Time,表示节点最后一次被修改的时间
- pZxid:表示该节点的子节点列表最后一次被修改时的事务ID,只有子节点列表变更才会更新pZxid,子节点内容变更不会更新
- cversion:表示子节点的版本号
- dataVersion:表示内容版本号
- aclVersion:标识acl版本
- ephemeralOwner:表示创建该临时节点时的会话sessionID,如果是持久性节点那么值为0
- dataLength:表示数据长度
- numChildren:表示直系节点数
5)Watcher–数据变更通知
Zookeeper使用Watcher机制实现分布式数据的发布/订阅功能
一个典型的发布/订阅模式系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。
在Zookeeper中,引入了Watcher机制来实现这种分布式的通知方式。Zookeeper允许客户端向服务端注册一个Watcher监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。
整个Watcher注册与通知过程如图所示
Zookeeper的Watcher机制主要包括客户端线程、客户端WatcherManager、Zookeeper服务器三部分。
具体工作流程为:客户端在向Zookeeper服务器注册的同时,会将Watcher对象存储在客户端的WatcherManager当中。当Zookeeper服务器触发Watcher事件后,会向客户端发送通知,客户端线程从WatcherManager中取出对应的Watcher对象来执行回调逻辑
6)ACL–保障数据的安全
Zookeeper作为一个分布式协调框架,其内部存储了分布式系统运行时状态的元数据,这些元数据会直接影响基于Zookeeper进行构造的分布式系统的运行状态,因此,如何保障系统中数据的安全,从而避免因误操作所带来的数据随意变更而导致的数据库异常十分重要,在Zookeeper中,提供了一套完善的ACL(Access Control List)权限控制机制来保障数据的安全
我们可以从三个方面来理解ACL机制:权限模式(Scheme)、授权对象(ID)、权限)Permission),通常使用“scheme : id : permission"来标识一个有效的ACL信息
(1)权限模式:Scheme
权限模式用来确定权限验证过程中使用的检验策略,有如下四种模式:
- IP
- IP模式就是通过IP地址粒度来进行权限控制
- 如“ip:192.168.0.110"表示权限控制针对该IP地址
- 同时IP模式可以支持按照网段方式进行配置,如"ip:192.168.0.1/24"表示针对192.168.0.*这个网段进行权限控制
- Digest
- Digest是最常用的权限控制模式,要更符合我们对权限控制的认识,其使用"username:password"形式的权限标识来进行权限配置,便于区分不同应用来进行权限控制。
- 当我们通过"username:password"形式配置了权限标识后,Zookeeper会先后对其进行SHA-1加密和BASE64编码
- World
- World是一种最开放的权限控制模式,这种权限控制方式几乎没有任何作用,数据节点的访问权限对所有用户开放,即所有用户都可以在不进行任何权限校验的情况下操作Zookeeper上的数据。
- 另外,World模式也可以看作是一种特殊的Digest模式,它只有一个权限标识,即"world:anyone"
- Super
- Super模式,顾名思义就是超级用户的意思,也是一种特殊的Digest模式。在Super模式下,超级用户可以对任意Zookeeper上的数据节点进行任何操作
(2)授权对象:ID
授权对象指的是权限赋予的用户或一个指定实体,例如IP地址或是机器等。在不同的权限模式下,授权对象是不同的,表中列出了各个权限模式和授权对象之间的对应关系。
权限模式 | 授权对象 |
---|---|
IP | 通常是一个Ip地址或IP段,例如:192.168.10.110或192.168.10.1/24 |
Digest | 自定义,通常是username:BASE64(SHA-1((username:password)) |
World | 只有一个ID:anyone |
Super | 超级用户 |
(3)权限:Permission
权限就是指那些通过权限检查后可以被允许执行的操作。在Zookeeper中,所有对数据的操作权限分为以下五大类:
- CREATE(C):数据节点的创建权限,允许授权对象在该数据节点下创建子节点
- DELETE(D):子节点的删除权限,允许授权对象删除该数据节点的子节点
- READ(R):数据节点的读取权限,允许授权对象访问该数据节点并读取其数据内容或子节点列表等
- WAITE(W):数据节点的更新权限,允许授权对象对该数据节点进行更新操作
- ADMIN(A):数据节点的管理权限,允许授权对象对该数据节点进行ACL相关的设置操作
2.Zookeeper命令行操作
首先,进入到zookeeper的bin目录之后
通过zkClient进入Zookeeper客户端命令行
./zkcli.sh 连接本地的zookeeper服务器
./zkCli.sh -server ip:port 连接指定的服务器
连接成功之后,系统会输出Zookeeper的相关环境及配置信息等信息。输入help之后,屏幕会输出可用的zookeeper命令,如下图所示
1)创建节点
使用create命令,可以创建一个Zookeeper节点,如
create [-s][-e] path data acl
其中,-s或-e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点;acl用来进行权限控制
(1)创建顺序节点
create -s /zk-test 123
(2)创建临时节点
create -e /zk-temp 123
临时节点在客户端会话结束后,就会自动删除,下面使用quit命令退出客户端
quit
再次使用客户端连接服务器,并使用ls/命令查看根目录下的节点
可以看到根目录下已经不存在zk-temp临时节点了
(3)创建永久节点
create /zk-permanent 123
可以看到永久节点不同于顺序节点,不会自动在后面添加一串数字
2)读取节点
与读取相关的命令有 ls 命令和 get 命令
ls 命令可以列出Zookeeper指定节点下的所有子节点,但只能查看指定节点下的第一级的所有子节点
ls path
#其中,path表示的是指定数据字节的节点路径
get 命令可以获取Zookeeper指定节点的数据内容和属性信息
get path
若获取根节点下面的所有子节点,使用 ls / 命令即可
若想获取 /zk-permanent 的数据内容和属性,可使用如下命令:get /zk-permanent
3)更新节点
使用set命令,可以更新指定节点的数据内容,用法如下:
set path data [version]
其中,data就是要更新的新内容,version表示数据版本,在zookeeper中,节点的数据是有版本概念的,这个参数用于指定本次更新操作是基于ZNode的哪一个数据版本进行的,如将/zk-permanent节点的数据更新为456,可以使用如下命令:set /zk-permanent 456
现在dataVersion已经变为1了,表示进行了更新
4)删除节点
使用delete命令可以删除Zookeeper上的指定节点,用法如下
delete path [version]
其中version表示数据版本,使用delete /zk-permanent 命令即可删除 /zk-permanent节点
【注】若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点
3.Zookeeper的api使用
Zookeeper作为一个分布式框架,主要用来解决分布式一致性问题,它提供了简单的分布式源语,并且对多种编程语言提供了API,所以接下来重点来看下Zookeeper的Java客户端API使用方式
Zookeeper API共包含五个包,分别为:
- org.apache.zookeeper
- org.apache.zookeeper.data
- org.apache.zookeeper.server
- org.apache.zookeeper.server.quorum
- org.apache.zookeeper.server.upgrade
包含Zookeeper类,他是我们编程时最常用的类文件。这个类是Zookeeper客户端的主要类文件。如果要使用Zookeeper服务,应用程序首先必须创建一个Zookeeper实例,这是就需要使用此类。一旦客户端和Zookeeper服务端建立起了连接,Zookeeper系统将会给本次连接会话分配一个ID值,并且客户端将会周期性的向服务器端发送心跳维持会话连接。主要连接有效,客户端就可以使用Zookeeper API来做相应处理了。
准备工作:导入依赖
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.9.2</version>
</dependency>
</dependencies>
1)建立会话
public class CreateSession implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
/*
* 建立会话
*/
public static void main(String[] args) throws IOException, InterruptedException {
/*
客户端可以通过创建⼀个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端⼝
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
ZooKeeper zooKeeper = new ZooKeeper("192.168.101.3:2181", 5000, new CreateSession());
// System.out.println(zooKeeper.getState());
//计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
countDownLatch.await();
System.out.println("客户端与服务端会话真正建立了"+zooKeeper.getState());
}
/*
* 回调方法:处理来自服务器端的Watcher通知
**/
@Override
public void process(WatchedEvent watchedEvent) {
// SyncConnectedif
if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch的等待阻塞
countDownLatch.countDown();
}
}
}
【注】
Zookeeper客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理完客户端初始化工作后立即返回,在大多数情况下,此时并没有真正建立一个可用的会话,在会话的生命周期中处于“CONNECTING”状态。当该会话真正创建完毕后Zookeeper服务端向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话
2)创建节点
public class CreateNode implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("192.168.101.3:2181", 5000, new CreateNode());
// countDownLatch.await();
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void process(WatchedEvent watchedEvent) {
//当连接创建了,服务端发送给客户端SyncConnected事件
if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch的等待阻塞
// countDownLatch.countDown();
//调用创建节点方法
try {
createNodeSync();
}catch (Exception e){
e.printStackTrace();
}
}
}
private void createNodeSync() throws Exception {
/*
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
* ANYONE_ID_UNSAFE : 表示任何⼈
* AUTH_IDS :此ID仅可⽤于设置ACL。它将被客户机验证的ID替换。
* OPEN_ACL_UNSAFE :这是⼀个完全开放的ACL(常⽤)--> world:anyone
* CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点
String node = zookeeper.create(path,data,acl,createMode);
**/
String node_PERSISTENT = zooKeeper.create("/xg_persistent","持久节点内容".getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String node_PERSISTENT_SEQUENTIAL = zooKeeper.create("/xg_persistent_sequential","持久顺序节点内容".getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);
String node_EPERSISTENT = zooKeeper.create("/xg_ephemeral","临时节点内容".getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
System.out.println("创建的持久节点是:"+node_PERSISTENT);
System.out.println("创建的持久顺序节点是:"+node_PERSISTENT_SEQUENTIAL);
System.out.println("创建的临时节点是:"+node_EPERSISTENT);
}
}
3)获取节点数据
public class GetNodeData implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
/*
建立会话
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
zooKeeper = new ZooKeeper("192.168.101.3:2181", 5000, new GetNodeData());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
//countDownLatch.await();\
Thread.sleep(Integer.MAX_VALUE);
}
/*
回调方法:处理来自服务器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
/*
子节点列表发生改变时,服务器端会发生noteChildrenChanged事件通知
要重新获取子节点列表,同时注意:通知是一次性的,需要反复注册监听
*/
if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
List<String> children = null;
try {
children = zooKeeper.getChildren("/xg_persistent", true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(children);
}
// SyncConnected
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch上的等待阻塞
System.out.println("process方法执行了...");
// 获取节点数据的方法
try {
getNoteData();
// 获取节点的子节点列表方法
getChildren();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*
获取某个节点的内容
*/
private void getNoteData() throws KeeperException, InterruptedException {
/**
* path : 获取数据的路径
* watch : 是否开启监听
* stat : 节点状态信息
* null: 表示获取最新版本的数据
* zk.getData(path, watch, stat);
*/
byte[] data = zooKeeper.getData("/xg_persistent", false, null);
System.out.println(new String(data));
}
/*
获取某个节点的子节点列表方法
*/
public static void getChildren() throws KeeperException, InterruptedException {
/*
path:路径
watch:是否要启动监听,当子节点列表发生变化,会触发监听
zooKeeper.getChildren(path, watch);
*/
List<String> children = zooKeeper.getChildren("/xg_persistent", true);
System.out.println(children);
}
}
4)修改节点数据
public class UpdateNodeData implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
/*
建立会话
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
zooKeeper = new ZooKeeper("192.168.101.3:2181", 5000, new UpdateNodeData());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
//countDownLatch.await();\
Thread.sleep(Integer.MAX_VALUE);
}
/*
回调方法:处理来自服务器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch上的等待阻塞
System.out.println("process方法执行了...");
// 更新数据节点内容的方法
try {
updateNoteSync();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*
更新数据节点内容的方法
*/
private void updateNoteSync() throws KeeperException, InterruptedException {
/*
path:路径
data:要修改的内容 byte[]
version:为-1,表示对最新版本的数据进行修改
zooKeeper.setData(path, data,version);
*/
byte[] data = zooKeeper.getData("/xg_persistent", false, null);
System.out.println("修改前的值:" + new String(data));
//修改/lg-persistent 的数据 stat: 状态信息对象
Stat stat = zooKeeper.setData("/xg_persistent", "客户端修改了节点数据".getBytes(), -1);
byte[] data2 = zooKeeper.getData("/xg_persistent", false, null);
System.out.println("修改后的值:" + new String(data2));
}
}
5)删除节点
public class DeleteNode implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
/*
建立会话
*/
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
/*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
*/
zooKeeper = new ZooKeeper("192.168.101.3:2181", 5000, new DeleteNode());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
//countDownLatch.await();\
Thread.sleep(Integer.MAX_VALUE);
}
/*
回调方法:处理来自服务器端的watcher通知
*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch上的等待阻塞
System.out.println("process方法执行了...");
// 删除节点
try {
deleteNoteSync();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*
删除节点的方法
*/
private void deleteNoteSync() throws KeeperException, InterruptedException {
/*
zooKeeper.exists(path,watch) :判断节点是否存在
zookeeper.delete(path,version) : 删除节点
*/
Stat stat = zooKeeper.exists("/xg_persistent/c1", false);
System.out.println(stat == null ? "该节点不存在":"该节点存在");
if(stat != null){
zooKeeper.delete("/xg_persistent/c1",-1);
}
Stat stat2 = zooKeeper.exists("/xg_persistent/c1", false);
System.out.println(stat2 == null ? "该节点不存在":"该节点存在");
}
}
4.Zookeeper开源客户端
1)ZkClient
ZkClient是Github上一个开源的zookeeper客户端,在zookeeper原生API接口之上进行了包装,是一个更易用的Zookeeper客户端,同时,ZkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能
添加依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
(1)创建会话
public class CreateSession {
public static void main(String[] args) {
/*
* 创建一个zkclient实例就可以完成连接,完成会话的创建
* serverString : 服务器连接地址
* 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
*/
ZkClient zkClient = new ZkClient("192.168.101.3:2181");
System.out.println("会话被创建了...");
}
}
(2)创建节点
public class CreateNode {
/*
* 借助zkclient完成会话的创建
*/
public static void main(String[] args) {
/*
* 创建一个zkclient实例就可以完成连接,完成会话的创建
* serverString : 服务器连接地址
* 注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
*/
ZkClient zkClient = new ZkClient("192.168.101.3:2181");
System.out.println("会话被创建了..");
/*
* createParents : 是否要创建父节点,如果值为true,那么就会递归创建节点
*/
zkClient.createPersistent("/xg_zkclient/c1",true);
System.out.println("节点递归创建完成");
}
}
【注】
在原生态接口是无法递归创建节点的(父节点不存在),但是通过ZkClient通过设置createParents参数为true可以递归的先创建父节点,再创建子节点。
(3)删除节点
ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点
public class DeleteNode {
/*
* 借助zkclient完成会话的创建
*/
public static void main(String[] args) {
/*
创建一个zkclient实例就可以完成连接,完成会话的创建
serverString : 服务器连接地址
注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
*/
ZkClient zkClient = new ZkClient("192.168.101.3:2181");
System.out.println("会话被创建了..");
// 递归删除节点
String path = "/xg_zkclient/c1";
zkClient.createPersistent(path+"/c11");
zkClient.deleteRecursive(path);
System.out.println("递归删除成功");
}
}
(4)获取子节点
public class GetNode {
/*
* 借助zkclient完成会话的创建
*/
public static void main(String[] args) throws InterruptedException {
/*
创建一个zkclient实例就可以完成连接,完成会话的创建
serverString : 服务器连接地址
注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
*/
ZkClient zkClient = new ZkClient("192.168.101.3:2181");
System.out.println("会话被创建了..");
// 获取子节点列表
List<String> children = zkClient.getChildren("/xg_zkclient");
System.out.println(children);
// 注册监听事件
/*
客户端可以对一个不存在的节点进行子节点变更的监听
只要该节点的子节点列表发生变化,或者该节点本身被创建或者删除,都会触发监听
*/
zkClient.subscribeChildChanges("/xg_zkclient_get", new IZkChildListener() {
/*
s : parentPath
list : 变化后子节点列表
*/
public void handleChildChange(String parentPath, List<String> list) throws Exception {
System.out.println(parentPath + "的子节点列表发生了变化,变化后的子节点列表为"+ list);
}
});
//测试
zkClient.createPersistent("/xg_zkclient_get");
Thread.sleep(1000);
zkClient.createPersistent("/xg_zkclient_get/c1");
Thread.sleep(1000);
}
}
客户端可以对一个不存在的节点进行子节点变更的监听
一旦客户端对一个节点注册了子节点列表变更监听之后,那么当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端。
该节点本身的创建或删除也会通知到客户端
(5)获取数据(节点是否存在、更新、删除)
public class GetNodeSample {
public static void main(String[] args) throws InterruptedException {
/*
创建一个zkclient实例就可以完成连接,完成会话的创建
serverString : 服务器连接地址
注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..
*/
ZkClient zkClient = new ZkClient("192.168.101.3:2181");
System.out.println("会话被创建了..");
// 判断节点是否存在
String path = "/xg_zkClient_ep";
boolean exists = zkClient.exists(path);
if(!exists){
// 创建临时节点
zkClient.createEphemeral(path,"123");
}
// 读取节点内容
Object o = zkClient.readData(path);
System.out.println(o);
// 注册监听
zkClient.subscribeDataChanges(path, new IZkDataListener() {
/*
当节点数据内容发生变化时,执行的回调方法
s: path
o: 变化后的节点内容
*/
public void handleDataChange(String s, Object o) throws Exception {
System.out.println(s+"该节点内容被更新,更新的内容"+o);
}
/*
当节点被删除时,会执行的回调方法
s : path
*/
public void handleDataDeleted(String s) throws Exception {
System.out.println(s+"该节点被删除");
}
});
// 更新节点内容
zkClient.writeData(path,"456");
Thread.sleep(1000);
// 删除节点
zkClient.delete(path);
Thread.sleep(1000);
}
}
运行结果:
123
/xg_zkClient_ep该节点内容被更新,更新后的内容4567
/xg_zkClient_ep该节点被删除
表明可以成功监听节点数据变化或删除事件
2)Curator客户端
curator是Netflix公司开源的一套Zookeeper客户端框架,和ZkClient一样,Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连,反复注册Watcher和NodeExistsException异常等,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了给予Fluent的变成风格支持
依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
(1)创建会话
Curator的创建会话方式与原生的API和ZkClient的创建方式区别很大。Curator创建客户端是通过CuratorFrameworkFactor工厂类来实现的。具体如下:
1.使用CuratorFramework这个工厂类的两个静态方法来创建一个客户端
public static CuratorFramework newClient(String connectString,
RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString,
int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
其中RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略,默认提供了以下实现,分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、RetryForever(永远重试策略)
2.通过调用CuratorFramework中的start()方法来启动会话
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);
client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
5000,1000,retryPolicy);
client.start();
其实进一步查看源代码可以得知,其实这两种方法内部实现一样,只是对外包装不同的方法。它们的底层都是通过第三个方法builder来实现的
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
.connectString("server1:2181,server2:2181,server3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.build();
client.start();
参数:
- connectString:zk的server地址,多个server之间使用英文逗号分隔开
- connectionTimeoutMs:连接超时时间,如上是30s,默认是15s
- sessionTimeoutMs:会话超时时间,如上是50s,默认是60s
- retryPolicy:失败重试策略
- ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
- baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,
- 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
- maxRetries:最大重试次数
- maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间,默认的最大事件是Integer.MAX_VALUE毫秒
- baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,
- ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
- start():完成会话的创建
public class CreateSession {
// 创建会话
public static void main(String[] args) {
//不使用fluent编程风格
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.101.3:2181", exponentialBackoffRetry);
curatorFramework.start();
System.out.println( "会话被建立了");
// 使用fluent编程风格
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.101.3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
.namespace("base") // 独立的命名空间 /base
.build();
client.start();
System.out.println("会话2创建了");
}
}
【注】
session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对 /base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离。
(2)创建节点
curator提供了一系列Fluent风格的接口,通过使用Fluent编程风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。
下面简单介绍一下常用的几个节点创建场景
1.创建一个初始内容为空的节点
client.create().forPath(path);
Curator默认创建的时持久节点,内容为空
2.创建一个包含内容的节点
client.create().forPath(path,"我是内容".getBytes());
Curator和ZkClient不同的是依旧采用Zookeeper原生API的风格,内容使用byte[]作为方法参数
3.递归创建父节点,并选择节点类型
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path);
creatingParentsIfNeeded这个接口非常有用,在使用Zookeeper的过程中,开发人员经常会碰到NoNodeException异常,其中一个可能的远呀就是试图对一个不存在的父节点创建子节点。因此,开发人员不得不在每次创建节点之前,都判断一下该父节点是否存在。在使用Curator之后,通过调用creatingParentsIfNeeded接口,Curator就能够自动地递归创建所有需要的父节点。
public class CreateNode {
// 创建会话
public static void main(String[] args) throws Exception {
//不使用fluent编程风格
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
// 使用fluent编程风格
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.101.3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
.namespace("base") // 独立的命名空间 /base
.build();
client.start();
System.out.println("会话2创建了");
// 创建节点
String path = "/xg_curator/c1";
String s = client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
System.out.println("节点递归创建成功,该节点路径" + s);
}
}
(3)删除节点
删除节点的方法也是基于Fluent方式来进行操作,不同类型的操作调用 新增不同的方法调用即可。
1.删除一个子节点
client.delete().forPath(path);
2.删除节点并递归删除其子节点
client.delete().deletingChildrenIfNeeded().forPath(path);
3.指定版本进行删除
client.delete().withVersion(1).forPath(path);
如果版本已经不存在,则删除异常,异常信息如下
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for
4.强制保证删除一个节点
client.delete().guaranteed().forPath(path);
只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到一些网络异常的情况,此guaranteed的强制删除就会很有效果。
public class DeleteNode {
// 创建会话
public static void main(String[] args) throws Exception {
//不使用fluent编程风格
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.101.3:2181", exponentialBackoffRetry);
curatorFramework.start();
System.out.println( "会话被建立了");
// 使用fluent编程风格
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.101.3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
.namespace("base") // 独立的命名空间 /base
.build();
client.start();
System.out.println("会话2创建了");
// 删除节点
String path = "/xg_curator";
client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
System.out.println("删除成功,删除的节点" + path);
}
}
(4)获取数据
获取节点数据内容API相当简单,同时Curator提供了传入一个Stat变量的方式来存储服务器返回的最新的节点状态信息
//普通查询
client.getData().forPath(path);
//包含状态查询
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
public class GetNode {
// 创建会话
public static void main(String[] args) throws Exception {
//不使用fluent编程风格
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
// 使用fluent编程风格
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.101.3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
.namespace("base") // 独立的命名空间 /base
.build();
client.start();
System.out.println("会话2创建了");
// 创建节点
String path = "/xg_curator/c1";
String s = client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
System.out.println("节点递归创建成功,该节点路径" + s);
// 获取节点的数据内容及状态信息
// 数据内容
byte[] bytes = client.getData().forPath(path);
System.out.println("获取到的节点数据内容:" + new String(bytes));
// 状态信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
System.out.println("获取到的节点状态信息:" + stat );
}
}
(5)更新数据
更新数据,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如果version已经变更,则抛出异常。
//普遍更新
client.setData().forPath(path,"新内容".getBytes());
//制定版本更新
client.setData().withVersion(1).forPath(path);
版本不一致异常信息:
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for
public class UpdateNode {
public static void main(String[] args) throws Exception {
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
// 使用fluent编程风格
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.101.3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(exponentialBackoffRetry)
.namespace("base") // 独立的命名空间 /base
.build();
client.start();
System.out.println("会话2创建了");
// 创建节点
String path = "/xg_curator/c1";
// 获取节点的数据内容及状态信息
// 数据内容
byte[] bytes = client.getData().forPath(path);
System.out.println("获取到的节点数据内容:" + new String(bytes));
// 状态信息
Stat stat = new Stat(); //0
client.getData().storingStatIn(stat).forPath(path);
System.out.println("获取到的节点状态信息:" + stat );
// 更新节点内容 //1
int version = client.setData().withVersion(stat.getVersion()).forPath(path, "修改内容1".getBytes()).getVersion();
System.out.println("当前的最新版本是" + version);
byte[] bytes2 = client.getData().forPath(path);
System.out.println("修改后的节点数据内容:" + new String(bytes2));
// BadVersionException
client.setData().withVersion(stat.getVersion()).forPath(path,"修改内容2".getBytes());
}
}