zookeeper使用手册
ZooKeeper 简介
Apache ZooKeeper 是 Apache 软件基金会的一个软件项目,使用 Java 语言编写,为大型分布式系统提供开源分布式配置服务(存储)、同步服务(分布式锁)和命名注册表(服务注册与发现)。 ZooKeeper 是 Hadoop 的子项目,但现在它本身就是一个顶级项目。
ZooKeeper 的核心优势是实现了分布式环境的数据一致性,简单地说:每时每刻我们访问 ZooKeeper 的树结构时,不同的节点返回的数据都是一致的。也就是说,对 ZooKeeper 进行数据访问时,无论是什么时间,都不会引起“脏读”“幻读”“不可重复读”问题。
Zookeeper 中节点类型主要有持久节点、临时节点、持久顺序节点、临时顺序节点;临时节点在创建它的客户端会话有效期内存在;
ZooKeeper 集群选主需满足可用节点数 > 总节点数 / 2,以处理集群脑裂问题;ZooKeeper 集群至少需有 3 个节点;
安装&配置
docker-compose.yml 参考地址
提示
安装的 ZK 版本取决于 Curator 最大支持版本;Curator 是一个 ZK 的 Java 客户端工具
详见 Java 应用集成 Zookeeperversion: "3.8" services: zoo1: image: zookeeper:3.7.1 restart: always hostname: zoo1 ports: - 2181:2181 environment: TZ: Asia/Shanghai # myid作用:是一个文本文件,存放节点编号;其值只能是数字,范围为1~255;默认存放于 /data 目录 ZOO_MY_ID: 1 # 设置集群;容器启动后,关联的配置文件在 /conf/zoo.dfg 中可查看 # 详情见下节配置文件说明 ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181 zoo2: image: zookeeper:3.7.1 restart: always hostname: zoo2 ports: - 2182:2181 environment: ZOO_MY_ID: 2 ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181 zoo3: image: zookeeper:3.7.1 restart: always hostname: zoo3 ports: - 2183:2181 environment: ZOO_MY_ID: 3 ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181配置文件说明
配置文件路径: /conf/zoo.dfg
# 数据目录,myid文件就存放于此目录中 dataDir=/data # 日志目录 dataLogDir=/datalog # 单元时间(服务于其他时间设置,相当于一个计数单位),默认3000ms; tickTime=2000 # 从节点从启动到同步主节点数据所用时间,默认为10;此处表示为单元时间的5倍(10s); initLimit=5 # 主节点向从节点发送心跳检测最大延迟时间,默认为5;此处表示为单元时间的2倍(4s); syncLimit=2 # 定时清理功能--指定需要保留的文件数目 autopurge.snapRetainCount=3 # 定时清理功能--指定清理频率,单位是小时;默认为0,表示关闭自动清理 autopurge.purgeInterval=0 # 单个客户端允许的最大连接数 maxClientCnxns=60 # 是否以独立模式运行;设置为true可以在独立模式或分布式模式下运行ZooKeeper standaloneEnabled=true # 是否启用了 Admin 服务供管理和监控 admin.enableServer=true # server.id中的id和myid设置的一致 # 若在同一ip上(不使用容器)部署多个节点,需保证每个节点下面三组端口不同; # 2888端口--用于各个节点之间通信 # 3888端口--选主(选举Leader)端口 # 2181端口--客户端程序连接Zookeeper集群中的节点端口 server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
提示
PrettyZoo是一个国产的 ZK 图形化管理客户端,能以树形结构的方式查看各个节点的数据。
分布式存储
存储模型
Zookeeper 的存储模型类似于文件系统的层级树状结构,相当于数据结构中的多叉树(Multiway Tree)或者有序树(Ordered Tree),每个节点可以有多个子节点,而不仅限于二叉树的左右子节点。存储模型中每个节点被称为 ZNode,每个 ZNode 都用一个完整的路径来唯一标识,如'/fruit/apple';
Zookeeper 为了保证高吞吐和低延迟,整个树状目录结构全部存放于内存中,使得 ZNode 节点不能存放太大的数据,官方要求是每个节点的负载数据上限是1M;
使用 zkCli.sh 增删改查
提示
实际开发中使用开源客户端 Curator 去操作 Zookeeper 集群,他能很方便的整合进 Java 应用,并提供了便捷的 api ,本小节内容了解即可
zkCli 常用指令
注意,以下指令都需要指明 ZNode 路径
zkCli 常用指令 功能简介 create 创建路径节点 ls 查看路径下的所有节点 get 获取节点值 set 修改节点值 delete 删除节点 stat 节点状态信息 测试
提示
使用客户端(zkCli)连接 ZK 集群时,可连接集群中的任意节点,客户端会自动与集群中的 Leader 进行通信;
# 交互进入任意ZK容器,并连接到ZK集群 docker exec -it zookeeper-zoo1-1 bash cd /apache-zookeeper-3.7.1-bin/bin ./zkCli.sh -server localhost:2181 # 节点操作,注意特殊字符需用转义符'\' create /fruit 'hello, I am fruit\!' create /fruit/apple 'I am a big apple\!' ls /fruit stat /fruitZNode 信息主要属性介绍
使用 stat 指令可查看 ZNode 节点的状态,ZNode 状态的每一次改变都对应着一个递增的事务 ID,称为 Zxid,在节点状态中保存着 3 个 Zxid,如下为节点状态的主要属性介:
属性名 描述 cZxid 创建时事务 ID; mZxid 修改时事务 ID,与字节的无关; pZxid 子节点最后一次创建或修改事务 ID,与孙子节点无关; ctime 创建时间戳 mtime 修改时间戳 cversion 子节点版本号 aclVersion ACL(访问控制列表,包括读、写、删除等权限) 版本号 dataversion 数据版本号 ephemeralOwner 临时节点拥有者会话 ID dataLength 数据长度(字节) numChildren 子节点数量
Java 应用集成 Zookeeper
Java 客户端去操作 Zookeeper 集群主要有 ZkClient 和 Curator ;尽管 ZkClient 提供了相对简洁的 API、实现了 Session 会话超时重连、解决了 Watcher 反复注册问题等,但其社区不活跃、文档不够完善、重试机制难以使用等;相对的,Curator 是 Apache 的顶级项目之一,提供了一套非常优雅的链式调用 API,还提供了一些分布式开发的开箱即用的解决方案,如 Recipes、共享锁服务、Master 选举机制、分布式计算器等;总之,Curator 比 ZkClient 优雅太多,就连 ZK 的作者都对 Curator 给予高度评价,因此我们使用 Curator 集成进 Java 应用来操作 ZK。
提示
Curator [kjʊˈreɪtər] 馆长
添加依赖
提示
Curator 和 ZK 版本不匹配会导致兼容问题,在 Curator 官网貌似没找到对应的版本匹配列表,可以在 maven 仓库中查看 Curator 源码 pom 文件,找到关联 ZK 的版本,目前 apache-curator 的最新版本为 5.6.0,与之对应的 ZK 版本为 3.7.2;
<properties> <curator.version>5.5.0</curator.version> </properties> <dependencies> <!-- recipes封装了一些高级特性,如事件监听、选举、分布式锁、分布式计数器等 --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency> <!-- framework是对ZK底层API的一些封装 --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>${curator.version}</version> </dependency> <!-- client提供一些客户端的操作,如重试策略等 --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>${curator.version}</version> </dependency> </dependencies>使用 Curator 实现节点的 CRUD
- 创建客户端实例,这里提供两种创建实例的方式,如下:
public class MyCuratorClient { static String connectString = "192.168.2.103:2181"; // 重试策略;第一个参数表示等待时间单位,第二个参数表示最大重试次数 static ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); // 方式一: public static CuratorFramework createClientSimple(){ return CuratorFrameworkFactory.newClient(connectString, retryPolicy); } // 方式二: public static CuratorFramework createClientWithOptions(){ return CuratorFrameworkFactory.builder() .connectString(connectString) .retryPolicy(retryPolicy) .connectionTimeoutMs(5000) // 连接超时时间 .sessionTimeoutMs(5000) // 会话超时时间 .build(); } }- CRUD
static CuratorFramework clientSimple = MyCuratorClient.createClientSimple(); static String path = "/test/CRUD/simple";新增public static void create(){ try { // 启动客户端实例,连接服务器 clientSimple.start(); String createdPath = clientSimple.create() .creatingParentsIfNeeded() // PERSISTENT表示节点类型为持久化 // PERSISTENT_SEQUENTIAL表示持久化顺序节点,若在创建子节点时设置此属性,ZK会自动为其节点名后加上一个数字后缀,以记录先后顺序 // EPHEMERAL表示临时节点,客户端会话失效会自动清除,但其不是立马清除,会存留大概10秒以内;临时节点不能创建子节点 // EPHEMERAL_SEQUENTIAL临时顺序节点,可用于分布式锁的实现 .withMode(CreateMode.PERSISTENT) .forPath(path, "hello i am testing...".getBytes(StandardCharsets.UTF_8)); log.info("新增成功,路径为: {}",createdPath); }catch (Exception e){ e.printStackTrace(); } finally { // 关闭连接 CloseableUtils.closeQuietly(clientSimple); } }查看public static void get(){ try { clientSimple.start(); Stat stat = clientSimple.checkExists().forPath(path); if(null!=stat) { // 读取节点数据 byte[] bytes = clientSimple.getData().forPath(path); String data = new String(bytes,StandardCharsets.UTF_8); log.info("read data is: " + data); // 读取子节点 String parentPath = "/test"; List<String> strings = clientSimple.getChildren().forPath(parentPath); log.info("read '/test' child is: "); strings.forEach(log::info); } }catch (Exception e){ e.printStackTrace(); } finally { CloseableUtils.closeQuietly(clientSimple); } }更新public static void update(){ try { /** 设置异步更新回调方法 * 参数说明: * int i: 返回代码或调用的结果 * string s: 传递给异步调用的路径 * object o: 传递给异步调用的任何上下文对象 * string s1: 已创建的znode的名称。成功时,名称和路径通常相等,除非已创建顺序节点。 */ AsyncCallback.StringCallback callback = (i, s, o, s1) -> log.info("更新成功后,我被调用: " + s1); clientSimple.start(); // 异步更新 Stat stat = clientSimple.setData() .inBackground(callback) .forPath(path, "hello i am change".getBytes(StandardCharsets.UTF_8)); log.info("异步更新成功!: " + stat.toString()); // 同步更新 // clientSimple.setData().forPath(path, "hello i am change".getBytes(StandardCharsets.UTF_8)); }catch (Exception e){ e.printStackTrace(); } finally { CloseableUtils.closeQuietly(clientSimple); } }删除public static void del(){ try { clientSimple.start(); clientSimple.delete().forPath(path); log.info("同步删除成功!: "); // 异步删除和异步更新同理,不再叙述 }catch (Exception e){ e.printStackTrace(); } finally { CloseableUtils.closeQuietly(clientSimple); } }
分布式命名
命名服务是为系统中的资源提供标识能力。需要用到分布式资源标识能力的场景有很多,其中有 3 个经典的命名场景:分别是分布式 API 目录、分布式 ID 生成器、分布式节点命名;
分布式 API 目录
借助 ZK 的树形分层结构,可维护全局的 API 地址,Dubbo 就使用了这一特性,其大致思路为:
- 服务提供者启动时向 ZK 的指定节点写入自己的 API 地址,如向/dubbo/${serviceName}/provider 写入其 API 地址;
- 服务消费者在启动时订阅节点/dubbo/${serviceName}/provider,以获取 API
分布式 ID 生成器
利用顺序节点会自动添加序号后缀的特性,这些节点的序号可以作为分布式环境中唯一的递增 ID。如新建一个 /test/order_ 顺序节点,其返回可能是 /test/order_0000000010,其中 0000000010 可作为唯一的分布式 ID;
这种方法简单易行,然而在高并发情况下,频繁地创建节点可能会影响性能,需要根据具体情况考虑其适用性和性能影响。
注意,ZooKeeper 的顺序节点生成的递增 ID 是一个 int 类型的整数(4 字节 32 位),意味着 ID 有个数限制(接近 43 亿个);虽然看似可以生成大量 ID, 但 ZK 对于单个父节点下子节点的数量有限制(默认 10000 个),当达到这个限制将无法创建更多的子节点!
分布式节点命名
分布式应用节点的命名主要是为各个应用节点进行唯一编号,可使用 ZK 的持久顺序节点实现,其基本流程如下:
- 启动节点服务,检查 ZK 命名根节点(如"/test/app-")是否存在(不存在则创建);
- 在 ZK 命名根节点下创建顺序节点,并取回编号(序号后缀);
分布式事件监听
在 Curator API 中,事件监听有两种模式:标准的观察者模式和缓存监听
标准的观察者模式通过 Watch 监听器来实现,只能监听一次;Cache(缓存)事件监听机制,客户端通过本地缓存视图和远程 ZK 视图对比,当感应到节点状态变化时触发事件,拥有事件监听器反复注册的能力;下面分别介绍这两种模式的实现:
提示
貌似事件机制并不是绝对可靠的,在极端条件下可能丢失事件通知
Watcher 标准的事件处理器
Watcher 监听器完成注册后,某些事件触发了这个 Watcher 时,会向客户端发送一个事件通知;Watcher 监听器注册成功后 只会触发一次 ,因此他只适用于一些特殊的、变动不频繁的场景,如会话超时、授权失败等;
@Slf4j // ZK事件监听 public class ZKWatcherDemo { static CuratorFramework clientSimple = MyCuratorClient.createClientSimple(); static String workPath = "/test/watcher/testNode"; public static void testWatcher() { try { clientSimple.start(); // 检查节点是否存在,不存在则创建 Stat stat = clientSimple.checkExists().forPath(workPath); if (null == stat ) clientSimple.create().creatingParentsIfNeeded().forPath(workPath,"test_watcher".getBytes(StandardCharsets.UTF_8)); // 定义监听回调方法 Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { log.info("监听到节点发生变化,我被执行了,但我只会执行一次!:" + event); } }; // 查询时为节点注册监听器 String data = new String(clientSimple.getData() .usingWatcher(watcher).forPath(workPath)); log.info("读取节点内容:{}",data); // 更变节点数据 for (int i = 1; i <= 3; i++) { clientSimple.setData().forPath(workPath,("第"+ i +"次更变内容").getBytes(StandardCharsets.UTF_8)); log.info("第"+ i +"次更变内容"); } // 事件回调触发可能不是及时的,让子弹飞一会儿 Thread.sleep(15000); // 删除测试的节点 clientSimple.delete().forPath(workPath); } catch (Exception e){ e.printStackTrace(); } finally { CloseableUtils.closeQuietly(clientSimple); } } }缓存监听
Curator 的缓存机制不仅用于缓存,还能实现事件监听;包括 NodeCache(监听指定节点)、PathCache(监听子节点)、TreeCache(即监听节点,又监听子节点)、CuratorCache(即监听节点,又监听子节点) 四类;其中前三类在新版本中已被弃用,CuratorCache 则可替代他们;Cache 提供了事件监听器重复注册的能力;如下为四种事件监听机制的实现:
CuratorCache@Slf4j public class ZKCuratorCacheDemo { static CuratorFramework clientSimple = MyCuratorClient.createClientSimple(); static String workPath = "/test/cache/testNode"; static String workPathChild = workPath + "/id-"; public static void testCuratorCache() { try { clientSimple.start(); // 检查节点是否存在,不存在则创建 Stat stat = clientSimple.checkExists().forPath(workPath); if (null == stat) clientSimple.create().creatingParentsIfNeeded().forPath(workPath, "testNode_curatorCache".getBytes(StandardCharsets.UTF_8)); // 实现监听器,编辑回调方法 CuratorCache curatorCache = CuratorCache.build(clientSimple, workPath); CuratorCacheListener curatorCacheListener = new CuratorCacheListener() { @Override public void event(Type type, ChildData oldData, ChildData data) { try { switch (type) { case NODE_CREATED -> log.info("【增加】节点时触发事件回调,path={},data={}", data.getPath(), new String(data.getData(), StandardCharsets.UTF_8)); case NODE_CHANGED -> log.info("【修改】节点触发事件回调,path={},data={}", data.getPath(), new String(data.getData(), StandardCharsets.UTF_8)); case NODE_DELETED -> log.info("【删除】节点触发事件回调,path={}", oldData.getPath()); default -> log.info("其他事件类型被触发,祥见type枚举"); } } catch (Exception e) { e.printStackTrace(); } } @Override public void initialized() { // 当缓存启动时,初始节点被跟踪,并且当它们完成加载到缓存中时,这个方法被调用。 log.info("我已经完成数据缓存了。。。"); } }; // 注册事件监听 curatorCache.listenable().addListener(curatorCacheListener); curatorCache.start(); // 修改当前节点数据 clientSimple.setData().forPath(workPath, "啊,我是主节点,我被修改了".getBytes(StandardCharsets.UTF_8)); // 新建子节点数据 for (int i = 1; i <= 2; i++) { clientSimple.create() .forPath(workPathChild + i, ("第" + i + "次新增子节点内容").getBytes(StandardCharsets.UTF_8)); log.info("第" + i + "次新子节点增内容"); // 事件回调触发可能不是及时的,让子弹飞一会儿 Thread.sleep(15000); clientSimple.delete().forPath(workPathChild + i); } // 清除测试数据关闭缓存 clientSimple.delete().forPath(workPath); curatorCache.close(); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(clientSimple); } } }NodeCache@Slf4j public class ZKCacheDemo { static CuratorFramework clientSimple = MyCuratorClient.createClientSimple(); static String workPath = "/test/cache/testNode"; public static void testNodeCache() { try { clientSimple.start(); // 检查节点是否存在,不存在则创建 Stat stat = clientSimple.checkExists().forPath(workPath); if (null == stat) clientSimple.create().creatingParentsIfNeeded().forPath(workPath, "testNode_nodeCache".getBytes(StandardCharsets.UTF_8)); // 定义NodeCache回调,第三个参数表示不压缩数据 NodeCache nodeCache = new NodeCache(clientSimple, workPath, false); NodeCacheListener nodeCacheListener = new NodeCacheListener() { @Override public void nodeChanged() { ChildData childData = nodeCache.getCurrentData(); log.info("状态改变,回调开始,节点数据为:{}", new String(childData.getData(), StandardCharsets.UTF_8)); } }; // 启动节点事件监听 nodeCache.getListenable().addListener(nodeCacheListener); nodeCache.start(); // 更变节点数据 for (int i = 1; i <= 3; i++) { clientSimple.setData().forPath(workPath, ("第" + i + "次更变内容").getBytes(StandardCharsets.UTF_8)); log.info("第" + i + "次更变内容"); // 事件回调触发可能不是及时的,让子弹飞一会儿 Thread.sleep(15000); } // 关闭缓存并删除节点 nodeCache.close(); clientSimple.delete().forPath(workPath); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(clientSimple); } } }PathCache@Slf4j public class ZKCacheDemo { static CuratorFramework clientSimple = MyCuratorClient.createClientSimple(); static String workPath = "/test/cache/testNode"; static String workPathChild = workPath + "/id-"; public static void testPathCache() { try { clientSimple.start(); // 检查节点是否存在,不存在则创建 Stat stat = clientSimple.checkExists().forPath(workPath); if (null == stat) clientSimple.create().creatingParentsIfNeeded().forPath(workPath, "testNode_pathChildrenCache".getBytes(StandardCharsets.UTF_8)); // 定义PathCache回调,第三个参数表示是否缓存 PathChildrenCache pathChildrenCache = new PathChildrenCache(clientSimple, workPath, true); PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { try { ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED -> log.info("【增加】子节点时触发事件回调,path={},data={}", data.getPath(), new String(data.getData(), StandardCharsets.UTF_8)); case CHILD_UPDATED -> log.info("【修改】子节点触发事件回调,path={},data={}", data.getPath(), new String(data.getData(), StandardCharsets.UTF_8)); case CHILD_REMOVED -> log.info("【删除】子节点触发事件回调,path={}", data.getPath()); default -> log.info("其他事件类型被触发,祥见type枚举"); } } catch (Exception e) { e.printStackTrace(); } } }; // 注册事件监听 pathChildrenCache.getListenable().addListener(childrenCacheListener); // 设置启动项 // BUILD_INITIAL_CACHE——同步初始化cache,从服务器拉取对应数据; // POST_INITIALIZED_EVENT——异步初始化cache,从服务器拉取对应数据,拉取完成后触发INITIALIZED事件(对应event.getType()); // NORMAL——异步初始化cache,不触发任何事件; pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); // 新建子节点数据 for (int i = 1; i <= 3; i++) { clientSimple.create().forPath(workPathChild + i, ("第" + i + "次新增内容").getBytes(StandardCharsets.UTF_8)); log.info("第" + i + "次新增内容"); // 事件回调触发可能不是及时的,让子弹飞一会儿 Thread.sleep(15000); clientSimple.delete().forPath(workPathChild + i); } // 清除测试数据关闭缓存 clientSimple.delete().forPath(workPath); pathChildrenCache.close(); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(clientSimple); } } }TreeCache@Slf4j public class ZKCacheDemo { static CuratorFramework clientSimple = MyCuratorClient.createClientSimple(); static String workPath = "/test/cache/testNode"; static String workPathChild = workPath + "/id-"; public static void testTreeCache() { try { clientSimple.start(); // 检查节点是否存在,不存在则创建 Stat stat = clientSimple.checkExists().forPath(workPath); if (null == stat) clientSimple.create().creatingParentsIfNeeded().forPath(workPath, "testNode_treeCache".getBytes(StandardCharsets.UTF_8)); TreeCache treeCache = new TreeCache(clientSimple, workPath); TreeCacheListener treeCacheListener = new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { try { ChildData data = event.getData(); switch (event.getType()) { case NODE_ADDED -> log.info("【增加】节点时触发事件回调,path={},data={}", data.getPath(), new String(data.getData(), StandardCharsets.UTF_8)); case NODE_UPDATED -> log.info("【修改】节点触发事件回调,path={},data={}", data.getPath(), new String(data.getData(), StandardCharsets.UTF_8)); case NODE_REMOVED -> log.info("【删除】节点触发事件回调,path={}", data.getPath()); default -> log.info("其他事件类型被触发,祥见type枚举"); } } catch (Exception e) { e.printStackTrace(); } } }; // 注册事件监听 treeCache.getListenable().addListener(treeCacheListener); treeCache.start(); // 修改当前节点数据 clientSimple.setData().forPath(workPath, "啊,我被修改了".getBytes(StandardCharsets.UTF_8)); // 新建子节点数据 for (int i = 1; i <= 2; i++) { clientSimple.create() .forPath(workPathChild + i, ("第" + i + "次新增内容").getBytes(StandardCharsets.UTF_8)); log.info("第" + i + "次新增内容"); // 事件回调触发可能不是及时的,让子弹飞一会儿 Thread.sleep(15000); clientSimple.delete().forPath(workPathChild + i); } // 清除测试数据关闭缓存 clientSimple.delete().forPath(workPath); treeCache.close(); } catch (Exception e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(clientSimple); } } }
分布式锁
最经典的分布式锁是可重入的公平锁;
ZK 的临时顺序节点天生就是实现分布式锁的胚子,其子节点同步自增,可作为排号依据,且 ZK 的节点监听机制可保障占有锁传递的高效和有序,且能避免羊群效应;
基于 ZK 的分布式锁,适用于高可用且并发量不是很大的场景,ZK 内部优越的机制,能够保证占用锁的客户端失联时能够被有效释放(ZNode 会自动删除并能通知后面的节点);
ZK 实现分布式锁的基本流程:
- 创建一个父节点(以/test/lock 为例),尽量是持久节点,代表一个业务的锁;
- 要获得锁的线程,在 /test/lock 节点下创建一个临时顺序节点 /test/lock/id- ,获取自己的排号;
- 每个线程在获得锁之前,判断自己创建的子节点是否为当前子节点列表中最小的子节点,如果是则认为加锁成功;否则监听前一个 ZNode 变更消息,等待其他线程释放锁;
- 每个线程在获得锁后,完成业务流程需要删除创建的 ZNode,完成释放锁的工作;
Curator 客户端提供各种官方实现的分布式锁,比如 InterProcessMutex,如下所示:
@Slf4j
public class TestLock {
static CuratorFramework clientSimple = MyCuratorClient.createClientSimple();
static String workPath = "/test/lock/testNode";
// 需要锁来保护的公共资源
int count = 0;
public void test() throws Exception {
clientSimple.start();
// 创建互斥锁
final InterProcessMutex interProcessMutex = new InterProcessMutex(clientSimple, workPath);
// 启动10个线程
for (int i = 0; i < 10; i++) {
Runnable runnable = () -> {
try {
// 获取互斥锁
interProcessMutex.acquire();
// 每条线程执行10次累加
for (int j = 0; j < 10; j++) {
count++;
}
// 释放互斥锁
interProcessMutex.release();
} catch (Exception e) {
throw new RuntimeException(e);
}
};
runnable.run();
}
Thread.sleep(15000);
log.info("累加结果为:{}", count);
CloseableUtils.closeQuietly(clientSimple);
}
}分布式计数器
ZooKeeper 分布式计数器的优势是高可用,劣势是低性能。在高并发场景下,分布式计数器需要基于 Redis 去实现,以下学习即可。

