软件世界网 购物 网址 三丰软件 | 小说 美女秀 图库大全 游戏 笑话 | 下载 开发知识库 新闻 开发 图片素材
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
移动开发 架构设计 编程语言 Web前端 互联网
开发杂谈 系统运维 研发管理 数据库 云计算 Android开发资料
  软件世界网 -> 互联网 -> Java1.7ThreadPoolExecutor源码解析 -> 正文阅读

[互联网]Java1.7ThreadPoolExecutor源码解析


相比1.6,1.7有些变化:
1、        增加了一个TIDYING状态,这个状态是介于STOP和TERMINATED之间的,如果执行完terminated钩子函数后状态就变成TERMINATED了;
2、        内部类Worker继承了AQS类作为一个独享锁,在运行每个任务前会获取自己的锁;
3、        runState和poolSize两个字段被合并成一个原子字段ctl了,不再使用mainLock保护了。

一、成员变量介绍

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * ctl字段其实表示两个含义:runState和workerCount(近似1.6中的poolSize)
     * int类型,高3位表示runState,低29位表示workerCount。目前这个版本也就限
     * 制了线程个数不会超过2^29-1。
     * RUNNING: 能接受新的任务且能处理队列里的请求
     * SHUTDOWN: 不能接受新的任务但是能处理队列里的请求
     * STOP: 不能接受新的任务、不能处理队列里的请求,workers会被interrupt
     * TIDYING: 所有的线程都已经terminated了,正准备调用terminated()方法
     * TERMININATED: terminated()方法已经调用结束了
     * 
     * RUNNING->SHUTDOWN: 调用shutdown方法
     * (RUNNING/SHUTDOWN)>STOP: 调用shutdownNow方法
     * SHUTDOWN->TIDYING: 当workers和queue都空的时候
     * STOP->TIDYING: 当workers为空的时候
     * TIDYING->TERMINATED: 当terminated方法调用结束的时候。
     * awaitTermination()直到状态为TERMINATED时才会返回。
     * 

     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 取ctl的高三位,获取runState(运行状态)
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 取ctl的低29位,获取workerCount(worker的数量)
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 把runState和workerCount合并成ctl,上面两个函数的反操作
    private static int ctlOf(int rs, int wc) { return rs | wc; }

二、execute函数

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 三步走:
         * 1. 如果RUNNING的线程数目小于corePoolSize,直接调用addWorker方法
         * 启动一个新线程。addWorker函数会检查runState和workerCount,如果不
         * 需要新建一个thread就会返回false了
         * 
         * 2. 如果任务被成功的放入了workQueue,我们仍然需要做个double-check
         * 因为调用完isRunning(c)后池中的线程可能都退出了或者线程池被shut 
         * down了。重新检查状态看是要remove掉新来的任务还是创建一个新线程来执
         * 行(如果没有活动的线程了)
         * 
         * 3. 如果放入workQueue失败了,我们尝试创建一个新worker。如果失败了,
         * 说明线程池被关闭了或者饱和了(超过最大值了),就直接拒了。
         * 
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
// addWorker有可能会失败,失败后重新获取状态并继续往下走
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
// 如果isRunning(c)&&workQueue.offer中间并发发生了shutdown,需要remove
// 掉刚放入workQueue的command任务。注意:此时如果有一个worker刚执行完一个task
// 然后从workQueue获取下一个task时,这里的remove就会失败了。
            if (! isRunning(recheck) && remove(command))
                reject(command);
// 如果是RUNNING状态但是没有可工作的线程,需要直接new一个
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

execute函数大体思路和1.6一致,就三种情况:
①          当前线程池中线程数目小于corePoolSize,直接new一个thread;
②          当先线程池数据大于corePoolSize,则放入workQueue中;
③          如果workQueue满了且线程池中线程数小于maximumPoolSize,则new一个thread。
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

