Sample
The following sample shows how to integrate the StructureParse operator with an spl.adapter::FileSource operator. The StructureParse operator has two output ports: it emits the parsed data tuples on the first port and metrics tuples on the second.
namespace test;
use com.teracloud.streams.teda.parser.binary::StructureParse;
// Sample application: scans a directory for *.dat files, reads each file as
// binary blocks, feeds the blocks to StructureParse, and writes the resulting
// data tuples and metrics tuples to two separate text files.
composite Main
{
    type
        // Schema of the tuples emitted on the first StructureParse output port.
        // NOTE(review): filename and tupleNo are not assigned explicitly in the
        // StructureParse invocation below; presumably they are taken over from
        // the same-named attributes of the input stream -- confirm.
        Data = tuple<
            uint8   RECORD_TYPE,
            rstring ORIG_ADDR,
            rstring DEST_ADDR,
            rstring SMS_CENTRE,
            rstring INCOMING_TIME,
            rstring IMSI_ADDR,
            rstring filename,
            int64   tupleNo
        >;

        // Schema of the tuples emitted on the second StructureParse output port.
        // NOTE(review): filename is not assigned in the output clause either;
        // presumably forwarded from the input stream -- confirm.
        Metric = tuple<
            rstring filename,

            // Counters accumulated over the operator's lifetime.
            uint64 nTuplesReceivedTotal,
            uint64 nTuplesSentTotal,
            uint64 nBytesReceivedTotal,
            uint64 nBytesDroppedTotal,

            // Counters that are reset after each metrics tuple.
            uint64 nTuplesReceived,
            uint64 nTuplesSent,
            uint64 nBytesReceived,
            uint64 nBytesDropped
        >;

    graph
        // Emit the name of every file in "input" whose name ends in ".dat".
        stream<rstring filename> Files = DirectoryScan()
        {
            param
                directory: "input";
                pattern:   "^.*\\.dat$";
        }

        // Read each announced file in binary block mode (1024 * 1024 bytes per
        // block) and tag every block with its file name and tuple number.
        stream<rstring filename, int64 tupleNo, blob payload> SourceData = FileSource(Files)
        {
            param
                format:    block;
                blockSize: 1024u * 1024u;
            output
                SourceData:
                    filename = FileName(),
                    tupleNo  = TupleNumber();
        }

        // Parse the binary blocks. First output port: data tuples.
        // Second output port: metrics tuples.
        (
            stream<Data>   Output;
            stream<Metric> Metrics
        ) = StructureParse(SourceData)
        {
            param
                structure: "etc/structure.xml";
                mapping:   "etc/mapping.xml";
            output
                Metrics:
                    nTuplesReceivedTotal = nTuplesReceivedTotal(),
                    nTuplesSentTotal     = nTuplesSentTotal(),
                    nBytesReceivedTotal  = nBytesReceivedTotal(),
                    nBytesDroppedTotal   = nBytesDroppedTotal(),
                    nTuplesReceived      = nTuplesReceived(),
                    nTuplesSent          = nTuplesSent(),
                    nBytesReceived       = nBytesReceived(),
                    nBytesDropped        = nBytesDropped();
        }

        // Write the data tuples as text, flushing after every tuple.
        () as OutputSink = FileSink(Output)
        {
            param
                file:   "output.txt";
                format: txt;
                flush:  1u;
        }

        // Write the metrics tuples as text, flushing after every tuple.
        () as MetricsSink = FileSink(Metrics)
        {
            param
                file:   "metrics.txt";
                format: txt;
                flush:  1u;
        }
}