feat(Cosmos, propulsion sync): propagate unfolds #263

Merged
merged 49 commits on Aug 1, 2024

Commits (49)
9bc6130
feat(Cosmos Reader+Writer, Scheduler): propagate unfolds
bartelink Jul 12, 2024
cef23d8
Basic dropping on merges
bartelink Jul 12, 2024
6ce4dc9
Finalize merge cases
bartelink Jul 12, 2024
6e0d179
Wire up parse logic
bartelink Jul 13, 2024
8770230
Prevent orphaning of unfolds
bartelink Jul 13, 2024
06876c7
Comments
bartelink Jul 13, 2024
bb557a5
Remove StreamResult
bartelink Jul 15, 2024
c7d5041
Simplify split algorithm
bartelink Jul 15, 2024
c4c09df
feat!(Streams): Support propagating Unfolds
bartelink Jul 15, 2024
4c89f50
Merge branch 'remove-streamresult' into project-unfolds
bartelink Jul 15, 2024
d141ddc
Wire in ingestion in propulsion sync
bartelink Jul 15, 2024
a112eb7
Address review comments
bartelink Jul 16, 2024
207269f
Defer change to next PR
bartelink Jul 16, 2024
4254c9d
Merge branch 'remove-streamresult' into project-unfolds
bartelink Jul 16, 2024
d90d5fd
Revert "Defer change to next PR"
bartelink Jul 16, 2024
a46e104
Rewordings
bartelink Jul 16, 2024
a9261d8
Tidy
bartelink Jul 16, 2024
512bb75
Merge branch 'remove-streamresult' into project-unfolds
bartelink Jul 16, 2024
a100f1e
Tidy/fix slice algo
bartelink Jul 16, 2024
a8684a9
Stop misreporting stalls
bartelink Jul 16, 2024
dad903d
Polish maxItems
bartelink Jul 16, 2024
d32a4a5
Polish maxItems
bartelink Jul 16, 2024
bcbb099
Merge branch 'remove-streamresult' into project-unfolds
bartelink Jul 16, 2024
bcacc6c
Tidy/fix slice algo
bartelink Jul 16, 2024
e1c3c4b
Merge branch 'remove-streamresult' into project-unfolds
bartelink Jul 16, 2024
6b84c8c
Tidy
bartelink Jul 16, 2024
573e685
Tidy
bartelink Jul 17, 2024
0d7f33c
Fix edge cases
bartelink Jul 17, 2024
878a0a7
Simplify
bartelink Jul 17, 2024
acc08c3
Cleanup
bartelink Jul 17, 2024
2f919ce
Correct/pin type
bartelink Jul 17, 2024
685de1f
Default args to include sys and unfolds
bartelink Jul 18, 2024
e78b739
Polish comments
bartelink Jul 18, 2024
52b1405
remove Unchecked usage
bartelink Jul 18, 2024
3f25d69
Initial impl
bartelink Jul 22, 2024
0e76a98
Log consistency
bartelink Jul 23, 2024
d310aec
Cleanup Dispatch
bartelink Jul 23, 2024
27084e3
Complete basic shape
bartelink Jul 30, 2024
5223677
Debug, pplish
bartelink Aug 1, 2024
e4c741e
Timeout adjust
bartelink Aug 1, 2024
1227038
Polish batch stats
bartelink Aug 1, 2024
642d189
Format/style changes
bartelink Aug 1, 2024
50d6eb8
Renames: nextIndex->next
bartelink Aug 1, 2024
31b8fb8
Merge branch 'remove-streamresult' into project-unfolds
bartelink Aug 1, 2024
b375d90
Batch Ingestion/Progress extensions/fixes
bartelink Aug 1, 2024
edd3004
Merge branch 'remove-streamresult' into project-unfolds
bartelink Aug 1, 2024
5ac3755
Equinox.CosmosStore 4.1.0-alpha.15
bartelink Aug 1, 2024
5fd1679
Merge remote-tracking branch 'origin/master' into project-unfolds
bartelink Aug 1, 2024
c71d38f
Merge branch 'master' into project-unfolds
bartelink Aug 1, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -32,7 +32,7 @@ The `Unreleased` section name is replaced by the expected version of next release
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
- `Propulsion.Tool`: `sync <kafka|stats>` supports `from json` source option [#250](https://github.com/jet/propulsion/pull/250)
- `Propulsion.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#142](https://github.com/jet/propulsion/pull/142) :pray: [@brihadish](https://github.com/brihadish)
- `Propulsion.Tool`: `sync cosmos from <cosmos|json>` [#252](https://github.com/jet/propulsion/pull/252)
- `Propulsion.Tool`: `sync cosmos from <cosmos|json>` [#252](https://github.com/jet/propulsion/pull/252) [#263](https://github.com/jet/propulsion/pull/263)

### Changed

5 changes: 3 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStoreSink.fs
@@ -72,8 +72,9 @@ module Internal =
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData)
|> Async.executeAsTask ct
#else
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore does not [yet] support ingesting unfolds")
let! res = ctx.Sync(stream, { index = i; etag = None }, span |> Array.map mapData, ct)
let unfolds, events = span |> Array.partition _.IsUnfold
log.Debug("Writing {s}@{i}x{n}+{u}", stream, i, events.Length, unfolds.Length)
let! res = ctx.Sync(stream, { index = i; etag = None }, events |> Array.map mapData, unfolds |> Array.map mapData, ct)
#endif
let res' =
match res with
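The `CosmosStoreSink` change above can be reduced to a small standalone sketch. The `Item` record and sample data below are hypothetical stand-ins for the real Propulsion event types, but the `Array.partition _.IsUnfold` split and the `{n}+{u}` log shape mirror the diff:

```fsharp
// Hypothetical stand-in for the real event type; only IsUnfold matters here
type Item = { EventType: string; IsUnfold: bool }

let span =
    [| { EventType = "Added";       IsUnfold = false }
       { EventType = "Removed";     IsUnfold = false }
       { EventType = "Snapshotted"; IsUnfold = true } |]

// Array.partition puts matches on the left: unfolds first, events second,
// mirroring `let unfolds, events = span |> Array.partition _.IsUnfold`
let unfolds, events = span |> Array.partition _.IsUnfold

// In the real code both slices are handed to ctx.Sync; here we just log the split
printfn "Writing x%d+%d" events.Length unfolds.Length // Writing x2+1
```

Note the design choice visible in the diff: rather than rejecting unfolds with `invalidOp` as before, the sink now forwards events and unfolds as separate arguments to the new `ctx.Sync` overload.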
78 changes: 48 additions & 30 deletions src/Propulsion.CosmosStore/EquinoxSystemTextJsonParser.fs
@@ -1,7 +1,7 @@
namespace Propulsion.CosmosStore

open Equinox.CosmosStore.Core

open Propulsion.Internal
open Propulsion.Sinks

/// <summary>Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams.</summary>
@@ -10,20 +10,19 @@ open Propulsion.Sinks
#if !COSMOSV3
module EquinoxSystemTextJsonParser =

type System.Text.Json.JsonDocument with
member document.Cast<'T>() =
System.Text.Json.JsonSerializer.Deserialize<'T>(document.RootElement)
type Batch with
member _.MapData x =
System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x
type System.Text.Json.JsonElement with
member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x)
member x.ToSinkEventBody() = System.Text.Json.JsonSerializer.SerializeToUtf8Bytes x |> System.ReadOnlyMemory

type System.Text.Json.JsonDocument with member x.Cast<'T>() = x.RootElement.Cast<'T>()
let timestamp (doc: System.Text.Json.JsonDocument) =
let unixEpoch = System.DateTime.UnixEpoch
let ts = let r = doc.RootElement in r.GetProperty("_ts")
unixEpoch.AddSeconds(ts.GetDouble())

/// Parses an Equinox.Cosmos Batch from a CosmosDB Item
/// returns ValueNone if it does not bear required elements of a `Equinox.Cosmos` >= 1.0 Batch, or the streamFilter predicate rejects it
let tryParseEquinoxBatch streamFilter (d: System.Text.Json.JsonDocument) =
let tryParseEquinoxBatchOrTip streamFilter (d: System.Text.Json.JsonDocument) =
let r = d.RootElement
let tryProp (id: string): ValueOption<System.Text.Json.JsonElement> =
let mutable p = Unchecked.defaultof<_>
@@ -33,36 +33,55 @@
match tryProp "p" with
| ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" ->
let sn = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{streamId}" form (or we'll throw)
if streamFilter sn then ValueSome (struct (sn, d.Cast<Batch>())) else ValueNone
if streamFilter sn then ValueSome (struct (sn, d.Cast<Batch>(), tryProp "u")) else ValueNone
| _ -> ValueNone

/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents (batch: Batch): Event seq =
batch.e |> Seq.mapi (fun offset x ->
let d = batch.MapData x.d
let m = batch.MapData x.m
/// Enumerates the Events and/or Unfolds represented within an Equinox.CosmosStore Batch or Tip Item
let enumEquinoxCosmosBatchOrTip (u: System.Text.Json.JsonElement voption) (batch: Batch): Event seq =
let inline gen isUnfold i (x: Equinox.CosmosStore.Core.Event) =
let d = x.d.ToSinkEventBody()
let m = x.m.ToSinkEventBody()
let inline len s = if isNull s then 0 else String.length s
FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, d, m, timestamp = x.t,
FsCodec.Core.TimelineEvent.Create(i, x.c, d, m, timestamp = x.t,
size = x.c.Length + d.Length + m.Length + len x.correlationId + len x.causationId + 80,
correlationId = x.correlationId, causationId = x.causationId))

/// Attempts to parse a Document/Item from the Store
correlationId = x.correlationId, causationId = x.causationId, isUnfold = isUnfold)
let events = batch.e |> Seq.mapi (fun offset -> gen false (batch.i + int64 offset))
// an Unfold won't have a corr/cause id, but that's OK - can't use Tip type as don't want to expand compressed form etc
match u |> ValueOption.map (fun u -> u.Cast<Equinox.CosmosStore.Core.Event[]>()) with
| ValueNone | ValueSome null | ValueSome [||] -> events
| ValueSome unfolds -> seq {
yield! events
for x in unfolds do
gen true batch.n x }
let inline tryEnumStreamEvents_ withUnfolds streamFilter jsonDocument: seq<StreamEvent> voption =
tryParseEquinoxBatchOrTip streamFilter jsonDocument
|> ValueOption.map (fun struct (s, xs, u) -> enumEquinoxCosmosBatchOrTip (if withUnfolds then u else ValueNone) xs |> Seq.map (fun x -> s, x))

/// Attempts to parse the Events from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
let tryEnumStreamEvents streamFilter d: seq<StreamEvent> voption =
tryParseEquinoxBatch streamFilter d
|> ValueOption.map (fun struct (s, xs) -> enumEquinoxCosmosEvents xs |> Seq.map (fun x -> s, x))
let tryEnumStreamEvents streamFilter jsonDocument: seq<StreamEvent> voption =
tryEnumStreamEvents_ false streamFilter jsonDocument

/// Extracts all events that pass the streamFilter from a Feed item
let whereStream streamFilter jsonDocument: StreamEvent seq =
tryEnumStreamEvents streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty

/// Collects all events that pass the streamFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereStream streamFilter d: StreamEvent seq =
tryEnumStreamEvents streamFilter d |> ValueOption.defaultValue Seq.empty
/// Extracts all events passing the supplied categoryFilter from a Feed Item
let whereCategory categoryFilter jsonDocument: StreamEvent seq =
whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) jsonDocument

/// Collects all events passing the supplied categoryFilter from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let whereCategory categoryFilter d: StreamEvent seq =
whereStream (FsCodec.StreamName.Category.ofStreamName >> categoryFilter) d
/// Extracts all events from the specified category list from a Feed Item
let ofCategories (categories: string[]) jsonDocument: StreamEvent seq =
whereCategory (fun c -> Array.contains c categories) jsonDocument

/// Attempts to parse the Events and/or Unfolds from an Equinox.CosmosStore Batch or Tip Item represented as a JsonDocument
/// returns ValueNone if it does not bear the hallmarks of a valid Batch, or the streamFilter predicate rejects
let tryEnumStreamEventsAndUnfolds streamFilter jsonDocument: seq<StreamEvent> voption =
tryEnumStreamEvents_ true streamFilter jsonDocument

/// Collects all events from the specified category list from a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let ofCategories categories d: StreamEvent seq =
whereCategory (fun c -> Array.contains c categories) d
/// Extracts Events and Unfolds that pass the streamFilter from a Feed item
let eventsAndUnfoldsWhereStream streamFilter jsonDocument: StreamEvent seq =
tryEnumStreamEventsAndUnfolds streamFilter jsonDocument |> ValueOption.defaultValue Seq.empty
#else
module EquinoxNewtonsoftParser =

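The parser rework above hinges on two `JsonElement` extension members and an optional lookup of the Tip's `u` (unfolds) property. The following standalone sketch reproduces that pattern; the toy document shape is invented for illustration, while the extension members and the `TryGetProperty`-based optional lookup follow the diff:

```fsharp
open System.Text.Json

// The extension members the diff adds, reproduced standalone: deserialize a
// JsonElement directly, or re-serialize it to the UTF-8 ReadOnlyMemory<byte>
// form used for sink event bodies
type JsonElement with
    member x.Cast<'T>() = JsonSerializer.Deserialize<'T>(x)
    member x.ToSinkEventBody() = JsonSerializer.SerializeToUtf8Bytes x |> System.ReadOnlyMemory

// A toy Tip-shaped document: "u" carries unfolds alongside the events in "e"
let doc = JsonDocument.Parse """{ "p": "Cart-123", "e": [], "u": [ { "c": "Snapshotted" } ] }"""

// tryProp-style optional property lookup, as in tryParseEquinoxBatchOrTip
let mutable u = Unchecked.defaultof<JsonElement>
if doc.RootElement.TryGetProperty("u", &u) then
    printfn "unfolds body: %d bytes" (u.ToSinkEventBody()).Length
```

Callers that only want events keep using `whereStream`; the new `eventsAndUnfoldsWhereStream` entry point threads the `u` element through so unfolds are emitted after the events, stamped with the batch's `n` index and `isUnfold = true`.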
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
@@ -21,7 +21,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0" />
<PackageReference Include="Equinox.CosmosStore" Version="4.1.0-alpha.15" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0" />
</ItemGroup>

2 changes: 1 addition & 1 deletion tools/Propulsion.Tool/Propulsion.Tool.fsproj
@@ -28,7 +28,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.38.1" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.42.0" />
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />

<PackageReference Include="Argu" Version="6.2.2" />
22 changes: 14 additions & 8 deletions tools/Propulsion.Tool/Sync.fs
@@ -12,10 +12,11 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
| [<AltCommandLine "-Z"; Unique>] FromTail
| [<AltCommandLine "-F"; Unique>] Follow
| [<AltCommandLine "-A"; Unique>] RequireAll
| [<AltCommandLine "-E"; Unique>] EventsOnly
| [<AltCommandLine "-C"; Unique>] Categorize
| [<AltCommandLine "-b"; Unique>] MaxItems of int

| [<AltCommandLine "-I"; AltCommandLine "--include-system"; Unique>] IncSys
| [<AltCommandLine "-N"; AltCommandLine "--exclude-system"; Unique>] ExcSys
| [<AltCommandLine "-cat"; AltCommandLine "--include-category">] IncCat of regex: string
| [<AltCommandLine "-ncat"; AltCommandLine "--exclude-category">] ExcCat of regex: string
| [<AltCommandLine "-sn"; AltCommandLine "--include-streamname">] IncStream of regex: string
@@ -37,10 +38,11 @@
"NOTE normally a large `MaxReadAhead` and `cosmos -b` is required to avoid starving the scheduler. " +
"NOTE This mode does not make sense to apply unless the ProcessorName is fresh; if the consumer group name is not fresh (and hence items are excluded from the feed), there will inevitably be missing events, and processing will stall. " +
"Default: assume events arrive from the changefeed (and/or the input JSON file) without any gaps or out of order deliveries for any stream."
| EventsOnly -> "Exclude Unfolds from processing. Default: Unfolds are read, parsed and processed"
| Categorize -> "Gather handler latency stats by category"
| MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Sync Default (Sync): 9999. Default: Unlimited"
| MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Default (Sync): 9999. Default: Unlimited"

| IncSys -> "Include System streams. Default: Exclude Index Streams, identified by a $ prefix."
| ExcSys -> "Exclude System streams. Default: Include Index Streams, identified by a $ prefix."
| IncCat _ -> "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules."
| ExcCat _ -> "Deny Stream Category. Specified values/regexes are applied after the Category Allow rule(s)."
| IncStream _ -> "Allow Stream Name. Multiple values are combined with OR. Default: Allow all streams that pass the category Allow test, Fail the Category and Stream deny tests."
@@ -56,9 +58,10 @@ and Arguments(c, p: ParseResults<Parameters>) =
member val Filters = Propulsion.StreamFilter(
allowCats = p.GetResults IncCat, denyCats = p.GetResults ExcCat,
allowSns = p.GetResults IncStream, denySns = p.GetResults ExcStream,
includeSystem = p.Contains IncSys,
includeSystem = not (p.Contains ExcSys),
allowEts = p.GetResults IncEvent, denyEts = p.GetResults ExcEvent)
member val Categorize = p.Contains Categorize
member val IncludeUnfolds = not (p.Contains EventsOnly)
member val Command =
match p.GetSubCommand() with
| Kafka a -> KafkaArguments(c, a) |> SubCommand.Kafka
@@ -233,7 +236,10 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
Some p
| SubCommand.Stats _ | SubCommand.Sync _ -> None
let isFileSource = match a.Command.Source with Json _ -> true | _ -> false
let parse = a.Filters.CreateStreamFilter() |> Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream
let parse =
a.Filters.CreateStreamFilter()
|> if a.IncludeUnfolds then Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
else Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream
let statsInterval, stateInterval = a.StatsInterval, a.StateInterval
let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 2)
let maxConcurrentProcessors = p.GetResult(MaxWriters, 8)
@@ -254,9 +260,9 @@
requireAll = requireAll)
| SubCommand.Sync sa ->
let eventsContext = sa.ConnectEvents() |> Async.RunSynchronously
let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval,
let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, statsInterval, stateInterval, storeLog = Metrics.log,
logExternalStats = dumpStoreStats, Categorize = a.Categorize)
Propulsion.CosmosStore.CosmosStoreSink.Start(Metrics.log, maxReadAhead, eventsContext, maxConcurrentProcessors, stats,
Propulsion.CosmosStore.CosmosStoreSink.Start(Log.Logger, maxReadAhead, eventsContext, maxConcurrentProcessors, stats,
maxBytes = sa.MaxBytes, requireAll = requireAll,
?purgeInterval = if requireAll then None else Some (TimeSpan.hours 1))
let source =
@@ -299,7 +305,7 @@
if follow then
source.AwaitWithStopOnCancellation()
else async {
let initialWait = TimeSpan.seconds 10
let initialWait = TimeSpan.seconds 30
do! source.Monitor.AwaitCompletion(initialWait, awaitFullyCaughtUp = true, logInterval = statsInterval / 2.) |> Async.ofTask
source.Stop()
do! source.Await() // Let it emit the stats