Primitive operators that start consistent regions
Any primitive operator that starts a consistent region and that replays its submitted tuples upon a failure must acquire tuple submission permits to pause and resume tuple submission during a checkpoint or a reset.
You write C++ primitive operators that start a consistent region by using the StateHandler interface and acquiring a consistent region permit to submit tuples. Acquiring permits helps to pause and resume tuple submission during a checkpoint and reset. Acquiring permits for tuple submission is mandatory for operators that start a consistent region and for operators in a consistent region that have background threads.
If a start operator is in a periodic consistent region and it completes submitting output tuples, draining and resetting of the consistent region continues until the job finishes.
Consistent region permits
In general, start operators of a consistent region are source operators. In C++, source operators must implement the process(uint32_t) method, which is the function that the Teracloud® Streams instance calls to run the main logic of a thread. When the operator is outside of a consistent region, it can submit tuples or punctuation at any time. When the operator is in a consistent region, it must pause tuple or punctuation submission every time a checkpoint or a reset occurs.
Code your operator so that checkpoints and resets happen at natural points of its execution: for example, after the operator changes its internal state and submits a tuple or punctuation, or after a more elaborate operation, such as reaching the end of a file that it is reading. After you define these natural points, guard each code block between them with a tuple or punctuation submission permit that is specific to the consistent region.
A consistent region permit ensures that checkpoints and resets do not occur while an operator thread is running the guarded code block. Multiple threads can hold permits at the same time. A checkpoint or reset occurs only after the corresponding notification arrives from the consistent region controller and all operator threads have released their permits. After that, the Teracloud® Streams instance calls the StateHandler methods (checkpoint and reset), and starts checkpointing or resetting the downstream operators of the region.
If any thread attempts to acquire a permit when a checkpoint or reset is in progress, the thread blocks. A thread also blocks on a permit when the operator restarts and has not yet reset. The thread is granted a permit, that is, unblocks, after a consistent state is established or reset successfully and the consistent region controller notifies that tuple or punctuation submission can be resumed.
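The blocking behavior described above can be modeled with standard C++ only. The following is a minimal sketch, not the Streams runtime: RegionGate, runPermitDemo, and DemoResult are hypothetical names, and the model assumes a single thread that requests checkpoints. It shows that several threads can hold permits at once, that a checkpoint waits for all permits to be released, and that permit requests block while the checkpoint runs.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical model of permit semantics; not the Streams runtime classes.
class RegionGate {
public:
    // Blocks while a checkpoint is pending or in progress.
    void acquirePermit() {
        std::unique_lock<std::mutex> lock(mutex_);
        changed_.wait(lock, [this] { return !checkpointing_; });
        ++permitsHeld_;
    }

    void releasePermit() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(permitsHeld_ > 0);
        --permitsHeld_;
        changed_.notify_all();
    }

    // Waits until every permit is released, runs the callback, then resumes.
    template <typename Callback>
    void checkpoint(Callback callback) {
        std::unique_lock<std::mutex> lock(mutex_);
        checkpointing_ = true;  // from now on, new permit requests block
        changed_.wait(lock, [this] { return permitsHeld_ == 0; });
        callback();
        checkpointing_ = false;
        changed_.notify_all();  // let blocked threads acquire permits again
    }

private:
    std::mutex mutex_;
    std::condition_variable changed_;
    int permitsHeld_ = 0;
    bool checkpointing_ = false;
};

struct DemoResult {
    std::uint64_t submitted;
    std::uint64_t state;
    bool snapshotsConsistent;
};

// Worker threads "submit" and update state under a permit while another
// thread takes checkpoints and verifies that it never sees a half-done step.
DemoResult runPermitDemo(int threads, int tuplesPerThread) {
    RegionGate gate;
    std::atomic<std::uint64_t> submitted{0};
    std::atomic<std::uint64_t> state{0};
    std::atomic<bool> done{false};
    bool consistent = true;  // written only by the checkpointing thread

    std::thread checkpointer([&] {
        do {
            gate.checkpoint([&] {
                if (submitted.load() != state.load()) consistent = false;
            });
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        } while (!done.load());
    });

    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&] {
            for (int i = 0; i < tuplesPerThread; ++i) {
                gate.acquirePermit();
                submitted.fetch_add(1);  // stands in for tuple submission
                state.fetch_add(1);      // stands in for the state update
                gate.releasePermit();
            }
        });
    }
    for (std::thread& worker : workers) worker.join();
    done.store(true);
    checkpointer.join();
    return DemoResult{submitted.load(), state.load(), consistent};
}
```

Calling runPermitDemo(4, 1000) runs four submitting threads against the checkpointing thread. In this model, every checkpoint observes matching submission and state counts because the two updates always happen under one permit.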
A C++ primitive operator can acquire and release permits in either of the following ways:
- Calling a paired ConsistentRegionContext::acquirePermit() and ConsistentRegionContext::releasePermit()
- Defining a scope with a ConsistentRegionPermit, which is an RAII handler for the permit acquisition and release methods in the ConsistentRegionContext
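The difference between the two C++ styles listed above matters most when the guarded block exits early. The following sketch uses hypothetical stand-ins (MockRegionContext and ScopedPermit are illustrative names, not the SPL classes) to show one plausible reason to prefer the scoped form: with paired calls, an exception can skip the release call, whereas a scope-bound object releases the permit in its destructor.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for a consistent region context; it only counts permits.
class MockRegionContext {
public:
    void acquirePermit() { ++held_; }
    void releasePermit() {
        assert(held_ > 0);
        --held_;
    }
    int permitsHeld() const { return held_; }

private:
    int held_ = 0;
};

// Hypothetical RAII wrapper that mirrors the idea of a scoped permit.
class ScopedPermit {
public:
    explicit ScopedPermit(MockRegionContext& context) : context_(context) {
        context_.acquirePermit();
    }
    ~ScopedPermit() { context_.releasePermit(); }
    ScopedPermit(const ScopedPermit&) = delete;
    ScopedPermit& operator=(const ScopedPermit&) = delete;

private:
    MockRegionContext& context_;
};

// Paired calls: an exception between the calls skips releasePermit().
int submitWithPairedCalls(MockRegionContext& context, bool fail) {
    try {
        context.acquirePermit();
        if (fail) throw std::runtime_error("submission failed");
        context.releasePermit();
    } catch (const std::runtime_error&) {
        // The release call above was skipped, so the permit is still held.
    }
    return context.permitsHeld();
}

// Scoped permit: the destructor releases the permit on every exit path.
int submitWithScopedPermit(MockRegionContext& context, bool fail) {
    try {
        ScopedPermit permit(context);
        if (fail) throw std::runtime_error("submission failed");
    } catch (const std::runtime_error&) {
        // The permit was already released during stack unwinding.
    }
    return context.permitsHeld();
}
```

Both functions return the number of permits that remain held after the attempt. In this model, a leaked permit would keep a checkpoint or reset from ever starting, which is why paired calls need a release on every exit path.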
A Java™ primitive operator can acquire and release permits by calling acquirePermit() and releasePermit() from a ConsistentRegionContext. Releasing a permit is an indication that a consistent state can be correctly established or reset. If an unrecoverable error happens when a permit is being held, the operator must explicitly indicate that a consistent region must reset via the ConsistentRegionContext.
A thread can block when it tries to acquire a lock and, as described earlier, when it tries to acquire a permit. To avoid deadlocks, be mindful of the order in which the code acquires them. If a thread must acquire a lock, acquire it inside the code block that is running with a permit, not before the permit is acquired. Otherwise, the thread might hold the lock while it blocks on the permit, and any other thread that needs the same lock cannot acquire it until the checkpoint or reset is complete. One such thread is the SPL runtime thread, which cannot acquire the lock when it calls checkpoint or reset.
Sample start operator
This example concerns only source operators that start a consistent region. The same principles that are used for acquiring permits can be applied to primitive operators that do not start a region but have background threads that do tuple or punctuation submission.
The sample StartOfRegion operator submits tuples that have a uint64 attribute. The attribute value starts at 0 and is incremented by one at each tuple submission. If the operator does not persist and restore its state during a checkpoint and reset, the following outcomes can occur:
- On the failure of other operators, the StartOfRegion operator continues submitting new tuples with attribute values that increase by one. Because some tuples are not replayed, you might not see consecutive numbers in the output.
- On the failure of the operator itself, the StartOfRegion operator loses the value of the tuple that was last submitted. In this case, the operator starts submitting data again from 0, which makes the whole application process data from the beginning.
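The two outcomes above can be reproduced with a small single-threaded model. This is a sketch under stated assumptions, not the sample operator: CounterSource and firstValueAfterFailure are hypothetical names, and the saved_ member stands in for checkpoint storage that survives an operator failure.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of a counting source; not the StartOfRegion sample code.
class CounterSource {
public:
    explicit CounterSource(bool persistsState) : persistsState_(persistsState) {}

    // Returns the attribute value of the submitted tuple, then increments.
    std::uint64_t submitNext() { return counter_++; }

    // Saves the counter only if the operator persists its state.
    void checkpoint() {
        if (persistsState_) saved_ = counter_;
    }

    // Region reset after another operator failed; this operator kept running.
    void resetAfterOtherFailure() {
        if (persistsState_) {
            assert(saved_ <= counter_);
            counter_ = saved_;
        }
    }

    // Restart after this operator failed; the in-memory counter is lost.
    void restartAfterOwnFailure() { counter_ = persistsState_ ? saved_ : 0; }

private:
    bool persistsState_;
    std::uint64_t counter_ = 0;
    std::uint64_t saved_ = 0;  // stands in for durable checkpoint storage
};

// Submits 0..4, checkpoints, submits 5..7, fails, and reports the next value.
std::uint64_t firstValueAfterFailure(bool persistsState, bool ownFailure) {
    CounterSource source(persistsState);
    for (int i = 0; i < 5; ++i) source.submitNext();
    source.checkpoint();
    for (int i = 0; i < 3; ++i) source.submitNext();
    if (ownFailure) {
        source.restartAfterOwnFailure();
    } else {
        source.resetAfterOtherFailure();
    }
    return source.submitNext();
}
```

With persisted state, the model resumes at 5 in both failure cases, so tuples 5 to 7 are replayed. Without it, the model continues at 8 after another operator fails (values 5 to 7 are never replayed) and restarts at 0 after its own failure.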
Header file
<%
    my $crContext =
        $model->getContext()->getOptionalContext("ConsistentRegion");
    my @includes;
    if ($crContext) {
        # Push both headers; they are needed only in a consistent region.
        push @includes,
            "#include <SPL/Runtime/Operator/State/StateHandler.h>",
            "#include <SPL/Runtime/Operator/State/ConsistentRegionContext.h>";
    }
    # Guard the call: $crContext is undefined outside of a consistent region.
    my $isTriggerOperator =
        $crContext ? $crContext->isTriggerOperator() : 0;
    if ($isTriggerOperator) {
        SPL::CodeGen::errorln("Operator does not support trigger=operatorDriven in a consistent region.",
            $model->getContext()->getSourceLocation());
    }
    SPL::CodeGen::headerPrologue($model, \@includes);
%>
The rest of the header file for this operator is similar to the header file for the StatefulPrimitive operator. The only difference is that it includes a new member variable that is named _crContext, of type pointer to a ConsistentRegionContext. To view the header file for that operator, see Stateful primitive operators that participate in consistent regions.
C++ file
Similar to the header file, the C++ file uses the code generation API to conditionally compile code that is used only when the operator is in a consistent region.
The main logic of the operator is in the process(uint32_t) method. When the operator is in a consistent region, the main loop of the process method instantiates the ConsistentRegionPermit for the following reasons:
- To protect tuple submission.
- To bind the tuple submission with the state update in a single operation.
By binding the tuple submission code to the state update code, the operator ensures that both operations occur as a single unit with respect to a checkpoint or a reset. As a result, when the operator checkpoints or resets its state, the state reflects the fact that both operations occurred. If these operations were not coupled, the operator code would need to handle additional reset situations, for example, a reset in which the operator submitted the tuple but did not update the internal state. This issue arises because a checkpoint or a reset can occur anytime the operator code is not running with an acquired permit.
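void MY_OPERATOR::process(uint32_t)
{
    ProcessingElement& pe = getPE();
    while (!pe.getShutdownRequested()) {
<%if ($isInConsistentRegion) {%>
        {
            // Hold a permit for the whole submit-and-update step; the permit
            // is released when crp goes out of scope.
            ConsistentRegionPermit crp(_crContext);
<%}%>
            {
                // Acquire the mutex inside the permit scope to avoid deadlocks.
                AutoMutex am(_mutex);
                OPort0Type tuple(_counter);
                submit(tuple, 0);
                _counter++;
            }
<%if ($isInConsistentRegion) {%>
        }
<%}%>
        // Pause between tuples, outside of the permit scope.
        pe.blockUntilShutdownRequest(0.2);
    }
}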
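To illustrate why the submission and the state update are coupled under one permit, the following sketch simulates a checkpoint that lands between the two operations. It is a simplified, single-threaded model with hypothetical names (SourceModel, takeCheckpoint, resetRegion, submitAndUpdate, runScenario); it assumes that a reset discards the tuples that were submitted after the last checkpoint and restores the saved counter.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical single-threaded model; the names are illustrative, not SPL APIs.
struct SourceModel {
    std::uint64_t counter = 0;           // live operator state
    std::uint64_t savedCounter = 0;      // state captured by the last checkpoint
    std::size_t committedTuples = 0;     // tuples covered by the last checkpoint
    std::vector<std::uint64_t> output;   // attribute values seen downstream
};

void takeCheckpoint(SourceModel& s) {
    s.savedCounter = s.counter;
    s.committedTuples = s.output.size();
}

// Rolls the region back: later tuples are discarded and state is restored.
void resetRegion(SourceModel& s) {
    assert(s.committedTuples <= s.output.size());
    s.output.resize(s.committedTuples);
    s.counter = s.savedCounter;
}

// One loop iteration. checkpointBetween simulates a checkpoint that slips in
// between the submission and the state update, which a permit would prevent.
void submitAndUpdate(SourceModel& s, bool checkpointBetween) {
    s.output.push_back(s.counter);  // stands in for submit(tuple, 0)
    if (checkpointBetween) takeCheckpoint(s);
    ++s.counter;                    // stands in for _counter++
}

std::vector<std::uint64_t> runScenario(bool guardedByPermit) {
    SourceModel s;
    submitAndUpdate(s, false);      // submits 0
    submitAndUpdate(s, false);      // submits 1
    if (guardedByPermit) {
        submitAndUpdate(s, false);  // submits 2
        takeCheckpoint(s);          // checkpoint waits for the whole step
    } else {
        submitAndUpdate(s, true);   // submits 2, checkpoint lands mid-step
    }
    submitAndUpdate(s, false);      // submits 3, rolled back by the reset
    resetRegion(s);
    submitAndUpdate(s, false);      // first tuple after the reset
    return s.output;
}
```

In the guarded scenario the output after the reset is 0, 1, 2, 3. In the unguarded scenario the checkpoint records a counter of 2 even though tuple 2 was already submitted, so the model submits 2 a second time after the reset.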
Any primitive operator that has background tuple submission threads can use the ConsistentRegionPermit in the same way as the process(uint32_t) method in this section to pause and resume submission during a checkpoint or reset.
This example does not show the implementation of the operator state checkpoint and reset methods because those methods are similar to the implementation that is described in Stateful primitive operators that participate in consistent regions. For information about how to create a sample application that uses the StartOfRegion primitive operator, see the sample::ReplayableSource sample application at $STREAMS_INSTALL/samples/spl/feature/ConsistentRegion/sample/. A sample Java™ primitive operator that acquires consistent region permits can be found in com.ibm.streams.operator.samples.sources.SystemPropertySource.