Implementing an operator using the Java™ Operator API
An operator processes incoming tuples and submits outgoing tuples. The relationship between an input tuple and the submitted tuples varies according to the function of the operator.
For example, a filtering operator can submit a subset of the incoming tuples without modifying individual tuples. Another operator might submit to an output port a tuple that is derived from an input tuple but is not a direct copy, for example one that enriches the input tuple with additional data or keeps only a subset of its attributes.
Incoming tuples are passed to a Java™ operator through invocations of its process() method. Each invocation receives a StreamingInput reference that describes the port that the tuple arrived on, and a reference to the tuple itself as an immutable Tuple. Any Operator implementation must be thread-safe, because the process() method can be called concurrently by different threads for tuples that arrive on the same port or on different ports.
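Because process() can run on several threads at once, any state that an operator keeps across tuples must be updated in a thread-safe way. The following is a minimal plain-Java sketch, not code that uses the Streams API: TupleCounter and its process(int port, Map<String, ?> tuple) method are hypothetical stand-ins for an operator and its process() method, and runConcurrently() is a hypothetical helper that imitates the runtime by calling process() from several threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an operator whose process() method may be
// invoked concurrently for tuples on the same or different ports.
class TupleCounter {
    // One counter per input port. ConcurrentHashMap.computeIfAbsent and
    // AtomicLong.incrementAndGet are both safe under concurrent calls.
    private final Map<Integer, AtomicLong> countsByPort = new ConcurrentHashMap<>();

    void process(int port, Map<String, ?> tuple) {
        countsByPort.computeIfAbsent(port, p -> new AtomicLong()).incrementAndGet();
    }

    long count(int port) {
        AtomicLong c = countsByPort.get(port);
        return c == null ? 0 : c.get();
    }

    // Imitates the runtime: each of `threads` threads delivers
    // `tuplesPerThread` tuples, alternating between ports 0 and 1.
    static TupleCounter runConcurrently(int threads, int tuplesPerThread) {
        TupleCounter op = new TupleCounter();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < tuplesPerThread; i++) {
                    op.process(i % 2, Map.of("reading", 0.5));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return op;
    }
}
```

With four threads delivering 1,000 tuples each, count(0) and count(1) each end at 2,000. Replacing the atomic counters with a plain HashMap<Integer, Long> and unsynchronized increments would allow lost updates under the same load.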
Tuples are submitted to an output port by using the submit() method of the StreamingOutput reference for that port. A StreamingOutput reference is obtained through the getStreamingOutputs() method of OperatorContext. The AbstractOperator class also provides direct access to an output by its port number through getOutput(), as used in the examples that follow.
StreamingOutput.submit() is overloaded to accept either a Tuple or an OutputTuple reference, so an incoming tuple can be submitted directly to an output port when the input and output ports have the same schema.
The following process() method implementation filters incoming tuples and submits each tuple that passes the filter to a single output port:
@Override
public void process(StreamingInput<Tuple> port, Tuple tuple) throws Exception {
    final StreamingOutput<OutputTuple> output = getOutput(0);
    // Submit any tuple with a reading greater than 0.8
    if (tuple.getFloat("reading") > 0.8) {
        output.submit(tuple);
    }
}
If the input and output ports have different schemas, your code must create a tuple instance that matches the output port's schema. Typically, a mutable OutputTuple instance is created by calling newTuple() on the StreamingOutput, and then its attributes are set as required.
The OutputTuple.assign() method sets attributes in the output tuple from the passed-in tuple for each attribute that matches, that is, each attribute that has the same name and SPL type in both tuples. In the following example, assign() copies the matching attributes from the input tuple to the output tuple. Any attributes that are not explicitly set are set to their default values.
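@Override
public void process(StreamingInput<Tuple> port, Tuple tuple) throws Exception {
    final StreamingOutput<OutputTuple> output = getOutput(0);
    // Submit any tuple with a reading greater than 0.8
    if (tuple.getFloat("reading") > 0.8) {
        // Create a new output tuple and copy the matching attributes into it
        OutputTuple outputTuple = output.newTuple();
        outputTuple.assign(tuple);
        output.submit(outputTuple);
    }
}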
This example therefore behaves like the previous one, but allows the output port's schema to differ from the input port's schema.
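To make the matching rule for assign() concrete, the following plain-Java sketch models it with maps. It is an analogy under stated assumptions, not the Streams implementation: SketchSchema is a hypothetical class, Java classes stand in for SPL types, and each attribute's default value is supplied explicitly when the schema is built.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a port schema: attribute name -> Java type,
// plus the default value an attribute has when it is never set.
class SketchSchema {
    final Map<String, Class<?>> types = new LinkedHashMap<>();
    final Map<String, Object> defaults = new LinkedHashMap<>();

    SketchSchema attr(String name, Class<?> type, Object defaultValue) {
        types.put(name, type);
        defaults.put(name, defaultValue);
        return this;
    }

    // Analogue of newTuple(): every attribute starts at its default value.
    Map<String, Object> newTuple() {
        return new LinkedHashMap<>(defaults);
    }

    // Analogue of assign(): copy an input attribute only when the output
    // schema has an attribute with the same name and the same type.
    static void assign(Map<String, Object> out, SketchSchema outSchema,
                       Map<String, Object> input, SketchSchema inSchema) {
        for (Map.Entry<String, Class<?>> e : inSchema.types.entrySet()) {
            String name = e.getKey();
            if (e.getValue().equals(outSchema.types.get(name))) {
                out.put(name, input.get(name));
            }
        }
    }
}
```

For example, if the input schema declares count as Integer and the output schema declares it as Long, assign() in this sketch leaves count at its default, while id and reading, which agree in name and type, are copied.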
OutputTuple has a number of setter methods that allow individual attributes to be set, so the outgoing tuple can be enriched with other data or with data that is derived from the input tuple's data. Here the example is expanded to add to the outgoing tuple a timestamp and a city name that is based on the incoming tuple's postal code. The output port's schema is therefore expected to have a number of attributes in common with the input port's schema, plus two additional attributes, ts and city.
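@Override
public void process(StreamingInput<Tuple> port, Tuple tuple) throws Exception {
    final StreamingOutput<OutputTuple> output = getOutput(0);
    // Submit any tuple with a reading greater than 0.8
    if (tuple.getFloat("reading") > 0.8) {
        OutputTuple outputTuple = output.newTuple();
        // Copy the attributes shared with the input schema
        outputTuple.assign(tuple);
        // Set the two additional output attributes
        outputTuple.setTimestamp("ts", Timestamp.currentTime());
        // Location.getCity(): city lookup by postal code (not shown)
        outputTuple.setString("city", Location.getCity(tuple.getString("zipcode")));
        output.submit(outputTuple);
    }
}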
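The enrichment step can also be modeled in plain Java to show the data flow in isolation. In this sketch, which does not use the Streams API, tuples are maps, the hypothetical Enricher class holds an in-memory postal-code table in place of Location.getCity(), and copying the whole input map stands in for assign() on the assumption that every input attribute also exists in the output schema. The current time is passed in as a parameter so that the result is reproducible, and the fallback city "unknown" is an arbitrary choice for this sketch.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical plain-Java model of the enrichment example: tuples are maps,
// and an in-memory table stands in for Location.getCity().
class Enricher {
    private final Map<String, String> cityByZip;

    Enricher(Map<String, String> cityByZip) {
        this.cityByZip = cityByZip;
    }

    // Returns the enriched output tuple, or Optional.empty() when the
    // tuple does not pass the reading > 0.8 filter.
    Optional<Map<String, Object>> process(Map<String, Object> input, Instant now) {
        float reading = (Float) input.get("reading");
        if (!(reading > 0.8)) {
            return Optional.empty();
        }
        // Copying every input attribute plays the role of assign(), assuming
        // the output schema contains all of the input attributes.
        Map<String, Object> output = new LinkedHashMap<>(input);
        // The two extra output attributes: a timestamp and a derived city.
        output.put("ts", now);
        output.put("city", cityByZip.getOrDefault((String) input.get("zipcode"), "unknown"));
        return Optional.of(output);
    }
}
```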
The StreamSchema interface has a number of methods that create immutable Tuple objects from other objects, including Maps, Lists, and other Tuple objects. These methods can also be used to create tuples to submit to output ports.
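Creating a tuple from a Map can match values to attributes by name, while creating one from a List has to rely on attribute order. The following plain-Java analogy illustrates that difference; it does not use or reproduce the StreamSchema methods, and the TupleFactory class, its method names, and its choice to reject missing map entries or a list of the wrong length are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical analogy of building immutable tuples from other objects.
// Attribute order matters when values come from a List (positional).
class TupleFactory {
    private final List<String> attributeNames;

    TupleFactory(List<String> attributeNames) {
        this.attributeNames = Collections.unmodifiableList(new ArrayList<>(attributeNames));
    }

    // Name-based: take each schema attribute from the map by name.
    Map<String, Object> fromMap(Map<String, ?> values) {
        Map<String, Object> t = new LinkedHashMap<>();
        for (String name : attributeNames) {
            if (!values.containsKey(name)) {
                throw new IllegalArgumentException("missing attribute: " + name);
            }
            t.put(name, values.get(name));
        }
        return Collections.unmodifiableMap(t); // immutable, like Tuple
    }

    // Position-based: the i-th list element becomes the i-th attribute.
    Map<String, Object> fromList(List<?> values) {
        if (values.size() != attributeNames.size()) {
            throw new IllegalArgumentException("expected " + attributeNames.size() + " values");
        }
        Map<String, Object> t = new LinkedHashMap<>();
        for (int i = 0; i < values.size(); i++) {
            t.put(attributeNames.get(i), values.get(i));
        }
        return Collections.unmodifiableMap(t);
    }
}
```

For a schema with attributes id and reading, fromMap(Map.of("id", "s1", "reading", 0.9)) and fromList(List.of("s1", 0.9)) produce equal tuples, and either tuple throws UnsupportedOperationException if it is modified.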
com.ibm.streams.operator.samples.operators.DecimalScaleSetter: Operator that submits a tuple for every input tuple, modifying a single decimal attribute by setting its scale (see java.math.BigDecimal.scale()).
com.ibm.streams.operator.samples.patterns.TupleInTupleOut: An abstract pattern class that exposes filtering and transformation to a subclass but hides the mechanics of tuple reception and submission. For each incoming tuple, the filter() method of the subclass is called; if it returns false, no further processing is done for that tuple. If it returns true, transform() is called. The transform() method allows the subclass to set the attributes of the outgoing tuple and to return true to deliver the tuple to the output port.
com.ibm.streams.operator.samples.operators.Regex: Operator that extends the TupleInTupleOut class to filter tuples with regular expressions from the standard java.util.regex package.
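The TupleInTupleOut pattern can be sketched in plain Java to show how the filter() and transform() hooks divide the work. InOutPattern, RegexMatch, and ScaleSetter below are hypothetical classes, not the sample sources: tuples are maps, the outgoing tuple starts as a copy of the incoming one, RegexMatch uses Matcher.matches() (a whole-value match), and ScaleSetter rounds with RoundingMode.HALF_UP. The actual samples may make different choices.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical analogue of the TupleInTupleOut pattern: the base class owns
// the per-tuple mechanics; subclasses supply only filter() and transform().
abstract class InOutPattern {
    protected abstract boolean filter(Map<String, Object> in);

    // Sets attributes on `out`; returning false drops the tuple.
    protected abstract boolean transform(Map<String, Object> in, Map<String, Object> out);

    // Returns the tuple that would be submitted, or empty if none is.
    final Optional<Map<String, Object>> process(Map<String, Object> in) {
        if (!filter(in)) {
            return Optional.empty();
        }
        Map<String, Object> out = new LinkedHashMap<>(in); // start from a copy
        return transform(in, out) ? Optional.of(out) : Optional.empty();
    }
}

// Regex-style subclass: keeps tuples whose attribute fully matches a pattern.
class RegexMatch extends InOutPattern {
    private final String attribute;
    private final Pattern pattern;

    RegexMatch(String attribute, String regex) {
        this.attribute = attribute;
        this.pattern = Pattern.compile(regex);
    }

    @Override
    protected boolean filter(Map<String, Object> in) {
        Object v = in.get(attribute);
        return v instanceof String && pattern.matcher((String) v).matches();
    }

    @Override
    protected boolean transform(Map<String, Object> in, Map<String, Object> out) {
        return true; // pass the copy through unchanged
    }
}

// DecimalScaleSetter-style subclass: no filtering, rescales one decimal.
class ScaleSetter extends InOutPattern {
    private final String attribute;
    private final int scale;

    ScaleSetter(String attribute, int scale) {
        this.attribute = attribute;
        this.scale = scale;
    }

    @Override
    protected boolean filter(Map<String, Object> in) {
        return true; // a tuple is submitted for every input tuple
    }

    @Override
    protected boolean transform(Map<String, Object> in, Map<String, Object> out) {
        BigDecimal value = (BigDecimal) in.get(attribute);
        out.put(attribute, value.setScale(scale, RoundingMode.HALF_UP));
        return true;
    }
}
```

For example, ScaleSetter("price", 2) turns a price of 3.14159 into 3.14 while leaving the incoming map unchanged, and RegexMatch("city", "New.*") passes a tuple whose city is "New York" but drops one whose city is "Boston".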