Examples
These examples use the S3ObjectStorageSource operator.
a) S3ObjectStorageSource with dynamic object names to be read
composite Main {
    param
        // All connection settings are read as submission-time values;
        // only the endpoint is given a default here.
        expression<rstring> $endpoint        : getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
        expression<rstring> $bucket          : getSubmissionTimeValue("os-bucket");
        expression<rstring> $accessKeyID     : getSubmissionTimeValue("os-access-key-id");
        expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");

    graph
        // Stage 1: S3ObjectStorageScan, configured with a directory and a
        // name pattern, emits tuples carrying an `objectname` attribute.
        stream<rstring objectname> Scanned = com.teracloud.streams.objectstorage.s3::S3ObjectStorageScan() {
            param
                endpoint        : $endpoint;
                bucket          : $bucket;
                accessKeyID     : $accessKeyID;
                secretAccessKey : $secretAccessKey;
                directory       : "/sample";
                pattern         : ".*";
        }

        // Stage 2: S3ObjectStorageSource consumes the object names arriving
        // on its input port.
        //  - The data attribute (`line`) has to be the first attribute of
        //    the output schema.
        //  - Input attributes such as `objectname` are forwarded when the
        //    output stream declares an attribute with the same name and type.
        stream<rstring line, rstring objectname> Data = com.teracloud.streams.objectstorage.s3::S3ObjectStorageSource(Scanned) {
            param
                endpoint        : $endpoint;
                bucket          : $bucket;
                accessKeyID     : $accessKeyID;
                secretAccessKey : $secretAccessKey;
        }
}
b) S3ObjectStorageSource with static object name to be read
composite Main {
    param
        // All connection settings are read as submission-time values;
        // only the endpoint is given a default here.
        expression<rstring> $endpoint        : getSubmissionTimeValue("os-endpoint", "s3.us.cloud-object-storage.appdomain.cloud");
        expression<rstring> $bucket          : getSubmissionTimeValue("os-bucket");
        expression<rstring> $accessKeyID     : getSubmissionTimeValue("os-access-key-id");
        expression<rstring> $secretAccessKey : getSubmissionTimeValue("os-secret-access-key");

    graph
        // Text object: S3ObjectStorageSource without an input port processes
        // the single object named by `objectName`.
        stream<rstring line> TxtData = com.teracloud.streams.objectstorage.s3::S3ObjectStorageSource() {
            param
                endpoint        : $endpoint;
                bucket          : $bucket;
                accessKeyID     : $accessKeyID;
                secretAccessKey : $secretAccessKey;
                objectName      : "sample.txt";
        }

        // Binary object: same operator, again without an input port, with a
        // blob as the output data attribute.
        stream<blob block> BinData = com.teracloud.streams.objectstorage.s3::S3ObjectStorageSource() {
            param
                endpoint        : $endpoint;
                bucket          : $bucket;
                accessKeyID     : $accessKeyID;
                secretAccessKey : $secretAccessKey;
                objectName      : "sample.bin";
                blockSize       : 0; // 0 loads the file as a single tuple
        }
}