Implementing operators
An operator is implemented as a C++ class that extends the SPL runtime
class SPL::Operator. The class can provide the following:
- Constructor and destructor
- Port readiness notification: allPortsReady
- Tuple processing: process(Tuple)
- Punctuation processing: process(Punctuation)
- Termination notification: prepareToShutdown

An operator instance starts its runtime lifecycle with a call to
its constructor. The operator cannot receive or send tuples and punctuation
until its ports are ready. When the operator ports are ready, the
operator starts receiving calls to its tuple and punctuation processing
functions and can submit tuples accordingly. The allPortsReady function
is called as a notification to the operator that its ports are ready
to receive and submit tuples. The operator's tuple and punctuation
processing functions can be invoked before the allPortsReady notification
is received. If a tuple submission is to be made outside the context
of a tuple or punctuation processing function, then the allPortsReady
notification must be received first. When the processing element (PE)
that hosts the operator is being shut down, the operator receives a call
to its prepareToShutdown function, which can take place while tuple and
punctuation processing functions are active. Eventually, the destructor
is called to free any resources that are allocated by the operator.
Guidelines for Submitting and Processing Tuples and Punctuation
- All operators are constructed before any tuple or punctuation can be received.
- You cannot submit tuples or punctuation from a constructor; doing so causes a C++ exception.
- You can start submitting tuples and punctuation when the allPortsReady function is called.
- You might receive tuples and punctuation before the allPortsReady function is called.
- After a final punctuation is received on an input port, no more tuples or punctuation are received from that input port.
- After a final punctuation is submitted on an output port, all subsequent tuples and punctuation that are submitted to that port are silently discarded.
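The last guideline can be pictured with a toy model. The sketch below is not SPL runtime code: ToyOutputPort and its members are hypothetical names that stand in for an output port. The boolean return value exists only so that the discard is observable here, whereas the guideline describes a silent discard.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for an output port; not part of the SPL API.
class ToyOutputPort {
public:
    // Returns true if the item was delivered, false if it was discarded.
    bool submit(const std::string& item) {
        if (closed_) {
            return false;               // after final punctuation: drop the item
        }
        delivered_.push_back(item);
        return true;
    }

    // Marks the port as finished; later submissions are discarded.
    void submitFinalPunctuation() { closed_ = true; }

    std::size_t deliveredCount() const { return delivered_.size(); }

private:
    bool closed_ = false;
    std::vector<std::string> delivered_;
};
```

In this model, a submission made after submitFinalPunctuation() leaves deliveredCount() unchanged, which mirrors the behavior that the guideline describes.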
Processing Input Tuples
- void process(Tuple const & tuple, uint32_t port) is the tuple processing function for input ports that are non-mutating. The tuple parameter represents the tuple that is received on the input port at index port. Port indexing is zero-based.
- void process(Tuple & tuple, uint32_t port) is the tuple processing function for input ports that are mutating.
- void process(Punctuation const & punct, uint32_t port) is the punctuation processing function for all input ports. The punct parameter represents the punctuation that is received on the input port at index port.
An input port is either mutating or non-mutating, as specified
by the operator model. An input port that is declared as mutating in
the operator model indicates that the operator developer must implement
the process function that takes a non-const tuple reference for adding
the logic for that port. The implementation is allowed to modify
the tuple. An input port that is declared as non-mutating in the
operator model indicates that the operator developer must implement
the process function that takes a const tuple reference for adding the
logic for that port. The implementation is not allowed to modify the tuple.
It is common for an operator to implement only one of the two tuple
processing functions. However, in the general case, an operator can
have both mutating and non-mutating input ports, which requires
implementing both functions.
A tuple that is passed in as a parameter to a process function can be
used during the lifetime of the process function call. Do not
store a pointer or a reference to the tuple for use in a context other
than the current process call. A tuple received from the process call
can be safely submitted when the submit call is performed in the context
of the process call. If the tuple is to be stored as part of the operator
state and made available across process calls, then make a copy.
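The copy rule can be sketched with stand-in types. ToyTuple and ToyOperator below are hypothetical and only illustrate the ownership pattern. They are not the SPL tuple or operator classes.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for a tuple type; real tuple types come from SPL.
struct ToyTuple {
    std::string symbol;
    double price;
};

// Hypothetical operator fragment that keeps received tuples as state.
class ToyOperator {
public:
    void process(const ToyTuple& tuple) {
        // Copy the tuple into operator state. Storing &tuple instead would
        // leave a dangling pointer once this call returns.
        history_.push_back(tuple);
    }

    const std::vector<ToyTuple>& history() const { return history_; }

private:
    std::vector<ToyTuple> history_;   // owns its own copies
};
```

Because history_ owns copies, later changes to the caller's tuple, or its destruction, do not affect the stored state.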
Stateful Operators and Concurrency
Three classes help protect operator state against concurrent access:
Mutex, AutoMutex, and AutoPortMutex. Mutex is a class that wraps the
functionality of a pthread mutex. AutoMutex is a class that creates a
critical section from a Mutex object. It locks the mutex when
constructed and unlocks it when it goes out of scope (that is, when the object is destructed).
AutoPortMutex is similar to AutoMutex, but it reduces to an
untaken branch when the SPL run time knows that there cannot be concurrent calls to the process
functions of the operator. Whether the input process functions of an operator are called
concurrently depends on the operator fusion configuration that is employed, the properties of
the upstream operators, the transport options, and the existence of threaded ports.
As a result, at operator development time, it is not possible to tell
whether the operator's input process functions are called concurrently or not. It is important that
the developer always protects the state against concurrent accesses. The SPL run time makes sure
that the locking and unlocking cost is not paid when no concurrency is involved. For this
optimization to take place seamlessly, use the AutoPortMutex class. An example use
is as follows:
class MY_OPERATOR ... {
    ...
private:
    Mutex mutex_;
};
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
    ...
    { // mutex_.lock() is called upon entry if needed
        AutoPortMutex am(mutex_, *this);
        ... // access state
    } // mutex_.unlock() is called upon exit if needed
    ...
}
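The idea of a guard that locks only when concurrency is possible can be sketched in standard C++. This is an illustration under assumptions, not the AutoPortMutex implementation: ConditionalLock and ToyCounter are hypothetical names, std::mutex stands in for Mutex, and the concurrent flag stands in for what the SPL run time knows about the operator.

```cpp
#include <mutex>

// Hypothetical guard that mimics the AutoPortMutex idea with std::mutex.
class ConditionalLock {
public:
    ConditionalLock(std::mutex& m, bool concurrent)
        : mutex_(m), locked_(concurrent) {
        if (locked_) mutex_.lock();     // skipped when no concurrency is possible
    }
    ~ConditionalLock() {
        if (locked_) mutex_.unlock();   // released when the guard leaves scope
    }
    ConditionalLock(const ConditionalLock&) = delete;
    ConditionalLock& operator=(const ConditionalLock&) = delete;

private:
    std::mutex& mutex_;
    bool locked_;
};

// Hypothetical stateful component whose state is protected by the guard.
class ToyCounter {
public:
    explicit ToyCounter(bool concurrent) : concurrent_(concurrent) {}

    void increment() {
        ConditionalLock guard(mutex_, concurrent_);
        ++count_;                       // state access inside the critical section
    }

    long count() const { return count_; }

private:
    std::mutex mutex_;
    bool concurrent_;
    long count_ = 0;
};
```

When the flag is false, the guard costs one branch in the constructor and one in the destructor, which is the kind of saving that the text attributes to AutoPortMutex.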
Source Operators and Operator Threads
So far, this section has described operator logic that is triggered by
calls to the process functions associated with the input ports. However,
source operators do not have input ports. Furthermore, some operators
might prefer to perform work that is not triggered by incoming tuples.
For these purposes, SPL's operator API provides an additional process
function that is not associated with an input port. It is referred to
as the non-input process function.
- void process(uint32_t idx) is a process function that runs in a thread of its own. The idx parameter is the thread index, local to the operator.
As implied by the index argument of the non-input process function,
there can be more than one thread that runs it. These threads are
created by calls to the uint32_t createThreads(uint32_t num) function.
This function creates num threads, which are referred to as operator
threads. An operator thread calls the non-input process function once,
passing it its local thread index.
With this functionality, a source operator might be implemented as follows:
void MY_OPERATOR::allPortsReady() {
    createThreads(1);
}
void MY_OPERATOR::process(uint32_t) {
    while(!getPE().getShutdownRequested()) {
        ...
    }
}
In the example, the createThreads function is called from within the
allPortsReady function. Doing so is not strictly necessary, but it is
good practice. If the non-input process function contains code that
submits tuples (which is very common), delaying the creation of the
thread ensures that no tuples are submitted before the connections to
the operators are set up. It is not valid to submit tuples before
allPortsReady is called. Another important point to note is the use of
getPE().getShutdownRequested() to check for the shutdown status of the
processing element that manages this operator. If the non-input process
function does not return upon shutdown of the PE, the operator and the
PE are terminated forcefully when necessary, which is undesirable
because the shutdown processing specified by the operators is not run.
See Blocking and shutdown handling for further details.
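The shutdown-aware loop can be sketched without the SPL runtime. ToyPE and runSource below are hypothetical stand-ins: ToyPE models only the shutdown flag, and the emit callback stands in for building and submitting a tuple. The limit parameter is an added assumption that models a source running out of data.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical stand-in for the PE's shutdown state; not the SPL API.
struct ToyPE {
    std::atomic<bool> shutdownRequested{false};
    bool getShutdownRequested() const { return shutdownRequested.load(); }
};

// Hypothetical source loop: produces values until shutdown is requested
// or the limit is reached, then returns so that the thread can finish.
template <typename Emit>
std::uint64_t runSource(ToyPE& pe, std::uint64_t limit, Emit emit) {
    std::uint64_t produced = 0;
    while (!pe.getShutdownRequested() && produced < limit) {
        emit(produced);     // stands in for building and submitting a tuple
        ++produced;
    }
    return produced;        // number of values emitted before the loop ended
}
```

The loop rechecks the flag on every iteration, so it returns promptly once shutdown is requested instead of relying on forced termination.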
Additional operator threads can be used to create multi-threaded operators. If the operator logic involves submitting tuples from more than one thread, care must be taken in specifying the threading behavior of the operator in the operator model.
The SPL language run time considers the execution of a source operator complete when all threads of the operator complete their execution. Since source operators do not have tuple or punctuation processing functions, the completion of the operator threads marks the end of the execution of the operator. The language run time ensures that final punctuation is sent on all output ports upon completion of the execution of the source operator. For more information about final punctuation, see Punctuation processing. The run time keeps track only of threads that are created with the SPL runtime APIs. If the operator relies on other threads, such as those created by third-party libraries, then an SPL operator thread must be created to wait on the external threads. Otherwise, the run time prematurely sends final punctuation on the output ports.
Similarly, non-source operators are considered complete when all operator threads are terminated and final punctuation is received on all input ports and is processed in full. In the general case, the SPL language run time automatically forwards final punctuation from the input ports to the output ports. However, if the operator model turns off auto-forwarding of final punctuation, then the operator developer is responsible for submitting final punctuation. The run time does not submit final punctuation on the output ports of the completed non-source operator.
Submitting Tuples and Punctuation
- void submit(Tuple const & tuple, uint32_t port): The tuple parameter represents the tuple to be submitted on the output port at index port, where the indexing is zero-based.
- void submit(Tuple & tuple, uint32_t port) is similar to the first function, but the tuple passed in can be modified as a result of the submit call.
For output ports that are non-mutating, both submit functions are valid and are equivalent in functionality. For mutating ports, only the non-const version is valid. For both versions, passing a tuple whose concrete type is not the same as the tuple type of the port that is specified results in a runtime error.
An output port that is declared as mutating in the operator model
indicates that the operator developer must use the submit function
that takes a non-const tuple reference for submitting tuples on that
port. Expect that the submit call modifies the tuple as part of its
processing. An output port that is declared as non-mutating in the
operator model indicates that the operator developer is guaranteed
that the submit call does not modify the tuple as part of its processing.
Thus the developer can use either of the submit functions. The submit
function that is used for a submit call is defined by the function
resolution rules of the C++ language. Here are the different cases:
Port Mutability | Tuple | Call site | C++ resolution | Validity |
---|---|---|---|---|
Mutating | Non-const, for example: Tuple & t =... | submit(t, 0) | Non-const submit | valid |
Mutating | Const, for example: Tuple const & t =... | submit(t, 0) | Const submit | invalid |
Non-mutating | Non-const, for example: Tuple & t =... | submit(t, 0) | Non-const submit | valid |
Non-mutating | Const, for example: Tuple const & t =... | submit(t, 0) | Const submit | valid |
As seen from Table 1, you need to pay close attention when you submit a const tuple to a mutating port. It is invalid and the SPL language run time throws an exception, resulting in a runtime error. The SPL compiler forces a C++ compilation error when the operator has no non-mutating output ports, by shadowing the non-const submit function in the generated operator code. This action results in an unresolved submit call, rather than a runtime error later on.
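The "C++ resolution" column follows from ordinary overload resolution, which can be checked with a small stand-alone sketch. ToyTuple and ToySubmitter are hypothetical stand-ins, not SPL classes. Each overload only records which one was selected, and the sketch does not model the port mutability check that the run time performs.

```cpp
#include <cstdint>
#include <string>

// Hypothetical stand-in for a tuple type.
struct ToyTuple { int value; };

// Hypothetical stand-in for an operator with the two submit overloads.
// Each overload records which one C++ overload resolution selected.
class ToySubmitter {
public:
    void submit(const ToyTuple&, std::uint32_t) { last_ = "const"; }
    void submit(ToyTuple&, std::uint32_t)       { last_ = "non-const"; }
    const std::string& last() const { return last_; }

private:
    std::string last_;
};

// Submits a non-const tuple and reports the selected overload.
std::string resolveForNonConst() {
    ToySubmitter op;
    ToyTuple t{1};
    op.submit(t, 0);            // t is a non-const lvalue
    return op.last();
}

// Submits through a const reference and reports the selected overload.
std::string resolveForConst() {
    ToySubmitter op;
    ToyTuple t{1};
    const ToyTuple& ct = t;
    op.submit(ct, 0);           // ct is const, so only the const overload fits
    return op.last();
}
```

A non-const lvalue binds to the non-const reference overload in preference to the const one, so the call site submit(t, 0) looks identical in all four table rows while the selected function depends only on the constness of t.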
For punctuation, there is a single submit function.
- void submit(Punctuation const & punct, uint32_t port): The punct parameter represents the punctuation to be submitted on the output port at index port.
Port Mutability
The tuple mutation settings for input and output ports are defined in
the operator model. To summarize, for input ports, setting the
tupleMutationAllowed property to true means declaring your intent to
modify the incoming tuples. For output ports, setting the
tupleMutationAllowed property to true means granting permission for
others to modify the submitted tuples. Port mutability does not restrict
the ways in which operators can be connected to each other, as the SPL
language run time handles any incompatibilities by creating copies
as needed. The port mutability settings do have an impact on performance
in the presence of operator fusion. For instance, a mutating output
port can pass a tuple to a mutating input port under fusion, without
requiring any copies. Alternatively, in a fan-out configuration, a
tuple that is submitted on an output port can be passed in to multiple
non-mutating input ports, without creating any copies. The following
figure provides a general guideline for setting port mutability.


Accessing Parameters
For non-generic operators (operators using the pragma that was
introduced earlier), parameters can be accessed in a type-safe manner
using auto-generated convenience functions, created when the
SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE pragma is expanded during
operator generation. These member functions are named
getParameter_<param-name> and return a value that has the same type as
the parameter's first expression value that is specified in the
application's SPL code. In other words:
    <param-type> getParameter_<param-name>() const;
This helper function is generated only if the parameter's expressions are all attribute-free expressions (that is, they do not involve stream tuples and attributes whose values are only known at run time). For information about accessing parameter values in a more generic way from within generic operators, see Implementing generic operators. As an example of parameter access for non-generic operators, consider the following SPL segment:
stream<...> MyStream = MyOperator(...) {
    param size : 10;
}
The value for the parameter size can be retrieved as follows:
int32 size = getParameter_size();
Parameters can also be accessed reflectively, through the following function and class:
// Return the set of all parameter names
const std::tr1::unordered_set<std::string>& getParameterNames() const;
class ParameterValue {
public:
    virtual bool isValue() = 0;
    virtual bool isExpression() = 0;
    virtual ConstValueHandle& getValue() = 0;
};
For each expression for a parameter, there is a corresponding
ParameterValue associated with it. If the expressionMode for the
parameter in the operator model is Expression or Attribute, then
isExpression() returns true, and isValue() returns false. For all other
expressionMode values, isValue() returns true and isExpression()
returns false. If isValue() is true, then getValue() can be called to
return the value of the expression.
If allowAny is true for the parameters in the operator model, and it is
a parameter that is not explicitly listed, then the compiler examines
each expression. If a stream attribute is present, the ParameterValue
for that expression has isExpression() == true. If no stream attribute
is referenced, isValue() returns true.
typedef std::vector<ParameterValue*> ParameterValueListType;
typedef std::tr1::unordered_map<std::string, ParameterValueListType> ParameterMapType;
// Return the vector of parameter values for a given parameter name
const ParameterValueListType& getParameterValues(std::string const & param) const;
// Return the map of parameter names to parameter values
virtual ParameterMapType& getParameters();
The getParameterNames function can be used to check whether a parameter
is present. This check enables working with optional parameters, which
are not possible with the non-reflective parameter access APIs. The
getParameterValues function is used to access the values of a
parameter. The parameter name is passed as a string argument. Multiple
parameter expression values are available, and the type and value of
each ParameterValue with isValue() == true can be extracted with
ConstValueHandle member functions. If param is not a valid parameter
name for this operator instance, SPLRuntimeInvalidArgumentException is
thrown.
int32 size = 0; // the default
if (getParameterNames().count("size")) {
    // getParameterValues returns a const reference, so bind it to a const reference
    const Operator::ParameterValueListType& sizeValues = getParameterValues("size");
    assert (sizeValues[0]->isValue());
    size = sizeValues[0]->getValue(); // read the first expression value
}
The convenience functions hasParameter and getParameter are deprecated.
Replace them with the newer functions described above for accessing
parameters. For detailed information about ConstValueHandle and
the reflective type system, see Using the reflective type system.