Operator WebSocketSource
The server-based WebSocketSource operator is designed to receive text (plain text, JSON or XML) or binary data from remote WebSocket and HTTP clients. It supports the following:
- Plain (ws or http) and secure TLS (wss or https) WebSocket and HTTP connections
- Client allowlisting
- HTTP GET, PUT, and POST requests from remote clients
- Receive-only and Receive-Process-Respond patterns
- The first output port sends data received from remote clients
- The second output port sends client session IDs whenever a client closes its connection
- The first input port receives response data to send back to a remote client
- The second input port is used to dynamically change the client allowlist
See the respective port sections for more information.
Server connections and configuration
WebSocket connections established to this operator are always persistent and long running. HTTP connections established with this operator can also be made persistent (Keep-Alive) if the client requests it and the operator's allowPersistentHttpConnections parameter is true. Persistent (Keep-Alive) HTTP connections help improve throughput for non-browser-based client applications that send data items iteratively in a tight loop.
Additionally, the maximum number of concurrent connections from remote clients can be set with the maxClientConnectionsAllowed parameter (default 32). Choose the limit based on the needs of the application and the capacity of the underlying OS and hardware, such as networking, CPU cores, and memory limits.
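A minimal sketch of how these connection settings might look in an SPL invocation. The composite name KeepAliveReceiver, the stream names, and the limit of 100 clients are illustrative assumptions. Because only TLS connections are enabled by default, the sketch also assumes a server certificate is present at the default certificateFileName location.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;

// Hypothetical composite illustrating connection-related parameters.
composite KeepAliveReceiver {
    graph
        (stream<rstring strData, uint64 clientSessionId> RxData as D;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource() {
            param
                // Accept HTTP GET/PUT/POST in addition to WebSocket traffic.
                allowHttpPost: true;
                // Honor Keep-Alive requests from (non-browser) HTTP clients.
                allowPersistentHttpConnections: true;
                // Illustrative cap; new connections are denied beyond this limit.
                maxClientConnectionsAllowed: 100u;
            output
                D: clientSessionId = getClientSessionId();
        }
}
```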
Secure connection prerequisites
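To support TLS connections, a server-side certificate must be provided as a PEM file holding the server's private key and public certificate. Its location is set with the certificateFileName parameter (default <applicationDirectory>/etc/ws-server.pem). If the private key is encrypted, the password for decrypting it is set with the certificatePassword parameter.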
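The following sketch sets the TLS-related parameters explicitly. It assumes a hypothetical PEM file named my-server.pem under the application's etc directory and a hypothetical submission-time value named certificatePassword that supplies the key password, so the password is not hard-coded in the source. getApplicationDir() and getSubmissionTimeValue() are standard SPL functions; port 8443 is an arbitrary choice.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;

// Hypothetical composite showing an explicit TLS configuration.
composite SecureReceiver {
    graph
        (stream<rstring strData> RxData;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource() {
            param
                // PEM file with the server's private key and public certificate
                // (hypothetical file name).
                certificateFileName: getApplicationDir() + "/etc/my-server.pem";
                // Only needed when the private key is encrypted; supplied at
                // job submission time (hypothetical parameter name).
                certificatePassword: getSubmissionTimeValue("certificatePassword");
                // Illustrative non-default TLS port.
                tlsPort: 8443u;
        }
}
```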
Enabling plain connections
By default, only secure connections are enabled. To enable plain connections, set the nonTlsEndpointNeeded parameter to true.
Server ports
If no server port is configured, the operator uses 443 for TLS-based connections and 80 for plain connections. These defaults can be changed with the tlsPort and nonTlsPort parameters.
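A sketch that enables the plain endpoint and moves both endpoints off the default ports. It assumes, as the description above suggests, that the TLS endpoint stays enabled alongside the plain one and finds a certificate at the default location. Ports 8080 and 8443 are illustrative; one common reason to avoid 80 and 443 is that binding to ports below 1024 on Linux normally requires root privileges or the CAP_NET_BIND_SERVICE capability.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;

// Hypothetical composite serving both plain and TLS endpoints.
composite PlainAndTlsReceiver {
    graph
        (stream<rstring strData> RxData;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource() {
            param
                // Also open a plain (ws/http) endpoint.
                nonTlsEndpointNeeded: true;
                // Illustrative non-privileged ports instead of 80 and 443.
                nonTlsPort: 8080u;
                tlsPort: 8443u;
        }
}
```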
Client allowlisting
By default, any client is allowed to connect to this operator for sending and receiving data. If only a specific set of clients should be allowed to connect to this operator, the clientWhitelist parameter can be used to provide a list<rstring> with one or more client IP addresses. An empty list means connections will be accepted from any client with no restrictions.
A new allowlist can also be injected into this operator through an optional input port while the application is running. On receiving a new allowlist, the operator starts using it immediately and terminates its existing connections with any remote clients that are no longer in the allowlist. IP addresses can be exact or contain wildcards, as shown below.
clientWhitelist: ["172.34.18.212", "10.5.23.17", "172.*.42.*", "10.29.42.*"];
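The sketch below combines a static allowlist with a one-time update sent through the allowlist input port. A Beacon emits a single tuple carrying a new clientWhitelist after a hypothetical delay of 300 seconds. It assumes the operator recognizes the port by its clientWhitelist attribute when it is the only input port, as the input port description states; the composite name, stream names, and addresses are illustrative.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;
use spl.utility::Beacon;

// Hypothetical composite that narrows the allowlist at run time.
composite AllowlistUpdater {
    graph
        // Emit exactly one allowlist update after 300 seconds (illustrative).
        stream<list<rstring> clientWhitelist> AllowlistUpdates = Beacon() {
            param
                iterations: 1u;
                initDelay: 300.0;
            output
                AllowlistUpdates: clientWhitelist = ["10.5.23.17", "10.29.42.*"];
        }

        // Assumes the operator identifies this port by its clientWhitelist attribute.
        (stream<rstring strData> RxData;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource(AllowlistUpdates) {
            param
                // Initial allowlist, in effect until the update arrives.
                clientWhitelist: ["172.34.18.212", "10.5.23.17", "172.*.42.*", "10.29.42.*"];
        }
}
```

After the update, clients matching only 172.34.18.212 or 172.*.42.* would be disconnected, per the behavior described above.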
HTTP method requests
This operator can optionally accept HTTP GET, PUT, or POST requests in which clients send text-based data (plain text, JSON or XML). This option is not enabled by default; enable it by setting the allowHttpPost parameter to true (despite its name, this parameter covers GET and PUT as well). This feature is helpful for traditional HTTP(S)-based web applications that cannot easily be migrated from HTTP(S) to WebSocket.
For an HTTP(S) GET, PUT, or POST client to send text-based data, it can set its Content-Type header to text/plain, application/xml, or application/json.
To send binary data, an HTTP(S) PUT or POST client must set its Content-Type header to application/octet-stream; it can use the standard mechanisms available in many web frameworks for including multipart MIME content.
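A sketch of an invocation that accepts HTTP requests alongside WebSocket traffic and records request details through the operator's output functions. The composite name, the attribute names isHttp, method, and headers, and the plain endpoint on the default port 80 are illustrative assumptions. Text bodies are presumably delivered in strData and application/octet-stream bodies in blobData, since the operator assigns those attributes automatically.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;

// Hypothetical composite receiving HTTP GET/PUT/POST data.
composite HttpReceiver {
    type
        HttpData_t = rstring strData,               // text/plain, application/json, application/xml
                     blob blobData,                 // application/octet-stream
                     uint64 clientSessionId,
                     boolean isHttp,                // hypothetical attribute names from here on
                     rstring method,
                     map<rstring, rstring> headers;
    graph
        (stream<HttpData_t> RxData as D;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource() {
            param
                nonTlsEndpointNeeded: true;
                // Enable HTTP GET/PUT/POST requests.
                allowHttpPost: true;
            output
                D: clientSessionId = getClientSessionId(),
                   isHttp = isHttpClient(),
                   method = getHttpRequestMethodName(),
                   headers = getHttpRequestHeaders();
        }
}
```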
Data receive configurations
This operator can be configured, using the numberOfMessagesToReceiveBeforeAnAck parameter, to send an ack back to a remote WebSocket client after receiving a certain number of data items (tuples) from that client. A remote WebSocket client can use this feature to wait for an ack after sending that many data items. A missing ack can alert the client to a server-side network error or an operator crash or error, prompting it to retransmit the previously sent but unacknowledged data items (tuples).
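A minimal sketch that turns on acknowledgements. The value of 100 data items is an arbitrary example, and the composite and stream names are hypothetical. The format of the ack message and the client-side retransmission logic are not described in this section, so they are not shown.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;

// Hypothetical composite with ack-based flow control enabled.
composite AckingReceiver {
    graph
        (stream<rstring strData, uint64 clientSessionId> RxData as D;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource() {
            param
                // Ack a WebSocket client after receiving 100 data items from it
                // (illustrative value; 0, the default, disables acks).
                numberOfMessagesToReceiveBeforeAnAck: 100u;
            output
                D: clientSessionId = getClientSessionId();
        }
}
```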
While this operator is labeled a source operator, it can be configured to act like a processing operator: it receives data from a client, the application processes that data, and the operator sends a result (response) back to that client through its first input port.
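The Receive-Process-Respond pattern can be sketched as a loop in the SPL graph: a Functor processes each received item, and its output stream feeds back into the operator's response input port. The composite name, port 8080, the 10-second responseTimeout, and the upper-casing "processing" step are illustrative assumptions. The Functor copies clientSessionId through unchanged, which is what lets the operator route each response to the right client.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;

// Hypothetical echo service built on the Receive-Process-Respond pattern.
composite EchoResponder {
    type
        // Same schema for requests and responses: payload plus routing id.
        Message_t = rstring strData, uint64 clientSessionId;
    graph
        // Responses (defined below) feed back into the response input port.
        (stream<Message_t> Requests as R;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource(Responses) {
            param
                nonTlsEndpointNeeded: true;
                nonTlsPort: 8080u;      // illustrative port
                allowHttpPost: true;
                responseTimeout: 10u;   // illustrative: respond within 10 seconds
            output
                R: clientSessionId = getClientSessionId();
        }

        // "Process" step: upper-case the text. clientSessionId is copied
        // through by the Functor's default same-name assignment.
        stream<Message_t> Responses = Functor(Requests) {
            output
                Responses: strData = upper(strData);
        }
}
```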
Example
use com.teracloud.streams.websocket.op::WebSocketSource;
composite Main {
    type
        ReceivedData_t =
            rstring strData,                // String data (plain text, JSON or XML) received from the remote client
            blob blobData,                  // Binary data (audio, video, image, etc.) received from the remote client
            uint64 clientSessionId,         // Unique identifier of a remote client as assigned by this operator
            rstring clientIpAddress,        // IP address of the remote client from which the data was received
            boolean isWebSocketClient,      // True indicates that the remote client holds a WebSocket connection
            uint64 totalDataItemsReceived,  // Total data items received so far from a given clientSessionId
            uint64 totalDataBytesReceived;  // Total data bytes received so far from a given clientSessionId
    graph
        // Accept plain connections and HTTP method requests
        (stream<ReceivedData_t> WebSocketRxData as WSRD;
         stream<uint64 clientSessionId> EndOfClientSessionSignal) = WebSocketSource() {
            param
                nonTlsEndpointNeeded: true;
                initDelay: 30.0;
                allowHttpPost: true;
                maxClientConnectionsAllowed: 30u;
            output
                // The strData and/or blobData attributes are automatically
                // assigned values by the operator logic.
                // Other attributes can be assigned manually, as done below.
                WSRD: clientSessionId = getClientSessionId(),
                      clientIpAddress = getClientIpAddress(),
                      isWebSocketClient = isWebSocketClient(),
                      totalDataItemsReceived = getTotalDataItemsReceived(),
                      totalDataBytesReceived = getTotalDataBytesReceived();
        }
    // See $STREAMS_INSTALL/samples/com.teracloud.streams.websocket for more examples
}
Summary
- Ports
- This operator has 2 optional input ports and 2 output ports.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports 26 parameters.
Optional: allowHttpPost, allowPersistentHttpConnections, certificateFileName, certificatePassword, clientWhitelist, initDelay, ipv6Available, maxClientConnectionsAllowed, metricsResetInterval, newDataCpuYieldTimeInSenderThread, noDataCpuYieldTimeInSenderThread, nonTlsEndpointNeeded, nonTlsPort, numberOfMessagesToReceiveBeforeAnAck, responseTimeout, tcpNoDelay, tlsCipherWhitelist, tlsPort, trustedClientCertificateFileName, trustedClientX509SubjectIdentifiers, urlContextPath, websocketLiveMetricsUpdateNeeded, websocketLoggingNeeded, websocketStaleConnectionPurgeInterval, wsClientSessionLoggingNeeded, wsConnectionLoggingNeeded
- Metrics
- This operator reports 15 metrics.
Properties
- Implementation
- C++
- Threading
- Never - Operator never provides a single threaded execution context.
- Input Ports (0)
-
Even though this operator is labeled as a source, it can optionally be configured to function like an analytic operator. In that role, it accepts tuples on an input port and sends them to the remote clients. It can also have another optional input port that receives a client allowlist and updates it dynamically to allow or reject connections from remote clients. To meet these requirements, this operator provides two optional input ports. Whether either port is present depends purely on the needs of the application: both can be absent, only one can be present, or both can be present. One is used to send responses to remote clients and the other is used to dynamically change the client allowlist. Because either port can appear on its own, no particular ordering is imposed between them; they can be declared in any order, and the internal operator logic identifies which is which.
This is one of those two input ports. The application uses it to send data to a remote client as a response to data it received earlier from that client and has finished processing. This input port must have a uint64 clientSessionId attribute plus an rstring strData attribute, a blob blobData attribute, or both. The operator uses the client session ID to find the remote client to which a given response should be sent. If HTTP-based remote clients send data to this operator, the application logic can also send its own custom HTTP response headers back to those clients: this input port can have an optional map<rstring, rstring> httpResponseHeaders attribute, which the application logic populates with its custom HTTP headers. When this input port is present, the application logic must send a (round-trip) response back to each client from which it received a data item. The response should be sent as quickly as possible, before the timeout configured with the responseTimeout parameter expires. Otherwise, a timeout handler thread running inside this operator detects the missed response and does the necessary clean-up work for the remote client connection that is waiting for it.
Attributes on this input port:
- strData (rstring; required unless blobData is present) - Any text-based data (plain text, JSON or XML) can be sent to the remote clients via this attribute.
- blobData (blob; required unless strData is present) - Any binary (blob) data can be sent to the remote clients via this attribute.
- clientSessionId (required, uint64) - Session ID of the client to which a given response should be sent.
- httpResponseHeaders (optional, map<rstring, rstring>) - This map attribute can be present if the application logic has to send custom HTTP response headers to a remote HTTP client.
Any extra input attributes are ignored.
- Properties
-
- Optional: true
- ControlPort: false
- TupleMutationAllowed: true
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
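A sketch of a responder that returns a small JSON body together with a custom HTTP response header. The header name X-Demo-Header and its value are hypothetical, the JSON body simply reports the length of the received text, and the composite and stream names are illustrative. The httpResponseHeaders attribute is presumably only meaningful for HTTP clients.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;

// Hypothetical composite returning JSON plus a custom HTTP header.
composite JsonHttpResponder {
    type
        Request_t = rstring strData, uint64 clientSessionId;
        // Response schema including the optional custom-header attribute.
        Response_t = rstring strData, uint64 clientSessionId,
                     map<rstring, rstring> httpResponseHeaders;
    graph
        (stream<Request_t> Requests as R;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource(Responses) {
            param
                nonTlsEndpointNeeded: true;
                allowHttpPost: true;
            output
                R: clientSessionId = getClientSessionId();
        }

        stream<Response_t> Responses = Functor(Requests) {
            output
                // JSON body reporting the length of the received text;
                // clientSessionId is copied through unchanged.
                Responses: strData = "{\"receivedLength\": " + (rstring)length(strData) + "}",
                           // Hypothetical custom header for HTTP clients.
                           httpResponseHeaders = {"X-Demo-Header" : "processed"};
        }
}
```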
- Input Ports (1)
-
This is the other optional input port. It is used to send a new client allowlist into this operator so that it accepts client connections only from the specified IP addresses. This input port must have an attribute named clientWhitelist of type list<rstring>. The list can be empty, to accept connections from any client with no restrictions, or it can contain one or more remote client IP addresses that are allowed to connect to this operator to send and receive data.
Attributes on this input port:
- clientWhitelist (required, list<rstring>) - An empty list or a list of client IP addresses that are allowed to connect to this operator.
Any extra input attributes are ignored.
As mentioned in the description of the other input port, the ordering of these two ports is at the discretion of the application developer.
- Properties
-
- Optional: true
- ControlPort: false
- TupleMutationAllowed: true
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
- Assignments
- This operator allows any SPL expression of the correct type to be assigned to output attributes.
- Output Functions
-
- WebSocketSourceFunctions
-
- <any T> T AsIs(T)
-
The default function for output attributes. This function assigns the output attribute to the value of the input attribute with the same name.
- uint64 getClientSessionId()
-
Returns the client session ID that corresponds to the current output tuple.
- uint64 getTotalDataItemsReceived()
-
Returns the total number of data items received so far from a remote client.
- uint64 getTotalDataBytesReceived()
-
Returns the total number of data bytes received so far from a remote client.
- uint64 getTotalDataItemsSent()
-
Returns the total number of data items sent so far to a remote client.
- uint64 getTotalDataBytesSent()
-
Returns the total number of data bytes sent so far to a remote client.
- rstring getClientIpAddress()
-
Returns a string representation of the remote client IP address.
- rstring getClientPort()
-
Returns a string representation of the remote client port.
- rstring getUrlContextPath()
-
Returns the context path present in the URL being accessed by the client.
- boolean isWebSocketClient()
-
Returns whether a remote client holds a WebSocket connection.
- boolean isHttpClient()
-
Returns whether a remote client holds an HTTP connection.
- map<rstring, rstring> getHttpRequestHeaders()
-
Returns a map holding the HTTP headers that were part of a client's HTTP GET/PUT/POST request.
- rstring getHttpRequestMethodName()
-
Returns a string indicating the method name (GET, PUT or POST) found in the client's HTTP request.
- map<rstring, rstring> getUrlQueryStringKeyValuePairs()
-
Returns a map holding the key/value pairs found in the URL query string of a client's HTTP GET request.
- rstring getFullUrlBeingAccessedByTheClient()
-
Returns a string indicating the full URL being accessed by the remote client.
- rstring getFileNameInUrlForHttpGet()
-
Returns a string indicating the file name in the URL for HTTP GET-based requests.
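A sketch that assigns several of these output functions to output attributes. The attribute names (clientIp, fullUrl, queryParams, and so on) are hypothetical, and the context paths are reused from the urlContextPath parameter description as illustrations.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;

// Hypothetical composite capturing per-client details with output functions.
composite ClientInfoReceiver {
    type
        ClientData_t = rstring strData,
                       uint64 clientSessionId,
                       rstring clientIp,
                       rstring clientPort,
                       rstring contextPath,
                       rstring fullUrl,
                       map<rstring, rstring> queryParams,
                       uint64 itemsReceived,
                       uint64 bytesReceived;
    graph
        (stream<ClientData_t> RxData as D;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource() {
            param
                nonTlsEndpointNeeded: true;
                allowHttpPost: true;
                // Illustrative context paths, e.g. http://host:80/Orders
                urlContextPath: ["Orders", "MyServices/Banking/Deposit"];
            output
                D: clientSessionId = getClientSessionId(),
                   clientIp = getClientIpAddress(),
                   clientPort = getClientPort(),
                   contextPath = getUrlContextPath(),
                   fullUrl = getFullUrlBeingAccessedByTheClient(),
                   queryParams = getUrlQueryStringKeyValuePairs(),
                   itemsReceived = getTotalDataItemsReceived(),
                   bytesReceived = getTotalDataBytesReceived();
        }
}
```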
- Output Ports (0)
-
This port produces output tuples that carry the text or binary data received from remote WebSocket or HTTP GET/PUT/POST clients. Its schema must have an attribute named strData of type rstring to hold received text data, an attribute named blobData of type blob to hold received binary data, or both. The remaining attributes can be of any type based on the needs of the application. This operator can emit data received from multiple WebSocket and HTTP GET/PUT/POST clients that all communicate with it at the same time. Use the custom output functions provided by this operator to query the client session ID and other client-session-specific metrics and assign those values to other optional attributes on this output port.
Depending on the needs of a given application, this operator can do more than act as a source. If an application receives data from a remote client, processes it, and then sends a result (a response) back to that client, this output port must have an additional attribute uint64 clientSessionId, whose value can be obtained with the getClientSessionId() output function. To send a result back to a client, this operator must also have an input port with its own uint64 clientSessionId attribute. The application logic can then receive data from a client, process it, and send the result along with that client's session ID. The clientSessionId attribute present on both the input and output ports lets this operator correlate each result with the correct client before sending it to that client.
There are multiple available output functions, and output attributes can also be assigned values with any SPL expression that evaluates to the proper type.
- Properties
-
- Optional: false
- TupleMutationAllowed: true
- WindowPunctuationOutputMode: Free
- Output Ports (1)
-
This port produces an output tuple whenever a client session ends, indicating the end of the WebSocket client session that was in progress moments ago for the given client session ID. The schema for this port must have one attribute with exactly this name and data type: uint64 clientSessionId. This source operator sets that attribute to indicate which WebSocket client (that is, which session ID) just ended its data exchange. Downstream operators can use this "End Of Client Session" signal as they see fit.
- Properties
-
- Optional: false
- TupleMutationAllowed: false
- WindowPunctuationOutputMode: Free
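A sketch of a downstream consumer of the second output port. The Custom operator simply prints each ended session ID; an application might instead use the signal to release per-session state. The composite, stream, and operator names are hypothetical.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;

// Hypothetical composite reacting to the "End Of Client Session" signal.
composite SessionTracker {
    graph
        (stream<rstring strData, uint64 clientSessionId> RxData as D;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource() {
            param
                nonTlsEndpointNeeded: true;
            output
                D: clientSessionId = getClientSessionId();
        }

        // Log each session that has ended.
        () as SessionEndLogger = Custom(ClosedSessions) {
            logic
                onTuple ClosedSessions:
                    printStringLn("Client session ended: " + (rstring)clientSessionId);
        }
}
```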
Parameters
- allowHttpPost
-
This parameter indicates whether the client applications are allowed to send data via HTTP(S) GET/PUT/POST. (Default is false)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- allowPersistentHttpConnections
-
This parameter indicates whether this operator allows client applications to make persistent (Keep-Alive) HTTP connections. It is advisable to enable this only for non-browser-based client applications. (Default is false)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- certificateFileName
-
This parameter specifies the full path of the PEM file that holds the WebSocket server's private key and public certificate. Default is "<applicationDirectory>/etc/ws-server.pem".
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- certificatePassword
-
This parameter specifies a password needed for decrypting the WebSocket server's private key in the PEM file. Default is an empty string.
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- clientWhitelist
-
This parameter specifies a list of client IP addresses to accept connections only from those clients. (Default is an empty list to have no client connection restrictions.)
- Properties
-
- Type: list<rstring>
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- initDelay
-
This parameter specifies a one-time delay, in seconds, that this source operator waits before it starts generating its first tuple. Default delay is 0.0.
- Properties
-
- Type: float64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- ipv6Available
-
This parameter indicates whether the IPv6 protocol stack is available in the Linux machine where the WebSocketSource operator is running. (Default is true)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- maxClientConnectionsAllowed
-
This parameter specifies the maximum number of concurrent clients allowed to connect with this operator. After this limit is reached, new client connections will be denied until any existing clients close their connections. (Default is 32.)
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- metricsResetInterval
-
This parameter can be used to specify a non-zero periodic time interval in minutes after which all the metrics counters will be reset to 0. (Default is 0 which will never reset the metrics counters.)
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- newDataCpuYieldTimeInSenderThread
-
This parameter specifies the CPU yield time (in fractional seconds) needed inside the thread that is just about to send a new data item to the remote clients. It should be >= 0.0. (Default is 0.001, i.e. 1 millisecond.)
- Properties
-
- Type: float64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- noDataCpuYieldTimeInSenderThread
-
This parameter specifies the CPU yield time (in fractional seconds) needed inside the thread that spin-loops when no data is available to send to the remote clients. It should be >= 0.0. (Default is 0.001, i.e. 1 millisecond.)
- Properties
-
- Type: float64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- nonTlsEndpointNeeded
-
This parameter specifies whether a non-TLS (plain) WebSocket endpoint is needed. (Default is false)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- nonTlsPort
-
This parameter specifies the non-TLS (plain) WebSocket port number. Default port number is 80.
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- numberOfMessagesToReceiveBeforeAnAck
-
This parameter indicates how many messages are to be received before sending an ack to the remote client. (Default is 0 i.e. no ack sent)
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- responseTimeout
-
This parameter specifies the time, in seconds, within which the application logic should send its pending response to a remote client. If this time expires, a timeout handler thread in this operator does the necessary internal clean-up work. (Default is 20 seconds.)
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- tcpNoDelay
-
This parameter controls Nagle's algorithm on the TCP connections. Setting it to true disables Nagle's algorithm (TCP_NODELAY) and setting it to false enables it. (Default is false.)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- tlsCipherWhitelist
-
This parameter specifies a string containing one or more comma-separated TLS/SSL ciphers to be used during TLS/SSL connection negotiations with clients. It is handy when ciphers found to have security vulnerabilities must be avoided. (Default is an empty string.)
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- tlsPort
-
This parameter specifies the WebSocket TLS port number. Default port number is 443.
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- trustedClientCertificateFileName
-
This parameter specifies the full path of a PEM file that can contain the public certificates of all the trusted clients. This enables client (mutual) authentication. If this parameter is not used or is empty, there is no client authentication. (Default is an empty string.)
- Properties
-
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- trustedClientX509SubjectIdentifiers
-
This parameter specifies a list of verifiable identifiers present in the subject field of the trusted client's public certificate. It is helpful for client (mutual) authentication with otherwise unsupported certificate types, such as self-signed certificates. Some examples of such identifiers: ["ST=New York","L=Armonk","O=IBM","CN=www.ibm.com","emailAddress=websocket.streams@ibm.com"] (Default is an empty list.)
- Properties
-
- Type: list<rstring>
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
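A sketch that combines the TLS hardening and client (mutual) authentication parameters described above. The trusted-clients.pem file name, the subject identifiers, and the cipher list are hypothetical; the cipher names follow OpenSSL naming, which is an assumption about what tlsCipherWhitelist accepts.

```spl
use com.teracloud.streams.websocket.op::WebSocketSource;

// Hypothetical composite requiring client certificates.
composite MutualTlsReceiver {
    graph
        (stream<rstring strData> RxData;
         stream<uint64 clientSessionId> ClosedSessions) = WebSocketSource() {
            param
                // Server key and certificate (the documented default location).
                certificateFileName: getApplicationDir() + "/etc/ws-server.pem";
                // Public certificates of trusted clients (hypothetical file name).
                trustedClientCertificateFileName: getApplicationDir() + "/etc/trusted-clients.pem";
                // Identifiers expected in the client certificate's subject field (illustrative).
                trustedClientX509SubjectIdentifiers: ["O=Example Corp", "CN=client1.example.com"];
                // Comma-separated cipher list (assumed OpenSSL cipher names).
                tlsCipherWhitelist: "ECDHE-RSA-AES256-GCM-SHA384,ECDHE-RSA-AES128-GCM-SHA256";
        }
}
```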
- urlContextPath
-
This parameter specifies a list with zero or more URL context paths for a given WebSocket server endpoint. Users can choose any application-specific values made of either a single-part or a multi-part path (e.g., "Orders", "MyServices/Banking/Deposit"). With those example values, the WebSocket server URL would be either https://host:port/Orders or https://host:port/MyServices/Banking/Deposit. Default is an empty list to indicate no URL context path.
- Properties
-
- Type: list<rstring>
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- websocketLiveMetricsUpdateNeeded
-
This parameter specifies whether live update for this operator's custom metrics is needed. (Default is true)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- websocketLoggingNeeded
-
This parameter specifies whether logging is needed from the WebSocket library. (Default is false)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- websocketStaleConnectionPurgeInterval
-
This parameter specifies a periodic time interval, in seconds, at which any stale remote client connections are purged to free up memory. (Default is 0, i.e. no purging.)
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- wsClientSessionLoggingNeeded
-
This parameter specifies whether logging is needed when the remote client session is in progress with this operator. (Default is false)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- wsConnectionLoggingNeeded
-
This parameter specifies whether logging is needed when the remote clients connect and disconnect. (Default is false)
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
Code Templates
- WebSocketSource
-
(stream<${dataSchema}> ${outputDataStream} as D; stream<uint64 clientSessionId> ${closedSessionStream}) = com.teracloud.streams.websocket.op::WebSocketSource() { param ${parameter}: ${parameterExpression}; output D: ${attributeAssignment}; }
- WebSocketSource with HTTP response data port
-
(stream<${dataSchema}> ${outputDataStream} as D; stream<uint64 clientSessionId> ${closedSessionStream}) = com.teracloud.streams.websocket.op::WebSocketSource(${inputResponseStream}) { param ${parameter}: ${parameterExpression}; output D: ${attributeAssignment}; }
- WebSocketSource with allowlist control port
-
(stream<${dataSchema}> ${outputDataStream} as D; stream<uint64 clientSessionId> ${closedSessionStream}) = com.teracloud.streams.websocket.op::WebSocketSource(; ${inputControlStream}) { param ${parameter}: ${parameterExpression}; output D: ${attributeAssignment}; }
Metrics
- nClientsConnected - Gauge
-
Number of remote clients currently connected to this operator invocation.
- nDataBytesReceivedFromRemoteClients - Gauge
-
Total number of data bytes received by this operator invocation.
- nDataBytesSentToRemoteClients - Gauge
-
Total number of data bytes sent to the remote clients thus far.
- nDataItemsReceivedFromRemoteClients - Gauge
-
Number of data items received from the remote clients thus far.
- nDataItemsSentToRemoteClients - Gauge
-
Number of data items sent to the remote clients thus far.
- nHttpMessagesReceivedFromRemoteClients - Gauge
-
Number of HTTP GET, PUT and POST messages received by this operator invocation.
- nHttpPostAllowed - Gauge
-
Indicates whether this operator allows HTTP GET/PUT/POST requests from clients.
- nNonTlsPort - Gauge
-
Non-TLS port number configured for this operator.
- nNonTlsPortNeeded - Gauge
-
Indicates whether the user configured data exchange via a non-TLS port.
- nResponseTimeoutOccurrences - Gauge
-
Total number of response timeout occurrences thus far.
- nSizeOfMostRecentDataItemReceived - Gauge
-
Size of the most recent data item received from a remote client.
- nSizeOfMostRecentDataItemSent - Gauge
-
Size of the most recent data item sent to a remote client.
- nTcpNoDelay - Gauge
-
Indicates whether the user configured TCP_NODELAY for this operator.
- nTimeTakenToSendMostRecentDataItem - Gauge
-
Time taken, in milliseconds, to send the most recent data item to a remote client.
- nTlsPort - Gauge
-
TLS port number configured for this operator.
Libraries
- Boost Library
- Websocketpp Library