主頁 > 後端開發 > 從ReentrantLock角度決議AQS

從ReentrantLock角度決議AQS

2023-04-14 09:51:26 後端開發

是它,是它,就是它,并發包的基石;

一、概述

閑來不卷,隨便聊一點,

一般情況下,大家系統中至少也是JDK8了,那想必對于JDK5加入的一系列功能并不陌生吧,那時候重點加入了java.util.concurrent并發包,我們簡稱為JUC,JUC下提供了很多并發編程實用的工具類,比如并發鎖lock、原子操作atomic、執行緒池操作Executor等等,下面,我對JUC做了整理,大致分為下面幾點:

基于JDK8,今天重點來聊下JUC并發包下的一個類,AbstractQueuedSynchronizer

首先,淺顯的從名字上看,抽象的佇列同步器;實際上,這名字也跟它的作用如出一轍,抽象,即需要被繼承;佇列同步器,其內部維護了一個佇列,供執行緒入隊等待;最終實作多個執行緒訪問共享資源的功能,

二、原始碼決議

進入AbstractQueuedSynchronizer內部,需要掌握三個重要的屬性:

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;
  • head:標記等待佇列頭部節點,
  • tail:標記等待佇列尾部節點,
  • state:執行緒的鎖定狀態;state=0,表示資源未被上鎖;state>0,表示資源被上鎖

我們除錯AQS的原始碼,必須尋找一個原始碼除錯的切入點,我這里用我們并發編程常用的Lock鎖作為除錯AQS的切入點,因為這是解決執行緒安全問題常用的手段之一,

2.1、原始碼的切入點

AQS的原始碼除錯,從Lock介面出發,JDK原始碼定義如下:

public interface Lock {

    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    Condition newCondition();
}

從原始碼中看到,Lock是一個介面,所以該介面會有一些實作類,其中有一個實作類ReentrantLock,可重入鎖,想必大家都不會陌生,

2.2、ReentrantLock的lock方法

通過跟蹤原始碼可以看到,ReentrantLock#lock內部實作貌似比較簡單,只有簡短的一行代碼

public void lock() {
    sync.lock();
}

其實內部是維護了一個Sync的抽象類,呼叫的是Sync的lock()方法,

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    abstract void lock();

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }
    // ...
}

可以看到,Sync也是個抽象類,它有兩個實作類:NonfairSyncFairSync,這里其實就引出了我們今天的主角,AbstractQueuedSynchronizerSync繼承了它,

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

下面我整理了這一系列類的UML圖

通過類圖可知,lock()方法最終呼叫的是ReentrantLock類下,內部類NonfairSyncFairSync的lock方法;對于這兩個類,前者叫非公平鎖,后者叫公平鎖,通過ReentrantLock的構造器可知,默認使用NonfairSync類,

public ReentrantLock() {
    sync = new NonfairSync();
}

NonfairSync類的lock方法出發,引出第一個AQS下的方法compareAndSetState,

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

從compareAndSetState方法的命名可以發現,就是比較并交換的意思,典型的CAS無鎖機制,

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

我們可以觀察到,這里其實呼叫的是Unsafe類的compareAndSwapInt方法,傳入的expect為0,update為1;意思是如果當前值為0,那我就把值最終更新為1,

Unsafe這個類下面,發現好多方法都是用native這個關鍵詞進行修飾的(也包括compareAndSwapInt方法),用native關鍵詞修飾的方法,表示原生的方法;原生方法的實作并不是Java語言,最終實作是C/C++;這并不是本文的討論范圍,

回到AQS的compareAndSetState方法,回傳值是boolean型別,true表示值更新為1成功,false表示不成功,這里出現兩個分支,成功,走setExclusiveOwnerThread方法;不成功,走acquire方法,咱優先討論acquire方法,

2.3、AQS的acquire方法

先來看一下該方法的原始碼;

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這里的核心是兩個方法,tryAcquire方法和acquireQueued方法,首先會呼叫tryAcquire()方法,看方法命名是嘗試獲取;實際上這個方法確實在就在做一件事“嘗試獲取資源”,

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

