Skip to content

Commit

Permalink
New client PreInvocationInterceptor and PostInvocationInterceptor SPI…
Browse files Browse the repository at this point in the history
… executed around the request

Signed-off-by: Jan Supol <[email protected]>
  • Loading branch information
jansupol committed Oct 24, 2019
1 parent 65b1770 commit 0f9149d
Show file tree
Hide file tree
Showing 9 changed files with 1,492 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,18 @@
package org.glassfish.jersey.client;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.ClientRequestFilter;
import javax.ws.rs.client.ClientResponseFilter;
import javax.ws.rs.client.ResponseProcessingException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ReaderInterceptor;

import org.glassfish.jersey.client.internal.routing.AbortedRequestMediaTypeDeterminer;
import org.glassfish.jersey.client.internal.routing.ClientResponseMediaTypeDeterminer;
import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.internal.inject.Providers;
import org.glassfish.jersey.message.internal.HeaderUtils;
import org.glassfish.jersey.message.internal.InboundMessageContext;
import org.glassfish.jersey.message.internal.OutboundJaxrsResponse;
import org.glassfish.jersey.model.internal.RankedComparator;
import org.glassfish.jersey.process.internal.AbstractChainableStage;
import org.glassfish.jersey.process.internal.ChainableStage;
Expand Down Expand Up @@ -62,6 +58,29 @@ static ChainableStage<ClientRequest> createRequestFilteringStage(InjectionManage
return requestFilters.iterator().hasNext() ? new RequestFilteringStage(requestFilters) : null;
}

/**
* Create client request filtering stage using the injection manager. May return {@code null}.
*
* @param firstFilter Non null {@link ClientRequestFilter client request filter} to be executed
* in the client request filtering stage.
* @param injectionManager injection manager to be used.
* @return configured request filtering stage, or {@code null} in case there are no
* {@link ClientRequestFilter client request filters} registered in the injection manager
* and {@code firstFilter} is null.
*/
static ChainableStage<ClientRequest> createRequestFilteringStage(ClientRequestFilter firstFilter,
InjectionManager injectionManager) {
RankedComparator<ClientRequestFilter> comparator = new RankedComparator<>(RankedComparator.Order.ASCENDING);
Iterable<ClientRequestFilter> requestFilters =
Providers.getAllProviders(injectionManager, ClientRequestFilter.class, comparator);
if (firstFilter != null && !requestFilters.iterator().hasNext()) {
return new RequestFilteringStage(Collections.singletonList(firstFilter));
} else if (firstFilter != null && requestFilters.iterator().hasNext()) {
return new RequestFilteringStage(prependFilter(firstFilter, requestFilters));
}
return null;
}

/**
* Create client response filtering stage using the injection manager. May return {@code null}.
*
Expand All @@ -76,6 +95,39 @@ static ChainableStage<ClientResponse> createResponseFilteringStage(InjectionMana
return responseFilters.iterator().hasNext() ? new ResponseFilterStage(responseFilters) : null;
}

/**
* Prepend an filter to a given iterable.
* @param filter to be prepend.
* @param filters the iterable the given filter is to be prependto
* @param <T> filter type
* @return iterable with first item of prepended filter.
*/
private static <T> Iterable<T> prependFilter(T filter, Iterable<T> filters) {
return new Iterable<T>() {
boolean wasInterceptorFilterNext = false;
final Iterator<T> filterIterator = filters.iterator();
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
@Override
public boolean hasNext() {
return !wasInterceptorFilterNext || filterIterator.hasNext();
}

@Override
public T next() {
if (wasInterceptorFilterNext) {
return filterIterator.next();
} else {
wasInterceptorFilterNext = true;
return filter;
}
}
};
}
};
}

