Developing stream applications with user-defined parallelism
The @parallel annotation allows you to take advantage of data parallelism in your Teracloud® Streams applications. With user-defined parallelism, you can:
- Define parallel regions in your application. Each region replicates all of its operators and automatically creates and connects new streams to the replicated operators.
- Specify the amount of parallelism (how many replicas to generate, also called the parallel width) at compile or submission time.
- Optionally partition the parallel region based on tuple attribute values. The SPL runtime will ensure that tuples with the same attribute values are sent to the same operators.
- Optionally broadcast tuples from selected streams to all operators in the parallel region.
- Preserve the tuple or punctuation order in a parallel region.
- Maintain state consistency across the replicated operators.
To use user-defined parallelism, add the @parallel annotation to the invocation of either a primitive or a composite operator. When applied to a primitive operator, the parallel region consists of just that operator. When applied to a composite operator, the parallel region consists of all operators inside that composite operator. Parallelized composite operators can themselves contain parallel regions, resulting in nested parallel regions. Parallel regions are composed of parallel channels, where each channel is an independent set of replicated operators. Parallel channels can contain multiple operators, with multiple input and output streams. A parallel region has the same name as the logical operator (either primitive or composite) to which the @parallel annotation is applied.
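For illustration, here is a minimal sketch of both forms. The stream names, file names, and the Score composite are hypothetical, not part of the product samples:

```
composite Score(input In; output Out) {
    graph
        stream<rstring word, float64 score> Out = Functor(In) {
            output Out : score = (float64)length(In.word);
        }
}

composite Main {
    graph
        stream<rstring line> Raw = FileSource() {
            param
                file   : "input.txt";
                format : line;
        }

        // Parallel region named "Words": just this primitive operator,
        // replicated across three channels.
        @parallel(width = 3)
        stream<rstring word> Words = Functor(Raw) {
            output Words : word = lower(Raw.line);
        }

        // Parallel region named "Scored": every operator inside the
        // Score composite is replicated across two channels.
        @parallel(width = 2)
        stream<rstring word, float64 score> Scored = Score(Words) {
        }

        () as Sink = FileSink(Scored) {
            param file : "scores.txt";
        }
}
```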
The number of channels in a parallel region is determined by the width parameter of the @parallel annotation.
The process of replicating the operators and all of the necessary streams inside a parallel region is called the parallel transformation. Because the parallel transformation is performed at submission time, you can specify the width as a submission-time value. Delaying the decision of how much parallelism to use until submission time allows you to compile an application once and change its level of parallelism on each resubmission without recompiling.
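For example, the width can be read from a named submission-time value with the SPL getSubmissionTimeValue function; the name "parallelWidth" and the default "2" below are arbitrary choices for this sketch:

```
// Width is resolved when the job is submitted, so the application can be
// compiled once and resubmitted with a different degree of parallelism.
@parallel(width = (int32)getSubmissionTimeValue("parallelWidth", "2"))
stream<rstring word> Words = Functor(Raw) {
    output Words : word = lower(Raw.line);
}
```

If your deployment provides the streamtool command, the value can then be supplied at submission, for example with streamtool submitjob -P parallelWidth=8.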
Operators that produce streams that flow into a parallel region must distribute those tuples across the channels of the region. This distribution is done by a splitter, the runtime component that splits a stream of tuples and distributes them to the different channels of a parallel region. Splitters exist on the output ports that are immediately upstream from the parallel region. Splitters can also be nested and contain other splitters; a splitter submits tuples and punctuation to all of its nested splitters.

If you want to ensure that tuples with specific attribute values are always routed to the same channel, you can partition the parallel region. The partitions are defined by a set of user-defined partition keys that are composed of tuple attributes.
- If the incoming stream is not set to broadcast, each tuple is routed to a single channel only, regardless of whether the parallel region is partitioned:
  - When the parallel region is not partitioned, the splitter routes tuples so that they are distributed evenly across the channels.
  - When the parallel region is partitioned, the splitter creates a hash from the values of the partition-key attributes of each tuple and uses it to determine which channel receives the tuple (see the sketch after this list).
- For streams that are set to broadcast, all tuples are routed to all channels.
- Each window punctuation and final punctuation is routed to all channels. Because punctuations are logical markers in a stream, all the channels require all the punctuation.
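As a sketch of both routing options, the annotation below partitions a hypothetical Txns stream on its userId attribute and broadcasts a hypothetical Rules stream to every channel; all names here are illustrative, and the upstream Txns and Rules streams are assumed to exist:

```
// Tuples from Txns with equal userId values always reach the same channel;
// every tuple from Rules is delivered to all four channels.
@parallel(width = 4,
          partitionBy = [{port = Txns, attributes = [userId]}],
          broadcast = [Rules])
stream<int64 userId, float64 amount> Flagged = Custom(Txns; Rules) {
    logic
        onTuple Txns : submit(Txns, Flagged);
        onTuple Rules : {
            // Update per-channel rule state here.
        }
}
```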
Parallel regions can contain source operators, sink operators, and Import and Export operators. Source and sink operators are replicated in the parallel region in the same way as other operators. However, because source operators have no incoming data streams, no splitter is involved. To achieve data parallelism, such parallel regions must invoke the source operators in a way that partitions the data before it enters the stream application.
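For example, a replicated source operator can use the SPL getChannel function to select a per-channel slice of the input. In this sketch, each channel reads its own pre-partitioned file; the file naming scheme is hypothetical:

```
// Each of the four FileSource replicas reads a different input file,
// so the data is partitioned without a splitter.
@parallel(width = 4)
stream<rstring line> Lines = FileSource() {
    param
        file   : "input_" + (rstring)getChannel() + ".txt";
        format : line;
}
```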
The actual mechanisms used at run time to achieve parallel execution depend on both how the application was fused and the threading model for the PEs in the application. When parallel operators are in separate PEs, they naturally execute in parallel. How parallel operators are fused depends on the fusion scheme and their partitionColocation, partitionExlocation, and partitionIsolation placement configuration options. When a PE is under the manual threading model, and the splitter is in the same PE as operators in the parallel region that it sends tuples to, the runtime injects threaded ports to ensure parallel execution. When a PE is under the dynamic threading model, the runtime does not inject threaded ports, but instead allows the parallelized operators to be executed by the dynamic thread pool. For more details about fusion schemes, see Specifying how operators are fused when you submit a job. For more on threading models, see the SPL threading annotation.
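As one illustration, a placement config can be combined with @parallel to keep the replicated operators out of a shared PE; the stream names here are hypothetical:

```
@parallel(width = 4)
stream<rstring word> Cleaned = Functor(Words) {
    output Cleaned : word = lower(Words.word);
    // Place each replicated operator in its own partition (PE),
    // so the channels run as separate processes.
    config placement : partitionIsolation;
}
```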
The parallel transformation connects all the outgoing physical streams from the end of each channel to the same input port that the logical stream originally connected to, outside of the parallel region. Because no merge operation is performed on the tuples flowing out of a parallel region, tuple order is not preserved. For example, if tuples A, B, C, and D arrive at a parallel region in that order, the operators downstream from the parallel region can receive those tuples in any order, such as D, A, C, B. In the cases where Teracloud® Streams typically preserves tuple order, the use of user-defined parallelism breaks that order. If your application requires tuple order to be preserved, you can implement your own merging operator outside of the parallel region.
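For example, one common approach is to tag each tuple with a sequence number before the parallel region and re-order tuples after it. The following sketch assumes each tuple carries an int64 seq attribute that was assigned upstream; all names are hypothetical:

```
stream<int64 seq, rstring payload> Ordered = Custom(Results) {
    logic
        state : {
            mutable map<int64, tuple<int64 seq, rstring payload>> pending = {};
            mutable int64 next = 0l;
        }
        onTuple Results : {
            // Buffer out-of-order arrivals, then release any run of
            // consecutive sequence numbers starting at `next`.
            insertM(pending, seq, Results);
            while (has(pending, next)) {
                submit(pending[next], Ordered);
                removeM(pending, next);
                next += 1l;
            }
        }
}
```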