分布式前修课:Zookeeper锁实现方式

news2025/7/18 5:50:23

前言

聊完MySQL和Redis,我们接下来在聊一聊Zookeeper。相信大家都已经发现了,这些都是我们在开发过程非常常用的技术。搞定他们,一切难题都不在话下。

Zookeeper,盘它

官网是我们学习某一种技术框架的第一手资料,通过官网我们能挖掘到该框架的最新动态

What Is Zookeeper

Zookeeper是一款主要解决分布式协调的服务框架,可以用来维护配置信息、命名、提供分布式同步和服务提供等功能。Zookeeper基于ZAB【ZooKeeper 原子广播】协议,支持高可用。

图片来源自[官方介绍]ZooKeeper: Because Coordinating Distributed Systems is a Zoo)

Zookeeper的设计

设计目标

Zookeeper的设计很简单,其目的就是为了:

  • 减轻分布式应用程序实现协调服务的压力,允许分布式进程通过共享的分层命名空间相互协调

而在Zookeeper中的文件存储可以称为:znodes,类似于Linux下的目录和文件;而不同的一点是:ZooKeeper 数据保存在内存中。这样也就意味着Zookeeper自身可以实现实现高吞吐量和低延迟

命名空间设计

图片来源自官方介绍

Zookeeper中名称全部由斜杠【/】 分隔的一系列路径元素,命名空间中的每个节点都由路径标识。而每个节点都可以拥有与其关联的数据以及子节点。这就像拥有一个允许文件也成为目录的文件系统

专业点来说Zookeeper中的每一个节点都可以称为znode, 主要分为两类:

  • 持久节点:【节点只要创建就存在,除非手动删除】
  • 临时节点:【只要创建znode的会话处于活动状态,那么当前节点就会存在;当会话结束,临时节点自动删除】

有序节点是在临时节点和持久节点的基础上创建的时候后面跟上顺序,本质上没有发生很大的变化

Zookeeper提供了监听/回调的机制,当客户端对znode进行操作之后,会触发watch机制,客户端受到znode已经改变的数据包。

从开发角度来看,这种属于Reactor编程模型,纯异步编程

Netty就是这种编程模型的典型案例

稳定大局

对Zookeeper有一点了解之后,我们就要开始使用它了,我们使用它的目的是为了实现分布式锁。那么我们先来搞定基础环境

我们这里先按照单机环境来做,后面会给出集群环境的配置方式

需要注意的是:2N + 1原则

Zookeeper集群最少需要三台服务器,并且强烈建议使用奇数台服务器。如果您只有两台服务器,那么您会遇到这样的情况:如果其中一台出现故障,则没有足够的机器来形成多数法定人数。两台服务器本质上不如一台服务器稳定,因为有两个单点故障

环境规划

我们这里使用的Zookeeper版本:3.6.2

Zookeeper强依赖于JDK,并且需要安装JDk1.8之上的版本

nodeipport
zookeeper192.168.10.2002181

单机环境

环境规划完成之后,接下来就看我操作吧。

 wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
 # 解压
 tar xf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local/
 
 # 进入到/usr/local下,改个名字
 mv apache-zookeeper-3.6.2-bin/ zookeeper-3.6.2

其实到这里环境就已经安装完成了,下面就是针对Zookeeper的配置

 # 配置文件全部存放在conf下,并且我们需要将模板配置换成`zoo.cfg`,不然无法生效
 cd /usr/local/zookeeper-3.6.2/conf && cp ./zoo_sample.cfg ./zoo.cfg
 
 vim zoo.cfg
  # 默认在tmp下,但是tmp属于系统临时文件目录,我们最好进行修改
  dataDir=/var/data/bigdata/zookeeper

按照zoo.cfg中的配置,我们也只需要改动dataDir的目录就可以了,其他的暂时默认就好

关于Zookeeper更多的配置,在官网《配置参数》中找到

环境变量配置

 # 编辑配置
 vim /etc/profile
  export ZOOKEEPER_HOME=/usr/local/zookeeper-3.6.2
  export JAVA_HOME=/usr/java/jdk1.8.0_221-amd64
  export PATH=$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH

 # 使其生效
 source /etc/profile

