Nested parallel regions
Nested user-defined parallelism (nested UDP) allows parallel regions to contain other parallel regions in your Teracloud® Streams applications. If you change the width of a nested parallel region at run time, the change applies to all replicas of that nested region.
You create a nested parallel region by applying the @parallel annotation to either a primitive or a composite operator that is itself inside of a composite operator invoked with the @parallel annotation. For example:
composite Inner(input In; output C) {
  graph
    stream<int32 i> A = Functor(In) {}
    stream<int32 i> B = Functor(A) {}
    stream<int32 i> C = Functor(B) {}
}

composite Outer {
  graph
    stream<int32 i> Src = Beacon() {}

    @parallel(width=3)
    stream<int32 i> Op = Inner(Src) {}

    stream<int32 i> Sink = Custom(Op) {}
}

composite Main {
  graph
    @parallel(width=2)
    () as App = Outer() {}
}
This example of nested parallelism does not require understanding any concepts that are not already present in non-nested parallel regions. However, when you need to refer to particular parallel channels in nested parallel regions, there are several new concepts.
First are the concepts of local channels and global channels. In the previous example, there is a parallel region of width 3 nested within a parallel region of width 2. The nested (inner) parallel region, which by itself has three channels, is replicated twice when the outer parallel region is transformed, resulting in six actual channels within the inner parallel region. So that the inner channels can be uniquely identified, they are given an index in the range 0 through 5, inclusive, and this index is referred to as the global channel index. The local channel index does not consider any nesting, so for the inner parallel region the local channel index is in the range 0 through 2, inclusive. It follows that, for non-nested parallel regions, the local and global indexes are the same.
The getChannel() function returns the global channel number, and a separate function, getLocalChannel(), returns the local channel number. getChannel() returns the global rather than the local channel number because the global number is more useful as a unique identifier. For example, a common pattern in parallel regions is for operators to use their channel number to access an external resource; the global channel number enables this pattern even when the region is nested.
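For instance, the following sketch shows that pattern. The PerChannelSink invocation is hypothetical and not part of the example above, but it could be added inside the Inner composite; because getChannel() is unique across the whole nest, no two replicas write to the same file, whereas getLocalChannel() would repeat across replicas of the outer region:
() as PerChannelSink = FileSink(C) {
  param
    // Hypothetical sink: each replica writes its own file, for example
    // out0.dat through out5.dat when the outer width is 2 and the inner width is 3.
    file: "out" + (rstring)getChannel() + ".dat";
}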
As a consequence of the getChannel() function returning the global channel number, global channel numbers are also used in physical operator names. For example, the physical operator name Outer[1].Middle[5].Inner[11] means that this operator is the twelfth replica of Inner across all parallel region nests. It is inside the sixth replica of the Middle composite operator, which is inside of the second replica of the Outer composite operator.
Second is the concept of the closest enclosing parallel region. For any given operator in a parallel region, the closest enclosing parallel region is the parallel region in which the operator is most immediately nested. In the first example, the closest enclosing parallel region for the Functor operators inside Inner is the width-3 region created by the @parallel annotation on the invocation of Inner.
Changing nested parallel regions at run time
You can change the parallel width at any level of a nested parallel region, but the change will apply to all parallel replicas at that level. You must use the logical name to refer to the parallel region, which means that you cannot specify individual replicas.
Physical operator names in nested parallel regions are based on the global channel numbers. For example, consider a nested parallel region where the composite operator Foo is replicated with a width of 4 and contains the composite operator Bar, which is replicated with a width of 2 and contains the primitive operator Op, which is replicated with a width of 3. The physical operator Foo[3].Bar[7].Op[23] is the physical operator with the highest global channel number across all nesting levels. This replica of Foo.Bar.Op is the 24th and last in the entire parallel region, across all nesting levels.
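The bracketed indexes relate to one another arithmetically. Assuming channels are numbered in the outer-major order illustrated by the byChannel() example later in this section, each global index is the enclosing operator's global index multiplied by the local width, plus the local index; for Foo[3].Bar[7].Op[23] this works out as:
global(Bar) = global(Foo) * width(Bar) + local(Bar) = 3 * 2 + 1 = 7
global(Op)  = global(Bar) * width(Op)  + local(Op)  = 7 * 3 + 2 = 23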
But when you change the parallel region width, you must use the logical name, Foo.Bar.Op:
streamtool updateoperators myjob --force --parallelRegionWidth Foo.Bar.Op=2
After this parallel region width change, the physical operator with the highest global channel number across all nesting levels is now Foo[3].Bar[7].Op[15]. Using the logical operator name (such as Foo.Bar.Op) to refer to nested parallel regions is not a special case, but is actually an instance of the general case of referring to nested composite operators.
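The same approach should apply at any nesting level, because the change always targets a logical name. For example, assuming the middle region can be addressed as Foo.Bar in the same way (an assumption that follows from the naming rule above rather than a command shown in this documentation), widening Bar would look like:
streamtool updateoperators myjob --force --parallelRegionWidth Foo.Bar=3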
Functions that control operator behavior within parallel regions
- int32 getChannel()
  Returns an int32 which is the global index of the parallel channel.
  Note: This function returns -1 if the calling operator is not located in a parallel region.
- int32 getMaxChannels()
  Returns an int32 which is the global maximum number of parallel channels.
  Note: This function returns 0 if the calling operator is not located in a parallel region.
- int32 getLocalChannel()
  Returns an int32 which is the local index of the parallel channel in the closest enclosing parallel region.
  Note: This function returns -1 if the calling operator is not located in a parallel region.
- int32 getLocalMaxChannels()
  Returns an int32 which is the local number of parallel channels, that is, the number of channels in the closest enclosing parallel region.
  Note: This function returns 0 if the calling operator is not located in a parallel region.
- list<int32> getAllChannels()
  Returns a list of int32 values, where each element of the list is the global channel index that this operator is in. The 0th element of the list is the global channel index within the closest enclosing parallel region, which means that getAllChannels()[0] == getChannel().
  Note: This function returns an empty list if the calling operator is not located in a parallel region.
- list<int32> getAllMaxChannels()
  Returns a list of int32 values, where each element of the list is the global maximum number of parallel channels for each of the enclosing parallel regions. The 0th element is the global maximum number of channels for the closest enclosing parallel region, which means that getAllMaxChannels()[0] == getMaxChannels().
  Note: This function returns an empty list if the calling operator is not located in a parallel region.
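As a concrete illustration of these definitions, the following sketch is a hypothetical Custom invocation (not part of the earlier examples) that could be placed inside the Inner composite from the first example to trace the values that these functions return:
stream<int32 i> Traced = Custom(B) {
  logic
    onTuple B: {
      // In the first example (outer width 2, inner width 3), the definitions
      // above give: getChannel() in 0..5, getMaxChannels() == 6,
      // getLocalChannel() in 0..2, getLocalMaxChannels() == 3,
      // getAllChannels()[0] == getChannel(), and
      // getAllMaxChannels()[0] == getMaxChannels().
      appTrc(Trace.info,
             "global " + (rstring)getChannel() + " of " + (rstring)getMaxChannels() +
             ", local " + (rstring)getLocalChannel() + " of " + (rstring)getLocalMaxChannels());
      submit(B, Traced);
    }
}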
The following functions can be used in the partitionColocation, partitionExlocation, hostColocation and hostExlocation placement configs.
- rstring byChannel()
  Indicates that operators within a channel are to be fused together. This function computes a string from the global channel number and the name of the operator on which the placement config is placed. Consider the following example:
type Data = int32 i;

composite Par2(input I; output F1) {
  graph
    stream<Data> F1 = Functor(I) {}
}

composite Par1(input I; output P2) {
  graph
    stream<Data> P1 = Functor(I) {}

    @parallel(width=3)
    stream<Data> P2 = Par2(P1) {}
}

composite Main {
  graph
    stream<Data> Beat = Beacon() {}

    @parallel(width=2)
    stream<Data> P = Par1(Beat) {
      config placement: partitionColocation(byChannel());
    }

    () as Out = FileSink(P) {
      param file: "out.dat";
    }
}
  Due to the way configs propagate, all the operators that are part of the subgraph defined by the invocation of Par1 will see a partitionColocation with the string "P_" + (string)getChannel(). This ensures that one partition would contain all the operators from global channel 0 of the outer parallel region and global channels 0 through 2 of the inner parallel region, and another partition would contain all the operators from global channel 1 of the outer parallel region and global channels 3 through 5 of the inner parallel region.
- rstring byReplica()
  Indicates that replicated operators are fused together across channels. This function computes a string from the name of the operator on which the placement config is placed. Consider the following example:
type Data = int32 i;

composite Par2(input I; output F1) {
  graph
    stream<Data> F1 = Functor(I) {}
}

composite Par1(input I; output P2) {
  graph
    stream<Data> P1 = Functor(I) {}

    @parallel(width=3)
    stream<Data> P2 = Par2(P1) {
      config placement: partitionColocation(byReplica());
    }
}

composite Main {
  graph
    stream<Data> Beat = Beacon() {}

    @parallel(width=2)
    stream<Data> P = Par1(Beat) {}

    () as Out = FileSink(P) {
      param file: "out.dat";
    }
}
  In this example, all the operators in the subgraph defined by the invocation of Par2 would be in the same partition.
The @parallel annotation also has an option for host pool replication. The host pools specified in the annotation are replicated once for each channel in the region (leaving the original pool unused). When replicated, their tags are extended with the pattern _i, where i is a channel number. For example, if the tag "foo" is a replicated host tag, and it appears in a parallel region of size 3, the tags for each channel become "foo_0", "foo_1" and "foo_2". The following example replicates the innerHosts host pool tags in a nested parallel region:
composite Inner(input In; output C) {
  graph
    stream<int32 i> A = Functor(In) {
      config placement: host(innerHosts);
    }
    stream<int32 i> B = Functor(A) {
      config placement: host(innerHosts);
    }
    stream<int32 i> C = Functor(B) {
      config placement: host(innerHosts);
    }
}

composite Outer {
  graph
    stream<int32 i> Src = Beacon() {}

    @parallel(width=3, replicateHostTags=["innerHosts"])
    stream<int32 i> Op = Inner(Src) {}

    stream<int32 i> Sink = Custom(Op) {}
}

composite Main {
  graph
    @parallel(width=2)
    () as App = Outer() {}

  config hostPool: innerHosts = createPool({tags=["innerHosts"], size=1u}, Sys.Shared);
}
In this example, the generated host tags have the form "innerHosts_i_j", where i is either 0, 1 or 2 (for the inner width of 3), and j is 0 or 1 (for the outer width of 2).