Implementing tuple load balancing
A ThreadedSplit operator can be used to split a stream and process tuples in parallel. This method works for many applications. However, if the processing time of a tuple varies considerably depending on the tuple data, a tuple with a long processing time can cause subsequent tuples to back up behind it in the same stream while they wait to be processed. This problem can occur even when another thread is available and idle. It can be aggravated when tuples are queued in a TCP/IP connection to another processing element (PE).
To ensure that the load is balanced, a ThreadedSplit operator with a buffer size of 1 can be tied to two or more Gate operators with a maximum unacknowledged tuple count of 1 or more. The ThreadedSplit and the Gate operators must be on the same PE so that tuples are not queued between PEs. To put operators on the same PE, use a partitionColocation placement config.
The Gate operator allows only the specified number of tuples to pass at a time. It waits until a subsequent operator acknowledges receipt of a tuple before it passes the next tuple. With a maximum unacknowledged tuple count of 1, no tuple can ever be queued behind another, waiting to be processed.
The following example demonstrates balancing the load:
composite Main {
    graph
        // Generate a stream of data to process
        stream<uint32 i> I = Beacon() {
            logic
                state : mutable uint32 c = 0 ;
            output
                I : i = c ++ ;
        }

        // Split the stream into 2 streams. Use a following Gate to ensure load balancing
        (stream<I> X ; stream<I> Y) = ThreadedSplit(I) {
            param
                bufferSize : 1u ;
            config
                placement : partitionColocation("Split"), // ensure same PE as the Gates
                            partitionExlocation("Process");
        }

        stream<I> O0 = Gate(X ; Control0) {
            param
                maxUnackedTupleCount : 1u ;
            config
                placement : partitionColocation("Split");
                // ensure same PE as ThreadedSplit
        }

        stream<I> O1 = Gate(Y ; Control1) {
            param
                maxUnackedTupleCount : 1u ;
            config
                placement : partitionColocation("Split");
                // ensure same PE as ThreadedSplit
        }

        (stream<I> R0 as out ; stream<uint32 i> Control0 as control) = Custom(O0 as In) {
            logic
                onTuple In : {
                    // do some processing
                    submit(In, out); // forward tuple
                    submit({ i = 1u }, control);
                }
            // Place on a different PE from Gate or other processing operator
            config
                placement : partitionExlocation("Process");
        }

        (stream<I> R1 as out ; stream<uint32 i> Control1 as control) = Custom(O1 as In) {
            logic
                onTuple In : {
                    // do some processing
                    submit(In, out); // forward tuple
                    submit({ i = 1u }, control);
                }
            // Place on a different PE from Gate and other processing operator
            config
                placement : partitionExlocation("Process");
        }
}
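
To observe the effect of uneven processing times, the "do some processing" step in a worker can be given an artificial, data-dependent delay. The following fragment is a minimal sketch of a replacement for the first Custom operator in the example above, not part of the original example. It assumes that the SPL block() function (which pauses for a number of seconds) is an acceptable stand-in for real work. The rule that every tenth tuple is slow and the two delay values are hypothetical, chosen only for illustration.

```spl
// Hypothetical variant of the first worker: block() stands in for real processing
(stream<I> R0 as out ; stream<uint32 i> Control0 as control) = Custom(O0 as In) {
    logic
        onTuple In : {
            // Assumption: every tenth tuple is expensive (1 second), the rest are cheap (10 ms)
            block(In.i % 10u == 0u ? 1.0 : 0.01);
            submit(In, out);             // forward the processed tuple
            submit({ i = 1u }, control); // acknowledge one tuple so that the Gate releases the next
        }
    config
        placement : partitionExlocation("Process");
}
```

Because the acknowledgment is submitted only after the delay, the Gate in front of this worker holds back its next tuple while a slow tuple is in progress, and the other worker would be expected to keep receiving tuples in the meantime.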