Skip to content

Commit

Permalink
Handle FsCodec.StreamId changes
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Aug 6, 2023
1 parent 37a1879 commit 068f176
Show file tree
Hide file tree
Showing 18 changed files with 128 additions and 134 deletions.
8 changes: 6 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module Pruner =
// 3. Some deletions deferred
// (requested trim point was in the middle of a batch; touching it would put the batch out of order)
// in this case, we mark the event as handled and await a successor event triggering another attempt
let! deleted, deferred, trimmedPos = pruneUntil (FsCodec.StreamName.toString stream, untilIndex, ct)
let! deleted, deferred, trimmedPos = pruneUntil (stream, untilIndex, ct)
// Categorize the outcome so the stats handler can summarize the work being carried out
let res = if deleted = 0 && deferred = 0 then Nop span.Length else Ok (deleted, deferred)
// For case where we discover events have already been deleted beyond our requested position, signal to reader to drop events
Expand Down Expand Up @@ -71,7 +71,11 @@ type CosmosStorePruner =
?ingesterStatsInterval)
: Sink =
let dispatcher =
let inline pruneUntil (stream, index, ct) = Equinox.CosmosStore.Core.Events.pruneUntil context stream index |> Async.executeAsTask ct
#if COSMOSV3
let inline pruneUntil (sn, index, ct) = Equinox.CosmosStore.Core.Events.pruneUntil context (FsCodec.StreamName.toString sn) index |> Async.executeAsTask ct
#else
let inline pruneUntil (sn, index, ct) = Equinox.CosmosStore.Core.Events.pruneUntil context sn index |> Async.executeAsTask ct
#endif
let interpret _stream span =
let metrics = StreamSpan.metrics Event.storedSize span
struct (metrics, span)
Expand Down
4 changes: 4 additions & 0 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ module Internal =
let writerResultLog = log.ForContext<Writer.Result>()
let attemptWrite stream span ct = task {
let struct (met, span') = StreamSpan.slice Event.renderedSize (maxEvents, maxBytes) span
#if COSMOSV3
try let! res = Writer.write log eventsContext (StreamName.toString stream) span' ct
#else
try let! res = Writer.write log eventsContext stream span' ct
#endif
return struct (span'.Length > 0, Ok struct (met, res))
with e -> return struct (false, Error struct (met, e)) }
let interpretWriteResultProgress (streams: Scheduling.StreamStates<_>) stream res =
Expand Down
22 changes: 12 additions & 10 deletions src/Propulsion.CosmosStore/ReaderCheckpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ let streamName (source, tranche, consumerGroupName : string) =
if consumerGroupName = null then
let Category = "ReaderCheckpoint"
// This form is only used for interop with the V3 Propulsion.Feed.FeedSource - anyone starting with V4 should only ever encounter tripartite names
FsCodec.StreamName.compose Category [SourceId.toString source; TrancheId.toString tranche]
FsCodec.StreamName.compose Category [| SourceId.toString source; TrancheId.toString tranche |]
else
let (*[<Literal>]*) Category = "$ReaderCheckpoint"
FsCodec.StreamName.compose Category [SourceId.toString source; TrancheId.toString tranche; consumerGroupName]
FsCodec.StreamName.compose Category [| SourceId.toString source; TrancheId.toString tranche; consumerGroupName |]
#else
let [<Literal>] Category = "$ReaderCheckpoint"
let streamId = Equinox.StreamId.gen3 SourceId.toString TrancheId.toString (*consumerGroupName*)id
module Stream =
let [<Literal>] Category = "$ReaderCheckpoint"
let id = FsCodec.StreamId.gen3 SourceId.toString TrancheId.toString (*consumerGroupName*)id

Check failure on line 19 in src/Propulsion.CosmosStore/ReaderCheckpoint.fs

View workflow job for this annotation

GitHub Actions / Build and test

The value, constructor, namespace or type 'StreamId' is not defined. Maybe you want one of the following:� StreamName� streamName� StreamName� streamName� StreamName
let name = id >> FsCodec.StreamName.create Category
#endif

// NB - these schemas reflect the actual storage formats and hence need to be versioned with care
Expand Down Expand Up @@ -159,9 +161,9 @@ module MemoryStore =
open Equinox.MemoryStore

let create log (consumerGroupName, defaultCheckpointFrequency) context =
let cat = MemoryStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial)
let cat = MemoryStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial)
let resolve = Equinox.Decider.forStream log cat
Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency)
Service(Stream.id >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
let private defaultCacheDuration = TimeSpan.FromMinutes 20.
#if COSMOSV3
Expand All @@ -176,9 +178,9 @@ module DynamoStore =

let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, Fold.transmute)
let create log (consumerGroupName, defaultCheckpointFrequency) (context, cache) =
let cat = DynamoStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache)
let cat = DynamoStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache)
let resolve = Equinox.Decider.forStream log cat
Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency)
Service(Stream.id >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
#if !COSMOSV3
module CosmosStore =
Expand All @@ -187,9 +189,9 @@ module CosmosStore =

let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, Fold.transmute)
let create log (consumerGroupName, defaultCheckpointFrequency) (context, cache) =
let cat = CosmosStoreCategory(context, Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache)
let cat = CosmosStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache)
let resolve = Equinox.Decider.forStream log cat
Service(streamId >> resolve, consumerGroupName, defaultCheckpointFrequency)
Service(Stream.id >> resolve, consumerGroupName, defaultCheckpointFrequency)

