抽象队列同步器AQS
# 抽象队列同步器AQS
# AQS介绍
AQS,AbstractQueuedSynchronizer,抽象队列同步器,用来构建锁和同步器的架构,依赖FIFO等待队列和一个表示状态的原子整型类。子类必须实现改变state状态的protected方法,state表示资源对象是否被获取和释放。
AQS组件有:ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch、CyclicBarrier
锁,面向锁的使用者,定义了程序员和锁交互的使用层API,隐藏了实现细节,调用即可。
同步器,面向锁的实现者,比如Java并发大神DougLee,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。
https://mp.weixin.qq.com/s/1Nq_izUkOGmtvkIQ9c0QRQ
# AQS原理简单分析
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效工作线程,并将共享资源设置为锁定状态(通过CAS操作将state从0变为1)。如果被请求的共享资源被占用,那么将该线程封装到一个Node内,放到等待队列的尾部进行排队。
通过 CAS、自旋的方式,维护 state 变量的状态,使并发达到同步的控制效果。
Node 的数据结构包含 thread、waitStatus、pre、next,witStatus=-1 表示后续有节点需要当前线程释放资源后,唤醒后面的线程。
AQS使用一个 volatile 修饰的整型的成员变量 state 来表示同步状态,通过内置的队列来完成获取资源线程的排队工作,AQS 使用 CAS 对该同步状态进行修改。
# AQS参数介绍
⭐ AQS自身
private volatile int state
:0表示资源空闲,线程可以直接获取;1表示资源被占用,线程阻塞等待。
CLH队列
:三个大牛的名字组成,为一个双向队列。通过自选等待,state变量判断是否阻塞,从尾部入队,从头部出队。
private transient volatile Node head
:头指针
private transient volatile Node tail
:尾指针
⭐ 内部类Node
组成:waitStatus等待状态 + 前后指针 + 线程
static final Node SHARED = new Node()
:表示线程以共享的模式等待锁
static final Node EXCLUSIVE = null
:表示线程以独占的方式等待锁
volatile Node prev
:前指针
volatile Node next
:后指针
volatile int waitStatus
:阻塞线程的等待状态
- 0,默认值
- 1,CANCALLED,表示线程获取锁的请求已取消
- -2,CONDITION,表节点在等待队列中,节点线程等待唤醒
- -3,PROPAGATE,当前线程处于SHARED情况下,该字段才会使用
- -1,SIGANAL,表示线程已经准备好了,就等资源释放了
volatile Thread thread
:阻塞的线程
# ReentrantLock与AQS的关系
ReentrantLock 的内部类 Sync 就是 AQS
Lock 接口的实现类,基本都是通过【聚合】一个【队列同步器】的子类完成线程访问控制。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {...}
static final class NonfairSync extends Sync {...}
static final class FairSync extends Sync {...}
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# AQS对资源的共享方式
Exclusive(独占)
只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁。
公平锁:按照现场在堆中的排队顺序,先到者先拿到锁。
非公平锁:当线程要获取锁时,先通过两次 CAS 操作去枪锁,如果没抢到,当前线程再加入队列中等待唤醒。ReentrantLock 默认采用非公平锁,效率高。
Share(共享)
多个线程同时执行,如 Semaphore,CountDownLatch。
# Semaphore介绍
信号量,完成信号量控制,可以控制某个资源被多个线程同时访问:synchronized和ReetrantLock都是一次只允许一个线程访问某个资源,Semaphore可以指定多个线程同时访问某个资源。
执行acquire()方法会阻塞,直到有一个许可证可以获取,然后拿走许可证,方法就会进行下去;release()释放一个许可证,这样可能释放一个阻塞的acquire()方法。
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t抢到车位!");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "\t离开车位!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
输出
Thread-0 抢到车位!
Thread-1 抢到车位!
Thread-2 抢到车位!
等待3s
Thread-1 离开车位!
Thread-3 抢到车位!
Thread-0 离开车位!
Thread-4 抢到车位!
Thread-2 离开车位!
Thread-5 抢到车位!
等待3s
Thread-5 离开车位!
Thread-3 离开车位!
Thread-4 离开车位!
2
3
4
5
6
7
8
9
10
11
12
13
14
# CountDownLatch介绍
倒计时器,允许count个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
两种典型用法:
某个线程在开始运行前等待n个线程执行完毕。
实现多个线程开始执行任务的最大并行性。
并行性,不是并发,强调的时多个线程在某一个时刻同时执行。类似于赛跑的发令枪。
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 山晚自习,离开教室");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "\t \t 班长关门走人!");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
输出
0 山晚自习,离开教室
4 山晚自习,离开教室
5 山晚自习,离开教室
3 山晚自习,离开教室
2 山晚自习,离开教室
1 山晚自习,离开教室
main 班长关门走人!
2
3
4
5
6
7
# CyclicBarrier介绍
循环栅栏,让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙!!");
});
for (int i = 0; i < 14; i++) {
int index = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "/t 收集到第:" + index + "龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
输出
Thread-0/t 收集到第:0龙珠
Thread-9/t 收集到第:9龙珠
Thread-5/t 收集到第:5龙珠
Thread-10/t 收集到第:10龙珠
Thread-6/t 收集到第:6龙珠
Thread-7/t 收集到第:7龙珠
Thread-4/t 收集到第:4龙珠
Thread-12/t 收集到第:12龙珠
Thread-1/t 收集到第:1龙珠
Thread-8/t 收集到第:8龙珠
Thread-3/t 收集到第:3龙珠
Thread-2/t 收集到第:2龙珠
Thread-13/t 收集到第:13龙珠
召唤神龙!!
Thread-11/t 收集到第:11龙珠
召唤神龙!!
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# CountDownLatch和CyclicBarrier的区别
CountDownLatch是做减法,CyclicBarrier是做加法。
CountDownLatch是计数器,只能用一次;CyclicBarrier的计数器提供了reset功能,可以多次使用。
对于CountDownLatch来说,重点是一个线程(多个线程)等待,而其他N个线程在完成某件事情之后,可以终止,也可以等待;对于CyclicBarrier,重点是多个线程,任意一个线程没有完成,所有的线程都必须等待。
CountDownLatch是计数器,线程完成一个记录一个,只不过计数器不是递增而是递减,而CyclicBarrier更像一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。