主頁 > 後端開發 > 淺談errgroup的使用以及原始碼分析

淺談errgroup的使用以及原始碼分析

2023-04-27 07:29:54 後端開發

本文講解的是golang.org/x/sync這個包中的errgroup

1、errgroup 的基礎介紹

學習過 Go 的朋友都知道 Go 實作并發編程是比較容易的事情,只需要使用go關鍵字就可以開啟一個 goroutine,那對于并發場景中,如何實作goroutine的協調控制呢?常見的一種方式是使用sync.WaitGroup 來進行協調控制,

使用過sync.WaitGroup 的朋友知道,sync.WaitGroup 雖然可以實作協調控制,但是不能傳遞錯誤,那該如何解決呢?聰明的你可能馬上想到使用 chan 或者是 context來傳遞錯誤,確實是可以的,那接下來,我們一起看看官方是怎么實作上面的需求的呢?

1.1 errgroup的安裝

安裝命令:

go get golang.org/x/sync

//下面的案例是基于v0.1.0 演示的
go get golang.org/x/[email protected]

1.2 errgroup的基礎例子

這里我們需要請求3個url來獲取資料,假設請求url2時報錯,url3耗時比較久,需要等一秒,

package main

import (
	"errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"strings"
	"time"
)

func main()  {
	queryUrls := map[string]string{
		"url1": "http://localhost/url1",
		"url2": "http://localhost/url2",
		"url3": "http://localhost/url3",
	}

	var eg errgroup.Group
	var results []string

	for _, url := range queryUrls {
		url := url
		eg.Go(func() error {
			result, err := query(url)
			if err != nil {
				return err
			}
			results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))
			return nil
		})
	}
	
  // group 的wait方法,等待上面的 eg.Go 的協程執行完成,并且可以接受錯誤
	err := eg.Wait()
	if err != nil {
		fmt.Println("eg.Wait error:", err)
		return
	}

	for k, v := range results {
		fmt.Printf("%v ---> %v\n", k, v)
	}
}

func query(url string) (ret string, err error) {
	// 假設這里是發送請求,獲取資料
	if strings.Contains(url, "url2") {
		// 假設請求 url2 時出現錯誤
		fmt.Printf("請求 %s 中....\n", url)
		return "", errors.New("請求超時")
	} else if strings.Contains(url, "url3") {
		// 假設 請求 url3 需要1秒
		time.Sleep(time.Second*1)
	}
	fmt.Printf("請求 %s 中....\n", url)
	return "success", nil
}

執行結果:

請求 http://localhost/url2 中....
請求 http://localhost/url1 中....
請求 http://localhost/url3 中....
eg.Wait error: 請求超時

果然,當其中一個goroutine出現錯誤時,會把goroutine中的錯誤傳遞出來,

我們自己運行一下上面的代碼就會發現這樣一個問題,請求 url2 出錯了,但是依舊在請求 url3 ,因為我們需要聚合 url1、url2、url3 的結果,所以當其中一個出現問題時,我們是可以做一個優化的,就是當其中一個出現錯誤時,取消還在執行的任務,直接回傳結果,不用等待任務執行結果,

那應該如何做呢?

這里假設 url1 執行1秒,url2 執行報錯,url3執行3秒,所以當url2報錯后,就不用等url3執行結束就可以回傳了,

package main

import (
	"context"
	"errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"strings"
	"time"
)

func main()  {
	queryUrls := map[string]string{
		"url1": "http://localhost/url1",
		"url2": "http://localhost/url2",
		"url3": "http://localhost/url3",
	}

	var results []string
	ctx, cancel := context.WithCancel(context.Background())
	eg, errCtx := errgroup.WithContext(ctx)

	for _, url := range queryUrls {
		url := url
		eg.Go(func() error {
			result, err := query(errCtx, url)
			if err != nil {
        //其實這里不用手動取消,看完原始碼就知道為啥了
				cancel()
				return err
			}
			results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))
			return nil
		})
	}

	err := eg.Wait()
	if err != nil {
		fmt.Println("eg.Wait error:", err)
		return
	}

	for k, v := range results {
		fmt.Printf("%v ---> %v\n", k, v)
	}
}


