文章目錄
- 前言
- 一、快速搭建Python對阿里云物聯網MQTT設備接入端代碼實作
- 1. mqtt子應用下view.py 主檔案
- 2. wsgi.py
- 二、具體使用介紹
- 1. 安裝包
- 2. 設備認證,一機一密型接入(11-22行)
- 3. 回呼函式
- 3.1 on_connect,設備連接云端成功后會通過on_connect回呼函式通知用戶(25-51行)
- 3.2 on_disconnect ,連接成功以后如果連接斷開會通過on_disconnect 回呼通知用戶(54-62行)
- 4. 訂閱云端訊息(82-93行,155-156行)
- 5. 接收與處理來自云端的訊息(65-79行,145-146行)
- 6. 發送訊息到云端(106-112行,115-125行,147-148行)
- 7. 取消訊息訂閱(95-103行,149-150行)
- 總結
前言
MQTT是用于物聯網(IoT)的OASIS標準訊息傳遞協議,本文主要記錄使用阿里云物聯網平臺中,網關設備接入提示:需要理解Python paho-mqtt 模塊,本文使用aliyun-iot-linkkit實作,適用于Django環境下
建議先看完我的另一篇文章阿里云物聯網平臺使用,在進行使用
一、快速搭建Python對阿里云物聯網MQTT設備接入端代碼實作
先上代碼
1. mqtt子應用下view.py 主檔案
import json
import logging
import sys
import threading
import time
from linkkit import linkkit
logger = logging.getLogger('django')
# 來自一機一密的設備
options = {"ProductKey": "xxxxxxxxxxx",
"DeviceName": "device-name",
"DeviceSecret": "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"
}
# 示例代碼配置設備的設備證書以及連接的公共示例的RegionID
lk = linkkit.LinkKit(
host_name="cn-shanghai", # 華東2(上海),根據自己的RegionID
product_key=options["ProductKey"],
device_name=options["DeviceName"],
device_secret=options["DeviceSecret"])
def on_connect(session_flag, rc, userdata):
"""
callback after connect_async
:param session_flag: type:int description:is previous connect session,0 new session; 1 previous session
:param rc: type:int rc的值決定了連接成功或者不成功:
:param userdata: type: description:same as LinkKit input parameter user_data
"""
print("on_connect:%d,rc:%d,userdata:" % (session_flag, rc))
if rc == 0:
# 連接成功
print("Connection successful")
elif rc == 1:
# 協議版本錯誤
print("Protocol version error")
elif rc == 2:
# 無效的客戶端標識
print("Invalid client identity")
elif rc == 3:
# 服務器無法使用
print("server unavailable")
elif rc == 4:
# 錯誤的用戶名或密碼
print("Wrong user name or password")
elif rc == 5:
# 未經授權
print("unaccredited")
print("Connect with the result code " + str(rc))
def on_disconnect(rc, userdata):
"""
callback after connection disconnect
:param rc: type:int description:0 success call for disconnect,1 network error
:param userdata: type: description:same as LinkKit input parameter user_data
"""
print("on_disconnect:rc:%d,userdata:" % rc)
if rc != 0:
print("Unexpected disconnection %s" % rc)
def on_topic_message(topic, payload, qos, userdata):
"""
callback after subscribe_topic call
:param topic: 訂閱的主題
:param payload: 內容
:param qos: 質量服務等級
:param userdata:
"""
print("on_topic_message:" + topic + " payload:" + str(payload) + " qos:" + str(qos))
json_msg = json.loads(payload.decode('utf-8')) # 一般為json資料
try:
# 接收到訊息后的業務邏輯,同時處理多任務亦可以采用異步、執行緒池等
pass
except Exception as e:
print('No this moType', e)
def on_subscribe_topic(mid, granted_qos, userdata):
"""
callback after subscribe_topic call
:param mid: type: int description:publish message id
:param granted_qos: type:list(int) description: corresponding to subscribe_topic parameter topic,0 represent qos=0,1 represent qos=1,128 represent subscribe error
:param userdata: type: description:same as LinkKit input parameter user_data
"""
print("on_subscribe_topic mid:%d, granted_qos:%s" %
(mid, str(','.join('%s' % it for it in granted_qos))))
print(granted_qos)
if granted_qos == 128:
print("訂閱失敗")
def on_unsubscribe_topic(mid, userdata):
"""
callback after unsubscribe topic
:param mid: type: int description:publish message id
:param userdata: type: description:same as LinkKit input parameter user_data
"""
print("on_unsubscribe_topic mid:%d" % mid)
pass
def on_publish_topic(mid, userdata):
"""
callback after publish_topic call
:param mid: type: int description:publish message id
:param userdata: type: description:same as LinkKit input parameter user_data
"""
print("on_publish_topic mid:%d" % mid)
# mqtt發布啟動函式
def mqtt_publish(sensor_data, topic='defult', qos=0):
try:
rc, mid = lk.publish_topic(lk.to_full_topic("user/update"), sensor_data)
print("mqtt_publish:已啟動...", "user/update", sensor_data)
return
except KeyboardInterrupt:
print("EXIT")
# 這是網路回圈的阻塞形式,直到客戶端呼叫disconnect()時才會回傳,它會自動處理重新連接,
lk.on_disconnect()
sys.exit(0)
# 啟動函式
def mqtt_run():
# 賬號密碼驗證放到最前面
# client.username_pw_set('user', 'user')
# client = mqtt.Client()
# 建立mqtt連接
# 注冊接收到云端資料的方法
lk.on_connect = on_connect
# 注冊取消接收到云端資料的方法
lk.on_disconnect = on_disconnect
# 如果產品生產時錯誤地將一個三元組燒寫到了多個設備,多個設備將會被物聯網平臺認為是同一個設備,
# 從而出現一個設備上線將另外一個設備的連接斷開的情況,用戶可以將自己的介面資訊上傳到云端,那么云端可以通過介面的資訊來進行問題定位,
lk.config_device_info("Eth|03ACDEFF0032|Eth|03ACDEFF0031")
# 企業實體域名配置的更改
lk.config_mqtt(secure="", endpoint="iot-060a085o.mqtt.iothub.aliyuncs.com")
# 注冊云端訂閱的方法
lk.on_subscribe_topic = on_subscribe_topic
# 注冊當接受到云端發送的資料的時候的方法
lk.on_topic_message = on_topic_message
# 注冊向云端發布資料的時候順便所呼叫的方法
lk.on_publish_topic = on_publish_topic
# 注冊取消云端訂閱的方法
lk.on_unsubscribe_topic = on_unsubscribe_topic
# 連接阿里云的函式(異步呼叫)
lk.connect_async()
# 因為他是他是異步呼叫需要時間所以如果沒有這個延時函式的話,他就會出現not in connected state的錯誤
time.sleep(2)
# 訂閱這個topic,不需要寫prodect_key和device_name
rc, mid = lk.subscribe_topic(lk.to_full_topic("user/get"))
2. wsgi.py
為mqtt服務創建一個執行緒
import threading
from apps.mqtt import views
# 啟用多執行緒 運行脫機主控模塊
thread_mqtt_run = threading.Thread(target=views.mqtt_run)
thread_mqtt_run.start()
二、具體使用介紹
1. 安裝包
安裝Python對接mqtt協議庫,paho-mqtt
pip install paho-mqtt
安裝阿里云物聯網封裝paho-mqtt后的庫,aliyun-iot-linkkit
pip install aliyun-iot-linkkit
2. 設備認證,一機一密型接入(11-22行)
另一篇文章阿里云物聯網平臺使用有介紹,
從創建好的設備中,找一個設備證書,一鍵復制傳入
注意:host_name是阿里云上你買的服務地址,“地域和可用區中查看對應的RegionID,公共實體和企業實體還有區別,
如果需要改變MQTT連接的一些默認引數,可以通過config_mqtt 指定埠等連接引數,如下所示:
config_mqtt(self, port=1883, protocol="MQTTv311", transport="TCP",
secure="TLS", keep_alive=60, clean_session=True,
max_inflight_message=20, max_queued_message=0,
auto_reconnect_min_sec=1,
auto_reconnect_max_sec=60,
cadata=None):
3. 回呼函式
用戶可以在回呼中加入自己的業務處理邏輯,
如果不夠清晰,可自行查看Python Link SDK中API串列資訊,
3.1 on_connect,設備連接云端成功后會通過on_connect回呼函式通知用戶(25-51行)
135行,為注冊該回呼函式
3.2 on_disconnect ,連接成功以后如果連接斷開會通過on_disconnect 回呼通知用戶(54-62行)
137行,為注冊該回呼函式
4. 訂閱云端訊息(82-93行,155-156行)
訂閱結果通過on_subscribe_topic通知用戶:
5. 接收與處理來自云端的訊息(65-79行,145-146行)
通過on_topic_message()回呼告知用戶
6. 發送訊息到云端(106-112行,115-125行,147-148行)
發布訊息結果通知
訊息發送后,云端是否成功接收通過on_publish_topic回呼通知用戶:
發送訊息
通過呼叫publish_topic()實作將訊息發送到云端:
核心為:rc, mid = lk.publish_topic(lk.to_full_topic(“user/pub”), “123”)
此處為個人封裝,上送時隨時調取函式
7. 取消訊息訂閱(95-103行,149-150行)
通過呼叫unsubscribe_topic()取消對指定topic訊息的訂閱:
本文暫時沒用用到,如有需要,按照如下寫即可
rc, mid = lk.unsubscribe_topic(lk.to_full_topic(“user/test”))
取消訂閱的結果通過on_unsubscribe_topic回呼通知用戶:
總結
希望大家多多交流,一起進步,
您的點贊是我堅持的動力,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295734.html
標籤:其他