Managing Session and Connection Resources in ActiveMQ
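In ActiveMQ, Session and Connection objects are important resources. Databases protect an expensive resource such as a Connection with a connection pool. ActiveMQ likewise offers an optional component, activemq-pool, that manages Session and Connection objects on the same principle: an object pool.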
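To make the object-pool principle concrete, here is a minimal sketch in plain Java. It is not the activemq-pool implementation; the class name `SimpleObjectPool` and its methods are hypothetical and chosen only for illustration. The idea is that a borrowed object is handed back after use and reused by the next caller, with a cap on how many objects may be in use at once.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/** Minimal, illustrative object pool (hypothetical; not the activemq-pool code). */
class SimpleObjectPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final int maxActive;
    private int active;

    SimpleObjectPool(Supplier<T> factory, int maxActive) {
        this.factory = factory;
        this.maxActive = maxActive;
    }

    /** Reuses an idle object if one exists, otherwise creates one while under the cap. */
    synchronized T borrow() {
        if (active >= maxActive) {
            throw new IllegalStateException("pool exhausted: " + maxActive + " objects in use");
        }
        T obj = idle.isEmpty() ? factory.get() : idle.pop();
        active++;
        return obj;
    }

    /** Hands the object back so that a later borrow() can reuse it. */
    synchronized void giveBack(T obj) {
        active--;
        idle.push(obj);
    }

    synchronized int idleCount() {
        return idle.size();
    }
}

class SimpleObjectPoolDemo {
    public static void main(String[] args) {
        SimpleObjectPool<StringBuilder> pool = new SimpleObjectPool<>(StringBuilder::new, 2);
        StringBuilder first = pool.borrow();
        pool.giveBack(first);
        StringBuilder second = pool.borrow();
        // The returned object is reused rather than recreated.
        System.out.println(first == second); // prints true
    }
}
```

A real pool would typically also validate objects on borrow and evict stale ones; this sketch leaves that out to keep the borrow/return cycle visible.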
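In activemq-pool the resource manager class is ActiveMQResourceManager, while PooledConnectionFactory defines the basic pool-management methods. PooledConnectionFactory declares the following fields and defaults: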
    private ConnectionFactory connectionFactory;
    private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
    private ObjectPoolFactory poolFactory;
    private int maximumActive = 500;
    private int maxConnections = 1;
    private int idleTimeout = 30 * 1000;
    private AtomicBoolean stopped = new AtomicBoolean(false);
    private long expiryTimeout = 0l;

    /* Sets the maximum number of active sessions per connection */
    public void setMaximumActive(int maximumActive) {
        this.maximumActive = maximumActive;
    }

    /**
     * @return the maxConnections
     */
    public int getMaxConnections() {
        return maxConnections;
    }

    /**
     * @param maxConnections the maxConnections to set
     */
    public void setMaxConnections(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    protected ObjectPoolFactory createPoolFactory() {
        return new GenericObjectPoolFactory(null, maximumActive);
    }

    public int getIdleTimeout() {
        return idleTimeout;
    }

    public void setIdleTimeout(int idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /**
     * allow connections to expire, irrespective of load or idle time. This is useful with failover
     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
     *
     * @param expiryTimeout non zero in milliseconds
     */
    public void setExpiryTimeout(long expiryTimeout) {
        this.expiryTimeout = expiryTimeout;
    }

    public long getExpiryTimeout() {
        return expiryTimeout;
    }
The default maximum number of connections (maxConnections) is 1.
By default, each Connection may have up to 500 active sessions (maximumActive).
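The listing also carries two timeouts: idleTimeout (30 seconds by default) and expiryTimeout (0 by default, with a Javadoc saying it lets connections expire irrespective of load or idle time). The sketch below shows one plausible way such a pair of timeouts could be combined. It is an assumption made for illustration, not the logic activemq-pool actually uses; `DemoExpiryPolicy` and its parameters are hypothetical names.

```java
/** Illustrative expiry rule for a pooled connection (hypothetical; not the activemq-pool code). */
class DemoExpiryPolicy {
    private final long idleTimeoutMillis;   // assumption: 0 disables idle eviction
    private final long expiryTimeoutMillis; // 0 disables hard expiry, in line with the "non zero" Javadoc

    DemoExpiryPolicy(long idleTimeoutMillis, long expiryTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.expiryTimeoutMillis = expiryTimeoutMillis;
    }

    /**
     * @param createdAt      time the connection was created, in milliseconds
     * @param lastUsedAt     time the connection was last used, in milliseconds
     * @param activeSessions number of sessions currently borrowed from the connection
     * @param now            current time, in milliseconds
     */
    boolean isExpired(long createdAt, long lastUsedAt, int activeSessions, long now) {
        // Hard expiry ignores load and idle time entirely.
        boolean hardExpired = expiryTimeoutMillis > 0
                && now - createdAt > expiryTimeoutMillis;
        // Idle expiry applies only when nothing is using the connection.
        boolean idleExpired = idleTimeoutMillis > 0
                && activeSessions == 0
                && now - lastUsedAt > idleTimeoutMillis;
        return hardExpired || idleExpired;
    }
}
```

Passing `now` in as a parameter keeps the rule deterministic and easy to check. With the defaults from the listing (30000 and 0), only the idle rule can fire under this sketch.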
The PooledConnectionFactory implementations provided out of the box are:
AmqJNDIPooledConnectionFactory
JcaPooledConnectionFactory
XaPooledConnectionFactory
PooledConnectionFactory
Sessions are pooled by the following class:
public class SessionPool implements PoolableObjectFactory
A Session is obtained from the ConnectionPool with the following code:
    public Session createSession(boolean transacted, int ackMode) throws JMSException {
        SessionKey key = new SessionKey(transacted, ackMode);
        SessionPool pool = cache.get(key);
        if (pool == null) {
            pool = createSessionPool(key);
            cache.put(key, pool);
        }
        PooledSession session = pool.borrowSession();
        return session;
    }
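The method above keeps one SessionPool per SessionKey, that is, per combination of transacted and ackMode, and creates the pool lazily on first use. The same pattern can be sketched with the standard library alone. All `Demo*` names below are hypothetical stand-ins rather than the activemq-pool classes, and the `release` method is an assumption about how a session would be returned.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for SessionKey: identifies a pool by (transacted, ackMode). */
final class DemoSessionKey {
    final boolean transacted;
    final int ackMode;

    DemoSessionKey(boolean transacted, int ackMode) {
        this.transacted = transacted;
        this.ackMode = ackMode;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof DemoSessionKey)) {
            return false;
        }
        DemoSessionKey other = (DemoSessionKey) o;
        return transacted == other.transacted && ackMode == other.ackMode;
    }

    @Override
    public int hashCode() {
        return Objects.hash(transacted, ackMode);
    }
}

/** Hypothetical stand-in for a pooled session; it only remembers its key. */
final class DemoSession {
    final DemoSessionKey key;

    DemoSession(DemoSessionKey key) {
        this.key = key;
    }
}

/** Keeps one queue of idle sessions per key, created lazily on first use. */
class DemoConnectionPool {
    private final Map<DemoSessionKey, Deque<DemoSession>> cache = new HashMap<>();

    synchronized DemoSession createSession(boolean transacted, int ackMode) {
        DemoSessionKey key = new DemoSessionKey(transacted, ackMode);
        Deque<DemoSession> pool = cache.computeIfAbsent(key, k -> new ArrayDeque<>());
        DemoSession idle = pool.poll();
        // Reuse an idle session for this key if there is one, otherwise create a new one.
        return idle != null ? idle : new DemoSession(key);
    }

    /** Returns a session to the pool that matches its key. */
    synchronized void release(DemoSession session) {
        cache.computeIfAbsent(session.key, k -> new ArrayDeque<>()).push(session);
    }

    synchronized int poolCount() {
        return cache.size();
    }
}
```

Keying the pools this way means a session created with one acknowledgement mode is never handed to a caller that asked for another.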
A usage example:
    package easyway.app.activemq.demo.acknow;

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;

    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.pool.PooledConnection;
    import org.apache.activemq.pool.PooledConnectionFactory;
    import org.apache.activemq.transport.TransportListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * Using the connection pool in ActiveMQ
     *
     * @author longgangbai
     */
    public class ActiveMQConnectionPool {
        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionPool.class);
        private BrokerService broker;
        private ActiveMQConnectionFactory factory;
        private PooledConnectionFactory pooledFactory;

        public void testEviction() throws Exception {
            // Start an embedded, non-persistent broker
            broker = new BrokerService();
            broker.setPersistent(false);
            broker.addConnector("tcp://localhost:61619");
            broker.start();

            // Wrap the plain connection factory in a pooled one
            factory = new ActiveMQConnectionFactory("tcp://localhost:61619?closeAsync=false");
            pooledFactory = new PooledConnectionFactory(factory);

            PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
            ActiveMQConnection amqC = connection.getConnection();
            final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
            amqC.addTransportListener(new TransportListener() {
                public void onCommand(Object command) {
                }

                public void onException(IOException error) {
                    // we know connection is dead...
                    // listeners are fired async
                    gotExceptionEvent.countDown();
                }

                public void transportInterupted() {
                }

                public void transportResumed() {
                }
            });

            sendMessage(connection);
            Connection connection2 = pooledFactory.createConnection();
            sendMessage(connection2);
        }

        /**
         * Sends a message
         *
         * @param connection
         * @throws JMSException
         */
        private void sendMessage(Connection connection) throws JMSException {
            // Obtain a session from the connection
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(new ActiveMQQueue("FOO"));
            producer.send(session.createTextMessage("Test"));
            session.close();
        }

        public static void main(String[] args) throws Exception {
            ActiveMQConnectionPool test = new ActiveMQConnectionPool();
            test.testEviction();
        }
    }
The Javadoc of ActiveMQResourceManager shows how it is configured:
    /**
     * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
     * in a way that will allow the transaction manager to correctly recover XA transactions.
     *
     * For example, it can be used the following way:
     * <pre>
     *   <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     *      <property name="brokerURL" value="tcp://localhost:61616" />
     *   </bean>
     *
     *   <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
     *       <property name="maxConnections" value="8" />
     *       <property name="transactionManager" ref="transactionManager" />
     *       <property name="connectionFactory" ref="activemqConnectionFactory" />
     *       <property name="resourceName" value="activemq.broker" />
     *   </bean>
     *
     *   <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
     *         <property name="transactionManager" ref="transactionManager" />
     *         <property name="connectionFactory" ref="activemqConnectionFactory" />
     *         <property name="resourceName" value="activemq.broker" />
     *   </bean>
     * </pre>
     */