Example

The following example shows how to connect the StructureParse operator to an spl.adapter::FileSource operator that reads the input files as binary blocks. The StructureParse operator emits data tuples on its first output port and metrics tuples on its second output port.


namespace test;
use com.teracloud.streams.teda.parser.binary::StructureParse;
composite Main {
  type
    // Schema of the tuples emitted on the first (data) output port of
    // StructureParse. filename and tupleNo have the same names and types as
    // the corresponding attributes of the SourceData stream.
    Data = tuple<
      uint8   RECORD_TYPE,
      rstring ORIG_ADDR,
      rstring DEST_ADDR,
      rstring SMS_CENTRE,
      rstring INCOMING_TIME,
      rstring IMSI_ADDR,
      rstring filename,
      int64   tupleNo
    >;

    // Schema of the tuples emitted on the second (metrics) output port of
    // StructureParse.
    Metric = tuple<
      rstring filename,

      // Counters accumulated over the lifetime of the operator
      uint64 nTuplesReceivedTotal,
      uint64 nTuplesSentTotal,
      uint64 nBytesReceivedTotal,
      uint64 nBytesDroppedTotal,

      // Counters that are reset after each metrics tuple
      uint64 nTuplesReceived,
      uint64 nTuplesSent,
      uint64 nBytesReceived,
      uint64 nBytesDropped
    >;

  graph
    // Emit the name of every file in the "input" directory whose name
    // matches the pattern (i.e. ends in ".dat").
    stream<rstring filename> Files = DirectoryScan() {
      param
        directory: "input";
        pattern: "^.*\\.dat$";
    }

    // Read each file as raw binary blocks of 1024 * 1024 bytes and tag every
    // block with the file name and the running tuple number.
    stream<rstring filename, int64 tupleNo, blob payload> SourceData as Out = FileSource(Files as In) {
      param
        format: block;
        blockSize: 1024u * 1024u;
      output Out:
        filename = FileName(),
        tupleNo = TupleNumber();
    }

    // Parse the binary blocks. The operator has two output ports:
    //   Output  - data tuples
    //   Metrics - metrics tuples
    // NOTE(review): Metric.filename is not assigned explicitly below;
    // presumably it is forwarded from the input attribute of the same name -
    // confirm against the operator documentation.
    (stream<Data> Output; stream<Metric> Metrics) = StructureParse(SourceData as In) {
      param
        structure: "etc/structure.xml";
        mapping: "etc/mapping.xml";
      output Metrics:
        // lifetime counters
        nTuplesReceivedTotal = nTuplesReceivedTotal(),
        nTuplesSentTotal = nTuplesSentTotal(),
        nBytesReceivedTotal = nBytesReceivedTotal(),
        nBytesDroppedTotal = nBytesDroppedTotal(),
        // counters reset after each metrics tuple
        nTuplesReceived = nTuplesReceived(),
        nTuplesSent = nTuplesSent(),
        nBytesReceived = nBytesReceived(),
        nBytesDropped = nBytesDropped();
    }

    // See $STREAMS_INSTALL/samples/com.teracloud.streams.teda for more examples
}