zookeeper的一篇概述
2010 年 6 月 6 日
之前在公司由于业务需要,对zookeeper进行了一些知识点的梳理进行分享,对一些刚刚接触zookeeper的小伙伴来说,或许可以借鉴一下
一、ZOOKEEPER介绍
简介
Zookeeper致力于提供一个 高性能 、 高可用 ,且具备 严格的顺序访问 控制能力的分布式协调服务。
设计目标
- 简单的数据结构: 共享的树形结构,类似文件系统,存储于内存;
- 可以构建集群: 避免单点故障,3-5台机器就可以组成集群,超过半数,正常工作就能对外提供服务;
- 顺序访问: 对于每个写请求,zk会分配一个全局唯一的递增编号,利用 这个特性可以实现高级协调服务;
- 高性能: 基于内存操作,服务于非事务请求,适用于读操作为主的业务 场景。3台zk集群能达到13w QPS;
应用场景
- 数据发布订阅
- 负载均衡
- 命名服务
- Master选举
- 集群管理
- 配置管理
- 分布式队列
- 分布式锁
二、zookeeper特性
会话(session):客户端与服务端的一次会话连接,本质是TCP长连接,通过会话可以进行心跳检测和数据传输;
数据节点(znode)
- 持久节点(PERSISTENT)
- 持久顺序节点(PERSISTENT_SEQUENTIAL)
- 临时节点(EPHEMERAL)
- 临时顺序节点(EPHEMERAL_SEQUENTIAL)
对于持久节点和临时节点,同一个znode下,节点的名称是唯一的:[center red 20px]
Watcher 事件监听器:客户端可以在节点上注册监听器,当特定的事件发生后,zk会通知到感兴趣的客户端。
EventType: NodeCreated、NodeDeleted、NodeDataChanged、NodeChildrenChange
ACL:Zk采用ACL(access control lists)策略来控制权限
权限类型:create,read,write,delete,admin
三、zookeeper常用命令
- 启动ZK服务: bin/zkServer.sh start
- 查看ZK服务状态:bin/zkServer.sh status
- 停止ZK服务: bin/zkServer.sh stop
- 重启ZK服务: bin/zkServer.sh restart
- 客户端连接:zkCli.sh -server 127.0.0.1:2181
- 显示目录:ls /
- 创建:create /zk “test”
- 获得值:get /zk
- 修改值:set /zk “test”
- 删除:delete /zk
- ACL:
- getAcl / setAcl
- addauth
四、zookeeper的java客户端
org.apache.curator curator-framework 2.12.0 org.apache.curator curator-recipes 2.12.0
public class App { public static void main(String[] args) throws Exception { String connectString = "211.159.174.226:2181"; RetryPolicy retryPolicy = getRetryPolicy(); CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, 5000, 5000, retryPolicy); client.start(); //增删改查 client.create().withMode(CreateMode.PERSISTENT).forPath("/test-Curator-PERSISTENT-nodata"); client.create().withMode(CreateMode.PERSISTENT).forPath("/test-Curator-PERSISTENT-data", "test-Curator-PERSISTENT-data".getBytes()); client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-Curator-EPHEMERAL-nodata"); client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-Curator-EPHEMERAL-data", "/test-Curator-EPHEMERAL-data".getBytes()); for (int i = 0; i < 5; i++) { client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test-Curator-PERSISTENT_SEQUENTIAL-nodata"); } byte[] bytes = client.getData().forPath("/test-Curator-PERSISTENT-data"); System.out.println("----------zk节点数据:" + new String(bytes) + "------------"); client.create().withMode(CreateMode.PERSISTENT).forPath("/test-listener", "test-listener".getBytes()); final NodeCache nodeCache = new NodeCache(client, "/test-listener"); nodeCache.start(); NodeCacheListener listener = new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("node changed : " + nodeCache.getCurrentData()); } }; nodeCache.getListenable().addListener(listener); client.setData().forPath("/test-listener", "/test-listener-change".getBytes()); } /** * RetryOneTime: 只重连一次. * RetryNTime: 指定重连的次数N. * RetryUtilElapsed: 指定最大重连超时时间和重连时间间隔,间歇性重连直到超时或者链接成功. * ExponentialBackoffRetry: 基于"backoff"方式重连,和RetryUtilElapsed的区别是重连的时间间隔是动态的 * BoundedExponentialBackoffRetry: 同ExponentialBackoffRetry,增加了最大重试次数的控制. */ public static RetryPolicy getRetryPolicy() { return new ExponentialBackoffRetry(1000, 3); } }
五、分布式锁
public class ZookeeperLock { private final String lockPath = "/distributed-lock"; private String connectString; private RetryPolicy retry; private CuratorFramework client; private InterProcessLock interProcessMutex; public void init() throws Exception { connectString = "211.159.174.226:2181"; retry = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry); client.start(); //共享可重入锁 interProcessMutex = new InterProcessMutex(client,lockPath); } public void lock(){ try { interProcessMutex.acquire(); } catch (Exception e) { System.out.println("锁失败了,真惨"); } } public void unlock(){ try { interProcessMutex.release(); } catch (Exception e) { System.out.println("释放失败了,更惨"); } } public static void main(String[] args) throws Exception { final ZookeeperLock zookeeperLock = new ZookeeperLock(); zookeeperLock.init(); Executor executor = Executors.newFixedThreadPool(5); for (int i = 0;i<50;i++) { executor.execute(new Runnable() { @Override public void run() { zookeeperLock.lock(); Long time = System.nanoTime(); System.out.println(time); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time); zookeeperLock.unlock(); } }); } while (true){ } } }
六、zab协议
- ZAB协议所定义的三种节点状态
- Looking :选举状态。
- Following :Follower节点(从节点)所处的状态。
- Leading :Leader节点(主节点)所处状态。
- Zxid(64位的数据结构)
前32位:Leader 周期编号 myid
低32位:事务的自增序列(单调递增的序列)只要客户端有请求,就+1
当产生新Leader的时候,就从这个Leader服务器上取出本地log中最大事务zxid,从里面读出epoch+1,作为一个新epoch,并将低32位置0(保证id绝对自增)。 - 崩溃恢复
- 每个server都有一张选票,选票投自己。
- 搜集各个服务器的投票。
- 比较投票,比较逻辑:优先比较zxid,然后才比较myid。
- 改变服务器状态(崩溃恢复=》数据同步,或者崩溃恢复=》消息广播)
- 消息广播(类似2P提交):
- Leader接受请求后,讲这个请求赋予全局的唯一64位自增Id(zxid)。
- 将zxid作为议案发给所有follower。
- 所有的follower接受到议案后,想将议案写入硬盘后,马上回复Leader一个ACK(OK)。
- 当Leader接受到合法数量Acks,Leader给所有follower发送commit命令。
- follower执行commit命令。
- PS: :到了这个阶段,ZK集群才正式对外提供服务,并且Leader可以进行消息广播,如果有新节点加入,还需要进行同步。