本篇內(nèi)容介紹了“Python MQTT客戶端怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

成都創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),華容企業(yè)網(wǎng)站建設(shè),華容品牌網(wǎng)站建設(shè),網(wǎng)站定制,華容網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷,網(wǎng)絡(luò)優(yōu)化,華容網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力。可充分滿足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。
paho-mqtt 可以說是 Python MQTT 開源客戶端庫(kù)中的佼佼者。它由 Eclipse 基金會(huì)主導(dǎo)開發(fā),除了 Python 庫(kù)以外,同樣支持各大主流的編程語言,比如 C++、Java、JavaScript、Golang 等。目前 Python 版本已經(jīng)實(shí)現(xiàn)了 3.1 和 3.1.1 MQTT 協(xié)議,在最新開發(fā)版中實(shí)現(xiàn)了 MQTT 5.0。
在基金會(huì)的支持下,以每年一個(gè)版本的速度更新,本文發(fā)布時(shí)的最新版本為 1.5.0(于 2019 年 8 月發(fā)布)。
在 GitHub 主頁(yè)上,它提供了從入門的快速實(shí)現(xiàn)到每一個(gè)函數(shù)的詳細(xì)解讀,涵蓋了從初學(xué)者到高級(jí)使用者需要了解的各個(gè)部分。即使遇到超出范圍的問題,在 Google 上搜索,可以得到近 20 萬個(gè)相關(guān)詞條,是目前最為流行的 MQTT 客戶端。
得到如此多的關(guān)注度,除了穩(wěn)定的代碼外,還有其易用性。Paho 的接口使用非常簡(jiǎn)單優(yōu)雅,您只需要少量的代碼就能實(shí)現(xiàn) MQTT 的訂閱及消息發(fā)布。
pip3 install paho-mqtt
或者
git clone https://github.com/eclipse/paho.mqtt.python cd paho.mqtt.python python3 setup.py install
import paho.mqtt.client as mqtt
# 連接的回調(diào)函數(shù)
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client.subscribe("$SYS/#")
# 收到消息的回調(diào)函數(shù)
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("broker.emqx.io", 1883, 60)
client.loop_forever()import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)
for i in range(3):
client.publish('a/b', payload=i, qos=0, retain=False)
print(f"send {i} to a/b")
time.sleep(1)
client.loop_forever()甚至,你可以通過一行代碼,實(shí)現(xiàn)訂閱、發(fā)布。
import paho.mqtt.subscribe as subscribe
# 當(dāng)調(diào)用這個(gè)函數(shù)時(shí),程序會(huì)堵塞在這里,直到有一條消息發(fā)送到 paho/test/simple 主題
msg = subscribe.simple("paho/test/simple", hostname="broker.emqx.io")
print(f"{msg.topic} {msg.payload}")import paho.mqtt.publish as publish
# 發(fā)送一條消息
publish.single("a/b", "payload", hostname="broker.emqx.io")
# 或者一次發(fā)送多個(gè)消息
msgs = [{'topic':"a/b", 'payload':"multiple 1"}, ("a/b", "multiple 2", 0, False)]
publish.multiple(msgs, hostname="broker.emqx.io")HBMQTT 基于 Python asyncio 開發(fā),僅支持 3.1.1 的 MQTT 協(xié)議。由于使用 asyncio 庫(kù),開發(fā)者需要使用 3.4 以上的 Python 版本。
CPU 的速度遠(yuǎn)遠(yuǎn)快于磁盤、網(wǎng)絡(luò)等 IO 操作,而在一個(gè)線程中,無論 CPU 執(zhí)行得再快,遇到 IO 操作時(shí),都得停下來等待讀寫完成,這無疑浪費(fèi)了許多時(shí)間。
為了解決這個(gè)問題,Python 加入了異步 IO 的特性。在 Python 3.4 中,正式將 asyncio 納入標(biāo)準(zhǔn)庫(kù)中,并在 Python 3.5 中,加入了 async/await 關(guān)鍵字。用戶可以很輕松的使用在函數(shù)前加入 async 關(guān)鍵字,使函數(shù)變成異步函數(shù)。
HBMQTT 便是建立在 asyncio 標(biāo)準(zhǔn)庫(kù)之上。它允許用戶顯示的設(shè)置異步斷點(diǎn),通過異步 IO,MQTT 客戶端在收取消息或發(fā)送消息時(shí),掛載當(dāng)前的任務(wù),繼續(xù)處理下一個(gè)。
不過 HBMQTT 的知名度卻小得多。在 Google 上搜索,關(guān)于 HBMQTT 僅有 6000 多個(gè)詞條,在 Stack Overflow 上只有 10 個(gè)提問數(shù)。這就意味著,如果選擇 HBMQTT 的話你需要很強(qiáng)的解決問題的能力。
有意思的是,HBMQTT 本身也是一個(gè) MQTT 服務(wù)器。你可以通過 hbmqtt 命令一鍵開啟。
$ hbmqtt [2020-08-28 09:35:56,608] :: INFO - Exited state new [2020-08-28 09:35:56,608] :: INFO - Entered state starting [2020-08-28 09:35:56,609] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connections=-1)
pip3 install hbmqtt
或者
git clone https://github.com/beerfactory/hbmqtt cd hbmqtt python3 setup.py install
import logging
import asyncio
from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1, QOS_2
async def uptime_coro():
C = MQTTClient()
await C.connect('mqtt://broker.emqx.io/')
await C.subscribe([
('$SYS/broker/uptime', QOS_1),
('$SYS/broker/load/#', QOS_2),
])
try:
for i in range(1, 100):
message = await C.deliver_message()
packet = message.publish_packet
print(f"{i}: {packet.variable_header.topic_name} => {packet.payload.data}")
await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
await C.disconnect()
except ClientException as ce:
logging.error("Client exception: %s" % ce)
if __name__ == '__main__':
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(uptime_coro())import logging
import asyncio
import time
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
async def test_coro():
C = MQTTClient()
await C.connect('mqtt://broker.emqx.io/')
tasks = [
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)),
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)),
]
await asyncio.wait(tasks)
logging.info("messages published")
await C.disconnect()
if __name__ == '__main__':
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())更多使用細(xì)節(jié)情參考官方文檔:https://hbmqtt.readthedocs.io/en/latest/。
gmqtt 是由個(gè)人開發(fā)者開源的客戶端庫(kù)。默認(rèn)支持 MQTT 5.0 協(xié)議,如果連接的 MQTT 代理不支持 5.0 協(xié)議,則會(huì)降級(jí)到 3.1 并重新進(jìn)行連接。
相較于前兩者,gmqtt 還屬于初級(jí)開發(fā)階段,本文發(fā)布時(shí)的版本號(hào)是 0.6.7。但它是早期支持 MQTT 5.0 的 Python 庫(kù)之一,因此在網(wǎng)絡(luò)上知名度尚可。
同樣,它建立在 asyncio 庫(kù)上,因此需要使用 Python 3.4 以上的版本。
pip3 install gmqtt
或者
git clone https://github.com/wialon/gmqtt cd gmqtt python3 setup.py install
import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient
STOP = asyncio.Event()
def on_connect(client, flags, rc, properties):
print('Connected')
def on_message(client, topic, payload, qos, properties):
print(f'RECV MSG: {topic} {payload}')
def on_subscribe(client, mid, qos, properties):
print('SUBSCRIBED')
def on_disconnect(client, packet, exc=None):
print('Disconnected')
def ask_exit(*args):
STOP.set()
async def main(broker_host):
client = MQTTClient("client-id")
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
client.on_disconnect = on_disconnect
# 連接 MQTT 代理
await client.connect(broker_host)
# 訂閱主題
client.subscribe('TEST/#')
# 發(fā)送測(cè)試數(shù)據(jù)
client.publish("TEST/A", 'AAA')
client.publish("TEST/B", 'BBB')
await STOP.wait()
await client.disconnect()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, ask_exit)
loop.add_signal_handler(signal.SIGTERM, ask_exit)
host = 'broker.emqx.io'
loop.run_until_complete(main(host))import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient
STOP = asyncio.Event()
def on_connect(client, flags, rc, properties):
print('Connected')
client.subscribe('TEST/#', qos=0)
def on_message(client, topic, payload, qos, properties):
print(f'RECV MSG: {topic}, {payload}')
def on_disconnect(client, packet, exc=None):
print('Disconnected')
def ask_exit(*args):
STOP.set()
async def main(broker_host):
client = MQTTClient("client-id")
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
await client.connect(broker_host)
client.publish('TEST/TIME', str(time.time()), qos=1)
await STOP.wait()
await client.disconnect()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, ask_exit)
loop.add_signal_handler(signal.SIGTERM, ask_exit)
host = 'broker.emqx.io'
loop.run_until_complete(main(host))在介紹完這三款 Python MQTT 客戶端庫(kù)之后,我們?cè)賮砜纯慈绾螢樽约哼x擇合適的 MQTT 客戶端庫(kù)。這三個(gè)客戶端各有自己的優(yōu)缺點(diǎn):
paho-mqtt 有著最優(yōu)秀的文檔,代碼風(fēng)格易于理解,同時(shí)有著強(qiáng)大的基金會(huì)支持,但目前文檔的版本還不支持 MQTT 5.0。
HBMQTT 使用 asyncio 庫(kù)實(shí)現(xiàn),可以優(yōu)化網(wǎng)絡(luò) I/O 帶來的延遲。但是代碼風(fēng)格不友好,同樣不支持 MQTT 5.0。
gmqtt 同樣通過 asyncio 庫(kù)實(shí)現(xiàn),相比 HBMQTT ,代碼風(fēng)格友好,最重要的是,它支持 MQTT 5.0。但開發(fā)進(jìn)程慢,未來前景不明。
因此,在選擇時(shí),您可以參考一下的思路:
如果您是正常開發(fā),想要將其運(yùn)用在生產(chǎn)環(huán)境中,paho-mqtt 無疑是最好的選擇,其穩(wěn)定性和代碼易讀性遠(yuǎn)遠(yuǎn)超過其它兩個(gè)庫(kù)。在遇到問題時(shí),優(yōu)秀的文檔和互聯(lián)網(wǎng)上大量的詞條,也能幫您找到更多的解決方案。
對(duì)于熟練使用 asyncio 庫(kù)的讀者,不妨嘗試一下 HBMQTT 和 gmqtt。
如果您想要學(xué)習(xí)、參與開源項(xiàng)目或者使用 MQTT 5.0, 則不妨試用一下 gmqtt,并嘗試為其共享一份代碼吧。
“Python MQTT客戶端怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
本文標(biāo)題:PythonMQTT客戶端怎么使用
本文地址:http://www.chinadenli.net/article16/gpssgg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營(yíng)銷、定制開發(fā)、企業(yè)建站、網(wǎng)站設(shè)計(jì)公司、App設(shè)計(jì)、自適應(yīng)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)