Operator IPFIXMessageParser

Primitive operator image not displayed. Problem loading file: ../../image/tk$com.teracloud.streams.network/op$com.teracloud.streams.network.parse$IPFIXMessageParser.svg

IPFIXMessageParser is an operator for the Streams product that parses individual fields of IPFIX messages received in input tuples, and emits tuples containing message data. The operator may be configured with one or more output ports, and each port may be configured to emit different tuples, as specified by output filters. The tuples contain individual fields from the input message, as specified by output attribute assignments.

The IPFIXMessageParser operator expects only IPFIX messages in its input tuples, without any of the headers that precede them in network packets. The PacketLiveSource and PacketFileSource operators can produce tuples that contain IPFIX messages with the PAYLOAD_DATA() and the source IP address with IPV4_SRC_ADDRESS() output attribute assignment functions.

The IPFIXMessageParser operator consumes input tuples containing IPFIX messages, parses indivdual fields in flow records within IPFIX messages, selects flows to emit as output tuples with filter expressions, and assigns values to them with output attribute assignment expressions.

Output filters and attribute assignments are SPL expressions. They may use any of the built-in SPL functions, and any of these functions, which are specific to the IPFIXMessageParser operator:

There are result functions for each of the standardized fields. Nonstandardized fields in flow records can be accessed by field number.

The IPFIXMessageParser operator emits a tuple on each output port for each flow record within a IPFIX message, optionally filtered by the 'outputFilters' parameter. Specified fields from each flow record are assigned to output attributes with the IPFIX parser result functions. All attributes of all output ports must be assigned values, either with explicit assignment expressions, or implicitly by copy from input tuples.

Example


