Primitive operators

Try creating a primitive operator in C++, which lets you implement operator logic in a general-purpose programming language rather than in SPL.

In this tutorial, you will:

  1. Use the spl-make-operator tool to generate skeleton C++ primitive operator files
  2. Modify the C++ header file to declare instance variables
  3. Modify the C++ implementation file to distribute incoming tuples across the output ports in round-robin fashion
  4. Write a stream application that:
    1. Generates tuples
    2. Splits the tuples across two output ports in round-robin fashion
    3. Labels the tuples on the two streams with a path ID
    4. Joins the two streams into one
    5. Writes the received tuples to the console
  5. Compile the application using the SPL compiler (sc)
  6. Run the application

Before you begin

Procedure

  1. Set up your environment for Streams.

    In your command terminal, source the streamsprofile.sh file in the bin directory of your Streams installation. For example:

    source streams-install-directory/bin/streamsprofile.sh
  2. Create a directory called primitiveop that contains the nested directories my.util/RoundRobinSplit.

    For example, use the following command:

    mkdir -p primitiveop/my.util/RoundRobinSplit
  3. Enter the RoundRobinSplit directory.
    cd primitiveop/my.util/RoundRobinSplit
  4. Create the C++ primitive operator skeleton files.
    spl-make-operator -s --kind c++

    The command creates several files, including RoundRobinSplit_h.cgt and RoundRobinSplit_cpp.cgt, which are the code generation templates (cgt files) for the C++ header and implementation, and RoundRobinSplit.xml, which defines the operator model (name, parameters, ports, and so on).

  5. Using your text editor, edit the RoundRobinSplit_h.cgt file in the RoundRobinSplit directory.
  6. Paste the following code into your file to fully replace the existing content and save it:
    #pragma SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE
    class MY_OPERATOR : public MY_BASE_OPERATOR {
      public:
        MY_OPERATOR();
        void process(Tuple & tuple, uint32_t port);
      private:
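        Mutex _mutex;     // guards _count against concurrent process() calls
        uint32_t _count;  // position within the current round-robin cycle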
    };
    #pragma SPL_NON_GENERIC_OPERATOR_HEADER_EPILOGUE

    This C++ code does the following:

    1. Declares a compiler pragma to ensure that any errors reported while compiling the generated code reference the cgt file rather than the generated code.
    2. Defines a class MY_OPERATOR that extends the MY_BASE_OPERATOR class.
    3. Declares a constructor and a process method.
    4. Declares two instance variables: a mutex for locking and a tuple count.
  7. Using your text editor, edit the RoundRobinSplit_cpp.cgt file in the RoundRobinSplit directory.
  8. Paste the following code into your file to fully replace the existing content and save it:
    #pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_PROLOGUE
    MY_OPERATOR::MY_OPERATOR() : _count(0) {}
    void MY_OPERATOR::process(Tuple & tuple, uint32_t port) {
      uint32_t const nOutputs = getNumberOfOutputPorts();
      uint32_t const batchSize = getParameter("batch");
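      // Hold the lock while _count is read and updated and the tuple is submitted
      AutoPortMutex apm(_mutex, *this);
      // Send batchSize consecutive tuples to each output port in turn
      uint32_t outputPort = (_count / batchSize) % nOutputs;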
      _count = (_count + 1) % (batchSize * nOutputs);
      assert(outputPort < nOutputs);
      submit(tuple, outputPort);
    }
    #pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_EPILOGUE

    This C++ code does the following:

    1. Declares a compiler pragma to ensure that any errors reported while compiling the generated code reference the cgt file rather than the generated code.
    2. Defines the MY_OPERATOR constructor, which initializes the _count variable to 0.
    3. Defines the process method, which:
      1. Queries the runtime APIs for the number of output ports.
      2. Obtains the value of the batch operator parameter.
      3. Constructs an AutoPortMutex object from the mutex and the current operator instance to guard against concurrent updates of the _count instance variable.
      4. Determines the output port for the current tuple: the first batchSize tuples go to port 0, the next batchSize tuples go to port 1, and so on, cycling back to port 0 after the last port.
      5. Increments the count, wrapping it back to 0 after batchSize * nOutputs tuples.
      6. Asserts that the calculated output port is less than the number of output ports (valid port indexes run from 0 to nOutputs - 1).
      7. Submits the tuple to the calculated output port.
  9. Using your text editor, create a file named Main.spl in the primitiveop directory.
  10. Paste the following code into your file and save it:
    use my.util::RoundRobinSplit;
    composite Main {
      graph
        stream<int32 count> Input = Beacon() {
          param  iterations : 10u;
          output Input      : count = (int32) IterationCount();
        }
        (stream<int32 count> A0; stream<int32 count> A1) = RoundRobinSplit(Input) {
          param  batch      : 2u;
        }
        stream<int32 count, int32 path> B0 = Functor(A0) {
          output B0         : path = 0;
        }
        stream<int32 count, int32 path> B1 = Functor(A1) {
          output B1         : path = 1;
        }
        stream<int32 count, int32 path> Output = Pair(B0; B1) {}
    
        () as Writer = FileSink(Output) {
          param  file       : "/dev/stdout";
                 flush      : 1u;
        }
    }

    This SPL code does the following:

    1. Imports our my.util::RoundRobinSplit operator with a use directive so that it can be referred to as RoundRobinSplit.
    2. Declares a main composite operator called Main.
    3. Starts a data flow graph with the graph clause.
    4. Invokes a Beacon operator to produce a stream called Input. The invocation is configured to:
      1. Produce 10 tuples
      2. Set the count attribute of each tuple to the current iteration count
    5. Invokes our RoundRobinSplit primitive operator, which reads from stream Input and outputs two streams: A0 and A1. The batch parameter is set to 2, so the operator sends two tuples to one stream before switching to the next stream.
    6. Invokes two Functor operators that read from A0 and A1, respectively, add a path ID attribute to each tuple, and output to streams B0 and B1.
    7. Invokes a Pair operator that reads streams B0 and B1 on separate input ports. The invocation needs no configuration: once a tuple has arrived on each input port, Pair sends one tuple from each input port to the stream Output.
    8. Invokes a FileSink operator named Writer, which is configured to write to standard output and to flush its output after every tuple.

    A data flow graph representing the application looks like this:

    Figure 1. Stream graph of the application


  11. Compile the code.

    In the primitiveop directory, run the following command:

    sc -M Main

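    The SPL compiler expands the cgt templates into C++ source code for the RoundRobinSplit invocation and then invokes the C++ compiler on it. Because the invocation has no as clause, the operator instance takes its name from its first output stream, A0, so the generated source files are A0.cpp and A0.h in the output/src/operator directory.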

  12. Run the application:
    ./output/bin/standalone

    The program prints out the following:

    0,0
    2,1
    1,0
    3,1
    4,0
    6,1
    5,0
    7,1

    Each line shows the count and path attribute values, separated by a comma. Although RoundRobinSplit sends two consecutive tuples to one stream before switching to the other, the Pair invocation lets only one tuple through from each stream at a time, which is why the path column alternates between 0 and 1. Counts 8 and 9 do not appear because both are routed to A0, so Pair never receives matching tuples on its second input port. This output is deterministically repeatable, independent of the processing speed of the two paths.

What to do next

There are many more facets of SPL to explore including:

  • Other operators and functions in the SPL standard toolkit
  • Writing functions in a native language
  • Configs
  • Java and Python primitive operators
  • Streams debugger
  • Dynamic stream applications

See the Exercises page to learn more and to practice with SPL.