Java多线程介绍与线程池底层实现原理


Java线程跟操作系统的关系

CPU一般有4个安全等级ring0,ring1,ring2,ring3,操作系统的内部程序指令一般运行在ring0级别,而我们的应用程序会运行在ring3上面,比如JVM进程。为什么说JVM线程的创建是一个比较重的操作。下面是一个线程创建的过程:

  1. ring3级别切换到ring0去创建线程
  2. 从ring0切回ring3,然后线程去执行程序
  3. 执行完毕就会销毁线程,这时候又会切换到ring0去销毁线程

正因为线程创建销毁很重,所以才有了线程池,让一个线程多干点活,不让它活一干完就死了。这一点后面再说。

操作系统在内存上面分为用户空间,和内核空间

  • 用户空间放的是我们应用程序执行的代码
  • 内核空间放的是内核代码

因为操作系统这样的划分,所以我们有2种线程模型,KLT(内核线程模型),ULT(用户线程模型)

内核线程模型是由内核去创建线程,用户线程模型是由用户去创建的线程。JVM就是一种内核线程模型所以便会有大量的用户态内核态之间的切换,用户线程模型就不会有这样的问题。

线程上下文切换

假设现在有2个线程thread1,thread2

  1. CPU会给这2个线程分配一个时间周期,这里假设为50ns
  2. thread1执行了50ns就要切换回thread2执行,此时thread1的执行状态保存到TSS(程序任务状态段)上面去,方便下次找到我执行的时刻。

正因为线程的时间片轮转机制所以给人的感知就是多个线程在同时执行,也就是并发。

那么现在有个问题就是多线程运行程序一定比单线程快吗?

其实不一定,多线程运行程序必然会存在着大量的线程上下文切换还有线程的创建和销毁,这些操作也是会耗时间的,而单线程不会有这样过的问题,所以是有可能多线程没有单线程要快的。

如何减少线程的上下文切换

  • 无锁并发编程:线程竞争锁的时候是会引起上下文切换的,用多线程处理数据可以分段处理,将数据拆分处理比如thread1处理0到100,thread2处理101到200,减少线程间资源的竞争
  • CAS算法:用CAS来对数据进行更新不需要加锁,Doug lea的AQS框架大量的运用到了这种算法。
  • 尽量不要创建不必要的线程,用多少创建多少。

线程池

为什么要线程池

线程随用即用不好吗,为什么还要引入一个线程池的概念。首先线程资源很珍贵,不能让你想用我就new Thread()就行了,这样不仅浪费了了CPU的资源还可能拖慢运行程序。打个比方,我执行100个任务,每个任务执行时间是1ns,而我线程创建和销毁时间大概是5ns,那么我对线程的操作比执行任务的时间还长,这样降低了整个系统的运行效率。那么如何让我们创建的线程放在那不动(不会销毁),来一个活,我就接一个。这样不是会减少大量的线程创建和销毁吗。这种运用线程的方法就是线程池。简单来说就是对线程的重用。

线程池的参数配置

关于线程池的使用不介绍了,主要谈谈线程池的参数配置。先看下创建一个线程池对象的静态方法。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

在一个简单的例子中介绍这7个参数

首先来一个任务,我们会判断已经创建的线程数是否小于等于 corePoolSize(核心线程数),如果小于就会创建一个线程(线程会带着任务),如果大于就会往workQueue(阻塞队列)里面去放,如果阻塞队列里面也放满了,如果此时线程数(核心线程数+非核心线程数)小于maximumPoolSize(线程总数)就会去创建一个非核心线程,大于就会走handler(拒绝策略)

注意:核心线程只有创建的时候是直接拿到的任务,后续的任务全部都是去阻塞队列里面去拿的。

拒绝策略分为4种当然你也可以实现RejectedExecutionHandler接口去重写rejectedExecution拒绝策略

默认4种,比较重要的就是CallerRunsPolicy,DiscardOldestPolicy:

  • AbortPolicy: 抛异常
  • CallerRunsPolicy:主线程自己去执行任务
  • DiscardOldestPolicy:从阻塞队列里面踢掉最先进的那个元素,然后自己入队
  • DiscardPolicy:不作任何处理

线程池状态

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING:线程池在RUNNING状态,可以接受新任务,和处理所有任务,线程刚创建就是RUNNING状态
  • SHUTDOWN:不接受新任务,但处理已经有的任务,shutdown方法将RUNNING->SHUTDOWN
  • STOP:不接受新任务,不处理已有任务,并且中断正在执行的任务,shutdownNow方法切换到STOP状态
  • TIDYING:所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态,而且还会执行terminated方法钩子函数,执行逻辑可以自己实现。阻塞队列为空并且线程池中执行的任务也为空时会转变为TIDYING
  • TERMINATED:线程池彻底终止。线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

