Skip to content

Commit

Permalink
Complete basic shape
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 30, 2024
1 parent d310aec commit 27084e3
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 22 deletions.
6 changes: 3 additions & 3 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ module Internal =
| AppendResult.Ok pos -> Result.Ok pos.index
| AppendResult.Conflict (pos, _) | AppendResult.ConflictUnknown pos ->
match pos.index with
| actual when actual < i -> Result.PrefixMissing (actual - i |> int, actual)
| actual when actual < i -> Result.PrefixMissing (i - actual |> int, actual)
| actual when actual >= n -> Result.Duplicate actual
| actual -> Result.PartialDuplicate actual
log.Debug("Result: {res}", res')
Expand Down Expand Up @@ -124,11 +124,11 @@ module Internal =

type WriterResult = Internal.Writer.Result

type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>] ?failThreshold, [<O; D null>] ?logExternalStats) =
type CosmosStoreSinkStats(log: ILogger, statsInterval, stateInterval, [<O; D null>]?storeLog, [<O; D null>] ?failThreshold, [<O; D null>] ?logExternalStats) =
inherit Scheduling.Stats<Dispatcher.ResProgressAndMetrics<WriterResult>, Dispatcher.ExnAndMetrics>(
log, statsInterval, stateInterval, ?failThreshold = failThreshold,
logExternalStats = defaultArg logExternalStats Equinox.CosmosStore.Core.Log.InternalMetrics.dump)
let writerResultLog = log.ForContext<WriterResult>()
let writerResultLog = (defaultArg storeLog log).ForContext<WriterResult>()
let mutable okStreams, okEvents, okUnfolds, okBytes = HashSet(), 0, 0, 0L
let mutable exnCats, exnStreams, exnEvents, exnUnfolds, exnBytes = Stats.Counters(), HashSet(), 0, 0, 0L
let mutable resultOk, resultDup, resultPartialDup, resultPrefix, resultExn = 0, 0, 0, 0, 0
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.1.0-alpha.13" />
<PackageReference Include="Equinox.CosmosStore" Version="4.1.0-alpha.14" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ module Internal =
Result.Ok (pos'.streamVersion + 1L)
| GatewaySyncResult.ConflictUnknown (Token.Unpack pos) ->
match pos.streamVersion + 1L with
| actual when actual < i -> Result.PrefixMissing (actual - i |> int, actual)
| actual when actual < i -> Result.PrefixMissing (i - actual |> int, actual)
| actual when actual >= i + span.LongLength -> Result.Duplicate actual
| actual -> Result.PartialDuplicate actual
log.Debug("Result: {res}", res')
Expand Down
32 changes: 16 additions & 16 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ module Buffer =
let increment (x: Revision): Revision = % (% x + 1)
type HandlerProgress = (struct (int64 * Revision))
module HandlerProgress =
let ofPos pos: HandlerProgress = (pos, FSharp.UMX.UMX.tag -1)
let ofPos pos: HandlerProgress = (pos, Revision.initial)
let ofMetricsAndPos revision ((_es, us, _bs): StreamSpan.Metrics) pos: HandlerProgress = if us <> 0 then (pos, revision) else ofPos pos
type ProgressRequirement = (struct (int64 * Revision voption))
module ProgressRequirement =
Expand All @@ -177,11 +177,8 @@ module Buffer =
[<NoComparison; NoEquality; Struct>]
type StreamState<'Format> = private { write: int64; revision: Revision; queue: FsCodec.ITimelineEvent<'Format>[][] } with
static member Create(write, queue, revision) = { write = defaultValueArg write WritePosUnknown; revision = revision; queue = queue }
static member Create(write, queue, revision, malformed) =
if malformed then { write = WritePosMalformed; revision = revision; queue = queue }
else StreamState<'Format>.Create(write, queue, revision)
static member Create(write, queue, revision, malformed) = StreamState<'Format>.Create((if malformed then ValueSome WritePosMalformed else write), queue, revision)
member x.IsEmpty = LanguagePrimitives.PhysicalEquality null x.queue
member internal x.CanPurge = x.IsEmpty
member x.EventsSumBy(f) = if x.IsEmpty then 0L else x.queue |> Seq.map (Seq.sumBy f) |> Seq.sum |> int64
member x.EventsCount = if x.IsEmpty then 0 else x.queue |> Seq.sumBy Array.length

Expand All @@ -208,8 +205,7 @@ module Buffer =
// if there's an event after the required position, then the Unfolds will have been dropped
if tailEvent.Index > index then ProgressRequirement.ofPos (index + 1L) |> ValueSome
// if there's an unfold, then we'll know we're covered when either that, a newer equivalent, or a successor event has been ingested
elif hadUnfold && tailEvent.IsUnfold && tailEvent.Index = index then
ProgressRequirement.ofPosUnfoldRevision index x.revision |> ValueSome
elif hadUnfold && tailEvent.IsUnfold && tailEvent.Index = index then ProgressRequirement.ofPosUnfoldRevision index x.revision |> ValueSome
// Otherwise (if the tail does not have unfolds and/or the batch didnt have an unfold), we can count on it being covered when the event has been ingested
else ProgressRequirement.ofPos index |> ValueSome

Expand All @@ -229,13 +225,14 @@ module Buffer =
let changed =
match maybeLastUnfold queue, maybeLastUnfold s1.queue with
| ValueNone, ValueNone -> false
| ValueNone, ValueSome _ -> true
| ValueNone, ValueSome _
| ValueSome _, ValueNone -> true
| ValueSome l1, ValueSome l2 -> LanguagePrimitives.PhysicalEquality l1 l2
| ValueSome l1, ValueSome l2 -> LanguagePrimitives.PhysicalEquality l1 l2 |> not
let revision = if changed then Revision.increment s1.revision else s1.revision
StreamState<'Format>.Create(writePos, queue, revision, malformed)
let tryTrimUnfoldsIffPosAndRevisionStill ((pos, revision): HandlerProgress) ({ write = xw; revision = xr; queue = xq } as x) =
if xw <> pos || xr <> revision then ValueNone
elif xq = null then ValueSome { x with revision = Revision.increment xr }
else ValueSome { x with revision = Revision.increment xr; queue = xq |> Array.map (Array.filter (fun x -> not x.IsUnfold)) }

type Streams<'Format>() =
Expand Down Expand Up @@ -332,17 +329,17 @@ module Scheduling =
let updateStreamState stream = function
| Error malformed ->
// Flag that the data at the head of the stream is triggering a non-transient error condition from the handler, preventing any further handler dispatches for `stream`
merge stream (StreamState<'Format>.Create(write = ValueNone, queue = null, revision = Revision.initial, malformed = malformed)) |> ignore
merge stream (StreamState<'Format>.Create(ValueNone, null, Revision.initial, malformed = malformed)) |> ignore
| Ok (updatedPos, _dispatchedRevision as up: HandlerProgress) ->
// Ensure we have a position (in case it got purged); Drop any events or unfolds implied by updatedPos
merge stream (StreamState<'Format>.Create(ValueSome updatedPos, queue = null, revision = Revision.initial, malformed = false))
merge stream (StreamState<'Format>.Create(ValueSome updatedPos, null, Revision.initial))
// Strip unfolds out of the queue if the handler reported the position as unchanged, but the unfolds were included in the invocation
|> StreamState.tryTrimUnfoldsIffPosAndRevisionStill up |> ValueOption.iter (fun trimmed -> states[ stream ] <- trimmed)
let purge () =
let mutable purged = 0
for x in states do
let streamState = x.Value
if streamState.CanPurge then
if streamState.IsEmpty then
states.Remove x.Key |> ignore // Safe to do while iterating on netcore >= 3.0
purged <- purged + 1
states.Count, purged
Expand Down Expand Up @@ -620,9 +617,9 @@ module Scheduling =

member x.DumpStats(struct (dispatchActive, dispatchMax), struct (batchesWaiting, batchesRunning), abend) =
let batchesCompleted = System.Threading.Interlocked.Exchange(&batchesCompleted, 0)
log.Information("Batches waiting {waiting} started {started} {streams:n0}s {events:n0}e {unfolds:n0}u skipped {streamsSkipped:n0}s {eventsSkipped:n0}e {unfoldsSkipped:n0}e completed {completed} Running {active}",
log.Information("Batches waiting {waiting} started {started} {streams:n0}s {events:n0}e {unfolds:n0}u skipped {streamsSkipped:n0}s {eventsSkipped:n0}e {unfoldsSkipped:n0}u completed {completed} Running {active}",
batchesWaiting, batchesStarted, streamsStarted, eventsStarted, unfoldsStarted, streamsWrittenAhead, eventsWrittenAhead, unfoldsWrittenAhead, batchesCompleted, batchesRunning)
batchesStarted <- 0; streamsStarted <- 0; eventsStarted <- 0; streamsWrittenAhead <- 0; eventsWrittenAhead <- 0; (*batchesCompleted <- 0*)
batchesStarted <- 0; streamsStarted <- 0; eventsStarted <- 0; unfoldsStarted <- 0; streamsWrittenAhead <- 0; eventsWrittenAhead <- 0; unfoldsWrittenAhead <- 0; (*batchesCompleted <- 0*)
x.Timers.Dump log
log.Information("Scheduler {cycles} cycles {@states} Running {busy}/{processors}",
cycles, stateStats.StatsDescending, dispatchActive, dispatchMax)
Expand Down Expand Up @@ -728,7 +725,8 @@ module Scheduling =
member _.RemoveAttainedRequirements(stream, updatedPosAndDispatchedRevision) =
for x in pending do
match x.reqs.TryGetValue stream with
| true, req when ProgressRequirement.isSatisfiedBy updatedPosAndDispatchedRevision req -> x.reqs.Remove stream |> ignore
| true, req when ProgressRequirement.isSatisfiedBy updatedPosAndDispatchedRevision req ->
x.reqs.Remove stream |> ignore
| _ -> ()

member _.Dump(log: ILogger, lel, classify: FsCodec.StreamName -> Stats.Busy.State) =
Expand Down Expand Up @@ -852,6 +850,8 @@ module Scheduling =
// Enumerates the active batches; when the caller pulls beyond that, more get ingested on the fly
let enumBatches ingestStreams ingestBatches = seq {
yield! batches.EnumPending()
// TODO integrate this to guarantee ordering
ingestStreams ()
// We'll get here as soon as the dispatching process has exhausted the currently queued items
match ingestBatches () with
| [||] -> () // Nothing more available
Expand Down Expand Up @@ -897,7 +897,7 @@ module Scheduling =
| ValueSome req ->
if isUnfold then unfolds <- unfolds + 1 else events <- events + 1
reqs[item.Key] <- req
stats.RecordIngested(reqs.Count, events, batch.StreamsCount - reqs.Count, eventsSkipped, unfolds, unfoldsSkipped)
stats.RecordIngested(reqs.Count, batch.StreamsCount - reqs.Count, events, eventsSkipped, unfolds, unfoldsSkipped)
let onCompletion () =
batch.OnCompletion ()
stats.RecordBatchCompletion()
Expand Down
2 changes: 1 addition & 1 deletion tools/Propulsion.Tool/Sync.fs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
requireAll = requireAll)
| SubCommand.Sync sa ->
let eventsContext = sa.ConnectEvents() |> Async.RunSynchronously
let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval,
let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval, storeLog = Metrics.log,
logExternalStats = dumpStoreStats, Categorize = a.Categorize)
Propulsion.CosmosStore.CosmosStoreSink.Start(Metrics.log, maxReadAhead, eventsContext, maxConcurrentProcessors, stats,
maxBytes = sa.MaxBytes, requireAll = requireAll,
Expand Down

0 comments on commit 27084e3

Please sign in to comment.