一个通用并发对象池的实现

Published: 28 Feb 2014 Category: Java

这篇文章里我们主要讨论下如何在Java里实现一个对象池。最近几年,Java虚拟机的性能在各方面都得到了极大的提升,因此对大多数对象而言,已经没有必要通过对象池来提高性能了。根本的原因是,创建一个新的对象的开销已经不像过去那样昂贵了。

然而,还是有些对象,它们的创建开销是非常大的。比如线程,数据库连接等这些非轻量级的对象,创建这些对象还是会略有些昂贵。在任何一个应用程序里面,上述这些对象,我们肯定会用到不止一个。如果有一种很方便的创建管理这些对象的池,使得这些对象能够动态的重用,而客户端代码也不用关心它们的生命周期,还是会很给力的。

在真正开始写代码前,我们先来梳理下一个对象池需要完成哪些功能。

如果有可用的对象,对象池应当能返回给客户端。 客户端把对象放回池里后,可以对这些对象进行重用。 对象池能够创建新的对象来满足客户端不断增长的需求。 需要有一个正确关闭池的机制来确保关闭后不会发生内存泄露。

不用说了,上面几点就是我们要暴露给客户端的连接池的接口的基本功能。 我们的声明的接口如下:

package com.test.pool;
 
 
/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param < T > the type of object to pool.
 */
public interface Pool< T >
{
 /**
  * Returns an instance from the pool.
  * The call may be a blocking one or a non-blocking one
  * and that is determined by the internal implementation.
  *
  * If the call is a blocking call,
  * the call returns immediately with a valid object
  * if available, else the thread is made to wait
  * until an object becomes available.
  * In case of a blocking call,
  * it is advised that clients react
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  *
  * If the call is a non-blocking one,
  * the call returns immediately irrespective of
  * whether an object is available or not.
  * If any object is available the call returns it
  * else the call returns < code >null< /code >.
  *
  * The validity of the objects are determined using the
  * {@link Validator} interface, such that
  * an object < code >o< /code > is valid if
  * < code > Validator.isValid(o) == true < /code >.
  *
  * @return T one of the pooled objects.
  */
 T get();
 
 /**
  * Releases the object and puts it back to the pool.
  *
  * The mechanism of putting the object back to the pool is
  * generally asynchronous,
  * however future implementations might differ.
  *
  * @param t the object to return to the pool
  */
  
 void release(T t);
 
 /**
  * Shuts down the pool. In essence this call will not
  * accept any more requests
  * and will release all resources.
  * Releasing resources are done
  * via the < code >invalidate()< /code >
  * method of the {@link Validator} interface.
  */
 
 void shutdown();
}

为了能够支持任意对象,上面这个接口故意设计得很简单通用。它提供了从池里获取/返回对象的方法,还有一个关闭池的机制,以便释放对象。

现在我们来实现一下这个接口。开始动手之前,值得一提的是,一个理想的release方法应该先尝试检查下这个客户端返回的对象是否还能重复使用。如果是的话再把它扔回池里,如果不是,就舍弃掉这个对象。我们希望这个Pool接口的所有实现都能遵循这个规则。在开始具体的实现类前,我们先创建一个抽象类,以便限制后续的实现能遵循这点。我们实现的抽象类就叫做AbstractPool,它的定义如下:

package com.test.pool;
 
/**
 * Represents an abstract pool, that defines the procedure
 * of returning an object to the pool.
 *
 * @author Swaranga
 *
 * @param < T > the type of pooled objects.
 */
abstract class AbstractPool < T > implements Pool < T >
{
 /**
  * Returns the object to the pool.
  * The method first validates the object if it is
  * re-usable and then puts returns it to the pool.
  *
  * If the object validation fails,
  * some implementations
  * will try to create a new one
  * and put it into the pool; however
  * this behaviour is subject to change
  * from implementation to implementation
  *
  */
 @Override
 public final void release(T t)
 {
  if(isValid(t))
  {
   returnToPool(t);
  }
  else
  {
   handleInvalidReturn(t);
  }
 }
 
 protected abstract void handleInvalidReturn(T t);
 
 protected abstract void returnToPool(T t);
 
 protected abstract boolean isValid(T t);
}

