序言
在高并发编程中,线程池是一个非常重要的组件。它不仅能够有效地管理和复用线程资源,还可以提升应用程序的性能和稳定性。本文将详细介绍Java中的线程池机制,以及如何正确地使用线程池。
一、什么是线程池
线程池是一组已经初始化并等待执行任务的线程集合。通过使用线程池,我们可以避免频繁地创建和销毁线程,从而节省资源和减少系统开销。线程池的核心思想是通过复用线程来提高性能。
二、为什么使用线程池
资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。提高响应速度:任务到达时,无需等待线程创建即可立即执行。提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。三、Java 中的线程池
Java 提供了 java.util.concurrent 包来支持并发编程。其中有几种方法创建线程池
1、通过 Executors 工厂类的静态方法创建线程池:
newFixedThreadPool
/** * 创建一个可重用固定数量线程的线程池,这些线程操作于共享的无界队列。 * 在任何时候,最多有 {@code nThreads} 个线程会处于活动状态处理任务。 * 当所有线程都在活动时,如果提交了额外的任务,它们将在队列中等待, * 直到有线程可用。 * 如果在关闭前,任何线程因执行失败而终止,将根据需要创建一个新的线程来执行后续任务。 * 线程池中的线程将持续存在,直到显式调用 {@link ExecutorService#shutdown shutdown} 方法关闭它。 * * @param nThreads 线程池中的线程数量 * @return 新创建的线程池 * @throws IllegalArgumentException 如果 {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newSingleThreadExecutor
/** * 创建一个使用单一工作线程的ExecutorService来执行任务。 * 此执行器服务使用单一线程以顺序方式处理任务,确保每次只执行一个任务。 * 即使在执行前一个任务时因失败导致线程终止,在关闭前也会创建新的线程继续执行后续任务。 * 与等效的{@code newFixedThreadPool(1)}不同,返回的执行器保证不能重新配置为使用额外的线程。 * * @return 新创建的单线程ExecutorService */public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));}
其中这两个线程池里面使用到的阻塞队列如下:
// 队列的最大长度是Integer的最大整数public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}
newCachedThreadPool
/** * 创建一个可根据需要创建新线程的线程池,同时会重用之前已构建的空闲线程。 * 这种线程池通常能提升执行大量短生命周期异步任务程序的性能。 * 调用{@code execute}方法时,如果存在可用的先前构建的线程则会重用它们。 * 若无现有线程可用,将创建新线程并加入到线程池中。 * 在线程闲置六十秒后,线程将被终止并从缓存中移除。 * 因此,长时间处于空闲状态的线程池不会消耗任何资源。 * 注意:可以通过{@link ThreadPoolExecutor}构造器创建具有类似特性但细节不同的(例如超时参数)线程池。 * * @return 新创建的线程池实例 */public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}
注:一般在工程上不建议使用它们创建线程池,阿里java开发手册
2、通过 ScheduledExecutorService 创建定时任务线程池:
ScheduledExecutorService 是 ExecutorService 的子接口,专门用于支持定时及周期性任务执行的线程池。可以通过 Executors.newScheduledThreadPool(int corePoolSize) 方法创建:
/** * 创建一个可以调度命令在指定延迟后运行,或周期性执行的线程池。 * * @param corePoolSize 即使在空闲时,线程池中也要保持的线程数量 * @return 新创建的ScheduledExecutorService实例,即计划线程池 * @throws IllegalArgumentException 如果corePoolSize小于0 */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue());}
3、通过 ThreadPoolExecutor 构造函数直接创建:
使用 ThreadPoolExecutor 的构造函数可以更加灵活地配置线程池,例如指定核心线程数、最大线程数、线程存活时间、工作队列等参数,如下所示:
public ThreadPoolExecutor( int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数long keepAliveTime, // 线程存活时间TimeUnit unit, // 线程存活时间单位BlockingQueue<Runnable> workQueue, // 阻塞队列ThreadFactory threadFactory, // 线程工厂RejectedExecutionHandler handler //拒绝策略)
四、ThreadPoolExecutor 线程池的执行过程
流程图是这样的:
整体流程就是:
1、任务来的时候,如果当前线程数小于核心线程数,则创建线程执行任务2、否则,将任务添加到阻塞队列3、如果队列已满case1:如果当前线程数小于最大线程数,则创建线程执行任务case2:如果当前线程数大于等于最大线程数,则执行拒绝策略
其实这个流程整体上来说是没什么问题的,也是大多数面试者回答该问题的标准答案。
但是这里有一个小问题,就是当我们把核心线程数设置为0的时候,声明的阻塞队列长度随便给一个大于0的值即可(此处我使用LinkedBlockingQueue队列,它的默认长度是Integer的最大值)。
我们来看看下面的代码,大家觉得会输出什么?
public static void testThreadPool() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); threadPoolExecutor.execute(()->{ System.out.println("线程池测试: " + new Date()); });}
如果按照上面流程图的步骤进行的话,什么也不会输出,因为此任务会加入到阻塞队列里面,对不对?
其实它输出了,结果如下
你现在可能有一个大大问号,其实我们可以从源码那里得到答案,源码如下:
public void execute(Runnable command) { // 检查传入任务是否为空,为空抛出异常 if (command == null) throw new NullPointerException(); // 获取当前线程池控制状态 int c = ctl.get(); // 1、如果当前线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 添加一个新的工作线程来执行任务 // 添加成功,直接返回 if (addWorker(command, true)) return; // 再次获取线程池控制状态,防止并发 c = ctl.get(); } // 2、如果线程池处于运行状态,并且任务能够加入队列 if (isRunning(c) && workQueue.offer(command)) { // 再次获取线程池控制状态,防止并发 int recheck = ctl.get(); // 2.1、如果线程池不再运行,并且任务能够从队列中移除,则拒绝任务 if (! isRunning(recheck) && remove(command)) reject(command); // 2.2、如果当前线程池没有运行的线程(此处可以打消你的问号) else if (workerCountOf(recheck) == 0) // 添加一个新的非核心工作线程 addWorker(null, false); } // 3、添加一个新的非核心工作线程,如果失败,拒绝任务 else if (!addWorker(command, false)) reject(command);}
所以说,从源码的角度具体去分析线程池执行任务的流程如下:
1、任务来的时候,如果当前线程数小于核心线程数,则创建线程执行任务2、否则,线程池处于运行状态并且任务能够加入队列case1:判断线程池运行状态,如果线程池状态不是运行的情况下,同时任务可以从队列移除,那么就拒绝该任务case2:判断当前是否有工作线程,如果没有,则创建一个非核心的工作线程3、如果队列已满case1:如果当前线程数小于最大线程数,则创建线程执行任务case2:如果当前线程数大于等于最大线程数,则执行拒绝策略
我专门下载了一个jdk1.5测试了一下,代码如下:
package main;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Test { public static void main(String[] args) { System.out.println(1); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1)); threadPoolExecutor.execute(new TestThread()); } static class TestThread implements Runnable{ public void run() { System.out.println(123); } }}
它确实只输出了1,没有输出123,
我将核心线程数改为1,再看下运行结果,这个时候他把123打印出来啦,此处就直接截图啦
我们在进入1.5的源码里面看看
/** * 执行任务 * @param command 要执行的任务 * @throws RejectedExecutionException 如果任务无法被接受执行,根据<tt>RejectedExecutionHandler</tt>的策略抛出此异常 * @throws NullPointerException 如果command参数为null */public void execute(Runnable command) { // 检查任务是否为空,如果为空则抛出NullPointerException异常 if (command == null) throw new NullPointerException(); // 循环尝试提交任务执行,直到成功或达到某个条件退出 for (;;) { // 检查线程池的运行状态,如果不是运行状态,则拒绝任务执行 if (runState != RUNNING) { reject(command); return; } // 尝试增加线程池中的线程数来执行任务,如果成功则退出循环 if (poolSize < corePoolSize && addIfUnderCorePoolSize(command)) return; // 尝试将任务放入工作队列,如果成功则退出循环 if (workQueue.offer(command)) return; // 尝试在不超过最大线程池大小的情况下增加线程池大小并执行任务,如果成功则退出循环 Runnable r = addIfUnderMaximumPoolSize(command); if (r == command) return; // 如果无法增加线程池大小且任务无法放入工作队列,则拒绝任务执行 if (r == null) { reject(command); return; } // 否则继续尝试提交任务执行 }}
这个源码才符合上面哪个线程池执行任务的流程图。
五、线程池的拒绝策略
名称 | 描述 |
---|---|
AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常。这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。 |
CallerRunsPolicy | 由调用线程(提交任务的线程)处理该任务。这种情况是需要让所有任务都执行完毕那么就适合大量计算的任务类型去执行,多线程仅仅是增大吞吐量的手段,最终必须要让每个任务都执行完毕。 |
DiscardPolicy | 丢弃任务,但是不抛出异常。 使用此策略,可能会使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。 |
DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新提交被拒绝的任务。是否要采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量。 |
六、线程池提交任务
线程池有两种提交任务的方式,
1、使用submit(Runnable task),有返回值
public static void testThreadPool() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); Future<?> submit = threadPoolExecutor.submit(() -> { System.out.println("线程池测试submit: " + new Date()); return 1; }); try { Object o = submit.get(); System.out.println(o); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); }}
2、使用execute(Runnable task),没有返回值
public static void testThreadPool() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); threadPoolExecutor.execute(()->{ System.out.println("线程池测试execute: " + new Date()); });}
七、线程池的状态
名称 | 描述 |
---|---|
RUNNING | 会接收新任务并且会处理队列中的任务 |
SHUTDOWN | 不会接收新任务并且会处理队列中的任务,任务处理完后中断所有线程 |
STOP | 不会接收新任务并且不会处理队列中的任务,直接中断所有线程 |
TIDYING | 所有任务都已终止,workerCount为0 |
TERMINATED | terminated()执行完成之后就会转变为TERMINATED |
这五种状态之间的转换
RUNNING -> SHUTDOWN:调用shutdown方法RUNNING or SHUTDOWN -> STOP:调用shutdownNow方法SHUTDOWN -> TIDYING:阻塞队列为空,线程池中工作线程数为0STOP-> TIDYING:线程池中工作线程数为0TIDYING -> TERMINATED:执行erminated方法