Context

The context element describes the properties that apply to the operator as a whole and are not associated with particular parameters or ports of the operator. It also includes common definitions that are referenced in other places in the operator model.

Listing 4: The context type

Listing 4 gives the basic structure.

 <xs:complexType name="contextType">
   <xs:sequence> 
     <xs:element name="description" type="common:descriptionType" minOccurs="0" maxOccurs="1"/>
     <xs:element name="iconUri" type="iconUriType" minOccurs="0" maxOccurs="unbounded"/>
     <xs:element name="metrics" type="metricsType" minOccurs="0" maxOccurs="1"/>
     <xs:element name="customLiterals" type="enumerationsType" minOccurs="0" maxOccurs="1"/>
     <xs:element name="customOutputFunctions" type="customOutputFunctionsType" 
                 minOccurs="0" maxOccurs="1"/>
     <xs:element name="libraryDependencies" type="libraryDependenciesType" 
                 minOccurs="0" maxOccurs="1"/>
     <xs:element name="providesSingleThreadedContext" type="singleThreadedContextType"/>
     <xs:element name="incrementalCompilationStrategy" type="incrementalCompilationStrategyType" 
                 minOccurs="0" maxOccurs="1"/>
     <xs:element name="allowCustomLogic" type="xs:boolean" minOccurs="0" maxOccurs="1"/>
     <xs:element name="codeTemplates" type="codeTemplatesType" minOccurs="0" maxOccurs="1"/>
     <xs:element name="splExpressionTree" type="splExpressionTreeType" 
                 minOccurs="0" maxOccurs="1"/>
     <xs:element name="capability" type="xs:string" 
                 minOccurs="0" maxOccurs="unbounded"/>
   </xs:sequence>
   <xs:attribute name="verificationModule" type="xs:token"/>
 </xs:complexType>
          
 <xs:complexType name="iconUriType">
   <xs:simpleContent>
     <xs:extension base="xs:string">
       <xs:attribute name="size" type="xs:int" use="required"/>
     </xs:extension>
   </xs:simpleContent>
 </xs:complexType>
          
<xs:complexType name="metricsType">
   <xs:sequence>
     <xs:element name="description" type="common:descriptionType" minOccurs="0" maxOccurs="1"/>
     <xs:element name="metric" type="metricType" minOccurs="0" maxOccurs="unbounded"/>
   </xs:sequence>
 </xs:complexType>
                
 <xs:complexType name="metricType">
   <xs:sequence>
     <xs:element name="name" type="xs:string"/>
     <xs:element name="description" type="common:descriptionType"/>
     <xs:element name="kind" type="metricKindType"/>
     <xs:element name="dynamic" type="xs:boolean" minOccurs="0" maxOccurs="1"/>
   </xs:sequence>
  </xs:complexType>
                  
 <xs:simpleType name="metricKindType">
   <xs:restriction base="xs:string">
     <xs:enumeration value="Counter"/>
     <xs:enumeration value="Gauge"/>
     <xs:enumeration value="Time"/>
   </xs:restriction>
 </xs:simpleType>
 
<xs:complexType name="enumerationsType">
   <xs:sequence>
     <xs:element name="enumeration" type="enumerationType" 
                 minOccurs="0" maxOccurs="unbounded"/>
   </xs:sequence>
 </xs:complexType>
            
 <xs:complexType name="enumerationType">
   <xs:sequence>
     <xs:element name="name" type="xs:string"/>
     <xs:element name="value" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
   </xs:sequence>
 </xs:complexType>
              
 <xs:complexType name="customOutputFunctionsType">
  <xs:sequence>
    <xs:element name="customOutputFunction" type="customOutputFunctionSetType" 
                minOccurs="0" maxOccurs="unbounded"/>
  </xs:sequence>
</xs:complexType>
<xs:complexType name="customOutputFunctionSetType">
  <xs:sequence>
    <xs:element name="name" type="xs:string"/>
    <xs:element name="function" type="customOutputFunctionType" 
                minOccurs="0" maxOccurs="unbounded"/>
  </xs:sequence>
</xs:complexType>

<xs:complexType name="customOutputFunctionType">
  <xs:sequence>
    <xs:element name="description" type="common:descriptionType" 
                minOccurs="0" maxOccurs="1"/>
    <xs:element name="prototype" type="xs:string"/>
  </xs:sequence>
  <xs:attribute name="pseudoFunction" type="xs:boolean"/>
</xs:complexType>
                      
 <xs:complexType name="libraryDependenciesType">
   <xs:sequence>
     <xs:element name="library" type="common:libraryType" 
                 minOccurs="0" maxOccurs="unbounded"/>
   </xs:sequence>
 </xs:complexType>
                        
 <xs:simpleType name="singleThreadedContextType">
   <xs:restriction base="xs:string">
     <xs:enumeration value="Never"/>
     <xs:enumeration value="Always"/>
     <xs:enumeration value="WindowBound"/>
     <xs:enumeration value="WindowTriggerBound"/>
     <xs:enumeration value="WindowEvictionBound"/>
     <xs:enumeration value="WindowPartitionEvictionBound"/>
   </xs:restriction>
 </xs:simpleType>

<xs:complexType name="codeTemplateType">
  <xs:sequence>
    <xs:element name="description" type="common:descriptionType" 
                minOccurs="0" maxOccurs="1"/>
    <xs:element name="template" type="xs:string"/>
  </xs:sequence>
  <xs:attribute name="name" type="xs:string" use="required"/>
</xs:complexType>

<xs:complexType name="splExpressionTreeType">
  <xs:attribute name="cppCode" type="xs:boolean" use="optional"/>
  <xs:attribute name="param" type="xs:boolean" use="optional"/>
  <xs:attribute name="output" type="xs:boolean" use="optional"/>
 </xs:complexType>

The description element, which is optional, provides a common description for the operator.

The iconUri element, which is optional, provides one or more URIs of image files to be used to represent the operator. Each iconUri element must have a size attribute that specifies the size of the icon in pixels. Only .gif files are supported, each image must be square, and users must provide both a 16x16 and a 32x32 icon. If either size is missing, the graph view displays a default icon in place of the missing icon. Listing 5 gives a sample XML segment for the icon URIs.

Listing 5: Sample icon URI XML segment

<iconUri size="16">file://path/to/file16.gif</iconUri>
<iconUri size="32">file32.gif</iconUri> 

The metrics element, which is optional, contains the list of metrics that are exposed by the operator. It is structured as an optional description of all the metrics, followed by a list of metric elements, where each metric element contains a name, a description, a kind, and an optional dynamic element. The kind is an enumeration that can be one of Counter, Gauge, or Time. The Counter kind represents metrics whose values are either non-decreasing or non-increasing, that is, they move in one direction. The Gauge kind represents metrics that can change their values freely, that is, they can go up or down. The Time kind represents metrics whose values represent a point in time. The dynamic element is included and set to true for a metric that is created dynamically at run time by the operator instead of automatically by the Teracloud® Streams instance. This is useful, for example, if the operator creates a variable number of a certain type of metric, where the number is not known until run time. Declaring such a metric as dynamic allows it to be listed in the operator model and included in the operator documentation generated by spl-make-doc.

Listing 6: Sample metrics XML segment

Listing 6 gives a sample XML segment for the metrics.

<metrics>
  <description>The metrics for this operator relate to tuples.</description>
  <metric>
    <name>nTuplesFailed</name>
    <description>Number of tuples failed</description>
    <kind>Counter</kind>
  </metric>
  <metric>
    <name>lastTimeOfFailure</name>
    <description>The time of the last failure</description>
    <kind>Time</kind>
  </metric>
  <metric>
    <name>nTuplesInTheQueue (port N)</name>
    <description>Number of tuples currently in the queue for output port N</description>
    <kind>Gauge</kind>
    <dynamic>true</dynamic>
  </metric>
</metrics>

The customLiterals element, which is optional, captures the identifiers that might appear in parameter configurations of an operator. It is structured as a list of enumeration elements. For instance, a Source operator might support different source formats, in which case you can have an enumeration that is called FileFormat that contains values {csv, xml, bin}.

Listing 7: Sample custom literal XML segment

Listing 7 gives a sample XML segment for the custom literals.

<customLiterals>
  <enumeration>
    <name>FileFormat</name>
    <value>csv</value>
    <value>xml</value>
    <value>bin</value>
  </enumeration>
</customLiterals>

An example use in an SPL program is as follows, assuming that the format parameter of the FileSource operator is configured to be of type FileFormat.

composite Main {
  type TradeT = rstring ticker, uint32 volume, decimal64 value;
  graph
    stream<TradeT> Trades = FileSource() {
      param file   : "Trades.csv";
            format : csv;
    }
}

The customOutputFunctions element, which is optional, captures the output function prototypes that are used by an operator in its output attribute assignments. It is structured as a list of customOutputFunction elements, where each element defines an enumeration that contains a name and a list of output function definitions. Each definition consists of a prototype, an optional description, and an optional pseudoFunction flag of type boolean. Set pseudoFunction to true when the operator does complex processing of the output function arguments, that is, when it treats the arguments as independent values and not as one complex expression.

For example, an Aggregate operator might support relational aggregations, in which case it can have an enumeration called RelationalAggregation that contains output functions such as Min, Max, Avg, and Sum.

Listing 8: Sample custom output function XML segment

Listing 8 gives a sample XML segment for the custom output functions. The output functions are specified using the native function declaration syntax (with generics).

<customOutputFunctions>
    <customOutputFunction>
        <name>RelationalAggregation</name>
        <function>
            <description>Pick attribute from any tuple</description>
            <prototype><![CDATA[ <any T> T Any (T v) ]]></prototype>
        </function>
        <function>
            <description>Return average of given attribute</description>
            <prototype><![CDATA[ <numeric T> T Avg (T v) ]]></prototype>
        </function>
        <function>
            <description>Return attribute from last tuple</description>
            <prototype><![CDATA[ <any T> T Last (T v) ]]></prototype>
        </function>
        <function>
            <description>Return minimum of given attribute</description>
            <prototype><![CDATA[ <ordered T> T Min (T v) ]]></prototype>
        </function>
        <function>            
            <description>Return maximum of given attribute</description>
            <prototype><![CDATA[ <ordered T> T Max (T v) ]]></prototype>
        </function>
        <function>
            <description>Return sum of given attribute</description>
            <prototype><![CDATA[ <numeric T> T Sum (T v) ]]></prototype>
        </function>
        <function>
            <description>Return count of given attribute</description>
            <prototype><![CDATA[ <any T> uint64 Count (T v) ]]></prototype>
        </function>    
     </customOutputFunction>
</customOutputFunctions>

An example use in an SPL program is as follows, assuming that the output attribute assignments of the Aggregate operator are configured to use output functions from the RelationalAggregation enumeration.

composite Main {
  type TradeFilterT = decimal64 price, decimal64 volume, rstring ticker;
       PreVwapT = rstring ticker, decimal64 minprice, decimal64 maxprice,
                  decimal64 avgprice, decimal64 vwap, decimal64 sumvolume;
  graph
    stream<TradeFilterT> TradeFilter = Beacon() {}
    stream<PreVwapT> PreVwap = Aggregate(TradeFilter) {
      window TradeFilter : partitioned, sliding, count(4), count(1);
      param  partitionBy : ticker;
      output PreVwap     : ticker   = Any(ticker), vwap = Sum(price*volume),
                           minprice = Min(price),  maxprice = Max(price),
                           avgprice = Avg(price),  sumvolume = Sum(volume);
    }
}

Listing 9: The library type

<?xml version="1.0" encoding="UTF-8"?>
<xs:schema
  xmlns="http://www.ibm.com/xmlns/prod/streams/spl/common"
  xmlns:xs="http://www.w3.org/2001/XMLSchema"
  targetNamespace="http://www.ibm.com/xmlns/prod/streams/spl/common"
  elementFormDefault="qualified"
  attributeFormDefault="unqualified">
          
<xs:complexType name="libraryType">
  <xs:sequence>
    <xs:element name="description" type="common:descriptionType"/>
    <xs:element name="managedLibrary" type="managedLibraryType"/>
  </xs:sequence>
</xs:complexType>
            
<xs:complexType name="managedLibraryType">
  <xs:sequence>
    <xs:element name="lib" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
    <xs:element name="libPath" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
    <xs:element name="includePath" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
    <xs:element name="command" type="xs:string" minOccurs="0" maxOccurs="1"/>
  </xs:sequence>
</xs:complexType>
</xs:schema> 

The libraryDependencies element captures external library dependencies of an operator. For instance, a signal processing operator might use an external library for performing FFT transformations. The library type is specified in a separate XSD schema document. Each library has a description element, which describes the library (such as Foo FFT Library). Each library also has a managedLibrary element that specifies details of the individual library artifacts. In particular, it contains zero or more lib elements, where each one specifies a name to be passed to the C++ compiler's -l argument (such as fft, which is translated into -lfft when passed to the linker); zero or more libPath elements, where each one specifies a path to be passed to the C++ compiler's -L argument; and zero or more includePath elements, where each one specifies a path to be passed to the C++ compiler's -I argument. The paths can contain environment variables that are embedded between @ signs (for example: @FOO_FFT_HOME@/lib), which are fully resolved by the SPL compiler at compile time.

The managedLibrary element also contains a command element, which gives the path to a program that is executed to retrieve includePath, libPath, and lib information. If the path to the program is relative, it is assumed to be rooted at the directory of the operator model. The program is executed three times, each time with a different argument, namely lib, libPath, and includePath (see footnote 1). The standard output from each execution is read, and each line (trimmed of white space) is added to the lib, libPath, or includePath elements, depending on the argument that was passed. A line that begins with # is ignored. Relative paths are assumed to be rooted at the directory where the operator model XML document resides.

Listing 10: Sample library XML segment (user managed)

Listing 10 gives a sample XML segment for the library element.

<library>
  <cmn:description>FooFFTLibraries</cmn:description>
  <cmn:managedLibrary>
    <cmn:lib>fft</cmn:lib>
    <cmn:libPath>@FOO_FFT_PATH@/lib</cmn:libPath>
    <cmn:includePath>@FOO_FFT_PATH@/include</cmn:includePath>
    <cmn:command>@HOME@/genOutput</cmn:command>
  </cmn:managedLibrary>
</library>
      
The providesSingleThreadedContext element describes the threading semantics of the operator with respect to the flow of execution. An operator provides a single threaded execution context if and only if:
  • It does not perform concurrent submit calls unless its process methods are called concurrently.
  • Its submit calls complete before the process call that triggered the submission completes.

Both source and non-source operators have process methods, and the definition applies globally. Based on this definition, if an operator has submit calls that are not triggered by a process call, such as calls triggered by a time-based event, then that operator does not provide a single threaded execution context. This definition does not require a submit call to execute under the same thread that executes the process call that triggered the submission (even though in the common case they execute under the same thread).

There are several valid values for this property:
  • Never: Instances of this operator never provide a single threaded execution context.
  • Always: Instances of this operator always provide a single threaded execution context.
  • WindowBound: Instances of this operator that do not specify time-based window eviction policies or time-based window trigger policies provide a single threaded execution context.
  • WindowEvictionBound: Instances of this operator that do not specify time-based window eviction policies provide a single threaded execution context.
  • WindowTriggerBound: Instances of this operator that do not specify time-based window trigger policies provide a single threaded execution context.
  • WindowPartitionEvictionBound: Instances of this operator use a thread to implement partition eviction. Use this setting if tuples are submitted from the onWindowPartitionEvictionSelection event.

As an example, consider a Filter operator. Unless its process method is being called concurrently, the Filter operator does not make concurrent submit calls. Its submit calls are triggered by incoming tuples. When it receives a tuple via a process call, it makes a submit call if the received tuple passes the filter condition, and that submit call completes before the process call that triggered it is complete. As a result, all instances of a Filter operator provide a single threaded context and the setting Always is appropriate.
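
As a rough sketch, the Filter case above would be declared in the operator model as in the first fragment below. The second fragment is an assumed illustration, not taken from any shipped operator: a windowed operator that submits tuples from a separate thread only when a time-based eviction or trigger policy is configured.

```xml
<!-- Filter-like operator: every submit call happens inside a process call -->
<providesSingleThreadedContext>Always</providesSingleThreadedContext>

<!-- Hypothetical windowed operator: single threaded unless a time-based
     eviction or trigger policy is specified on the invocation -->
<providesSingleThreadedContext>WindowBound</providesSingleThreadedContext>
```

An operator model contains only one of these elements; the two are shown together purely for comparison.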

Implementation note: The providesSingleThreadedContext element is used to enable the Teracloud® Streams instance to avoid unnecessary thread synchronization. Setting it to the value Never is safe for all operators, but it prevents optimizations that reduce synchronization processing when the operator does provide a single threaded context. Specifying a value other than Never that is inconsistent with the threading semantics implemented by the operator results in undefined behavior.

The incrementalCompilationStrategy element specifies how the compiler manages incremental compilation of operators. The two choices are:
  • SourceDependent: In this mode, the compiler regenerates the operator source only if it is out of date with the SPL source or the code generator for that operator. This mode is the default.
  • ResultDependent: In this mode, the compiler always generates the operator source, but updates the source files only if they differ from what existed before the compile. Use this mode if the operator code generator relies on external configurations that are not captured by the parameterization that is given in the SPL source.

The optional allowCustomLogic element specifies whether the use of an operator is allowed to have a logic clause that specifies either state, onTuple or onPunct processing. When set to false, no logic clause can be specified for the operator. The default, in the absence of this element, is true.
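
A minimal sketch of how these two settings might appear inside the context element, assuming an operator whose code generator reads external configuration and that does not accept a logic clause. The values are illustrative; both elements are optional and, per the schema in Listing 4, follow providesSingleThreadedContext.

```xml
<!-- Illustrative values only: regenerate on every compile, but touch the
     source files only when the generated output actually changes -->
<incrementalCompilationStrategy>ResultDependent</incrementalCompilationStrategy>

<!-- Disallow state, onTuple, and onPunct logic on invocations of this operator -->
<allowCustomLogic>false</allowCustomLogic>
```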

The optional codeTemplates element specifies one or more code templates for the operator. Each codeTemplate element has a name attribute that names it, a description element that describes it, and a template element, which is a string that contains the boilerplate code for the template. Parts of the code in the form ${name} mark the pieces that must be customized by the user. One example for the Barrier operator is as follows:

<codeTemplates>
  <codeTemplate name="Barrier">
    <description>Basic Barrier template</description>
    <template>
      <![CDATA[ 
        stream<${schema}> ${outputStream} = Barrier(${inputStream1};${inputStream2}) 
        {
          param
            ${parameter}: ${parameterExpression};
          output
            ${outputStream}: ${outputExpression};
          ${cursor} 
        }
      ]]>
    </template>
  </codeTemplate>
</codeTemplates>

The splExpressionTree element controls the generation of SPL expression trees for use in generic C++ primitive operators. The valid attributes are cppCode, param, and output. If the param attribute is set to true, SPL expression trees are generated for parameters. If the output attribute is set to true, SPL expression trees are generated for output. If the cppCode attribute is set to true, each node in the generated operator instance XML is enhanced with C++ code using templates. This C++ code can be used to generate the C++ code for an SPL expression. For example, for the SPL code:

param predicates : {a = "a" == In.letter, b = "b" == In.letter};

The generated SPL expression tree includes:

<expressionTree cppCode="SPL::BeJwrMUoyTEwyTAIAC7UCCQ({attr:0}, {attr:1})">
  <literal cppCode="SPL::BeJwrMUoyTEwyTAIAC7UCCQ({attr:0}, {attr:1})" type="1">
    <tuple count="2" cppCode="SPL::BeJwrMUoyTEwyTAIAC7UCCQ({attr:0}, {attr:1})" type="1">
      <attr id="a">
        <value cppCode="({Lhs} == {Rhs})" type="2">
          <expn cppCode="({Lhs} == {Rhs})">
            <binary cppCode="({Lhs} == {Rhs})" op="==" type="2">
              <lhs cppCode="SPL::rstring(&quot;a&quot;)">
                <literal cppCode="SPL::rstring(&quot;a&quot;)" type="0">"a"</literal>
              </lhs>
              <rhs cppCode="iport$0.get_letter()">
                <attribute attribute="letter" cppCode="iport$0.get_letter()" type="0">
                  <lhs cppCode="iport$0">
                    <stream cppCode="iport$0" name="In" port="0" type="3"/>
                  </lhs>
                </attribute>
              </rhs>
            </binary>
          </expn>
        </value>
      </attr>

The templates (for example, {Lhs} and {attr:0}) are used to ensure that code replacement is well-defined.

These expressions represent the SPL expression, but are available in a form that can easily be walked. Perl objects are derived from SPL::Operator::Instance::ExpressionTree, and have a kind, type, and methods to access the fields of the expression. ExpressionTreeVisitor is a visitor pattern that is provided to allow easy walking of the expression tree.
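
As a sketch of the element itself, the fragment below assumes an operator that wants expression trees for its parameters only, with C++ code attached to each node. The choice of values is illustrative; all three attributes are optional booleans according to splExpressionTreeType.

```xml
<!-- Illustrative settings: expression trees for parameters, none for output,
     and C++ code templates attached to each generated node -->
<splExpressionTree param="true" output="false" cppCode="true"/>
```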

The optional capability element defines special privileges for the operator. Teracloud® Streams supports the Linux capabilities model via the capability element. You can include any number of strings to specify the exact privileges your operator requires. For example, <capability>CAP_NET_RAW+eip</capability> indicates that the operator needs permission to access raw sockets. Note that the Teracloud® Streams instance must be configured to allow PE processes to run with special operating system capabilities.
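
Because the schema allows any number of capability elements, an operator that needs more than one privilege lists each in its own element. In the sketch below, the first entry is the one named above; the second is an assumed additional requirement (binding to privileged ports), included only to show repetition.

```xml
<!-- Raw socket access, as in the example above -->
<capability>CAP_NET_RAW+eip</capability>
<!-- Assumed extra requirement for illustration: bind to ports below 1024 -->
<capability>CAP_NET_BIND_SERVICE+eip</capability>
```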

The optional verificationModule attribute is the path name of a Perl file (without the .pm suffix). This attribute can be used to check parameters, inputs, and outputs for an operator invocation. The path is relative to the operator model directory. The name is used in a Perl driver program, and the Perl verify routine is called with the Operator Instance Model as the parameter. Simplification of parameters and other expressions is done beforehand. No replacement of getSubmissionTimeValue or lit$X is done.

The verification is performed for each operator in the SPL program that has verificationModule set in the associated operator model. errorln and warnln can be used from within verify($model). If errorln is called, compilation of the SPL program will stop after all the operators are verified.

verificationModule can be used to perform compile time checking for all invocations, even if rewriteAllowed is true for a parameter, and similar operator invocations are merged into one at compile time.
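
A minimal sketch of a context element that enables verification. The module name MyOpVerify is hypothetical; under the rules above it would refer to a file MyOpVerify.pm in the operator model directory that provides the verify routine.

```xml
<!-- MyOpVerify is a hypothetical module name, given without the .pm suffix
     and resolved relative to the operator model directory -->
<context verificationModule="MyOpVerify">
  <description>Operator whose invocations are checked at compile time</description>
  <providesSingleThreadedContext>Always</providesSingleThreadedContext>
</context>
```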

1 Future releases might pass additional arguments and thus the command should be tolerant of them.