Skip to content

Commit

Permalink
upgrade executor to non-duplicating incremental delivery format
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jun 5, 2024
1 parent 7d445ed commit 4d3cf4d
Show file tree
Hide file tree
Showing 24 changed files with 5,382 additions and 1,633 deletions.
17 changes: 17 additions & 0 deletions packages/executor/src/execution/AccumulatorMap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* ES6 Map with additional `add` method to accumulate items.
*/
export class AccumulatorMap<K, T> extends Map<K, Array<T>> {
get [Symbol.toStringTag]() {
return 'AccumulatorMap';
}

add(key: K, item: T): void {
const group = this.get(key);
if (group === undefined) {
this.set(key, [item]);
} else {
group.push(item);
}
}
}
25 changes: 25 additions & 0 deletions packages/executor/src/execution/BoxedPromiseOrValue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { isPromise } from '@graphql-tools/utils';
import type { MaybePromise } from '@graphql-tools/utils';

/**
* A BoxedPromiseOrValue is a container for a value or promise where the value
* will be updated when the promise resolves.
*
* A BoxedPromiseOrValue may only be used with promises whose possible
* rejection has already been handled, otherwise this will lead to unhandled
* promise rejections.
*
* @internal
* */
export class BoxedPromiseOrValue<T> {
value: MaybePromise<T>;

constructor(value: MaybePromise<T>) {
this.value = value;
if (isPromise(value)) {
value.then(resolved => {
this.value = resolved;
});
}
}
}
340 changes: 340 additions & 0 deletions packages/executor/src/execution/IncrementalGraph.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
import type { GraphQLError } from 'graphql';
import { isPromise } from '@graphql-tools/utils';
import { BoxedPromiseOrValue } from './BoxedPromiseOrValue.js';
import { promiseWithResolvers } from './promiseWithResolvers.js';
import type {
DeferredFragmentRecord,
DeferredGroupedFieldSetRecord,
IncrementalDataRecord,
IncrementalDataRecordResult,
ReconcilableDeferredGroupedFieldSetResult,
StreamItemRecord,
StreamRecord,
SubsequentResultRecord,
} from './types.js';
import { isDeferredGroupedFieldSetRecord } from './types.js';

interface DeferredFragmentNode {
deferredFragmentRecord: DeferredFragmentRecord;
deferredGroupedFieldSetRecords: Set<DeferredGroupedFieldSetRecord>;
reconcilableResults: Set<ReconcilableDeferredGroupedFieldSetResult>;
children: Array<DeferredFragmentNode>;
}

function isDeferredFragmentNode(
node: DeferredFragmentNode | undefined,
): node is DeferredFragmentNode {
return node !== undefined;
}

function isStreamNode(
record: SubsequentResultNode | IncrementalDataRecord,
): record is StreamRecord {
return 'streamItemQueue' in record;
}

type SubsequentResultNode = DeferredFragmentNode | StreamRecord;

/**
* @internal
*/
export class IncrementalGraph {
private _pending: Set<SubsequentResultNode>;
private _deferredFragmentNodes: Map<DeferredFragmentRecord, DeferredFragmentNode>;

private _newPending: Set<SubsequentResultNode>;
private _newIncrementalDataRecords: Set<IncrementalDataRecord>;
private _completedQueue: Array<IncrementalDataRecordResult>;
private _nextQueue: Array<
(iterable: IteratorResult<Iterable<IncrementalDataRecordResult>>) => void
>;

constructor() {
this._pending = new Set();
this._deferredFragmentNodes = new Map();
this._newIncrementalDataRecords = new Set();
this._newPending = new Set();
this._completedQueue = [];
this._nextQueue = [];
}

addIncrementalDataRecords(incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>): void {
for (const incrementalDataRecord of incrementalDataRecords) {
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
} else {
this._addStreamRecord(incrementalDataRecord);
}
}
}

