Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New client PreInvocationInterceptor and PostInvocationInterceptor SPI #4301

Merged
merged 3 commits into from
Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@
package org.glassfish.jersey.client;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.ClientRequestFilter;
import javax.ws.rs.client.ClientResponseFilter;
import javax.ws.rs.client.ResponseProcessingException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ReaderInterceptor;

import org.glassfish.jersey.client.internal.routing.AbortedRequestMediaTypeDeterminer;
import org.glassfish.jersey.client.internal.routing.ClientResponseMediaTypeDeterminer;
import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.internal.inject.Providers;
import org.glassfish.jersey.message.internal.HeaderUtils;
import org.glassfish.jersey.message.internal.InboundMessageContext;
import org.glassfish.jersey.message.internal.OutboundJaxrsResponse;
import org.glassfish.jersey.model.internal.RankedComparator;
import org.glassfish.jersey.process.internal.AbstractChainableStage;
import org.glassfish.jersey.process.internal.ChainableStage;
Expand Down Expand Up @@ -62,6 +58,29 @@ static ChainableStage<ClientRequest> createRequestFilteringStage(InjectionManage
return requestFilters.iterator().hasNext() ? new RequestFilteringStage(requestFilters) : null;
}

/**
* Create client request filtering stage using the injection manager. May return {@code null}.
*
* @param firstFilter Non null {@link ClientRequestFilter client request filter} to be executed
* in the client request filtering stage.
* @param injectionManager injection manager to be used.
* @return configured request filtering stage, or {@code null} in case there are no
* {@link ClientRequestFilter client request filters} registered in the injection manager
* and {@code firstFilter} is null.
*/
static ChainableStage<ClientRequest> createRequestFilteringStage(ClientRequestFilter firstFilter,
InjectionManager injectionManager) {
RankedComparator<ClientRequestFilter> comparator = new RankedComparator<>(RankedComparator.Order.ASCENDING);
Iterable<ClientRequestFilter> requestFilters =
Providers.getAllProviders(injectionManager, ClientRequestFilter.class, comparator);
if (firstFilter != null && !requestFilters.iterator().hasNext()) {
return new RequestFilteringStage(Collections.singletonList(firstFilter));
} else if (firstFilter != null && requestFilters.iterator().hasNext()) {
return new RequestFilteringStage(prependFilter(firstFilter, requestFilters));
}
return null;
}

/**
* Create client response filtering stage using the injection manager. May return {@code null}.
*
Expand All @@ -76,6 +95,39 @@ static ChainableStage<ClientResponse> createResponseFilteringStage(InjectionMana
return responseFilters.iterator().hasNext() ? new ResponseFilterStage(responseFilters) : null;
}

/**
* Prepend an filter to a given iterable.
* @param filter to be prepend.
* @param filters the iterable the given filter is to be prependto
* @param <T> filter type
* @return iterable with first item of prepended filter.
*/
private static <T> Iterable<T> prependFilter(T filter, Iterable<T> filters) {
return new Iterable<T>() {
boolean wasInterceptorFilterNext = false;
final Iterator<T> filterIterator = filters.iterator();
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
@Override
public boolean hasNext() {
return !wasInterceptorFilterNext || filterIterator.hasNext();
}

@Override
public T next() {
if (wasInterceptorFilterNext) {
return filterIterator.next();
} else {
wasInterceptorFilterNext = true;
return filter;
}
}
};
}
};
}

