Example for guaranteed processing with exactly-once semantics
To achieve exactly-once semantics with the ElasticsearchIndex operator:
- The operator must be part of a consistent region. In case of failures tuples are replayed by the source operator.
- In order to overwrite existing documents, ensure that the idNameAttribute parameter set, for an input stream attribute containing a unique key.
In the sample below the ElasticsearchIndex operator reads Elasticsearch credentials from application configuration. Ensure that application configuration with name "es" has been created with the properties nodeList, userName and password.
composite Main {
param
expression<rstring> $indexName: getSubmissionTimeValue("indexName", "index-sample");
graph
() as JCP = JobControlPlane() {}
@consistent(trigger=periodic, period=5.0)
stream<rstring key, uint64 dummy> Documents = Beacon() {
param
period: 0.01;
output
Documents : key = "SAMPLE"+(rstring) IterationCount();
}
() as ElasticsearchSink = ElasticsearchIndex(Documents) {
param
indexName: $indexName;
idNameAttribute: key;
}
}