目錄
1:什么是AQS?
2:AQS都有那些用途?
3:我們如何使用AQS
4:AQS的實作原理
5:對AQS的設計與實作的一些思考
1:什么是AQS
? 隨著計算機的算力越來越強大,各種各樣的并行編程模型也隨即踴躍而來,但當我們要在并行計算中使用共享資源的時候,就需要利用一種手段控制對共享資源的訪問和修改來保證我們程式的正確的運行,而Java中除了在語言級別實作的synchronized鎖之外,還有另一種對共享資源訪問控制的實作,而這些實作都依賴于一個叫做抽象佇列同步器(AbstractQueuedSynchronizer)的模板框架類,簡稱AQS,所以我們想要更深刻的理解Java中對共享資源的訪問控制,就不可避免的要對AQS深入的了解和探討,
? 我們首先看一下官方對于AQS的描述:提供一個依賴先進先出(FIFO)等待佇列的掛起鎖和相關同步器(信號量、事件等)框架,此類被設計為大多數型別的同步器的基類,這些同步器依賴于單個原子int值來表示狀態,子類必須定義更改該狀態的受保護方法,以及定義該狀態在獲取或釋放該物件方面的含義,考慮到這些,類中的其他方法實作排隊和掛起機制,子類可以維護狀態欄位,但只有使用方法getState、setState和compareAndSetState方法才可以更改狀態,
2:AQS有哪些用途

? AQS的用途有很多,幾乎Java中所有的共享資源控制的實作都離不開它,比如我們常用的鎖ReentrantLock、是基于AQS實作了一套可重入的公平和非公平互斥鎖的實作,上圖中我列舉了我們常用的一些對于共享資源訪問控制的一些工具,也都是基于AQS實作的,
3:如何使用AQS
? 我們要是用AQS實作我們自己的鎖,都離不開AQS中一個叫做int型別state的變數,而這個變數的具體意義是由使用者自已定義的,比如我們要基于AQS實作一個不可重入的互斥鎖,我們可以定義state為1代表有執行緒已經獲取了鎖,state為0為空閑狀態,
下面是AQS檔案中的一段使用AQS自定義互斥鎖的一段示例代碼,我把它放到此處,方便大家查閱,
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
//Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
4:AQS的實作原理
? AQS實作的核心思想是,如果被請求的共享資源空閑,那么就將當前請求資源的執行緒設定為有效的作業執行緒,將共享資源設定為鎖定狀態;如果共享資源被占用,就需要將此執行緒放入一個叫做CLH(三個人名Craig、Landin and Hagersten)的等待佇列中,然后通過掛起和喚醒機制來保證鎖的分配,而將資源設定為鎖定狀態主要是通過說到的一個叫做int型別的state的變數來控制的,佇列的FIFO操作則是利用CLH佇列來實作,
等待佇列是“CLH”(Craig、Landin和Hagersten)鎖佇列的變體,我們使用它們來作為同步器,在其節點中存盤有關執行緒的一些控制資訊,每個節點中的status欄位跟蹤執行緒是否應該掛起,當節點的前驅節點釋放時,節點會發出信號,佇列的每個節點在其他方面都充當一個特定的通知樣式監視器,其中包含一個等待執行緒,要進入CLH鎖的佇列,可以將其原子性地拼接在佇列的尾部,要退出佇列,只需將其放在佇列頭部,也就是將此節點設定為head欄位,原理圖如下,

