主頁 > 後端開發 > 深入理解 Redis 新特性:Stream

深入理解 Redis 新特性:Stream

2023-04-17 07:38:32 後端開發

該資料結構需要 Redis 5.0.0 + 版本才可用使用

概述

Redis stream 是 Redis 5 引入的一種新的資料結構,它是一個高性能、高可靠性的訊息佇列,主要用于異步訊息處理和流式資料處理,在此之前,想要使用 Redis 實作訊息佇列,通常可以使用例如:串列,有序集合、發布與訂閱 3 種資料結構,但是 stream 相比它們具有以下的優勢:

  • 支持范圍查找:內置的索引功能,可以通過索引來對訊息進行范圍查找
  • 支持阻塞操作:避免低效的反復輪詢查找訊息
  • 支持 ACK:可以通過確認機制來告知已經成功處理了訊息,保證可靠性
  • 支持多個消費者:多個消費者可以同時消費同一個流,Redis 會確保每個消費者都可以獨立地消費流中的訊息

話不多說,接下來具體看看如何使用它,(PS:萬字長文,行駛途中請系好安全帶)

XADD 添加元素

XADD 命令的語法格式如下:

XADD stream-name id field value [field value]
  • stream-name: 指定 redis stream 的名字
  • id: 是指 stream 中的訊息 ID,通常使用 * 號表示自動生成
  • field value: 就是訊息的內容,是 K-V 格式的鍵值對

關于使用 XADD 添加元素,還有以下特點:

  • 自動創建流:當 my-stream 流不存在時,redis 會自動創建,然后將元素追加在流的末尾處
  • 任意鍵值對:流中的每個元素可以包含一個或任意多個鍵值對

下面是一個使用 XADD 命令添加新訊息的示例:

XADD my-stream * name John age 30 email [email protected]

上述命令的說明:

  1. 向名為 my-stream 的 Redis stream 中添加了一條新訊息,
  2. * 表示使用自動生成的訊息 ID,
  3. nameageemail 是訊息的欄位名
  4. John30[email protected] 是訊息的欄位值,

流元素 ID

XADD 命令在成功執行后會回傳元素 ID 作為結果:

"1681138020163-0"

每個元素的 ID 是一個遞增的唯一識別符號,由兩部分組成:一個時間戳和一個序列號,

  • 時間戳部分是一個 64 位的有符號整數,以毫秒為單位表示自 Unix 時間起經過的毫秒數,
  • 序列號部分是一個遞增的整數,從 0 開始逐步增加,

為了證明,我們可以指定訊息 ID 向指定流中發送一條訊息:

XADD my-stream 1681138020163-1 name Mary age 25 email [email protected]

回傳結果:

"1681138020163-1"

最后,可以提前使用 XRANGE 指令查看推入流中的資料

XRANGE my-stream - +

回傳結果:

1) 1) "1681138020163-0"
   2) 1) "name"
      2) "John"
      3) "age"
      4) "30"
      5) "email"
      6) "[email protected]"
2) 1) "1681138020163-1"
   2) 1) "name"
      2) "Mary"
      3) "age"
      4) "25"
      5) "email"
      6) "[email protected]"

流元素 ID 的限制

元素 ID 在 Redis stream 中扮演著非常重要的角色,它不僅保證了元素的唯一性和順序性,還提供了高效的范圍查詢和分析功能,在使用 Redis stream 時,需要特別注意元素 ID 的限制,并保證 ID 的唯一性和遞增性,

限制如下::

  • ID 必須是唯一的
  • 新元素的 ID 必須比流中所有已有元素的 ID 都要大

還有一些長度和特殊字符的限制等等,不符合上述限制的添加元素操作,會被 redis 拒絕,并且回傳一個錯誤等,

最大元素 ID 是如何更新的 ?

在成功執行XADD命令之后,流的最大元素ID也會隨之更新,

為什么要限制 新元素的 ID 必須比流中所有已有元素的 ID 都要大 ?

限制新元素的 ID 必須比流中所有已有元素的 ID 都要大,是為了保證 stream 中每個元素的唯一性和順序性,這種特性對于使用流實作訊息佇列和事件系統的用戶來說是非常重要的:用戶可以確信,新的訊息和事件只會出現在已有訊息和事件之后,就像現實世界里新事件總是發生在已有事件之后一樣,一切都是有序進行的,

自動生成 ID 的規則

