Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not create a connector multiple times for each rx() call #4705

Merged
merged 1 commit into from
Feb 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2011, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -19,6 +19,7 @@
import java.lang.reflect.Type;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -61,10 +62,14 @@
import org.glassfish.jersey.client.internal.ClientResponseProcessingException;
import org.glassfish.jersey.client.internal.LocalizationMessages;
import org.glassfish.jersey.internal.MapPropertiesDelegate;
import org.glassfish.jersey.internal.inject.Bindings;
import org.glassfish.jersey.internal.inject.DisposableSupplier;
import org.glassfish.jersey.internal.inject.Providers;
import org.glassfish.jersey.internal.inject.ServiceHolder;
import org.glassfish.jersey.internal.util.Producer;
import org.glassfish.jersey.internal.util.PropertiesHelper;
import org.glassfish.jersey.internal.util.ReflectionHelper;
import org.glassfish.jersey.process.internal.ExecutorProviders;
import org.glassfish.jersey.process.internal.RequestScope;
import org.glassfish.jersey.spi.ExecutorServiceProvider;

Expand Down Expand Up @@ -474,7 +479,7 @@ public <T extends RxInvoker> T rx(Class<T> clazz) {
if (configured == null) {
final ExecutorService provided = executorService();
if (provided != null) {
request().getClientConfig().executorService(provided);
((ClientConfig) request().getConfiguration()).executorService(provided);
}
}
return (T) new JerseyCompletionStageRxInvoker(this);
Expand All @@ -498,9 +503,36 @@ private ExecutorService executorService() {
return result;
}

return this.requestContext.getInjectionManager()
.getInstance(ExecutorServiceProvider.class)
.getExecutorService();
final List<ServiceHolder<ExecutorServiceProvider>> serviceHolders =
this.requestContext.getInjectionManager().getAllServiceHolders(ExecutorServiceProvider.class);

BestServiceHolder best = serviceHolders.stream()
.map(BestServiceHolder::new).sorted((a, b) -> a.isBetterThen(b) ? -1 : 1).findFirst().get();

return best.provider.getExecutorService();
}

