0%

ZooKeeper 系统模型

从数据节点、Watcher 和 ACL 五方面来讲述 ZooKeeper 的系统模型。

数据节点

节点结构

ZooKeeper 的视图结构和使用了其特有的“数据节点”概念,我们称之为 ZNode。ZNode 是 ZooKeeper 中数据的最小单元,每个 ZNode 上都可以保存数据,同时还可以挂载子节点,因此构成了一个层次化的命名空间,我们称之为树。

节点特性

节点类型

在 ZooKeeper 中,每个数据节点都是有生命周期的,其生命周期的长短取决于数据节点的节点类型。在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT)、临时节点(EPHEMERAL)和顺序节点(SEQUENTIAL)三大类,具体在节点创建过程中,通过组合使用,可以生成以下四种组合型节点类型:

持久节点(PERSISTENT)

持久节点是指该数据节点被创建后,就会一直存在于 ZK 服务器上,直到有删除操作来主动清除这个节点。

持久顺序节点(PERSISTENT SEQUENTIAL)

持久顺序节点的基本特性和持久节点是一致的,额外的特性表现在顺序性上。在 ZK 中,每个父节点都会为它的第一级子节点维护一份顺序,用于记录下每个子节点创建的先后顺序。基于这个顺序特性,在创建子节点的时候,可以设置这个标记,那么在创建节点过程中,ZK 会自动为给定节点名加上一个数字后缀,作为一个新的、完整的节点名。另外需要注意的是,这个数字后缀的上限是整型的最大值。

临时节点(EPHEMERAL)

和持久节点不同的是,临时节点的生命周期和客户端的会话绑定在一起,也就是说,如果客户端会话失效,那么这个节点就会被自动清理掉。注意,这里提到的是客户端会话失效,而非 TCP 连接断开。另外, ZK 规定了不能基于临时节点来创建子节点,即临时节点只能作为叶子节点。

临时顺序节点(EPHEMERAL SEQUENTIAL)

临时顺序节点的基本特性和临时节点也是一致的,同样是在临时节点的基础上,添加了顺序的特性。

状态信息

版本(保证分布式数据原子性操作)

ZooKeeper 中的版本表示的是对数据节点的数据内容、子节点列表,或是节点 ACL 信息的修改次数,我们以其中的 version 这种版本类型为例来说明。

在一个数据节点/zk-node被创建完毕之后,节点的 version 值是 0,表示的含义是“当前节点自从创建之后,被更新过 0 次”。如果现在对该节点的数据内容进行更新操作,那么随后,version 的值就会变成 1。同时需要注意的是,在上文中提到的关于 version 的说明,其表示的是对数据节点数据内容的变更次数,强调的是变更次数,因此即使前后两次变更并没有使得数据内容的值发生变化,version 的值依然会变更。

version = setDataRequest.getversion();
int currentVersion = nodeRecord.stat.getVersion();
if (version = -1 && version != currentVersion) {
	throw new KeeperException.BadVersionException(path);
}
version = currentVersion + 1;

从上面的执行逻辑中,我们可以看出,在进行一次 setDataRequest 请求处理时,首先进行了版本检查: ZooKeeper 会从 setDataRequest 请求中获取到当前请求的版本 version,同时从数据记录 nodeRecord 中获取到当前服务器上该数据的最新版本 currentVersion。

如果 version 为“-1”,那么说明客户端并不要求使用乐观锁,可以忽略版本比对;如果 version 不是“-1”,那么就比对 version 和 currentVersion,如果两个版本不匹配,那么将会抛出 BadVersionException 异常。

Watcher(数据变更的通知)

ZooKeeper 提供了分布式数据的发布/订阅功能。一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。在 ZooKeeper 中,引入了 Watcher 机制来实现这种分布式的通知功能。

ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。整个 Watcher 注册与通知过程如图所示。

