Operator WebSocketSource

Primitive operator image not displayed. Problem loading file: ../../image/tk$com.teracloud.streams.websocket/op$com.teracloud.streams.websocket.op$WebSocketSource.svg

The server-based WebSocketSource operator is designed to receive text (plain text, JSON or XML) or binary data from the remote WebSocket and HTTP clients. This operator can also accept HTTP GET, HTTP PUT and HTTP POST requests from the remote HTTP clients. One or more remote clients can connect to this operator either via plain WebSocket/HTTP (ws or http) or via TLS WebSocket/HTTP (wss or https). For a TLS WebSocket/HTTP, a server side certificate must be generated first and the text file available in the etc directory of this toolkit provides useful details on that topic. For a HTTP(S) GET/PUT/POST client to send text based data, it can have its Content-Type header as text/plain, application/xml or application/json. To send binary based data, a HTTP(S) PUT/POST client must have its Content-Type header as application/octet-stream and it can use a standard mechanism available in many web frameworks for including multipart MIME content. WebSocket connections established with this operator are always persistent and long running. HTTP connections established with this operator can also be made persistent (Keep-Alive) by configuring it to do so. Persistent (Keep-Alive) HTTP connections will be useful in improving throughput for non-browser based client applications with a need to send data items iteratively in a tight loop.

This operator can also be optionally configured to send an ack back to the remote WebSocket clients after receiving a certain number of data items (tuples) from a given client. This feature can be used by a remote WebSocket client to wait for an ack after sending a certain number of data items. A missing ack can indicate the remote client about a server side network error or an operator crash/error and make that remote WebSocket client to retransmit the previously sent, but unacknowledged data items (tuples).

This operator can be configured with a WebSocket server port number which is optional. If the user of this operator doesn't specify a WebSocket server port number, then a default port number of 443 will be used.

This operator also provides an option for the client applications to do the HTTP/HTTPS GET/PUT/POST for sending text based data (plain text, JSON or XML). If this operator's allowHttpPost parameter is set to true, client applications can connect to this operator's HTTP/HTTPS URL endpoint and send their data via HTTP/HTTPS GET, PUT and POST. This feature will be handy for the traditional HTTP(S) based web applications that are not suitable for quick modernization to use WebSocket instead of HTTP(S).

Even though it is labeled as a source operator, it can be configured to perform like an analytic operator to receive data from a client, process that data and send result/response back to that client. In addition, this operator can be configured to allow a certain maximum number of concurrent connections from the remote clients depending on the needs of the application and based on the capacity of the underlying OS and hardware configuration such as networking, CPU cores and memory limits.

In summary, this operator supports data reception via both WebSocket and HTTP GET/PUT/POST on plain as well as secure endpoints. Users will get a five-in-one benefit (WebSocket, HTTP, plain, secure and roundtripping) from this operator. This is Receive-only from multiple clients with an option to make it a Receive-Process-Send operator. This operator promotes Many-To-One data access pattern.

By default, any remote client that knows the WebSocket URL of this operator can connect to this operator for sending and receiving data. If only a specific set of remote clients should be allowed to connect to this operator, an optional operator parameter named clientWhitelist can be used to provide a list<rstring> with one or more client IP addresses. An empty list means connections will be accepted from any client with no restrictions. A new whitelist can also be injected into this operator via an optional input port in the middle of an application run. On receiving such a new whitelist, this operator will start using it immediately and it will terminate its existing connections with those remote clients that are no longer specified in the whitelist. IP addresses can be exact or wildcard based as shown below.


clientWhitelist: \["172.34.18.212", "10.5.23.17", "172.*.42.*", "10.29.42.*"\];

This operator provides two output ports. On the first output port, it keeps sending the data received from the remote clients. On the second output port, it keeps sending the "End of Client Session" signal whenever a remote WebSocket client closes its connection. Please refer to the output port section below to learn more.

This operator provides two optional input ports. The presence of these two input ports are purely based on the needs of the application. They can be completely absent or only one of them can be present or both of them can be present. One of them is used to send response to a remote client and the other is used to dynamically change the client whitelist as explained in a previous paragraph. Please refer to the input port section below to learn more.

See the samples folder inside this toolkit for a working example that shows how to use this operator.

Summary

Ports
This operator has 2 input ports and 2 output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 26 parameters.

Optional: allowHttpPost, allowPersistentHttpConnections, certificateFileName, certificatePassword, clientWhitelist, initDelay, ipv6Available, maxClientConnectionsAllowed, metricsResetInterval, newDataCpuYieldTimeInSenderThread, noDataCpuYieldTimeInSenderThread, nonTlsEndpointNeeded, nonTlsPort, numberOfMessagesToReceiveBeforeAnAck, responseTimeout, tcpNoDelay, tlsCipherWhitelist, tlsPort, trustedClientCertificateFileName, trustedClientX509SubjectIdentifiers, urlContextPath, websocketLiveMetricsUpdateNeeded, websocketLoggingNeeded, websocketStaleConnectionPurgeInterval, wsClientSessionLoggingNeeded, wsConnectionLoggingNeeded

