Parallel transformations and fusion
The parallel transformation has two primary purposes: replicating operators and replicating streams. If the width of a parallel region is N, then each logical operator in the parallel region becomes N physical operators. Physical operators that map back to the same logical operator are called sibling operators. Sibling operators are identical except for their names and indexes.
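The operator-replication rule can be modeled in a few lines. The sketch below is a hypothetical illustration, not the compiler's implementation. It assumes that sibling operators are named with a "name[index]" suffix, which is an illustrative naming scheme only. Each logical operator in a region of width N is expanded into N siblings that differ only in their channel index.

```python
"""Hypothetical model of operator replication in a parallel region.

Assumption: physical operator names use a "name[index]" suffix. This is an
illustrative naming scheme, not necessarily the one the compiler emits.
"""
from dataclasses import dataclass


@dataclass(frozen=True)
class PhysicalOperator:
    logical_name: str  # the logical operator this sibling maps back to
    channel: int       # index of the parallel channel (0 .. width-1)

    @property
    def name(self) -> str:
        # Siblings differ only in their name and index.
        return f"{self.logical_name}[{self.channel}]"


def replicate_operators(
    region_ops: list[str], width: int
) -> dict[str, list[PhysicalOperator]]:
    """Expand each logical operator in a region of `width` channels into siblings."""
    if width < 1:
        raise ValueError("width must be at least 1")
    return {op: [PhysicalOperator(op, ch) for ch in range(width)] for op in region_ops}


siblings = replicate_operators(["Parse", "Enrich"], width=3)
print([p.name for p in siblings["Parse"]])  # ['Parse[0]', 'Parse[1]', 'Parse[2]']
```

With a width of 3, the two logical operators become six physical operators: three siblings of Parse and three siblings of Enrich.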
All streams inside a parallel region must also be replicated. Streams between replicated operators are replicated so that they maintain the same connections as the logical version of the application. Streams that go into and out of parallel regions are also replicated. Every logical stream going into a parallel region is replicated N times, and the output port that produced it now has N outgoing physical streams in place of the logical stream. A splitter on that output port controls the N new streams. Every stream going out of a parallel region is also replicated N times, and all N copies converge on the same input port that consumed the original logical stream.
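The stream-replication rules for a single parallel region can be sketched as a graph rewrite over (producer, consumer) pairs. This is a hypothetical model under stated assumptions: operators are plain names, replicas use an illustrative "op[channel]" naming scheme, and directly adjacent parallel regions are not handled.

```python
"""Hypothetical model: how streams around ONE parallel region are replicated.

Assumptions: operators are plain names, streams are (producer, consumer)
pairs, and replicas are named "op[channel]". Adjacent regions are not modeled.
"""


def physical_name(op: str, region: set[str], channel: int) -> str:
    # Only operators inside the parallel region are replicated.
    return f"{op}[{channel}]" if op in region else op


def replicate_streams(
    streams: list[tuple[str, str]], region: set[str], width: int
) -> list[tuple[str, str]]:
    physical: list[tuple[str, str]] = []
    for src, dst in streams:
        if src not in region and dst not in region:
            physical.append((src, dst))  # outside the region: unchanged
            continue
        for ch in range(width):
            # Inside the region, channel ch connects only to channel ch.
            # Entering the region, one producer port fans out to `width`
            # streams (the splitter). Leaving it, `width` streams converge
            # on the same consumer input port.
            physical.append(
                (physical_name(src, region, ch), physical_name(dst, region, ch))
            )
    return physical


logical = [("Src", "A"), ("A", "B"), ("B", "Sink")]
edges = replicate_streams(logical, region={"A", "B"}, width=2)
print(edges)
```

For a region of width 2 containing A and B, the three logical streams become six physical streams: Src fans out to A[0] and A[1], each A[i] feeds only B[i], and B[0] and B[1] both converge on Sink.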
Because of the replication rules for operators and streams, directly adjacent parallel regions have a shuffle between them. Directly adjacent parallel regions are distinct parallel regions (generated by separate uses of @parallel) with streams that go directly from one parallel region into another. In such a case, the operator port with a splitter is itself in a parallel region, so it is also replicated. If the upstream parallel region has N channels and the downstream parallel region has M channels, then the upstream parallel region ends with N splitters that each have streams connecting to all M parallel channels in the downstream parallel region, for a total of N × M physical streams.
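The shuffle between directly adjacent regions can be checked by enumerating the connections. This is a hypothetical sketch that assumes the upstream region ends in an operator called `up` and the downstream region starts with an operator called `down`, using the same illustrative "op[i]" naming. Each of the N replicated splitters connects to every one of the M downstream channels.

```python
"""Hypothetical model of the shuffle between two directly adjacent parallel regions.

Assumption: the upstream region (width n) ends in operator `up` and the
downstream region (width m) starts with operator `down`; replicas are named "op[i]".
"""
from itertools import product


def shuffle_streams(up: str, down: str, n: int, m: int) -> list[tuple[str, str]]:
    # The splitter port is itself inside the upstream region, so it is
    # replicated n times; each replica connects to all m downstream channels.
    return [(f"{up}[{i}]", f"{down}[{j}]") for i, j in product(range(n), range(m))]


edges = shuffle_streams("A", "B", n=2, m=3)
print(len(edges))  # 6, that is, n * m physical streams
```

Each upstream replica has M outgoing streams and each downstream channel has N incoming streams, which is what makes the connection pattern a full shuffle.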
Following the parallel transformation, operators are fused into processing elements (PEs). How operators are fused depends on several factors, including the placement configs partitionIsolation, partitionColocation, and partitionExlocation, and the fusion mode that is specified when the job is submitted.
Placement configs are always respected. In general, placement configs that appear on operators in a parallel region are replicated along with the operators that they are associated with. For example, if you specify partitionColocation("A") on one operator, all of its replicas are colocated in the same PE. This might not be what you want.
You can use the intrinsic functions byChannel() and byReplica() to control how operators in a parallel region are fused into PEs. For more information, see Operator fusion in parallel regions.
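The effect of a replicated colocation tag can be illustrated with a simplified fusion model. This sketch is hypothetical: it groups physical operators into PEs purely by a colocation tag and ignores isolation, exlocation, and the fusion mode. The channel-derived tag only approximates the intent of byChannel(); it does not reproduce that function's actual return value, and byReplica() is not modeled.

```python
"""Hypothetical model: grouping replicated operators into PEs by colocation tag.

Simplifications (assumptions): operators sharing a tag always share a PE, and
isolation, exlocation, and the fusion mode are ignored. The "A-ch<n>" tags are
illustrative only and are NOT the real return value of byChannel().
"""
from collections import defaultdict
from typing import Callable


def fuse_by_colocation(
    width: int, ops: list[str], tag_for: Callable[[str, int], str]
) -> dict[str, list[str]]:
    """Map each colocation tag to the physical operators placed in that PE."""
    pes: dict[str, list[str]] = defaultdict(list)
    for ch in range(width):
        for op in ops:
            pes[tag_for(op, ch)].append(f"{op}[{ch}]")
    return dict(pes)


ops = ["Parse", "Enrich"]

# A literal tag such as "A" is replicated with every sibling,
# so all replicas of all channels collapse into a single PE.
same_tag = fuse_by_colocation(3, ops, lambda op, ch: "A")
print(len(same_tag))  # 1

# A tag derived from the channel index (the idea behind byChannel())
# yields one PE per channel instead.
per_channel = fuse_by_colocation(3, ops, lambda op, ch: f"A-ch{ch}")
print(sorted(per_channel))   # ['A-ch0', 'A-ch1', 'A-ch2']
print(per_channel["A-ch1"])  # ['Parse[1]', 'Enrich[1]']
```

In this model, the literal tag places all six replicas in one PE. The channel-derived tag keeps the operators of each channel together while separating the channels from one another.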
Examples of transformations for valid stream graphs
The following samples show how various stream graph topologies that use user-defined parallelism are transformed and then fused into PEs. Explicit fusion is used in all cases to constrain the possible fusion results. Each example provides the supporting sample code and a graphical representation of one or more of the parallel transformation rules.
The following legend explains the symbols that are used in the example graphics for the transformation rules:
A graphical legend of the symbols that are used in the example stream graphs that represent the parallel transformation rules.