不過AQS中的這個方法是protected修飾,并沒有去實作,僅僅只是預留了方法入口,后期需要由其子類去實作;這里的子類就是上文中的NonfairSync類,該類的原始碼在上文中已經貼出,這段原始碼其實運用了我們常見的一個設計模式,“模板方法模式”,

2.4、NonfairSync的tryAcquire方法

NonfairSync的tryAcquire方法原始碼如下

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

這里并沒有直接去實作tryAcquire方法,而是呼叫了Sync類下的nonfairTryAcquire方法,

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

這里有個getState方法,最侄訓傳的是AQS中的state欄位,這個欄位就是多個執行緒搶占的共享資源,所以這個欄位很重要volatile關鍵字修飾,保證記憶體的可見性,int型別,對于ReentrantLock鎖而言,當state=0時,表示無鎖,當state>0時,表示資源已被執行緒鎖定,

下面分析下這段代碼:

  • 如果state=0表示無鎖,通過cas去更新state的值,這里更新為1,
  • 將持有鎖的執行緒更新為當前執行緒,
  • 如果上述cas未更新成功,或者state!=0,表示已上鎖,
  • 繼續判斷下持有鎖的執行緒如果是當前執行緒,state欄位做疊加,這里表示ReentrantLock的含義,表示可重入鎖,
  • 最后,state!=0,持有鎖的執行緒也不是當前執行緒,表示不能對資源加鎖,回傳false,

tryAcquire方法的判斷至此結束,不過最終的走向需要看它的回傳值;回傳true,表示當前執行緒搶占到鎖,或者當前執行緒就是搶占鎖的執行緒,直接重入,加鎖流程結束;回傳false,表示沒有搶占到鎖,流程繼續,這里就引出下個話題,CLH執行緒等待佇列,

2.5、AQS的addWaiter方法

2.5.1、CLH佇列

首先咱來看一段原始碼中的注釋

The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks

大致意思是:CLH佇列是由Craig、Landin、Hagersten這三位老哥名字的首字母疊加在一起命名的,它是一個等待佇列,它是一個變種佇列,用到了自旋,

這里的資訊要抓住三點:等待佇列、變種佇列、自旋,

2.5.2、Node類

在決議addWaiter方法實作之前,就不得不提到一個內部類Node;addWaiter方法的入參是這個型別,所以先來看看這個類,原始碼如下:

static final class Node {
    
    static final Node SHARED = new Node();
    
    static final Node EXCLUSIVE = null;
   
    static final int CANCELLED =  1;
    
    static final int SIGNAL    = -1;

    static final int CONDITION = -2;
 
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

這里先大致介紹下,每個屬性的意思:

  • SHARED:型別就是Node,表示共享模式,
  • EXCLUSIVE:型別也是Node,表示獨占模式,這里的ReentrantLock就是獨占模式,
  • waitStatus:int型別,當前Node節點下,存盤的執行緒狀態,
  • CANCELLED:int型別,等于1,waitStatus屬性的值之一,表示節點被取消狀態,
  • SIGNAL:int型別,等于-1,waitStatus屬性的值之一,表示當前節點需要去喚醒下一個節點,
  • CONDITION:int型別,等于-2,waitStatus屬性的值之一,表示節點處于等待狀態,
  • PROPAGATE:int型別,等于-2,waitStatus屬性的值之一,表示下一個被獲取的物件應該要無條件傳播,該值僅在共享模式下使用,
  • prev:Node型別,指向佇列中當前節點的前一個節點,
  • next:Node型別,指向佇列中當前節點的下一個節點,
  • thread:存盤當前執行緒資訊,
  • nextWaiter:用來存盤節點的指標,不過會出現兩種情況;等待佇列中,會將該屬性的值設定成SHARED或者EXCLUSIVE,用來區分當前節點處于共享模式還是獨享模式;條件佇列中,用于存放下一個節點的指標,所以當是條件佇列的情況下,這個佇列是單向佇列,
  • isShared():回傳是否屬于共享模式,true表示共享模式,false表示獨享模式,
  • predecessor():獲取當前節點的前一個節點,

另外,Node類還有兩個有參構造器:
從作者的注釋就能看出來,第一個構造器是在等待佇列的時,創建節點使用,第二個構造器是在條件佇列時,創建節點使用,

2.5.3、方法決議

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

其實這段方法是在創建Node物件,Node物件就是組成CLH佇列的基礎元素,