func query(errCtx context.Context, url string) (ret string, err error) {
	fmt.Printf("請求 %s 開始....\n", url)
	// 假設這里是發送請求,獲取資料
	if strings.Contains(url, "url2") {
		// 假設請求 url2 時出現錯誤
		time.Sleep(time.Second*2)
		return "", errors.New("請求出錯")


	} else if strings.Contains(url, "url3") {
		// 假設 請求 url3 需要1秒
		select {
		case <- errCtx.Done():
			ret, err = "", errors.New("請求3被取消")
			return
		case <- time.After(time.Second*3):
			fmt.Printf("請求 %s 結束....\n", url)
			return "success3", nil
		}
	} else {
		select {
		case <- errCtx.Done():
			ret, err = "", errors.New("請求1被取消")
			return
		case <- time.After(time.Second):
			fmt.Printf("請求 %s 結束....\n", url)
			return "success1", nil
		}
	}

}

執行結果:

請求 http://localhost/url2 開始....
請求 http://localhost/url3 開始....
請求 http://localhost/url1 開始....
請求 http://localhost/url1 結束....
eg.Wait error: 請求出錯

2、errgroup原始碼分析

看了上面的例子,我們對errgroup有了一定了解,接下來,我們一起看看errgroup做了那些封裝,

2.1 errgroup.Group

errgroup.Group原始碼如下:

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
  // context 的 cancel 方法
	cancel func()

	wg sync.WaitGroup
	
  //傳遞信號的通道,這里主要是用于控制并發創建 goroutine 的數量
  //通過 SetLimit 設定過后,同時創建的goroutine 最大數量為n
	sem chan token
	
  // 保證只接受一次錯誤
	errOnce sync.Once
  // 最先回傳的錯誤
	err     error
}

看結構體中的內容,發現比原生的sync.WaitGroup多了下面的內容:

  • cancel func()
  • sem chan token
  • errOnce sync.Once
  • err error

2.2 WithContext 方法

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}

方法邏輯還是比較簡單的,主要做了兩件事:

  • 使用contextWithCancel()方法創建一個可取消的Context
  • context.WithCancel(ctx)創建的 cancel賦值給 Group中的cancel

2.3 Go

1.2 最后一個例子說,不用手動去執行 cancel 的原因就在這里,

g.cancel() //這里就是為啥不用手動執行 cancel的原因

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
    //往 sem 通道中發送空結構體,控制并發創建 goroutine 的數量
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
    // done()函式的邏輯就是當 f 執行完后,從 sem 取一條資料,并且 g.wg.Done()
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() { // 這里就是確保 g.err 只被賦值一次
				g.err = err
				if g.cancel != nil {
					g.cancel() //這里就是為啥不用手動執行 cancel的原因
				}
			})
		}
	}()
}

2.4 TryGo

看注釋,知道此函式的邏輯是:當正在執行的goroutine數量小于通過SetLimit()設定的數量時,可以啟動成功,回傳 true,否則啟動失敗,回傳false,

// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
	if g.sem != nil {
		select {
		case g.sem <- token{}: // 當g.sem的緩沖區滿了過后,就會執行default,也代表著未啟動成功
			// Note: this allows barging iff channels in general allow barging.
		default:
			return false
		}
	}
  
  //----主要看上面的邏輯,下面的邏輯和Go中的一樣-------

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
	return true
}

2.5 Wait

代碼邏輯很簡單,這里主要注意這里:

