MQTT : MQTT subscribe and publish protocol client

更新时间:
2024-05-13

MQTT : MQTT subscribe and publish protocol client

This module is an MQTT client that can be used to interact with the Iot cloud service for data interaction. This module supports both tcp and tls securely connections.

MQTT is a Machine-to-Machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. For example, it has been used in sensors communicating to a broker via satellite link, over occasional dial-up connections with healthcare providers, and in a range of home automation and small device scenarios. It is also ideal for mobile applications because of its small size, low power usage, minimised data packets, and efficient distribution of information to one or many receivers.

MQTT does not restrict the subject matter of the transport and the type of the message content, but for compatibility reasons, JSRE recommends using the JSON data format for publishing and subscribing.

User can use the following code to import the mqtt module.

var mqtt = require('mqtt');

Support

The following shows mqtt module APIs available for each permissions.

 User ModePrivilege Mode
mqtt.open
mqtt.mode
client.close
client.connect
client.isConnected
client.isQueued
client.publish
client.subscribe
client.unsubscribe

Mqtt Object

mqtt.open(saddr[, tlsOpt[, timeout]])

  • saddr {Object} Server socket address.
  • tlsOpt {Object} TLS securely connections options. default: undefined, means use TCP connection.
  • timeout {Integer} Synchronous connection time to wait in milliseconds, default: undefined means timeout with default connect timeout setting.
  • Returns: {Object} A MQTT Client object.

saddr Please consult the socket module documentation.
tlsOpt Please consult the tls module documentation.

Create an MQTT client and connect to the specified server. Use synchronous mode.

Example

  • TCP
var serAddr = socket.sockaddr('192.168.0.1', 8010);

var client = mqtt.open(serAddr, undefined, 5000);
if (!client) {
  console.log('Can not connect to broker.');
}
  • TLS
var serAddr = socket.sockaddr('192.168.0.1', 8020);
var tlsOpt = {
  rejectUnauthorized: true,
  ca: fs.readString('./ca.pem'),
  server: 'xxx.xxx.com'
};

var client = mqtt.open(serAddr, tlsOpt, 5000);
if (!client) {
  console.log('Can not connect to broker.');
}

mqtt.open(saddr, tlsOpt, callback[, bufSize])

  • saddr {Object} Server socket address.
  • tlsOpt {Object} TLS securely connections options. undefined means TCP connection.
  • callback {Function} Connected callback function.
    • client {Object} Client object.
    • remote {Object} Remote address.
  • bufSize {Integer} Buffer size (512 ~ 4096). default: 2048.
  • Returns: {Object} A MQTT Client object.

Example

var serIp = dns.lookup('mqtt://mybroker.com');
var serAddr = socket.sockaddr(serIp, 8010);
var client = null;

mqtt.open(serAddr, undefined, (client, remote) => {
  if (remote) {
    // TCP is connected!
  } else {
    // TCP connect error!
  }
});

mqtt.mode()

  • Returns: {String} Operating mode.

Get the current process MQTT working mode:

  • 'off' MQTT is not enabled.
  • 'listener' MQTT listen only mode.
  • 'publisher' MQTT can subscribe and publish.

Client Object

client.close()

Close the connection with server.

client.connect(clientOpt[, callback])

  • clientOpt {Object} MQTT connect parameters.
  • callback {Function} Connected callback.
    • client {Object} Client object.

clientOpt MQTT connect parameters can includes following members:

  • client {Buffer} | {String} The broker identifies each client by its client id.
  • user {String} Optional. User name when connecting to a broker.
  • passwd {String} Optional. User password authentication when connecting to a broker.
  • keepalive {Integer} Keepalive time in seconds. If no data is sent on the connection in the given time window the broker disconnects the client.
  • will {Boolean} Optional. If this flag is set to true, a message and a topic must follow with a QoS value between 0 and 2.
  • qos {Integer} If will is set to true, the message will be sent with the given QoS.
  • topic {String} Only processed when will is set to true. The topic of the message should be sent to.
  • message {String} | {Buffer} Only processed when will is set to true. The message to be sent to the broker when connection broken.

Connect to the MQTT service with the specified parameters.

Example

/* MQTT Client Options */
var options = { client: 'mybroker', user: 'admin', passwd: 'password', will: false, topic: '', message: '', keepalive: 60, qos: 0 };

client.connect(options, (client) => {
  // MQTT Connected!
  client.publish(...);
});

client.isConnected()

  • Returns: {Boolean} Current MQTT connection status. true is connected, otherwise false.

Get current connection status.

client.isQueued()

  • Returns: {Boolean} Whether the send queue contains messages that are not acknowledged by the server.

