Operator WebSocketSendReceive

The client-based WebSocketSendReceive operator sends data to and receives data from a remote WebSocket server at the same time. It can carry text (plain text, JSON or XML), binary data, or both. Sending and receiving happen independently of each other, which is possible because the WebSocket protocol offers a bidirectional (full duplex) connection. The connection to the remote server is persistent and long running: it stays open until either party (client or server) closes it. Based on the protocol field of the remote WebSocket server's URL, this operator establishes the connection either via plain WebSocket (ws or http) or via TLS WebSocket (wss or https).
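The scheme rule in the paragraph above can be sketched in a few lines of Python. This is only an illustration of the stated rule, not the operator's own code; the function name uses_tls and the choice to reject any other scheme are assumptions.

```python
from urllib.parse import urlsplit

# Scheme groups taken from the description above.
PLAIN_SCHEMES = {"ws", "http"}
TLS_SCHEMES = {"wss", "https"}


def uses_tls(url: str) -> bool:
    """Return True if the URL scheme calls for a TLS WebSocket connection."""
    scheme = urlsplit(url).scheme.lower()
    if scheme in TLS_SCHEMES:
        return True
    if scheme in PLAIN_SCHEMES:
        return False
    # How the operator treats other schemes is not stated here;
    # rejecting them is an assumption made for this sketch.
    raise ValueError(f"unsupported URL scheme: {scheme!r}")
```

For example, uses_tls("wss://example.com:8443/ws") selects TLS, while uses_tls("ws://example.com:8080/ws") selects a plain connection. The host names are placeholders.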

This operator sends to and receives from a single server-based remote WebSocket application, so it promotes a One-To-One data access pattern.

This operator attempts to connect to the remote server when it starts up. If the remote WebSocket server becomes unavailable at any time, the operator tries behind the scenes to reestablish the connection at a user-configurable periodic interval. Once the connection is established, the operator keeps it open until the operator is shut down, the remote server is stopped, or a network error occurs. If the remote server closes the connection while this operator is still active, any tuple arriving at the operator during that time is not sent to the remote server. The application logic invoking this operator can retransmit that tuple later, because the operator keeps trying to reestablish the connection in the background. When there is a connection error with the remote WebSocket server, it is good practice for the application logic to back off and wait for a reasonable amount of time before submitting a tuple to this operator again. Otherwise, every incoming tuple triggers a new connection attempt in order to send it to the remote server.
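The back-off advice above could be realized in application logic roughly as follows. This Python sketch is hypothetical: the class name SendBackoff, the doubling delay and the 60 second cap are illustrative choices, not behavior of the operator. It assumes the application learns about failed sends from a non-zero sendResultCode.

```python
class SendBackoff:
    """Tracks when the application may next offer a tuple to the operator."""

    def __init__(self, base_delay: float = 1.0, max_delay: float = 60.0):
        self.base_delay = base_delay  # wait after the first failure (seconds)
        self.max_delay = max_delay    # upper bound on any single wait
        self._failures = 0
        self._next_allowed = 0.0

    def record_result(self, send_result_code: int, now: float) -> None:
        """Update the gate from one send result; 0 means success."""
        if send_result_code == 0:
            self._failures = 0
            self._next_allowed = now
            return
        self._failures += 1
        # Double the wait on every consecutive failure, up to max_delay.
        delay = min(self.base_delay * 2 ** (self._failures - 1), self.max_delay)
        self._next_allowed = now + delay

    def may_send(self, now: float) -> bool:
        """Return True once the current back-off period has elapsed."""
        return now >= self._next_allowed
```

The caller passes in the current time (for instance from time.monotonic()), which keeps the gate easy to test. A fixed wait would follow the same pattern with the doubling removed.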

This operator provides one input port through which an application can send data to the remote WebSocket server. The incoming tuple must have an rstring strData attribute, a blob blobData attribute, or both.

This operator provides two output ports: the first is mandatory and the second is optional. The first output port emits the data received from the remote server. Its schema must have an rstring strData attribute, a blob blobData attribute, or both. Optionally, it can have uint64 numberOfDataItemsReceived and uint64 numberOfDataBytesReceived attributes, which are assigned via the output functions provided by this operator. The second output port, if present, reports whether a given incoming tuple was successfully sent to the remote server. Its schema must have int32 sendResultCode and rstring sendResultMessage as its output tuple attributes. A sendResultCode of 0 means the data was sent successfully; any other value indicates an error in sending the data. When this attribute carries a non-zero value, the application can use additional logic to retransmit the same data item at a later time. Please refer to the output port section below for more details.

See the samples folder inside this toolkit for a working example that shows how to use this operator.

Summary

Ports
This operator has 1 input port and 2 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 14 parameters.

Required: url

Optional: certificateFileName, certificatePassword, metricsResetInterval, newDataCpuYieldTimeInSenderThread, noDataCpuYieldTimeInSenderThread, reconnectionInterval, tcpNoDelay, trustedServerCertificateFileName, trustedServerX509SubjectIdentifiers, websocketLiveMetricsUpdateNeeded, websocketLoggingNeeded, wsClientSessionLoggingNeeded, wsConnectionLoggingNeeded

Metrics
This operator reports 9 metrics.

Properties

Implementation
C++
Threading
Never - Operator never provides a single threaded execution context.

Input Ports

Ports (0)

This port brings the data into this operator for sending it to the remote WebSocket server.

Attributes on this input port (the schema must include at least one of the two, and may include both):
  • strData (rstring) - Any text-based data (plain text, JSON or XML) can be sent to the remote WebSocket server via this attribute.
  • blobData (blob) - Any binary (blob) data can be sent to the remote WebSocket server via this attribute.

Any additional input attributes are ignored.
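The input schema rule can be illustrated with a small Python check. It is a sketch for reasoning about schemas, not part of the toolkit; the function name check_input_schema and the representation of a schema as a name-to-type dictionary are assumptions.

```python
def check_input_schema(schema: dict) -> list:
    """Return the data attributes the operator would use; raise if none qualify.

    `schema` maps attribute names to SPL type names, e.g. {"strData": "rstring"}.
    """
    expected = {"strData": "rstring", "blobData": "blob"}
    usable = []
    for name, spl_type in expected.items():
        if name in schema:
            if schema[name] != spl_type:
                raise ValueError(
                    f"{name} must be of type {spl_type}, found {schema[name]}"
                )
            usable.append(name)
    if not usable:
        raise ValueError("input schema needs rstring strData, blob blobData, or both")
    # Attributes other than strData and blobData are simply not used.
    return usable
```

A schema such as {"strData": "rstring", "seq": "uint64"} passes, and the extra seq attribute (a made-up name) plays no part in what is sent.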

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Output Functions
WebSocketSendReceiveFunctions
<any T> T AsIs(T)

The default function for output attributes. This function assigns to the output attribute the value of the input attribute with the same name.

uint64 getTotalDataItemsReceived()

Returns a uint64 value indicating the total number of data items received so far from the remote server.

uint64 getTotalDataBytesReceived()

Returns a uint64 value indicating the total number of data bytes received so far from the remote server.

uint64 getTotalDataItemsSent()

Returns a uint64 value indicating the total number of data items sent so far to the remote server.

uint64 getTotalDataBytesSent()

Returns a uint64 value indicating the total number of data bytes sent so far to the remote server.

Ports (0)

This port produces the output tuples that carry the text or binary data received from the remote WebSocket server. The schema for this port must have an attribute named strData of type rstring to hold the received text data, an attribute named blobData of type blob to hold the received binary data, or both. The schema can also have two optional uint64 attributes that receive the number of data items and the total number of data bytes received thus far from the remote server. Use the custom output functions provided by this operator to query these two values and assign them to the optional attributes on this output port.

There are multiple available output functions, and output attributes can also be assigned values with any SPL expression that evaluates to the proper type.

Ports (1)

This is an optional output port. If it is present, it reports whether a given input tuple consumed by this operator was successfully sent to the remote WebSocket server. The schema for this port must have two attributes: int32 sendResultCode and rstring sendResultMessage. sendResultCode carries a value of 0 on a successful send to the remote server and a non-zero value if there was an error in sending the data. sendResultMessage carries a descriptive message about the send result. Application logic can check these output tuple attributes and retransmit a given input tuple if it was not sent to the remote server due to an error (e.g., a broken connection).
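One way application logic might act on these send results is sketched below in Python. The class RetransmitTracker is hypothetical, and it relies on an assumption this page does not confirm: that results arrive one per input tuple and in the same order in which the tuples were submitted.

```python
from collections import deque


class RetransmitTracker:
    """Pairs submitted items with send results and collects the failed ones."""

    def __init__(self):
        self._awaiting_result = deque()  # items submitted, result not yet seen
        self._to_retry = deque()         # items whose send failed

    def submitted(self, item) -> None:
        """Remember an item that was just handed to the operator."""
        self._awaiting_result.append(item)

    def on_send_result(self, send_result_code: int, send_result_message: str) -> bool:
        """Consume one result tuple; return True if the send succeeded."""
        # Assumption: results arrive in submission order, one per input tuple.
        item = self._awaiting_result.popleft()
        if send_result_code != 0:
            self._to_retry.append(item)
            return False
        return True

    def take_retries(self) -> list:
        """Return and clear the items that should be retransmitted later."""
        items = list(self._to_retry)
        self._to_retry.clear()
        return items
```

If ordering cannot be relied on, the application would need its own correlation key inside the payload instead of the positional pairing used here.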

Parameters

certificateFileName

This parameter specifies the full path of the PEM file that holds the WebSocket client's private key and public certificate. The default is an empty string, which the user can override.

certificatePassword

This parameter specifies a password needed for decrypting the WebSocket client's private key in the PEM file. Default is an empty string.

metricsResetInterval

This parameter specifies a non-zero periodic time interval, in minutes, after which all the metrics counters are reset to 0. (Default is 0, which never resets the metrics counters.)

newDataCpuYieldTimeInSenderThread

This parameter specifies the CPU yield time (in fractional seconds) for the thread that is about to send a new data item to the remote server. It must be >= 0.0. (Default is 0.001, i.e. 1 millisecond.)

noDataCpuYieldTimeInSenderThread

This parameter specifies the CPU yield time (in fractional seconds) for the thread that spin loops while no data is available to send to the remote server. It must be >= 0.0. (Default is 0.001, i.e. 1 millisecond.)

reconnectionInterval

This parameter specifies the periodic time interval (in fractional seconds) at which reconnection to the remote WebSocket server is attempted. It must be > 0.0. (Default is 60.0 seconds.)
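The defaults and bounds stated for the three timing parameters above can be summarized in a short Python sketch. The parameter names, defaults and bounds come from this page; the helper resolve_timing_params is hypothetical and only mirrors those rules outside of SPL.

```python
# Defaults as stated in the parameter descriptions above.
TIMING_DEFAULTS = {
    "newDataCpuYieldTimeInSenderThread": 0.001,
    "noDataCpuYieldTimeInSenderThread": 0.001,
    "reconnectionInterval": 60.0,
}


def resolve_timing_params(overrides=None) -> dict:
    """Merge user overrides into the defaults and enforce the documented bounds."""
    params = dict(TIMING_DEFAULTS)
    for name, value in (overrides or {}).items():
        if name not in TIMING_DEFAULTS:
            raise KeyError(f"not a timing parameter: {name}")
        params[name] = float(value)
    # Both yield times may be zero but not negative.
    for name in ("newDataCpuYieldTimeInSenderThread",
                 "noDataCpuYieldTimeInSenderThread"):
        if params[name] < 0.0:
            raise ValueError(f"{name} must be >= 0.0")
    # The reconnection interval must be strictly positive.
    if params["reconnectionInterval"] <= 0.0:
        raise ValueError("reconnectionInterval must be > 0.0")
    return params
```

Note the asymmetry: a yield time of 0.0 is accepted, while a reconnectionInterval of 0.0 is not.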

tcpNoDelay

This parameter controls Nagle's algorithm on the TCP connection. Setting it to true disables Nagle's algorithm and setting it to false enables it. (Default is false.)
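For readers unfamiliar with the underlying socket option, the following Python sketch shows what disabling Nagle's algorithm means at the socket level. It uses the standard TCP_NODELAY option and is not the operator's code; the function name make_tcp_socket is an assumption.

```python
import socket


def make_tcp_socket(tcp_no_delay: bool) -> socket.socket:
    """Create an unconnected TCP socket with Nagle's algorithm on or off."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # TCP_NODELAY = 1 disables Nagle's algorithm; 0 leaves it enabled.
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,
                    1 if tcp_no_delay else 0)
    return sock
```

With the option set, small writes go out immediately rather than being coalesced, which tends to lower latency for small messages at the cost of more packets on the wire.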

trustedServerCertificateFileName

This parameter specifies the full path of the PEM file that contains the public certificate of the trusted remote server, which enables server authentication. If this parameter is not specified or is empty, no server authentication is performed. (Default is an empty string.)

trustedServerX509SubjectIdentifiers

This parameter specifies a list of verifiable identifiers present in the subject field of the trusted server's public certificate. It helps in performing server authentication with otherwise unsupported certificate types, such as self-signed ones. Some examples of such identifiers: ["ST=New York","L=Armonk","O=IBM","CN=www.ibm.com","emailAddress=websocket.streams@ibm.com"] (Default is an empty list.)
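A check of this kind might look like the Python sketch below. It rests on two assumptions that this page does not confirm: that every configured identifier must appear in the certificate subject, and that an empty list means no identifier check is made. The function name subject_satisfies is hypothetical.

```python
def subject_satisfies(subject_fields: list, trusted_identifiers: list) -> bool:
    """Return True if every trusted identifier appears in the certificate subject.

    `subject_fields` holds the subject as "KEY=value" strings,
    e.g. ["O=IBM", "CN=www.ibm.com"].
    """
    present = set(subject_fields)
    # Assumption: all identifiers must match. With an empty list, all()
    # returns True, so no identifier check is applied.
    return all(identifier in present for identifier in trusted_identifiers)
```

For instance, a subject of ["ST=New York", "O=IBM", "CN=www.ibm.com"] satisfies the list ["O=IBM", "CN=www.ibm.com"] but not ["CN=other.example.com"], where the latter name is made up for illustration.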

url

This parameter specifies the URL of the remote WebSocket server.

websocketLiveMetricsUpdateNeeded

This parameter specifies whether live update for this operator's custom metrics is needed. (Default is true)

websocketLoggingNeeded

This parameter specifies whether logging is needed from the WebSocket library. (Default is false)

wsClientSessionLoggingNeeded

This parameter specifies whether logging is needed when the client session send/receive is in progress with the remote server. (Default is false)

wsConnectionLoggingNeeded

This parameter specifies whether logging is needed when this operator connects to and disconnects from the remote server. (Default is false)

Metrics

nDataBytesReceivedFromRemoteServer - Gauge

Total number of data bytes received from the remote server thus far.

nDataBytesSentToRemoteServer - Gauge

Total number of data bytes sent to the remote server thus far.

nDataItemsReceivedFromRemoteServer - Gauge

Number of data items received from the remote server thus far.

nDataItemsSentToRemoteServer - Gauge

Number of data items sent to the remote server thus far.

nSizeOfMostRecentDataItemReceived - Gauge

Size of the most recent data item received from the remote server.

nSizeOfMostRecentDataItemSent - Gauge

Size of the most recent data item sent to the remote server.

nTcpNoDelay - Gauge

Indicates whether the user configured TCP_NODELAY for this operator.

nTimeTakenToSendMostRecentDataItem - Gauge

Time taken in milliseconds to send the most recent data item to the remote server.

nWebsocketConnectionActiveStatus - Gauge

Indicates the current active status of the WebSocket connection.

Libraries

Boost Library
Library Name: boost_system, boost_chrono, boost_random, boost_thread
Library Path: ../../lib
Include Path: ../../include
Websocketpp Library
Include Path: ../../include