新聞中心
java并發(fā)容器J.U.C AQS怎么用,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
成都創(chuàng)新互聯(lián)公司堅持“要么做到,要么別承諾”的工作理念,服務領域包括:成都做網站、網站設計、企業(yè)官網、英文網站、手機端網站、網站推廣等服務,滿足客戶于互聯(lián)網時代的江陰網站設計、移動媒體設計的需求,幫助企業(yè)找到有效的互聯(lián)網解決方案。努力成為您成熟可靠的網絡建設合作伙伴!
AbstractQueueSynchronizer (AQS)
J.U.C 大大提高了java并發(fā)的性能,而AQS則是J.U.C的核心。
AQS底層使用雙向列表(隊列的一種實現(xiàn))。
使用Node實現(xiàn)FIFO隊列,可以用于構建鎖或者其他同步裝置的基礎框架
利用了一個int類型表示狀態(tài)。 在AQS中有一個status的成員變量,基于AQS有一個同步組件ReentrantLock,在這個ReentrantLock中status表示獲取鎖的線程數(shù),例如status=0表示還沒有線程獲取鎖,status=1表示已經有線程獲取了鎖,status>1表示重入鎖的數(shù)量。
使用方法:繼承
子類通過繼承并通過實現(xiàn)它的方法管理其狀態(tài){acquire和release}的方法操縱狀態(tài)
可以同時實現(xiàn)排它鎖和共享鎖模式(獨占,共享)
AQS同步組件
countdownLatch,閉鎖,通過一個計數(shù)來保證線程是否需要一直阻塞。
semaphore,控制同一時間并發(fā)線程的數(shù)量。
cyclicbarrier,與countdownlatch很像,都能阻塞進程。
reentrantlock
condition
futuretask
countdownlatch
是一個同步輔助類,通過他可以實現(xiàn)類似于阻塞當前線程的功能。一個線程或多個線程一直等待,直到其他線程操作完成,countdownlatch用了一個給定的計數(shù)器來進行初始化,該計數(shù)器的操作是原子操作,也就是同時只能有一個線程操作該計數(shù)器。調用該類的await()方法則會一直處于阻塞狀態(tài),直到其他線程調用countdown()方法,每次調用countdown()方法會使得計數(shù)器的值減1,當計數(shù)器的值減為0時,所有因調用await方法處于等待狀態(tài)的線程就會繼續(xù)往下執(zhí)行。這種狀態(tài)只會出現(xiàn)一次,因為這里的計數(shù)器是不能被重置的,如果業(yè)務上需要一個可以重置計數(shù)次數(shù)的版本,可以考慮使用cyclicbarrier。