下面就开始启动阶段了

 # 以下为启动的全部命令
 zkServer.sh [--config <conf-dir>] {start|start-foreground|stop|version|restart|status|print-cmd}
 
 # 启动:这里已经将Zookeeper加入到了环境变量中
 zkServer.sh start
 # 展示启动状态
 zkServer.sh status

集群配置

集群配置环境下,需要改变两个地方:

第一步:在zoo.cfg配置文件中添加集群节点的配置

 server.1=192.168.10.200:2181:2888:3888
 server.2=192.168.10.201:2181:2888:3888
 server.3=192.168.10.202:2181:2888:3888

第二步:在各自节点的$dataDir目录下添加myid文件,内容对应上面配置的序号

 echo 1 > myid
 echo 2 > myid
 echo 3 > myid

记得要和zoo.cfg中配置的唯一序号一一对应

集群对比单机版只是多了一些配置,其他的没有任何变化。相对比还是非常简单的

客户端操作

Zookeeper提供了命令行的操作方式,通过zkCli.sh来启动,并且操作方式和Linux命令基本相同,下面我们简单演示一下

 # 本地环境可以不配置
 zkCli.sh [-server 127.0.0.1:2181]

下面通过一张图来简单介绍一些Zookeeper的增删改查吧

这其实非常简单的,而且我们并不用搞懂它,毕竟我们在操作的时候并不能直接连到服务器上,下面我们来看看如何通过提供的API来对Zookeeper进行操作吧

锁住它

知其然

在《分布式锁原理》一文中我们曾经介绍过基于Zookeeper实现分布式锁的思路,主要通过Zookeeper的临时节点来实现:

  • 在主节点下每个客户端过来都会注册临时有序节点
  • 每个节点只监听自己前一个节点,如果发现自己是第一个节点,说明已经获取到了锁

而只要客户端断开session连接,临时有序节点自动删除,客户端锁就被释放

知其所以然

下面我们就通过Zookeeper的API来实现一个分布式锁吧。还是老样子,一版自己写,一版看看人家的实现方式。对比一下。

原生代码

 private static final CountDownLatch LATCH = new CountDownLatch(1);
 // 获取ZooKeeper的操作
 public static ZooKeeper getZk() {
     ZooKeeper zooKeeper = null;
     try {
         zooKeeper = new ZooKeeper("192.168.10.200:2181/locks", 1000, event -> {
             switch (event.getState()) {
                 case SyncConnected:
                     // 等到回到 链接成功的事件,就能释放阻塞
                     LATCH.countDown();
                     break;
             }
         });
         //Reactor编程模型,返回很快,但是内存中并没有构建完成,所以需要等待
         LATCH.await();
     } catch (Exception e) {
         e.printStackTrace();
     }
     return zooKeeper;
 }

主要代码

public class LockWatchCallback implements Watcher, AsyncCallback.StringCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback {

    private ZooKeeper zk;
    private String name;
    private String nodePathName;

    private CountDownLatch latch = new CountDownLatch(1);

    public LockWatchCallback(ZooKeeper zk, String name) {
        this.zk = zk;
        this.name = name;
    }

