Operator DeDuplicate
The DeDuplicate operator suppresses duplicate tuples that are seen within a specified time period.
Checkpointed data
When the DeDuplicate operator is checkpointed, it checkpoints the most recently seen tuples according to the given 'timeOut', 'count', or 'delta' value. Logic state variables (if present) are also included in the checkpoint.
Behavior in a consistent region
The DeDuplicate operator can be an operator within the reachability graph of a consistent region. It cannot be the start of a consistent region. The DeDuplicate operator checkpoints and resets the most recently seen tuples according to the given 'timeOut', 'count', or 'delta' value. Any logic state variables (if present) are also automatically checkpointed and reset. The behavior of the 'timeOut' parameter does not change. This means that the time spent draining, checkpointing, and resetting the consistent region is part of the elapsed time that counts toward evicting tuples from the operator.
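The following is a minimal sketch of a DeDuplicate operator inside a consistent region, not a definitive configuration. It assumes that the Beacon source may start the region and that the application includes a JobControlPlane operator, as consistent regions generally require. The composite name, stream names, and the 30-second period are hypothetical.

```spl
composite ConsistentDedup {
  graph
    // Assumption: a JobControlPlane operator is needed to coordinate the region
    () as JCP = JobControlPlane() {}

    // Assumption: Beacon acts as the start of the consistent region
    @consistent(trigger = periodic, period = 30.0)
    stream<rstring name, uint32 age> Data = Beacon() {
      param period : 0.1;
      output Data : name = "test", age = (uint32)(random() * 80.0);
    }

    // DeDuplicate is downstream of the start operator, so it is part of the
    // region's reachability graph; its remembered tuples are checkpointed and reset
    stream<Data> Unique = DeDuplicate(Data) {
      param
        timeOut : 60.0;
        key : age;
    }
}
```

Because the behavior of timeOut does not change in a consistent region, the 60-second interval in this sketch keeps elapsing while the region drains, checkpoints, or resets.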
Checkpointing behavior in an autonomous region
When the DeDuplicate operator is in an autonomous region and configured with a config checkpoint : periodic(T) clause, a background thread in the SPL runtime checkpoints the operator every T seconds. This periodic checkpointing is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.
When the DeDuplicate operator is in an autonomous region and configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator returns to its initial state.
Such checkpointing behavior is subject to change in the future.
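As an illustration of the periodic case, the sketch below configures a DeDuplicate operator in an autonomous region with a config checkpoint : periodic(T) clause. The composite name, stream names, and the 30-second interval are hypothetical, and the explicit restartable setting is an assumption added so that a restart can restore the checkpointed state.

```spl
composite CheckpointedDedup {
  graph
    stream<rstring name, uint32 age> Data = Beacon() {
      param period : 0.1;
      output Data : name = "test", age = (uint32)(random() * 80.0);
    }

    stream<Data> Unique = DeDuplicate(Data) {
      param
        timeOut : 60.0;
        key : age;
      config
        checkpoint : periodic(30.0); // checkpoint in the background every 30 seconds
        restartable : true;          // assumption: stated explicitly so the operator can be restarted
    }
}
```

Replacing periodic(30.0) with operatorDriven would, according to the description above, result in no checkpoint being taken and the operator starting from its initial state after a restart.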
Examples
This example uses the DeDuplicate operator.
composite Main {
  graph
    stream<rstring name, uint32 age> Data = Beacon () {
      param period : 0.1;
      output
        Data : name = "test", age = (uint32)(random()*80.0);
    }
    // only pass tuples whose age bucket (age / 3u) was not seen in the last minute
    stream<rstring name, uint32 age> Out = DeDuplicate(Data)
    {
      param
        timeOut : 60.0;     // only remember for 60 seconds
        key     : age / 3u; // expression to be used for checking duplicates
    }
}
// Fragment: assumes an input stream Data that has rstring attributes name and country.
// Detect duplicates based on a couple of attribute values.
// Duplicated data will go to the Duplicates stream.
(stream<rstring name, rstring country> Out; stream<Data> Duplicates) = DeDuplicate (Data) {
  param
    timeOut : 60.0;
    key     : name, country; // duplicate checking uses both name and country.
                             // "Harvey", "Canada" is not a duplicate of "Harvey", "USA"
}
Summary
- Ports
- This operator has 1 input port and 2 output ports.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports 7 parameters.
Optional: count, delta, deltaAttribute, flushOnPunctuation, key, resetOnDuplicate, timeOut
- Metrics
- This operator does not report any metrics.
Properties
- Implementation
- C++
- Threading
- Always - Operator always provides a single threaded execution context.
Input Ports
- Ports (0)
-
The DeDuplicate operator has one input port, which ingests the possibly duplicated tuples.
- Properties
-
- Optional: false
- ControlPort: false
- TupleMutationAllowed: false
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
Output Ports
- Ports (0)
-
The DeDuplicate operator has one or two output ports. The first output port receives the tuples that are not duplicates for the criteria.
The first output port allows assignments to output attributes. The output tuple attributes whose assignments are not specified are automatically forwarded from the input ones. After the automatic forwarding, the operator expects all output tuple attributes to be completely assigned.
- Assignments
- This port set allows any SPL expression of the correct type to be assigned to output attributes. Attributes not assigned in the output clause will be automatically assigned from the attributes of the input ports that have the same name and type. If there is no such input attribute, an error is reported at compile-time.
- Properties
-
- Optional: false
- TupleMutationAllowed: true
- WindowPunctuationOutputMode: Preserving
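The assignment rules for the first output port can be sketched as follows. In this hypothetical example, the output stream adds a label attribute that does not exist on the input, so it has to be assigned explicitly, while name and age are expected to be forwarded automatically from the input attributes of the same name and type. The composite, stream, and attribute names are assumptions made for illustration.

```spl
composite OutputAssignment {
  graph
    stream<rstring name, uint32 age> Data = Beacon() {
      param period : 0.1;
      output Data : name = "test", age = (uint32)(random() * 80.0);
    }

    stream<rstring name, uint32 age, rstring label> Out = DeDuplicate(Data) {
      param
        key : age;
      output
        // name and age are not assigned here, so they are forwarded from Data;
        // label has no matching input attribute and must be assigned
        Out : label = name + "-" + (rstring)age;
    }
}
```

Omitting the assignment to label would leave an output attribute without a matching input attribute, which the description above says is reported as a compile-time error.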
- Ports (1)
-
The second output port (if present) receives the tuples that are duplicates for the criteria. The stream type of the second output port must match that of the input port.
- Assignments
- This port set does not allow assignments to output attributes.
- Properties
-
- Optional: true
- TupleMutationAllowed: false
- WindowPunctuationOutputMode: Preserving
Parameters
Optional: count, delta, deltaAttribute, flushOnPunctuation, key, resetOnDuplicate, timeOut
- count
-
Specifies that identical tuples are suppressed within the next count tuples that are received. This parameter cannot be used with the timeOut or deltaAttribute parameters.
- Properties
-
- Type: uint64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
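A minimal sketch of count-based suppression follows. The composite and stream names and the value 1000 are hypothetical. The literal uses the ul suffix because the parameter type is uint64.

```spl
composite CountDedup {
  graph
    stream<rstring name, uint32 age> Data = Beacon() {
      param period : 0.1;
      output Data : name = "test", age = (uint32)(random() * 80.0);
    }

    // A tuple with a given age is suppressed if the same age was already
    // seen within the next 1000 tuples received after it was remembered
    stream<Data> Unique = DeDuplicate(Data) {
      param
        count : 1000ul; // cannot be combined with timeOut or deltaAttribute
        key : age;
    }
}
```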
- delta
-
Specifies the threshold for the difference between the deltaAttribute value of the remembered (non-duplicate) tuple and the deltaAttribute value of the current tuple. If the difference is greater than the delta parameter value, the current tuple is emitted.
- Properties
-
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- deltaAttribute
-
Specifies the input attribute that is used to control the suppression of duplicates. This parameter is analogous to the delta function for window clauses. Together with the delta parameter, duplicate tuples are suppressed unless the value of the attribute has increased by more than the delta value relative to the original tuple. The attribute must be of an integral, floating point, enum, or timestamp type.
This parameter cannot be used with the count or timeOut parameters.
- Properties
-
- Cardinality: 1
- Optional: true
- ExpressionMode: Attribute
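The interaction between delta and deltaAttribute can be sketched as shown below. The composite, stream, and attribute names are hypothetical, and the readings are generated with the Beacon IterationCount() output function so that they grow by 0.25 per tuple. The sketch sets resetOnDuplicate to false on the assumption that, with the default of true, every suppressed duplicate would move the remembered value forward and the difference would never exceed delta.

```spl
composite DeltaDedup {
  graph
    stream<rstring sensor, float64 reading> Readings = Beacon() {
      param period : 0.1;
      output Readings : sensor = "s1", reading = (float64)IterationCount() * 0.25;
    }

    // Tuples for the same sensor are treated as duplicates until reading
    // has grown by more than 1.0 compared with the remembered tuple
    stream<Readings> Changed = DeDuplicate(Readings) {
      param
        key : sensor;
        deltaAttribute : reading;
        delta : 1.0;
        resetOnDuplicate : false; // keep comparing against the originally remembered value
    }
}
```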
- flushOnPunctuation
-
Specifies whether punctuation causes the operator to forget all history of remembered tuples. If this parameter is not specified, the default value is false. If the parameter value is true, all remembered keys are erased when punctuation is received.
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: Constant
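The sketch below shows one way flushOnPunctuation might be used. It assumes that the punctuation in question is window punctuation and uses the Punctor operator to generate it after every 100th tuple. The composite, stream, and attribute names are hypothetical.

```spl
composite FlushOnPunct {
  graph
    stream<uint64 seq, uint32 bucket> Data = Beacon() {
      param period : 0.1;
      output Data : seq = IterationCount(), bucket = (uint32)(random() * 10.0);
    }

    // Emit a window punctuation after every 100th tuple (seq starts at 0)
    stream<Data> Batched = Punctor(Data) {
      param
        punctuate : seq % 100ul == 99ul;
        position : after;
    }

    // Each punctuation erases the remembered keys, so duplicates are
    // only detected within one batch of tuples
    stream<Data> UniquePerBatch = DeDuplicate(Batched) {
      param
        key : bucket;
        flushOnPunctuation : true;
    }
}
```

Because none of timeOut, count, or deltaAttribute is specified in this sketch, the default timeOut still applies in addition to the flush on punctuation.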
- key
-
Specifies a list of expressions that are used to determine whether a tuple is a duplicate. If this parameter is omitted, the whole tuple is used as the key.
- Properties
-
- Optional: true
- ExpressionMode: Expression
- resetOnDuplicate
-
Specifies whether a suppressed duplicate tuple causes the timeOut, count, or deltaAttribute value that is stored for the saved tuple to be reset to the current value. For timeOut, the saved time is reset to the current time. For count, the saved tuple number is reset to the current tuple number. For deltaAttribute, the saved value is reset to the current value of the attribute.
If this parameter is not specified, the default value is true.
- Properties
-
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: Constant
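To illustrate the effect of this parameter, consider a key that arrives every 30 seconds with a timeOut of 60 seconds. With the default of true, each suppressed duplicate resets the saved time, so the key would be expected to stay suppressed for as long as it keeps arriving. With false, as in the hypothetical sketch below, the saved time is not refreshed, so a tuple with that key would be expected to pass again once more than 60 seconds have elapsed since the tuple that was remembered. The composite and stream names are assumptions.

```spl
composite NoResetDedup {
  graph
    stream<rstring name, uint32 age> Data = Beacon() {
      param period : 30.0;
      output Data : name = "test", age = 42u;
    }

    stream<Data> Periodic = DeDuplicate(Data) {
      param
        timeOut : 60.0;
        key : name;
        resetOnDuplicate : false; // suppressed duplicates do not extend the timeOut interval
    }
}
```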
- timeOut
-
Specifies the number of seconds during which no duplicate of a tuple is emitted. If this parameter is not specified, the default value is 600.0 seconds (10 minutes). Identical tuples that are separated by more than timeOut seconds are emitted on the output port.
This parameter cannot be used with the count or deltaAttribute parameters.
Note: Tuples are retained by the DeDuplicate operator until timeOut seconds elapse, count tuples are processed, or until delta is large enough. If the rate of incoming unique tuples is large, large values of these parameters might cause the operator to occupy a huge amount of memory.
- Properties
-
- Type: float64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
Code Templates
- DeDuplicate
-
stream<${schema}> ${outputStream} = DeDuplicate(${inputStream}) { param ${parameter}:${parameterExpression}; output ${outputStream} : ${outputExpression}; }
- DeDuplicate with Suppressed Tuples
-
(stream<${schema}> ${outputStream}; stream<${inputStream}> ${outputStream2}) = DeDuplicate(${inputStream}) { param ${parameter}:${parameterExpression}; output ${outputStream} : ${outputExpression}; }