Processing JMX API notifications

To successfully process JMX notifications, your application must implement the NotificationListener interface, become a listener for notifications, and be tolerant of potential lost notifications.

Procedure

  1. Implement the NotificationListener interface.

    To receive JMX notifications, you must implement the Java™ javax.management.NotificationListener interface. The interface contains a single handleNotification method, where you can add your notification handling logic.

  2. Make the application a listener for notifications.

    To become a listener for notifications for a JMX bean, you must call the javax.management.MBeanServerConnection.addNotificationListener method. To limit which notifications are received, you can pass an optional filter on the call.

  3. Make the application tolerant of potential lost notifications. For a complete notification-handling implementation, potential lost notifications must be addressed. This tolerance is necessary because JMX guarantees that an application either receives all notifications for which it is listening or can discover that notifications might have been lost. An application can discover when notifications are lost by registering a connection notification listener. To register a connection notification listener, use the javax.management.remote.JMXConnector.addConnectionNotificationListener method.

Example

The following example Java code uses the Teracloud Streams JMX API to register a job and keeps it registered by handling inactivity warning notifications:
import java.math.BigInteger;
import java.util.HashMap;

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import javax.management.Notification;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;

import com.ibm.streams.management.Notifications;
import com.ibm.streams.management.ObjectNameBuilder;
import com.ibm.streams.management.instance.InstanceMXBean;
import com.ibm.streams.management.job.JobMXBean;

// javac -cp com.ibm.streams.management.jmxmp.jar:com.ibm.streams.management.mx.jar Client.java
// java -cp .:com.ibm.streams.management.jmxmp.jar:com.ibm.streams.management.mx.jar:jmxremote_optional.jar:glassfish-corba-omgapi-[version].jar:commons-cli-[version].jar 
//   Client service:jmx:jmxmp://server:9975 domainA instanceA 0 user password
// Note: It is important to include com.ibm.streams.management.jmxmp.jar in the class path before jmxremote_optional.jar
public class Client implements NotificationListener {
  JobMXBean _job;
  int _inactivityCount = 0;

  public Client(String jmxUrl, String domainName, String instanceName, String jobId, String user, String password) {
    try {             
      HashMap<String, Object> env = new HashMap<String, Object>();
      String [] creds = {user, password};
      env.put("jmx.remote.credentials", creds);
      env.put("jmx.remote.protocol.provider.pkgs", "com.ibm.streams.management");

      JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), env);
      MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();

      ObjectName instanceObjName = ObjectNameBuilder.instance(domainName, instanceName);
      InstanceMXBean instance = JMX.newMXBeanProxy(mbsc, instanceObjName, InstanceMXBean.class, true);

      ObjectName jobObjName = instance.registerJob(new BigInteger(jobId));
      _job = JMX.newMXBeanProxy(mbsc, jobObjName, JobMXBean.class, true);
      System.out.println("Job name: " + _job.getName());

      NotificationFilterSupport filter = new NotificationFilterSupport();
      filter.enableType(Notifications.INACTIVITY_WARNING);
      mbsc.addNotificationListener(jobObjName, this, filter, null);
      jmxc.addConnectionNotificationListener(this,null,null); // listen for potential lost notifications
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }

  public void showStatus() {
    try {
      while (true) {  
        Thread.sleep(60000);
        System.out.println("Number of times in status loop since last job access: " + ++_inactivityCount);
      }
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }
   
  public void handleNotification(Notification notification, Object handback) { 
    System.out.println("Notification type: " + notification.getType());
    System.out.println("Notification source: " + notification.getSource());
    _job.keepRegistered();  // call on inactivity warning of if notifications were potentially lost
    _inactivityCount = 0;
  }
 
  public static void main(String[] args) {
    try {
      Client client = new Client(args[0],  // jmxUrl - use streamtool getjmxconnect to find
                                 args[1],  // domainName
                                 args[2],  // instanceName
                                 args[3],  // jobId 
                                 args[4],  // user - use streamtool setacl 
                                           // or streamtool setacl to assign required permissions
                                 args[5]); // password
      client.showStatus();
    }
    catch (Exception e) {
      e.printStackTrace();
    }
  }
}