Checkpointing

Checkpointing is the process of persisting operator state at run time to allow recovery from a failure. In case of failure, the operator can be restarted by resetting from the checkpointed state.

For an operator, checkpointing (and the associated reset) can be triggered in two ways:
  1. If an operator is within a consistent region, checkpointing/resetting an operator's state is automatically invoked by the consistent region protocol. An operator's state is checkpointed during draining of the consistent region, and reset during resetting of the consistent region.
  2. If an operator is outside a consistent region (that is, in an autonomous region), it can be configured to perform checkpointing via the config checkpoint clause in SPL. This kind of checkpointing comes in two types: operator driven checkpointing and periodic checkpointing. Operator driven checkpointing is requested explicitly from within the Java/C++ operator code, whereas periodic checkpointing takes place automatically at regular, user-defined intervals. In case of failure, the SPL Runtime automatically resets the operator state to the last saved checkpoint. Two sample operators, configured with operator driven checkpointing and periodic checkpointing respectively, are given here.
stream<...> Out1 = MyOper1(...) {
   ...
   config
      checkpoint : operatorDriven; // operator driven checkpointing
      restartable : true;
}
 
stream<...> Out2 = MyOper2(...) {
   ...
   config
      checkpoint : periodic(10.5); // periodic checkpointing; checkpointing interval is 10.5 seconds
      restartable : true;
}

In the sample SPL code, operator MyOper1 is configured to perform operator driven checkpointing, and operator MyOper2 is configured to perform periodic checkpointing every 10.5 seconds.

Regardless of the checkpointing type, an operator's checkpoint data is represented as a set of key-value pairs. Each checkpoint has a unique sequence ID as key and the serialized state as value. When checkpointing is invoked, the SPL Runtime automatically assigns a sequence ID, and the operator uses the Checkpointing API to serialize its state data. The resulting checkpoint is stored to some backend persistent data store in the form of key-value pairs. At reset, the SPL Runtime decides which sequence ID to reset from, retrieves the corresponding checkpoint from the backend data store, and resets the operator state by deserializing data from the checkpoint.
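
The key-value model described above can be illustrated with a small standalone sketch. It is not the SPL Runtime's implementation: the class ToyCheckpointStore and its member functions are hypothetical names invented for this illustration, and the sketch assumes the simplest reset policy (reset from the highest stored sequence ID), whereas the real runtime decides which sequence ID to use.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for the backend data store: sequence ID -> serialized state.
// The real store is managed by the SPL Runtime and is not exposed like this.
class ToyCheckpointStore {
public:
    // Persist one checkpoint under its sequence ID (overwrites an existing entry).
    void put(std::int64_t sequenceId, const std::string& serializedState) {
        checkpoints_[sequenceId] = serializedState;
    }

    // Write the highest stored sequence ID to 'out'; return false if the store is empty.
    bool latestSequenceId(std::int64_t& out) const {
        if (checkpoints_.empty()) return false;
        out = checkpoints_.rbegin()->first; // std::map keeps keys in ascending order
        return true;
    }

    // Copy the serialized state for 'sequenceId' into 'out'; return false if absent.
    bool get(std::int64_t sequenceId, std::string& out) const {
        std::map<std::int64_t, std::string>::const_iterator it = checkpoints_.find(sequenceId);
        if (it == checkpoints_.end()) return false;
        out = it->second;
        return true;
    }

private:
    std::map<std::int64_t, std::string> checkpoints_;
};
```

Because the lookup is by key rather than by position, the sketch behaves the same whether or not the sequence IDs are consecutive.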

Since Version 4.0, Teracloud® Streams provides a new set of Checkpointing Runtime APIs in both C++ and Java. The new Checkpointing API is part of the C++ and Java Operator Runtime API, and is intended for an operator to checkpoint and reset its state data. The Checkpointing API in versions prior to Version 4.0 is deprecated, and users are encouraged to use the new API described on this page.

The Checkpointing Runtime API has three parts: the StateHandler interface, the Checkpoint class, and the optional CheckpointContext class.

The first part of the C++ Checkpointing Runtime API is the StateHandler class. The StateHandler class defines a set of callback interfaces such as checkpoint() and reset() functions. Operator developers provide those callback functions in the operator implementation and register the callback functions with the SPL Runtime. At runtime, the SPL Runtime automatically invokes the registered checkpoint() and reset() callback functions to checkpoint and reset an operator's state.

An operator class implements the StateHandler interface by inheriting from it. The following example shows a C++ operator that implements its own StateHandler callback functions.

This code snippet is the header file of the operator implementation:

#include <SPL/Runtime/Operator/State/StateHandler.h>
 
class MY_OPERATOR : public MY_BASE_OPERATOR, public StateHandler // inherit from StateHandler
{
public:
    ...
 
    // callback functions from StateHandler.h
    virtual void checkpoint(Checkpoint & ckpt);
    virtual void reset(Checkpoint & ckpt);
    virtual void resetToInitialState();
private:
    // variables myInt_ and myVector_ constitute the state of this operator
    uint32_t myInt_;
    std::vector<IPort0Type> myVector_;    
    Mutex myMutex_; // lock for guarding concurrent access to operator state
};

This code snippet is the C++ file of the operator implementation:

// Constructor of operator
MY_OPERATOR::MY_OPERATOR()
    : myInt_(0)
{
    ...
    // register StateHandler callbacks
    getContext().registerStateHandler(*this);
}
 
// process() function for tuple processing
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{
    // acquire lock
    AutoMutex am(myMutex_);
              
    // manipulate operator state during tuple processing
    IPort0Type const & ituple = static_cast<IPort0Type const &>(tuple); // tuple is const on this port
    myInt_ ++;
    myVector_.push_back(ituple);
 
    // submit tuple downstream
    OPort0Type otuple(tuple.getAttributeValue(0), myInt_);
    submit(otuple, 0);
}
 
// checkpoint() callback to checkpoint operator state data
void MY_OPERATOR::checkpoint(Checkpoint & ckpt)
{
    // acquire lock
    AutoMutex am(myMutex_);
 
    SPLAPPTRC(L_TRACE, "Checkpointing with sequence id: " << ckpt.getSequenceId(), "MY_OP");
              
    // serialize state data to Checkpoint
    ckpt << myInt_;
    ckpt << myVector_; 
}
 
// reset() callback to reset operator state data
void MY_OPERATOR::reset(Checkpoint & ckpt)
{
    // acquire lock
    AutoMutex am(myMutex_);
 
    SPLAPPTRC(L_TRACE, "Resetting with sequence id: " << ckpt.getSequenceId(), "MY_OP");
              
    // de-serialize state data from Checkpoint   
    ckpt >> myInt_;
    ckpt >> myVector_; 
}
 
// resetToInitialState() callback to reset operator to initial state
void MY_OPERATOR::resetToInitialState()
{
    // acquire lock
    AutoMutex am(myMutex_);
    myInt_ = 0;
    myVector_.clear();
}

For more information on the equivalent Java StateHandler interface, take a look at the SPL Java Operator API documentation.

The second part of the C++ Checkpointing Runtime API is the Checkpoint class. The Checkpoint class provides an abstraction of the checkpoint data. During checkpointing, a Checkpoint instance is passed to the operator's StateHandler::checkpoint() callback function. The operator specifies the data to checkpoint by using the << operator to serialize data to the provided Checkpoint instance. The SPL Runtime internally serializes the checkpointed data and transfers it to the backend data store. At reset, a Checkpoint instance is passed to the operator's StateHandler::reset() callback function. The operator extracts the data from the checkpoint and resets its state by using the >> operator to deserialize data from the provided Checkpoint instance.

The Checkpoint class provides serialization and deserialization support for all primitive data types, SPL data types, and typical C++ STL container types. Serialization and deserialization of nested data types are automatically handled as well.
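
To make the stream-style interface concrete, the following standalone sketch mimics how << and >> overloads can handle nested containers by recursion. It is only an illustration under stated assumptions: ToyCheckpoint is a hypothetical in-memory stand-in, not the SPL Checkpoint class, and it supports only 32-bit unsigned integers and std::vector.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical in-memory stand-in for the Checkpoint abstraction.
class ToyCheckpoint {
public:
    ToyCheckpoint() : readPos_(0) {}

    // Append n raw bytes to the buffer.
    void writeBytes(const void* data, std::size_t n) {
        const char* p = static_cast<const char*>(data);
        buffer_.insert(buffer_.end(), p, p + n);
    }

    // Copy the next n bytes out of the buffer and advance the read position.
    void readBytes(void* data, std::size_t n) {
        assert(readPos_ + n <= buffer_.size());
        std::memcpy(data, buffer_.data() + readPos_, n);
        readPos_ += n;
    }

private:
    std::vector<char> buffer_;
    std::size_t readPos_;
};

// Primitive overloads: fixed-size values are copied byte for byte.
inline ToyCheckpoint& operator<<(ToyCheckpoint& c, std::uint32_t v) {
    c.writeBytes(&v, sizeof v);
    return c;
}
inline ToyCheckpoint& operator>>(ToyCheckpoint& c, std::uint32_t& v) {
    c.readBytes(&v, sizeof v);
    return c;
}

// Container overloads: write the element count, then each element.
// Nested vectors work because each element is serialized with << again.
template <typename T>
ToyCheckpoint& operator<<(ToyCheckpoint& c, const std::vector<T>& v) {
    c << static_cast<std::uint32_t>(v.size());
    for (std::size_t i = 0; i < v.size(); ++i) c << v[i];
    return c;
}
template <typename T>
ToyCheckpoint& operator>>(ToyCheckpoint& c, std::vector<T>& v) {
    std::uint32_t n = 0;
    c >> n;
    v.clear();
    v.resize(n);
    for (std::uint32_t i = 0; i < n; ++i) c >> v[i];
    return c;
}
```

As in the operator example earlier, values must be read back in the same order in which they were written.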

Besides serialization and deserialization, operator code can also call the Checkpoint::getSequenceId() function to get the sequence ID of the checkpoint being written or reset. For an operator within a consistent region, the sequence ID of a checkpoint is the same as the marker ID assigned by the consistent region protocol; for an operator in an autonomous region, the sequence ID is generated by an internal counter local to each operator. In both cases, the sequence ID is a positive integer number and increases with new checkpoints, but the sequence ID numbers are not guaranteed to be consecutive.

For more information on the equivalent Java Checkpoint interface, take a look at the SPL Java Operator API documentation.

Note that an operator's state has two parts: the optional SPL logic-state variables and the state data. The logic-state variables (if present) are automatically checkpointed and reset by the SPL Runtime, while the state data is checkpointed and reset via operator-provided StateHandler::checkpoint() and StateHandler::reset() callbacks.

If an operator has logic-state variables, the SPL compiler automatically generates two functions during code generation to checkpoint and reset the logic-state variables. The signatures of the two functions, checkpointStateVariables() and resetStateVariables(), are shown here.

// auto-generated functions for checkpointing and resetting logic-state variables
void MY_BASE_OPERATOR::checkpointStateVariables(Checkpoint & ckpt);
void MY_BASE_OPERATOR::resetStateVariables(Checkpoint & ckpt);

During checkpointing, the SPL Runtime first calls the StateHandler::checkpoint() callback function to checkpoint operator state data, and then automatically calls the checkpointStateVariables() function to checkpoint logic-state variables. At reset, the SPL Runtime first calls the StateHandler::reset() callback function to reset operator state data, and then automatically calls the resetStateVariables() function to reset logic-state variables.
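
The call order described above can be sketched with a small standalone model. Everything in it is hypothetical: ToyOrderOperator only records which hook ran, and runtimeCheckpoint() and runtimeReset() are invented drivers that stand in for the SPL Runtime.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical operator stand-in: each hook just records its own name.
struct ToyOrderOperator {
    std::vector<std::string> calls;
    void checkpoint()               { calls.push_back("checkpoint"); }
    void checkpointStateVariables() { calls.push_back("checkpointStateVariables"); }
    void reset()                    { calls.push_back("reset"); }
    void resetStateVariables()      { calls.push_back("resetStateVariables"); }
};

// Hypothetical driver: operator state data first, then logic-state variables.
inline void runtimeCheckpoint(ToyOrderOperator& op) {
    op.checkpoint();
    op.checkpointStateVariables();
}

// Hypothetical driver: the reset path uses the same order, so data is read
// back in the order in which it was written.
inline void runtimeReset(ToyOrderOperator& op) {
    op.reset();
    op.resetStateVariables();
}
```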

The third part of the Checkpointing Runtime API is the optional CheckpointContext class. If an operator is in an autonomous region, runs in distributed mode, and has config checkpoint set to either operatorDriven or periodic(T) in SPL, then a CheckpointContext instance is provided to the operator. This CheckpointContext instance provides functions to query the checkpointing configuration of the operator (such as whether operator driven or periodic checkpointing is configured, and what the periodic checkpointing interval is). It also provides the CheckpointContext::createCheckpoint() function, which operator code calls to perform operator driven checkpointing. The following code shows how to use the CheckpointContext::createCheckpoint() function in operator code for operator driven checkpointing.

// the header file which defines CheckpointContext class
#include <SPL/Runtime/Operator/State/CheckpointContext.h>
.... 
// operator code can use CheckpointContext::createCheckpoint() function for operator driven checkpointing;
// to do so, first retrieve the CheckpointContext instance
CheckpointContext * ckptContext = static_cast<CheckpointContext *>(this->getContext().getOptionalContext(CHECKPOINT));
if (ckptContext != NULL) {
    // then call CheckpointContext::createCheckpoint() to perform checkpointing
    ckptContext->createCheckpoint();
}

When called from operator code, the CheckpointContext::createCheckpoint() function internally invokes the operator's StateHandler::checkpoint() callback function and the checkpointStateVariables() function to checkpoint the operator's state. The createCheckpoint() function returns after checkpointing is finished, so checkpointing is synchronous to the calling thread. If checkpointing is successful, the createCheckpoint() function returns true. If there is any error during checkpointing, the createCheckpoint() function throws an SPL::DataStoreException. A return value of false means that checkpointing was not performed and the function did nothing; this case is reserved, because starting with Version 4.0 the CheckpointContext::createCheckpoint() function either returns true or throws an exception.

The CheckpointContext instance is not available to an operator which either runs in standalone mode or does not have config checkpoint specified in SPL. In those cases, the OperatorContext::getOptionalContext(CHECKPOINT) function returns NULL.
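
The possible outcomes of an operator driven checkpoint request (no context available, success, the reserved false return, and an exception) can be handled as in the following sketch. It is self-contained and therefore uses stand-ins: ToyCheckpointContext and ToyDataStoreException are hypothetical substitutes for CheckpointContext and SPL::DataStoreException, and the CheckpointOutcome enumeration and tryCheckpoint() helper are invented for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for SPL::DataStoreException.
struct ToyDataStoreException : std::runtime_error {
    explicit ToyDataStoreException(const std::string& msg) : std::runtime_error(msg) {}
};

// Hypothetical stand-in for CheckpointContext; 'failNext' simulates a backend error.
struct ToyCheckpointContext {
    bool failNext;
    ToyCheckpointContext() : failNext(false) {}
    bool createCheckpoint() {
        if (failNext) throw ToyDataStoreException("backend data store unavailable");
        return true;
    }
};

enum CheckpointOutcome { CKPT_UNAVAILABLE, CKPT_DONE, CKPT_SKIPPED, CKPT_FAILED };

// Request a checkpoint and map every documented outcome to a value.
inline CheckpointOutcome tryCheckpoint(ToyCheckpointContext* ctx) {
    if (ctx == NULL) {
        return CKPT_UNAVAILABLE; // standalone mode or no config checkpoint
    }
    try {
        // false is reserved, but handling it keeps the caller robust
        return ctx->createCheckpoint() ? CKPT_DONE : CKPT_SKIPPED;
    } catch (const ToyDataStoreException&) {
        return CKPT_FAILED; // the operator decides whether to retry, log, or rethrow
    }
}
```

Whether a failed checkpoint should be retried, logged, or propagated is an application-level decision that this sketch leaves open.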

For more information on the equivalent Java CheckpointContext interface, take a look at the SPL Java Operator API documentation.

To use the Checkpointing API, follow these guidelines:
  1. Locking in the Checkpointing API:

    For consistent region driven checkpointing and periodic checkpointing, the StateHandler::checkpoint() callback function is called by an internal SPL Runtime thread, and is asynchronous to tuple processing and to any background operator thread. Therefore, the SPL Runtime requires operator code to manage user-defined locks to ensure that the checkpointing thread has exclusive access to the operator state. For operator driven checkpointing, the thread that calls the CheckpointContext::createCheckpoint() function internally executes the StateHandler::checkpoint() callback function. If the checkpoint() callback function tries to acquire a lock that is already held by the calling thread, the calling thread blocks itself, resulting in a deadlock. The SPL Runtime therefore requires operator code to manage user-defined locks so that the thread that checkpoints the operator state does not acquire the same non-re-entrant lock more than once. One recommended way to avoid this deadlock is to use some form of re-entrant lock, such as boost::recursive_mutex.

  2. Example usage of the Checkpointing API:
    Although there are several different types of checkpointing, with proper use of the Checkpointing API, it is easy to implement an operator that can be used in different scenarios and perform checkpointing and resetting properly. The following example shows an operator that can be used in these scenarios:
    • It can be used within a consistent region and perform consistent region driven checkpointing and resetting.
    • It can be used outside consistent regions, run in distributed mode, and perform periodic checkpointing when config checkpoint is set to periodic(T) in SPL.
    • It can be used outside consistent regions, run in distributed mode, and perform operator driven checkpointing when config checkpoint is set to operatorDriven in SPL.
    • It can be used outside consistent regions, run in distributed mode, and not perform any checkpointing or resetting when config checkpoint is not set in SPL.
    • It can run in standalone mode and not perform any checkpointing or resetting.

This code snippet is the MyOp_h.cgt file of the operator implementation:

<%
    # use CodeGen API to query whether the operator is used inside a consistent region
    my $isInConsistentRegion = $model->getContext()->getOptionalContext("ConsistentRegion");
    # if operator is outside consistent region, use CodeGen API to determine the kind of checkpointing (none, periodic, or operatorDriven)
    my $ckptKind = $model->getContext()->getCheckpointingKind();
    my @includes;
    if ($isInConsistentRegion || $ckptKind ne "none") {
        push @includes, "#include <SPL/Runtime/Operator/State/StateHandler.h>";
    }
    if ($ckptKind ne "none") {
        push @includes, "#include <SPL/Runtime/Operator/State/CheckpointContext.h>";
    }
    SPL::CodeGen::headerPrologue($model, \@includes);   
%>

<% if ($ckptKind eq "operatorDriven") { %>
#include <boost/thread/recursive_mutex.hpp>
<% } %>
 
class MY_OPERATOR : public MY_BASE_OPERATOR
<% if ($isInConsistentRegion || $ckptKind ne "none") {%>
, public StateHandler
<% } %> {
public:
    // constructor
    MY_OPERATOR();
              
    // destructor
    virtual ~MY_OPERATOR();
   
    // tuple processing for mutating ports
    void process(Tuple & tuple, uint32_t port);
    ...
 
<% if ($isInConsistentRegion || $ckptKind ne "none") { %>
    // StateHandler callbacks
    virtual void checkpoint(Checkpoint & ckpt);
    virtual void reset(Checkpoint & ckpt);
    virtual void resetToInitialState();
<% } %>
 
private:
<% if ($ckptKind eq "operatorDriven") { %>
    // for operator-driven checkpointing, use a re-entrant lock to guard operator state
    // so that the thread calling createCheckpoint() does not deadlock itself
    boost::recursive_mutex myLock_; 
<% } else { %>
    // for all other cases, use SPL::Mutex
    Mutex myLock_;
<% } %>
    // variables myVar1_ and myVar2_ constitute this operator's state
    int myVar1_;
    float myVar2_;
};

This code snippet is the MyOp_cpp.cgt file of the operator implementation:

<%
   # use CodeGen API to query whether the operator is used inside a consistent region
   my $isInConsistentRegion = $model->getContext()->getOptionalContext("ConsistentRegion");
   # if operator is outside consistent region, use CodeGen API to determine the kind of checkpointing (none, periodic, or operatorDriven)
   my $ckptKind = $model->getContext()->getCheckpointingKind();
%>
...
 
<%if ($isInConsistentRegion || $ckptKind ne "none") {%>
 
// operator-provided StateHandler::checkpoint() callback
void MY_OPERATOR::checkpoint(Checkpoint & ckpt)
{
    <% if ($ckptKind eq "operatorDriven") { %>
        // this operator is used outside consistent region and is configured with operator driven checkpointing
        // in this case, myLock_ is a re-entrant lock
        boost::recursive_mutex::scoped_lock am(myLock_);
    <% } else { %>
        // this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
        // in this case, myLock_ is a regular SPL::Mutex
        AutoMutex am(myLock_);
    <% } %>
    // checkpoint operator state
    ckpt << myVar1_ << myVar2_;
}
 
// operator-provided StateHandler::reset() callback
void MY_OPERATOR::reset(Checkpoint & ckpt)
{
    <% if ($ckptKind eq "operatorDriven") { %>
        // this operator is used outside consistent region and is configured with operator driven checkpointing
        // in this case, myLock_ is a re-entrant lock
        boost::recursive_mutex::scoped_lock am(myLock_);
    <% } else { %>
        // this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
        // in this case, myLock_ is a regular SPL::Mutex
        AutoMutex am(myLock_);
    <% } %>
    // reset operator state
    ckpt >> myVar1_ >> myVar2_;
}
 
// operator-provided StateHandler::resetToInitialState() callback
void MY_OPERATOR::resetToInitialState()
{
    <% if ($ckptKind eq "operatorDriven") { %>
        // this operator is used outside consistent region and is configured with operator driven checkpointing
        // in this case, myLock_ is a re-entrant lock
        boost::recursive_mutex::scoped_lock am(myLock_);
    <% } else { %>
        // this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
        // in this case, myLock_ is a regular SPL::Mutex
        AutoMutex am(myLock_);
    <% } %>
    // reset operator state to initial values
    myVar1_ = 0;
    myVar2_ = 0.0;
}
<%}%>
 
// tuple processing function
void MY_OPERATOR::process(Tuple & tuple, uint32_t port)
{
    <% if ($ckptKind eq "operatorDriven") { %>
        // this operator is used outside consistent region and is configured with operator driven checkpointing
        // in this case, myLock_ is a re-entrant lock
        boost::recursive_mutex::scoped_lock am(myLock_);
    <% } else { %>
        // this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
        // in this case, myLock_ is a regular SPL::Mutex
        AutoMutex am(myLock_);
    <% } %>
    // process tuple and manipulate operator state
    IPort0Type & ituple = static_cast<IPort0Type &>(tuple);
    myVar1_ += ituple.get_myInt();
    myVar2_ += ituple.get_myFloat();
 
    // perform operator driven checkpointing synchronously
    CheckpointContext * ckptContext = static_cast<CheckpointContext *>(this->getContext().getOptionalContext(CHECKPOINT));
    if (ckptContext != NULL && ckptContext->getKind() == CheckpointContext::operatorDriven) {
        ckptContext->createCheckpoint();
    }
}

In the example code, the operator distinguishes at code-generation time between operatorDriven checkpointing and the other kinds of checkpointing (periodic or consistent region driven). For operatorDriven checkpointing, the operator code uses a re-entrant lock to guard operator state, so that when the CheckpointContext::createCheckpoint() function is called from the process() function, the checkpoint() callback function does not deadlock the calling thread by trying to acquire a lock that the thread already holds. For the other kinds of checkpointing, the operator code uses a regular SPL::Mutex lock.

If the operator does not use operator driven checkpointing in its code, there is no need to use a re-entrant lock to avoid deadlock. In that case, the operator code can just use a regular SPL::Mutex lock, and checking the type of checkpointing at code-generation time can be removed.
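
The re-entrant locking pattern can also be tried in isolation, outside of Streams. The sketch below is a standalone illustration under stated assumptions: LockingToyOperator is a hypothetical class, std::recursive_mutex from the C++ standard library takes the place of boost::recursive_mutex, and the private createCheckpoint() member merely imitates the way CheckpointContext::createCheckpoint() runs the checkpoint() callback on the calling thread.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical operator stand-in that checkpoints synchronously from process().
class LockingToyOperator {
public:
    LockingToyOperator() : counter_(0), lastCheckpointed_(-1) {}

    // Mirrors process(): take the lock, update state, then checkpoint.
    void process(int value) {
        std::lock_guard<std::recursive_mutex> guard(lock_);
        counter_ += value;
        createCheckpoint(); // runs checkpoint() on this same thread
    }

    // Value captured by the most recent checkpoint, or -1 if none was taken.
    int lastCheckpointed() const {
        std::lock_guard<std::recursive_mutex> guard(lock_);
        return lastCheckpointed_;
    }

private:
    // Imitates the synchronous behavior of an operator driven checkpoint request.
    void createCheckpoint() { checkpoint(); }

    // Mirrors the checkpoint() callback: it guards the state with the same lock.
    void checkpoint() {
        // Second acquisition by the same thread: allowed for a recursive mutex.
        // With a plain std::mutex this would be undefined behavior, typically a deadlock.
        std::lock_guard<std::recursive_mutex> guard(lock_);
        lastCheckpointed_ = counter_;
    }

    mutable std::recursive_mutex lock_; // re-entrant lock guarding all state
    int counter_;
    int lastCheckpointed_;
};
```

A re-entrant lock costs slightly more than a plain mutex, which is one reason the example above selects the lock type at code-generation time instead of always using the re-entrant one.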