4.1:AQS的資料結構
首先我們先看一下AQS中最基本的資料結構,也就是CLH佇列中的節點,是一個名為Node的靜態內部類
static final class Node {
/** 標記此節點是一個以共享模式等待的鎖 */
static final Node SHARED = new Node();
/** 標記此節點是一個以互斥模式等待的鎖 */
static final Node EXCLUSIVE = null;
/** 表示執行緒獲取鎖的執行緒已經取消了*/
static final int CANCELLED = 1;
/** 原文注釋:waitStatus value to indicate successor's thread needs unparking
表示此執行緒的后繼執行緒需要通過park()方法掛起,
我的理解是此執行緒的后繼節點需要被掛起,
但當前節點必須是釋放鎖或者取消獲取鎖,后繼執行緒等待被喚醒獲取鎖,后續可以通過原始碼解釋,
*/
static final int SIGNAL = -1;
/** 表示節點在等待佇列中,節點執行緒等待喚醒,在使用Conditional的時候會有此狀態 */
static final int CONDITION = -2;
/**
* 當前執行緒處在SHARED情況下,該欄位才會使用
*/
static final int PROPAGATE = -3;
/**
這些值以數字形式排列,以簡化使用,非負值表示節點不需要信號,因此,大多數代碼不需要檢查特定的值,僅用檢查符號就可以 了,對于正常同步節點,該欄位初始化為0,并且條件節點的CONDITION,使用CAS進行修改,
*/
volatile int waitStatus;
/**
前驅界定
*/
volatile Node prev;
/**
后繼節點
*/
volatile Node next;
/**
* 此節點的執行緒
*/
volatile Thread thread;
/**
指向下一個處于CONDITION狀態的節點,使用Conditional模式才會用到此節點,
篇幅原因,本片不講述Condition Queue佇列,故對此欄位不多作介紹,
*/
Node nextWaiter;
/**
* 是否是共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 回傳當前節點的前驅節點,前驅節點為null,則拋出NPE
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS中其他重要欄位
/**
* 佇列的頭節點(虛節點,不存盤資料),懶初始化,除了初始化之外,僅能通過setHead方法修改,
* 注:假如頭節點存在,一定會保證waitStatus變數的值不是CANCELLED
*/
private transient volatile Node head;
/**
* 佇列的尾節點(虛節點,不存盤資料),懶初始化,僅僅可以通過enq方法初始化
*/
private transient volatile Node tail;
/**
* 同步狀態
*/
private volatile int state;
由靜態內部類Node中的prev和next欄位我們可以知道CLH變體佇列是一個由Node組成的雙向佇列,由欄位head和tail可以知道AQS中保存了頭節點和尾節點,state欄位則是用來作為同步的重要欄位,AQS中提供了三個final型別的方法來訪問該欄位,
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
4.2:模板方法
/**
嘗試以獨占模式獲取鎖,此方法此方法應查詢物件的狀態是否允許以獨占模式獲取該物件,若允許的話則可以獲取它,
此方法總是由執行acquire方法的執行緒呼叫,如果此方法回傳false且執行緒尚未排隊,則acquire方法可以對執行緒進行排隊,直到某 個其他執行緒發出釋放信號,則該執行緒停止掛起,
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
嘗試以獨占模型釋放鎖
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
嘗試以共享模式獲取鎖,此方法此方法應查詢物件的狀態是否允許以獨占模式獲取該物件,若允許的話則可以獲取它,
此方法總是由執行acquire方法的執行緒呼叫,如果此方法回傳false且執行緒尚未排隊,則acquire方法可以對執行緒進行排隊,直到某 個其他執行緒發出釋放信號,則該執行緒停止掛起,
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
嘗試以共享模式釋放鎖
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
回傳是否以獨占模式持有鎖
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
思考:AQS作為一個模板框架類,為什么 tryLock和tryRelease等方法,那么為什么這些方法不定義成abstract方法,而要定義成普通方法,且在方法中拋出例外呢?我的理解是AQS作為一個模板框架提供了加鎖和釋放鎖的通用邏輯,但是又不僅僅是提供了獨占鎖或共享鎖的邏輯,而是作為一個底層的通用執行模板類,如何定義成抽象的模板方法,那么所有的子類都要實作所有的模板方法,但是有些子類僅僅需要獨占鎖,比如ReentrantLock,那么就會要多先實作共享鎖的邏輯(即使是空方法也要實作),所以我猜想是為了子類不必要實作多余的方法,所以才定義成普通的方法并在方法內部拋出例外,
4.3:獲取鎖原始碼及流程分析
由于篇幅原因,本文將以獨占且忽略中斷的模式的方法未入口分析,首先看一下獲取鎖的方法,獲取鎖的總體流程圖如下:
獲取鎖的入口方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//acquireQueued方法會回傳執行緒在掛起程序中是否被中斷,然后回傳執行緒中斷的狀態
selfInterrupt();
}
可以看到獲取鎖的方法是一個由final修飾的不可被子類重寫的方法,首先呼叫了上面的模板方法(必須由子類重寫獲取邏輯)獲取鎖
如果獲取成功則獲取鎖流程執行結束,否則執行addWaiter方法執行入隊邏輯,
執行緒入隊方法
private Node addWaiter(Node mode) {
//mode執行緒獲取鎖的模式(獨占或者共享)
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速入隊,失敗則執行完整入隊邏輯
Node pred = tail;
if (pred != null) {
node.prev = pred;
//如果尾節點不為null,則以原子方式把當前節點設定為尾節點并回傳
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾節點不存(說明頭節點和尾節點未初始化)在或者由于競爭導致一次設定尾節點失敗,
//則執行完整入隊邏輯(會進行頭節點和尾節點初始化的作業)
enq(node);
return node;
}
addWaiter方法會先進行快速入隊操作,如果快速入隊失敗(由于競爭或者頭、尾節點未初始化),則進行完整入隊操作(如果頭、尾節點未初始化的話會先進行初始化操作)
完整入隊邏輯
private Node enq(final Node node) {
//自旋把當前節點鏈接到尾節點之后
for (;;) {
Node t = tail;
if (t == null) {
//尾節為空,說明佇列為空,則需要進行頭節點和尾節點的初始化
//這里直接new Node(),一個虛節點作為頭節點,然后將頭節點和尾節點相同
//這里說明頭節點和尾節點不存盤資料
if (compareAndSetHead(new Node()))
tail = head;
} else {
//尾節點不為空,使用cas把當前節點設定為尾節點
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq方法會利用自旋先檢查頭節點和尾節點是否初始化,如果未初始化的話則先進行初始化,初始化完成之后以原子的方式插入node到隊尾并且插入成功之后回傳此節點,
掛起執行緒并等待獲取鎖
final boolean acquireQueued(final Node node, int arg) {
//是否失敗,此執行緒被中斷可能失敗
boolean failed = true;
try {
//執行緒是否被中斷
boolean interrupted = false;
//自旋一直獲取鎖
for (;;) {
//獲取當前節點的前驅節點
final Node p = node.predecessor();
//如果當前節點的前驅節點是頭節點(因為頭節點是虛節點,所以當前節點可以獲取鎖),
//且當前節點獲取所成功
if (p == head && tryAcquire(arg)) {
//設定node為頭節點,因為當前節點已經獲取鎖成功了,當前節點需要作為頭節點
setHead(node);
p.next = null; // help GC
//設定失敗標志為false
failed = false;
//回傳中斷狀態
return interrupted;
}
//shouldParkAfterFailedAcquire方法檢查并更新獲取失敗的節點的狀態,如果執行緒應該掛起則回傳true
//parkAndCheckInterrupt則掛起執行緒并回傳是否中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//失敗,則取消獲取鎖
if (failed)
cancelAcquire(node);
}
}
? 通過上面流程分析可知,獲取鎖失敗,會呼叫addWaiter方法把當前節點放到隊尾,那么執行緒入隊之后什么時候掛起執行緒,什么時候出隊,我們一點一點分析acquireQueued方法這些問題就會逐漸顯露出來,
? 首先該方法會一直自旋獲取鎖(中間可能會被掛起,防止無效自旋),判斷當前節點的前驅節點是否是頭節點來判斷當前是否有獲取鎖的資格,如果有的話則設定當前節點為頭節點并回傳中斷狀態,否則呼叫shouldParkAfterFailedAcquire判斷獲取鎖失敗后是否可以掛起,可以的話則呼叫parkAndCheckInterrupt進行執行緒掛起操作,
設定頭節點
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
注:setHead方法是把當前節點置為虛節點,但并沒有修改waitStatus,因為其他地方要用到,
檢查并更新獲取失敗的節點的狀態
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* node的狀態已經是SIGNAL可以安全的掛起,直接回傳true
*/
return true;
if (ws > 0) {
/*
* 由之前的waitStatus變數列舉值可知,waitStatus大于0為取消狀態,直接跳過此節點
*/
do {
//重新鏈接prev指標,至于為什么沒有操作next指標后面會通過代碼解釋
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 原子方式設定waitStatus的值為SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
掛起并檢查執行緒的中斷狀態
private final boolean parkAndCheckInterrupt() {
//使用LockSupport掛起此執行緒
LockSupport.park(this);
//回傳并清除中斷狀態
return Thread.interrupted();
}
取消獲取鎖
private void cancelAcquire(Node node) {
// 忽略不存在的節點
if (node == null)
return;
//設定當前節點不持有執行緒
node.thread = null;
// 跳過取消的前驅節點
Node pred = node.prev;
//waitStatus>0未已取消的節點
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 未取消的節點的后繼節點
Node predNext = pred.next;
//設定狀態未取消狀態
node.waitStatus = Node.CANCELLED;
// 如果node為尾節點,設定pred為尾節點,然后設定尾節點的下一個節點為null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果當前節點不是head的后繼節點,
//1:判斷當前節點前驅節點的是否為SIGNAL,
//2:如果不是,則把前驅節點設定為SINGAL看是否成功
// 如果1和2中有一個為true,再判斷當前節點的執行緒是否為null
// 如果上述條件都滿足,把當前節點的前驅節點的后繼指標指向當前節點的后繼節點
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//為什么沒有自旋,如果此處設定失敗,下次仍然會丟掉predNext到next中間節點,所以不會出現問題
compareAndSetNext(pred, predNext, next);
} else {
//當前節點是頭節點的后繼節點或者上述條件都不滿足
unparkSuccessor(node);
}
//為什么此處能help GC,不得不多說Doug Lea大神心思之縝密,考慮之周全,
//解釋參考http://www.codebaoku.com/question/question-sd-1010000043795300.html
node.next = node; // help GC
}
}
當node==tail時,節點變化情況如下圖

當pred==head時,節點的變化情況如下圖

當pred!=head時,且上述條件滿足時節點的變化情況如下圖

通過上面的流程,我們對于取消獲取鎖的cancelAcquire方法對節點操作狀態的產生和變化已經有了一定的了解,但是為什么所有的變化都是對next指標進行了操作,而沒有對Prev指標進行操作呢?帶著這個問題我們回顧一下shouldParkAfterFailedAcquire方法,
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
原因:執行cancelAcquire的時候,當前節點的前置節點可能已經從佇列中出去了(已經執行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),如果此時修改Prev指標,有可能會導致Prev指向另一個已經移除佇列的Node,因此這塊變化Prev指標不安全, shouldParkAfterFailedAcquire方法中,會執行下面的代碼,其實就是在處理Prev指標,shouldParkAfterFailedAcquire是獲取鎖失敗的情況下才會執行,進入該方法后,說明共享資源已被獲取,當前節點之前的節點都不會出現變化,因此這個時候變更Prev指標比較安全,
喚醒后繼節點
private void unparkSuccessor(Node node) {
/*
* node一般為head節點,如果waitStatus為負,則嘗試清除信號,設定為0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* 正常情況下我們是要喚醒頭節點的后繼節點,但是如果后繼節點為慷訓者已被取消,
* 則需要從尾節點開始,找到離頭節點最近的未被取消的后繼節點,
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果當前節點的下個節點不為空,而且狀態小于等于0,就把當前節點喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
為什么需要從后往前找呢?從美團技術團隊中的一片文章中(https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html)找到了答案,我把大佬的解釋放到下面,供大家參考!
4.4:釋放鎖原始碼及流程分析
釋放鎖流程圖如下:
public final boolean release(int arg) {
//嘗試釋放鎖成功
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
//嘗試釋放鎖失敗
return false;
}
釋放鎖的流程就比較簡單了,先嘗試釋放鎖,如果釋放鎖成功,(如果頭節點不為null且頭節點的狀態不等于0則釋放頭節點的后繼節點)回傳true,否則回傳false,
5:對AQS的設計與實作的一些思考
? 1:設計方面,AQS作為底層一個通用的模板框架類,就要考慮到一些易用性和擴展性,比如作者模板方法使用的拋出例外,而不是作為抽象方法強制使用者實作所有的模板方法,而是使用者可以自由選擇要使用的方法和特性選擇性實作模板方法,當然也犧牲掉了一些其他東西,比如設計原則的最小職責性,這也就體現了一些折衷的思想,任何方案都不是完美的,我們只有權衡利弊之后達到一個相對完美的方案,
? 2:實作方面,AQS的實作不得不令人驚嘆,每一次讀都會想到書讀百遍,其意自現這句名言,每次讀有不一樣的識訓,看似一行不經意的代碼,體現了作者對每一行代碼細致又獨到的思考,在讀AQS代碼的時候參考下面的鏈接,看大牛對AQS的的理解時見解,不僅加深了我對AQS核心思想的理解,也讓我從另一方面看到了AQS的優秀之處(由于個人水平有限,理解不到位或者錯誤還請各位道友不吝賜教),路漫漫其修遠兮,吾將上下而求索,
參考:
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
//help GC 相關的解釋
http://www.codebaoku.com/question/question-sd-1010000043795300.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/556846.html
標籤:其他
上一篇:計算機COM口資料測驗
下一篇:返回列表