ZooKeeper-3.3 获取管理权- 高飞网

3.3 获取管理权

2018-05-30 10:52:29.0

3.3.1 同步方式竞选群首

/**
 * @author xuyanhua
 * @description:
 * @date 2018/5/29 下午4:09
 */
public class Master {
    ZKClient zkClient = ZKClient.instance();
    Random random = new Random();
    String serverId = Integer.toHexString(random.nextInt());
    static Logger logger = LoggerFactory.getLogger(Master.class);
    final static String MASTER_PATH = "/master";
    boolean isLeader = false;

    public static void main(String[] args) throws InterruptedException {
        Master master = new Master();
        master.runForMaster(master.serverId);
        logger.info("isLeader-->" + master.isLeader);
        Thread.sleep(60000);
    }

    /**
     * 执行群首选举操作
     *
     * @param serverId
     * @return
     */
    void runForMaster(String serverId) throws InterruptedException {
        while (true) {
            try {
                logger.info("try election leader .");
                zkClient.create(MASTER_PATH, serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                //创建成功,成为群首
                isLeader = true;
                break;
            } catch (KeeperException.NodeExistsException e) {
                //如果群首节点存在,返回false
                isLeader = false;
                break;
            } catch (KeeperException.ConnectionLossException e) {

            } catch (Exception e) {
                e.printStackTrace();
            }
            //如果出现这种异常,要检查是否已有群首
            if (checkExistsMaster(serverId)) {
                //如果存在群首,就退出
                break;
            }
            //如果群首不存在,继续竞选群首
        }

    }

    /**
     * 检查有没有群首
     *
     * @param serverId
     * @return
     */
    boolean checkExistsMaster(String serverId) throws InterruptedException {
        try {
            Stat stat = new Stat();
            byte[] data = zkClient.getData(MASTER_PATH, false, stat);
            this.isLeader = serverId.equals(new String(data));
            return true;
        } catch (KeeperException.NoNodeException e) {
            return false;
        } catch (Exception e) {

        }
        //出现其他异常时继续检查
        return checkExistsMaster(serverId);
    }
}

由于Master选主时,也要满足当前应用下线时自动踢出,因此要创建临时节点;

这里对权限进行了弱化处理,使用ZooDefs.Ids.OPEN_ACL_UNSAFE;

这里通过一个随机数,来代表一个唯一的应用节点,现实中,可能是一个ip,或应用标识。

有两种异常需要特别说明:

KeeperException和InterruptedException:对于这两种异常,create方法可能执行成功了,所以需要进一步处理。

对于ConnectionLossException,KeeperException的子异常,通常是客户端与服务端失去连接时。多数是网络原因。这种情况下,客户端并不知道是在ZooKeeper服务器处理前丢失了消息,还是在处理后客户端未收到响应消息。

中断异常InterruptedException,源于客户端线程调用了Thread.interrupt,通常是因为应用程序部分关闭,但还在被其他相关应用的方法使用。从字面来看这个异常,进程会中断本地客户端的请求处理过程,并使该请求处理未知状态。

3.3.2 异步方式竞选群首

异步方式代码:

public class AsynMaster {
    ZKClient zkClient = ZKClient.instance();
    Random random = new Random();
    String serverId = Integer.toHexString(random.nextInt());
    static Logger logger = LoggerFactory.getLogger(AsynMaster.class);
    final static String MASTER_PATH = "/master";
    boolean isLeader = false;

    public static void main(String[] args) throws InterruptedException {
        AsynMaster master = new AsynMaster();
        master.runForMaster();
        Thread.sleep(60000);
    }

    /**
     * 执行群首选举操作
     *
     * @return
     */
    void runForMaster() {
        logger.info("try election leader .");
        AsyncCallback.StringCallback sc = new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                switch (KeeperException.Code.get(rc)) {
                    case CONNECTIONLOSS:
                        checkExistsMaster();
                        return;
                    case OK:
                        isLeader = true;
                        break;
                    default:
                        isLeader = false;
                }
                logger.info("isLeader-->" + isLeader);

            }
        };
        zkClient.create(MASTER_PATH, serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, sc, null);
    }


    /**
     * 检查有没有群首
     *
     * @return
     */
    void checkExistsMaster() {
        AsyncCallback.DataCallback cb = new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                switch (KeeperException.Code.get(rc)) {
                    case CONNECTIONLOSS:
                        checkExistsMaster();
                        break;
                    case NONODE:
                        runForMaster();
                        return;
                    default:
                        ;
                }
            }
        };
        zkClient.getData(MASTER_PATH, false, cb, null);
    }
}

    在ZooKeeper中,所有同步调用方法都有对应的异步调用方法。通过异步调用,可以在单线程中同时进行多个调用,同时也可以简化实现方式。因为是异步调用的,不用关心异常,取而代之的是回调的错误代码。回调方法processResult中,rc是返回代码,path是传给create的path参数值,name是创建的znode节点名称(当节点是有序的时候,和path不同)。

    注:因为只有一个单独的线程处理所有回调调用,如果回调函数阻塞,所有后续回调调用都会被阻塞。有时,在回调中使用同步是合法的,但应尽量避免,以便后续回调调用可以快速被处理。

3.3.3 设置元数据

    void bootstrap() {
        //工作进程
        this.createParent("/workers", new byte[0]);
        //
        this.createParent("/assign", new byte[0]);
        //任务进程
        this.createParent("/tasks", new byte[0]);
        //任务完成状态进程
        this.createParent("/status", new byte[0]);
    }

    void createParent(String path, final byte[] data) {
        zkClient.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                switch (KeeperException.Code.get(rc)) {
                    case CONNECTIONLOSS:
                        //重试
                        createParent(path, (byte[]) ctx);
                        break;
                    case OK:
                        logger.info("Parent created.");
                        break;
                    case NODEEXISTS:
                        logger.warn("Parent already registered:" + path);
                        break;
                    default:
                        logger.error("Something went wrong:", KeeperException.create(KeeperException.Code.get(rc), path));
                }
            }
        }, data);
    }