    public void lock() {
        /**
         * 创建节点:
         *  path: 如果在192.168.10.200:2181/locks指定了目录,那么这里的 根目录 代表的是 /locks,然后在创建对应的临时节点
         *  ZooDefs.Ids.OPEN_ACL_UNSAFE: 权限:全部开放
         *  CreateMode.EPHEMERAL_SEQUENTIAL: 临时有序节点
         *  StringCallback: 节点创建完成之后的回调
         */
        zk.create("/lock", name.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, name);
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void unLock() {
        try {
            zk.delete(nodePathName, -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            // 当节点删除之后,重新拉取一次全部子节点,然后进行监听处理
            case NodeDeleted:
                zk.getChildren("/", false, this, "abc");
                break;
        }

    }

    // zk.create("/lock", name.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, name); 回调
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if (null != name) {
            nodePathName = name;
            // 得到根节点下创建的节点,我们不需要watch根目录
            zk.getChildren("/", false, this, "abc");
        }
    }

    // zk.getChildren("/", false, this, "abc"); 回调
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        // 得到的children是无序的,所以要先做一个排序
        Collections.sort(children);

        // /lock0000000000, 而children中是没有斜线的,所以要截取一下
        int i = 1;
        if ((i = children.indexOf(nodePathName.substring(1))) < 1) {
            // 自己已经是第一个节点了,获取到了锁,开始执行
            try {
                zk.setData("/", this.name.getBytes(StandardCharsets.UTF_8), 1);
            } catch (Exception e) {
                e.printStackTrace();
            }
            // 释放掉阻塞,让执行
            latch.countDown();
        } else {
            // 监控自己的前一个节点是否还存在
            try {
                zk.exists("/" + children.get(i - 1), this, this, "abc");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
    }
}

全程采用Zookeeper提供的异步API方式进行回调处理,在每一步回调的地方都添加了注释,看起来是比较方便的。

毕竟是个不成熟的小案例,缺少了分布式锁的很多特性,比如:锁重入等等

『不用太刻意对上面的代码做研究,在生产环境下是不会使用这样的代码的』

下面我看一下如何测试:为了能和之前的程序进行统一,做了一个小小的封装,也可以直接使用LockWatchCallback对象来处理锁操作

public class ZookeeperLock extends AbstractLock {

    ZooKeeper zk;
    LockWatchCallback watchCallback;
    public ZookeeperLock(ZooKeeper zk) {
        this.zk = zk;
    }

    @Override
    public void start() {
        // 每个线程都需要创建一个临时有序节点,所以每个线程都会new一个watchCallback对象
        watchCallback = new LockWatchCallback(zk, Thread.currentThread().getName());
    }

    @Override
    public void lock() {
        // 加锁,创建节点
        this.watchCallback.lock();
    }

    @Override
    public void unlock() {
        // 解锁,删除节点
        this.watchCallback.unLock();
    }

    @Override
    public void destory() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static ExecutorService executorService = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException {
        int[] count = {0};
        final ZookeeperLock zkLock = new ZookeeperLock(getZk());
        for (int i = 0; i < 100; i++) {
            executorService.submit(() -> {
                zkLock.start();

                zkLock.lock();
                count[0]++;
                zkLock.unlock();
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
        System.out.println(count[0]);

        zkLock.destory();
    }
}

成熟框架

那接下来我们就聊一聊成熟的框架是怎么实现分布式锁的:Curator

  • 实现方式是不变的,不过在我们上一版的基础丰富了更多的锁特性,并且实现更加稳定,调用更加方便
public class ZkLock extends AbstractLock {

    private static final Logger LOGGER = LoggerFactory.getLogger(ZkLock.class);

    /**
     * Zookeeper地址 ip:port
     */
    private final String zkAddr;
    /**
     * 总路径
     */
    private final String lockPath;
    private CuratorFramework client;
    private InterProcessLock lock;

    public ZkLock(String zkAddr, String lockPath) {
        this.zkAddr = zkAddr;
        this.lockPath = lockPath;
    }

    @Override
    public void lock() {
        try {
            this.lock.acquire();
        } catch (Exception e) {
            LOGGER.error("Lock异常,异常信息:{}", e.getMessage());
        }
    }

    @Override
    public boolean tryLock() {
        boolean isLocked = false;
        try {
            isLocked = this.lock.acquire(0, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOGGER.error("tryLock异常,异常信息:{}", e.getMessage());
        }
        return isLocked;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        boolean isLocked = false;
        try {
            isLocked = this.lock.acquire(time, unit);
        } catch (Exception e) {
            LOGGER.error("tryLock异常,异常信息:{}", e.getMessage());
        }
        return isLocked;
    }

    @Override
    public void unlock() {
        try {
            this.lock.release();
        } catch (Exception e) {
            LOGGER.error("unlock异常,异常信息:{}", e.getMessage());
        }
    }

    @Override
    public void start() {
        client = CuratorFrameworkFactory.newClient(
                this.zkAddr,
                new RetryNTimes(5, 5000)
        );
        client.start();
        if (client.getState() == CuratorFrameworkState.STARTED) {
            LOGGER.info("zk client start successfully!");
            LOGGER.info("zkAddress:{},lockPath:{}", this.zkAddr, lockPath);
        } else {
            throw new RuntimeException("客户端启动失败。。。");
        }
        this.lock = defaultLock(lockPath);
    }

    /**
     * 公平可重入锁
     *
     * @param lockPath 路径
     * @return InterProcessMutex
     */
    InterProcessLock defaultLock(String lockPath) {
        return new InterProcessMutex(client, lockPath);
    }
}

看看这个代码量是不是简洁了很多,虽然简洁,但是功能俱全。我们来验证一下:

private static ExecutorService executorService = Executors.newCachedThreadPool();

public static void main(String[] args) throws InterruptedException {
    ZkLock zkLock = new ZkLock("192.168.10.200:2181","/locks");
    zkLock.start();

    int[] num = {0};
    long start = System.currentTimeMillis();
    for(int i=0;i<200;i++){
        executorService.submit(()->{
            try {
                zkLock.lock();
                num[0]++;
            } catch (Exception e){
                throw new RuntimeException(e);
            } finally {
                zkLock.unlock();
            }
        });

    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.HOURS);
    System.out.println(num[0]);

}

完全OK!!!

最后

关于Zookeeper分布式锁的实现我们就介绍到这里。Zookeeper在实际使用中的场景还是非常丰富的,包括分布式协调等功能都在等着大家一一探索。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/17450.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JAVA泛型

泛型的由来 因为JAVA中假如构建了一个object集合&#xff0c;在集合里存储任何的数据类型对象&#xff0c;定义了一个字符串&#xff0c;又定义一个常数。呢么在遍历数组Arraylist的时候&#xff0c;在代码行里并不会报错&#xff0c;但是运行之后会出现ClassCastException异常…

传奇私服开服架设教程 传奇服务端设置教程

首先开区要使用两个必备的软件&#xff1a;DBC2000、传奇引擎&#xff0c;当然也少不了传奇服务端(传奇服务端在论坛免费服务端和商业服务端中有下载)。 先把下载好的服务端解压到D盘或者某个般的根目录(所谓根目录&#xff0c;就是D盘或E盘&#xff0c;不是D盘和E盘的某个文件…

TCN代码详解-Torch (误导纠正)

1. 绪论 TCN网络由Shaojie Bai&#xff0c; J. Zico Kolter&#xff0c; Vladlen Koltun 三人于2018提出。对于序列预测而言&#xff0c;通常考虑循环神经网络结构&#xff0c;例如RNN、LSTM、GRU等。他们三个人的研究建议我们&#xff0c;对于某些序列预测&#xff08;音频合…

基于Matlab仿真极化双基地雷达系统(附源码)

目录 一、系统设置 二、系统仿真 三、使用圆极化接收阵列 四、总结 五、程序 此示例演示如何仿真极化双基地雷达系统以估计目标的范围和速度。发射器、接收器和目标运动学被考虑在内。 一、系统设置 该系统以 300 MHz 的频率运行&#xff0c;使用线性 FM 波形&#xff0…

Devkit代码迁移工具——smartdenovo源码迁移

smartdenovo源码迁移 迁移前准备工作 1、服务器和操作系统正常运行。 2、PC端已经安装SSH远程登录工具。 3、Porting Advisor已在准备好的x86平台环境和鲲鹏平台环境中完成安装并正常运行。 4、待迁移的相关软件包、源代码已准备就绪。 迁移步骤 1、利用Porting Advisor的源码…

人工智能学习:ResNet神经网络(8)

ResNet是一种非常有效的图像分类识别的模型&#xff0c;可以参考如下的链接 https://blog.csdn.net/qq_45649076/article/details/120494328 ResNet网络由残差&#xff08;Residual&#xff09;结构的基本模块构成&#xff0c;每一个基本模块包含几个卷积层。其中&#xff0c;…

【MySQL数据库笔记 - 进阶篇】(五)锁

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 &#x1f4da;专栏地址&#xff1a;暂定 &#x1f4dd;视频地址&#xff1a;黑马程序员 MySQL数据库入门到精通 &#x1f4e3;专栏定位&#xff1a;这个专栏我将会整理 B 站黑马程序员的 MySQL…

硬件科普系列之显示篇——LCD与OLED知多少

前言 无论是手机还是电脑&#xff0c;作为机器与人交互最为频繁的硬件设备&#xff0c;显示屏一直是决定用户体验最为关键的因素之一。大家近几年在购买手机的时候&#xff0c;可以发现目前大部分手机都在使用OLED屏幕&#xff0c;那么你有没有思考过为什么各大厂商都在大力推…

jupuyter的背景主题

jupuyter的背景主题一.背景主题安装查看可用主题1.主题安装2. **查看可用主题**3.更换主题&#xff0c;字体等其他设置4.其他命令&#xff0c;还原原本主题二.每个主题的效果1.chesterish2. grade33.gruvboxd4.oceans165.onedork6.solarizedd7.solarizedl一.背景主题安装查看可…

上帝视角看Vue源码整体架构+相关源码问答

前言 这段时间利用课余时间夹杂了很多很多事把 Vue2 源码学习了一遍&#xff0c;但很多都是跟着视频大概过了一遍&#xff0c;也都画了自己的思维导图。但还是对详情的感念模糊不清&#xff0c;故这段时间对源码进行了总结梳理。 本篇文章更合适于已看过 Vue2 源码&#xff0c…

使用NNI对DLASeg剪枝的失败记录

本文希望对CenterNet算法的Backbone暨DLASeg进行剪枝。 剪枝试验涉及3个文件&#xff0c;分别为&#xff1a; DCN可变性卷积dcn_v2.py&#xff0c;因为DLASeg依赖DCN。 #!/usr/bin/env python from __future__ import absolute_import from __future__ import print_functio…

如何在 Windows 10上修复0x000006ba错误

修复0x000006ba错误 可能导致此错误代码的原因已确认的可行的解决办法运行打印机疑难解答重新启动后台打印程序服务清除 PRINTERS 文件夹运行 SFC 和 DISM 扫描启用打印机共享某些 Windows 10 在尝试在 Windows 10 上打印新文档时遇到0x000006ba错误代码。其他用户在尝试使用 W…

【面试题】line-height继承问题

1. line-height为具体数值 当父元素line-height的值为具体数值的时候&#xff0c;例如30px&#xff0c;则子元素的line-height直接继承该数值。 <style>body{font-size: 20px;line-height: 50px;}p{background-color: #ccc;font-size: 16px;} </style><body&g…

类和对象的初步介绍

文章目录面向对象的初步认识什么是面向对象面向对象与面向过程类定义和使用简单认识类类的定义格式随堂练习定义一个学生类类的实例化什么是实例化类和对象的说明this 引用为什么要有this引用什么时this引用this引用的特性对象的构造和初始化构造方法概念特性默认初始化就地初始…

Shell脚本学习指南(三)——文本处理工具

文章目录排序文本行的排序以字段的排序文本块排序sort的效率sort的稳定性sort小结删除重复重新格式化段落计算行数、字数以及字符数打印打印技术的演化其他打印软件提取开头或结尾数行排序文本 含有独立数据记录的文本文恶剪&#xff0c;通常都可以拿来排序。一个可预期的记录…

Vue3 - 组件通信(父传子)

前言 在 Vue3 中&#xff0c;父组件向子组件传参的方法。 与 Vue2 相比&#xff0c;还是有一些区别的。 基础示例 现在我们的需求是&#xff0c;要通过父组件&#xff0c;传递一个标题来让子组件显示。 子组件 Com.vue&#xff1a; <template><div>{{ title }}&l…

大数据工程师必备之数据可视化技术

可视化技术 数据&#xff1a; 偏耀明 7800 高军鹏 8000 代欣 8800 王国庆 20000 ​ 应对现在数据可视化的趋势&#xff0c;越来越多企业需要在很多场景(营销数据、生产数据、用户数据)下使用&#xff0c;可视化图表来展示体现数据&#xff0c;让数据更加直观&#xff0c;数…

tp6使用redis消息队列

尾部写入 for ($i1;$i<1000;$i){Cache::store(redis)->rpush(list,date("Y-m-d H:i:s")."消息{$i}"); }头部读取消息队列并删除 $list Cache::store(redis)->lpop(list); 1、新建个方法运行写入队列 public function hello(){for ($i1;$i<…

C++ Reference: Standard C++ Library reference: Containers: deque: deque: erase

C官网参考链接&#xff1a;https://cplusplus.com/reference/deque/deque/erase/ 公有成员函数 <deque> std::deque::erase C98 iterator erase (iterator position); iterator erase (iterator first, iterator last); C11 iterator erase (const_iterator position )…

Android 后台服务启动Actvity

一、问题背景 相机自动化测试需求&#xff0c;测试apk通过bindService绑定相机apk里面的一个服务&#xff0c;通过AIDL接口的方式向相机apk发送命令&#xff0c;服务接收到命令之后会拉起相机的Activity。原本没有人为干预的情况下是可以拉起这个Activity的&#xff0c;但是拉…