Primitive operators
Create a primitive operator in C++ to implement operator logic in a traditional programming language.
In this tutorial, you will:
- Use the spl-make-operator tool to generate skeleton C++ primitive operator files
- Modify the C++ header file to declare instance variables
- Modify the C++ implementation file to round-robin tuples across output ports when an input tuple is received
- Write a stream application that:
  - Generates tuples
  - Splits tuples across two output ports, round-robining them
  - Labels tuples on the two different streams with a path ID
  - Joins the two streams into one
  - Writes the received tuples to the console
- Compile the application using the Streams Compiler (sc)
- Run the application
Before you begin
- Download and install Teracloud® Streams
- Open a Bash command terminal
- Open a text editor
Procedure
- Set up your environment for Streams.
In your command terminal, source the streamsprofile.sh file under the Streams installation directory. For example:
source streams-install-directory/bin/streamsprofile.sh
- Create a directory called primitiveop that contains the nested directories my.util/RoundRobinSplit.
For example, use the following command:
mkdir -p primitiveop/my.util/RoundRobinSplit
- Enter the RoundRobinSplit directory.
cd primitiveop/my.util/RoundRobinSplit
- Create the C++ primitive operator skeleton files.
spl-make-operator -s --kind c++
The command creates several files, including RoundRobinSplit_h.cgt and RoundRobinSplit_cpp.cgt, which are the C++ header and implementation code generation templates (cgt), and RoundRobinSplit.xml, which defines the operator model (name, parameters, and so on).
- Using your text editor, edit the RoundRobinSplit_h.cgt file in the RoundRobinSplit directory.
- Paste the following code into your file, fully replacing the existing content, and save it:
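#pragma SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE

class MY_OPERATOR : public MY_BASE_OPERATOR
{
public:
  // Constructor: initializes the round-robin counter
  MY_OPERATOR();

  // Called for each tuple that arrives on input port `port`
  void process(Tuple & tuple, uint32_t port);

private:
  Mutex _mutex;     // guards _count against concurrent calls to process()
  uint32_t _count;  // position within the current round-robin cycle
};

#pragma SPL_NON_GENERIC_OPERATOR_HEADER_EPILOGUE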
This C++ code does the following:
- Uses the prologue and epilogue compiler pragmas to ensure that any errors reported during the compilation of the generated code reference the cgt file rather than the generated code.
- Defines a class MY_OPERATOR that extends the MY_BASE_OPERATOR class.
- Declares a constructor and a process method.
- Declares two instance variables: a mutex for locking and a count.
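The mutex is paired with the counter because process may be invoked concurrently, and every call reads and updates _count. The sketch below is a plain C++ analogue, not SPL code: it assumes std::mutex and std::lock_guard as stand-ins for SPL's Mutex and AutoPortMutex, and RoundRobinSketch and runConcurrently are hypothetical names that only tally how many tuples each port would receive.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the generated operator class. std::mutex plays the
// role of SPL's Mutex; "submitting" a tuple just increments a per-port tally.
class RoundRobinSketch {
public:
    RoundRobinSketch(uint32_t nOutputs, uint32_t batchSize)
        : _nOutputs(nOutputs), _batchSize(batchSize), _count(0), _submitted(nOutputs, 0) {
        assert(nOutputs > 0 && batchSize > 0);
    }

    // Same shape as process(): choose a port while holding the lock, then record it.
    void process() {
        std::lock_guard<std::mutex> guard(_mutex);  // released when guard leaves scope
        uint32_t outputPort = (_count / _batchSize) % _nOutputs;
        _count = (_count + 1) % (_batchSize * _nOutputs);
        ++_submitted[outputPort];
    }

    uint64_t submittedTo(uint32_t port) const {
        std::lock_guard<std::mutex> guard(_mutex);
        return _submitted.at(port);
    }

private:
    uint32_t const _nOutputs;
    uint32_t const _batchSize;
    mutable std::mutex _mutex;
    uint32_t _count;
    std::vector<uint64_t> _submitted;
};

// Calls process() from several threads at once. Without the lock, the
// read-modify-write of _count would be a data race.
void runConcurrently(RoundRobinSketch & op, int nThreads, int callsPerThread) {
    std::vector<std::thread> threads;
    for (int t = 0; t < nThreads; ++t) {
        threads.emplace_back([&op, callsPerThread] {
            for (int i = 0; i < callsPerThread; ++i) op.process();
        });
    }
    for (auto & th : threads) th.join();
}
```

For example, after RoundRobinSketch op(2, 2); runConcurrently(op, 4, 1000); each of the two ports has received 2000 tuples regardless of how the threads interleaved, because the lock serializes every update of _count.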
- Using your text editor, edit the RoundRobinSplit_cpp.cgt file in the RoundRobinSplit directory.
- Paste the following code into your file, fully replacing the existing content, and save it:
#pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_PROLOGUE

// Start the round-robin counter at zero
MY_OPERATOR::MY_OPERATOR() : _count(0) {}

void MY_OPERATOR::process(Tuple & tuple, uint32_t port)
{
  uint32_t const nOutputs = getNumberOfOutputPorts();
  uint32_t const batchSize = getParameter("batch");

  // Guard _count; the lock is released when apm goes out of scope
  AutoPortMutex apm(_mutex, *this);

  // Send batchSize consecutive tuples to each port before moving to the next
  uint32_t outputPort = (_count / batchSize) % nOutputs;
  // Wrap the counter after one full cycle over all ports
  _count = (_count + 1) % (batchSize * nOutputs);

  assert(outputPort < nOutputs);
  submit(tuple, outputPort);
}

#pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_EPILOGUE
This C++ code does the following:
- Uses the prologue and epilogue compiler pragmas to ensure that any errors reported during the compilation of the generated code reference the cgt file rather than the generated code.
- Defines the MY_OPERATOR constructor, initializing the _count variable to 0.
- Defines the process method, which:
  - Queries the runtime APIs for the number of output ports.
  - Obtains the value of the operator parameter batch.
  - Constructs an AutoPortMutex object using the mutex and the current operator instance to guard against concurrent manipulation of the _count instance variable.
  - Determines the output port to which the current tuple is sent.
  - Increments the count, wrapping it back to 0 after batchSize * nOutputs tuples.
  - Asserts that the calculated output port is less than the number of output ports.
  - Submits the tuple to the calculated output port.
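To see which port each tuple takes, the port-selection arithmetic from process() can be run on its own. This is a standalone sketch that assumes the same formula and involves no SPL runtime; selectPort and portSequence are hypothetical helper names.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// The formula from process(), extracted into a free function (hypothetical helper).
uint32_t selectPort(uint32_t count, uint32_t batchSize, uint32_t nOutputs) {
    assert(batchSize > 0 && nOutputs > 0);
    return (count / batchSize) % nOutputs;
}

// Returns the port chosen for each of nTuples consecutive tuples, advancing
// and wrapping the counter the same way process() updates _count.
std::vector<uint32_t> portSequence(uint32_t nTuples, uint32_t batchSize, uint32_t nOutputs) {
    std::vector<uint32_t> ports;
    uint32_t count = 0;
    for (uint32_t i = 0; i < nTuples; ++i) {
        uint32_t port = selectPort(count, batchSize, nOutputs);
        assert(port < nOutputs);
        ports.push_back(port);
        count = (count + 1) % (batchSize * nOutputs);
    }
    return ports;
}
```

With the tutorial's settings (batch 2, two output ports), portSequence(10, 2, 2) returns {0, 0, 1, 1, 0, 0, 1, 1, 0, 0}: tuples 0, 1, 4, 5, 8 and 9 go to A0, and tuples 2, 3, 6 and 7 go to A1. Wrapping the counter does not change the chosen port, because (count / batchSize) % nOutputs depends only on count modulo batchSize * nOutputs; it simply keeps the counter from growing until it overflows.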
- Using your text editor, create a file named Main.spl in the primitiveop directory.
- Paste the following code into your file and save it:
use my.util::RoundRobinSplit;

composite Main {
  graph
    stream<int32 count> Input = Beacon() {
      param
        iterations : 10u;
      output
        Input : count = (int32) IterationCount();
    }

    (stream<int32 count> A0; stream<int32 count> A1) = RoundRobinSplit(Input) {
      param
        batch : 2u;
    }

    stream<int32 count, int32 path> B0 = Functor(A0) {
      output
        B0 : path = 0;
    }

    stream<int32 count, int32 path> B1 = Functor(A1) {
      output
        B1 : path = 1;
    }

    stream<int32 count, int32 path> Output = Pair(B0; B1) {}

    () as Writer = FileSink(Output) {
      param
        file  : "/dev/stdout";
        flush : 1u;
    }
}
This SPL code does the following:
- Specifies that our my.util::RoundRobinSplit operator will be used.
- Declares a main composite operator called Main.
- Starts a data flow graph with the graph clause.
- Invokes a Beacon operator to produce a stream called Input. The invocation is configured to:
  - Produce 10 tuples
  - Set the count attribute of each Input tuple to the current iteration count
- Invokes our RoundRobinSplit primitive operator, which reads from stream Input and outputs two streams: A0 and A1. The invocation is configured to send two tuples to one stream before moving on to the next stream.
- Invokes two Functor operators that read from A0 and A1, respectively, add a path ID attribute to the tuples, and output them to streams B0 and B1.
- Invokes a Pair operator that reads the B0 and B1 streams on separate input ports. The invocation has no configuration, but once a tuple has arrived on each input port, the Pair operator sends one tuple from each input port, in port order, to the stream Output.
- Invokes a FileSink operator named Writer. Writer is configured to write to standard output and to flush the output after every tuple received.
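The following figure shows the data flow graph of the application:
Figure 1. Stream graph of the application: Beacon (Input) → RoundRobinSplit (A0, A1) → two Functor operators (B0, B1) → Pair (Output) → FileSink (Writer)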
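The interplay of RoundRobinSplit, the two Functor invocations, and Pair can be modelled in a few lines of plain C++. This is a simplified single-threaded sketch, not the SPL runtime: it assumes Pair queues tuples per input port and, once every port holds a tuple, emits one tuple per port in port order, as described above. PairSketch and simulatePipeline are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

// One tuple of the Output stream: (count, path).
using Tuple = std::pair<int32_t, int32_t>;

// Hypothetical two-port model of Pair: tuples wait in a per-port queue, and as
// soon as both ports hold a tuple, one tuple from each is emitted (port 0 first).
class PairSketch {
public:
    void arrive(uint32_t port, Tuple const & t) {
        assert(port < 2);
        _queues[port].push_back(t);
        while (!_queues[0].empty() && !_queues[1].empty()) {
            for (auto & q : _queues) {
                _output.push_back(q.front());
                q.pop_front();
            }
        }
    }
    std::vector<Tuple> const & output() const { return _output; }

private:
    std::deque<Tuple> _queues[2];
    std::vector<Tuple> _output;
};

// Feeds Beacon counts 0..9 through the round-robin split (batch 2, two ports),
// labels each tuple with its path ID, and passes it to the Pair model.
std::vector<Tuple> simulatePipeline() {
    PairSketch pairOp;
    for (int32_t count = 0; count < 10; ++count) {
        int32_t path = (count / 2) % 2;  // split port, which is also the Functor's path ID
        pairOp.arrive(static_cast<uint32_t>(path), Tuple{count, path});
    }
    return pairOp.output();
}
```

simulatePipeline() returns (0,0), (2,1), (1,0), (3,1), (4,0), (6,1), (5,0), (7,1). Tuples 8 and 9 stay queued on port 0 because nothing arrives on port 1 to pair with them.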
- Compile the code.
In the primitiveop directory, run the following command:
sc -M Main
The SPL compiler generates C++ code for the RoundRobinSplit operator invocation from the code generation templates and invokes the C++ compiler to compile it. The generated source files, A0.cpp and A0.h, can be found in the output/src/operator directory.
- Run the application:
./output/bin/standalone
The program prints the following:
0,0
2,1
1,0
3,1
4,0
6,1
5,0
7,1
Each line shows the count and path attribute values, separated by a comma. Although RoundRobinSplit sends two tuples at a time to each stream, the Pair invocation lets only one tuple through from each stream at a time, which is why the path column alternates between 0 and 1. Tuples 8 and 9 are routed to A0, but no further tuples arrive on A1 to pair with them, so they are never written. This output is deterministically repeatable, independent of the processing speed of the two paths.
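The determinism follows from how pairing works: each input port keeps its tuples in arrival order, and the n-th output pair always combines the n-th tuple from B0 with the n-th tuple from B1. The sketch below is a plain C++ model, not the SPL runtime, with hypothetical helper names; it replays the tutorial's tuples in two extreme arrival orders, with every path 0 tuple arriving first or every path 1 tuple arriving first.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

using Tuple = std::pair<int32_t, int32_t>;   // (count, path)
using Arrival = std::pair<uint32_t, Tuple>;  // (input port, tuple)

// Replays one arrival order through a two-port pairing model and returns the
// tuples that would be written to Output, in order. Hypothetical helper.
std::vector<Tuple> pairInArrivalOrder(std::vector<Arrival> const & arrivals) {
    std::deque<Tuple> queues[2];  // per-port FIFO queues
    std::vector<Tuple> output;
    for (Arrival const & a : arrivals) {
        assert(a.first < 2);
        queues[a.first].push_back(a.second);
        // Once both ports hold a tuple, emit one from each in port order
        while (!queues[0].empty() && !queues[1].empty()) {
            for (auto & q : queues) {
                output.push_back(q.front());
                q.pop_front();
            }
        }
    }
    return output;
}

// Builds the tutorial's ten labelled tuples (path = (count / 2) % 2) so that all
// tuples for firstPort arrive before any tuple for the other port. Hypothetical helper.
std::vector<Arrival> arrivalsWithPortFirst(uint32_t firstPort) {
    assert(firstPort < 2);
    std::vector<Arrival> arrivals;
    uint32_t const order[2] = {firstPort, 1u - firstPort};
    for (uint32_t port : order) {
        for (int32_t count = 0; count < 10; ++count) {
            int32_t path = (count / 2) % 2;
            if (static_cast<uint32_t>(path) == port) {
                arrivals.push_back(Arrival{port, Tuple{count, path}});
            }
        }
    }
    return arrivals;
}
```

Both pairInArrivalOrder(arrivalsWithPortFirst(0)) and pairInArrivalOrder(arrivalsWithPortFirst(1)) yield the same eight (count, path) pairs as the program output above, which is the property that makes the real application's output independent of path speed.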
What to do next
There are many more facets of SPL to explore, including:
- Other operators and functions in the SPL standard toolkit
- Writing functions in a native language
- Configs
- Java and Python primitive operators
- Streams debugger
- Dynamic stream application
See the Exercises page to learn more and practice with SPL.