From 05e7a2983d649dc84f6bc0e671b2349ccc56d6b4 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Thu, 18 Apr 2024 16:45:58 +0300 Subject: [PATCH] incremental: introduce GraphQLWrappedResult to avoid filtering (#4026) following https://github.com/graphql/graphql-spec/pull/1077 now part of the following PR stack, with the laters PRs extracted from this one #4026: incremental: introduce GraphQLWrappedResult to avoid filtering #4050: perf: allow skipping of field plan generation #4051: perf: introduce completePromisedListItemValue #4052: refactor: introduce completeIterableValue #4053: perf: optimize completion loops #4046: perf: use undefined for empty --- src/execution/IncrementalPublisher.ts | 952 ++++++++-------- src/execution/__tests__/defer-test.ts | 50 +- src/execution/__tests__/executor-test.ts | 51 + src/execution/__tests__/stream-test.ts | 68 +- src/execution/buildFieldPlan.ts | 50 +- src/execution/execute.ts | 1250 ++++++++++++---------- src/jsutils/promiseForObject.ts | 7 +- 7 files changed, 1207 insertions(+), 1221 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 1ca31acb88..b5f66b6322 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -1,6 +1,8 @@ +import { isPromise } from '../jsutils/isPromise.js'; import type { ObjMap } from '../jsutils/ObjMap.js'; import type { Path } from '../jsutils/Path.js'; import { pathToArray } from '../jsutils/Path.js'; +import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { @@ -8,14 +10,6 @@ import type { GraphQLFormattedError, } from '../error/GraphQLError.js'; -import type { GroupedFieldSet } from './buildFieldPlan.js'; - -interface IncrementalUpdate> { - pending: ReadonlyArray; - incremental: ReadonlyArray>; - completed: ReadonlyArray; -} - /** * The result of GraphQL execution. * @@ -78,7 +72,10 @@ export interface FormattedInitialIncrementalExecutionResult< export interface SubsequentIncrementalExecutionResult< TData = unknown, TExtensions = ObjMap, -> extends Partial> { +> { + pending?: ReadonlyArray; + incremental?: ReadonlyArray>; + completed?: ReadonlyArray; hasNext: boolean; extensions?: TExtensions; } @@ -94,12 +91,15 @@ export interface FormattedSubsequentIncrementalExecutionResult< extensions?: TExtensions; } +interface BareDeferredGroupedFieldSetResult> { + errors?: ReadonlyArray; + data: TData; +} + export interface IncrementalDeferResult< TData = ObjMap, TExtensions = ObjMap, -> { - errors?: ReadonlyArray; - data: TData; +> extends BareDeferredGroupedFieldSetResult { id: string; subPath?: ReadonlyArray; extensions?: TExtensions; @@ -116,12 +116,15 @@ export interface FormattedIncrementalDeferResult< extensions?: TExtensions; } -export interface IncrementalStreamResult< - TData = Array, - TExtensions = ObjMap, -> { +interface BareStreamItemsResult> { errors?: ReadonlyArray; items: TData; +} + +export interface IncrementalStreamResult< + TData = ReadonlyArray, + TExtensions = ObjMap, +> extends BareStreamItemsResult { id: string; subPath?: ReadonlyArray; extensions?: TExtensions; @@ -166,214 +169,209 @@ export interface FormattedCompletedResult { errors?: ReadonlyArray; } +export function buildIncrementalResponse( + context: IncrementalPublisherContext, + result: ObjMap, + errors: ReadonlyArray, + incrementalDataRecords: ReadonlyArray, +): ExperimentalIncrementalExecutionResults { + const incrementalPublisher = new IncrementalPublisher(context); + return incrementalPublisher.buildResponse( + result, + errors, + incrementalDataRecords, + ); +} + +interface IncrementalPublisherContext { + cancellableStreams: Set; +} + /** * This class is used to publish incremental results to the client, enabling semi-concurrent * execution while preserving result order. * - * The internal publishing state is managed as follows: - * - * '_released': the set of Subsequent Result records that are ready to be sent to the client, - * i.e. their parents have completed and they have also completed. - * - * `_pending`: the set of Subsequent Result records that are definitely pending, i.e. their - * parents have completed so that they can no longer be filtered. This includes all Subsequent - * Result records in `released`, as well as the records that have not yet completed. - * * @internal */ -export class IncrementalPublisher { - private _nextId = 0; - private _released: Set; +class IncrementalPublisher { + private _context: IncrementalPublisherContext; + private _nextId: number; private _pending: Set; - + private _completedResultQueue: Array; + private _newPending: Set; + private _incremental: Array; + private _completed: Array; // these are assigned within the Promise executor called synchronously within the constructor private _signalled!: Promise; private _resolve!: () => void; - constructor() { - this._released = new Set(); + constructor(context: IncrementalPublisherContext) { + this._context = context; + this._nextId = 0; this._pending = new Set(); + this._completedResultQueue = []; + this._newPending = new Set(); + this._incremental = []; + this._completed = []; this._reset(); } - reportNewDeferFragmentRecord( - deferredFragmentRecord: DeferredFragmentRecord, - parentIncrementalResultRecord: - | InitialResultRecord - | DeferredFragmentRecord - | StreamItemsRecord, - ): void { - parentIncrementalResultRecord.children.add(deferredFragmentRecord); - } + buildResponse( + data: ObjMap, + errors: ReadonlyArray, + incrementalDataRecords: ReadonlyArray, + ): ExperimentalIncrementalExecutionResults { + this._addIncrementalDataRecords(incrementalDataRecords); + this._pruneEmpty(); - reportNewDeferredGroupedFieldSetRecord( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, - ): void { - for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { - deferredFragmentRecord._pending.add(deferredGroupedFieldSetRecord); - deferredFragmentRecord.deferredGroupedFieldSetRecords.add( - deferredGroupedFieldSetRecord, - ); - } + const pending = this._pendingSourcesToResults(); + + const initialResult: InitialIncrementalExecutionResult = + errors.length === 0 + ? { data, pending, hasNext: true } + : { errors, data, pending, hasNext: true }; + + return { + initialResult, + subsequentResults: this._subscribe(), + }; } - reportNewStreamItemsRecord( - streamItemsRecord: StreamItemsRecord, - parentIncrementalDataRecord: IncrementalDataRecord, + private _addIncrementalDataRecords( + incrementalDataRecords: ReadonlyArray, ): void { - if (isDeferredGroupedFieldSetRecord(parentIncrementalDataRecord)) { - for (const parent of parentIncrementalDataRecord.deferredFragmentRecords) { - parent.children.add(streamItemsRecord); + for (const incrementalDataRecord of incrementalDataRecords) { + if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) { + for (const deferredFragmentRecord of incrementalDataRecord.deferredFragmentRecords) { + deferredFragmentRecord.expectedReconcilableResults++; + + this._addDeferredFragmentRecord(deferredFragmentRecord); + } + + const result = incrementalDataRecord.result; + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => { + this._enqueueCompletedDeferredGroupedFieldSet(resolved); + }); + } else { + this._enqueueCompletedDeferredGroupedFieldSet(result); + } + + continue; } - } else { - parentIncrementalDataRecord.children.add(streamItemsRecord); - } - } - completeDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, - data: ObjMap, - ): void { - deferredGroupedFieldSetRecord.data = data; - for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { - deferredFragmentRecord._pending.delete(deferredGroupedFieldSetRecord); - if (deferredFragmentRecord._pending.size === 0) { - this.completeDeferredFragmentRecord(deferredFragmentRecord); + const streamRecord = incrementalDataRecord.streamRecord; + if (streamRecord.id === undefined) { + this._newPending.add(streamRecord); } - } - } - markErroredDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, - error: GraphQLError, - ): void { - for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) { - deferredFragmentRecord.errors.push(error); - this.completeDeferredFragmentRecord(deferredFragmentRecord); + const result = incrementalDataRecord.result; + if (isPromise(result)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + result.then((resolved) => { + this._enqueueCompletedStreamItems(resolved); + }); + } else { + this._enqueueCompletedStreamItems(result); + } } } - completeDeferredFragmentRecord( + private _addDeferredFragmentRecord( deferredFragmentRecord: DeferredFragmentRecord, ): void { - this._release(deferredFragmentRecord); - } - - completeStreamItemsRecord( - streamItemsRecord: StreamItemsRecord, - items: Array, - ) { - streamItemsRecord.items = items; - streamItemsRecord.isCompleted = true; - this._release(streamItemsRecord); - } + const parent = deferredFragmentRecord.parent; + if (parent === undefined) { + // Below is equivalent and slightly faster version of: + // if (this._pending.has(deferredFragmentRecord)) { ... } + // as all released deferredFragmentRecords have ids. + if (deferredFragmentRecord.id !== undefined) { + return; + } - markErroredStreamItemsRecord( - streamItemsRecord: StreamItemsRecord, - error: GraphQLError, - ) { - streamItemsRecord.streamRecord.errors.push(error); - this.setIsFinalRecord(streamItemsRecord); - streamItemsRecord.isCompleted = true; - streamItemsRecord.streamRecord.earlyReturn?.().catch(() => { - // ignore error - }); - this._release(streamItemsRecord); - } + this._newPending.add(deferredFragmentRecord); + return; + } - setIsFinalRecord(streamItemsRecord: StreamItemsRecord) { - streamItemsRecord.isFinalRecord = true; - } + if (parent.children.has(deferredFragmentRecord)) { + return; + } - setIsCompletedAsyncIterator(streamItemsRecord: StreamItemsRecord) { - streamItemsRecord.isCompletedAsyncIterator = true; - this.setIsFinalRecord(streamItemsRecord); - } + parent.children.add(deferredFragmentRecord); - addFieldError( - incrementalDataRecord: IncrementalDataRecord, - error: GraphQLError, - ) { - incrementalDataRecord.errors.push(error); + this._addDeferredFragmentRecord(parent); } - buildDataResponse( - initialResultRecord: InitialResultRecord, - data: ObjMap | null, - ): ExecutionResult | ExperimentalIncrementalExecutionResults { - const pendingSources = this._publish(initialResultRecord.children); - - const errors = initialResultRecord.errors; - const initialResult = errors.length === 0 ? { data } : { errors, data }; - if (pendingSources.size > 0) { - return { - initialResult: { - ...initialResult, - pending: this._pendingSourcesToResults(pendingSources), - hasNext: true, - }, - subsequentResults: this._subscribe(), - }; + private _pruneEmpty() { + const maybeEmptyNewPending = this._newPending; + this._newPending = new Set(); + for (const node of maybeEmptyNewPending) { + if (isDeferredFragmentRecord(node)) { + if (node.expectedReconcilableResults) { + this._newPending.add(node); + continue; + } + for (const child of node.children) { + this._addNonEmptyNewPending(child); + } + } else { + this._newPending.add(node); + } } - return initialResult; } - buildErrorResponse( - initialResultRecord: InitialResultRecord, - error: GraphQLError, - ): ExecutionResult { - const errors = initialResultRecord.errors; - errors.push(error); - return { data: null, errors }; + private _addNonEmptyNewPending( + deferredFragmentRecord: DeferredFragmentRecord, + ): void { + if (deferredFragmentRecord.expectedReconcilableResults) { + this._newPending.add(deferredFragmentRecord); + return; + } + /* c8 ignore next 5 */ + // TODO: add test case for this, if when skipping an empty deferred fragment, the empty fragment has nested children. + for (const child of deferredFragmentRecord.children) { + this._addNonEmptyNewPending(child); + } } - filter( - nullPath: Path | undefined, - erroringIncrementalDataRecord: IncrementalDataRecord, + private _enqueueCompletedDeferredGroupedFieldSet( + result: DeferredGroupedFieldSetResult, ): void { - const nullPathArray = pathToArray(nullPath); - - const streams = new Set(); - - const children = this._getChildren(erroringIncrementalDataRecord); - const descendants = this._getDescendants(children); - - for (const child of descendants) { - if (!this._nullsChildSubsequentResultRecord(child, nullPathArray)) { - continue; - } - - child.filtered = true; - - if (isStreamItemsRecord(child)) { - streams.add(child.streamRecord); + let hasPendingParent = false; + for (const deferredFragmentRecord of result.deferredFragmentRecords) { + if (deferredFragmentRecord.id !== undefined) { + hasPendingParent = true; } + deferredFragmentRecord.results.push(result); } + if (hasPendingParent) { + this._completedResultQueue.push(result); + this._trigger(); + } + } - streams.forEach((stream) => { - stream.earlyReturn?.().catch(() => { - // ignore error - }); - }); + private _enqueueCompletedStreamItems(result: StreamItemsResult): void { + this._completedResultQueue.push(result); + this._trigger(); } - private _pendingSourcesToResults( - pendingSources: ReadonlySet, - ): Array { + private _pendingSourcesToResults(): Array { const pendingResults: Array = []; - for (const pendingSource of pendingSources) { - pendingSource.pendingSent = true; - const id = this._getNextId(); + for (const pendingSource of this._newPending) { + const id = String(this._getNextId()); + this._pending.add(pendingSource); pendingSource.id = id; const pendingResult: PendingResult = { id, - path: pendingSource.path, + path: pathToArray(pendingSource.path), }; if (pendingSource.label !== undefined) { pendingResult.label = pendingSource.label; } pendingResults.push(pendingResult); } + this._newPending.clear(); return pendingResults; } @@ -391,47 +389,67 @@ export class IncrementalPublisher { const _next = async (): Promise< IteratorResult > => { - // eslint-disable-next-line no-constant-condition - while (true) { - if (isDone) { - return { value: undefined, done: true }; - } + while (!isDone) { + let pending: Array = []; + + let completedResult: IncrementalDataRecordResult | undefined; + while ( + (completedResult = this._completedResultQueue.shift()) !== undefined + ) { + if (isDeferredGroupedFieldSetResult(completedResult)) { + this._handleCompletedDeferredGroupedFieldSet(completedResult); + } else { + this._handleCompletedStreamItems(completedResult); + } - for (const item of this._released) { - this._pending.delete(item); + pending = [...pending, ...this._pendingSourcesToResults()]; } - const released = this._released; - this._released = new Set(); - const result = this._getIncrementalResult(released); + if (this._incremental.length > 0 || this._completed.length > 0) { + const hasNext = this._pending.size > 0; - if (this._pending.size === 0) { - isDone = true; - } + if (!hasNext) { + isDone = true; + } + + const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = + { hasNext }; + + if (pending.length > 0) { + subsequentIncrementalExecutionResult.pending = pending; + } + if (this._incremental.length > 0) { + subsequentIncrementalExecutionResult.incremental = + this._incremental; + } + if (this._completed.length > 0) { + subsequentIncrementalExecutionResult.completed = this._completed; + } - if (result !== undefined) { - return { value: result, done: false }; + this._incremental = []; + this._completed = []; + + return { value: subsequentIncrementalExecutionResult, done: false }; } // eslint-disable-next-line no-await-in-loop await this._signalled; } + + await returnStreamIterators().catch(() => { + // ignore errors + }); + + return { value: undefined, done: true }; }; const returnStreamIterators = async (): Promise => { - const streams = new Set(); - const descendants = this._getDescendants(this._pending); - for (const subsequentResultRecord of descendants) { - if (isStreamItemsRecord(subsequentResultRecord)) { - streams.add(subsequentResultRecord.streamRecord); - } - } const promises: Array> = []; - streams.forEach((streamRecord) => { - if (streamRecord.earlyReturn) { + for (const streamRecord of this._context.cancellableStreams) { + if (streamRecord.earlyReturn !== undefined) { promises.push(streamRecord.earlyReturn()); } - }); + } await Promise.all(promises); }; @@ -475,385 +493,293 @@ export class IncrementalPublisher { this._signalled = signalled; } - private _introduce(item: SubsequentResultRecord) { - this._pending.add(item); - } - - private _release(item: SubsequentResultRecord): void { - if (this._pending.has(item)) { - this._released.add(item); - this._trigger(); - } - } - - private _push(item: SubsequentResultRecord): void { - this._released.add(item); - this._pending.add(item); - this._trigger(); - } - - private _getIncrementalResult( - completedRecords: ReadonlySet, - ): SubsequentIncrementalExecutionResult | undefined { - const { pending, incremental, completed } = - this._processPending(completedRecords); - - const hasNext = this._pending.size > 0; - if (incremental.length === 0 && completed.length === 0 && hasNext) { - return undefined; - } - - const result: SubsequentIncrementalExecutionResult = { hasNext }; - if (pending.length) { - result.pending = pending; - } - if (incremental.length) { - result.incremental = incremental; - } - if (completed.length) { - result.completed = completed; - } - - return result; - } - - private _processPending( - completedRecords: ReadonlySet, - ): IncrementalUpdate { - const newPendingSources = new Set(); - const incrementalResults: Array = []; - const completedResults: Array = []; - for (const subsequentResultRecord of completedRecords) { - this._publish(subsequentResultRecord.children, newPendingSources); - if (isStreamItemsRecord(subsequentResultRecord)) { - if (subsequentResultRecord.isFinalRecord) { - newPendingSources.delete(subsequentResultRecord.streamRecord); - completedResults.push( - this._completedRecordToResult(subsequentResultRecord.streamRecord), - ); - } - if (subsequentResultRecord.isCompletedAsyncIterator) { - // async iterable resolver just finished but there may be pending payloads - continue; - } - if (subsequentResultRecord.streamRecord.errors.length > 0) { - continue; - } - const incrementalResult: IncrementalStreamResult = { - // safe because `items` is always defined when the record is completed - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - items: subsequentResultRecord.items!, - // safe because `id` is defined once the stream has been released as pending - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - id: subsequentResultRecord.streamRecord.id!, - }; - if (subsequentResultRecord.errors.length > 0) { - incrementalResult.errors = subsequentResultRecord.errors; - } - incrementalResults.push(incrementalResult); - } else { - newPendingSources.delete(subsequentResultRecord); - completedResults.push( - this._completedRecordToResult(subsequentResultRecord), - ); - if (subsequentResultRecord.errors.length > 0) { - continue; - } - for (const deferredGroupedFieldSetRecord of subsequentResultRecord.deferredGroupedFieldSetRecords) { - if (!deferredGroupedFieldSetRecord.sent) { - deferredGroupedFieldSetRecord.sent = true; - const incrementalResult: IncrementalDeferResult = - this._getIncrementalDeferResult(deferredGroupedFieldSetRecord); - if (deferredGroupedFieldSetRecord.errors.length > 0) { - incrementalResult.errors = deferredGroupedFieldSetRecord.errors; - } - incrementalResults.push(incrementalResult); - } + private _handleCompletedDeferredGroupedFieldSet( + deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult, + ): void { + if ( + isNonReconcilableDeferredGroupedFieldSetResult( + deferredGroupedFieldSetResult, + ) + ) { + for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + const id = deferredFragmentRecord.id; + if (id !== undefined) { + this._completed.push({ + id, + errors: deferredGroupedFieldSetResult.errors, + }); + this._pending.delete(deferredFragmentRecord); } } + return; + } + for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + deferredFragmentRecord.reconcilableResults.push( + deferredGroupedFieldSetResult, + ); } - return { - pending: this._pendingSourcesToResults(newPendingSources), - incremental: incrementalResults, - completed: completedResults, - }; - } + this._addIncrementalDataRecords( + deferredGroupedFieldSetResult.incrementalDataRecords, + ); - private _getIncrementalDeferResult( - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, - ): IncrementalDeferResult { - const { data, deferredFragmentRecords } = deferredGroupedFieldSetRecord; - let maxLength: number | undefined; - let idWithLongestPath: string | undefined; - for (const deferredFragmentRecord of deferredFragmentRecords) { + for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { const id = deferredFragmentRecord.id; - // TODO: add test + // TODO: add test case for this. + // Presumably, this can occur if an error causes a fragment to be completed early, + // while an asynchronous deferred grouped field set result is enqueued. /* c8 ignore next 3 */ if (id === undefined) { continue; } - const length = deferredFragmentRecord.path.length; - if (maxLength === undefined || length > maxLength) { - maxLength = length; - idWithLongestPath = id; - } - } - const subPath = deferredGroupedFieldSetRecord.path.slice(maxLength); - const incrementalDeferResult: IncrementalDeferResult = { - // safe because `data``is always defined when the record is completed - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - data: data!, - // safe because `id` is always defined once the fragment has been released - // as pending and at least one fragment has been completed, so must have been - // released as pending - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - id: idWithLongestPath!, - }; - - if (subPath.length > 0) { - incrementalDeferResult.subPath = subPath; - } - - return incrementalDeferResult; - } - - private _completedRecordToResult( - completedRecord: DeferredFragmentRecord | StreamRecord, - ): CompletedResult { - const result: CompletedResult = { - // safe because `id` is defined once the stream has been released as pending - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - id: completedRecord.id!, - }; - if (completedRecord.errors.length > 0) { - result.errors = completedRecord.errors; - } - return result; - } - - private _publish( - subsequentResultRecords: ReadonlySet, - pendingSources = new Set(), - ): Set { - const emptyRecords: Array = []; - - for (const subsequentResultRecord of subsequentResultRecords) { - if (subsequentResultRecord.filtered) { + const reconcilableResults = deferredFragmentRecord.reconcilableResults; + if ( + deferredFragmentRecord.expectedReconcilableResults !== + reconcilableResults.length + ) { continue; } - if (isStreamItemsRecord(subsequentResultRecord)) { - if (subsequentResultRecord.isCompleted) { - this._push(subsequentResultRecord); - } else { - this._introduce(subsequentResultRecord); + for (const reconcilableResult of reconcilableResults) { + if (reconcilableResult.sent) { + continue; } - - const stream = subsequentResultRecord.streamRecord; - if (!stream.pendingSent) { - pendingSources.add(stream); + reconcilableResult.sent = true; + const { bestId, subPath } = this._getBestIdAndSubPath( + id, + deferredFragmentRecord, + reconcilableResult, + ); + const incrementalEntry: IncrementalDeferResult = { + ...reconcilableResult.result, + id: bestId, + }; + if (subPath !== undefined) { + incrementalEntry.subPath = subPath; } - continue; - } - - if (subsequentResultRecord._pending.size > 0) { - this._introduce(subsequentResultRecord); - } else if ( - subsequentResultRecord.deferredGroupedFieldSetRecords.size === 0 - ) { - emptyRecords.push(subsequentResultRecord); - continue; - } else { - this._push(subsequentResultRecord); + this._incremental.push(incrementalEntry); } - - if (!subsequentResultRecord.pendingSent) { - pendingSources.add(subsequentResultRecord); + this._completed.push({ id }); + this._pending.delete(deferredFragmentRecord); + for (const child of deferredFragmentRecord.children) { + this._newPending.add(child); + this._completedResultQueue.push(...child.results); } } - for (const emptyRecord of emptyRecords) { - this._publish(emptyRecord.children, pendingSources); - } - - return pendingSources; + this._pruneEmpty(); } - private _getChildren( - erroringIncrementalDataRecord: IncrementalDataRecord, - ): ReadonlySet { - const children = new Set(); - if (isDeferredGroupedFieldSetRecord(erroringIncrementalDataRecord)) { - for (const erroringIncrementalResultRecord of erroringIncrementalDataRecord.deferredFragmentRecords) { - for (const child of erroringIncrementalResultRecord.children) { - children.add(child); - } + private _handleCompletedStreamItems( + streamItemsResult: StreamItemsResult, + ): void { + const streamRecord = streamItemsResult.streamRecord; + const id = streamRecord.id; + // TODO: Consider adding invariant or non-null assertion, as this should never happen. Since the stream is converted into a linked list + // for ordering purposes, if an entry errors, additional entries will not be processed. + /* c8 ignore next 3 */ + if (id === undefined) { + return; + } + if (streamItemsResult.errors !== undefined) { + this._completed.push({ + id, + errors: streamItemsResult.errors, + }); + this._pending.delete(streamRecord); + if (isCancellableStreamRecord(streamRecord)) { + this._context.cancellableStreams.delete(streamRecord); + streamRecord.earlyReturn().catch(() => { + /* c8 ignore next 1 */ + // ignore error + }); } - } else { - for (const child of erroringIncrementalDataRecord.children) { - children.add(child); + } else if (streamItemsResult.result === undefined) { + this._completed.push({ id }); + this._pending.delete(streamRecord); + if (isCancellableStreamRecord(streamRecord)) { + this._context.cancellableStreams.delete(streamRecord); } - } - return children; - } - - private _getDescendants( - children: ReadonlySet, - descendants = new Set(), - ): ReadonlySet { - for (const child of children) { - descendants.add(child); - this._getDescendants(child.children, descendants); - } - return descendants; - } + } else { + const incrementalEntry: IncrementalStreamResult = { + id, + ...streamItemsResult.result, + }; - private _nullsChildSubsequentResultRecord( - subsequentResultRecord: SubsequentResultRecord, - nullPath: ReadonlyArray, - ): boolean { - const incrementalDataRecords = isStreamItemsRecord(subsequentResultRecord) - ? [subsequentResultRecord] - : subsequentResultRecord.deferredGroupedFieldSetRecords; + this._incremental.push(incrementalEntry); - for (const incrementalDataRecord of incrementalDataRecords) { - if (this._matchesPath(incrementalDataRecord.path, nullPath)) { - return true; + if (streamItemsResult.incrementalDataRecords.length > 0) { + this._addIncrementalDataRecords( + streamItemsResult.incrementalDataRecords, + ); + this._pruneEmpty(); } } - - return false; } - private _matchesPath( - testPath: ReadonlyArray, - basePath: ReadonlyArray, - ): boolean { - for (let i = 0; i < basePath.length; i++) { - if (basePath[i] !== testPath[i]) { - // testPath points to a path unaffected at basePath - return false; + private _getBestIdAndSubPath( + initialId: string, + initialDeferredFragmentRecord: DeferredFragmentRecord, + deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult, + ): { bestId: string; subPath: ReadonlyArray | undefined } { + let maxLength = pathToArray(initialDeferredFragmentRecord.path).length; + let bestId = initialId; + + for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) { + if (deferredFragmentRecord === initialDeferredFragmentRecord) { + continue; + } + const id = deferredFragmentRecord.id; + // TODO: add test case for when an fragment has not been released, but might be processed for the shortest path. + /* c8 ignore next 3 */ + if (id === undefined) { + continue; + } + const fragmentPath = pathToArray(deferredFragmentRecord.path); + const length = fragmentPath.length; + if (length > maxLength) { + maxLength = length; + bestId = id; } } - return true; + const subPath = deferredGroupedFieldSetResult.path.slice(maxLength); + return { + bestId, + subPath: subPath.length > 0 ? subPath : undefined, + }; } } +function isDeferredFragmentRecord( + subsequentResultRecord: SubsequentResultRecord, +): subsequentResultRecord is DeferredFragmentRecord { + return 'parent' in subsequentResultRecord; +} + function isDeferredGroupedFieldSetRecord( - incrementalDataRecord: unknown, + incrementalDataRecord: IncrementalDataRecord, ): incrementalDataRecord is DeferredGroupedFieldSetRecord { - return incrementalDataRecord instanceof DeferredGroupedFieldSetRecord; + return 'deferredFragmentRecords' in incrementalDataRecord; } -function isStreamItemsRecord( - subsequentResultRecord: unknown, -): subsequentResultRecord is StreamItemsRecord { - return subsequentResultRecord instanceof StreamItemsRecord; +export type DeferredGroupedFieldSetResult = + | ReconcilableDeferredGroupedFieldSetResult + | NonReconcilableDeferredGroupedFieldSetResult; + +function isDeferredGroupedFieldSetResult( + subsequentResult: DeferredGroupedFieldSetResult | StreamItemsResult, +): subsequentResult is DeferredGroupedFieldSetResult { + return 'deferredFragmentRecords' in subsequentResult; } -/** @internal */ -export class InitialResultRecord { - errors: Array; - children: Set; - constructor() { - this.errors = []; - this.children = new Set(); - } +interface ReconcilableDeferredGroupedFieldSetResult { + deferredFragmentRecords: ReadonlyArray; + path: Array; + result: BareDeferredGroupedFieldSetResult; + incrementalDataRecords: ReadonlyArray; + sent?: true | undefined; + errors?: never; } -/** @internal */ -export class DeferredGroupedFieldSetRecord { - path: ReadonlyArray; +interface NonReconcilableDeferredGroupedFieldSetResult { + errors: ReadonlyArray; deferredFragmentRecords: ReadonlyArray; - groupedFieldSet: GroupedFieldSet; - shouldInitiateDefer: boolean; - errors: Array; - data: ObjMap | undefined; - sent: boolean; + path: Array; + result?: never; +} - constructor(opts: { - path: Path | undefined; - deferredFragmentRecords: ReadonlyArray; - groupedFieldSet: GroupedFieldSet; - shouldInitiateDefer: boolean; - }) { - this.path = pathToArray(opts.path); - this.deferredFragmentRecords = opts.deferredFragmentRecords; - this.groupedFieldSet = opts.groupedFieldSet; - this.shouldInitiateDefer = opts.shouldInitiateDefer; - this.errors = []; - this.sent = false; - } +function isNonReconcilableDeferredGroupedFieldSetResult( + deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult, +): deferredGroupedFieldSetResult is NonReconcilableDeferredGroupedFieldSetResult { + return deferredGroupedFieldSetResult.errors !== undefined; } -/** @internal */ -export class DeferredFragmentRecord { - path: ReadonlyArray; +export interface DeferredGroupedFieldSetRecord { + deferredFragmentRecords: ReadonlyArray; + result: PromiseOrValue; +} + +export interface SubsequentResultRecord { + path: Path | undefined; label: string | undefined; - id: string | undefined; - children: Set; - deferredGroupedFieldSetRecords: Set; - errors: Array; - filtered: boolean; - pendingSent?: boolean; - _pending: Set; - - constructor(opts: { path: Path | undefined; label: string | undefined }) { - this.path = pathToArray(opts.path); - this.label = opts.label; - this.children = new Set(); - this.filtered = false; - this.deferredGroupedFieldSetRecords = new Set(); - this.errors = []; - this._pending = new Set(); - } + id?: string | undefined; } /** @internal */ -export class StreamRecord { +export class DeferredFragmentRecord implements SubsequentResultRecord { + path: Path | undefined; label: string | undefined; - path: ReadonlyArray; - id: string | undefined; - errors: Array; - earlyReturn?: (() => Promise) | undefined; - pendingSent?: boolean; + id?: string | undefined; + parent: DeferredFragmentRecord | undefined; + expectedReconcilableResults: number; + results: Array; + reconcilableResults: Array; + children: Set; + constructor(opts: { + path: Path | undefined; label: string | undefined; - path: Path; - earlyReturn?: (() => Promise) | undefined; + parent: DeferredFragmentRecord | undefined; }) { + this.path = opts.path; this.label = opts.label; - this.path = pathToArray(opts.path); - this.errors = []; - this.earlyReturn = opts.earlyReturn; + this.parent = opts.parent; + this.expectedReconcilableResults = 0; + this.results = []; + this.reconcilableResults = []; + this.children = new Set(); } } -/** @internal */ -export class StreamItemsRecord { - errors: Array; - streamRecord: StreamRecord; - path: ReadonlyArray; - items: Array | undefined; - children: Set; - isFinalRecord?: boolean; - isCompletedAsyncIterator?: boolean; - isCompleted: boolean; - filtered: boolean; - - constructor(opts: { streamRecord: StreamRecord; path: Path | undefined }) { - this.streamRecord = opts.streamRecord; - this.path = pathToArray(opts.path); - this.children = new Set(); - this.errors = []; - this.isCompleted = false; - this.filtered = false; - } +export interface CancellableStreamRecord extends SubsequentResultRecord { + earlyReturn: () => Promise; +} + +function isCancellableStreamRecord( + subsequentResultRecord: SubsequentResultRecord, +): subsequentResultRecord is CancellableStreamRecord { + return 'earlyReturn' in subsequentResultRecord; +} + +interface ReconcilableStreamItemsResult { + streamRecord: SubsequentResultRecord; + result: BareStreamItemsResult; + incrementalDataRecords: ReadonlyArray; + errors?: never; +} + +export function isReconcilableStreamItemsResult( + streamItemsResult: StreamItemsResult, +): streamItemsResult is ReconcilableStreamItemsResult { + return streamItemsResult.result !== undefined; +} + +interface TerminatingStreamItemsResult { + streamRecord: SubsequentResultRecord; + result?: never; + incrementalDataRecords?: never; + errors?: never; +} + +interface NonReconcilableStreamItemsResult { + streamRecord: SubsequentResultRecord; + errors: ReadonlyArray; + result?: never; +} + +export type StreamItemsResult = + | ReconcilableStreamItemsResult + | TerminatingStreamItemsResult + | NonReconcilableStreamItemsResult; + +export interface StreamItemsRecord { + streamRecord: SubsequentResultRecord; + result: PromiseOrValue; } export type IncrementalDataRecord = - | InitialResultRecord | DeferredGroupedFieldSetRecord | StreamItemsRecord; -type SubsequentResultRecord = DeferredFragmentRecord | StreamItemsRecord; +export type IncrementalDataRecordResult = + | DeferredGroupedFieldSetResult + | StreamItemsResult; diff --git a/src/execution/__tests__/defer-test.ts b/src/execution/__tests__/defer-test.ts index d03570270a..03bf8126c6 100644 --- a/src/execution/__tests__/defer-test.ts +++ b/src/execution/__tests__/defer-test.ts @@ -367,12 +367,6 @@ describe('Execute: defer directive', () => { }, id: '0', }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - incremental: [ { data: { friends: [{ name: 'Han' }, { name: 'Leia' }, { name: 'C-3PO' }], @@ -380,7 +374,7 @@ describe('Execute: defer directive', () => { id: '1', }, ], - completed: [{ id: '1' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -674,8 +668,8 @@ describe('Execute: defer directive', () => { hero: {}, }, pending: [ - { id: '0', path: [], label: 'DeferName' }, - { id: '1', path: ['hero'], label: 'DeferID' }, + { id: '0', path: ['hero'], label: 'DeferID' }, + { id: '1', path: [], label: 'DeferName' }, ], hasNext: true, }, @@ -685,17 +679,17 @@ describe('Execute: defer directive', () => { data: { id: '1', }, - id: '1', + id: '0', }, { data: { name: 'Luke', }, - id: '0', + id: '1', subPath: ['hero'], }, ], - completed: [{ id: '1' }, { id: '0' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -983,37 +977,27 @@ describe('Execute: defer directive', () => { hasNext: true, }, { - pending: [{ id: '1', path: ['hero', 'nestedObject'] }], + pending: [ + { id: '1', path: ['hero', 'nestedObject'] }, + { id: '2', path: ['hero', 'nestedObject', 'deeperObject'] }, + ], incremental: [ { data: { bar: 'bar' }, id: '0', subPath: ['nestedObject', 'deeperObject'], }, - ], - completed: [{ id: '0' }], - hasNext: true, - }, - { - pending: [{ id: '2', path: ['hero', 'nestedObject', 'deeperObject'] }], - incremental: [ { data: { baz: 'baz' }, id: '1', subPath: ['deeperObject'], }, - ], - hasNext: true, - completed: [{ id: '1' }], - }, - { - incremental: [ { data: { bak: 'bak' }, id: '2', }, ], - completed: [{ id: '2' }], + completed: [{ id: '0' }, { id: '1' }, { id: '2' }], hasNext: false, }, ]); @@ -1132,8 +1116,8 @@ describe('Execute: defer directive', () => { }, }, pending: [ - { id: '0', path: [] }, - { id: '1', path: ['a', 'b'] }, + { id: '0', path: ['a', 'b'] }, + { id: '1', path: [] }, ], hasNext: true, }, @@ -1141,14 +1125,14 @@ describe('Execute: defer directive', () => { incremental: [ { data: { e: { f: 'f' } }, - id: '1', + id: '0', }, { data: { g: { h: 'h' } }, - id: '0', + id: '1', }, ], - completed: [{ id: '1' }, { id: '0' }], + completed: [{ id: '0' }, { id: '1' }], hasNext: false, }, ]); @@ -1277,6 +1261,7 @@ describe('Execute: defer directive', () => { }, ], completed: [ + { id: '0' }, { id: '1', errors: [ @@ -1288,7 +1273,6 @@ describe('Execute: defer directive', () => { }, ], }, - { id: '0' }, ], hasNext: false, }, diff --git a/src/execution/__tests__/executor-test.ts b/src/execution/__tests__/executor-test.ts index c29b4ae60d..de33f8c91b 100644 --- a/src/execution/__tests__/executor-test.ts +++ b/src/execution/__tests__/executor-test.ts @@ -635,6 +635,57 @@ describe('Execute: Handles basic execution tasks', () => { expect(isAsyncResolverFinished).to.equal(true); }); + it('handles async bubbling errors combined with non-bubbling errors', async () => { + const schema = new GraphQLSchema({ + query: new GraphQLObjectType({ + name: 'Query', + fields: { + asyncNonNullError: { + type: new GraphQLNonNull(GraphQLString), + async resolve() { + await resolveOnNextTick(); + return null; + }, + }, + asyncError: { + type: GraphQLString, + async resolve() { + await resolveOnNextTick(); + throw new Error('Oops'); + }, + }, + }, + }), + }); + + // Order is important here, as the nullable error should resolve first + const document = parse(` + { + asyncError + asyncNonNullError + } + `); + + const result = execute({ schema, document }); + + expectJSON(await result).toDeepEqual({ + data: null, + errors: [ + { + message: 'Oops', + locations: [{ line: 3, column: 9 }], + path: ['asyncError'], + }, + { + message: + 'Cannot return null for non-nullable field Query.asyncNonNullError.', + locations: [{ line: 4, column: 9 }], + path: ['asyncNonNullError'], + }, + ], + }); + }); + it('Full response path is included for non-nullable fields', () => { const A: GraphQLObjectType = new GraphQLObjectType({ name: 'A', diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index 6e1928f945..522b82f3d4 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -146,11 +146,10 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [{ items: ['banana'], id: '0' }], - hasNext: true, - }, - { - incremental: [{ items: ['coconut'], id: '0' }], + incremental: [ + { items: ['banana'], id: '0' }, + { items: ['coconut'], id: '0' }, + ], completed: [{ id: '0' }], hasNext: false, }, @@ -170,15 +169,11 @@ describe('Execute: stream directive', () => { hasNext: true, }, { - incremental: [{ items: ['apple'], id: '0' }], - hasNext: true, - }, - { - incremental: [{ items: ['banana'], id: '0' }], - hasNext: true, - }, - { - incremental: [{ items: ['coconut'], id: '0' }], + incremental: [ + { items: ['apple'], id: '0' }, + { items: ['banana'], id: '0' }, + { items: ['coconut'], id: '0' }, + ], completed: [{ id: '0' }], hasNext: false, }, @@ -228,11 +223,6 @@ describe('Execute: stream directive', () => { items: ['banana'], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: ['coconut'], id: '0', @@ -297,11 +287,6 @@ describe('Execute: stream directive', () => { items: [['banana', 'banana', 'banana']], id: '0', }, - ], - hasNext: true, - }, - { - incremental: [ { items: [['coconut', 'coconut', 'coconut']], id: '0', @@ -811,7 +796,7 @@ describe('Execute: stream directive', () => { } `); const result = await complete(document, { - nonNullFriendList: () => [friends[0], null], + nonNullFriendList: () => [friends[0], null, friends[1]], }); expectJSON(result).toDeepEqual([ @@ -1954,9 +1939,7 @@ describe('Execute: stream directive', () => { hasNext: true, }); - const result2Promise = iterator.next(); - resolveIterableCompletion(null); - const result2 = await result2Promise; + const result2 = await iterator.next(); expectJSON(result2).toDeepEqual({ value: { pending: [{ id: '2', path: ['friendList', 1], label: 'DeferName' }], @@ -1977,7 +1960,7 @@ describe('Execute: stream directive', () => { }); const result3Promise = iterator.next(); - resolveSlowField('Han'); + resolveIterableCompletion(null); const result3 = await result3Promise; expectJSON(result3).toDeepEqual({ value: { @@ -1986,7 +1969,9 @@ describe('Execute: stream directive', () => { }, done: false, }); - const result4 = await iterator.next(); + const result4Promise = iterator.next(); + resolveSlowField('Han'); + const result4 = await result4Promise; expectJSON(result4).toDeepEqual({ value: { incremental: [ @@ -2077,8 +2062,19 @@ describe('Execute: stream directive', () => { done: false, }); - const result3 = await iterator.next(); + const result3Promise = iterator.next(); + resolveIterableCompletion(null); + const result3 = await result3Promise; expectJSON(result3).toDeepEqual({ + value: { + completed: [{ id: '1' }], + hasNext: true, + }, + done: false, + }); + + const result4 = await iterator.next(); + expectJSON(result4).toDeepEqual({ value: { incremental: [ { @@ -2087,16 +2083,6 @@ describe('Execute: stream directive', () => { }, ], completed: [{ id: '2' }], - hasNext: true, - }, - done: false, - }); - const result4Promise = iterator.next(); - resolveIterableCompletion(null); - const result4 = await result4Promise; - expectJSON(result4).toDeepEqual({ - value: { - completed: [{ id: '1' }], hasNext: false, }, done: false, diff --git a/src/execution/buildFieldPlan.ts b/src/execution/buildFieldPlan.ts index 390e2cf813..970b8d5c46 100644 --- a/src/execution/buildFieldPlan.ts +++ b/src/execution/buildFieldPlan.ts @@ -12,17 +12,12 @@ export interface FieldGroup { export type GroupedFieldSet = Map; -export interface NewGroupedFieldSetDetails { - groupedFieldSet: GroupedFieldSet; - shouldInitiateDefer: boolean; -} - export function buildFieldPlan( fields: Map>, parentDeferUsages: DeferUsageSet = new Set(), ): { groupedFieldSet: GroupedFieldSet; - newGroupedFieldSetDetailsMap: Map; + newGroupedFieldSets: Map; } { const groupedFieldSet = new Map< string, @@ -32,18 +27,15 @@ export function buildFieldPlan( } >(); - const newGroupedFieldSetDetailsMap = new Map< + const newGroupedFieldSets = new Map< DeferUsageSet, - { - groupedFieldSet: Map< - string, - { - fields: Array; - deferUsages: DeferUsageSet; - } - >; - shouldInitiateDefer: boolean; - } + Map< + string, + { + fields: Array; + deferUsages: DeferUsageSet; + } + > >(); const map = new Map< @@ -94,12 +86,8 @@ export function buildFieldPlan( continue; } - let newGroupedFieldSetDetails = getBySet( - newGroupedFieldSetDetailsMap, - deferUsageSet, - ); - let newGroupedFieldSet; - if (newGroupedFieldSetDetails === undefined) { + let newGroupedFieldSet = getBySet(newGroupedFieldSets, deferUsageSet); + if (newGroupedFieldSet === undefined) { newGroupedFieldSet = new Map< string, { @@ -108,19 +96,7 @@ export function buildFieldPlan( knownDeferUsages: DeferUsageSet; } >(); - - newGroupedFieldSetDetails = { - groupedFieldSet: newGroupedFieldSet, - shouldInitiateDefer: Array.from(deferUsageSet).some( - (deferUsage) => !parentDeferUsages.has(deferUsage), - ), - }; - newGroupedFieldSetDetailsMap.set( - deferUsageSet, - newGroupedFieldSetDetails, - ); - } else { - newGroupedFieldSet = newGroupedFieldSetDetails.groupedFieldSet; + newGroupedFieldSets.set(deferUsageSet, newGroupedFieldSet); } let fieldGroup = newGroupedFieldSet.get(responseKey); if (fieldGroup === undefined) { @@ -135,7 +111,7 @@ export function buildFieldPlan( return { groupedFieldSet, - newGroupedFieldSetDetailsMap, + newGroupedFieldSets, }; } diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 767a3f77d1..68037516e1 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -51,23 +51,25 @@ import type { DeferUsageSet, FieldGroup, GroupedFieldSet, - NewGroupedFieldSetDetails, } from './buildFieldPlan.js'; import { buildFieldPlan } from './buildFieldPlan.js'; import type { DeferUsage, FieldDetails } from './collectFields.js'; import { collectFields, collectSubfields } from './collectFields.js'; import type { + CancellableStreamRecord, + DeferredGroupedFieldSetRecord, + DeferredGroupedFieldSetResult, ExecutionResult, ExperimentalIncrementalExecutionResults, IncrementalDataRecord, + StreamItemsRecord, + StreamItemsResult, + SubsequentResultRecord, } from './IncrementalPublisher.js'; import { + buildIncrementalResponse, DeferredFragmentRecord, - DeferredGroupedFieldSetRecord, - IncrementalPublisher, - InitialResultRecord, - StreamItemsRecord, - StreamRecord, + isReconcilableStreamItemsResult, } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; import { @@ -142,7 +144,7 @@ export interface ExecutionContext { fieldResolver: GraphQLFieldResolver; typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; - incrementalPublisher: IncrementalPublisher; + cancellableStreams: Set; } export interface ExecutionArgs { @@ -163,6 +165,8 @@ export interface StreamUsage { fieldGroup: FieldGroup; } +type GraphQLWrappedResult = [T, Array]; + const UNEXPECTED_EXPERIMENTAL_DIRECTIVES = 'The provided schema unexpectedly contains experimental directives (@defer or @stream). These directives may only be utilized if experimental execution features are explicitly enabled.'; @@ -255,16 +259,10 @@ export function experimentalExecuteIncrementally( function executeOperation( exeContext: ExecutionContext, ): PromiseOrValue { - const initialResultRecord = new InitialResultRecord(); + const errors: Array = []; try { - const { - operation, - schema, - fragments, - variableValues, - rootValue, - incrementalPublisher, - } = exeContext; + const { operation, schema, fragments, variableValues, rootValue } = + exeContext; const rootType = schema.getRootType(operation.operation); if (rootType == null) { throw new GraphQLError( @@ -280,58 +278,93 @@ function executeOperation( rootType, operation, ); - const { groupedFieldSet, newGroupedFieldSetDetailsMap } = - buildFieldPlan(fields); + const { groupedFieldSet, newGroupedFieldSets } = buildFieldPlan(fields); - const newDeferMap = addNewDeferredFragments( - incrementalPublisher, - newDeferUsages, - initialResultRecord, - ); + const newDeferMap = addNewDeferredFragments(newDeferUsages, new Map()); - const path = undefined; - - const newDeferredGroupedFieldSetRecords = addNewDeferredGroupedFieldSets( - incrementalPublisher, - newGroupedFieldSetDetailsMap, - newDeferMap, - path, - ); - - const result = executeRootGroupedFieldSet( + let graphqlWrappedResult = executeRootGroupedFieldSet( exeContext, operation.operation, rootType, rootValue, groupedFieldSet, - initialResultRecord, + errors, newDeferMap, ); - executeDeferredGroupedFieldSets( + const newDeferredGroupedFieldSetRecords = executeDeferredGroupedFieldSets( exeContext, rootType, rootValue, - path, - newDeferredGroupedFieldSetRecords, + undefined, + undefined, + newGroupedFieldSets, newDeferMap, ); - if (isPromise(result)) { - return result.then( + graphqlWrappedResult = withNewDeferredGroupedFieldSets( + graphqlWrappedResult, + newDeferredGroupedFieldSetRecords, + ); + if (isPromise(graphqlWrappedResult)) { + return graphqlWrappedResult.then( (resolved) => - incrementalPublisher.buildDataResponse(initialResultRecord, resolved), - (error) => - incrementalPublisher.buildErrorResponse(initialResultRecord, error), + buildDataResponse(exeContext, resolved[0], errors, resolved[1]), + (error) => ({ + data: null, + errors: withError(errors, error), + }), ); } - return incrementalPublisher.buildDataResponse(initialResultRecord, result); - } catch (error) { - return exeContext.incrementalPublisher.buildErrorResponse( - initialResultRecord, - error, + return buildDataResponse( + exeContext, + graphqlWrappedResult[0], + errors, + graphqlWrappedResult[1], ); + } catch (error) { + return { data: null, errors: withError(errors, error) }; + } +} + +function withNewDeferredGroupedFieldSets( + result: PromiseOrValue>>, + newDeferredGroupedFieldSetRecords: ReadonlyArray, +): PromiseOrValue>> { + if (isPromise(result)) { + return result.then((resolved) => { + resolved[1].push(...newDeferredGroupedFieldSetRecords); + return resolved; + }); } + + result[1].push(...newDeferredGroupedFieldSetRecords); + return result; +} + +function withError( + errors: Array, + error: GraphQLError, +): ReadonlyArray { + return errors.length === 0 ? [error] : [...errors, error]; +} + +function buildDataResponse( + exeContext: ExecutionContext, + data: ObjMap, + errors: ReadonlyArray, + incrementalDataRecords: ReadonlyArray, +): ExecutionResult | ExperimentalIncrementalExecutionResults { + if (incrementalDataRecords.length === 0) { + return errors.length > 0 ? { errors, data } : { data }; + } + + return buildIncrementalResponse( + exeContext, + data, + errors, + incrementalDataRecords, + ); } /** @@ -435,7 +468,7 @@ export function buildExecutionContext( fieldResolver: fieldResolver ?? defaultFieldResolver, typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, - incrementalPublisher: new IncrementalPublisher(), + cancellableStreams: new Set(), }; } @@ -455,9 +488,9 @@ function executeRootGroupedFieldSet( rootType: GraphQLObjectType, rootValue: unknown, groupedFieldSet: GroupedFieldSet, - initialResultRecord: InitialResultRecord, - newDeferMap: ReadonlyMap, -): PromiseOrValue> { + errors: Array, + deferMap: ReadonlyMap | undefined, +): PromiseOrValue>> { switch (operation) { case OperationTypeNode.QUERY: return executeFields( @@ -466,8 +499,8 @@ function executeRootGroupedFieldSet( rootValue, undefined, groupedFieldSet, - initialResultRecord, - newDeferMap, + errors, + deferMap, ); case OperationTypeNode.MUTATION: return executeFieldsSerially( @@ -476,8 +509,8 @@ function executeRootGroupedFieldSet( rootValue, undefined, groupedFieldSet, - initialResultRecord, - newDeferMap, + errors, + deferMap, ); case OperationTypeNode.SUBSCRIPTION: // TODO: deprecate `subscribe` and move all logic here @@ -488,8 +521,8 @@ function executeRootGroupedFieldSet( rootValue, undefined, groupedFieldSet, - initialResultRecord, - newDeferMap, + errors, + deferMap, ); } } @@ -504,12 +537,12 @@ function executeFieldsSerially( sourceValue: unknown, path: Path | undefined, groupedFieldSet: GroupedFieldSet, - incrementalDataRecord: InitialResultRecord, - deferMap: ReadonlyMap, -): PromiseOrValue> { + errors: Array, + deferMap: ReadonlyMap | undefined, +): PromiseOrValue>> { return promiseReduce( groupedFieldSet, - (results, [responseName, fieldGroup]) => { + (graphqlWrappedResult, [responseName, fieldGroup]) => { const fieldPath = addPath(path, responseName, parentType.name); const result = executeField( exeContext, @@ -517,22 +550,24 @@ function executeFieldsSerially( sourceValue, fieldGroup, fieldPath, - incrementalDataRecord, + errors, deferMap, ); if (result === undefined) { - return results; + return graphqlWrappedResult; } if (isPromise(result)) { - return result.then((resolvedResult) => { - results[responseName] = resolvedResult; - return results; + return result.then((resolved) => { + graphqlWrappedResult[0][responseName] = resolved[0]; + graphqlWrappedResult[1].push(...resolved[1]); + return graphqlWrappedResult; }); } - results[responseName] = result; - return results; + graphqlWrappedResult[0][responseName] = result[0]; + graphqlWrappedResult[1].push(...result[1]); + return graphqlWrappedResult; }, - Object.create(null), + [Object.create(null), []] as GraphQLWrappedResult>, ); } @@ -546,10 +581,14 @@ function executeFields( sourceValue: unknown, path: Path | undefined, groupedFieldSet: GroupedFieldSet, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, -): PromiseOrValue> { + errors: Array, + deferMap: ReadonlyMap | undefined, +): PromiseOrValue>> { const results = Object.create(null); + const graphqlWrappedResult: GraphQLWrappedResult> = [ + results, + [], + ]; let containsPromise = false; try { @@ -561,36 +600,47 @@ function executeFields( sourceValue, fieldGroup, fieldPath, - incrementalDataRecord, + errors, deferMap, ); if (result !== undefined) { - results[responseName] = result; if (isPromise(result)) { + results[responseName] = result.then((resolved) => { + graphqlWrappedResult[1].push(...resolved[1]); + return resolved[0]; + }); containsPromise = true; + } else { + results[responseName] = result[0]; + graphqlWrappedResult[1].push(...result[1]); } } } } catch (error) { if (containsPromise) { // Ensure that any promises returned by other fields are handled, as they may also reject. - return promiseForObject(results).finally(() => { + return promiseForObject(results, () => { + /* noop */ + }).finally(() => { throw error; - }); + }) as never; } throw error; } - // If there are no promises, we can just return the object + // If there are no promises, we can just return the object and any incrementalDataRecords if (!containsPromise) { - return results; + return graphqlWrappedResult; } // Otherwise, results is a map from field name to the result of resolving that // field, which is possibly a promise. Return a promise that will return this // same map, but with any promises replaced with the values they resolved to. - return promiseForObject(results); + return promiseForObject(results, (resolved) => [ + resolved, + graphqlWrappedResult[1], + ]); } function toNodes(fieldGroup: FieldGroup): ReadonlyArray { @@ -609,9 +659,9 @@ function executeField( source: unknown, fieldGroup: FieldGroup, path: Path, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, -): PromiseOrValue { + errors: Array, + deferMap: ReadonlyMap | undefined, +): PromiseOrValue> | undefined { const fieldName = fieldGroup.fields[0].node.name.value; const fieldDef = exeContext.schema.getField(parentType, fieldName); if (!fieldDef) { @@ -655,7 +705,7 @@ function executeField( info, path, result, - incrementalDataRecord, + errors, deferMap, ); } @@ -667,7 +717,7 @@ function executeField( info, path, result, - incrementalDataRecord, + errors, deferMap, ); @@ -675,30 +725,14 @@ function executeField( // Note: we don't rely on a `catch` method, but we do expect "thenable" // to take a second callback for the error case. return completed.then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - returnType, - fieldGroup, - path, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter(path, incrementalDataRecord); - return null; + handleFieldError(rawError, returnType, fieldGroup, path, errors); + return [null, []]; }); } return completed; } catch (rawError) { - handleFieldError( - rawError, - exeContext, - returnType, - fieldGroup, - path, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter(path, incrementalDataRecord); - return null; + handleFieldError(rawError, returnType, fieldGroup, path, errors); + return [null, []]; } } @@ -731,11 +765,10 @@ export function buildResolveInfo( function handleFieldError( rawError: unknown, - exeContext: ExecutionContext, returnType: GraphQLOutputType, fieldGroup: FieldGroup, path: Path, - incrementalDataRecord: IncrementalDataRecord, + errors: Array, ): void { const error = locatedError(rawError, toNodes(fieldGroup), pathToArray(path)); @@ -747,7 +780,7 @@ function handleFieldError( // Otherwise, error protection is applied, logging the error and resolving // a null value for this field if one is encountered. - exeContext.incrementalPublisher.addFieldError(incrementalDataRecord, error); + errors.push(error); } /** @@ -778,9 +811,9 @@ function completeValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, -): PromiseOrValue { + errors: Array, + deferMap: ReadonlyMap | undefined, +): PromiseOrValue> { // If result is an Error, throw a located error. if (result instanceof Error) { throw result; @@ -796,10 +829,10 @@ function completeValue( info, path, result, - incrementalDataRecord, + errors, deferMap, ); - if (completed === null) { + if ((completed as GraphQLWrappedResult)[0] === null) { throw new Error( `Cannot return null for non-nullable field ${info.parentType.name}.${info.fieldName}.`, ); @@ -809,7 +842,7 @@ function completeValue( // If result value is null or undefined then return null. if (result == null) { - return null; + return [null, []]; } // If field type is List, complete each item in the list with the inner type @@ -821,7 +854,7 @@ function completeValue( info, path, result, - incrementalDataRecord, + errors, deferMap, ); } @@ -829,7 +862,7 @@ function completeValue( // If field type is a leaf type, Scalar or Enum, serialize to a valid value, // returning null if serialization is not possible. if (isLeafType(returnType)) { - return completeLeafValue(returnType, result); + return [completeLeafValue(returnType, result), []]; } // If field type is an abstract type, Interface or Union, determine the @@ -842,7 +875,7 @@ function completeValue( info, path, result, - incrementalDataRecord, + errors, deferMap, ); } @@ -856,7 +889,7 @@ function completeValue( info, path, result, - incrementalDataRecord, + errors, deferMap, ); } @@ -875,9 +908,9 @@ async function completePromisedValue( info: GraphQLResolveInfo, path: Path, result: Promise, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, -): Promise { + errors: Array, + deferMap: ReadonlyMap | undefined, +): Promise> { try { const resolved = await result; let completed = completeValue( @@ -887,24 +920,17 @@ async function completePromisedValue( info, path, resolved, - incrementalDataRecord, + errors, deferMap, ); + if (isPromise(completed)) { completed = await completed; } return completed; } catch (rawError) { - handleFieldError( - rawError, - exeContext, - returnType, - fieldGroup, - path, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter(path, incrementalDataRecord); - return null; + handleFieldError(rawError, returnType, fieldGroup, path, errors); + return [null, []]; } } @@ -982,6 +1008,7 @@ function getStreamUsage( return streamUsage; } + /** * Complete a async iterator value by completing the result and calling * recursively until all the results are completed. @@ -993,37 +1020,48 @@ async function completeAsyncIteratorValue( info: GraphQLResolveInfo, path: Path, asyncIterator: AsyncIterator, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, -): Promise> { - const streamUsage = getStreamUsage(exeContext, fieldGroup, path); + errors: Array, + deferMap: ReadonlyMap | undefined, +): Promise>> { let containsPromise = false; const completedResults: Array = []; + const graphqlWrappedResult: GraphQLWrappedResult> = [ + completedResults, + [], + ]; let index = 0; + const streamUsage = getStreamUsage(exeContext, fieldGroup, path); // eslint-disable-next-line no-constant-condition while (true) { if (streamUsage && index >= streamUsage.initialCount) { - const earlyReturn = asyncIterator.return; - const streamRecord = new StreamRecord({ - label: streamUsage.label, + const returnFn = asyncIterator.return; + let streamRecord: SubsequentResultRecord | CancellableStreamRecord; + if (returnFn === undefined) { + streamRecord = { + label: streamUsage.label, + path, + } as SubsequentResultRecord; + } else { + streamRecord = { + label: streamUsage.label, + path, + earlyReturn: returnFn.bind(asyncIterator), + }; + exeContext.cancellableStreams.add(streamRecord); + } + + const firstStreamItems = firstAsyncStreamItems( + streamRecord, path, - earlyReturn: - earlyReturn === undefined - ? undefined - : earlyReturn.bind(asyncIterator), - }); - // eslint-disable-next-line @typescript-eslint/no-floating-promises - executeStreamAsyncIterator( index, asyncIterator, exeContext, streamUsage.fieldGroup, info, itemType, - path, - incrementalDataRecord, - streamRecord, ); + + graphqlWrappedResult[1].push(firstStreamItems); break; } @@ -1032,31 +1070,65 @@ async function completeAsyncIteratorValue( try { // eslint-disable-next-line no-await-in-loop iteration = await asyncIterator.next(); - if (iteration.done) { - break; - } } catch (rawError) { throw locatedError(rawError, toNodes(fieldGroup), pathToArray(path)); } - if ( + // TODO: add test case for stream returning done before initialCount + /* c8 ignore next 3 */ + if (iteration.done) { + break; + } + + const item = iteration.value; + // TODO: add tests for stream backed by asyncIterator that returns a promise + /* c8 ignore start */ + if (isPromise(item)) { + completedResults.push( + completePromisedValue( + exeContext, + itemType, + fieldGroup, + info, + itemPath, + item, + errors, + deferMap, + ).then((resolved) => { + graphqlWrappedResult[1].push(...resolved[1]); + return resolved[0]; + }), + ); + containsPromise = true; + } else if ( + /* c8 ignore stop */ completeListItemValue( - iteration.value, + item, completedResults, + graphqlWrappedResult, exeContext, itemType, fieldGroup, info, itemPath, - incrementalDataRecord, + errors, deferMap, ) + // TODO: add tests for stream backed by asyncIterator that completes to a promise + /* c8 ignore start */ ) { containsPromise = true; } - index += 1; + /* c8 ignore stop */ + index++; } - return containsPromise ? Promise.all(completedResults) : completedResults; + + return containsPromise + ? /* c8 ignore start */ Promise.all(completedResults).then((resolved) => [ + resolved, + graphqlWrappedResult[1], + ]) + : /* c8 ignore stop */ graphqlWrappedResult; } /** @@ -1070,9 +1142,9 @@ function completeListValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, -): PromiseOrValue> { + errors: Array, + deferMap: ReadonlyMap | undefined, +): PromiseOrValue>> { const itemType = returnType.ofType; if (isAsyncIterable(result)) { @@ -1085,7 +1157,7 @@ function completeListValue( info, path, asyncIterator, - incrementalDataRecord, + errors, deferMap, ); } @@ -1096,65 +1168,90 @@ function completeListValue( ); } - const streamUsage = getStreamUsage(exeContext, fieldGroup, path); - // This is specified as a simple map, however we're optimizing the path // where the list contains no Promises by avoiding creating another Promise. let containsPromise = false; - let currentParents = incrementalDataRecord; const completedResults: Array = []; + const graphqlWrappedResult: GraphQLWrappedResult> = [ + completedResults, + [], + ]; let index = 0; - let streamRecord: StreamRecord | undefined; - for (const item of result) { - // No need to modify the info object containing the path, - // since from here on it is not ever accessed by resolver functions. - const itemPath = addPath(path, index, undefined); + const streamUsage = getStreamUsage(exeContext, fieldGroup, path); + const iterator = result[Symbol.iterator](); + let iteration = iterator.next(); + while (!iteration.done) { + const item = iteration.value; if (streamUsage && index >= streamUsage.initialCount) { - if (streamRecord === undefined) { - streamRecord = new StreamRecord({ label: streamUsage.label, path }); - } - currentParents = executeStreamField( + const streamRecord: SubsequentResultRecord = { + label: streamUsage.label, path, - itemPath, + }; + + const firstStreamItems = firstSyncStreamItems( + streamRecord, item, + index, + iterator, exeContext, streamUsage.fieldGroup, info, itemType, - currentParents, - streamRecord, ); - index++; - continue; + + graphqlWrappedResult[1].push(firstStreamItems); + break; } - if ( + // No need to modify the info object containing the path, + // since from here on it is not ever accessed by resolver functions. + const itemPath = addPath(path, index, undefined); + + if (isPromise(item)) { + completedResults.push( + completePromisedValue( + exeContext, + itemType, + fieldGroup, + info, + itemPath, + item, + errors, + deferMap, + ).then((resolved) => { + graphqlWrappedResult[1].push(...resolved[1]); + return resolved[0]; + }), + ); + containsPromise = true; + } else if ( completeListItemValue( item, completedResults, + graphqlWrappedResult, exeContext, itemType, fieldGroup, info, itemPath, - incrementalDataRecord, + errors, deferMap, ) ) { containsPromise = true; } - index++; - } - if (streamRecord !== undefined) { - exeContext.incrementalPublisher.setIsFinalRecord( - currentParents as StreamItemsRecord, - ); + iteration = iterator.next(); } - return containsPromise ? Promise.all(completedResults) : completedResults; + return containsPromise + ? Promise.all(completedResults).then((resolved) => [ + resolved, + graphqlWrappedResult[1], + ]) + : graphqlWrappedResult; } /** @@ -1165,31 +1262,15 @@ function completeListValue( function completeListItemValue( item: unknown, completedResults: Array, + parent: GraphQLWrappedResult>, exeContext: ExecutionContext, itemType: GraphQLOutputType, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemPath: Path, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, + errors: Array, + deferMap: ReadonlyMap | undefined, ): boolean { - if (isPromise(item)) { - completedResults.push( - completePromisedValue( - exeContext, - itemType, - fieldGroup, - info, - itemPath, - item, - incrementalDataRecord, - deferMap, - ), - ); - - return true; - } - try { const completedItem = completeValue( exeContext, @@ -1198,7 +1279,7 @@ function completeListItemValue( info, itemPath, item, - incrementalDataRecord, + errors, deferMap, ); @@ -1206,40 +1287,26 @@ function completeListItemValue( // Note: we don't rely on a `catch` method, but we do expect "thenable" // to take a second callback for the error case. completedResults.push( - completedItem.then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - itemType, - fieldGroup, - itemPath, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter( - itemPath, - incrementalDataRecord, - ); - return null; - }), + completedItem.then( + (resolved) => { + parent[1].push(...resolved[1]); + return resolved[0]; + }, + (rawError) => { + handleFieldError(rawError, itemType, fieldGroup, itemPath, errors); + return null; + }, + ), ); - return true; } - completedResults.push(completedItem); + completedResults.push(completedItem[0]); + parent[1].push(...completedItem[1]); } catch (rawError) { - handleFieldError( - rawError, - exeContext, - itemType, - fieldGroup, - itemPath, - incrementalDataRecord, - ); - exeContext.incrementalPublisher.filter(itemPath, incrementalDataRecord); + handleFieldError(rawError, itemType, fieldGroup, itemPath, errors); completedResults.push(null); } - return false; } @@ -1272,9 +1339,9 @@ function completeAbstractValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, -): PromiseOrValue> { + errors: Array, + deferMap: ReadonlyMap | undefined, +): PromiseOrValue>> { const resolveTypeFn = returnType.resolveType ?? exeContext.typeResolver; const contextValue = exeContext.contextValue; const runtimeType = resolveTypeFn(result, contextValue, info, returnType); @@ -1295,7 +1362,7 @@ function completeAbstractValue( info, path, result, - incrementalDataRecord, + errors, deferMap, ), ); @@ -1315,7 +1382,7 @@ function completeAbstractValue( info, path, result, - incrementalDataRecord, + errors, deferMap, ); } @@ -1385,9 +1452,9 @@ function completeObjectValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, -): PromiseOrValue> { + errors: Array, + deferMap: ReadonlyMap | undefined, +): PromiseOrValue>> { // If there is an isTypeOf predicate function, call it with the // current result. If isTypeOf returns false, then raise an error rather // than continuing execution. @@ -1405,7 +1472,7 @@ function completeObjectValue( fieldGroup, path, result, - incrementalDataRecord, + errors, deferMap, ); }); @@ -1422,7 +1489,7 @@ function completeObjectValue( fieldGroup, path, result, - incrementalDataRecord, + errors, deferMap, ); } @@ -1456,46 +1523,25 @@ function invalidReturnTypeError( * */ function addNewDeferredFragments( - incrementalPublisher: IncrementalPublisher, newDeferUsages: ReadonlyArray, - incrementalDataRecord: IncrementalDataRecord, - deferMap?: ReadonlyMap, + newDeferMap: Map, path?: Path | undefined, ): ReadonlyMap { - if (newDeferUsages.length === 0) { - // Given no DeferUsages, return the existing map, creating one if necessary. - return deferMap ?? new Map(); - } - - // Create a copy of the old map. - const newDeferMap = - deferMap === undefined - ? new Map() - : new Map(deferMap); - // For each new deferUsage object: for (const newDeferUsage of newDeferUsages) { const parentDeferUsage = newDeferUsage.parentDeferUsage; - // If the parent defer usage is not defined, the parent result record is either: - // - the InitialResultRecord, or - // - a StreamItemsRecord, as `@defer` may be nested under `@stream`. const parent = parentDeferUsage === undefined - ? (incrementalDataRecord as InitialResultRecord | StreamItemsRecord) + ? undefined : deferredFragmentRecordFromDeferUsage(parentDeferUsage, newDeferMap); // Instantiate the new record. const deferredFragmentRecord = new DeferredFragmentRecord({ path, label: newDeferUsage.label, - }); - - // Report the new record to the Incremental Publisher. - incrementalPublisher.reportNewDeferFragmentRecord( - deferredFragmentRecord, parent, - ); + }); // Update the map. newDeferMap.set(newDeferUsage, deferredFragmentRecord); @@ -1512,74 +1558,22 @@ function deferredFragmentRecordFromDeferUsage( return deferMap.get(deferUsage)!; } -function addNewDeferredGroupedFieldSets( - incrementalPublisher: IncrementalPublisher, - newGroupedFieldSetDetailsMap: Map, - deferMap: ReadonlyMap, - path?: Path | undefined, -): ReadonlyArray { - const newDeferredGroupedFieldSetRecords: Array = - []; - - for (const [ - deferUsageSet, - { groupedFieldSet, shouldInitiateDefer }, - ] of newGroupedFieldSetDetailsMap) { - const deferredFragmentRecords = getDeferredFragmentRecords( - deferUsageSet, - deferMap, - ); - const deferredGroupedFieldSetRecord = new DeferredGroupedFieldSetRecord({ - path, - deferredFragmentRecords, - groupedFieldSet, - shouldInitiateDefer, - }); - incrementalPublisher.reportNewDeferredGroupedFieldSetRecord( - deferredGroupedFieldSetRecord, - ); - newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord); - } - - return newDeferredGroupedFieldSetRecords; -} - -function getDeferredFragmentRecords( - deferUsages: DeferUsageSet, - deferMap: ReadonlyMap, -): ReadonlyArray { - return Array.from(deferUsages).map((deferUsage) => - deferredFragmentRecordFromDeferUsage(deferUsage, deferMap), - ); -} - function collectAndExecuteSubfields( exeContext: ExecutionContext, returnType: GraphQLObjectType, fieldGroup: FieldGroup, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord, - deferMap: ReadonlyMap, -): PromiseOrValue> { + errors: Array, + deferMap: ReadonlyMap | undefined, +): PromiseOrValue>> { // Collect sub-fields to execute to complete this value. - const { groupedFieldSet, newGroupedFieldSetDetailsMap, newDeferUsages } = + const { groupedFieldSet, newGroupedFieldSets, newDeferUsages } = buildSubFieldPlan(exeContext, returnType, fieldGroup); - const incrementalPublisher = exeContext.incrementalPublisher; - const newDeferMap = addNewDeferredFragments( - incrementalPublisher, newDeferUsages, - incrementalDataRecord, - deferMap, - path, - ); - - const newDeferredGroupedFieldSetRecords = addNewDeferredGroupedFieldSets( - incrementalPublisher, - newGroupedFieldSetDetailsMap, - newDeferMap, + new Map(deferMap), path, ); @@ -1589,20 +1583,24 @@ function collectAndExecuteSubfields( result, path, groupedFieldSet, - incrementalDataRecord, + errors, newDeferMap, ); - executeDeferredGroupedFieldSets( + const newDeferredGroupedFieldSetRecords = executeDeferredGroupedFieldSets( exeContext, returnType, result, path, - newDeferredGroupedFieldSetRecords, + fieldGroup.deferUsages, + newGroupedFieldSets, newDeferMap, ); - return subFields; + return withNewDeferredGroupedFieldSets( + subFields, + newDeferredGroupedFieldSetRecords, + ); } /** @@ -1902,343 +1900,407 @@ function executeDeferredGroupedFieldSets( parentType: GraphQLObjectType, sourceValue: unknown, path: Path | undefined, - newDeferredGroupedFieldSetRecords: ReadonlyArray, + parentDeferUsages: DeferUsageSet | undefined, + newGroupedFieldSets: Map, deferMap: ReadonlyMap, -): void { - for (const deferredGroupedFieldSetRecord of newDeferredGroupedFieldSetRecords) { - if (deferredGroupedFieldSetRecord.shouldInitiateDefer) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - Promise.resolve().then(() => - executeDeferredGroupedFieldSet( - exeContext, - parentType, - sourceValue, - path, - deferredGroupedFieldSetRecord, - deferMap, - ), - ); - continue; - } +): ReadonlyArray { + const newDeferredGroupedFieldSetRecords: Array = + []; - executeDeferredGroupedFieldSet( - exeContext, - parentType, - sourceValue, - path, - deferredGroupedFieldSetRecord, + for (const [deferUsageSet, groupedFieldSet] of newGroupedFieldSets) { + const deferredFragmentRecords = getDeferredFragmentRecords( + deferUsageSet, deferMap, ); + + const executor = () => + executeDeferredGroupedFieldSet( + deferredFragmentRecords, + exeContext, + parentType, + sourceValue, + path, + groupedFieldSet, + [], + deferMap, + ); + + const deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord = { + deferredFragmentRecords, + result: shouldDefer(parentDeferUsages, deferUsageSet) + ? Promise.resolve().then(executor) + : executor(), + }; + + newDeferredGroupedFieldSetRecords.push(deferredGroupedFieldSetRecord); } + + return newDeferredGroupedFieldSetRecords; +} + +function shouldDefer( + parentDeferUsages: undefined | DeferUsageSet, + deferUsages: DeferUsageSet, +): boolean { + // If we have a new child defer usage, defer. + // Otherwise, this defer usage was already deferred when it was initially + // encountered, and is now in the midst of executing early, so the new + // deferred grouped fields set can be executed immediately. + return ( + parentDeferUsages === undefined || + !Array.from(deferUsages).every((deferUsage) => + parentDeferUsages.has(deferUsage), + ) + ); } function executeDeferredGroupedFieldSet( + deferredFragmentRecords: ReadonlyArray, exeContext: ExecutionContext, parentType: GraphQLObjectType, sourceValue: unknown, path: Path | undefined, - deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord, + groupedFieldSet: GroupedFieldSet, + errors: Array, deferMap: ReadonlyMap, -): void { +): PromiseOrValue { + let result; try { - const incrementalResult = executeFields( + result = executeFields( exeContext, parentType, sourceValue, path, - deferredGroupedFieldSetRecord.groupedFieldSet, - deferredGroupedFieldSetRecord, + groupedFieldSet, + errors, deferMap, ); - - if (isPromise(incrementalResult)) { - incrementalResult.then( - (resolved) => - exeContext.incrementalPublisher.completeDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord, - resolved, - ), - (error) => - exeContext.incrementalPublisher.markErroredDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord, - error, - ), - ); - return; - } - - exeContext.incrementalPublisher.completeDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord, - incrementalResult, - ); } catch (error) { - exeContext.incrementalPublisher.markErroredDeferredGroupedFieldSet( - deferredGroupedFieldSetRecord, - error, + return { + deferredFragmentRecords, + path: pathToArray(path), + errors: withError(errors, error), + }; + } + + if (isPromise(result)) { + return result.then( + (resolved) => + buildDeferredGroupedFieldSetResult( + errors, + deferredFragmentRecords, + path, + resolved, + ), + (error) => ({ + deferredFragmentRecords, + path: pathToArray(path), + errors: withError(errors, error), + }), ); } + + return buildDeferredGroupedFieldSetResult( + errors, + deferredFragmentRecords, + path, + result, + ); } -function executeStreamField( - path: Path, - itemPath: Path, - item: PromiseOrValue, +function buildDeferredGroupedFieldSetResult( + errors: ReadonlyArray, + deferredFragmentRecords: ReadonlyArray, + path: Path | undefined, + result: GraphQLWrappedResult>, +): DeferredGroupedFieldSetResult { + return { + deferredFragmentRecords, + path: pathToArray(path), + result: + errors.length === 0 ? { data: result[0] } : { data: result[0], errors }, + incrementalDataRecords: result[1], + }; +} + +function getDeferredFragmentRecords( + deferUsages: DeferUsageSet, + deferMap: ReadonlyMap, +): ReadonlyArray { + return Array.from(deferUsages).map((deferUsage) => + deferredFragmentRecordFromDeferUsage(deferUsage, deferMap), + ); +} + +function firstSyncStreamItems( + streamRecord: SubsequentResultRecord, + initialItem: PromiseOrValue, + initialIndex: number, + iterator: Iterator, exeContext: ExecutionContext, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, - incrementalDataRecord: IncrementalDataRecord, - streamRecord: StreamRecord, ): StreamItemsRecord { - const incrementalPublisher = exeContext.incrementalPublisher; - const streamItemsRecord = new StreamItemsRecord({ - streamRecord, - path: itemPath, - }); - incrementalPublisher.reportNewStreamItemsRecord( - streamItemsRecord, - incrementalDataRecord, - ); + const path = streamRecord.path; + const initialPath = addPath(path, initialIndex, undefined); - if (isPromise(item)) { - completePromisedValue( - exeContext, - itemType, - fieldGroup, - info, - itemPath, - item, - streamItemsRecord, - new Map(), - ).then( - (value) => - incrementalPublisher.completeStreamItemsRecord(streamItemsRecord, [ - value, - ]), - (error) => { - incrementalPublisher.filter(path, streamItemsRecord); - incrementalPublisher.markErroredStreamItemsRecord( - streamItemsRecord, - error, - ); - }, - ); - - return streamItemsRecord; - } - - let completedItem: PromiseOrValue; - try { - try { - completedItem = completeValue( + const firstStreamItems: StreamItemsRecord = { + streamRecord, + result: Promise.resolve().then(() => { + let result = completeStreamItems( + streamRecord, + initialPath, + initialItem, exeContext, - itemType, + [], fieldGroup, info, - itemPath, - item, - streamItemsRecord, - new Map(), - ); - } catch (rawError) { - handleFieldError( - rawError, - exeContext, itemType, - fieldGroup, - itemPath, - streamItemsRecord, ); - completedItem = null; - incrementalPublisher.filter(itemPath, streamItemsRecord); - } - } catch (error) { - incrementalPublisher.filter(path, streamItemsRecord); - incrementalPublisher.markErroredStreamItemsRecord(streamItemsRecord, error); - return streamItemsRecord; - } - - if (isPromise(completedItem)) { - completedItem - .then(undefined, (rawError) => { - handleFieldError( - rawError, + const results = [result]; + let currentIndex = initialIndex; + let iteration = iterator.next(); + let erroredSynchronously = false; + while (!iteration.done) { + if (!isPromise(result) && !isReconcilableStreamItemsResult(result)) { + erroredSynchronously = true; + break; + } + const item = iteration.value; + currentIndex++; + const currentPath = addPath(path, currentIndex, undefined); + result = completeStreamItems( + streamRecord, + currentPath, + item, exeContext, - itemType, + [], fieldGroup, - itemPath, - streamItemsRecord, + info, + itemType, ); - incrementalPublisher.filter(itemPath, streamItemsRecord); - return null; - }) - .then( - (value) => - incrementalPublisher.completeStreamItemsRecord(streamItemsRecord, [ - value, - ]), - (error) => { - incrementalPublisher.filter(path, streamItemsRecord); - incrementalPublisher.markErroredStreamItemsRecord( - streamItemsRecord, - error, - ); - }, - ); + results.push(result); + iteration = iterator.next(); + } + + currentIndex = results.length - 1; + // If a non-reconcilable stream items result was encountered, then the stream terminates in error. + // Otherwise, add a stream terminator. + let currentResult = erroredSynchronously + ? results[currentIndex] + : prependNextStreamItems(results[currentIndex], { + streamRecord, + result: { streamRecord }, + }); + + while (currentIndex-- > 0) { + currentResult = prependNextStreamItems(results[currentIndex], { + streamRecord, + result: currentResult, + }); + } + + return currentResult; + }), + }; + return firstStreamItems; +} - return streamItemsRecord; +function prependNextStreamItems( + result: PromiseOrValue, + nextStreamItems: StreamItemsRecord, +): PromiseOrValue { + if (isPromise(result)) { + return result.then((resolved) => + prependNextResolvedStreamItems(resolved, nextStreamItems), + ); } + return prependNextResolvedStreamItems(result, nextStreamItems); +} - incrementalPublisher.completeStreamItemsRecord(streamItemsRecord, [ - completedItem, - ]); - return streamItemsRecord; +function prependNextResolvedStreamItems( + result: StreamItemsResult, + nextStreamItems: StreamItemsRecord, +): StreamItemsResult { + return isReconcilableStreamItemsResult(result) + ? { + ...result, + incrementalDataRecords: [ + nextStreamItems, + ...result.incrementalDataRecords, + ], + } + : result; } -async function executeStreamAsyncIteratorItem( +function firstAsyncStreamItems( + streamRecord: SubsequentResultRecord, + path: Path, + initialIndex: number, asyncIterator: AsyncIterator, exeContext: ExecutionContext, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, - streamItemsRecord: StreamItemsRecord, - itemPath: Path, -): Promise> { - let item; - try { - const iteration = await asyncIterator.next(); - if (streamItemsRecord.streamRecord.errors.length > 0) { - return { done: true, value: undefined }; - } - if (iteration.done) { - exeContext.incrementalPublisher.setIsCompletedAsyncIterator( - streamItemsRecord, - ); - return { done: true, value: undefined }; - } - item = iteration.value; - } catch (rawError) { - throw locatedError( - rawError, - toNodes(fieldGroup), - streamItemsRecord.streamRecord.path, - ); - } - let completedItem; - try { - completedItem = completeValue( +): StreamItemsRecord { + const firstStreamItems: StreamItemsRecord = { + streamRecord, + result: getNextAsyncStreamItemsResult( + streamRecord, + path, + initialIndex, + asyncIterator, exeContext, - itemType, fieldGroup, info, - itemPath, - item, - streamItemsRecord, - new Map(), - ); - - if (isPromise(completedItem)) { - completedItem = completedItem.then(undefined, (rawError) => { - handleFieldError( - rawError, - exeContext, - itemType, - fieldGroup, - itemPath, - streamItemsRecord, - ); - exeContext.incrementalPublisher.filter(itemPath, streamItemsRecord); - return null; - }); - } - return { done: false, value: completedItem }; - } catch (rawError) { - handleFieldError( - rawError, - exeContext, itemType, - fieldGroup, - itemPath, - streamItemsRecord, - ); - exeContext.incrementalPublisher.filter(itemPath, streamItemsRecord); - return { done: false, value: null }; - } + ), + }; + return firstStreamItems; } -async function executeStreamAsyncIterator( - initialIndex: number, +async function getNextAsyncStreamItemsResult( + streamRecord: SubsequentResultRecord, + path: Path, + index: number, asyncIterator: AsyncIterator, exeContext: ExecutionContext, fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, - path: Path, - incrementalDataRecord: IncrementalDataRecord, - streamRecord: StreamRecord, -): Promise { - const incrementalPublisher = exeContext.incrementalPublisher; - let index = initialIndex; - let currentIncrementalDataRecord = incrementalDataRecord; - // eslint-disable-next-line no-constant-condition - while (true) { - const itemPath = addPath(path, index, undefined); - const streamItemsRecord = new StreamItemsRecord({ +): Promise { + let iteration; + try { + iteration = await asyncIterator.next(); + } catch (error) { + return { streamRecord, - path: itemPath, - }); - incrementalPublisher.reportNewStreamItemsRecord( - streamItemsRecord, - currentIncrementalDataRecord, + errors: [locatedError(error, toNodes(fieldGroup), pathToArray(path))], + }; + } + + if (iteration.done) { + return { streamRecord }; + } + + const itemPath = addPath(path, index, undefined); + + const result = completeStreamItems( + streamRecord, + itemPath, + iteration.value, + exeContext, + [], + fieldGroup, + info, + itemType, + ); + + const nextStreamItems: StreamItemsRecord = { + streamRecord, + result: getNextAsyncStreamItemsResult( + streamRecord, + path, + index, + asyncIterator, + exeContext, + fieldGroup, + info, + itemType, + ), + }; + + return prependNextStreamItems(result, nextStreamItems); +} + +function completeStreamItems( + streamRecord: SubsequentResultRecord, + itemPath: Path, + item: unknown, + exeContext: ExecutionContext, + errors: Array, + fieldGroup: FieldGroup, + info: GraphQLResolveInfo, + itemType: GraphQLOutputType, +): PromiseOrValue { + if (isPromise(item)) { + return completePromisedValue( + exeContext, + itemType, + fieldGroup, + info, + itemPath, + item, + errors, + new Map(), + ).then( + (resolvedItem) => + buildStreamItemsResult(errors, streamRecord, resolvedItem), + (error) => ({ + streamRecord, + errors: withError(errors, error), + }), ); + } - let iteration; + let result: PromiseOrValue>; + try { try { - // eslint-disable-next-line no-await-in-loop - iteration = await executeStreamAsyncIteratorItem( - asyncIterator, + result = completeValue( exeContext, + itemType, fieldGroup, info, - itemType, - streamItemsRecord, itemPath, + item, + errors, + new Map(), ); - } catch (error) { - incrementalPublisher.filter(path, streamItemsRecord); - incrementalPublisher.markErroredStreamItemsRecord( - streamItemsRecord, - error, - ); - return; + } catch (rawError) { + handleFieldError(rawError, itemType, fieldGroup, itemPath, errors); + result = [null, []]; } + } catch (error) { + return { + streamRecord, + errors: withError(errors, error), + }; + } - const { done, value: completedItem } = iteration; - - if (isPromise(completedItem)) { - completedItem.then( - (value) => - incrementalPublisher.completeStreamItemsRecord(streamItemsRecord, [ - value, - ]), - (error) => { - incrementalPublisher.filter(path, streamItemsRecord); - incrementalPublisher.markErroredStreamItemsRecord( - streamItemsRecord, - error, - ); - }, + if (isPromise(result)) { + return result + .then(undefined, (rawError) => { + handleFieldError(rawError, itemType, fieldGroup, itemPath, errors); + return [null, []] as GraphQLWrappedResult; + }) + .then( + (resolvedItem) => + buildStreamItemsResult(errors, streamRecord, resolvedItem), + (error) => ({ + streamRecord, + errors: withError(errors, error), + }), ); - } else { - incrementalPublisher.completeStreamItemsRecord(streamItemsRecord, [ - completedItem, - ]); - } - - if (done) { - break; - } - currentIncrementalDataRecord = streamItemsRecord; - index++; } + + return buildStreamItemsResult(errors, streamRecord, result); +} + +function buildStreamItemsResult( + errors: ReadonlyArray, + streamRecord: SubsequentResultRecord, + result: GraphQLWrappedResult, +): StreamItemsResult { + return { + streamRecord, + result: + errors.length === 0 + ? { items: [result[0]] } + : { + items: [result[0]], + errors: [...errors], + }, + incrementalDataRecords: result[1], + }; } diff --git a/src/jsutils/promiseForObject.ts b/src/jsutils/promiseForObject.ts index ff48d9f218..25b3413923 100644 --- a/src/jsutils/promiseForObject.ts +++ b/src/jsutils/promiseForObject.ts @@ -7,9 +7,10 @@ import type { ObjMap } from './ObjMap.js'; * This is akin to bluebird's `Promise.props`, but implemented only using * `Promise.all` so it will work with any implementation of ES6 promises. */ -export async function promiseForObject( +export async function promiseForObject( object: ObjMap>, -): Promise> { + callback: (object: ObjMap) => U, +): Promise { const keys = Object.keys(object); const values = Object.values(object); @@ -18,5 +19,5 @@ export async function promiseForObject( for (let i = 0; i < keys.length; ++i) { resolvedObject[keys[i]] = resolvedValues[i]; } - return resolvedObject; + return callback(resolvedObject); }