Composite operators
Try out non-main composite operators to encapsulate a stream subgraph and improve code reuse and readability.
In this tutorial, you will:
- Write a composite operator within a namespace that:
- Declares a type parameter
- Compares consecutive tuples attribute value(s)
- Outputs tuples if the values of the attribute(s) do not match
- Write a stream application that:
- Defines a tuple type to pass to the composite operator invocation
- Generates tuples
- Invokes the composite operator
- Prints tuples from the generated stream and composite operator output stream
- Compile the application using the Streams Compiler
(
sc
) - Run the application
Before you begin
- Download and install Teracloud® Streams
- Open a Bash command terminal
- Open a text editor
Procedure
-
Set up your environment for Streams.
In your command terminal, source the streamsprofile.sh file under Streams installation directory. For example:
source streams-install-directory/bin/streamsprofile.sh
-
Create a directory called compositeop with a
my.util directory inside of it.
For example, use the following commands:
mkdir -p compositeop/my.util cd compositeop
- Using your text editor, create a file named Uniq.spl in the compositeop/my.util directory.
-
Paste the following code into your file and save
it:
namespace my.util; public composite Uniq(output Out; input In) { param type $key; graph stream<In> Out = Custom(In) { logic state: { mutable boolean first = true; mutable $key prev; } onTuple In : { $key curr = ($key)In; if (first || prev != curr) { submit(In, Out); first = false; prev = curr; } } } }
This SPL code does the following:
- Specifies that the following code is in
namespace
my.util
. - Declares a public composite operator called
Uniq
with one output portOut
and one input portIn
.Note: Marking the operator aspublic
allows the operator to be used from other namespaces. - Declares a mandatory parameter $key which specifes an SPL type.
- Starts a data flow graph with the
graph
clause. - Invokes a
Custom
operator reading from the composite input streamIn
and outputs to the composite output streamOut
. StreamOut
has the same schema asIn
. The invocation is configured to:- Maintain state of a boolean
first
, initialized totrue
, and a value of the specified type inside ofprev
. - Grab the attribute subset
$key
from streamIn
and assign it to variablecurr
. - Check if
first
istrue
orprev
is not equal tocurr
. - If either is true, submit the entire tuple from
In
toOut
, setfirst
tofalse
, and setprev
tocurr
.
- Maintain state of a boolean
A diagram of the subgraph can be seen below:
Figure 1. Stream graph of the body of the Uniq operator
Note: Composite operators can encapsulate more than one operator. Additionally, they can have as many input ports, output ports, and parameters as necessary. - Specifies that the following code is in
namespace
- Using your text editor, create a file named Main.spl in the compositeop directory.
-
Paste the following code into your file and save
it:
use my.util::Uniq; composite Main { graph stream<int32 i, int32 j> All = Beacon() { param iterations : 10u; output All : i = ((int32) IterationCount() + 1), j = ((int32) IterationCount() + 1) / 3; } stream<All> Some = Uniq(All) { param key : tuple<int32 j>; } () as PrintAll = Custom(All) { logic onTuple All : printString("All" + (rstring)All + "\n"); } () as PrintSome = Custom(Some) { logic onTuple Some : printString("Some" + (rstring)Some + "\n"); } }
This SPL code does the following:
- Specifies that our
my.util::Uniq
operator will be used. - Declares a main composite operator called
Main
. - Starts a data flow graph with the
graph
clause. - Invokes a
Beacon
operator to produce a stream calledAll
. The invocation is configured to:- Produce 10 tuples
- Output into stream
All
the current iteration count into attributei
and the current iteration count divided by 3, rounded down to the nearest integer, into attributej
.
- Invokes the
Uniq
composite operator reading from streamAll
and outputing streamSome
. The invocation is configured to compare values of attributej
. - Invokes two
Custom
operators namedPrintAll
andPrintSome
reading from streamAll
and streamSome
respectively to print out the streams with qualifiers.Note: A stream likeAll
, can be used as the input to multiple operators. When this is done, all tuples are duplicated once for each recipient.
A data flow graph representing the application looks like this:
Figure 2. Stream graph of the application
- Specifies that our
-
Compile the code.
In the
compositeop
directory, run the following command:sc -M Main
-
Run the application.
./output/bin/standalone
The program prints out the following:
All {i=1,j=0} Some {i=1,j=0} All {i=2,j=0} All {i=3,j=1} Some {i=3,j=1} All {i=4,j=1} All {i=5,j=1} All {i=6,j=2} Some {i=6,j=2} All {i=7,j=2} All {i=8,j=2} All {i=9,j=3} Some {i=9,j=3} All {i=10,j=3}
Lines starting with
All
have attributei
counting from 1 to 10 while attributej
is alwaysi
/3 rounded down to the nearest integer. Lines starting withSome
show every third tuple because attributej
was used as the uniqueness key.