成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

Java concurrent 源碼學(xué)習(xí)筆記2 - 鎖

vpants / 1345人閱讀

摘要:同步器的實(shí)現(xiàn)根據(jù)其狀態(tài)是否獨(dú)占而有所不同。這個(gè)框架為同步狀態(tài)的原子性管理線程的阻塞和解除阻塞以及排隊(duì)提供了一種通用的機(jī)制。在需要使用同步器的目標(biāo)類中,繼承了的子類要求被聲明為目標(biāo)類的非公有內(nèi)部類。類通過(guò)一組方法實(shí)現(xiàn)線程的阻塞和解除阻塞。

java.util.concurrent.locks包主要是提供線程通信的鎖,下面看一下包中有哪些類。

Unsafe

concurrent包里的很多方法都是基于sun.misc.Unsafe這個(gè)類,Unsafe這個(gè)類從名字上可以看出是一個(gè)不安全的類,JDK也并沒(méi)有把這個(gè)類開(kāi)放給用戶使用(但是我們可以通過(guò)一些比較hack的方式使用到這個(gè)類)。Unsafe是一個(gè)單例的類,通過(guò)靜態(tài)的getUnsafe()方法獲取到他的實(shí)例,可以看到,在方法中會(huì)判斷調(diào)用Unsafe.getUnsafe()方法的類的類加載器是不是引導(dǎo)類加載器BootstrapClassLoader,一般我們開(kāi)發(fā)的代碼所屬的類加載器會(huì)是AppClassLoader及其子類,所以此時(shí)會(huì)拋出SecurityException,告訴我們unsafe,不要用??!

@CallerSensitive
public static Unsafe getUnsafe() {
    Class var0 = Reflection.getCallerClass();
    if(!VM.isSystemDomainLoader(var0.getClassLoader())) {
        throw new SecurityException("Unsafe");
    } else {
        return theUnsafe;
    }
}

Unsafe類在JDK源碼中經(jīng)常用到,主要作用是任意內(nèi)存地址位置處讀寫(xiě)數(shù)據(jù),以及CAS操作。它的大部分操作都是通過(guò)JNI(Java Native Interface)完成的,因此它所分配的內(nèi)存需要手動(dòng)free,所以是非常危險(xiǎn)的。

Java并發(fā)中主要用到是的Unsafe中的Compare And Swap操作,CAS 操作包含三個(gè)操作數(shù) —— 內(nèi)存位置(offset)、預(yù)期原值(A)和新值(B)。如果內(nèi)存位置的值與預(yù)期原值相匹配,那么處理器會(huì)自動(dòng)將該位置值更新為新值。否則,處理器不做任何操作。無(wú)論哪種情況,它都會(huì)在 CAS 指令之前返回該位置的值。CAS 有效地說(shuō)明了“我認(rèn)為位置 V 應(yīng)該包含值 A;如果包含該值,則將 B 放到這個(gè)位置;否則,不要更改該位置,只告訴我這個(gè)位置現(xiàn)在的值即可。”

// 獲取類的某個(gè)字段在類的實(shí)例中內(nèi)存位置的偏移量
public native long objectFieldOffset(Field var1);
/*
* 下面三個(gè)方法是類似的,對(duì)var1對(duì)象的偏移量是var2的字段進(jìn)行CAS操作
* 預(yù)期值是var4,如果該字段當(dāng)前值是var4,則更新為var5,否則什么都不做
*/
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
LockSupport

LockSupport是j.u.c包中并發(fā)控制的基礎(chǔ),它的底層是依賴于Unsafe實(shí)現(xiàn)的。LockSupport提供了Thread.suspend()和Thread.resume()的替代方案,因?yàn)閟uspend和resume是不安全的,所以已經(jīng)被標(biāo)記為deprecated。suspend()和resume()必須要成對(duì)出現(xiàn),否則非常容易發(fā)生死鎖。
因?yàn)閟uspend方法并不會(huì)釋放鎖,如果使用suspend的目標(biāo)線程對(duì)一個(gè)重要的系統(tǒng)資源持有鎖,那么沒(méi)任何線程可以使用這個(gè)資源直到要suspend的目標(biāo)線程被resumed,如果一個(gè)線程在resume目標(biāo)線程之前嘗試持有這個(gè)重要的系統(tǒng)資源鎖再去resume目標(biāo)線程,這兩條線程就相互死鎖了。