// 如果被shutdown了,一般就直接返回false。但是需要排除一个特例情况:当线程池状
// 态是shutdown,但workQueue不空且workers空了,会调用addWorker(null,false)
// 方法创建一个线程处理workQueue里的任务,这时不能直接返回false。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
// 如果当前workers数目大于CAPACITY或者大于用户设置了,直接返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
// 如果仅仅是workerCount变化了,那么继续内层的循环;如果连runState也变化了,
// 则要重新继续外层的循环。
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);
// 再次检查runState的状态,如果是RUNNING或者SHUTDOWN但是firstTask不空,则
// 把new出来的worker放入workers中。
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
// 创建worker成功后直接启动线程了
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
// 创建失败要做清理操作
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker函数尝试新建一个thread来运行传递给它的task。当线程池被STOP或SHUTDOWN或threadFactory返回null时或者OOM时,会返回false并做相应的清理。整个过程分为两步:1、尝试设置workerCount,成功了就到步骤2;2、尝试创建一个worker并加入到workers里。
private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
  addWorkerFailed函数做些清理操作:1、把创建的worker从workers中删除;2、把workerCount减1;3、检查是否可以terminated线程池,防止这个worker的存在导致执行awaitTermination操作的客户端线程阻塞了。
  final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
// 如果是以下三种情况直接返回:
// 1.RUNNING状态; 2.runState>=TIDYING,说明有其他线程执行了tryTerminate操
// 作; 3.SHUTDOWN状态且workQueue不空
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
// 如果workerCount大于0,则中断一个空闲的worker,就返回了。为啥只中断一个呢?
// 因为worker线程退出时也会调用tryTerminate方法(一个接一个的传播)
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
// 走到这里说明workers数量为0了,尝试把线程池状态改成TIDYING并调用terminated
// 函数->状态再设置成TERMINATED。如果设置TIDYING失败,则继续循环。
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
// terminated函数抛异常也需要执行下面的操作。
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

tryTerminate函数尝试TERMINATED线程池(当a、SHUTDOWN且queue和pool都空;b、STOP且queue为空了)。如果workers不为0,则中断任意一个空闲的worker后直接返回。否则:首先,将线程池状态改成TIDYING;其次,调用用户的钩子函数terminated;最后,将状态设置成TERMINATED。
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
// 如果tryLock成功,就说明这个worker是空闲的。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
// 如果只中断一个就break,只有tryTerminate函数中使用到这种情况。
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

interruptIdleWorkers函数根据onlyOne参数决定中断一个或所有空闲的workers(这些workers都阻塞在getTask方法中)。

三、shutdown函数

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
// 检查调用者是否有权限执行shutdown
            checkShutdownAccess();
// 将线程池的状态改成SHUTDOWN
            advanceRunState(SHUTDOWN);
// 中断所有空闲的workers
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
// 尝试终止线程池
        tryTerminate();
    }

shutdown函数就执行几步:把状态改成SHUTDOWN,中断所有空闲的workers,调用onShutdown钩子函数,最后调用tryTerminate尝试终止线程池。
private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

advanceRunState函数将线程池的状态改成指定状态值,如果现在状态值比target值大就直接返回。targeState的值是SHUTDOWN或者STOP,不能是TIDYING或者TERMINATED(这两种状态需要调用tryTerminate函数设置)。

四、shutdownNow函数


public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
// 检查调用者是否有权限执行关闭
            checkShutdownAccess();
// 将线程池的状态改成STOP
            advanceRunState(STOP);
// 和shutdown不同,这里中断所有的worker线程
            interruptWorkers();
// 删除workQueue里的任务并返回任务列表
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
// 尝试终止线程池
        tryTerminate();
        return tasks;
    }

shutdownNow函数会中断所有的worker线程,删除workQueue里的任务,最后尝试终止线程池并返回workQueue里的任务。
private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
// 中断所有的worker线程
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        List<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

五、Worker内部类

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
// 初始值为-1,防止worker还没启动就被interrupt了;在start开始时会将状态改成0
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
// 参数1没有意义,是独占锁
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

