Skip to content

Commit

Permalink
incremental: handle Stream as stream rather than linked list (#4098)
Browse files Browse the repository at this point in the history
The incremental graph can handle a stream as a stream, rather than
creating a linked list where each incremental data record also includes
the next record in addition to any new defers and/or streams.

Enables easily batching all available stream items within the same
incremental entry.

Depends on #4094
  • Loading branch information
yaacovCR committed Jun 12, 2024
1 parent 07d5025 commit 89f9223
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 289 deletions.
97 changes: 82 additions & 15 deletions src/execution/IncrementalGraph.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { isPromise } from '../jsutils/isPromise.js';
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

import type { GraphQLError } from '../error/GraphQLError.js';

import type {
DeferredFragmentRecord,
DeferredGroupedFieldSetRecord,
IncrementalDataRecord,
IncrementalDataRecordResult,
ReconcilableDeferredGroupedFieldSetResult,
StreamItemsRecord,
StreamItemRecord,
StreamRecord,
SubsequentResultRecord,
} from './types.js';
Expand All @@ -27,9 +29,9 @@ function isDeferredFragmentNode(
}

function isStreamNode(
subsequentResultNode: SubsequentResultNode,
): subsequentResultNode is StreamRecord {
return 'path' in subsequentResultNode;
record: SubsequentResultNode | IncrementalDataRecord,
): record is StreamRecord {
return 'streamItemQueue' in record;
}

type SubsequentResultNode = DeferredFragmentNode | StreamRecord;
Expand Down Expand Up @@ -67,7 +69,7 @@ export class IncrementalGraph {
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
} else {
this._addStreamItemsRecord(incrementalDataRecord);
this._addStreamRecord(incrementalDataRecord);
}
}
}
Expand Down Expand Up @@ -95,6 +97,7 @@ export class IncrementalGraph {
if (isStreamNode(node)) {
this._pending.add(node);
newPending.push(node);
this._newIncrementalDataRecords.add(node);
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
Expand All @@ -110,12 +113,20 @@ export class IncrementalGraph {
this._newPending.clear();

for (const incrementalDataRecord of this._newIncrementalDataRecords) {
const result = incrementalDataRecord.result.value;
if (isPromise(result)) {
if (isStreamNode(incrementalDataRecord)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
result.then((resolved) => this._enqueue(resolved));
this._onStreamItems(
incrementalDataRecord,
incrementalDataRecord.streamItemQueue,
);
} else {
this._enqueue(result);
const result = incrementalDataRecord.result.value;
if (isPromise(result)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
result.then((resolved) => this._enqueue(resolved));
} else {
this._enqueue(result);
}
}
}
this._newIncrementalDataRecords.clear();
Expand Down Expand Up @@ -246,12 +257,8 @@ export class IncrementalGraph {
}
}

private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void {
const streamRecord = streamItemsRecord.streamRecord;
if (!this._pending.has(streamRecord)) {
this._newPending.add(streamRecord);
}
this._newIncrementalDataRecords.add(streamItemsRecord);
private _addStreamRecord(streamRecord: StreamRecord): void {
this._newPending.add(streamRecord);
}

private _addDeferredFragmentNode(
Expand Down Expand Up @@ -283,6 +290,66 @@ export class IncrementalGraph {
return deferredFragmentNode;
}

private async _onStreamItems(
streamRecord: StreamRecord,
streamItemQueue: Array<StreamItemRecord>,
): Promise<void> {
let items: Array<unknown> = [];
let errors: Array<GraphQLError> = [];
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
let streamItemRecord: StreamItemRecord | undefined;
while ((streamItemRecord = streamItemQueue.shift()) !== undefined) {
let result = streamItemRecord.value;
if (isPromise(result)) {
if (items.length > 0) {
this._enqueue({
streamRecord,
result:
// TODO add additional test case or rework for coverage
errors.length > 0 /* c8 ignore start */
? { items, errors } /* c8 ignore stop */
: { items },
incrementalDataRecords,
});
items = [];
errors = [];
incrementalDataRecords = [];
}
// eslint-disable-next-line no-await-in-loop
result = await result;
// wait an additional tick to coalesce resolving additional promises
// within the queue
// eslint-disable-next-line no-await-in-loop
await Promise.resolve();
}
if (result.item === undefined) {
if (items.length > 0) {
this._enqueue({
streamRecord,
result: errors.length > 0 ? { items, errors } : { items },
incrementalDataRecords,
});
}
this._enqueue(
result.errors === undefined
? { streamRecord }
: {
streamRecord,
errors: result.errors,
},
);
return;
}
items.push(result.item);
if (result.errors !== undefined) {
errors.push(...result.errors);
}
if (result.incrementalDataRecords !== undefined) {
incrementalDataRecords.push(...result.incrementalDataRecords);
}
}
}

private *_yieldCurrentCompletedIncrementalData(
first: IncrementalDataRecordResult,
): Generator<IncrementalDataRecordResult> {
Expand Down
Loading

0 comments on commit 89f9223

Please sign in to comment.