Implementing a polling source operator using the Java Operator API

Some external sources require an operator to pull data from them periodically, that is, to poll them. In Java, you can create a periodic task by using the standard Java SE java.util.concurrent.ScheduledExecutorService interface.

The Streams Java Operator API provides a ScheduledExecutorService instance for each operator, accessible through the OperatorContext.getScheduledExecutorService() method. Use this instance to run background and periodic tasks for Operator implementations. The tasks are run by threads that have names specific to the operator and have the correct class-loading setup.

For example, the following call to ScheduledExecutorService.scheduleAtFixedRate() creates and runs a periodic task. The details of the task are omitted, but the result is that, once the Teracloud® Streams instance has called allPortsReady(), the run() method is called every 5 seconds:

// Requires: import java.util.concurrent.TimeUnit;
@Override
public void allPortsReady() {
  getOperatorContext().getScheduledExecutorService().scheduleAtFixedRate(
      new Runnable() {

        @Override
        public void run() {
          // Work of task (not shown)
          …
        }
      }, 0L, 5L, TimeUnit.SECONDS); // no initial delay, then every 5 seconds
}

If the task submits tuples or punctuation to any output port, it must not be started before allPortsReady() is called, and thus is typically started by that method. An operator can start any number of background tasks by using its ScheduledExecutorService, and can also create background threads by using OperatorContext.getThreadFactory().
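The following is a minimal, standalone sketch of running several background tasks on named threads. It uses only Java SE classes so that it can run outside a Streams instance. The namedFactory() helper, the class name, and the "MyOperator" prefix are hypothetical stand-ins: inside an operator you would obtain the executor and thread factory from OperatorContext rather than create them yourself.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BackgroundTasksSketch {

    // Hypothetical stand-in for the factory an operator would obtain from
    // OperatorContext.getThreadFactory(); it only gives the threads a name.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger sequence = new AtomicInteger();
        return task -> {
            Thread thread = new Thread(task, prefix + "-" + sequence.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    // Starts two periodic tasks, waits until each has run, then cancels them.
    // Returns true if both ran and every run happened on a named thread.
    static boolean runBackgroundTasks(String prefix) throws InterruptedException {
        ScheduledExecutorService scheduler =
                Executors.newScheduledThreadPool(2, namedFactory(prefix));
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch fastRuns = new CountDownLatch(3);
        CountDownLatch slowRuns = new CountDownLatch(1);
        try {
            // Fixed rate: start times are spaced by the period.
            ScheduledFuture<?> fast = scheduler.scheduleAtFixedRate(() -> {
                threadNames.add(Thread.currentThread().getName());
                fastRuns.countDown();
            }, 0L, 20L, TimeUnit.MILLISECONDS);

            // Fixed delay: the delay is measured from the end of the previous run.
            ScheduledFuture<?> slow = scheduler.scheduleWithFixedDelay(() -> {
                threadNames.add(Thread.currentThread().getName());
                slowRuns.countDown();
            }, 10L, 50L, TimeUnit.MILLISECONDS);

            boolean finished = fastRuns.await(5L, TimeUnit.SECONDS)
                    && slowRuns.await(5L, TimeUnit.SECONDS);

            // Cancelling the returned future stops further executions of that task.
            fast.cancel(false);
            slow.cancel(false);

            return finished
                    && threadNames.stream().allMatch(name -> name.startsWith(prefix + "-"));
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBackgroundTasks("MyOperator"));
    }
}
```

Keeping the ScheduledFuture that each scheduling call returns is what makes it possible to stop one task without affecting the others.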

The following example submits a tuple every 5 seconds. Each tuple contains the name of the host where the operator is running and the current timestamp:

@Override
public void allPortsReady() throws Exception {
  getOperatorContext().getScheduledExecutorService().scheduleAtFixedRate(
      new Runnable() {

        // Resolved once, when the task is created, not on every run
        final String host = InetAddress.getLocalHost().getCanonicalHostName();
        final StreamingOutput<OutputTuple> output = getOutput(0);

        @Override
        public void run() {
          OutputTuple tuple = output.newTuple();
          tuple.setString("host", host);
          tuple.setTimestamp("ts", Timestamp.currentTime());

          try {
            output.submit(tuple);
          } catch (Exception e) {
            // run() cannot declare checked exceptions, so log and continue
            trace.log(LogLevel.ERROR, "Exception thrown submitting tuple!", e);
          }
        }
      }, 0L, 5L, TimeUnit.SECONDS);
}

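Because the Runnable.run() method cannot declare checked exceptions, any implementation must catch the checked exceptions that result from access to external systems or from submission of tuples or punctuation to output ports. Neither catching the exception within the run() method nor rethrowing it wrapped in a runtime exception terminates the operator. Note, however, that under the standard ScheduledExecutorService contract an exception that escapes run() suppresses subsequent executions of that periodic task, so catch and log the exception inside run() if polling is to continue.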
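The difference between catching a checked exception inside run() and letting a wrapped exception escape can be sketched with plain Java SE classes. This is an illustration under stated assumptions, not Streams code: failingPoll() is a hypothetical source that always fails, and the executor is created locally instead of being obtained from OperatorContext.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PollingErrorSketch {

    // Hypothetical external source that always fails with a checked exception.
    static String failingPoll() throws IOException {
        throw new IOException("source unavailable");
    }

    // Schedules a periodic task and returns how many times it ran, counting
    // at most 'target' runs and waiting at most one second.
    static int runsObserved(boolean catchInsideRun, int target) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch reached = new CountDownLatch(target);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                if (runs.get() >= target) {
                    return; // stop counting once the target is reached
                }
                runs.incrementAndGet();
                reached.countDown();
                try {
                    failingPoll();
                } catch (IOException e) {
                    if (!catchInsideRun) {
                        // Escapes run(): the executor suppresses later executions.
                        throw new UncheckedIOException(e);
                    }
                    // Caught here: report it and let the next period try again.
                    System.err.println("Poll failed: " + e.getMessage());
                }
            }, 0L, 10L, TimeUnit.MILLISECONDS);
            reached.await(1L, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caught inside run(): " + runsObserved(true, 3));
        System.out.println("escaped from run():  " + runsObserved(false, 3));
    }
}
```

With the exception caught inside run(), the task keeps being scheduled until the target count is reached. When the wrapped exception escapes, the task runs once and is not scheduled again, although the process itself keeps running.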

Three sample classes show an approach to polling in more detail:
  • com.ibm.streams.operator.samples.patterns.PollingTupleProducer: An abstract pattern class that handles the mechanics of polling and allows the subclass to focus on the function to be run periodically. The subclass needs to implement a single method, fetchTuples(), that is called periodically and is expected to fetch data, populate tuples, and submit them to output ports. PollingTupleProducer supports an SPL parameter, period, that optionally specifies the polling period in seconds, as an SPL double.
  • com.ibm.streams.operator.samples.patterns.PollingSingleTupleProducer: An abstract pattern class that is a specialization of PollingTupleProducer to produce a single tuple periodically. This class requires only that the subclass provides a single fetchSingleTuple() method that populates a single OutputTuple for submission, thus allowing the subclass to focus on fetching data. The return value of fetchSingleTuple() indicates whether the tuple is to be submitted, which covers the case where no data is available to send. In addition, the class supports an SPL parameter, iterations, that optionally specifies the number of times the polling task (fetchSingleTuple()) is run.
  • com.ibm.streams.operator.samples.sources.RandomBeacon: A concrete operator that extends PollingSingleTupleProducer to periodically produce a single tuple that is populated with random values, using Attribute.getType().randomValue() for each attribute in the single output port's schema.
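The division of labor that these pattern classes describe (a base class that owns the polling mechanics, a subclass that only fetches data) can be sketched in plain Java SE as follows. This is a hypothetical analog written for illustration, not the source of the sample classes: SingleItemPoller, fetchSingle(), EverySecondFetch, and the use of a Map in place of OutputTuple are all assumptions, and the period and iteration count are passed as method arguments rather than as SPL parameters.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PollingPatternSketch {

    // Hypothetical base class: owns scheduling, iteration counting, and
    // exception handling, and delegates only the fetch to the subclass.
    abstract static class SingleItemPoller {

        // Populate the item; return true if it should be submitted.
        protected abstract boolean fetchSingle(Map<String, Object> item) throws Exception;

        List<Map<String, Object>> poll(long periodMillis, int iterations)
                throws InterruptedException {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            List<Map<String, Object>> submitted = new CopyOnWriteArrayList<>();
            AtomicInteger started = new AtomicInteger();
            CountDownLatch done = new CountDownLatch(iterations);
            try {
                scheduler.scheduleAtFixedRate(() -> {
                    if (started.incrementAndGet() > iterations) {
                        return; // iteration limit reached
                    }
                    Map<String, Object> item = new HashMap<>();
                    try {
                        if (fetchSingle(item)) {
                            submitted.add(item); // stands in for submitting a tuple
                        }
                    } catch (Exception e) {
                        // Keep polling even if one fetch fails.
                        System.err.println("Fetch failed: " + e);
                    } finally {
                        done.countDown();
                    }
                }, 0L, periodMillis, TimeUnit.MILLISECONDS);
                done.await(5L, TimeUnit.SECONDS);
            } finally {
                scheduler.shutdownNow();
            }
            return new ArrayList<>(submitted);
        }
    }

    // Example subclass: only every second fetch has data to submit.
    static class EverySecondFetch extends SingleItemPoller {
        private int calls;

        @Override
        protected boolean fetchSingle(Map<String, Object> item) {
            calls++;
            item.put("seq", calls);
            return calls % 2 == 0;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new EverySecondFetch().poll(10L, 4));
    }
}
```

The boolean return value plays the same role as described for fetchSingleTuple(): a poll that finds no data simply returns false and nothing is submitted for that period.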

RandomBeacon itself can be extended to provide a simple periodic data source that produces mainly random values, with a few values set specifically for a simulation. This mechanism is useful for simulating tuples of an appropriate size when only a subset of the data needs to be meaningful for the simulation. For example:

public class RandomRegion extends RandomBeacon {

  // Instance field that could be set by a parameter (setter not shown)
  private String region;

  @Override
  protected boolean fetchSingleTuple(final OutputTuple tuple) throws Exception {

    // All attributes will be initialized with random values
    // by the superclass method.
    super.fetchSingleTuple(tuple);

    // Override some with specific meaning
    tuple.setTimestamp("ts", Timestamp.currentTime());
    tuple.setString("region", region);

    // Always submit the tuple
    return true;
  }
}