原创

【并发编程系列12】从Java线程池的常用4种写法深入分析线程池(Thread Pool)的实现原理

写在前面的话

并发编程里面,线程池这个一直就想写一篇文章来总结下,但是直到并发编程系列的第12篇才写的原因是线程池里面用到了AQS同步队列和阻塞队列等一些知识,所以为了铺垫,就先把前面的知识点写完了,到现在,终于可以总结一下线程池的实现原理了。

什么是线程池

在Java中,创建一个线程可以通过继承Thread或者实现Runnable接口来实现,但是,如果每个请求都创建一个新线程,那么创建和销毁线程花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多。

为了解决这个问题,就有了线程池的概念,线程池的核心逻辑是提前创建好若干个线程放在一个容器中。如果有任务需要处理,则将任务直接分配给线程池中的线程来执行就行,任务处理完以后这个线程不会被销毁,而是等待后续分配任务。同时通过线程池来重复管理线程还可以避免创建大量线程增加开销。

创建线程池

为了方便使用,Java中的Executors类里面提供了几个线程池的工厂方法,可以直接利用提供的方法创建不同类型的线程池:

  • newFixedThreadPool:创建一个固定线程数的线程池
  • newSingleThreadExecutor:创建只有1个线程的线程池
  • newCachedThreadPool:返回一个可根据实际情况调整线程个数的线程池,不限制最大线程 数量,若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒 后自动回收。
  • newScheduledThreadPool: 创建一个可以指定线程的数量的线程池,但是这个线程池还带有 延迟和周期性执行任务的功能,类似定时器。

FixedThreadPool

创建一个固定数量N个线程在一个共享的无边界队列上操作的线程池。在任何时候,最多N个线程被激活处理任务。如果所有线程都在活动状态时又有新的任务被提交,那么新提交的任务会加入队列等待直到有线程可用为止。

如果有任何线程在shutdown前因为失败而被终止,那么当有新的任务需要执行时会产生一个新的线程,新的线程将会一直存在线程池中,直到被显式的shutdown。

示例

package com.zwx.concurrent.threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPool {
    public static void main(String[] args) {
        //FixedThreadPool - 固定线程数
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i=0;i<10;i++){
            fixedThreadPool.execute(()-> {
                System.out.println("线程名:" + Thread.currentThread().getName());
            });
        }
        fixedThreadPool.shutdown();
    }
}

输出结果为:
在这里插入图片描述
可以看到,最多只有3个线程在循环执行任务(运行结果是不一定的,但是最多只会有3个线程)。

FixedThreadPool调用了如下方法构造线程池:
在这里插入图片描述

SingleThreadExecutor

只有一个工作线程的执行器。如果这个线程在正常关闭前因为执行失败而被关闭,那么就会重新创建一个新的线程加入执行器。

这种执行器可以保证所有的任务按顺序执行,并且在任何给定的时间内,确保活动的任务只有1个。

示例

package com.zwx.concurrent.threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPool {
    public static void main(String[] args) {
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i=0;i<9;i++){
            singleThreadExecutor.execute(()-> {
                System.out.println("线程名:" + Thread.currentThread().getName());
            });
        }
    }
}
singleThreadExecutor.shutdown();

运行结果只有1个线程:
在这里插入图片描述
SingleThreadExecutor调用了如下方法构造线程池:
在这里插入图片描述

CachedThreadPool

一个在需要处理任务时才会创建线程的线程池,如果一个线程处理完任务了还没有被回收,那么线程可以被重复使用。

当我们调用execute方法时,如果之前创建的线程有空闲可用的,则会复用之前创建好的线程,否则就会创建新的线程加入到线程池中。

创建好的线程如果在60s内没被使用,那么线程就会被终止并移出缓存。因此,这种线程池可以保持长时间空闲状态而不会消耗任何资源。

示例

package com.zwx.concurrent.threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class TestThreadPool {
    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i=0;i<9;i++){
            cachedThreadPool.execute(()-> {
                System.out.println("线程名:" + Thread.currentThread().getName());
            });
        }
        cachedThreadPool.shutdown();
}

输出结果可以看到,创建了9个不同的线程:
在这里插入图片描述
接下来我们对上面的示例改造一下,在执行execute之前休眠一段时间:

