Composite operators

Just like user-defined types and functions, user-defined composite operators help code reuse and readability. A composite operator encapsulates a stream subgraph, which can then be used in different contexts.

Each of the examples so far has a main composite operator, which encapsulates the stream graph that forms the whole program. Main composite operators are self-contained in the sense that their stream graph has no output or input ports, and they have no mandatory parameters. This section instead looks at a composite operator that has both ports and parameters. The operator reads a stream from input port In, removes consecutive duplicate tuples, and writes the result to output port Out. The operator is called Uniq as a homage to the uniq utility, which performs the same task on lines of text. Internally, the Uniq operator uses a Custom operator to implement its function. Here is a diagram of the stream graph:

Figure 1. Stream graph of the body of the Uniq operator.
The graph of the Uniq composite operator has an input port that is called In, connected to an operator invocation of a Custom operator, which is connected to an output port called Out.

To make things more interesting, the Uniq operator has a type parameter $key: a type that contains the subset of attributes of the input tuple that are used to determine uniqueness. If two consecutive tuples are identical in these attributes, the second one is dropped even if it differs in other attributes. The following code implements operator Uniq:

 namespace my.util;
 public composite Uniq(output Out; input In) {
   param
     type $key;
   graph
     stream<In> Out = Custom(In) {
       logic state      : {
                             mutable boolean first = true;
                             mutable $key prev;
                          }
             onTuple In : {
                             $key curr = ($key)In;
                             if (first || prev != curr) {
                               submit(In, Out);
                               first = false;
                               prev = curr;
                             }
                          }
     }
 }

Line 1, namespace my.util;, specifies a namespace for the operator, so the operator's full name is my.util::Uniq. Put the example source code in a file Uniq/my.util/Uniq.spl. Line 2, public composite Uniq(output Out; input In), specifies that the operator is public, meaning that it can be used from other namespaces, and that it has one output port Out and one input port In. Lines 3 and 4 declare the mandatory formal parameter $key, which is a type. Line 12, $key curr = ($key)In;, declares a local variable curr of type $key and initializes it with the expression ($key)In, which takes the current tuple from input stream In and casts it to type $key. In other words, the cast drops any attributes that are not relevant for the comparison with the previous tuple, which the state variable prev holds. The code handles one special case: for the very first tuple there is no previous tuple, so the state variable first ensures that it is always treated as unique.
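
The tuple cast on line 12 can be tried in isolation. The following is a minimal sketch, not part of the original example: the composite name CastDemo and the types Full and Key are hypothetical, and the Beacon settings are chosen only to produce a single tuple. It assumes, as the Uniq code does, that casting a tuple to a tuple type with fewer attributes keeps just the attributes named in the target type.

```spl
// Hypothetical stand-alone demo of the cast used in Uniq.
composite CastDemo {
  type
    Full = tuple<int32 i, int32 j>;   // stands in for the input tuple type
    Key  = tuple<int32 j>;            // stands in for the $key parameter
  graph
    stream<Full> Src = Beacon() {
      param  iterations : 1u;         // emit one tuple, then stop
      output Src        : i = 7, j = 2;
    }
    () as Show = Custom(Src) {
      logic onTuple Src : {
                             Key k = (Key)Src;   // same cast as ($key)In in Uniq
                             printString("Full " + (rstring)Src + "\n");
                             printString("Key  " + (rstring)k + "\n");
                          }
    }
}
```

If the cast behaves as described, the Key line shows only the j attribute, which is why two tuples that differ only in i compare as equal inside Uniq.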

Now that you have defined your own operator my.util::Uniq, you need to test it. To test, generate a stream All of tuples that contains some duplicates, and send it through the Uniq operator to get the stream Some of unique tuples. Print both All and Some so that you can inspect whether the operator worked as expected. The stream graph for the test driver is:

Figure 2. Stream graph of the test driver for the Uniq operator.

The test driver for Uniq consists of a Beacon producing stream All; Uniq consuming All and producing Some; PrintAll consuming All; and PrintSome consuming Some.

As far as the driver is concerned, Uniq is just an ordinary operator, whose invocation can serve as a vertex in a stream graph just like any of the other operators. Note also that a single stream from a single output port, like All in the example, can be used as the input to multiple operators; in this case, all tuples are duplicated once for each recipient. The following code implements the test driver:

use my.util::Uniq;
composite Main {
  type
    KeyType = tuple<int32 j>;
  graph
    stream<int32 i, int32 j> All = Beacon() {
      param  iterations   : 10u;
      output All          : i = ((int32) IterationCount() + 1),
                            j = ((int32) IterationCount() + 1) / 3;
    }
    stream<All> Some = Uniq(All) {
      param  key          : KeyType;
    }
    () as PrintAll = Custom(All) {
      logic  onTuple All  : printString("All " + (rstring)All + "\n");
    }
    () as PrintSome = Custom(Some) {
      logic  onTuple Some : printString("Some " + (rstring)Some + "\n");
    }
}

Note how lines 11-13 invoke the operator Uniq, passing the actual parameter key : KeyType in the param clause, which indicates that only attribute j is to be used in the uniqueness test. Put this code into a file Uniq/Main.spl, and run sc -M Main to compile it. Now run ./output/bin/standalone. You should see the following output:

All {i=1,j=0}
Some {i=1,j=0}
All {i=2,j=0}
All {i=3,j=1}
Some {i=3,j=1}
All {i=4,j=1}
All {i=5,j=1}
All {i=6,j=2}
Some {i=6,j=2}
All {i=7,j=2}
All {i=8,j=2}
All {i=9,j=3}
Some {i=9,j=3}
All {i=10,j=3}

If you look only at the All lines, you see that the i attribute counts iterations from 1 to 10, while the j attribute is always i/3 rounded down to an integer. Because type tuple<int32 j> is used as the uniqueness key, a tuple passes through Uniq only when its j value differs from that of the previous tuple. That happens for the first tuple and then for every third one (i = 3, 6, and 9), so the Some lines show only those four tuples.
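
To see how the choice of key type drives the result, the driver can be varied to compare on i instead of j. This is a sketch under stated assumptions: the composite name MainByI and the type name KeyByI are hypothetical and do not appear in the original example; everything else is copied from the driver above.

```spl
use my.util::Uniq;
// Hypothetical variant of the test driver that keys on attribute i.
composite MainByI {
  type
    KeyByI = tuple<int32 i>;          // compare consecutive tuples on i only
  graph
    stream<int32 i, int32 j> All = Beacon() {
      param  iterations   : 10u;
      output All          : i = ((int32) IterationCount() + 1),
                            j = ((int32) IterationCount() + 1) / 3;
    }
    stream<All> Some = Uniq(All) {
      param  key          : KeyByI;   // same operator, different key type
    }
    () as PrintSome = Custom(Some) {
      logic  onTuple Some : printString("Some " + (rstring)Some + "\n");
    }
}
```

Because i is different on every tuple, no two consecutive tuples have equal keys, so Uniq should forward all ten tuples. The Uniq operator itself is unchanged; only the actual parameter differs.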

This section described how to define your own composite operators to encapsulate useful, reusable functionality. The Beacon operator from the SPL standard toolkit can serve as a workload generator for testing. Test your own operators with test drivers like the one shown in this example. Besides helping you to iron out bugs during development, these drivers are also worth keeping for later regression testing.

SPL composite operators are more powerful than this example illustrates. Composite operators can encapsulate not just a single operator, but a whole graph; they can have multiple output and input ports; and they can have more parameters, of different kinds besides types. For more information about composite operators, see Compiler reference.
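
As one illustration of a composite with more than one output port, the following sketch extends Uniq so that dropped tuples are routed to a second port instead of being discarded. The operator name UniqSplit and the port names Kept and Dropped are hypothetical and are not part of the original example; the uniqueness logic is the same as in Uniq. Treat it as a starting point to check against the compiler reference, not as a definitive implementation.

```spl
namespace my.util;
// Hypothetical two-output variant of Uniq.
public composite UniqSplit(output Kept, Dropped; input In) {
  param
    type $key;                        // attributes used for the comparison
  graph
    (stream<In> Kept; stream<In> Dropped) = Custom(In) {
      logic state      : {
                             mutable boolean first = true;
                             mutable $key prev;
                          }
            onTuple In : {
                             $key curr = ($key)In;
                             if (first || prev != curr) {
                               submit(In, Kept);      // first tuple of a run
                               first = false;
                               prev = curr;
                             } else {
                               submit(In, Dropped);   // consecutive duplicate
                             }
                          }
    }
}
// Possible invocation from a driver (stream names are illustrative):
//   (stream<All> Some; stream<All> Rest) = UniqSplit(All) {
//     param key : KeyType;
//   }
```

With the test data from this section, Kept would carry the same tuples as Some, and Dropped would carry the remaining tuples of All.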