Zookeeper
发表于:2023-12-25 | 分类: 中间件
字数统计: 3.9k | 阅读时长: 16分钟 | 阅读量:

介绍

官网:Apache ZooKeeper

Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的 Apache 项目。

从设计模式的角度来理解,Zookeeper是一个基于观察者模式设计的分布式服务管理框架,负责存储和管理系统服务的关键数据。

特点:

  1. 一个Leader,多个Follower组成的集群。
  2. 集群中只要有半数以上的节点存活,Zookeeper集群就能正常服务,所以集群机器一般为奇数
  3. 全局数据一致:每个Server保存一份相同的数据副本
  4. 更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。
  5. 数据更新原子性,一次数据更新要么成功,要么失败。
  6. 实时性,在一定时间范围内,Client能读到最新数据。

ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。

应用场景:

Zookeeper能够提供的服务包括::统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下 线、软负载均衡等。

统一命名服务:在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。

统一配置管理:可以将配置信息写入一个Znode,各个客户端服务器监听这个Znode。

比如Kafka集群中,可以保证所有节点的配置信息是一致的。

统一集群管理:ZooKeeper可以实现实时监控节点状态变化;监听对应的ZNode可获取到对应的节点信息。

服务器动态上下线:客户端可以通过Zookeeper实时得知服务器上下线的变化。

软负载均衡:Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求

安装

镜像下载:/apache/zookeeper(tsinghua.edu.cn)

1
2
3
4
5
6
7
8
9
10
11
12
13
#解压
tar -zxvf apache-zookeeper-3.7.2-bin.tar.gz -C /opt/app/
#修改名称
mv apache-zookeeper-3.7.2 -bin/ zookeeper
#zookeeper目录下创建数据存放地
mkdir zkData
#修改conf目录下的zoo_sample.cfg
mv zoo_sample.cfg zoo.cfg
#修改配置文件
dataDir=/opt/app/zookeeper/zkData
server.1=localhost:2888:3888
#zkData目录下创建myid,官网指定,固定名称
touch myid

配置文件不一定要使用zoo.cfg的名称

myid文件中只需要添加与server对应的编号

配置参数解读:

  1. tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
  2. initLimit = 10:LF初始通信时限。Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)
  3. syncLimit = 5:LF同步通信时限;超过时限Leader就会认为Follwer宕机
  4. dataDir:保存Zookeeper中的数据
  5. clientPort = 2181:客户端连接端口,通常不做修改。
  6. server.A=B:C:D:A表示是第几号服务器,记录在myid文件中;B是服务器的地址;C是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;D是用来执行选举时服务器相互通信的端口。

启停指令

1
2
3
4
5
6
7
8
9
10
11
12
#启动
bin/zkServer.sh start
#查看java进程
jps
#查看状态
bin/zkServer.sh status
#启动客户端
bin/zkCli.sh
#退出客户端
quit
#关闭
bin/zkServer.sh stop

启动脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/bin/bash
case $1 in
"start"){
echo ---------- zookeeper 启动 ------------
/opt/app/zookeeper/bin/zkServer.sh start
};;
"stop"){
echo ---------- zookeeper 关闭 ------------
/opt/app/zookeeper/bin/zkServer.sh stop
};;
"status"){
echo ---------- zookeeper 状态 ------------
/opt/app/zookeeper/bin/zkServer.sh status
};;
esac

选举机制

Zookeeper 内部维护有一些信息来决定 Leader 的归属:

  • SID:服务器ID;用来唯一标识集群中的机器,不能重复,和myid一致
  • ZXID:事务ID;ZXID是一个事务ID,用来标识一次服务器状态的变更。

在某一时刻, 集群中的每台机器的ZXID值不一定完全一 致,这和ZooKeeper服务器对于客户端“更 新请求”的处理逻辑有关。

  • Epoch:每个Leader任期的代号;没有 Leader 时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加

首次启动

假设当前的集群由五台Zookeeper组成。

  1. 1号启动,发起第一次选举;优先投给自己,此时票数为1,选举无法完成(小于3),服务器状态保持为LOOKING;
  2. 2号启动,发起第二次选举;1号和2号各自投自己一票并交换选票信息,此时1号发现2号的myid比自己当前投票推举的(1号)更大,更改选票推举2号。此时1号0票,2号两票,选举无法完成,1号、2号保持LOOKING;
  3. 3号启动,发起第三次选举;根据之前的规律,选举完成后3号持有三票,3号成为Leader。1,2号状态为FOLLOWING,3号更改状态为LEADING;
  4. 4号启动,发起第四次选举,但此时1,2,3已经不是LOOKING状态,不会更改选票信息。结果为3号3票,4号1票。4号服从多数将更改选票为3号,状态变为FOLLOWING;
  5. 5号启动,成为FOLLOWING;

