Primitive operators
Try creating a primitive operator in C++ so that you can implement operator logic in a traditional programming language.
In this tutorial, you will:
- Use the spl-make-operator tool to generate skeleton C++ primitive operator files
- Modify the C++ header file to declare class variables
- Modify the C++ implementation file to distribute tuples across the output ports in round-robin fashion as input tuples are received
- Write a stream application that:
  - Generates tuples
  - Splits the tuples across two output ports in round-robin fashion
  - Labels the tuples on the two different streams with a path ID
  - Joins the two streams into one
  - Writes the received tuples to the console
- Compile the application using the Streams Compiler (sc)
- Run the application
Before you begin
- Download and install Teracloud® Streams
- Open a Bash command terminal
- Open a text editor
Procedure
- Set up your environment for Streams.
In your command terminal, source the streamsprofile.sh file in the Streams installation directory. For example:
    source streams-install-directory/bin/streamsprofile.sh
- Create a directory called primitiveop that contains the nested directories my.util/RoundRobinSplit.
For example, use the following command:
    mkdir -p primitiveop/my.util/RoundRobinSplit
- Enter the RoundRobinSplit directory:
    cd primitiveop/my.util/RoundRobinSplit
- Create the C++ primitive operator skeleton files:
    spl-make-operator -s --kind c++
The command creates several files, including RoundRobinSplit_h.cgt and RoundRobinSplit_cpp.cgt, which are the C++ header and implementation code generation templates (cgt), and RoundRobinSplit.xml, which defines the operator model (name, parameters, and so on).
- Using your text editor, edit the RoundRobinSplit_h.cgt file in the RoundRobinSplit directory.
- Paste the following code into the file, fully replacing the existing content, and save it:

    #pragma SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE

    class MY_OPERATOR : public MY_BASE_OPERATOR
    {
    public:
      MY_OPERATOR();
      void process(Tuple & tuple, uint32_t port);

    private:
      Mutex _mutex;      // guards _count against concurrent calls to process()
      uint32_t _count;   // position within the current round-robin cycle
    };

    #pragma SPL_NON_GENERIC_OPERATOR_HEADER_EPILOGUE

This C++ code does the following:
  - Uses compiler pragmas to ensure that any errors reported while compiling the generated code reference the cgt file rather than the generated code.
  - Defines a class MY_OPERATOR that extends the MY_BASE_OPERATOR class.
  - Declares a constructor and a process method.
  - Keeps two instance variables: a mutex for locking and a count.
- Using your text editor, edit the RoundRobinSplit_cpp.cgt file in the RoundRobinSplit directory.
- Paste the following code into the file, fully replacing the existing content, and save it:

    #pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_PROLOGUE

    MY_OPERATOR::MY_OPERATOR() : _count(0) {}

    void MY_OPERATOR::process(Tuple & tuple, uint32_t port)
    {
      // Number of output ports and value of the "batch" operator parameter
      uint32_t const nOutputs = getNumberOfOutputPorts();
      uint32_t const batchSize = getParameter("batch");
      // Guard _count against concurrent calls to process() while apm is in scope
      AutoPortMutex apm(_mutex, *this);
      // Send batchSize consecutive tuples to one port before moving to the next
      uint32_t outputPort = (_count / batchSize) % nOutputs;
      _count = (_count + 1) % (batchSize * nOutputs);
      assert(outputPort < nOutputs);
      submit(tuple, outputPort);
    }

    #pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_EPILOGUE

This C++ code does the following:
  - Uses compiler pragmas to ensure that any errors reported while compiling the generated code reference the cgt file rather than the generated code.
  - Defines the MY_OPERATOR constructor, initializing the _count variable to 0.
  - Defines the process method, which:
    - Queries the runtime APIs for the number of output ports.
    - Obtains the value of the operator parameter batch.
    - Constructs an AutoPortMutex object using the mutex and the current operator instance to guard against concurrent manipulation of the _count instance variable.
    - Determines the output port for the current tuple.
    - Increments the count, wrapping it back to 0 once it reaches batch times the number of output ports.
    - Asserts that the calculated output port is less than the number of output ports present, that is, that it is a valid port index.
    - Submits the tuple to the calculated output port.
- Using your text editor, create a file named Main.spl in the primitiveop directory.
- Paste the following code into your file and save it:

    use my.util::RoundRobinSplit;

    composite Main {
      graph
        stream<int32 count> Input = Beacon() {
          param iterations : 10u;
          output Input : count = (int32) IterationCount();
        }

        (stream<int32 count> A0; stream<int32 count> A1) = RoundRobinSplit(Input) {
          param batch : 2u;
        }

        stream<int32 count, int32 path> B0 = Functor(A0) {
          output B0 : path = 0;
        }

        stream<int32 count, int32 path> B1 = Functor(A1) {
          output B1 : path = 1;
        }

        stream<int32 count, int32 path> Output = Pair(B0; B1) {}

        () as Writer = FileSink(Output) {
          param file  : "/dev/stdout";
                flush : 1u;
        }
    }

This SPL code does the following:
  - Specifies that our my.util::RoundRobinSplit operator will be used.
  - Declares a main composite operator called Main.
  - Starts a data flow graph with the graph clause.
  - Invokes a Beacon operator to produce a stream called Input. The invocation is configured to:
    - Produce 10 tuples
    - Set the count attribute of each tuple to the current iteration count
  - Invokes our RoundRobinSplit primitive operator, which reads from stream Input and outputs two streams: A0 and A1. The invocation is configured to send two tuples to one stream before moving on to the next stream.
  - Invokes two Functor operators that read from A0 and A1 respectively, add a path ID attribute to the tuples, and output to streams B0 and B1 respectively.
  - Invokes a Pair operator that reads the B0 and B1 streams on separate input ports. The invocation has no configuration; once a tuple has arrived on each input port, the Pair operator sends one tuple from each input port to the stream Output.
  - Invokes a FileSink operator named Writer, which is configured to write to standard output and to flush after every tuple it receives.
A data flow graph representing the application looks like this:
Figure 1. Stream graph of the application 
- Compile the code.
In the primitiveop directory, run the following command:
    sc -M Main
The SPL compiler generates C++ code from the cgt templates for the invocation of the RoundRobinSplit operator and invokes the C++ compiler to compile it. The generated source files, A0.cpp and A0.h, are in the output/src/operator directory.
- Run the application:
    ./output/bin/standalone
The program prints the following:
    0,0
    2,1
    1,0
    3,1
    4,0
    6,1
    5,0
    7,1
Each line shows the count and path attribute values separated by a comma. Although RoundRobinSplit sends two tuples at a time to each stream, the Pair invocation lets only one tuple through from each stream at a time, which is why the path column alternates between 0 and 1. Tuples 8 and 9 both go to A0 and never get a partner tuple from A1, so Pair does not emit them. This output is deterministically repeatable, independent of the processing speed of the two paths.
What to do next
There are many more facets of SPL to explore including:
- Other operators and functions in the SPL standard toolkit
- Writing functions in a native language
- Configs
- Java and Python primitive operators
- Streams debugger
- Dynamic stream application
See the Exercises page to learn more and practice with SPL.