(五)库存超卖案例实战——使用zookeeper分布式锁解决“超卖”问题

news2025/7/18 11:01:12

前言

本节内容使用zookeeper实现分布式锁,完成并发访问“超卖”问题的解决。相对于redis分布式锁,zookeeper能够保证足够的安全性。关于zookeeper的安装内容这里不做介绍,开始本节内容之前先自行安装好zookeeper中间键服务。这里我们利用创建zookeeper路径节点的唯一性实现分布式锁。并同时演示如何使用Curator工具包,完成分布式锁。

正文

  • 在项目中添加zookeeper的pom依赖
<dependency>
	<groupId>org.apache.zookeeper</groupId>
	<artifactId>zookeeper</artifactId>
	<version>3.7.2</version>
</dependency>
  • 创建zookeeper客户端工具,实现加锁和解锁方法 

- 创建ZookeeperClient客户端工具

package com.ht.atp.plat.util;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;


@Component
public class ZookeeperClient {
    /**
     * zookeeper连接地址
     */
    private static final String connectString = "192.168.110.88:2181";

    /**
     * 分布式锁根路径
     */
    private static final String ROOT_PATH = "/distributed";

    /**
     * zookeeper客户端
     */
    private ZooKeeper zooKeeper;

    /**
     * 初始化zookeeper客户端
     */
    @PostConstruct
    public void init() {
        try {
            // 连接zookeeper服务器
            this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));
            // 创建分布式锁根节点
            if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
                this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            System.out.println("获取链接失败!");
            e.printStackTrace();
        }
    }

    /**
     * 销毁zookeeper客户端
     */
    @PreDestroy
    public void destroy() {
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 加锁
     *
     * @param lockName
     */
    public void lock(String lockName) {
        try {
            zooKeeper.create(ROOT_PATH + "/" + lockName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e) {
            // 重试
            try {
                Thread.sleep(200);
                lock(lockName);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
        System.out.println("----------加锁成功------------");
    }

    /**
     * 解锁
     *
     * @param lockName
     */
    public void unlock(String lockName) {
        try {
            this.zooKeeper.delete(ROOT_PATH + "/" + lockName, 0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        System.out.println("----------解锁成功------------");
    }
}

- 使用临时节点EPHEMERAL加锁,这里使用临时节点是方便锁的自动释放,避免发生死锁问题

- 业务执行完成,删除临时节点,解锁

  • 实现“超卖”的业务方法,使用自定义的zookeeper工具类加锁
    @Autowired
    ZookeeperClient zookeeperClient;

    @Override
    public void checkAndReduceStock() {
        zookeeperClient.lock("lock");

        // 查询库存
        WmsStock wmsStock = baseMapper.selectById(1L);
        // 验证库存大于0再扣减库存
        if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
            wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
            baseMapper.updateById(wmsStock);
        }

        // 释放锁
        zookeeperClient.unlock("lock");
    }
  • 将数据库库存表的库存恢复为10000,分别启动7000,7001,7002服务

  •  启动jmeter压测工具,压测库存扣减接口,查看结果

- 库存扣减为0

- jmeter压测结果,平均访问时间502ms,请求吞吐量为每秒161

- 不存在并发“超卖问题”,但是接口访问吞吐量较低。由于在加锁过程中,获取不到锁,会无限自旋去获取锁,导致性能下降。

  •  优化分布式锁,使用zk的临时序列化节点实现分布式锁,避免锁的自旋操作

- 优化代码

package com.ht.atp.plat.util;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;

@Component
public class ZookeeperClientNoBlock {
    /**
     * zookeeper连接地址
     */
    private static final String connectString = "192.168.110.88:2181";

    /**
     * 分布式锁根路径
     */
    private static final String ROOT_PATH = "/distributed";

    /**
     * zookeeper客户端
     */
    private ZooKeeper zooKeeper;

    /**
     * 初始化zookeeper客户端
     */
    @PostConstruct
    public void init() {
        try {
            // 连接zookeeper服务器
            this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));
            // 创建分布式锁根节点
            if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
                this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            System.out.println("获取链接失败!");
            e.printStackTrace();
        }
    }

    /**
     * 销毁zookeeper客户端
     */
    @PreDestroy
    public void destroy() {
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建锁
     *
     * @param lockName
     */
    public String lock(String lockName) {
        try {
            String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            return realLock;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("----------加锁成功------------");
        return null;
    }

    /**
     * 检查锁
     *
     * @param lockName
     */
    public void checkLock(String lockName) {
        String preNode = getPreNode(lockName);
        // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑
        if (StringUtils.isEmpty(preNode)) {
            return;
        }
        // 重新检查。是否获取到锁
        try {

            Thread.sleep(20);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        checkLock(lockName);
    }

    /**
     * 获取指定节点的前节点
     *
     * @param path
     * @return
     */
    private String getPreNode(String path) {
        try {
            // 获取当前节点的序列化号
            Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));
            // 获取根路径下的所有序列化子节点
            List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);

            // 判空
            if (CollectionUtils.isEmpty(nodes)) {
                return null;
            }

            // 获取前一个节点
            Long flag = 0L;
            String preNode = null;
            for (String node : nodes) {
                // 获取每个节点的序列化号
                Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));
                if (serial < curSerial && serial > flag) {
                    flag = serial;
                    preNode = node;
                }
            }

            return preNode;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 解锁
     *
     * @param lockName
     */
    public void unlock(String lockName) {
        try {
            this.zooKeeper.delete(lockName, 0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        System.out.println("----------解锁成功------------");
    }
}

- 使用临时序列化节点EPHEMERAL_SEQUENTIAL加锁,保证每个节点都可以加锁成功,避免自旋操作

- 通过判断当前节点是否是第一个节点,如果是第一个节点,才可执行后续的业务

- 解锁操作

  • 扣减库存业务实现方法
public void checkAndReduceStock() {
        //加锁
        String lock = zookeeperClientNoBlock.lock("lock");
        //检查锁
        zookeeperClientNoBlock.checkLock(lock);

        // 查询库存
        WmsStock wmsStock = baseMapper.selectById(1L);
        // 验证库存大于0再扣减库存
        if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
            wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
            baseMapper.updateById(wmsStock);
        }

        // 释放锁
        zookeeperClientNoBlock.unlock(lock);
    }
  • 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测

- 库存扣减为0

- 压测结果:平均访问时间1597ms,请求访问吞吐量为每秒62

从优化结果来看,此种方式更加耗时,性能更差。将加锁操作改为非自旋操作,虽然加锁不在耗时,但是会自旋判断自己是否是最小的节点,依然存在耗时操作,且逻辑更为复杂。

  • 优化分布式锁,通过使用zookeeper的Watcher监听来实现阻塞锁

- 实现代码

package com.ht.atp.plat.util;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.proto.WatcherEvent;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.CountDownLatch;


@Component
public class ZookeeperClientBlockWatch {
    /**
     * zookeeper连接地址
     */
    private static final String connectString = "192.168.110.88:2181";

    /**
     * 分布式锁根路径
     */
    private static final String ROOT_PATH = "/distributed";

    /**
     * zookeeper客户端
     */
    private ZooKeeper zooKeeper;

    /**
     * 初始化zookeeper客户端
     */
    @PostConstruct
    public void init() {
        try {
            // 连接zookeeper服务器
            this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));
            // 创建分布式锁根节点
            if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
                this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            System.out.println("获取链接失败!");
            e.printStackTrace();
        }
    }

    /**
     * 销毁zookeeper客户端
     */
    @PreDestroy
    public void destroy() {
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建锁
     *
     * @param lockName
     */
    public String lock(String lockName) {
        try {
            String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            return realLock;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("----------加锁成功------------");
        return null;
    }

    /**
     * 检查锁
     *
     * @param lockName
     */
    public void checkLock(String lockName) {
        try {
            String preNode = getPreNode(lockName);
            // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑
            if (!StringUtils.isEmpty(preNode)) {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监控:"+ watchedEvent.getPath());
                        countDownLatch.countDown();
                    }
                }) == null) {
                    return;
                }
                // 阻塞,减少自旋检查
                countDownLatch.await();
            }
            return;

        } catch (Exception e) {
            e.printStackTrace();
            // 重新检查。是否获取到锁
            try {
                Thread.sleep(20);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            checkLock(lockName);
        }
    }

    /**
     * 获取指定节点的前节点
     *
     * @param path
     * @return
     */
    private String getPreNode(String path) {
        try {
            // 获取当前节点的序列化号
            Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));
            // 获取根路径下的所有序列化子节点
            List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);

            // 判空
            if (CollectionUtils.isEmpty(nodes)) {
                return null;
            }

            // 获取前一个节点
            Long flag = 0L;
            String preNode = null;
            for (String node : nodes) {
                // 获取每个节点的序列化号
                Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));
                if (serial < curSerial && serial > flag) {
                    flag = serial;
                    preNode = node;
                }
            }

            return preNode;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 解锁
     *
     * @param lockName
     */
    public void unlock(String lockName) {
        try {
            this.zooKeeper.delete(lockName, 0);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        System.out.println("----------解锁成功------------");
    }
}

