Skip to content

Commit

Permalink
Add feedApi, feedConsumer templates (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored May 12, 2021
1 parent 057adbf commit b07d244
Show file tree
Hide file tree
Showing 26 changed files with 1,328 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `eqxPatterns`: `Period`: Skeleton Deciders+Tests for `Period` with Rolling Balance [#89](https:/jet/dotnet-templates/pull/89)
- `eqxPatterns`: `Series`+`Epoch`: Skeleton Deciders+Tests for deduplicated ingestion of items [#89](https:/jet/dotnet-templates/pull/89)
- `eqxProjector --source cosmos --kafka --synthesizeSequence`: Sample code for custom parsing of document changes [#84](https:/jet/dotnet-templates/pull/84)
- `feedApi`: Template for a Web API serving a feed that can be tracked by a `feedConsumer` [#88](https:/jet/dotnet-templates/pull/88)
- `feedConsumer`: Template for Feed Consumer tracking a `feedApi` [#88](https:/jet/dotnet-templates/pull/88)

### Changed

Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ The specific behaviors carried out in reaction to incoming events often use `Equ

**NOTE At present, checkpoint storage when projecting from EventStore uses Azure CosmosDB - help wanted ;)**

- [`feedApi`](feed-source/) - Boilerplate for an ASP.NET Core Web Api serving a feed of items stashed in an `Equinox.CosmosStore`. See `dotnet new feedConsumer` for the associated consumption logic
- [`feedConsumer`](feed-consumer/) - Boilerplate for a service consuming a feed of items served by `dotnet new feedApi` using [`Propulsion.Feed`](https:/jet/propulsion)

- [`summaryConsumer`](propulsion-summary-consumer/README.md) - Boilerplate for an Apache Kafka Consumer using [`Propulsion.Kafka`](https:/jet/propulsion) to ingest versioned summaries produced by a `dotnet new proReactor --kafka`

- [`trackingConsumer`](propulsion-tracking-consumer/README.md) - Boilerplate for an Apache Kafka Consumer using [`Propulsion.Kafka`](https:/jet/propulsion) to ingest accumulating changes in an `Equinox.Cosmos` store idempotently.
Expand Down
31 changes: 31 additions & 0 deletions dotnet-templates.sln
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,22 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "equinox-patterns\
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "equinox-patterns\Domain.Tests\Domain.Tests.fsproj", "{C899EB07-FEC8-41F8-95DC-203DA80F5A32}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "feedApi", "feedApi", "{CF54228E-4DC5-4CE4-B5BC-B9A5D438B03F}"
ProjectSection(SolutionItems) = preProject
feed-source\.template.config\template.json = feed-source\.template.config\template.json
EndProjectSection
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FeedApi", "feed-source\FeedApi\FeedApi.fsproj", "{28B7FEDA-9F91-49D3-990B-1E4D7B71932A}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "feed-source\Domain\Domain.fsproj", "{28EBD7E4-6130-4444-B44F-BB741172A233}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "feedConsumer", "feedConsumer", "{EDC5EB0D-EE6D-489A-B0B0-DA85ECDF04B2}"
ProjectSection(SolutionItems) = preProject
feed-consumer\.template.config\template.json = feed-consumer\.template.config\template.json
EndProjectSection
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "FeedConsumer", "feed-consumer\FeedConsumer.fsproj", "{FE141E84-1AA3-4136-BD7C-318AE7BD84A5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -209,6 +225,18 @@ Global
{C899EB07-FEC8-41F8-95DC-203DA80F5A32}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C899EB07-FEC8-41F8-95DC-203DA80F5A32}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C899EB07-FEC8-41F8-95DC-203DA80F5A32}.Release|Any CPU.Build.0 = Release|Any CPU
{28B7FEDA-9F91-49D3-990B-1E4D7B71932A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{28B7FEDA-9F91-49D3-990B-1E4D7B71932A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{28B7FEDA-9F91-49D3-990B-1E4D7B71932A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{28B7FEDA-9F91-49D3-990B-1E4D7B71932A}.Release|Any CPU.Build.0 = Release|Any CPU
{28EBD7E4-6130-4444-B44F-BB741172A233}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{28EBD7E4-6130-4444-B44F-BB741172A233}.Debug|Any CPU.Build.0 = Debug|Any CPU
{28EBD7E4-6130-4444-B44F-BB741172A233}.Release|Any CPU.ActiveCfg = Release|Any CPU
{28EBD7E4-6130-4444-B44F-BB741172A233}.Release|Any CPU.Build.0 = Release|Any CPU
{FE141E84-1AA3-4136-BD7C-318AE7BD84A5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FE141E84-1AA3-4136-BD7C-318AE7BD84A5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FE141E84-1AA3-4136-BD7C-318AE7BD84A5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FE141E84-1AA3-4136-BD7C-318AE7BD84A5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{F66A5BFE-7C81-44DC-97DE-3FD8C83B8F06} = {B72FFAAE-7801-41B2-86F5-FD90E97A30F7}
Expand All @@ -230,5 +258,8 @@ Global
{A0FB44F5-15E5-47C8-81E5-1991269849CB} = {7F80222A-2687-4E91-8F5B-4717DD13DEF5}
{8D9867A9-1B5D-4AD3-A890-ACC81D011C00} = {76721F1E-851C-4970-A276-DF61FCE3DA23}
{C899EB07-FEC8-41F8-95DC-203DA80F5A32} = {76721F1E-851C-4970-A276-DF61FCE3DA23}
{28B7FEDA-9F91-49D3-990B-1E4D7B71932A} = {CF54228E-4DC5-4CE4-B5BC-B9A5D438B03F}
{28EBD7E4-6130-4444-B44F-BB741172A233} = {CF54228E-4DC5-4CE4-B5BC-B9A5D438B03F}
{FE141E84-1AA3-4136-BD7C-318AE7BD84A5} = {EDC5EB0D-EE6D-489A-B0B0-DA85ECDF04B2}
EndGlobalSection
EndGlobal
2 changes: 1 addition & 1 deletion equinox-patterns/Domain/Period.fs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Result<'request, 'result> =
/// 1. Streams must open with a BroughtForward event (obtained via Rules.getIncomingBalance if this is an uninitialized Period)
/// 2. (If the Period has not closed) Rules.decide gets to map the request to events and a residual
/// 3. Rules.decideCarryForward may trigger the closing of the Period based on the residual and/or the State by emitting Some balance
let decideIngestWithCarryForward rules req s : Async<Result<'result, 'req> * Events.Event list> = async {
let decideIngestWithCarryForward rules req s : Async<Result<'req, 'result> * Events.Event list> = async {
let acc = Accumulator(s, Fold.fold)
do! acc.TransactAsync(Fold.maybeOpen rules.getIncomingBalance)
let residual, result = acc.Transact(Fold.tryIngest rules.decideIngestion req)
Expand Down
11 changes: 6 additions & 5 deletions equinox-web/Web/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ let createWebHostBuilder args : IWebHostBuilder =
[<EntryPoint>]
let main argv =
try Log.Logger <- LoggerConfiguration().Configure().CreateLogger()
createWebHostBuilder(argv).Build().Run()
0
with e ->
eprintfn "%s" e.Message
1
try createWebHostBuilder(argv).Build().Run()
0
with e ->
Log.Fatal(e, "Application Startup failed")
1
finally Log.CloseAndFlush()
19 changes: 19 additions & 0 deletions feed-consumer/.template.config/template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"$schema": "http://json.schemastore.org/template",
"author": "@jet @bartelink",
"classifications": [
"Equinox",
"Propulsion",
"Event Sourcing",
"Feed"
],
"tags": {
"language": "F#",
"type": "project"
},
"identity": "FeedConsumer",
"name": "Propulsion FeedConsumer",
"shortName": "feedConsumer",
"sourceName": "FeedConsumerTemplate",
"preferNameDirectory": true
}
83 changes: 83 additions & 0 deletions feed-consumer/ApiClient.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
module FeedConsumerTemplate.ApiClient

open FSharp.UMX
open System.Net.Http

open FeedConsumerTemplate.Domain

(* The feed presents a Tranche (series of epochs) per FC *)

module TrancheId =

let toFcId (x : Propulsion.Feed.TrancheId) : FcId = %x
let ofFcId (x : FcId) : Propulsion.Feed.TrancheId = %x

type TicketsEpochId = int<ticketsEpochId>
and [<Measure>] ticketsEpochId

[<NoComparison; NoEquality>]
type TicketsTranchesDto = { activeEpochs : TrancheReferenceDto[] }
and TrancheReferenceDto = { fc : FcId; epochId : TicketsEpochId }

(* Each Tranche response includes a checkpoint, which can be presented to Poll in order to resume consumption *)

type TicketsCheckpoint = int64<ticketsCheckpoint>
and [<Measure>] ticketsCheckpoint
module TicketsCheckpoint =
let ofPosition (x : Propulsion.Feed.Position) : TicketsCheckpoint = %x
let toPosition (x : TicketsCheckpoint) : Propulsion.Feed.Position = %x
let toStreamIndex (x : TicketsCheckpoint) : int64 = %x

type SliceDto = { closed : bool; tickets : TicketId[]; position : TicketsCheckpoint; checkpoint : TicketsCheckpoint }

type Session(client: HttpClient) =

member _.Send(req : HttpRequestMessage) : Async<HttpResponseMessage> =
client.Send(req)

type TicketsClient(session: Session) =

let basePath = "api/tickets"

member _.ActiveFcs() : Async<FcId[]> = async {
let request = HttpReq.get () |> HttpReq.withPath basePath
let! response = session.Send request
let! body = response |> HttpRes.deserializeOkJsonNet<TicketsTranchesDto>
return [| for f in body.activeEpochs -> f.fc |]
}

member _.ReadPage(fc : FcId, index : int) : Async<SliceDto> = async {
let request = HttpReq.post () |> HttpReq.withPathf "%s/%O/%d" basePath fc index
let! response = session.Send request
return! response |> HttpRes.deserializeOkJsonNet<SliceDto>
}

member _.Poll(fc : FcId, checkpoint: TicketsCheckpoint) : Async<SliceDto> = async {
let request = HttpReq.create () |> HttpReq.withPathf "%s/%O/slice/%O" basePath fc checkpoint
let! response = session.Send request
return! response |> HttpRes.deserializeOkJsonNet<SliceDto>
}

type Session with

member session.Tickets = TicketsClient session

type TicketsFeed(baseUri) =

let client = new HttpClient(BaseAddress = baseUri)
let tickets = Session(client).Tickets

// TODO add retries - consumer loop will abort if this throws
member _.Poll(trancheId, pos) : Async<Propulsion.Feed.Page<byte[]>> = async {
let checkpoint = TicketsCheckpoint.ofPosition pos
let! pg = tickets.Poll(TrancheId.toFcId trancheId, checkpoint)
let baseIndex = TicketsCheckpoint.toStreamIndex pg.position
let items = pg.tickets |> Array.mapi (fun i -> Ingester.PipelineEvent.ofIndexAndTicketId (baseIndex + int64 i))
return { checkpoint = TicketsCheckpoint.toPosition pg.checkpoint; items = items; isTail = not pg.closed }
}

// TODO add retries - consumer loop will not commence if this emits an exception
member _.ReadTranches() : Async<Propulsion.Feed.TrancheId[]> = async {
let! activeFcs = tickets.ActiveFcs()
return [| for f in activeFcs -> TrancheId.ofFcId f |]
}
28 changes: 28 additions & 0 deletions feed-consumer/FeedConsumer.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<WarningLevel>5</WarningLevel>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
</PropertyGroup>

<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Types.fs" />
<Compile Include="Ingester.fs" />
<Compile Include="ApiClient.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="FSharp.Core" Version="4.7.1" />
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.1.1-dev-00033" />
<PackageReference Include="Propulsion.Feed" Version="[2.10.0-rc10]" />
<PackageReference Include="Equinox.CosmosStore" Version="3.0.0-beta.4" />
<PackageReference Include="Propulsion.CosmosStore" Version="[2.10.0-rc10]" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
</ItemGroup>

</Project>
Loading

0 comments on commit b07d244

Please sign in to comment.