Check failure on line 194 in src/Propulsion.CosmosStore/ReaderCheckpoint.fs

View workflow job for this annotation

GitHub Actions / Build and test

Type mismatch. Expecting a� 'string -> Equinox.Decider<Events.Event,Fold.State>' �but given a� 'Equinox.Core.StreamId -> Equinox.Decider<Events.Event,Fold.State>' �The type 'string' does not match the type 'Equinox.Core.StreamId'
#else
let private create log defaultCheckpointFrequency resolveStream =
let resolve id = Equinox.Decider(log, resolveStream Equinox.AllowStale (streamName id), maxAttempts = 3)
Expand Down
11 changes: 4 additions & 7 deletions src/Propulsion.DynamoStore.Indexer/Function.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,19 @@ type Configuration(?tryGet) =
member _.DynamoIndexTable = get Propulsion.DynamoStore.Lambda.Args.Dynamo.INDEX_TABLE
member _.OnlyWarnGap = tryGet Propulsion.DynamoStore.Lambda.Args.Dynamo.ONLY_WARN_GAP |> Option.map bool.Parse

type Store(connector : DynamoStoreConnector, table, dynamoItemSizeCutoffBytes) =
type Store(connector: DynamoStoreConnector, table, dynamoItemSizeCutoffBytes) =
let queryMaxItems = 100

let client = connector.CreateClient()
let storeClient = DynamoStoreClient(client, table)
let context = DynamoStoreContext(storeClient, maxBytes = dynamoItemSizeCutoffBytes, queryMaxItems = queryMaxItems)
let storeClient = connector.CreateDynamoDbClient() |> DynamoStoreClient
member val Context = DynamoStoreContext(storeClient, table, maxBytes = dynamoItemSizeCutoffBytes, queryMaxItems = queryMaxItems)

new (c : Configuration, requestTimeout, retries, dynamoItemSizeCutoffBytes) =
new (c: Configuration, requestTimeout, retries, dynamoItemSizeCutoffBytes) =
let conn =
match c.DynamoRegion with
| Some r -> DynamoStoreConnector(r, requestTimeout, retries)
| None -> DynamoStoreConnector(c.DynamoServiceUrl, c.DynamoAccessKey, c.DynamoSecretKey, requestTimeout, retries)
Store(conn, c.DynamoIndexTable, dynamoItemSizeCutoffBytes)

member _.Context = context

type Function() =