- 检查加锁,加入前一个节点的监控判断,如果前一个节点的锁还没有释放,就使用CountDownLatch计数器阻塞程序,减少自旋检查,当监控到前一个节点的锁释放,则当前节点获取到锁,开始执行业务逻辑

  •  修改扣减库存业务实现方法为监控阻塞的方式
    @Autowired
    ZookeeperClientBlockWatch zookeeperClientBlockWatch;

    public void checkAndReduceStock() {
        //加锁
        String lock = zookeeperClientBlockWatch.lock("lock");
        //检查锁
        zookeeperClientBlockWatch.checkLock(lock);

        // 查询库存
        WmsStock wmsStock = baseMapper.selectById(1L);
        // 验证库存大于0再扣减库存
        if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
            wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
            baseMapper.updateById(wmsStock);
        }

        // 释放锁
        zookeeperClientBlockWatch.unlock(lock);
    }
  •  将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测

- 库存扣减为了0

- jmeter压测结果:平均访问时间441ms,请求访问吞吐量为每秒223

从优化结果来看,使用带监控的阻塞锁吞吐量更高,平均访问时间更小,性能比自旋加锁的方式性能更优。

  • 使用ThreadLocal优化分布式锁,将zookeeper的Watcher监听来实现阻塞锁优化为可重入分布式锁

