Multi-threading and operator fusion considerations

SPL operators can run in a multi-threaded context, where the process functions of the operators are invoked concurrently. SPL provides a set of utility classes that handle threads, locks, and condition variables to protect against concurrent access.

Here is a summary of how to write thread-safe operators, followed by details about the conditions under which multi-threading occurs.

Writing Thread-safe Operators: Operator writers can use the AutoPortMutex class to protect their operator state from concurrent access. This class provides a single style of programming for the operator developer, but avoids paying the performance cost that is associated with locking in cases where a multi-threaded context does not exist. The SPL run time is responsible for ensuring that these optimizations are in place. In order for the SPL run time to assert the correct multi-threading behavior of the operator graph under fusion, operator developers must provide information about the threading behavior of their operators in their operator models.

From the perspective of writing operators, the multi-threading considerations can be summarized as follows:

  • If an operator is stateful, the operator developer must use AutoPortMutex to protect the operator state from concurrent access.
  • The operator model must be configured to inform the SPL compiler about the operator's threading behavior.
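The idea of locking only when a multi-threaded context exists can be sketched in standard C++. This is a minimal illustration, not the SPL implementation: ConditionalGuard, CountingOperator, and driveFromThreads are hypothetical names, std::mutex stands in for the SPL mutex class, and the multiThreaded flag stands in for the decision that the SPL run time makes on behalf of AutoPortMutex.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the idea behind AutoPortMutex: a scoped guard
// that acquires the mutex only when a multi-threaded context is reported.
class ConditionalGuard {
public:
    ConditionalGuard(std::mutex& m, bool multiThreaded)
        : mutex_(m), locked_(multiThreaded) {
        if (locked_) mutex_.lock();
    }
    ~ConditionalGuard() {
        if (locked_) mutex_.unlock();
    }
    ConditionalGuard(const ConditionalGuard&) = delete;
    ConditionalGuard& operator=(const ConditionalGuard&) = delete;

private:
    std::mutex& mutex_;
    bool locked_;
};

// Hypothetical stateful operator: counts the tuples it has processed.
class CountingOperator {
public:
    explicit CountingOperator(bool multiThreaded)
        : multiThreaded_(multiThreaded) {}

    // The same code is written for both contexts; only the guard differs.
    void process() {
        ConditionalGuard guard(mutex_, multiThreaded_);
        ++count_;
    }

    std::uint64_t count() {
        ConditionalGuard guard(mutex_, multiThreaded_);
        return count_;
    }

private:
    std::mutex mutex_;
    bool multiThreaded_;
    std::uint64_t count_ = 0;
};

// Calls process() from several threads and returns the final count.
// Using more than one thread is only safe when the operator was
// constructed with multiThreaded == true.
std::uint64_t driveFromThreads(CountingOperator& op, int threads,
                               int tuplesPerThread) {
    assert(threads > 0 && tuplesPerThread >= 0);
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&op, tuplesPerThread] {
            for (int i = 0; i < tuplesPerThread; ++i) op.process();
        });
    }
    for (auto& w : workers) w.join();
    return op.count();
}
```

The operator code is identical in both contexts, which mirrors the single programming style described above. The cost of locking is paid only when the flag indicates that concurrent calls are possible.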

Additionally, as an operator developer, consider protecting the operator state with an AutoMutex when either of the following conditions applies:

  • The operator is designed to handle periodic checkpointing and requires access to the operator state for checkpoint creation. The getCheckpoint function is called concurrently with the process functions.
  • The operator has additional threads that run its non-input process functions, which require access to the operator state.

The prepareToShutdown function is run concurrently with any other thread involved in operator processing.
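The checkpointing case can be sketched as follows, assuming a hypothetical operator whose state has two fields that must be read together. The sketch uses std::mutex and std::lock_guard from standard C++ as stand-ins for the SPL mutex and AutoMutex classes. WindowedSum, snapshot, and snapshotsStayConsistent are illustrative names, and snapshot only plays the role of a function that, like getCheckpoint, is called concurrently with process.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stateful operator with two fields that must stay in step.
class WindowedSum {
public:
    // Called by the thread that delivers tuples.
    void process(std::int64_t value) {
        std::lock_guard<std::mutex> guard(mutex_);
        sum_ += value;
        ++count_;
    }

    // Called by a different thread, as a checkpoint function would be.
    // The lock guarantees that sum and count belong to the same moment.
    std::pair<std::int64_t, std::int64_t> snapshot() {
        std::lock_guard<std::mutex> guard(mutex_);
        return {sum_, count_};
    }

private:
    std::mutex mutex_;
    std::int64_t sum_ = 0;
    std::int64_t count_ = 0;
};

// Runs a snapshot thread concurrently with process calls that always add 2,
// and reports whether every snapshot satisfied sum == 2 * count.
bool snapshotsStayConsistent(int tuples) {
    assert(tuples >= 0);
    WindowedSum op;
    std::atomic<bool> done{false};
    bool consistent = true;  // written only by the snapshot thread

    std::thread checkpointer([&] {
        while (!done.load()) {
            const auto state = op.snapshot();
            if (state.first != 2 * state.second) consistent = false;
        }
    });

    for (int i = 0; i < tuples; ++i) op.process(2);
    done.store(true);
    checkpointer.join();
    return consistent;
}
```

Without the guard in snapshot, the snapshot thread could observe an updated sum together with a stale count.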

Threading Behavior with Fusion: Following these guidelines is sufficient to create operators that run safely and efficiently in a multi-threaded setting. However, developers often want better insight into how, and under which conditions, an operator graph that is fused inside a PE becomes multi-threaded. Under fusion, operators that are within the same PE are connected to each other by streams that are implemented as regular function calls. The following conditions introduce multi-threading when running a graph of operators that are tied together by function calls.

  • When you use the default TCP transport, each PE input port is served by a separate thread. Each such thread makes calls only to the operator that is connected to the PE input port served by the thread.
  • When you use the optional LLM transport, there might be a number of threads that serve the set of input ports, depending on the details of the LLM configuration. However, at any time only one thread is active for a given port. Each such thread makes calls only to the operator that is connected to the PE input port served by the thread.
  • Any tuple-submitting source operator in the operator graph introduces a separate thread, which drives the execution that is rooted at that operator.
  • Any operator input port marked by a threadedPort configuration has a separate thread that drives the execution for that input port.
  • Any additional threads that are created by an operator, which are used to submit tuples, drive the execution of the flow that is rooted at the output ports on which they submit.
  • Any windowing-related threads that are created as a result of time-based window policies, and that are used to submit tuples, drive the execution of the flow that is rooted at the output ports on which they submit.
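Because fused streams are plain function calls, a downstream operator runs on whichever thread drives the upstream operator. The following sketch illustrates this with standard C++ threads. Sink, Filter, and countDrivingThreads are hypothetical names, and each std::thread stands in for one of the thread sources listed above (for example, a source operator or a threaded port).

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical downstream operator: records which threads call it.
class Sink {
public:
    void process(int /*tuple*/) {
        std::lock_guard<std::mutex> guard(mutex_);
        callers_.insert(std::this_thread::get_id());
    }

    std::size_t distinctCallers() {
        std::lock_guard<std::mutex> guard(mutex_);
        return callers_.size();
    }

private:
    std::mutex mutex_;
    std::set<std::thread::id> callers_;
};

// Hypothetical upstream operator, fused with the sink: submitting a tuple
// is a direct function call into the downstream process function.
class Filter {
public:
    explicit Filter(Sink& downstream) : downstream_(downstream) {}

    void process(int tuple) {
        if (tuple % 2 == 0) downstream_.process(tuple);
    }

private:
    Sink& downstream_;
};

// Drives the fused Filter -> Sink pair from several threads and returns
// the number of distinct threads that reached the sink.
std::size_t countDrivingThreads(int sources, int tuplesPerSource) {
    assert(sources > 0 && tuplesPerSource > 0);
    Sink sink;
    Filter filter(sink);

    std::vector<std::thread> threads;
    for (int s = 0; s < sources; ++s) {
        threads.emplace_back([&filter, tuplesPerSource] {
            for (int i = 0; i < tuplesPerSource; ++i) filter.process(i);
        });
    }
    for (auto& t : threads) t.join();
    return sink.distinctCallers();
}
```

The sink creates no threads of its own, yet its process function is entered by every thread that drives the filter. This is why the state of the sink needs protection.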

A PE port always maps to a single operator port. The relation between the PE and operator ports can be summarized as follows:

  • An operator input port is exposed as a PE port if either of the following is true:
    • it is connected to the output port of an operator from a different PE
    • it has an imported stream that is connected to it.
  • An operator output port is exposed as a PE port if either of the following is true:
    • it is connected to the input port of an operator from a different PE
    • it has an exported stream that is connected to it.
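The exposure rule is the same for input and output ports, so it can be written as a single predicate. The sketch below is illustrative only: the Port structure and its fields are hypothetical and do not correspond to any SPL data structure.

```cpp
#include <cassert>
#include <vector>

// Hypothetical description of one operator port.
struct Port {
    int ownerPE;                        // PE that hosts the port's operator
    std::vector<int> connectedPEs;      // PEs of the operators on the other end
    bool hasImportedOrExportedStream;   // import (input) or export (output)
};

// Returns true if the operator port must be exposed as a PE port:
// it crosses a PE boundary, or an imported/exported stream is attached.
bool isExposedAsPEPort(const Port& port) {
    assert(port.ownerPE >= 0);
    if (port.hasImportedOrExportedStream) return true;
    for (int pe : port.connectedPEs) {
        if (pe != port.ownerPE) return true;
    }
    return false;
}
```

A port whose connections all stay inside its own PE is not exposed. Its stream is implemented as a function call rather than a transport connection.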

Waiting within Process Functions: Care must be taken when an operator performs a wait operation in a process function and expects to be signaled by another call to a process function. This signaling is generally achieved with condition variables. Use extreme caution with this kind of pattern. There are two potential problems to watch for.

First, depending on the threading context in which the operator is running, the process call that performs the wait operation and the one that is supposed to perform the signal operation might or might not be executed by different threads. If only a single thread is involved, the wait call never unblocks, which results in a deadlock. A common use case is to synchronize two streams, in which case the wait and the signal operations are performed by process calls attached to different ports. When an operator implements this kind of processing pattern, explicitly state in the operator documentation the requirement that the ports must be executed by different threads. In turn, the application developer who uses this operator must verify that the operator is used in the right context within the stream graph, so that the ports in question can be executed by different threads.
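The single-thread hazard can be sketched with a standard C++ condition variable. TwoPortSync, runWithTwoThreads, and runWithOneThread are hypothetical names. The sketch also assumes a timed wait, so that the single-threaded case returns instead of hanging; an untimed wait in the same situation would block forever.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical two-port operator: port 0 waits until port 1 has delivered.
class TwoPortSync {
public:
    // Process call for port 0. Returns true if signaled before the timeout.
    bool processPort0(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        return cv_.wait_for(lock, timeout, [this] { return ready_; });
    }

    // Process call for port 1. Signals the waiting call on port 0.
    void processPort1() {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            ready_ = true;
        }
        cv_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    bool ready_ = false;
};

// Each port is driven by its own thread: the wait is released.
bool runWithTwoThreads() {
    TwoPortSync op;
    bool signaled = false;
    std::thread port0([&] { signaled = op.processPort0(std::chrono::seconds(5)); });
    std::thread port1([&] { op.processPort1(); });
    port0.join();
    port1.join();
    return signaled;
}

// Both ports are driven by one thread: the signal can be sent only after
// the wait returns, so the wait can end only by timing out.
bool runWithOneThread(std::chrono::milliseconds timeout) {
    assert(timeout.count() > 0);
    TwoPortSync op;
    const bool signaled = op.processPort0(timeout);
    op.processPort1();  // reached only because the wait gave up
    return signaled;
}
```

In the single-threaded run, the call on port 1 is queued behind the call on port 0 on the same thread, which is exactly the situation that the documented threading requirement is meant to rule out.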

Second, if the operator has custom logic, the implicit locking performed by the SPL language run time prevents any thread other than the one that is currently running a process call from running process calls. This locking results in a deadlock if the running thread is blocked on a wait call. The deadlock can be avoided by disabling custom logic in the operator model.