Implementing an operator using the Python Operator API
An operator processes incoming tuples and submits outgoing tuples. The relationship between an input tuple and the submitted tuples varies according to the function of the operator.
For example, a filtering operator can submit a subset of the incoming tuples without modifying individual tuples. Another operator might submit a tuple to an output port that is derived from an input tuple but not a direct copy, such as enriching it or taking a subset of the input tuple's attributes.
Filtering Operator
import streams
class Operator(streams.Operator):
def __init__(self, ctx, params):
super().__init__(ctx)
def __del__(self):
pass
def allPortsReady(self):
pass
def processTuple(self, stream_tuple, port):
if (stream_tuple["reading"] > 0.8):
self.submitTuple(stream_tuple, port)
def processPunctuation(self, punctuation, port):
self.submitPunctuation(punctuation, port)
Multi threaded example of using an external library (Ollama)
# This is a sample Python operator that uses the Ollama library to interact with an LLM service
#
# It expects a tuple with the following attributes:
# - messages: a JSON string with a list of messages as per Ollama's API (rstring)
#
# It emits the following tuple format when the LLM replies
# - reply: the reply from the LLM (rstring)
#
# This file is placed in the impl/bin directory of the application and packaged in the sab by
# the compiler. It is then executed by the SPL runtime when the operator is invoked
# as specified in the SPL file, in the 'module' parameter of the PythonOp invocation
# All Python Operators must import the streams module
import streams
# Other imports required for the functionality of your operator
import threading
import queue
import json
import os
# Import an external library
# These are currently resolved based on the system installed packages
# venvs are not supported for now
from ollama import Client
# Each operator module must contain a class named Operator that extends streams.Operator
class Operator(streams.Operator):
# The constructor takes the context object and a params dictionary
def __init__(self, ctx, params):
# The constructor of the parent class must be called with the context object
super().__init__(ctx)
# A part of the C++ Streams api is available and mapped to python
self.data_directory = self.getPE().getDataDirectory()
self.tmp_directory = os.path.join(self.data_directory, "tmp")
self.llm = Client(params.get("ollama_url", "127.0.0.1")) # Defaults to localhost
self.llm_model = params.get("llm_model", "gemma2:27b")
self.llm_cv = threading.Condition()
self.pending_messages = queue.Queue()
# The allPortsReady method is called once all the input ports are connected
# It is required to provide the implementation for this method
#
# Multiple threads are supported, however only a single thread runs a time due to the Global Interpreter Lock in Python
# Still, most libraries release the GIL when doing heavy processing and thus can run in parallel
#
# Also note that care needs to be taken as the streams threading model applies to python operators
# And functions may be called from different threads and/or concurrently depending on the operator graph and placement
def allPortsReady(self):
self.createThreads(1)
# The processTuple method is called whenever a tuple arrives
# It is required to provide the implementation for this method
def processTuple(self, tuple, port):
with self.llm_cv:
self.pending_messages.put(tuple)
self.llm_cv.notify()
# The processPunctuation method is called whenever a punctuation arrives
# It is required to provide the implementation for this method
def processPunctuation(self, punctuation, port):
pass
# This is the entrypoint of threads created by createThreads
#
# This method is required only if createThreads is called
#
# Threads need to pool getShutdownRequested and exit when it returns True
# Otherwise the SPL application will not exit gracefully
def processThread(self, thread_id):
while not self.getPE().getShutdownRequested():
while not self.pending_messages.empty() and not self.getPE().getShutdownRequested():
try:
tuple = self.pending_messages.get()
# print("LLM: Sending message", message)
reply = self.llm.chat(model=self.llm_model, messages=json.loads(tuple["messages"]))["message"]["content"]
self.submitTuple({"reply": reply})
except Exception as e:
print("llm:", e)
self.submitTuple({"n": 0}, 1) # signal error
with self.llm_cv:
# Using a timed wait here to give a chance to the operator to exit
self.llm_cv.wait(1)