diff --git a/CHANGELOG.md b/CHANGELOG.md index 1293fded..b938de7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Streams.SpanResult`: Renamed to `Sinks.StreamResult` [#208](https://github.com/jet/propulsion/pull/208) - `Propulsion.CosmosStore`: Changed to target `Equinox.CosmosStore` v `4.0.0` [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.CosmosStore.CosmosSource`: Changed parsing to use `System.Text.Json` [#139](https://github.com/jet/propulsion/pull/139) +- `Propulsion.CosmosStore.CosmosStoreSink`+`CosmosStorePruner`: Exposed `*Stats` [#226](https://github.com/jet/propulsion/pull/226) - `Propulsion.EventStore`: Pinned to target `Equinox.EventStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.EventStoreDb`** [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.EventStoreDb.EventStoreSource`: Changed API to match`Propulsion.SqlStreamStore` API rather than`Propulsion.EventStore` [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.Feed`,`Kafka`: Replaced `Async` with `task` for supervision [#158](https://github.com/jet/propulsion/pull/158), [#159](https://github.com/jet/propulsion/pull/159) diff --git a/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs b/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs index 5429794c..77b3949d 100644 --- a/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs +++ b/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs @@ -31,7 +31,7 @@ type internal SourcePipeline = static member Start(log : ILogger, start, maybeStartChild, stop, observer : IDisposable) = let cts = new CancellationTokenSource() let triggerStop _disposing = - let level = if cts.IsCancellationRequested then Events.LogEventLevel.Debug else Events.LogEventLevel.Information + let level = if cts.IsCancellationRequested then LogEventLevel.Debug else LogEventLevel.Information log.Write(level, "Source stopping...") 
observer.Dispose() cts.Cancel() diff --git a/src/Propulsion.CosmosStore/CosmosStorePruner.fs b/src/Propulsion.CosmosStore/CosmosStorePruner.fs index 84f3bc6c..881ff333 100644 --- a/src/Propulsion.CosmosStore/CosmosStorePruner.fs +++ b/src/Propulsion.CosmosStore/CosmosStorePruner.fs @@ -9,44 +9,10 @@ open System module Pruner = - let (|TimedOutMessage|RateLimitedMessage|Other|) (e : exn) = - match e with - | :? Microsoft.Azure.Cosmos.CosmosException as ce when ce.StatusCode = System.Net.HttpStatusCode.TooManyRequests -> RateLimitedMessage - | e when e.GetType().FullName = "Microsoft.Azure.Documents.RequestTimeoutException" -> TimedOutMessage - | _ -> Other - type Outcome = | Ok of completed : int * deferred : int | Nop of int - type Stats(log, statsInterval, stateInterval) = - inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) - - let mutable nops, totalRedundant, ops, totalDeletes, totalDeferred = 0, 0, 0, 0, 0 - - override _.DumpStats() = - log.Information("Deleted {ops}r {deletedCount}e Deferred {deferred}e Redundant {nops}r {nopCount}e", - ops, totalDeletes, totalDeferred, nops, totalRedundant) - ops <- 0; totalDeletes <- 0; nops <- 0; totalDeferred <- totalDeferred; totalRedundant <- 0 - base.DumpStats() - Equinox.CosmosStore.Core.Log.InternalMetrics.dump log - - override _.HandleOk outcome = - match outcome with - | Nop count -> - nops <- nops + 1 - totalRedundant <- totalRedundant + count - | Ok (completed, deferred) -> - ops <- ops + 1 - totalDeletes <- totalDeletes + completed - totalDeferred <- totalDeferred + deferred - override x.Classify e = - match e with - | RateLimitedMessage -> OutcomeKind.RateLimited - | TimedOutMessage -> OutcomeKind.Timeout - | Other -> OutcomeKind.Exception - override _.HandleExn(log, exn) = log.Warning(exn, "Unhandled") - // Per set of accumulated events per stream (selected via `selectExpired`), attempt to prune up to the high water mark let handle pruneUntil stream (span: Event[]) ct = task { // The newest 
event eligible for deletion defines the cutoff point @@ -66,17 +32,40 @@ module Pruner = let writePos = max trimmedPos (untilIndex + 1L) return struct (writePos, res) } +type CosmosStorePrunerStats(log, statsInterval, stateInterval) = + inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) + + let mutable nops, totalRedundant, ops, totalDeletes, totalDeferred = 0, 0, 0, 0, 0 + override _.HandleOk outcome = + match outcome with + | Pruner.Outcome.Nop count -> + nops <- nops + 1 + totalRedundant <- totalRedundant + count + | Pruner.Outcome.Ok (completed, deferred) -> + ops <- ops + 1 + totalDeletes <- totalDeletes + completed + totalDeferred <- totalDeferred + deferred + override _.DumpStats() = + log.Information("Deleted {ops}r {deletedCount}e Deferred {deferred}e Redundant {nops}r {nopCount}e", + ops, totalDeletes, totalDeferred, nops, totalRedundant) + ops <- 0; totalDeletes <- 0; nops <- 0; totalDeferred <- 0; totalRedundant <- 0 (* all five counters are per-interval: reset once logged *) + base.DumpStats() + Equinox.CosmosStore.Core.Log.InternalMetrics.dump log + + override x.Classify e = + match e with + | Equinox.CosmosStore.Exceptions.RateLimited -> OutcomeKind.RateLimited + | Equinox.CosmosStore.Exceptions.RequestTimeout -> OutcomeKind.Timeout + | e -> base.Classify e + override _.HandleExn(log, exn) = log.Warning(exn, "Unhandled") + /// DANGER: CosmosPruner DELETES events - use with care type CosmosStorePruner = /// DANGER: this API DELETES events - use with care /// Starts a Sink that prunes _all submitted events from the supplied context_ static member Start - ( log : ILogger, maxReadAhead, context, maxConcurrentStreams, - // Default 5m - ?statsInterval, - // Default 5m - ?stateInterval, + ( log : ILogger, maxReadAhead, context, maxConcurrentStreams, stats: CosmosStorePrunerStats, ?purgeInterval, ?wakeForResults, ?idleDelay, // Defaults to statsInterval ?ingesterStatsInterval) @@ -87,9 +76,7 @@ type CosmosStorePruner = let metrics = StreamSpan.metrics Event.storedSize span struct
(metrics, span) Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, interpret, Pruner.handle pruneUntil, (fun _ r -> r)) - let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.) - let stats = Pruner.Stats(log.ForContext(), statsInterval, stateInterval) let dumpStreams logStreamStates _log = logStreamStates Event.storedSize let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) - Projector.Pipeline.Start(log, scheduler.Pump, maxReadAhead, scheduler, ingesterStatsInterval = defaultArg ingesterStatsInterval statsInterval) + Projector.Pipeline.Start(log, scheduler.Pump, maxReadAhead, scheduler, ingesterStatsInterval = defaultArg ingesterStatsInterval stats.StatsInterval.Period) diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index 582232cb..a0dece3e 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -6,7 +6,6 @@ open Propulsion.Internal open Propulsion.Sinks open Propulsion.Streams open Serilog -open System open System.Collections.Generic [] @@ -18,6 +17,18 @@ module private Impl = let private toNativeEventBody (xs : Propulsion.Sinks.EventBody) : byte[] = xs.ToArray() let defaultToNative_ = FsCodec.Core.TimelineEvent.Map toNativeEventBody + // Trimmed edition of what V4 exposes + module internal Equinox = + module CosmosStore = + module Exceptions = + open Microsoft.Azure.Cosmos + let [] (|CosmosStatus|_|) (x: exn) = match x with :? 
CosmosException as ce -> ValueSome ce.StatusCode | _ -> ValueNone + let (|RateLimited|RequestTimeout|CosmosStatusCode|Other|) = function + | CosmosStatus System.Net.HttpStatusCode.TooManyRequests -> RateLimited + | CosmosStatus System.Net.HttpStatusCode.RequestTimeout -> RequestTimeout + | CosmosStatus s -> CosmosStatusCode s + | _ -> Other + #else module StreamSpan = @@ -51,7 +62,7 @@ module Internal = | stream, Ok (_, Result.PrefixMissing (batch, pos)) -> log.Information("Waiting {stream} missing {gap} events ({count} events @ {pos})", stream, batch[0].Index - pos, batch.Length, batch[0].Index) | stream, Error (_, exn) -> - let level = if malformed then Events.LogEventLevel.Warning else Events.LogEventLevel.Information + let level = if malformed then LogEventLevel.Warning else LogEventLevel.Information log.Write(level, exn, "Writing {stream} failed, retrying", stream) let write (log : ILogger) (ctx : EventsContext) stream (span : Event[]) ct = task { @@ -72,83 +83,24 @@ module Internal = | actual -> Result.PartialDuplicate (span |> Array.skip (actual - span[0].Index |> int)) log.Debug("Result: {res}", res') return res' } - let (|TimedOutMessage|RateLimitedMessage|TooLargeMessage|MalformedMessage|Other|) (e : exn) = - let isMalformed e = - let m = string e - m.Contains "SyntaxError: JSON.parse Error: Unexpected input at position" - || m.Contains "SyntaxError: JSON.parse Error: Invalid character at position" - match e with - | :? Microsoft.Azure.Cosmos.CosmosException as ce when ce.StatusCode = System.Net.HttpStatusCode.TooManyRequests -> RateLimitedMessage - | :?
Microsoft.Azure.Cosmos.CosmosException as ce when ce.StatusCode = System.Net.HttpStatusCode.RequestEntityTooLarge -> TooLargeMessage - | e when e.GetType().FullName = "Microsoft.Azure.Documents.RequestTimeoutException" -> TimedOutMessage - | e when isMalformed e -> MalformedMessage - | _ -> Other - + let containsMalformedMessage e = + let m = string e + m.Contains "SyntaxError: JSON.parse Error: Unexpected input at position" + || m.Contains "SyntaxError: JSON.parse Error: Invalid character at position" let classify = function - | RateLimitedMessage -> ResultKind.RateLimited - | TimedOutMessage -> ResultKind.TimedOut - | TooLargeMessage -> ResultKind.TooLarge - | MalformedMessage -> ResultKind.Malformed - | Other -> ResultKind.Other + | Equinox.CosmosStore.Exceptions.RateLimited -> ResultKind.RateLimited + | Equinox.CosmosStore.Exceptions.RequestTimeout -> ResultKind.TimedOut + | Equinox.CosmosStore.Exceptions.CosmosStatusCode System.Net.HttpStatusCode.RequestEntityTooLarge -> ResultKind.TooLarge + | e when containsMalformedMessage e -> ResultKind.Malformed + | _ -> ResultKind.Other let isMalformed = function | ResultKind.RateLimited | ResultKind.TimedOut | ResultKind.Other -> false - | ResultKind.TooLarge | ResultKind.Malformed -> true - - type Stats(log : ILogger, statsInterval, stateInterval) = - inherit Scheduling.Stats(log, statsInterval, stateInterval) - let mutable okStreams, resultOk, resultDup, resultPartialDup, resultPrefix, resultExnOther = HashSet(), 0, 0, 0, 0, 0 - let mutable badCats, failStreams, rateLimited, timedOut, tooLarge, malformed = Stats.CatStats(), HashSet(), 0, 0, 0, 0 - let rlStreams, toStreams, tlStreams, mfStreams, oStreams = HashSet(), HashSet(), HashSet(), HashSet(), HashSet() - let mutable okEvents, okBytes, exnEvents, exnBytes = 0, 0L, 0, 0L - - override _.DumpStats() = - let results = resultOk + resultDup + resultPartialDup + resultPrefix - log.Information("Completed {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok 
{dup:n0} redundant {partial:n0} partial {prefix:n0} waiting)", - Log.miB okBytes, results, okStreams.Count, okEvents, resultOk, resultDup, resultPartialDup, resultPrefix) - okStreams.Clear(); resultOk <- 0; resultDup <- 0; resultPartialDup <- 0; resultPrefix <- 0; okEvents <- 0; okBytes <- 0L - if rateLimited <> 0 || timedOut <> 0 || tooLarge <> 0 || malformed <> 0 || badCats.Any then - let fails = rateLimited + timedOut + tooLarge + malformed + resultExnOther - log.Warning("Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e Rate-limited {rateLimited:n0}r {rlStreams:n0}s Timed out {toCount:n0}r {toStreams:n0}s", - Log.miB exnBytes, fails, failStreams.Count, exnEvents, rateLimited, rlStreams.Count, timedOut, toStreams.Count) - rateLimited <- 0; timedOut <- 0; resultExnOther <- 0; failStreams.Clear(); rlStreams.Clear(); toStreams.Clear(); exnBytes <- 0L; exnEvents <- 0 - if badCats.Any then - log.Warning(" Affected cats {@badCats} Too large {tooLarge:n0}r {@tlStreams} Malformed {malformed:n0}r {@mfStreams} Other {other:n0}r {@oStreams}", - badCats.StatsDescending |> Seq.truncate 50, tooLarge, tlStreams |> Seq.truncate 100, malformed, mfStreams |> Seq.truncate 100, resultExnOther, oStreams |> Seq.truncate 100) - badCats.Clear(); tooLarge <- 0; malformed <- 0; resultExnOther <- 0; tlStreams.Clear(); mfStreams.Clear(); oStreams.Clear() - Equinox.CosmosStore.Core.Log.InternalMetrics.dump log - - override _.Handle message = - let inline adds x (set:HashSet<_>) = set.Add x |> ignore - let inline bads x (set:HashSet<_>) = badCats.Ingest(StreamName.categorize x); adds x set - match message with - | { stream = stream; result = Ok ((es, bs), res) } -> - adds stream okStreams - okEvents <- okEvents + es - okBytes <- okBytes + int64 bs - match res with - | Writer.Result.Ok _ -> resultOk <- resultOk + 1 - | Writer.Result.Duplicate _ -> resultDup <- resultDup + 1 - | Writer.Result.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1 - | 
Writer.Result.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 - base.RecordOk(message) - | { stream = stream; result = Error ((es, bs), Exception.Inner exn) } -> - adds stream failStreams - exnEvents <- exnEvents + es - exnBytes <- exnBytes + int64 bs - let kind = - match Writer.classify exn with - | ResultKind.RateLimited -> adds stream rlStreams; rateLimited <- rateLimited + 1; OutcomeKind.RateLimited - | ResultKind.TimedOut -> adds stream toStreams; timedOut <- timedOut + 1; OutcomeKind.Timeout - | ResultKind.TooLarge -> bads stream tlStreams; tooLarge <- tooLarge + 1; OutcomeKind.Failed - | ResultKind.Malformed -> bads stream mfStreams; malformed <- malformed + 1; OutcomeKind.Failed - | ResultKind.Other -> adds stream toStreams; timedOut <- timedOut + 1; OutcomeKind.Exception - base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es), exn) - default _.HandleExn(_, _) : unit = () + | ResultKind.TooLarge | ResultKind.Malformed -> true type Dispatcher = static member Create(log : ILogger, eventsContext, itemDispatcher, ?maxEvents, ?maxBytes) = - let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (1024 * 1024 - (*fudge*)4096) + let maxEvents, maxBytes = defaultArg maxEvents 16384, defaultArg maxBytes (256 * 1024) let writerResultLog = log.ForContext() let attemptWrite stream span ct = task { let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span @@ -169,27 +121,73 @@ module Internal = struct (ss.WritePos, res) Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretWriteResultProgress) +type WriterResult = Internal.Writer.Result + +type CosmosStoreSinkStats(log : ILogger, statsInterval, stateInterval) = + inherit Scheduling.Stats(log, statsInterval, stateInterval) + let mutable okStreams, resultOk, resultDup, resultPartialDup, resultPrefix, resultExnOther = HashSet(), 0, 0, 0, 0, 0 + let mutable badCats, failStreams, rateLimited, timedOut, tooLarge, 
malformed = Stats.CatStats(), HashSet(), 0, 0, 0, 0 + let rlStreams, toStreams, tlStreams, mfStreams, oStreams = HashSet(), HashSet(), HashSet(), HashSet(), HashSet() + let mutable okEvents, okBytes, exnEvents, exnBytes = 0, 0L, 0, 0L + override _.Handle message = + let inline adds x (set:HashSet<_>) = set.Add x |> ignore + let inline bads x (set:HashSet<_>) = badCats.Ingest(StreamName.categorize x); adds x set + match message with + | { stream = stream; result = Ok ((es, bs), res) } -> + adds stream okStreams + okEvents <- okEvents + es + okBytes <- okBytes + int64 bs + match res with + | WriterResult.Ok _ -> resultOk <- resultOk + 1 + | WriterResult.Duplicate _ -> resultDup <- resultDup + 1 + | WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1 + | WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 + base.RecordOk(message) + | { stream = stream; result = Error ((es, bs), Exception.Inner exn) } -> + adds stream failStreams + exnEvents <- exnEvents + es + exnBytes <- exnBytes + int64 bs + let kind = + match Internal.Writer.classify exn with + | Internal.Writer.ResultKind.RateLimited -> adds stream rlStreams; rateLimited <- rateLimited + 1; OutcomeKind.RateLimited + | Internal.Writer.ResultKind.TimedOut -> adds stream toStreams; timedOut <- timedOut + 1; OutcomeKind.Timeout + | Internal.Writer.ResultKind.TooLarge -> bads stream tlStreams; tooLarge <- tooLarge + 1; OutcomeKind.Failed + | Internal.Writer.ResultKind.Malformed -> bads stream mfStreams; malformed <- malformed + 1; OutcomeKind.Failed + | Internal.Writer.ResultKind.Other -> bads stream oStreams; resultExnOther <- resultExnOther + 1; OutcomeKind.Exception (* unclassified failures feed the Other counter and stream set, not the timeout ones *) + base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es), exn) + override _.DumpStats() = + let results = resultOk + resultDup + resultPartialDup + resultPrefix + log.Information("Completed {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok {dup:n0} redundant {partial:n0} partial
{prefix:n0} waiting)", + Log.miB okBytes, results, okStreams.Count, okEvents, resultOk, resultDup, resultPartialDup, resultPrefix) + okStreams.Clear(); resultOk <- 0; resultDup <- 0; resultPartialDup <- 0; resultPrefix <- 0; okEvents <- 0; okBytes <- 0L + if rateLimited <> 0 || timedOut <> 0 || tooLarge <> 0 || malformed <> 0 || badCats.Any then + let fails = rateLimited + timedOut + tooLarge + malformed + resultExnOther + log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e Rate-limited {rateLimited:n0}r {rlStreams:n0}s Timed out {toCount:n0}r {toStreams:n0}s", + Log.miB exnBytes, fails, failStreams.Count, exnEvents, rateLimited, rlStreams.Count, timedOut, toStreams.Count) + rateLimited <- 0; timedOut <- 0; failStreams.Clear(); rlStreams.Clear(); toStreams.Clear(); exnBytes <- 0L; exnEvents <- 0 (* resultExnOther is cleared below, after the Other figure has been logged *) + if badCats.Any then + log.Warning(" Affected cats {@badCats} Too large {tooLarge:n0}r {@tlStreams} Malformed {malformed:n0}r {@mfStreams} Other {other:n0}r {@oStreams}", + badCats.StatsDescending |> Seq.truncate 50, tooLarge, tlStreams |> Seq.truncate 100, malformed, mfStreams |> Seq.truncate 100, resultExnOther, oStreams |> Seq.truncate 100) + badCats.Clear(); tooLarge <- 0; malformed <- 0; resultExnOther <- 0; tlStreams.Clear(); mfStreams.Clear(); oStreams.Clear() + Equinox.CosmosStore.Core.Log.InternalMetrics.dump log + + override _.HandleExn(log, exn) = log.Warning(exn, "Unhandled") + type CosmosStoreSink = /// Starts a Sink that ingests all submitted events into the supplied context static member Start - ( log : ILogger, maxReadAhead, eventsContext, maxConcurrentStreams, - // Default 5m - ?statsInterval, - // Default 5m - ?stateInterval, + ( log : ILogger, maxReadAhead, eventsContext, maxConcurrentStreams, stats: CosmosStoreSinkStats, ?purgeInterval, ?wakeForResults, ?idleDelay, // Default: 16384 ?maxEvents, - // Default: 1MB (limited by maximum size of a CosmosDB stored procedure invocation) + // Default: 256KB (limited by
maximum size of a CosmosDB stored procedure invocation) ?maxBytes, ?ingesterStatsInterval) : Sink = - let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.) let dispatcher = Internal.Dispatcher.Create(log, eventsContext, maxConcurrentStreams, ?maxEvents = maxEvents, ?maxBytes = maxBytes) let scheduler = - let stats = Internal.Stats(log.ForContext(), statsInterval, stateInterval) let dumpStreams logStreamStates _log = logStreamStates Event.storedSize Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, prioritizeStreamsBy = Event.storedSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) - Projector.Pipeline.Start(log, scheduler.Pump, maxReadAhead, scheduler, ingesterStatsInterval = defaultArg ingesterStatsInterval statsInterval) + Projector.Pipeline.Start(log, scheduler.Pump, maxReadAhead, scheduler, ingesterStatsInterval = defaultArg ingesterStatsInterval stats.StatsInterval.Period) diff --git a/src/Propulsion.CosmosStore/CosmosStoreSource.fs b/src/Propulsion.CosmosStore/CosmosStoreSource.fs index 70938fc5..390468b1 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSource.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSource.fs @@ -20,9 +20,8 @@ module Log = | Read of ReadMetric | Lag of LagMetric - /// Attach a property to the captured event record to hold the metric information - // Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124 let [] PropertyTag = "propulsionCosmosEvent" + /// Attach a property to the captured event record to hold the metric information let internal withMetric (value : Metric) = Log.withScalarProperty PropertyTag value let [] (|MetricEvent|_|) (logEvent : Serilog.Events.LogEvent) : Metric voption = let mutable p = Unchecked.defaultof<_> diff --git a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj 
b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj index 727a764e..cf33337f 100644 --- a/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj +++ b/src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj @@ -17,7 +17,7 @@ - + diff --git a/src/Propulsion.CosmosStore/ReaderCheckpoint.fs b/src/Propulsion.CosmosStore/ReaderCheckpoint.fs index a578d697..266c437c 100644 --- a/src/Propulsion.CosmosStore/ReaderCheckpoint.fs +++ b/src/Propulsion.CosmosStore/ReaderCheckpoint.fs @@ -163,6 +163,12 @@ module MemoryStore = let resolve = Equinox.Decider.forStream log cat Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency) #else +let private defaultCacheDuration = TimeSpan.FromMinutes 20. +#if COSMOSV3 +let private cacheStrategy cache = Equinox.CosmosStore.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) +#else +let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) +#endif #if DYNAMOSTORE module DynamoStore = @@ -170,8 +176,7 @@ module DynamoStore = let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, Fold.transmute) let create log (consumerGroupName, defaultCheckpointFrequency) (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let cat = DynamoStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy) + let cat = DynamoStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache) let resolve = Equinox.Decider.forStream log cat Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency) #else @@ -182,8 +187,7 @@ module CosmosStore = let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, Fold.transmute) let create log (consumerGroupName, defaultCheckpointFrequency) (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) 
- let cat = CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy) + let cat = CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache) let resolve = Equinox.Decider.forStream log cat Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency) #else @@ -198,8 +202,7 @@ module CosmosStore = let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, Fold.transmute) let create log defaultCheckpointFrequency (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy cache, accessStrategy) let resolveStream opt sn = cat.Resolve(sn, opt) create log defaultCheckpointFrequency resolveStream #endif diff --git a/src/Propulsion.DynamoStore.Notifier/Handler.fs b/src/Propulsion.DynamoStore.Notifier/Handler.fs index 90f7db9d..754430fb 100644 --- a/src/Propulsion.DynamoStore.Notifier/Handler.fs +++ b/src/Propulsion.DynamoStore.Notifier/Handler.fs @@ -72,7 +72,7 @@ let private publishBatch (client : IAmazonSimpleNotificationService) (log : Seri if res.HttpStatusCode <> HttpStatusCode.OK || res.Failed.Count <> 0 then let fails = [| for x in res.Failed -> struct (x.Code, x.SenderFault, x.Message) |] log.Warning("PublishBatchAsync {res}. 
Fails: {fails}", res.HttpStatusCode, fails) - failwithf "PublishBatchAsync result %A %A" res.HttpStatusCode fails } + failwithf $"PublishBatchAsync result {res.HttpStatusCode} %A{fails}" } type SnsClient(topicArn) = diff --git a/src/Propulsion.DynamoStore/AppendsEpoch.fs b/src/Propulsion.DynamoStore/AppendsEpoch.fs index 6a263e7c..9f6c715b 100644 --- a/src/Propulsion.DynamoStore/AppendsEpoch.fs +++ b/src/Propulsion.DynamoStore/AppendsEpoch.fs @@ -34,8 +34,8 @@ module Events = | Ingested of Ingested | Closed interface TypeShape.UnionContract.IUnionContract - let codec = EventCodec.gen - let isEventTypeClosed (et : string) = et = nameof Closed + let codec = Store.Codec.gen + let isEventTypeClosed (et: string) = et = nameof Closed #endif #if !PROPULSION_DYNAMOSTORE_NOTIFIER @@ -133,9 +133,9 @@ type Service internal (onlyWarnOnGap, shouldClose, resolve: AppendsPartitionId * let decide (c: Equinox.ISyncContext<_>) = Ingest.decide onlyWarnOnGap (shouldClose (c.StreamEventBytes, c.Version)) spans c.State decider.TransactEx(decide, Equinox.AnyCachedValue) -module Config = +module Factory = - let private createCategory (context, cache) = Config.createUnoptimized Category Events.codec Fold.initial Fold.fold (context, Some cache) + let private createCategory (context, cache) = Store.Dynamo.createUnoptimized Category Events.codec Fold.initial Fold.fold (context, Some cache) let create log (maxBytes: int, maxVersion: int64, maxStreams: int, onlyWarnOnGap) store = let resolve = createCategory store |> Equinox.Decider.forStream log let shouldClose (totalBytes : int64 voption, version) totalStreams = @@ -151,7 +151,7 @@ module Config = module Reader = type Event = (struct (int64 * Events.Event)) - let codec : FsCodec.IEventCodec = EventCodec.withIndex + let codec : FsCodec.IEventCodec = Streams.decWithIndex type State = { changes : struct (int * Events.StreamSpan[])[]; closed : bool } let initial = { changes = Array.empty; closed = false } @@ -174,9 +174,9 @@ module Reader = let 
decider = resolve (partitionId, epochId, System.Int64.MaxValue) decider.QueryEx(fun c -> c.Version) - module Config = + module Factory = - let private createCategory context minIndex = Config.createWithOriginIndex Category codec initial fold context minIndex + let private createCategory context minIndex = Store.Dynamo.createWithOriginIndex Category codec initial fold context minIndex let create log context = let resolve minIndex = Equinox.Decider.forStream log (createCategory context minIndex) Service(fun (pid, eid, minIndex) -> streamId (pid, eid) |> resolve minIndex) diff --git a/src/Propulsion.DynamoStore/AppendsIndex.fs b/src/Propulsion.DynamoStore/AppendsIndex.fs index 8cf679f2..18dab89a 100644 --- a/src/Propulsion.DynamoStore/AppendsIndex.fs +++ b/src/Propulsion.DynamoStore/AppendsIndex.fs @@ -24,21 +24,24 @@ module Events = | Started of Started | Snapshotted of {| active : Map |} interface TypeShape.UnionContract.IUnionContract - let codec = EventCodec.gen + let codec = Store.Codec.gen module Fold = type State = Map - let initial = Map.empty + + module Snapshot = + + let private generate (s: State) = Events.Snapshotted {| active = s |} + let private isOrigin = function Events.Snapshotted _ -> true | _ -> false + let config = isOrigin, generate + let private evolve state = function | Events.Started e -> state |> Map.add e.partition e.epoch | Events.Snapshotted e -> e.active let fold = Array.fold evolve - let isOrigin = function Events.Snapshotted _ -> true | _ -> false - let toSnapshot s = Events.Snapshotted {| active = s |} - let readEpochId partitionId (state : Fold.State) = state |> Map.tryFind partitionId @@ -60,9 +63,9 @@ type Service internal (resolve: unit -> Equinox.Decider Equinox.Decider.forStream log let create log (context, cache) = Service(streamId >> resolve log (context, Some cache)) @@ -89,5 +92,5 @@ module Reader = let decider = resolve () decider.Query(readIngestionEpochId partitionId) - let create log context = Service(streamId >> 
Config.resolve log (context, None)) + let create log context = Service(streamId >> Factory.resolve log (context, None)) #endif diff --git a/src/Propulsion.DynamoStore/DynamoStoreIndex.fs b/src/Propulsion.DynamoStore/DynamoStoreIndex.fs index f54882c3..e39a22c8 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreIndex.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreIndex.fs @@ -107,7 +107,7 @@ module Reader = return spans, state.closed, sizeB } let loadIndex (log, storeLog, context) partitionId gapsLimit: Async = async { - let indexEpochs = AppendsEpoch.Reader.Config.create storeLog context + let indexEpochs = AppendsEpoch.Reader.Factory.create storeLog context let mutable epochId, more, totalB, totalSpans = AppendsEpochId.initial, true, 0L, 0L let state = Buffer() let mutable invalidSpans = 0 diff --git a/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs b/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs index 53e60a05..fc8d196a 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs @@ -9,8 +9,8 @@ type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, epochBytesCutoff, let onlyWarnOnGap = defaultArg onlyWarnOnGap false let ingester = - let epochs = AppendsEpoch.Config.create storeLog (epochBytesCutoff, maxVersion, maxStreams, onlyWarnOnGap) (context, cache) - let index = AppendsIndex.Config.create storeLog (context, cache) + let epochs = AppendsEpoch.Factory.create storeLog (epochBytesCutoff, maxVersion, maxStreams, onlyWarnOnGap) (context, cache) + let index = AppendsIndex.Factory.create storeLog (context, cache) let createIngester partitionId = let log = log.ForContext("partition", partitionId) let readIngestionEpoch () = index.ReadIngestionEpochId partitionId diff --git a/src/Propulsion.DynamoStore/DynamoStoreSource.fs b/src/Propulsion.DynamoStore/DynamoStoreSource.fs index 95b2437e..943f269f 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreSource.fs +++ 
b/src/Propulsion.DynamoStore/DynamoStoreSource.fs @@ -18,18 +18,18 @@ module private Impl = let readTailPositionForPartition log context (AppendsPartitionId.Parse partitionId) ct = task { let index = AppendsIndex.Reader.create log context let! epochId = index.ReadIngestionEpochId(partitionId) |> Async.executeAsTask ct - let epochs = AppendsEpoch.Reader.Config.create log context + let epochs = AppendsEpoch.Reader.Factory.create log context let! version = epochs.ReadVersion(partitionId, epochId) |> Async.executeAsTask ct return Checkpoint.positionOfEpochAndOffset epochId version } let logReadFailure (storeLog : Serilog.ILogger) = - let force = storeLog.IsEnabled Serilog.Events.LogEventLevel.Verbose + let force = storeLog.IsEnabled LogEventLevel.Verbose function | Exceptions.ProvisionedThroughputExceeded when not force -> () | e -> storeLog.Warning(e, "DynamoStoreSource read failure") let logCommitFailure (storeLog : Serilog.ILogger) = - let force = storeLog.IsEnabled Serilog.Events.LogEventLevel.Verbose + let force = storeLog.IsEnabled LogEventLevel.Verbose function | Exceptions.ProvisionedThroughputExceeded when not force -> () | e -> storeLog.Warning(e, "DynamoStoreSource commit failure") @@ -45,7 +45,7 @@ module private Impl = let materializeIndexEpochAsBatchesOfStreamEvents (log : Serilog.ILogger, sourceId, storeLog) (hydrating, maybeLoad : _ -> _ -> (CancellationToken -> Task<_>) voption, loadDop) batchCutoff (context : DynamoStoreContext) (AppendsPartitionId.Parse pid) (Checkpoint.Parse (epochId, offset)) ct = taskSeq { - let epochs = AppendsEpoch.Reader.Config.create storeLog context + let epochs = AppendsEpoch.Reader.Factory.create storeLog context let sw = Stopwatch.start () let! 
_maybeSize, version, state = epochs.Read(pid, epochId, offset) |> Async.executeAsTask ct let totalChanges = state.changes.Length diff --git a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj index 29febd14..9ac04e2d 100644 --- a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj +++ b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj @@ -7,6 +7,7 @@ + @@ -22,7 +23,7 @@ - + diff --git a/src/Propulsion.DynamoStore/Store.fs b/src/Propulsion.DynamoStore/Store.fs new file mode 100644 index 00000000..476c497d --- /dev/null +++ b/src/Propulsion.DynamoStore/Store.fs @@ -0,0 +1,33 @@ +module internal Propulsion.DynamoStore.Store + +module Dynamo = + + open Equinox.DynamoStore + + let private defaultCacheDuration = System.TimeSpan.FromMinutes 20. + let private create name codec initial fold accessStrategy (context, cache) = + let cachingStrategy = match cache with None -> Equinox.CachingStrategy.NoCaching | Some cache -> Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) + DynamoStoreCategory(context, name, codec, fold, initial, accessStrategy, cachingStrategy) + + let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = + let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot) + create name codec initial fold accessStrategy (context, cache) + + let createUnoptimized name codec initial fold (context, cache) = + let accessStrategy = AccessStrategy.Unoptimized + create name codec initial fold accessStrategy (context, cache) + + let createWithOriginIndex name codec initial fold context minIndex = + // TOCONSIDER include way to limit item count being read + // TOCONSIDER implement a loader hint to pass minIndex to the query as an additional filter + let isOrigin struct (i, _) = i <= minIndex + // There _should_ always be an event at minIndex - if there isn't for any reason, the load might go back one event too far + // Here we trim it for 
correctness (although Propulsion would technically ignore it) + let trimPotentialOverstep = Seq.filter (fun struct (i, _e) -> i >= minIndex) + let accessStrategy = AccessStrategy.MultiSnapshot (isOrigin, fun _ -> failwith "writing not applicable") + create name codec initial (fun s -> trimPotentialOverstep >> fold s) accessStrategy (context, None) + +module internal Codec = + + let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> = + FsCodec.SystemTextJson.Codec.Create<'t>() |> FsCodec.Deflate.EncodeTryDeflate diff --git a/src/Propulsion.DynamoStore/Types.fs b/src/Propulsion.DynamoStore/Types.fs index 3710828c..e49a9e5a 100644 --- a/src/Propulsion.DynamoStore/Types.fs +++ b/src/Propulsion.DynamoStore/Types.fs @@ -70,42 +70,13 @@ module internal FeedSourceId = let wellKnownId : Propulsion.Feed.SourceId = UMX.tag "dynamoStore" -module internal Config = +module Streams = - open Equinox.DynamoStore - - let private defaultCacheDuration = System.TimeSpan.FromMinutes 20. - - let private create name codec initial fold accessStrategy (context, cache) = - let cs = match cache with None -> CachingStrategy.NoCaching | Some cache -> CachingStrategy.SlidingWindow (cache, defaultCacheDuration) - DynamoStoreCategory(context, name, codec, fold, initial, accessStrategy, cs) - - let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = - let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot) - create name codec initial fold accessStrategy (context, cache) - - let createUnoptimized name codec initial fold (context, cache) = - let accessStrategy = AccessStrategy.Unoptimized - create name codec initial fold accessStrategy (context, cache) - - let createWithOriginIndex name codec initial fold context minIndex = - // TOCONSIDER include way to limit item count being read - // TOCONSIDER implement a loader hint to pass minIndex to the query as an additional filter - let isOrigin struct (i, _) = i <= minIndex - // There _should_ always be 
an event at minIndex - if there isn't for any reason, the load might go back one event too far - // Here we trim it for correctness (although Propulsion would technically ignore it) - let trimPotentialOverstep = Seq.filter (fun struct (i, _e) -> i >= minIndex) - let accessStrategy = AccessStrategy.MultiSnapshot (isOrigin, fun _ -> failwith "writing not applicable") - create name codec initial (fun s -> trimPotentialOverstep >> fold s) accessStrategy (context, None) - -module internal EventCodec = - - let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> = - FsCodec.SystemTextJson.Codec.Create<'t>() |> FsCodec.Deflate.EncodeTryDeflate let private withUpconverter<'c, 'e when 'c :> TypeShape.UnionContract.IUnionContract> up : FsCodec.IEventCodec<'e, _, _> = let down (_ : 'e) = failwith "Unexpected" FsCodec.SystemTextJson.Codec.Create<'e, 'c, _>(up, down) |> FsCodec.Deflate.EncodeTryDeflate - let withIndex<'c when 'c :> TypeShape.UnionContract.IUnionContract> : FsCodec.IEventCodec = + let decWithIndex<'c when 'c :> TypeShape.UnionContract.IUnionContract> : FsCodec.IEventCodec = let up (raw : FsCodec.ITimelineEvent<_>) e = struct (raw.Index, e) withUpconverter<'c, struct (int64 * 'c)> up + #endif diff --git a/src/Propulsion.EventStore/Checkpoint.fs b/src/Propulsion.EventStore/Checkpoint.fs index fbcc6c51..9208c10f 100755 --- a/src/Propulsion.EventStore/Checkpoint.fs +++ b/src/Propulsion.EventStore/Checkpoint.fs @@ -1,8 +1,7 @@ module Propulsion.EventStore.Checkpoint open FSharp.UMX -open Propulsion.Internal -open System // must shadow UMX to use DateTimeOffSet +open System // must shadow UMX to use DateTimeOffset open System.Threading.Tasks type CheckpointSeriesId = string @@ -116,14 +115,11 @@ type Service internal (resolve: CheckpointSeriesId -> Equinox.DeciderCore> resolve) - // General pattern is that an Equinox Service is a singleton and calls pass an identifier for a stream per call // This light wrapper means we can adhere to that general pattern yet 
still end up with legible code while we in practice only maintain a single checkpoint series per running app -type CheckpointSeries(groupName, resolve, ?log) = +type CheckpointSeries(groupName, resolve) = let seriesId = CheckpointSeriesId.ofGroupName groupName - let log = match log with Some x -> x | None -> Serilog.Log.ForContext() - let inner = create (resolve log) + let inner = Service(streamId >> resolve) member _.Read(ct): Task = inner.Read(seriesId, ct) member _.Start(freq, pos, ct): Task = inner.Start(seriesId, freq, pos, ct) diff --git a/src/Propulsion.EventStore/EventStoreSink.fs b/src/Propulsion.EventStore/EventStoreSink.fs index 5e994bc3..9cfefa23 100755 --- a/src/Propulsion.EventStore/EventStoreSink.fs +++ b/src/Propulsion.EventStore/EventStoreSink.fs @@ -13,8 +13,6 @@ open Propulsion.Sinks open Propulsion.Streams open Serilog open System.Collections.Generic -open System -open System.Threading module Internal = @@ -61,53 +59,6 @@ module Internal = type [] ResultKind = TimedOut | Other - type Stats(log : ILogger, statsInterval, stateInterval) = - inherit Scheduling.Stats(log, statsInterval, stateInterval) - let mutable okStreams, badCats, failStreams, toStreams, oStreams = HashSet(), Stats.CatStats(), HashSet(), HashSet(), HashSet() - let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExnOther, timedOut = 0, 0, 0, 0, 0, 0 - let mutable okEvents, okBytes, exnEvents, exnBytes = 0, 0L, 0, 0L - - override _.DumpStats() = - let results = resultOk + resultDup + resultPartialDup + resultPrefix - log.Information("Completed {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok {dup:n0} redundant {partial:n0} partial {prefix:n0} waiting)", - Log.miB okBytes, results, okStreams.Count, okEvents, resultOk, resultDup, resultPartialDup, resultPrefix) - okStreams.Clear(); resultOk <- 0; resultDup <- 0; resultPartialDup <- 0; resultPrefix <- 0; okEvents <- 0; okBytes <- 0L - if timedOut <> 0 || badCats.Any then - let fails = timedOut + 
resultExnOther - log.Warning("Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e Timed out {toCount:n0}r {toStreams:n0}s", - Log.miB exnBytes, fails, failStreams.Count, exnEvents, timedOut, toStreams.Count) - timedOut <- 0; resultExnOther <- 0; failStreams.Clear(); toStreams.Clear(); exnBytes <- 0L; exnEvents <- 0 - if badCats.Any then - log.Warning(" Affected cats {@badCats} Other {other:n0}r {@oStreams}", - badCats.StatsDescending |> Seq.truncate 50, resultExnOther, oStreams |> Seq.truncate 100) - badCats.Clear(); resultExnOther <- 0; oStreams.Clear() - Log.InternalMetrics.dump log - - override _.Handle message = - let inline adds x (set : HashSet<_>) = set.Add x |> ignore - let inline bads streamName (set : HashSet<_>) = badCats.Ingest(StreamName.categorize streamName); adds streamName set - match message with - | { stream = stream; result = Ok ((es, bs), res) } -> - adds stream okStreams - okEvents <- okEvents + es - okBytes <- okBytes + int64 bs - match res with - | Writer.Result.Ok _ -> resultOk <- resultOk + 1 - | Writer.Result.Duplicate _ -> resultDup <- resultDup + 1 - | Writer.Result.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1 - | Writer.Result.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 - base.RecordOk(message) - | { stream = stream; result = Error ((es, bs), Exception.Inner exn) } -> - adds stream failStreams - exnEvents <- exnEvents + es - exnBytes <- exnBytes + int64 bs - let kind = OutcomeKind.classify exn - match kind with - | OutcomeKind.Timeout -> adds stream toStreams; timedOut <- timedOut + 1 - | _ -> bads stream oStreams; resultExnOther <- resultExnOther + 1 - base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es), exn) - default _.HandleExn(_, _) : unit = () - type Dispatcher = static member Create(log : ILogger, storeLog, connections : _[], maxDop) = @@ -115,7 +66,7 @@ module Internal = let mutable robin = 0 let attemptWrite stream span ct = task { - let index = 
Interlocked.Increment(&robin) % connections.Length + let index = System.Threading.Interlocked.Increment(&robin) % connections.Length let selectedConnection = connections[index] let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096 let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span @@ -134,15 +85,60 @@ module Internal = struct (ss.WritePos, res) Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, attemptWrite, interpretWriteResultProgress) +type WriterResult = Internal.Writer.Result + +type EventStoreSinkStats(log : ILogger, statsInterval, stateInterval) = + inherit Scheduling.Stats(log, statsInterval, stateInterval) + + let mutable okStreams, badCats, failStreams, toStreams, oStreams = HashSet(), Stats.CatStats(), HashSet(), HashSet(), HashSet() + let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExnOther, timedOut = 0, 0, 0, 0, 0, 0 + let mutable okEvents, okBytes, exnEvents, exnBytes = 0, 0L, 0, 0L + override _.Handle message = + let inline adds x (set : HashSet<_>) = set.Add x |> ignore + let inline bads streamName (set : HashSet<_>) = badCats.Ingest(StreamName.categorize streamName); adds streamName set + match message with + | { stream = stream; result = Ok ((es, bs), res) } -> + adds stream okStreams + okEvents <- okEvents + es + okBytes <- okBytes + int64 bs + match res with + | WriterResult.Ok _ -> resultOk <- resultOk + 1 + | WriterResult.Duplicate _ -> resultDup <- resultDup + 1 + | WriterResult.PartialDuplicate _ -> resultPartialDup <- resultPartialDup + 1 + | WriterResult.PrefixMissing _ -> resultPrefix <- resultPrefix + 1 + base.RecordOk(message) + | { stream = stream; result = Error ((es, bs), Exception.Inner exn) } -> + adds stream failStreams + exnEvents <- exnEvents + es + exnBytes <- exnBytes + int64 bs + let kind = OutcomeKind.classify exn + match kind with + | OutcomeKind.Timeout -> adds stream toStreams; timedOut <- timedOut + 1 + | _ -> bads stream oStreams; resultExnOther <- 
resultExnOther + 1 + base.RecordExn(message, kind, log.ForContext("stream", stream).ForContext("events", es), exn) + override _.DumpStats() = + let results = resultOk + resultDup + resultPartialDup + resultPrefix + log.Information("Completed {mb:n0}MB {completed:n0}r {streams:n0}s {events:n0}e ({ok:n0} ok {dup:n0} redundant {partial:n0} partial {prefix:n0} waiting)", + Log.miB okBytes, results, okStreams.Count, okEvents, resultOk, resultDup, resultPartialDup, resultPrefix) + okStreams.Clear(); resultOk <- 0; resultDup <- 0; resultPartialDup <- 0; resultPrefix <- 0; okEvents <- 0; okBytes <- 0L + if timedOut <> 0 || badCats.Any then + let fails = timedOut + resultExnOther + log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e Timed out {toCount:n0}r {toStreams:n0}s", + Log.miB exnBytes, fails, failStreams.Count, exnEvents, timedOut, toStreams.Count) + timedOut <- 0; resultExnOther <- 0; failStreams.Clear(); toStreams.Clear(); exnBytes <- 0L; exnEvents <- 0 + if badCats.Any then + log.Warning(" Affected cats {@badCats} Other {other:n0}r {@oStreams}", + badCats.StatsDescending |> Seq.truncate 50, resultExnOther, oStreams |> Seq.truncate 100) + badCats.Clear(); resultExnOther <- 0; oStreams.Clear() + Log.InternalMetrics.dump log + + override _.HandleExn(log, exn) = log.Warning(exn, "Unhandled") + type EventStoreSink = /// Starts a Sink that ingests all submitted events into the supplied connections static member Start - ( log : ILogger, storeLog, maxReadAhead, connections, maxConcurrentStreams, - // Default 5m - ?statsInterval, - // Default 5m - ?stateInterval, + ( log : ILogger, storeLog, maxReadAhead, connections, maxConcurrentStreams, stats: EventStoreSinkStats, // Frequency with which to jettison Write Position information for inactive streams in order to limit memory consumption // NOTE: Can impair performance and/or increase costs of writes as it inhibits the ability of the ingester to discard redundant inputs ?purgeInterval, @@ -151,9 +147,7 
@@ type EventStoreSink = ?ingesterStatsInterval) : Sink = let dispatcher = Internal.Dispatcher.Create(log, storeLog, connections, maxConcurrentStreams) - let statsInterval, stateInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.), defaultArg stateInterval (TimeSpan.FromMinutes 5.) let scheduler = - let stats = Internal.Stats(log.ForContext(), statsInterval, stateInterval) let dumpStreams logStreamStates _log = logStreamStates Event.storedSize Scheduling.Engine(dispatcher, stats, dumpStreams, pendingBufferSize = 5, ?purgeInterval = purgeInterval, ?idleDelay = idleDelay) - Projector.Pipeline.Start( log, scheduler.Pump, maxReadAhead, scheduler, ingesterStatsInterval = defaultArg ingesterStatsInterval statsInterval) + Projector.Pipeline.Start(log, scheduler.Pump, maxReadAhead, scheduler, ingesterStatsInterval = defaultArg ingesterStatsInterval stats.StatsInterval.Period) diff --git a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj index 14a08674..94a9fa7a 100644 --- a/src/Propulsion.EventStore/Propulsion.EventStore.fsproj +++ b/src/Propulsion.EventStore/Propulsion.EventStore.fsproj @@ -16,7 +16,7 @@ - + diff --git a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj index 0130266e..2445875f 100644 --- a/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj +++ b/src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj @@ -15,7 +15,7 @@ - + diff --git a/src/Propulsion.Feed/FeedReader.fs b/src/Propulsion.Feed/FeedReader.fs index a40b98a2..4033cb34 100644 --- a/src/Propulsion.Feed/FeedReader.fs +++ b/src/Propulsion.Feed/FeedReader.fs @@ -32,9 +32,8 @@ module Log = token : Nullable; latency : TimeSpan; pages : int; items : int ingestLatency : TimeSpan; ingestQueued : int } - /// Attach a property to the captured event record to hold the metric information - // Sidestep Log.ForContext converting to a string; see 
https://github.com/serilog/serilog/issues/1124 let [] PropertyTag = "propulsionFeedEvent" + /// Attach a property to the captured event record to hold the metric information let internal withMetric (value : Metric) = Log.withScalarProperty PropertyTag value let [] (|MetricEvent|_|) (logEvent : Serilog.Events.LogEvent) : Metric voption = let mutable p = Unchecked.defaultof<_> @@ -144,7 +143,7 @@ type FeedReader stats.RecordBatch(readLatency, batch) match Array.length batch.items with | 0 -> log.Verbose("Page {latency:f0}ms Checkpoint {checkpoint} Empty", readLatency.TotalMilliseconds, batch.checkpoint) - | c -> if log.IsEnabled(Serilog.Events.LogEventLevel.Debug) then + | c -> if log.IsEnabled LogEventLevel.Debug then let streamsCount = batch.items |> Seq.distinctBy ValueTuple.fst |> Seq.length log.Debug("Page {latency:f0}ms Checkpoint {checkpoint} {eventCount}e {streamCount}s", readLatency.TotalMilliseconds, batch.checkpoint, c, streamsCount) diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion.Feed/FeedSource.fs index 183ad92a..72a82180 100644 --- a/src/Propulsion.Feed/FeedSource.fs +++ b/src/Propulsion.Feed/FeedSource.fs @@ -236,7 +236,7 @@ and FeedMonitor internal (log : Serilog.ILogger, positions : TranchePositions, s let isDrainedNow () = positions.Current() |> isDrained let linger = match lingerTime with None -> TimeSpan.Zero | Some lingerF -> lingerF (isDrainedNow ()) propagationDelay propUsed procUsed let skipLinger = linger = TimeSpan.Zero - let ll = if skipLinger then Serilog.Events.LogEventLevel.Information else Serilog.Events.LogEventLevel.Debug + let ll = if skipLinger then LogEventLevel.Information else LogEventLevel.Debug let originalCompleted = currentCompleted |> Seq.cache if log.IsEnabled ll then let completed = positions.Current() |> choose (fun v -> v.completed) diff --git a/src/Propulsion.MemoryStore/MemoryStoreSource.fs b/src/Propulsion.MemoryStore/MemoryStoreSource.fs index 55dce1a5..5a07c61d 100644 --- 
a/src/Propulsion.MemoryStore/MemoryStoreSource.fs +++ b/src/Propulsion.MemoryStore/MemoryStoreSource.fs @@ -14,7 +14,6 @@ type MemoryStoreSource<'F>(log, store : Equinox.MemoryStore.VolatileStore<'F>, c let ingester : Ingestion.Ingester<_> = sink.StartIngester(log, 0) let positions = TranchePositions() let monitor = lazy MemoryStoreMonitor(log, positions, sink) - let debug, verbose = log.IsEnabled Serilog.Events.LogEventLevel.Debug, log.IsEnabled Serilog.Events.LogEventLevel.Verbose // epoch index of most recently prepared submission - conceptually events arrive concurrently though V4 impl makes them serial let mutable prepared = -1L @@ -25,10 +24,12 @@ type MemoryStoreSource<'F>(log, store : Equinox.MemoryStore.VolatileStore<'F>, c let handleStoreCommitted struct (categoryName, aggregateId, items : Propulsion.Sinks.StreamEvent[]) = let epoch = Interlocked.Increment &prepared positions.Prepared <- epoch - if debug then MemoryStoreLogger.renderSubmit log (epoch, categoryName, aggregateId, items |> Array.map ValueTuple.snd) + if log.IsEnabled LogEventLevel.Debug then + MemoryStoreLogger.renderSubmit log (epoch, categoryName, aggregateId, items |> Array.map ValueTuple.snd) // Completion notifications are guaranteed to be delivered deterministically, in order of submission let markCompleted () = - if verbose then MemoryStoreLogger.renderCompleted log (epoch, categoryName, aggregateId) + if log.IsEnabled LogEventLevel.Verbose then + MemoryStoreLogger.renderCompleted log (epoch, categoryName, aggregateId) positions.Completed <- epoch // We don't have anything Async to do, so we pass a null checkpointing function enqueueSubmission { isTail = true; epoch = epoch; checkpoint = (fun _ -> task { () }); items = items; onCompletion = markCompleted } diff --git a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj index 8bc4f304..da635e56 100644 --- a/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj +++ 
b/src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj @@ -19,7 +19,7 @@ - + diff --git a/src/Propulsion/Pipeline.fs b/src/Propulsion/Pipeline.fs index afe6d97f..a50edb1c 100755 --- a/src/Propulsion/Pipeline.fs +++ b/src/Propulsion/Pipeline.fs @@ -40,7 +40,7 @@ type Pipeline(task : Task, triggerStop) = static member Prepare(log : ILogger, pumpScheduler, pumpSubmitter, ?pumpIngester, ?pumpDispatcher) = let cts = new CancellationTokenSource() let triggerStop disposing = - let level = if disposing || cts.IsCancellationRequested then Events.LogEventLevel.Debug else Events.LogEventLevel.Information + let level = if disposing || cts.IsCancellationRequested then LogEventLevel.Debug else LogEventLevel.Information log.Write(level, "Sink stopping...") cts.Cancel() let ct = cts.Token @@ -79,7 +79,7 @@ type Pipeline(task : Task, triggerStop) = let ts = Stopwatch.timestamp () let finishedAsRequested = scheduler.Wait(TimeSpan.FromSeconds 2) let ms = let t = Stopwatch.elapsed ts in int t.TotalMilliseconds - let level = if finishedAsRequested && ms < 200 then Events.LogEventLevel.Information else Events.LogEventLevel.Warning + let level = if finishedAsRequested && ms < 200 then LogEventLevel.Information else LogEventLevel.Warning log.Write(level, "... 
sink completed {schedulerCleanupMs}ms", ms) } let task = Task.Run(supervise) diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 06ba0afc..b3503d9d 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -50,14 +50,13 @@ module Log = newest : float let [] PropertyTag = "propulsionEvent" - let [] GroupTag = "group" /// Attach a property to the captured event record to hold the metric information - // Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124 let internal withMetric (value : Metric) = Internal.Log.withScalarProperty PropertyTag value let tryGetScalar<'t> key (logEvent : Serilog.Events.LogEvent) : 't voption = let mutable p = Unchecked.defaultof<_> logEvent.Properties.TryGetValue(key, &p) |> ignore match p with Log.ScalarValue (:? 't as e) -> ValueSome e | _ -> ValueNone + let [] GroupTag = "group" let [] (|MetricEvent|_|) logEvent = match tryGetScalar PropertyTag logEvent with | ValueSome m -> ValueSome (m, tryGetScalar GroupTag logEvent) @@ -925,10 +924,10 @@ type Stats<'Outcome>(log : ILogger, statsInterval, statesInterval) = Log.miB okBytes, resultOk, okStreams.Count, okEvents, resultOk) okStreams.Clear(); resultOk <- 0; okEvents <- 0; okBytes <- 0L if resultExnOther <> 0 then - log.Warning("Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e", + log.Warning(" Exceptions {mb:n0}MB {fails:n0}r {streams:n0}s {events:n0}e", Log.miB exnBytes, resultExnOther, failStreams.Count, exnEvents) resultExnOther <- 0; failStreams.Clear(); exnBytes <- 0L; exnEvents <- 0 - log.Warning(" Affected cats {@badCats}", badCats.StatsDescending) + log.Warning(" Affected cats {@badCats}", badCats.StatsDescending) badCats.Clear() abstract member Classify : exn -> OutcomeKind diff --git a/tools/Propulsion.Tool/Infrastructure.fs b/tools/Propulsion.Tool/Infrastructure.fs index 8abd057c..d79ceccf 100644 --- a/tools/Propulsion.Tool/Infrastructure.fs +++ 
b/tools/Propulsion.Tool/Infrastructure.fs @@ -3,11 +3,12 @@ module Propulsion.Tool.Infrastructure open Serilog -module Log = +module Metrics = - let forMetrics = Log.ForContext("isMetric", true) - - let isStoreMetrics x = Serilog.Filters.Matching.WithProperty("isMetric").Invoke x + let [] PropertyTag = "isMetric" + let log = Log.ForContext(PropertyTag, true) + /// Allow logging to filter out emission of log messages whose information is also surfaced as metrics + let logEventIsMetric e = Serilog.Filters.Matching.WithProperty(PropertyTag).Invoke e module Sinks = @@ -49,4 +50,4 @@ type Logging() = [] static member Sinks(configuration : LoggerConfiguration, configureMetricsSinks, verboseStore, verboseConsole) = - configuration.Sinks(configureMetricsSinks, Sinks.console verboseConsole, ?isMetric = if verboseStore then None else Some Log.isStoreMetrics) + configuration.Sinks(configureMetricsSinks, Sinks.console verboseConsole, ?isMetric = if verboseStore then None else Some Metrics.logEventIsMetric) diff --git a/tools/Propulsion.Tool/Program.fs b/tools/Propulsion.Tool/Program.fs index dca417c3..a01c443f 100644 --- a/tools/Propulsion.Tool/Program.fs +++ b/tools/Propulsion.Tool/Program.fs @@ -178,10 +178,10 @@ module Checkpoints = let cache = Equinox.Cache (appName, sizeMb = 1) match a.StoreArgs with | Choice1Of3 a -> - let! store = a.CreateCheckpointStore(group, cache, Log.forMetrics) + let! 
store = a.CreateCheckpointStore(group, cache, Metrics.log) return (store : Propulsion.Feed.IFeedCheckpointStore), "cosmos", fun pos -> store.Override(source, tranche, pos, ct) | Choice2Of3 a -> - let store = a.CreateCheckpointStore(group, cache, Log.forMetrics) + let store = a.CreateCheckpointStore(group, cache, Metrics.log) return store, $"dynamo -t {a.IndexTable}", fun pos -> store.Override(source, tranche, pos, ct) | Choice3Of3 a -> let store = a.CreateCheckpointStore(group) @@ -234,7 +234,7 @@ module Indexer = Log.Warning("Gapped stream {stream}@{wp}: Missing {gap} events before {successorEventTypes}", stream, v.writePos, gap, v.spans[0].c) elif gapped = gapsLimit then Log.Error("Gapped Streams Dump limit ({gapsLimit}) reached; use commandline flag to show more", gapsLimit) - let level = if gapped > 0 then Serilog.Events.LogEventLevel.Warning else Serilog.Events.LogEventLevel.Information + let level = if gapped > 0 then LogEventLevel.Warning else LogEventLevel.Information Log.Write(level, "Index {events:n0} events {streams:n0} streams ({spans:n0} spans) Buffered {buffered} Queueing {queuing} Gapped {gapped:n0}", totalE, totalS, spanCount, buffered, queuing, gapped) @@ -246,7 +246,7 @@ module Indexer = | None when (not << List.isEmpty) a.ImportJsonFiles -> missingArg "Must specify a trancheId parameter to import into" | None -> - let index = AppendsIndex.Reader.create Log.forMetrics context + let index = AppendsIndex.Reader.create Metrics.log context let! state = index.Read() Log.Information("Current Partitions / Active Epochs {summary}", seq { for kvp in state -> struct (kvp.Key, kvp.Value) } |> Seq.sortBy (fun struct (t, _) -> t)) @@ -260,7 +260,7 @@ module Indexer = Log.Information("Inspect Batches in Epoch {epoch} of Index Partition {partition} 👉 {cmd}", eid, pid, dumpCmd AppendsEpoch.Category (AppendsEpoch.streamId (pid, eid)) "-B ") | Some trancheId -> - let! 
buffer, indexedSpans = DynamoStoreIndex.Reader.loadIndex (Log.Logger, Log.forMetrics, context) trancheId a.GapsLimit + let! buffer, indexedSpans = DynamoStoreIndex.Reader.loadIndex (Log.Logger, Metrics.log, context) trancheId a.GapsLimit let dump ingestedCount = dumpSummary a.GapsLimit buffer.Items (indexedSpans + ingestedCount) dump 0 @@ -271,7 +271,7 @@ module Indexer = Log.Information("Ingesting {files}...", files) let ingest = - let ingester = DynamoStoreIngester(Log.Logger, context, storeLog = Log.forMetrics) + let ingester = DynamoStoreIngester(Log.Logger, context, storeLog = Metrics.log) fun batch -> ingester.Service.IngestWithoutConcurrency(trancheId, batch) let import = DynamoDbExport.Importer(buffer, ingest, dump) for file in files do @@ -362,11 +362,11 @@ module Project = let (indexStore, indexFilter), loadMode = sa.MonitoringParams() let checkpoints = let cache = Equinox.Cache (appName, sizeMb = 1) - sa.CreateCheckpointStore(group, cache, Log.forMetrics) + sa.CreateCheckpointStore(group, cache, Metrics.log) Propulsion.DynamoStore.DynamoStoreSource( Log.Logger, stats.StatsInterval, indexStore, defaultArg maxItems 100, TimeSpan.FromSeconds 0.5, - checkpoints, sink, loadMode, startFromTail = startFromTail, storeLog = Log.forMetrics, + checkpoints, sink, loadMode, startFromTail = startFromTail, storeLog = Metrics.log, ?trancheIds = indexFilter ).Start() | Choice3Of3 sa ->