ZooKeeper 的 Watcher 机制主要包括客户端线程、客户端 WatchManager 和 ZooKeeper 服务器三部分。在具体工作流程上,简单地讲,客户端在向 ZooKeeper 服务器注册 Watcher 的同时,会将 Watcher 对象存储在客户端的 WatchManager 中。当 ZooKeeper 服务器端触发 Watcher 事件后,会向客户端发送通知,客户端线程从 WatchManager 中取出对应的 Watcher 对象来执行回调逻辑。

参数组成

Watcher接口

在 ZooKeeper 中,接口类 Watcher 用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含 KeeperState 和 EventType 两个枚举类,分别代表了通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)

Watcher事件

同一个事件类型在不同的通知状态中代表的含义有所不同,下表列举了常见的通知状态和事件类型

其中,针对 NodeDataChanged 事件的变更,包括节点的数据内容和数据的版本号 dataVersion。因此,即使使用相同的数据内容来更新,还是会触发这个事件通知,因为对于 ZooKeeper 来说,无论数据内容是否变更,一旦有客户端调用了数据更新的接口,且更新成功,就会更新 dataVersion 值。

WatchedEvent 事件只包含 KeeperState、EventType 以及 Path,并没有传输数据,因此客户端无法直接从该事件中获取到对应数据节点的原始数据内容以及变更后的新数据内容,而是需要客户端再次主动去重新获取数据——这也是 ZooKeeper Watcher 机制的一个非常重要的特性。

工作机制

如下图所示,客户端通过 SendThread 发送注册请求到 ZkServer 端;ZkServer 端通过 WatcherManager 管理所有的 Watcher,ZkClient 通过 ZKWatcherManager 管理所有的watcher。

客户端注册Watcher

向构造方法传入一个默认的 Watcher

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);

这个 Watcher 将作为整个 ZooKeeper 会话期间的负责监听会话变更状态的默认 Watcher,会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中。

另外也可以通过getData()getChildren()exist()三个接口来向 ZooKeeper 服务器注册 Watcher。

getData()举例,注册原理如下:

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    //封装一个 Watcher 的注册信息 WatchRegistration 对象,
    //用于暂时保存数据节点的路径和 Watcher 的对应关系
    if (watcher != null) {
        wcb = new DataWatchRegistration(watcher, clientPath);
    }

    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.getData);
    GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    //对Request请求进行标记,将其设置为使用监听
    request.setWatch(watcher != null);
    GetDataResponse response = new GetDataResponse();
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}

在 ZooKeeper 中,Packet 可以被看作一个最小的通信协议单元,用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个 Packet 对象。因此,在 ClientCnxn 中 WatchRegistration 又会被封装到 Packet 中去,然后放入发送队列中等待客户端发送。

public Packet queuePacket(
    RequestHeader h,
    ReplyHeader r,
    Record request,
    Record response,
    AsyncCallback cb,
    String clientPath,
    String serverPath,
    Object ctx,
    WatchRegistration watchRegistration,
    WatchDeregistration watchDeregistration) {
    Packet packet = null;

    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    packet = new Packet(h, r, request, response, watchRegistration);
    // The synchronized block here is for two purpose:
    // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
    // 2. synchronized against each packet. So if a closeSession packet is added,
    // later packet will be notified.
    synchronized (state) {
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);
        }
    }
    sendThread.getClientCnxnSocket().packetAdded();
    return packet;
}

随后,ZooKeeper 客户端就会向服务端发送这个请求,同时等待请求的返回。完成请求发送后,会由客户端 SendThread 线程的 readResponse 方法负责接收来自服务端的响应,finishPacket 方法会从 Packet 中取出对应的 Watcher 并注册到 ZKWatchManager 中去。

// @VisibleForTesting
protected void finishPacket(Packet p) {
    int err = p.replyHeader.getErr();
    if (p.watchRegistration != null) {
        p.watchRegistration.register(err);
    }
    // Add all the removed watch events to the event queue, so that the
    // clients will be notified with 'Data/Child WatchRemoved' event type.
    
    if (p.cb == null) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    } else {
        p.finished = true;
        eventThread.queuePacket(p);
    }
}

