Skip to content

Commit

Permalink
fix: when redis adapter fails there is no retry (#255)
Browse files Browse the repository at this point in the history
  • Loading branch information
alouvion-aneo authored Feb 17, 2023
2 parents 648369e + f673d57 commit 70ea766
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 20 deletions.
79 changes: 60 additions & 19 deletions Adaptors/Redis/src/ObjectStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,22 @@ public class ObjectStorage : IObjectStorage
private readonly ILogger<ObjectStorage> logger_;
private readonly string objectStorageName_;
private readonly IDatabaseAsync redis_;
private readonly Options.Redis redisOptions_;

/// <summary>
/// <see cref="IObjectStorage" /> implementation for Redis
/// </summary>
/// <param name="redis">Connection to redis database</param>
/// <param name="redisOptions">Redis object storage options</param>
/// <param name="objectStorageName">Name of the object storage used to differentiate them</param>
/// <param name="logger">Logger used to print logs</param>
public ObjectStorage(IDatabaseAsync redis,
Options.Redis redisOptions,
string objectStorageName,
ILogger<ObjectStorage> logger)
{
redis_ = redis;
redisOptions_ = redisOptions;
objectStorageName_ = objectStorageName;
logger_ = logger;
}
Expand All @@ -59,21 +63,23 @@ public async Task AddOrUpdateAsync(string key,
IAsyncEnumerable<byte[]> valueChunks,
CancellationToken cancellationToken = default)
{
using var _ = logger_.LogFunction(objectStorageName_ + key);
var storageNameKey = objectStorageName_ + key;
using var _ = logger_.LogFunction(storageNameKey);

var idx = 0;
var taskList = new List<Task>();
await foreach (var chunk in valueChunks.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
taskList.Add(redis_.StringSetAsync(objectStorageName_ + key + "_" + idx,
chunk));
var storageNameKeyWithIndex = $"{storageNameKey}_{idx}";
taskList.Add(PerformActionWithRetry(() => redis_.StringSetAsync(storageNameKeyWithIndex,
chunk)));
++idx;
}

await redis_.StringSetAsync(objectStorageName_ + key + "_count",
idx)
.ConfigureAwait(false);
await PerformActionWithRetry(() => redis_.StringSetAsync(objectStorageName_ + key + "_count",
idx))
.ConfigureAwait(false);
await taskList.WhenAll()
.ConfigureAwait(false);
}
Expand All @@ -83,21 +89,22 @@ public async Task AddOrUpdateAsync(string key,
IAsyncEnumerable<ReadOnlyMemory<byte>> valueChunks,
CancellationToken cancellationToken = default)
{
using var _ = logger_.LogFunction(objectStorageName_ + key);
var storageNameKey = objectStorageName_ + key;
using var _ = logger_.LogFunction(storageNameKey);

var idx = 0;
var taskList = new List<Task>();
await foreach (var chunk in valueChunks.WithCancellation(cancellationToken)
.ConfigureAwait(false))
{
taskList.Add(redis_.StringSetAsync(objectStorageName_ + key + "_" + idx,
chunk));
var storageNameKeyWithIndex = $"{storageNameKey}_{idx}";
taskList.Add(PerformActionWithRetry(() => redis_.StringSetAsync(storageNameKeyWithIndex,
chunk)));
++idx;
}

await redis_.StringSetAsync(objectStorageName_ + key + "_count",
idx)
.ConfigureAwait(false);
taskList.Add(PerformActionWithRetry(() => redis_.StringSetAsync(storageNameKey + "_count",
idx)));
await taskList.WhenAll()
.ConfigureAwait(false);
}
Expand All @@ -107,8 +114,8 @@ public async IAsyncEnumerable<byte[]> GetValuesAsync(string
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
using var _ = logger_.LogFunction(objectStorageName_ + key);
var value = await redis_.StringGetAsync(objectStorageName_ + key + "_count")
.ConfigureAwait(false);
var value = await PerformActionWithRetry(() => redis_.StringGetAsync(objectStorageName_ + key + "_count"))
.ConfigureAwait(false);

if (!value.HasValue)
{
Expand All @@ -124,7 +131,7 @@ public async IAsyncEnumerable<byte[]> GetValuesAsync(string

foreach (var chunkTask in Enumerable.Range(0,
valuesCount)
.Select(index => redis_.StringGetAsync(objectStorageName_ + key + "_" + index))
.Select(index => PerformActionWithRetry(() => redis_.StringGetAsync(objectStorageName_ + key + "_" + index)))
.ToList())
{
yield return (await chunkTask.ConfigureAwait(false))!;
Expand All @@ -136,8 +143,8 @@ public async Task<bool> TryDeleteAsync(string key,
CancellationToken cancellationToken = default)
{
using var _ = logger_.LogFunction(objectStorageName_ + key);
var value = await redis_.StringGetAsync(objectStorageName_ + key + "_count")
.ConfigureAwait(false);
var value = await PerformActionWithRetry(() => redis_.StringGetAsync(objectStorageName_ + key + "_count"))
.ConfigureAwait(false);

if (!value.HasValue)
{
Expand All @@ -155,11 +162,45 @@ public async Task<bool> TryDeleteAsync(string key,
.ToArray();


return await redis_.KeyDeleteAsync(keyList)
.ConfigureAwait(false) == valuesCount + 1;
return await PerformActionWithRetry(() => redis_.KeyDeleteAsync(keyList))
.ConfigureAwait(false) == valuesCount + 1;
}

/// <inheritdoc />
public IAsyncEnumerable<string> ListKeysAsync(CancellationToken cancellationToken = default)
=> throw new NotImplementedException();

private async Task<T> PerformActionWithRetry<T>(Func<Task<T>> action)
{
for (var retryCount = 0; retryCount < redisOptions_.MaxRetry; retryCount++)
{
try
{
return await action()
.ConfigureAwait(false);
}
catch (RedisTimeoutException ex)
{
if (retryCount + 1 >= redisOptions_.MaxRetry)
{
logger_.LogError(ex,
"A RedisTimeoutException occurred {MaxRetryCount} times for the same action",
redisOptions_.MaxRetry);
throw;
}

var retryDelay = (retryCount + 1) * (retryCount + 1) * redisOptions_.MsAfterRetry;
logger_.LogWarning(ex,
"A RedisTimeoutException occurred {retryCount}/{redisOptions_.MaxRetry}, retry in {retryDelay} ms",
retryCount,
redisOptions_.MaxRetry,
retryDelay);
await Task.Delay(retryDelay)
.ConfigureAwait(false);
}
}

throw new RedisTimeoutException("A RedisTimeoutException occurred",
CommandStatus.Unknown);
}
}
4 changes: 4 additions & 0 deletions Adaptors/Redis/src/ObjectStorageFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@ public class ObjectStorageFactory : IObjectStorageFactory
{
private readonly ILoggerFactory loggerFactory_;
private readonly IDatabaseAsync redis_;
private readonly Options.Redis redisOptions_;


private bool isInitialized_;

public ObjectStorageFactory(IDatabaseAsync redis,
Options.Redis redisOptions,
ILoggerFactory loggerFactory)
{
redis_ = redis;
redisOptions_ = redisOptions;
loggerFactory_ = loggerFactory;
}

Expand Down Expand Up @@ -79,6 +82,7 @@ public Task<HealthCheckResult> Check(HealthCheckTag tag)

public IObjectStorage CreateObjectStorage(string objectStorageName)
=> new ObjectStorage(redis_,
redisOptions_,
objectStorageName,
loggerFactory_.CreateLogger<ObjectStorage>());
}
4 changes: 3 additions & 1 deletion Adaptors/Redis/src/Options/Redis.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

namespace ArmoniK.Core.Adapters.Redis.Options;

internal class Redis
public class Redis
{
public const string SettingSection = nameof(Redis);
public string InstanceName { get; set; } = "";
Expand All @@ -30,4 +30,6 @@ internal class Redis
public bool Ssl { get; set; }
public string CredentialsPath { get; set; } = "";
public string CaPath { get; set; } = "";
public int MaxRetry { get; set; } = 5;
public int MsAfterRetry { get; set; } = 500;
}
8 changes: 8 additions & 0 deletions Adaptors/Redis/tests/ObjectStorageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Generic;
using System.IO;

using ArmoniK.Core.Common.Injection;
using ArmoniK.Core.Common.Storage;
using ArmoniK.Core.Common.Tests.TestBase;

Expand Down Expand Up @@ -55,6 +56,9 @@ public override void GetObjectStorageInstance()
{
"Components:ObjectStorage", "ArmoniK.Adapters.Redis.ObjectStorage"
},
{
"Redis:MaxRetry", "5"
},
};

var configuration = new ConfigurationManager();
Expand All @@ -78,6 +82,10 @@ public override void GetObjectStorageInstance()
.GetDatabase());
services.AddSingleton<IObjectStorageFactory, ObjectStorageFactory>();

services.AddOption(configuration,
Options.Redis.SettingSection,
out Options.Redis redisOptions);

var provider = services.BuildServiceProvider(new ServiceProviderOptions
{
ValidateOnBuild = true,
Expand Down

0 comments on commit 70ea766

Please sign in to comment.