Skip to content

Commit

Permalink
fix: Timeout handling after acquisition (#774)
Browse files Browse the repository at this point in the history
# Motivation

Since the refactor to use the exception manager, tasks that were
acquired but not processed (because the runningTaskProcessor did not
finish executing the current task in the allotted time) were not released
in the ArmoniK sense.
The message from the queue was put back into the queue, but the task
itself remained in the dispatched state (acquired by the current agent).

This had two implications: such a task would need more work to be
re-acquired by another pod through the message duplication algorithm,
and the timeout was treated as an actual error of the agent, which
would make the agent unhealthy after a few acquire timeouts.

# Description

This PR adds a proper catch for the timeout, and releases the task in the
catch.

# Testing

A new test has been added to ensure that the pollster does not produce
any error when the timeout occurs, and that the task is actually
released properly.

# Impact

This should help with long running tasks and avoid agent restarts.
It should also help improve the performance of the orchestration on long
running tasks.

# Additional Information

NA

# Checklist

- [X] My code adheres to the coding and style guidelines of the project.
- [X] I have performed a self-review of my code.
- [ ] I have commented my code, particularly in hard-to-understand
areas.
- [ ] I have made corresponding changes to the documentation.
- [X] I have thoroughly tested my modifications and added tests when
necessary.
- [ ] Tests pass locally and in the CI.
- [ ] I have assessed the performance impact of my modifications.
  • Loading branch information
lemaitre-aneo authored Oct 11, 2024
2 parents e8b9d88 + b619582 commit 220a1d4
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 11 deletions.
24 changes: 16 additions & 8 deletions Common/src/Pollster/Pollster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,22 @@ await messages.Current.DisposeIgnoreErrorAsync(logger_)
await taskHandler.PreProcessing()
.ConfigureAwait(false);

await runningTaskQueue_.WriteAsync(taskHandler,
pollsterOptions_.TimeoutBeforeNextAcquisition,
exceptionManager_.EarlyCancellationToken)
.ConfigureAwait(false);

// TaskHandler has been successfully sent to the next stage of the pipeline
// So remove the automatic dispose of the TaskHandler
taskHandlerDispose.Reset();
try
{
await runningTaskQueue_.WriteAsync(taskHandler,
pollsterOptions_.TimeoutBeforeNextAcquisition,
exceptionManager_.EarlyCancellationToken)
.ConfigureAwait(false);

// TaskHandler has been successfully sent to the next stage of the pipeline
// So remove the automatic dispose of the TaskHandler
taskHandlerDispose.Reset();
}
catch (TimeoutException)
{
await taskHandler.ReleaseAndPostponeTask()
.ConfigureAwait(false);
}
}
catch (Exception e)
{
Expand Down
37 changes: 36 additions & 1 deletion Common/tests/Helpers/TestPollsterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
using MongoDB.Bson;
using MongoDB.Driver;

using NUnit.Framework;

namespace ArmoniK.Core.Common.Tests.Helpers;

public class TestPollsterProvider : IDisposable
Expand Down Expand Up @@ -76,7 +78,9 @@ public class TestPollsterProvider : IDisposable
public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler,
IAgentHandler agentHandler,
IPullQueueStorage pullQueueStorage,
TimeSpan? graceDelay = null)
TimeSpan? graceDelay = null,
TimeSpan? acquireTimeout = null,
int maxError = 5)
{
graceDelay_ = graceDelay;
var logger = NullLogger.Instance;
Expand Down Expand Up @@ -128,6 +132,17 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler,
: graceDelay
.ToString()
},
{
$"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.TimeoutBeforeNextAcquisition)}",
acquireTimeout is null
? TimeSpan.FromSeconds(10)
.ToString()
: acquireTimeout.ToString()
},
{
$"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.MaxErrorAllowed)}",
maxError.ToString()
},
{
$"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.SharedCacheFolder)}",
Path.Combine(Path.GetTempPath(),
Expand Down Expand Up @@ -226,4 +241,24 @@ await Task.Delay(delay,
Lifetime.StopApplication();
});

/// <summary>
///   Records dummy errors on the <c>ExceptionManager</c> one at a time and asserts that
///   it only reports <c>Failed</c> once <paramref name="nbError" /> errors have been recorded.
/// </summary>
/// <param name="nbError">
///   Number of dummy errors to record before the manager is expected to be failed.
/// </param>
/// <remarks>
///   The assertion fails if the manager is already failed before one of the
///   <paramref name="nbError" /> dummy errors is recorded, or if it is still not failed
///   after all of them were recorded.
/// </remarks>
public void AssertFailAfterError(int nbError = 1)
{
  var recorded = 0;

  while (recorded < nbError)
  {
    // The manager must still be healthy before each dummy error is recorded
    if (ExceptionManager.Failed)
    {
      Assert.Fail($"ExceptionManager failed after {recorded} errors while it was expected to failed after {nbError}");
    }

    ExceptionManager.RecordError(null,
                                 null,
                                 "Dummy Error");
    recorded++;
  }

  // All expected errors were recorded: the manager must now be failed
  if (ExceptionManager.Failed)
  {
    return;
  }

  Assert.Fail($"ExceptionManager did not failed while it was expected to failed after {nbError}");
}
}
120 changes: 118 additions & 2 deletions Common/tests/Pollster/PollsterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)
Assert.AreEqual(HealthStatus.Healthy,
(await testServiceProvider.Pollster.Check(HealthCheckTag.Startup)
.ConfigureAwait(false)).Status);