excute方法源码分析

这里有个重要参数ctl,ctl存储的是线程状态位和活跃线程数,他是一个ArtomicInteger,利用了int类型32位的特性,就32位中的前3位用来存储线程的状态后28位存储活跃线程数。


public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl这里存储的是线程池状态位和活跃线程数
    int c = ctl.get();
    // 从ctl里面取得活跃线程数看活跃是否小于核心线程池数量
    if (workerCountOf(c) < corePoolSize) {
        // 小于核心线程池数量就创建一个核心线程,注意这里传的值是true,标识了核心线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 活跃状态就往阻塞队列里面放
    if (isRunning(c) && workQueue.offer(command)) {
        // 这里为什么又要拿一次状态,主要就是防止入队前后线程池状态的变化
        int recheck = ctl.get();
        // 再次检查线程池状态是否运行状态,不是就移除刚刚放进来的任务,并且走拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 这里为什么要判有效线程数为什么是0,主要是创建线程池时核心线程数量可以为0
        // 如果核心线程数是0,那么执行到这里队列中有1个任务,但没有一个可以干活的线程,这时候要创建
        // 一个线程去干活了,为什么是false,因为你设置的核心线程数为0这里只能创建一个非核心线程
        // 为什么是null,是因为这时候任务在队列里面了,我不能创建线程的时候在带着任务,只能让线程去
        // 队列里面去取了。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 到这一步说明了干活线程大于核心线程池数,而且阻塞队列也满了,此时会创建一个非核心线程
    else if (!addWorker(command, false))
        // 创建非核心线程失败了,就直接走拒绝策略
        reject(command);
}

addWorker方法源码分析

addWorder方法主要工作是创建一个新的线程

/**
 * firstTask:是线程创建时的第一个任务
 * core: true就是与corePoolSize比较,false就是与maximumPoolSize比较
 **/
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        // 拿运行状态
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 运行状态大于shutdown状态就不接活了,
        // 如果状态是shutdown,虽然不接活,但也要处理之前遗留的队列里面的任务,
        // 但前提你不能给我任务firstTask
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // wc>=CAPACITY:工作不能大于等于所设的最大值就是int位的低29位
            // private static final int COUNT_BITS = Integer.SIZE - 3;
            // private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

            //  wc >= (core ? corePoolSize : maximumPoolSize)就是按core传值判断是否不和规范
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS增加workcount,成功则跳出第一个循环,失败继续自旋
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 重新拿一下状态看状态是否修改,修改了就去第一个循环重新判断
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建worker对象
        w = new Worker(firstTask);
        // worker对象里面都有一个线程
        final Thread t = w.thread;
        if (t != null) {

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 这里主要是判断的是线程可不可以运行
                // 2个条件满足其一
                // 1: 运行状态
                // 2: shutdown状态,但新任务为空,可以执行wordQueue里面的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一个HashSet
                    workers.add(w);
                    // 记录出现过的线程最大数量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动 worker里面的线程,这时候应该去看Worker对象里面的run方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker对象的run方法

Worker是一个ThreadPoolExecutor内部类,此时才是真正干活的线程

// 他这里继承了AQS和实现了Runable
// 实现Runable是为了让自己成为一个线程
// 继承AQS是为了实现独占锁,在addWorker里面会用到,为什么不用ReentrantLock呢?
// TODO
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            // 初始化把state设值为-1,是为了使还没执行任务的线程不会被中断,执行任务时会把它设为0
            setState(-1); // inhibit interrupts until runWorker
            // 这里吧firstTask保存,并带到线程去
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            // 调用外部类runWorker方法
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            // 尝试CAS更新state状态值为1失败,返回false,从这一点可以看出Worker是想做一把不可重入的锁
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 初始化的时候state是-1,这里调用unlock是吧它置为0,可以中断
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果task为空,则通过getTask来获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务,如果任务发生了异常,会往外抛执行processWorkerExit方法,
                    // 从这个方法可以看出线程池不会因为某个任务发生异常就不执行了,发生异常线程池会创建一个没有任务的线程
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

getTask方法

private Runnable getTask() {
    // timeOut:上次从队列拿任务时是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池不在runing看线程池是否在stop或者阻塞队列是否为空
        // 如果是true。那线程池是不能执行新任务
        // 就应该吧之前放进来的任务去掉,也就是要把wordcount-1
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 判断是否需要进行超时控制
        // allowCoreThreadTimeOut默认false,这一点可以看得出来核心线程数不会进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 这里做的事情主要是如果有超时控制且确实超时了,而且阻塞队列是空的
        // 也就说明我们工作线程太多了,这时候就应该要减少工作线程,减一失败则重试
        // 但这里有timed控制也说明了它不会减核心线程
        // 线程如何销毁呢?
        // 这里CAS成功则返回null,返回null则表示runWorker 的while循环要跳出去了,也就是线程结束了,JVM回收即可。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果有超时控制就用阻塞队列的poll方法可以控制时间,
            // workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)表示在keepAliveTime时间下如果取不出来值会阻塞,超过时间就返回null
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
            timedOut = false;
        }
    }
}