从上面的内容中,我们已经了解到客户端已经将 Watcher 暂时封装在了 WatchRegistration 对象中,现在就需要从这个封装对象中再次提取出 Watcher 来:

public void register(int rc) {
    if (shouldAddWatch(rc)) {
        Map<String, Set<Watcher>> watches = getWatches(rc);
        synchronized (watches) {
            Set<Watcher> watchers = watches.get(clientPath);
            if (watchers == null) {
                watchers = new HashSet<Watcher>();
                watches.put(clientPath, watchers);
            }
            watchers.add(watcher);
        }
    }
}

在 register 方法中,客户端会将之前暂时保存的 Watcher 对象转交给 ZKWatchManager,并最终保存到 dataWatches 中去。ZKWatchManager.dataWatches 是一个Map<String,Set<Watcher>>类型的数据结构,用于将数据节点的路径和 Watcher 对象进行一一映射后管理起来。整个客户端 Watcher 的注册流程如下图所示。

通过上面的讲解,相信读者已经对客户端的 Watcher 注册流程有了一个大概的了解。但同时我们也可以发现,极端情况下,客户端毎调用一次getData()接口,就会注册上个 Watcher,那么这些 Watcher 实体都会随着客户端请求被发送到服务端去吗?

答案是否定的。如果客户端注册的所有 Watcher 都被传递到服务端的话,那么服务端肯定会出现内存紧张或其他性能问题了,幸运的是,在 ZooKeeper 的设计中充分考虑到了这个问题。在上面的流程中,我们提到把 WatchRegistration 封装到了 Packet 对象中去,但事实上,在底层实际的网络传输序列化过程中,并没有将 WatchRegistration 对象完全地序列化到底层字节数组中去。为了证实这一点,我们可以看下 Packet 内部的序列化过程

public void createBB() {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len"); // We'll fill this in later
        if (requestHeader != null) {
            requestHeader.serialize(boa, "header");
        }
        if (request instanceof ConnectRequest) {
            request.serialize(boa, "connect");
            // append "am-I-allowed-to-be-readonly" flag
            boa.writeBool(readOnly, "readOnly");
        } else if (request != null) {
            request.serialize(boa, "request");
        }
        baos.close();
        this.bb = ByteBuffer.wrap(baos.toByteArray());
        this.bb.putInt(this.bb.capacity() - 4);
        this.bb.rewind();
    } catch (IOException e) {
        LOG.warn("Unexpected exception", e);
    }
}

从上面的代码片段中,我们可以看到,在Packet.create()方法中,ZooKeeper 只会将 requestHeader 和 request 两个属性进行序列化,也就是说,尽管 WatchRegistration 被封装在了 Packet 中,但是并没有被序列化到底层字节数组中去,因此也就不会进行网络传输了。

服务端处理Watcher

ServerCnxn存储

从上图中我们可以看到,服务端收到来自客户端的请求之后,在FinalRequestProcessor.processRequest()中会判断当前请求是否需要注册 Watcher:

case OpCode.getData:
    rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo);
    GetDataResponse gdr = (GetDataResponse) rec;
    subResult = new GetDataResult(gdr.getData(), gdr.getStat());
    break;
