Skip to content

Commit

Permalink
Add proArchiver, proPruner (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Nov 4, 2020
1 parent da160ca commit 513caf0
Show file tree
Hide file tree
Showing 14 changed files with 767 additions and 1 deletion.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `proArchiver`: CosmosDB ChangeFeedProcessor that syncs events from specified Hot Container's categories into a Warm Container [#79](https:/jet/dotnet-templates/pull/79)
- `proPruner`: CosmosDB ChangeFeedProcessor that prunes events from a Hot Container based on Expiration status determined by inspecting the Warm version of the Events synced by the `proArchiver` [#79](https:/jet/dotnet-templates/pull/79)

### Changed

- Cleaned and moved `Logging` logic out to `Infrastructure.fs` [#76](https:/jet/dotnet-templates/pull/76)
- Polished `SemaphoreSlim` extensions [#79](https:/jet/dotnet-templates/pull/79)
- Polished `SemaphoreSlim` extensions

### Removed
### Fixed
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ The specific behaviors carried out in reaction to incoming events often use `Equ

- [`proSync`](propulsion-sync/README.md) - Boilerplate for a console app that that syncs events between [`Equinox.Cosmos` and `Equinox.EventStore` stores](https:/jet/equinox) using the [relevant `Propulsion`.* libraries](https:/jet/propulsion), filtering/enriching/mapping Events as necessary.

- [`proArchiver`](propulsion-archiver/README.md) - Boilerplate for a console app that that syncs Events from relevant Categories from a Hot container and to an associated warm [`Equinox.Cosmos` stores](https:/jet/equinox) archival container using the [relevant `Propulsion`.* libraries](https:/jet/propulsion).
- An Archiver is intended to run continually as an integral part of a production system.

- [`proPruner`](propulsion-pruner/README.md) - Boilerplate for a console app that that inspects Events from relevant Categories in an [`Equinox.Cosmos` store's](https:/jet/equinox) Hot container and uses that to drive the removal of (archived) Events that have Expired from the associated Hot Container using the [relevant `Propulsion`.* libraries](https:/jet/propulsion).

- While a Pruner does not consume a large amount of RU capacity from either the Hot or Warm Containers, running one continually is definitely optional; a Pruner only has a purpose when there are Expired events in the Hot Container; running periodically during low-load periods may be appropriate, depending on the lifetime profile of the events in your system

- Reducing the traversal frequency needs to be balanced against the primary goal of deleting from the Hot Container: preventing it splitting into multiple physical Ranges.

- It is necessary to reset the CFP checkpoint (delete the checkpoint documents, or use a new Consumer Group Name) to trigger a re-traversal if events have expired since the lsat time a traversal took place.

<a name="eqxShipping"></a>
- [`eqxShipping`](equinox-shipping/README.md) - Example demonstrating the implementation of a [Process Manager](https://www.enterpriseintegrationpatterns.com/patterns/messaging/ProcessManager.html) using [`Equinox`](https:/jet/equinox) that manages the enlistment of a set of `Shipment` Aggregate items into a separated `Container` Aggregate as an atomic operation. :pray: [@Kimserey](https:/Kimserey)

Expand Down
24 changes: 24 additions & 0 deletions dotnet-templates.sln
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "equinox-shi
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Watchdog.Integration", "equinox-shipping\Watchdog.Integration\Watchdog.Integration.fsproj", "{83BA87C3-6288-40F4-BC4F-EC3A54586CDF}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Archiver", "propulsion-archiver\Archiver.fsproj", "{B9976751-C3A6-4F8B-BEF4-278382D8EAA6}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "proArchiver", "proArchiver", "{BBD6F425-C2F4-4857-BD68-2B13D0B4EDDE}"
ProjectSection(SolutionItems) = preProject
propulsion-archiver\.template.config\template.json = propulsion-archiver\.template.config\template.json
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "proPruner", "proPruner", "{7F80222A-2687-4E91-8F5B-4717DD13DEF5}"
ProjectSection(SolutionItems) = preProject
propulsion-pruner\.template.config\template.json = propulsion-pruner\.template.config\template.json
EndProjectSection
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Pruner", "propulsion-pruner\Pruner.fsproj", "{A0FB44F5-15E5-47C8-81E5-1991269849CB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -169,6 +183,14 @@ Global
{83BA87C3-6288-40F4-BC4F-EC3A54586CDF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{83BA87C3-6288-40F4-BC4F-EC3A54586CDF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{83BA87C3-6288-40F4-BC4F-EC3A54586CDF}.Release|Any CPU.Build.0 = Release|Any CPU
{B9976751-C3A6-4F8B-BEF4-278382D8EAA6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B9976751-C3A6-4F8B-BEF4-278382D8EAA6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B9976751-C3A6-4F8B-BEF4-278382D8EAA6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B9976751-C3A6-4F8B-BEF4-278382D8EAA6}.Release|Any CPU.Build.0 = Release|Any CPU
{A0FB44F5-15E5-47C8-81E5-1991269849CB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A0FB44F5-15E5-47C8-81E5-1991269849CB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A0FB44F5-15E5-47C8-81E5-1991269849CB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A0FB44F5-15E5-47C8-81E5-1991269849CB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{F66A5BFE-7C81-44DC-97DE-3FD8C83B8F06} = {B72FFAAE-7801-41B2-86F5-FD90E97A30F7}
Expand All @@ -186,5 +208,7 @@ Global
{9AFF6138-B63B-4EBF-B86B-4F626E1F1ADF} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374}
{5A45EF21-576B-4B40-86BD-F5960ECD66BF} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374}
{83BA87C3-6288-40F4-BC4F-EC3A54586CDF} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374}
{B9976751-C3A6-4F8B-BEF4-278382D8EAA6} = {BBD6F425-C2F4-4857-BD68-2B13D0B4EDDE}
{A0FB44F5-15E5-47C8-81E5-1991269849CB} = {7F80222A-2687-4E91-8F5B-4717DD13DEF5}
EndGlobalSection
EndGlobal
19 changes: 19 additions & 0 deletions propulsion-archiver/.template.config/template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"$schema": "http://json.schemastore.org/template",
"author": "@jet @bartelink",
"classifications": [
"Propulsion",
"Equinox",
"CosmosDb",
"ChangeFeed",
"ChangeFeedProcessor"
],
"tags": {
"language": "F#"
},
"identity": "Propulsion.Template.Archive",
"name": "Propulsion Archiver service",
"shortName": "proArchiver",
"sourceName": "ArchiverTemplate",
"preferNameDirectory": true,
}
25 changes: 25 additions & 0 deletions propulsion-archiver/Archiver.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<WarningLevel>5</WarningLevel>
</PropertyGroup>

<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Handler.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Argu" Version="6.0.0" />
<PackageReference Include="Destructurama.FSharp" Version="1.1.1-dev-00033" />
<PackageReference Include="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" Version="2.3.2" />
<PackageReference Include="Microsoft.Azure.DocumentDB.Core" Version="2.11.6" />
<PackageReference Include="Propulsion.Cosmos" Version="2.9.0" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.4.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.Seq" Version="4.0.0" />
</ItemGroup>
</Project>
24 changes: 24 additions & 0 deletions propulsion-archiver/Handler.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
module ArchiverTemplate.Handler

type Stats(log, statsInterval, stateInterval) =
inherit Propulsion.Streams.Sync.Stats<unit>(log, statsInterval, stateInterval)

override __.HandleOk(()) = ()
override __.HandleExn(log, exn) = log.Information(exn, "Unhandled")

let (|Archivable|NotArchivable|) = function
// TODO define Categories that should be copied to the secondary Container
| "CategoryName" ->
Archivable
| _ ->
NotArchivable

let selectArchivable (changeFeedDocument: Microsoft.Azure.Documents.Document) : Propulsion.Streams.StreamEvent<_> seq = seq {
let s = changeFeedDocument.GetPropertyValue("p") |> string
if s.StartsWith("events-") then () else
for batch in Propulsion.Cosmos.EquinoxCosmosParser.enumStreamEvents changeFeedDocument do
let (FsCodec.StreamName.CategoryAndId (cat,_)) = batch.stream
match cat with
| Archivable -> yield batch
| NotArchivable -> ()
}
58 changes: 58 additions & 0 deletions propulsion-archiver/Infrastructure.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
[<AutoOpen>]
module ArchiverTemplate.Infrastructure

open Serilog
open System.Runtime.CompilerServices

[<Extension>]
type LoggerConfigurationExtensions() =

[<Extension>]
static member inline ExcludeChangeFeedProcessorV2InternalDiagnostics(c : LoggerConfiguration) =
let isCfp429a = Filters.Matching.FromSource("Microsoft.Azure.Documents.ChangeFeedProcessor.LeaseManagement.DocumentServiceLeaseUpdater").Invoke
let isCfp429b = Filters.Matching.FromSource("Microsoft.Azure.Documents.ChangeFeedProcessor.PartitionManagement.LeaseRenewer").Invoke
let isCfp429c = Filters.Matching.FromSource("Microsoft.Azure.Documents.ChangeFeedProcessor.PartitionManagement.PartitionLoadBalancer").Invoke
let isCfp429d = Filters.Matching.FromSource("Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.PartitionProcessor").Invoke
let isCfp x = isCfp429a x || isCfp429b x || isCfp429c x || isCfp429d x
c.Filter.ByExcluding(fun x -> isCfp x)

[<Extension>]
static member inline ConfigureChangeFeedProcessorLogging(c : LoggerConfiguration, verbose : bool) =
// LibLog writes to the global logger, so we need to control the emission
let cfpl = if verbose then Events.LogEventLevel.Debug else Events.LogEventLevel.Warning
c.MinimumLevel.Override("Microsoft.Azure.Documents.ChangeFeedProcessor", cfpl)
|> fun c -> if verbose then c else c.ExcludeChangeFeedProcessorV2InternalDiagnostics()

[<Extension>]
type Logging() =

[<Extension>]
static member Configure(configuration : LoggerConfiguration, verbose, (logSyncToConsole, minRu), cfpVerbose) =
configuration
.Destructure.FSharpTypes()
.Enrich.FromLogContext()
|> fun c -> if verbose then c.MinimumLevel.Debug() else c
|> fun c -> c.ConfigureChangeFeedProcessorLogging(cfpVerbose)
|> fun c -> let ingesterLevel = if logSyncToConsole then Events.LogEventLevel.Debug else Events.LogEventLevel.Information
c.MinimumLevel.Override(typeof<Propulsion.Streams.Scheduling.StreamSchedulingEngine>.FullName, ingesterLevel)
|> fun c -> let generalLevel = if verbose then Events.LogEventLevel.Information else Events.LogEventLevel.Warning
c.MinimumLevel.Override(typeof<Propulsion.Cosmos.Internal.Writer.Result>.FullName, generalLevel)
|> fun c ->
let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {partitionKeyRangeId,2} {Message:lj} {Properties}{NewLine}{Exception}"
let t = if verbose then t else t.Replace("{Properties}", "")
let configure (a : Configuration.LoggerSinkConfiguration) : unit =
a.Logger(fun l ->
l.WriteTo.Sink(Equinox.Cosmos.Store.Log.InternalMetrics.Stats.LogSink()) |> ignore) |> ignore
a.Logger(fun l ->
let isEqx = Filters.Matching.FromSource<Equinox.Cosmos.Core.Context>().Invoke
let isWriterB = Filters.Matching.FromSource<Propulsion.Cosmos.Internal.Writer.Result>().Invoke
let l = if logSyncToConsole then l else l.Filter.ByExcluding(fun x -> isEqx x || isWriterB x)
let isCheaperThan minRu = function
| Equinox.Cosmos.Store.Log.InternalMetrics.Stats.CosmosMetric
(Equinox.Cosmos.Store.Log.Event.SyncSuccess m | Equinox.Cosmos.Store.Log.Event.SyncConflict m) ->
m.ru < minRu
| _ -> false
let l = match minRu with Some mru -> l.Filter.ByExcluding(fun x -> isCheaperThan mru x) | None -> l
l.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t) |> ignore)
|> ignore
c.WriteTo.Async(bufferSize=65536, blockWhenFull=true, configure=System.Action<_> configure)
Loading

0 comments on commit 513caf0

Please sign in to comment.