public class LockSupport {
    private LockSupport() {} // Cannot be instantiated.
    public static void unpark(Thread thread);
    public static void park(Object blocker);
    public static void parkNanos(Object blocker, long nanos);
    public static void parkUntil(Object blocker, long deadline);
    public static void park();
    public static void parkNanos(long nanos);
    public static void parkUntil(long deadline);
}

LockSupport中主要用到park和unpark方法,park阻塞當(dāng)前線程,unpark解除指定線程的阻塞。而且unpark可以在park之前執(zhí)行,比Thread的wait/notify更加靈活。
LockSupport中有個(gè)叫做permit(許可)的概念,unpark方法有兩種情況:

如果入?yún)⒌木€程是阻塞的,那么解除該線程的阻塞

否則給該線程一個(gè)permit,確保該線程下一次執(zhí)行park的時(shí)候不被阻塞,直接返回。

相應(yīng)的,park也分為兩種情況:

如果一個(gè)線程有許可的話,那么它在調(diào)用park方法時(shí)就會(huì)收回它那個(gè)許可,但是不會(huì)被阻塞,而是直接返回。但是當(dāng)它再次調(diào)用park方法時(shí),因?yàn)樵S可已經(jīng)被用掉了,于是又成了第2種情況。

如果一個(gè)線程沒(méi)有許可,那么它在調(diào)用park方法時(shí)就會(huì)被阻塞,直到以下事件之一發(fā)生才會(huì)解除阻塞。

有其它線程調(diào)用unpark方法給它發(fā)許可

其他線程調(diào)用了當(dāng)前線程的interrupt方法

阻塞過(guò)時(shí)(調(diào)用parkNanos(long nanos)阻塞指定時(shí)間長(zhǎng)度或調(diào)用parkUntil(long deadline)阻塞直到指定的時(shí)間戳)

虛假喚醒(Spurious wakeup)

需要注意的一點(diǎn)是,一個(gè)線程一個(gè)時(shí)刻最多只能有一個(gè)許可,即使你多次調(diào)用unpark方法它也只能有一個(gè)許可.

The three forms of park each also support a blocker object parameter. This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. (Such tools may access blockers using method getBlocker(Thread).) The use of these forms rather than the original forms without this parameter is strongly encouraged. The normal argument to supply as a blocker within a lock implementation is this.

park, parkUntil, parkNanos這3個(gè)方法都分別對(duì)應(yīng)有一個(gè)帶Object blocker參數(shù)的方法,表示把線程阻塞在這個(gè)對(duì)象上,類似于synchronized()中的鎖對(duì)象,以允許監(jiān)視工具和診斷工具確定線程受阻塞的原因。Java官方建議使用帶blocker參數(shù)的park方法,并用this關(guān)鍵字作為blocker參數(shù)。

AbstractOwnableSynchronizer

可以由線程以獨(dú)占方式擁有的同步器。此類為創(chuàng)建鎖和相關(guān)同步器(伴隨著所有權(quán)的概念)提供了基礎(chǔ)。AbstractOwnableSynchronizer 類本身不管理或使用此信息。但是,子類和工具可以使用適當(dāng)維護(hù)的值幫助控制和監(jiān)視訪問(wèn)以及提供診斷。

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 3737899427754241961L;
    
    protected AbstractOwnableSynchronizer() { }
    
    private transient Thread exclusiveOwnerThread;
    
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

這是一個(gè)抽象類,在j.u.c包中它有2個(gè)子類:AbstractQueuedSynchronizerAbstractQueuedLongSynchronizer。同步器的實(shí)現(xiàn)根據(jù)其狀態(tài)是否獨(dú)占而有所不同。獨(dú)占狀態(tài)的同步器,在同一時(shí)間只有一個(gè)線程可以通過(guò)阻塞點(diǎn),而共享狀態(tài)的同步器可以同時(shí)有多個(gè)線程在執(zhí)行。一般鎖的實(shí)現(xiàn)類往往只維護(hù)獨(dú)占狀態(tài),但是,例如計(jì)數(shù)信號(hào)量在數(shù)量許可的情況下,允許多個(gè)線程同時(shí)執(zhí)行。為了使框架能得到廣泛應(yīng)用,這兩種模式都要支持。

