Implementing a source operator with the Python Operator API
A source operator fetches information or events from an external system, such as a sensor, a messaging system, or a database, and presents them as a stream.
Because a source operator has no input stream to drive it, the fetching is done asynchronously in a background thread, and each result is submitted via submitTuple.
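The background-thread pattern can be sketched without the streams module. The following is only an illustration under stated assumptions: BackgroundSource, its fetch callable, and the queue standing in for the output stream are hypothetical names and are not part of the Python Operator API.

```python
import queue
import threading


class BackgroundSource:
    """Toy stand-in for a source operator; not part of the streams API."""

    def __init__(self, fetch, interval=0.01):
        self.fetch = fetch              # callable returning one tuple (a dict)
        self.interval = interval        # seconds to wait between fetches
        self.out = queue.Queue()        # plays the role of the output stream
        self._stop = threading.Event()  # plays the role of the shutdown flag
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def shutdown(self):
        self._stop.set()
        self._thread.join()

    def _run(self):
        # Loop until shutdown is requested, submitting each fetched tuple.
        while not self._stop.is_set():
            self.out.put(self.fetch())
            self._stop.wait(self.interval)  # pause, but wake early on shutdown


def demo():
    counter = iter(range(1_000_000))
    source = BackgroundSource(lambda: {"seq": next(counter)})
    source.start()
    first = [source.out.get(timeout=1) for _ in range(3)]
    source.shutdown()
    return first
```

Calling demo() starts the thread, reads the first three submitted tuples, and shuts the thread down. Waiting on the event rather than sleeping lets a shutdown request interrupt the pause between fetches.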
Here is a simple web scraping operator that fetches the HTML contents of a website and submits a tuple with the URL and text fields.
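import streams
import requests

class Operator(streams.Operator):
    def __init__(self, ctx, params):
        super().__init__(ctx)
        # Include the scheme: requests rejects a bare host name such as "example.com".
        self.url = params.get("url", "https://example.com")

    def allPortsReady(self):
        self.createThreads(1)  # create one thread for the source operator

    def processTuple(self, tuple, port):
        pass  # this is a source operator, so there are no input tuples to process

    def processPunctuation(self, punctuation, port):
        pass

    def processThread(self, port):
        # Runs in the background thread; keep fetching until shutdown is requested.
        while not self.getPE().getShutdownRequested():
            # The timeout keeps a stalled request from blocking shutdown indefinitely.
            response = requests.get(self.url, timeout=10)
            response.raise_for_status()
            self.submitTuple({ "url": self.url, "text": response.text })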
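The loop in processThread fetches again immediately after each submit, and an exception from the request (for example from raise_for_status) ends the thread. One possible way to pace the loop and survive failed fetches is sketched below. The poll helper and its parameters are hypothetical and are not part of the Python Operator API; it only assumes three callables that fetch a tuple, submit a tuple, and report whether shutdown was requested.

```python
import time


def poll(fetch, submit, should_stop, interval=5.0, sleep=time.sleep):
    """Call fetch() repeatedly and submit each result until should_stop() is true.

    Returns the number of fetches that raised an exception.
    """
    failures = 0
    while not should_stop():
        try:
            submit(fetch())
        except Exception:
            # Skip this round and try again later; a real operator might log here.
            failures += 1
        sleep(interval)  # pause between fetches so the remote site is not flooded
    return failures
```

Inside the operator, such a helper could be called from processThread with a function that performs the request as fetch, self.submitTuple as submit, and self.getPE().getShutdownRequested as should_stop. Injecting sleep makes the loop easy to exercise in a test without real delays.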