Implementing non-blocking checkpointing in stateful primitive operators

A stateful C++ or Java primitive operator can implement non-blocking checkpointing of operator state data to reduce the time that the tuple flow is blocked during checkpointing.

When a consistent state of a consistent region is established, the tuple flow is temporarily blocked until the operators in the consistent region are drained and checkpointed. With non-blocking checkpointing, the tuple flow can be resumed while an internal SPL Runtime thread checkpoints operator state data in the background.

By default, a stateful operator in a consistent region that implements the SPL StateHandler interface is checkpointed in a blocking manner:
  1. The tuple flow is blocked.
  2. The SPL Runtime invokes the operator's StateHandler::drain() callback function.
  3. The SPL Runtime invokes the operator's StateHandler::checkpoint() callback function.
  4. The tuple flow is resumed.
If non-blocking checkpointing is enabled for an operator, checkpointing of operator state data is done as follows:
  1. The tuple flow is blocked.
  2. The SPL Runtime invokes the operator's StateHandler::drain() callback function.
  3. The SPL Runtime invokes the operator's StateHandler::prepareForNonBlockingCheckpoint() callback function.
  4. The tuple flow is resumed. Meanwhile, the SPL Runtime uses an internal thread to execute the operator’s StateHandler::checkpoint() callback function in the background. The SPL Runtime tracks this background checkpointing internally.
  5. The background thread finishes checkpointing the operator state.
With non-blocking checkpointing, tuple processing can overlap with checkpointing, which reduces the blocking time of the tuple flow.
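The two sequences can be compared in a minimal standalone C++ sketch. It does not use the SPL Runtime: LoggingOperator and runCheckpoint() are hypothetical names, and std::async stands in for the internal SPL Runtime thread. Each callback only records its name, so the order of events is visible.

```cpp
#include <cassert>
#include <future>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for an operator that implements the StateHandler
// callbacks. Each callback only records its name, so the call order is visible.
class LoggingOperator {
public:
    void drain() { log("drain"); }
    void prepareForNonBlockingCheckpoint() { log("prepare"); }
    void checkpoint() { log("checkpoint"); }

    // Thread-safe, because checkpoint() may run on a background thread.
    void log(const std::string & event) {
        std::lock_guard<std::mutex> lock(mutex_);
        events_.push_back(event);
    }

    std::vector<std::string> events() {
        std::lock_guard<std::mutex> lock(mutex_);
        return events_;
    }

private:
    std::mutex mutex_;
    std::vector<std::string> events_;
};

// Simulates the runtime side of both sequences. The returned future becomes
// ready when checkpointing of the operator state has finished.
std::future<void> runCheckpoint(LoggingOperator & op, bool nonBlocking) {
    op.log("block");                        // 1. the tuple flow is blocked
    op.drain();                             // 2. drain()
    if (!nonBlocking) {
        op.checkpoint();                    // 3. checkpoint() while still blocked
        op.log("resume");                   // 4. the tuple flow is resumed
        std::promise<void> done;
        done.set_value();
        return done.get_future();
    }
    op.prepareForNonBlockingCheckpoint();   // 3. prepare while still blocked
    op.log("resume");                       // 4. the tuple flow is resumed ...
    // ... while a background thread runs checkpoint() (step 5 when it returns)
    return std::async(std::launch::async, [&op] { op.checkpoint(); });
}
```

In the non-blocking case, "resume" is recorded before "checkpoint", which is the overlap that the steps above describe; in the blocking case the order is reversed.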
To enable non-blocking checkpointing, a primitive operator must meet these requirements:
  • The operator implementation must call the ConsistentRegionContext::enableNonBlockingCheckpoint() function in the operator's constructor.
  • The operator must implement the StateHandler::prepareForNonBlockingCheckpoint() and StateHandler::checkpoint() callback functions.

The StateHandler::prepareForNonBlockingCheckpoint() function is called after the StateHandler::drain() function. It prepares the operator state so that a StateHandler::checkpoint() callback can later checkpoint that state while new tuples are being processed. This preparation is needed because processing new tuples usually reads and updates the latest version of the operator state, whereas background checkpointing must read the version of the operator state as of the time StateHandler::prepareForNonBlockingCheckpoint() was called.

With the StateHandler::prepareForNonBlockingCheckpoint() and StateHandler::checkpoint() callback interface, an operator can implement non-blocking checkpointing in various ways. One simple approach is to make a copy of the operator state in StateHandler::prepareForNonBlockingCheckpoint() and checkpoint that copy in StateHandler::checkpoint(). This way, background checkpointing reads the copied operator state data, while tuple processing reads and updates the original state. Thus, both activities can make progress concurrently without any synchronization with each other. Another approach is to serialize the operator state into a memory buffer (such as SPL::NativeByteBuffer) in StateHandler::prepareForNonBlockingCheckpoint() and to checkpoint the serialized data in StateHandler::checkpoint().
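The copy approach can be sketched in standalone C++ as follows. The class name CopyingStringState and the length-prefixed text format are assumptions made for illustration; a std::ostream stands in for the SPL Checkpoint object, and the class does not derive from the real StateHandler.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical operator state illustrating the copy approach. A std::ostream
// stands in for the SPL Checkpoint object.
class CopyingStringState {
public:
    // Tuple processing reads and updates the live state under the mutex.
    void process(const std::string & value) {
        std::lock_guard<std::mutex> lock(mutex_);
        state_.push_back(value);
    }

    // Runs while the tuple flow is blocked: take a private copy of the state.
    void prepareForNonBlockingCheckpoint() {
        std::lock_guard<std::mutex> lock(mutex_);
        snapshot_ = state_;
    }

    // May run concurrently with process(). It reads only snapshot_, never
    // state_, so it does not lock mutex_.
    void checkpoint(std::ostream & out) {
        for (const std::string & s : snapshot_) {
            out << s.size() << ' ' << s << '\n';   // length-prefixed record
        }
        snapshot_.clear();
        snapshot_.shrink_to_fit();                 // release the copy's memory
    }

    std::size_t liveSize() {
        std::lock_guard<std::mutex> lock(mutex_);
        return state_.size();
    }

private:
    std::mutex mutex_;
    std::vector<std::string> state_;      // live state, guarded by mutex_
    std::vector<std::string> snapshot_;   // copy taken in prepareForNonBlockingCheckpoint()
};
```

The copy is made while the tuple flow is blocked, so its cost counts toward the blocking time; the cost of writing the copy does not.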

As an example, suppose the operator has a vector of strings as its state. It can implement the StateHandler::prepareForNonBlockingCheckpoint() and StateHandler::checkpoint() callbacks as follows.

This code snippet is from the MyOp_h.cgt file of the operator implementation:
// in MyOp_h.cgt file
<%
    # use Code Generation API to check if the operator is used in a Consistent Region
    my $isInConsistentRegion =
      $model->getContext()->getOptionalContext("ConsistentRegion");
    my @includes;
    if ($isInConsistentRegion) {
      push @includes, "#include <SPL/Runtime/Operator/State/StateHandler.h>";
    }
    SPL::CodeGen::headerPrologue($model, \@includes);
%>
#include <vector>
#include <string>
#include <SPL/Runtime/Serialization/NativeByteBuffer.h>
class MY_OPERATOR : public MY_BASE_OPERATOR, public StateHandler {
public:
    MY_OPERATOR();
    ~MY_OPERATOR();

    virtual void process(Punctuation const & punct, uint32_t port);
    virtual void process(Tuple const & tuple, uint32_t port);

    // StateHandler callback interface
    virtual void checkpoint(Checkpoint & ckpt);
    virtual void reset(Checkpoint & ckpt);
    virtual void resetToInitialState();
    virtual void prepareForNonBlockingCheckpoint(int64_t id);
private:
    SPL::Mutex _mutex;                       // mutex lock to guard concurrent access to operator state
    std::vector<std::string> _myState;       // a vector of strings as operator state
    SPL::NativeByteBuffer _serializedState;  // serialization buffer
};
<%SPL::CodeGen::headerEpilogue($model);%>

This code snippet is from the MyOp_cpp.cgt file of the operator implementation:
// in MyOp_cpp.cgt file
<%
    # use Code Generation API to check if the operator is used in a Consistent Region
    my $isInConsistentRegion = $model->getContext()->getOptionalContext("ConsistentRegion");
%>
// Constructor
MY_OPERATOR::MY_OPERATOR() {
    <% if ($isInConsistentRegion) { %>
    ConsistentRegionContext * crContext =
        static_cast<ConsistentRegionContext *>(getContext().getOptionalContext(CONSISTENT_REGION));
    // call ConsistentRegionContext::enableNonBlockingCheckpoint() in operator constructor
    crContext->enableNonBlockingCheckpoint();
    <% } %>
}

// process() function for tuple processing
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{
    AutoPortMutex apm (_mutex, *this);
    // manipulate operator state (i.e., _myState) based on the incoming tuple;
    // functionFoo() stands for the operator's own tuple-processing logic
    std::string newString = functionFoo(tuple);
    _myState.push_back(newString);
}

// Function to prepare operator state for non-blocking checkpointing
void MY_OPERATOR::prepareForNonBlockingCheckpoint(int64_t id)
{
    // serialize the state
    _serializedState << _myState;
}

// Function to checkpoint operator state
// This function can be executed while process() is called for processing new tuples
void MY_OPERATOR::checkpoint(Checkpoint & ckpt)
{
    // Note that no locking on _mutex is needed since this function does not access _myState
    // checkpoint the serialized state
    ckpt.addCharSequence((char const *)_serializedState.getPtr(), 
        (uint32_t)_serializedState.getContentSize());
    // reset the cursor of serialization buffer to the beginning
    _serializedState.setICursor(0);
}

// Function to restore operator state
void MY_OPERATOR::reset(Checkpoint & ckpt)
{
    AutoMutex am(_mutex);
    // restore _myState from checkpoint
    ckpt >> _myState;
    // reset the cursor of serialization buffer to the beginning
    _serializedState.setICursor(0);
}

// Function to reset operator to initial state
void MY_OPERATOR::resetToInitialState()
{
    AutoMutex am(_mutex);
    _myState.clear();
    _serializedState.setICursor(0);
}

The approach shown in the example is simple to implement, and it has a performance advantage over a blocking checkpointing implementation. In a blocking implementation, the vector of strings is checkpointed directly in StateHandler::checkpoint(), so both data serialization and I/O time contribute to the time that the tuple flow is blocked. With the non-blocking implementation, only the serialization in StateHandler::prepareForNonBlockingCheckpoint() contributes to the blocking time, while the I/O in StateHandler::checkpoint() overlaps with tuple processing. The blocking time should therefore be significantly shorter.
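The comparison can be written as a sum of blocking-time components. The symbols are introduced here for illustration only: T_drain for the time spent in StateHandler::drain(), T_serialize for serializing _myState, and T_I/O for writing the serialized bytes as the checkpoint.

```latex
\begin{aligned}
T_{\text{blocked}}^{\text{blocking}} &= T_{\text{drain}} + T_{\text{serialize}} + T_{\text{I/O}} \\
T_{\text{blocked}}^{\text{non-blocking}} &= T_{\text{drain}} + T_{\text{serialize}} \\
T_{\text{blocked}}^{\text{blocking}} - T_{\text{blocked}}^{\text{non-blocking}} &= T_{\text{I/O}}
\end{aligned}
```

The saving is the I/O term, which moves off the blocking path; how large it is in practice depends on the state size and the checkpoint backend.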

Besides the copy and serialization approaches, there are other ways to implement non-blocking checkpointing. One alternative is to use copy-on-write data structures, which defer copying state data until tuple processing actually modifies it.
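A copy-on-write variant can be sketched with std::shared_ptr, under the assumption that the runtime does not start a new drain before the previous background checkpoint has finished. prepareForNonBlockingCheckpoint() only shares the current version of the state, and process() copies the vector only if a pending checkpoint still references it. CowStringState is a hypothetical name, and a std::ostream stands in for the SPL Checkpoint object.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical copy-on-write operator state. Assumes a new drain does not
// start before the previous background checkpoint has finished.
class CowStringState {
public:
    CowStringState() : state_(std::make_shared<std::vector<std::string>>()) {}

    void process(const std::string & value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (state_.use_count() > 1) {
            // A pending checkpoint still shares this version: copy before writing.
            state_ = std::make_shared<std::vector<std::string>>(*state_);
        }
        state_->push_back(value);
    }

    // Runs while the tuple flow is blocked; shares the current version in O(1).
    void prepareForNonBlockingCheckpoint() {
        std::lock_guard<std::mutex> lock(mutex_);
        snapshot_ = state_;
    }

    // May run concurrently with process(); reads only the shared snapshot,
    // which process() never modifies while it is shared.
    void checkpoint(std::ostream & out) {
        if (snapshot_) {
            for (const std::string & s : *snapshot_) {
                out << s.size() << ' ' << s << '\n';   // length-prefixed record
            }
        }
        // Release under the mutex so that process() observes the release
        // before it modifies the vector in place again.
        std::lock_guard<std::mutex> lock(mutex_);
        snapshot_.reset();
    }

    std::size_t liveSize() {
        std::lock_guard<std::mutex> lock(mutex_);
        return state_->size();
    }

private:
    std::mutex mutex_;
    std::shared_ptr<std::vector<std::string>> state_;           // live version
    std::shared_ptr<const std::vector<std::string>> snapshot_;  // version being checkpointed
};
```

Compared with the copy approach, the blocking time no longer includes copying the state; the copy, if any, happens on the first update after the drain. Releasing the snapshot under the mutex matters because std::shared_ptr::use_count() by itself is not a reliable synchronization point between threads.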

Operators that enable non-blocking checkpointing and operators that do not enable non-blocking checkpointing can be put in the same consistent region. In this case, the tuple flow of the consistent region is blocked when the region starts draining, and it is resumed when all the non-blocking checkpointing operators have completed their StateHandler::prepareForNonBlockingCheckpoint() callbacks and all the blocking checkpointing operators have completed their StateHandler::checkpoint() callbacks.

If an operator calls the ConsistentRegionContext::enableNonBlockingCheckpoint() function in its constructor but also has logic state variables defined, the SPL Runtime still invokes its StateHandler::prepareForNonBlockingCheckpoint() and StateHandler::checkpoint() callbacks while the consistent region is being drained, but non-blocking checkpointing does not take effect at run time. That is, the tuple flow is resumed only after the StateHandler::checkpoint() callback completes.

If an operator that is configured with checkpointing is in an autonomous region, the StateHandler::prepareForNonBlockingCheckpoint() callback is ignored.

Implementing start operators that support non-blocking checkpointing

An operator that starts a consistent region can implement the StateHandler::regionCheckpointed() callback function to get notified by the SPL Runtime when all the operators in the consistent region were successfully drained and checkpointed, including those operators that perform non-blocking checkpointing. Within the StateHandler::regionCheckpointed() callback function, the start operator can implement logic such as:
  • Printing log messages, for example to record that a consistent state was established for the consistent region.
  • Cleaning up certain resources, for example purging any buffered input tuples received before this drain, because those tuples are reflected in the persisted state of the consistent region and are no longer needed for replay upon recovery from a failure.
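The purging logic in the second bullet can be sketched as a small replay buffer. ReplayBuffer, drainStarted(), and keying boundaries by a checkpoint id are hypothetical; in a real start operator, its StateHandler::regionCheckpointed() callback would be the place that calls the equivalent of ReplayBuffer::regionCheckpointed().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <iterator>
#include <map>
#include <mutex>
#include <string>

// Hypothetical replay buffer for a start operator. Submitted tuples are kept
// until a checkpoint that covers them is confirmed.
class ReplayBuffer {
public:
    // Keep a copy of each submitted tuple for replay after a reset.
    void submitted(const std::string & tuple) {
        std::lock_guard<std::mutex> lock(mutex_);
        buffer_.push_back(tuple);
        ++submittedCount_;
    }

    // Call right before initiating the drain for checkpoint `id`: every tuple
    // submitted so far will be reflected in that checkpoint.
    void drainStarted(std::int64_t id) {
        std::lock_guard<std::mutex> lock(mutex_);
        boundaries_[id] = submittedCount_;
    }

    // Call when checkpoint `id` is confirmed: purge the tuples submitted
    // before its drain; tuples submitted afterwards stay buffered.
    void regionCheckpointed(std::int64_t id) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = boundaries_.find(id);
        if (it == boundaries_.end()) {
            return;  // unknown or already handled id
        }
        while (purgedCount_ < it->second && !buffer_.empty()) {
            buffer_.pop_front();
            ++purgedCount_;
        }
        // Assuming ids increase, boundaries of this and older checkpoints are done.
        boundaries_.erase(boundaries_.begin(), std::next(it));
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return buffer_.size();
    }

private:
    std::mutex mutex_;
    std::deque<std::string> buffer_;                    // tuples not yet covered by a confirmed checkpoint
    std::map<std::int64_t, std::uint64_t> boundaries_;  // checkpoint id -> tuples submitted before its drain
    std::uint64_t submittedCount_ = 0;
    std::uint64_t purgedCount_ = 0;
};
```

Tuples submitted after the drain started, for example while non-blocking checkpoints are still running, stay in the buffer because no confirmed checkpoint covers them yet.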

A start operator that starts an operator-driven consistent region initiates the draining of the region by calling either the ConsistentRegionContext::makeConsistent() or the ConsistentRegionContext::makeConsistentNonBlocking() function.

When any thread in a start operator calls the ConsistentRegionContext::makeConsistent() function to initiate the draining of the consistent region, the calling thread is blocked on this call. The ConsistentRegionContext::makeConsistent() function returns true when all operators in the consistent region were drained and checkpointed, including any operator with non-blocking checkpointing enabled. The ConsistentRegionContext::makeConsistent() function returns false when there is a shutdown request or the region is reset due to failure. This means that if the start operator uses the ConsistentRegionContext::makeConsistent() function, the tuple flow is blocked until the whole consistent region is checkpointed; as a result, any operator that has non-blocking checkpointing enabled effectively falls back to blocking checkpointing at run time.

Alternatively, a start operator can call the ConsistentRegionContext::makeConsistentNonBlocking() API to initiate the draining of the consistent region and allow true non-blocking checkpointing at run time. The ConsistentRegionContext::makeConsistentNonBlocking() function returns a DrainResult enumeration value: CHECKPOINT_PENDING, COMPLETED, or FAILED. CHECKPOINT_PENDING means that the StateHandler::drain() callback is complete for all operators that do blocking checkpointing, and the StateHandler::prepareForNonBlockingCheckpoint() callback is complete for all operators that do non-blocking checkpointing. In this case, the tuple flow can be resumed while the pending non-blocking checkpointing is ongoing. When all the pending non-blocking checkpointing has finished, the SPL Runtime calls the start operator’s StateHandler::regionCheckpointed() function. The ConsistentRegionContext::makeConsistentNonBlocking() function returns COMPLETED if all operators in the consistent region have already been drained and checkpointed, and FAILED if there is a shutdown request or the region is reset due to failure.
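A start operator that uses ConsistentRegionContext::makeConsistentNonBlocking() needs to track drains whose checkpoints are still pending. The following standalone sketch shows one way to do that bookkeeping. The DrainResult enumeration here is a local stand-in that only mirrors the three value names given above, StartOperatorSketch is hypothetical, and passing a drain id to onDrainResult() is an assumption of the sketch rather than part of the SPL API.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <set>

// Local stand-in that mirrors the three DrainResult value names; the real type
// is provided by the SPL Runtime, not by this sketch.
enum class DrainResult { COMPLETED, CHECKPOINT_PENDING, FAILED };

// Hypothetical bookkeeping for a start operator of an operator-driven region.
class StartOperatorSketch {
public:
    // Handles the result of the drain identified by `id`. Returns true if the
    // operator may continue submitting tuples.
    bool onDrainResult(DrainResult result, std::int64_t id) {
        std::lock_guard<std::mutex> lock(mutex_);
        switch (result) {
        case DrainResult::COMPLETED:
            confirmed_.insert(id);   // the whole region is already checkpointed
            return true;
        case DrainResult::CHECKPOINT_PENDING:
            pending_.insert(id);     // resume now; confirmation arrives later
            return true;
        case DrainResult::FAILED:
            return false;            // shutdown request or region reset
        }
        return false;
    }

    // Called once all pending non-blocking checkpoints have finished. It is
    // idempotent, so it is harmless if it is also called after COMPLETED.
    void regionCheckpointed(std::int64_t id) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.erase(id);
        confirmed_.insert(id);
    }

    bool isPending(std::int64_t id) {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_.count(id) != 0;
    }

    bool isConfirmed(std::int64_t id) {
        std::lock_guard<std::mutex> lock(mutex_);
        return confirmed_.count(id) != 0;
    }

private:
    std::mutex mutex_;
    std::set<std::int64_t> pending_;    // drains whose checkpoints are still running
    std::set<std::int64_t> confirmed_;  // drains whose checkpoints are complete
};
```

Keeping the "pending" and "confirmed" sets separate lets the operator resume submission immediately after CHECKPOINT_PENDING while deferring work such as purging replay data until regionCheckpointed() arrives.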

Operator developers should use the ConsistentRegionContext::makeConsistentNonBlocking() function instead of the ConsistentRegionContext::makeConsistent() function to achieve non-blocking checkpointing of an operator-driven consistent region. The ConsistentRegionContext::makeConsistent() function is still available in Teracloud® Streams mainly for backward compatibility with previous product versions.