Listing 1: SleepyObject.java

package net.sourceforge.concurrentQuery.article.serialized;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import net.sourceforge.concurrentQuery.test.pool.JDCConnectionDriver;

public class SleepyObject {
    private String jdbcDriver = "org.postgresql.Driver";
    private String jdbcURL = "jdbc:postgresql://localhost:5432/test?user=postgres";
    private String jdbcPoolURL = "jdbc:jdc:jdcpool";
    private String jdbcUser = "postgres";
    private String jdbcPasswd = "postgres"

    private int value = 0;

    public SleepyObject(int sleepSeconds) throws SQLException {
        try {
            Class.forName(jdbcDriver).newInstance();
        } catch (Exception e) {
            throw new SQLException(e.getMessage());
        }
        try {
            new JDCConnectionDriver(jdbcDriver, jdbcURL, jdbcUser, jdbcPasswd);
        } catch (Exception e) {
            throw new SQLException(e.getMessage());
        }
        create(sleepSeconds);
    }

    private void create(int sleepSeconds) throws SQLException {
        Connection connection = DriverManager.getConnection(jdbcPoolURL, jdbcUser, jdbcPasswd);
        Statement statement = connection.createStatement();
        String sql = "select sleep(" + sleepSeconds + ")";
        System.out.println("query is: " + sql);
        if (statement.execute(sql)) {
            ResultSet resultSet = statement.getResultSet();
            if (resultSet.next()) {
                value = resultSet.getInt(1);
            }
            connection.close();
        }
    }

    public int getValue() {
        return value;
    }
}


Listing 2: ConcurrentQueryThreadImpl.java properties and initialization

public class ConcurrentQueryThreadImpl {
    // JDBC settings. jdbcDriver is loaded with Class.forName in the constructor;
    // jdbcDriver, jdbcURL, jdbcUser and jdbcPasswd are passed to JDCConnectionDriver.
    private String jdbcDriver = "org.postgresql.Driver";
    private String jdbcURL = "jdbc:postgresql://localhost:5432/test?user=postgres";
    // NOTE(review): not referenced in this fragment; presumably the URL the
    // query-running code hands to DriverManager.getConnection — confirm.
    private String jdbcPoolURL = "jdbc:jdc:jdcpool";
    private String jdbcUser = "postgres";
    private String jdbcPasswd = "postgres";

    // Lazily created singleton; see getInstance().
    private static ConcurrentQueryThreadImpl instance = null;
    // NOTE(review): presumably the cap on simultaneously running queries —
    // not used within this fragment, confirm against runQuery.
    private final int numberOfConcurrentQueries = 5;
    // Both maps are created in getInstance() together with the singleton.
    // queuedQueries maps a domain object to a String (presumably its pending SQL — confirm).
    private static ConcurrentHashMap<CanResolveAConcurrentQuery, String> queuedQueries;
    // runningThreads maps a QueryThread to a domain object (presumably the one
    // that receives its result — confirm).
    private static ConcurrentHashMap<QueryThread, CanResolveAConcurrentQuery> runningThreads;

    /**
     * Returns the shared instance, creating it and the two bookkeeping maps on
     * first use.
     *
     * <p>The method is synchronized and publishes {@code instance} only after
     * both maps exist, so no caller can observe a non-null instance with
     * uninitialized maps.
     *
     * @return the singleton instance
     * @throws SQLException if the JDBC driver or the pool driver cannot be set up
     */
    public static synchronized ConcurrentQueryThreadImpl getInstance() throws SQLException {
        if (instance == null) {
            ConcurrentQueryThreadImpl created = new ConcurrentQueryThreadImpl();
            queuedQueries = new ConcurrentHashMap<CanResolveAConcurrentQuery, String>();
            runningThreads = new ConcurrentHashMap<QueryThread, CanResolveAConcurrentQuery>();
            instance = created;
        }
        return instance;
    }

