Examples

The following code block showcases several ways to use the KafkaConsumer:

use com.teracloud.streams.kafka::*;
public composite KafkaConsumerSample {
  graph
    // Basic single topic example
    stream<MessageType.StringMessage> SingleTopicMessages = KafkaConsumer() {
      param
        topic : "test";
        propertiesFile : "etc/consumer.properties";
    }

    // Multi-topic example
    stream<MessageType.StringMessage> MultiTopicMessages = KafkaConsumer() {
      param
        topic : "test,test2";
        propertiesFile : "etc/consumer.properties";
    }

    // Multi-topic partitioning example
    stream<MessageType.StringMessage> MultiTopicPartitionMessages = KafkaConsumer() {
      param
        topic : "test,test2";
        partition : 0, 1 ; // Assign consumer to partition 0 and 1 for both test and test2
        propertiesFile : "etc/consumer.properties";
    }

    // Multi-topic regex example
    stream<MessageType.StringMessage> MultiTopicRegexMessages = KafkaConsumer() {
      param
        pattern : "test.*";
        propertiesFile : "etc/consumer.properties";
    }

    // Dynamic consumer group example
    @parallel(width = 3)
    stream<MessageType.StringMessage> DynamicGroupMessages = KafkaConsumer() {
      param
        topic : "test";
        groupId : "myConsumerGroup";
        propertiesFile : "etc/consumer.properties";
    }

    // Static consumer group example
    @parallel(width = 3)
    stream<MessageType.StringMessage> StaticGroupMessages = KafkaConsumer() {
      param
        topic : "test2";
        groupId : "myGroup";
        staticGroupMember : true;
        propertiesFile : "etc/consumer.properties";
    }
}