Operator WebSocketSink

Primitive operator image not displayed. Problem loading file: ../../image/tk$com.teracloud.streams.websocket/op$com.teracloud.streams.websocket.op$WebSocketSink.svg

The server-based WebSocketSink operator is designed to send data to any connected WebSocket clients (inside or outside of Streams).

This operator can be used to send text (plain text, JSON or XML) and/or binary data. While WebSocket connections are bidirectional (full duplex) in nature, this sink operator focuses only on sending data to the connected clients and will ignore any data received from them. This approach promotes a one-to-many data access pattern, and the operator will take as much time as it needs to send a piece of data to all connected clients.

This operator offers two input ports. The first input port is required and is used to receive data that will be sent to WebSocket clients. The schema for the input stream(s) must have:
  • either rstring strData or blob blobData or both tuple attributes
  • a list<rstring> sendToUrlContextPaths attribute
The input attribute sendToUrlContextPaths can be used to send a given data item to one or more remote clients that accessed a particular URL context path by including such context paths as elements in the list. If this attribute has an empty list, then a given data item will be sent to all the connected clients irrespective of the URL context paths they accessed.

The second input port is optional and is used to receive a new client allowlist which restricts client connections only to the specified IP addresses. This input port must have only one attribute named clientWhitelist that is of type list<rstring>. This value can be an empty list (for accepting connections from any client with no restrictions) or a list with one or more remote client IP addresses.

Connection handling

This operator supports both plain (ws or http) or TLS (wss or https) WebSocket client connections though plain connections are now enabled by default (see nonTlsEndpointNeeded). Additionally, the operator can be configured to allow a maximum number of concurrent client connections.

By default, any client is allowed to connect to this operator to receive data. If only a specific set of clients should be allowed to connect to this operator, the clientWhitelist parameter be used to provide a list<rstring> value with one or more client IP addresses. An empty list means connections will be accepted from any client with no restrictions. A new allowlist can also be injected into this operator via the optional second input port while the application is running. On receiving a new allowlist, this operator will start using it immediately and terminates any existing connections that are no longer specified in the allowlist. IP addresses can be exact or contain wildcards as shown below.


clientWhitelist: ["172.34.18.212", "10.5.23.17", "172.*.42.*", "10.29.42.*"];

Examples


use com.teracloud.streams.websocket.op::WebSocketSink;
composite Main {
  param
    expression<uint32> $numberOfDataItemsToBeSent : 1000u;
      
  type
    SendData_t = rstring strData, blob blobData, list<rstring> sendToUrlContextPaths; 
    
    ClientWhitelist_t = list<rstring> clientWhitelist;

  graph
    stream<SendData_t> BinaryData = Beacon() {
      param
        iterations: $numberOfDataItemsToBeSent;
        initDelay: 70.0; // Allow time for clients to connect
      
      output
        BinaryData: blobData = convertToBlob("This data item " + 
          (rstring)(IterationCount() + 1ul) + " is sent as a blob."),
          sendToUrlContextPaths = (list<rstring>)[];
    }
    
    stream<SendData_t> TextData = Beacon() {
      param
        iterations: $numberOfDataItemsToBeSent;
        initDelay: 70.0; // Allow time for clients to connect
      
      output
        TextData: strData = "This data item " + 
          (rstring)(IterationCount() + 1ul) + " is sent as a text.",
          sendToUrlContextPaths = (list<rstring>)[];
    }

    // Example1: Send text and binary data to restricted client list; non-TLS and TLS enabled
    () as MySink1 = WebSocketSink(BinaryData, TextData) {
      param
        tlsPort: 8443u;
        nonTlsEndpointNeeded: true;
        nonTlsPort: 8080u;
        clientWhitelist: ["192.168.1.*"];
    }

    stream<ClientWhitelist_t> ClientWhitelist = FileSource() {
      param
        file: "client-whitelist.txt";
        hotFile: true;
        initDelay: 15.0;
    }

    // Example2: Send text and binary data to client list (up to 10 allowed)
    // The server offers a "/Orders" and "/Inventory" context path for clients to
    // connect to receive data related to those specific topics.
    // The allowed client source IPs are controlled via 2nd input port
    () as MySink2 = WebSocketSink(BinaryData, TextData; ClientWhitelist) {
      param
        tlsPort: 8444u;
        urlContextPath: ["Orders", "Inventory"];
        maxClientConnectionsAllowed: 10u;
    }

    // See $STREAMS_INSTALL/samples/com.teracloud.streams.websocket for more examples
}

