Skip to content

Commit

Permalink
feat(Tool sync): Log/defaults polish
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Aug 2, 2024
1 parent 9fb6c06 commit c6c18d3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
8 changes: 4 additions & 4 deletions tools/Propulsion.Tool/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module EnvVar =
module Sinks =

let equinoxMetricsOnly (l: LoggerConfiguration) =
l.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink())
l.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink(categorize = true))
.WriteTo.Sink(Equinox.DynamoStore.Core.Log.InternalMetrics.Stats.LogSink())
let console verbose (configuration: LoggerConfiguration) =
let outputTemplate =
Expand Down Expand Up @@ -64,7 +64,7 @@ type Logging() =
module CosmosStoreConnector =

let private get (role: string) (client: Microsoft.Azure.Cosmos.CosmosClient) databaseId containerId =
Log.Information("CosmosDB {role} Database {database} Container {container}", role, databaseId, containerId)
Log.Information("CosmosDB {role} {database}/{container}", role, databaseId, containerId)
client.GetDatabase(databaseId).GetContainer(containerId)
let getSource = get "Source"
let getLeases = get "Leases"
Expand All @@ -74,7 +74,7 @@ type Logging() =
type Equinox.CosmosStore.CosmosStoreContext with

member x.LogConfiguration(role, databaseId: string, containerId: string) =
Log.Information("CosmosStore {role:l} {db}/{container} Tip maxEvents {maxEvents} maxSize {maxJsonLen} Query maxItems {queryMaxItems}",
Log.Information("CosmosStore {role:l} {database}/{container} Tip maxEvents {maxEvents} maxSize {maxJsonLen} Query maxItems {queryMaxItems}",
role, databaseId, containerId, x.TipOptions.MaxEvents, x.TipOptions.MaxJsonLength, x.QueryOptions.MaxItems)

type Equinox.CosmosStore.CosmosStoreClient with
Expand All @@ -89,7 +89,7 @@ type Logging() =
member private x.LogConfiguration(role, databaseId: string, containers: string[]) =
let o = x.Options
let timeout, retries429, timeout429 = o.RequestTimeout, o.MaxRetryAttemptsOnRateLimitedRequests, o.MaxRetryWaitTimeOnRateLimitedRequests
Log.Information("CosmosDB {role} {mode} {endpointUri} {db} {containers} timeout {timeout}s Throttling retries {retries}, max wait {maxRetryWaitTime}s",
Log.Information("CosmosDB {role} {mode} {endpointUri} {database}/{containers} timeout {timeout}s Retries {retries}<{maxRetryWaitTime}s",
role, o.ConnectionMode, x.Endpoint, databaseId, containers, timeout.TotalSeconds, retries429, let t = timeout429.Value in t.TotalSeconds)
member private x.CreateAndInitialize(role, databaseId, containers) =
x.LogConfiguration(role, databaseId, containers)
Expand Down
20 changes: 10 additions & 10 deletions tools/Propulsion.Tool/Sync.fs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
interface IArgParserTemplate with
member a.Usage = a |> function
| ConsumerGroupName _ -> "Projector instance context name. Optional if source is JSON"
| MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: File: 32768 Other: 2."
| MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: File: 32768 Other: 4."
| MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8 (Cosmos: 16)."
| FromTail -> "(iff fresh projection) - force starting from present Position. Default: Ensure each and every event is projected from the start."
| Follow -> "Stop when the Tail is reached."
Expand All @@ -40,7 +40,7 @@ type [<NoEquality; NoComparison; RequireSubcommand>] Parameters =
"Default: assume events arrive from the changefeed (and/or the input JSON file) without any gaps or out of order deliveries for any stream."
| EventsOnly -> "Exclude Unfolds from processing. Default: Unfolds are read, parsed and processed"
| Categorize -> "Gather handler latency stats by category"
| MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Default (Sync): 9999. Default: Unlimited"
| MaxItems _ -> "Limits RU consumption when reading; impacts checkpointing granularity by adjusting the batch size being loaded from the feed. Default (Sync): 9999. Default: 100 (CosmosClient default)"

| ExcSys -> "Exclude System streams. Default: Include Index Streams, identified by a $ prefix."
| IncCat _ -> "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules."
Expand All @@ -61,7 +61,7 @@ and Arguments(c, p: ParseResults<Parameters>) =
includeSystem = not (p.Contains ExcSys),
allowEts = p.GetResults IncEvent, denyEts = p.GetResults ExcEvent)
member val Categorize = p.Contains Categorize
member val IncludeUnfolds = not (p.Contains EventsOnly)
member val EventsOnly = p.Contains EventsOnly
member val Command =
match p.GetSubCommand() with
| Kafka a -> KafkaArguments(c, a) |> SubCommand.Kafka
Expand Down Expand Up @@ -122,7 +122,7 @@ and [<NoEquality; NoComparison; RequireSubcommand>] CosmosParameters =
| Container _ -> "specify a container name for store."
| LeaseContainerId _ -> "store leases in Sync target DB (default: use `-aux` adjacent to the Source Container). Enables the Source to be read via a ReadOnly connection string."
| Timeout _ -> "specify operation timeout in seconds. Default: 5."
| Retries _ -> "specify operation retries. Default: 0."
| Retries _ -> "specify operation retries. Default: 2."
| RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."
| MaxKiB _ -> "specify maximum size in KiB to pass to the Sync stored proc (reduce if Malformed Streams due to 413 RequestTooLarge responses). Default: 128."
| From _ -> "Specify Source."
Expand All @@ -133,9 +133,9 @@ and CosmosArguments(c: Args.Configuration, p: ParseResults<CosmosParameters>) =
| Json _ -> p.GetResult Connection
| x -> p.Raise $"unexpected subcommand %A{x}"
let connector =
let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.seconds
let retries = p.GetResult(Retries, 1)
let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.seconds
let timeout = p.GetResult(Timeout, 5) |> TimeSpan.seconds
let retries = p.GetResult(Retries, 2)
let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5) |> TimeSpan.seconds
Equinox.CosmosStore.CosmosStoreConnector(Equinox.CosmosStore.Discovery.ConnectionString connection, timeout, retries, maxRetryWaitTime)
let database = match source.Store with
| Cosmos c -> p.GetResult(Database, fun () -> c.Database)
Expand Down Expand Up @@ -238,10 +238,10 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
let isFileSource = match a.Command.Source with Json _ -> true | _ -> false
let parse =
a.Filters.CreateStreamFilter()
|> if a.IncludeUnfolds then Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
else Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream
|> if a.EventsOnly then Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream
else Propulsion.CosmosStore.EquinoxSystemTextJsonParser.eventsAndUnfoldsWhereStream
let statsInterval, stateInterval = a.StatsInterval, a.StateInterval
let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 2)
let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 4)
let maxConcurrentProcessors = p.GetResult(MaxWriters, 8)
let sink =
match a.Command with
Expand Down

0 comments on commit c6c18d3

Please sign in to comment.