Operator WebSocketSendReceive

The client-based WebSocketSendReceive operator simultaneously sends data to and receives data from a remote WebSocket server.

This operator can be used to send and receive text (plain text, JSON or XML) and/or binary data. Sending and receiving happen independently of each other due to the bidirectional (full duplex) connection offered by the WebSocket protocol. Connections to the remote server are persistent and long running until either party (client or server) closes the connection.

This operator establishes either a plain (ws or http) or a TLS (wss or https) WebSocket connection, depending on the protocol scheme given in the url parameter.

This operator supports send-and-receive to and from a single WebSocket server, promoting a one-to-one data access pattern.

This operator provides one input port through which applications send data to the remote WebSocket server. The input stream schema must include an rstring strData attribute, a blob blobData attribute, or both.

Additionally, this operator provides two output ports. The first output port is mandatory and outputs data received from the remote server. Its stream schema must include an rstring strData attribute, a blob blobData attribute, or both. The stream can also carry further attributes assigned via the operator's output functions.

The second output port is optional and reports whether a given incoming tuple was successfully sent to the remote server. This output port must have int32 sendResultCode and rstring sendResultMessage as its output tuple attributes. A value of 0 in the sendResultCode attribute means the data was sent successfully; any other value indicates an error in sending the data. When this attribute carries a non-zero value, the application can use additional logic to retransmit that same data item at a later time. Please refer to the output port section below for more details.

Connection handling

A connection to the remote server is attempted when the operator invocation is initialized at runtime.

Once established, the operator keeps that connection open until the operator is shut down, the remote server is stopped, or a network error occurs.

If the remote WebSocket server becomes unavailable or closes the connection, the operator will try to re-establish the connection at a periodic, user-configurable interval.

It is recommended to program the application logic to back off and pause sending input tuples for a reasonable amount of time when a connection error occurs. Otherwise, every incoming tuple will trigger a reconnection attempt.
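
The back-off recommendation above can be sketched as follows. This is only an illustration under stated assumptions, not the toolkit's prescribed approach: the composite name, the Custom gate, its state variables, and the 30-second pause are all hypothetical, and the gate simply drops tuples during the pause instead of buffering them. The send result stream is fed back into the gate's second input port, which never submits tuples. Depending on the compiler, such a feedback connection may still produce a warning.

```spl
use com.teracloud.streams.websocket.op::WebSocketSendReceive;

composite BackoffSketch {
  type
    SendData_t = rstring strData;
    SendResult_t = int32 sendResultCode, rstring sendResultMessage;

  graph
    // Hypothetical data source producing one text item every 0.5 seconds.
    stream<SendData_t> TextData = Beacon() {
      param
        period: 0.5;
      output
        TextData: strData = "item " + (rstring)IterationCount();
    }

    // Hypothetical gate: after a failed send, drop incoming tuples
    // until the pause window has elapsed.
    stream<SendData_t> GatedData = Custom(TextData; SendResult) {
      logic
        state: {
          mutable float64 _resumeAt = 0.0;  // wall-clock time at which sending resumes
          float64 _pauseSeconds = 30.0;     // assumed back-off duration
        }
        onTuple TextData: {
          if (getTimestampInSecs() >= _resumeAt) {
            submit(TextData, GatedData);
          }
        }
        onTuple SendResult: {
          // A non-zero code means the send failed; start (or extend) the pause.
          if (SendResult.sendResultCode != 0) {
            _resumeAt = getTimestampInSecs() + _pauseSeconds;
          }
        }
    }

    (stream<rstring strData> ReceivedData;
     stream<SendResult_t> SendResult) = WebSocketSendReceive(GatedData) {
      param
        url: "ws://example.com";
        reconnectionInterval: 30.0;
    }
}
```

An application that cannot afford to lose data would replace the drop in the gate with its own buffering or replay logic.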

Example


use com.teracloud.streams.websocket.op::WebSocketSendReceive;
composite Main {
  param
    expression<uint32> $numberOfDataItemsToBeSent : 1000u;
  type
    SendData_t = rstring strData, blob blobData;

    ReceivedData_t =
      rstring strData,               // String based data (e.g., plain text, JSON, etc.) received from the remote server
      blob blobData,                 // Blob data received from the remote server
      uint64 totalDataItemsReceived, // Total data items received from the remote server
      uint64 totalDataBytesReceived, // Number of data bytes received from the remote server
      uint64 totalDataItemsSent,     // Total data items sent to the remote server
      uint64 totalDataBytesSent;     // Number of data bytes sent to the remote server

  graph
    stream<SendData_t> BinaryData as BD = Beacon() {
      param
        iterations: $numberOfDataItemsToBeSent;
        initDelay: 10.0;
      output
        BD: blobData = convertToBlob("This data item " + 
          (rstring)(IterationCount() + 1ul) + " is sent as a blob.");
    }
    
    stream<SendData_t> TextData as TD = Beacon() {
      param
        iterations: $numberOfDataItemsToBeSent;
        initDelay: 10.0;
      output
        TD: strData = "This data item " + 
          (rstring)(IterationCount() + 1ul) + " is sent as a text.";
    }
    
    // Send binary and text data to the remote WebSocket server and at the same time  
    // receive binary data or text data or both from the remote WebSocket server.
    stream<ReceivedData_t> ReceivedData = WebSocketSendReceive(BinaryData,TextData) {
      param
        url: "ws://example.com";
      
      output
          // strData and/or blobData attributes will be automatically
          // assigned with received data values within the operator logic.
          // Other attributes can be assigned manually as done below.
        ReceivedData: totalDataItemsReceived = getTotalDataItemsReceived(),
          totalDataBytesReceived = getTotalDataBytesReceived(),
          totalDataItemsSent = getTotalDataItemsSent(),
          totalDataBytesSent = getTotalDataBytesSent();
    }

    // See $STREAMS_INSTALL/samples/com.teracloud.streams.websocket for more examples
}

Summary

Ports
This operator has 1 input port and 2 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 14 parameters.

Required: url

Optional: certificateFileName, certificatePassword, metricsResetInterval, newDataCpuYieldTimeInSenderThread, noDataCpuYieldTimeInSenderThread, reconnectionInterval, tcpNoDelay, trustedServerCertificateFileName, trustedServerX509SubjectIdentifiers, websocketLiveMetricsUpdateNeeded, websocketLoggingNeeded, wsClientSessionLoggingNeeded, wsConnectionLoggingNeeded

Metrics
This operator reports 9 metrics.

Properties

Implementation
C++
Threading
Never - Operator never provides a single threaded execution context.

Input Ports

Ports (0)

This port brings the data into this operator for sending it to the remote WebSocket server.

Attributes on this input port:
  • strData (required, rstring) - Any text-based data (plain text, JSON or XML) can be sent to the remote WebSocket server via this attribute.
  • blobData (required, blob) - Any binary (blob) data can be sent to the remote WebSocket server via this attribute.
Properties

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Output Functions
WebSocketSendReceiveFunctions
<any T> T AsIs(T)

The default function for output attributes. This function assigns the value of the input attribute with the same name to the output attribute.

uint64 getTotalDataItemsReceived()

Returns a uint64 value indicating the total number of data items received so far from the remote server.

uint64 getTotalDataBytesReceived()

Returns a uint64 value indicating the total number of data bytes received so far from the remote server.

uint64 getTotalDataItemsSent()

Returns a uint64 value indicating the total number of data items sent so far to the remote server.

uint64 getTotalDataBytesSent()

Returns a uint64 value indicating the total number of data bytes sent so far to the remote server.

Ports (0)

This port produces output tuples that carry the text or binary data received from the remote WebSocket server.

The schema for this port must have an attribute named strData of type rstring to hold the received text data, an attribute named blobData of type blob to hold the received binary data, or both.

Additional output tuple attributes can be assigned via the available output functions. For example, two uint64 attributes can be set to the total number of tuples and bytes received from the remote server. See the available output functions for more info.

Properties

Ports (1)
This is an optional output port. If it is present, it returns whether a given input tuple consumed by this operator was successfully sent to the remote WebSocket server or not. The schema for this port must have two attributes:
  • int32 sendResultCode
  • rstring sendResultMessage

sendResultCode will carry a value of 0 on a successful send to the remote server and a non-zero value if an error occurred while sending the data. sendResultMessage will carry a descriptive message about the send result. Application logic can check these output tuple attributes and retransmit a given input tuple if it was not sent to the remote server due to an error (e.g., a broken connection).
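
As a minimal sketch of consuming this optional port, the following application counts and logs failed sends. The composite, stream, and operator instance names, the Beacon settings, and the failure counter are illustrative assumptions. Only the sendResultCode and sendResultMessage attributes and the url parameter come from this page.

```spl
use com.teracloud.streams.websocket.op::WebSocketSendReceive;

composite SendResultSketch {
  type
    SendResult_t = int32 sendResultCode, rstring sendResultMessage;

  graph
    // Hypothetical source of text items to send.
    stream<rstring strData> TextData = Beacon() {
      param
        iterations: 100u;
        period: 0.1;
      output
        TextData: strData = "message " + (rstring)IterationCount();
    }

    // First output port: received data. Second output port: send results.
    (stream<rstring strData> ReceivedData;
     stream<SendResult_t> SendResult) = WebSocketSendReceive(TextData) {
      param
        url: "ws://example.com";
    }

    // Log every send whose result code is non-zero.
    () as SendResultChecker = Custom(SendResult) {
      logic
        state: mutable uint64 _failedSends = 0ul;
        onTuple SendResult: {
          if (sendResultCode != 0) {
            _failedSends++;
            appTrc(Trace.error, "Send failed with code " +
              (rstring)sendResultCode + ": " + sendResultMessage +
              " (failures so far: " + (rstring)_failedSends + ")");
          }
        }
    }
}
```

The result tuple carries only the code and the message, so an application that wants to retransmit needs its own way to remember which data item was in flight.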

Properties

Parameters

certificateFileName

This parameter specifies the full path of the PEM file that holds the WebSocket client's private key and public certificate. (Default is an empty string.)

Properties

certificatePassword

This parameter specifies a password needed for decrypting the WebSocket client's private key in the PEM file. Default is an empty string.

Properties

metricsResetInterval

This parameter can be used to specify a non-zero periodic time interval in minutes after which all the metrics counters will be reset to 0. (Default is 0 which will never reset the metrics counters.)

Properties

newDataCpuYieldTimeInSenderThread

This parameter specifies the CPU yield time (in seconds, fractional values allowed) used inside the thread that is about to send a new data item to the remote server. It should be >= 0.0. (Default is 0.001, i.e., 1 millisecond.)

Properties

noDataCpuYieldTimeInSenderThread

This parameter specifies the CPU yield time (in seconds, fractional values allowed) used inside the thread that spin loops when no data is available to send to the remote server. It should be >= 0.0. (Default is 0.001, i.e., 1 millisecond.)

Properties

reconnectionInterval

This parameter specifies the periodic time interval (in seconds, fractional values allowed) at which reconnection to the remote WebSocket server will be attempted. It should be > 0.0. (Default is 60.0 seconds.)

Properties

tcpNoDelay

This parameter controls Nagle's algorithm on the TCP connection. Setting it to true disables Nagle's algorithm and setting it to false enables it. (Default is false.)
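
To illustrate how the tuning parameters described so far fit together, here is a hedged sketch of an invocation aimed at low send latency. The chosen values, the composite and stream names, and the assumption that the yield and interval parameters take float64 literals (suggested by their documented defaults) are illustrative only, not recommended settings.

```spl
use com.teracloud.streams.websocket.op::WebSocketSendReceive;

composite LowLatencySketch {
  graph
    // Hypothetical source sending ten small text items, one per second.
    stream<rstring strData> Requests = Beacon() {
      param
        iterations: 10u;
        period: 1.0;
      output
        Requests: strData = "ping " + (rstring)IterationCount();
    }

    stream<rstring strData> Replies = WebSocketSendReceive(Requests) {
      param
        url: "ws://example.com";
        tcpNoDelay: true;                         // disable Nagle's algorithm
        newDataCpuYieldTimeInSenderThread: 0.0;   // do not yield before sending
        noDataCpuYieldTimeInSenderThread: 0.005;  // yield 5 ms when idle
        reconnectionInterval: 15.0;               // retry every 15 seconds
    }

    // Print each received text item.
    () as ReplySink = Custom(Replies) {
      logic
        onTuple Replies: {
          printStringLn(strData);
        }
    }
}
```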

Properties

trustedServerCertificateFileName

This parameter specifies the full path of the PEM file that contains the public certificate of the trusted remote server. This enables server authentication. If this parameter is not used or is empty, no server authentication is performed. (Default is an empty string.)

Properties

trustedServerX509SubjectIdentifiers

This parameter specifies a list of verifiable identifiers present in the subject field of the trusted server's public certificate. It helps in authenticating servers that use otherwise unsupported certificate types, such as self-signed certificates. Some examples of such identifiers: ["ST=New York","L=Armonk","O=IBM","CN=www.ibm.com","emailAddress=websocket.streams@ibm.com"] (Default is an empty list.)
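
The certificate-related parameters can be combined for a TLS connection roughly as sketched below. All file paths, the password, the subject identifiers, and the composite and stream names are placeholders. The sketch also assumes that the file name and password parameters accept rstring values and that the identifier list is a list of rstring values, as their documented defaults and examples suggest.

```spl
use com.teracloud.streams.websocket.op::WebSocketSendReceive;

composite TlsClientSketch {
  graph
    // Hypothetical source sending a single text item.
    stream<rstring strData> Outgoing = Beacon() {
      param
        iterations: 1u;
      output
        Outgoing: strData = "hello over TLS";
    }

    stream<rstring strData> Incoming = WebSocketSendReceive(Outgoing) {
      param
        // The wss scheme selects a TLS connection.
        url: "wss://example.com";
        // Placeholder PEM file with the client's private key and certificate.
        certificateFileName: "/path/to/client-key-and-cert.pem";
        // Placeholder password for decrypting the client's private key.
        certificatePassword: "changeit";
        // Placeholder PEM file with the trusted server's public certificate.
        trustedServerCertificateFileName: "/path/to/trusted-server-cert.pem";
        // Placeholder subject identifiers to verify in the server certificate.
        trustedServerX509SubjectIdentifiers: ["CN=example.com", "O=Example Org"];
    }
}
```

Leaving trustedServerCertificateFileName unset or empty means the server is not authenticated, as described for that parameter.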

Properties

url

This parameter specifies the URL of the remote WebSocket server.

Properties

websocketLiveMetricsUpdateNeeded

This parameter specifies whether live update for this operator's custom metrics is needed. (Default is true)

Properties

websocketLoggingNeeded

This parameter specifies whether logging is needed from the WebSocket library. (Default is false)

Properties

wsClientSessionLoggingNeeded

This parameter specifies whether logging is needed when the client session send/receive is in progress with the remote server. (Default is false)

Properties

wsConnectionLoggingNeeded

This parameter specifies whether logging is needed when this operator connects to and disconnects from the remote server. (Default is false)

Properties

Code Templates

WebSocketSendReceive

stream<${outputSchema}> ${outputStream} as O = com.teracloud.streams.websocket.op::WebSocketSendReceive(${inputStream}) {
  param
    url: ${url};
    ${parameter}: ${parameterExpression};
  output O: ${attributeAssignment};
}
      

WebSocketSendReceive with send result stream

(stream<${outputSchema}> ${outputStream} as O;
 stream<int32 sendResultCode, rstring sendResultMessage> ${resultStream})
 = com.teracloud.streams.websocket.op::WebSocketSendReceive(${inputStream}) {
  param
    url: ${url};
    ${parameter}: ${parameterExpression};
  output O: ${attributeAssignment};
}
      

Metrics

nDataBytesReceivedFromRemoteServer - Gauge

Total number of data bytes received from the remote server thus far.

nDataBytesSentToRemoteServer - Gauge

Total number of data bytes sent to the remote server thus far.

nDataItemsReceivedFromRemoteServer - Gauge

Number of data items received from the remote server thus far.

nDataItemsSentToRemoteServer - Gauge

Number of data items sent to the remote server thus far.

nSizeOfMostRecentDataItemReceived - Gauge

Size of the most recent data item received from the remote server.

nSizeOfMostRecentDataItemSent - Gauge

Size of the most recent data item sent to the remote server.

nTcpNoDelay - Gauge

Did the user configure TCP_NODELAY for this operator?

nTimeTakenToSendMostRecentDataItem - Gauge

Time taken, in milliseconds, to send the most recent data item to the remote server.

nWebsocketConnectionActiveStatus - Gauge

Indicates the current active status of the WebSocket connection.

Libraries

Boost Library
Library Name: boost_system, boost_chrono, boost_random, boost_thread
Library Path: ../../lib
Include Path: ../../include
Websocketpp Library
Include Path: ../../include