Summary

Ports
This operator has 2 input ports and 0 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 21 parameters.

Optional: certificateFileName, certificatePassword, clientWhitelist, ipv6Available, maxClientConnectionsAllowed, metricsResetInterval, newDataCpuYieldTimeInSenderThread, noDataCpuYieldTimeInSenderThread, nonTlsEndpointNeeded, nonTlsPort, tcpNoDelay, tlsCipherWhitelist, tlsPort, trustedClientCertificateFileName, trustedClientX509SubjectIdentifiers, urlContextPath, websocketLiveMetricsUpdateNeeded, websocketLoggingNeeded, websocketStaleConnectionPurgeInterval, wsClientSessionLoggingNeeded, wsConnectionLoggingNeeded

Metrics
This operator reports 11 metrics.

Properties

Implementation
C++
Threading
Never - Operator never provides a single threaded execution context.

Input Ports

Ports (0)

This port brings the data into this operator for sending it to the remote WebSocket clients.

Attributes on this input port:
  • strData (required, rstring) - Any text based data (plain text, JSON or XML) can be sent to the remote WebSocket clients via this attribute.
  • blobData (required, blob) - Any binary (blob) data can be sent to the remote WebSocket clients via this attribute.
  • sendToUrlContextPaths (required, list<rstring>) - This input attribute can be used to send a given data item to one or more remote clients that accessed a particular URL context path by including such context paths as elements in the list. If this attribute has an empty list, then a given data item will be sent to all the connected clients irrespective of the URL context paths they accessed.

All the extra input attributes will be ignored.

Properties

Ports (1)

This port brings the new client whitelist..

Attributes on this input port:
  • clientWhitelist (required, list<rstring>) - An empty list or a list of client IP addresses that are allowed to connect to this operator.

All the extra input attributes will be ignored.

Properties

Parameters

This operator supports 21 parameters.

Optional: certificateFileName, certificatePassword, clientWhitelist, ipv6Available, maxClientConnectionsAllowed, metricsResetInterval, newDataCpuYieldTimeInSenderThread, noDataCpuYieldTimeInSenderThread, nonTlsEndpointNeeded, nonTlsPort, tcpNoDelay, tlsCipherWhitelist, tlsPort, trustedClientCertificateFileName, trustedClientX509SubjectIdentifiers, urlContextPath, websocketLiveMetricsUpdateNeeded, websocketLoggingNeeded, websocketStaleConnectionPurgeInterval, wsClientSessionLoggingNeeded, wsConnectionLoggingNeeded

certificateFileName

This parameter specifies the full path of the WebSocket server PEM certificate file name. Default is "<applicationDirectory>/etc/ws-server.pem".

Properties

certificatePassword

This parameter specifies a password needed for decrypting the WebSocket server's private key in the PEM file. Default is an empty string.

Properties

clientWhitelist

This parameter specifies a list of client IP addresses to accept connections only from those clients. (Default is an empty list to have no client connection restrictions.)

Properties

ipv6Available

This parameter indicates whether the IPv6 protocol stack is available in the Linux machine where the WebSocketSink operator is running. (Default is true)

Properties

maxClientConnectionsAllowed

This parameter specifies the maximum number of concurrent clients allowed to connect with this operator. After this limit is reached, new client connections will be denied until any existing clients close their connections. (Default is 32.)

Properties

metricsResetInterval

This parameter can be used to specify a non-zero periodic time interval in minutes after which all the metrics counters will be reset to 0. (Default is 0 which will never reset the metrics counters.)

Properties

newDataCpuYieldTimeInSenderThread

This parameter specifies the CPU yield time (in partial seconds) needed inside the thread that is just about to send a new data item to the remote clients. It should be >= 0.0 (Default is 0.001 i.e. 1 millisecond)

Properties

noDataCpuYieldTimeInSenderThread

This parameter specifies the CPU yield time (in partial seconds) needed inside the thread that spin loops when no data is available to send to the remote clients. It should be >= 0.0 (Default is 0.001 i.e. 1 millisecond)

Properties

nonTlsEndpointNeeded

