目錄
- 1. AQS是什么?
- 2. AQS核心思想
- 2.1 基本框架
- 2.1.1 資源state
- 2.1.2 CLH雙向佇列
- 2.2 AQS模板
- 2.1 基本框架
- 3. 原始碼分析
- 3.1 acquire(int)
- 3.1.1 tryAcquire(int)
- 3.1.2 addWaiter(Node.EXCLUSIVE)
- 3.1.3 acquireQueued(Node node, int arg)
- 3.2 release(int)
- 3.2.1 tryRelease(int)
- 3.2.2 unparkSuccessor(h)
- 3.3 acquireShared(int)和releaseShared(int)
- 3.3.1 acquireShared(int)
- 3.3.2 releaseShared(int)
- 3.1 acquire(int)
- 4. 面試問題模擬
- 參考資料
1. AQS是什么?
AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實作都依賴于它,如常用的ReentrantLock,
簡單來說,AQS定義了一套框架,來實作同步類,
2. AQS核心思想
2.1 基本框架
AQS的核心思想是對于共享資源,維護一個雙端佇列來管理執行緒,佇列中的執行緒依次獲取資源,獲取不到的執行緒進入佇列等待,直到資源釋放,佇列中的執行緒依次獲取資源,
AQS的基本框架如圖所示:
2.1.1 資源state
state變數表示共享資源,通常是int
型別,
- 訪問方法
state型別用戶無法直接進行修改,而需要借助于AQS提供的方法進行修改,即getState()
、setState()
、compareAndSetState()
等, - 訪問型別
AQS定義了兩種資源訪問型別:- 獨占(Exclusive):一個時間點資源只能由一個執行緒占用;
- 共享(Share):一個時間點資源可以被多個執行緒共用,
2.1.2 CLH雙向佇列
CLH佇列是一種基于邏輯佇列非執行緒饑餓的自旋公平鎖,具體介紹可參考此篇博客,CLH中每個節點都表示一個執行緒,處于頭部的節點獲取資源,而其他資源則等待,
- 節點結構
Node
類原始碼如下所示:
static final class Node {
// 模式,分為共享與獨占
// 共享模式
static final Node SHARED = new Node();
// 獨占模式
static final Node EXCLUSIVE = null;
// 結點狀態
// CANCELLED,值為1,表示當前的執行緒被取消
// SIGNAL,值為-1,表示當前節點的后繼節點包含的執行緒需要運行,也就是unpark
// CONDITION,值為-2,表示當前節點在等待condition,也就是在condition佇列中
// PROPAGATE,值為-3,表示當前場景下后續的acquireShared能夠得以執行
// 值為0,表示當前節點在sync佇列中,等待著獲取鎖
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 結點狀態
volatile int waitStatus;
// 前驅結點
volatile Node prev;
// 后繼結點
volatile Node next;
// 結點所對應的執行緒
volatile Thread thread;
// 下一個等待者
Node nextWaiter;
// 結點是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取前驅結點,若前驅結點為空,拋出例外
final Node predecessor() throws NullPointerException {
// 保存前驅結點
Node p = prev;
if (p == null) // 前驅結點為空,拋出例外
throw new NullPointerException();
else // 前驅結點不為空,回傳
return p;
}
// 無參構造方法
Node() { // Used to establish initial head or SHARED marker
}
// 構造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 構造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
Node
的方法和屬性值如圖所示:
其中,
waitStatus
表示當前節點在佇列中的狀態;thread
表示當前節點表示的執行緒;prev
和next
分別表示當前節點的前驅節點和后繼節點;nextWaiter
d當存在CONDTION佇列時,表示一個condition狀態的后繼節點,
- waitStatus
結點的等待狀態是一個整數值,具體的引數值和含義如下所示:
1
-CANCELLED
,表示節點獲取鎖的請求被取消,此時節點不再請求資源;0
,是節點初始化的默認值;-1
-SIGNAL
,表示執行緒做好準備,等待資源釋放;-2
-CONDITION
,表示節點在condition等待佇列中,等待被喚醒而進入同步佇列;-3
-PROPAGATE
,當前執行緒處于共享模式下的時候會使用該欄位,
2.2 AQS模板
AQS提供一系列結構,作為一個完整的模板,自定義的同步器只需要實作資源的獲取和釋放就可以,而不需要考慮底層的佇列修改、狀態改變等邏輯,
使用AQS實作一個自定義同步器,需要實作的方法:
isHeldExclusively()
:該執行緒是否獨占資源,在使用到condition的時候會實作這一方法;tryAcquire(int)
:獨占模式獲取資源的方式,成功獲取回傳true
,否則回傳false
;tryRelease(int)
:獨占模式釋放資源的方式,成功獲取回傳true
,否則回傳false
;tryAcquireShared(int)
:共享模式獲取資源的方式,成功獲取回傳true
,否則回傳false
;tryReleaseShared(int)
:共享模式釋放資源的方式,成功獲取回傳true
,否則回傳false
;
一般來說,一個同步器是資源獨占模式或者資源共享模式的其中之一,因此tryAcquire(int)
和tryAcquireShared(int)
只需要實作一個即可,tryRelease(int)
和tryReleaseShared(int)
同理,
但是同步器也可以實作兩種模式的資源獲取和釋放,從而實作獨占和共享兩種模式,
3. 原始碼分析
3.1 acquire(int)
acquire(int)
是獲取原始碼部分的頂層入口,原始碼如下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這段代碼展現的資源獲取流程如下:
tryAcquire()
嘗試直接去獲取資源;獲取成功則直接回傳- 如果獲取失敗,則
addWaiter()
將該執行緒加入等待佇列的尾部,并標記為獨占模式; acquireQueued()
使執行緒阻塞在等待佇列中獲取資源,一直獲取到資源后才回傳,
簡單總結就是:
- 獲取資源;
- 失敗就排隊;
- 排隊要等待,
從上文的描述可見重要的方法有三個:tryAquire()
、addWaiter()
、acquireQueued()
,下面將逐個分析其原始碼:
3.1.1 tryAcquire(int)
tryAcquire(int)
是獲取資源的方法,原始碼如下所示:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
該方法是一個空方法,需要自定義同步器實作,因此在使用AQS實作同步器時,需要重寫該方法,這也是“自定義的同步器只需要實作資源的獲取和釋放就可以”的體現,
3.1.2 addWaiter(Node.EXCLUSIVE)
addWaiter(Node.EXCLUSIVE)
是將執行緒加入等待佇列的尾部,原始碼如下所示:
private Node addWaiter(Node mode) {
//以給定模式構造結點,mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)
//aquire()方法是獨占模式,因此直接使用Exclusive引數,
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速方式直接放到隊尾,
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失敗則通過enq入隊,
enq(node);
return node;
}
首先,使用模式將當前執行緒構造為一個節點,然后嘗試將該節點放入隊尾,如果成功則回傳,否則呼叫enq(node)
將節點放入隊尾,最侄訓傳當前節點的位置指標,
其中,enq(node)
方法是將節點加入佇列的方法,原始碼如下所示:
private Node enq(final Node node) {
for (;;) { // 無限回圈,確保結點能夠成功入佇列
// 保存尾結點
Node t = tail;
if (t == null) { // 尾結點為空,即還沒被初始化
if (compareAndSetHead(new Node())) // 頭節點為空,并設定頭節點為新生成的結點
tail = head; // 頭節點與尾結點都指向同一個新生結點
} else { // 尾結點不為空,即已經被初始化過
// 將node結點的prev域連接到尾結點
node.prev = t;
if (compareAndSetTail(t, node)) { // 比較結點t是否為尾結點,若是則將尾結點設定為node
// 設定尾結點的next域為node
t.next = node;
return t; // 回傳尾結點
}
}
}
}
3.1.3 acquireQueued(Node node, int arg)
這部分原始碼是將執行緒阻塞在等待佇列中,執行緒處于等待狀態,直到獲取到資源后才回傳,原始碼如下所示:
// sync佇列中的結點在獨占且忽略中斷的模式下獲取(資源)
final boolean acquireQueued(final Node node, int arg) {
// 標志
boolean failed = true;
try {
// 中斷標志
boolean interrupted = false;
for (;;) { // 無限回圈
// 獲取node節點的前驅結點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 前驅為頭節點并且成功獲得鎖
setHead(node); // 設定頭節點
p.next = null; // help GC
failed = false; // 設定標志
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//
//shouldParkAfterFailedAcquire只有當該節點的前驅結點的狀態為SIGNAL時,才可以對該結點所封裝的執行緒進行park操作,否則,將不能進行park操作,
//parkAndCheckInterrupt首先執行park操作,即禁用當前執行緒,然后回傳該執行緒是否已經被中斷
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued(Node node, int arg)
方法的主要邏輯如下:
- 獲取
node
節點的前驅結點,判斷前驅節點是不是頭部節點head,有沒有成功獲取資源, - 如果前驅結點是頭部節點head并且獲取了資源,說明自己應該被喚醒,設定該節點為head節點等待下一個獲得資源;
- 如果前驅節點不是頭部節點或者沒有獲取資源,則判斷是否需要park當前執行緒,
- 判斷前驅節點狀態是不是
SIGNAL
,是的話則park當前節點,否則不執行park操作;
- 判斷前驅節點狀態是不是
park
當前節點之后,當前節點進入等待狀態,等待被其他節點unpark
操作喚醒,然后重復此邏輯步驟,
3.2 release(int)
release(int)
是釋放資源的頂層入口方法,原始碼如下所示:
public final boolean release(int arg) {
if (tryRelease(arg)) { // 釋放成功
// 保存頭節點
Node h = head;
if (h != null && h.waitStatus != 0) // 頭節點不為空并且頭節點狀態不為0
unparkSuccessor(h); //釋放頭節點的后繼結點
return true;
}
return false;
}
release(int)
方法的主要邏輯如下:
- 嘗試釋放資源,如果釋放成功則回傳
true
,否則回傳false
; - 釋放成功之后,需要呼叫
unparkSuccessor(h)
喚醒后繼節點,
下面介紹兩個重要的原始碼函式:tryRelease(int)
和unparkSuccessor(h)
,
3.2.1 tryRelease(int)
tryRelease(int)
是釋放資源的方法,原始碼如下所示:
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
這部分是需要自定義同步器自己實作的,要注意的是回傳值需要為boolean
型別,表示釋放資源是否成功,
3.2.2 unparkSuccessor(h)
unparkSuccessor(h)
是喚醒后繼節點的方法,原始碼如下所示:
private void unparkSuccessor(Node node) {
//這里,node一般為當前執行緒所在的結點,
int ws = node.waitStatus;
if (ws < 0)//置零當前執行緒所在的結點狀態,允許失敗,
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一個需要喚醒的結點s
if (s == null || s.waitStatus > 0) {//如果為慷訓已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 從后向前找,
if (t.waitStatus <= 0)//從這里可以看出,<=0的結點,都是還有效的結點,
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒
}
這部分主要是查找第一個還處于等待狀態的節點,將其喚醒;
查找順序是從后往前找,這是因為CLH佇列中的prev
鏈是強一致的,從后往前找更加安全,而next
鏈因為addWaiter()
方法和cancelAcquire()
方法的存在,不是強一致的,因此從前往后找可能會出現問題,這部分的具體解釋可以參考參考文獻-1
3.3 acquireShared(int)和releaseShared(int)
3.3.1 acquireShared(int)
是使用共享模式獲取共享資源的頂層入口方法,原始碼如下所示:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
流程如下:
- 通過
tryAcquireShared(arg)
嘗試獲取資源,如果獲取成功則直接回傳; - 如果獲取資源失敗,則呼叫
doAcquireShared(arg)
將執行緒阻塞在等待佇列中,直到被unpark()
/interrupt()
并成功獲取到資源才回傳,
其中,tryAcquireShared(arg)
是獲取共享資源的方法,也是需要用戶自己實作,
而doAcquireShared(arg)
是將執行緒阻塞在等待佇列中,直到獲取到資源后才回傳,具體流程和acquireQueued()
方法類似,
原始碼如下所示:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入佇列尾部
boolean failed = true;//是否成功標志
try {
boolean interrupted = false;//等待程序中是否被中斷過的標志
for (;;) {
final Node p = node.predecessor();//前驅
if (p == head) {//如果到head的下一個,因為head是拿到資源的執行緒,此時node被喚醒,很可能是head用完資源來喚醒自己的
int r = tryAcquireShared(arg);//嘗試獲取資源
if (r >= 0) {//成功
setHeadAndPropagate(node, r);//將head指向自己,還有剩余資源可以再喚醒之后的執行緒
p.next = null; // help GC
if (interrupted)//如果等待程序中被打斷過,此時將中斷補上,
selfInterrupt();
failed = false;
return;
}
}
//判斷狀態,尋找安全點,進入waiting狀態,等著被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.3.2 releaseShared(int)
releaseShared(int)
是釋放共享資源的頂層入口方法,原始碼如下所示:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//嘗試釋放資源
doReleaseShared();//喚醒后繼結點
return true;
}
return false;
}
流程如下:
- 使用
tryReleaseShared(arg)
嘗試釋放資源,如果釋放成功則回傳true,否則回傳false; - 如果釋放成功,則呼叫
doReleaseShared()
喚醒后繼節點,
下面介紹一下doReleaseShared()
方法,原始碼如下所示:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//喚醒后繼
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)// head發生變化
break;
}
}
4. 面試問題模擬
Q:AQS是介面嗎?有哪些沒有實作的方法?看過相關原始碼嗎?
AQS定義了一個實作同步類的框架,實作方法主要有tryAquire
和tryRelease
,表示獨占模式的資源獲取和釋放,tryAquireShared
和tryReleaseShared
表示共享模式的資源獲取和釋放,
原始碼分析如上文所述,
參考資料
- Java并發之AQS詳解
- JUC鎖: 鎖核心類AQS詳解
- 從ReentrantLock的實作看AQS的原理及應用
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/556281.html
標籤:其他
下一篇:返回列表