  • 創建一個Node物件,mode引數由上述的acquire()方法傳遞而來,可以看到傳入Node.EXCLUSIVE,表示獨占模式,
  • 判斷隊尾有指向節點,剛創建的節點放入佇列的隊尾,并且通過cas將隊尾指標改成當前創建節點,最后回傳當前創建節點,
  • 如果隊尾沒有指向節點,呼叫enq方法,做佇列的初始化操作,
  • 這里出現了第一個自旋,enq方法是無限回圈的,就像作者注釋的一樣,Must initialize,必須初始化,
  • 這里先是重新new了一個新的node(也可以叫空節點),標記它為佇列頭,
  • 隨后再將addWaiter方法中創建的node,加入到佇列尾,

總結下addWaiter方法干的事情:

  1. 創建一個節點,存盤當前執行緒,并標記獨占模式,
  2. 判斷佇列是否為空,不為空,通過cas將存盤當前執行緒的node節點加入到對尾,并且對該節點做對尾標記,
  3. 佇列為空,通過自旋,做初始化操作,
  4. 初始化過后的佇列,佇列頭是一個空節點,佇列尾是存盤當前執行緒的節點,

2.6、AQS的acquireQueued方法

還是先來看下這個方法的原始碼;

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private void cancelAcquire(Node node) {
    if (node == null)
        return;

    node.thread = null;

    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    node.waitStatus = Node.CANCELLED;

    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

從這個方法看到,又是運用了無限回圈,需要分兩個步驟去觀察:1.當前方法中的判斷,自己的上一個節點是否是頭部節點(頭部節點就是占用資源的節點);2.當前節點正式入佇列,并且被掛起,

2.6.1、acquireQueued方法中的判斷

當前節點的前一個節點是佇列頭部,意味著當前節點的前一個節點,就是持有資源的節點;當資源被釋放,當前節點會去嘗試爭奪鎖資源;如果拿到鎖資源,當前節點會被標記為佇列頭部節點,它的上個節點(老的頭部節點)會被置為null,需要被GC及時清除,所以作者在這里添加了一個注釋:help GC;下圖就是描述了這個流程:

2.6.2、shouldParkAfterFailedAcquire方法實作

如果當前節點的上一個節點,并不是頭部節點;這里就需要用到上述Node類中介紹的各種狀態欄位了;先來重點介紹下Node類中的兩個狀態屬性:

  • CANCELLED:int型別,等于1,waitStatus屬性的值之一,表示節點被取消
  • SIGNAL:int型別,等于-1,waitStatus屬性的值之一,表示當前節點需要去喚醒下一個節點

進入的shouldParkAfterFailedAcquire這個方法內部,該方法接受兩個引數:當前節點前一個節點和當前節點,首先,獲取上一個節點的waitStatus屬性,然后通過這個屬性做如下判斷:

  1. 如果狀態是SIGNAL(即等于-1),直接回傳true,后續就會交給parkAndCheckInterrupt方法去將當前執行緒掛起,
  2. 如果不是SIGNAL,對于當前ReentrantLock而言,ws>0的操作是滿足的,所以下面的步驟就是當前節點一直往前尋找,跳過已被標記狀態為CANCELLED的節點,直到找到狀態是SIGNAL的節點,將該節點作為當前節點的上一個節點,也印證了SIGNAL狀態的解釋:當前節點的上一個節點是SIGNAL,那么當前節點需要掛起,等待被喚醒,最后進入下個回圈,直到上個節點狀態是SIGNAL,執行上面的第一步,回傳true,

這里可以想象成一個排隊去食堂打飯的場景,你在低頭玩手機前,跟你前面的同學說,我玩會手機,快到了叫我一下;結果你前面的同學嫌隊伍長走了(CANCELLED狀態),所以你只能繼續找他的上一個同學;直到有同學回答你,好的(該同學被標記SIGNAL狀態);然后你就低頭玩手機,等待回答你“好的”的那個同學叫你,

  1. 最后compareAndSetWaitStatus方法其實不用看也知道,通過cas機制,將當前節點的上一個節點的waitStatus修改成SIGNAL狀態,這樣的話,當前節點才能被掛起,等待喚醒,

再來看下parkAndCheckInterrupt這個方法

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

// LockSupport#park
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

其中最終又是這個Unsafe類,通過它的原生方法park,去掛起當前執行緒,這里就不展開贅述了,

2.7、資源上鎖總結

下面整理下從lock方法作為切入點,一系列的呼叫:

2.8、ReentrantLock的unlock方法

之前一直在講資源“上鎖”,那么這個方法就是給資源解鎖,這里給出重要的部分原始碼

// AQS中
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// AQS中
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

// ReentrantLock中
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

2.9、ReentrantLock的tryRelease方法

在呼叫unlock方法去解鎖后,最終是呼叫AQS中的release方法去實作這個解鎖功能的;在該方法中,首先會呼叫ReentrantLock中的tryRelease方法,去做state狀態值的遞減操作,

  1. 首先,獲取state值(在AQS中有這個公共屬性,上文提到過),這里是對當前state值減去1,
  2. 再判斷當前解鎖的執行緒與持有鎖的執行緒是不是同一個,不是的話,直接拋例外,所以t執行緒占用鎖,只有t執行緒才能解鎖,解鈴還須系鈴人,
  3. 最后判斷做完遞減的值是不是等于0,如果為0,將持有鎖的執行緒清空,更新state欄位為遞減值(這里是0),最后回傳true,代表鎖已經被釋放了,
  4. 如果不是0,更新state欄位為遞減值(不是0),也不會清空持有鎖的執行緒,意味著資源還是被執行緒加鎖中,最后回傳false,

2.10、AQS的release方法

在tryRelease方法回傳false的時候,release方法并不會做任何操作,直接就結束了,意味著解鎖并沒有完成;
但是在回傳true的時候,具體分以下幾部操作:

  1. 拿到CLH佇列被標記頭部的節點,
  2. 判斷不是空(佇列不能是空的),并且頭部節點的等待狀態不是0,在這種情況下,它只能是-1(SIGNAL),所以是需要去喚醒下個節點的,
  3. 最后,呼叫AQS中的unparkSuccessor方法,去喚醒執行緒,

2.11、AQS的unparkSuccessor方法

上面說到了,這個方法主要是用來喚醒執行緒的,下面還是做一下具體的決議:

  1. 該方法傳參是一個Node節點,這里傳入的是被標記佇列頭的節點(頭部節點是持有鎖資源的節點),
  2. 拿到頭部節點的waitStatus狀態屬性,并且判斷小于0的情況下(該情況是waitStatus=-1),通過cas機制將頭部節點的狀態改為0,初始化狀態,
  3. 拿到頭部節點的下個節點,也就是真正意義上處于等待中的第一個節點,
  4. 它還是先判斷了這個拿到的節點是否為null,或者狀態大于0(亦或說判斷狀態等于1);如果條件成立,說明頭節點的下個節點是空,或者下個節點被取消了,
  5. 如果第四個判斷條件滿足,從隊尾一直從后往前找,找到離頭節點最近的那個節點,
  6. 通過Unsafe類的unpark原生方法去喚醒上述找到的,距離頭部節點最近的未處于取消狀態下的節點,

2.12、資源解鎖總結

通過上面的描述可以發現,資源解鎖是相對簡單的;它只能被上鎖的執行緒去解鎖;通過遞減AQS內部維護的state屬性值,直到state減為0,表示資源已被解鎖;當資源被解鎖后,需要通過Unsafe的unpark方法,去喚醒CLH佇列中,被掛起的第一個節點上的執行緒,

2.13、公平鎖與非公平鎖的差異

在2.2中說過,當我們使用無參構造器創建一把“鎖”的時候,默認是使用NonfairSync這個內部類,也就是非公平鎖;但是在原始碼中發現ReentrantLock還存在一個有參構造器,引數是一個boolean型別;

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

很明顯,這種方式就是將選擇權交給開發人員,當我們傳入true時,就會創建一把“公平鎖”,還是一樣,先來看下公平鎖的內部;

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

從原始碼的角度,咱來看下,為什么一個叫“非公平鎖”,另一個叫“公平鎖”?

其實不難發現,NonfairSync內部的lock方法,它是一上來就通過cas機制去搶占state公共資源,搶不到才去執行acquire方法實作后續入佇列等一系列的操作;而這里FairSync的lock方法,它是直接執行acquire方法,執行后續的操作,等于非公平鎖,會去多爭取一次資源,對于在CLH佇列中等待的執行緒,是“不公平”的,

除了lock方法存在差異之外,在tryAcquire方法中,也存在著不同,FairSync類中,會多執行hasQueuedPredecessors方法,它是AQS下的一個公用方法,下面具體看下這個方法;

public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

只有簡短的幾行,卻有很多種可能性,但是整個方法主要功能就是判斷當前執行緒是否需要入佇列:回傳false,佇列為空,不對等待;回傳true,佇列不是空,去排隊等待,下面需要重點講下這一行代碼:return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());

2.13.1、hasQueuedPredecessors回傳false

回傳false,情況也有兩種:1、h != t** **是false,2、h != t是true,并且 (s = h.next) == null 是false, s.thread != Thread.currentThread()是false,

第一種情況比較簡單,意思是頭結點和尾節點是同一個,說明佇列是空的,不需要排隊等待,所以直接回傳false,

第二種情況,頭尾不是同一個節點,頭部節點的下個節點也不是空,并且頭部節點的下一個節點就是當前執行緒,
其實就可以理解為,前面的資源剛釋放,正好輪到當前執行緒來搶占資源,這種情況相對較少,

2.13.2、hasQueuedPredecessors回傳true

回傳true,有兩種情況:1、h != t是true,并且 (s = h.next) == null 是true,2、h != t是true,并且 (s = h.next) == null 是false, s.thread != Thread.currentThread()是true,

1、這里的頭尾不是同一個節點是必要滿足的條件,保證了佇列起碼不是空的,然后(s = h.next) == null 滿足是true,這里解釋起來就必須回顧下enq初始化佇列這個方法,

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

從這個方法可知,先是將節點的prev指向前一個節點,然后再通過cas修改尾部標識,最后再將前一個節點的next指向當前節點;因此AQS,入隊操作是非原子性的

繼續回到判斷本身,頭部節點拿到鎖在執行;中間節點沒拿到鎖在入隊;此時頭部節點執行完后釋放鎖,當前節點嘗試不入隊拿鎖,但是中間執行緒已經在排隊了,但是還沒來得及執行t.next = node的操作,導致(s = h.next) == null 滿足,所以當前節點必須入隊,最侄訓傳true,

2、滿足s.thread != Thread.currentThread()的情況,執行到這里,可以明確佇列首先不是空,并且h.next != null,也就是頭節點之后還有其他節點,最后再判斷了下,s.thread != Thread.currentThread為true,也就是頭節點的下個節點并不是當前節點,既然如此,那只能乖乖去佇列中排隊了,所以最侄訓傳true,

三、業務運用

想必大家對于并發鎖并不陌生了,上文我也是通過ReentrantLock這個并發鎖為入口,一步步來決議AQS中的實作,所以這里就不用ReentrantLock舉例,這里換一個同步工具:CountDownLatch,它也是基于AQS來實作的,

CountDownLatch是通過一個計數器來實作的,初始值為執行緒的數量,每當一個執行緒完成了自己的任務,計數器的值就相應得減1,當計數器到達0時,表示所有的執行緒都已執行完畢,然后在等待的執行緒就可以恢復執行任務,

這個其實跟ReentrantLock思路差不多,一個是state初始值就是0,通過“上鎖”一步步疊加這個值;一個是state讓使用者自己設定初始值,通過執行緒呼叫,一步步遞減這個值,

CountDownLatch具體的運用情況如下:1、一個主執行緒中,需要開啟多個子執行緒,并且要在多個子執行緒執行完畢后,主執行緒才能繼續往下執行,2、通過多個執行緒一起執行,提高執行的效率,

下面,通過一個真實的業務場景,來進一步了解下CountDownLatch這個同步工具,具體是怎么使用的,

現在有這么一個介面,查詢用戶的詳情資訊;用戶資訊由五部分組成:1、用戶基本資訊;2、用戶影像資訊;3、用戶工商資訊;4、用戶賬戶資訊;5、用戶組織架構資訊;按照原本的邏輯是按照順序1-5這樣一步步查詢,最后組裝用戶VO物件,介面回傳,但是這里可以用上CountDownLatch這個工具類,申請五個執行緒,分別去查詢這五種資訊,提高介面效率,

/**
 * @author 往事如風
 * @version 1.0
 * @date 2023/4/11 18:10
 * @description:匯出報表
 */
@RestController
public class QueryController {

