分布式锁-02丨ZooKeeper实现分布式锁

Posted by jiefang on January 5, 2020

ZooKeeper实现分布式锁

基于比较常用的Curator这个开源框架,聊一下这个框架对ZooKeeper(以下简称zk)分布式锁的实现。

实现机制

  • 临时有序节点;
  • 事件监听机制;

ZooKeeper实现分布式锁

  1. 客户端A抢先一步,对zk发起了加分布式锁的请求,直接在”my_lock”这个锁节点下,创建一个临时顺序节点,这个顺序节点有zk内部自行维护的一个节点序号;
  2. 客户端A判断创建的节点是否是锁节点下的所有子节点中排序第一的节点,如果是加锁成功;
  3. 客户端B过来想要加锁了,在”my_lock”这个锁节点下创建一个临时顺序节点;
  4. 客户端B查询”my_lock”锁节点下的所有子节点,按序号顺序排列,检查自己创建的顺序节点,是不是集合中排第一,不是加锁失败;
  5. 客户端B就会通过ZK的API对它的顺序节点的上一个顺序节点加一个监听器;
  6. 客户端A执行业务逻辑,释放锁,删除临时顺序节点;
  7. ZK通知监听这个节点的监听器,即客户端B加的监听器,通知:你监听的节点已删除;
  8. 客户端B尝试加锁;
  9. 客户端B加锁成功;

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Locker {

    public static void main(String[] args) throws Exception {
        //创建zookeeper的客户端
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);
        client.start();
        InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
        //加锁
        mutex.acquire();
        //获得了锁, 进行业务流程
        System.out.println("Enter mutex");
        //完成业务流程, 释放锁
        mutex.release();
        //关闭客户端
        client.close();
    }
}

加锁

acquire()

获取锁,当锁被占用时会阻塞等待,这个操作支持同线程的可重入。当与Zookeeper通信存在异常时,acquire会直接抛出异常,需要使用者自身做重试策略。

1
2
3
4
5
    public void acquire() throws Exception {
        if (!this.internalLock(-1L, (TimeUnit)null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
        }
    }

internalLock(long time, TimeUnit unit)

获取锁或设置可重入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //获取当前线程锁信息
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        //锁信息不为空,设置锁重入
        if (lockData != null) {
            //锁获取次数+1
            lockData.lockCount.incrementAndGet();
            return true;
        //否则
        } else {
            //获取锁,返回锁路径
            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
            if (lockPath != null) {
                //新建锁信息包含当前线程、锁路径和默认锁获取次数1
                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
                //锁信息放在ConcurrentMap中,当前线程为key
                this.threadData.put(currentThread, newLockData);
                return true;
            } else {
                return false;
            }
        }
    }

attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        long startMillis = System.currentTimeMillis();
        Long millisToWait = unit != null ? unit.toMillis(time) : null;
        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;

        while(!isDone) {
            isDone = true;

            try {
                //在锁空间下创建临时且有序的子节点
                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
                //判断是否获得锁(子节点序号最小),获得锁则直接返回,否则阻塞等待前一个子节点删除通知
                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (NoNodeException var14) {
                //只有发生session过期才会在这里抛出NoNodeException
                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    throw var14;
                }

                isDone = false;
            }
        }
        //如果获得锁则返回该子节点的路径
        return hasTheLock ? ourPath : null;
    }

internalLockLoop(long startMillis, Long millisToWait, String ourPath)

判断锁以及阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
            //锁路径增加watcher
            if (this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }
            //自旋直至获得锁
            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
                //获取所有的子节点列表,并且按序号从小到大排序
                List<String> children = this.getSortedChildren();
                //根据序号判断当前子节点是否为最小子节点
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
                if (predicateResults.getsTheLock()) {
                    //如果为最小子节点则认为获得锁
                    haveTheLock = true;
                } else {
                    //否则获取前一个子节点
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    //这里使用对象监视器做线程同步,当获取不到锁时监听前一个子节点删除消息并且进行wait(),当前一个子节点删除(也就是锁释放)时,回调会通过notifyAll唤醒此线程,此线程继续自旋判断是否获得锁
                    synchronized(this) {
                        try {
                           //这里使用getData()接口而不是checkExists()是因为,如果前一个子节点已经被删除了那么会抛出异常而且不会设置事件监听器,而checkExists虽然也可以获取到节点是否存在的信息但是同时设置了监听器,这个监听器其实永远不会触发,对于zookeeper来说属于资源泄露 ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                           //如果不设置阻塞等待的时间,阻塞
                            if (millisToWait == null) {
                                this.wait();
                            } else {
                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if (millisToWait > 0L) {
                                    //等待相应的时间
                                    this.wait(millisToWait);
                                } else {
                                    // 等待时间到达,删除对应的子节点
                                    doDelete = true;
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                        //使用getData来设置监听器时,如果前一个子节点已经被删除那么会抛出NoNodeException,只需要自旋一次即可,无需额外处理
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if (doDelete) {
                this.deleteOurPath(ourPath);
            }

        }

        return haveTheLock;
    }

解锁

release()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void release() throws Exception{
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
     */
    //获取当前线程
    Thread currentThread = Thread.currentThread();
    //从map中根据当前线程获取锁
    LockData lockData = threadData.get(currentThread);
    if ( lockData == null ){
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }
	//解锁后次数-1,任然大于0返回
    int newLockCount = lockData.lockCount.decrementAndGet();
    if ( newLockCount > 0 ){
        return;
    }
    if ( newLockCount < 0 ){
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try{
        internals.releaseLock(lockData.lockPath);
    }finally{
        //解锁最后从map中删除当前线程
        threadData.remove(currentThread);
    }
}

releaseLock(String lockPath)

1
2
3
4
5
6
7
final void releaseLock(String lockPath) throws Exception{
    //删除观察者
    client.removeWatchers();
    revocable.set(null);
    //删除节点
    deleteOurPath(lockPath);
}

ZK锁与Redis锁对比

Redis分布式锁的问题

  1. Redis单实例实现:Redis单点故障,不能高可用,可能会导致系统崩溃;
  2. Redis主从架构+哨兵:保证高可用,master宕机,slave接替,master宕机一瞬间,锁还没有异步复制到salve,会导致重复加锁,可能系统异常;
  3. Redis集群+redLock算法实现太复杂繁琐,太脆弱,多节点同时设置分布式锁,失效时间不一样,随着不同linux机器时间不同步,可能会出现各种问题,导致重复加锁;

Redisson对Redis分布式锁的支持很好,支持可重入锁,读写锁,公平锁,信号量,CountDownLatch,很多复杂的锁语义;

ZK分布式锁呢?其实有优点也有缺点,优点是锁模型健壮、稳定、可用性高。

使用场景:

  1. Redis分布式锁要知道优势和劣势,可以容忍劣势,做好对应措施和预案,可以容忍做补数据的操作,需要使用各种高级分布式锁的操作;
  2. ZK锁,使用分布式锁的需求很简单,就是普通的悲观锁模型,不涉及公平性,读写锁之类的;ZK的临时顺序节点实现的分布式锁,模型简单健壮;