- 优化代码

package com.ht.atp.plat.util;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.List;
import java.util.concurrent.CountDownLatch;


@Component
public class ZookeeperClientBlockWatchReentrant {
    /**
     * zookeeper连接地址
     */
    private static final String connectString = "192.168.110.88:2181";

    /**
     * 分布式锁根路径
     */
    private static final String ROOT_PATH = "/distributed";

    /**
     * zookeeper客户端
     */
    private ZooKeeper zooKeeper;

    /**
     * ThreadLocal本地线程
     */
    private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();

    /**
     * 初始化zookeeper客户端
     */
    @PostConstruct
    public void init() {
        try {
            // 连接zookeeper服务器
            this.zooKeeper = new ZooKeeper(connectString, 30000, event -> System.out.println("获取链接成功!!"));
            // 创建分布式锁根节点
            if (this.zooKeeper.exists(ROOT_PATH, false) == null) {
                this.zooKeeper.create(ROOT_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            System.out.println("获取链接失败!");
            e.printStackTrace();
        }
    }

    /**
     * 销毁zookeeper客户端
     */
    @PreDestroy
    public void destroy() {
        try {
            if (zooKeeper != null) {
                zooKeeper.close();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建锁
     *
     * @param lockName
     */
    public String lock(String lockName) {
        try {
            if (THREAD_LOCAL.get() == null || THREAD_LOCAL.get() == 0){
                String realLock = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                return realLock;
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("----------加锁成功------------");
        return null;
    }

    /**
     * 检查锁
     *
     * @param lockName
     */
    public void checkLock(String lockName) {
        Integer flag = THREAD_LOCAL.get();
        if (flag != null && flag > 0) {
            THREAD_LOCAL.set(flag + 1);
            return;
        }
        try {
            String preNode = getPreNode(lockName);
            // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑
            if (!StringUtils.isEmpty(preNode)) {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) {
                        System.out.println("监控:"+ watchedEvent.getPath());
                        countDownLatch.countDown();
                    }
                }) == null) {
                    THREAD_LOCAL.set(1);
                    return;
                }
                // 阻塞,减少自旋检查
                countDownLatch.await();
            }
            THREAD_LOCAL.set(1);
            return;

        } catch (Exception e) {
            e.printStackTrace();
            // 重新检查。是否获取到锁
            try {
                Thread.sleep(20);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            checkLock(lockName);
        }
    }

    /**
     * 获取指定节点的前节点
     *
     * @param path
     * @return
     */
    private String getPreNode(String path) {
        try {
            // 获取当前节点的序列化号
            Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));
            // 获取根路径下的所有序列化子节点
            List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);

            // 判空
            if (CollectionUtils.isEmpty(nodes)) {
                return null;
            }

            // 获取前一个节点
            Long flag = 0L;
            String preNode = null;
            for (String node : nodes) {
                // 获取每个节点的序列化号
                Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));
                if (serial < curSerial && serial > flag) {
                    flag = serial;
                    preNode = node;
                }
            }

            return preNode;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 解锁
     *
     * @param lockName
     */
    public void unlock(String lockName) {
        try {
            THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);
            if (THREAD_LOCAL.get() == 0) {
                this.zooKeeper.delete(lockName, 0);
                THREAD_LOCAL.remove();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        System.out.println("----------解锁成功------------");
    }
}

- 使用ThreadLocal存储当前线程的操作

- 加锁:先检测本地线程是否存在或者值是否为0,如果不存在或者为0,则创建锁,避免重复创建

- 检查加锁:如果值大于0,代表可重入,值加1;如果不存在或者等于0,将值设置为1,代表第一次获取到锁

- 解锁:获取线程的值,并减少1,如果减少后的值变为0,代表锁已经不在占用,此时才释放锁,并且将当前线程的值存储移除;如果减少的值还是大于0,代表此时锁还在占用

  •  扣减库存业务代码
    @Autowired
    ZookeeperClientBlockWatchReentrant zookeeperClientBlockWatchReentrant;

    public void checkAndReduceStock() {
        //加锁
        String lock = zookeeperClientBlockWatchReentrant.lock("lock");
        //检查锁
        zookeeperClientBlockWatchReentrant.checkLock(lock);

        // 查询库存
        WmsStock wmsStock = baseMapper.selectById(1L);
        // 验证库存大于0再扣减库存
        if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
            wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
            baseMapper.updateById(wmsStock);
        }

        // 释放锁
        zookeeperClientBlockWatchReentrant.unlock(lock);
    }
  • 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测

- 库存扣减为0

- jmeter压测结果:平均访问时间426ms,吞吐量每秒231

