主頁 >  其他 > RabbitMQ入門到精通(入門篇)

RabbitMQ入門到精通(入門篇)

2021-08-23 07:00:15 其他

RabbitMQ

一. 初識

1. 什么是MQ

官方解釋

MQ(Message Queue)訊息佇列,是基礎資料結構中“先進先出”的一種資料結構,
指把要傳輸的資料(訊息)放在佇列中,用佇列機制來實作訊息傳遞——生產者產生訊息并把訊息放入佇列,然后由消費者去處理,
消費者可以到指定佇列拉取訊息,或者訂閱相應的佇列,由MQ服務端給其推送訊息,

個人理解

mq就是訊息中間件,說白了,有點類似于中介,中間人,傳話者,

就拿快遞智能柜舉例:

  1. 快遞員將快遞放在快遞智能柜
  2. 用戶根據取貨碼,去找到存放自己快遞的快遞柜

在這個程序中:

快遞就是訊息

快遞員就是生產者

投放快遞就是生產訊息

用戶就是消費者

取出快遞就是消費訊息

快遞智能柜就是中間件

2. 幾大流行MQ對比

img

二. 基礎

1.概述

訊息佇列已經逐漸成為企業IT系統內部通信的核心手段,它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列功能,成為異步RPC的主要手段之一,當今市面上有很多主流的訊息中間件,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發RocketMQ等,

2. 訊息中間件的組成

2.1 Broker

訊息服務器,作為server提供訊息核心服務

2.2 Producer

訊息生產者,業務的發起方,負責生產訊息傳輸給broker,

2.3 Consumer

訊息消費者,業務的處理方,負責從broker獲取訊息并進行業務邏輯處理

2.4 Topic

主題,發布訂閱模式下的訊息統一匯集地,不同生產者向topic發送訊息,由MQ服務器分發到不同的訂閱者,實作訊息的 廣播

2.5 Queue

佇列,PTP模式下,特定生產者向特定queue發送訊息,消費者訂閱特定的queue完成指定訊息的接收

2.6 Message

訊息體,根據不同通信協議定義的固定格式進行編碼的資料包,來封裝業務資料,實作訊息的傳輸

3. 訊息中間件模式分類

3.1 點對點

