Skip to content

Commit

Permalink
Support NettyConnector & RequestEntityProcessing.BUFFERED
Browse files Browse the repository at this point in the history
Signed-off-by: jansupol <[email protected]>
  • Loading branch information
jansupol committed Apr 12, 2023
1 parent a07939c commit 9675a05
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.message.internal.OutboundMessageContext;
import org.glassfish.jersey.netty.connector.internal.JerseyChunkedInput;
import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter;

/**
* Netty connector implementation.
Expand Down Expand Up @@ -391,27 +391,34 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
}
};
ch.closeFuture().addListener(closeListener);
if (jerseyRequest.getLengthLong() == -1) {
HttpUtil.setTransferEncodingChunked(nettyRequest, true);
} else {
nettyRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong());

final NettyEntityWriter entityWriter = NettyEntityWriter.getInstance(jerseyRequest, ch);
switch (entityWriter.getType()) {
case CHUNKED:
HttpUtil.setTransferEncodingChunked(nettyRequest, true);
break;
case PRESET:
nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, jerseyRequest.getLengthLong());
break;
// case DELAYED:
// // Set later after the entity is "written"
// break;
}

// Send the HTTP request.
ch.writeAndFlush(nettyRequest);
entityWriter.writeAndFlush(nettyRequest);

final JerseyChunkedInput jerseyChunkedInput = new JerseyChunkedInput(ch);
jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(int contentLength) throws IOException {
return jerseyChunkedInput;
return entityWriter.getOutputStream();
}
});

if (HttpUtil.isTransferEncodingChunked(nettyRequest)) {
ch.write(new HttpChunkedInput(jerseyChunkedInput));
entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput()));
} else {
ch.write(jerseyChunkedInput);
entityWriter.write(entityWriter.getChunkedInput());
}

executorService.execute(new Runnable() {
Expand All @@ -422,19 +429,27 @@ public void run() {

try {
jerseyRequest.writeEntity();

if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, entityWriter.getLength());
entityWriter.flush();
}

} catch (IOException e) {
responseDone.completeExceptionally(e);
}
}
});

ch.flush();
if (entityWriter.getType() != NettyEntityWriter.Type.DELAYED) {
entityWriter.flush();
}
} else {
// Send the HTTP request.
ch.writeAndFlush(nettyRequest);
}

} catch (InterruptedException e) {
} catch (IOException | InterruptedException e) {
responseDone.completeExceptionally(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.netty.connector.internal;

import io.netty.channel.Channel;
import io.netty.handler.stream.ChunkedInput;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.RequestEntityProcessing;

import java.io.IOException;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.List;

/**
* The Entity Writer is used to write entity in Netty. One implementation is delayed,
* so that the complete message length can be set to Content-Length header.
*/
public interface NettyEntityWriter {

/**
* Type of the entity writer. {@code CHUNKED} is used for chunked data. {@code PRESET} is for buffered data, but the
* content length was pre-set by the customer. {@code DELAYED} is for buffered data where the content-length is unknown.
* The headers must not be written before the entity is provided by MessageBodyWriter to know the exact length.
*/
enum Type {
CHUNKED,
PRESET,
DELAYED
}

/**
* Writes the Object to the channel
* @param object object to be written
*/
void write(Object object);

/**
* Writes the Object to the channel and flush.
* @param object object to be written
*/
void writeAndFlush(Object object);

/**
* Flushes the writen objects. Can throw IOException.
* @throws IOException
*/
void flush() throws IOException;

/**
* Get the netty Chunked Input to be written.
* @return The Chunked input instance
*/
ChunkedInput getChunkedInput();

/**
* Get the {@link OutputStream} used to write an entity
* @return the OutputStream to write an entity
*/
OutputStream getOutputStream();

/**
* Get the length of the entity written to the {@link OutputStream}
* @return
*/
long getLength();

/**
* Return Type of
* @return
*/
Type getType();

static NettyEntityWriter getInstance(ClientRequest clientRequest, Channel channel) {
final long lengthLong = clientRequest.getLengthLong();
final RequestEntityProcessing entityProcessing = clientRequest.resolveProperty(
ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class);

if ((entityProcessing == null && lengthLong == -1) || entityProcessing == RequestEntityProcessing.CHUNKED) {
return new DirectEntityWriter(channel, Type.CHUNKED);
} else if (lengthLong != -1) {
return new DirectEntityWriter(channel, Type.PRESET);
} else {
return new DelayedEntityWriter(channel, Type.DELAYED);
}
}

class DirectEntityWriter implements NettyEntityWriter {
private final Channel channel;
private final JerseyChunkedInput stream;
private final Type type;

public DirectEntityWriter(Channel channel, Type type) {
this.channel = channel;
stream = new JerseyChunkedInput(channel);
this.type = type;
}

@Override
public void write(Object object) {
channel.write(object);
}

@Override
public void writeAndFlush(Object object) {
channel.writeAndFlush(object);
}

@Override
public void flush() {
channel.flush();
}

@Override
public ChunkedInput getChunkedInput() {
return stream;
}

@Override
public OutputStream getOutputStream() {
return stream;
}

@Override
public long getLength() {
return stream.progress();
}

@Override
public Type getType() {
return type;
}
}

class DelayedEntityWriter implements NettyEntityWriter {
private final List<Runnable> delayedOps;
private final DirectEntityWriter writer;
private final DelayedOutputStream outputStream;

private boolean flushed = false;

public DelayedEntityWriter(Channel channel, Type type) {
this.writer = new DirectEntityWriter(channel, type);
this.delayedOps = new LinkedList<>();
this.outputStream = new DelayedOutputStream();
}


@Override
public void write(Object object) {
if (!flushed) {
delayedOps.add(() -> writer.write(object));
} else {
writer.write(object);
}
}

@Override
public void writeAndFlush(Object object) {
if (!flushed) {
delayedOps.add(() -> writer.writeAndFlush(object));
} else {
writer.writeAndFlush(object);
}
}

@Override
public void flush() throws IOException {
if (!flushed) {
flushed = true;
for (Runnable runnable : delayedOps) {
runnable.run();
}
writer.getOutputStream().write(outputStream.b, outputStream.off, outputStream.len);
}
writer.flush();
}

@Override
public ChunkedInput getChunkedInput() {
return writer.getChunkedInput();
}

@Override
public OutputStream getOutputStream() {
return outputStream;
}


@Override
public long getLength() {
return outputStream.len - outputStream.off;
}

@Override
public Type getType() {
return writer.getType();
}

private class DelayedOutputStream extends OutputStream {
private byte[] b;
private int off;
private int len;

@Override
public void write(int b) throws IOException {
write(new byte[]{(byte) (b & 0xFF)}, 0, 1);
}

@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (!flushed && this.b == null) {
this.b = b;
this.off = off;
this.len = len;
} else {
DelayedEntityWriter.this.flush();
writer.getOutputStream().write(b, off, len);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.glassfish.jersey.grizzly.connector.GrizzlyConnectorProvider;
import org.glassfish.jersey.jdk.connector.JdkConnectorProvider;
import org.glassfish.jersey.logging.LoggingFeature;
import org.glassfish.jersey.netty.connector.NettyConnectorProvider;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;

Expand Down Expand Up @@ -78,6 +79,7 @@ public static Stream<Arguments> clientConfigs() {
Arguments.of(new TestArguments(() -> new ApacheConnectorProvider(), RequestEntityProcessing.CHUNKED)),
Arguments.of(new TestArguments(() -> new Apache5ConnectorProvider(), RequestEntityProcessing.CHUNKED)),
Arguments.of(new TestArguments(() -> new GrizzlyConnectorProvider(), RequestEntityProcessing.CHUNKED)),
Arguments.of(new TestArguments(() -> new NettyConnectorProvider(), RequestEntityProcessing.CHUNKED)),
Arguments.of(new TestArguments(() -> new HttpUrlConnectorProvider(), RequestEntityProcessing.BUFFERED)),
Arguments.of(new TestArguments(() -> new JdkConnectorProvider(), RequestEntityProcessing.BUFFERED))
);
Expand Down

0 comments on commit 9675a05

Please sign in to comment.