package com.zwx.concurrent.threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPool {
    public static void main(String[] args) {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        for (int i=0;i<9;i++){
	        try {
	                Thread.sleep(i * 10L);
	            } catch (InterruptedException e) {
	                e.printStackTrace();
	            }
            cachedThreadPool.execute(()-> {
                System.out.println("线程名:" + Thread.currentThread().getName());
            });
        }
        cachedThreadPool.shutdown();
}

这时候输出的结果就只有1个线程了,因为有部分线程可以被复用:
在这里插入图片描述
注意:这两个示例的结果都不是固定的,第一种有可能也不会创建9个线程,第二种也有可能不止创建1个线程,具体要看线程的执行情况。

CachedThreadPool调用了如下方法构造线程池
在这里插入图片描述

ScheduledThreadPool

创建一个线程池,它可以在调度命令给定的延迟后运行或定期执行。这个相比较于其他的线程池,其自定义了一个子类ScheduledExecutorService继承了ExecutorService。

示例

package com.zwx.concurrent.threadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;

public class TestThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
        for (int i=0;i<9;i++){
            scheduledThreadPool.execute(()->{
                System.out.println("线程名:" + Thread.currentThread().getName());
            });
        }
        scheduledThreadPool.shutdown();
    }
}

输出结果(执行结果具有随机性,最多只有3个线程执行):
在这里插入图片描述
ScheduledThreadPool最终调用了如下方法构造线程池
在这里插入图片描述

线程池原理

根据上面的截图可以看到,列举的4中常用的线程池在构造时,最终调用的方法都是ThreadPoolExecutor类的构造方法,所以要分析原理,我们就去看看ThreadPoolExecutor吧!

构造线程池7大参数

下面就是ThreadPoolExecutor类中最完整的一个构造方法:
在这里插入图片描述
这个就是是构造线程池的核心方法,总共有7个参数:

  • corePoolSize:核心线程数量。一直保留在池中的线程,核心线程即使空闲状态也不会被回收,除非设置了allowCoreThreadTimeOut属性
  • maximumPoolSize:最大线程数量。线程池中允许的最大线程数,大于等于核心线程数
  • keepAliveTime:活跃时间。当最大线程数比核心线程数更大时,超出核心的线程数的其他线程如果空间时间超过keepAliveTime会被回收
  • TimeUnit:活跃时间的单位
  • BlockingQueue:阻塞队列。用于存储尚等待被执行的任务。
  • ThreadFactory:创建线程的工厂类
  • RejectedExecutionHandler:拒绝策略。当达到了线程边界和队列容量时提交的任务被阻塞时执行的策略。

线程池执行流程

execute(Runnable) 方法的主流程非常清晰:
在这里插入图片描述
根据上面源码,可以得出线程池执行流程图如下:
在这里插入图片描述

源码分析

首先看看ThreadPoolExecutor类中的ctl,是一个32位的int类型,其中将高3位用来表示线程数量,低29位用来表示,其中的计算方式都是采用二进制来计算。
在这里插入图片描述
其中各种状态的转换关系如下图:
在这里插入图片描述
其中状态的大小关系为:
RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED

addWork方法

 private boolean addWorker(Runnable firstTask, boolean core) {
       //第一段逻辑:线程数+1
        retry:
        for (;;) {
            int c = ctl.get();//获取线程池容量
            int rs = runStateOf(c);//获取状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&//即:SHUTDOWN,STOP,TIDYING,TERMINATED
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))//即:rs==RUNNING,firstTask!=null,queue==null
                return false;//如果已经关闭,不接受任务;如果正在运行,且queue为null,也返回false
            for (;;) {
                int wc = workerCountOf(c);//获取当前的工作线程数
                //如果工作线程数大于等于容量或者大于等于核心线程数(最大线程数),那么就不能再添加worker
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//cas增加线程数,失败则再次自旋尝试
                    break retry;
                c = ctl.get();  // Re-read ctl //再次获取工作线程数
                if (runStateOf(c) != rs)//不相等说明线程池的状态发生了变化,继续自旋尝试
                    continue retry;
            }
        }

		//第二段逻辑:将线程构造成Worker对象,并添加到线程池
        boolean workerStarted = false;//工作线程是否启动成功
        boolean workerAdded = false;//工作线程是否添加成功
        Worker w = null;
        try {
            w = new Worker(firstTask);//构建一个worker
            final Thread t = w.thread;//去除worker中的线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;//获取重入锁
                mainLock.lock();//上锁
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());//获得锁之后,再次检查状态

                    //只有当前线程池是正在运行状态,[或是 SHUTDOWN 且 firstTask 为空],才能添加到 workers 集合中
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//将新创建的 Worker 添加到 workers 集合中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;//更新线程池中线程的数量
                        workerAdded = true;//添加线程(worker)成功
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//这里就会去执行Worker中的run()方法
                    workerStarted = true;//启动成功
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//如果启动线程失败,需要回滚
        }
        return workerStarted;
    }

这个方法主要就是做两件事:

  • 一、将线程数+1
  • 二、将线程构造成Worker对象,加入到线程池中,并调用start()方法启动线程

Worker对象

在这里插入图片描述
上面这个方法继承了AbstractQueuedSynchronizer,前面我们讲述AQS同步队列的时候知道,AQS就是一个同步器,那么既然有线程的同步器,这里为什么不直接使用,反而要继承之后重写呢?

