該資料結構需要 Redis 5.0.0 + 版本才可用使用
概述
Redis stream 是 Redis 5 引入的一種新的資料結構,它是一個高性能、高可靠性的訊息佇列,主要用于異步訊息處理和流式資料處理,在此之前,想要使用 Redis 實作訊息佇列,通常可以使用例如:串列,有序集合、發布與訂閱 3 種資料結構,但是 stream 相比它們具有以下的優勢:
- 支持范圍查找:內置的索引功能,可以通過索引來對訊息進行范圍查找
- 支持阻塞操作:避免低效的反復輪詢查找訊息
- 支持 ACK:可以通過確認機制來告知已經成功處理了訊息,保證可靠性
- 支持多個消費者:多個消費者可以同時消費同一個流,Redis 會確保每個消費者都可以獨立地消費流中的訊息
話不多說,接下來具體看看如何使用它,(PS:萬字長文,行駛途中請系好安全帶)
XADD 添加元素
XADD 命令的語法格式如下:
XADD stream-name id field value [field value]
- stream-name: 指定 redis stream 的名字
- id: 是指 stream 中的訊息 ID,通常使用
*
號表示自動生成 - field value: 就是訊息的內容,是 K-V 格式的鍵值對
關于使用 XADD 添加元素,還有以下特點:
- 自動創建流:當
my-stream
流不存在時,redis 會自動創建,然后將元素追加在流的末尾處 - 任意鍵值對:流中的每個元素可以包含一個或任意多個鍵值對
下面是一個使用 XADD 命令添加新訊息的示例:
XADD my-stream * name John age 30 email [email protected]
上述命令的說明:
- 向名為
my-stream
的 Redis stream 中添加了一條新訊息, *
表示使用自動生成的訊息 ID,name
、age
和email
是訊息的欄位名John
、30
和[email protected]
是訊息的欄位值,
流元素 ID
XADD 命令在成功執行后會回傳元素 ID 作為結果:
"1681138020163-0"
每個元素的 ID 是一個遞增的唯一識別符號,由兩部分組成:一個時間戳和一個序列號,
- 時間戳部分是一個 64 位的有符號整數,以毫秒為單位表示自 Unix 時間起經過的毫秒數,
- 序列號部分是一個遞增的整數,從 0 開始逐步增加,
為了證明,我們可以指定訊息 ID 向指定流中發送一條訊息:
XADD my-stream 1681138020163-1 name Mary age 25 email [email protected]
回傳結果:
"1681138020163-1"
最后,可以提前使用 XRANGE
指令查看推入流中的資料
XRANGE my-stream - +
回傳結果:
1) 1) "1681138020163-0"
2) 1) "name"
2) "John"
3) "age"
4) "30"
5) "email"
6) "[email protected]"
2) 1) "1681138020163-1"
2) 1) "name"
2) "Mary"
3) "age"
4) "25"
5) "email"
6) "[email protected]"
流元素 ID 的限制
元素 ID 在 Redis stream 中扮演著非常重要的角色,它不僅保證了元素的唯一性和順序性,還提供了高效的范圍查詢和分析功能,在使用 Redis stream 時,需要特別注意元素 ID 的限制,并保證 ID 的唯一性和遞增性,
限制如下::
- ID 必須是唯一的
- 新元素的 ID 必須比流中所有已有元素的 ID 都要大
還有一些長度和特殊字符的限制等等,不符合上述限制的添加元素操作,會被 redis 拒絕,并且回傳一個錯誤等,
最大元素 ID 是如何更新的 ?
在成功執行XADD命令之后,流的最大元素ID也會隨之更新,
為什么要限制 新元素的 ID 必須比流中所有已有元素的 ID 都要大 ?
限制新元素的 ID 必須比流中所有已有元素的 ID 都要大,是為了保證 stream 中每個元素的唯一性和順序性,這種特性對于使用流實作訊息佇列和事件系統的用戶來說是非常重要的:用戶可以確信,新的訊息和事件只會出現在已有訊息和事件之后,就像現實世界里新事件總是發生在已有事件之后一樣,一切都是有序進行的,
自動生成 ID 的規則
示例開始就演示自動生成訊息向流中推送資料,在日常使用非常方便,這里說一下它的生成規則:
- 時間戳部分是當前時間的毫秒數,表示自 Unix 時間起經過的毫秒數
- 序列號從 0 開始遞增,序列號是一個 64 位的整數,從 0 開始遞增
限制流長度
流的資料大多只是臨時保存的,如果不對流的長度進行限制,會出現以下情況:
- 存盤耗盡:隨著流中訊息的增加,占用的記憶體也會相應增加,長時間運行的應用程式可能會面臨記憶體耗盡的風險
- 影響性能:隨著資料越多,查詢和操作流的速度會更慢,維護也更困難
為了避免該問題,在使用 Redis stream 時,可以使用 MAXLEN 選項指定 stream 的最大長度,命令格式如下:
XADD stream [MAXLEN len] id field value [field value ...]
示例:
XADD mini-stream MAXLEN 3 * k1 v1
XADD mini-stream MAXLEN 3 * k2 v2
XADD mini-stream MAXLEN 3 * k3 v3
XADD mini-stream MAXLEN 3 * k4 v4
# 我們向一個限制長度為 3 的 `mini-stream` 流中添加 4 條資料,然后查看流內的訊息:
XRANGE mini-stream - +
1) 1) "1681140898447-0"
2) 1) "k2"
2) "v2"
2) 1) "1681140901790-0"
2) 1) "k3"
2) "v3"
3) 1) "1681140906703-0"
2) 1) "k4"
2) "v4"
最后會看到最早創建的 k1
訊息已經被移除,redis 洗掉在流中存在時間最長的元素,從而來保證流的整體長度,
XTRIM 限制流
除了在 XADD 命令時限制流,Redis 還提供單獨限制流長度的 MAXLEN 命令,基礎語法如下:
XTRIM stream MAXLEN len
示例:
XTRIM my-stream MAXLEN 2
(integer) 1
這條命令 XTRIM my-stream MAXLEN 2
的作用是將名為 my-stream
的流修剪為最多包含 2 條訊息,換句話說,流中超出這個長度的較舊訊息將被移除,
XDEL 移除元素
XDEL 用于從流中洗掉特定的訊息,這個命令需要提供流的鍵(key)和一個或多個訊息 ID 作為引數,當訊息被成功洗掉時,XDEL
命令會回傳被洗掉訊息的數量,
XDEL
的基本語法如下:
XDEL key ID [ID ...]
示例:
# 這個命令將從名為 `mystream` 的流中洗掉訊息 ID 為 `1681480521617-0` 的訊息,
XDEL my-stream 1681480521617-0
(integer) 1
# 你也可以傳入多個 `id` 引數進行批量洗掉
XDEL my-stream 1681480524451-0 1681480526810-0 1681480965273-0
(integer) 3
注意:,XDEL
不會修改流的長度計數,這意味著洗掉訊息后,流的長度保持不變,
XLEN 獲取流長度
XLEN 用于獲取流中訊息的數量,這個命令非常簡單且高效,因為它只要一個引數,
XLEN
的基本語法如下:
XLEN key
示例:
XLEN my-stream
(integer) 4
注意:XLEN
命令僅回傳流中訊息的數量,并不提供訊息的具體內容,獲取訊息內容的命令,看下面的 XRANGE
XRANGE 查詢訊息
XRANG
主要用于獲取流中的一段連續訊息,它還有一個非常相似的 XREVRANGE
命令,區別:
XRANGE
按照訊息 ID 順序回傳結果XREVRANGE
按照訊息 ID 逆序回傳結果(用來查詢流中最新的訊息,非常有用!)
XRANG
的的基本語法如下:
XRANGE key start end [COUNT count]
獲取指定訊息
獲取指定訊息,我們可以把 start
和 end
設定同一條訊息 ID,可以用來達到查詢指定訊息 ID 的效果,使用示例:
# 獲取指定訊息 ID
XRANGE my-stream 1681480968241-0 1681480968241-0
獲取多條訊息
獲取多條訊息,可以利用 COUNT 選項引數,使用示例:
# 獲取流中最早的 5 條訊息
XRANGE my-stream - + COUNT 5
這條命令獲取流中最早的 5 條訊息(按訊息 ID 順序排序),-
和 +
分別表示最小和最大的訊息 ID,用于獲取流中的所有訊息,
獲取全部訊息
想要讀取流中全部訊息內容,移除 COUNT 即可:
# 獲取全部訊息
XRANGE my-stream - +
逆序獲取流
XREVRANGE
按照訊息 ID 逆序回傳結果,基本語法如下:
XREVRANGE key end start [COUNT count]
用法完全和 XRANGE 一樣,這里就不過多介紹了,使用示例:
XREVRANGE my-stream + - COUNT 5
這個命令將回傳名為 mystream
的流中的最新的 3 條訊息(按訊息 ID 逆序排序),
XRANGE 的使用場景
在實際業務場景中,可以利用 XRANGE
和 XREVRANGE
命令可以用于實作以下功能:
- 分頁查詢:通過指定
start
、end
和COUNT
引數,可以實作對流中訊息的分頁查詢 - 實時監控:可以使用這些命令來獲取流中的最新訊息,以便在實時監控或分析系統中展示
- 資料匯出:如果需要將流中的資料匯出到其他系統或檔案中,可以使用這些命令來獲取指定范圍內的訊息
XREAD 阻塞讀取流
相比 XRANGE,XREVRANGE 類似,XREAD 也是用于從流中讀取訊息的命令,但它們之間有一些關鍵區別:
- XREAD 支持同時讀取多個流的訊息
- XREAD 支持阻塞模式,可以在新訊息到達時候,立即粗處理
- XREAD 支持
BLOCK
阻塞等待時間引數,控制阻塞時間
XREAD 的阻塞模式,可以更好的構建實時資料處理應用程式,如事件驅動系統、實時分析系統等,
XREAD
命令的基本語法如下:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
查詢模式
查詢的話,除了同時讀取多個流的特點外,其他和 XRANGE,XREVRANGE 類似,
使用示例:
- 讀取單個流的訊息:
XREAD STREAMS my-stream 0
這個命令將從名為 my-stream
的流中讀取訊息,0 代表讀取所有訊息,如果指定的訊息 ID,表示從該訊息 ID 之后開始讀取
- 讀取多個流的訊息:
XREAD STREAMS my-stream mini-stream 0 0
這個命令將從名為 my-stream
和 mini-stream
的流中分別讀取所有訊息,后面的 2 個引數 0 分別對應 2 個訊息 ID 0
開始的位置
阻塞模式
當使用阻塞模式時,XREAD
命令會在以下幾種情況下表現出不同的行為:
- 不會阻塞的情況:找到符合條件的元素會立即回傳
- 會解除阻塞的情況:超時,或者新訊息到達
- 一直阻塞的情況:一直阻塞,等待新訊息的到達
使用示例:
- 不會阻塞的情況
如果流中有滿足條件的訊息(即從指定的訊息 ID 之后的新訊息),那么 XREAD
命令會立即回傳這些訊息,不會發生阻塞,
XREAD BLOCK 1000000 COUNT 1 STREAMS my-stream 0
1) 1) "my-stream"
2) 1) 1) "1681480968241-0"
2) 1) "k5"
2) "v5"
- 會解除阻塞的情況
XREAD
命令解除阻塞也分 2 情況:超時,新訊息到達
示例代碼:
# 超時: 阻塞超時,沒有新訊息到達,解除阻塞
XREAD BLOCK 5000 STREAMS my-stream 1681482023346-0
(nil)
(5.09s)
# 新訊息到達: 新訊息到達,且滿足讀取條件 (新訊息的 ID 大于指定的訊息 ID) 解除阻塞
XREAD BLOCK 50000 STREAMS my-stream 1681482023346-0
1) 1) "my-stream"
2) 1) 1) "1681485525804-0"
2) 1) "newMessage"
2) "v1"
(18.46s)
- 一直阻塞的情況:
如果設定的阻塞等待時間為 0,那么 XREAD
命令會一直阻塞:
示例代碼:
XREAD BLOCK 0 STREAMS my-stream $
這個命令將一直阻塞等待,直到新訊息到達,$
符號表示只讀取新訊息,
當然如果客戶端主動斷開連接,阻塞的 XREAD
命令也會被取消
在實際應用中,XREAD
使用阻塞模式,可以在新訊息到達時立即處理,實作實時訊息處理,
消費組
在 Redis 流的訊息模型中,是通過消費者組(Consumer Group)來組織和管理多個消費者以協同處理來自同一個流的訊息的機制,消費者組的主要目的是在多個消費者之間分發訊息,實作負載均衡、高可用性和容錯能力,
作業原理:
- Stream 將訊息分發,所有訂閱的消費者組 Consumer Group 都會收到訊息(消費組組共享 stream 的訊息)
- 消費者組本身不處理訊息,而是再將訊息分發給消費者,由消費者進行真正的消費(消費者獨占組內的訊息)
如圖所示:
graph LR Stream((Stream)) -- messages --> ConsumerGroup1(Consumer Group 1) Stream((Stream)) -- messages --> ConsumerGroup2(Consumer Group 2) ConsumerGroup1(Consumer Group 1) -- messages --> Consumer1A(Consumer 1A) ConsumerGroup1(Consumer Group 1) -- messages --> Consumer1B(Consumer 1B) ConsumerGroup2(Consumer Group 2) -- messages --> Consumer2A(Consumer 2A) ConsumerGroup2(Consumer Group 2) -- messages --> Consumer2B(Consumer 2B)使用消費者組這種模型的設計,以為在 Redis Stream 中實作以下功能:
- 負載均衡:消費者組可以將訊息分發給多個消費者,實作負載均衡
- 高可用性:在某個消費者發生故障的情況下,仍然可以確保訊息被處理
- 容錯能力:消費者組支持重新處理失敗的訊息,這有助于確保訊息被可靠地處理
接下來我們再詳細說明消費組相關的命令使用
XGOUP 管理消費組
CREATE 創建消費組
通過 XGROUP
命令可以為你的 Redis Stream 創建和管理消費組,
命令格式如下:
XGROUP CREATE stream group id
引數說明:
<stream>
:要關聯的流的鍵,<group>
:消費組的名稱,<id>
:開始讀取訊息的起始 ID,通常使用$
表示僅消費新訊息,或者使用0
表示消費流中的所有訊息,[MKSTREAM]
(可選):如果流不存在,自動創建一個新的流,
使用示例:
# 創建消費組,如果流不存在則自動創建
XGROUP CREATE mystream mygroup $ MKSTREAM
OK
# 查看流中的消費組
XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
以上命令是使用 XGROUP CREATE
命令創建一個名為 mygroup
的消費組,從最新的訊息開始消費,使用 MKSTREAM
選項,如果流不存在則會自動創建流,回傳 OK 既代表創建成功,最后使用 XINFO 查看結果,
SETID 修改組的最后訊息 ID
在某些情況下,你可能想要消費組忽略某些訊息,或者重新處理某些訊息來重現 bug,那么可以使用 XGROUP SETID
命令設定消費組的起始訊息 ID,
命令格式非常簡單:
XGROUP SETID stream group id
使用示例:
# 設定 mygroup 組的最新訊息為指定 ID
XGROUP SETID mystream mygroup 1681655893911-0
OK
# 查看消費組
XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1681655893911-0" # 已被改變
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 4
# 設定 mygroup 組的最新訊息為流的最新訊息 ID
XGROUP SETID mystream mygroup $
# 查看消費組
127.0.0.1:6379> XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1681655916001-0" # 已更新
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
以上命令將 mygroup
組的最新訊息 ID 更新為指定 ID 和流的最新 ID 的使用示例,
XREADGROUP 讀取訊息
使用 XREADGROUP
命令讀取消費組里面的訊息,基本語法:
XREADGROUP GROUP <group> <consumer> [COUNT <n>] [BLOCK <ms>] STREAMS <stream_key_1> <stream_key_2> ... <id_1> <id_2> ...
引數說明:
<group>
:消費組的名稱,<consumer>
:消費者的名稱,<n>
(可選):要讀取的最大訊息數,<ms>
(可選):阻塞等待新訊息的時間(以毫秒為單位),<stream_key_1>
,<stream_key_2>
:要從中讀取訊息的流的鍵,<id_1>
,<id_2>
:從每個流中開始讀取的訊息 ID,通常使用特殊字符>
表示從上次讀取的位置開始讀取新的訊息,
使用示例:
我們創建一個 myconsumer
的消費組讀取上面創建 mygroup
消費組的資訊,以下是多種用法示例:
# 以 myconsumer 消費者身份從 mystream 中讀取分配給 mygroup 的訊息
# 讀取所有最新的訊息(常用)
XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
(nil)
# 其他用法:
# 讀取最多 10 條訊息
XREADGROUP GROUP mygroup myconsumer COUNT 10 STREAMS mystream >
# 進行阻塞讀取最新訊息
XREADGROUP GROUP mygroup myconsumer BLOCK 5000 STREAMS mystream >
這里拿不到資料是因為我們上面把消費組 mygroup 的訊息 ID 設定為最新,我們嘗試修改訊息 ID 重新消費試試
# 設定消費組的訊息 ID,進行重新消費
XGROUP SETID mystream mygroup 1681655893911-0
# 消費組 myconsumer 讀取消費組的訊息
XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1681655897993-0"
2) 1) "k1"
2) "v1"
2) 1) "1681655899297-0"
2) 1) "k1"
2) "v1"
3) 1) "1681655915496-0"
2) 1) "k1"
2) "v1"
4) 1) "1681655916001-0"
2) 1) "k1"
2) "v1"
# 查看消費組的資訊
XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 1 # 消費組有一個消費者
5) "pending"
6) (integer) 4 # 有 4 條正在處理的訊息
7) "last-delivered-id"
8) "1681655916001-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
通過以上命令可以確認,myconsumer
消費者拿到 mygroup
消費組的訊息未確認處理,所以看到有 4 條訊息正在等待處理中,
XPENDING 查看訊息
通過 XPENDING 命令,可以獲取指定流的指定消費者組目前的待處理訊息的相關資訊,在很多場景下,你需要通過它來觀察和了解消費者的處理情況,從而做出處理,例如以下場景:
- 您可以獲取消費組中掛起(未確認)的訊息資訊,從而了解消費者處理訊息的速度和效率
- 如果某個消費者的掛起訊息數量不斷增加,或者某些訊息長時間未被處理,可能表明該消費者存在問題
- 在高并發或高負載的情況下,消費者可能無法及時處理所有訊息,通過
XPENDING
命令檢測到積壓訊息 - 通過定期運行
XPENDING
命令,您可以在發現掛起訊息數量超過預設閾值時觸發報警
基本語法:
XPENDING stream group [start stop count] [consumer]
引數說明:
<stream>
:流的鍵,<group>
:消費組的名稱,<start>
(可選):掛起訊息范圍的起始 ID,<stop>
(可選):掛起訊息范圍的結束 ID,<count>
(可選):回傳的最大掛起訊息數,<consumer>
(可選):篩選特定消費者的掛起訊息,
使用示例:
使用 XPENDING
命令查看上面的 mygroup
組的訊息去哪兒了:
XPENDING mystream mygroup
1) (integer) 4 # 待處理訊息數量
2) "1681655897993-0" # 首條訊息 ID
3) "1681655916001-0" # 最后一條訊息的 ID
4) 1) 1) "myconsumer" # 各消費者正在處理的訊息數量
2) "4"
以上展示的匯總資訊,你還可以通過以下命令,查看待處理訊息更詳細的資訊:
# 查看指定待處理訊息
XPENDING mystream mygroup 1681655897993-0 1681655897993-0 1
1) 1) "1681655897993-0" # 訊息 ID
2) "myconsumer" # 所屬消費者
3) (integer) 2397387 # 最后一次投遞時間
4) (integer) 1 # 投遞次數
從以上資訊你可以看到訊息正在被誰處理和處理的時間,你也可以指定消費者查看資訊:
XPENDING mystream mygroup - + 10 myconsumer
1) 1) "1681655897993-0"
2) "myconsumer"
3) (integer) 2591145
4) (integer) 1
2) 1) "1681655899297-0"
2) "myconsumer"
3) (integer) 2591145
4) (integer) 1
3) 1) "1681655915496-0"
2) "myconsumer"
3) (integer) 2591145
4) (integer) 1
4) 1) "1681655916001-0"
2) "myconsumer"
3) (integer) 2591145
4) (integer) 1
以上命令列出 myconsumer 消費者所有待處理的訊息的詳細資訊
XACK 處理訊息
XACK 用于確認消費組中的特定訊息已被處理,在消費者成功處理訊息后,應使用 XACK
命令通知 Redis,以便從消費組的掛起訊息串列中移除該訊息,
命令格式:
XACK stream group id [id id ...]
使用示例:
通過 XACK 命令,我們將上面 myconsumer 消費者的訊息進行確認處理:
# 確認訊息
XACK mystream mygroup 1681655897993-0
(integer) 1
# .....
當消費者對所有訊息進行處理后,再查看消費組內容進行驗證:
XPENDING mystream mygroup - + 10 myconsumer
(empty array)
XPENDING mystream mygroup
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
使用 XACK
可以確保訊息不會重復處理防止其他消費者或相同消費者在故障恢復后重復處理該訊息等等好處,
XCLAIM 訊息轉移
XCLAIM
訊息轉移類似我們生活中的呼叫轉移,當一個消費者無法處理某個訊息或出現故障時,XCLAIM
可以確保其他消費者接管并處理這些訊息,命令格式非常簡單:
XCLAIM stream group new_consumer max_pending_time id [id id id]
使用示例:
# 使用 XPENDING 命令查詢消費組中掛起的訊息
XPENDING mystream mygroup
1) (integer) 2
2) "1681660259887-0"
3) "1681660263096-0"
4) 1) 1) "myconsumer"
2) "2"
# 使用 XCLAIM 命令將訊息轉移
XCLAIM mystream mygroup myconsumer2 10000 1681660259887-0
1) 1) "1681660259887-0" # 被轉移的訊息 ID
2) 1) "k1" # 訊息內容
2) "v1"
上面的命令意思是:如果訊息 ID 1681660259887-0 處理時間超過 10000ms,那么訊息轉移給 myconsumer2,我們使用 XPENDING 命令來驗證:
XPENDING mystream mygroup
1) (integer) 2
2) "1681660259887-0"
3) "1681660263096-0"
4) 1) 1) "myconsumer"
2) "1"
2) 1) "myconsumer2"
2) "1"
XINFO 查看流和組資訊
XINFO 用于獲取流或消費組的詳細資訊,XINFO
命令有多個子命令,可以提供不同型別的資訊,
以下是一些常用的 XINFO
子命令及其介紹:
XINFO STREAM:此子命令用于獲取流的詳細資訊,包括長度、消費組數量、第一個和最后一個條目等,例如:
XINFO STREAM mystream
XINFO GROUPS:此子命令用于獲取流中消費組的串列及其相關資訊,例如:
XINFO GROUPS mystream
XINFO CONSUMERS:此子命令用于獲取消費組中消費者的串列及其相關資訊,例如:
XINFO CONSUMERS mystream mygroup
通過使用這些子命令,您可以了解流、消費組和消費者的狀態,從而監控和優化 Redis Stream 應用程式的性能,在處理問題或分析系統性能時,這些資訊可能特別有用,
洗掉操作
洗掉消費者
當用戶不再需要某個消費者的時候,可以通過執行以下命令將其洗掉,命令格式:
XGROUP DELCONSUMER stream group consumer
使用示例:
# 洗掉 myconsumer 消費者
XGROUP DELCONSUMER mystream mygroup myconsumer
(integer) 1
洗掉消費組
當你不需要消費組時,可以通過以下命令洗掉它,命令格式:
XGROUP DESTROY stream group
使用示例:
# 洗掉 mygroup 消費組
XGROUP DESTROY mystream mygroup
(integer) 1
總結
以下是本篇文章涉及的 Redis Stream 命令命令和簡要總結:
- XADD:向流中添加新的訊息,
- XREAD:從流中讀取訊息,
- XREADGROUP:從消費組中讀取訊息,
- XRANGE:根據訊息 ID 范圍讀取流中的訊息,
- XREVRANGE:與 XRANGE 類似,但以相反順序回傳結果,
- XDEL:從流中洗掉訊息,
- XTRIM:根據 MAXLEN 引數修剪流的長度,
- XLEN:獲取流的長度,
- XGROUP:管理消費組,包括創建、洗掉和修改,
- XACK:確認消費組中的訊息已被處理,
- XPENDING:查詢消費組中掛起(未確認)的訊息,
- XCLAIM:將掛起的訊息從一個消費者轉移到另一個消費者,
- XINFO:獲取流、消費組或消費者的詳細資訊,
這些命令提供了對 Redis Stream 的全面操作支持,包括添加、洗掉、讀取、修剪訊息以及管理消費組和消費者,通過熟練使用這些命令,您可以實作高效且可擴展的訊息傳遞和日志處理系統,edis Stream 是 Redis 提供的一種強大、持久且可擴展的資料結構,用于實作訊息傳遞和日志處理等場景,Stream 資料結構類似于日志檔案,訊息以有序的方式存盤在流中,同時還支持消費組的概念,允許多個消費者并行處理訊息,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/550243.html
標籤:其他
上一篇:c/c++快樂演算法第三天