MQTT Write

Topic MQTT v3.1/v3.1.1

The topic for writing data is named after the destination table.

To utilize payload formats other than JSON, construct the MQTT topic by concatenating the table name, payload format, and compression type, each separated by a colon (:).

The full syntax of the topic is:

db/{method}/{table}:{format}:{compress}

method: There are two methods for writing data, append and write. The append method is recommended for general use in an MQTT environment.

  • append: writes data in append mode.
  • write: writes data with an INSERT SQL statement.

format: The current version of machbase-neo supports json and csv; ndjson is also accepted since v8.0.33, as used in the NDJSON examples on this page. The default format is json.

compress: Currently gzip is supported.

Examples

  • db/append/EXAMPLE means writing data to the table EXAMPLE in append method and the payload is JSON.

  • db/append/EXAMPLE:json is equivalent to the example above. The trailing :json part can be omitted because json is the default format.

  • db/append/EXAMPLE:json:gzip means writing data to the table EXAMPLE in append method and the payload is gzip compressed JSON.

  • db/append/EXAMPLE:csv means writing data to the table EXAMPLE in append method and the payload is CSV.

  • db/write/EXAMPLE:csv means writing data to the table EXAMPLE with INSERT INTO... SQL statement and the payload is CSV.

  • db/write/EXAMPLE:csv:gzip means writing data to the table EXAMPLE with INSERT INTO... SQL statement and the payload is gzip compressed CSV.
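
The topic rules above can be captured in a small shell helper. This is only a sketch: neo_topic is a hypothetical function name, not part of machbase-neo or mosquitto, and it does nothing more than compose the string db/{method}/{table}:{format}:{compress}. It assumes that a compression suffix always requires the format to be written out, as in every example above.

```shell
#!/bin/sh
# neo_topic METHOD TABLE [FORMAT] [COMPRESS]
# Hypothetical helper: prints an MQTT v3.1 style topic for machbase-neo.
neo_topic() {
    method=$1
    table=$2
    format=${3:-}
    compress=${4:-}
    topic="db/${method}/${table}"
    # Assumption: with a compression suffix the format is spelled out,
    # even when it is the default json.
    if [ -n "$compress" ] && [ -z "$format" ]; then
        format=json
    fi
    if [ -n "$format" ]; then
        topic="${topic}:${format}"
    fi
    if [ -n "$compress" ]; then
        topic="${topic}:${compress}"
    fi
    printf '%s\n' "$topic"
}

neo_topic append EXAMPLE            # prints db/append/EXAMPLE
neo_topic append EXAMPLE json gzip  # prints db/append/EXAMPLE:json:gzip
neo_topic write EXAMPLE csv         # prints db/write/EXAMPLE:csv
```

The result can be passed straight to the -t option of mosquitto_pub, for example -t "$(neo_topic append EXAMPLE csv)".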

Topic MQTT v5

Since v8.0.33

In MQTT v5, user properties are custom key-value pairs that can be attached to each message. They offer greater flexibility than previous versions (MQTT v3.1/v3.1.1) and allow additional metadata to be included with the message.

Note: It is also possible to use MQTT v3.1 topic syntax with MQTT v5 without using custom properties.

When using MQTT v5, the topic syntax can be simply db/write/{table}, and the following properties are supported:

User Property   Default   Values
format          json      csv, json, ndjson
timeformat      ns        Time format: s, ms, us, ns
tz              UTC       Time Zone: UTC, Local and location spec
compress                  gzip
method          insert    insert, append
reply                     Topic to which the server sends the result messages

Additional properties for format=csv

User Property   Default   Values
delimiter       ,
header                    skip, columns

Note: According to the semantics of the append method, header=columns does not work with method=append.

APPEND method

Since MQTT is a connection-oriented protocol, a client program can continuously send data while maintaining the same MQTT session. This is the real benefit of using MQTT over HTTP for writing data.

The examples below use mosquitto_pub for demonstration only. Because it opens a connection to the MQTT server and closes it after publishing a single message, you will see barely any performance gain over the HTTP write API, and in some cases it may even be slower.
Use this MQTT method only when a client can keep a connection open for a relatively long time and send multiple messages.
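
To illustrate keeping one connection open, mosquitto_pub can read standard input with its -l option and publish each line as a separate message within a single session. The following is a minimal sketch, assuming the EXAMPLE table and server address used elsewhere on this page. Both gen_records and publish_records are hypothetical helpers, and the generated values are made up.

```shell
#!/bin/sh
# gen_records COUNT
# Hypothetical helper: prints COUNT single-record JSON tuples, one per line,
# with timestamps one second apart (in nanoseconds). Values are made up.
gen_records() {
    count=$1
    base=1670380345000000000
    i=0
    while [ "$i" -lt "$count" ]; do
        ts=$((base + i * 1000000000))
        printf '[ "my-car", %s, %s.5 ]\n' "$ts" "$i"
        i=$((i + 1))
    done
}

# publish_records COUNT
# Hypothetical helper: sends every generated line as its own MQTT message
# over one connection (-l splits stdin into one message per line).
publish_records() {
    gen_records "$1" | mosquitto_pub -h 127.0.0.1 -p 5653 \
        -t db/append/EXAMPLE \
        -l
}

# Usage (requires a running server):
# publish_records 1000
```

Each line is a single tuple, which is one of the JSON payload shapes described in the JSON section below.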

JSON

PUBLISH multiple records

The payload format in the example below is an array of tuples (an array of arrays in JSON). It appends multiple records to the table through a single MQTT message. It is also possible to publish a single tuple, as shown below. Machbase Neo accepts both types of payloads via MQTT.

  • mqtt-data.json
[
    [ "my-car", 1670380342000000000, 32.1 ],
    [ "my-car", 1670380343000000000, 65.4 ],
    [ "my-car", 1670380344000000000, 76.5 ]
]
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE \
    -V 5 \
    -D PUBLISH user-property method append \
    -f ./mqtt-data.json

The same request with the MQTT v3.1 topic syntax:
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/append/EXAMPLE \
    -f ./mqtt-data.json

PUBLISH single record

  • mqtt-data.json
[ "my-car", 1670380345000000000, 87.6 ]
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE \
    -V 5 \
    -D PUBLISH user-property method append \
    -f ./mqtt-data.json

The same request with the MQTT v3.1 topic syntax:
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/append/EXAMPLE \
    -f ./mqtt-data.json

PUBLISH gzip JSON

mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE \
    -V 5 \
    -D PUBLISH user-property method append \
    -D PUBLISH user-property compress gzip \
    -f mqtt-data.json.gz

The same request with the MQTT v3.1 topic syntax:
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/append/EXAMPLE:json:gzip \
    -f mqtt-data.json.gz
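
The commands above assume that mqtt-data.json.gz already exists. One way to produce it is sketched here with the standard gzip tool; the sample rows are the ones from mqtt-data.json shown earlier.

```shell
#!/bin/sh
# Recreate the sample payload so the example is self-contained.
cat > mqtt-data.json <<'EOF'
[
    [ "my-car", 1670380342000000000, 32.1 ],
    [ "my-car", 1670380343000000000, 65.4 ],
    [ "my-car", 1670380344000000000, 76.5 ]
]
EOF

# -c writes the compressed stream to stdout and leaves the original file in place.
gzip -c mqtt-data.json > mqtt-data.json.gz

# Sanity check: decompressing must give back the original bytes.
gzip -dc mqtt-data.json.gz | cmp -s - mqtt-data.json && echo "payload ok"
```

The same two gzip commands apply to a CSV payload such as mqtt-data.csv.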

NDJSON

Since v8.0.33

NDJSON (Newline Delimited JSON) is a format for streaming JSON data where each line is a valid JSON object. This is useful for processing large datasets or streaming data. Each line should be a complete JSON object where all field names match the columns of the table.

  • mqtt-nd.json
{"NAME":"ndjson-data", "TIME":1670380342000000000, "VALUE":1.001}
{"NAME":"ndjson-data", "TIME":1670380343000000000, "VALUE":2.002}
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE \
    -V 5 \
    -D PUBLISH user-property method append \
    -D PUBLISH user-property format ndjson \
    -f mqtt-nd.json

The same request with the MQTT v3.1 topic syntax:
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/append/EXAMPLE:ndjson \
    -f mqtt-nd.json

CSV

  • mqtt-data.csv
NAME,TIME,VALUE
my-car,1670380346,87.7
my-car,1670380347,98.6
my-car,1670380348,99.9
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE \
    -V 5 \
    -D PUBLISH user-property format csv \
    -D PUBLISH user-property method append \
    -D PUBLISH user-property header skip \
    -D PUBLISH user-property timeformat s \
    -f mqtt-data.csv

The header skip option indicates that the first line is a header and must be skipped.

In MQTT v3.1, there is no mechanism to indicate whether the first line is a header or data. Therefore, the payload must not include a header, and all fields should match the column order in the table.

  • mqtt-data.csv
my-car,1670380346000000000,87.7
my-car,1670380347000000000,98.6
my-car,1670380348000000000,99.9
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/append/EXAMPLE:csv \
    -f mqtt-data.csv

PUBLISH gzip CSV

NAME,TIME,VALUE
my-car,1670380346,87.7
my-car,1670380347,98.6
my-car,1670380348,99.9
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE \
    -V 5 \
    -D PUBLISH user-property format csv \
    -D PUBLISH user-property method append \
    -D PUBLISH user-property header skip \
    -D PUBLISH user-property timeformat s \
    -D PUBLISH user-property compress gzip \
    -f mqtt-data.csv.gz

Topic = Table + :csv:gzip

my-car,1670380346,87.7
my-car,1670380347,98.6
my-car,1670380348,99.9
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/append/EXAMPLE:csv:gzip \
    -f mqtt-data.csv.gz

INSERT method

Using the append method is strongly recommended for better performance over MQTT. Use the insert method only when the order of the data fields differs from the column order in the table, or when the data has a different number of fields than the table has columns.

JSON

Since db/write works as an INSERT INTO table(...) VALUES(...) SQL statement, the JSON payload must include the column names. An example payload, mqtt-data.json, is shown below.

  • mqtt-data.json
{
  "data": {
    "columns": ["name", "time", "value"],
    "rows": [
      [ "wave.pi", 1687481466000000000, 1.2345],
      [ "wave.pi", 1687481467000000000, 3.1415]
    ]
  }
}

The method option defaults to insert, so it can be omitted.

mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE \
    -V 5 \
    -D PUBLISH user-property method insert \
    -f mqtt-data.json

Topic db/write/{table} is for INSERT.

mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE \
    -f mqtt-data.json

NDJSON

Since v8.0.33

This request is equivalent to an INSERT SQL statement of the form INSERT INTO {table} (columns...) VALUES (values...).

  • mqtt-nd.json
{"NAME":"ndjson-data", "TIME":1670380342, "VALUE":1.001}
{"NAME":"ndjson-data", "TIME":1670380343, "VALUE":2.002}

The method option defaults to insert, so it can be omitted.

mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE \
    -V 5 \
    -D PUBLISH user-property method insert \
    -D PUBLISH user-property format ndjson \
    -D PUBLISH user-property timeformat s \
    -f mqtt-nd.json

With the MQTT v3.1 topic syntax, timestamps use the default nanosecond format:
  • mqtt-nd.json
{"NAME":"ndjson-data", "TIME":1670380342000000000, "VALUE":1.001}
{"NAME":"ndjson-data", "TIME":1670380343000000000, "VALUE":2.002}

Topic db/write/{table}:ndjson is for INSERT.

mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE:ndjson \
    -f mqtt-nd.json

CSV

The insert method with CSV data whose number or order of fields differs from the table columns is supported only in MQTT v5 using custom properties.

VALUE,NAME,TIME
87.7,my-car,1670380346000
98.6,my-car,1670380347000
99.9,my-car,1670380348000
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE \
    -V 5 \
    -D PUBLISH user-property format csv \
    -D PUBLISH user-property method insert \
    -D PUBLISH user-property header columns \
    -D PUBLISH user-property timeformat ms \
    -f mqtt-data.csv

With the MQTT v3.1 topic syntax, the payload has no header and follows the column order of the table:
my-car,1670380346000000000,87.7
my-car,1670380347000000000,98.6
my-car,1670380348000000000,99.9
mosquitto_pub -h 127.0.0.1 -p 5653 \
    -t db/write/EXAMPLE:csv \
    -f mqtt-data.csv

TQL

Topic db/tql/{file.tql} is for invoking a TQL file.

When data must be transformed before it is written to the database, prepare a suitable TQL script and publish the data to the topic named db/tql/{file.tql}.

Please refer to "As Writing API" for details on writing data via MQTT and TQL.

Max message size

The MQTT specification allows a PUBLISH payload of up to 256MB. If a malicious or malfunctioning client continuously sends large messages, it can consume all of the server's network bandwidth and computing resources and may take the server out of service. It is good practice to set the maximum message size only slightly above what client applications require. The default MQTT maximum message size is 1MB (1048576); it can be adjusted with a command line flag, as shown below, or with MaxMessageSizeLimit in the configuration file.

machbase-neo serve --mqtt-max-message 1048576
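
A client can also guard against oversized payloads before publishing. The following is a sketch only: check_payload_size is a hypothetical helper, the 1048576 default is the value quoted above, and treating a payload of exactly the limit as acceptable is an assumption. Adjust the limit to match the server setting.

```shell
#!/bin/sh
# check_payload_size FILE [LIMIT]
# Hypothetical helper: succeeds when FILE is no larger than LIMIT bytes
# (default 1048576). Assumption: a payload of exactly LIMIT bytes is allowed.
check_payload_size() {
    file=$1
    limit=${2:-1048576}
    [ -f "$file" ] || return 2
    # tr strips the padding some wc implementations add.
    size=$(wc -c < "$file" | tr -d ' ')
    if [ "$size" -gt "$limit" ]; then
        echo "payload $file is $size bytes, over the $limit byte limit" >&2
        return 1
    fi
    return 0
}

# Example with a throwaway 10-byte file and small limits.
printf '0123456789' > payload.tmp
check_payload_size payload.tmp 10 && echo "within limit"
check_payload_size payload.tmp 9 2>/dev/null || echo "too large"
rm -f payload.tmp
```

In a publishing script the check can gate the call, for example check_payload_size mqtt-data.json && mosquitto_pub ... -f mqtt-data.json.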