Implementing non-blocking checkpointing in stateful primitive operators
A stateful C++ or Java primitive operator can implement non-blocking checkpointing of operator state data to reduce the time that the tuple flow is blocked during checkpointing.
When a consistent state of a consistent region is established, the tuple flow is temporarily blocked until the operators in the consistent region are drained and checkpointed. With non-blocking checkpointing, the tuple flow can be resumed while an internal SPL Runtime thread checkpoints operator state data in the background.
With blocking checkpointing, a consistent state is established as follows:
- The tuple flow is blocked.
- The SPL Runtime invokes the operator's StateHandler::drain() callback function.
- The SPL Runtime invokes the operator's StateHandler::checkpoint() callback function.
- The tuple flow is resumed.

With non-blocking checkpointing, a consistent state is established as follows:
- The tuple flow is blocked.
- The SPL Runtime invokes the operator's StateHandler::drain() callback function.
- The SPL Runtime invokes the operator's StateHandler::prepareForNonBlockingCheckpoint() callback function.
- The tuple flow is resumed. Meanwhile, the SPL Runtime delegates an internal thread to execute the operator's StateHandler::checkpoint() callback function in the background. The SPL Runtime tracks this background checkpointing internally.
- The background thread finishes checkpointing the operator state.
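The two sequences can be modeled in a few lines of standard C++. This is a sketch, not SPL code: ToyOperator and establishConsistentState() are hypothetical stand-ins for an operator's StateHandler callbacks and for the SPL Runtime, and the event log only records the order in which the steps happen.

```cpp
#include <cassert>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for an operator's StateHandler callbacks; not the SPL API.
struct ToyOperator {
    bool nonBlocking = false;  // plays the role of calling enableNonBlockingCheckpoint()
    std::vector<std::string>* log = nullptr;

    void drain() { log->push_back("drain"); }
    void prepareForNonBlockingCheckpoint() { log->push_back("prepare"); }
    void checkpoint() { log->push_back("checkpoint"); }
};

// Simplified model of the order in which the runtime drives one operator.
void establishConsistentState(ToyOperator& op, std::vector<std::string>& log) {
    op.log = &log;
    log.push_back("block");  // tuple flow is blocked
    op.drain();
    if (!op.nonBlocking) {
        op.checkpoint();          // state is persisted while the flow is still blocked
        log.push_back("resume");
        return;
    }
    op.prepareForNonBlockingCheckpoint();  // prepare the state while blocked
    log.push_back("resume");               // tuple flow resumes before checkpointing
    // checkpoint() runs on a separate thread. The sketch joins it only so that the
    // function returns with a complete log; the real runtime does not wait here.
    std::thread background([&op] { op.checkpoint(); });
    background.join();
}
```

In the non-blocking case, "resume" is logged before "checkpoint": the checkpoint is exactly the work that moves out of the blocked window.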
To enable non-blocking checkpointing, an operator must meet the following requirements:
- The operator implementation must call the ConsistentRegionContext::enableNonBlockingCheckpoint() function in the operator's constructor.
- The operator must implement the StateHandler::prepareForNonBlockingCheckpoint() and StateHandler::checkpoint() callback functions.
The StateHandler::prepareForNonBlockingCheckpoint() function is called after the StateHandler::drain() function. It prepares the operator state so that a later StateHandler::checkpoint() callback can persist it while new tuples are being processed. This preparation is needed because the processing of new tuples usually reads and updates the latest version of the operator state, whereas background checkpointing must read the version of the operator state as it was when StateHandler::prepareForNonBlockingCheckpoint() was called.
With the StateHandler::prepareForNonBlockingCheckpoint() and StateHandler::checkpoint() callback interface, an operator can implement non-blocking checkpointing in various ways. One simple approach is to make a copy of the operator state in StateHandler::prepareForNonBlockingCheckpoint() and checkpoint that copy in StateHandler::checkpoint(). Background checkpointing then reads the copied state data, while tuple processing reads and updates the original state, so both activities can make progress concurrently without synchronizing with each other. Another approach is to serialize the operator state into a memory buffer (such as SPL::NativeByteBuffer) in StateHandler::prepareForNonBlockingCheckpoint() and to checkpoint the serialized data in StateHandler::checkpoint().
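The copy approach can be sketched in plain C++ as follows, assuming a hypothetical CopyingState class in place of an SPL operator and a std::ostringstream in place of the Checkpoint object. The mutex guards only the live state; checkpoint() reads nothing but the copy.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical operator-like class showing the "copy the state" approach; not the SPL API.
class CopyingState {
public:
    // Tuple processing reads and updates the live state under the lock.
    void process(const std::string& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        live_.push_back(value);
    }

    // Runs while the tuple flow is blocked: take a private copy of the live state.
    void prepareForNonBlockingCheckpoint() {
        std::lock_guard<std::mutex> lock(mutex_);
        snapshot_ = live_;
    }

    // May run on a background thread. It reads only snapshot_, so it does not contend
    // with process(). Assumes the next prepare does not start before this call returns.
    void checkpoint(std::ostream& out) const {
        out << snapshot_.size() << '\n';
        for (const std::string& s : snapshot_) out << s << '\n';
    }

    std::size_t liveSize() {
        std::lock_guard<std::mutex> lock(mutex_);
        return live_.size();
    }

private:
    std::mutex mutex_;
    std::vector<std::string> live_;      // latest version, used by tuple processing
    std::vector<std::string> snapshot_;  // version as of the last prepare
};
```

The copy takes time proportional to the state size while the flow is blocked, but the I/O done in checkpoint() then overlaps with tuple processing.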
As an example, suppose the operator has a vector of strings as its state. Using the serialization approach, it can implement the StateHandler::prepareForNonBlockingCheckpoint() and StateHandler::checkpoint() callbacks as follows.
In the MyOp_h.cgt file of the operator implementation:
// in MyOp_h.cgt file
<%
# use Code Generation API to check if the operator is used in a Consistent Region
my $isInConsistentRegion =
$model->getContext()->getOptionalContext("ConsistentRegion");
my @includes;
if ($isInConsistentRegion) {
push @includes, "#include <SPL/Runtime/Operator/State/StateHandler.h>";
}
SPL::CodeGen::headerPrologue($model, \@includes);
%>
#include <vector>
#include <string>
#include <SPL/Runtime/Serialization/NativeByteBuffer.h>

class MY_OPERATOR : public MY_BASE_OPERATOR, public StateHandler {
public:
  MY_OPERATOR();
  ~MY_OPERATOR();
  virtual void process(Punctuation const & punct, uint32_t port);
  virtual void process(Tuple const & tuple, uint32_t port);

  // StateHandler callback interface
  virtual void checkpoint(Checkpoint & ckpt);
  virtual void reset(Checkpoint & ckpt);
  virtual void resetToInitialState();
  virtual void prepareForNonBlockingCheckpoint(int64_t id);

private:
  SPL::Mutex _mutex;                       // mutex lock to guard concurrent access to operator state
  std::vector<std::string> _myState;       // a vector of strings as operator state
  SPL::NativeByteBuffer _serializedState;  // serialization buffer
};

<%SPL::CodeGen::headerEpilogue($model);%>
In the MyOp_cpp.cgt file of the operator implementation:
// in MyOp_cpp.cgt file
<%
# use Code Generation API to check if the operator is used in a Consistent Region
my $isInConsistentRegion = $model->getContext()->getOptionalContext("ConsistentRegion");
%>
// Constructor
MY_OPERATOR::MY_OPERATOR() {
<% if ($isInConsistentRegion) {%>
  ConsistentRegionContext * crContext =
    static_cast<ConsistentRegionContext *>(getContext().getOptionalContext(CONSISTENT_REGION));
  // call ConsistentRegionContext::enableNonBlockingCheckpoint() in operator constructor
  crContext->enableNonBlockingCheckpoint();
<% } %>
}

// process() function for tuple processing
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{
  AutoPortMutex apm(_mutex, *this);
  // manipulate operator state (i.e. _myState) based on incoming tuple;
  // functionFoo() is a placeholder for the operator's own logic
  std::string newString = functionFoo(tuple);
  _myState.push_back(newString);
}

// Function to prepare operator state for non-blocking checkpointing
// Called after drain(), while the tuple flow is blocked
void MY_OPERATOR::prepareForNonBlockingCheckpoint(int64_t id)
{
  // lock for consistency with the other functions that access _myState
  AutoMutex am(_mutex);
  // serialize the current state; checkpoint() persists these bytes later
  _serializedState << _myState;
}

// Function to checkpoint operator state
// This function can be executed while process() is called for processing new tuples
void MY_OPERATOR::checkpoint(Checkpoint & ckpt)
{
  // Note that no locking on _mutex is needed since this function does not access _myState
  // checkpoint the serialized state
  ckpt.addCharSequence((char const *)_serializedState.getPtr(),
                       (uint32_t)_serializedState.getContentSize());
  // reset the cursor of serialization buffer to the beginning
  _serializedState.setICursor(0);
}

// Function to restore operator state
void MY_OPERATOR::reset(Checkpoint & ckpt)
{
  AutoMutex am(_mutex);
  // restore _myState from checkpoint
  ckpt >> _myState;
  // reset the cursor of serialization buffer to the beginning
  _serializedState.setICursor(0);
}

// Function to reset operator to initial state
void MY_OPERATOR::resetToInitialState()
{
  AutoMutex am(_mutex);
  _myState.clear();
  _serializedState.setICursor(0);
}
The approach shown in the example is simple to implement, and it has a performance advantage over a blocking checkpointing implementation. In a blocking implementation, the vector of strings is checkpointed directly in StateHandler::checkpoint(), so both data serialization and I/O time contribute to the time that the tuple flow is blocked. With the non-blocking implementation, the I/O happens in the background, so the blocking time is only the serialization time, which is typically significantly shorter.
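Besides the simple copy approach, there are other ways to implement non-blocking checkpointing. One alternative is to use copy-on-write data structures, so that StateHandler::prepareForNonBlockingCheckpoint() only retains the current version of the state and data is copied only when tuple processing modifies it.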
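A minimal copy-on-write sketch in standard C++, assuming a hypothetical CowState class (not an SPL type): prepareForNonBlockingCheckpoint() only shares a pointer to the current vector, and the vector is copied the first time tuple processing modifies it after a prepare. A production implementation would more likely use a persistent or chunked data structure so that only the modified part is copied.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical copy-on-write state holder; not an SPL class.
class CowState {
public:
    void process(const std::string& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (shared_) {
            // A checkpoint may still be reading *live_: copy it once before the first write.
            live_ = std::make_shared<std::vector<std::string>>(*live_);
            shared_ = false;
        }
        live_->push_back(value);
    }

    // O(1): the snapshot shares the current vector instead of copying it.
    void prepareForNonBlockingCheckpoint() {
        std::lock_guard<std::mutex> lock(mutex_);
        snapshot_ = live_;
        shared_ = true;
    }

    // May run on a background thread; reads a version that is never modified again.
    void checkpoint(std::ostream& out) {
        std::shared_ptr<const std::vector<std::string>> snap;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            snap = snapshot_;
        }
        if (!snap) return;  // no prepare has happened yet
        for (const std::string& s : *snap) out << s << '\n';
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return live_->size();
    }

private:
    std::mutex mutex_;
    std::shared_ptr<std::vector<std::string>> live_ =
        std::make_shared<std::vector<std::string>>();
    std::shared_ptr<const std::vector<std::string>> snapshot_;
    bool shared_ = false;  // true while live_ and snapshot_ point to the same vector
};
```

If no tuple arrives between two checkpoints, no copy is made at all. The trade-off is that snapshot_ keeps the old version alive until the next prepare.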
Operators that enable non-blocking checkpointing and operators that do not enable non-blocking checkpointing can be put in the same consistent region. In this case, the tuple flow of the consistent region is blocked when the region starts draining, and it is resumed when all the non-blocking checkpointing operators have completed their StateHandler::prepareForNonBlockingCheckpoint() callbacks and all the blocking checkpointing operators have completed their StateHandler::checkpoint() callbacks.
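The resume rule for a mixed region amounts to taking a maximum over operators. The sketch below is a hypothetical model with made-up timings in milliseconds; it assumes each operator's prepareForNonBlockingCheckpoint() or checkpoint() call starts as soon as its own drain() returns and ignores every other runtime detail.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical per-operator timings in milliseconds, measured from the start of draining.
struct OperatorTiming {
    bool nonBlocking;   // operator enabled non-blocking checkpointing
    long drainDoneMs;   // when its drain() callback returns
    long prepareMs;     // duration of prepareForNonBlockingCheckpoint()
    long checkpointMs;  // duration of checkpoint()
};

// The region's tuple flow resumes once every non-blocking operator has finished
// prepareForNonBlockingCheckpoint() and every blocking operator has finished checkpoint().
long resumeTimeMs(const std::vector<OperatorTiming>& ops) {
    long resumeAt = 0;
    for (const OperatorTiming& op : ops) {
        long blockedWork = op.nonBlocking ? op.prepareMs : op.checkpointMs;
        resumeAt = std::max(resumeAt, op.drainDoneMs + blockedWork);
    }
    return resumeAt;
}
```

With one non-blocking operator {true, 10, 5, 100} and one blocking operator {false, 20, 0, 30}, the flow resumes at 50 ms: the blocking operator's checkpoint dominates, while the non-blocking operator's 100 ms checkpoint no longer counts toward the blocked time.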
If an operator calls the ConsistentRegionContext::enableNonBlockingCheckpoint() function in its constructor but also has logic state variables defined, the SPL Runtime still invokes its StateHandler::prepareForNonBlockingCheckpoint() and StateHandler::checkpoint() callbacks while draining the consistent region, but non-blocking checkpointing does not take effect at run time. That is, the tuple flow is resumed only after the StateHandler::checkpoint() callback is completed.
If an operator that is configured with checkpointing is in an autonomous region, its StateHandler::prepareForNonBlockingCheckpoint() callback is ignored.
Implementing start operators that support non-blocking checkpointing
The start operator of a consistent region can implement the StateHandler::regionCheckpointed() callback function to be notified by the SPL Runtime when all the operators in the consistent region have been successfully drained and checkpointed, including those operators that perform non-blocking checkpointing. Within the StateHandler::regionCheckpointed() callback function, the start operator can implement logic such as:
- Printing log messages, for example to record that a consistent state was established for the consistent region.
- Cleaning up certain resources, for example purging any buffered input tuples that were received before this drain. Those tuples are reflected in the persisted state of the consistent region and are no longer needed for replay upon recovery from a failure.
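The purging use case can be sketched with a hypothetical ReplayBuffer helper in standard C++ (not part of the SPL API). It assumes a single thread both submits tuples and initiates drains: markDrain() would be called just before the start operator initiates the drain, and onRegionCheckpointed() from its StateHandler::regionCheckpointed() callback. With non-blocking checkpointing, tuples can be submitted after the flow resumes but before the notification arrives, so only the tuples counted at the drain are purged.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical replay buffer kept by a start operator; not part of the SPL API.
class ReplayBuffer {
public:
    // Record every submitted tuple so that it can be replayed after a reset.
    void add(const std::string& tuple) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.push_back(tuple);
    }

    // Call just before initiating a drain: the tuples buffered so far will be
    // covered by the checkpoint that this drain produces.
    void markDrain() {
        std::lock_guard<std::mutex> lock(mutex_);
        coveredByDrain_ = pending_.size();
    }

    // Call from regionCheckpointed(): purge only the tuples that preceded the drain.
    // Tuples added after the flow resumed are kept, because they are not in the checkpoint.
    void onRegionCheckpointed() {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.erase(pending_.begin(),
                       pending_.begin() + static_cast<std::ptrdiff_t>(coveredByDrain_));
        coveredByDrain_ = 0;
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_.size();
    }

    std::string front() {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_.front();
    }

private:
    std::mutex mutex_;
    std::deque<std::string> pending_;
    std::size_t coveredByDrain_ = 0;
};
```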
A start operator that starts an operator-driven consistent region initiates the draining of the region by calling either the ConsistentRegionContext::makeConsistent() or the ConsistentRegionContext::makeConsistentNonBlocking() function.
When any thread in a start operator calls the ConsistentRegionContext::makeConsistent() function to initiate the draining of the consistent region, the calling thread is blocked on this call. The ConsistentRegionContext::makeConsistent() function returns true when all operators in the consistent region were drained and checkpointed, including any operator with non-blocking checkpointing enabled. The ConsistentRegionContext::makeConsistent() function returns false when there is a shutdown request or the region is reset due to failure. This means that if the start operator uses the ConsistentRegionContext::makeConsistent() function, the tuple flow is blocked until the whole consistent region is checkpointed, so that any operator that has non-blocking checkpointing enabled effectively falls back to blocking checkpointing at run time.
Alternatively, a start operator can call the ConsistentRegionContext::makeConsistentNonBlocking() function to initiate the draining of the consistent region and allow true non-blocking checkpointing at run time. The ConsistentRegionContext::makeConsistentNonBlocking() function returns a value of the DrainResult enumeration: CHECKPOINT_PENDING, COMPLETED, or FAILED. A return value of CHECKPOINT_PENDING means that the StateHandler::drain() callback is complete for all operators that do blocking checkpointing, and the StateHandler::prepareForNonBlockingCheckpoint() callback is complete for all operators that do non-blocking checkpointing. In this case, the tuple flow can be resumed while the pending non-blocking checkpointing continues. When all the pending non-blocking checkpointing has finished, the SPL Runtime calls the start operator's StateHandler::regionCheckpointed() function. The ConsistentRegionContext::makeConsistentNonBlocking() function returns COMPLETED if all operators in the consistent region have been drained and checkpointed, or FAILED if there is a shutdown request or the region is reset due to failure.
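A start operator typically branches on the result. The following sketch uses a local enum that mirrors the three DrainResult values named above so that it compiles without the SPL headers; NextStep and nextStep() are hypothetical names, and the actual call to makeConsistentNonBlocking() is omitted because it requires the SPL Runtime.

```cpp
#include <cassert>

// Local stand-in for the DrainResult enumeration so that this sketch builds without
// the SPL headers; only the three value names come from the documentation.
enum class DrainResult { CHECKPOINT_PENDING, COMPLETED, FAILED };

// Hypothetical classification of what a start operator can conclude after the call.
enum class NextStep {
    ResumeAndAwaitNotification,  // flow may resume; regionCheckpointed() signals completion
    ResumeCheckpointDone,        // all operators already drained and checkpointed
    AbandonDrain                 // shutdown request or region reset: do not treat as checkpointed
};

NextStep nextStep(DrainResult result) {
    switch (result) {
        case DrainResult::CHECKPOINT_PENDING:
            return NextStep::ResumeAndAwaitNotification;
        case DrainResult::COMPLETED:
            return NextStep::ResumeCheckpointDone;
        case DrainResult::FAILED:
            return NextStep::AbandonDrain;
    }
    return NextStep::AbandonDrain;  // unreachable for valid enum values
}
```

Only the CHECKPOINT_PENDING branch relies on the later regionCheckpointed() notification; work that must wait for persisted state, such as purging a replay buffer, belongs in that callback.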
Operator developers should use the ConsistentRegionContext::makeConsistentNonBlocking() function in favor of the ConsistentRegionContext::makeConsistent() function to achieve non-blocking checkpointing of an operator-driven consistent region. The ConsistentRegionContext::makeConsistent() function is still available in Teracloud® Streams mainly for maintaining backwards compatibility with previous product versions.