Skip to content

Commit

Permalink
Target Equinox 4rc12.8
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jul 26, 2023
1 parent 90974a4 commit 323e4c2
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 67 deletions.
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.12" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.12.8" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10" />
</ItemGroup>

Expand Down
60 changes: 31 additions & 29 deletions src/Propulsion.CosmosStore/ReaderCheckpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -96,31 +96,40 @@ let decideStart establishOrigin at freq state = async {
| Fold.NotStarted ->
let! origin = establishOrigin
let config, checkpoint = mk at freq origin
return checkpoint.pos, [Events.Started { config = config; origin = checkpoint}]
return checkpoint.pos, [| Events.Started { config = config; origin = checkpoint } |]
| Fold.Running s ->
return s.state.pos, [] }
return s.state.pos, [||] }

let decideOverride at (freq : TimeSpan) pos = function
| Fold.Running s when s.state.pos = pos && s.config.checkpointFreqS = int freq.TotalSeconds -> []
| Fold.Running s when s.state.pos = pos && s.config.checkpointFreqS = int freq.TotalSeconds -> [||]
| _ ->
let config, checkpoint = mk at freq pos
[Events.Overrode { config = config; pos = checkpoint}]
[| Events.Overrode { config = config; pos = checkpoint } |]

let decideUpdate at pos = function
| Fold.NotStarted -> failwith "Cannot Commit a checkpoint for a series that has not been Started"
| Fold.Running state ->
| Fold.Running state -> [|
if at < state.state.nextCheckpointDue then
if pos = state.state.pos then [] // No checkpoint due, pos unchanged => No write
else // No checkpoint due, pos changed => Write, but maintain same nextCheckpointDue
[Events.Updated { config = state.config; pos = mkCheckpoint at state.state.nextCheckpointDue pos }]
if pos <> state.state.pos then // No checkpoint due, pos changed => Write, but maintain same nextCheckpointDue
Events.Updated { config = state.config; pos = mkCheckpoint at state.state.nextCheckpointDue pos }
else // Checkpoint due => Force a write every N seconds regardless of whether the position has actually changed
let freq = TimeSpan.FromSeconds(float state.config.checkpointFreqS)
let config, checkpoint = mk at freq pos
[Events.Checkpointed { config = config; pos = checkpoint }]
Events.Checkpointed { config = config; pos = checkpoint } |]

type Decider<'e, 's> = Equinox.Decider<'e, 's>
#if COSMOSV3
module Equinox =
let AnyCachedValue = ()
type Equinox.Decider<'e, 's> with
member x.TransactAsync(decide, load : unit): Async<'r> =
x.TransactAsync(fun s -> async { let! r, es = decide s in return r, Array.toList es })
member x.Transact(decide, load : unit): Async<'r> =
x.Transact(decide >> function r, es -> r, Array.toList es)
member x.Transact(decide, ?load : unit): Async<unit> =
x.Transact(decide >> Array.toList)
#endif

type Service internal (resolve : SourceId * TrancheId * string -> Decider<Events.Event, Fold.State>, consumerGroupName, defaultCheckpointFrequency) =
type Service internal (resolve: SourceId * TrancheId * string -> Equinox.Decider<Events.Event, Fold.State>, consumerGroupName, defaultCheckpointFrequency) =

interface IFeedCheckpointStore with

