Operator Pair
The Pair operator pairs tuples from two or more streams. It matches corresponding tuples from each input port and emits output only after it has received a tuple from every input port. It operates exactly like the Barrier operator, with one major difference: rather than combining the input tuples into one output tuple, it outputs them individually, in the order of the input ports, followed by a window marker. As a result, all the input ports and the output port must have the same schema.
Checkpointed data
When the Pair operator is checkpointed, the unpaired tuples are saved in the checkpoint.
Behavior in a consistent region
The Pair operator can be an operator within the reachability graph of a consistent region. It cannot be the start of a consistent region. When in a consistent region, the Pair operator checkpoints the unpaired tuples. During reset processing, the unpaired tuples that are currently held are discarded and the tuples that were written to the checkpoint are restored.
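The following is a minimal sketch of a Pair operator that sits inside a consistent region without being its start operator. The composite name, stream names, and the 5-second period are illustrative assumptions and do not come from the operator reference. A single annotated Beacon starts the region, and two Functor operators derive the streams that are paired.

```spl
// Hypothetical composite; names and period are illustrative only.
composite ConsistentPairSketch {
  graph
    // A job control plane is needed when the application uses consistent regions.
    () as JCP = JobControlPlane() {}

    // The Beacon is the start operator of the region; Pair is only reachable from it.
    @consistent(trigger = periodic, period = 5.0)
    stream<rstring name, uint32 value> Src = Beacon() {
      param
        period : 0.1;
      output
        Src : name = "src", value = (uint32) IterationCount();
    }

    // Two derived streams with the same schema, as Pair requires.
    stream<rstring name, uint32 value> Left = Functor(Src) {
      output
        Left : name = "left";
    }

    stream<rstring name, uint32 value> Right = Functor(Src) {
      output
        Right : name = "right";
    }

    // bufferSize is deliberately omitted: it is not supported in a consistent region.
    stream<rstring name, uint32 value> Paired = Pair(Left; Right) {}

    () as Sink = Custom(Paired) {
      logic
        onTuple Paired : println(Paired);
    }
}
```

In this arrangement, any tuples that are still waiting for a partner when the region is drained would be part of the Pair operator's checkpoint, and a reset would restore them.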
Checkpointing behavior in an autonomous region
When the Pair operator is in an autonomous region and is configured with a config checkpoint : periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.
When the Pair operator is in an autonomous region and is configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator restores to its initial state.
Such checkpointing behavior is subject to change in the future.
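As a sketch of the periodic case in an autonomous region, the invocation below attaches a checkpoint configuration to a Pair operator. The composite name, the stream names, and the 30-second interval are assumptions chosen for illustration. The restartable setting is included on the assumption that the operator should be restarted after a failure.

```spl
// Hypothetical composite; the interval T = 30 seconds is illustrative.
composite CheckpointPairSketch {
  graph
    stream<rstring name, uint32 value> A = Beacon() {}
    stream<rstring name, uint32 value> B = Beacon() {}

    stream<rstring name, uint32 value> Paired = Pair(A; B) {
      config
        checkpoint  : periodic(30.0); // unpaired tuples are checkpointed every 30 seconds
        restartable : true;           // allow a restart that restores the last checkpoint
    }
}
```

Replacing periodic(30.0) with operatorDriven would, according to the description above, result in no checkpoint being taken and a restart from the initial state.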
Examples
This example shows three invocations of the Pair operator: without a buffer size, with a buffer size, and with partitioning.
composite Main {
  graph
    // with no buffer size, used for parallel tasks
    stream<rstring name, uint32 value> OpA = Beacon() {}
    stream<rstring name, uint32 value> OpB = Beacon() {}
    stream<rstring name, uint32 value> Res1 = Pair(OpA; OpB) {}
    // with buffer size, used for synchronizing independent sources
    stream<rstring name, uint32 value> OpC = Beacon() {}
    stream<rstring name, uint32 value> Res2 = Pair(OpA; OpC)
    {
      param
        bufferSize : 1000u;
    }
    // with partitioning
    stream<rstring name, uint32 value> Res3 = Pair(OpA; OpB)
    {
      param
        partitionByLHS : OpA.name;
        partitionByRHS : OpB.name;
    }
}
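To make the output order visible, the sketch below pairs two finite streams and prints everything that the Pair operator emits, including punctuation. The composite name, stream names, attribute values, and the iteration count are illustrative assumptions.

```spl
// Hypothetical composite; prints the Pair output so that the ordering can be observed.
composite PairOrderSketch {
  graph
    stream<rstring name, uint32 value> Left = Beacon() {
      param
        iterations : 3u;
      output
        Left : name = "left", value = (uint32) IterationCount();
    }

    stream<rstring name, uint32 value> Right = Beacon() {
      param
        iterations : 3u;
      output
        Right : name = "right", value = (uint32) IterationCount();
    }

    // Both inputs and the output share one schema, as Pair requires.
    stream<rstring name, uint32 value> Paired = Pair(Left; Right) {}

    () as Printer = Custom(Paired) {
      logic
        onTuple Paired : println(Paired);
        onPunct Paired : {
          // Report only window markers, which Pair sends after each group of paired tuples.
          if (currentPunct() == Sys.WindowMarker) {
            println("-- window marker --");
          }
        }
    }
}
```

Given the behavior described in the operator overview, each tuple from Left should be printed before its counterpart from Right, and each such group should be followed by a window marker line.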
Summary
- Ports
- This operator has 2 or more input ports and 1 output port.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports arbitrary parameters in addition to 3 specific parameters.
Optional: bufferSize, partitionByLHS, partitionByRHS
- Metrics
- This operator does not report any metrics.
Properties
- Implementation
- C++
- Threading
- Always - Operator always provides a single threaded execution context.
- Ports (0...1)
-
The Pair operator is configurable with two or more input ports, which ingest tuples to be paired. The Pair operator does not allow custom port logic to be specified in its invocations.
- Properties
-
- Optional: false
- ControlPort: false
- TupleMutationAllowed: false
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
- Ports (2...)
-
Additional ports that ingest tuples to be paired.
- Properties
-
- ControlPort: false
- TupleMutationAllowed: false
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
- Assignments
- This operator does not allow assignments to output attributes.
- Ports (0)
-
The Pair operator is configurable with a single output port, which produces the paired tuples in the order in which the input ports are declared.
- Properties
-
- Optional: false
- TupleMutationAllowed: true
- WindowPunctuationOutputMode: Generating
Optional: bufferSize, partitionByLHS, partitionByRHS
- bufferSize
-
Specifies the size of the internal buffer that is used to queue up tuples from an input port that do not yet have matching tuples from other ports. This parameter is not supported in a consistent region. For more information, see the bufferSize parameter in the Barrier operator.
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- partitionByLHS
-
Specifies the expressions that are used for partitioning the input tuples from the left port, where the synchronization applies to matching partitions from different ports. For more information, see the partitionByLHS parameter in the Barrier operator.
- Properties
-
- Optional: true
- ExpressionMode: Expression
- partitionByRHS
-
Specifies the expressions that are used for partitioning the input tuples from the right port, where the synchronization applies to matching partitions from different ports. For more information, see the partitionByRHS parameter in the Barrier operator.
- Properties
-
- Optional: true
- ExpressionMode: Expression
- Pair
-
stream<${schema}> ${outputStream} = Pair(${inputStream1};${inputStream2}) {
  param
    ${parameter}:${parameterExpression};
  output
    ${outputStream}: ${outputExpression};
}