From 68a443e74754dac31a5ac2e79d95e2c35089f5b2 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 27 Sep 2023 13:00:01 +0100 Subject: [PATCH] Tidy; add event name --- CHANGELOG.md | 1 + src/Propulsion.CosmosStore/CosmosStoreSink.fs | 12 +- src/Propulsion.EventStore/EventStoreSink.fs | 10 +- src/Propulsion.Kafka/Consumers.fs | 46 +++--- src/Propulsion/Sinks.fs | 10 +- src/Propulsion/Streams.fs | 150 ++++++++---------- src/Propulsion/Sync.fs | 23 ++- .../ConsumersIntegration.fs | 6 +- 8 files changed, 121 insertions(+), 137 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b938de7b..548e7d13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Feed`: `Checkpoint` enables committing progress (and obtaining the achieved positions) without stopping the Sink [#162](https://github.com/jet/propulsion/pull/162) - `Feed.SinglePassFeedSource`: Coordinates reads of a set of tranches until each reaches its Tail [#179](https://github.com/jet/propulsion/pull/179) - `Scheduler`: Split out stats re `rateLimited` and `timedOut` vs `exceptions` [#194](https://github.com/jet/propulsion/pull/194) +- `Scheduler`: Added `index`, `eventType` to error logging [#233](https://github.com/jet/propulsion/pull/233) - `Scheduler`: `purgeInterval` to control memory usage [#97](https://github.com/jet/propulsion/pull/97) - `Scheduler`: `wakeForResults` option to maximize throughput (without having to drop sleep interval to zero) [#161](https://github.com/jet/propulsion/pull/161) - `Sinks`, `Sinks.Config`: top level APIs for wiring common sink structures [#208](https://github.com/jet/propulsion/pull/208) diff --git a/src/Propulsion.CosmosStore/CosmosStoreSink.fs b/src/Propulsion.CosmosStore/CosmosStoreSink.fs index 6d951a51..56db2eb1 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSink.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSink.fs @@ -109,21 +109,21 @@ module Internal = #else try let! res = Writer.write log eventsContext stream span' ct #endif - return struct (Events.index span', span'.Length > 0, Ok struct (met, res)) - with e -> return struct (Events.index span', false, Error struct (met, e)) } - let interpretWriteResultProgress (streams: Scheduling.StreamStates<_>) stream res = + return Ok struct (met, res) + with e -> return Error struct (met, e) } + let interpretProgress (streams: Scheduling.StreamStates<_>) stream res = let applyResultToStreamState = function | Ok struct (_stats, Writer.Result.Ok pos) -> struct (streams.RecordWriteProgress(stream, pos, null), false) | Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null), false - | Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |]), false - | Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [|overage|]), false + | Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, Events.index overage, [| overage |]), false + | Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |]), false | Error struct (_stats, exn) -> let malformed = Writer.classify exn |> Writer.isMalformed streams.SetMalformed(stream, malformed), malformed let struct (ss, malformed) = applyResultToStreamState res Writer.logTo writerResultLog malformed (stream, res) struct (ss.WritePos, res) - Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretWriteResultProgress) + Dispatcher.Concurrent<_, _, _, _>.Create(itemDispatcher, attemptWrite, interpretProgress) type WriterResult = Internal.Writer.Result diff --git a/src/Propulsion.EventStore/EventStoreSink.fs b/src/Propulsion.EventStore/EventStoreSink.fs index bb4f2d81..d0e9292d 100755 --- a/src/Propulsion.EventStore/EventStoreSink.fs +++ b/src/Propulsion.EventStore/EventStoreSink.fs @@ -71,19 +71,19 @@ module Internal = let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096 let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span' ct - return struct (Events.index span', span'.Length > 0, Ok struct (met, res)) - with e -> return Events.index span', false, Error struct (met, e) } - let interpretWriteResultProgress (streams : Scheduling.StreamStates<_>) stream res = + return Ok struct (met, res) + with e -> return Error struct (met, e) } + let interpretProgress (streams : Scheduling.StreamStates<_>) stream res = let applyResultToStreamState = function | Ok struct (_stats, Writer.Result.Ok pos) -> streams.RecordWriteProgress(stream, pos, null) | Ok (_stats, Writer.Result.Duplicate pos) -> streams.RecordWriteProgress(stream, pos, null) - | Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, overage[0].Index, [| overage |]) + | Ok (_stats, Writer.Result.PartialDuplicate overage) -> streams.RecordWriteProgress(stream, Events.index overage, [| overage |]) | Ok (_stats, Writer.Result.PrefixMissing (overage, pos)) -> streams.RecordWriteProgress(stream, pos, [| overage |]) | Error struct (_stats, _exn) -> streams.SetMalformed(stream, false) let ss = applyResultToStreamState res Writer.logTo writerResultLog (stream, res) struct (ss.WritePos, res) - Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, attemptWrite, interpretWriteResultProgress) + Dispatcher.Concurrent<_, _, _, _>.Create(maxDop, attemptWrite, interpretProgress) type WriterResult = Internal.Writer.Result diff --git a/src/Propulsion.Kafka/Consumers.fs b/src/Propulsion.Kafka/Consumers.fs index 4d8ca913..25cf348e 100644 --- a/src/Propulsion.Kafka/Consumers.fs +++ b/src/Propulsion.Kafka/Consumers.fs @@ -344,33 +344,25 @@ type Factory private () = ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) static member StartBatchedAsync<'Info> - ( log : ILogger, config : KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents, - select, handle : Func[], CancellationToken, Task>>>, stats, + ( log: ILogger, config: KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents, + select, handle: Func[], CancellationToken, Task)>>>, stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = - let handle (items : Scheduling.Item[]) ct - : Task>[]> = task { - let sw = Stopwatch.start () - let avgElapsed () = - let tot = float sw.ElapsedMilliseconds - TimeSpan.FromMilliseconds(tot / float items.Length) + let handle (items: Scheduling.Item[]) ct + : Task>[]> = task { + let start = Stopwatch.timestamp () + let inline err ts e (x: Scheduling.Item<_>) = + let met = StreamSpan.metrics Event.renderedSize x.span + Scheduling.InternalRes.create (x, ts, Result.Error struct (met, e)) try let! results = handle.Invoke(items, ct) - let ae = avgElapsed () - return - [| for x in Seq.zip items results -> - match x with - | item, Ok index' -> - let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq - let metrics = StreamSpan.metrics Event.storedSize used - Scheduling.Res.create (ae, item.stream, Events.index item.span, not (Array.isEmpty used), Ok struct (index', struct (metrics, ()))) - | item, Error e -> - let metrics = StreamSpan.metrics Event.renderedSize item.span - Scheduling.Item.createResE (ae, item, metrics, e) |] + return Array.ofSeq (Seq.zip items results |> Seq.map(function + | item, (ts, Ok index') -> + let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq + let metrics = StreamSpan.metrics Event.storedSize used + Scheduling.InternalRes.create (item, ts, Result.Ok struct (metrics, index')) + | item, (ts, Error e) -> err ts e item)) with e -> - let ae = avgElapsed () - return - [| for x in items -> - let metrics = StreamSpan.metrics Event.renderedSize x.span - Scheduling.Item.createResE (ae, x, metrics, e) |] } + let ts = Stopwatch.elapsed start + return items |> Array.map (err ts e) } let dispatcher = Dispatcher.Batched(select, handle) let dumpStreams logStreamStates log = logExternalState |> Option.iter (fun f -> f log) @@ -394,14 +386,14 @@ type Factory private () = /// Processor 'Outcomes are passed to be accumulated into the stats for periodic emission.
/// Processor will run perpetually in a background until `Stop()` is requested. static member StartBatched<'Info> - ( log : ILogger, config : KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents, - select : StreamState seq -> StreamState[], + ( log: ILogger, config: KafkaConsumerConfig, consumeResultToInfo, infoToStreamEvents, + select: StreamState seq -> StreamState[], // Handler responses: // - the result seq is expected to match the ordering of the input Scheduling.Items // - Ok: Index at which next processing will proceed (which can trigger discarding of earlier items on that stream) // - Error: Records the processing of the stream in question as having faulted (the stream's pending events and/or // new ones that arrived while the handler was processing are then eligible for retry purposes in the next dispatch cycle) - handle : StreamState[] -> Async>>, + handle: StreamState[] -> Async)>>, // The responses from each handle invocation are passed to stats for periodic emission stats, ?logExternalState, ?purgeInterval, ?wakeForResults, ?idleDelay) = diff --git a/src/Propulsion/Sinks.fs b/src/Propulsion/Sinks.fs index 6c58ef33..6501447c 100644 --- a/src/Propulsion/Sinks.fs +++ b/src/Propulsion/Sinks.fs @@ -88,14 +88,14 @@ type Factory private () = /// Project Events sequentially via a handle function that yields a StreamResult per selected Item static member StartBatchedAsync<'Outcome> ( log, maxReadAhead, - select : Func, - handle : Func>>>, + select: Func, + handle: Func)>>>, stats, [] ?pendingBufferSize, [] ?purgeInterval, [] ?wakeForResults, [] ?idleDelay, [] ?ingesterStatsInterval, [] ?requireCompleteStreams) = let handle items ct = task { let! res = handle.Invoke(items, ct) - return seq { for i, r in Seq.zip items res -> Result.map (StreamResult.toIndex i.span) r } } + return seq { for i, (ts, r) in Seq.zip items res -> struct (ts, Result.map (StreamResult.toIndex i.span) r) } } Streams.Batched.Start(log, maxReadAhead, select, handle, Event.storedSize, stats, ?pendingBufferSize = pendingBufferSize, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?ingesterStatsInterval = ingesterStatsInterval, ?requireCompleteStreams = requireCompleteStreams) @@ -143,8 +143,8 @@ type Factory private () = /// Per handled stream, the result can be either a StreamResult conveying progress, or an exception static member StartBatched<'Outcome> ( log, maxReadAhead, - select : StreamState seq -> StreamState[], - handle : StreamState[] -> Async>>, + select: StreamState seq -> StreamState[], + handle: StreamState[] -> Async)>>, stats, // Configure max number of batches to buffer within the scheduler; Default: Same as maxReadAhead [] ?pendingBufferSize, [] ?purgeInterval, [] ?wakeForResults, [] ?idleDelay, diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index e5adfdfc..69d50bd7 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -350,9 +350,6 @@ module Scheduling = if malformedStreams.Any then log.Information(" Malformed Streams, MB {@malformedStreams}", malformedStreams.StatsDescending) gapStreams.Any - [] - type InternalResult<'R> = { duration: TimeSpan; stream: FsCodec.StreamName; index: int64; progressed: bool; result: 'R } - type [] BufferState = Idle | Active | Full module Stats = @@ -476,6 +473,9 @@ module Scheduling = if full > 0 then t.Ingest(nameof Full, full) t.StatsDescending + [] + type Res<'R> = { duration: TimeSpan; stream: FsCodec.StreamName; index: int64; event: string; index': int64; result: 'R } + /// Gathers stats pertaining to the core projection/ingestion activity [] type Stats<'R, 'E>(log: ILogger, statsInterval: TimeSpan, stateInterval: TimeSpan, [] ?failThreshold) = @@ -540,10 +540,11 @@ module Scheduling = member _.HandleStarted(stream, stopwatchTicks) = monitor.HandleStarted(stream, stopwatchTicks) - abstract member Handle : InternalResult> -> unit + abstract member Handle : Res> -> unit member private _.RecordOutcomeKind(r, k) = - let inline updateMonitor succeeded = monitor.HandleResult(r.stream, succeeded = succeeded, progressed = r.progressed) + let progressed = r.index = r.index' + let inline updateMonitor succeeded = monitor.HandleResult(r.stream, succeeded = succeeded, progressed = progressed) let outcomeKind = match k with | OutcomeKind.Ok -> updateMonitor true; oks.Record r.duration; "ok" @@ -554,7 +555,7 @@ module Scheduling = if metricsLog.IsEnabled LogEventLevel.Information then let m = Log.Metric.HandlerResult (outcomeKind, r.duration.TotalSeconds) (metricsLog |> Log.withMetric m).Information("Outcome {kind} in {ms:n0}ms, progressed: {progressed}", - outcomeKind, r.duration.TotalMilliseconds, r.progressed) + outcomeKind, r.duration.TotalMilliseconds, progressed) if monitorInterval.IfDueRestart() then monitor.EmitMetrics metricsLog member x.RecordOk(r) = x.RecordOutcomeKind(r, OutcomeKind.Ok) member x.RecordExn(r, k, log, exn) = @@ -645,13 +646,9 @@ module Scheduling = if xs.MoveNext() then Seq.append (prioritizeHead f xs.Current) (collectUniqueStreams xs) else Seq.empty - [] - type Res<'R> = { duration: TimeSpan; stream: FsCodec.StreamName; index: int64; progressed: bool; result: 'R } - module Res = let create (d, s, i, p, r) = { duration = d; stream = s; index = i; progressed = p; result = r } - /// Defines interface between Scheduler (which owns the pending work) and the Dispatcher which periodically selects work to commence based on a policy type IDispatcher<'P, 'R, 'E, 'F> = - [] abstract member Result : IEvent>> + [] abstract member Result : IEvent>> abstract member Pump : CancellationToken -> Task abstract member State : struct (int * int) abstract member HasCapacity : bool with get @@ -660,22 +657,21 @@ module Scheduling = abstract member InterpretProgress : StreamStates<'F> * FsCodec.StreamName * Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>) and [] Item<'Format> = { stream: FsCodec.StreamName; nextIndex: int64 voption; span: FsCodec.ITimelineEvent<'Format>[] } - module Item = - let createResE (d, i: Item<'F>, m, e) = - Res.create (d, i.stream, StreamSpan.idx i.span, false, Error struct (m, e)) - let createResO (d, i: Item<'F>, m, i') = - let index = StreamSpan.idx i.span - Res.create (d, i.stream, index, i' > index, Ok struct (i', struct (m, ()))) + and [] InternalRes<'R> = { stream: FsCodec.StreamName; index: int64; event: string; duration: TimeSpan; result: 'R } + module InternalRes = + let inline create (i: Item<_>, d, r) = + let h = i.span[0] + { stream = i.stream; index = h.Index; event = h.EventType; duration = d; result = r } + /// Consolidates ingested events into streams; coordinates dispatching of these to projector/ingester in the order implied by the submission order /// a) does not itself perform any reading activities /// b) triggers synchronous callbacks as batches complete; writing of progress is managed asynchronously by the TrancheEngine(s) /// c) submits work to the supplied Dispatcher (which it triggers pumping of) /// d) periodically reports state (with hooks for ingestion engines to report same) type Engine<'P, 'R, 'E, 'F> - ( dispatcher : IDispatcher<'P, 'R, 'E, 'F>, stats : Stats<'R,'E>, dumpState, + ( dispatcher: IDispatcher<'P, 'R, 'E, 'F>, stats: Stats<'R,'E>, dumpState, // Number of batches to buffer locally; rest held by submitter to enable fair ordering of submission as partitions are added/removed pendingBufferSize, - // Tune the max number of check/dispatch cycles. Default 2. // Frequency of jettisoning Write Position state of inactive streams (held by the scheduler for deduplication purposes) to limit memory consumption // NOTE: Purging can impair performance, increase write costs or result in duplicate event emissions due to redundant inputs not being deduplicated ?purgeInterval, @@ -726,18 +722,18 @@ module Scheduling = dispatcher.TryReplenish(candidateItems, handleStarted) // Ingest information to be gleaned from processing the results into `streams` (i.e. remove stream requirements as they are completed) - let handleResult ({ duration = duration; stream = stream; index = i; progressed = p; result = r } : Res<_>) = + let handleResult ({ stream = stream; index = i; event = et; duration = duration; result = r }: InternalRes<_>) = match dispatcher.InterpretProgress(streams, stream, r) with - | ValueSome index, Ok (r : 'R) -> - batches.MarkStreamProgress(stream, index) - streams.RecordProgress(stream, index) - stats.Handle { duration = duration; stream = stream; index = i; progressed = p; result = Ok r } - | ValueNone, Ok (r : 'R) -> + | ValueSome index', Ok (r: 'R) -> + batches.MarkStreamProgress(stream, index') + streams.RecordProgress(stream, index') + stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = index'; result = Ok r } + | ValueNone, Ok (r: 'R) -> streams.RecordNoProgress(stream) - stats.Handle { duration = duration; stream = stream; index = i; progressed = p; result = Ok r } + stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = i; result = Ok r } | _, Error exn -> streams.RecordNoProgress(stream) - stats.Handle { duration = duration; stream = stream; index = i; progressed = p; result = Error exn } + stats.Handle { duration = duration; stream = stream; index = i; event = et; index' = i; result = Error exn } let tryHandleResults () = tryApplyResults handleResult // Take an incoming batch of events, correlating it against our known stream state to yield a set of remaining work @@ -864,7 +860,7 @@ module Dispatcher = /// Kicks off enough work to fill the inner Dispatcher up to capacity type internal ItemDispatcher<'R, 'F>(maxDop) = - let inner = DopDispatcher>(maxDop) + let inner = DopDispatcher>(maxDop) // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first let tryFillDispatcher (potential : seq>) markStarted project = @@ -888,27 +884,27 @@ module Dispatcher = /// Implementation of IDispatcher that feeds items to an item dispatcher that maximizes concurrent requests (within a limit) type Concurrent<'P, 'R, 'E, 'F> internal - ( inner : ItemDispatcher, 'F>, - project : struct (int64 * Scheduling.Item<'F>) -> CancellationToken -> Task>>, - interpretProgress : Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>)) = + ( inner: ItemDispatcher, 'F>, + project: struct (int64 * Scheduling.Item<'F>) -> CancellationToken -> Task>>, + interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P,'E> -> struct (int64 voption * Result<'R, 'E>)) = static member Create ( maxDop, - project : FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task)>, - interpretProgress) = - let project struct (startTs, item : Scheduling.Item<'F>) (ct : CancellationToken) = task { - let! struct (index, progressed, res) = project item.stream item.span ct - return Scheduling.Res.create (Stopwatch.elapsed startTs, item.stream, index, progressed, res) } + project: FsCodec.StreamName -> FsCodec.ITimelineEvent<'F>[] -> CancellationToken -> Task>, + interpretProgress: Scheduling.StreamStates<'F> -> FsCodec.StreamName -> Result<'P, 'E> -> struct (int64 voption * Result<'R, 'E>)) = + let project struct (startTs, item: Scheduling.Item<'F>) (ct: CancellationToken) = task { + let! res = project item.stream item.span ct + return Scheduling.InternalRes.create (item, Stopwatch.elapsed startTs, res) } Concurrent<_, _, _, _>(ItemDispatcher(maxDop), project, interpretProgress) - static member Create(maxDop, prepare : Func<_, _, _>, handle : Func<_, _, CancellationToken, Task<_>>, toIndex : Func<_, 'R, int64>) = - let project sn span ct = task { - let struct (met, span : FsCodec.ITimelineEvent<'F>[]) = prepare.Invoke(sn, span) - try let! struct (spanResult, outcome) = handle.Invoke(sn, span, ct) + static member Create(maxDop, prepare: Func<_, _, _>, handle: Func<_, _, CancellationToken, Task<_>>, toIndex: Func<_, 'R, int64>) = + let project stream span ct = task { + let struct (met, span: FsCodec.ITimelineEvent<'F>[]) = prepare.Invoke(stream, span) + try let! struct (spanResult, outcome) = handle.Invoke(stream, span, ct) let index' = toIndex.Invoke(span, spanResult) - return struct (StreamSpan.idx span, index' > StreamSpan.idx span, Ok struct (index', met, outcome)) - with e -> return struct (StreamSpan.idx span, false, Error struct (met, e)) } - let interpretProgress (_streams : Scheduling.StreamStates<'F>) _stream = function - | Ok struct (index, met, outcome) -> struct (ValueSome index, Ok struct (met, outcome)) - | Error struct (stats, exn) -> ValueNone, Error struct (stats, exn) + return Ok struct (index', met, outcome) + with e -> return Error struct (met, e) } + let interpretProgress (_streams: Scheduling.StreamStates<'F>) _stream = function + | Ok struct (index', met, outcome) -> struct (ValueSome index', Ok struct (met, outcome)) + | Error struct (met, exn) -> ValueNone, Error struct (met, exn) Concurrent<_, _, _, 'F>.Create(maxDop, project, interpretProgress) interface Scheduling.IDispatcher<'P, 'R, 'E, 'F> with [] override _.Result = inner.Result @@ -921,11 +917,12 @@ module Dispatcher = /// Implementation of IDispatcher that allows a supplied handler select work and declare completion based on arbitrarily defined criteria type Batched<'F> - ( select : Func seq, Scheduling.Item<'F>[]>, - handle : Scheduling.Item<'F>[] -> CancellationToken -> - Task>[]>) = + ( select: Func seq, Scheduling.Item<'F>[]>, + // NOTE Handler must not throw under any circumstances, or the exception will go unobserved + handle: Scheduling.Item<'F>[] -> CancellationToken -> + Task>[]>) = let inner = DopDispatcher 1 - let result = Event>>() + let result = Event>>() // On each iteration, we offer the ordered work queue to the selector // we propagate the selected streams to the handler @@ -941,7 +938,7 @@ module Dispatcher = hasCapacity <- false struct (dispatched, hasCapacity) - interface Scheduling.IDispatcher with + interface Scheduling.IDispatcher with [] override _.Result = result.Publish override _.Pump ct = task { use _ = inner.Result.Subscribe(Array.iter result.Trigger) @@ -950,10 +947,10 @@ module Dispatcher = override _.HasCapacity = inner.HasCapacity override _.AwaitCapacity(ct) = inner.AwaitButRelease(ct) override _.TryReplenish(pending, handleStarted) = trySelect pending handleStarted - override _.InterpretProgress(_streams : Scheduling.StreamStates<_>, _stream : FsCodec.StreamName, res : Result<_, _>) = + override _.InterpretProgress(_streams: Scheduling.StreamStates<_>, _stream: FsCodec.StreamName, res: Result<_, _>) = match res with - | Ok (pos', (stats, outcome)) -> ValueSome pos', Ok (stats, outcome) - | Error (stats, exn) -> ValueNone, Error (stats, exn) + | Ok (met, pos') -> ValueSome pos', Ok (met, ()) + | Error (met, exn) -> ValueNone, Error (met, exn) [] type Stats<'Outcome>(log: ILogger, statsInterval, statesInterval, [] ?failThreshold) = @@ -986,12 +983,12 @@ type Stats<'Outcome>(log: ILogger, statsInterval, statesInterval, [] resultOk <- resultOk + 1 base.RecordOk res this.HandleOk outcome - | { duration = duration; stream = stream; index = index; result = Error ((es, bs), Exception.Inner exn) } -> + | { duration = duration; stream = stream; index = index; event = et; result = Error ((es, bs), Exception.Inner exn) } -> addBadStream stream failStreams exnEvents <- exnEvents + es exnBytes <- exnBytes + int64 bs resultExnOther <- resultExnOther + 1 - base.RecordExn(res, this.Classify exn, log.ForContext("stream", stream).ForContext("index", index).ForContext("events", es).ForContext("duration", duration), exn) + base.RecordExn(res, this.Classify exn, log.ForContext("stream", stream).ForContext("index", index).ForContext("eventType", et).ForContext("count", es).ForContext("duration", duration), exn) abstract member HandleOk : outcome : 'Outcome -> unit @@ -1086,37 +1083,30 @@ type Batched private () = /// Establishes a Sink pipeline that continually dispatches to a single instance of a handle function /// Prior to the dispatch, the potential streams to include in the batch are identified by the select function static member Start<'Progress, 'Outcome, 'F> - ( log : ILogger, maxReadAhead, - select : Func seq, Scheduling.Item<'F>[]>, handle : Func[], CancellationToken, Task>>>, - eventSize, stats : Scheduling.Stats<_, _>, + ( log: ILogger, maxReadAhead, + select: Func seq, Scheduling.Item<'F>[]>, + handle: Func[], CancellationToken, Task)>>>, + eventSize, stats: Scheduling.Stats<_, _>, ?pendingBufferSize, ?purgeInterval, ?wakeForResults, ?idleDelay, ?ingesterStatsInterval, ?requireCompleteStreams) : Sink seq>> = - let handle (items : Scheduling.Item<'F>[]) ct - : Task>[]> = task { - let sw = Stopwatch.start () - let avgElapsed () = - let tot = float sw.ElapsedMilliseconds - TimeSpan.FromMilliseconds(tot / float items.Length) + let handle (items: Scheduling.Item<'F>[]) ct + : Task>[]> = task { + let start = Stopwatch.timestamp () + let err ts e (x: Scheduling.Item<_>) = + let met = StreamSpan.metrics eventSize x.span + Scheduling.InternalRes.create (x, ts, Error struct (met, e)) try let! results = handle.Invoke(items, ct) - let ae = avgElapsed () - return - [| for x in Seq.zip items results -> - match x with - | item, Ok index' -> - let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq - let metrics = StreamSpan.metrics eventSize used - Scheduling.Item.createResO (ae, item, metrics, index') - | item, Error e -> - let metrics = StreamSpan.metrics eventSize item.span - Scheduling.Item.createResE (ae, item, metrics, e) |] + return Array.ofSeq (Seq.zip items results |> Seq.map (function + | item, (ts, Ok index') -> + let used = item.span |> Seq.takeWhile (fun e -> e.Index <> index') |> Array.ofSeq + let met = StreamSpan.metrics eventSize used + Scheduling.InternalRes.create (item, ts, Ok struct (met, index')) + | item, (ts, Error e) -> err ts e item)) with e -> - let ae = avgElapsed () - return - [| for x in items -> - let metrics = StreamSpan.metrics eventSize x.span - Scheduling.Item.createResE (ae, x, metrics, e) |] } + let ts = Stopwatch.elapsed start + return items |> Array.map (err ts e) } let dispatcher = Dispatcher.Batched(select, handle) let dumpStreams logStreamStates _log = logStreamStates eventSize let scheduler = Scheduling.Engine(dispatcher, stats, dumpStreams, diff --git a/src/Propulsion/Sync.fs b/src/Propulsion/Sync.fs index bfe094da..39a6e371 100644 --- a/src/Propulsion/Sync.fs +++ b/src/Propulsion/Sync.fs @@ -11,7 +11,7 @@ open System.Threading.Tasks [] type Stats<'Outcome>(log : ILogger, statsInterval, stateInterval, [] ?failThreshold) = - inherit Scheduling.Stats(log, statsInterval, stateInterval, ?failThreshold = failThreshold) + inherit Scheduling.Stats(log, statsInterval, stateInterval, ?failThreshold = failThreshold) let okStreams, failStreams = HashSet(), HashSet() let prepareStats = Stats.LatencyStats("prepare") let mutable okEvents, okBytes, exnEvents, exnBytes = 0, 0L, 0, 0L @@ -29,7 +29,7 @@ type Stats<'Outcome>(log : ILogger, statsInterval, stateInterval, [] override this.Handle message = let inline adds x (set : HashSet<_>) = set.Add x |> ignore match message with - | { stream = stream; result = Ok (((es, bs), prepareElapsed), outcome) } -> + | { stream = stream; result = Ok ((es, bs), prepareElapsed, outcome) } -> adds stream okStreams okEvents <- okEvents + es okBytes <- okBytes + int64 bs @@ -57,27 +57,26 @@ type Factory private () = let maxEvents, maxBytes = defaultArg maxEvents 16384, (defaultArg maxBytes (1024 * 1024 - (*fudge*)4096)) - let attemptWrite stream (events : FsCodec.ITimelineEvent<'F>[]) ct = task { + let attemptWrite stream (events: FsCodec.ITimelineEvent<'F>[]) ct = task { let struct (met, span') = StreamSpan.slice<'F> sliceSize (maxEvents, maxBytes) events let prepareTs = Stopwatch.timestamp () try let! res, outcome = handle.Invoke(stream, span', ct) let index' = toIndex.Invoke(span', res) - return struct (StreamSpan.idx events, index' > StreamSpan.idx events, Ok struct (index', struct (met, Stopwatch.elapsed prepareTs), outcome)) - with e -> return struct (StreamSpan.idx events, false, Error struct (met, e)) } + return Ok struct (index', met, Stopwatch.elapsed prepareTs, outcome) + with e -> return Error struct (met, e) } - let interpretWriteResultProgress _streams (stream : FsCodec.StreamName) = function - | Ok struct (i', stats, outcome) -> - struct (ValueSome i', Ok struct (stats, outcome)) - | Error struct (struct (eventCount, bytesCount) as stats, exn : exn) -> + let interpretProgress _streams (stream: FsCodec.StreamName) = function + | Ok struct (i', met, prep, outcome) -> struct (ValueSome i', Ok struct (met, prep, outcome)) + | Error struct (struct (eventCount, bytesCount) as met, exn: exn) -> log.Warning(exn, "Handling {events:n0}e {bytes:n0}b for {stream} failed, retrying", eventCount, bytesCount, stream) - ValueNone, Error struct (stats, exn) + ValueNone, Error struct (met, exn) - let dispatcher : Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, attemptWrite, interpretWriteResultProgress) + let dispatcher: Scheduling.IDispatcher<_, _, _, _> = Dispatcher.Concurrent<_, _, _, _>.Create(maxConcurrentStreams, attemptWrite, interpretProgress) let dumpStreams logStreamStates log = logStreamStates eventSize match dumpExternalStats with Some f -> f log | None -> () let scheduler = - Scheduling.Engine + Scheduling.Engine<_, struct (StreamSpan.Metrics * TimeSpan * 'Outcome), struct (StreamSpan.Metrics * exn), 'F> (dispatcher, stats, dumpStreams, pendingBufferSize = maxReadAhead, ?idleDelay = idleDelay, ?purgeInterval = purgeInterval) Projector.Pipeline.Start(log, scheduler.Pump, maxReadAhead, scheduler, stats.StatsInterval.Period) diff --git a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs index e83b90a5..f24346d2 100644 --- a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs +++ b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs @@ -119,14 +119,16 @@ module Helpers = // When offered, take whatever is pending let select = Array.ofSeq // when processing, declare all items processed each time we're invoked - let handle (streams : Propulsion.Sinks.StreamState[]) = async { + let handle (streams: Propulsion.Sinks.StreamState[]) = async { + let ts = Stopwatch.timestamp () let mutable c = 0 for stream in streams do for event in stream.span do c <- c + 1 do! handler (getConsumer()) (deserialize consumerId event) (log : ILogger).Information("BATCHED CONSUMER Handled {c} events in {l} streams", c, streams.Length ) - return seq { for x in streams -> Ok (Propulsion.Streams.StreamSpan.ver x.span) } } + let ts = Stopwatch.elapsed ts + return seq { for x in streams -> struct (ts, Ok (Propulsion.Sinks.Events.nextIndex x.span)) } } let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.) let messageIndexes = StreamNameSequenceGenerator() let consumer =