线程池
# 线程池
# 使用线程池的好处
处理过程中将任务放入队列,如果线程数量超过最大数量,超出数量的线程排队等候。
线程复用,控制最大并发数,管理线程。
- 降低资源消耗,通过重复利用已创建的线程,降低线程创建和销毁造成的消耗。
- 执行响应速度快,当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
# Executor启动线程的好处
通过Executor来启动线程比使用Thread的start方法更好,方便管理、效率高、避免this逃逸问题。
this逃逸是指在构造函数返回之前其他线程就持有该对象的引用。调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
Executor框架不仅包括线程池的管理,还提供线程工厂、队列以及拒绝策略等,Executor框架让并发编程更简单。
# 线程实现类ThreadPoolExectuor类简单介绍
提供四个构造方法,来看看最长的构造方法:
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
七大参数
- corePoolSize:核心线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中。
- maximumPoolSize:最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量。
- keepAliveTime:当前线程池中空闲线程数量超过corePoolSize时,多余的线程会多长时间内被销毁。
- unit:keepAliveTime的单位
- workQueue:任务队列,用来存储等待执行任务的队列;它一般分为直接提交队列、有界任务队列、无界任务队列、优先队列。
- threadFactory:线程工厂,用于线程创建,一般默认即可。
- handler:拒绝策略;当任务太多来不及处理时,可以定制策略来处理任务。
# ThreadPoolExecutor核心线程数量和最大线程数量
提交的新任务少于corePoolSize,即使其他工作线程处于空闲状态,也会创建新线程处理。
提交的新任务大于corePoolSize,小于maximumPoolSize,只有当队列已满时才会创建新线程
通过设置corePoolSize和maximumPoolSize相同,可以创建固定大小的线程池。
将maximumPoolSize设置为Integer.MAX_VALUE,可以容纳任意数量的并发数量。
可以使用setCorePoolSize和setMaximumSize进行动态更改。
# 线程池的底层工作原理
1 提交任务后,如果核心线程数量没有满,则创建新线程来执行任务。
2 提交任务后,如果核心线程数量已满,则将任务放入等待队列。
3 当等待队列已满时,且没有达到最大线程数量,则创建新线程来执行任务。
4 当等待队列已满时,且达到最大线程数量时,新提交任务将被拒绝。
5 当队列中的任务执行完毕时,空闲线程空闲的时间超过keepAliveTime,则关闭空闲线程。(超过核心数量的线程为空闲线程)
# 线程数量的选择
首先查看CPU核心数量:
System.out.println(Runtime.getRuntime().availableProcessors());
📌 CPU密集型
CPU密集型任务配置尽可能少的线程数量,一般:CPU核数 + 1个线程 的线程池
📌 IO密集型
由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程
(1)CPU核数*2
(2)CPU核数 / (1 - 阻塞系数),阻塞系数在0.8~0.9之间
# ThreadPoolExecutor拒绝策略
队列已满且线程池创建的线程数量达到最大线程数时,需要指定拒绝策略来处理线程池超载的情况。
- AbortPolicy策略(默认):直接抛出RejectedExecutionException异常,阻止系统正常运行。
- CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行。
- DiscardOldestPolicy策略:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
- DiscardPolicy策略:直接丢弃任务。
# 自定义创建线程池
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
2
3
4
5
6
7
8
# 三种创建线程池的方法
⭐ newFixedThreadPool
特点:执行长期的任务,性能好很多
固定线程数的线程池,corePoolSize和maximumPoolSize值相同,keepAliveTIme为0,等待队列为LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
2
3
⭐ newSingleThreadExecutor
特点:一个任务一个任务执行的场景
corePoolSize和maximumPoolSize值均为1,其他参数与FixedThreadPool相同。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
2
3
4
⭐ newCachedThreadPoolPool
特点:执行很多短期异步的小程序或者负载较轻的服务器
corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,KeepAliveTime为60L,使用无容量的SynchronousQueue作为队列。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
2
3
案例:
public class ThreadPoolDemo {
public static void main(String[] args) {
// 一池5个处理线程
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 一池1个处理线程
// ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 一池N个处理线程
// ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 20; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 处理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
存在问题:阻塞队列使用LinkedBlockingQueue,允许请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
# 阻塞队列BlockingQueue
支持两个附加操作的队列,分别为:
当阻塞队列为空时,从队列中获取元素的操作将被阻塞。
当组设队列为满时,往队列里添加元素的操作将被阻塞。
试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。
试图往已满的阻塞队列中添加新的元素的线程将会被阻塞,直到其他的线程从队列中移除一个或多个元素。
具有以下七种实现类:
- ⭐ArrayBlockingQueue:由数组结构组成的有界阻塞队列,构造时需要指明大小,FIFO。
- ⭐LinkedBlockingQueue:由链表结构组成的有界阻塞队列(默认大小Integer.MAX_VALUE,有界但等同于无界),可指明大小,也可不指明大小,FIFO。
- PriorityBlockingQueue:支持优先级排序的无界阻塞队列,依据对象的自然排序顺序或者构造函数所带的Comaprator决定顺序。
- DelayQueue:使用优先级队列实现的延迟无解阻塞队列。
- ⭐SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列,对齐的操作必须是放和取交替完成。
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
使用阻塞队列,不需要关心什么时候需要阻塞线程,什么时候唤醒线程,因为这一切BlockingQueue都帮我们解决了。在JUC包发布之前,在多线程环境下,每个程序员都必须自己控制这些细节。
# 阻塞队列操作详解
Throws exception | Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
抛出异常(Throws exception):
- 队列满时,再add插入元素会抛出异常
IllegalStateException: Queue full
- 队列空时,再remove溢出元素会抛出异常
NoSuchElementException
特殊值(Special Value):
- 插入offer方法,成功true,失败false
- 移除poll方法,成功返回元素,失败返回null
阻塞(Blocks)
- 队列满时,生产者线程put元素,队列会一直阻塞生产线程知道put数据成功。
- 队列空时,消费者线程take元素,队列会一直阻塞消费者线程知道队列有可用元素。
超时退出(Times out)
- 队列满时,添加元素会阻塞线程一定时间,超过时间后生产者线程会退出返回false。
- 队列空时,获取元素会组设线程一定时间,超过时间后消费者线程会退出返回false。