Skip to content

Commit

Permalink
Shard collections at DB init when sharding is required
Browse files Browse the repository at this point in the history
  • Loading branch information
tschneider-aneo committed Sep 24, 2024
1 parent 13ce4ba commit 9fd5452
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 36 deletions.
7 changes: 5 additions & 2 deletions Adaptors/MongoDB/src/Common/IMongoDataModelMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public interface IMongoDataModelMapping<T>
string CollectionName { get; }

Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
IMongoCollection<T> collection,
Options.MongoDB options);
IMongoCollection<T> collection,
Options.MongoDB options);

Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options);
}
74 changes: 53 additions & 21 deletions Adaptors/MongoDB/src/Common/MongoCollectionProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public class MongoCollectionProvider<TData, TModelMapping> : IInitializable, IAs
private bool isInitialized_;
private IMongoCollection<TData>? mongoCollection_;

public MongoCollectionProvider(Options.MongoDB options,
SessionProvider sessionProvider,
IMongoDatabase mongoDatabase,
public MongoCollectionProvider(Options.MongoDB options,
SessionProvider sessionProvider,
IMongoDatabase mongoDatabase,
ILogger<IMongoCollection<TData>> logger,
CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default)
{
if (options.DataRetention == TimeSpan.Zero)
{
Expand All @@ -64,17 +64,17 @@ public MongoCollectionProvider(Options.MongoDB options,
/// <inheritdoc />
public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> tag switch
{
HealthCheckTag.Startup or HealthCheckTag.Readiness => Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("MongoCollection not initialized yet.")),
HealthCheckTag.Liveness => Task.FromResult(isInitialized_ && mongoCollection_ is null
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("MongoCollection not initialized yet.")),
_ => throw new ArgumentOutOfRangeException(nameof(tag),
tag,
null),
};
{
HealthCheckTag.Startup or HealthCheckTag.Readiness => Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("MongoCollection not initialized yet.")),
HealthCheckTag.Liveness => Task.FromResult(isInitialized_ && mongoCollection_ is null
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("MongoCollection not initialized yet.")),
_ => throw new ArgumentOutOfRangeException(nameof(tag),
tag,
null),
};

/// <inheritdoc />
public async Task Init(CancellationToken cancellationToken)
Expand All @@ -87,13 +87,13 @@ public async Task Init(CancellationToken cancellationToken)
isInitialized_ = true;
}

private static async Task<IMongoCollection<TData>> InitializeAsync(Options.MongoDB options,
SessionProvider sessionProvider,
IMongoDatabase mongoDatabase,
private static async Task<IMongoCollection<TData>> InitializeAsync(Options.MongoDB options,
SessionProvider sessionProvider,
IMongoDatabase mongoDatabase,
ILogger<IMongoCollection<TData>> logger,
CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default)
{
var model = new TModelMapping();
var model = new TModelMapping();
Exception? lastException = null;

for (var collectionRetry = 1; collectionRetry < options.MaxRetries; collectionRetry++)
Expand Down Expand Up @@ -167,9 +167,41 @@ await Task.Delay(1000 * indexRetry,
}
}

if (options.Sharding){

for (var indexRetry = 1; indexRetry < options.MaxRetries; indexRetry++)
{
lastException = null;
try
{
await model.ShardCollectionAsync(session,
options)
.ConfigureAwait(false);
break;
}
catch (MongoCommandException ex) when (ex.CodeName == "IndexOptionsConflict")
{
logger.LogWarning(ex,
"Index options conflict for {CollectionName} collection",
model.CollectionName);
break;
}
catch (Exception ex)
{
lastException = ex;
logger.LogDebug(ex,
"Retrying to shard {CollectionName} collection",
model.CollectionName);
await Task.Delay(1000 * indexRetry,
cancellationToken)
.ConfigureAwait(false);
}
}
}

if (lastException is not null)
{
throw new TimeoutException($"Init Indexes for {model.CollectionName}: Max retries reached",
throw new TimeoutException($"Init Index or shard for {model.CollectionName}: Max retries reached",
lastException);
}

Expand Down
2 changes: 2 additions & 0 deletions Adaptors/MongoDB/src/Object/ObjectDataModelMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,6 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, Options.MongoDB options) => Task.CompletedTask;
}
7 changes: 6 additions & 1 deletion Adaptors/MongoDB/src/Options/MongoDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public class MongoDB

public QueueStorage QueueStorage { get; set; } = new();

public int MaxConnectionPoolSize { get; set; } = 500;
public int MaxConnectionPoolSize { get; set; } = 500;

public TimeSpan ServerSelectionTimeout { get; set; } = TimeSpan.FromMinutes(2);

public bool Sharding { get; set; }

public string AuthSource { get; set; } = "";
}
4 changes: 4 additions & 0 deletions Adaptors/MongoDB/src/ServiceCollectionExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ public static IServiceCollection AddMongoClient(this IServiceCollection services
mongoOptions.DatabaseName);
}

