Skip to content

Commit

Permalink
feat(observability): trace BatchTransaction and Table
Browse files Browse the repository at this point in the history
This change is part of a series of changes to add
OpenTelemetry traces, focused on BatchTransaction and Table.

While here, made the tests for sessionPool spans much more
precise to avoid flakes.

Updates #2079
Built from PR #2087
Updates #2114
  • Loading branch information
odeke-em committed Sep 19, 2024
1 parent 3300ab5 commit 38708a8
Show file tree
Hide file tree
Showing 7 changed files with 556 additions and 96 deletions.
166 changes: 108 additions & 58 deletions src/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
CLOUD_RESOURCE_HEADER,
addLeaderAwareRoutingHeader,
} from '../src/common';
import {startTrace, setSpanError, traceConfig} from './instrument';

export interface TransactionIdentifier {
session: string | Session;
Expand Down Expand Up @@ -136,20 +137,37 @@ class BatchTransaction extends Snapshot {
delete reqOpts.gaxOptions;
delete reqOpts.types;

const headers: {[k: string]: string} = {};
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}
const traceConfig: traceConfig = {
sql: query,
opts: this.observabilityOptions,
};
return startTrace(
'BatchTransaction.createQueryPartitions',
traceConfig,
span => {
const headers: {[k: string]: string} = {};
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionQuery',
reqOpts,
gaxOpts: query.gaxOptions,
headers: headers,
},
callback
this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionQuery',
reqOpts,
gaxOpts: query.gaxOptions,
headers: headers,
},
(err, partitions, resp) => {
if (err) {
setSpanError(span, err);
}

span.end();
callback(err, partitions, resp);
}
);
}
);
}
/**
Expand All @@ -163,38 +181,52 @@ class BatchTransaction extends Snapshot {
* @param {function} callback Callback function.
*/
createPartitions_(config, callback) {
const query = extend({}, config.reqOpts, {
session: this.session.formattedName_,
transaction: {id: this.id},
});
config.reqOpts = extend({}, query);
config.headers = {
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_,
const traceConfig: traceConfig = {
opts: this.observabilityOptions,
};
delete query.partitionOptions;
this.session.request(config, (err, resp) => {
if (err) {
callback(err, null, resp);
return;
}

const partitions = resp.partitions.map(partition => {
return extend({}, query, partition);
});
return startTrace(
'BatchTransaction.createPartitions_',
traceConfig,
span => {
const query = extend({}, config.reqOpts, {
session: this.session.formattedName_,
transaction: {id: this.id},
});
config.reqOpts = extend({}, query);
config.headers = {
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database)
.formattedName_,
};
delete query.partitionOptions;
this.session.request(config, (err, resp) => {
if (err) {
setSpanError(span, err);
span.end();
callback(err, null, resp);
return;
}

if (resp.transaction) {
const {id, readTimestamp} = resp.transaction;
const partitions = resp.partitions.map(partition => {
return extend({}, query, partition);
});

this.id = id;
if (resp.transaction) {
const {id, readTimestamp} = resp.transaction;

if (readTimestamp) {
this.readTimestampProto = readTimestamp;
this.readTimestamp = new PreciseDate(readTimestamp);
}
}
this.id = id;

if (readTimestamp) {
this.readTimestampProto = readTimestamp;
this.readTimestamp = new PreciseDate(readTimestamp);
}
}

callback(null, partitions, resp);
});
span.end();
callback(null, partitions, resp);
});
}
);
}
/**
* @typedef {object} ReadPartition
Expand Down Expand Up @@ -226,28 +258,45 @@ class BatchTransaction extends Snapshot {
* @returns {Promise<CreateReadPartitionsResponse>}
*/
createReadPartitions(options, callback) {
const reqOpts = Object.assign({}, options, {
keySet: Snapshot.encodeKeySet(options),
});
const traceConfig: traceConfig = {
opts: this.observabilityOptions,
};

delete reqOpts.gaxOptions;
delete reqOpts.keys;
delete reqOpts.ranges;
return startTrace(
'BatchTransaction.createReadPartitions',
traceConfig,
span => {
const reqOpts = Object.assign({}, options, {
keySet: Snapshot.encodeKeySet(options),
});

const headers: {[k: string]: string} = {};
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}
delete reqOpts.gaxOptions;
delete reqOpts.keys;
delete reqOpts.ranges;

this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionRead',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
},
callback
const headers: {[k: string]: string} = {};
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionRead',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
},
(err, partitions, resp) => {
if (err) {
setSpanError(span, err);
}

span.end();
callback(err, partitions, resp);
}
);
}
);
}
/**
Expand Down Expand Up @@ -322,6 +371,7 @@ class BatchTransaction extends Snapshot {
* ```
*/
executeStream(partition) {
// TODO: Instrument the streams with Otel.
if (is.string(partition.table)) {
return this.createReadStream(partition.table, partition);
}
Expand Down
4 changes: 2 additions & 2 deletions src/instrument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ interface SQLStatement {

interface observabilityOptions {
tracerProvider: TracerProvider;
enableExtendedTracing: boolean;
enableExtendedTracing?: boolean;
}

export type {observabilityOptions as ObservabilityOptions};
Expand Down Expand Up @@ -81,7 +81,7 @@ interface traceConfig {
}

const SPAN_NAMESPACE_PREFIX = 'CloudSpanner'; // TODO: discuss & standardize this prefix.
export {SPAN_NAMESPACE_PREFIX};
export {SPAN_NAMESPACE_PREFIX, traceConfig};

/**
* startTrace begins an active span in the current active context
Expand Down
61 changes: 41 additions & 20 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ import {
import {google as databaseAdmin} from '../protos/protos';
import {Schema, LongRunningCallback} from './common';
import IRequestOptions = databaseAdmin.spanner.v1.IRequestOptions;
import {
ObservabilityOptions,
startTrace,
setSpanError,
traceConfig,
} from './instrument';

export type Key = string | string[];

Expand Down Expand Up @@ -93,6 +99,7 @@ const POSTGRESQL = 'POSTGRESQL';
class Table {
database: Database;
name: string;
observabilityOptions?: ObservabilityOptions;
constructor(database: Database, name: string) {
/**
* The {@link Database} instance of this {@link Table} instance.
Expand Down Expand Up @@ -1072,29 +1079,43 @@ class Table {
options: MutateRowsOptions | CallOptions = {},
callback: CommitCallback
): void {
const requestOptions =
'requestOptions' in options ? options.requestOptions : {};
const traceConfig: traceConfig = {
opts: this.observabilityOptions,
};

const excludeTxnFromChangeStreams =
'excludeTxnFromChangeStreams' in options
? options.excludeTxnFromChangeStreams
: false;
startTrace('Table.' + method, traceConfig, span => {
const requestOptions =
'requestOptions' in options ? options.requestOptions : {};

this.database.runTransaction(
{
requestOptions: requestOptions,
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
},
(err, transaction) => {
if (err) {
callback(err);
return;
}
const excludeTxnFromChangeStreams =
'excludeTxnFromChangeStreams' in options
? options.excludeTxnFromChangeStreams
: false;

transaction![method](this.name, rows as Key[]);
transaction!.commit(options, callback);
}
);
this.database.runTransaction(
{
requestOptions: requestOptions,
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
},
(err, transaction) => {
if (err) {
setSpanError(span, err);
span.end();
callback(err);
return;
}

transaction![method](this.name, rows as Key[]);
transaction!.commit(options, (err, resp) => {
if (err) {
setSpanError(span, err);
}
span.end();
callback(err, resp);
});
}
);
});
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import IQueryOptions = google.spanner.v1.ExecuteSqlRequest.IQueryOptions;
import IRequestOptions = google.spanner.v1.IRequestOptions;
import {Database, Spanner} from '.';
import ReadLockMode = google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
import {ObservabilityOptions} from './instrument';

export type Rows = Array<Row | Json>;
const RETRY_INFO_TYPE = 'type.googleapis.com/google.rpc.retryinfo';
Expand Down Expand Up @@ -285,6 +286,7 @@ export class Snapshot extends EventEmitter {
queryOptions?: IQueryOptions;
resourceHeader_: {[k: string]: string};
requestOptions?: Pick<IRequestOptions, 'transactionTag'>;
observabilityOptions?: ObservabilityOptions;

/**
* The transaction ID.
Expand Down
Loading

0 comments on commit 38708a8

Please sign in to comment.