use com.teracloud.streams.network.parse::*;
use com.teracloud.streams.network.source::*;
composite Main {
    type
      PacketType =
          float64 captureTime,          // time that packet was captured, in seconds since Unix epoch
          uint32 ipfixSource,           // IP source address of IPFIX message
          blob ipfixMessage;            // the IPFIX message from a packet, excluding all network headers

      IPFIXMessageType =
          rstring captureTime,                   // time that packet was captured, in seconds since Unix epoch
          uint64  messageNumber,                 // sequence number of message
          rstring ipProtocol,                    // standard field 4, IP protocol byte
          uint8   ipTOS,                         // standard field 5, IP class of service
          rstring ipSourceAddress,               // standard field 8 or 27, IP source address
          uint16  ipSourcePort,                  // standard field 7, IP source port
          rstring ipDestinationAddress,          // standard field 12 or 28, IP destination address
          uint16  ipDestinationPort,             // standard field 11, IP destination port
          uint64  flowIdentifier,                // standard field 148, flow identifier
          uint64  flowByteCount,                 // standard field 1, byte count
          uint64  flowPacketCount,               // standard field 2, packet count
          rstring flowStartTime,                 // standard field 152, flow start time, in milliseconds since Unix epoch
          rstring flowEndTime;                   // standard field 153, flow end time, in milliseconds since Unix epoch

      DecapperMessageType =
          rstring captureTime,                   // time that packet was captured, in seconds since Unix epoch
          uint64  messageNumber,                 // sequence number of message
          rstring ipProtocol,                    // standard field 4, IP protocol byte
          rstring ipSourceAddress,               // standard field 8 or 27, IP source address
          uint16  ipSourcePort,                  // standard field 7, IP source port
          rstring ipDestinationAddress,          // standard field 12 or 28, IP destination address
          uint16  ipDestinationPort,             // standard field 11, IP destination port
          uint64  flowIdentifier,                // standard field 148, flow identifier
          rstring flowStartTime,                 // standard field 152, flow start time, in milliseconds since Unix epoch
          rstring usernamePrimary,               // Decapper enterprise field 19
          list<rstring> usernameSecondary,       // Decapper enterprise field 20
          rstring httpURL,                       // Decapper enterprise field 21
          rstring httpContentType,               // Decapper enterprise field 31
          rstring httpHostname,                  // Decapper enterprise field 33
          uint16  httpResponseCode,              // Decapper enterprise field 35 
          rstring httpServer,                    // Decapper enterprise field 36
          rstring httpUserAgent,                 // Decapper enterprise field 37
          rstring httpVersion,                   // Decapper enterprise field 38
          list<rstring> webCategory,             // Decapper enterprise field 53
          rstring fileName,                      // Decapper enterprise field 43
          rstring fileHash,                      // Decapper enterprise field 55
          uint64  fileSize,                      // Decapper enterprise field 56 
          uint8   aggregationControl;            // Decapper enterprise field 57 

      HTTPMessageType =
          rstring captureTime,                   // time that packet was captured, in seconds since Unix epoch
          uint64  messageNumber,                 // sequence number of message
          rstring ipProtocol,                    // standard field 4, IP protocol byte
          rstring ipSourceAddress,               // standard field 8 or 27, IP source address
          uint16  ipSourcePort,                  // standard field 7, IP source port
          rstring ipDestinationAddress,          // standard field 12 or 28, IP destination address
          uint16  ipDestinationPort,             // standard field 11, IP destination port
          uint64  flowIdentifier,                // standard field 148, flow identifier
          rstring flowStartTime,                 // standard field 152, flow start time, in milliseconds since Unix epoch
          rstring httpURL,                       // Decapper enterprise field 21
          rstring httpContentType,               // Decapper enterprise field 31
          rstring httpHostname,                  // Decapper enterprise field 33
          uint16  httpResponseCode,              // Decapper enterprise field 35 
          rstring httpServer,                    // Decapper enterprise field 36
          rstring httpUserAgent,                 // Decapper enterprise field 37
          rstring httpVersion,                   // Decapper enterprise field 38
          list<rstring> webCategory,             // Decapper enterprise field 53
          rstring fileName,                      // Decapper enterprise field 43
          rstring fileHash,                      // Decapper enterprise field 55
          uint64  fileSize;                      // Decapper enterprise field 56 

      UsernameMessageType =
          rstring captureTime,                   // time that packet was captured, in seconds since Unix epoch
          uint64  messageNumber,                 // sequence number of message
          rstring ipProtocol,                    // standard field 4, IP protocol byte
          rstring ipSourceAddress,               // standard field 8 or 27, IP source address
          uint16  ipSourcePort,                  // standard field 7, IP source port
          rstring ipDestinationAddress,          // standard field 12 or 28, IP destination address
          uint16  ipDestinationPort,             // standard field 11, IP destination port
          uint64  flowIdentifier,                // standard field 148, flow identifier
          rstring flowStartTime,                 // standard field 152, flow start time, in milliseconds since Unix epoch
          rstring usernamePrimary,               // Decapper enterprise field 19
          list<rstring> usernameSecondary,       // Decapper enterprise field 20
          rstring fileName,                      // Decapper enterprise field 43
          rstring fileHash,                      // Decapper enterprise field 55
          uint64  fileSize;                      // Decapper enterprise field 56 

    graph
      stream<PacketType> PacketStream as Out = PacketFileSource() {
          param
              pcapFilename: getSubmissionTimeValue("pcapFilename", "data/sample_ipfix_decapper.pcap" );
              inputFilter: "udp port 2055";
              metricsInterval: 0.0;
          output Out:
              captureTime = (float64)CAPTURE_SECONDS() + (float64)CAPTURE_MICROSECONDS() / 1000000.0,
              ipfixSource = IPV4_SRC_ADDRESS(),
              ipfixMessage = PAYLOAD_DATA();
      }

      ( stream<IPFIXMessageType> AllMessageStream as OutAll ;
        stream<DecapperMessageType> DecapperMessageStream as OutDecapper ;
        stream<HTTPMessageType> HTTPMessageStream as OutHTTP ;
        stream<UsernameMessageType> UsernameMessageStream as OutUsername ) = IPFIXMessageParser(PacketStream) {
        logic
            state: { map<uint8,rstring> ipProtocols = { 1: "icmp", 6: "tcp", 17: "udp" }; }
        param
            messageAttribute: ipfixMessage;
            sourceAttribute: ipfixSource;
            outputFilters: !parseError(), 
                          !parseError() && IPFIX_enterpriseIdentifier(57)==2u,
                          !parseError() && IPFIX_enterpriseIdentifier(57)==2u && IPFIX_enterpriseFieldAsString(21)!="",
                          !parseError() && IPFIX_enterpriseIdentifier(57)==2u && IPFIX_enterpriseFieldAsString(19)!="";
        output 
        OutAll:
            captureTime = formatEpochDateTime(captureTime),
            messageNumber = messagesProcessed(),
            ipProtocol = IPFIX_protocolIdentifier() in ipProtocols ? ipProtocols[IPFIX_protocolIdentifier()] : (rstring)IPFIX_protocolIdentifier(),
            ipTOS = IPFIX_ipClassOfService(),
            ipSourceAddress = IPFIX_sourceIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_sourceIPv4Address()) : size(IPFIX_sourceIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_sourceIPv6Address())  : "",
            ipSourcePort = IPFIX_sourceTransportPort(),
            ipDestinationAddress = IPFIX_destinationIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_destinationIPv4Address()) : size(IPFIX_destinationIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_destinationIPv6Address())  : "",
            ipDestinationPort = IPFIX_destinationTransportPort(),
            flowIdentifier = IPFIX_flowId(),
            flowByteCount = IPFIX_octetDeltaCount(),
            flowPacketCount = IPFIX_packetDeltaCount(),
            flowStartTime= formatEpochDateTime( (float64)IPFIX_flowStartMilliseconds() / 1000.0 , 3u ),
            flowEndTime = formatEpochDateTime( (float64)IPFIX_flowEndMilliseconds() / 1000.0 , 3u );
        OutDecapper:
            captureTime = formatEpochDateTime(captureTime),
            messageNumber = messagesProcessed(),
            ipProtocol = IPFIX_protocolIdentifier() in ipProtocols ? ipProtocols[IPFIX_protocolIdentifier()] : (rstring)IPFIX_protocolIdentifier(),
            ipSourceAddress = IPFIX_sourceIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_sourceIPv4Address()) : size(IPFIX_sourceIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_sourceIPv6Address())  : "",
            ipSourcePort = IPFIX_sourceTransportPort(),
            ipDestinationAddress = IPFIX_destinationIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_destinationIPv4Address()) : size(IPFIX_destinationIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_destinationIPv6Address())  : "",
            ipDestinationPort = IPFIX_destinationTransportPort(),
            flowIdentifier = IPFIX_flowId(),
            flowStartTime= formatEpochDateTime( (float64)IPFIX_flowStartMilliseconds() / 1000.0 , 3u ),
            usernamePrimary = IPFIX_enterpriseFieldAsString(19),             
            usernameSecondary = IPFIX_enterpriseBasicListFieldElementIdentifier(20)==60uh ? IPFIX_enterpriseBasicListFieldAsStrings(20) : (list<rstring>)[],                        
            httpURL = IPFIX_enterpriseFieldAsString(21),                     
            httpContentType = IPFIX_enterpriseFieldAsString(31),             
            httpHostname = IPFIX_enterpriseFieldAsString(33),                
            httpResponseCode = (uint16)IPFIX_enterpriseFieldAsInteger(35),   
            httpServer = IPFIX_enterpriseFieldAsString(36),                  
            httpUserAgent = IPFIX_enterpriseFieldAsString(37),               
            httpVersion = IPFIX_enterpriseFieldAsString(38),                 
            webCategory = IPFIX_enterpriseBasicListFieldElementIdentifier(53)==62uh ? IPFIX_enterpriseBasicListFieldAsStrings(53) : (list<rstring>)[],      
            fileName = IPFIX_enterpriseFieldAsString(43),         
            fileHash = (rstring)IPFIX_enterpriseFieldAsByteList(55),         
            fileSize = IPFIX_enterpriseFieldAsInteger(56),                   
            aggregationControl = (uint8)IPFIX_enterpriseFieldAsInteger(57);  
        OutHTTP:
            captureTime = formatEpochDateTime(captureTime),
            messageNumber = messagesProcessed(),
            ipProtocol = IPFIX_protocolIdentifier() in ipProtocols ? ipProtocols[IPFIX_protocolIdentifier()] : (rstring)IPFIX_protocolIdentifier(),
            ipSourceAddress = IPFIX_sourceIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_sourceIPv4Address()) : size(IPFIX_sourceIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_sourceIPv6Address())  : "",
            ipSourcePort = IPFIX_sourceTransportPort(),
            ipDestinationAddress = IPFIX_destinationIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_destinationIPv4Address()) : size(IPFIX_destinationIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_destinationIPv6Address())  : "",
            ipDestinationPort = IPFIX_destinationTransportPort(),
            flowIdentifier = IPFIX_flowId(),
            flowStartTime= formatEpochDateTime( (float64)IPFIX_flowStartMilliseconds() / 1000.0 , 3u ),
            httpURL = IPFIX_enterpriseFieldAsString(21),                     
            httpContentType = IPFIX_enterpriseFieldAsString(31),             
            httpHostname = IPFIX_enterpriseFieldAsString(33),                
            httpResponseCode = (uint16)IPFIX_enterpriseFieldAsInteger(35),   
            httpServer = IPFIX_enterpriseFieldAsString(36),                  
            httpUserAgent = IPFIX_enterpriseFieldAsString(37),               
            httpVersion = IPFIX_enterpriseFieldAsString(38),                 
            webCategory = IPFIX_enterpriseBasicListFieldElementIdentifier(53)==60uh ? IPFIX_enterpriseBasicListFieldAsStrings(53) : (list<rstring>)[],           
            fileName = IPFIX_enterpriseFieldAsString(43),         
            fileHash = (rstring)IPFIX_enterpriseFieldAsByteList(55),         
            fileSize = IPFIX_enterpriseFieldAsInteger(56);                   
        OutUsername:
            captureTime = formatEpochDateTime(captureTime),
            messageNumber = messagesProcessed(),
            ipProtocol = IPFIX_protocolIdentifier() in ipProtocols ? ipProtocols[IPFIX_protocolIdentifier()] : (rstring)IPFIX_protocolIdentifier(),
            ipSourceAddress = IPFIX_sourceIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_sourceIPv4Address()) : size(IPFIX_sourceIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_sourceIPv6Address())  : "",
            ipSourcePort = IPFIX_sourceTransportPort(),
            ipDestinationAddress = IPFIX_destinationIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_destinationIPv4Address()) : size(IPFIX_destinationIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_destinationIPv6Address())  : "",
            ipDestinationPort = IPFIX_destinationTransportPort(),
            flowIdentifier = IPFIX_flowId(),
            flowStartTime= formatEpochDateTime( (float64)IPFIX_flowStartMilliseconds() / 1000.0 , 3u ),
            usernamePrimary = IPFIX_enterpriseFieldAsString(19),             
            usernameSecondary = IPFIX_enterpriseBasicListFieldElementIdentifier(20) == 62uh ? IPFIX_enterpriseBasicListFieldAsStrings(20) : (list<rstring>)[],               
            fileName = IPFIX_enterpriseFieldAsString(43),         
            fileHash = (rstring)IPFIX_enterpriseFieldAsByteList(55),         
            fileSize = IPFIX_enterpriseFieldAsInteger(56);                   
      }

      // See the $STREAMS_INSTALL/samples/com.teracloud.streams.network directory for more examples
}

