Examples
The following example shows a Beacon operator sending 5000 tuples to an ObjectStorageSink operator, which writes them to an object named "static_name.txt". After the 5000 tuples are sent, the Beacon emits a window punctuation, which causes the ObjectStorageSink to close the object.
For authentication, the example uses the default application configuration, named cos, which contains a property named cos.creds whose value is the IBM COS credentials. The objectStorageURI must be in either "cos://<bucket-name>/" or "s3a://<bucket-name>/" format.
composite Main {
    param
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
    graph
        stream<rstring i> SampleData = Beacon() {
            param
                iterations: 5000;
                period: 0.1;
            output SampleData: i = (rstring)IterationCount();
        }
        () as osSink = com.teracloud.streams.objectstorage::ObjectStorageSink(SampleData) {
            param
                objectStorageURI: $objectStorageURI;
                objectName: "static_name.txt";
                endpoint: $endpoint;
        }
}
The next example shows ObjectStorageSink creating objects of 2000 bytes each (set with bytesPerObject), with an incrementing number in the object name.
For authentication, the credentials parameter is used to specify the IBM COS credentials.
composite Main {
    param
        expression<rstring> $credentials: getSubmissionTimeValue("os-credentials");
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
    graph
        stream<rstring i> SampleData = Beacon() {
            param
                period: 0.1;
            output SampleData: i = (rstring)IterationCount();
        }
        () as osSink = com.teracloud.streams.objectstorage::ObjectStorageSink(SampleData) {
            param
                credentials: $credentials;
                objectStorageURI: $objectStorageURI;
                objectName: "%OBJECTNUM.txt";
                endpoint: $endpoint;
                bytesPerObject: 2000l;
        }
}
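As a variation on the size-based example, object rollover could instead be driven by tuple count. The following is only a sketch: it assumes that the toolkit version in use provides a tuplesPerObject parameter of type int64, which is not shown elsewhere in this section and should be checked against the operator's parameter list. The value of 1000 tuples is illustrative, and authentication falls back to the default cos application configuration, as in the first example.

```spl
composite Main {
    param
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
    graph
        // Unbounded source: one tuple every 0.1 seconds
        stream<rstring i> SampleData = Beacon() {
            param
                period: 0.1;
            output SampleData: i = (rstring)IterationCount();
        }
        () as osSink = com.teracloud.streams.objectstorage::ObjectStorageSink(SampleData) {
            param
                objectStorageURI: $objectStorageURI;
                objectName: "%OBJECTNUM.txt";
                endpoint: $endpoint;
                // Assumption: tuplesPerObject exists in this toolkit version.
                // If it does, a new object would start after every 1000 tuples.
                tuplesPerObject: 1000l;
        }
}
```

With the Beacon period of 0.1 seconds, 1000 tuples per object would correspond to roughly one new object every 100 seconds.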
The last example shows ObjectStorageSink creating an object in Apache Parquet format every 10 seconds.
For authentication, the IAM credentials are read from the cos.creds property of the default application configuration, cos.
composite Main {
    param
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri", "cos://streams-sample-001/");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
        expression<float64> $timePerObject: 10.0;
    type
        S3ObjectStorageSinkOut_t = tuple<rstring objectName, uint64 size>;
    graph
        stream<rstring username, uint64 id> SampleData = Beacon() {
            param
                period: 0.1;
            output
                SampleData: username = "Test" + (rstring)IterationCount(), id = IterationCount();
        }
        stream<S3ObjectStorageSinkOut_t> ObjStSink = com.teracloud.streams.objectstorage::ObjectStorageSink(SampleData) {
            param
                objectStorageURI: $objectStorageURI;
                endpoint: $endpoint;
                objectName: "sample_%TIME.snappy.parquet";
                timePerObject: $timePerObject;
                storageFormat: "parquet";
                parquetCompression: "SNAPPY";
        }
        () as SampleSink = Custom(ObjStSink as I) {
            logic
                onTuple I: {
                    printStringLn("Object with name '" + I.objectName + "' of size '" + (rstring)I.size + "' has been created.");
                }
        }
}