// Larger optimizes for not needing to use TransactWriteItems as frequently
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore.Indexer/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ let private parse (log : Serilog.ILogger) (dynamoEvent : Amazon.Lambda.DynamoDBE
| ot when ot = OperationType.INSERT || ot = OperationType.MODIFY ->
let p = record.Dynamodb.Keys["p"].S
let sn, n = IndexStreamId.ofP p, int64 updated["n"].N
if p.StartsWith AppendsEpoch.Category || p.StartsWith AppendsIndex.Category then indexStream <- indexStream + 1
if p.StartsWith AppendsEpoch.Stream.Category || p.StartsWith AppendsIndex.Stream.Category then indexStream <- indexStream + 1
elif p.StartsWith '$' then systemStreams <- systemStreams + 1
else
// Equinox writes all enter via the Tip. The "a" field of the tip indicates how many events were pushed in this insert/update
Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.DynamoStore.Notifier/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ let private parse (log : Serilog.ILogger) (dynamoEvent : DynamoDBEvent) : KeyVal
| ot when ot = OperationType.INSERT || ot = OperationType.MODIFY ->
let p = record.Dynamodb.Keys["p"].S
match FsCodec.StreamName.parse p with
| AppendsEpoch.StreamName (partitionId, epochId) ->
| AppendsEpoch.Stream.For (partitionId, epochId) ->

Check failure on line 35 in src/Propulsion.DynamoStore.Notifier/Handler.fs

View workflow job for this annotation

GitHub Actions / Build and test

This expression was expected to have type� 'obj' �but here has type� ''a * 'b'
// Calf writes won't have an "a" field
let appendedLen = match updated.TryGetValue "a" with true, v -> int64 v.N | false, _ -> 0
// Tip writes may not actually have added events, if the sync was transmuted to an update of the unfolds only
Expand All @@ -48,7 +48,7 @@ let private parse (log : Serilog.ILogger) (dynamoEvent : DynamoDBEvent) : KeyVal
let checkpoint = Checkpoint.positionOfEpochClosedAndVersion epochId isClosed n
updateTails partitionId checkpoint
| _ ->
if p.StartsWith AppendsIndex.Category then indexStream <- indexStream + 1
if p.StartsWith AppendsIndex.Stream.Category then indexStream <- indexStream + 1
else otherStream <- otherStream + 1
| et -> invalidOp (sprintf "Unknown OperationType %s" et.Value)
log.Information("Index {indexCount} Other {otherCount} NoEvents {noEventCount} Tails {tails} {summary:l}",
Expand Down
23 changes: 13 additions & 10 deletions src/Propulsion.DynamoStore/AppendsEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ open System.Collections.Immutable

/// The absolute upper limit of number of streams that can be indexed within a single Epoch (defines how Checkpoints are encoded, so cannot be changed)
let [<Literal>] MaxItemsPerEpoch = Checkpoint.MaxItemsPerEpoch
let [<Literal>] Category = "$AppendsEpoch"
module Stream =
let [<Literal>] Category = "$AppendsEpoch"
let private decodeId = FsCodec.StreamId.dec2 AppendsPartitionId.parse AppendsEpochId.parse

Check failure on line 16 in src/Propulsion.DynamoStore/AppendsEpoch.fs

View workflow job for this annotation

GitHub Actions / Build and test

The value, constructor, namespace or type 'StreamId' is not defined. Maybe you want one of the following:� StreamName� streamName� StreamName� streamName� StreamName
let private tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId

Check failure on line 17 in src/Propulsion.DynamoStore/AppendsEpoch.fs

View workflow job for this annotation

GitHub Actions / Build and test

The type 'string<_>' does not define the field, constructor or member 'tryFind'.
let [<return: Struct>] (|For|_|) = tryDecode
#if !PROPULSION_DYNAMOSTORE_NOTIFIER
let streamId = Equinox.StreamId.gen2 AppendsPartitionId.toString AppendsEpochId.toString
let id = FsCodec.StreamId.gen2 AppendsPartitionId.toString AppendsEpochId.toString
let name = id >> FsCodec.StreamName.create Category
#endif
let [<return: Struct>] (|StreamName|_|) = function
| FsCodec.StreamName.CategoryAndIds (Category, [| pid; eid |]) -> ValueSome struct (AppendsPartitionId.parse pid, AppendsEpochId.parse eid)
| _ -> ValueNone

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
module Events =
Expand Down Expand Up @@ -128,22 +131,22 @@ type Service internal (onlyWarnOnGap, shouldClose, resolve: AppendsPartitionId *
let decider = resolve (partitionId, epochId)
if Array.isEmpty spans then async { return { accepted = [||]; closed = false; residual = [||] } } else // special-case null round-trips

let isSelf p = match IndexStreamId.toStreamName p with FsCodec.StreamName.Category c -> c = Category
let isSelf p = match IndexStreamId.toStreamName p with FsCodec.StreamName.Category c -> c = Stream.Category
if spans |> Array.exists (function { p = p } -> isSelf p) then invalidArg (nameof spans) "Writes to indices should be filtered prior to indexing"
let decide (c: Equinox.ISyncContext<_>) = Ingest.decide onlyWarnOnGap (shouldClose (c.StreamEventBytes, c.Version)) spans c.State
decider.TransactEx(decide, Equinox.AnyCachedValue)

module Factory =

let private createCategory (context, cache) = Store.Dynamo.createUnoptimized Category Events.codec Fold.initial Fold.fold (context, Some cache)
let private createCategory (context, cache) = Store.Dynamo.createUnoptimized Stream.Category Events.codec Fold.initial Fold.fold (context, Some cache)
let create log (maxBytes: int, maxVersion: int64, maxStreams: int, onlyWarnOnGap) store =
let resolve = createCategory store |> Equinox.Decider.forStream log
let shouldClose (totalBytes : int64 voption, version) totalStreams =
let closing = totalBytes.Value > maxBytes || version >= maxVersion || totalStreams >= maxStreams
if closing then log.Information("Epoch Closing v{version}/{maxVersion} {streams}/{maxStreams} streams {kib:f0}/{maxKib:f0} KiB",
version, maxVersion, totalStreams, maxStreams, float totalBytes.Value / 1024., float maxBytes / 1024.)
closing
Service((if onlyWarnOnGap then Some log else None), shouldClose, streamId >> resolve)
Service((if onlyWarnOnGap then Some log else None), shouldClose, Stream.id >> resolve)

/// Manages the loading of Ingested Span Batches in a given Epoch from a given position forward
/// In the case where we are polling the tail, this should mean we typically do a single round-trip for a point read of the Tip
Expand Down Expand Up @@ -176,8 +179,8 @@ module Reader =

module Factory =

let private createCategory context minIndex = Store.Dynamo.createWithOriginIndex Category codec initial fold context minIndex
let private createCategory context minIndex = Store.Dynamo.createWithOriginIndex Stream.Category codec initial fold context minIndex
let create log context =
let resolve minIndex = Equinox.Decider.forStream log (createCategory context minIndex)
Service(fun (pid, eid, minIndex) -> streamId (pid, eid) |> resolve minIndex)
Service(fun (pid, eid, minIndex) -> Stream.id (pid, eid) |> resolve minIndex)
#endif
12 changes: 7 additions & 5 deletions src/Propulsion.DynamoStore/AppendsIndex.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
/// As an Epoch is marked `Closed`, `module Index` will mark a new epoch `Started` on this aggregate
module Propulsion.DynamoStore.AppendsIndex

let [<Literal>] Category = "$AppendsIndex"
module Stream =
let [<Literal>] Category = "$AppendsIndex"
#if !PROPULSION_DYNAMOSTORE_NOTIFIER
let streamId () = Equinox.StreamId.gen IndexId.toString IndexId.wellKnownId
let id () = FsCodec.StreamId.gen IndexId.toString IndexId.wellKnownId
let name = id >> FsCodec.StreamName.create Category

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
Expand Down Expand Up @@ -65,9 +67,9 @@ type Service internal (resolve: unit -> Equinox.Decider<Events.Event, Fold.State

module Factory =

let private createCategory store = Store.Dynamo.createSnapshotted Category Events.codec Fold.initial Fold.fold Fold.Snapshot.config store
let private createCategory store = Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold Fold.Snapshot.config store
let resolve log store = createCategory store |> Equinox.Decider.forStream log
let create log (context, cache) = Service(streamId >> resolve log (context, Some cache))
let create log (context, cache) = Service(Stream.id >> resolve log (context, Some cache))

/// On the Reading Side, there's no advantage to caching (as we have snapshots, and it's Dynamo)
module Reader =
Expand All @@ -92,5 +94,5 @@ module Reader =
let decider = resolve ()
decider.Query(readIngestionEpochId partitionId)

let create log context = Service(streamId >> Factory.resolve log (context, None))
let create log context = Service(Stream.id >> Factory.resolve log (context, None))
#endif
17 changes: 8 additions & 9 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,15 @@ module internal EventLoadMode =
true, withData eventsContext categoryFilter, dop

type DynamoStoreSource
( log : Serilog.ILogger, statsInterval,
indexClient : DynamoStoreClient, batchSizeCutoff, tailSleepInterval,
checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Sinks.Sink,
( log: Serilog.ILogger, statsInterval,
indexContext: DynamoStoreContext, batchSizeCutoff, tailSleepInterval,
checkpoints: Propulsion.Feed.IFeedCheckpointStore, sink: Propulsion.Sinks.Sink,
// If the Handler does not utilize the Data/Meta of the events, we can avoid having to read from the Store Table
mode : EventLoadMode,
mode: EventLoadMode,
// The whitelist of Categories to use
?categories,
// Predicate to filter Categories to use
?categoryFilter : Func<string, bool>,
?categoryFilter: Func<string, bool>,
// Override default start position to be at the tail of the index. Default: Replay all events.
?startFromTail,
// Separated log for DynamoStore calls in order to facilitate filtering and/or gathering metrics
Expand All @@ -164,12 +164,12 @@ type DynamoStoreSource
( log, statsInterval, defaultArg sourceId FeedSourceId.wellKnownId, tailSleepInterval,
checkpoints,
( if startFromTail <> Some true then None
else Some (Impl.readTailPositionForPartition (defaultArg storeLog log) (DynamoStoreContext indexClient))),
else Some (Impl.readTailPositionForPartition (defaultArg storeLog log) indexContext)),
sink, Impl.renderPos,
Impl.materializeIndexEpochAsBatchesOfStreamEvents
(log, defaultArg sourceId FeedSourceId.wellKnownId, defaultArg storeLog log)
(EventLoadMode.map (Propulsion.Feed.Core.Categories.mapFilters categories categoryFilter) (defaultArg storeLog log) mode)
batchSizeCutoff (DynamoStoreContext indexClient),
batchSizeCutoff indexContext,
Impl.logReadFailure (defaultArg storeLog log),
defaultArg readFailureSleepInterval (tailSleepInterval * 2.),
Impl.logCommitFailure (defaultArg storeLog log))
Expand All @@ -179,9 +179,8 @@ type DynamoStoreSource
match trancheIds with
| Some ids -> return ids
| None ->
let context = DynamoStoreContext(indexClient)
let storeLog = defaultArg storeLog log
let! res = Impl.readPartitions storeLog context |> Async.executeAsTask ct
let! res = Impl.readPartitions storeLog indexContext |> Async.executeAsTask ct
let appendsPartitionIds = match res with [||] -> [| AppendsPartitionId.wellKnownId |] | ids -> ids
return appendsPartitionIds |> Array.map AppendsPartitionId.toTrancheId }

Expand Down
7 changes: 4 additions & 3 deletions src/Propulsion.EventStore/Checkpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ module CheckpointSeriesId =
let ofGroupName (groupName : string) = UMX.tag groupName
let toString (x : CheckpointSeriesId) = UMX.untag x

let [<Literal>] Category = "Sync"
let streamId = Equinox.StreamId.gen CheckpointSeriesId.toString
module Stream =
let [<Literal>] Category = "Sync"
let id = FsCodec.StreamId.gen CheckpointSeriesId.toString

Check failure on line 15 in src/Propulsion.EventStore/Checkpoint.fs

View workflow job for this annotation

GitHub Actions / Build and test

The value, constructor, namespace or type 'StreamId' is not defined. Maybe you want one of the following:� StreamName� streamName� StreamName� streamName� StreamName

// NB - these schemas reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand Down Expand Up @@ -119,7 +120,7 @@ type Service internal (resolve: CheckpointSeriesId -> Equinox.DeciderCore<Events
// This light wrapper means we can adhere to that general pattern yet still end up with legible code while we in practice only maintain a single checkpoint series per running app
type CheckpointSeries(groupName, resolve) =
let seriesId = CheckpointSeriesId.ofGroupName groupName
let inner = Service(streamId >> resolve)
let inner = Service(Stream.id >> resolve)

member _.Read(ct): Task<Fold.State> = inner.Read(seriesId, ct)
member _.Start(freq, pos, ct): Task<unit> = inner.Start(seriesId, freq, pos, ct)
Expand Down
Loading

0 comments on commit 068f176

Please sign in to comment.