EventTime annotation

The SPL annotation @eventTime specifies that all the output ports of operators where the annotation is in effect emit a stream with event-time values and watermarks. An event-time graph starts from the output port of an event-time source operator. The event-time source operator inserts watermarks into the output stream from time to time.

When the @eventTime annotation is applied to a primitive operator, it indicates that the operator and all the downstream operators which are connected using event-time streams participate in an event-time graph.

The @eventTime annotation applies to the annotated operator and to all downstream operators within a connected graph whose input tuple schemas contain the eventTimeAttribute with the type specified by the annotated operator's output schema:
  • The event-time graph is contiguous.
  • Event-time connectivity extends only downstream. If, for example, the annotated operator's output is merged with another stream at the input port of the downstream operator, the other stream is not part of the event-time graph, even though it has an equivalent schema.
  • The event-time graph ends at a sink or at an operator which does not output the event-time attribute.
    Note: The primitive operator must have an output port. This output port must have a schema that has an attribute with one of the accepted event-time tuple types (uint64, int64, timestamp). Otherwise the @eventTime annotation is not valid.
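The rules above can be sketched with a small SPL application. This is an illustration under assumptions, not a reference implementation: the stream names, the attribute names readingTime and value, and the use of Beacon as the annotated source are all hypothetical choices.

```spl
composite Main {
  graph
    // Annotated operator: its output port starts the event-time graph.
    @eventTime(eventTimeAttribute=readingTime)
    stream<uint64 readingTime, float64 value> Readings = Beacon() {
      param
        period : 1.0;
      output
        // Milliseconds since the Unix Epoch (the default resolution for uint64).
        Readings : readingTime = (uint64)(getTimestampInSecs() * 1000.0),
                   value = random();
    }

    // The output schema still carries readingTime, so this operator and its
    // output stream remain part of the event-time graph.
    stream<Readings> Positive = Filter(Readings) {
      param
        filter : value > 0.5;
    }

    // The output schema drops readingTime, so the event-time graph ends here.
    stream<float64 value> ValuesOnly = Functor(Positive) {
    }

    () as Sink = Custom(ValuesOnly) {
      logic
        onTuple ValuesOnly : {
          println(ValuesOnly);
        }
    }
}
```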

@eventTime ( eventTimeAttribute = name?  , resolution = resolution-value?  , lag = lag-valueSeconds?  , minimumGap = minimumGap-valueSeconds?  )

The @eventTime annotation can be applied only to primitive operators.

eventTimeAttribute
Specifies the name of the tuple attribute which represents the event time of the tuple. The following attribute types are supported:
timestamp
Represents a point in time measured in nanoseconds since the Unix Epoch. The only legal resolution value is nanoseconds. The machine identifier in an SPL timestamp (and its Java/C++ representations) is ignored for any event-time calculations.
uint64
Represents the number of time units since the Unix Epoch.
int64
Represents the number of time units since the Unix Epoch.
Note:
  • If the attribute name exists on multiple output ports, all attributes must have the same type.
  • Optional data types are not supported as event-time attributes.
  • eventTimeAttribute and resolution indicate how the event-time values are represented in the SPL graph.
resolution
Specifies the resolution of the event-time attribute in:
  • Milliseconds. This is the default value for the event-time attribute of type int64 or uint64 if the resolution element is not specified.
  • Microseconds.
  • Nanoseconds. If the event-time attribute is of type SPL timestamp, the default resolution value is nanoseconds.
    Note: For an event-time attribute of type timestamp, a compiler error is issued if the resolution element specifies a value other than nanoseconds.
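The type and resolution rules above can be illustrated with a few annotation fragments. This is a sketch: the attribute names ts, etMillis, and etMicros are hypothetical, and each annotation would precede an operator invocation whose output schema contains the named attribute.

```spl
// ts is an SPL timestamp: resolution may be omitted and defaults to Nanoseconds.
@eventTime(eventTimeAttribute=ts)

// etMillis is int64 or uint64: resolution may be omitted and defaults to Milliseconds.
@eventTime(eventTimeAttribute=etMillis)

// etMicros is int64 or uint64 and holds microseconds since the Unix Epoch.
@eventTime(eventTimeAttribute=etMicros, resolution=Microseconds)
```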
lag
Defines the duration in seconds between the maximum event-time of submitted tuples and the value of the watermark to submit. The operator watermark (wm) is:
wm = max(event_time_of_input_stream) - lag
The lag-valueSeconds is specified as float64. If it is not specified, the default value is 0.0.
Note: lag and minimumGap specify how watermarks are generated.
minimumGap
Defines the minimum event-time duration in seconds between subsequent watermarks. The minimumGap-valueSeconds is specified as float64. If it is not specified, the default value is 0.1 (100 milliseconds).
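As a worked illustration of how lag and minimumGap interact, the trace below assumes resolution=Milliseconds, lag=2.0, and minimumGap=1.0. The event-time values are made up. The trace also assumes that the first candidate watermark is always submitted and that a later candidate is submitted only once it is at least minimumGap later than the previously submitted watermark. That rule is one reading of the definitions above, not a statement of the runtime's exact behavior.

```
event time (ms)   max so far (ms)   candidate wm = max - 2000   submitted?
10000             10000             8000                        yes (first watermark)
10400             10400             8400                        no  (8400 - 8000 = 400 < 1000)
 9900             10400             8400                        no  (maximum unchanged)
11200             11200             9200                        yes (9200 - 8000 = 1200 >= 1000)
```

Under these assumptions, a larger lag leaves more room for late tuples such as the 9900 reading, while a larger minimumGap reduces how often watermarks are submitted.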

Example 1 - Emit watermarks on all output ports

The following example shows how to specify an event-time source which emits watermarks on all the output ports that have an eventTimeAttribute named readingTime of an accepted event-time type. The resolution value specifies that the attribute holds event time in milliseconds.

@eventTime(eventTimeAttribute=readingTime, resolution=Milliseconds)

Example 2 - Emit watermarks as specified by the annotation elements

The following example shows how to specify an event-time source which emits watermarks as specified by the annotation elements. The watermark lag has a default value specified at compile time as value defaultLag, which can be overridden at submission time by value watermarkLag. The minimumGap element can be specified at submission time by value minimumGap.


@eventTime(eventTimeAttribute=readingTime, resolution=Milliseconds, 
         lag=(float64)getSubmissionTimeValue("watermarkLag"), minimumGap=(float64)getSubmissionTimeValue("minimumGap"))
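The annotation above does not show where a compile-time default for the lag would come from. One possible way to supply it, sketched here as an assumption, is the two-argument form of getSubmissionTimeValue, whose second argument is used when no value is given at submission time. The default "2.0" is a made-up value standing in for defaultLag.

```spl
// Sketch: "2.0" is a hypothetical compile-time default for the lag, in seconds.
@eventTime(eventTimeAttribute=readingTime, resolution=Milliseconds,
         lag=(float64)getSubmissionTimeValue("watermarkLag", "2.0"),
         minimumGap=(float64)getSubmissionTimeValue("minimumGap"))
```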

Example 3 - An operator creates two event-time streams, causing a compiler error

The following example shows an operator that creates two event-time streams. Out1 has an event time of type uint64, and Out2 has an event time of type int64. In this case the SPL compiler returns an error because the two streams use event-time attributes with the same name (et) but with different data types.

@eventTime(eventTimeAttribute=et, resolution=Milliseconds)
    (stream<uint32 b, uint64 et> Out1; stream<uint32 b, int64 et> Out2) = MySource()
    { . . . }
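One way to resolve the error, assuming both streams can carry the event time in the same type, is to declare et identically on both output ports. The sketch below reuses the MySource operator from the example and picks uint64 arbitrarily.

```spl
// Both ports declare et as uint64, so the attribute type is consistent.
@eventTime(eventTimeAttribute=et, resolution=Milliseconds)
    (stream<uint32 b, uint64 et> Out1; stream<uint32 b, uint64 et> Out2) = MySource()
    { . . . }
```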