Skip to content

Commit

Permalink
fix: too many queries on metrics exporter on mongodb (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem authored Feb 23, 2023
2 parents 9186131 + 7a2e204 commit f528fd7
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 9 deletions.
28 changes: 25 additions & 3 deletions Common/src/Utils/ExecutionSingleizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -28,7 +29,16 @@ namespace ArmoniK.Core.Common.Utils;
/// <typeparam name="T">Type of the return object</typeparam>
public class ExecutionSingleizer<T> : IDisposable
{
private Handle handle_ = new();
private readonly long tickValidity_;
private Handle handle_ = new();


/// <summary>
/// Allow initialization of <see cref="ExecutionSingleizer" />
/// </summary>
/// <param name="timeValidity">Results from the execution will be in cache during timeValidity</param>
public ExecutionSingleizer(TimeSpan timeValidity = default)
=> tickValidity_ = (long)Math.Ceiling(Stopwatch.Frequency * timeValidity.TotalSeconds);

/// <inheritdoc />
public void Dispose()
Expand All @@ -51,8 +61,8 @@ public async Task<T> Call(Func<CancellationToken, Task<T>> func,
var currentHandle = handle_;

// If there is no waiters, the task is complete (success or failed), and no thread is currently running it.
// We therefore need to call func again
if (currentHandle.Waiters == 0)
// We therefore need to call func again if the data has expired.
if (currentHandle.Waiters == 0 && Stopwatch.GetTimestamp() > currentHandle.ValidUntil)
{
// Prepare new handle, with new cancellation token source and new task
var cts = new CancellationTokenSource();
Expand Down Expand Up @@ -121,6 +131,13 @@ public async Task<T> Call(Func<CancellationToken, Task<T>> func,
}
finally
{
// Reset the validity of the result once the result is available.
// This is done by all threads in order to avoid race condition with the Waiters decrement.
if (!currentHandle.CancellationTokenSource.IsCancellationRequested)
{
currentHandle.ValidUntil = Stopwatch.GetTimestamp() + tickValidity_;
}

// Remove the current thread from the list of waiters.
var i = Interlocked.Decrement(ref currentHandle.Waiters);

Expand Down Expand Up @@ -154,6 +171,11 @@ public async Task<T> Call(Func<CancellationToken, Task<T>> func,
/// </summary>
private sealed class Handle : IDisposable
{
/// <summary>
/// Specify the timestamp until the data is valid
/// </summary>
public long ValidUntil = long.MinValue;

/// <summary>
/// Number of threads waiting for the result.
/// </summary>
Expand Down
37 changes: 37 additions & 0 deletions Common/tests/Utils/ExecutionSingleizerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -207,6 +208,42 @@ public void ConcurrentCancelExecutionShouldFail()
val_);
}


[Test]
public async Task CheckExpire()
{
var single = new ExecutionSingleizer<int>(TimeSpan.FromMilliseconds(100));
var i = await single.Call(ct => Set(1,
0,
ct))
.ConfigureAwait(false);
Assert.AreEqual(1,
i);
Assert.AreEqual(1,
val_);

i = await single.Call(ct => Set(2,
0,
ct))
.ConfigureAwait(false);
Assert.AreEqual(1,
i);
Assert.AreEqual(1,
val_);

await Task.Delay(150)
.ConfigureAwait(false);

i = await single.Call(ct => Set(3,
0,
ct))
.ConfigureAwait(false);
Assert.AreEqual(3,
i);
Assert.AreEqual(3,
val_);
}

private async Task<int> Set(int i,
int delay,
CancellationToken cancellationToken)
Expand Down
10 changes: 5 additions & 5 deletions Control/Metrics/src/ArmoniKMeter.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
// This file is part of the ArmoniK project
//
//
// Copyright (C) ANEO, 2021-2023. All rights reserved.
//
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY, without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

Expand Down Expand Up @@ -51,7 +51,7 @@ public ArmoniKMeter(ITaskTable taskTable,
taskTable_ = taskTable;
logger_ = logger;
gauges_ = new Dictionary<Tuple<string, string>, ObservableGauge<long>>();
measurements_ = new ExecutionSingleizer<IDictionary<Tuple<string, string>, long>>();
measurements_ = new ExecutionSingleizer<IDictionary<Tuple<string, string>, long>>(TimeSpan.FromSeconds(5));

CreateObservableCounter("test",
() => i_++);
Expand Down
2 changes: 1 addition & 1 deletion Control/PartitionMetrics/src/ArmoniKMeter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public ArmoniKMeter(IPartitionTable partitionTable,
partitionTable_ = partitionTable;
logger_ = logger;
gauges_ = new Dictionary<string, ObservableGauge<long>>();
measurements_ = new ExecutionSingleizer<IDictionary<string, long>>();
measurements_ = new ExecutionSingleizer<IDictionary<string, long>>(TimeSpan.FromSeconds(5));

CreateObservableCounter("test",
() => i_++);
Expand Down

0 comments on commit f528fd7

Please sign in to comment.