diff --git a/observability-test/database.ts b/observability-test/database.ts index b14813cce..5da43c37e 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -1565,6 +1565,137 @@ describe('Database', () => { }); }); + describe('runTransactionAsync', () => { + const SESSION = new FakeSession(); + const TRANSACTION = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.ReadWrite + ); + + let pool: FakeSessionPool; + + beforeEach(() => { + pool = database.pool_; + (sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake( + callback => { + callback(null, SESSION, TRANSACTION); + } + ); + }); + + it('with no error', async () => { + const fakeValue = {}; + + sandbox + .stub(FakeAsyncTransactionRunner.prototype, 'run') + .resolves(fakeValue); + + const value = await database.runTransactionAsync(async txn => { + const result = await txn.run('SELECT 1'); + await txn.commit(); + return result; + }); + + assert.strictEqual(value, fakeValue); + + await provider.forceFlush(); + await traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runTransactionAsync']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status' + ); + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); + + it('with error', async () => { + const ourException = new Error('our thrown error'); + sandbox + .stub(FakeAsyncTransactionRunner.prototype, 'run') + .throws(ourException); + + assert.rejects(async () => { + const value = await database.runTransactionAsync(async txn => { + const result = await txn.run('SELECT 1'); + await txn.commit(); + return result; + }); + }, ourException); + + await provider.forceFlush(); + await traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = ['CloudSpanner.Database.runTransactionAsync']; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + firstSpan.status.code, + SpanStatusCode.ERROR, + 'Unexpected span status' + ); + assert.strictEqual( + firstSpan.status.message, + ourException.message, + 'Unexpected span status message' + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); + }); + describe('runStream', () => { const QUERY = { sql: 'SELECT * FROM table', diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 93300a1ad..fe8460aa0 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -468,6 +468,52 @@ describe('EndToEnd', () => { }); }); + it('runTransactionAsync', async () => { + const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( + database.formattedName_ + ); + await database.runTransactionAsync(async transaction => { + const [rows] = await transaction!.run('SELECT 1'); + }); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Database.runTransactionAsync', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Transaction Creation Done', + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + 'Using Session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); + it('writeAtLeastOnce', done => { const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( database.formattedName_ diff --git a/src/database.ts b/src/database.ts index ba46d691d..c770b0c8e 100644 --- a/src/database.ts +++ b/src/database.ts @@ -3363,46 +3363,56 @@ class Database extends common.GrpcServiceObject { let sessionId = ''; const getSession = this.pool_.getSession.bind(this.pool_); - const span = getActiveOrNoopSpan(); - // Loop to retry 'Session not found' errors. - // (and yes, we like while (true) more than for (;;) here) - // eslint-disable-next-line no-constant-condition - while (true) { - try { - const [session, transaction] = await promisify(getSession)(); - transaction.requestOptions = Object.assign( - transaction.requestOptions || {}, - options.requestOptions - ); - if (options.optimisticLock) { - transaction.useOptimisticLock(); - } - if (options.excludeTxnFromChangeStreams) { - transaction.excludeTxnFromChangeStreams(); - } - sessionId = session?.id; - span.addEvent('Using Session', {'session.id': sessionId}); - const runner = new AsyncTransactionRunner( - session, - transaction, - runFn, - options - ); - - try { - return await runner.run(); - } finally { - this.pool_.release(session); - } - } catch (e) { - if (!isSessionNotFoundError(e as ServiceError)) { - span.addEvent('No session available', { - 'session.id': sessionId, - }); - throw e; + return startTrace( + 'Database.runTransactionAsync', + this._traceConfig, + async span => { + // Loop to retry 'Session not found' errors. + // (and yes, we like while (true) more than for (;;) here) + // eslint-disable-next-line no-constant-condition + while (true) { + try { + const [session, transaction] = await promisify(getSession)(); + transaction.requestOptions = Object.assign( + transaction.requestOptions || {}, + options.requestOptions + ); + if (options.optimisticLock) { + transaction.useOptimisticLock(); + } + if (options.excludeTxnFromChangeStreams) { + transaction.excludeTxnFromChangeStreams(); + } + sessionId = session?.id; + span.addEvent('Using Session', {'session.id': sessionId}); + const runner = new AsyncTransactionRunner( + session, + transaction, + runFn, + options + ); + + try { + return await runner.run(); + } catch (e) { + setSpanError(span, e as Error); + throw e; + } finally { + span.end(); + this.pool_.release(session); + } + } catch (e) { + if (!isSessionNotFoundError(e as ServiceError)) { + span.addEvent('No session available', { + 'session.id': sessionId, + }); + span.end(); + throw e; + } + } } } - } + ); } /**