From 397ba84a0372777603d741ec19d4118ca4978334 Mon Sep 17 00:00:00 2001 From: Rambert Yan Date: Wed, 21 Jul 2021 21:27:35 -0400 Subject: [PATCH] Corrected last bits for 1-container set up and wiring in `build`. --- propulsion-cosmos-reactor/Program.fs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/propulsion-cosmos-reactor/Program.fs b/propulsion-cosmos-reactor/Program.fs index fcb1773a6..f58066708 100644 --- a/propulsion-cosmos-reactor/Program.fs +++ b/propulsion-cosmos-reactor/Program.fs @@ -55,9 +55,8 @@ module Args = x.ConsumerGroupName, srcC.DatabaseId, srcC.ContainerId, Option.toNullable srcC.MaxDocuments) if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.") srcC.LagFrequency |> Option.iter (fun i -> Log.Information("ChangeFeed Lag stats interval {lagS:n0}s", i.TotalSeconds)) - let storeClient, monitored = srcC.ConnectStoreAndMonitored() - let context = CosmosStoreContext.create storeClient - (srcC, context, monitored, leases, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency) + let monitored = srcC.ConnectMonitored() + (srcC, monitored, leases, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency) and [] CosmosSourceParameters = | [] FromTail | [] MaxDocuments of int @@ -101,11 +100,13 @@ module Args = member val MaxDocuments = a.TryGetResult MaxDocuments member val LagFrequency = a.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes member val private LeaseContainerId = a.TryGetResult CosmosSourceParameters.LeaseContainer + member private x.ConnectLeases containerId = connector.CreateUninitialized(x.DatabaseId, containerId) member x.ConnectLeases() = match x.LeaseContainerId with | None -> x.ConnectLeases(x.ContainerId + "-aux") | Some sc -> x.ConnectLeases(sc) - member x.ConnectStoreAndMonitored() = connector.ConnectStoreAndMonitored(x.DatabaseId, x.ContainerId) + member x.ConnectMonitored() = connector.ConnectMonitored(x.DatabaseId, x.ContainerId) + member x.Connect() = connector.ConnectStore("Main", x.DatabaseId, x.ContainerId) /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse tryGetConfigValue argv : Arguments = @@ -116,9 +117,9 @@ module Args = let [] AppName = "ReactorTemplate" let build (args : Args.Arguments) = - let source, context, monitored, leases, processorName, startFromTail, maxDocuments, lagFrequency = args.SourceParams() + let source, monitored, leases, processorName, startFromTail, maxDocuments, lagFrequency = args.SourceParams() - let context = source.Cosmos.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create + let context = source.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create let cache = Equinox.Cache(AppName, sizeMb=10) let srcService = Todo.Cosmos.create (context, cache) let dstService = TodoSummary.Cosmos.create (context, cache)