diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 43696094..5ef44db8 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -50,7 +50,7 @@ There's a [glossary of terms in the Equinox Documentation](https://github.com/je | Consumer Group | Name used to identify a set of checkpoint positions for each Tranche of a Source, aka Subscription | Consumer Load Balancing | Built in lease-based allocation of Partitions to distribute load across instances of a Processor based on a common Consumer Group Name e.g. the broker managed system in Kafka, the Change Feed Processor library within the `Microsoft.Azure.Cosmos` Client | Batch | Group of Events from a Source. Typically includes a Checkpoint callback that's invoked when all events have been handled -| Category | Group of Streams for a Source matching `{categooryName}-{streamId}` pattern. MessageDB exposes a Feed per Category +| Category | Group of Streams for a Source matching `{category}-{streamId}` pattern. MessageDB exposes a Feed per Category | DynamoStore Index | An `Equinox.DynamoStore` containing a sequence of (Stream,Index,Event Type) entries referencing Events in a Store.
Written by a `Propulsion.DynamoStore.Indexer` Lambda. Can be split into Partitions | Event | An Event from a Stream, obtained from a | Feed | Incrementally readable portion of a Source that affords a way to represent a Position within that as a durable Checkpoint, with Events being appended at the tail
e.g. the EventStoreDb `$all` stream, an ATOM feed over HTTP, the content of a Physical Partition of a CosmosDb Container diff --git a/src/Propulsion.MemoryStore/MemoryStoreLogger.fs b/src/Propulsion.MemoryStore/MemoryStoreLogger.fs index 47d99d66..f6d304e5 100644 --- a/src/Propulsion.MemoryStore/MemoryStoreLogger.fs +++ b/src/Propulsion.MemoryStore/MemoryStoreLogger.fs @@ -30,8 +30,7 @@ let renderCompleted (log : Serilog.ILogger) struct (epoch, categoryName, streamI /// Wires specified Observable source (e.g. VolatileStore.Committed) to the Logger let subscribe log source = let mutable epoch = -1L - let aux struct (sn, events) = - let struct (categoryName, streamId) = FsCodec.StreamName.split sn + let aux struct (FsCodec.StreamName.Split (categoryName, streamId), events) = let epoch = Interlocked.Increment &epoch renderSubmit log (epoch, categoryName, streamId, events) if log.IsEnabled Serilog.Events.LogEventLevel.Debug then Observable.subscribe aux source diff --git a/src/Propulsion.MemoryStore/MemoryStoreSource.fs b/src/Propulsion.MemoryStore/MemoryStoreSource.fs index da8bd070..ae6dc115 100644 --- a/src/Propulsion.MemoryStore/MemoryStoreSource.fs +++ b/src/Propulsion.MemoryStore/MemoryStoreSource.fs @@ -36,8 +36,7 @@ type MemoryStoreSource<'F>(log, store : Equinox.MemoryStore.VolatileStore<'F>, c let storeCommitsSubscription = store.Committed - |> Observable.choose (fun struct (sn, es) -> - let struct (categoryName, streamId) = FsCodec.StreamName.split sn + |> Observable.choose (fun struct (FsCodec.StreamName.Split (categoryName, streamId) as sn, es) -> if categoryFilter categoryName then let items : Propulsion.Sinks.StreamEvent[] = es |> Array.map (fun e -> sn, mapTimelineEvent.Invoke e) Some (struct (categoryName, streamId, items))