Operator ThreadedSplit
The ThreadedSplit operator splits tuples across multiple output ports to improve concurrency.
One thread is used for each output port so that tuples are submitted as quickly as possible. The output port that receives a given tuple cannot be determined in advance. The ThreadedSplit operator maintains one buffer (queue) for each output port. When all the buffers are full, the operator busy-waits until space becomes available in one of them, which blocks the input tuple. When a submitting thread finds its buffer empty, it busy-waits until a tuple arrives in that buffer.
The ThreadedSplit operator is not a direct replacement for the Split operator, because it does not let you choose the stream to which a tuple is sent.
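The difference from Split can be sketched by placing the two operators side by side. This is an illustrative sketch only: the composite name, the stream names, and the hash-based routing expression are assumptions that do not come from this page. With Split, the index expression decides the output port for each tuple. With ThreadedSplit, no such expression exists and the port is determined at run time by buffer availability.

```spl
// Hypothetical comparison; all names here are illustrative.
composite SplitComparison {
  graph
    stream<rstring name> Input = Beacon() {
      output Input : name = "item" + (rstring) IterationCount();
    }

    // Split: the application chooses the port through the index expression.
    (stream<rstring name> Bucket0;
     stream<rstring name> Bucket1) = Split(Input)
    {
      param
        index : (int64) (hashCode(name) % 2ul);
    }

    // ThreadedSplit: no routing parameter; a tuple goes to whichever
    // output port has buffer space when the tuple arrives.
    (stream<rstring name> Any0;
     stream<rstring name> Any1) = ThreadedSplit(Input)
    {
      param
        bufferSize : 1000u;
    }
}
```

Because the assignment of tuples to ports is not repeatable, ThreadedSplit is best suited to downstream processing that does not depend on which port, or which neighboring tuples, a given tuple travels with.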
For information about controlling the mechanism that is used when the ThreadedSplit operator is waiting on a full queue, see the --avoid-runtime-yield compiler option of the sc command.
Checkpointed data
When the ThreadedSplit operator is checkpointed, logic state variables (if present) are saved in the checkpoint.
Behavior in a consistent region
The ThreadedSplit operator can be an operator within the reachability graph of a consistent region. During drain processing, all threads finish submitting queued tuples. During reset processing, the operator discards all currently queued tuples. In a consistent region, a ThreadedSplit operator stores its state when a checkpoint is taken. When the region is reset, the operator restores the state from the checkpoint.
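A minimal sketch of a ThreadedSplit operator inside a consistent region might look like the following. The composite name, the choice of Beacon as the start operator of the region, and the 5-second period are assumptions made for illustration; check the documentation of the source operator you actually use to confirm that it can start a consistent region.

```spl
// Hypothetical example; names and the trigger period are illustrative.
composite ConsistentSplit {
  graph
    // Applications that contain a consistent region need a JobControlPlane.
    () as JCP = JobControlPlane() {}

    // Assumed start of the region; ThreadedSplit is reachable from it.
    @consistent(trigger = periodic, period = 5.0)
    stream<rstring name> Input = Beacon() {}

    // On drain, the operator's threads finish submitting queued tuples.
    // On reset, tuples still in the buffers are discarded.
    (stream<rstring name> Out1;
     stream<rstring name> Out2) = ThreadedSplit(Input)
    {
      param
        bufferSize : 1000u;
    }
}
```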
Checkpointing behavior in an autonomous region
When the ThreadedSplit operator is in an autonomous region and is configured with a config checkpoint : periodic(T) clause, a background thread in the SPL Runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.
When the ThreadedSplit operator is in an autonomous region and is configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator restores to its initial state.
Such checkpointing behavior is subject to change in the future.
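The periodic case described above can be sketched as follows. The composite name, the state variable seen, the onTuple logic that updates it, and the 10-second period are all hypothetical; the sketch assumes the operator accepts a logic clause, as the mention of logic state variables suggests.

```spl
// Hypothetical example; the state variable and period are illustrative.
composite PeriodicCheckpointSplit {
  graph
    stream<rstring name> Input = Beacon() {}

    (stream<rstring name> Out1;
     stream<rstring name> Out2) = ThreadedSplit(Input)
    {
      logic
        // Logic state is what gets saved in the checkpoint.
        state : mutable uint64 seen = 0ul;
        onTuple Input : seen++;
      param
        bufferSize : 1000u;
      config
        // Checkpoint every 10 seconds, asynchronously to tuple processing.
        checkpoint : periodic(10.0);
    }
}
```

Replacing periodic(10.0) with operatorDriven would, per the description above, result in no checkpoints being taken, so seen would start again from 0 after a restart.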
Examples
This example uses the ThreadedSplit operator.
composite Main {
  graph
    stream<rstring name> Input = Beacon() {}

    // Split tuples across two output streams.
    (stream<rstring name> Out1;
     stream<rstring name> Out2) = ThreadedSplit(Input)
    {
      param
        bufferSize: 1000u;
    }
}
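To observe how tuples are distributed, the example can be extended with one consumer per output stream. The following variant is a sketch: the composite name, the sink names, and the generated name values are assumptions added for illustration. Each sink prints the port it is attached to, and the interleaving of the printed lines is expected to differ from run to run.

```spl
// Hypothetical extension of the example; names are illustrative.
composite MainWithSinks {
  graph
    stream<rstring name> Input = Beacon() {
      output Input : name = "item" + (rstring) IterationCount();
    }

    (stream<rstring name> Out1;
     stream<rstring name> Out2) = ThreadedSplit(Input)
    {
      param
        bufferSize: 1000u;
    }

    // Each sink runs on the thread that drains its port's buffer.
    () as Sink1 = Custom(Out1) {
      logic
        onTuple Out1 : println("port 0: " + name);
    }

    () as Sink2 = Custom(Out2) {
      logic
        onTuple Out2 : println("port 1: " + name);
    }
}
```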
Summary
- Ports
- This operator has 1 input port and 1 or more output ports.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports 1 parameter.
Required: bufferSize
- Metrics
- This operator reports 6 metrics.
Properties
- Implementation
- C++
- Threading
- Never - Operator never provides a single threaded execution context.
Input Ports
- Ports (0)
-
The ThreadedSplit operator has one input port, which ingests tuples to be split amongst multiple threads.
- Properties
-
- Optional: false
- ControlPort: false
- TupleMutationAllowed: false
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
- Assignments
- This operator does not allow assignments to output attributes.
Output Ports
- Ports (0)
-
The ThreadedSplit operator can be configured with one or more output ports. The schema of each output port must match the schema of the input port. The output tuple attributes are automatically forwarded from the input tuple attributes.
- Properties
-
- Optional: false
- TupleMutationAllowed: true
- WindowPunctuationOutputMode: Preserving
- Ports (1...)
-
Additional ports that produce the split tuples.
- Properties
-
- TupleMutationAllowed: true
- WindowPunctuationOutputMode: Preserving
Parameters
Required: bufferSize
- bufferSize
-
Specifies the size of each buffer that is used to store the input tuples for each output port. The final punctuation marker is not forwarded until all buffers are drained.
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: false
- ExpressionMode: AttributeFree
Code Templates
- ThreadedSplit
-
(stream<${inputStream}> ${outputStream1};
 stream<${inputStream}> ${outputStream2}) = ThreadedSplit(${inputStream})
{
  param
    bufferSize: ${bufferSize};
}
Metrics
- maxItemsQueued (port N) - Gauge
-
The largest number of items queued for port number N.
- nEnqueueWaits - Gauge
-
The number of times the operator waited because all queues were full.
- nItemsQueued (port N) - Gauge
-
The number of items currently queued for port number N.
- queueSize - Gauge
-
The size of the queue for each output port.
- recentMaxItemsQueued (port N) - Gauge
-
The recent largest number of items queued for port number N.
- recentMaxItemsQueuedInterval - Gauge
-
The interval, in milliseconds, that is used to determine the recent largest number of items queued. The number reported is the largest value from the current and previous intervals, and each interval lasts five minutes.
Libraries
- spl-std-tk-lib