Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate Future.cancel() to connectors #5542

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -31,9 +31,11 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -521,7 +523,7 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing

try {
final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
responseContext.setEntityStream(getInputStream(response, closingMechanism));
responseContext.setEntityStream(getInputStream(response, closingMechanism, () -> clientRequest.isCancelled()));
} catch (final IOException e) {
LOGGER.log(Level.SEVERE, null, e);
}
Expand Down Expand Up @@ -730,13 +732,14 @@ private static Map<String, String> writeOutBoundHeaders(final ClientRequest clie
}

private static InputStream getInputStream(final CloseableHttpResponse response,
final ConnectionClosingMechanism closingMechanism) throws IOException {
final ConnectionClosingMechanism closingMechanism,
final Supplier<Boolean> isCancelled) throws IOException {
final InputStream inputStream;

if (response.getEntity() == null) {
inputStream = new ByteArrayInputStream(new byte[0]);
} else {
final InputStream i = response.getEntity().getContent();
final InputStream i = new CancellableInputStream(response.getEntity().getContent(), isCancelled);
if (i.markSupported()) {
inputStream = i;
} else {
Expand Down Expand Up @@ -885,4 +888,68 @@ protected void prepareSocket(SSLSocket socket) throws IOException {
}
}
}

private static class CancellableInputStream extends InputStream {
private final InputStream in;
private final Supplier<Boolean> isCancelled;

private CancellableInputStream(InputStream in, Supplier<Boolean> isCancelled) {
this.in = in;
this.isCancelled = isCancelled;
}

public int read(byte b[]) throws IOException {
checkAborted();
return in.read();
}

public int read(byte b[], int off, int len) throws IOException {
checkAborted();
return in.read(b, off, len);
}

@Override
public int read() throws IOException {
checkAborted();
return in.read();
}

public boolean markSupported() {
return in.markSupported();
}

@Override
public long skip(long n) throws IOException {
checkAborted();
return in.skip(n);
}

@Override
public int available() throws IOException {
checkAborted();
return in.available();
}

@Override
public void close() throws IOException {
in.close();
}

@Override
public synchronized void mark(int readlimit) {
in.mark(readlimit);
}

@Override
public synchronized void reset() throws IOException {
checkAborted();
in.reset();
}

private void checkAborted() throws IOException {
if (isCancelled.get()) {
throw new IOException(new CancellationException());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -32,8 +32,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -531,7 +533,7 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing

try {
final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
responseContext.setEntityStream(getInputStream(response, closingMechanism));
responseContext.setEntityStream(getInputStream(response, closingMechanism, () -> clientRequest.isCancelled()));
} catch (final IOException e) {
LOGGER.log(Level.SEVERE, null, e);
}
Expand Down Expand Up @@ -741,13 +743,14 @@ private static Map<String, String> writeOutBoundHeaders(final ClientRequest clie
}

private static InputStream getInputStream(final CloseableHttpResponse response,
final ConnectionClosingMechanism closingMechanism) throws IOException {
final ConnectionClosingMechanism closingMechanism,
final Supplier<Boolean> isCancelled) throws IOException {
final InputStream inputStream;

if (response.getEntity() == null) {
inputStream = new ByteArrayInputStream(new byte[0]);
} else {
final InputStream i = response.getEntity().getContent();
final InputStream i = new CancellableInputStream(response.getEntity().getContent(), isCancelled);
if (i.markSupported()) {
inputStream = i;
} else {
Expand Down Expand Up @@ -889,4 +892,69 @@ protected void prepareSocket(SSLSocket socket) throws IOException {
}
}
}

private static class CancellableInputStream extends InputStream {
private final InputStream in;
private final Supplier<Boolean> isCancelled;

private CancellableInputStream(InputStream in, Supplier<Boolean> isCancelled) {
this.in = in;
this.isCancelled = isCancelled;
}

public int read(byte b[]) throws IOException {
checkAborted();
return in.read();
}

public int read(byte b[], int off, int len) throws IOException {
checkAborted();
return in.read(b, off, len);
}

@Override
public int read() throws IOException {
checkAborted();
return in.read();
}

public boolean markSupported() {
return in.markSupported();
}

@Override
public long skip(long n) throws IOException {
checkAborted();
return in.skip(n);
}

@Override
public int available() throws IOException {
checkAborted();
return in.available();
}

@Override
public void close() throws IOException {
in.close();
}

@Override
public synchronized void mark(int readlimit) {
in.mark(readlimit);
}

@Override
public synchronized void reset() throws IOException {
checkAborted();
in.reset();
}

private void checkAborted() throws IOException {
if (isCancelled.get()) {
throw new IOException(new CancellationException());
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -24,6 +24,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
Expand Down Expand Up @@ -157,6 +158,10 @@ protected void notifyResponse() {

@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
if (jerseyRequest.isCancelled()) {
jansupol marked this conversation as resolved.
Show resolved Hide resolved
responseAvailable.completeExceptionally(new CancellationException());
return;
}
if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
jerseyResponse = new ClientResponse(new Response.StatusType() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -25,6 +25,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -90,6 +94,8 @@ public class ClientRequest extends OutboundMessageContext
private LazyValue<PropertiesResolver> propertiesResolver = Values.lazy(
(Value<PropertiesResolver>) () -> PropertiesResolver.create(getConfiguration(), getPropertiesDelegate())
);
// by default nothing to be cancelled.
private Future cancellable = NotCancellable.INSTANCE;

private static final Logger LOGGER = Logger.getLogger(ClientRequest.class.getName());

Expand Down Expand Up @@ -126,6 +132,7 @@ public ClientRequest(final ClientRequest original) {
this.writerInterceptors = original.writerInterceptors;
this.propertiesDelegate = new MapPropertiesDelegate(original.propertiesDelegate);
this.ignoreUserAgent = original.ignoreUserAgent;
this.cancellable = original.cancellable;
}

@Override
Expand Down Expand Up @@ -584,4 +591,66 @@ public boolean ignoreUserAgent() {
public void ignoreUserAgent(final boolean ignore) {
this.ignoreUserAgent = ignore;
}

/**
* Sets the new {@code Future} that may cancel this {@link ClientRequest}.
* @param cancellable
*/
void setCancellable(Future cancellable) {
this.cancellable = cancellable;
}

/**
* Cancels this {@link ClientRequest}. May result in {@link java.util.concurrent.CancellationException} later in this
* request processing if this {@link ClientRequest} is backed by a {@link Future} provided to
* {@link JerseyInvocation.Builder#setCancellable(Future)}.
* @param mayInterruptIfRunning may have no effect or {@code true} if the thread executing this task should be interrupted
* (if the thread is known to the implementation);
* otherwise, in-progress tasks are allowed to complete
*/
public void cancel(boolean mayInterruptIfRunning) {
cancellable.cancel(mayInterruptIfRunning);
}

/**
* Returns {@code true} if this {@link ClientRequest} was cancelled
* before it completed normally.
*
* @return {@code true} if this {@link ClientRequest} was cancelled
* before it completed normally
*/
public boolean isCancelled() {
return cancellable.isCancelled();
}

private static class NotCancellable implements Future {
public static final Future INSTANCE = new NotCancellable();
private boolean isCancelled = false;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
isCancelled = true;
return isCancelled;
}

@Override
public boolean isCancelled() {
return isCancelled;
}

@Override
public boolean isDone() {
return false;
}

@Override
public Object get() throws InterruptedException, ExecutionException {
return null;
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
}
}
Loading
Loading