Skip to content

Commit

Permalink
Change JettyConnector 'readTimeout' behavior to match socket read tim… (
Browse files Browse the repository at this point in the history
#5114)

* Change JettyConnector 'readTimeout' behavior to match socket read timeout definition - e.g., ApacheConnector behavior matches it.

* Read timeout: Time on waiting to receive the first data byte.

The `timeout` method timeouts the request even if data were already received, capping the query to a maximum execution time.
This behavior is problematic when streaming data over a prolonged duration; the client has already received data bytes, but data continues to flow.

I provided a jetty specific property (jersey.config.jetty.client.totalTimeout) that configures the 'totalTimeout' when required.
  • Loading branch information
daijithegeek committed Sep 2, 2022
1 parent 16eaaff commit 77074f1
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ private JettyClientProperties() {
public static final String SYNC_LISTENER_RESPONSE_MAX_SIZE =
"jersey.config.jetty.client.syncListenerResponseMaxSize";

/**
* Total timeout interval, in milliseconds.
* <p>
* The value MUST be an instance convertible to {@link java.lang.Integer}. A
* value of zero (0) is equivalent to an interval of infinity.
* </p>
* <p>
* The default value is infinity (0).
* </p>
* <p>
* The name of the configuration property is <tt>{@value}</tt>.
* </p>
*/
public static final String TOTAL_TIMEOUT = "jersey.config.jetty.client.totalTimeout";

/**
* Get the value of the specified property.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,14 @@ private Request translateRequest(final ClientRequest clientRequest) {
request.followRedirects(clientRequest.resolveProperty(ClientProperties.FOLLOW_REDIRECTS, true));
final Object readTimeout = clientRequest.resolveProperty(ClientProperties.READ_TIMEOUT, -1);
if (readTimeout != null && readTimeout instanceof Integer && (Integer) readTimeout > 0) {
request.timeout((Integer) readTimeout, TimeUnit.MILLISECONDS);
request.idleTimeout((Integer) readTimeout, TimeUnit.MILLISECONDS);
}

final Object totalTimeout = clientRequest.resolveProperty(JettyClientProperties.TOTAL_TIMEOUT, -1);
if (totalTimeout != null && totalTimeout instanceof Integer && (Integer) totalTimeout > 0) {
request.timeout((Integer) totalTimeout, TimeUnit.MILLISECONDS);
}

return request;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2022 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,30 +16,37 @@

package org.glassfish.jersey.jetty.connector;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.QueryParam;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;

import org.glassfish.jersey.CommonProperties;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.logging.LoggingFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;

import org.junit.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/**
* @author Martin Matula
Expand All @@ -65,6 +72,48 @@ public String getTimeout() {
}
return "GET";
}

/**
* Long-running streaming request
*
* @param count number of packets send
* @param pauseMillis pause between each packets
*/
@GET
@Path("stream")
public Response streamsWithDelay(@QueryParam("start") @DefaultValue("0") int startMillis, @QueryParam("count") int count,
@QueryParam("pauseMillis") int pauseMillis) {
StreamingOutput streamingOutput = streamSlowly(startMillis, count, pauseMillis);

return Response.ok(streamingOutput)
.build();
}
}

private static StreamingOutput streamSlowly(int startMillis, int count, int pauseMillis) {

return output -> {
try {
TimeUnit.MILLISECONDS.sleep(startMillis);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
output.write("begin\n".getBytes(StandardCharsets.UTF_8));
output.flush();
for (int i = 0; i < count; i++) {
try {
TimeUnit.MILLISECONDS.sleep(pauseMillis);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

output.write(("message " + i + "\n").getBytes(StandardCharsets.UTF_8));
output.flush();
}
output.write("end".getBytes(StandardCharsets.UTF_8));
};
}

@Override
Expand Down Expand Up @@ -121,4 +170,74 @@ public void testTimeoutInRequest() {
c.close();
}
}

/**
* Test accessing an operation that is streaming slowly
*
* @throws ProcessingException in case of a test error.
*/
@Test
public void testSlowlyStreamedContentDoesNotReadTimeout() throws Exception {

int count = 5;
int pauseMillis = 50;

final Response response = target("test")
.property(ClientProperties.READ_TIMEOUT, 100L)
.property(CommonProperties.OUTBOUND_CONTENT_LENGTH_BUFFER_SERVER, "-1")
.path("stream")
.queryParam("count", count)
.queryParam("pauseMillis", pauseMillis)
.request().get();

assertTrue(response.readEntity(String.class).contains("end"));
}

@Test
public void testSlowlyStreamedContentDoesTotalTimeout() throws Exception {

int count = 5;
int pauseMillis = 50;

try {
target("test")
.property(JettyClientProperties.TOTAL_TIMEOUT, 100L)
.property(CommonProperties.OUTBOUND_CONTENT_LENGTH_BUFFER_SERVER, "-1")
.path("stream")
.queryParam("count", count)
.queryParam("pauseMillis", pauseMillis)
.request().get();

fail("This operation should trigger total timeout");
} catch (ProcessingException e) {
assertEquals(TimeoutException.class, e.getCause().getClass());
}
}

/**
* Test accessing an operation that is streaming slowly
*
* @throws ProcessingException in case of a test error.
*/
@Test
public void testSlowToStartStreamedContentDoesReadTimeout() throws Exception {

int start = 150;
int count = 5;
int pauseMillis = 50;

try {
target("test")
.property(ClientProperties.READ_TIMEOUT, 100L)
.property(CommonProperties.OUTBOUND_CONTENT_LENGTH_BUFFER_SERVER, "-1")
.path("stream")
.queryParam("start", start)
.queryParam("count", count)
.queryParam("pauseMillis", pauseMillis)
.request().get();
fail("This operation should trigger idle timeout");
} catch (ProcessingException e) {
assertEquals(TimeoutException.class, e.getCause().getClass());
}
}
}

0 comments on commit 77074f1

Please sign in to comment.