Operator Beacon

Primitive operator image not displayed. Problem loading file: ../../image/tk$spl/op$spl.utility$Beacon.svg

The Beacon operator is a utility source that generates tuples on an ad hoc basis.

Checkpointed data

When the Beacon operator is checkpointed, its iteration count value and logic state variables (if present) are saved.

Behavior in a consistent region

The Beacon operator can be the start operator of a consistent region. The operator checkpoints and restores its iteration count value and logic state variables (if any), as needed to maintain consistency. If a stateful function is used to assign values to tuples, the replayed value of a tuple might not be the same as the original value assigned to that same logical tuple. For example, if a random value is assigned to a tuple, the replayed value can differ from the original.

Checkpointing behavior in an autonomous region

When the Beacon operator is in an autonomous region and configured with a config checkpoint : periodic(T) clause, the operator state is checkpointed every T seconds, asynchronously to the normal tuple processing. Upon restart, the operator restores its state from the last checkpoint.

When the Beacon operator is in an autonomous region and configured with config checkpoint : operatorDriven clause, no checkpoints are taken. Upon restart, the operator restores its initial state.

Such checkpointing behavior is subject to change in the future.

Examples

This example uses the Beacon operator.


composite Main {                                                           
  graph                                                                    
    // with no options                                                     
    stream<rstring name, uint32 age> Beat1 = Beacon() {}                   
    // with iterations                                                     
    stream<rstring name, uint32 age> Beat2 = Beacon()                      
    {                                                                      
      param                                                                
        iterations : 10u;                                                  
    }                                                                      
    // with iterations and period                                          
    stream<rstring name, uint32 age> Beat3 = Beacon()                      
    {                                                                      
      param                                                                
        period     : 0.2;                                                  
        iterations : 10u;                                                  
    }                                                                      
    // with output assignments and an initial delay of 5 seconds           
    stream<rstring name, uint32 age> Beat4 = Beacon()                      
    {                                                                      
      param                                                                
        period : 0.2;                                                      
        initDelay: 5.0;                                                    
      output                                                               
        Beat4 : name = "SPL"+(rstring)(random()*10.0),                     
               age = (uint32)(random()*40.0);                              
    }                                                                      
}    

Summary

Ports
This operator has 0 input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 4 parameters.

Optional: initDelay, iterations, period, triggerCount

Metrics
This operator does not report any metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Output Functions
Functions
<any T> T AsIs(T)

Passthrough function.

public uint64 IterationCount()

Returns the number of tuples that have been emitted so far. This function is used to generate sequences of data with different values. The initial value is 0.

For example:


submit {a=0, b=1}, {a=2, b=3}, ...

stream<int32 a, int32 b> A = Beacon() {
   param iterations : 1000;
   output A : a = (int32) (IterationCount()*2ul), b = (int32) (IterationCount()*2ul+1ul);
}

Ports (0)

The Beacon operator is configurable with a single output port. The Beacon operator outputs a window marker punctuation when it finishes generating tuples.

Properties

Parameters

This operator supports 4 parameters.

Optional: initDelay, iterations, period, triggerCount

initDelay

Specifies the number of seconds that the operator delays before starting to produce tuples.

Properties

iterations

Specifies the number of tuples to be produced by the Beacon operator. It takes a single expression of type uint32 or int32 as its value, which is evaluated as uint32. When the parameter is not specified, the operator produces tuples until the application is shut down.

Properties

period

Specifies the time interval between successive tuple submissions, in seconds. When the parameter is not specified, the default value is 0.0.

Properties

triggerCount

Specifies how many tuples are submitted before the operator starts to drain the pipeline of a consistent region and establish a consistent state. If the Beacon operator is the trigger operator of a consistent region, this parameter must be set and must be greater than zero.

Properties

Code Templates

Beacon

stream <${schema}> ${outputStream} = Beacon() {
            logic
                state:  mutable ${stateVariable};
            param
                period: ${period};
            output
                ${outputStream}:  ${outputExpression};
        }
      

Beacon As Counter

stream <int32 counter> ${outputStream} as O = Beacon() {
            logic
                state: mutable int32 i = 0;
            param
                iterations: ${numIterations};
                period: ${period};
            output
                O:  counter = i++;
        }
      

Beacon As Trigger

stream<int32 i> ${beaconOutStream} = Beacon()  {
            param iterations: 1;
        }

        stream<${schema}> ${outputStream} = Custom(${beaconOutStream} as I)  {
            logic
                onTuple I: {
                    while (!isShutdown()) {
                        // Add code here
                        ${cursor}
                    }
                }
        }
      

Beacon as a start operator of an operator-driven consistent region


            @consistent(trigger=operatorDriven)
            stream <int32 itCount> Beat = Beacon() {
              param
                triggerCount: 10u;
              output Beat: itCount = (int32) IterationCount();
            }