Python3連接阿里云物聯網設備發送接收資料(樹莓派)
- 阿里云物聯網IOT
- 代碼部分
- 庫檔案
- Windows下安裝環境
- 樹莓派安裝環境
- 可能遇到的錯誤
- 代碼
- 效果展示
阿里云物聯網IOT
首先,準備好阿里云產品設備的創建 (三碼)
創建流程可以參考這篇檔案 :移遠BC35-G配置網路連接阿里云MQTT發送資料
代碼部分
這里我用的IDE是Pycham 2021.2 (可以移植到spyder,Thonny等樹莓派的IDE)
庫檔案
這里我們需要一個阿里云IOT庫檔案
from linkkit import linkkit
和兩個基礎庫檔案 time,Json
import time
import json
Windows下安裝環境
打開命令提示符 輸入
pip install aliyun-iot-linkkit
樹莓派安裝環境
打開終端
pip install aliyun-iot-linkkit
可能遇到的錯誤
網路延遲問題,重復pip install aliyun-iot-linkkit即可
代碼
配置 “ 三碼 ”
ProductKey="ProductKey" #阿里云物聯網ProductKey
DeviceName="DeviceName" #阿里云物聯網DeviceName
DeviceSecret="DeviceSecret" #阿里云物聯網DeviceSecret
連接與取消阿里云設備
#連接阿里云設備
def on_connect(session_flag, rc, userdata):
print("on_connect:%d,rc:%d,userdata:" % (session_flag, rc))
pass
#取消連接阿里云
def on_disconnect(rc, userdata):
print("on_disconnect:rc:%d,userdata:" % rc)
訂閱topic
# 訂閱topic
def on_subscribe_topic(mid, granted_qos, userdata):
print("on_subscribe_topic mid:%d, granted_qos:%s" %
(mid, str(','.join('%s' % it for it in granted_qos))))
pass
接收與停止阿里云的資料
#接收阿里云的資料
def on_topic_message(topic, payload, qos, userdata):
print("阿里云發布資料:", str(payload))
#拿到接收來的資料
data=str(payload)[2:-1]
print("阿里云發布資料:",data)
dataDict=json.loads(data)
# 切片左閉右開 取頭不取尾
print("阿里云發布資料:",type(dataDict))
#多層決議
{"temp":{"value":32}}
print(dataDict["temp"]["value"]) #temp是溫度識別符號,value模擬溫度資料
pass
#停止訂閱云端資料
def on_unsubscribe_topic(mid, userdata):
print("取消訂閱topic mid:%d" % mid)
pass
發布訊息的結果,判斷是否成功呼叫發布函式
#發布訊息的結果,判斷是否成功呼叫發布函式
def on_publish_topic(mid, userdata):
print("發布公共topic mid:%d" % mid)
發布主題
#發布主題
while True:
data={
"RoomTemp":28
}
#產品屬性上報: /sys/a1TbSHGVD5F/${deviceName}/thing/event/property/post 發布 設備屬性上報
rc, mid = lk.publish_topic(lk.to_full_topic("/sys/ProductKey/deviceName/thing/event/property/post"),str(data))
time.sleep(2)
pass
p_forever()
完整代碼
from linkkit import linkkit #阿里云aliyun-iot-linkkit庫
import time #python延時庫
import json #發送json資料
ProductKey="ProductKey" #阿里云物聯網ProductKey
DeviceName="DeviceName" #阿里云物聯網DeviceName
DeviceSecret="DeviceSecret" #阿里云物聯網DeviceSecret
#連接阿里云設備
def on_connect(session_flag, rc, userdata):
print("on_connect:%d,rc:%d,userdata:" % (session_flag, rc))
pass
#取消連接阿里云
def on_disconnect(rc, userdata):
print("on_disconnect:rc:%d,userdata:" % rc)
# 訂閱topic
def on_subscribe_topic(mid, granted_qos, userdata):
print("on_subscribe_topic mid:%d, granted_qos:%s" %
(mid, str(','.join('%s' % it for it in granted_qos))))
pass
#接收阿里云的資料
def on_topic_message(topic, payload, qos, userdata):
print("阿里云發布資料:", str(payload))
#拿到接收來的資料
data=str(payload)[2:-1]
print("阿里云發布資料:",data)
dataDict=json.loads(data)
# 切片左閉右開 取頭不取尾
print("阿里云發布資料:",type(dataDict))
#多層決議
{"temp":{"value":29.8}}
print(dataDict["temp"]["value"]) #temp是溫度識別符號,value模擬溫度資料
pass
#停止訂閱云端資料
def on_unsubscribe_topic(mid, userdata):
print("取消訂閱topic mid:%d" % mid)
pass
#發布訊息的結果,判斷是否成功呼叫發布函式
def on_publish_topic(mid, userdata):
print("發布公共topic mid:%d" % mid)
#初始化連接引數,阿里云三碼設定
lk = linkkit.LinkKit(
host_name="cn-shanghai",#當前設備服務器(上海-華東二)
product_key=ProductKey,#當前設備product_key
device_name=DeviceName,#當前設備device_name
device_secret=DeviceSecret)#當前設備device_secret
#注冊接收到云端資料的方法
lk.on_connect = on_connect
#注冊取消接收到云端資料的方法
lk.on_disconnect = on_disconnect
#注冊云端訂閱的方法
lk.on_subscribe_topic = on_subscribe_topic
#注冊當接受到云端發送的資料的時候的方法
lk.on_topic_message = on_topic_message
#注冊向云端發布資料的時候順便所呼叫的方法
lk.on_publish_topic = on_publish_topic
#注冊取消云端訂閱的方法
lk.on_unsubscribe_topic = on_unsubscribe_topic
#連接阿里云的函式(異步呼叫)
lk.connect_async()
time.sleep(2) #延時設定
#訂閱主題
rc, mid = lk.subscribe_topic(lk.to_full_topic("user/get"))
#發布主題
while True:
data={
"RoomTemp":28
}
#產品屬性上報: /sys/ProductKey/${deviceName}/thing/event/property/post 發布 設備屬性上報
rc, mid = lk.publish_topic(lk.to_full_topic("/sys/ProductKey/deviceName/thing/event/property/post"),str(data))
time.sleep(2)
pass
p_forever()
產品屬性上報: /sys/ProductKey/${deviceName}/thing/event/property/post 發布 設備屬性上報
效果展示
模擬資料發送
平臺發送資料
樹莓派同理
祝你順利!!!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295741.html
標籤:其他
上一篇:ST單片機概況
下一篇:物聯網-電力監控平臺(二)