Logic clause
The logic clause consists of local operator state that persists across operator firings, and statements that execute when an input port receives a tuple or a punctuation and thus fires. For example:
composite Main {
  graph
    stream<float64 sample> IStr = Beacon() { }
    stream<float64 runningAvg> FStr = Functor(IStr) {
      logic
        state : {
          mutable float64 avg = 0.0, alpha = 0.1;
          mutable int64 no = 0;
        }
        onTuple IStr : {
          if (no == 0l) {
            avg = sample; // that is, IStr.sample
          } else {
            float64 beta = 1.0 - alpha;
            avg = alpha * sample + beta * avg;
          }
          no++;
        }
        onPunct IStr : no = 0l;
      param filter : true;
      output FStr : runningAvg = avg;
    }
}
The state: label introduces variable definitions. These logic-state variables are initialized during program start and persist until the program terminates, unless there is a partial fault and the current execution container dies. The example has only one input port, IStr, and the onTuple IStr: label introduces code to be run each time that a tuple arrives on that stream. It opens up the scope of the input stream, so attributes can be accessed without qualifiers. SPL supports custom logic on each port, and the state variables are shared among them. State variables that have the same name as an attribute shadow the attribute. When a state variable shadows an attribute, the attribute can still be accessed by fully qualifying it with the stream name. The order of the different subclauses for logic does not matter, since at most one of them runs each time the operator fires. The code itself is typically a block, but can be any SPL statement.
Access to operator invocation state is implicitly synchronized. The SPL compiler inserts a lock per operator instance to ensure that no two threads experience race conditions from concurrent access to state variables in the stream logic clause. The lock that serializes access to the state does not cover state that is introduced by the native language implementation of the operator.
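The shadowing rule can be illustrated with a minimal sketch. The composite, stream, and variable names here (ShadowDemo, In, Out, count, total) are hypothetical and chosen only for illustration; the state variable deliberately has the same name as the input attribute.

```spl
composite ShadowDemo {
  graph
    stream<int32 count> In = Beacon() { }
    stream<int32 total> Out = Functor(In) {
      logic
        // The state variable 'count' shadows the attribute In.count.
        state : { mutable int32 count = 0; }
        onTuple In : {
          // Unqualified 'count' is the state variable;
          // 'In.count' is the attribute of the arriving tuple.
          count += In.count;
        }
      // 'count' here is the state variable, that is, the running sum.
      output Out : total = count;
    }
}
```

If the state variable had a different name, the attribute could be written as plain count inside the onTuple code.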
Any onPunct port logic can call the following intrinsic function:
enum{WindowMarker,FinalMarker} currentPunct() //1
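As a sketch of how onPunct logic might use this function, the following hypothetical sink counts window markers and reports when the final marker arrives. The names PunctDemo, In, Sink, and windows are illustrative, and the Sys. qualification of the enumeration values is assumed here rather than stated in this section.

```spl
composite PunctDemo {
  graph
    stream<int32 v> In = Beacon() { }
    () as Sink = Custom(In) {
      logic
        state : { mutable int64 windows = 0l; }
        onPunct In : {
          if (currentPunct() == Sys.WindowMarker) {
            // A window marker arrived on In.
            windows++;
          } else {
            // The only other possible value is the final marker.
            printStringLn("final marker after " + (rstring) windows
                          + " window markers");
          }
        }
    }
}
```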
A return (with no return expression) is allowed in logic onTuple, onProcess, and onPunct clauses. A return causes the remainder of the onTuple, onProcess, or onPunct clause to be skipped. For operators other than Custom, normal processing of the tuple is still done.
While all operator invocations have an optional logic clause, SPL has one special logic-related operator, Custom. The Custom operator can receive and send any number of streams, and does not do anything by itself. Thus, it offers a blank slate for customization. The Custom operator can submit tuples to output streams by using the submit function.
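A minimal sketch of a Custom operator that combines submit with an early return follows. The names EvenFilter, Nums, and Evens are hypothetical, and the Beacon output assignment is only one assumed way to produce varying values. Because Custom does nothing by itself, a tuple that triggers the return is simply dropped.

```spl
composite EvenFilter {
  graph
    stream<int32 v> Nums = Beacon() {
      // Assumption: number the generated tuples 0, 1, 2, ...
      output Nums : v = (int32) IterationCount();
    }
    stream<int32 v> Evens = Custom(Nums) {
      logic
        onTuple Nums : {
          if (v % 2 != 0) {
            return;            // odd: skip the rest, nothing is submitted
          }
          submit(Nums, Evens); // even: forward the tuple unchanged
        }
    }
}
```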
The logic onProcess clause is only available in the Custom operator. If present, the logic onProcess clause is run once at the beginning of the program execution.
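One common use of onProcess is a Custom operator with no input ports that acts as a source. The sketch below is illustrative only: the names Generator, Gen, i, and k are hypothetical, and submitting Sys.FinalMarker at the end is an assumed convention for signaling that no more tuples will follow.

```spl
composite Generator {
  graph
    stream<int32 i> Gen = Custom() {
      logic
        onProcess : {
          // Runs once at startup: emit five tuples, i = 0 .. 4.
          for (int32 k in range(5)) {
            submit({ i = k }, Gen);
          }
          // Tell downstream operators that the stream is finished.
          submit(Sys.FinalMarker, Gen);
        }
    }
}
```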
Tuple modification and port mutability
When tuples can be modified depends on the mutability of the operator ports. Ports of primitive operators can be either mutating or non-mutating. For more information about the port mutability for each port of each operator, see the Teracloud Streams Standard Toolkit Reference.
If an input port is declared mutating, the operator is allowed to modify tuples that arrive on that port. If an output port is declared mutating, the operator permits modification of the tuples that it submits on that port. In other words, input port mutability determines what the operator itself can do, whereas output port mutability determines what can happen downstream from the operator. If an input port is non-mutating and the logic clause attempts to modify the incoming tuple, the compiler raises an error.
With the Custom operator, both input ports and output ports are mutable, but the SPL compiler performs a special transformation in the Custom operator. For any tuple that is referenced after it is submitted, the compiler creates a copy for the submit, so the Custom operator sees an unmodified tuple after the submit.
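The effect of the Custom transformation can be sketched as follows. The names SubmitCopyDemo, In, Out, and lastSeen are hypothetical, and the comments restate the behavior described above rather than anything verified against a particular compiler version.

```spl
composite SubmitCopyDemo {
  graph
    stream<int32 v> In = Beacon() { }
    stream<int32 v> Out = Custom(In) {
      logic
        state : { mutable int32 lastSeen = 0; }
        onTuple In : {
          // The tuple is read again after this call, so per the rule
          // above a copy is what gets submitted downstream.
          submit(In, Out);
          // Still the value that arrived on the input port, even if a
          // downstream operator modifies the tuple it received.
          lastSeen = v;
        }
    }
}
```

If the tuple were not referenced after the submit, no copy would be needed under the rule described above.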