From 6e3eecdb3bf148ddcf15fe79cfebadba11cb6f0a Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 3 Jul 2023 13:22:24 +0300 Subject: [PATCH 1/3] incremental: subsequent result records should not store parent references as memory then cannot be freed --- src/execution/IncrementalPublisher.ts | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index f48e62e6a2..7b7eca3a0c 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -276,6 +276,9 @@ export class IncrementalPublisher { publishInitial() { for (const child of this._initialResult.children) { + if (child.filtered) { + continue; + } this._publish(child); } } @@ -299,11 +302,7 @@ export class IncrementalPublisher { } this._delete(child); - const parent = - child.parentContext === undefined - ? this._initialResult - : child.parentContext; - parent.children.delete(child); + child.filtered = true; if (isStreamItemsRecord(child)) { if (child.asyncIterator !== undefined) { @@ -364,6 +363,9 @@ export class IncrementalPublisher { for (const incrementalDataRecord of completedRecords) { const incrementalResult: IncrementalResult = {}; for (const child of incrementalDataRecord.children) { + if (child.filtered) { + continue; + } this._publish(child); } if (isStreamItemsRecord(incrementalDataRecord)) { @@ -435,20 +437,16 @@ export class DeferredFragmentRecord { label: string | undefined; path: Array; data: ObjMap | null; - parentContext: IncrementalDataRecord | undefined; children: Set; isCompleted: boolean; - constructor(opts: { - label: string | undefined; - path: Path | undefined; - parentContext: IncrementalDataRecord | undefined; - }) { + filtered: boolean; + constructor(opts: { label: string | undefined; path: Path | undefined }) { this.label = opts.label; this.path = pathToArray(opts.path); - this.parentContext = opts.parentContext; this.errors = []; this.children = new Set(); this.isCompleted = false; + this.filtered = false; this.data = null; } } @@ -459,25 +457,24 @@ export class StreamItemsRecord { label: string | undefined; path: Array; items: Array | null; - parentContext: IncrementalDataRecord | undefined; children: Set; asyncIterator: AsyncIterator | undefined; isCompletedAsyncIterator?: boolean; isCompleted: boolean; + filtered: boolean; constructor(opts: { label: string | undefined; path: Path | undefined; asyncIterator?: AsyncIterator; - parentContext: IncrementalDataRecord | undefined; }) { this.items = null; this.label = opts.label; this.path = pathToArray(opts.path); - this.parentContext = opts.parentContext; this.asyncIterator = opts.asyncIterator; this.errors = []; this.children = new Set(); this.isCompleted = false; + this.filtered = false; this.items = null; } } From c9db779e797be7409177fd9cacd5dbdfba4295f9 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 3 Jul 2023 14:22:34 +0300 Subject: [PATCH 2/3] do not store initialResultRecord on context as then memory for the result record tree cannot be freed --- src/execution/IncrementalPublisher.ts | 111 ++++++++++++-------------- src/execution/execute.ts | 75 +++++++++-------- 2 files changed, 93 insertions(+), 93 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 7b7eca3a0c..c39486ce75 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -97,34 +97,17 @@ export type FormattedIncrementalResult< * parents have completed so that they can no longer be filtered. This includes all Incremental * Data records in `released`, as well as Incremental Data records that have not yet completed. * - * `_initialResult`: a record containing the state of the initial result, as follows: - * `isCompleted`: indicates whether the initial result has completed. - * `children`: the set of Incremental Data records that can be be published when the initial - * result is completed. - * - * Each Incremental Data record also contains similar metadata, i.e. these records also contain - * similar `isCompleted` and `children` properties. - * * @internal */ export class IncrementalPublisher { - private _initialResult: { - children: Set; - isCompleted: boolean; - }; - - private _released: Set; - private _pending: Set; + private _released: Set; + private _pending: Set; // these are assigned within the Promise executor called synchronously within the constructor private _signalled!: Promise; private _resolve!: () => void; constructor() { - this._initialResult = { - children: new Set(), - isCompleted: false, - }; this._released = new Set(); this._pending = new Set(); this._reset(); @@ -210,19 +193,22 @@ export class IncrementalPublisher { }; } + prepareInitialResultRecord(): InitialResultRecord { + return { + errors: [], + children: new Set(), + }; + } + prepareNewDeferredFragmentRecord(opts: { label: string | undefined; path: Path | undefined; - parentContext: IncrementalDataRecord | undefined; + parentContext: IncrementalDataRecord; }): DeferredFragmentRecord { const deferredFragmentRecord = new DeferredFragmentRecord(opts); const parentContext = opts.parentContext; - if (parentContext) { - parentContext.children.add(deferredFragmentRecord); - } else { - this._initialResult.children.add(deferredFragmentRecord); - } + parentContext.children.add(deferredFragmentRecord); return deferredFragmentRecord; } @@ -231,16 +217,12 @@ export class IncrementalPublisher { label: string | undefined; path: Path | undefined; asyncIterator?: AsyncIterator; - parentContext: IncrementalDataRecord | undefined; + parentContext: IncrementalDataRecord; }): StreamItemsRecord { const streamItemsRecord = new StreamItemsRecord(opts); const parentContext = opts.parentContext; - if (parentContext) { - parentContext.children.add(streamItemsRecord); - } else { - this._initialResult.children.add(streamItemsRecord); - } + parentContext.children.add(streamItemsRecord); return streamItemsRecord; } @@ -274,8 +256,8 @@ export class IncrementalPublisher { incrementalDataRecord.errors.push(error); } - publishInitial() { - for (const child of this._initialResult.children) { + publishInitial(initialResult: InitialResultRecord) { + for (const child of initialResult.children) { if (child.filtered) { continue; } @@ -283,20 +265,22 @@ export class IncrementalPublisher { } } - filter( - nullPath: Path, - erroringIncrementalDataRecord: IncrementalDataRecord | undefined, - ) { + getInitialErrors( + initialResult: InitialResultRecord, + ): ReadonlyArray { + return initialResult.errors; + } + + filter(nullPath: Path, erroringIncrementalDataRecord: IncrementalDataRecord) { const nullPathArray = pathToArray(nullPath); const asyncIterators = new Set>(); - const children = - erroringIncrementalDataRecord === undefined - ? this._initialResult.children - : erroringIncrementalDataRecord.children; + const descendants = this._getDescendants( + erroringIncrementalDataRecord.children, + ); - for (const child of this._getDescendants(children)) { + for (const child of descendants) { if (!this._matchesPath(child.path, nullPathArray)) { continue; } @@ -332,31 +316,31 @@ export class IncrementalPublisher { this._signalled = signalled; } - private _introduce(item: IncrementalDataRecord) { + private _introduce(item: SubsequentDataRecord) { this._pending.add(item); } - private _release(item: IncrementalDataRecord): void { + private _release(item: SubsequentDataRecord): void { if (this._pending.has(item)) { this._released.add(item); this._trigger(); } } - private _push(item: IncrementalDataRecord): void { + private _push(item: SubsequentDataRecord): void { this._released.add(item); this._pending.add(item); this._trigger(); } - private _delete(item: IncrementalDataRecord) { + private _delete(item: SubsequentDataRecord) { this._released.delete(item); this._pending.delete(item); this._trigger(); } private _getIncrementalResult( - completedRecords: ReadonlySet, + completedRecords: ReadonlySet, ): SubsequentIncrementalExecutionResult | undefined { const incrementalResults: Array = []; let encounteredCompletedAsyncIterator = false; @@ -398,18 +382,18 @@ export class IncrementalPublisher { : undefined; } - private _publish(incrementalDataRecord: IncrementalDataRecord) { - if (incrementalDataRecord.isCompleted) { - this._push(incrementalDataRecord); + private _publish(subsequentResultRecord: SubsequentDataRecord) { + if (subsequentResultRecord.isCompleted) { + this._push(subsequentResultRecord); } else { - this._introduce(incrementalDataRecord); + this._introduce(subsequentResultRecord); } } private _getDescendants( - children: ReadonlySet, - descendants = new Set(), - ): ReadonlySet { + children: ReadonlySet, + descendants = new Set(), + ): ReadonlySet { for (const child of children) { descendants.add(child); this._getDescendants(child.children, descendants); @@ -431,13 +415,18 @@ export class IncrementalPublisher { } } +export interface InitialResultRecord { + errors: Array; + children: Set; +} + /** @internal */ export class DeferredFragmentRecord { errors: Array; label: string | undefined; path: Array; data: ObjMap | null; - children: Set; + children: Set; isCompleted: boolean; filtered: boolean; constructor(opts: { label: string | undefined; path: Path | undefined }) { @@ -457,7 +446,7 @@ export class StreamItemsRecord { label: string | undefined; path: Array; items: Array | null; - children: Set; + children: Set; asyncIterator: AsyncIterator | undefined; isCompletedAsyncIterator?: boolean; isCompleted: boolean; @@ -479,10 +468,12 @@ export class StreamItemsRecord { } } -export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord; +export type SubsequentDataRecord = DeferredFragmentRecord | StreamItemsRecord; + +export type IncrementalDataRecord = InitialResultRecord | SubsequentDataRecord; function isStreamItemsRecord( - incrementalDataRecord: IncrementalDataRecord, -): incrementalDataRecord is StreamItemsRecord { - return incrementalDataRecord instanceof StreamItemsRecord; + subsequentResultRecord: SubsequentDataRecord, +): subsequentResultRecord is StreamItemsRecord { + return subsequentResultRecord instanceof StreamItemsRecord; } diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 1ec11f72cc..af68c286e1 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -56,7 +56,9 @@ import type { FormattedIncrementalResult, IncrementalDataRecord, IncrementalResult, + InitialResultRecord, StreamItemsRecord, + SubsequentDataRecord, SubsequentIncrementalExecutionResult, } from './IncrementalPublisher.js'; import { IncrementalPublisher } from './IncrementalPublisher.js'; @@ -128,7 +130,6 @@ export interface ExecutionContext { fieldResolver: GraphQLFieldResolver; typeResolver: GraphQLTypeResolver; subscribeFieldResolver: GraphQLFieldResolver; - errors: Array; incrementalPublisher: IncrementalPublisher; } @@ -289,14 +290,17 @@ function executeImpl( // Errors from sub-fields of a NonNull type may propagate to the top level, // at which point we still log the error and null the parent field, which // in this case is the entire response. - const { incrementalPublisher, errors } = exeContext; + const incrementalPublisher = exeContext.incrementalPublisher; + const initialResultRecord = incrementalPublisher.prepareInitialResultRecord(); try { - const result = executeOperation(exeContext); + const result = executeOperation(exeContext, initialResultRecord); if (isPromise(result)) { return result.then( (data) => { + const errors = + incrementalPublisher.getInitialErrors(initialResultRecord); const initialResult = buildResponse(data, errors); - incrementalPublisher.publishInitial(); + incrementalPublisher.publishInitial(initialResultRecord); if (incrementalPublisher.hasNext()) { return { initialResult: { @@ -309,13 +313,15 @@ function executeImpl( return initialResult; }, (error) => { - errors.push(error); + incrementalPublisher.addFieldError(initialResultRecord, error); + const errors = + incrementalPublisher.getInitialErrors(initialResultRecord); return buildResponse(null, errors); }, ); } - const initialResult = buildResponse(result, errors); - incrementalPublisher.publishInitial(); + const initialResult = buildResponse(result, initialResultRecord.errors); + incrementalPublisher.publishInitial(initialResultRecord); if (incrementalPublisher.hasNext()) { return { initialResult: { @@ -327,7 +333,8 @@ function executeImpl( } return initialResult; } catch (error) { - errors.push(error); + incrementalPublisher.addFieldError(initialResultRecord, error); + const errors = incrementalPublisher.getInitialErrors(initialResultRecord); return buildResponse(null, errors); } } @@ -445,7 +452,6 @@ export function buildExecutionContext( typeResolver: typeResolver ?? defaultTypeResolver, subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver, incrementalPublisher: new IncrementalPublisher(), - errors: [], }; } @@ -456,8 +462,6 @@ function buildPerEventExecutionContext( return { ...exeContext, rootValue: payload, - // no need to update incrementalPublisher, incremental delivery is not supported for subscriptions - errors: [], }; } @@ -466,6 +470,7 @@ function buildPerEventExecutionContext( */ function executeOperation( exeContext: ExecutionContext, + initialResultRecord: InitialResultRecord, ): PromiseOrValue> { const { operation, schema, fragments, variableValues, rootValue } = exeContext; @@ -495,6 +500,7 @@ function executeOperation( rootValue, path, groupedFieldSet, + initialResultRecord, ); break; case OperationTypeNode.MUTATION: @@ -504,6 +510,7 @@ function executeOperation( rootValue, path, groupedFieldSet, + initialResultRecord, ); break; case OperationTypeNode.SUBSCRIPTION: @@ -515,6 +522,7 @@ function executeOperation( rootValue, path, groupedFieldSet, + initialResultRecord, ); } @@ -525,6 +533,7 @@ function executeOperation( rootType, rootValue, patchGroupedFieldSet, + initialResultRecord, label, path, ); @@ -543,6 +552,7 @@ function executeFieldsSerially( sourceValue: unknown, path: Path | undefined, groupedFieldSet: GroupedFieldSet, + incrementalDataRecord: InitialResultRecord, ): PromiseOrValue> { return promiseReduce( groupedFieldSet, @@ -554,6 +564,7 @@ function executeFieldsSerially( sourceValue, fieldGroup, fieldPath, + incrementalDataRecord, ); if (result === undefined) { return results; @@ -581,7 +592,7 @@ function executeFields( sourceValue: unknown, path: Path | undefined, groupedFieldSet: GroupedFieldSet, - incrementalDataRecord?: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue> { const results = Object.create(null); let containsPromise = false; @@ -638,7 +649,7 @@ function executeField( source: unknown, fieldGroup: FieldGroup, path: Path, - incrementalDataRecord?: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue { const fieldName = fieldGroup[0].name.value; const fieldDef = exeContext.schema.getField(parentType, fieldName); @@ -761,7 +772,7 @@ function handleFieldError( returnType: GraphQLOutputType, fieldGroup: FieldGroup, path: Path, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): void { const error = locatedError(rawError, fieldGroup, pathToArray(path)); @@ -771,11 +782,9 @@ function handleFieldError( throw error; } - const errors = incrementalDataRecord?.errors ?? exeContext.errors; - // Otherwise, error protection is applied, logging the error and resolving // a null value for this field if one is encountered. - errors.push(error); + exeContext.incrementalPublisher.addFieldError(incrementalDataRecord, error); } /** @@ -806,7 +815,7 @@ function completeValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue { // If result is an Error, throw a located error. if (result instanceof Error) { @@ -898,7 +907,7 @@ async function completePromisedValue( info: GraphQLResolveInfo, path: Path, result: Promise, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): Promise { try { const resolved = await result; @@ -997,7 +1006,7 @@ async function completeAsyncIteratorValue( info: GraphQLResolveInfo, path: Path, asyncIterator: AsyncIterator, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): Promise> { const stream = getStreamValues(exeContext, fieldGroup, path); let containsPromise = false; @@ -1019,8 +1028,8 @@ async function completeAsyncIteratorValue( info, itemType, path, - stream.label, incrementalDataRecord, + stream.label, ); break; } @@ -1067,7 +1076,7 @@ function completeListValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue> { const itemType = returnType.ofType; @@ -1117,8 +1126,8 @@ function completeListValue( fieldGroup, info, itemType, - stream.label, previousIncrementalDataRecord, + stream.label, ); index++; continue; @@ -1158,7 +1167,7 @@ function completeListItemValue( fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemPath: Path, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): boolean { if (isPromise(item)) { completedResults.push( @@ -1257,7 +1266,7 @@ function completeAbstractValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue> { const resolveTypeFn = returnType.resolveType ?? exeContext.typeResolver; const contextValue = exeContext.contextValue; @@ -1367,7 +1376,7 @@ function completeObjectValue( info: GraphQLResolveInfo, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue> { // If there is an isTypeOf predicate function, call it with the // current result. If isTypeOf returns false, then raise an error rather @@ -1423,7 +1432,7 @@ function collectAndExecuteSubfields( fieldGroup: FieldGroup, path: Path, result: unknown, - incrementalDataRecord: IncrementalDataRecord | undefined, + incrementalDataRecord: IncrementalDataRecord, ): PromiseOrValue> { // Collect sub-fields to execute to complete this value. const { groupedFieldSet: subGroupedFieldSet, patches: subPatches } = @@ -1445,9 +1454,9 @@ function collectAndExecuteSubfields( returnType, result, subPatchGroupedFieldSet, + incrementalDataRecord, label, path, - incrementalDataRecord, ); } @@ -1747,9 +1756,9 @@ function executeDeferredFragment( parentType: GraphQLObjectType, sourceValue: unknown, fields: GroupedFieldSet, + parentContext: IncrementalDataRecord, label?: string, path?: Path, - parentContext?: IncrementalDataRecord, ): void { const incrementalPublisher = exeContext.incrementalPublisher; const incrementalDataRecord = @@ -1808,9 +1817,9 @@ function executeStreamField( fieldGroup: FieldGroup, info: GraphQLResolveInfo, itemType: GraphQLOutputType, + parentContext: IncrementalDataRecord, label?: string, - parentContext?: IncrementalDataRecord, -): IncrementalDataRecord { +): SubsequentDataRecord { const incrementalPublisher = exeContext.incrementalPublisher; const incrementalDataRecord = incrementalPublisher.prepareNewStreamItemsRecord({ @@ -1990,12 +1999,12 @@ async function executeStreamAsyncIterator( info: GraphQLResolveInfo, itemType: GraphQLOutputType, path: Path, + parentContext: IncrementalDataRecord, label?: string, - parentContext?: IncrementalDataRecord, ): Promise { const incrementalPublisher = exeContext.incrementalPublisher; let index = initialIndex; - let previousIncrementalDataRecord = parentContext ?? undefined; + let previousIncrementalDataRecord = parentContext; // eslint-disable-next-line no-constant-condition while (true) { const itemPath = addPath(path, index, undefined); From cfbe5a4b3008d81dfeebe1e3798c00a1985b7745 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Wed, 5 Jul 2023 15:32:38 +0300 Subject: [PATCH 3/3] no need for actual deletion --- src/execution/IncrementalPublisher.ts | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index c39486ce75..b36c5c7653 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -285,7 +285,6 @@ export class IncrementalPublisher { continue; } - this._delete(child); child.filtered = true; if (isStreamItemsRecord(child)) { @@ -333,12 +332,6 @@ export class IncrementalPublisher { this._trigger(); } - private _delete(item: SubsequentDataRecord) { - this._released.delete(item); - this._pending.delete(item); - this._trigger(); - } - private _getIncrementalResult( completedRecords: ReadonlySet, ): SubsequentIncrementalExecutionResult | undefined {