Python client
Setup
Install paho
pip install paho-mqtt
Create the project directory
mkdir python-mqtt && cd python-mqtt
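Publisher
Client
import paho.mqtt.client as mqtt
# Written for the paho-mqtt 1.x API; paho-mqtt 2.x also requires a
# callback_api_version argument when constructing the client.
mqttClient = mqtt.Client("python_pub")  # client ID of the publisher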
Connect callback
def on_connect(client, userdata, flags, rc):
    # rc is the CONNACK return code; 0 means the broker accepted the connection.
    if rc == 0:
        print("CONNACK OK")
    else:
        print("CONNACK KO code=", rc)
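The numeric `rc` alone is hard to read in logs. As a minimal sketch, the callback could translate it into the return-code meanings defined by MQTT 3.1.1. The names `CONNACK_REASONS` and `describe_connack` are hypothetical helpers introduced here for illustration and are not part of paho or machbase-neo.

```python
# CONNACK return codes as defined by the MQTT 3.1.1 specification.
CONNACK_REASONS = {
    0: "connection accepted",
    1: "unacceptable protocol version",
    2: "client identifier rejected",
    3: "server unavailable",
    4: "bad user name or password",
    5: "not authorized",
}


def describe_connack(rc):
    """Return a readable description for a CONNACK return code."""
    return CONNACK_REASONS.get(rc, f"unknown return code {rc}")


def on_connect(client, userdata, flags, rc):
    # Same signature as the callback above (paho-mqtt 1.x style).
    if rc == 0:
        print("CONNACK OK")
    else:
        print("CONNACK KO:", describe_connack(rc))
```

This version of `on_connect` can be assigned to `mqttClient.on_connect` in place of the one above.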
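Connect (non-TLS)
Connect to machbase-neo over a plain-text (non-TLS) MQTT socket.
# clean_session is an option of Client(), not of connect().
mqttClient = mqtt.Client("python_pub", clean_session=True)
mqttClient.on_connect = on_connect
mqttClient.connect("127.0.0.1", port=5653, keepalive=10)
mqttClient.loop_start()  # run the network loop in a background thread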
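Because `loop_start()` returns immediately, the CONNACK may not have arrived yet when the next statement runs. One possible way to wait for it, sketched here with only the standard library, is to set a `threading.Event` from the connect callback. The names `connected` and `wait_until_connected` are hypothetical, and the 5-second default timeout is an arbitrary assumption.

```python
import threading

# Set by on_connect once the broker has accepted the connection.
connected = threading.Event()


def on_connect(client, userdata, flags, rc):
    # Same signature as the connect callback above (paho-mqtt 1.x style).
    if rc == 0:
        print("CONNACK OK")
        connected.set()
    else:
        print("CONNACK KO code=", rc)


def wait_until_connected(timeout=5.0):
    """Block until on_connect reports success; return False on timeout."""
    return connected.wait(timeout)
```

With this callback assigned to `mqttClient.on_connect`, a script could call `wait_until_connected()` right after `mqttClient.loop_start()` and publish only if it returns `True`.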
Disconnect
mqttClient.disconnect()
mqttClient.loop_stop()
Publish callback
def on_publish(client, userdata, mid):
    # mid is the message ID of the publish request that completed.
    print("PUBACK mid=", mid)
Publish
mqttClient.on_publish = on_publish
mqttClient.publish("db/append/example", """[
["temperature",1677033057000000000, 21.1],
["humidity", 1677033057000000000, 0.53]
]""", qos=1)
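The payload above is a JSON array of `[name, time, value]` rows written by hand. As a rough sketch, the same string could be generated from a dictionary of readings with the standard `json` module. The helper `build_payload` is hypothetical, and it assumes the timestamp is in nanoseconds since the Unix epoch, which is what the sample value `1677033057000000000` appears to be.

```python
import json
import time


def build_payload(readings, ts_ns=None):
    """Build a JSON array of [name, time, value] rows.

    readings: dict mapping a tag name to its numeric value.
    ts_ns:    timestamp in nanoseconds (assumed unit); defaults to now.
    """
    if ts_ns is None:
        ts_ns = time.time_ns()
    rows = [[name, ts_ns, value] for name, value in readings.items()]
    return json.dumps(rows)


payload = build_payload(
    {"temperature": 21.1, "humidity": 0.53},
    ts_ns=1677033057000000000,
)
print(payload)
# [["temperature", 1677033057000000000, 21.1], ["humidity", 1677033057000000000, 0.53]]
```

The resulting string can be passed as the second argument to `mqttClient.publish("db/append/example", payload, qos=1)`.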
Full source code
import paho.mqtt.client as mqtt
import time

def on_connect(client, userdata, flags, rc):
    # rc == 0 means the broker accepted the connection.
    if rc == 0:
        print("CONNACK OK")
    else:
        print("CONNACK KO code:", rc)

def on_publish(client, userdata, mid):
    print("PUBACK mid:", mid)

mqttClient = mqtt.Client("python_pub", clean_session=True)
mqttClient.on_connect = on_connect
mqttClient.on_publish = on_publish
mqttClient.connect("127.0.0.1", port=5653, keepalive=10)
mqttClient.loop_start()  # run the network loop in a background thread

mqttClient.publish("db/append/example", """[
["temperature",1677033057000000000, 21.1],
["humidity", 1677033057000000000, 0.53]
]""", qos=1)

time.sleep(1)  # give the background loop time to send the message and receive the PUBACK
mqttClient.disconnect()
mqttClient.loop_stop()