Implementing a sink operator using the Java™ Operator API
A sink operator transmits information or events to an external system, such as a dashboard, web server, mail server, or database. When such a system provides a Java™ API, a sink operator for that system can be implemented in Java™.
About this task
For example, a sink operator for a database might populate a PreparedStatement based on the tuple's attribute values, and then run the PreparedStatement to update the database.
A simple model is that the arrival of each incoming tuple results in data being transmitted to the external system in the Operator.process() method. This model is often not sufficient for performance reasons. Instead, batching is used so that a batch of information that represents multiple tuples is transmitted to the external system in a single operation. For more information about batching, see Batching in a sink operator.
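The difference between the per-tuple model and batching can be illustrated without the Streams API. The following is a minimal sketch in plain Java. BatchBuffer and its transmitter callback are hypothetical names that stand in for an operator and the external system's API; they are not part of the Java Operator API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects items and hands them to a transmitter in batches. */
final class BatchBuffer<T> {
    private final int batchSize;
    // Stands in for the call into the external system (assumption for this sketch).
    private final Consumer<List<T>> transmitter;
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> transmitter) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.transmitter = transmitter;
    }

    /** Called once per incoming item; transmits only when the batch is full. */
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Transmits whatever is pending, for example when no more data will arrive. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        transmitter.accept(new ArrayList<>(pending));
        pending.clear();
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        BatchBuffer<String> buffer = new BatchBuffer<>(3, sent::add);
        for (int i = 1; i <= 7; i++) {
            buffer.add("tuple" + i);
        }
        buffer.flush(); // send the partly filled last batch
        System.out.println(sent.size() + " transmissions: " + sent);
    }
}
```

With a batch size of 1, the buffer behaves like the simple per-tuple model. The explicit flush() matters when no more data will arrive, because a partly filled batch would otherwise never be transmitted.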
The general approach for writing a sink operator in Java™ is described here. Some steps include example information for a sink operator that implements HTTP POST, where incoming tuples result in HTTP POST operations, such as submissions to a web order form. The sample class com.ibm.streams.operator.samples.sinks.HttpPOST is provided.
Procedure
- Design the model for transmission of information from tuples
to the external system.
- Specify the parameters that are required for the operator, such as information to determine the location of the external system and any batching-related configuration. For HTTP POST, a url parameter is required to specify the URL for the POST, and the TupleConsumer sample class provides parameters to configure batching.
- Design error handling for errors that occur when communicating with the external system.
  - Handle errors by retrying the connection to the external system, logging the data that cannot be transmitted, or passing the exception to the Teracloud® Streams instance, which causes the PE to terminate.
  - Handle connection errors, such as failure to connect to a remote database.
  - Handle data errors, such as constraint violations for an external database.
- Create an initial implementation of com.ibm.streams.operator.Operator, either directly or by extending com.ibm.streams.operator.AbstractOperator, TupleConsumer, or any other existing Operator implementation that suits the goal.
- Implement the Operator.initialize() method, ensuring that super.initialize() is called as required. Typically, the initialize() method performs the following operations:
  - Fetches and validates the parameters that are passed in from SPL.
  - Sets up connections to the external system.
- Implement Operator.process() to process incoming tuples. Typically, the Operator.process() method performs the following operations:
  - Implements batching as required.
  - Adds the tuple to the batch or processes it immediately.
  - Converts information from tuples into the form that is expected by the external system's Java™ API.
  - Transmits the information to the external system.
- Implement the logic to process incoming punctuation. If batching is implemented, then the processing of the final punctuation (Punctuation.FINAL_MARKER) must ensure that the batch processing is completed. If the final punctuation is ignored, then the operator might be shut down while a batch is outstanding.
- Implement Operator.shutdown() to handle any cleanup, such as closing connections to external systems.
If any processing requires asynchronous processing, then the ThreadFactory or ScheduledExecutorService that is provided by the OperatorContext must be used. Using these standard Java™ interfaces as provided by the OperatorContext allows the Teracloud® Streams instance to track which operators have outstanding work, and thus allows the asynchronous work to be completed when a final punctuation is received.
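The error-handling options in the procedure (retry, then log the data that cannot be transmitted) can be sketched in plain Java as follows. This is only an illustration under stated assumptions: RetryingSender and its Transmitter interface are hypothetical names, the external system is reduced to a single send call, and "logging" is reduced to keeping the undelivered data in a list.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: retries a transmission and records data that could not be sent. */
final class RetryingSender {
    /** Stand-in for a call into the external system's Java API (assumption). */
    interface Transmitter {
        void send(String data) throws IOException;
    }

    private final Transmitter transmitter;
    private final int maxAttempts;
    private final List<String> undelivered = new ArrayList<>();

    RetryingSender(Transmitter transmitter, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.transmitter = transmitter;
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if the data was delivered; otherwise records it and returns false. */
    boolean send(String data) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                transmitter.send(data);
                return true;
            } catch (IOException e) {
                System.err.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        undelivered.add(data); // keep the data so that it can be logged or replayed
        return false;
    }

    List<String> undelivered() {
        return undelivered;
    }

    public static void main(String[] args) {
        RetryingSender sender = new RetryingSender(data -> {
            throw new IOException("external system unreachable");
        }, 2);
        System.out.println("delivered: " + sender.send("order-1"));
        System.out.println("undelivered: " + sender.undelivered());
    }
}
```

The third option from the procedure, passing the exception on, corresponds to rethrowing the IOException instead of recording the data. Which option fits depends on whether losing or delaying data is acceptable for the external system.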
Example
com.ibm.streams.operator.samples.sinks.HttpPOST shows the logic specific to creating an HTTP POST request from tuples. com.ibm.streams.operator.samples.patterns.TupleConsumer is an abstract class that demonstrates flexible batching of tuples and is intended for use as the base of a sink operator.
HttpPOST extends TupleConsumer. Starting with HttpPOST, there are two key methods: initialize() and processBatch().
The initialize() method uses the SPL parameter url to determine the URL for the POST request. No connection attempt is made to validate the URL, because all that is known is that the URL is suitable for a POST request, and a POST request with no data might have unknown effects on the server.
    /**
     * URL of the HTTP server we will be posting data to.
     */
    private URL url;

    /**
     * Initialize by setting the URL and the batch size.
     */
    @Override
    public void initialize(OperatorContext context) throws Exception {
        super.initialize(context);
        url = getURL();
        setBatchSize(batchSize());
    }

    /**
     * Get the URL for the POST requests from the required parameter url.
     * Sub-classes may override this to set the URL another way.
     *
     * @return URL for POST requests
     * @throws MalformedURLException
     */
    protected URL getURL() throws MalformedURLException {
        String urlPath = getOperatorContext().getParameterValues("url").get(0);
        return new URL(urlPath);
    }

    /**
     * Get the batch size to use from the parameter batchSize using 1 if that is
     * not set. Sub-classes may override this to set the batchSize another way.
     *
     * @return batchSize to use
     */
    protected int batchSize() {
        List<String> bp = getOperatorContext().getParameterValues("batchSize");
        if (bp.isEmpty())
            return 1;
        // Parse the parameter text itself. Integer.getInteger() would instead
        // look up a system property with that name.
        return Integer.parseInt(bp.get(0));
    }
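When the batchSize parameter text is converted to an int, note that Integer.parseInt() parses the string itself, whereas the similarly named Integer.getInteger() looks up a JVM system property with that name and returns null when no such property exists. A minimal standalone sketch of the difference follows; ParseBatchSize is a hypothetical helper that mirrors the defaulting logic of batchSize() without the Streams API.

```java
import java.util.List;

/** Hypothetical helper mirroring batchSize(): defaults to 1 when no value is supplied. */
final class ParseBatchSize {
    static int parse(List<String> values) {
        if (values.isEmpty()) {
            return 1;
        }
        // Parses the text of the first value; throws NumberFormatException if it is not a number.
        return Integer.parseInt(values.get(0).trim());
    }

    public static void main(String[] args) {
        System.out.println(parse(List.of("25")));
        // Looks up a system property named "25"; the result is null unless such a property is defined.
        System.out.println(Integer.getInteger("25"));
    }
}
```

Returning the null result of Integer.getInteger() from a method that is declared to return int would fail with a NullPointerException during unboxing.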
The processBatch() method is an abstract method that the superclass TupleConsumer requires. Its purpose is to take a batch of Tuple objects and transmit them to the external system. For HttpPOST, each attribute in the tuple is converted into a name-value pair with URL encoding. A single HTTP POST request might contain multiple tuples, in which case each attribute name appears multiple times, once per tuple.
    @Override
    protected final boolean processBatch(Queue<BatchedTuple> batch)
            throws Exception {
        // Build the form-encoded body: one name=value pair per attribute of every tuple
        StringBuilder data = new StringBuilder(1024);
        for (BatchedTuple item : batch) {
            StreamSchema schema = item.getStream().getStreamSchema();
            for (Attribute attribute : schema) {
                if (data.length() != 0)
                    data.append('&');
                data.append(URLEncoder.encode(attribute.getName(), "UTF-8"));
                data.append('=');
                data.append(URLEncoder.encode(item.getTuple().getString(
                        attribute.getName()), "UTF-8"));
            }
        }
        // Send data
        URLConnection conn = url.openConnection();
        conn.setDoOutput(true);
        OutputStreamWriter wr = new OutputStreamWriter(conn.getOutputStream(),
                "UTF-8");
        wr.write(data.toString());
        wr.flush();
        // Get the response
        BufferedReader rd = new BufferedReader(new InputStreamReader(conn
                .getInputStream()));
        String line;
        while ((line = rd.readLine()) != null) {
            trace.log(TraceLevel.DEBUG, line);
        }
        wr.close();
        rd.close();
        return false;
    }
}
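The encoding step of processBatch() can be tried outside of Teracloud® Streams. The following sketch uses only the JDK and assumes that each tuple is represented as an ordered map of attribute names to string values. FormEncoder and that map representation are hypothetical and are not part of the sample classes.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: builds a form-encoded POST body from several records. */
final class FormEncoder {
    /** Encodes every name-value pair of every record and joins them all with '&'. */
    static String encode(List<Map<String, String>> records) {
        StringBuilder data = new StringBuilder();
        for (Map<String, String> record : records) {
            for (Map.Entry<String, String> field : record.entrySet()) {
                if (data.length() != 0) {
                    data.append('&');
                }
                data.append(URLEncoder.encode(field.getKey(), StandardCharsets.UTF_8));
                data.append('=');
                data.append(URLEncoder.encode(field.getValue(), StandardCharsets.UTF_8));
            }
        }
        return data.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the attribute order, as a stream schema would.
        Map<String, String> first = new LinkedHashMap<>();
        first.put("item", "blue pen");
        first.put("qty", "2");
        Map<String, String> second = new LinkedHashMap<>();
        second.put("item", "a&b");
        second.put("qty", "1");
        System.out.println(encode(List.of(first, second)));
    }
}
```

Because the names repeat once per record, the receiving server has to be prepared to read multi-valued form fields when the batch size is greater than 1.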
TupleConsumer implements generic batching of tuples as follows:
- Batch size that is defined by a number of tuples, including support for a single tuple in the batch.
- Asynchronous processing of the batch, so that processing of the batch does not affect the submission of tuples to the operator.
- Optional timeout, so that if a set time has elapsed since the last tuple arrived, a non-empty batch is processed regardless of how full it is.
- Option to preserve tuple arrival order in batch processing. If order preservation is not required, then multiple threads might process batches asynchronously.
- The ability for the subclass to process partial batches, allowing unprocessed tuples to be processed in a future batch.
- Final punctuation handling that ensures that all batches are processed before processPunctuation() returns.
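Some of the behaviors in this list (asynchronous processing, order preservation, and draining on final punctuation) can be sketched with plain JDK classes. The sketch below is not the TupleConsumer implementation. AsyncBatcher is a hypothetical class, and it creates its own executor so that it runs standalone, whereas a real operator would obtain its threads from the OperatorContext as described in the procedure.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: ordered, asynchronous batch processing with a final drain. */
final class AsyncBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> processBatch;
    // A single worker thread processes batches in arrival order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private List<T> current = new ArrayList<>();

    AsyncBatcher(int batchSize, Consumer<List<T>> processBatch) {
        this.batchSize = batchSize;
        this.processBatch = processBatch;
    }

    /** Adds an item; a full batch is handed to the worker so that the caller is not blocked. */
    synchronized void submit(T item) {
        current.add(item);
        if (current.size() >= batchSize) {
            dispatch();
        }
    }

    private void dispatch() {
        List<T> batch = current;
        current = new ArrayList<>();
        worker.execute(() -> processBatch.accept(batch));
    }

    /**
     * Models final punctuation: dispatches the partial batch, then waits for all
     * outstanding batches. Returns false on timeout or interruption.
     * submit() must not be called after this method.
     */
    synchronized boolean finish(long timeout, TimeUnit unit) {
        if (!current.isEmpty()) {
            dispatch();
        }
        worker.shutdown();
        try {
            return worker.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> seen = Collections.synchronizedList(new ArrayList<>());
        AsyncBatcher<Integer> batcher = new AsyncBatcher<>(2, seen::add);
        for (int i = 1; i <= 5; i++) {
            batcher.submit(i);
        }
        boolean drained = batcher.finish(5, TimeUnit.SECONDS);
        System.out.println("drained=" + drained + " batches=" + seen);
    }
}
```

Replacing the single worker with a pool of threads would give up order preservation in exchange for parallel batch processing, which is the trade-off that the list describes.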
What to do next
- Study the code that implements HttpPOST (com.ibm.streams.operator.samples.sinks.HttpPOST and com.ibm.streams.operator.samples.patterns.TupleConsumer).
- Implement a sink operator in Java™.
- Study the code for other sample sink Java™ operators.