Stream processing
Write, compile, and run an application that processes streams of an unknown or infinite length.
In this tutorial, you will:
- Write a stream application that:
  - Reads the contents of a file specified at run time
  - Prepends each line with an incrementing line number
  - Writes the modified lines to another file
- Compile the application using the Streams Compiler (sc)
- Run the application
Before you begin
- Download and install Teracloud® Streams
- Open a Bash command terminal
- Open a text editor
Procedure
- Set up your environment for Streams.
In your command terminal, source the streamsprofile.sh file in the bin directory of your Streams installation. For example:
source streams-install-directory/bin/streamsprofile.sh
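To confirm that sourcing the profile worked, you can check that the sc compiler used in a later step is now on your PATH. This is a minimal sketch that assumes the profile adds the Streams bin directory to PATH, as the later plain sc command suggests; the function name check_streams_env is hypothetical and not part of Streams.

```shell
# Hypothetical helper: report whether the Streams compiler (sc) is on PATH.
check_streams_env() {
  local sc_path
  # command -v prints the full path of sc if the shell can find it.
  if sc_path=$(command -v sc); then
    echo "sc found at: $sc_path"
  else
    echo "sc not found on PATH; re-check the streamsprofile.sh path" >&2
    return 1
  fi
}
```

Run check_streams_env in the same terminal where you sourced streamsprofile.sh; sourcing a file changes only the current shell, so a profile sourced in another terminal has no effect here.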
- Create a directory called numberedcat with a data directory inside.
For example, use the following commands:
mkdir -p numberedcat/data
cd numberedcat
- Using your text editor, create a file named NumberedCat.spl in the numberedcat directory.
- Paste the following code into your file and save it:
composite NumberedCat {
  graph
    stream<rstring contents> Lines = FileSource() {
      param
        format: line;
        file: getSubmissionTimeValue("file");
    }
    stream<rstring modifiedcontents> Numbered = Functor(Lines) {
      logic
        state: mutable int32 i = 0;
        onTuple Lines: i++;
      output
        Numbered: modifiedcontents = (rstring)i + " " + contents;
    }
    () as Sink = FileSink(Numbered) {
      param
        file: "result.txt";
        format: line;
    }
}
This SPL code does the following:
  - Declares a main composite operator called NumberedCat.
  - Starts a data flow graph with the graph clause.
  - Invokes a FileSource operator to produce a stream called Lines whose tuples have one attribute, contents, of type rstring. The invocation is configured to:
    - Read one line at a time (format: line)
    - Read from a file specified by the file parameter at submission time (file: getSubmissionTimeValue("file"))
  - Invokes a Functor operator that reads from stream Lines and produces a stream called Numbered. The invocation is configured to:
    - Maintain a mutable state variable i, a 32-bit integer (int32) that starts at 0
    - Increment i by one each time a tuple arrives on stream Lines
    - Output to stream Numbered a tuple whose modifiedcontents attribute is the concatenation of the value of i as a string, a space, and the line contents
  - Invokes a FileSink operator named Sink that reads from stream Numbered. The invocation produces no output streams, as indicated by the empty (). Sink is configured to write each tuple as a line (format: line) to the file result.txt.
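To make the Functor's per-tuple logic concrete, here is a rough Bash analogue: it reads a file one line at a time, keeps a counter that plays the role of the state variable i, and prepends the counter and a space to each line. It only illustrates the logic and is not how Streams executes the operator; the function name numbered_cat is hypothetical.

```shell
# Rough Bash analogue of the NumberedCat Functor (illustration only).
# numbered_cat is a hypothetical helper name, not part of Streams.
numbered_cat() {
  local i=0        # like: state: mutable int32 i = 0;
  local contents   # like: the contents attribute of each Lines tuple
  # Read one line at a time, like FileSource with format: line.
  # The || test also handles a final line that has no trailing newline.
  while IFS= read -r contents || [ -n "$contents" ]; do
    i=$((i + 1))   # like: onTuple Lines: i++;
    # like: modifiedcontents = (rstring)i + " " + contents;
    printf '%s %s\n' "$i" "$contents"
  done < "$1"
}
```

For example, numbered_cat data/cat.txt prints each line of cat.txt prefixed with its line number to the terminal, whereas the SPL application's FileSink writes the same kind of lines to result.txt.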
- Using your text editor, create a file named cat.txt in the numberedcat/data directory.
- Paste the following text into the file and save it:
The Unix utility "cat" is so called because it can con"cat"enate files.
NumberedCat behaves like "cat -n", listing one file and numbering lines.
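If you prefer the terminal to a text editor, a Bash here-document can create the same sample file. This sketch assumes you are still in the numberedcat directory from the earlier cd command.

```shell
# Run from the numberedcat directory.
mkdir -p data
# The quoted 'EOF' delimiter stops the shell from expanding anything
# inside the text, so the double quotes are written literally.
cat > data/cat.txt <<'EOF'
The Unix utility "cat" is so called because it can con"cat"enate files.
NumberedCat behaves like "cat -n", listing one file and numbering lines.
EOF
```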
- Compile the code.
In the numberedcat directory, run the following command:
sc -M NumberedCat --data-directory data
The command instructs the SPL compiler to create a stream application from the NumberedCat main composite operator and to use the data directory as the root of relative paths for the application.
- Run the application:
./output/bin/standalone
The program fails to run because we did not supply a value for the required file submission-time parameter.
- Rerun the application, specifying which file to read:
./output/bin/standalone file="cat.txt"
The program runs and exits without any console output. However, the program did create a file called result.txt in the numberedcat/data directory containing the numbered lines of cat.txt.
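To check the result, you can compare result.txt with what the Functor logic should produce. In this sketch, awk's built-in NR variable (the number of the current input line) stands in for the counter i, and the Bash process substitution <( ) feeds awk's output to diff; the function name check_numbering is hypothetical.

```shell
# Hypothetical helper: succeed if $2 is $1 with "N " prepended to line N.
check_numbering() {
  # $1: original input file, $2: numbered output file
  diff -q "$2" <(awk '{ print NR " " $0 }' "$1") > /dev/null
}

# Run from the numberedcat directory after the application has exited.
if check_numbering data/cat.txt data/result.txt; then
  echo "result.txt matches the expected numbering"
else
  echo "result.txt is missing or differs from the expected numbering" >&2
fi
```

The output of cat -n itself is not a byte-for-byte match for result.txt, because cat -n right-aligns each number in a six-character field and separates it from the line with a tab rather than a single space.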