Skip to content

Commit

Permalink
Propagate Future.cancel() to connectors
Browse files Browse the repository at this point in the history
Signed-off-by: jansupol <[email protected]>
  • Loading branch information
jansupol committed Mar 11, 2024
1 parent 663242e commit 9602806
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -31,9 +31,11 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -521,7 +523,7 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing

try {
final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
responseContext.setEntityStream(getInputStream(response, closingMechanism));
responseContext.setEntityStream(getInputStream(response, closingMechanism, () -> clientRequest.isCancelled()));
} catch (final IOException e) {
LOGGER.log(Level.SEVERE, null, e);
}
Expand Down Expand Up @@ -730,13 +732,14 @@ private static Map<String, String> writeOutBoundHeaders(final ClientRequest clie
}

private static InputStream getInputStream(final CloseableHttpResponse response,
final ConnectionClosingMechanism closingMechanism) throws IOException {
final ConnectionClosingMechanism closingMechanism,
final Supplier<Boolean> isCancelled) throws IOException {
final InputStream inputStream;

if (response.getEntity() == null) {
inputStream = new ByteArrayInputStream(new byte[0]);
} else {
final InputStream i = response.getEntity().getContent();
final InputStream i = new CancellableInputStream(response.getEntity().getContent(), isCancelled);
if (i.markSupported()) {
inputStream = i;
} else {
Expand Down Expand Up @@ -885,4 +888,68 @@ protected void prepareSocket(SSLSocket socket) throws IOException {
}
}
}

private static class CancellableInputStream extends InputStream {
private final InputStream in;
private final Supplier<Boolean> isCancelled;

private CancellableInputStream(InputStream in, Supplier<Boolean> isCancelled) {
this.in = in;
this.isCancelled = isCancelled;
}

public int read(byte b[]) throws IOException {
checkAborted();
return in.read();
}

public int read(byte b[], int off, int len) throws IOException {
checkAborted();
return in.read(b, off, len);
}

@Override
public int read() throws IOException {
checkAborted();
return in.read();
}

public boolean markSupported() {
return in.markSupported();
}

@Override
public long skip(long n) throws IOException {
checkAborted();
return in.skip(n);
}

@Override
public int available() throws IOException {
checkAborted();
return in.available();
}

@Override
public void close() throws IOException {
in.close();
}

@Override
public synchronized void mark(int readlimit) {
in.mark(readlimit);
}

@Override
public synchronized void reset() throws IOException {
checkAborted();
in.reset();
}

private void checkAborted() throws IOException {
if (isCancelled.get()) {
throw new IOException(new CancellationException());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -32,8 +32,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -531,7 +533,7 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing

try {
final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
responseContext.setEntityStream(getInputStream(response, closingMechanism));
responseContext.setEntityStream(getInputStream(response, closingMechanism, () -> clientRequest.isCancelled()));
} catch (final IOException e) {
LOGGER.log(Level.SEVERE, null, e);
}
Expand Down Expand Up @@ -741,13 +743,14 @@ private static Map<String, String> writeOutBoundHeaders(final ClientRequest clie
}

private static InputStream getInputStream(final CloseableHttpResponse response,
final ConnectionClosingMechanism closingMechanism) throws IOException {
final ConnectionClosingMechanism closingMechanism,
final Supplier<Boolean> isCancelled) throws IOException {
final InputStream inputStream;

if (response.getEntity() == null) {
inputStream = new ByteArrayInputStream(new byte[0]);
} else {
final InputStream i = response.getEntity().getContent();
final InputStream i = new CancellableInputStream(response.getEntity().getContent(), isCancelled);
if (i.markSupported()) {
inputStream = i;
} else {
Expand Down Expand Up @@ -889,4 +892,69 @@ protected void prepareSocket(SSLSocket socket) throws IOException {
}
}
}

private static class CancellableInputStream extends InputStream {
private final InputStream in;
private final Supplier<Boolean> isCancelled;

private CancellableInputStream(InputStream in, Supplier<Boolean> isCancelled) {
this.in = in;
this.isCancelled = isCancelled;
}

public int read(byte b[]) throws IOException {
checkAborted();
return in.read();
}

public int read(byte b[], int off, int len) throws IOException {
checkAborted();
return in.read(b, off, len);
}

@Override
public int read() throws IOException {
checkAborted();
return in.read();
}

public boolean markSupported() {
return in.markSupported();
}

@Override
public long skip(long n) throws IOException {
checkAborted();
return in.skip(n);
}

@Override
public int available() throws IOException {
checkAborted();
return in.available();
}

@Override
public void close() throws IOException {
in.close();
}

@Override
public synchronized void mark(int readlimit) {
in.mark(readlimit);
}

@Override
public synchronized void reset() throws IOException {
checkAborted();
in.reset();
}

private void checkAborted() throws IOException {
if (isCancelled.get()) {
throw new IOException(new CancellationException());
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -24,6 +24,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
Expand Down Expand Up @@ -157,6 +158,10 @@ protected void notifyResponse() {

@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
if (jerseyRequest.isCancelled()) {
responseAvailable.completeExceptionally(new CancellationException());
return;
}
if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
jerseyResponse = new ClientResponse(new Response.StatusType() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -25,6 +25,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -90,6 +94,8 @@ public class ClientRequest extends OutboundMessageContext
private LazyValue<PropertiesResolver> propertiesResolver = Values.lazy(
(Value<PropertiesResolver>) () -> PropertiesResolver.create(getConfiguration(), getPropertiesDelegate())
);
// by default nothing to be cancelled.
private Future cancellable = NotCancellable.INSTANCE;

private static final Logger LOGGER = Logger.getLogger(ClientRequest.class.getName());

Expand Down Expand Up @@ -126,6 +132,7 @@ public ClientRequest(final ClientRequest original) {
this.writerInterceptors = original.writerInterceptors;
this.propertiesDelegate = new MapPropertiesDelegate(original.propertiesDelegate);
this.ignoreUserAgent = original.ignoreUserAgent;
this.cancellable = original.cancellable;
}

@Override
Expand Down Expand Up @@ -584,4 +591,66 @@ public boolean ignoreUserAgent() {
public void ignoreUserAgent(final boolean ignore) {
this.ignoreUserAgent = ignore;
}

/**
* Sets the new {@code Future} that may cancel this {@link ClientRequest}.
* @param cancellable
*/
void setCancellable(Future cancellable) {
this.cancellable = cancellable;
}

/**
* Cancels this {@link ClientRequest}. May result in {@link java.util.concurrent.CancellationException} later in this
* request processing if this {@link ClientRequest} is backed by a {@link Future} provided to
* {@link JerseyInvocation.Builder#setCancellable(Future)}.
* @param mayInterruptIfRunning may have no effect or {@code true} if the thread executing this task should be interrupted
* (if the thread is known to the implementation);
* otherwise, in-progress tasks are allowed to complete
*/
public void cancel(boolean mayInterruptIfRunning) {
cancellable.cancel(mayInterruptIfRunning);
}

/**
* Returns {@code true} if this {@link ClientRequest} was cancelled
* before it completed normally.
*
* @return {@code true} if this {@link ClientRequest} was cancelled
* before it completed normally
*/
public boolean isCancelled() {
return cancellable.isCancelled();
}

private static class NotCancellable implements Future {
public static final Future INSTANCE = new NotCancellable();
private boolean isCancelled = false;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
isCancelled = true;
return isCancelled;
}

@Override
public boolean isCancelled() {
return isCancelled;
}

@Override
public boolean isDone() {
return false;
}

@Override
public Object get() throws InterruptedException, ExecutionException {
return null;
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
}
}
Loading

0 comments on commit 9602806

Please sign in to comment.