Multi-threading and operator fusion considerations
SPL operators can run in a multi-threaded context, in which the process functions of an operator are invoked concurrently. SPL provides a set of utility classes for threads, locks, and condition variables that protect against concurrent access.
The following sections summarize how to write thread-safe operators and describe the conditions under which multi-threading occurs.
Writing Thread-safe Operators: Operator writers can use the AutoPortMutex class to protect their operator state from concurrent access. This class provides a single style of programming for the operator developer, but avoids the performance cost of locking in cases where no multi-threaded context exists. The SPL run time is responsible for ensuring that these optimizations are in place. For the SPL run time to determine the correct multi-threading behavior of the operator graph under fusion, operator developers must describe the threading behavior of their operators in their operator models.
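The idea behind AutoPortMutex, a single locking style whose cost disappears when no multi-threaded context exists, can be sketched in standalone C++. This is a minimal sketch, not the SPL implementation: ConditionalPortLock, CountingOperator, and runConcurrently are hypothetical names, and the multiThreaded flag is an assumption standing in for whatever information the SPL run time uses to decide whether locking is needed. Real operators use SPL's own AutoPortMutex instead of code like this.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the idea behind AutoPortMutex (not the SPL class):
// a scoped lock that only locks when a multi-threaded context exists.
class ConditionalPortLock {
public:
    ConditionalPortLock(std::mutex& m, bool multiThreaded)
        : mutex_(m), locking_(multiThreaded) {
        if (locking_) {
            mutex_.lock();  // pay the locking cost only when it is needed
        }
    }
    ~ConditionalPortLock() {
        if (locking_) {
            mutex_.unlock();
        }
    }
    ConditionalPortLock(const ConditionalPortLock&) = delete;
    ConditionalPortLock& operator=(const ConditionalPortLock&) = delete;

private:
    std::mutex& mutex_;
    bool locking_;
};

// Hypothetical stateful operator that counts the tuples it has processed.
class CountingOperator {
public:
    // Assumption: the flag is decided by the run time, not by the operator.
    explicit CountingOperator(bool multiThreaded) : multiThreaded_(multiThreaded) {}

    void process(int /*tuple*/) {
        ConditionalPortLock guard(mutex_, multiThreaded_);
        ++count_;  // operator state, protected whenever concurrency is possible
    }

    long count() const { return count_; }

private:
    std::mutex mutex_;
    bool multiThreaded_;
    long count_ = 0;
};

// Drives process() from several threads, as several PE input ports might.
long runConcurrently(CountingOperator& op, int threads, int tuplesPerThread) {
    assert(threads > 0);
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&op, tuplesPerThread] {
            for (int i = 0; i < tuplesPerThread; ++i) op.process(i);
        });
    }
    for (auto& w : workers) w.join();
    return op.count();
}
```

Passing false while several threads call process() would be a data race, which is why the decision belongs to the run time: it knows the fusion and threading layout, while the operator code stays the same in both cases.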
The multi-threading considerations for operator writers are as follows:
- If an operator is stateful, the operator developer must use AutoPortMutex to protect the operator state from concurrent access.
- The operator model must be configured to inform the SPL compiler about the operator's threading behavior.
Additionally, as an operator developer, consider protecting the operator state with an AutoMutex when:
- The operator is designed to handle periodic checkpointing and requires access to the operator state to create a checkpoint. The getCheckpoint function is called concurrently with the process functions.
- The operator has additional threads that run its non-input process functions, which require access to the operator state.
The prepareToShutdown function is run concurrently with any other thread that is involved in operator processing.
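The checkpointing case can be illustrated with a sketch in which process() and getCheckpoint() run on different threads and share two counters that must be captured together. SummingOperator, Snapshot, and runWithCheckpoints are hypothetical; the method signatures are simplified and do not match the SPL API, and a std::lock_guard over a std::mutex plays the role that AutoMutex plays in an SPL operator. The shutdown call is made from a third thread (the caller), mirroring the note that prepareToShutdown runs concurrently with other threads.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical checkpoint contents (the real SPL checkpoint API differs).
struct Snapshot {
    long tuples;
    long total;
};

// Hypothetical stateful operator whose two counters must be captured together.
class SummingOperator {
public:
    void process(long value) {
        std::lock_guard<std::mutex> guard(mutex_);  // plays the role of AutoMutex
        ++tuples_;
        total_ += value;
    }

    // Called from a checkpointing thread, concurrently with process().
    Snapshot getCheckpoint() {
        std::lock_guard<std::mutex> guard(mutex_);
        return Snapshot{tuples_, total_};
    }

    // Called from yet another thread; an atomic flag needs no extra lock.
    void prepareToShutdown() { shuttingDown_.store(true); }
    bool shuttingDown() const { return shuttingDown_.load(); }

private:
    std::mutex mutex_;
    long tuples_ = 0;
    long total_ = 0;
    std::atomic<bool> shuttingDown_{false};
};

// Runs processing and checkpointing on separate threads, then shuts down
// from the calling thread. Returns every snapshot that was taken.
std::vector<Snapshot> runWithCheckpoints(SummingOperator& op, long tuples) {
    std::vector<Snapshot> snapshots;  // used only by the checkpointing thread until join
    std::thread processing([&op, tuples] {
        for (long i = 0; i < tuples; ++i) op.process(2);  // every tuple adds 2
    });
    std::thread checkpointing([&op, &snapshots] {
        while (!op.shuttingDown()) {
            snapshots.push_back(op.getCheckpoint());
            std::this_thread::yield();
        }
        snapshots.push_back(op.getCheckpoint());  // final state after shutdown
    });
    processing.join();
    op.prepareToShutdown();
    checkpointing.join();
    return snapshots;
}
```

Without the lock, a checkpoint taken between the two updates could pair a tuple count and a total from different moments, breaking the total == 2 * tuples relationship that every snapshot here is expected to satisfy.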
Threading Behavior with Fusion: Following these guidelines is sufficient to create operators that run safely and efficiently in a multi-threaded setting. However, developers often want to understand how, and under which conditions, an operator graph that is fused inside a PE becomes multi-threaded. Under fusion, operators within the same PE are connected by streams that are implemented as regular function calls. The following conditions introduce multi-threading into a graph of operators that are tied together by function calls:
- When you use the default TCP transport, each PE input port is served by a separate thread. Each such thread makes calls only to the operator that is connected to the PE input port served by the thread.
- When you use the optional LLM transport, there might be a number of threads that serve the set of input ports, depending on the details of the LLM configuration. However, at any time only one thread is active for a given port. Each such thread makes calls only to the operator that is connected to the PE input port served by the thread.
- Any tuple-submitting source operator in the operator graph introduces a separate thread, which drives the execution that is rooted at that operator.
- Any operator input port that is marked by a threadedPort configuration has a separate thread that drives the execution for that input port.
- Any additional threads that are created by an operator and used to submit tuples drive the execution of the flow that is rooted at the output ports on which they submit.
- Any windowing-related threads that are created for time-based window policies and that submit tuples drive the execution of the flow that is rooted at the output ports on which they submit.
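The threadedPort case decouples the upstream caller from the downstream processing. The following is a minimal standalone model, assuming a simple unbounded queue; ThreadedPort and its submit method are hypothetical names, and the sketch does not model any queue limits or congestion handling that the actual configuration may provide.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical model of a threaded input port (not SPL code): the upstream
// thread only enqueues tuples, and a dedicated thread drives process().
class ThreadedPort {
public:
    explicit ThreadedPort(std::function<void(int)> process)
        : process_(std::move(process)), worker_([this] { run(); }) {}

    ~ThreadedPort() {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            done_ = true;
        }
        ready_.notify_one();
        worker_.join();  // the worker drains queued tuples before exiting
    }

    // Called on the upstream thread; returns without running process().
    void submit(int tuple) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            queue_.push_back(tuple);
        }
        ready_.notify_one();
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            ready_.wait(lock, [this] { return done_ || !queue_.empty(); });
            if (queue_.empty()) return;  // shutting down and nothing left
            int tuple = queue_.front();
            queue_.pop_front();
            lock.unlock();
            process_(tuple);  // executes on the port's own thread
            lock.lock();
        }
    }

    std::function<void(int)> process_;
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<int> queue_;
    bool done_ = false;
    std::thread worker_;  // declared last so it starts after the other members
};
```

Every downstream process call runs on the worker thread, so the flow rooted at this port is driven by a different thread than the one that submitted the tuple.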
A PE port always maps to a single operator port. The relation between the PE and operator ports can be summarized as follows:
- An operator input port is exposed as a PE port if either of the following is true:
  - It is connected to the output port of an operator in a different PE.
  - An imported stream is connected to it.
- An operator output port is exposed as a PE port if either of the following is true:
  - It is connected to the input port of an operator in a different PE.
  - An exported stream is connected to it.
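Combining the input-port rule with the default TCP transport behavior described earlier gives a quick way to count the transport threads that enter a fused PE. The sketch is hypothetical (InputPortInfo, isPeInputPort, and tcpTransportThreads are illustrative names) and counts only the threads that serve PE input ports; source operators, threaded ports, operator-created threads, and windowing threads add further threads on top of this count.

```cpp
#include <cassert>
#include <vector>

// Hypothetical summary of one operator input port inside a fused PE.
struct InputPortInfo {
    bool fedFromOtherPe;     // connected to an output port of an operator in a different PE
    bool hasImportedStream;  // an imported stream is connected to it
};

// An operator input port is exposed as a PE input port if either condition holds.
bool isPeInputPort(const InputPortInfo& port) {
    return port.fedFromOtherPe || port.hasImportedStream;
}

// With the default TCP transport, each PE input port is served by its own
// thread, so this counts the transport threads that enter the PE.
int tcpTransportThreads(const std::vector<InputPortInfo>& ports) {
    int threads = 0;
    for (const auto& port : ports) {
        if (isPeInputPort(port)) ++threads;
    }
    return threads;
}
```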
Waiting within Process Functions: Take care when an operator performs a wait operation in a process function and expects to be signaled by another call to a process function. This signaling is generally achieved by using condition variables. Use extreme caution with this kind of pattern, because there are two potential problems to watch for. First, depending on the threading context in which the operator is running, the process call that performs the wait operation and the one that is supposed to perform the signal operation might or might not be executed by different threads. If only a single thread is involved, the wait call never unblocks, resulting in a deadlock. A common use case is synchronizing two streams, in which case the wait and signal operations are performed by process calls that are attached to different ports. When an operator implements this kind of processing pattern, state explicitly in the operator documentation that the ports must be executed by different threads. In turn, the application developer who uses this operator must verify that the operator is used in the right context within the stream graph, so that the ports in question are executed by different threads.
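The first problem can be reproduced outside SPL with a standard condition variable. In this hypothetical sketch, SyncOperator's processPort0 waits for a flag that processPort1 sets. The timeout passed to wait_for is an assumption added only so that the single-threaded case returns instead of hanging; a plain wait would block forever.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical operator that synchronizes two streams: port 0 waits until
// port 1 has delivered its signal.
class SyncOperator {
public:
    // Port 0: blocks until port 1 signals, or until the timeout expires.
    bool processPort0(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        return signaled_.wait_for(lock, timeout, [this] { return ready_; });
    }

    // Port 1: sets the flag and wakes a waiter on port 0.
    void processPort1() {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            ready_ = true;
        }
        signaled_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable signaled_;
    bool ready_ = false;
};

// Ports driven by different threads: the wait is released by the signal.
bool runWithTwoThreads() {
    SyncOperator op;
    bool released = false;
    std::thread port0([&] { released = op.processPort0(std::chrono::seconds(5)); });
    std::thread port1([&] { op.processPort1(); });
    port0.join();
    port1.join();
    return released;
}

// Both ports driven by one thread: port 1 cannot run while port 0 waits,
// so the wait can only end by timing out.
bool runWithOneThread() {
    SyncOperator op;
    bool released = op.processPort0(std::chrono::milliseconds(50));
    op.processPort1();  // reached only after the wait has already given up
    return released;
}
```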
Second, if the operator has custom logic, the implicit locking performed by the SPL run time prevents any thread other than the one that is currently running a process call from running process calls. If that thread is blocked on a wait call, the result is a deadlock. This deadlock can be avoided by disabling custom logic in the operator model.
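The second problem can be modeled by wrapping each process call in an extra lock that stands in for the implicit run-time locking. This is a sketch under stated assumptions, not SPL's mechanism: ImplicitLockDemo, the customLogic flag, and the timed mutex are hypothetical, and the timeouts exist only so that the deadlock is observable in a test instead of hanging.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical model (not SPL code): when customLogic is true, every process
// call holds runtimeLock_ for its whole duration, standing in for the
// implicit locking that the run time performs.
class ImplicitLockDemo {
public:
    explicit ImplicitLockDemo(bool customLogic) : customLogic_(customLogic) {}

    // Port 0: enters its process call, then waits for port 1's signal.
    bool processPort0(std::chrono::milliseconds timeout) {
        std::unique_lock<std::timed_mutex> runtime(runtimeLock_, std::defer_lock);
        if (customLogic_) runtime.lock();  // held until this call returns
        entered_.store(true);
        std::unique_lock<std::mutex> lock(stateMutex_);
        return signaled_.wait_for(lock, timeout, [this] { return ready_; });
    }

    // Port 1: must enter its own process call before it can signal.
    // Returns false if the implicit lock could not be acquired in time.
    bool processPort1(std::chrono::milliseconds timeout) {
        std::unique_lock<std::timed_mutex> runtime(runtimeLock_, std::defer_lock);
        if (customLogic_ && !runtime.try_lock_for(timeout)) return false;
        {
            std::lock_guard<std::mutex> guard(stateMutex_);
            ready_ = true;
        }
        signaled_.notify_one();
        return true;
    }

    bool entered() const { return entered_.load(); }

private:
    bool customLogic_;
    std::timed_mutex runtimeLock_;
    std::mutex stateMutex_;
    std::condition_variable signaled_;
    bool ready_ = false;
    std::atomic<bool> entered_{false};
};

// Runs the two ports on different threads; returns {waitReleased, signalSent}.
std::pair<bool, bool> runPorts(bool customLogic) {
    ImplicitLockDemo op(customLogic);
    bool released = false;
    bool sent = false;
    std::thread port0([&] { released = op.processPort0(std::chrono::milliseconds(500)); });
    while (!op.entered()) std::this_thread::yield();  // port 0 is inside its call
    std::thread port1([&] { sent = op.processPort1(std::chrono::milliseconds(100)); });
    port0.join();
    port1.join();
    return {released, sent};
}
```

Even though the two ports run on different threads, the waiting call holds the stand-in run-time lock, so the signaling call cannot enter. With customLogic set to false, the same two threads complete the hand-off, which mirrors the advice to disable custom logic in the operator model.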