zookeeper基础应用与实战

zookeeper基础应用与实战 1. Zookeeper命令操作1.1 Zookeeper 数据模型ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似拥有一个层次化结构。Zookeeper这里面的每一个节点都被称为 ZNode每个节点上都会保存自己的数据和节点信息。节点可以拥有子节点同时也允许少量1MB数据存储在该节点之下。节点可以分为四大类PERSISTENT 持久化节点EPHEMERAL 临时节点 -ePERSISTENT_SEQUENTIAL 持久化顺序节点 -sEPHEMERAL_SEQUENTIAL 临时顺序节点 -es1.2 Zookeeper服务端常用命令•启动 ZooKeeper 服务./zkServer.sh start•查看 ZooKeeper 服务状态./zkServer.sh status•停止 ZooKeeper 服务./zkServer.sh stop•重启 ZooKeeper 服务./zkServer.sh restart1.3 Zookeeper客户端常用命令1.3.1 基本CRUD连接Zookeeper客户端# 本地连接 zkCli.sh ​ # 远程连接 zkCli.sh -server ip:2181断开连接quit查看命令帮助help显示制定目录下节点# ls 目录 ls /创建节点# create /节点path value [zk: localhost:2181(CONNECTED) 0] ls / [zookeeper] [zk: localhost:2181(CONNECTED) 1] create /app1 msb123 Created /app1 [zk: localhost:2181(CONNECTED) 2] ls / [app1, zookeeper] [zk: localhost:2181(CONNECTED) 3] create /app2 Created /app2 [zk: localhost:2181(CONNECTED) 4] ls / [app1, app2, zookeeper]获取节点值# get /节点path [zk: localhost:2181(CONNECTED) 15] get /app1 msb123 [zk: localhost:2181(CONNECTED) 16] get /app2 null设置节点值# set /节点path value [zk: localhost:2181(CONNECTED) 17] set /app2 msb456 [zk: localhost:2181(CONNECTED) 18] get /app2 msb456删除单个节点# delete /节点path [zk: localhost:2181(CONNECTED) 19] delete /app2 [zk: localhost:2181(CONNECTED) 20] get /app2 Node does not exist: /app2 [zk: localhost:2181(CONNECTED) 21] ls / [app1, zookeeper]删除带有子节点的节点# deleteall /节点path [zk: localhost:2181(CONNECTED) 22] create /app1 Node already exists: /app1 [zk: localhost:2181(CONNECTED) 23] create /app1/p1 Created /app1/p1 [zk: localhost:2181(CONNECTED) 24] create /app1/p2 Created /app1/p2 [zk: localhost:2181(CONNECTED) 25] delete /app1 Node not empty: /app1 [zk: localhost:2181(CONNECTED) 26] deleteall /app1 [zk: localhost:2181(CONNECTED) 27] ls / [zookeeper]1.3.2 创建临时顺序节点创建临时节点 (-e)临时节点是在会话结束后自动被删除的# create -e /节点path value [zk: localhost:2181(CONNECTED) 29] create -e /app1 msb123 Created /app1 [zk: localhost:2181(CONNECTED) 30] get /app1 msb123 [zk: localhost:2181(CONNECTED) 31] quit # 退出后再次连接临时节点已经删除 [zk: localhost:2181(CONNECTED) 0] ls / [zookeeper]创建顺序节点 (-s)创建出的节点根据先后顺序会在节点之后带上一个数值越后执行数值越大适用于分布式锁的应用场景- 单调递增.# create -s /节点path value [zk: localhost:2181(CONNECTED) 0] ls / [zookeeper] [zk: localhost:2181(CONNECTED) 1] create -s /app2 Created /app20000000003 [zk: localhost:2181(CONNECTED) 2] ls / [app20000000003, zookeeper] [zk: localhost:2181(CONNECTED) 3] create -s /app2 Created /app20000000004 [zk: localhost:2181(CONNECTED) 4] ls / [app20000000003, app20000000004, zookeeper] [zk: localhost:2181(CONNECTED) 5] create -s /app2 Created /app20000000005 [zk: localhost:2181(CONNECTED) 6] ls / [app20000000003, app20000000004, app20000000005, zookeeper] # 创建临时顺序节点 [zk: localhost:2181(CONNECTED) 7] create -es /app3 Created /app30000000006 [zk: localhost:2181(CONNECTED) 8] ls / [app20000000003, app20000000004, app20000000005, app30000000006, zookeeper] # 退出 [zk: localhost:2181(CONNECTED) 9] quit # 重新链接临时顺序节点已经被删除 [zk: localhost:2181(CONNECTED) 0] ls / [app20000000003, app20000000004, app20000000005, zookeeper]查询节点详细信息# ls –s /节点path [zk: localhost:2181(CONNECTED) 5] ls / -s [app20000000003, app20000000004, app20000000005, zookeeper] cZxid 0x0 ctime Thu Jan 01 08:00:00 CST 1970 mZxid 0x0 mtime Thu Jan 01 08:00:00 CST 1970 pZxid 0x14 cversion 10 dataVersion 0 aclVersion 0 ephemeralOwner 0x0 dataLength 0 numChildren 4czxid节点被创建的事务IDctime: 创建时间mzxid: 最后一次被更新的事务IDmtime: 修改时间pzxid子节点列表最后一次被更新的事务IDcversion子节点的版本号dataversion数据版本号aclversion权限版本号ephemeralOwner用于临时节点代表临时节点的事务ID如果为持久节点则为0dataLength节点存储的数据的长度numChildren当前节点的子节点个数2. Zookeeper JavaAPI操作2.1 Curator介绍Curator是Netflix公司开源的一套zookeeper客户端框架Curator是对Zookeeper支持最好的客户端框架。Curator封装了大部分Zookeeper的功能比如Leader选举、分布式锁等减少了技术人员在使用Zookeeper时的底层细节开发工作。Curator框架主要解决了三类问题封装ZooKeeper Client与ZooKeeper Server之间的连接处理提供连接重试机制等。提供了一套Fluent风格的API并且在Java客户端原生API的基础上进行了增强创捷多层节点、删除多层节点等。提供ZooKeeper各种应用场景分布式锁、leader选举、共享计数器、分布式队列等的抽象封装。2.2 引入Curator创建maven项目引入依赖?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd modelVersion4.0.0/modelVersion groupIdcom.mashibing/groupId artifactIdzk-client1/artifactId version1.0-SNAPSHOT/version dependencies dependency groupIdjunit/groupId artifactIdjunit/artifactId version4.10/version scopetest/scope /dependency !--curator-- dependency groupIdorg.apache.curator/groupId artifactIdcurator-framework/artifactId version4.0.0/version /dependency dependency groupIdorg.apache.curator/groupId artifactIdcurator-recipes/artifactId version4.0.0/version /dependency !--日志-- dependency groupIdorg.slf4j/groupId artifactIdslf4j-api/artifactId version1.7.21/version /dependency dependency groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId version1.7.21/version /dependency /dependencies build plugins plugin groupIdorg.apache.maven.plugins/groupId artifactIdmaven-compiler-plugin/artifactId version3.1/version configuration source1.8/source target1.8/target /configuration /plugin /plugins /build /project2.3 建立连接方式1public class CuratorTest { /** * 建立连接 */ Test public void testConnect(){ /** * String connectString 连接字符串。 zk地址和端口 192.168.58.100:2181,192.168.58.101:2181 * int sessionTimeoutMs 会话超时时间 单位ms * int connectionTimeoutMs 连接超时时间 单位ms * RetryPolicy retryPolicy 重试策略 */ //1. 第一种方式 //重试策略 baseSleepTimeMs 重试之间等待的初始时间maxRetries 重试的最大次数 RetryPolicy retryPolicy new ExponentialBackoffRetry(3000,10); CuratorFramework client CuratorFrameworkFactory.newClient(192.168.58.100:2181, 60 * 1000, 15 * 1000, retryPolicy); //开启连接 client.start(); } }重试策略RetryNTimes 重试没有次数限制RetryOneTime只重试没有次数限制一般也不常用ExponentialBackoffRetry 只重试一次的重试策略方式2public class CuratorTest { private CuratorFramework client; /** * 建立连接 */ Test public void testConnect(){ /** * String connectString 连接字符串。 zk地址和端口 192.168.58.100:2181,192.168.58.101:2181 * int sessionTimeoutMs 会话超时时间 单位ms * int connectionTimeoutMs 连接超时时间 单位ms * RetryPolicy retryPolicy 重试策略 */ //1. 第一种方式 //重试策略 baseSleepTimeMs 重试之间等待的初始时间maxRetries 重试的最大次数 RetryPolicy retryPolicy new ExponentialBackoffRetry(3000,10); // client CuratorFrameworkFactory.newClient(192.168.58.100:2181, 60 * 1000, // 15 * 1000, retryPolicy); //2. 第二种方式建造者方式创建 client CuratorFrameworkFactory.builder() .connectString(192.168.58.100:2181) .sessionTimeoutMs(60*1000) .connectionTimeoutMs(15 * 1000) .retryPolicy(retryPolicy) .namespace(mashibing) //根节点名称设置 .build(); //开启连接 client.start(); } }2.4 添加节点修改testConnect注解Before/** * 建立连接 */ Before public void testConnect()创建节点create 持久 临时 顺序 数据public class CuratorTest { /** * 创建节点 create 持久 临时 顺序 数据 */ //1.创建节点 Test public void testCreate1() throws Exception { // 如果没有创建节点没有指定数据则默认将当前客户端的IP 作为数据存储 String path client.create().forPath(/app1); System.out.println(path); } After public void close(){ client.close(); } }//2.创建节点 带有数据 Test public void testCreate2() throws Exception { String path client.create().forPath(/app2,hehe.getBytes()); System.out.println(path); }//3.设置节点类型 默认持久化 Test public void testCreate3() throws Exception { //设置临时节点 String path client.create().withMode(CreateMode.EPHEMERAL).forPath(/app3); System.out.println(path); }由于是临时节点需要打断点才能看到节点信息//1.查询数据 getData Test public void testGet1() throws Exception { byte[] data client.getData().forPath(/app1); System.out.println(new String(data)); } //2.查询子节点 getChildren() Test public void testGet2() throws Exception { ListString path client.getChildren().forPath(/); System.out.println(path); } //3.查询节点状态信息 Test public void testGet3() throws Exception { Stat status new Stat(); System.out.println(status); //查询节点状态信息 ls -s client.getData().storingStatIn(status).forPath(/app1); System.out.println(status); }2.5 修改节点//1. 基本数据修改 Test public void testSet() throws Exception { client.setData().forPath(/app1,hahaha.getBytes()); } //根据版本修改乐观锁 Test public void testSetVersion() throws Exception { //查询版本 Stat status new Stat(); //查询节点状态信息 ls -s client.getData().storingStatIn(status).forPath(/app1); int version status.getVersion(); System.out.println(version); //2 client.setData().withVersion(version).forPath(/app1,hehe.getBytes()); }2.6 删除节点//1.删除单个节点 Test public void testDelete1() throws Exception { client.delete().forPath(/app4); } //删除带有子节点的节点 Test public void testDelete2() throws Exception { client.delete().deletingChildrenIfNeeded().forPath(/app4); } //必须删除成功超时情况下重试删除 Test public void testDelete3() throws Exception { client.delete().guaranteed().forPath(/app2); } //回调 删除完成后执行 Test public void testDelete4() throws Exception { client.delete().guaranteed().inBackground((curatorFramework, curatorEvent) - { System.out.println(我被删除了); System.out.println(curatorEvent); }).forPath(/app1); }2.7 Watch事件监听ZooKeeper 允许用户在指定节点上注册一些Watcher并且在一些特定事件触发的时候ZooKeeper 服务端会将事件通知到感兴趣的客户端上去该机制是 ZooKeeper 实现分布式协调服务的重要特性。ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能能够让多个订阅者同时监听某一个对象当一个对象自身状态变化时会通知所有订阅者。2.7.1 zkCli客户端使用watch添加 -w 参数可实时监听节点与子节点的变化并且实时收到通知。非常适用保障分布式情况下的数据一至性。其使用方式如下命令描述ls -w path监听子节点的变化增删 [监听目录]get -w path监听节点数据的变化stat -w path监听节点属性的变化Zookeeper事件类型NodeCreated 节点创建NodeDeleted 节点删除NodeDataChanged节点数据变化NodeChildrenChanged子节点列表变化DataWatchRemoved节点监听被移除ChildWatchRemoved子节点监听被移除1get -w path 监听节点数据变化会话1会话2再回到会话一2 ls -w /path 监听子节点的变化增删 [监听目录]会话1会话2切到【会话一】 观察输出的监听日志当然了 delete 目录也会发生变化如果对节点数据内容ls -w 是收不到通知的只能通过 get -w来实现 。这里监听一点触发就失效了切记。3) ls -R -w /path 例子二 循环递归的监听2.7.2 curator客户端使用watchZooKeeper 原生支持通过注册Watcher来进行事件监听但是其使用并不是特别方便需要开发人员自己反复注册Watcher比较繁琐。Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。ZooKeeper提供了三种WatcherNodeCache : 只是监听某一个特定的节点PathChildrenCache : 监控一个ZNode的子节点.TreeCache : 可以监控整个树上的所有节点类似于PathChildrenCache和NodeCache的组合1watch监听 NodeCachepublic class CuratorWatchTest { /** * 演示 NodeCache : 给指定一个节点注册监听 */ Test public void testNodeCache() throws Exception { //1. 创建NodeCache对象 NodeCache nodeCache new NodeCache(client, /app1); //监听的是 /mashibing和其子目录app1 //2. 注册监听 nodeCache.getListenable().addListener(new NodeCacheListener() { Override public void nodeChanged() throws Exception { System.out.println(节点变化了。。。。。。); //获取修改节点后的数据 byte[] data nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); //3. 设置为true开启监听 nodeCache.start(true); while(true){ } } }2watch监听 PathChildrenCache/** * 演示 PathChildrenCache: 监听某个节点的所有子节点 */ Test public void testPathChildrenCache() throws Exception { //1.创建监听器对象 (第三个参数表示缓存每次节点更新后的数据) PathChildrenCache pathChildrenCache new PathChildrenCache(client, /app2, true); //2.绑定监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println(子节点发生变化了。。。。。。); System.out.println(pathChildrenCacheEvent); if(PathChildrenCacheEvent.Type.CHILD_UPDATED pathChildrenCacheEvent.getType()){ //更新子节点 System.out.println(子节点更新了); //在一个getData中有很多数据我们只拿data部分 byte[] data pathChildrenCacheEvent.getData().getData(); System.out.println(更新后的值为 new String(data)); }else if(PathChildrenCacheEvent.Type.CHILD_ADDED pathChildrenCacheEvent.getType()){ //添加子节点 System.out.println(添加子节点); String path pathChildrenCacheEvent.getData().getPath(); System.out.println(子节点路径为 path); }else if(PathChildrenCacheEvent.Type.CHILD_REMOVED pathChildrenCacheEvent.getType()){ //删除子节点 System.out.println(删除了子节点); String path pathChildrenCacheEvent.getData().getPath(); System.out.println(子节点路径为 path); } } }); //3. 开启 pathChildrenCache.start(); while(true){ } }事件对象信息分析PathChildrenCacheEvent{ typeCHILD_UPDATED, dataChildData { path/app2/m1, stat164,166,1670114647087,1670114698259,1,0,0,0,3,0,164, data[49, 50, 51] } }3watch监听 TreeCacheTreeCache相当于NodeCache只监听当前结点 PathChildrenCache只监听子结点的结合版即监听当前和子结点。/** * 演示 TreeCache: 监听某个节点的所有子节点 */ Test public void testCache() throws Exception { //1.创建监听器对象 TreeCache treeCache new TreeCache(client, /app2); //2.绑定监听器 treeCache.getListenable().addListener(new TreeCacheListener() { Override public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { System.out.println(节点变化了); System.out.println(treeCacheEvent); if(TreeCacheEvent.Type.NODE_UPDATED treeCacheEvent.getType()){ //更新节点 System.out.println(节点更新了); //在一个getData中有很多数据我们只拿data部分 byte[] data treeCacheEvent.getData().getData(); System.out.println(更新后的值为 new String(data)); }else if(TreeCacheEvent.Type.NODE_ADDED treeCacheEvent.getType()){ //添加子节点 System.out.println(添加节点); String path treeCacheEvent.getData().getPath(); System.out.println(子节点路径为 path); }else if(TreeCacheEvent.Type.NODE_REMOVED treeCacheEvent.getType()){ //删除子节点 System.out.println(删除节点); String path treeCacheEvent.getData().getPath(); System.out.println(删除节点路径为 path); } } }); //3. 开启 treeCache.start(); while(true){ } }