示例開始就演示自動生成訊息向流中推送資料,在日常使用非常方便,這里說一下它的生成規則:

  1. 時間戳部分是當前時間的毫秒數,表示自 Unix 時間起經過的毫秒數
  2. 序列號從 0 開始遞增,序列號是一個 64 位的整數,從 0 開始遞增

限制流長度

流的資料大多只是臨時保存的,如果不對流的長度進行限制,會出現以下情況:

  1. 存盤耗盡:隨著流中訊息的增加,占用的記憶體也會相應增加,長時間運行的應用程式可能會面臨記憶體耗盡的風險
  2. 影響性能:隨著資料越多,查詢和操作流的速度會更慢,維護也更困難

為了避免該問題,在使用 Redis stream 時,可以使用 MAXLEN 選項指定 stream 的最大長度,命令格式如下:

XADD stream [MAXLEN len] id field value [field value ...]

示例:

XADD mini-stream MAXLEN 3 * k1 v1
XADD mini-stream MAXLEN 3 * k2 v2
XADD mini-stream MAXLEN 3 * k3 v3
XADD mini-stream MAXLEN 3 * k4 v4

# 我們向一個限制長度為 3 的 `mini-stream` 流中添加 4 條資料,然后查看流內的訊息:
XRANGE mini-stream - +
1) 1) "1681140898447-0"
   2) 1) "k2"
      2) "v2"
2) 1) "1681140901790-0"
   2) 1) "k3"
      2) "v3"
3) 1) "1681140906703-0"
   2) 1) "k4"
      2) "v4"

最后會看到最早創建的 k1 訊息已經被移除,redis 洗掉在流中存在時間最長的元素,從而來保證流的整體長度,

XTRIM 限制流

除了在 XADD 命令時限制流,Redis 還提供單獨限制流長度的 MAXLEN 命令,基礎語法如下:

XTRIM stream MAXLEN len

示例:

XTRIM my-stream MAXLEN 2
(integer) 1

這條命令 XTRIM my-stream MAXLEN 2 的作用是將名為 my-stream 的流修剪為最多包含 2 條訊息,換句話說,流中超出這個長度的較舊訊息將被移除,

XDEL 移除元素

XDEL 用于從流中洗掉特定的訊息,這個命令需要提供流的鍵(key)和一個或多個訊息 ID 作為引數,當訊息被成功洗掉時,XDEL 命令會回傳被洗掉訊息的數量,

XDEL 的基本語法如下:

XDEL key ID [ID ...]

示例:

# 這個命令將從名為 `mystream` 的流中洗掉訊息 ID 為 `1681480521617-0` 的訊息,
XDEL my-stream 1681480521617-0
(integer) 1

# 你也可以傳入多個 `id` 引數進行批量洗掉
XDEL my-stream 1681480524451-0 1681480526810-0 1681480965273-0
(integer) 3

注意:,XDEL 不會修改流的長度計數,這意味著洗掉訊息后,流的長度保持不變,

XLEN 獲取流長度

XLEN 用于獲取流中訊息的數量,這個命令非常簡單且高效,因為它只要一個引數,

XLEN 的基本語法如下:

XLEN key

示例:

XLEN my-stream
(integer) 4

注意:XLEN 命令僅回傳流中訊息的數量,并不提供訊息的具體內容,獲取訊息內容的命令,看下面的 XRANGE

XRANGE 查詢訊息

XRANG 主要用于獲取流中的一段連續訊息,它還有一個非常相似的 XREVRANGE 命令,區別:

  • XRANGE 按照訊息 ID 順序回傳結果
  • XREVRANGE 按照訊息 ID 逆序回傳結果(用來查詢流中最新的訊息,非常有用!)

XRANG 的的基本語法如下:

XRANGE key start end [COUNT count]

獲取指定訊息

獲取指定訊息,我們可以把 startend 設定同一條訊息 ID,可以用來達到查詢指定訊息 ID 的效果,使用示例:

# 獲取指定訊息 ID
XRANGE my-stream 1681480968241-0 1681480968241-0

獲取多條訊息

獲取多條訊息,可以利用 COUNT 選項引數,使用示例:

# 獲取流中最早的 5 條訊息
XRANGE my-stream - + COUNT 5

這條命令獲取流中最早的 5 條訊息(按訊息 ID 順序排序),-+ 分別表示最小和最大的訊息 ID,用于獲取流中的所有訊息,

獲取全部訊息

想要讀取流中全部訊息內容,移除 COUNT 即可:

# 獲取全部訊息
XRANGE my-stream - +

逆序獲取流

XREVRANGE 按照訊息 ID 逆序回傳結果,基本語法如下:

