Composite operators
Try out non-main composite operators to encapsulate a stream subgraph and improve code reuse and readability.
In this tutorial, you will:
- Write a composite operator within a namespace that:
- Declares a type parameter
- Compares the attribute value(s) of consecutive tuples
- Outputs tuples if the values of the attribute(s) do not match
- Write a stream application that:
- Defines a tuple type to pass to the composite operator invocation
- Generates tuples
- Invokes the composite operator
- Prints tuples from the generated stream and composite operator output stream
- Compile the application using the Streams Compiler (sc)
- Run the application
Before you begin
- Download and install Teracloud® Streams
- Open a Bash command terminal
- Open a text editor
Procedure
- Set up your environment for Streams.
In your command terminal, source the streamsprofile.sh file under the Streams installation directory. For example:
source streams-install-directory/bin/streamsprofile.sh
- Create a directory called compositeop with a my.util directory inside of it.
For example, use the following commands:
mkdir -p compositeop/my.util
cd compositeop
- Using your text editor, create a file named Uniq.spl in the compositeop/my.util directory.
- Paste the following code into your file and save it:
namespace my.util;

public composite Uniq(output Out; input In) {
  param
    type $key;
  graph
    stream<In> Out = Custom(In) {
      logic
        state: {
          mutable boolean first = true;
          mutable $key prev;
        }
        onTuple In : {
          $key curr = ($key)In;
          if (first || prev != curr) {
            submit(In, Out);
            first = false;
            prev = curr;
          }
        }
    }
}
This SPL code does the following:
- Specifies that the following code is in namespace my.util.
- Declares a public composite operator called Uniq with one output port Out and one input port In.
  Note: Marking the operator as public allows the operator to be used from other namespaces.
- Declares a mandatory parameter $key, which specifies an SPL type.
- Starts a data flow graph with the graph clause.
- Invokes a Custom operator that reads from the composite input stream In and writes to the composite output stream Out. Stream Out has the same schema as In. The invocation is configured to:
  - Maintain state of a boolean first, initialized to true, and a value of the specified type inside of prev.
  - Cast the current In tuple to type $key, which keeps only the key attribute(s), and assign the result to variable curr.
  - Check if first is true or prev is not equal to curr.
  - If either is true, submit the entire tuple from In to Out, set first to false, and set prev to curr.
A diagram of the subgraph can be seen below:
Figure 1. Stream graph of the body of the Uniq operator 
Note: Composite operators can encapsulate more than one operator. Additionally, they can have as many input ports, output ports, and parameters as necessary.
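The Custom operator logic described above can be modeled outside of Streams. The following is a rough shell sketch, not SPL: it assumes each tuple is written as a comma-separated line i,j and uses awk in place of the Custom operator. The names uniq_by_field and gen_tuples are made up for illustration.

```shell
#!/usr/bin/env bash
# Model of the Uniq logic, not SPL: each input line stands in for a tuple "i,j".

# uniq_by_field FIELD: pass a line through only when the chosen comma-separated
# field differs from the field of the last line that was passed through.
# The first line always passes, mirroring the "first" state variable.
uniq_by_field() {
  awk -F, -v k="$1" '
    NR == 1 || $k != prev {   # same test as: first || prev != curr
      print                   # stands in for submit(In, Out)
      prev = $k               # remember the key of the submitted tuple
    }
  '
}

# Hypothetical input: i counts from 1 to 10 and j is i / 3 with integer division.
gen_tuples() {
  for i in $(seq 1 10); do
    echo "$i,$((i / 3))"
  done
}

# Use field 2 (j) as the key.
gen_tuples | uniq_by_field 2
```

With field 2 as the key, the sketch prints the lines 1,0 then 3,1 then 6,2 then 9,3, which are the first lines of each run of equal j values. With field 1 as the key, every line passes because i never repeats.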
- Using your text editor, create a file named Main.spl in the compositeop directory.
- Paste the following code into your file and save it:
use my.util::Uniq;

composite Main {
  graph
    stream<int32 i, int32 j> All = Beacon() {
      param
        iterations : 10u;
      output
        All : i = ((int32) IterationCount() + 1),
              j = ((int32) IterationCount() + 1) / 3;
    }

    stream<All> Some = Uniq(All) {
      param
        key : tuple<int32 j>;
    }

    () as PrintAll = Custom(All) {
      logic
        onTuple All : printString("All " + (rstring)All + "\n");
    }

    () as PrintSome = Custom(Some) {
      logic
        onTuple Some : printString("Some " + (rstring)Some + "\n");
    }
}
This SPL code does the following:
- Specifies that our my.util::Uniq operator will be used.
- Declares a main composite operator called Main.
- Starts a data flow graph with the graph clause.
- Invokes a Beacon operator to produce a stream called All. The invocation is configured to:
  - Produce 10 tuples
  - Assign the iteration count plus 1 to attribute i, and that same value divided by 3, rounded down to the nearest integer, to attribute j.
- Invokes the Uniq composite operator, reading from stream All and outputting stream Some. The invocation is configured to compare values of attribute j.
- Invokes two Custom operators named PrintAll and PrintSome, reading from stream All and stream Some respectively, to print each stream with a label that identifies it.
  Note: A stream, like All, can be used as the input to multiple operators. When this is done, each tuple is duplicated once for each recipient.
A data flow graph representing the application looks like this:
Figure 2. Stream graph of the application
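Before compiling, it can help to confirm that both files are where the earlier steps put them, since Uniq.spl is meant to sit in a directory named after its namespace. The following is a small shell sketch for that check. The function name check_layout is hypothetical and is not part of Streams.

```shell
#!/usr/bin/env bash
# check_layout DIR: report whether DIR holds the two files this tutorial creates.
# Prints one "missing: ..." line per absent file and returns non-zero if any is absent.
check_layout() {
  local dir="$1" missing=0
  for f in Main.spl my.util/Uniq.spl; do
    if [ ! -f "$dir/$f" ]; then
      echo "missing: $f"
      missing=1
    fi
  done
  return "$missing"
}
```

For example, running check_layout . from inside the compositeop directory prints nothing when both files are present.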
- Compile the code.
In the compositeop directory, run the following command:
sc -M Main
- Run the application.
./output/bin/standalone
The program prints out the following:
All {i=1,j=0}
Some {i=1,j=0}
All {i=2,j=0}
All {i=3,j=1}
Some {i=3,j=1}
All {i=4,j=1}
All {i=5,j=1}
All {i=6,j=2}
Some {i=6,j=2}
All {i=7,j=2}
All {i=8,j=2}
All {i=9,j=3}
Some {i=9,j=3}
All {i=10,j=3}
Lines starting with All have attribute i counting from 1 to 10, while attribute j is always i/3 rounded down to the nearest integer. Lines starting with Some show only the first tuple of each run of equal j values (i = 1, 3, 6, and 9), because attribute j was used as the uniqueness key.
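One way to sanity-check a run is to count the lines printed for each stream. The following shell sketch assumes the program output has been saved or piped as plain text. The helper names count_prefix and sample_output are hypothetical, and sample_output simply repeats the listing shown above.

```shell
#!/usr/bin/env bash
# count_prefix LABEL: count the stdin lines that start with LABEL,
# optionally followed by spaces, and then an opening brace.
count_prefix() {
  grep -c "^$1 *{" || true   # grep exits non-zero when the count is 0
}

# Copy of the output listing from this tutorial, used here as test input.
sample_output() {
  cat <<'EOF'
All {i=1,j=0}
Some {i=1,j=0}
All {i=2,j=0}
All {i=3,j=1}
Some {i=3,j=1}
All {i=4,j=1}
All {i=5,j=1}
All {i=6,j=2}
Some {i=6,j=2}
All {i=7,j=2}
All {i=8,j=2}
All {i=9,j=3}
Some {i=9,j=3}
All {i=10,j=3}
EOF
}

sample_output | count_prefix All    # 10 tuples generated
sample_output | count_prefix Some   # 4 tuples passed by Uniq
```

Against the listing, the counts are 10 for All and 4 for Some. To check a real run, pipe the output of ./output/bin/standalone into count_prefix in place of sample_output.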