我嘗試為 NATS 限制佇列撰寫訂閱者:
sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))
if err != nil {
return err
}
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
if errors.Is(err, nats.ErrSlowConsumer) {
log.Printf("Slow consumer error returned. Waiting for reset...")
time.Sleep(50 * time.Millisecond)
continue
} else {
return err
}
}
msg.InProgress()
var message pnats.NatsMessage
if err := conn.unmarshaller(msg.Data, &message); err != nil {
msg.Term()
return err
}
actualSubject := message.Context.FullSubject()
handler, ok := callbacks[message.Context.Category]
if !ok {
msg.Nak()
continue
}
callback, err := handler(&message)
if err == nil {
msg.Ack()
msg.Term()
} else {
msg.Nak()
return err
}
callback(ctx)
此代碼的目標是使用多個主題的任何訊息并呼叫與主題關聯的回呼函式。handler
此代碼有效,但我遇到的問題是,如果該函式未回傳錯誤,我希望在呼叫后洗掉該訊息。我認為這就是msg.Term
正在做的事情,但我仍然看到佇列中的所有訊息。
我最初是圍繞作業佇列設計的,但我希望它能夠與多個訂閱者一起作業,所以我不得不重新設計它。有什么辦法可以使這項作業?
uj5u.com熱心網友回復:
根據提供的代碼,我假設您在使用 JetStream 庫創建訂閱時沒有提供流和消費者資訊。
在該方法的檔案中SubscribeSync
,它說當未提供流和消費者資訊時,庫將創建一個臨時消費者,并且消費者的名稱由服務器選擇。它還試圖找出訂閱是針對哪個流的。
這是我認為在您的代碼中發生的事情:
- 當您呼叫該
SubscribeSync
方法時,將使用您提供的主題創建一個臨時使用者。 - 當
msg.Ack
和msg.Term
被呼叫時,您確實會確認該訊息,但僅針對該當前使用者。 - 下次呼叫該
SubscribeSync
方法時,會創建一個新的臨時使用者,其中包含您已在另一個使用者上洗掉的訊息。這就是 Jetstream 的流、消費者和訂閱概念的設計原理。
根據您想要完成的任務,這里有一些建議:
- 使用普通的 NATS Core 庫來處理發布/訂閱或佇列。不要使用 JetStream。NATS Core 庫直接處理主題,而 Jetstream 庫在未提供資訊的情況下在后臺創建其他內容(流和消費者)。
- 使用 JetStream 但自己創建一個流和一個持久的消費者,可以通過代碼或直接在 NATS 服務器上。這樣,在已經定義了流和消費者的情況下,您應該能夠使其按預期作業。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/508331.html