创建多线程的方法
# 创建多线程的方法
# 继承Thread创建线程
- 继承Thread类,重写run()方法。
- 创建该子类的实例
- 使用start()方法启动线程。
public class MyThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": Running!");
}
public static void main(String[] args) {
MyThread t = new MyThread();
t.setName("t1");
t.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
匿名内部类的写法
public class MyThread{
public static void main(String[] args) {
Thread t = new Thread() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": Running!");
}
};
t.setName("t1");
t.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
# 通过Runable创建线程
- 定义Runnable接口的实现类,重写run方法(执行体)
- 创建Runnable实现类的实例,以此实例作为Thread的target对象创建Thread对象。
- 调用start()启动该线程。
把【线程】和【任务】(要执行的代码)分开
public class MethodA implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": Running!");
}
public static void main(String[] args) {
Runnable r = new MethodA();
Thread t1 = new Thread(r);
t1.setName("t1");
t1.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
匿名内部类的写法
public class MyThread{
public static void main(String[] args) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": Running!");
}
});
t.setName("t1");
t.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
Lambda表达式简化
public class MyThread{
public static void main(String[] args) {
Thread t = new Thread(() -> System.out.println(Thread.currentThread().getName() + ": Running!"));
t.setName("t1");
t.start();
}
}
2
3
4
5
6
7
🍳 Thread与Runnable的关系
Thread
实现了Runnable
class Thread implements Runnable {...}
所以,在Thread
内要实现Runnable
接口内的方法run()
。可以看到,当target不为空时,在run()
方法中又调用target.run()
。
@Override
public void run() {
if (target != null) {
target.run();
}
}
2
3
4
5
6
所以接下来要看看target
是什么,在初始化方法init()
中。
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
...
this.target = target;
...
}
2
3
4
5
6
7
所以,target
就是一个实现Runnable
接口所创建的对象。
所以,如果创建了一个Runnable
对象给Thread,那么Thread
运行的就是一个Runnable
对象中的run
方法;如果没创建Runnable
对象,就需要重写run
方法,Thread
将运行这个重写的run
方法。
# 通过Callable和FutureTask创建线程
- 创建Callable接口的实现类,实现call方法(执行体,有返回值),并创建Callable实现类的对象。
- 使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法的返回值。
- 使用FutureTask对象作为Thread对象的target创建并启动线程。
- 调用FutureTask对象的get()方法来获得子线程执行结束后的返回值。
与【Runnable】相比,可以有返回值,通过get()方法来获得子线程执行结束后的返回值。
public class MyThread{
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Callable接口的实现类对象
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName() + ": Running!");
return 1;
}
};
// 使用FutureTask类来包装Callable对象
FutureTask<Integer> task = new FutureTask<>(callable);
// 使用FutureTask对象作为Thread对象的target创建并启动线程
Thread thread = new Thread(task, "t1");
thread.start();
// 阻塞获取结果
System.out.println("call方法的返回值: " + task.get());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
通过查看Thread的构造器,可以看到它可以传入的只有Runnable的对象
那它为何能够传入Callable对象呢?
因为我们使用FutureTask类来包装Callable对象,同时FutureTask间接实现了Runnable接口。
# 线程池创建线程
# 线程参数说明
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
🚩 重要参数
- corePoolSize:核心线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中。
- maximumPoolSize:最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量。
- keepAliveTime:当前线程池中空闲线程数量超过corePoolSize时,多余的线程会多长时间内被销毁。
- unit:keepAliveTime的单位
- workQueue:任务队列,用来存储等待执行任务的队列;它一般分为直接提交队列、有界任务队列、无界任务队列、优先队列。
- threadFactory:线程工厂,用于线程创建,一般默认即可。
- handler:拒绝策略;当任务太多来不及处理时,可以定制策略来处理任务。
🧭 核心线程数量和最大线程数量
提交的新任务少于corePoolSize,即使其他工作线程处于空闲状态,也会创建新线程处理。
提交的新任务大于corePoolSize,小于maximumPoolSize,只有当队列已满时才会创建新线程
通过设置corePoolSize和maximumPoolSize相同,可以创建固定大小的线程池。
将maximumPoolSize设置为Integer.MAX_VALUE,可以容纳任意数量的并发数量。
可以使用setCorePoolSize和setMaximumSize进行动态更改。
当线程数量达到maximumPoolSize且等待队列已满时开始执行拒绝策略。
🚀 任务队列workQueue
- 直接提交队列,SynchronousQueue是一个特殊的BlockingQueue,他没有容量,提交的任务不会被保存,总是会马上提交执行,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。
- 有界的任务队列,ArrayBlockingQueue,若有新的任务需要执行,线程池会创建新的线程,直到创建数量达到corePoolSzie时,则会将新的任务加入到等待队列中。若多列已满,则继续创建线程数量达到maximumPoolSize,若线程数量大于maximumuPoolSize,则执行拒绝策略。
- 无界任务队列,LinkedBlockingQueue,任务队列可以无限制的添加新的任务,线程池创建的最大数量是corePoolSize,maximumPoolSize这个参数是无效的。(不设置容量时,默认大小为Integer.MAX_VALUE,可以看成无界)
- 优先任务队列,ProrityBlockingQueue,特殊的无界队列,线程池创建的最大数量是corePoolSize,maximumPoolSize这个参数是无效的。一般队列是按照先进先出的规则处理任务,而该队列可以自定义规则根据优先级顺序先后执行。
🚨 拒绝策略
队列已满且线程池创建的线程数量达到最大线程数时,需要指定拒绝策略来处理线程池超载的情况。
- AbortPolicy策略:直接抛出异常,阻止系统正常工作。
- CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行。
- DiscardOldestPolicy策略:丢弃任务队列中最老的一个任务,并尝试再次提交。
- DiscardPolicy策略:直接将任务丢弃。
📑 关闭线程池
shutdownNow():立即关闭线程池(暴力),正在执行中的及队列中的任务会被中断,同时该方法会返回被中断的队列中的任务列表 shutdown():平滑关闭线程池,正在执行中的及队列中的任务能执行完成,后续进来的任务会被执行拒绝策略 isTerminated():当正在执行的任务及对列中的任务全部都执行(清空)完就会返回true
# Executors工具类创建线程
线程池可以自动创建也可以手动创建,自动创建体现在Executors工具类中,常见的可以创建newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor、newScheduledThreadPool;
手动创建体现在可以灵活设置线程池的各个参数,体现在代码中即ThreadPoolExecutor类构造器上各个实参的不同。
# newFixedThreadPool()
创建固定大小的线程池
Executors.newFixedThreadPool(2)
测试代码:
public class MyThread{
public static void main(String[] args) {
// 创建最多含有2个线程数量的线程池。
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; i++) {
executorService.execute(new MethodA());
}
}
}
2
3
4
5
6
7
8
9
10
11
使用的构造方式为
new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())
设置了corePoolSize=maxPoolSize,keepAliveTime=0(此时该参数没作用),无界队列,任务可以无限放入,当请求过多时(任务处理速度跟不上任务提交速度造成请求堆积)可能导致占用过多内存或直接导致OOM异常
# newSingleThreadExecutor()
创建只有一个线程的线程池
Executors.newSingleThreadExecutor()
测试代码:
public class MyThread{
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executorService.execute(new MethodA());
}
}
}
2
3
4
5
6
7
8
9
使用的构造方式为
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0)
基本同newFixedThreadPool,但是将线程数设置为了1,单线程,弊端和newFixedThreadPool一致。
# newCachedThreadPool()
创建一个不限线程数上限的线程池,任何提交的任务都将立即执行
Executors.newCachedThreadPool();
测试代码:
public class MyThread{
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
executorService.execute(new MethodA());
}
}
}
2
3
4
5
6
7
8
9
使用的构造方式为
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue())
corePoolSize=0,maxPoolSize为很大的数,同步移交队列,也就是说不维护常驻线程(核心线程),每次来请求直接创建新线程来处理任务,也不使用队列缓冲,会自动回收多余线程,由于将maxPoolSize设置成Integer.MAX_VALUE,当请求很多时就可能创建过多的线程,导致资源耗尽OOM
# newScheduledThreadPool()
支持定时周期性执行,注意一下使用的是延迟队列
Executors.newScheduledThreadPool(2)
测试代码:
public class MyThread{
public static void main(String[] args) {
ExecutorService executorService = Executors.newScheduledThreadPool(2);
for (int i = 0; i < 10; i++) {
executorService.execute(new MethodA());
}
}
}
2
3
4
5
6
7
8
9
使用的构造方式为
new ThreadPoolExecutor(var1, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue())
支持定时周期性执行,注意一下使用的是延迟队列,弊端同newCachedThreadPool一致
# 向线程池中添加任务
向线程池中添加任务
import java.util.concurrent.*;
public class TestFuture {
public static void main(String[] args) {
TestBusiness t = new TestBusiness();
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,3,0L, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>());
try {
// 添加任务1,submit内需要添加Callable接口的实现类,通过匿名内部类的方法实现
Future<Integer> future1 = threadPool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return t.getCorpId();
}
});
// 添加任务2,submit内使用lambda表达式简化匿名内部类
Future<Integer> future2 = threadPool.submit(() -> t.getCorpId());
System.out.println(future1.get());
System.out.println(future2.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
class TestBusiness{
public Integer getCorpId() {
return new Integer(1);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36