文章目錄
- 前言
- 一、示例代碼
- 總結
前言
在我另一篇[Python快速實作MQTT雙向訂閱發布](https://blog.csdn.net/m0_47958289/article/details/115004791) ,已經詳細介紹了相關概念,環境的搭建,代碼方面不夠完善,下面是完善后的代碼,后面還會更新阿里云aliyun-iot-linkkit,這是阿里云物聯網官方封裝paho-mqtt后的,結合阿里云物聯網平臺使用更加方便,同時也可以將阿里云aliyun-iot-linkkit相關屬性決議后,繼續使用paho-mqtt,提示:以下是本篇文章正文內容,下面案例可供參考
一、示例代碼
如果代碼看不懂,建議結合Python paho-mqtt 模塊使用,決議和實戰相結合,學習,也歡迎留言私信交流
import json
import sys
# 引入mqtt包
import paho.mqtt.client as mqtt
# 使用獨立執行緒運行
from threading import Thread
# 建立mqtt連接
def on_connect(client, userdata, flag, rc):
if rc == 0:
# 連接成功
print("Connection successful")
elif rc == 1:
# 協議版本錯誤
print("Protocol version error")
elif rc == 2:
# 無效的客戶端標識
print("Invalid client identity")
elif rc == 3:
# 服務器無法使用
print("server unavailable")
elif rc == 4:
# 錯誤的用戶名或密碼
print("Wrong user name or password")
elif rc == 5:
# 未經授權
print("unaccredited")
print("Connect with the result code " + str(rc))
# 訂閱頻道
# client.subscribe('31765425213673472', qos=2)
# 當與代理斷開連接時呼叫
def on_disconnect(client, userdata, rc):
# rc == 0回呼被呼叫以回應disconnect()呼叫
# 如果以任何其他值斷開連接是意外的,例如可能出現網路錯誤,
if rc != 0:
print("Unexpected disconnection %s" % rc)
# 當收到關于客戶訂閱的主題的訊息時呼叫,
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
json_msg = json.loads(msg.payload.decode('utf-8'))
# 加入個人邏輯
pass
# 當使用使用publish()發送的訊息已經傳輸到代理時被呼叫,
def on_publish(client, obj, mid):
print("on_Publish, mid: " + str(mid))
# 當代理回應訂閱請求時被呼叫
def on_subscribe(client, userdata, mid, granted_qos):
print("on_Subscribed: " + str(mid) + " " + str(granted_qos))
# 當代理回應取消訂閱請求時呼叫,
def on_unsubscribe(client, userdata, mid):
print("on_unsubscribe, mid: " + str(mid))
# 當客戶端有日志資訊時呼叫
def on_log(client, obj, level, string):
print("on_Log:" + string)
# mqtt訂閱啟動函式
def mqtt_subscribe():
global client
client.loop_forever()
# mqtt發布啟動函式
def mqtt_publish(sensor_data, topic='xxxxxxxx', qos=2):
global client
try:
client.publish(topic=topic, payload=sensor_data, qos=qos)
except KeyboardInterrupt:
print("EXIT")
# 這是網路回圈的阻塞形式,直到客戶端呼叫disconnect()時才會回傳,它會自動處理重新連接,
client.disconnect()
sys.exit(0)
client = mqtt.Client()
# 啟動函式
def mqtt_run():
# 賬號密碼驗證放到最前面
client.username_pw_set('user', 'user')
# client = mqtt.Client()
# 建立mqtt連接
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.on_message = on_message
# 當與代理斷開連接時呼叫
client.on_disconnect = on_disconnect
client.on_log = on_log
# 系結 MQTT 服務器地址
broker_ip = ''
# MQTT服務器的埠號
# client.connect(host=broker_ip, port=1883, keepalive=6000)
client.connect(host=broker_ip, port=1883)
client.reconnect_delay_set(min_delay=1, max_delay=2000)
client.subscribe('xxxxxxxx', qos=0)
# 創建執行緒去持續接收訂閱資訊
subscribe_thread = Thread(target=mqtt_subscribe)
subscribe_thread.start()
if __name__ == "__main__":
mqtt_run()
總結
歡迎各位交流,如有錯誤,歡迎指正,Figting!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295738.html
標籤:其他
上一篇:STM32串口輸出字串