Examples

The following example showcases a Beacon operator sending 5000 tuples to an ObjectStorageSink operator, which writes them to an object named "static_name.txt". After sending the 5000 tuples, the Beacon sends a window punctuation, which causes the ObjectStorageSink to close the object.

For authentication, the example uses the default application configuration, named cos, whose cos.creds property holds the IBM COS credentials. The objectStorageURI must be in either "cos://<bucket-name>/" or "s3a://<bucket-name>/" format.


composite Main {
    param
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
    graph
        stream<rstring i> SampleData = Beacon() {
            param
                iterations: 5000;
                period: 0.1;
            output SampleData: i = (rstring)IterationCount();
        }

        () as osSink = com.teracloud.streams.objectstorage::ObjectStorageSink(SampleData) {
            param
                objectStorageURI: $objectStorageURI;
                objectName : "static_name.txt";
                endpoint : $endpoint;
        }
}
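The objectStorageURI format rule above can be checked before a job is submitted. The following Python sketch is a hypothetical helper, not part of the objectstorage toolkit: it accepts only the "cos://<bucket-name>/" and "s3a://<bucket-name>/" forms and returns the scheme and bucket name. It assumes the trailing slash is required, as in the stated format, and it does not enforce the full IBM COS bucket naming rules.

```python
import re
from typing import Tuple

# Hypothetical pre-submission check, not a toolkit API.
# Accepts "cos://<bucket-name>/" or "s3a://<bucket-name>/" only.
# Assumption: the bucket name is any non-empty run of characters without
# "/" or whitespace; the stricter IBM COS bucket naming rules are not checked.
_URI_PATTERN = re.compile(r"(cos|s3a)://([^/\s]+)/")


def parse_object_storage_uri(uri: str) -> Tuple[str, str]:
    """Return (scheme, bucket) or raise ValueError if the URI is malformed."""
    match = _URI_PATTERN.fullmatch(uri)
    if match is None:
        raise ValueError(
            f"objectStorageURI {uri!r} must look like "
            "'cos://<bucket-name>/' or 's3a://<bucket-name>/'"
        )
    return match.group(1), match.group(2)


if __name__ == "__main__":
    # Example: validate the value you intend to pass as the os-uri
    # submission-time value.
    print(parse_object_storage_uri("cos://streams-sample-001/"))
```

Running a check like this on the os-uri value catches a missing scheme or trailing slash locally instead of at job runtime.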

The next example showcases ObjectStorageSink creating 2000-byte objects whose names contain an incrementing object number, substituted for the %OBJECTNUM placeholder.

For authentication, the credentials parameter specifies the IBM COS credentials, which are supplied as the os-credentials submission-time value.


composite Main {
    param
        expression<rstring> $credentials: getSubmissionTimeValue("os-credentials");
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
    graph
        stream<rstring i> SampleData = Beacon() {
            param
                period: 0.1;
            output SampleData: i = (rstring)IterationCount();
        }

        () as osSink = com.teracloud.streams.objectstorage::ObjectStorageSink(SampleData) {
            param
                credentials: $credentials;
                objectStorageURI: $objectStorageURI;
                objectName : "%OBJECTNUM.txt";
                endpoint : $endpoint;
                bytesPerObject: 2000l;
        }
}
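To make the size-based naming concrete, the following Python sketch models how a bytesPerObject limit and the %OBJECTNUM placeholder could split a stream of rstring values into numbered objects. It is a hypothetical model, not the operator's implementation. It assumes that each tuple is written as its text followed by a newline, that an object is closed as soon as it reaches the byte limit, and that numbering starts at first_num (0 by default). Check the ObjectStorageSink reference for the actual behavior.

```python
from typing import Iterable, List, Tuple


def size_based_rollover(
    values: Iterable[str],
    bytes_per_object: int,
    name_pattern: str = "%OBJECTNUM.txt",
    first_num: int = 0,
) -> List[Tuple[str, int]]:
    """Hypothetical model: return (object name, byte count) per object."""
    if bytes_per_object <= 0:
        raise ValueError("bytes_per_object must be positive")
    objects: List[Tuple[str, int]] = []
    num = first_num
    current = 0  # bytes written to the currently open object
    for value in values:
        # Assumption: each tuple is written as its UTF-8 text plus a newline.
        current += len(value.encode("utf-8")) + 1
        if current >= bytes_per_object:
            # Assumption: the object closes once it reaches the limit, and the
            # next tuple opens a new object with the next number.
            objects.append((name_pattern.replace("%OBJECTNUM", str(num)), current))
            num += 1
            current = 0
    if current > 0:
        # Data left over when the input ends forms a final, smaller object.
        objects.append((name_pattern.replace("%OBJECTNUM", str(num)), current))
    return objects
```

Calling size_based_rollover((str(i) for i in range(2000)), 2000) mimics the first 2000 tuples of the Beacon above. Under the model's assumptions, every object except possibly the last holds between 2000 and 2004 bytes, because a single value in that range adds at most 5 bytes.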

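The last example showcases ObjectStorageSink creating a new Snappy-compressed Apache Parquet object every 10 seconds. The operator's optional output port reports the name and size of each object it creates, and a Custom operator prints that information.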

For authentication, the IAM credentials are read from the cos.creds property of the default application configuration, cos.


composite Main {
    param
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri", "cos://streams-sample-001/");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
        expression<float64> $timePerObject: 10.0;

    type
        S3ObjectStorageSinkOut_t = tuple<rstring objectName, uint64 size>;

    graph

        stream<rstring username, uint64 id> SampleData = Beacon() {
            param
                period: 0.1;
            output
                SampleData : username = "Test"+(rstring) IterationCount(), id = IterationCount();
        }

        stream<S3ObjectStorageSinkOut_t> ObjStSink = com.teracloud.streams.objectstorage::ObjectStorageSink(SampleData) {
            param
                objectStorageURI: $objectStorageURI;
                endpoint : $endpoint;
                objectName: "sample_%TIME.snappy.parquet";
                timePerObject : $timePerObject;
                storageFormat: "parquet";
                parquetCompression: "SNAPPY";
        }

        () as SampleSink = Custom(ObjStSink as I) {
            logic
                onTuple I: {
                    printStringLn("Object with name '" + I.objectName + "' of size '" + (rstring)I.size + "' has been created.");
                }
        }
}
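With period: 0.1 on the Beacon and a timePerObject of 10.0 seconds, each Parquet object receives roughly 10 / 0.1 = 100 tuples. The following Python sketch works through that arithmetic by assigning each tuple to a time window. It is a simplified model under stated assumptions, not the operator's implementation: tuple i arrives exactly at i * period, windows start with the first tuple, and times are kept in integer milliseconds to avoid floating-point rounding. Real arrival times drift, so actual object contents vary.

```python
from typing import List


def tuples_per_time_window(
    num_tuples: int, period_ms: int, time_per_object_ms: int
) -> List[int]:
    """Hypothetical model: count tuples falling into each time window."""
    if period_ms <= 0 or time_per_object_ms <= 0:
        raise ValueError("period_ms and time_per_object_ms must be positive")
    counts: List[int] = []
    for i in range(num_tuples):
        # Assumption: tuple i arrives exactly at i * period_ms.
        window = (i * period_ms) // time_per_object_ms
        # Windows with no tuples are recorded as 0; the model makes no claim
        # about whether the operator would create an empty object.
        while len(counts) <= window:
            counts.append(0)
        counts[window] += 1
    return counts


if __name__ == "__main__":
    # 100 ms Beacon period, 10 s per object, first 250 tuples.
    print(tuples_per_time_window(250, 100, 10_000))
```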