Metrics
This operator reports 15 metrics.

Properties

Implementation
C++
Threading
Never - Operator never provides a single threaded execution context.

Input Ports

Ports (0)

Even though this operator is labeled as a source, it can optionally be configured to function like an analytic operator. In this changed role, it will accept an input port to receive tuples and send them to the remote clients. In addition, this operator can also have another optional port to receive a client whitelist and dynamically update it inside this operator for allowing or rejecting connections from remote clients. To meet that requirement, this operator provides two optional input ports. The presence of these two input ports are purely based on the needs of the application. They can be completely absent or only one of them can be present or both of them can be present. One of them is used to send response to a remote client and the other is used to dynamically change the client whitelist. That is why it is not possible to impose a particular ordering between them. These input ports can be used in any order and the internal operator logic knows how to identify which is which.

This is one of those two input ports. It is used for an application to send data to a remote WebSocket client as a response to the data it received earlier from that client and finished processing it. This input port must have either rstring strData or blob blobData or both of them and uint64 clientSessionId as the tuple attribute(s). By using the client session id, this operator will correlate to find the correct remote client to where a given response should be sent. If HTTP based remote clients are sending data to this operator, application logic can send its own custom HTTP response headers back to those HTTP clients. In order to do that, this input port can have an optional attribute map<rstring, rstring> httpResponseHeaders which the application logic can populate with its custom HTTP headers. When this input port is present, it is required for the application logic to send a (round-trip) response back to the client from where it received a data item. Such a response should be sent as quick as possible before the configured response timeout value. If not, a timeout handler thread running inside this operator will detect that missed reponse and do the necessary clean-up work for the remote client connection that is waiting for a response.

Attributes on this input port:
  • strData (required, rstring) - Any text based data (plain text, JSON or XML) can be sent to the remote WebSocket clients via this attribute.
  • blobData (required, blob) - Any binary (blob) data can be sent to the remote WebSocket clients via this attribute.
  • clientSessionId (required, uint64) - Client session id to where a given response should be sent.
  • httpResponseHeaders (optional, map<rstring, rstring>) - This map attribute can be present if the application logic has to send custom HTTP response headers to a remote HTTP client.

All the extra input attributes will be ignored.

Properties

Ports (1)

This is the other optional input port. It is used to send a new client whitelist into this operator to accept client connections only from the specified IP addresses. This input port must have only one attribute named clientWhitelist that is of type list<rstring>. This can be an empty list for accepting connections from any client with no restrictions or a list with one or more remote client IP addresses that are allowed to connect with this operator to send and receive data.

Attributes on this input port:
  • clientWhitelist (required, list<rstring>) - An empty list or a list of client IP addresses that are allowed to connect to this operator.

All the extra input attributes will be ignored.

As mentioned in the description for the other input port, ordering of these two ports can be at the discretion of the application developers.

Properties

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Output Functions
WebSocketSourceFunctions
<any T> T AsIs(T)

The default function for output attributes. This function assigns the output attribute to the value of the input attribute with the same name.

uint64 getClientSessionId()

Returns an uint64 value indicating the client session id that corresponds to the current output tuple.

uint64 getTotalDataItemsReceived()

Returns an uint64 value indicating the total number of data items received so far from a remote client.

uint64 getTotalDataBytesReceived()

Returns an uint64 value indicating the total number of data bytes received so far from a remote client.

uint64 getTotalDataItemsSent()

Returns an uint64 value indicating the total number of data items sent so far to a remote client.

uint64 getTotalDataBytesSent()

Returns an uint64 value indicating the total number of data bytes sent so far to a remote client.

rstring getClientIpAddress()

Returns a string indicating the IP address of a remote client.

rstring getClientPort()

Returns a string indicating the port of a remote client.

rstring getUrlContextPath()

Returns a string indicating the context path present in the URL being accessed by the client.

boolean isWebSocketClient()

Returns a boolean indicating whether a remote client holds a WebSocket connection.

boolean isHttpClient()

Returns a boolean indicating whether a remote client holds a Http connection.

map<rstring, rstring> getHttpRequestHeaders()

Returns an SPL map holding the HTTP headers that were part of a client's HTTP GET/PUT/POST request.

rstring getHttpRequestMethodName()

Returns a string indicating the method name (GET, PUT or POST) found in the client's HTTP request.

map<rstring, rstring> getUrlQueryStringKeyValuePairs()

Returns an SPL map holding the key/value pairs found in the URL query string of a client's HTTP GET request.

