Examples

The following example showcases ObjectStorageSource reading statically named objects of different types

The credentials parameter to specify the IBM COS credentials for authentication. The objectStorageURI must be in either "cos://<bucket-name>/" or "s3a://<bucket-name>/" format.


composite Main {
    param
        expression<rstring> $credentials: getSubmissionTimeValue("os-credentials");
        expression<rstring> $objectStorageURI: getSubmissionTimeValue("os-uri");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
    graph
        // read text object
        // use a ObjectStorageSource operator with no input port to process a single object
        stream<rstring line> TxtData = com.teracloud.streams.objectstorage::ObjectStorageSource() {
            param
                credentials: $credentials;
                objectStorageURI: $objectStorageURI;
                endpoint: $endpoint;
                objectName: "sample.txt";
        }

        // read binary object
        // use a ObjectStorageSource operator with no input port to process a single object
        stream<blob block> BinData = com.teracloud.streams.objectstorage::ObjectStorageSource() {
            param
                credentials: $credentials;
                objectStorageURI: $objectStorageURI;
                endpoint: $endpoint;
                objectName: "sample.bin";
                blockSize: 0; // loads file as a single tuple
        }
}

The next example showcases ObjectStorageSource reading objects from names sent in via an input port.

This example uses submission-time parameters to get bucket and endpoint information. For authentication, the default application configuration cos is used with property cos.creds to specify the IBM COS credentials.


composite Main {
    param
        expression<rstring> $bucket: getSubmissionTimeValue("os-bucket");
        expression<rstring> $endpoint: getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
    graph
        // ObjectStorageScan operator with directory and pattern
        stream<rstring objectname> Scanned = com.teracloud.streams.objectstorage::ObjectStorageScan() {
            param

                objectStorageURI: com.teracloud.streams.objectstorage.s3::getObjectStorageURI($bucket);
                endpoint: $endpoint;
                directory: "/sample";
                pattern: ".*";
        }

        // use a ObjectStorageSource operator to process the object names
        // The first output attribute must be the data attribute.
        // Attributes from input stream, like objectname, are forwarded if defined in output stream with same attribute name and type.
        stream<rstring line, rstring objectname> Data = com.teracloud.streams.objectstorage::ObjectStorageSource(Scanned) {
            param
                objectStorageURI: com.teracloud.streams.objectstorage.s3::getObjectStorageURI($bucket);
                endpoint: $endpoint;
        }
}