addCompletedReconcilableDeferredGroupedFieldSet(
reconcilableResult: ReconcilableDeferredGroupedFieldSetResult,
): void {
const deferredFragmentNodes: Array<DeferredFragmentNode> =
reconcilableResult.deferredGroupedFieldSetRecord.deferredFragmentRecords
.map(deferredFragmentRecord => this._deferredFragmentNodes.get(deferredFragmentRecord))
.filter<DeferredFragmentNode>(isDeferredFragmentNode);
for (const deferredFragmentNode of deferredFragmentNodes) {
deferredFragmentNode.deferredGroupedFieldSetRecords.delete(
reconcilableResult.deferredGroupedFieldSetRecord,
);
deferredFragmentNode.reconcilableResults.add(reconcilableResult);
}
}

getNewPending(): ReadonlyArray<SubsequentResultRecord> {
const newPending: Array<SubsequentResultRecord> = [];
for (const node of this._newPending) {
if (isStreamNode(node)) {
this._pending.add(node);
newPending.push(node);
this._newIncrementalDataRecords.add(node);
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
}
this._pending.add(node);
newPending.push(node.deferredFragmentRecord);
} else {
for (const child of node.children) {
this._newPending.add(child);
}
}
}
this._newPending.clear();

for (const incrementalDataRecord of this._newIncrementalDataRecords) {
if (isStreamNode(incrementalDataRecord)) {
this._onStreamItems(incrementalDataRecord, incrementalDataRecord.streamItemQueue);
} else {
const deferredGroupedFieldSetResult = incrementalDataRecord.result;
const result =
deferredGroupedFieldSetResult instanceof BoxedPromiseOrValue
? deferredGroupedFieldSetResult.value
: deferredGroupedFieldSetResult().value;

if (isPromise(result)) {
result.then(resolved => this._enqueue(resolved));
} else {
this._enqueue(result);
}
}
}
this._newIncrementalDataRecords.clear();

return newPending;
}

completedIncrementalData() {
return {
[Symbol.asyncIterator]() {
return this;
},
next: (): Promise<IteratorResult<Iterable<IncrementalDataRecordResult>>> => {
const firstResult = this._completedQueue.shift();
if (firstResult !== undefined) {
return Promise.resolve({
value: this._yieldCurrentCompletedIncrementalData(firstResult),
done: false,
});
}
const { promise, resolve } =
promiseWithResolvers<IteratorResult<Iterable<IncrementalDataRecordResult>>>();
this._nextQueue.push(resolve);
return promise;
},
return: (): Promise<IteratorResult<Iterable<IncrementalDataRecordResult>>> => {
for (const resolve of this._nextQueue) {
resolve({ value: undefined, done: true });
}
return Promise.resolve({ value: undefined, done: true });
},
};
}

hasNext(): boolean {
return this._pending.size > 0;
}

completeDeferredFragment(
deferredFragmentRecord: DeferredFragmentRecord,
): Array<ReconcilableDeferredGroupedFieldSetResult> | undefined {
const deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord);
// TODO: add test case?
/* c8 ignore next 3 */
if (deferredFragmentNode === undefined) {
return undefined;
}
if (deferredFragmentNode.deferredGroupedFieldSetRecords.size > 0) {
return;
}
const reconcilableResults = Array.from(deferredFragmentNode.reconcilableResults);
for (const reconcilableResult of reconcilableResults) {
for (const otherDeferredFragmentRecord of reconcilableResult.deferredGroupedFieldSetRecord
.deferredFragmentRecords) {
const otherDeferredFragmentNode = this._deferredFragmentNodes.get(
otherDeferredFragmentRecord,
);
if (otherDeferredFragmentNode === undefined) {
continue;
}
otherDeferredFragmentNode.reconcilableResults.delete(reconcilableResult);
}
}
this._removePending(deferredFragmentNode);
for (const child of deferredFragmentNode.children) {
this._newPending.add(child);
}
return reconcilableResults;
}

removeDeferredFragment(deferredFragmentRecord: DeferredFragmentRecord): boolean {
const deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord);
if (deferredFragmentNode === undefined) {
return false;
}
this._removePending(deferredFragmentNode);
this._deferredFragmentNodes.delete(deferredFragmentRecord);
// TODO: add test case for an erroring deferred fragment with child defers
/* c8 ignore next 3 */
for (const child of deferredFragmentNode.children) {
this.removeDeferredFragment(child.deferredFragmentRecord);
}
return true;
}

