User-defined parallelism
User-defined parallelism, through the @parallel annotation, allows you to easily take advantage of
data-parallelism in your Teracloud® Streams applications. With user-defined parallelism, you can:
- Define parallel regions in your application. The runtime replicates all operators in each region and automatically creates and connects new streams to the replicated operators.
- Specify the amount of parallelism (how many replicas to generate, also called the parallel width) at compile or submission time.
- Optionally partition the parallel region based on tuple attribute values. The SPL runtime will ensure that tuples with the same attribute values are sent to the same operators.
- Optionally broadcast tuples from selected streams to all operators in the parallel region.
User-defined parallelism does not do the following:
- Preserve the tuple or punctuation order in a parallel region.
- Maintain state consistency across the replicated operators.
To use user-defined parallelism, add the @parallel annotation to the invocation of either a
primitive or a composite operator. When applied to a primitive operator, the parallel region
consists of only that primitive operator. When applied to a composite operator, the parallel region
consists of all operators inside that composite operator. Parallelized composite operators can
themselves contain parallel regions, resulting in nested parallel regions. Parallel regions are
composed of parallel channels, where each channel is an independent set of replicated operators.
Parallel channels can contain multiple operators, with multiple input and output streams. A parallel
region has the same name as the logical operator (either primitive or composite) that the @parallel
annotation is applied to.
The number of channels in a parallel region is determined by the width parameter of the @parallel
annotation. The process of actually replicating the
operators and all of the necessary streams inside of a parallel region is the parallel
transformation. Because the parallel transformation is performed at submission time, you can
specify the width as a submission time value. Delaying the decision of how much
parallelism to use until submission time allows you to compile an application once, and change its
level of parallelism on each resubmission without recompiling.
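To make the effect of the width concrete, the following Python sketch models the parallel transformation as a plain function. It is not Streams code: the function name `expand_region`, the operator names, and the `name[channel]` labels for replicas are assumptions chosen only for illustration.

```python
def expand_region(operators, width):
    """Return one list of replica labels per channel.

    operators: logical operator names inside the parallel region.
    width: number of channels, supplied when the job is submitted.
    """
    if width < 1:
        raise ValueError("width must be at least 1")
    return [[f"{name}[{channel}]" for name in operators]
            for channel in range(width)]


# The same logical region expanded with two different widths,
# as if the job were submitted twice without recompiling.
logical_region = ["Parse", "Enrich"]
narrow = expand_region(logical_region, 2)
wide = expand_region(logical_region, 4)
print(narrow)  # [['Parse[0]', 'Enrich[0]'], ['Parse[1]', 'Enrich[1]']]
print(len(wide))  # 4
```

The logical description of the region stays fixed; only the width argument changes between the two calls.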
Operators that produce streams that flow into a parallel region must distribute those tuples across the channels of the parallel region. This distribution is done by a splitter, which is the runtime component that splits the stream of tuples and distributes them to the different channels of a parallel region. The splitters exist on the output ports that are immediately upstream from the parallel region. The splitters can also be nested and contain other splitters; a splitter submits the tuples and punctuation to all its nested splitters. If you want to ensure that tuples with specific attribute values are always routed to the same channel, you can partition the parallel regions. The partitions are defined by a set of user-defined partition keys that are composed of tuple attributes.
- Regardless of whether the parallel region is partitioned, if the incoming stream is not set to broadcast, each tuple is routed to a single channel only:
  - When the parallel region is not partitioned, the splitter routes the tuple to maintain an even distribution of tuples to the channels.
  - When the parallel region is partitioned, the splitter uses the partition keys, which are the values of the key attributes of each tuple, to determine where it routes the tuple. The splitter routes each tuple by creating a hash from those attribute values.
- For streams that are set to broadcast, all tuples are routed to all channels.
- Each window punctuation and final punctuation is routed to all channels. Because punctuations are logical markers in a stream, all the channels require all the punctuation.
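The routing rules above can be sketched as a small Python model. This is not the Streams runtime: the class name `Splitter` is hypothetical, round-robin is only one assumed way to keep the distribution even, and CRC32 stands in for whatever hash the runtime actually uses.

```python
import itertools
import zlib


class Splitter:
    """Toy model of a splitter that feeds `width` channels."""

    def __init__(self, width, partition_keys=None, broadcast=False):
        self.width = width
        self.partition_keys = partition_keys  # e.g. ["symbol"], or None
        self.broadcast = broadcast
        self._cursor = itertools.cycle(range(width))  # round-robin state

    def route_tuple(self, tup):
        """Return the channel indexes that receive this tuple."""
        if self.broadcast:
            return list(range(self.width))
        if self.partition_keys:
            # Hash only the key attributes so equal keys share a channel.
            key = repr([tup[k] for k in self.partition_keys]).encode()
            return [zlib.crc32(key) % self.width]
        # Not partitioned: spread tuples evenly (assumed round-robin).
        return [next(self._cursor)]

    def route_punctuation(self):
        """Window and final punctuation go to every channel."""
        return list(range(self.width))


plain = Splitter(3)
print([plain.route_tuple({"symbol": s}) for s in "ABCD"])  # [[0], [1], [2], [0]]

keyed = Splitter(3, partition_keys=["symbol"])
first = keyed.route_tuple({"symbol": "XYZ", "price": 1.0})
second = keyed.route_tuple({"symbol": "XYZ", "price": 2.0})
print(first == second)  # True
```

In this model, tuples with the same `symbol` always land on the same channel because the hash ignores the other attributes.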
Parallel regions can contain source operators, sink operators, and Import and Export operators. In
the same way that other operators are replicated in the parallel region, source and sink operators
are also replicated. However, since source operators have no incoming data streams, no splitter is
involved. To use data parallelism, these parallel regions must invoke the source operators in a way
that partitions the data before it enters the Streams application.
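One way to picture this is that each replica of a source operator uses its own channel index to claim a disjoint share of the external data. The Python sketch below models that idea only; the function `files_for_channel`, the file names, and the modulo scheme are hypothetical and are not part of Streams.

```python
def files_for_channel(all_files, channel, width):
    """Return the files that the source replica in `channel` should read.

    Every file is claimed by exactly one channel, so no two replicas
    read the same data.
    """
    if not 0 <= channel < width:
        raise ValueError("channel must be in range(width)")
    return [f for i, f in enumerate(sorted(all_files)) if i % width == channel]


files = ["part-0.csv", "part-1.csv", "part-2.csv", "part-3.csv", "part-4.csv"]
for channel in range(2):
    print(channel, files_for_channel(files, channel, 2))
# 0 ['part-0.csv', 'part-2.csv', 'part-4.csv']
# 1 ['part-1.csv', 'part-3.csv']
```

Because the split happens before any tuple exists, no splitter is needed: each replica simply reads a different subset of the input.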
The actual mechanisms used at runtime to achieve parallel execution depend on both how the application was fused and the threading model for the PEs in the application. When parallel operators are in separate PEs, they naturally execute in parallel. How parallel operators are fused depends on the fusion scheme and their partitionColocation, partitionExlocation, and partitionIsolation placement configuration options. When the PE is under the manual threading model, and the splitter is in the same PE as operators in the parallel region that it sends tuples to, the runtime injects threaded ports to ensure parallel execution. When the PE is under the dynamic threading model, the runtime does not inject threaded ports, but instead allows the parallelized operators to be executed by the dynamic thread pool. For more details about fusion schemes, see Specifying how operators are fused when you submit a job. For more on threading models, see the SPL threading annotation.
The parallel transformation connects all the outgoing physical streams from the end of each channel to the same input port that the logical stream originally connected to, outside of the parallel region. Because no merge operation is performed for the tuples flowing out of a parallel region, tuple order is not preserved. For example, if tuples A, B, C, and D arrive at a parallel region in that order, the operators that are downstream from the parallel region can receive these tuples in any order, such as D, A, C, B. In cases where Teracloud® Streams typically preserves the tuple order, the use of user-defined parallelism breaks that tuple order. If your application requires preserving tuple order, you can implement your own merging operators outside of the parallel region.
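As a rough illustration of such a merging step, the Python sketch below restores arrival order downstream of a parallel region. It assumes that an upstream operator stamps every tuple with a consecutive sequence number starting at 0 and that no tuple is dropped inside the region. Neither assumption comes from Streams itself, and the class name `OrderedMerger` is hypothetical.

```python
class OrderedMerger:
    """Buffers out-of-order tuples and releases them by sequence number."""

    def __init__(self):
        self._next_seq = 0   # sequence number that must be emitted next
        self._pending = {}   # seq -> tuple, held until its turn comes

    def submit(self, seq, tup):
        """Accept one tuple; return the tuples that are now in order."""
        self._pending[seq] = tup
        ready = []
        while self._next_seq in self._pending:
            ready.append(self._pending.pop(self._next_seq))
            self._next_seq += 1
        return ready


merger = OrderedMerger()
# Tuples A, B, C, D (seq 0..3) leave the parallel region as D, A, C, B.
released = []
for seq, tup in [(3, "D"), (0, "A"), (2, "C"), (1, "B")]:
    released.extend(merger.submit(seq, tup))
print(released)  # ['A', 'B', 'C', 'D']
```

The trade-off in this design is memory: if one channel stalls, the pending buffer grows until the missing sequence number arrives, so a real merging operator would likely need a bound or a timeout.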