Dynamic stream importing and exporting
The Export operator exports a stream using a set of export properties, which can be modified at run time. Similarly, the Import operator is used to import a stream using an import subscription that is defined over the export properties. Import subscriptions can also be changed at run time.
The export properties of a stream can be updated at run time only by the operator that produces the stream. Such an operator must first get a handle on the output port that produces the stream, which can be achieved by the getOutputPortAt member function of the Operator class. Given an output port object of type OperatorOutputPort, operations such as adding and removing stream properties can be done with the provided APIs. These APIs take objects of type StreamPropertyCollection and StreamProperty to update the properties of the exported stream. The following is an example that adds an export property at run time. The property values must be defined with SPL C++ types (rstring in the example).
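// Get a handle on the output port that produces the exported stream.
OperatorOutputPort & oport = getOutputPortAt(0);
// The stream is expected to be exported by properties.
assert(oport.isExported() &&
       oport.getExportType()==OperatorOutputPort::ByProperties);
// Property values use SPL C++ types, here rstring.
std::string propName = "color";
rstring propValue = "orange";
StreamProperty colorProp(propName, propValue);
oport.addStreamProperty(colorProp);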
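The assert in the example is removed when the code is compiled with NDEBUG defined, so the check might not run in optimized builds. One possible alternative is a run-time guard. The following is a minimal sketch, written as a template so that it compiles without the SPL headers. tryAddStreamProperty is a hypothetical helper, and FakeOutputPort and FakeProperty are hypothetical test doubles. Only the member names isExported, getExportType, ByProperties, and addStreamProperty are taken from the example. Whether the helper works unchanged with OperatorOutputPort and StreamProperty is an assumption to verify against the SPL headers.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: adds a property only when the port is exported by
// properties. Port is any type that offers the member names used in the
// example (isExported, getExportType, ByProperties, addStreamProperty).
template <typename Port, typename Property>
bool tryAddStreamProperty(Port& port, const Property& prop) {
    if (!port.isExported() || port.getExportType() != Port::ByProperties) {
        return false;  // not exported by properties: leave the port untouched
    }
    port.addStreamProperty(prop);
    return true;
}

// Hypothetical test doubles, not SPL types. They exist only so that the
// sketch can be compiled and run without the SPL runtime.
struct FakeProperty {
    std::string name;
    std::string value;
};

struct FakeOutputPort {
    enum ExportType { ByName, ByProperties, NotExported };
    ExportType type;
    std::vector<FakeProperty> added;

    bool isExported() const { return type != NotExported; }
    ExportType getExportType() const { return type; }
    void addStreamProperty(const FakeProperty& p) { added.push_back(p); }
};

// Usage: the guard skips a port that is not exported and updates one that
// is exported by properties.
inline void demoGuardedAdd() {
    FakeOutputPort port;
    port.type = FakeOutputPort::NotExported;
    FakeProperty color = {"color", "orange"};
    assert(!tryAddStreamProperty(port, color));  // skipped
    port.type = FakeOutputPort::ByProperties;
    assert(tryAddStreamProperty(port, color));   // added
    assert(port.added.size() == 1);
}
```

Returning a bool leaves it to the caller to decide how to react (log, retry, or ignore) when the port cannot be updated.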
The import subscription for an imported stream can be updated at run time only by an operator that subscribes to the stream. Unlike an exported stream, which is necessarily produced by a specific output port, an imported stream can be consumed by multiple operators. Any changes that are performed on the subscription at run time apply only to the operator that performs the update. Such an operator needs to first get a handle on the input port that subscribes to the stream, which can be achieved by the getInputPortAt function of the Operator class. Given an input port object of type OperatorInputPort, operations such as updating or modifying the subscription expressions can be done with the provided APIs. These APIs take objects of type SubscriptionExpression, which holds a modifiable expression. The following is an example that creates the subscription expression ids[0]<5 && color=="orange" at run time.
// Get a handle on the input port that subscribes to the stream.
OperatorInputPort & iport = getInputPortAt(0);
// ids[0]<5 (the trailing 0 is the subscript applied to ids)
SubscriptionExpressionPtr idPred = SubscriptionExpression::createPredicate("ids",
    SubscriptionExpression::LessThan, SubscriptionExpression::Literal(5), 0);
// color=="orange"
SubscriptionExpressionPtr colorPred = SubscriptionExpression::createPredicate("color",
    SubscriptionExpression::Equal, SubscriptionExpression::Literal("orange"));
// Combine both predicates with && and set the result on the input port.
SubscriptionExpressionPtr subscription =
    SubscriptionExpression::createAndClause(idPred, colorPred);
iport.setSubscriptionExpression(*subscription);
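To make the relationship between this subscription and the export properties concrete, the following is a minimal sketch in plain C++ that does not use the SPL runtime. ExportProperties and matchesIdsAndColor are hypothetical stand-ins, not SPL types or functions, and the rule that a missing property or an empty list yields no match is an assumption of the sketch rather than a statement about the runtime.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for the export properties of a stream; not an SPL type.
struct ExportProperties {
    std::map<std::string, std::string> strings;               // for example, color
    std::map<std::string, std::vector<long long> > intLists;  // for example, ids
};

// Evaluates ids[0]<5 && color=="orange" against a set of properties.
// Assumption of this sketch: a missing property or an empty list is no match.
inline bool matchesIdsAndColor(const ExportProperties& props) {
    auto ids = props.intLists.find("ids");
    if (ids == props.intLists.end() || ids->second.empty()) return false;
    auto color = props.strings.find("color");
    if (color == props.strings.end()) return false;
    return ids->second[0] < 5 && color->second == "orange";
}

// Usage: changing the properties changes whether the subscription matches.
inline void demoMatching() {
    ExportProperties props;
    props.intLists["ids"] = {3, 9};
    assert(!matchesIdsAndColor(props));  // no color property yet
    props.strings["color"] = "orange";   // a property is added
    assert(matchesIdsAndColor(props));   // both predicates hold
    props.intLists["ids"][0] = 7;        // ids[0] is no longer below 5
    assert(!matchesIdsAndColor(props));
}
```

The sketch shows only the matching logic. In a running application, the connection between exported and imported streams is managed by the runtime, not by operator code.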
In the example, SubscriptionExpressionPtr is a shared pointer, and thus the memory does not need to be managed explicitly. SubscriptionExpressionPtr objects can also be created from string representations of the expression. The following is an example:
SubscriptionExpressionPtr exp = SubscriptionExpression::fromString("ids[0]<5 && color==\"orange\"");
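The following plain C++ sketch illustrates, under stated assumptions, why a shared pointer removes the need for explicit memory management and how an expression tree relates to its string form. Expr, ExprPtr, predicate, andClause, and toString are hypothetical names that model the idea with std::shared_ptr. They are not part of the SPL API, and the sketch makes no claim about how SubscriptionExpression is implemented.

```cpp
#include <cassert>
#include <memory>
#include <string>

struct Expr;
typedef std::shared_ptr<Expr> ExprPtr;  // hypothetical analogue of a shared expression pointer

// Hypothetical expression node: either a leaf predicate or an AND of two children.
struct Expr {
    std::string text;  // rendered predicate for a leaf, empty for an AND node
    ExprPtr lhs;       // left child of an AND node, null for a leaf
    ExprPtr rhs;       // right child of an AND node, null for a leaf
};

inline ExprPtr predicate(const std::string& text) {
    ExprPtr e = std::make_shared<Expr>();
    e->text = text;
    return e;
}

inline ExprPtr andClause(const ExprPtr& lhs, const ExprPtr& rhs) {
    ExprPtr e = std::make_shared<Expr>();
    e->lhs = lhs;  // the parent shares ownership of both children
    e->rhs = rhs;
    return e;
}

inline std::string toString(const ExprPtr& e) {
    if (!e->lhs) return e->text;
    return toString(e->lhs) + " && " + toString(e->rhs);
}

// Usage: the nodes are released automatically when the last owner goes away.
inline void demoSharedExpression() {
    ExprPtr id = predicate("ids[0]<5");
    ExprPtr color = predicate("color==\"orange\"");
    ExprPtr both = andClause(id, color);
    assert(toString(both) == "ids[0]<5 && color==\"orange\"");
    assert(id.use_count() == 2);  // owned by id and by the AND node
}
```

Because each parent node holds its children through shared pointers, no delete call is needed anywhere in the sketch.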