Skip to content

MQTT 조회

MQTT에서 데이터베이스 쿼리를 실행하려면 db/query 토픽으로 요청을 보내십시오. 서버는 db/reply 토픽 또는 요청의 reply 필드에 지정한 토픽으로 결과를 돌려줍니다.

쿼리 JSON

paramdefaultdescription
qn/a실행할 SQL 쿼리 문자열
p선택 사항. ? bind placeholder에 전달할 파라미터의 JSON 배열입니다.
예: ["name", 1234, 1.23, true] Since v8.0.75
replydb/reply쿼리 결과를 받을 토픽
formatjson결과 형식: json, csv, box
timeformatns시간 단위: s, ms, us, ns
tzUTC시간대: UTC, Local, 지역 지정
compressno compression압축 방식: gzip
rownumfalse행 번호 포함 여부: true, false
headingtrue헤더 표시 여부: true, false
precision-1부동소수점 자릿수: -1은 반올림 없음, 0은 정수

format=json에서 사용 가능한 추가 매개변수 Since v8.0.12

다음 옵션은 format=json일 때만 사용할 수 있습니다.

paramdefaultdescription
transposefalse행 대신 열 배열(cols)을 생성합니다.
rowsFlattenfalseJSON 객체의 rows 필드 차원을 한 단계 줄입니다.
rowsArrayfalse각 레코드를 객체 배열로 구성한 JSON을 생성합니다.

기본 예시는 클라이언트가 db/reply/#를 구독한 뒤 reply 필드를 db/reply/my_query로 지정해 db/query 토픽으로 쿼리를 발행하고, 여러 메시지 중 자신의 응답만 구분하는 방법을 보여 줍니다. ? bind placeholder를 사용할 때는 p 필드에 JSON 배열을 지정합니다.

{
    "q": "select name,time,value from example where name = ? limit 5",
    "p": ["wave.sin"],
    "format": "csv",
    "reply": "db/reply/my_query"
}
A demonstration shows how to query and receive responses over MQTT. (Using MQTTX.app)

A demonstration shows how to query and receive responses over MQTT. (Using MQTTX.app)

클라이언트 예시

JSH 앱

Since v8.5.0

이 예제에서는 응답 토픽을 구독하고 SQL 쿼리를 발행한 뒤 MQTT를 통해 결과를 받는 과정을 살펴봅니다.

  1. 응답 토픽 구독
    먼저 db/reply/my_query처럼 결과를 받을 토픽을 구독합니다.

  2. SQL 쿼리 발행
    db/query 토픽으로 SQL 쿼리(q), 결과 형식(format), 응답 토픽(reply)을 포함한 메시지를 발행합니다.

  3. 응답 수신 및 처리
    서버가 쿼리를 처리하면 지정한 응답 토픽으로 결과를 전송합니다. 클라이언트는 메시지를 받아 결과를 출력합니다.

JSH 예시 코드:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
const process = require("process");
const mqtt = require("mqtt");

const topicReply = "db/reply/my_query";
const topicQuery = "db/query";
const queryRequest = {
    q: `select name,time,value from example limit 5`,
    format: 'csv',
    reply: topicReply,
};

var client = new mqtt.Client({
    servers: ["tcp://127.0.0.1:5653"],
    keepAlive: 10,
});
client.on('open', () => {
    console.println('---- subscribe:', topicReply);
    client.subscribe(topicReply, {qos:0})
});
client.on('error', (err) => {
    console.println('MQTT ERROR:', err.message);
});
client.on('close', () => {
    console.println('---- disconnected');
});
client.on('subscribed', (topic, reason) => {
    console.println('---- publish:', topicQuery);
    client.publish(topicQuery, JSON.stringify(queryRequest));
});
client.on('message', (msg) => {
    console.println('---- reply')
    console.println(msg.payload);
    client.unsubscribe(msg.topic);
});
client.on('unsubscribed', (topic, reason) => {
    console.println('---- unsubscribed:', topic, 'reason:', reason);
    setTimeout(()=>{
        client.close();
    }, 500)
});

실행과 결과 출력:

/work > ./mqtt_query.js
---- subscribe: db/reply/my_query ----
---- publish: db/query ----
---- reply ----
name,time,value
my-car,1782260468085501458,1.2345
my-car,1782260474814668541,1.35795
my-car,1782260474827077041,1.4814
my-car,1782260474839257291,1.60485

---- unsubscribed: db/reply/my_query reason: 0 ----
---- disconnected ----

Node.js 클라이언트

npm install mqtt --save
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const mqtt = require("mqtt");

const client = mqtt.connect("mqtt://127.0.0.1:5653", {
    clean: true,
    connectTimeout: 3000,
    autoUseTopicAlias: true,
    protocolVersion: 5,
});

const sqlText = "SELECT time,value FROM example "+
    "where name = 'neo_cpu.percent' limit 3";

client.on("connect", () => {
    client.subscribe("db/reply/#", (err) => {
        if (!err) {
            const req = {
                q: sqlText,
                format: "box",
                precision: 1,
                timeformat: "15:04:05",
            };
            client.publish("db/query", JSON.stringify(req));
        }
    });
});

client.on("message", (topic, message) => {
    console.log(message.toString());
    client.end();
});
$ node main.js

+----------+-------+
| TIME     | VALUE |
+----------+-------+
| 05:46:19 | 69.4  |
| 05:46:22 | 26.4  |
| 05:46:25 | 42.8  |
+----------+-------+

Go 클라이언트

응답 구조 정의

type Result struct {
	Success bool       `json:"success"`
	Reason  string     `json:"reason"`
	Elapse  string     `json:"elapse"`
	Data    ResultData `json:"data"`
}

type ResultData struct {
	Columns []string `json:"columns"`
	Types   []string `json:"types"`
	Rows    [][]any  `json:"rows"`
}

db/reply 구독

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
client.Subscribe("db/reply", 1, func(_ paho.Client, msg paho.Message) {
    buff := msg.Payload()
    result := Result{}
    if err := json.Unmarshal(buff, &result); err != nil {
        panic(err)
    }
    if !result.Success {
        fmt.Println("RECV: query failed:", result.Reason)
        return
    }
    if len(result.Data.Rows) == 0 {
        fmt.Println("Empty result")
        return
    }
    for i, rec := range result.Data.Rows {
        // 각 레코드에 대해 필요한 작업을 수행합니다.
        name := rec[0].(string)
        ts := time.Unix(0, int64(rec[1].(float64)))
        value := float64(rec[2].(float64))
        fmt.Println(i+1, name, ts, value)
    }
})

Publish ‘db/query’

jsonStr := `{ "q": "select * from EXAMPLE order by time desc limit 5" }`
client.Publish("db/query", 1, false, []byte(jsonStr))
최근 업데이트