Examples

The following example shows S3ObjectStorageSink creating 2000-byte objects. Each object name contains an incrementing number, produced by the `%OBJECTNUM` placeholder in the `objectName` parameter.

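For authentication, the `accessKeyID` and `secretAccessKey` parameters specify the IBM COS credentials. Like the bucket and endpoint, their values are read from submission-time parameters.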


composite Main {
    // Connection settings. Each value is taken from a submission-time
    // parameter when the job is submitted; the endpoint has a default.
    param
        expression<rstring> $accessKeyID     : getSubmissionTimeValue("os-access-key-id");
        expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");
        expression<rstring> $bucket          : getSubmissionTimeValue("os-bucket");
        expression<rstring> $endpoint        : getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");

    graph
        // Emits one tuple every 0.1 seconds; its single rstring attribute
        // carries the current iteration count converted to text.
        stream<rstring i> SampleData = Beacon() {
            param
                period : 0.1;
            output
                SampleData : i = (rstring) IterationCount();
        }

        // Writes SampleData to the bucket as objects of about 2000 bytes
        // (bytesPerObject). %OBJECTNUM in objectName yields an incrementing
        // number per object.
        () as osSink = com.teracloud.streams.objectstorage.s3::S3ObjectStorageSink(SampleData) {
            param
                // Where to write
                endpoint        : $endpoint;
                bucket          : $bucket;
                objectName      : "%OBJECTNUM.txt";
                // How to authenticate
                accessKeyID     : $accessKeyID;
                secretAccessKey : $secretAccessKey;
                // When to start a new object
                bytesPerObject  : 2000l;
        }
}

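The next example shows S3ObjectStorageSink creating objects in Apache Parquet format with Snappy compression. Because `timePerObject` is set to 10 seconds, the current object is closed and a new one is created every 10 seconds. The sink's output port reports the name and size of each created object, and a Custom operator prints that information.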


composite Main {
    // Connection settings are read from submission-time parameters; the
    // object rolling interval has a fixed default of 10 seconds.
    param
        expression<rstring> $accessKeyID     : getSubmissionTimeValue("os-access-key-id");
        expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");
        expression<rstring> $bucket          : getSubmissionTimeValue("os-bucket");
        expression<rstring> $endpoint        : getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
        expression<float64> $timePerObject   : 10.0;

    type
        // Schema of the sink's output port: name and size of an object.
        S3ObjectStorageSinkOut_t = tuple<rstring objectName, uint64 size>;

    graph
        // Emits one tuple every 0.1 seconds. The username is "Test" followed
        // by the iteration count; id holds the iteration count itself.
        stream<rstring username, uint64 id> SampleData = Beacon() {
            param
                period : 0.1;
            output
                SampleData :
                    username = "Test" + (rstring) IterationCount(),
                    id       = IterationCount();
        }

        // Writes SampleData as Snappy-compressed Parquet objects, starting a
        // new object every $timePerObject seconds. %TIME in objectName is a
        // placeholder expanded by the operator when naming each object.
        stream<S3ObjectStorageSinkOut_t> ObjStSink = com.teracloud.streams.objectstorage.s3::S3ObjectStorageSink(SampleData) {
            param
                // Where to write
                endpoint           : $endpoint;
                bucket             : $bucket;
                objectName         : "sample_%TIME.snappy.parquet";
                // How to authenticate
                accessKeyID        : $accessKeyID;
                secretAccessKey    : $secretAccessKey;
                // How to encode the data
                storageFormat      : "parquet";
                parquetCompression : "SNAPPY";
                // When to start a new object
                timePerObject      : $timePerObject;
        }

        // Prints one line for every tuple the sink emits on its output port.
        () as SampleSink = Custom(ObjStSink as Created) {
            logic
                onTuple Created : {
                    printStringLn("Object with name '" + Created.objectName
                        + "' of size '" + (rstring) Created.size
                        + "' has been created.");
                }
        }
}