Implementing a tumbling window operator with the Java™ Operator API
When an operator implemented with the Java™ Operator API is invoked, any window configurations for its input ports are made available through the instance of the StreamWindow interface that is accessed by StreamingInput.getStreamWindow().
Every input port has its own instance of StreamWindow, even when the input port is not windowed.
For more information about windowing, see Window handling. For more information about the Java™ Operator API, see Java Operator API overview.
For a tumbling window, StreamWindow.getType() returns the enumeration value StreamWindow.Type.TUMBLING, and StreamWindow.getEvictionPolicy() returns the tumbling window's eviction policy.
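To make the tumbling behavior concrete, the following plain-Java sketch splits an arrival sequence into tumbling windows. It assumes a count-based eviction policy with a fixed window size and does not use the Java Operator API; TumblingCountDemo, Result, and tumble() are hypothetical names for illustration only. Every full window is evicted in a single step, and tuples that arrive after the last tumble stay pending.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of tumbling semantics under an ASSUMED
// count-based eviction policy; this is not the Streams runtime.
final class TumblingCountDemo {
    // Outcome of running the arrivals through the window.
    static final class Result {
        final List<List<Integer>> evicted = new ArrayList<>(); // windows that tumbled
        final List<Integer> pending = new ArrayList<>();       // never reached eviction
    }

    static Result tumble(List<Integer> arrivals, int count) {
        Result result = new Result();
        List<Integer> window = new ArrayList<>();
        for (Integer tuple : arrivals) {
            window.add(tuple);
            if (window.size() == count) {   // eviction policy fires
                result.evicted.add(window); // all tuples leave together
                window = new ArrayList<>(); // next window starts empty
            }
        }
        result.pending.addAll(window);      // still buffered at end of data
        return result;
    }
}
```

With a count of 3 and seven arrivals, two windows are evicted and the seventh tuple remains pending, which is the situation the FINAL event has to deal with.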
User code acts upon window events by implementing a StreamWindowListener; typically, a listener implementation can be written to be independent of the eviction policy. The StreamWindowListener implementation must be registered using StreamWindow.registerListener() at Operator.initialize() time to ensure that the listener sees all tuples. For complete flexibility, tuples are delivered to the listener as window events and are then also passed to the Operator through its process() method.
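The delivery order, and the reason for registering the listener early, can be illustrated with a small dispatcher. DeliveryDemo, deliver(), and the string tuples are hypothetical stand-ins, not the Streams runtime; the sketch only assumes the order stated above (listener first, then process()). A tuple delivered before any listener is registered still reaches process() but never produces a window event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// HYPOTHETICAL dispatcher mimicking the delivery order described above:
// each tuple goes to the registered window listeners first, then to process().
final class DeliveryDemo {
    private final List<Consumer<String>> listeners = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    void registerListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void process(String tuple) {
        log.add("process:" + tuple);
    }

    void deliver(String tuple) {
        for (Consumer<String> listener : listeners) {
            listener.accept(tuple); // window event first
        }
        process(tuple);             // then the operator's process()
    }
}
```

Calling deliver("a") before registering a listener logs only "process:a", so the listener never learns about that tuple; registering in initialize() avoids this gap.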
Typically, though, a custom Java™ operator needs to handle tuples only through the StreamWindowListener, for example an aggregator for a single input port. In this case, the Operator can extend AbstractWindowOperator, which enforces that all tuples and punctuation are handled through the registered StreamWindowListener.
The following examples all assume AbstractWindowOperator as the superclass.
Each window event is passed to the listener as a StreamWindowEvent with a specific type, enumerated by StreamWindowEvent.Type.
For tumbling windows, three events can be passed to the StreamWindowListener:
INSERTION: One or more tuples are being inserted into the window.
EVICTION: All tuples in the window are being evicted, so the listener typically needs to submit any tuples that are derived from the window state and then reset its state to an empty window.
FINAL: Final punctuation was received on the input port, so no more tuples will arrive. Even though final punctuation was received, the window might not have tumbled according to its eviction policy. How to handle this situation is up to the specific operator or listener implementation.
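The following sketch models how a count-based tumbling window might drive a listener with these three event kinds. EventType, WindowEvent, and SimTumblingWindow are hypothetical stand-ins, not the com.ibm.streams.operator.window classes, and the count-based policy is an assumption. Note that FINAL does not force a tumble: any partial window is left in place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// HYPOTHETICAL stand-ins mirroring the three event kinds described above.
enum EventType { INSERTION, EVICTION, FINAL }

final class WindowEvent {
    final EventType type;
    final List<Float> tuples; // tuples carried by the event (empty for FINAL)

    WindowEvent(EventType type, List<Float> tuples) {
        this.type = type;
        this.tuples = tuples;
    }
}

// Count-based tumbling window that drives a listener with window events.
final class SimTumblingWindow {
    private final int count;
    private final Consumer<WindowEvent> listener;
    private final List<Float> contents = new ArrayList<>();

    SimTumblingWindow(int count, Consumer<WindowEvent> listener) {
        this.count = count;
        this.listener = listener;
    }

    void insert(float tuple) {
        contents.add(tuple);
        listener.accept(new WindowEvent(EventType.INSERTION, List.of(tuple)));
        if (contents.size() == count) {
            // The whole window tumbles: every tuple is evicted at once.
            listener.accept(new WindowEvent(EventType.EVICTION, List.copyOf(contents)));
            contents.clear();
        }
    }

    void finalPunctuation() {
        // Final punctuation does not tumble the window; the listener
        // decides what to do with any tuples still held.
        listener.accept(new WindowEvent(EventType.FINAL, List.of()));
    }
}
```

With a window size of 2, inserting three tuples and then sending final punctuation yields INSERTION, INSERTION, EVICTION, INSERTION, FINAL, leaving the third tuple in an untumbled window.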
Here is an example that walks through building an aggregator operator in Java™ that determines the maximum value of a float attribute (reading) in a window and, on eviction, submits a single tuple containing one attribute (maxReading) with the maximum value. The general framework for the listener is as follows:
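import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.StreamingData.Punctuation; // used by the EVICTION branch
import com.ibm.streams.operator.StreamingOutput;
import com.ibm.streams.operator.Tuple;
import com.ibm.streams.operator.window.StreamWindowEvent;
import com.ibm.streams.operator.window.StreamWindowListener;

public class TumbleMaxAggregator implements StreamWindowListener<Tuple> {
    // Number of tuples seen in the current window
    private int tupleCount;
    // Largest "reading" value seen in the current window
    private float maxReading;
    // Single output port
    private final StreamingOutput<OutputTuple> output;

    @Override
    public synchronized void handleEvent(StreamWindowEvent<Tuple> event) throws Exception {
        switch (event.getType()) {
        case INSERTION:
            // handle insertion of tuples into the window
            break;
        case EVICTION:
            // handle the tumble
            break;
        case FINAL:
            // handle final mark
            break;
        }
    }
}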
Because this listener supports only tumbling windows, it needs to handle just three types of events.
INSERTION Event
For the INSERTION event, the code is simple: iterate through all the inserted tuples and, if a reading is higher than the current maximum, update the state. The first tuple in a window always sets the maximum.
case INSERTION:
    // handle insertion of tuples into the window
    for (Tuple tuple : event.getTuples()) {
        float reading = tuple.getFloat("reading");
        if (tupleCount == 0)
            maxReading = reading;   // first tuple in this window
        else if (reading > maxReading)
            maxReading = reading;   // new maximum
        tupleCount++;
    }
    break;
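The insertion logic can be exercised outside Streams. This sketch replaces the Tuple and its "reading" attribute with raw float values (an assumption); MaxState is a hypothetical name. It also shows why the first tuple seeds the maximum instead of starting from 0: for a window of all-negative readings, a zero start would report a maximum that never occurred.

```java
// Plain-Java version of the INSERTION branch; raw float readings replace
// the Streams Tuple and its "reading" attribute (an ASSUMPTION).
final class MaxState {
    int tupleCount;
    float maxReading;

    void insert(Iterable<Float> readings) {
        for (float reading : readings) {
            // The first tuple of a window seeds the maximum, so windows of
            // negative readings are handled correctly.
            if (tupleCount == 0 || reading > maxReading) {
                maxReading = reading;
            }
            tupleCount++;
        }
    }
}
```

Inserting -5, -2, and -9 leaves maxReading at -2 with a count of 3; a later insertion of 4 raises the maximum to 4.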
EVICTION Event
For the EVICTION event, an output tuple is created, populated with the maximum reading, and submitted, but only if at least one tuple arrived during this tumble. An empty window occurs when there is a time-based eviction policy and no tuples arrived during the time period. By convention, a window punctuation mark is submitted after the window tumble. Finally, the aggregator's state is reset for the next window.
case EVICTION:
    // handle the tumble
    if (tupleCount != 0) {
        OutputTuple tuple = output.newTuple();
        tuple.setFloat("maxReading", maxReading);
        output.submit(tuple);
        output.punctuate(Punctuation.WINDOW_MARKER);
        // reset state for the next window
        tupleCount = 0;
        maxReading = 0;
    }
    break;
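The eviction rules (no output for an empty window, a window marker after each result, then a reset) can be checked with a plain-Java sketch. EvictingMax is a hypothetical name, and its emitted list is an assumed stand-in for output.submit() and output.punctuate(Punctuation.WINDOW_MARKER).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the EVICTION branch. The `emitted` list is a
// HYPOTHETICAL stand-in for output.submit(...) plus the window marker.
final class EvictingMax {
    int tupleCount;
    float maxReading;
    final List<String> emitted = new ArrayList<>();

    void insert(float reading) {
        if (tupleCount == 0 || reading > maxReading) {
            maxReading = reading;
        }
        tupleCount++;
    }

    void evict() {
        // An empty window (for example, a time-based policy with no
        // arrivals) produces no output tuple.
        if (tupleCount != 0) {
            emitted.add("max=" + maxReading); // stands in for submit()
            emitted.add("WINDOW_MARKER");     // stands in for punctuate()
            tupleCount = 0;                   // reset for the next window
            maxReading = 0;
        }
    }
}
```

Evicting an empty window emits nothing; after inserting 3 and 7, eviction emits "max=7.0" followed by the marker and resets the count to 0.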
FINAL Event
For the FINAL event, there are two implementation choices. The first is to do nothing:
case FINAL:
    // handle final mark: do nothing
    break;
The second is to treat the final mark as an eviction event and submit a tuple with the latest (and last) maximum value, which is handled by having the FINAL and EVICTION types share code:
case EVICTION:
case FINAL:
    // handle the tumble,
    // treating the end of the data as a tumble
    ...
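The difference between the two FINAL choices shows up only when the last window is partial. In this plain-Java sketch, the Kind enum, the out list, and the flushOnFinal flag are hypothetical stand-ins for the event type, output port, and design choice. FINAL falls through into EVICTION when flushing is enabled, which has the same effect as the stacked case labels above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch comparing the two FINAL strategies. Kind and `out`
// are HYPOTHETICAL stand-ins for the event type and the output port.
enum Kind { INSERTION, EVICTION, FINAL }

final class FinalPolicyDemo {
    private final boolean flushOnFinal;
    private int tupleCount;
    private float maxReading;
    final List<Float> out = new ArrayList<>();

    FinalPolicyDemo(boolean flushOnFinal) {
        this.flushOnFinal = flushOnFinal;
    }

    void handle(Kind kind, float reading) {
        switch (kind) {
            case INSERTION:
                if (tupleCount == 0 || reading > maxReading) {
                    maxReading = reading;
                }
                tupleCount++;
                break;
            case FINAL:
                if (!flushOnFinal) {
                    break;              // choice 1: ignore the final mark
                }
                // choice 2: fall through and treat end of data as a tumble
            case EVICTION:
                if (tupleCount != 0) {
                    out.add(maxReading);
                    tupleCount = 0;
                    maxReading = 0;
                }
                break;
        }
    }
}
```

Feeding both variants the readings 2 and 5, an eviction, a reading of 1, and then FINAL, the flushing variant outputs 5 and 1, while the other outputs only 5 and silently drops the partial window.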
The listener is now a working maximum value aggregator, but to be run by a stream application it must be used in the context of a Java™ operator. First, the TumbleMaxAggregator requires an output port that indicates where to submit tuples, so the port is passed in the constructor:
public TumbleMaxAggregator(StreamingOutput<OutputTuple> output) {
    this.output = output;
}
Then, an operator is needed to register this listener:
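public class MaxAggregator extends AbstractWindowOperator {
    @Override
    public void initialize(OperatorContext context) throws Exception {
        super.initialize(context);
        // Register the listener at initialize() time so that it sees every
        // tuple on input port 0; results are submitted to output port 0.
        getInput(0).getStreamWindow().registerListener(
                new TumbleMaxAggregator(getOutput(0)), false);
    }
}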