Threading annotation
The SPL @threading annotation determines the threading model to use for the operators inside a composite operator. The @threading annotation can be applied to a composite definition or invocation. This annotation can nest, and the innermost annotation takes precedence. This nesting makes it possible to specify one threading model for most of the application, but switch to other threading models for subsets of the application.
The following rules determine which @threading annotation applies to an operator:
- The composite invocation takes precedence over the composite definition.
- The most deeply nested annotation takes precedence.
- All operators in a PE under the dynamic threading model will share the same thread pool, even if the operators' threading models were specified by different threading annotations.
- Operators in a PE might have conflicting dynamic threading requests for the number of threads and elasticity. These conflicts are resolved by always favoring the higher number of requested threads, and favoring turning elasticity off.
- The PE respects all preexisting threaded ports, even when the dynamic or dedicated threading model is in effect.
The threads in a PE come from the following sources:
- Operator threads. An operator thread is any thread started by an operator. Source operators always have their own thread, and non-source operators can also have their own threads.
- PE input port threads. Each PE input port has its own thread, which receives tuples from the network and submits those tuples downstream.
- Threaded ports. Developers can explicitly add threads to their application through the threadedPort configuration option. A threaded port executes the operator it is applied to, and all downstream operators, until it reaches an operator that runs under a different thread.
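As a minimal sketch of an explicit threaded port (the operator and stream names here are illustrative, not from the original):

```
composite Main {
  graph
    stream<rstring line> Lines = FileSource() {
      param file: "input.txt";
    }
    // The threaded port on input stream Lines gives this operator (and
    // all downstream operators in the same PE) their own execution
    // thread. Sys.Wait blocks the submitting thread when the queue of
    // 1000 tuples is full.
    stream<rstring line> Filtered = Filter(Lines) {
      param filter: length(line) > 0;
      config threadedPort: queue(Lines, Sys.Wait, 1000);
    }
}
```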
Required elements
- model
- Specifies the threading model. The following options are supported:
- manual
- Each operator input port is executed by the thread that executed the operator upstream from it, unless the programmer manually adds a threaded port to an operator input port. The SPL runtime does not insert any threads. All threads come from operators, explicitly requested threaded ports, threaded ports injected by user-defined parallelism, or PE input ports. This threading model favors reducing individual tuple latency over throughput.
- dynamic
- Each operator input port can be executed by any thread. The assignment of threads to input
ports can change at runtime, as can the number of available threads. By default, the dynamic
threading model will start with the default number of threads, and periodically increase or
decrease the number of threads to improve overall PE throughput. This threading model favors
increasing overall throughput at the expense of individual tuple latency. The following
options are supported:
- threads
- An integer value that specifies the number of threads to start with; values must be greater than or equal to 1. The default number of threads is one more than the maximum number of input ports for a single operator across all operators in the PE. This default ensures that there are enough threads to simultaneously execute all input ports for any given operator, plus an additional thread to process other operators.
- elastic
- A boolean value that controls whether elasticity is:
- Active (true)
- This is the default value. If elasticity is active, the SPL runtime periodically evaluates whether it should change the number of active threads, as well as the number of operators under the dynamic threading model, to improve overall PE throughput. Adjusting the number of operators under the dynamic threading model means that the input ports of an operator under that model can be executed either by any thread in the thread pool or by the thread that executed the upstream operator, whichever maximizes overall PE throughput.
- Inactive (false)
- If elasticity is inactive, the number of threads does not change during the lifetime of the PE, and the number of operators under the dynamic threading model remains the same. As a consequence, when elasticity is inactive, the threads element controls the number of threads for the lifetime of the PE.
- automatic
- During PE initialization, the SPL runtime chooses between the dynamic and manual threading models. The SPL runtime uses both application information (such as the number of operators and the number of preexisting threads in the PE) and system information (such as the number of logical processors on the host) to make the decision. While the SPL runtime tries to choose the best threading model for the PE and the system it runs on, it is not guaranteed to do so. This option is the default.
- dedicated
- Each operator input port has a dedicated thread that processes all tuples on that input port for that operator. Note that under this threading model, the number of additional threads in a PE is entirely determined by the number of operator input ports in the PE. If the number of operator input ports is greater than the number of logical processors on the host, it is possible to over-subscribe the system. This threading model favors increasing overall throughput over individual tuple latency, but it is not adaptive.
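Summarizing the annotation syntax, the four models and the dynamic-only sub-elements look like the following (a sketch; the thread count of 4 is an arbitrary illustration):

```
@threading(model=manual)                            // no runtime-inserted threads
@threading(model=dynamic, threads=4, elastic=true)  // shared, adaptive thread pool
@threading(model=automatic)                         // runtime picks manual or dynamic (default)
@threading(model=dedicated)                         // one thread per operator input port
```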
Examples
- Example 1: Setting the automatic annotation on the main composite
- Applying the automatic threading model on the main composite definition means that the entire SPL application will execute under the automatic threading model. This is the default behavior. Each PE in the application will independently determine if it should use the manual or dynamic threading model.
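A minimal sketch of this example (the composite and operator names are illustrative, not from the original):

```
// model=automatic on the main composite definition covers the whole
// application. This is also the default, so the annotation is shown
// only for clarity.
@threading(model=automatic)
composite Main {
  graph
    stream<rstring line> Lines = FileSource() {
      param file: "input.txt";
    }
    () as Sink = FileSink(Lines) {
      param file: "output.txt";
    }
}
```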
- Example 2: Setting the dynamic annotation on the main composite
- Applying the dynamic threading model on the main composite chooses the dynamic threading model for the entire SPL application. This example also explicitly sets the number of threads and turns off elasticity. Note that each PE in this application will have 8 threads and elasticity will be inactive. In general, explicitly setting the number of threads and turning off elasticity should only be done when the developer knows exactly how the application will be fused, and knows exactly what resources are available on the hosts each PE will run on.
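A sketch of this example (the composite name is illustrative):

```
// Dynamic model for the entire application, with an explicit thread
// count and elasticity turned off: every PE starts and stays at
// 8 threads for its lifetime.
@threading(model=dynamic, threads=8, elastic=false)
composite Main {
  graph
    // ... application graph ...
}
```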
- Example 3: Overriding the model for a specific invocation
- This example applies the dynamic threading model to the main composite definition, but then applies the manual threading model to a particular composite invocation. Most of the application will execute under the dynamic threading model, but the subset of the application in the Ingest composite will execute under the manual threading model. The fusion and host placement constraints in the Ingest composite mean that all of the operators in it will be fused into the same PE, and will run on a particular host with special access to the outside network. The developer knows that the first-level parsing operator is resource-intensive, and that to improve throughput, it should be executed by a separate thread. To achieve this, the developer places a threaded port on that operator. The subsequent operators are lightweight, and run on the same thread as the heavy-weight parsing operator. Because the developer knows the performance characteristics of the operators in the Ingest composite, they can switch to the manual threading model and manually place threads where they know they should go.
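A sketch of this example (apart from Ingest, the operator and stream names, including the Parse operator, are illustrative):

```
@threading(model=dynamic)
composite Main {
  graph
    // The annotation on this invocation overrides the enclosing one.
    @threading(model=manual)
    stream<rstring msg> Ingested = Ingest() {}
    // ... the rest of the application runs under the dynamic model ...
}

composite Ingest(output stream<rstring msg> Out) {
  graph
    stream<rstring raw> Raw = TCPSource() {
      param role: server; port: 9999u;
    }
    // Under the manual model, the only added thread is this explicit
    // threaded port, which executes the heavy-weight Parse operator
    // and the lightweight operators downstream of it.
    stream<rstring msg> Out = Parse(Raw) {
      config threadedPort: queue(Raw, Sys.Wait, 1000);
    }
}
```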
- Example 4: Overriding the threading model on a composite definition
- This example is similar to Example 3, but with two key differences. First, the Ingress composite exclusively contains operators with heavy-weight processing. In this case, the developer wants to place threads between each operator, and the dedicated threading model is most appropriate. Second, the threading model annotation goes on the composite definition, not its invocation. For these examples, that does not change the behavior. In general, however, applying a threading model to a composite definition means that the threading model applies to every invocation of that composite. Applying a threading model to just an invocation applies that threading model to only that single invocation. Additionally, if that composite operator has a threading model on its definition, the threading model from the invocation will override it.
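A sketch of this example (apart from Ingress, the names are illustrative):

```
@threading(model=dynamic)
composite Main {
  graph
    stream<rstring msg> Ingested = Ingress() {}
    // ... the rest of the application runs under the dynamic model ...
}

// The annotation on the definition applies to every invocation of
// Ingress: each operator input port inside it gets its own thread.
@threading(model=dedicated)
composite Ingress(output stream<rstring msg> Out) {
  graph
    // ... heavy-weight processing operators ...
}
```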