十年河東,十年河西,莫欺少年窮
學無止境,精益求精
netcore3.1控制臺應用程式,引入MQTTnet 2.8版本
訂閱端:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using MQTTnet;
using MQTTnet.Server;
using MQTTnet.Client;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using MQTTnet.Protocol;

namespace swapConsole
{
    /// <summary>
    /// Console subscriber: connects to the MQTT broker, subscribes to <c>topic</c>
    /// once connected, and prints every received payload as UTF-8 text.
    /// </summary>
    class Program
    {
        // Pause before each reconnect attempt so an unreachable broker does not
        // cause a tight connect/fail/reconnect loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        // Generated once per process. WithCleanSession(false) asks the broker to keep
        // the session, which only helps if reconnects present the same client id.
        private static readonly string clientId = Guid.NewGuid().ToString();

        private static MqttClient mqttClient = null;
        private static string topic = "test123ABC";

        /// <summary>
        /// Connection options: persistent session, empty credentials, the
        /// process-wide client id and the local broker on port 1883.
        /// </summary>
        private static IMqttClientOptions Options
        {
            get
            {
                MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();
                builder.WithCleanSession(false);
                // User name / password
                builder.WithCredentials("", "");
                builder.WithClientId(clientId);
                // Was "1270.0.0.0", which is not a valid IPv4 address.
                builder.WithTcpServer("127.0.0.1", 1883);
                return builder.Build();
            }
        }

        /// <summary>
        /// Creates the client, wires the event handlers, connects and then waits
        /// for Enter so the process stays alive to receive messages.
        /// </summary>
        static async Task Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += async (s, e) =>
                {
                    Console.WriteLine("嘗試重連!" + Environment.NewLine);
                    await Task.Delay(ReconnectDelay);
                    await ConnectToServer();
                };
            }

