Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add priorities for plans #39

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 67 additions & 3 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ type Callback = dyn FnOnce(&mut Context);
/// A handler for an event type `E`
type EventHandler<E> = dyn Fn(&mut Context, E);

/// An enum to indicate the priority for plans at a given time.
///
/// Most plans will have `Normal` priority. Plans with priority `First` are
/// handled before all `Normal` plans, and those with priority `Last` are
/// handled after all `Normal` plans. In all cases ties between plans at the
/// same time and with the same priority are handled in the order of scheduling.
///
#[derive(PartialEq, Eq, Ord, PartialOrd)]
pub enum PlanPriority {
First,
Normal,
Last,
}

/// A manager for the state of a discrete-event simulation
///
/// Provides core simulation services including
Expand Down Expand Up @@ -44,7 +58,7 @@ type EventHandler<E> = dyn Fn(&mut Context, E);
/// occurred and have other modules take turns reacting to these occurrences.
///
pub struct Context {
plan_queue: Queue<Box<Callback>>,
plan_queue: Queue<Box<Callback>, PlanPriority>,
callback_queue: VecDeque<Box<Callback>>,
event_handlers: HashMap<TypeId, Box<dyn Any>>,
data_plugins: HashMap<TypeId, Box<dyn Any>>,
Expand Down Expand Up @@ -104,19 +118,37 @@ impl Context {
}
}

/// Add a plan to the future event list at the specified time
/// Add a plan to the future event list at the specified time with normal
/// priority
///
/// Returns an `Id` for the newly-added plan that can be used to cancel it
/// if needed.
/// # Panics
///
/// Panics if time is in the past, infinite, or NaN.
pub fn add_plan(&mut self, time: f64, callback: impl FnOnce(&mut Context) + 'static) -> Id {
self.add_plan_with_priority(time, callback, PlanPriority::Normal)
}

/// Add a plan to the future event list at the specified time and with the
/// specified priority
///
/// Returns an `Id` for the newly-added plan that can be used to cancel it
/// if needed.
/// # Panics
///
/// Panics if time is in the past, infinite, or NaN.
pub fn add_plan_with_priority(
&mut self,
time: f64,
callback: impl FnOnce(&mut Context) + 'static,
priority: PlanPriority,
) -> Id {
assert!(
!time.is_nan() && !time.is_infinite() && time >= self.current_time,
"Time is invalid"
);
self.plan_queue.add_plan(time, Box::new(callback))
self.plan_queue.add_plan(time, Box::new(callback), priority)
}

/// Cancel a plan that has been added to the queue
Expand Down Expand Up @@ -274,6 +306,21 @@ mod tests {
})
}

fn add_plan_with_priority(
context: &mut Context,
time: f64,
value: u32,
priority: PlanPriority,
) -> Id {
context.add_plan_with_priority(
time,
move |context| {
context.get_data_container_mut(ComponentA).push(value);
},
priority,
)
}

