Dynamic application composition
When a composite operator in the source code is intended as the main operator in a compiled application, its graph clause typically invokes operators for edge adaptation, that is, for ingesting external data and for producing data that external parties can consume. In particular, it uses various source and sink operators for I/O to URLs, files, databases, and so on, and it uses the special operators Import and Export for I/O to other jobs on the same streaming middleware.
Streaming applications commonly run for long periods of time. Streams that are generated by one application often serve as input to another, and the second application might be deployed when the first is already running. The Import and Export special operators support dynamic application composition, where one application exports a stream, another application imports the stream, but both applications are instantiated separately and might even be mutually anonymous. For example, the streaming middleware might host one or more long-running backbone applications that carry out the bulk of the data processing, and users might launch transient applications that import streams, for example, to visualize results in a dashboard.
Here are examples of exporting streams to other application instances:
type BargainType = rstring symbol, decimal32 price;
composite ExportingMain {
  graph
    stream<BargainType> TechSectorBargains = FileSource() { param file : "tech"; }
    stream<BargainType> HealthCareSectorBargains = FileSource() { param file : "health"; }
    () as ExportOp1 = Export(TechSectorBargains) {
      param properties : { kind = "bargains", category = "tech",
                           tickers = ["IBM", "GOOG", "MSFT"] };
    }
    () as ExportOp2 = Export(HealthCareSectorBargains) {
      param properties : { kind = "bargains", category = "healthcare",
                           tickers = ["AET", "UNH", "WLP"] };
    }
}
The properties parameter specifies properties of the stream as a tuple literal with name-value pairs. Here are examples of importing streams from a different application instance:
type BargainType = rstring symbol, decimal32 price;
composite ImportingMain {
  graph
    stream<BargainType> TechBargains = Import() {
      param subscription : kind == "bargains" && category == "tech";
    }
    stream<BargainType> IBMOrWLPBargains = Import() {
      param subscription : "IBM" in tickers || "WLP" in tickers;
    }
}
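To see which exported streams each of these subscriptions would select, the following Python sketch models the property-based matching under stated assumptions: the exported properties are plain dictionaries copied from ExportingMain, and the subscription predicates are rewritten as Python lambdas. This is a hypothetical model for illustration, not the Teracloud Streams API, and a missing property is assumed to evaluate as "no match".

```python
from typing import Callable, Dict, List

# Hypothetical model: exported stream properties as plain dicts,
# mirroring the tuple literals in ExportingMain.
Properties = Dict[str, object]
Subscription = Callable[[Properties], bool]

EXPORTED: Dict[str, Properties] = {
    "TechSectorBargains": {
        "kind": "bargains", "category": "tech",
        "tickers": ["IBM", "GOOG", "MSFT"],
    },
    "HealthCareSectorBargains": {
        "kind": "bargains", "category": "healthcare",
        "tickers": ["AET", "UNH", "WLP"],
    },
}

# Subscription predicates from ImportingMain, written as Python lambdas.
# .get() makes a missing property evaluate to "no match" (an assumption).
SUBSCRIPTIONS: Dict[str, Subscription] = {
    "TechBargains": lambda p: p.get("kind") == "bargains" and p.get("category") == "tech",
    "IBMOrWLPBargains": lambda p: "IBM" in p.get("tickers", []) or "WLP" in p.get("tickers", []),
}


def matching_exports(subscription: Subscription) -> List[str]:
    """Return the names of all exported streams whose properties satisfy the predicate."""
    return sorted(name for name, props in EXPORTED.items() if subscription(props))


for importer, predicate in SUBSCRIPTIONS.items():
    print(importer, "<-", matching_exports(predicate))
```

In this model, TechBargains matches only TechSectorBargains, while IBMOrWLPBargains matches both exported streams, because IBM appears in the tech tickers and WLP in the healthcare tickers.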
Using special operators for dynamic application composition provides a consistent syntax for specifying types and parameters. The middleware connects the Import and Export streams if the subscription predicate matches the exported stream properties. If the Import predicate matches multiple streams that are exported by jobs running in the middleware, they are all connected. If there are no matching streams, nothing arrives at the Import operator. Properties can also be added, updated, or removed at run time, and so can subscriptions; the compile-time properties and subscriptions serve only as initial settings.
Since the compiler sees only one application at a time, it cannot statically check whether the types used in the export properties of one application match those used in the subscription predicates of another. The output from an Import operator cannot be fed into an input port that expects window punctuation. Also, a port of a downstream operator that is connected to the output stream of an Import operator stays open forever, no matter how many final punctuation markers it receives.
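Because properties and subscriptions can change at run time, the set of connections is not fixed at submission. The sketch below is a hypothetical toy broker in Python, not the Teracloud Streams API: it recomputes every (exported stream, importer) pair whose predicate matches, so that multiple matches are all connected and an importer with no match receives nothing. The stream names Tech, Health, and Dashboard are illustrative assumptions.

```python
from typing import Callable, Dict, Set, Tuple

Properties = Dict[str, object]
Subscription = Callable[[Properties], bool]


class Broker:
    """Toy model (hypothetical, not the Streams API) of property-based matching
    that is re-evaluated whenever properties or subscriptions change."""

    def __init__(self) -> None:
        self.exports: Dict[str, Properties] = {}
        self.imports: Dict[str, Subscription] = {}

    def set_properties(self, stream: str, props: Properties) -> None:
        # Replacing the whole dict covers adding, updating, and removing properties.
        self.exports[stream] = dict(props)

    def set_subscription(self, importer: str, predicate: Subscription) -> None:
        self.imports[importer] = predicate

    def connections(self) -> Set[Tuple[str, str]]:
        # Every (export, import) pair whose predicate matches is connected;
        # an importer with no match simply receives nothing.
        return {
            (stream, importer)
            for importer, pred in self.imports.items()
            for stream, props in self.exports.items()
            if pred(props)
        }


broker = Broker()
broker.set_properties("Tech", {"kind": "bargains", "category": "tech"})
broker.set_properties("Health", {"kind": "bargains", "category": "healthcare"})
broker.set_subscription("Dashboard", lambda p: p.get("category") == "tech")
print(sorted(broker.connections()))  # [('Tech', 'Dashboard')]

# Run-time property update: the only match disappears, so the connection is dropped.
broker.set_properties("Tech", {"kind": "bargains", "category": "retired"})
print(sorted(broker.connections()))  # []

# Run-time subscription update: the broader predicate now matches both exports.
broker.set_subscription("Dashboard", lambda p: p.get("kind") == "bargains")
print(sorted(broker.connections()))  # [('Health', 'Dashboard'), ('Tech', 'Dashboard')]
```

Recomputing the full set on every query keeps the model simple; a real middleware would presumably re-evaluate only the affected pairs.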
Instead of the properties parameter, an Export operator can use the streamId parameter to export a stream by name:
namespace some.nameSpace;
composite Comp(input E) {
  graph
    () as ExportInvoke = Export(E) {
      param streamId : "StreamName";
    }
}
public composite ExportingMain {
  graph
    stream<int32 x> A = Beacon() { }
    () as CompInvoke = Comp(A) { }
}
The streamId parameter specifies the external name of the stream that is being exported. Only one of streamId or properties can be specified on an Export operator. If neither streamId nor properties is specified, the stream is exported by properties with an initially empty set of properties, which is useful only if properties are set later at run time. The importing application instance again uses the Import operator, but this time with a different set of parameters:
composite ImportingMain {
  graph
    stream<int32 x> I2 = Import() {
      param
        // application scope selected when the exporting application was launched
        applicationScope : "myApplicationScope";
        // main operator selected when the exporting application was launched
        applicationName : "some.nameSpace::ExportingMain";
        // string agreed upon by the exporting and importing applications
        streamId : "StreamName";
    }
}
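The rules for choosing between streamId and properties on an Export operator can be summarized as a small validation function. This Python sketch is hypothetical (export_mode is an illustrative name, not part of any Streams API) and simply encodes the three cases stated above: both parameters is an error, streamId means a name-based export, and otherwise the stream is exported by properties, possibly starting from an empty set.

```python
from typing import Dict, Optional, Tuple


def export_mode(stream_id: Optional[str] = None,
                properties: Optional[Dict[str, object]] = None) -> Tuple[str, object]:
    """Hypothetical check of the Export parameter rules described in the text."""
    if stream_id is not None and properties is not None:
        # Only one of streamId or properties may be given.
        raise ValueError("specify only one of streamId or properties")
    if stream_id is not None:
        return ("name", stream_id)
    # Neither given (or only properties): export by properties. With no
    # properties this starts empty and is useful only once set at run time.
    return ("properties", dict(properties or {}))


print(export_mode(stream_id="StreamName"))  # ('name', 'StreamName')
print(export_mode())                        # ('properties', {})
```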
If the explicit applicationScope is omitted, it is implicitly bound to the scope in which the current application was launched. An explicit applicationScope can be used with both property-based and name-based subscriptions. The optional applicationName identifies the main composite operator that was selected when the exporting application was launched; it is valid only when streamId is specified. You cannot run two applications that specify the same streamId; Teracloud® Streams rejects the submission of the second application instance.
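The name-based lookup rules above can be sketched as a registry keyed by (applicationScope, applicationName, streamId). This Python model is hypothetical (NameRegistry, submit, and resolve are illustrative names, not the Teracloud Streams API). It assumes, as the text states, that a second submission exporting an already-used streamId is rejected. It also assumes that an importer without an explicit applicationScope uses its own launch scope and that an omitted applicationName matches any exporting main operator.

```python
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, str, str]  # (applicationScope, applicationName, streamId)


class NameRegistry:
    """Toy model (hypothetical) of name-based export and import."""

    def __init__(self) -> None:
        self.exports: Dict[Key, str] = {}

    def submit(self, scope: str, app_name: str, stream_id: str, job: str) -> None:
        # Assumption from the text: a second job exporting the same streamId is rejected.
        if any(key[2] == stream_id for key in self.exports):
            raise RuntimeError(f"streamId {stream_id!r} already exported; submission rejected")
        self.exports[(scope, app_name, stream_id)] = job

    def resolve(self, importer_scope: str, stream_id: str,
                application_scope: Optional[str] = None,
                application_name: Optional[str] = None) -> List[str]:
        # An omitted applicationScope defaults to the importer's own launch scope.
        scope = application_scope if application_scope is not None else importer_scope
        return sorted(
            job for (s, n, sid), job in self.exports.items()
            if s == scope and sid == stream_id
            and (application_name is None or n == application_name)
        )


reg = NameRegistry()
reg.submit("myApplicationScope", "some.nameSpace::ExportingMain", "StreamName", "job1")
print(reg.resolve("otherScope", "StreamName",
                  application_scope="myApplicationScope",
                  application_name="some.nameSpace::ExportingMain"))  # ['job1']
print(reg.resolve("otherScope", "StreamName"))  # [] because the scope defaults to otherScope
```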
The implementation represents properties parameters internally as XML and represents subscription parameters internally as XPath. As a result, only a restricted set of SPL expressions is valid for these two parameters, and the compiler emits error messages if others are used. Also, while the language represents Import and Export as operators, the implementation erases them (treats them as ghost operators). As a consequence, an imported stream cannot be exported directly; it must first pass through another operator. Dynamic optimization in the transport fabric ensures that bandwidth for dynamic application composition connections is used only while a connection is in place. As a rule of thumb, establishing a name-based subscription is more efficient than establishing a property-based anonymous subscription, though once the connections are established, they perform equally well.
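Because Import and Export are erased, an Export whose input comes straight from an Import would leave no real operator between the two connections. The following Python sketch is a hypothetical graph check, not a feature of the SPL compiler: it takes a map from each stream to the kind of operator that produces it, plus a map from each Export operator to the stream it consumes, and flags the Exports fed directly by an Import. Operator and stream names in the usage are illustrative.

```python
from typing import Dict, List


def invalid_exports(producers: Dict[str, str], export_inputs: Dict[str, str]) -> List[str]:
    """Hypothetical check for the ghost-operator restriction.

    producers maps each stream name to the kind of operator that produces it;
    export_inputs maps each Export operator to the stream it consumes.
    """
    # An Export fed directly by an Import is invalid; routing the stream
    # through any other operator first (here, a Functor) is fine.
    return sorted(op for op, stream in export_inputs.items()
                  if producers.get(stream) == "Import")


producers = {"In": "Import", "Relayed": "Functor"}
exports = {"BadExport": "In", "GoodExport": "Relayed"}
print(invalid_exports(producers, exports))  # ['BadExport']
```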