PTP點對點:使用queue作為通信載體

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-1YZ20w6N-1629445988070)(https://leanote.com/api/file/getImage?fileId=5ad56d7cab64411333000bb0)]

說明:
訊息生產者生產訊息發送到queue中,然后訊息消費者從queue中取出并且消費訊息,
訊息被消費以后,queue中不再存盤,所以訊息消費者不可能消費到已經被消費的訊息, Queue支持存在多個消費者,但是對一個訊息而言,只會有一個消費者可以消費,

3.2 發布/訂閱

Pub/Sub發布訂閱(廣播):使用topic作為通信載體

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-0m4382Ei-1629445988072)(https://leanote.com/api/file/getImage?fileId=5ad56d8bab6441153c000ab3)]

說明:
訊息生產者(發布)將訊息發布到topic中,同時有多個訊息消費者(訂閱)消費該訊息,和點對點方式不同,發布到topic的訊息會被所有訂閱者消費,

queue實作了負載均衡,將producer生產的訊息發送到訊息佇列中,由多個消費者消費,但一個訊息只能被一個消費者接受,當沒有消費者可用時,這個訊息會被保存直到有一個可用的消費者,
topic實作了發布和訂閱,當你發布一個訊息,所有訂閱這個topic的服務都能得到這個訊息,所以從1到N個訂閱者都能得到一個訊息的拷貝,

4. 訊息中間件的優勢

4.1 系統解耦

互動系統之間沒有直接的呼叫關系,只是通過訊息傳輸,故系統侵入性不強,耦合度低,

4.2 提高系統回應時間

例如原來的一套邏輯,完成支付可能涉及先修改訂單狀態、計算會員積分、通知物流配送幾個邏輯才能完成;通過MQ架構設計,就可將緊急重要(需要立刻回應)的業務放到該呼叫方法中,回應要求不高的使用訊息佇列,放到MQ佇列中,供消費者處理,

4.3 為大資料處理架構提供服務

通過訊息作為整合,大資料的背景下,訊息佇列還與實時處理架構整合,為資料處理提供性能支持,

4.4 Java訊息服務——JMS

Java訊息服務(Java Message Service,JMS)應用程式介面是一個Java平臺中關于面向訊息中間件(MOM)的API,用于在兩個應用程式之間,或分布式系統中發送訊息,進行異步通信,
JMS中的P2P和Pub/Sub訊息模式:點對點(point to point, queue)與發布訂閱(publish/subscribe,topic)最初是由JMS定義的,這兩種模式主要區別或解決的問題就是發送到佇列的訊息能否重復消費(多訂閱),

5. 訊息中間件應用場景

? 5.1 異步通信

有些業務不想也不需要立即處理訊息,訊息佇列提供了異步處理機制,允許用戶把一個訊息放入佇列,但并不立即處理它,想向佇列中放入多少訊息就放多少,然后在需要的時候再去處理它們,

5.2 解耦

降低工程間的強依賴程度,針對異構系統進行適配,在專案啟動之初來預測將來專案會碰到什么需求,是極其困難的,通過訊息系統在處理程序中間插入了一個隱含的、基于資料的介面層,兩邊的處理程序都要實作這一介面,當應用發生變化時,可以獨立的擴展或修改兩邊的處理程序,只要確保它們遵守同樣的介面約束,

5.3 冗余

有些情況下,處理資料的程序會失敗,除非資料被持久化,否則將造成丟失,訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險,許多訊息佇列所采用的”插入-獲取-洗掉”范式中,在把一個訊息從佇列中洗掉之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的保存直到你使用完畢,

5.4 擴展性

因為訊息佇列解耦了你的處理程序,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理程序即可,不需要改變代碼、不需要調節引數,便于分布式擴容,

5.5 過載保護

在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量無法提取預知;如果以為了能處理這類瞬間峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費,使用訊息佇列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰,

5.6 可恢復性

系統的一部分組件失效時,不會影響到整個系統,訊息佇列降低了行程間的耦合度,所以即使一個處理訊息的行程掛掉,加入佇列中的訊息仍然可以在系統恢復后被處理,

5.7 順序保證

在大多使用場景下,資料處理的順序都很重要,大部分訊息佇列本來就是排序的,并且能保證資料會按照特定的順序來處理,

5.8 緩沖

在任何重要的系統中,都會有需要不同的處理時間的元素,訊息佇列通過一個緩沖層來幫助任務最高效率的執行,該緩沖有助于控制和優化資料流經過系統的速度,以調節系統回應時間,

5.9 資料流處理

分布式系統產生的海量資料流,如:業務日志、監控資料、用戶行為等,針對這些資料流進行實時或批量采集匯總,然后進行大資料分析是當前互聯網的必備技術,通過訊息佇列完成此類資料收集是最好的選擇,

6 訊息中間件常用協議

6.1 AMQP協議

AMQP即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,基于此協議的客戶端與訊息中間件可傳遞訊息,并不受客戶端/中間件不同產品,不同開發語言等條件的限制,
優點:可靠、通用

6.2 MQTT協議

MQTT(Message Queuing Telemetry Transport,訊息佇列遙測傳輸)是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分,該協議支持所有平臺,幾乎可以把所有聯網物品和外部連接起來,被用來當做傳感器和致動器(比如通過Twitter讓房屋聯網)的通信協議,
優點:格式簡潔、占用帶寬小、移動端通信、PUSH、嵌入式系統

6.3 STOMP協議

STOMP(Streaming Text Orientated Message Protocol)是流文本定向訊息協議,是一種為MOM(Message Oriented Middleware,面向訊息的中間件)設計的簡單文本協議,STOMP提供一個可互操作的連接格式,允許客戶端與任意STOMP訊息代理(Broker)進行互動,
優點:命令模式(非topic\queue模式)

6.4 XMPP協議

XMPP(可擴展訊息處理現場協議,Extensible Messaging and Presence Protocol)是基于可擴展標記語言(XML)的協議,多用于即時訊息(IM)以及在線現場探測,適用于服務器之間的準即時操作,核心是基于XML流傳輸,這個協議可能最終允許因特網用戶向因特網上的其他任何人發送即時訊息,即使其作業系統和瀏覽器不同,
優點:通用公開、兼容性強、可擴展、安全性高,但XML編碼格式占用帶寬大

6.5 其他基于TCP/IP自定義的協議

有些特殊框架(如:redis、kafka、zeroMq等)根據自身需要未嚴格遵循MQ規范,而是基于TCP\IP自行封裝了一套協議,通過網路socket介面進行傳輸,實作了MQ的功能,

三. 入門

1. 環境配置

1.1 安裝erlang語言環境;

然后配置環境變數,和jdk步驟一抹一樣;

RabbitsMQ 3.93版本

官方下載:https://www.rabbitmq.com/install-windows.html

云盤下載:https://www.aliyundrive.com/s/3ZQr3PXpBER

erlang 24.0版本

官方下載:https://www.erlang.org/downloads

云盤下載:https://www.aliyundrive.com/s/tUWUznkcuBs

erlang官網:https://www.erlang.org/
erlang與RabbitsMQ版本參考:https://www.rabbitmq.com/which-erlang.html

1.2 安裝rabbitMQ

無腦下一步

在開始選單中找到rabbitMQ.start ,點擊運行;

1.3 進入圖形頁面

訪問http://127.0.0.1:15672 默認賬戶:guest 密碼:guest

1.4 如果訪問報錯

cmd進入rabbitmq安裝目錄sbin目錄(D:\app_instrall\rabbitmq\rabbitmq_server-3.9.3\sbin)下分別執行這兩句
(1) rabbitmq-plugins enable rabbitmq_management //安裝圖形頁面插件
(2)rabbitmqctl start_app	//重啟RebbitMQ

1.5 創建用戶

在這里插入圖片描述

1.6 創建VirtualHost

在這里插入圖片描述

1.7 給用戶賦予權限

在這里插入圖片描述

*VirtualHost:如果把RabbitMQ看成一個資料庫,那么VirtualHost 就是 作業空間,RabbitMQ可以擁有很多個VirtualHost,相當于資料庫中的作業空間,可以有多個,根據業務的不同,連接不同的VirtualHost,達到解耦的目的;

2. 簡單佇列(點對點)

功能:一個生產者P發送訊息到佇列Q,一個消費者C接收

點對點 一個訊息只能讓一個消費者消費

在這里插入圖片描述

2.1 引入依賴

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>

2.2 連接工具類

package com.xzz.utlis;


import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {

	/**
	 * 獲取連接
	 * @return Connection
	 * @throws Exception
	 */
	public static Connection getConnection() throws Exception {
		//定義連接工廠
		ConnectionFactory factory = new ConnectionFactory();
		//RabbitMQ ip地址
		factory.setHost("127.0.0.1");
		//amqp 通訊地址
		factory.setPort(5672);
		//設定vhost
		factory.setVirtualHost("test");
		factory.setUsername("xzz");
		factory.setPassword("123@qwe");
		//通過工廠獲取連接
		Connection connection = factory.newConnection();
		return connection;
	}

}

2.3 生產者代碼

private static final String QUEUE_NAME = "test_queue";

    	//創建佇列,發送訊息
	public static void main(String[] args) throws Exception {
		//獲取連接
		Connection connection = ConnectionUtil.getConnection();
		//創建通道
		Channel channel = connection.createChannel();
		//宣告創建佇列
		channel.queueDeclare(QUEUE_NAME,false,false,false,null);
		//訊息內容
		String message = "Hello World!";
		channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
		System.out.println("發送訊息:"+message);
		//關閉連接和通道
		channel.close();
		connection.close();
	}

2.4 消費者代碼

private static final String QUEUE_NAME = "test_queue";    
//消費者消費訊息
    public static void main(String[] args) throws Exception {
        //獲取連接和通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //宣告通道
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //定義消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //監聽佇列
        channel.basicConsume(QUEUE_NAME,true,consumer);

        while(true){
            //這個方法會阻塞住,直到獲取到訊息
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("接收到訊息:"+message);
        }
    }

三. 方法詳解

3.1 創建佇列方法

創建佇列宣告channel.queueDeclare方法

DeclareOk queueDeclare() throws IOException;

DeclareOk queueDeclare (String queue , boolean durable , boolean exclusive , boolean autoDelete , Map arguments) throws IOException; 

queueDeclare引數詳解

String queue佇列的名稱
boolean durable設定是否持久化,為 true 則設定佇列為持久化,持久化的佇列會存盤,在 服務器重啟的時候可以保證不丟失相關資訊,
boolean exclusive設定是否排他,為 true 則設定佇列為排他的,如果一個佇列被宣告為排 他佇列,該佇列僅對首次宣告它的連接可見,并在連接斷開時自動洗掉
boolean autoDelete設定是否自動洗掉,為 true 則設定佇列為自動洗掉,自動洗掉的前提是: 至少有一個消費者連接到這個佇列,之后所有與這個佇列連接的消費者都斷開時,才會 自動洗掉
Map arguments設定佇列的其他一些引數

3.2 發送訊息方法

發送訊息channel.basicPublish方法

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;

 void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)  
