Implementing a source operator with the Python Operator API

A source operator fetches information or events from an external system, such as a sensor, a messaging system, or a database, and presents them as a stream.

Because a source operator is not driven by incoming tuples, it fetches its data in a background thread and submits each result with submitTuple.

Here is a simple web scraping operator that fetches the HTML content of a website and submits a tuple with url and text fields.

import time

import requests
import streams

class Operator(streams.Operator):
    def __init__(self, ctx, params):
        super().__init__(ctx)
        # requests needs a full URL, including the scheme
        self.url = params.get("url", "https://example.com")

    def allPortsReady(self):
        self.createThreads(1)  # create one thread for the source operator

    def processTuple(self, tuple, port):
        pass  # this is a source operator, so there are no input tuples

    def processPunctuation(self, punctuation, port):
        pass

    def processThread(self, port):
        # Runs in the background thread until shutdown is requested
        while not self.getPE().getShutdownRequested():
            response = requests.get(self.url, timeout=10)
            response.raise_for_status()
            self.submitTuple({"url": self.url, "text": response.text})
            time.sleep(1)  # pause between fetches to avoid hammering the site
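
The streams module is only available inside the runtime, so it can help to try the same fetch-and-submit loop outside it first. The following is a minimal sketch using only the Python standard library, not part of the Operator API: run_source, fake_fetch, and the out queue are hypothetical stand-ins for processThread, the HTTP request, and submitTuple, and a threading.Event plays the role of the shutdown check.

```python
import itertools
import queue
import threading

def run_source(fetch, submit, stop, interval=0.01):
    """Call fetch() repeatedly until stop is set, passing each result to submit()."""
    while not stop.is_set():
        try:
            submit(fetch())
        except Exception as exc:
            # A failed fetch is reported but does not end the thread
            print(f"fetch failed: {exc}")
        # Event.wait works as a sleep that returns early on shutdown
        stop.wait(interval)

# Hypothetical stand-ins: a fake fetch and a queue in place of the output port
out = queue.Queue()
stop = threading.Event()
counter = itertools.count()

def fake_fetch():
    return {"url": "https://example.com", "text": f"page {next(counter)}"}

worker = threading.Thread(target=run_source, args=(fake_fetch, out.put, stop))
worker.start()
first = out.get(timeout=5)  # blocks until the background thread submits
stop.set()                  # request shutdown
worker.join(timeout=5)
```

Waiting on the event instead of calling time.sleep lets the loop exit promptly when shutdown is requested, and catching the exception inside the loop means one failed fetch does not end the stream. Whether either choice suits a real operator depends on how the runtime handles thread failures.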