testServiceProvider.AssertFailAfterError(6);
}

[Test]
Expand Down Expand Up @@ -411,6 +413,8 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)
Assert.DoesNotThrowAsync(() => stop);
Assert.AreEqual(Array.Empty<string>(),
testServiceProvider.Pollster.TaskProcessing);

testServiceProvider.AssertFailAfterError(6);
}

public class WaitWorkerStreamHandler : IWorkerStreamHandler
Expand Down Expand Up @@ -479,12 +483,119 @@ await testServiceProvider.Pollster.Init(CancellationToken.None)

Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop());
Assert.DoesNotThrowAsync(() => stop);
Assert.False(testServiceProvider.ExceptionManager.Failed);

Assert.AreEqual(TaskStatus.Completed,
await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted,
CancellationToken.None)
.ConfigureAwait(false));

testServiceProvider.AssertFailAfterError(6);
}

/// <summary>
///   Checks that when the acquisition timeout elapses while a second task is waiting
///   to be handed to the running stage, the pollster main loop ends without throwing,
///   the second task is back in the <c>Submitted</c> status, and no error was recorded
///   by the exception manager.
/// </summary>
[Test]
[Timeout(10000)]
public async Task ExecuteTaskTimeoutAcquire()
{
var mockPullQueueStorage = new SimplePullQueueStorageChannel();
// NOTE(review): 1000 is presumably the worker delay in milliseconds, i.e. longer than
// the 100 ms acquire timeout configured below — confirm against WaitWorkerStreamHandler
var waitWorkerStreamHandler = new WaitWorkerStreamHandler(1000);
var simpleAgentHandler = new SimpleAgentHandler();

// Provider arguments after the queue storage, in order: graceDelay = 100 ms,
// acquireTimeout = 100 ms (written to TimeoutBeforeNextAcquisition) and
// maxError = 0 (written to MaxErrorAllowed)
using var testServiceProvider = new TestPollsterProvider(waitWorkerStreamHandler,
simpleAgentHandler,
mockPullQueueStorage,
TimeSpan.FromMilliseconds(100),
TimeSpan.FromMilliseconds(100),
0);

var (sessionId, _, taskSubmitted) = await InitSubmitter(testServiceProvider.Submitter,
testServiceProvider.PartitionTable,
testServiceProvider.ResultTable,
testServiceProvider.SessionTable,
CancellationToken.None)
.ConfigureAwait(false);

// Queue a message for the first task
await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandler
{
CancellationToken = CancellationToken.None,
Status = QueueMessageStatus.Waiting,
MessageId = Guid.NewGuid()
.ToString(),
TaskId = taskSubmitted,
})
.ConfigureAwait(false);

