Composite operators

Try out non-main composite operators to encapsulate a stream subgraph and improve code reuse and readability.

In this tutorial, you will:

  1. Write a composite operator within a namespace that:
    1. Declares a type parameter
    2. Compares consecutive tuples attribute value(s)
    3. Outputs tuples if the values of the attribute(s) do not match
  2. Write a stream application that:
    1. Defines a tuple type to pass to the composite operator invocation
    2. Generates tuples
    3. Invokes the composite operator
    4. Prints tuples from the generated stream and composite operator output stream
  3. Compile the application using the Streams Compiler (sc)
  4. Run the application

Before you begin

Procedure

  1. Set up your environment for Streams.

    In your command terminal, source the streamsprofile.sh file under Streams installation directory. For example:

    source streams-install-directory/bin/streamsprofile.sh
  2. Create a directory called compositeop with a my.util directory inside of it.

    For example, use the following commands:

    mkdir -p compositeop/my.util
    cd compositeop
    
  3. Using your text editor, create a file named Uniq.spl in the compositeop/my.util directory.
  4. Paste the following code into your file and save it:
    namespace my.util;
    
    public composite Uniq(output Out; input In) {
      param
        type $key;
      graph
        stream<In> Out = Custom(In) {
          logic 
            state: {
              mutable boolean first = true;
              mutable $key prev;
            }
            onTuple In : {
              $key curr = ($key)In;
              if (first || prev != curr) {
                submit(In, Out);
                first = false;
                prev = curr;
              }
            }
        }
    }

    This SPL code does the following:

    1. Specifies that the following code is in namespace my.util.
    2. Declares a public composite operator called Uniq with one output port Out and one input port In.
      Note: Marking the operator as public allows the operator to be used from other namespaces.
    3. Declares a mandatory parameter $key which specifes an SPL type.
    4. Starts a data flow graph with the graph clause.
    5. Invokes a Custom operator reading from the composite input stream In and outputs to the composite output stream Out. Stream Out has the same schema as In. The invocation is configured to:
      1. Maintain state of a boolean first, initialized to true, and a value of the specified type inside of prev.
      2. Grab the attribute subset $key from stream In and assign it to variable curr.
      3. Check if first is true or prev is not equal to curr.
      4. If either is true, submit the entire tuple from In to Out, set first to false, and set prev to curr.

    A diagram of the subgraph can be seen below:

    Figure 1. Stream graph of the body of the Uniq operator


    Note: Composite operators can encapsulate more than one operator. Additionally, they can have as many input ports, output ports, and parameters as necessary.
  5. Using your text editor, create a file named Main.spl in the compositeop directory.
  6. Paste the following code into your file and save it:
    use my.util::Uniq;
    composite Main {
      graph
        stream<int32 i, int32 j> All = Beacon() {
          param  iterations   : 10u;
          output All          : i = ((int32) IterationCount() + 1),
                                j = ((int32) IterationCount() + 1) / 3;
        }
        stream<All> Some = Uniq(All) {
          param  key          : tuple<int32 j>;
        }
        () as PrintAll = Custom(All) {
          logic  onTuple All  : printString("All" + (rstring)All + "\n");
        }
        () as PrintSome = Custom(Some) {
          logic  onTuple Some : printString("Some" + (rstring)Some + "\n");
        }
    }

    This SPL code does the following:

    1. Specifies that our my.util::Uniq operator will be used.
    2. Declares a main composite operator called Main.
    3. Starts a data flow graph with the graph clause.
    4. Invokes a Beacon operator to produce a stream called All. The invocation is configured to:
      1. Produce 10 tuples
      2. Output into stream All the current iteration count into attribute i and the current iteration count divided by 3, rounded down to the nearest integer, into attribute j.
    5. Invokes the Uniq composite operator reading from stream All and outputing stream Some. The invocation is configured to compare values of attribute j.
    6. Invokes two Custom operators named PrintAll and PrintSome reading from stream All and stream Some respectively to print out the streams with qualifiers.
      Note: A stream like All, can be used as the input to multiple operators. When this is done, all tuples are duplicated once for each recipient.

    A data flow graph representing the application looks like this:

    Figure 2. Stream graph of the application


  7. Compile the code.

    In the compositeop directory, run the following command:

    sc -M Main
  8. Run the application.
    ./output/bin/standalone

    The program prints out the following:

    All {i=1,j=0}
    Some {i=1,j=0}
    All {i=2,j=0}
    All {i=3,j=1}
    Some {i=3,j=1}
    All {i=4,j=1}
    All {i=5,j=1}
    All {i=6,j=2}
    Some {i=6,j=2}
    All {i=7,j=2}
    All {i=8,j=2}
    All {i=9,j=3}
    Some {i=9,j=3}
    All {i=10,j=3}

    Lines starting with All have attribute i counting from 1 to 10 while attribute j is always i/3 rounded down to the nearest integer. Lines starting with Some show every third tuple because attribute j was used as the uniqueness key.

What to do next

Using provided operators and writing composite operators can make stream applications go a long way, developers may need to create primitive operators to fully customize logic. See the Primitive operator tutorial to learn how to create a primitive operator implemented in C++.