ZooKeeper开发分布式系统,动态服务上下线感知

什么是ZooKeeper

ZooKeeper是一个分布式开源框架,提供了协调分布式应用的基本服务,它向外部应用暴露一组通用服务——分布式同步(Distributed Synchronization)、命名服务(Naming Service)、集群维护(Group Maintenance)等,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper本身可以以单机模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower),基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。

Zookeeper简介

  1. ZooKeeper是为别的分布式程序服务的
  2. ZooKeeper本身就是一个分布式程序(只要有半数以上节点存活,ZooKeeper就能正常服务)
  3. ZooKeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统> 一名称服务等
  4. 虽然说可以提供各种服务,但是ZooKeeper在底层其实只提供了两个功能:
    1. 管理(存储,读取)用户程序提交的数据(类似namenode中存放的metadata)
    2. 为用户程序提供数据节点监听服务

ZooKeeper应用场景图

ZooKeeper集群机制

ZooKeeper集群的角色: Leader 和 follower

只要集群中有半数以上节点存活,集群就能提供服务。

ZooKeeper特性

  1. ZooKeeper:一个leader,多个follower组成的集群
  2. 全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
  3. 分布式读写,更新请求转发,由leader实施
  4. 更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
  5. 数据更新原子性,一次数据更新要么成功,要么失败
  6. 实时性,在一定时间范围内,client能读到最新数据

ZooKeeper的数据存储机制

数据存储形式

ZooKeeper中对用户的数据采用kv形式存储。 如果你想和更多ZooKeeper技术专家交流,可以加我微信liyingjiese,备注『加群』。群里每周都有全球各大公司的最佳实践以及行业最新动态

只是ZooKeeper有点特别:

key:是以路径的形式表示的,那就以为着,各key之间有父子关系,比如:

  • / 是顶层key
  • 用户建的key只能在/ 下作为子节点,比如建一个key: /aa 这个key可以带value数据
  • 也可以建一个key: /bb
  • 也可以建key: /aa/xx

ZooKeeper中,对每一个数据key,称作一个znode

综上所述,ZooKeeper中的数据存储形式如下:

znode类型

ZooKeeper中的znode有多种类型:

  1. PERSISTENT 持久的:创建者就算跟集群断开联系,该类节点也会持久存在与zk集群中
  2. EPHEMERAL 短暂的:创建者一旦跟集群断开联系,zk就会将这个节点删除
  3. SEQUENTIAL 带序号的:这类节点,zk会自动拼接上一个序号,而且序号是递增的

组合类型:

  • PERSISTENT :持久不带序号
  • EPHEMERAL :短暂不带序号
  • PERSISTENT 且 SEQUENTIAL :持久且带序号
  • EPHEMERAL 且 SEQUENTIAL :短暂且带序号

ZooKeeper的集群部署

集群选举示意图:

解压ZooKeeper安装包到apps目录下:

tar -zxvf zookeeper-3.4.6.tar.gz -C apps



cd /root/apps/zookeeper-3.4.6/conf

cp zoo_sample.cfg zoo.cfg

vi zoo.cfg

修改dataDir=/root/zkdata

在后面加上集群的机器:2888是leader和follower通讯端口,3888是投票的

server.1=hdp-01:2888:3888

server.2=hdp-02:2888:3888

server.3=hdp-03:2888:3888

对3台节点,都创建目录 mkdir /root/zkdata

对3台节点,在工作目录中生成myid文件,但内容要分别为各自的id: 1,2,3

echo 1 > /root/zkdata/myid

echo 2 > /root/zkdata/myid

echo 3 > /root/zkdata/myid

从hdp20-01上scp安装目录到其他两个节点

cd apps

scp -r zookeeper-3.4.6/ hdp-02:$PWD

scp -r zookeeper-3.4.6/ hdp-03:$PWD

启动ZooKeeper集群

ZooKeeper没有提供自动批量启动脚本,需要手动一台一台地起ZooKeeper进程

在每一台节点上,运行命令:

cd /root/apps/zookeeper-3.4.6

bin/zkServer.sh start

启动后,用jps应该能看到一个进程:QuorumPeerMain

但是,光有进程不代表zk已经正常服务,需要用命令检查状态:

bin/zkServer.sh status

能看到角色模式:为leader或follower,即正常了。

自己写个脚本,一键启动

vi zkmanage.sh



#!/bin/bash

for host in hdp-01 hdp-02 hdp-03

do

echo "${host}:$1ing....."

ssh $host "/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"

done

停止命令:sh zjmanage.sh stop

加个可执行权限:chmod +zkmanage.sh

启动命令:./zkmanage.sh start

但是出现没有Java环境变量问题,修改配置文件

vi zkmanage.sh

修改配置如下:

#!/bin/bash

for host in hdp-01 hdp-02 hdp-03

do