Get whether the send queue contains messages that are not acknowledged by the server.

client.publish(topic, message[, options[, callback]])

  • topic {String} Message topic.
  • message {Buffer} | {String} Message content.
  • options {Object} Publish options. default: qos is 0, retain is false.
  • callback {Function} Published callback function.
    • error {Error} Identifies the sending error information, if it is undefined, it means success.

options MQTT publish parameters can includes following members:

  • qos {Integer} Optional. default: 0.
  • retain {Boolean} Optional. If retain is true the broker stores the message for clients subscribing with retain true flag, therefore they can receive it later. default: false.

Publish a message with the specified options.

Example

client.publish('topic1', 'message1');
client.publish('topic2', 'message2', { qos: 1 });
client.publish('topic3', 'message3', { qos: 1 }, (error) => {
  if (error) {
    console.log('Publish error:', error);
  }
});

client.subscribe(topic[, options[, callback]])

  • topic {String} Subscribe message topic.
  • options {Object} Publish options. default: qos is 0, retain is false.
  • callback {Function} Subscribe callback function.
    • error {Error} Identifies the subscribe error information, if it is undefined, it means success.

options MQTT subscribe parameters can includes following members:

  • qos {Integer} Optional. default: 0.
  • retain {Boolean} Optional. If retain is true the client receives the messages that were sent to the desired topic before it connected. Defaults to false. default: false.

The client subscribes to a given topic. If there are messages available on the topic the client emits a data event with the message received from the broker.

Example

client.subscribe('topic1', { qos: 1 });

client.unsubscribe(topic[, callback])

  • topic {String} Unsubscribe message topic.
  • callback {Function} Unsubscribe callback function.
    • error {Error} Identifies the subscribe error information, if it is undefined, it means success.

Unsubscribes the client from a given topic. If QoS was turned on on the subscription the remaining packets will be sent by the server.

Example

client.unsubscribe('topic1');

Events

The client object inherits from the EventEmitter class. The following events are thrown in some specific situations.

connect

Emitted when the client successfully connects to a MQTT broker.

disconnect

A disconnect event is emitted when network link is broken, and mqtt object will be call mqtt.close() automatically.

close

A close event is emitted when the broker disconnects the client gracefully.

error

If an error occured and no callback function called, an error event is emitted with the error information. If this event has no listeners, a Task.uncaughtException() exception is thrown.

message

When data is received from the server a message event is emitted with a data object. It has the following properties:

  • topic {String} The topic the message was sent from.
  • message {Buffer} The message the broker sent.
  • qos {Integer} The QoS level the message was sent with.
  • packetId {Integer} The id of the packet if QoS was enabled.

Example

Here is a typical example of MQTT Client.

var iosched = require('iosched');
var socket = require('socket');
var mqtt = require('mqtt');

// Server
var server = socket.sockaddr('192.168.0.2', 61613);

// Seq-number
var seqNumer = 0;

// MQTT Client
var client = null;

// MQTT Client Options.
var options = { client: 'mybroker', user: 'admin', passwd: 'password', keepalive: 60 };

// MQTT open
function clientOpen() {
  client = mqtt.open(server, null, 5000);
  if (client === undefined) {
    console.log(`Can not open ${server.addr} MQTT server!`);
    return false;

  } else {
    console.log(`${server.addr} MQTT server opened!`);
    client.on('message', function(data) {
      console.log(`recevied a message from mqtt, topic: ${data.topic}, message: ${data.message}`);
    });
    client.on('disconnect', function() {
      console.error('Lost broker!');
      setTimeout(clientConnectTimeout, 2000);
    });
    return true;
  }
}

// MQTT Publish timer
function clientPublishTimeout() {
  if (client) {
    try {
      if (client.isConnected()) {
        client.publish('/testing', 'test' + seqNumer);
        seqNumer++;
      }
    } catch (error) {
      console.log('Pulish error:', error);
    }
  }
}

// MQTT Connect timer.
function clientConnectTimeout() {
  if (clientOpen()) {
    console.log(`${server.addr} MQTT server connecting!`);
    client.connect(options, () => {
      console.log(`${server.addr} MQTT server connected!`);
      client.subscribe('/testing', { qos: 1 }, () => {
        console.log(`${server.addr} MQTT server subscribed!`);
      });
    });
  } else {
    setTimeout(clientConnectTimeout, 2000);
  }
}

// Connect
setImmediate(clientConnectTimeout);

// Publish test every 5 seconds.
setInterval(clientPublishTimeout, 5000);

// Asynchronous event loop
iosched.forever();
文档内容是否对您有所帮助?
有帮助
没帮助