Skip to content

Commit

Permalink
fix(CosmoStoreSink): Improve error handling (#226)
Browse files Browse the repository at this point in the history
* Equinox 4.0.0-rc.12.12
* Sink/Pruner stats/errors
* Make limit more conservative
* Tidy exns
* Store/Streams patterns
  • Loading branch information
bartelink authored Aug 3, 2023
1 parent 35dfe3f commit 37a1879
Show file tree
Hide file tree
Showing 28 changed files with 269 additions and 283 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Streams.SpanResult`: Renamed to `Sinks.StreamResult` [#208](https://github.com/jet/propulsion/pull/208)
- `Propulsion.CosmosStore`: Changed to target `Equinox.CosmosStore` v `4.0.0` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore.CosmosSource`: Changed parsing to use `System.Text.Json` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226)
- `Propulsion.EventStore`: Pinned to target `Equinox.EventStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.EventStoreDb`** [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.EventStoreDb.EventStoreSource`: Changed API to match `Propulsion.SqlStreamStore` API rather than `Propulsion.EventStore` [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.Feed`,`Kafka`: Replaced `Async` with `task` for supervision [#158](https://github.com/jet/propulsion/pull/158), [#159](https://github.com/jet/propulsion/pull/159)
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/ChangeFeedProcessor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type internal SourcePipeline =
static member Start(log : ILogger, start, maybeStartChild, stop, observer : IDisposable) =
let cts = new CancellationTokenSource()
let triggerStop _disposing =
let level = if cts.IsCancellationRequested then Events.LogEventLevel.Debug else Events.LogEventLevel.Information
let level = if cts.IsCancellationRequested then LogEventLevel.Debug else LogEventLevel.Information
log.Write(level, "Source stopping...")
observer.Dispose()
cts.Cancel()
Expand Down
71 changes: 29 additions & 42 deletions src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,10 @@ open System

module Pruner =

/// Sorts an exception into one of three buckets: rate-limited, timed out, or anything else.
let (|TimedOutMessage|RateLimitedMessage|Other|) (e : exn) =
    // A CosmosException carrying HTTP status TooManyRequests is treated as rate limiting
    let isRateLimited =
        match e with
        | :? Microsoft.Azure.Cosmos.CosmosException as ce -> ce.StatusCode = System.Net.HttpStatusCode.TooManyRequests
        | _ -> false
    // The timeout case is recognised by comparing the runtime type's full name, not via a type test
    // NOTE(review): presumably this avoids referencing the assembly that declares that exception type - confirm
    if isRateLimited then RateLimitedMessage
    elif e.GetType().FullName = "Microsoft.Azure.Documents.RequestTimeoutException" then TimedOutMessage
    else Other

/// Result reported for a handled stream; the stats' HandleOk tallies these between dumps
type Outcome =
/// `completed` is added to the total logged as "Deleted", `deferred` to the total logged as "Deferred"
| Ok of completed : int * deferred : int
/// The carried count is added to the total logged as "Redundant"
| Nop of int

/// Stats for the pruner: tallies handler outcomes between dumps and classifies handler exceptions.
type Stats(log, statsInterval, stateInterval) =
    inherit Propulsion.Streams.Stats<Outcome>(log, statsInterval, stateInterval)

    // Interval tallies: `ops`/`nops` count outcomes (logged with an `r` suffix);
    // the `total*` values sum the counts carried by those outcomes (logged with an `e` suffix)
    let mutable nops, totalRedundant, ops, totalDeletes, totalDeferred = 0, 0, 0, 0, 0

    /// Emits the tallies gathered since the previous dump, then zeroes every one of them
    override _.DumpStats() =
        log.Information("Deleted {ops}r {deletedCount}e Deferred {deferred}e Redundant {nops}r {nopCount}e",
                        ops, totalDeletes, totalDeferred, nops, totalRedundant)
        // totalDeferred was previously assigned to itself here, so it alone kept accumulating across dumps
        ops <- 0; totalDeletes <- 0; nops <- 0; totalDeferred <- 0; totalRedundant <- 0
        base.DumpStats()
        Equinox.CosmosStore.Core.Log.InternalMetrics.dump log

    /// Folds a successful handler outcome into the interval tallies
    override _.HandleOk outcome =
        match outcome with
        | Nop count ->
            nops <- nops + 1
            totalRedundant <- totalRedundant + count
        | Ok (completed, deferred) ->
            ops <- ops + 1
            totalDeletes <- totalDeletes + completed
            totalDeferred <- totalDeferred + deferred

    /// Maps an exception to an OutcomeKind: rate limiting and timeouts are distinguished, all else is Exception
    override _.Classify e =
        match e with
        | RateLimitedMessage -> OutcomeKind.RateLimited
        | TimedOutMessage -> OutcomeKind.Timeout
        | Other -> OutcomeKind.Exception

    /// Logs the exception as a warning
    override _.HandleExn(log, exn) = log.Warning(exn, "Unhandled")

// Per set of accumulated events per stream (selected via `selectExpired`), attempt to prune up to the high water mark
let handle pruneUntil stream (span: Event[]) ct = task {
// The newest event eligible for deletion defines the cutoff point
Expand All @@ -66,17 +32,40 @@ module Pruner =
let writePos = max trimmedPos (untilIndex + 1L)
return struct (writePos, res) }

/// Stats for the pruner sink: tallies <c>Pruner.Outcome</c>s between dumps and classifies handler exceptions.
type CosmosStorePrunerStats(log, statsInterval, stateInterval) =
    inherit Propulsion.Streams.Stats<Pruner.Outcome>(log, statsInterval, stateInterval)

    // Interval tallies: `ops`/`nops` count outcomes (logged with an `r` suffix);
    // the `total*` values sum the counts carried by those outcomes (logged with an `e` suffix)
    let mutable nops, totalRedundant, ops, totalDeletes, totalDeferred = 0, 0, 0, 0, 0

    /// Folds a successful handler outcome into the interval tallies
    override _.HandleOk outcome =
        match outcome with
        | Pruner.Outcome.Nop count ->
            nops <- nops + 1
            totalRedundant <- totalRedundant + count
        | Pruner.Outcome.Ok (completed, deferred) ->
            ops <- ops + 1
            totalDeletes <- totalDeletes + completed
            totalDeferred <- totalDeferred + deferred

    /// Emits the tallies gathered since the previous dump, then zeroes every one of them
    override _.DumpStats() =
        log.Information("Deleted {ops}r {deletedCount}e Deferred {deferred}e Redundant {nops}r {nopCount}e",
                        ops, totalDeletes, totalDeferred, nops, totalRedundant)
        // totalDeferred was previously assigned to itself here, so it alone kept accumulating across dumps
        ops <- 0; totalDeletes <- 0; nops <- 0; totalDeferred <- 0; totalRedundant <- 0
        base.DumpStats()
        Equinox.CosmosStore.Core.Log.InternalMetrics.dump log

    /// Maps the store's rate-limit and request-timeout exceptions to their OutcomeKind; anything else defers to the base classification
    override _.Classify e =
        match e with
        | Equinox.CosmosStore.Exceptions.RateLimited -> OutcomeKind.RateLimited
        | Equinox.CosmosStore.Exceptions.RequestTimeout -> OutcomeKind.Timeout
        | e -> base.Classify e

    /// Logs the exception as a warning
    override _.HandleExn(log, exn) = log.Warning(exn, "Unhandled")

/// DANGER: <c>CosmosStorePruner</c> DELETES events - use with care
type CosmosStorePruner =

/// DANGER: this API DELETES events - use with care
/// Starts a <c>Sink</c> that prunes _all submitted events from the supplied <c>context</c>_
static member Start
( log : ILogger, maxReadAhead, context, maxConcurrentStreams,
// Default 5m
?statsInterval,
// Default 5m
?stateInterval,
( log : ILogger, maxReadAhead, context, maxConcurrentStreams, stats: CosmosStorePrunerStats,
?purgeInterval, ?wakeForResults, ?idleDelay,
// Defaults to statsInterval
?ingesterStatsInterval)
Expand All @@ -87,9 +76,7 @@ type CosmosStorePruner =
let metrics = StreamSpan.metrics Event.storedSize span
struct (metrics, span)
Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil, (fun _ r -> r))
let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.)
let stats = Pruner.Stats(log.ForContext<Pruner.Stats>(), statsInterval, stateInterval)
let dumpStreams logStreamStates _log = logStreamStates Event.storedSize
let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5,
?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay)
Projector.Pipeline.Start(log, scheduler.Pump, maxReadAhead, scheduler, ingesterStatsInterval = defaultArg ingesterStatsInterval statsInterval)
Projector.Pipeline.Start(log, scheduler.Pump, maxReadAhead, scheduler, ingesterStatsInterval = defaultArg ingesterStatsInterval stats.StatsInterval.Period)
Loading

0 comments on commit 37a1879

Please sign in to comment.