private Record handleGetDataRequest(Record request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException, IOException {
    GetDataRequest getDataRequest = (GetDataRequest) request;
    String path = getDataRequest.getPath();
    DataNode n = zks.getZKDatabase().getNode(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null);
    Stat stat = new Stat();
    byte[] b = zks.getZKDatabase().getData(path, stat, getDataRequest.getWatch() ? cnxn : null);
    return new GetDataResponse(b, stat);
}

从 getData 请求的处理逻辑中,我们可以看到,当getDataRequest.getWatch()为 true 的时侯,ZooKeeper 就认为当前客户端请求需要进行 Watcher 注册,于是就会将当前的 ServerCnxn 对象和数据节点路径传入 getData 方法中去。那么为什么要传入ServerCnxn 呢?ServerCnxn 是一个 ZooKeeper 客户端和服务器之间的连接接口,代表了一个客户端和服务器的连接。ServerCnxn 接口的默认实现是 NIOServerCnxn,同时从3.4.0版本开始,引入了基于 Netty 的实现:NettyServerCnxn。无论采用哪种实现方式,都实现了 Watcher 的 process 接口,因此我们可以把 ServerCnxn 看作是一个 Watcher对象。数据节点的节点路径和 ServerCnxn 最终会被存储在 WatchManager 的 watchTable 和 watch2Paths 中。

WatchManager 是 ZooKeeper 服务端 Watcher 的管理者,其内部管理的 watchTable 和 watch2Paths 两个存储结构,分别从两个维度对 Watcher 进行存储。

  • watchTable 是从数据节点路径的粒度来托管 Watcher。
  • watch2Paths 是从 Watcher 的粒度来控制事件触发需要触发的数据节点。

同时,WatchManager 还负责 Watcher 事件的触发,并移除那些已经被触发的 Watcher。注意,WatchManager 只是一个统称,在服务端,DataTree 中会托管两个 WatchManager,分别是 dataWatches 和 childWatches,分别对应数据变更 Watcher 和子节点变更 Watcher。在本例中,因为是 getData 接口,因此最终会被存储在 dataWatches 中,其数据结构如下图所示。

Watcher触发

对于标记了 Watcher 注册的请求,ZooKeeper 会将其对应的 ServerCnxn 存储到 WatchManager 中,下面我们来看看服务端是如何触发 Watcher 的。

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
   
    //...
    
    updateWriteStat(path, dataBytes);
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    Set<Watcher> watchers = new HashSet<>();
    PathParentIterator pathParentIterator = getPathParentIterator(path);
    
    //...
    
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }

    switch (type) {
        case NodeCreated:
            ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());
            break;

        case NodeDeleted:
            ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());
            break;

        case NodeDataChanged:
            ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());
            break;

        case NodeChildrenChanged:
            ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());
            break;
        default:
            // Other types not logged.
            break;
    }

    return new WatcherOrBitSet(watchers);
}

过程总结

  1. 封装 WatchedEvent

    首先将通知状态(KeeperState)、事件类型(EventType)以及节点路径(Path)封装成一个 WatchedEvent 对象。

  2. 查询 Watcher
    根据数据节点的节点路径从 watchTable 中取出对应的 Watcher。如果没有找到 Watcher,说明没有任何客户端在该数据节点上注册过 Watcher,直接退出。而如果找到了这个 Watcher,会将其提取出来,同时会直接从 watchTable 和 watch2Paths 中将其删除 —— 从这里我们也可以看出,Watcher 在服务端是一次性的,即触发一次就失效了。

  3. 调用 process 方法来触发 Watcher

    在这一步中,会逐个依次地调用从步骤 2 中找出的所有 Watcher 的 process 方法。那么这里的 process 方法究竟做了些什么呢?在上文中我们已经提到,对于需要注册 Watcher 的请求,ZooKeeper 会把当前请求对应的 ServerCnxn 作为一个 Watcher 进行存储,因此,这里调用的 process 方法,事实上就是 ServerCnxn 的对应方法:

    public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
    
        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();
    
        // The last parameter OpCode here is used to select the response cache.
        // Passing OpCode.error (with a value of -1) means we don't care, as we don't need
        // response cache on delivering watcher events.
        sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
    }

    从上面的代码片段中,我们可以看出在 process 方法中,主要逻辑如下。

    • 在请求头中标记“-1”,表明当前是一个通知。
    • 将 WatchedEvent 包装成 WatcherEvent,以便进行网络传输序列化。
    • 向客户端发送该通知。

      从以上几个步骤中可以看到,ServerCnxn 的 process 方法中的逻辑非常简单,本质上并不是处理客户端 Watcher 真正的业务逻辑,而是借助当前客户端连接的 ServerCnxn 对象来实现对客户端的 WatchedEvent 传递,真正的客户端 Watcher 回调与业务逻辑执行都在客户端。

客户端回调Watcher

