How do I get an InputStream from Async Http Client that is not a ByteArrayInputStream?
I am using Async Http Client to download many (possibly large) files from the internet.
In my particular case, I need to pass the InputStream of bytes from each of these downloading URLs along to another service for parsing.
The naive approach would be:
    AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
            .setMaxConnectionsPerHost(-1)
            .setMaxConnections(-1)
            .setPooledConnectionIdleTimeout(60 * 10 * 1000)
            .setConnectionTtl(6 * 60 * 1000)
            .setConnectTimeout(5 * 1000)
            .setRequestTimeout(5 * 60 * 1000)
            .setFollowRedirect(true)
            .setRealm(new Realm.Builder(username, password)
                    .setNtlmDomain(domain)
                    .setScheme(Realm.AuthScheme.NTLM)
                    .build()));
    Response httpGetResponse = asyncHttpClient.prepareGet(url).execute().get();
    // By the time get() returns, the whole body has been buffered in memory.
    return httpGetResponse.getResponseBodyAsStream();
But this tutorial on async HTTP requests explains that, unlike the HttpComponents HTTP client, Async Http Client downloads the entire file into memory.
In my case, this quickly causes OOMs.
So the alternative is:
    Response httpGetResponse = asyncHttpClient.prepareGet(url).execute(new AsyncHandler<Response>() {
        private final Response.ResponseBuilder builder = new Response.ResponseBuilder();

        @Override
        public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
            bodyPart.getBodyByteBuffer(); // Each chunk of bytes will be fed into this method.
            // I need to write these bytes to the resulting input stream
            // without buffering them all in memory.
            return State.CONTINUE;
        }

        @Override
        public State onHeadersReceived(HttpHeaders headers) throws Exception {
            builder.accumulate(headers);
            return State.CONTINUE;
        }

        @Override
        public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
            builder.accumulate(responseStatus);
            return State.CONTINUE;
        }

        @Override
        public Response onCompleted() throws Exception {
            // The body was never accumulated, so this response carries only status and headers.
            return builder.build();
        }

        @Override
        public void onThrowable(Throwable t) {
            // Left empty in this sketch.
        }
    }).get();
What is the easiest, cleanest way to expose these bytes as an input stream while they are still arriving?
I have two ideas:
1) Write the input to a file, then stream the file, or
2) Return a piped input stream right away, and write the bytes to it as they are received.
Does anyone have a working example of this they can share?
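For reference, idea 1 could look roughly like the sketch below. It uses only the JDK; the class name TempFileSpool and its write/finish methods are hypothetical names made up for illustration, not part of Async Http Client. It assumes each chunk arrives as a ByteBuffer and that the default file system provider honours DELETE_ON_CLOSE on an input stream.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper: spools chunks to a temp file, then streams the file back. */
final class TempFileSpool {
    private final Path file;
    private final WritableByteChannel channel;

    TempFileSpool() throws IOException {
        file = Files.createTempFile("download-", ".part");
        channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
    }

    /** Call once per received chunk; only the current chunk is held in memory. */
    void write(ByteBuffer chunk) throws IOException {
        while (chunk.hasRemaining()) {
            channel.write(chunk);
        }
    }

    /** Call after the last chunk; the temp file is deleted when the returned stream is closed. */
    InputStream finish() throws IOException {
        channel.close();
        return Files.newInputStream(file, StandardOpenOption.DELETE_ON_CLOSE);
    }
}
```

In the handler above, write(...) would presumably be called from onBodyPartReceived with bodyPart.getBodyByteBuffer(), and finish() once the download completes. The trade-off is that the consumer cannot start reading until the whole file is on disk.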
I correctly assumed someone had already done this. In fact, after searching for "async http client" and "piped input stream", I found BodyDeferringAsyncHandler in the project itself.
Usage:
    // The handler writes body parts into the output end of the pipe; the caller reads from the input end.
    PipedInputStream pipedInputStream = new PipedInputStream();
    PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
    BodyDeferringAsyncHandler bodyDeferringAsyncHandler = new BodyDeferringAsyncHandler(pipedOutputStream);
    Future<Response> futureResponse = asyncHttpClient.prepareGet(url).execute(bodyDeferringAsyncHandler);
    // Blocks until the status line and headers have arrived, before the body is consumed.
    Response response = bodyDeferringAsyncHandler.getResponse();
    if (response.getStatusCode() == 200) {
        return new BodyDeferringAsyncHandler.BodyDeferringInputStream(futureResponse,
                bodyDeferringAsyncHandler,
                pipedInputStream);
    } else {
        return null;
    }
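To show what the pipe is doing underneath, here is a minimal JDK-only sketch. It is not the library's implementation: the class PipedChunkDemo and the method streamChunks are hypothetical, and a plain background thread stands in for the handler that receives body parts. The 64 KiB buffer size is an arbitrary choice; the no-argument PipedInputStream constructor used above gives a 1024-byte buffer.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class PipedChunkDemo {
    /** Returns a stream immediately; a background thread feeds it chunk by chunk. */
    static InputStream streamChunks(List<byte[]> chunks) throws IOException {
        PipedInputStream in = new PipedInputStream(64 * 1024);
        PipedOutputStream out = new PipedOutputStream(in);
        Thread producer = new Thread(() -> {
            // Closing the output end signals end-of-stream to the reader.
            try (PipedOutputStream o = out) {
                for (byte[] chunk : chunks) {
                    o.write(chunk); // blocks while the pipe buffer is full
                }
            } catch (IOException e) {
                // The reader closed its end or the pipe broke; stop writing.
            }
        }, "chunk-producer");
        producer.setDaemon(true);
        producer.start();
        return in;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> chunks = List.of(
                "first ".getBytes(StandardCharsets.UTF_8),
                "second".getBytes(StandardCharsets.UTF_8));
        try (InputStream in = streamChunks(chunks)) {
            // prints "first second"
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```

Because the writer blocks whenever the buffer is full, the returned stream has to be read from a different thread than the one writing, and the reader has to keep draining it; otherwise the download stalls.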