手指上的代码-之ACTIVEMQ(2)

指尖上的代码--之ACTIVEMQ(2)

以下重点讲一下Transport transport = createTransport();创建Transport的过程。

protected Transport createTransport() throws JMSException {

    try {

        return TransportFactory.connect(brokerURL, DEFAULT_CONNECTION_EXECUTOR);

    } catch (Exception e) {

        throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);

    }

}

 

public static Transport connect(URI location, Executor ex) throws Exception {

    TransportFactory tf = findTransportFactory(location);

    return tf.doConnect(location, ex);

}

这里使用了一个典型的设计模式:工厂方法。

TransportFactory tf = findTransportFactory(location);通过FactoryFinder的内部类StandaloneObjectFactory,找到"META-INF/services/org/apache/activemq/transport/"目录下的具体location.getScheme()对应的配置文件,并实例化对应的工厂类。

然后tf.doConnect(location, ex);通过实例化的具体工厂类获取transport

 

不同的底层通讯需要不同的Transport,那就需要不同TransportFactory。在这里我们只需要写一个工厂类继承自TransportFactory,然后在"META-INF/services/org/apache/activemq/transport/"目录下放个配置文件,指向该工厂类就OK了。这种方式还是很值得我们学习的,使用配置的方式新增,而不是在TansportFactory代码中去添加,更好的遵守了OCP原则。

 

回头再过来看一下类图结构,就会发现另外三种设计模式:Decorator(装饰)、Compositor(合成)和Observer(观察者)模式。

 


手指上的代码-之ACTIVEMQ(2)

 

这里的Decorator模式和java.io包中InputStreamOutputStreamDecorator如出一辙,连命名都差不多。

InputStream  --   Transport

FilterInputStream  --  FilterTransport

BufferedInputStream – TcpTransport

只是这里在Decorator模式的基础上增加了CompositeObserver模式的运用,使系统的扩展性更好。

回到例子中,继续挖掘Transport的创建过程。

根据schemefailover,找到了FailoverTransportFactory工厂类,调用doConnect方法:

public Transport doConnect(URI location) throws IOException {

    try {

        Transport transport = createTransport(URISupport.parseComposite(location));

        transport = new MutexTransport(transport);

        transport = new ResponseCorrelator(transport);

        return transport;

    } catch (URISyntaxException e) {

        throw new IOException("Invalid location: " + location);

    }

}

 

public Transport createTransport(CompositeData compositData) throws IOException {

    Map options = compositData.getParameters();

    FailoverTransport transport = createTransport(options);

    if (!options.isEmpty()) {

        throw new IllegalArgumentException("Invalid connect parameters: " + options);

    }

    transport.add(false,compositData.getComponents());

    return transport;

}

 

public FailoverTransport createTransport(Map parameters) throws IOException {

    FailoverTransport transport = new FailoverTransport();

    IntrospectionSupport.setProperties(transport, parameters);

    return transport;

}

为了配合合成模式的使用,url也使用了合成的方式,如例子中的failover://tcp://localhost:61616就是failovertcp的合成。可以看到这里的CompositTransport中合成的不是Transport而是url,与经典的合成模式还是有稍许差别。但这并影响它是合成模式的事实,因为合成的url的最终目的还是用于生成对应的Transport 。那么,为什么接口CompositeTransportaddremove都是数组形式uris?

public interface CompositeTransport extends Transport {

    void add(boolean rebalance,URI[] uris);

    void remove(boolean rebalance,URI[] uris);

}

    另外可以发现TransportFilter继承了TransportListener并且合成了一个TransportListener。为什么装饰对象既是Transport又带上了TransportListener的功能,而且装饰对象中又包含了一个TransportListener

public class TransportFilter implements TransportListener, Transport {

    protected final Transport next;

    protected TransportListener transportListener;

 

    public TransportFilter(Transport next) {

        this.next = next;

    }

 

    public TransportListener getTransportListener() {

        return transportListener;

    }

 

    public void setTransportListener(TransportListener channelListener)     {

        this.transportListener = channelListener;

        if (channelListener == null) {

            next.setTransportListener(null);

        } else {

            next.setTransportListener(this);

        }

}

……

}

带着这些问题,下面具体分析FailoverTransport