Commit
impl
bartelink committed Aug 20, 2020
1 parent 9a01366 commit 5126c6f
Showing 4 changed files with 160 additions and 70 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `Cosmos`: `Prune` API to delete events from the head of a stream [#226](https://github.com/jet/equinox/pull/226)

### Changed

- target `Microsoft.Azure.Cosmos` v `3.9.0` (instead of `Microsoft.Azure.DocumentDB`[`.Core`] v 2.x) [#144](https://github.com/jet/equinox/pull/144)
171 changes: 115 additions & 56 deletions src/Equinox.Cosmos/Cosmos.fs
@@ -226,11 +226,19 @@ module Log =
/// Summarizes a set of Responses for a given Read request
| Query of Direction * responses: int * Measurement
/// Individual read request in a Batch
/// Charges are rolled up into Query (so do not double count)
| Response of Direction * Measurement
| SyncSuccess of Measurement
| SyncResync of Measurement
| SyncConflict of Measurement
| DelResponse of Measurement
/// Handled response from listing of batches in a stream
/// Charges are rolled up into Prune (so do not double count)
| PruneResponse of Measurement
/// Deleted an individual Batch
| Delete of Measurement
/// Pruned batches from head of a stream
/// Count in Measurement is number of PruneResponses handled
| Prune of eventCount: int * batchCount : int * Measurement
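// Illustrative composition of the new events (hypothetical numbers, a sketch rather than output of this commit):
// a prune that pages through the stream 3 times and removes 5 batches would emit
//   5 x Delete        - one per deleted Batch, each carrying that delete's own RU charge
//   3 x PruneResponse - one per query page; its RU is folded into Prune, so it is not counted again
//   1 x Prune         - the rolled-up totals (Measurement.count = 3 responses handled, ru = summed query RU)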
let prop name value (log : ILogger) = log.ForContext(name, value)
let propData name (events: #IEventData<byte[]> seq) (log : ILogger) =
let render = function null -> "null" | bytes -> System.Text.Encoding.UTF8.GetString bytes
@@ -266,16 +274,20 @@ module Log =
module Stats =
let inline (|Stats|) ({ interval = i; ru = ru }: Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds

let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|) = function
let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|CosmosDeleteRc|CosmosPruneRc|) = function
| Tip (Stats s)
| TipNotFound (Stats s)
| TipNotModified (Stats s)
| Query (_,_, (Stats s)) -> CosmosReadRc s
// slices are rolled up into batches so be sure not to double-count
| Response (_,(Stats s)) -> CosmosResponseRc s
| Response (_,(Stats s))
// costs roll up into Prune operation so be sure not to double-count
| PruneResponse (Stats s) -> CosmosResponseRc s
| SyncSuccess (Stats s)
| SyncConflict (Stats s) -> CosmosWriteRc s
| SyncResync (Stats s) -> CosmosResyncRc s
| Delete (Stats s) -> CosmosDeleteRc s
| Prune (_, _, (Stats s)) -> CosmosPruneRc s
let (|SerilogScalar|_|) : LogEventPropertyValue -> obj option = function
| (:? ScalarValue as x) -> Some x.Value
| _ -> None
@@ -295,10 +307,14 @@ module Log =
static member val Read = Counter.Create() with get, set
static member val Write = Counter.Create() with get, set
static member val Resync = Counter.Create() with get, set
static member val Delete = Counter.Create() with get, set
static member val Prune = Counter.Create() with get, set
static member Restart() =
LogSink.Read <- Counter.Create()
LogSink.Write <- Counter.Create()
LogSink.Resync <- Counter.Create()
LogSink.Delete <- Counter.Create()
LogSink.Prune <- Counter.Create()
let span = epoch.Elapsed
epoch.Restart()
span
@@ -307,6 +323,9 @@
| CosmosMetric (CosmosReadRc stats) -> LogSink.Read.Ingest stats
| CosmosMetric (CosmosWriteRc stats) -> LogSink.Write.Ingest stats
| CosmosMetric (CosmosResyncRc stats) -> LogSink.Resync.Ingest stats
| CosmosMetric (CosmosDeleteRc stats) -> LogSink.Delete.Ingest stats
| CosmosMetric (CosmosPruneRc stats) -> LogSink.Prune.Ingest stats
| CosmosMetric (CosmosResponseRc _) -> () // Costs are already included in others
| _ -> ()

/// Relies on feeding of metrics from Log through to Stats.LogSink
@@ -315,7 +334,9 @@ module Log =
let stats =
[ "Read", Stats.LogSink.Read
"Write", Stats.LogSink.Write
"Resync", Stats.LogSink.Resync ]
"Resync", Stats.LogSink.Resync
"Delete", Stats.LogSink.Delete
"Prune", Stats.LogSink.Prune ]
let mutable rows, totalCount, totalRc, totalMs = 0, 0L, 0., 0L
let logActivity name count rc lat =
log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms",
@@ -752,67 +773,106 @@ module internal Tip =

// Manages deletion of batches
// Note: it's critical that we delete individually, in the correct order so as not to leave gaps
// Note: module is public so query results can be deserialized into BatchIndices
module Delete =

// If we have results: []
// - deleteBefore 9 would: return 0,0,0

// If we have results: ["-1",10,10; "0",0,1; "2",1,3; "3",3,8; "8",8,10]
// - deleteBefore 3 would: inspect first 3, delete 2, return 3,0,3
// - deleteBefore 4 would: inspect first 4, delete 2, return 3,1,3
// - deleteBefore 8 would: inspect first 4, delete 2, return 8,0,8

// If we have results: ["-1",10,10; "8",8,10]
// - deleteBefore 3 would: inspect first 2, delete 0, return 0,0,8
// - deleteBefore 8 would: inspect first 2, delete 0, return 0,0,8
// - deleteBefore 9 would: inspect first 2, delete 0, return 0,1,8
// - deleteBefore 10 would: inspect first 2, delete 1, return 2,0,10
// - deleteBefore 11 would: inspect first 2, delete 1, return 2,0,10

// If we have results: ["-1",10,10]
// - deleteBefore 9 would: inspect first 1, delete 0, return 0,0,10
// - deleteBefore 10 would: inspect first 1, delete 0, return 0,0,10
// - deleteBefore 11 would: inspect first 1, delete 0, return 0,0,10
type BatchIndices = { id : string; i : int; n : int }
open FSharp.Control

type BatchIndices = { id : string; i : int64; n : int64 }

let pruneBefore (log: ILogger) (container: Container, stream: string) maxItems beforePos : Async<int * int * int64> = async {
let! ct = Async.CancellationToken
let log = log |> Log.prop "stream" stream
let deleteItem id count : Async<float> = async {
let! t, res = container.DeleteItemAsync(id, PartitionKey stream, cancellationToken=ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
let rc, ms = res.RequestCharge, (let e = t.Elapsed in e.TotalMilliseconds)
let reqMetric : Log.Measurement = { stream = stream; interval = t; bytes = -1; count = count; ru = rc }
let log = let evt = Log.Delete reqMetric in log |> Log.event evt
log.Information("EqxCosmos {action:l} {id} {ms}ms rc={ru}", "Delete", id, ms, rc)
return res.RequestCharge
}
let log = log |> Log.prop "beforePos" beforePos
let query : FeedIterator<BatchIndices> =
let qro = QueryRequestOptions(PartitionKey=Nullable(PartitionKey stream), MaxItemCount=Nullable maxItems)
container.GetItemQueryIterator<_>(QueryDefinition "SELECT c.id, c.i, c.n FROM c", requestOptions=qro)
let tryReadNextPage (x : FeedIterator<_>) = async {
if not x.HasMoreResults then return None else

let! ct = Async.CancellationToken
let! t, (res : FeedResponse<_>) = query.ReadNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
let batches, rc, ms = Array.ofSeq res, res.RequestCharge, (let e = t.Elapsed in e.TotalMilliseconds)
let reqMetric : Log.Measurement = { stream = stream; interval = t; bytes = -1; count = batches.Length; ru = rc }
let next = Array.tryLast batches |> Option.map (fun x -> x.n) |> Option.toNullable
let log = let evt = Log.DelResponse reqMetric in log |> Log.event evt |> Log.prop "beforePos" beforePos
log.Information("EqxCosmos {action:l} {batches} {ms}ms n={next} rc={ru}", "DelResponse", batches.Length, ms, next, rc)
return Some ((t, rc, batches), x)
let reqMetric : Log.Measurement = { stream = stream; interval = t; bytes = -1; count = batches.Length; ru = rc }
let log = let evt = Log.PruneResponse reqMetric in log |> Log.event evt
log.Information("EqxCosmos {action:l} {batches} {ms}ms n={next} rc={ru}", "PruneResponse", batches.Length, ms, next, rc)
return Some ((rc, batches), x)
}
let! outcomes =
// If we have results: []
// - deleteBefore 9 would: return 0,0,0

// If we have results: ["-1",10,10; "0",0,1; "2",1,3; "3",3,8; "8",8,10]
// - deleteBefore 3 would: inspect first 3, delete 2, return 3,0,3
// - deleteBefore 4 would: inspect first 4, delete 2, return 3,1,3
// - deleteBefore 8 would: inspect first 4, delete 2, return 8,0,8

// If we have results: ["-1",10,10; "8",8,10]
// - deleteBefore 3 would: inspect first 2, delete 0, return 0,0,8
// - deleteBefore 8 would: inspect first 2, delete 0, return 0,0,8
// - deleteBefore 9 would: inspect first 2, delete 0, return 0,1,8
// - deleteBefore 10 would: inspect first 2, delete 1, return 2,0,10
// - deleteBefore 11 would: inspect first 2, delete 1, return 2,0,10

// If we have results: ["-1",10,10]
// - deleteBefore 9 would: inspect first 1, delete 0, return 0,0,10
// - deleteBefore 10 would: inspect first 1, delete 0, return 0,0,10
// - deleteBefore 11 would: inspect first 1, delete 0, return 0,0,10
let! pt, outcomes =
let isTip (x : BatchIndices) = x.id = Tip.WellKnownDocumentId
let isValid x = isTip x || x.n <= beforePos
let hasValidItems (_, _, batches) = batches |> Array.exists isValid
let handle (t, rc, batches : BatchIndices[]) = async {
let mutable lwm = 0
for x in batches |> Seq.takeWhile isValid do
lwm <- min lwm x.n
if (not << isTip) x then
do!

let lwm = batches |> Seq.min
return t, rc
let isRelevant x = isTip x || x.i < beforePos
let hasRelevantItems (_, batches) = batches |> Array.exists isRelevant
let handle (rc, batches : BatchIndices[]) = async {
let mutable delCharges, batchesDeleted, eventsDeleted, eventsDeferred = 0., 0, 0, 0
let mutable tipI, lwm = None, None
for x in batches |> Seq.takeWhile isRelevant do
let count = x.n - x.i |> int
if isTip x then
tipI <- Some x.i
elif x.n > beforePos then
eventsDeferred <- eventsDeferred + min count (int (beforePos - x.i))
lwm <- Some x.i
else
let! charge = deleteItem x.id count
delCharges <- delCharges + charge
batchesDeleted <- batchesDeleted + 1
eventsDeleted <- eventsDeleted + count
lwm <- Some x.n
return rc, (tipI, lwm), (delCharges, batchesDeleted, eventsDeleted, eventsDeferred)
}
AsyncSeq.unfoldAsync tryReadNextPage query
|> AsyncSeq.takeWhile hasValidItems
|> AsyncSeq.takeWhile hasRelevantItems
|> AsyncSeq.mapAsync handle
|> AsyncSeq.toArrayAsync
let index = if count = 0 then Nullable () else Nullable <| Seq.min (seq { for x in batches -> x.i })
(log |> (match startPos with Some pos -> Log.propStartPos pos | None -> id) |> Log.prop "bytes" bytes)
.Information("EqxCosmos {action:l} {count}/{batches} {direction} {ms}ms i={index} rc={ru}",
"Delete", count, batches.Length, direction, (let e = t.Elapsed in e.TotalMilliseconds), index, ru)
return ()
|> Stopwatch.Time
let mutable queryCharges, delCharges, responses, batchesDeleted, eventsDeleted, eventsDeferred = 0., 0., 0, 0, 0, 0
let mutable lwm, tipI = None, None
for qc, (bTipI, bLwm), (dc, bDel, eDel, eDef) in outcomes do
lwm <- max lwm bLwm
tipI <- max tipI bTipI
queryCharges <- queryCharges + qc
delCharges <- delCharges + dc
responses <- responses + 1
batchesDeleted <- batchesDeleted + bDel
eventsDeleted <- eventsDeleted + eDel
eventsDeferred <- eventsDeferred + eDef
let reqMetric : Log.Measurement = { stream = stream; interval = pt; bytes = -1; count = responses; ru = queryCharges }
let log = let evt = Log.Prune (eventsDeleted, batchesDeleted, reqMetric) in log |> Log.event evt
let lwm =
match lwm, tipI with
| Some lwm, _ -> lwm // we saw a batch and identified a Low Water mark based on it
| None, Some tipI -> tipI // we saw the Tip, but no batches along the way, therefore its i is the low water mark
| None, None -> 0L // If we've seen no batches at all, then the write position is 0L
log.Information("EqxCosmos {action:l} {events}/{batches} lwm={lwm} {ms}ms queryRu={ru} deleteRu={delRu}",
"Prune", eventsDeleted, batchesDeleted, lwm, (let e = pt.Elapsed in e.TotalMilliseconds), queryCharges, delCharges)
return eventsDeleted, eventsDeferred, lwm
}
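// Illustrative worked sketch of the result triple, following the "deleteBefore 4" example above
// (batches ["-1",10,10; "0",0,1; "2",1,3; "3",3,8; "8",8,10]; the maxItems of 10 is hypothetical):
//   let! deleted, deferred, lwm = Delete.pruneBefore log (container, stream) 10 4L
//   // deleted  = 3 -> batch "0" (event 0) and batch "2" (events 1-2) are removed outright
//   // deferred = 1 -> event 3 is below the cutoff but lives in batch "3" (i=3, n=8), which is kept whole
//   // lwm      = 3 -> the lowest surviving index, i.e. the i of the first retained batch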

type [<NoComparison>] Token = { container: Container; stream: string; pos: Position }
@@ -919,9 +979,8 @@ type Gateway(conn : Connection, batching : BatchingPolicy) =
| Sync.Result.Conflict (pos',events) -> return InternalSyncResult.Conflict (Token.create containerStream pos',events)
| Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create containerStream pos')
| Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create containerStream pos') }
member __.Prune(log, container, stream, beforeIndex) = async {
return! Delete.pruneBefore log (container, stream) maxItems
}
member __.Prune(log, (container, stream), beforeIndex) =
Delete.pruneBefore log (container, stream) batching.MaxItems beforeIndex

type private Category<'event, 'state, 'context>(gateway : Gateway, codec : IEventCodec<'event,byte[],'context>) =
let (|TryDecodeFold|) (fold: 'state -> 'event seq -> 'state) initial (events: ITimelineEvent<byte[]> seq) : 'state = Seq.choose codec.TryDecode events |> fold initial
@@ -981,7 +1040,7 @@ module Caching =
(slidingExpiration : TimeSpan)
(category : ICategory<'event, 'state, Container*string, 'context>)
: ICategory<'event, 'state, Container*string, 'context> =
let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let mkCacheEntry (initialToken : StreamToken, initialState : 'state) = CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let options = CacheItemOptions.RelativeExpiration slidingExpiration
let addOrUpdateSlidingExpirationCacheEntry streamName value = cache.UpdateIfNewer(prefix + streamName, options, mkCacheEntry value)
CategoryTee<'event, 'state, 'context>(category, addOrUpdateSlidingExpirationCacheEntry) :> _
@@ -1327,8 +1386,8 @@ type Context
| AppendResult.Ok token -> return token
| x -> return x |> sprintf "Conflict despite it being disabled %A" |> invalidOp }

member __.Prune(stream, beforePosition) : Async<int * int * int64> =
gateway.Prune(stream, beforePosition)
member __.Prune((container,stream), beforeIndex) : Async<int * int * int64> =
gateway.Prune(log, (container,stream), beforeIndex)

/// Provides mechanisms for building `EventData` records to be supplied to the `Events` API
type EventData() =
@@ -1387,9 +1446,9 @@ module Events =

/// Requests deletion of events prior to the specified Index
/// Due to the need to preserve ordering of data in the stream, only full batches will be removed
/// Returns count of events deleted this time, events that were retained due to partial batches, and the updated lowest sequence number
let prune (ctx: Context) (streamName: string) (index: int64): Async<int * int * int64> =
ctx.Prune(ctx.CreateStream streamName, Position.fromI index)
/// Returns count of events deleted this time, events that could not be deleted due to partial batches, and the stream's lowest remaining sequence number
let prune (ctx: Context) (streamName: string) (beforeIndex: int64): Async<int * int * int64> =
ctx.Prune(ctx.CreateStream streamName, beforeIndex)
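// Illustrative usage sketch of the new prune API; the stream name and index below are hypothetical,
// and `ctx` is assumed to be an already-constructed Context:
//   async {
//       let! deleted, deferred, lowestRemaining = Events.prune ctx "Favorites-clientA" 42L
//       // deleted         : events physically removed by this call
//       // deferred        : events below index 42 retained because their Batch spans the boundary
//       // lowestRemaining : the stream's lowest surviving sequence number
//       ... }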

/// Returns an async sequence of events in the stream backwards starting from the specified sequence number,
/// reading in batches of the specified size.