在上面这个类里,我们让对象池必须得先验证对象后才能把它放回到池里。具体的实现可以自由选择如何实现这三种方法,以便定制自己的行为。它们根据自己的逻辑来决定如何判断一个对象有效,无效的话应该怎么处理(handleInvalidReturn方法),怎么把一个有效的对象放回到池里(returnToPool方法)。

有了上面这几个类,我们就可以着手开始具体的实现了。不过还有个问题,由于上面这些类是设计成能支持通用的对象池的,因此具体的实现不知道该如何验证对象的有效性(因为对象都是泛型的)。因此我们还需要些别的东西来帮助我们完成这个。

我们需要一个通用的方法来完成对象的校验,而具体的实现不必关心对象是何种类型。因此我们引入了一个新的接口,Validator,它定义了验证对象的方法。这个接口的定义如下:

package com.test.pool;
 
 /**
  * Represents the functionality to
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  *
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public static interface Validator < T >
 {
  /**
   * Checks whether the object is valid.
   *
   * @param t the object to check.
   *
   * @return <code>true</code>
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);
  
  /**
   * Performs any cleanup activities
   * before discarding the object.
   * For example before discarding
   * database connection objects,
   * the pool will want to close the connections.
   * This is done via the
   * <code>invalidate()</code> method.
   *
   * @param t the object to cleanup
   */
  
  public void invalidate(T t);
 }

上面这个接口定义了一个检验对象的方法,以及一个把对象置为无效的方法。当准备废弃一个对象并清理内存的时候,invalidate方法就派上用场了。值得注意的是这个接口本身没有任何意义,只有当它在对象池里使用的时候才有意义,所以我们把这个接口定义到Pool接口里面。这和Java集合库里的Map和Map.Entry是一样的。所以我们的Pool接口就成了这样:

package com.test.pool;
 
 
/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param < T > the type of object to pool.
 */
public interface Pool< T >
{
 /**
  * Returns an instance from the pool.
  * The call may be a blocking one or a non-blocking one
  * and that is determined by the internal implementation.
  *
  * If the call is a blocking call,
  * the call returns immediately with a valid object
  * if available, else the thread is made to wait
  * until an object becomes available.
  * In case of a blocking call,
  * it is advised that clients react
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  *
  * If the call is a non-blocking one,
  * the call returns immediately irrespective of
  * whether an object is available or not.
  * If any object is available the call returns it
  * else the call returns < code >null< /code >.
  *
  * The validity of the objects are determined using the
  * {@link Validator} interface, such that
  * an object < code >o< /code > is valid if
  * < code > Validator.isValid(o) == true < /code >.
  *
  * @return T one of the pooled objects.
  */
 T get();
 
 /**
  * Releases the object and puts it back to the pool.
  *
  * The mechanism of putting the object back to the pool is
  * generally asynchronous,
  * however future implementations might differ.
  *
  * @param t the object to return to the pool
  */
  
 void release(T t);
 
 /**
  * Shuts down the pool. In essence this call will not
  * accept any more requests
  * and will release all resources.
  * Releasing resources are done
  * via the < code >invalidate()< /code >
  * method of the {@link Validator} interface.
  */
 
 void shutdown();
 
 /**
  * Represents the functionality to
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  *
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public static interface Validator < T >
 {
  /**
   * Checks whether the object is valid.
   *
   * @param t the object to check.
   *
   * @return <code>true</code>
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);
  
  /**
   * Performs any cleanup activities
   * before discarding the object.
   * For example before discarding
   * database connection objects,
   * the pool will want to close the connections.
   * This is done via the
   * <code>invalidate()</code> method.
   *
   * @param t the object to cleanup
   */
  
  public void invalidate(T t);
 }
}

准备工作已经差不多了,在最后开始前我们还需要一个终极武器,这才是这个对象池的杀手锏。就是“能够创建新的对象”。我们的对象池是泛型的,因此它们得知道如何去生成新的对象来填充这个池子。这个功能不能依赖于对象池本身,必须要有一个通用的方式来创建新的对象。通过一个ObjectFactory的接口就能完成这个,它只有一个方法,就是“如何创建新的对象”。我们的ObjectFactory接口如下

package com.test.pool;
 
/**
 * Represents the mechanism to create
 * new objects to be used in an object pool.
 *
 * @author Swaranga
 *
 * @param < T > the type of object to create.
 */
public interface ObjectFactory < T >
{
 /**
  * Returns a new instance of an object of type T.
  *
  * @return T an new instance of the object of type T
  */
 public abstract T createNew();
}