SendThread接收时间通知

class SendThread extends ZooKeeperThread {
    void readResponse(ByteBuffer incomingBuffer) throws IOException {
        //..
    }
}

对于一个来自服务端的响应,客户端都是由Sendthread.readResponse(ByteBuffer incomingBuffer)方法来统一进行处理的,如果响应头 replyHdr 中标识了 XID 为 -1,表明这是一个通知类型的响应,对其的处理大体上分为以下 4 个主要步骤。

  1. 反序列化
    ZooKeeper 客户端接到请求后,首先会将字节流转换成 WatcherEvent 对象。
  2. 处理 chrootPath
    如果客户端设置了 chrootPath 属性,那么需要对服务端传过来的完整的节点路径进行 chrootPath 处理,生成客户端的一个相对节点路径。例如客户端设置了 chrootPath 为/app1,那么针对服务端传过来的响应包含的节点路径为/app1/locks,经过 chrootPath 处理后,就会变成一个相对路径:/locks。关于 ZooKeeper 的 chrootPath。
  3. 还原 WatchedEvent
    process 接口的参数定义是 WatchedEvent,因此这里需要将 WatcherEvent 对象转换成 WatchedEvent。

  4. 回调 Watcher

    最后将 WatchedEvent 对象交给 EventThread 线程,在下一个轮询周期中进行 Watcher 回调。

EventThread处理事件通知

private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    final Set<Watcher> watchers;
    if (materializedWatchers == null) {
        // materialize the watchers based on the event
        watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
    } else {
        watchers = new HashSet<Watcher>();
        watchers.addAll(materializedWatchers);
    }
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);
}

queueEvent 方法首先会根据该通知事件,从 ZKWatchManager 中取出所有相关的 Watcher

public Set<Watcher> materialize(
        Watcher.Event.KeeperState state,
        Watcher.Event.EventType type,
        String clientPath) {
        Set<Watcher> result = new HashSet<Watcher>();

        switch (type) {
        case None:
            result.add(defaultWatcher);
            boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
            synchronized (dataWatches) {
                for (Set<Watcher> ws : dataWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    dataWatches.clear();
                }
            }

            synchronized (existWatches) {
                for (Set<Watcher> ws : existWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    existWatches.clear();
                }
            }

            synchronized (childWatches) {
                for (Set<Watcher> ws : childWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    childWatches.clear();
                }
            }

            synchronized (persistentWatches) {
                for (Set<Watcher> ws: persistentWatches.values()) {
                    result.addAll(ws);
                }
            }

            synchronized (persistentRecursiveWatches) {
                for (Set<Watcher> ws: persistentRecursiveWatches.values()) {
                    result.addAll(ws);
                }
            }

            return result;
        case NodeDataChanged:
        case NodeCreated:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);
            }
            addPersistentWatches(clientPath, result);
            break;
        case NodeChildrenChanged:
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);
            }
            addPersistentWatches(clientPath, result);
            break;
        case NodeDeleted:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            // TODO This shouldn't be needed, but just in case
            synchronized (existWatches) {
                Set<Watcher> list = existWatches.remove(clientPath);
                if (list != null) {
                    addTo(list, result);
                    LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                }
            }
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);
            }
            addPersistentWatches(clientPath, result);
            break;
        default:
            String errorMsg = String.format(
                "Unhandled watch event type %s with state %s on path %s",
                type,
                state,
                clientPath);
            LOG.error(errorMsg);
            throw new RuntimeException(errorMsg);
        }

        return result;
    }

    private void addPersistentWatches(String clientPath, Set<Watcher> result) {
        synchronized (persistentWatches) {
            addTo(persistentWatches.get(clientPath), result);
        }
        synchronized (persistentRecursiveWatches) {
            for (String path : PathParentIterator.forAll(clientPath).asIterable()) {
                addTo(persistentRecursiveWatches.get(path), result);
            }
        }
    }
}

