[JDK源码]-J.U.C-ScheduledThreadPoolExecutor

发布时间:2022-06-27 发布网站:脚本宝典
脚本宝典收集整理的这篇文章主要介绍了[JDK源码]-J.U.C-ScheduledThreadPoolExecutor脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。

以下是在学习中整理的一些内容,如有错误点,多谢指出。

ScheduledExecutorService

可以用来在给定延时后执行异步任务或者周期性执行任务,由于放入的任务不一定能够立即执行,所以还是需要得放入队列,然后获取,看看是否满足执行条件F1a;时间是否满足。

ScheduledExecutorService接口

public interface ScheduledExecutorService extends ExecutorService {
	//在延迟delay时间后执行command。unIT为时间单位。只调度一次
    public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
	//执行callable。
    public <V> ScheduleDFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
	//基于 上一次开始时间 来延迟固定时间后执行下一次任务
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long PEriod,TimeUnit unit);
	//基于 上一次结束时间
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

ScheduledThreadPoolExecutor核心变量

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    //在关闭时应该取消周期性任务
    PRivate volatile boolean continueExistingPeriodicTasksAfterShutdown;
	//如果在关闭时应该取消非周期性的任务
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
	//是否应该从队列中删除
    private volatile boolean removeOnCancel = false;
	//顺序号, 保证FIFO
    private static final AtomicLong sequencer = new AtomicLong();

// ScheduledFutureTask类		用于封装Runnable Callable 对象
	private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
		//序号
        private final long sequenceNumber;
		//以纳秒为单位,表明该任务下一次能够被调度的时间
        private long time;
		//重复任务的周期
        private final long period;
		//被 reExecutePeriodic 方法重新加入队列中的实际任务,默认当前任务
        RunnableScheduledFuture<V> outerTask = this;
		//延迟队列的索引
        int heapIndex;
		//用于取消任务执行
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            //如果取消成功,则从任务队列里移除任务
            if (cancelled && removeOnCancel &amp;& heapIndex >= 0)
                remove(this);
            return cancelled;
        }
    	public void run() {
		    boolean periodic = isPeriodic();//是不是周期性执行任务
		    //判断是否能继续执行
		    if (!canRunIncurrentRunState(periodic))
		        cancel(false);
		    else if (!periodic)//不是周期性调度任务,直接调用run
		        ScheduledFutureTask.super.run(); //FutureTask的run方法
		    else if (ScheduledFutureTask.super.runAndReset()) {//是周期性调度任务runAndReset方法执行
		        setNextRunTime(); //设置下一次调度时间
		        reExecutePeriodic(outerTask);//通过这个进行调度
		    }
		}
     	private void setNextRunTime() {
		    long p = period;
		    if (p > 0)//p>0 任务开始执行的时间 +周期调度时间
		        time += p;
		    else
		        //如果period 小于0 ,以任务执行完毕后的的时间来计算下一次执行的时间
		        time = triggerTime(-p);
		}   
        //将任务重新放入任务队列中执行
		void reExecutePeriodic(RunnableScheduledFuture<?> task) {
		    //根据当前线程池状态,判断当前任务是否允许被执行
		    if (canRunInCurrentRunState(true)) {
		        super.getQueue().add(task);	//将任务添加到延迟队列中
		        if (!canRunInCurrentRunState(true) && remove(task))//再次判断是否应该执行
		            task.cancel(false); //取消任务执行
		        else
		            //正常情况下,保证线程池中至少有一个工作线程在处理任务
		            ensurePrestart();
		    }
		}
	} 
//延时队列的实现原理:DelayedWorkQueue :因为都是周期性任务 带有时间的 对其排序 使用的是小根堆
    static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
        //初始容量为 16
        private static final int INITIAL_CAPACITY = 16;
        //任务队列  数组
        private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        private final ReentrantLock lock = new ReentrantLock();
        private int size = 0;
		//线程Leader
        private Thread leader = null;
        //条件变量用于工作线程等待执行任务
        private final Condition available = lock.newCondition(); 
    }
}

scheduleAtFixedRate实现

表示周期性任务调度,每次任务基于上一次任务开始执行的时间来决定下次启动时间

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalargumentException();
    //triggerTime 用于计算该任务应被调度的时间,unit.toNanos(period) 用于执行周期变为纳秒
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}
//封装成了 ScheduledFutureTask 对象,通过 decorateTask, 调用 delayedExecute 方法执行
protected <V> RunnableScheduledFuture<V> decorateTask(
    Callable<V> callable, RunnableScheduledFuture<V> task) {
    return task;
}

delayedExecute方法

//主执行周期性调度或者延迟任务的方法。
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())//SHURDOWN 了?
        reject(task);
    else {
        super.getQueue().add(task); // 添加到队列里去
        if (isShutdown() && 
            !canRunInCurrentRunState(task.isPeriodic()) &&//当前线程是不是在shutdown 的状态下执行
            remove(task))//为false的情况下 从当前队列移除  然后取消当前任务
            task.cancel(false);
        else
            //确保至少还有一个线程在执行任务
            ensurePrestart();
    }
}

ensurePrestart方法

如果线程池未关闭那么 ensurePrestart方法

//保证的是至少启动一个核心线程
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());//获取工作线程数
    if (wc < corePoolSize)//如果小于 核心线程数
        addWorker(null, true);//添加 核心线程数
    else if (wc == 0)//至少保证还有一个工作线程
        addWorker(null, false);
}

add添加

