Skip to content

Commit

Permalink
Support multipart by Jetty & Netty
Browse files Browse the repository at this point in the history
Signed-off-by: jansupol <[email protected]>
  • Loading branch information
jansupol committed Oct 13, 2023
1 parent dbbf057 commit fcc1bb1
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -274,10 +276,13 @@ public CookieStore getCookieStore() {
@Override
public ClientResponse apply(final ClientRequest jerseyRequest) throws ProcessingException {
final Request jettyRequest = translateRequest(jerseyRequest);
final Map<String, String> clientHeadersSnapshot = writeOutBoundHeaders(jerseyRequest.getHeaders(), jettyRequest);
final ContentProvider entity = getBytesProvider(jerseyRequest);
final Map<String, String> clientHeadersSnapshot = new HashMap<>();
final ContentProvider entity =
getBytesProvider(jerseyRequest, jerseyRequest.getHeaders(), clientHeadersSnapshot, jettyRequest);
if (entity != null) {
jettyRequest.content(entity);
} else {
clientHeadersSnapshot.putAll(writeOutBoundHeaders(jerseyRequest.getHeaders(), jettyRequest));
}

try {
Expand Down Expand Up @@ -362,12 +367,15 @@ private Map<String, String> writeOutBoundHeaders(final MultivaluedMap<String, Ob
// remove User-agent header set by Jetty; Jersey already sets this in its request (incl. Jetty version)
request.getHeaders().remove(HttpHeader.USER_AGENT);
for (final Map.Entry<String, String> e : stringHeaders.entrySet()) {
request.getHeaders().add(e.getKey(), e.getValue());
request.getHeaders().put(e.getKey(), e.getValue());
}
return stringHeaders;
}

private ContentProvider getBytesProvider(final ClientRequest clientRequest) {
private ContentProvider getBytesProvider(final ClientRequest clientRequest,
final MultivaluedMap<String, Object> headers,
final Map<String, String> snapshot,
final Request request) {
final Object entity = clientRequest.getEntity();

if (entity == null) {
Expand All @@ -378,6 +386,7 @@ private ContentProvider getBytesProvider(final ClientRequest clientRequest) {
clientRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(final int contentLength) throws IOException {
snapshot.putAll(writeOutBoundHeaders(headers, request));
return outputStream;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand Down Expand Up @@ -77,6 +78,11 @@ CompletableFuture<HttpResponseStatus> processExpect100ContinueRequest(HttpReques
final DefaultFullHttpRequest nettyRequestHeaders =
new DefaultFullHttpRequest(nettyRequest.protocolVersion(), nettyRequest.method(), nettyRequest.uri());
nettyRequestHeaders.headers().setAll(nettyRequest.headers());

if (!nettyRequestHeaders.headers().contains(HttpHeaderNames.HOST)) {
nettyRequestHeaders.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
}

//If Expect:100-continue feature is enabled and client supports it, the nettyRequestHeaders will be
//enriched with the 'Expect:100-continue' header.
expect100ContinueExtension.invoke(jerseyRequest, nettyRequestHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Stream;

import javax.net.ssl.SSLContext;
import javax.ws.rs.ProcessingException;
Expand Down Expand Up @@ -383,11 +386,13 @@ protected void initChannel(SocketChannel ch) throws Exception {
}

// headers
setHeaders(jerseyRequest, nettyRequest.headers(), false);
if (!jerseyRequest.hasEntity()) {
setHeaders(jerseyRequest, nettyRequest.headers(), false);

// host header - http 1.1
if (!nettyRequest.headers().contains(HttpHeaderNames.HOST)) {
nettyRequest.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
// host header - http 1.1
if (!nettyRequest.headers().contains(HttpHeaderNames.HOST)) {
nettyRequest.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
}
}

if (jerseyRequest.hasEntity()) {
Expand Down Expand Up @@ -428,25 +433,22 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
}
}

if (!expect100ContinueHandler.isExpected()) {
// Send the HTTP request. Expect:100-continue processing is not applicable
// in this case.
entityWriter.writeAndFlush(nettyRequest);
}
final CountDownLatch headersSet = new CountDownLatch(1);
final CountDownLatch contentLengthSet = new CountDownLatch(1);

jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(int contentLength) throws IOException {
replaceHeaders(jerseyRequest, nettyRequest.headers()); // WriterInterceptor changes
if (!nettyRequest.headers().contains(HttpHeaderNames.HOST)) {
nettyRequest.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
}
headersSet.countDown();

return entityWriter.getOutputStream();
}
});

if (HttpUtil.isTransferEncodingChunked(nettyRequest)) {
entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput()));
} else {
entityWriter.write(entityWriter.getChunkedInput());
}

executorService.execute(new Runnable() {
@Override
public void run() {
Expand All @@ -457,9 +459,8 @@ public void run() {
jerseyRequest.writeEntity();

if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
replaceHeaders(jerseyRequest, nettyRequest.headers()); // WriterInterceptor changes
nettyRequest.headers().set(HttpHeaderNames.CONTENT_LENGTH, entityWriter.getLength());
entityWriter.flush();
contentLengthSet.countDown();
}

} catch (IOException e) {
Expand All @@ -468,9 +469,23 @@ public void run() {
}
});

if (entityWriter.getType() != NettyEntityWriter.Type.DELAYED) {
entityWriter.flush();
headersSet.await();
if (!expect100ContinueHandler.isExpected()) {
// Send the HTTP request. Expect:100-continue processing is not applicable
// in this case.
entityWriter.writeAndFlush(nettyRequest);
}

if (HttpUtil.isTransferEncodingChunked(nettyRequest)) {
entityWriter.write(new HttpChunkedInput(entityWriter.getChunkedInput()));
} else {
entityWriter.write(entityWriter.getChunkedInput());
}

if (entityWriter.getType() == NettyEntityWriter.Type.DELAYED) {
contentLengthSet.await();
}
entityWriter.flush();
} else {
// Send the HTTP request.
ch.writeAndFlush(nettyRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ public class BufferedTest extends JerseyTest {
public static class BufferedTestResource {
@POST
public String post(@Context HttpHeaders headers, String entity) {
System.out.println("Remote");
String ret = headers.getHeaderString(HEADER_1)
+ headers.getHeaderString(HEADER_2)
+ headers.getHeaderString(HEADER_3)
+ entity;
System.out.println(ret);
return ret;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2022 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,6 +16,7 @@

package org.glassfish.jersey.netty.connector;

import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand Down Expand Up @@ -46,6 +47,15 @@ public String post(
assertEquals("client", xClient);
return "POST";
}

@GET
public String get(
@HeaderParam("Transfer-Encoding") String transferEncoding,
@HeaderParam("X-CLIENT") String xClient,
@HeaderParam("X-WRITER") String xWriter) {
assertEquals("client", xClient);
return "GET";
}
}

@Override
Expand All @@ -66,4 +76,13 @@ public void testPost() {
assertTrue(response.hasEntity());
response.close();
}

@Test
public void testGet() {
Response response = target("test").request().header("X-CLIENT", "client").get();

assertEquals(200, response.getStatus());
assertTrue(response.hasEntity());
response.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.tests.e2e.client.connector;

import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.HttpUrlConnectorProvider;
import org.glassfish.jersey.client.spi.ConnectorProvider;
import org.glassfish.jersey.jdk.connector.JdkConnectorProvider;
import org.glassfish.jersey.jetty.connector.JettyConnectorProvider;
import org.glassfish.jersey.netty.connector.NettyConnectorProvider;
import org.glassfish.jersey.logging.LoggingFeature;
import org.glassfish.jersey.media.multipart.BodyPart;
import org.glassfish.jersey.media.multipart.BodyPartEntity;
import org.glassfish.jersey.media.multipart.MultiPart;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.glassfish.jersey.message.internal.ReaderWriter;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.glassfish.jersey.test.TestProperties;
import org.glassfish.jersey.test.spi.TestHelper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DynamicContainer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MultiPartTest {

private static final Logger LOGGER = Logger.getLogger(RequestHeaderModificationsTest.class.getName());

public static List<ConnectorProvider> testData() {
return Arrays.asList(
new HttpUrlConnectorProvider(),
new JettyConnectorProvider(),
new NettyConnectorProvider(),
new JdkConnectorProvider()
);
}

@TestFactory
public Collection<DynamicContainer> generateTests() {
Collection<DynamicContainer> tests = new ArrayList<>();
for (ConnectorProvider provider : testData()) {
HttpMultipartTest test = new HttpMultipartTest(provider) {};
DynamicContainer container = TestHelper.toTestContainer(test,
String.format("MultiPartTest (%s)", provider.getClass().getSimpleName()));
tests.add(container);
}
return tests;
}

public abstract static class HttpMultipartTest extends JerseyTest {
private final ConnectorProvider connectorProvider;
private static final String ENTITY = "hello";

public HttpMultipartTest(ConnectorProvider connectorProvider) {
this.connectorProvider = connectorProvider;
}

@Override
protected Application configure() {
set(TestProperties.RECORD_LOG_LEVEL, Level.WARNING.intValue());
enable(TestProperties.LOG_TRAFFIC);
return new ResourceConfig(MultipartResource.class)
.register(MultiPartFeature.class)
.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.HEADERS_ONLY));
}

@Override
protected void configureClient(ClientConfig clientConfig) {
clientConfig.connectorProvider(connectorProvider);
clientConfig.register(MultiPartFeature.class);
}

@Path("/")
public static class MultipartResource {
@POST
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public String upload(@Context HttpHeaders headers, MultiPart multiPart) throws IOException {
return ReaderWriter.readFromAsString(
((BodyPartEntity) multiPart.getBodyParts().get(0).getEntity()).getInputStream(),
multiPart.getMediaType());
}
}

@Test
public void testMultipart() {
MultiPart multipart = new MultiPart().bodyPart(new BodyPart().entity(ENTITY));
multipart.setMediaType(MediaType.MULTIPART_FORM_DATA_TYPE);

for (int i = 0; i != 5; i++) {
try (Response r = target().register(MultiPartFeature.class)
.path("upload")
.request()
.post(Entity.entity(multipart, multipart.getMediaType()))) {
Assertions.assertEquals(Response.Status.OK.getStatusCode(), r.getStatus());
Assertions.assertEquals(ENTITY, r.readEntity(String.class));
}
}
}
}
}

0 comments on commit fcc1bb1

Please sign in to comment.