Zookeeper(九)客户端的启动流程

目录

  • 一 ZooKeeper会话的创建与连接
    • 1.1 会话的创建
      • 1.1.1 ClientWatchManager
      • 1.1.2 ConnectStringParser
      • 1.1.3 HostProvider
      • 1.1.4 ClientCnxn
    • 1.2 会话的连接
      • 1.2.1 SendThread
      • 1.2.2 eventThread
  • 二 ZooKeeper会话的响应
    • 2.1 接受服务端响应
  • 三 ClientCnxn 详解
    • 3.1 Packet
    • 3.2 队列
    • 3.3 ClientCnxnSocket:底层Socket通信层

  • 官网:Apache ZooKeeper

ZooKeeper的客户端主要由以下几个核心组件组成。

  • ZooKeeper实例:客户端的入口。
  • ClientWatchManager:客户端Watcher管理器。
  • HostProvider:客户端地址列表管理器。
  • ClientCnxn:客户端核心线程,其内部又包含两个线程,即SendThread和EventThread。前者是一个I/O线程,主要负责ZooKeeper客户端和服务端之间的网络I/O通信;后者是一个事件线程,主要负责对服务端事件进行处理。

一 ZooKeeper会话的创建与连接

  • ZooKeeper客户端的初始化与启动环节,实际上就是ZooKeeper对象的实例化过程

image.png

  • 我们可以看到需要的参数:设置默认Watcher,设置ZooKeeper服务器地址列表,创建ClientCnxn。
  • 如果在ZooKeeper的构造方法中传入一个Watcher对象的话,那么ZooKeeper就会将这个Watcher对象保存在ZKWatchManager的defaultWatcher中,作为整个客户端会话期间的默认Watcher。

1.1 会话的创建

private final ZKWatchManager watchManager = new ZKWatchManager();