/*
* Priority goes to: 1) user async
* 2) user nonasync
* 3) default async
*/
private static final class BestServiceHolder {
private final ExecutorServiceProvider provider;
private final int value;

private BestServiceHolder(ServiceHolder<ExecutorServiceProvider> holder) {
provider = holder.getInstance();
boolean isDefault = DefaultClientAsyncExecutorProvider.class.equals(holder.getImplementationClass())
|| ClientExecutorProvidersConfigurator.ClientExecutorServiceProvider.class
.equals(holder.getImplementationClass());
boolean isAsync = holder.getImplementationClass().getAnnotation(ClientAsyncExecutor.class) != null;
value = 10 * (isDefault ? 0 : 1) + (isAsync ? 1 : 0);
}

public boolean isBetterThen(BestServiceHolder other) {
return this.value > other.value;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,9 +16,16 @@

package org.glassfish.jersey.client;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.ws.rs.client.Client;
Expand All @@ -29,10 +36,12 @@
import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.Provider;

import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;

import org.glassfish.jersey.spi.ExecutorServiceProvider;
Expand All @@ -54,8 +63,9 @@
*/
public class ClientRxTest {

private static final ExecutorService EXECUTOR_SERVICE =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("rxTest-%d").build());
private static final ExecutorService EXECUTOR_SERVICE = new ClientRxExecutorServiceWrapper(
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("rxTest-%d").build())
);

private final Client CLIENT;
private final Client CLIENT_WITH_EXECUTOR;
Expand Down Expand Up @@ -160,6 +170,35 @@ public void testRxInvokerNotRegistered() {
request.rx(TestRxInvoker.class).get();
}

@Test
public void testConnectorIsReusedWhenRx() throws ExecutionException, InterruptedException {
final AtomicInteger atomicInteger = new AtomicInteger(0);
HttpUrlConnectorProvider provider = new HttpUrlConnectorProvider() {
@Override
public Connector getConnector(Client client, Configuration config) {
atomicInteger.incrementAndGet();
return super.getConnector(client, config);
}
};

ClientConfig clientConfig = new ClientConfig();
clientConfig.connectorProvider(provider);

ClientRequestFilter abortFilter = (f) -> { f.abortWith(Response.ok().build()); };
Client client = ClientBuilder.newClient(clientConfig).register(abortFilter);

AtomicReference<String> threadName = new AtomicReference<>();
for (int cnt = 0; cnt != 5; cnt++) {
try (Response r = target(client)
.request().rx().get().toCompletableFuture().get()) {

assertEquals(200, r.getStatus());
assertEquals(1, atomicInteger.get());
}
}

}

private WebTarget target(Client client) {
// Uri is not relevant, the call won't be ever executed.
return client.target("http://localhost:9999");
Expand Down Expand Up @@ -207,4 +246,141 @@ public void dispose(ExecutorService executorService) {
//@After
}
}

// -----------------------------------------------------------------------------------------------------

@Test
public void testRxInvokerWithPriorityExecutorServiceProvider() {
AtomicReference<String> threadName = new AtomicReference<>();
String s = target(CLIENT)
.register(PriorityTestRxInvokerProvider.class)
.register(TestExecutorServiceProvider.class)
.register(PriorityTestExecutorServiceProvider.class)
.request().rx(PriorityTestRxInvoker.class).get();

assertTrue("Provided RxInvoker was not used.", s.startsWith("PriorityTestRxInvoker"));
assertTrue("@ClientAsyncExecutor Executor Service was not passed to RxInvoker", s.contains("TRUE"));
}

@ClientAsyncExecutor
private static class PriorityTestExecutorServiceProvider extends TestExecutorServiceProvider {
@Override
public ExecutorService getExecutorService() {
return new ClientRxExecutorServiceWrapper(EXECUTOR_SERVICE) {
//new class
};
}
}

@Provider
public static class PriorityTestRxInvokerProvider implements RxInvokerProvider<PriorityTestRxInvoker> {
@Override
public PriorityTestRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
return new PriorityTestRxInvoker(syncInvoker, executorService);
}

@Override
public boolean isProviderFor(Class<?> clazz) {
return PriorityTestRxInvoker.class.equals(clazz);
}
}

private static class PriorityTestRxInvoker extends AbstractRxInvoker<String> {

private PriorityTestRxInvoker(SyncInvoker syncInvoker, ExecutorService executor) {
super(syncInvoker, executor);
}

@Override
public <R> String method(String name, Entity<?> entity, Class<R> responseType) {
return "PriorityTestRxInvoker " + (getExecutorService() != null
&& !ClientRxExecutorServiceWrapper.class.equals(getExecutorService().getClass())
&& ClientRxExecutorServiceWrapper.class.isInstance(getExecutorService()) ? "TRUE" : "FALSE");
}

@Override
public <R> String method(String name, Entity<?> entity, GenericType<R> responseType) {
return method(null, null, (Class<?>) null);
}
}

// -----------------------------------------------------------------------------------------------------

/**
* Wrap the executor service to distinguish the executor service obtained from the Injection Manager by class name
*/
private static class ClientRxExecutorServiceWrapper implements ExecutorService {
private final ExecutorService executorService;

private ClientRxExecutorServiceWrapper(ExecutorService executorService) {
this.executorService = executorService;
}

@Override
public void shutdown() {
executorService.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return executorService.shutdownNow();
}

@Override
public boolean isShutdown() {
return executorService.isShutdown();
}

@Override
public boolean isTerminated() {
return executorService.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executorService.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return executorService.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return executorService.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return executorService.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return executorService.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return invokeAny(tasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
executorService.execute(command);
}
}
}