  •  使用zookeeper的Curator工具包实现分布式锁

- 引入pom依赖

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
</dependency>

- 创建CuratorFramework的bean工具类

package com.ht.atp.plat.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * CuratorFramework工具类配置
 */
@Slf4j
@Configuration
public class ZookeeperConfig {
 
    @Bean
    public CuratorFramework curatorFramework() {
        // ExponentialBackoffRetry是种重连策略,每次重连的间隔会越来越长,1000毫秒是初始化的间隔时间,3代表尝试重连次数。
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 创建客户端
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.110.88:2181", retryPolicy);
        // 添加watched 监听器
        curatorFramework.getCuratorListenable().addListener((CuratorFramework client, CuratorEvent event) -> {
            CuratorEventType type = event.getType();
            if (type == CuratorEventType.WATCHED) {
                WatchedEvent watchedEvent = event.getWatchedEvent();
                String path = watchedEvent.getPath();
                log.info(watchedEvent.getType() + " ----------------------------> " + path);
                // 重新设置该节点监听
                if (null != path) {
                    client.checkExists().watched().forPath(path);
                }
            }
        });
        // 启动客户端
        curatorFramework.start();
        return curatorFramework;
    }
}

- 使用可重入锁,扣减库存业务方法

@Autowired
    private CuratorFramework curatorFramework;

    public void checkAndReduceStock() throws Exception {
        //加锁
        InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/curator/lock");
        try {
            // 加锁
            mutex.acquire();
            // 查询库存
            WmsStock wmsStock = baseMapper.selectById(1L);
            // 验证库存大于0再扣减库存
            if (wmsStock != null && wmsStock.getStockQuantity() > 0) {
                wmsStock.setStockQuantity(wmsStock.getStockQuantity() - 1);
                baseMapper.updateById(wmsStock);
            }
            this.testSub(mutex);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放锁
            mutex.release();
        }
    }

