Primitive C++ operators with windows in consistent regions
To checkpoint and reset their state in a consistent region, operators that use the C++ windowing library must call the window state management functions.
Window API functions
The C++ windowing library provides the following functions to support state management within consistent regions.
virtual void drain();
- Drain outstanding window processing. Wait until pending tuple eviction and window trigger actions finish.
virtual void checkpoint(Checkpoint & data);
- Checkpoint the window state into the specified object. The window state includes window contents and state that is associated with the eviction and trigger policies.
virtual void reset(Checkpoint & data);
- Restore the window to the state read from the specified checkpoint.
virtual void resetToInitialState();
- Restore the window to the initial state.
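The sketch below models the contract of these four functions on a toy count-based tumbling window. It is not the SPL windowing library. `ToyCheckpoint` (a first-in, first-out stand-in for `SPL::Checkpoint`) and `ToyTumblingWindow` are hypothetical names. Because the toy processes tuples synchronously, its `drain()` has nothing to wait for.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-in for SPL::Checkpoint: values are read back in the
// same order in which they were written.
class ToyCheckpoint {
public:
    ToyCheckpoint& operator<<(std::uint64_t v) {
        data_.push_back(v);
        return *this;
    }
    ToyCheckpoint& operator>>(std::uint64_t& v) {
        assert(pos_ < data_.size() && "read past end of checkpoint");
        v = data_[pos_++];
        return *this;
    }

private:
    std::vector<std::uint64_t> data_;
    std::size_t pos_ = 0;
};

// Hypothetical count-based tumbling window (not the SPL library).
class ToyTumblingWindow {
public:
    explicit ToyTumblingWindow(std::size_t size) : size_(size) {}

    void insert(std::uint64_t tuple) {
        contents_.push_back(tuple);
        if (contents_.size() == size_) {  // trigger, then evict everything
            ++flushes_;
            contents_.clear();
        }
    }

    // Processing is synchronous here, so there is never pending work.
    void drain() {}

    // Save the window contents plus the policy state (the flush counter).
    void checkpoint(ToyCheckpoint& ckpt) const {
        ckpt << flushes_ << static_cast<std::uint64_t>(contents_.size());
        for (std::uint64_t t : contents_) ckpt << t;
    }

    // Read back exactly what checkpoint() wrote, in the same order.
    void reset(ToyCheckpoint& ckpt) {
        std::uint64_t n = 0;
        ckpt >> flushes_ >> n;
        contents_.clear();
        for (std::uint64_t i = 0; i < n; ++i) {
            std::uint64_t t = 0;
            ckpt >> t;
            contents_.push_back(t);
        }
    }

    // Forget both the contents and the policy state.
    void resetToInitialState() {
        contents_.clear();
        flushes_ = 0;
    }

    std::size_t pending() const { return contents_.size(); }
    std::uint64_t flushes() const { return flushes_; }

private:
    std::size_t size_;
    std::deque<std::uint64_t> contents_;
    std::uint64_t flushes_ = 0;
};
```

A checkpoint taken mid-window restores both the buffered tuples and the policy counter, while `resetToInitialState()` discards both.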
Window events
A window can be associated with one or more listeners that implement the WindowEvent interface (two listeners cannot receive the same event). When a window checkpoints, resets, or resets to its initial state, a single listener, the one registered as the window's serialization handler, receives the corresponding event immediately afterward. The WindowEvent interface contains the following functions.
virtual void onCheckpointEvent(Checkpoint & ckpt) const {}
- This event is fired after the window is saved to the specified checkpoint. The event handler is responsible for writing its state to the checkpoint stream.
virtual void onResetEvent(Checkpoint & ckpt) {}
- This event is fired after the window's state is read from the specified checkpoint. The event handler is responsible for reading its state, which was saved to the checkpoint stream.
virtual void onResetToInitialStateEvent() {}
- This event is fired after the window's state is initialized. The event handler is responsible for initializing its state.
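The following sketch illustrates the ordering described above using hypothetical stand-ins rather than the SPL headers. The window writes or reads its own state first and then notifies its single serialization handler, so the listener's data follows the window's data in the checkpoint stream. `ToyWindowEvent`, `ToyWindow`, `setSerializationHandler`, `TriggerCounter`, and `ToyCheckpoint` are illustrative names only.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical FIFO stand-in for SPL::Checkpoint.
class ToyCheckpoint {
public:
    ToyCheckpoint& operator<<(std::uint64_t v) { data_.push_back(v); return *this; }
    ToyCheckpoint& operator>>(std::uint64_t& v) {
        assert(pos_ < data_.size() && "read past end of checkpoint");
        v = data_[pos_++];
        return *this;
    }
private:
    std::vector<std::uint64_t> data_;
    std::size_t pos_ = 0;
};

// Hypothetical mirror of the three WindowEvent callbacks.
struct ToyWindowEvent {
    virtual ~ToyWindowEvent() = default;
    virtual void onCheckpointEvent(ToyCheckpoint& /*ckpt*/) const {}
    virtual void onResetEvent(ToyCheckpoint& /*ckpt*/) {}
    virtual void onResetToInitialStateEvent() {}
};

// Hypothetical window with exactly one serialization handler.
class ToyWindow {
public:
    void setSerializationHandler(ToyWindowEvent* handler) { handler_ = handler; }
    void add(std::uint64_t v) { sum_ += v; }
    std::uint64_t sum() const { return sum_; }

    // The window saves its own state first, then fires the event.
    void checkpoint(ToyCheckpoint& ckpt) const {
        ckpt << sum_;
        if (handler_) handler_->onCheckpointEvent(ckpt);
    }
    // The window restores its own state first, then fires the event.
    void reset(ToyCheckpoint& ckpt) {
        ckpt >> sum_;
        if (handler_) handler_->onResetEvent(ckpt);
    }
    void resetToInitialState() {
        sum_ = 0;
        if (handler_) handler_->onResetToInitialStateEvent();
    }

private:
    std::uint64_t sum_ = 0;
    ToyWindowEvent* handler_ = nullptr;
};

// Hypothetical listener that keeps its own state: a trigger counter.
struct TriggerCounter : ToyWindowEvent {
    std::uint64_t triggers = 0;
    void onCheckpointEvent(ToyCheckpoint& ckpt) const override { ckpt << triggers; }
    void onResetEvent(ToyCheckpoint& ckpt) override { ckpt >> triggers; }
    void onResetToInitialStateEvent() override { triggers = 0; }
};
```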
C++ primitive operator with window
- State handler
- C++ operators that use windows require a StateHandler implementation to drain, checkpoint, and reset the windows. The following code sample illustrates how the StateHandler implementation invokes the window API:

    void MY_OPERATOR::drain() {
        // . . .
        _window.drain();
    }

    void MY_OPERATOR::checkpoint(Checkpoint & ckpt) {
        // . . .
        _window.checkpoint(ckpt);
    }

    void MY_OPERATOR::reset(Checkpoint & ckpt) {
        // . . .
        _window.reset(ckpt);
    }

    void MY_OPERATOR::resetToInitialState() {
        // . . .
        _window.resetToInitialState();
    }
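The elided parts of the sample above presumably hold the operator's own state handling. The following minimal sketch assumes a hypothetical operator that counts the tuples it has seen. The operator writes its own state before delegating to the window, and `reset` reads the values back in exactly the same order. `ToyOperator`, `ToyWindow`, and `ToyCheckpoint` are hypothetical stand-ins for `MY_OPERATOR`, its window member, and `SPL::Checkpoint`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical FIFO stand-in for SPL::Checkpoint.
class ToyCheckpoint {
public:
    ToyCheckpoint& operator<<(std::uint64_t v) { data_.push_back(v); return *this; }
    ToyCheckpoint& operator>>(std::uint64_t& v) {
        assert(pos_ < data_.size() && "read past end of checkpoint");
        v = data_[pos_++];
        return *this;
    }
private:
    std::vector<std::uint64_t> data_;
    std::size_t pos_ = 0;
};

// Minimal hypothetical window standing in for the operator's _window member.
class ToyWindow {
public:
    void insert(std::uint64_t v) { contents_.push_back(v); }
    std::size_t size() const { return contents_.size(); }
    void drain() {}  // nothing is ever pending in this toy
    void checkpoint(ToyCheckpoint& ckpt) const {
        ckpt << static_cast<std::uint64_t>(contents_.size());
        for (std::uint64_t v : contents_) ckpt << v;
    }
    void reset(ToyCheckpoint& ckpt) {
        std::uint64_t n = 0;
        ckpt >> n;
        contents_.assign(n, 0);
        for (std::uint64_t& v : contents_) ckpt >> v;
    }
    void resetToInitialState() { contents_.clear(); }
private:
    std::vector<std::uint64_t> contents_;
};

// Hypothetical operator: its own state goes into the checkpoint before the
// window's state and is read back in exactly the same order.
class ToyOperator {
public:
    void process(std::uint64_t tuple) { ++seen_; _window.insert(tuple); }
    void drain() { _window.drain(); }
    void checkpoint(ToyCheckpoint& ckpt) { ckpt << seen_; _window.checkpoint(ckpt); }
    void reset(ToyCheckpoint& ckpt) { ckpt >> seen_; _window.reset(ckpt); }
    void resetToInitialState() { seen_ = 0; _window.resetToInitialState(); }
    std::uint64_t seen() const { return seen_; }
    std::size_t windowSize() const { return _window.size(); }
private:
    std::uint64_t seen_ = 0;
    ToyWindow _window;
};
```

If `reset` read the window before `seen_`, every value would be misinterpreted, so the write order and the read order must match.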
- Tumbling window summarizer
- A tumbling window summarizer removes the need for the windowing library to retain all the tuples in a tumbling window. Because a summarizer can contain state, it must save and load that state when the window checkpoints or resets. The TumblingWindowSummarizer interface contains the following events.
virtual void onCheckpointEvent(Checkpoint & ckpt) const {}
- This event is fired when the current window's state is checkpointed. Implement it to serialize the summarizer's state into the specified checkpoint stream.
virtual void onResetEvent(Checkpoint & ckpt) {}
- This event is fired when the current window is restored to the state that is provided by the specified checkpoint. Implement it to restore the summarizer's state by reading from the specified checkpoint stream.
A window re-creates its summarizers when it is reset to its initial state.
The following example shows how a tumbling window summarizer that counts the windowed tuples can be updated to participate in consistent regions:
    #define MY$OP MY_OPERATOR_SCOPE::MY_OPERATOR

    // MyCountSummarizer must also be declared inside the MY_OPERATOR class.
    struct MY_OPERATOR::MyCountSummarizer
        : public SPL::TumblingWindowSummarizer<MY$OP::IPort0Type, MY$OP::PartitionByType>
    {
        MY$OP & operator_;
        uint64_t count_;

        MyCountSummarizer(SPL::Operator & oper)
            : operator_(static_cast<MY$OP &>(oper)), count_(0) {}

        // Count each tuple that is inserted into the current window.
        void onTupleInsertionEvent(MY$OP::IPort0Type const & tuple) { count_++; }

        // Save the count when the window checkpoints.
        void onCheckpointEvent(SPL::Checkpoint & ckpt) const { ckpt << count_; }

        // Restore the count when the window resets.
        void onResetEvent(SPL::Checkpoint & ckpt) { ckpt >> count_; }
    };
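The following self-contained sketch replays the count summarizer's life cycle against hypothetical stand-ins (`ToySummarizerBase`, `CountSummarizer`, `ToySummarizedWindow`, `ToyCheckpoint`) instead of the real SPL types. It also models the window re-creating its summarizer on `resetToInitialState()`. Marking the handlers with `override` is a design choice worth copying: if the `const` qualifier on `onCheckpointEvent` did not match the base class, the compiler would report an error instead of silently hiding the base function.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical FIFO stand-in for SPL::Checkpoint.
class ToyCheckpoint {
public:
    ToyCheckpoint& operator<<(std::uint64_t v) { data_.push_back(v); return *this; }
    ToyCheckpoint& operator>>(std::uint64_t& v) {
        assert(pos_ < data_.size() && "read past end of checkpoint");
        v = data_[pos_++];
        return *this;
    }
private:
    std::vector<std::uint64_t> data_;
    std::size_t pos_ = 0;
};

// Hypothetical stand-in for SPL::TumblingWindowSummarizer<T, G>.
template <class T>
struct ToySummarizerBase {
    virtual ~ToySummarizerBase() = default;
    virtual void onTupleInsertionEvent(T const& /*tuple*/) {}
    virtual void onCheckpointEvent(ToyCheckpoint& /*ckpt*/) const {}
    virtual void onResetEvent(ToyCheckpoint& /*ckpt*/) {}
};

struct CountSummarizer : ToySummarizerBase<int> {
    std::uint64_t count_ = 0;
    void onTupleInsertionEvent(int const& /*tuple*/) override { ++count_; }
    // 'override' rejects a signature that would hide, not override, the base.
    void onCheckpointEvent(ToyCheckpoint& ckpt) const override { ckpt << count_; }
    void onResetEvent(ToyCheckpoint& ckpt) override { ckpt >> count_; }
};

// Hypothetical window that owns one summarizer and re-creates it when it
// is reset to its initial state.
class ToySummarizedWindow {
public:
    ToySummarizedWindow() { resetToInitialState(); }
    void insert(int tuple) { summarizer_->onTupleInsertionEvent(tuple); }
    void checkpoint(ToyCheckpoint& ckpt) const { summarizer_->onCheckpointEvent(ckpt); }
    void reset(ToyCheckpoint& ckpt) { summarizer_->onResetEvent(ckpt); }
    void resetToInitialState() { summarizer_ = std::make_unique<CountSummarizer>(); }
    std::uint64_t count() const { return summarizer_->count_; }
private:
    std::unique_ptr<CountSummarizer> summarizer_;
};
```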
- Partition type helper class
- A PartitionType helper class that is generated by the emitClass function of the SPL::CodeGen module provides serialization support.
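Serialization support for the partition type matters because a checkpoint of a partitioned window presumably has to record each partition key along with that partition's state. In the following sketch, `SensorKey` is a hypothetical hand-written stand-in for a generated PartitionType helper class, and `ToyPartitionedCounter` and `ToyCheckpoint` are likewise hypothetical; neither the real generated class nor `SPL::Checkpoint` is shown.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <tuple>
#include <vector>

// Hypothetical FIFO stand-in for SPL::Checkpoint.
class ToyCheckpoint {
public:
    ToyCheckpoint& operator<<(std::uint64_t v) { data_.push_back(v); return *this; }
    ToyCheckpoint& operator>>(std::uint64_t& v) {
        assert(pos_ < data_.size() && "read past end of checkpoint");
        v = data_[pos_++];
        return *this;
    }
private:
    std::vector<std::uint64_t> data_;
    std::size_t pos_ = 0;
};

// Hypothetical composite partition key (stand-in for a generated class).
struct SensorKey {
    std::uint64_t region;
    std::uint64_t sensor;
    bool operator<(SensorKey const& o) const {
        return std::tie(region, sensor) < std::tie(o.region, o.sensor);
    }
};

// Serialization support for the key: fields are written and read in the
// same order.
inline ToyCheckpoint& operator<<(ToyCheckpoint& ckpt, SensorKey const& k) {
    return ckpt << k.region << k.sensor;
}
inline ToyCheckpoint& operator>>(ToyCheckpoint& ckpt, SensorKey& k) {
    return ckpt >> k.region >> k.sensor;
}

// Hypothetical partitioned window that keeps a tuple count per partition.
class ToyPartitionedCounter {
public:
    void insert(SensorKey const& key) { ++counts_[key]; }
    std::uint64_t count(SensorKey const& key) const {
        auto it = counts_.find(key);
        return it == counts_.end() ? 0 : it->second;
    }
    std::size_t partitions() const { return counts_.size(); }

    // Each partition is saved as its key followed by its state.
    void checkpoint(ToyCheckpoint& ckpt) const {
        ckpt << static_cast<std::uint64_t>(counts_.size());
        for (auto const& kv : counts_) ckpt << kv.first << kv.second;
    }
    void reset(ToyCheckpoint& ckpt) {
        counts_.clear();
        std::uint64_t n = 0;
        ckpt >> n;
        for (std::uint64_t i = 0; i < n; ++i) {
            SensorKey k{};
            std::uint64_t c = 0;
            ckpt >> k >> c;
            counts_[k] = c;
        }
    }

private:
    std::map<SensorKey, std::uint64_t> counts_;
};
```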