throws IOException;

basicPublish引數詳解

exchange交換器名稱
routingKey路由鍵
props有14個成員
body訊息體,payload真正需要發送的訊息
mandatorytrue時,交換器無法根據自動的型別和路由鍵找到一個符合條件的佇列,那么RabbitMq會呼叫Basic.Ruturn命令將訊息回傳給生產都,為false時,出現上述情況訊息被直接丟棄
immediatetrue,如果交換器在訊息路由到佇列時發現沒有任何消費者,那么 這個訊息將不會存和佇列,當與路由匹配的所有佇列都沒有消費者時,會Basic.Return回傳給生產者3.0去掉了immediate 引數

immediate和mandatory 都是訊息傳遞程序中,不可達目的地是,將訊息回傳給生產者的功能

3.2 監聽佇列方法

監聽佇列channel.basicConsume方法

channel.basicConsume(String queue, boolean autoAck, Consumer callback);

basicConsume引數詳解

String queue佇列名
boolean autoAck是否自動確認訊息,true自動確認,false 手動呼叫,建立設定為false
Consumer callback消費者 DefaultConsumer建立使用,重寫其中的方法

3.3 限流策略

消費端的限流策略basicQos

void basicQos(unit prefetchSize , ushort prefetchCount, bool global )

basicQos引數詳解