AbstractQueuedSynchronizer

在JDK1.5之前,線程同步是通過(guò)synchronized關(guān)鍵字實(shí)現(xiàn)的,
從JDK1.5開(kāi)始提供的java.util.concurrent包中,大部分的同步器(例如鎖,屏障等等)都是基于AbstractQueuedSynchronizer類(下稱AQS類)而構(gòu)建的。這個(gè)框架為同步狀態(tài)的原子性管理、線程的阻塞和解除阻塞以及排隊(duì)提供了一種通用的機(jī)制。

線程同步涉及兩個(gè)操作,對(duì)臨界資源的競(jìng)爭(zhēng)和釋放。在j.u.c包中,這兩個(gè)操作的設(shè)計(jì)思想是:

acquire

while (synchronization state does not allow acquire) {
enqueue current thread if not already queued;
possibly block current thread;
}
dequeue current thread if it was queued;

release

update synchronization state;
if (state may permit a blocked thread to acquire)
unblock one or more queued threads;

為了實(shí)現(xiàn)上述操作,需要下面三個(gè)基本組件的相互協(xié)作:

同步狀態(tài)的原子性管理;

線程的阻塞與解除阻塞;

隊(duì)列的管理;

AQS類的一般用法是繼承,在子類中定義管理同步狀態(tài)的方法,并且定義這個(gè)AQS實(shí)現(xiàn)類在acquire和release操作時(shí)同步狀態(tài)變化對(duì)應(yīng)的含義。AQS類負(fù)責(zé)管理線程的阻塞和線程隊(duì)列。在需要使用同步器的目標(biāo)類中,繼承了AQS的子類要求被聲明為目標(biāo)類的非公有內(nèi)部類。例如下圖j.u.c包中,在需要使用AQS控制線程同步時(shí),都是在類中聲明一個(gè)內(nèi)部類并繼承AQS。

AQS類支持共享和排他兩種模式,排他模式下,只能有一個(gè)線程acquire,共享模式下可以多個(gè)線程同時(shí)acquire。

1. 同步狀態(tài)
AQS類使用單個(gè)int(32位)來(lái)保存同步狀態(tài),并暴露出getState、setState以及compareAndSetState操作來(lái)讀取和更新這個(gè)狀態(tài)。compareAndSetState僅當(dāng)同步狀態(tài)擁有一個(gè)期望值的時(shí)候,才會(huì)被原子地設(shè)置成新值。

private volatile int state;

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

將同步狀態(tài)限制為一個(gè)32位的整型是出于實(shí)踐上的考量。雖然JSR166也提供了64位long字段的原子性操作,但這些操作在很多平臺(tái)上還是使用內(nèi)部鎖的方式來(lái)模擬實(shí)現(xiàn)的,這會(huì)使同步器的性能可能不會(huì)很理想。JDK1.6中新增的java.util.concurrent.locks.AbstractQueuedLongSynchronizer類,就是使用long變量維護(hù)同步狀態(tài)的一個(gè)AbstractOwnableSynchronizer版本。目前來(lái)說(shuō),32位的狀態(tài)對(duì)大多數(shù)應(yīng)用程序都是足夠的。在j.u.c包中,只有一個(gè)同步器類可能需要多于32位來(lái)維持狀態(tài),那就是CyclicBarrier類,所以它用了鎖(該包中大多數(shù)更高層次的工具亦是如此)。

基于AQS的具體實(shí)現(xiàn)類必須根據(jù)暴露出的狀態(tài)相關(guān)的方法定義tryAcquire和tryRelease方法,以控制acquire和release操作。當(dāng)同步狀態(tài)滿足時(shí),tryAcquire方法必須返回true,而當(dāng)新的同步狀態(tài)允許后續(xù)acquire時(shí),tryRelease方法也必須返回true。這些方法都接受一個(gè)int類型的參數(shù)用于傳遞想要的狀態(tài)。例如:可重入鎖中,當(dāng)某個(gè)線程從條件等待中返回,然后重新獲取鎖時(shí),為了重新建立循環(huán)計(jì)數(shù)的場(chǎng)景。很多同步器并不需要這樣一個(gè)參數(shù),因此忽略它即可。
2. 隊(duì)列
整個(gè)框架的關(guān)鍵就是如何管理被阻塞的線程的隊(duì)列,該隊(duì)列是嚴(yán)格的FIFO隊(duì)列,因此,框架不支持基于優(yōu)先級(jí)的同步。

