Consume all partitions
A single KafkaConsumer operator consumes all messages from a topic regardless of the number of partitions.
Details
Without a partition specification, the operator consumes from all partitions of the topic. Kafka assigns the partitions of the subscribed topic, and the operator forms a consumer group with a single member. When no group identifier is specified, the operator generates one. When partitions are added to the topic, the operator is automatically assigned the new partitions. On the other hand, partitions can be de-assigned and re-assigned when:
- Group management related timers expire
- The group coordinator node goes down
- Metadata of the subscribed topic changes (for example, the number of partitions)
After a partition re-assignment, the consumer replays Kafka messages beginning at the last committed offsets.
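Because committed offsets are stored per group identifier, it can be useful to make the group explicit rather than relying on a generated one. A minimal sketch, assuming the operator's groupId parameter; the group name is a placeholder, and group.id can equivalently be set in the consumer properties file:

stream<rstring json> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
        // placeholder group name; alternatively set group.id
        // in etc/consumer.properties
        groupId: "myConsumerGroup";
        outputMessageAttributeName: "json";
}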
Pros and Cons
- Pro: Very simple
- Con: Throughput is limited because a single operator reads the messages from all partitions
Guaranteed processing
- Consistent region: Supported (periodic and operator driven)
- Checkpointing via config checkpoint: Supported, but effectively a no-op; the operator does not save any data.
When the operator is used in a consistent region, at-least-once processing through the Streams application is guaranteed. Without a consistent region, tuples can be lost within the Streams application when a PE restarts.
Operator configuration
The partition parameter must not be specified. Without a partition assignment, the operator subscribes to the topic and lets Kafka assign the partitions.
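For contrast, a sketch of what this pattern avoids (the partition number is illustrative): specifying the partition parameter would statically assign partitions to the consumer instead of subscribing with group management.

stream<rstring json> FromPartitionZero = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
        partition: 0;   // static assignment - not used in this pattern
        outputMessageAttributeName: "json";
}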
Examples
Without a consistent region:

stream<rstring json> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
        outputMessageAttributeName: "json";
}
With a consistent region, operator driven trigger:

() as JCP = JobControlPlane() {}

@consistent(trigger = operatorDriven)
stream<rstring json, rstring messageKey> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
        outputMessageAttributeName: "json";
        outputKeyAttributeName: "messageKey";
        triggerCount: 10000;   // make the region consistent every 10000 tuples
}
With a consistent region, periodic trigger:

() as JCP = JobControlPlane() {}

@consistent(trigger = periodic, period = 60.0)
stream<rstring message, rstring key> Messages = KafkaConsumer() {
    param
        propertiesFile: "etc/consumer.properties";
        topic: "myTopic";
}
Since message and key are the default attribute names for the Kafka message and the message key, they do not need to be specified.