这篇文章是让大家了解Zookeeper基于Java客户端Curator的基本操作,以及如何使用Zookeeper解决实际问题。
Zookeeper基于Java访问 针对zookeeper,比较常用的Java客户端有zkclient、curator。由于Curator对于zookeeper的抽象层次比较高,简化了zookeeper客户端的开发量。使得curator逐步被广泛应用。
封装zookeeper client与zookeeper server之间的连接处理
提供了一套fluent风格的操作api
提供zookeeper各种应用场景(共享锁、leader选举)的抽象封装
依赖jar包 <dependency > <groupId > org.apache.curator</groupId > <artifactId > curator-framework</artifactId > <version > 5.2.0</version > </dependency >
建立连接 curator提供了两种操作方式来进行操作,一种是Fluent风格,另外一种就是普通的方法调用风格
public class CuratorMain { public static void main (String[] args) throws Exception { CuratorFramework curatorFramework= CuratorFrameworkFactory.newClient("192.168.221.128:2181" ,5000 ,20000 , new ExponentialBackoffRetry(1000 ,3 )); curatorFramework.start(); curatorFramework.blockUntilConnected(); System.out.println("zookeeper starter success" ); String data=new String(curatorFramework.getData().forPath("/pr" )); System.out.println("输出结果:" +data); } }
重试策略: Curator内部实现的几种重试策略:
ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加,时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
RetryNTimes:指定最大重试次数的重试策略
RetryOneTime:仅重试一次
RetryUntilElapsed:一直重试直到达到规定的时间
namespace: 值得注意的是session会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/curator目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离
public static void main (String[] args) throws Exception { CuratorFramework curatorFramework=CuratorFrameworkFactory.builder() .connectString("192.168.221.128:2181" ) .sessionTimeoutMs(5000 ).connectionTimeoutMs(20000 ) .retryPolicy(new ExponentialBackoffRetry(1000 ,3 )) .namespace("curator" ).build(); curatorFramework.start(); String data=new String(curatorFramework.getData().forPath("/pr" )); System.out.println("输出结果:" +data); }
节点的增删改查 下面代码演示了Curator访问Zookeeper实现数据的增删改查功能。
public class CuratorMain { private final CuratorFramework curatorFramework; public CuratorMain () { curatorFramework=CuratorFrameworkFactory.builder() .connectString("192.168.221.128:2181" ) .sessionTimeoutMs(5000 ).connectionTimeoutMs(20000 ) .retryPolicy(new ExponentialBackoffRetry(1000 ,3 )) .namespace("curator" ).build(); curatorFramework.start(); } public void nodeCRUD () throws Exception { System.out.println("开始创建节点" ); String node=curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/node" ); System.out.println("节点创建成功:" +node); Stat stat=new Stat(); curatorFramework.getData().storingStatIn(stat).forPath(node); System.out.println("查询节点:" +node+"信息,stat:" +stat.toString()); stat=curatorFramework.setData().withVersion(stat.getVersion()).forPath(node,"Hello World" .getBytes()); String result=new String(curatorFramework.getData().forPath(node)); System.out.println("修改节点后的数据信息:" +result); System.out.println("开始删除节点" ); curatorFramework.delete().forPath(node); Stat exist=curatorFramework.checkExists().forPath(node); if (exist==null ){ System.out.println("节点删除成功" ); } } public static void main (String[] args) throws Exception { CuratorMain curatorMain=new CuratorMain(); curatorMain.nodeCRUD(); } }
异步请求 所谓异步请求,就是客户端发起请求后,由一个异步线程去执行,当收到服务端的返回结果后,再通过回调方法进行通知。
public void asyncCrud () throws Exception { CountDownLatch countDownLatch=new CountDownLatch(2 ); ExecutorService executorService= Executors.newFixedThreadPool(2 ); System.out.println("开始节点创建" ); String node=curatorFramework.create().withMode(CreateMode.PERSISTENT).inBackground((session,event)->{ System.out.println(Thread.currentThread().getName()+":执行创建节点->" +event.getPath()); countDownLatch.countDown(); },executorService).forPath("/async-node" ); System.out.println("异步等待节点创建,此时节点创建状态,node:" +node); curatorFramework.delete().inBackground((session,event)->{ System.out.println(Thread.currentThread().getName()+":执行删除节点->" +event.getPath()); countDownLatch.countDown(); },executorService).forPath("/async-node" ); System.out.println("等待异步执行结束" ); countDownLatch.await(); executorService.shutdown(); }
Zookeeper权限控制 Zookeeper作为一个分布式协调框架,内部存储了一些分布式系统运行时的状态的数据,比如master选举、比如分布式锁。对这些数据的操作会直接影响到分布式系统的运行状态。因此,为了保证zookeeper中的数据的安全性,避免误操作带来的影响。Zookeeper提供了一套ACL权限控制机制来保证数据的安全。
ACL权限控制,使用:scheme:id:perm
来标识。
Scheme(权限模式),标识授权策略
ID(授权对象)
Permission:授予的权限
ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限,每个znode支持设置多种权限控制方案和多个权限,子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点。
Scheme权限模式 Zookeeper提供以下权限模式,所谓权限模式,就是使用什么样的方式来进行授权。
world: 默认方式,相当于全部都能访问。
auth :代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
digest :即用户名:密码这种方式认证,这也是业务系统中最常用的。用 username:password 字符串来产生一个MD5串,然后该串被用来作为ACL ID。认证是通过明文发送username:password 来进行的,当用在ACL时,表达式为username:base64 ,base64是password的SHA1摘要的编码。
ip :通过ip地址来做权限控制,比如 ip:192.168.1.1 表示权限控制都是针对这个ip地址的。也可以针对网段 ip:192.168.1.1/24,此时addr中的有效位与客户端addr中的有效位进行比对。
ID授权对象 指权限赋予的用户或一个指定的实体,不同的权限模式下,授权对象不同
Id ipId1 = new Id("ip" , "192.168.190.1" ); Id ANYONE_ID_UNSAFE = new Id("world" , "anyone" );
Permission权限类型 指通过权限检查后可以被允许的操作,create /delete /read/write/admin
权限模式(Schema)和授权对象主要用来确认权限验证过程中使用的验证策略: 比如ip地址、digest:username:password ,匹配到验证策略并验证成功后,再根据权限操作类型来决定当前客户端的访问权限。
在控制台上实现权限操作 在Zookeeper中提供了ACL相关的命令如下。
getAcl getAcl <path> 读取ACL权限 setAcl setAcl <path> <acl> 设置ACL权限 addauth addauth <scheme> <auth> 添加认证用户
word方式 创建一个节点后默认就是world模式
[zk: localhost:2181(CONNECTED) 2] create /auth Created /auth [zk: localhost:2181(CONNECTED) 3] getAcl /auth 'world,'anyone : cdrwa [zk: localhost:2181(CONNECTED) 4] create /world Created /world [zk: localhost:2181(CONNECTED) 5] getAcl /world 'world,'anyone : cdrwa [zk: localhost:2181(CONNECTED) 6] setAcl /world:anyone:acd setAcl [-s] [-v version] [-R] path acl [zk: localhost:2181(CONNECTED) 7] setAcl /world world:anyone:acd [zk: localhost:2181(CONNECTED) 8] getAcl /world 'world,'anyone : cda
其中, cdrwa,分别对应 create . delete read write admin
IP模式 在ip模式中,首先连接到zkServer的命令需要使用如下方式
./zkCli.sh -server 192.168.221.128:2181
接着按照IP的方式操作如下
[zk: 192.168.221.128:2181(CONNECTED) 3] create /ip-model Created /ip-model [zk: 192.168.221.128:2181(CONNECTED) 4] setAcl /ip-model ip:127.0.0.1:cdrwa,ip:192.168.221.128/131:cdrwa Acl is not valid : /ip-model [zk: 192.168.221.128:2181(CONNECTED) 5] setAcl /ip-model ip:127.0.0.1:cdrwa,ip:192.168.221.128:cdrwa [zk: 192.168.221.128:2181(CONNECTED) 6] getAcl /ip-model 'ip,'127.0.0.1 : cdrwa 'ip,'192.168.221.128 : cdrwa
Auth模式 auth模式的操作如下。
[zk: 192.168.221.128:2181(CONNECTED) 7] create /auth Created /auth [zk: 192.168.221.128:2181(CONNECTED) 8] addauth digest mic:mic # 增加授权用户,明文用户名和密码,zk会对密码加密 [zk: 192.168.221.128:2181(CONNECTED) 9] setAcl /auth auth:mic:cdrwa # 授予权限 [zk: 192.168.221.128:2181(CONNECTED) 11] getAcl /auth 'digest,'mic:xUsfnPBF9eNvHVWZx/TZt9ioxBA= : cdrwa [zk: 192.168.221.128:2181(CONNECTED) 12]
当我们退出当前的会话后,再次连接,执行如下操作,会提示没有权限
[zk: localhost:2181(CONNECTED) 1] get /auth Insufficient permission : /auth
这时候,我们需要重新授权。
[zk: localhost:2181(CONNECTED) 2] addauth digest mic:mic [zk: localhost:2181(CONNECTED) 3] get /auth null [zk: localhost:2181(CONNECTED) 4]
Digest 使用语法,会发现使用方式和Auth模式相同。
setAcl /digest digest:用户名:密码:权限
但是有一个不一样的点,密码需要用加密后的,否则无法被识别。
密码: 用户名和密码加密后的字符串。
使用下面程序生成密码
public static void main (String[] args) throws Exception { String up="mic:mic" ; byte [] digest=MessageDigest.getInstance("SHA1" ).digest(up.getBytes()); String encodeString=Base64.getEncoder().encodeToString(digest); System.out.println(encodeString); }
得到:xUsfnPBF9eNvHVWZx/TZt9ioxBA=
再回到client上进行如下操作
[zk: localhost:2181(CONNECTED) 10] create /digest Created /digest [zk: localhost:2181(CONNECTED) 11] setAcl /digest digest:mic:xUsfnPBF9eNvHVWZx/TZt9ioxBA=:cdrwa [zk: localhost:2181(CONNECTED) 12] getAcl /digest 'digest,'mic:xUsfnPBF9eNvHVWZx/TZt9ioxBA= : cdrwa
当退出当前会话后,需要再次授权才能访问**/digest**节点
[zk: localhost:2181(CONNECTED) 1] get /digest Insufficient permission : /digest [zk: localhost:2181(CONNECTED) 2] addauth digest mic:mic [zk: localhost:2181(CONNECTED) 3] get /digest null
Curator演示ACL的使用 接下来我们使用Curator简单演示一下ACL权限的访问操作。
public class CuratorMain { private final CuratorFramework curatorFramework; public CuratorMain () { curatorFramework=CuratorFrameworkFactory.builder() .connectString("192.168.221.128:2181" ) .sessionTimeoutMs(5000 ).connectionTimeoutMs(20000 ) .retryPolicy(new ExponentialBackoffRetry(1000 ,3 )) .namespace("curator" ).build(); curatorFramework.start(); } public void aclExample () throws Exception { Id id=new Id("digest" , DigestAuthenticationProvider.generateDigest("mic:mic" )); List<ACL> acls=new ArrayList<>(); acls.add(new ACL(ZooDefs.Perms.ALL,id)); String node=curatorFramework.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(acls,false ).forPath("/auth" ,"Hello" .getBytes()); System.out.println("成功创建带权限的节点:" +node); String data=new String(curatorFramework.getData().forPath(node)); System.out.println("获取数据结果:" +data); } public static void main (String[] args) throws Exception { CuratorMain curatorMain=new CuratorMain(); curatorMain.aclExample(); } }
上述代码执行后会报错
Exception in thread "main" org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /curator/auth
修改后代码如下。
public class CuratorMain { private final CuratorFramework curatorFramework; public CuratorMain () { curatorFramework=CuratorFrameworkFactory.builder() .connectString("192.168.221.128:2181" ) .sessionTimeoutMs(20000 ).connectionTimeoutMs(20000 ) .retryPolicy(new ExponentialBackoffRetry(1000 ,3 )) .authorization("digest" ,"mic:mic" .getBytes()) .namespace("curator" ).build(); curatorFramework.start(); } public void aclExample () throws Exception { Id id=new Id("digest" , DigestAuthenticationProvider.generateDigest("mic:mic" )); List<ACL> acls=new ArrayList<>(); acls.add(new ACL(ZooDefs.Perms.ALL,id)); String node=curatorFramework.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(acls,false ).forPath("/auth" ,"Hello" .getBytes()); System.out.println("成功创建带权限的节点:" +node); String data=new String(curatorFramework.getData().forPath(node)); System.out.println("获取数据结果:" +data); } public static void main (String[] args) throws Exception { CuratorMain curatorMain=new CuratorMain(); curatorMain.aclExample(); } }
事件监听机制详解 在上一节课中,我们了解了Zookeeper中的事件监听机制,基于事件监听,应用程序可以订阅指定节点的变更事件来完成响应的逻辑,这个特性可以让zookeeper实现分布式锁、注册中心、配置中心等功能。
在Zookeeper客户端中,提供了一下以下事件类型
public static enum EventType { None(-1 ), NodeCreated(1 ), NodeDeleted(2 ), NodeDataChanged(3 ), NodeChildrenChanged(4 ), DataWatchRemoved(5 ), ChildWatchRemoved(6 ), PersistentWatchRemoved(7 ); }
在zookeeper3.6版本之前,Curator提供了三种Watcher来监听节点的变化。
PathChildCache :监视一个路径下子结点的创建、删除、更新。
NodeCache :监视当前结点的创建、更新、删除,并将结点的数据缓存在本地。
TreeCache :PathChildCache和NodeCache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
但是在zookeeper3.6版本之后,只提供了一个CuratorCache来实现时间订阅。当然,如果要使用事件订阅功能,我们需要引入下面的jar包。
<dependency > <groupId > org.apache.curator</groupId > <artifactId > curator-recipes</artifactId > <version > 5.2.0</version > </dependency >
普通事件订阅 普通的事件订阅,就是使用如getData、exists等命令添加的CuratorWatcher机制。这种方式触发的事件,只会响应一次。
public class CuratorWatchMain { private final CuratorFramework curatorFramework; public CuratorWatchMain () { curatorFramework=CuratorFrameworkFactory.builder() .connectString("192.168.221.128:2181" ) .sessionTimeoutMs(20000 ).connectionTimeoutMs(20000 ) .retryPolicy(new ExponentialBackoffRetry(1000 ,3 )) .namespace("curator" ).build(); curatorFramework.start(); } public void normalWatcher () throws Exception { CuratorWatcher watcher=new CuratorWatcher() { @Override public void process (WatchedEvent watchedEvent) throws Exception { System.out.println("监听事件:" +watchedEvent.toString()); } }; String node=curatorFramework.create().forPath("/listener" ,"I'Listener" .getBytes()); String data=new String(curatorFramework.getData().usingWatcher(watcher).forPath(node)); System.out.println(node+"节点对应的value:" +data); curatorFramework.setData().forPath(node,"change listener" .getBytes()); curatorFramework.setData().forPath(node,"change listener" .getBytes()); } public static void main (String[] args) throws Exception { CuratorWatchMain curatorMain=new CuratorWatchMain(); curatorMain.normalWatcher(); } }
如果希望事件监听是持久化的,则改造代码如下
public void normalWatcher () throws Exception { CuratorWatcher watcher=new CuratorWatcher() { @Override public void process (WatchedEvent watchedEvent) throws Exception { System.out.println("监听事件:" +watchedEvent.toString()); curatorFramework.checkExists().usingWatcher(this ).forPath("/listener" ); } }; String node=curatorFramework.create().forPath("/listener" ,"I'Listener" .getBytes()); String data=new String(curatorFramework.getData().usingWatcher(watcher).forPath(node)); System.out.println(node+"节点对应的value:" +data); curatorFramework.setData().forPath(node,"change listener" .getBytes()); Thread.sleep(3000 ); curatorFramework.setData().forPath(node,"change listener" .getBytes()); System.in.read(); }
CuratorCache API说明 在Curator包中,提供了另外一个可以持续订阅的API,CuratorCacheListener
CuratorCacheListener是基于CuratorCache缓存实现的监听器,CuratorCache对Zookeeper事件监听进行了封装,能够自动处理反复注册监听,在使用CuratorListener时,首选需要构建CuratorCache缓存实例,具体定义如下。
CuratorCache.build(CuratorFramework client, String path, Options... options) Parameters: client - the client path - path to watch options - empty or one or more options
options有三个选项:
CuratorCache.Options.SINGLE_NODE_CACHE
enum Options { SINGLE_NODE_CACHE, COMPRESSED_DATA, DO_NOT_CLEAR_ON_CLOSE }
CuratorCache实现事件订阅 代码实现如下。
public class CuratorWatchMain { private final CuratorFramework curatorFramework; public CuratorWatchMain () { curatorFramework=CuratorFrameworkFactory.builder() .connectString("192.168.221.128:2181" ) .sessionTimeoutMs(20000 ).connectionTimeoutMs(20000 ) .retryPolicy(new ExponentialBackoffRetry(1000 ,3 )) .authorization("digest" ,"mic:mic" .getBytes()) .namespace("curator" ).build(); curatorFramework.start(); } public void normalWatcher () throws Exception { CuratorWatcher watcher=new CuratorWatcher() { @Override public void process (WatchedEvent watchedEvent) throws Exception { System.out.println("监听事件:" +watchedEvent.toString()); curatorFramework.checkExists().usingWatcher(this ).forPath("/listener" ); } }; String node=curatorFramework.create().forPath("/listener" ,"I'Listener" .getBytes()); String data=new String(curatorFramework.getData().usingWatcher(watcher).forPath(node)); System.out.println(node+"节点对应的value:" +data); curatorFramework.setData().forPath(node,"change listener" .getBytes()); Thread.sleep(3000 ); curatorFramework.setData().forPath(node,"change listener" .getBytes()); System.in.read(); } public void addListenerWithNodeCache (String node) { CuratorCache curatorCache=CuratorCache.build(curatorFramework,node,CuratorCache.Options.SINGLE_NODE_CACHE); CuratorCacheListener listener=CuratorCacheListener .builder() .forAll((type, oldNode, newNode)->{ System.out.println("事件类型:" +type+"\n\r原节点:" +oldNode+"\n\r新节点" +newNode); }).forInitialized(()->{ System.out.println("初始化" ); }).build(); curatorCache.listenable().addListener(listener); curatorCache.start(); } public void operation (String node) throws Exception { curatorFramework.create().forPath(node); curatorFramework.setData().forPath(node,"Hello" .getBytes()); curatorFramework.delete().forPath(node); } public static void main (String[] args) throws Exception { CuratorWatchMain curatorMain=new CuratorWatchMain(); String node="/node" ; curatorMain.addListenerWithNodeCache(node); curatorMain.operation(node); System.in.read(); } }