Skip to content

Commit

Permalink
fix: stream monitoring (#573)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos authored Oct 18, 2024
1 parent 729bbdf commit f950fc4
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 128 deletions.
1 change: 0 additions & 1 deletion src/internal/concurrency/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
export * from './mutex'
export * from './stream'
export * from './async-abort-controller'
11 changes: 11 additions & 0 deletions src/internal/monitoring/otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { S3Store } from '@tus/s3-store'
import { Upload } from '@aws-sdk/lib-storage'
import { StreamSplitter } from '@tus/server'
import { PgLock } from '@storage/protocols/tus'
import { Semaphore, Permit } from '@shopify/semaphore'

const tracingEnabled = process.env.TRACING_ENABLED === 'true'
const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || ''
Expand Down Expand Up @@ -265,6 +266,16 @@ const sdk = new NodeSDK({
enabled: true,
methodsToInstrument: ['lock', 'unlock', 'acquireLock'],
}),
new ClassInstrumentation({
targetClass: Semaphore,
enabled: true,
methodsToInstrument: ['acquire'],
}),
new ClassInstrumentation({
targetClass: Permit,
enabled: true,
methodsToInstrument: ['release'],
}),
new ClassInstrumentation({
targetClass: S3Client,
enabled: true,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Transform, TransformCallback } from 'stream'
import { Transform, TransformCallback } from 'node:stream'

export const createByteCounterStream = () => {
let bytes = 0
Expand Down
3 changes: 3 additions & 0 deletions src/internal/streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './stream-speed'
export * from './byte-counter'
export * from './monitor'
42 changes: 42 additions & 0 deletions src/internal/streams/monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { createByteCounterStream } from './byte-counter'
import { monitorStreamSpeed } from './stream-speed'
import { trace } from '@opentelemetry/api'
import { Readable } from 'node:stream'

/**
* Monitor readable streams by tracking their speed and bytes read
* @param dataStream
*/
/**
 * Monitor readable streams by tracking their speed and bytes read
 * @param dataStream
 */
export function monitorStream(dataStream: Readable) {
  const speedMonitor = monitorStreamSpeed(dataStream)
  const byteCounter = createByteCounterStream()

  let measures: number[] = []

  // Record every speed sample and expose the running stats on the active span
  speedMonitor.on('speed', (bps) => {
    measures.push(bps)
    trace
      .getActiveSpan()
      ?.setAttributes({ 'stream.speed': measures, bytesRead: byteCounter.bytes })
  })

  // On close, drop the collected samples and record the final byte count
  speedMonitor.on('close', () => {
    measures = []
    trace.getActiveSpan()?.setAttributes({ uploadRead: byteCounter.bytes })
  })

  // Propagate failures downstream so consumers of the piped stream see them
  speedMonitor.on('error', (err) => {
    byteCounter.transformStream.destroy(err)
  })

  // Close the byte counter once the monitored stream has no more data
  speedMonitor.on('end', () => {
    byteCounter.transformStream.end()
  })

  // Hand back the fully wired pipeline: source -> speed monitor -> byte counter
  return speedMonitor.pipe(byteCounter.transformStream)
}
51 changes: 51 additions & 0 deletions src/internal/streams/stream-speed.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Readable } from 'stream'
import { PassThrough } from 'node:stream'

/**
 * Wrap a readable stream in a PassThrough that periodically emits a
 * 'speed' event carrying the average throughput (bytes per second)
 * observed since monitoring began.
 *
 * Consumers read data from the returned stream and may listen for the
 * custom 'speed' event to sample throughput.
 *
 * @param stream source readable stream to monitor
 * @param frequency sampling interval in milliseconds (default 1000)
 * @returns the PassThrough that `stream` is piped into
 */
export function monitorStreamSpeed(stream: Readable, frequency = 1000) {
  let totalBytes = 0
  const startTime = Date.now()

  const passThrough = new PassThrough()

  const interval = setInterval(() => {
    const elapsedSeconds = (Date.now() - startTime) / 1000
    // Average throughput since start; guard against a zero elapsed time
    // (possible with a very small `frequency`) so we never emit NaN/Infinity.
    const averageBytesPerSecond = elapsedSeconds > 0 ? totalBytes / elapsedSeconds : 0

    passThrough.emit('speed', averageBytesPerSecond)
  }, frequency)

  passThrough.on('data', (chunk) => {
    totalBytes += chunk.length
  })

  const cleanup = () => {
    clearInterval(interval)
    passThrough.removeAllListeners('speed')
  }

  // 'close' fires after both normal completion and destroy(), so the timer
  // is cleared on every exit path.
  passThrough.on('close', cleanup)

  // Propagate errors from the source stream to the passThrough
  stream.on('error', (err) => {
    passThrough.destroy(err)
  })

  // Ensure the passThrough ends when the source stream ends
  stream.on('end', () => {
    passThrough.end()
  })

  return stream.pipe(passThrough)
}
137 changes: 11 additions & 126 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,12 @@ import {
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import { ERRORS, StorageBackendError } from '@internal/errors'
import { getConfig } from '../../config'
import { addAbortSignal, PassThrough, Readable } from 'node:stream'
import { trace } from '@opentelemetry/api'
import { createByteCounterStream } from '@internal/concurrency'
import { AgentStats, createAgent, gatherHttpAgentStats, InstrumentedAgent } from '@internal/http'
import { Readable } from 'node:stream'
import { createAgent, InstrumentedAgent } from '@internal/http'
import { monitorStream } from '@internal/streams'

const { tracingFeatures, storageS3MaxSockets, tracingEnabled } = getConfig()

interface StreamStatus {
time: Date
bytesUploaded: number
progress: Progress[]
dataStream: {
closed: boolean
paused: boolean
errored: boolean
writable: boolean
byteRead: number
}
httpAgentStats: AgentStats
}

export interface S3ClientOptions {
endpoint?: string
region?: string
Expand Down Expand Up @@ -154,22 +139,19 @@ export class S3Backend implements StorageBackendAdapter {
throw ERRORS.Aborted('Upload was aborted')
}

const streamWatcher = tracingFeatures?.upload ? this.watchUploadStream(body, signal) : undefined
const uploadStream = streamWatcher ? streamWatcher.dataStream : body
const dataStream = tracingFeatures?.upload ? monitorStream(body) : body

const upload = new Upload({
client: this.client,
params: {
Bucket: bucketName,
Key: withOptionalVersion(key, version),
Body: uploadStream,
Body: dataStream,
ContentType: contentType,
CacheControl: cacheControl,
},
})

streamWatcher?.watchUpload(upload)

signal?.addEventListener(
'abort',
() => {
Expand All @@ -178,6 +160,12 @@ export class S3Backend implements StorageBackendAdapter {
{ once: true }
)

if (tracingFeatures?.upload) {
upload.on('httpUploadProgress', (progress: Progress) => {
dataStream.emit('s3_progress', JSON.stringify(progress))
})
}

try {
const data = await upload.done()
const metadata = await this.headObject(bucketName, key, version)
Expand All @@ -194,26 +182,9 @@ export class S3Backend implements StorageBackendAdapter {
}
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') {
const span = trace.getActiveSpan()
if (span) {
// Print how far we got uploading the file
const lastSeenStatus = streamWatcher?.lastSeenStreamStatus
const lastStreamStatus = streamWatcher?.getStreamStatus()

if (lastSeenStatus && lastStreamStatus) {
const { progress, ...lastSeenStream } = lastSeenStatus
span.setAttributes({
lastStreamStatus: JSON.stringify(lastStreamStatus),
lastSeenStatus: JSON.stringify(lastSeenStream),
})
}
}

throw ERRORS.AbortedTerminate('Upload was aborted', err)
}
throw StorageBackendError.fromError(err)
} finally {
streamWatcher?.stop()
}
}

Expand Down Expand Up @@ -493,92 +464,6 @@ export class S3Backend implements StorageBackendAdapter {
this.agent.close()
}

protected watchUploadStream(body: Readable, signal?: AbortSignal) {
const passThrough = new PassThrough()

if (signal) {
addAbortSignal(signal, passThrough)
}

passThrough.on('error', () => {
body.unpipe(passThrough)
})

body.on('error', (err) => {
if (!passThrough.closed) {
passThrough.destroy(err)
}
})

const byteReader = createByteCounterStream()
const bodyStream = body.pipe(passThrough)

// Upload stats
const uploadProgress: Progress[] = []
const getStreamStatus = (): StreamStatus => ({
time: new Date(),
bytesUploaded: uploadProgress[uploadProgress.length - 1]?.loaded || 0,
dataStream: {
closed: bodyStream.closed,
paused: bodyStream.isPaused(),
errored: Boolean(bodyStream.errored),
writable: bodyStream.writable,
byteRead: byteReader.bytes,
},
httpAgentStats: gatherHttpAgentStats(this.agent.httpsAgent.getCurrentStatus()),
progress: uploadProgress,
})

let streamStatus = getStreamStatus()

const streamWatcher = setInterval(() => {
streamStatus = getStreamStatus()
}, 1000)

const dataStream = passThrough.pipe(byteReader.transformStream)

body.on('error', (err) => {
passThrough.destroy(err)
})

passThrough.on('error', (err) => {
body.destroy(err)
})

passThrough.on('close', () => {
body.unpipe(passThrough)
})

function watchUpload(upload: Upload) {
upload.on('httpUploadProgress', (progress) => {
uploadProgress.push({
total: progress.total,
part: progress.part,
loaded: progress.loaded,
})
if (uploadProgress.length > 100) {
uploadProgress.shift()
}
})
}

return {
dataStream,
byteReader,
get uploadProgress() {
return uploadProgress
},
get lastSeenStreamStatus() {
return streamStatus
},
getStreamStatus,
stop() {
clearInterval(streamWatcher)
},
watchUpload,
}
}

protected createS3Client(options: S3ClientOptions & { name: string }) {
const params: S3ClientConfig = {
region: options.region,
Expand Down

0 comments on commit f950fc4

Please sign in to comment.