GO 語言中 chan 的理解
chan 的底層實作是怎么樣的?
chan 是 Go 語言中的一個關鍵字,用于實作并發通信,chan 可以用于在不同的 goroutine 之間傳遞資料,實作資料的同步和異步傳輸,
在底層實作上,chan 是通過一個結構體來表示的,這個結構體包含了一個指向資料的指標和兩個指向信道的指標,其中,一個指標用于發送資料,另一個指標用于接收資料,
下面是 chan 的底層實作代碼:
type hchan struct {
qcount uint // 當前佇列中的元素數量
dataqsiz uint // 佇列的容量
buf unsafe.Pointer // 指向佇列的指標
elemsize uint16 // 元素的大小
closed uint32 // 是否關閉
elemtype *_type // 元素的型別
sendx uint // 發送的位置
recvx uint // 接收的位置
recvq waitq // 接收等待佇列
sendq waitq // 發送等待佇列
lock mutex // 鎖
}
chan 的發送和接收操作的底現
當我們向 chan 發送資料時,會先檢查 chan 是否已經關閉,如果 chan 已經關閉,那么發送操作會直接回傳一個 panic,否則,會將資料復制到佇列中,并更新發送位置,
下面是 chan 發送操作的底層實作代碼:
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 檢查 chan 是否已經關閉
if c.closed != 0 {
panic("send on closed channel")
}
// 計算發送位置
i := c.sendx
// 計算佇列中的元素數量
if c.qcount < c.dataqsiz {
c.qcount++
} else {
// 如果佇列已滿,需要擴容
grow(c)
}
// 更新發送位置
c.sendx++
// 將資料復制到佇列中
qput(c, i, ep)
return true
}
當我們從 chan 接收資料時,也會先檢查 chan 是否已經關閉,如果 chan 已經關閉并且佇列中沒有資料,那么接收操作會直接回傳一個零值,否則,會從佇列中取出資料,并更新接收位置,
下面是 chan 接收操作的底層實作代碼:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 檢查 chan 是否已經關閉
if c.closed != 0 && c.qcount == 0 {
return false, false
}
// 計算接收位置
i := c.recvx
// 如果佇列中沒有資料,需要阻塞等待
for c.qcount <= 0 {
if !block {
return false, false
}
gopark(chanparkcommit, unsafe.Pointer(c), "chan receive", traceEvGoBlockRecv, 1)
}
// 從佇列中取出資料
qget(c, i, ep)
// 更新接收位置
c.recvx++
// 更新佇列中的元素數量
c.qcount--
return true, true
}
chan 是如何實作多個 gorouting 并發安全訪問的?
如上 hchan 結構中的 recvq 和 sendq 分別表示接收等待佇列和發送等待佇列,它們的定義如下:
type waitq struct {
first *sudog // 等待佇列的第一個元素
last *sudog // 等待佇列的最后一個元素
}
sudog 表示等待佇列中的一個元素,它的定義如下:
type sudog struct {
// 等待的 goroutine
g *g
// 是否是 select 操作
isSelect bool
// 等待佇列中的下一個元素
next *sudog
// 等待佇列中的上一個元素
prev *sudog
// 等待的元素
elem unsafe.Pointer
// 獲取鎖的時間
acquiretime int64
// 保留欄位
release2 uint32
// 等待的 ticket
ticket uint32
// 父 sudog
parent *sudog
// 等待鏈表
waitlink *sudog
// 等待鏈表的尾部
waittail *sudog
// 關聯的 chan
c *hchan
// 喚醒時間
releasetime int64
}
當 chan 的佇列已滿或為空時,當前 goroutine 會被加入到發送等待佇列或接收等待佇列中,并釋放鎖,當另一個 goroutine 從 chan 中取出資料或向 chan 發送資料時,它會重新獲取鎖,并從等待佇列中取出一個 goroutine,將其喚醒,這樣,多個 goroutine 就可以通過等待佇列來實作并發訪問 chan,
sudog 是 Go 中非常重要的資料結構,因為 g 與同步物件關系是多對多的,
一個 g 可以出現在許多等待佇列上,因此一個 g 可能有很多sudog:在 select 操作中,一個 goroutine 可以等待多個 chan 中的任意一個就緒, sudog 中的 isSelect 欄位被用來標記它是否是 select 操作,當一個 chan 就緒時,它會喚醒對應的 sudog,并將其從等待佇列中移除,如果一個 sudog 是 select 操作,它會在喚醒后回傳一個特殊的值,表示哪個 chan 就緒了
多個 g 可能正在等待同一個同步物件,因此一個物件可能有許多 sudog:chan 在不同的 gorouting 中傳遞等待
完整的發送和接受方法實作如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
// 獲取 chan 的鎖
lock(&c.lock)
// 檢查 chan 是否已經關閉
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
// 計算發送位置
i := c.sendx
// 計算佇列中的元素數量
if c.qcount < c.dataqsiz {
c.qcount++
} else {
// 如果佇列已滿,需要將當前 goroutine 加入到發送等待佇列中
g := getg()
gp := g.m.curg
if !block {
unlock(&c.lock)
return false
}
// 創建一個 sudog,表示當前 goroutine 等待發送
sg := acquireSudog()
sg.releasetime = 0
sg.acquiretime = 0
sg.g = gp
sg.elem = ep
sg.c = c
// 將 sudog 加入到發送等待佇列中
c.sendq.enqueue(sg)
// 釋放鎖,并將當前 goroutine 阻塞
unlock(&c.lock)
park_m(gp, waitReasonChanSend, traceEvGoBlockSend, 1)
// 當 goroutine 被喚醒時,重新獲取鎖
lock(&c.lock)
// 檢查 chan 是否已經關閉
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
// 從發送等待佇列中取出 sudog
sg = c.sendq.dequeue()
if sg == nil {
throw("chan send inconsistency")
}
// 將資料復制到佇列中
qput(c, i, ep)
}
// 更新發送位置
c.sendx++
// 釋放鎖
unlock(&c.lock)
return true
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 獲取 chan 的鎖
lock(&c.lock)
// 檢查 chan 是否已經關閉
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
return false, false
}
// 計算接收位置
i := c.recvx
// 如果佇列中沒有資料,需要將當前 goroutine 加入到接收等待佇列中
if c.qcount <= 0 {
g := getg()
gp := g.m.curg
if !block {
unlock(&c.lock)
return false, false
}
// 創建一個 sudog,表示當前 goroutine 等待接收
sg := acquireSudog()
sg.releasetime = 0
sg.acquiretime = 0
sg.g = gp
sg.elem = ep
sg.c = c
// 將 sudog 加入到接收等待佇列中
c.recvq.enqueue(sg)
// 釋放鎖,并將當前 goroutine 阻塞
unlock(&c.lock)
park_m(gp, waitReasonChanReceive, traceEvGoBlockRecv, 1)
// 當 goroutine 被喚醒時,重新獲取鎖
lock(&c.lock)
// 檢查 chan 是否已經關閉
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
return false, false
}
// 從接收等待佇列中取出 sudog
sg = c.recvq.dequeue()
if sg == nil {
throw("chan receive inconsistency")
}
// 從佇列中取出資料
qget(c, i, ep)
} else {
// 從佇列中取出資料
qget(c, i, ep)
}
// 更新接收位置
c.recvx++
// 更新佇列中的元素數量
c.qcount--
// 釋放鎖
unlock(&c.lock)
return true, true
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/556518.html
標籤:其他
上一篇:【序列化與反序列化】關于序列化與反序列化MessagePack的實踐
下一篇:返回列表