echo "${host}:$1ing....."

ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh $1"

done



sleep 2

for host in hdp-01 hdp-02 hdp-03

do

ssh $host "source /etc/profile;/root/apps/zookeeper-3.4.6/bin/zkServer.sh status"

done

启动集群结果:

hdp-01:starting.....

JMX enabled by default

Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

hdp-02:starting.....

JMX enabled by default

Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

hdp-03:starting.....

JMX enabled by default

Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

JMX enabled by default

Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: follower

JMX enabled by default

Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: leader

JMX enabled by default

Using config: /root/apps/zookeeper-3.4.6/bin/../conf/zoo.cfg

Mode: follower

ZooKeeper的Java客户端操作代码:

public class ZookeeperCliDemo {

ZooKeeper zk =null;

@Before

public void init() throws Exception {

   zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);

}

@Test

public void testCreate() throws Exception {

    //参数1:要创建的节点路径;参数2:数据;参数3:访问权限;参数4:节点类型

    String create = zk.create("/eclipse", "hello eclipse".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    System.out.println(create);

    zk.close();

}



@Test

public void testUpdate() throws Exception {

    //参数1:节点路径;参数2:数据;参数3:所要修改的版本,-1表示任意版本

    zk.setData("/eclipse","我喜欢青青".getBytes(),-1);

    zk.close();

}

@Test

public void testGet() throws Exception {

   //参数1:节点路径;参数2:事件监听;参数3:所要修改的版本,null表示最新版本

    byte[] data = zk.getData("/eclipse", false, null);

    System.out.println(new String(data,"UTF-8"));

    zk.close();

}



@Test

public void testListChildren() throws KeeperException, InterruptedException {

    //参数1:节点路径;参数2:是否要监听

    //注意:返回的结果只有子节点的名字,不带全路径

    List children = zk.getChildren("/cc", false);

    for(String child:children){

        System.out.println(child);

    }

    zk.close();

}



@Test

public void testRm() throws KeeperException, InterruptedException {

    zk.delete("/eclipse",-1);

    zk.close();

}

} 

ZooKeeper监听功能代码:

public class ZookeeperWatchDemo {

ZooKeeper zk =null;

@Before

public void init() throws Exception {

    zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000,  new Watcher() {

        public void process(WatchedEvent watchedEvent) {

            if (watchedEvent.getState() == Event.KeeperState.SyncConnected

                    && watchedEvent.getType() == Event.EventType.NodeDataChanged) {

                System.out.println("收到事件所发生节点的路径" + watchedEvent.getPath());

                System.out.println("收到事件所发生节点的状态" + watchedEvent.getState());

                System.out.println("收到事件所发生节点的类型" + watchedEvent.getType());

                System.out.println("watch事件通知。。换照片");

                try {

                    zk.getData("/mygirls", true, null);

                } catch (Exception e) {

                    e.printStackTrace();

                }

            }else if(watchedEvent.getState()==Event.KeeperState.SyncConnected &&

                    watchedEvent.getType()==Event.EventType.NodeChildrenChanged){

                System.out.println("收到事件所发生节点的路径" + watchedEvent.getPath());

                System.out.println("收到事件所发生节点的状态" + watchedEvent.getState());

                System.out.println("收到事件所发生节点的类型" + watchedEvent.getType());



            }

        }

    });

}



@Test

public void testGetWatch() throws Exception {

    byte[] data = zk.getData("/mygirls",true, null);

    List children = zk.getChildren("/mygirls", true);

    System.out.println(new String(data,"UTF-8"));

    Thread.sleep(Long.MAX_VALUE);

}

} 

ZooKeeper开发分布式系统案例代码,动态上下线感知。

服务代码:

public class TimeQueryServer {

ZooKeeper zk=null;

public void connectZk()throws Exception{

    zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);

}



public void registerServerInfo(String hostname,String port)throws Exception{

    /**

     * 先判断注册节点的父节点是否存在,如果不存在,则创建持久节点

     */

    Stat exists = zk.exists("/servers", false);

    if(exists==null){

        zk.create("/servers",null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);

    }

    /**

     * 注册服务器数据到zk的约定注册节点下

     */

    String create = zk.create("/servers/server", (hostname + ":" + port).getBytes(),

            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

    System.out.println(hostname+" 服务器向zk 注册成功,注册节点为:/servers"+create);

}

public static void main(String[] args) throws Exception {

    //1.构造zk连接

    TimeQueryServer timeQueryServer = new TimeQueryServer();

    timeQueryServer.connectZk();

    //2.注册服务器信息

    timeQueryServer.registerServerInfo("192.168.150.3","44772");

    //3.启动业务线程开始处理业务

    new TimeQueryService(44772).start();

}

} 
public class TimeQueryService extends Thread {

int port=0;

public TimeQueryService(int port){

    this.port=port;

}

@Override

public void run() {

    try {

        ServerSocket ss = new ServerSocket(port);

        System.out.println("业务线程已经绑定端口"+port+"开始接受客户端请求..");

        while (true){

            Socket sc = ss.accept();

            InputStream inputStream = sc.getInputStream();

            OutputStream outputStream = sc.getOutputStream();

            outputStream.write(new Date().toString().getBytes());

        }

    } catch (Exception e) {

        e.printStackTrace();

    }

}

} 

