Operator RabbitMQSink
This operator acts as a RabbitMQ producer, sending messages to a RabbitMQ broker. The broker is assumed to be already configured and running.
The incoming stream can have three attributes: message, routing_key, and message_header. Only the message attribute is required. The exchange name and the names of the attributes that carry the message, routing key, and headers can be set with parameters. If a specified exchange does not exist, the operator creates it. All exchanges created by this operator are non-durable and auto-delete.
This operator supports direct, fanout, and topic exchanges. It does not support header exchanges.
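The routing rules of the three supported exchange types can be sketched in a few lines. This is a minimal illustration of standard RabbitMQ routing semantics, not the operator's code; the function names `topic_matches` and `routes` are hypothetical.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    Words are separated by '.', '*' matches exactly one word and
    '#' matches zero or more words.
    """
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb any number of remaining words, including none.
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def routes(exchange_type: str, binding_key: str, routing_key: str) -> bool:
    """Decide whether a message reaches a queue bound with binding_key."""
    if exchange_type == "fanout":
        return True  # fanout ignores the routing key
    if exchange_type == "direct":
        return binding_key == routing_key
    if exchange_type == "topic":
        return topic_matches(binding_key, routing_key)
    # Header exchanges are not supported by this operator.
    raise ValueError(f"unsupported exchange type: {exchange_type}")
```

With a topic exchange, a binding of `stock.*.nyse` would match the routing key `stock.ibm.nyse` but not `stock.ibm`.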
Messages are non-persistent and sending will only be attempted once by default. This behavior can be modified using the deliveryMode and maxMessageSendRetries parameters.
Behavior in a Consistent Region
This operator can participate in a consistent region. It cannot be the start of a consistent region.
Configuring the operator to communicate with RabbitMQ brokers
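Credentials can be supplied in two ways, listed in order of precedence:
- Credentials from an application configuration (appConfigName, userPropName, and passwordPropName).
- Operator parameters (username and password).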
The valid key-value pairs in the appConfig are <userPropName>=<username> and <passwordPropName>=<password>, where <userPropName> and <passwordPropName> are specified by the corresponding parameters. This operator will only automatically recover with new credentials from the appConfig if automaticRecovery is set to true.
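The credential precedence described above can be sketched as follows. This is an illustration only: the function `resolve_credentials` is hypothetical, the application configuration is modeled as a plain dictionary, and falling back to the operator parameters when the appConfig lacks either property is an assumption rather than documented behavior.

```python
def resolve_credentials(app_config, user_prop_name, password_prop_name,
                        username=None, password=None):
    """Pick the credentials to use, preferring the application configuration.

    app_config is a dict of property name -> value, or None when no
    appConfigName was given.
    """
    if app_config is not None:
        cfg_user = app_config.get(user_prop_name)
        cfg_pass = app_config.get(password_prop_name)
        if cfg_user is not None and cfg_pass is not None:
            # appConfig credentials take precedence over parameters.
            return cfg_user, cfg_pass
    # Assumed fallback: the username and password operator parameters.
    return username, password
```

For example, with an appConfig of `{"rmqUser": "alice", "rmqPass": "s3cret"}` and parameters `username="bob"`, `password="pw"`, the sketch returns alice's credentials.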
Summary
- Ports
- This operator has 1 input port and 0 output ports.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports 28 parameters.
Optional: URI, appConfigName, automaticRecovery, deliveryMode, exchangeName, exchangeType, hostAndPort, keyStoreAlgorithm, keyStorePassword, keyStorePath, keyStoreType, maxMessageSendRetries, messageAttribute, messageSendRetryDelay, msgHeaderAttribute, password, passwordPropName, routingKeyAttribute, setNetworkRecoveryInterval, sslProtocol, trustStoreAlgorithm, trustStorePassword, trustStorePath, trustStoreType, useSSL, userPropName, username, virtualHost
- Metrics
- This operator reports 3 metrics.
Properties
- Implementation
- Java
- Ports (0)
- Properties
-
- Optional: false
- ControlPort: false
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
Optional: URI, appConfigName, automaticRecovery, deliveryMode, exchangeName, exchangeType, hostAndPort, keyStoreAlgorithm, keyStorePassword, keyStorePath, keyStoreType, maxMessageSendRetries, messageAttribute, messageSendRetryDelay, msgHeaderAttribute, password, passwordPropName, routingKeyAttribute, setNetworkRecoveryInterval, sslProtocol, trustStoreAlgorithm, trustStorePassword, trustStorePath, trustStoreType, useSSL, userPropName, username, virtualHost
- URI
-
Convenience URI of the form amqp://userName:password@hostName:portNumber/virtualHost. If URI is specified, you cannot also specify the username, password, or hostAndPort parameters.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
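To show which connection settings a URI of this form carries, here is a small sketch that splits one into its parts with the Python standard library. The function name `parse_amqp_uri` and the returned dictionary keys are hypothetical; the operator itself is implemented in Java and may parse the URI differently.

```python
from urllib.parse import urlsplit, unquote


def parse_amqp_uri(uri: str) -> dict:
    """Split amqp://userName:password@hostName:portNumber/virtualHost."""
    parts = urlsplit(uri)
    if parts.scheme != "amqp":
        raise ValueError(f"expected an amqp:// URI, got {uri!r}")
    # Everything after the first '/' is the virtual host, if present.
    vhost = unquote(parts.path[1:]) if len(parts.path) > 1 else None
    return {
        "username": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "host": parts.hostname,
        "port": parts.port,
        "virtual_host": vhost,
    }
```

Because the URI already carries the user name, password, and host, supplying those as separate parameters as well would be ambiguous, which is why the combination is rejected.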
- appConfigName
-
This parameter specifies the name of the application configuration that stores client credentials. Credentials from the appConfig take precedence over the username and password parameters. The valid key-value pairs in the appConfig are <userPropName>=<username> and <passwordPropName>=<password>, where <userPropName> and <passwordPropName> are specified by the corresponding parameters. If the operator loses its connection to the RabbitMQ server, or fails authentication, it checks the appConfig for new credentials and attempts to reconnect if they exist. The reconnection attempt takes place only if automaticRecovery is set to true (the default).
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- automaticRecovery
-
Specifies whether connections to RabbitMQ are automatically recovered. The default is true.
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- deliveryMode
-
Marks messages as persistent (2) or non-persistent (1). The default is 1.
- Properties
-
- Type: int32
- Cardinality: 1
- Optional: true
- exchangeName
-
Name of the RabbitMQ exchange to send messages to. To use the default RabbitMQ exchange, specify an empty string ("") or omit this parameter.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- exchangeType
-
Type of the RabbitMQ exchange: direct, fanout, or topic. The default is direct.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- hostAndPort
-
List of host and port pairs in the form: "myhost1:3456","myhost2:3456".
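As an illustration of the expected entry format, the following sketch splits each "host:port" string into a host name and a numeric port. The function name `parse_host_and_port` is hypothetical, and rejecting malformed entries with an error is an assumption about reasonable behavior, not a description of the operator.

```python
def parse_host_and_port(entries):
    """Turn ["myhost1:3456", "myhost2:3456"] into [(host, port), ...]."""
    result = []
    for entry in entries:
        # Split on the last ':' so the port is always the final field.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        result.append((host, int(port)))
    return result
```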
- keyStoreAlgorithm
-
Specifies the algorithm that was used to encrypt the keyStore. If not specified, the operator will use the JVM's default algorithm (typically IbmX509).
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- keyStorePassword
-
Specifies the password used to unlock the keyStore.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- keyStorePath
-
Specifies the path to the keyStore file. This parameter is required if the useSSL parameter is set to true.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- keyStoreType
-
Specifies the keyStore type. If not specified, the operator will use the JVM's default type (typically JKS).
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- maxMessageSendRetries
-
This optional parameter specifies the number of successive retries that are attempted for a message if a failure occurs when the message is sent. The default value is zero; no retries are attempted.
- Properties
-
- Type: int32
- Cardinality: 1
- Optional: true
- messageAttribute
-
Name of the attribute for the message. Default is "message".
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- messageSendRetryDelay
-
This optional parameter specifies the time in milliseconds to wait before the next delivery attempt. If maxMessageSendRetries is specified, you must also specify a value for this parameter.
- Properties
-
- Type: int32
- Cardinality: 1
- Optional: true
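The interaction between maxMessageSendRetries and messageSendRetryDelay can be pictured with a small retry loop. This is a sketch under stated assumptions: `send_with_retries`, the `send` callback, and the injectable `sleep` argument are all hypothetical names, and the real operator may handle failures differently.

```python
import time


def send_with_retries(send, message, max_retries=0, retry_delay_ms=0,
                      sleep=time.sleep):
    """Call send(message), retrying up to max_retries times on failure.

    Returns the number of attempts that were made. With the default
    max_retries=0 the message is attempted exactly once.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            send(message)
            return attempts
        except Exception:
            if attempts > max_retries:
                raise  # retries exhausted: surface the last failure
            # Wait before the next delivery attempt.
            sleep(retry_delay_ms / 1000.0)
```

With `max_retries=2` and `retry_delay_ms=500`, a message is attempted at most three times, with half a second between attempts.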
- msgHeaderAttribute
-
Name of the attribute for the message_header. The attribute type must be map<ustring,ustring>. Default is "message_header".
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- password
-
Password for RabbitMQ authentication.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- passwordPropName
-
This parameter specifies the name of the property that holds the password in the application configuration. If the appConfigName parameter is specified and the passwordPropName parameter is not set, a compile-time error occurs.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- routingKeyAttribute
-
Name of the attribute for the routing_key. Default is "routing_key".
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- setNetworkRecoveryInterval
-
If automaticRecovery is set to true, this is the interval (in ms) that will be used between reconnection attempts. The default is 5000 ms.
- Properties
-
- Type: int64
- Cardinality: 1
- Optional: true
- sslProtocol
-
Specifies the SSL protocol to use. If not specified, the default value is "TLSv1.2".
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- trustStoreAlgorithm
-
Specifies the algorithm that was used to encrypt the trustStore. If not specified, the operator will use the JVM's default algorithm (typically IbmX509).
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- trustStorePassword
-
Specifies the password used to unlock the trustStore.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- trustStorePath
-
Specifies the path to the trustStore file. This parameter is required if the useSSL parameter is set to true.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- trustStoreType
-
Specifies the trustStore type. If not specified, the operator will use the JVM's default type (typically JKS).
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- useSSL
-
Specifies whether an SSL connection should be created. If not specified, the default value is false.
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
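The dependency between useSSL and the two store paths can be expressed as a simple check. This sketch only mirrors the rule stated in the keyStorePath and trustStorePath descriptions; the function name `missing_ssl_params` and the dictionary representation of the parameters are hypothetical.

```python
def missing_ssl_params(params: dict) -> list:
    """Return the names of required SSL parameters that are not set.

    params maps operator parameter names to their values.
    """
    if not params.get("useSSL", False):
        return []  # SSL is off by default, so nothing else is required
    required = ("keyStorePath", "trustStorePath")
    return [name for name in required if not params.get(name)]
```

An empty result means the SSL-related parameters are consistent with the rule; any returned names would need to be supplied before the operator can open an SSL connection.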
- userPropName
-
This parameter specifies the name of the property that holds the user name in the application configuration. If the appConfigName parameter is specified and the userPropName parameter is not set, a compile-time error occurs.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- username
-
Username for RabbitMQ authentication.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- virtualHost
-
Specifies the RabbitMQ virtual host. The default is null.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- isConnected - Gauge
-
Indicates whether the operator is currently connected to the RabbitMQ server.
- reconnectionAttempts - Counter
-
The total number of reconnection attempts made since the operator was started.
- reconnectionAttemptsLatestBatch - Counter
-
The number of reconnection attempts that were needed to establish the most recent successful connection.
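One way to picture how the three metrics relate is the bookkeeping sketch below. The class `ConnectionMetrics` and its method names are hypothetical, and the exact points at which the operator updates each metric are an assumption based on the descriptions above.

```python
class ConnectionMetrics:
    """Tracks isConnected, reconnectionAttempts, reconnectionAttemptsLatestBatch."""

    def __init__(self):
        self.is_connected = 0                        # gauge: 1 when connected
        self.reconnection_attempts = 0               # total since start
        self.reconnection_attempts_latest_batch = 0  # attempts for last success
        self._current_batch = 0                      # attempts since last success

    def on_reconnect_attempt(self):
        self.reconnection_attempts += 1
        self._current_batch += 1

    def on_connected(self):
        self.is_connected = 1
        # Record how many attempts this successful connection needed.
        self.reconnection_attempts_latest_batch = self._current_batch
        self._current_batch = 0

    def on_disconnected(self):
        self.is_connected = 0
```

In this model the total counter only grows, while the latest-batch value is rewritten each time a connection succeeds.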
- Operator class library