streamsx.spl.spl
SPL Python primitive operators.
Overview
SPL primitive operators that call a Python function or class methods are created by decorators provided by this module.
The name of the function or callable class becomes the name of the operator.
A decorated function is a stateless operator while a decorated class is an optionally stateful operator.
These are the supported decorators that create an SPL operator:
@spl.source
- Creates a source operator that produces tuples.
@spl.filter
- Creates a operator that filters tuples.
@spl.map
- Creates a operator that maps input tuples to output tuples.
@spl.for_each
- Creates a operator that terminates a stream processing each tuple.
@spl.primitive_operator
- Creates an SPL primitive operator that has an arbitrary number of input and output ports.
Decorated functions and classes must be located in the directory
opt/python/streams
in the SPL toolkit. Each module in that directory
will be inspected for operators during extraction. Each module defines
the SPL namespace for its operators by the function spl_namespace
,
for example:
from streamsx.spl import spl
def spl_namespace():
return 'com.example.ops'
@spl.map()
def Pass(*tuple_):
return tuple_
creates a pass-through operator com.example.ops::Pass
.
SPL primitive operators are created by executing the extraction script spl-python-extract against the SPL toolkit. Once created the operators become part of the toolkit and may be used like any other SPL operator.
Python classes as SPL operators
Overview
Decorating a Python class creates a stateful SPL operator where the instance fields of the class are the operator’s state. An instance of the class is created when the SPL operator invocation is initialized at SPL runtime. The instance of the Python class is private to the SPL operator and is maintained for the lifetime of the operator.
If the class has instance fields then they are the state of the operator and are private to each invocation of the operator.
If the __init__ method has parameters beyond the first self parameter then they are mapped to operator parameters. Any parameter that has a default value becomes an optional parameter to the SPL operator. Parameters of the form *args and **kwargs are not supported.
Warning
Parameter names must be valid SPL identifers, SPL identifiers start with an ASCII letter or underscore, followed by ASCII letters, digits, or underscores. The name also must not be an SPL keyword.
Parameter names suppress
and include
are reserved.
The value of the operator parameters at SPL operator invocation are passed to the __init__ method. This is equivalent to creating an instance of the class passing the operator parameters into the constructor.
For example, with this decorated class producing an SPL source operator:
@spl.source()
class Range(object):
def __init__(self, stop, start=0):
self.start = start
self.stop = stop
def __iter__(self):
return zip(range(self.start, self.stop))
The SPL operator Range has two parameters, stop is mandatory and start is optional, defaulting to zero. Thus the SPL operator may be invoked as:
// Produces the sequence of values from 0 to 99
//
// Creates an instance of the Python class
// Range using Range(100)
//
stream<int32 seq> R = Range() {
param
stop: 100;
}
or both operator parameters can be set:
// Produces the sequence of values from 50 to 74
//
// Creates an instance of the Python class
// Range using Range(75, 50)
//
stream<int32 seq> R = Range() {
param
start: 50;
stop: 75;
}
Operator state
Use of a class allows the operator to be stateful by maintaining state in instance attributes across invocations (tuple processing).
When the operator is in a consistent region or checkpointing then it is serialized using dill. The default serialization may be modified by using the standard Python pickle mechanism of __getstate__
and __setstate__
. This is required if the state includes objects that cannot be serialized, for example file descriptors. For details see See https://docs.python.org/3.5/library/pickle.html#handling-stateful-objects .
If the class has __enter__
and __exit__
context manager methods then __enter__
is called after the instance has been deserialized by dill. Thus __enter__
is used to recreate runtime objects that cannot be serialized such as open files or sockets.
Operator initialization & shutdown
Execution of an instance for an operator effectively run in a context manager so that an instance’s __enter__
method is called when the processing element containing the operator is initialized
and its __exit__
method called when the processing element is stopped. To take advantage of this
the class must define both __enter__
and __exit__
methods.
Note
Initialization such as opening files should be in __enter__
in order to support stateful operator restart & checkpointing.
Example of using __enter__
and __exit__
to open and close a file:
import streamsx.ec as ec
@spl.map()
class Sentiment(object):
def __init__(self, name):
self.name = name
self.file = None
def __enter__(self):
self.file = open(self.name, 'r')
def __exit__(self, exc_type, exc_value, traceback):
if self.file is not None:
self.file.close()
def __call__(self):
pass
When an instance defines a valid __exit__
method then it will be called with an exception when:
the instance raises an exception during processing of a tuple
a data conversion exception is raised converting a Python value to an SPL tuple or attribute
If __exit__
returns a true value then the exception is suppressed and processing continues, otherwise the enclosing processing element will be terminated.
Application log and trace
Streams provides application trace and log services which are accesible through standard Python loggers from the logging module.
Python functions as SPL operators
Decorating a Python function creates a stateless SPL operator. In SPL terms this is similar to an SPL Custom operator, where the code in the Python function is the custom code. For operators with input ports the function is called for each input tuple, passing a Python representation of the SPL input tuple. For an SPL source operator the function is called to obtain an iterable whose contents will be submitted to the output stream as SPL tuples.
Operator parameters are not supported.
An example SPL sink operator that prints each input SPL tuple after its conversion to a Python tuple:
@spl.for_each()
def PrintTuple(*tuple_):
"Print each tuple to standard out."
print(tuple_, flush=True)
Processing SPL tuples in Python
SPL tuples are converted to Python objects and passed to a decorated callable.
Overview
For each SPL tuple arriving at an input port a Python function is called with the SPL tuple converted to Python values suitable for the function call. How the tuple is passed is defined by the tuple passing style.
Tuple Passing Styles
- An input tuple can be passed to Python function using a number of different styles:
dictionary
tuple
attributes by name not yet implemented
attributes by position
Dictionary
Passing the SPL tuple as a Python dictionary is flexible
and makes the operator independent of any schema.
A disadvantage is the reduction in code readability
for Python function by not having formal parameters,
though getters such as tuple['id']
mitigate that to some extent.
If the function is general purpose and can derive meaning
from the keys that are the attribute names then **kwargs
can be useful.
When the only function parameter is **kwargs
(e.g. def myfunc(**tuple_):
) then the passing style is dictionary.
All of the attributes are passed in the dictionary using the SPL schema attribute name as the key.
Tuple
Passing the SPL tuple as a Python tuple is flexible
and makes the operator independent of any schema
but is brittle to changes in the SPL schema.
Another disadvantage is the reduction in code readability
for Python function by not having formal parameters.
However if the function is general purpose and independent
of the tuple contents *args
can be useful.
When the only function parameter is *args
(e.g. def myfunc(*tuple_):
) then the passing style is tuple.
All of the attributes are passed as a Python tuple with the order of values matching the order of the SPL schema.
Attributes by name
(not yet implemented)
Passing attributes by name can be robust against changes in the SPL scheme, e.g. additional attributes being added in the middle of the schema, but does require that the SPL schema has matching attribute names.
When attributes by name is used then SPL tuple attributes
are passed to the function by name for formal parameters.
Order of the attributes and parameters need not match.
This is supported for function parameters of
kind POSITIONAL_OR_KEYWORD
and KEYWORD_ONLY
.
If the function signature also contains a parameter of the form
**kwargs
(VAR_KEYWORD
) then any attributes not bound to
formal parameters are passed in its dictionary using the
SPL schema attribute name as the key.
If the function signature also contains an arbitrary argument
list *args
then any attributes not bound to formal parameters
or to **kwargs
are passed in order of the SPL schema.
If there are only formal parameters any non-bound attributes are not passed into the function.
Attributes by position
Passing attributes by position allows the SPL operator to be independent of the SPL schema but is brittle to changes in the SPL schema. For example a function expecting an identifier and a sensor reading as the first two attributes would break if an attribute representing region was added as the first SPL attribute.
When attributes by position is used then SPL tuple attributes are passed to the function by position for formal parameters. The first SPL attribute in the tuple is passed as the first parameter. This is supported for function parameters of kind POSITIONAL_OR_KEYWORD.
If the function signature also contains an arbitrary argument list *args (VAR_POSITIONAL) then any attributes not bound to formal parameters are passed in order of the SPL schema.
The function signature must not contain a parameter of the form
**kwargs
(VAR_KEYWORD).
If there are only formal parameters any non-bound attributes are not passed into the function.
The SPL schema must have at least the number of positional arguments the function requires.
Selecting the style
For signatures only containing a parameter of the form
*args
or **kwargs
the style is implicitly defined:
def f(**tuple_)
- dictionary -tuple_
will contain a dictionary of all of the SPL tuple attribute’s values with the keys being the attribute names.
def f(*tuple_)
- tuple -tuple_
will contain all of the SPL tuple attribute’s values in order of the SPL schema definition.
Otherwise the style is set by the style
parameter to the decorator,
defaulting to attributes by name. The style value can be set to:
'name'
- attributes by name (the default)
'position'
- attributes by position
Examples
These examples show how an SPL tuple with the schema and value:
tuple<rstring id, float64 temp, boolean increase>
{id='battery', temp=23.7, increase=true}
is passed into a variety of functions by showing the effective Python call and the resulting values of the function’s parameters.
Dictionary consuming all attributes by **kwargs
:
@spl.map()
def f(**tuple_)
pass
# f({'id':'battery', 'temp':23.7, 'increase': True})
# tuple_={'id':'battery', 'temp':23.7, 'increase':True}
Tuple consuming all attributes by *args
:
@spl.map()
def f(*tuple_)
pass
# f('battery', 23.7, True)
# tuple_=('battery',23.7, True)
Attributes by name consuming all attributes:
@spl.map()
def f(id, temp, increase)
pass
# f(id='battery', temp=23.7, increase=True)
# id='battery'
# temp=23.7
# increase=True
Attributes by name consuming a subset of attributes:
@spl.map()
def f(id, temp)
pass
# f(id='battery', temp=23.7)
# id='battery'
# temp=23.7
Attributes by name consuming a subset of attributes in a different order:
@spl.map()
def f(increase, temp)
pass
# f(temp=23.7, increase=True)
# increase=True
# temp=23.7
Attributes by name consuming id by name and remaining attributes by **kwargs
:
@spl.map()
def f(id, **tuple_)
pass
# f(id='battery', {'temp':23.7, 'increase':True})
# id='battery'
# tuple_={'temp':23.7, 'increase':True}
Attributes by name consuming id by name and remaining attributes by *args
:
@spl.map()
def f(id, *tuple_)
pass
# f(id='battery', 23.7, True)
# id='battery'
# tuple_=(23.7, True)
Attributes by position consuming all attributes:
@spl.map(style='position')
def f(key, value, up)
pass
# f('battery', 23.7, True)
# key='battery'
# value=23.7
# up=True
Attributes by position consuming a subset of attributes:
@spl.map(style='position')
def f(a, b)
pass
# f('battery', 23.7)
# a='battery'
# b=23.7
Attributes by position consuming id by position and remaining attributes by *args
:
@spl.map(style='position')
def f(key, *tuple_)
pass
# f('battery', 23.7, True)
# key='battery'
# tuple_=(23.7, True)
In all cases the SPL tuple must be able to provide all parameters required by the function. If the SPL schema is insufficient then an error will result, typically an SPL compile time error.
The SPL schema can provide a subset of the formal parameters if the remaining attributes are optional (having a default).
Attributes by name consuming a subset of attributes with an optional parameter not matched by the schema:
@spl.map()
def f(id, temp, pressure=None)
pass
# f(id='battery', temp=23.7)
# id='battery'
# temp=23.7
# pressure=None
Submission of SPL tuples from Python
The return from a decorated callable results in submission of SPL tuples on the associated outut port.
- A Python function must return:
None
a Python tuple
a Python dictionary
a list containing any of the above.
None
When None
is return then no tuple will be submitted to
the operator output port.
Python tuple
When a Python tuple is returned it is converted to an SPL tuple and submitted to the output port.
The values of a Python tuple are assigned to an output SPL tuple by position, so the first value in the Python tuple is assigned to the first attribute in the SPL tuple:
# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a,b):
return (a,b,a+b)
# The SPL output will be:
# All values explictly set by returned Python tuple
# based on the x,y values from the input tuple
# x is set to: x
# y is set to: y
# z is set to: x+y
The returned tuple may be sparse, any attribute value in the tuple
that is None
will be set to their SPL default or copied from
a matching attribute in the input tuple
(same name and type,
or same name and same type as the underlying type of an output attribute
with an optional type),
depending on the operator kind:
# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a,b):
return (a,None,a+b)
# The SPL output will be:
# x is set to: x (explictly set by returned Python tuple)
# y is set to: y (set by matching input SPL attribute)
# z is set to: x+y
When a returned tuple has fewer values than attributes in the SPL output schema the attributes not set by the Python function will be set to their SPL default or copied from a matching attribute in the input tuple (same name and type, or same name and same type as the underlying type of an output attribute with an optional type), depending on the operator kind:
# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a,b):
return a,
# The SPL output will be:
# x is set to: x (explictly set by returned Python tuple)
# y is set to: y (set by matching input SPL attribute)
# z is set to: 0 (default int32 value)
When a returned tuple has more values than attributes in the SPL output schema then the additional values are ignored:
# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a,b):
return (a,b,a+b,a/b)
# The SPL output will be:
# All values explictly set by returned Python tuple
# based on the x,y values from the input tuple
# x is set to: x
# y is set to: y
# z is set to: x+y
#
# The fourth value in the tuple a/b = x/y is ignored.
Python dictionary
A Python dictionary is converted to an SPL tuple for submission to
the associated output port. An SPL attribute is set from the
dictionary if the dictionary contains a key equal to the attribute
name. The value is used to set the attribute, unless the value is
None
.
If the value in the dictionary is None
, or no matching key exists,
then the attribute value is set to its SPL default or copied from
a matching attribute in the input tuple (same name and type,
or same name and same type as the underlying type of an output attribute
with an optional type), depending on the operator kind.
Any keys in the dictionary that do not map to SPL attribute names are ignored.
Python list
When a list is returned, each value is converted to an SPL tuple and submitted to the output port, in order of the list starting with the first element (position 0). If the list contains None at an index then no SPL tuple is submitted for that index.
The list must only contain Python tuples, dictionaries or None. The list can contain a mix of valid values.
The list may be empty resulting in no tuples being submitted.
Module contents
Functions
|
Is a module being loaded by |
|
Decorator to ignore a Python function. |
Classes
|
Primitive operator super class. |
|
Decorator that creates a filter SPL operator from a callable class or function. |
|
Creates an SPL operator with a single input port. |
|
Declare an input port and its processor method. |
|
Decorator to create a map SPL operator from a callable class or function. |
|
Creates an SPL primitive operator with an arbitrary number of input ports and output ports. |
|
Create a source SPL operator from an iterable. |