//我看這里的時候,有點疑惑,為啥這里會去呼叫 cancel()方法呢?
//這里是為了代碼的健壯性,用 context.WithCancel() 創建得到的 cancel,在代碼執行完畢之前取消是一個好習慣
g.cancel()

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
  g.wg.Wait() //通過 g.wg.Wait() 阻塞等待所有的 goroutine 執行完
	if g.cancel != nil {
    //我看這里的時候,有點疑惑,為啥這里會去呼叫 cancel()方法呢?
    //這里是為了代碼的健壯性,用 context.WithCancel() 創建得到的 cancel,在代碼執行完畢之前取消是一個好習慣
 		g.cancel()
	}
	return g.err
}

2.6 SetLimit

看代碼的注釋,我們知道:SetLimit的邏輯主要是限制同時執行的 goroutines 的數量為n,當n小于0時,沒有限制,如果有運行的 goroutine,呼叫此方法會報錯,

// SetLimit limits the number of active goroutines in this group to at most n.
// A negative value indicates no limit.
//
// Any subsequent call to the Go method will block until it can add an active
// goroutine without exceeding the configured limit.
//
// The limit must not be modified while any goroutines in the group are active.
func (g *Group) SetLimit(n int) {
	if n < 0 {
		g.sem = nil
		return
	}
	if len(g.sem) != 0 {
		panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
	}
	g.sem = make(chan token, n)
}

3、errgroup 容易忽視的坑

這個坑是看別人的記錄看到的,對errgroup不太熟悉時,是不小心確實容易掉進去,所以摘抄了過來,如果侵權,請聯系洗掉,謝謝!

原文鏈接:并發編程包之 errgroup

需求:

開啟多個Goroutine去快取中設定資料,同時開啟一個Goroutine去異步寫日志,很快我的代碼就寫出來了:

package main

import (
	"context"
	"errors"
	"fmt"
	"golang.org/x/sync/errgroup"
	"time"
)

func main()  {
	g, ctx := errgroup.WithContext(context.Background())

	// 單獨開一個協程去做其他的事情,不參與waitGroup
	go WriteChangeLog(ctx)

	for i:=0 ; i< 3; i++{
		g.Go(func() error {
			return errors.New("訪問redis失敗\n")
		})
	}
	if err := g.Wait();err != nil{
		fmt.Printf("appear error and err is %s",err.Error())
	}
	time.Sleep(1 * time.Second)
}

func WriteChangeLog(ctx context.Context) error {
	select {
	case <- ctx.Done():
		return nil
	case <- time.After(time.Millisecond * 50):
		fmt.Println("write changelog")
	}
	return nil
}

結果:

appear error and err is 訪問redis失敗

代碼看著沒有問題,但是日志一直沒有寫入,這是為什么呢?

其實原因就是因為這個ctxerrgroup.WithContext方法回傳的一個帶取消的ctx,我們把這個ctx當作父context傳入WriteChangeLog方法中了,如果errGroup取消了,也會導致背景關系的context都取消了,所以WriteChangelog方法就一直執行不到,

這個點是我們在日常開發中想不到的,所以需要注意一下~,

解決方法:

解決方法就是在 go WriteChangeLog(context.Background()) 傳入新的ctx

參考資料:

八. Go并發編程--errGroup

并發編程包之 errgroup

上面這個案例中講了一個容易忽視的坑,大家可以看看

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/551290.html

標籤:其他

上一篇:boot-admin整合Quartz實作動態管理定時任務

下一篇:返回列表

