RabbitMQ
一. 初識
1. 什么是MQ
官方解釋
MQ(Message Queue)訊息佇列,是基礎資料結構中“先進先出”的一種資料結構,
指把要傳輸的資料(訊息)放在佇列中,用佇列機制來實作訊息傳遞——生產者產生訊息并把訊息放入佇列,然后由消費者去處理,
消費者可以到指定佇列拉取訊息,或者訂閱相應的佇列,由MQ服務端給其推送訊息,
個人理解
mq就是訊息中間件,說白了,有點類似于中介,中間人,傳話者,
就拿快遞智能柜舉例:
- 快遞員將快遞放在快遞智能柜
- 用戶根據取貨碼,去找到存放自己快遞的快遞柜
在這個程序中:
快遞就是訊息
快遞員就是生產者
投放快遞就是生產訊息
用戶就是消費者
取出快遞就是消費訊息
快遞智能柜就是中間件
2. 幾大流行MQ對比
二. 基礎
1.概述
訊息佇列已經逐漸成為企業IT系統內部通信的核心手段,它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為異步RPC的主要手段之一,當今市面上有很多主流的訊息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發RocketMQ等,
2. 訊息中間件的組成
2.1 Broker
訊息服務器,作為server提供訊息核心服務
2.2 Producer
訊息生產者,業務的發起方,負責生產訊息傳輸給broker,
2.3 Consumer
訊息消費者,業務的處理方,負責從broker獲取訊息并進行業務邏輯處理
2.4 Topic
主題,發布訂閱模式下的訊息統一匯集地,不同生產者向topic發送訊息,由MQ服務器分發到不同的訂閱者,實作訊息的 廣播
2.5 Queue
佇列,PTP模式下,特定生產者向特定queue發送訊息,消費者訂閱特定的queue完成指定訊息的接收
2.6 Message
訊息體,根據不同通信協議定義的固定格式進行編碼的資料包,來封裝業務資料,實作訊息的傳輸
3. 訊息中間件模式分類
3.1 點對點
PTP點對點:使用queue作為通信載體
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-1YZ20w6N-1629445988070)(https://leanote.com/api/file/getImage?fileId=5ad56d7cab64411333000bb0)]
說明:
訊息生產者生產訊息發送到queue中,然后訊息消費者從queue中取出并且消費訊息,
訊息被消費以后,queue中不再存盤,所以訊息消費者不可能消費到已經被消費的訊息, Queue支持存在多個消費者,但是對一個訊息而言,只會有一個消費者可以消費,
3.2 發布/訂閱
Pub/Sub發布訂閱(廣播):使用topic作為通信載體
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-0m4382Ei-1629445988072)(https://leanote.com/api/file/getImage?fileId=5ad56d8bab6441153c000ab3)]
說明:
訊息生產者(發布)將訊息發布到topic中,同時有多個訊息消費者(訂閱)消費該訊息,和點對點方式不同,發布到topic的訊息會被所有訂閱者消費,
queue實作了負載均衡,將producer生產的訊息發送到訊息佇列中,由多個消費者消費,但一個訊息只能被一個消費者接受,當沒有消費者可用時,這個訊息會被保存直到有一個可用的消費者,
topic實作了發布和訂閱,當你發布一個訊息,所有訂閱這個topic的服務都能得到這個訊息,所以從1到N個訂閱者都能得到一個訊息的拷貝,
4. 訊息中間件的優勢
4.1 系統解耦
互動系統之間沒有直接的呼叫關系,只是通過訊息傳輸,故系統侵入性不強,耦合度低,
4.2 提高系統回應時間
例如原來的一套邏輯,完成支付可能涉及先修改訂單狀態、計算會員積分、通知物流配送幾個邏輯才能完成;通過MQ架構設計,就可將緊急重要(需要立刻回應)的業務放到該呼叫方法中,回應要求不高的使用訊息佇列,放到MQ佇列中,供消費者處理,
4.3 為大資料處理架構提供服務
通過訊息作為整合,大資料的背景下,訊息佇列還與實時處理架構整合,為資料處理提供性能支持,
4.4 Java訊息服務——JMS
Java訊息服務(Java Message Service,JMS)應用程式介面是一個Java平臺中關于面向訊息中間件(MOM)的API,用于在兩個應用程式之間,或分布式系統中發送訊息,進行異步通信,
JMS中的P2P和Pub/Sub訊息模式:點對點(point to point, queue)與發布訂閱(publish/subscribe,topic)最初是由JMS定義的,這兩種模式主要區別或解決的問題就是發送到佇列的訊息能否重復消費(多訂閱),
5. 訊息中間件應用場景
? 5.1 異步通信
有些業務不想也不需要立即處理訊息,訊息佇列提供了異步處理機制,允許用戶把一個訊息放入佇列,但并不立即處理它,想向佇列中放入多少訊息就放多少,然后在需要的時候再去處理它們,
5.2 解耦
降低工程間的強依賴程度,針對異構系統進行適配,在專案啟動之初來預測將來專案會碰到什么需求,是極其困難的,通過訊息系統在處理程序中間插入了一個隱含的、基于資料的介面層,兩邊的處理程序都要實作這一介面,當應用發生變化時,可以獨立的擴展或修改兩邊的處理程序,只要確保它們遵守同樣的介面約束,
5.3 冗余
有些情況下,處理資料的程序會失敗,除非資料被持久化,否則將造成丟失,訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險,許多訊息佇列所采用的”插入-獲取-洗掉”范式中,在把一個訊息從佇列中洗掉之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的保存直到你使用完畢,
5.4 擴展性
因為訊息佇列解耦了你的處理程序,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理程序即可,不需要改變代碼、不需要調節引數,便于分布式擴容,
5.5 過載保護
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量無法提取預知;如果以為了能處理這類瞬間峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費,使用訊息佇列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰,
5.6 可恢復性
系統的一部分組件失效時,不會影響到整個系統,訊息佇列降低了行程間的耦合度,所以即使一個處理訊息的行程掛掉,加入佇列中的訊息仍然可以在系統恢復后被處理,
5.7 順序保證
在大多使用場景下,資料處理的順序都很重要,大部分訊息佇列本來就是排序的,并且能保證資料會按照特定的順序來處理,
5.8 緩沖
在任何重要的系統中,都會有需要不同的處理時間的元素,訊息佇列通過一個緩沖層來幫助任務最高效率的執行,該緩沖有助于控制和優化資料流經過系統的速度,以調節系統回應時間,
5.9 資料流處理
分布式系統產生的海量資料流,如:業務日志、監控資料、用戶行為等,針對這些資料流進行實時或批量采集匯總,然后進行大資料分析是當前互聯網的必備技術,通過訊息佇列完成此類資料收集是最好的選擇,
6 訊息中間件常用協議
6.1 AMQP協議
AMQP即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,基于此協議的客戶端與訊息中間件可傳遞訊息,并不受客戶端/中間件不同產品,不同開發語言等條件的限制,
優點:可靠、通用
6.2 MQTT協議
MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸)是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分,該協議支持所有平臺,幾乎可以把所有聯網物品和外部連接起來,被用來當做傳感器和致動器(比如通過Twitter讓房屋聯網)的通信協議,
優點:格式簡潔、占用帶寬小、移動端通信、PUSH、嵌入式系統
6.3 STOMP協議
STOMP(Streaming Text Orientated Message Protocol)是流文本定向訊息協議,是一種為MOM(Message Oriented Middleware,面向訊息的中間件)設計的簡單文本協議,STOMP提供一個可互操作的連接格式,允許客戶端與任意STOMP訊息代理(Broker)進行互動,
優點:命令模式(非topic\queue模式)
6.4 XMPP協議
XMPP(可擴展訊息處理現場協議,Extensible Messaging and Presence Protocol)是基于可擴展標記語言(XML)的協議,多用于即時訊息(IM)以及在線現場探測,適用于服務器之間的準即時操作,核心是基于XML流傳輸,這個協議可能最終允許因特網用戶向因特網上的其他任何人發送即時訊息,即使其作業系統和瀏覽器不同,
優點:通用公開、兼容性強、可擴展、安全性高,但XML編碼格式占用帶寬大
6.5 其他基于TCP/IP自定義的協議
有些特殊框架(如:redis、kafka、zeroMq等)根據自身需要未嚴格遵循MQ規范,而是基于TCP\IP自行封裝了一套協議,通過網路socket介面進行傳輸,實作了MQ的功能,
三. 入門
1. 環境配置
1.1 安裝erlang語言環境;
然后配置環境變數,和jdk步驟一抹一樣;
RabbitsMQ 3.93版本
官方下載:https://www.rabbitmq.com/install-windows.html
云盤下載:https://www.aliyundrive.com/s/3ZQr3PXpBER
erlang 24.0版本
官方下載:https://www.erlang.org/downloads
云盤下載:https://www.aliyundrive.com/s/tUWUznkcuBs
erlang官網:https://www.erlang.org/
erlang與RabbitsMQ版本參考:https://www.rabbitmq.com/which-erlang.html
1.2 安裝rabbitMQ
無腦下一步
在開始選單中找到rabbitMQ.start ,點擊運行;
1.3 進入圖形頁面
訪問http://127.0.0.1:15672 默認賬戶:guest 密碼:guest
1.4 如果訪問報錯
cmd進入rabbitmq安裝目錄sbin目錄(D:\app_instrall\rabbitmq\rabbitmq_server-3.9.3\sbin)下分別執行這兩句
(1) rabbitmq-plugins enable rabbitmq_management //安裝圖形頁面插件
(2)rabbitmqctl start_app //重啟RebbitMQ
1.5 創建用戶
1.6 創建VirtualHost
1.7 給用戶賦予權限
*VirtualHost:如果把RabbitMQ看成一個資料庫,那么VirtualHost 就是 作業空間,RabbitMQ可以擁有很多個VirtualHost,相當于資料庫中的作業空間,可以有多個,根據業務的不同,連接不同的VirtualHost,達到解耦的目的;
2. 簡單佇列(點對點)
功能:一個生產者P發送訊息到佇列Q,一個消費者C接收
點對點 一個訊息只能讓一個消費者消費
2.1 引入依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
2.2 連接工具類
package com.xzz.utlis;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
/**
* 獲取連接
* @return Connection
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
//RabbitMQ ip地址
factory.setHost("127.0.0.1");
//amqp 通訊地址
factory.setPort(5672);
//設定vhost
factory.setVirtualHost("test");
factory.setUsername("xzz");
factory.setPassword("123@qwe");
//通過工廠獲取連接
Connection connection = factory.newConnection();
return connection;
}
}
2.3 生產者代碼
private static final String QUEUE_NAME = "test_queue";
//創建佇列,發送訊息
public static void main(String[] args) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//創建通道
Channel channel = connection.createChannel();
//宣告創建佇列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//訊息內容
String message = "Hello World!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("發送訊息:"+message);
//關閉連接和通道
channel.close();
connection.close();
}
2.4 消費者代碼
private static final String QUEUE_NAME = "test_queue";
//消費者消費訊息
public static void main(String[] args) throws Exception {
//獲取連接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//宣告通道
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定義消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//監聽佇列
channel.basicConsume(QUEUE_NAME,true,consumer);
while(true){
//這個方法會阻塞住,直到獲取到訊息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接收到訊息:"+message);
}
}
三. 方法詳解
3.1 創建佇列方法
創建佇列宣告channel.queueDeclare方法
DeclareOk queueDeclare() throws IOException;
DeclareOk queueDeclare (String queue , boolean durable , boolean exclusive , boolean autoDelete , Map arguments) throws IOException;
queueDeclare引數詳解
String queue | 佇列的名稱 |
---|---|
boolean durable | 設定是否持久化,為 true 則設定佇列為持久化,持久化的佇列會存盤,在 服務器重啟的時候可以保證不丟失相關資訊, |
boolean exclusive | 設定是否排他,為 true 則設定佇列為排他的,如果一個佇列被宣告為排 他佇列,該佇列僅對首次宣告它的連接可見,并在連接斷開時自動洗掉 |
boolean autoDelete | 設定是否自動洗掉,為 true 則設定佇列為自動洗掉,自動洗掉的前提是: 至少有一個消費者連接到這個佇列,之后所有與這個佇列連接的消費者都斷開時,才會 自動洗掉 |
Map arguments | 設定佇列的其他一些引數 |
3.2 發送訊息方法
發送訊息channel.basicPublish方法
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
basicPublish引數詳解
exchange | 交換器名稱 |
---|---|
routingKey | 路由鍵 |
props | 有14個成員 |
body | 訊息體,payload真正需要發送的訊息 |
mandatory | true時,交換器無法根據自動的型別和路由鍵找到一個符合條件的佇列,那么RabbitMq會呼叫Basic.Ruturn命令將訊息回傳給生產都,為false時,出現上述情況訊息被直接丟棄 |
immediate | true,如果交換器在訊息路由到佇列時發現沒有任何消費者,那么 這個訊息將不會存和佇列,當與路由匹配的所有佇列都沒有消費者時,會Basic.Return回傳給生產者3.0去掉了immediate 引數 |
immediate和mandatory 都是訊息傳遞程序中,不可達目的地是,將訊息回傳給生產者的功能
3.2 監聽佇列方法
監聽佇列channel.basicConsume方法
channel.basicConsume(String queue, boolean autoAck, Consumer callback);
basicConsume引數詳解
String queue | 佇列名 |
---|---|
boolean autoAck | 是否自動確認訊息,true自動確認,false 手動呼叫,建立設定為false |
Consumer callback | 消費者 DefaultConsumer建立使用,重寫其中的方法 |
3.3 限流策略
消費端的限流策略basicQos
void basicQos(unit prefetchSize , ushort prefetchCount, bool global )
basicQos引數詳解
unit prefetchSize | 0訊息大小是否限制 |
---|---|
ushort prefetchCount | 會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個訊息,即一旦有 N 個訊息還沒有 ack,則該 consumer 將 block 掉,直到有訊息 ack |
bool global | true、false 是否將上面設定應用于 channel,簡單點說,就是上面限制是 channel 級別的還是 consumer 級別 |
四. RabbitMQ特征
4.1 交換機
生產者發送訊息不會向傳統方式直接將訊息投遞到佇列中,而是先將訊息投遞到交換機中,在由交換機轉發到具體的佇列,佇列在將訊息以推送或者拉取方式給消費者進行消費,這和我們之前學習Nginx有點類似,
交換機的作用根據具體的路由策略分發到不同的佇列中,交換機有四種型別,
Direct exchange(直連交換機) | 根據訊息攜帶的路由鍵(routing key)將訊息投遞給對應佇列的 |
---|---|
Fanout exchange(扇型交換機) | 將訊息路由給系結到它身上的所有佇列 |
Topic exchange(主題交換機) | 佇列通過路由鍵系結到交換機上,然后,交換機根據訊息里的路由值,將訊息路由給一個或多個系結佇列 |
Headers exchange(頭交換機) | 類似主題交換機,但是頭交換機使用多個訊息屬性來代替路由鍵建立路由規則,通過判斷訊息頭的值能否與指定的系結相匹配來確立路由規則, |
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-qfVFQZcD-1629445988074)(C:\Users\Administrator\Desktop\gitee\檔案\RabbitMQ\img\image-20210820145734002.png)]
P:消費者
X:交換機
**紅色:**佇列
C:為消費者
4.2 應答模式
為了確保訊息不會丟失,RabbitMQ支持訊息應答,
消費者發送一個訊息應答,告訴RabbitMQ這個訊息已經接收并且處理完畢了,RabbitMQ就可以洗掉它了,
消費者掛掉卻沒有發送應答
如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個訊息沒有處理完全,然后交給另一個消費者去重新處理,這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何訊息了,
沒有任何訊息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞,即使處理一條訊息會花費很長的時間,
訊息應答是默認打開的,我們通過顯示的設定autoAsk=true關閉這種機制,
現即自動應答開,一旦我們完成任務,消費者會自動發送應答,通知RabbitMQ訊息已被處理,可以從記憶體洗掉,
如果消費者因宕機或鏈接失敗等原因沒有發送ACK(不同于ActiveMQ,在RabbitMQ里,訊息沒有過期的概念),則RabbitMQ會將訊息重新發送給其他監聽在佇列的下一個消費者,
案例:
//生產者端代碼不變,消費者端代碼這部分就是用于開啟手動應答模式的,
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
//注:第二個引數值為false代表關閉RabbitMQ的自動應答機制,改為手動應答,
//在處理完訊息時,回傳應答狀態,true表示為自動應答模式,
channel.basicAck(envelope.getDeliveryTag(), false);
4.3 公平轉發
目前訊息轉發機制是平均分配,這樣就會出現一種情況:
假設現在有兩個消費者,A消費者,消費完一個訊息耗時1秒,B消費者消費一個訊息耗時10秒,
現在有一萬條訊息進來,A和B各5000條,
A消費完耗時:1個小時多
B消費完耗時:14個小時
這合理么?這不合理!!!
為了解決這樣的問題,我們可以使用basicQos方法,
傳遞引數為prefetchCount= 1,這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條訊息,
換句話說,只有在消費者空閑的時候會發送下一條資訊,
調度分發訊息的方式,也就是告訴RabbitMQ每次只給消費者處理一條訊息,也就是等待消費者處理完畢并自己對剛剛處理的訊息進行確認之后,才發送下一條訊息,防止消費者太過于忙碌,也防止它太過去清閑,
通過 設定channel.basicQos(1);
五. 發布訂閱
5.1 扇型交換機
這個可能是訊息佇列中最重要的佇列了,其他的都是在它的基礎上進行了擴展,
功能實作:一個生產者發送訊息,多個消費者獲取訊息(同樣的訊息),包括一個生產者,一個交換機,多個佇列,多個消費者,
思路解讀(重點理解):
(1)一個生產者,多個消費者
(2)每一個消費者都有自己的一個佇列
(3)生產者沒有直接發訊息到佇列中,而是發送到交換機
(4)每個消費者的佇列都系結到交換機上
(5)訊息通過交換機到達每個消費者的佇列
該模式就是Fanout Exchange(扇型交換機)將訊息路由給系結到它身上的所有佇列
以用戶發郵件案例講解
注意:交換機沒有存盤訊息功能,如果訊息發送到沒有系結消費佇列的交換機,訊息則丟失,
5.2 生產者代碼
package com.xzz.level_two;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xzz.utlis.ConnectionUtil;
public class ProducerFanout {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception{
Producer(1);
}
public static void Producer(int i) throws Exception {
// 1.創建新的連接
Connection connection = ConnectionUtil.getConnection();
// 2.創建通道
Channel channel = connection.createChannel();
// 3.系結的交換機 引數1互動機名稱 引數2 exchange型別
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String msg = "生產者發送訊息-----"+i;
// 4.發送訊息
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("生產者發送msg:" + msg);
// // 5.關閉通道、連接
channel.close();
connection.close();
// 注意:如果消費沒有系結交換機和佇列,則訊息會丟失
}
}
5.3 消費者代碼
package com.xzz.level_two;
import com.rabbitmq.client.*;
import com.xzz.utlis.ConnectionUtil;
import java.io.IOException;
public class ConsumerEmailFanout {
private static final String QUEUE_NAME = "consumerFanout_email";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
Consumer();
}
public static void Consumer() throws Exception {
System.out.println("郵件消費者啟動01");
// 1.創建新的連接
Connection connection = ConnectionUtil.getConnection();
// 2.創建通道
Channel channel = connection.createChannel();
// 3.消費者關聯佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消費者系結交換機 引數1 佇列 引數2交換機 引數3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("01消費者獲取生產者訊息:" + msg);
}
};
// 5.消費者監聽佇列訊息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
5.2 直連交換機(路由模式)
生產者發送訊息到交換機并指定一個路由key,消費者佇列系結到交換機時要制定路由key(key匹配就能接受訊息,key不匹配就不能接受訊息)
例如:我們可以把路由key設定為insert ,那么消費者佇列key指定包含insert才可以接收訊息,消費者佇列key定義為update或者delete就不能接收訊息,很好的控制了更新,插入和洗掉的操作,
采用交換機direct模式
5.2.1 生產者
public class ProducerFanout {
private static final String EXCHANGE_NAME = "fanout_direct";
public static void main(String[] args) throws Exception{
// 1.創建新的連接
Connection connection = MQConnection.newConnection();
// 2.創建通道
Channel channel = connection.createChannel();
// 3.系結的交換機 引數1互動機名稱 引數2 exchange型別
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String routingKey = "error";
String msg = "fanout_exchange_msg"+routingKey;
// 4.發送訊息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("生產者發送msg:" + msg);
// // 5.關閉通道、連接
channel.close();
connection.close();
// 注意:如果消費沒有系結交換機和佇列,則訊息會丟失
}
}
5.2.2 郵件消費者
public class ConsumerEmailFanout {
private static final String QUEUE_NAME = "consumerFanout_email";
private static final String EXCHANGE_NAME = "fanout_direct";
public static void main(String[] args) throws Exception {
System.out.println("郵件消費者啟動");
// 1.創建新的連接
Connection connection = MQConnection.newConnection();
// 2.創建通道
Channel channel = connection.createChannel();
// 3.消費者關聯佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消費者系結交換機 引數1 佇列 引數2交換機 引數3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消費者獲取生產者訊息:" + msg);
}
};
// 5.消費者監聽佇列訊息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
5.2.3 短信消費者
public class ConsumerSMSFanout {
private static final String QUEUE_NAME = "ConsumerFanout_sms";
private static final String EXCHANGE_NAME = "fanout_direct";
public static void main(String[] args) throws Exception {
System.out.println("短信消費者啟動");
// 1.創建新的連接
Connection connection = MQConnection.newConnection();
// 2.創建通道
Channel channel = connection.createChannel();
// 3.消費者關聯佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消費者系結交換機 引數1 佇列 引數2交換機 引數3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消費者獲取生產者訊息:" + msg);
}
};
// 5.消費者監聽佇列訊息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
5.3 通配符模式
說明:此模式是在路由key模式的基礎上,使用了通配符來管理消費者接收訊息,
生產者P發送訊息到交換機X,type=topic,交換機根據系結佇列的routing key的值進行通配符匹配;
符號#:匹配一個或者多個詞lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符號*:只能匹配一個詞lazy.* 可以匹配lazy.irs或者lazy.cor
六. 確認機制
問題產生背景:
生產者發送訊息出去之后,不知道到底有沒有發送到RabbitMQ服務器, 默認是不知道的,而且有的時候我們在發送訊息之后,后面的邏輯出問題了,我們不想要發送之前的訊息了,需要撤回該怎么做,
解決方案:
1.AMQP 事務機制
2.Confirm 模式
6.1 事務模式
txSelect 將當前channel設定為transaction模式
txCommit 提交當前事務
txRollback 事務回滾
6.1.1 生產者
public class Producer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws Exception{
// 1.獲取連接
Connection newConnection = MQConnection.newConnection();
// 2.創建通道
Channel channel = newConnection.createChannel();
// 3.創建佇列宣告
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 將當前管道設定為 txSelect 將當前channel設定為transaction模式 開啟事務
channel.txSelect();
String msg = "test_yushengjun110";
try {
// 4.發送訊息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
int i = 1 / 0;
channel.txCommit();// 提交事務
System.out.println("生產者發送訊息:" + msg);
} catch (Exception e) {
System.out.println("訊息進行回滾操作");
channel.txRollback();// 回滾事務
} finally {
channel.close();
newConnection.close();
}
}
}
6.1.2 消費者
public class Customer {
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws Exception{
// 1.獲取連接
Connection newConnection = MQConnection.newConnection();
// 2.獲取通道
final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msgString = new String(body, "UTF-8");
System.out.println("消費者獲取訊息:" + msgString);
}
};
// 3.監聽佇列
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
七. SpringBoot 整合RabbitMQ
7.1 maven依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xzz</groupId>
<artifactId>RabbitMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Environment Settings -->
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- common Settings -->
<fastjson.version>1.2.54</fastjson.version>
<jjwt.version>0.9.1</jjwt.version>
<druid-starter>1.1.10</druid-starter>
<mybatis-plus-boot-starter.version>3.4.0</mybatis-plus-boot-starter.version>
<hutool.version>5.7.3</hutool.version>
<aliyun-sdk-oss>3.4.0</aliyun-sdk-oss>
<easypoi.version>4.0.0</easypoi.version>
<lombok.version>1.18.12</lombok.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
</parent>
<dependencies>
<!-- web 依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- junit 依賴 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- fastjson 依賴 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- test 依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- lombok 依賴 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
</project>
7.2 配置類
spring:
rabbitmq:
####連接地址
host: 127.0.0.1
####埠號
port: 5672
####賬號
username: guest
####密碼
password: guest
### 地址
virtual-host: /
7.3 定義交換機和佇列
@Component
public class FanoutConfig {
// 郵件佇列
private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";
// 短信佇列
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
// 交換機
private String EXCHANGE_NAME = "fanoutExchange";
// 1.定義佇列郵件
@Bean
public Queue fanOutEamilQueue() {
return new Queue(FANOUT_EMAIL_QUEUE);
}
// 1.定義佇列短信
@Bean
public Queue fanOutSmsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
// 2.定義交換機
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
// 3.佇列與交換機系結郵件佇列
@Bean
Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
}
// 4.佇列與交換機系結短信佇列
@Bean
Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
}
}
7.4 生產者
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String queueName) {
String msg = "my_fanout_msg:" + new Date();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, msg);
}
}
7.5 消費者
@Component
@RabbitListener(queues = "fanout_eamil_queue")
public class FanoutEamilConsumer {
@RabbitHandler
public void process(String msg) throws Exception {
System.out.println("郵件消費者獲取生產者訊息msg:" + msg);
}
}
@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {
@RabbitHandler
public void process(String msg) {
System.out.println("短信消費者獲取生產者訊息msg:" + msg);
}
}
BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
}
// 4.佇列與交換機系結短信佇列
@Bean
Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
}
}
### 7.4 生產者
~~~java
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String queueName) {
String msg = "my_fanout_msg:" + new Date();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, msg);
}
}
7.5 消費者
@Component
@RabbitListener(queues = "fanout_eamil_queue")
public class FanoutEamilConsumer {
@RabbitHandler
public void process(String msg) throws Exception {
System.out.println("郵件消費者獲取生產者訊息msg:" + msg);
}
}
@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {
@RabbitHandler
public void process(String msg) {
System.out.println("短信消費者獲取生產者訊息msg:" + msg);
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295586.html
標籤:其他