Skip to content

Commit

Permalink
Make the relationship between Timer and TimerKey more explicit
Browse files Browse the repository at this point in the history
This commit introduces factory methods on the Timer which creates the right
timer kind and corresponding timer key. This prevents the accidental mixup
of Timers with their keys.
  • Loading branch information
tillrohrmann committed Apr 30, 2024
1 parent 29ac700 commit 64c7193
Show file tree
Hide file tree
Showing 15 changed files with 197 additions and 170 deletions.
24 changes: 13 additions & 11 deletions crates/partition-store/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ macro_rules! define_table_key {
use crate::TableKind;
pub(crate) use define_table_key;
use restate_storage_api::deduplication_table::ProducerId;
use restate_storage_api::timer_table::TimerKind;
use restate_storage_api::timer_table::TimerKeyKind;
use restate_storage_api::StorageError;
use restate_types::identifiers::{InvocationUuid, PartitionId};

Expand Down Expand Up @@ -487,27 +487,27 @@ impl KeyCodec for ProducerId {
}
}

impl KeyCodec for TimerKind {
impl KeyCodec for TimerKeyKind {
fn encode<B: BufMut>(&self, target: &mut B) {
assert!(
self.serialized_length() <= target.remaining_mut(),
"serialization buffer has not enough space to serialize TimerKind: '{}' bytes required",
self.serialized_length()
);
match self {
TimerKind::Invoke { invocation_uuid } => {
TimerKeyKind::Invoke { invocation_uuid } => {
target.put_u8(0);
invocation_uuid.encode(target);
}
TimerKind::CompleteJournalEntry {
TimerKeyKind::CompleteJournalEntry {
invocation_uuid,
journal_index,
} => {
target.put_u8(1);
invocation_uuid.encode(target);
journal_index.encode(target);
}
TimerKind::CleanInvocationStatus { invocation_uuid } => {
TimerKeyKind::CleanInvocationStatus { invocation_uuid } => {
target.put_u8(2);
invocation_uuid.encode(target);
}
Expand All @@ -524,19 +524,19 @@ impl KeyCodec for TimerKind {
Ok(match source.get_u8() {
0 => {
let invocation_uuid = InvocationUuid::decode(source)?;
TimerKind::Invoke { invocation_uuid }
TimerKeyKind::Invoke { invocation_uuid }
}
1 => {
let invocation_uuid = InvocationUuid::decode(source)?;
let journal_index = u32::decode(source)?;
TimerKind::CompleteJournalEntry {
TimerKeyKind::CompleteJournalEntry {
invocation_uuid,
journal_index,
}
}
2 => {
let invocation_uuid = InvocationUuid::decode(source)?;
TimerKind::CleanInvocationStatus { invocation_uuid }
TimerKeyKind::CleanInvocationStatus { invocation_uuid }
}
i => {
return Err(StorageError::Generic(anyhow!(
Expand All @@ -549,15 +549,17 @@ impl KeyCodec for TimerKind {

fn serialized_length(&self) -> usize {
1 + match self {
TimerKind::Invoke { invocation_uuid } => KeyCodec::serialized_length(invocation_uuid),
TimerKind::CompleteJournalEntry {
TimerKeyKind::Invoke { invocation_uuid } => {
KeyCodec::serialized_length(invocation_uuid)
}
TimerKeyKind::CompleteJournalEntry {
invocation_uuid,
journal_index,
} => {
KeyCodec::serialized_length(invocation_uuid)
+ KeyCodec::serialized_length(journal_index)
}
TimerKind::CleanInvocationStatus { invocation_uuid } => {
TimerKeyKind::CleanInvocationStatus { invocation_uuid } => {
KeyCodec::serialized_length(invocation_uuid)
}
}
Expand Down
82 changes: 43 additions & 39 deletions crates/partition-store/src/timer_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{PartitionStore, RocksDBTransaction, StorageAccess};
use crate::{TableScan, TableScanIterationDecision};
use futures::Stream;
use futures_util::stream;
use restate_storage_api::timer_table::{Timer, TimerKey, TimerKind, TimerTable};
use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind, TimerTable};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{InvocationUuid, PartitionId};
use restate_types::storage::StorageCodec;
Expand All @@ -26,7 +26,7 @@ define_table_key!(
TimersKey(
partition_id: PartitionId,
timestamp: u64,
kind: TimerKind,
kind: TimerKeyKind,
)
);

Expand Down Expand Up @@ -69,32 +69,32 @@ fn exclusive_start_key_range(
) -> TableScan<TimersKey> {
if let Some(timer_key) = timer_key {
let next_timer_key = match timer_key.kind {
TimerKind::Invoke { invocation_uuid } => {
TimerKeyKind::Invoke { invocation_uuid } => {
let incremented_invocation_uuid = increment_invocation_uuid(invocation_uuid);
TimerKey {
timestamp: timer_key.timestamp,
kind: TimerKind::Invoke {
kind: TimerKeyKind::Invoke {
invocation_uuid: incremented_invocation_uuid,
},
}
}
TimerKind::CompleteJournalEntry {
TimerKeyKind::CompleteJournalEntry {
invocation_uuid,
journal_index,
} => TimerKey {
timestamp: timer_key.timestamp,
kind: TimerKind::CompleteJournalEntry {
kind: TimerKeyKind::CompleteJournalEntry {
invocation_uuid,
journal_index: journal_index
.checked_add(1)
.expect("journal index should be smaller than u64::MAX"),
},
},
TimerKind::CleanInvocationStatus { invocation_uuid } => {
TimerKeyKind::CleanInvocationStatus { invocation_uuid } => {
let incremented_invocation_uuid = increment_invocation_uuid(invocation_uuid);
TimerKey {
timestamp: timer_key.timestamp,
kind: TimerKind::CleanInvocationStatus {
kind: TimerKeyKind::CleanInvocationStatus {
invocation_uuid: incremented_invocation_uuid,
},
}
Expand Down Expand Up @@ -209,7 +209,7 @@ mod tests {
use super::*;
use crate::timer_table::TimerKey;
use rand::Rng;
use restate_storage_api::timer_table::TimerKindDiscriminants;
use restate_storage_api::timer_table::TimerKeyKindDiscriminants;
use restate_types::identifiers::InvocationUuid;
use strum::VariantArray;

Expand All @@ -219,7 +219,7 @@ mod tests {
#[test]
fn round_trip_complete_journal_entry_kind() {
let key = TimerKey {
kind: TimerKind::CompleteJournalEntry {
kind: TimerKeyKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 1448,
},
Expand All @@ -235,7 +235,7 @@ mod tests {
#[test]
fn round_trip_invoke_kind() {
let key = TimerKey {
kind: TimerKind::Invoke {
kind: TimerKeyKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 87654321,
Expand All @@ -250,7 +250,7 @@ mod tests {
#[test]
fn round_trip_clean_invocation_status_kind() {
let key = TimerKey {
kind: TimerKind::CleanInvocationStatus {
kind: TimerKeyKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 87654321,
Expand All @@ -265,14 +265,14 @@ mod tests {
#[test]
fn test_lexicographical_sorting_by_timestamp() {
let kinds = [
TimerKind::CompleteJournalEntry {
TimerKeyKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
},
TimerKind::Invoke {
TimerKeyKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION,
},
TimerKind::CleanInvocationStatus {
TimerKeyKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION,
},
];
Expand All @@ -296,14 +296,14 @@ mod tests {
fn test_lexicographical_sorting_by_invocation_uuid_complete_journal_entry_kind() {
// Higher random part should be sorted correctly in bytes
let a = TimerKey {
kind: TimerKind::CompleteJournalEntry {
kind: TimerKeyKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
},
timestamp: 300,
};
let b = TimerKey {
kind: TimerKind::CompleteJournalEntry {
kind: TimerKeyKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION.increment_random(),
journal_index: 0,
},
Expand All @@ -313,7 +313,7 @@ mod tests {

// Also ensure that higher timestamp is sorted correctly
let b = TimerKey {
kind: TimerKind::CompleteJournalEntry {
kind: TimerKeyKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(),
journal_index: 0,
},
Expand All @@ -326,13 +326,13 @@ mod tests {
fn test_lexicographical_sorting_by_invocation_uuid_invoke_kind() {
// Higher random part should be sorted correctly in bytes
let a = TimerKey {
kind: TimerKind::Invoke {
kind: TimerKeyKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 300,
};
let b = TimerKey {
kind: TimerKind::Invoke {
kind: TimerKeyKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION.increment_random(),
},
timestamp: 300,
Expand All @@ -341,7 +341,7 @@ mod tests {

// Also ensure that higher timestamp is sorted correctly
let b = TimerKey {
kind: TimerKind::Invoke {
kind: TimerKeyKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(),
},
timestamp: 300,
Expand All @@ -353,13 +353,13 @@ mod tests {
fn test_lexicographical_sorting_by_invocation_uuid_clean_invoation_status_kind() {
// Higher random part should be sorted correctly in bytes
let a = TimerKey {
kind: TimerKind::CleanInvocationStatus {
kind: TimerKeyKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 300,
};
let b = TimerKey {
kind: TimerKind::CleanInvocationStatus {
kind: TimerKeyKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION.increment_random(),
},
timestamp: 300,
Expand All @@ -368,7 +368,7 @@ mod tests {

// Also ensure that higher timestamp is sorted correctly
let b = TimerKey {
kind: TimerKind::CleanInvocationStatus {
kind: TimerKeyKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(),
},
timestamp: 300,
Expand All @@ -379,14 +379,14 @@ mod tests {
#[test]
fn test_lexicographical_sorting_by_journal_index() {
let a = TimerKey {
kind: TimerKind::CompleteJournalEntry {
kind: TimerKeyKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
},
timestamp: 300,
};
let b = TimerKey {
kind: TimerKind::CompleteJournalEntry {
kind: TimerKeyKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 1,
},
Expand All @@ -398,22 +398,22 @@ mod tests {
#[test]
fn test_lexicographical_sorting_timer_kind() {
let a = TimerKey {
kind: TimerKind::Invoke {
kind: TimerKeyKind::Invoke {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 300,
};

let b = TimerKey {
kind: TimerKind::CompleteJournalEntry {
kind: TimerKeyKind::CompleteJournalEntry {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
},
timestamp: 300,
};

let c = TimerKey {
kind: TimerKind::CleanInvocationStatus {
kind: TimerKeyKind::CleanInvocationStatus {
invocation_uuid: FIXTURE_INVOCATION,
},
timestamp: 300,
Expand Down Expand Up @@ -475,19 +475,23 @@ mod tests {

pub fn random_timer_key() -> TimerKey {
let kind = {
match TimerKindDiscriminants::VARIANTS
[rand::thread_rng().gen_range(0..TimerKindDiscriminants::VARIANTS.len())]
match TimerKeyKindDiscriminants::VARIANTS
[rand::thread_rng().gen_range(0..TimerKeyKindDiscriminants::VARIANTS.len())]
{
TimerKindDiscriminants::Invoke => TimerKind::Invoke {
invocation_uuid: InvocationUuid::new(),
},
TimerKindDiscriminants::CompleteJournalEntry => TimerKind::CompleteJournalEntry {
invocation_uuid: InvocationUuid::new(),
journal_index: rand::thread_rng().gen_range(0..2 ^ 16),
},
TimerKindDiscriminants::CleanInvocationStatus => TimerKind::CleanInvocationStatus {
TimerKeyKindDiscriminants::Invoke => TimerKeyKind::Invoke {
invocation_uuid: InvocationUuid::new(),
},
TimerKeyKindDiscriminants::CompleteJournalEntry => {
TimerKeyKind::CompleteJournalEntry {
invocation_uuid: InvocationUuid::new(),
journal_index: rand::thread_rng().gen_range(0..2 ^ 16),
}
}
TimerKeyKindDiscriminants::CleanInvocationStatus => {
TimerKeyKind::CleanInvocationStatus {
invocation_uuid: InvocationUuid::new(),
}
}
}
};

Expand Down
Loading

0 comments on commit 64c7193

Please sign in to comment.