unit prefetchSize0訊息大小是否限制
ushort prefetchCount會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個訊息,即一旦有 N 個訊息還沒有 ack,則該 consumer 將 block 掉,直到有訊息 ack
bool globaltrue、false 是否將上面設定應用于 channel,簡單點說,就是上面限制是 channel 級別的還是 consumer 級別

四. RabbitMQ特征

4.1 交換機

生產者發送訊息不會向傳統方式直接將訊息投遞到佇列中,而是先將訊息投遞到交換機中,在由交換機轉發到具體的佇列,佇列在將訊息以推送或者拉取方式給消費者進行消費,這和我們之前學習Nginx有點類似,

交換機的作用根據具體的路由策略分發到不同的佇列中,交換機有四種型別,

Direct exchange(直連交換機)根據訊息攜帶的路由鍵(routing key)將訊息投遞給對應佇列的
Fanout exchange(扇型交換機)將訊息路由給系結到它身上的所有佇列
Topic exchange(主題交換機)佇列通過路由鍵系結到交換機上,然后,交換機根據訊息里的路由值,將訊息路由給一個或多個系結佇列
Headers exchange(頭交換機)類似主題交換機,但是頭交換機使用多個訊息屬性來代替路由鍵建立路由規則,通過判斷訊息頭的值能否與指定的系結相匹配來確立路由規則,

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-qfVFQZcD-1629445988074)(C:\Users\Administrator\Desktop\gitee\檔案\RabbitMQ\img\image-20210820145734002.png)]

P:消費者

X:交換機

**紅色:**佇列

C:為消費者

4.2 應答模式

為了確保訊息不會丟失,RabbitMQ支持訊息應答,

消費者發送一個訊息應答,告訴RabbitMQ這個訊息已經接收并且處理完畢了,RabbitMQ就可以洗掉它了,

消費者掛掉卻沒有發送應答

如果一個消費者掛掉卻沒有發送應答,RabbitMQ會理解為這個訊息沒有處理完全,然后交給另一個消費者去重新處理,這樣,你就可以確認即使消費者偶爾掛掉也不會丟失任何訊息了,

沒有任何訊息超時限制;只有當消費者掛掉時,RabbitMQ才會重新投遞,即使處理一條訊息會花費很長的時間,

訊息應答是默認打開的,我們通過顯示的設定autoAsk=true關閉這種機制,

現即自動應答開,一旦我們完成任務,消費者會自動發送應答,通知RabbitMQ訊息已被處理,可以從記憶體洗掉,

如果消費者因宕機或鏈接失敗等原因沒有發送ACK(不同于ActiveMQ,在RabbitMQ里,訊息沒有過期的概念),則RabbitMQ會將訊息重新發送給其他監聽在佇列的下一個消費者,

案例:

//生產者端代碼不變,消費者端代碼這部分就是用于開啟手動應答模式的,

channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

//注:第二個引數值為false代表關閉RabbitMQ的自動應答機制,改為手動應答,

//在處理完訊息時,回傳應答狀態,true表示為自動應答模式, 

channel.basicAck(envelope.getDeliveryTag(), false);

4.3 公平轉發

目前訊息轉發機制是平均分配,這樣就會出現一種情況:

假設現在有兩個消費者,A消費者,消費完一個訊息耗時1秒,B消費者消費一個訊息耗時10秒,

現在有一萬條訊息進來,A和B各5000條,

A消費完耗時:1個小時多

B消費完耗時:14個小時

這合理么?這不合理!!!

為了解決這樣的問題,我們可以使用basicQos方法,

傳遞引數為prefetchCount= 1,這樣告訴RabbitMQ不要在同一時間給一個消費者超過一條訊息,

換句話說,只有在消費者空閑的時候會發送下一條資訊,

調度分發訊息的方式,也就是告訴RabbitMQ每次只給消費者處理一條訊息,也就是等待消費者處理完畢并自己對剛剛處理的訊息進行確認之后,才發送下一條訊息,防止消費者太過于忙碌,也防止它太過去清閑,

通過 設定channel.basicQos(1);

五. 發布訂閱

5.1 扇型交換機

在這里插入圖片描述

這個可能是訊息佇列中最重要的佇列了,其他的都是在它的基礎上進行了擴展,

功能實作:一個生產者發送訊息,多個消費者獲取訊息(同樣的訊息),包括一個生產者,一個交換機,多個佇列,多個消費者,

思路解讀(重點理解):

(1)一個生產者,多個消費者

(2)每一個消費者都有自己的一個佇列

(3)生產者沒有直接發訊息到佇列中,而是發送到交換機

(4)每個消費者的佇列都系結到交換機上

(5)訊息通過交換機到達每個消費者的佇列

該模式就是Fanout Exchange(扇型交換機)將訊息路由給系結到它身上的所有佇列

以用戶發郵件案例講解

注意:交換機沒有存盤訊息功能,如果訊息發送到沒有系結消費佇列的交換機訊息則丟失,

5.2 生產者代碼

package com.xzz.level_two;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xzz.utlis.ConnectionUtil;

public class ProducerFanout {
   private static final String EXCHANGE_NAME = "fanout_exchange";

   public static void main(String[] args) throws Exception{
           Producer(1);
   }

   public static void Producer(int i) throws Exception {
       // 1.創建新的連接
       Connection connection = ConnectionUtil.getConnection();
       // 2.創建通道
       Channel channel = connection.createChannel();
       // 3.系結的交換機 引數1互動機名稱 引數2 exchange型別
       channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
       String msg = "生產者發送訊息-----"+i;
       // 4.發送訊息
       channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
       System.out.println("生產者發送msg:" + msg);
       // // 5.關閉通道、連接
       channel.close();
       connection.close();
       // 注意:如果消費沒有系結交換機和佇列,則訊息會丟失
   }

}

5.3 消費者代碼

package com.xzz.level_two;

import com.rabbitmq.client.*;
import com.xzz.utlis.ConnectionUtil;


import java.io.IOException;

public class ConsumerEmailFanout {
   private static final String QUEUE_NAME = "consumerFanout_email";
   private static final String EXCHANGE_NAME = "fanout_exchange";

   public static void main(String[] args) throws Exception {
      Consumer();
   }

   public static void Consumer() throws Exception {
      System.out.println("郵件消費者啟動01");
      // 1.創建新的連接
      Connection connection = ConnectionUtil.getConnection();
      // 2.創建通道
      Channel channel = connection.createChannel();
      // 3.消費者關聯佇列
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      // 4.消費者系結交換機 引數1 佇列 引數2交換機 引數3 routingKey
      channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
      DefaultConsumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                 throws IOException {
            String msg = new String(body, "UTF-8");
            System.out.println("01消費者獲取生產者訊息:" + msg);
         }
      };
      // 5.消費者監聽佇列訊息
      channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

5.2 直連交換機(路由模式)

在這里插入圖片描述

生產者發送訊息到交換機并指定一個路由key,消費者佇列系結到交換機時要制定路由key(key匹配就能接受訊息,key不匹配就不能接受訊息)

例如:我們可以把路由key設定為insert ,那么消費者佇列key指定包含insert才可以接收訊息,消費者佇列key定義為update或者delete就不能接收訊息,很好的控制了更新,插入和洗掉的操作,

采用交換機direct模式

5.2.1 生產者

public class ProducerFanout {
   private static final String EXCHANGE_NAME = "fanout_direct";

