并發工具類
通常我們所說的并發包也就是java.util.concurrent(JUC),集中了Java并發的各種工具類, 合理地使用它們能幫忙我們快速地完成功能 ,
- 作者: 博學谷狂野架構師
- GitHub:GitHub地址 (有我精心準備的130本電子書PDF)
只分享干貨、不吹水,讓我們一起加油!??
1. CountDownLatch
CountDownLatch是一個同步計數器,初始化的時候 傳入需要計數的執行緒等待數,可以是需要等待執行完成的執行緒數,或者大于 ,一般稱為發令槍,\
? countdownlatch 是一個同步類工具,不涉及鎖定,當count的值為零時當前執行緒繼續運行,不涉及同步,只涉及執行緒通信的時候,使用它較為合適
1.1 作用
用來協調多個執行緒之間的同步,或者說起到執行緒之間的通信(而不是用作互斥的作用),是一組執行緒等待其他的執行緒完成作業以后在執行,相當于加強版join,
注意:這是一個一次性操作 - 計數無法重置, 如果你需要一個重置的版本計數,考慮使用CyclicBarrier,
1.2 舉例
? 我們去組團游玩一樣,總共30個人,來的人要等待還沒有到的人,一直等到第30個人到了,我們才開始出發,在等待程序中,其他人(執行緒)是等待狀態不做任何事情的,一直等所有人(執行緒)到齊了(準備完成)才開始執行,
1.3 概念
- countDownLatch這個類使一個執行緒等待其他執行緒各自執行完畢后再執行,
- 是通過一個計數器來實作的,計數器的初始值是執行緒的數量,每當一個執行緒執行完畢后,計數器的值就-1,當計數器的值為0時,表示所有執行緒都執行完畢,然后在閉鎖上等待的執行緒就可以恢復作業了,
我們打開CountDownLatch的源代碼分析,我們發現最重要的方法就是一下這兩個方法:
//阻塞當前執行緒,等待其他執行緒執行完成,直到計數器計數值減到0,
public void await() throws InterruptedException;
//阻塞當前執行緒指定的時間,如果達到時間就放行,等待其他執行緒執行完成,直到計數器計數值減到0,
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
//負責計數器的減一,
public void countDown():
1.4 應用場景
1.4.1 多執行緒壓測
有時我們想同時啟動多個執行緒,實作最大程度的并行性,
? 例如,我們想測驗一個單例類,如果我們創建一個初始計數為1的CountDownLatch,并讓所有執行緒都在這個鎖上等待,那么我們可以很輕松地完成測驗,我們只需呼叫 一次countDown()方法就可以讓所有的等待執行緒同時恢復執行,
1.4.2 等待其他執行緒
? 例如應用程式啟動類要確保在處理用戶請求前,所有N個外部系統已經啟動和運行了,例如處理excel中多個表單,如果一個一個出來很耗IO和性能,我們可以等100或者1000個執行緒都完成了表單的操作后一下子寫進excel表單中,
注意:一個執行緒不一定只能做countDown一次,也可以countDown多次
1.5 示例
1.5.1 準備完成后執行
在實際專案中可能有些執行緒需要資源準備完成后才能進行執行,這個時候就可以使用countDownLatch
package chapter02.countdownlatch;
import java.util.Random;
import java.util.concurrent.*;
/**
* countdownlatch 示例
*/
public class CountDownLatchTest {
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
private static Random random = new Random();
public static void execute(CountDownLatch countDownLatch) {
//獲取一個亂數
long sleepTime = random.nextInt(10);
//獲取執行緒ID
long threadId = Thread.currentThread().getId();
System.out.println("執行緒ID" + threadId + ",開始執行--countDown");
try {
//睡眠隨機秒
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//計數器減1
countDownLatch.countDown();
System.out.println("執行緒ID" + threadId + ",準備任務完成耗時:" + sleepTime + "當前時間" + System.currentTimeMillis());
try {
//執行緒等待其他任務完成后喚醒
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執行緒ID" + threadId + ",開始執行任務,當前時間:" + System.currentTimeMillis());
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
executorService.submit(() -> {
execute(countDownLatch);
});
}
//執行緒等待其他任務完成后喚醒
countDownLatch.await();
Thread.sleep(1000);
executorService.shutdown();
System.out.println("全部任務執行完成");
}
}
1.5.2 多執行緒壓測
在實戰專案中,我們除了使用 jemter 等工具進行壓測外,還可以自己動手使用 CountDownLatch 類撰寫壓測代碼,
? 可以說 jemter 的并發壓測背后也是使用的 CountDownLatch,可見掌握 CountDownLatch 類的使用是有多么的重要, CountDownLatch是Java多執行緒同步器的四大金剛之一,CountDownLatch能夠使一個執行緒等待其他執行緒完成各自的作業后再執行,
package chapter02.countdownlatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* countDownLatch 壓測
*/
public class CountDownLatchPressure {
/**
* 壓測業務代碼
*/
public void testLoad() {
System.out.println("壓測:" + Thread.currentThread().getId() + ":" + System.currentTimeMillis());
}
/**
* 壓測啟動
* 主執行緒負責壓測執行緒準備作業
* 壓測執行緒準備完成后 呼叫 start.countDown(); 啟動執行緒執行
* @throws InterruptedException
*/
private void latchTest() throws InterruptedException {
//壓測執行緒數
int testThreads = 300;
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(testThreads);
//創建執行緒池
ExecutorService exce = Executors.newFixedThreadPool(testThreads);
//準備執行緒準備
for (int i = 0; i < testThreads; i++) {
//添加到執行緒池
exce.submit(() -> {
try {
//啟動后等待 喚醒
start.await();
testLoad();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//壓測完成
end.countDown();
}
});
}
//連接池執行緒初始化完成 開始壓測
start.countDown();
//壓測完成后結束
end.await();
//關閉執行緒池
exce.shutdown();
}
public static void main(String[] args) throws InterruptedException {
CountDownLatchPressure countDownLatchPressure = new CountDownLatchPressure();
//開始壓測
countDownLatchPressure.latchTest();
}
}
2. CyclicBarrier
2.1 簡介
CyclicBarrier,是JDK1.5的java.util.concurrent(JUC)并發包中提供的一個并發工具類
C yclicBarrier可以使一定數量的執行緒反復地在柵欄位置處匯集,當執行緒到達柵欄位置時將呼叫await方法,這個方法將阻塞直到所有執行緒都到達柵欄位置,如果所有執行緒都到達柵欄位置,那么柵欄將打開,此時所有的執行緒都將被釋放,而柵欄將被重置以便下次使用,
2.2 舉例
就像生活中我們會約朋友們到某個餐廳一起吃飯,有些朋友可能會早到,有些朋友可能會晚到,但是這個餐廳規定必須等到所有人到齊之后才會讓我們進去,
? 這里的朋友們就是各個執行緒,餐廳就是CyclicBarrier,感覺和 CountDownLatch是一樣的,但是他們是有區別的,吃完飯之后可以選擇去玩一會,去處理任務,然后等待第二次聚餐,重復回圈,
2.3 功能
CyclicBarrier和CountDownLatch是非常類似的,CyclicBarrier核心的概念是在于設定一個等待執行緒的數量邊界,到達了此邊界之后進行執行,
? CyclicBarrier類是一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點(Common Barrier Point),
? CyclicBarrier類是一種同步機制,它能夠對處理一些演算法的執行緒實作同,換句話講,它就是一個所有執行緒必須等待的一個柵欄,直到所有執行緒都到達這里,然后所有執行緒才可以繼續做其他事情,
? 通過呼叫CyclicBarrier物件的await()方法,兩個執行緒可以實作互相等待,一旦N個執行緒在等待CyclicBarrier達成,所有執行緒將被釋放掉去繼續執行,
2.4 構造方法
我們可以看下 CyclicBarrier原始碼的構造方法
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
2.4.1 引數介紹
-
parties : 是參與執行緒的個數 , 其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier我已經到達了屏障,然后當前執行緒被阻塞,
-
barrierAction : 優先執行執行緒 ,用于在執行緒到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景,一般用于資料的整理以及匯總,例如excel插入一樣,等所有執行緒都插入完了,到達了屏障后,barrierAction執行緒開始進行保存操作,完成后,接下來由其他執行緒開始進行插入,然后到達屏障接著就是保存,不斷回圈,
CyclicBarrier可以用于多執行緒計算資料,最后合并計算結果的場景,
2.5 重要方法
我們上面介紹了構造方法,下面我們介紹下CyclicBarrier中重要的方法
//阻塞當前執行緒,等待其他執行緒執行完成,
public int await() throws InterruptedException, BrokenBarrierException
//阻塞當前執行緒指定的時間,如果達到時間就放行,等待其他執行緒執行完成,
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
- 執行緒呼叫 await() 表示自己已經到達柵欄
- BrokenBarrierException 表示柵欄已經被破壞,破壞的原因可能是其中一個執行緒 await() 時被中斷或者超時
2.6 基本使用
一個執行緒組的執行緒需要等待所有執行緒完成任務后再繼續執行下一次任務
package chapter02.cyclicbarrier;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
private static Random random = new Random();
/**
* 執行任務
*
* @param barrier
*/
public static void execute(CyclicBarrier barrier) {
//獲取一個亂數
long sleepTime = random.nextInt(10);
//獲取執行緒id
long threadId = Thread.currentThread().getId();
try {
//睡眠隨機秒
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執行緒ID" + threadId + ",準備任務完成耗時:" + sleepTime + "當前時間" + System.currentTimeMillis());
//執行緒等待其他任務完成后喚醒
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("執行緒ID" + threadId + ",開始執行任務,當前時間:" + System.currentTimeMillis());
}
public static void main(String[] args) {
//初始化執行緒數量
int threadNum = 5;
//初始化一般的執行緒
CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("整理任務開始..."));
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
for (int i = 0; i < threadNum; i++) {
executor.submit(() -> {
execute(barrier);
});
}
}
}
2.7 CyclicBarrier 與 CountDownLatch 區別
- CountDownLatch 是一次性的,CyclicBarrier 是可回圈利用的
- CountDownLatch.await一般阻塞作業執行緒,所有的進行預備作業的執行緒執行countDown,而CyclicBarrier通過作業執行緒呼叫await從而自行阻塞,直到所有作業執行緒達到指定屏障,再大家一起往下走,
- CountDownLatch 參與的執行緒的職責是不一樣的,有的在倒計時,有的在等待倒計時結束,CyclicBarrier 參與的執行緒職責是一樣的,
- 在控制多個執行緒同時運行上,CountDownLatch可以不限執行緒數量,而CyclicBarrier是固定執行緒數,
- CyclicBarrier還可以提供一個barrierAction,合并多執行緒計算結果,
3. Semaphore
3.1 簡介
Semaphore也叫信號量,在JDK1.5被引入,可以用來控制同時訪問特定資源的執行緒數量,通過協調各個執行緒,以保證合理的使用資源,
Semaphore內部維護了一組虛擬的許可,許可的數量可以通過建構式的引數指定,
- 訪問特定資源前,必須使用acquire方法獲得許可,如果許可數量為0,該執行緒則一直阻塞,直到有可用許可,
- 訪問資源后,使用release釋放許可,
? Semaphore是一種在多執行緒環境下使用的設施,該設施負責協調各個執行緒,以保證它們能夠正確、合理的使用公共資源的設施,也是作業系統中用于控制行程同步互斥的量,Semaphore是一種計數信號量,用于管理一組資源,內部是基于AQS的共享模式,它相當于給執行緒規定一個量從而控制允許活動的執行緒數,
? 可以用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源
3.2 舉例
? 這里面令牌就像停車位一樣,來了十輛車,停車位只有三個,只有三輛車能夠進行,只有等其他車開走后,其他車才能開進去,和鎖的不一樣的地方是,鎖一次只能進入一輛車,但是Semaphore允許一次進入很多車,這個令牌是可以調整的,隨時可以增減令牌,
3.3 應用場景
Semaphore 是 synchronized 的加強版,作用是控制執行緒的并發數量,就這一點而言,單純的synchronized 關鍵字是實作不了的,
? Semaphore可以用于做流量控制,特別是公用資源有限的應用場景,比如資料庫連接,假如有一個需求,要讀取幾萬個檔案的資料,因為都是IO密集型任務,我們可以啟動幾十個執行緒并發地讀取,但是如果讀到記憶體后,還需要存盤到資料庫中,而資料庫的連接數只有10個,這時我們必須控制只有10個執行緒同時獲取資料庫連接保存資料,否則會報錯無法獲取資料庫連接,這個時候,就可以使用Semaphore來做流量控制
3.4 作業原理
以一個停車場是運作為例,為了簡單起見,假設停車場只有三個車位,一開始三個車位都是空的,
? 這時如果同時來了五輛車,看門人允許其中三輛不受阻礙的進入,然后放下車攔,剩下的車則必須在入口等待,此后來的車也都不得不在入口處等待,
? 這時,有一輛車離開停車場,看門人得知后,打開車攔,放入一輛,如果又離開兩輛,則又可以放入兩輛,如此往復,
? 這個停車系統中,每輛車就好比一個執行緒,看門人就好比一個信號量,看門人限制了可以活動的執行緒,假如里面依然是三個車位,但是看門人改變了規則,要求每次只能停兩輛車,那么一開始進入兩輛車,后面得等到有車離開才能有車進入,但是得保證最多停兩輛車,
? 對于Semaphore類而言,就如同一個看門人,限制了可活動的執行緒數,
3.5 構造方法
創建具有給定許可數的計數信號量并設定為非公平信號量
查看Semaphore原始碼發現他有這兩個構造方法
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
3.5.1 引數介紹
-
permits 是設定同時允許通過的執行緒數
-
fair 等于true時,創建具有給定許可數的計數信號量并設定為公平信號量,
3.6 其他方法
Semaphore類里面還有一些重要的方法
//從此信號量獲取一個許可前執行緒將一直阻塞,相當于一輛車占了一個車位
public void acquire() throws InterruptedException
//從此信號量獲取給定數目許可,在提供這些許可前一直將執行緒阻塞,比如n=2,就相當于一輛車占了兩個車位,
public void acquire(int permits) throws InterruptedException
//釋放一個許可,將其回傳給信號量,就如同車開走回傳一個車位,
public void release()
//獲取當前可用許可數
public void release(int permits)
//獲取當前可用許可數
public int availablePermits()
3.7 示例代碼
共有5個車位但是有100個執行緒進行占用,車停幾秒后會離開,釋放車位給其他執行緒,
package chapter02.semaphore;
import java.util.Random;
import java.util.concurrent.*;
public class SemaphoreTest {
private static ExecutorService executorService = Executors.newCachedThreadPool();
private static Random random = new Random();
//阻塞佇列
private static BlockingQueue<String> parks = new LinkedBlockingQueue<>(5);
public static void execute(Semaphore semaphore) {
//獲取一個亂數
long sleepTime = random.nextInt(10);
long threadId = Thread.currentThread().getId();
String park = null;
try {
/**
* 獲取許可,首先判斷semaphore內部的數字是否大于0,如果大于0,
* 才能獲得許可,然后將初始值5減去1,執行緒才會接著去執行;如果沒有
* 獲得許可(原因是因為已經有5個執行緒獲得到許可,semaphore內部的數字為0),
* 執行緒會阻塞直到已經獲得到許可的執行緒,呼叫release()方法,釋放掉許可,
* 也就是將semaphore內部的數字加1,該執行緒才有可能獲得許可,
*/
semaphore.acquire();
/**
* 對應的執行緒會到阻塞對,對應車輛去獲取到車位,如果沒有拿到一致阻塞,
* 直到其他車輛歸還車位,
*/
park = parks.take();
System.out.println("執行緒ID" + threadId + ",開始占用車位:" + park + ",當前剩余車位" + semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//睡眠隨機秒
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//歸還車位
parks.offer(park);
System.out.println("執行緒ID" + threadId + ",開始歸還車位:" + park + ",共占用" + sleepTime + "秒");
//執行緒釋放掉許可,通俗來將就是將semaphore內部的數字加1
semaphore.release();
}
public static void main(String[] args) {
//初始化執行緒數量
int threadNum = 100;
parks.offer("車位一");
parks.offer("車位二");
parks.offer("車位三");
parks.offer("車位四");
parks.offer("車位五");
// 初始化5個許可證
Semaphore semaphore = new Semaphore(5);
//可以提前釋放但是車位就會被多個執行緒同時占用
//semaphore.release(5);
for (int i = 0; i < threadNum; i++) {
executorService.submit(() -> {
execute(semaphore);
});
}
}
}
3.8 注意事項
即使創建信號量的時候,指定了信號量的大小 ,但是在通過 release()操作釋放信號量仍然能釋放超過配置的大小,也就有可能同時執行的執行緒數量比最開始設定的要大,沒有任何執行緒獲取信號量的時候,依然能夠釋放并且釋放的有效,
? 推薦的做法是一個執行緒先 acquire 然后 release,如果釋放執行緒和獲取執行緒不是同一個,那么最好保證這種對應關系,不要釋放過多的許可證,
4. Fork/Join
4.1 簡介
java下多執行緒的開發可以我們自己啟用多執行緒,執行緒池,還可以使用forkjoin,forkjoin可以讓我們不去了解諸如Thread,Runnable等相關的知識,只要遵循forkjoin的開發模式,就可以寫出很好的多執行緒并發程式
? Fork/Join框架是Java7提供了的一個用于并行執行任務的框架, 是一個把大任務分割成若干個小任務,最侄訓總每個小任務結果后得到大任務結果的框架,
? Fork/Join框架是一個實作了ExecutorService介面的多執行緒處理器,它可以把一個大的任務劃分為若干個小的任務并發執行,充分利用可用的資源,進而提高應用的執行效率,
Fork/Join框架簡化了并行程式的原因有 :
- 它簡化了執行緒的創建,在框架中執行緒是自動被創建和管理,
- 它自動使用多個處理器,因此程式可以擴展到使用可用處理器,
4.2 舉例
? 就像我需要處理一百萬行的excel,普通的處理是一個一個的excel進行處理,但是使用Fork/Join框架后的處理方式呢,加入我們定義100條資料為一個批次,那么Fork/Join就會拆分這個excel先到中間拆分成各有50萬的資料,然后還比100大就繼續拆分,不斷的細分,最后分到了每一個執行緒分得到了100條然后才開始執行,
4.3 分而治之
“分而治之” 一直是一個有效的處理大量資料的方法,著名的 MapReduce 也是采取了分而治之的思想,
? 簡單來說,就是如果你要處理1000個資料,但是你并不具備處理1000個資料的能力,那么你可以只處理其中的10個,然后,分階段處理100次,將100次的結果進行合成,那就是最終想要的對原始的1000個資料的處理結果,
? 同時forkjoin在處理某一類問題時非常的有用,哪一類問題?分而治之的問題,十大計算機經典演算法:快速排序、堆排序、歸并排序、二分查找、線性查找、深度優先、廣度優先、Dijkstra、動態規劃、樸素貝葉斯分類,有幾個屬于分而治之?3個,快速排序、歸并排序、二分查找,還有大資料中M/R都是,
4.3.1 分治法的設計思想
? 將一個難以直接解決的大問題,分割成一些規模較小的相同問題,以便各個擊破,分而治之,
4.3.2 分治策略
? 對于一個規模為n的問題,若該問題可以容易地解決(比如說規模n較小)則直接解決,否則將其分解為k個規模較小的子問題,這些子問題互相獨立且與原問題形式相同(子問題相互之間有聯系就會變為動態規范演算法),遞回地解這些子問題,然后將各子問題的解合并得到原問題的解,這種演算法設計策略叫做分治法,
4.4 Fork-Join原理
? Fork/Join實作了ExecutorService,所以它的任務也需要放在執行緒池中執行,它的不同在于它使用了作業竊取演算法,空閑的執行緒可以從滿負荷的執行緒中竊取任務來幫忙執行,
? 由于執行緒池中的每個執行緒都有一個佇列,而且執行緒間互不影響,那么執行緒每次都從自己的任務佇列的頭部獲取一個任務出來執行,如果某個時候一個執行緒的任務佇列空了,而其余的執行緒任務佇列中還有任務,那么這個執行緒就會從其他執行緒的任務佇列中取一個任務出來幫忙執行,就像偷取了其他人的作業一樣
4.4.1 任務分割和合并
Fork/Join框架的基本思想就是將一個大任務分解(Fork)成一系列子任務,子任務可以繼續往下分解,當多個不同的子任務都執行完成后,可以將它們各自的結果合并(Join)成一個大結果,最終合并成大任務的結果
我們看下面這個圖
? 首先main Task 先fork成 0,1兩個任務 接著,因為還是太大,繼續fork成 0-0,0-1,1-0,1-1 然后進行計算計算完成后進行join操作,0-0,1-1 join到0, 1-0,1-1 join到1 然后 0和1繼續join到mainTask,完成計算任務,
4.4.2 作業密取
即當前執行緒的Task已經全被執行完畢,則自動取到其他執行緒的Task池中取出Task繼續執行即如果一個作業執行緒沒有事情要做,它可以從其他仍然忙碌的執行緒竊取任務,
? ForkJoinPool中維護著多個執行緒(一般為CPU核數)在不斷地執行Task,每個執行緒除了執行自己職務內的Task之外,還會根據自己作業執行緒的閑置情況去獲取其他繁忙的作業執行緒的Task,如此一來就能能夠減少執行緒阻塞或是閑置的時間,提高CPU利用率,
4.5 相關子類
? 我們已經很清楚 Fork/Join 框架的需求了,那么我們可以思考一下,如果讓我們來設計一個 Fork/Join 框架,該如何設計?這個思考有助于你理解 Fork/Join 框架的設計,
? 第一步分割任務,首先我們需要有一個 fork 類來把大任務分割成子任務,有可能子任務還是很大,所以還需要不停的分割,直到分割出的子任務足夠小,
? 第二步執行任務并合并結果,分割的子任務分別放在雙端佇列里,然后幾個啟動執行緒分別從雙端佇列里獲取任務執行,子任務執行完的結果都統一放在一個佇列里,啟動一個執行緒從佇列里拿資料,然后合并這些資料,
Fork/Join 使用兩個類來完成以上兩件事情:
4.5.1 ForkJoinTask
? 我們要使用 ForkJoin 框架,必須首先創建一個 ForkJoin 任務,它提供在任務中執行 fork() 和 join() 操作的機制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,Fork/Join 框架提供了以下兩個子類:
4.5.1.1 RecursiveAction
用于沒有回傳結果的任務
4.5.1.2 RecursiveTask
用于有回傳結果的任務,
4.5.2 ForkJoinPool
? ForkJoinTask 需要通過 ForkJoinPool 來執行,任務分割出的子任務會添加到當前作業執行緒所維護的雙端佇列中,進入佇列的頭部,當一個作業執行緒的佇列里暫時沒有任務時,它會隨機從其他作業執行緒的佇列的尾部獲取一個任務
4.6 Fork/Join使用
? Task要通過ForkJoinPool來執行,使用submit 或 invoke 提交,兩者的區別是:invoke是同步執行,呼叫之后需要等待任務完成,才能執行后面的代碼;submit是異步執行,join()和get方法當任務完成的時候回傳計算結果
? 在我們自己實作的compute方法里,首先需要判斷任務是否足夠小,如果足夠小就直接執行任務,如果不足夠小,就必須分割成兩個子任務,每個子任務在呼叫invokeAll方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成孫任務,如果不需要繼續分割,則執行當前子任務并回傳結果,使用join方法會等待子任務執行完并得到其結果,
4.6.1 任務的提交邏輯
fork/join其實大部分邏輯處理操作都集中在提交任務和處理任務這兩塊,了解任務的提交基本上后面就很容易理解了, fork/join提交任務主要分為兩種:
4.6.1.1 第一次提交到forkJoinPool
//創建初始化任務
SubmitTask submitTask = new SubmitTask(start, end);
//將初始任務扔進連接池中執行
forkJoinPool.invoke(submitTask);
4.6.1.2 任務切分之后的提交
//沒有達到閾值 計算一個中間值
long mid = (start + end) / 2;
//拆分 左邊的
SubmitTask left = new SubmitTask(start, mid);
//拆分右邊的
SubmitTask right = new SubmitTask(mid + 1, end);
//添加到任務串列
invokeAll(left, right);
4.6.1.3 合并任務
//合并結果并回傳
return left.join() + right.join();
4.6.1.4 代碼案例
package chapter02.forkjoin;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* 計算 0-10000 階乘
*/
public class SubmitTask extends RecursiveTask<Long> {
/**
* 起始值
*/
private long start;
/**
* 結束值
*/
private long end;
/**
* 閾值
*/
private long threshold = 10L;
public SubmitTask(long start, long end) {
this.start = start;
this.end = end;
}
/**
* 計算邏輯
* 進行任務的拆分 以及 達到閾值的計算
*
* @return
*/
@Override
protected Long compute() {
//校驗是否達到了閾值
if (isLessThanThreshold()) {
//處理并回傳結果
return handle();
} else {
//沒有達到閾值 計算一個中間值
long mid = (start + end) / 2;
//拆分 左邊的
SubmitTask left = new SubmitTask(start, mid);
//拆分右邊的
SubmitTask right = new SubmitTask(mid + 1, end);
//添加到任務串列
invokeAll(left, right);
//合并結果并回傳
return left.join() + right.join();
}
}
/**
* 處理的任務
*
* @return
*/
public Long handle() {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return sum;
}
/*是否達到了閾值*/
private boolean isLessThanThreshold() {
return end - start <= threshold;
}
/**
* forkJoin 方式呼叫
*
* @param start
* @param end
*/
public static void forkJoinInvok(long start, long end) {
long sum = 0;
long currentTime = System.currentTimeMillis();
//創建ForkJoinPool 連接池
ForkJoinPool forkJoinPool = new ForkJoinPool();
//創建初始化任務
SubmitTask submitTask = new SubmitTask(start, end);
//將初始任務扔進連接池中執行
forkJoinPool.invoke(submitTask);
//forkJoinPool.submit(submitTask);
// System.out.println("異步方式,任務結束才會呼叫該方法,當前耗時"+(System.currentTimeMillis() - currentTime));
//等待回傳結果
sum = submitTask.join();
//forkjoin呼叫方式耗時
System.out.println("forkJoin呼叫:result:" + sum);
System.out.println("forkJoin呼叫耗時:" + (System.currentTimeMillis() - currentTime));
}
/**
* 普通方式呼叫
*
* @param start
* @param end
*/
public static void normalInvok(long start, long end) {
long sum = 0;
long currentTime = System.currentTimeMillis();
for (long i = start; i <= end; i++) {
sum += i;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//普通調動方式耗時
System.out.println("普通呼叫:result:" + sum);
System.out.println("普通呼叫耗時:" + (System.currentTimeMillis() - currentTime));
}
public static void main(String[] args) {
//起始值的大小
long start = 0;
//結束值的大小
long end = 10000;
//forkJoin 呼叫
forkJoinInvok(start, end);
System.out.println("========================");
//普通呼叫
normalInvok(start, end);
}
}
本文由
傳智教育博學谷狂野架構師
教研團隊發布,如果本文對您有幫助,歡迎
關注
和點贊
;如果您有任何建議也可留言評論
或私信
,您的支持是我堅持創作的動力,轉載請注明出處!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/550292.html
標籤:Java
下一篇:Java中生成二維碼