Stream processing
As the name implies, SPL is a language for processing data streams. The HelloWorld example from the previous section hardly qualifies as stream processing, since there was only a single stream with a single tuple in that program. Now, let's look at a more idiomatic example that processes streams of unknown or infinite length by using a graph of operator invocations that run with pipeline parallelism. The purpose of the program is to list a file, prefix each line with a line number, and write the result to another file. It accomplishes this purpose with the following stream graph: FileSource produces the stream Lines, Functor transforms Lines into Numbered, and FileSink consumes Numbered.

A stream is a (possibly infinite) sequence of tuples; in the example, Lines and Numbered are streams. A tuple is a data item on a stream. In the example, the stream Lines transports one tuple for each line in the input file. An operator is a reusable stream transformer: each operator invocation transforms some input streams into some output streams. The place where a stream connects to an operator is called a port. Many operators have one input port and one output port (like Functor in the example), but operators can also have zero input ports (FileSource), zero output ports (FileSink), or multiple input or output ports (which is shown in later examples).
But back to the line-numbering program. It is called NumberedCat as a homage to the UNIX™ cat utility that, given the right command-line options, performs the same task. Here is the code:
composite NumberedCat {
  graph
    stream<rstring contents> Lines = FileSource() {
      param format : line;
            file   : getSubmissionTimeValue("file");
    }
    stream<rstring contents> Numbered = Functor(Lines) {
      logic  state   : mutable int32 i = 0;
             onTuple Lines : i++;
      output Numbered : contents = (rstring)i + " " + contents;
    }
    () as Sink = FileSink(Numbered) {
      param file   : "result.txt";
            format : line;
    }
}
Like in the previous example, there is a composite operator definition with a graph clause that contains operator invocations. The invocation of FileSource in lines 3-6 reads one line at a time (param format : line) from a file that is specified at submission time (param file : getSubmissionTimeValue("file")). How to supply the file name at submission time is shown later. The invocation of Functor in lines 7-11 maintains a state variable, mutable int32 i = 0, which it increments each time that a tuple arrives (onTuple Lines : i++). SPL variables are immutable by default, so without the mutable modifier, the compiler would reject the increment i++. The output clause output Numbered : contents = (rstring)i + " " + contents assigns the contents attribute of the output stream by casting the line number i to a string, (rstring)i, and concatenating it with a space and the contents attribute of the input stream. As the example shows, an output clause has assignments where the left side is an attribute of the output stream, whereas attribute names on the right side belong to input streams. Finally, the invocation of FileSink in lines 12-15 writes the results to a file named result.txt.
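For readers who know the shell better than SPL, the three operator invocations can be loosely mirrored by a three-stage pipeline. This is only an analogy under stated assumptions: the function names source_lines, number_lines, and sink_lines are hypothetical, and a shell pipeline says nothing about how the Streams run time actually schedules operators.

```shell
# Loose shell analogy of the NumberedCat graph (not how Streams executes it).

# FileSource analog: emit one "tuple" (line) at a time.
source_lines() { cat "$1"; }

# Functor analog: keep a counter as state, bump it per tuple, build the output.
number_lines() {
  i=0                                   # logic state : mutable int32 i = 0;
  while IFS= read -r contents || [ -n "$contents" ]; do
    i=$((i + 1))                        # onTuple Lines : i++;
    printf '%s %s\n' "$i" "$contents"   # contents = (rstring)i + " " + contents
  done
}

# FileSink analog: write every incoming tuple to a file.
sink_lines() { cat > "$1"; }
```

A possible use is source_lines in.txt | number_lines | sink_lines out.txt, where the pipe symbols play the role of the streams Lines and Numbered.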
Try out the following procedure. Create a directory that is called NumberedCat and also make a subdirectory NumberedCat/data. Put the example program in a file NumberedCat/NumberedCat.spl. Put the following text in a file NumberedCat/data/catFood.txt:
The Unix utility "cat" is so called
because it can con"cat"enate files.
Our program behaves like "cat -n",
listing one file and numbering lines.
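The directory layout and the sample input can also be created from a shell. The following is a minimal sketch that assumes a POSIX shell and that it is run from the directory that should contain NumberedCat; it does not create NumberedCat.spl, which still has to be saved with the SPL source shown earlier.

```shell
# Create the application directory and its data subdirectory.
mkdir -p NumberedCat/data

# Write the sample input; the quoted 'EOF' keeps the double quotes literal.
cat > NumberedCat/data/catFood.txt <<'EOF'
The Unix utility "cat" is so called
because it can con"cat"enate files.
Our program behaves like "cat -n",
listing one file and numbering lines.
EOF

# Show what the data directory now contains.
ls NumberedCat/data
```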
Compile it to a stand-alone executable file with the following command:
sc --data-directory data -M NumberedCat
When the program runs, you must supply the input file name as a submission-time value. The FileSource operator expects a file name that is relative to the NumberedCat/data directory. Therefore, run the program with the following:
./output/bin/standalone file="catFood.txt"
Look at the NumberedCat/data directory. If everything went fine, then the program created a file that is called result.txt that contains the numbered lines of catFood.txt.
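One way to check the result is to compute the expected numbering independently and compare. The sketch below assumes that it is run from the NumberedCat directory and that FileSink writes each tuple as one plain line, so that every output line is the line number, one space, and the original text, as the output clause suggests. The helper name expected_numbered is hypothetical.

```shell
# Print what result.txt is expected to contain for a given input file:
# the 1-based line number, one space, then the original line.
expected_numbered() { awk '{ print NR " " $0 }' "$1"; }

# Compare against the program's output when both files are present.
if [ -f data/catFood.txt ] && [ -f data/result.txt ]; then
  expected_numbered data/catFood.txt | diff - data/result.txt && echo "result.txt matches"
fi
```

If diff prints differences, the assumption about the exact output format may not hold for your installation, so inspect result.txt directly.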
Running the application in a distributed environment
To run the same application on a distributed instance, compile it, create and start a domain and an instance with streamtool, submit the job, and clean up afterward:
sc --data-directory data -M NumberedCat # compile
streamtool mkdomain -d streamsdomain --embeddedzk # make a domain
streamtool startdomain -d streamsdomain --embeddedzk # start the domain
streamtool genkey -d streamsdomain --embeddedzk # generate keys
streamtool mkinstance -i streams -d streamsdomain --embeddedzk # make an instance
streamtool startinstance -i streams -d streamsdomain --embeddedzk # start the runtime instance
# submit the job
streamtool submitjob -i streams -d streamsdomain --embeddedzk -P file=catFood.txt output/NumberedCat.sab
streamtool lsjobs -i streams -d streamsdomain --embeddedzk # list running jobs
# wait until data/result.txt contains the numbered lines of data/catFood.txt
streamtool canceljob -i streams -d streamsdomain --embeddedzk 0 # cancel the job
streamtool stopinstance -i streams -d streamsdomain --embeddedzk # stop the runtime instance
streamtool rminstance -i streams -d streamsdomain --embeddedzk # remove the runtime instance
streamtool stopdomain -d streamsdomain --embeddedzk # stop the domain
streamtool rmdomain -d streamsdomain --embeddedzk # remove the domain
If everything went well, these commands accomplished the same result as running the program stand-alone. If anything went wrong, consult your system administrator, or try to diagnose the problem yourself by using the streamtool getlog or streamtool viewlog commands. As mentioned before, the best way to learn a language is to write and run programs in it, so now is a good time to ensure that you have the right setup to do that. Note how the streamtool submitjob command accepts submission-time values with the -P option, and uses the application bundle file to figure out which operators to submit.
The previous description illustrated SPL as a streaming language, and gave you a taste of how to run programs on an instance of the Teracloud® Streams distributed run time. You saw three SPL standard toolkit operators: FileSource, Functor, and FileSink. To learn more about working with the distributed run time, type streamtool man, which provides extensive information about commands such as submitjob and its relatives.