使用場景
在某些業(yè)務場景中,程序執(zhí)行需要等到某個條件完成后才能繼續(xù)執(zhí)行后續(xù)的操作,典型的應用例如并行計算:當某個處理的運算量很大時,可以將該運算任務拆分多個子任務,等待所有的子任務都完成之后,父任務拿到所有的子任務的運行結果進行匯總。
下面舉例countdownlatch的基本用法:
@Slf4j
public class CountDownLatchExample1 {
private final static int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for(int i = 0; i< threadCount; i++) {
final int threadNum = i;
executorService.execute(() ->{
try {
test(threadNum);
} catch (InterruptedException e) {
log.error("exception", e);
}finally {
countDownLatch.countDown();
}
});
}
//可以保證之前的線程都執(zhí)行完成
countDownLatch.await();
log.info("finish");
executorService.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}", threadNum);
Thread.sleep(100);
}
}一個復雜的場景:我們開了很多個線程去完成一個任務,但是這個任務需要在指定的時間內完成,如果超過一定的時間沒有完成則放棄該任務。
@Slf4j
public class CountDownLatchExample2 {
private final static int threadCount = 200;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for(int i = 0; i< threadCount; i++) {
final int threadNum = i;
executorService.execute(() ->{
try {
test(threadNum);
} catch (InterruptedException e) {
log.error("exception", e);
}finally {
countDownLatch.countDown();
}
});
}
//可以保證之前的線程都執(zhí)行完成
countDownLatch.await(10, TimeUnit.MILLISECONDS);
log.info("finish");
// 第一時間內并不會把所有線程都銷毀,而是讓當前已有線程執(zhí)行完之后在把線程池銷毀。
executorService.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
Thread.sleep(100);
log.info("{}", threadNum);
}
}semaphore 信號量
可以控制某個資源可被同時訪問的個數(shù),與countdownlatch有些類似,提供了兩個核心方法:aquire和release。aquire表示獲取一個許可,如果沒有則等待,release表示操作完成后釋放一個許可。semaphore維護了當前訪問的個數(shù),提供同步機制控制訪問的個數(shù)。
使用場景
常用于僅能提供有限訪問的資源例如數(shù)據庫連接數(shù)是有限的,而上層應用的并發(fā)數(shù)會遠遠大于連接數(shù),如果同時對數(shù)據庫進行操作可能出現(xiàn)因為無法獲取數(shù)據庫連接而導致異常。這時可以通過信號量semaphore來并發(fā)訪問控制。當semaphore把并發(fā)數(shù)控制到1時就跟單線程運行很相似了。
舉例如下:
@Slf4j
public class SemaphoreExample1 {
private final static int threadCount = 20;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
//允許的并發(fā)數(shù)
final Semaphore semaphore = new Semaphore(3);
for(int i = 0; i< threadCount; i++) {
final int threadNum = i;
executorService.execute(() ->{
try {
// 獲取一個許可
semaphore.acquire();
test(threadNum);
// 釋放一個許可
semaphore.release();
} catch (InterruptedException e) {
log.error("exception", e);
}
});
}
log.info("finish");
executorService.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}運行結果可以看到同時3個線程在執(zhí)行。
也可以獲得多個許可:
@Slf4j
public class SemaphoreExample2 {
private final static int threadCount = 20;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
//允許的并發(fā)數(shù)
final Semaphore semaphore = new Semaphore(3);
for(int i = 0; i< threadCount; i++) {
final int threadNum = i;
executorService.execute(() ->{
try {
// 獲取多個許可
semaphore.acquire(3);
test(threadNum);
// 釋放多個許可
semaphore.release(3);
} catch (InterruptedException e) {
log.error("exception", e);
}
});
}
log.info("finish");
executorService.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}每一次獲取三個許可,而同時只允許3個并發(fā)數(shù),相當于單線程在運行。
@Slf4j
public class SemaphoreExample3 {
private final static int threadCount = 20;
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
//允許的并發(fā)數(shù)
final Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
executorService.execute(() -> {
try {
// 嘗試獲取一個許可
if (semaphore.tryAcquire()) {
test(threadNum);
// 釋放一個許可
semaphore.release();
}
} catch (InterruptedException e) {
log.error("exception", e);
}
});
}
log.info("finish");
executorService.shutdown();
}
private static void test(int threadNum) throws InterruptedException {
log.info("{}", threadNum);
Thread.sleep(1000);
}
}輸出結果:
15:24:21.098 [pool-1-thread-1] INFO com.vincent.example.aqs.SemaphoreExample3 - 0 15:24:21.098 [pool-1-thread-2] INFO com.vincent.example.aqs.SemaphoreExample3 - 1 15:24:21.098 [main] INFO com.vincent.example.aqs.SemaphoreExample3 - finish 15:24:21.098 [pool-1-thread-3] INFO com.vincent.example.aqs.SemaphoreExample3 - 2
因為我們往線程池中放了二十個請求,二十個請求在同一時間內都會嘗試去執(zhí)行,semaphore會嘗試讓每個線程去獲取許可,而同一時刻內我們的并發(fā)數(shù)是3,也就是只有三個線程獲取到了許可,而test方法內有Thread.sleep(1000),因此其余17個線程都不能拿到許可,直接結束。
semaphore.tryAcquire(3, TimeUnit.SECONDS)
表示可以等3秒,如果3秒內沒拿到許可就結束。
CyclicBarrier

也是一個同步輔助類,允許一組線程相互等待,直到到達某個公共的屏障點。可以完成多個線程之間相互等待,只有當每個線程都準備就緒后,才能各自繼續(xù)往下執(zhí)行謀面的操作。它和countdownlatch有相似的地方,都是通過計數(shù)器來實現(xiàn)的,當一個線程調用await()方法后,該線程就進入了等待狀態(tài)。當循環(huán)計數(shù)器的值達到設置的初始值之后,進入等待狀態(tài)的線程會被喚醒,繼續(xù)執(zhí)行后續(xù)操作。因為CyclicBarrier在釋放等待線程后可以重用,所以稱他為循環(huán)屏障。
CyclicBarrier的使用場景與countdownlatch類似,CyclicBarrier可以用于多線程計算數(shù)據,最后合并計算結果的應用場景。
CyclicBarrier與Countdownlatch的區(qū)別:
countdownlatch的計數(shù)器只能使用一次,CyclicBarrier可以使用reset方法重復使用
countdownlatch主要是實現(xiàn)一個或n個線程需要等待其他線程完成某項操作之后,才能繼續(xù)往下執(zhí)行,他描述的是1個或n個線程等待其他線程的關系。而CyclicBarrier主要實現(xiàn)了多個線程之間相互等待,直到所有線程都滿足了條件之后才能繼續(xù)執(zhí)行后續(xù)的操作,它描述的是各個線程內部相互等待的關系。所以CyclicBarrier可以處理更復雜的業(yè)務場景,例如計數(shù)器發(fā)生錯誤可以重置計數(shù)器,讓線程重新執(zhí)行一次。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對創(chuàng)新互聯(lián)的支持。
網頁標題:java并發(fā)容器J.U.CAQS怎么用
轉載源于:http://fisionsoft.com.cn/article/iijgjs.html


咨詢
建站咨詢