Expand All @@ -129,21 +138,14 @@ type Service internal (resolve : SourceId * TrancheId * string -> Decider<Events
member _.Start(source, tranche, establishOrigin, ct) : Task<Position> =
let decider = resolve (source, tranche, consumerGroupName)
let establishOrigin = match establishOrigin with None -> async { return Position.initial } | Some f -> Async.call f.Invoke
#if COSMOSV3
decider.TransactAsync(decideStart establishOrigin DateTimeOffset.UtcNow defaultCheckpointFrequency)
#else
decider.TransactAsync(decideStart establishOrigin DateTimeOffset.UtcNow defaultCheckpointFrequency, load = Equinox.AnyCachedValue)
#endif
|> Async.executeAsTask ct

/// Ingest a position update
/// NB fails if not already initialized; caller should ensure correct initialization has taken place via Read -> Start
member _.Commit(source, tranche, pos : Position, ct) =
let decider = resolve (source, tranche, consumerGroupName)
#if COSMOSV3
decider.Transact(decideUpdate DateTimeOffset.UtcNow pos)
#else
decider.Transact(decideUpdate DateTimeOffset.UtcNow pos, load = Equinox.AnyCachedValue)
#endif
|> Async.executeAsTask ct :> _

/// Override a checkpointing series with the supplied parameters
Expand All @@ -157,9 +159,9 @@ module MemoryStore =
open Equinox.MemoryStore

let create log (consumerGroupName, defaultCheckpointFrequency) context =
let cat = MemoryStoreCategory(context, Events.codec, Fold.fold, Fold.initial)
let resolve = Equinox.Decider.resolve log cat
Service(streamId >> resolve Category, consumerGroupName, defaultCheckpointFrequency)
let cat = MemoryStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial)
let resolve = Equinox.Decider.forStream log cat
Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
#if DYNAMOSTORE
module DynamoStore =
Expand All @@ -169,9 +171,9 @@ module DynamoStore =
let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, Fold.transmute)
let create log (consumerGroupName, defaultCheckpointFrequency) (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let cat = DynamoStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = Equinox.Decider.resolve log cat
Service(streamId >> resolve Category, consumerGroupName, defaultCheckpointFrequency)
let cat = DynamoStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)
let resolve = Equinox.Decider.forStream log cat
Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
#if !COSMOSV3
module CosmosStore =
Expand All @@ -181,12 +183,12 @@ module CosmosStore =
let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, Fold.transmute)
let create log (consumerGroupName, defaultCheckpointFrequency) (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let cat = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = Equinox.Decider.resolve log cat
Service(streamId >> resolve Category, consumerGroupName, defaultCheckpointFrequency)
let cat = CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)
let resolve = Equinox.Decider.forStream log cat
Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
let private create log defaultCheckpointFrequency resolveStream =
let resolve id = Decider(log, resolveStream Equinox.AllowStale (streamName id), maxAttempts = 3)
let resolve id = Equinox.Decider(log, resolveStream Equinox.AllowStale (streamName id), maxAttempts = 3)
Service(resolve, null, defaultCheckpointFrequency)

#if COSMOSV3
Expand Down
18 changes: 9 additions & 9 deletions src/Propulsion.DynamoStore/AppendsEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,16 @@ module Ingest =
| ({ closed = false; versions = cur } as state : Fold.State) ->
let closed, ingested, events =
match tryToIngested state inputs with
| None -> false, Array.empty, []
| None -> false, Array.empty, [||]
| Some diff ->
let closing = shouldClose (diff.app.Length + diff.add.Length + cur.Count)
let ingestEvent = Events.Ingested diff
let ingested = (seq { for x in diff.add -> x.p }, seq { for x in diff.app -> x.p }) ||> Seq.append |> Array.ofSeq
closing, ingested, [ ingestEvent ; if closing then Events.Closed ]
closing, ingested, [| ingestEvent ; if closing then Events.Closed |]
let res : ExactlyOnceIngester.IngestResult<_, _> = { accepted = ingested; closed = closed; residual = [||] }
res, events
| { closed = true } as state ->
{ accepted = [||]; closed = true; residual = removeDuplicates state inputs }, []
{ accepted = [||]; closed = true; residual = removeDuplicates state inputs }, [||]

type Service internal (shouldClose, resolve : AppendsPartitionId * AppendsEpochId -> Equinox.Decider<Events.Event, Fold.State>) =

