Skip to content

Commit

Permalink
Default Lag Reporting Frequency of Cosmos consumers to 1m (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
ragiano215 authored Aug 2, 2021
1 parent 1f2d065 commit 8d9337f
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 26 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added
### Changed

- Added defaulting of 1 min for lag reporting frequency to all Cosmos consumers [#95](https:/jet/dotnet-templates/pull/95)

### Removed
### Fixed

Expand Down Expand Up @@ -548,4 +551,4 @@ The `Unreleased` section name is replaced by the expected version of next releas
[2.1.2]: https:/jet/dotnet-templates/compare/2.0.0...2.1.2
[2.0.0]: https:/jet/dotnet-templates/compare/1.2.0...2.0.0
[1.2.0]: https:/jet/dotnet-templates/compare/1.1.1...1.2.0
[1.1.1]: https:/jet/dotnet-templates/compare/1061b32ff1d86633e4adb0ce591992aea9c48c1e...1.1.1
[1.1.1]: https:/jet/dotnet-templates/compare/1061b32ff1d86633e4adb0ce591992aea9c48c1e...1.1.1
9 changes: 5 additions & 4 deletions equinox-shipping/Watchdog/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ module Args =
Log.Information("Monitoring Group {processorName} in Database {db} Container {container} with maximum document count limited to {maxDocuments}",
x.ConsumerGroupName, leases.Database.Id, leases.Id, Option.toNullable srcC.MaxDocuments)
if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.")
srcC.LagFrequency |> Option.iter<TimeSpan> (fun i -> Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", i.TotalSeconds))
Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", let f = srcC.LagFrequency in f.TotalSeconds)
let storeClient, monitored = srcC.ConnectStoreAndMonitored()
(storeClient, monitored, leases, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency)

and [<NoEquality; NoComparison>] CosmosSourceParameters =
| [<AltCommandLine "-Z"; Unique>] FromTail
| [<AltCommandLine "-md"; Unique>] MaxDocuments of int
Expand All @@ -80,7 +81,7 @@ module Args =
member a.Usage = a |> function
| FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| MaxDocuments _ -> "maximum item count to request from feed. Default: unlimited"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: off"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: 1"
| LeaseContainer _ -> "specify Container Name for Leases container. Default: `sourceContainer` + `-aux`."

| ConnectionMode _ -> "override the connection mode. Default: Direct."
Expand All @@ -103,7 +104,7 @@ module Args =

member val FromTail = a.Contains CosmosSourceParameters.FromTail
member val MaxDocuments = a.TryGetResult MaxDocuments
member val LagFrequency = a.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes
member val LagFrequency : TimeSpan = a.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes
member val LeaseContainerId = a.TryGetResult CosmosSourceParameters.LeaseContainer
member private _.ConnectLeases containerId = connector.CreateUninitialized(database, containerId)
member x.ConnectLeases() = match x.LeaseContainerId with
Expand Down Expand Up @@ -152,7 +153,7 @@ let build (args : Args.Arguments) =
let pipeline =
let log = Log.ForContext<CosmosStoreSource>()
use observer = CosmosStoreSource.CreateObserver(log, watchdogSink.StartIngester, Seq.collect Handler.transformOrFilter)
CosmosStoreSource.Run(log, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency)
CosmosStoreSource.Run(log, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxDocuments, lagReportFreq=lagFrequency)
watchdogSink, pipeline

let run args = async {
Expand Down
8 changes: 4 additions & 4 deletions propulsion-archiver/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ module Args =
Log.Information("Monitoring Group {processorName} in Database {db} Container {container} with maximum document count limited to {maxDocuments}",
x.ConsumerGroupName, leases.Database.Id, leases.Id, Option.toNullable srcC.MaxDocuments)
if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.")
srcC.LagFrequency |> Option.iter<TimeSpan> (fun i -> Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", i.TotalSeconds))
Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", let f = srcC.LagFrequency in f.TotalSeconds)
let monitored = srcC.MonitoredContainer()
(monitored, leases, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency)
and [<NoEquality; NoComparison>] CosmosSourceParameters =
Expand All @@ -99,7 +99,7 @@ module Args =
member a.Usage = a |> function
| FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| MaxDocuments _ -> "maximum item count to request from feed. Default: unlimited"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: off"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: 1"
| LeaseContainer _ -> "specify Container Name for Leases container. Default: `sourceContainer` + `-aux`."

| ConnectionMode _ -> "override the connection mode. Default: Direct."
Expand All @@ -124,7 +124,7 @@ module Args =

member val FromTail = a.Contains CosmosSourceParameters.FromTail
member val MaxDocuments = a.TryGetResult MaxDocuments
member val LagFrequency = a.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes
member val LagFrequency : TimeSpan = a.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes
member val LeaseContainer = a.TryGetResult CosmosSourceParameters.LeaseContainer
member private _.ConnectLeases containerId = connector.CreateUninitialized(database, containerId)
member x.ConnectLeases() = match x.LeaseContainer with
Expand Down Expand Up @@ -191,7 +191,7 @@ let build (args : Args.Arguments, log, storeLog : ILogger) =
let pipeline =
use observer = CosmosStoreSource.CreateObserver(log, archiverSink.StartIngester, Seq.collect Handler.selectArchivable)
let monitored, leases, processorName, startFromTail, maxDocuments, lagFrequency = args.MonitoringParams()
CosmosStoreSource.Run(log, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency)
CosmosStoreSource.Run(log, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxDocuments, lagReportFreq=lagFrequency)
archiverSink, pipeline

// A typical app will likely have health checks etc, implying the wireup would be via `UseMetrics()` and thus not use this ugly code directly
Expand Down
8 changes: 4 additions & 4 deletions propulsion-projector/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ module Args =
| CfpVerbose -> "request Verbose Logging from ChangeFeedProcessor. Default: off"
| FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| MaxDocuments _ -> "maximum document count to supply for the Change Feed query. Default: use response size limit"
| LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: off"
| LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1"

| ConnectionMode _ -> "override the connection mode. Default: Direct."
| Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)"
Expand All @@ -85,7 +85,7 @@ module Args =
member val CfpVerbose = a.Contains CfpVerbose
member val FromTail = a.Contains FromTail
member val MaxDocuments = a.TryGetResult MaxDocuments
member val LagFrequency = a.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes
member val LagFrequency = a.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes
member val LeaseContainerId = a.TryGetResult LeaseContainer
member private x.ConnectLeases containerId = connector.CreateUninitialized(database, containerId)
member x.ConnectLeases() = match x.LeaseContainerId with
Expand Down Expand Up @@ -313,7 +313,7 @@ module Args =
Log.Information("Monitoring Group {processorName} in Database {db} Container {container} with maximum document count limited to {maxDocuments}",
x.ConsumerGroupName, leases.Database.Id, leases.Id, Option.toNullable srcC.MaxDocuments)
if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.")
srcC.LagFrequency |> Option.iter<TimeSpan> (fun i -> Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", i.TotalSeconds))
Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", let f = srcC.LagFrequency in f.TotalSeconds)
let monitored = srcC.MonitoredContainer()
(monitored, leases, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency)
#endif
Expand Down Expand Up @@ -394,7 +394,7 @@ let build (args : Args.Arguments) =
let pipeline =
use observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, Handler.mapToStreamItems)
let monitored, leases, processorName, startFromTail, maxDocuments, lagFrequency = args.MonitoringParams()
Propulsion.CosmosStore.CosmosStoreSource.Run(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency)
Propulsion.CosmosStore.CosmosStoreSource.Run(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxDocuments, lagReportFreq=lagFrequency)
#endif // cosmos
#if esdb
let srcE, context, spec = args.BuildEventStoreParams()
Expand Down
8 changes: 4 additions & 4 deletions propulsion-pruner/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ module Args =
Log.Information("Monitoring Group {leaseId} in Database {db} Container {container} with maximum document count limited to {maxDocuments}",
x.ConsumerGroupName, leases.Database.Id, leases.Id, Option.toNullable srcC.MaxDocuments)
if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.")
srcC.LagFrequency |> Option.iter<TimeSpan> (fun i -> Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", i.TotalSeconds))
Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", let f = srcC.LagFrequency in f.TotalSeconds)
let monitored = srcC.MonitoredContainer()
(monitored, leases, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency)
and [<NoEquality; NoComparison>] CosmosSourceParameters =
Expand All @@ -92,7 +92,7 @@ module Args =
member a.Usage = a |> function
| FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| MaxDocuments _ -> "maximum item count to request from feed. Default: unlimited"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: off"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: 1"
| LeaseContainer _ -> "specify Container Name for Leases container. Default: `sourceContainer` + `-aux`."

| ConnectionMode _ -> "override the connection mode. Default: Direct."
Expand All @@ -117,7 +117,7 @@ module Args =

member val FromTail = a.Contains CosmosSourceParameters.FromTail
member val MaxDocuments = a.TryGetResult MaxDocuments
member val LagFrequency = a.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes
member val LagFrequency : TimeSpan = a.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes
member val LeaseContainerId = a.TryGetResult CosmosSourceParameters.LeaseContainer
member x.ConnectLeases containerId = connector.CreateUninitialized(x.DatabaseId, containerId)

Expand Down Expand Up @@ -188,7 +188,7 @@ let build (args : Args.Arguments, log : ILogger, storeLog : ILogger) =
let pipeline =
use observer = CosmosStoreSource.CreateObserver(log.ForContext<CosmosStoreSource>(), deletingEventsSink.StartIngester, Seq.collect Handler.selectPrunable)
let monitored, leases, processorName, startFromTail, maxDocuments, lagFrequency = args.MonitoringParams()
CosmosStoreSource.Run(log, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency)
CosmosStoreSource.Run(log, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxDocuments, lagReportFreq=lagFrequency)
deletingEventsSink, pipeline

// A typical app will likely have health checks etc, implying the wireup would be via `UseMetrics()` and thus not use this ugly code directly
Expand Down
10 changes: 5 additions & 5 deletions propulsion-reactor/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ module Args =
Log.Information("Monitoring Group {processorName} in Database {db} Container {container} with maximum document count limited to {maxDocuments}",
x.ConsumerGroupName, srcC.DatabaseId, srcC.ContainerId, Option.toNullable srcC.MaxDocuments)
if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.")
srcC.LagFrequency |> Option.iter<TimeSpan> (fun i -> Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", i.TotalSeconds))
Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", let f = srcC.LagFrequency in f.TotalSeconds)
let storeClient, monitored = srcC.ConnectStoreAndMonitored()
let context = CosmosStoreContext.create storeClient
#if changeFeedOnly
Expand Down Expand Up @@ -184,7 +184,7 @@ module Args =
| Broker _ -> "specify Kafka Broker, in host:port format. (optional if environment variable PROPULSION_KAFKA_BROKER specified)"
| Topic _ -> "specify Kafka Topic Id. (optional if environment variable PROPULSION_KAFKA_TOPIC specified)"
| MaxInflightMb _ -> "maximum MiB of data to read ahead. Default: 10."
| LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: off."
| LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1."
#if (kafka && blank)
| Kafka _ -> "Kafka Source parameters."
#else
Expand Down Expand Up @@ -231,7 +231,7 @@ module Args =
member a.Usage = a |> function
| FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| MaxDocuments _ -> "maximum item count to request from feed. Default: unlimited"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: off"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: 1"
| LeaseContainer _ -> "specify Container Name for Leases container. Default: `sourceContainer` + `-aux`."

| ConnectionMode _ -> "override the connection mode. Default: Direct."
Expand Down Expand Up @@ -259,7 +259,7 @@ module Args =

member val FromTail = a.Contains CosmosSourceParameters.FromTail
member val MaxDocuments = a.TryGetResult MaxDocuments
member val LagFrequency = a.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes
member val LagFrequency : TimeSpan = a.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes
member val private LeaseContainerId = a.TryGetResult CosmosSourceParameters.LeaseContainer
member private x.ConnectLeases containerId = connector.CreateUninitialized(x.DatabaseId, containerId)
member x.ConnectLeases() = match x.LeaseContainerId with
Expand Down Expand Up @@ -630,7 +630,7 @@ let build (args : Args.Arguments) =
#endif
let pipeline =
use observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, mapToStreamItems)
Propulsion.CosmosStore.CosmosStoreSource.Run(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency)
Propulsion.CosmosStore.CosmosStoreSource.Run(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxDocuments=maxDocuments, lagReportFreq=lagFrequency)
sink, pipeline
#endif // !kafkaEventSpans

Expand Down
8 changes: 4 additions & 4 deletions propulsion-sync/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ module Args =
Log.Information("Monitoring Group {leaseId} in Database {db} Container {container} with maximum document count limited to {maxDocuments}",
x.ConsumerGroupName, leases.Database.Id, leases.Id, srcC.MaxDocuments)
if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.")
srcC.LagFrequency |> Option.iter<TimeSpan> (fun i -> Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", i.TotalSeconds))
Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", let f = srcC.LagFrequency in f.TotalSeconds)
let monitored = srcC.MonitoredContainer()
Choice1Of2 (monitored, leases, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency)
| Choice2Of2 srcE ->
Expand Down Expand Up @@ -165,7 +165,7 @@ module Args =
member a.Usage = a |> function
| FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event."
| MaxDocuments _ -> "maximum item count to request from feed. Default: unlimited"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: off"
| LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: 1"
| LeaseContainer _ -> "specify Container Name for Leases container. Default: `sourceContainer` + `-aux`."

| ConnectionMode _ -> "override the connection mode. Default: Direct."
Expand All @@ -191,7 +191,7 @@ module Args =

member val FromTail = a.Contains CosmosSourceParameters.FromTail
member val MaxDocuments = a.TryGetResult MaxDocuments
member val LagFrequency = a.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes
member val LagFrequency : TimeSpan = a.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes
member val LeaseContainerId = a.TryGetResult CosmosSourceParameters.LeaseContainer
member private _.ConnectLeases containerId = connector.CreateUninitialized(database, containerId)
member x.ConnectLeases() = match x.LeaseContainerId with
Expand Down Expand Up @@ -537,7 +537,7 @@ let build (args : Args.Arguments, log, storeLog : ILogger) =
let runPipeline =
Propulsion.CosmosStore.CosmosStoreSource.Run(
Log.Logger, monitored, leases, processorName, observer, startFromTail,
?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency)
?maxDocuments=maxDocuments, lagReportFreq=lagFrequency)
sink, runPipeline
| Choice2Of2 (srcE, checkpointsContext, spec) ->
match maybeDstCosmos with
Expand Down

0 comments on commit 8d9337f

Please sign in to comment.