Developing and running applications that use the DPS Toolkit
- Configure the product environment variables by entering the following command:
source product-installation-root-directory/7.2.0.0/bin/streamsprofile.sh
- Ensure that all of the following additional RPMs required by the DPS toolkit are present on your system:
- curl
- curl-devel
- lua
- lua-devel
- openssl-devel
- openldap-devel
- Install and configure an external key-value data store that is supported by DPS.
- Configure the DPS toolkit to connect to the data store installed in the previous step. See the Data store connection configuration page for more information.
Using the toolkit in SPL
All the functionality of the toolkit is available through the native functions in the com.teracloud.streams.store.distributed and com.teracloud.streams.lock.distributed namespaces. Include these namespaces in your SPL application with a use directive. See the "Getting Started" section below and the native function documentation for more details on how to use the toolkit within applications written in SPL.
If you have a C++ or Java operator or function that uses the DPS toolkit, your SPL application graph must also include an instance of the DPSAux operator. This operator does not require any additional configuration but must be present in order for your application to work correctly. See the DPSUsageFromJava sample for an example.
Using the toolkit in Java operators
Ensure that <dps_toolkit_home>/impl/java/lib/dps-helper.jar, which contains the Java implementation of the DPS functions, is accessible to your application. The packages of interest are com.teracloud.streams.dps and com.teracloud.streams.dl. See the "Getting Started" section below and the Javadoc for details on using the API from operators written in Java.
@PrimitiveOperator(name="MyJavaOperator", namespace="com.teracloud.demo", description="Java Operator MyJavaOperator")
@SharedLoader(true)
public class MyJavaOperator extends AbstractOperator {
...
}
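One way to confirm that dps-helper.jar is actually visible to your code is to probe for one of its classes at startup. The following is a minimal sketch, not part of the toolkit: the helper class ClasspathCheck is hypothetical, and the fully qualified name com.teracloud.streams.dps.DistributedStores is an assumption that combines a class name used in the "Getting Started" example with one of the packages listed above. Check the Javadoc for the actual name before relying on it.

```java
// Hypothetical helper, not part of the DPS toolkit.
class ClasspathCheck {
    // Returns true if the named class can be located by this class's loader.
    static boolean isClassAvailable(String className) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed class name; verify it against the toolkit Javadoc.
        String dpsClass = "com.teracloud.streams.dps.DistributedStores";
        if (isClassAvailable(dpsClass)) {
            System.out.println("DPS helper classes found on the classpath");
        } else {
            System.out.println("DPS helper classes not found; check that dps-helper.jar is accessible");
        }
    }
}
```

Inside an operator, the same check would run against the operator's own class loader, so a result obtained from a standalone main method is only a rough indication.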
Using the toolkit in C++ operators
Include the C++ header file DistributedProcessStoreWrappers.h found in impl/include. This is the main entry point for the C++ functions, which are in the C++ namespace com::teracloud::streams::store::distributed.
Getting Started
The following snippets demonstrate the basic usage of the toolkit from SPL and Java. Usage from C++ is very similar to the SPL example below.
SPL:
rstring dummyRstring = "";
uint32 dummyUint32 = 0u;
mutable uint64 err = 0ul;
mutable uint64 dbStore_handle = 0ul;
dbStore_handle = dpsCreateStore("myDBStore1", dummyRstring, dummyUint32, err);
if (err == 0ul ) { //no error occurred
//create lock for the store
mutable uint64 lock_id = dlCreateOrGetLock("My db store lock", err);
// Acquire the newly created lock, specifying a lease time and maximum time to wait to acquire the lock.
float64 max_wait = 10.0;
float64 lease_time = 10.0;
dlAcquireLock(lock_id, lease_time, max_wait, err);
//add a key/value pair to the store
mutable boolean result = true;
rstring key = "StreamsKey";
uint32 value = 399u;
err = 0ul;
result = dpsPut(dbStore_handle, key, value, err);
if (err != 0ul) {
//use dpsGetLastStoreErrorCode() and dpsGetLastStoreErrorString() as needed
}
// finished our store operations, release the lock
err = 0ul;
dlReleaseLock(lock_id, err);
}
Java:
StoreFactory sf = DistributedStores.getStoreFactory();
Store store = null;
try {
//specify the SPL types for the keys and values in the store
String keyType = "rstring";
String valueType = "int32";
store = sf.createOrGetStore("Java Test Store1", keyType, valueType);
} catch (StoreFactoryException sfe) {
// use sfe.getErrorCode() and sfe.getErrorMessage() for more info
}
...
//once ready to access the store,
//get the lock for the store, may have previously been created
LockFactory lf = DistributedLocks.getLockFactory();
Lock myLock = lf.createOrGetLock("Lock_For_Test_Store1");
// Acquire the lock
try {
myLock.acquireLock();
} catch (LockException le) {
System.out.print("Unable to acquire the lock named 'Lock_For_Test_Store1'");
System.out.println(" Error = " + le.getErrorCode() + ", Error msg = " + le.getErrorMessage());
throw le;
}
//perform store operations
store.put("Key", 39);
store.put("Key2", 50);
//release the lock when finished
myLock.releaseLock();
Note that error checking in the above examples is minimal.
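The Java example above releases the lock only on the success path, so an exception thrown by a store operation would leave the lock held. A common way to avoid this is to release the lock in a finally block. The sketch below illustrates that pattern under stated assumptions: SketchLock, InMemoryLock, and GuardedStoreAccess are hypothetical stand-ins so that the code compiles without the toolkit on the classpath. Only the method names acquireLock() and releaseLock() are taken from the example above, and the toolkit's real Lock type may declare checked exceptions that this sketch omits.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the toolkit's lock type; only the two method
// names are taken from the Java example above.
interface SketchLock {
    void acquireLock();
    void releaseLock();
}

// In-memory implementation used only to make the sketch runnable.
class InMemoryLock implements SketchLock {
    private final AtomicBoolean held = new AtomicBoolean(false);

    public void acquireLock() {
        if (!held.compareAndSet(false, true)) {
            throw new IllegalStateException("lock is already held");
        }
    }

    public void releaseLock() {
        held.set(false);
    }

    boolean isHeld() {
        return held.get();
    }
}

class GuardedStoreAccess {
    // Runs the store operations while holding the lock and releases the
    // lock even if the operations throw.
    static void withLock(SketchLock lock, Runnable storeOperations) {
        lock.acquireLock();
        try {
            storeOperations.run();
        } finally {
            lock.releaseLock();
        }
    }

    public static void main(String[] args) {
        InMemoryLock lock = new InMemoryLock();
        try {
            withLock(lock, () -> { throw new RuntimeException("simulated store failure"); });
        } catch (RuntimeException e) {
            System.out.println("store operation failed: " + e.getMessage());
        }
        System.out.println("lock still held: " + lock.isHeld());
    }
}
```

With the real API, the body passed to withLock would contain calls such as store.put("Key", 39). Note that the lock is acquired outside the try block, so a failed acquisition does not trigger a release of a lock that was never obtained.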
Error Handling in C++ and SPL
Most C++ and SPL functions take a mutable err parameter that receives the result of executing the function. If an error occurs, its value is non-zero. It is the caller's responsibility to supply this parameter and to check its value after the call.
- For errors related to the store functions, use dpsGetLastStoreErrorCode() and dpsGetLastStoreErrorString().
- For errors related to the TTL functions, use dpsGetLastErrorCodeTTL() and dpsGetLastErrorStringTTL().
- For locking-related errors, use dlGetLastDistributedLockErrorCode() and dlGetLastDistributedLockErrorString().
The following example shows how to check for errors after a function call, in this case after creating a lock:
mutable uint64 err = 0ul;
mutable uint64 lock_id = dlCreateOrGetLock("My Sentinel Lock1", err);
if (err != 0ul) {
rstring msg = dlGetLastDistributedLockErrorString();
uint64 rc = dlGetLastDistributedLockErrorCode();
printStringLn("Error creating lock, rc = " + (rstring)(rc) + ", msg = " + msg);
}
Additional Examples
To learn how to call the DPS APIs from SPL native functions and from C++ and Java primitive operators, see the samples included in <STREAMS_INSTALL>/samples/com.teracloud.streams.dps or in the <YOUR_DPS_TOOLKIT_DIRECTORY>/samples directory. The advanced sub-directory there contains examples that showcase the bulk of the available DPS APIs.