To successfully process JMX notifications, your
application must implement the NotificationListener interface,
become a listener for notifications, and be tolerant of potential
lost notifications.
Procedure
- Implement the NotificationListener interface.
To receive JMX notifications, you must implement the Java™ javax.management.NotificationListener interface.
The interface contains a single handleNotification method,
where you can add your notification handling logic.
- Make the application a listener for notifications.
To become a listener for notifications for a JMX bean, you
must call the javax.management.MBeanServerConnection.addNotificationListener method.
To limit which notifications are received, you can pass an optional
filter on the call.
- Make the application tolerant of potential lost notifications. For a complete notification-handling implementation, potential
lost notifications must be addressed. This tolerance is necessary
because JMX does not guarantee that every notification is delivered. It guarantees only that an application either receives all notifications
for which it is listening or can discover that notifications might have
been lost. An application can discover when notifications are lost
by registering a connection notification listener. To register a connection
notification listener, use the javax.management.remote.JMXConnector.addConnectionNotificationListener method.
Example
The following example Java code uses the
Teracloud
Streams JMX API to register a job and keeps it registered by handling inactivity warning
notifications:
import java.math.BigInteger;
import java.util.HashMap;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.Notification;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectionNotification;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import com.ibm.streams.management.Notifications;
import com.ibm.streams.management.ObjectNameBuilder;
import com.ibm.streams.management.instance.InstanceMXBean;
import com.ibm.streams.management.job.JobMXBean;
// javac -cp com.ibm.streams.management.jmxmp.jar:com.ibm.streams.management.mx.jar Client.java
// java -cp .:com.ibm.streams.management.jmxmp.jar:com.ibm.streams.management.mx.jar:jmxremote_optional.jar:glassfish-corba-omgapi-[version].jar:commons-cli-[version].jar
// Client service:jmx:jmxmp://server:9975 domainA instanceA 0 user password
// Note: It is important to include com.ibm.streams.management.jmxmp.jar in the class path before jmxremote_optional.jar
/**
 * Example JMX client that registers a Streams job and keeps it registered.
 *
 * <p>The same instance is registered both as a listener for inactivity warnings on the
 * job bean and as a connection notification listener, so {@link #handleNotification}
 * receives both kinds of notification.
 */
public class Client implements NotificationListener {
    /** Proxy for the registered job; remains null if the constructor failed before creating it. */
    JobMXBean _job;

    /** Status-loop iterations since the last job access; guarded by this object's monitor. */
    int _inactivityCount = 0;

    /** Set to true only after both listeners were registered without error. */
    private boolean _listening = false;

    /**
     * Connects to the JMX server, registers the job, and starts listening for notifications.
     *
     * <p>Failures are reported with a stack trace rather than thrown; in that case
     * {@link #showStatus()} returns immediately instead of looping.
     *
     * @param jmxUrl       JMX service URL of the server
     * @param domainName   name of the domain that contains the instance
     * @param instanceName name of the instance that contains the job
     * @param jobId        decimal job identifier, parsed as a {@link BigInteger}
     * @param user         user name passed as the first JMX credential
     * @param password     password passed as the second JMX credential
     */
    public Client(String jmxUrl, String domainName, String instanceName, String jobId, String user, String password) {
        try {
            HashMap<String, Object> env = new HashMap<String, Object>();
            String[] creds = {user, password};
            env.put("jmx.remote.credentials", creds);
            env.put("jmx.remote.protocol.provider.pkgs", "com.ibm.streams.management");

            JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), env);
            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();

            ObjectName instanceObjName = ObjectNameBuilder.instance(domainName, instanceName);
            InstanceMXBean instance = JMX.newMXBeanProxy(mbsc, instanceObjName, InstanceMXBean.class, true);

            ObjectName jobObjName = instance.registerJob(new BigInteger(jobId));
            _job = JMX.newMXBeanProxy(mbsc, jobObjName, JobMXBean.class, true);
            System.out.println("Job name: " + _job.getName());

            // Only inactivity warnings are requested from the job bean.
            NotificationFilterSupport filter = new NotificationFilterSupport();
            filter.enableType(Notifications.INACTIVITY_WARNING);
            mbsc.addNotificationListener(jobObjName, this, filter, null);

            // Listen for connection notifications to detect potentially lost notifications.
            jmxc.addConnectionNotificationListener(this, null, null);
            _listening = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the inactivity count once a minute until the calling thread is interrupted.
     *
     * <p>Returns immediately if the constructor did not finish registering the listeners,
     * because no notification could ever reset the count in that case.
     */
    public void showStatus() {
        if (!_listening) {
            System.err.println("Client is not listening for notifications; status loop not started.");
            return;
        }
        try {
            while (true) {
                Thread.sleep(60000);
                System.out.println("Number of times in status loop since last job access: " + incrementInactivityCount());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see that the thread was interrupted.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reports the notification and keeps the job registered.
     *
     * <p>Called on an inactivity warning or if notifications were potentially lost.
     * Connection closed/failed notifications are only reported: the connection is no
     * longer usable at that point, so a remote call on the job proxy would only fail.
     *
     * @param notification the job or connection notification that was delivered
     * @param handback     handback object supplied at registration (always null here)
     */
    @Override
    public void handleNotification(Notification notification, Object handback) {
        String type = notification.getType();
        System.out.println("Notification type: " + type);
        System.out.println("Notification source: " + notification.getSource());

        if (JMXConnectionNotification.CLOSED.equals(type) || JMXConnectionNotification.FAILED.equals(type)) {
            return;
        }

        _job.keepRegistered();
        resetInactivityCount();
    }

    /** Atomically increments the inactivity count and returns the new value. */
    private synchronized int incrementInactivityCount() {
        return ++_inactivityCount;
    }

    /** Atomically resets the inactivity count after a job access. */
    private synchronized void resetInactivityCount() {
        _inactivityCount = 0;
    }

    /**
     * Command-line entry point.
     *
     * @param args jmxUrl, domainName, instanceName, jobId, user, password (in that order)
     */
    public static void main(String[] args) {
        if (args.length < 6) {
            System.err.println("Usage: java Client <jmxUrl> <domainName> <instanceName> <jobId> <user> <password>");
            System.exit(1);
        }
        try {
            Client client = new Client(args[0],  // jmxUrl - use streamtool getjmxconnect to find
                                       args[1],  // domainName
                                       args[2],  // instanceName
                                       args[3],  // jobId
                                       args[4],  // user - use streamtool setacl to assign required permissions
                                       args[5]); // password
            client.showStatus();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}