Implementing a blocking source operator with the Java™ Operator API
An external data source might require that data is fetched in a blocking style: the data source API is called repeatedly, and each call might block until data is available.
About this task
Examples of such APIs include the JDBC ResultSet, where the underlying driver might block while waiting for a response from the database, and the java.io.InputStream class, where a reader might block while waiting for data from a file or the network. Within a Java™ operator, the approach for using such APIs is to have a thread that fetches data from the API in a loop and submits tuples that are based on the data. This pattern also applies where blocking might not be involved but the API still requires repeated calls to fetch the data. An alternative data access style is polling, where the Java™ operator periodically polls the data source for new information.
Procedure
- Implement the Operator.initialize() method. For more information, see Implementing a source operator with the Java Operator API.
- Implement the Operator.allPortsReady() method. This method creates an implementation of java.lang.Runnable that accesses the external data source API and submits tuples. It also creates and starts one or more Java™ threads by using OperatorContext.getThreadFactory(), passing an instance of the Runnable implementation. Using the operator's ThreadFactory ensures that the threads have the right context (including the context class loader) and that the threads are seen by the Teracloud® Streams instance. Starting threads any other way can lead to early termination of the operator, because the Teracloud® Streams instance might determine that no outstanding work remains.
- If the data source is finite, ensure that the Runnable implementation submits a final punctuation mark and cleans up any resources before it returns, which completes the thread.
- Implement your operator's shutdown() method by using either of the following options:
  - Handle java.lang.InterruptedException exceptions that are thrown.
  - If the external data access API does not support being interrupted, check whether the operator is shut down by using the OperatorContext.shutdownRequested() method, for example once per loop iteration.
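The steps above can be sketched in plain Java that runs without a Teracloud® Streams instance. This is a minimal sketch under stated assumptions, not the Java Operator API: a caller-supplied ThreadFactory stands in for OperatorContext.getThreadFactory(), a BlockingQueue stands in for a blocking data source API, an in-memory list and the FINAL string stand in for tuple submission and the final punctuation mark, and a volatile flag stands in for OperatorContext.shutdownRequested(). BlockingSourceSketch, END_OF_DATA, and awaitCompletion() are hypothetical names. The sketch's shutdown() simulates both shutdown options at once by setting the flag and interrupting the thread.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;

/**
 * Hypothetical plain-Java sketch of a blocking source. Stand-ins:
 * ThreadFactory for OperatorContext.getThreadFactory(), BlockingQueue for
 * the external data source, submit()/FINAL for tuple submission and the
 * final punctuation mark, shutdownRequested for OperatorContext.shutdownRequested().
 */
class BlockingSourceSketch {
    static final String FINAL = "<final marker>";
    /** Hypothetical sentinel that a finite source puts on the queue after its last item. */
    static final String END_OF_DATA = "<end of data>";

    private final BlockingQueue<String> source;
    private final List<String> submitted = new ArrayList<>();
    private volatile boolean shutdownRequested;
    private Thread worker;

    BlockingSourceSketch(BlockingQueue<String> source) {
        this.source = source;
    }

    /** Analogue of allPortsReady(): build the Runnable and start it on a factory-made thread. */
    void allPortsReady(ThreadFactory factory) {
        Runnable fetchLoop = () -> {
            try {
                // Option 2: check the shutdown flag once per loop iteration.
                while (!shutdownRequested) {
                    String item = source.take(); // blocks until data arrives
                    if (END_OF_DATA.equals(item)) {
                        // Finite source: send the final marker, then return so the thread completes.
                        submit(FINAL);
                        return;
                    }
                    submit(item);
                }
            } catch (InterruptedException e) {
                // Option 1: a blocked take() was interrupted; restore the flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        };
        worker = factory.newThread(fetchLoop);
        worker.start();
    }

    /** Analogue of shutdown(): raise the flag and interrupt the thread in case it is blocked. */
    void shutdown() {
        shutdownRequested = true;
        worker.interrupt();
    }

    /** Waits up to millis for the fetch thread to end; returns true if it has ended. */
    boolean awaitCompletion(long millis) {
        try {
            worker.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    private synchronized void submit(String item) {
        submitted.add(item);
    }

    /** Returns a snapshot of everything "submitted" so far. */
    synchronized List<String> submitted() {
        return new ArrayList<>(submitted);
    }
}
```
With a queue holding "a", "b", and END_OF_DATA, the thread submits "a" and "b", then the final marker, and ends on its own. With an empty queue, the thread stays blocked in take() until shutdown() interrupts it.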
Example
com.ibm.streams.operator.samples.sources.SystemPropertySource is a simple source operator that uses a Thread to read the Java™ system properties and deliver them as a stream of tuples with two ustring attributes, name and value. SystemPropertySource extends the sample pattern abstract class ProcessTupleProducer, which lets subclasses implement a process() method that fetches data and submits tuples, hiding the details of creating a Thread and implementing Runnable. This use of a single process() method is similar to the C++ operator runtime API style, where a process() function is the entry point for one or more threads. ProcessTupleProducer itself extends TupleProducer, which allows a delay to be specified before tuples are submitted.
The ProcessTupleProducer.initialize() method creates a new thread by using the operator's ThreadFactory; when that thread runs, it calls the abstract no-argument process() method.
```java
public synchronized void initialize(OperatorContext context) throws Exception {
    super.initialize(context);
    /*
     * Create the thread at initialize time but do not start it.
     * The thread will be started by startProcessing() which will
     * be called at allPortsReady() time if no delay is specified,
     * otherwise it will be called once the delay has expired.
     */
    processThread = getOperatorContext().getThreadFactory().newThread(
            new Runnable() {
                @Override
                public void run() {
                    try {
                        process();
                    } catch (Exception e) {
                        trace.log(TraceLevel.ERROR, e.getMessage(), e);
                    }
                }
            });
    /*
     * Set the thread not to be a daemon to ensure that the SPL runtime
     * will wait for the thread to complete before determining the
     * operator is complete.
     */
    processThread.setDaemon(false);
}
```
The Runnable implementation is an anonymous inner class that calls process() and logs any thrown exception at the ERROR trace level. The thread is made a non-daemon thread to ensure that the Teracloud® Streams instance waits for the thread to complete before it marks the operator as complete.
When initialize() completes, the thread exists but is not started. ProcessTupleProducer then implements startProcessing() to start the thread, either when allPortsReady() is called or, if a delay was specified, after the delay expires.
```java
protected synchronized void startProcessing() {
    processThread.start();
}
```
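The split between creating the thread in initialize() and starting it later can be sketched in plain Java. This is a hypothetical illustration, not the TupleProducer implementation: this topic does not show how the delay is applied, so the sketch assumes a ScheduledExecutorService. DelayedStartProducer and its method signatures are invented for the example, and a ThreadFactory parameter stands in for getOperatorContext().getThreadFactory().
```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of "create the thread in initialize(), start it later".
 * How TupleProducer really implements its delay is not shown in this topic;
 * the ScheduledExecutorService is an assumption used only for illustration.
 */
abstract class DelayedStartProducer {
    private Thread processThread;

    /** Subclasses fetch data and submit it here, as with ProcessTupleProducer. */
    protected abstract void process();

    /** Analogue of initialize(): create the non-daemon thread but do not start it. */
    synchronized void initialize(ThreadFactory factory) {
        processThread = factory.newThread(this::process);
        processThread.setDaemon(false);
    }

    /** Analogue of allPortsReady(): start at once, or schedule the start after delayMillis. */
    synchronized void allPortsReady(long delayMillis) {
        if (delayMillis <= 0) {
            startProcessing();
            return;
        }
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.schedule(this::startProcessing, delayMillis, TimeUnit.MILLISECONDS);
        // By default an already-scheduled task still runs after shutdown();
        // shutting down here only lets the timer thread exit once it has fired.
        timer.shutdown();
    }

    /** Same body as ProcessTupleProducer.startProcessing(). */
    protected synchronized void startProcessing() {
        processThread.start();
    }
}
```
With a delay of zero or less, the thread starts inside allPortsReady(); otherwise process() runs no earlier than delayMillis after the call.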
ProcessTupleProducer thus provides the framework for subclasses to implement the no-argument process() method that loops and fetches data. SystemPropertySource has a simple process() method that fetches the Java™ system properties and submits a tuple for each property.
```java
protected void process() throws Exception {
    final StreamingOutput<OutputTuple> out = getOutput(0);
    for (Enumeration<?> e = System.getProperties().propertyNames();
            e.hasMoreElements(); ) {
        String name = (String) e.nextElement();
        String value = System.getProperty(name);
        OutputTuple tuple = out.newTuple();
        tuple.setString("name", name);
        tuple.setString("value", value);
        out.submit(tuple);
    }
    // Finite set of properties, so send a final mark.
    out.punctuate(Punctuation.FINAL_MARKER);
}
```
Because the properties are a finite set, the operator submits a final punctuation mark after all the properties are submitted as tuples and then returns, which completes the thread and the operator.
What to do next
- Look at the complete source for ProcessTupleProducer and SystemPropertySource.
- Implement your own operator by using a Thread or by extending ProcessTupleProducer.
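As a concrete starting point for your own operator, the following plain-Java sketch applies the same pattern to java.io.InputStream, one of the blocking APIs mentioned earlier: a process()-style method reads lines in a loop, hands each one to a callback, and signals the end of the finite data. LineSource, its constructor, and demo() are hypothetical; the Consumer<String> stands in for creating and submitting an output tuple, and the endOfData callback stands in for out.punctuate(Punctuation.FINAL_MARKER). The sketch runs without a Teracloud® Streams instance, which is why demo() uses new Thread(...) instead of the operator's ThreadFactory.
```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical process()-style loop over a blocking java.io.InputStream.
 * submit stands in for tuple submission; endOfData stands in for the
 * final punctuation mark.
 */
class LineSource {
    private final InputStream in;
    private final Consumer<String> submit;
    private final Runnable endOfData;

    LineSource(InputStream in, Consumer<String> submit, Runnable endOfData) {
        this.in = in;
        this.submit = submit;
        this.endOfData = endOfData;
    }

    void process() {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            // readLine() blocks until a full line or the end of the stream is available.
            while ((line = reader.readLine()) != null) {
                submit.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is finite, so signal that no more data follows.
        endOfData.run();
    }

    /** Runs process() on its own thread over in-memory text and returns what was "submitted". */
    static List<String> demo(String text) {
        List<String> seen = new ArrayList<>();
        InputStream in = new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
        LineSource source = new LineSource(in, seen::add, () -> seen.add("<final>"));
        // Inside an operator, create this thread with the operator's ThreadFactory instead.
        Thread reader = new Thread(source::process);
        reader.start();
        try {
            reader.join(); // join() makes the reader thread's writes to seen visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```
For example, LineSource.demo("alpha\nbeta\n") returns the list [alpha, beta, <final>].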