Window handling
The SPL code generation framework provides code generation APIs to ease the common tasks that are associated with window handling.
These include helper functions within the SPL::CodeGen module, which are used for generating code for window definitions and window initializations, and for validating window configurations.
The getWindowCppType function, given a window object and a tuple type as input, returns the C++ class name for the window. It also takes an optional third argument, which specifies the partition attribute type for partitioned windows. Here is an example:
// This is an <opname>_h.cgt file
<%
my $partitionByParam = $model->getParameterByName("partitionBy");
my $partitionByCppType = ...
my $window = $model->getInputPortAt(0)->getWindow();
my $windowCppType = ($partitionByParam)
? SPL::CodeGen::getWindowCppType($window, "IPort0Type*", "PartitionByType")
: SPL::CodeGen::getWindowCppType($window, "IPort0Type*");
...
%>
class MY_OPERATOR : public MY_BASE_OPERATOR {
public:
typedef <%=$partitionByCppType%> PartitionByType;
typedef <%=$windowCppType%> WindowType;
...
private:
WindowType _window;
};
The getWindowCppInitializer function, given a window object and a tuple type as input, returns a C++ code segment that can be used as a constructor initializer for the window. It also takes optional third and fourth arguments, which specify attribute getter function names for delta-based eviction and trigger policies. The following example illustrates a common use case:
// This is an <opname>_cpp.cgt file
<%
my $window = $model->getInputPortAt(0)->getWindow();
my $windowCppInitializer = SPL::CodeGen::getWindowCppInitializer($window, "IPort0Type*");
...
%>
MY_OPERATOR::MY_OPERATOR()
: MY_BASE_OPERATOR(), _window(<%=$windowCppInitializer%>)
{}
The getPartitionedWindowCppInitializer function, given a window object for a partitioned window, returns the C++ type initializer for the window. Partition eviction is supported only if an operator uses the getPartitionedWindowCppInitializer function. The SPL compiler warns at compile time if the getWindowCppInitializer function is used with a window that has a partition eviction policy.
The checkWindowConfiguration function, given an input port object and a reference to a window configuration list, returns whether the window configuration of the input port matches one of the configurations in the list. The validateWindowConfiguration function is similar, but rather than returning a value, it prints an error message and exits if there is no matching configuration. To create a configuration list reference, use the createWindowConfigurations function. This function takes a list of valid window configurations as input. Each window configuration is a hash data structure that contains one or more of the following mappings:
type => <window-type>, where the window type is one of the following constants: $SPL::Operator::Instance::Window::TUMBLING or SLIDING.
eviction => <eviction-policy>, where the eviction policy is one of the following constants: $SPL::Operator::Instance::Window::COUNT, DELTA, TIME, or PUNCT.
trigger => <trigger-policy>, where the trigger policy is one of the following constants: $SPL::Operator::Instance::Window::COUNT, DELTA, or TIME.
partitioned => <partition-status>, where the partition status is 0 or 1.
Each mapping in the hash data structure is a constraint, and the hash itself represents a conjunction of these constraints. For instance, if the hash contains both type and partitioned mappings, then it represents window configurations that match both constraints.
The following example checks whether the window is either a tumbling window of any kind or a sliding window with count-based eviction and trigger policies. It uses the createWindowConfigurations function to create a configuration list reference from two valid configurations: the first captures all tumbling windows, and the second captures sliding windows with count-based eviction and trigger policies. The validateWindowConfiguration function then checks the window on the first input port against this list.
my $confs = SPL::CodeGen::createWindowConfigurations(
{ type => $SPL::Operator::Instance::Window::TUMBLING }, # conf 1
{ type => $SPL::Operator::Instance::Window::SLIDING, # conf 2
eviction => $SPL::Operator::Instance::Window::COUNT,
trigger => $SPL::Operator::Instance::Window::COUNT});
my $iport = $model->getInputPortAt(0);
SPL::CodeGen::validateWindowConfiguration($iport, $confs);
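The matching rule described above (each hash is a conjunction of constraints, and a window is valid if it matches any hash in the list) can be modeled outside of Perl. The following C++ sketch is only an illustration of that rule, not the SPL::CodeGen implementation; the names WindowInfo, WindowConstraint, Constraint, matches, and matchesAny are hypothetical.

```cpp
#include <vector>

// Hypothetical stand-ins for the window properties named in the text.
enum class WindowType { Tumbling, Sliding };
enum class Policy { None, Count, Delta, Time, Punct };  // None: no such policy

// The actual configuration of a window on an input port.
struct WindowInfo {
    WindowType type;
    Policy eviction;
    Policy trigger;
    bool partitioned;
};

// An optional constraint: when absent, every actual value is allowed.
template <typename T>
struct Constraint {
    bool present = false;
    T value{};
    Constraint() = default;
    Constraint(T v) : present(true), value(v) {}
    bool allows(T actual) const { return !present || value == actual; }
};

// One configuration hash: each mapping that is present is a constraint.
struct WindowConstraint {
    Constraint<WindowType> type;
    Constraint<Policy> eviction;
    Constraint<Policy> trigger;
    Constraint<bool> partitioned;
};

// Conjunction: every constraint that is present must hold.
bool matches(const WindowInfo& w, const WindowConstraint& c) {
    return c.type.allows(w.type) && c.eviction.allows(w.eviction) &&
           c.trigger.allows(w.trigger) && c.partitioned.allows(w.partitioned);
}

// Disjunction: the window is accepted if any configuration in the list matches.
bool matchesAny(const WindowInfo& w, const std::vector<WindowConstraint>& confs) {
    for (const WindowConstraint& c : confs)
        if (matches(w, c)) return true;
    return false;
}
```

With a list that mirrors the two configurations in the Perl example, a tumbling window with any eviction policy is accepted, a sliding window with count-based eviction and trigger is accepted, and a sliding window with time-based eviction is rejected.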
Putting it All Together
Now take a second look at the SensorQuery application from Window handling. This example rewrites the FindReadings operator that is used in that application as a generic operator that handles all variations of the eviction policies for the sliding window it defines on the sensor readings. The new operator is named TableMatch, since it matches the queries that are received on its second input port against the table of readings that is maintained as a sliding window on its first input port. This operator also supports arbitrary match expressions. Similar to the FindReadings operator, it outputs matches as part of the query processing. It also outputs the popular readings on a second output port, as part of evictions, just like FindReadings does. For brevity, the operator model is not described. The interested reader can check the sample/feature/WindowLibrary sample that comes with the product installation for details. The sample contains the complete implementations of the FindReadings and TableMatch operators, together with the SampleQuery sample application.
Here is a sample use of the TableMatch operator in SPL.
(stream<Match> Matches; stream<Reading> PopReadings)
= TableMatch(Readings as R; Queries as Q) {
window
R : sliding, count(100);
param
match : sqrt(
pow(R.position.x - Q.position.x, 2.0) +
pow(R.position.y - Q.position.y, 2.0)) < Q.distance;
threshold : 3u;
}
Unlike the FindReadings operator, the TableMatch operator allows the specification of a window on its first input port and has a match parameter that accepts arbitrary Boolean conditions.
Now look at the implementation of the TableMatch operator. Start with the TableMatchCommon Perl module that checks for a few errors that are not covered by the operator model.
package TableMatchCommon;
use strict;
use warnings;
sub verify($) {
my ($model) = @_;
my $lhsInputPort = $model->getInputPortAt(0);
my $rhsOutputPort = $model->getOutputPortAt(1);
if($lhsInputPort->getCppTupleType() ne $rhsOutputPort->getCppTupleType()) {
SPL::CodeGen::exitln("Schema for the output port at index 1 does not match ".
"that of input port at index 0", $rhsOutputPort->getSourceLocation());
}
my $confs = SPL::CodeGen::createWindowConfigurations(
{ type => $SPL::Operator::Instance::Window::SLIDING,
trigger => $SPL::Operator::Instance::Window::COUNT});
SPL::CodeGen::validateWindowConfiguration($lhsInputPort, $confs);
}
1;
Two checks are performed by the module. First, it makes sure that the schema of the second output port, which is used to output popular readings, matches the schema of the first input port, which is used to receive the readings. Second, it employs the createWindowConfigurations and validateWindowConfiguration code generation helper routines to make sure that a valid window configuration is specified. In this case, all sliding windows with a count-based trigger policy are allowed. Recall that the TableMatch operator does not use trigger processing and that the trigger policy defaults to count(1) when it is omitted in the SPL operator invocation.
Now look at the header code generator template.
<%
use TableMatchCommon;
TableMatchCommon::verify($model);
my $windowLHS = $model->getInputPortAt(0)->getWindow();
my $windowCppType = SPL::CodeGen::getWindowCppType($windowLHS, "IPort0Type*");
my $windowEventType = SPL::CodeGen::getWindowEventCppType($windowLHS, "IPort0Type*");
%>
<%SPL::CodeGen::headerPrologue($model);%>
class MY_OPERATOR : public MY_BASE_OPERATOR {
public:
typedef <%=$windowCppType%> WindowType;
typedef <%=$windowEventType%> WindowEventType;
struct WindowHandler : WindowEventType {
WindowHandler(MY_OPERATOR & op) : op_(op) {}
void beforeTupleEvictionEvent(WindowType & window,
TupleType & tuple, PartitionType const &)
{ op_.evict(*tuple); }
MY_OPERATOR & op_;
};
MY_OPERATOR();
virtual ~MY_OPERATOR();
void process(Tuple const & tuple, uint32_t port);
void evict(IPort0Type & tuple);
private:
Mutex mutex_;
WindowType windowOfReadings_;
WindowHandler windowHandler_;
map<uintptr_t,uint32> matchedReadings_;
};
<%SPL::CodeGen::headerEpilogue($model);%>
This version is almost the same as the non-generic version, with a few differences. First, it uses the headerPrologue and headerEpilogue code generation helper routines, instead of the #pragma directives used for the non-generic version, since it is a generic operator that is written in mixed-mode C++/Perl. Second, it uses the getWindowCppType and getWindowEventCppType code generation helper routines to get the type of the window object and the window event class it wants to extend from. These types are then given the typedef names WindowType and WindowEventType for ease of use. Another difference is that, rather than making the operator extend from the window event class directly, it creates an inner struct that extends from the window event class. An instance of this struct, named windowHandler_, is defined as a member variable and routes any beforeTupleEviction events it receives to the evict member function of the operator. This code provides syntactic clarity [1] and can scale better in the case of multiple windows.
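The routing pattern can be tried outside of the SPL runtime. The following is a minimal sketch, assuming hypothetical MockWindowEvent, Reading, and MockOperator types that stand in for the generated classes; none of them are part of the SPL API, and the mock event method takes only the tuple rather than the window, tuple, and partition arguments shown in the template.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for the window event base class. In the template,
// the real base class is whatever getWindowEventCppType returns.
template <typename TuplePtr>
struct MockWindowEvent {
    virtual ~MockWindowEvent() {}
    // The default handler does nothing; subclasses override what they need.
    virtual void beforeTupleEvictionEvent(TuplePtr& /*tuple*/) {}
};

struct Reading { int id; };

class MockOperator {
public:
    typedef MockWindowEvent<Reading*> WindowEventType;

    // One small handler per window; it only forwards events to the operator.
    struct WindowHandler : WindowEventType {
        explicit WindowHandler(MockOperator& op) : op_(op) {}
        void beforeTupleEvictionEvent(Reading*& tuple) override {
            assert(tuple != nullptr);
            op_.evict(*tuple);
        }
        MockOperator& op_;
    };

    MockOperator() : windowHandler_(*this) {}

    // The operator keeps the actual eviction logic in one place.
    void evict(Reading& r) { evictedIds_.push_back(r.id); }

    WindowEventType& handler() { return windowHandler_; }
    const std::vector<int>& evictedIds() const { return evictedIds_; }

private:
    WindowHandler windowHandler_;
    std::vector<int> evictedIds_;
};
```

An operator with two windows could hold two such handler members, each forwarding to a different member function, which avoids the ambiguity of inheriting from two window event classes that declare the same event methods.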
Now look at the implementation code generator template, and in particular, the Perl code that sets up a few variables that are later used in the implementation.
<%
use TableMatchCommon;
TableMatchCommon::verify($model);
my $class = $model->getContext()->getClass();
my $inputPortLHS = $model->getInputPortAt(0);
my $inputPortRHS = $model->getInputPortAt(1);
# tuple names used in expressions
my $readingTuple = $inputPortLHS->getCppTupleName();
my $queryTuple = $inputPortRHS->getCppTupleName();
# parameter expressions
my $match = $model->getParameterByName("match");
$match = $match->getValueAt(0)->getCppExpression();
my $threshold = $model->getParameterByName("threshold");
$threshold = $threshold->getValueAt(0)->getCppExpression();
my $window = $inputPortLHS->getWindow();
my $windowCppInitializer = SPL::CodeGen::getWindowCppInitializer($window, "IPort0Type*");
# assignments used for output tuples
my $assignments = SPL::CodeGen::getOutputTupleCppInitializer($model->getOutputPortAt(0));
%>
<%SPL::CodeGen::implementationPrologue($model);%>
...
<%SPL::CodeGen::implementationEpilogue($model);%>
The $readingTuple and $queryTuple variables hold the tuple variable names that appear in C++ expressions for the first and the second input ports. The $match variable holds the match condition expression, and the $threshold variable holds the threshold that determines popular readings. The getWindowCppInitializer helper routine provides the expression list used to initialize the window object, and the getOutputTupleCppInitializer helper routine provides the expression list used to initialize the output tuples sent out for matches. With the Perl variables defined, the rest of the code is, surprisingly, briefer than the non-generic implementation. First look at the constructor and destructor.
MY_OPERATOR::MY_OPERATOR()
: windowOfReadings_(<%=$windowCppInitializer%>),
windowHandler_(*this)
{ windowOfReadings_.registerBeforeTupleEvictionHandler(&windowHandler_); }
MY_OPERATOR::~MY_OPERATOR()
{ windowOfReadings_.deleteWindowObjects(); }
Other than the use of the Perl variable $windowCppInitializer for the initialization of the window object, the constructor and the destructor are identical to the non-generic case.
Now look at the implementation of the processing logic.
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
AutoPortMutex apm(mutex_, *this);
if(port==0) {
IPort0Type const & reading = static_cast<IPort0Type const&>(tuple);
windowOfReadings_.insert(new IPort0Type(reading));
} else {
AutoWindowDataAcquirer<IPort0Type*> awda(windowOfReadings_);
IPort1Type const & <%=$queryTuple%> = static_cast<IPort1Type const&>(tuple);
WindowType::DataType & content = windowOfReadings_.getWindowData();
for(WindowType::DataType::const_iterator it=content.begin(); it!=content.end(); ++it) {
IPort0Type const & <%=$readingTuple%> = **it;
if(<%=$match%>) {
OPort0Type otuple(<%=$assignments%>);
submit(otuple, 0);
uintptr_t ptr = reinterpret_cast<uintptr_t>(*it);
map<uintptr_t,uint32>::iterator mit = matchedReadings_.find(ptr);
if(mit==matchedReadings_.end())
matchedReadings_.insert(std::make_pair(ptr,1));
else
mit->second = mit->second+1;
}
}
}
}
The processing logic for the first port, that is, for the sensor readings, is the same as in the non-generic case. The processing for the second port, that is, for the queries, is very similar, with a few differences. First, it defines tuple aliases with the names held in $queryTuple and $readingTuple to create the right syntactic context for the output assignment expressions and the match condition expression. Second, $match is used in place of the match condition expression, which makes the operator generic with respect to the match condition. Third, $assignments is used as the initializer for the output tuple that is created for matches, which makes the operator generic with respect to the output assignments. Finally, the generic version of the operator employs an AutoWindowDataAcquirer that guards the access to the window contents. This guard is needed because, unlike the non-generic version, this operator handles windows that have time-based eviction policies, in which case an eviction event might take place while the operator is processing a query.
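The guard follows the usual scoped acquire and release idiom. The sketch below illustrates only that idiom, under the assumption of a hypothetical MockWindow with acquireData and releaseData methods and a hypothetical ScopedDataAcquirer class; it does not reproduce how AutoWindowDataAcquirer is implemented.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical window that records whether its data is currently acquired.
// A real window would block concurrent evictions while it is acquired.
class MockWindow {
public:
    void acquireData() { ++holders_; }
    void releaseData() { --holders_; }
    bool dataAcquired() const { return holders_ > 0; }
    std::vector<int>& data() { return data_; }

private:
    int holders_ = 0;
    std::vector<int> data_;
};

// Acquires the window data on construction and releases it on destruction,
// so the release happens on every exit path of the enclosing scope.
class ScopedDataAcquirer {
public:
    explicit ScopedDataAcquirer(MockWindow& w) : w_(w) { w_.acquireData(); }
    ~ScopedDataAcquirer() { w_.releaseData(); }
    ScopedDataAcquirer(const ScopedDataAcquirer&) = delete;
    ScopedDataAcquirer& operator=(const ScopedDataAcquirer&) = delete;

private:
    MockWindow& w_;
};

// Scans the window under the guard and returns the index of the first
// element equal to wanted, or -1. wasGuarded reports the state seen inside.
int findFirst(MockWindow& w, int wanted, bool& wasGuarded) {
    ScopedDataAcquirer guard(w);
    wasGuarded = w.dataAcquired();
    const std::vector<int>& content = w.data();
    for (std::size_t i = 0; i < content.size(); ++i)
        if (content[i] == wanted)
            return static_cast<int>(i);  // early return still releases
    return -1;
}
```

Declaring the guard as the first statement of the scope that iterates over the window contents, as the process function does in its else branch, keeps the guarded region as small as the scan itself.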
Finally, look at the eviction event handler.
void MY_OPERATOR::evict(IPort0Type & reading) {
uintptr_t ptr = reinterpret_cast<uintptr_t>(&reading);
map<uintptr_t,uint32>::iterator mit = matchedReadings_.find(ptr);
if(mit!=matchedReadings_.end()) {
uint32 numMatches = mit->second;
if(numMatches >= <%=$threshold%>)
submit(reading, 1);
matchedReadings_.erase(mit);
}
delete &reading;
}
The only difference between the generic and non-generic versions is the way the popularity threshold parameter is accessed.
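The bookkeeping shared by the process and evict functions (count matches per reading, keyed by the address of the reading, then compare against the threshold at eviction) can be isolated as follows. This is a sketch under stated assumptions: MatchTracker and Reading are hypothetical names, and the threshold is passed as a plain argument instead of being substituted by the code generator.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>

struct Reading { int id; };

class MatchTracker {
public:
    // Called once per match; the address of the reading identifies it.
    void recordMatch(const Reading* r) {
        // operator[] value-initializes a missing counter to 0 before ++.
        ++counts_[reinterpret_cast<std::uintptr_t>(r)];
    }

    // Called when a reading is evicted. Returns true if the reading reached
    // the threshold, and always forgets the reading afterwards.
    bool onEvict(const Reading* r, std::uint32_t threshold) {
        std::map<std::uintptr_t, std::uint32_t>::iterator it =
            counts_.find(reinterpret_cast<std::uintptr_t>(r));
        if (it == counts_.end())
            return false;  // never matched, so not popular
        bool popular = it->second >= threshold;
        counts_.erase(it);
        return popular;
    }

    std::size_t tracked() const { return counts_.size(); }

private:
    std::map<std::uintptr_t, std::uint32_t> counts_;
};
```

Erasing the entry at eviction matters because the key is an address: once the tuple is deleted, a later tuple can be allocated at the same address and would otherwise inherit a stale count.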
[1] The TupleType used in the beforeTupleEvictionEvent member of the WindowHandler class is not the one defined in the operator class (that would be SlidingWindow<...>) but rather comes from the WindowEventType base class (that is, Window<...>).