Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threads stuck when jersey client with apache connector is used concurrently #4960

Closed
tgillain opened this issue Jan 10, 2022 · 5 comments · Fixed by #4969
Closed

Threads stuck when jersey client with apache connector is used concurrently #4960

tgillain opened this issue Jan 10, 2022 · 5 comments · Fixed by #4969

Comments

@tgillain
Copy link

Hello Jersey Maintainers,

It seems the bug solved in #3772 came back in version 2.30 and is still present in 2.35.
We ran into it in our product but it is still reproducible using the code sample in #3772.

Thanks in advance for your feedback.
Brs.

Th.

@jansupol
Copy link
Contributor

What Apache HTTP client version is being used?

@tgillain
Copy link
Author

We are currently using 4.5.13.
With this version it is still working fine with Jersey 2.29.1 but fails starting from 2.30.

@jansupol
Copy link
Contributor

Relates to #4321.

@tgillain
Copy link
Author

I tried to apply the changes from #4321 to the sample of #3772.

Threads are still remaining stuck.

`import java.net.URI;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.glassfish.jersey.apache.connector.ApacheClientProperties;
import org.glassfish.jersey.apache.connector.ApacheConnectionClosingStrategy;
import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.jackson.JacksonFeature;

import com.spotify.docker.client.LogsResponseReader;
import com.spotify.docker.client.ObjectMapperProvider;
import com.spotify.docker.client.ProgressResponseReader;
import com.spotify.docker.client.UnixConnectionSocketFactory;
import com.spotify.docker.client.messages.Container;
import com.spotify.docker.client.messages.ContainerInfo;

import static javax.ws.rs.HttpMethod.;
import static javax.ws.rs.core.MediaType.
;

public class MultipleThreadsCountDown {
public static void main(String[] args) throws InterruptedException {

    final URI originalUri = URI.create("unix:///var/run/docker.sock");
    final URI uri = UnixConnectionSocketFactory.sanitizeUri(originalUri);

    final Registry<ConnectionSocketFactory> registry = RegistryBuilder
            .<ConnectionSocketFactory>create()
            .register("http", PlainConnectionSocketFactory.getSocketFactory())
            .register("unix", new UnixConnectionSocketFactory(originalUri))
            .build();

    final PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registry);
    cm.setMaxTotal(100);
    cm.setDefaultMaxPerRoute(100);

    final RequestConfig requestConfig = RequestConfig.custom()
                                                     .build();

    final ClientConfig config = new ClientConfig(
            ObjectMapperProvider.class,
            JacksonFeature.class,
            LogsResponseReader.class,
            ProgressResponseReader.class)
            .connectorProvider(new ApacheConnectorProvider())
            .property(ApacheClientProperties.CONNECTION_MANAGER, cm)
            .property(ApacheClientProperties.REQUEST_CONFIG, requestConfig)
            .property(ApacheClientProperties.CONNECTION_CLOSING_STRATEGY, new ApacheConnectionClosingStrategy.GracefulClosingStrategy());

    final Client client = ClientBuilder.newBuilder()
                                       .withConfig(config)
                                       .build();

    final int threadCount = 100;

    final CountDownLatch startLatch = new CountDownLatch(threadCount);
    final CountDownLatch endLatch = new CountDownLatch(threadCount);
    for (int i = 0; i < threadCount; ++i) {
        new Thread(new Worker(i, client, uri, startLatch, endLatch)).start();
    }
    endLatch.await();
    System.out.println("All done");

    client.close();
}

static class Worker implements Runnable {
    private final Client client;
    private final CountDownLatch myEndLatch;
    private final URI uri;
    private final CountDownLatch myStartLatch;
    private final int myI;
    private static final GenericType<List<Container>> CONTAINER_LIST =
            new GenericType<List<Container>>() {
            };

    Worker(final int i,
           final Client client,
           final URI uri,
           final CountDownLatch startLatch,
           final CountDownLatch endLatch) {
        myI = i;
        this.client = client;
        this.uri = uri;
        myStartLatch = startLatch;
        myEndLatch = endLatch;
    }

    public void run() {

        try {
            myStartLatch.countDown();
            myStartLatch.await();
            for (int i = 0; i < 10; i++) {
                List<Container> containers = listContainers();
                for (Container container : containers) {
                    // These two methods do the same thing - the first one uses ContainerInfo as result
                    // type, while the second one uses String as result type. When using the first one the threads
                    // will get stuck, while with the second one program works as expected
                    //System.out.println(inspectContainer(container.id()));
                    inspectContainerReturnString(container.id());
                }
                ping();
            }
        } catch (final Throwable e) {
            e.printStackTrace();
        } finally {
            System.out.println("done " + myI);
            myEndLatch.countDown();
        }
    }

    List<Container> listContainers() throws InterruptedException {
        final WebTarget resource = client.target(uri).path("v1.27").path("containers").path("json").queryParam("all", 1);

        return request(GET, CONTAINER_LIST, resource.request(APPLICATION_JSON_TYPE));
    }

    ContainerInfo inspectContainer(String containerId) throws InterruptedException {
        final WebTarget resource = client.target(uri).path("v1.27").path("containers").path(containerId).path("json");
        return request(GET, ContainerInfo.class, resource.request(APPLICATION_JSON_TYPE));
    }

    String inspectContainerReturnString(String containerId) throws InterruptedException {
        final WebTarget resource = client.target(uri).path("v1.27").path("containers").path(containerId).path("json");
        return request(GET, String.class, resource.request(APPLICATION_JSON_TYPE));
    }

    String ping() throws InterruptedException {
        final WebTarget resource = client.target(uri).path("v1.27").path("_ping");
        return request(GET, String.class, resource.request(APPLICATION_JSON_TYPE));
    }

    private <T> T request(final String method, final Class<T> clazz, final Invocation.Builder request)
            throws InterruptedException {
        try {
            return request.async().method(method, clazz).get();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private <T> T request(final String method, final GenericType<T> type, final Invocation.Builder request)
            throws InterruptedException {
        try {
            return request.async().method(method, type).get();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}

}`

If I understand correctly the changes from #4321, the detection of the needed closing strategy is even automatic
if (connectionClosingStrategy == null) { if (vi.getRelease().compareTo("4.5") > 0) { connectionClosingStrategy = ApacheConnectionClosingStrategy.GracefulClosingStrategy.INSTANCE; } else { connectionClosingStrategy = ApacheConnectionClosingStrategy.ImmediateClosingStrategy.INSTANCE; } }

Any idea ?

@jansupol
Copy link
Contributor

The sollution for this use case can be to use a user-defined closing strategy:

            .property(ApacheClientProperties.CONNECTION_CLOSING_STRATEGY,
                    (ApacheConnectionClosingStrategy) (config, request, response, stream) -> {
                try {
                    stream.close();
                } catch (Exception e) {
                    // timeout, no chunk ending
                } finally {
                    response.close();
                }
            });

Unfortunately, while this strategy can help with this use-case, it makes some other use-cases hang. This would need to be investigated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants