Python operator lifecycle
Python operators have a runtime lifecycle that starts when an instance of the operator is initialized and ends when the processing element (PE) that hosts the operator is shut down.
Operators that are implemented with the Python Operator API are subclasses of streams.Operator and must have a two-argument constructor taking ctx and params. They must also provide the following methods: allPortsReady, processTuple, and processPunctuation.
Example of a minimal operator that forwards tuples and punctuations:
class Operator(streams.Operator):
    def __init__(self, ctx, params):
        # This must be done to ensure that the operator is initialized properly
        super().__init__(ctx)

    def __del__(self):
        # Write any cleanup code here
        pass

    def allPortsReady(self):
        pass

    def processTuple(self, stream_tuple, port):
        self.submitTuple(stream_tuple, port)

    def processPunctuation(self, punctuation, port):
        self.submitPunctuation(punctuation, port)
An operator instance starts its runtime lifecycle with a call to its __init__ method. The operator cannot receive or send tuples and punctuation until its ports are ready. When the operator ports are ready, the operator starts receiving calls to its tuple and punctuation processing methods and can submit tuples accordingly. The allPortsReady method is called as a notification to the operator that its ports are ready to receive and submit tuples. When the processing element (PE) that hosts the operator is being shut down, the operator receives a call to its __del__ method.
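The call order described above can be traced outside the Streams runtime with a small sketch. Everything here other than the method names from the text is an assumption: StubOperator is a hypothetical stand-in for streams.Operator, run_lifecycle is a hypothetical driver, and the string "window" is only a placeholder for a punctuation value. The driver calls allPortsReady before the first tuple for simplicity, and it leaves out the __del__ call that the runtime makes at shutdown.

```python
# Hypothetical stand-in for streams.Operator so the sketch runs without the runtime.
class StubOperator:
    def __init__(self, ctx):
        self.ctx = ctx
        self.submitted = []  # records everything the operator sends

    def submitTuple(self, stream_tuple, port):
        self.submitted.append(("tuple", stream_tuple, port))

    def submitPunctuation(self, punctuation, port):
        self.submitted.append(("punctuation", punctuation, port))


class TracingOperator(StubOperator):
    def __init__(self, ctx, params):
        super().__init__(ctx)
        self.events = ["__init__"]  # lifecycle calls, in the order received

    def allPortsReady(self):
        self.events.append("allPortsReady")

    def processTuple(self, stream_tuple, port):
        self.events.append("processTuple")
        self.submitTuple(stream_tuple, port)

    def processPunctuation(self, punctuation, port):
        self.events.append("processPunctuation")
        self.submitPunctuation(punctuation, port)


def run_lifecycle(op_class, tuples):
    """Hypothetical driver that mimics the runtime's calls on port 0."""
    op = op_class(ctx=None, params={})
    op.allPortsReady()
    for t in tuples:
        op.processTuple(t, 0)
    op.processPunctuation("window", 0)  # placeholder punctuation value
    return op


op = run_lifecycle(TracingOperator, [{"a": 1}, {"a": 2}])
print(op.events)
```

The events list records each lifecycle call as it arrives, which makes it easy to confirm that an operator forwards every tuple and punctuation it receives.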
All incoming tuples are sent to the operator through the processTuple method. SPL tuples are converted to Python dicts, with values mapped to native Python types. Outgoing tuples are converted from Python dicts to SPL tuples based on the tuple spec in the SPL application.
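Because outgoing dicts are converted according to the tuple spec, it can help to check a dict against the expected attributes before submitting it. The sketch below is not part of the Python Operator API: OUTPUT_SPEC and check_outgoing are hypothetical names, and the pairing of SPL int32 with Python int and SPL rstring with Python str is an assumption made for illustration. How the runtime itself treats missing or mistyped attributes is not stated here.

```python
# Hand-written description of a hypothetical output stream such as
# tuple<int32 res, rstring label>; the SPL-to-Python type pairing is assumed.
OUTPUT_SPEC = {"res": int, "label": str}


def check_outgoing(tuple_dict, spec):
    """Return a list of problems found in an outgoing tuple dict."""
    problems = []
    for name, expected in spec.items():
        if name not in tuple_dict:
            problems.append(f"missing attribute: {name}")
        elif not isinstance(tuple_dict[name], expected):
            actual = type(tuple_dict[name]).__name__
            problems.append(f"{name}: expected {expected.__name__}, got {actual}")
    for name in tuple_dict:
        if name not in spec:
            problems.append(f"unexpected attribute: {name}")
    return problems


good = {"res": 3, "label": "sum"}
bad = {"res": "3", "extra": 1}
print(check_outgoing(good, OUTPUT_SPEC))
print(check_outgoing(bad, OUTPUT_SPEC))
```

A check like this could be called inside processTuple just before submitTuple during development, then removed once the output schema is stable.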
Here is an example of a processTuple method in an operator that shows how to read from incoming tuples and submit an outgoing tuple.
def processTuple(self, stream_tuple, port):
    self.submitTuple({ "res": stream_tuple["a"] + stream_tuple["b"] })
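One way to make this kind of logic testable without a running PE is to keep the computation in a plain function and let processTuple only delegate to it. The sketch below assumes a hypothetical StubBase in place of streams.Operator, plus the illustrative names build_result and Adder. It passes the port explicitly, following the two-argument submitTuple call in the minimal operator example.

```python
def build_result(stream_tuple):
    """Compute the outgoing tuple dict from an incoming tuple dict."""
    return {"res": stream_tuple["a"] + stream_tuple["b"]}


class StubBase:
    """Hypothetical stand-in for streams.Operator; records submissions."""

    def __init__(self, ctx):
        self.sent = []

    def submitTuple(self, stream_tuple, port):
        self.sent.append((stream_tuple, port))


class Adder(StubBase):
    def __init__(self, ctx, params):
        super().__init__(ctx)

    def processTuple(self, stream_tuple, port):
        # Read attributes "a" and "b" from the incoming dict, submit their sum.
        self.submitTuple(build_result(stream_tuple), port)


adder = Adder(ctx=None, params={})
adder.processTuple({"a": 2, "b": 3}, 0)
print(adder.sent)
```

Keeping build_result free of any operator state means it can be covered by ordinary unit tests, while the operator class stays a thin adapter.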