Skip to content

Commit

Permalink
Make sure the RX invoker gets ExecutorService from ExecutorServicePro…
Browse files Browse the repository at this point in the history
…vider (#4429)

Signed-off-by: Jan Supol <[email protected]>
  • Loading branch information
jansupol committed Apr 17, 2020
1 parent d88025b commit 3d04682
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2011, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -451,12 +451,19 @@ public Builder property(final String name, final Object value) {

@Override
public CompletionStageRxInvoker rx() {
return new JerseyCompletionStageRxInvoker(this);
return rx(JerseyCompletionStageRxInvoker.class);
}

@Override
public <T extends RxInvoker> T rx(Class<T> clazz) {
if (clazz == JerseyCompletionStageRxInvoker.class) {
final ExecutorService configured = request().getClientConfig().getExecutorService();
if (configured == null) {
final ExecutorService provided = executorService();
if (provided != null) {
request().getClientConfig().executorService(provided);

This comment has been minimized.

Copy link
@olotenko

olotenko Jan 15, 2021

This line causes creation of Connector for every invocation of rx(), and ruins connection reuse.

}
}
return (T) new JerseyCompletionStageRxInvoker(this);
}
return createRxInvoker(clazz, executorService());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,28 +16,35 @@

package org.glassfish.jersey.client;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.ClientRequestFilter;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.Provider;

import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;

import org.glassfish.jersey.spi.ExecutorServiceProvider;
import org.hamcrest.core.AllOf;
import org.hamcrest.core.StringContains;
import org.junit.After;
import org.junit.Ignore;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
Expand All @@ -55,18 +62,21 @@ public class ClientRxTest {

public ClientRxTest() {
CLIENT = ClientBuilder.newClient();

// TODO JAX-RS 2.1
// CLIENT_WITH_EXECUTOR = ClientBuilder.newBuilder().executorService(EXECUTOR_SERVICE).build();
CLIENT_WITH_EXECUTOR = null;
CLIENT_WITH_EXECUTOR = ClientBuilder.newBuilder().executorService(EXECUTOR_SERVICE).build();
}

@Rule
public ExpectedException thrown = ExpectedException.none();

@After
public void afterClass() {
public void afterTest() {
CLIENT.close();
CLIENT_WITH_EXECUTOR.close();
}

@AfterClass
public static void afterClass() {
EXECUTOR_SERVICE.shutdownNow();
}

@Test
Expand All @@ -80,19 +90,57 @@ public void testRxInvoker() {
}

@Test
@Ignore("TODO JAX-RS 2.1")
public void testRxInvokerWithExecutor() {
// implicit register (not saying that the contract is RxInvokerProvider).
CLIENT.register(TestRxInvokerProvider.class);
String s = target(CLIENT_WITH_EXECUTOR).register(TestRxInvokerProvider.class).request().rx(TestRxInvoker.class).get();

ExecutorService executorService = Executors
.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("rxTest-%d").build());
String s = target(CLIENT_WITH_EXECUTOR).request().rx(TestRxInvoker.class).get();
assertTrue("Provided RxInvoker was not used.", s.startsWith("rxTestInvoker"));
assertTrue("Executor Service was not passed to RxInvoker", s.contains("rxTest-"));
}

@Test
public void testDefaultRxInvokerWithExecutor() throws ExecutionException, InterruptedException {
AtomicReference<String> threadName = new AtomicReference<>();
ClientRequestFilter threadFilter = (f) -> { threadName.set(Thread.currentThread().getName()); };
ClientRequestFilter abortFilter = (f) -> { f.abortWith(Response.ok().build()); };
try (Response r = target(CLIENT_WITH_EXECUTOR)
.register(threadFilter, 100)
.register(abortFilter, 200)
.request().rx().get().toCompletableFuture().get()) {

assertEquals(200, r.getStatus());
assertTrue("Executor Service was not passed to RxInvoker", threadName.get().contains("rxTest-"));
}
}

@Test
public void testRxInvokerWithExecutorServiceProvider() {
AtomicReference<String> threadName = new AtomicReference<>();
String s = target(CLIENT)
.register(TestRxInvokerProvider.class, 200)
.register(TestExecutorServiceProvider.class)
.request().rx(TestRxInvoker.class).get();

assertTrue("Provided RxInvoker was not used.", s.startsWith("rxTestInvoker"));
assertTrue("Executor Service was not passed to RxInvoker", s.contains("rxTest-"));
}

@Test
public void testDefaultRxInvokerWithExecutorServiceProvider() throws ExecutionException, InterruptedException {
AtomicReference<String> threadName = new AtomicReference<>();
ClientRequestFilter threadFilter = (f) -> { threadName.set(Thread.currentThread().getName()); };
ClientRequestFilter abortFilter = (f) -> { f.abortWith(Response.ok().build()); };
try (Response r = target(CLIENT)
.register(threadFilter, 100)
.register(abortFilter, 200)
.register(TestExecutorServiceProvider.class)
.request().rx().get().toCompletableFuture().get()) {

assertEquals(200, r.getStatus());
assertTrue("Executor Service was not passed to RxInvoker", threadName.get().contains("rxTest-"));
}
}

@Test
public void testRxInvokerInvalid() {
Invocation.Builder request = target(CLIENT).request();
Expand Down Expand Up @@ -146,4 +194,17 @@ public <R> String method(String name, Entity<?> entity, GenericType<R> responseT
return "rxTestInvoker" + (getExecutorService() == null ? "" : " rxTest-");
}
}

private static class TestExecutorServiceProvider implements ExecutorServiceProvider {

@Override
public ExecutorService getExecutorService() {
return EXECUTOR_SERVICE;
}

@Override
public void dispose(ExecutorService executorService) {
//@After
}
}
}

0 comments on commit 3d04682

Please sign in to comment.