diff --git a/tools/Propulsion.Tool/Infrastructure.fs b/tools/Propulsion.Tool/Infrastructure.fs
index b42815f4..2339f7ef 100644
--- a/tools/Propulsion.Tool/Infrastructure.fs
+++ b/tools/Propulsion.Tool/Infrastructure.fs
@@ -21,7 +21,7 @@ module EnvVar =
 module Sinks =
 
     let equinoxMetricsOnly (l: LoggerConfiguration) =
-        l.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink())
+        l.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink(categorize = true))
             .WriteTo.Sink(Equinox.DynamoStore.Core.Log.InternalMetrics.Stats.LogSink())
     let console verbose (configuration: LoggerConfiguration) =
         let outputTemplate =
@@ -64,7 +64,7 @@ type Logging() =
 module CosmosStoreConnector =
 
     let private get (role: string) (client: Microsoft.Azure.Cosmos.CosmosClient) databaseId containerId =
-        Log.Information("CosmosDB {role} Database {database} Container {container}", role, databaseId, containerId)
+        Log.Information("CosmosDB {role} {database}/{container}", role, databaseId, containerId)
         client.GetDatabase(databaseId).GetContainer(containerId)
     let getSource = get "Source"
     let getLeases = get "Leases"
@@ -74,7 +74,7 @@ type Logging() =
 
 type Equinox.CosmosStore.CosmosStoreContext with
     member x.LogConfiguration(role, databaseId: string, containerId: string) =
-        Log.Information("CosmosStore {role:l} {db}/{container} Tip maxEvents {maxEvents} maxSize {maxJsonLen} Query maxItems {queryMaxItems}",
+        Log.Information("CosmosStore {role:l} {database}/{container} Tip maxEvents {maxEvents} maxSize {maxJsonLen} Query maxItems {queryMaxItems}",
                         role, databaseId, containerId, x.TipOptions.MaxEvents, x.TipOptions.MaxJsonLength, x.QueryOptions.MaxItems)
 
 type Equinox.CosmosStore.CosmosStoreClient with
@@ -89,7 +89,7 @@ type Logging() =
     member private x.LogConfiguration(role, databaseId: string, containers: string[]) =
         let o = x.Options
         let timeout, retries429, timeout429 = o.RequestTimeout, o.MaxRetryAttemptsOnRateLimitedRequests, o.MaxRetryWaitTimeOnRateLimitedRequests
-        Log.Information("CosmosDB {role} {mode} {endpointUri} {db} {containers} timeout {timeout}s Throttling retries {retries}, max wait {maxRetryWaitTime}s",
+        Log.Information("CosmosDB {role} {mode} {endpointUri} {database}/{containers} timeout {timeout}s Retries {retries}<{maxRetryWaitTime}s",
                         role, o.ConnectionMode, x.Endpoint, databaseId, containers, timeout.TotalSeconds, retries429, let t = timeout429.Value in t.TotalSeconds)
     member private x.CreateAndInitialize(role, databaseId, containers) =
         x.LogConfiguration(role, databaseId, containers)
diff --git a/tools/Propulsion.Tool/Sync.fs b/tools/Propulsion.Tool/Sync.fs
index 0eba6dc5..2c112d0b 100644
--- a/tools/Propulsion.Tool/Sync.fs
+++ b/tools/Propulsion.Tool/Sync.fs
@@ -30,7 +30,7 @@ type [] Parameters =
     interface IArgParserTemplate with
         member a.Usage = a |> function
             | ConsumerGroupName _ -> "Projector instance context name. Optional if source is JSON"
-            | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: File: 32768 Other: 2."
+            | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: File: 32768 Other: 4."
            | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8 (Cosmos: 16)."
            | FromTail -> "(iff fresh projection) - force starting from present Position. Default: Ensure each and every event is projected from the start."
            | Follow -> "Stop when the Tail is reached."
@@ -40,7 +40,7 @@ type [] Parameters =
                 "Default: assume events arrive from the changefeed (and/or the input JSON file) without any gaps or out of order deliveries for any stream."
            | EventsOnly -> "Exclude Unfolds from processing. Default: Unfolds are read, parsed and processed"
            | Categorize -> "Gather handler latency stats by category"
-            | MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Default (Sync): 9999. Default: Unlimited"
+            | MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Default (Sync): 9999. Default: 100 (CosmosClient default)"
            | ExcSys -> "Exclude System streams. Default: Include Index Streams, identified by a $ prefix."
            | IncCat _ -> "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules."
@@ -61,7 +61,7 @@ and Arguments(c, p: ParseResults<Parameters>) =
                             includeSystem = not (p.Contains ExcSys),
                             allowEts = p.GetResults IncEvent, denyEts = p.GetResults ExcEvent)
     member val Categorize = p.Contains Categorize
-    member val IncludeUnfolds = not (p.Contains EventsOnly)
+    member val EventsOnly = p.Contains EventsOnly
     member val Command =
         match p.GetSubCommand() with
         | Kafka a -> KafkaArguments(c, a) |> SubCommand.Kafka
@@ -122,7 +122,7 @@ and [] CosmosParameters =
            | Container _ -> "specify a container name for store."
            | LeaseContainerId _ -> "store leases in Sync target DB (default: use `-aux` adjacent to the Source Container). Enables the Source to be read via a ReadOnly connection string."
            | Timeout _ -> "specify operation timeout in seconds. Default: 5."
-            | Retries _ -> "specify operation retries. Default: 0."
+            | Retries _ -> "specify operation retries. Default: 2."
            | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."
            | MaxKiB _ -> "specify maximum size in KiB to pass to the Sync stored proc (reduce if Malformed Streams due to 413 RequestTooLarge responses). Default: 128."
            | From _ -> "Specify Source."
@@ -133,9 +133,9 @@ and CosmosArguments(c: Args.Configuration, p: ParseResults<CosmosParameters>) =
                      | Json _ -> p.GetResult Connection
                      | x -> p.Raise $"unexpected subcommand %A{x}"
     let connector =
-        let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.seconds
-        let retries = p.GetResult(Retries, 1)
-        let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.seconds
+        let timeout = p.GetResult(Timeout, 5) |> TimeSpan.seconds
+        let retries = p.GetResult(Retries, 2)
+        let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5) |> TimeSpan.seconds
         Equinox.CosmosStore.CosmosStoreConnector(Equinox.CosmosStore.Discovery.ConnectionString connection, timeout, retries, maxRetryWaitTime)
     let database = match source.Store with
                    | Cosmos c -> p.GetResult(Database, fun () -> c.Database)
@@ -238,10 +238,10 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
     let isFileSource = match a.Command.Source with Json _ -> true | _ -> false
     let parse =
         a.Filters.CreateStreamFilter()
-        |> if a.IncludeUnfolds then Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
-           else Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream
+        |> if a.EventsOnly then Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream
+           else Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
     let statsInterval, stateInterval = a.StatsInterval, a.StateInterval
-    let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 2)
+    let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 4)
     let maxConcurrentProcessors = p.GetResult(MaxWriters, 8)
     let sink =
         match a.Command with