我们的工具类都已经搞定了,现在可以开始真正实现我们的Pool接口了。因为我们希望这个池能在并发程序里面使用,所以我们会创建一个阻塞的对象池,当没有对象可用的时候,让客户端先阻塞住。我们的阻塞机制是让客户端一直阻塞直到有对象可用为止。这种实现方式导致我们需要再增加一个只阻塞一定时间的方法,如果在超时时间到来前有对象可用则返回,如果超时了就返回null而不是一直等待下去。这样的实现有点类似Java并发库里的LinkedBlockingQueue,因此真正实现前我们再暴露一个接口,BlockingPool,类似于Java并发库里的BlockingQueue接口。

这里是BlockingQueue的声明:

package com.test.pool;
 
import java.util.concurrent.TimeUnit;
 
/**
 * Represents a pool of objects that makes the
 * requesting threads wait if no object is available.
 *
 * @author Swaranga
 *
 * @param < T > the type of objects to pool.
 */
public interface BlockingPool < T > extends Pool < T >
{
 /**
  * Returns an instance of type T from the pool.
  *
  * The call is a blocking call,
  * and client threads are made to wait
  * indefinitely until an object is available.
  * The call implements a fairness algorithm
  * that ensures that a FCFS service is implemented.
  *
  * Clients are advised to react to InterruptedException.
  * If the thread is interrupted while waiting
  * for an object to become available,
  * the current implementations
  * sets the interrupted state of the thread
  * to <code>true</code> and returns null.
  * However this is subject to change
  * from implementation to implementation.
  *
  * @return T an instance of the Object
  * of type T from the pool.
  */
 T get();
 
 /**
  * Returns an instance of type T from the pool,
  * waiting up to the
  * specified wait time if necessary
  * for an object to become available..
  *
  * The call is a blocking call,
  * and client threads are made to wait
  * for time until an object is available
  * or until the timeout occurs.
  * The call implements a fairness algorithm
  * that ensures that a FCFS service is implemented.
  *
  * Clients are advised to react to InterruptedException.
  * If the thread is interrupted while waiting
  * for an object to become available,
  * the current implementations
  * set the interrupted state of the thread
  * to <code>true</code> and returns null.
  * However this is subject to change
  * from implementation to implementation.
  *
  *
  * @param time amount of time to wait before giving up,
  *   in units of <tt>unit</tt>
  * @param unit a <tt>TimeUnit</tt> determining
  *   how to interpret the
  *        <tt>timeout</tt> parameter
  *      
  * @return T an instance of the Object
  * of type T from the pool.
  *      
  * @throws InterruptedException
  * if interrupted while waiting
  */
 
 T get(long time, TimeUnit unit) throws InterruptedException;
}

BoundedBlockingPool的实现如下:

package com.test.pool;
 
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
 
public final class BoundedBlockingPool < T >
 extends AbstractPool < T >
 implements BlockingPool < T >
{
 private int size;
  
 private BlockingQueue < T > objects;
  
 private Validator < T > validator;
 private ObjectFactory < T > objectFactory;
  
 private ExecutorService executor =
  Executors.newCachedThreadPool();
   
 private volatile boolean shutdownCalled;
  
 public BoundedBlockingPool(
   int size,
   Validator < T > validator,
   ObjectFactory < T > objectFactory)
 {
  super();
   
  this.objectFactory = objectFactory;
  this.size = size;
  this.validator = validator;
   
  objects = new LinkedBlockingQueue < T >(size);
   
  initializeObjects();
   
  shutdownCalled = false;
 }
  
 public T get(long timeOut, TimeUnit unit)
 {
  if(!shutdownCalled)
  {
   T t = null;
    
   try
   {
    t = objects.poll(timeOut, unit);
     
    return t;
   }
   catch(InterruptedException ie)
   {
    Thread.currentThread().interrupt();
   }
    
   return t;
  }
   
  throw new IllegalStateException(
   'Object pool is already shutdown');
 }
  
 public T get()
 {
  if(!shutdownCalled)
  {
   T t = null;
    
   try
   {
    t = objects.take();
   }
   catch(InterruptedException ie)
   {
    Thread.currentThread().interrupt();
   }
    
   return t;
  }
   
  throw new IllegalStateException(
   'Object pool is already shutdown');
 }
  
 public void shutdown()
 {
  shutdownCalled = true;
   
  executor.shutdownNow();
   
  clearResources();
 }
  
 private void clearResources()
 {
  for(T t : objects)
  {
   validator.invalidate(t);
  }
 }
  
 @Override
 protected void returnToPool(T t)
 {
  if(validator.isValid(t))
  {
   executor.submit(new ObjectReturner(objects, t));
  }
 }
  
 @Override
 protected void handleInvalidReturn(T t)
 {
   
 }
  
 @Override
 protected boolean isValid(T t)
 {
  return validator.isValid(t);
 }
  
 private void initializeObjects()
 {
  for(int i = 0; i < size; i++)
  {
   objects.add(objectFactory.createNew());
  }
 }
  
 private class ObjectReturner < E >
            implements Callable < Void >
 {
  private BlockingQueue < E > queue;
  private E e;
   
  public ObjectReturner(BlockingQueue < E > queue, E e)
  {
   this.queue = queue;
   this.e = e;
  }
   
  public Void call()
  {
   while(true)
   {
    try
    {
     queue.put(e);
     break;
    }
    catch(InterruptedException ie)
    {
     Thread.currentThread().interrupt();
    }
   }
    
   return null;
  }
 }
}

