MQTT Query

MQTT clients query the database through the db/query topic. Publish a query request to this topic, and the server publishes the result to db/reply, or to the topic given in the reply field of the request.

Query JSON

| param      | default        | description                                                  |
|------------|----------------|--------------------------------------------------------------|
| q          | n/a            | SQL query string                                             |
| reply      | db/reply       | Topic on which to receive the query result                   |
| format     | json           | Result data format: json, csv, box                           |
| timeformat | ns             | Time format: s, ms, us, ns                                   |
| tz         | UTC            | Time zone: UTC, Local, or a location spec                    |
| compress   | no compression | Compression method: gzip                                     |
| rownum     | false          | Include row numbers: true, false                             |
| heading    | true           | Show heading: true, false                                    |
| precision  | -1             | Precision of float values: -1 for no rounding, 0 for integer |
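As a minimal sketch of how the parameters above fit together, the following builds a request payload for db/query. The helper name `buildQueryRequest` is hypothetical and not part of any library; it sends only the fields the caller sets and leaves the rest to the server defaults listed in the table.

```javascript
// Hypothetical helper (not part of any library): builds the JSON payload
// for the db/query topic from the parameters listed in the table above.
const FORMATS = ["json", "csv", "box"];
const OPTION_KEYS = ["reply", "format", "timeformat", "tz", "compress", "rownum", "heading", "precision"];

function buildQueryRequest(sql, options = {}) {
    if (typeof sql !== "string" || sql.trim() === "") {
        throw new Error("q (SQL query string) is required");
    }
    if (options.format !== undefined && !FORMATS.includes(options.format)) {
        throw new Error(`unsupported format: ${options.format}`);
    }
    // Send only what the caller set; omitted fields fall back to the
    // server defaults (format=json, timeformat=ns, tz=UTC, ...).
    const req = { q: sql };
    for (const key of OPTION_KEYS) {
        if (options[key] !== undefined) req[key] = options[key];
    }
    return JSON.stringify(req);
}

const payload = buildQueryRequest("select name,time,value from example limit 5", {
    format: "csv",
    timeformat: "ms",
    precision: 2,
});
console.log(payload);
```

The resulting string can be passed as the message body when publishing to db/query.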

More Parameters in format=json (since v8.0.12)

These options are available only when format=json.

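| param       | default | description                                                        |
|-------------|---------|--------------------------------------------------------------------|
| transpose   | false   | Produce a cols array instead of rows.                              |
| rowsFlatten | false   | Reduce the array dimension of the rows field in the JSON object.   |
| rowsArray   | false   | Produce JSON that contains only an array of objects, one per record. |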
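To make the three shapes easier to picture, the sketch below reproduces them locally from a small made-up result. It is only a reading of the descriptions above: the sample values are invented, and the exact field names and envelope the server emits for each option are not shown here.

```javascript
// Sample result in the default shape (one inner array per record).
// The values are made up for illustration only.
const columns = ["name", "value"];
const rows = [["a", 1.5], ["b", 2.5]];

// transpose: one array per column instead of one array per record.
const cols = columns.map((_, c) => rows.map((row) => row[c]));

// rowsFlatten: the two-dimensional rows array collapsed into one dimension.
const flat = rows.flat();

// rowsArray: one object per record, keyed by column name.
const objects = rows.map((row) =>
    Object.fromEntries(row.map((v, c) => [columns[c], v]))
);

console.log(JSON.stringify(cols));    // [["a","b"],[1.5,2.5]]
console.log(JSON.stringify(flat));    // ["a",1.5,"b",2.5]
console.log(JSON.stringify(objects)); // [{"name":"a","value":1.5},{"name":"b","value":2.5}]
```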

In the basic example below, the client subscribes to db/reply/# and publishes a query request to db/query with the reply field set to db/reply/my_query, so that it can pick out its own reply among multiple messages.

{
    "q": "select name,time,value from example limit 5",
    "format": "csv",
    "reply": "db/reply/my_query"
}
A demonstration shows how to query and receive responses over MQTT. (Using MQTTX.app)
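When several queries are in flight at once, giving each request its own reply topic under db/reply/ lets the client match replies to requests. The following is a minimal sketch of that idea; `ReplyRouter` and the `q1`, `q2`, ... topic suffixes are hypothetical names chosen for illustration, and the replies are simulated so the sketch runs without a server.

```javascript
// Hypothetical helper: routes replies to per-request handlers by topic.
class ReplyRouter {
    constructor(prefix = "db/reply") {
        this.prefix = prefix;
        this.pending = new Map();
        this.seq = 0;
    }

    // Reserve a unique reply topic and remember who is waiting on it.
    register(handler) {
        this.seq += 1;
        const topic = `${this.prefix}/q${this.seq}`;
        this.pending.set(topic, handler);
        return topic;
    }

    // Call from the client's "message" event; returns false for topics
    // nobody is waiting on. Each handler is invoked at most once.
    dispatch(topic, message) {
        const handler = this.pending.get(topic);
        if (!handler) return false;
        this.pending.delete(topic);
        handler(message.toString());
        return true;
    }
}

const router = new ReplyRouter();
const received = [];
const topicA = router.register((body) => received.push(`A:${body}`));
const topicB = router.register((body) => received.push(`B:${body}`));

// Replies may arrive in any order; the topic identifies the request.
router.dispatch(topicB, Buffer.from("second"));
router.dispatch(topicA, Buffer.from("first"));
console.log(topicA, topicB, received);
```

With a real client, the topic returned by `register` would go into the reply field of the request, and `dispatch` would be called from the message handler of a subscription to db/reply/#.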

Client Examples

JavaScript Client

npm install mqtt --save

const mqtt = require("mqtt");

const client = mqtt.connect("mqtt://127.0.0.1:5653", {
    clean: true,
    connectTimeout: 3000,
    autoUseTopicAlias: true,
    protocolVersion: 5,
});

client.on("connect", () => {
    client.subscribe("db/reply/#", (err) => {
        if (!err) {
            const req = {
                q: "SELECT time,value FROM example where name = 'neo_cpu.percent' limit 3",
                format: "box",
                precision: 1,
                timeformat: "15:04:05",
            };
            client.publish("db/query", JSON.stringify(req));
        }
    });
});

client.on("message", (topic, message) => {
    console.log(message.toString());
    client.end();
});

$ node main.js

+----------+-------+
| TIME     | VALUE |
+----------+-------+
| 05:46:19 | 69.4  |
| 05:46:22 | 26.4  |
| 05:46:25 | 42.8  |
+----------+-------+
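The client above ends the connection on the first message and waits forever if no reply arrives. A possible refinement, sketched here under assumptions, wraps one query in a Promise with a timeout. `queryOnce` is a hypothetical helper, and `FakeClient` is a stand-in that echoes the request back so the sketch runs without a broker; an mqtt.js client is expected to work in its place because it offers the same subscribe, publish, on, and removeListener methods.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: publishes one query and resolves with the first reply
// on replyTopic, or rejects after timeoutMs.
function queryOnce(client, req, replyTopic, timeoutMs = 3000) {
    return new Promise((resolve, reject) => {
        const onMessage = (topic, message) => {
            if (topic !== replyTopic) return; // reply to some other request
            cleanup();
            resolve(message.toString());
        };
        const timer = setTimeout(() => {
            cleanup();
            reject(new Error(`no reply on ${replyTopic} within ${timeoutMs} ms`));
        }, timeoutMs);
        const cleanup = () => {
            clearTimeout(timer);
            client.removeListener("message", onMessage);
        };
        client.on("message", onMessage);
        // Publish only after the subscription is in place, so the reply
        // cannot be missed.
        client.subscribe(replyTopic, (err) => {
            if (err) {
                cleanup();
                reject(err);
                return;
            }
            client.publish("db/query", JSON.stringify({ ...req, reply: replyTopic }));
        });
    });
}

// Stand-in for a broker connection: echoes every published payload back
// on the reply topic named in the request. Not a real server response.
class FakeClient extends EventEmitter {
    subscribe(topic, cb) { cb(null); }
    publish(topic, payload) {
        const { reply } = JSON.parse(payload);
        setImmediate(() => this.emit("message", reply, Buffer.from(payload)));
    }
}

queryOnce(new FakeClient(), { q: "select 1" }, "db/reply/demo")
    .then((body) => console.log("reply:", body));
```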

Go Client

Define data structure for response

type Result struct {
	Success bool       `json:"success"`
	Reason  string     `json:"reason"`
	Elapse  string     `json:"elapse"`
	Data    ResultData `json:"data"`
}

type ResultData struct {
	Columns []string `json:"columns"`
	Types   []string `json:"types"`
	Rows    [][]any  `json:"rows"`
}

Subscribe ‘db/reply’

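// Assumes the Eclipse Paho client imported as
//   paho "github.com/eclipse/paho.mqtt.golang"
// plus "encoding/json", "fmt" and "time" from the standard library.
client.Subscribe("db/reply", 1, func(_ paho.Client, msg paho.Message) {
    buff := msg.Payload()
    result := Result{}
    if err := json.Unmarshal(buff, &result); err != nil {
        panic(err)
    }
    if !result.Success {
        fmt.Println("RECV: query failed:", result.Reason)
        return
    }
    if len(result.Data.Rows) == 0 {
        fmt.Println("Empty result")
        return
    }
    for i, rec := range result.Data.Rows {
        // This handler expects each record as [name, time, value].
        // encoding/json decodes every JSON number into float64 when the
        // target type is `any`.
        name := rec[0].(string)
        ts := time.Unix(0, int64(rec[1].(float64))) // epoch nanoseconds (default timeformat "ns")
        value := rec[2].(float64)
        fmt.Println(i+1, name, ts, value)
    }
})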

Publish ‘db/query’

jsonStr := `{ "q": "select * from EXAMPLE order by time desc limit 5" }`
client.Publish("db/query", 1, false, []byte(jsonStr))
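
The same reply handling can be sketched in JavaScript. The field names (success, reason, data.columns, data.rows) follow the Go Result struct above; `parseReply` is a hypothetical helper and the sample payload is made up for illustration. The sample uses millisecond timestamps because epoch nanoseconds exceed Number.MAX_SAFE_INTEGER and would lose precision in a JavaScript number.

```javascript
// Hypothetical helper: turns a format=json reply payload into one object
// per record, keyed by column name. Throws if the server reported failure.
function parseReply(payload) {
    const result = JSON.parse(payload.toString());
    if (!result.success) {
        throw new Error(`query failed: ${result.reason}`);
    }
    const { columns = [], rows = [] } = result.data || {};
    return rows.map((row) =>
        Object.fromEntries(row.map((v, i) => [columns[i], v]))
    );
}

// Made-up payload for illustration, shaped like a reply to a request
// sent with "timeformat": "ms".
const sample = JSON.stringify({
    success: true,
    reason: "success",
    elapse: "1ms",
    data: {
        columns: ["NAME", "TIME", "VALUE"],
        types: ["string", "datetime", "double"],
        rows: [["neo_cpu.percent", 1700000000000, 69.4]],
    },
});

for (const rec of parseReply(sample)) {
    console.log(rec.NAME, new Date(rec.TIME).toISOString(), rec.VALUE);
}
```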