
       📝个人主页:五敷有你      
  🔥系列专栏:并发编程
 ⛺️稳重求进,晒太阳

示意图

步骤1:自定义任务队列
变量定义
- 用Deque双端队列来承接任务
 - 用ReentrantLock 来做锁
 - 并声明两个条件变量 Condition fullWaitSet emptyWaitSet
 - 最后定义容量 capcity
 
方法:
- 添加任务 
  
- 注意点: 
    
- 任务容量慢了 用await
 - 每个添加都进行一个emptyWaitSet.signalAll 唤醒沉睡的线程
 - 考虑万一死等的情况,加入时间的判断
 
 
 - 注意点: 
    
 - 取出任务 
  
- 注意点: 
    
- 任务空了 用await
 - 每个任务取出来都进行一个fullWaitSet.signAll来唤醒沉睡的线程
 - 考虑超时的情况,加入时间的判断
 
 
 - 注意点: 
    
 
public class MyBlockQueue<T> {
    //1.任务队列
    private Deque<T> deque=new ArrayDeque();
    //2.锁
    private ReentrantLock lock=new ReentrantLock();
    //3.生产者条件变量
    private Condition fullWaitSet=lock.newCondition();
    //4.消费者条件变量
    private Condition emptyWaitSet=lock.newCondition();
    //5.容量
    private int capcity;
    public MyBlockQueue(int capcity) {
        this.capcity = capcity;
    }
    //带超时的阻塞获取
    public T poll(long timeOut, TimeUnit unit){
        lock.lock();
        try {
            //将timeOUt转换成统一转换为ns
            long nanos = unit.toNanos(timeOut);
            while (deque.isEmpty()) {
                try {
                    //返回值=等待时间-经过的时间
                    if(nanos<=0){
                        return null;
                    }
                   nanos= emptyWaitSet.awaitNanos(nanos);
                }catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T t = deque.removeFirst();
            fullWaitSet.signalAll();
            return t;
        }finally {
            lock.unlock();
        }
    }
    //6. 阻塞获取
    public T take() {
        lock.lock();
        try {
            while (deque.isEmpty()) {
                try {
                    emptyWaitSet.await();
                }catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
         T t = deque.removeFirst();
            fullWaitSet.signalAll();
            return t;
        }finally {
            lock.unlock();
        }
    }
    //阻塞添加
    public void put(T element){
        lock.lock();
        try {
            while (deque.size()==capcity){
                try {
                    fullWaitSet.await();
                }catch (Exception e){
                }
             }
            deque.addLast(element);
            emptyWaitSet.signalAll();
        } finally {
            lock.unlock();
        }
    }
    public int size(){
        lock.lock();
        try {
            return deque.size();
        }finally {
            lock.unlock();
        }
    }
    } 
步骤2:自定义线程池
- 定义变量: 
  
- 任务队列 taskQueue
 - 队列的容量
 - 线程的集合
 - 核心线程数
 - 获取任务的超时时间
 - 时间单位
 
 - 方法 
  
- 构造方法 初始化一些核心的参数
 - 执行方法 execute(task) 里面处理任务 
    
- 每执行一个任务就放入一个worker中,并开启线程执行 同时放入workers集合中
 - 当任务数量>核心数量时,就加入到阻塞队列中
 
 
 - 自定义的类worker 
  
- 继承Thread 重写Run方法 
    
- 执行传递的任务,每次任务执行完毕,不回收,
 - 去队列中拿任务 当队列也空了之后 workers集合中移除线程,线程停止。
 
 
 - 继承Thread 重写Run方法 
    
 
package com.aqiuo.juc;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
    //任务队列
    private MyBlockQueue<Runnable> taskQueue;
    //队列容量
    int queueCapcity;
    //线程集合
    private HashSet<Worker> workers=new HashSet();
    //线程池的核心线程
    private int coreSize;
    //获取任务的超时时间
    private long timeOut;
    //时间单位
    private TimeUnit timeUnit;
    public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity) {
        this.coreSize = coreSize;
        this.timeOut = timeOut;
        this.timeUnit = timeUnit;
        taskQueue=new MyBlockQueue<>(queueCapcity);
    }
    public void exectue(Runnable task){
            //当任务数没有超过coreSize时,直接交给work对象执行
            //如果任务超过coreSize时,加入任务队列
      synchronized (workers){
          if(workers.size()<coreSize){
              Worker worker=new Worker(task);
              System.out.println("新增worker");
              workers.add(worker);
              worker.start();
              //任务数超过了核心数
          }else{
              System.out.println(task+"加入任务队列");
              taskQueue.put(task);
          }
      }
    }
    class Worker extends Thread{
        private Runnable task;
        public Worker(Runnable task){
            this.task=task;
        }
        @Override
        public void run() {
            //执行任务
            //1)当task不为空,执行任务
            //2)当task执行完毕,再接着从任务队列中获取任务
            while (task!=null||(task=taskQueue.take())!=null){
                try {
                    System.out.println("正在执行worker"+this);
                    sleep(10000);
                    task.run();
                } catch (Exception e) {
                }finally {
                    task=null;
                }
            }
            //执行完任务后销毁线程
            synchronized (workers){
                workers.remove(this);
            }
        }
    }
    
}
 
