Combining the operators into a graph
All the operators and their streams for this application can be represented by the following graph.

No input stream exists for the FileSource operator, and no output stream exists for the FileSink operator.
The output stream of the FileSource operator (of type BigDataType) is the input stream of the Functor operator, and the output stream of the Functor operator (of type SmallDataType) is the input stream of the FileSink operator.
To complete the stream application, you need to enclose the graph in a composite operator. For more information about composite operators, see the example in Combining the operators into a graph.
The SPL code for this simple Functor application is shown here.
composite MyFunctorApp {
  type
    BigDataType = rstring ticker, rstring date, rstring time, int32 gmtOffset,
                  rstring ttype, rstring exCntrbID, decimal64 price,
                  decimal64 volume, decimal64 vwap, rstring buyerID,
                  decimal64 bidprice, decimal64 bidsize, int32 numbuyers,
                  rstring sellerID, decimal64 askprice, decimal64 asksize,
                  int32 numsellers, rstring qualifiers, int32 seqno,
                  rstring exchtime, decimal64 blockTrd, decimal64 floorTrd,
                  decimal64 PEratio, decimal64 yield, decimal64 newprice,
                  decimal64 newvol, int32 newseqno, decimal64 bidimpvol,
                  decimal64 askimpcol, decimal64 impvol;
    SmallDataType = rstring ticker, rstring date, rstring time, decimal64 price;
  graph
    stream<BigDataType> BigDataStream = FileSource() {
      param
        file        : "TradesAndQuotes.csv.gz";
        format      : csv;
        compression : gzip;
    }
    stream<SmallDataType> SmallDataStream = Functor(BigDataStream) {
      output
        SmallDataStream: ticker = BigDataStream.ticker,
                         date   = BigDataStream.date,
                         time   = BigDataStream.time,
                         price  = BigDataStream.price;
    }
    () as Sink = FileSink(SmallDataStream) {
      param
        file   : "SmallTrades.csv";
        format : csv;
    }
}
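The explicit output clause in the Functor invocation above spells out each assignment. As a minimal sketch, and assuming the usual SPL behavior in which an unassigned output attribute is forwarded from the input attribute with the same name and type, the same projection can be written without an output clause. The composite name, type names, stream names, and file names in this sketch are hypothetical, and the input schema is shortened for illustration; they are not part of the original example.

```
// Sketch only: MyForwardingApp, TradeType, PricedType, Trades.csv, and
// ProjectedTrades.csv are hypothetical names chosen for illustration.
composite MyForwardingApp {
  type
    TradeType  = rstring ticker, rstring date, rstring time,
                 decimal64 price, decimal64 volume;
    PricedType = rstring ticker, rstring date, rstring time, decimal64 price;
  graph
    stream<TradeType> TradeStream = FileSource() {
      param
        file   : "Trades.csv";
        format : csv;
    }
    // No output clause: ticker, date, time, and price are assumed to be
    // forwarded from the matching input attributes; volume is dropped
    // because PricedType does not declare it.
    stream<PricedType> PricedStream = Functor(TradeStream) {
    }
    () as Sink = FileSink(PricedStream) {
      param
        file   : "ProjectedTrades.csv";
        format : csv;
    }
}
```

The explicit form used in MyFunctorApp makes the mapping between the two tuple types visible at a glance, which can be the better choice when the input and output schemas differ as much as BigDataType and SmallDataType do.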