    /**
     * Loads the JDBC driver class and creates the pool driver.
     *
     * @throws SQLException if either step fails; the message is that of the
     *                      original exception, which is attached as the cause
     */
    private ConcurrentQueryThreadImpl() throws SQLException {
        try {
            Class.forName(jdbcDriver).newInstance();
            new JDCConnectionDriver(jdbcDriver, jdbcURL, jdbcUser, jdbcPasswd);
        } catch (Exception e) {
            // Same message as before, but keep the root cause for diagnosis.
            SQLException wrapped = new SQLException(e.getMessage());
            wrapped.initCause(e);
            throw wrapped;
        }
    }

== stuff deleted ==


Listing 3: ConcurrentSleepyObject.java

package net.sourceforge.concurrentQuery.article.concurrent;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Domain object that submits a {@code select sleep(n)} query through
 * {@link ConcurrentQueryThreadImpl} when constructed and receives the result
 * later through {@link #processResultSet(ResultSet)}.
 */
public class ConcurrentSleepyObject implements CanResolveAConcurrentQuery {

    private int value = 0;
    private boolean reaped = false;

    /**
     * Submits the sleep query; does not wait for it to finish.
     *
     * @param seconds number spliced into the {@code select sleep(...)} statement
     * @throws SQLException if submitting the query fails
     */
    public ConcurrentSleepyObject(int seconds) throws SQLException {
        create(seconds);
    }

    /** Hands the query and this object to the concurrent query runner. */
    private void create(int sleepSeconds) throws SQLException {
        // sleepSeconds is an int, so concatenating it cannot inject SQL.
        ConcurrentQueryThreadImpl.getInstance().runQuery("select sleep(" + sleepSeconds + ")", this);
    }

    /**
     * Returns the query result, first waiting for all outstanding queries if
     * this object has not been marked as reaped yet.
     *
     * @return the value read from the result set, or 0 if none was read
     * @throws SQLException if waiting for the outstanding queries fails
     */
    public int getValue() throws SQLException {
        if (!reaped) {
            ConcurrentQueryThreadImpl.getInstance().waitForQueriesToComplete();
        }
        return value;
    }

    /**
     * Reads the first column of the first row into {@code value}.
     *
     * @param rs result set of this object's query; may be {@code null} when
     *           the statement produced no result set
     * @return {@code true} if a row was read, {@code false} otherwise
     * @throws SQLException if reading from the result set fails
     */
    public boolean processResultSet(ResultSet rs) throws SQLException {
        // The query runners leave the result set null when execute() reports
        // that there is none, so guard against it instead of throwing an NPE.
        if (rs == null || !rs.next()) {
            return false;
        }
        value = rs.getInt(1);
        return true;
    }

    /**
     * Records whether the query runner has already collected this object's result.
     *
     * @param isReaped {@code true} once the result has been collected
     */
    public void setReaped(boolean isReaped) {
        this.reaped = isReaped;
    }
}


Listing 4: Code fragment of ConcurrentQueryThreadImpl.java

public class ConcurrentQueryThreadImpl {
...
/**
 * Starts the given query on its own thread if a slot is free, otherwise
 * queues it. Finished threads are reaped first and already-queued queries
 * are started before the new one.
 *
 * <p>Every started thread gets its own pooled connection: each thread closes
 * the connection it was given, so one connection must not be shared between
 * threads. No connection is taken from the pool when the query is only queued.
 *
 * @param query        SQL text to execute
 * @param domainObject object that will receive the query's result set
 * @throws SQLException if reaping a finished query fails or no connection
 *                      can be obtained
 */
public void runQuery(String query, CanResolveAConcurrentQuery domainObject) throws SQLException {
        // reap any results from completed threads, if any
        reapCompletedThreads();

        // before we start a QueryThread for this query, let's submit any queries
        // that have already been queued
        while (!queuedQueries.isEmpty()
                && runningThreads.size() < numberOfConcurrentQueries) {
            CanResolveAConcurrentQuery queuedDomainObject = queuedQueries.keySet().iterator().next();
            String queuedQuery = queuedQueries.get(queuedDomainObject);
            // Obtain the connection before dequeuing so that a pool failure
            // does not silently drop the queued query.
            Connection queuedConnection = DriverManager.getConnection(jdbcPoolURL);
            queuedQueries.remove(queuedDomainObject);
            runningThreads.put(new QueryThread(queuedConnection, queuedQuery), queuedDomainObject);
        }

        // now, either start a thread for this query or add it to the queued queries.
        if (runningThreads.size() < numberOfConcurrentQueries) {
            Connection connection = DriverManager.getConnection(jdbcPoolURL);
            runningThreads.put(new QueryThread(connection, query), domainObject);
        } else {
            queuedQueries.put(domainObject, query);
        }
    }

    /**
     * Blocks until no query is running or queued. Running queries are reaped
     * until none is left, then the next batch of queued queries is started,
     * and so on.
     *
     * <p>The calling thread sleeps briefly between polls instead of spinning.
     * If it is interrupted, the interrupt flag is restored and polling simply
     * continues, so the method still returns only when all queries are done.
     *
     * @throws SQLException if reaping a finished query fails or no connection
     *                      can be obtained for a queued query
     */
    public void waitForQueriesToComplete() throws SQLException {
        final long pollIntervalMillis = 10L;
        while (!queuedQueries.isEmpty() || !runningThreads.isEmpty()) {
            reapCompletedThreads();
            while (!runningThreads.isEmpty()) {
                try {
                    Thread.sleep(pollIntervalMillis);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller; keep waiting.
                    Thread.currentThread().interrupt();
                }
                reapCompletedThreads();
            }

            while (!queuedQueries.isEmpty()
                    && runningThreads.size() < numberOfConcurrentQueries) {
                CanResolveAConcurrentQuery queuedDomainObject = queuedQueries.keySet().iterator().next();
                String queuedQuery = queuedQueries.get(queuedDomainObject);
                // Obtain the connection before dequeuing so that a pool
                // failure does not silently drop the queued query.
                Connection connection = DriverManager.getConnection(jdbcPoolURL);
                queuedQueries.remove(queuedDomainObject);
                runningThreads.put(new QueryThread(connection, queuedQuery), queuedDomainObject);
            }
        }
    }
...


Listing 5: Implementation of a QueryThread private class in ConcurrentQueryThreadImpl

public class ConcurrentQueryThreadImpl {

...

/**
 * Thread that executes one SQL statement on the connection it is given and
 * keeps either the resulting ResultSet or the SQLException for later pickup.
 * The thread starts itself from the constructor.
 */
private class QueryThread extends Thread implements IsAConcurrentQueryThreadRunner {

        private Connection connection;
        private String query;
        private ResultSet resultSet;
        private SQLException sqlException;

        private QueryThread() {}

        /**
         * Stores the connection and query, then starts the thread.
         *
         * @param connection connection to run the query on; closed by this thread
         * @param query      SQL text to execute
         */
        public QueryThread(Connection connection, String query) {
            System.out.println("(QueryThread) query is: " + query);
            this.connection = connection;
            this.query = query;
            start();
        }

        /**
         * Executes the query and records its ResultSet or SQLException. The
         * connection is closed on every path so it cannot leak when the
         * statement fails or produces no result set.
         */
        public void run() {
            try {
                // The statement is deliberately not closed here: closing it
                // would also close the ResultSet that is read later.
                Statement statement = connection.createStatement();
                if (statement.execute(query)) {
                    resultSet = statement.getResultSet();
                }
            } catch (SQLException e) {
                sqlException = e;
            } finally {
                // NOTE(review): the ResultSet is consumed after this close();
                // presumably close() on a pooled connection only returns it to
                // the pool — confirm against the pool implementation.
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        if (sqlException == null) {
                            sqlException = e;
                        }
                    }
                }
            }
        }

        /** @return the exception raised while running the query, or {@code null} */
        public SQLException getSQLException() {
            return sqlException;
        }

        /** @return the query's result set, or {@code null} if there is none */
        public ResultSet getResultSet() {
            return resultSet;
        }

    }
...


Listing 6: QueryThreadPool private class

// Fixed-size thread pool sized by numberOfConcurrentQueries; QueryThreadPool
// submits itself to it from its constructor.
// NOTE(review): nothing in this fragment shuts the executor down — confirm
// that shutdown happens elsewhere, otherwise its threads may keep the JVM alive.
private static ExecutorService executor = Executors.newFixedThreadPool(numberOfConcurrentQueries);
    /**
     * Task that executes one SQL statement on the shared executor and keeps
     * either the resulting ResultSet or the SQLException for later pickup.
     * The task submits itself from the constructor.
     */
    private class QueryThreadPool implements IsAConcurrentQueryThreadRunner, Callable<ResultSet> {
        private Connection connection;
        private String query;
        private ResultSet resultSet;
        private SQLException sqlException;
        private Future<ResultSet> future;

        private QueryThreadPool() {}

        /**
         * Stores the connection and query, then submits this task to the executor.
         *
         * @param connection connection to run the query on; closed by the task
         * @param query      SQL text to execute
         */
        public QueryThreadPool(Connection connection, String query) {
            System.out.println("(QueryThreadPool) query is: " + query);
            this.connection = connection;
            this.query = query;
            future = executor.submit(this);
        }

        /**
         * Executes the query. The connection is closed on every path so it
         * cannot leak when the statement fails or produces no result set.
         *
         * @return the query's result set, or {@code null} if there is none
         * @throws SQLException the original failure, also kept for
         *                      {@link #getSQLException()}
         */
        public ResultSet call() throws SQLException {
            try {
                // The statement is deliberately not closed here: closing it
                // would also close the ResultSet that is read later.
                Statement statement = connection.createStatement();
                if (statement.execute(query)) {
                    resultSet = statement.getResultSet();
                }
            } catch (SQLException e) {
                sqlException = e;
            } finally {
                // NOTE(review): the ResultSet is consumed after this close();
                // presumably close() on a pooled connection only returns it to
                // the pool — confirm against the pool implementation.
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (SQLException e) {
                        if (sqlException == null) {
                            sqlException = e;
                        }
                    }
                }
            }
            if (sqlException != null) {
                // Rethrow the original so the Future carries the full failure.
                throw sqlException;
            }
            return resultSet;
        }

        /** @return the exception raised while running the query, or {@code null} */
        public SQLException getSQLException() {
            return sqlException;
        }

        /** @return the query's result set, or {@code null} if there is none */
        public ResultSet getResultSet() {
            return resultSet;
        }

        /** @return {@code true} while the submitted task has not finished */
        public boolean isAlive() {
            return !future.isDone();
        }
    }


Listing 7:  ConcurrentQueryThreadImpl.java

package net.sourceforge.concurrentQuery.article.concurrent;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import net.sourceforge.concurrentQuery.test.pool.JDCConnectionDriver;

/**
 * Singleton that runs SQL queries in the background, at most
 * {@code numberOfConcurrentQueries} at a time, and hands each finished
 * ResultSet to the domain object that requested the query. Queries that do
 * not get a free slot are queued and started later.
 *
 * <p>Two runner implementations are provided: {@code QueryThread} (one thread
 * per query, the one used by {@link #runQuery}) and {@code QueryThreadPool}
 * (tasks on a fixed-size executor).
 *
 * <p>NOTE(review): runners close their connection before the ResultSet is
 * consumed by the domain object. This presumably relies on the pool's
 * {@code close()} only returning the connection to the pool — confirm against
 * the pool implementation.
 */
public class ConcurrentQueryThreadImpl {
    private String jdbcDriver = "org.postgresql.Driver";
    private String jdbcURL = "jdbc:postgresql://localhost:5432/test?user=postgres";
    private String jdbcPoolURL = "jdbc:jdc:jdcpool";
    private String jdbcUser = "postgres";
    private String jdbcPasswd = "postgres";