   public static void main(String[] args) throws Exception{
      // 1.創建新的連接
      Connection connection = MQConnection.newConnection();
      // 2.創建通道
      Channel channel = connection.createChannel();
      // 3.系結的交換機 引數1互動機名稱 引數2 exchange型別
      channel.exchangeDeclare(EXCHANGE_NAME, "direct");
      String routingKey = "error";
      String msg = "fanout_exchange_msg"+routingKey;
      // 4.發送訊息
      channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
       System.out.println("生產者發送msg:" + msg);
      // // 5.關閉通道、連接
       channel.close();
       connection.close();
      // 注意:如果消費沒有系結交換機和佇列,則訊息會丟失
   }
}

5.2.2 郵件消費者

public class ConsumerEmailFanout {
   private static final String QUEUE_NAME = "consumerFanout_email";
   private static final String EXCHANGE_NAME = "fanout_direct";

   public static void main(String[] args) throws Exception {
      System.out.println("郵件消費者啟動");
      // 1.創建新的連接
      Connection connection = MQConnection.newConnection();
      // 2.創建通道
      Channel channel = connection.createChannel();
      // 3.消費者關聯佇列
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      // 4.消費者系結交換機 引數1 佇列 引數2交換機 引數3 routingKey
      channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
      channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
      DefaultConsumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String msg = new String(body, "UTF-8");
            System.out.println("消費者獲取生產者訊息:" + msg);
         }
      };
      // 5.消費者監聽佇列訊息
      channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}


5.2.3 短信消費者

public class ConsumerSMSFanout {
   private static final String QUEUE_NAME = "ConsumerFanout_sms";
   private static final String EXCHANGE_NAME = "fanout_direct";

   public static void main(String[] args) throws Exception {
      System.out.println("短信消費者啟動");
      // 1.創建新的連接
      Connection connection = MQConnection.newConnection();
      // 2.創建通道
      Channel channel = connection.createChannel();
      // 3.消費者關聯佇列
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      // 4.消費者系結交換機 引數1 佇列 引數2交換機 引數3 routingKey
      channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
      DefaultConsumer consumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String msg = new String(body, "UTF-8");
            System.out.println("消費者獲取生產者訊息:" + msg);
         }
      };
      // 5.消費者監聽佇列訊息
      channel.basicConsume(QUEUE_NAME, true, consumer);
   }

}

5.3 通配符模式

在這里插入圖片描述

說明:此模式是在路由key模式的基礎上,使用了通配符來管理消費者接收訊息,

生產者P發送訊息到交換機X,type=topic,交換機根據系結佇列的routing key的值進行通配符匹配;

符號#:匹配一個或者多個詞lazy.# 可以匹配lazy.irs或者lazy.irs.cor

符號*:只能匹配一個詞lazy.* 可以匹配lazy.irs或者lazy.cor

六. 確認機制

問題產生背景:

生產者發送訊息出去之后,不知道到底有沒有發送到RabbitMQ服務器, 默認是不知道的,而且有的時候我們在發送訊息之后,后面的邏輯出問題了,我們不想要發送之前的訊息了,需要撤回該怎么做,

解決方案:

1.AMQP 事務機制

2.Confirm 模式

6.1 事務模式

txSelect 將當前channel設定為transaction模式

txCommit 提交當前事務

txRollback 事務回滾

6.1.1 生產者

public class Producer {
   private static final String QUEUE_NAME = "test_queue";

   public static void main(String[] args) throws Exception{
      // 1.獲取連接
      Connection newConnection = MQConnection.newConnection();
      // 2.創建通道
      Channel channel = newConnection.createChannel();
      // 3.創建佇列宣告
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      // 將當前管道設定為 txSelect 將當前channel設定為transaction模式 開啟事務
      channel.txSelect();
      String msg = "test_yushengjun110";
      try {
         // 4.發送訊息
         channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
         int i = 1 / 0;
         channel.txCommit();// 提交事務
         System.out.println("生產者發送訊息:" + msg);
      } catch (Exception e) {
         System.out.println("訊息進行回滾操作");
         channel.txRollback();// 回滾事務
      } finally {
         channel.close();
         newConnection.close();
      }
   }
}

