zkCli.sh脚本
这个命令行脚本在bin目录下:
# Resolve the path of this script: BASH_SOURCE when it is set, otherwise $0.
ZOOBIN="${BASH_SOURCE-$0}"
# Keep only the directory part of that path (may still be relative).
ZOOBIN="$(dirname "${ZOOBIN}")"
# Turn it into an absolute directory by cd-ing into it inside a subshell.
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
# Load the zkEnv.sh script: use ../libexec/zkEnv.sh when it exists,
# otherwise fall back to the zkEnv.sh that sits next to this script.
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
. "$ZOOBINDIR"/../libexec/zkEnv.sh
else
. "$ZOOBINDIR"/zkEnv.sh
fi
# Name of the CLI log file, built from the current user and host name.
ZOO_LOG_FILE=zookeeper-$USER-cli-$HOSTNAME.log
# Launch the JVM with org.apache.zookeeper.ZooKeeperMain as the main class and
# forward all script arguments to it.
# JAVA, ZOO_LOG_DIR, CLASSPATH, CLIENT_JVMFLAGS and JVMFLAGS are not set in this
# script; presumably they come from zkEnv.sh -- verify there.
# CLIENT_JVMFLAGS and JVMFLAGS are left unquoted, so each is word-split into
# separate JVM arguments.
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
-cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
org.apache.zookeeper.ZooKeeperMain "$@"
可以看到使用org.apache.zookeeper.ZooKeeperMain作为主类。
ZooKeeperMain入口类
The command line client to ZooKeeper.
在IDEA中运行
main方法入口
/**
 * Program entry point: builds a {@code ZooKeeperMain} from the raw command
 * line arguments and then executes the requested command(s).
 *
 * @param args raw command line arguments
 * @throws IOException propagated from construction or from {@code run()}
 * @throws InterruptedException propagated from construction or from {@code run()}
 */
public static void main(String[] args) throws IOException, InterruptedException {
    // Construct the client, then execute the command(s).
    new ZooKeeperMain(args).run();
}
public ZooKeeperMain(String[] args) throws IOException, InterruptedException {
// 用于解析命令行选项
// -server host1:port1,host2:port2 -timeout 30000 -r
// -r 表示readonly
cl.parseOptions(args);
System.out.println("Connecting to " + cl.getOption("server"));
// 连接zookeeper服务器
connectToZK(cl.getOption("server"));
}
连接服务器
/**
 * Connects to the given ZooKeeper host string using the parsed command line
 * options (readonly, secure, client-configuration, waitforconnection, timeout).
 *
 * @param newHost connection string of the server(s) to connect to
 * @throws InterruptedException if interrupted while waiting for the connection latch
 * @throws IOException if "waitforconnection" was requested and the connection
 *                     latch did not open within the timeout
 */
protected void connectToZK(String newHost) throws InterruptedException, IOException {
    // (the beginning of the original method is omitted in this excerpt)
    host = newHost;
    final boolean readOnlyMode = cl.getOption("readonly") != null;

    // The secure flag is passed on through a system property.
    if (cl.getOption("secure") != null) {
        System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
        System.out.println("Secure connection is enabled");
    }

    // Optional client configuration file; stays null when not given or not loadable.
    ZKClientConfig config = null;
    final String configPath = cl.getOption("client-configuration");
    if (configPath != null) {
        try {
            config = new ZKClientConfig(configPath);
        } catch (QuorumPeerConfig.ConfigException e) {
            e.printStackTrace();
            ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
        }
    }

    // A latch is only installed when the caller asked to wait for the connection.
    if (cl.getOption("waitforconnection") != null) {
        connectLatch = new CountDownLatch(1);
    }

    // Timeout in milliseconds (the original note says it defaults to 30000).
    final int timeoutMs = Integer.parseInt(cl.getOption("timeout"));

    // Create the ZooKeeperAdmin client.
    zk = new ZooKeeperAdmin(host, timeoutMs, new MyWatcher(), readOnlyMode, config);

    // Wait for the connection to complete; give up after the timeout.
    if (connectLatch != null && !connectLatch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
        zk.close();
        throw new IOException(KeeperException.create(KeeperException.Code.CONNECTIONLOSS));
    }
}
执行命令
/**
 * Executes the client: when no command was given on the command line, reads
 * commands interactively; otherwise runs the single command that was passed.
 *
 * NOTE(review): the jline handling is omitted from this excerpt, so
 * {@code jlinemissing} never changes in the code shown here; presumably the
 * omitted code sets it when jline is unavailable -- confirm against the full source.
 *
 * @throws IOException propagated from reading standard input or from command processing
 * @throws InterruptedException propagated from command processing
 */
