Combining the operators into a graph
A stream application written in SPL is a directed graph in which streams of data (the edges of the graph) flow into and out of operators (the nodes of the graph). Operators analyze the incoming data and produce new streams of data, which then flow into subsequent operators. You can use this basic pattern, input data --> operator --> output data, to create an assembly line of analytics in which downstream operators use data from upstream operators to perform analytics and produce new data.
For this example, all the operators and their streams can be represented by the following graph.

FileSource --(All transactions)--> Filter --(IBM transactions)--> Custom

The FileSource operator has no input stream, and the Custom operator has no output stream. The output stream of the FileSource operator (All transactions) is the input stream of the Filter operator, and the output stream of the Filter operator (IBM transactions) is the input stream of the Custom operator.
To complete the stream application, you need to enclose the graph in a composite operator. A composite operator contains global type definitions and a graph of streams and operators. Global definitions are declared under the type clause, and operators and their streams are declared under the graph clause. The general structure of the stream application is as follows.
composite Name {
  type
    global_definitions
  graph
    OutputStream1 = Operator1()
    OutputStream2 = Operator2(OutputStream1)
    OutputStream3 = Operator3(OutputStream2)
    . . .
    OutputStreamN = OperatorN(OutputStreamN-1)
}
Given this general structure, the SPL code for this simple stream application is as follows.
composite MyFilterApp {
  type
    TransactionRecord = rstring ticker,
                        rstring date,
                        rstring time,
                        decimal64 price;
  graph
    // Source: no input stream; reads all transactions from a CSV file.
    stream<TransactionRecord> AllTransactions = FileSource() {
      param
        file   : "StockTrades.csv";
        format : csv;
    }
    // Passes on only the tuples whose ticker is IBM.
    stream<TransactionRecord> IBMTransactions = Filter(AllTransactions) {
      param
        filter : ticker == "IBM";
    }
    // Sink: no output stream; tracks the highest and lowest IBM prices.
    () as Sink = Custom(IBMTransactions) {
      logic
        state : {
          mutable decimal64 highest = 0.0d, lowest = 999.99d;
        }
        onTuple IBMTransactions : {
          if (price > highest)
            highest = price;
          if (price < lowest)
            lowest = price;
        }
        // Print the results once the final punctuation marks the end of the stream.
        onPunct IBMTransactions : {
          if (currentPunct() == Sys.FinalMarker) {
            printStringLn("highest = " + (rstring)highest);
            printStringLn("lowest = " + (rstring)lowest);
          }
        }
    }
}
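The assembly-line pattern means that a further stage can be added by feeding one operator's output stream into another operator. The following is a minimal sketch of that idea, not part of the original example: it assumes a hypothetical composite name (MyExtendedFilterApp), a hypothetical stream name (HighPricedIBM), and an arbitrary price threshold of 100.0d. It uses only the operators and functions that already appear in the application above.

```spl
composite MyExtendedFilterApp {          // hypothetical name, for illustration only
  type
    TransactionRecord = rstring ticker,
                        rstring date,
                        rstring time,
                        decimal64 price;
  graph
    // Source stage: no input stream, one output stream.
    stream<TransactionRecord> AllTransactions = FileSource() {
      param
        file   : "StockTrades.csv";
        format : csv;
    }

    // First filter stage, unchanged from the application above.
    stream<TransactionRecord> IBMTransactions = Filter(AllTransactions) {
      param
        filter : ticker == "IBM";
    }

    // Hypothetical extra stage: consumes the upstream filter's output stream.
    // The 100.0d threshold is an assumed, illustrative value.
    stream<TransactionRecord> HighPricedIBM = Filter(IBMTransactions) {
      param
        filter : price > 100.0d;
    }

    // Sink stage: no output stream; prints each tuple that passed both filters.
    () as Sink = Custom(HighPricedIBM) {
      logic
        onTuple HighPricedIBM : {
          printStringLn(ticker + " " + date + " " + time + " " + (rstring)price);
        }
    }
}
```

Each new stage names the upstream stream as its input, so the first two operators are unchanged and only the sink's input is redirected to the new stream.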