Primitive operators

SPL also has primitive operators, which encapsulate code in a native language.

Recall that an operator is a reusable stream transformer, and that a composite operator encapsulates a stream graph. If all operators were composite, there would be a chicken-and-egg problem: every graph would be built from operators that are themselves graphs, with nothing at the bottom. Primitive operators break this cycle, usually with code written in a more traditional language such as C++ or Java.

This section describes a primitive operator, RoundRobinSplit, written in C++, but Teracloud® Streams also lets you write primitive operators in Java. If you are not a C++ programmer, or if you expect to use mostly operators from the SPL standard toolkit or other toolkits, you can skip this information. First, here is an example of invoking RoundRobinSplit from SPL code in the following stream graph:

Figure 1. Stream graph of the test driver for the RoundRobinSplit operator.

The test driver for RoundRobinSplit consists of the following operators:

- a Beacon that produces stream Input;
- a RoundRobinSplit that consumes Input and produces A0 and A1;
- a Functor that consumes A0 and produces B0;
- another Functor that consumes A1 and produces B1;
- a Pair that consumes B0 and B1 and produces Output;
- a FileSink named Writer that consumes Output.

Graphs like this example are called split-joins. They are a common cause of non-determinism in streaming applications. Data can be processed at different speeds along the different paths, so a join that forwards tuples in arrival order can produce a different order on every run. However, some applications require deterministic behavior, and determinism is also useful for testing. The new RoundRobinSplit operator, together with the Pair operator from the SPL standard toolkit, provides a simple way to implement a deterministic split-join. It does so without giving up much of the performance gained from running the middle portion of the stream graph in parallel. Specifically, RoundRobinSplit deterministically alternates between sending data to each of its output ports, and Pair deterministically alternates between receiving data from each of its input ports. Here is the code for this stream graph:
use my.util::RoundRobinSplit;
composite Main {
  graph
    stream<int32 count> Input = Beacon() {
      logic  state      : mutable int32 n = 0;
      param  iterations : 10u;
      output Input      : count = n++;
    }
    (stream<int32 count> A0; stream<int32 count> A1) = RoundRobinSplit(Input) {
      param  batch      : 2u;
    }
    stream<int32 count, int32 path> B0 = Functor(A0) {
      output B0         : path = 0;
    }
    stream<int32 count, int32 path> B1 = Functor(A1) {
      output B1         : path = 1;
    }
    stream<int32 count, int32 path> Output = Pair(B0; B1) {}
    () as Writer = FileSink(Output) {
      param  file       : "/dev/stdout";
             flush      : 1u;
    }
}

Line 9 of the listing, (stream<int32 count> A0; stream<int32 count> A1) = RoundRobinSplit(Input), invokes the RoundRobinSplit operator to produce two output streams, A0 and A1. The clause param batch : 2u tells the operator to switch to the next output port after every two tuples. Line 18, stream<int32 count, int32 path> Output = Pair(B0; B1), invokes the Pair operator on the two input streams B0 and B1. For now, put this code into a file RoundRobinSplit/Main.spl. Do not try to compile it yet, because you must first implement the RoundRobinSplit operator.

Create a directory RoundRobinSplit/my.util/RoundRobinSplit and change into it. Then run spl-make-operator --kind c++. This command generates several skeleton files for you, including the following:

- an operator model, RoundRobinSplit.xml;
- a code generation template (.cgt file) for the header file, RoundRobinSplit_h.cgt;
- a code generation template for the C++ implementation file, RoundRobinSplit_cpp.cgt.

When you write more sophisticated primitive operators, you often need to edit the XML operator model. This operator is simple enough that you do not need to change the model at all.

Open the header file code generation template RoundRobinSplit_h.cgt. You see a class definition with several method declarations. Remove all of them except the constructor and process(Tuple & tuple, uint32_t port), and add two instance fields, Mutex _mutex and uint32_t _count. You end up with the following code in RoundRobinSplit_h.cgt:
    #pragma SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE
    class MY_OPERATOR : public MY_BASE_OPERATOR {
      public:
        MY_OPERATOR();
        void process(Tuple & tuple, uint32_t port);
      private:
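        Mutex _mutex;      // guards _count against concurrent calls to process
        uint32_t _count;   // position within the current round-robin cycle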
    };
    #pragma SPL_NON_GENERIC_OPERATOR_HEADER_EPILOGUE
Next, open the C++ implementation file code generation template RoundRobinSplit_cpp.cgt. Remove all methods except the constructor and process(Tuple & tuple, uint32_t port), and implement them as shown in the following listing of RoundRobinSplit_cpp.cgt:
    #pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_PROLOGUE
    MY_OPERATOR::MY_OPERATOR() : _count(0) {}
    void MY_OPERATOR::process(Tuple & tuple, uint32_t port) {
      uint32_t const nOutputs = getNumberOfOutputPorts();
      uint32_t const batchSize = getParameter("batch");
      AutoPortMutex apm(_mutex, *this);
      uint32 outputPort = (_count / batchSize) % nOutputs;
      _count = (_count + 1) % (batchSize * nOutputs);
      assert(outputPort < nOutputs);
      submit(tuple, outputPort);
    }
    #pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_EPILOGUE

The constructor initializes the _count instance variable to zero. The process method performs these steps:

- It queries the runtime APIs for the number of output ports (Line 4) and the batch size parameter (Line 5).
- It acquires the mutex to guard against concurrent updates of the _count instance variable (Line 6).
- It determines the output port (Line 7) and updates _count (Line 8).
- It submits the input tuple to the chosen output port (Line 10).

The mutex is necessary because process can be called by more than one thread. Suppose two threads, T1 and T2, both call process without the mutex:

1. T1 is interrupted in the middle of Line 8, after it reads the old value of _count but before it writes the new value.
2. T2 calls process and updates _count.
3. T1 resumes and overwrites T2's update.

Both threads would then have chosen their output port from the same value of _count, and the counter would advance by one instead of two. This breaks the round-robin pattern.
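To see the port-selection arithmetic and the locking outside the SPL runtime, here is a minimal standalone C++ sketch. It is a model under stated assumptions, not the SPL operator API:

- The class RoundRobinModel is hypothetical.
- std::mutex with std::lock_guard stands in for Mutex and AutoPortMutex.
- Instead of calling submit, process simply returns the chosen port.

```cpp
#include <cstdint>
#include <mutex>
#include <stdexcept>

// Hypothetical standalone model of the RoundRobinSplit logic (not the SPL API).
// std::mutex/std::lock_guard stand in for SPL's Mutex/AutoPortMutex, and
// process() returns the chosen port instead of calling submit().
class RoundRobinModel {
public:
    RoundRobinModel(uint32_t nOutputs, uint32_t batchSize)
        : nOutputs_(nOutputs), batchSize_(batchSize), count_(0) {
        if (nOutputs == 0 || batchSize == 0)
            throw std::invalid_argument("nOutputs and batchSize must be positive");
    }

    // Returns the output port for the next tuple.
    uint32_t process() {
        // Hold the lock across both the read and the write of count_, so no
        // other thread can interleave between them (the lost-update race).
        std::lock_guard<std::mutex> guard(mutex_);
        uint32_t outputPort = (count_ / batchSize_) % nOutputs_;
        // Wrapping modulo batchSize_ * nOutputs_ keeps count_ bounded; the port
        // sequence repeats with exactly that period, so it is unchanged.
        count_ = (count_ + 1) % (batchSize_ * nOutputs_);
        return outputPort;
    }

private:
    uint32_t nOutputs_;
    uint32_t batchSize_;
    uint32_t count_;
    std::mutex mutex_;
};
```

With two outputs and a batch of 2, ten calls return 0, 0, 1, 1, 0, 0, 1, 1, 0, 0. This is the same assignment the operator makes in the test driver.

The wrap-around does not change this sequence. Write the unbounded position as i = q·(batchSize·nOutputs) + c. Then i / batchSize equals q·nOutputs + c / batchSize, so taking it modulo nOutputs gives the same port as computing from c alone.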

Now you are ready to compile the application. Change to the RoundRobinSplit directory and run the SPL compiler with sc -M Main. The SPL compiler invokes the C++ compiler to compile the instance of the RoundRobinSplit operator, which is named A0 after its first output stream. The sources are the files A0.cpp and A0.h in directory RoundRobinSplit/output/src/operator, and the object file is RoundRobinSplit/output/build/operator/A0.o. To run the application, stay in the RoundRobinSplit directory and run ./output/bin/standalone. You get the following output:
0,0
2,1
1,0
3,1
4,0
6,1
5,0
7,1

Each line shows the count and path attributes, separated by a comma. The split sends two tuples at a time to each path, but the join takes one tuple at a time from each input. As a result, the counts (left column) follow the progression 0,2,1,3,4,6,5,7, whereas the paths (right column) simply alternate 0,1,0,1,0,1,0,1. The Beacon produces ten tuples, but counts 8 and 9 both go to A0. Pair never receives partners for them on B1, so they do not appear in the output. This output is deterministically repeatable, independent of the processing speed of the two paths.
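The determinism argument can be checked with a small single-threaded C++ simulation of the split-join. This is a sketch under simplifying assumptions:

- The function simulateSplitJoin is hypothetical.
- The split reuses the port formula from process.
- Each Functor path is modeled as a FIFO queue that tags each count with its path number.
- The join models Pair as the output above suggests: once every input holds a tuple, it emits one tuple from each input in port order.

In this model, the output depends only on the split pattern and the join order; timing never enters into it.

```cpp
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Hypothetical single-threaded model of the test driver:
// Beacon -> RoundRobinSplit -> two Functor paths -> Pair -> FileSink.
// Precondition: batch > 0.
std::vector<std::string> simulateSplitJoin(int32_t iterations, uint32_t batch) {
    const uint32_t nPaths = 2;
    std::deque<int32_t> paths[nPaths];  // one FIFO per path (B0, B1)
    std::vector<std::string> lines;     // "count,path" lines, as FileSink writes them

    uint32_t count = 0;  // same bounded counter as in process()
    for (int32_t n = 0; n < iterations; ++n) {
        uint32_t port = (count / batch) % nPaths;
        count = (count + 1) % (batch * nPaths);
        paths[port].push_back(n);

        // Pair-like join: while both paths hold a tuple, emit one from
        // each path in port order, tagged with its path number.
        while (!paths[0].empty() && !paths[1].empty()) {
            for (uint32_t p = 0; p < nPaths; ++p) {
                lines.push_back(std::to_string(paths[p].front()) + "," + std::to_string(p));
                paths[p].pop_front();
            }
        }
    }
    // Tuples still queued on one path have no partner and are never emitted.
    return lines;
}
```

Calling simulateSplitJoin(10, 2) yields the eight lines shown above. Counts 8 and 9 are left in the queue for path 0. With a batch of 1, the paths alternate tuple by tuple, and all ten counts are paired.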

It is instructive to introduce an error in the C++ code to see what happens. If you change the call on Line 10 of RoundRobinSplit_cpp.cgt to submit(outputPort, tuple), the C++ compiler reports an error. The message gives the file name and line number in the .cgt template rather than in the generated A0.cpp file:
my.util/RoundRobinSplit/RoundRobinSplit_cpp.cgt:10: error:
  no matching function for call to 'SPL::_Operator::A0::submit(SPL::uint32&, SPL::Tuple&)'
note: candidates are: virtual void SPL::Operator::submit(SPL::Tuple&, uint32_t)
note:                 virtual void SPL::Operator::submit(const SPL::Tuple&, uint32_t)
note:                 void SPL::Operator::submit(const SPL::Punctuation&, uint32_t)
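The diagnostic is ordinary C++ overload resolution. None of the listed submit candidates takes an integer as its first parameter, and, as the error shows, no implicit conversion from SPL::uint32 to Tuple or Punctuation is available.

The following sketch reproduces that situation with hypothetical mock types (MockTuple, MockPunctuation, MockOperator), not the real SPL classes. It uses the C++17 std::void_t detection idiom to check at compile time which argument orders are viable.

```cpp
#include <cstdint>
#include <type_traits>
#include <utility>

// Hypothetical stand-ins that only mirror the overload set in the notes above.
struct MockTuple {};
struct MockPunctuation {};

struct MockOperator {
    void submit(MockTuple&, uint32_t) {}
    void submit(const MockTuple&, uint32_t) {}
    void submit(const MockPunctuation&, uint32_t) {}
};

// Detection idiom: CanSubmit<A, B>::value is true if op.submit(a, b)
// compiles for lvalues a of type A and b of type B.
template <typename A, typename B, typename = void>
struct CanSubmit : std::false_type {};

template <typename A, typename B>
struct CanSubmit<A, B,
    std::void_t<decltype(std::declval<MockOperator&>().submit(
        std::declval<A&>(), std::declval<B&>()))>> : std::true_type {};

// The call as written in the template: submit(tuple, outputPort).
static_assert(CanSubmit<MockTuple, uint32_t>::value, "correct order compiles");
// The swapped call: submit(outputPort, tuple) has no viable overload.
static_assert(!CanSubmit<uint32_t, MockTuple>::value, "swapped order is rejected");
```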

This section barely scratched the surface of developing primitive operators in SPL. There is a rich API for generating specialized code for performance, and for compile-time error checking on things like the number and types of ports.