客户端在识别出事件类型 EventType 后,会从相应的 Watcher 存储(即 dataWatches、existWatches 或 childWatches 中的一个或多个)中去除对应的 Watcher。注意,此处使用的是 remove 接口,因此也表明了客户端的 Watcher 机制同样也是一次性的,即一旦被触发后,该 Watcher 就失效了。

获取到相关的所有 Watcher 之后,会将其放入 waitingEvents 这个队列中去。WaitingEvents 是一个待处理 Watcher 的队列,EventThread 的 run 方法会不断对该队列进行处理。EventThread 线程每次都会从 waitingEvents 队列中取出一个 Watcher,并进行串行同步处理。注意,此处 processEvent 方法中的 Watcher 才是之前客户端真正注册的 Watcher,调用其 process 方法就可以实现 Watcher 的回调了。

特性总结

一次性

一次使用后就会被移除,这样的设计有效地减轻了服务端的压力。如果注册一个 Watcher 之后一直有效,那么,针对那些更新非常频繁的节点,服务端会不断地向客户端发送事件通知,这无论对于网络还是服务端性能的影响都非常大。

客户端串行执行

客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要开发人员注意的一点是,千万不要因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调。

轻量

WatchedEvent 是 ZooKeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构中只包含三部分内容:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如针对 NodeDataChanged 事件,ZooKeeper 的 Watcher 只会通知客户端指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据 —— 这也是 ZooKeeper 的 Watcher 机制的一个非常重要的特性。

另外,客户端向服务端注册 Watcher 的时候,并不会把客户端真实的 Watcher 对象传递到服务端,仅仅只是在客户端请求中使用 boolean 类型属性进行了标记,同时服务端也仅仅只是保存了当前连接的 ServerCnxn 对象。

ACL(保障数据安全)

ZooKeeper 提供了一套完善的 ACL(Access Control List)权限控制机制来保障数据的安全。

提到权限控制,在 Unix/Linux 文件系统中使用的,也是目前应用最广泛的权限控制方式 —— UGO(User、Group 和 Others)权限控制机制。简单地讲,UGO 就是针对一个文件或目录,对创建者(User)、创建者所在的组(Group)和其他用户(Other)分别配置不同的权限。从这里可以看出,UGO 其实是一种粗粒度的文件系统权限控制模式,利用 UGO 只能对三类用户进行权限控制,即文件的创建者、创建者所在的组以及其他所有用户,很显然,UGO 无法解决下面这个场景:

用户 U1 创建了文件 F1,希望 U1 所在的用户组 G1 拥有对 F1 读写和执行的权限,另一个用户组 G2 拥有读权限,而另外一个用户 U3 则没有任何权限。

接下去我们来看另外一种典型的权限控制方式:ACL。ACL,即访问控制列表,是一种相对来说比较新颖且更细粒度的权限管理方式,可以针对任意用户和组进行细粒度的权限控制。目前绝大部分 Unix 系统都已经支持了 ACL 方式的权限控制,Linux 也从 2.6 版本的内核开始支持这个特性。

模式介绍

权限模式:Scheme

权限模式用来确定权限验证过程中使用的检验策略。在 ZooKeeper 中,开发人员使用最多的就是以下四种权限模式。

IP

IP 模式通过 IP 地址粒度来进行权限控制,例如配置了ip:192.168.0.110,即表示权限控制都是针对这个 IP 地址的。同时,IP 模式也支持按照网段的方式进行配置,例如ip:192.168.0.1/24表示针对192.168.0.*这个 IP 段进行权限控制。

Digest

Digest 是最常用的权限控制模式,也更符合我们对于权限控制的认识,其以类似于username:password形式的权限标识来进行权限配置,便于区分不同应用来进行权限控制。

当我们通过username:password形式配置了权限标识后,ZooKeeper 会对其先后进行两次编码处理,分别是 SHA-1 算法加密和 BASE64 编码,其具体实现由DigestAuthenticationProvider.generateDigest(String idPassword)函数进行封装。

World

