Primitive operators
Recall that an operator is a reusable stream transformer, and a composite operator encapsulates a stream graph. If all operators were composite, there would be a chicken-and-egg problem; therefore, SPL also has primitive operators, which encapsulate code in a native language. The native language is usually a more traditional language such as Java™ or C++. This section describes a primitive operator RoundRobinSplit that is written in C++, but Teracloud® Streams also lets you write primitive operators in Java™. If you are not a C++ programmer, or if you anticipate that you will mostly use operators from the SPL standard toolkit or other toolkits, you can skip this information.
First, here is an example of invoking RoundRobinSplit from SPL code in the following stream graph:

use my.util::RoundRobinSplit;
composite Main {
    graph
        stream<int32 count> Input = Beacon() {
            logic state : mutable int32 n = 0;
            param iterations : 10u;
            output Input : count = n++;
        }
        (stream<int32 count> A0; stream<int32 count> A1) = RoundRobinSplit(Input) {
            param batch : 2u;
        }
        stream<int32 count, int32 path> B0 = Functor(A0) {
            output B0 : path = 0;
        }
        stream<int32 count, int32 path> B1 = Functor(A1) {
            output B1 : path = 1;
        }
        stream<int32 count, int32 path> Output = Pair(B0; B1) {}
        () as Writer = FileSink(Output) {
            param file : "/dev/stdout";
                  flush : 1u;
        }
}
Line 9, (stream<int32 count> A0; stream<int32 count> A1) = RoundRobinSplit(Input), invokes operator RoundRobinSplit to produce two output streams, A0 and A1. The operator takes a parameter, param batch : 2u, which indicates that it alternates between output streams after every two tuples. Line 18 invokes operator Pair on two input streams, B0 and B1, with the code stream<int32 count, int32 path> Output = Pair(B0; B1). For now, put this code into a file RoundRobinSplit/Main.spl. However, do not try to compile it yet; first you must implement the operator RoundRobinSplit.
To start the implementation, run the command spl-make-operator --kind c++ in the directory that holds the operator, my.util/RoundRobinSplit. That command generates several skeleton files for you, including an operator model RoundRobinSplit.xml and two code generation templates (.cgt files), one for a header file, RoundRobinSplit_h.cgt, and one for a C++ implementation file, RoundRobinSplit_cpp.cgt. When you write more sophisticated primitive operators, you often must edit the XML operator model, but in this case the operator is simple enough that you do not need to change the operator model at all.
Open the header file code generation template RoundRobinSplit_h.cgt. You see a class definition with several method declarations. Remove all methods except the constructor and process(Tuple & tuple, uint32_t port). Add two instance fields, Mutex _mutex and uint32_t _count. You end up with the following code in RoundRobinSplit_h.cgt:
#pragma SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE
class MY_OPERATOR : public MY_BASE_OPERATOR {
public:
    MY_OPERATOR();
    void process(Tuple & tuple, uint32_t port);
private:
    Mutex _mutex;
    uint32_t _count;
};
#pragma SPL_NON_GENERIC_OPERATOR_HEADER_EPILOGUE
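Next, open the implementation file code generation template RoundRobinSplit_cpp.cgt. As in the header, keep only the constructor and process(Tuple & tuple, uint32_t port). Implement these methods as shown in the following listing of RoundRobinSplit_cpp.cgt: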
#pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_PROLOGUE
MY_OPERATOR::MY_OPERATOR() : _count(0) {}
void MY_OPERATOR::process(Tuple & tuple, uint32_t port) {
    uint32_t const nOutputs = getNumberOfOutputPorts();
    uint32_t const batchSize = getParameter("batch");
    AutoPortMutex apm(_mutex, *this);
    uint32 outputPort = (_count / batchSize) % nOutputs;
    _count = (_count + 1) % (batchSize * nOutputs);
    assert(outputPort < nOutputs);
    submit(tuple, outputPort);
}
#pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_EPILOGUE
The constructor just initializes the _count instance variable to zero. The process method queries the runtime APIs for the number of output ports (Line 4) and the batch size parameter (Line 5); acquires the mutex to guard against concurrent manipulation of the _count instance variable (Line 6); determines the output port (Line 7); updates _count (Line 8); and submits the input tuple to the appropriate output port (Line 10). The mutex is necessary because without it, if there are two threads T1 and T2, then T1's invocation of process might be interrupted in the middle of Line 8, after it reads the old value of _count but before it writes the new value; then T2 might call process and update _count; and finally, T1 might resume and overwrite T2's update to _count.
With the operator implemented, compile the application by running sc -M Main. The SPL compiler invokes the C++ compiler to compile the instance of the RoundRobinSplit operator: the sources are the files A0.cpp and A0.h in directory RoundRobinSplit/output/src/operator, and the object file is RoundRobinSplit/output/build/operator/A0.o. Run the application by changing to directory RoundRobinSplit and running ./output/bin/standalone. You get the following output:
0,0
2,1
1,0
3,1
4,0
6,1
5,0
7,1
Each line shows the count and path attributes, separated by a comma. Because the split uses a batch size of two but the join uses a batch size of one, the counts (left column) have a progression of 0,2,1,3,4,6,5,7, whereas the paths (right column) just alternate between 0,1,0,1,0,1,0,1. This output is deterministically repeatable, independent of the processing speed of the two paths.
If you make a mistake in the operator code, for example by swapping the arguments and writing submit(outputPort, tuple), the C++ compiler reports an error message with the correct file name and line number:
my.util/RoundRobinSplit/RoundRobinSplit_cpp.cgt:10: error: no matching function for call to 'SPL::_Operator::A0::submit(SPL::uint32&, SPL::Tuple&)'
note: candidates are: virtual void SPL::Operator::submit(SPL::Tuple&, uint32_t)
note:                 virtual void SPL::Operator::submit(const SPL::Tuple&, uint32_t)
note:                 void SPL::Operator::submit(const SPL::Punctuation&, uint32_t)
This section barely scratched the surface of developing primitive operators in SPL. There is a rich API for generating specialized code for performance, and for compile-time error checking on things like the number and types of ports.