6.1.2 消費者

public class Customer {
   private static final String QUEUE_NAME = "test_queue";

   public static void main(String[] args) throws Exception{
      // 1.獲取連接
      Connection newConnection = MQConnection.newConnection();
      // 2.獲取通道
      final Channel channel = newConnection.createChannel();
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
               throws IOException {
            String msgString = new String(body, "UTF-8");
            System.out.println("消費者獲取訊息:" + msgString);
         }
      };
      // 3.監聽佇列
      channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
   }
}

七. SpringBoot 整合RabbitMQ

7.1 maven依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xzz</groupId>
    <artifactId>RabbitMQ</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Environment Settings -->
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- common Settings -->
        <fastjson.version>1.2.54</fastjson.version>
        <jjwt.version>0.9.1</jjwt.version>
        <druid-starter>1.1.10</druid-starter>
        <mybatis-plus-boot-starter.version>3.4.0</mybatis-plus-boot-starter.version>
        <hutool.version>5.7.3</hutool.version>
        <aliyun-sdk-oss>3.4.0</aliyun-sdk-oss>
        <easypoi.version>4.0.0</easypoi.version>
        <lombok.version>1.18.12</lombok.version>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
    </parent>

    <dependencies>
        <!-- web 依賴 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- junit 依賴 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <!-- fastjson 依賴 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- test 依賴 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- lombok 依賴 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>

    </dependencies>
</project>

7.2 配置類

spring:
  rabbitmq:
  ####連接地址
    host: 127.0.0.1
   ####埠號   
    port: 5672
   ####賬號 
    username: guest
   ####密碼  
    password: guest
   ### 地址
    virtual-host: /

7.3 定義交換機和佇列

@Component
public class FanoutConfig {

	// 郵件佇列
	private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue";

	// 短信佇列
	private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
	// 交換機
	private String EXCHANGE_NAME = "fanoutExchange";

	// 1.定義佇列郵件
	@Bean
	public Queue fanOutEamilQueue() {
		return new Queue(FANOUT_EMAIL_QUEUE);
	}
	// 1.定義佇列短信
	@Bean
	public Queue fanOutSmsQueue() {
		return new Queue(FANOUT_SMS_QUEUE);
	}

	// 2.定義交換機
	@Bean
	FanoutExchange fanoutExchange() {
		return new FanoutExchange(EXCHANGE_NAME);
	}

	// 3.佇列與交換機系結郵件佇列
	@Bean
	Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
		return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
	}

	// 4.佇列與交換機系結短信佇列
	@Bean
	Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
		return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
	}
}

7.4 生產者

@Component
public class FanoutProducer {
	@Autowired
	private AmqpTemplate amqpTemplate;

	public void send(String queueName) {
		String msg = "my_fanout_msg:" + new Date();
		System.out.println(msg + ":" + msg);
		amqpTemplate.convertAndSend(queueName, msg);
	}
}

7.5 消費者

@Component
@RabbitListener(queues = "fanout_eamil_queue")
public class FanoutEamilConsumer {
	@RabbitHandler
	public void process(String msg) throws Exception {
		System.out.println("郵件消費者獲取生產者訊息msg:" + msg);
	}
}

@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {
	@RabbitHandler
	public void process(String msg) {
		System.out.println("短信消費者獲取生產者訊息msg:" + msg);
	}
}

BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
}

// 4.佇列與交換機系結短信佇列
@Bean
Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
	return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
}

}


### 7.4 生產者

~~~java
@Component
public class FanoutProducer {
	@Autowired
	private AmqpTemplate amqpTemplate;

	public void send(String queueName) {
		String msg = "my_fanout_msg:" + new Date();
		System.out.println(msg + ":" + msg);
		amqpTemplate.convertAndSend(queueName, msg);
	}
}

7.5 消費者

@Component
@RabbitListener(queues = "fanout_eamil_queue")
public class FanoutEamilConsumer {
	@RabbitHandler
	public void process(String msg) throws Exception {
		System.out.println("郵件消費者獲取生產者訊息msg:" + msg);
	}
}

@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {
	@RabbitHandler
	public void process(String msg) {
		System.out.println("短信消費者獲取生產者訊息msg:" + msg);
	}
}

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295586.html

標籤:其他

上一篇:Flinksql ----HiveCatalog

下一篇:分布式框架之(四)通過Dubbo來實作服務消費方遠程呼叫服務提供方的方法

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more