12.3 使用ActiveMQ 源

12.3 使用ActiveMQ 流

12.3 ActiveMQ streams

12.3 使用ActiveMQ 流

 

ActiveMQ streams are an advanced feature that allows you to use an ActiveMQ client as

a Java IOStream. ActiveMQ will break an OutputStream into distinct chunks of data

and send each chunk through ActiveMQ as a JMS message. A corresponding ActiveMQ

JMS InputStream should be used on the consumer side to reassemble the data

chunks.

 

ActiveMQ流是一个高级特性,允许你在ActiveMQ客户端使用Java 的IOStream.ActiveMQ会将

一个OutputStream拆分成不同的数据块,然后将每一个数据库当成JMS消息并发送.在消息消费者

端需要使用一个相对应的ActiveMQ JMS InputStream来重新组装收到的数据块.

 

If you use a queue as the destination for the stream, using more than one consumer

on a queue (or an exclusive consumer) is fine because this feature uses message

groups. This causes messages with the same group ID to be pinned to a single

consumer. Using more than one producer in this scenario could cause problems with

the message order.

 

如果使用消息队列作为流的消息目的地,使用多个消息消费者(或者使用排他性消息消费者)比较好,因为

这个特性中用到了消息群组.这样会导致具有相同群组ID的消息会被发送到同一个消息消费者.这种场景

下使用多个消息生产者会因为消息顺序问题产生问题.

 

The benefit of using JMS streams is that ActiveMQ will break a stream into manageable

chunks and reassemble them for you at the consumer. So it’s possible to transfer

very large files using this functionality, as depicted in figure 12.4.

 

使用JMS 流的好处是ActiveMQ将一个流分割成可管理的数据块并且可以在消息消费者中重新组装

这些数据库.因此,使用ActiveMQ流可以传输非常大的文件,如图12.4所示:

 

To demonstrate using streams, here’s an example of reading a large file and writing

it out over ActiveMQ:

下面用一个读取一个大文件并使用ActiveMQ重新获取文件的示例来说明如何使用流:

 

//source of our large data

FileInputStream in = new FileInputStream("largetextfile.txt");

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);

Connection connection = (ActiveMQConnection) connectionFactory.createConnection();

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Queue destination = session.createQueue(QUEUE_NAME);

OutputStream out = connection.createOutputStream(destination);

 

//now write the file on to ActiveMQ

byte[] buffer = new byte[1024];

while(true)

{

int bytesRead = in.read(buffer);

if (bytesRead==-1)

{

break;

}

out.write(buffer,0,bytesRead);

}

 

//close the stream so the receiving side knows the steam is finished

out.close();

 

In the example, we create an ActiveMQConnection and create an OutputStream using

a queue as the destination. We read the file using a FileInputStream, then write the

FileInputStream onto the ActiveMQ OutputStream.

在上面的示例代码中,我们创建了一个ActiveMQConnection,并使用一个消息队列创建了一个

OutputStream.我们使用FileInputStream读取了一个文件,然后将FileInputStream写入到

ActiveMQ的OutputStream中.

 

Note that we close the ActiveMQ OutputStream when we’ve completed reading the

file. This is important so that the receiving side can determine whether the stream is

finished. It’s recommended that you use a new OutputStream for each file you send.

 

需要注意的是,我们在完成了读取文件后关闭了ActiveMQ的OutputStream.这点很重要,因为关闭

了OutputStream后,ActiveMQ流的接收端就可以决定当前接收的流是否结束.推荐你为每个要发

送的文件使用新的OutputStream.

 

For completeness, here’s the receiving end of an ActiveMQ stream:

为了完整起见,下面列出了ActiveMQ流的接收端代码:

 

//destination of our large data

FileOutputStream out = new FileOutputStream("copied.txt");

String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);

Connection connection = (ActiveMQConnection)

connectionFactory.createConnection();

connection.start();

Session session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

//we want to be an exclusive consumer

String exclusiveQueueName= QUEUE_NAME + "?consumer.exclusive=true";

Queue destination = session.createQueue(exclusiveQueueName);

InputStream in = connection.createInputStream(destination);

 

//now write the file from ActiveMQ

byte[] buffer = new byte[1024];

while(true)

{

int bytesRead = in.read(buffer);

if (bytesRead==-1)

{

break;

}

out.write(buffer,0,bytesRead);

}

out.close();

 

In the example, we create an ActiveMQConnection and from that create an Input-

Stream using a queue as a consumer. Note that we use an exclusive consumer by

appending "?consumer.exclusive=true" to the name of the queue. We do this to

ensure that only one consumer will be reading the stream at a time. We read the

ActiveMQ InputStream and then write it to a FileOutputStream to reassemble the

file on disk. Note that we expect the end of the file to be denoted by the end of the

stream (or -1).

 

在上面的示例代码中,我们创建了一个ActiveMQConnection,并再此基础上创建了一个

Input-Stream使用一个队列作为消费者. 需要注意的是,我们在队列名称后面加上了

"?consumer.exclusive=true"了创建一个排他性的消息消费者.这样做是为了确保同一

时刻只有一个消费者可以读取流.读取了ActiveMQ的InputStream后,将其写入到

FileOutputStream中以便重新组装流并写入到磁盘文件中.注意,我们希望通过流的末

尾(或者读取到-1)来确定文件的结尾.

 

You can use streams with topics too—though if a consumer for a topic starts partway

through the delivery of a stream, it won’t receive any data that was sent before it

was started.

 

你也可以在主题中使用流--但,如果消息消费者在流已经开始传送后的中途启动,则该消费者将不会

收到任何在它启动之前就已经发送过的任何数据.

 

ActiveMQ breaks the stream into manageable chunks of data and sends each

chunk of data as a separate message. This means that you have to be careful when

using them, because if the message consumer should fail partway through reading the

InputStream, there’s currently no way to replay the messages already consumed by

the failed message consumer.

 

ActiveMQ将流分成可管理的数据块然后将所有的数据块作为独立的消息发送.这就意味着你在使用

流时必须十分小心.因为如果消息消费者在读取InputStream的中途失效了,则是前期的消息消费者已经

读取的消费无法重现了.

 

ActiveMQ streams are useful for transferring large payloads, though you’ll need to

think about how an application using ActiveMQ streams should handle failure scenarios.

There’s an alternative and more robust method of sending large payloads: using

blob messages, which we cover in the next section.

 

ActiveMQ流对于传输大尺寸负载来说十分有用,尽管你需要仔细思考,以应对使用ActiveMQ流的应用程序的失效场景.

对于发送给大尺寸负载来说还有一种更加健壮的替代方法,即使用二进制消息,我们将在下一节中介绍.