Examples
The following example shows S3ObjectStorageSink creating 2000-byte objects with an incrementing number in the object name.
For authentication, the accessKeyID and secretAccessKey parameters specify the IBM COS credentials.
// Writes a continuous stream of numbered strings to an S3-compatible bucket.
// The sink closes the current object and starts a new one once it holds
// $bytesPerObject bytes.
composite Main {
    param
        // Connection settings are read as submission-time values.
        expression<rstring> $accessKeyID : getSubmissionTimeValue("os-access-key-id");
        expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");
        expression<rstring> $bucket : getSubmissionTimeValue("os-bucket");
        // Falls back to this endpoint when "os-endpoint" is not supplied.
        expression<rstring> $endpoint : getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
        // Per-object size limit, kept next to the other settings rather than
        // buried as a literal in the operator invocation.
        expression<int64> $bytesPerObject : 2000l;
    graph
        // Emits a tuple every 0.1 seconds carrying the iteration count as a string.
        stream<rstring i> SampleData = Beacon() {
            param
                period : 0.1;
            output
                SampleData : i = (rstring)IterationCount();
        }

        // %OBJECTNUM gives each new object an incrementing number in its name.
        () as osSink = com.teracloud.streams.objectstorage.s3::S3ObjectStorageSink(SampleData) {
            param
                accessKeyID : $accessKeyID;
                secretAccessKey : $secretAccessKey;
                bucket : $bucket;
                objectName : "%OBJECTNUM.txt";
                endpoint : $endpoint;
                bytesPerObject : $bytesPerObject;
        }
}
The next example shows S3ObjectStorageSink creating objects in Apache Parquet format. The current object is closed and a new one is created every 10 seconds.
// Writes generated (username, id) tuples to an S3-compatible bucket as
// Snappy-compressed Parquet objects, and prints a line for every tuple the
// sink emits on its output port.
composite Main {
    param
        // Connection settings are read as submission-time values.
        expression<rstring> $accessKeyID : getSubmissionTimeValue("os-access-key-id");
        expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");
        expression<rstring> $bucket : getSubmissionTimeValue("os-bucket");
        expression<rstring> $endpoint : getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
        // Seconds after which the current object is closed and a new one begun.
        expression<float64> $timePerObject : 10.0;
    type
        // Schema of the tuples produced by the sink's output port.
        S3ObjectStorageSinkOut_t = tuple<rstring objectName, uint64 size>;
    graph
        // One tuple every 0.1 seconds, derived from the iteration count.
        stream<rstring username, uint64 id> SampleData = Beacon() {
            param
                period : 0.1;
            output
                SampleData :
                    username = "Test" + (rstring)IterationCount(),
                    id = IterationCount();
        }

        stream<S3ObjectStorageSinkOut_t> ObjStSink = com.teracloud.streams.objectstorage.s3::S3ObjectStorageSink(SampleData) {
            param
                // Where to write, and with which credentials.
                endpoint : $endpoint;
                bucket : $bucket;
                accessKeyID : $accessKeyID;
                secretAccessKey : $secretAccessKey;
                // How objects are named, encoded and rolled over.
                objectName : "sample_%TIME.snappy.parquet";
                storageFormat : "parquet";
                parquetCompression : "SNAPPY";
                timePerObject : $timePerObject;
        }

        // Reports each object described on the sink's output port.
        () as SampleSink = Custom(ObjStSink as CreatedObject) {
            logic
                onTuple CreatedObject : {
                    rstring report = "Object with name '" + CreatedObject.objectName
                        + "' of size '" + (rstring)CreatedObject.size + "' has been created.";
                    printStringLn(report);
                }
        }
}