Examples
These examples use the S3ObjectStorageSink operator.
a) S3ObjectStorageSink creating objects of size 200 bytes with an incrementing number in the object name. The endpoint used is the public us-geo (cross-region) endpoint, which is the default value of the os-endpoint submission parameter.
// Example a): feeds Beacon-generated strings into S3ObjectStorageSink,
// using bytesPerObject = 200 and an %OBJECTNUM-based object name.
composite Main {
    param
        // Connection settings are supplied at job submission time.
        expression<rstring> $accessKeyID     : getSubmissionTimeValue("os-access-key-id");
        expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");
        expression<rstring> $bucket          : getSubmissionTimeValue("os-bucket");
        // Second argument is the value used when "os-endpoint" is not provided.
        expression<rstring> $endpoint        : getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");

    graph
        // Data generator: one rstring attribute holding the iteration counter as text.
        stream<rstring i> SampleData = Beacon() {
            param
                period : 0.1;
            output
                SampleData : i = (rstring)IterationCount();
        }

        // Sink without output port: writes the incoming tuples to the bucket.
        () as osSink = com.teracloud.streams.objectstorage.s3::S3ObjectStorageSink(SampleData) {
            param
                // where to write
                endpoint        : $endpoint;
                bucket          : $bucket;
                // credentials
                accessKeyID     : $accessKeyID;
                secretAccessKey : $secretAccessKey;
                // object naming and size limit (200l is an int64 literal)
                objectName      : "%OBJECTNUM.txt";
                bytesPerObject  : 200l;
        }
}
b) S3ObjectStorageSink creating objects in parquet format. Each object is created after $timePerObject seconds.
// Example b): feeds Beacon-generated (username, id) tuples into S3ObjectStorageSink
// configured for parquet storage with SNAPPY compression and time-based objects,
// then prints one line per tuple received from the sink's output port.
composite Main {
    param
        // Connection settings are supplied at job submission time.
        expression<rstring> $accessKeyID     : getSubmissionTimeValue("os-access-key-id");
        expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");
        expression<rstring> $bucket          : getSubmissionTimeValue("os-bucket");
        // Second argument is the value used when "os-endpoint" is not provided.
        expression<rstring> $endpoint        : getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
        // Passed to the sink's timePerObject parameter.
        expression<float64> $timePerObject   : 10.0;

    type
        // Schema of the tuples emitted on the sink's output port.
        S3ObjectStorageSinkOut_t = tuple<rstring objectName, uint64 size>;

    graph
        // Data generator: a synthetic user name plus the raw iteration counter.
        stream<rstring username, uint64 id> SampleData = Beacon() {
            param
                period : 0.1;
            output
                SampleData :
                    username = "Test"+(rstring) IterationCount(),
                    id = IterationCount();
        }

        // Sink with an output port carrying objectName and size.
        stream<S3ObjectStorageSinkOut_t> ObjStSink = com.teracloud.streams.objectstorage.s3::S3ObjectStorageSink(SampleData) {
            param
                // where to write
                endpoint           : $endpoint;
                bucket             : $bucket;
                // credentials
                accessKeyID        : $accessKeyID;
                secretAccessKey    : $secretAccessKey;
                // storage format
                storageFormat      : "parquet";
                parquetCompression : "SNAPPY";
                // object naming and time-based setting
                objectName         : "sample_%TIME.snappy.parquet";
                timePerObject      : $timePerObject;
        }

        // Prints the object name and size of every tuple emitted by the sink.
        () as SampleSink = Custom(ObjStSink as I) {
            logic
                onTuple I : {
                    rstring message = "Object with name '" + I.objectName + "' of size '" + (rstring)I.size + "' has been created.";
                    printStringLn(message);
                }
        }
}