Tumbling and sliding windows
The SPL language supports tumbling and sliding windows. Both kinds of window store tuples in their order of arrival, but they differ in how they handle tuple evictions.
Rather than keeping all the tuples ever inserted, windows are configured to evict expired tuples. In this respect, tumbling windows operate in batches. When a tumbling window fills up, all the tuples in the window are evicted. This process is called a window flush.
Conversely, sliding windows operate in an incremental fashion. When a sliding window fills up, each subsequent tuple insertion evicts the oldest tuples in the window.
The details of tuple eviction are defined by the eviction policy, and several eviction policies are available. Consider a count-based eviction policy of size c. A tumbling window that is configured in this way flushes whenever the number of tuples in the window reaches c. A similarly configured sliding window evicts the oldest tuple when the (c+1)th tuple is received, and each new tuple after that evicts the oldest tuple in the window. The following traces use c = 4 and list the newest tuple first.
Tumbling window:
() -> (1) -> (2, 1) -> (3, 2, 1) -> (4, 3, 2, 1) -> () -> (5) -> (6, 5) -> ...
Sliding Window:
() -> (1) -> (2, 1) -> (3, 2, 1) -> (4, 3, 2, 1) -> (5, 4, 3, 2) -> (6, 5, 4, 3) -> ...
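The two traces can be reproduced with a small stand-alone sketch. It does not use the SPL windowing library: CountTumblingSketch and CountSlidingSketch are hypothetical names, and the tuples are plain integers kept newest first in a std::deque.

```cpp
#include <cstddef>
#include <deque>

// Stand-in for a tumbling window with a count-based eviction policy.
struct CountTumblingSketch {
    std::size_t size;      // eviction policy size c
    std::deque<int> data;  // window contents, newest first
    std::size_t flushes;   // number of window flushes so far

    explicit CountTumblingSketch(std::size_t c) : size(c), flushes(0) {}

    void insert(int tuple) {
        data.push_front(tuple);     // insert first ...
        if (data.size() == size) {  // ... then flush once the window is full
            data.clear();
            ++flushes;
        }
    }
};

// Stand-in for a sliding window with a count-based eviction policy.
struct CountSlidingSketch {
    std::size_t size;      // eviction policy size c
    std::deque<int> data;  // window contents, newest first

    explicit CountSlidingSketch(std::size_t c) : size(c) {}

    void insert(int tuple) {
        if (data.size() == size) {  // evict the oldest tuple first ...
            data.pop_back();
        }
        data.push_front(tuple);     // ... then insert the new one
    }
};
```

After the tuples 1 through 6 are inserted with c = 4, the tumbling sketch has flushed once and holds (6, 5), and the sliding sketch holds (6, 5, 4, 3).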
The trigger policy
Another important way in which tumbling and sliding windows differ is in how they trigger. Window triggers are events that indicate that the window is ready for processing. What processing means is application specific: for an Aggregate operator it might be the computation of an aggregation function, and for a Sort operator it might be the sorting of the window. For tumbling windows, the window trigger event is the same as the window flush event.
For a sliding window, the trigger policy defines when the window triggers. Consider the simple case of a count-based trigger policy of size c. The window triggers after c new tuples are received since the last trigger. In the following trace, which has an eviction size of 4 and a trigger size of 2, * marks the window trigger events.
Sliding Window:
() -> (1) -> *(2, 1) -> (3, 2, 1) -> *(4, 3, 2, 1) -> (5, 4, 3, 2) -> *(6, 5, 4, 3) -> ...
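The trace can be reproduced with the following stand-alone sketch, which adds a count-based trigger to a count-based sliding window. SlidingCountTriggerSketch is a hypothetical name, not a library class, and a trigger is modeled simply as a snapshot of the window contents.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Stand-in for a sliding window with count-based eviction and trigger policies.
struct SlidingCountTriggerSketch {
    std::size_t evictSize;                     // eviction policy size
    std::size_t triggerSize;                   // trigger policy size
    std::size_t sinceTrigger;                  // tuples received since the last trigger
    std::deque<int> data;                      // window contents, newest first
    std::vector<std::deque<int> > triggered;   // window snapshot at each trigger

    SlidingCountTriggerSketch(std::size_t e, std::size_t t)
        : evictSize(e), triggerSize(t), sinceTrigger(0) {}

    void insert(int tuple) {
        if (data.size() == evictSize) {        // evict
            data.pop_back();
        }
        data.push_front(tuple);                // insert
        if (++sinceTrigger == triggerSize) {   // trigger
            triggered.push_back(data);
            sinceTrigger = 0;
        }
    }
};
```

With an eviction size of 4 and a trigger size of 2, inserting the tuples 1 through 6 records three snapshots: (2, 1), (4, 3, 2, 1), and (6, 5, 4, 3).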
Partitioned variants of sliding and tumbling windows
The SPL language also supports partitioned variants of sliding and tumbling windows. For partitioned windows, eviction and trigger policies apply to individual partitions, independent of other partitions. Each tuple that is inserted into the window is associated with a user-specified partition. In effect, each partition has its own subwindow and all the windowing configurations apply to the subwindows, independently. As a result, window events such as evictions and triggers have partitions that are associated with them.
For tumbling windows, the subwindows that are associated with the partitions are removed when the windows are flushed. Conversely, for sliding windows the subwindows that are associated with the partitions might never become empty in certain cases. For example, consider a count or delta-based eviction policy; when the window is already partially populated, it can never become empty. It is important to understand this distinction, as it has implications on the memory usage.
Data Items with Partitions:
1a, 2b, 3a, 4b, 5b, 6b, 7a, 8a, 9b, 10b, 11a, 13a
Tumbling Partitioned Window:
a: () -> (1) -> (3, 1) -> (7, 3, 1) -> (8, 7, 3, 1) -> () -> (11) -> (13, 11) -> ...
b: () -> (2) -> (4, 2) -> (5, 4, 2) -> (6, 5, 4, 2) -> () -> (9) -> (10, 9) -> ...
Sliding Partitioned Window:
a: () -> (1) -> (3, 1) -> (7, 3, 1) -> (8, 7, 3, 1) -> (11, 8, 7, 3) -> (13, 11, 8, 7) -> ...
b: () -> (2) -> (4, 2) -> (5, 4, 2) -> (6, 5, 4, 2) -> (9, 6, 5, 4) -> (10, 9, 6, 5) -> ...
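A minimal sketch of the partitioned traces, assuming count-based eviction of size 4 and a single character as the partition key. The struct names are hypothetical and the code is independent of the SPL library; it only shows that each partition keeps its own subwindow.

```cpp
#include <cstddef>
#include <deque>
#include <map>

// Partitioned tumbling window: a flush removes the partition's subwindow.
struct PartitionedCountTumblingSketch {
    std::size_t size;
    std::map<char, std::deque<int> > partitions;  // subwindows, newest tuple first

    explicit PartitionedCountTumblingSketch(std::size_t c) : size(c) {}

    void insert(int tuple, char partition) {
        std::deque<int> & sub = partitions[partition];
        sub.push_front(tuple);
        if (sub.size() == size) {
            partitions.erase(partition);  // sub is not used after this point
        }
    }
};

// Partitioned sliding window: once populated, a subwindow never becomes empty.
struct PartitionedCountSlidingSketch {
    std::size_t size;
    std::map<char, std::deque<int> > partitions;  // subwindows, newest tuple first

    explicit PartitionedCountSlidingSketch(std::size_t c) : size(c) {}

    void insert(int tuple, char partition) {
        std::deque<int> & sub = partitions[partition];
        if (sub.size() == size) {
            sub.pop_back();
        }
        sub.push_front(tuple);
    }
};
```

Feeding both sketches the data items listed above leaves (13, 11) and (10, 9) in the tumbling partitions a and b, and (13, 11, 8, 7) and (10, 9, 6, 5) in the sliding partitions.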
Eviction and trigger policies
- Count-based
- Count-based window policies are configured with a size parameter. An example is
CountWindowPolicy policy(5);
For a tumbling window, when used to characterize an eviction policy, the size defines the number of tuples that constitute a full window. When a new tuple is received, it is inserted into the window. After the insertion, if the number of tuples in the window is equal to the specified size (a full window), then the whole window is flushed (mass eviction).
For a sliding window, when used to characterize:
- An eviction policy, the size defines the number of tuples to be kept in the window, as it slides. When a new tuple is received, if the number of existing tuples in the window is equal to the size specified, then the oldest tuple is evicted from the window. The new tuple is inserted into the window after the eviction (if any) is performed.
- A trigger policy, the size defines the number of tuples to be received since the last trigger, before triggering the window again. When a new tuple is received, the number of tuples that are received since the last trigger is updated. If this number is equal to the specified size, then the window is triggered again. The trigger policy (count-based) processing takes place after the eviction policy (count-based or delta-based) processing.
- Delta-based
- Delta-based window policies are configured with a size parameter and an attribute. For instance, consider a tuple of type TType and an attribute that is named age in that tuple. Here is how a window policy for delta(age, 15) is created in C++:
DeltaWindowPolicy<TType,            // tuple type
                  TType::age_type,  // attribute type
                  &TType::get_age   // function to get the attribute
                 > policy(15);
If pointers to tuples are to be stored in the window, instead of tuples, the example can be changed to:
DeltaWindowPolicy<TType *,          // tuple type
                  TType::age_type,  // attribute type
                  &TType::get_age   // function to get the attribute
                 > policy(15);
For a tumbling window, when used to characterize an eviction policy, the size defines the boundary of a full window in the units of the eviction policy attribute. When a new tuple is received, if the delta between its eviction policy attribute value and that of the oldest tuple in the window is larger than the eviction policy size, then the whole window is flushed (mass eviction). The new tuple is inserted into the window after the flush operation (if any) is performed.
For a sliding window, when used to characterize:
- An eviction policy, the size defines the boundary of the window, as it slides, in the units of the eviction policy attribute. When a new tuple is received, existing tuples in the window for which the delta between the eviction policy attribute value of the new tuple and that of the existing tuple is greater than the eviction policy size are evicted. The new tuple is inserted into the window after the evictions (if any) are performed.
- A trigger policy, the size defines the boundary of the sequence of tuples that are received since the last trigger that triggers the window again. When a new tuple is received, if the value of its trigger attribute is larger than the trigger attribute of the last tuple that triggered the window plus the trigger policy size, the window is triggered again. The trigger policy (delta-based) processing takes place before the eviction policy (count or delta-based) processing.
- Time-based
- Time-based window policies are configured with a time period parameter (in seconds). An example is
TimeWindowPolicy policy(5);
For a tumbling window, when used to characterize an eviction policy, the time period defines the boundary of a full window in units of real time. When a new tuple is received, it is inserted into the window. When the time elapsed since the last flush event exceeds the specified time period, then the whole window is flushed (mass eviction). Window flushes take place independently of tuple insertions.
For a sliding window, when used to characterize:
- An eviction policy, the time period defines the boundary of the window, as it slides, in units of real time. When a new tuple is received, it is inserted into the window. Tuples that are in the window longer than the specified time period are evicted. Tuple evictions take place independently of tuple insertions.
- A trigger policy, the time period defines the inter-arrival time for successive trigger events. When the time elapsed since the last trigger event exceeds the specified time period, a new trigger is generated. The trigger policy (time-based) processing takes place independently of the eviction policy processing.
- Punctuation-based
- Punctuation-based window policies have no configuration options. An example is
PunctWindowPolicy policy;
A tumbling window with punctuation-based eviction policy is considered full when a punctuation is received. When a tuple is received, it is inserted into the window. When a punctuation is received, the whole window flushes (mass eviction).
Sliding windows cannot have punctuation-based eviction or trigger policies.
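The delta-based and time-based eviction rules for sliding windows can be sketched without the library as follows. The struct names are hypothetical. The delta sketch assumes that the attribute values arrive in non-decreasing order, so the oldest tuples are always the first to fall outside the boundary, and the time sketch takes the current time as an argument instead of running an eviction thread.

```cpp
#include <deque>
#include <utility>

// Sliding window with a delta-based eviction policy (stand-in, not SPL code).
struct DeltaSlidingSketch {
    int size;              // delta size, in units of the attribute
    std::deque<int> ages;  // attribute values of the stored tuples, newest first

    explicit DeltaSlidingSketch(int delta) : size(delta) {}

    void insert(int age) {
        // Evict first: drop tuples whose attribute trails the new one by more than size.
        while (!ages.empty() && age - ages.back() > size) {
            ages.pop_back();
        }
        ages.push_front(age);  // then insert
    }
};

// Sliding window with a time-based eviction policy. Eviction is a separate
// call because it runs independently of insertion; the caller supplies the clock.
struct TimeSlidingSketch {
    double period;                             // time period, in seconds
    std::deque<std::pair<double, int> > data;  // (arrival time, tuple), newest first

    explicit TimeSlidingSketch(double seconds) : period(seconds) {}

    void insert(int tuple, double now) {
        data.push_front(std::make_pair(now, tuple));
    }

    void evictExpired(double now) {
        // Drop tuples that have been in the window longer than the period.
        while (!data.empty() && now - data.back().first > period) {
            data.pop_back();
        }
    }
};
```

For example, with a delta size of 15, inserting the ages 10, 20, 25, 30, and 41 leaves (41, 30) in the window, because 41 exceeds both 20 and 25 by more than 15.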
Partition eviction policies
Partitioned windows are used to group related tuples for processing. Each partition contains 0 to n tuples. In partitioned windows, tuples might never be evicted from a partition. For example, consider an Aggregate operator with a partitioned tumbling window that has an eviction policy of count(100). If the application generates only 99 tuples in each partition, the partitions never flush, and the memory usage of the window continues to increase. Sometimes, the application requirements specify that if no tuples are generated during a specific duration, the existing tuples in the window are no longer useful. In such situations, you must delete partitions to recover the memory that is allocated to the tuples. The following partition eviction policies are available:
partitionAge(float64 n)
- If a tuple is not inserted in a partition for n seconds, the partition is deleted.
partitionCount(uint32 c)
- If the number of partitions exceeds a count of c, partitions are deleted until the partition count is equal to c.
tupleCount(uint32 t)
- If the total number of tuples across all partitions exceeds a count of t, partitions are deleted until the tuple count is less than or equal to t.
OperatorDefined
- Implement and register an event handler to select the partitions to evict.
LRU
- Use the default event handler to evict the oldest partitions. A time-ordered list of existing partitions and the time when the last tuple was inserted is maintained to identify the least recently used partition.
The partitions to evict are selected with the OperatorDefined and LRU policies.
Execution order of events
The window events are set off by the insert calls that are made on the window object. When an insert call is made, the insertion, eviction, and trigger events run in a certain order. As an exception, for time-based window policies, the eviction and trigger processing might proceed independently of the insertion, depending on the specific time-based policies used. The specific order that is used for delivering window events depends on the window policies specified. Two tables, one for tumbling windows (Table 1) and another for sliding windows (Table 2), summarize the execution order of the window events for different eviction and trigger policies.
Table 1. Execution order of window events for tumbling windows
Eviction policy | Order of execution |
---|---|
Count-based | first insert tuple into window, then perform eviction |
Delta-based | first perform eviction, then insert tuple into window |
Time-based | perform evictions independently of tuple insertions |
Punctuation-based | perform eviction when a punctuation is received |
Notice that the order of insertion and eviction differs between count-based and delta-based eviction policies. For a count-based tumbling window of size n, the window is full when the nth tuple is received, so that tuple belongs to the window that is flushed. For a delta-based window of size n, by contrast, the window is full when a tuple that increases the delta value beyond n is received. In the latter case, the tuples are first evicted from the window and then the newly received tuple is inserted into the emptied window.
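The evict-then-insert order for a delta-based tumbling window can be illustrated with the following sketch. DeltaTumblingSketch is a hypothetical stand-in, not the library class, and it stores only the attribute value of each tuple.

```cpp
#include <deque>

// Stand-in for a tumbling window with a delta-based eviction policy.
struct DeltaTumblingSketch {
    int size;              // delta size, in units of the attribute
    std::deque<int> ages;  // attribute values of the stored tuples, newest first
    int flushes;           // number of window flushes so far

    explicit DeltaTumblingSketch(int delta) : size(delta), flushes(0) {}

    void insert(int age) {
        // Compare the new tuple with the oldest tuple in the window.
        if (!ages.empty() && age - ages.back() > size) {
            ages.clear();      // flush first ...
            ++flushes;
        }
        ages.push_front(age);  // ... then insert into the emptied window
    }
};
```

With a delta size of 2, the values 1, 2, and 3 accumulate in the window. The value 4 exceeds the oldest value by 3, so the window is flushed and 4 becomes the first tuple of the next window.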
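Table 2. Execution order of window events for sliding windows
Eviction\Trigger | Count-based | Delta-based | Time-based |
---|---|---|---|
Count-based | evict → insert → trigger | trigger → evict → insert | evict → insert ∥ trigger |
Delta-based | evict → insert → trigger | trigger → evict → insert | evict → insert ∥ trigger |
Time-based | insert → trigger ∥ evict | trigger → insert ∥ evict | insert ∥ evict ∥ trigger |
In this table, → gives the order of events within one insert call, and ∥ separates processing that runs independently of that order.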
For sliding windows that do not involve time-based policies, eviction always precedes insertion, because evictions are needed to make room for the new tuple that arrives. However, the trigger processing might come first or last, depending on whether the trigger policy is count-based or delta-based. For count-based trigger policies, the triggering happens at the end because the newly inserted tuple is part of the contents of the window that needs to be triggered. For instance, for a count-based trigger policy of size n, the nth tuple that is received since the last trigger results in a new trigger and that nth tuple is part of the triggered window, meaning that the trigger follows the insertion. Delta-based trigger policies are different because the newly received tuple is not part of the contents of the window that needs to be triggered. For instance, for a delta-based trigger policy of size n, the tuple that increases the delta value beyond n since the last trigger results in a new trigger and that tuple is not part of the triggered window, meaning that the trigger precedes both the eviction and the insertion. For time-based eviction policies, the eviction happens independently of other events, and similarly, for time-based trigger policies, the trigger happens independently of other events.
Defining tumbling and sliding windows
Tumbling and sliding windows are defined with the TumblingWindow and SlidingWindow template classes. They both extend from the common Window class and have the same template parameters as Window. The common base class template for tumbling and sliding windows is as follows:
template <class TupleType,
          class PartitionType=int32_t,
          class DataType=std::deque<TupleType>,
          class StorageType=std::tr1::unordered_map<PartitionType,DataType> >
class Window;
The types that are involved in the template are:
- TupleType is the type of items that are stored in the window. It can be a pointer or non-pointer type.
- PartitionType is the type of the window partitioning attribute.
- DataType is the type of the structure that holds the tuples for each partition.
- StorageType is the type of the structure that holds the data that is associated with all the partitions. It is a mapping from the partition attribute to the data for that partition.
All of these types are also defined within the Window, TumblingWindow, and SlidingWindow classes as typedefs. In general, developers customize only TupleType and PartitionType. The following are some example templates.
class TumblingWindow<TType>;
class SlidingWindow<TType *, float32>; // partitioned window
For windows that store pointers to tuples, the memory management responsibilities belong to the user.
Tumbling window constructors take three parameters, a reference to the operator that contains the window, an index to the input port that this window is associated with, and the eviction policy for the window. Sliding windows are similar, but they also take the trigger policy, as the last parameter. The constructor signatures are given as follows:
TumblingWindow(Operator & oper, uint32_t port,
WindowPolicy const & evictionPolicy);
SlidingWindow(Operator & oper, uint32_t port,
WindowPolicy const & evictionPolicy,
WindowPolicy const & triggerPolicy);
The following are some example window configurations:
CountWindowPolicy pC5(5), pC2(2); // count-based policies
PunctWindowPolicy pP; // punctuation-based policy
DeltaWindowPolicy<TT,TT::age_type,&TT::get_age> pDage2(2), pDage0(0); // delta-based policy
DeltaWindowPolicy<TT,TT::time_type,&TT::get_time> pDtime2(2.0); // delta-based policy
TimeWindowPolicy pT8(8.0), pT2(2.0); // time-based policy
TumblingWindow<TT> wT_C5(op, 0, pC5); // tumbling, count(5)
TumblingWindow<TT> wT_P(op, 0, pP); // tumbling, punct()
TumblingWindow<TT> wT_Dage2(op, 0, pDage2); // tumbling, delta(age,2)
TumblingWindow<TT> wT_T8(op, 0, pT8); // tumbling, time(8)
SlidingWindow<TT> wS_C5_C2(op, 0, pC5, pC2); // sliding, count(5), count(2)
SlidingWindow<TT> wS_C5_Dage2(op, 0, pC5, pDage2); // sliding, count(5), delta(age, 2)
SlidingWindow<TT> wS_Dage2_C5(op, 0, pDage2, pC5); // sliding, delta(age, 2), count(5)
SlidingWindow<TT> wS_Dage2_Dage0(op, 0, pDage2, pDage0); // sliding, delta(age, 2), delta(age, 0)
SlidingWindow<TT> wS_Dage2_T2(op, 0, pDage2, pT2); // sliding, delta(age, 2), time(2)
SlidingWindow<TT> wS_T8_C5(op, 0, pT8, pC5); // sliding, time(8), count(5)
SlidingWindow<TT> wS_T8_Dage2(op, 0, pT8, pDage2); // sliding, time(8), delta(age, 2)
SlidingWindow<TT> wS_T8_T2(op, 0, pT8, pT2); // sliding, time(8), time(2)
Inserting tuples
The SPL runtime has an event-driven windowing API. Except for certain events that are fired by time-based eviction and trigger policies, most window events fire as a result of tuples or punctuation being inserted into the window. The following are the signatures of the two window functions that are available for inserting tuples and punctuation into the window.
void insert(TupleType const & tuple,
PartitionType const & partition=PartitionType());
void insert(Punctuation const & punct);
The partition parameter of the tuple insertion function is optional. For non-partitioned windows, it can be omitted. A non-partitioned window has a single default partition with a default partition attribute type (int32_t) and value (0).
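The effect of the defaulted partition argument can be seen in a stripped-down stand-in for the window template. WindowSketch is hypothetical: it mirrors the template parameters and the insert signature shown above, uses std::unordered_map in place of std::tr1::unordered_map, and omits all policies and events.

```cpp
#include <cstdint>
#include <deque>
#include <unordered_map>

// Minimal stand-in for the Window template: storage and insertion only.
template <class T,
          class G = int32_t,
          class D = std::deque<T>,
          class S = std::unordered_map<G, D> >
class WindowSketch {
public:
    typedef T TupleType;
    typedef G PartitionType;
    typedef D DataType;
    typedef S StorageType;

    // Omitting the partition uses the default-constructed key (0 for int32_t).
    void insert(TupleType const & tuple,
                PartitionType const & partition = PartitionType()) {
        storage_[partition].push_back(tuple);
    }

    StorageType & getWindowStorage() { return storage_; }
    DataType & getWindowData(PartitionType const & partition) { return storage_[partition]; }
    DataType & getWindowData() { return storage_[PartitionType()]; }

private:
    StorageType storage_;
};
```

With WindowSketch<int> w, the calls w.insert(7) and w.insert(8, 0) place both tuples in the same default partition, whereas WindowSketch<int, char> keeps one subwindow per character key.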
Evicting partitions
If a partition eviction policy is defined for a window, each time a tuple is inserted into a partition, the window is checked to see whether partition eviction is required. If partition eviction is required and the LRU option is selected, the default onWindowPartitionEvictionSelection event handler is called to select the partitions to evict. The oldest partitions are selected for eviction from a time-ordered list of partitions. If partition eviction is required and the OperatorDefined option is selected, the user-defined onWindowPartitionEvictionSelection event handler is called to allow the operator to select partitions for eviction.
If partitions are selected for eviction and the onWindowPartitionEviction event handler is registered, it is called to take user-defined actions before the partitions are deleted. For example, you can delete the tuples in a partition to recover the memory that is allocated to the tuples before the partitions are deleted.
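The LRU selection can be sketched as follows for a partitionCount(c) policy. This is an illustration only, not the library's implementation: LruPartitionCountSketch is a hypothetical name, and the recency list is a simple std::list with the least recently used partition at the front.

```cpp
#include <cstddef>
#include <deque>
#include <list>
#include <map>

// Stand-in for a partitioned window with partitionCount(c) and LRU selection.
struct LruPartitionCountSketch {
    std::size_t maxPartitions;                    // the c in partitionCount(c)
    std::map<char, std::deque<int> > partitions;  // per-partition subwindows
    std::list<char> recency;                      // least recently used first

    explicit LruPartitionCountSketch(std::size_t c) : maxPartitions(c) {}

    void insert(int tuple, char partition) {
        partitions[partition].push_back(tuple);
        recency.remove(partition);     // move the partition to the
        recency.push_back(partition);  // most recently used position
        // Check after every insertion whether partition eviction is required.
        while (partitions.size() > maxPartitions) {
            char victim = recency.front();  // least recently used partition
            recency.pop_front();
            partitions.erase(victim);
        }
    }
};
```

With a limit of two partitions, inserting tuples into a, b, a, and then c evicts b, because a was used more recently than b when the third partition arrived.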
- Evict partitions to control memory use:
  - The developer must edit the <oper-name>_cpp.cgt file to use the getPartitionedWindowCppInitializer function with the selectionType parameter set to LRU.
  - The developer can implement and register the onWindowPartitionEviction function to delete tuples for windows that store pointers to tuples.
- Evict partitions with user-defined actions:
  - The developer must edit the <oper-name>_cpp.cgt file to use the getPartitionedWindowCppInitializer function with the selectionType parameter set to LRU.
  - The developer must implement and register the onWindowPartitionEviction function to get notification of partitions that are selected for eviction and to define the action to take before the partitions are evicted.
- Select partitions for eviction:
  - The developer must edit the <oper-name>_cpp.cgt file to use the getPartitionedWindowCppInitializer function with the selectionType parameter set to OperatorDefined.
  - The developer must implement and register the onWindowPartitionEvictionSelection function to select the partitions to evict with the markForRemoval() function of the PartitionSelectionIterator class.
  - The developer can implement and register the onWindowPartitionEviction function to delete tuples for windows that store pointers to tuples or to take specific action when partitions are evicted.
The following example shows a sample <oper-name>_cpp.cgt file.
<%
    my $inputPort = $model->getInputPortAt(0);
    my $outputPort = $model->getOutputPortAt(0);
    my $inTupleName = $inputPort->getCppTupleName();
    my $inTupleType = $inputPort->getCppTupleType();
    my $outTupleType = $outputPort->getCppTupleType();
    my $partitionByParam = $model->getParameterByName("partitionBy");
    my $partitionByInitializer = SPL::CodeGen::getParameterCppInitializer($partitionByParam);
    my $window = $inputPort->getWindow();
    my $windowCppInitializer = SPL::CodeGen::getPartitionedWindowCppInitializer($window,
                                                                                "IPort0Type*",
                                                                                "OperatorDefined");
%>
<%SPL::CodeGen::implementationPrologue($model);%>
MY_OPERATOR::MY_OPERATOR()
    : MY_BASE_OPERATOR(), _window(<%=$windowCppInitializer%>)
{
<%if($window->hasPartitionEvictionPolicy()) {%>
    _window.registerOnWindowPartitionEviction(this);
    _window.registerOnWindowPartitionEvictionSelection(this);
<%}%>
}
MY_OPERATOR::~MY_OPERATOR()
{
    // Delete any remaining tuples in the window
    _window.deleteWindowObjects();
}
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{
    AutoPortMutex apm(_mutex, *this);
    IPort0Type const & <%=$inTupleName%> = static_cast<IPort0Type const&>(tuple);
    PartitionByType partition(<%=$partitionByInitializer%>);
    // The window stores pointers, so it owns a heap-allocated copy of the tuple
    _window.insert(new IPort0Type(<%=$inTupleName%>), partition);
}
<%if($window->hasPartitionEvictionPolicy()) {%>
// We get called when the partition eviction criterion has been reached.
// This can be partitionAge, partitionCount, or tupleCount. For this example,
// we select all the least recently used partitions that have no tuples in
// them, and then select one that still contains tuples.
// If that isn't enough to satisfy the criterion, we are re-invoked to do more.
// We could examine the partition eviction kind and expressions if we wanted to be more
// precise.
void MY_OPERATOR::onWindowPartitionEvictionSelection(WindowEventType::WindowType & window,
    WindowEventType::PartitionSelectionIterator const& begin,
    WindowEventType::PartitionSelectionIterator const& end)
{
    // Remove all the old empty partitions, and one non-empty one
    for (WindowEventType::PartitionSelectionIterator it = begin; it != end; it++) {
        if (window.getWindowData(it.partition()).size() == 0) {
            it.markForRemoval();
        } else {
            // this partition isn't empty: select it and stop
            it.markForRemoval();
            break;
        }
    }
}
// For this operator, we are storing pointers to tuples in the partitioned windows. When
// a partition is being evicted, we have to delete all the allocated tuples in that partition.
void MY_OPERATOR::onWindowPartitionEviction(WindowEventType::WindowType & window,
    WindowEventType::PartitionIterator const& begin,
    WindowEventType::PartitionIterator const& end)
{
    // We have partitions being evicted. Clean up
    WindowEventType::PartitionIterator it;
    for (it = begin; it != end; it++) {
        // delete the tuples
        WindowEventType::PartitionType const & partition = it.partition();
        WindowType::DataType & data = window.getWindowData(partition);
        for (uint32_t i=0, iu=data.size(); i<iu; ++i)
            delete data[i];
    }
}
<%}%>
<%SPL::CodeGen::implementationEpilogue($model);%>
Tumbling window summarizer
A tumbling window summarizer removes the need for the windowing library to retain all the tuples in a tumbling window. If your operator needs the facilities of the windowing library to group and partition tuples, but needs to look at each incoming tuple only once, use a tumbling window summarizer. An example is the Sum() function in Aggregate for a tumbling window.
Here is the definition of the TumblingWindowSummarizer class.
/// Inherit from this class and override select methods to implement
/// summarizers for tumbling windows.
template <class T, class G=int32_t>
class TumblingWindowSummarizer
{
public:
    typedef T TupleType;     //!< tuple type
    typedef G PartitionType; //!< partition type
    /// Destructor. The destructor is called immediately after the
    /// afterWindowFlushEvent window event.
    virtual ~TumblingWindowSummarizer() {}
    /// This event is fired when a new window is opened.
    /// A new window is opened when the first tuple that is part of the new
    /// window is received. The construction happens after the
    /// beforeTupleInsertionEvent window event and before the
    /// afterTupleInsertionEvent window event.
    /// @param partition the partition for which the summarizer is created
    virtual void onOpenWindowEvent(PartitionType const & partition) {}
    /// This event is fired when a new tuple is added to the window
    /// corresponding to this summarizer. This event is delivered after the
    /// beforeTupleInsertionEvent window event and before the
    /// afterTupleInsertionEvent window event.
    /// @param tuple the tuple that was inserted
    virtual void onTupleInsertionEvent(TupleType const & tuple) {}
    /// This event is fired when the current window is closed. An existing
    /// window is closed after the last tuple that is part of that window is
    /// received. This event is delivered before the beforeWindowFlushEvent.
    virtual void onCloseWindowEvent() {}
};
To use a TumblingWindowSummarizer, register it with the windowing library by calling _window.registerWindowSummarizer<derivedTumblingWindowSummarizer>(). This registration causes the windowing library to instantiate a derivedTumblingWindowSummarizer object for each partition in the tumbling window. The constructor of the object is passed an SPL::Operator& as the argument. As tuples are inserted into the window, onTupleInsertionEvent is invoked on the proper summarizer. When beforeWindowFlushEvent is called as the tumbling window flushes, the summarizer for the window can be accessed with this code:
WindowType& twindow = static_cast<WindowType&>(window);
// WindowType must be a tumbling window class.
derivedTumblingWindowSummarizer& summarizer =
*static_cast<derivedTumblingWindowSummarizer*>(twindow.getSummarizer(partition));
The information in the summarizer can then be used to generate a tuple, or any other action.
Here is an example that shows the use of a tumbling window summarizer to implement a Sum operation:
#define MY$OP MY_OPERATOR_SCOPE::MY_OPERATOR
// Assumes $sumType and $sumArg are already set with the C++ type and C++ expression to be summed.
// $sumArg must use
// SPL::CodeGen::prefixLiteralsAndStatesInCppExpressions($sumArg->getCppExpression(), "_oper.");
// to ensure that all literals and state variables are prefixed by _oper. to allow the C++
// expression to access the necessary information in the primitive operator.
// The summarizer class must then provide a _oper object to resolve the references.
struct Sum : public SPL::TumblingWindowSummarizer<MY$OP::IPort0Type*,MY$OP::PartitionByType> {
    MY$OP& _oper;        // Needed to ensure access to literals in C++ code
    uint64_t _count;     // Used to check if the partition is empty (optional)
    <%=$sumType%> _sum;  // incremental sum to be accumulated
    Sum (SPL::Operator& oper) : _oper(static_cast<MY$OP &>(oper)), _count(0), _sum(0) {}
    // Will be invoked for each tuple in a partition
    void onTupleInsertionEvent(MY$OP::IPort0Type* const& tuple) {
        _count++; // Keep a running count of the tuples seen in this window
        // Establish the needed name for the incoming tuple
        MY$OP::IPort0Type const &<%=$inTupleName%> = *static_cast<MY$OP::IPort0Type const*>(tuple);
        // finally, do the actual sum
        _sum += <%=$sumArg%>;
    }
};
MY_OPERATOR::MY_OPERATOR()
    : _window(<%=$windowCppInitializer%>)
{
    _window.registerBeforeWindowFlushHandler(this);
    _window.registerWindowSummarizer<Sum>();
    ...
}
void MY_OPERATOR::beforeWindowFlushEvent(
    WindowEventType::WindowType & window,
    WindowEventType::PartitionType const & partition)
{
    WindowType& twindow = static_cast<WindowType&>(window);
    Sum& summarizer = *static_cast<Sum*>(twindow.getSummarizer(partition));
    if (summarizer._count == 0) {
        // optional: take action for empty windows
        // return;
    }
    // Window has data - use the value in the summarizer
    OPort0Type otuple (summarizer._sum);
    submit (otuple, 0);
}
Window events
Various events that are fired by windows are captured through the WindowEvent template class. WindowEvent has the same template form as the window classes themselves. Window classes also include a typedef that is called WindowEventType that identifies the window event class for the particular window. To handle window events, register a class that derives from WindowEvent with the window at hand. Two events are common to both tumbling and sliding windows. They are listed here as they appear in the WindowEvent class:
virtual void beforeTupleInsertionEvent(
WindowType & window, TupleType const & tuple, PartitionType const & partition) {}
virtual void afterTupleInsertionEvent(
WindowType & window, TupleType & tuple, PartitionType const & partition) {}
beforeTupleInsertionEvent is fired right before a tuple is inserted into the window, whereas afterTupleInsertionEvent is fired right after a tuple is inserted into the window.
Three events are specific to tumbling windows. They are listed here as they appear in the WindowEvent class:
virtual void beforeWindowFlushEvent(WindowType & window, PartitionType const & partition) {}
virtual void afterWindowFlushEvent(WindowType & window, PartitionType const & partition) {}
virtual void onEmptyWindowPunctEvent(WindowType & window) {}
beforeWindowFlushEvent is fired right before a tumbling window is flushed, whereas afterWindowFlushEvent is fired right after a tumbling window is flushed. onEmptyWindowPunctEvent is fired when a tumbling window has a punctuation-based (punct) eviction policy and a punctuation is received while there are no tuples in the window. This event lets the operator detect the situation and take an action (such as submitting a window punctuation).
Four events are specific to sliding windows. They are listed here as they appear in the WindowEvent class:
virtual void beforeTupleEvictionEvent(
WindowType & window, TupleType & tuple, PartitionType const & partition) {}
virtual void afterTupleEvictionEvent(
WindowType & window, TupleType & tuple, PartitionType const & partition) {}
virtual void onWindowTriggerEvent(
WindowType & window, PartitionType const & partition) {}
virtual void onWindowInitialFullEvent(
WindowType & window, PartitionType const & partition) {}
beforeTupleEvictionEvent is fired right before a tuple is evicted from a sliding window, whereas afterTupleEvictionEvent is fired right after a tuple is evicted from a sliding window. onWindowTriggerEvent is fired when a sliding window is triggered. Finally, onWindowInitialFullEvent is fired when a sliding window is full for the first time. This event is particularly useful when the window trigger events are to be ignored before the window becomes full for the first time (a common use case for operators like Aggregate).
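The use case of ignoring early triggers can be sketched with a handler that keeps a flag. InitialFullGateSketch is a hypothetical stand-in: a real handler would derive from the window's WindowEventType and receive the window and partition arguments that are shown in the signatures above, and a partitioned window would need one flag per partition.

```cpp
// Hypothetical handler that suppresses output until the window has filled once.
struct InitialFullGateSketch {
    bool windowFull;  // set once the window has been full for the first time
    int aggregates;   // number of triggers that produced output

    InitialFullGateSketch() : windowFull(false), aggregates(0) {}

    void onWindowInitialFullEvent() { windowFull = true; }

    void onWindowTriggerEvent() {
        if (!windowFull) {
            return;    // ignore triggers on a partially filled window
        }
        ++aggregates;  // stand-in for computing and submitting a result
    }
};
```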
Two events are specific to windows that have a partition eviction policy. They are listed here as they appear in the WindowEvent class:
virtual void onWindowPartitionEvictionSelection(WindowType& window,
    PartitionSelectionIterator const& begin,
    PartitionSelectionIterator const& end) {}
virtual void onWindowPartitionEviction(WindowType& window,
    PartitionIterator const& begin,
    PartitionIterator const& end) {}
The onWindowPartitionEvictionSelection event is fired to select partitions for eviction. The onWindowPartitionEviction event is fired before the selected partitions are evicted.
Event handlers are registered with the register<EventType>Handler family of functions. For the window events that are common to tumbling and sliding windows, these registration functions are defined in the Window base class, as follows:
WindowEventType * registerBeforeTupleInsertionHandler(WindowEventType * e);
WindowEventType * registerAfterTupleInsertionHandler(WindowEventType * e);
These functions take a pointer to a WindowEventType object, which represents the event handler for the particular type of event under consideration. If this pointer is NULL, then the current event handler (if any) is removed. The registration functions always return the previous value for the event handler object pointer. Initially all event handler object pointers are set to NULL, which means there are no event handlers registered.
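The return-previous-handler convention can be illustrated in isolation. In this sketch, EventHandlerSketch and RegistrationSketch are hypothetical stand-ins for a WindowEventType subclass and a window. Only the pointer bookkeeping is modeled, and no events are delivered.

```cpp
#include <cstddef>

struct EventHandlerSketch {};  // stand-in for a class derived from WindowEventType

class RegistrationSketch {
public:
    RegistrationSketch() : beforeInsertion_(NULL) {}  // no handler registered initially

    // Installs e as the handler and returns the previously installed one.
    // Passing NULL removes the current handler.
    EventHandlerSketch * registerBeforeTupleInsertionHandler(EventHandlerSketch * e) {
        EventHandlerSketch * previous = beforeInsertion_;
        beforeInsertion_ = e;
        return previous;
    }

private:
    EventHandlerSketch * beforeInsertion_;
};
```

Because the previous pointer is returned, a caller can temporarily replace a handler and restore the original one later.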
TumblingWindow
class, as
follows:WindowEventType * registerBeforeWindowFlushHandler(WindowEventType * e);
WindowEventType * registerAfterWindowFlushHandler(WindowEventType * e);
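For the window events that are specific to sliding windows, the registration functions are defined in the SlidingWindow class, as follows:
WindowEventType * registerBeforeTupleEvictionHandler(WindowEventType * e);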
WindowEventType * registerAfterTupleEvictionHandler(WindowEventType * e);
WindowEventType * registerOnWindowTriggerHandler(WindowEventType * e);
WindowEventType * registerOnWindowInitialFullHandler(WindowEventType * e);
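The registration contract described above (a NULL argument removes the handler, and the previous pointer is always returned) can be illustrated with a small stand-alone sketch. Handler and HandlerSlot are hypothetical names that model a single registration slot; they are not part of the windowing library.

```cpp
#include <cstddef>

// Hypothetical handler type; stands in for WindowEventType in this sketch.
struct Handler {
    virtual ~Handler() {}
};

// Hypothetical single-slot registry that mirrors the described contract.
class HandlerSlot {
public:
    HandlerSlot() : current_(NULL) {}  // initially no handler is registered

    // Installs e (NULL removes the handler) and returns the previous pointer.
    Handler* registerHandler(Handler* e) {
        Handler* previous = current_;
        current_ = e;
        return previous;
    }

    bool hasHandler() const { return current_ != NULL; }

private:
    Handler* current_;
};
```

Because the previous pointer is returned, a caller can install a temporary handler and later restore the original one by passing the saved pointer back to the same registration function.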
Accessing window data
There are three functions available for accessing data that is stored within windows. These
functions are defined in the Window
class, as follows:
StorageType & getWindowStorage();
DataType & getWindowData(PartitionType const & partition);
DataType & getWindowData();
The getWindowStorage
function returns a reference to the main window storage
area, which maps each partition to its own specific storage area that contains window data. For a
partition, its window data can be accessed through the getWindowData
function.
Also, a version of getWindowData
exists that does not take a
partition
parameter. This version is used for non-partitioned windows and creates
the default partition if it does not already exist.
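The relationship between the three accessors can be modeled as a map from partitions to per-partition containers. The sketch below is an assumption-laden illustration, not the library's implementation: MiniWindowStorage is a hypothetical class, the choice of std::map and std::deque is made only for the sketch, and here the partitioned accessor also creates a missing partition on first access.

```cpp
#include <deque>
#include <map>
#include <string>

// Hypothetical model of the storage layout described above; not the SPL classes.
template <typename T, typename Partition = int>
class MiniWindowStorage {
public:
    typedef std::deque<T> DataType;                    // data of one partition
    typedef std::map<Partition, DataType> StorageType; // partition -> data

    // Whole storage area: every partition with its data.
    StorageType& getWindowStorage() { return storage_; }

    // Data of one partition; operator[] creates it empty on first access.
    DataType& getWindowData(const Partition& partition) {
        return storage_[partition];
    }

    // Non-partitioned use: everything lives in a single default partition,
    // which is created on demand.
    DataType& getWindowData() { return storage_[Partition()]; }

private:
    StorageType storage_;
};
```

For a non-partitioned window, the first call to the parameterless getWindowData is what makes the default partition appear in the storage map.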
Window events do not involve multi-threading in the absence of time-based eviction and trigger
policies. Otherwise, each time-based policy of a window brings in a thread of its own. For instance,
a tumbling window introduces a thread when it is configured with a time-based eviction policy. A
sliding window can bring in up to two threads, as it might have a time-based trigger policy in
addition to a time-based eviction policy. A window with a time-based policy is always locked while an event is fired, so event handlers do not need to do any locking of their own. In a time-based eviction policy, the tuple eviction and window flush events (before/afterTupleEvictionEvent and before/afterWindowFlushEvent) are fired by the eviction thread. In a time-based trigger policy, the window trigger events (onWindowTriggerEvent) are fired by the trigger thread.
To access the window contents from outside an event handler, in the presence of
time-based eviction or trigger policies, the AutoWindowDataAcquirer
class can be
used. The usage is as follows:
{ AutoWindowDataAcquirer awa(window);
... // access the window data safely
}
The AutoWindowDataAcquirer
object, when constructed, calls
acquireData()
on the window, and when destructed (when it goes out of scope in the
example), it calls releaseData()
on the window. These operations reduce to a no-op
for windows that are free of time-based eviction and trigger policies. As a result, this pattern can
be safely used in the general case without worrying about unnecessary processing.
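The scope-based acquire and release behavior is an instance of the RAII idiom. The following stand-alone sketch shows the idea with hypothetical MiniWindow and MiniDataAcquirer classes; the real acquireData and releaseData functions presumably take and release a lock, which this single-threaded model replaces with a simple flag.

```cpp
// Hypothetical window stub; a real window would lock and unlock here.
class MiniWindow {
public:
    MiniWindow() : held_(false), acquisitions_(0) {}
    void acquireData() { held_ = true; ++acquisitions_; }
    void releaseData() { held_ = false; }
    bool isHeld() const { return held_; }
    int acquisitions() const { return acquisitions_; }
private:
    bool held_;
    int acquisitions_;
};

// Hypothetical RAII guard: acquires in the constructor, releases in the
// destructor, so the release also happens when the scope exits by exception.
class MiniDataAcquirer {
public:
    explicit MiniDataAcquirer(MiniWindow& window) : window_(window) {
        window_.acquireData();
    }
    ~MiniDataAcquirer() { window_.releaseData(); }
private:
    MiniDataAcquirer(const MiniDataAcquirer&);             // non-copyable
    MiniDataAcquirer& operator=(const MiniDataAcquirer&);  // non-assignable
    MiniWindow& window_;
};
```

Tying the release to the destructor means that early returns and exceptions inside the guarded scope cannot leave the window data acquired.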
Putting it all together
To summarize, here is a list of the steps that are involved in using the windowing library from within a primitive operator:
- Add member variables to your operator that represent the windows
- Extend your class from WindowEvent and add relevant member function overrides to handle window events
- Add initialization code to your operator's constructor to set up the windowing policies for your windows
- Add registration code to your operator's constructor to register the events of interest
- Add logic to your processing functions to insert items into the window
- Add logic to your event handler function to implement windowing functionality
Look at an end-to-end example of using the windowing library in a non-generic operator. This example is extended to a generic operator that handles various types of windows in Window handling.
For illustration purposes, consider a small application called SensorQuery. This application answers queries that are posed against the recent history of sensor readings. It contains two sources, one generating queries and another generating sensor readings. The goal is to create an operator that keeps the last n sensor readings and evaluates the incoming queries against these readings, outputting any matches that are found. The matching is based on the distance between the query position and the position of the sensor that originated the reading. In other words, this operator keeps a sliding window with a count-based eviction policy of size n on the sensor readings, and matches the incoming queries against that window by using nested loop processing. To make things a little more interesting, this operator also tracks how many times a sensor reading was matched during its stay within the window, and outputs popular readings when they are evicted from the window. A separate output stream is used for this purpose.
composite SensorQuery {
type
Position = float64 x, float64 y;
Reading = int32 sensorId, Position position, float64 value;
Query = int32 queryId, Position position, float64 distance;
Match = int32 queryId, int32 sensorId, float64 value;
graph
stream<Reading> Readings as O = Beacon() {
param iterations : 10000u;
output O : sensorId = (int32) (100.0*random()),
position = { x = random(), y = random() },
value = 1000.0 * random();
}
stream<Query> Queries as O = Beacon() {
logic state : mutable int32 id = -1;
param iterations : 10000u;
output O : queryId = ++id, distance = 0.05,
position = { x = random(), y = random() };
}
(stream<Match> Matches; stream<Reading> PopReadings)
= FindReadings(Readings; Queries)
{ param size : 100u; threshold : 3u; }
}
Beacon operators are used as simple workload generators for the sensor readings and the queries. An operator that is named FindReadings observes data on the sensor and query streams using two input ports. This operator produces the match results and the popular sensor readings, using two output streams on two different output ports. As part of this example, this operator can be implemented as a non-generic operator that employs the windowing library. For brevity, assume that the operator model is already set up, and focus on the C++ code for implementing the operator. Start with the header file that defines the operator, as follows:
#include <SPL/Runtime/Window/Window.h>
#pragma SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE
class MY_OPERATOR : public MY_BASE_OPERATOR,
public WindowEvent<MY_BASE_OPERATOR::IPort0Type*> {
// extend from the window event class
public:
MY_OPERATOR();
virtual ~MY_OPERATOR();
void process(Tuple const & tuple, uint32_t port);
// method to handle window eviction events
void beforeTupleEvictionEvent(Window<IPort0Type*> & window,
IPort0Type * & tuple, int const &);
private:
Mutex mutex_; // This is a stateful operator
typedef SlidingWindow<IPort0Type*> WindowType; // Use pointers
WindowType windowOfReadings_; // Keep the current readings
map<uintptr_t,uint32> matchedReadings_; // Keep match counts
};
#pragma SPL_NON_GENERIC_OPERATOR_HEADER_EPILOGUE
First, a member variable named windowOfReadings_ is defined. The type of the window is SlidingWindow<IPort0Type*>. In other words, the window stores pointers to tuples, and the tuple type corresponds to the sensor readings. The operator also keeps a map, called matchedReadings_, from sensor reading tuples (keyed by pointer value) to the number of times they matched a query. The map contains only tuples for which there was at least one match, and an entry is kept until its tuple is evicted from the window. This map is used to track the popular tuples.
Second, the operator class extends from WindowEvent<MY_BASE_OPERATOR::IPort0Type*> and implements only one event handler, namely beforeTupleEvictionEvent. This event is sufficient to implement the required functionality.
Now look at the implementation of this operator. The implementation can be described in pieces. The first piece consists of the constructor and the destructor.
#pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_PROLOGUE
MY_OPERATOR::MY_OPERATOR()
: windowOfReadings_(*this /*operator*/, 0 /*port*/,
CountWindowPolicy(getParameter_size()), // eviction policy
CountWindowPolicy(1)) // trigger policy
{ windowOfReadings_.registerBeforeTupleEvictionHandler(this); }
MY_OPERATOR::~MY_OPERATOR()
{ windowOfReadings_.deleteWindowObjects(); }
...
#pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_EPILOGUE
In the constructor, initialize the window by passing it the operator, the input port index, and the window policies. The eviction policy is count-based, with its size retrieved from the size parameter. The trigger policy is count-based with a size of 1. Also, register the operator with the window as the handler for beforeTupleEviction events.
In the destructor, call the window's deleteWindowObjects function to reclaim the memory of the tuples that are still in the window. This call is needed because the window stores pointers rather than tuple values.
Now look at the implementation of the processing logic for the tuples from the first port, representing sensor readings.
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
AutoPortMutex apm(mutex_, *this);
if(port==0) {
IPort0Type const & reading = static_cast<IPort0Type const&>(tuple);
windowOfReadings_.insert(new IPort0Type(reading));
} else { ... }
}
Use an auto port mutex to protect the processing logic, since the operator is stateful. The processing logic for the first port is as simple as inserting a new tuple that represents the just received sensor reading into the window. This action might in turn fire an eviction event.
Now look at the processing logic for the tuples from the second port that represents the queries:
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
    AutoPortMutex apm(mutex_, *this);
    if(port==0) { ... } else {
        IPort1Type const & query = static_cast<IPort1Type const&>(tuple);
        IPort1Type::position_type const & qPos = query.get_position();
        WindowType::DataType & content = windowOfReadings_.getWindowData();
        for(WindowType::DataType::iterator it=content.begin(); it!=content.end(); ++it) {
            IPort0Type const & reading = **it;
            IPort0Type::position_type const & rPos = reading.get_position();
            float64 distance = spl::sqrt(spl::pow(rPos.get_x()-qPos.get_x(), 2.0) +
                                         spl::pow(rPos.get_y()-qPos.get_y(), 2.0));
            if(distance<=query.get_distance()) {
                OPort0Type otuple(query.get_queryId(), reading.get_sensorId(),
                                  reading.get_value());
                submit(otuple, 0);
                uintptr_t ptr = reinterpret_cast<uintptr_t>(&reading);
                map<uintptr_t,uint32>::iterator mit = matchedReadings_.find(ptr);
                if(mit==matchedReadings_.end())
                    matchedReadings_.insert(std::make_pair(ptr,1));
                else
                    mit->second = mit->second+1;
            }
        }
    }
}
To process a query tuple, get access to the contents of the sensor readings window and iterate over it. For each sensor reading seen, compute the distance between the sensor reading and the query, and check whether this distance is less than or equal to the distance that is specified in the query. If so, create an output tuple that represents the match and submit it to the first output port. On a match, also record the reading in the matchedReadings_ map: insert its pointer with a count of 1 if it is not yet in the map, or increment its count if it is.
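The distance test and the match counting can be tried in isolation. The sketch below is a plain C++ approximation with hypothetical names (Position, withinDistance, recordMatch, MatchCounts); it uses standard library types instead of the generated tuple classes and SPL types.

```cpp
#include <cmath>
#include <map>

// Hypothetical plain struct standing in for the Position tuple type.
struct Position {
    double x;
    double y;
};

// True if the Euclidean distance between a and b is at most maxDistance.
inline bool withinDistance(const Position& a, const Position& b, double maxDistance) {
    const double dx = a.x - b.x;
    const double dy = a.y - b.y;
    return std::sqrt(dx * dx + dy * dy) <= maxDistance;
}

// Match counts keyed by the address of the matched reading.
typedef std::map<const void*, unsigned> MatchCounts;

// Records one more match for key and returns the updated count.
// operator[] value-initializes a missing entry to 0, so a single statement
// covers both the "first match" and the "already in the map" cases.
inline unsigned recordMatch(MatchCounts& counts, const void* key) {
    return ++counts[key];
}
```

Using operator[] in place of an explicit find followed by insert yields the same counts with less code; the explicit form makes the two cases more visible.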
Now look at the tuple eviction event handler for sensor readings.
void MY_OPERATOR::beforeTupleEvictionEvent(Window<IPort0Type*> & window,
IPort0Type * & reading, int const &) {
uintptr_t ptr = reinterpret_cast<uintptr_t>(reading);
map<uintptr_t,uint32>::iterator mit = matchedReadings_.find(ptr);
if(mit!=matchedReadings_.end()) {
uint32 numMatches = mit->second;
if(numMatches >= getParameter_threshold())
submit(*reading, 1);
matchedReadings_.erase(mit);
}
delete reading;
}
When a sensor reading tuple is about to be evicted, first check whether this tuple had any matches. If so, output the tuple on the second output port if its number of matches is greater than or equal to the value of the threshold parameter. In other words, if the sensor reading tuple is determined to be a popular one, output it. Whether or not the tuple is output, remove its entry from the matched readings map. Before the event handler returns, reclaim the memory for the tuple by deleting its pointer.
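To experiment with the overall logic outside of a Streams application, the operator can be approximated by a stand-alone model. The sketch below is only an approximation under stated assumptions: FindReadingsModel and the plain Reading, Query, and Match structs are hypothetical, the window is a std::deque of pointers with count-based eviction performed inline, and the two output ports are replaced by vectors.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <deque>
#include <map>
#include <vector>

struct Reading { int sensorId; double x; double y; double value; };
struct Query { int queryId; double x; double y; double distance; };
struct Match { int queryId; int sensorId; double value; };

// Hypothetical stand-alone model of FindReadings; no Streams runtime involved.
class FindReadingsModel {
public:
    FindReadingsModel(std::size_t size, unsigned threshold)
        : size_(size), threshold_(threshold) {
        assert(size > 0);
    }

    ~FindReadingsModel() {  // reclaim readings that were never evicted
        for (std::size_t i = 0; i < window_.size(); ++i) delete window_[i];
    }

    // Port 0: store the reading, evicting the oldest one if the window is full.
    void onReading(const Reading& r) {
        if (window_.size() == size_) evictOldest();
        window_.push_back(new Reading(r));
    }

    // Port 1: nested-loop match of the query against the window contents.
    void onQuery(const Query& q) {
        for (std::size_t i = 0; i < window_.size(); ++i) {
            const Reading* r = window_[i];
            const double dx = r->x - q.x;
            const double dy = r->y - q.y;
            if (std::sqrt(dx * dx + dy * dy) <= q.distance) {
                Match m = { q.queryId, r->sensorId, r->value };
                matches.push_back(m);
                ++matchCounts_[r];  // creates the entry at 0 on the first match
            }
        }
    }

    std::vector<Match> matches;    // stands in for output port 0
    std::vector<Reading> popular;  // stands in for output port 1

private:
    // Plays the role of the before-eviction handler plus the eviction itself.
    void evictOldest() {
        Reading* r = window_.front();
        std::map<const Reading*, unsigned>::iterator it = matchCounts_.find(r);
        if (it != matchCounts_.end()) {
            if (it->second >= threshold_) popular.push_back(*r);
            matchCounts_.erase(it);  // the address may be reused by a later reading
        }
        window_.pop_front();
        delete r;
    }

    FindReadingsModel(const FindReadingsModel&);             // non-copyable
    FindReadingsModel& operator=(const FindReadingsModel&);  // non-assignable

    std::size_t size_;
    unsigned threshold_;
    std::deque<Reading*> window_;
    std::map<const Reading*, unsigned> matchCounts_;
};
```

Erasing the map entry at eviction time matters because the map is keyed by address: once a reading is deleted, a newly allocated reading can receive the same address and would otherwise inherit a stale count.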