rstring getFullUrlBeingAccessedByTheClient()

Returns a string indicating the full URL being accessed by the remote client.

rstring getFileNameInUrlForHttpGet()

Returns a string indicating the file name in the URL for HTTP GET based requests.

Ports (0)

This port produces the output tuples that carry the text or binary data received from the remote WebSocket or HTTP GET/PUT/POST client. The schema for this port must either have an attribute named strData with an rstring data type to hold the received text data or an attribute named blobData with a blob data type to hold the received binary data or both. Remaining attributes can be of any type based on the needs of the application. This operator is capable of sending out data received from multiple WebSocket and HTTP GET/PUT/POST clients that can all communicate with this operator at the very same time. Please refer to the custom output functions provided by this operator to query the client session id and other client session specific metrics and assign such values to other optional attributes in this output port. Depending on the needs of a given application, this operator can perform more beyond being just a source operator. If applications are interested in receiving data from a remote client, process and then send a result a.k.a response back to that client, then this output port must have an additional attribute uint64 clientSessionId. As mentioned earlier, the value of this attribute can be queried by using a custom output function. When using this operator to send a result back to a client, there must be an input port present in this operator that has its own attribute uint64 clientSessionId. Application developers can arrange their application logic to receive data from a client, process it and then send a result along with the client session id for that client. This attribute present in both the input and output ports will let this operator to do the correlation of the result/response with the correct client before sending it to that client.

There are multiple available output functions, and output attributes can also be assigned values with any SPL expression that evaluates to the proper type.

Properties

Ports (1)

This port produces periodic output tuples to give an indication about the end of a specific WebSocket client session that was in progress moments ago for the given client session id. The schema for this port must have one attribute with its correct data type as shown here. uint64 clientSessionId This source operator will set the appropriate value for this attribute to indicate which particular WebSocket client (i.e. session id) just ended the data exchange. Downstream operators can make use of this "End Of Client Session" signal as they see fit.

Properties

Parameters

This operator supports 26 parameters.

Optional: allowHttpPost, allowPersistentHttpConnections, certificateFileName, certificatePassword, clientWhitelist, initDelay, ipv6Available, maxClientConnectionsAllowed, metricsResetInterval, newDataCpuYieldTimeInSenderThread, noDataCpuYieldTimeInSenderThread, nonTlsEndpointNeeded, nonTlsPort, numberOfMessagesToReceiveBeforeAnAck, responseTimeout, tcpNoDelay, tlsCipherWhitelist, tlsPort, trustedClientCertificateFileName, trustedClientX509SubjectIdentifiers, urlContextPath, websocketLiveMetricsUpdateNeeded, websocketLoggingNeeded, websocketStaleConnectionPurgeInterval, wsClientSessionLoggingNeeded, wsConnectionLoggingNeeded

allowHttpPost

This parameter indicates whether the client applications are allowed to send data via HTTP(S) GET/PUT/POST. (Default is false)

Properties

allowPersistentHttpConnections

This parameter indicates whether this operator will allow the client applications to make persistent (Keep-Alive) HTTP connections. It is better to allow this only for non-browser based client applications. (Default is false)

Properties

certificateFileName

This parameter specifies the full path of the WebSocket server's private key and public certificate holding PEM file name. Default is to read ws-server.pem from the etc sub-directory of the application that is invoking this operator.

Properties

certificatePassword

This parameter specifies a password needed for decrypting the WebSocket server's private key in the PEM file. Default is an empty string.

Properties

clientWhitelist

This parameter specifies a list of client IP addresses to accept connections only from those clients. (Default is an empty list to have no client connection restrictions.)

Properties

initDelay

This parameter specifies a one time delay in seconds for which this source operator should wait before start generating its first tuple. Default delay is 0.0.

Properties

ipv6Available

This parameter indicates whether the ipv6 protocol stack is available in the Linux machine where the WebSocketSource operator is running. (Default is true)

Properties

maxClientConnectionsAllowed

This parameter specifies the maximum number of concurrent clients allowed to connect with this operator. After this limit is reached, new client connections will be denied until any existing clients close their connections. (Default is 32.)

Properties

metricsResetInterval

This parameter can be used to specify a non-zero periodic time interval in minutes after which all the metrics counters will be reset to 0. (Default is 0 which will never reset the metrics counters.)

Properties

newDataCpuYieldTimeInSenderThread

This parameter specifies the CPU yield time (in partial seconds) needed inside the thread that is just about to send a new data item to the remote clients. It should be >= 0.0 (Default is 0.001 i.e. 1 millisecond)

Properties

noDataCpuYieldTimeInSenderThread

This parameter specifies the CPU yield time (in partial seconds) needed inside the thread that spin loops when no data is available to send to the remote clients. It should be >= 0.0 (Default is 0.001 i.e. 1 millisecond)

