在日常的專案開發中,我們會經常遇到通過多執行緒執行程式并需要回傳執行結果的場景,下面我們就對獲取多執行緒回傳結果的幾種方式進行一下歸納,并進行簡要的分析與總結,
一、Thread.join
在一些簡單的應用場景中我們可以使用執行緒本身提供的join方法,我們知道join方法的目的是讓一個執行緒等待另一個執行緒結束后才能執行,利用此原理我們可以設定一個監控執行緒用來等待程式執行緒執行完畢后輸出回傳結果,下面我們看下具體示例代碼
首先定義一個結果物體類
public class Result { private String value; public String getValue() { return value; } public void setValue(String value) { this.value =https://www.cnblogs.com/dafanjoy/archive/2021/04/28/ value; } }
定義作業執行緒,模擬程式執行并輸出執行緒執行結果
public class WorkThread extends Thread { private Result result ; public void init(Result result) { this.result = result; } public void run() { try { Thread.sleep(1000*10);//模擬程式執行 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } result.setValue("執行緒執行完畢,輸出結果"); } }
主執行緒等待作業執行緒執行,并獲取作業執行緒的執行成果
public class MainThread { public static void main(String[] args) throws InterruptedException { Result result = new Result(); WorkThread workThread = new WorkThread(); workThread.init(result); System.out.println("執行緒啟動"); workThread.start(); System.out.println("執行緒等待"); // 等待work執行緒運行完再繼續運行 workThread.join(); System.out.println("執行緒執行結果:"+result.getValue()); } }
輸出結果
執行緒啟動
執行緒等待
執行緒執行結果:執行緒執行完畢,輸出結果
以上代碼通過Thread.join的方式,模擬了一個最基本的獲取執行緒執行結果場景,采用Thread.join的方式雖然使用方便,但這種原生的方式只適用于一些簡單的應用場景中,其主要存在以下問題:
1、獲取多個執行緒回傳結果時較為繁瑣,需要自己手動實作;
2、與執行緒池無法配合使用;
3、作業執行緒內部執行復雜度與耗時不確定,程式需要額外完善;
4、本質上還是同步回傳結果,主執行緒阻塞;
二、CountDownLatch
CountDownLatch做為jdk提供的多執行緒同步工具,CountDownLatch其實本質上可以看做一個執行緒計數器,統計多個執行緒執行完成的情況,適用于控制一個或多個執行緒等待,直到所有執行緒都執行完畢的場景,因此我們可以利用其功能特點實作獲取多個執行緒的執行結果,一定程度上彌補了Thread.join的不足,代碼示例如下:
作業執行緒
public class WorkThread extends Thread { private Vector<Result> vectors ; private CountDownLatch countDownLatch; public WorkThread(CountDownLatch countDownLatch) { this.countDownLatch=countDownLatch; } public void init(Vector<Result> vectors) { this.vectors = vectors; } public void run() { try { Thread.sleep(1000*3);//模擬程式執行 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } Result result = new Result(); result.setValue(Thread.currentThread().getName()+"執行緒執行完畢,輸出結果"); vectors.add(result);//結果放入Vector中 countDownLatch.countDown(); } }
主執行緒
public class MainThread { public static void main(String[] args) throws InterruptedException { Vector<Result> vectors = new Vector<Result>();//定義一個Vector做為存盤回傳結果的容器; final CountDownLatch countDownLatch = new CountDownLatch(5); // 啟動多個作業執行緒 for (int i = 0; i < 5; i++) { WorkThread workThread = new WorkThread(countDownLatch); workThread.init(vectors); workThread.start(); } System.out.println("主執行緒等待作業執行緒執行"); countDownLatch.await(); for (int i=0; i<vectors.size(); i++) { System.out.println(vectors.get(i).getValue()); } } }
輸出結果
主執行緒等待作業執行緒執行 Thread-0執行緒執行完畢,輸出結果 Thread-1執行緒執行完畢,輸出結果 Thread-2執行緒執行完畢,輸出結果 Thread-4執行緒執行完畢,輸出結果 Thread-3執行緒執行完畢,輸出結果
通過利用jdk的多執行緒工具類CountDownLatch,我們可以等待多個執行緒執行完畢后獲取結果,但這種方式局限性較多,如果你的應用場景中啟動的執行緒次數是固定的且需要等待執行結果全部回傳后統一處理,使用CountDownLatch是一個不錯的選擇,
三、Future
1、Future與FutureTask
使用Future,包括 FutureTask、CompletionService、CompletableFuture等
首先我們使用Future配合執行緒池,獲取執行緒池執行執行緒的回傳結果
定義一個實作Callable介面的作業執行緒
public class WorkThread implements Callable<Result> { public Result call() throws Exception { Thread.sleep(5000); Result result = new Result(); result.setValue(Thread.currentThread().getName()+"執行緒執行完畢,輸出結果"); return result; } }
主執行緒
public class MainThread { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { ExecutorService taskPool = new ThreadPoolExecutor(5, 15, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.CallerRunsPolicy()); Future<Result> future = taskPool.submit(new WorkThread()); System.out.println("執行緒池執行作業執行緒"); Result result = future.get();//注意這里get操作是阻塞,future仍屬于同步回傳,主執行緒需要阻塞等待結果回傳 //result = future.get(3,TimeUnit.SECONDS);//設定阻塞超時時間 System.out.println(result.getValue()); } }
Future與FutureTask實作方式基本類似,FutureTask其實是對Futue的進一步封裝,通過上面的代碼我們可以看到Future能夠配合ExecutorService 執行緒池來獲取執行緒執行的結果,使用起來也較為方便,同時可以設定獲取結果的超時時間,避免長時間阻塞帶來的問題,基本上能夠滿足大部分應用場景下的要求, 但Future獲取結果的get方法是阻塞,本質上是個同步回傳,如果希望獲取結果所在執行緒不阻塞,需要引入其他模式相互配合,這個我們下面會說到,
2、CompletionService
CompletionService可以看作FutureTask的一個進階版,通過FutureTask+阻塞佇列的方式能夠按照執行緒執行完畢的順序獲取執行緒執行結果,起到聚合的目的,這個其實跟CountDownLatch差不多,如果你需要執行的執行緒次數是固定的且需要等待執行結果全部回傳后統一處理,可以使用CompletionService,下面我們通過示例代碼進行演示
同上先實作一個作業執行緒,這次我們為了能體現出結果輸出的順序,在作業執行緒內部定義一個編號,編號為偶數的執行緒阻塞一定時間
public class WorkThread implements Callable<Result>{ int num;//執行緒編號 public WorkThread(int num) { this.num=num; } public Result call() throws InterruptedException { int count = num; if(count%2==0) {//編號為偶數的執行緒阻塞3秒鐘 Thread.sleep(3*1000); } Result result = new Result(); result.setValue(num+"號執行緒執行完畢,輸出結果"); return result; } }
主執行緒中啟動十個執行緒
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService exec = new ThreadPoolExecutor(10, 20, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); //定義一個阻塞佇列 BlockingQueue<Future<Result>> futureQueue = new LinkedBlockingQueue<Future<Result>>(); //傳入ExecutorService與阻塞佇列,構造一個completionService CompletionService<Result> completionService = new ExecutorCompletionService<Result>( exec,futureQueue); for(int i=0;i<10;i++) { completionService.submit(new WorkThread(i)); } for(int i=0;i<10;i++) { Result res = completionService.take().get();//注意阻塞佇列take操作,如果獲取不到資料時處于阻塞狀態的 System.out.println(new Date()+ "--"+res.getValue()); } } }
輸出結果如下,可以看到奇數編號的執行緒結果優先回傳,偶數編號的執行緒由于阻塞3秒后才輸出回傳結果,符合程式預期;
Sun Apr 11 18:38:46 CST 2021--3號執行緒執行完畢,輸出結果 Sun Apr 11 18:38:46 CST 2021--1號執行緒執行完畢,輸出結果 Sun Apr 11 18:38:46 CST 2021--7號執行緒執行完畢,輸出結果 Sun Apr 11 18:38:46 CST 2021--9號執行緒執行完畢,輸出結果 Sun Apr 11 18:38:46 CST 2021--5號執行緒執行完畢,輸出結果 Sun Apr 11 18:38:49 CST 2021--2號執行緒執行完畢,輸出結果 Sun Apr 11 18:38:49 CST 2021--4號執行緒執行完畢,輸出結果 Sun Apr 11 18:38:49 CST 2021--0號執行緒執行完畢,輸出結果 Sun Apr 11 18:38:49 CST 2021--8號執行緒執行完畢,輸出結果 Sun Apr 11 18:38:49 CST 2021--6號執行緒執行完畢,輸出結果
上面主執行緒代碼中的completionService.take().get()操作,當獲取不到資料也就是當偶數編號執行緒休眠時仍然會產生阻塞, 其實我們只要對上面代碼進行稍微改造就能避免主執行緒的阻塞,這也就引出了我們下面要說的生產者與消費者模式;
四、生產者消費者模式
上面我們列舉的幾種獲取多執行緒執行結果的方式,都是通過不同技術方法來實作的,而生產者消費者模式本身跟你運用的技術實作沒有太多關系,接觸過多執行緒開發的同學應該都有所了解;
生產者消費者模式如下圖所示
生產者消費者模式是一種能夠解耦與同步生產執行緒、消費執行緒、資料集合的多執行緒設計模式,一個或一組生產者執行緒負責向資料佇列中生產資料,也就是執行緒執行結果;另外一個或一組消費者執行緒負責消費處理資料佇列中的資料,生產者執行緒與消費者執行緒相互之間并沒有直接的關聯,資料的互動都是通過資料佇列,通過這種模式能夠很好的在一定程度上解決多執行緒開發中存在執行緒同步與安全的問題,同時程式也會看起來更加清晰與方便理解;
當然一個完善的生產者消費者模式我們需要考慮很多其他方面, 但最關鍵的還是以下兩個要素:
1、執行緒安全,生產者與消費者分別執行讀寫操作,特別是在多個生產執行緒與消費執行緒時,一定會存在資料讀寫的并發操作,所以資料佇列一定要保證執行緒安全;
2、生產與消費的協調,資料佇列滿時生產執行緒是否停止寫入,資料佇列空時消費執行緒是否停止消費,這里一方面需要結合你的應用場景,同時也是需要考慮不必要的性能浪費;
下面看下基本的代碼實作
首先定義一個全域的資料佇列,這里我用的JDK提供的阻塞佇列ArrayBlockingQueue,這里同樣也直接可以上面講到的completionService,當然也可以用其他執行緒安全的資料集合或者自己定義實作,但要注意無論使用哪種都要注意上面的兩個關鍵要素,平常使用中JDK封裝的阻塞佇列已經基本滿足要求;
public class Container { public static ArrayBlockingQueue<Result> arrayBlockingQueue = new ArrayBlockingQueue<>(100);//這里最好根據系統負載量評估一個閾值,避免OOM問題 }
生產者執行緒實作,佇列資料插入時是采用put還是offer結合應用場景調整
public class ProducerThread extends Thread { public void run() { try { Thread.sleep(1000*3);//模擬程式執行 Result result = new Result(); result.setValue(Thread.currentThread().getName()+"執行緒執行完畢,輸出結果"); Container.arrayBlockingQueue.put(result);//超過阻塞佇列最大閾值時阻塞,一直阻塞 // if(!Container.arrayBlockingQueue.offer(result, 5, TimeUnit.SECONDS)) {//規定時間內資料入隊失敗 // System.err.println("資料入隊失敗"); // } } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
消費者執行緒實作,消費者執行緒是常駐執行緒,佇列中沒有資料時就執行緒阻塞
public class ConsumerThread extends Thread { public void run() { while (!this.isInterrupted()) { try { Result result = Container.arrayBlockingQueue.take();//有資料就消費,沒有就阻塞等待 System.out.println(result.getValue()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
主執行緒中同時啟動生產執行緒與消費執行緒
public class MainThread { public static void main(String[] args) { ExecutorService exec = new ThreadPoolExecutor(10, 20, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); for(int i=0;i<100;i++) {//使用執行緒池模擬生成者生產資料 exec.execute(new ProducerThread()); } for(int i=0;i<2;i++) {//啟動兩個消費者執行緒 new ConsumerThread().start(); } } }
消費者執行緒中會輪詢獲取生產者執行緒執行并放到阻塞佇列中的結果
pool-1-thread-13執行緒執行完畢,輸出結果 pool-1-thread-2執行緒執行完畢,輸出結果 pool-1-thread-1執行緒執行完畢,輸出結果 pool-1-thread-10執行緒執行完畢,輸出結果 pool-1-thread-9執行緒執行完畢,輸出結果 pool-1-thread-15執行緒執行完畢,輸出結果 pool-1-thread-4執行緒執行完畢,輸出結果 pool-1-thread-5執行緒執行完畢,輸出結果 pool-1-thread-8執行緒執行完畢,輸出結果 pool-1-thread-12執行緒執行完畢,輸出結果 pool-1-thread-16執行緒執行完畢,輸出結果 ..................................................... .....................................................
生產者消費者模式是程式開發當中一種十分常見且易于理解與掌握的開發設計模式,且適用場景廣泛,希望大家都能夠深入理解與掌握
五、異步回呼
上面列舉的獲取執行緒執行結果的方法都存在一個共性的問題,就是在等待結果的回傳程序中,主執行緒或者消費者執行緒都是需要阻塞或輪詢等待的,但在一些應用場景下我們是希望執行緒執行的程序中,程式該干嘛干嘛,繼續向下執行,等到結果回傳了再通過回呼來通知,這就是異步回呼的必要性,實作異步回呼思路我這里列舉兩種,一種是多執行緒與回呼,第二種JDK1.8中新加入了一個實作類CompletableFuture,通過這兩種都能夠實作異步獲取執行緒執行結果的目標
1、多執行緒與回呼
這里其實是在多執行緒中通過回呼的方式把結果回傳的方式,我們看下具體實作
首先宣告一個回呼介面
public interface CallBack { void notice(Result result); }
定義作業執行緒,在建構式中傳入回呼介面的實作物件
public class WorkThread implements Runnable{ int num;//執行緒編號 CallBack callBack; public WorkThread(CallBack callBack, int num) { this.num=num; this.callBack = callBack; } @Override public void run() { // TODO Auto-generated method stub try { Thread.sleep((10-num)*1000);//模擬程式運行時間,倒序輸出 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } Result result = new Result(); result.setValue(num+"號執行緒執行完畢,輸出結果"); callBack.notice(result); } }
呼叫方及回呼方法具體實作
public class MainThread implements CallBack { public void run(int num) { WorkThread workThread = new WorkThread(this,num); new Thread(workThread).start(); } @Override public void notice(Result result) { System.out.println("回傳結果:"+result.getValue()); } }
程式執行及輸出
public class App { public static void main(String[] args) { MainThread mainThread = new MainThread(); for(int i=0;i<10;i++) { mainThread.run(i); } System.out.println("繼續執行,表示異步操作"); } }
輸出結果
繼續執行,表示異步操作
回傳結果:9號執行緒執行完畢,輸出結果
回傳結果:8號執行緒執行完畢,輸出結果
回傳結果:7號執行緒執行完畢,輸出結果
回傳結果:6號執行緒執行完畢,輸出結果
回傳結果:5號執行緒執行完畢,輸出結果
回傳結果:4號執行緒執行完畢,輸出結果
回傳結果:3號執行緒執行完畢,輸出結果
回傳結果:2號執行緒執行完畢,輸出結果
回傳結果:1號執行緒執行完畢,輸出結果
回傳結果:0號執行緒執行完畢,輸出結果
多執行緒+回呼也是一種常見的異步回呼實作方式,但需要注意的是我們自己手動實作異步回呼時還是有很多細節需要考慮完善的,如例外、超時、執行緒開辟與管理等,這里就不再過多的展開,
2、CompletableFuture
JDK1.8中新增的CompletableFuture中通過函式式的編程方法提供了等同于異步回呼的能力,下面我們進行具體實作
作業執行緒
public class WorkThread { public static Result call(int num) throws InterruptedException { Thread.sleep(5*1000);//模擬程式執行時間 Result result = new Result(); result.setValue(String.valueOf(num)); return result; } }
主執行緒
public class MainThread {
public static void main(String[] args) { List<String> reslist = new ArrayList<String>(); ExecutorService exs = Executors.newFixedThreadPool(10);//定義一個執行緒池 List<CompletableFuture<Result>> futureList = new ArrayList<>(); try { for(int i=0;i<10;i++) { final int k = i; CompletableFuture<Result> future=CompletableFuture.supplyAsync(()->{ try { return WorkThread.call(k); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } return null; },exs).thenApply(e->mul(e)).whenComplete((v, e) -> {//thenApply 里面執行就是回呼函式CallBack System.out.println("執行緒"+k+"完成! 結果:"+v.getValue()+",例外 :"+e+","+new Date()); reslist.add(v.getValue()); }); futureList.add(future);//聚合回傳結果 } System.out.println("繼續執行,表示異步操作"); }catch (Exception e) { // TODO: handle exception } } public static Result mul(Result result){ try { Integer val = Integer.valueOf(result.getValue())*2; result.setValue(val.toString()); } catch (Exception e) { e.printStackTrace(); } return result; } }
輸出結果如下,可以看到主執行緒沒有等待執行緒執行結果回傳,繼續向下執行
直接輸出,標識異步操作 執行緒9完成! 結果:18,例外 :null,Sun Apr 18 17:27:29 CST 2021 執行緒2完成! 結果:4,例外 :null,Sun Apr 18 17:27:29 CST 2021 執行緒5完成! 結果:10,例外 :null,Sun Apr 18 17:27:29 CST 2021 執行緒1完成! 結果:2,例外 :null,Sun Apr 18 17:27:29 CST 2021 執行緒6完成! 結果:12,例外 :null,Sun Apr 18 17:27:29 CST 2021 執行緒3完成! 結果:6,例外 :null,Sun Apr 18 17:27:29 CST 2021 執行緒0完成! 結果:0,例外 :null,Sun Apr 18 17:27:29 CST 2021 執行緒4完成! 結果:8,例外 :null,Sun Apr 18 17:27:29 CST 2021 執行緒8完成! 結果:16,例外 :null,Sun Apr 18 17:27:29 CST 2021 執行緒7完成! 結果:14,例外 :null,Sun Apr 18 17:27:29 CST 2021
CompletableFuture中提供了豐富的API實作,提供了諸如聚合計算等一整套功能,這里就不做太多表述,有興趣的小伙伴可以去多做了解,
六、總結
以上就是針對如何獲取多執行緒執行結果進行的方法匯總與簡要分析,雖然方法手段多樣,但本質上都還是圍繞執行緒同步、資料共享、異步回呼等幾個思路來進行實作的,在實際的日常開發中的應用,大家還是要結合業務場景具體問題具體分析,一方面固然是要注意程式性能的高效與實作方式的優雅,但另一方面也要注意避免簡單的問題復雜化,反而過猶不及,特別是在多執行緒的開發程序中,多執行緒的使用并不就等同于處理效率的提高,要不斷的深入學習與理解,結合應用場景,多分析,多總結,在實踐與積累的程序中逐步提高,真正領會,在此我也希望與大家相互勉勵,共同進步,希望本文對大家能有所幫助,其中如有不足與不正確的地方還望指正與海涵,十分感謝,
關注微信公眾號,查看更多技術文章,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/280933.html
標籤:其他