Skip to content

Commit

Permalink
feat: (observability): trace Database.runTransactionAsync (#2167)
Browse files Browse the repository at this point in the history
Extracted out of PR #2158, this change traces
Database.runTransactionAsync.

Updates #207
  • Loading branch information
odeke-em authored Oct 21, 2024
1 parent 51bc8a7 commit d0fe178
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 38 deletions.
131 changes: 131 additions & 0 deletions observability-test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,137 @@ describe('Database', () => {
});
});

describe('runTransactionAsync', () => {
const SESSION = new FakeSession();
const TRANSACTION = new FakeTransaction(
{} as google.spanner.v1.TransactionOptions.ReadWrite
);

let pool: FakeSessionPool;

beforeEach(() => {
pool = database.pool_;
(sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake(
callback => {
callback(null, SESSION, TRANSACTION);
}
);
});

it('with no error', async () => {
const fakeValue = {};

sandbox
.stub(FakeAsyncTransactionRunner.prototype, 'run')
.resolves(fakeValue);

const value = await database.runTransactionAsync(async txn => {
const result = await txn.run('SELECT 1');
await txn.commit();
return result;
});

assert.strictEqual(value, fakeValue);

await provider.forceFlush();
await traceExporter.forceFlush();
const spans = traceExporter.getFinishedSpans();
withAllSpansHaveDBName(spans);

const actualSpanNames: string[] = [];
const actualEventNames: string[] = [];
spans.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = ['CloudSpanner.Database.runTransactionAsync'];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

// Ensure that the span actually produced an error that was recorded.
const firstSpan = spans[0];
assert.strictEqual(
SpanStatusCode.UNSET,
firstSpan.status.code,
'Unexpected span status'
);
assert.strictEqual(
undefined,
firstSpan.status.message,
'Unexpected span status message'
);

const expectedEventNames = ['Using Session'];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);
});

it('with error', async () => {
const ourException = new Error('our thrown error');
sandbox
.stub(FakeAsyncTransactionRunner.prototype, 'run')
.throws(ourException);

assert.rejects(async () => {
const value = await database.runTransactionAsync(async txn => {
const result = await txn.run('SELECT 1');
await txn.commit();
return result;
});
}, ourException);

await provider.forceFlush();
await traceExporter.forceFlush();
const spans = traceExporter.getFinishedSpans();
withAllSpansHaveDBName(spans);

const actualSpanNames: string[] = [];
const actualEventNames: string[] = [];
spans.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = ['CloudSpanner.Database.runTransactionAsync'];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

// Ensure that the span actually produced an error that was recorded.
const firstSpan = spans[0];
assert.strictEqual(
firstSpan.status.code,
SpanStatusCode.ERROR,
'Unexpected span status'
);
assert.strictEqual(
firstSpan.status.message,
ourException.message,
'Unexpected span status message'
);

const expectedEventNames = ['Using Session', 'exception'];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);
});
});

describe('runStream', () => {
const QUERY = {
sql: 'SELECT * FROM table',
Expand Down
46 changes: 46 additions & 0 deletions observability-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,52 @@ describe('EndToEnd', () => {
});
});

it('runTransactionAsync', async () => {
const withAllSpansHaveDBName = generateWithAllSpansHaveDBName(
database.formattedName_
);
await database.runTransactionAsync(async transaction => {
await transaction!.run('SELECT 1');
});

traceExporter.forceFlush();
const spans = traceExporter.getFinishedSpans();
withAllSpansHaveDBName(spans);

const actualEventNames: string[] = [];
const actualSpanNames: string[] = [];
spans.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = [
'CloudSpanner.Snapshot.runStream',
'CloudSpanner.Snapshot.run',
'CloudSpanner.Database.runTransactionAsync',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

const expectedEventNames = [
'Transaction Creation Done',
'Acquiring session',
'Cache hit: has usable session',
'Acquired session',
'Using Session',
];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);
});

it('writeAtLeastOnce', done => {
const withAllSpansHaveDBName = generateWithAllSpansHaveDBName(
database.formattedName_
Expand Down
86 changes: 48 additions & 38 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3363,46 +3363,56 @@ class Database extends common.GrpcServiceObject {

let sessionId = '';
const getSession = this.pool_.getSession.bind(this.pool_);
const span = getActiveOrNoopSpan();
// Loop to retry 'Session not found' errors.
// (and yes, we like while (true) more than for (;;) here)
// eslint-disable-next-line no-constant-condition
while (true) {
try {
const [session, transaction] = await promisify(getSession)();
transaction.requestOptions = Object.assign(
transaction.requestOptions || {},
options.requestOptions
);
if (options.optimisticLock) {
transaction.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
sessionId = session?.id;
span.addEvent('Using Session', {'session.id': sessionId});
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
runFn,
options
);

try {
return await runner.run();
} finally {
this.pool_.release(session);
}
} catch (e) {
if (!isSessionNotFoundError(e as ServiceError)) {
span.addEvent('No session available', {
'session.id': sessionId,
});
throw e;
return startTrace(
'Database.runTransactionAsync',
this._traceConfig,
async span => {
// Loop to retry 'Session not found' errors.
// (and yes, we like while (true) more than for (;;) here)
// eslint-disable-next-line no-constant-condition
while (true) {
try {
const [session, transaction] = await promisify(getSession)();
transaction.requestOptions = Object.assign(
transaction.requestOptions || {},
options.requestOptions
);
if (options.optimisticLock) {
transaction.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
sessionId = session?.id;
span.addEvent('Using Session', {'session.id': sessionId});
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
runFn,
options
);

try {
return await runner.run();
} catch (e) {
setSpanErrorAndException(span, e as Error);
throw e;
} finally {
span.end();
this.pool_.release(session);
}
} catch (e) {
if (!isSessionNotFoundError(e as ServiceError)) {
span.addEvent('No session available', {
'session.id': sessionId,
});
span.end();
throw e;
}
}
}
}
}
);
}

/**
Expand Down

0 comments on commit d0fe178

Please sign in to comment.