Upgrading to support the new outdatedRecords statistics attribute
The new partitioned deduplication feature provides additional statistics. You must count the outdated records, which the BloomFilter operator marks with the unknown result in the bloomFilterResult attribute of its output stream, and add that count to the statistics.
Typically, the application prepares the statistics in the <namespace>.chainsink.custom::PostContextDataProcessor composite.
If the application implements variant A, you do not need to follow this procedure. The procedure applies to variants B and C only, and only if the deduplication feature is turned on.
Procedure
- Open the file that prepares the statistics. For example:
gedit <YourProjectFolder>/<namespace>.chainsink.custom/PostContextDataProcessor.spl
- Include a new use statement for the BloomFilterTypes composite.
use com.teracloud.streams.teda.utility::BloomFilterTypes;
- Add a new variable in the logic state clause to count the outdated records.
For example:
logic state: {
    ...
    mutable int64 detectedOutdatedRecords = 0l;
}
- Increase the counter value for each record that provides the BloomFilterTypes.unknown value in the bloomFilterResult attribute. Implement the code in the custom code begin/end block of the onTuple clause of the record input stream.
For example:
onTuple InRec: {
    // ------------------------------------------------
    // custom code begin
    // ------------------------------------------------
    ...
    } else if (BloomFilterTypes.unknown == bloomFilterResult) {
        // check if tuple outdated, to increase statistics counter
        detectedOutdatedRecords++;
    }
    ...
    // ------------------------------------------------
    // custom code end
    // ------------------------------------------------
    ...
}
- Assign the detectedOutdatedRecords counter to the outdatedRecords attribute of the statistics tuple in the custom code begin/end block of the onTuple clause of the statistics input stream.
For example:
onTuple InStat: {
    // ------------------------------------------------
    // custom code begin
    // ------------------------------------------------
    ...
    InStat.outdatedRecords=detectedOutdatedRecords;
    ...
    // ------------------------------------------------
    // custom code end
    // ------------------------------------------------
    ...
}
- Reset the counter value after the statistics tuple is submitted in the onTuple clause of the statistics input stream.
For example:
onTuple InStat: {
    // ------------------------------------------------
    // custom code begin
    // ------------------------------------------------
    ...
    // ------------------------------------------------
    // custom code end
    // ------------------------------------------------
    ...
    // forward statistic tuple
    submit(InStat,OutStat);
    // reset member
    ...
    detectedOutdatedRecords = 0l;
}
You can find the complete sample in the <YourProjectFolder>/<namespace>/chainsink/sample/PostContextDataProcessor.splmm file:
use <APPLICATION_NAMESPACE>.streams::*;
use <APPLICATION_NAMESPACE>.functions::*;
use com.teracloud.streams.teda.utility::BloomFilterTypes;
...
public composite PostContextDataProcessor ( ... ) as DedupedRecord = Custom(InRec; InStat) {
    logic state: {
        mutable int64 detectedRecordDuplicates = 0l;
        mutable int64 detectedOutdatedRecords = 0l;
    }
    onTuple InRec: {
        // ------------------------------------------------
        // custom code begin
        // ------------------------------------------------
        <%if (0 == $dedupDisabled) { %>if (BloomFilterTypes.duplicate == bloomFilterResult) {
            // check if duplicate, to increase statistics counter
            detectedRecordDuplicates++;
        } else if (BloomFilterTypes.unknown == bloomFilterResult) {
            // check if tuple outdated, to increase statistics counter
            detectedOutdatedRecords++;
        } else {<%}# end of 'if (0 == $dedupDisabled)'%>
            mutable OutRec otuple = {};
            assignFrom(otuple, InRec);
            submit(otuple,OutStream);<% if (0 == $dedupDisabled) {%>
        } // end of 'else bloomFilterResult
        <%}# end of 'else (0 == $dedupDisabled)'%>
        // ------------------------------------------------
        // custom code end
        // ------------------------------------------------
    }
    onTuple InStat: {
        // ------------------------------------------------
        // custom code begin
        // ------------------------------------------------
        <%if (0 == $dedupDisabled) {%>
        // update statistics with detected duplicates
        InStat.recordDuplicates=detectedRecordDuplicates;
        InStat.outdatedRecords=detectedOutdatedRecords;<% }%>
        // ------------------------------------------------
        // custom code end
        // ------------------------------------------------
        // forward statistic tuple
        submit(InStat,OutStat);
        // reset member
        detectedRecordDuplicates = 0l;
        detectedOutdatedRecords = 0l;
    }
    ...
}
- Save and close the files that implement the affected composite.
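The count, assign, submit, and reset sequence from the procedure can be illustrated with a small model. The following is a minimal sketch in Python rather than SPL, so that it runs without a Streams installation. It is not part of the toolkit: the BloomResult enum, the PostProcessorModel class, the run_demo function, and the dictionary used as a statistics tuple are hypothetical stand-ins. It assumes that each statistics tuple should report only the records seen since the previous statistics tuple, which is what resetting the counters after the submit achieves.

```python
from dataclasses import dataclass, field
from enum import Enum, auto


class BloomResult(Enum):
    """Hypothetical stand-in for the bloomFilterResult values used in the sample."""
    UNIQUE = auto()
    DUPLICATE = auto()
    UNKNOWN = auto()  # the operator could not decide: the record is outdated


@dataclass
class PostProcessorModel:
    """Models the state and the two onTuple clauses of the Custom operator."""
    detected_duplicates: int = 0
    detected_outdated: int = 0
    forwarded: list = field(default_factory=list)

    def on_record(self, record_id, result):
        # Mirrors onTuple InRec: count duplicates and outdated records,
        # forward everything else.
        if result is BloomResult.DUPLICATE:
            self.detected_duplicates += 1
        elif result is BloomResult.UNKNOWN:
            self.detected_outdated += 1
        else:
            self.forwarded.append(record_id)

    def on_statistics(self, stat):
        # Mirrors onTuple InStat: assign the counters, "submit" the tuple
        # (here: return a copy), then reset the counters.
        submitted = dict(stat)
        submitted["recordDuplicates"] = self.detected_duplicates
        submitted["outdatedRecords"] = self.detected_outdated
        self.detected_duplicates = 0
        self.detected_outdated = 0
        return submitted


def run_demo():
    """Feeds a few hypothetical records and three statistics tuples."""
    model = PostProcessorModel()
    submitted = []

    model.on_record("r1", BloomResult.UNIQUE)
    model.on_record("r2", BloomResult.UNKNOWN)
    model.on_record("r3", BloomResult.UNKNOWN)
    model.on_record("r4", BloomResult.DUPLICATE)
    submitted.append(model.on_statistics({"seq": 1}))

    model.on_record("r5", BloomResult.UNKNOWN)
    submitted.append(model.on_statistics({"seq": 2}))

    # No records arrive before the third statistics tuple.
    submitted.append(model.on_statistics({"seq": 3}))
    return model, submitted


if __name__ == "__main__":
    _, stats = run_demo()
    for stat in stats:
        print(stat)
```

In this model, the three statistics tuples carry outdatedRecords values of 2, 1, and 0. Without the reset, the second and third tuples would both report 3, because the counter would keep accumulating across statistics tuples.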