Expand All @@ -125,15 +125,15 @@ type Service internal (shouldClose, resolve : AppendsPartitionId * AppendsEpochI

module Config =

let private createCategory (context, cache) = Config.createUnoptimized Events.codec Fold.initial Fold.fold (context, Some cache)
let private createCategory (context, cache) = Config.createUnoptimized Category Events.codec Fold.initial Fold.fold (context, Some cache)
let create log (maxBytes : int, maxVersion : int64, maxStreams : int) store =
let resolve = createCategory store |> Equinox.Decider.resolve log
let resolve = createCategory store |> Equinox.Decider.forStream log
let shouldClose (totalBytes : int64 voption, version) totalStreams =
let closing = totalBytes.Value > maxBytes || version >= maxVersion || totalStreams >= maxStreams
if closing then log.Information("Epoch Closing v{version}/{maxVersion} {streams}/{maxStreams} streams {kib:f0}/{maxKib:f0} KiB",
version, maxVersion, totalStreams, maxStreams, float totalBytes.Value / 1024., float maxBytes / 1024.)
closing
Service(shouldClose, streamId >> resolve Category)
Service(shouldClose, streamId >> resolve)

/// Manages the loading of Ingested Span Batches in a given Epoch from a given position forward
/// In the case where we are polling the tail, this should mean we typically do a single round-trip for a point read of the Tip
Expand Down Expand Up @@ -166,8 +166,8 @@ module Reader =

module Config =

let private createCategory context minIndex = Config.createWithOriginIndex codec initial fold context minIndex
let private createCategory context minIndex = Config.createWithOriginIndex Category codec initial fold context minIndex
let create log context =
let resolve minIndex = Equinox.Decider.resolve log (createCategory context minIndex)
Service(fun (pid, eid, minIndex) -> streamId (pid, eid) |> resolve minIndex Category)
let resolve minIndex = Equinox.Decider.forStream log (createCategory context minIndex)
Service(fun (pid, eid, minIndex) -> streamId (pid, eid) |> resolve minIndex)
#endif
16 changes: 8 additions & 8 deletions src/Propulsion.DynamoStore/AppendsIndex.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ let readEpochId partitionId (state : Fold.State) =
state
|> Map.tryFind partitionId

let interpret (partitionId, epochId) (state : Fold.State) =
[if state |> readEpochId partitionId |> Option.forall (fun cur -> cur < epochId) && epochId >= AppendsEpochId.initial then
yield Events.Started { partition = partitionId; epoch = epochId }]
let interpret (partitionId, epochId) (state : Fold.State) = [|
if state |> readEpochId partitionId |> Option.forall (fun cur -> cur < epochId) && epochId >= AppendsEpochId.initial then
Events.Started { partition = partitionId; epoch = epochId } |]

type Service internal (resolve : unit -> Equinox.Decider<Events.Event, Fold.State>) =
type Service internal (resolve: unit -> Equinox.Decider<Events.Event, Fold.State>) =

/// Determines the current active epoch for the specified Partition
member _.ReadIngestionEpochId(partitionId) : Async<AppendsEpochId> =
Expand All @@ -62,9 +62,9 @@ type Service internal (resolve : unit -> Equinox.Decider<Events.Event, Fold.Stat

module Config =

let private createCategory store = Config.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) store
let resolve log store = createCategory store |> Equinox.Decider.resolve log
let create log (context, cache) = Service(streamId >> resolve log (context, Some cache) Category)
let private createCategory store = Config.createSnapshotted Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) store
let resolve log store = createCategory store |> Equinox.Decider.forStream log
let create log (context, cache) = Service(streamId >> resolve log (context, Some cache))

/// On the Reading Side, there's no advantage to caching (as we have snapshots, and it's Dynamo)
module Reader =
Expand All @@ -89,5 +89,5 @@ module Reader =
let decider = resolve ()
decider.Query(readIngestionEpochId partitionId)

let create log context = Service(streamId >> Config.resolve log (context, None) Category)
let create log context = Service(streamId >> Config.resolve log (context, None))
#endif
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.12" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.12.8" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10" />
</ItemGroup>

Expand Down
16 changes: 8 additions & 8 deletions src/Propulsion.DynamoStore/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,27 @@ module internal Config =

let private defaultCacheDuration = System.TimeSpan.FromMinutes 20.

let private create codec initial fold accessStrategy (context, cache) =
let private create name codec initial fold accessStrategy (context, cache) =
let cs = match cache with None -> CachingStrategy.NoCaching | Some cache -> CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
DynamoStoreCategory(context, codec, fold, initial, cs, accessStrategy)
DynamoStoreCategory(context, name, codec, fold, initial, accessStrategy, cs)