Properties

nonTlsEndpointNeeded

This parameter specifies whether a WebSocket (plain) non-TLS endpoint is needed. (Default is false)

Properties

nonTlsPort

This parameter specifies the WebSocket (plain) non-TLS port number. Default port number is 80.

Properties

numberOfMessagesToReceiveBeforeAnAck

This parameter indicates how many messages are to be received before sending an ack to the remote client. (Default is 0 i.e. no ack sent)

Properties

responseTimeout

This parameter specifies the time in seconds before which the application logic should send its pending response to a remote client. If this time expires, a timeout handler thread in this operator will do the necessary internal clean-up work. (Default is 20 seconds.)

Properties

tcpNoDelay

This parameter can be used to control the TCP Nagle's algorithm. Setting it to true will disable Nagle's algorithm and setting it to false will enable. (Default is false.)

Properties

tlsCipherWhitelist

This parameter can be used to specify a string containing one or more comma separated TLS/SSL ciphers that should be used during TLS/SSL connection negotiations with clients. It is handy when there is a need to avoid using ciphers that are found to have security vulnerabilities. (Default is an empty string.)

Properties

tlsPort

This parameter specifies the WebSocket TLS port number. Default port number is 443.

Properties

trustedClientCertificateFileName

This parameter specifies the full path of the PEM file name that can contain the public certificates of all the trusted clients. This allows for the client (mutual) authentication. If this parameter is not used or empty, then there will be no client authentication. (Default is an empty string.)

Properties

trustedClientX509SubjectIdentifiers

This parameter specifies a list of verifiable identifiers present in the subject field of the trusted client's public certificate. It is helpful in performing the client (mutual) authentication using the unsupported certificate types such as the self-signed ones. Some examples of such identifiers: ["ST=New York","L=Armonk","O=IBM","CN=www.ibm.com","emailAddress=websocket.streams@ibm.com"] (Default is an empty list.)

Properties

urlContextPath

This parameter specifies a list with zero or more URL context path(s) for a given WebSocket server endpoint. Users can come up with any application-specific value(s) made of either a single or a multi-part path [e-g: Orders (OR) MyServices/Banking/Deposit]. With that example, WebSocket server URL should either be https://host:port/Orders (OR) https://host:port/MyServices/Banking/Deposit. Default is an empty list to indicate no url context path.

Properties

websocketLiveMetricsUpdateNeeded

This parameter specifies whether live update for this operator's custom metrics is needed. (Default is true)

Properties

websocketLoggingNeeded

This parameter specifies whether logging is needed from the WebSocket library. (Default is false)

Properties

websocketStaleConnectionPurgeInterval

This parameter specifies periodic time interval in seconds during which any stale remote client connection should be purged to free up memory usage. (Default is 0 i.e. no purging)

Properties

wsClientSessionLoggingNeeded

This parameter specifies whether logging is needed when the remote client session is in progress with this operator. (Default is false)

Properties

wsConnectionLoggingNeeded

This parameter specifies whether logging is needed when the remote clients connect and disconnect. (Default is false)

Properties

Metrics

nClientsConnected - Gauge

Number of remote clients currently connected to this operator instance.

nDataBytesReceivedFromRemoteClients - Gauge

Total number of data bytes received by this operator instance.

nDataBytesSentToRemoteClients - Gauge

Total number of data bytes sent to the remote clients thus far.

nDataItemsReceivedFromRemoteClients - Gauge

Number of data items received from the remote clients thus far.

nDataItemsSentToRemoteClients - Gauge

Number of data items sent to the remote clients thus far.

nHttpMessagesReceivedFromRemoteClients - Gauge

Number of HTTP GET, PUT and POST messages received by this operator instance.

nHttpPostAllowed - Gauge

Does this operator allow HTTP GET/PUT/POST from clients?

nNonTlsPort - Gauge

Non-TLS port number configured for this operator.

nNonTlsPortNeeded - Gauge

Did the user configure to exchange data via a non-TLS port?

nResponseTimeoutOccurrences - Gauge

Total number of response timeout occurrences thus far.

nSizeOfMostRecentDataItemReceived - Gauge

Size of the most recent data item received from the remote server.

nSizeOfMostRecentDataItemSent - Gauge

Size of the most recent data item sent to the remote server.

nTcpNoDelay - Gauge

Did the user configure TCP_NODELAY for this operator?

nTimeTakenToSendMostRecentDataItem - Gauge

Time taken in milliseconds to send the most recent data item to the remote server.

nTlsPort - Gauge

TLS port number configured for this operator.

Libraries

Boost Library
Library Name: boost_system, boost_chrono, boost_random, boost_thread
Library Path: ../../lib
Include Path: ../../include
Websocketpp Library
Include Path: ../../include