ReentrantLock非公平锁的源码分析
ReentrantLock 非公平锁的实现源码分析:lock、unlock、线程阻塞、阻塞线程如何唤醒等
# 1. 前置知识
# 1.1 CAS
CAS,Compare And Swap,比较与交换,是一种无锁的算法。在不使用锁的情况下实现多线程之间变量的同步。
CAS操作涉及三个操作数:需要读写的内存值V,进行比较的值A,要写入的新值B。
当V=A时,CAS通过原子方式用新值B更新内存中的旧值V,否则将不会执行操作。一般情况下,”更新“是一个不断重试的操作。
# 1.2 Unsafe类
在Java中,Unsafe类是CAS的核心类,存在于sun.misc
包中,由于Java方法无法直接访问底层系统,需要本地(Native)方法来访问,Unsafe相当于一个后门,其内部方法操作可以像C的指针一样直接操作内存。
CAS,是一条CPU并发原语。它的功能是判断内存某个位置是否位预期值,如果是则更改为新的值,这个过程是原子的。JVM会帮我们实现出CAS汇编指令。这是一种完全依赖于硬件的功能,通过它实现了原子操作。
原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成数据不一致问题。
public final class Unsafe {
...
// CAS相关
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
// 阻塞相关
public native void unpark(Object var1);
public native void park(boolean var1, long var2);
...
}
2
3
4
5
6
7
8
9
10
11
12
# 1.2 LockSupport
LockSupport是用来创建锁和其他同步类的基本线程阻塞原语,是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞之后也有对应的唤醒方法。归根结底,LockSupport调用的Unsafe中的native代码。
LockSupport中的park()
和unpark()
的作用分别是阻塞线程和解除阻塞线程。
LockSupport和每个使用它的线程都有一个许可(permit)关联,permit相当于1,0开关,默认是0。可以把许可看成是一种(0, 1)信号量(Semaphore),但与Semaphore不同的是,许可的累加上限是1。
调用unpark就加1;调用一次park就会消费一个permit,也就是将1变成0,同时park立刻返回。如果再次调用park会阻塞,因为permit变为零会阻塞在这里,直到permit变为1。permit最大值为1,重复调用也不会累加。
LockSupport.park底层:
public static void park() {
UNSAFE.park(false, 0L);
}
2
3
LockSupport.unpark底层:
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
2
3
4
📃 代码案例
public class LockSupportDemo {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---come in!");
LockSupport.park(); // 阻塞,等待通知放行,它需要许可证
System.out.println(Thread.currentThread().getName() + "\t" + "---被唤醒!");
}, "A");
t1.start();
Thread t2 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t" + "---通知!");
LockSupport.unpark(t1);
}, "B");
t2.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 1.3 AQS
# 1.3.1 简单介绍
AQS,AbstractQueuedSynchronizer,抽象队列同步器,用来构建锁和同步器的架构。
通过AQS构建的组件有:ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch、CyclicBarrier
AQS核心思想是,如果被请求的资源空闲,则将当前请求资源的线程设置为有效工作线程,并将共享资源设置为锁定状态;如果被请求的共享资源被占用,那么这个线程就要排队等待资源的释放。
通过核心思想我们可以得到两个重要概念:共享资源状态、队列。
- 共享资源状态:用来判断当前线程是被占用还是空闲。
- 队列:没有获得资源的线程将放到队列中等待。
共享资源的状态通过state变量表示,0表示空闲,1表示被占用,通过CAS操作对其状态进行修改。
如果被请求的共享资源被占用。那么将该线程封装到一个Node内,通过CAS操作放到等待队列中的尾部进行排队。
从图中可以看出,CLH队列是一个双向队列,或者说是双向链表,可以通过当前节点获取前后两个节点。
# 1.3.2 参数介绍
⭐ AQS自身
private volatile int state
:0表示资源空闲,线程可以直接获取;1表示资源被占用,线程阻塞等待。
CLH队列
:三个大牛的名字组成,为一个双向队列。通过自选等待,state变量判断是否阻塞,从尾部入队,从头部出队。
private transient volatile Node head
:头指针
private transient volatile Node tail
:尾指针
⭐ 内部类Node
组成:waitStatus等待状态 + 前后指针 + 线程
static final Node SHARED = new Node()
:表示线程以共享的模式等待锁
static final Node EXCLUSIVE = null
:表示线程以独占的方式等待锁
volatile Node prev
:前指针
volatile Node next
:后指针
volatile int waitStatus
:阻塞线程的等待状态
- 0,默认值
- 1,CANCALLED,表示线程获取锁的请求已取消
- -2,CONDITION,表节点在等待队列中,节点线程等待唤醒
- -3,PROPAGATE,当前线程处于SHARED情况下,该字段才会使用
- -1,SIGANAL,表示后继节点的线程需要被唤醒。当一个节点的前驱节点释放锁或者取消等待时,它会将自己的等待状态设置为SIGNAL,以通知后继节点需要被唤醒。
volatile Thread thread
:阻塞的线程
# 2. 源码解读
# 2.1 分析案例
下面以一段简单的代码为切入点,分析每一步代码是如何运转的。
在该案例中,一共有三个线程和一把锁,它们将依次获取锁,执行任务后释放锁。我们接下来将分析:
- 如果锁没有被占用,执行
lock()
后会发生上面? - 如果锁被占用,执行
lock()
后会发生上面? - 执行
unlock()
后会发生上面?
public static void main(String[] args) {
// ReentrantLock 非公平锁
Lock lock = new ReentrantLock();
// 线程A
new Thread(() -> {
lock.lock();
System.out.println(Thread.currentThread().getName() + "\t" + "开始工作!");
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
lock.unlock();
}, "A").start();
// 线程B
new Thread(() -> {
lock.lock();
System.out.println(Thread.currentThread().getName() + "\t" + "开始工作!");
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
lock.unlock();
}, "B").start();
// 线程C
new Thread(() -> {
lock.lock();
System.out.println(Thread.currentThread().getName() + "\t" + "开始工作!");
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
lock.unlock();
}, "C").start();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 2.2 公平锁&非公平锁
公平锁:多个线程按照申请锁的顺序来获取锁,先来先解决。 非公平锁:多线程获取锁的顺序可能不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。
首先来看看ReentrantLock
的构造器:
// 创建一个非公平锁的实例
public ReentrantLock() {
sync = new NonfairSync();
}
// 根据传入参数创建公平锁或非公平锁,true:公平锁,false:非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2
3
4
5
6
7
8
9
也就是说,我们通过下面这个命令创建的锁是非公平锁:
Lock lock = new ReentrantLock();
# 2.3 Sync
可以看到非公平锁NonfairSync
和公平锁FairSync
两个内部类都是继承了Sync
内部类。
static final class NonfairSync extends Sync {
...
}
static final class FairSync extends Sync {
...
}
2
3
4
5
6
7
而Sync
这个抽象内部类则是继承了AQS(AQS就是一个模板)。
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
2
3
所以说,ReentrantLock
的实现原理就是AQS
。
# 2.4 state=0状态下的lock()
第一个线程开始工作【🚩标记0】
// 线程A
new Thread(() -> {
lock.lock();
System.out.println(Thread.currentThread().getName() + "\t" + "开始工作!");
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
lock.unlock();
}, "A").start();
2
3
4
5
6
7
执行第一句lock.lock();
public void lock() {
sync.lock();
}
2
3
最终会定位到NonfairSync
中的lock()
方法【🚩标记1】
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
2
3
4
5
6
首先看compareAndSetState(0, 1)
,通过名字也可以看出它的涵义,通过CAS操作将state状态从0变为1。它的实现是通过unsafe中的本地方法来实现的,第一部分已经简单介绍过了。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2
3
4
因为此时资源空闲,所以state=0,因此CAS操作成功,成功把资源状态state设置为1。记住,现在的state=1。
回到【标记1】的代码:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
2
3
4
5
6
if条件成立,执行setExclusiveOwnerThread(Thread.currentThread())
,通过名字看出它的含义是:将当前线程设置为拥有锁且正在执行线程,也就是说线程A为这把锁的拥有者。
此时lock.lock()
语句执行完毕,回到【标记0】,继续执行下面部分。
System.out.println(Thread.currentThread().getName() + "\t" + "开始工作!");
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
2
记住,此时state状态为1,线程A拥有这把锁。
# 2.5 state=1状态下的lock()
第二个线程开始工作【🚩标记2】
// 线程B
new Thread(() -> {
lock.lock();
System.out.println(Thread.currentThread().getName() + "\t" + "开始工作!");
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
lock.unlock();
}, "A").start();
2
3
4
5
6
7
与之前相同,执行第一句lock.lock();
public void lock() {
sync.lock();
}
2
3
与之前相同,最终会定位到NonfairSync
中的lock()
方法【🚩标记3】
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
2
3
4
5
6
与之前相同,执行compareAndSetState(0, 1)
,如果state=0,则通过CAS操作将state状态从0变为1,但是state不为0,所以CAS操作失败,if不成立,执行acquire(1)
。
下面来看看acquire()
方法,传入的参数arg=1【🚩标记4】
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
先看tryAcquire(arg)
方法,找到它在AQS中的方法,如果调用AQS中的方法会直接抛出异常,所以子类必须要重写这个方法。如果我们要使用AQS模板自定义一些功能的话,必须要重写这个方法。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
2
3
我们找到它在NonfairSync
中的实现。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
2
3
它又调用了Sync
中的nonfairTryAcquire(acquires)
方法,也就是说【标记4】中的tryAcquire(arg)
其实就是nonfairTryAcquire(int acquires)
方法。
final boolean nonfairTryAcquire(int acquires) { // 此时 acquires = 1
// 获取当前线程,当前线程是B线程
final Thread current = Thread.currentThread();
// 获取state状态,前面我们反复强调了,state的状态为1,表示被占用(当前被线程A占用)
int c = getState();
// state=1,if条件不成立
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判断当前的线程是不是拥有这个锁的线程,这个地方是判断可重入
// 拥有锁的线程是A,当前线程是B,所以else if不成立
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 最后只能返回false
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
经过前面的判断,我们知道了当前state=1,被其他线程占用;同时,该线程不是拥有这个锁的线程,不可重入,将返回false。
【标记4】的代码(重新写一下,这样就不用往上翻了)
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
也就是说【标记4】中的tryAcquire(arg)
为false,那么!tryAcquire(arg)
为true,接下来就应该将该线程放到队列中等待资源的释放。继续看下面的条件
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
其中包含两个方法acquireQueued()
和addWaiter()
我们先看addWaiter(Node.EXCLUSIVE)
【🚩标记5】
private Node addWaiter(Node mode) {
// 把当前线程封装成一个Node节点,这是AQS里面的知识点。
// 传入的mode为Node.EXCLUSIVE,表示独占锁
// Semaphore、CountDownLatch使用Node.SHARED
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 获得CLH队列中的为尾节点(未获取共享资源的线程封装为Node节点放到这个CLH队列中等待)
Node pred = tail;
// 判断尾节点是否尾null
// 当前线程封装城的节点是第一个加入到队列中的节点,此时尾节点为null,if条件不成立。
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点尾null,跳到这里
enq(node);
return node;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
经过分析,此时尾节点pred
(也就是tail
)为null
,调用方法enq(node)
【🚩标记6】
private Node enq(final Node node) {
// for (;;)等价于while(true), 一直循环
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
执行第一遍enq
方法
// 获取尾节点,此时 t = tail = null
Node t = tail;
// if成立
if (t == null) { // Must initialize
// 创建一个新的空节点,通过CAS操作把他设置为头结点
// 这一步会设置成功
if (compareAndSetHead(new Node()))
// 设置成功后,令尾节点等于头结点
tail = head;
// 至此,第一遍循环结束
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
【标记6】中说了for (;;)
表示一直循环,现在还没有跳出循环,所以下面来看第二遍循环
// 获取尾节点,此时 t = tail = head = 一个空的节点(哨兵节点)
Node t = tail;
// if条件不成立
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
// else成立
} else {
// node节点是传入的参数,也就是封装了线程B的节点
// 让t节点作为node节点的前置节点。
node.prev = t;
// 通过CAS操作将node节点设置为尾节点
// 该操作能成功
if (compareAndSetTail(t, node)) {
// 由于这是双向链表,node设置成尾节点后,将node设置尾t节点的后置节点
t.next = node;
// 最后返回t节点,返回值用不上。
return t;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
通过两次循环,我们初始化了这个队列,为队列创建了一个头结点,并将当前节点node设置尾尾节点。也就是说【标记6】enq(node)
的作用就是初始化队列,并将当前节点设置为尾节点。
我们回到【标记5】addWaiter(Node.EXCLUSIVE)
,将node节点入队后返回node节点。
private Node addWaiter(Node mode) {
...
// 尾节点尾null,跳到这里
enq(node);
return node;
}
2
3
4
5
6
我们再回到【标记4】的代码,addWaiter(Node.EXCLUSIVE)
返回节点node(里面封装了线程B)
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
下面将调用方法acquireQueued(node, arg)
【🚩标记7】
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 表示该线程是否被中断,
boolean interrupted = false;
// 死循环
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
由于其中包含for (;;)
,将执行多次,我们先来看第一次循环【🚩标记8】:
// 获取当前节点的前置节点
// 此时CLH中只有两个节点,当前节点的前置节点就是头结点head
final Node p = node.predecessor();
// p == head成立
// tryAcquire(arg)参考【标记4】,再次尝试获取线程,但还是获取失败了,返回false
// 所以当前if不成立
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 上面的if不成立,再看看这个if
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
这个if中包含了两个方法,我们先看看shouldParkAfterFailedAcquire(p, node)
【🚩标记9】
p是头结点,node是当前节点(封装线程B的节点)
waitStatus:AQS知识点【详情见1.3.2】,包含五种状态:
- 0,默认值
- 1,CANCALLED,表示线程获取锁的请求已取消
- -2,CONDITION,表节点在等待队列中,节点线程等待唤醒
- -3,PROPAGATE,当前线程处于SHARED情况下,该字段才会使用
- -1,SIGANAL,表示线程已经准备好了,就等资源释放了
通过名字看出它的功能是:获取资源失败,应该阻塞线程。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// pred是头结点,node是当前节点
// 获取pred头结点的,waitStatus状态,当前头结点为初始化的默认值0
int ws = pred.waitStatus;
// 不成立
if (ws == Node.SIGNAL)
return true;
// 不成立
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 成立
} else {
// 同CAS操作将,pred节点,也就是头结点的状态从0变为Node.SIGNAL(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 返回false
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
回到【标记8】
// 获取当前节点的前置节点
// 此时CLH中只有两个节点,当前节点的前置节点就是头结点head
final Node p = node.predecessor();
// p == head成立
// tryAcquire(arg)参考【标记4】,再次尝试获取线程,但还是获取失败了,返回false
// 所以当前if不成立
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 上面的if不成立,再看看这个if
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
shouldParkAfterFailedAcquire(p, node)
为false,if条件不成立。【标记7】中的第一遍循环结束,下面开始第二遍循环。【🚩标记10】
// 获取当前节点的前置节点
// 此时CLH中只有两个节点,当前节点的前置节点还是头结点head
final Node p = node.predecessor();
// p == head成立
// tryAcquire(arg)参考【标记4】,再次尝试获取线程,但还是获取失败了,返回false
// 所以当前if不成立
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 上面的if不成立,再看看这个if
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
此时又一次调用shouldParkAfterFailedAcquire(p, node)
方法
与【标记9一样】,只不过ws的值发生了改变,在【标记9】中,将ws的值从0设置为Node.SIGNAL(-1),后面的运行的结果将发生改变。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// pred是头结点,node是当前节点
// 获取pred头结点的,waitStatus状态,当前头结点为Node.SIGNAL(-1)
int ws = pred.waitStatus;
// 成立
if (ws == Node.SIGNAL)
// 返回true
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
此时将返回true,回到【标记10】,shouldParkAfterFailedAcquire(p, node)
的结果就是true
// 获取当前节点的前置节点
// 此时CLH中只有两个节点,当前节点的前置节点还是头结点head
final Node p = node.predecessor();
// p == head成立
// tryAcquire(arg)参考【标记4】,再次尝试获取线程,但还是获取失败了,返回false
// 所以当前if不成立
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 上面的if不成立,再看看这个if
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
下面看看parkAndCheckInterrupt()
方法【🚩标记11】
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this); // 将一直卡在这里
return Thread.interrupted();
}
2
3
4
5
这个地方调用了LockSupport.park(this)
方法阻塞当前线程,LockSupport参考【1.2】。
此时线程B完成了入队等待和阻塞的操作,此时将一直阻塞在这里,直到上一个线程释放资源,并通知唤醒线程B。我们记住这个【标记11】,后面通知唤醒线程B后将回到这里。
同理,线程C在执行lock.lock()
方法的时候,也会经过上面的过程,线程C封装成的node节点排在最后面作为尾节点,现在队列的状态如下图:
线程C在添加节点的时候不会在执行 【标记5】中
enq()
方法,因为头结点已经完成了初始化。
# 2.6 unlock()
此时回到【标记0】线程A的定义的位置,假设现在线程已经完成了任务,执行lock.unlock()
释放锁。
// 线程A
new Thread(() -> {
lock.lock();
System.out.println(Thread.currentThread().getName() + "\t" + "开始工作!");
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
lock.unlock();
}, "A").start();
2
3
4
5
6
7
下面看看lock.unlock()
方法在ReentrantLock
中的实现:
public void unlock() {
sync.release(1);
}
2
3
调用了sync的release方法,下面看看release()
方法【🚩标记12】
public final boolean release(int arg) {
if (tryRelease(arg)) { // 先执行tryRelease(1)
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
先执行tryRelease(1)
方法,找到他在Sync
中的实现【🚩标记13】
protected final boolean tryRelease(int releases) {
// 获取当前线程状态state=1
// c = 1 - 1 为 0
int c = getState() - releases;
// 判断当前线程是不是锁的拥有者
// A线程拥有这把锁,if不成立
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// if成立
if (c == 0) {
free = true;
// 设置当前拥有这把锁的线程为 null ,表示现在没有线程占用了
setExclusiveOwnerThread(null);
}
// 将state设置为0,表示当前资源空闲(锁没有被占用)
setState(c);
// 返回true
return free;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
通过tryRelease()
方法,将state状态从1变为了0,当前占用的线程从【线程A】变为【null】,此时这把锁已经被释放了。回到【标记12】
public final boolean release(int arg) {
if (tryRelease(arg)) { // tryRelease返回true,条件成立
Node h = head;
// 头结点不为null, 在【标记9】将头结点的waitStatus设置为Node.SIGNAL(-1)
// 此时条件成立
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 执行方法unparkSuccessor(h)
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
执行unparkSuccessor(h)
,h是头结点。【🚩标记14】
private void unparkSuccessor(Node node) {
// 获得头结点的ws = Node.SIGNAL(-1)
int ws = node.waitStatus;
// 条件成立
if (ws < 0)
// 通过CAS的方式将头结点的ws设置为0
compareAndSetWaitStatus(node, ws, 0);
// 获得头结点的下一个节点,就是线程B封装成的Node节点
Node s = node.next;
// 该节点不为null,ws为0
// 条件不成立
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// s不为null,成立
if (s != null)
// 通过unpark方法唤醒节点对应的线程,也就是线程B
LockSupport.unpark(s.thread);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
通过unparkSuccessor(h)
方法,我们成功唤醒了队列中的一个节点封装的线程,也就是线程B。记住,我们在这里唤醒了线程B。
再次回到【标记12】,返回true,不过这个返回结果没什么用。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 该放法执行完毕
// 返回true
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
至此,lock.unlock()
方法运行完毕。
# 2.7 阻塞线程被唤醒
回到【标记11】,线程B被阻塞
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this); // 将一直卡在这里
return Thread.interrupted();
}
2
3
4
5
回到【标记14】,在这里释放了线程B
private void unparkSuccessor(Node node) {
...
// s不为null,成立
if (s != null)
// 通过unpark方法唤醒节点对应的线程,也就是线程B
LockSupport.unpark(s.thread);
}
2
3
4
5
6
7
所以,此时线程B成功被唤醒,我们回到【标记11】继续运行
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this);
// 线程b没有被中断,所以返回false
return Thread.interrupted();
}
2
3
4
5
6
回到【标记7】
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 表示该线程是否被中断,
boolean interrupted = false;
// 死循环
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 因为parkAndCheckInterrupt()返回false,所以这里的if条件不成立
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
我们继续里面的死循环【🚩标记15】
for (;;) {
// 获得node节点的前置节点,也就是头结点
final Node p = node.predecessor();
// p == head成立
// 我们再去看看tryAcquire(1)方法
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 因为parkAndCheckInterrupt()返回false,所以这里的if条件不成立
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
下面再看看tryAcquire(arg)
方法,这个方法之前已经看过了两次,每一次都返回false。
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取state状态 = 0
int c = getState();
// 条件成立
if (c == 0) {
// 通过CAS操作,将state状态从0变为1,操作一定成功
if (compareAndSetState(0, acquires)) {
// 设置当前线程(线程B)为拥有锁的线程
setExclusiveOwnerThread(current);
// 返回true
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
此时tryAcquire(arg)
方法返回true,再回到【标记15】
for (;;) {
// 获得node节点的前置节点,也就是头结点
final Node p = node.predecessor();
// if条件成立
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为头结点
setHead(node);
// 将原来的头结点的next指针指向null,垃圾回收的时候会回收掉原来头结点
p.next = null; // help GC
failed = false;
// 返回false
return interrupted;
}
// 因为parkAndCheckInterrupt()返回false,所以这里的if条件不成立
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
至此【标记7】处的acquireQueued
方法执行完毕,返回false。
再回到【标记4】
public final void acquire(int arg) {
// 前面讨论过 !tryAcquire(arg) = true
// 刚刚讨论出acquireQueued() 返回false
// if条件不成立
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2
3
4
5
6
7
acquire
方法执行完毕,回到【标记3】NonfairSync
中的lock()
方法,此时lock()运行完毕
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
2
3
4
5
6
回到【标记2】,线程获取锁成功,开始工作。
// 线程B
new Thread(() -> {
lock.lock();
System.out.println(Thread.currentThread().getName() + "\t" + "开始工作!");
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
lock.unlock();
}, "A").start();
2
3
4
5
6
7
至此,我们分析完了线程空闲状态获取锁、释放锁、阻塞状态获取锁的全过程。