这是因为AQS同步器内是支持锁重入的,但是线程池这里的设计思想是并不希望支持重入,所以才会重写一个AQS来避免重入。

Worker中state初始化状态设置为-1,原因是在初始化Worker对象的时候,在线程真正执行runWorker()方法之前,不能被中断。而一旦线程构造完毕并开始执行任务的时候,是允许被中断的,所以在线程进入runWorker()之后的第一件事就是将state设置为0(无锁状态),也就是允许被中断。

我们再看看Worker的构造器:
在这里插入图片描述
addWork方法执行到这句:w = new Worker(firstTask);//构建一个worker 的时候,就会调用构造器创建一个Worker对象,state=-1,并且将当前任务作为firstTask,后面再运行的时候会优先执行firstTask。

上面addWorker方法在worker构造成功之后,就会调用worker.start方法,这时候就会去执行Worker中的run()方法,这也是一种委派的方式,如果对start()和run()具体是如何执行这一块不理解,可以点击这里进行详细了解(currentThread()和this的区别部分有解释)。

run()方法中调用了runWorker(this)方法,这个方法就是真正执行任务的方法:

runWorker(this)方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        /** * 表示当前worker线程允许中断,因为new Worker默认的 state=-1,此处是调用 * Worker类的 tryRelease()方法,state置为 0, * 而 interruptIfStarted()中只有 state>=0 才允许调用中断 */
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                /** * 加锁,这里加锁不仅仅是为了防止并发,更是为了当调用shutDown()方法的时候线程不被中断, * 因为shutDown()的时候在中断线程之前会调用tryLock方法尝试获取锁,获取锁成功才会中断 */
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                /** * 如果是以下两种情况,需要中断线程 * 1.如果state>=STOP,且线程中断标记为false * 2.如果state<STOP,获取中断标记并复位,如果线程被中断,那么,再次判断state是否STOP * 如果是的话,且线程中断标记为false */
                if ((runStateAtLeast(ctl.get(), STOP) ||//状态>=STOP
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();//中断线程
                try {
                    beforeExecute(wt, task);//空方法,我们可以重写它,在执行任务前做点事情,常用于线程池运行的监控和统计
                    Throwable thrown = null;
                    try {
                        task.run();//正式调用run()执行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//执行任务之后调用,也是个空方法,我们可以重写它,在执行任务后做点事情,常用于线程池运行的监控和统计
                    }
                } finally {
                    task = null;//将任务设置为空,那么下次循环就会通过getTask()方法从workerQueue中取任务了
                    w.completedTasks++;//任务完成数+1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //核心线程会阻塞在getTask()方法中等待线程,除非设置了允许核心线程被销毁,
            // 否则正常的情况下只有非核心线程才会执行这里
            processWorkerExit(w, completedAbruptly);//销毁线程
        }
    }

主要执行步骤为:

  • 1、首先释放锁,因为进入这个方法之后线程允许被中断
  • 2、首先看看传入的firstTask是否为空,不为空则优先执行
  • 3、如果firstTask为空(执行完了),则尝试从getTask()中获取任务,getTask()就是从队列l里面获取任务
  • 4、如果获取到任务则开始执行,执行的时候需要重新上锁,因为执行任务期间也不允许中断
  • 5、任务运行前后分别有一个空方法,我们可以在有需要的时候重写这两个方法,实现付线程池的监控
  • 6、如果获取不到任务,则会执行processWorkerExit方法销毁线程

getTask()方法

private Runnable getTask() {
        //上一次获取任务是否超时,第一次进来默认false,第一次自旋后如果超时就会设置为true,则第二次自旋就会返回null
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /** * 1. 线程池状态为shutdown,那么就必须要等到workQueue为空才行,因为shutdown()状态是需要执行队列中剩余任务的 * 2.线程池状态为stop,那么就不需要关注workQueue中是否有任务 */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//线程池中的线程数-1
                return null;//返回null的话,那么runWorker方法中就会跳出循环,执行finally中的processWorkerExit方法销毁线程
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //1.allowCoreThreadTimeOut-默认false,表示核心线程数不会超时
            //2.如果总线程数大于核心线程数,那就说明需要有线程被销毁
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            /** * 1. 线程数量超过maximumPoolSize可能是线程池在运行时被调用了setMaximumPoolSize() * 被改变了大小,否则已经 addWorker()成功的话是不会超过maximumPoolSize。 * 2.timed && timedOut 如果为 true,表示当前操作需要进行超时控制,并且上次从阻塞队列中 * 获取任务发生了超时.其实就是体现了空闲线程的存活时间 */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://等待指定时间后返回
                    workQueue.take();//拿不到任务会一直阻塞(如核心线程)
                if (r != null)
                    return r;//如果拿到任务了,返回给worker进行处理
                timedOut = true;//走到这里就说明到了超期时间还没拿到任务,设置为true,第二次自旋就可以直接返回null
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

这个方法主要步骤为:

  • 1、首先判断状态是不是对的,如果是SHUTDOWN之类不符合要求的状态,那就直接返回null,并把线程数-1,而返回null之后前面的方法就会跳出while循环,执行销毁线程流程。
  • 2、判断下是不是有设置超时时间或者最大线程数超过了核心线程数
  • 3、根据上面的判断决定是执行带有超时时间的poll方法还是take方法从队列中获取元素。
    情况一:如果是执行带超时时间的poll方法,那么时间到了如果还没取到元素,那么就返回空,这种情况说明当前系统并不繁忙,所以返回null之后线程就会被销毁;
    情况二:如果是执行take方法,根据第2点的判断知道,除非我们人为设置了核心线程可以被回收,否则核心线程就是会执行take方法,如果获取不到任务就会一直阻塞等待获取到任务为止。

processWorkerExit方法

这是销毁线程的方法,上面的getTask()方法返回空,就会执行线程销毁方法,因为getTask()当中已经把线程数-1了,所以这里可以直接执行线程销毁工作。
在这里插入图片描述
直接调用的是workers集合的remove()方法,后面还有就是尝试中止和一些异常异常情况的补偿操作。

拒绝策略

JDK默认提供的拒绝策略有如下几种:

  • AbortPolicy:直接抛出异常,默认策略
  • CallerRunsPolicy:用调用者所在的线程来执行任务
  • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务
  • DiscardPolicy:直接丢弃任务

我们也可以自定义自己的拒绝策略,只要实现RejectedExecutionHandler接口,重写其中的唯一一个方法rejectedExecution就可以了。

常见的面试问题

线程池这一块面试非常喜欢问,我们来举几个常见的问题:

问题一

Q:为什么不建议直接使用Executors来构建线程池?

A:用Executors 使得我们不用关心线程池的参数含义,这样可能会导致问题,比如我们用newFixdThreadPool或者newSingleThreadPool.允许的队列长度为Integer.MAX_VALUE,如果使用不当会导致大量请求堆积到队列中导致OOM的风险而newCachedThreadPool,允许创建线程数量为 Integer.MAX_VALUE,也可能会导致大量 线程的创建出现CPU使用过高或者OOM的问题。而如果我们通过ThreadPoolExecutor来构造线程池的话,我们势必要了解线程池构造中每个 参数的具体含义,会更加谨慎。

问题二

Q:如何合理配置线程池的大小?

A:要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析:

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
  • 任务的优先级:高、中和低。
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

CPU密集型:
CPU密集型的特点是响应时间很快,cpu一直在运行,这种任务cpu 的利用率很高,那么线程数的配置应该根据CPU核心数来决定,CPU核心数=最大同时执行线程数,假如CPU核心数为4,那么服务器最多能同时执行4个线程。过多的线程会导致上 下文切换反而使得效率降低。那线程池的最大线程数可以配置为cpu核心数+1。

IO密集型:
主要是进行IO操作,执行IO操作的时间较长,这是cpu会处于空闲状态, 导致cpu的利用率不高,这种情况下可以增加线程池的大小。可以结合线程的等待时长来做判断,等待时间越高,那么线程数也相对越多。一般可以配置cpu核心数的2倍。
一个公式:线程池设定最佳线程数目 = ((线程池设定的线程等待时间+线程 CPU 时间)/ 线程CPU时间 )* CPU数目

附:获取CPU个数方法:Runtime.getRuntime().availableProcessors()

问题三

Q:线程池中的核心线程什么时候会初始化?

A:默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。 在实际中如果需要线程池创建之后立即创建线程,可以通过如下两个方法:

  • prestartCoreThread():初始化一个核心线程。
  • prestartAllCoreThreads():初始化所有核心线程

问题四

Q:线程池被关闭时,如果还有任务在执行,怎么办?

A:线程池的关闭有两个方法:

  • shutdown()
    不会立即终止线程池,要等所有任务缓存队列中的任务都执行完后才终止,但是不会接受新的任务
  • shutdownNow()
    立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务任务

问题五

Q:线程池容量是否可以动态调整?

A:可以通过两个方法动态调整线程池的大小。

  • setCorePoolSize():设置最大核心线程数
  • setMaximumPoolSize():设置最大工作线程数

总结

本文从线程池的常见的四种用法使用示例开始入手,最终发现都调用了同一个类去构造线程池(ThreadPoolExecutor),所以我们就从ThreadPoolExecutor构造器开始分析了构建一个线程池的7大参数,并从execute()方法开始逐步分析了线程池的使用原理,当然,其实线程池还有一个方法submit()也可以作为入口,这个会放在下篇并发系列讲述Future/Callable的时候再去分析。
请关注我,和孤狼一起学习进步

正文到此结束
本文目录