#[test]
#[should_panic(expected = "Time is invalid")]
fn negative_plan_time() {
Expand Down Expand Up @@ -412,6 +459,23 @@ mod tests {
assert_eq!(*context.get_data_container_mut(ComponentA), vec![1, 2]);
}

#[test]
fn plans_at_same_time_follow_priority() {
let mut context = Context::new();
add_plan(&mut context, 1.0, 1);
add_plan_with_priority(&mut context, 1.0, 5, PlanPriority::Last);
add_plan_with_priority(&mut context, 1.0, 3, PlanPriority::First);
add_plan(&mut context, 1.0, 2);
add_plan_with_priority(&mut context, 1.0, 6, PlanPriority::Last);
add_plan_with_priority(&mut context, 1.0, 4, PlanPriority::First);
context.execute();
assert_eq!(context.get_current_time(), 1.0);
assert_eq!(
*context.get_data_container_mut(ComponentA),
vec![3, 4, 1, 2, 5, 6]
);
}

#[derive(Copy, Clone)]
struct Event {
pub data: usize,
Expand Down
112 changes: 73 additions & 39 deletions src/plan.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! A priority queue that stores arbitrary data sorted by time
//!
//! Defines a `Queue<T>` that is intended to store a queue of items of type T,
//! sorted by `f64` time, called 'plans'. This queue has methods for adding
//! plans, cancelling plans, and retrieving the earliest plan in the queue.
//! Adding a plan is *O*(log(*n*)) while cancellation and retrieval are *O*(1).
//! Defines a `Queue<T, Q>` that is intended to store a queue of items of type
//! T - sorted by `f64` time and definable priority `Q` - called 'plans'.
//! This queue has methods for adding plans, cancelling plans, and retrieving
//! the earliest plan in the queue. Adding a plan is *O*(log(*n*)) while
//! cancellation and retrieval are *O*(1).
//!
//! This queue is used by `Context` to store future events where some callback
//! closure `FnOnce(&mut Context)` will be executed at a given point in time.
Expand All @@ -16,24 +17,27 @@ use std::{
/// A priority queue that stores arbitrary data sorted by time
///
/// Items of type `T` are stored in order by `f64` time and called `Plan<T>`.
/// Plans can have priorities given by some specified orderable type `Q`.
/// When plans are created they are sequentially assigned an `Id` that is a
/// wrapped `u64`. If two plans are scheduled for the same time the plan that is
/// scheduled first (i.e., that has the lowest id) is placed earlier.
/// wrapped `u64`. If two plans are scheduled for the same time then the plan
/// with the lowest priority is placed earlier. If two plans have the same time
/// and priority then the plan that is scheduled first (i.e., that has the
/// lowest id) is placed earlier.
///
/// The pair of time and plan id are stored in a binary heap of `Entry` objects.
/// The data payload of the event is stored in a hash map by plan id.
/// The time, plan id, and priority are stored in a binary heap of `Entry<P>`
/// objects. The data payload of the event is stored in a hash map by plan id.
/// Plan cancellation occurs by removing the corresponding entry from the data
/// hash map.
pub struct Queue<T> {
queue: BinaryHeap<Entry>,
pub struct Queue<T, P: Eq + PartialEq + Ord> {
queue: BinaryHeap<Entry<P>>,
data_map: HashMap<u64, T>,
plan_counter: u64,
}

impl<T> Queue<T> {
impl<T, P: Eq + PartialEq + Ord> Queue<T, P> {
/// Create a new empty `Queue<T>`
#[must_use]
pub fn new() -> Queue<T> {
pub fn new() -> Queue<T, P> {
Queue {
queue: BinaryHeap::new(),
data_map: HashMap::new(),
Expand All @@ -45,10 +49,10 @@ impl<T> Queue<T> {
///
/// Returns an `Id` for the newly-added plan that can be used to cancel it
/// if needed.
pub fn add_plan(&mut self, time: f64, data: T) -> Id {
pub fn add_plan(&mut self, time: f64, data: T, priority: P) -> Id {
// Add plan to queue, store data, and increment counter
let id = self.plan_counter;
self.queue.push(Entry { time, id });
self.queue.push(Entry { time, id, priority });
self.data_map.insert(id, data);
self.plan_counter += 1;
Id { id }
Expand Down Expand Up @@ -90,36 +94,49 @@ impl<T> Queue<T> {
}
}

impl<T> Default for Queue<T> {
impl<T, P: Eq + PartialEq + Ord> Default for Queue<T, P> {
fn default() -> Self {
Self::new()
}
}

/// A time and id pair used to order plans in the `Queue<T>`
/// A time, id, and priority object used to order plans in the `Queue<T>`
///
/// `Entry` objects are sorted in increasing order of time and then plan id
/// `Entry` objects are sorted in increasing order of time, priority and then
/// plan id
#[derive(PartialEq, Debug)]
struct Entry {
struct Entry<P: Eq + PartialEq + Ord> {
time: f64,
id: u64,
priority: P,
}

impl Eq for Entry {}
impl<P: Eq + PartialEq + Ord> Eq for Entry<P> {}

impl PartialOrd for Entry {
impl<P: Eq + PartialEq + Ord> PartialOrd for Entry<P> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

/// Entry objects are ordered in increasing order by time and then plan id
impl Ord for Entry {
/// Entry objects are ordered in increasing order by time, priority, and then
/// plan id
impl<P: Eq + PartialEq + Ord> Ord for Entry<P> {
fn cmp(&self, other: &Self) -> Ordering {
let time_ordering = self.time.partial_cmp(&other.time).unwrap().reverse();
match time_ordering {
// Break time ties in order of plan id
Ordering::Equal => self.id.cmp(&other.id).reverse(),
// Break time ties in order of priority and then plan id
Ordering::Equal => {
let priority_ordering = self
.priority
.partial_cmp(&other.priority)
.unwrap()
.reverse();
match priority_ordering {
Ordering::Equal => self.id.cmp(&other.id).reverse(),
_ => priority_ordering,
}
}
_ => time_ordering,
}
}
Expand All @@ -143,16 +160,16 @@ mod tests {

#[test]
fn empty_queue() {
let mut plan_queue = Queue::<()>::new();
let mut plan_queue = Queue::<(), ()>::new();
assert!(plan_queue.get_next_plan().is_none());
}

#[test]
fn add_plans() {
let mut plan_queue = Queue::new();
plan_queue.add_plan(1.0, 1);
plan_queue.add_plan(3.0, 3);
plan_queue.add_plan(2.0, 2);
plan_queue.add_plan(1.0, 1, ());
plan_queue.add_plan(3.0, 3, ());
plan_queue.add_plan(2.0, 2, ());

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 1.0);
Expand All @@ -170,10 +187,10 @@ mod tests {
}

#[test]
fn add_plans_at_same_time() {
fn add_plans_at_same_time_with_same_priority() {
let mut plan_queue = Queue::new();
plan_queue.add_plan(1.0, 1);
plan_queue.add_plan(1.0, 2);
plan_queue.add_plan(1.0, 1, ());
plan_queue.add_plan(1.0, 2, ());

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 1.0);
Expand All @@ -186,12 +203,29 @@ mod tests {
assert!(plan_queue.get_next_plan().is_none());
}

#[test]
fn add_plans_at_same_time_with_different_priority() {
let mut plan_queue = Queue::new();
plan_queue.add_plan(1.0, 1, 1);
plan_queue.add_plan(1.0, 2, 0);

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 1.0);
assert_eq!(next_plan.data, 2);

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 1.0);
assert_eq!(next_plan.data, 1);

assert!(plan_queue.get_next_plan().is_none());
}

#[test]
fn add_and_cancel_plans() {
let mut plan_queue = Queue::new();
plan_queue.add_plan(1.0, 1);
let plan_to_cancel = plan_queue.add_plan(2.0, 2);
plan_queue.add_plan(3.0, 3);
plan_queue.add_plan(1.0, 1, ());
let plan_to_cancel = plan_queue.add_plan(2.0, 2, ());
plan_queue.add_plan(3.0, 3, ());
plan_queue.cancel_plan(&plan_to_cancel);

let next_plan = plan_queue.get_next_plan().unwrap();
Expand All @@ -208,14 +242,14 @@ mod tests {
#[test]
fn add_and_get_plans() {
let mut plan_queue = Queue::new();
plan_queue.add_plan(1.0, 1);
plan_queue.add_plan(2.0, 2);
plan_queue.add_plan(1.0, 1, ());
plan_queue.add_plan(2.0, 2, ());

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 1.0);
assert_eq!(next_plan.data, 1);

plan_queue.add_plan(1.5, 3);
plan_queue.add_plan(1.5, 3, ());

let next_plan = plan_queue.get_next_plan().unwrap();
assert_eq!(next_plan.time, 1.5);
Expand All @@ -231,8 +265,8 @@ mod tests {
#[test]
#[should_panic(expected = "Plan does not exist")]
fn cancel_invalid_plan() {
let mut plan_queue = Queue::<()>::new();
let plan_to_cancel = plan_queue.add_plan(1.0, ());
let mut plan_queue = Queue::new();
let plan_to_cancel = plan_queue.add_plan(1.0, (), ());
plan_queue.get_next_plan();
plan_queue.cancel_plan(&plan_to_cancel);
}
Expand Down
Loading