processWorkerExit方法

processWorkerExit方法在于runWorker的finally代码块里面的,执行任务发生异常或正常结束任务会走的方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // completedAbruptly这个参数在runWorker里面传入的,
    // true说明发生了异常,我们要将workcount减一;所以走了decrementWorkerCount方法
    // false说明正常结束任务,怎么结束?getTask方法为null.说明我们已经将wordcount已经减一了。
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 统计完成的任务数
        completedTaskCount += w.completedTasks;
        // 从workers中移除,说明从线程池移除了一个工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 根据线程池状态判断是否要终止线程池
    tryTerminate();

    int c = ctl.get();
    // 如果当前线程池状态是RUNNING,或SHUTDOWN状态
    if (runStateLessThan(c, STOP)) {
        // 如果不是异常终止
        // allowCoreThreadTimeOut = true 而且阻塞队列为空时,一个线程不留
        // allowCoreThreadTimeOut = true 而且阻塞队列不为空时,就保留最少一个线程
        // allowCoreThreadTimeOut = false 就保留最少线程小于核心线程数
        // min值有0,1,corePoolSize三种
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 如果是异常终止的会 addWorker(null, false)加一个null任务的线程
        addWorker(null, false);
    }
}

延迟类线程池

线程池家族里面有一类特殊的线程池,叫做延迟类线程池ScheduledThreadPoolExecutor,先看一下它的继承关系

很明显它继承了ThreadPoolExcutor,所以他也是一个线程池,只是在线程池上对他进行了增强,在看一下它的构造方法

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

从中可以看出它不需要很多的参数,在线程池中的大部分参数它不要设置,比如最大线程数和超时时间和超时时间单位,在延迟类线程池中只需要传入一个核心线程数就可以创建一个延迟线程池了。

执行过程

它和线程池的处理不一样,来了一个任务它不会直接创建工作线程,而是直接丢到队列里面去。线程去队列里面去拿任务执行。

他这里的队列有点特别,它是DelayQueue,它会根据时间排序。因为他要保证延迟线程池里面不同的线程所延迟时间不同,他需要根据延迟时间吧他们排好序,先执行的应该在最前面。

下面先介绍下ThreadPoolExcutor的三种提交任务的方式.

  1. schedule方法,延迟执行,scheduler第二个参数和第三个参数规定了延迟时间。

    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
    scheduledThreadPoolExecutor.schedule(() -> System.out.println("1"), 5, TimeUnit.SECONDS);
    
  2. scheduleWithFixedDelay方法,延迟执行,周期执行,等待之前的任务执行完成才开始计算周期时间。第一个参数:任务,第二个参数:初始化延时时间,第三个参数是周期时间,第四个参数时间单位

    下面这段代码执行会是执行完第一个任务睡5秒,5秒睡完才开始计算时间计算到周期时间才会执行第二个任务

    scheduledThreadPoolExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            System.out.println("这是scheduleWithFixedDelay调度1");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    },2,5, TimeUnit.SECONDS);
    
    scheduledThreadPoolExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            System.out.println("这是scheduleWithFixedDelay调度2");
        }
    },2,5,TimeUnit.SECONDS);
    
  3. scheduleAtFixedRate方法,延迟执行,周期执行,不等待前一个任务执行完,就开始计算周期时间;第一个参数:任务,第二个参数:初始化延时时间,第三个参数是周期时间,第四个参数时间单位

    下面这段代码就是先执行第一个任务,任务已执行不管你执行完没完我就计算周期时间

    scheduledThreadPoolExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            System.out.println("这是scheduleAtFixedRate调度1");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    },2,5, TimeUnit.SECONDS);
    
    scheduledThreadPoolExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            System.out.println("这是scheduleAtFixedRate调度2");
        }
    },2,5,TimeUnit.SECONDS);
    

下面看下源码是怎么实现的这几种提交方式

对比三种提交方式

// 提交方式
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    // 这里decorateTask点进去一看就发现是一个空方法,不管它只是一个封装任务而已
    // triggerTime算出初始化执行时间now()+initDeley < 0 ? 0 : initDelay
    RunnableScheduledFuture<V> t = decorateTask(callable,
                                                new ScheduledFutureTask<V>(callable,
                                                                           triggerTime(delay, unit)));
    // 这里是提交任务的方法
    delayedExecute(t);
    return t;
}

// scheduleAtFixedRate提交方法
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    // 封装任务
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 这里是提交任务的方法
    delayedExecute(t);
    return t;
}

