diff --git a/Adaptors/Redis/src/ObjectStorage.cs b/Adaptors/Redis/src/ObjectStorage.cs index 4c0a6f00a..ba6d89062 100644 --- a/Adaptors/Redis/src/ObjectStorage.cs +++ b/Adaptors/Redis/src/ObjectStorage.cs @@ -38,18 +38,22 @@ public class ObjectStorage : IObjectStorage private readonly ILogger logger_; private readonly string objectStorageName_; private readonly IDatabaseAsync redis_; + private readonly Options.Redis redisOptions_; /// /// implementation for Redis /// /// Connection to redis database + /// Redis object storage options /// Name of the object storage used to differentiate them /// Logger used to print logs public ObjectStorage(IDatabaseAsync redis, + Options.Redis redisOptions, string objectStorageName, ILogger logger) { redis_ = redis; + redisOptions_ = redisOptions; objectStorageName_ = objectStorageName; logger_ = logger; } @@ -59,21 +63,23 @@ public async Task AddOrUpdateAsync(string key, IAsyncEnumerable valueChunks, CancellationToken cancellationToken = default) { - using var _ = logger_.LogFunction(objectStorageName_ + key); + var storageNameKey = objectStorageName_ + key; + using var _ = logger_.LogFunction(storageNameKey); var idx = 0; var taskList = new List(); await foreach (var chunk in valueChunks.WithCancellation(cancellationToken) .ConfigureAwait(false)) { - taskList.Add(redis_.StringSetAsync(objectStorageName_ + key + "_" + idx, - chunk)); + var storageNameKeyWithIndex = $"{storageNameKey}_{idx}"; + taskList.Add(PerformActionWithRetry(() => redis_.StringSetAsync(storageNameKeyWithIndex, + chunk))); ++idx; } - await redis_.StringSetAsync(objectStorageName_ + key + "_count", - idx) - .ConfigureAwait(false); + await PerformActionWithRetry(() => redis_.StringSetAsync(objectStorageName_ + key + "_count", + idx)) + .ConfigureAwait(false); await taskList.WhenAll() .ConfigureAwait(false); } @@ -83,21 +89,22 @@ public async Task AddOrUpdateAsync(string key, IAsyncEnumerable> valueChunks, CancellationToken cancellationToken = default) { - using var _ = logger_.LogFunction(objectStorageName_ + key); + var storageNameKey = objectStorageName_ + key; + using var _ = logger_.LogFunction(storageNameKey); var idx = 0; var taskList = new List(); await foreach (var chunk in valueChunks.WithCancellation(cancellationToken) .ConfigureAwait(false)) { - taskList.Add(redis_.StringSetAsync(objectStorageName_ + key + "_" + idx, - chunk)); + var storageNameKeyWithIndex = $"{storageNameKey}_{idx}"; + taskList.Add(PerformActionWithRetry(() => redis_.StringSetAsync(storageNameKeyWithIndex, + chunk))); ++idx; } - await redis_.StringSetAsync(objectStorageName_ + key + "_count", - idx) - .ConfigureAwait(false); + taskList.Add(PerformActionWithRetry(() => redis_.StringSetAsync(storageNameKey + "_count", + idx))); await taskList.WhenAll() .ConfigureAwait(false); } @@ -107,8 +114,8 @@ public async IAsyncEnumerable GetValuesAsync(string [EnumeratorCancellation] CancellationToken cancellationToken = default) { using var _ = logger_.LogFunction(objectStorageName_ + key); - var value = await redis_.StringGetAsync(objectStorageName_ + key + "_count") - .ConfigureAwait(false); + var value = await PerformActionWithRetry(() => redis_.StringGetAsync(objectStorageName_ + key + "_count")) + .ConfigureAwait(false); if (!value.HasValue) { @@ -124,7 +131,7 @@ public async IAsyncEnumerable GetValuesAsync(string foreach (var chunkTask in Enumerable.Range(0, valuesCount) - .Select(index => redis_.StringGetAsync(objectStorageName_ + key + "_" + index)) + .Select(index => PerformActionWithRetry(() => redis_.StringGetAsync(objectStorageName_ + key + "_" + index))) .ToList()) { yield return (await chunkTask.ConfigureAwait(false))!; @@ -136,8 +143,8 @@ public async Task TryDeleteAsync(string key, CancellationToken cancellationToken = default) { using var _ = logger_.LogFunction(objectStorageName_ + key); - var value = await redis_.StringGetAsync(objectStorageName_ + key + "_count") - .ConfigureAwait(false); + var value = await PerformActionWithRetry(() => redis_.StringGetAsync(objectStorageName_ + key + "_count")) + .ConfigureAwait(false); if (!value.HasValue) { @@ -155,11 +162,45 @@ public async Task TryDeleteAsync(string key, .ToArray(); - return await redis_.KeyDeleteAsync(keyList) - .ConfigureAwait(false) == valuesCount + 1; + return await PerformActionWithRetry(() => redis_.KeyDeleteAsync(keyList)) + .ConfigureAwait(false) == valuesCount + 1; } /// public IAsyncEnumerable ListKeysAsync(CancellationToken cancellationToken = default) => throw new NotImplementedException(); + + private async Task PerformActionWithRetry(Func> action) + { + for (var retryCount = 0; retryCount < redisOptions_.MaxRetry; retryCount++) + { + try + { + return await action() + .ConfigureAwait(false); + } + catch (RedisTimeoutException ex) + { + if (retryCount + 1 >= redisOptions_.MaxRetry) + { + logger_.LogError(ex, + "A RedisTimeoutException occurred {MaxRetryCount} times for the same action", + redisOptions_.MaxRetry); + throw; + } + + var retryDelay = (retryCount + 1) * (retryCount + 1) * redisOptions_.MsAfterRetry; + logger_.LogWarning(ex, + "A RedisTimeoutException occurred {retryCount}/{redisOptions_.MaxRetry}, retry in {retryDelay} ms", + retryCount, + redisOptions_.MaxRetry, + retryDelay); + await Task.Delay(retryDelay) + .ConfigureAwait(false); + } + } + + throw new RedisTimeoutException("A RedisTimeoutException occurred", + CommandStatus.Unknown); + } } diff --git a/Adaptors/Redis/src/ObjectStorageFactory.cs b/Adaptors/Redis/src/ObjectStorageFactory.cs index 213a86823..84c671c5b 100644 --- a/Adaptors/Redis/src/ObjectStorageFactory.cs +++ b/Adaptors/Redis/src/ObjectStorageFactory.cs @@ -33,14 +33,17 @@ public class ObjectStorageFactory : IObjectStorageFactory { private readonly ILoggerFactory loggerFactory_; private readonly IDatabaseAsync redis_; + private readonly Options.Redis redisOptions_; private bool isInitialized_; public ObjectStorageFactory(IDatabaseAsync redis, + Options.Redis redisOptions, ILoggerFactory loggerFactory) { redis_ = redis; + redisOptions_ = redisOptions; loggerFactory_ = loggerFactory; } @@ -79,6 +82,7 @@ public Task Check(HealthCheckTag tag) public IObjectStorage CreateObjectStorage(string objectStorageName) => new ObjectStorage(redis_, + redisOptions_, objectStorageName, loggerFactory_.CreateLogger()); } diff --git a/Adaptors/Redis/src/Options/Redis.cs b/Adaptors/Redis/src/Options/Redis.cs index 7c52a59ce..dd45ba83e 100644 --- a/Adaptors/Redis/src/Options/Redis.cs +++ b/Adaptors/Redis/src/Options/Redis.cs @@ -17,7 +17,7 @@ namespace ArmoniK.Core.Adapters.Redis.Options; -internal class Redis +public class Redis { public const string SettingSection = nameof(Redis); public string InstanceName { get; set; } = ""; @@ -30,4 +30,6 @@ internal class Redis public bool Ssl { get; set; } public string CredentialsPath { get; set; } = ""; public string CaPath { get; set; } = ""; + public int MaxRetry { get; set; } = 5; + public int MsAfterRetry { get; set; } = 500; } diff --git a/Adaptors/Redis/tests/ObjectStorageTests.cs b/Adaptors/Redis/tests/ObjectStorageTests.cs index c651680ad..6f4b2b866 100644 --- a/Adaptors/Redis/tests/ObjectStorageTests.cs +++ b/Adaptors/Redis/tests/ObjectStorageTests.cs @@ -19,6 +19,7 @@ using System.Collections.Generic; using System.IO; +using ArmoniK.Core.Common.Injection; using ArmoniK.Core.Common.Storage; using ArmoniK.Core.Common.Tests.TestBase; @@ -55,6 +56,9 @@ public override void GetObjectStorageInstance() { "Components:ObjectStorage", "ArmoniK.Adapters.Redis.ObjectStorage" }, + { + "Redis:MaxRetry", "5" + }, }; var configuration = new ConfigurationManager(); @@ -78,6 +82,10 @@ public override void GetObjectStorageInstance() .GetDatabase()); services.AddSingleton(); + services.AddOption(configuration, + Options.Redis.SettingSection, + out Options.Redis redisOptions); + var provider = services.BuildServiceProvider(new ServiceProviderOptions { ValidateOnBuild = true,