Python operator lifecycle

Python operators have a runtime lifecycle that starts when an instance of the operator is initialized and ends when the processing element (PE) that hosts the operator is shut down.

Operators that are implemented with the Python Operator API are subclasses of streams.Operator and must have a two-argument constructor that takes ctx and params. They must also provide the following methods: allPortsReady, processTuple, and processPunctuation.

Example of a minimal operator that forwards tuples and punctuations:

class Operator(streams.Operator):
    def __init__(self, ctx, params):
        # Call the base constructor so that the operator is initialized properly
        super().__init__(ctx)

    def __del__(self):
        # Write any cleanup code here
        pass

    def allPortsReady(self):
        pass

    def processTuple(self, stream_tuple, port):
        self.submitTuple(stream_tuple, port)
    
    def processPunctuation(self, punctuation, port):
        self.submitPunctuation(punctuation, port)

An operator instance starts its runtime lifecycle with a call to its __init__ method. The operator cannot receive or send tuples and punctuation until its ports are ready. When the operator ports are ready, the operator starts receiving calls to its tuple and punctuation processing methods and can submit tuples accordingly. The allPortsReady method is called as a notification to the operator that its ports are ready to receive and submit tuples. When the processing element (PE) that hosts the operator is being shut down, the operator receives a call to its __del__ method.
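The call order described above can be traced with a small stand-alone sketch. FakeOperatorBase, TracingOperator, and run_lifecycle are hypothetical names invented for illustration so that the code runs outside a Streams PE; they are not part of the Python Operator API. The sketch also assumes that params behaves like a dict and uses a plain string as a placeholder for a punctuation value. In a real application the runtime makes these calls itself.

```python
class FakeOperatorBase:
    """Hypothetical stand-in for streams.Operator so the sketch runs anywhere."""

    def __init__(self, ctx):
        self.ctx = ctx

    def submitTuple(self, stream_tuple, port):
        pass  # the real method hands the tuple to the Streams runtime

    def submitPunctuation(self, punctuation, port):
        pass  # likewise for punctuation


class TracingOperator(FakeOperatorBase):
    def __init__(self, ctx, params):
        super().__init__(ctx)
        # Assumption: params is dict-like; "log" is a made-up parameter name.
        self.log = params["log"]
        self.log.append("__init__")

    def __del__(self):
        self.log.append("__del__")

    def allPortsReady(self):
        self.log.append("allPortsReady")

    def processTuple(self, stream_tuple, port):
        self.log.append("processTuple")
        self.submitTuple(stream_tuple, port)

    def processPunctuation(self, punctuation, port):
        self.log.append("processPunctuation")
        self.submitPunctuation(punctuation, port)


def run_lifecycle():
    """Drive the operator by hand in the order the text describes."""
    log = []
    op = TracingOperator(ctx=None, params={"log": log})
    op.allPortsReady()
    op.processTuple({"a": 1}, 0)
    op.processPunctuation("window-marker", 0)  # placeholder punctuation value
    del op  # dropping the last reference stands in for PE shutdown
    return log


print(run_lifecycle())
```

On CPython the returned list ends with the __del__ entry because the finalizer runs as soon as the last reference to the operator is dropped. Other Python implementations may defer that call.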

All incoming tuples are sent to the operator through the processTuple method. SPL tuples are converted to Python dicts, and their attribute values are mapped to native Python types.

Outgoing tuples are converted from Python dicts to SPL tuples based on the tuple spec in the SPL application.

Here is an example of a processTuple method that shows how to read attributes from an incoming tuple and submit an outgoing tuple.

    def processTuple(self, stream_tuple, port):
        # Read attributes a and b from the incoming dict and submit their sum as res
        self.submitTuple({ "res": stream_tuple["a"] + stream_tuple["b"] }, port)
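One way to try out tuple-handling logic like this without a running Streams instance is to substitute a stub base class that records what the operator submits. RecordingBase and Adder are hypothetical names used only for this sketch. Passing port to submitTuple follows the minimal operator example earlier in this topic, and the attribute names a, b, and res come from the example above.

```python
class RecordingBase:
    """Hypothetical stub that replaces streams.Operator for local testing."""

    def __init__(self, ctx):
        self.submitted = []

    def submitTuple(self, stream_tuple, port):
        # Record the outgoing dict instead of handing it to the runtime.
        self.submitted.append((stream_tuple, port))


class Adder(RecordingBase):
    def __init__(self, ctx, params):
        super().__init__(ctx)

    def processTuple(self, stream_tuple, port):
        # Incoming tuples arrive as dicts keyed by attribute name.
        total = stream_tuple["a"] + stream_tuple["b"]
        # Outgoing tuples are plain dicts as well.
        self.submitTuple({"res": total}, port)


op = Adder(ctx=None, params={})
op.processTuple({"a": 2, "b": 3}, 0)
print(op.submitted)  # [({'res': 5}, 0)]
```

Because the stub only records dicts, it does not check the result against the output tuple spec of the SPL application. That conversion still happens only in the real runtime.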