    private static ConcurrentQueryThreadImpl instance = null;
    private static final int numberOfConcurrentQueries = 5;
    /** Pause between two polls while waiting for running queries. */
    private static final long POLL_INTERVAL_MILLIS = 10L;
    /** Domain object to the SQL text that is waiting for a free slot. */
    private static ConcurrentHashMap<CanResolveAConcurrentQuery, String> queuedQueries;
    /** Active runner to the domain object that receives its result. */
    private static ConcurrentHashMap<IsAConcurrentQueryThreadRunner,
            CanResolveAConcurrentQuery> runningThreads;

    private static ExecutorService executor
            = Executors.newFixedThreadPool(numberOfConcurrentQueries);

    /**
     * Returns the shared instance, creating it and the two bookkeeping maps on
     * first use. Synchronized, and {@code instance} is published only after
     * both maps exist, so no caller can observe a half-initialized singleton.
     *
     * @return the singleton instance
     * @throws SQLException if the JDBC driver or the pool driver cannot be set up
     */
    public static synchronized ConcurrentQueryThreadImpl getInstance() throws SQLException {
        if (instance == null) {
            ConcurrentQueryThreadImpl created = new ConcurrentQueryThreadImpl();
            queuedQueries = new ConcurrentHashMap<CanResolveAConcurrentQuery, String>();
            runningThreads = new ConcurrentHashMap<IsAConcurrentQueryThreadRunner,
                    CanResolveAConcurrentQuery>();
            instance = created;
        }
        return instance;
    }

