How does pubsub know how many messages have been published at a given point in time?
Here is the code for publishing the messages:
import { PubSub } from '@google-cloud/pubsub';

// PUBSUB_PROJECT_ID is assumed to be defined elsewhere.
async function publishMessage(topicName: string) {
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  // Batch up to 10 messages, or flush after 10 seconds, whichever comes first.
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });
  const n = 5;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }
  // Each publish call resolves with the message ID once the batch has been sent.
  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}
As you can see, I am going to publish 5 messages. The publish calls are made inside await Promise.all(...). From the user's point of view, the messages are sent at that moment, but inside the pubsub library that may not be the case. I set maxMessages to 10, so with only 5 messages queued, pubsub waits 10 seconds (maxMilliseconds) and then publishes these 5 messages.
The execution result meets my expectations:
[2020-05-05T09:53:32.078Z] publishing messages
[2020-05-05T09:53:42.209Z] Message 36854 published. index: 0
[2020-05-05T09:53:42.209Z] Message 36855 published. index: 1
[2020-05-05T09:53:42.209Z] Message 36856 published. index: 2
[2020-05-05T09:53:42.209Z] Message 36857 published. index: 3
[2020-05-05T09:53:42.209Z] Message 36858 published. index: 4
results: 36854,36855,36856,36857,36858
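As a quick check on the log above, the gap between the first line and the publish confirmations can be computed from the two timestamps. This is only a small sketch using the values copied from the output:

```typescript
// Timestamps copied from the log output above.
const started = new Date('2020-05-05T09:53:32.078Z');
const published = new Date('2020-05-05T09:53:42.209Z');

// Milliseconds between the start of publishing and the confirmations.
const elapsedMs = published.getTime() - started.getTime();
console.log(`elapsed: ${elapsedMs} ms`); // elapsed: 10131 ms
```

The gap is just over 10 seconds, which lines up with the maxMilliseconds setting rather than with the moment topic.publish was called.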
In fact, I think topic.publish does not call the remote pubsub service directly, but pushes the message into an in-memory queue. There would then be some time window in which the library counts the queued messages, maybe a tick or something like:
// guessed internal logic of the @google-cloud/pubsub library (pseudocode)
setTimeout(() => {
  // if the number of queued user messages is >= maxMessages, publish them immediately
  if (getLength(messageQueue) >= maxMessages) {
    callRemotePubsubService(messageQueue);
  }
}, /* window time = */ 100);
Or does it use setImmediate() or process.nextTick()?
Note that the condition for sending messages to the service is an OR, not an AND. In other words, if either maxMessages messages are waiting to be sent OR maxMilliseconds has passed since the library received the first outstanding message, it will send the outstanding messages to the server.
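The OR condition can be written as a small predicate. This is only an illustration: shouldFlush and BatchSettings are hypothetical names made up for this sketch, not part of the client library.

```typescript
// Hypothetical names; an illustration of the OR rule, not the library's code.
interface BatchSettings {
  maxMessages: number;
  maxMilliseconds: number;
}

// True when EITHER limit has been reached.
function shouldFlush(
  pendingCount: number,
  msSinceFirstMessage: number,
  settings: BatchSettings
): boolean {
  return (
    pendingCount >= settings.maxMessages ||
    msSinceFirstMessage >= settings.maxMilliseconds
  );
}

// Same settings as in the question.
const settings: BatchSettings = { maxMessages: 10, maxMilliseconds: 10 * 1000 };
console.log(shouldFlush(5, 100, settings));   // false: neither limit reached
console.log(shouldFlush(5, 10000, settings)); // true: time limit reached
console.log(shouldFlush(10, 0, settings));    // true: count limit reached
```

The second call corresponds to the situation in the question: only 5 of 10 messages are queued, so the batch goes out once the 10 seconds have elapsed.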
The source code for the client library is available, so you can see exactly what it does. The library has a queue that it uses to track messages that haven't been sent yet. When a message is added and the queue is now full (based on the batching settings), it immediately calls publish. When the first message is added, it uses setTimeout to schedule a call that ultimately calls publish on the service. The publisher client holds an instance of this queue, to which it adds messages when publish is called.
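The behaviour described above can be sketched roughly as follows. This is a simplified model, not the library's actual source: BatchQueue, Send, and sentBatches are hypothetical names, the messages are plain strings, and the sketch ignores anything the real queue does beyond what is described here.

```typescript
// Hypothetical stand-in for the call that sends a batch to the service.
type Send = (batch: string[]) => void;

class BatchQueue {
  private messages: string[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;

  constructor(
    private readonly maxMessages: number,
    private readonly maxMilliseconds: number,
    private readonly send: Send
  ) {}

  add(message: string): void {
    this.messages.push(message);
    if (this.messages.length >= this.maxMessages) {
      // Queue is full: send right away.
      this.flush();
    } else if (this.timer === undefined) {
      // First outstanding message: schedule a send after maxMilliseconds.
      this.timer = setTimeout(() => this.flush(), this.maxMilliseconds);
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.messages.length === 0) return;
    const batch = this.messages;
    this.messages = [];
    this.send(batch);
  }
}

// Record each batch instead of calling a real service.
const sentBatches: string[][] = [];
const queue = new BatchQueue(3, 50, (batch) => sentBatches.push(batch));

queue.add('a'); // starts the timer
queue.add('b'); // still waiting
queue.add('c'); // queue is full: ['a', 'b', 'c'] is sent immediately
queue.add('d'); // starts a new timer; ['d'] is sent about 50 ms later
```

In this model there is no polling window and no use of setImmediate() or process.nextTick(): the count is checked on every add, and a single setTimeout covers the time limit.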