// Create the result that the second task declares as its expected output
var expectedOutput3 = "ExpectedOutput3";
await testServiceProvider.ResultTable.Create(new[]
{
new Result(sessionId,
expectedOutput3,
"",
"",
"",
ResultStatus.Created,
new List<string>(),
DateTime.UtcNow,
0,
Array.Empty<byte>()),
},
CancellationToken.None)
.ConfigureAwait(false);

// Create a second task in the same session
var requests = await testServiceProvider.Submitter.CreateTasks(sessionId,
sessionId,
new TaskOptions(),
new List<TaskRequest>
{
new(new[]
{
expectedOutput3,
},
new List<string>(),
new List<ReadOnlyMemory<byte>>
{
new(Encoding.ASCII.GetBytes("AAAA")),
}.ToAsyncEnumerable()),
}.ToAsyncEnumerable(),
CancellationToken.None)
.ConfigureAwait(false);

var sessionData = await testServiceProvider.SessionTable.GetSessionAsync(sessionId,
CancellationToken.None)
.ConfigureAwait(false);

await testServiceProvider.Submitter.FinalizeTaskCreation(requests,
sessionData,
sessionId,
CancellationToken.None)
.ConfigureAwait(false);

var taskSubmitted2 = requests.First()
.TaskId;

// Queue a message for the second task, behind the first one
await mockPullQueueStorage.Channel.Writer.WriteAsync(new SimpleQueueMessageHandler
{
CancellationToken = CancellationToken.None,
Status = QueueMessageStatus.Waiting,
MessageId = Guid.NewGuid()
.ToString(),
TaskId = taskSubmitted2,
})
.ConfigureAwait(false);

await testServiceProvider.Pollster.Init(CancellationToken.None)
.ConfigureAwait(false);

// Request the application stop 2 seconds from now so that the main loop terminates
var stop = testServiceProvider.StopApplicationAfter(TimeSpan.FromSeconds(2));

Assert.DoesNotThrowAsync(() => testServiceProvider.Pollster.MainLoop());
Assert.DoesNotThrowAsync(() => stop);

// The second task is expected to have been released, hence back to Submitted
Assert.AreEqual(TaskStatus.Submitted,
await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted2,
CancellationToken.None)
.ConfigureAwait(false));

// With the default nbError of 1: the exception manager must not be failed yet, and
// must become failed after a single dummy error — so the run itself recorded no error
testServiceProvider.AssertFailAfterError();
}

[Test]
Expand Down Expand Up @@ -535,6 +646,8 @@ await Task.Delay(TimeSpan.FromMilliseconds(200),
await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted,
CancellationToken.None)
.ConfigureAwait(false));

testServiceProvider.AssertFailAfterError(5);
}

[Test]
Expand Down Expand Up @@ -593,7 +706,6 @@ await testServiceProvider.Pollster.StopCancelledTask()

Assert.DoesNotThrowAsync(() => mainLoopTask);
Assert.DoesNotThrowAsync(() => stop);
Assert.False(testServiceProvider.ExceptionManager.Failed);

Assert.That(await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted,
CancellationToken.None)
Expand All @@ -603,6 +715,8 @@ await testServiceProvider.Pollster.StopCancelledTask()

Assert.AreEqual(Array.Empty<string>(),
testServiceProvider.Pollster.TaskProcessing);

testServiceProvider.AssertFailAfterError(5);
}

public static IEnumerable ExecuteTooManyErrorShouldFailTestCase
Expand Down Expand Up @@ -733,5 +847,7 @@ await testServiceProvider.TaskTable.GetTaskStatus(taskSubmitted,
.ConfigureAwait(false));
Assert.AreEqual(Array.Empty<string>(),
testServiceProvider.Pollster.TaskProcessing);

testServiceProvider.AssertFailAfterError(5);
}
}

0 comments on commit 220a1d4

Please sign in to comment.