    /**
     * Loads the JDBC driver class and creates the pool driver.
     *
     * @throws SQLException if either step fails; the original exception is
     *                      attached as the cause
     */
    private ConcurrentQueryThreadImpl() throws SQLException {
        try {
            Class.forName(jdbcDriver).newInstance();
            new JDCConnectionDriver(jdbcDriver, jdbcURL, jdbcUser, jdbcPasswd);
        } catch (Exception e) {
            throw asSqlException(e);
        }
    }

    /**
     * Starts the given query on its own thread if a slot is free, otherwise
     * queues it. Finished runners are reaped first and already-queued queries
     * are started before the new one.
     *
     * @param query        SQL text to execute
     * @param domainObject object that will receive the query's result set
     * @throws SQLException if reaping a finished query fails or no connection
     *                      can be obtained
     */
    public void runQuery(String query, CanResolveAConcurrentQuery domainObject)
            throws SQLException {
        // reap any results from completed threads, if any
        reapCompletedThreads();

        // before we start a QueryThread for this query, let's submit any queries
        // that have already been queued
        startQueuedQueries();

        // now, either start a thread for this query or add it to the queued queries.
        if (runningThreads.size() < numberOfConcurrentQueries) {
            startQuery(query, domainObject);
        } else {
            queuedQueries.put(domainObject, query);
        }
    }