void run() throws IOException, InterruptedException {
if (cl.getCommand() == null) {
System.out.println("Welcome to ZooKeeper!");
boolean jlinemissing = false;
// only use jline if it's in the classpath
// jline command line tooling -- omitted in this excerpt
if (jlinemissing) { // when jline is unavailable, read client commands from plain standard input
System.out.println("JLine support is disabled");
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
String line;
// Loop until standard input reaches end-of-stream (readLine returns null).
while ((line = br.readLine()) != null) {
executeLine(line); // per the original note: parses the command, then calls processCmd
}
}
} else {
// Command line args non-null. Run what was passed.
processCmd(cl);
}
}
/**
 * Runs one command and records its exit code in {@code exitCode}.
 * A {@code CliException} is not propagated: its exit code is stored and its
 * message is printed to standard error.
 *
 * @param co the parsed command and its arguments
 * @return the value returned by {@code processZKCmd}, or {@code false} when
 *         the command failed with a {@code CliException}
 * @throws IOException propagated from {@code processZKCmd}
 * @throws InterruptedException propagated from {@code processZKCmd}
 */
protected boolean processCmd(MyCommandOptions co) throws IOException, InterruptedException {
    try {
        final boolean watchRequested = processZKCmd(co);
        exitCode = ExitCode.EXECUTION_FINISHED.getValue();
        return watchRequested;
    } catch (CliException failure) {
        exitCode = failure.getExitCode();
        System.err.println(failure.getMessage());
        return false;
    }
}
/**
 * Looks up the handler registered for the command and executes it.
 * If no handler exists and the command is not in {@code commandMap} either,
 * the usage help is printed.
 *
 * @param co the parsed command and its arguments
 * @return the result of the handler's {@code exec()}, or {@code false} when
 *         no handler was found
 * @throws CliException if the handler fails to parse or execute the command
 * @throws IOException declared by the original signature
 * @throws InterruptedException declared by the original signature
 */
protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
    final String[] cmdArgs = co.getArgArray();
    final String cmdName = co.getCommand();
    // (omitted in this excerpt)
    // Handling of quit, redo, history, connect and similar commands is omitted here.

    // Find the handler class for this command.
    final CliCommand handler = commandMapCli.get(cmdName);
    if (handler == null) {
        if (!commandMap.containsKey(cmdName)) {
            usage(); // print the help text
        }
        return false;
    }

    handler.setZk(zk);
    return handler.parse(cmdArgs).exec();
}
CliCommand抽象类
Base class for all CLI commands.
/**
 * Base class for all CLI commands. Holds the ZooKeeper client, the output and
 * error streams, and the command/option strings used to build the usage text.
 */
public abstract class CliCommand {

    protected ZooKeeper zk;
    protected PrintStream out;
    protected PrintStream err;
    private String cmdStr;
    private String optionStr;

    /**
     * Creates a command that writes to {@code System.out} / {@code System.err}
     * until other streams are set.
     *
     * @param cmdStr the string used to call this command
     * @param optionStr the option string of this command
     */
    public CliCommand(String cmdStr, String optionStr) {
        this.cmdStr = cmdStr;
        this.optionStr = optionStr;
        this.out = System.out;
        this.err = System.err;
    }

    /** Sets the stream used for regular output. */
    public void setOut(PrintStream out) {
        this.out = out;
    }

    /** Sets the stream used for error output. */
    public void setErr(PrintStream err) {
        this.err = err;
    }

    /** Sets the ZooKeeper instance this command operates on. */
    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    /** Returns the string used to call this command. */
    public String getCmdStr() {
        return this.cmdStr;
    }

    /** Returns the option string. */
    public String getOptionStr() {
        return this.optionStr;
    }

    /** Returns the usage string: the command followed by a space and its options. */
    public String getUsageStr() {
        return String.join(" ", this.cmdStr, this.optionStr);
    }

    /**
     * Registers this command in the given map, keyed by its command string.
     *
     * @param cmdMap the map to add this command to
     */
    public void addToMap(Map<String, CliCommand> cmdMap) {
        cmdMap.put(this.cmdStr, this);
    }

    /**
     * Parses the command arguments.
     *
     * @param cmdArgs the arguments of the command
     * @return the command to execute
     * @throws CliParseException if the arguments cannot be parsed
     */
    public abstract CliCommand parse(String[] cmdArgs) throws CliParseException;

    /**
     * Executes the command.
     *
     * @return true if the command has the watch option, false otherwise
     * @throws CliException if execution fails
     */
    public abstract boolean exec() throws CliException;
}
CreateCommand示例
以CreateCommand为例说明一下CliCommand的使用方式:
/**
 * The {@code create} command: creates a znode, optionally ephemeral (-e),
 * sequential (-s), container (-c) or with a TTL (-t).
 */
public class CreateCommand extends CliCommand {

    private static Options options = new Options();
    private String[] args;
    private CommandLine cl;

