Overview of the MQTT Collector

The MQTT collector collects data published to a topic using an MQTT broker.
Note: The collector has been tested with the Mosquitto 2.0.15 and HiveMQ 4.2.1 MQTT brokers. You can, however, use other MQTT brokers as well.
Supported MQTT versions:
  • MQTT V5
  • MQTT V3.1.1
Supported data formats:
  • Sparkplug B V1.0
    Note: From Sparkplug B payloads, the MQTT collector processes only NDATA and DDATA messages; all other message types are discarded.
  • KairosDB (that is, the Predix Timeseries format)
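
As an illustration of the NDATA/DDATA restriction, the following minimal sketch classifies a message by its topic. It relies on the Sparkplug B topic layout spBv1.0/<group_id>/<message_type>/<edge_node_id>[/<device_id>]. The function names are hypothetical, and filtering by topic is an assumption: this document does not state how the collector itself identifies message types.

```python
from typing import Optional

# Message types that the collector is documented to process.
ACCEPTED_TYPES = {"NDATA", "DDATA"}


def sparkplug_message_type(topic: str) -> Optional[str]:
    """Return the message type level of a Sparkplug B topic.

    Returns None when the topic does not follow the
    spBv1.0/<group_id>/<message_type>/<edge_node_id>[/<device_id>] layout.
    """
    levels = topic.split("/")
    if len(levels) < 4 or levels[0] != "spBv1.0":
        return None
    return levels[2]


def is_processed(topic: str) -> bool:
    """True for NDATA/DDATA topics; every other message would be discarded."""
    return sparkplug_message_type(topic) in ACCEPTED_TYPES


# Hypothetical topics for illustration only.
print(is_processed("spBv1.0/Plant1/DDATA/Edge1/Pump7"))  # True
print(is_processed("spBv1.0/Plant1/NBIRTH/Edge1"))       # False
```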

Topology: The MQTT collector supports a distributed model, that is, the MQTT broker, the collector, and the Historian server can be installed on the same machine or on different machines.

Features:
  • You can subscribe to multiple-level topics using a wildcard.
  • Only unsolicited data collection is supported; polled collection is not supported.
  • Supported timestamp resolutions are seconds, milliseconds, and microseconds.
  • Array, boolean, floating point, integer, and string data types are supported.
Note: Although the Recalculate button in Historian Administrator is enabled, the functionality is not available because the MQTT collector is a non-historic collector (the source broker does not store the historical data).
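
To make the wildcard feature concrete, the sketch below shows how MQTT topic filters match topics: "+" matches exactly one level and "#" matches the parent level and any number of levels below it. This matching is normally performed by the broker; the function here is a hypothetical helper for reasoning about which topics a filter would cover, not part of the collector. It does not handle the special rules for topics that begin with "$".

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Check whether an MQTT topic filter matches a topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # The multi-level wildcard is only valid as the last level.
            # It matches the parent level and everything beneath it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" matches any one level.
            return False

    # Without "#", the filter and the topic must have the same depth.
    return len(filter_levels) == len(topic_levels)


# Hypothetical topic names for illustration only.
print(topic_matches("plant/#", "plant/line1/pump7/temp"))      # True
print(topic_matches("plant/+/temp", "plant/line1/temp"))       # True
print(topic_matches("plant/+/temp", "plant/line1/pump7/temp")) # False
```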
How it works:
  1. The MQTT collector connects to an MQTT broker and subscribes to a topic. You can use username/password-based authentication or certificate-based authentication. Transport Layer Security (TLS) is used when subscribing to data from the message broker to prevent man-in-the-middle attacks, so that the data is transferred securely from the message broker to the MQTT collector.
  2. The collector converts the data from the Sparkplug B v1.0 or the KairosDB format to a Historian-understandable format.
  3. The collector verifies whether the tag exists in Historian; if it does not, the collector adds the tag. It then adds the data samples and streams the data to the Historian server or a cloud destination.
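
The tag check in the last step can be sketched as follows. InMemoryHistorian is a hypothetical in-memory stand-in used only to show the order of operations (check for the tag, add it if missing, then add the samples); it is not the Historian API, and the tag name and sample values are made up.

```python
from typing import Dict, List, Tuple

# One sample: (timestamp, value, quality).
Sample = Tuple[int, float, int]


class InMemoryHistorian:
    """Hypothetical stand-in for the Historian server (not a real API)."""

    def __init__(self) -> None:
        self.tags: Dict[str, List[Sample]] = {}

    def has_tag(self, name: str) -> bool:
        return name in self.tags

    def add_tag(self, name: str) -> None:
        self.tags[name] = []

    def add_samples(self, name: str, samples: List[Sample]) -> None:
        self.tags[name].extend(samples)


def store_samples(historian: InMemoryHistorian, tag: str, samples: List[Sample]) -> None:
    """Add the tag if it does not exist yet, then add its data samples."""
    if not historian.has_tag(tag):
        historian.add_tag(tag)
    historian.add_samples(tag, samples)


historian = InMemoryHistorian()
store_samples(historian, "Pump7.Temp", [(1558110998983, 21.5, 3)])  # creates the tag
store_samples(historian, "Pump7.Temp", [(1558110999983, 21.7, 3)])  # reuses the tag
print(len(historian.tags["Pump7.Temp"]))  # 2
```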
KairosDB Message Format:
{
    "body": [
        {
            "attributes": {"machine_type": "<value>"},
            "datapoints": [[<value>, <value>, <value>]],
            "name": "<value>"
        }
    ],
    "messageId": "<value>"
}
The following table describes these parameters.
JSON Parameter | Description                                                   | Required/Optional
---------------|---------------------------------------------------------------|------------------
machine_type   | The name of the machine from which you want to collect data. | Optional
datapoints     | Time (in epoch format), value, and quality.                   | Required
name           | The tag name.                                                 | Required
messageId      | The type of the message.                                      | Optional
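
As a rough illustration of how a message in this format breaks down into samples, the sketch below parses a payload with Python's standard json module and yields one record per datapoint. The Sample type and parse_message function are hypothetical names, and the payload values are illustrative; this is not the collector's own parsing code.

```python
import json
from typing import Any, List, NamedTuple


class Sample(NamedTuple):
    tag: str        # from "name"
    timestamp: int  # epoch time, first element of a datapoint
    value: Any      # second element of a datapoint
    quality: int    # third element of a datapoint


def parse_message(payload: str) -> List[Sample]:
    """Flatten a KairosDB-format message into a list of samples."""
    message = json.loads(payload)
    samples = []
    for entry in message["body"]:
        tag = entry["name"]
        # Each datapoint is a [time, value, quality] triple.
        for timestamp, value, quality in entry["datapoints"]:
            samples.append(Sample(tag, timestamp, value, quality))
    return samples


payload = (
    '{"body":[{"attributes":{"machine_type":" "},'
    '"datapoints":[[1558110998983,9547909,3]],"name":"QuadInteger"}],'
    '"messageId":" "}'
)
for sample in parse_message(payload):
    print(sample.tag, sample.timestamp, sample.value, sample.quality)
```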
Note: For the parameters marked optional, you need not enter values. However, you must enter the parameter names. For example:
{"body":[{"attributes":{"machine_type":" "},
"datapoints":[[1558110998983,9547909,3]],"name":"QuadInteger"}],"messageId":" "}
Supported Data Types
Source Data Type                                                 | Historian Data Type
-----------------------------------------------------------------|--------------------
Array                                                            | ihArray
DoubleFloat, DoubleInteger, FixedByte, QuadInteger, SingleFloat  | ihDoubleFloat
ByteString, String                                               | ihVariableString
Boolean                                                          | ihBool
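
The mapping above can be expressed as a simple lookup table. This is a minimal sketch for reference; the dictionary and function names are hypothetical, and the error handling for unlisted types is an assumption rather than documented collector behavior.

```python
# Source data type -> Historian data type, as listed in the table above.
HISTORIAN_TYPES = {
    "Array": "ihArray",
    "DoubleFloat": "ihDoubleFloat",
    "DoubleInteger": "ihDoubleFloat",
    "FixedByte": "ihDoubleFloat",
    "QuadInteger": "ihDoubleFloat",
    "SingleFloat": "ihDoubleFloat",
    "ByteString": "ihVariableString",
    "String": "ihVariableString",
    "Boolean": "ihBool",
}


def historian_type(source_type: str) -> str:
    """Look up the Historian data type for a source data type."""
    try:
        return HISTORIAN_TYPES[source_type]
    except KeyError:
        # Assumption: types missing from the table are treated as unsupported.
        raise ValueError(f"Unsupported source data type: {source_type}") from None


print(historian_type("QuadInteger"))  # ihDoubleFloat
```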