脚本宝典收集整理的这篇文章主要介绍了[JDK源码]-J.U.C-ScheduledThreadPoolExecutor,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
以下是在学习中整理的一些内容,如有错误点,多谢指出。
可以用来在给定延时后执行异步任务或者周期性执行任务,由于放入的任务不一定能够立即执行,所以还是需要得放入队列,然后获取,看看是否满足执行条件F1a;时间是否满足。
public interface ScheduledExecutorService extends ExecutorService {
//在延迟delay时间后执行command。unIT为时间单位。只调度一次
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//执行callable。
public <V> ScheduleDFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//基于 上一次开始时间 来延迟固定时间后执行下一次任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long PEriod,TimeUnit unit);
//基于 上一次结束时间
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
//在关闭时应该取消周期性任务
PRivate volatile boolean continueExistingPeriodicTasksAfterShutdown;
//如果在关闭时应该取消非周期性的任务
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
//是否应该从队列中删除
private volatile boolean removeOnCancel = false;
//顺序号, 保证FIFO
private static final AtomicLong sequencer = new AtomicLong();
// ScheduledFutureTask类 用于封装Runnable Callable 对象
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
//序号
private final long sequenceNumber;
//以纳秒为单位,表明该任务下一次能够被调度的时间
private long time;
//重复任务的周期
private final long period;
//被 reExecutePeriodic 方法重新加入队列中的实际任务,默认当前任务
RunnableScheduledFuture<V> outerTask = this;
//延迟队列的索引
int heapIndex;
//用于取消任务执行
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
//如果取消成功,则从任务队列里移除任务
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
public void run() {
boolean periodic = isPeriodic();//是不是周期性执行任务
//判断是否能继续执行
if (!canRunIncurrentRunState(periodic))
cancel(false);
else if (!periodic)//不是周期性调度任务,直接调用run
ScheduledFutureTask.super.run(); //FutureTask的run方法
else if (ScheduledFutureTask.super.runAndReset()) {//是周期性调度任务runAndReset方法执行
setNextRunTime(); //设置下一次调度时间
reExecutePeriodic(outerTask);//通过这个进行调度
}
}
private void setNextRunTime() {
long p = period;
if (p > 0)//p>0 任务开始执行的时间 +周期调度时间
time += p;
else
//如果period 小于0 ,以任务执行完毕后的的时间来计算下一次执行的时间
time = triggerTime(-p);
}
//将任务重新放入任务队列中执行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//根据当前线程池状态,判断当前任务是否允许被执行
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task); //将任务添加到延迟队列中
if (!canRunInCurrentRunState(true) && remove(task))//再次判断是否应该执行
task.cancel(false); //取消任务执行
else
//正常情况下,保证线程池中至少有一个工作线程在处理任务
ensurePrestart();
}
}
}
//延时队列的实现原理:DelayedWorkQueue :因为都是周期性任务 带有时间的 对其排序 使用的是小根堆
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
//初始容量为 16
private static final int INITIAL_CAPACITY = 16;
//任务队列 数组
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
//线程Leader
private Thread leader = null;
//条件变量用于工作线程等待执行任务
private final Condition available = lock.newCondition();
}
}
表示周期性任务调度,每次任务基于上一次任务开始执行的时间来决定下次启动时间
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalargumentException();
//triggerTime 用于计算该任务应被调度的时间,unit.toNanos(period) 用于执行周期变为纳秒
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
//封装成了 ScheduledFutureTask 对象,通过 decorateTask, 调用 delayedExecute 方法执行
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
//主执行周期性调度或者延迟任务的方法。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())//SHURDOWN 了?
reject(task);
else {
super.getQueue().add(task); // 添加到队列里去
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&//当前线程是不是在shutdown 的状态下执行
remove(task))//为false的情况下 从当前队列移除 然后取消当前任务
task.cancel(false);
else
//确保至少还有一个线程在执行任务
ensurePrestart();
}
}
如果线程池未关闭那么 ensurePrestart方法
//保证的是至少启动一个核心线程
void ensurePrestart() {
int wc = workerCountOf(ctl.get());//获取工作线程数
if (wc < corePoolSize)//如果小于 核心线程数
addWorker(null, true);//添加 核心线程数
else if (wc == 0)//至少保证还有一个工作线程
addWorker(null, false);
}
如果线程未关闭,那么直接调用 DelayedWorkQueue 里的 add -> offer 方法,将任务task 放入队列中
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {//向数组中添加任务
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {//要不要扩容
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
//之前没有任务,放在第一位就行
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
//否则将任务放到最后一位,然后通过siftUp 方法调整
siftUp(i, e);
}
//如果队列中的第一个任务是当前e则清除leader线程,然后唤醒一个等待队列可用的线程来执行任务
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
private void grow() {
int oldCapacity = queue.length;
//每次扩容 50%
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity < 0) //如果新容量小于0,那么表明溢出了
newCapacity = Integer.MAX_VALUE;
//将 old数组任务 复制到相信数组上
queue = Arrays.copyOf(queue, newCapacity);
}
//调整堆
private void siftUp(int k, RunnableScheduledFuture<?> key) {
//当K>0时,不断进行调整。k等于0表明调整到了根节点,也就是第一个元素,这时必须退出循环
while (k > 0) {
// 找到他的 父亲节点
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
//与父亲节点比较 看看需不需要动,直到找到 位置
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
如果在添加到优先级队列后,线程池已经关闭,需要通过 来判断是否应该继续执行该任务
/*通过是否是周期任务判断 continueExistingPeriodicTasksAfterShutdown(线程池shutdown后是否执行周期性任务)
executeExistingDelayedTasksAfterShutdown(线程池shutdown后是否执行延迟任务) 是否应该继续执行任务 */
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
//从队列中获取任务,如果当前队列没有任务可取,则阻塞直到队列有任务,即等待offer方法唤醒
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//可响应中断的方式加锁
try {
for (;;) {
RunnableScheduledFuture<?> First = queue[0];
//如果队列第一个任务为null,则证明没有任务了,当前线程等待
if (first == null)
available.await();
else {//否则获取第一个任务的剩余等待时间,判断是否小于0.需不需要执行
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // 在线程等待任务可执行时不保留引用
//由于任务还需要等待一段时间才能执行,这时看看前面有没有线程正在等待,如果有,则当前线程继续等待
if (leader != null)//根本没有必要让拿到第一个任务的线程等待
available.await();
else {/*如果前面没有线程等待,则把自己设置为leader线程,然后开始等待delay时间
这时如果再来别的线程获取任务,就只能让这个成为leader的线程延迟被唤醒*/
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
//将 leader 变量去掉
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
/*当任务被线程获取后,判断leader是否为空且队列不为空,由于没有线程去等待或者获取队列中的下一个任务,因此需要唤醒一个线程担任leader等待或者获取下一个任务*/
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
//完成最终的任务出队,这里传入的f 为第一个等待任务。由于任务被出队,因此需要调整堆
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
//当前任务队列内任务数量 --,然后取出队列尾部的一个任务,调用siftDown重新调整堆的顺序
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
//调整堆。将任务向堆尾移动。
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1; //获取左孩子节点索引
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1; //右孩子
if (right < size && c.compareTo(queue[right]) > 0)//比较左右孩子的大小
c = queue[child = right];
if (key.compareTo(c) <= 0)// 以左右孩子的min 来和 key比较
break;
queue[k] = c; //如果key 小于 他们的min 那么交换
setIndex(c, k);
k = child;
}
queue[k] = key; //此时k 即为传入key 应该存放的下标
setIndex(key, k);
}
scheduleAtFixedRate 和 scheduleWithFixedDelay,前者是基于任务开始时间计算的 ,后者 是 基于上一个任务执行完成的时间计算的
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalargumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
ThreadPoolExecutor 的shutdown ->onShutdown
@override void onShutdown() {
//首先获取任务队列 q
BlockingQueue<Runnable> q = super.getQueue();
//默认 true
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownpolicy();//线程池关闭后是否应该继续执行延迟任务标志
//默认false
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();//线程池关闭后是否应该继续执行周期性任务标志
//判断是否在线程池shutdown后继续执行延迟任务,是否继续执行周期性调度任务
if (!keepDelayed && !keepPeriodic) {//都不是,将任务队列清空,同时取消任务执行
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
//否则遍历任务队列,分别处理周期任务和延迟任务
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // 这里的 t.isCancelled() 表示任务已经被取消,应移除
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();//调用该方法尝试进一步转换线程池状态
}
以上是脚本宝典为你收集整理的[JDK源码]-J.U.C-ScheduledThreadPoolExecutor全部内容,希望文章能够帮你解决[JDK源码]-J.U.C-ScheduledThreadPoolExecutor所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。