let createSnapshotted codec initial fold (isOrigin, toSnapshot) (context, cache) =
let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) =
let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot)
create codec initial fold accessStrategy (context, cache)
create name codec initial fold accessStrategy (context, cache)

let createUnoptimized codec initial fold (context, cache) =
let createUnoptimized name codec initial fold (context, cache) =
let accessStrategy = AccessStrategy.Unoptimized
create codec initial fold accessStrategy (context, cache)
create name codec initial fold accessStrategy (context, cache)

let createWithOriginIndex codec initial fold context minIndex =
let createWithOriginIndex name codec initial fold context minIndex =
// TOCONSIDER include way to limit item count being read
// TOCONSIDER implement a loader hint to pass minIndex to the query as an additional filter
let isOrigin struct (i, _) = i <= minIndex
// There _should_ always be an event at minIndex - if there isn't for any reason, the load might go back one event too far
// Here we trim it for correctness (although Propulsion would technically ignore it)
let trimPotentialOverstep = Seq.filter (fun struct (i, _e) -> i >= minIndex)
let accessStrategy = AccessStrategy.MultiSnapshot (isOrigin, fun _ -> failwith "writing not applicable")
create codec initial (fun s -> trimPotentialOverstep >> fold s) accessStrategy (context, None)
create name codec initial (fun s -> trimPotentialOverstep >> fold s) accessStrategy (context, None)

module internal EventCodec =

Expand Down
15 changes: 7 additions & 8 deletions src/Propulsion.EventStore/Checkpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Command =
| Override of at : DateTimeOffset * checkpointFreq : TimeSpan * pos : int64
| Update of at : DateTimeOffset * pos : int64

let interpret command (state: Fold.State): seq<Events.Event> =
let interpret command (state: Fold.State): Events.Event[] = [|
let mkCheckpoint at next pos = { at = at; nextCheckpointDue = next; pos = pos } : Events.Checkpoint
let mk (at : DateTimeOffset) (interval : TimeSpan) pos : Events.Config * Events.Checkpoint =
let next = at.Add interval
Expand All @@ -78,20 +78,19 @@ let interpret command (state: Fold.State): seq<Events.Event> =
match command, state with
| Start (at, freq, pos), Fold.NotStarted ->
let config, checkpoint = mk at freq pos
[Events.Started { config = config; origin = checkpoint}]
Events.Started { config = config; origin = checkpoint }
| Override (at, freq, pos), Fold.Running _ ->
let config, checkpoint = mk at freq pos
[Events.Overrode { config = config; pos = checkpoint}]
Events.Overrode { config = config; pos = checkpoint}
| Update (at, pos), Fold.Running state ->
if at < state.state.nextCheckpointDue then
if pos = state.state.pos then [] // No checkpoint due, pos unchanged => No write
else // No checkpoint due, pos changed => Write, but maintain same nextCheckpointDue
[Events.Updated { config = state.config; pos = mkCheckpoint at state.state.nextCheckpointDue pos }]
if pos <> state.state.pos then // No checkpoint due, pos changed => Write, but maintain same nextCheckpointDue
Events.Updated { config = state.config; pos = mkCheckpoint at state.state.nextCheckpointDue pos }
else // Checkpoint due => Force a write every N seconds regardless of whether the position has actually changed
let freq = TimeSpan.FromSeconds(float state.config.checkpointFreqS)
let config, checkpoint = mk at freq pos
[Events.Checkpointed { config = config; pos = checkpoint }]
| c, s -> failwithf "Command %A invalid when %A" c s
Events.Checkpointed { config = config; pos = checkpoint }
| c, s -> failwith $"Command %A{c} invalid when %A{s}" |]

type Service internal (resolve : CheckpointSeriesId -> Equinox.DeciderCore<Events.Event, Fold.State>) =

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/Propulsion.EventStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStore" Version="4.0.0-rc.12" />
<PackageReference Include="Equinox.EventStore" Version="4.0.0-rc.12.8" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.10" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.12" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.12.8" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.10" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.12" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.12.8" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit 323e4c2

Please sign in to comment.