Operator NetflowMessageParser
NetflowMessageParser is an operator for the Streams product that parses individual fields of Netflow messages received in input tuples, and emits tuples containing message data. The operator may be configured with one or more output ports, and each port may be configured to emit different tuples, as specified by output filters. The tuples contain individual fields from the input message, as specified by output attribute assignments.
The NetflowMessageParser operator expects only Netflow messages in its input tuples, without any of the headers that precede them in network packets. The PacketLiveSource and PacketFileSource operators can produce tuples that contain Netflow messages with the PAYLOAD_DATA() output attribute assignment function.
The NetflowMessageParser operator consumes input tuples containing Netflow messages, parses individual fields in flow records within Netflow version 5 and version 9 messages, selects flows to emit as output tuples with filter expressions, and assigns values to them with output attribute assignment expressions.
Output filters and attribute assignments are SPL expressions. They may use any of the built-in SPL functions, as well as the Netflow parser result functions that are specific to the NetflowMessageParser operator.
For Netflow version 5 messages, there are result functions for each field in the message header and flow records. For Netflow version 9 messages, there are result functions for each field in the message header and the standardized fields in flow records. Nonstandardized fields in flow records can be accessed by field number.
The NetflowMessageParser operator emits a tuple on each output port for each flow record within a Netflow version 5 or 9 message, optionally filtered by the 'outputFilters' parameter. Specified fields from each flow record are assigned to output attributes with the Netflow parser result functions. All attributes of all output ports must be assigned values, either with explicit assignment expressions, or implicitly by copy from input tuples.
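The per-port filtering described above can be sketched as follows. This is a minimal sketch under stated assumptions, not an excerpt from the toolkit samples: it assumes an input stream named PacketStream carrying the netflowMessage and netflowSource attributes used in the Example on this page, and the names FlowType, TcpFlows, and UdpFlows are hypothetical. IP protocol numbers 6 and 17 denote TCP and UDP.

```spl
// In the composite's type clause (hypothetical schema shared by both ports):
//     FlowType = uint64 flowBytes, uint64 flowPackets, uint16 ipSourcePort, uint16 ipDestinationPort;

// In the graph clause: one operator, two output ports, one filter per port.
(stream<FlowType> TcpFlows; stream<FlowType> UdpFlows) = NetflowMessageParser(PacketStream) {
    param
        messageAttribute: netflowMessage;
        sourceAttribute: netflowSource;
        // Filter expressions are matched to output ports by position.
        outputFilters: !parseError() && NETFLOW_PROTOCOL() == 6ub,    // port 0: TCP flows
                       !parseError() && NETFLOW_PROTOCOL() == 17ub;   // port 1: UDP flows
    output
        TcpFlows:
            flowBytes = NETFLOW_IN_BYTES(),
            flowPackets = NETFLOW_IN_PKTS(),
            ipSourcePort = NETFLOW_SRC_PORT(),
            ipDestinationPort = NETFLOW_DST_PORT();
        UdpFlows:
            flowBytes = NETFLOW_IN_BYTES(),
            flowPackets = NETFLOW_IN_PKTS(),
            ipSourcePort = NETFLOW_SRC_PORT(),
            ipDestinationPort = NETFLOW_DST_PORT();
}
```

With these filters, a flow record whose protocol is neither TCP nor UDP matches no port and produces no output tuple.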
Example
use com.teracloud.streams.network.parse::*;
use com.teracloud.streams.network.source::*;

composite Main {

    type

        PacketType =
            float64 captureTime,      // time that packet was captured, in seconds since Unix epoch
            uint32 netflowSource,     // IP source address of Netflow message
            blob netflowMessage;      // the Netflow message from a packet, excluding all network headers

        NetflowMessageType =
            rstring captureTime,           // time that packet was captured, in seconds since Unix epoch
            uint64 messageNumber,
            rstring flowStartTime,         // time flow started, according to Netflow source, relative to Unix epoch, in seconds
            rstring flowEndTime,           // time flow ended, according to Netflow source, relative to Unix epoch, in seconds
            rstring flowDuration,          // duration of this flow, in seconds
            uint64 flowBytes,              // number of bytes in this flow
            uint64 flowPackets,            // number of packets in this flow
            rstring ipProtocol,            // IP protocol byte
            rstring ipSourceAddress,       // IP source address
            uint16 ipSourcePort,           // TCP/UDP source port number
            rstring ipDestinationAddress,  // IP destination address
            uint16 ipDestinationPort,      // TCP/UDP destination port number
            uint8 tcpFlags;                // all the TCP flags seen for this flow

    graph

        stream<PacketType> PacketStream as Out = PacketFileSource() {
            param
                pcapFilename: getSubmissionTimeValue("pcapFilename", "data/sample_netflow9_new.pcap" );
                inputFilter: "udp port 2055";
                metricsInterval: 0.0;
            output Out:
                captureTime = (float64)CAPTURE_SECONDS() + (float64)CAPTURE_MICROSECONDS() / 1000000.0,
                netflowSource = IPV4_SRC_ADDRESS(),
                netflowMessage = PAYLOAD_DATA();
        }

        stream<NetflowMessageType> NetflowMessageStream as Out = NetflowMessageParser(PacketStream) {
            logic state: { map<uint8,rstring> ipProtocols = { 1: "icmp", 6: "tcp", 17: "udp" }; }
            param
                messageAttribute: netflowMessage;
                sourceAttribute: netflowSource;
                outputFilters: !parseError();
            output Out:
                captureTime = formatEpochTime(captureTime),
                messageNumber = messagesProcessed(),
                flowStartTime = formatEpochTime( captureTime - ( (float64)NETFLOW_SYSTEM_UPTIME() - (float64)NETFLOW_FIRST_SWITCHED() ) / 1000.0, 3u ),
                flowEndTime = formatEpochTime( captureTime - ( (float64)NETFLOW_SYSTEM_UPTIME() - (float64)NETFLOW_LAST_SWITCHED() ) / 1000.0, 3u ),
                flowDuration = formatElapsedTime( ( (float64)NETFLOW_LAST_SWITCHED() - (float64)NETFLOW_FIRST_SWITCHED() ) / 1000.0, 3u ),
                flowBytes = NETFLOW_IN_BYTES(),
                flowPackets = NETFLOW_IN_PKTS(),
                ipProtocol = NETFLOW_PROTOCOL() in ipProtocols ? ipProtocols[NETFLOW_PROTOCOL()] : (rstring)NETFLOW_PROTOCOL(),
                ipSourceAddress = NETFLOW_IPV4_SRC_ADDR()>0u ? convertIPV4AddressNumericToString(NETFLOW_IPV4_SRC_ADDR()) : size(NETFLOW_IPV6_SRC_ADDR())>0 ? convertIPV6AddressNumericToString(NETFLOW_IPV6_SRC_ADDR()) : "",
                ipSourcePort = NETFLOW_SRC_PORT(),
                ipDestinationAddress = NETFLOW_IPV4_DST_ADDR()>0u ? convertIPV4AddressNumericToString(NETFLOW_IPV4_DST_ADDR()) : size(NETFLOW_IPV6_DST_ADDR())>0 ? convertIPV6AddressNumericToString(NETFLOW_IPV6_DST_ADDR()) : "",
                ipDestinationPort = NETFLOW_DST_PORT(),
                tcpFlags = NETFLOW_TCP_FLAGS();
        }

        // See the $STREAMS_INSTALL/samples/com.teracloud.streams.network directory for more examples
}
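The flowStartTime and flowEndTime assignments in the example convert device-relative timestamps into epoch time. The "switched" values and the system uptime are expressed in milliseconds since the exporting device booted, so the age of an event at export time is (uptime - switched) / 1000 seconds, and subtracting that age from the capture time gives an approximate epoch time. The arithmetic can be factored into a helper like the one below. This is only a sketch: the function name switchedToEpochSeconds is hypothetical and not part of the toolkit, and it assumes, as the example does, that the packet capture time is close enough to the moment the device read its uptime counter.

```spl
// Hypothetical helper, not part of the toolkit.
// captureTime:  seconds since the Unix epoch at which the packet was captured
// uptimeMs:     device uptime in milliseconds, from the Netflow message header
// switchedMs:   device uptime in milliseconds at which the flow event occurred
float64 switchedToEpochSeconds(float64 captureTime, float64 uptimeMs, float64 switchedMs) {
    // Age of the event when the message was exported, converted to seconds,
    // subtracted from the capture time.
    return captureTime - (uptimeMs - switchedMs) / 1000.0;
}

// Worked example: captureTime = 1000.0, uptimeMs = 60000.0, switchedMs = 45000.0
//   age    = (60000.0 - 45000.0) / 1000.0 = 15.0 seconds
//   result = 1000.0 - 15.0               = 985.0
```

Inside the output clause it could then be called as switchedToEpochSeconds(captureTime, (float64)NETFLOW_SYSTEM_UPTIME(), (float64)NETFLOW_FIRST_SWITCHED()), with the result passed to formatEpochTime() as in the example.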
References
Netflow version 5 messages and the fields they contain are described here:
Netflow version 9 messages and the fields they contain are described here:
Summary
- Ports
- This operator has 1 input port and 0 or more output ports.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports 4 parameters.
Required: messageAttribute, sourceAttribute
Optional: outputFilters, processorAffinity
- Metrics
- This operator does not report any metrics.
Properties
- Implementation
- C++
- Threading
- Always - Operator always provides a single threaded execution context.
- Ports (0)
-
The NetflowMessageParser operator requires one input port. One input attribute must be of type blob and must contain a Netflow message, excluding the network headers that precede it in network packets, as specified by the required parameter messageAttribute.
The PAYLOAD_DATA() and IPV4_SRC_ADDRESS() output assignment functions of the PacketLiveSource and PacketFileSource operators produce attributes that can be consumed by the NetflowMessageParser operator.
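For live capture, the same two assignment functions can feed the parser from PacketLiveSource instead of PacketFileSource. The fragment below is a sketch under assumptions: the networkInterface parameter name and the interface value "eth0" are assumptions that should be checked against the PacketLiveSource documentation, inputFilter is assumed to behave as it does for PacketFileSource in the Example on this page, and LivePacketType and LivePacketStream are hypothetical names.

```spl
// In the composite's type clause (hypothetical schema):
//     LivePacketType = uint32 netflowSource, blob netflowMessage;

// In the graph clause:
stream<LivePacketType> LivePacketStream as Out = PacketLiveSource() {
    param
        networkInterface: "eth0";        // assumed parameter name and interface
        inputFilter: "udp port 2055";    // assumed to work as in PacketFileSource
    output Out:
        netflowSource = IPV4_SRC_ADDRESS(),   // consumed via sourceAttribute
        netflowMessage = PAYLOAD_DATA();      // consumed via messageAttribute
}
```

A downstream NetflowMessageParser would then name these attributes in its messageAttribute and sourceAttribute parameters.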
- Properties
-
- Optional: false
- ControlPort: false
- TupleMutationAllowed: false
- WindowingMode: NonWindowed
- WindowPunctuationInputMode: Oblivious
- Assignments
- This operator allows any SPL expression of the correct type to be assigned to output attributes.
- Ports (0...)
-
The NetflowMessageParser operator requires one or more output ports.
Each output port will produce one output tuple for each flow record in the Netflow message of an input tuple if the corresponding expression in the outputFilters parameter evaluates true, or if no outputFilters parameter is specified.
Output attributes can be assigned values with any SPL expression that evaluates to the proper type, and the expressions may include any of the Netflow result functions. Output attributes that match input attributes in name and type are copied automatically.
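The automatic copy can be illustrated with a short fragment. This is a sketch, assuming the PacketStream input stream from the Example on this page (with attributes float64 captureTime, uint32 netflowSource, and blob netflowMessage); FlowSummaryType and FlowSummaryStream are hypothetical names.

```spl
// In the composite's type clause (hypothetical schema):
//     FlowSummaryType = float64 captureTime, uint32 netflowSource, uint64 flowBytes, uint64 flowPackets;

// In the graph clause:
stream<FlowSummaryType> FlowSummaryStream as Out = NetflowMessageParser(PacketStream) {
    param
        messageAttribute: netflowMessage;
        sourceAttribute: netflowSource;
    output Out:
        // captureTime and netflowSource match input attributes in name and
        // type, so they are copied from the input tuple without assignment.
        flowBytes = NETFLOW_IN_BYTES(),
        flowPackets = NETFLOW_IN_PKTS();
}
```

Only the two attributes with no counterpart in the input stream need explicit assignments here.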
- Properties
-
- TupleMutationAllowed: false
- WindowPunctuationOutputMode: Preserving
Required: messageAttribute, sourceAttribute
Optional: outputFilters, processorAffinity
- messageAttribute
-
This required parameter specifies an input attribute of type blob that contains a Netflow message to be parsed by the operator.
- Properties
-
- Type: blob
- Cardinality: 1
- Optional: false
- ExpressionMode: Attribute
- outputFilters
-
This optional parameter takes a list of SPL expressions that specify which Netflow messages should be emitted by the corresponding output port. The number of expressions in the list must match the number of output ports, and each expression must evaluate to a boolean value. The output filter expressions may include any of the Netflow result functions.
The default value of the outputFilters parameter is an empty list, which causes all Netflow messages processed to be emitted by all output ports.
- Properties
-
- Type: boolean
- Optional: true
- ExpressionMode: Expression
- processorAffinity
-
This optional parameter takes one expression of type uint32 that specifies which processor core the operator's thread will run on. The maximum value is P-1, where P is the number of processors on the machine where the operator will run.
Where the operator runs on a thread of its own, this parameter applies to the operator's thread. This is the situation when the operator's input port is configured as a threaded input port, and when the operator has an @parallel annotation.
Where the operator runs on the thread of an upstream operator, this parameter affects the thread of the operator that sends tuples to it. This is the situation when the operator is fused with an upstream operator.
The default is to dispatch the operator's thread on any available processor.
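A sketch of the first situation, in which the operator has its own thread because its input port is threaded, is shown below. It assumes the PacketStream input stream from the Example on this page; FlowCountType and FlowCountStream are hypothetical names, and the core number and queue size are arbitrary illustrative values.

```spl
// In the composite's type clause (hypothetical schema):
//     FlowCountType = uint64 flowBytes, uint64 flowPackets;

// In the graph clause:
stream<FlowCountType> FlowCountStream as Out = NetflowMessageParser(PacketStream) {
    param
        messageAttribute: netflowMessage;
        sourceAttribute: netflowSource;
        processorAffinity: 1u;    // uint32 core number; must not exceed P-1
    output Out:
        flowBytes = NETFLOW_IN_BYTES(),
        flowPackets = NETFLOW_IN_PKTS();
    config
        // A threaded input port gives the operator its own thread, so the
        // affinity applies to this operator rather than to its upstream sender.
        threadedPort: queue(PacketStream, Sys.Wait, 1000);
}
```

Without the threadedPort configuration, and with the operator fused to its upstream neighbor, the same processorAffinity value would instead affect the upstream operator's thread, as described above.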
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: Expression
- sourceAttribute
-
This required parameter specifies an input attribute of type uint32 that contains the IP source address of the Netflow message.
- Properties
-
- Type: uint32
- Cardinality: 1
- Optional: false
- ExpressionMode: Attribute
- NetflowMessageParser
-
stream<${outputSchema}> ${outputStream} = com.teracloud.streams.network.parse::NetflowMessageParser(${inputPacketStream}) {
    param
        messageAttribute: ${inputPacketStream-blob-attribute};
        sourceAttribute: ${inputPacketStream-uint32-attribute};
        ${parameter}: ${parameterExpression}
    output ${outputStream}:
        ${attributeAssignments}
}
- common code for toolkit operators