private static final class RequestFilteringStage extends AbstractChainableStage<ClientRequest> {

private final Iterable<ClientRequestFilter> requestFilters;
Expand All @@ -91,21 +143,9 @@ public Continuation<ClientRequest> apply(ClientRequest requestContext) {
filter.filter(requestContext);
final Response abortResponse = requestContext.getAbortResponse();
if (abortResponse != null) {
if (abortResponse.hasEntity() && abortResponse.getMediaType() == null) {
final InboundMessageContext headerContext = new InboundMessageContext() {
@Override
protected Iterable<ReaderInterceptor> getReaderInterceptors() {
return null;
}
};
headerContext.headers(HeaderUtils.asStringHeaders(abortResponse.getHeaders()));

final AbortedRequestMediaTypeDeterminer determiner = new AbortedRequestMediaTypeDeterminer(
requestContext.getWorkers());
final MediaType mediaType = determiner.determineResponseMediaType(abortResponse.getEntity(),
headerContext.getQualifiedAcceptableMediaTypes());
abortResponse.getHeaders().add(HttpHeaders.CONTENT_TYPE, mediaType);
}
final ClientResponseMediaTypeDeterminer determiner = new ClientResponseMediaTypeDeterminer(
requestContext.getWorkers());
determiner.setResponseMediaTypeIfNotSet(abortResponse);
throw new AbortException(new ClientResponse(requestContext, abortResponse));
}
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class ClientRuntime implements JerseyClient.ShutdownHook, ClientExecutor {
private final ManagedObjectsFinalizer managedObjectsFinalizer;
private final InjectionManager injectionManager;

private final InvocationInterceptorStages.PreInvocationInterceptorStage preInvocationInterceptorStage;
private final InvocationInterceptorStages.PostInvocationInterceptorStage postInvocationInterceptorStage;

/**
* Create new client request processing runtime.
*
Expand All @@ -95,7 +98,14 @@ public ClientRuntime(final ClientConfig config, final Connector connector, final

Stage.Builder<ClientRequest> requestingChainBuilder = Stages.chain(requestProcessingInitializationStage);

ChainableStage<ClientRequest> requestFilteringStage = ClientFilteringStages.createRequestFilteringStage(injectionManager);
preInvocationInterceptorStage = InvocationInterceptorStages.createPreInvocationInterceptorStage(injectionManager);
postInvocationInterceptorStage = InvocationInterceptorStages.createPostInvocationInterceptorStage(injectionManager);

ChainableStage<ClientRequest> requestFilteringStage = preInvocationInterceptorStage.hasPreInvocationInterceptors()
? ClientFilteringStages.createRequestFilteringStage(
preInvocationInterceptorStage.createPreInvocationInterceptorFilter(), injectionManager)
: ClientFilteringStages.createRequestFilteringStage(injectionManager);

this.requestProcessingRoot = requestFilteringStage != null
? requestingChainBuilder.build(requestFilteringStage) : requestingChainBuilder.build();

Expand Down Expand Up @@ -136,33 +146,41 @@ public ClientRuntime(final ClientConfig config, final Connector connector, final
* @return {@code Runnable} to be submitted for async processing using {@link #submit(Runnable)}.
*/
Runnable createRunnableForAsyncProcessing(ClientRequest request, final ResponseCallback callback) {
try {
requestScope.runInScope(() -> preInvocationInterceptorStage.beforeRequest(request));
} catch (Throwable throwable) {
return () -> requestScope.runInScope(() -> processFailure(request, throwable, callback));
}

return () -> requestScope.runInScope(() -> {
RuntimeException runtimeException = null;
try {
ClientRequest processedRequest;

try {
processedRequest = Stages.process(request, requestProcessingRoot);
processedRequest = addUserAgent(processedRequest, connector.getName());
} catch (final AbortException aborted) {
processResponse(aborted.getAbortResponse(), callback);
processResponse(request, aborted.getAbortResponse(), callback);
return;
}

final AsyncConnectorCallback connectorCallback = new AsyncConnectorCallback() {

@Override
public void response(final ClientResponse response) {
requestScope.runInScope(() -> processResponse(response, callback));
requestScope.runInScope(() -> processResponse(request, response, callback));
}

@Override
public void failure(final Throwable failure) {
requestScope.runInScope(() -> processFailure(failure, callback));
requestScope.runInScope(() -> processFailure(request, failure, callback));
}
};

connector.apply(processedRequest, connectorCallback);
} catch (final Throwable throwable) {
processFailure(throwable, callback);
processFailure(request, throwable, callback);
}
});
}
Expand Down Expand Up @@ -192,17 +210,38 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
return backgroundScheduler.get().schedule(command, delay, unit);
}

private void processResponse(final ClientResponse response, final ResponseCallback callback) {
final ClientResponse processedResponse;
private void processResponse(final ClientRequest request, final ClientResponse response, final ResponseCallback callback) {
ClientResponse processedResponse = null;
Throwable caught = null;
try {
processedResponse = Stages.process(response, responseProcessingRoot);
} catch (final Throwable throwable) {
caught = throwable;
}

try {
processedResponse = postInvocationInterceptorStage.afterRequest(request, processedResponse, caught);
} catch (Throwable throwable) {
processFailure(throwable, callback);
return;
}
callback.completed(processedResponse, requestScope);
}

private void processFailure(final ClientRequest request, final Throwable failure, final ResponseCallback callback) {
if (postInvocationInterceptorStage.hasPostInvocationInterceptor()) {
try {
final ClientResponse clientResponse = postInvocationInterceptorStage.afterRequest(request, null, failure);
callback.completed(clientResponse, requestScope);
} catch (RuntimeException e) {
final Throwable t = e.getSuppressed().length == 1 && e.getSuppressed()[0] == failure ? failure : e;
processFailure(t, callback);
}
} else {
processFailure(failure, callback);
}
}

private void processFailure(final Throwable failure, final ResponseCallback callback) {
callback.failed(failure instanceof ProcessingException
? (ProcessingException) failure : new ProcessingException(failure));
Expand Down Expand Up @@ -248,19 +287,25 @@ private ClientRequest addUserAgent(final ClientRequest clientRequest, final Stri
* @throws javax.ws.rs.ProcessingException in case of an invocation failure.
*/
public ClientResponse invoke(final ClientRequest request) {
ClientResponse response;
ProcessingException processingException = null;
ClientResponse response = null;
try {
preInvocationInterceptorStage.beforeRequest(request);

try {
response = connector.apply(addUserAgent(Stages.process(request, requestProcessingRoot), connector.getName()));
} catch (final AbortException aborted) {
response = aborted.getAbortResponse();
}

return Stages.process(response, responseProcessingRoot);
response = Stages.process(response, responseProcessingRoot);
} catch (final ProcessingException pe) {
throw pe;
processingException = pe;
} catch (final Throwable t) {
throw new ProcessingException(t.getMessage(), t);
processingException = new ProcessingException(t.getMessage(), t);
} finally {
response = postInvocationInterceptorStage.afterRequest(request, response, processingException);
return response;
}
}

Expand Down
Loading

0 comments on commit 0f9149d

Please sign in to comment.