Window handling

The SPL code generation framework provides code generation APIs to ease the common tasks that are associated with window handling.

These APIs include helper functions in the SPL::CodeGen module that generate code for window definitions and window initializations and that validate window configurations.

The getWindowCppType function, given a window object and a tuple type as input, returns the C++ class name for the window. It also takes an optional third argument, which specifies the partition attribute type for partitioned windows. Here is an example:

// This is an <opname>_h.cgt file
<%
  my $partitionByParam = $model->getParameterByName("partitionBy");
  my $partitionByCppType = ...
  my $window = $model->getInputPortAt(0)->getWindow();
  my $windowCppType = ($partitionByParam)
    ? SPL::CodeGen::getWindowCppType($window, "IPort0Type*", "PartitionByType")
    : SPL::CodeGen::getWindowCppType($window, "IPort0Type*");
  ...
%>
class MY_OPERATOR : public MY_BASE_OPERATOR {
public:
  typedef <%=$partitionByCppType%> PartitionByType;
  typedef <%=$windowCppType%> WindowType;
  ...  
private:
  WindowType _window;
};

The getWindowCppInitializer function, given a window object and a tuple type as input, returns a C++ code segment that can be used as a constructor initializer for the window. It also takes optional third and fourth arguments, which specify attribute getter function names for delta-based eviction and trigger policies. The following example illustrates a common use case:

// This is an <opname>_cpp.cgt file
<%
  my $window = $model->getInputPortAt(0)->getWindow();
  my $windowCppInitializer = SPL::CodeGen::getWindowCppInitializer($window, "IPort0Type*");
  ...
%>
MY_OPERATOR::MY_OPERATOR()
  : MY_BASE_OPERATOR(), _window(<%=$windowCppInitializer%>)
{}

The getPartitionedWindowCppInitializer function, given a window object for a partitioned window, returns the C++ type initializer for the window. Partition eviction is supported only if an operator uses the getPartitionedWindowCppInitializer function. The SPL compiler warns at compile time if the getWindowCppInitializer function is used with a window that has a partition eviction policy.

The checkWindowConfiguration function, given an input port object and a reference to a window configuration list, returns whether the window configuration of the input port is among the configurations in the list. The validateWindowConfiguration function is similar, but rather than returning a value, it prints an error message and exits if there is no matching configuration. To create a configuration list reference, use the createWindowConfigurations function. This function takes a list of valid window configurations as input. Each window configuration is a hash data structure that contains one or more of the following mappings:

  • type => <window-type>, where the window type is one of the following constants: $SPL::Operator::Instance::Window::TUMBLING or SLIDING.
  • eviction => <eviction-policy>, where the eviction policy is one of the following constants: $SPL::Operator::Instance::Window::COUNT, DELTA, TIME, or PUNCT.
  • trigger => <trigger-policy>, where the trigger policy is one of the following constants: $SPL::Operator::Instance::Window::COUNT, DELTA, or TIME.
  • partitioned => <partition-status>, where the partition status is 0 or 1.

Each mapping in the hash data structure is a constraint, and the hash itself represents a conjunction of these constraints. For instance, if the hash contains both type and partitioned mappings, then it represents window configurations that match both constraints.

The following example validates that the window is either a tumbling window of any kind or a sliding window with count-based eviction and trigger policies. It uses the createWindowConfigurations function to create a configuration list reference from a list of two valid configurations. The first configuration captures all tumbling windows. The second configuration captures sliding windows with count-based eviction and trigger policies.

my $confs = SPL::CodeGen::createWindowConfigurations(
        { type     => $SPL::Operator::Instance::Window::TUMBLING }, # conf 1
        { type     => $SPL::Operator::Instance::Window::SLIDING,    # conf 2
          eviction => $SPL::Operator::Instance::Window::COUNT,
          trigger  => $SPL::Operator::Instance::Window::COUNT});
my $iport = $model->getInputPortAt(0);
SPL::CodeGen::validateWindowConfiguration($iport, $confs);
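
The matching rule described above can be illustrated outside of the code generation framework. The following is a minimal C++ sketch, not the SPL::CodeGen implementation: the WindowInfo, WindowConstraint, and Constraint types and the matches and matchesAny functions are hypothetical names introduced only for this illustration. It assumes that a configuration entry matches when every constraint that is set holds, and that a window is valid when it matches at least one entry in the list.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins for window properties; these are not the SPL Perl constants.
enum class WindowType { Tumbling, Sliding };
enum class Policy { None, Count, Delta, Time, Punct };

// The actual configuration of a window on an input port.
struct WindowInfo {
  WindowType type;
  Policy eviction;
  Policy trigger;  // Policy::None for tumbling windows, which have no trigger policy
  bool partitioned;
};

// An optional constraint: when it is not set, it accepts any value.
template <typename T>
struct Constraint {
  bool set;
  T value;
  Constraint() : set(false), value() {}
  Constraint(T v) : set(true), value(v) {}
  bool accepts(T actual) const { return !set || value == actual; }
};

// One configuration entry, analogous to one hash in the configuration list.
struct WindowConstraint {
  Constraint<WindowType> type;
  Constraint<Policy> eviction;
  Constraint<Policy> trigger;
  Constraint<bool> partitioned;
};

// Conjunction: the window must satisfy every constraint that is set.
bool matches(const WindowInfo& w, const WindowConstraint& c) {
  return c.type.accepts(w.type) && c.eviction.accepts(w.eviction) &&
         c.trigger.accepts(w.trigger) && c.partitioned.accepts(w.partitioned);
}

// Disjunction: the window is valid if it matches at least one entry.
bool matchesAny(const WindowInfo& w, const std::vector<WindowConstraint>& confs) {
  for (const WindowConstraint& c : confs)
    if (matches(w, c)) return true;
  return false;
}
```

With a list that mirrors the two configurations above, a tumbling window with any eviction policy and a sliding window with count-based eviction and trigger policies both pass, while a sliding window with time-based eviction does not.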

Putting it All Together

Now take a second look at the SensorQuery application from Window handling. This example rewrites the FindReadings operator that is used in that application as a generic operator that handles all variations of the eviction policies for the sliding window it defines on the sensor readings. The name of this new operator is TableMatch, since it matches the queries that are received on its second input port against the table of readings that is maintained as a sliding window on its first input port. This operator also supports arbitrary match expressions. Similar to the FindReadings operator, it outputs matches as part of the query processing. It also outputs the popular readings on a second output port, as part of evictions, just like FindReadings does.

For brevity, the operator model is not described. For details, see the sample/feature/WindowLibrary sample that comes with the product installation. The sample contains the complete implementations of the FindReadings and TableMatch operators, together with the SensorQuery sample application. Here is a sample use of the TableMatch operator in SPL.

(stream<Match> Matches; stream<Reading> PopReadings)
 = TableMatch(Readings as R; Queries as Q) {
 window
  R : sliding, count(100);
 param
  match : sqrt(
   pow(R.position.x - Q.position.x, 2.0) +
   pow(R.position.y - Q.position.y, 2.0)) < Q.distance;
  threshold : 3u;
}

Unlike the FindReadings operator, the TableMatch operator allows the specification of a window on its first input port and has a match parameter that accepts arbitrary Boolean conditions.

Now look at the implementation of the TableMatch operator. Start with the TableMatchCommon Perl module that checks for a few errors that are not covered by the operator model.

package TableMatchCommon;
use strict;
use warnings;
sub verify($) {
  my ($model) = @_;
  my $lhsInputPort = $model->getInputPortAt(0);
  my $rhsOutputPort = $model->getOutputPortAt(1);
  if($lhsInputPort->getCppTupleType() ne $rhsOutputPort->getCppTupleType()) {
    SPL::CodeGen::exitln("Schema for the output port at index 1 does not match ".
      "that of input port at index 0", $rhsOutputPort->getSourceLocation());
  }
  my $confs = SPL::CodeGen::createWindowConfigurations(
         { type    => $SPL::Operator::Instance::Window::SLIDING,
           trigger => $SPL::Operator::Instance::Window::COUNT});
  SPL::CodeGen::validateWindowConfiguration($lhsInputPort, $confs);
}
1; # a Perl module must return a true value

Two checks are performed by the module. First, it makes sure that the schema of the second output port that is used to output popular readings matches the schema of the first input port that is used to receive the readings. Second, it employs the createWindowConfigurations and validateWindowConfiguration code generation helper routines to make sure that a valid window configuration is specified. In this case, all sliding windows with a count-based trigger policy are allowed. Recall that the TableMatch operator does not use trigger processing and the trigger policy defaults to count(1) when it is omitted in the SPL operator invocation.

Now look at the header code generator template.

<%
  use TableMatchCommon;
  TableMatchCommon::verify($model);
  my $windowLHS = $model->getInputPortAt(0)->getWindow();
  my $windowCppType = SPL::CodeGen::getWindowCppType($windowLHS, "IPort0Type*");
  my $windowEventType = SPL::CodeGen::getWindowEventCppType($windowLHS, "IPort0Type*");
%>
<%SPL::CodeGen::headerPrologue($model);%>
  class MY_OPERATOR : public MY_BASE_OPERATOR {
  public:
   typedef <%=$windowCppType%> WindowType;
   typedef <%=$windowEventType%> WindowEventType;
   struct WindowHandler : WindowEventType {
     WindowHandler(MY_OPERATOR & op) : op_(op) {}
     void beforeTupleEvictionEvent(WindowType & window,
       TupleType & tuple, PartitionType const &)
       { op_.evict(*tuple); }
     MY_OPERATOR & op_;
   }; 
   MY_OPERATOR();
   virtual ~MY_OPERATOR();
   void process(Tuple const & tuple, uint32_t port);
   void evict(IPort0Type & tuple);
  private:
   Mutex mutex_;
   WindowType windowOfReadings_;
   WindowHandler windowHandler_;
   map<uintptr_t,uint32> matchedReadings_;
};
<%SPL::CodeGen::headerEpilogue($model);%>

This version is almost the same as the non-generic version, with a few differences. First, it uses the headerPrologue and headerEpilogue code generation helper routines instead of the #pragma directives used for the non-generic version, since it is a generic operator that is written in mixed-mode C++/Perl. Second, it uses the getWindowCppType and getWindowEventCppType code generation helper routines to get the type of the window object and the type of the window event class that it extends, and then defines the WindowType and WindowEventType typedefs for ease of use. Third, rather than making the operator extend the window event class directly, it creates an inner struct that extends the window event class. An instance of this struct, named windowHandler_, is defined as a member variable and routes any beforeTupleEvictionEvent calls it receives to the evict member function of the operator. This arrangement provides syntactic clarity (see footnote 1) and can scale better in the case of multiple windows.
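
The inner handler arrangement can be tried out in isolation. The following is a minimal, self-contained C++ sketch of the pattern only. The EvictionListener, CountWindow, and Operator types are hypothetical, simplified stand-ins that were invented for this illustration; they are not the SPL window library classes, and the real window event interface has a different signature.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical listener interface; not the SPL window event class.
template <typename T>
struct EvictionListener {
  virtual ~EvictionListener() {}
  virtual void beforeEviction(T& item) = 0;
};

// Hypothetical count-based sliding window that notifies a listener before it evicts.
template <typename T>
class CountWindow {
public:
  explicit CountWindow(std::size_t capacity) : capacity_(capacity), listener_(nullptr) {
    assert(capacity > 0);
  }
  void registerListener(EvictionListener<T>* listener) { listener_ = listener; }
  void insert(const T& item) {
    if (items_.size() == capacity_) {
      // The window is full: report the oldest item, then drop it.
      if (listener_) listener_->beforeEviction(items_.front());
      items_.pop_front();
    }
    items_.push_back(item);
  }
private:
  std::size_t capacity_;
  EvictionListener<T>* listener_;
  std::deque<T> items_;
};

// The operator does not implement the listener itself. It owns an inner
// handler that forwards each event to an ordinary member function.
class Operator {
public:
  struct Handler : EvictionListener<int> {
    explicit Handler(Operator& op) : op_(op) {}
    void beforeEviction(int& item) override { op_.evict(item); }
    Operator& op_;
  };
  explicit Operator(std::size_t capacity) : window_(capacity), handler_(*this) {
    window_.registerListener(&handler_);
  }
  Operator(const Operator&) = delete;  // the handler refers back to this object
  Operator& operator=(const Operator&) = delete;
  void process(int reading) { window_.insert(reading); }
  void evict(int reading) { evicted_.push_back(reading); }
  const std::vector<int>& evicted() const { return evicted_; }
private:
  CountWindow<int> window_;
  Handler handler_;
  std::vector<int> evicted_;
};
```

An operator with several windows could hold one handler per window in the same way, each forwarding to a different member function, which is harder to express when the operator class itself extends the event class.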

Now look at the implementation code generator template, and in particular, the Perl code that sets up a few variables that are later used in the implementation.

<%
  use TableMatchCommon;
  TableMatchCommon::verify($model);
  my $class = $model->getContext()->getClass();
  my $inputPortLHS = $model->getInputPortAt(0);
  my $inputPortRHS = $model->getInputPortAt(1);
  # tuple names used in expressions
  my $readingTuple = $inputPortLHS->getCppTupleName();
  my $queryTuple = $inputPortRHS->getCppTupleName();
  # parameter expressions
  my $match = $model->getParameterByName("match");
  $match = $match->getValueAt(0)->getCppExpression();
  my $threshold = $model->getParameterByName("threshold");
  $threshold = $threshold->getValueAt(0)->getCppExpression();
  my $window = $inputPortLHS->getWindow();
  my $windowCppInitializer = SPL::CodeGen::getWindowCppInitializer($window, "IPort0Type*");
  # assignments used for output tuples
  my $assignments = SPL::CodeGen::getOutputTupleCppInitializer($model->getOutputPortAt(0));
%>
<%SPL::CodeGen::implementationPrologue($model);%>
...
<%SPL::CodeGen::implementationEpilogue($model);%>

The $readingTuple and $queryTuple variables hold the tuple variable names that appear in C++ expressions for the first and the second input ports. The $match variable holds the match condition expression, and the $threshold variable holds the threshold that determines popular readings. The getWindowCppInitializer helper routine returns the expression list that is used to initialize the window object, and the getOutputTupleCppInitializer helper routine returns the expression list that is used to initialize the output tuples sent out for matches. With the Perl variables defined, the rest of the code is, surprisingly, briefer than the non-generic implementation.

First look at the constructor and destructor.

MY_OPERATOR::MY_OPERATOR()
  : windowOfReadings_(<%=$windowCppInitializer%>),
    windowHandler_(*this)
  { windowOfReadings_.registerBeforeTupleEvictionHandler(&windowHandler_); }
MY_OPERATOR::~MY_OPERATOR()
  { windowOfReadings_.deleteWindowObjects(); }

Other than the use of the Perl variable $windowCppInitializer for the initialization of the window object, the constructor and the destructor are identical to the non-generic case.

Now look at the implementation of the processing logic.

void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
 AutoPortMutex apm(mutex_, *this);
 if(port==0) {
  IPort0Type const & reading = static_cast<IPort0Type const&>(tuple);
  windowOfReadings_.insert(new IPort0Type(reading));
 } else {
  AutoWindowDataAcquirer<IPort0Type*> awda(windowOfReadings_);
  IPort1Type const & <%=$queryTuple%> = static_cast<IPort1Type const&>(tuple);
  WindowType::DataType & content = windowOfReadings_.getWindowData();
  for(WindowType::DataType::const_iterator it=content.begin(); it!=content.end(); ++it) {
   IPort0Type const & <%=$readingTuple%> = **it;
   if(<%=$match%>) {
    OPort0Type otuple(<%=$assignments%>);
    submit(otuple, 0);
    uintptr_t ptr = reinterpret_cast<uintptr_t>(*it);
    map<uintptr_t,uint32>::iterator mit = matchedReadings_.find(ptr);
    if(mit==matchedReadings_.end())
       matchedReadings_.insert(std::make_pair(ptr,1));
    else
       mit->second = mit->second+1;
   }
  }
 }
}

The processing logic for the first port, that is, for the sensor readings, is the same as in the non-generic case. The processing for the second port, that is, for the queries, is very similar, with a few differences. First, it defines tuple aliases with the names held in $queryTuple and $readingTuple to create the right syntactic context for the output assignment expressions and the match condition expression. Second, $match is used in place of the match condition expression, which makes the operator generic with respect to the match condition. Third, $assignments is used as the initializer for the output tuple that is created for matches, which makes the operator generic with respect to the output assignments. Finally, the generic version of the operator employs an AutoWindowDataAcquirer that guards the access to the window contents. This guard is needed because, unlike the non-generic version, this operator handles windows with time-based eviction policies, in which case an eviction event might take place while a query is being processed.
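
The idea behind a scoped guard of this kind can be sketched in plain C++. The following example is an illustration under stated assumptions, not the SPL AutoWindowDataAcquirer implementation: the GuardedWindow and ScopedDataAcquirer classes and the countMatches function are hypothetical, and the sketch assumes that the window protects its contents with a mutex that is acquired for the duration of a query scan.

```cpp
#include <cassert>
#include <cstdlib>
#include <mutex>
#include <vector>

// Hypothetical window whose contents are protected by a mutex; not the SPL class.
class GuardedWindow {
public:
  GuardedWindow() : held_(false) {}
  void acquireData() { mutex_.lock(); held_ = true; }
  void releaseData() { held_ = false; mutex_.unlock(); }
  // Only meaningful to the thread that holds the data; kept for illustration.
  bool isHeld() const { return held_; }
  // Inserts take the same mutex, so they cannot interleave with a scan.
  void insert(int reading) {
    std::lock_guard<std::mutex> lock(mutex_);
    data_.push_back(reading);
  }
  // Call only while the data is acquired.
  std::vector<int>& data() { return data_; }
private:
  std::mutex mutex_;
  bool held_;
  std::vector<int> data_;
};

// Acquires the window data on construction and releases it on destruction,
// so every exit path out of the enclosing scope releases the data.
class ScopedDataAcquirer {
public:
  explicit ScopedDataAcquirer(GuardedWindow& window) : window_(window) {
    window_.acquireData();
  }
  ~ScopedDataAcquirer() { window_.releaseData(); }
  ScopedDataAcquirer(const ScopedDataAcquirer&) = delete;
  ScopedDataAcquirer& operator=(const ScopedDataAcquirer&) = delete;
private:
  GuardedWindow& window_;
};

// Counts the readings that lie within `distance` of `query` while the guard is held.
int countMatches(GuardedWindow& window, int query, int distance) {
  ScopedDataAcquirer guard(window);
  assert(window.isHeld());
  int matches = 0;
  for (int reading : window.data())
    if (std::abs(reading - query) < distance) ++matches;
  return matches;
}
```

Because the release happens in the destructor, the window data is released when countMatches returns, including on an early return or an exception.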

Finally, look at the eviction event handler.

void MY_OPERATOR::evict(IPort0Type & reading) {
 uintptr_t ptr = reinterpret_cast<uintptr_t>(&reading);
 map<uintptr_t,uint32>::iterator mit = matchedReadings_.find(ptr);
 if(mit!=matchedReadings_.end()) {
  uint32 numMatches = mit->second;
  if(numMatches >= <%=$threshold%>)
   submit(reading, 1);
  matchedReadings_.erase(mit);
 }
 delete &reading;
}

The only difference between the generic and non-generic versions is the way the popularity threshold parameter is accessed.
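
The bookkeeping that the evict function performs can be exercised on its own. The following is a minimal C++ sketch with hypothetical names (Reading, PopularityTracker, recordMatch, and tracked are not part of the sample). It assumes, as in the listings above, that match counts are keyed by the address of the reading and that a reading is popular when its count reaches the threshold.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

// Hypothetical reading type used only for this illustration.
struct Reading { int id; };

class PopularityTracker {
public:
  explicit PopularityTracker(std::uint32_t threshold) : threshold_(threshold) {}

  // Records one more match for the reading at this address.
  void recordMatch(const Reading* reading) { ++matches_[key(reading)]; }

  // Called just before a reading leaves the window. Returns true if the
  // reading reached the threshold, and forgets its count in either case.
  bool evict(const Reading* reading) {
    std::map<std::uintptr_t, std::uint32_t>::iterator it = matches_.find(key(reading));
    if (it == matches_.end()) return false;  // never matched
    bool popular = it->second >= threshold_;
    matches_.erase(it);
    return popular;
  }

  std::size_t tracked() const { return matches_.size(); }

private:
  static std::uintptr_t key(const Reading* reading) {
    return reinterpret_cast<std::uintptr_t>(reading);
  }
  std::uint32_t threshold_;
  std::map<std::uintptr_t, std::uint32_t> matches_;
};
```

Erasing the entry on every eviction, popular or not, matters with address keys: after the reading is deleted, a later allocation can reuse the same address, and a stale count would then be attributed to the new reading.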

1 The TupleType used in the beforeTupleEvictionEvent member function of the WindowHandler struct is not the one defined in the operator class (that would be SlidingWindow<...>) but rather the one that comes from the WindowEventType base class (that is, Window<...>).