References

IPFIX messages and the fields they contain are described here:

Summary

Ports
This operator has 1 input port and 0 or more output ports.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 4 parameters.

Required: messageAttribute, sourceAttribute

Optional: outputFilters, processorAffinity

Metrics
This operator does not report any metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Input Ports

Ports (0)

The IPFIXMessageParser operator requires one input port. One input attribute must be of type blob and must contain a IPFIX message, excluding the network headers that proceed them in network packets, as specified by the required parameter messageAttribute.

The PAYLOAD_DATA() and IPV4_SRC_ADDRESS() output assignment functions of the PacketLiveSource and PacketFileSource operators produces attributes that can be consumed by the IPFIXMessageParser operator.

Properties

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Ports (0...)

The IPFIXMessageParser operator requires one or more output ports.

Each output port will produce one output tuple for each input tuple if the corresponding expression in the outputFilters parameter evaluates true, or if no outputFilters parameter is specified.

Output attributes can be assigned values with any SPL expression that evaluates to the proper type, and the expressions may include any of the IPFIX result functions. Output attributes that match input attributes in name and type are copied automatically.

Properties

Parameters

Required: messageAttribute, sourceAttribute

Optional: outputFilters, processorAffinity

messageAttribute

This required parameter specifies an input attribute of type blob that contains a IPFIX message to be parsed by the operator.