removeStream(streamRecord: StreamRecord): void {
this._removePending(streamRecord);
}

private _removePending(subsequentResultNode: SubsequentResultNode): void {
this._pending.delete(subsequentResultNode);
if (this._pending.size === 0) {
for (const resolve of this._nextQueue) {
resolve({ value: undefined, done: true });
}
}
}

private _addDeferredGroupedFieldSetRecord(
deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord,
): void {
for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) {
const deferredFragmentNode = this._addDeferredFragmentNode(deferredFragmentRecord);
if (this._pending.has(deferredFragmentNode)) {
this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord);
}
deferredFragmentNode.deferredGroupedFieldSetRecords.add(deferredGroupedFieldSetRecord);
}
}

private _addStreamRecord(streamRecord: StreamRecord): void {
this._newPending.add(streamRecord);
}

private _addDeferredFragmentNode(
deferredFragmentRecord: DeferredFragmentRecord,
): DeferredFragmentNode {
let deferredFragmentNode = this._deferredFragmentNodes.get(deferredFragmentRecord);
if (deferredFragmentNode !== undefined) {
return deferredFragmentNode;
}
deferredFragmentNode = {
deferredFragmentRecord,
deferredGroupedFieldSetRecords: new Set(),
reconcilableResults: new Set(),
children: [],
};
this._deferredFragmentNodes.set(deferredFragmentRecord, deferredFragmentNode);
const parent = deferredFragmentRecord.parent;
if (parent === undefined) {
this._newPending.add(deferredFragmentNode);
return deferredFragmentNode;
}
const parentNode = this._addDeferredFragmentNode(parent);
parentNode.children.push(deferredFragmentNode);
return deferredFragmentNode;
}

private async _onStreamItems(
streamRecord: StreamRecord,
streamItemQueue: Array<StreamItemRecord>,
): Promise<void> {
let items: Array<unknown> = [];
let errors: Array<GraphQLError> = [];
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
let streamItemRecord: StreamItemRecord | undefined;
while ((streamItemRecord = streamItemQueue.shift()) !== undefined) {
let result =
typeof streamItemRecord === 'function' ? streamItemRecord().value : streamItemRecord.value;
if (isPromise(result)) {
if (items.length > 0) {
this._enqueue({
streamRecord,
result:
// TODO add additional test case or rework for coverage
errors.length > 0 /* c8 ignore start */
? { items, errors } /* c8 ignore stop */
: { items },
incrementalDataRecords,
});
items = [];
errors = [];
incrementalDataRecords = [];
}
result = await result;
// wait an additional tick to coalesce resolving additional promises
// within the queue
await Promise.resolve();
}
if (result.item === undefined) {
if (items.length > 0) {
this._enqueue({
streamRecord,
result: errors.length > 0 ? { items, errors } : { items },
incrementalDataRecords,
});
}
this._enqueue(
result.errors === undefined
? { streamRecord }
: {
streamRecord,
errors: result.errors,
},
);
return;
}
items.push(result.item);
if (result.errors !== undefined) {
errors.push(...result.errors);
}
if (result.incrementalDataRecords !== undefined) {
incrementalDataRecords.push(...result.incrementalDataRecords);
}
}
}

private *_yieldCurrentCompletedIncrementalData(
first: IncrementalDataRecordResult,
): Generator<IncrementalDataRecordResult> {
yield first;
let completed;
while ((completed = this._completedQueue.shift()) !== undefined) {
yield completed;
}
}

private _enqueue(completed: IncrementalDataRecordResult): void {
const next = this._nextQueue.shift();
if (next !== undefined) {
next({
value: this._yieldCurrentCompletedIncrementalData(completed),
done: false,
});
return;
}
this._completedQueue.push(completed);
}
}
Loading

0 comments on commit 4d3cf4d

Please sign in to comment.