Forwarding final punctuation

By default, the Teracloud® Streams instance automatically forwards final punctuation to all the output ports of an operator after a final punctuation marker is delivered to all input ports.

The run time does not deliver a final punctuation marker to an input port until it has received final punctuation from every stream that is connected to that port. When a final punctuation marker is received on an input port, no more tuples or punctuation can arrive on that port. When a final punctuation marker is sent on an output port, all later tuple submissions on that port are ignored.
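The delivery rule can be modeled with one counter per input port. The following is a minimal C++ sketch of that rule, not the run time's actual implementation; the class `InputPortFinalGate` and its methods are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model of how final punctuation is gated on one input port:
// the port-level marker is delivered only after every connected stream
// has sent its own final punctuation.
class InputPortFinalGate {
public:
    explicit InputPortFinalGate(std::size_t connectedStreams)
        : _connected(connectedStreams), _received(0), _delivered(false) {}

    // Called when one connected stream sends final punctuation.
    // Returns true exactly once: when the marker should reach the operator.
    bool onStreamFinal() {
        if (_delivered) return false;
        ++_received;
        if (_received == _connected) {
            _delivered = true;
            return true;
        }
        return false;
    }

    bool delivered() const { return _delivered; }

private:
    std::size_t _connected;  // number of streams connected to the port
    std::size_t _received;   // final markers received so far
    bool _delivered;         // whether the port-level marker was delivered
};
```

If one connected stream never sends final punctuation, as with a stream generated by an Import operator, the counter never reaches the number of connections and `delivered()` stays false.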

If any stream that is connected to an input port does not carry final punctuation, the Teracloud® Streams instance never delivers a final punctuation marker to that port. As a result, final punctuation is not forwarded, and the operator never sees a final punctuation marker on that port. For example, streams that are generated by Import operators do not carry final punctuation.

Developers can change the default final punctuation forwarding behavior in the operator model. The <finalPunctuationPortScope> element specifies, for each output port, which input ports must receive final punctuation before final punctuation is forwarded on that output port.
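The effect of a port scope can be sketched in C++ as a mapping from each output port to the set of input ports it waits for. This is a hypothetical model: `FinalPunctForwarder`, `setScope`, and `onInputFinal` are illustrative names, and the sketch does not reproduce the operator model syntax. As a simplifying assumption of this sketch only, an empty scope disables automatic forwarding for that output port.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <utility>
#include <vector>

// Hypothetical model of per-output-port final punctuation scope.
// By default every output port waits for all input ports; setScope()
// narrows the set of input ports that one output port waits for.
class FinalPunctForwarder {
public:
    FinalPunctForwarder(std::size_t numInputs, std::size_t numOutputs)
        : _finalSeen(numInputs, false),
          _scopes(numOutputs),
          _forwarded(numOutputs, false) {
        for (auto & scope : _scopes)  // default scope: all input ports
            for (std::size_t i = 0; i < numInputs; ++i) scope.insert(i);
    }

    // Replace the scope of one output port. In this sketch an empty scope
    // means the port never receives forwarded final punctuation.
    void setScope(std::size_t outPort, std::set<std::size_t> inPorts) {
        _scopes[outPort] = std::move(inPorts);
    }

    // Record final punctuation on an input port and return the output
    // ports whose whole scope has now seen final punctuation.
    std::vector<std::size_t> onInputFinal(std::size_t inPort) {
        _finalSeen[inPort] = true;
        std::vector<std::size_t> ready;
        for (std::size_t o = 0; o < _scopes.size(); ++o) {
            if (_forwarded[o] || _scopes[o].empty()) continue;
            bool allFinal = true;
            for (std::size_t i : _scopes[o])
                if (!_finalSeen[i]) { allFinal = false; break; }
            if (allFinal) {
                _forwarded[o] = true;
                ready.push_back(o);
            }
        }
        return ready;
    }

private:
    std::vector<bool> _finalSeen;                // per input port
    std::vector<std::set<std::size_t>> _scopes;  // per output port
    std::vector<bool> _forwarded;                // per output port
};
```

With two input and two output ports, scoping output port 1 to input port 0 lets port 1 close as soon as input port 0 finishes, while output port 0 keeps the default and waits for both inputs.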

Although final punctuation is forwarded automatically by the run time, developers of primitive operators can still take specific actions when a port receives it. These actions are specified in the process member function for punctuation. Final punctuation is forwarded only after all input ports receive final punctuation and the process call for the last final punctuation marker returns. Taking specialized actions on the delivery of final punctuation is especially useful when the operator buffers data and must send out tuples based on that data before its output ports are closed. These actions must be completed before the process member function returns.
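The ordering guarantee can be illustrated with a minimal C++ sketch of a single output port. `OutputPort` and `deliverFinal` are hypothetical stand-ins for run time behavior: the handler passed to `deliverFinal` plays the role of the process member function for punctuation, and the port is closed only after that handler returns.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical output port: submissions after final punctuation are ignored.
class OutputPort {
public:
    void submit(const std::string & tuple) {
        if (_closed) return;  // port already closed by final punctuation
        _emitted.push_back(tuple);
    }
    void sendFinal() { _closed = true; }
    const std::vector<std::string> & emitted() const { return _emitted; }

private:
    bool _closed = false;
    std::vector<std::string> _emitted;
};

// Hypothetical delivery of a final marker: run the operator's handler
// first, then forward final punctuation, which closes the port.
inline void deliverFinal(OutputPort & out,
                         const std::function<void(OutputPort &)> & onFinal) {
    onFinal(out);     // buffered tuples can still be submitted here
    out.sendFinal();  // forwarded only after the handler has returned
}
```

A tuple submitted inside the handler is emitted, while a tuple submitted after `deliverFinal` returns is silently dropped, which is why buffered data must be flushed before the process call returns.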

Consider the following stream application, which invokes a primitive operator named sample::Uniq (line 09). Like the UNIX uniq command, this operator keeps one tuple from each run of identical, consecutive tuples and drops the rest.

01: namespace example;
02:
03: composite Main {
04:   graph
05:     stream <rstring line> Lines = FileSource() { 
06:       param file : "input.dat"; 
07:     } 
08: 
09:     stream <rstring line> UniqueLines = sample::Uniq(Lines) {}
10: } 

The next two examples show the C++ primitive operator templates for the sample::Uniq operator. The first example is the C++ header template. It declares the methods that process tuples (line 09) and punctuation (line 10), and a method that flushes the latest incoming line (line 11).

01: #pragma SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE
02:
03: class MY_OPERATOR : public MY_BASE_OPERATOR
04: {
05: public:
06:   MY_OPERATOR();
07:   ~MY_OPERATOR() {};
08:
09:   void process(Tuple const & tuple, uint32_t port);
10:   void process(Punctuation const &punct, uint32_t port); 
11:   void flushLatestLine(); 
12: private: 
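13:   bool _firstTime;     // true until the first tuple arrives
14:   IPort0Type _latest;  // buffer: the latest line received
15:   Mutex _mutex;        // serializes tuple and punctuation processing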
16: }; 
17: 
18: #pragma SPL_NON_GENERIC_OPERATOR_HEADER_EPILOGUE

The second example shows the C++ implementation template. When the operator receives a tuple, it checks whether the tuple is equal to the previous tuple (line 21). If the value differs, the previous value is submitted and the latest value is updated (lines 22-23). If the value is the same, the tuple is discarded. For final punctuation, sample::Uniq uses the default configuration: when a final punctuation marker is received on its input port, the marker is forwarded automatically. Before the final punctuation is forwarded, the operator flushes its buffered contents, that is, the latest line that it received on its input port (line 32).

01: #pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_PROLOGUE
02:
03: MY_OPERATOR::MY_OPERATOR()
04:   : _firstTime(true)
05: {
06: }
07:
08: void MY_OPERATOR::flushLatestLine()
09: {
10:   if (!_firstTime) submit(_latest, 0);  // nothing to flush if no tuple arrived
11: }
12:
13: void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
14: {
15:   AutoPortMutex am(_mutex, *this);
16:   if (_firstTime) {  // buffer the very first tuple
17:     _latest = static_cast<IPort0Type const &>(tuple);
18:     _firstTime = false;
19:     return;
20:   }
21:   if (!tuple.equals(_latest)) {  // a different value ends the current run
22:     submit(_latest, 0);          // emit the previous value
23:     _latest = static_cast<IPort0Type const &>(tuple);
24:   }
25: }
26:
27: void MY_OPERATOR::process(Punctuation const & punct, uint32_t port)
28: {
29:   AutoPortMutex am(_mutex, *this);
30:
31:   if (punct == Punctuation::FinalMarker)
32:     flushLatestLine();  // flush before the run time forwards the marker
33: }
34:
35: #pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_EPILOGUE
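To check the buffering logic outside the Streams run time, the same algorithm can be written as plain C++17. In this sketch, `UniqSketch` is a hypothetical class, `onTuple` and `onFinal` stand in for the two process member functions, and a vector of strings stands in for output port 0.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Runtime-free sketch of the sample::Uniq logic: a one-element buffer holds
// the latest line, a different line flushes the buffer, and the final
// punctuation flushes whatever is still buffered.
class UniqSketch {
public:
    // Counterpart of process(Tuple const &, uint32_t).
    void onTuple(const std::string & line) {
        if (!_latest) { _latest = line; return; }  // first tuple: buffer it
        if (line != *_latest) {                    // run ended: emit previous
            _out.push_back(*_latest);
            _latest = line;
        }
    }

    // Counterpart of process(Punctuation const &, uint32_t) for FinalMarker.
    void onFinal() {
        if (_latest) _out.push_back(*_latest);  // nothing to flush if no input
        _latest.reset();
    }

    const std::vector<std::string> & output() const { return _out; }

private:
    std::optional<std::string> _latest;  // buffered latest line, if any
    std::vector<std::string> _out;       // stands in for output port 0
};
```

For the input `a a b b b a`, the sketch emits `a` and `b` while tuples arrive and emits the final `a` only when `onFinal` runs, mirroring the flush on line 32.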

Because this operator has only one input port, the process member function for punctuation is invoked with a final punctuation marker only once. If an operator has more than one input port, the function might need to count the final punctuation markers that it receives and flush its buffers only after the last one arrives. Any routine in the operator that generates output tuples must finish before the process call returns; if the operator runs threads that might generate output, process must wait until all of those threads finish. This requirement ensures that the operator does not lose data by submitting tuples to ports that are already closed.
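The following C++ sketch illustrates both points for an operator with several input ports. `MultiPortFlusher` is hypothetical and uses only the C++ standard library: a counter of open input ports decides when the final flush happens, a single `std::thread` stands in for any operator thread that might generate output, and a vector stands in for the output port. The worker is joined before the buffer is flushed, so all of its output is in place before the point that corresponds to the port being closed.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical operator with several input ports: it flushes its buffer
// only when the last input port delivers final punctuation, after joining
// a worker thread that may also produce output.
class MultiPortFlusher {
public:
    explicit MultiPortFlusher(std::uint32_t numInputPorts)
        : _remaining(numInputPorts) {
        _worker = std::thread([this] { emit("from worker"); });
    }
    ~MultiPortFlusher() { if (_worker.joinable()) _worker.join(); }

    void buffer(const std::string & s) {
        std::lock_guard<std::mutex> lock(_mutex);
        _buffered.push_back(s);
    }

    // Called once per input port when its final punctuation arrives.
    void onFinal(std::uint32_t /*port*/) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (_remaining == 0) return;   // already flushed
            if (--_remaining > 0) return;  // other input ports still open
        }
        if (_worker.joinable()) _worker.join();  // wait for pending output
        std::lock_guard<std::mutex> lock(_mutex);
        for (const auto & s : _buffered) _submitted.push_back(s);
        _buffered.clear();
    }

    std::vector<std::string> submitted() {
        std::lock_guard<std::mutex> lock(_mutex);
        return _submitted;
    }

private:
    void emit(const std::string & s) {
        std::lock_guard<std::mutex> lock(_mutex);
        _submitted.push_back(s);
    }

    std::mutex _mutex;
    std::uint32_t _remaining;              // input ports without final yet
    std::vector<std::string> _buffered;    // data held until the last final
    std::vector<std::string> _submitted;   // stands in for the output port
    std::thread _worker;                   // stands in for operator threads
};
```

In an actual primitive operator, both the join and the flush would take place inside the process member function for punctuation, before it returns.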