Stateful primitive operators that participate in consistent regions
StateHandler interface
Start operators of a consistent region replay tuples when the region resets after a failure. This behavior implies that downstream operators receive repeated tuples after a reset. If the operators are stateful, the same tuple might be applied to the operator state more than once. If this action does not meet the requirements of the application, primitive operators can persist their states after a drain and restore them during a reset.
A drain and a reset have sequence numbers. If the persisted state is associated with these sequence numbers, the state is consistent with the state of other operators that use the same sequence numbers. The state is also consistent with all of the operator's input and output streams at the point after the drain. If the operator state is restored during a reset, the operator does not need to handle duplicates: restoring the state has the same effect as discarding all the tuples that were sent after the last successful drain (the duplicates).
The StateHandler interface enables operators to persist and restore their states to be consistent with other operators in the region. The logical division of the output streams of operators is established after the drain() callback in the SPL StateHandler interface returns. Any tuple or punctuation that is submitted to output ports before or during the drain method call is processed by the consistent region before the successful conclusion of the drain. If the drain callback is not implemented by an operator, the logical division is established automatically by the Teracloud® Streams instance. The Teracloud® Streams instance calls the checkpoint() and reset() callbacks at the appropriate time.
Consistent region permits
If the primitive operator has background threads that submit tuples or punctuation, those threads must acquire tuple or punctuation submission permits by creating a ConsistentRegionPermit. These permits can be created using the ConsistentRegionContext. The permits enable the operator to pause tuple or punctuation submissions during a checkpoint or a reset. Pausing tuple submission ensures that the internal state of the operator is consistent with its input and output streams and with the state of other operators in the consistent region. Enabling threads to pause and resume submission otherwise requires nontrivial thread coordination code that involves mutual exclusion and condition variables. Acquiring a tuple or punctuation submission permit removes the need to write that coordination code.
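To illustrate the kind of coordination code that a permit replaces, the following is a minimal sketch in standard C++. It is not SPL runtime code: SubmissionGate, SubmissionPermit, and submitBatch are hypothetical names introduced only for this illustration, and the real ConsistentRegionPermit might differ in its details. The sketch assumes that a checkpoint or reset calls pause() first and resume() afterward.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical gate that pauses submissions; not part of the SPL runtime API.
class SubmissionGate {
public:
    // Called by a submitting thread; blocks while the gate is paused.
    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !paused_; });
        ++active_;
    }
    // Called when a submission is finished; wakes a pending pause().
    void release() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(active_ > 0);
        if (--active_ == 0) cv_.notify_all();
    }
    // Called before a checkpoint or reset; returns once no permit is held.
    void pause() {
        std::unique_lock<std::mutex> lock(mutex_);
        paused_ = true;
        cv_.wait(lock, [this] { return active_ == 0; });
    }
    // Called after the checkpoint or reset; lets blocked threads continue.
    void resume() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            paused_ = false;
        }
        cv_.notify_all();
    }
    int activePermits() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return active_;
    }
    bool isPaused() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return paused_;
    }
private:
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    bool paused_ = false;
    int active_ = 0;
};

// RAII wrapper: holding an instance means "submission is allowed right now".
class SubmissionPermit {
public:
    explicit SubmissionPermit(SubmissionGate& gate) : gate_(gate) { gate_.acquire(); }
    ~SubmissionPermit() { gate_.release(); }
    SubmissionPermit(const SubmissionPermit&) = delete;
    SubmissionPermit& operator=(const SubmissionPermit&) = delete;
private:
    SubmissionGate& gate_;
};

// Body of a hypothetical background thread: every submission holds a permit.
inline void submitBatch(SubmissionGate& gate, std::atomic<int>& submitted, int n) {
    for (int i = 0; i < n; ++i) {
        SubmissionPermit permit(gate);  // blocks while the gate is paused
        ++submitted;                    // stands in for submit(tuple, 0)
    }
}
```

In this sketch, a background thread would run submitBatch while another thread brackets its state capture with pause() and resume(). No submission can start between those two calls, which is the guarantee that the permit provides without the operator having to write the gate itself.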
Sample C++ primitive operator
This example shows a providesSingleThreadedContext=Always operator, where tuples are submitted only from the process(Tuple) and process(Punctuation) functions. The operator, TupleCounter, keeps a count of the tuples that it has processed (numTuples_).
If checkpointing and resetting code is not implemented by the operator, then the following outcomes can occur:
- On the failure of other operators, the TupleCounter operator overcounts tuples on a replay.
- On the failure of the operator itself, the TupleCounter operator loses the value of the counter and undercounts tuples.
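The two outcomes above can be reproduced with a small standalone simulation. This is a sketch in plain C++, not SPL runtime code: CounterSim and countWithFailure are hypothetical names, the checkpoint store is modeled as a single integer, and a reset is assumed to replay every tuple that followed the last checkpoint.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the TupleCounter state; not SPL runtime code.
struct CounterSim {
    uint32_t numTuples = 0;  // live, in-memory operator state

    void process() { ++numTuples; }
    void checkpoint(uint32_t& store) const { store = numTuples; }
    void reset(const uint32_t& store) { numTuples = store; }
    void crash() { numTuples = 0; }  // a restarted operator loses in-memory state
};

// Processes `total` tuples and checkpoints after the first `cut` of them.
// A failure then resets the region, and the start operator replays every
// tuple after the cut.
//   operatorFailed: this operator itself crashed and lost its counter
//   restore:        the operator restores its state before the replay
inline uint32_t countWithFailure(uint32_t total, uint32_t cut,
                                 bool operatorFailed, bool restore) {
    assert(cut <= total);
    CounterSim op;
    uint32_t store = 0;  // models the checkpoint backend

    for (uint32_t i = 0; i < cut; ++i) op.process();
    op.checkpoint(store);                                  // last successful drain
    for (uint32_t i = cut; i < total; ++i) op.process();   // seen before the failure

    if (operatorFailed) op.crash();
    if (restore) op.reset(store);

    for (uint32_t i = cut; i < total; ++i) op.process();   // replayed tuples
    return op.numTuples;
}
```

With 10 tuples and a checkpoint after the sixth, the simulation returns 14 when another operator fails and the state is not restored (overcount), 4 when the operator itself fails and the state is not restored (undercount), and 10 in both cases when the state is restored before the replay.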
Header file
<%
    my $isInConsistentRegion =
        $model->getContext()->getOptionalContext("ConsistentRegion");
    my @includes;
    if ($isInConsistentRegion) {
        push @includes, "#include <SPL/Runtime/Operator/State/StateHandler.h>";
    }
    SPL::CodeGen::headerPrologue($model, \@includes);
%>
The checkpoint and reset virtual methods are inherited from the StateHandler interface. These methods are parameterized with a reference to an object of the Checkpoint class. This reference can be used to serialize operator state into, and deserialize it from, a configured checkpoint backend store. The resetToInitialState() method is used to reset the operator state to the state of the operator after the Operator::allPortsReady() routine is invoked. This method is used when the consistent region must reset before the completion of the first successful consistent state.
When the operator is in a consistent region, the operator class inherits from StateHandler and declares the drain(), checkpoint(), reset(), and resetToInitialState() methods.
class MY_OPERATOR : public MY_BASE_OPERATOR
<%if ($isInConsistentRegion) {%>
    , public StateHandler
<%}%>
{
public:
    ...
<%if ($isInConsistentRegion) {%>
    // Callbacks from StateHandler.h
    virtual void drain();
    virtual void checkpoint(Checkpoint & ckpt);
    virtual void reset(Checkpoint & ckpt);
    virtual void resetToInitialState();
<%}%>
private:
    uint32_t numTuples_;  // number of tuples processed so far
    Mutex mutex_;         // protects numTuples_
};
C++ file
...
<%
    my $isInConsistentRegion =
        $model->getContext()->getOptionalContext("ConsistentRegion");
%>
The constructor registers the object that implements the StateHandler interface. In this example, that object is the operator itself, which makes the registration step optional.
MY_OPERATOR::MY_OPERATOR()
    : numTuples_(0)
{
<%if ($isInConsistentRegion) {%>
    getContext().registerStateHandler(*this);
<%}%>
}
In the process function, the operator updates its internal state and submits a resulting tuple.
void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{
    AutoPortMutex am(mutex_, *this);
    numTuples_++;
    // Output tuple: first input attribute plus the running count
    OPort0Type otuple(tuple.getAttributeValue(0), numTuples_);
    submit(otuple, 0);
}
The drain method is empty because the operator has no pending actions to complete before a consistent state is established. The checkpoint method serializes the state of the operator into the checkpoint backend, and the reset method deserializes the state from it.
<%if ($isInConsistentRegion) {%>
void MY_OPERATOR::drain()
{
}
void MY_OPERATOR::checkpoint(Checkpoint & ckpt)
{
    AutoMutex am(mutex_);
    ckpt << numTuples_;  // save the counter
}
void MY_OPERATOR::reset(Checkpoint & ckpt)
{
    AutoMutex am(mutex_);
    ckpt >> numTuples_;  // restore the counter
}
The resetToInitialState() method resets the operator to its initial state.
void MY_OPERATOR::resetToInitialState()
{
    AutoMutex am(mutex_);
    numTuples_ = 0;
}
<%}%>
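When an operator has more than one state variable, the checkpoint and reset methods are normally written as mirror images, so that values are read back in the order in which they were written. The following standalone sketch illustrates that pattern. It assumes that the checkpoint behaves like a sequential byte stream; FakeCheckpoint, CounterState, and the sumOfValues field are hypothetical and exist only for this illustration, and the real Checkpoint class writes to the configured backend instead of to memory.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical in-memory stand-in for the Checkpoint class.
class FakeCheckpoint {
public:
    // Appends the raw bytes of a trivially copyable value.
    template <typename T>
    FakeCheckpoint& operator<<(const T& value) {
        static_assert(std::is_trivially_copyable<T>::value, "raw copy only");
        const unsigned char* p = reinterpret_cast<const unsigned char*>(&value);
        bytes_.insert(bytes_.end(), p, p + sizeof(T));
        return *this;
    }
    // Reads the next value from the current read position.
    template <typename T>
    FakeCheckpoint& operator>>(T& value) {
        static_assert(std::is_trivially_copyable<T>::value, "raw copy only");
        assert(readPos_ + sizeof(T) <= bytes_.size());
        std::memcpy(&value, bytes_.data() + readPos_, sizeof(T));
        readPos_ += sizeof(T);
        return *this;
    }
private:
    std::vector<unsigned char> bytes_;
    std::size_t readPos_ = 0;
};

// Hypothetical operator state with two fields.
struct CounterState {
    uint32_t numTuples = 0;
    uint64_t sumOfValues = 0;

    // Write order: numTuples, then sumOfValues.
    void checkpoint(FakeCheckpoint& ckpt) const { ckpt << numTuples << sumOfValues; }
    // Read order must match the write order.
    void reset(FakeCheckpoint& ckpt) { ckpt >> numTuples >> sumOfValues; }
};
```

Keeping the two methods next to each other, with the fields in the same order, makes a mismatch easy to spot when a new state variable is added.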
For information about how to create a sample application that uses the TupleCounter primitive operator, see the sample::StatefulPrimitive sample application that is located at $STREAMS_INSTALL/samples/spl/feature/ConsistentRegion/sample/.