同步工具类:Semaphore
- 介绍
- 源码分析
- 构造函数
- acquire 获取信号量
- release 释放信号量
 
- 业务场景
- 代码
- 测试结果
 
- 总结
介绍
官方说明:
 Semaphore用于限制可以访问某些资源(物理或逻辑的)的线程数目,他维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。
 通俗说明:
 Semaphore 中文俗称信号量,主要用于控制流量,比如:数据库连接池给你分配10个链接,那么让你来一个连一个,连到10个还没有人释放,那你就等等。
源码分析
构造函数
permits 可以理解为许可证,默认情况下只需要传入permits即可。也就是一次运行放行几个线程。如果你需要使用Semaphore 共享锁中的公平锁,那么可以传入第二个构造函数fair= false/true.
public Semaphore(int permits) {
		//permits 为许可数量,默认构造非公平版本
        sync = new NonfairSync(permits);
    }
public Semaphore(int permits, boolean fair) {
		//permits 为许可数量,fair 来决定构造公平版本还是非公平版本
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
acquire 获取信号量
如下所示,其实获取信号量的这四个方法,主要就是,一次获取几个和是否响应中断的组合。
 是否响应中断 可以理解为 其他线程调用interrupt方法时,该线程是否做出响应(一般是抛出异常)。
 下面举例了 acquire()方法。调用了AQS的模板方法 acquireSharedInterruptibly();其中Semaphore 实现了公平锁和非公平锁的tryAcquireShared()方法。公平锁的是首先判断队列里面有没有排队的。如果有的话,就需要等待阻塞。非公平锁 是一上来就死循环 去拿资源。
| 方法 | 描述 | 
|---|---|
| semaphore.acquire() | 一次获取一个信号量,响应中断 | 
| semaphore.acquire(2) | 一次获取n个信号量,响应中断(一次占2个坑) | 
| semaphore.acquireUninterruptibly() | 一次获取一个信号量,不响应中断 | 
| semaphore.acquireUninterruptibly(2) | 一次获取n个信号量,不响应中断 | 
 public void acquire() throws InterruptedException {
         //调用AQS 的模板方法
        sync.acquireSharedInterruptibly(1);
    }
 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
            //此处响应中断,抛出InterruptedException()
        if (Thread.interrupted())
            throw new InterruptedException();
            //Semaphore 的公平锁和非公平锁都实现了tryAcquireShared方法
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
公平锁:
     protected int tryAcquireShared(int acquires) {
            for (;;) {
                //调用AQS的 hasQueuedPredecessors方法,查看是否
                //有其他线程排队,有的话就返回-1,不获取资源,直接进入阻塞
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                //获取的资源需要小于剩余的资源
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
非公平锁:
protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                //判断资源是否够用
                int remaining = available - acquires;
                //够用的话直接 cas 修改返回
                //不够用的话也是cas 修改
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
release 释放信号量
有获取就得有释放,获取了几个信号量就要释放几个信号量
| 方法 | 描述 | 
|---|---|
| semaphore.release() | 一次释放一个信号量 | 
| semaphore.release(2) | 一次获取n个信号量 | 
public void release() {
		//调用AQS的模板方法 releaseShared()
        sync.releaseShared(1);
    }
  public final boolean releaseShared(int arg) {
  		//Semaphore Sync 实现了tryAcquireShared方法
  		//公平锁和非公平锁都是一样的逻辑
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                //对参数进行校验,首先不能是负数
                //其次不能两数相加超过了Integer.MaxValue 从而溢出了
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                    //在循环里面将资源释放回去即可
                if (compareAndSetState(current, next))
                    return true;
            }
        }
业务场景
此处模拟8个人同时去上厕所,但是测试只有两个。所以需要排队同步,此时可以利用Semaphore。
代码
/**
 * @author :echo_黄诗
 * @description:Semaphore 的业务模拟
 * @date :2023/3/1 17:11
 */
public class Demo {
    public static void main(String[] args) {
        //定义两个资源(此时是厕所)
        Semaphore semaphore=new Semaphore(2);
        //定义核心线程池个数为8
        ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(8,20,20,
              TimeUnit.SECONDS,new ArrayBlockingQueue(15));
        //定义8个人来上厕所
      for (int i=0;i<8;i++){
          threadPoolExecutor.execute(()->{
              try {
                  semaphore.acquire();
                  System.out.println(new SimpleDateFormat("YYYY-MM-DD HH:mm:ss").format(new Date()));
                   //模拟上厕所耗时
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }finally {
                  //上完厕所后,需要离开(释放资源)
                  semaphore.release();
              }
              System.out.println(Thread.currentThread().getName()+": 来上厕所了,");
          });
      }
    }
}
测试结果

总结
一:Semaphore 类的核心是Sync,它继承了AQS类,并重写了几个关键方法来实现自己的特殊功能。
 二:Semaphore 类中实现了公平锁和非公平锁,默认是非公平。和 ReentrantLock 类似。
 三:Semaphore 类中的资源是使用了AQS中的state属性。
 四: Semaphore 和CountDownLatch 的实现方式比较类似。大家可以比较学习。
 CountDownLatch 学习可以参考:
 https://blog.csdn.net/echohuangshihuxue/article/details/129280219
 大家有什么补充的,随时欢迎。



















