Stream processing
Write, compile, and run an application that processes streams of an unknown or infinite length.
In this tutorial, you will:
- Write a stream application that:
  - Reads the contents of a file specified at run time
  - Prepends each line with an incrementing line number
  - Writes the modified lines to another file
- Compile the application using the Streams Compiler (sc)
- Run the application
Before you begin
- Download and install Teracloud® Streams
- Open a Bash command terminal
- Open a text editor
Procedure
- Set up your environment for Streams.
  In your command terminal, source the streamsprofile.sh file under the Streams installation directory. For example:
      source streams-install-directory/bin/streamsprofile.sh
- Create a directory called numberedcat with a data directory inside.
  For example, use the following commands:
      mkdir -p numberedcat/data
      cd numberedcat
- Using your text editor, create a file named NumberedCat.spl in the numberedcat directory.
- Paste the following code into your file and save it:
      composite NumberedCat {
        graph
          stream<rstring contents> Lines = FileSource() {
            param
              format: line;
              file: getSubmissionTimeValue("file");
          }
          stream<rstring modifiedcontents> Numbered = Functor(Lines) {
            logic
              state: mutable int32 i = 0;
              onTuple Lines: i++;
            output
              Numbered: modifiedcontents = (rstring)i + " " + contents;
          }
          () as Sink = FileSink(Numbered) {
            param
              file: "result.txt";
              format: line;
          }
      }
  This SPL code does the following:
  - Declares a main composite operator called NumberedCat.
  - Starts a data flow graph with the graph clause.
  - Invokes a FileSource operator to produce a stream called Lines whose tuples have one attribute, contents, of type rstring. The invocation is configured to:
    - Read one line at a time (format: line)
    - Read from a file specified at submission time through the file parameter (file: getSubmissionTimeValue("file"))
  - Invokes a Functor operator that reads from stream Lines and produces a stream called Numbered. The invocation is configured to:
    - Maintain state in a mutable 32-bit integer variable, i, starting at 0.
    - Increment i by one when a tuple arrives on stream Lines.
    - Output into stream Numbered a tuple whose modifiedcontents attribute is the concatenation of the value of i as a string, a space, and the line contents.
  - Invokes a FileSink operator that reads from stream Numbered. The invocation produces no output streams, as indicated by the (), and is named Sink. Sink is configured to write one line per tuple (format: line) to the file result.txt.
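To make the Functor logic concrete, here is a minimal shell sketch of the same per-line behavior: keep a counter, increment it when a line arrives, then emit the counter, a space, and the line. It is an illustration only, not how Streams runs the operator. The function name number_lines is hypothetical and not part of Streams.

```shell
# number_lines: hypothetical helper that mimics the Functor invocation above.
# Reads lines on stdin and writes "<counter> <line>" on stdout.
number_lines() {
  i=0                                    # state: mutable int32 i = 0
  # The "|| [ -n ... ]" part also handles a final line with no trailing newline.
  while IFS= read -r contents || [ -n "$contents" ]; do
    i=$((i + 1))                         # onTuple Lines: i++
    printf '%s %s\n' "$i" "$contents"    # (rstring)i + " " + contents
  done
}
```

For example, `printf 'first\nsecond\n' | number_lines` prints `1 first` and `2 second`. The counter is incremented before the line is emitted, so numbering starts at 1.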
- Using your text editor, create a file named cat.txt in the numberedcat/data directory.
- Paste the following text into the file and save it:
      The Unix utility "cat" is so called because it can con"cat"enate files.
      NumberedCat behaves like "cat -n", listing one file and numbering lines.
- Compile the code.
  In the numberedcat directory, run the following command:
      sc -M NumberedCat --data-directory data
  The command instructs the SPL compiler to create a stream application from the NumberedCat main composite operator and to use the data directory as the root of relative paths for the application.
- Run the application:
      ./output/bin/standalone
  The program fails to run because the required file submission-time parameter was not specified.
- Rerun the application, specifying which file to read:
      ./output/bin/standalone file="cat.txt"
  The program runs and exits without any console output. However, it creates a file called result.txt in the numberedcat/data directory that contains the numbered lines of cat.txt.
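Because the program prints nothing to the console, one way to confirm the result is to rebuild the expected numbering with standard tools and compare it with result.txt. The sketch below assumes the output format is exactly the line number, one space, and the original line, as the SPL code above describes. The function name check_numbered is hypothetical and not part of Streams.

```shell
# check_numbered: hypothetical helper, not a Streams command.
#   $1: the original input file
#   $2: the file written by the application
# Exits with status 0 when $2 equals $1 with "<line number> " prefixed to each line.
check_numbered() {
  # awk's NR is the 1-based number of the current input line.
  awk '{ print NR " " $0 }' "$1" | diff - "$2"
}
```

From the numberedcat directory, `check_numbered data/cat.txt data/result.txt && echo "result.txt matches"` prints the message only when the two agree. Otherwise diff lists the lines that differ.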