Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to configure the queue capacity for ChunkedOutput #5621

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9a58a6f
Update ChunkedOutput.java
rutterpaul-personal Apr 23, 2024
5176818
Update ChunkedOutput.java
rutterpaul-personal Apr 23, 2024
9069a05
Update ChunkedOutput.java
rutterpaul-personal Apr 23, 2024
95b77b7
Update ChunkedOutput.java
rutterpaul-personal Apr 23, 2024
a91c120
Update ChunkedOutput.java
rutterpaul-personal Apr 23, 2024
df15d68
Update ChunkedOutput.java
rutterpaul-personal Apr 23, 2024
270210e
Update ChunkedOutput.java
rutterpaul-personal Apr 23, 2024
2aa9218
Update ChunkedOutput.java
rutterpaul-personal Apr 23, 2024
3428f2b
Update ChunkedOutput.java
rutterpaul-personal Apr 23, 2024
5fefe35
Update ChunkedInputOutputTest.java
rutterpaul-personal Apr 23, 2024
383a883
Update ChunkedInputOutputTest.java
rutterpaul-personal Apr 23, 2024
370bde2
Update async.xml
rutterpaul-personal Apr 23, 2024
baaae71
Update async.xml
rutterpaul-personal Apr 23, 2024
e15ab82
Update ChunkedOutput.java
rutterpaul-personal Apr 23, 2024
14cd830
Update async.xml
rutterpaul-personal Apr 23, 2024
673b088
Update ChunkedInputOutputTest.java
rutterpaul-personal Apr 23, 2024
b436843
Update ChunkedInputOutputTest.java
rutterpaul-personal Apr 24, 2024
6fb8775
Update ChunkedOutput.java
rutterpaul-personal Apr 24, 2024
da8bbbe
Update ChunkedInputOutputTest.java
rutterpaul-personal Apr 24, 2024
6e2d1f0
Update async.xml
rutterpaul-personal Apr 24, 2024
d8dba05
Update ChunkedInputOutputTest.java
rutterpaul-personal Apr 24, 2024
5827f67
Update ChunkedOutput.java
rutterpaul-personal Apr 24, 2024
318610f
Update ChunkedInputOutputTest.java
rutterpaul-personal Apr 24, 2024
86e4a50
Update ChunkedOutput.java
rutterpaul-personal Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 163 additions & 12 deletions core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -27,12 +27,11 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.inject.Provider;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.ext.WriterInterceptor;

import javax.inject.Provider;

