SPL File topicsfilter.spl
Subscribe functionality with data filters. FilteredSubscribe subscribes to streams and filters the data, optimizing the filter where possible by pushing it to the publisher to reduce network traffic.
Content
- Operators
- FilteredSubscribe: Filtered subscribe to a topic.
- Functions
- andFilter(rstring, rstring): Return a string representation of cond1 && cond2.
- equalsFilter(rstring, float64): Return a string representation of name == value for a float64 attribute.
- equalsFilter(rstring, int64): Return a string representation of name == value for an int64 attribute.
- equalsFilter(rstring, rstring): Return a string representation of name == value for an rstring attribute.
- getTopicSubscription(rstring, boolean): Get the subscription that matches a topic for topic based subscription with explicit matching for the publisher allowing filters.
- inFilter(rstring, list<rstring>): Return a string representation of the filter name in values for an rstring attribute.
- orFilter(rstring, rstring): Return a string representation of cond1 || cond2.
- setSubscribeFilter(rstring): Set filter for input port zero.
Composites
composite FilteredSubscribe(output FilteredTopic)
Filtered subscribe to a topic. Generates a stream that is subscribed, through Streams dynamic connections, to all streams published to the same topic and streamType.
The tuples on FilteredTopic are a subset of all tuples on streams published to topic.
For any published stream that allows filters (see Publish), remoteFilter is pushed to the publishing application. This reduces the number of tuples sent over the network between applications, because only tuples that match the filter are sent.
Tuples that match the remote filter, and all tuples from published streams that do not allow filters, are then filtered locally using the function localFilterFunction. For each tuple that arrives at this operator invocation from a published stream, localFilterFunction(tuple, localFilterParameter) is called. If localFilterFunction returns true then the tuple is present on FilteredTopic.
The benefit of this operator over a Subscribe followed by a Filter is that remote filtering reduces network use and local CPU processing cost by filtering tuples at the publishing side. The operator is also designed for a microservice architecture: an application can use it without any knowledge of how a topic is being published, and the output stream always contains the correctly filtered data.
Typically the remote filter and local filter are equivalent. For example, a parameterized composite that filters sensor readings by sensor identifier uses the filter expression id == $id, where $id is the sensor identifier being filtered for:
    type Sensor = tuple<rstring id, float64 reading>;

    // Function that acts as the local filter
    public boolean filterSensor(Sensor t, rstring id) {
        return t.id == id;
    }

    public composite SensorById(output stream<Sensor> Readings) {
        param
            expression<rstring> $id;
        graph
            stream<Sensor> Readings = FilteredSubscribe() {
                param
                    topic: "sensors/readings";
                    streamType: Sensor;
                    // Remote filter uses a utility function to
                    // create the filter
                    remoteFilter: equalsFilter("id", $id);
                    // Local filtering uses a function
                    localFilterFunction: filterSensor;
                    localFilterParameter: $id;
            }
    }
The local filter can differ from the remote filter to take advantage of the richer programming model in SPL compared to filter expressions. To maintain expected behavior the filters should be consistent with each other: whenever localFilterFunction returns true, the remote filter should also evaluate to true. The local filter thus passes through a subset of the tuples that would pass the remote filter.
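As an illustration of a local filter that is stricter than the remote one, the following is a minimal sketch rather than code from the toolkit. The names filterHotSensor and HotSensorById and the 80.0 threshold are hypothetical, and the use directive assumes the functions live in the namespace com.teracloud.streams.topology.topic referenced by this page.

```spl
use com.teracloud.streams.topology.topic::*;

type Sensor = tuple<rstring id, float64 reading>;

// Hypothetical local filter: the same id test as the remote filter,
// plus a check on the reading. Whenever this returns true, t.id == id
// holds, so the remote filter would also have matched.
public boolean filterHotSensor(Sensor t, rstring id) {
    return t.id == id && t.reading > 80.0;
}

public composite HotSensorById(output stream<Sensor> Readings) {
    param
        expression<rstring> $id;
    graph
        stream<Sensor> Readings = FilteredSubscribe() {
            param
                topic: "sensors/readings";
                streamType: Sensor;
                // Remote filter only narrows by id
                remoteFilter: equalsFilter("id", $id);
                // Local filter applies the id test and the extra condition
                localFilterFunction: filterHotSensor;
                localFilterParameter: $id;
        }
}
```

Repeating the id test locally matters because tuples from publishers that do not allow filters arrive unfiltered.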
The remote filter is a filter expression represented as an rstring using the format specified by the Import operator. This namespace provides a number of utility functions to build an expression.
The local filter is an SPL function that takes two arguments, the tuple and the parameter. The type of the first argument, the tuple, must match the output stream type streamType. The second argument matches the type of localFilterParameter and is specific to the invocation of this operator. If local filtering does not need a parameter because the expression is only against tuple attributes, e.g. t.reading > 80.0, then localFilterParameter need not be set. In this case int32 0 is passed, so the function would need to be:
    public boolean filterByReading(Sensor t, int32 unused) {
        return t.reading > 80.0;
    }
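An invocation that relies on this default might look like the sketch below. The composite name HighReadings is hypothetical, and the hand-written remoteFilter string assumes that the Import filter format accepts a float64 comparison written as reading > 80.0.

```spl
use com.teracloud.streams.topology.topic::*;

type Sensor = tuple<rstring id, float64 reading>;

// Second argument is unused; int32 0 is passed when
// localFilterParameter is not set.
public boolean filterByReading(Sensor t, int32 unused) {
    return t.reading > 80.0;
}

public composite HighReadings(output stream<Sensor> Readings) {
    graph
        stream<Sensor> Readings = FilteredSubscribe() {
            param
                topic: "sensors/readings";
                streamType: Sensor;
                // Assumed to be a valid Import filter expression
                remoteFilter: "reading > 80.0";
                localFilterFunction: filterByReading;
                // localFilterParameter intentionally omitted
        }
}
```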
Subscribe behavior in parallel regions matches the behavior described by Subscribe.
See namespace:com.teracloud.streams.topology.topic for details.
Parameters
- topic: Topic to subscribe to.
- streamType: Type of the output stream FilteredTopic.
- remoteFilter: Filter expression as an rstring that is executed remotely in the publisher when it allows filters.
- localFilterFunction: Boolean function that is executed against every tuple that arrives locally from a publisher. The function is passed two arguments, the tuple and the value of localFilterParameter. If the function returns true then the tuple is present on FilteredTopic, otherwise it is discarded.
- localFilterParameter: Optional parameter passed as the second argument to localFilterFunction. If not supplied then zero is passed as an int32.
- connect: Connection mode between subscriber and publishers. Can be set to Direct or Buffered. The default is Direct. See Connection mode.
- bufferFullPolicy: Buffer policy when the buffer is full. Ignored when connect is set to Direct.
- bufferCapacity: Capacity of the buffer in number of tuples. Ignored when connect is set to Direct.
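The connection parameters above could be combined with a filtered subscription as in the following sketch. The capacity of 5000 is an arbitrary illustrative value, and Sys.DropFirst is an assumption about the policy type this parameter accepts (it is one of the standard SPL congestion policies); check both against the Connection mode documentation before relying on them.

```spl
use com.teracloud.streams.topology.topic::*;

type Sensor = tuple<rstring id, float64 reading>;

public boolean filterSensor(Sensor t, rstring id) {
    return t.id == id;
}

// Hypothetical composite name
public composite BufferedSensorById(output stream<Sensor> Readings) {
    param
        expression<rstring> $id;
    graph
        stream<Sensor> Readings = FilteredSubscribe() {
            param
                topic: "sensors/readings";
                streamType: Sensor;
                remoteFilter: equalsFilter("id", $id);
                localFilterFunction: filterSensor;
                localFilterParameter: $id;
                // Buffer tuples between the publishers and this subscriber
                connect: Buffered;
                bufferCapacity: 5000;            // illustrative value
                bufferFullPolicy: Sys.DropFirst; // assumed accepted policy
        }
}
```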
Output Ports
- FilteredTopic: Filtered subscription to topic.
Functions
rstring getTopicSubscription(rstring topic, boolean allowFilter)
Get the subscription that matches a topic for topic based subscription with explicit matching for the publisher allowing filters.
This is a low-level function that allows applications to build custom composites with functionality similar to FilteredSubscribe. The preference should be to use FilteredSubscribe where possible.
Parameters
- topic: Topic to subscribe to.
- allowFilter: true to match publishers that allow filters, false to match those that do not allow filters.
Returns
- rstring
rstring andFilter(rstring cond1, rstring cond2)
Return a string representation of cond1 && cond2.
Suitable for use as a remote filter for FilteredSubscribe.
Parameters
- cond1: First condition
- cond2: Second condition
Returns
- rstring
rstring orFilter(rstring cond1, rstring cond2)
Return a string representation of cond1 || cond2.
Suitable for use as a remote filter for FilteredSubscribe.
Parameters
- cond1: First condition
- cond2: Second condition
Returns
- rstring
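Because andFilter and orFilter both take and return rstring conditions, they can be nested with the other filter functions in this namespace. The sketch below is illustrative only: the function name zoneFilter and the zone attribute are hypothetical, and this page does not state whether the helpers parenthesize their operands, so the exact text of the resulting expression is not shown.

```spl
use com.teracloud.streams.topology.topic::*;

// Intended meaning: (id == "s1" || id == "s2") && zone == 3
// Assumes the published tuples have an rstring attribute id and an
// int64 attribute zone.
public rstring zoneFilter() {
    return andFilter(
        orFilter(equalsFilter("id", "s1"), equalsFilter("id", "s2")),
        // The l suffix makes the literal an int64, selecting the
        // equalsFilter(rstring, int64) overload.
        equalsFilter("zone", 3l));
}
```

The returned string can then be supplied as remoteFilter to FilteredSubscribe.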
rstring equalsFilter(rstring name, int64 value)
Return a string representation of name == value for an int64 attribute.
Suitable for use as a remote filter for FilteredSubscribe.
Parameters
- name: Attribute name.
- value: Value attribute must equal.
Returns
- rstring
rstring equalsFilter(rstring name, rstring value)
Return a string representation of name == value for an rstring attribute.
Suitable for use as a remote filter for FilteredSubscribe.
Parameters
- name: Attribute name.
- value: Value attribute must equal.
Returns
- rstring
rstring equalsFilter(rstring name, float64 value)
Return a string representation of name == value for a float64 attribute.
Suitable for use as a remote filter for FilteredSubscribe.
Parameters
- name: Attribute name.
- value: Value attribute must equal.
Returns
- rstring
rstring inFilter(rstring name, list<rstring> values)
Return a string representation of the filter name in values for an rstring attribute.
If values is empty then the filter expression is equivalent to name != "", with the intention of passing all tuples.
Suitable for use as a remote filter for FilteredSubscribe.
Parameters
- name: Attribute name.
- values: Attribute must match one of values. If empty then the expression matches any non-empty string.
Returns
- rstring
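A possible use of inFilter with a matching local filter is sketched below. The names filterSensorIn and SensorsByIds are hypothetical. The local function mirrors the documented empty-list behavior (name != "") so that it never accepts a tuple the remote filter would reject.

```spl
use com.teracloud.streams.topology.topic::*;

type Sensor = tuple<rstring id, float64 reading>;

// Hypothetical local filter consistent with inFilter("id", ids)
public boolean filterSensorIn(Sensor t, list<rstring> ids) {
    if (size(ids) == 0) {
        // Mirrors the remote expression id != "" used for an empty list
        return t.id != "";
    }
    return t.id in ids;
}

public composite SensorsByIds(output stream<Sensor> Readings) {
    param
        expression<list<rstring>> $ids;
    graph
        stream<Sensor> Readings = FilteredSubscribe() {
            param
                topic: "sensors/readings";
                streamType: Sensor;
                remoteFilter: inFilter("id", $ids);
                localFilterFunction: filterSensorIn;
                localFilterParameter: $ids;
        }
}
```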
int32 setSubscribeFilter(rstring filter)
Set the filter for input port zero. Calls setInputPortImportFilterExpression, logging an error if the filter could not be set.
Parameters
- filter
Returns
- int32