Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FsCodec.StreamName #31

Merged
merged 1 commit into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
### Added

- Tutorial and Documentation re `IEventCodec` [#29](https:/jet/FsCodec/pull/29)
- `StreamName`, with associated helper module [#31](https:/jet/FsCodec/pull/31)

### Changed

Expand Down
167 changes: 124 additions & 43 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The components within this repository are delivered as multi-targeted Nuget pack
- No dependencies.
- [`FsCodec.IEventCodec`](https:/jet/FsCodec/blob/master/src/FsCodec/FsCodec.fs#L19): defines a base interface for serializers.
- [`FsCodec.Codec`](https:/jet/FsCodec/blob/master/src/FsCodec/Codec.fs#L5): enables plugging in a serializer and/or Union Encoder of your choice (typically this is used to supply a pair `encode` and `tryDecode` functions)
- [`FsCodec.StreamName`](https:/jet/FsCodec/blob/master/src/FsCodec/StreamName.fs): strongly-typed wrapper for a Stream Name, together with factory functions and active patterns for parsing same
- [![Newtonsoft.Json Codec NuGet](https://img.shields.io/nuget/v/FsCodec.NewtonsoftJson.svg)](https://www.nuget.org/packages/FsCodec.NewtonsoftJson/) `FsCodec.NewtonsoftJson`: As described in [a scheme for the serializing Events modelled as an F# Discriminated Union](https://eiriktsarpalis.wordpress.com/2018/10/30/a-contract-pattern-for-schemaless-datastores/), enabled tagging of F# Discriminated Union cases in a versionable manner with low-dependencies using [TypeShape](https:/eiriktsarpalis/TypeShape)'s [`UnionContractEncoder`](https://eiriktsarpalis.wordpress.com/2018/10/30/a-contract-pattern-for-schemaless-datastores)
- Uses the ubiquitous [`Newtonsoft.Json`](https:/JamesNK/Newtonsoft.Json) library to serialize the event bodies.
- Provides relevant Converters for common non-primitive types prevalent in F#
Expand Down Expand Up @@ -216,7 +217,7 @@ _See [tests/FsCodec.NewtonsoftJson.Tests/Examples.fsx](tests/FsCodec.NewtonsoftJ
<a name="IEventCodec"></a>
## [`FsCodec.IEventCodec`](https:/jet/FsCodec/blob/master/src/FsCodec/FsCodec.fs#L31)

```
```fsharp
/// Defines a contract interpreter that encodes and/or decodes events representing the known set of events borne by a stream category
type IEventCodec<'Event, 'Format, 'Context> =
/// Encodes a <c>'Event</c> instance into a <c>'Format</c> representation
Expand All @@ -241,7 +242,7 @@ type IEventCodec<'Event, 'Format, 'Context> =

Pending and timeline Events share the following common contract:

```
```fsharp
/// Common form for either a Domain Event or an Unfolded Event, without any context regarding its place in the timeline of events
type IEventData<'Format> =
/// The Event Type, used to drive deserialization
Expand All @@ -264,7 +265,7 @@ type IEventData<'Format> =

Events from a versioned feed and/or being loaded from an Event Store bring additional context beyond the base information in [IEventData](#IEventData)

```
```fsharp
/// Represents a Domain Event or Unfold, together with it's 0-based <c>Index</c> in the event sequence
type ITimelineEvent<'Format> =
inherit IEventData<'Format>
Expand All @@ -278,14 +279,11 @@ type ITimelineEvent<'Format> =

See [a scheme for the serializing Events modelled as an F# Discriminated Union](https://eiriktsarpalis.wordpress.com/2018/10/30/a-contract-pattern-for-schemaless-datastores/) for details of the representation scheme used for the events when using `FsCodec.NewtonsoftJson.Codec.Create`. We'll use the following example contract for the illustration:

```
```fsharp
module Events =

// By convention, each contract defines a 'category' used as the first part of the stream name (e.g. `"Favorites-ClientA"`)
let [<Literal>] categoryId = "Favorites"
// The second part of the stream name is the ClientId; here we define an Active Pattern to enable easy decoding of this portion into a UMX type
// See (umx, below)
let (|ClientId|) = ClientId.parse
let [<Literal>] CategoryId = "Favorites"

type Added = { item : string }
type Removed = { name: string }
Expand All @@ -296,7 +294,7 @@ module Events =

let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

// See "logging unmatched events" later in this section for information about EventCodec
// See "logging unmatched events" later in this section for information about this wrapping using an EventCodec helper
let (|Decode|_|) stream = EventCodec.tryDecode codec Serilog.Log.Logger stream
```

Expand All @@ -305,14 +303,15 @@ module Events =

The example event stream contract above uses a `ClientId` type which (while being a string at heart) represents the identifier for a specific entity. We use the `FSharp.UMX` library that leans on the F# units of measure feature to tag the strings such that they can't be confused with other identifiers - think of it as a type alias on steroids.

```
```fsharp
open FSharp.UMX

type [<Measure>] clientId
type ClientId = string<clientId>
module ClientId =
let parse (str : string) : ClientId = % str
let toString (value : ClientId) : string = % value
let (|Parse|) = ClientId.parse
```

## Stream naming conventions
Expand All @@ -327,44 +326,89 @@ Where:
- `-` (hyphen/minus) represents the by-convention canonical stream name separator
- `{Identifier}` represents the identifier of the aggregate [root] for which we're maintaining the data

In F#, the following set of helpers are useful for splitting and filtering Stream Names by Categories and/or Identifiers. Similar helpers would of course make sense in other languages e.g. C#:
[`FsCodec.StreamName`](https:/jet/FsCodec/blob/master/src/FsCodec/StreamName.fs): presents the following set of helpers that are useful for splitting and filtering Stream Names by Categories and/or Identifiers. Similar helpers would of course make sense in other languages e.g. C#:

```fsharp
// Type aliases for a type-tagged `string`
type [<Measure>] streamName
type StreamName = string<streamName>

```
module StreamName =

let private catSeparators = [|'-'|]
let private split (streamName : string) = streamName.Split(catSeparators, 2, StringSplitOptions.RemoveEmptyEntries)
let category (streamName : string) = let fragments = split streamName in fragments.[0]
let (|Category|Other|) (streamName : string) =
match split streamName with
| [| category; id |] -> Category (category, id)
| _ -> Other streamName
(* Creators: Building from constituent parts
Guards against malformed category, aggregateId and/or aggregateIdElements with exceptions *)

// Recommended way to specify a stream identifier; a category identifier and an aggregate identity
// category is separated from id by `-`
let create (category : string) aggregateId : StreamName = ...

// Composes a StreamName from a category and > 1 name elements.
// category is separated from the aggregateId by '-'; elements are separated from each other by '_'
let compose (category : string) (aggregateIdElements : string seq) : StreamName = ...

// Validates and maps a trusted Stream Name consisting of a Category and an Id separated by a '-` (dash)
// Throws <code>InvalidArgumentException</code> if it does not adhere to that form
let parse (rawStreamName : string) : StreamName = ...

(* Rendering *)


let toString (streamName : StreamName) : string = UMX.untag streamName

(* Parsing: Raw Stream name Validation functions/pattern that handle malformed cases without throwing *)

// Attempts to split a Stream Name in the form {category}-{id} into the two elements.
// Returns <code>None</code> if it does not adhere to that form.
let trySplitCategoryAndId (rawStreamName : string) : (string * string) option = ...

// Attempts to split a Stream Name in the form {category}-{id} into the two elements.
// Yields <code>NotCategorized</code> if it does not adhere to that form.
let (|Categorized|NotCategorized|) (rawStreamName : string) : Choice<string*string,unit> = ...

(* Splitting: functions/Active patterns for (i.e. generated via `parse`, `create` or `compose`) well-formed Stream Names
Will throw if presented with malformed strings [generated via alternate means] *)

// Splits a well-formed Stream Name of the form {category}-{id} into the two elements.
// Throws <code>InvalidArgumentException</code> if it does not adhere to the well known format (i.e. if it was not produced by `parse`).
// <remarks>Inverse of <code>create</code>
let splitCategoryAndId (streamName : StreamName) : string * string = ...
let (|CategoryAndId|) : StreamName -> (string * string) = splitCategoryAndId

// Splits a `_`-separated set of id elements (as formed by `compose`) into its (one or more) constituent elements.
// <remarks>Inverse of what <code>compose</code> does to the subElements
let (|IdElements|) (aggregateId : string) : string[] = ...

// Splits a well-formed Stream Name of the form {category}-{id1}_{id2}_{idN} into a pair of category and ids
// Throws <code>InvalidArgumentException</code> if it does not adhere to the well known format (i.e. if it was not produced by `parse`).
// <remarks>Inverse of <code>create</code>
let splitCategoryAndIds (streamName : StreamName) : string * string[] = ...
let (|CategoryAndIds|) : StreamName -> (string * string[]) = splitCategoryAndIds
```

## Decoding events

Given the following example events from across streams:

```
```fsharp
let utf8 (s : string) = System.Text.Encoding.UTF8.GetBytes(s)
let events = [
"Favorites-ClientA", FsCodec.Core.TimelineEvent.Create(0L, "Added", utf8 """{ "item": "a" }""")
"Favorites-ClientB", FsCodec.Core.TimelineEvent.Create(0L, "Added", utf8 """{ "item": "b" }""")
"Favorites-ClientA", FsCodec.Core.TimelineEvent.Create(1L, "Added", utf8 """{ "item": "b" }""")
"Favorites-ClientB", FsCodec.Core.TimelineEvent.Create(1L, "Added", utf8 """{ "item": "a" }""")
"Favorites-ClientB", FsCodec.Core.TimelineEvent.Create(2L, "Removed", utf8 """{ "item": "a" }""")
"Favorites-ClientB", FsCodec.Core.TimelineEvent.Create(3L, "Exported", utf8 """{ "count": 2 }""")
"Misc-x", FsCodec.Core.TimelineEvent.Create(0L, "Dummy", utf8 """{ "item": "z" }""")
StreamName.parse "Favorites-ClientA", Core.TimelineEvent.Create(0L, "Added", utf8 """{ "item": "a" }""")
StreamName.parse "Favorites-ClientB", Core.TimelineEvent.Create(0L, "Added", utf8 """{ "item": "b" }""")
StreamName.parse "Favorites-ClientA", Core.TimelineEvent.Create(1L, "Added", utf8 """{ "item": "b" }""")
StreamName.parse "Favorites-ClientB", Core.TimelineEvent.Create(1L, "Added", utf8 """{ "item": "a" }""")
StreamName.parse "Favorites-ClientB", Core.TimelineEvent.Create(2L, "Removed", utf8 """{ "item": "a" }""")
StreamName.create "Favorites" "ClientB", Core.TimelineEvent.Create(3L, "Exported", utf8 """{ "count": 2 }""")
StreamName.parse "Misc-x", Core.TimelineEvent.Create(0L, "Dummy", utf8 """{ "item": "z" }""")
]
```

and the helpers defined above, we can route and/or filter them as follows:

```
```fsharp
let runCodec () =
for stream, event in events do
match stream, event with
| StreamName.Category (Events.categoryId, ClientId id), (Events.Decode stream e) ->
| StreamName.Category (Events.CategoryId, ClientId.Parse id), (Events.Decode stream e) ->
printfn "Client %s, event %A" (ClientId.toString id) e
| StreamName.Category (cat, id), e ->
printfn "Unhandled Event: Category %s, Id %s, Index %d, Event: %A " cat id e.Index e.EventType
Expand All @@ -383,35 +427,70 @@ Client ClientB, event Removed {name = null;}
Unhandled Event: Category Favorites, Id ClientB, Index 3, Event: "Exported"
Unhandled Event: Category Misc, Id x, Index 0, Event: "Dummy"
```

There are two events that we were not able to decode, for varying reasons:

1. `"Misc-x", FsCodec.Core.TimelineEvent.Create(0L, "Dummy", utf8 """{ "item": "z" }""")` represents an Event that happens to pass through our event processor that we don't want to decode and/or handle - we don't need to define a contract type for
2. `"Favorites-ClientB", FsCodec.Core.TimelineEvent.Create(3L, "Exported", utf8 """{ "count": 2 }""")` represents an Event that has recently been defined in the source system, but not yet handled by the processor. In the event of such an unclassified event occurring within a stream contract we want to know when we're not handling a given event. That's trapped above and logged as `Unhandled Event: Category Favorites, Id ClientB, Index 3, Event: "Exported"`.
2. `"Favorites" "ClientB", FsCodec.Core.TimelineEvent.Create(3L, "Exported", utf8 """{ "count": 2 }""")` represents an Event that has recently been defined in the source system, but not yet handled by the processor. In the event of such an unclassified event occurring within a stream contract we want to know when we're not handling a given event. That's trapped above and logged as `Unhandled Event: Category Favorites, Id ClientB, Index 3, Event: "Exported"`.

_Note however, that we don't have a clean way to trap the data and log it. See [Logging unmatched events](#logging-unmatched-events) for an example of how one might log such unmatched events_

## Adding Matchers to the Event Contract

We can clarify the consuming code a little by adding further helper Active Patterns alongside the event contract :-

```fsharp
module Events =

// ... (as above)

// Pattern to determine whether a given {category}-{aggregateId} StreamName represents the stream associated with this Aggregate
// Yields a strongly typed id from the aggregateId if the Category does match
let (|MatchesCategory|_|) = function
| FsCodec.StreamName.CategoryAndId (CategoryId, ClientId.Parse clientId) -> Some clientId
| _ -> None

// ... (as above)

// Yields decoded event and relevant strongly typed ids if the category of the Stream Name is correct
let (|Match|_|) (streamName, span) =
match streamName, span with
| MatchesCategory clientId, (Decode streamName event) -> Some (clientId, event)
| _ -> None
```

That boxes off the complex pattern matching close to the contract itself, and lets us match on the events in a handler as follows:

```fsharp
let runCodecCleaner () =
for stream, event in events do
match stream, event with
| Events.Match (clientId, event) ->
printfn "Client %s, event %A" (ClientId.toString clientId) event
| StreamName.CategoryAndId (cat, id), e ->
printfn "Unhandled Event: Category %s, Id %s, Index %d, Event: %A " cat id e.Index e.EventType
```

## Logging unmatched events

The following helper (which uses the [`Serilog`](https:/serilog/serilog) library), can be used to selectively layer on some logging when run with logging upped to `Debug` level:

```
```fsharp
module EventCodec =

/// Uses the supplied codec to decode the supplied event record `x` (iff at LogEventLevel.Debug, detail fails to `log` citing the `stream` and content)
let tryDecode (codec : FsCodec.IEventCodec<_,_,_>) (log : Serilog.ILogger) (stream : string) (x : FsCodec.ITimelineEvent<byte[]>) =
// Uses the supplied codec to decode the supplied event record `x` (iff at LogEventLevel.Debug, detail fails to `log` citing the `stream` and content)
let tryDecode (codec : FsCodec.IEventCodec<_,_,_>) (log : Serilog.ILogger) streamName (x : FsCodec.ITimelineEvent<byte[]>) =
match codec.TryDecode x with
| None ->
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then
log.ForContext("event", System.Text.Encoding.UTF8.GetString(x.Data), true)
.Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, x.EventType, stream)
.Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, x.EventType, streamName)
None
| x -> x
```

Normally, the `log.IsEnabled` call instantly rules out any need for logging. We can activate this inert logging hook by reconfiguring the logging as follows:

```
```fsharp
// Switch on debug logging to get detailed information about events that don't match (which has no singificant perf cost when not switched on)
open Serilog
open Serilog.Events
Expand All @@ -438,7 +517,7 @@ For example, we may wish to log (or process as part of our domain logic) metadat

A clean way to wrap such a set of transitions is as follows:

```
```fsharp
module EventsWithMeta =

type EventWithMeta = int64 * DateTimeOffset * Events.Event
Expand All @@ -449,20 +528,22 @@ module EventsWithMeta =
event, None, Some timestamp
FsCodec.NewtonsoftJson.Codec.Create(up, down)
let (|Decode|_|) stream event : EventWithMeta option = EventCodec.tryDecode codec Serilog.Log.Logger stream event
let (|Match|_|) (streamName, span) =
match streamName, span with
| Events.MatchesCategory clientId, (Decode streamName event) -> Some (clientId, event)
| _ -> None
```

This allows us to tweak the `runCodec` above as follows to also surface additional contextual information:

```
```fsharp
let runWithContext () =
for stream, event in events do
match stream, event with
| StreamName.Category (Events.categoryId, Events.ClientId id), (EventsWithMeta.Decode stream (index, ts, e)) ->
printfn "Client %s index %d time %O event %A" (ClientId.toString id) index (ts.ToString "u") e
| StreamName.Category (cat, id), e ->
| EventsWithMeta.Match (clientId, (index, ts, e)) ->
printfn "Client %s index %d time %O event %A" (ClientId.toString clientId) index (ts.ToString "u") e
| FsCodec.StreamName.CategoryAndId (cat, id), e ->
printfn "Unhandled Event: Category %s, Id %s, Index %d, Event: %A " cat id e.Index e.EventType
| StreamName.Other streamName, _e ->
failwithf "Invalid Stream Name: %s" streamName
```

which yields the following output:
Expand Down
2 changes: 2 additions & 0 deletions src/FsCodec/FsCodec.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<ItemGroup>
<Compile Include="FsCodec.fs" />
<Compile Include="Codec.fs" />
<Compile Include="StreamName.fs" />
</ItemGroup>

<ItemGroup>
Expand All @@ -19,6 +20,7 @@

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' == 'net461' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<PackageReference Include="FSharp.UMX" Version="1.0.0" />
</ItemGroup>

</Project>
Loading