Stream processing

Write, compile, and run an application that processes streams of unknown or infinite length.

In this tutorial, you will:

  1. Write a stream application that:
    1. Reads the contents of a file specified at run time
    2. Prepends each line with an incrementing line number
    3. Writes the modified lines to another file
  2. Compile the application using the Streams Compiler (sc)
  3. Run the application

Before you begin

Procedure

  1. Set up your environment for Streams.

    In your command terminal, source the streamsprofile.sh file in the bin directory of your Streams installation. For example:

    source streams-install-directory/bin/streamsprofile.sh
  2. Create a directory named numberedcat that contains a subdirectory named data.
    For example, use the following commands:
    mkdir -p numberedcat/data
    cd numberedcat
  3. Using your text editor, create a file named NumberedCat.spl in the numberedcat directory.
  4. Paste the following code into your file and save it:
    composite NumberedCat {
      graph
        stream<rstring contents> Lines = FileSource() {
          param  
            format: line;
            file:   getSubmissionTimeValue("file");
        }
        stream<rstring modifiedcontents> Numbered = Functor(Lines) {
          logic
            state: mutable int32 i = 0;
            onTuple Lines: i++;
          output
            Numbered: modifiedcontents = (rstring)i + " " + contents;
        }
        () as Sink = FileSink(Numbered) {
          param
            file:   "result.txt";
            format: line;
        }
    }

    This SPL code does the following:

    1. Declares a main composite operator called NumberedCat.
    2. Starts a data flow graph with the graph clause.
    3. Invokes a FileSource operator to produce a stream called Lines whose tuples have one attribute, contents, of type rstring. The invocation is configured to:
      1. Read one line at a time (format: line)
      2. From the file named by the submission-time parameter file (file: getSubmissionTimeValue("file"))
    4. Invokes a Functor operator that reads from stream Lines and produces a stream called Numbered, whose tuples have one attribute, modifiedcontents, of type rstring. The invocation is configured to:
      1. Maintain a mutable 32-bit integer state variable, i, initialized to 0.
      2. Increment i by one each time a tuple arrives on stream Lines. Because the onTuple clause runs before the output assignment, the first line is numbered 1.
      3. Output on stream Numbered a tuple whose modifiedcontents attribute is the concatenation of i converted to a string, a space, and the line contents.
    5. Invokes a FileSink operator named Sink that reads from stream Numbered. The empty () indicates that the invocation produces no output streams. Sink is configured to write each tuple as one line (format: line) to the file result.txt.
  5. Using your text editor, create a file named cat.txt in the numberedcat/data directory.
  6. Paste the following text into the file and save it:
        The Unix utility "cat" is so called 
        because it can con"cat"enate files.
        NumberedCat behaves like "cat -n",
        listing one file and numbering lines.
  7. Compile the code.

    In the numberedcat directory, run the following command:

    sc -M NumberedCat --data-directory data 

    The -M option tells the SPL compiler to build the application from the NumberedCat main composite operator, and --data-directory makes data the directory against which the application's relative file paths, such as cat.txt and result.txt, are resolved.

  8. Run the application:
    ./output/bin/standalone

    The program fails to run because the required submission-time parameter, file, was not specified.

  9. Rerun the application specifying which file to read:
    ./output/bin/standalone file="cat.txt"

    The program runs and exits without any console output. However, it creates a file called result.txt in the numberedcat/data directory that contains the numbered lines of cat.txt.
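As an optional cross-check, the transformation that the Functor performs can be reproduced outside Streams with awk, so you can compare it with result.txt. This is a minimal sketch, not part of the tutorial's application; the function name number_lines is hypothetical. It assumes awk's built-in NR variable, which counts input lines starting at 1, lines up with the value of i after the Functor's first increment.

```shell
# Hypothetical helper (not part of Streams or of this tutorial's application).
# Prints each line of the file named by $1, prefixed with its line number and
# a single space: the same "<number> <contents>" layout NumberedCat writes to
# result.txt. awk's NR holds the current input line count, starting at 1.
number_lines() {
    awk '{ print NR " " $0 }' "$1"
}
```

For example, after step 9, running number_lines data/cat.txt | diff - data/result.txt from the numberedcat directory should print nothing if result.txt holds the expected numbered lines.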

What to do next

Continue to the Running distributed tutorial to run the "NumberedCat" application in a distributed environment.