World 是一种最开放的权限控制模式,从其名字中也可以看出,事实上这种权限控制方式几乎没有任何作用,数据节点的访问权限对所有用户开放,即所有用户都可以在不进行任何权限校验的情况下操作 ZooKeeper 上的数据。另外,World 模式也可以看作是一种特殊的 Digest 模式,它只有一个权限标识,即“world:anyone”。

Super

Super 模式,顾名思义就是超级用户的意思,也是一种特殊的 Digest 模式。在 Super 模式下,超级用户可以对任意 ZooKeeper 上的数据节点进行任何操作。

下图为四种权限模式的区别

权限模式:Permission

权限就是指那些通过权限检査后可以被允许执行的操作。在 ZooKeeper 中,所有对数据的操作权限分为以下五大类:

  • CREATE(C):数据节点的创建权限,允许授权对象在该数据节点下创建子节点。
  • DELETE(D):子节点的删除权限,允许授权对象删除该数据节点的子节点。
  • READ(R):数据节点的读取权限,允许授权对象访问该数据节点并读取其数据内容或子节点列表等。
  • WRITE(W):数据节点的更新权限,允许授权对象对该数据节点进行更新操作。
  • ADMIN(A):数据节点的管理权限,允许授权对象对该数据节点进行 ACL 相关的设置操作。

权限扩展体系

同时,ZooKeeper 提供了特殊的权限控制插件体系,允许开发人员通过指定方式对 ZooKeeper 的杈限进行扩展。这些扩展的权限控制方式就像插件一样插入到 ZooKeeper 的权限体系中去,因此在 ZooKeeper 的官方文档中,也称该机制为“Pluggable ZooKeeper Authentication”。

实现自定义权限控制器

用户可以基于org.apache.zookeeper.server.auth.AuthenticationProvider来进行自定义权限控制器的实现。事实上,在前面内容中提到的几个权限模式,对应的就是 ZooKeeper 自带的 DigestAuthenticationProvider 和 IPAuthenticationProvider 两个权限控制器。

注册自定义权限控制器

完成自定义权限控制器的开发后,接下去就需要将该权限控制器注册到 ZooKeeper 服务器中去了。ZooKeeper 支持通过系统属性和配置文件两种方式来注册自定义的权限控制器。

系统属性 -Dzookeeper.authProvider.X

在 ZooKeeper 启动参数中配置类似于如下的系统属性

-Dzookeeper.authProvider.1=com.zkbook.CustomAuthenticationProvider

配置文件方式

在 zoo.cfg 配置文件中配置类似于如下的配置项:

authProvider.1=com.zkbook.CustomAuthenticationProvider

对于权限控制器的注册,ZooKeeper 采用了延迟加载的策略,即只有在第一次处理包含权限控制的客户端请求时,才会进行权限控制器的初始化。同时,ZooKeeper 还会将所有的权限控制器都注册到 ProviderRegistry 中去。在具体的实现中,ZooKeeper 首先会将 DigestAuthenticationProvider 和 IPAuthenticationProvider 这两个默认的控制器初始化,然后通过扫描zookeeper.authProvider.这一系统属性,获取到所有用户配置的自定义权限控制器,并完成其初始化。

Super模式

根据 ACL 权限控制的原理,一旦对一个数据节点设置了 ACL 权限控制,那么其他没有被授权的 ZooKeeper 客户端将无法访问该数据节点,这的确很好地保证了 ZooKeeper 的数据安全。但同时,ACL 权限控制也给 ZooKeeper 的运维人员带来了一个困扰:如果一个持久数据节点包含了 ACL 权限控制,而其创建者客户端已经退出或已不再使用,那么这些数据节点该如何清理呢?这个时候,就需要在 ACL 的 Super 模式下,使用超级管理员权限来进行处理了。要使用超级管理员权限,首先需要在 ZooKeeper 服务器上开启 Super 模式,方法是在 ZooKeeper 服务器启动的时候,添加如下系统属性:

-Dzookeeper.DigestAuthenticationProvider.superDigest=foo:kWN6aNSbjcKWPqjiv7cgoN24raU=