Worker类主要维护着中断的管理和其他操作(runWorker函数),继承了AQS类实现了一个不可重入的Lock,在获取到一个任务后,准备执行前首先要获取这个锁。同时,在中断空闲的worker时也要先获取到这个锁。
public void run() {
            runWorker(this);
        }
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
// 有时我们不想从workQueue取第一个任务,直接执行刚提交的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
// 把state设置成0,允许中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
// 进入循环了
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 如果是STOP状态,需要保证线程是被中断了的;
                // 如果不是需要清空中断状态,但是需要重新检查下状态防止在清除
                // 中断时发生了shutdownNow
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
// 执行前的钩子函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
// 执行后的钩子函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker函数循环从workQueue里获取task并执行,但是需要注意以下几个问题:1.如果不想从workQueue里获取第一个任务执行,那就给worker.firstTask赋值。2、如果getTask获取的值为null,或者你的task里抛异常了,那循环就退出了,然后worker线程也就退出了。3、在执行任务前先要获取worker的锁,这里防止中断正在执行的线程。4、如果你的钩子函数beforeExecute函数抛异常了,那么你的任务就不会被执行了,worker线程也会退出。5、如果task.run方法抛出Runtime或Error异常,会原样抛出,如果是Throwable,则会包装成一个Error抛出,抛出异常前会执行afterExecute钩子函数,最后线程会退出。6、如果afterExecute钩子函数抛出异常,那么worker线程也会退出。
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果SHUTDOWN且workQueue为空,或者STOP了,worker线程直接退出
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
// 是否要回收这个worker线程?
            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果还没有超时过(循环第一次执行到这里)直接break
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
// 否则,如果线程数大于最大限制或者已经超时过了说明这个worker线程要准备退出了
// 先设置workerCount-1,成功的话直接退出;否则,看下runState是否和rs一样,如
// 果一样就在内部循环,不一样就要到外部循环
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
// 无限阻塞或超时阻塞
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
// 没有获取到task肯定是超时了
                timedOut = true;
            } catch (InterruptedException retry) {
// 如果被中断了,不能算作超时
                timedOut = false;
            }
        }
    }

getTask函数是从workQueue里获取一个task,有两种策略(无限阻塞或者超时,具体要看客户端的配置)。如果这个函数返回了null,那么worker线程就会退出了。退出的原因不外乎以下几种:
1.   当前线程池中worker数量大于maximumPoolSize了;
2.   线程池被STOP了(workQueue.poll/take时会捕获到InterruptedException异常);
3.   线程池被SHUTDOWN了且workQueue为空(workQueue.poll/take时会捕获到InterruptedException异常);
4.   获取task超时了(timedOut)&&(timed)。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 用户的函数抛异常了,需要调整workerCount的值,因为worker线程准备退出了
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

// 做些统计操作(bookkeeping)
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
// 尝试终止线程池
        tryTerminate();

        int c = ctl.get();
// 如果是RUNNING或SHUTDOWN状态,要看下workQueue是否为空,
// 不能直接退出。如果workQueue不空,至少要保留1或corePoolSize个
// 线程(看allowCoreThreadTimeOut配置)。少于这个数目,就需要通过
// addWorker(null,false)方法补充新的线程进来。
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

processWorkerExit函数是在runWork循环退出后做的清理和bookkeeping(应该就是指completedTaskCount等变量的操作吧)操作。completedAbruptly参数的含义是指用户的函数是否抛异常了(before/after/run等)。注意下函数最后会根据线程池的状态和配置决定是否新建一个worker线程。
......显示全文...
    点击查看全文


上一篇文章      下一篇文章      查看所有文章
2016-04-02 20:59:29  
互联网 最新文章
C++11并发API总结
16.收款(AcceptingMoney)
数据链路层综述
IP协议及IP数据报解析
《浅谈HTTP协议》
计算机网络基础
LoadRunner和RPT之间关于手动关联和参数化的
HTTPS中的对称密钥加密,公开密钥加密,数字
上班需要打卡吗?(开通微信公众号--乘着风
ofbizjmsactivemq
360图书馆 软件开发资料 文字转语音 购物精选 软件下载 美食菜谱 新闻资讯 电影视频 小游戏 Chinese Culture 股票 租车
生肖星座 三丰软件 视频 开发 短信 中国文化 网文精选 搜图网 美图 阅读网 多播 租车 短信 看图 日历 万年历 2018年1日历
2018-1-17 9:15:34
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  软件世界网 --