Skip to content

Commit

Permalink
feat(Streams): Stats/error logging polish
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 26, 2023
1 parent c10c4a8 commit 3c495cb
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 89 deletions.
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ module Pruner =
let writePos = max trimmedPos (untilIndex + 1L)
return struct (writePos, res) }

type CosmosStorePrunerStats(log, statsInterval, stateInterval) =
inherit Propulsion.Streams.Stats<Pruner.Outcome>(log, statsInterval, stateInterval)
type CosmosStorePrunerStats(log, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
inherit Propulsion.Streams.Stats<Pruner.Outcome>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)

let mutable nops, totalRedundant, ops, totalDeletes, totalDeferred = 0, 0, 0, 0, 0
override _.HandleOk outcome =
Expand Down
8 changes: 4 additions & 4 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ module Internal =
#else
try let! res = Writer.write log eventsContext stream span' ct
#endif
return struct (span'.Length > 0, Ok struct (met, res))
with e -> return struct (false, Error struct (met, e)) }
return struct (StreamSpan.idx span', span'.Length > 0, Ok struct (met, res))
with e -> return struct (StreamSpan.idx span', false, Error struct (met, e)) }
let interpretWriteResultProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct (_stats, Writer.Result.Ok pos) -> struct (streams.RecordWriteProgress(stream, pos, null), false)
Expand All @@ -127,8 +127,8 @@ module Internal =

type WriterResult = Internal.Writer.Result

type CosmosStoreSinkStats(log : ILogger, statsInterval, stateInterval) =
inherit Scheduling.Stats<struct (StreamSpan.Metrics * WriterResult), struct (StreamSpan.Metrics * exn)>(log, statsInterval, stateInterval)
type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
inherit Scheduling.Stats<struct (StreamSpan.Metrics * WriterResult), struct (StreamSpan.Metrics * exn)>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)
let mutable okStreams, resultOk, resultDup, resultPartialDup, resultPrefix, resultExnOther = HashSet(), 0, 0, 0, 0, 0
let mutable badCats, failStreams, rateLimited, timedOut, tooLarge, malformed = Stats.CatStats(), HashSet(), 0, 0, 0, 0
let rlStreams, toStreams, tlStreams, mfStreams, oStreams = HashSet(), HashSet(), HashSet(), HashSet(), HashSet()
Expand Down
8 changes: 4 additions & 4 deletions src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ module Internal =
let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096
let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span' ct
return struct (span'.Length > 0, Ok struct (met, res))
with e -> return false, Error struct (met, e) }
return struct (StreamSpan.idx span', span'.Length > 0, Ok struct (met, res))
with e -> return StreamSpan.idx span', false, Error struct (met, e) }
let interpretWriteResultProgress (streams : Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Ok struct (_stats, Writer.Result.Ok pos) -> streams.RecordWriteProgress(stream, pos, null)
Expand All @@ -87,8 +87,8 @@ module Internal =

type WriterResult = Internal.Writer.Result

type EventStoreSinkStats(log : ILogger, statsInterval, stateInterval) =
inherit Scheduling.Stats<struct (StreamSpan.Metrics * WriterResult), struct (StreamSpan.Metrics * exn)>(log, statsInterval, stateInterval)
type EventStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold) =
inherit Scheduling.Stats<struct (StreamSpan.Metrics * WriterResult), struct (StreamSpan.Metrics * exn)>(log, statsInterval, stateInterval, ?failThreshold = failThreshold)