    /**
     * Blocks until no query is running or queued. Running queries are reaped
     * until none is left, then the next batch of queued queries is started,
     * and so on. The calling thread sleeps briefly between polls.
     *
     * @throws SQLException if reaping a finished query fails or no connection
     *                      can be obtained for a queued query
     */
    public void waitForQueriesToComplete() throws SQLException {
        while (!queuedQueries.isEmpty() || !runningThreads.isEmpty()) {
            reapCompletedThreads();
            while (!runningThreads.isEmpty()) {
                pauseBetweenPolls();
                reapCompletedThreads();
            }
            startQueuedQueries();
        }
    }

    /** Starts queued queries while there are free slots. */
    private void startQueuedQueries() throws SQLException {
        while (!queuedQueries.isEmpty()
                && runningThreads.size() < numberOfConcurrentQueries) {
            CanResolveAConcurrentQuery queuedDomainObject
                    = queuedQueries.keySet().iterator().next();
            String queuedQuery = queuedQueries.get(queuedDomainObject);
            // Dequeue only after the query has really been started, so a pool
            // failure does not silently drop it.
            startQuery(queuedQuery, queuedDomainObject);
            queuedQueries.remove(queuedDomainObject);
        }
    }

    /**
     * Starts one query on a connection of its own. Each runner closes the
     * connection it was given, so a connection must never be shared.
     */
    private void startQuery(String query, CanResolveAConcurrentQuery domainObject)
            throws SQLException {
        Connection connection = DriverManager.getConnection(jdbcPoolURL);
        runningThreads.put(new QueryThread(connection, query), domainObject);
    }

