文章目录
- ZK节点类型
 - watch监听机制
 - Zookeeper实现分布式锁
 - 锁原理
 - 创建锁的过程
 - 释放锁的过程
 - ZK锁的种类
 
- 代码实现
 
Zookeeper是一个开源的分布式协调服务,是一个典型的分布式数据一致性解决方案。
分布式应用程序可以基于Zookeeper实现诸如数据发布/订阅,负载均衡,命名服务,分布式协调/通知,集群管理,Master选举,分布式锁和分布式队列等功能。
ZK节点类型
-  
临时节点
客户端与zookeeper断开连接后,该节点会自动删除 -  
临时有序节点
客户端与zookeeper断开连接后,该节点会自动删除,但是这些节点都是有序排列的。 -  
持久节点
客户端与zookeeper断开连接后,该节点依然存在 -  
持久节点
客户端与zookeeper断开连接后,该节点依然存在,但是这些节点都是有序排列的。 
watch监听机制
主要是监听以下节点变化信息
- 节点创建
 - 节点删除
 - 节点数据修改
 - 子节点变更
 
Zookeeper实现分布式锁
锁原理
多个客户端来竞争锁,各自创建自己的节点,按照顺序创建,谁排在第一个,谁就成功的获取了锁。
就像排队买东西一样,谁排在第一个,谁就先买。
 
创建锁的过程
-  
A、B、C、D四个客户端来抢锁
 -  
A 先来了,他创建了000001的临时顺序节点,他发现自己是最小的节点,那么就成功的获取到了锁
 -  
然后 B 来获取锁,他按照顺序创建了000001的临时顺序节点,发现前面有一个比他小的节点,那么就获取锁失败。他开始监听A客户端,看他什么时候能释放锁
 -  
同理C和D。
 
释放锁的过程
-  
A 客户端执行完任务后,断开了和zookeeper的会话,这时候临时顺序节点自动删除了,也就释放了锁
 -  
B 客户端一直在虎视眈眈的watch监听着A,发现他释放了锁,立马就判断自己是不是最小的节点,如果是就获取锁成功
 -  
C 监听着B,D监听着C。
 
分布式锁的流程图

ZK锁的种类
- InterProcessMutex 分布式可重入排它锁
 - InterProcessSemaphoreMutex 分布式排它锁
 - InterProcessReadWriteLock 分布式可读写锁
 - InterProcessMultiLock 用多个锁去进行一组操作
 - InterProcessSemaphoreV2 共享信号量
 
代码实现
我们这里直接用封装好的工具类,因为如果你自己写的话,如果测试不到位,一旦线上出现问题,那就是大问题。
这里我们用curator这个工具类,他这里把分布式锁已经都给我们实现好了,我们使用起来就像ReentrantLock这些锁一样,非常简单。
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
</dependency>
 
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import java.util.concurrent.TimeUnit;
public class CuratorDistrLockTest implements Runnable{
    //zookeeper的地址
    private static final String ZK_ADDRESS = "127.0.0.1:2181";
    private static final String ZK_LOCK_PATH = "/zkLock";
    static CuratorFramework client = null;
    static {
        // 连接ZK,如果连接失败,设置每5000毫秒重试一次,最多重试10次
        client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,
                new RetryNTimes(10, 5000));
        client.start();
    }
    private static void curatorLockTest() {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            if (lock.acquire(6 * 1000, TimeUnit.SECONDS)) {
                System.out.println("====== " + Thread.currentThread().getName() + " 抢到了锁 ======");
                //执行业务逻辑
                Thread.sleep(15000);
                System.out.println(Thread.currentThread().getName() + "任务执行完毕");
            }
        } catch (Exception e) {
            System.out.println("业务异常");
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                System.out.println("锁释放异常");
            }
        }
    }
    public static void main(String[] args) {
        // 用两个线程,模拟两个客户端
        // 每个线程创建各自的zookeeper连接对象
        new Thread(new CuratorDistrLockTest()).start();
        new Thread(new CuratorDistrLockTest()).start();
    }
    @Override
    public void run() {
        curatorLockTest();
    }
}
                


