消费者代码:

public class Consumer {



//定义一个list用于存放在线的服务器列表

private volatile ArrayListonlineServers=new ArrayList();

ZooKeeper zk=null;

public void connectZk()throws Exception{

    zk=new ZooKeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new Watcher() {

        public void process(WatchedEvent watchedEvent) {

            if (watchedEvent.getState()==Event.KeeperState.SyncConnected && watchedEvent.getType()==Event.EventType.NodeChildrenChanged){

                try{

                    //事件回调逻辑中,再次查询zk上在线服务器节点即可,查询逻辑中又再次注册子节点变化事件监听

                    getOnlineServers();

                }catch (Exception e){

                    e.printStackTrace();

                }

            }

        }

    });

}

//查询在线服务器列表

public void getOnlineServers()throws Exception{

    List children = zk.getChildren("/servers", true);

    ArrayList servers = new ArrayList();

    for (String child:children){

        byte[] data = zk.getData("/servers/" + child, false, null);

        String serverInfo=new String(data);

        servers.add(serverInfo);

    }

    onlineServers=servers;

    System.out.println("查询了一次zk,当前在线的服务器有:"+servers);



}



public void setRequest() throws Exception {

    Random random = new Random();

    while (true){

        try {

            int nextInt=random.nextInt(onlineServers.size());

            String server=onlineServers.get(nextInt);

            String hostname=server.split(":")[0];

            int port=Integer.parseInt(server.split(":")[1]);

            System.out.println("本次请求挑选的服务器为:"+server);



            Socket socket = new Socket(hostname, port);

            OutputStream out = socket.getOutputStream();

            InputStream in = socket.getInputStream();

            out.write("hahaha".getBytes());

            out.flush();



            byte[] buf = new byte[256];

            int read=in.read(buf);

            String s = new String(buf, 0, read);

            System.out.println("服务器响应时间为:"+s);

            out.close();

            in.close();

            socket.close();

            Thread.sleep(2000);

        }catch (Exception e){



        }



    }

}

public static void main(String[] args) throws Exception {

    //构造zk连接对象

    Consumer consumer = new Consumer();

    consumer.connectZk();

    //查询在线服务器列表

    consumer.getOnlineServers();

    //处理业务

    consumer.setRequest();

}

} 

pom


    

        junit

        junit

        RELEASE

    

    

        org.apache.logging.log4j

        log4j-core

        2.8.2

    

    

    

        org.apache.zookeeper

        zookeeper

        3.4.10

    


启动多个服务。

控制台输出:

192.168.150.3 服务器向zk 注册成功,注册节点为:/servers/servers/server0000000018

业务线程已经绑定端口44772开始接受客户端请求..

192.168.150.3 服务器向zk 注册成功,注册节点为:/servers/servers/server0000000019

业务线程已经绑定端口44773开始接受客户端请求..

192.168.150.3 服务器向zk 注册成功,注册节点为:/servers/servers/server0000000020

业务线程已经绑定端口44774开始接受客户端请求..

消费者启动

控制台输出:

查询了一次zk,当前在线的服务器有:[192.168.150.3:44773, 192.168.150.3:44772, 192.168.150.3:44774]

本次请求挑选的服务器为:192.168.150.3:44772

服务器响应时间为:Mon Jun 03 20:03:21 CST 2019

本次请求挑选的服务器为:192.168.150.3:44773

服务器响应时间为:Mon Jun 03 20:03:23 CST 2019

本次请求挑选的服务器为:192.168.150.3:44773

服务器响应时间为:Mon Jun 03 20:03:25 CST 2019

本次请求挑选的服务器为:192.168.150.3:44772

服务器响应时间为:Mon Jun 03 20:03:27 CST 2019

下线一个服务后,控制台输出:

查询了一次zk,当前在线的服务器有:[192.168.150.3:44773, 192.168.150.3:44772]

本次请求挑选的服务器为:192.168.150.3:44773

服务器响应时间为:Mon Jun 03 20:04:19 CST 2019

本次请求挑选的服务器为:192.168.150.3:44773

服务器响应时间为:Mon Jun 03 20:04:21 CST 2019

本次请求挑选的服务器为:192.168.150.3:44773

服务器响应时间为:Mon Jun 03 20:04:23 CST 2019

本次请求挑选的服务器为:192.168.150.3:44773

原文链接: https://my.oschina.net/u/3995125/blog/3057475