Skip to content

Commit

Permalink
Simplequeue improvements: dead letters and backoff policy (#827)
Browse files Browse the repository at this point in the history
## Motivation and Context (Why the change? What's the scenario?)

When working with simplequeue locally, it's difficult to debug errors
missing some better pipeline retry logic.

## High level description (Approach, Design)

* Add poison message handling to SimpleQueue
* Add backoff policy to SimpleQueue
  • Loading branch information
dluc authored Oct 8, 2024
1 parent 60440d6 commit 2bc6547
Show file tree
Hide file tree
Showing 6 changed files with 348 additions and 121 deletions.
2 changes: 1 addition & 1 deletion extensions/AzureQueues/AzureQueuesPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ private void DispatchMessages(object? sender, ElapsedEventArgs ev)
{
this._busy = true;

QueueMessage[] messages = Array.Empty<QueueMessage>();
QueueMessage[] messages = [];

// Fetch messages
try
Expand Down
2 changes: 1 addition & 1 deletion service/Core/FileSystem/DevTools/VolatileFileSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ private static string ValidateVolumeName(string volume)

if (s_invalidCharsRegex.Match(volume).Success)
{
throw new ArgumentException("The volume name contains some invalid chars or empty spaces");
throw new ArgumentException($"The volume name '{volume}' contains some invalid chars or empty spaces");
}

return volume;
Expand Down
55 changes: 55 additions & 0 deletions service/Core/Pipeline/Queue/DevTools/Message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Text.Json.Serialization;

namespace Microsoft.KernelMemory.Pipeline.Queue.DevTools;

internal class Message
{
[JsonPropertyName("id")]
public string Id { get; set; } = string.Empty;

[JsonPropertyName("content")]
public string Content { get; set; } = string.Empty;

[JsonPropertyName("deliveries")]
public uint DequeueCount { get; set; } = 0;

[JsonPropertyName("created")]
public DateTimeOffset Created { get; set; } = DateTimeOffset.UtcNow;

[JsonPropertyName("schedule")]
public DateTimeOffset Schedule { get; set; } = DateTimeOffset.UtcNow;

[JsonPropertyName("lock")]
public DateTimeOffset LockedUntil { get; set; } = DateTimeOffset.MinValue;

[JsonPropertyName("error")]
public string LastError { get; set; } = string.Empty;

public bool IsLocked()
{
return this.LockedUntil > DateTimeOffset.UtcNow;
}

public bool IsTimeToRun()
{
return this.Schedule <= DateTimeOffset.UtcNow;
}

public void RunIn(TimeSpan delay)
{
this.Schedule = DateTimeOffset.UtcNow + delay;
}

public void Lock(int seconds)
{
this.LockedUntil = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(Math.Max(0, seconds));
}

public void Unlock()
{
this.LockedUntil = DateTimeOffset.MinValue;
}
}
Loading

0 comments on commit 2bc6547

Please sign in to comment.