Operator granularity
To design effective stream applications, size operators appropriately: an effective operator performs a single task.
Writing operators that each accomplish one task makes stream applications easier to design. The applications are easier to read, and the operators are more likely to be reusable in other applications. However, programmers who are new to the streams programming model might not recognize the best operator granularity. Programmers who are used to writing applications in languages such as C++ and Java™ might be tempted to write stream applications with large operators that use many expressions to perform multiple conceptual tasks. Well-designed stream applications tend to be more declarative (say what you want) rather than imperative (say how to do it).
Consider an application that reads telephone records from a source. Individual records are on separate lines of text, and in the form "<name> <source number> <destination number>". Assume that the data is read from a file called calls.txt. The task is to convert this string of text (unstructured data) into tuples (structured data), and to filter out all numbers that come from the 540 area code.
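For example, given the record format described above, calls.txt might contain lines like the following (the names and numbers are invented sample data, not taken from any real data set):

```
Alice 5405551234 5551234567
Bob 4125550123 5405550001
```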
Consider this first attempt:
01: composite Main {
02:     type CDR = rstring name, rstring src, rstring dest;
03:     graph
04:         stream<rstring line> RawLine = FileSource() {
05:             param
06:                 file: "calls.txt";
07:                 format: line;
08:         }
09:
10:         stream<CDR> Processed = Custom(RawLine) {
11:             logic
12:                 onTuple RawLine: {
13:                     list<rstring> tokens = tokenize(line, " ", false);
14:                     if (findFirst(tokens[1], "540") == 0) {
15:                         submit({name = tokens[0], src = tokens[1], dest = tokens[2]}, Processed);
16:                     }
17:                 }
18:         }
19: }
This application defines a tuple type named CDR (call detail record). This type uses strings to represent the name and the source and destination phone numbers (line 2). Next, the FileSource operator from the SPL Standard Toolkit reads lines from the data file (lines 4-8). The Custom operator then parses each line (lines 10-18). The parsing is implemented with the tokenize() function from the Standard Library, which accepts a string and a pattern, and returns the list of substrings that were separated by that pattern (line 13). The resulting list, tokens, contains the name at index 0, the source number at index 1, and the destination number at index 2. The Standard Library function findFirst() takes a string to search as its first parameter and a substring to search for as its second parameter. It returns the index of the first location where the substring occurs within the first string, or -1 if the substring is not found. If 540 is the area code, then it must appear at index 0, so the code checks whether the result of findFirst() equals 0 and, only then, submits the tuple (lines 14-16). This SPL application correctly solves the stated problem, but its design is poor for several reasons.
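To make these library calls concrete, the following fragment walks through one sample record, based on the behavior described above (the name and phone numbers are made-up values):

```
// Sample input line: "Alice 5405551234 5551234567"
list<rstring> tokens = tokenize("Alice 5405551234 5551234567", " ", false);
// tokens[0] == "Alice"       (name)
// tokens[1] == "5405551234"  (source number)
// tokens[2] == "5551234567"  (destination number)
int32 hit  = findFirst(tokens[1], "540"); // 0: "540" starts at index 0, so 540 is the area code
int32 miss = findFirst(tokens[2], "540"); // -1: "540" does not occur in the string
int32 mid  = findFirst("5555401234", "540"); // 3: "540" occurs, but not at index 0,
                                             // which is why the check is == 0 rather than >= 0
```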
First, it is not obvious that there is a filter condition inside the Custom operator. Semantically, a call to the submit() function that is guarded by an if statement is the same as a filter. However, a reader must inspect the particular logic of the Custom operator to discern this fact. This approach is not declarative: instead of specifying how to filter tuples, it is better to state what the filter condition is. This Custom operator also performs two tasks, parsing a line and filtering tuples, which violates the principle that operators perform one task.
Second, the approach limits reuse in two ways. The logic in the Custom operator both parses and filters. Parsers (operators that convert a raw data format into SPL tuples) are a common operator pattern in stream applications, and a particularly useful kind of operator that can be reused in many applications. But when logic specific to one application is combined with the parsing itself, the operator is less useful in other applications. There is also no longer an output stream that carries all of the CDRs; the resultant output stream contains only CDRs from the 540 area code. By separating parsing and filtering, the output stream from the parsing operator carries all CDRs, which makes it easier to write other operators in the same application that process all CDRs.
Performance is another consideration. Streams applications obtain high throughput through pipeline parallelism, in which two separate operators process independent tuples at the same time. As soon as a tuple is parsed, it is sent to the next operator, where filtering can start. While the next operator is filtering that tuple, the first operator can start parsing the next one. In more complex examples, this design can lead to better throughput because the two operations occur in parallel. A combined operator cannot benefit from pipeline parallelism.
The prior application is poor design from a performance perspective because its two tasks can only ever run combined: pipeline parallelism is impossible when both tasks are hardcoded into one large operator. Sometimes, combining two operators does result in better performance. For those instances, the best method is to request operator fusion through the partitionColocation requirement, which is part of the placement configuration option.
Without knowing more about the kinds of calls that the application processes, it is impossible to predict whether fusion or pipeline parallelism yields higher throughput. If most of the calls are from the 540 area code, then pipeline parallelism most likely yields better performance. However, if only a small fraction of the calls come from the 540 area code, then fusion probably yields better performance because it avoids sending unnecessary data downstream. Therefore, it is better to write two separate operators, and then use configuration options on those operators to tune their performance.
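As a sketch of how that tuning works, fusing two separately written operators only requires giving each one a matching placement config (the partition label "cdrPipeline" here is an arbitrary name chosen for illustration); removing the two config lines restores independent operators and pipeline parallelism:

```
stream<CDR> Processed = Custom(RawLine) {
    logic
        onTuple RawLine: {
            list<rstring> tokens = tokenize(line, " ", false);
            submit({name = tokens[0], src = tokens[1], dest = tokens[2]}, Processed);
        }
    config placement: partitionColocation("cdrPipeline"); // fuse with operators that use the same label
}

stream<CDR> Filtered = Filter(Processed) {
    param filter: findFirst(src, "540") == 0;
    config placement: partitionColocation("cdrPipeline"); // same label, so both run in one partition
}
```

Because the fusion decision lives in config clauses rather than in the operator logic, it can be changed without touching the parsing or filtering code.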
The stream application in the next example exhibits appropriate operator granularity. The tuples are filtered by a dedicated Filter operator (lines 18-20). This design is also more declarative; it states what the filter condition is, but it does not dictate how to apply it. The flow of the application is now accurately represented by the stream graph: calls are read from a file as lines of text, each line is parsed into a tuple, and calls that are not from the 540 area code are filtered out. Finally, by using the placement configuration option, developers can choose to fuse the operators to avoid sending unnecessary data, or to keep them separate to best use pipeline parallelism. Consider this revision, which uses two operators:
01: composite Main {
02:     type CDR = rstring name, rstring src, rstring dest;
03:     graph
04:         stream<rstring line> RawLine = FileSource() {
05:             param
06:                 file: "calls.txt";
07:                 format: line;
08:         }
09:
10:         stream<CDR> Processed = Custom(RawLine) {
11:             logic
12:                 onTuple RawLine: {
13:                     list<rstring> tokens = tokenize(line, " ", false);
14:                     submit({name = tokens[0], src = tokens[1], dest = tokens[2]}, Processed);
15:                 }
16:         }
17:
18:         stream<CDR> Filtered = Filter(Processed) {
19:             param filter: findFirst(src, "540") == 0;
20:         }
21: }
There are circumstances where integrating multiple tasks in one operator is the best choice. A general principle in streaming applications is to filter data as soon as possible. While fusing two operators can reduce communication costs, the fused operators still must create tuples. In the first example, tuples that are not needed are never created. If the tuples are large and many of them are filtered out, the code in the first example can significantly outperform even a fused version of the second example. However, apply such optimizations, which obscure code intent and restrict code reuse, only after determining that there is a significant performance benefit.