From 821b537d6c0f22f4dc252659e07eeb5696f29ef4 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 28 Apr 2023 17:42:36 +0100 Subject: [PATCH 01/10] Apply edits --- spec/Section 6 -- Execution.md | 349 +++++++++++++++++++++++++++++---- 1 file changed, 307 insertions(+), 42 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 9a889b544..c77df0055 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -118,6 +118,201 @@ provide a query root operation type. If mutations or subscriptions are supported, it must also provide a mutation or subscription root operation type, respectively. +### Delivery group + +::: A _delivery group_ represents either the root selection set or a particular +`@stream` or `@defer` directive at a particular {path} in the response. + +::: The _root delivery group_ is the _delivery group_ that represents the root +selection set in the operation. + +Each _delivery group_ belongs to a {parent} delivery group, except for the _root +delivery group_. During field collection, the delivery group of each field is +tracked, and this is used when determining when to execute and deliver defered +fields and streamed list items. + +In an operation that does not utilise the `@stream` and `@defer` directives, +there will only be a single delivery group, the _root delivery group_, and all +fields will belong to it. + +### Execution group + +::: An _execution group_ represents a number of fields at a number of given +paths in the operation that all belong to the same _delivery group_ or groups. +An execution group may depend on other execution groups, such that it cannot be +delivered until they also have been delivered. + +::: A _shared execution group_ is an _execution group_ that applies to more than +one _delivery group_. Shared execution groups cannot be delivered on their own, +they must only be delivered along with at least one completed _execution group_ +that depends on them. + +An execution group consists of: + +- {deliveryGroups}: a set of the delivery groups to which it applies, +- {pendingId}: a numeric id used in the response stream to identify where to + write the data, +- {status}: {PENDING}, {EXECUTING}, {COMPLETE} or {FAILED}, +- {dependencies}: a list of execution groups on which it is dependent, and +- {groupedFieldSetByPath}: a map of response path to grouped field set that it + is responsible for executing. +- {objectValueByPath}: a map of response path to the object value for that path, + to be used when executing field sets against that object value. + +Note: {dependencies}, {groupedFieldSetByPath}, and {objectValueByPath} may be +added to over time. + +ExecutionGroupPath(executionGroup): + +- Let {bestPath} be {null}. +- Let {deliveryGroups} be that property of {executionGroup}. +- For each {deliveryGroups} as {deliveryGroup}: +- If {bestPath} is {null} or {bestPath} contains fewer entries than {path}: + - Let {bestPath} be {path}. +- Return {bestPath}. + +IncrementalEventStream(data, errors, initialIncrementalFieldSetsByPath, +variableValues): + +- Return a new event stream {responseStream} which yields events as follows: +- Let {nextId} be {0}. +- Let {executionGroups} be an empty set. +- Let {pending} be an empty list. +- Define the sub-procedure {CreateExecutionGroup(deliveryGroups)} with the + following actions: + - Let {executionGroup} be a new execution group that relates to + {deliveryGroups} with no dependencies and an empty {groupedFieldSetByPath} + and {objectValueByPath}. + - If {deliveryGroups} contains more than one entry: + - Let {id} be {null}. + - Let {bestPath} be {null}. + - For each {deliveryGroups} as {deliveryGroup}: + - Let {deliveryGroupSet} be a set containing {deliveryGroup}. + - Let {childExecutionGroup} be the result of + {ExecutionGroupFor(deliveryGroupSet)}. + - Add {executionGroup} as a dependency of {childExecutionGroup}. + - Let {path} be the path of {deliveryGroup}. + - If {bestPath} is {null} or {bestPath} contains fewer entries than + {path}: + - Let {bestPath} be {path}. + - Let {id} be the {pendingId} of {childExecutionGroup}. + - Otherwise: + - Let {id} be {nextId} and increment {nextId} by one. + - Let {deliveryGroup} be the only entry in {deliveryGroups}. + - Let {path} be the path of {deliveryGroup}. + - Let {label} be the label of {deliveryGroup} (if any). + - Let {pendingPayload} be an unordered map containing {id}, {path}, {label}. + - Add {pendingPayload} to {pending}. + - Assert: {id} is not null. + - Set {id} as the value for {pendingId} in {executionGroup}. + - Add {executionGroup} to {executionGroups}. + - Return {executionGroup}. +- Define the sub-procedure {ExecutionGroupFor(deliveryGroups)} with the + following actions: + - Let {executionGroup} be the execution group for the current operation that + relates to the delivery groups {deliveryGroups} and only those delivery + groups. If no such execution group exists then let {executionGroup} be the + result of {CreateExecutionGroup(deliveryGroups)}. + - Return {executionGroup}. +- Define the sub-procedure {AddFieldDigestsToExecutionGroup(executionGroup, + path, objectValue, responseKey, fieldDigests)} with the following actions: + - Let {groupedFieldSetByPath} be that property of {executionGroup}. + - Let {objectValueByPath} be that property of {executionGroup}. + - Let {groupedFieldSet} be the map in {groupedFieldSetByPath} for {path}; if + no such list exists, create it as an empty map and set {objectValue} as the + value for {path} in {objectValueByPath}. + - Set {fieldDigests} as the value for {responseKey} in {groupedFieldSet}. +- Define the sub-procedure {HandleIncremental(fieldSetsByPath)} with the + following actions: + - For each {fieldSetsByPath} as {path} and {fieldSets}: + - For each {fieldSets} as {responseKey} and {fieldDigests}: + - Let {deliveryGroups} be the set of delivery groups in {fieldDigests}. + - Let {executionGroup} be {ExecutionGroupFor(deliveryGroups)}. + - Let {objectValueByPath} be that property of {executionGroup}. + - Let {objectValue} be the value for {path} in {objectValueByPath}. + - Assert: {objectValue} exists and is not {null}. + - Call {AddFieldDigestsToExecutionGroup(executionGroup, path, objectValue, + responseKey, fieldDigests)}. +- Define the sub-procedure {ExecuteExecutionGroup(executionGroup)}: + - Set {state} of {executionGroup} to {EXECUTING}. + - Let {groupedFieldSetByPath} be that property of {executionGroup}. + - Let {objectValueByPath} be that property of {executionGroup}. + - Let {deliveryGroups} be that property of {executionGroup}. + - Let {dependencies} be that property of {executionGroup}. + - For each {groupedFieldSetByPath} as {path} and {groupedFieldSet} (in + parallel): + - Let {objectValue} be the value for {path} in {objectValueByPath}. + - Assert: {objectValue} exists and is not {null}. + - TODO: we also need {objectType} - we should store that next to + {objectValue}. + - Let {data} and {incrementalFieldSetsByPath} be the result of running + {ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, + variableValues, path, deliveryGroups)} _normally_ (allowing + parallelization). + - TODO: collect {data} at {path} (relative to + {ExecutionGroupPath(executionGroup)}). + - TODO: collect {incrementalFieldSetsByPath}. + - If an error {error} bubbled past any of the grouped field sets above: + - Set {state} of {executionGroup} to {FAILED}. + - If {deliveryGroups} contains exactly one entry: + - Let {id} be {pendingId} of {executionGroup}. + - Let {completedPayload} be an unordered map containing {id}, {error}. + - Add {completedPayload} to {completed}. + - Otherwise: + - For each {deliveryGroups} as {deliveryGroup}: + - Let {deliveryGroupSet} be a set containing {deliveryGroup}. + - Let {dependentExecutionGroup} be + {ExecutionGroupFor(deliveryGroupSet)}. + - Set {state} of {dependentExecutionGroup} to {FAILED}. + - Let {id} be {pendingId} of {dependentExecutionGroup}. + - Let {completedPayload} be an unordered map containing {id}, {error}. + - Add {completedPayload} to {completed}. + - Remove {dependentExecutionGroup} from {executionGroups}. + - Remove {executionGroup} from {executionGroups}. + - Optionally, {FlushStream()}. + - Otherwise: + - Set {state} of {executionGroup} to {COMPLETE}. + - If {deliveryGroups} contains exactly one entry: + - For each {dependencies} as {dependency}: + - TODO: send that stored data (see below). + - Remove {dependency} as a dependency from each execution group in + {executionGroups} for which it is a dependency. + - Remove {dependency} from {executionGroups}. + - TODO: push all the data into {incremental}. + - Remove {executionGroup} from {executionGroups}. + - Optionally, {FlushStream()}. + - Otherwise: + - TODO: store the data for later, send it with one of our dependents. +- Call {HandleIncremental(initialIncrementalFieldSetsByPath)}. +- Assert: {pending} is not empty. +- Let {initialResponse} be an unordered map containing {data}, {errors}, + {pending}, and the value {true} for key {hasNext}. +- Yield an event containing {initialResponse}. +- Let {incremental} be an empty list. +- Let {pending} be an empty list. +- Let {completed} be an empty list. +- Define the sub-procedure {FlushStream()} with the following actions: + - Let {hasNext} be true if {executionGroups} is not empty, {false} otherwise. + - Let {incrementalPayload} be an empty unordered map. + - Add {hasNext} to {incrementalPayload}. + - If {incremental} is not empty: + - Add {incremental} to {incrementalPayload}. + - If {pending} is not empty: + - Add {pending} to {incrementalPayload}. + - If {completed} is not empty: + - Add {completed} to {incrementalPayload}. + - Yield an event containing {incrementalPayload}. + - Reset {incremental} to an empty list. + - Reset {pending} to an empty list. + - Reset {completed} to an empty list. + - If {hasNext} is {false}, complete {responseStream}. +- While {executionGroups} is not empty: + - Let {readyToExecute} be the list of {PENDING} execution groups in + {executionGroups} whose {dependencies} are all {COMPLETE}. + - For each {readyToExecute} as {executionGroup} (in parallel): + - Call {ExecuteExecutionGroup(executionGroup)}. +- Call {FlushStream()}. + ### Query If the operation is a query, the result of the operation is the result of @@ -285,7 +480,11 @@ MapSourceToResponseEvent(sourceStream, subscription, schema, variableValues): - For each {event} on {sourceStream}: - Let {response} be the result of running {ExecuteSubscriptionEvent(subscription, schema, variableValues, event)}. - - Yield an event containing {response}. + - If {response} is an event stream: + - For each {childEvent} on {response}: + - Yield {childEvent}. + - Otherwise: + - Yield an event containing {response}. - When {responseStream} completes: complete this event stream. ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue): @@ -327,14 +526,22 @@ ExecuteRootSelectionSet(variableValues, initialValue, objectType, selectionSet, serial): - If {serial} is not provided, initialize it to {false}. +- Let {path} be an empty list. +- Let {rootDeliveryGroup} be a new delivery group with path {path}. - Let {groupedFieldSet} be the result of {CollectFields(objectType, - selectionSet, variableValues)}. -- Let {data} be the result of running {ExecuteGroupedFieldSet(groupedFieldSet, - objectType, initialValue, variableValues)} _serially_ if {serial} is {true}, + selectionSet, variableValues, path, rootDeliveryGroup)}. +- Let {currentDeliveryGroups} be a set containing {rootDeliveryGroup}. +- Let {data} and {incrementalFieldSetsByPath} be the result of running + {ExecuteGroupedFieldSet(groupedFieldSet, objectType, initialValue, + variableValues, path, currentDeliveryGroups)} _serially_ if {serial} is true, _normally_ (allowing parallelization) otherwise. - Let {errors} be the list of all _field error_ raised while executing the selection set. -- Return an unordered map containing {data} and {errors}. +- If {incrementalFieldSetsByPath} is empty: + - Return an unordered map containing {data} and {errors}. +- Otherwise: + - Return {IncrementalEventStream(data, errors, incrementalFieldSetsByPath, + variableValues)}. ## Executing a Grouped Field Set @@ -348,17 +555,31 @@ response map. ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, variableValues): +- Let {incrementalFieldSetsByPath} be an empty map. - Initialize {resultMap} to an empty ordered map. -- For each {groupedFieldSet} as {responseKey} and {fields}: - - Let {fieldName} be the name of the first entry in {fields}. Note: This value - is unaffected if an alias is used. - - Let {fieldType} be the return type defined for the field {fieldName} of - {objectType}. - - If {fieldType} is defined: - - Let {responseValue} be {ExecuteField(objectType, objectValue, fieldType, - fields, variableValues)}. - - Set {responseValue} as the value for {responseKey} in {resultMap}. -- Return {resultMap}. +- For each {groupedFieldSet} as {responseKey} and {fieldDigests}: + - Let {deliveryGroups} be the set of delivery groups in {fieldDigests}. + - If {deliveryGroups} contains every delivery group in + {currentDeliveryGroups}: + - Let {fieldName} be the name of the field of the first entry in + {fieldDigests}. Note: This value is unaffected if an alias is used. + - Let {fieldType} be the return type defined for the field {fieldName} of + {objectType}. + - If {fieldType} is defined: + - Let {childPath} be the result of appending {responseKey} to {path}. + - Let {responseValue} and {childFieldSetsByPath} be + {ExecuteField(objectType, objectValue, fieldType, fieldDigests, + variableValues, childPath, currentDeliveryGroups)}. + - Set {responseValue} as the value for {responseKey} in {resultMap}. + - For each {childFieldSetsByPath} as {childPath} and {fieldSets}: + - Set {fieldSets} as the value for {childPath} in + {incrementalFieldSetsByPath}. + - Otherwise: + - Let {incrementalFieldSets} be the map in {incrementalFieldSetsByPath} for + {path}; if no such map exists, create it as an empty map. + - Set {fieldDigests} as the value for {responseKey} in + {incrementalFieldSets}. +- Return {resultMap} and {incrementalFieldSetsByPath}. Note: {resultMap} is ordered by which fields appear first in the operation. This is explained in greater detail in the Field Collection section below. @@ -375,6 +596,8 @@ yielded a value may be cancelled to avoid unnecessary work. Note: See [Handling Field Errors](#sec-Handling-Field-Errors) for more about this behavior. +Further, if this occurs, the {incrementalFieldSetsByPath} must be made empty. + ### Normal and Serial Execution Normally the executor can execute the entries in a grouped field set in whatever @@ -502,8 +725,12 @@ The depth-first-search order of the field groups produced by {CollectFields()} is maintained through execution, ensuring that fields appear in the executed response in a stable and predictable order. -CollectFields(objectType, selectionSet, variableValues, visitedFragments): +CollectFields(objectType, selectionSet, variableValues, path, deliveryGroup, +visitedFragments): +- If {path} is not provided, initialize it to an empty list. +- If {deliveryGroup} is not provided, initialize it to be a new delivery group + with path {path}. - If {visitedFragments} is not provided, initialize it to the empty set. - Initialize {groupedFields} to an empty ordered map of lists. - For each {selection} in {selectionSet}: @@ -522,8 +749,19 @@ CollectFields(objectType, selectionSet, variableValues, visitedFragments): defined, otherwise the field name). - Let {groupForResponseKey} be the list in {groupedFields} for {responseKey}; if no such list exists, create it as an empty list. - - Append {selection} to the {groupForResponseKey}. + - Let {fieldDigest} be a new field digest containing {selection} and + {deliveryGroup}. + - Append {fieldDigest} to the {groupForResponseKey}. - If {selection} is a {FragmentSpread}: + - Let {fragmentDeliveryGroup} be {deliveryGroup}. + - If {selection} provides the directive `@defer`, let {deferDirective} be + that directive. + - If {deferDirective}'s {if} argument is not {false} and is not a variable + in {variableValues} with the value {false}: + - Let {label} be the value of {deferDirective}'s {label} argument (or + the value of the associated variable) if any. + - Let {fragmentDeliveryGroup} be a new delivery group with path {path}, + parent {deliveryGroup} and label {label}. - Let {fragmentSpreadName} be the name of {selection}. - If {fragmentSpreadName} is in {visitedFragments}, continue with the next {selection} in {selectionSet}. @@ -537,8 +775,8 @@ CollectFields(objectType, selectionSet, variableValues, visitedFragments): with the next {selection} in {selectionSet}. - Let {fragmentSelectionSet} be the top-level selection set of {fragment}. - Let {fragmentGroupedFieldSet} be the result of calling - {CollectFields(objectType, fragmentSelectionSet, variableValues, - visitedFragments)}. + {CollectFields(objectType, fragmentSelectionSet, variableValues, path, + fragmentDeliveryGroup, visitedFragments)}. - For each {fragmentGroup} in {fragmentGroupedFieldSet}: - Let {responseKey} be the response key shared by all fields in {fragmentGroup}. @@ -546,14 +784,23 @@ CollectFields(objectType, selectionSet, variableValues, visitedFragments): {responseKey}; if no such list exists, create it as an empty list. - Append all items in {fragmentGroup} to {groupForResponseKey}. - If {selection} is an {InlineFragment}: + - Let {fragmentDeliveryGroup} be {deliveryGroup}. + - If {selection} provides the directive `@defer`, let {deferDirective} be + that directive. + - If {deferDirective}'s {if} argument is not {false} and is not a variable + in {variableValues} with the value {false}: + - Let {label} be the value of {deferDirective}'s {label} argument (or + the value of the associated variable) if any. + - Let {fragmentDeliveryGroup} be a new delivery group with path {path}, + parent {deliveryGroup} and label {label}. - Let {fragmentType} be the type condition on {selection}. - If {fragmentType} is not {null} and {DoesFragmentTypeApply(objectType, fragmentType)} is false, continue with the next {selection} in {selectionSet}. - Let {fragmentSelectionSet} be the top-level selection set of {selection}. - Let {fragmentGroupedFieldSet} be the result of calling - {CollectFields(objectType, fragmentSelectionSet, variableValues, - visitedFragments)}. + {CollectFields(objectType, fragmentSelectionSet, variableValues, path, + fragmentDeliveryGroup, visitedFragments)}. - For each {fragmentGroup} in {fragmentGroupedFieldSet}: - Let {responseKey} be the response key shared by all fields in {fragmentGroup}. @@ -585,16 +832,17 @@ coerces any provided argument values, then resolves a value for the field, and finally completes that value either by recursively executing another selection set or coercing a scalar value. -ExecuteField(objectType, objectValue, fieldType, fields, variableValues): +ExecuteField(objectType, objectValue, fieldType, fieldDigests, variableValues, +path, currentDeliveryGroups): -- Let {field} be the first entry in {fields}. +- Let {field} be the first entry in {fieldDigests}. - Let {fieldName} be the field name of {field}. - Let {argumentValues} be the result of {CoerceArgumentValues(objectType, field, variableValues)} - Let {resolvedValue} be {ResolveFieldValue(objectType, objectValue, fieldName, argumentValues)}. -- Return the result of {CompleteValue(fieldType, fields, resolvedValue, - variableValues)}. +- Return the result of {CompleteValue(fieldType, fieldDigests, resolvedValue, + variableValues, path, currentDeliveryGroups)}. ### Coercing Field Arguments @@ -681,34 +929,49 @@ After resolving the value for a field, it is completed by ensuring it adheres to the expected return type. If the return type is another Object type, then the field execution process continues recursively. -CompleteValue(fieldType, fields, result, variableValues): +CompleteValue(fieldType, fieldDigests, result, variableValues, path, +currentDeliveryGroups): - If the {fieldType} is a Non-Null type: - Let {innerType} be the inner type of {fieldType}. - - Let {completedResult} be the result of calling {CompleteValue(innerType, - fields, result, variableValues)}. + - Let {completedResult} and {incrementalFieldSetsByPath} be the result of + calling {CompleteValue(innerType, fieldDigests, result, variableValues, + path, currentDeliveryGroups)}. - If {completedResult} is {null}, raise a _field error_. - - Return {completedResult}. + - Return {completedResult} and {incrementalFieldSetsByPath}. - If {result} is {null} (or another internal value similar to {null} such as {undefined}), return {null}. - If {fieldType} is a List type: - If {result} is not a collection of values, raise a _field error_. - Let {innerType} be the inner type of {fieldType}. - - Return a list where each list item is the result of calling - {CompleteValue(innerType, fields, resultItem, variableValues)}, where - {resultItem} is each item in {result}. + - Let {incrementalFieldSetsByPath} be an empty map. + - Let {list} be an empty list. + - For each list item {resultItem} at 0-indexed index {resultIndex} in + {result}: + - Let {subpath} be the result of appending {resultIndex} to {path}. + - Let {listValue} and {itemIncrementalFieldSetsByPath} be the result of + calling {CompleteValue(innerType, fieldDigests, resultItem, + variableValues, subpath, currentDeliveryGroups)}. + - Append {listValue} to {list}. + - If {listValue} is not {null}: + - For each {itemIncrementalFieldSetsByPath} as {childPath} and + {childFieldSets}: + - Set {childFieldSets} as the value for {childPath} in + {incrementalFieldSetsByPath}. + - Return {list} and {incrementalFieldSetsByPath}. - If {fieldType} is a Scalar or Enum type: - - Return the result of {CoerceResult(fieldType, result)}. + - Let {completedResult} be the result of {CoerceResult(fieldType, result)}. + - Return {completedResult} and an empty map. - If {fieldType} is an Object, Interface, or Union type: - If {fieldType} is an Object type. - Let {objectType} be {fieldType}. - Otherwise if {fieldType} is an Interface or Union type. - Let {objectType} be {ResolveAbstractType(fieldType, result)}. - - Let {groupedFieldSet} be the result of calling {CollectSubfields(objectType, - fields, variableValues)}. + - Let {groupedFieldSet} be the result of calling + {CollectSubfields(objectType, fieldDigests, variableValues, path)}. - Return the result of evaluating {ExecuteGroupedFieldSet(groupedFieldSet, - objectType, result, variableValues)} _normally_ (allowing for - parallelization). + objectType, result, variableValues, path, currentDeliveryGroups)} _normally_ + (allowing for parallelization). **Coercing Results** @@ -774,18 +1037,20 @@ sub-selections. After resolving the value for `me`, the selection sets are merged together so `firstName` and `lastName` can be resolved for one value. -CollectSubfields(objectType, fields, variableValues): +CollectSubfields(objectType, fieldDigests, variableValues, path): - Let {groupedFieldSet} be an empty map. -- For each {field} in {fields}: +- For each {fieldDigest} in {fieldDigests}: + - Let {field} be the field of {fieldDigest}. + - Let {deliveryGroup} be the delivery group of {fieldDigest}. - Let {fieldSelectionSet} be the selection set of {field}. - If {fieldSelectionSet} is null or empty, continue to the next field. - Let {subGroupedFieldSet} be the result of {CollectFields(objectType, - fieldSelectionSet, variableValues)}. - - For each {subGroupedFieldSet} as {responseKey} and {subfields}: + fieldSelectionSet, variableValues, path, deliveryGroup)}. + - For each {subGroupedFieldSet} as {responseKey} and {subfieldDigests}: - Let {groupForResponseKey} be the list in {groupedFieldSet} for {responseKey}; if no such list exists, create it as an empty list. - - Append all fields in {subfields} to {groupForResponseKey}. + - Append all field digests in {subfieldDigests} to {groupForResponseKey}. - Return {groupedFieldSet}. ### Handling Field Errors From dd930a24c6fd1699d10d0924431de3aea09ad90f Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 28 Apr 2023 17:45:38 +0100 Subject: [PATCH 02/10] Move algorithm --- spec/Section 6 -- Execution.md | 398 +++++++++++++++++---------------- 1 file changed, 201 insertions(+), 197 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index c77df0055..e6eb5f3da 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -118,201 +118,6 @@ provide a query root operation type. If mutations or subscriptions are supported, it must also provide a mutation or subscription root operation type, respectively. -### Delivery group - -::: A _delivery group_ represents either the root selection set or a particular -`@stream` or `@defer` directive at a particular {path} in the response. - -::: The _root delivery group_ is the _delivery group_ that represents the root -selection set in the operation. - -Each _delivery group_ belongs to a {parent} delivery group, except for the _root -delivery group_. During field collection, the delivery group of each field is -tracked, and this is used when determining when to execute and deliver defered -fields and streamed list items. - -In an operation that does not utilise the `@stream` and `@defer` directives, -there will only be a single delivery group, the _root delivery group_, and all -fields will belong to it. - -### Execution group - -::: An _execution group_ represents a number of fields at a number of given -paths in the operation that all belong to the same _delivery group_ or groups. -An execution group may depend on other execution groups, such that it cannot be -delivered until they also have been delivered. - -::: A _shared execution group_ is an _execution group_ that applies to more than -one _delivery group_. Shared execution groups cannot be delivered on their own, -they must only be delivered along with at least one completed _execution group_ -that depends on them. - -An execution group consists of: - -- {deliveryGroups}: a set of the delivery groups to which it applies, -- {pendingId}: a numeric id used in the response stream to identify where to - write the data, -- {status}: {PENDING}, {EXECUTING}, {COMPLETE} or {FAILED}, -- {dependencies}: a list of execution groups on which it is dependent, and -- {groupedFieldSetByPath}: a map of response path to grouped field set that it - is responsible for executing. -- {objectValueByPath}: a map of response path to the object value for that path, - to be used when executing field sets against that object value. - -Note: {dependencies}, {groupedFieldSetByPath}, and {objectValueByPath} may be -added to over time. - -ExecutionGroupPath(executionGroup): - -- Let {bestPath} be {null}. -- Let {deliveryGroups} be that property of {executionGroup}. -- For each {deliveryGroups} as {deliveryGroup}: -- If {bestPath} is {null} or {bestPath} contains fewer entries than {path}: - - Let {bestPath} be {path}. -- Return {bestPath}. - -IncrementalEventStream(data, errors, initialIncrementalFieldSetsByPath, -variableValues): - -- Return a new event stream {responseStream} which yields events as follows: -- Let {nextId} be {0}. -- Let {executionGroups} be an empty set. -- Let {pending} be an empty list. -- Define the sub-procedure {CreateExecutionGroup(deliveryGroups)} with the - following actions: - - Let {executionGroup} be a new execution group that relates to - {deliveryGroups} with no dependencies and an empty {groupedFieldSetByPath} - and {objectValueByPath}. - - If {deliveryGroups} contains more than one entry: - - Let {id} be {null}. - - Let {bestPath} be {null}. - - For each {deliveryGroups} as {deliveryGroup}: - - Let {deliveryGroupSet} be a set containing {deliveryGroup}. - - Let {childExecutionGroup} be the result of - {ExecutionGroupFor(deliveryGroupSet)}. - - Add {executionGroup} as a dependency of {childExecutionGroup}. - - Let {path} be the path of {deliveryGroup}. - - If {bestPath} is {null} or {bestPath} contains fewer entries than - {path}: - - Let {bestPath} be {path}. - - Let {id} be the {pendingId} of {childExecutionGroup}. - - Otherwise: - - Let {id} be {nextId} and increment {nextId} by one. - - Let {deliveryGroup} be the only entry in {deliveryGroups}. - - Let {path} be the path of {deliveryGroup}. - - Let {label} be the label of {deliveryGroup} (if any). - - Let {pendingPayload} be an unordered map containing {id}, {path}, {label}. - - Add {pendingPayload} to {pending}. - - Assert: {id} is not null. - - Set {id} as the value for {pendingId} in {executionGroup}. - - Add {executionGroup} to {executionGroups}. - - Return {executionGroup}. -- Define the sub-procedure {ExecutionGroupFor(deliveryGroups)} with the - following actions: - - Let {executionGroup} be the execution group for the current operation that - relates to the delivery groups {deliveryGroups} and only those delivery - groups. If no such execution group exists then let {executionGroup} be the - result of {CreateExecutionGroup(deliveryGroups)}. - - Return {executionGroup}. -- Define the sub-procedure {AddFieldDigestsToExecutionGroup(executionGroup, - path, objectValue, responseKey, fieldDigests)} with the following actions: - - Let {groupedFieldSetByPath} be that property of {executionGroup}. - - Let {objectValueByPath} be that property of {executionGroup}. - - Let {groupedFieldSet} be the map in {groupedFieldSetByPath} for {path}; if - no such list exists, create it as an empty map and set {objectValue} as the - value for {path} in {objectValueByPath}. - - Set {fieldDigests} as the value for {responseKey} in {groupedFieldSet}. -- Define the sub-procedure {HandleIncremental(fieldSetsByPath)} with the - following actions: - - For each {fieldSetsByPath} as {path} and {fieldSets}: - - For each {fieldSets} as {responseKey} and {fieldDigests}: - - Let {deliveryGroups} be the set of delivery groups in {fieldDigests}. - - Let {executionGroup} be {ExecutionGroupFor(deliveryGroups)}. - - Let {objectValueByPath} be that property of {executionGroup}. - - Let {objectValue} be the value for {path} in {objectValueByPath}. - - Assert: {objectValue} exists and is not {null}. - - Call {AddFieldDigestsToExecutionGroup(executionGroup, path, objectValue, - responseKey, fieldDigests)}. -- Define the sub-procedure {ExecuteExecutionGroup(executionGroup)}: - - Set {state} of {executionGroup} to {EXECUTING}. - - Let {groupedFieldSetByPath} be that property of {executionGroup}. - - Let {objectValueByPath} be that property of {executionGroup}. - - Let {deliveryGroups} be that property of {executionGroup}. - - Let {dependencies} be that property of {executionGroup}. - - For each {groupedFieldSetByPath} as {path} and {groupedFieldSet} (in - parallel): - - Let {objectValue} be the value for {path} in {objectValueByPath}. - - Assert: {objectValue} exists and is not {null}. - - TODO: we also need {objectType} - we should store that next to - {objectValue}. - - Let {data} and {incrementalFieldSetsByPath} be the result of running - {ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, - variableValues, path, deliveryGroups)} _normally_ (allowing - parallelization). - - TODO: collect {data} at {path} (relative to - {ExecutionGroupPath(executionGroup)}). - - TODO: collect {incrementalFieldSetsByPath}. - - If an error {error} bubbled past any of the grouped field sets above: - - Set {state} of {executionGroup} to {FAILED}. - - If {deliveryGroups} contains exactly one entry: - - Let {id} be {pendingId} of {executionGroup}. - - Let {completedPayload} be an unordered map containing {id}, {error}. - - Add {completedPayload} to {completed}. - - Otherwise: - - For each {deliveryGroups} as {deliveryGroup}: - - Let {deliveryGroupSet} be a set containing {deliveryGroup}. - - Let {dependentExecutionGroup} be - {ExecutionGroupFor(deliveryGroupSet)}. - - Set {state} of {dependentExecutionGroup} to {FAILED}. - - Let {id} be {pendingId} of {dependentExecutionGroup}. - - Let {completedPayload} be an unordered map containing {id}, {error}. - - Add {completedPayload} to {completed}. - - Remove {dependentExecutionGroup} from {executionGroups}. - - Remove {executionGroup} from {executionGroups}. - - Optionally, {FlushStream()}. - - Otherwise: - - Set {state} of {executionGroup} to {COMPLETE}. - - If {deliveryGroups} contains exactly one entry: - - For each {dependencies} as {dependency}: - - TODO: send that stored data (see below). - - Remove {dependency} as a dependency from each execution group in - {executionGroups} for which it is a dependency. - - Remove {dependency} from {executionGroups}. - - TODO: push all the data into {incremental}. - - Remove {executionGroup} from {executionGroups}. - - Optionally, {FlushStream()}. - - Otherwise: - - TODO: store the data for later, send it with one of our dependents. -- Call {HandleIncremental(initialIncrementalFieldSetsByPath)}. -- Assert: {pending} is not empty. -- Let {initialResponse} be an unordered map containing {data}, {errors}, - {pending}, and the value {true} for key {hasNext}. -- Yield an event containing {initialResponse}. -- Let {incremental} be an empty list. -- Let {pending} be an empty list. -- Let {completed} be an empty list. -- Define the sub-procedure {FlushStream()} with the following actions: - - Let {hasNext} be true if {executionGroups} is not empty, {false} otherwise. - - Let {incrementalPayload} be an empty unordered map. - - Add {hasNext} to {incrementalPayload}. - - If {incremental} is not empty: - - Add {incremental} to {incrementalPayload}. - - If {pending} is not empty: - - Add {pending} to {incrementalPayload}. - - If {completed} is not empty: - - Add {completed} to {incrementalPayload}. - - Yield an event containing {incrementalPayload}. - - Reset {incremental} to an empty list. - - Reset {pending} to an empty list. - - Reset {completed} to an empty list. - - If {hasNext} is {false}, complete {responseStream}. -- While {executionGroups} is not empty: - - Let {readyToExecute} be the list of {PENDING} execution groups in - {executionGroups} whose {dependencies} are all {COMPLETE}. - - For each {readyToExecute} as {executionGroup} (in parallel): - - Call {ExecuteExecutionGroup(executionGroup)}. -- Call {FlushStream()}. - ### Query If the operation is a query, the result of the operation is the result of @@ -967,8 +772,8 @@ currentDeliveryGroups): - Let {objectType} be {fieldType}. - Otherwise if {fieldType} is an Interface or Union type. - Let {objectType} be {ResolveAbstractType(fieldType, result)}. - - Let {groupedFieldSet} be the result of calling - {CollectSubfields(objectType, fieldDigests, variableValues, path)}. + - Let {groupedFieldSet} be the result of calling {CollectSubfields(objectType, + fieldDigests, variableValues, path)}. - Return the result of evaluating {ExecuteGroupedFieldSet(groupedFieldSet, objectType, result, variableValues, path, currentDeliveryGroups)} _normally_ (allowing for parallelization). @@ -1089,3 +894,202 @@ upwards. If all fields from the root of the request to the source of the field error return `Non-Null` types, then the {"data"} entry in the response should be {null}. + +## Incremental Delivery + +### Delivery Group + +::: A _delivery group_ represents either the root selection set or a particular +`@stream` or `@defer` directive at a particular {path} in the response. + +::: The _root delivery group_ is the _delivery group_ that represents the root +selection set in the operation. + +Each _delivery group_ belongs to a {parent} delivery group, except for the _root +delivery group_. During field collection, the delivery group of each field is +tracked, and this is used when determining when to execute and deliver defered +fields and streamed list items. + +In an operation that does not utilise the `@stream` and `@defer` directives, +there will only be a single delivery group, the _root delivery group_, and all +fields will belong to it. + +### Execution Group + +::: An _execution group_ represents a number of fields at a number of given +paths in the operation that all belong to the same _delivery group_ or groups. +An execution group may depend on other execution groups, such that it cannot be +delivered until they also have been delivered. + +::: A _shared execution group_ is an _execution group_ that applies to more than +one _delivery group_. Shared execution groups cannot be delivered on their own, +they must only be delivered along with at least one completed _execution group_ +that depends on them. + +An execution group consists of: + +- {deliveryGroups}: a set of the delivery groups to which it applies, +- {pendingId}: a numeric id used in the response stream to identify where to + write the data, +- {status}: {PENDING}, {EXECUTING}, {COMPLETE} or {FAILED}, +- {dependencies}: a list of execution groups on which it is dependent, and +- {groupedFieldSetByPath}: a map of response path to grouped field set that it + is responsible for executing. +- {objectValueByPath}: a map of response path to the object value for that path, + to be used when executing field sets against that object value. + +Note: {dependencies}, {groupedFieldSetByPath}, and {objectValueByPath} may be +added to over time. + +ExecutionGroupPath(executionGroup): + +- Let {bestPath} be {null}. +- Let {deliveryGroups} be that property of {executionGroup}. +- For each {deliveryGroups} as {deliveryGroup}: +- If {bestPath} is {null} or {bestPath} contains fewer entries than {path}: + - Let {bestPath} be {path}. +- Return {bestPath}. + +### Incremental Event Stream + +IncrementalEventStream(data, errors, initialIncrementalFieldSetsByPath, +variableValues): + +- Return a new event stream {responseStream} which yields events as follows: +- Let {nextId} be {0}. +- Let {executionGroups} be an empty set. +- Let {pending} be an empty list. +- Define the sub-procedure {CreateExecutionGroup(deliveryGroups)} with the + following actions: + - Let {executionGroup} be a new execution group that relates to + {deliveryGroups} with no dependencies and an empty {groupedFieldSetByPath} + and {objectValueByPath}. + - If {deliveryGroups} contains more than one entry: + - Let {id} be {null}. + - Let {bestPath} be {null}. + - For each {deliveryGroups} as {deliveryGroup}: + - Let {deliveryGroupSet} be a set containing {deliveryGroup}. + - Let {childExecutionGroup} be the result of + {ExecutionGroupFor(deliveryGroupSet)}. + - Add {executionGroup} as a dependency of {childExecutionGroup}. + - Let {path} be the path of {deliveryGroup}. + - If {bestPath} is {null} or {bestPath} contains fewer entries than + {path}: + - Let {bestPath} be {path}. + - Let {id} be the {pendingId} of {childExecutionGroup}. + - Otherwise: + - Let {id} be {nextId} and increment {nextId} by one. + - Let {deliveryGroup} be the only entry in {deliveryGroups}. + - Let {path} be the path of {deliveryGroup}. + - Let {label} be the label of {deliveryGroup} (if any). + - Let {pendingPayload} be an unordered map containing {id}, {path}, {label}. + - Add {pendingPayload} to {pending}. + - Assert: {id} is not null. + - Set {id} as the value for {pendingId} in {executionGroup}. + - Add {executionGroup} to {executionGroups}. + - Return {executionGroup}. +- Define the sub-procedure {ExecutionGroupFor(deliveryGroups)} with the + following actions: + - Let {executionGroup} be the execution group for the current operation that + relates to the delivery groups {deliveryGroups} and only those delivery + groups. If no such execution group exists then let {executionGroup} be the + result of {CreateExecutionGroup(deliveryGroups)}. + - Return {executionGroup}. +- Define the sub-procedure {AddFieldDigestsToExecutionGroup(executionGroup, + path, objectValue, responseKey, fieldDigests)} with the following actions: + - Let {groupedFieldSetByPath} be that property of {executionGroup}. + - Let {objectValueByPath} be that property of {executionGroup}. + - Let {groupedFieldSet} be the map in {groupedFieldSetByPath} for {path}; if + no such list exists, create it as an empty map and set {objectValue} as the + value for {path} in {objectValueByPath}. + - Set {fieldDigests} as the value for {responseKey} in {groupedFieldSet}. +- Define the sub-procedure {HandleIncremental(fieldSetsByPath)} with the + following actions: + - For each {fieldSetsByPath} as {path} and {fieldSets}: + - For each {fieldSets} as {responseKey} and {fieldDigests}: + - Let {deliveryGroups} be the set of delivery groups in {fieldDigests}. + - Let {executionGroup} be {ExecutionGroupFor(deliveryGroups)}. + - Let {objectValueByPath} be that property of {executionGroup}. + - Let {objectValue} be the value for {path} in {objectValueByPath}. + - Assert: {objectValue} exists and is not {null}. + - Call {AddFieldDigestsToExecutionGroup(executionGroup, path, objectValue, + responseKey, fieldDigests)}. +- Define the sub-procedure {ExecuteExecutionGroup(executionGroup)}: + - Set {state} of {executionGroup} to {EXECUTING}. + - Let {groupedFieldSetByPath} be that property of {executionGroup}. + - Let {objectValueByPath} be that property of {executionGroup}. + - Let {deliveryGroups} be that property of {executionGroup}. + - Let {dependencies} be that property of {executionGroup}. + - For each {groupedFieldSetByPath} as {path} and {groupedFieldSet} (in + parallel): + - Let {objectValue} be the value for {path} in {objectValueByPath}. + - Assert: {objectValue} exists and is not {null}. + - TODO: we also need {objectType} - we should store that next to + {objectValue}. + - Let {data} and {incrementalFieldSetsByPath} be the result of running + {ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, + variableValues, path, deliveryGroups)} _normally_ (allowing + parallelization). + - TODO: collect {data} at {path} (relative to + {ExecutionGroupPath(executionGroup)}). + - TODO: collect {incrementalFieldSetsByPath}. + - If an error {error} bubbled past any of the grouped field sets above: + - Set {state} of {executionGroup} to {FAILED}. + - If {deliveryGroups} contains exactly one entry: + - Let {id} be {pendingId} of {executionGroup}. + - Let {completedPayload} be an unordered map containing {id}, {error}. + - Add {completedPayload} to {completed}. + - Otherwise: + - For each {deliveryGroups} as {deliveryGroup}: + - Let {deliveryGroupSet} be a set containing {deliveryGroup}. + - Let {dependentExecutionGroup} be + {ExecutionGroupFor(deliveryGroupSet)}. + - Set {state} of {dependentExecutionGroup} to {FAILED}. + - Let {id} be {pendingId} of {dependentExecutionGroup}. + - Let {completedPayload} be an unordered map containing {id}, {error}. + - Add {completedPayload} to {completed}. + - Remove {dependentExecutionGroup} from {executionGroups}. + - Remove {executionGroup} from {executionGroups}. + - Optionally, {FlushStream()}. + - Otherwise: + - Set {state} of {executionGroup} to {COMPLETE}. + - If {deliveryGroups} contains exactly one entry: + - For each {dependencies} as {dependency}: + - TODO: send that stored data (see below). + - Remove {dependency} as a dependency from each execution group in + {executionGroups} for which it is a dependency. + - Remove {dependency} from {executionGroups}. + - TODO: push all the data into {incremental}. + - Remove {executionGroup} from {executionGroups}. + - Optionally, {FlushStream()}. + - Otherwise: + - TODO: store the data for later, send it with one of our dependents. +- Call {HandleIncremental(initialIncrementalFieldSetsByPath)}. +- Assert: {pending} is not empty. +- Let {initialResponse} be an unordered map containing {data}, {errors}, + {pending}, and the value {true} for key {hasNext}. +- Yield an event containing {initialResponse}. +- Let {incremental} be an empty list. +- Let {pending} be an empty list. +- Let {completed} be an empty list. +- Define the sub-procedure {FlushStream()} with the following actions: + - Let {hasNext} be true if {executionGroups} is not empty, {false} otherwise. + - Let {incrementalPayload} be an empty unordered map. + - Add {hasNext} to {incrementalPayload}. + - If {incremental} is not empty: + - Add {incremental} to {incrementalPayload}. + - If {pending} is not empty: + - Add {pending} to {incrementalPayload}. + - If {completed} is not empty: + - Add {completed} to {incrementalPayload}. + - Yield an event containing {incrementalPayload}. + - Reset {incremental} to an empty list. + - Reset {pending} to an empty list. + - Reset {completed} to an empty list. + - If {hasNext} is {false}, complete {responseStream}. +- While {executionGroups} is not empty: + - Let {readyToExecute} be the list of {PENDING} execution groups in + {executionGroups} whose {dependencies} are all {COMPLETE}. + - For each {readyToExecute} as {executionGroup} (in parallel): + - Call {ExecuteExecutionGroup(executionGroup)}. +- Call {FlushStream()}. From 0f5e0a8256395e795f4d744954dbcbf948c7a672 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 28 Apr 2023 18:47:05 +0100 Subject: [PATCH 03/10] Various fixes --- spec/Section 6 -- Execution.md | 104 ++++++++++++++++----------------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index e6eb5f3da..46ab86086 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -336,16 +336,16 @@ serial): - Let {groupedFieldSet} be the result of {CollectFields(objectType, selectionSet, variableValues, path, rootDeliveryGroup)}. - Let {currentDeliveryGroups} be a set containing {rootDeliveryGroup}. -- Let {data} and {incrementalFieldSetsByPath} be the result of running +- Let {data} and {incrementalDetailsByPath} be the result of running {ExecuteGroupedFieldSet(groupedFieldSet, objectType, initialValue, variableValues, path, currentDeliveryGroups)} _serially_ if {serial} is true, _normally_ (allowing parallelization) otherwise. - Let {errors} be the list of all _field error_ raised while executing the selection set. -- If {incrementalFieldSetsByPath} is empty: +- If {incrementalDetailsByPath} is empty: - Return an unordered map containing {data} and {errors}. - Otherwise: - - Return {IncrementalEventStream(data, errors, incrementalFieldSetsByPath, + - Return {IncrementalEventStream(data, errors, incrementalDetailsByPath, variableValues)}. ## Executing a Grouped Field Set @@ -360,7 +360,7 @@ response map. ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, variableValues): -- Let {incrementalFieldSetsByPath} be an empty map. +- Let {incrementalDetailsByPath} be an empty map. - Initialize {resultMap} to an empty ordered map. - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: - Let {deliveryGroups} be the set of delivery groups in {fieldDigests}. @@ -376,15 +376,19 @@ variableValues): {ExecuteField(objectType, objectValue, fieldType, fieldDigests, variableValues, childPath, currentDeliveryGroups)}. - Set {responseValue} as the value for {responseKey} in {resultMap}. - - For each {childFieldSetsByPath} as {childPath} and {fieldSets}: - - Set {fieldSets} as the value for {childPath} in - {incrementalFieldSetsByPath}. + - For each {childFieldSetsByPath} as {childPath} and {fieldSet}: + - Set {fieldSet} as the value for {childPath} in + {incrementalDetailsByPath}. - Otherwise: - - Let {incrementalFieldSets} be the map in {incrementalFieldSetsByPath} for - {path}; if no such map exists, create it as an empty map. + - Let {details} be the details object in {incrementalDetailsByPath} for + {path}; if no such details object exists, create it as a details object + containing {groupedFieldSet} as an empty map, {objectType} and + {objectValue}. + - Let {incrementalFieldSet} be the value for property {groupedFieldSet} in + {details}. - Set {fieldDigests} as the value for {responseKey} in - {incrementalFieldSets}. -- Return {resultMap} and {incrementalFieldSetsByPath}. + {incrementalFieldSet}. +- Return {resultMap} and {incrementalDetailsByPath}. Note: {resultMap} is ordered by which fields appear first in the operation. This is explained in greater detail in the Field Collection section below. @@ -401,7 +405,7 @@ yielded a value may be cancelled to avoid unnecessary work. Note: See [Handling Field Errors](#sec-Handling-Field-Errors) for more about this behavior. -Further, if this occurs, the {incrementalFieldSetsByPath} must be made empty. +Further, if this occurs, the {incrementalDetailsByPath} must be made empty. ### Normal and Serial Execution @@ -739,17 +743,17 @@ currentDeliveryGroups): - If the {fieldType} is a Non-Null type: - Let {innerType} be the inner type of {fieldType}. - - Let {completedResult} and {incrementalFieldSetsByPath} be the result of + - Let {completedResult} and {incrementalDetailsByPath} be the result of calling {CompleteValue(innerType, fieldDigests, result, variableValues, path, currentDeliveryGroups)}. - If {completedResult} is {null}, raise a _field error_. - - Return {completedResult} and {incrementalFieldSetsByPath}. + - Return {completedResult} and {incrementalDetailsByPath}. - If {result} is {null} (or another internal value similar to {null} such as {undefined}), return {null}. - If {fieldType} is a List type: - If {result} is not a collection of values, raise a _field error_. - Let {innerType} be the inner type of {fieldType}. - - Let {incrementalFieldSetsByPath} be an empty map. + - Let {incrementalDetailsByPath} be an empty map. - Let {list} be an empty list. - For each list item {resultItem} at 0-indexed index {resultIndex} in {result}: @@ -762,8 +766,8 @@ currentDeliveryGroups): - For each {itemIncrementalFieldSetsByPath} as {childPath} and {childFieldSets}: - Set {childFieldSets} as the value for {childPath} in - {incrementalFieldSetsByPath}. - - Return {list} and {incrementalFieldSetsByPath}. + {incrementalDetailsByPath}. + - Return {list} and {incrementalDetailsByPath}. - If {fieldType} is a Scalar or Enum type: - Let {completedResult} be the result of {CoerceResult(fieldType, result)}. - Return {completedResult} and an empty map. @@ -933,13 +937,10 @@ An execution group consists of: write the data, - {status}: {PENDING}, {EXECUTING}, {COMPLETE} or {FAILED}, - {dependencies}: a list of execution groups on which it is dependent, and -- {groupedFieldSetByPath}: a map of response path to grouped field set that it - is responsible for executing. -- {objectValueByPath}: a map of response path to the object value for that path, - to be used when executing field sets against that object value. +- {detailsByPath}: a map of response path to a details object containing + {groupedFieldSet}, {objectType} and {objectValue}. -Note: {dependencies}, {groupedFieldSetByPath}, and {objectValueByPath} may be -added to over time. +Note: {dependencies} and {detailsByPath} may be added to over time. ExecutionGroupPath(executionGroup): @@ -952,7 +953,7 @@ ExecutionGroupPath(executionGroup): ### Incremental Event Stream -IncrementalEventStream(data, errors, initialIncrementalFieldSetsByPath, +IncrementalEventStream(data, errors, initialIncrementalDetailsByPath, variableValues): - Return a new event stream {responseStream} which yields events as follows: @@ -962,8 +963,7 @@ variableValues): - Define the sub-procedure {CreateExecutionGroup(deliveryGroups)} with the following actions: - Let {executionGroup} be a new execution group that relates to - {deliveryGroups} with no dependencies and an empty {groupedFieldSetByPath} - and {objectValueByPath}. + {deliveryGroups} with no dependencies and an empty {detailsByPath}. - If {deliveryGroups} contains more than one entry: - Let {id} be {null}. - Let {bestPath} be {null}. @@ -996,43 +996,43 @@ variableValues): result of {CreateExecutionGroup(deliveryGroups)}. - Return {executionGroup}. - Define the sub-procedure {AddFieldDigestsToExecutionGroup(executionGroup, - path, objectValue, responseKey, fieldDigests)} with the following actions: - - Let {groupedFieldSetByPath} be that property of {executionGroup}. - - Let {objectValueByPath} be that property of {executionGroup}. - - Let {groupedFieldSet} be the map in {groupedFieldSetByPath} for {path}; if - no such list exists, create it as an empty map and set {objectValue} as the - value for {path} in {objectValueByPath}. + path, objectType, objectValue, responseKey, fieldDigests)} with the following + actions: + - Let {detailsByPath} be that property of {executionGroup}. + - Let {details} be the details object in {detailsByPath} for {path}; if no + such details object exists, create it as a details object containing + {groupedFieldSet} as an empty map, {objectType} and {objectValue}. + - Let {groupedFieldSet} be that property in {details}. - Set {fieldDigests} as the value for {responseKey} in {groupedFieldSet}. -- Define the sub-procedure {HandleIncremental(fieldSetsByPath)} with the - following actions: - - For each {fieldSetsByPath} as {path} and {fieldSets}: - - For each {fieldSets} as {responseKey} and {fieldDigests}: - - Let {deliveryGroups} be the set of delivery groups in {fieldDigests}. +- Define the sub-procedure {HandleIncremental(incrementalDetailsByPath)} with + the following actions: + - For each {incrementalDetailsByPath} as {path} and {details}: + - Let {objectType} be that property in {details}. + - Let {objectValue} be that property in {details}. + - Let {groupedFieldSet} be that property in {details}. + - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: + - Let {deliveryGroups} be the set containing each unique delivery group + for each digest in {fieldDigests}. - Let {executionGroup} be {ExecutionGroupFor(deliveryGroups)}. - - Let {objectValueByPath} be that property of {executionGroup}. - - Let {objectValue} be the value for {path} in {objectValueByPath}. - - Assert: {objectValue} exists and is not {null}. - - Call {AddFieldDigestsToExecutionGroup(executionGroup, path, objectValue, - responseKey, fieldDigests)}. + - Call {AddFieldDigestsToExecutionGroup(executionGroup, path, objectType, + objectValue, responseKey, fieldDigests)}. - Define the sub-procedure {ExecuteExecutionGroup(executionGroup)}: - Set {state} of {executionGroup} to {EXECUTING}. - - Let {groupedFieldSetByPath} be that property of {executionGroup}. - - Let {objectValueByPath} be that property of {executionGroup}. + - Let {detailsByPath} be that property of {executionGroup}. - Let {deliveryGroups} be that property of {executionGroup}. - Let {dependencies} be that property of {executionGroup}. - - For each {groupedFieldSetByPath} as {path} and {groupedFieldSet} (in - parallel): - - Let {objectValue} be the value for {path} in {objectValueByPath}. + - For each {detailsByPath} as {path} and {details} (in parallel): + - Let {groupedFieldSet} be that property in {details}. + - Let {objectType} be that property in {details}. + - Let {objectValue} be that property in {details}. - Assert: {objectValue} exists and is not {null}. - - TODO: we also need {objectType} - we should store that next to - {objectValue}. - - Let {data} and {incrementalFieldSetsByPath} be the result of running + - Let {data} and {incrementalDetailsByPath} be the result of running {ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, variableValues, path, deliveryGroups)} _normally_ (allowing parallelization). - TODO: collect {data} at {path} (relative to {ExecutionGroupPath(executionGroup)}). - - TODO: collect {incrementalFieldSetsByPath}. + - TODO: collect {incrementalDetailsByPath}. - If an error {error} bubbled past any of the grouped field sets above: - Set {state} of {executionGroup} to {FAILED}. - If {deliveryGroups} contains exactly one entry: @@ -1064,7 +1064,7 @@ variableValues): - Optionally, {FlushStream()}. - Otherwise: - TODO: store the data for later, send it with one of our dependents. -- Call {HandleIncremental(initialIncrementalFieldSetsByPath)}. +- Call {HandleIncremental(initialIncrementalDetailsByPath)}. - Assert: {pending} is not empty. - Let {initialResponse} be an unordered map containing {data}, {errors}, {pending}, and the value {true} for key {hasNext}. From 4aac505983b8fa4e870b0fdc279aa215b19e0800 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 5 Feb 2024 11:40:56 +0000 Subject: [PATCH 04/10] Minor editorial and fixes --- spec/Section 6 -- Execution.md | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 46ab86086..1feb3c658 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -248,8 +248,10 @@ CreateSourceEventStream(subscription, schema, variableValues, initialValue): - Let {subscriptionType} be the root Subscription type in {schema}. - Assert: {subscriptionType} is an Object type. - Let {selectionSet} be the top level Selection Set in {subscription}. +- Let {path} be an empty list. +- Let {eventSourceDeliveryGroup} be a new delivery group with path {path}. - Let {groupedFieldSet} be the result of {CollectFields(subscriptionType, - selectionSet, variableValues)}. + selectionSet, variableValues, path, eventSourceDeliveryGroup)}. - If {groupedFieldSet} does not have exactly one entry, raise a _request error_. - Let {fields} be the value of the first entry in {groupedFieldSet}. - Let {fieldName} be the name of the first entry in {fields}. Note: This value @@ -357,8 +359,8 @@ be executed in parallel. Each represented field in the grouped field set produces an entry into a response map. -ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, -variableValues): +ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, variableValues, +path, currentDeliveryGroups): - Let {incrementalDetailsByPath} be an empty map. - Initialize {resultMap} to an empty ordered map. @@ -372,11 +374,11 @@ variableValues): {objectType}. - If {fieldType} is defined: - Let {childPath} be the result of appending {responseKey} to {path}. - - Let {responseValue} and {childFieldSetsByPath} be + - Let {responseValue} and {childIncrementalDetailsByPath} be {ExecuteField(objectType, objectValue, fieldType, fieldDigests, variableValues, childPath, currentDeliveryGroups)}. - Set {responseValue} as the value for {responseKey} in {resultMap}. - - For each {childFieldSetsByPath} as {childPath} and {fieldSet}: + - For each {childIncrementalDetailsByPath} as {childPath} and {fieldSet}: - Set {fieldSet} as the value for {childPath} in {incrementalDetailsByPath}. - Otherwise: @@ -537,9 +539,6 @@ response in a stable and predictable order. CollectFields(objectType, selectionSet, variableValues, path, deliveryGroup, visitedFragments): -- If {path} is not provided, initialize it to an empty list. -- If {deliveryGroup} is not provided, initialize it to be a new delivery group - with path {path}. - If {visitedFragments} is not provided, initialize it to the empty set. - Initialize {groupedFields} to an empty ordered map of lists. - For each {selection} in {selectionSet}: @@ -758,14 +757,14 @@ currentDeliveryGroups): - For each list item {resultItem} at 0-indexed index {resultIndex} in {result}: - Let {subpath} be the result of appending {resultIndex} to {path}. - - Let {listValue} and {itemIncrementalFieldSetsByPath} be the result of + - Let {listValue} and {itemIncrementalDetailsByPath} be the result of calling {CompleteValue(innerType, fieldDigests, resultItem, variableValues, subpath, currentDeliveryGroups)}. - Append {listValue} to {list}. - If {listValue} is not {null}: - - For each {itemIncrementalFieldSetsByPath} as {childPath} and - {childFieldSets}: - - Set {childFieldSets} as the value for {childPath} in + - For each {itemIncrementalDetailsByPath} as {childPath} and + {childIncrementalDetails}: + - Set {childIncrementalDetails} as the value for {childPath} in {incrementalDetailsByPath}. - Return {list} and {incrementalDetailsByPath}. - If {fieldType} is a Scalar or Enum type: From c26be659d2f6326e5079120aacd08e9973e4039c Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Mon, 5 Feb 2024 17:25:57 +0000 Subject: [PATCH 05/10] Consistency --- spec/Section 6 -- Execution.md | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 1feb3c658..575910797 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -378,8 +378,9 @@ path, currentDeliveryGroups): {ExecuteField(objectType, objectValue, fieldType, fieldDigests, variableValues, childPath, currentDeliveryGroups)}. - Set {responseValue} as the value for {responseKey} in {resultMap}. - - For each {childIncrementalDetailsByPath} as {childPath} and {fieldSet}: - - Set {fieldSet} as the value for {childPath} in + - For each {childIncrementalDetailsByPath} as {childPath} and + {childIncrementalDetails}: + - Set {childIncrementalDetails} as the value for {childPath} in {incrementalDetailsByPath}. - Otherwise: - Let {details} be the details object in {incrementalDetailsByPath} for @@ -748,7 +749,7 @@ currentDeliveryGroups): - If {completedResult} is {null}, raise a _field error_. - Return {completedResult} and {incrementalDetailsByPath}. - If {result} is {null} (or another internal value similar to {null} such as - {undefined}), return {null}. + {undefined}), return {null} and an empty map. - If {fieldType} is a List type: - If {result} is not a collection of values, raise a _field error_. - Let {innerType} be the inner type of {fieldType}. @@ -936,10 +937,10 @@ An execution group consists of: write the data, - {status}: {PENDING}, {EXECUTING}, {COMPLETE} or {FAILED}, - {dependencies}: a list of execution groups on which it is dependent, and -- {detailsByPath}: a map of response path to a details object containing - {groupedFieldSet}, {objectType} and {objectValue}. +- {incrementalDetailsByPath}: a map of response path to a details object + containing {groupedFieldSet}, {objectType} and {objectValue}. -Note: {dependencies} and {detailsByPath} may be added to over time. +Note: {dependencies} and {incrementalDetailsByPath} may be added to over time. ExecutionGroupPath(executionGroup): From 629f6fbb1f8a74e6f670e79bbffb6a0577e6b900 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Tue, 6 Feb 2024 13:22:43 +0000 Subject: [PATCH 06/10] New algorithm using streams, graphs and partitions --- spec/Section 6 -- Execution.md | 240 +++++++++++++++------------------ 1 file changed, 106 insertions(+), 134 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 575910797..5cfc1d6cf 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -956,140 +956,112 @@ ExecutionGroupPath(executionGroup): IncrementalEventStream(data, errors, initialIncrementalDetailsByPath, variableValues): +- Let {incrementalDetailsByPath} be {initialIncrementalDetailsByPath}. - Return a new event stream {responseStream} which yields events as follows: -- Let {nextId} be {0}. -- Let {executionGroups} be an empty set. + - Let {pendingDeliveryGroups} be + {CollectDeliveryGroups(incrementalDetailsByPath)}. + - Assert: {pendingDeliveryGroups} is not empty. + - Let {pending} be {MakePending(pendingDeliveryGroups)}. + - Yield an event containing {data}, {errors}, {pending}, and the value {true} + for {hasNext}. + - Let {streams} be {IncrementalStreams(incrementalDetailsByPath)}. + - For each {event} on each stream in {streams}: + - Yield {event}. + - When every stream in {streams} has completed: + - Yield a map with {hasNext} set to {false}. + +In order to increase efficiency, any two or more consecutive payloads in the +IncrementalEventStream stream may optionally be combined by concatenating the +lists therein (maintaining order) and setting {hasNext} to {false} if any of the +payloads has {hasNext} set to {false}, otherwise {true}. + +CollectDeliveryGroups(incrementalDetailsByPath): + +- Let {allDeliveryGroup} be an empty set. +- For each {incrementalDetailsByPath} as {path} and {details}: + - Let {objectType}, {objectValue} and {groupedFieldSet} be those properties in + {details}. + - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: + - For each {fieldDigests} as {fieldDigest}. + - Let {deliveryGroup} be the delivery group in {fieldDigest}. + - Add {deliveryGroup} to {allDeliveryGroups}. +- Return {allDeliveryGroups}. + +MakePending(deliveryGroups): + - Let {pending} be an empty list. -- Define the sub-procedure {CreateExecutionGroup(deliveryGroups)} with the - following actions: - - Let {executionGroup} be a new execution group that relates to - {deliveryGroups} with no dependencies and an empty {detailsByPath}. - - If {deliveryGroups} contains more than one entry: - - Let {id} be {null}. - - Let {bestPath} be {null}. - - For each {deliveryGroups} as {deliveryGroup}: - - Let {deliveryGroupSet} be a set containing {deliveryGroup}. - - Let {childExecutionGroup} be the result of - {ExecutionGroupFor(deliveryGroupSet)}. - - Add {executionGroup} as a dependency of {childExecutionGroup}. - - Let {path} be the path of {deliveryGroup}. - - If {bestPath} is {null} or {bestPath} contains fewer entries than - {path}: - - Let {bestPath} be {path}. - - Let {id} be the {pendingId} of {childExecutionGroup}. - - Otherwise: - - Let {id} be {nextId} and increment {nextId} by one. - - Let {deliveryGroup} be the only entry in {deliveryGroups}. - - Let {path} be the path of {deliveryGroup}. - - Let {label} be the label of {deliveryGroup} (if any). - - Let {pendingPayload} be an unordered map containing {id}, {path}, {label}. - - Add {pendingPayload} to {pending}. - - Assert: {id} is not null. - - Set {id} as the value for {pendingId} in {executionGroup}. - - Add {executionGroup} to {executionGroups}. - - Return {executionGroup}. -- Define the sub-procedure {ExecutionGroupFor(deliveryGroups)} with the - following actions: - - Let {executionGroup} be the execution group for the current operation that - relates to the delivery groups {deliveryGroups} and only those delivery - groups. If no such execution group exists then let {executionGroup} be the - result of {CreateExecutionGroup(deliveryGroups)}. - - Return {executionGroup}. -- Define the sub-procedure {AddFieldDigestsToExecutionGroup(executionGroup, - path, objectType, objectValue, responseKey, fieldDigests)} with the following - actions: - - Let {detailsByPath} be that property of {executionGroup}. - - Let {details} be the details object in {detailsByPath} for {path}; if no - such details object exists, create it as a details object containing - {groupedFieldSet} as an empty map, {objectType} and {objectValue}. - - Let {groupedFieldSet} be that property in {details}. - - Set {fieldDigests} as the value for {responseKey} in {groupedFieldSet}. -- Define the sub-procedure {HandleIncremental(incrementalDetailsByPath)} with - the following actions: - - For each {incrementalDetailsByPath} as {path} and {details}: - - Let {objectType} be that property in {details}. - - Let {objectValue} be that property in {details}. - - Let {groupedFieldSet} be that property in {details}. - - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: - - Let {deliveryGroups} be the set containing each unique delivery group - for each digest in {fieldDigests}. - - Let {executionGroup} be {ExecutionGroupFor(deliveryGroups)}. - - Call {AddFieldDigestsToExecutionGroup(executionGroup, path, objectType, - objectValue, responseKey, fieldDigests)}. -- Define the sub-procedure {ExecuteExecutionGroup(executionGroup)}: - - Set {state} of {executionGroup} to {EXECUTING}. - - Let {detailsByPath} be that property of {executionGroup}. - - Let {deliveryGroups} be that property of {executionGroup}. - - Let {dependencies} be that property of {executionGroup}. - - For each {detailsByPath} as {path} and {details} (in parallel): - - Let {groupedFieldSet} be that property in {details}. - - Let {objectType} be that property in {details}. - - Let {objectValue} be that property in {details}. - - Assert: {objectValue} exists and is not {null}. - - Let {data} and {incrementalDetailsByPath} be the result of running - {ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, - variableValues, path, deliveryGroups)} _normally_ (allowing - parallelization). - - TODO: collect {data} at {path} (relative to - {ExecutionGroupPath(executionGroup)}). - - TODO: collect {incrementalDetailsByPath}. - - If an error {error} bubbled past any of the grouped field sets above: - - Set {state} of {executionGroup} to {FAILED}. - - If {deliveryGroups} contains exactly one entry: - - Let {id} be {pendingId} of {executionGroup}. - - Let {completedPayload} be an unordered map containing {id}, {error}. - - Add {completedPayload} to {completed}. - - Otherwise: - - For each {deliveryGroups} as {deliveryGroup}: - - Let {deliveryGroupSet} be a set containing {deliveryGroup}. - - Let {dependentExecutionGroup} be - {ExecutionGroupFor(deliveryGroupSet)}. - - Set {state} of {dependentExecutionGroup} to {FAILED}. - - Let {id} be {pendingId} of {dependentExecutionGroup}. - - Let {completedPayload} be an unordered map containing {id}, {error}. - - Add {completedPayload} to {completed}. - - Remove {dependentExecutionGroup} from {executionGroups}. - - Remove {executionGroup} from {executionGroups}. - - Optionally, {FlushStream()}. - - Otherwise: - - Set {state} of {executionGroup} to {COMPLETE}. - - If {deliveryGroups} contains exactly one entry: - - For each {dependencies} as {dependency}: - - TODO: send that stored data (see below). - - Remove {dependency} as a dependency from each execution group in - {executionGroups} for which it is a dependency. - - Remove {dependency} from {executionGroups}. - - TODO: push all the data into {incremental}. - - Remove {executionGroup} from {executionGroups}. - - Optionally, {FlushStream()}. +- For each {deliveryGroups} as {deliveryGroup}: + - Let {id}, {path} and {label} be those properties in {deliveryGroup}. + - Let {pendingPayload} be an unordered map containing {id}, {path}, {label}. + - Append {pendingPayload} to {pending}. +- Return {pending}. + +IncrementalStreams(incrementalDetailsByPath): + +- Let {streams} be an empty list. +- Let {runnableDeliveryGroupsSets} be the result of + {PartitionDeliveryGroupsSets(incrementalDetailsByPath)}. +- For each {runnableDeliveryGroupsSets} as {runnableDeliveryGroupsSet}: + - Let {stream} be {IncrementalStream(incrementalDetailsByPath, + runnableDeliveryGroupsSet)}. + - Append {stream} to {streams}. +- Return {streams}. + +PartitionDeliveryGroupsSets(incrementalDetailsByPath): + +- Let {graph} be an empty graph, where the nodes are delivery groups. +- For each {incrementalDetailsByPath} as {path} and {details}: + - Let {objectType}, {objectValue} and {groupedFieldSet} be those properties in + {details}. + - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: + - Let {deliveryGroupsSet} be the set containing the delivery group from each + digest in {fieldDigests}. + - Add each {deliveryGroup} in {deliveryGroupsSet} as a node to the {graph} + (if it's not already present). + - For each pair of delivery groups {deliveryGroup1} and {deliveryGroup2} in + {deliveryGroupsSet}: + - Ensure {deliveryGroup1} and {deliveryGroup2} are connected in {graph}. +- Let {partitionedDeliveryGroupsSets} be an empty unordered list. +- For each connected {subgraph} in {graph}: + - Let {deliveryGroupsSet} be the delivery groups in {subgraph}. + - Add {deliveryGroupsSet} to {partitionedDeliveryGroupsSets}. +- Assert: every {deliveryGroup} in {graph} must appear in exactly one set in + {partitionedDeliveryGroupsSets}. +- Return {partitionedDeliveryGroupsSets}. + +IncrementalStream(incrementalDetailsByPath, deliveryGroupsSet): + +- Let {remainingIncrementalDetailsByPath}, {runnable} be + {SplitRunnable(incrementalDetailsByPath, deliveryGroupsSet)}. +- Return a new event stream {incrementalStream} which yields events as follows: + - TODO: run the runnable + +SplitRunnable(incrementalDetailsByPath, runnableDeliveryGroupsSet): + +- Let {remainingIncrementalDetailsByPath} be an empty map. +- Let {runnable} be an empty map. +- For each {incrementalDetailsByPath} as {path} and {details}: + - Let {objectType}, {objectValue} and {groupedFieldSet} be those properties in + {details}. + - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: + - Let {deliveryGroups} be the set containing the delivery group from each + digest in {fieldDigests}. + - If {deliveryGroups} contains the same number and set of delivery groups as + {runnableDeliveryGroupsSet} (order unimportant): + - Let {incrementalDetails} be the incremental details object in {runnable} + for {path}; if no such object exists, create it with {objectType}, + {objectValue}, and an empty {fieldDigests} map. + - Otherwise, if {deliveryGroups} only contains delivery groups that are also + in {runnableDeliveryGroupsSet}: + - Let {incrementalDetails} be the incremental details object in + {remainingIncrementalDetailsByPath} for {path}; if no such object + exists, create it with {objectType}, {objectValue}, and an empty + {fieldDigests} map. - Otherwise: - - TODO: store the data for later, send it with one of our dependents. -- Call {HandleIncremental(initialIncrementalDetailsByPath)}. -- Assert: {pending} is not empty. -- Let {initialResponse} be an unordered map containing {data}, {errors}, - {pending}, and the value {true} for key {hasNext}. -- Yield an event containing {initialResponse}. -- Let {incremental} be an empty list. -- Let {pending} be an empty list. -- Let {completed} be an empty list. -- Define the sub-procedure {FlushStream()} with the following actions: - - Let {hasNext} be true if {executionGroups} is not empty, {false} otherwise. - - Let {incrementalPayload} be an empty unordered map. - - Add {hasNext} to {incrementalPayload}. - - If {incremental} is not empty: - - Add {incremental} to {incrementalPayload}. - - If {pending} is not empty: - - Add {pending} to {incrementalPayload}. - - If {completed} is not empty: - - Add {completed} to {incrementalPayload}. - - Yield an event containing {incrementalPayload}. - - Reset {incremental} to an empty list. - - Reset {pending} to an empty list. - - Reset {completed} to an empty list. - - If {hasNext} is {false}, complete {responseStream}. -- While {executionGroups} is not empty: - - Let {readyToExecute} be the list of {PENDING} execution groups in - {executionGroups} whose {dependencies} are all {COMPLETE}. - - For each {readyToExecute} as {executionGroup} (in parallel): - - Call {ExecuteExecutionGroup(executionGroup)}. -- Call {FlushStream()}. + - Continue with the next {responseKey} and {fieldDigests} in + {groupedFieldSet}. + - Let {targetGroupedFieldSet} be the {groupedFieldSet} property of + {incrementalDetails}. + - Set {fieldDigests} as the value for {responseKey} in + {targetGroupedFieldSet}. +- Return {remainingIncrementalDetailsByPath} and {runnable}. From b0530f72eda3ebd19fd9400d3c4c66a057c7799b Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Tue, 6 Feb 2024 14:48:53 +0000 Subject: [PATCH 07/10] More of the execution algorithm --- spec/Section 6 -- Execution.md | 131 ++++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 45 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 5cfc1d6cf..a6f2600dc 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -918,39 +918,6 @@ In an operation that does not utilise the `@stream` and `@defer` directives, there will only be a single delivery group, the _root delivery group_, and all fields will belong to it. -### Execution Group - -::: An _execution group_ represents a number of fields at a number of given -paths in the operation that all belong to the same _delivery group_ or groups. -An execution group may depend on other execution groups, such that it cannot be -delivered until they also have been delivered. - -::: A _shared execution group_ is an _execution group_ that applies to more than -one _delivery group_. Shared execution groups cannot be delivered on their own, -they must only be delivered along with at least one completed _execution group_ -that depends on them. - -An execution group consists of: - -- {deliveryGroups}: a set of the delivery groups to which it applies, -- {pendingId}: a numeric id used in the response stream to identify where to - write the data, -- {status}: {PENDING}, {EXECUTING}, {COMPLETE} or {FAILED}, -- {dependencies}: a list of execution groups on which it is dependent, and -- {incrementalDetailsByPath}: a map of response path to a details object - containing {groupedFieldSet}, {objectType} and {objectValue}. - -Note: {dependencies} and {incrementalDetailsByPath} may be added to over time. - -ExecutionGroupPath(executionGroup): - -- Let {bestPath} be {null}. -- Let {deliveryGroups} be that property of {executionGroup}. -- For each {deliveryGroups} as {deliveryGroup}: -- If {bestPath} is {null} or {bestPath} contains fewer entries than {path}: - - Let {bestPath} be {path}. -- Return {bestPath}. - ### Incremental Event Stream IncrementalEventStream(data, errors, initialIncrementalDetailsByPath, @@ -964,7 +931,8 @@ variableValues): - Let {pending} be {MakePending(pendingDeliveryGroups)}. - Yield an event containing {data}, {errors}, {pending}, and the value {true} for {hasNext}. - - Let {streams} be {IncrementalStreams(incrementalDetailsByPath)}. + - Let {streams} and {runnableDeliveryGroupsSets} be + {IncrementalStreams(incrementalDetailsByPath)}. - For each {event} on each stream in {streams}: - Yield {event}. - When every stream in {streams} has completed: @@ -975,8 +943,9 @@ IncrementalEventStream stream may optionally be combined by concatenating the lists therein (maintaining order) and setting {hasNext} to {false} if any of the payloads has {hasNext} set to {false}, otherwise {true}. -CollectDeliveryGroups(incrementalDetailsByPath): +CollectDeliveryGroups(incrementalDetailsByPath, excludingDeliveryGroups): +- If {excludingDeliveryGroups} is not provided, initialize it to the empty set. - Let {allDeliveryGroup} be an empty set. - For each {incrementalDetailsByPath} as {path} and {details}: - Let {objectType}, {objectValue} and {groupedFieldSet} be those properties in @@ -984,7 +953,8 @@ CollectDeliveryGroups(incrementalDetailsByPath): - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: - For each {fieldDigests} as {fieldDigest}. - Let {deliveryGroup} be the delivery group in {fieldDigest}. - - Add {deliveryGroup} to {allDeliveryGroups}. + - If {deliveryGroup} is not in {excludingDeliveryGroups}: + - Add {deliveryGroup} to {allDeliveryGroups}. - Return {allDeliveryGroups}. MakePending(deliveryGroups): @@ -1005,7 +975,7 @@ IncrementalStreams(incrementalDetailsByPath): - Let {stream} be {IncrementalStream(incrementalDetailsByPath, runnableDeliveryGroupsSet)}. - Append {stream} to {streams}. -- Return {streams}. +- Return {streams} and {runnableDeliveryGroupsSets}. PartitionDeliveryGroupsSets(incrementalDetailsByPath): @@ -1033,27 +1003,76 @@ IncrementalStream(incrementalDetailsByPath, deliveryGroupsSet): - Let {remainingIncrementalDetailsByPath}, {runnable} be {SplitRunnable(incrementalDetailsByPath, deliveryGroupsSet)}. +- Let {hasNext} be {true}. - Return a new event stream {incrementalStream} which yields events as follows: - - TODO: run the runnable + - In the event of one or more errors, {errors}: + - Let {completed} be an empty list. + - For each {deliveryGroupsSet} as {deliveryGroup}: + - {id} be the id of {deliveryGroup}. + - Append an unordered map containing {id} and {errors} to {completed}. + - Yield an unordered map containing {hasNext} and {completed}. + - Complete {incrementalStream}. + - Return. + - Let {incremental} be an empty list. + - For each {runnable} as {path} and {incrementalDetails} (in parallel): + - Let {objectType}, {objectValue} and {groupedFieldSet} be those properties + in {incrementalDetails}. + - Assert: {objectValue} exists and is not {null}. + - Let {data} and {childIncrementalDetailsByPath} be the result of running + {ExecuteGroupedFieldSet(groupedFieldSet, objectType, objectValue, + variableValues, path, deliveryGroups)} _normally_ (allowing + parallelization). + - Let {errors} be the list of all _field error_ raised while executing the + grouped field set. + - Let {remainingIncrementalDetailsByPath} be + {MergeIncrementalDetailsByPath(remainingIncrementalDetailsByPath, + childIncrementalDetailsByPath)}. + - Append an unordered map containing {hasNext}, {id}, {data} and {errors} to + {incremental}. + - Let {pendingDeliveryGroups} be + {CollectDeliveryGroups(incrementalDetailsByPath, deliveryGroupsSet)}. + - Let {pending} be {MakePending(pendingDeliveryGroups)}. + - Let {sentInitial} be {false}. + - Let {streams} and {runnableDeliveryGroupsSets} be + {IncrementalStreams(incrementalDetailsByPath)}. + - For each {deliveryGroupsSet} as {deliveryGroup}: + - If {deliveryGroup} is not contained in any delivery group set in + {runnableDeliveryGroupsSets}: + - If {sentInitial} is not {true}: + - Let {sentInitial} be {true}. + - Yield an unordered map containing {hasNext}, {incremental} and + {pending}. + - Let {id} be the id of {deliveryGroup}. + - Let {completedItem} be an unordered map containing {id}. + - Let {completed} be a list containing {completedItem}. + - Yield an unordered map containing {hasNext} and {completed}. + - For each {event} on each stream in {streams}: + - If {sentInitial} is not {true}: + - Let {sentInitial} be {true}. + - Yield an unordered map containing {hasNext}, {incremental} and + {pending}. + - Yield {event}. + - When every stream in {streams} has completed: + - Complete {incrementalStreams}. SplitRunnable(incrementalDetailsByPath, runnableDeliveryGroupsSet): - Let {remainingIncrementalDetailsByPath} be an empty map. - Let {runnable} be an empty map. -- For each {incrementalDetailsByPath} as {path} and {details}: +- For each {incrementalDetailsByPath} as {path} and {incrementalDetails}: - Let {objectType}, {objectValue} and {groupedFieldSet} be those properties in - {details}. + {incrementalDetails}. - For each {groupedFieldSet} as {responseKey} and {fieldDigests}: - Let {deliveryGroups} be the set containing the delivery group from each digest in {fieldDigests}. - If {deliveryGroups} contains the same number and set of delivery groups as {runnableDeliveryGroupsSet} (order unimportant): - - Let {incrementalDetails} be the incremental details object in {runnable} - for {path}; if no such object exists, create it with {objectType}, - {objectValue}, and an empty {fieldDigests} map. + - Let {targetIncrementalDetails} be the incremental details object in + {runnable} for {path}; if no such object exists, create it with + {objectType}, {objectValue}, and an empty {fieldDigests} map. - Otherwise, if {deliveryGroups} only contains delivery groups that are also in {runnableDeliveryGroupsSet}: - - Let {incrementalDetails} be the incremental details object in + - Let {targetIncrementalDetails} be the incremental details object in {remainingIncrementalDetailsByPath} for {path}; if no such object exists, create it with {objectType}, {objectValue}, and an empty {fieldDigests} map. @@ -1061,7 +1080,29 @@ SplitRunnable(incrementalDetailsByPath, runnableDeliveryGroupsSet): - Continue with the next {responseKey} and {fieldDigests} in {groupedFieldSet}. - Let {targetGroupedFieldSet} be the {groupedFieldSet} property of - {incrementalDetails}. + {targetIncrementalDetails}. - Set {fieldDigests} as the value for {responseKey} in {targetGroupedFieldSet}. - Return {remainingIncrementalDetailsByPath} and {runnable}. + +MergeIncrementalDetailsByPath(incrementalDetailsByPath1, +incrementalDetailsByPath2): + +- Let {incrementalDetailsByPath} be a copy of {incrementalDetailsByPath1}. +- For each {incrementalDetailsByPath2} as {path} and {newIncrementalDetails}: + - Let {originalIncrementalDetails} be the value for {path} in + {incrementalDetailsByPath}, or {null} if no such entry exists. + - If {originalIncrementalDetails} is null: + - Set {newIncrementalDetails} as the value for {path} in + {incrementalDetailsByPath}. + - Otherwise: + - Let {originalGroupedFieldSet} be the grouped field set in + {originalIncrementalDetails}. + - Let {newGroupedFieldSet} be the grouped field set in + {newIncrementalDetails}. + - For each {newGroupedFieldSet} as {responseKey} and {newFieldDigests}: + - Let {fieldDigests} be the value for {responseKey} in + {originalGroupedFieldSet}; or if no such entry is found, create it as + the empty set. + - Add every entry in {newFieldDigests} to {fieldDigests}. +- Return {incrementalDetailsByPath}. From d1f82180b676621f413bf837c5811c640fe210be Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Tue, 6 Feb 2024 14:56:54 +0000 Subject: [PATCH 08/10] Remove stream --- spec/Section 6 -- Execution.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index a6f2600dc..5283a2e80 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -904,7 +904,7 @@ return `Non-Null` types, then the {"data"} entry in the response should be ### Delivery Group ::: A _delivery group_ represents either the root selection set or a particular -`@stream` or `@defer` directive at a particular {path} in the response. +`@defer` directive at a particular {path} in the response. ::: The _root delivery group_ is the _delivery group_ that represents the root selection set in the operation. @@ -914,9 +914,9 @@ delivery group_. During field collection, the delivery group of each field is tracked, and this is used when determining when to execute and deliver defered fields and streamed list items. -In an operation that does not utilise the `@stream` and `@defer` directives, -there will only be a single delivery group, the _root delivery group_, and all -fields will belong to it. +In an operation that does not utilise the `@defer` directive, there will only be +a single delivery group, the _root delivery group_, and all fields will belong +to it. ### Incremental Event Stream From efe3cd22ea02a95d51e39b24b4143c09cb549203 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Tue, 6 Feb 2024 15:12:19 +0000 Subject: [PATCH 09/10] Fix --- spec/Section 6 -- Execution.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 5283a2e80..97d812064 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -1030,11 +1030,12 @@ IncrementalStream(incrementalDetailsByPath, deliveryGroupsSet): - Append an unordered map containing {hasNext}, {id}, {data} and {errors} to {incremental}. - Let {pendingDeliveryGroups} be - {CollectDeliveryGroups(incrementalDetailsByPath, deliveryGroupsSet)}. + {CollectDeliveryGroups(remainingIncrementalDetailsByPath, + deliveryGroupsSet)}. - Let {pending} be {MakePending(pendingDeliveryGroups)}. - Let {sentInitial} be {false}. - Let {streams} and {runnableDeliveryGroupsSets} be - {IncrementalStreams(incrementalDetailsByPath)}. + {IncrementalStreams(remainingIncrementalDetailsByPath)}. - For each {deliveryGroupsSet} as {deliveryGroup}: - If {deliveryGroup} is not contained in any delivery group set in {runnableDeliveryGroupsSets}: From 6dc94890cd68db99ba5f2eb398f01f3cf81a2d37 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Tue, 6 Feb 2024 15:31:07 +0000 Subject: [PATCH 10/10] rr --- spec/Section 6 -- Execution.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/Section 6 -- Execution.md b/spec/Section 6 -- Execution.md index 97d812064..2d478b5fe 100644 --- a/spec/Section 6 -- Execution.md +++ b/spec/Section 6 -- Execution.md @@ -911,7 +911,7 @@ selection set in the operation. Each _delivery group_ belongs to a {parent} delivery group, except for the _root delivery group_. During field collection, the delivery group of each field is -tracked, and this is used when determining when to execute and deliver defered +tracked, and this is used when determining when to execute and deliver deferred fields and streamed list items. In an operation that does not utilise the `@defer` directive, there will only be