Skip to content

Commit

Permalink
add pending changes from graphql-js
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jul 9, 2024
1 parent 9e21e91 commit f7c7411
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 110 deletions.
79 changes: 29 additions & 50 deletions packages/executor/src/execution/IncrementalGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ export class IncrementalGraph {
private _rootNodes: Set<SubsequentResultRecord>;

private _completedQueue: Array<IncrementalDataRecordResult>;
private _nextQueue: Array<
(iterable: IteratorResult<Iterable<IncrementalDataRecordResult>>) => void
>;
private _nextQueue: Array<(iterable: Iterable<IncrementalDataRecordResult> | undefined) => void>;

constructor() {
this._rootNodes = new Set();
Expand Down Expand Up @@ -60,46 +58,30 @@ export class IncrementalGraph {
}
}

currentCompletedIncrementalData() {
return {
[Symbol.iterator]() {
return this;
},
next: (): IteratorResult<IncrementalDataRecordResult> => {
const value = this._completedQueue.shift();
if (value !== undefined) {
return { value, done: false };
}
return { value: undefined, done: true };
},
};
*currentCompletedBatch(): Generator<IncrementalDataRecordResult> {
let completed;
while ((completed = this._completedQueue.shift()) !== undefined) {
yield completed;
}
if (this._rootNodes.size === 0) {
for (const resolve of this._nextQueue) {
resolve(undefined);
}
}
}

completedIncrementalData() {
return {
[Symbol.asyncIterator]() {
return this;
},
next: (): Promise<IteratorResult<Iterable<IncrementalDataRecordResult>>> => {
const firstResult = this._completedQueue.shift();
if (firstResult !== undefined) {
return Promise.resolve({
value: this._yieldCurrentCompletedIncrementalData(firstResult),
done: false,
});
}
const { promise, resolve } =
createDeferred<IteratorResult<Iterable<IncrementalDataRecordResult>>>();
this._nextQueue.push(resolve);
return promise;
},
return: (): Promise<IteratorResult<Iterable<IncrementalDataRecordResult>>> => {
for (const resolve of this._nextQueue) {
resolve({ value: undefined, done: true });
}
return Promise.resolve({ value: undefined, done: true });
},
};
nextCompletedBatch(): Promise<Iterable<IncrementalDataRecordResult> | undefined> {
const { promise, resolve } = createDeferred<
Iterable<IncrementalDataRecordResult> | undefined
>();
this._nextQueue.push(resolve);
return promise;
}

abort(): void {
for (const resolve of this._nextQueue) {
resolve(undefined);
}
}

hasNext(): boolean {
Expand Down Expand Up @@ -144,11 +126,6 @@ export class IncrementalGraph {

private _removePending(subsequentResultRecord: SubsequentResultRecord): void {
this._rootNodes.delete(subsequentResultRecord);
if (this._rootNodes.size === 0) {
for (const resolve of this._nextQueue) {
resolve({ value: undefined, done: true });
}
}
}

private _addIncrementalDataRecords(
Expand Down Expand Up @@ -312,15 +289,17 @@ export class IncrementalGraph {
while ((completed = this._completedQueue.shift()) !== undefined) {
yield completed;
}
if (this._rootNodes.size === 0) {
for (const resolve of this._nextQueue) {
resolve(undefined);
}
}
}

private _enqueue(completed: IncrementalDataRecordResult): void {
const next = this._nextQueue.shift();
if (next !== undefined) {
next({
value: this._yieldCurrentCompletedIncrementalData(completed),
done: false,
});
next(this._yieldCurrentCompletedIncrementalData(completed));
return;
}
this._completedQueue.push(completed);
Expand Down
67 changes: 41 additions & 26 deletions packages/executor/src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ interface SubsequentIncrementalExecutionResultContext<TData = any> {
completed: Array<CompletedResult>;
}

/**
* The IncrementalPublisherState Enum tracks the state of the IncrementalPublisher, which is initialized to
* "Started". When there are no more incremental results to publish, the state is set to "Completed". On the
* next call to next, clean-up is potentially performed and the state is set to "Finished".
*
* If the IncrementalPublisher is ended early, it may be advanced directly from "Started" to "Finished".
*/
enum IncrementalPublisherState {
Started = 1,
Completed = 2,
Finished = 3,
}

/**
* This class is used to publish incremental results to the client, enabling semi-concurrent
* execution while preserving result order.
Expand Down Expand Up @@ -113,18 +126,32 @@ class IncrementalPublisher {
void,
void
> {
let isDone = false;
let incrementalPublisherState: IncrementalPublisherState = IncrementalPublisherState.Started;

const _finish = async (): Promise<void> => {
incrementalPublisherState = IncrementalPublisherState.Finished;
this._incrementalGraph.abort();
await this._returnAsyncIterators();
};

this._context.signal?.addEventListener('abort', () => {
this._incrementalGraph.completedIncrementalData().return();
this._incrementalGraph.abort();
});

const _next = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult<TData>, void>
> => {
if (isDone) {
await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
switch (incrementalPublisherState) {
case IncrementalPublisherState.Finished: {
return { value: undefined, done: true };
}
case IncrementalPublisherState.Completed: {
await _finish();
return { value: undefined, done: true };
}
case IncrementalPublisherState.Started: {
// continue
}
}

const context: SubsequentIncrementalExecutionResultContext<TData> = {
Expand All @@ -133,12 +160,10 @@ class IncrementalPublisher {
completed: [],
};

let currentCompletedIncrementalData =
this._incrementalGraph.currentCompletedIncrementalData();
const completedIncrementalData = this._incrementalGraph.completedIncrementalData();
const asyncIterator = completedIncrementalData[Symbol.asyncIterator]();
let batch: Iterable<IncrementalDataRecordResult> | undefined =
this._incrementalGraph.currentCompletedBatch();
do {
for (const completedResult of currentCompletedIncrementalData) {
for (const completedResult of batch) {
this._handleCompletedIncrementalData(completedResult, context);
}

Expand All @@ -147,7 +172,7 @@ class IncrementalPublisher {
const hasNext = this._incrementalGraph.hasNext();

if (!hasNext) {
isDone = true;
incrementalPublisherState = IncrementalPublisherState.Completed;
}

const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult<TData> =
Expand All @@ -169,31 +194,27 @@ class IncrementalPublisher {
return { value: subsequentIncrementalExecutionResult, done: false };
}

const iteration = await asyncIterator.next();
currentCompletedIncrementalData = iteration.value;
} while (currentCompletedIncrementalData !== undefined);
batch = await this._incrementalGraph.nextCompletedBatch();
} while (batch !== undefined);

if (this._context.signal?.aborted) {
throw this._context.signal.reason;
}

await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
};

const _return = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult<TData>, void>
> => {
isDone = true;
await this._returnAsyncIterators();
await _finish();
return { value: undefined, done: true };
};

const _throw = async (
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult<TData>, void>> => {
isDone = true;
await this._returnAsyncIterators();
await _finish();
return Promise.reject(error);
};

Expand Down Expand Up @@ -400,7 +421,7 @@ class IncrementalPublisher {
}

private async _returnAsyncIterators(): Promise<void> {
await this._incrementalGraph.completedIncrementalData().return();
await this._incrementalGraph.abort();

const cancellableStreams = this._context.cancellableStreams;
if (cancellableStreams === undefined) {
Expand All @@ -414,10 +435,4 @@ class IncrementalPublisher {
}
await Promise.all(promises);
}

private async _returnAsyncIteratorsIgnoringErrors(): Promise<void> {
await this._returnAsyncIterators().catch(() => {
// Ignore errors
});
}
}
53 changes: 19 additions & 34 deletions packages/executor/src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
} from 'graphql';
import { createDeferred, MaybePromise } from '@graphql-tools/utils';
import { expectJSON } from '../../__testUtils__/expectJSON.js';
import { expectPromise } from '../../__testUtils__/expectPromise.js';
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';
import { execute } from '../execute.js';
import type {
Expand Down Expand Up @@ -1831,7 +1832,7 @@ describe('Execute: stream directive', () => {
]);
});

it('Returns iterator and ignores errors when stream payloads are filtered', async () => {
it('Returns iterator and passes through errors when stream payloads are filtered', async () => {
let returned = false;
let requested = false;
const iterable = {
Expand All @@ -1854,7 +1855,7 @@ describe('Execute: stream directive', () => {
},
return: () => {
returned = true;
// Ignores errors from return.
// This error should be passed through.
return Promise.reject(new Error('Oops'));
},
}),
Expand Down Expand Up @@ -1927,8 +1928,8 @@ describe('Execute: stream directive', () => {
},
});

const result3 = await iterator.next();
expectJSON(result3).toDeepEqual({ done: true, value: undefined });
const result3Promise = iterator.next();
await expectPromise(result3Promise).toRejectWith('Oops');

expect(returned).toBeTruthy();
});
Expand Down Expand Up @@ -2371,29 +2372,24 @@ describe('Execute: stream directive', () => {
});
it('Returns underlying async iterables when returned generator is returned', async () => {
let returned = false;
let index = 0;
const iterable = {
[Symbol.asyncIterator]: () => ({
next: () => {
const friend = friends[index++];
if (friend == null) {
return Promise.resolve({ done: true, value: undefined });
}
return Promise.resolve({ done: false, value: friend });
},
next: () =>
new Promise(() => {
/* never resolves */
}),
return: () => {
returned = true;
// This error should be passed through.
return Promise.reject(new Error('Oops'));
},
}),
};

const document = parse(/* GraphQL */ `
query {
friendList @stream(initialCount: 1) {
friendList @stream {
id
... @defer {
name
}
}
}
`);
Expand All @@ -2412,26 +2408,21 @@ describe('Execute: stream directive', () => {
const result1 = executeResult.initialResult;
expectJSON(result1).toDeepEqual({
data: {
friendList: [
{
id: '1',
},
],
friendList: [],
},
pending: [
{ id: '0', path: ['friendList', 0] },
{ id: '1', path: ['friendList'] },
],
pending: [{ id: '0', path: ['friendList'] }],
hasNext: true,
});

const result2Promise = iterator.next();
const returnPromise = iterator.return();

const result2 = await iterator.next();
const result2 = await result2Promise;
expectJSON(result2).toDeepEqual({
done: true,
value: undefined,
});
await returnPromise;
await expectPromise(returnPromise).toRejectWith('Oops');
expect(returned).toBeTruthy();
});
it('Can return async iterable when underlying iterable does not have a return method', async () => {
Expand Down Expand Up @@ -2553,13 +2544,7 @@ describe('Execute: stream directive', () => {
done: true,
value: undefined,
});
try {
await throwPromise; /* c8 ignore start */
// Not reachable, always throws
/* c8 ignore stop */
} catch (e) {
// ignore error
}
await expectPromise(throwPromise).toRejectWith('bad');
expect(returned).toBeTruthy();
});
});

0 comments on commit f7c7411

Please sign in to comment.