非首次启动

当集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:

  • 服务器初始化启动
  • 服务器运行期间无法和Leader保持连接

当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:

  • 集群中存在Leader;

选举阶段会被告知Leader的信息,仅需要和Leader建立连接,同步状态即可。

  • 集群中不存在Leader

Leader选举规则:

  1. EPOCH大的胜出
  2. EPOCH相同,事务id大的胜出
  3. 事务id相同,服务器id大的胜出

集群搭配推荐:10 台服务器:3 台 zk;20 台服务器:5 台 zk;100 台及以上服务器:11 台 zk;

服务器台数越多;好处:提高可靠性;坏处:提高通信延时

客户端操作命令

命令基本语法 功能描述
help 显示所有操作命令
ls [path] 使用 ls 命令来查看当前 znode 的子节点 [可监听]
-w 监听子节点变化
-s 附加次级信息
create 普通创建
-s 含有序列
-e 临时(重启或者超时消失)
get [path] 获得节点的值 [可监听]
-w 监听子节点变化
-s 附加次级信息
set 设置节点的具体值
stat 查看节点状态
delete 删除节点
deleteall 递归删除节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#创建临时目录
[zk: localhost:2181(CONNECTED) 2] create -e /test
Created /test
[zk: localhost:2181(CONNECTED) 3] ls -s /
[test, zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0xd
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2
  • cZxid:创建节点的事务 zxid
  • ctime:znode 被创建的毫秒数(从 1970 年开始)
  • mzxid:znode 最后更新的事务 zxid
  • mtime:znode 最后修改的毫秒数(从 1970 年开始)
  • pZxid:znode 最后更新的子节点 zxid
  • cversion:znode 子节点变化号,znode 子节点修改次数
  • dataversion:znode 数据变化号
  • aclVersion:znode 访问控制列表的变化号
  • ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是 临时节点则是 0。
  • dataLength:znode 的数据长度
  • numChildren:znode 子节点数量

节点类型

Zookeeper中的节点分为两种类型:

  • 持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
    • 持久化目录节点
    • 持久化顺序编号目录节点
  • 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
    • 临时目录节点
    • 临时顺序编号目录节点

创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护

临时节点不能有孩子节点

在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

1
2
3
4
5
6
7
8
#创建永久节点(不带序号)
create /test
#创建节点+绑定数据
create /test/city "hangzhou"
#获取节点数据
get /test/city
#修改节点数据
set /test/city "wuhan"

节点只能绑定一个数据

简单理解: 路径:数据相当于Java中Map的Key:Value

监听器

Zookeeper中内置两种监听指令:

  • get -w [path]:监听节点数据的变化
  • ls -w [path]:监听子节点增减的变化

原理:

  1. 客户端创建main()线程
  2. 在main线程中连接zookeeper客户端,再创建两个线程,一个负责网络连接(connet),一个负责监听(listener)
  3. 通过connet线程将注册的监听事件发送给Zookeeper
  4. 在Zookeeper的注册监听器列表中将注册的监听事件添加到列表中
  5. Zookeeper监听到有数据或路径变化,就会将这个消息发送给listener线程
  6. listener线程内部调用了process()方法

注意:调用一次监听指令(get/ls)只能监听一次变化

Java客户端调用

引入依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.2</version>
</dependency>

实践:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class ZookeeperTest {
private ZooKeeper zk;
// 集群可以使用,分隔
private final String connectString = "192.168.124.105:2181";
// 时间太短的话打断点会断开连接
private final int sessionTime = 20000;

public static void main(String[] args) throws Exception {
ZookeeperTest test = new ZookeeperTest();
// 建立连接
test.connect();
// 创建节点
System.out.println(test.create("/servers", "hello".getBytes()));
// 获取节点
System.out.println(test.getList("/"));
// 设置节点数据
System.out.println(test.setData("/servers", "hi"));
// 获取节点数据
System.out.println(test.getData("/servers"));
// 删除节点
test.delNode("/servers");

System.out.println(test.getList("/"));

test.close();
}

private void close() throws InterruptedException {
zk.close();
}

private void delNode(String path) throws InterruptedException, KeeperException {
zk.delete(path, -1);
}

private String getData(String path) throws InterruptedException, KeeperException {
return new String(zk.getData(path, false, null));
}

private Stat setData(String path, String data) throws InterruptedException, KeeperException {
// 这里的version是实现乐观锁的版本号,当版本号不匹配就不能成功更新
// 设置-1表示忽略版本号直接更新
return zk.setData(path, data.getBytes(), -1);
}

private List<String> getList(String path) throws InterruptedException, KeeperException {
return zk.getChildren(path, false);
}

private String create(String path, byte[] data) throws InterruptedException, KeeperException {
// 第三个参数表示权限控制,这里设置所有人都能操作
// 第四个参数表示生成模式,这里使用的是临时非有序
return zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}

private void connect() throws IOException, InterruptedException {
zk = new ZooKeeper(connectString, sessionTime, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("===========监听记录============");
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getState());
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getWrapper());
try {
// true表示使用默认的Watcher
System.out.println(zk.getChildren("/", true));
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println("===========监听结束============");
}
});
}
}

