Skip to content

Commit

Permalink
Match exception by type in decider (#96)
Browse files Browse the repository at this point in the history
* Match exception by type in decider

Resolves #69

* Self-review fixes
  • Loading branch information
s-vitaliy authored Aug 13, 2024
1 parent 19fefc2 commit 52b1351
Showing 1 changed file with 73 additions and 63 deletions.
136 changes: 73 additions & 63 deletions src/Sources/RestApi/RestApiSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ public static RestApiSource Create(
{
return new RestApiSource(uriProvider, headerAuthenticatedMessageProvider, isBackfilling,
changeCaptureInterval,
lookBackInterval, httpRequestTimeout, stopAfterBackfill, rateLimitPolicy, apiSchema, responsePropertyKeyChain);
lookBackInterval, httpRequestTimeout, stopAfterBackfill, rateLimitPolicy, apiSchema,
responsePropertyKeyChain);
}

/// <summary>
Expand Down Expand Up @@ -200,7 +201,8 @@ public static RestApiSource Create(
{
return new RestApiSource(uriProvider, headerAuthenticatedMessageProvider, isBackfilling,
changeCaptureInterval,
lookBackInterval, httpRequestTimeout, stopAfterBackfill, rateLimitPolicy, apiSchema, responsePropertyKeyChain);
lookBackInterval, httpRequestTimeout, stopAfterBackfill, rateLimitPolicy, apiSchema,
responsePropertyKeyChain);
}

/// <summary>
Expand Down Expand Up @@ -350,11 +352,11 @@ public SourceLogic(RestApiSource source) : base(source.changeCaptureInterval, so
Timeout = this.source.httpRequestTimeout
};

this.decider = Decider.From((ex) => ex.GetType().Name switch
this.decider = Decider.From(ex => ex switch
{
nameof(IOException) => Directive.Restart,
nameof(TimeoutException) => Directive.Restart,
nameof(HttpRequestException) => Directive.Restart,
IOException => Directive.Restart,
TimeoutException => Directive.Restart,
HttpRequestException => Directive.Restart,
_ => Directive.Stop
});

Expand Down Expand Up @@ -390,6 +392,11 @@ public override void PreStart()
}
}

protected override void OnTimer(object timerKey)
{
this.PullChanges();
}

private void OnRecordReceived(Task<Option<JsonElement>> readTask)
{
if (readTask.IsFaulted || readTask.IsCanceled)
Expand Down Expand Up @@ -434,74 +441,77 @@ private void OnRecordReceived(Task<Option<JsonElement>> readTask)
}
}

private void PullChanges()
{
this.source.rateLimitPolicy.ExecuteAsync(() => this.source.authenticatedMessageProvider
private void PullChanges() =>
this.source.rateLimitPolicy.ExecuteAsync(this.SendRequestOnce)
.TryMap(result => result, HandleException)
.ContinueWith(this.responseReceived);

private Task<Option<JsonElement>> SendRequestOnce() => this.source
.authenticatedMessageProvider
.GetAuthenticatedMessage(this.httpClient)
.Map(msg =>
{
this.Log.Debug("Successfully authenticated");
var (maybeNextUri, requestMethod, maybePayload) = this.source.uriProvider.GetNextResultUri(
this.currentResponse, this.IsRunningInBackfillMode, this.source.lookBackInterval,
this.ChangeCaptureInterval);
.Map(this.SendRequest)
.Flatten();

if (maybeNextUri.IsEmpty)
{
return Task.FromResult(Option<JsonElement>.None);
}
private Task<Option<JsonElement>> SendRequest(HttpRequestMessage msg)
{
this.Log.Debug("Successfully authenticated");
var (maybeNextUri, requestMethod, maybePayload) = this.source.uriProvider.GetNextResultUri(
this.currentResponse, this.IsRunningInBackfillMode, this.source.lookBackInterval,
this.ChangeCaptureInterval);

if (maybeNextUri.IsEmpty)
{
return Task.FromResult(Option<JsonElement>.None);
}

msg.RequestUri = maybeNextUri.Value;
msg.Method = requestMethod;
msg.RequestUri = maybeNextUri.Value;
msg.Method = requestMethod;

if (maybePayload.HasValue)
if (maybePayload.HasValue)
{
msg.Content = new StringContent(maybePayload.Value);
if (!string.IsNullOrEmpty(maybePayload.Value))
{
this.Log.Info($"Request payload for next result: {maybePayload.Value}");
}
}

this.Log.Info($"Requesting next result from {msg.RequestUri}");

return this.httpClient.SendAsync(msg, default(CancellationToken))
.Map(response =>
{
if (response.IsSuccessStatusCode)
{
msg.Content = new StringContent(maybePayload.Value);
if (!string.IsNullOrEmpty(maybePayload.Value))
this.currentResponse = response;
return response.Content.ReadAsStringAsync().Map(value =>
{
this.Log.Info($"Request payload for next result: {maybePayload.Value}");
}
this.Log.Debug($"Got response: {value}");
return JsonSerializer.Deserialize<JsonElement>(value).AsOption();
});
}
this.Log.Info($"Requesting next result from {msg.RequestUri}");
var errorMsg =
$"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}";
return this.httpClient.SendAsync(msg, default(CancellationToken))
.Map(response =>
{
if (response.IsSuccessStatusCode)
{
this.currentResponse = response;
return response.Content.ReadAsStringAsync().Map(value =>
{
this.Log.Debug($"Got response: {value}");
return JsonSerializer.Deserialize<JsonElement>(value).AsOption();
});
}
var errorMsg =
$"API request to {msg.RequestUri} failed with {response.StatusCode}, reason: {response.ReasonPhrase}, content: {response.Content.ReadAsStringAsync().ConfigureAwait(false).GetAwaiter().GetResult()}";
this.Log.Warning(errorMsg);
throw new HttpRequestException(errorMsg, null, response.StatusCode);
}).Flatten();
}).Flatten()).TryMap(result => result, exception => exception switch
{
RateLimitRejectedException => Option<JsonElement>.None, // configured rate limit
HttpRequestException
{
StatusCode: HttpStatusCode.TooManyRequests
} => Option<JsonElement>.None, // API rate limit, in case configured rate limit is not good enough
HttpRequestException
{
StatusCode: HttpStatusCode.RequestTimeout
} => Option<JsonElement>.None, // Potential server-side timeout due to overload
_ => throw exception
}).ContinueWith(this.responseReceived);
this.Log.Warning(errorMsg);
throw new HttpRequestException(errorMsg, null, response.StatusCode);
}).Flatten();
}

protected override void OnTimer(object timerKey)
private static Option<JsonElement> HandleException(Exception exception) => exception switch
{
this.PullChanges();
}
RateLimitRejectedException => Option<JsonElement>.None, // configured rate limit
HttpRequestException
{
StatusCode: HttpStatusCode.TooManyRequests
} => Option<JsonElement>.None, // API rate limit, in case configured rate limit is not good enough
HttpRequestException
{
StatusCode: HttpStatusCode.RequestTimeout
} => Option<JsonElement>.None, // Potential server-side timeout due to overload
_ => throw exception
};
}
}

0 comments on commit 52b1351

Please sign in to comment.