时间:2022-12-02 11:42:47 | 栏目:JAVA代码 | 点击:次
ZooKeeper入门教程二在单机和集群环境下的安装搭建及使用
首先我们先介绍一个简单的zookeeper实现分布式锁的思路:
用zookeeper中一个临时节点代表锁,比如在/exlusive_lock下创建临时子节点/exlusive_lock/lock。
我们的程序按照上述逻辑直至抢占到锁,执行完业务逻辑。
上述是较为简单的分布式锁实现方式。能够应付一般使用场景,但存在着如下两个问题:
1、锁的获取顺序和最初客户端争抢顺序不一致,这不是一个公平锁。每次锁获取都是当次最先抢到锁的客户端。
2、羊群效应,所有没有抢到锁的客户端都会监听/exlusive_lock变更。当并发客户端很多的情况下,所有的客户端都会接到通知去争抢锁,此时就出现了羊群效应。
为了解决上面的问题,我们重新设计。
我们在2.0版本中,让每个客户端在/exlusive_lock下创建的临时节点为有序节点,这样每个客户端都在/exlusive_lock下有自己对应的锁节点,而序号排在最前面的节点,代表对应的客户端获取锁成功。排在后面的客户端监听自己前面一个节点,那么在他前序客户端执行完成后,他将得到通知,获得锁成功。逻辑修改如下:
如此修改后,每个客户端只关心自己前序锁是否释放,所以每次只会有一个客户端得到通知。而且,所有客户端的执行顺序和最初锁创建的顺序是一致的。解决了1.0版本的两个问题。
接下来我们看看代码如何实现。
此类是分布式锁类,实现了2个分布式锁的相关方法:
1、获取锁
2、释放锁
主要程序逻辑围绕着这两个方法的实现,特别是获取锁的逻辑。我们先看一下该类的成员变量:
private ZooKeeper zkClient; private static final String LOCK_ROOT_PATH = "/Locks"; private static final String LOCK_NODE_NAME = "Lock_"; private String lockPath;
定义了zkClient,用来操作zookeeper。
锁的根路径,及自增节点的前缀。此处生产环境应该由客户端传入。
当前锁的路径。
public LockSample() throws IOException { zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState()== Event.KeeperState.Disconnected){ System.out.println("失去连接"); } } }); }
创建zkClient,同时创建了状态监听。此监听可以去掉,这里只是打印出失去连接状态。
暴露出来的获取锁的方法为acquireLock(),逻辑很简单:
public void acquireLock() throws InterruptedException, KeeperException { //创建锁节点 createLock(); //尝试获取锁 attemptLock(); }
首先创建锁节点,然后尝试去取锁。真正的逻辑都在这两个方法中。
先判断锁的根节点/Locks是否存在,不存在的话创建。然后在/Locks下创建有序临时节点,并设置当前的锁路径变量lockPath。
代码如下:
private void createLock() throws KeeperException, InterruptedException { //如果根节点不存在,则创建根节点 Stat stat = zkClient.exists(LOCK_ROOT_PATH, false); if (stat == null) { zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 创建EPHEMERAL_SEQUENTIAL类型节点 String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath); this.lockPath=lockPath; }
这是最核心的方法,客户端尝试去获取锁,是对2.0版本逻辑的实现,这里就不再重复逻辑,直接看代码:
private void attemptLock() throws KeeperException, InterruptedException { // 获取Lock所有子节点,按照节点序号排序 List<String> lockPaths = null; lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false); Collections.sort(lockPaths); int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1)); // 如果lockPath是序号最小的节点,则获取锁 if (index == 0) { System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath); return ; } else { // lockPath不是序号最小的节点,监听前一个节点 String preLockPath = lockPaths.get(index - 1); Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher); // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁 if (stat == null) { attemptLock(); } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath); synchronized (watcher) { watcher.wait(); } attemptLock(); } } }
注意这一行代码
Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher);
我们在获取前一个节点的时候,同时设置了监听watcher。如果前锁存在,则阻塞主线程。
watcher定义代码如下:
private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath() + " 前锁释放"); synchronized (this) { notifyAll(); } } };
watcher只是notifyAll,让主线程继续执行,以便再次调用attemptLock(),去尝试获取lock。如果没有异常情况的话,此时当前客户端应该能够成功获取锁。
释放锁原语实现很简单,参照releaseLock()方法。代码如下:
public void releaseLock() throws KeeperException, InterruptedException { zkClient.delete(lockPath, -1); zkClient.close(); System.out.println(" 锁释放:" + lockPath); }
关于分布式锁的代码到此就讲解完了,我们再看下客户端如何使用它。
我们创建一个TicketSeller类,作为客户端来使用分布式锁。
不带锁的业务逻辑方法,代码如下:
private void sell(){ System.out.println("售票开始"); // 线程随机休眠数毫秒,模拟现实中的费时操作 int sleepMillis = (int) (Math.random() * 2000); try { //代表复杂逻辑执行了一段时间 Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("售票结束"); }
仅是为了演示,sleep了一段时间。
此方法中,加锁后执行业务逻辑,代码如下:
public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException { LockSample lock = new LockSample(); lock.acquireLock(); sell(); lock.releaseLock(); }
接下来我们写一个main函数做测试:
public static void main(String[] args) throws KeeperException, InterruptedException, IOException { TicketSeller ticketSeller = new TicketSeller(); for(int i=0;i<1000;i++){ ticketSeller.sellTicketWithLock(); } }
main函数中我们循环调用ticketSeller.sellTicketWithLock(),执行加锁后的卖票逻辑。
1、先启动一个java程序运行,可以看到日志输出如下:
main 锁创建: /Locks/Lock_0000000391 main 锁获得, lockPath: /Locks/Lock_0000000391 售票开始 售票结束 锁释放:/Locks/Lock_0000000391 main 锁创建: /Locks/Lock_0000000392 main 锁获得, lockPath: /Locks/Lock_0000000392 售票开始 售票结束 锁释放:/Locks/Lock_0000000392 main 锁创建: /Locks/Lock_0000000393 main 锁获得, lockPath: /Locks/Lock_0000000393 售票开始 售票结束 锁释放:/Locks/Lock_0000000393
可见每次执行都是按照锁的顺序执行,而且由于只有一个进程,并没有锁的争抢发生。
2、我们再启动一个同样的程序,锁的争抢此时发生了,可以看到双方的日志输出如下:
程序1:
main 锁获得, lockPath: /Locks/Lock_0000000471 售票开始 售票结束 锁释放:/Locks/Lock_0000000471 main 锁创建: /Locks/Lock_0000000473 等待前锁释放,prelocakPath:Lock_0000000472 /Locks/Lock_0000000472 前锁释放 main 锁获得, lockPath: /Locks/Lock_0000000473 售票开始 售票结束 锁释放:/Locks/Lock_0000000473
可以看到Lock_0000000471执行完成后,该进程获取的锁为Lock_0000000473,这说明Lock_0000000472被另外一个进程创建了。此时Lock_0000000473在等待前锁释放。Lock_0000000472释放后,Lock_0000000473才获得锁,然后才执行业务逻辑。
我们再看程序2的日志:
main 锁获得, lockPath: /Locks/Lock_0000000472 售票开始 售票结束 锁释放:/Locks/Lock_0000000472 main 锁创建: /Locks/Lock_0000000474 等待前锁释放,prelocakPath:Lock_0000000473 /Locks/Lock_0000000473 前锁释放 main 锁获得, lockPath: /Locks/Lock_0000000474 售票开始 售票结束 锁释放:/Locks/Lock_0000000474
可以看到,确实是进程2获取了Lock_0000000472。
zookeeper实现分布式锁就先讲到这。注意代码只做演示用,并不适合生产环境使用。
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; public class LockSample { //ZooKeeper配置信息 private ZooKeeper zkClient; private static final String LOCK_ROOT_PATH = "/Locks"; private static final String LOCK_NODE_NAME = "Lock_"; private String lockPath; // 监控lockPath的前一个节点的watcher private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath() + " 前锁释放"); synchronized (this) { notifyAll(); } } }; public LockSample() throws IOException { zkClient= new ZooKeeper("localhost:2181", 10000, new Watcher() { @Override public void process(WatchedEvent event) { if(event.getState()== Event.KeeperState.Disconnected){ System.out.println("失去连接"); } } }); } //获取锁的原语实现. public void acquireLock() throws InterruptedException, KeeperException { //创建锁节点 createLock(); //尝试获取锁 attemptLock(); } //创建锁的原语实现。在lock节点下创建该线程的锁节点 private void createLock() throws KeeperException, InterruptedException { //如果根节点不存在,则创建根节点 Stat stat = zkClient.exists(LOCK_ROOT_PATH, false); if (stat == null) { zkClient.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 创建EPHEMERAL_SEQUENTIAL类型节点 String lockPath = zkClient.create(LOCK_ROOT_PATH + "/" + LOCK_NODE_NAME, Thread.currentThread().getName().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName() + " 锁创建: " + lockPath); this.lockPath=lockPath; } private void attemptLock() throws KeeperException, InterruptedException { // 获取Lock所有子节点,按照节点序号排序 List<String> lockPaths = null; lockPaths = zkClient.getChildren(LOCK_ROOT_PATH, false); Collections.sort(lockPaths); int index = lockPaths.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1)); // 如果lockPath是序号最小的节点,则获取锁 if (index == 0) { System.out.println(Thread.currentThread().getName() + " 锁获得, lockPath: " + lockPath); return ; } else { // lockPath不是序号最小的节点,监控前一个节点 String preLockPath = lockPaths.get(index - 1); Stat stat = zkClient.exists(LOCK_ROOT_PATH + "/" + preLockPath, watcher); // 假如前一个节点不存在了,比如说执行完毕,或者执行节点掉线,重新获取锁 if (stat == null) { attemptLock(); } else { // 阻塞当前进程,直到preLockPath释放锁,被watcher观察到,notifyAll后,重新acquireLock System.out.println(" 等待前锁释放,prelocakPath:"+preLockPath); synchronized (watcher) { watcher.wait(); } attemptLock(); } } } //释放锁的原语实现 public void releaseLock() throws KeeperException, InterruptedException { zkClient.delete(lockPath, -1); zkClient.close(); System.out.println(" 锁释放:" + lockPath); } }
import org.apache.zookeeper.KeeperException; import java.io.IOException; public class TicketSeller { private void sell(){ System.out.println("售票开始"); // 线程随机休眠数毫秒,模拟现实中的费时操作 int sleepMillis = (int) (Math.random() * 2000); try { //代表复杂逻辑执行了一段时间 Thread.sleep(sleepMillis); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("售票结束"); } public void sellTicketWithLock() throws KeeperException, InterruptedException, IOException { LockSample lock = new LockSample(); lock.acquireLock(); sell(); lock.releaseLock(); } public static void main(String[] args) throws KeeperException, InterruptedException, IOException { TicketSeller ticketSeller = new TicketSeller(); for(int i=0;i<1000;i++){ ticketSeller.sellTicketWithLock(); } } }