Examples
This example show the use of multiple JMSSink operators with different parameter combinations.
composite Main {
graph
stream <int32 id, rstring fname, rstring lname>
MyPersonNamesStream = Beacon()
{
param
iterations:10u;
}
// JMSSink operator with connections document in the default directory ../etc/connections.xml
// (relative to the data directory)
() as MySink1 = JMSSink( MyPersonNamesStream )
{
param
connection : "amqConn";
access : "amqAccess";
}
// JMSSink operator with fully qualified name of connections document
() as MySink2 = JMSSink( MyPersonNamesStream )
{
param
connectionDocument : "/home/streamsuser/connections/JMSconnections.xml";
connection : "amqConn";
access : "amqAccess";
}
// JMSSink operator with optional output error port specified
stream <tuple< int32 id, rstring fname, rstring lname> inTuple, rstring errorMessage>
MySink3 = JMSSink(MyPersonNamesStream )
{
param
connection : "amqConn";
access : "amqAccess";
}
// JMSSink operator with reconnectionPolicy specified as NoRetry
() as MySink4 = JMSSink( MyPersonNamesStream )
{
param
connection : "amqConn";
access : "amqAccess";
reconnectionPolicy : "NoRetry";
}
// JMSSink operator with optional period and reconnectionPolicy specified
() as MySink5 = JMSSink( MyPersonNamesStream )
{
param
connection: "amqConn";
access: "amqAccess";
reconnectionPolicy : "InfiniteRetry";
period: 1.20;
}
// JMSSink operator with reconnectionPolicy specified as BoundedRetry
() as MySink6 = JMSSink( MyPersonNamesStream )
{
param
connection: "amqConn";
access: "amqAccess";
reconnectionPolicy : "BoundedRetry";
reconnectionBound : 2;
period: 1.20;
}
}
The following example shows a sample connections.xml file:
<st:connections xmlns:st="http://www.ibm.com/xmlns/prod/streams/adapters"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<connection_specifications>
<connection_specification name="amqConn">
<JMS initial_context="org.apache.activemq.jndi.ActiveMQInitialContextFactory"
provider_url = "tcp://machine1.com:61616"
connection_factory="ConnectionFactory"/>
</connection_specification>
</connection_specifications>
<access_specifications>
<access_specification name="amqAccess">
<destination identifier="dynamicQueues/MapQueue" message_class="bytes" />
<uses_connection connection="amqConn"/>
<native_schema>
<attribute name="id" type="Int" />
<attribute name="fname" type="Bytes" length="15" />
<attribute name="lname" type="Bytes" length="20" />
<attribute name="age" type="Int" />
<attribute name="gender" type="Bytes" length="1" />
<attribute name="score" type="Float" />
<attribute name="total" type="Double" />
</native_schema>
</access_specification>
</access_specifications>
</st:connections>