feat(observability): trace Database.batchWriteAtLeastOnce
This change traces Database.batchWriteAtLeastOnce. Unfortunately,
MockSpanner does not yet support batch writes, so there is no
end-to-end test against it.

Updates #2079
odeke-em committed Oct 14, 2024
1 parent f01516e commit a4d4b66
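
For context, here is a minimal usage sketch of the API this commit instruments (not part of the commit itself). The project, instance, database, and table names are placeholders, and OpenTelemetry setup is assumed to be configured separately per the library's observability docs; the span name comes from the tests below.

```ts
import {Spanner, MutationGroup} from '@google-cloud/spanner';

// Placeholder identifiers -- substitute your own project/instance/database.
const spanner = new Spanner({projectId: 'my-project'});
const database = spanner.instance('my-instance').database('my-database');

const group = new MutationGroup();
group.insert('MyTable', {Key: 'ks1', Thing: 'abc'});

database
  .batchWriteAtLeastOnce([group], {
    requestOptions: {transactionTag: 'batch-write-tag'},
  })
  .on('error', console.error)
  .on('data', response => {
    // Each response reports the commit status for a subset of the groups.
    console.log(response);
  })
  .on('end', () => {
    // With tracing enabled, this call now records a
    // 'CloudSpanner.Database.batchWriteAtLeastOnce' span.
    console.log('batch write finished');
  });
```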
Showing 2 changed files with 295 additions and 59 deletions.
227 changes: 225 additions & 2 deletions observability-test/database.ts
@@ -21,7 +21,12 @@ import {EventEmitter} from 'events';
import * as assert from 'assert';
import * as extend from 'extend';
import {google} from '../protos/protos';
import {CommitCallback, CommitOptions, MutationSet} from '../src/transaction';
import {
BatchWriteOptions,
CommitCallback,
CommitOptions,
MutationSet,
} from '../src/transaction';
import {util} from '@google-cloud/common';
import {Transform} from 'stream';
import * as proxyquire from 'proxyquire';
@@ -35,7 +40,7 @@ const {
// eslint-disable-next-line n/no-extraneous-require
const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base');
import * as db from '../src/database';
import {Instance, Spanner} from '../src';
import {Instance, MutationGroup, Spanner} from '../src';
import * as pfy from '@google-cloud/promisify';
import {grpc} from 'google-gax';
import {MockError} from '../test/mockserver/mockspanner';
@@ -1215,6 +1220,224 @@ describe('Database', () => {
});
});

describe('batchWriteAtLeastOnce', () => {
const mutationGroup1 = new MutationGroup();
mutationGroup1.insert('MyTable', {
Key: 'ks1',
Thing: 'abc',
});
const mutationGroup2 = new MutationGroup();
mutationGroup2.insert('MyTable', {
Key: 'ks2',
Thing: 'xyz',
});

const mutationGroups = [mutationGroup1, mutationGroup2];

let fakePool: FakeSessionPool;
let fakeSession: FakeSession;
let fakeDataStream: Transform;
let getSessionStub: sinon.SinonStub;
let requestStreamStub: sinon.SinonStub;

const options = {
requestOptions: {
transactionTag: 'batch-write-tag',
},
excludeTxnFromChangeStream: true,
gaxOptions: {autoPaginate: false},
} as BatchWriteOptions;

beforeEach(() => {
fakePool = database.pool_;
fakeSession = new FakeSession();
fakeDataStream = through.obj();

getSessionStub = (
sandbox.stub(fakePool, 'getSession') as sinon.SinonStub
).callsFake(callback => callback(null, fakeSession));

requestStreamStub = sandbox
.stub(database, 'requestStream')
.returns(fakeDataStream);
});

it('on retry with "Session not found" error', done => {
const sessionNotFoundError = {
code: grpc.status.NOT_FOUND,
message: 'Session not found',
} as grpc.ServiceError;
let retryCount = 0;

database
.batchWriteAtLeastOnce(mutationGroups, options)
.on('data', () => {})
.on('error', err => {
assert.fail(err);
})
.on('end', () => {
assert.strictEqual(retryCount, 1);

const spans = traceExporter.getFinishedSpans();
withAllSpansHaveDBName(spans);

const actualSpanNames: string[] = [];
const actualEventNames: string[] = [];
spans.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = [
'CloudSpanner.Database.batchWriteAtLeastOnce',
'CloudSpanner.Database.batchWriteAtLeastOnce',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

// Ensure that the span actually produced an error that was recorded.
const firstSpan = spans[0];
assert.strictEqual(
SpanStatusCode.ERROR,
firstSpan.status.code,
'Expected an ERROR span status'
);

const errorMessage = firstSpan.status.message;
assert.deepStrictEqual(
errorMessage,
sessionNotFoundError.message
);

// The last span should not have an error status.
const lastSpan = spans[spans.length - 1];
assert.strictEqual(
SpanStatusCode.UNSET,
lastSpan.status.code,
'Unexpected span status'
);

assert.deepStrictEqual(lastSpan.status.message, undefined);

const expectedEventNames = [
'Using Session',
'No session available',
'Using Session',
];
assert.deepStrictEqual(actualEventNames, expectedEventNames);

done();
});

fakeDataStream.emit('error', sessionNotFoundError);
retryCount++;
});

it('on getSession errors', done => {
const fakeError = new Error('err');

getSessionStub.callsFake(callback => callback(fakeError));
database
.batchWriteAtLeastOnce(mutationGroups, options)
.on('error', err => {
assert.strictEqual(err, fakeError);

const spans = traceExporter.getFinishedSpans();
withAllSpansHaveDBName(spans);

const actualSpanNames: string[] = [];
const actualEventNames: string[] = [];
spans.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = [
'CloudSpanner.Database.batchWriteAtLeastOnce',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

// Ensure that the span actually produced an error that was recorded.
const firstSpan = spans[0];
assert.strictEqual(
SpanStatusCode.ERROR,
firstSpan.status.code,
'Expected an ERROR span status'
);

assert.deepStrictEqual(firstSpan.status.message, fakeError.message);

const expectedEventNames: string[] = [];
assert.deepStrictEqual(actualEventNames, expectedEventNames);

done();
});
});

it('with no errors', done => {
getSessionStub.callsFake(callback => callback(null, {}));
database
.batchWriteAtLeastOnce(mutationGroups, options)
.on('data', () => {})
.on('error', assert.ifError)
.on('end', () => {
const spans = traceExporter.getFinishedSpans();
withAllSpansHaveDBName(spans);

const actualSpanNames: string[] = [];
const actualEventNames: string[] = [];
spans.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = [
'CloudSpanner.Database.batchWriteAtLeastOnce',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

// Ensure that the span did not record an error.
const firstSpan = spans[0];
assert.strictEqual(
SpanStatusCode.UNSET,
firstSpan.status.code,
'Unexpected span status code'
);

assert.strictEqual(
undefined,
firstSpan.status.message,
'Unexpected span status message'
);

const expectedEventNames = ['Using Session'];
assert.deepStrictEqual(actualEventNames, expectedEventNames);

done();
});

fakeDataStream.emit('data', 'response');
fakeDataStream.end('end');
});
});

describe('runTransaction', () => {
const SESSION = new FakeSession();
const TRANSACTION = new FakeTransaction(
127 changes: 70 additions & 57 deletions src/database.ts
@@ -3469,66 +3469,79 @@ class Database extends common.GrpcServiceObject {
): NodeJS.ReadableStream {
const proxyStream: Transform = through.obj();

const span = getActiveOrNoopSpan();

this.pool_.getSession((err, session) => {
if (err) {
proxyStream.destroy(err);
return;
}

span.addEvent('Using Session', {'session.id': session?.id});
const gaxOpts = extend(true, {}, options?.gaxOptions);
const reqOpts = Object.assign(
{} as spannerClient.spanner.v1.BatchWriteRequest,
{
session: session!.formattedName_!,
mutationGroups: mutationGroups.map(mg => mg.proto()),
requestOptions: options?.requestOptions,
excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams,
}
);
let dataReceived = false;
let dataStream = this.requestStream({
client: 'SpannerClient',
method: 'batchWrite',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
});
dataStream
.once('data', () => (dataReceived = true))
.once('error', err => {
if (
!dataReceived &&
isSessionNotFoundError(err as grpc.ServiceError)
) {
// If there's a 'Session not found' error and we have not yet received
// any data, we can safely retry the writes on a new session.
// Register the error on the session so the pool can discard it.
if (session) {
session.lastError = err as grpc.ServiceError;
}
span.addEvent('No session available', {
'session.id': session?.id,
});
// Remove the current data stream from the end user stream.
dataStream.unpipe(proxyStream);
dataStream.end();
// Create a new stream and add it to the end user stream.
dataStream = this.batchWriteAtLeastOnce(mutationGroups, options);
dataStream.pipe(proxyStream);
} else {
return startTrace(
'Database.batchWriteAtLeastOnce',
this._traceConfig,
span => {
this.pool_.getSession((err, session) => {
if (err) {
proxyStream.destroy(err);
setSpanError(span, err);
span.end();
return;
}
})
.once('end', () => {
this.pool_.release(session!);
})
.pipe(proxyStream);
});

return proxyStream as NodeJS.ReadableStream;
span.addEvent('Using Session', {'session.id': session?.id});
const gaxOpts = extend(true, {}, options?.gaxOptions);
const reqOpts = Object.assign(
{} as spannerClient.spanner.v1.BatchWriteRequest,
{
session: session!.formattedName_!,
mutationGroups: mutationGroups.map(mg => mg.proto()),
requestOptions: options?.requestOptions,
excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams,
}
);
let dataReceived = false;
let dataStream = this.requestStream({
client: 'SpannerClient',
method: 'batchWrite',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
});
dataStream
.once('data', () => (dataReceived = true))
.once('error', err => {
setSpanError(span, err);

if (
!dataReceived &&
isSessionNotFoundError(err as grpc.ServiceError)
) {
// If there's a 'Session not found' error and we have not yet received
// any data, we can safely retry the writes on a new session.
// Register the error on the session so the pool can discard it.
if (session) {
session.lastError = err as grpc.ServiceError;
}
span.addEvent('No session available', {
'session.id': session?.id,
});
// Remove the current data stream from the end user stream.
dataStream.unpipe(proxyStream);
dataStream.end();
// Create a new stream and add it to the end user stream.
dataStream = this.batchWriteAtLeastOnce(
mutationGroups,
options
);
dataStream.pipe(proxyStream);
} else {
span.end();
proxyStream.destroy(err);
}
})
.once('end', () => {
span.end();
this.pool_.release(session!);
})
.pipe(proxyStream);
});

return proxyStream as NodeJS.ReadableStream;
}
);
}

/**
