Operator WebSocketSendReceive
The client-based WebSocketSendReceive operator simultaneously sends data to and receives data from a remote WebSocket server.
This operator can be used to send and receive text (plain text, JSON or XML) and/or binary data. Sending and receiving happen independently of each other thanks to the bidirectional (full duplex) connection offered by the WebSocket protocol. Connections to the remote server are persistent and long running until either party (client or server) closes the connection.
This operator establishes a plain (ws or http) or TLS (wss or https) WebSocket connection, depending on the protocol given in the url parameter.
This operator supports send-and-receive to and from a single WebSocket server, promoting a one-to-one data access pattern.
This operator provides one input port for applications to send data to the remote WebSocket server. The schema of this input port must have an rstring strData attribute, a blob blobData attribute, or both.
Additionally, this operator provides two output ports. The first output port is mandatory and outputs data received from the remote server. Its schema must have an rstring strData attribute, a blob blobData attribute, or both. The stream can also carry further attributes that are assigned via the available output functions.
The second output port is optional and outputs whether a given incoming tuple was successfully sent to the remote server. This output port must have int32 sendResultCode and rstring sendResultMessage as its output tuple attributes. A value of 0 in the sendResultCode attribute means data was successfully sent; otherwise there was an error in sending the data. If this attribute carries a non-zero value, then the application can have additional logic to retransmit that same data item at a later time. Please refer to the output port section below for more details.
Connection handling
A connection to the remote server is attempted when the operator invocation is initialized at runtime.
Once established, the operator keeps that connection open until the operator is shut down, the remote server is stopped, or a network error occurs.
If the remote WebSocket server becomes unavailable or closes the connection, the operator will try to re-establish the connection at a periodic, user configurable interval.
It is recommended that application logic back off and pause sending input tuples for a reasonable amount of time when a connection error occurs. Otherwise, every incoming tuple will trigger a reconnection attempt.
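The reconnection behavior described above could be configured along the following lines. This is a minimal sketch rather than a reference configuration: the composite name, the stream names, the placeholder URL and the 10-second interval are all assumptions chosen for illustration. Only the url, reconnectionInterval and wsConnectionLoggingNeeded parameters are taken from this page.

```spl
use com.teracloud.streams.websocket.op::WebSocketSendReceive;

composite ReconnectSketch {
    graph
        // Hypothetical source: one text item per second.
        stream<rstring strData> Outgoing = Beacon() {
            param
                period: 1.0;
            output
                Outgoing: strData = "ping " + (rstring)IterationCount();
        }

        stream<rstring strData> Incoming = WebSocketSendReceive(Outgoing) {
            param
                url: "ws://example.com";          // placeholder URL
                reconnectionInterval: 10.0;       // retry every 10 seconds (default is 60.0)
                wsConnectionLoggingNeeded: true;  // log connect and disconnect events
        }

        // Print whatever text the remote server sends back.
        () as IncomingSink = Custom(Incoming) {
            logic
                onTuple Incoming: printStringLn(strData);
        }
}
```

A shorter interval makes the operator notice a recovered server sooner, at the cost of more frequent connection attempts while the server is down.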
Example
use com.teracloud.streams.websocket.op::WebSocketSendReceive;

composite Main {
    param
        expression<uint32> $numberOfDataItemsToBeSent : 1000u;

    type
        SendData_t = rstring strData, blob blobData;
        ReceivedData_t =
            rstring strData, // String based data (e.g., plain text, JSON, etc.) received from the remote server
            blob blobData, // Blob data received from the remote server
            uint64 totalDataItemsReceived, // Total data items received from the remote server
            uint64 totalDataBytesReceived, // Number of data bytes received from the remote server
            uint64 totalDataItemsSent, // Total data items sent to the remote server
            uint64 totalDataBytesSent; // Number of data bytes sent to the remote server

    graph
        stream<SendData_t> BinaryData as BD = Beacon() {
            param
                iterations: $numberOfDataItemsToBeSent;
                initDelay: 10.0;
            output
                BD: blobData = convertToBlob("This data item " +
                    (rstring)(IterationCount() + 1ul) + " is sent as a blob.");
        }

        stream<SendData_t> TextData as TD = Beacon() {
            param
                iterations: $numberOfDataItemsToBeSent;
                initDelay: 10.0;
            output
                TD: strData = "This data item " +
                    (rstring)(IterationCount() + 1ul) + " is sent as a text.";
        }

        // Send binary and text data to the remote WebSocket server and at the same time
        // receive binary data or text data or both from the remote WebSocket server.
        stream<ReceivedData_t> ReceivedData = WebSocketSendReceive(BinaryData,TextData) {
            param
                url: "ws://example.com";
            output
                // strData and/or blobData attributes will be automatically
                // assigned with received data values within the operator logic.
                // Other attributes can be assigned manually as done below.
                ReceivedData: totalDataItemsReceived = getTotalDataItemsReceived(),
                    totalDataBytesReceived = getTotalDataBytesReceived(),
                    totalDataItemsSent = getTotalDataItemsSent(),
                    totalDataBytesSent = getTotalDataBytesSent();
        }

        // See $STREAMS_INSTALL/samples/com.teracloud.streams.websocket for more examples
}
Summary
- Ports
- This operator has 1 input port and 2 output ports.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports 14 parameters.
Required: url
Optional: certificateFileName, certificatePassword, metricsResetInterval, newDataCpuYieldTimeInSenderThread, noDataCpuYieldTimeInSenderThread, reconnectionInterval, tcpNoDelay, trustedServerCertificateFileName, trustedServerX509SubjectIdentifiers, websocketLiveMetricsUpdateNeeded, websocketLoggingNeeded, wsClientSessionLoggingNeeded, wsConnectionLoggingNeeded
- Metrics
- This operator reports 9 metrics.
Properties
- Implementation
- C++
- Threading
- Never - Operator never provides a single threaded execution context.
Input Ports
- Ports (0)
-
This port brings the data into this operator for sending it to the remote WebSocket server.
Attributes on this input port:
- strData (required, rstring) - Any text based data (plain text, JSON or XML) can be sent to the remote WebSocket server via this attribute.
- blobData (required, blob) - Any binary (blob) data can be sent to the remote WebSocket server via this attribute.
- Properties
-
- Optional: false
- ControlPort: false
- TupleMutationAllowed: true
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
Output Ports
- Assignments
- This operator allows any SPL expression of the correct type to be assigned to output attributes.
- Output Functions
-
- WebSocketSendReceiveFunctions
-
- <any T> T AsIs(T)
-
The default function for output attributes. This function assigns the output attribute to the value of the input attribute with the same name.
- uint64 getTotalDataItemsReceived()
-
Returns a uint64 value indicating the total number of data items received so far from the remote server.
- uint64 getTotalDataBytesReceived()
-
Returns a uint64 value indicating the total number of data bytes received so far from the remote server.
- uint64 getTotalDataItemsSent()
-
Returns a uint64 value indicating the total number of data items sent so far to the remote server.
- uint64 getTotalDataBytesSent()
-
Returns a uint64 value indicating the total number of data bytes sent so far to the remote server.
- Ports (0)
-
This port produces output tuples that carry the text or binary data received from the remote WebSocket server.
The schema for this port must either have an attribute named strData with an rstring data type to hold the received text data or an attribute named blobData with a blob data type to hold the received binary data or both.
Additional output tuple attributes can be assigned via the available output functions. For example, two uint64 attributes can be set to the total number of tuples and bytes received from the remote server. See the available output functions for more info.
- Properties
-
- Optional: false
- TupleMutationAllowed: false
- WindowPunctuationOutputMode: Free
- Ports (1)
-
This is an optional output port. If it is present, it returns whether a given input tuple consumed by this operator was successfully sent to the remote WebSocket server or not. The schema for this port must have two attributes:
- int32 sendResultCode
- rstring sendResultMessage
sendResultCode carries a value of 0 on a successful send to the remote server and a non-zero value if an error occurred while sending the data. sendResultMessage carries a descriptive message about the send result. Application logic can check these output tuple attributes and retransmit a given input tuple if it was not sent to the remote server due to an error (e.g., a broken connection).
- Properties
-
- Optional: true
- TupleMutationAllowed: false
- WindowPunctuationOutputMode: Free
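The optional send result port described above might be consumed as in the following sketch. The composite, stream and operator names, the placeholder URL and the failure counter are assumptions made for illustration. Only the two-port invocation shape and the sendResultCode and sendResultMessage attributes come from this page. The sketch only traces failures; actual retransmission logic is left to the application.

```spl
use com.teracloud.streams.websocket.op::WebSocketSendReceive;

composite SendResultSketch {
    graph
        // Hypothetical source of 100 text items.
        stream<rstring strData> Outgoing = Beacon() {
            param
                iterations: 100u;
                initDelay: 10.0;
            output
                Outgoing: strData = "item " + (rstring)IterationCount();
        }

        // First output port: data received from the server.
        // Second (optional) output port: one send result per input tuple.
        (stream<rstring strData> Received;
         stream<int32 sendResultCode, rstring sendResultMessage> SendResult) =
            WebSocketSendReceive(Outgoing) {
            param
                url: "ws://example.com"; // placeholder URL
        }

        // Count and trace failed sends; retransmission would be decided here.
        () as SendResultMonitor = Custom(SendResult) {
            logic
                state: mutable uint64 failedSends = 0ul;
                onTuple SendResult: {
                    if (sendResultCode != 0) {
                        failedSends++;
                        appTrc(Trace.error, "Send failed (" + (rstring)sendResultCode +
                            "): " + sendResultMessage + ", failures so far: " +
                            (rstring)failedSends);
                    }
                }
        }

        () as ReceivedSink = Custom(Received) {
            logic
                onTuple Received: printStringLn(strData);
        }
}
```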
Parameters
Required: url
Optional: certificateFileName, certificatePassword, metricsResetInterval, newDataCpuYieldTimeInSenderThread, noDataCpuYieldTimeInSenderThread, reconnectionInterval, tcpNoDelay, trustedServerCertificateFileName, trustedServerX509SubjectIdentifiers, websocketLiveMetricsUpdateNeeded, websocketLoggingNeeded, wsClientSessionLoggingNeeded, wsConnectionLoggingNeeded
- certificateFileName
-
This parameter specifies the full path of the PEM file that holds the WebSocket client's private key and public certificate. (Default is an empty string.)
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- certificatePassword
-
This parameter specifies a password needed for decrypting the WebSocket client's private key in the PEM file. Default is an empty string.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- metricsResetInterval
-
This parameter can be used to specify a non-zero periodic time interval in minutes after which all the metrics counters will be reset to 0. (Default is 0 which will never reset the metrics counters.)
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- newDataCpuYieldTimeInSenderThread
-
This parameter specifies the CPU yield time, in seconds (fractional values are allowed), used inside the thread just before it sends a new data item to the remote server. It must be >= 0.0. (Default is 0.001, i.e., 1 millisecond.)
- Properties
-
- Type: float64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- noDataCpuYieldTimeInSenderThread
-
This parameter specifies the CPU yield time, in seconds (fractional values are allowed), used inside the thread that spin loops while no data is available to send to the remote server. It must be >= 0.0. (Default is 0.001, i.e., 1 millisecond.)
- Properties
-
- Type: float64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- reconnectionInterval
-
This parameter specifies the interval, in seconds (fractional values are allowed), at which reconnection to the remote WebSocket server is attempted. It must be > 0.0. (Default is 60.0 seconds.)
- Properties
-
- Type: float64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- tcpNoDelay
-
This parameter controls Nagle's algorithm on the underlying TCP connection. Setting it to true disables Nagle's algorithm and setting it to false enables it. (Default is false.)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
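As an illustration of how the sender-side parameters above fit together, the following sketch sets the two yield times and tcpNoDelay in a single invocation. The composite and stream names, the placeholder URL and the specific values are assumptions. The comments describe an expected trade-off, not measured behavior, so any such settings should be validated against the actual workload.

```spl
use com.teracloud.streams.websocket.op::WebSocketSendReceive;

composite LowLatencySketch {
    graph
        // Hypothetical source of small, frequent text items.
        stream<rstring strData> Outgoing = Beacon() {
            param
                period: 0.01;
            output
                Outgoing: strData = "tick " + (rstring)IterationCount();
        }

        stream<rstring strData> Incoming = WebSocketSendReceive(Outgoing) {
            param
                url: "ws://example.com";                   // placeholder URL
                tcpNoDelay: true;                          // disable Nagle's algorithm
                newDataCpuYieldTimeInSenderThread: 0.0;    // no yield before each send (must be >= 0.0)
                noDataCpuYieldTimeInSenderThread: 0.0005;  // shorter idle yield than the 0.001 default
        }

        () as IncomingSink = Custom(Incoming) {
            logic
                onTuple Incoming: printStringLn(strData);
        }
}
```

Smaller yield times are assumed here to trade higher CPU usage in the sender thread for lower send latency.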
- trustedServerCertificateFileName
-
This parameter specifies the full path of the PEM file that contains the public certificate of the trusted remote server. This enables server authentication. If this parameter is not used or is empty, no server authentication is performed. (Default is an empty string.)
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- trustedServerX509SubjectIdentifiers
-
This parameter specifies a list of verifiable identifiers present in the subject field of the trusted server's public certificate. It is helpful for performing server authentication with otherwise unsupported certificate types such as self-signed ones. Some examples of such identifiers: ["ST=New York","L=Armonk","O=IBM","CN=www.ibm.com","emailAddress=websocket.streams@ibm.com"] (Default is an empty list.)
- Properties
-
- Type: list<rstring>
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
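The certificate-related parameters above could be combined for a TLS connection roughly as follows. This is a sketch under stated assumptions: the composite and stream names, the wss URL, the file paths, the subject identifier and the submission-time parameter name clientKeyPassword are all hypothetical. Reading the password through getSubmissionTimeValue is one way to keep it out of the source code, not something this page prescribes.

```spl
use com.teracloud.streams.websocket.op::WebSocketSendReceive;

composite TlsSketch {
    graph
        // Hypothetical source of text items.
        stream<rstring strData> Outgoing = Beacon() {
            param
                period: 1.0;
            output
                Outgoing: strData = "secure item " + (rstring)IterationCount();
        }

        stream<rstring strData> SecureIncoming = WebSocketSendReceive(Outgoing) {
            param
                url: "wss://example.com";  // wss selects a TLS connection (placeholder host)
                // Server authentication (hypothetical path and identifier).
                trustedServerCertificateFileName: "/path/to/server-cert.pem";
                trustedServerX509SubjectIdentifiers: ["CN=example.com"];
                // Client key and certificate PEM file (hypothetical path).
                certificateFileName: "/path/to/client.pem";
                // Password supplied at job submission; empty string if not given.
                certificatePassword: getSubmissionTimeValue("clientKeyPassword", "");
        }

        () as SecureSink = Custom(SecureIncoming) {
            logic
                onTuple SecureIncoming: printStringLn(strData);
        }
}
```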
- url
-
This parameter specifies the URL of the remote WebSocket server.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: false
- ExpressionMode: AttributeFree
- websocketLiveMetricsUpdateNeeded
-
This parameter specifies whether live update for this operator's custom metrics is needed. (Default is true)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- websocketLoggingNeeded
-
This parameter specifies whether logging is needed from the WebSocket library. (Default is false)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- wsClientSessionLoggingNeeded
-
This parameter specifies whether logging is needed when the client session send/receive is in progress with the remote server. (Default is false)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- wsConnectionLoggingNeeded
-
This parameter specifies whether logging is needed when this operator connects to and disconnects from the remote server. (Default is false)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
Code Templates
- WebSocketSendReceive
-
stream<${outputSchema}> ${outputStream} as O = com.teracloud.streams.websocket.op::WebSocketSendReceive(${inputStream}) {
    param
        url: ${url};
        ${parameter}: ${parameterExpression};
    output
        O: ${attributeAssignment};
}
- WebSocketSendReceive with send result stream
-
(stream<${outputSchema}> ${outputStream} as O; stream<int32 sendResultCode, rstring sendResultMessage> ${resultStream}) = com.teracloud.streams.websocket.op::WebSocketSendReceive(${inputStream}) {
    param
        url: ${url};
        ${parameter}: ${parameterExpression};
    output
        O: ${attributeAssignment};
}
Metrics
- nDataBytesReceivedFromRemoteServer - Gauge
-
Total number of data bytes received from the remote server thus far.
- nDataBytesSentToRemoteServer - Gauge
-
Total number of data bytes sent to the remote server thus far.
- nDataItemsReceivedFromRemoteServer - Gauge
-
Number of data items received from the remote server thus far.
- nDataItemsSentToRemoteServer - Gauge
-
Number of data items sent to the remote server thus far.
- nSizeOfMostRecentDataItemReceived - Gauge
-
Size of the most recent data item received from the remote server.
- nSizeOfMostRecentDataItemSent - Gauge
-
Size of the most recent data item sent to the remote server.
- nTcpNoDelay - Gauge
-
Did the user configure TCP_NODELAY for this operator?
- nTimeTakenToSendMostRecentDataItem - Gauge
-
Time taken, in milliseconds, to send the most recent data item to the remote server.
- nWebsocketConnectionActiveStatus - Gauge
-
Indicates the current active status of the WebSocket connection.
Libraries
- Boost Library
- Websocketpp Library