Operator Sort
The Sort operator orders tuples based on user-specified ordering expressions and window configurations.
Checkpointed data
When the Sort operator is checkpointed, the contents of its window are saved in the checkpoint. The window is checkpointed incrementally to reduce checkpoint size and latency. Logic state variables (if present) are also included in the checkpoint.
Behavior in a consistent region
A Sort operator can be used in a consistent region, but it cannot be the start of the region. In a consistent region, a Sort operator stores its state when a checkpoint is taken. When the region is reset, the operator restores the state from the checkpoint.
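A minimal sketch of a Sort inside a consistent region, assuming a Streams instance that already has a checkpoint repository configured. Because Sort cannot start the region, the @consistent annotation is placed on the upstream Beacon, and a JobControlPlane operator is added because, as we understand it, any application with a consistent region needs one. The composite name ConsistentSortSketch, the 30-second trigger period, and the Beacon period are illustrative choices, not values taken from this reference.

```spl
composite ConsistentSortSketch {
    graph
        // Coordinates checkpoints and resets for the consistent region.
        () as JCP = JobControlPlane() {}

        // The source starts the region and triggers a checkpoint every 30 seconds.
        @consistent(trigger = periodic, period = 30.0)
        stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {
            param
                period : 0.1;
        }

        // Sort is inside the region but is not its start: its window contents
        // are saved when the region checkpoints and restored when it resets.
        stream<Beat> Sorted = Sort(Beat) {
            window
                Beat : tumbling, count(10);
            param
                sortBy : name;
        }
}
```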
Checkpointing behavior in an autonomous region
When the Sort operator is in an autonomous region and configured with the config checkpoint : periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.
When the Sort operator is in an autonomous region and configured with the config checkpoint : operatorDriven clause, no checkpoint is taken at runtime. Upon restart, the operator reverts to its initial state.
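The two config clauses above can be sketched side by side. This assumes the operators are not part of any consistent region and that a checkpoint repository is configured for the instance; the composite name, stream names, and the 10-second interval are hypothetical.

```spl
composite AutonomousSortSketch {
    graph
        stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

        // A runtime background thread checkpoints this operator every 10 seconds;
        // after a restart, its window is restored from the last checkpoint.
        stream<Beat> SortedPeriodic = Sort(Beat) {
            window
                Beat : tumbling, count(10);
            param
                sortBy : name;
            config
                checkpoint : periodic(10.0);
        }

        // No checkpoint is taken at runtime; after a restart,
        // this operator starts again from its initial state.
        stream<Beat> SortedOperatorDriven = Sort(Beat) {
            window
                Beat : tumbling, count(10);
            param
                sortBy : name;
            config
                checkpoint : operatorDriven;
        }
}
```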
Such checkpointing behavior is subject to change in the future.
Windowing
The Sort operator supports the following window configurations:
tumbling, (count | delta | time | punctuation)-based eviction (, partitioned (, partitionEvictionSpec)? )?
sliding, count-based eviction, count-based trigger of 1 (, partitioned (, partitionEvictionSpec)? )?
For the tumbling variants, tuples are sorted when the window is flushed (for example, when a count-based window gets full) and are output immediately, followed by a window marker punctuation.
For the sliding variants, tuples are always kept in sorted order. When the window gets full, every new tuple causes the first one in the sorted order to be removed from the window and output. This type of sort is referred to as progressive sort.
For the partitioned variants, the window specification applies to individual subwindows identified by the partitionBy parameter.
For the tumbling variants, the final punctuation marker does not flush the window (so as not to break invariants on the output). For the sliding variants (progressive), the final punctuation marker does flush the window.
When a tumbling, punctuation-based window that contains no tuples receives a window marker punctuation, only a window marker punctuation is output.
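The window grammar above also allows a partitioned progressive sort with a partitionEvictionSpec. The sketch below follows the sliding form used in the examples and adds partitionAge(60.0) as the eviction spec; as we understand the SPL window semantics, this evicts a partition that has not received a tuple for 60 seconds. The composite and stream names and the 60-second value are hypothetical.

```spl
composite PartitionedProgressiveSortSketch {
    graph
        stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

        // One progressive sort per distinct name. Once a partition holds
        // 10 tuples, each new tuple for that name causes the first tuple
        // in sorted order to be removed from that partition and output.
        stream<Beat> SortedPerName = Sort(Beat) {
            window
                Beat : sliding, count(10), partitioned, partitionAge(60.0);
            param
                partitionBy : name;
                sortBy : (float64)salary/(float64)age;
        }
}
```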
Assignments
The Sort operator does not allow assignments to output attributes. The output tuple attributes are automatically forwarded from the input ones.
Examples
This example uses the Sort operator.
composite Main {
    graph
        stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

        // count based window
        stream<Beat> Sorted0 = Sort(Beat)
        {
            window
                Beat : tumbling, count(10);
            param
                sortBy : name, (float64)salary/(float64)age;
        }

        // count based partitioned window
        stream<Beat> Sorted1 = Sort(Beat)
        {
            window
                Beat : tumbling, count(10), partitioned;
            param
                partitionBy : name;
                sortBy : (float64)salary/(float64)age;
        }

        // count based window, with sort order
        stream<Beat> Sorted2 = Sort(Beat)
        {
            window
                Beat : tumbling, count(10);
            param
                sortBy : name, (float64)salary/(float64)age;
                order : descending;
        }

        // count based window, with sort order for each sortBy expression
        stream<Beat> Sorted3 = Sort(Beat)
        {
            window
                Beat : tumbling, count(10);
            param
                sortBy : name, (float64)salary/(float64)age;
                order : ascending, descending;
        }

        // punctuation based window
        stream<Beat> Sorted4 = Sort(Beat)
        {
            window
                Beat : tumbling, punct();
            param
                sortBy : name, (float64)salary/(float64)age;
        }

        // time based window
        stream<Beat> Sorted5 = Sort(Beat)
        {
            window
                Beat : tumbling, time(10);
            param
                sortBy : name, (float64)salary/(float64)age;
        }

        // delta based window
        stream<uint32 id, uint32 age, uint64 salary> BeatId = Beacon() {}
        stream<BeatId> Sorted6 = Sort(BeatId)
        {
            window
                BeatId : tumbling, delta(id, 10u);
            param
                sortBy : (float64)salary/(float64)age;
        }

        // progressive sort
        stream<Beat> Sorted = Sort(Beat)
        {
            window
                Beat : sliding, count(10);
            param
                sortBy : name, (float64)salary/(float64)age;
        }
}
Summary
- Ports
- This operator has 1 input port and 1 output port.
- Windowing
- This operator requires a windowing configuration.
- Parameters
- This operator supports 3 parameters.
Required: sortBy
Optional: order, partitionBy
- Metrics
- This operator reports 1 metric.
Properties
- Implementation
- C++
- Threading
- WindowEvictionBound - The operator provides a single-threaded execution context only if a time-based window eviction policy is not used.
Input Ports
- Ports (0)
The Sort operator has a single input port, which ingests the tuples to be sorted. When the operator is configured with a punctuation-based window, it processes the window punctuation markers that arrive on this port.
- Properties
- Optional: false
- ControlPort: false
- TupleMutationAllowed: false
- WindowingMode: Windowed
- WindowPunctuationInputMode: WindowBound
- Assignments
- This operator does not allow assignments to output attributes. The output tuple attributes are automatically forwarded from the input ones.
Output Ports
- Ports (0)
The Sort operator has a single output port, which produces the sorted tuples. It generates a window punctuation after each batch of sorted tuples it outputs. The stream type of the output port must match the stream type of the input port.
- Properties
- Optional: false
- TupleMutationAllowed: true
- WindowPunctuationOutputMode: Generating
Parameters
Required: sortBy
Optional: order, partitionBy
- order
Specifies either the global sort order or the sort order of each individual expression in the sortBy parameter. The valid values are ascending and descending. A single value sets the global sort order; when multiple values are specified, their number must match the number of sortBy expressions.
- Properties
- Type: SortOrder (ascending, descending)
- Optional: true
- ExpressionMode: CustomLiteral
- partitionBy
Specifies one or more expressions used to partition the input tuples into subwindows; all window and parameter configurations apply to each subwindow independently. This parameter is valid only when the Sort operator is configured with a partitioned window.
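A short fragment showing partitionBy with more than one expression, assuming the Beat stream declared in the examples. The stream name SortedPerNameAndAge and the count(5) window size are hypothetical.

```spl
stream<Beat> SortedPerNameAndAge = Sort(Beat) {
    window
        Beat : tumbling, count(5), partitioned;
    param
        // Each distinct (name, age) pair gets its own 5-tuple subwindow,
        // which is sorted and flushed independently of the others.
        partitionBy : name, age;
        sortBy : salary;
}
```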
- Properties
- Optional: true
- ExpressionMode: Expression
- sortBy
Specifies one or more expressions used to sort the tuples. The comparison is lexicographic: the first expression is compared first, the second expression is considered only when the first is equal, and so on. The default sort order is ascending, which means that the output stream produces tuples in non-decreasing order. The sort order can be changed with the order parameter.
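As a worked illustration, take three hypothetical tuples on the Beat stream from the examples, assume they are sorted together in one window batch, and use the Sorted3 configuration (sortBy : name, (float64)salary/(float64)age; order : ascending, descending;):
- {name="bob", age=30, salary=3000}: salary/age = 100.0
- {name="alice", age=40, salary=2000}: salary/age = 50.0
- {name="bob", age=20, salary=4000}: salary/age = 200.0
Comparing name first puts alice ahead of both bob tuples. The two bob tuples tie on name, so the second expression decides, in descending order. The output is alice (50.0), bob (200.0), bob (100.0). With the default ascending order on both expressions, the two bob tuples would instead appear as bob (100.0), bob (200.0).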
- Properties
- Optional: false
- ExpressionMode: Expression
Code Templates
- Sort
stream<${schema}> ${streamName} = Sort(${inputStream}) { window ${inputStream}: ${windowMode}; param sortBy : ${sortExpression}; }
Metrics
- nCurrentPartitions - Gauge
Returns the number of partitions in the current window.