import org.glassfish.jersey.process.internal.RequestContext;
import org.glassfish.jersey.process.internal.RequestScope;
import org.glassfish.jersey.server.internal.LocalizationMessages;
Expand All @@ -51,7 +50,7 @@
public class ChunkedOutput<T> extends GenericType<T> implements Closeable {
private static final byte[] ZERO_LENGTH_DELIMITER = new byte[0];

private final BlockingDeque<T> queue = new LinkedBlockingDeque<>();
private final BlockingDeque<T> queue;
private final byte[] chunkDelimiter;
private final AtomicBoolean resumed = new AtomicBoolean(false);
private final Object lock = new Object();
Expand All @@ -70,12 +69,59 @@ public class ChunkedOutput<T> extends GenericType<T> implements Closeable {
private volatile ContainerResponse responseContext;
private volatile ConnectionCallback connectionCallback;


/**
* Create new {@code ChunkedOutput}.
*/
protected ChunkedOutput() {
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
queue = new LinkedBlockingDeque<>();
}

/**
* Create new {@code ChunkedOutput} based on builder.
*
* @param builder the builder to use
*/
protected ChunkedOutput(Builder<T> builder) {
super();
if (builder.queueCapacity > 0) {
queue = new LinkedBlockingDeque<>(builder.queueCapacity);
} else {
queue = new LinkedBlockingDeque<>();
}
if (builder.chunkDelimiter != null) {
this.chunkDelimiter = new byte[builder.chunkDelimiter.length];
System.arraycopy(builder.chunkDelimiter, 0, this.chunkDelimiter, 0, builder.chunkDelimiter.length);
} else {
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
}
if (builder.asyncContextProvider != null) {
this.asyncContext = builder.asyncContextProvider.get();
}
}

/**
* Create new {@code ChunkedOutput} based on builder.
*
* @param builder the builder to use
*/
private ChunkedOutput(TypedBuilder<T> builder) {
super(builder.chunkType);

if (builder.queueCapacity > 0) {
queue = new LinkedBlockingDeque<>(builder.queueCapacity);
} else {
queue = new LinkedBlockingDeque<>();
}
if (builder.chunkDelimiter != null) {
this.chunkDelimiter = new byte[builder.chunkDelimiter.length];
System.arraycopy(builder.chunkDelimiter, 0, this.chunkDelimiter, 0, builder.chunkDelimiter.length);
} else {
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
}
if (builder.asyncContextProvider != null) {
this.asyncContext = builder.asyncContextProvider.get();
}
}

/**
Expand All @@ -86,6 +132,7 @@ protected ChunkedOutput() {
public ChunkedOutput(final Type chunkType) {
super(chunkType);
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
queue = new LinkedBlockingDeque<>();
}

/**
Expand All @@ -101,6 +148,7 @@ protected ChunkedOutput(final byte[] chunkDelimiter) {
} else {
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
}
queue = new LinkedBlockingDeque<>();
}

/**
Expand All @@ -118,6 +166,7 @@ protected ChunkedOutput(final byte[] chunkDelimiter, Provider<AsyncContext> asyn
}

this.asyncContext = asyncContextProvider == null ? null : asyncContextProvider.get();
queue = new LinkedBlockingDeque<>();
}

/**
Expand All @@ -135,6 +184,7 @@ public ChunkedOutput(final Type chunkType, final byte[] chunkDelimiter) {
} else {
this.chunkDelimiter = ZERO_LENGTH_DELIMITER;
}
queue = new LinkedBlockingDeque<>();
}

/**
Expand All @@ -149,6 +199,7 @@ protected ChunkedOutput(final String chunkDelimiter) {
} else {
this.chunkDelimiter = chunkDelimiter.getBytes();
}
queue = new LinkedBlockingDeque<>();
}

/**
Expand All @@ -165,6 +216,26 @@ public ChunkedOutput(final Type chunkType, final String chunkDelimiter) {
} else {
this.chunkDelimiter = chunkDelimiter.getBytes();
}
queue = new LinkedBlockingDeque<>();
}

/**
* Returns a builder to create a ChunkedOutput with custom configuration.
*
* @return builder
*/
public static <T> Builder<T> builder() {
return new Builder<>();
}

/**
* Returns a builder to create a ChunkedOutput with custom configuration.
*
* @param chunkType chunk type. Must not be {code null}.
* @return builder
*/
public static <T> TypedBuilder<T> builder(Type chunkType) {
return new TypedBuilder<>(chunkType);
}

/**
Expand All @@ -179,7 +250,12 @@ public void write(final T chunk) throws IOException {
}

if (chunk != null) {
queue.add(chunk);
try {
queue.put(chunk);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}

flushQueue();
Expand Down Expand Up @@ -265,9 +341,9 @@ public Void call() throws IOException {
}
throw mpe;
} finally {
synchronized (lock) {
touchingEntityStream = false;
}
synchronized (lock) {
touchingEntityStream = false;
}
}

t = queue.poll();
Expand Down Expand Up @@ -341,7 +417,6 @@ public void close() throws IOException {

/**
* Get state information.
*
* Please note that {@code ChunkedOutput} can be closed by the client side - client can close connection
* from its side.
*
Expand All @@ -353,10 +428,12 @@ public boolean isClosed() {

/**
* Executed only in case of close being triggered by client.
*
* @param e Exception causing the close
*/
protected void onClose(Exception e){

protected void onClose(Exception e) {
// drain queue when an exception occurs to prevent deadlocks
queue.clear();
}

@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
Expand Down Expand Up @@ -399,4 +476,78 @@ void setContext(final RequestScope requestScope,
this.connectionCallback = connectionCallbackRunner;
flushQueue();
}

/**
* Builder that allows to create a new ChunkedOutput based on the given configuration options.
*
* @param <Y>
*/
public static class Builder<Y> {
byte[] chunkDelimiter;
int queueCapacity = -1;
Provider<AsyncContext> asyncContextProvider;

private Builder() {
// hide constructor
}

/**
* Set the chunk delimiter, in bytes.
* @param chunkDelimiter the chunk delimiter in bytes
* @return builder
*/
public Builder<Y> chunkDelimiter(byte[] chunkDelimiter) {
this.chunkDelimiter = chunkDelimiter;
return this;
}

/**
* Set the queue capacity. If greater than 0, the queue is bounded and will block when full.
* @param queueCapacity the queue capacity
* @return builder
*/
public Builder<Y> queueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
return this;
}

/**
* Set the async context provider.
* @param asyncContextProvider the async context provider
* @return builder
*/
public Builder<Y> asyncContextProvider(Provider<AsyncContext> asyncContextProvider) {
this.asyncContextProvider = asyncContextProvider;
return this;
}

/**
* Build the ChunkedOutput based on the given configuration.
* @return the ChunkedOutput
*/
public ChunkedOutput<Y> build() {
return new ChunkedOutput<>(this);
}
}

/**
* Builder that allows to create a new ChunkedOutput based on the given configuration options.
*
* @param <Y>
*/
public static class TypedBuilder<Y> extends Builder<Y> {
private Type chunkType;

private TypedBuilder(Type chunkType) {
this.chunkType = chunkType;
}

/**
* Build the ChunkedOutput based on the given configuration.
* @return the ChunkedOutput
*/
public ChunkedOutput<Y> build() {
return new ChunkedOutput<>(this);
}
}
}
5 changes: 4 additions & 1 deletion docs/src/main/docbook/async.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0"?>
<!--
Copyright (c) 2012, 2021 Oracle and/or its affiliates. All rights reserved.
Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
This program and the accompanying materials are made available under the
terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -270,6 +270,9 @@ public class AsyncResource {
public ChunkedOutput<String> getChunkedResponse() {
final ChunkedOutput<String> output = new ChunkedOutput<String>(String.class);
// Or use the builder pattern instead, which also allows to configure the queue capacity
// final ChunkedOutput<String> output = ChunkedOutput.<String>builder(String.class).queueCapacity(10).build();
new Thread() {
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2022 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand Down Expand Up @@ -60,14 +60,34 @@ public class ChunkedInputOutputTest extends JerseyTest {
*/
@Path("/test")
public static class TestResource {
/**
* Get chunk stream with a queue capacity of 2.
*
* @return chunk stream.
*/
@GET
@Path("/testWithBuilder")
public ChunkedOutput<String> getWithBuilder() {
return getOutput(ChunkedOutput.<String>builder(String.class).queueCapacity(2)
.chunkDelimiter("\r\n".getBytes()).build());
}

/**
* Get chunk stream.
*
* @return chunk stream.
*/
@GET
public ChunkedOutput<String> get() {
final ChunkedOutput<String> output = new ChunkedOutput<>(String.class, "\r\n");
return getOutput(new ChunkedOutput<>(String.class, "\r\n"));
}

/**
* Get chunk stream.
*
* @return chunk stream.
*/
private ChunkedOutput<String> getOutput(ChunkedOutput<String> output) {

new Thread() {
@Override
Expand Down Expand Up @@ -182,6 +202,19 @@ public void testChunkedOutputToSingleString() throws Exception {
"Unexpected value of chunked response unmarshalled as a single string.");
}

/**
* Test retrieving chunked response stream as a single response string, when a builder with capacity is used.
*
* @throws Exception in case of a failure during the test execution.
*/
@Test
public void testChunkedOutputToSingleStringWithBuilder() throws Exception {
final String response = target().path("test/testWithBuilder").request().get(String.class);

assertEquals("test\r\ntest\r\ntest\r\n", response,
"Unexpected value of chunked response unmarshalled as a single string.");
}

/**
* Test retrieving chunked response stream sequentially as individual chunks using chunked input.
*
Expand Down