if (!string.IsNullOrEmpty(mongoOptions.AuthSource)) {
connectionString = $"{connectionString}?authSource={mongoOptions.AuthSource}";
}

var settings = MongoClientSettings.FromUrl(new MongoUrl(connectionString));
settings.AllowInsecureTls = mongoOptions.AllowInsecureTls;
settings.UseTls = mongoOptions.Tls;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public string CollectionName
=> nameof(AuthData);

/// <inheritdoc />
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
IMongoCollection<AuthData> collection,
Options.MongoDB options)
Options.MongoDB options)
{
var indexModels = new[]
{
Expand All @@ -77,4 +77,6 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, Options.MongoDB options) => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public string CollectionName
=> nameof(RoleData);

/// <inheritdoc />
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
IMongoCollection<RoleData> collection,
Options.MongoDB options)
Options.MongoDB options)
{
var indexModels = new[]
{
Expand All @@ -73,4 +73,5 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}
public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, Options.MongoDB options) => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public string CollectionName
=> nameof(UserData);

/// <inheritdoc />
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
IMongoCollection<UserData> collection,
Options.MongoDB options)
Options.MongoDB options)
{
var indexModels = new[]
{
Expand All @@ -73,4 +73,6 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, Options.MongoDB options) => Task.CompletedTask;
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,6 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, Options.MongoDB options) => Task.CompletedTask;
}
11 changes: 9 additions & 2 deletions Adaptors/MongoDB/src/Table/DataModel/ResultDataModelMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public string CollectionName
=> nameof(Result);

/// <inheritdoc />
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
IMongoCollection<Result> collection,
Options.MongoDB options)
Options.MongoDB options)
{
var indexModels = new[]
{
Expand All @@ -92,4 +92,11 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public async Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
{
await sessionHandle.shardCollection(options, CollectionName)
.ConfigureAwait(false);
}
}
11 changes: 9 additions & 2 deletions Adaptors/MongoDB/src/Table/DataModel/SessionDataModelMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ public string CollectionName
=> nameof(SessionData);

/// <inheritdoc />
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
IMongoCollection<SessionData> collection,
Options.MongoDB options)
Options.MongoDB options)
{
var indexModels = new[]
{
Expand All @@ -137,4 +137,11 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public async Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
{
await sessionHandle.shardCollection(options, CollectionName)
.ConfigureAwait(false);
}
}
46 changes: 46 additions & 0 deletions Adaptors/MongoDB/src/Table/DataModel/ShardingExt.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// This file is part of the ArmoniK project
//
// Copyright (C) ANEO, 2021-2024. All rights reserved.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY, without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System.Collections.Generic;
using System.Threading.Tasks;

using MongoDB.Driver;
using MongoDB.Bson;

namespace ArmoniK.Core.Adapters.MongoDB.Table.DataModel;

public static class ShardingExt
{
public static async Task shardCollection(this IClientSessionHandle sessionHandle, Options.MongoDB options, string collectionName)
{
var adminDB = sessionHandle.Client.GetDatabase("admin");
var shardingCommandDict = new Dictionary<string, object>
{
{ "shardCollection", $"{options.DatabaseName}.{collectionName}"},
{
"key",
new Dictionary<string, object>{
{ "_id", "hashed"}
}
}
};

var shardingCommand = new BsonDocumentCommand<BsonDocument>(new BsonDocument(shardingCommandDict));
await adminDB.RunCommandAsync(shardingCommand)
.ConfigureAwait(false);
}
}
12 changes: 10 additions & 2 deletions Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using MongoDB.Bson;

namespace ArmoniK.Core.Adapters.MongoDB.Table.DataModel;

Expand Down Expand Up @@ -184,9 +185,9 @@ public string CollectionName


/// <inheritdoc />
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
public async Task InitializeIndexesAsync(IClientSessionHandle sessionHandle,
IMongoCollection<TaskData> collection,
Options.MongoDB options)
Options.MongoDB options)
{
var indexModels = new[]
{
Expand All @@ -210,4 +211,11 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}

public async Task ShardCollectionAsync(IClientSessionHandle sessionHandle,
Options.MongoDB options)
{
await sessionHandle.shardCollection(options, CollectionName)
.ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,5 @@ await collection.Indexes.CreateManyAsync(sessionHandle,
indexModels)
.ConfigureAwait(false);
}
public Task ShardCollectionAsync(IClientSessionHandle sessionHandle, Adapters.MongoDB.Options.MongoDB options) => Task.CompletedTask;
}

0 comments on commit 9fd5452

Please sign in to comment.