Python operator lifecycle
Python operators have a runtime lifecycle that starts when an instance of the operator is initialized and ends when the processing element (PE) that hosts the operator is shut down.
Operators that are implemented with the Python Operator API are subclasses of
streams.Operator and must have a two-argument constructor that takes
ctx and params. They must also provide the
following methods: allPortsReady, processTuple, and
processPunctuation.
Example of a minimal operator that forwards tuples and punctuations:
class Operator(streams.Operator):
    def __init__(self, ctx, params):
        # This must be done to ensure that the operator is initialized properly
        super().__init__(ctx)

    def __del__(self):
        # Write any cleanup code here
        pass

    def allPortsReady(self):
        pass

    def processTuple(self, stream_tuple, port):
        self.submitTuple(stream_tuple, port)

    def processPunctuation(self, punctuation, port):
        self.submitPunctuation(punctuation, port)
An operator instance starts its runtime lifecycle with a call to its
__init__ method. The operator cannot receive or send tuples and
punctuation until its ports are ready. When the operator ports are ready, the operator
starts receiving calls to its tuple and punctuation processing methods and can submit
tuples accordingly. The allPortsReady method is called as a
notification to the operator that its ports are ready to receive and submit tuples. When
the processing element (PE) that hosts the operator is being shut down, the operator
receives a call to its __del__ method.
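The call order described above can be traced outside a running PE with a small stand-in. The following is only a sketch: StubOperator is a hypothetical replacement for streams.Operator that records submissions, run_lifecycle is a hypothetical driver that plays the role of the runtime, and the punctuation value "window" is a placeholder. Shutdown and the __del__ call are not simulated.

```python
class StubOperator:
    """Hypothetical stand-in for streams.Operator so the sketch runs without a PE."""

    def __init__(self, ctx):
        self.ctx = ctx
        self.submitted = []  # records everything the operator submits

    def submitTuple(self, stream_tuple, port):
        self.submitted.append(("tuple", stream_tuple, port))

    def submitPunctuation(self, punctuation, port):
        self.submitted.append(("punct", punctuation, port))


class TracingOperator(StubOperator):
    """Forwards tuples and punctuations and logs each lifecycle call."""

    def __init__(self, ctx, params):
        super().__init__(ctx)
        self.events = ["__init__"]

    def allPortsReady(self):
        self.events.append("allPortsReady")

    def processTuple(self, stream_tuple, port):
        self.events.append("processTuple")
        self.submitTuple(stream_tuple, port)

    def processPunctuation(self, punctuation, port):
        self.events.append("processPunctuation")
        self.submitPunctuation(punctuation, port)


def run_lifecycle():
    """Hypothetical driver: calls the methods in the order the text describes."""
    op = TracingOperator(ctx=None, params={})
    op.allPortsReady()                  # ports are ready
    op.processTuple({"a": 1}, 0)        # one incoming tuple on port 0
    op.processPunctuation("window", 0)  # placeholder punctuation value
    return op.events, op.submitted


events, submitted = run_lifecycle()
print(events)
```

Recording events in a list keeps the sketch independent of the real runtime, which is what actually decides when each method is called.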
All incoming tuples are sent to the operator through the processTuple
method. SPL tuples are converted to Python dicts, and their values are mapped to native Python types.
Outgoing tuples are converted from Python dicts to SPL tuples based on the tuple spec in the SPL application.
Here is an example of a processTuple method in an operator that shows
how to read from incoming tuples and submit an outgoing tuple.
def processTuple(self, stream_tuple, port):
    self.submitTuple({"res": stream_tuple["a"] + stream_tuple["b"]})
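To try the processTuple example above without a Streams runtime, the method can be exercised against a stand-in base class. This is a sketch under stated assumptions: StubOperator and AddOperator are hypothetical names, the stub simply collects submitted dicts, and its port argument is optional only because the example above calls submitTuple with a single argument. No SPL conversion takes place here; the incoming tuple is already a plain dict.

```python
class StubOperator:
    """Hypothetical stand-in for streams.Operator; collects submitted tuples."""

    def __init__(self, ctx):
        self.ctx = ctx
        self.submitted = []

    def submitTuple(self, stream_tuple, port=None):
        # Assumption: port is optional, mirroring the one-argument call below.
        self.submitted.append(stream_tuple)


class AddOperator(StubOperator):
    """Reads attributes a and b from each tuple and submits their sum as res."""

    def __init__(self, ctx, params):
        super().__init__(ctx)

    def processTuple(self, stream_tuple, port):
        self.submitTuple({"res": stream_tuple["a"] + stream_tuple["b"]})


op = AddOperator(ctx=None, params={})
op.processTuple({"a": 2, "b": 3}, 0)
print(op.submitted)  # [{'res': 5}]
```

In a real application, the keys of the submitted dict would have to match the attributes of the output stream defined in the SPL application.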