Operator MQTTSink

The MQTTSink operator creates a message for every tuple it receives on its input port and publishes the message to an MQTT server. The data to publish to the MQTT server must be of type blob or rstring.

To select the MQTT topic to publish to, either specify a fixed topic with the topic parameter, or use the topicAttributeName parameter to name an input stream attribute whose value selects the topic at run time. The input stream can optionally carry the quality of service for each message. This information is not considered to be part of the message data.

Behavior in a consistent region

An MQTTSink operator can be in a consistent region, but it cannot be the start of a consistent region. Having a control port in a consistent region is not supported: the control information might not be replayed, persisted, and restored correctly, so you might need to manually replay the control signals to bring the operator back to a consistent state.

When the MQTTSink operator is in a consistent region, messages with qos=1 or qos=2 will be delivered to an MQTT provider at least once. Messages with qos=0 can still be lost as a result of application failures.
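
The following is a minimal sketch of an MQTTSink operator placed inside a consistent region that is started by an upstream Beacon. The composite name, stream name, topic, and trigger period are illustrative assumptions, and the sketch assumes that qos accepts an int32 literal. A JobControlPlane operator is included because consistent regions require one in the application.

```
use com.teracloud.streams.mqtt::MQTTSink;
use spl.control::JobControlPlane;

composite ConsistentPublish {
  graph
    // Required in any application that uses a consistent region
    () as JCP = JobControlPlane() {}

    // The Beacon starts the region; MQTTSink only participates in it
    @consistent(trigger = periodic, period = 5.0)
    stream<rstring data> Readings = Beacon() {
      param
        period : 1.0;
      output
        Readings : data = "reading " + (rstring) IterationCount();
    }

    // qos 1 so that messages are delivered at least once
    () as Sink = MQTTSink(Readings) {
      param
        serverURI : getSubmissionTimeValue("serverUri");
        topic : "readings"; // illustrative topic name
        qos : 1;
    }
}
```

Because delivery is at least once, subscribers in such a setup should be prepared to see the same message more than once after a region reset.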

Examples


use com.teracloud.streams.mqtt::MQTTSink;
composite Main {
  type StudentSchema = int32 id, rstring fname, rstring lname, uint8 grade, float32 score, float64 total;
  graph
    stream<StudentSchema> Msgs = Beacon() {}
    stream<blob data> FormattedMsgs = Format(Msgs) {
      param
        format : csv;
      output
        FormattedMsgs : data = Output();
    }

    // Static topic
    () as StaticSink = MQTTSink(FormattedMsgs) {
      param
        serverURI : getSubmissionTimeValue("serverUri");
        topic : "Students";
    }

    stream<StudentSchema, tuple<rstring topic>> MsgsWithTopic = Beacon() {}
    stream<rstring topic, blob data> FormattedMsgsWithTopic = Format(MsgsWithTopic) {
      param
        suppress : topic;
        format : csv;
      output
        FormattedMsgsWithTopic: data = Output();
    }

    // Dynamic topic
    () as DynamicSink = MQTTSink(FormattedMsgsWithTopic) {
      param
        serverURI : getSubmissionTimeValue("serverUri");
        topicAttributeName : "topic";
    }
}

Summary

Ports
This operator has 1 or more input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 24 parameters.

Optional: appConfigName, clientID, commandTimeout, connection, connectionDocument, dataAttributeName, keepAliveInterval, keyStore, keyStorePassword, password, passwordPropName, period, qos, qosAttributeName, reconnectionBound, retain, serverURI, sslProtocol, topic, topicAttributeName, trustStore, trustStorePassword, userID, userPropName

Metrics
This operator reports 2 metrics.

Properties

Implementation
Java

Input Ports

Ports (0)

Input port 0 is a data port and is mandatory.

Properties
Ports (1...)

Input port 1 is an optional control port that can be used to update the configuration of the operator at run time.

Properties

Output Ports

Assignments
Java operators do not support output assignments.
Ports (0)

This port is an error port where a single tuple is sent for each failed message. The tuple contains a single attribute of type rstring, which contains the details of the error message.
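
As an illustration, the error port can be connected to a Custom operator that traces each failure. This is a sketch only: the attribute name errorMessage, the stream names, and the topic are assumptions chosen for the example; the documentation above only states that the tuple has a single rstring attribute.

```
use com.teracloud.streams.mqtt::MQTTSink;

composite PublishWithErrorLog {
  graph
    stream<rstring data> Msgs = Beacon() {
      param
        period : 1.0;
      output
        Msgs : data = "hello";
    }

    // One tuple arrives on PublishErrors for each failed message.
    // "errorMessage" is a hypothetical attribute name.
    stream<rstring errorMessage> PublishErrors = MQTTSink(Msgs) {
      param
        serverURI : getSubmissionTimeValue("serverUri");
        topic : "greetings";
    }

    // Write the error details to the application trace
    () as ErrorLogger = Custom(PublishErrors) {
      logic
        onTuple PublishErrors : {
          appTrc(Trace.error, "MQTT publish failed: " + errorMessage);
        }
    }
}
```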

Properties

Parameters

appConfigName

Specifies the name of the application configuration that stores the client credential information. Credentials specified through the application configuration override the ones specified with the userID and password parameters.
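
A sketch of reading credentials from an application configuration follows. The configuration name mqttCreds and the property names user and password are hypothetical; they stand for whatever names were used when the application configuration was created. The userPropName and passwordPropName parameters are both set, because omitting either one when appConfigName is specified is documented as a compile-time error.

```
use com.teracloud.streams.mqtt::MQTTSink;

composite PublishWithAppConfig {
  graph
    stream<rstring data> Msgs = Beacon() {
      param
        period : 1.0;
      output
        Msgs : data = "hello";
    }

    () as Sink = MQTTSink(Msgs) {
      param
        serverURI : getSubmissionTimeValue("serverUri");
        topic : "greetings";
        appConfigName : "mqttCreds";   // hypothetical configuration name
        userPropName : "user";         // hypothetical property holding the user name
        passwordPropName : "password"; // hypothetical property holding the password
    }
}
```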

Properties
clientID

All clients connected to the same server must have a unique ID. This optional parameter specifies the client ID to use when connecting to an MQTT provider. If this parameter is not specified, the operator generates an ID.

Properties
commandTimeout

Specifies the maximum time, in milliseconds, to wait for an MQTT action to complete. An MQTT action can be connecting to a server or publishing a message. A value of 0 causes the operator to wait indefinitely for an action to complete. A negative value causes a runtime error. If unspecified, the default value is 0.

Properties
connection

Name of the connection specification in the connection document to use.

Properties
connectionDocument

Path to the connection document. If unspecified, the default is "etc/connections.xml". A relative path is resolved against the application directory.
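
The connection and connectionDocument parameters are typically used together. The sketch below assumes that a connection specification named mqttBroker (a hypothetical name) exists in the document and that it supplies the server address, which is why serverURI is not set here.

```
use com.teracloud.streams.mqtt::MQTTSink;

composite PublishWithConnectionDocument {
  graph
    stream<rstring data> Msgs = Beacon() {
      param
        period : 1.0;
      output
        Msgs : data = "hello";
    }

    () as Sink = MQTTSink(Msgs) {
      param
        // Relative to the application directory; same as the default
        connectionDocument : "etc/connections.xml";
        // Hypothetical connection specification name in that document
        connection : "mqttBroker";
        topic : "greetings";
    }
}
```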

Properties
dataAttributeName

Specifies the name of the attribute that holds the actual content of the message. If this parameter is not specified and the stream's schema contains multiple attributes, the operator looks for an attribute named data. If the schema contains only a single attribute, the operator assumes that this attribute is the data attribute.
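
For example, when the message content is carried in an attribute that is not named data, it can be identified explicitly. In this sketch the attribute names payload and dest, the stream names, and the topic value are illustrative assumptions.

```
use com.teracloud.streams.mqtt::MQTTSink;

composite PublishCustomDataAttribute {
  graph
    // Two attributes, neither of which is named "data"
    stream<rstring payload, rstring dest> Msgs = Beacon() {
      param
        period : 1.0;
      output
        Msgs : payload = "21.5", dest = "sensors/room1/temperature";
    }

    () as Sink = MQTTSink(Msgs) {
      param
        serverURI : getSubmissionTimeValue("serverUri");
        dataAttributeName : "payload"; // message content
        topicAttributeName : "dest";   // per-tuple topic
    }
}
```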

Properties
keepAliveInterval

Sets the maximum time interval, in seconds, between messages sent or received. It enables the client to detect whether the server is no longer available. The default is 60 seconds. A value of 0 disables it. A negative value causes a runtime error.

Properties
keyStore

Specifies the file that contains the public and private key certificates of the MQTT client. If a relative path is specified, the path is relative to the application directory.

Properties
keyStorePassword

Specifies the password to decrypt the encrypted keyStore file.

Properties
password

Sets the password to use for the connection. This parameter must be specified when the userID parameter is used; otherwise a compile-time error occurs.

Properties
passwordPropName

Specifies the name of the property in the application configuration that holds the password. If the appConfigName parameter is specified and the passwordPropName parameter is not set, a compile-time error occurs.

Properties
period

This parameter specifies the time period in milliseconds the operator waits before it tries to reconnect. It is an optional parameter of type int64. Default value is 60000 ms.

Properties
qos

Specifies the quality of service that the operator provides for each MQTT message it publishes to an MQTT topic. The valid values are 0, 1, and 2. The default value is 0.

Important: The quality of service is provided by the MQTT server to its subscribers. For each message published, the MQTTSink operator passes the value of the qos parameter as a part of the message header to the MQTT server.

If the qos parameter is set to 0, there is no guarantee that the message is received by the MQTT server or is handled by any of the message subscribers. The operator publishes the message at most once. No further attempts are made to publish the message again, and the message is lost in case of failures.

If the qos value is set to 1 or 2, the operator publishes the message and waits until it receives an acknowledgement from the MQTT server before it discards the message. However, if the MQTTSink operator terminates unexpectedly while it is processing the input tuple or creating a message, or if there is a connection failure, the message is lost, and there is no guarantee that the message is received by the MQTT server. If the MQTTSink operator publishes the message successfully, the MQTT server ensures that the quality of service that is defined by the qos parameter is provided to the message subscribers.

Properties
qosAttributeName

Name of the input attribute that contains the quality of service to publish the message with. This parameter is mutually exclusive with the qos parameter.
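
A sketch of selecting the quality of service per tuple follows. It assumes that the attribute is of type int32, which this page does not state; the attribute name level, the stream names, and the topic are also illustrative.

```
use com.teracloud.streams.mqtt::MQTTSink;

composite PublishWithPerTupleQos {
  graph
    // "level" carries the quality of service (assumed to be int32)
    stream<rstring data, int32 level> Alerts = Beacon() {
      param
        period : 1.0;
      output
        Alerts : data = "alert", level = 1;
    }

    () as AlertSink = MQTTSink(Alerts) {
      param
        serverURI : getSubmissionTimeValue("serverUri");
        topic : "alerts";
        qosAttributeName : "level"; // do not combine with the qos parameter
    }
}
```

Because the schema has more than one attribute and dataAttributeName is not set, the operator uses the attribute named data as the message content.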

Properties
reconnectionBound

Specifies the number of successive connection attempts that the operator makes. Specify 0 for no retry, n for n retries, or -1 for infinite retries.
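
The reconnection behavior is shaped by reconnectionBound together with the period parameter. The sketch below retries up to five times, waiting five seconds between attempts. The values are arbitrary, and the sketch assumes that reconnectionBound accepts an int32 literal; period is documented as int64, hence the l suffix on its literal.

```
use com.teracloud.streams.mqtt::MQTTSink;

composite PublishWithReconnect {
  graph
    stream<rstring data> Msgs = Beacon() {
      param
        period : 1.0;
      output
        Msgs : data = "hello";
    }

    () as Sink = MQTTSink(Msgs) {
      param
        serverURI : getSubmissionTimeValue("serverUri");
        topic : "greetings";
        reconnectionBound : 5; // up to 5 retries; -1 would retry indefinitely
        period : 5000l;        // int64 literal: wait 5000 ms between attempts
    }
}
```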

Properties
retain

Indicates if messages should be retained on the MQTT server. Default is false.

Properties
serverURI

Specifies the URI of the MQTT server to connect to. The serverURI has the following format: protocol://host:portnumber, where host is a hostname or an IP address. The supported protocols are SSL and TCP, written as ssl and tcp in the URI. To use SSL authentication, set the protocol to ssl.

Properties
sslProtocol

Specifies the SSL protocol to use for making SSL connections. If this parameter is not specified, the default protocol TLSv1.2 is used.
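
A sketch of an SSL connection that combines serverURI, sslProtocol, and the trust store and key store parameters follows. The host name, port, and file names are placeholders, and the passwords are read from hypothetical submission-time values rather than written into the source.

```
use com.teracloud.streams.mqtt::MQTTSink;

composite PublishOverSsl {
  graph
    stream<rstring data> Msgs = Beacon() {
      param
        period : 1.0;
      output
        Msgs : data = "hello";
    }

    () as SecureSink = MQTTSink(Msgs) {
      param
        // Placeholder host and port; the ssl protocol selects SSL
        serverURI : "ssl://broker.example.com:8883";
        sslProtocol : "TLSv1.2"; // same as the documented default
        // Placeholder file names, relative to the application directory
        trustStore : "etc/truststore.jks";
        trustStorePassword : getSubmissionTimeValue("trustStorePassword");
        keyStore : "etc/keystore.jks";
        keyStorePassword : getSubmissionTimeValue("keyStorePassword");
        topic : "greetings";
    }
}
```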

Properties
topic

This parameter of type rstring specifies the MQTT topic that you want to publish to. You can specify a static string, such as "traffic/freeway/880". This parameter is mutually exclusive with the topicAttributeName parameter.

Properties
topicAttributeName

Name of the input attribute that contains the topic to publish the message to. This parameter is mutually exclusive with the topic parameter.

Properties
trustStore

Specifies the name of the file that contains the public certificate of the trusted MQTT server. If a relative path is specified, the path is relative to the application directory.

Properties
trustStorePassword

Specifies the password to decrypt the encrypted trustStore file.

Properties
userID

Sets the user name to use for the connection. This parameter must be specified when the password parameter is used; otherwise a compile-time error occurs.

Properties
userPropName

Specifies the name of the property in the application configuration that holds the user name. If the appConfigName parameter is specified and the userPropName parameter is not set, a compile-time error occurs.

Properties

Metrics

isConnected - Gauge

Indicates whether the operator is currently connected to the MQTT server. A value of 1 indicates that it is connected, and a value of 0 indicates that it is not connected.

nConnectionLost - Counter

The number of lost connections to the current MQTT server.

Libraries

Operator class library
Library Path: ../../impl/lib/com.teracloud.streams.mqtt.jar, ../../opt/downloaded/*