Skip to content

mqtt

Since v8.0.74

mqtt는 JSH의 MQTT 클라이언트 모듈입니다.

JSH 애플리케이션에서는 일반적으로 아래처럼 사용합니다.

const mqtt = require('mqtt');

현재 API는 이벤트 기반이며, Client를 생성하면 자동으로 브로커 연결을 시도합니다.

Client

MQTT 클라이언트 객체입니다.

생성
new Client(options)
옵션
옵션타입기본값설명
serversString[]MQTT 브로커 URL 목록 (예: tcp://127.0.0.1:1883)
usernameString브로커 인증 사용자 이름
passwordString브로커 인증 비밀번호
keepAliveNumber30Keep Alive(초)
connectRetryDelayNumber0재접속 지연(밀리초)
cleanStartOnInitialConnectionBooleanfalse최초 연결 시 MQTT v5 clean start 여부
connectTimeoutNumber0연결 타임아웃(밀리초)
사용 예시
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
const mqtt = require('mqtt');

const client = new mqtt.Client({
    servers: ['tcp://127.0.0.1:1883'],
    username: 'user',
    password: 'pass',
    keepAlive: 60,
    connectRetryDelay: 2000,
    connectTimeout: 10 * 1000,
    cleanStartOnInitialConnection: true,
});

client.on('open', () => {
    console.println('Connected');
    client.subscribe('test/topic', {
        qos: 0,
        properties: {
            subscriptionIdentifier: 7,
        },
    });
});

client.on('subscribed', (topic, reason) => {
    console.println('Subscribed:', topic, 'reason:', reason);
    client.publish('test/topic', 'Hello, MQTT!');
});

client.on('message', (msg) => {
    console.println('Message:', msg.topic, msg.payloadText);
    client.unsubscribe(msg.topic, {
        properties: {
            user: {
                source: 'example',
            },
        },
    });
});

client.on('unsubscribed', (topic, reason) => {
    console.println('Unsubscribed:', topic, 'reason:', reason);
    client.close();
});

client.on('error', (err) => {
    console.println('Error:', err.message);
});

client.on('close', () => {
    console.println('Disconnected');
});

메서드

publish()

토픽으로 메시지를 발행합니다.

사용 형식
publish(topic, message[, options])
매개변수
  • topic String
  • message String | Uint8Array | Object | Array
  • options Object (선택)
옵션타입기본값설명
qosNumber0QoS 레벨
retainBooleanfalseRetain 플래그
propertiesObjectMQTT v5 발행 프로퍼티

options.properties 필드:

프로퍼티타입설명
payloadFormatNumberPayload format indicator
messageExpiryNumber만료 시간
contentTypeString콘텐츠 타입
responseTopicString응답 토픽
correlationDataString바이트로 변환됨
topicAliasNumberTopic alias
subscriptionIdentifierNumberSubscription identifier
userObject사용자 정의 프로퍼티 (key: value)
반환값

없음. 결과는 published 또는 error 이벤트로 전달됩니다.

subscribe()

토픽을 구독합니다.

사용 형식
subscribe(topic[, options])
매개변수
  • topic String
  • options Object (선택)
옵션타입기본값설명
qosNumber1QoS 레벨
retainHandlingNumberMQTT v5 retain handling
noLocalBooleanfalse동일 클라이언트가 발행한 메시지 수신 여부
retainAsPublishedBooleanfalse브로커의 retain 플래그 유지 여부
propertiesObjectMQTT v5 구독 프로퍼티

options.properties 필드:

프로퍼티타입설명
subscriptionIdentifierNumberSubscription identifier
userObject사용자 정의 프로퍼티 (key: value)
반환값

없음. 결과는 subscribed 또는 error 이벤트로 전달됩니다.

사용 예시
client.subscribe('test/topic', {
    qos: 0,
    properties: {
        subscriptionIdentifier: 7,
        user: {
            source: 'example',
        },
    },
});

unsubscribe()

토픽 구독을 해제합니다.

사용 형식
unsubscribe(topic[, options])
매개변수
  • topic String
  • options Object (선택)
옵션타입설명
propertiesObjectMQTT v5 구독 해제 프로퍼티

options.properties 필드:

프로퍼티타입설명
userObject사용자 정의 프로퍼티 (key: value)
반환값

없음. 결과는 unsubscribed 또는 error 이벤트로 전달됩니다.

사용 예시
client.unsubscribe('test/topic', {
    properties: {
        user: {
            source: 'example',
        },
    },
});

close()

클라이언트 연결을 종료합니다.

사용 형식
close()
반환값

없음. close 이벤트가 발생합니다.

이벤트

open

클라이언트 연결이 완료되면 발생합니다.

client.on('open', () => { ... })

message

구독한 메시지를 수신하면 발생합니다.

client.on('message', (msg) => { ... })

msg 필드:

프로퍼티타입설명
topicString토픽 이름
payloadBuffer바이너리 안전한 메시지 페이로드
payloadTextStringUTF-8로 디코딩한 텍스트 편의 필드
propertiesObjectMQTT v5 publish 프로퍼티

msg.properties 필드:

프로퍼티타입설명
payloadFormatNumberPayload format indicator
messageExpiryNumber만료 시간
contentTypeString콘텐츠 타입
responseTopicString응답 토픽
correlationDataBuffer바이너리 안전한 correlation data
topicAliasNumberTopic alias
subscriptionIdentifierNumberSubscription identifier
userObject사용자 정의 프로퍼티

텍스트 메시지는 msg.payloadText 또는 msg.payload.toString()으로 읽을 수 있습니다.

바이너리 메시지는 msg.payload를 그대로 사용합니다.

client.on('message', (msg) => {
    console.println('Payload is buffer:', Buffer.isBuffer(msg.payload));
    console.println('Payload bytes:', Array.from(msg.payload).join(','));
});

MQTT v5 publish 프로퍼티는 msg.properties로 접근할 수 있습니다.

client.on('message', (msg) => {
    console.println('Content type:', msg.properties.contentType);
    console.println('Response topic:', msg.properties.responseTopic);
    console.println('Correlation data:', msg.properties.correlationData.toString());
    console.println('User source:', msg.properties.user.source);
});

subscribed

구독 ACK를 받으면 발생합니다.

client.on('subscribed', (topic, reason) => { ... })
  • topic String
  • reason Number (MQTT reason code)

published

발행 ACK를 받으면 발생합니다.

client.on('published', (topic, reason) => { ... })
  • topic String
  • reason Number (MQTT reason code)

unsubscribed

구독 해제 ACK를 받으면 발생합니다.

client.on('unsubscribed', (topic, reason) => { ... })
  • topic String
  • reason Number (MQTT reason code)

error

연결, 구독, 구독 해제, 발행 중 오류가 발생하면 전달됩니다.

client.on('error', (err) => { ... })
  • err Error

연결 전이거나 close() 호출 후 publish(), subscribe(), unsubscribe()를 호출하면 error 이벤트로 오류가 전달됩니다.

close

close() 호출 시 발생합니다.

client.on('close', () => { ... })

MQTT v5 쓰기 프로퍼티 예시

아래 예시는 MQTT v5 write API의 사용자 프로퍼티를 사용해 db/write/{table} 토픽으로 데이터를 기록합니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
const mqtt = require('mqtt');

const client = new mqtt.Client({
    servers: ['tcp://127.0.0.1:5653'],
});

const rows = [
    ['my-car', Date.now(), 32.1],
    ['my-car', Date.now() + 1000, 65.4],
];

client.on('open', () => {
    client.publish('db/write/EXAMPLE', rows, {
        qos: 1,
        properties: {
            user: {
                method: 'append',
                timeformat: 'ms',
            },
        },
    });
});

client.on('published', () => client.close());
client.on('error', (err) => console.println(err.message));
최근 업데이트