Implementing a sliding window operator with the Java™ Operator API
A sliding window differs from a tumbling window by having a trigger policy in addition to an eviction policy. Window events are still handled by a StreamWindowListener, but two additional events are created to make a total of five:
INSERTION: One or more tuples are being inserted into the window.
INITIAL_FULL: The window is full for the first time. This event allows listeners to delay submitting tuples that are based on the window state for trigger events that occur before the window is first considered full. For example, with an eviction policy of count(10) and a trigger policy of count(2), there are four trigger events before the window is considered full, and thus aggregation might not be applicable.
TRIGGER: The window is being triggered according to its trigger policy. Listeners typically submit tuples that are based on an aggregated state of the tuples that are currently in the window.
EVICTION: Tuples are being evicted from the window, so the listener typically needs to update any window state that is based on the removed tuples.
FINAL: Final punctuation is received on the input port and thus no more tuples arrive. Even though the final punctuation is received, the window might never have been full or reached a trigger according to its policies. It is up to the specific operator or listener implementation to decide how to handle this situation.
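The count(10) and count(2) example above can be checked with a small simulation. This is only a sketch in plain Java that does not use the Operator API: the class and method names are hypothetical, and it assumes a count-based window simply holds at most the eviction count of tuples and triggers after every trigger-count insertions.

```java
class SlidingWindowEventSketch {

    /**
     * Replays tupleCount insertions into a hypothetical count-based sliding
     * window and returns how many triggers fire while the window still
     * holds fewer than evictCount tuples.
     */
    static int triggersBeforeFull(int evictCount, int triggerCount, int tupleCount) {
        int inWindow = 0;          // tuples currently held by the window
        int sinceLastTrigger = 0;  // insertions since the last trigger
        int earlyTriggers = 0;     // triggers seen before the window was full

        for (int i = 0; i < tupleCount; i++) {
            // Assumption: a count(n) eviction policy keeps at most n tuples.
            if (inWindow < evictCount) {
                inWindow++;
            }
            sinceLastTrigger++;
            if (sinceLastTrigger == triggerCount) {
                sinceLastTrigger = 0;
                if (inWindow < evictCount) {
                    earlyTriggers++;
                }
            }
        }
        return earlyTriggers;
    }

    public static void main(String[] args) {
        // Eviction count(10), trigger count(2): triggers after tuples 2, 4, 6 and 8
        // occur before the window holds 10 tuples.
        System.out.println(triggersBeforeFull(10, 2, 20)); // prints 4
    }
}
```

A listener that aggregates over the whole window would typically ignore those early triggers, which is the purpose of the INITIAL_FULL event.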
StreamWindowListener.Type contains details on the ordering of events for the possible combinations of policy types.
Implementations of StreamWindowListener can be written to be independent of the types or sizes of the eviction and trigger policies. Instead, they generically handle the events that indicate that tuples are evicted or that a trigger occurred. You can implement StreamWindowListener directly, as in Implementing a tumbling window operator with the Java Operator API, or subclass StatefulWindowListener, which provides typed state management through the use of Java™ generics. A sample implementation of StatefulWindowListener is provided that aggregates the sum of a decimal attribute and the count of tuples in a window, supporting both tumbling and sliding windows. The class is com.ibm.streams.operator.samples.windows.DecimalSumAggregatorListener.
The DecimalSumAggregatorListener maintains the sum and count in a static inner class, DecimalSumAggregatorListener.AggregateInfo, thus the class is declared as:
    public class DecimalSumAggregatorListener extends
            StatefulWindowListener<DecimalSumAggregatorListener.AggregateInfo, Tuple> {
This declaration indicates that the state is of type DecimalSumAggregatorListener.AggregateInfo and that the listener is aggregating Tuple objects. As a result, the method getPartitionState() is typed to return an AggregateInfo.
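The benefit of typing the state through generics can be illustrated without the Operator API. The following is a minimal sketch in plain Java: TypedStateSketch and AggregateInfoSketch are hypothetical stand-ins, not the real StatefulWindowListener or AggregateInfo, and the real method signatures may differ. It only shows how a type parameter lets a per-partition state lookup return the concrete state type without a cast.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for the aggregation state (sum and count). */
class AggregateInfoSketch {
    BigDecimal sum = BigDecimal.ZERO;
    int count = 0;
}

/** Hypothetical stand-in for a listener base class that owns typed state. */
class TypedStateSketch<S> {
    private final Map<Object, S> stateByPartition = new HashMap<>();
    private final Supplier<S> initialState;

    TypedStateSketch(Supplier<S> initialState) {
        this.initialState = initialState;
    }

    /** Returns the state for a partition, creating it on first use. */
    S getPartitionState(Object partition) {
        return stateByPartition.computeIfAbsent(partition, p -> initialState.get());
    }

    /** Small demonstration: two partitions keep independent counts. */
    static int demo() {
        TypedStateSketch<AggregateInfoSketch> states =
                new TypedStateSketch<>(AggregateInfoSketch::new);
        // No cast is needed: the return type is AggregateInfoSketch.
        states.getPartitionState("A").count++;
        states.getPartitionState("A").count++;
        states.getPartitionState("B").count++;
        return states.getPartitionState("A").count;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```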
INSERTION Event
The DecimalSumAggregatorListener can handle a sliding window. On tuple insertion into the window, the sum and count are maintained incrementally in the AggregateInfo:
    case INSERTION:
        /*
         * On insertion update the sum and count with each tuple being
         * inserted by adding its attribute's value to the sum.
         */
        for (Tuple tuple : event.getTuples()) {
            state.sum = state.sum.add(tuple.getBigDecimal(attributeName), mc);
            state.count++;
        }
        break;
This INSERTION logic works for both sliding and tumbling windows.
EVICTION Event
For eviction, the logic is different for tumbling and sliding windows. With a tumbling window, the state is used to submit tuples with the sum and count to the output port. With a sliding window, the state must be incrementally modified by the specific tuples that are evicted:
    case EVICTION:
        if (getWindow().getType() == StreamWindow.Type.TUMBLING) {
            // No need to maintain the state incrementally,
            // each tumble will allocate a new state via
            // getInitializedState().
            // Just submit the aggregation.
            submitAggregateOutput(state);
            break;
        }
        /*
         * Sliding window only.
         * On eviction update the sum and count with each tuple being
         * removed by subtracting its attribute's value from the sum.
         */
        for (Tuple tuple : event.getTuples()) {
            if (mc != null)
                state.sum = state.sum.subtract(tuple
                        .getBigDecimal(attributeName), mc);
            state.count--;
        }
        break;
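The incremental approach used for the sliding case (add on insertion, subtract on eviction) can be sketched on its own, outside the Operator API. The class below is hypothetical and makes two assumptions that are not taken from the sample: the window is count-based, and the oldest value is evicted just before a new value is inserted into a full window. It also keeps its own copy of the window only so that the incremental sum can be compared against a full recomputation.

```java
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.ArrayDeque;
import java.util.Deque;

class IncrementalSumSketch {
    private final Deque<BigDecimal> window = new ArrayDeque<>();
    private final int size;
    private final MathContext mc = MathContext.DECIMAL128;

    BigDecimal sum = BigDecimal.ZERO;
    int count = 0;

    IncrementalSumSketch(int size) {
        this.size = size;
    }

    void insert(BigDecimal value) {
        if (window.size() == size) {
            // "Eviction": subtract the removed value from the running sum.
            BigDecimal evicted = window.removeFirst();
            sum = sum.subtract(evicted, mc);
            count--;
        }
        // "Insertion": add the new value to the running sum.
        window.addLast(value);
        sum = sum.add(value, mc);
        count++;
    }

    /** Recomputes the sum from scratch, for comparison only. */
    BigDecimal recomputedSum() {
        BigDecimal total = BigDecimal.ZERO;
        for (BigDecimal v : window) {
            total = total.add(v, mc);
        }
        return total;
    }

    /** Inserts 1..5 into a window of size 3, leaving 3, 4 and 5. */
    static IncrementalSumSketch demo() {
        IncrementalSumSketch s = new IncrementalSumSketch(3);
        for (int i = 1; i <= 5; i++) {
            s.insert(BigDecimal.valueOf(i));
        }
        return s;
    }

    public static void main(String[] args) {
        IncrementalSumSketch s = demo();
        System.out.println(s.sum + " " + s.count); // prints 12 3
    }
}
```

The running sum costs one addition and at most one subtraction per tuple, regardless of the window size, whereas recomputing on every trigger would visit every tuple in the window.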
TRIGGER Event
When a trigger event occurs (sliding windows only), a tuple that contains the sum and count is submitted to the output port, but only if the window has been full at least once.
    case TRIGGER:
        // Only trigger once a full window has been seen.
        if (!seenInitialFull(event.getPartition()))
            break;
        /*
         * Submit the aggregation to the output port.
         */
        submitAggregateOutput(state);
        break;
Here the class uses a method of the superclass StatefulWindowListener, which tracks whether the INITIAL_FULL event has been seen (on a per-partition basis). The submitAggregateOutput() method populates and submits a tuple from the values in the state.
    protected void submitAggregateOutput(AggregateInfo state) throws Exception {
        // Only submit output if the window contains tuples.
        if (state.count == 0)
            return;
        OutputTuple tuple = outputPort.newTuple();
        setupOutputTuple(tuple, state);
        outputPort.submit(tuple);
        outputPort.punctuate(StreamingData.Punctuation.WINDOW_MARKER);
    }

    protected void setupOutputTuple(OutputTuple tuple, AggregateInfo state) {
        tuple.setBigDecimal("sum", state.sum);
        tuple.setInt("count", state.count);
    }
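The per-partition tracking that makes the TRIGGER guard possible can be sketched in plain Java. This is an illustration only, not the StatefulWindowListener implementation: the class, the enum, and the way events are delivered are all hypothetical. It remembers which partitions have reported INITIAL_FULL and drops triggers for any partition that has not.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class InitialFullGateSketch {
    /** Hypothetical subset of window event types used by this sketch. */
    enum EventType { INITIAL_FULL, TRIGGER }

    private final Set<Object> fullPartitions = new HashSet<>();
    final List<String> submitted = new ArrayList<>();

    boolean seenInitialFull(Object partition) {
        return fullPartitions.contains(partition);
    }

    void handle(EventType type, Object partition) {
        switch (type) {
        case INITIAL_FULL:
            // Remember that this partition's window has been full once.
            fullPartitions.add(partition);
            break;
        case TRIGGER:
            // Ignore triggers until a full window has been seen.
            if (!seenInitialFull(partition))
                break;
            submitted.add("aggregate for " + partition);
            break;
        }
    }

    /** Partition A triggers before and after becoming full; B never fills. */
    static List<String> demo() {
        InitialFullGateSketch gate = new InitialFullGateSketch();
        gate.handle(EventType.TRIGGER, "A");       // dropped, A not yet full
        gate.handle(EventType.INITIAL_FULL, "A");
        gate.handle(EventType.TRIGGER, "A");       // submitted
        gate.handle(EventType.TRIGGER, "B");       // dropped, B never full
        return gate.submitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [aggregate for A]
    }
}
```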
To use an implementation of StatefulWindowListener, construct an instance in an Operator's initialize() method, passing in a reference to the StreamWindow it applies to. The StatefulWindowListener registers itself as a StreamWindowListener with the window. With most implementations, the constructor is passed more information than just the window reference. For example, DecimalSumAggregatorListener has the following constructor:
    public DecimalSumAggregatorListener(StreamWindow<Tuple> window,
            String attributeName,
            StreamingOutput<OutputTuple> outputPort) {
        super(window);
        …
The following example shows how an operator uses this class.
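    @Override
    public void initialize(OperatorContext context) throws Exception {
        super.initialize(context);
        // Name of the attribute to sum, taken from the operator parameter.
        String sumAttribute = context.getParameterValues("sumAttribute").get(0);
        // The listener registers itself with the window of input port 0.
        new DecimalSumAggregatorListener(
                getInput(0).getStreamWindow(),
                sumAttribute, getOutput(0));
    }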