介绍
官网:Apache ZooKeeper
Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的 Apache 项目。
从设计模式的角度来理解,Zookeeper是一个基于观察者模式设计的分布式服务管理框架,负责存储和管理系统服务的关键数据。
特点:
- 一个Leader,多个Follower组成的集群。
- 集群中只要有半数以上的节点存活,Zookeeper集群就能正常服务,所以集群机器一般为奇数
- 全局数据一致:每个Server保存一份相同的数据副本
- 更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
- 数据更新原子性,一次数据更新要么成功,要么失败。
- 实时性,在一定时间范围内,Client能读到最新数据。
ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。
应用场景:
Zookeeper能够提供的服务包括::统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下 线、软负载均衡等。
统一命名服务:在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。
统一配置管理:可以将配置信息写入一个Znode,各个客户端服务器监听这个Znode。
比如Kafka集群中,可以保证所有节点的配置信息是一致的。
统一集群管理:ZooKeeper可以实现实时监控节点状态变化;监听对应的ZNode可获取到对应的节点信息。
服务器动态上下线:客户端可以通过Zookeeper实时得知服务器上下线的变化。
软负载均衡:Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求
安装
镜像下载:/apache/zookeeper(tsinghua.edu.cn)
1 2 3 4 5 6 7 8 9 10 11 12 13
| tar -zxvf apache-zookeeper-3.7.2-bin.tar.gz -C /opt/app/
mv apache-zookeeper-3.7.2 -bin/ zookeeper
mkdir zkData
mv zoo_sample.cfg zoo.cfg
dataDir=/opt/app/zookeeper/zkData server.1=localhost:2888:3888
touch myid
|
配置文件不一定要使用zoo.cfg的名称
myid文件中只需要添加与server对应的编号
配置参数解读:
- tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
- initLimit = 10:LF初始通信时限。Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)
- syncLimit = 5:LF同步通信时限;超过时限Leader就会认为Follwer宕机
- dataDir:保存Zookeeper中的数据
- clientPort = 2181:客户端连接端口,通常不做修改。
server.A=B:C:D
:A表示是第几号服务器,记录在myid文件中;B是服务器的地址;C是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;D是用来执行选举时服务器相互通信的端口。
启停指令
1 2 3 4 5 6 7 8 9 10 11 12
| bin/zkServer.sh start
jps
bin/zkServer.sh status
bin/zkCli.sh
quit
bin/zkServer.sh stop
|
启动脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| #!/bin/bash case $1 in "start"){ echo ---------- zookeeper 启动 ------------ /opt/app/zookeeper/bin/zkServer.sh start };; "stop"){ echo ---------- zookeeper 关闭 ------------ /opt/app/zookeeper/bin/zkServer.sh stop };; "status"){ echo ---------- zookeeper 状态 ------------ /opt/app/zookeeper/bin/zkServer.sh status };; esac
|
选举机制
Zookeeper 内部维护有一些信息来决定 Leader 的归属:
- SID:服务器ID;用来唯一标识集群中的机器,不能重复,和myid一致
- ZXID:事务ID;ZXID是一个事务ID,用来标识一次服务器状态的变更。
在某一时刻, 集群中的每台机器的ZXID值不一定完全一 致,这和ZooKeeper服务器对于客户端“更 新请求”的处理逻辑有关。
- Epoch:每个Leader任期的代号;没有 Leader 时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加
首次启动
假设当前的集群由五台Zookeeper组成。
- 1号启动,发起第一次选举;优先投给自己,此时票数为1,选举无法完成(小于3),服务器状态保持为LOOKING;
- 2号启动,发起第二次选举;1号和2号各自投自己一票并交换选票信息,此时1号发现2号的myid比自己当前投票推举的(1号)更大,更改选票推举2号。此时1号0票,2号两票,选举无法完成,1号、2号保持LOOKING;
- 3号启动,发起第三次选举;根据之前的规律,选举完成后3号持有三票,3号成为Leader。1,2号状态为FOLLOWING,3号更改状态为LEADING;
- 4号启动,发起第四次选举,但此时1,2,3已经不是LOOKING状态,不会更改选票信息。结果为3号3票,4号1票。4号服从多数将更改选票为3号,状态变为FOLLOWING;
- 5号启动,成为FOLLOWING;
非首次启动
当集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
- 服务器初始化启动
- 服务器运行期间无法和Leader保持连接
当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
选举阶段会被告知Leader的信息,仅需要和Leader建立连接,同步状态即可。
Leader选举规则:
- EPOCH大的胜出
- EPOCH相同,事务id大的胜出
- 事务id相同,服务器id大的胜出
集群搭配推荐:10 台服务器:3 台 zk;20 台服务器:5 台 zk;100 台及以上服务器:11 台 zk;
服务器台数越多;好处:提高可靠性;坏处:提高通信延时
客户端操作命令
命令基本语法 |
功能描述 |
help |
显示所有操作命令 |
ls [path] |
使用 ls 命令来查看当前 znode 的子节点 [可监听] -w 监听子节点变化 -s 附加次级信息 |
create |
普通创建 -s 含有序列 -e 临时(重启或者超时消失) |
get [path] |
获得节点的值 [可监听] -w 监听子节点变化 -s 附加次级信息 |
set |
设置节点的具体值 |
stat |
查看节点状态 |
delete |
删除节点 |
deleteall |
递归删除节点 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| [zk: localhost:2181(CONNECTED) 2] create -e /test Created /test [zk: localhost:2181(CONNECTED) 3] ls -s / [test, zookeeper] cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0xd cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 2
|
- cZxid:创建节点的事务 zxid
- ctime:znode 被创建的毫秒数(从 1970 年开始)
- mzxid:znode 最后更新的事务 zxid
- mtime:znode 最后修改的毫秒数(从 1970 年开始)
- pZxid:znode 最后更新的子节点 zxid
- cversion:znode 子节点变化号,znode 子节点修改次数
- dataversion:znode 数据变化号
- aclVersion:znode 访问控制列表的变化号
- ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是 临时节点则是 0。
- dataLength:znode 的数据长度
- numChildren:znode 子节点数量
节点类型
Zookeeper中的节点分为两种类型:
- 持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
- 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
临时节点不能有孩子节点
在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。
1 2 3 4 5 6 7 8
| create /test
create /test/city "hangzhou"
get /test/city
set /test/city "wuhan"
|
节点只能绑定一个数据
简单理解: 路径:数据
相当于Java中Map的Key:Value
监听器
Zookeeper中内置两种监听指令:
get -w [path]
:监听节点数据的变化
ls -w [path]
:监听子节点增减的变化
原理:
- 客户端创建main()线程
- 在main线程中连接zookeeper客户端,再创建两个线程,一个负责网络连接(connet),一个负责监听(listener)
- 通过connet线程将注册的监听事件发送给Zookeeper
- 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中
- Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程
- listener线程内部调用了process()方法
注意:调用一次监听指令(get/ls)只能监听一次变化
Java客户端调用
引入依赖:
1 2 3 4 5
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.2</version> </dependency>
|
实践:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| public class ZookeeperTest { private ZooKeeper zk; private final String connectString = "192.168.124.105:2181"; private final int sessionTime = 20000;
public static void main(String[] args) throws Exception { ZookeeperTest test = new ZookeeperTest(); test.connect(); System.out.println(test.create("/servers", "hello".getBytes())); System.out.println(test.getList("/")); System.out.println(test.setData("/servers", "hi")); System.out.println(test.getData("/servers")); test.delNode("/servers");
System.out.println(test.getList("/"));
test.close(); }
private void close() throws InterruptedException { zk.close(); }
private void delNode(String path) throws InterruptedException, KeeperException { zk.delete(path, -1); }
private String getData(String path) throws InterruptedException, KeeperException { return new String(zk.getData(path, false, null)); }
private Stat setData(String path, String data) throws InterruptedException, KeeperException { return zk.setData(path, data.getBytes(), -1); }
private List<String> getList(String path) throws InterruptedException, KeeperException { return zk.getChildren(path, false); }
private String create(String path, byte[] data) throws InterruptedException, KeeperException { return zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); }
private void connect() throws IOException, InterruptedException { zk = new ZooKeeper(connectString, sessionTime, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("===========监听记录============"); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getState()); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getWrapper()); try { System.out.println(zk.getChildren("/", true)); } catch (Exception e) { throw new RuntimeException(e); } System.out.println("===========监听结束============"); } }); } }
|
其实创建节点就相当于在Zookeeper中注册,当连接断开就表示服务停止了。
分布式锁案例
手动实现分布式锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| public class ZookeeperLock { private ZooKeeper zk; private final String connectString = "192.168.124.105:2181"; private final int sessionTime = 20000; private String current; private String watchNode = ""; private final CountDownLatch connect = new CountDownLatch(1); private final CountDownLatch wait = new CountDownLatch(1);
public ZookeeperLock() { connect(); }
private void connect() { try { zk = new ZooKeeper(connectString, sessionTime, (watchedEvent) -> { if (watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) { connect.countDown(); } if (watchedEvent.getPath().equals(watchNode) && watchedEvent.getType().equals(Watcher.Event.EventType.NodeDeleted)) { wait.countDown(); } }); connect.await(); Stat exists = zk.exists("/locks", false); if (exists == null) { zk.create("/locks", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { throw new RuntimeException(e); } }
public void lock() throws Exception { current = zk.create("/locks/task", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> children = zk.getChildren("/locks", false); if (children.size() == 1) { return; } long cur = Long.parseLong(current.substring("/locks/task".length())); for (String temp : children) { long watch = Long.parseLong(temp.substring("/task".length())); if (watch < cur) { watchNode = "/locks/" + temp; } } if (Objects.equals(watchNode, "")) { return; } zk.getData(watchNode, true, null); wait.await(); }
public void unlock() throws Exception { zk.delete(current, -1); } }
|
测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class LockTest { public static void main(String[] args) { ZookeeperLock lock1 = new ZookeeperLock(); ZookeeperLock lock2 = new ZookeeperLock(); new Thread(() -> { try { lock1.lock(); System.out.println("线程1上锁"); TimeUnit.SECONDS.sleep(3); System.out.println("线程1解锁"); lock1.unlock(); } catch (Exception e) { throw new RuntimeException(e); } }).start(); new Thread(() -> { try { lock2.lock(); System.out.println("线程2上锁"); TimeUnit.SECONDS.sleep(3); System.out.println("线程2解锁"); lock2.unlock(); } catch (Exception e) { throw new RuntimeException(e); } }).start(); } }
|
Curator
原生JavaAPI开发存在的问题:
- 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
- Watch 需要重复注册,不然就不能生效
- 不支持多节点删除和创建
Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。
官方文档:Welcome to Apache Curator | Apache Curator
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>
|
Zookeeper 实现分布式锁的场景并不多,这里仅提供一个简单例子作为使用参考:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorLockTest { private String rootNode = "/locks"; private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private int connectionTimeout = 2000; private int sessionTimeout = 2000; public static void main(String[] args) { new CuratorLockTest().test(); } private void test() { final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode); final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode); new Thread(new Runnable() { @Override public void run() { try { lock1.acquire(); System.out.println("线程 1 获取锁"); lock1.acquire(); System.out.println("线程 1 再次获取锁"); Thread.sleep(5 * 1000); lock1.release(); System.out.println("线程 1 释放锁"); lock1.release(); System.out.println("线程 1 再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.acquire(); System.out.println("线程 2 获取锁"); lock2.acquire(); System.out.println("线程 2 再次获取锁"); Thread.sleep(5 * 1000); lock2.release(); System.out.println("线程 2 释放锁"); lock2.release(); System.out.println("线程 2 再次释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } public CuratorFramework getCuratorFramework (){ RetryPolicy policy = new ExponentialBackoffRetry(3000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(connectString) .connectionTimeoutMs(connectionTimeout) .sessionTimeoutMs(sessionTimeout) .retryPolicy(policy).build(); client.start(); System.out.println("zookeeper 初始化完成..."); return client; } }
|
简单测试了一下可重入锁的实现
源码分析
继续学习:30_尚硅谷_zk_算法基础_拜占庭将军问题_哔哩哔哩_bilibili