测试
开启15个线程测试
public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
        for (int i=0;i<15;i++){
            int j=i;
            threadPool.exectue(()->{
                System.out.println(j);
            });
        }
    } 


执行过程中,超过了队列容量之后,就会发生fullWaitSet阻塞。这个阻塞的线程就开始等待,当有队列不满之后,唤醒fullWaitSet阻塞的队列,
同理,当队列为空,emptyWaitSet小黑屋阻塞,当有任务被放入,EmptyWaitSet唤醒所有的线程。
这就有一个执行完毕之后,线程不会停止,他会一定等待拿去任务,线程阻塞了EmptyWaitSet
改进
获取任务的超时结束
获取任务take的增强 超时
  //带超时的阻塞获取
    public T poll(long timeOut, TimeUnit unit){
        lock.lock();
        try {
            //将timeOUt转换成统一转换为ns
            long nanos = unit.toNanos(timeOut);
            while (deque.isEmpty()) {
                try {
                    //返回值=等待时间-经过的时间
                    if(nanos<=0){
                        return null;
                    }
                   nanos= emptyWaitSet.awaitNanos(nanos);
                }catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            T t = deque.removeFirst();
            fullWaitSet.signalAll();
            return t;
        }finally {
            lock.unlock();
        }
    } 
修改worker的run函数
      public void run() {
            //执行任务
            //1)当task不为空,执行任务
            //2)当task执行完毕,再接着从任务队列中获取任务
//            while (task!=null||(task=taskQueue.take())!=null){
            //修改如下
            while (task!=null||(task=taskQueue.poll(timeOut,timeUnit))!=null){
                try {
                    System.out.println("正在执行worker"+this);
                    sleep(1000);
                    task.run();
                } catch (Exception e) {
                }finally {
                    task=null;
                }
            } 
正常结束了

放入任务的超时结束offer()
那么有装入任务 的增强 ,就再提供一个超时装入入offer()吧 ,当放入一个满的队列时,超时后返回false不再放入
//带有超时的队列添加
public Boolean offer(T element,long timeOut, TimeUnit unit){
    lock.lock();
    long nanos = unit.toNanos(timeOut);
    try {
        while (deque.size()==capcity){
            try {
                long l = fullWaitSet.awaitNanos(nanos);
                if(l<=0){
                    return false;
                }
            }catch (Exception e){
            }
        }
        deque.addLast(element);
        emptyWaitSet.signalAll();
        return true;
    } finally {
        lock.unlock();
    }
} 
拒绝策略
函数式接口
@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
    void reject(MyBlockQueue<T> queue, T task);
} 
代码改进
如下部分代码是存入任务的部分
public void exectue(Runnable task){
            //当任务数没有超过coreSize时,直接交给work对象执行
            //如果任务超过coreSize时,加入任务队列
      synchronized (workers){
          if(workers.size()<coreSize){
              Worker worker=new Worker(task);
              System.out.println("新增worker");
              workers.add(worker);
              worker.start();
              //任务数超过了核心数
          }else{
              //存入任务
              //taskQueue.put(task);
              //当队列满了之后 执行的策略
              //1) 死等
              //2)带有超时的等待
              //3)当调用者放弃任务执行
              //4)让调用者抛出异常
              //5)让调用者自己执行任务...
              //为了增加灵活性,这里不写死,交给调用者
              //重新写了一个放入任务的方法
              taskQueue.tryPut(rejectPolicy,task);
          }
      }
    } 
阻塞队列里的tryPut
public void tryPut(ThreadPool.RejectPolicy<T> rejectPolicy, T task) {
    lock.lock();
    try {
        //如果队列容量满了,就开始执行拒绝策略
        if(capcity>= deque.size()){
            rejectPolicy.reject(this,task);
        }else{
            //不满就正常加入到队列中
            System.out.println(task+"正常加入到队列");
            deque.addLast(task);
        }
    }finally {
        lock.unlock();
    }
} 
//1) 死等
//2)带有超时的等待
//3)当调用者放弃任务执行
//4)让调用者抛出异常
//5)让调用者自己执行任务...
谁调用方法,谁写拒绝策略
为了传入策略,就再构造函数里面加入一个方法的参数传入
//部分代码...
//拒绝策略
RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
    this.coreSize = coreSize;
    this.timeOut = timeOut;
    this.timeUnit = timeUnit;
    taskQueue=new MyBlockQueue<>(queueCapcity);
    this.rejectPolicy=rejectPolicy;
} 
主函数编写拒绝的策略,就lamda表达式会把...
public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{
            //死等
//            queue.put(task);
            //超时添加
//            System.out.println(queue.offer(task, 100, TimeUnit.NANOSECONDS));
            //放弃执行
//            System.out.print("我放弃");
            //调用者抛出异常
//            throw new RuntimeException("任务执行失败");
            //调用者执行
//            task.run();
        });
        for (int i=0;i<5;i++){
            int j=i;
            threadPool.exectue(()->{
                System.out.println(j);
            });
        }
    } 
五种拒绝策略的结果(我不会用slog4j)
1.死等的结果

2.超时拒绝的结果(每个false都是时间到了,每加进去)

3.不作为,调用者放弃任务

4.抛出异常,停止

5.调用者线程执行了


