如果线程未关闭,那么直接调用 DelayedWorkQueue 里的 add -> offer 方法,将任务task 放入队列中

public boolean add(Runnable e) {
    return offer(e);
}
    public boolean offer(Runnable x) {//向数组中添加任务
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {//要不要扩容
            int i = size;
            if (i >= queue.length)
                grow();
            size = i + 1;
            //之前没有任务,放在第一位就行
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
                //否则将任务放到最后一位,然后通过siftUp 方法调整
                siftUp(i, e);
            }
           //如果队列中的第一个任务是当前e则清除leader线程,然后唤醒一个等待队列可用的线程来执行任务 
            if (queue[0] == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

grow 扩容方法

private void grow() {
    int oldCapacity = queue.length;
    //每次扩容 50%
    int newCapacity = oldCapacity + (oldCapacity >> 1); 
    if (newCapacity < 0) //如果新容量小于0,那么表明溢出了
        newCapacity = Integer.MAX_VALUE;
    //将 old数组任务 复制到相信数组上
    queue = Arrays.copyOf(queue, newCapacity);
}

siftUp调整方法

//调整堆
private void siftUp(int k, RunnableScheduledFuture<?> key) {
    //当K>0时,不断进行调整。k等于0表明调整到了根节点,也就是第一个元素,这时必须退出循环 
    while (k > 0) {
        // 找到他的 父亲节点   
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        //与父亲节点比较 看看需不需要动,直到找到 位置
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

canRunInCurrentRunState

如果在添加到优先级队列后,线程池已经关闭,需要通过 来判断是否应该继续执行该任务

/*通过是否是周期任务判断 continueExistingPeriodicTasksAfterShutdown(线程池shutdown后是否执行周期性任务)
executeExistingDelayedTasksAfterShutdown(线程池shutdown后是否执行延迟任务) 是否应该继续执行任务  */
boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?
                               continueExistingPeriodicTasksAfterShutdown :
                               executeExistingDelayedTasksAfterShutdown);
}

take 方法

//从队列中获取任务,如果当前队列没有任务可取,则阻塞直到队列有任务,即等待offer方法唤醒
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//可响应中断的方式加锁
    try {
        for (;;) {
            RunnableScheduledFuture<?> First = queue[0];
            //如果队列第一个任务为null,则证明没有任务了,当前线程等待
            if (first == null)
                available.await();
            else {//否则获取第一个任务的剩余等待时间,判断是否小于0.需不需要执行
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // 在线程等待任务可执行时不保留引用
	  //由于任务还需要等待一段时间才能执行,这时看看前面有没有线程正在等待,如果有,则当前线程继续等待
                if (leader != null)//根本没有必要让拿到第一个任务的线程等待
                    available.await(); 
                else {/*如果前面没有线程等待,则把自己设置为leader线程,然后开始等待delay时间
                	这时如果再来别的线程获取任务,就只能让这个成为leader的线程延迟被唤醒*/
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        //将 leader 变量去掉
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        /*当任务被线程获取后,判断leader是否为空且队列不为空,由于没有线程去等待或者获取队列中的下一个任务,因此需要唤醒一个线程担任leader等待或者获取下一个任务*/
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
//完成最终的任务出队,这里传入的f 为第一个等待任务。由于任务被出队,因此需要调整堆 
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    //当前任务队列内任务数量 --,然后取出队列尾部的一个任务,调用siftDown重新调整堆的顺序
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

siftDown : 调整堆

//调整堆。将任务向堆尾移动。
private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;	//获取左孩子节点索引
        RunnableScheduledFuture<?> c = queue[child];
        int right = child + 1;		//右孩子
        if (right < size && c.compareTo(queue[right]) > 0)//比较左右孩子的大小
            c = queue[child = right];
        if (key.compareTo(c) <= 0)// 以左右孩子的min 来和 key比较
            break;
        queue[k] = c;		//如果key 小于 他们的min 那么交换
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;	//此时k 即为传入key 应该存放下标
    setIndex(key, k);
}

scheduleWithFixedDelay实现

scheduleAtFixedRate 和 scheduleWithFixedDelay,前者是基于任务开始时间计算的 ,后者 是 基于上一个任务执行完成的时间计算的

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalargumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

shutdown

ThreadPoolExecutor 的shutdown ->onShutdown

@override void onShutdown() {
    //首先获取任务队列 q
    BlockingQueue<Runnable> q = super.getQueue();
    //默认 true
    boolean keepDelayed = 
        getExecuteExistingDelayedTasksAfterShutdownpolicy();//线程池关闭后是否应该继续执行延迟任务标志
        //默认false
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();//线程池关闭后是否应该继续执行周期性任务标志
    //判断是否在线程池shutdown后继续执行延迟任务,是否继续执行周期性调度任务
    if (!keepDelayed && !keepPeriodic) {//都不是,将任务队列清空,同时取消任务执行
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
    }
    else {
        //否则遍历任务队列,分别处理周期任务和延迟任务
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // 这里的 t.isCancelled() 表示任务已经被取消,应移除
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate();//调用该方法尝试进一步转换线程池状态
}

脚本宝典总结

以上是脚本宝典为你收集整理的[JDK源码]-J.U.C-ScheduledThreadPoolExecutor全部内容,希望文章能够帮你解决[JDK源码]-J.U.C-ScheduledThreadPoolExecutor所遇到的问题。

如果觉得脚本宝典网站内容还不错,欢迎将脚本宝典推荐好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。