    public void testSub(InterProcessMutex mutex) throws Exception {

        try {
            mutex.acquire();
            System.out.println("测试可重入锁。。。。");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 释放锁
            mutex.release();
        }
    }
  • 将扣减库存恢复为10000,重新启动7000,7001,7002服务,使用jmeter压测工具再次压测 

- 库存扣减为0

- jmeter压测结果:平均访问时间497,吞吐量每秒198

- 能够解决并发访问“超卖问题”

结语

关于使用zookeeper分布式锁解决“超卖”问题的内容到这里就结束了,我们下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1157652.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis与Mysql的数据一致性(双写一致性)

双写一致性&#xff1a;当修改了数据库的数据也要同时的更新缓存的数据&#xff0c;使缓存和数据库的数据要保持一致。 一般是在写数据的时候添加延迟双删的策略 先删缓存 再修改数据 延迟一段时间后再次删除缓存 这种方式其实不是很靠谱 一致性要求高 共享锁&#xff1a;读…

Leetcode刷题---删除有序数组中的重复项 II(双指针问题)

题目描述&#xff1a; 题目中已经给出该数组是一个升序的数组。要求数组中最多出现两个相同的元素&#xff0c;而且不能使用额外的存储空间&#xff0c;并且将新的数组的长度返回。 解题思想&#xff1a; 该题可以使用双指针来解决&#xff0c;我们可以定义一个快指针和一个…

安装docker报错:except yum.Errors.RepoError, e:

问题描述&#xff1a; 在安装docker的时候&#xff0c;配置阿里云地址出现以下问题 问题原因&#xff1a; linux 系统中存在多版本的python. yum 依赖 python 2, 而个人使用 python 3 导致. 解决办法&#xff1a; 修改 /usr/bin/yum-config-manager文件中第一行 #!/usr/bin/p…

项目间的”藕断丝连“——从零到一搓个组件库

文章从零到一的封装设计 Starter&#xff0c;并提供可插拔 Starter 以及元数据配置等说明&#xff0c;并在可插拔上与开源 Zuul 进行比对&#xff0c;希望大家看后有所收获。 SpringBoot Starter 1. Starter 定义 SpringBoot Starter 类似于一种插件机制&#xff0c;抛弃了之…

pycharm更改远程服务器地址

一、问题描述 在运行一些项目时&#xff0c;我们常需要在pycharm中连接远程服务器&#xff0c;但万一远程服务器的ip发生了变化&#xff0c;该如何修改呢&#xff1f;我们在file-settings-python interpreter中找到远程服务器&#xff0c;但是发现ip是灰色的&#xff0c;没有办…

kkFileview任意文件读取漏洞复现

一、kkFileview简介 kkFileView&#xff0c;一款成熟且开源的文件文档在线预览项目解决方案。kkFileView为文件文档在线预览解决方案&#xff0c;该项目使用流行的spring boot搭建&#xff0c;易上手和部署&#xff0c;基本支持主流办公文档的在线预览&#xff0c;如doc,docx,x…

你一般会什么时候使用CHATGPT?

在当今数字时代&#xff0c;人们对于人工智能&#xff08;AI&#xff09;的依赖程度日益增加&#xff0c;而ChatGPT作为一种强大的自然语言处理工具&#xff0c;吸引了人们的广泛关注和应用。那么&#xff0c;人一般在什么时候会想要使用ChatGPT呢&#xff1f;这个问题涵盖了多…

Debookee 8 for Mac网络数据分析工具

Debookee是一款用于网络数据流量分析和嗅探的软件。它为用户提供了一个直观的界面&#xff0c;让他们能够查看和分析来自从网络上的各种设备的数据流量。 Debookee具有以下主要功能&#xff1a; 实时监控&#xff1a;Debookee可以实时监控网络上的数据流量&#xff0c;并将其显…

对话InfoQ,聊聊百度开源高性能检索引擎 Puck

近日&#xff0c;百度宣布在 Apache 2.0 协议下开源自研检索引擎 Puck&#xff0c;这也是国内首个适用于超大规模数据集的开源向量检索引擎。向量检索算法在个性化推荐系统、多模态检索、自然语言处理等应用场景中都发挥着重要作用&#xff0c;特别是在处理大规模数据和高维特征…

MyBatis无法读取XML中的Method的乌龙事件

事件背景 同事反馈&#xff0c;相同的jar包&#xff0c;在多人本地的电脑、多台服务器中&#xff0c;都是可以正常启动的&#xff0c;只有在其中一台服务器&#xff0c;简称它为A&#xff0c;无法启动&#xff0c;因为启动后的初始化操作中有一个调用mybatis方法的操作&#x…

python实现MC协议(SLMP 3E帧)的TCP服务端(篇二)

python实现MC协议&#xff08;SLMP 3E帧&#xff09;的TCP服务端是一件稍微麻烦点的事情。它不像modbusTCP那样&#xff0c;可以使用现成的pymodbus模块去实现。但是&#xff0c;我们可以根据协议帧进行组包&#xff0c;自己去实现帧的格式&#xff0c;而这一切可以基于socket模…

聊聊无源滤波器与有源滤波器的概念、区别与应用

随着电子技术的迅速发展&#xff0c;电子设备得到广泛的应用&#xff0c;然而电磁环境污染日趋严重&#xff0c;已成为当今主要公害之一。在很多领域里&#xff0c;电磁兼容性已成为电气和电子产品必须有的技术指标或性能评价的依据&#xff0c;通过使用电源滤波器来过滤掉电源…

Debug技巧-不启用前端访问后端

在日常开发中&#xff0c;我们经常会遇到各种问题需要调试&#xff0c;前后端都启动需要耗费一定的时间和内存&#xff0c;方便起见&#xff0c;可以直接用抓包数据访问后端&#xff0c;这里我们需要用到Postman或者ApiFox 抓包数据 在系统前台触发后端请求&#xff0c;在控制…

LiveNVR监控流媒体Onvif/RTSP功能-支持海康摄像头通过海康SDK接入支持回看倍速播放海康设备存储的设备录像

LiveNVR支持海康摄像头通过海康SDK接入支持回看倍速播放海康设备存储的设备录像 1、流媒体服务说明2、支持海康SDK接入3、查看设备录像3.1、时间轴模式3.2、列表模式 4、RTSP/HLS/FLV/RTMP拉流Onvif流媒体服务 1、流媒体服务说明 LiveNVR可接入传统监控行业里面的高清网络摄像…

黑马 小兔鲜儿 uniapp 小程序开发- 微信登录用户模块- 06-07

黑马 小兔鲜儿 uniapp 小程序开发- 商品详情模块- day05-CSDN博客 小兔鲜儿 - 微信登录-06 涉及知识点&#xff1a;微信授权登录&#xff0c;文件上传&#xff0c;Store 状态管理等。 微信登录 微信小程序的开放能力&#xff0c;允许开发者获取微信用户的基本信息&#xff…

回馈式电子负载核心组成

回馈式电子负载是一种用于模拟负载电流和电压的测试设备。它由以下几个核心组成部分构成&#xff1a; 控制电路&#xff1a;控制电路是负载的核心部分&#xff0c;它负责接收输入的控制信号&#xff0c;并根据信号的要求来调整负载的工作状态。控制电路通常包括一个微处理器或者…

自主创建抖音商城小程序源码系统 带完整搭建教程

随着抖音平台的日益普及&#xff0c;越来越多的商家和用户选择在抖音上开展业务。抖音作为一款短视频社交平台&#xff0c;拥有庞大的用户群体和广阔的市场前景。今天罗峰就来给大家介绍一款抖音商城小程序源码系统&#xff0c;帮助用户快速创建自己的抖音商城&#xff0c;从而…

Revo Uninstaller Pro:终极卸载工具,彻底清除电脑痕迹

你是否曾为无法彻底卸载软件&#xff0c;残留大量无用文件而感到烦恼&#xff1f;是否曾因恶意软件难以清除&#xff0c;导致电脑运行缓慢&#xff1f;这些问题&#xff0c;Revo Uninstaller Pro都能帮你解决。 Revo Uninstaller Pro是一款专业的卸载工具&#xff0c;它不仅具…

国产系统(Linux)不支持长文件名的问题和解决方案

前言 众所周知&#xff0c;Linux系统中文件名长度不能超过255个字符&#xff01; 而大多数的Linux系统在显示中文时&#xff0c;使用的是UTF-8编码。这种编码在Linux中&#xff0c;一个中文需要占用3个字符&#xff01;因此&#xff0c;在Linux系统中&#xff0c;文件名最多也…

无测试组织:测试团队的敏捷转型

文章目录 写在前面01 从测试角度理解敏捷理念什么是敏捷&#xff1f;测试人员应该怎样理解敏捷理念&#xff1f;敏捷宣言对于测试活动的启发与思考总结如下敏捷原则12条敏捷实践框架为什么要做敏捷 02 什么是敏捷测试03 敏捷测试为什么会失败04 诊断脑暴会的成果示例测试团队转…