// scheduleWithFixedDelay提交方法
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    // 封装任务
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 这里是提交任务的方法
    delayedExecute(t);
    return t;
}

我们对比一下scheduleAtFixedRate和scheduleWithFixedDelay这2个方法基本没什么区别。仔细对比发现他们之间除了参数名字不一致以外,还有一个很关键的区别在封装ScheduledFutureTask任务时,scheduleAtFixedRate传的是unit.toNanos(period),而scheduleWithFixedDelay传的是unit.toNanos(-delay),一个是正数一个是负数,其余就没区别了。这里的具体原因在后面修改周期时间那里会有介绍。

delayedExecute方法

delayedExecute是提交任务的方法,任务封装成ScheduledFutureTask,就要把这个任务放进任务队列里面去了。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 判断是否是shutdown状态,是的话不在处理新任务走拒绝策略
    if (isShutdown())
        reject(task);
    else {
        // 是运行状态将任务放进队列里面去
        super.getQueue().add(task);
        
        // task.isPeriodic()从构造方法可以看出周期任务period != 0,延迟任务是=0
        // isPeriodic方法只是通过period != 0 是true否则是false
        // canRunInCurrentRunState 获取了当前是周期任务还是延迟任务。
        // 分析出continueExistingPeriodicTasksAfterShutdown是周期任务
        // 分析出executeExistingDelayedTasksAfterShutdown是延迟任务
        // 判断线程池的状态,和当前任务是否能运行。如果不能继续执行,将任务移出队列并取消任务。
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 这里就是增加线程去处理任务,从这也可以看出它和线程池的区别,
            // 刚创建任务的时候不会创建把线程和任务绑定
            ensurePrestart();
    }
}

void ensurePrestart() {
    // 获取当前活跃线程数
    int wc = workerCountOf(ctl.get());
    // 如果活跃线程数小于核心线程数就创建线程
    if (wc < corePoolSize)
        addWorker(null, true);
    // 获取当前活跃线程数是0而且也不小于核心线程数,说明你此时设定的核心线程数是0,小于0会报错
    // 主要就是为了兼容线程池可以创建核心线程数为0
    // 这里是为了避免有任务但没线程执行的尴尬
    else if (wc == 0)
        addWorker(null, false);
}

SchduledFutureTask的run方法

真正执行任务的方法

 public void run() {
     // task.isPeriodic()从构造方法可以看出周期任务period != 0,延迟任务是=0
     // isPeriodic方法只是通过period != 0 是true否则是false
     // true代表是周期任务 false代表是延迟任务
     boolean periodic = isPeriodic();
     // 判断是否可以运行,不可以cancel掉任务
     if (!canRunInCurrentRunState(periodic))
         cancel(false);
     // 延迟任务
     else if (!periodic)
         // 直接调用一次
         ScheduledFutureTask.super.run();
     // 此时它一定是周期任务runAndReset运行不返回结果
     else if (ScheduledFutureTask.super.runAndReset()) {
         // 设值下一次执行时间
         setNextRunTime();
         // 重新入队
         reExecutePeriodic(outerTask);
     }
 }

private void setNextRunTime() {
    long p = period;
    // 此时就是前面2种周期任务提交方式的区别所导致的,还记得前面如果是
    // scheduleAtFixedRate提交方式period是正数;scheduleWithFixedDelay提交方式period是负数
    if (p > 0)
        // 执行线程时间加周期时间
        time += p;
    else
        // 现在时间加周期时间
        time = triggerTime(-p);
}

DelayedWorkQueue

DelayedWorkQueue是优先级队列,ScheduledThreadPoolExecutor它所使用的队列就是DelayedWorkQueue。之所以使用DelayedWorkQueue,是因为定时任务执行,总要取出最近要执行的任务,所以一定要取队列中最靠前的任务。

他是一个堆结构,底层是数组;它不能保证所有的元素一定是顺序的,但能保证你从堆顶拿到的元素一定是所有里面最大或最小的。


文章作者: dm
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 dm !
评论
 上一篇
Unsafe魔法类解析及应用 Unsafe魔法类解析及应用
Unsafe类提供了一些极度不安全的方法,这些方法会直接访问系统内存和对系统内存进行操作。,由于它是直接对内存进行的操作,所以从他的命名也可以看出它是不安全的。Unsafe类的使用必须慎重;juc包中大量运用了Unsafe类,对Unsafe
2021-10-05
本篇 
Java多线程介绍与线程池底层实现原理 Java多线程介绍与线程池底层实现原理
Java线程跟操作系统的关系CPU一般有4个安全等级ring0,ring1,ring2,ring3,操作系统的内部程序指令一般运行在ring0级别,而我们的应用程序会运行在ring3上面,比如JVM进程。为什么说JVM线程的创建是一个比较重
2021-08-27
  目录