Parallel annotation
To take advantage of data-parallelism, apply the @parallel annotation to invocations of primitive or composite operators. Data-parallelism means replicating copies of an operator and splitting its input streams so that each replica receives a different subset of the tuples.
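The replicate-and-split idea can be modeled in a few lines of Python. This is a conceptual sketch, not the Streams runtime. Tuples are modeled as dicts, `run_parallel` and `make_op` are hypothetical names, and the splitter is assumed to route round-robin. A factory creates one operator instance per channel so that stateful replicas do not share state.

```python
from itertools import cycle
from typing import Callable, Dict, Iterable, List

Op = Callable[[Dict], Dict]  # an operator maps one tuple (modeled as a dict) to one tuple


def run_parallel(make_op: Callable[[], Op], width: int, stream: Iterable[Dict]) -> List[List[Dict]]:
    """Replicate an operator `width` times and split `stream` across the copies.

    Returns the output produced by each channel, indexed by channel number.
    """
    # One independent operator instance per channel, so replicas never share state.
    replicas = [make_op() for _ in range(width)]
    outputs: List[List[Dict]] = [[] for _ in range(width)]
    # Assumed splitter policy for this sketch: plain round-robin over the channels.
    channels = cycle(range(width))
    for tup in stream:
        ch = next(channels)
        outputs[ch].append(replicas[ch](tup))
    return outputs
```

For example, `run_parallel(lambda: (lambda t: {"x": t["x"] * 2}), 3, [{"x": i} for i in range(5)])` returns `[[{'x': 0}, {'x': 6}], [{'x': 2}, {'x': 8}], [{'x': 4}]]`. Each channel sees only its share of the stream.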
To apply data-parallelism to an arbitrary group of operators, refactor those operators into a single composite operator, and then apply the @parallel annotation to the invocation of that composite operator.
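A minimal Python sketch of that refactoring, assuming operators can be modeled as functions from tuple to tuple. The hypothetical `make_composite` folds a chain of stages into one callable, and the hypothetical `replicate` gives each channel its own copy of the whole composite. It only illustrates why the group is replicated as a single unit.

```python
from typing import Callable, Dict, List

Op = Callable[[Dict], Dict]  # an operator maps one tuple (modeled as a dict) to one tuple


def make_composite(*stages: Op) -> Op:
    """Fold a chain of operators into one callable, like a composite operator."""
    def composite(tup: Dict) -> Dict:
        for stage in stages:  # each stage consumes the previous stage's output
            tup = stage(tup)
        return tup
    return composite


def replicate(make_op: Callable[[], Op], width: int) -> List[Op]:
    """Create one independent copy of the (composite) operator per channel."""
    return [make_op() for _ in range(width)]
```

With `parse = lambda t: {**t, "v": int(t["raw"])}` and `scale = lambda t: {**t, "v": t["v"] * 10}`, the call `replicate(lambda: make_composite(parse, scale), 4)` yields four channels. Each one runs both stages, so channel 2 maps `{"raw": "7"}` to `{"raw": "7", "v": 70}`.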
The parallel region name is the name of the logical operator that the @parallel annotation is applied to. You can change the width of a parallel region after the job is submitted by specifying the parallel region name and its new width. For more information, see Changing the degree of parallelism at run time.
Required elements
- width
- Specifies the number of channels in a parallel region. For each channel in a parallel region, Teracloud® Streams replicates the invoked operator. The width must be an int32 literal, an expression<int32> composite parameter, or a submission-time value that is cast to the int32 data type.
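Whatever its source, the width must end up as an int32 value. The hypothetical `resolve_width` helper sketches that conversion for a submission-time value supplied as text. The requirement that the width be at least 1 is an assumption added for illustration, and the checks that the compiler and runtime actually perform may differ.

```python
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def resolve_width(value) -> int:
    """Convert a literal int or a submission-time string into a channel count."""
    if isinstance(value, bool):
        raise TypeError("width must be an integer, not a bool")
    if isinstance(value, str):
        # Submission-time values arrive as text and must be cast to an integer.
        value = int(value.strip())
    if not isinstance(value, int):
        raise TypeError(f"width must be an int32, got {type(value).__name__}")
    if not INT32_MIN <= value <= INT32_MAX:
        raise ValueError(f"width {value} does not fit in int32")
    if value < 1:
        # Assumption for this sketch: a region needs at least one channel.
        raise ValueError("width must be at least 1")
    return value
```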
Optional elements
- partitionBy
- Specifies how to partition tuples across the channels for specific ports that enter a parallel region. Each port is partitioned individually and can be partitioned differently; there is one splitter per port. Partitioning a parallel region ensures that tuples with the same values for specific attributes are always routed to the same channel. These values are called partitioning keys, and they are specified as tuple attributes. The partitioning specification for a parallel region is supplied as a list of port and attribute pairs. The pairs are specified as follows:
- broadcast
- Specifies a list of ports whose tuples are not split but are broadcast to all channels. The named ports must be input ports of the operator invocation that the @parallel annotation is applied to.
- replicateHostTags
- Specifies the host tags that identify the host pools to replicate in a parallel region. When a host pool is replicated, each replica of the pool is used by only a single channel of the parallel region. By replicating host pools, you can assign sibling operators in a parallel region to different sets of hosts.
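The constraints on partitionBy and broadcast can be sketched as a validation step. This Python model is hypothetical. `ParallelSpec` stores partitionBy as a mapping from port name to key attributes rather than as SPL port and attribute pairs, and `input_ports` maps each input port of the invoked operator to the attribute names of its stream type. It checks that the named ports exist, that key attributes belong to the port's stream type, and that no port is both partitioned and broadcast. replicateHostTags is not modeled.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ParallelSpec:
    width: int
    # Hypothetical model of partitionBy: input port name -> partitioning key attributes.
    partition_by: Dict[str, List[str]] = field(default_factory=dict)
    # Input ports whose tuples go to every channel.
    broadcast: List[str] = field(default_factory=list)


def validate(spec: ParallelSpec, input_ports: Dict[str, List[str]]) -> None:
    """Raise ValueError if the spec does not fit the invoked operator's input ports.

    `input_ports` maps each input port name to the attribute names of its stream type.
    """
    for port, attrs in spec.partition_by.items():
        if port not in input_ports:
            raise ValueError(f"partitionBy names unknown input port {port!r}")
        missing = [a for a in attrs if a not in input_ports[port]]
        if missing:
            raise ValueError(f"port {port!r} has no attribute(s) {missing}")
    for port in spec.broadcast:
        if port not in input_ports:
            raise ValueError(f"broadcast names unknown input port {port!r}")
        if port in spec.partition_by:
            raise ValueError(f"port {port!r} cannot be both partitioned and broadcast")
```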
Examples
- Example 1: Parallel regions that are not partitioned
- Example 1 shows the simplest form of a parallel annotation, for a parallel region that is not partitioned. This example applies the @parallel annotation to the invocation of the Op operator. The annotation is placed immediately before the first output stream that is associated with the operator invocation; a newline between the annotation and the stream is not required. The width parameter specifies that the operator is replicated five times, so there are five channels. Therefore, the output port of the upstream operator that feeds the Op operator uses a splitter. The partitionBy element is not specified, so the splitter can route the tuples to any channel.
- Example 2: Partitions in a parallel region
- To partition a parallel region, you specify the partitioning keys for each input port. The partitioning keys are the tuple attributes that the splitter uses to create the partition, and they must be attributes of the input port's stream type. In Example 2, the splitter for the Input1 port partitions the tuples based on the attr1 and attr2 attributes. The partitioning for the Input1 port is implemented with a hash: the splitter routes each tuple based on the hash value of those attributes. The Input2 port does not specify a partitionBy element, so its splitter does not partition the tuples, and they are evenly distributed among the parallel channels.
- Example 3: Partitioning for two streams that are coming into one port
- Partitioning applies to all the streams that enter a port. Example 3 shows two streams that are coming into one port. Notice that the Input1 and Input2 streams are partitioned in the same way.
- Example 4: Broadcasting tuples
- Broadcasting tuples sends all tuples from a port to all channels. This option is useful if the operators in every channel of a parallel region need to receive the same data. Broadcasting is mutually exclusive with partitioning; a port can be broadcast or partitioned, but not both. In Example 4, the input port Updates contains tuples that arrive infrequently and represent periodic updates that the operators in all channels need to process. The port Input contains tuples that should be processed in parallel, partitioned on the attribute attr1.
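The routing behavior in Examples 2 to 4 can be modeled with one splitter per input port. In this Python sketch, `Splitter` is a hypothetical class. CRC-32 stands in for whatever hash function Streams actually uses; Python's built-in `hash` is avoided because string hashes change between runs. Unpartitioned ports are assumed to use round-robin routing.

```python
import zlib
from typing import Dict, List, Optional, Sequence


class Splitter:
    """Models the splitter for one input port of a parallel region."""

    def __init__(self, width: int, keys: Optional[Sequence[str]] = None, broadcast: bool = False):
        if keys and broadcast:
            raise ValueError("a port is either partitioned or broadcast, not both")
        self.width = width
        self.keys = list(keys) if keys else []
        self.broadcast = broadcast
        self._next = 0  # round-robin cursor, used only for unpartitioned ports

    def route(self, tup: Dict[str, object]) -> List[int]:
        """Return the channel(s) that receive `tup`."""
        if self.broadcast:
            # Every channel receives every tuple.
            return list(range(self.width))
        if self.keys:
            # Hash only the partitioning-key attributes: equal keys -> same channel.
            key = repr(tuple(tup[k] for k in self.keys)).encode()
            return [zlib.crc32(key) % self.width]
        # No partitionBy: spread tuples evenly (assumed round-robin).
        ch = self._next
        self._next = (self._next + 1) % self.width
        return [ch]
```

For Example 2 with a width of 3, `Splitter(3, keys=["attr1", "attr2"])` would model the Input1 port and `Splitter(3)` the Input2 port. In Example 3, both streams that feed Input1 would pass through that same splitter, which is why they are partitioned identically. For Example 4, `Splitter(3, broadcast=True)` would model Updates and `Splitter(3, keys=["attr1"])` would model Input.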