SPL File Types.spl
Content
- Types
-
- Control.TopicPartition: Tuple type for a topic partition to be added or removed
- Control.TopicPartitionOffset: Tuple type for a topic partition with offset to be added
- MessageType.BlobMessage: This type represents a message with message attribute of type blob and key attribute with SPL type rstring.
- MessageType.ConsumerMessageMetadata: This type represents the meta data of a received message.
- MessageType.StringMessage: This type represents a message with message and key attribute, both being rstring.
- MessageType.TopicPartition: This type represents a topic and a partition number.
Composites
composite MessageType
Defines Message types with default attribute names and types.
Static Types
- MessageType.BlobMessage = blob message, rstring key;
-
This type represents a message with message attribute of type blob and key attribute with SPL type rstring. The type can be used for consumed messages or messages to be produced.
Example use:
// messages get de-serialized into blob stream <MessageType.BlobMessage> ReceivedMessages = KafkaConsumer() { param topic: "myTopic"; groupId: "consumerGroup1"; propertiesFile: "etc/consumer.properties"; }
- MessageType.ConsumerMessageMetadata = rstring topic, int32 partition, int64 offset, int64 messageTimestamp;
-
This type represents the meta data of a received message.
It consists of following attributes:- topic (rstring)
- partition (int32)
- offset (int64)
- messageTimestamp (int64) - milliseconds since Unix Epoch
Example Use:
stream <MessageType.BlobMessage, MessageType.ConsumerMessageMetadata> ReceivedMessages = KafkaConsumer() { param topic: "myTopic"; groupId: "consumerGroup1"; propertiesFile: "etc/consumer.properties"; }
- MessageType.StringMessage = rstring message, rstring key;
-
This type represents a message with message and key attribute, both being rstring. The type can be used for consumed messages and messages to be produced.
Example use:// messages get de-serialized into rstring stream <MessageType.StringMessage> ReceivedMessages = KafkaConsumer() { param topic: "myTopic"; groupId: "consumerGroup1"; propertiesFile: "etc/consumer.properties"; }
- MessageType.TopicPartition = rstring topic, int32 partition;
-
This type represents a topic and a partition number. It can be used to store meta data of received messages and to specify the topic and partition number of messages to be published (produced).
composite Control
Defines types for the JSON generator functions for the control port.
Static Types
- Control.TopicPartition = rstring topic, int32 partition;
-
Tuple type for a topic partition to be added or removed
Example Use:
mutable list <Control.TopicPartition> partitionsToAdd = []; appendM (partitionsToAdd, {topic = 'topic1', partition = 0}); appendM (partitionsToAdd, {topic = 'topic1', partition = 1}); submit ({controlMsg = createMessageAddTopicPartition (partitionsToAdd)}, ControlMessages);
- Control.TopicPartitionOffset = rstring topic, int32 partition, int64 offset;
-
Tuple type for a topic partition with offset to be added
Example Use:
mutable list <Control.TopicPartitionOffset> partitionsWithOffsetsToAdd = []; appendM (partitionsWithOffsetsToAdd, {topic = 'topic1', partition = 0, offset = -2l}); appendM (partitionsWithOffsetsToAdd, {topic = 'topic1', partition = 1, offset = 345678l}); submit ({controlMsg = createMessageAddTopicPartition (partitionsWithOffsetsToAdd)}, ControlMessages);