XREVRANGE key end start [COUNT count]

用法完全和 XRANGE 一樣,這里就不過多介紹了,使用示例:

XREVRANGE my-stream + - COUNT 5

這個命令將回傳名為 mystream 的流中的最新的 3 條訊息(按訊息 ID 逆序排序),

XRANGE 的使用場景

在實際業務場景中,可以利用 XRANGEXREVRANGE 命令可以用于實作以下功能:

  • 分頁查詢:通過指定 startendCOUNT 引數,可以實作對流中訊息的分頁查詢
  • 實時監控:可以使用這些命令來獲取流中的最新訊息,以便在實時監控或分析系統中展示
  • 資料匯出:如果需要將流中的資料匯出到其他系統或檔案中,可以使用這些命令來獲取指定范圍內的訊息

XREAD 阻塞讀取流

相比 XRANGE,XREVRANGE 類似,XREAD 也是用于從流中讀取訊息的命令,但它們之間有一些關鍵區別:

  • XREAD 支持同時讀取多個流的訊息
  • XREAD 支持阻塞模式,可以在新訊息到達時候,立即粗處理
  • XREAD 支持 BLOCK 阻塞等待時間引數,控制阻塞時間

XREAD 的阻塞模式,可以更好的構建實時資料處理應用程式,如事件驅動系統、實時分析系統等,

XREAD 命令的基本語法如下:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

查詢模式

查詢的話,除了同時讀取多個流的特點外,其他和 XRANGE,XREVRANGE 類似,

使用示例:

  1. 讀取單個流的訊息:
XREAD STREAMS my-stream 0

這個命令將從名為 my-stream 的流中讀取訊息,0 代表讀取所有訊息,如果指定的訊息 ID,表示從該訊息 ID 之后開始讀取

  1. 讀取多個流的訊息:
XREAD STREAMS my-stream mini-stream 0 0

這個命令將從名為 my-streammini-stream 的流中分別讀取所有訊息,后面的 2 個引數 0 分別對應 2 個訊息 ID 0 開始的位置

阻塞模式

當使用阻塞模式時,XREAD 命令會在以下幾種情況下表現出不同的行為:

  1. 不會阻塞的情況:找到符合條件的元素會立即回傳
  2. 會解除阻塞的情況:超時,或者新訊息到達
  3. 一直阻塞的情況:一直阻塞,等待新訊息的到達

使用示例:

  1. 不會阻塞的情況

如果流中有滿足條件的訊息(即從指定的訊息 ID 之后的新訊息),那么 XREAD 命令會立即回傳這些訊息,不會發生阻塞,

XREAD BLOCK 1000000 COUNT 1 STREAMS my-stream 0
1) 1) "my-stream"
   2) 1) 1) "1681480968241-0"
         2) 1) "k5"
            2) "v5"
  1. 會解除阻塞的情況

XREAD 命令解除阻塞也分 2 情況:超時,新訊息到達

示例代碼:

# 超時: 阻塞超時,沒有新訊息到達,解除阻塞
XREAD BLOCK 5000 STREAMS my-stream 1681482023346-0
(nil)
(5.09s)

# 新訊息到達: 新訊息到達,且滿足讀取條件 (新訊息的 ID 大于指定的訊息 ID) 解除阻塞
XREAD BLOCK 50000 STREAMS my-stream 1681482023346-0
1) 1) "my-stream"
   2) 1) 1) "1681485525804-0"
         2) 1) "newMessage"
            2) "v1"
(18.46s)
  1. 一直阻塞的情況:

如果設定的阻塞等待時間為 0,那么 XREAD 命令會一直阻塞:

示例代碼:

XREAD BLOCK 0 STREAMS my-stream $

這個命令將一直阻塞等待,直到新訊息到達,$ 符號表示只讀取新訊息,

當然如果客戶端主動斷開連接,阻塞的 XREAD 命令也會被取消

在實際應用中,XREAD 使用阻塞模式,可以在新訊息到達時立即處理,實作實時訊息處理,

消費組

在 Redis 流的訊息模型中,是通過消費者組(Consumer Group)來組織和管理多個消費者以協同處理來自同一個流的訊息的機制,消費者組的主要目的是在多個消費者之間分發訊息,實作負載均衡、高可用性和容錯能力,

作業原理:

  1. Stream 將訊息分發,所有訂閱的消費者組 Consumer Group 都會收到訊息(消費組組共享 stream 的訊息)
  2. 消費者組本身不處理訊息,而是再將訊息分發給消費者,由消費者進行真正的消費(消費者獨占組內的訊息)

