Examples
These examples demonstrate how to use the DSPFilter2 operator.
The following example uses the DSPFilter2 operator to produce a moving average of an IBM stock series:
use com.teracloud.streams.timeseries.analysis::DSPFilter2;
composite Main{
type
stock=tuple<rstring Date, float64 Open, float64 High,
float64 Low, float64 Close, float64 Volume, float64 AdjClose>;
graph
stream <stock> stockStream=FileSource(){
param
file: "stock.csv"; // read some stock data from file
format: csv;
initDelay: 1.0f;
}
stream <rstring Date, float64 Open, float64 MAOpen>
filteredStockStream=DSPFilter2(stockStream){
param
xcoef:{0u:0.2,1u:0.2,2u:0.2,3u:0.2,4u:0.2}; // implement a 5-point moving average (five equal weights of 0.2)
ycoef:{0u:1.0}; // output value coefficient is always 1
inputTimeSeries: Open; // the time series to be filtered
output
filteredStockStream:
MAOpen=filteredTimeSeries(); // get the trend value
}
}
The following graph illustrates how short-term fluctuations in stock prices are smoothed out when you use a DSPFilter2 operator to calculate a moving average:
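To make the coefficient maps in the moving-average example concrete, the following Python sketch applies the standard direct-form difference equation to a short list of values. It is a minimal illustration, not the operator's implementation: it assumes that DSPFilter2 follows the usual convention y[n] = (sum of xcoef[k]*x[n-k] minus sum of ycoef[k]*y[n-k] for k >= 1) / ycoef[0], and that samples before the start of the series count as zero. The function name `apply_filter` and the sample prices are hypothetical.

```python
def apply_filter(xcoef, ycoef, samples):
    """Direct-form difference equation; coefficient maps are keyed by delay."""
    out = []
    for n in range(len(samples)):
        acc = 0.0
        # Feed-forward part: weighted sum of current and past inputs.
        for k, b in xcoef.items():
            if n - k >= 0:
                acc += b * samples[n - k]
        # Feedback part: weighted sum of past outputs (empty for an FIR filter).
        for k, a in ycoef.items():
            if k >= 1 and n - k >= 0:
                acc -= a * out[n - k]
        out.append(acc / ycoef[0])
    return out


# Same coefficients as the SPL example: five equal weights of 0.2.
xcoef = {0: 0.2, 1: 0.2, 2: 0.2, 3: 0.2, 4: 0.2}
ycoef = {0: 1.0}

# Made-up opening prices, for illustration only.
opens = [10.0, 12.0, 11.0, 13.0, 14.0, 15.0]
smoothed = apply_filter(xcoef, ycoef, opens)

# From the fifth sample onward, each output is the mean of the last five inputs.
print(smoothed)
```

Under these assumptions the first four outputs are only partial sums, because fewer than five samples are available. The operator may treat the start of the series differently.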

The following example demonstrates how you can use the DSPFilter2 operator to extract signal components. In particular, a sine wave is separated from a noisy sine wave. Two DSPFilter2 operators are used: the first extracts the sine wave and the second extracts the noise:
//Read the noisy sinewave
stream <float64 time, float64 noisysine> sineStream=FileSource(){
param
file: "noisysine.csv"; //get a noisy sinewave
format: csv;
initDelay: 1.0f;
}
//Extract the sine wave by using a low-pass filter
stream <float64 time, float64 noisysine, float64 sine>
extractSineStream=DSPFilter2(sineStream){
param
xcoef:{0u:0.2,1u:0.2,2u:0.2,3u:0.2,4u:0.2}; // implement a 5-point moving average to smooth out the noise and recover the sine wave
ycoef:{0u:1.0}; // the only output coefficient is 1 (this is an FIR filter)
inputTimeSeries: noisysine;
output
extractSineStream:
sine=filteredTimeSeries(); // get smoothed value, which is the original sine
}
//Now extract the noise by using a high-pass filter.
//The coefficients of the filter are estimated by using an external tool
stream<float64 time, float64 noisysine, float64 sine,
float64 noise> extractNoiseStream=DSPFilter2(extractSineStream)
{
param
xcoef:{0u:0.4328,1u:-1.7314,2u:2.5971,3u:-1.7314,4u:0.4328}; // xcoef parameter: input coefficients
ycoef:{0u:1.0000,1u:-2.3695,2u:2.3140,3u:-1.0547,4u:0.18740}; // ycoef parameter: output coefficients; the value at index 0 is always 1
inputTimeSeries:noisysine;
output
extractNoiseStream:
noise=filteredTimeSeries();
}
//Write out the result
() as SinkOP3=FileSink(extractNoiseStream){
param
file: "decomposedSine.csv";
}
The following graph illustrates the separation of a sine wave from a noisy sine wave:

The figure shows the noisy sine wave, as well as its extracted components: the sine wave and the noise.
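One way to check that the two coefficient sets in the sine-wave example act as a low-pass and a high-pass filter is to evaluate their frequency response at the two extremes of the spectrum. The following Python sketch does this with the coefficients from the example. It assumes the conventional transfer function H(z) = B(z)/A(z), where xcoef holds the numerator coefficients and ycoef the denominator coefficients. The helper name `gain` is hypothetical.

```python
import cmath
import math


def gain(xcoef, ycoef, omega):
    """Magnitude of H(e^{j*omega}) for coefficient lists indexed by delay."""
    num = sum(b * cmath.exp(-1j * omega * k) for k, b in enumerate(xcoef))
    den = sum(a * cmath.exp(-1j * omega * k) for k, a in enumerate(ycoef))
    return abs(num / den)


# Coefficients copied from the two DSPFilter2 invocations in the example.
low_x, low_y = [0.2] * 5, [1.0]
high_x = [0.4328, -1.7314, 2.5971, -1.7314, 0.4328]
high_y = [1.0000, -2.3695, 2.3140, -1.0547, 0.18740]

for name, x, y in (("low-pass", low_x, low_y), ("high-pass", high_x, high_y)):
    dc = gain(x, y, 0.0)          # omega = 0: constant (zero-frequency) input
    nyq = gain(x, y, math.pi)     # omega = pi: fastest representable oscillation
    print(f"{name}: gain at DC = {dc:.3f}, gain at Nyquist = {nyq:.3f}")
```

The moving average passes a constant input unchanged and attenuates the fastest oscillation, while the second filter does the opposite: it nearly cancels a constant input and passes the fastest oscillation with a gain close to 1.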
The following example shows how the DSPFilter2 operator removes all signal components below 5.0 Hz from multidimensional ECG time series data:
composite Main {
type
ecgRaw = tuple < float64 delay, float64 ts, rstring patientID, list < float64 > ECGValue >;
ECGDerivative = tuple <float64 ts, rstring patientID, list <float64> derivative,list<float64> ECGValue>;
ECGDerivative2 = tuple < float64 ts, rstring patientID, list < float64 > derivative >;
graph
stream<ecgRaw> rawECGStream = FileSource(){
param
file : "ts.dat";
format: csv;
}
stream<ECGDerivative> ecgDerivativeStream= DSPFilter2(rawECGStream)
{
param
filterType: HighPass; // implement a high-pass filter
inputTimeSeries:ECGValue;
cutOffFrequency:5.0; // cutoff frequency of the high-pass filter in Hertz
samplingRate:1000.0; // sampling rate of the data in Hertz
output
ecgDerivativeStream: derivative=filteredTimeSeries();
}
() as SinkOP = FileSink(ecgDerivativeStream){
param
file: "firstOrder.csv";
format:csv;
}
}
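The cutOffFrequency and samplingRate values in the ECG example are easier to reason about together than separately. The following Python sketch works out where a 5.0 Hz cutoff sits relative to a 1000.0 Hz sampling rate. It is plain arithmetic on the two parameter values; how the operator uses these quantities internally is not described here, and the variable names are hypothetical.

```python
import math

sampling_rate_hz = 1000.0   # samplingRate in the example
cutoff_hz = 5.0             # cutOffFrequency in the example

# Highest frequency that can be represented at this sampling rate.
nyquist_hz = sampling_rate_hz / 2.0

# Cutoff as a fraction of the Nyquist frequency (0 = DC, 1 = Nyquist).
normalized_cutoff = cutoff_hz / nyquist_hz

# Cutoff as a digital angular frequency in radians per sample.
omega_c = 2.0 * math.pi * cutoff_hz / sampling_rate_hz

print(nyquist_hz, normalized_cutoff, omega_c)
```

With these values the cutoff lies at 1% of the Nyquist frequency, so the high-pass filter in the example removes only the slowest variations in the signal and leaves the rest of the spectrum up to 500 Hz in place.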