            await ConnectToServer();
            Console.ReadLine();
        }

        /// <summary>
        /// Connects to the MQTT broker. Failures are reported on the console
        /// instead of being thrown to the caller.
        /// </summary>
        private static async Task ConnectToServer()
        {
            try
            {
                await mqttClient.ConnectAsync(Options);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"連接到MQTT服務器失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Raised after a successful connection; subscribes to the topic.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (unused).</param>
        private static void MqttClient_Connected(object sender, EventArgs e)
        {
            Console.WriteLine("已連接到MQTT服務器!" + Environment.NewLine);
            SubscribeInfo();
        }

        /// <summary>
        /// Prints the payload of a received message as UTF-8 text.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Carries the received application message.</param>
        private static void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            // Guard against a missing payload; GetString(null) would throw.
            byte[] payload = e.ApplicationMessage?.Payload ?? Array.Empty<byte>();
            Console.WriteLine($">> {Encoding.UTF8.GetString(payload)}{Environment.NewLine}");
        }

        /// <summary>
        /// Subscribes to <c>topic</c> with QoS 2. Returns immediately; the outcome
        /// is written to the console once the subscribe call completes.
        /// </summary>
        public static void SubscribeInfo()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("訂閱主題不能為空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客戶端尚未連接!");
                return;
            }

            // The helper catches its own exceptions, so discarding the task is safe.
            _ = SubscribeCoreAsync();
        }

        /// <summary>
        /// Awaits the subscribe call so success is only reported after it completed
        /// and failures are reported instead of being silently dropped.
        /// </summary>
        private static async Task SubscribeCoreAsync()
        {
            try
            {
                await mqttClient.SubscribeAsync(new List<TopicFilter>
                {
                    new TopicFilter(topic, MqttQualityOfServiceLevel.ExactlyOnce)
                });
                Console.WriteLine($"已訂閱[{topic}]主題" + Environment.NewLine);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"訂閱[{topic}]主題失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Unsubscribes from <c>topic</c>. Returns immediately; the outcome is
        /// written to the console once the unsubscribe call completes.
        /// </summary>
        public static void UnSubscribeInfo()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("退訂主題不能為空!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                Console.WriteLine("MQTT客戶端尚未連接!");
                return;
            }

            // The helper catches its own exceptions, so discarding the task is safe.
            _ = UnsubscribeCoreAsync();
        }

        /// <summary>
        /// Awaits the unsubscribe call and reports success or failure on the console.
        /// </summary>
        private static async Task UnsubscribeCoreAsync()
        {
            try
            {
                await mqttClient.UnsubscribeAsync(topic);
                Console.WriteLine($"已退訂[{topic}]主題" + Environment.NewLine);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"退訂[{topic}]主題失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }
    }
}
發布端:

using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace swapPublish
{
    /// <summary>
    /// Console publisher: connects to the MQTT broker and, each time a connection
    /// is established, publishes ten timestamp messages to <c>topic</c>.
    /// </summary>
    class Program
    {
        // Number of messages sent after each successful connection.
        private const int MessagesPerConnection = 10;

        // Pause between two published messages.
        private static readonly TimeSpan PublishInterval = TimeSpan.FromSeconds(2);

        // Pause before each reconnect attempt so an unreachable broker does not
        // cause a tight connect/fail/reconnect loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        // Generated once per process. WithCleanSession(false) asks the broker to keep
        // the session, which only helps if reconnects present the same client id.
        private static readonly string clientId = Guid.NewGuid().ToString();

        private static MqttClient mqttClient = null;
        private static string topic = "test123ABC";

        /// <summary>
        /// Connection options: persistent session, empty credentials, the
        /// process-wide client id and the local broker on port 1883.
        /// </summary>
        private static IMqttClientOptions Options
        {
            get
            {
                MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder();
                builder.WithCleanSession(false);
                // User name / password
                builder.WithCredentials("", "");
                builder.WithClientId(clientId);
                builder.WithTcpServer("127.0.0.1", 1883);
                return builder.Build();
            }
        }

        /// <summary>
        /// Creates the client, wires the event handlers, connects and then waits
        /// for Enter so the process stays alive while messages are published.
        /// </summary>
        static async Task Main(string[] args)
        {
            MqttFactory factory = new MqttFactory();
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)factory.CreateMqttClient();
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += async (s, e) =>
                {
                    Console.WriteLine("嘗試重連!" + Environment.NewLine);
                    await Task.Delay(ReconnectDelay);
                    await ConnectToServer();
                };
            }

            await ConnectToServer();
            // The original printed "已斷開MQTT連接!" here although nothing had been
            // disconnected; that misleading message is intentionally removed.
            Console.ReadLine();
        }

        /// <summary>
        /// Connects to the MQTT broker. Failures are reported on the console
        /// instead of being thrown to the caller.
        /// </summary>
        private static async Task ConnectToServer()
        {
            try
            {
                await mqttClient.ConnectAsync(Options);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"連接到MQTT服務器失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Raised after a successful connection; publishes the batch of messages.
        /// Declared <c>async void</c> because it is an event handler, so every
        /// await is guarded by the try/catch below.
        /// </summary>
        /// <param name="sender">Event source.</param>
        /// <param name="e">Event data (unused).</param>
        private static async void MqttClient_Connected(object sender, EventArgs e)
        {
            Console.WriteLine("已連接到MQTT服務器!" + Environment.NewLine);
            try
            {
                for (int i = 0; i < MessagesPerConnection; i++)
                {
                    // Await each publish so failures surface here instead of being
                    // lost in a discarded task, and delay without blocking the thread.
                    await PublishInfo();
                    await Task.Delay(PublishInterval);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("發布訊息失敗!" + Environment.NewLine + ex.Message + Environment.NewLine);
            }
        }

        /// <summary>
        /// Publishes the current local time (yyyy-MM-dd HH:mm:ss) to <c>topic</c>
        /// as a non-retained QoS 2 message.
        /// </summary>
        private static async Task PublishInfo()
        {
            if (string.IsNullOrEmpty(topic))
            {
                Console.WriteLine("發布主題不能為空!");
                return;
            }

            string inputString = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
            MqttApplicationMessageBuilder builder = new MqttApplicationMessageBuilder();
            builder.WithPayload(Encoding.UTF8.GetBytes(inputString));
            builder.WithTopic(topic);
            builder.WithRetainFlag(false);
            builder.WithExactlyOnceQoS();
            await mqttClient.PublishAsync(builder.Build());
        }
    }
}
如何只允許一個客戶端消費同一個訊息,暫時未解決!
大家有解決方法,請貼出評論,謝謝
MQTTnet 3.0.16 版本的使用
客戶端:

using MQTTnet;
using MQTTnet.Adapter;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace mqttsub
{
    class Program
    {
        /// <summary>
        /// Starts the demo client and keeps the process alive until a key is pressed.
        /// </summary>
        static async Task Main(string[] args)
        {
            MqttClient mqtt = new MqttClient();
            await mqtt.StartAsync();
            Console.ReadKey();
        }
    }

    /// <summary>
    /// Demo MQTT client (MQTTnet 3.x handler API): connects with the settings held
    /// in <see cref="MqttClientDto"/>, subscribes on connect, prints received
    /// messages and reconnects after a disconnect.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this type shares its simple name with MQTTnet's own client
    /// class; inside this namespace the name refers to this demo class.
    /// </remarks>
    public class MqttClient
    {
        // Pause before each reconnect attempt so an unreachable broker does not
        // cause a tight connect/fail/reconnect loop.
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

        private IMqttClient client;
        private IMqttClientOptions options;
        private readonly MqttClientDto model;

        /// <summary>
        /// Initializes the connection settings used by <see cref="StartAsync"/>.
        /// </summary>
        public MqttClient()
        {
            model = new MqttClientDto
            {
                Account = "",
                PassWord = "",
                ClientId = Guid.NewGuid().ToString(),
                // Placeholder: set this to the broker address before running.
                IP = "",
                Port = 1883,
                // Single-level wildcard filter: matches test/123/ABC and test/DDDDD/ABC.
                // It does NOT match testABC (no level separators) or test/1/2/ABC
                // (two levels where '+' allows exactly one).
                Topic = "test/+/ABC"
            };
        }

        /// <summary>
        /// Builds the connection options, registers the handlers and connects.
        /// Connection failures are reported on the console, not thrown.
        /// </summary>
        public async Task StartAsync()
        {
            try
            {
                client = new MqttFactory().CreateMqttClient();

                // Use the configured client id and port so that what ConnectedHandler
                // prints is what was actually used for the connection.
                options = new MqttClientOptionsBuilder()
                    .WithClientId(model.ClientId)
                    .WithCredentials(model.Account, model.PassWord)
                    .WithTcpServer(model.IP, model.Port)
                    .WithCleanSession()
                    .Build();

                // Handlers can alternatively be registered with the
                // UseApplicationMessageReceivedHandler / UseConnectedHandler /
                // UseDisconnectedHandler extension methods and inline lambdas.
                client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);
                client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(ConnectedHandler);
                client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(DisconnectedHandler);

                // To publish from this client: await client.PublishAsync("topic", "payload");

                await client.ConnectAsync(options);
            }
            catch (MqttConnectingFailedException ex)
            {
                Console.WriteLine("身份校驗失敗");
                // Also print the library's message so the actual reason is visible.
                Console.WriteLine(ex.Message);
            }
            catch (Exception ex)
            {
                Console.WriteLine("出現例外");
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// Invoked when the client is disconnected; waits briefly and reconnects.
        /// </summary>
        /// <param name="obj">Disconnect event data (unused).</param>
        private async void DisconnectedHandler(MqttClientDisconnectedEventArgs obj)
        {
            Console.WriteLine("本客戶端已經斷開連接");
            Console.WriteLine();
            try
            {
                await Task.Delay(ReconnectDelay);
                await client.ConnectAsync(options);
            }
            catch (Exception)
            {
                Console.WriteLine("重連失敗");
            }
        }

        /// <summary>
        /// Invoked after a successful connection; prints the connection settings
        /// and subscribes to the configured topic filter with QoS 2.
        /// </summary>
        /// <param name="obj">Connect event data (unused).</param>
        private async void ConnectedHandler(MqttClientConnectedEventArgs obj)
        {
            Console.WriteLine("本客戶端已連接成功");
            Console.WriteLine($"地址:{model.IP}");
            Console.WriteLine($"埠:{model.Port}");
            Console.WriteLine($"客戶端:{model.ClientId}");
            Console.WriteLine($"賬號:{model.Account}");
            Console.WriteLine();

            // Other ways to subscribe:
            //   1) client.SubscribeAsync("topic")
            //   3) client.SubscribeAsync(new MqttClientSubscribeOptionsBuilder().WithTopicFilter("topic").Build())
            // Style 2, an explicit list of topic filters, is used here.
            List<MqttTopicFilter> topics = new List<MqttTopicFilter>
            {
                new MqttTopicFilter
                {
                    Topic = model.Topic,
                    QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce
                }
            };

            try
            {
                await client.SubscribeAsync(topics.ToArray());
            }
            catch (Exception ex)
            {
                // An unhandled exception in an async void handler would end the process.
                Console.WriteLine("出現例外");
                Console.WriteLine(ex.Message);
            }
        }

        /// <summary>
        /// Prints topic and UTF-8 payload of a received message.
        /// </summary>
        /// <param name="obj">Carries the received application message.</param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            // Guard against a missing payload; GetString(null) would throw.
            byte[] payload = obj.ApplicationMessage.Payload ?? Array.Empty<byte>();

            Console.WriteLine("===================================================");
            Console.WriteLine("收到訊息:");
            Console.WriteLine($"主題:{obj.ApplicationMessage.Topic}");
            Console.WriteLine($"訊息:{Encoding.UTF8.GetString(payload)}");
            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine();
        }
    }

    /// <summary>
    /// Connection settings for the demo client.
    /// </summary>
    public class MqttClientDto
    {
        /// <summary>
        /// Broker address.
        /// </summary>
        public string IP { get; set; }

        /// <summary>
        /// Account (user name).
        /// </summary>
        public string Account { get; set; }

        /// <summary>
        /// Password.
        /// </summary>
        public string PassWord { get; set; }

        /// <summary>
        /// Client id.
        /// </summary>
        public string ClientId { get; set; }

        /// <summary>
        /// Broker TCP port.
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// Topic filter to subscribe to.
        /// </summary>
        public string Topic { get; set; }
    }
}
服務端:

using MQTTnet;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace MqttPub
{
    class Program
    {
        /// <summary>
        /// Starts the demo server and keeps the process alive until input is read.
        /// </summary>
        static async Task Main(string[] args)
        {
            await new ServerDome().StartAsync();
            Console.Read();
        }
    }

    /// <summary>
    /// Demo MQTT server (MQTTnet 3.x handler API): validates connecting clients
    /// against the configured account/password and logs client activity.
    /// </summary>
    public class ServerDome
    {
        private IMqttServer server;
        private readonly MqttClientDto model;

        /// <summary>
        /// Initializes the server settings (listening port and accepted credentials).
        /// </summary>
        public ServerDome()
        {
            model = new MqttClientDto
            {
                Account = "",
                PassWord = "",
                ClientId = Guid.NewGuid().ToString(),
                IP = "",
                Port = 1883,
                Topic = "test"
            };
        }

        /// <summary>
        /// Creates, configures and starts the server unless it is already running.
        /// </summary>
        public async Task StartAsync()
        {
            if (server != null && server.IsStarted)
            {
                return;
            }

            server = new MqttFactory().CreateMqttServer();
            MqttServerOptionsBuilder serverOptions = new MqttServerOptionsBuilder();

            // Default listening port
            serverOptions.WithDefaultEndpointPort(model.Port);

            // Validate the credentials of each connecting client against the
            // configured account. Null is treated as empty so that a client sending
            // no credentials matches an empty configured account/password.
            serverOptions.WithConnectionValidator(client =>
            {
                bool accountMatches = string.Equals(client.Username ?? string.Empty, model.Account ?? string.Empty, StringComparison.Ordinal);
                bool passwordMatches = string.Equals(client.Password ?? string.Empty, model.PassWord ?? string.Empty, StringComparison.Ordinal);

                if (accountMatches && passwordMatches)
                {
                    client.ReasonCode = MqttConnectReasonCode.Success;
                    Console.WriteLine("校驗成功");
                }
                else
                {
                    client.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                    Console.WriteLine("校驗失敗");
                }
            });

            // Handlers can alternatively be registered with the Use*Handler
            // extension methods and inline lambdas.
            // Messages published by clients
            server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(MessageReceivedHandler);
            // Client connected
            server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(ClientConnectedHandler);
            // Client disconnected
            server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(ClientDisconnectedHandler);
            // Client subscribed to a topic
            server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(ClientSubscribedTopicHandler);
            // Client unsubscribed from a topic
            server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(ClientUnsubscribedTopicHandler);
            // Server started
            server.StartedHandler = new MqttServerStartedHandlerDelegate(StartedHandler);
            // Server stopped
            server.StoppedHandler = new MqttServerStoppedHandlerDelegate(StoppedHandler);

            // To publish from the server: await server.PublishAsync("topic", "payload");

            await server.StartAsync(serverOptions.Build());
        }

        /// <summary>
        /// Stops the server if it is running and releases it.
        /// </summary>
        public async Task StopAsync()
        {
            if (server == null)
            {
                return;
            }

            if (server.IsStarted)
            {
                await server.StopAsync();
            }

            // Dispose even when the server was not running, and drop the reference so
            // a disposed instance is never reused by a later StartAsync call.
            server.Dispose();
            server = null;
        }

        /// <summary>
        /// Logs a client unsubscribing from a topic.
        /// </summary>
        /// <param name="obj">Carries the client id and the topic filter.</param>
        private void ClientUnsubscribedTopicHandler(MqttServerClientUnsubscribedTopicEventArgs obj)
        {
            Console.WriteLine($"客戶端:{obj.ClientId}");
            Console.WriteLine($"取消訂閱主題:{obj.TopicFilter}");
        }

        /// <summary>
        /// Logs a client subscribing to a topic.
        /// </summary>
        /// <param name="obj">Carries the client id and the topic filter.</param>
        private void ClientSubscribedTopicHandler(MqttServerClientSubscribedTopicEventArgs obj)
        {
            Console.WriteLine($"客戶端:{obj.ClientId}");
            Console.WriteLine($"訂閱主題:{obj.TopicFilter.Topic}");
        }

        /// <summary>
        /// Logs a client disconnecting, including the disconnect type.
        /// </summary>
        /// <param name="obj">Carries the client id and the disconnect type.</param>
        private void ClientDisconnectedHandler(MqttServerClientDisconnectedEventArgs obj)
        {
            Console.WriteLine($"斷開連接的客戶端:{obj.ClientId}");
            Console.WriteLine($"斷開連接型別:{obj.DisconnectType.ToString()}");
        }

        /// <summary>
        /// Logs a client connecting to the server.
        /// </summary>
        /// <param name="obj">Carries the id of the connected client.</param>
        private void ClientConnectedHandler(MqttServerClientConnectedEventArgs obj)
        {
            // Previously threw NotImplementedException on every client connection.
            Console.WriteLine($"{obj.ClientId}此客戶端已經連接到服務器");
        }

        /// <summary>
        /// Prints sender, topic and UTF-8 payload of a message received from a client.
        /// </summary>
        /// <param name="obj">Carries the sending client id and the message.</param>
        private void MessageReceivedHandler(MqttApplicationMessageReceivedEventArgs obj)
        {
            // Guard against a missing payload; GetString(null) would throw.
            byte[] payload = obj.ApplicationMessage.Payload ?? Array.Empty<byte>();

            Console.WriteLine("===================================================");
            Console.WriteLine("收到訊息:");
            Console.WriteLine($"客戶端:{obj.ClientId}");
            Console.WriteLine($"主題:{obj.ApplicationMessage.Topic}");
            Console.WriteLine($"訊息:{Encoding.UTF8.GetString(payload)}");
            Console.WriteLine("+++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine();
        }

        /// <summary>
        /// Logs that the server has started and on which port it listens.
        /// </summary>
        /// <param name="obj">Event data (unused).</param>
        private void StartedHandler(EventArgs obj)
        {
            Console.WriteLine($"程式已經啟動!監聽埠為:{model.Port}");
        }

        /// <summary>
        /// Logs that the server has stopped.
        /// </summary>
        /// <param name="obj">Event data (unused).</param>
        private void StoppedHandler(EventArgs obj)
        {
            Console.WriteLine("程式已經關閉");
        }
    }

    /// <summary>
    /// Settings for the demo server.
    /// </summary>
    public class MqttClientDto
    {
        /// <summary>
        /// Address.
        /// </summary>
        public string IP { get; set; }

        /// <summary>
        /// Account (user name).
        /// </summary>
        public string Account { get; set; }

        /// <summary>
        /// Password.
        /// </summary>
        public string PassWord { get; set; }

        /// <summary>
        /// Client id.
        /// </summary>
        public string ClientId { get; set; }

        /// <summary>
        /// TCP port.
        /// </summary>
        public int Port { get; set; }

        /// <summary>
        /// Topic.
        /// </summary>
        public string Topic { get; set; }
    }
}
這里說明下如何使用通配符
例如,發送 topic 主題為:test/123/ABC 或者 test/234/ABC ,消費者在訂閱時,可以使用:test/+/ABC 來訂閱該類訊息,
通配符的作用為分組訂閱。
例如,發布者發布的主題為:test//status ,訂閱者訂閱的主題過濾器為:test/+/status
當然,發布者也可以在 / / 之間增加內容,例如設備號:test/device001/status ,訂閱者同樣可以用 test/+/status 收到該訊息。
主題名不能使用通配符,但是主題過濾器中可以使用通配符。
因此,訂閱者可以通過過濾器結合通配符訂閱一類訊息。
以MQTTnet 3.0.16 為例,開啟自動確認,開啟不保留最後一條訊息。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/547589.html
標籤:.NET技术