    /**
     * Collects every finished runner: marks its domain object as reaped and
     * passes it the result set, or rethrows the runner's failure.
     *
     * @throws SQLException if a finished query failed or its result set could
     *                      not be processed
     */
    private void reapCompletedThreads() throws SQLException {
        for (IsAConcurrentQueryThreadRunner queryThread : runningThreads.keySet()) {
            if (queryThread.isAlive()) {
                continue;
            }
            // Remove first: whatever happens below, a finished runner must not
            // stay in the map, or waitForQueriesToComplete could never drain it.
            CanResolveAConcurrentQuery domainObject = runningThreads.remove(queryThread);
            if (domainObject == null) {
                continue;
            }
            domainObject.setReaped(true);
            SQLException failure = queryThread.getSQLException();
            if (failure != null) {
                throw asSqlException(failure);
            }
            domainObject.processResultSet(queryThread.getResultSet());
        }
    }

    /** Wraps a failure in an SQLException with the same message, keeping the cause. */
    private static SQLException asSqlException(Exception cause) {
        SQLException wrapped = new SQLException(cause.getMessage());
        wrapped.initCause(cause);
        return wrapped;
    }

    /**
     * Sleeps for one poll interval. If interrupted, the interrupt flag is
     * restored and the method returns so the caller keeps polling.
     */
    private static void pauseBetweenPolls() {
        try {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the connection if there is one.
     *
     * @return the exception thrown by {@code close()}, or {@code null}
     */
    private static SQLException release(Connection connection) {
        if (connection == null) {
            return null;
        }
        try {
            connection.close();
            return null;
        } catch (SQLException e) {
            return e;
        }
    }

    /**
     * Runner that executes one query on a dedicated thread, started from the
     * constructor.
     */
    private class QueryThread extends Thread implements IsAConcurrentQueryThreadRunner {

        private Connection connection;
        private String query;
        private ResultSet resultSet;
        private SQLException sqlException;

        private QueryThread() {}

        /**
         * @param connection connection to run the query on; closed by this thread
         * @param query      SQL text to execute
         */
        public QueryThread(Connection connection, String query) {
            System.out.println("(QueryThread) query is: " + query);
            this.connection = connection;
            this.query = query;
            start();
        }

        /**
         * Executes the query and records its ResultSet or SQLException; the
         * connection is closed on every path.
         */
        public void run() {
            try {
                // The statement is deliberately not closed: closing it would
                // also close the ResultSet that is read later.
                Statement statement = connection.createStatement();
                if (statement.execute(query)) {
                    resultSet = statement.getResultSet();
                }
            } catch (SQLException e) {
                sqlException = e;
            } finally {
                SQLException closeFailure = release(connection);
                if (sqlException == null) {
                    sqlException = closeFailure;
                }
            }
        }

        /** @return the exception raised while running the query, or {@code null} */
        public SQLException getSQLException() {
            return sqlException;
        }

        /** @return the query's result set, or {@code null} if there is none */
        public ResultSet getResultSet() {
            return resultSet;
        }
    }

    /**
     * Runner that executes one query as a task on the shared executor,
     * submitted from the constructor.
     */
    private class QueryThreadPool implements IsAConcurrentQueryThreadRunner, Callable<ResultSet> {
        private Connection connection;
        private String query;
        private ResultSet resultSet;
        private SQLException sqlException;
        private Future<ResultSet> future;

        private QueryThreadPool() {}

        /**
         * @param connection connection to run the query on; closed by the task
         * @param query      SQL text to execute
         */
        public QueryThreadPool(Connection connection, String query) {
            System.out.println("(QueryThreadPool) query is: " + query);
            this.connection = connection;
            this.query = query;
            future = executor.submit(this);
        }

        /**
         * Executes the query; the connection is closed on every path.
         *
         * @return the query's result set, or {@code null} if there is none
         * @throws SQLException the original failure, also kept for
         *                      {@link #getSQLException()}
         */
        public ResultSet call() throws SQLException {
            try {
                // The statement is deliberately not closed: closing it would
                // also close the ResultSet that is read later.
                Statement statement = connection.createStatement();
                if (statement.execute(query)) {
                    resultSet = statement.getResultSet();
                }
            } catch (SQLException e) {
                sqlException = e;
            } finally {
                SQLException closeFailure = release(connection);
                if (sqlException == null) {
                    sqlException = closeFailure;
                }
            }
            if (sqlException != null) {
                throw sqlException;
            }
            return resultSet;
        }

        /** @return the exception raised while running the query, or {@code null} */
        public SQLException getSQLException() {
            return sqlException;
        }

        /** @return the query's result set, or {@code null} if there is none */
        public ResultSet getResultSet() {
            return resultSet;
        }

        /** @return {@code true} while the submitted task has not finished */
        public boolean isAlive() {
            return !future.isDone();
        }
    }
}


Listing 8: CountryList.java

package net.sourceforge.concurrentQuery.domain.model;

import net.sourceforge.concurrentQuery.domain.ResolvableFromConcurrentQuery;
import net.sourceforge.concurrentQuery.query.ConcurrentQueryFactoryBroker;
import net.sourceforge.concurrentQuery.query.ConcurrentQueryInterface;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.TreeMap;

/**
 * List of countries that have more than a given number of cities, keyed by
 * country code and sorted by it. Each country maps to a {@code DistrictList}.
 */
public class CountryList implements ResolvableFromConcurrentQuery {

    private ConcurrentQueryInterface cq = null;
    private Map<String, DistrictList> mapOfCountries = new TreeMap<String, DistrictList>();

    private CountryList() {}

    /**
     * Queries the country codes, then asks every country's district list to
     * fetch its district information.
     *
     * @param numberOfCities only countries with more cities than this are listed
     * @throws SQLException if adding or resolving a query fails
     */
    public CountryList(int numberOfCities) throws SQLException {

        this.cq = ConcurrentQueryFactoryBroker.getInstance().createConcurrentQuery();

        // numberOfCities is an int, so concatenating it cannot inject SQL.
        cq.addQuery("select countrycode from city "
                + "group by countrycode "
                + "having count(*) > " + numberOfCities, this);
        // Presumably runs the query and calls processResultSet(), which fills
        // mapOfCountries — confirm against ConcurrentQueryInterface.
        cq.resolve();

        for (Map.Entry<String, DistrictList> country : mapOfCountries.entrySet()) {
            country.getValue().fetchDistrictInfo(country.getKey());
        }

        // Presumably resolves the queries added by fetchDistrictInfo() — confirm.
        cq.resolve();
    }

    /**
     * Adds an empty district list for every country code in the result set.
     *
     * @param rs result set with a {@code countrycode} column; may be {@code null}
     * @return always {@code true}
     * @throws SQLException if reading from the result set fails
     */
    public boolean processResultSet(ResultSet rs) throws SQLException {

        if (rs != null) {
            while (rs.next()) {
                mapOfCountries.put(rs.getString("countrycode"), new DistrictList());
            }
        }

        return true;
    }

    /**
     * @return the map from country code to district list (the internal map,
     *         not a copy)
     */
    public Map<String, DistrictList> getMapOfCountries() {
        return mapOfCountries;
    }
}