Context
The context element describes the properties that apply to the operator as a whole and are not associated with particular parameters or ports of the operator. It also includes common definitions that are referenced elsewhere in the operator model.
Listing 4 gives the basic structure.
Listing 4: The context type
<xs:complexType name="contextType">
<xs:sequence>
<xs:element name="description" type="common:descriptionType" minOccurs="0" maxOccurs="1"/>
<xs:element name="iconUri" type="iconUriType" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="metrics" type="metricsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="customLiterals" type="enumerationsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="customOutputFunctions" type="customOutputFunctionsType"
minOccurs="0" maxOccurs="1"/>
<xs:element name="libraryDependencies" type="libraryDependenciesType"
minOccurs="0" maxOccurs="1"/>
<xs:element name="providesSingleThreadedContext" type="singleThreadedContextType"/>
<xs:element name="incrementalCompilationStrategy" type="incrementalCompilationStrategyType"
minOccurs="0" maxOccurs="1"/>
<xs:element name="allowCustomLogic" type="xs:boolean" minOccurs="0" maxOccurs="1"/>
<xs:element name="codeTemplates" type="codeTemplatesType" minOccurs="0" maxOccurs="1"/>
<xs:element name="splExpressionTree" type="splExpressionTreeType"
minOccurs="0" maxOccurs="1"/>
<xs:element name="capability" type="xs:string"
minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
<xs:attribute name="verificationModule" type="xs:token"/>
</xs:complexType>
<xs:complexType name="iconUriType">
<xs:simpleContent>
<xs:extension base="xs:string">
<xs:attribute name="size" type="xs:int" use="required"/>
</xs:extension>
</xs:simpleContent>
</xs:complexType>
<xs:complexType name="metricsType">
<xs:sequence>
<xs:element name="description" type="common:descriptionType" minOccurs="0" maxOccurs="1"/>
<xs:element name="metric" type="metricType" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="metricType">
<xs:sequence>
<xs:element name="name" type="xs:string"/>
<xs:element name="description" type="common:descriptionType"/>
<xs:element name="kind" type="metricKindType"/>
<xs:element name="dynamic" type="xs:boolean" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
</xs:complexType>
<xs:simpleType name="metricKindType">
<xs:restriction base="xs:string">
<xs:enumeration value="Counter"/>
<xs:enumeration value="Gauge"/>
<xs:enumeration value="Time"/>
</xs:restriction>
</xs:simpleType>
<xs:complexType name="enumerationsType">
<xs:sequence>
<xs:element name="enumeration" type="enumerationType"
minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="enumerationType">
<xs:sequence>
<xs:element name="name" type="xs:string"/>
<xs:element name="value" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="customOutputFunctionsType">
<xs:sequence>
<xs:element name="customOutputFunction" type="customOutputFunctionSetType"
minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="customOutputFunctionSetType">
<xs:sequence>
<xs:element name="name" type="xs:string"/>
<xs:element name="function" type="customOutputFunctionType"
minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="customOutputFunctionType">
<xs:sequence>
<xs:element name="description" type="common:descriptionType"
minOccurs="0" maxOccurs="1"/>
<xs:element name="prototype" type="xs:string"/>
</xs:sequence>
<xs:attribute name="pseudoFunction" type="xs:boolean"/>
</xs:complexType>
<xs:complexType name="libraryDependenciesType">
<xs:sequence>
<xs:element name="library" type="common:libraryType"
minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:simpleType name="singleThreadedContextType">
<xs:restriction base="xs:string">
<xs:enumeration value="Never"/>
<xs:enumeration value="Always"/>
<xs:enumeration value="WindowBound"/>
<xs:enumeration value="WindowTriggerBound"/>
<xs:enumeration value="WindowEvictionBound"/>
<xs:enumeration value="WindowPartitionEvictionBound"/>
</xs:restriction>
</xs:simpleType>
<xs:complexType name="codeTemplateType">
<xs:sequence>
<xs:element name="description" type="common:descriptionType"
minOccurs="0" maxOccurs="1"/>
<xs:element name="template" type="xs:string"/>
</xs:sequence>
<xs:attribute name="name" type="xs:string" use="required"/>
</xs:complexType>
<xs:complexType name="splExpressionTreeType">
<xs:attribute name="cppCode" type="xs:boolean" use="optional"/>
<xs:attribute name="param" type="xs:boolean" use="optional"/>
<xs:attribute name="output" type="xs:boolean" use="optional"/>
</xs:complexType>
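Because the children of contextType are declared in an xs:sequence, an instance document must list them in schema order, and only providesSingleThreadedContext (the one child without minOccurs="0") is required. The following is a minimal sketch of a context element under those rules; the description text and icon file names are hypothetical, and the enclosing operator model document and its namespace declarations are omitted.

```xml
<context>
  <!-- Optional, and must come first: a common description of the operator -->
  <description>Hypothetical operator that forwards tuples matching a condition</description>
  <!-- Optional: one iconUri per icon; the size attribute is required -->
  <iconUri size="16">myOperator16.gif</iconUri>
  <iconUri size="32">myOperator32.gif</iconUri>
  <!-- Required: the threading semantics of the operator -->
  <providesSingleThreadedContext>Always</providesSingleThreadedContext>
</context>
```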
The description element, which is optional, provides a common description for the operator.
The iconUri element, which is optional, provides one or more URIs to image files that represent the operator. Each URI must have a size attribute that specifies the size of the icon in pixels. The iconUri element supports .gif files. The images must be square, and users must provide both 16x16 and 32x32 icons. If either size is missing, the graph view displays a default icon in place of the missing one. Listing 5 gives a sample XML segment for the icon URIs.
Listing 5: Sample icon URI XML segment
<iconUri size="16">file://path/to/file16.gif</iconUri>
<iconUri size="32">file32.gif</iconUri>
The metrics element, which is optional, contains the list of metrics that the operator exposes. It is structured as an optional description of all the metrics, followed by a list of metric elements, where each metric element contains a name, a description, a kind, and an optional dynamic element. The kind is an enumeration that can be one of Counter, Gauge, or Time.
The Counter kind represents metrics whose values are either non-decreasing or non-increasing, that is, they move in only one direction. The Gauge kind represents metrics that can change their values freely, that is, they can go up or down. The Time kind represents metrics whose values represent a point in time. Include the dynamic element and set it to true for a metric that the operator creates dynamically at run time, instead of one that the Teracloud® Streams instance creates automatically. This is useful, for example, if the operator creates a variable number of metrics of a certain type and the number is not known until run time: the dynamic element allows such a metric to be listed in the operator model and included in the operator documentation generated by spl-make-doc.
Listing 6 gives a sample XML segment for the metrics.
Listing 6: Sample metrics XML segment
<metrics>
<description>The metrics for this operator relate to tuples.</description>
<metric>
<name>nTuplesFailed</name>
<description>Number of tuples failed</description>
<kind>Counter</kind>
</metric>
<metric>
<name>lastTimeOfFailure</name>
<description>The time of the last failure</description>
<kind>Time</kind>
</metric>
<metric>
<name>nTuplesInTheQueue (port N)</name>
<description>Number of tuples currently in the queue for output port N</description>
<kind>Gauge</kind>
<dynamic>true</dynamic>
</metric>
</metrics>
The customLiterals element, which is optional, captures the identifiers that might appear in parameter configurations of an operator. It is structured as a list of enumeration elements. For instance, a Source operator might support different source formats, in which case you can have an enumeration called FileFormat that contains the values {csv, xml, bin}.
Listing 7 gives a sample XML segment for the custom literals.
Listing 7: Sample custom literal XML segment
<customLiterals>
<enumeration>
<name>FileFormat</name>
<value>csv</value>
<value>xml</value>
<value>bin</value>
</enumeration>
</customLiterals>
An example use in an SPL program is as follows, assuming that the format parameter of the FileSource operator is configured to be of type FileFormat.
composite Main {
  type TradeT = rstring ticker, uint32 volume, decimal64 value;
  graph
    stream<TradeT> Trades = FileSource() {
      param file   : "Trades.csv";
            format : csv;
    }
}
The customOutputFunctions element, which is optional, captures the output function prototypes that an operator uses in its output attribute assignments. It is structured as a list of customOutputFunction elements, each of which contains a name and a list of output function definitions (function elements). Each definition consists of an optional description, a prototype, and an optional boolean pseudoFunction attribute. Set pseudoFunction to true when the operator does complex processing of the output function arguments, that is, when it treats the arguments as independent values rather than as one complex expression. For example, an Aggregate operator might support relational aggregations, in which case it can have an enumeration called RelationalAggregation that contains the output functions {Min, Max, Avg, Sum, and so on}.
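As a hedged illustration of the pseudoFunction attribute, a function definition that sets it might look like the following sketch. The set name Pairwise, the function name Pick, and its prototype are hypothetical and are not part of any toolkit; only the element and attribute names come from the schema in Listing 4.

```xml
<customOutputFunctions>
  <customOutputFunction>
    <!-- Hypothetical set name -->
    <name>Pairwise</name>
    <!-- pseudoFunction="true": the operator treats a and b as independent
         values instead of evaluating the call as one expression -->
    <function pseudoFunction="true">
      <description>Hypothetical: pick one of two attribute values</description>
      <prototype><![CDATA[ <any T> T Pick (T a, T b) ]]></prototype>
    </function>
  </customOutputFunction>
</customOutputFunctions>
```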
Listing 8 gives a sample XML segment for the custom output functions. The output functions are specified using the native function declaration syntax (with generics).
Listing 8: Sample custom output function XML segment
<customOutputFunctions>
<customOutputFunction>
<name>RelationalAggregation</name>
<function>
<description>Pick attribute from any tuple</description>
<prototype><![CDATA[ <any T> T Any (T v) ]]></prototype>
</function>
<function>
<description>Return average of given attribute</description>
<prototype><![CDATA[ <numeric T> T Avg (T v) ]]></prototype>
</function>
<function>
<description>Return attribute from last tuple</description>
<prototype><![CDATA[ <any T> T Last (T v) ]]></prototype>
</function>
<function>
<description>Return minimum of given attribute</description>
<prototype><![CDATA[ <ordered T> T Min (T v) ]]></prototype>
</function>
<function>
<description>Return maximum of given attribute</description>
<prototype><![CDATA[ <ordered T> T Max (T v) ]]></prototype>
</function>
<function>
<description>Return sum of given attribute</description>
<prototype><![CDATA[ <numeric T> T Sum (T v) ]]></prototype>
</function>
<function>
<description>Return count of given attribute</description>
<prototype><![CDATA[ <any T> uint64 Count (T v) ]]></prototype>
</function>
</customOutputFunction>
</customOutputFunctions>
An example use of the RelationalAggregation enumeration in an SPL program is as follows.
composite Main {
  type TradeFilterT = decimal64 price, decimal64 volume, rstring ticker;
       PreVwapT = rstring ticker, decimal64 minprice, decimal64 maxprice,
                  decimal64 avgprice, decimal64 vwap, decimal64 sumvolume;
  graph
    stream<TradeFilterT> TradeFilter = Beacon() {}
    stream<PreVwapT> PreVwap = Aggregate(TradeFilter) {
      window TradeFilter : partitioned, sliding, count(4), count(1);
      param  partitionBy : ticker;
      output PreVwap     : ticker = Any(ticker), vwap = Sum(price * volume),
                           minprice = Min(price), maxprice = Max(price),
                           avgprice = Avg(price), sumvolume = Sum(volume);
    }
}
Listing 9: The library type
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema
xmlns="http://www.ibm.com/xmlns/prod/streams/spl/common"
xmlns:xs="http://www.w3.org/2001/XMLSchema"
targetNamespace="http://www.ibm.com/xmlns/prod/streams/spl/common"
elementFormDefault="qualified"
attributeFormDefault="unqualified">
<xs:complexType name="libraryType">
<xs:sequence>
<xs:element name="description" type="descriptionType"/>
<xs:element name="managedLibrary" type="managedLibraryType"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="managedLibraryType">
<xs:sequence>
<xs:element name="lib" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="libPath" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="includePath" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
<xs:element name="command" type="xs:string" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
</xs:complexType>
</xs:schema>
The libraryDependencies element captures external library dependencies of an operator. For instance, a signal processing operator might use an external library for performing FFT transformations. The library type is specified in a separate XSD schema document. Each library has a description element, which describes the library (such as Foo FFT Library). Each library also has a managedLibrary element that specifies details of the individual library artifacts. In particular, it contains zero or more lib elements, where each one specifies a name to be passed to the C++ compiler's -l argument (such as fft, which is translated into -lfft when passed to the linker); zero or more libPath elements, where each one specifies a path to be passed to the C++ compiler's -L argument; and zero or more includePath elements, where each one specifies a path to be passed to the C++ compiler's -I argument. The paths can contain environment variables embedded between @ signs (for example, @FOO_FFT_HOME@/lib), which the SPL compiler fully resolves at compile time.
The managedLibrary element can also contain an optional command element, which gives the path to a program that is executed to retrieve includePath, libPath, and lib information. If the path to the program is relative, it is assumed to be rooted at the directory of the operator model. The program is executed three times, each time with a different argument, namely lib, libPath, and includePath.¹ The standard output from these executions is read, and each line (trimmed of white space) is added as a lib, libPath, or includePath element, depending on which argument the program was run with. Lines that begin with # are ignored. Relative paths are assumed to be rooted at the directory where the operator model XML document resides.
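The following is a minimal sketch of a library element that relies only on the command element, assuming the same cmn namespace prefix as Listing 10. The script name bin/fooFFTLibInfo is hypothetical; because the path is relative, it would be resolved against the operator model directory, and the compiler would run the script once each with the arguments lib, libPath, and includePath.

```xml
<library>
  <cmn:description>Foo FFT Library</cmn:description>
  <cmn:managedLibrary>
    <!-- Hypothetical script, relative to the operator model directory.
         Each line it prints (except lines starting with #) becomes a
         lib, libPath, or includePath value, depending on the argument. -->
    <cmn:command>bin/fooFFTLibInfo</cmn:command>
  </cmn:managedLibrary>
</library>
```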
Listing 10 gives a sample XML segment for the library element.
Listing 10: Sample library XML segment (user managed)
<library>
<cmn:description>FooFFTLibraries</cmn:description>
<cmn:managedLibrary>
<cmn:lib>fft</cmn:lib>
<cmn:libPath>@FOO_FFT_PATH@/lib</cmn:libPath>
<cmn:includePath>@FOO_FFT_PATH@/include</cmn:includePath>
<cmn:command>@HOME@/genOutput</cmn:command>
</cmn:managedLibrary>
</library>
The providesSingleThreadedContext element describes the threading semantics of the operator with respect to the flow of execution. An operator provides a single threaded execution context if and only if:
- It does not perform concurrent submit calls unless its process methods are called concurrently.
- Its submit calls complete before the process call that triggered the submission completes.
Both source and non-source operators have process methods, and the definition applies globally. Based on this definition, if an operator has submit calls that are not triggered by a process call, such as calls triggered by a time-based event, then that operator does not provide a single threaded execution context. This definition does not require a submit call to execute under the same thread that executes the process call that triggered the submission (even though in the common case they execute under the same thread).
The valid values are:
- Never: Instances of this operator never provide a single threaded execution context.
- Always: Instances of this operator always provide a single threaded execution context.
- WindowBound: Instances of this operator that do not specify time-based window eviction policies or time-based window trigger policies provide a single threaded execution context.
- WindowEvictionBound: Instances of this operator that do not specify time-based window eviction policies provide a single threaded execution context.
- WindowTriggerBound: Instances of this operator that do not specify time-based window trigger policies provide a single threaded execution context.
- WindowPartitionEvictionBound: Instances of this operator use a thread to implement partition eviction. Use this setting if tuples are submitted from the onWindowPartitionEvictionSelection event.
As an example, consider a Filter operator. Unless its process method is being called concurrently, the Filter operator does not make concurrent submit calls. Its submit calls are triggered by incoming tuples. When it receives a tuple via a process call, it makes a submit call if the received tuple passes the filter condition, and that submit call completes before the process call that triggered it is complete. As a result, all instances of a Filter operator provide a single threaded context, and the setting Always is appropriate.
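For the Filter operator described above, the corresponding entry in the operator model's context would be the following (a sketch showing only this element):

```xml
<!-- Filter submits only from within process calls, and each submit
     completes before the triggering process call returns -->
<providesSingleThreadedContext>Always</providesSingleThreadedContext>
```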
Implementation note: The providesSingleThreadedContext element enables the Teracloud® Streams instance to avoid unnecessary thread synchronization. Setting it to the value Never is safe for all operators, but it prevents optimizations that reduce synchronization processing when the operator does provide a single threaded context. Specifying a value other than Never that is inconsistent with the threading semantics implemented by the operator results in undefined behavior.
The incrementalCompilationStrategy element specifies how the compiler manages incremental compilation of operators. The two choices are:
- SourceDependent: In this mode, the compiler regenerates the operator source only if it is out of date with respect to the SPL source or the code generator for that operator. This mode is the default.
- ResultDependent: In this mode, the compiler always generates the operator source, but updates the source files only if they differ from what existed before the compile. Use this mode if the operator code generator relies on external configurations that are not captured by the parameterization given in the SPL source.
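Listing 4 references incrementalCompilationStrategyType without defining it. Assuming the mode is written as element text, as it is for providesSingleThreadedContext, an operator whose code generator reads an external configuration might select ResultDependent like this:

```xml
<!-- Assumption: the mode name is the element's text content.
     ResultDependent: always regenerate, but rewrite source files only if they differ -->
<incrementalCompilationStrategy>ResultDependent</incrementalCompilationStrategy>
```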
The optional allowCustomLogic element specifies whether an invocation of the operator can have a logic clause that specifies state, onTuple, or onPunct processing. When it is set to false, no logic clause can be specified for the operator. In the absence of this element, the default is true.
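For example, an operator model that forbids logic clauses in invocations would include the following in its context (a sketch; omitting the element leaves the default of true):

```xml
<!-- Invocations of this operator cannot have a logic clause
     (no state, onTuple, or onPunct processing) -->
<allowCustomLogic>false</allowCustomLogic>
```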
The optional codeTemplates element specifies one or more code templates for the operator. Each codeTemplate element has a name attribute that names it, an optional description element that describes it, and a template element, which is a string that contains the boilerplate code for the template. Parts of the code in the form ${name} indicate the pieces that the user must customize. One example for the Barrier operator is as follows:
<codeTemplates>
<codeTemplate name="Barrier">
<description>Basic Barrier template</description>
<template>
<![CDATA[
stream<${schema}> ${outputStream} = Barrier(${inputStream1};${inputStream2})
{
param
${parameter}: ${parameterExpression};
output
${outputStream}: ${outputExpression};
${cursor}
}
]]>
</template>
</codeTemplate>
</codeTemplates>
The splExpressionTree element controls the generation of SPL expression trees for use in generic C++ primitive operators. It has three optional boolean attributes: cppCode, param, and output. If the param attribute is set to true, SPL expression trees are generated for parameters. If the output attribute is set to true, SPL expression trees are generated for output. If the cppCode attribute is set to true, each node in the generated operator instance XML is enhanced with C++ code that uses templates. This C++ code can be used to generate the C++ code for an SPL expression. For example, for the SPL code:
param predicates : {a = "a" == In.letter, b = "b" == In.letter};
The generated SPL expression tree includes:
<expressionTree cppCode="SPL::BeJwrMUoyTEwyTAIAC7UCCQ({attr:0}, {attr:1})">
<literal cppCode="SPL::BeJwrMUoyTEwyTAIAC7UCCQ({attr:0}, {attr:1})" type="1">
<tuple count="2" cppCode="SPL::BeJwrMUoyTEwyTAIAC7UCCQ({attr:0}, {attr:1})" type="1">
<attr id="a">
<value cppCode="({Lhs} == {Rhs})" type="2">
<expn cppCode="({Lhs} == {Rhs})">
<binary cppCode="({Lhs} == {Rhs})" op="==" type="2">
<lhs cppCode="SPL::rstring(&quot;a&quot;)">
<literal cppCode="SPL::rstring(&quot;a&quot;)" type="0">"a"</literal>
</lhs>
<rhs cppCode="iport$0.get_letter()">
<attribute attribute="letter" cppCode="iport$0.get_letter()" type="0">
<lhs cppCode="iport$0">
<stream cppCode="iport$0" name="In" port="0" type="3"/>
</lhs>
</attribute>
</rhs>
</binary>
</expn>
</value>
</attr>
The templates (for example, {Lhs} and {attr:0}) ensure that code replacement is well-defined. These expressions represent the SPL expression, but in a form that can easily be walked. The corresponding Perl objects are derived from SPL::Operator::Instance::ExpressionTree, and have a kind, a type, and methods to access the fields of the expression. ExpressionTreeVisitor implements the visitor pattern to allow easy walking of the expression tree.
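To request the expression trees described above, a context could include an element like the following sketch. Which attributes an operator actually needs depends on whether its code generator walks parameters, output assignments, or both; enabling all three here is only for illustration.

```xml
<!-- Generate expression trees for parameters and output assignments,
     and annotate each node with C++ code templates -->
<splExpressionTree param="true" output="true" cppCode="true"/>
```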
The optional capability element defines special privileges for the operator. Teracloud® Streams supports the Linux capabilities model through the capability element. You can include any number of capability elements to specify the exact privileges that your operator requires. For example, <capability>CAP_NET_RAW+eip</capability> indicates that the operator needs permission to access raw sockets. Note that the Teracloud® Streams instance must be configured to allow PE processes to run with special operating system capabilities.
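Because capability may occur any number of times, an operator that needs more than one privilege lists one element per capability. In the following sketch, the second entry, CAP_NET_ADMIN (a standard Linux capability), is purely illustrative and not a requirement of any particular operator:

```xml
<!-- Raw socket access, as in the example above -->
<capability>CAP_NET_RAW+eip</capability>
<!-- Illustrative only: network administration privileges -->
<capability>CAP_NET_ADMIN+eip</capability>
```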
The optional verificationModule attribute is the path name of a Perl file (without the .pm suffix) that can be used to check the parameters, inputs, and outputs of an operator invocation. The path is relative to the operator model directory. The name is used in a Perl driver program, which calls the Perl verify routine with the Operator Instance Model as its parameter. Parameters and other expressions are simplified beforehand, but occurrences of getSubmissionTimeValue and lit$X are not replaced.
The verification is performed for each operator invocation in the SPL program whose associated operator model sets verificationModule. The errorln and warnln routines can be called from within verify($model). If errorln is called, compilation of the SPL program stops after all the operators are verified.
You can use verificationModule to perform compile-time checking for every invocation, even when rewriteAllowed is true for a parameter and similar operator invocations are merged into one at compile time.
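Because verificationModule is an attribute of the context element rather than a child element, it appears on the opening context tag. In this sketch, the module path checks/MyOperatorVerify is hypothetical; it would refer to checks/MyOperatorVerify.pm relative to the operator model directory, which would define the verify routine described above.

```xml
<!-- Hypothetical module: checks/MyOperatorVerify.pm, relative to the operator model directory -->
<context verificationModule="checks/MyOperatorVerify">
  <providesSingleThreadedContext>Always</providesSingleThreadedContext>
</context>
```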