Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewritten Netty Jersey implementation using direct ByteBuf consumption #4312

Merged
merged 1 commit into from
Nov 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.glassfish.jersey.netty.connector;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
Expand All @@ -31,6 +30,7 @@
import org.glassfish.jersey.netty.connector.internal.NettyInputStream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
Expand All @@ -50,7 +50,7 @@
class JerseyClientHandler extends SimpleChannelInboundHandler<HttpObject> {

private final NettyConnector connector;
private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();

private final AsyncConnectorCallback asyncConnectorCallback;
private final ClientRequest jerseyRequest;
Expand Down Expand Up @@ -89,15 +89,15 @@ public String getReasonPhrase() {
for (Map.Entry<String, String> entry : response.headers().entries()) {
jerseyResponse.getHeaders().add(entry.getKey(), entry.getValue());
}

isList.clear(); // clearing the content - possible leftover from previous request processing.
// request entity handling.
if ((response.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(response) > 0)
|| HttpUtil.isTransferEncodingChunked(response)) {

ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
isList.add(NettyInputStream.END_OF_INPUT_ERROR);
isList.add(Unpooled.EMPTY_BUFFER);
}
});

Expand All @@ -123,21 +123,16 @@ public void run() {

}
if (msg instanceof HttpContent) {

HttpContent httpContent = (HttpContent) msg;

ByteBuf content = httpContent.content();

if (content.isReadable()) {
// copy bytes - when netty reads last chunk, it automatically closes the channel, which invalidates all
// relates ByteBuffs.
byte[] bytes = new byte[content.readableBytes()];
content.getBytes(content.readerIndex(), bytes);
isList.add(new ByteArrayInputStream(bytes));
content.retain();
isList.add(content);
}

if (msg instanceof LastHttpContent) {
isList.add(NettyInputStream.END_OF_INPUT);
isList.add(Unpooled.EMPTY_BUFFER);
}
}
}
Expand All @@ -153,6 +148,6 @@ public void run() {
});
}
future.completeExceptionally(cause);
isList.add(NettyInputStream.END_OF_INPUT_ERROR);
ctx.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@
public class JerseyChunkedInput extends OutputStream implements ChunkedInput<ByteBuf>, ChannelFutureListener {

private static final ByteBuffer VOID = ByteBuffer.allocate(0);
private static final int CAPACITY = 8;
// TODO this needs to be configurable, see JERSEY-3228
private static final int WRITE_TIMEOUT = 10000;
private static final int READ_TIMEOUT = 10000;
private static final int CAPACITY = Integer.getInteger("jersey.ci.capacity", 8);
private static final int WRITE_TIMEOUT = Integer.getInteger("jersey.ci.read.timeout", 10000);
private static final int READ_TIMEOUT = Integer.getInteger("jersey.ci.write.timeout", 10000);

private final LinkedBlockingDeque<ByteBuffer> queue = new LinkedBlockingDeque<>(CAPACITY);
private final Channel ctx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,77 +20,47 @@
import java.io.InputStream;
import java.util.concurrent.LinkedBlockingDeque;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

/**
* Input stream which servers as Request entity input.
* <p>
* Converts Netty NIO buffers to an input streams and stores them in the queue,
* waiting for Jersey to process it.
*
* @author Pavel Bucek
* Consumes a list of pending {@link ByteBuf}s and processes them on request by Jersey
*/
public class NettyInputStream extends InputStream {

private volatile boolean end = false;

/**
* End of input.
*/
public static final InputStream END_OF_INPUT = new InputStream() {
@Override
public int read() throws IOException {
return 0;
}

@Override
public String toString() {
return "END_OF_INPUT " + super.toString();
}
};

/**
* Unexpected end of input.
*/
public static final InputStream END_OF_INPUT_ERROR = new InputStream() {
@Override
public int read() throws IOException {
return 0;
}

@Override
public String toString() {
return "END_OF_INPUT_ERROR " + super.toString();
}
};

private final LinkedBlockingDeque<InputStream> isList;
private final LinkedBlockingDeque<ByteBuf> isList;

public NettyInputStream(LinkedBlockingDeque<InputStream> isList) {
public NettyInputStream(LinkedBlockingDeque<ByteBuf> isList) {
this.isList = isList;
}

private interface ISReader {
int readFrom(InputStream take) throws IOException;
}

private int readInternal(ISReader isReader) throws IOException {
if (end) {
return -1;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {

InputStream take;
ByteBuf take;
try {
take = isList.take();

if (checkEndOfInput(take)) {
boolean isReadable = take.isReadable();
int read = -1;
if (checkEndOfInputOrError(take)) {
take.release();
return -1;
}

int read = isReader.readFrom(take);

if (take.available() > 0) {
isList.addFirst(take);
if (isReadable) {
int readableBytes = take.readableBytes();
read = Math.min(readableBytes, len);
take.readBytes(b, off, read);
if (read < len) {
take.release();
} else {
isList.addFirst(take);
}
} else {
take.close();
read = 0;
take.release(); //We don't need `0`
}

return read;
Expand All @@ -100,33 +70,53 @@ private int readInternal(ISReader isReader) throws IOException {
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return readInternal(take -> take.read(b, off, len));
public int read() throws IOException {

ByteBuf take;
try {
take = isList.take();
boolean isReadable = take.isReadable();
if (checkEndOfInputOrError(take)) {
take.release();
return -1;
}

if (isReadable) {
return take.readInt();
} else {
take.release(); //We don't need `0`
}

return 0;
} catch (InterruptedException e) {
throw new IOException("Interrupted.", e);
}
}

@Override
public int read() throws IOException {
return readInternal(InputStream::read);
public void close() throws IOException {
if (isList != null) {
while (!isList.isEmpty()) {
try {
isList.take().release();
} catch (InterruptedException e) {
throw new IOException("Interrupted. Potential ByteBuf Leak.", e);
}
}
}
super.close();
}

@Override
public int available() throws IOException {
InputStream peek = isList.peek();
if (peek != null) {
return peek.available();
ByteBuf peek = isList.peek();
if (peek != null && peek.isReadable()) {
return peek.readableBytes();
}

return 0;
}

private boolean checkEndOfInput(InputStream take) throws IOException {
if (take == END_OF_INPUT) {
end = true;
return true;
} else if (take == END_OF_INPUT_ERROR) {
end = true;
throw new IOException("Connection was closed prematurely.");
}
return false;
private boolean checkEndOfInputOrError(ByteBuf take) throws IOException {
return take == Unpooled.EMPTY_BUFFER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import java.util.concurrent.LinkedBlockingDeque;

import javax.ws.rs.core.SecurityContext;

import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -55,7 +55,7 @@
class JerseyHttp2ServerHandler extends ChannelDuplexHandler {

private final URI baseUri;
private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
private final NettyHttpContainer container;
private final ResourceConfig resourceConfig;

Expand Down Expand Up @@ -92,9 +92,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
* Process incoming data.
*/
private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception {
isList.add(new ByteBufInputStream(data.content(), true));
isList.add(data.content());
if (data.isEndStream()) {
isList.add(NettyInputStream.END_OF_INPUT);
isList.add(Unpooled.EMPTY_BUFFER);
}
}

Expand Down Expand Up @@ -163,7 +163,7 @@ public void removeProperty(String name) {
ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
isList.add(NettyInputStream.END_OF_INPUT_ERROR);
isList.add(Unpooled.EMPTY_BUFFER);
}
});

Expand Down
Loading