ZooKeeper-第8章 Curator:ZooKeeper API的高级封装库 - 高飞网

第8章 Curator:ZooKeeper API的高级封装库

2018-06-04 23:52:03.0

    Curator作为ZooKeeper的一个尝试封装库,为开发人员封装了ZooKeeper的一组开发库,其核心目标是为用户管理ZooKeeper的相关操作,将连接管理的复杂操作部分隐藏起来。

    Curator为开发人员实现了一组常用的管理操作列表,同时结合开发过程中的最佳实践和常见的边际情况的处理。

8.1 Curator客户端程序

创建客户端实例

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)

    其中connectString输入参数为我们将要连接的ZooKeeper服务器列表,retryPolicy参数为Curator提供的新特性,通过这个参数,开发人员可以指定对于失去连接事件重试操作的处理策略。而之前常规的ZooKeeper接口的开发中,在发生连接丢失时,往往需要再次提交操作请求。

    注意:实例化CuratorFramework类作为客户端,而在工厂类中还提供了其他方法来创建实例。其中有一个CuratorZooKeeperClient类,该类在ZooKeeper客户端实例上提供了某些附加功能。

8.2 流量式API

    使用链式API创建节点。

public class CuratorTest1 {
    public static void main(String[] args) throws Exception {
        String path = "/curators";
        String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
        client.start();
        client.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path, "".getBytes());
        client.setData().forPath(path, "hello".getBytes());
        client.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path + "/test1","".getBytes());
        client.delete().forPath(path + "/test1");
        client.delete().forPath(path);
        Thread.sleep(10000);
        client.close();
    }
}

    以上调用会立刻返回,我们需要创建一个或多个监听器来接收znode节点创建后的返回结果。

异步调用回调

BackgroundCallback bc = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                int rc = event.getResultCode();
                System.out.println(KeeperException.Code.get(rc));
            }
        };
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("任务完成");
            }
        });
        client.create().withMode(CreateMode.EPHEMERAL).inBackground(bc, executor).forPath(path, new byte[1024]);

8.3 监听器

    监听器(listener)负责处理Curator库所产生的事件,使用这种机制时,应用程序会实现一个或多个监听器,并将这些监听器注册到Curator框架客户端实例中,当有事件发生时,这些事件会传递给所有已注册的监听器。(类似于ZooKeeper原生客户端中的watcher监视器)

CuratorListener listener = new CuratorListener() {

            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("发生了事件:" + event);
            }
        };
        client.getCuratorListenable().addListener(listener);

    还有一类比较特殊的监听器,这类监听器负责处理后台工作线程捕获的异常时的错误报告,这类监听器提供了底层细节的处理。

//错误监听器
        UnhandledErrorListener errorListener = new UnhandledErrorListener() {
            @Override
            public void unhandledError(String message, Throwable e) {
                System.out.println("出错了:" + message + ":" + e.getStackTrace());
            }
        };
        client.getUnhandledErrorListenable().addListener(errorListener);


8.4 Curator中的状态的转换

    在Curtator中暴露了与ZooKeeper不同的一组状态,比如SUSPENDED状态(暂停),还有Curator使用LOST来表示会话过期的状态。

    当进入暂停状态时(相当于连接丢失),客户端最好能暂停一些请求。以防止别的客户端此时获取了主节点权限,造成事务重复。

    此外还有个状态是READ_ONLY状态,当进入只读模式时,该服务器会因隔离问题而无法与其他服务器形成仲裁的最低法定数量,客户端也将漏掉此时发生的任何更新操作。

8.5 两种边界情况

    有序节点的情况

    如果客户端所连接的服务器崩溃了,但还没来得及返回客户端所创建的有序节点的节点名称(即节点的序号),或者客户端只是连接丢失,客户端没接收到所请求的操作的响应信息,结果,客户端并不知道所创建的znode节点路径名称。为了解决这个问题,CreateBuilder提供了个withProtection方法来通知Curator客户端,在创建的有序节点前添加一个唯一标识符(uuid),如果create操作失败了,客户端会重试,重试前会验证是否存在一个节点包含这个唯一标识符。

    删除节点的保障

    在进行delete操作时,如果客户端在执行delete操作时,与服务器之间的连接丢失,客户端并不知道delete操作是否成功执行。如果这个节点代表了重要的含义,如表示一个资源的锁,此时删除与否非常重要。只要使用DeleteBuilder接口中的guranteed方法即可。这个保证会一直重试,直到成功。

8.6 菜谱

8.6.1 群首闩

public class LeaderLatchTest {
    private final static String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    public static void main(String[] args) throws Exception {
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);
        client.start();
        LeaderLatch latch = new LeaderLatch(client, "/curator/master");
        LeaderLatchListener listener = new LeaderLatchListener() {
            @Override
            public void isLeader() {
                System.out.println("成为群首");
            }

            @Override
            public void notLeader() {
                System.out.println("未成为群首");
            }
        };
        latch.addListener(listener);
        latch.start();
        Thread.sleep(100000);
        client.close();

    }
}


8.6.2 群首选举器

    选举主节点还可以使用另一个LeaderSelector。它与LeaderLatch的主要区别在于使用的监听器接口不同,其中LeaderSelector使用了LeaderSelectorListener接口。

public class LeaderSelectorTest {
    private final static String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    public static void main(String[] args) throws Exception {
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);
        client.start();

        LeaderSelectorListener listener = new LeaderSelectorListener() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("listener:" + newState);
            }

            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println("takeLeadership");
            }
        };
        LeaderSelector selector = new LeaderSelector(client, "/curator/master", listener);
        selector.start();
        Thread.sleep(100000);
        client.close();

    }
}

8.6.3 子节点缓存器


public class PathChildrenCachedTest {
    private final static String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    public static void main(String[] args) throws Exception {
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 10);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);
        client.start();

        final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/curator/children", true);


        PathChildrenCacheListener listener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                boolean change = false;
                if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ||
                        event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED ||
                        event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {
                    change = true;
                } else {
                    return;
                }
                if (change) {
                    System.out.println(event.getType() + ":" + event.getData());
                }
            }
        };
        pathChildrenCache.getListenable().addListener(listener);
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        Thread.sleep(100000);
        client.close();
    }
}