文章目录
- 一、概述
- 二、导入依赖包
- 三、与 Zookeeper 建立连接
- 四、判断 ZooKeeper 节点是否存在
- 四、创建 ZooKeeper 节点数据
- 五、获取 ZooKeeper 节点数据
- 六、修改 ZooKeeper 节点数据
- 七、异步获取 ZooKeeper 节点数据
- 八、完整示例
如果您还没有安装Zookeeper请看ZooKeeper 安装说明,Zookeeper 命令使用方法和数据说明。
一、概述
-
ZooKeeper是一个开源的、分布式的协调服务,它主要用于分布式系统中的数据管理和协调任务。它提供了一个具有高可用性的分布式环境,用于存储和管理小规模数据,例如配置信息、命名服务、分布式锁等。
-
本文主要介绍如何使用 Java 与 ZooKeeper 建立连接,进行数据创建、修改、读取、删除等操作。
-
源码地址:https://github.com/apache/zookeeper
二、导入依赖包
-
在 pom.xml 文件中导入 Zookeeper 包,注意一般这个包的版本要和您安装的 Zookeeper 服务端版本一致。
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.8.2</version> </dependency>
三、与 Zookeeper 建立连接
- 与ZooKeeper集群建立连接使用 ZooKeeper 类,传递三个参数,分别是
- connectionString ,是ZooKeeper 集群地址(没连接池的概念,是Session的概念)
- sessionTimeout , 是ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
- watcher, ZooKeeper Session 级别监听器( Watcher),(Watch只发生在读方法上,如 get、exists等)
private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
// ZooKeeper 集群地址(没连接池的概念,是Session的概念)
String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
// ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
Integer sessionTimeout = 3000;
// ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
Event.KeeperState state = watchedEvent.getState();
Event.EventType type = watchedEvent.getType();
String path = watchedEvent.getPath();
switch (state) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
countDownLatch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
switch (type) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
System.out.println("Session watch state=" + state);
System.out.println("Session watch type=" + type);
System.out.println("Session watch path=" + path);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 由于建立连接是异步的,这里先阻塞等待连接结果
countDownLatch.await();
ZooKeeper.States state = zooKeeper.getState();
switch (state) {
case CONNECTING:
break;
case ASSOCIATING:
break;
case CONNECTED:
break;
case CONNECTEDREADONLY:
break;
case CLOSED:
break;
case AUTH_FAILED:
break;
case NOT_CONNECTED:
break;
}
System.out.println("ZooKeeper state=" + state);
return zooKeeper;
}
四、判断 ZooKeeper 节点是否存在
- 创建节点数据使用 exists 方法,传递四个参数
- path , 表示节点目录名称
- watch, 表示监听器(只对该路径有效)
- stat, 判断结果回调函数
- context, 自定义上下文对象
private static void testExists(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
// 判断 ZooKeeper 节点是否存在
Object context = new Object();
zooKeeper.exists("/yiqifu", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
}, new AsyncCallback.StatCallback() {
@Override
public void processResult(int i, String s, Object o, Stat stat) {
if(null != stat){
System.out.println("ZooKeeper /yiqifu 节点存在");
}
else {
System.out.println("ZooKeeper /yiqifu 节点不存在");
}
}
}, context);
}
四、创建 ZooKeeper 节点数据
- 创建节点数据使用 create 方法,传递四个参数
- path , 表示节点目录名称
- data , 表示节点数据
private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
// 在 ZooKeeper 中创建节点
String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("ZooKeeper 创建节点成功:" + nodeName);
}
五、获取 ZooKeeper 节点数据
- 获取 ZooKeeper 节点数据使用 getData 方法,传递三个参数
- path , 表示节点目录名称
- watch, 表示路径级别的监听器,这个监听器只对该路径下的数据操作监听生效。
private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
// 获取 ZooKeeper 节点数据,这里设置了Path级Watch
final Stat stat = new Stat();
byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
Event.KeeperState state = watchedEvent.getState();
Event.EventType type = watchedEvent.getType();
String path = watchedEvent.getPath();
System.out.println("Path watch state=" + state);
System.out.println("Path watch type=" + type);
System.out.println("Path watch path=" + path);
//zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级Watch
zooKeeper.getData("/yiqifu", this, stat);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, stat);
System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));
}
六、修改 ZooKeeper 节点数据
- 修改 ZooKeeper 节点数据使用 setData 方法,传递三个参数
- path , 表示节点目录名称。
- data, 表示新数据。
- version, 表示数据版本。
private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
// 更新 ZooKeeper 节点数据(修改数据会触发Watch)
zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);
zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);
}
七、异步获取 ZooKeeper 节点数据
-
修改 ZooKeeper 节点数据使用 getData 方法,传递三个参数
-
path , 表示节点目录名称。
-
watch, 表示是否触发监听器。
-
dataCallback, 表示异步获取数据的回调函数。
-
private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
// 获取 ZooKeeper 节点数据(使用异步回调方式)
Object context = new Object();
zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);
System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));
}
}, context);
}
八、完整示例
package top.yiqifu.study.p131;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class Test01_Zookeeper {
public static void main(String[] args) {
try {
// 创建 ZooKeeper 对象
ZooKeeper zooKeeper = testCreateZookeeper();
// 在 ZooKeeper 创建数据节点
testCreateNode(zooKeeper);
// 在 ZooKeeper 中同步获取节点数据
testGetdata(zooKeeper);
// 在 ZooKeeper 中更新节点数据
testSetdata(zooKeeper);
// 在 ZooKeeper 异步获取节点数据
testAsyncGetdata(zooKeeper);
Thread.sleep(3000);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
private static ZooKeeper testCreateZookeeper() throws IOException, InterruptedException, KeeperException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
// ZooKeeper 集群地址(没连接池的概念,是Session的概念)
String connectionString = "192.168.8.51:2181,192.168.8.52:2181,192.168.8.53:2181";
// ZooKeeper Session 数据超时时间(也就是这个Session关闭后,与这个Session相关的数据在ZooKeeper中保存多信)。
Integer sessionTimeout = 3000;
// ZooKeeper Session 级别 Watcher(Watch只发生在读方法上,如 get、exists)
final ZooKeeper zooKeeper = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
Event.KeeperState state = watchedEvent.getState();
Event.EventType type = watchedEvent.getType();
String path = watchedEvent.getPath();
switch (state) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
countDownLatch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
}
switch (type) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
}
System.out.println("Session watch state=" + state);
System.out.println("Session watch type=" + type);
System.out.println("Session watch path=" + path);
} catch (Exception e) {
e.printStackTrace();
}
}
});
countDownLatch.await();
ZooKeeper.States state = zooKeeper.getState();
switch (state) {
case CONNECTING:
break;
case ASSOCIATING:
break;
case CONNECTED:
break;
case CONNECTEDREADONLY:
break;
case CLOSED:
break;
case AUTH_FAILED:
break;
case NOT_CONNECTED:
break;
}
System.out.println("ZooKeeper state=" + state);
return zooKeeper;
}
private static void testCreateNode(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
// 在 ZooKeeper 中创建节点
String nodeName = zooKeeper.create("/yiqifu", "test create data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("ZooKeeper 创建节点成功:" + nodeName);
}
private static void testGetdata(final ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
// 获取 ZooKeeper 节点数据,这里设置了Path级Watch
final Stat stat = new Stat();
byte[] nodeData = zooKeeper.getData("/yiqifu", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
Event.KeeperState state = watchedEvent.getState();
Event.EventType type = watchedEvent.getType();
String path = watchedEvent.getPath();
System.out.println("Path watch state=" + state);
System.out.println("Path watch type=" + type);
System.out.println("Path watch path=" + path);
//zooKeeper.getData("/yiqifu", true, stat); // 这里会使用Session级Watch
zooKeeper.getData("/yiqifu", this, stat);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, stat);
System.out.println("ZooKeeper 同步获取节点数据:" + new String(nodeData));
}
private static void testSetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
// 更新 ZooKeeper 节点数据(修改数据会触发Watch)
zooKeeper.setData("/yiqifu", "test modify data 1 ".getBytes(), 0);
zooKeeper.setData("/yiqifu", "test modify data 2 ".getBytes(), 1);
}
private static void testAsyncGetdata(ZooKeeper zooKeeper) throws IOException, InterruptedException, KeeperException {
// 获取 ZooKeeper 节点数据(使用异步回调方式)
Object context = new Object();
zooKeeper.getData("/yiqifu", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
System.out.println("ZooKeeper 同步获取节点数据的目录:"+s);
System.out.println("ZooKeeper 异步获取节点数据:"+new String(bytes));
}
}, context);
}
}