    @GetMapping("/query")
    public Result download() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // 模擬查詢資料
        List<String> row1 = CollUtil.newArrayList("aa", "bb", "cc", "dd");
        List<String> row2 = CollUtil.newArrayList("aa1", "bb1", "cc1", "dd1");
        List<String> row3 = CollUtil.newArrayList("aa2", "bb2", "cc2", "dd2");
        List<String> row4 = CollUtil.newArrayList("aa3", "bb3", "cc3", "dd3");
        List<String> row5 = CollUtil.newArrayList("aa4", "bb4", "cc4", "dd4");
        CountDownLatch count = new CountDownLatch(5);
        DataQuery d = new DataQuery();
        // 開始時間
        long start = System.currentTimeMillis();
        System.out.println("開始查詢資料,,,,");
        executorService.execute(() -> {
            System.out.println("查詢用戶基本資訊,,,,,,");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            d.setBaseInfo(row1);
            count.countDown();
        });
        executorService.execute(() -> {
            System.out.println("查詢用戶影像資訊,,,,,,");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            d.setImgInfo(row2);
            count.countDown();
        });
        executorService.execute(() -> {
            System.out.println("查詢用戶工商資訊,,,,,,");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            d.setBusinessInfo(row3);
            count.countDown();
        });
        executorService.execute(() -> {
            System.out.println("查詢用戶賬戶資訊,,,,,,");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            d.setAccountInfo(row4);
            count.countDown();
        });
        executorService.execute(() -> {
            System.out.println("查詢用戶組織架構資訊,,,,,,");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            d.setOrgInfo(row5);
            count.countDown();
        });
        // 阻塞:直到count的值減為0
        count.await();
        executorService.shutdown();
        // 結束時間
        long end = System.currentTimeMillis();
        System.out.println("查詢結束,,,,,");
        System.out.println("用時時間:" + (end - start));
        return Result.success(d);
    }

