Listing 1
public class TaskFIFOQueue{
private static TaskFIFOQueue instance=null;
private Vector queue;
private TaskFIFOQueue(){
queue=new Vector();
}
public static TaskFIFOQueue getInstance(){
if(instance == null){
instance=new TaskFIFOQueue ();
}
return instance;
}
public synchronized void addOneTask(Task task){
queue.add(task);
notifyAll();
}
public synchronized Task getOneTask()
throws InterruptedException{
if(queue.isEmpty()) {
wait();
}
Task task = (Task) queue.get(0);
queue.remove(0);
notifyAll();
return task;
}
}
Listing 2
public class ThreadPool {
private static ThreadPool instance=null;
private Thread pool [];
private static final int POOL_SIZE=10;
public static ThreadPool getInstance() {
if(instance == null) {
instance=new ThreadPool (POOL_SIZE);
}
return instance;
}
private ThreadPool (int size) {
TaskFIFOQueue taskQueue=TaskFIFOQueue.getInstance();
pool = new Thread[size];
for (int i=0;i<size;i++) {
pool[i]= new Thread (new WorkerThread(taskQueue));
}
for (int i=0;i<size;i++) {
pool[i].start();
}
}
}
public class WorkerThread implements Runnable {
private TaskFIFOQueue taskQueue;
public WorkerThread (TaskFIFOQueue taskQueue) {
this.taskQueue=taskQueue;
}
public void run() {
do {
try {
Task task = taskQueue.getOneTask();
task.execute();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
} while (true);
}
}
Listing 3
public class SessionDispatcherBean implements SessionBean {
private SessionContext ctx;
private final static String JMS_FACTORY=
"customthread.jms.QueueConnectionFactory";
private final static String QUEUE=
"customthread.taskQueue";
private QueueConnectionFactory qconFactory;
private QueueConnection qcon;
private QueueSession qsession;
private QueueSender qsender;
private Queue queue;
private ObjectMessage msg;
public void ejbCreate () throws CreateException {
try {
InitialContext ic = new InitialContext();
qconFactory=(QueueConnectionFactory) ic.lookup(JMS_FACTORY);
qcon=qconFactory.createQueueConnection();
qsession=qcon.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
queue=(Queue)ic.lookup(QUEUE);
qsender=qsession.createSender(queue);
qcon.start();
} catch (NamingException ne) {
throw new CreateException(ne.getMessage());
} catch (JMSException je) {
throw new CreateException(je.getMessage());
}
}
public void ejbRemove() {
try {
qcon.close();
} catch ( JMSException je) {
System.out.println("JMSException thrown in ejbRemove "+je);
}
}
public void processTrade (String stockSymbol, int shares, float price, String userId,
String tradePassword)
throws EJBException {
try {
Task trade = new Order(stockSymbol, shares, price, userId, tradePassword);
send(trade);
} catch (JMSException je) {
throw new EJBException (je.getMessage());
}
}
private void send(Task task) throws JMSException {
if ( msg == null ) {
msg=qsession.createObjectMessage();
}
msg.setObject(task);
msg.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
qsender.send(msg);
}
}
Listing 4
public class WorkerMDB
implements MessageDrivenBean, MessageListener {
private MessageDrivenContext ctx;
public void onMessage(Message msg) {
Object obj=null;
Task task=null;
try {
if (msg instanceof ObjectMessage) {
obj= ( (ObjectMessage) msg ).getObject();
if( obj instanceof Task ) {
task =(Task) obj;
task.execute();
} else {
System.out.println("Wrong object type.");
}
} else {
System.out.println("Wrong message type.");
}
} catch(JMSException ex) {
ex.printStackTrace();
}
}
}
Listing 5
Environment env = new Environment();
env.setProviderUrl(url);
env.setSecurityPrincipal(username);
env.setSecurityCredentials(password);
Context ctx = env.getInitialContext();
MBeanHome home = (MBeanHome) ctx.lookup(MBeanHome.ADMIN_JNDI_NAME);
String domain=home.getDomainName();
//Retrieve all JMSDestinationRuntimeMBean instances in the domain.
Set mbeanSet = home.getMBeansByType("JMSDestinationRuntime");
List list = Collections.synchronizedList(new ArrayList());
Iterator rIt = mbeanSet.iterator();
int i = 0;
while (rIt.hasNext()) {
WebLogicMBean bean = (WebLogicMBean) rIt.next();
//Get the objectName for observed MBean.
WebLogicObjectName observedObjectName = bean.getObjectName();
//Monitor the queues for MDB-based multithreading framework.
if (observedObjectName.getName().equals("customThread.taskQueue")) {
//Instantiate a monitor object.
CounterMonitor monitor = new CounterMonitor();
//Configure the monitor object
monitor.setThreshold(new Integer(100));
monitor.setNotify(true);
monitor.setObservedObject(observedObjectName);
monitor.setObservedAttribute("MessagesCurrentCount");
//Instantiate and register a notification listener
MonitoringFrameworkListener listener = new MonitoringFrameworkListener();
monitor.addNotificationListener(listener, null, null);
String location = observedObjectName.getLocation();
MBeanHome localhome = (MBeanHome) ctx.lookup(MBeanHome.JNDI_NAME +"." +
location);
RemoteMBeanServer rmbs = localhome.getMBeanServer();
//Construct the objectName for monitor object.
WebLogicObjectName monitorObjectName = new WebLogicObjectName
("MessageCounter" + (++i), "CounterMonitor", domain, location);
//Pre-registering the monitor object with the objectName.
monitor.preRegister(rmbs, monitorObjectName);
//Start the monitor object.
monitor.start();
list.add(monitor);
observedObjectName = null;
}
}