public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 boolean canBeReadOnly)
throws IOException
{
    LOG.info("Initiating client connection, connectString=" + connectString
             + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    ConnectStringParser connectStringParser = new ConnectStringParser(
        connectString);
    HostProvider hostProvider = new StaticHostProvider(
        connectStringParser.getServerAddresses());
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                          hostProvider, sessionTimeout, this, watchManager,
                          getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
  • 通过调用ZooKeeper的构造方法来实例化一个ZooKeeper对象,在初始化过程中,会创建一个客户端的Watcher管理器:ClientWatchManager。
  • 如果在构造方法中传入了一个Watcher对象,那么客户端会将这个对象作为默认Watcher保存在ClientWatchManager中。
  • 对于构造方法中传入的服务器地址,客户端会将其存放在服务器地址列表管理器HostProvider中。
  • ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建ClientCnxn的同时,还会初始化客户端两个核心队列outgoingQueue和pendingQueue,分别作为客户端的请求发送队列和服务端响应的等待队列。

1.1.1 ClientWatchManager

  • ZKWatchManager类的主要作用是管理和触发Zookeeper客户端的监视事件(watches)。
  • 在Zookeeper中,监视事件是一种机制,允许客户端在Zookeeper服务上注册兴趣,当指定节点的数据发生变化或者子节点列表发生变化时,客户端会收到通知。
  • 这个类中定义了三个Map类型的成员变量,分别用来存储数据监视(dataWatches)、存在监视(existWatches)和子节点监视(childWatches)的Watcher集合。Watcher是一个接口,它的实现类可以定义当特定事件发生时客户端需要执行的操作。
  • materialize方法是ClientWatchManager接口的一个实现,它的作用是根据事件类型和状态来触发相应的Watcher。方法的参数包括Zookeeper的连接状态(KeeperState)、事件类型(EventType)和客户端路径(clientPath)。

根据事件类型,materialize方法会执行以下操作:

  1. None类型:表示没有特定的事件发生,此时会触发默认监视器(defaultWatcher),并且如果配置了禁用自动重置监视器,并且当前连接状态不是同步连接状态,那么会清除所有的监视器集合。
  2. NodeDataChanged和NodeCreated类型:表示节点数据发生变化或者新节点被创建,此时会移除与指定路径相关的数据监视器和存在监视器,并将它们添加到结果集合中。
  3. NodeChildrenChanged类型:表示子节点列表发生变化,此时会移除与指定路径相关的子节点监视器,并将它们添加到结果集合中。
  4. NodeDeleted类型:表示节点被删除,此时会移除与指定路径相关的数据监视器、存在监视器和子节点监视器,并将它们添加到结果集合中。如果在存在监视器中发现了一个不应该出现的节点被删除的情况,将会记录一条警告日志。
  5. 其他未处理的事件类型:将会记录一条错误日志,并抛出运行时异常。

1.1.2 ConnectStringParser

  • 解析字符串转为InetSocketAddress存在集合中
    private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
  • 在3.2.0及其之后版本的ZooKeeper中,添加了“Chroot”特性[插图],该特性允许每个客户端为自己设置一个命名空间(Namespace)。如果一个ZooKeeper客户端设置了Chroot,那么该客户端对服务器的任何操作,都将会被限制在其自己的命名空间下。
 private final String chrootPath;

1.1.3 HostProvider

在ConnectStringParser解析器中会对服务器地址做一个简单的处理,并将服务器地址和相应的端口封装成一个InetSocketAddress对象,以ArrayList形式保存在ConnectStringParser.serverAddresses属性中,然后,经过处理的地址列表会被进一步封装到StaticHostProvider类中。
image.png

  • 我们可以看到默认实现:StaticHostProvider,把解析号可用的InetSocketAddress,最后进行一个打散

image.png

  • 通过调用StaticHostProvider的next()方法,能够从StaticHostProvider中获取一个可用的服务器地址。这个next()方法并非简单地从serverAddresses中依次获取一个服务器地址,而是先将随机打散后的服务器地址列表拼装成一个环形循环队列

image.png

1.1.4 ClientCnxn

image.png

  • ClientCnxn是ZooKeeper客户端的核心工作类,负责维护客户端与服务端之间的网络连接并进行一系列网络通信。
  • 客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理。
  • 同时,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事件。
   public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;
        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();
    }

1.2 会话的连接

image.png

    public void start() {
        sendThread.start();
        eventThread.start();
    }

启动SendThread和EventThread

1.2.1 SendThread

SendThread首先会判断当前客户端的状态,进行一系列清理性工作,为客户端发送“会话创建”请求做准备。
image.png
在开始创建TCP连接之前,SendThread首先需要获取一个ZooKeeper服务器的目标地址,这通常是从HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与ZooKeeper服务器之间的TCP连接。

private void startConnect() throws IOException {
            state = States.CONNECTING;

            InetSocketAddress addr;
            if (rwServerAddress != null) {
                addr = rwServerAddress;
                rwServerAddress = null;
            } else {
                addr = hostProvider.next(1000);
            }

            setName(getName().replaceAll("\\(.*\\)",
                    "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
            if (ZooKeeperSaslClient.isEnabled()) {
                try {
                    String principalUserName = System.getProperty(
                            ZK_SASL_CLIENT_USERNAME, "zookeeper");
                    zooKeeperSaslClient =
                        new ZooKeeperSaslClient(
                                principalUserName+"/"+addr.getHostName());
                } catch (LoginException e) {
                    // An authentication error occurred when the SASL client tried to initialize:
                    // for Kerberos this means that the client failed to authenticate with the KDC.
                    // This is different from an authentication error that occurs during communication
                    // with the Zookeeper server, which is handled below.
                    LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
                      + "SASL authentication, if Zookeeper server allows it.");
                    eventThread.queueEvent(new WatchedEvent(
                      Watcher.Event.EventType.None,
                      Watcher.Event.KeeperState.AuthFailed, null));
                    saslLoginFailed = true;
                }
            }
            logStartConnect(addr);

            clientCnxnSocket.connect(addr);
        }

获取到一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长连接。
image.png
这里有两个实现类,默认第一个,封装连接请求,ConnectRequest,最后在封装成Packet对象,放入请求发送队列outgoingQueue中去。
image.png
当客户端请求准备完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,向服务端进行发送。

 void sendPacket(Packet p) throws IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        p.createBB();
        ByteBuffer pbb = p.bb;
        sock.write(pbb);
    }

1.2.2 eventThread

  • 启动一个线程不断轮训事件,对不同的事件对出不同步的反应,processEvent方法是关键

image.png
首先,方法通过instanceof关键字检查传入的事件对象是否是WatcherSetEventPair类型。如果是,这意味着事件包含了一组Watcher对象,每个Watcher都会处理这个事件。代码遍历这些Watcher对象,并调用它们的process方法来处理事件。如果在处理过程中抛出异常,将会记录错误日志。
如果事件对象不是WatcherSetEventPair类型,那么它将被当作Packet类型来处理。Packet对象包含了客户端路径、回复头信息以及回调接口(cb)。代码首先检查回复头中的错误码,如果有错误,就将错误码保存在局部变量rc中。
接下来,代码根据响应的类型(例如ExistsResponse、GetDataResponse等)来调用相应的回调接口方法。这些回调接口是Zookeeper客户端用来接收操作结果的机制。例如,如果响应是GetDataResponse类型,那么代码会调用DataCallback接口的processResult方法,并传入操作结果码、客户端路径、上下文信息、节点数据和状态信息。
此外,代码还处理了MultiResponse类型,这是一种特殊的情况,表示一个请求包含了多个操作,每个操作都有自己的结果。在这种情况下,代码会遍历结果列表,并在所有操作都成功的情况下调用MultiCallback接口的processResult方法。
最后,如果回调接口是VoidCallback类型,那么代码会调用processResult方法,但不会传入任何数据,因为VoidCallback不关心操作结果。

二 ZooKeeper会话的响应

2.1 接受服务端响应

ClientCnxnSocket接收到服务端的响应后,会首先判断当前的客户端状态是否是“已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方法来处理该响应。
image.png

  • 反序列响应结果,使用Jute进行的

image.png

  • 连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包括readTimeout和connectTimeout等,并更新客户端状态;另一方面,需要通知地址管理器HostProvider当前成功连接的服务器地址。

image.png

  • 为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件SyncConnected-None,代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程。

image.png
EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,针对SyncConnected-None事件,那么就直接找出步骤2中存储的默认Watcher,然后将其放到EventThread的waitingEvents队列中去。
EventThread不断地从waitingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接口方法,以达到触发Watcher的目的。

三 ClientCnxn 详解

3.1 Packet

Packet是ClientCnxn内部定义的一个对协议层的封装,作为ZooKeeper中请求与响应的载体
image.png
Packet中包含了最基本的请求头(requestHeader)、响应头(replyHeader)、请求体(request)、响应体(response)、节点路径(clientPath/serverPath)和注册的Watcher(watchRegistration)等信息。
Packet的createBB()方法负责对Packet对象进行序列化,最终生成可用于底层网络传输的ByteBuffer对象。在这个过程中,只会将requestHeader、request和readOnly三个属性进行序列化,其余属性都保存在客户端的上下文中,不会进行与服务端之间的网络传输。

 public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

3.2 队列

outgoingQueue和pendingQueueClientCnxn中,有两个比较核心的队列outgoingQueue和pendingQueue,分别代表客户端的请求发送队列和服务端响应的等待队列。Outgoing队列是一个请求发送队列,专门用于存储那些需要发送到服务端的Packet集合。Pending队列是为了存储那些已经从客户端发送到服务端的,但是需要等待服务端响应的Packet集合。

3.3 ClientCnxnSocket:底层Socket通信层

  • ClientCnxnSocket定义了底层Socket通信的接口。在ZooKeeper3.4.0以前的版本中,客户端的这个底层通信层并没有被独立出来,而是混合在了ClientCnxn代码中。
  • 但后来为了使客户端代码结构更为清晰,同时也是为了便于对底层Socket层进行扩展(例如使用Netty来实现),因此从3.4.0版本开始,抽取出了这个接口类。在使用ZooKeeper客户端的时候,可以通过在zookeeper.clientCnxnSocket这个系统变量中配置ClientCnxnSocket实现类的全类名,以指定底层Socket通信层的自定义实现,例如,-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNIO。在ZooKeeper中,其默认的实现是ClientCnxnSocketNIO。该实现类使用Java原生的NIO接口,其核心是doIO逻辑,主要负责对请求的发送和响应接收过程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/502860.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一文彻底搞懂并发容器

文章目录 1. 什么是并发容器2. 并发容器的分类 1. 什么是并发容器 并发容器是一种用于多线程环境的数据结构&#xff0c;它们能够有效地处理并发访问和修改的问题。在多线程应用程序中&#xff0c;多个线程可能会同时访问和修改共享的数据结构&#xff0c;这可能会导致数据不一…

一文教你如何轻松领取阿里云优惠券

随着云计算技术的飞速发展&#xff0c;越来越多的企业和个人选择使用阿里云作为他们的云服务提供商。为了吸引更多的用户上云&#xff0c;阿里云推出了各种优惠券和促销活动。本文将教大家如何轻松领取阿里云优惠券&#xff0c;以便在购买阿里云产品和服务时享受更多优惠。 一、…

激发数据潜力:企业数据中台的策略性构建与优化_光点科技

在信息时代&#xff0c;数据是企业价值链中不可或缺的一环。构建一个策略性的企业数据中台不仅能够整合分散的数据资源&#xff0c;还能提高决策效率和业务敏捷性。本文聚焦于如何策略性地构建和优化数据中台&#xff0c;以便企业能够最大化地利用数据资源&#xff0c;推动企业…

Sora是否能颠覆视频制作行业?一文带你了解

一个月前OpenAI宣布了一款名为Sora的新生成式人工智能系统&#xff0c;该系统可以根据文本提示生成短视频。虽然Sora尚未向公众开放&#xff0c;但迄今为止发布的高质量样本已经引起了兴奋和担忧的反应。 OpenAI发布的样本视频&#xff08;该公司称这些视频是由Sora直接制作&am…

反应式编程(三)什么是粘包、拆包?如何解决?

目录 一、粘包、拆包介绍1.1 什么是 TCP 协议&#xff1f;1.2 什么是粘包、拆包&#xff1f;1.3 粘包、拆包的四种情况1.4 粘包、拆包的原因1&#xff09;TCP协议中的滑动窗口机制2&#xff09;传输层的 MSS 与链路层的 MTU3&#xff09;TCP协议中的 Nagle 算法4&#xff09;应…

【智能算法】晶体结构算法(CryStAl)原理及实现

目录 1.背景2.算法原理2.1算法思想2.2算法过程 3.结果展示4.参考文献 1.背景 2021年&#xff0c;S Talatahari等人受到晶体自然结构启发&#xff0c;提出了晶体构造算法&#xff08;Crystal Structure Algorithm , CryStAl&#xff09;。 2.算法原理 2.1算法思想 CryStAl受…

Python学习笔记-简单案例实现多进程与多线程

Python 的多进程与多线程是并发编程的两种重要方式&#xff0c;用于提高程序的执行效率。它们各自有不同的特点和适用场景。 多进程&#xff08;Multiprocessing&#xff09; 概念&#xff1a; 多进程是指操作系统中同时运行多个程序实例&#xff0c;每个实例称为一个进程。…

Jmeter 分布式压测,你的系统能否承受高负载?

‍你可以使用 JMeter 来模拟高并发秒杀场景下的压力测试。这里有一个例子&#xff0c;它模拟了同时有 5000 个用户&#xff0c;循环 10 次的情况‍。 请求默认配置 token 配置 秒杀接口 ​结果分析 ​但是&#xff0c;实际企业中&#xff0c;这种压测方式根本不满足实际需求。下…

java入门学习Day03

本篇文章主要有java中的变量、命名方法、数据类型。 一、java中的变量 数据类型 变量名 数据值&#xff1b;int money 50&#xff1b; public class varibledemo {public static void main(String[] args) {int money 50;//变量的输出System.out.println(money);money 6…

ctfshow-web入门-xxe

什么是xxe&#xff1f; XXE&#xff0c;全称XML External Entity Injection&#xff0c;即XML外部实体注入。这是一种针对应用程序解析XML输入类型的攻击。当包含对外部实体的引用的XML输入被弱配置的XML解析器处理时&#xff0c;就会发生这种攻击。这种攻击通过构造恶意内容&…

bugku-web-alert

这里可以看到flag在页面弹窗内 有两种弹窗 利用Python和bp各自尝试 得到的结果 这里得到一串不知道是什么的加密代码 经过尝试大量解码器后得知&#xff0c;这时unicode编码 进行解码

Linux中的文件操作

共识原理 在讲文件操作之前&#xff0c; 我们先形成一个共识 1 文件 内容 属性 2 文件分为打开的文件 和 没打开的文件 3 打开的文件是谁打开的&#xff1f; 进程&#xff01;&#xff01; – 研究文件操作本质是研究进程和文件的关系&#xff01; 4 没打开的文件&#xff1…

基于ssm的留学生交流互动论坛网站(java项目+文档)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的留学生交流互动论坛网站。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 留学生交流互动论坛网站…

RUST工程构建工具CARGO及代码编写工具RUSTROVER使用

1.使用cargo创建rust工程 cargo new hello_rust 生成的内容如下: 使用cargo build进行编译工程 编译成功会生成一个target目录 进入target目录运行生成程序 也可直接使用cargo run直接编译并运行 如果要测试工程执行cargo test 如果要为工程创建文档执行cargo doc 也可发布工程…

【MySQL】DQL-基础查询-语句&演示(查询多个字段 / 所有字段/并设置别名/去重)

前言 大家好吖&#xff0c;欢迎来到 YY 滴MySQL系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C Linux的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的…

【Java】MyBatis快速入门及详解

文章目录 1. MyBatis概述2. MyBatis快速入门2.1 创建项目2.2 添加依赖2.3 数据准备2.4 编写代码2.4.1 编写核心配置文件2.4.2 编写SQL映射文件2.4.3 编写Java代码 3. Mapper代理开发4. MyBatis核心配置文件5. 案例练习5.1 数据准备5.2 查询数据5.2.1 查询所有数据5.2.2 查询单条…

学生综合测评系统的设计与实现|Springboot+ Mysql+Java+ B/S结构(可运行源码+数据库+设计文档)

本项目包含可运行源码数据库LW&#xff0c;文末可获取本项目的所有资料。 推荐阅读100套最新项目持续更新中..... 2024年计算机毕业论文&#xff08;设计&#xff09;学生选题参考合集推荐收藏&#xff08;包含Springboot、jsp、ssmvue等技术项目合集&#xff09; 1. 系统功能…

删除数据操作

注意session.commit()的位置成功

基于SpringBoot + Vue实现的在线装修管理系统设计与实现+毕业论文

介绍 系统包含用户、装修队、管理员三个角色 管理员&#xff1a; 管理员管理&#xff1a;管理其他管理员的账号和权限&#xff0c;确保系统管理的层次化和安全性。 装修队管理&#xff1a;审核装修队的资质&#xff0c;管理装修队的人员信息&#xff0c;监控工程进度&#xff…

云服务器8核32G配置价格115元1个月、345元3个月

腾讯云轻量应用服务器8核32G配置、22M公网带宽&#xff0c;优惠价格115元1个月、345元3个月&#xff0c;系统盘为320GB SSD盘、4500GB月流量&#xff0c;活动链接 txybk.com/go/txy 腾讯云8核32G服务器22M带宽优惠价格 本文8核32G服务器为轻量应用服务器&#xff0c;虽然是22M公…