Operator Custom
The Custom operator is a special logic-related operator that can receive and send any number of streams and does not do anything by itself. Thus, it offers a blank slate for customization.
The Custom operator can submit tuples from within its onTuple, onPunct, and onProcess clauses. It requires special support from the compiler. The submit intrinsic function is available only in the Custom operator, and has the following signatures:
<tuple T> void submit (T tupleValue, T port) //1
<tuple T> void submit (enum{WindowMarker, FinalMarker} punctuation, T port ) //2
<tuple T> void submit (T tupleValue, uint32 port) //3
<tuple T> void submit (enum{WindowMarker, FinalMarker} punctuation, uint32 port) //4
It is preferable to use the first two versions of the submit function because the symbolic name of the output stream is used. This symbolic name allows the output stream order to be changed without needing to update calls to submit and the tuple type to be checked at compile time.
streams<int32 a, int32 b> A = Custom() {
logic onProcess : {
mutable int32 i = 0;
for (int32 j in range (1000)) {
int32 a = i++;
int32 b = i++;
submit ({a = a, b = b}, A);
}
}
}
If the Custom operator has one or more input streams, then the onTuple and onPunct clauses are allowed.
Checkpointed data
When the Custom operator is checkpointed, logic state variables (if present) are saved in checkpoint.
Behavior in a consistent region
The Custom operator can be an operator within the reachability graph of a consistent region. It cannot be the start of a consistent region. Custom operators with an 'onProcess' clause are not supported. The Streams instance automatically checkpoints and resets logic state variables.
Checkpointing behavior in an autonomous region
When the Custom operator is in an autonomous region and configured with config checkpoint : periodic(T) clause, a background thread in SPL Runtime checkpoints the operator every T seconds, and such periodic checkpointing activity is asynchronous to tuple processing. Upon restart, the operator restores its state from the last checkpoint.
When the Custom operator is in an autonomous region and configured with config checkpoint : operatorDriven clause, no checkpoint is taken at runtime. Upon restart, the operator restores to its initial state.
Such checkpointing behavior is subject to change in the future.
Examples
This example uses the Custom operator.
composite Main {
graph
stream<rstring color, int32 intensity> Sensors = Beacon() {}
stream<rstring color> Queries = Beacon() {}
stream<rstring key, int32 val> Output = Custom(Sensors; Queries) {
logic state : mutable map<rstring, int32> m;
onTuple Sensors: m[color] = intensity;
onTuple Queries: if (color in m)
submit({key=color, val=m[color]}, Output);
onPunct Queries: if (currentPunct() == Sys.WindowMarker)
submit(Sys.WindowMarker, Output);
}
}
Summary
- Ports
- This operator has 0 or more input ports and 0 or more output ports.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator does not support parameters.
- Metrics
- This operator does not report any metrics.
Properties
- Implementation
- C++
- Threading
- Always - Operator always provides a single threaded execution context.
- Ports (0...)
-
The Custom operator can have zero or more input ports, which ingest input tuples.
- Properties
-
- ControlPort: false
- TupleMutationAllowed: true
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
- Assignments
- This operator does not allow assignments to output attributes.
- Ports (0...)
-
The Custom operator can have zero or more output ports, which produce submitted tuples. If the tuple is referenced after a submit call, the compiler ensures that the tuple is unchanged by submitting a copy of the tuple.
- Properties
-
- TupleMutationAllowed: true
- WindowPunctuationOutputMode: Generating
- Custom Sink
-
() as ${opInstanceName}Sink = Custom(${inputStream}) { logic onTuple ${inputStream}: { // Add code here ${cursor} } }
- Custom with Input and Output
-
stream<${schema}> ${outputStream} = Custom(${inputStream}) { logic onTuple ${inputStream}: { // Add code here ${cursor} } }
- Custom as a Source
-
stream<${schema}> ${outputStream} = Custom() { logic onProcess : { // Add code here ${cursor} } }