Node

隊(duì)列中的元素Node(AQS的內(nèi)部類)就是保存著線程引用和線程狀態(tài)的容器,每個(gè)線程對(duì)同步器的訪問(wèn),都可以看做是隊(duì)列中的一個(gè)節(jié)點(diǎn)。Node的主要包含以下成員變量:

static final class Node {
    volatile int waitStatus;
    volatile Node prev; // 前驅(qū)節(jié)點(diǎn)
    volatile Node next; // 后繼節(jié)點(diǎn)
    volatile Thread thread; // 入隊(duì)列時(shí)的當(dāng)前線程
    Node nextWaiter; // 存儲(chǔ)condition隊(duì)列中的后繼節(jié)點(diǎn)
    /* waitStatus */
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    /* 標(biāo)識(shí)節(jié)點(diǎn)的等待是共享模式或排他模式 */
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
}

waitStatus的含義:

CANCELLED,值為1,表示當(dāng)前的線程因超時(shí)或中斷被取消;
SIGNAL,值為-1,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程處于阻塞狀態(tài),當(dāng)前節(jié)點(diǎn)線程釋放時(shí)需要對(duì)后繼進(jìn)行unpark;
CONDITION,值為-2,表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中;
PROPAGATE,值為-3,表示在同步器在共享模式下,當(dāng)前節(jié)點(diǎn)釋放后傳播到其他節(jié)點(diǎn);
值為0,表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖

enq節(jié)點(diǎn)入隊(duì),如果隊(duì)列為空則先初始化隊(duì)列,創(chuàng)建一個(gè)空節(jié)點(diǎn)作為頭節(jié)點(diǎn)。