private static final class RequestFilteringStage extends AbstractChainableStage<ClientRequest> {

private final Iterable<ClientRequestFilter> requestFilters;
Expand All @@ -91,24 +143,9 @@ public Continuation<ClientRequest> apply(ClientRequest requestContext) {
filter.filter(requestContext);
final Response abortResponse = requestContext.getAbortResponse();
if (abortResponse != null) {
if (abortResponse.hasEntity() && abortResponse.getMediaType() == null) {
final InboundMessageContext headerContext
= new InboundMessageContext(requestContext.getConfiguration()) {
@Override
protected Iterable<ReaderInterceptor> getReaderInterceptors() {
return null;
}
};
headerContext.headers(
HeaderUtils.asStringHeaders(abortResponse.getHeaders(), requestContext.getConfiguration())
);

final AbortedRequestMediaTypeDeterminer determiner = new AbortedRequestMediaTypeDeterminer(
requestContext.getWorkers());
final MediaType mediaType = determiner.determineResponseMediaType(abortResponse.getEntity(),
headerContext.getQualifiedAcceptableMediaTypes());
abortResponse.getHeaders().add(HttpHeaders.CONTENT_TYPE, mediaType);
}
final ClientResponseMediaTypeDeterminer determiner = new ClientResponseMediaTypeDeterminer(
requestContext.getWorkers());
determiner.setResponseMediaTypeIfNotSet(abortResponse, requestContext.getConfiguration());
throw new AbortException(new ClientResponse(requestContext, abortResponse));
}
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class ClientRuntime implements JerseyClient.ShutdownHook, ClientExecutor {
private final ManagedObjectsFinalizer managedObjectsFinalizer;
private final InjectionManager injectionManager;

private final InvocationInterceptorStages.PreInvocationInterceptorStage preInvocationInterceptorStage;
private final InvocationInterceptorStages.PostInvocationInterceptorStage postInvocationInterceptorStage;

/**
* Create new client request processing runtime.
*
Expand All @@ -95,7 +98,14 @@ public ClientRuntime(final ClientConfig config, final Connector connector, final

Stage.Builder<ClientRequest> requestingChainBuilder = Stages.chain(requestProcessingInitializationStage);

ChainableStage<ClientRequest> requestFilteringStage = ClientFilteringStages.createRequestFilteringStage(injectionManager);
preInvocationInterceptorStage = InvocationInterceptorStages.createPreInvocationInterceptorStage(injectionManager);
postInvocationInterceptorStage = InvocationInterceptorStages.createPostInvocationInterceptorStage(injectionManager);

ChainableStage<ClientRequest> requestFilteringStage = preInvocationInterceptorStage.hasPreInvocationInterceptors()
? ClientFilteringStages.createRequestFilteringStage(
preInvocationInterceptorStage.createPreInvocationInterceptorFilter(), injectionManager)
: ClientFilteringStages.createRequestFilteringStage(injectionManager);

this.requestProcessingRoot = requestFilteringStage != null
? requestingChainBuilder.build(requestFilteringStage) : requestingChainBuilder.build();

Expand Down Expand Up @@ -136,33 +146,41 @@ public ClientRuntime(final ClientConfig config, final Connector connector, final
* @return {@code Runnable} to be submitted for async processing using {@link #submit(Runnable)}.
*/
Runnable createRunnableForAsyncProcessing(ClientRequest request, final ResponseCallback callback) {
try {
requestScope.runInScope(() -> preInvocationInterceptorStage.beforeRequest(request));
} catch (Throwable throwable) {
return () -> requestScope.runInScope(() -> processFailure(request, throwable, callback));
}

return () -> requestScope.runInScope(() -> {
RuntimeException runtimeException = null;
try {
ClientRequest processedRequest;

try {
processedRequest = Stages.process(request, requestProcessingRoot);
processedRequest = addUserAgent(processedRequest, connector.getName());
} catch (final AbortException aborted) {
processResponse(aborted.getAbortResponse(), callback);
processResponse(request, aborted.getAbortResponse(), callback);
return;
}

final AsyncConnectorCallback connectorCallback = new AsyncConnectorCallback() {

@Override
public void response(final ClientResponse response) {
requestScope.runInScope(() -> processResponse(response, callback));
requestScope.runInScope(() -> processResponse(request, response, callback));
}

@Override
public void failure(final Throwable failure) {
requestScope.runInScope(() -> processFailure(failure, callback));
requestScope.runInScope(() -> processFailure(request, failure, callback));
}
};

connector.apply(processedRequest, connectorCallback);
} catch (final Throwable throwable) {
processFailure(throwable, callback);
processFailure(request, throwable, callback);
}
});
}
Expand Down Expand Up @@ -192,17 +210,38 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
return backgroundScheduler.get().schedule(command, delay, unit);
}

private void processResponse(final ClientResponse response, final ResponseCallback callback) {
final ClientResponse processedResponse;
private void processResponse(final ClientRequest request, final ClientResponse response, final ResponseCallback callback) {
ClientResponse processedResponse = null;
Throwable caught = null;
try {
processedResponse = Stages.process(response, responseProcessingRoot);
} catch (final Throwable throwable) {
caught = throwable;
}

try {
processedResponse = postInvocationInterceptorStage.afterRequest(request, processedResponse, caught);
} catch (Throwable throwable) {
processFailure(throwable, callback);
return;
}
callback.completed(processedResponse, requestScope);
}

private void processFailure(final ClientRequest request, final Throwable failure, final ResponseCallback callback) {
if (postInvocationInterceptorStage.hasPostInvocationInterceptor()) {
try {
final ClientResponse clientResponse = postInvocationInterceptorStage.afterRequest(request, null, failure);
callback.completed(clientResponse, requestScope);
} catch (RuntimeException e) {
final Throwable t = e.getSuppressed().length == 1 && e.getSuppressed()[0] == failure ? failure : e;
processFailure(t, callback);
}
} else {
processFailure(failure, callback);
}
}

private void processFailure(final Throwable failure, final ResponseCallback callback) {
callback.failed(failure instanceof ProcessingException
? (ProcessingException) failure : new ProcessingException(failure));
Expand Down Expand Up @@ -248,19 +287,25 @@ private ClientRequest addUserAgent(final ClientRequest clientRequest, final Stri
* @throws javax.ws.rs.ProcessingException in case of an invocation failure.
*/
public ClientResponse invoke(final ClientRequest request) {
ClientResponse response;
ProcessingException processingException = null;
ClientResponse response = null;
try {
preInvocationInterceptorStage.beforeRequest(request);

try {
response = connector.apply(addUserAgent(Stages.process(request, requestProcessingRoot), connector.getName()));
} catch (final AbortException aborted) {
response = aborted.getAbortResponse();
}

return Stages.process(response, responseProcessingRoot);
response = Stages.process(response, responseProcessingRoot);
} catch (final ProcessingException pe) {
throw pe;
processingException = pe;
} catch (final Throwable t) {
throw new ProcessingException(t.getMessage(), t);
processingException = new ProcessingException(t.getMessage(), t);
} finally {
response = postInvocationInterceptorStage.afterRequest(request, response, processingException);
return response;
}
}

Expand Down
Loading