Skip to content

Commit

Permalink
remove changes to stream from this PR
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed May 30, 2024
1 parent e3b52c3 commit 1aaaa3e
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 235 deletions.
113 changes: 31 additions & 82 deletions src/execution/IncrementalGraph.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import { isPromise } from '../jsutils/isPromise.js';
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

import type { GraphQLError } from '../error/GraphQLError.js';

import type {
CompletedDeferredGroupedFieldSet,
CompletedIncrementalData,
CompletedReconcilableDeferredGroupedFieldSet,
DeferredFragmentRecord,
DeferredGroupedFieldSetRecord,
IncrementalDataRecord,
StreamItemRecord,
StreamItemsRecord,
StreamRecord,
SubsequentResultRecord,
} from './types.js';
import { isDeferredGroupedFieldSetRecord } from './types.js';
import {
isDeferredGroupedFieldSetRecord,
isStreamItemsRecord,
} from './types.js';

interface DeferredFragmentNode {
deferredFragmentRecord: DeferredFragmentRecord;
Expand All @@ -30,9 +31,9 @@ function isDeferredFragmentNode(
}

function isStreamNode(
record: SubsequentResultNode | IncrementalDataRecord,
): record is StreamRecord {
return 'streamItemRecords' in record;
subsequentResultNode: SubsequentResultNode,
): subsequentResultNode is StreamRecord {
return 'path' in subsequentResultNode;
}

type SubsequentResultNode = DeferredFragmentNode | StreamRecord;
Expand Down Expand Up @@ -70,7 +71,7 @@ export class IncrementalGraph {
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
} else {
this._addStreamRecord(incrementalDataRecord);
this._addStreamItemsRecord(incrementalDataRecord);
}
}
}
Expand Down Expand Up @@ -100,7 +101,6 @@ export class IncrementalGraph {
if (isStreamNode(node)) {
this._pending.add(node);
newPending.add(node);
this._newIncrementalDataRecords.add(node);
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
Expand All @@ -116,12 +116,22 @@ export class IncrementalGraph {
this._newPending.clear();

for (const incrementalDataRecord of this._newIncrementalDataRecords) {
if (isStreamNode(incrementalDataRecord)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this._onStreamItems(
incrementalDataRecord,
incrementalDataRecord.streamItemRecords,
);
if (isStreamItemsRecord(incrementalDataRecord)) {
const result = incrementalDataRecord.streamItemsResult.value;
if (isPromise(result)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
result.then((resolved) =>
this._enqueue({
streamItemsRecord: incrementalDataRecord,
streamItemsResult: resolved,
}),
);
} else {
this._enqueue({
streamItemsRecord: incrementalDataRecord,
streamItemsResult: result,
});
}
} else {
const result =
incrementalDataRecord.deferredGroupedFieldSetResult.value;
Expand Down Expand Up @@ -264,8 +274,12 @@ export class IncrementalGraph {
}
}

private _addStreamRecord(streamRecord: StreamRecord): void {
this._newPending.add(streamRecord);
private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void {
const streamRecord = streamItemsRecord.streamRecord;
if (!this._pending.has(streamRecord)) {
this._newPending.add(streamRecord);
}
this._newIncrementalDataRecords.add(streamItemsRecord);
}

private _addDeferredFragmentNode(
Expand Down Expand Up @@ -297,71 +311,6 @@ export class IncrementalGraph {
return deferredFragmentNode;
}

private async _onStreamItems(
streamRecord: StreamRecord,
streamItemRecords: Array<StreamItemRecord>,
): Promise<void> {
let items: Array<unknown> = [];
let errors: Array<GraphQLError> = [];
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
let streamItemRecord: StreamItemRecord | undefined;
while ((streamItemRecord = streamItemRecords.shift()) !== undefined) {
let result = streamItemRecord.value;
if (isPromise(result)) {
if (items.length > 0) {
this._enqueue({
streamRecord,
streamItemsResult: {
result:
// TODO add additional test case or rework for coverage
errors.length > 0 /* c8 ignore start */
? { items, errors } /* c8 ignore stop */
: { items },
incrementalDataRecords,
},
});
items = [];
errors = [];
incrementalDataRecords = [];
}
// eslint-disable-next-line no-await-in-loop
result = await result;
// wait an additional tick to coalesce resolving additional promises
// within the queue
// eslint-disable-next-line no-await-in-loop
await Promise.resolve();
}
if (result.item === undefined) {
if (items.length > 0) {
this._enqueue({
streamRecord,
streamItemsResult: {
result: errors.length > 0 ? { items, errors } : { items },
incrementalDataRecords,
},
});
}
this._enqueue({
streamRecord,
streamItemsResult:
result.errors === undefined
? {}
: {
errors: result.errors,
},
});
return;
}
items.push(result.item);
if (result.errors !== undefined) {
errors.push(...result.errors);
}
if (result.incrementalDataRecords !== undefined) {
incrementalDataRecords.push(...result.incrementalDataRecords);
}
}
}

private *_yieldCurrentCompletedIncrementalData(
first: CompletedIncrementalData,
): Generator<CompletedIncrementalData> {
Expand Down
5 changes: 3 additions & 2 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,9 @@ class IncrementalPublisher {
completedStreamItems: CompletedStreamItems,
context: SubsequentIncrementalExecutionResultContext,
): void {
const { streamRecord, streamItemsResult } = completedStreamItems;
const id = streamRecord.id;
const { streamItemsRecord, streamItemsResult } = completedStreamItems;
const streamRecord = streamItemsRecord.streamRecord;
const id = streamItemsRecord.streamRecord.id;
invariant(id !== undefined);
if (streamItemsResult.errors !== undefined) {
this._incrementalGraph.removeStream(streamRecord);
Expand Down
Loading

0 comments on commit 1aaaa3e

Please sign in to comment.