Skip to content

Commit

Permalink
optimize further
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Mar 19, 2024
1 parent 1e89149 commit 3b971c5
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 238 deletions.
181 changes: 108 additions & 73 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,15 @@ export interface FormattedSubsequentIncrementalExecutionResult<
extensions?: TExtensions;
}

interface RawDeferResult<TData = ObjMap<unknown>> {
errors?: ReadonlyArray<GraphQLError>;
data: TData;
}

export interface IncrementalDeferResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> {
errors?: ReadonlyArray<GraphQLError>;
data: TData;
> extends RawDeferResult<TData> {
id: string;
subPath?: ReadonlyArray<string | number>;
extensions?: TExtensions;
Expand All @@ -115,12 +118,15 @@ export interface FormattedIncrementalDeferResult<
extensions?: TExtensions;
}

export interface IncrementalStreamResult<
TData = Array<unknown>,
TExtensions = ObjMap<unknown>,
> {
interface RawStreamItemsResult<TData = ReadonlyArray<unknown>> {
errors?: ReadonlyArray<GraphQLError>;
items: TData;
}

export interface IncrementalStreamResult<
TData = ReadonlyArray<unknown>,
TExtensions = ObjMap<unknown>,
> extends RawStreamItemsResult<TData> {
id: string;
subPath?: ReadonlyArray<string | number>;
extensions?: TExtensions;
Expand Down Expand Up @@ -166,23 +172,27 @@ export interface FormattedCompletedResult {
}

export function buildIncrementalResponse(
context: IncrementalPublisherContext,
result: ObjMap<unknown>,
errors: ReadonlyArray<GraphQLError>,
errors: ReadonlyArray<GraphQLError> | undefined,
futures: ReadonlyArray<Future>,
cancellableStreams: Set<StreamRecord>,
): ExperimentalIncrementalExecutionResults {
const incrementalPublisher = new IncrementalPublisher(cancellableStreams);
const incrementalPublisher = new IncrementalPublisher(context);
return incrementalPublisher.buildResponse(result, errors, futures);
}

interface IncrementalPublisherContext {
cancellableStreams?: Set<StreamRecord> | undefined;
}

/**
* This class is used to publish incremental results to the client, enabling semi-concurrent
* execution while preserving result order.
*
* @internal
*/
class IncrementalPublisher {
private _cancellableStreams: Set<StreamRecord>;
private _context: IncrementalPublisherContext;
private _nextId: number;
private _pending: Set<SubsequentResultRecord>;
private _completedResultQueue: Array<FutureResult>;
Expand All @@ -193,8 +203,8 @@ class IncrementalPublisher {
private _signalled!: Promise<unknown>;
private _resolve!: () => void;

constructor(cancellableStreams: Set<StreamRecord>) {
this._cancellableStreams = cancellableStreams;
constructor(context: IncrementalPublisherContext) {
this._context = context;
this._nextId = 0;
this._pending = new Set();
this._completedResultQueue = [];
Expand All @@ -206,7 +216,7 @@ class IncrementalPublisher {

buildResponse(
data: ObjMap<unknown>,
errors: ReadonlyArray<GraphQLError>,
errors: ReadonlyArray<GraphQLError> | undefined,
futures: ReadonlyArray<Future>,
): ExperimentalIncrementalExecutionResults {
this._addFutures(futures);
Expand All @@ -215,7 +225,7 @@ class IncrementalPublisher {
const pending = this._pendingSourcesToResults();

const initialResult: InitialIncrementalExecutionResult =
errors.length === 0
errors === undefined
? { data, pending, hasNext: true }
: { errors, data, pending, hasNext: true };

Expand Down Expand Up @@ -425,8 +435,12 @@ class IncrementalPublisher {
};

const returnStreamIterators = async (): Promise<void> => {
const cancellableStreams = this._context.cancellableStreams;
if (cancellableStreams === undefined) {
return;
}
const promises: Array<Promise<unknown>> = [];
for (const streamRecord of this._cancellableStreams) {
for (const streamRecord of cancellableStreams) {
if (streamRecord.earlyReturn !== undefined) {
promises.push(streamRecord.earlyReturn());
}
Expand Down Expand Up @@ -475,27 +489,36 @@ class IncrementalPublisher {
}

private _handleCompletedDeferredGroupedFieldSet(
result: DeferredGroupedFieldSetResult,
deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult,
): void {
if (!isReconcilableDeferredGroupedFieldSetResult(result)) {
for (const deferredFragmentRecord of result.deferredFragmentRecords) {
if (
isNonReconcilableDeferredGroupedFieldSetResult(
deferredGroupedFieldSetResult,
)
) {
for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) {
const id = deferredFragmentRecord.id;
if (id !== undefined) {
this._completed.push({ id, errors: result.errors });
this._completed.push({
id,
errors: deferredGroupedFieldSetResult.errors,
});
this._pending.delete(deferredFragmentRecord);
}
}
return;
}
for (const deferredFragmentRecord of result.deferredFragmentRecords) {
deferredFragmentRecord.reconcilableResults.push(result);
for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) {
deferredFragmentRecord.reconcilableResults.push(
deferredGroupedFieldSetResult,
);
}

if (result.futures) {
this._addFutures(result.futures);
if (deferredGroupedFieldSetResult.futures) {
this._addFutures(deferredGroupedFieldSetResult.futures);
}

for (const deferredFragmentRecord of result.deferredFragmentRecords) {
for (const deferredFragmentRecord of deferredGroupedFieldSetResult.deferredFragmentRecords) {
const id = deferredFragmentRecord.id;
// TODO: add test case for this.
// Presumably, this can occur if an error causes a fragment to be completed early,
Expand All @@ -522,12 +545,9 @@ class IncrementalPublisher {
fragmentResult,
);
const incrementalEntry: IncrementalDeferResult = {
data: fragmentResult.data,
...fragmentResult.result,
id: bestId,
};
if (result.errors.length > 0) {
incrementalEntry.errors = fragmentResult.errors;
}
if (subPath !== undefined) {
incrementalEntry.subPath = subPath;
}
Expand All @@ -548,44 +568,48 @@ class IncrementalPublisher {
this._pruneEmpty();
}

private _handleCompletedStreamItems(result: StreamItemsResult): void {
const streamRecord = result.streamRecord;
private _handleCompletedStreamItems(
streamItemsResult: StreamItemsResult,
): void {
const streamRecord = streamItemsResult.streamRecord;
const id = streamRecord.id;
// TODO: Consider adding invariant or non-null assertion, as this should never happen. Since the stream is converted into a linked list
// for ordering purposes, if an entry errors, additional entries will not be processed.
/* c8 ignore next 3 */
if (id === undefined) {
return;
}
if (result.items === undefined) {
if (streamItemsResult.result === undefined) {
this._completed.push({ id });
this._pending.delete(streamRecord);
this._cancellableStreams.delete(streamRecord);
} else if (result.items === null) {
const cancellableStreams = this._context.cancellableStreams;
if (cancellableStreams !== undefined) {
cancellableStreams.delete(streamRecord);
}
} else if (streamItemsResult.result === null) {
this._completed.push({
id,
errors: result.errors,
errors: streamItemsResult.errors,
});
this._pending.delete(streamRecord);
this._cancellableStreams.delete(streamRecord);
const cancellableStreams = this._context.cancellableStreams;
if (cancellableStreams !== undefined) {
cancellableStreams.delete(streamRecord);
}
streamRecord.earlyReturn?.().catch(() => {
/* c8 ignore next 1 */
// ignore error
});
} else {
const incrementalEntry: IncrementalStreamResult = {
id,
items: result.items as Array<unknown>, // FIX!
...streamItemsResult.result,
};

if (result.errors !== undefined && result.errors.length > 0) {
incrementalEntry.errors = result.errors;
}

this._incremental.push(incrementalEntry);

if (result.futures) {
this._addFutures(result.futures);
if (streamItemsResult.futures) {
this._addFutures(streamItemsResult.futures);
this._pruneEmpty();
}
}
Expand Down Expand Up @@ -639,35 +663,39 @@ export function isDeferredGroupedFieldSetRecord(
export interface IncrementalContext {
deferUsageSet: DeferUsageSet | undefined;
path: Path | undefined;
errors: Array<GraphQLError>;
errorPaths: Set<Path>;
futures: Array<Future>;
errors?: Map<Path | undefined, GraphQLError> | undefined;
futures?: Array<Future> | undefined;
}

export interface DeferredGroupedFieldSetResult {
deferredFragmentRecords: ReadonlyArray<DeferredFragmentRecord>;
path: Array<string | number>;
data: ObjMap<unknown> | null;
futures?: ReadonlyArray<Future>;
errors: ReadonlyArray<GraphQLError>;
}
export type DeferredGroupedFieldSetResult =
| ReconcilableDeferredGroupedFieldSetResult
| NonReconcilableDeferredGroupedFieldSetResult;

export function isDeferredGroupedFieldSetResult(
subsequentResult: DeferredGroupedFieldSetResult | StreamItemsResult,
): subsequentResult is DeferredGroupedFieldSetResult {
return 'deferredFragmentRecords' in subsequentResult;
}

interface ReconcilableDeferredGroupedFieldSetResult
extends DeferredGroupedFieldSetResult {
data: ObjMap<unknown>;
interface ReconcilableDeferredGroupedFieldSetResult {
deferredFragmentRecords: ReadonlyArray<DeferredFragmentRecord>;
path: Array<string | number>;
result: RawDeferResult;
futures?: ReadonlyArray<Future> | undefined;
sent?: true | undefined;
}

export function isReconcilableDeferredGroupedFieldSetResult(
interface NonReconcilableDeferredGroupedFieldSetResult {
result: null;
errors: ReadonlyArray<GraphQLError>;
deferredFragmentRecords: ReadonlyArray<DeferredFragmentRecord>;
path: Array<string | number>;
}

export function isNonReconcilableDeferredGroupedFieldSetResult(
deferredGroupedFieldSetResult: DeferredGroupedFieldSetResult,
): deferredGroupedFieldSetResult is ReconcilableDeferredGroupedFieldSetResult {
return deferredGroupedFieldSetResult.data !== null;
): deferredGroupedFieldSetResult is NonReconcilableDeferredGroupedFieldSetResult {
return deferredGroupedFieldSetResult.result === null;
}

/** @internal */
Expand All @@ -691,9 +719,6 @@ export class DeferredGroupedFieldSetRecord {
const incrementalContext: IncrementalContext = {
deferUsageSet,
path,
errors: [],
errorPaths: new Set(),
futures: [],
};

for (const deferredFragmentRecord of deferredFragmentRecords) {
Expand Down Expand Up @@ -752,24 +777,36 @@ export class StreamRecord {
}
}

interface NonTerminatingStreamItemsResult {
interface NonReconcilableStreamItemsResult {
streamRecord: StreamRecord;
items: ReadonlyArray<unknown> | null;
futures?: ReadonlyArray<Future>;
result: null;
errors: ReadonlyArray<GraphQLError>;
}

interface NonTerminatingStreamItemsResult {
streamRecord: StreamRecord;
result: RawStreamItemsResult;
futures?: ReadonlyArray<Future> | undefined;
}

interface TerminatingStreamItemsResult {
streamRecord: StreamRecord;
items?: never;
result?: never;
futures?: never;
errors?: never;
}

export type StreamItemsResult =
| NonReconcilableStreamItemsResult
| NonTerminatingStreamItemsResult
| TerminatingStreamItemsResult;

export function isNonTerminatingStreamItemsResult(
streamItemsResult: StreamItemsResult,
): streamItemsResult is NonTerminatingStreamItemsResult {
return streamItemsResult.result != null;
}

/** @internal */
export class StreamItemsRecord {
streamRecord: StreamRecord;
Expand All @@ -789,9 +826,6 @@ export class StreamItemsRecord {
const incrementalContext: IncrementalContext = {
deferUsageSet: undefined,
path: itemPath,
errors: [],
errorPaths: new Set(),
futures: [],
};

this._result = executor(incrementalContext);
Expand All @@ -810,12 +844,13 @@ export class StreamItemsRecord {
private _prependNextStreamItems(
result: StreamItemsResult,
): StreamItemsResult {
return this.nextStreamItems === undefined
? result
: {
return isNonTerminatingStreamItemsResult(result) &&
this.nextStreamItems !== undefined
? {
...result,
futures: [this.nextStreamItems, ...(result.futures ?? [])],
};
}
: result;
}
}

Expand Down
Loading

0 comments on commit 3b971c5

Please sign in to comment.