Combining the operators into a graph

A stream application written in SPL is a directed graph in which streams of data (the edges of the graph) flow into and out of operators (the nodes of the graph). Operators analyze the incoming data and produce new streams of data, which then flow into subsequent operators. You can use this basic pattern, input data --> operator --> output data, to create an assembly line of analytics in which downstream operators use the data from upstream operators to perform their own analytics and produce new data.

For this example, all the operators and their streams can be represented by the following graph.


Figure: FileSource --> (All transactions) --> Filter --> (IBM transactions) --> Custom

Notice that the FileSource operator has no input stream and the Custom operator has no output stream. Also notice that the output stream of the FileSource operator (All transactions) is the input stream of the Filter operator, and that the output stream of the Filter operator (IBM transactions) is the input stream of the Custom operator.

To complete the stream application, enclose the graph in a composite operator. A composite operator contains global type definitions and a graph of streams and operators. The general structure of the stream application is shown in the following skeleton. Global definitions are declared under the type clause. Operators and their streams are declared under the graph clause.

composite Name {
   type
      global_definitions

   graph
      OutputStream1 = Operator1()
      OutputStream2 = Operator2(OutputStream1)
      OutputStream3 = Operator3(OutputStream2)
         .   .   .
      OutputStreamN = OperatorN(OutputStreamN-1)
}

Given this general structure, the SPL code for this simple stream application is as follows.

composite MyFilterApp {
   type
      TransactionRecord = rstring ticker,
                          rstring date,
                          rstring time,
                          decimal64 price;

   graph
      stream<TransactionRecord> AllTransactions = FileSource() {
         param
            file : "StockTrades.csv";
            format : csv;
      }
      stream<TransactionRecord> IBMTransactions = Filter(AllTransactions) {
         param
            filter : ticker == "IBM";
      }
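      // Sink operator: it has no output stream and tracks the price range.
      () as Sink = Custom(IBMTransactions) {
         logic
            state : {
               // lowest starts at 999.99, which assumes every price is below that value.
               mutable decimal64 highest = 0.0d, lowest = 999.99d;
            }
            // Runs once for each tuple that arrives on IBMTransactions.
            onTuple IBMTransactions : {
               if (price > highest)
                  highest = price;
               if (price < lowest)
                  lowest = price;
            }
            // Runs for each punctuation; the final marker signals the end of the stream.
            onPunct IBMTransactions : {
               if (currentPunct() == Sys.FinalMarker) {
                  printStringLn("highest = " + (rstring)highest);
                  printStringLn("lowest = " + (rstring)lowest);
               }
            }
      }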
}
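
The graph in this example is a straight line, but a stream can also feed more than one operator. The following variation is a minimal sketch of that idea, not part of the original example: it sends the IBMTransactions stream to both a FileSink operator and a Custom operator. The composite name MyFilterAppWithSink, the operator aliases Writer and Printer, and the output file name IBMTrades.csv are hypothetical names chosen for illustration.

```spl
// Sketch only: MyFilterAppWithSink, Writer, Printer, and "IBMTrades.csv"
// are hypothetical names that do not appear in the original example.
composite MyFilterAppWithSink {
   type
      TransactionRecord = rstring ticker,
                          rstring date,
                          rstring time,
                          decimal64 price;

   graph
      stream<TransactionRecord> AllTransactions = FileSource() {
         param
            file : "StockTrades.csv";
            format : csv;
      }
      stream<TransactionRecord> IBMTransactions = Filter(AllTransactions) {
         param
            filter : ticker == "IBM";
      }
      // First consumer of IBMTransactions: write each tuple to a CSV file.
      () as Writer = FileSink(IBMTransactions) {
         param
            file : "IBMTrades.csv";
            format : csv;
      }
      // Second consumer of the same stream: print each trade as it arrives.
      () as Printer = Custom(IBMTransactions) {
         logic
            onTuple IBMTransactions : {
               printStringLn(ticker + " " + (rstring)price);
            }
      }
}
```

In this sketch, the graph has one edge from FileSource to Filter and two edges that leave Filter, because both Writer and Printer name IBMTransactions as their input stream. Neither of the two sink operators declares an output stream.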