Skip to content

Commit

Permalink
TimeOut property for Netty Connector
Browse files Browse the repository at this point in the history
MaxConnections for Netty Connector
MaxConnectionsTotal for NettyConnector

Fixes eclipse-ee4j#4548

Signed-off-by: jansupol <[email protected]>
  • Loading branch information
jansupol committed Sep 23, 2020
1 parent ece6708 commit 62ec3b6
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import javax.ws.rs.core.Response;

Expand All @@ -36,6 +37,7 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.timeout.IdleStateEvent;

/**
* Jersey implementation of Netty channel handler.
Expand All @@ -51,6 +53,8 @@ class JerseyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
private NettyInputStream nis;
private ClientResponse jerseyResponse;

private boolean readTimedOut;

JerseyClientHandler(ClientRequest request,
CompletableFuture<ClientResponse> responseAvailable,
CompletableFuture<?> responseDone) {
Expand All @@ -67,7 +71,12 @@ public void channelReadComplete(ChannelHandlerContext ctx) {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// assert: no-op, if channel is closed after LastHttpContent has been consumed
responseDone.completeExceptionally(new IOException("Stream closed"));

if (readTimedOut) {
responseDone.completeExceptionally(new TimeoutException("Stream closed: read timeout"));
} else {
responseDone.completeExceptionally(new IOException("Stream closed"));
}
}

protected void notifyResponse() {
Expand Down Expand Up @@ -145,4 +154,14 @@ public int read() throws IOException {
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) {
responseDone.completeExceptionally(cause);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
readTimedOut = true;
ctx.close();
} else {
super.userEventTriggered(ctx, evt);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.netty.connector;

import org.glassfish.jersey.internal.util.PropertiesClass;

/**
* Configuration options specific to the Client API that utilizes {@link NettyConnectorProvider}.
*
* @since 2.32
*/
@PropertiesClass
public class NettyClientProperties {

/**
* <p>
* This property determines the maximum number of idle connections that will be simultaneously kept alive
* in total, rather than per destination. The default is 60.
* </p>
*/
public static final String MAX_CONNECTIONS_TOTAL = "jersey.config.client.maxTotalConnections";

/**
* <p>
* This property determines the maximum number of idle connections that will be simultaneously kept alive, per destination.
* The default is 5.
* </p>
* <p>
* This property is a Jersey alternative to System property {@code}http.maxConnections{@code}. The Jersey property takes
* precedence over the system property.
* </p>
*/
public static final String MAX_CONNECTIONS = "jersey.config.client.maxConnections";
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -37,6 +37,8 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
Expand All @@ -58,6 +60,9 @@
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.ClientRequest;
Expand All @@ -79,9 +84,30 @@ class NettyConnector implements Connector {
final Client client;
final HashMap<String, ArrayList<Channel>> connections = new HashMap<>();

// If HTTP keepalive is enabled the value of "http.maxConnections" determines the maximum number
// of idle connections that will be simultaneously kept alive, per destination.
private static final String HTTP_KEEPALIVE_STRING = System.getProperty("http.keepAlive");
// http.keepalive (default: true)
private static final Boolean HTTP_KEEPALIVE =
HTTP_KEEPALIVE_STRING == null ? Boolean.TRUE : Boolean.parseBoolean(HTTP_KEEPALIVE_STRING);

// http.maxConnections (default: 5)
private static final int DEFAULT_MAX_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = Integer.getInteger("http.maxConnections", DEFAULT_MAX_POOL_SIZE);
private static final int MAX_POOL_IDLE = 60;

private final Integer maxPoolSize; // either from system property, or from Jersey config, or default
private final Integer maxPoolIdle; // either from Jersey config, or default

private static final String INACTIVE_POOLED_CONNECTION_HANDLER = "inactive_pooled_connection_handler";
private static final String PRUNE_INACTIVE_POOL = "prune_inactive_pool";
private static final String READ_TIMEOUT_HANDLER = "read_timeout_handler";
private static final String REQUEST_HANDLER = "request_handler";

NettyConnector(Client client) {

final Object threadPoolSize = client.getConfiguration().getProperties().get(ClientProperties.ASYNC_THREADPOOL_SIZE);
final Map<String, Object> properties = client.getConfiguration().getProperties();
final Object threadPoolSize = properties.get(ClientProperties.ASYNC_THREADPOOL_SIZE);

if (threadPoolSize != null && threadPoolSize instanceof Integer && (Integer) threadPoolSize > 0) {
executorService = Executors.newFixedThreadPool((Integer) threadPoolSize);
Expand All @@ -92,20 +118,31 @@ class NettyConnector implements Connector {
}

this.client = client;

final Object maxPoolIdleProperty = properties.get(NettyClientProperties.MAX_CONNECTIONS_TOTAL);
final Object maxPoolSizeProperty = properties.get(NettyClientProperties.MAX_CONNECTIONS);

maxPoolIdle = maxPoolIdleProperty != null ? (Integer) maxPoolIdleProperty : MAX_POOL_IDLE;
maxPoolSize = maxPoolSizeProperty != null
? (Integer) maxPoolSizeProperty
: (HTTP_KEEPALIVE ? MAX_POOL_SIZE : DEFAULT_MAX_POOL_SIZE);

if (maxPoolIdle == null || maxPoolIdle < 0) {
throw new ProcessingException(LocalizationMessages.WRONG_MAX_POOL_IDLE(maxPoolIdle));
}

if (maxPoolSize == null || maxPoolSize < 0) {
throw new ProcessingException(LocalizationMessages.WRONG_MAX_POOL_SIZE(maxPoolIdle));
}
}

@Override
public ClientResponse apply(ClientRequest jerseyRequest) {
try {
CompletableFuture<ClientResponse> resultFuture = execute(jerseyRequest);

Integer timeout = jerseyRequest.resolveProperty(ClientProperties.READ_TIMEOUT, 0);

return (timeout != null && timeout > 0) ? resultFuture.get(timeout, TimeUnit.MILLISECONDS)
: resultFuture.get();
} catch (ExecutionException ex) {
Throwable e = ex.getCause() == null ? ex : ex.getCause();
throw new ProcessingException(e.getMessage(), e);
return execute(jerseyRequest).join();
} catch (CompletionException cex) {
final Throwable t = cex.getCause() == null ? cex : cex.getCause();
throw new ProcessingException(t.getMessage(), t);
} catch (Exception ex) {
throw new ProcessingException(ex.getMessage(), ex);
}
Expand All @@ -120,6 +157,11 @@ public Future<?> apply(final ClientRequest jerseyRequest, final AsyncConnectorCa
}

protected CompletableFuture<ClientResponse> execute(final ClientRequest jerseyRequest) {
Integer timeout = jerseyRequest.resolveProperty(ClientProperties.READ_TIMEOUT, 0);
if (timeout == null || timeout < 0) {
throw new ProcessingException(LocalizationMessages.WRONG_READ_TIMEOUT(timeout));
}

final CompletableFuture<ClientResponse> responseAvailable = new CompletableFuture<>();
final CompletableFuture<?> responseDone = new CompletableFuture<>();

Expand All @@ -128,6 +170,7 @@ protected CompletableFuture<ClientResponse> execute(final ClientRequest jerseyRe
int port = requestUri.getPort() != -1 ? requestUri.getPort() : "https".equals(requestUri.getScheme()) ? 443 : 80;

try {

String key = requestUri.getScheme() + "://" + host + ":" + port;
ArrayList<Channel> conns;
synchronized (connections) {
Expand All @@ -138,9 +181,16 @@ protected CompletableFuture<ClientResponse> execute(final ClientRequest jerseyRe
}
}

Channel chan;
Channel chan = null;
synchronized (conns) {
chan = conns.size() == 0 ? null : conns.remove(conns.size() - 1);
while (chan == null && !conns.isEmpty()) {
chan = conns.remove(conns.size() - 1);
chan.pipeline().remove(INACTIVE_POOLED_CONNECTION_HANDLER);
chan.pipeline().remove(PRUNE_INACTIVE_POOL);
if (!chan.isOpen()) {
chan = null;
}
}
}

if (chan == null) {
Expand Down Expand Up @@ -199,16 +249,30 @@ protected void initChannel(SocketChannel ch) throws Exception {
// will leak
final Channel ch = chan;
JerseyClientHandler clientHandler = new JerseyClientHandler(jerseyRequest, responseAvailable, responseDone);
ch.pipeline().addLast(clientHandler);
// read timeout makes sense really as an inactivity timeout
ch.pipeline().addLast(READ_TIMEOUT_HANDLER,
new IdleStateHandler(0, 0, timeout, TimeUnit.MILLISECONDS));
ch.pipeline().addLast(REQUEST_HANDLER, clientHandler);

responseDone.whenComplete((_r, th) -> {
ch.pipeline().remove(READ_TIMEOUT_HANDLER);
ch.pipeline().remove(clientHandler);

if (th == null) {
ch.pipeline().addLast(INACTIVE_POOLED_CONNECTION_HANDLER, new IdleStateHandler(0, 0, maxPoolIdle));
ch.pipeline().addLast(PRUNE_INACTIVE_POOL, new PruneIdlePool(connections, key));
synchronized (connections) {
ArrayList<Channel> conns1 = connections.get(key);
synchronized (conns1) {
if (conns1 == null) {
conns1 = new ArrayList<>(1);
conns1.add(ch);
connections.put(key, conns1);
} else {
synchronized (conns1) {
if (conns1.size() < maxPoolSize) {
conns1.add(ch);
} // else do not add the Channel to the idle pool
}
}
}
} else {
Expand Down Expand Up @@ -331,4 +395,35 @@ private static URI getProxyUri(final Object proxy) {
throw new ProcessingException(LocalizationMessages.WRONG_PROXY_URI_TYPE(ClientProperties.PROXY_URI));
}
}

protected static class PruneIdlePool extends ChannelDuplexHandler {
HashMap<String, ArrayList<Channel>> connections;
String key;

public PruneIdlePool(HashMap<String, ArrayList<Channel>> connections, String key) {
this.connections = connections;
this.key = key;
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.ALL_IDLE) {
ctx.close();
synchronized (connections) {
ArrayList<Channel> chans = connections.get(key);
synchronized (chans) {
chans.remove(ctx.channel());
if (chans.isEmpty()) {
connections.remove(key);
}
}
}
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2016, 2018 Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2016, 2020 Oracle and/or its affiliates. All rights reserved.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -15,3 +15,7 @@
#

wrong.proxy.uri.type=The proxy URI ("{0}") property MUST be an instance of String or URI.
wrong.read.timeout=Unexpected ("{0}") READ_TIMEOUT.
wrong.max.pool.size=Unexpected ("{0}") maximum number of connections per destination.
wrong.max.pool.idle=Unexpected ("{0}") maximum number of connections total.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.glassfish.jersey.netty.connector;

import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

import javax.ws.rs.GET;
Expand Down Expand Up @@ -80,6 +81,7 @@ public void testSlow() {
target("test/timeout").property(ClientProperties.READ_TIMEOUT, 1_000).request().get();
fail("Timeout expected.");
} catch (ProcessingException e) {
assertEquals(e.getMessage(), "Stream closed: read timeout");
assertThat("Unexpected processing exception cause",
e.getCause(), instanceOf(TimeoutException.class));
}
Expand All @@ -91,8 +93,41 @@ public void testTimeoutInRequest() {
target("test/timeout").request().property(ClientProperties.READ_TIMEOUT, 1_000).get();
fail("Timeout expected.");
} catch (ProcessingException e) {
assertEquals(e.getMessage(), "Stream closed: read timeout");
assertThat("Unexpected processing exception cause",
e.getCause(), instanceOf(TimeoutException.class));
e.getCause(), instanceOf(TimeoutException.class));
}
}

@Test
public void testRxSlow() {
try {
target("test/timeout").property(ClientProperties.READ_TIMEOUT, 1_000).request()
.rx().get().toCompletableFuture().join();
fail("Timeout expected.");
} catch (CompletionException cex) {
assertThat("Unexpected async cause",
cex.getCause(), instanceOf(ProcessingException.class));
ProcessingException e = (ProcessingException) cex.getCause();
assertThat("Unexpected processing exception cause",
e.getCause(), instanceOf(TimeoutException.class));
assertEquals(e.getCause().getMessage(), "Stream closed: read timeout");
}
}

@Test
public void testRxTimeoutInRequest() {
try {
target("test/timeout").request().property(ClientProperties.READ_TIMEOUT, 1_000)
.rx().get().toCompletableFuture().join();
fail("Timeout expected.");
} catch (CompletionException cex) {
assertThat("Unexpected async cause",
cex.getCause(), instanceOf(ProcessingException.class));
ProcessingException e = (ProcessingException) cex.getCause();
assertThat("Unexpected processing exception cause",
e.getCause(), instanceOf(TimeoutException.class));
assertEquals(e.getCause().getMessage(), "Stream closed: read timeout");
}
}
}
Loading

0 comments on commit 62ec3b6

Please sign in to comment.