Checkpointing
Checkpointing is the process of persisting operator state at run time to allow recovery from a failure. In case of failure, the operator can be restarted by resetting from the checkpointed state.
- If an operator is within a consistent region, checkpointing and resetting of the operator's state are invoked automatically by the consistent region protocol. The operator's state is checkpointed while the consistent region is drained, and reset when the consistent region is reset.
- If an operator is outside a consistent region (that is, in an autonomous region), it can be configured to perform checkpointing by using the config checkpoint clause in SPL. This type of checkpointing is further divided into two kinds: operator driven checkpointing and periodic checkpointing. Operator driven checkpointing is requested explicitly from within the Java or C++ operator code, whereas periodic checkpointing takes place automatically at a user-defined interval. If a failure occurs, the SPL Runtime automatically resets the operator state to the last saved checkpoint. The following sample shows two operators, configured with operator driven checkpointing and periodic checkpointing, respectively.
stream<...> Out1 = MyOper1(...) {
...
config
checkpoint : operatorDriven; // operator driven checkpointing
restartable : true;
}
stream<...> Out2 = MyOper2(...) {
...
config
checkpoint : periodic(10.5); // periodic checkpointing; checkpointing interval is 10.5 seconds
restartable : true;
}
In the sample SPL code, operator MyOper1 is configured to perform operator driven checkpointing, and operator MyOper2 is configured to perform periodic checkpointing every 10.5 seconds.
Regardless of the checkpointing type, an operator's checkpoint data is represented as a set of key-value pairs: each checkpoint has a unique sequence ID as its key and the serialized state as its value. When checkpointing is invoked, the SPL Runtime automatically assigns a sequence ID, and the operator uses the Checkpointing API to serialize its state data. The resulting checkpoint is stored as a key-value pair in a backend persistent data store. At reset, the SPL Runtime decides which sequence ID to reset from, retrieves the corresponding checkpoint from the backend data store, and resets the operator state by deserializing data from the checkpoint.
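The key-value layout can be modeled in a few lines. This is a minimal sketch under stated assumptions, not the SPL Runtime's implementation: the sketch::CheckpointStore class, its save() and load() functions, and the in-memory std::map standing in for the backend persistent data store are all hypothetical, and the "serialized state" is simply a byte vector.

```cpp
// Hypothetical model of the checkpoint key-value layout; not the SPL Runtime.
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

namespace sketch {

using SequenceId = std::int64_t;
using Bytes = std::vector<unsigned char>;

// In-memory stand-in for the backend persistent data store:
// key = sequence ID, value = serialized operator state.
class CheckpointStore {
public:
    // Plays the runtime's role at checkpoint time: assign a new, larger ID and store the bytes.
    SequenceId save(const Bytes & serializedState) {
        SequenceId id = ++lastId_;
        entries_[id] = serializedState;
        return id;
    }

    // Plays the runtime's role at reset time: fetch the checkpoint for the chosen ID.
    bool load(SequenceId id, Bytes & out) const {
        auto it = entries_.find(id);
        if (it == entries_.end()) return false;
        out = it->second;
        return true;
    }

private:
    SequenceId lastId_ = 0;  // this toy counter happens to be consecutive
    std::map<SequenceId, Bytes> entries_;
};

}  // namespace sketch
```

Each call to save() plays the role of one checkpoint, and a reset looks up whichever sequence ID the runtime selected. The toy counter produces consecutive IDs only for simplicity; a real backend would also persist the entries outside the process.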
Since Version 4.0, Teracloud® Streams provides a new set of Checkpointing Runtime APIs in both C++ and Java. The new Checkpointing API is part of the C++ and Java Operator Runtime API, and is intended for an operator to checkpoint and reset its state data. The Checkpointing API from versions prior to Version 4.0 is deprecated; use the new API described on this page instead.
The Checkpointing Runtime API has three parts: the StateHandler interface, the Checkpoint class, and the optional CheckpointContext class.
The first part of the C++ Checkpointing Runtime API is the StateHandler class. The StateHandler class defines a set of callback interfaces, such as the checkpoint() and reset() functions. Operator developers implement these callback functions in the operator and register them with the SPL Runtime. At run time, the SPL Runtime automatically invokes the registered checkpoint() and reset() callback functions to checkpoint and reset the operator's state.
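To make this inversion of control concrete, the following standalone sketch mimics the callback flow with hypothetical stand-in types. sketch::StateHandler only mirrors the three callbacks named in this section, while sketch::FakeRuntime, sketch::FakeCheckpoint, and sketch::CounterOperator are invented for illustration; none of them is the SPL API. The operator registers itself once, and the runtime later calls back into it.

```cpp
// Hypothetical stand-ins illustrating callback registration; not the SPL API.
#include <cassert>
#include <vector>

namespace sketch {

// Stand-in for the checkpoint payload handed to the callbacks.
struct FakeCheckpoint {
    long sequenceId = 0;
    std::vector<int> data;
};

// Mirrors the shape of the StateHandler callbacks; the operator overrides them.
class StateHandler {
public:
    virtual ~StateHandler() = default;
    virtual void checkpoint(FakeCheckpoint & ckpt) = 0;
    virtual void reset(FakeCheckpoint & ckpt) = 0;
    virtual void resetToInitialState() = 0;
};

// Hypothetical runtime: remembers the registered handler and invokes it later.
class FakeRuntime {
public:
    void registerStateHandler(StateHandler & handler) { handler_ = &handler; }

    FakeCheckpoint takeCheckpoint(long sequenceId) {
        FakeCheckpoint ckpt;
        ckpt.sequenceId = sequenceId;
        if (handler_ != nullptr) handler_->checkpoint(ckpt);
        return ckpt;
    }

    void restore(FakeCheckpoint & ckpt) {
        if (handler_ != nullptr) handler_->reset(ckpt);
    }

private:
    StateHandler * handler_ = nullptr;
};

// Example operator whose entire state is one counter.
class CounterOperator : public StateHandler {
public:
    explicit CounterOperator(FakeRuntime & rt) { rt.registerStateHandler(*this); }
    void process() { ++count_; }
    int count() const { return count_; }

    void checkpoint(FakeCheckpoint & ckpt) override { ckpt.data.push_back(count_); }
    void reset(FakeCheckpoint & ckpt) override { count_ = ckpt.data.at(0); }
    void resetToInitialState() override { count_ = 0; }

private:
    int count_ = 0;
};

}  // namespace sketch
```

The operator never calls checkpoint() or reset() itself; it only registers, which is the division of labor the StateHandler design implies.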
An operator class implements the StateHandler interface by inheriting from it. The following example shows a C++ operator that implements its own StateHandler callback functions.
This code snippet is the header file of the operator implementation:
#include <SPL/Runtime/Operator/State/StateHandler.h>
class MY_OPERATOR : public MY_BASE_OPERATOR, public StateHandler // inherit publicly from StateHandler
{
public:
...
// callback functions from StateHandler.h
virtual void checkpoint(Checkpoint & ckpt);
virtual void reset(Checkpoint & ckpt);
virtual void resetToInitialState();
private:
// variables myInt_ and myVector_ constitute the state of this operator
uint32_t myInt_;
std::vector<IPort0Type> myVector_;
Mutex myMutex_; // lock for guarding concurrent access to operator state
};
This code snippet is the C++ file of the operator implementation:
// Constructor of operator
MY_OPERATOR::MY_OPERATOR()
: myInt_(0)
{
...
// register StateHandler callbacks
getContext().registerStateHandler(*this);
}
// process() function for tuple processing
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{
// acquire lock
AutoMutex am(myMutex_);
// manipulate operator state during tuple processing
IPort0Type const & ituple = static_cast<IPort0Type const &>(tuple); // keep const: tuple is passed as Tuple const &
myInt_ ++;
myVector_.push_back(ituple);
// submit tuple downstream
OPort0Type otuple(tuple.getAttributeValue(0), myInt_);
submit(otuple, 0);
}
// checkpoint() callback to checkpoint operator state data
void MY_OPERATOR::checkpoint(Checkpoint & ckpt)
{
// acquire lock
AutoMutex am(myMutex_);
SPLAPPTRC(L_TRACE, "Checkpointing with sequence id: " << ckpt.getSequenceId(), "MY_OP");
// serialize state data to Checkpoint
ckpt << myInt_;
ckpt << myVector_;
}
// reset() callback to reset operator state data
void MY_OPERATOR::reset(Checkpoint & ckpt)
{
// acquire lock
AutoMutex am(myMutex_);
SPLAPPTRC(L_TRACE, "Resetting with sequence id: " << ckpt.getSequenceId(), "MY_OP");
// de-serialize state data from Checkpoint
ckpt >> myInt_;
ckpt >> myVector_;
}
// resetToInitialState() callback to reset operator to initial state
void MY_OPERATOR::resetToInitialState()
{
// acquire lock
AutoMutex am(myMutex_);
myInt_ = 0;
myVector_.clear();
}
For more information on the equivalent Java StateHandler interface, see the SPL Java Operator API documentation.
The second part of the C++ Checkpointing Runtime API is the Checkpoint class, which provides an abstraction of the checkpoint data. During checkpointing, a Checkpoint instance is passed to the operator's StateHandler::checkpoint() callback function. The operator specifies the data to checkpoint by using the << operator to serialize data into the provided Checkpoint instance. The SPL Runtime internally serializes the checkpointed data and transfers it to the backend data store. At reset, a Checkpoint instance is passed to the operator's StateHandler::reset() callback function. The operator extracts the data from the checkpoint and resets its state by using the >> operator to deserialize data from the provided Checkpoint instance.
The Checkpoint class provides serialization and deserialization support for all primitive data types, SPL data types, and typical C++ STL container types. Serialization and deserialization of nested data types are handled automatically as well.
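The stream-style contract (write with <<, read back with >> in the same order, nested containers handled recursively) can be illustrated with a small stand-in. This is a hypothetical sketch, not the SPL::Checkpoint class: sketch::FakeCheckpoint supports only trivially copyable values, std::string, and std::vector, and its byte layout (raw bytes plus 64-bit length prefixes) is invented here.

```cpp
// Hypothetical Checkpoint-style stream; not the SPL::Checkpoint class.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <vector>

namespace sketch {

class FakeCheckpoint {
public:
    // Trivially copyable values (integers, floats, ...) are copied byte for byte.
    template <typename T,
              typename std::enable_if<std::is_trivially_copyable<T>::value, int>::type = 0>
    FakeCheckpoint & operator<<(const T & value) {
        const unsigned char * p = reinterpret_cast<const unsigned char *>(&value);
        bytes_.insert(bytes_.end(), p, p + sizeof(T));
        return *this;
    }
    template <typename T,
              typename std::enable_if<std::is_trivially_copyable<T>::value, int>::type = 0>
    FakeCheckpoint & operator>>(T & value) {
        require(sizeof(T));
        std::memcpy(&value, bytes_.data() + readPos_, sizeof(T));
        readPos_ += sizeof(T);
        return *this;
    }

    // Strings: a length prefix followed by the characters.
    FakeCheckpoint & operator<<(const std::string & s) {
        *this << static_cast<std::uint64_t>(s.size());
        bytes_.insert(bytes_.end(), s.begin(), s.end());
        return *this;
    }
    FakeCheckpoint & operator>>(std::string & s) {
        std::uint64_t n = 0;
        *this >> n;
        require(n);
        s.assign(reinterpret_cast<const char *>(bytes_.data() + readPos_), n);
        readPos_ += n;
        return *this;
    }

    // Vectors: a length prefix, then each element through its own operator,
    // so nested containers such as std::vector<std::vector<int>> work recursively.
    template <typename T>
    FakeCheckpoint & operator<<(const std::vector<T> & v) {
        *this << static_cast<std::uint64_t>(v.size());
        for (const T & e : v) *this << e;
        return *this;
    }
    template <typename T>
    FakeCheckpoint & operator>>(std::vector<T> & v) {
        std::uint64_t n = 0;
        *this >> n;
        v.assign(n, T());
        for (T & e : v) *this >> e;
        return *this;
    }

private:
    // Throws instead of reading past the end when reads do not match writes.
    void require(std::size_t n) const {
        if (readPos_ + n > bytes_.size()) throw std::runtime_error("read past end of checkpoint");
    }
    std::vector<unsigned char> bytes_;
    std::size_t readPos_ = 0;
};

}  // namespace sketch
```

A checkpoint() callback would write, for example, ckpt << count << nested << name, and the matching reset() would read ckpt >> count >> nested >> name in exactly the same order.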
Besides serialization and deserialization, operator code can call the Checkpoint::getSequenceId() function to get the sequence ID of the checkpoint being written or reset. For an operator within a consistent region, the sequence ID of a checkpoint is the same as the marker ID assigned by the consistent region protocol; for an operator in an autonomous region, the sequence ID is generated by an internal counter local to each operator. In both cases, the sequence ID is a positive integer that increases with each new checkpoint, but sequence IDs are not guaranteed to be consecutive.
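Because sequence IDs increase but may skip values, operator code that keeps its own records keyed by sequence ID should look neighboring entries up rather than compute them as id - 1. A minimal sketch, assuming a hypothetical sketch::SideLog class that an operator might use for such bookkeeping; it is not part of the Checkpointing API.

```cpp
// Hypothetical operator-side bookkeeping keyed by sequence ID; not part of the SPL API.
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

namespace sketch {

class SideLog {
public:
    void record(std::int64_t sequenceId, const std::string & note) { notes_[sequenceId] = note; }

    // Finds the largest recorded ID strictly below 'id' by ordered lookup,
    // which stays correct when IDs skip values (unlike computing id - 1).
    bool previous(std::int64_t id, std::int64_t & prev) const {
        auto it = notes_.lower_bound(id);        // first key >= id
        if (it == notes_.begin()) return false;  // nothing recorded below id
        --it;
        prev = it->first;
        return true;
    }

private:
    std::map<std::int64_t, std::string> notes_;
};

}  // namespace sketch
```

With IDs 3, 7, and 12 recorded, the entry before 12 is 7, even though 11 was never used.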
For more information on the equivalent Java Checkpoint interface, see the SPL Java Operator API documentation.
Note that an operator's state has two parts: the optional SPL logic-state variables and the state data. The logic-state variables (if present) are automatically checkpointed and reset by the SPL Runtime, while the state data is checkpointed and reset by the operator-provided StateHandler::checkpoint() and StateHandler::reset() callbacks.
If an operator has logic-state variables, the SPL compiler automatically generates two functions during code generation to checkpoint and reset them. The signatures of the two functions, checkpointStateVariables() and resetStateVariables(), are shown here.
// auto-generated functions for checkpointing and resetting logic-state variables
void MY_BASE_OPERATOR::checkpointStateVariables(Checkpoint & ckpt);
void MY_BASE_OPERATOR::resetStateVariables(Checkpoint & ckpt);
During checkpointing, the SPL Runtime first calls the StateHandler::checkpoint() callback function to checkpoint the operator state data, and then automatically calls the checkpointStateVariables() function to checkpoint the logic-state variables. At reset, the SPL Runtime first calls the StateHandler::reset() callback function to reset the operator state data, and then automatically calls the resetStateVariables() function to reset the logic-state variables.
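The fixed call order matters if, as the shared Checkpoint & parameter in the signatures suggests, both functions write to and read from the same Checkpoint instance (an assumption in this sketch): values must then be read back in the order they were written. The sketch uses a hypothetical FIFO-like sketch::FakeCheckpoint, and plain member functions stand in for the operator callbacks and the compiler-generated functions.

```cpp
// Hypothetical illustration of checkpoint/reset call ordering; not the SPL Runtime.
#include <cassert>
#include <deque>
#include <stdexcept>

namespace sketch {

// FIFO stand-in for one Checkpoint instance: values come out in write order.
class FakeCheckpoint {
public:
    FakeCheckpoint & operator<<(int v) { q_.push_back(v); return *this; }
    FakeCheckpoint & operator>>(int & v) {
        if (q_.empty()) throw std::runtime_error("read past end of checkpoint");
        v = q_.front();
        q_.pop_front();
        return *this;
    }
private:
    std::deque<int> q_;
};

struct Operator {
    int stateData = 0;      // handled by the operator's own callbacks
    int logicStateVar = 0;  // stands in for an SPL logic-state variable

    void checkpoint(FakeCheckpoint & c) { c << stateData; }
    void reset(FakeCheckpoint & c) { c >> stateData; }
    // Stand-ins for the compiler-generated functions.
    void checkpointStateVariables(FakeCheckpoint & c) { c << logicStateVar; }
    void resetStateVariables(FakeCheckpoint & c) { c >> logicStateVar; }
};

// Same order on both paths: operator state data first, then logic-state variables.
inline void runtimeCheckpoint(Operator & op, FakeCheckpoint & c) {
    op.checkpoint(c);
    op.checkpointStateVariables(c);
}
inline void runtimeReset(Operator & op, FakeCheckpoint & c) {
    op.reset(c);
    op.resetStateVariables(c);
}

}  // namespace sketch
```

If the reset path called the two functions in the opposite order, the two values would be swapped on restore.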
The third part of the Checkpointing Runtime API is the optional CheckpointContext class. If an operator is in an autonomous region, runs in distributed mode, and has config checkpoint set to either operatorDriven or periodic(T) in SPL, then a CheckpointContext instance is provided to the operator. This instance provides functions to query the checkpointing configuration of the operator (such as whether operator driven or periodic checkpointing is configured, and what the periodic checkpointing interval is). It also provides the CheckpointContext::createCheckpoint() function, which operator code uses to perform operator driven checkpointing. The following code shows how to use the CheckpointContext::createCheckpoint() function in operator code for operator driven checkpointing.
// the header file which defines CheckpointContext class
#include <SPL/Runtime/Operator/State/CheckpointContext.h>
....
// operator code can use CheckpointContext::createCheckpoint() function for operator driven checkpointing;
// to do so, first retrieve the CheckpointContext instance
CheckpointContext * ckptContext = static_cast<CheckpointContext *>(this->getContext().getOptionalContext(CHECKPOINT));
if (ckptContext != NULL) {
// then call CheckpointContext::createCheckpoint() to perform checkpointing
ckptContext->createCheckpoint();
}
When called from operator code, the CheckpointContext::createCheckpoint() function internally invokes the operator's StateHandler::checkpoint() callback function and the checkpointStateVariables() function to checkpoint the operator's state. The createCheckpoint() function returns after checkpointing finishes, so checkpointing is synchronous to the calling thread. If checkpointing is successful, the createCheckpoint() function returns true. If any error occurs during checkpointing, the createCheckpoint() function throws an SPL::DataStoreException. A return value of false, meaning that checkpointing was not performed and the function did nothing, is reserved: starting with Version 4.0, the CheckpointContext::createCheckpoint() function either returns true or throws an exception.
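The return and error contract of createCheckpoint() can be sketched with hypothetical stand-ins. sketch::FakeCheckpointContext and sketch::FakeDataStoreException only imitate the behavior described above (synchronous execution on the calling thread, true on success, an exception on failure); they are not the SPL classes, and the storeAvailable flag is an invented way to simulate a data store error.

```cpp
// Hypothetical imitation of the createCheckpoint() contract; not the SPL classes.
#include <cassert>
#include <functional>
#include <stdexcept>
#include <utility>

namespace sketch {

// Stand-in for SPL::DataStoreException.
class FakeDataStoreException : public std::runtime_error {
public:
    using std::runtime_error::runtime_error;
};

class FakeCheckpointContext {
public:
    FakeCheckpointContext(std::function<int()> captureState, bool storeAvailable)
        : captureState_(std::move(captureState)), storeAvailable_(storeAvailable) {}

    // Runs the state capture on the calling thread, then "persists" it;
    // returns only after both steps have finished.
    bool createCheckpoint() {
        int snapshot = captureState_();  // plays the role of checkpoint() + checkpointStateVariables()
        if (!storeAvailable_) throw FakeDataStoreException("backend data store unavailable");
        lastSaved_ = snapshot;
        return true;  // success; failures surface as exceptions, not as false
    }

    int lastSaved() const { return lastSaved_; }

private:
    std::function<int()> captureState_;
    bool storeAvailable_;
    int lastSaved_ = -1;
};

}  // namespace sketch
```

Since failures arrive as exceptions rather than as a false return value, operator code that must survive a failed checkpoint would wrap the call in a try/catch block.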
The CheckpointContext instance is not available to an operator that either runs in standalone mode or does not have config checkpoint specified in SPL. In those cases, the OperatorContext::getOptionalContext(CHECKPOINT) function returns NULL.
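The availability rule can be restated as a predicate. This is a hypothetical helper (sketch::hasCheckpointContext and sketch::CheckpointKind are invented for illustration) that encodes only the conditions stated in this section; real operator code should still null-check the pointer returned by getOptionalContext(CHECKPOINT), as the earlier snippet does.

```cpp
// Hypothetical encoding of when a CheckpointContext is provided; not part of the SPL API.
#include <cassert>

namespace sketch {

enum class CheckpointKind { none, periodic, operatorDriven };

// True only for an operator in an autonomous region, running in distributed mode,
// with config checkpoint set to operatorDriven or periodic(T).
inline bool hasCheckpointContext(bool inConsistentRegion, bool distributed, CheckpointKind kind) {
    return !inConsistentRegion && distributed && kind != CheckpointKind::none;
}

}  // namespace sketch
```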
For more information on the equivalent Java CheckpointContext interface, see the SPL Java Operator API documentation.
- Locking in the Checkpointing API:
For consistent region driven checkpointing and periodic checkpointing, the StateHandler::checkpoint() callback function is called by an internal SPL Runtime thread, asynchronously to tuple processing and to any background operator thread. Therefore, the SPL Runtime requires operator code to manage user-defined locks so that the checkpointing thread has exclusive access to the operator state. For operator driven checkpointing, the thread that calls CheckpointContext::createCheckpoint() internally executes the StateHandler::checkpoint() callback function. If the checkpoint() callback function tries to acquire a lock that the calling thread already holds, the thread can block on itself, resulting in a deadlock. The SPL Runtime therefore requires operator code to manage user-defined locks so that the thread that checkpoints the operator state does not deadlock by acquiring the same non-re-entrant lock more than once. One recommended solution is to use a re-entrant lock, such as boost::recursive_mutex.
- Example usage of the Checkpointing API:
Although there are several types of checkpointing, proper use of the Checkpointing API makes it straightforward to implement a single operator that checkpoints and resets correctly in each scenario. The following example shows an operator that can be used in these scenarios:
  - It can be used within a consistent region and perform consistent region driven checkpointing and resetting.
  - It can be used outside consistent regions, run in distributed mode, and perform periodic checkpointing when config checkpoint is set to periodic(T) in SPL.
  - It can be used outside consistent regions, run in distributed mode, and perform operator driven checkpointing when config checkpoint is set to operatorDriven in SPL.
  - It can be used outside consistent regions, run in distributed mode, and not perform any checkpointing or resetting when config checkpoint is not set in SPL.
  - It can run in standalone mode and not perform any checkpointing or resetting.
This code snippet is the MyOp_h.cgt file of the operator implementation:
<%
# use CodeGen API to query whether the operator is used inside a consistent region
my $isInConsistentRegion = $model->getContext()->getOptionalContext("ConsistentRegion");
# if operator is outside consistent region, use CodeGen API to determine the kind of checkpointing (none, periodic, or operatorDriven)
my $ckptKind = $model->getContext()->getCheckpointingKind();
my @includes;
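# StateHandler.h is needed whenever the generated class inherits from StateHandler
if ($isInConsistentRegion || $ckptKind ne "none") {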
push @includes, "#include <SPL/Runtime/Operator/State/StateHandler.h>";
}
if ($ckptKind ne "none") {
push @includes, "#include <SPL/Runtime/Operator/State/CheckpointContext.h>";
}
SPL::CodeGen::headerPrologue($model, \@includes);
%>
<% if ($ckptKind eq "operatorDriven") { %>
#include <boost/thread/recursive_mutex.hpp>
<% } %>
class MY_OPERATOR : public MY_BASE_OPERATOR
<% if ($isInConsistentRegion || $ckptKind ne "none") {%>
, public StateHandler
<% } %> {
public:
// constructor
MY_OPERATOR();
// destructor
virtual ~MY_OPERATOR();
// tuple processing for mutating ports
void process(Tuple & tuple, uint32_t port);
...
<% if ($isInConsistentRegion || $ckptKind ne "none") { %>
// StateHandler callbacks
virtual void checkpoint(Checkpoint & ckpt);
virtual void reset(Checkpoint & ckpt);
virtual void resetToInitialState();
<% } %>
private:
<% if ($ckptKind eq "operatorDriven") { %>
// for operator-driven checkpointing, use a re-entrant lock to guard operator state
// so that the thread calling createCheckpoint() does not deadlock itself
boost::recursive_mutex myLock_;
<% } else { %>
// for all other cases, use SPL::Mutex
Mutex myLock_;
<% } %>
// variables myVar1_ and myVar2_ constitute this operator's state
int myVar1_;
float myVar2_;
};
<%SPL::CodeGen::headerEpilogue($model);%>
This code snippet is the MyOp_cpp.cgt file of the operator implementation:
<%
# use CodeGen API to query whether the operator is used inside a consistent region
my $isInConsistentRegion = $model->getContext()->getOptionalContext("ConsistentRegion");
# if operator is outside consistent region, use CodeGen API to determine the kind of checkpointing (none, periodic, or operatorDriven)
my $ckptKind = $model->getContext()->getCheckpointingKind();
%>
...
<%if ($isInConsistentRegion || $ckptKind ne "none") {%>
// operator-provided StateHandler::checkpoint() callback
void MY_OPERATOR::checkpoint(Checkpoint & ckpt)
{
<% if ($ckptKind eq "operatorDriven") { %>
// this operator is used outside consistent region and is configured with operator driven checkpointing
// in this case, myLock_ is an re-entrant lock
boost::recursive_mutex::scoped_lock am(myLock_);
<% } else { %>
// this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
// in this case, myLock_ is a regular SPL::Mutex
AutoMutex am(myLock_);
<% } %>
// checkpoint operator state
ckpt << myVar1_ << myVar2_;
}
// operator-provided StateHandler::reset() callback
void MY_OPERATOR::reset(Checkpoint & ckpt)
{
<% if ($ckptKind eq "operatorDriven") { %>
// this operator is used outside consistent region and is configured with operator driven checkpointing
// in this case, myLock_ is an re-entrant lock
boost::recursive_mutex::scoped_lock am(myLock_);
<% } else { %>
// this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
// in this case, myLock_ is a regular SPL::Mutex
AutoMutex am(myLock_);
<% } %>
// reset operator state
ckpt >> myVar1_ >> myVar2_;
}
// operator-provided StateHandler::resetToInitialState() callback
void MY_OPERATOR::resetToInitialState()
{
<% if ($ckptKind eq "operatorDriven") { %>
// this operator is used outside consistent region and is configured with operator driven checkpointing
// in this case, myLock_ is an re-entrant lock
boost::recursive_mutex::scoped_lock am(myLock_);
<% } else { %>
// this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
// in this case, myLock_ is a regular SPL::Mutex
AutoMutex am(myLock_);
<% } %>
// reset operator state to initial values
myVar1_ = 0;
myVar2_ = 0.0;
}
<%}%>
// tuple processing function
void MY_OPERATOR::process(Tuple & tuple, uint32_t port)
{
<% if ($ckptKind eq "operatorDriven") { %>
// this operator is used outside consistent region and is configured with operator driven checkpointing
// in this case, myLock_ is an re-entrant lock
boost::recursive_mutex::scoped_lock am(myLock_);
<% } else { %>
// this operator is in consistent region, or outside consistent region but configured with periodic checkpointing
// in this case, myLock_ is a regular SPL::Mutex
AutoMutex am(myLock_);
<% } %>
// process tuple and manipulate operator state
IPort0Type & ituple = static_cast<IPort0Type &>(tuple);
myVar1_ += ituple.get_myInt();
myVar2_ += ituple.get_myFloat();
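<% if ($ckptKind eq "operatorDriven") { %>
// perform operator driven checkpointing synchronously
// (generated only in this case, because CheckpointContext.h is included only when checkpointing is configured)
CheckpointContext * ckptContext = static_cast<CheckpointContext *>(this->getContext().getOptionalContext(CHECKPOINT));
if (ckptContext != NULL && ckptContext->getKind() == CheckpointContext::operatorDriven) {
ckptContext->createCheckpoint();
}
<% } %>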
}
In the example code, the operator distinguishes whether it is configured for operatorDriven checkpointing or for another kind of checkpointing (periodic or consistent region driven). For operatorDriven checkpointing, the operator code uses a re-entrant lock to guard the operator state, so that when the CheckpointContext::createCheckpoint() function is called from the process() function, the checkpoint() callback function does not deadlock the calling thread by trying to acquire the lock that the thread already holds. For other kinds of checkpointing, the operator code uses a regular SPL::Mutex lock.
If the operator does not use operator driven checkpointing in its code, it does not need a re-entrant lock to avoid deadlock. In that case, the operator code can use a regular SPL::Mutex lock, and the code-generation-time check of the checkpointing kind can be removed.
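The deadlock scenario and its fix can be reproduced outside SPL with the C++ standard library. In this sketch, std::recursive_mutex stands in for boost::recursive_mutex, and the class sketch::OperatorDrivenOp, with its private createCheckpoint() and checkpoint() members, is a hypothetical simplification of the operator above: process() holds the lock and synchronously triggers checkpoint() on the same thread, which locks again.

```cpp
// Hypothetical simplification of operator driven checkpointing; not the SPL API.
#include <cassert>
#include <mutex>
#include <vector>

namespace sketch {

class OperatorDrivenOp {
public:
    // Tuple processing: holds the lock, mutates state, then checkpoints synchronously.
    void process(int value) {
        std::lock_guard<std::recursive_mutex> guard(lock_);
        sum_ += value;
        createCheckpoint();  // runs checkpoint() on this same thread
    }

    const std::vector<int> & saved() const { return saved_; }

private:
    // Stand-in for CheckpointContext::createCheckpoint(): synchronous, same thread.
    void createCheckpoint() { checkpoint(); }

    // Stand-in for the StateHandler::checkpoint() callback. Re-locking succeeds because
    // the mutex is recursive; the owning thread may lock it again while holding it.
    void checkpoint() {
        std::lock_guard<std::recursive_mutex> guard(lock_);
        saved_.push_back(sum_);
    }

    std::recursive_mutex lock_;
    int sum_ = 0;
    std::vector<int> saved_;
};

}  // namespace sketch
```

Replacing std::recursive_mutex with std::mutex here would make the second lock undefined behavior, which in practice typically deadlocks the thread: the failure mode described in the locking notes.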