Listing 1

public class TaskFIFOQueue{
  private static TaskFIFOQueue instance=null;
  private Vector queue;

  private TaskFIFOQueue(){
   queue=new Vector();
  }
  public static TaskFIFOQueue getInstance(){
    if(instance == null){
      instance=new TaskFIFOQueue ();
    }
    return instance;
  }
  public synchronized void addOneTask(Task task){
    queue.add(task);
    notifyAll();
  }
  public synchronized Task getOneTask()
    throws InterruptedException{
      if(queue.isEmpty()) {
        wait();
      }
      Task task = (Task) queue.get(0);
      queue.remove(0);
      notifyAll();
      return task;
   }
}


Listing 2

public class ThreadPool {
       private static ThreadPool instance=null;
       private Thread pool [];
       private static final int POOL_SIZE=10;

       public static ThreadPool getInstance() {

              if(instance == null) {
                   instance=new ThreadPool (POOL_SIZE);
              }
              return instance;
       }

       private ThreadPool (int size) {

              TaskFIFOQueue taskQueue=TaskFIFOQueue.getInstance();
              pool = new Thread[size];

              for (int i=0;i<size;i++) {
                  pool[i]= new Thread (new WorkerThread(taskQueue));
              }

              for (int i=0;i<size;i++) {
                  pool[i].start();
              }
       }
}

public class WorkerThread implements Runnable {

       private TaskFIFOQueue taskQueue;

       public WorkerThread (TaskFIFOQueue taskQueue) {
              this.taskQueue=taskQueue;
       }

       public void run() {
              do {
                   try {
                         Task task = taskQueue.getOneTask();
                         task.execute();
                   } catch (InterruptedException ex) {
                         ex.printStackTrace();
                   }
              } while (true);
       }
}


Listing 3

public class SessionDispatcherBean implements SessionBean {

  private SessionContext ctx;

  private final static String JMS_FACTORY=
      "customthread.jms.QueueConnectionFactory";
  private final static String QUEUE=
      "customthread.taskQueue";

  private QueueConnectionFactory qconFactory;
  private QueueConnection qcon;
  private QueueSession qsession;
  private QueueSender qsender;
  private Queue queue;
  private ObjectMessage msg;

public void ejbCreate () throws CreateException {
    try {
      InitialContext ic = new InitialContext();

      qconFactory=(QueueConnectionFactory) ic.lookup(JMS_FACTORY);
      qcon=qconFactory.createQueueConnection();
      qsession=qcon.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
      queue=(Queue)ic.lookup(QUEUE);
      qsender=qsession.createSender(queue);
      qcon.start();
    } catch (NamingException ne) {
      throw new CreateException(ne.getMessage());
    } catch (JMSException je) {
      throw new CreateException(je.getMessage());
    }
  }

  public void ejbRemove() {
     try {
          qcon.close();
     } catch ( JMSException je) {
          System.out.println("JMSException thrown in ejbRemove "+je);
     }
  }

  public void processTrade (String stockSymbol, int shares, float price, String userId,
  String tradePassword)
         throws EJBException {
     try {
       Task trade = new Order(stockSymbol, shares, price, userId, tradePassword);
       send(trade);
     } catch (JMSException je) {
          throw new EJBException (je.getMessage());
     }
  }

  private void send(Task task) throws JMSException {
           if ( msg == null ) {
                msg=qsession.createObjectMessage();
           }
           msg.setObject(task);
           msg.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
           qsender.send(msg);
  }
}


Listing 4

           public class WorkerMDB
  implements MessageDrivenBean, MessageListener {
  private MessageDrivenContext ctx;

  public void onMessage(Message msg) {
         Object obj=null;
         Task task=null;
         try {
                if (msg instanceof ObjectMessage) {
                     obj= ( (ObjectMessage) msg ).getObject();
	      if( obj instanceof Task ) {
		  task =(Task) obj;
			  task.execute();
     	                } else {
        		      System.out.println("Wrong object type.");
                }
         } else {
              	      System.out.println("Wrong message type.");
                       }
    } catch(JMSException ex) {
      ex.printStackTrace();
    }
  }
}


Listing 5

         Environment env = new Environment();
           env.setProviderUrl(url);
           env.setSecurityPrincipal(username);
           env.setSecurityCredentials(password);
           Context ctx = env.getInitialContext();
           MBeanHome home = (MBeanHome) ctx.lookup(MBeanHome.ADMIN_JNDI_NAME);
           String domain=home.getDomainName();

           //Retrieve all JMSDestinationRuntimeMBean instances in the domain.
           Set mbeanSet = home.getMBeansByType("JMSDestinationRuntime");
           List list = Collections.synchronizedList(new ArrayList());
           Iterator rIt = mbeanSet.iterator();
           int i = 0;

           while (rIt.hasNext()) {
             WebLogicMBean bean = (WebLogicMBean) rIt.next();
             //Get the objectName for observed MBean.
             WebLogicObjectName observedObjectName = bean.getObjectName();

             //Monitor the queues for MDB-based multithreading framework.
             if (observedObjectName.getName().equals("customThread.taskQueue")) {

               //Instantiate a monitor object.
               CounterMonitor monitor = new CounterMonitor();
               //Configure the monitor object
               monitor.setThreshold(new Integer(100));
               monitor.setNotify(true);
               monitor.setObservedObject(observedObjectName);
               monitor.setObservedAttribute("MessagesCurrentCount");

               //Instantiate and register a notification listener
               MonitoringFrameworkListener listener = new MonitoringFrameworkListener();
               monitor.addNotificationListener(listener, null, null);

               String location = observedObjectName.getLocation();
               MBeanHome localhome = (MBeanHome) ctx.lookup(MBeanHome.JNDI_NAME +"." +
			    location);
               RemoteMBeanServer rmbs = localhome.getMBeanServer();

               //Construct the objectName for monitor object.
               WebLogicObjectName monitorObjectName = new WebLogicObjectName
			    ("MessageCounter" + (++i), "CounterMonitor", domain, location);
               //Pre-registering the monitor object with the objectName.
               monitor.preRegister(rmbs, monitorObjectName);
               //Start the monitor object.
               monitor.start();

list.add(monitor);
observedObjectName = null;
           }
         }

Additional Source Code - Zip file 9KB