    @Data
    class DataQuery {
        private List<String> baseInfo;
        private List<String> imgInfo;
        private List<String> businessInfo;
        private List<String> accountInfo;
        private List<String> orgInfo;
    }
}

/*
控制臺輸出:
開始查詢資料,,,,
查詢用戶基本資訊,,,,,,
查詢用戶影像資訊,,,,,,
查詢用戶工商資訊,,,,,,
查詢用戶賬戶資訊,,,,,,
查詢用戶組織架構資訊,,,,,,
查詢結束,,,,,
用時時間:1017
*/

這段代碼做了模擬查詢各種用戶資訊的操作,其中每個執行緒都暫停1秒,代表在查詢這五種資料;最終列印的用時時間是1017ms,說明這五個執行緒是同時進行的,大大提高了介面的效率,

四、寫在最后

AQS提供了一個FIFO佇列,這里稱為CLH佇列,可以看成是一個用來實作同步鎖以及其他涉及到同步功能的核心組件,常見的有:ReentrantLockCountDownLatchSemaphore等,

AQS是一個抽象類,主要是通過繼承的方式來使用,它本身沒有實作任何的同步介面,僅僅是定義了同步狀態的獲取以及釋放的方法來提供自定義的同步組件,

可以這么說,只要搞懂了AQS,那么J.U.C中絕大部分的api都能輕松掌握,

本文主要提供了從ReentrantLock出發,決議了AQS中的各種公用的方法,如果需要知道其他類中怎么去使用AQS中的方法,其實也只需要找到切入點,一步步除錯下去即可,不過,我想很多地方都是和ReentrantLock中一致的,

Gitee主頁: https://gitee.com/cicadasmile/butte-java-note

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/550079.html

標籤:其他

上一篇:一文吃透Elasticsearch

下一篇:Spring Boot 介面加解密,新姿勢來了!

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more