Operator NetflowMessageParser

NetflowMessageParser is an operator for the Streams product that parses individual fields of Netflow messages received in input tuples, and emits tuples containing message data. The operator may be configured with one or more output ports, and each port may be configured to emit different tuples, as specified by output filters. The tuples contain individual fields from the input message, as specified by output attribute assignments.

The NetflowMessageParser operator expects only Netflow messages in its input tuples, without any of the headers that precede them in network packets. The PacketLiveSource and PacketFileSource operators can produce tuples that contain Netflow messages with the PAYLOAD_DATA() output attribute assignment function.

The NetflowMessageParser operator consumes input tuples containing Netflow messages, parses individual fields in flow records within Netflow version 5 and version 9 messages, selects flows to emit as output tuples with filter expressions, and assigns values to them with output attribute assignment expressions.

Output filters and attribute assignments are SPL expressions. They may use any of the built-in SPL functions, as well as the result functions that are specific to the NetflowMessageParser operator.

For Netflow version 5 messages, there are result functions for each field in the message header and flow records. For Netflow version 9 messages, there are result functions for each field in the message header and the standardized fields in flow records. Nonstandardized fields in flow records can be accessed by field number.

The NetflowMessageParser operator emits a tuple on each output port for each flow record within a Netflow version 5 or 9 message, optionally filtered by the 'outputFilters' parameter. Specified fields from each flow record are assigned to output attributes with the Netflow parser result functions. All attributes of all output ports must be assigned values, either with explicit assignment expressions, or implicitly by copy from input tuples.
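
To make "one tuple per flow record" concrete, the following Python sketch unpacks a Netflow version 5 message into its header and flow records. It is an illustration only and not the operator's C++ implementation. It assumes the widely published version 5 layout (a 24-byte header followed by 48-byte flow records, in network byte order), and the function and dictionary key names are hypothetical.

```python
import ipaddress
import struct

# Assumed Netflow v5 layout, network byte order:
# header = version, count, sys_uptime, unix_secs, unix_nsecs,
#          flow_sequence, engine_type, engine_id, sampling_interval
V5_HEADER = struct.Struct("!HHIIIIBBH")            # 24 bytes
# record = srcaddr, dstaddr, nexthop, input, output, dPkts, dOctets,
#          first, last, srcport, dstport, pad1, tcp_flags, prot, tos,
#          src_as, dst_as, src_mask, dst_mask, pad2
V5_RECORD = struct.Struct("!IIIHHIIIIHHBBBBHHBBH")  # 48 bytes


def parse_netflow_v5(message: bytes):
    """Return (header, flows) for a Netflow v5 message without network headers."""
    fields = V5_HEADER.unpack_from(message, 0)
    version, count, uptime_ms, unix_secs = fields[:4]
    if version != 5:
        raise ValueError(f"not a Netflow v5 message (version={version})")
    header = {"version": version, "count": count,
              "system_uptime_ms": uptime_ms, "unix_secs": unix_secs}

    flows = []
    for index in range(count):
        offset = V5_HEADER.size + index * V5_RECORD.size
        rec = V5_RECORD.unpack_from(message, offset)
        flows.append({
            "src_addr": str(ipaddress.IPv4Address(rec[0])),
            "dst_addr": str(ipaddress.IPv4Address(rec[1])),
            "packets": rec[5],
            "bytes": rec[6],
            "first_switched_ms": rec[7],
            "last_switched_ms": rec[8],
            "src_port": rec[9],
            "dst_port": rec[10],
            "tcp_flags": rec[12],
            "protocol": rec[13],
        })
    return header, flows
```

Each dictionary in `flows` corresponds to one flow record, which is the unit the operator filters and emits as an output tuple.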

Example


use com.teracloud.streams.network.parse::*;
use com.teracloud.streams.network.source::*;
composite Main {
    type
      PacketType =
          float64 captureTime,            // time that packet was captured, in seconds since Unix epoch
          uint32 netflowSource,           // IP source address of Netflow message
          blob netflowMessage;            // the Netflow message from a packet, excluding all network headers

      NetflowMessageType =
          rstring captureTime,            // time that packet was captured, in seconds since Unix epoch
          uint64  messageNumber,
          rstring flowStartTime,          // time flow started, according to Netflow source, relative to Unix epoch, in seconds
          rstring flowEndTime,            // time flow ended, according to Netflow source, relative to Unix epoch, in seconds
          rstring flowDuration,           // duration of this flow, in seconds
          uint64  flowBytes,              // number of bytes in this flow
          uint64  flowPackets,            // number of packets in this flow
          rstring ipProtocol,             // IP protocol byte
          rstring ipSourceAddress,        // IP source address
          uint16  ipSourcePort,           // TCP/UDP source port number
          rstring ipDestinationAddress,   // IP destination address
          uint16  ipDestinationPort,      // TCP/UDP destination port number
          uint8   tcpFlags;               // all the TCP flags seen for this flow

    graph
      stream<PacketType> PacketStream as Out = PacketFileSource() {
          param
              pcapFilename: getSubmissionTimeValue("pcapFilename", "data/sample_netflow9_new.pcap" );
              inputFilter: "udp port 2055";
              metricsInterval: 0.0;
          output Out:
              captureTime = (float64)CAPTURE_SECONDS() + (float64)CAPTURE_MICROSECONDS() / 1000000.0,
              netflowSource = IPV4_SRC_ADDRESS(),
              netflowMessage = PAYLOAD_DATA();
      }

      stream<NetflowMessageType> NetflowMessageStream as Out = NetflowMessageParser(PacketStream) {
        logic state: { map<uint8,rstring> ipProtocols = { 1: "icmp", 6: "tcp", 17: "udp" }; }
        param
            messageAttribute: netflowMessage;
            sourceAttribute: netflowSource;
            outputFilters: !parseError();
        output Out:
            captureTime = formatEpochTime(captureTime),
            messageNumber = messagesProcessed(),
            flowStartTime = formatEpochTime( captureTime - ( (float64)NETFLOW_SYSTEM_UPTIME() - (float64)NETFLOW_FIRST_SWITCHED() ) / 1000.0, 3u ),
            flowEndTime = formatEpochTime( captureTime - ( (float64)NETFLOW_SYSTEM_UPTIME() - (float64)NETFLOW_LAST_SWITCHED() ) / 1000.0, 3u ),
            flowDuration = formatElapsedTime( ( (float64)NETFLOW_LAST_SWITCHED() - (float64)NETFLOW_FIRST_SWITCHED() ) / 1000.0, 3u ),
            flowBytes = NETFLOW_IN_BYTES(),
            flowPackets = NETFLOW_IN_PKTS(),
            ipProtocol = NETFLOW_PROTOCOL() in ipProtocols ? ipProtocols[NETFLOW_PROTOCOL()] : (rstring)NETFLOW_PROTOCOL(),
            ipSourceAddress = NETFLOW_IPV4_SRC_ADDR()>0u ? convertIPV4AddressNumericToString(NETFLOW_IPV4_SRC_ADDR()) : size(NETFLOW_IPV6_SRC_ADDR())>0 ? convertIPV6AddressNumericToString(NETFLOW_IPV6_SRC_ADDR())  : "",
            ipSourcePort = NETFLOW_SRC_PORT(),
            ipDestinationAddress = NETFLOW_IPV4_DST_ADDR()>0u ? convertIPV4AddressNumericToString(NETFLOW_IPV4_DST_ADDR()) : size(NETFLOW_IPV6_DST_ADDR())>0 ? convertIPV6AddressNumericToString(NETFLOW_IPV6_DST_ADDR())  : "",
            ipDestinationPort = NETFLOW_DST_PORT(),
            tcpFlags = NETFLOW_TCP_FLAGS();

      }

      // See the $STREAMS_INSTALL/samples/com.teracloud.streams.network directory for more examples
}
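
The flowStartTime, flowEndTime, and flowDuration assignments in the example convert the exporter's millisecond uptime counters into absolute times. The Python sketch below restates that arithmetic outside SPL. The function name is hypothetical, and, like the example, it assumes the exporter's system uptime in the message header was sampled at approximately the packet capture time.

```python
def flow_times(capture_time, system_uptime_ms, first_switched_ms, last_switched_ms):
    """Mirror the example's arithmetic.

    capture_time is in seconds since the Unix epoch; the other three
    arguments are milliseconds since the exporting device booted.
    Returns (start, end, duration), all in seconds.
    """
    # How long before the capture the first/last packet of the flow was seen.
    start = capture_time - (system_uptime_ms - first_switched_ms) / 1000.0
    end = capture_time - (system_uptime_ms - last_switched_ms) / 1000.0
    duration = (last_switched_ms - first_switched_ms) / 1000.0
    return start, end, duration


# A flow first seen 10 s and last seen 5 s before the message was captured.
print(flow_times(1700000000.0, 100000, 90000, 95000))
# prints (1699999990.0, 1699999995.0, 5.0)
```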

References

Netflow version 5 messages and the fields they contain are described here:

Netflow version 9 messages and the fields they contain are described here:

Summary

Ports
This operator has 1 input port and 0 or more output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 4 parameters.

Required: messageAttribute, sourceAttribute

Optional: outputFilters, processorAffinity

Metrics
This operator does not report any metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Input Ports

Ports (0)

The NetflowMessageParser operator requires one input port. One input attribute must be of type blob and must contain a Netflow message, excluding the network headers that precede it in network packets, as specified by the required parameter messageAttribute.

The PAYLOAD_DATA() and IPV4_SRC_ADDRESS() output assignment functions of the PacketLiveSource and PacketFileSource operators produce attributes that can be consumed by the NetflowMessageParser operator.

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Ports (0...)

The NetflowMessageParser operator requires one or more output ports.

Each output port produces one output tuple for each flow record in an input message if the corresponding expression in the outputFilters parameter evaluates to true, or if no outputFilters parameter is specified.

Output attributes can be assigned values with any SPL expression that evaluates to the proper type, and the expressions may include any of the Netflow result functions. Output attributes that match input attributes in name and type are copied automatically.

Parameters

Required: messageAttribute, sourceAttribute

Optional: outputFilters, processorAffinity

messageAttribute

This required parameter specifies an input attribute of type blob that contains a Netflow message to be parsed by the operator.

outputFilters

This optional parameter takes a list of SPL expressions that specify which Netflow messages should be emitted by the corresponding output port. The number of expressions in the list must match the number of output ports, and each expression must evaluate to a boolean value. The output filter expressions may include any of the Netflow result functions.

The default value of the outputFilters parameter is an empty list, which causes all Netflow messages processed to be emitted by all output ports.
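
The pairing of filter expressions with output ports can be pictured with a small Python model. This is a sketch of the semantics described above, not the operator's implementation. The names `dispatch`, `flows`, and `port_count` are hypothetical, and each dictionary stands in for one parsed record.

```python
from typing import Callable, Dict, List, Sequence

Flow = Dict[str, int]
Filter = Callable[[Flow], bool]


def dispatch(flows: Sequence[Flow], output_filters: Sequence[Filter],
             port_count: int) -> List[List[Flow]]:
    """Return, for each output port, the records that port would emit."""
    # A non-empty filter list must supply exactly one expression per port.
    if output_filters and len(output_filters) != port_count:
        raise ValueError("outputFilters must have one expression per output port")
    ports: List[List[Flow]] = [[] for _ in range(port_count)]
    for flow in flows:
        for index in range(port_count):
            # An empty filter list lets every record through on every port.
            if not output_filters or output_filters[index](flow):
                ports[index].append(flow)
    return ports


records = [{"protocol": 6}, {"protocol": 17}, {"protocol": 1}]
tcp_only = lambda flow: flow["protocol"] == 6
udp_only = lambda flow: flow["protocol"] == 17
tcp_port, udp_port = dispatch(records, [tcp_only, udp_only], port_count=2)
```

Here `tcp_port` receives only the protocol 6 record and `udp_port` only the protocol 17 record, while the ICMP record matches neither filter and is not emitted on either port.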

processorAffinity

This optional parameter takes one expression of type uint32 that specifies which processor core the operator's thread will run on. The maximum value is P-1, where P is the number of processors on the machine where the operator will run.

Where the operator runs on a thread of its own, this parameter applies to the operator's thread. This is the situation when the operator's input port is configured as a threaded input port, and when the operator has an @parallel annotation.

Where the operator runs on the thread of an upstream operator, this parameter affects the thread of the operator that sends tuples to it. This is the situation when the operator is fused with an upstream operator.

The default is to dispatch the operator's thread on any available processor.

sourceAttribute

This required parameter specifies an input attribute of type uint32 that contains the IP source address of the Netflow message.

Code Templates

NetflowMessageParser

stream<${outputSchema}> ${outputStream} = com.teracloud.streams.network.parse::NetflowMessageParser(${inputPacketStream}) {
  param
      messageAttribute: ${inputPacketStream-blob-attribute};
      sourceAttribute: ${inputPacketStream-uint32-attribute};
      ${parameter}: ${parameterExpression}
  output
      ${outputStream}: ${attributeAssignments}
}
      

Libraries

common code for toolkit operators
Include Path: ../../impl/include