如圖所示:

graph LR Stream((Stream)) -- messages --> ConsumerGroup1(Consumer Group 1) Stream((Stream)) -- messages --> ConsumerGroup2(Consumer Group 2) ConsumerGroup1(Consumer Group 1) -- messages --> Consumer1A(Consumer 1A) ConsumerGroup1(Consumer Group 1) -- messages --> Consumer1B(Consumer 1B) ConsumerGroup2(Consumer Group 2) -- messages --> Consumer2A(Consumer 2A) ConsumerGroup2(Consumer Group 2) -- messages --> Consumer2B(Consumer 2B)

使用消費者組這種模型的設計,以為在 Redis Stream 中實作以下功能:

  • 負載均衡:消費者組可以將訊息分發給多個消費者,實作負載均衡
  • 高可用性:在某個消費者發生故障的情況下,仍然可以確保訊息被處理
  • 容錯能力:消費者組支持重新處理失敗的訊息,這有助于確保訊息被可靠地處理

接下來我們再詳細說明消費組相關的命令使用

XGOUP 管理消費組

CREATE 創建消費組

通過 XGROUP 命令可以為你的 Redis Stream 創建和管理消費組,

命令格式如下:

XGROUP CREATE stream group id

引數說明:

  • <stream>:要關聯的流的鍵,
  • <group>:消費組的名稱,
  • <id>:開始讀取訊息的起始 ID,通常使用 $ 表示僅消費新訊息,或者使用 0 表示消費流中的所有訊息,
  • [MKSTREAM](可選):如果流不存在,自動創建一個新的流,

使用示例:

# 創建消費組,如果流不存在則自動創建
XGROUP CREATE mystream mygroup $ MKSTREAM
OK

# 查看流中的消費組
XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "0-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 0

以上命令是使用 XGROUP CREATE 命令創建一個名為 mygroup 的消費組,從最新的訊息開始消費,使用 MKSTREAM 選項,如果流不存在則會自動創建流,回傳 OK 既代表創建成功,最后使用 XINFO 查看結果,

SETID 修改組的最后訊息 ID

在某些情況下,你可能想要消費組忽略某些訊息,或者重新處理某些訊息來重現 bug,那么可以使用 XGROUP SETID 命令設定消費組的起始訊息 ID,

命令格式非常簡單:

XGROUP SETID stream group id

使用示例:

# 設定 mygroup 組的最新訊息為指定 ID
XGROUP SETID mystream mygroup 1681655893911-0
OK

# 查看消費組
XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1681655893911-0"		# 已被改變
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 4
   
# 設定 mygroup 組的最新訊息為流的最新訊息 ID
XGROUP SETID mystream mygroup $

# 查看消費組
127.0.0.1:6379> XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1681655916001-0"		# 已更新
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 0

以上命令將 mygroup 組的最新訊息 ID 更新為指定 ID 和流的最新 ID 的使用示例,

XREADGROUP 讀取訊息

使用 XREADGROUP 命令讀取消費組里面的訊息,基本語法:

XREADGROUP GROUP <group> <consumer> [COUNT <n>] [BLOCK <ms>] STREAMS <stream_key_1> <stream_key_2> ... <id_1> <id_2> ...

引數說明

  • <group>:消費組的名稱,
  • <consumer>:消費者的名稱,
  • <n>(可選):要讀取的最大訊息數,
  • <ms>(可選):阻塞等待新訊息的時間(以毫秒為單位),
  • <stream_key_1>, <stream_key_2>:要從中讀取訊息的流的鍵,
  • <id_1>, <id_2>:從每個流中開始讀取的訊息 ID,通常使用特殊字符 > 表示從上次讀取的位置開始讀取新的訊息,

使用示例:

我們創建一個 myconsumer 的消費組讀取上面創建 mygroup 消費組的資訊,以下是多種用法示例:

# 以 myconsumer 消費者身份從 mystream 中讀取分配給 mygroup 的訊息
# 讀取所有最新的訊息(常用)
XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
(nil)

# 其他用法:
# 讀取最多 10 條訊息
XREADGROUP GROUP mygroup myconsumer COUNT 10 STREAMS mystream >

# 進行阻塞讀取最新訊息
XREADGROUP GROUP mygroup myconsumer BLOCK 5000 STREAMS mystream >

這里拿不到資料是因為我們上面把消費組 mygroup 的訊息 ID 設定為最新,我們嘗試修改訊息 ID 重新消費試試

