Implementing a polling source operator using the Java™ Operator API
Some external sources require that an operator pull data from them by polling periodically. In Java™, you can create a periodic task by using the standard Java™ SE java.util.concurrent.ScheduledExecutorService interface.
The Streams Java™ Operator API provides a ScheduledExecutorService instance for each operator, accessible through OperatorContext.getScheduledExecutorService(). Use this instance to run background and periodic tasks for Operator implementations. The tasks are run by threads that have names specific to the operator and have the correct class-loading setup.
For example, the following call to ScheduledExecutorService.scheduleAtFixedRate() creates and runs a periodic task. The details of the task are omitted, but the result is that, once the Teracloud® Streams instance calls allPortsReady(), the run() method is called every 5 seconds:
@Override
public void allPortsReady() {
    getOperatorContext().getScheduledExecutorService().scheduleAtFixedRate(
        new Runnable() {
            @Override
            public void run() {
                // Work of task (not shown)
                …
            }
        }, 0L, 5L, TimeUnit.SECONDS);
}
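The fixed-rate behavior can also be tried outside of Streams with a plain JDK executor. The following is a minimal sketch, assuming a stand-in executor created with Executors.newSingleThreadScheduledExecutor() in place of the one returned by OperatorContext.getScheduledExecutorService(). The class name FixedRateSketch, the method runTicks(), and the millisecond period are illustrative and are not part of the Operator API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateSketch {

    // Runs a periodic task until it has executed at least `ticks` times
    // (or a 10-second timeout expires) and returns the number of runs seen.
    static int runTicks(int ticks, long periodMillis) {
        // Assumption: stand-in for getOperatorContext().getScheduledExecutorService()
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);
        try {
            ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
                runs.incrementAndGet(); // work of the task
                done.countDown();
            }, 0L, periodMillis, TimeUnit.MILLISECONDS);
            done.await(10, TimeUnit.SECONDS);
            task.cancel(false); // no further runs are started
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runTicks(3, 50L));
    }
}
```

An initial delay of 0 means the first run starts immediately; later runs are scheduled one period apart, measured from the start of the first run.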
If the task submits tuples or punctuation to any output port, it must not be started before allPortsReady() is called, so it is typically started by that method. An operator can start any number of background tasks by using its ScheduledExecutorService, and it can also create background threads by using OperatorContext.getThreadFactory().
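Creating a background thread from a factory follows the standard java.util.concurrent.ThreadFactory contract. The sketch below is a hedged illustration only: it uses Executors.defaultThreadFactory() as a stand-in for the factory that OperatorContext.getThreadFactory() returns, and the class name BackgroundThreadSketch and the method runInBackground() are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

class BackgroundThreadSketch {

    // Starts one background thread from the given factory, waits for it to
    // finish, and returns the value that the thread produced.
    static String runInBackground(ThreadFactory factory) {
        AtomicReference<String> result = new AtomicReference<>();
        Thread worker = factory.newThread(() -> result.set("polled"));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }

    public static void main(String[] args) {
        // Assumption: stand-in for getOperatorContext().getThreadFactory()
        ThreadFactory factory = Executors.defaultThreadFactory();
        System.out.println(runInBackground(factory));
    }
}
```

In an operator, the thread would typically be started from allPortsReady() for the same reason as a scheduled task: it must not submit to an output port before that method is called.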
The following example submits a tuple every 5 seconds. Each tuple contains the name of the host where the operator is running and the current time stamp:
@Override
public void allPortsReady() throws Exception {
    getOperatorContext().getScheduledExecutorService().scheduleAtFixedRate(
        new Runnable() {
            // Resolved once, when the task is created
            final String host = InetAddress.getLocalHost().getCanonicalHostName();
            final StreamingOutput<OutputTuple> output = getOutput(0);

            @Override
            public void run() {
                OutputTuple tuple = output.newTuple();
                tuple.setString("host", host);
                tuple.setTimestamp("ts", Timestamp.currentTime());
                try {
                    output.submit(tuple);
                } catch (Exception e) {
                    // run() cannot declare checked exceptions, so log here
                    trace.log(LogLevel.ERROR, "Exception thrown submitting tuple!", e);
                }
            }
        }, 0L, 5L, TimeUnit.SECONDS);
}
The Runnable.run() method cannot declare checked exceptions, so any implementation must catch the checked exceptions that result from accessing external systems or from submitting tuples or punctuation to output ports. Note that neither catching the exception within the run() method nor throwing it wrapped in a runtime exception terminates the operator.
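One reason to prefer catching inside run() is visible with a plain JDK executor: when an exception escapes from a task that was scheduled with scheduleAtFixedRate(), the JDK suppresses all subsequent executions of that task. The sketch below compares the two choices under that assumption. It uses a stand-in executor rather than the one supplied by the operator context, whose behavior may differ, and the names CatchInRunSketch, poll(), and countRuns() are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CatchInRunSketch {

    // Hypothetical stand-in for polling an external system; always fails.
    static void poll() throws Exception {
        throw new Exception("source unavailable");
    }

    // Schedules a failing task every 10 ms and returns how many times it
    // ran during the observation window.
    static int countRuns(boolean catchInsideRun, long windowMillis) {
        // Assumption: plain JDK executor, not the operator's own instance
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            try {
                poll();
            } catch (Exception e) {
                if (!catchInsideRun) {
                    // Wrapping and rethrowing lets the exception escape run()
                    throw new RuntimeException(e);
                }
                // Otherwise handle it here (for example, log it) and keep polling
            }
        }, 0L, 10L, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(windowMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("caught inside run(): " + countRuns(true, 300L));
        System.out.println("escaped from run():  " + countRuns(false, 300L));
    }
}
```

With the JDK executor, the task that rethrows runs once and then stops silently, while the task that catches keeps running for the whole window.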
The Java™ Operator API samples include the following classes that support polling sources:
com.ibm.streams.operator.samples.patterns.PollingTupleProducer: An abstract pattern class that handles the mechanics of polling and allows the subclass to focus on the function to be run periodically. The subclass needs to implement a single method, fetchTuples(), that is called periodically and is expected to fetch data, populate tuples, and submit them to output ports. PollingTupleProducer supports an SPL parameter, period, that optionally specifies the polling period in seconds, as an SPL double.
com.ibm.streams.operator.samples.patterns.PollingSingleTupleProducer: An abstract pattern class that is a specialization of PollingTupleProducer to produce a single tuple periodically. This class requires only that the subclass provide a single fetchSingleTuple() method that populates a single OutputTuple for submission, which allows the subclass to focus on fetching data. The return value of fetchSingleTuple() indicates whether the tuple is submitted, which supports times when no data is available to send. In addition, the class supports an SPL parameter, iterations, that optionally specifies the number of times the polling task (fetchSingleTuple()) is run.
com.ibm.streams.operator.samples.sources.RandomBeacon: A concrete operator that extends PollingSingleTupleProducer to periodically produce a single tuple that is populated with random values, using Attribute.getType().randomValue() for each attribute in the single output port's schema.
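The division of labor in the single-tuple pattern can be sketched without the Streams classes. The following is a hypothetical analogue, not the source of PollingSingleTupleProducer: the base class owns the scheduling and the iteration count, and the subclass only fills a record and reports whether it should be submitted. The names SinglePollSketch, EveryOtherPoll, fetchSingle(), and poll() are invented for this illustration, and a StringBuilder stands in for OutputTuple.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical analogue of the pattern; not the Streams sample class.
abstract class SinglePollSketch {

    private final List<String> submitted = new ArrayList<>();

    // Subclass hook: fill the record and return true to submit it,
    // or return false when there is no data for this poll.
    protected abstract boolean fetchSingle(StringBuilder record) throws Exception;

    // Polls `iterations` times at the given period and returns what was submitted.
    final List<String> poll(int iterations, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(iterations);
        scheduler.scheduleAtFixedRate(() -> {
            if (remaining.getCount() == 0) {
                return; // requested iterations are already done
            }
            try {
                StringBuilder record = new StringBuilder();
                if (fetchSingle(record)) {
                    synchronized (submitted) {
                        submitted.add(record.toString());
                    }
                }
            } catch (Exception e) {
                // run() cannot declare checked exceptions, so handle them here
                System.err.println("poll failed: " + e);
            } finally {
                remaining.countDown();
            }
        }, 0L, periodMillis, TimeUnit.MILLISECONDS);
        try {
            remaining.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        synchronized (submitted) {
            return new ArrayList<>(submitted);
        }
    }

    public static void main(String[] args) {
        System.out.println(new EveryOtherPoll().poll(4, 10L));
    }
}

// Example subclass: has data to submit only on odd-numbered polls.
class EveryOtherPoll extends SinglePollSketch {
    private int calls;

    @Override
    protected boolean fetchSingle(StringBuilder record) {
        calls++;
        record.append("poll-").append(calls);
        return calls % 2 == 1;
    }
}
```

Returning a boolean from the hook keeps the "no data this time" case out of the scheduling code, which mirrors the role that the return value of fetchSingleTuple() plays in the description above.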
RandomBeacon itself can be extended to provide a simple periodic data source that produces mostly random values, with some values specific to a simulation. This mechanism is useful for simulating tuples of an appropriate size when only a subset of the data needs to be meaningful for the simulation.
public class RandomRegion extends RandomBeacon {
    @Override
    protected boolean fetchSingleTuple(final OutputTuple tuple) throws Exception {
        // All attributes will be initialized with random values
        // by the superclass method.
        super.fetchSingleTuple(tuple);

        // Override some with specific meaning
        // region is an instance field that could be set by a parameter
        tuple.setTimestamp("ts", Timestamp.currentTime());
        tuple.setString("region", region);

        // Always submit the tuple
        return true;
    }
}