Skip to content

Commit

Permalink
Finesse
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Aug 8, 2023
1 parent 0cb169a commit 6ca6027
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 5 deletions.
2 changes: 1 addition & 1 deletion DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ There's a [glossary of terms in the Equinox Documentation](https:/je
| Consumer Group | Name used to identify a set of checkpoint positions for each Tranche of a Source, aka Subscription
| Consumer Load Balancing | Built in lease-based allocation of Partitions to distribute load across instances of a Processor based on a common Consumer Group Name e.g. the broker managed system in Kafka, the Change Feed Processor library within the `Microsoft.Azure.Cosmos` Client
| Batch | Group of Events from a Source. Typically includes a Checkpoint callback that's invoked when all events have been handled
| Category | Group of Streams for a Source matching `{categooryName}-{streamId}` pattern. MessageDB exposes a Feed per Category
| Category | Group of Streams for a Source matching `{category}-{streamId}` pattern. MessageDB exposes a Feed per Category
| DynamoStore Index | An `Equinox.DynamoStore` containing a sequence of (Stream,Index,Event Type) entries referencing Events in a Store.<br/>Written by a `Propulsion.DynamoStore.Indexer` Lambda. Can be split into Partitions
| Event | An Event from a Stream, obtained from a
| Feed | Incrementally readable portion of a Source that affords a way to represent a Position within that as a durable Checkpoint, with Events being appended at the tail<br/>e.g. the EventStoreDb `$all` stream, an ATOM feed over HTTP, the content of a Physical Partition of a CosmosDb Container
Expand Down
3 changes: 1 addition & 2 deletions src/Propulsion.MemoryStore/MemoryStoreLogger.fs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ let renderCompleted (log : Serilog.ILogger) struct (epoch, categoryName, streamI
/// Wires specified <c>Observable</c> source (e.g. <c>VolatileStore.Committed</c>) to the Logger
let subscribe log source =
let mutable epoch = -1L
let aux struct (sn, events) =
let struct (categoryName, streamId) = FsCodec.StreamName.split sn
let aux struct (FsCodec.StreamName.Split (categoryName, streamId), events) =
let epoch = Interlocked.Increment &epoch
renderSubmit log (epoch, categoryName, streamId, events)
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then Observable.subscribe aux source
Expand Down
3 changes: 1 addition & 2 deletions src/Propulsion.MemoryStore/MemoryStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ type MemoryStoreSource<'F>(log, store : Equinox.MemoryStore.VolatileStore<'F>, c

let storeCommitsSubscription =
store.Committed
|> Observable.choose (fun struct (sn, es) ->
let struct (categoryName, streamId) = FsCodec.StreamName.split sn
|> Observable.choose (fun struct (FsCodec.StreamName.Split (categoryName, streamId) as sn, es) ->
if categoryFilter categoryName then
let items : Propulsion.Sinks.StreamEvent[] = es |> Array.map (fun e -> sn, mapTimelineEvent.Invoke e)
Some (struct (categoryName, streamId, items))
Expand Down

0 comments on commit 6ca6027

Please sign in to comment.