    static {
        // Register the supported flags; only -t takes an argument.
        options.addOption(new Option("e", false, "ephemeral"))
                .addOption(new Option("s", false, "sequential"))
                .addOption(new Option("c", false, "container"))
                .addOption(new Option("t", true, "ttl"));
    }

    public CreateCommand() {
        super("create", "[-s] [-e] [-c] [-t ttl] path [data] [acl]");
    }

    /**
     * Parses the options and positional arguments of the command.
     *
     * @param cmdArgs the command name followed by its options and arguments
     * @return this command, ready for {@link #exec()}
     * @throws CliParseException if the options are invalid or fewer than two
     *                           positional arguments remain
     */
    @Override
    public CliCommand parse(String[] cmdArgs) throws CliParseException {
        try {
            cl = new DefaultParser().parse(options, cmdArgs);
        } catch (ParseException ex) {
            throw new CliParseException(ex);
        }

        args = cl.getArgs();
        if (args.length < 2) {
            throw new CliParseException(getUsageStr());
        }
        return this;
    }

    /**
     * Validates the option combination, then creates the znode through the
     * ZooKeeper client and reports the created path on the {@code err} stream.
     *
     * @return always {@code true} when the node was created
     * @throws CliException if the options are malformed or the create call fails
     */
    @Override
    public boolean exec() throws CliException {
        final boolean ephemeral = cl.hasOption("e");
        final boolean sequential = cl.hasOption("s");
        final boolean container = cl.hasOption("c");
        final boolean withTtl = cl.hasOption("t");

        if (container && (ephemeral || sequential)) {
            throw new MalformedCommandException(
                    "-c cannot be combined with -s or -e. Containers cannot be ephemeral or sequential.");
        }

        // The TTL value is only parsed when -t was given; otherwise it stays 0.
        long ttl = 0;
        if (withTtl) {
            try {
                ttl = Long.parseLong(cl.getOptionValue("t"));
            } catch (NumberFormatException e) {
                throw new MalformedCommandException("-t argument must be a long value");
            }
        }

        if (withTtl) {
            if (ephemeral) {
                throw new MalformedCommandException("TTLs cannot be used with Ephemeral znodes");
            }
            if (container) {
                throw new MalformedCommandException("TTLs cannot be used with Container znodes");
            }
        }

        final CreateMode flags = chooseCreateMode(ephemeral, sequential, container, withTtl);

        // Let EphemeralType check the TTL value before talking to the server.
        if (withTtl) {
            try {
                EphemeralType.TTL.toEphemeralOwner(ttl);
            } catch (IllegalArgumentException e) {
                throw new MalformedCommandException(e.getMessage());
            }
        }

        // Positional arguments: args[1] = path, args[2] = data (optional), args[3] = acl (optional).
        final String path = args[1];
        final byte[] data = args.length > 2 ? args[2].getBytes(UTF_8) : null;
        List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
        if (args.length > 3) {
            acl = AclParser.parse(args[3]);
        }

        try {
            // Create the node through the ZooKeeper client.
            final String createdPath;
            if (withTtl) {
                createdPath = zk.create(path, data, acl, flags, new Stat(), ttl);
            } else {
                createdPath = zk.create(path, data, acl, flags);
            }
            // Print the returned path (written to the err stream).
            err.println("Created " + createdPath);
        } catch (IllegalArgumentException ex) {
            throw new MalformedPathException(ex.getMessage());
        } catch (KeeperException.EphemeralOnLocalSessionException e) {
            err.println("Unable to create ephemeral node on a local session");
            throw new CliWrapperException(e);
        } catch (KeeperException.InvalidACLException ex) {
            err.println(ex.getMessage());
            throw new CliWrapperException(ex);
        } catch (KeeperException | InterruptedException ex) {
            throw new CliWrapperException(ex);
        }
        return true;
    }

    /** Maps the flag combination to the matching {@code CreateMode}. */
    private static CreateMode chooseCreateMode(
            boolean ephemeral, boolean sequential, boolean container, boolean withTtl) {
        if (ephemeral) {
            return sequential ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.EPHEMERAL;
        }
        if (sequential) {
            return withTtl ? CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL : CreateMode.PERSISTENT_SEQUENTIAL;
        }
        if (container) {
            return CreateMode.CONTAINER;
        }
        return withTtl ? CreateMode.PERSISTENT_WITH_TTL : CreateMode.PERSISTENT;
    }
}
ZooKeeperAdmin类
This is the main class for ZooKeeperAdmin client library. This library is used to perform cluster administration tasks, such as reconfigure cluster membership. The ZooKeeperAdmin class inherits ZooKeeper and has similar usage pattern as ZooKeeper class. Please check ZooKeeper class document for more details.
ZooKeeperAdmin继承了ZooKeeper类,并扩展了reconfig(集群成员重配置)相关命令。