This parameter specifies whether a WebSocket endpoint is needed. (Default is false)

Properties

nonTlsPort

This parameter specifies the non-TLS (plain) WebSocket port number. Default port number is 80.

Properties

tcpNoDelay

This parameter can be used to control the TCP Nagle's algorithm. Setting it to true will disable Nagle's algorithm and setting it to false will enable. (Default is false.)

Properties

tlsCipherWhitelist

This parameter can be used to specify a string containing one or more comma separated TLS/SSL ciphers that should be used during TLS/SSL connection negotiations with clients. It is handy when there is a need to avoid using ciphers that are found to have security vulnerabilities. (Default is an empty string.)

Properties

tlsPort

This parameter specifies the WebSocket TLS port number. Default port number is 443.

Properties

trustedClientCertificateFileName

This parameter specifies the full path of the PEM file name that can contain the public certificates of all the trusted clients. This allows for the client (mutual) authentication. If this parameter is not used or empty, then there will be no client authentication. (Default is an empty string.)

Properties

trustedClientX509SubjectIdentifiers

This parameter specifies a list of verifiable identifiers present in the subject field of the trusted client's public certificate. It is helpful in performing the client (mutual) authentication using the unsupported certificate types such as the self-signed ones. Some examples of such identifiers: ["ST=New York","L=Armonk","O=IBM","CN=www.ibm.com","emailAddress=websocket.streams@ibm.com"] (Default is an empty list.)

Properties

urlContextPath

This parameter specifies a list with zero or more URL context path(s) for a given WebSocket server endpoint. Users can come up with any application-specific value(s) made of either a single or a multi-part path (e.g., "Orders", "MyServices/Banking/Deposit"). With that example, WebSocket server URL should either be https://host:port/Orders or https://host:port/MyServices/Banking/Deposit. Default is an empty list to indicate no URL context path.

Properties

websocketLiveMetricsUpdateNeeded

This parameter specifies whether live update for this operator's custom metrics is needed. (Default is true)

Properties

websocketLoggingNeeded

This parameter specifies whether logging is needed from the WebSocket library. (Default is false)

Properties

websocketStaleConnectionPurgeInterval

This parameter specifies periodic time interval in seconds during which any stale remote client connection should be purged to free up memory usage. (Default is 36060 seconds)

Properties

wsClientSessionLoggingNeeded

This parameter specifies whether logging is needed when the remote client session is in progress with this operator. (Default is false)

Properties

wsConnectionLoggingNeeded

This parameter specifies whether logging is needed when the remote clients connect and disconnect. (Default is false)

Properties

Code Templates

WebSocketSink

() as ${opInvocationName} = com.teracloud.streams.websocket.op::WebSocketSink(${inputStream}) {
  param
    ${parameter}: ${parameterExpression};
}
      

WebSocketSink with allowlist control port

() as ${opInvocationName} = com.teracloud.streams.websocket.op::WebSocketSink(${inputDataStream};${inputControlStream}) {
  param
    ${parameter}: ${parameterExpression};
}
      

Metrics

nBlobDataItemsWaitingToBeSentToRemoteClients - Gauge

Total number of blob data items waiting to be sent to the remote clients.

nClientsConnected - Gauge

Number of remote clients currently connected to this operator invocation.

nDataBytesSentToRemoteClients - Gauge

Total number of data bytes sent to the remote clients thus far.

nDataItemsSentToRemoteClients - Gauge

Number of data items sent to the remote clients thus far.

nNonTlsPort - Gauge

Non-TLS port number configured for this operator.

nNonTlsPortNeeded - Gauge

Did the user configure to exchange data via a non-TLS port?

nSizeOfMostRecentDataItemSent - Gauge

Size of the most recent data item sent to a remote client.

nStringDataItemsWaitingToBeSentToRemoteClients - Gauge

Total number of string data items waiting to be sent to the remote clients.

nTcpNoDelay - Gauge

Did the user configure TCP_NODELAY for this operator?

nTimeTakenToSendMostRecentDataItem - Gauge

Time taken in milliseconds to send the most recent data item to a remote client.

nTlsPort - Gauge

TLS port number configured for this operator.

Libraries

Boost Library
Library Name: boost_system, boost_chrono, boost_random, boost_thread
Library Path: ../../lib
Include Path: ../../include
Websocketpp Library
Include Path: ../../include