Dynamic stream importing and exporting
The Export operator exports a stream using a set of export properties, which can be modified at run time. Similarly, the Import operator is used to import a stream using an import subscription that is defined over the export properties. Import subscriptions can also be changed at run time.
The export properties of a stream can be updated at run time only
by the operator that produces the stream. Such an operator must first
obtain a reference to the output port that produces the stream by calling
the getOutputPortAt member function
of the Operator class. Given an output port object
of type OperatorOutputPort, stream properties can be added
and removed with the provided APIs.
These APIs take objects of type StreamPropertyCollection and StreamProperty to
update the properties of the exported stream. The following
example adds an export property at run time. Property values
must be defined with SPL C++ types (rstring in the
example).
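// Get the first output port of this operator.
OperatorOutputPort & oport = getOutputPortAt(0);
// This example expects the port to be exported by properties.
assert(oport.isExported() &&
       oport.getExportType() == OperatorOutputPort::ByProperties);
// Add the export property color="orange" to the stream.
std::string propName = "color";
rstring propValue = "orange";
StreamProperty colorProp(propName, propValue);
oport.addStreamProperty(colorProp);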
The import subscription for an imported stream can be updated at run time
only by an operator that subscribes to the stream. Unlike an exported
stream, which is necessarily produced by a specific output port, an
imported stream can be consumed by multiple operators. Any change
made to the subscription at run time applies only to the operator
that performs the update. Such an operator must first obtain a reference
to the input port that subscribes to the stream by calling
the getInputPortAt member function of the Operator class.
Given an input port object of type OperatorInputPort,
the subscription expression can be updated
with the provided APIs. These APIs take objects of type SubscriptionExpression,
which holds a modifiable expression. The following example
creates the subscription expression ids[0]<5 &&
color=="orange" at run time and sets it on the input port.
// Get the first input port of this operator.
OperatorInputPort & iport = getInputPortAt(0);
// ids[0] < 5 (the trailing 0 is the subscript applied to ids)
SubscriptionExpressionPtr idPred = SubscriptionExpression::createPredicate("ids",
    SubscriptionExpression::LessThan, SubscriptionExpression::Literal(5), 0);
// color == "orange"
SubscriptionExpressionPtr colorPred = SubscriptionExpression::createPredicate("color",
    SubscriptionExpression::Equal, SubscriptionExpression::Literal("orange"));
// Combine the two predicates with && and set the result on the input port.
SubscriptionExpressionPtr subscription =
    SubscriptionExpression::createAndClause(idPred, colorPred);
iport.setSubscriptionExpression(*subscription);
In the example, SubscriptionExpressionPtr is a
shared pointer, and thus the memory does not need to be managed explicitly. SubscriptionExpressionPtr objects
can also be created from string representations of the expression.
The following is an example:
SubscriptionExpressionPtr exp =
    SubscriptionExpression::fromString("ids[0]<5 && color==\"orange\"");
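To make the relationship between a subscription and the export properties concrete, the following is a minimal, self-contained sketch of what the expression ids[0]<5 && color=="orange" asks of an exported stream. It does not use the Streams runtime: ExportedProperties, matchesSubscription, and demo are hypothetical names introduced only for illustration, and treating a missing property or an empty list as a non-match is an assumption of this sketch, not documented behavior.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a set of export properties.
// This is NOT a Streams runtime type; it only models the two kinds
// of property values used by the example expression.
struct ExportedProperties {
    std::map<std::string, std::string> strings;                  // e.g. color -> "orange"
    std::map<std::string, std::vector<std::int64_t> > intLists;  // e.g. ids -> [3, 9]
};

// Hand-written evaluation of: ids[0] < 5 && color == "orange".
// Assumption: a missing property or an empty ids list means "no match".
bool matchesSubscription(const ExportedProperties & props) {
    auto ids = props.intLists.find("ids");
    auto color = props.strings.find("color");
    if (ids == props.intLists.end() || ids->second.empty()) {
        return false;
    }
    if (color == props.strings.end()) {
        return false;
    }
    return ids->second[0] < 5 && color->second == "orange";
}

// Usage: both predicates must hold for the stream to match.
void demo() {
    ExportedProperties props;
    props.strings["color"] = "orange";
    props.intLists["ids"] = {3, 9};
    assert(matchesSubscription(props));   // 3 < 5 and color is "orange"

    props.intLists["ids"][0] = 7;
    assert(!matchesSubscription(props));  // 7 < 5 is false
}
```

In this model, changing a property on the exporting side (as addStreamProperty does in the earlier example) or replacing the expression on the importing side can each turn a match into a non-match, or the reverse, while the application is running.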