# 設定消費組的訊息 ID,進行重新消費
XGROUP SETID mystream mygroup 1681655893911-0

# 消費組 myconsumer 讀取消費組的訊息
XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1681655897993-0"
         2) 1) "k1"
            2) "v1"
      2) 1) "1681655899297-0"
         2) 1) "k1"
            2) "v1"
      3) 1) "1681655915496-0"
         2) 1) "k1"
            2) "v1"
      4) 1) "1681655916001-0"
         2) 1) "k1"
            2) "v1"
            
# 查看消費組的資訊
XINFO GROUPS mystream
1)  1) "name"
    2) "mygroup"
    3) "consumers"
    4) (integer) 1		# 消費組有一個消費者
    5) "pending"
    6) (integer) 4		# 有 4 條正在處理的訊息
    7) "last-delivered-id"
    8) "1681655916001-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 0

通過以上命令可以確認,myconsumer 消費者拿到 mygroup 消費組的訊息未確認處理,所以看到有 4 條訊息正在等待處理中,

XPENDING 查看訊息

通過 XPENDING 命令,可以獲取指定流的指定消費者組目前的待處理訊息的相關資訊,在很多場景下,你需要通過它來觀察和了解消費者的處理情況,從而做出處理,例如以下場景:

  • 您可以獲取消費組中掛起(未確認)的訊息資訊,從而了解消費者處理訊息的速度和效率
  • 如果某個消費者的掛起訊息數量不斷增加,或者某些訊息長時間未被處理,可能表明該消費者存在問題
  • 在高并發或高負載的情況下,消費者可能無法及時處理所有訊息,通過 XPENDING 命令檢測到積壓訊息
  • 通過定期運行 XPENDING 命令,您可以在發現掛起訊息數量超過預設閾值時觸發報警

基本語法:

XPENDING stream group [start stop count] [consumer]

引數說明

  • <stream>:流的鍵,
  • <group>:消費組的名稱,
  • <start>(可選):掛起訊息范圍的起始 ID,
  • <stop>(可選):掛起訊息范圍的結束 ID,
  • <count>(可選):回傳的最大掛起訊息數,
  • <consumer>(可選):篩選特定消費者的掛起訊息,

使用示例:

使用 XPENDING 命令查看上面的 mygroup 組的訊息去哪兒了:

XPENDING mystream mygroup
1) (integer) 4			# 待處理訊息數量
2) "1681655897993-0"	# 首條訊息 ID
3) "1681655916001-0"	# 最后一條訊息的 ID
4) 1) 1) "myconsumer"	# 各消費者正在處理的訊息數量
      2) "4"

以上展示的匯總資訊,你還可以通過以下命令,查看待處理訊息更詳細的資訊:

# 查看指定待處理訊息
XPENDING mystream mygroup 1681655897993-0 1681655897993-0 1
1) 1) "1681655897993-0"		# 訊息 ID
   2) "myconsumer"			# 所屬消費者
   3) (integer) 2397387		# 最后一次投遞時間
   4) (integer) 1			# 投遞次數

從以上資訊你可以看到訊息正在被誰處理和處理的時間,你也可以指定消費者查看資訊:

XPENDING mystream mygroup - + 10 myconsumer
1) 1) "1681655897993-0"
   2) "myconsumer"
   3) (integer) 2591145
   4) (integer) 1
2) 1) "1681655899297-0"
   2) "myconsumer"
   3) (integer) 2591145
   4) (integer) 1
3) 1) "1681655915496-0"
   2) "myconsumer"
   3) (integer) 2591145
   4) (integer) 1
4) 1) "1681655916001-0"
   2) "myconsumer"
   3) (integer) 2591145
   4) (integer) 1

以上命令列出 myconsumer 消費者所有待處理的訊息的詳細資訊

XACK 處理訊息

XACK 用于確認消費組中的特定訊息已被處理,在消費者成功處理訊息后,應使用 XACK 命令通知 Redis,以便從消費組的掛起訊息串列中移除該訊息,

命令格式:

XACK stream group id [id id ...]

使用示例:

通過 XACK 命令,我們將上面 myconsumer 消費者的訊息進行確認處理:

# 確認訊息
XACK mystream mygroup 1681655897993-0
(integer) 1
# .....

當消費者對所有訊息進行處理后,再查看消費組內容進行驗證:

XPENDING mystream mygroup - + 10 myconsumer
(empty array)

XPENDING mystream mygroup
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)