上面是一个非常基本的对象池,它内部是基于一个LinkedBlockingQueue来实现的。这里唯一比较有意思的方法就是returnToPool。因为内部的存储是一个LinkedBlockingQueue实现的,如果我们直接把返回的对象扔进去的话,如果队列已满可能会阻塞住客户端。不过我们不希望客户端因为把对象放回池里这么个普通的方法就阻塞住了。所以我们把最终将对象插入到队列里的任务作为一个异步的的任务提交给一个Executor来执行,以便让客户端线程能立即返回。

现在我们将在自己的代码中使用上面这个对象池,用它来缓存数据库连接。我们需要一个校验器来验证数据库连接是否有效。

下面是这个JDBCConnectionValidator:

package com.test;
 
import java.sql.Connection;
import java.sql.SQLException;
 
import com.test.pool.Pool.Validator;
 
public final class JDBCConnectionValidator
    implements Validator < Connection >
{
 public boolean isValid(Connection con)
 {
  if(con == null)
  {
   return false;
  }
  
  try
  {
   return !con.isClosed();
  }
  catch(SQLException se)
  {
   return false;
  }
 }
 
 public void invalidate(Connection con)
 {
  try
  {
   con.close();
  }
  catch(SQLException se)
  {
   
  }
 }
}

还有一个JDBCObjectFactory,它将用来生成新的数据库连接对象:

package com.test;
 
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
 
import com.test.pool.ObjectFactory;
 
public class JDBCConnectionFactory
 implements ObjectFactory < Connection >
{
 private String connectionURL;
 private String userName;
 private String password;
  
 public JDBCConnectionFactory(
  String driver,
  String connectionURL,
  String userName,
  String password)
        {
         super();
         
         try
         {
          Class.forName(driver);
         }
         catch(ClassNotFoundException ce)
         {
          throw new IllegalArgumentException(
           'Unable to find driver in classpath', ce);
         }
         
         this.connectionURL = connectionURL;
         this.userName = userName;
         this.password = password;
        }
 
 public Connection createNew()
 {
  try
  {
   return
       DriverManager.getConnection(
    connectionURL,
    userName,
    password);
  }
  catch(SQLException se)
  {
   throw new IllegalArgumentException(
    'Unable to create new connection', se);
  }
 }
}

现在我们用上述的Validator和ObjectFactory来创建一个JDBC的连接池:

package com.test;
import java.sql.Connection;
 
import com.test.pool.Pool;
import com.test.pool.PoolFactory;
 
 
public class Main
{
 public static void main(String[] args)
    {
  Pool < Connection > pool =
   new BoundedBlockingPool < Connection > (
    10,
    new JDBCConnectionValidator(),
    new JDBCConnectionFactory('', '', '', '')
    );
  
  //do whatever you like
    }
}

为了犒劳下能读完整篇文章的读者,我这再提供另一个非阻塞的对象池的实现,这个实现和前面的唯一不同就是即使对象不可用,它也不会让客户端阻塞,而是直接返回null。具体的实现在这:

package com.test.pool;
 
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;
 