其实创建节点就相当于在Zookeeper中注册,当连接断开就表示服务停止了。

分布式锁案例

手动实现分布式锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class ZookeeperLock {
private ZooKeeper zk;
private final String connectString = "192.168.124.105:2181";
private final int sessionTime = 20000;
private String current;
private String watchNode = "";
private final CountDownLatch connect = new CountDownLatch(1);
private final CountDownLatch wait = new CountDownLatch(1);


public ZookeeperLock() {
connect();
}

private void connect() {
try {
zk = new ZooKeeper(connectString, sessionTime, (watchedEvent) -> {
if (watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
connect.countDown();
}
if (watchedEvent.getPath().equals(watchNode) && watchedEvent.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
wait.countDown();
}
});
// 确保成功连接Zookeeper
connect.await();
Stat exists = zk.exists("/locks", false);
if (exists == null) {
zk.create("/locks", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public void lock() throws Exception {
current = zk.create("/locks/task", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zk.getChildren("/locks", false);
if (children.size() == 1) {
return;
}
// 得到需要监听的节点
long cur = Long.parseLong(current.substring("/locks/task".length()));
for (String temp : children) {
long watch = Long.parseLong(temp.substring("/task".length()));
if (watch < cur) {
watchNode = "/locks/" + temp;
}
}
if (Objects.equals(watchNode, "")) {
return;
}
zk.getData(watchNode, true, null);
// 确保监听完成
wait.await();
}

public void unlock() throws Exception {
zk.delete(current, -1);
}
}

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class LockTest {
public static void main(String[] args) {
ZookeeperLock lock1 = new ZookeeperLock();
ZookeeperLock lock2 = new ZookeeperLock();
new Thread(() -> {
try {
lock1.lock();
System.out.println("线程1上锁");
TimeUnit.SECONDS.sleep(3);
System.out.println("线程1解锁");
lock1.unlock();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
new Thread(() -> {
try {
lock2.lock();
System.out.println("线程2上锁");
TimeUnit.SECONDS.sleep(3);
System.out.println("线程2解锁");
lock2.unlock();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).start();
}
}

Curator

原生JavaAPI开发存在的问题:

  1. 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
  2. Watch 需要重复注册,不然就不能生效
  3. 不支持多节点删除和创建

Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。

官方文档:Welcome to Apache Curator | Apache Curator

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>

Zookeeper 实现分布式锁的场景并不多,这里仅提供一个简单例子作为使用参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import
org.apache.curator.framework.recipes.locks.InterProcessLock;
import
org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
private String rootNode = "/locks";
// zookeeper server 列表
private String connectString =
"hadoop102:2181,hadoop103:2181,hadoop104:2181";
// connection 超时时间
private int connectionTimeout = 2000;
// session 超时时间
private int sessionTimeout = 2000;
public static void main(String[] args) {
new CuratorLockTest().test();
}
// 测试
private void test() {
// 创建分布式锁 1
final InterProcessLock lock1 = new
InterProcessMutex(getCuratorFramework(), rootNode);
// 创建分布式锁 2
final InterProcessLock lock2 = new
InterProcessMutex(getCuratorFramework(), rootNode);
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock1.acquire();
System.out.println("线程 1 获取锁");
// 测试锁重入
lock1.acquire();
System.out.println("线程 1 再次获取锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程 1 释放锁");
lock1.release();
System.out.println("线程 1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock2.acquire();
System.out.println("线程 2 获取锁");
// 测试锁重入
lock2.acquire();
System.out.println("线程 2 再次获取锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程 2 释放锁");
lock2.release();
System.out.println("线程 2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
// 分布式锁初始化
public CuratorFramework getCuratorFramework (){
//重试策略,初试时间 3 秒,重试 3 次
RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
//通过工厂创建 Curator
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectString)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(policy).build();
//开启连接
client.start();
System.out.println("zookeeper 初始化完成...");
return client;
}
}

简单测试了一下可重入锁的实现

源码分析

继续学习:30_尚硅谷_zk_算法基础_拜占庭将军问题_哔哩哔哩_bilibili

上一篇:
Kafka
下一篇:
Redis系列第七章