使用 XACK 可以確保訊息不會重復處理防止其他消費者或相同消費者在故障恢復后重復處理該訊息等等好處,

XCLAIM 訊息轉移

XCLAIM 訊息轉移類似我們生活中的呼叫轉移,當一個消費者無法處理某個訊息或出現故障時,XCLAIM 可以確保其他消費者接管并處理這些訊息,命令格式非常簡單:

XCLAIM stream group new_consumer max_pending_time id [id id id]

使用示例:

# 使用 XPENDING 命令查詢消費組中掛起的訊息
XPENDING mystream mygroup
1) (integer) 2
2) "1681660259887-0"
3) "1681660263096-0"
4) 1) 1) "myconsumer"
      2) "2"
      
# 使用 XCLAIM 命令將訊息轉移
XCLAIM mystream mygroup myconsumer2 10000 1681660259887-0
1) 1) "1681660259887-0"			# 被轉移的訊息 ID
   2) 1) "k1"					# 訊息內容
      2) "v1"

上面的命令意思是:如果訊息 ID 1681660259887-0 處理時間超過 10000ms,那么訊息轉移給 myconsumer2,我們使用 XPENDING 命令來驗證:

XPENDING mystream mygroup
1) (integer) 2
2) "1681660259887-0"
3) "1681660263096-0"
4) 1) 1) "myconsumer"
      2) "1"
   2) 1) "myconsumer2"
      2) "1"

XINFO 查看流和組資訊

XINFO 用于獲取流或消費組的詳細資訊,XINFO 命令有多個子命令,可以提供不同型別的資訊,

以下是一些常用的 XINFO 子命令及其介紹:

XINFO STREAM:此子命令用于獲取流的詳細資訊,包括長度、消費組數量、第一個和最后一個條目等,例如:

XINFO STREAM mystream

XINFO GROUPS:此子命令用于獲取流中消費組的串列及其相關資訊,例如:

XINFO GROUPS mystream

XINFO CONSUMERS:此子命令用于獲取消費組中消費者的串列及其相關資訊,例如:

XINFO CONSUMERS mystream mygroup

通過使用這些子命令,您可以了解流、消費組和消費者的狀態,從而監控和優化 Redis Stream 應用程式的性能,在處理問題或分析系統性能時,這些資訊可能特別有用,

洗掉操作

洗掉消費者

當用戶不再需要某個消費者的時候,可以通過執行以下命令將其洗掉,命令格式:

XGROUP DELCONSUMER stream group consumer

使用示例:

# 洗掉 myconsumer 消費者
XGROUP DELCONSUMER mystream mygroup myconsumer
(integer) 1
洗掉消費組

當你不需要消費組時,可以通過以下命令洗掉它,命令格式:

XGROUP DESTROY stream group

使用示例:

# 洗掉 mygroup 消費組
XGROUP DESTROY mystream mygroup
(integer) 1

總結

以下是本篇文章涉及的 Redis Stream 命令命令和簡要總結:

  1. XADD:向流中添加新的訊息,
  2. XREAD:從流中讀取訊息,
  3. XREADGROUP:從消費組中讀取訊息,
  4. XRANGE:根據訊息 ID 范圍讀取流中的訊息,
  5. XREVRANGE:與 XRANGE 類似,但以相反順序回傳結果,
  6. XDEL:從流中洗掉訊息,
  7. XTRIM:根據 MAXLEN 引數修剪流的長度,
  8. XLEN:獲取流的長度,
  9. XGROUP:管理消費組,包括創建、洗掉和修改,
  10. XACK:確認消費組中的訊息已被處理,
  11. XPENDING:查詢消費組中掛起(未確認)的訊息,
  12. XCLAIM:將掛起的訊息從一個消費者轉移到另一個消費者,
  13. XINFO:獲取流、消費組或消費者的詳細資訊,

這些命令提供了對 Redis Stream 的全面操作支持,包括添加、洗掉、讀取、修剪訊息以及管理消費組和消費者,通過熟練使用這些命令,您可以實作高效且可擴展的訊息傳遞和日志處理系統,edis Stream 是 Redis 提供的一種強大、持久且可擴展的資料結構,用于實作訊息傳遞和日志處理等場景,Stream 資料結構類似于日志檔案,訊息以有序的方式存盤在流中,同時還支持消費組的概念,允許多個消費者并行處理訊息,

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/550243.html

標籤:其他

上一篇:c/c++快樂演算法第三天

下一篇:【Visual Leak Detector】庫的 22 個 API 使用說明

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more