let mutable okStreams, badCats, failStreams, toStreams, oStreams = HashSet(), Stats.CatStats(), HashSet(), HashSet(), HashSet()
let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExnOther, timedOut = 0, 0, 0, 0, 0, 0
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.Feed/FeedReader.fs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type internal Stats(partition : int, source : SourceId, tranche : TrancheId, ren
elif lastCommittedPosition = batchLastPosition then "COMPLETE"
else if finishedReading then "End" else "Tail"
(Log.withMetric m log).ForContext("tail", lastWasTail).Information(
"Reader {partition} {state} @ {lastCommittedPosition}/{readPosition} Pages {pagesRead} empty {pagesEmpty} events {events} | Recent {l:f1}s Pages {recentPagesRead} empty {recentPagesEmpty} events {recentEvents} | Wait {pausedS:f1}s Ahead {cur}/{max}",
"Reader {partition} {state} @ {lastCommittedPosition}/{readPosition} Pages {pagesRead} empty {pagesEmpty} events {events} | Recent {l:f1}s Pages {recentPagesRead} empty {recentPagesEmpty} events {recentEvents} Wait {pausedS:f1}s Ahead {cur}/{max}",
partition, state, r lastCommittedPosition, r batchLastPosition, pagesRead, pagesEmpty, events, readS, recentPagesRead, recentPagesEmpty, recentEvents, postS, currentBatches, maxReadAhead)
readLatency <- TimeSpan.Zero; ingestLatency <- TimeSpan.Zero
recentPagesRead <- 0; recentEvents <- 0; recentPagesEmpty <- 0
Expand Down
8 changes: 4 additions & 4 deletions src/Propulsion.Kafka/Consumers.fs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ type Factory private () =
select, handle : Func<Scheduling.Item<_>[], CancellationToken, Task<seq<Result<int64, exn>>>>, stats,
?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) =
let handle (items : Scheduling.Item<EventBody>[]) ct
: Task<struct (TimeSpan * FsCodec.StreamName * bool * Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>)[]> = task {
: Task<struct (TimeSpan * FsCodec.StreamName * int64 * bool * Result<struct (int64 * struct (StreamSpan.Metrics * unit)), struct (StreamSpan.Metrics * exn)>)[]> = task {
let sw = Stopwatch.start ()
let avgElapsed () =
let tot = float sw.ElapsedMilliseconds
Expand All @@ -361,16 +361,16 @@ type Factory private () =
| item, Ok index' ->
let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq
let metrics = StreamSpan.metrics Event.storedSize used
struct (ae, item.stream, true, Ok struct (index', struct (metrics, ())))
struct (ae, item.stream, StreamSpan.idx item.span, true, Ok struct (index', struct (metrics, ())))
| item, Error exn ->
let metrics = StreamSpan.metrics Event.renderedSize item.span
ae, item.stream, false, Result.Error struct (metrics, exn) |]
ae, item.stream, StreamSpan.idx item.span, false, Result.Error struct (metrics, exn) |]
with e ->
let ae = avgElapsed ()
return
[| for x in items ->
let metrics = StreamSpan.metrics Event.renderedSize x.span
ae, x.stream, false, Result.Error struct (metrics, e) |] }
ae, x.stream, StreamSpan.idx x.span, false, Result.Error struct (metrics, e) |] }
let dispatcher = Dispatcher.Batched(select, handle)
let dumpStreams logStreamStates log =
logExternalState |> Option.iter (fun f -> f log)
Expand Down
9 changes: 5 additions & 4 deletions src/Propulsion/Ingestion.fs
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,16 @@ type Ingester<'Items> private
member private x.Pump(ct) = task {
use _ = progressWriter.Result.Subscribe(ProgressResult >> enqueueMessage)
Task.start (fun () -> x.CheckpointPeriodically ct)
while not ct.IsCancellationRequested do
let mutable exiting = false
while not exiting do
exiting <- ct.IsCancellationRequested
while applyIncoming handleIncoming || applyMessages stats.Handle do ()
stats.RecordCycle()
if stats.Interval.IfDueRestart() then let struct (active, max) = maxRead.State in stats.DumpStats(active, max)
if exiting || stats.Interval.IfDueRestart() then let struct (active, max) = maxRead.State in stats.DumpStats(active, max)
let startWaits ct = [| awaitIncoming ct :> Task
awaitMessage ct
Task.Delay(stats.Interval.RemainingMs, ct) |]
do! Task.runWithCancellation ct (fun ct -> Task.WhenAny(startWaits ct)) }

if not exiting then do! Task.runWithCancellation ct (fun ct -> Task.WhenAny(startWaits ct)) }
/// Starts an independent Task that handles
/// a) `unpack`ing of `incoming` items
/// b) `submit`ting them onward (assuming there is capacity within the `maxReadAhead`)
Expand Down
29 changes: 27 additions & 2 deletions src/Propulsion/Internal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ module Stats =
p95 = pc 95
p99 = pc 99 }
let stdDev = match l.stddev with None -> Double.NaN | Some d -> d.TotalSeconds
log.Information(" {kind} {count} : max={max:n3}s p99={p99:n3}s p95={p95:n3}s p50={p50:n3}s min={min:n3}s avg={avg:n3}s stddev={stddev:n3}s",
log.Information(" {kind} {count,4} : max={max:n3}s p99={p99:n3}s p95={p95:n3}s p50={p50:n3}s min={min:n3}s avg={avg:n3}s stddev={stddev:n3}s",
kind, sortedLatencies.Length, l.max.TotalSeconds, l.p99.TotalSeconds, l.p95.TotalSeconds, l.p50.TotalSeconds, l.min.TotalSeconds, l.avg.TotalSeconds, stdDev)

/// Operations on an instance are safe cross-thread
Expand All @@ -254,14 +254,39 @@ module Stats =
dumpStats kind buffer log
buffer.Clear() // yes, there is a race

/// Should only be used on one thread
/// Not thread-safe, i.e. suitable for use in a Stats handler only
/// Not thread-safe, i.e. suitable for use in a Stats handler only
type LatencyStats(kind) =
    // Accumulates raw observations between Dump calls; cleared after each render
    let accumulated = ResizeArray<TimeSpan>()
    /// Adds a single latency observation to the pending buffer
    member _.Record value = accumulated.Add value
    /// Renders the accumulated statistics (if any) and resets the buffer
    member _.Dump(log : Serilog.ILogger) =
        match accumulated.Count with
        | 0 -> () // nothing recorded since the last Dump; emit no line
        | _ -> dumpStats kind accumulated log
               accumulated.Clear()
/// Not thread-safe, i.e. suitable for use in a Stats handler only
type LatencyStatsSet(?totalLabel) =
    let totalLabel = defaultArg totalLabel " TOTAL"
    // One buffer of observations per kind label
    let groups = Dictionary<string, ResizeArray<TimeSpan>>()
    /// Adds a single latency observation for the specified kind
    member _.Record(kind, value: TimeSpan) =
        match groups.TryGetValue kind with
        | false, _ -> let n = ResizeArray() in n.Add value; groups.Add(kind, n)
        | true, buf -> buf.Add value
    /// Renders one stats line per kind (sorted, padded for alignment).
    /// No-op when nothing has been recorded (guards the Seq.max over key lengths,
    /// which would otherwise raise ArgumentException on an empty set)
    member _.Dump(log : Serilog.ILogger) =
        if groups.Count <> 0 then
            let maxLen = groups.Keys |> Seq.map String.length |> Seq.max
            for name in Seq.sort groups.Keys do
                dumpStats (name.PadRight(maxLen)) groups[name] log
    /// Renders an aggregate line under totalLabel, then one line per cluster of kinds
    /// as grouped by mapping each kind through f. No-op when nothing has been recorded
    member _.DumpGrouped(f, log : Serilog.ILogger) =
        if groups.Count <> 0 then
            dumpStats totalLabel (groups |> Seq.collect (fun kv -> kv.Value)) log
            let clusters =
                groups
                |> Seq.groupBy (fun kv -> f kv.Key)
                |> Seq.sortBy fst
                |> Seq.toArray
            let maxLen = clusters |> Seq.map (fst >> String.length) |> Seq.max
            for name, items in clusters do
                let lats = seq { for kv in items -> kv.Value }
                dumpStats (name.PadRight(maxLen)) (Seq.concat lats) log
    /// Discards all accumulated observations
    member _.Clear() = groups.Clear()

type LogEventLevel = Serilog.Events.LogEventLevel

Expand Down
Loading

0 comments on commit 3c495cb

Please sign in to comment.