public class BoundedPool < T >
 extends AbstractPool < T >
{
 private int size;
 
 private Queue < T > objects;
 
 private Validator < T > validator;
 private ObjectFactory < T > objectFactory;
 
 private Semaphore permits;
  
 private volatile boolean shutdownCalled;
 
 public BoundedPool(
  int size,
  Validator < T > validator,
  ObjectFactory < T > objectFactory)
 {
  super();
  
  this.objectFactory = objectFactory;
  this.size = size;
  this.validator = validator;
  
  objects = new LinkedList < T >();
  
  initializeObjects();
  
  shutdownCalled = false;
 }
 
 
 @Override
 public T get()
 {
  T t = null;
  
  if(!shutdownCalled)
  {
   if(permits.tryAcquire())
   {
    t = objects.poll();
   }
  }
  else
  {
   throw new IllegalStateException(
    'Object pool already shutdown');
  }
  
  return t;
 }
 
 @Override
 public void shutdown()
 {
  shutdownCalled = true;
  
  clearResources();
 }
 
 private void clearResources()
 {
  for(T t : objects)
  {
   validator.invalidate(t);
  }
 }
 
 @Override
 protected void returnToPool(T t)
 {
  boolean added = objects.add(t);
  
  if(added)
  {
   permits.release();
  }
 }
 
 @Override
 protected void handleInvalidReturn(T t)
 {
  
 }
 
 @Override
 protected boolean isValid(T t)
 {
  return validator.isValid(t);
 }
 
 private void initializeObjects()
 {
  for(int i = 0; i < size; i++)
  {
   objects.add(objectFactory.createNew());
  }
 }
}

考虑到我们现在已经有两种实现,非常威武了,得让用户通过工厂用具体的名称来创建不同的对象池了。工厂来了:

package com.test.pool;
 
import com.test.pool.Pool.Validator;
 
/**
 * Factory and utility methods for
 * {@link Pool} and {@link BlockingPool} classes
 * defined in this package.
 * This class supports the following kinds of methods:
 *
 *
                      <ul>
                       
 *
<li> Method that creates and returns a default non-blocking
 *        implementation of the {@link Pool} interface.
 *   </li>
* 
 *
<li> Method that creates and returns a
 *        default implementation of
 *        the {@link BlockingPool} interface.
 *   </li>
*
                      </ul>
*
 * @author Swaranga
 */
 
public final class PoolFactory
{
 private PoolFactory()
 {
  
 }
 
 /**
  * Creates a and returns a new object pool,
  * that is an implementation of the {@link BlockingPool},
  * whose size is limited by
  * the <tt> size </tt> parameter.
  *
  * @param size the number of objects in the pool.
  * @param factory the factory to create new objects.
  * @param validator the validator to
  * validate the re-usability of returned objects.
  *
  * @return a blocking object pool
  * bounded by <tt> size </tt>
  */
 public static < T > Pool < T >
  newBoundedBlockingPool(
      int size,
      ObjectFactory < T > factory,
      Validator < T > validator)
 {
  return new BoundedBlockingPool < T > (
                                    size,
                                    validator,
                                    factory);
 }
 
 /**
  * Creates a and returns a new object pool,
  * that is an implementation of the {@link Pool}
  * whose size is limited
  * by the <tt> size </tt> parameter.
  *
  * @param size the number of objects in the pool.
  * @param factory the factory to create new objects.
  * @param validator the validator to validate
  * the re-usability of returned objects.
  *
  * @return an object pool bounded by <tt> size </tt>
  */
 
 public static < T > Pool < T > newBoundedNonBlockingPool(
  int size,
  ObjectFactory < T > factory,
  Validator < T > validator)
 {
  return new BoundedPool < T >(size, validator, factory);
 }
}

现在我们的客户端就能用一种可读性更强的方式来创建对象池了:

package com.test;
import java.sql.Connection;
 
import com.test.pool.Pool;
import com.test.pool.PoolFactory;
 
 
public class Main
{
 public static void main(String[] args)
    {
  Pool < Connection > pool =
   PoolFactory.newBoundedBlockingPool(
    10,
    new JDBCConnectionFactory('', '', '', ''),
    new JDBCConnectionValidator());
  
  //do whatever you like
    }
}

好吧,终于写完了,拖了这么久了。尽情使用和完善它吧,或者再多加几种实现。

快乐编码,快乐分享!

原创文章转载请注明出处:一个通用并发对象池的实现

英文原文链接