RabbitMQ
MQ全稱為Message Queue,即訊息佇列,RabbitMQ是由erlang語言開發,基于AMQP(Advanced Message Queue高級訊息佇列協議)協議實作的訊息佇列,它是一種應用程式之間的通訊方式,訊息佇列在分布式系統應用非常廣泛,
應用場景
- 任務異步處理
將不需要同步處理的并且消耗時長的操作由訊息佇列接收進行異步處理,可以提高程式的回應時間- 應用程式解耦合
MQ相當于一個中介,生產方式通過MQ與消費方互動,它將應用程式進行解耦合
常用的訊息佇列
ActiveMQ RabbitMQ ZeroMQ Kafka MetaMQ RocketMQ Redis
為什么使用RabbitMQ呢
- 使用簡單,功能強大
- 基于AMQP協議
- 社區活躍,檔案完善
- 高并發性能好,基于erlang語言
- SpringBoot默認集成RabbitMQ
AMQP介紹
AMQP即Advanced Message Queuing Protocol一個統一訊息服務的應用層標準高級訊息佇列協議,即應用層協議的一個開放標準,為面向訊息的中間件設計,基于此協議的客戶端與訊息中間件可傳遞訊息,并不受客戶端/中間件不同產品,不同的開發語言等條件的限制,Erlang中的實作有RabbitMQ等,
JMS是什么?
是java訊息服務(Java Message Service)應用程式介面是一個java平臺中關于面向訊息中間件(MOM)的API,用于在兩個應用程式之間,或分布式系統中發送訊息,進行異步通信,Java訊息服務是一個與平臺無關的API,絕大多數MOM提供商都對JMS提供支持,
總結
JMS是java提供的一套訊息服務API標準,其目的是為所有的java應用程式提供統一的訊息通信的標準,類似于java的jdbc,只是遵循jms標準的應用程式之間都可以進行訊息通信,它和AMQP有什么不同,jms是java語言專屬的訊息服務標準,它是在api層定義標準,并且智能用于java應用,而AMQP是在協議層定義的標準,是跨語言的,
RabbitMQ的作業原理
Broker:訊息佇列服務行程,該行程包括兩部分:Exchange和Queue
Exchange:訊息佇列交換機,按一定的規則將訊息路由轉發到某和佇列,對訊息進行過濾
Queue:訊息佇列,存盤訊息的佇列,訊息到達佇列并轉發給指定的消費方
Producer:訊息生產者,即生產方客戶端,生產方客戶端將訊息發送到MQ
Consumer:訊息消費者,即消費方客戶端,接收MQ轉發的訊息
訊息發布接收流程
發送訊息
- 生產者和Broker建立TCP連接
- 生產者和Broker建立通道
- 生產者通過通道訊息發送給Broker,由Exchange將訊息進行轉發
- Exchange將訊息轉發到指定的Queue
接收訊息- 消費者和Broker建立TCP連接
- 消費者和Broker建立通道
- 消費者監聽指定的Queue
- 當有訊息到達Queue時Broker默認將訊息推送給消費者
- 消費者接收到訊息
下載與安裝
本來想把自己的opt與rabbitMQ上傳上來的,該檔案已存在,安裝的話就百度下吧,安裝好后,打開服務,界面如下
簡單入門案例撰寫
- 創建maven工程
- 匯入依賴
- 撰寫生產者代碼
- 撰寫消費者代碼
- 結果截圖
maven工程
我是用springboot創建的maven工程,其parent使用的是2.0.0,里面使用了lambok插件
依賴如下
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.8.13</version>
<scope>test</scope>
</dependency>
</dependencies>
創建生產者
public class Producer01 {
//佇列名稱
private static final String QUEUE = "helloworld";
@SneakyThrows
public static void main(String[] args) {
//創建連接工廠物件
ConnectionFactory factory = new ConnectionFactory();
//鏈接資訊
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//rabbitmq默認虛擬機名稱為“/”,虛擬機相當于一個獨立的mq服務 器
factory.setVirtualHost("/");
//創建于rabbitMQ服務得tcp連接
Connection connection = null;
Channel channel = null;
try {
connection= factory.newConnection();
//創建與exchange的通道,每個連接可以創建多個通道,每個通道代表一個會話任務
channel = connection.createChannel();
/**
* 宣告佇列,如果rabbit中沒有次佇列將自動創建
* param1:佇列名稱
* param2:是否持久化,若持久化,mq重啟后該佇列仍然存在
* param3:佇列是否獨占此連接,佇列只允許在該連接中訪問,如果連接時關閉,則佇列自動洗掉
* param4:佇列不再使用時是夠自動洗掉次佇列
* param5:擴展引數
*/
channel.queueDeclare(QUEUE,true,false,false,null);
String message = "hello world"+System.currentTimeMillis();
/**
* 訊息發布方法
* param1:exchange的名稱,如果沒有指定,則使用default exchange
* param2:routinKey,訊息的路由key,是用于Exchange(交換機)將訊息轉發到指定的訊息
* param3:訊息包含的屬性
* param4:訊息體
*/
channel.basicPublish("",QUEUE,null,message.getBytes());
System.out.println("Send Message is:"+message+"'");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if (channel!= null){
channel.close();
}
if (connection != null){
connection.close();
}
}
}
}
創建消費者
public class Consumer01 {
private static final String QUEUE = "helloworld";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
//設定rabbitMQ所在服務器的ip和埠
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
//宣告一個佇列
channel.queueDeclare(QUEUE,true,false,false,null);
//定義消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 消費者接收訊息呼叫此方法
* @param consumerTag 消費者的標簽,在channel.baseicConsumer()去指定
* @param envelope 訊息包的內容,可從中獲取訊息id,訊息routingkey,叫花雞,訊息和重傳標志
* (收到訊息失敗后是否需要沖洗發送)
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//訊息id
long deliveryTag = envelope.getDeliveryTag();
//訊息內容
String msg = new String(body, "utf-8");
System.out.println("receive message..."+msg);
}
};
/**
* 監聽佇列String queue,boolean autoAck,Consumer callback
* String queue:佇列名稱
* boolean autoAck:設定為true表示訊息接收自動向mq回復收到了,mq接收到回復會洗掉訊息,設定為false則需要手動回復
* Consumer callback:消費訊息的方法,消費者接收到訊息后呼叫此方法
*/
channel.basicConsume(QUEUE,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
結果截圖
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295594.html
標籤:其他