Operator IPFIXMessageParser
IPFIXMessageParser is an operator for the Streams product that parses individual fields of IPFIX messages received in input tuples, and emits tuples containing message data. The operator may be configured with one or more output ports, and each port may be configured to emit different tuples, as specified by output filters. The tuples contain individual fields from the input message, as specified by output attribute assignments.
The IPFIXMessageParser operator expects only IPFIX messages in its input tuples, without any of the headers that precede them in network packets. The PacketLiveSource and PacketFileSource operators can produce tuples that contain IPFIX messages with the PAYLOAD_DATA() and the source IP address with IPV4_SRC_ADDRESS() output attribute assignment functions.
The IPFIXMessageParser operator consumes input tuples containing IPFIX messages, parses indivdual fields in flow records within IPFIX messages, selects flows to emit as output tuples with filter expressions, and assigns values to them with output attribute assignment expressions.
Output filters and attribute assignments are SPL expressions. They may use any of the built-in SPL functions, and any of these functions, which are specific to the IPFIXMessageParser operator:
There are result functions for each of the standardized fields. Nonstandardized fields in flow records can be accessed by field number.
The IPFIXMessageParser operator emits a tuple on each output port for each flow record within a IPFIX message, optionally filtered by the 'outputFilters' parameter. Specified fields from each flow record are assigned to output attributes with the IPFIX parser result functions. All attributes of all output ports must be assigned values, either with explicit assignment expressions, or implicitly by copy from input tuples.
Example
use com.teracloud.streams.network.parse::*;
use com.teracloud.streams.network.source::*;
composite Main {
type
PacketType =
float64 captureTime, // time that packet was captured, in seconds since Unix epoch
uint32 ipfixSource, // IP source address of IPFIX message
blob ipfixMessage; // the IPFIX message from a packet, excluding all network headers
IPFIXMessageType =
rstring captureTime, // time that packet was captured, in seconds since Unix epoch
uint64 messageNumber, // sequence number of message
rstring ipProtocol, // standard field 4, IP protocol byte
uint8 ipTOS, // standard field 5, IP class of service
rstring ipSourceAddress, // standard field 8 or 27, IP source address
uint16 ipSourcePort, // standard field 7, IP source port
rstring ipDestinationAddress, // standard field 12 or 28, IP destination address
uint16 ipDestinationPort, // standard field 11, IP destination port
uint64 flowIdentifier, // standard field 148, flow identifier
uint64 flowByteCount, // standard field 1, byte count
uint64 flowPacketCount, // standard field 2, packet count
rstring flowStartTime, // standard field 152, flow start time, in milliseconds since Unix epoch
rstring flowEndTime; // standard field 153, flow end time, in milliseconds since Unix epoch
DecapperMessageType =
rstring captureTime, // time that packet was captured, in seconds since Unix epoch
uint64 messageNumber, // sequence number of message
rstring ipProtocol, // standard field 4, IP protocol byte
rstring ipSourceAddress, // standard field 8 or 27, IP source address
uint16 ipSourcePort, // standard field 7, IP source port
rstring ipDestinationAddress, // standard field 12 or 28, IP destination address
uint16 ipDestinationPort, // standard field 11, IP destination port
uint64 flowIdentifier, // standard field 148, flow identifier
rstring flowStartTime, // standard field 152, flow start time, in milliseconds since Unix epoch
rstring usernamePrimary, // Decapper enterprise field 19
list<rstring> usernameSecondary, // Decapper enterprise field 20
rstring httpURL, // Decapper enterprise field 21
rstring httpContentType, // Decapper enterprise field 31
rstring httpHostname, // Decapper enterprise field 33
uint16 httpResponseCode, // Decapper enterprise field 35
rstring httpServer, // Decapper enterprise field 36
rstring httpUserAgent, // Decapper enterprise field 37
rstring httpVersion, // Decapper enterprise field 38
list<rstring> webCategory, // Decapper enterprise field 53
rstring fileName, // Decapper enterprise field 43
rstring fileHash, // Decapper enterprise field 55
uint64 fileSize, // Decapper enterprise field 56
uint8 aggregationControl; // Decapper enterprise field 57
HTTPMessageType =
rstring captureTime, // time that packet was captured, in seconds since Unix epoch
uint64 messageNumber, // sequence number of message
rstring ipProtocol, // standard field 4, IP protocol byte
rstring ipSourceAddress, // standard field 8 or 27, IP source address
uint16 ipSourcePort, // standard field 7, IP source port
rstring ipDestinationAddress, // standard field 12 or 28, IP destination address
uint16 ipDestinationPort, // standard field 11, IP destination port
uint64 flowIdentifier, // standard field 148, flow identifier
rstring flowStartTime, // standard field 152, flow start time, in milliseconds since Unix epoch
rstring httpURL, // Decapper enterprise field 21
rstring httpContentType, // Decapper enterprise field 31
rstring httpHostname, // Decapper enterprise field 33
uint16 httpResponseCode, // Decapper enterprise field 35
rstring httpServer, // Decapper enterprise field 36
rstring httpUserAgent, // Decapper enterprise field 37
rstring httpVersion, // Decapper enterprise field 38
list<rstring> webCategory, // Decapper enterprise field 53
rstring fileName, // Decapper enterprise field 43
rstring fileHash, // Decapper enterprise field 55
uint64 fileSize; // Decapper enterprise field 56
UsernameMessageType =
rstring captureTime, // time that packet was captured, in seconds since Unix epoch
uint64 messageNumber, // sequence number of message
rstring ipProtocol, // standard field 4, IP protocol byte
rstring ipSourceAddress, // standard field 8 or 27, IP source address
uint16 ipSourcePort, // standard field 7, IP source port
rstring ipDestinationAddress, // standard field 12 or 28, IP destination address
uint16 ipDestinationPort, // standard field 11, IP destination port
uint64 flowIdentifier, // standard field 148, flow identifier
rstring flowStartTime, // standard field 152, flow start time, in milliseconds since Unix epoch
rstring usernamePrimary, // Decapper enterprise field 19
list<rstring> usernameSecondary, // Decapper enterprise field 20
rstring fileName, // Decapper enterprise field 43
rstring fileHash, // Decapper enterprise field 55
uint64 fileSize; // Decapper enterprise field 56
graph
stream<PacketType> PacketStream as Out = PacketFileSource() {
param
pcapFilename: getSubmissionTimeValue("pcapFilename", "data/sample_ipfix_decapper.pcap" );
inputFilter: "udp port 2055";
metricsInterval: 0.0;
output Out:
captureTime = (float64)CAPTURE_SECONDS() + (float64)CAPTURE_MICROSECONDS() / 1000000.0,
ipfixSource = IPV4_SRC_ADDRESS(),
ipfixMessage = PAYLOAD_DATA();
}
( stream<IPFIXMessageType> AllMessageStream as OutAll ;
stream<DecapperMessageType> DecapperMessageStream as OutDecapper ;
stream<HTTPMessageType> HTTPMessageStream as OutHTTP ;
stream<UsernameMessageType> UsernameMessageStream as OutUsername ) = IPFIXMessageParser(PacketStream) {
logic
state: { map<uint8,rstring> ipProtocols = { 1: "icmp", 6: "tcp", 17: "udp" }; }
param
messageAttribute: ipfixMessage;
sourceAttribute: ipfixSource;
outputFilters: !parseError(),
!parseError() && IPFIX_enterpriseIdentifier(57)==2u,
!parseError() && IPFIX_enterpriseIdentifier(57)==2u && IPFIX_enterpriseFieldAsString(21)!="",
!parseError() && IPFIX_enterpriseIdentifier(57)==2u && IPFIX_enterpriseFieldAsString(19)!="";
output
OutAll:
captureTime = formatEpochDateTime(captureTime),
messageNumber = messagesProcessed(),
ipProtocol = IPFIX_protocolIdentifier() in ipProtocols ? ipProtocols[IPFIX_protocolIdentifier()] : (rstring)IPFIX_protocolIdentifier(),
ipTOS = IPFIX_ipClassOfService(),
ipSourceAddress = IPFIX_sourceIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_sourceIPv4Address()) : size(IPFIX_sourceIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_sourceIPv6Address()) : "",
ipSourcePort = IPFIX_sourceTransportPort(),
ipDestinationAddress = IPFIX_destinationIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_destinationIPv4Address()) : size(IPFIX_destinationIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_destinationIPv6Address()) : "",
ipDestinationPort = IPFIX_destinationTransportPort(),
flowIdentifier = IPFIX_flowId(),
flowByteCount = IPFIX_octetDeltaCount(),
flowPacketCount = IPFIX_packetDeltaCount(),
flowStartTime= formatEpochDateTime( (float64)IPFIX_flowStartMilliseconds() / 1000.0 , 3u ),
flowEndTime = formatEpochDateTime( (float64)IPFIX_flowEndMilliseconds() / 1000.0 , 3u );
OutDecapper:
captureTime = formatEpochDateTime(captureTime),
messageNumber = messagesProcessed(),
ipProtocol = IPFIX_protocolIdentifier() in ipProtocols ? ipProtocols[IPFIX_protocolIdentifier()] : (rstring)IPFIX_protocolIdentifier(),
ipSourceAddress = IPFIX_sourceIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_sourceIPv4Address()) : size(IPFIX_sourceIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_sourceIPv6Address()) : "",
ipSourcePort = IPFIX_sourceTransportPort(),
ipDestinationAddress = IPFIX_destinationIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_destinationIPv4Address()) : size(IPFIX_destinationIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_destinationIPv6Address()) : "",
ipDestinationPort = IPFIX_destinationTransportPort(),
flowIdentifier = IPFIX_flowId(),
flowStartTime= formatEpochDateTime( (float64)IPFIX_flowStartMilliseconds() / 1000.0 , 3u ),
usernamePrimary = IPFIX_enterpriseFieldAsString(19),
usernameSecondary = IPFIX_enterpriseBasicListFieldElementIdentifier(20)==60uh ? IPFIX_enterpriseBasicListFieldAsStrings(20) : (list<rstring>)[],
httpURL = IPFIX_enterpriseFieldAsString(21),
httpContentType = IPFIX_enterpriseFieldAsString(31),
httpHostname = IPFIX_enterpriseFieldAsString(33),
httpResponseCode = (uint16)IPFIX_enterpriseFieldAsInteger(35),
httpServer = IPFIX_enterpriseFieldAsString(36),
httpUserAgent = IPFIX_enterpriseFieldAsString(37),
httpVersion = IPFIX_enterpriseFieldAsString(38),
webCategory = IPFIX_enterpriseBasicListFieldElementIdentifier(53)==62uh ? IPFIX_enterpriseBasicListFieldAsStrings(53) : (list<rstring>)[],
fileName = IPFIX_enterpriseFieldAsString(43),
fileHash = (rstring)IPFIX_enterpriseFieldAsByteList(55),
fileSize = IPFIX_enterpriseFieldAsInteger(56),
aggregationControl = (uint8)IPFIX_enterpriseFieldAsInteger(57);
OutHTTP:
captureTime = formatEpochDateTime(captureTime),
messageNumber = messagesProcessed(),
ipProtocol = IPFIX_protocolIdentifier() in ipProtocols ? ipProtocols[IPFIX_protocolIdentifier()] : (rstring)IPFIX_protocolIdentifier(),
ipSourceAddress = IPFIX_sourceIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_sourceIPv4Address()) : size(IPFIX_sourceIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_sourceIPv6Address()) : "",
ipSourcePort = IPFIX_sourceTransportPort(),
ipDestinationAddress = IPFIX_destinationIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_destinationIPv4Address()) : size(IPFIX_destinationIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_destinationIPv6Address()) : "",
ipDestinationPort = IPFIX_destinationTransportPort(),
flowIdentifier = IPFIX_flowId(),
flowStartTime= formatEpochDateTime( (float64)IPFIX_flowStartMilliseconds() / 1000.0 , 3u ),
httpURL = IPFIX_enterpriseFieldAsString(21),
httpContentType = IPFIX_enterpriseFieldAsString(31),
httpHostname = IPFIX_enterpriseFieldAsString(33),
httpResponseCode = (uint16)IPFIX_enterpriseFieldAsInteger(35),
httpServer = IPFIX_enterpriseFieldAsString(36),
httpUserAgent = IPFIX_enterpriseFieldAsString(37),
httpVersion = IPFIX_enterpriseFieldAsString(38),
webCategory = IPFIX_enterpriseBasicListFieldElementIdentifier(53)==60uh ? IPFIX_enterpriseBasicListFieldAsStrings(53) : (list<rstring>)[],
fileName = IPFIX_enterpriseFieldAsString(43),
fileHash = (rstring)IPFIX_enterpriseFieldAsByteList(55),
fileSize = IPFIX_enterpriseFieldAsInteger(56);
OutUsername:
captureTime = formatEpochDateTime(captureTime),
messageNumber = messagesProcessed(),
ipProtocol = IPFIX_protocolIdentifier() in ipProtocols ? ipProtocols[IPFIX_protocolIdentifier()] : (rstring)IPFIX_protocolIdentifier(),
ipSourceAddress = IPFIX_sourceIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_sourceIPv4Address()) : size(IPFIX_sourceIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_sourceIPv6Address()) : "",
ipSourcePort = IPFIX_sourceTransportPort(),
ipDestinationAddress = IPFIX_destinationIPv4Address()>0u ? convertIPV4AddressNumericToString(IPFIX_destinationIPv4Address()) : size(IPFIX_destinationIPv6Address())>0 ? convertIPV6AddressNumericToString(IPFIX_destinationIPv6Address()) : "",
ipDestinationPort = IPFIX_destinationTransportPort(),
flowIdentifier = IPFIX_flowId(),
flowStartTime= formatEpochDateTime( (float64)IPFIX_flowStartMilliseconds() / 1000.0 , 3u ),
usernamePrimary = IPFIX_enterpriseFieldAsString(19),
usernameSecondary = IPFIX_enterpriseBasicListFieldElementIdentifier(20) == 62uh ? IPFIX_enterpriseBasicListFieldAsStrings(20) : (list<rstring>)[],
fileName = IPFIX_enterpriseFieldAsString(43),
fileHash = (rstring)IPFIX_enterpriseFieldAsByteList(55),
fileSize = IPFIX_enterpriseFieldAsInteger(56);
}
// See the $STREAMS_INSTALL/samples/com.teracloud.streams.network directory for more examples
}
References
IPFIX messages and the fields they contain are described here:
- https://tools.ietf.org/html/rfc5101
- https://tools.ietf.org/html/rfc5102
- https://tools.ietf.org/html/rfc6313
- http://www.iana.org/assignments/ipfix/ipfix.xml#ipfix-information-element-registration-procedures
- http://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-structured-data-types-semantics
- http://www.iana.org/assignments/enterprise-numbers/enterprise-numbers
Summary
- Ports
- This operator has 1 input port and 0 or more output ports.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports 4 parameters.
Required: messageAttribute, sourceAttribute
Optional: outputFilters, processorAffinity
- Metrics
- This operator does not report any metrics.
Properties
- Implementation
- C++
- Threading
- Always - Operator always provides a single threaded execution context.
- Ports (0)
-
The IPFIXMessageParser operator requires one input port. One input attribute must be of type blob and must contain a IPFIX message, excluding the network headers that proceed them in network packets, as specified by the required parameter messageAttribute.
The PAYLOAD_DATA() and IPV4_SRC_ADDRESS() output assignment functions of the PacketLiveSource and PacketFileSource operators produces attributes that can be consumed by the IPFIXMessageParser operator.
- Properties
-
- Optional: false
- ControlPort: false
- TupleMutationAllowed: false
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
- Assignments
- This operator allows any SPL expression of the correct type to be assigned to output attributes.
- Ports (0...)
-
The IPFIXMessageParser operator requires one or more output ports.
Each output port will produce one output tuple for each input tuple if the corresponding expression in the outputFilters parameter evaluates true, or if no outputFilters parameter is specified.
Output attributes can be assigned values with any SPL expression that evaluates to the proper type, and the expressions may include any of the IPFIX result functions. Output attributes that match input attributes in name and type are copied automatically.
- Properties
-
- TupleMutationAllowed: false
- WindowPunctuationOutputMode: Preserving
Required: messageAttribute, sourceAttribute
Optional: outputFilters, processorAffinity
- messageAttribute
-
This required parameter specifies an input attribute of type blob that contains a IPFIX message to be parsed by the operator.
- Properties
-
- Type: blob
- Cardinality: 1
- Optional: false
- ExpressionMode: Attribute
- outputFilters
-
This optional parameter takes a list of SPL expressions that specify which IPFIX messages should be emitted by the corresponding output port. The number of expressions in the list must match the number of output ports, and each expression must evaluate to a boolean value. The output filter expressions may include any of the IPFIX result functions.
The default value of the outputFilters parameter is an empty list, which causes all IPFIX messages processed to be emitted by all output ports.
- Properties
-
- Type: boolean
- Optional: true
- ExpressionMode: Expression
- processorAffinity
-
This optional parameter takes one expression of type uint32 that specifies which processor core the operator's thread will run on. The maximum value is P-1, where P is the number of processors on the machine where the operator will run.
Where the operator runs on a thread of its own, this parameter applies to the operator's thread. This is the situation when the operator's input port is configured as a threaded input port, and when the operator has an @parallel annotation.
Where the operator runs on the thread of an upstream operator, this parameter affects the thread of the operator that sends tuples to it. This is the situation when the operator is fused with an upstream operator.
The default is to dispatch the operator's thread on any available processor.
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: Expression
- sourceAttribute
-
Specifies an input attribute of type uint32 that contains the IP source address of the IPFIX message.
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: false
- ExpressionMode: Attribute
- IPFIXMessageParser
-
stream<${outputSchema}> ${outputStream} = com.teracloud.streams.network.parse::IPFIXMessageParser(${inputPacketStream}) { param messageAttribute: ${inputPacketStream-blob-attribute}; sourceAttribute: ${inputPacketStream-uint32-attribute}; ${parameter}: ${parameterExpression} output ${outputStream}: ${attributeAssignments} }
- common code for toolkit operators