private transient volatile Node head; // 隊(duì)列頭節(jié)點(diǎn)
private transient volatile Node tail; // 隊(duì)列尾節(jié)點(diǎn)
/* 入隊(duì) */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 延遲初始化,隊(duì)列為空時(shí)創(chuàng)建一個(gè)空Node,head和tail都指向這個(gè)Node
            if (compareAndSetHead(new Node()))
            tail = head;
        } else { // 死循環(huán)CAS操作,把新節(jié)點(diǎn)和隊(duì)列當(dāng)前尾節(jié)點(diǎn)做雙向綁定
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter先判斷tail如果不為空則進(jìn)行一次快速的插入,否則使用enq進(jìn)行可能包括隊(duì)列初始化的入隊(duì)操作。

/*
* 把當(dāng)前線程用Node包裝起來(lái)并入隊(duì)
* mode有兩種情況: Node.EXCLUSIVE/Node.SHARED
* this.nextWaiter = mode;
*/
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

3. 阻塞
AQS可以根據(jù)具體的場(chǎng)景提供exclusive模式和shared模式,在exclusive模式下,同一時(shí)刻最多只能有一個(gè)線程能夠處于成功獲取的狀態(tài),排他鎖是一個(gè)exclusive模式的例子,shared模式則可以多個(gè)線程一起獲取成功,如多個(gè)許可的Semaphore。

AQS類通過(guò)一組aquire/release方法實(shí)現(xiàn)線程的阻塞和解除阻塞。在共享模式和獨(dú)占模式下,又有所區(qū)別。
子類需要去實(shí)現(xiàn)以下方法:

/* 獨(dú)占模式 */
protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
/* 共享模式 */
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
獨(dú)占模式下的acquire

首先嘗試一次tryAcquire, 如果不成功則添加一個(gè)Node節(jié)點(diǎn)到等待隊(duì)列反復(fù)重試。

public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

當(dāng)前線程包裝為node對(duì)象加入隊(duì)尾,acquireQueued則在循環(huán)中判斷node的前驅(qū)節(jié)點(diǎn)是不是head,如果是則繼續(xù)嘗試tryAcquire,如果acquire成功則說(shuō)明成功通過(guò)了acquire,則將自己設(shè)置為新的head。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        /* 死循環(huán)中不斷重試acquire */
        for (;;) {
            final Node p = node.predecessor();
            /* 嘗試acquire,成功則把自己設(shè)為隊(duì)列head節(jié)點(diǎn) */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /* acquire失敗后判斷是否park阻塞,還是要繼續(xù)重試acquire */
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

/* pred是node的前驅(qū)節(jié)點(diǎn),此方法用于判斷node節(jié)點(diǎn)acquire失敗后是否park阻塞 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
        * 前驅(qū)節(jié)點(diǎn)狀態(tài)是SIGNAL,release時(shí)會(huì)signal喚醒node
        * 所以node在acquire失敗時(shí)應(yīng)當(dāng)繼續(xù)park阻塞
        */
        return true;
    if (ws > 0) {
        /*
        * 前驅(qū)節(jié)點(diǎn)pred狀態(tài)是CANCELLED
        * 向前遍歷隊(duì)列,直到找到狀態(tài)不是CANCELLED的節(jié)點(diǎn)
        * 把這個(gè)節(jié)點(diǎn)和node設(shè)置為前驅(qū)后繼關(guān)系
        */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
        * 前驅(qū)節(jié)點(diǎn)的狀態(tài)是0或PROPAGATE
        * 前驅(qū)節(jié)點(diǎn)狀態(tài)更新為SIGNAL,release時(shí)喚醒node節(jié)點(diǎn)
        * node節(jié)點(diǎn)則不需要park,繼續(xù)嘗試acquire
        */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
/* 當(dāng)前線程park,并返回中斷狀態(tài) */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
共享模式下的acquire

子類實(shí)現(xiàn)tryAcquireShared(arg), 調(diào)用tryAcquireShared返回值小于0說(shuō)明獲取失敗,等于0表示獲取成功,但是接下來(lái)的acquireShared不會(huì)成功,大于0說(shuō)明tryAcquireShared獲取成功并且接下來(lái)的acquireShared也可能成功。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

實(shí)現(xiàn)共享模式acquire的主要邏輯在下邊的doAcquireShared方法中,把當(dāng)前線程封裝為Node加入隊(duì)列,向前遍歷隊(duì)列,直到當(dāng)前節(jié)點(diǎn)的前驅(qū)是頭節(jié)點(diǎn),然后嘗試tryAcquireShared,tryAcquireShared成功后(結(jié)果>=0),調(diào)用setHeadAndPropagate。

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireShared中tryAcquireShared返回值大于0,head為null或head的waitStatus小于0,滿足以上條件情況下,判斷當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)若為null或是共享類型,調(diào)用doReleaseShared喚醒后繼節(jié)點(diǎn)以確保共享沿隊(duì)列繼續(xù)傳播。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
    * Try to signal next queued node if:
    * Propagation was indicated by caller,
    * or was recorded (as h.waitStatus either before
    * or after setHead) by a previous operation
    * (note: this uses sign-check of waitStatus because
    * PROPAGATE status may transition to SIGNAL.)
    * and
    * The next node is waiting in shared mode,
    * or we don"t know, because it appears null
    *
    * The conservatism in both of these checks may cause
    * unnecessary wake-ups, but only when there are multiple
    * racing acquires/releases, so most need signals now or soon
    * anyway.
    */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
獨(dú)占模式下的release

如果tryRelease返回了true,說(shuō)明可以喚醒其他線程,則判斷head不為null并且waitStatus不為0的情況下去unpark后繼節(jié)點(diǎn)。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

unparkSuccessor中當(dāng)node的后繼節(jié)點(diǎn)為null或waitStatus > 0說(shuō)明
next已經(jīng)取消。此時(shí)需要從tail向前遍歷找到離node最近的沒(méi)有取消的節(jié)點(diǎn)進(jìn)行unpark。如果node的后繼節(jié)點(diǎn)s不是null而且waitStatus < 0則unpark節(jié)點(diǎn)s。

private void unparkSuccessor(Node node) {
    /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling. It is OK if this
    * fails or if status is changed by waiting thread.
    */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    
    /*
    * Thread to unpark is held in successor, which is normally
    * just the next node. But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
共享模式下的release

tryReleaseShared返回true,調(diào)用doReleaseShared,允許一個(gè)等待的節(jié)點(diǎn) acquire成功。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

和獨(dú)占模式的release只unpark一個(gè)后繼節(jié)點(diǎn)不同的是,共享模式下 喚醒行為需要向后傳播。doReleaseShared會(huì)從head開(kāi)始往后檢查狀態(tài),如果節(jié)點(diǎn)是SIGNAL狀態(tài),就喚醒它的后繼節(jié)點(diǎn)。如果是0就標(biāo)記為PROPAGATE, 等它釋放鎖的時(shí)候會(huì)再次喚醒后繼節(jié)點(diǎn)。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://hztianpu.com/yun/77178.html

相關(guān)文章

  • Java concurrent 源碼學(xué)習(xí)筆記1 - 概覽

    摘要:源碼學(xué)習(xí)筆記基于包源碼大致分為以下幾組對(duì)包集合框架的擴(kuò)展更好的支持多線程并發(fā)操作線程池相關(guān)鎖基本數(shù)據(jù)類型的原子性封裝 Java concurrent 源碼學(xué)習(xí)筆記基于JDK1.8 concurrent包源碼大致分為以下幾組: 對(duì)util包集合框架的擴(kuò)展(更好的支持多線程并發(fā)操作) 線程池相關(guān) 鎖 基本數(shù)據(jù)類型的原子性封裝 showImg(https://segmentfault.c...

    CocoaChina 評(píng)論0 收藏0
  • 并發(fā)學(xué)習(xí)筆記(2)

    摘要:當(dāng)其他線程調(diào)用時(shí),它們被阻塞,直到第一個(gè)線程釋放鎖對(duì)象。包關(guān)于獲取這個(gè)鎖如果鎖同時(shí)被另一個(gè)線程擁有則發(fā)生阻塞。所以必須確保沒(méi)有其他線程再檢查余額和轉(zhuǎn)賬活動(dòng)之間修改金額。方法添加一個(gè)線程到等待集中,方法解除等待線程的阻塞狀態(tài)。 避免代碼塊受到并發(fā)訪問(wèn)的干擾 java語(yǔ)言提供了兩種機(jī)制實(shí)現(xiàn)這種功能 Synchonized 關(guān)鍵字(調(diào)用對(duì)象內(nèi)部的鎖) synchronized關(guān)鍵字自動(dòng)...

    saucxs 評(píng)論0 收藏0
  • 源碼筆記 Java AbstractQueuedSynchronizer

    摘要:總結(jié)總的來(lái)說(shuō),操作順序是進(jìn)入隊(duì)列喚醒,成功獲得鎖將狀態(tài)變?yōu)椴⑵鋸霓D(zhuǎn)到使再次獲得鎖執(zhí)行余下代碼。當(dāng)然這是理由狀態(tài)下,為了討論及的原理,實(shí)際的操作時(shí)序也有可能變化。 AQS Condition 最近面試被問(wèn)到j(luò)ava concurrent包下有哪些熟悉的,用過(guò)的工具。因此來(lái)回顧一下,這些工具的底層實(shí)現(xiàn),AbstractQueuedSynchronizer。在網(wǎng)上看到了其他人的一些技術(shù)博客...

    selfimpr 評(píng)論0 收藏0
  • 源碼筆記 Java AbstractQueuedSynchronizer

    摘要:總結(jié)總的來(lái)說(shuō),操作順序是進(jìn)入隊(duì)列喚醒,成功獲得鎖將狀態(tài)變?yōu)椴⑵鋸霓D(zhuǎn)到使再次獲得鎖執(zhí)行余下代碼。當(dāng)然這是理由狀態(tài)下,為了討論及的原理,實(shí)際的操作時(shí)序也有可能變化。 AQS Condition 最近面試被問(wèn)到j(luò)ava concurrent包下有哪些熟悉的,用過(guò)的工具。因此來(lái)回顧一下,這些工具的底層實(shí)現(xiàn),AbstractQueuedSynchronizer。在網(wǎng)上看到了其他人的一些技術(shù)博客...

    YuboonaZhang 評(píng)論0 收藏0
  • 高并發(fā)

    摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢?jiàn)它的使用,在開(kāi)始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽(tīng)名字多牛逼,來(lái),我們一步一步來(lái)?yè)羝魄皟蓚€(gè)名詞,今天我們首先來(lái)說(shuō)說(shuō)分布式。 探究...

    supernavy 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<