標籤雲
其他(158174) Python(38107) JavaScript(25394) Java(18001) C(15217) 區塊鏈(8260) C#(7972) AI(7469) 爪哇(7425) MySQL(7148) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5870) 数组(5741) R(5409) Linux(5329) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4562) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2431) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1960) Web開發(1951) HtmlCss(1927) python-3.x(1918) 弹簧靴(1913) C++(1912) xml(1889) PostgreSQL(1874) .NETCore(1855) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • 淺談errgroup的使用以及原始碼分析

    本文講解的是golang.org/x/sync這個包中的errgroup 1、errgroup 的基礎介紹 學習過 Go 的朋友都知道 Go 實作并發編程是比較容易的事情,只需要使用go關鍵字就可以開啟一個 goroutine。那對于并發場景中,如何實作goroutine的協調控制呢?常見的一種方式 ......

    uj5u.com 2023-04-27 07:29:54 more
  • boot-admin整合Quartz實作動態管理定時任務

    淄博燒烤爆紅出了圈,當你坐在八大局的燒烤攤,面前是火爐、烤串、小餅和蘸料,音樂響起,啤酒倒滿,燒烤靈魂的party即將開場的時候,你系統中的Scheduler(除錯器),也自動根據設定的Trigger(觸發器),從容優雅的啟動了一系列的Job(后臺定時任務)。作業一切早有安排,又何須費心勞神呢?因為 ......

    uj5u.com 2023-04-27 07:29:47 more
  • Android監聽事件

    監聽事件 ? 監聽事件機制由事件源,事件和事件監聽器三類物件組成,事件源一般就是activity中的UI控制元件。 下面參考別人整理的圖來更加形象的表達這些關系。 ? 事件監聽機制的意義就是讓事件源的行為委托給事件監聽器,讓監聽器控制事件的發生。 ? 1.實作監聽事件的方法 [ ] 通過內部類實作 [ ......

    uj5u.com 2023-04-27 07:29:38 more
  • Django筆記三十一之全域例外處理

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記三十一之全域例外處理 這一篇筆記介紹 Django 的全域例外處理。 當我們在處理一個 request 請求時,會盡可能的對介面資料的格式,內部呼叫的函式做一些例外處理,但可能還是會有一些意想不到的漏網之魚,造成程式的例外導致不能正常運行 ......

    uj5u.com 2023-04-27 07:29:30 more
  • elastic-job原始碼(1)- job自動裝配

    版本:3.1.0-SNAPSHOT git地址:https://github.com/apache/shardingsphere-elasticjob Maven 坐標 <dependency> <groupId>org.apache.shardingsphere.elasticjob</group ......

    uj5u.com 2023-04-27 07:29:24 more
  • java 多執行緒的start()和run()的理解

    run()方法中是各個執行緒要執行的具體內容。所以當一個執行緒直接呼叫run()時那么直接開始執行方法體,這是在main執行緒中的多個執行緒只能時按照順序的等待前面的執行緒結束run()方法的執行。 而呼叫start方法只是執行緒進入準備階段(Ready),并沒有真正執行,這需要JVM進行分配時間片進行輪轉執行緒 ......

    uj5u.com 2023-04-27 07:29:20 more
  • 面向物件可視化工具:UML類圖

    1. UML類圖 UML(Unified Modeling Language,統一建模語言),用來描述軟體模型和架構的圖形化語言。 常用的UML工具軟體有PowerDesinger、Rose和Enterprise Architect。 UML工具軟體不僅可以繪制軟體開發中所需的各種圖表,還可以生成對 ......

    uj5u.com 2023-04-27 07:29:14 more
  • Java中關于String類以及字串拼接的問題

    String類部分原始碼 //被final修飾不可被繼承 public final class String implements java.io.Serializable, Comparable<String>, CharSequence { //String維護char[] 所以不可修改 priv ......

    uj5u.com 2023-04-27 07:29:06 more
  • Python生成亂數的一個標準庫-random

    1.介紹 Random庫Python中用于生成亂數的一個標準庫。計算機沒有辦法產生真正的亂數,但它可以產生偽亂數。 偽亂數是計算機按照一定的運算規則產生的一些資料,只不過這些資料表現為亂數的形式。計算機中采用梅森旋轉演算法生成為隨機序列,序列中的每一個元素就是偽亂數,由于計算機不能產生真正 ......

    uj5u.com 2023-04-27 07:29:01 more
  • go中 for回圈的坑

    go中 for回圈的坑 在使用for回圈修改結構體切片中的值時,發現并沒有修改成功。 type Dog struct { name string } func (d *Dog) setNewName(name string) { d.name = name } func main() { d := ......

    uj5u.com 2023-04-27 07:28:56 more