Implementing a blocking source operator with the Java™ Operator API
An external data source might require that data be fetched in a blocking style: the data source's API is called repeatedly, and each call might block the calling thread while data is fetched.
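As a minimal illustration of the blocking style, the following sketch calls java.io.BufferedReader.readLine() in a loop until the end of the stream. Each call can block until a line is available. The class name BlockingFetch, and the Consumer used in place of tuple submission, are hypothetical and are not part of the Java Operator API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

/** Hypothetical sketch: repeatedly calling a blocking read API until the data is exhausted. */
final class BlockingFetch {
    private BlockingFetch() {}

    /**
     * Calls readLine() in a loop. Each call may block the calling thread until a
     * line (or the end of the stream) is available, which is why an operator runs
     * such a loop on its own thread.
     */
    static int fetchAll(Reader source, Consumer<String> submit) {
        int count = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) { // may block here
                submit.accept(line); // stands in for submitting a tuple
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }
}
```

For example, fetchAll(new java.io.StringReader("a\nb\n"), System.out::println) prints two lines and returns 2.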
About this task
Examples of such APIs include the JDBC ResultSet, where the underlying driver might block waiting for a response from the database, and the java.io.InputStream class, where a reader might block waiting for data from a file or the network. Within a Java™ operator, the approach for using such APIs is to have a thread that fetches data from the API in a loop and submits tuples that are based on the data. This pattern also applies to cases where blocking might not be involved but the API still requires repeated calls to fetch the data. An alternative style of data access API is polling, where the Java™ operator periodically polls the data source for new information.

Procedure
- Implement the Operator.initialize() method. For more information, see Implementing a source operator with the Java Operator API.
- Implement the Operator.allPortsReady() method. This method creates an implementation of java.lang.Runnable that accesses the external data source API and submits tuples. It also creates and starts one or more Java™ threads by using OperatorContext.getThreadFactory(), passing an instance of the Runnable implementation. Using the operator's ThreadFactory ensures that the threads have the correct context (including the context class loader) and that the threads are seen by the Teracloud® Streams instance. Starting threads any other way can lead to early termination of the operator, because the Teracloud® Streams instance might determine that no outstanding work remains.
- If the data source is finite, ensure that the Runnable implementation submits a final punctuation mark and cleans up any resources before it returns (returning completes the thread).
- Implement your operator's shutdown() method by using either of the following options:
  - Handling java.lang.InterruptedException exceptions that are thrown.
  - Checking whether the operator is shut down by using the OperatorContext.shutdownRequested() method, if the external data access API does not support being interrupted. For example, check once per loop iteration.
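The procedure can be sketched in plain Java under some stated assumptions, because the Streams classes are not available outside an operator: a BlockingQueue plays the blocking external API, the hypothetical TupleSink interface plays the output port, and Executors.defaultThreadFactory() plays the factory that OperatorContext.getThreadFactory() would supply. The sketch shows both shutdown options: interrupting a blocked take() and checking a flag once per loop. The class names and the empty-string end-of-data convention are assumptions of this sketch only, not the Java Operator API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the operator's output port (StreamingOutput in the real API). */
interface TupleSink {
    void submit(String tuple);
    void finalMarker();
}

/** Hypothetical sketch of a blocking source following the procedure steps. */
final class BlockingSourceSketch {
    /** Hypothetical end-of-data convention used only by this sketch. */
    static final String END_OF_DATA = "";

    private final BlockingQueue<String> source; // stands in for a blocking external API
    private final TupleSink out;
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    private Thread worker;

    BlockingSourceSketch(BlockingQueue<String> source, TupleSink out) {
        this.source = source;
        this.out = out;
    }

    /** Analogue of allPortsReady(): create the fetch thread from the factory and start it. */
    void allPortsReady(ThreadFactory factory) {
        worker = factory.newThread(this::fetchLoop);
        worker.setDaemon(false); // the runtime should wait for this thread
        worker.start();
    }

    private void fetchLoop() {
        try {
            // Option 2: check for shutdown once per loop iteration.
            while (!shutdownRequested.get()) {
                String item = source.take(); // blocks until data arrives or the thread is interrupted
                if (END_OF_DATA.equals(item)) {
                    // Finite source: release any resources here, send the final marker, return.
                    out.finalMarker();
                    return;
                }
                out.submit(item);
            }
        } catch (InterruptedException e) {
            // Option 1: shutdown interrupted the blocking call; restore the flag and exit.
            Thread.currentThread().interrupt();
        }
    }

    /** Analogue of shutdown(): set the flag, interrupt a blocked take(), and wait for exit. */
    void shutdown() {
        shutdownRequested.set(true);
        worker.interrupt();
        awaitCompletion();
    }

    void awaitCompletion() {
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        return worker.isAlive();
    }
}
```

In a real operator, the factory would come from getOperatorContext().getThreadFactory() and tuples would be submitted through StreamingOutput, as in the sample code in the Example section.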
Example
com.ibm.streams.operator.samples.sources.SystemPropertySource is a simple source operator that uses a Thread to read Java™ system properties and deliver them as a stream of tuples with two ustring attributes, name and value.
SystemPropertySource extends the sample pattern abstract class
ProcessTupleProducer, which allows subclasses to implement a
process() method to fetch data and submit tuples, thus hiding the
details of creating a Thread and implementing
Runnable. This use of a single process() method is
similar to the C++ operator runtime API style of using a process()
function as the entry point for one or more threads.
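A runnable plain-Java analogue of this template-method arrangement might look like the following. ProcessSourceSketch and FixedValuesSource are hypothetical names, a List stands in for the output port, and a string constant stands in for Punctuation.FINAL_MARKER; the real ProcessTupleProducer is built on the Java Operator API instead. The point it shows is that the subclass writes only process(), while the base class owns the Thread and the Runnable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadFactory;

/** Hypothetical plain-Java analogue of the ProcessTupleProducer pattern (not the sample class). */
abstract class ProcessSourceSketch {
    private Thread processThread;

    /** Like initialize(): create, but do not start, a non-daemon thread that calls process(). */
    final synchronized void initialize(ThreadFactory factory) {
        processThread = factory.newThread(() -> {
            try {
                process();
            } catch (Exception e) {
                // The sample logs to the error trace; this sketch prints the stack trace instead.
                e.printStackTrace();
            }
        });
        processThread.setDaemon(false);
    }

    /** Like startProcessing(): start the thread at allPortsReady() time. */
    final synchronized void startProcessing() {
        processThread.start();
    }

    /** Helper for this sketch only: wait for process() to return. */
    final void awaitCompletion() {
        try {
            processThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Fetch data and submit tuples; runs on the thread created in initialize(). */
    protected abstract void process() throws Exception;
}

/** Hypothetical subclass: emits a fixed, finite set of values followed by a final marker. */
final class FixedValuesSource extends ProcessSourceSketch {
    static final String FINAL_MARKER = "<final>"; // stands in for Punctuation.FINAL_MARKER
    final List<String> emitted = Collections.synchronizedList(new ArrayList<>());
    private final List<String> values;

    FixedValuesSource(List<String> values) {
        this.values = values;
    }

    @Override
    protected void process() {
        for (String v : values) {
            emitted.add(v); // stands in for out.submit(tuple)
        }
        emitted.add(FINAL_MARKER); // finite data, so finish with a final marker
    }
}
```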
ProcessTupleProducer itself extends TupleProducer, which allows a delay to be specified before tuples are submitted. The ProcessTupleProducer.initialize() method creates a new thread by using the operator's ThreadFactory, and that thread calls the abstract no-argument process() method when it runs:

public synchronized void initialize(OperatorContext context) throws Exception {
    super.initialize(context);
    /*
     * Create the thread at initialize time but do not start it.
     * The thread will be started by startProcessing() which will
     * be called at allPortsReady() time if no delay is specified,
     * otherwise it will be called once the delay has expired.
     */
    processThread = getOperatorContext().getThreadFactory().newThread(
        new Runnable() {
            @Override
            public void run() {
                try {
                    process();
                } catch (Exception e) {
                    trace.log(TraceLevel.ERROR, e.getMessage(), e);
                }
            }
        });
    /*
     * Set the thread not to be a daemon to ensure that the SPL runtime
     * will wait for the thread to complete before determining the
     * operator is complete.
     */
    processThread.setDaemon(false);
}

The implementation of Runnable is provided by an anonymous inner class that calls process() and logs any thrown exception to the error log. The thread is set as a non-daemon thread so that the Teracloud® Streams instance waits for the thread to complete before it marks the operator as complete. When initialize() completes, the thread exists but is not started. ProcessTupleProducer then implements startProcessing() to start the thread, either when allPortsReady() is called or, if a delay was specified, after the delay expires:

protected synchronized void startProcessing() {
    processThread.start();
}

ProcessTupleProducer thus provides the framework for subclasses to implement the no-argument process() method to loop and fetch data. SystemPropertySource has a simple process() method that fetches the Java™ system properties and submits a tuple for each property:

protected void process() throws Exception {
    final StreamingOutput<OutputTuple> out = getOutput(0);
    for (Enumeration<?> e = System.getProperties().propertyNames();
            e.hasMoreElements(); ) {
        String name = (String) e.nextElement();
        String value = System.getProperty(name);
        OutputTuple tuple = out.newTuple();
        tuple.setString("name", name);
        tuple.setString("value", value);
        out.submit(tuple);
    }
    // Finite set of properties so send a final mark.
    out.punctuate(Punctuation.FINAL_MARKER);
}

Because the properties are a finite set, the operator submits a final punctuation mark after all the properties are submitted as tuples and then returns, which completes the thread and the operator.

What to do next
- Look at the complete source for ProcessTupleProducer and SystemPropertySource.
- Implement your own operator by using a Thread or by extending ProcessTupleProducer.
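If your data source offers a polling API rather than a blocking one (the alternative noted under About this task), your own operator could schedule periodic polls instead of running a blocking loop. The following is a minimal sketch, assuming a hypothetical non-blocking poll function that returns null when nothing new is available. ScheduledExecutorService is standard Java; PollingSourceSketch is a hypothetical name, and in a real operator the ThreadFactory passed to start() would be the operator's own.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch of the polling style: a scheduled task calls a non-blocking poll. */
final class PollingSourceSketch {
    private final Supplier<String> pollOnce; // hypothetical API: returns null when nothing is new
    private final Consumer<String> submit;   // stands in for submitting a tuple
    private final long periodMillis;
    private ScheduledExecutorService scheduler;

    PollingSourceSketch(Supplier<String> pollOnce, Consumer<String> submit, long periodMillis) {
        this.pollOnce = pollOnce;
        this.submit = submit;
        this.periodMillis = periodMillis;
    }

    /** Create the polling thread from the supplied factory and start polling immediately. */
    void start(ThreadFactory factory) {
        scheduler = Executors.newSingleThreadScheduledExecutor(factory);
        // Note: an exception escaping pollAndSubmit would suppress all later polls.
        scheduler.scheduleAtFixedRate(this::pollAndSubmit, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    private void pollAndSubmit() {
        String item;
        while ((item = pollOnce.get()) != null) { // drain everything currently available
            submit.accept(item);
        }
    }

    /** Analogue of shutdown(): cancel future polls and interrupt one in progress. */
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Passing the factory to Executors.newSingleThreadScheduledExecutor keeps the polling thread under the factory that the procedure recommends, and calling shutdownNow() from the operator's shutdown() stops further polls.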