Properties

outputFilters

This optional parameter takes a list of SPL expressions that specify which IPFIX messages should be emitted by the corresponding output port. The number of expressions in the list must match the number of output ports, and each expression must evaluate to a boolean value. The output filter expressions may include any of the IPFIX result functions.

The default value of the outputFilters parameter is an empty list, which causes all IPFIX messages processed to be emitted by all output ports.

Properties

processorAffinity

This optional parameter takes one expression of type uint32 that specifies which processor core the operator's thread will run on. The maximum value is P-1, where P is the number of processors on the machine where the operator will run.

Where the operator runs on a thread of its own, this parameter applies to the operator's thread. This is the situation when the operator's input port is configured as a threaded input port, and when the operator has an @parallel annotation.

Where the operator runs on the thread of an upstream operator, this parameter affects the thread of the operator that sends tuples to it. This is the situation when the operator is fused with an upstream operator.

The default is to dispatch the operator's thread on any available processor.

Properties

sourceAttribute

Specifies an input attribute of type uint32 that contains the IP source address of the IPFIX message.

Properties

Code Templates

IPFIXMessageParser

stream<${outputSchema}> ${outputStream} = com.teracloud.streams.network.parse::IPFIXMessageParser(${inputPacketStream}) {
  param
      messageAttribute: ${inputPacketStream-blob-attribute};
      sourceAttribute: ${inputPacketStream-uint32-attribute};
      ${parameter}: ${parameterExpression}
  output
      ${outputStream}: ${attributeAssignments}
}
      

Libraries

common code for toolkit operators
Include Path: ../../impl/include