Operator control input port
Both operators in the MQTT Toolkit support an optional control input port. Tuples received on this port change the configuration of the operator at run time, without restarting or recompiling the application.
Each operator requires its own schema for its control port.
MQTTSink
map<rstring, rstring> mqttConfig
This schema is predefined as the MQTTSinkUpdate.ControlPortType type for developer convenience.
Each map entry contains the name of a configuration option as its key and the new setting for that option as its value. When a tuple is received, the operator reads the options and values and updates the MQTT server connection at run time. If an invalid value is specified, the operator ignores the value and continues to use the earlier connection information.
See the Configuration options section below for a full list of valid options.
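As a minimal sketch of the behavior described above, a Custom operator could emit a single control tuple that points the MQTTSink operator at a different server. The stream name SinkControl_out and the server URI are placeholders chosen for illustration, the tcp:// scheme is an assumption about the non-SSL URI form, and MQTTSinkUpdate.ControlPortType is assumed to be referenced the same way as MQTTSourceUpdate.ControlPortType in the QoS examples on this page.

```spl
stream<MQTTSinkUpdate.ControlPortType> SinkControl_out = Custom(){
    logic
        onProcess :{
            // Key = configuration option name, value = new setting.
            // The URI is a hypothetical placeholder, not a real server.
            map<rstring, rstring> newConfig = {
                "connection.serverURI" : "tcp://broker.example.com:1883" } ;
            // Construct the control tuple and submit it.
            tuple<MQTTSinkUpdate.ControlPortType> tupleToSubmit = {
                mqttConfig = newConfig } ;
            submit(tupleToSubmit, SinkControl_out) ;
        }
}
```

If the operator rejects the value, it keeps using its earlier connection, so a control tuple like this one should not interrupt an existing session.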
MQTTSource
list<TopicsUpdateDesc> topic, map<rstring, rstring> mqttConfig
where TopicsUpdateDesc is defined as:
type TopicsUpdateDesc = TopicsUpdateAction action, list<rstring> topics, uint32 qos;
- The action attribute is of type TopicsUpdateAction and defines the operation to perform on the topics:
  - ADD: Add the provided list of topics to the list of subscribed topics.
  - REMOVE: Remove the provided list of topics from the list of subscribed topics.
  - REPLACE: Replace the current list of subscribed topics with the list of topics provided.
  - UPDATE: Change the QoS of the specified topics to the QoS value that is provided.
- The topics attribute is the list of topics that you want to update.
- The qos attribute is the quality of service that you want to assign to the list of topics.
This schema is predefined as the MQTTSourceUpdate.ControlPortType type for developer convenience.
Similar to the MQTTSink operator, the map attribute contains the configuration options that you want to update at run time. If an invalid value is specified, the operator ignores the value and continues to use the earlier connection information. If you do not want to update the connection information, you can specify an empty map attribute (for example, map<rstring, rstring> emptyCon = { "" : "" };).
In summary, each TopicsUpdateDesc element in the topic list carries:
- The operation to perform on the topics.
- The list of topics.
- The Quality of Service (QoS) to be provided for each of the topics.
  - NOTE: QoS applies only to ADD, REPLACE, and UPDATE operations.
  - For ADD and REPLACE operations, the QoS is applied to the topics that are added or replaced.
  - Tip: To update the QoS for all the subscribed topics, use an asterisk (*) as a wildcard character in the topics attribute.
Configuration options
- connection.trustStore
  - A new truststore file to establish a new SSL connection with the MQTT server. The connection.serverURI option must be provided and the protocol must be set to ssl. Otherwise, the tuple received on the control port is ignored and the operator continues to use its current connection configuration.
- connection.keyStore
  - A new keystore to establish a new SSL connection with the MQTT server. The connection.serverURI option must be provided and the protocol must be set to ssl. Otherwise, the tuple received on the control port is ignored and the operator continues to use its current connection configuration.
- connection.keyStorePassword
  - The password to decrypt the new keystore if the keystore is encrypted. The connection.serverURI and connection.keyStore options must be provided and the protocol must be set to ssl. Otherwise, the tuple received on the control port is ignored and the operator continues to use its current connection configuration.
- connection.serverURI
  - The URI of the new MQTT server that you want to connect to. If the connection is configured to use SSL, values for connection.trustStore, connection.keyStore, or both must be provided, depending on the type of SSL authentication.
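Because the SSL options depend on one another, it may help to see them supplied together in one control tuple. The following sketch assumes an MQTTSink operator and SSL authentication that needs both a truststore and a keystore. The stream name, host, port, file paths, and password are all hypothetical placeholders, and MQTTSinkUpdate.ControlPortType is assumed to be referenced in the same way as MQTTSourceUpdate.ControlPortType.

```spl
stream<MQTTSinkUpdate.ControlPortType> SslControl_out = Custom(){
    logic
        onProcess :{
            // All values below are placeholders for illustration only.
            map<rstring, rstring> sslConfig = {
                // The ssl protocol in the server URI is required by the store options.
                "connection.serverURI" : "ssl://broker.example.com:8883",
                "connection.trustStore" : "/path/to/truststore.jks",
                "connection.keyStore" : "/path/to/keystore.jks",
                // Needed only if the keystore is encrypted.
                "connection.keyStorePassword" : "examplePassword" } ;
            tuple<MQTTSinkUpdate.ControlPortType> tupleToSubmit = {
                mqttConfig = sslConfig } ;
            submit(tupleToSubmit, SslControl_out) ;
        }
}
```

Sending the store options without connection.serverURI, or with a URI whose protocol is not ssl, would cause the whole tuple to be ignored according to the descriptions above.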
QoS examples
The following example shows how to add topics A, B, C, and D with different QoS values to the list of subscribed topics.
stream<MQTTSourceUpdate.ControlPortType> ControlOp_out1 = Custom(){
    logic
        onProcess :{
            // Empty MQTT config because we are not changing connection information.
            map<rstring, rstring> emptyMqttConfig = { "" : "" } ;
            // Topic A and Topic B need QoS = 1
            MQTTSourceUpdate.TopicsUpdateDesc set1 = {
                action = MQTTSourceUpdate.ADD, topics = [ "topicA", "topicB" ], qos = 1u } ;
            // Topic C and Topic D need QoS = 0
            MQTTSourceUpdate.TopicsUpdateDesc set2 = {
                action = MQTTSourceUpdate.ADD, topics = [ "topicC", "topicD" ], qos = 0u } ;
            // Construct the tuple to submit
            tuple<MQTTSourceUpdate.ControlPortType> tupleToSubmit = {
                mqttConfig = emptyMqttConfig, topic = [ set1, set2 ] } ;
            submit(tupleToSubmit, ControlOp_out1) ;
        }
}
The following example shows how to update QoS for all the existing topics by using the wildcard character (*):
stream<MQTTSourceUpdate.ControlPortType> ControlOp_out2 = Custom(){
    logic
        onProcess :{
            // Empty MQTT config because we are not changing connection information.
            map<rstring, rstring> emptyMqttConfig = { "" : "" } ;
            // Update all topics to QoS = 2
            MQTTSourceUpdate.TopicsUpdateDesc set1 = {
                action = MQTTSourceUpdate.UPDATE, topics = [ "*" ], qos = 2u } ;
            // Construct the tuple to submit
            tuple<MQTTSourceUpdate.ControlPortType> tupleToSubmit = {
                mqttConfig = emptyMqttConfig, topic = [ set1 ] } ;
            submit(tupleToSubmit, ControlOp_out2) ;
        }
}
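The REMOVE action can be sketched in the same way. The stream name ControlOp_out3 and the topic name are illustrative. Because QoS applies only to ADD, REPLACE, and UPDATE operations, the qos value here is assumed to have no effect and is supplied only so that the literal matches the TopicsUpdateDesc type.

```spl
stream<MQTTSourceUpdate.ControlPortType> ControlOp_out3 = Custom(){
    logic
        onProcess :{
            // Empty MQTT config because we are not changing connection information.
            map<rstring, rstring> emptyMqttConfig = { "" : "" } ;
            // Unsubscribe from topicA; the qos value is a filler (assumed to be ignored).
            MQTTSourceUpdate.TopicsUpdateDesc removeSet = {
                action = MQTTSourceUpdate.REMOVE, topics = [ "topicA" ], qos = 0u } ;
            // Construct the tuple to submit
            tuple<MQTTSourceUpdate.ControlPortType> tupleToSubmit = {
                mqttConfig = emptyMqttConfig, topic = [ removeSet ] } ;
            submit(tupleToSubmit, ControlOp_out3) ;
        }
}
```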