diff --git a/Cargo.lock b/Cargo.lock index 7373fe5de..06070ace8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6066,6 +6066,7 @@ dependencies = [ "prost-types", "restate-types", "serde", + "strum 0.26.2", "strum_macros 0.26.2", "thiserror", ] diff --git a/crates/partition-store/src/keys.rs b/crates/partition-store/src/keys.rs index 279db7d7b..c42d48516 100644 --- a/crates/partition-store/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -12,6 +12,7 @@ use anyhow::anyhow; use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytestring::ByteString; use prost::encoding::encoded_len_varint; +use std::mem; use strum_macros::EnumIter; /// Every table key needs to have a key kind. This allows to multiplex different keys in the same @@ -305,6 +306,7 @@ macro_rules! define_table_key { use crate::TableKind; pub(crate) use define_table_key; use restate_storage_api::deduplication_table::ProducerId; +use restate_storage_api::timer_table::TimerKeyKind; use restate_storage_api::StorageError; use restate_types::identifiers::{InvocationUuid, PartitionId}; @@ -485,6 +487,85 @@ impl KeyCodec for ProducerId { } } +impl KeyCodec for TimerKeyKind { + fn encode(&self, target: &mut B) { + assert!( + self.serialized_length() <= target.remaining_mut(), + "serialization buffer has not enough space to serialize TimerKind: '{}' bytes required", + self.serialized_length() + ); + match self { + TimerKeyKind::Invoke { invocation_uuid } => { + target.put_u8(0); + invocation_uuid.encode(target); + } + TimerKeyKind::CompleteJournalEntry { + invocation_uuid, + journal_index, + } => { + target.put_u8(1); + invocation_uuid.encode(target); + journal_index.encode(target); + } + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => { + target.put_u8(2); + invocation_uuid.encode(target); + } + } + } + + fn decode(source: &mut B) -> crate::partition_store::Result { + if source.remaining() < mem::size_of::() { + return Err(StorageError::Generic(anyhow!( + "TimerKind discriminator byte is missing" + ))); + 
} + + Ok(match source.get_u8() { + 0 => { + let invocation_uuid = InvocationUuid::decode(source)?; + TimerKeyKind::Invoke { invocation_uuid } + } + 1 => { + let invocation_uuid = InvocationUuid::decode(source)?; + let journal_index = u32::decode(source)?; + TimerKeyKind::CompleteJournalEntry { + invocation_uuid, + journal_index, + } + } + 2 => { + let invocation_uuid = InvocationUuid::decode(source)?; + TimerKeyKind::CleanInvocationStatus { invocation_uuid } + } + i => { + return Err(StorageError::Generic(anyhow!( + "Unknown discriminator for TimerKind: '{}'", + i + ))) + } + }) + } + + fn serialized_length(&self) -> usize { + 1 + match self { + TimerKeyKind::Invoke { invocation_uuid } => { + KeyCodec::serialized_length(invocation_uuid) + } + TimerKeyKind::CompleteJournalEntry { + invocation_uuid, + journal_index, + } => { + KeyCodec::serialized_length(invocation_uuid) + + KeyCodec::serialized_length(journal_index) + } + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => { + KeyCodec::serialized_length(invocation_uuid) + } + } + } +} + #[inline] fn write_delimited(source: impl AsRef<[u8]>, target: &mut B) { let source = source.as_ref(); diff --git a/crates/partition-store/src/timer_table/mod.rs b/crates/partition-store/src/timer_table/mod.rs index a2ed766d5..9e3b2aaf4 100644 --- a/crates/partition-store/src/timer_table/mod.rs +++ b/crates/partition-store/src/timer_table/mod.rs @@ -15,7 +15,7 @@ use crate::{PartitionStore, RocksDBTransaction, StorageAccess}; use crate::{TableScan, TableScanIterationDecision}; use futures::Stream; use futures_util::stream; -use restate_storage_api::timer_table::{Timer, TimerKey, TimerTable}; +use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind, TimerTable}; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationUuid, PartitionId}; use restate_types::storage::StorageCodec; @@ -26,8 +26,7 @@ define_table_key!( TimersKey( partition_id: PartitionId, timestamp: u64, - 
invocation_id: InvocationUuid, - journal_index: u32 + kind: TimerKeyKind, ) ); @@ -36,8 +35,7 @@ fn write_timer_key(partition_id: PartitionId, timer_key: &TimerKey) -> TimersKey TimersKey::default() .partition_id(partition_id) .timestamp(timer_key.timestamp) - .invocation_id(timer_key.invocation_uuid) - .journal_index(timer_key.journal_index) + .kind(timer_key.kind.clone()) } #[inline] @@ -48,9 +46,8 @@ fn timer_key_from_key_slice(slice: &[u8]) -> Result { return Err(StorageError::DataIntegrityError); } let timer_key = TimerKey { - invocation_uuid: key.invocation_id.unwrap(), - journal_index: key.journal_index.unwrap(), timestamp: key.timestamp.unwrap(), + kind: key.kind.unwrap(), }; Ok(timer_key) @@ -71,11 +68,40 @@ fn exclusive_start_key_range( timer_key: Option<&TimerKey>, ) -> TableScan { if let Some(timer_key) = timer_key { - let mut lower_bound = write_timer_key(partition_id, timer_key); - - let next_index = lower_bound.journal_index.map(|i| i + 1).unwrap_or(1); + let next_timer_key = match timer_key.kind { + TimerKeyKind::Invoke { invocation_uuid } => { + let incremented_invocation_uuid = increment_invocation_uuid(invocation_uuid); + TimerKey { + timestamp: timer_key.timestamp, + kind: TimerKeyKind::Invoke { + invocation_uuid: incremented_invocation_uuid, + }, + } + } + TimerKeyKind::CompleteJournalEntry { + invocation_uuid, + journal_index, + } => TimerKey { + timestamp: timer_key.timestamp, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid, + journal_index: journal_index + .checked_add(1) + .expect("journal index should be smaller than u32::MAX"), + }, + }, + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => { + let incremented_invocation_uuid = increment_invocation_uuid(invocation_uuid); + TimerKey { + timestamp: timer_key.timestamp, + kind: TimerKeyKind::CleanInvocationStatus { + invocation_uuid: incremented_invocation_uuid, + }, + } + } + }; - lower_bound.journal_index = Some(next_index); + let lower_bound = 
write_timer_key(partition_id, &next_timer_key); let upper_bound = TimersKey::default() .partition_id(partition_id) @@ -87,6 +113,15 @@ fn exclusive_start_key_range( } } +fn increment_invocation_uuid(invocation_uuid: InvocationUuid) -> InvocationUuid { + let invocation_uuid_value: u128 = invocation_uuid.into(); + InvocationUuid::from( + invocation_uuid_value + .checked_add(1) + .expect("invocation_uuid should be smaller than u128::MAX"), + ) +} + fn add_timer( storage: &mut S, partition_id: PartitionId, @@ -174,16 +209,50 @@ mod tests { use super::*; use crate::timer_table::TimerKey; use rand::Rng; + use restate_storage_api::timer_table::TimerKeyKindDiscriminants; use restate_types::identifiers::InvocationUuid; + use strum::VariantArray; const FIXTURE_INVOCATION: InvocationUuid = InvocationUuid::from_parts(1706027034946, 12345678900001); #[test] - fn round_trip() { + fn round_trip_complete_journal_entry_kind() { + let key = TimerKey { + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 1448, + }, + timestamp: 87654321, + }; + + let key_bytes = write_timer_key(PartitionId::from(1337), &key).serialize(); + let got = timer_key_from_key_slice(&key_bytes).expect("should not fail"); + + assert_eq!(got, key); + } + + #[test] + fn round_trip_invoke_kind() { let key = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 1448, + kind: TimerKeyKind::Invoke { + invocation_uuid: FIXTURE_INVOCATION, + }, + timestamp: 87654321, + }; + + let key_bytes = write_timer_key(PartitionId::from(1337), &key).serialize(); + let got = timer_key_from_key_slice(&key_bytes).expect("should not fail"); + + assert_eq!(got, key); + } + + #[test] + fn round_trip_clean_invocation_status_kind() { + let key = TimerKey { + kind: TimerKeyKind::CleanInvocationStatus { + invocation_uuid: FIXTURE_INVOCATION, + }, timestamp: 87654321, }; @@ -195,66 +264,175 @@ mod tests { #[test] fn test_lexicographical_sorting_by_timestamp() { + let kinds = [ + 
TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 0, + }, + TimerKeyKind::Invoke { + invocation_uuid: FIXTURE_INVOCATION, + }, + TimerKeyKind::CleanInvocationStatus { + invocation_uuid: FIXTURE_INVOCATION, + }, + ]; + + for first_kind in &kinds { + for second_kind in &kinds { + let a = TimerKey { + kind: first_kind.clone(), + timestamp: 300, + }; + let b = TimerKey { + kind: second_kind.clone(), + timestamp: 301, + }; + assert_in_range(&a, &b); + } + } + } + + #[test] + fn test_lexicographical_sorting_by_invocation_uuid_complete_journal_entry_kind() { + // Higher random part should be sorted correctly in bytes + let a = TimerKey { + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 0, + }, + timestamp: 300, + }; + let b = TimerKey { + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION.increment_random(), + journal_index: 0, + }, + timestamp: 300, + }; + assert_in_range(&a, &b); + + // Also ensure that higher timestamp is sorted correctly + let b = TimerKey { + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), + journal_index: 0, + }, + timestamp: 300, + }; + assert_in_range(&a, &b); + } + + #[test] + fn test_lexicographical_sorting_by_invocation_uuid_invoke_kind() { + // Higher random part should be sorted correctly in bytes let a = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKeyKind::Invoke { + invocation_uuid: FIXTURE_INVOCATION, + }, timestamp: 300, }; let b = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, - timestamp: 301, + kind: TimerKeyKind::Invoke { + invocation_uuid: FIXTURE_INVOCATION.increment_random(), + }, + timestamp: 300, + }; + assert_in_range(&a, &b); + + // Also ensure that higher timestamp is sorted correctly + let b = TimerKey { + kind: TimerKeyKind::Invoke { + invocation_uuid: 
FIXTURE_INVOCATION.increment_timestamp(), + }, + timestamp: 300, }; - assert_in_range(a, b); + assert_in_range(&a, &b); } #[test] - fn test_lexicographical_sorting_by_invocation() { + fn test_lexicographical_sorting_by_invocation_uuid_clean_invocation_status_kind() { // Higher random part should be sorted correctly in bytes let a = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKeyKind::CleanInvocationStatus { + invocation_uuid: FIXTURE_INVOCATION, + }, timestamp: 300, }; let b = TimerKey { - invocation_uuid: FIXTURE_INVOCATION.increment_random(), - journal_index: 0, + kind: TimerKeyKind::CleanInvocationStatus { + invocation_uuid: FIXTURE_INVOCATION.increment_random(), + }, timestamp: 300, }; - assert_in_range(a.clone(), b); + assert_in_range(&a, &b); // Also ensure that higher timestamp is sorted correctly let b = TimerKey { - invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), - journal_index: 0, + kind: TimerKeyKind::CleanInvocationStatus { + invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), + }, timestamp: 300, }; - assert_in_range(a, b); + assert_in_range(&a, &b); } #[test] fn test_lexicographical_sorting_by_journal_index() { let a = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 0, + }, timestamp: 300, }; let b = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 1, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 1, + }, timestamp: 300, }; - assert_in_range(a, b); + assert_in_range(&a, &b); + } + + #[test] + fn test_lexicographical_sorting_timer_kind() { + let a = TimerKey { + kind: TimerKeyKind::Invoke { + invocation_uuid: FIXTURE_INVOCATION, + }, + timestamp: 300, + }; + + let b = TimerKey { + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 0, + }, + timestamp: 300, + 
}; + + let c = TimerKey { + kind: TimerKeyKind::CleanInvocationStatus { + invocation_uuid: FIXTURE_INVOCATION, + }, + timestamp: 300, + }; + + assert_in_range(&a, &b); + assert_in_range(&b, &c); } #[track_caller] - fn assert_in_range(key_a: TimerKey, key_b: TimerKey) { - let key_a_bytes = write_timer_key(PartitionId::from(1), &key_a).serialize(); - let key_b_bytes = write_timer_key(PartitionId::from(1), &key_b).serialize(); + fn assert_in_range(key_a: &TimerKey, key_b: &TimerKey) { + assert!(key_a < key_b); + + let key_a_bytes = write_timer_key(PartitionId::from(1), key_a).serialize(); + let key_b_bytes = write_timer_key(PartitionId::from(1), key_b).serialize(); assert!(less_than(&key_a_bytes, &key_b_bytes)); - let (low, high) = match exclusive_start_key_range(PartitionId::from(1), Some(&key_a)) { + let (low, high) = match exclusive_start_key_range(PartitionId::from(1), Some(key_a)) { TableScan::KeyRangeInclusiveInSinglePartition(p, low, high) if *p == 1 => (low, high), _ => panic!(""), }; @@ -296,9 +474,29 @@ mod tests { } pub fn random_timer_key() -> TimerKey { + let kind = { + match TimerKeyKindDiscriminants::VARIANTS + [rand::thread_rng().gen_range(0..TimerKeyKindDiscriminants::VARIANTS.len())] + { + TimerKeyKindDiscriminants::Invoke => TimerKeyKind::Invoke { + invocation_uuid: InvocationUuid::new(), + }, + TimerKeyKindDiscriminants::CompleteJournalEntry => { + TimerKeyKind::CompleteJournalEntry { + invocation_uuid: InvocationUuid::new(), + journal_index: rand::thread_rng().gen_range(0..2 ^ 16), + } + } + TimerKeyKindDiscriminants::CleanInvocationStatus => { + TimerKeyKind::CleanInvocationStatus { + invocation_uuid: InvocationUuid::new(), + } + } + } + }; + TimerKey { - invocation_uuid: InvocationUuid::new(), - journal_index: rand::thread_rng().gen_range(0..2 ^ 16), + kind, timestamp: rand::thread_rng().gen_range(0..2 ^ 16), } } diff --git a/crates/partition-store/tests/timer_table_test/mod.rs b/crates/partition-store/tests/timer_table_test/mod.rs index 
6b5d5990e..731469f07 100644 --- a/crates/partition-store/tests/timer_table_test/mod.rs +++ b/crates/partition-store/tests/timer_table_test/mod.rs @@ -10,15 +10,18 @@ use crate::mock_service_invocation; use futures_util::StreamExt; +use googletest::matchers::eq; +use googletest::{assert_that, pat}; use restate_partition_store::PartitionStore; -use restate_storage_api::timer_table::{Timer, TimerKey, TimerTable}; +use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind, TimerTable}; use restate_storage_api::Transaction; -use restate_types::identifiers::{InvocationUuid, PartitionId, ServiceId}; +use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionId, ServiceId}; use restate_types::invocation::ServiceInvocation; use std::pin::pin; -const FIXTURE_INVOCATION: InvocationUuid = +const FIXTURE_INVOCATION_UUID: InvocationUuid = InvocationUuid::from_parts(1706027034946, 12345678900001); +const FIXTURE_INVOCATION: InvocationId = InvocationId::from_parts(1337, FIXTURE_INVOCATION_UUID); const PARTITION1337: PartitionId = PartitionId::new_unchecked(1337); @@ -26,22 +29,26 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), + journal_index: 0, + }, timestamp: 0, }, - Timer::CompleteSleepEntry(1337), + Timer::CompleteJournalEntry(FIXTURE_INVOCATION, 0), ) .await; txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 1, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), + journal_index: 1, + }, timestamp: 0, }, - Timer::CompleteSleepEntry(1337), + Timer::CompleteJournalEntry(FIXTURE_INVOCATION, 1), ) .await; @@ -51,8 +58,9 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 2, + kind: 
TimerKeyKind::Invoke { + invocation_uuid: service_invocation.invocation_id.invocation_uuid(), + }, timestamp: 1, }, Timer::Invoke(service_invocation), @@ -65,22 +73,26 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION_UUID, + journal_index: 0, + }, timestamp: 0, }, - Timer::CompleteSleepEntry(1336), + Timer::CompleteJournalEntry(InvocationId::from_parts(1336, FIXTURE_INVOCATION_UUID), 0), ) .await; txn.add_timer( PartitionId::from(1338), &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION_UUID, + journal_index: 0, + }, timestamp: 0, }, - Timer::CompleteSleepEntry(1338), + Timer::CompleteJournalEntry(InvocationId::from_parts(1338, FIXTURE_INVOCATION_UUID), 0), ) .await; } @@ -98,8 +110,10 @@ async fn demo_how_to_find_first_timers_in_a_partition(txn: &mut T async fn find_timers_greater_than(txn: &mut T) { let timer_key = &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION_UUID, + journal_index: 0, + }, timestamp: 0, }; let mut stream = pin!(txn.next_timers_greater_than(PARTITION1337, Some(timer_key), usize::MAX)); @@ -107,13 +121,19 @@ async fn find_timers_greater_than(txn: &mut T) { if let Some(Ok((key, _))) = stream.next().await { // make sure that we skip the first timer that has a journal_index of 0 // take a look at populate_data once again. - assert_eq!(key.journal_index, 1); + assert_that!( + key.kind, + pat!(TimerKeyKind::CompleteJournalEntry { + journal_index: eq(1), + }) + ); } else { panic!("test failure"); } if let Some(Ok((key, _))) = stream.next().await { - assert_eq!(key.journal_index, 2); + assert_that!(key.kind, pat!(TimerKeyKind::Invoke { .. 
})); + assert_eq!(key.timestamp, 1); } else { panic!("test failure"); } @@ -123,8 +143,10 @@ async fn delete_the_first_timer(txn: &mut T) { txn.delete_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION_UUID, + journal_index: 0, + }, timestamp: 0, }, ) @@ -133,16 +155,22 @@ async fn delete_the_first_timer(txn: &mut T) { async fn verify_next_timer_after_deletion(txn: &mut T) { let timer_key = &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid: FIXTURE_INVOCATION_UUID, + journal_index: 0, + }, timestamp: 0, }; let mut stream = pin!(txn.next_timers_greater_than(PARTITION1337, Some(timer_key), usize::MAX,)); if let Some(Ok((key, _))) = stream.next().await { - // make sure that we skip the first timer - assert_eq!(key.journal_index, 1); + assert_that!( + key.kind, + pat!(TimerKeyKind::CompleteJournalEntry { + journal_index: eq(1) + }) + ); } else { panic!("test failure"); } diff --git a/crates/storage-api/Cargo.toml b/crates/storage-api/Cargo.toml index 8b26beac0..20010498a 100644 --- a/crates/storage-api/Cargo.toml +++ b/crates/storage-api/Cargo.toml @@ -23,6 +23,7 @@ opentelemetry = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } serde = { workspace = true, optional = true } +strum = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index 8b44c7fda..4efcd3e58 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -68,7 +68,7 @@ message JournalMeta { message Source { message Service { - bytes invocation_id = 1; + InvocationId invocation_id = 1; InvocationTarget 
invocation_target = 2; } @@ -221,7 +221,7 @@ message Header { } message ServiceInvocation { - bytes invocation_id = 1; + InvocationId invocation_id = 1; InvocationTarget invocation_target = 2; bytes argument = 3; ServiceInvocationResponseSink response_sink = 4; @@ -240,7 +240,7 @@ message StateMutation { message InboxEntry { message Invocation { - bytes invocation_id = 1; + InvocationId invocation_id = 1; ServiceId service_id = 2; } @@ -252,7 +252,7 @@ message InboxEntry { message InvocationResolutionResult { message Success { - bytes invocation_id = 1; + InvocationId invocation_id = 1; InvocationTarget invocation_target = 2; SpanContext span_context = 3; } @@ -264,7 +264,7 @@ message InvocationResolutionResult { } message BackgroundCallResolutionResult { - bytes invocation_id = 1; + InvocationId invocation_id = 1; InvocationTarget invocation_target = 2; SpanContext span_context = 3; } @@ -311,7 +311,7 @@ message EnrichedEntryHeader { } message CompleteAwakeable { - bytes invocation_id = 1; + InvocationId invocation_id = 1; uint32 entry_index = 2; } @@ -404,17 +404,17 @@ message OutboxMessage { } message OutboxServiceInvocationResponse { - bytes invocation_id = 1; + InvocationId invocation_id = 1; uint32 entry_index = 2; ResponseResult response_result = 3; } message OutboxKill { - bytes invocation_id = 1; + InvocationId invocation_id = 1; } message OutboxCancel { - bytes invocation_id = 1; + InvocationId invocation_id = 1; } oneof outbox_message { @@ -433,11 +433,12 @@ message OutboxMessage { message Timer { message CompleteSleepEntry { - uint64 partition_key = 1; + InvocationId invocation_id = 1; + uint32 entry_index = 2; } message CleanInvocationStatus { - bytes invocation_id = 1; + InvocationId invocation_id = 1; } oneof value { @@ -478,7 +479,7 @@ message DedupSequenceNumber { // --------------------------------------------------------------------- message IdempotencyMetadata { - bytes invocation_id = 1; + InvocationId invocation_id = 1; } message 
IdempotentRequestMetadata { diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index 26e39d5af..d0beee92b 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -757,8 +757,10 @@ pub mod v1 { { source::Source::Ingress(_) => restate_types::invocation::Source::Ingress, source::Source::Service(service) => restate_types::invocation::Source::Service( - restate_types::identifiers::InvocationId::from_slice( - &service.invocation_id, + restate_types::identifiers::InvocationId::try_from( + service + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, restate_types::invocation::InvocationTarget::try_from( service @@ -781,7 +783,7 @@ pub mod v1 { invocation_id, invocation_target, ) => source::Source::Service(source::Service { - invocation_id: invocation_id.into(), + invocation_id: Some(InvocationId::from(invocation_id)), invocation_target: Some(InvocationTarget::from(invocation_target)), }), restate_types::invocation::Source::Internal => source::Source::Internal(()), @@ -806,8 +808,10 @@ pub mod v1 { .service_id .ok_or(ConversionError::missing_field("service_id"))?, )?, - restate_types::identifiers::InvocationId::from_slice( - &invocation.invocation_id, + restate_types::identifiers::InvocationId::try_from( + invocation + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, ) } @@ -829,7 +833,7 @@ pub mod v1 { crate::inbox_table::InboxEntry::Invocation(service_id, invocation_id) => { inbox_entry::Entry::Invocation(inbox_entry::Invocation { service_id: Some(service_id.into()), - invocation_id: invocation_id.into(), + invocation_id: Some(InvocationId::from(invocation_id)), }) } crate::inbox_table::InboxEntry::StateMutation(state_mutation) => { @@ -859,8 +863,9 @@ pub mod v1 { idempotency, } = value; - let invocation_id = - restate_types::identifiers::InvocationId::from_slice(&invocation_id)?; + let invocation_id = 
restate_types::identifiers::InvocationId::try_from( + invocation_id.ok_or(ConversionError::missing_field("invocation_id"))?, + )?; let invocation_target = restate_types::invocation::InvocationTarget::try_from( invocation_target.ok_or(ConversionError::missing_field("invocation_target"))?, @@ -911,7 +916,6 @@ pub mod v1 { impl From for ServiceInvocation { fn from(value: restate_types::invocation::ServiceInvocation) -> Self { - let invocation_id = Bytes::copy_from_slice(&value.invocation_id.to_bytes()); let invocation_target = InvocationTarget::from(value.invocation_target); let span_context = SpanContext::from(value.span_context); let response_sink = ServiceInvocationResponseSink::from(value.response_sink); @@ -919,7 +923,7 @@ pub mod v1 { let headers = value.headers.into_iter().map(Into::into).collect(); ServiceInvocation { - invocation_id, + invocation_id: Some(InvocationId::from(value.invocation_id)), invocation_target: Some(invocation_target), span_context: Some(span_context), response_sink: Some(response_sink), @@ -1535,11 +1539,11 @@ pub mod v1 { }) => { restate_types::journal::enriched::EnrichedEntryHeader::CompleteAwakeable { enrichment_result: AwakeableEnrichmentResult { - invocation_id: - restate_types::identifiers::InvocationId::from_slice( - &invocation_id, - ) - .map_err(ConversionError::invalid_data)?, + invocation_id: restate_types::identifiers::InvocationId::try_from( + invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, + ) + .map_err(ConversionError::invalid_data)?, entry_index, }, } @@ -1618,9 +1622,7 @@ pub mod v1 { enrichment_result, .. } => enriched_entry_header::Kind::CompleteAwakeable(CompleteAwakeable { - invocation_id: Bytes::copy_from_slice( - &enrichment_result.invocation_id.to_bytes(), - ), + invocation_id: Some(InvocationId::from(enrichment_result.invocation_id)), entry_index: enrichment_result.entry_index, }), restate_types::journal::enriched::EnrichedEntryHeader::Run { .. 
} => { @@ -1649,8 +1651,10 @@ pub mod v1 { { invocation_resolution_result::Result::None(_) => None, invocation_resolution_result::Result::Success(success) => { - let invocation_id = restate_types::identifiers::InvocationId::from_slice( - &success.invocation_id, + let invocation_id = restate_types::identifiers::InvocationId::try_from( + success + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?; let invocation_target = @@ -1692,7 +1696,7 @@ pub mod v1 { span_context, } => invocation_resolution_result::Result::Success( invocation_resolution_result::Success { - invocation_id: invocation_id.into(), + invocation_id: Some(InvocationId::from(invocation_id)), invocation_target: Some(invocation_target.into()), span_context: Some(SpanContext::from(span_context)), }, @@ -1712,8 +1716,11 @@ pub mod v1 { type Error = ConversionError; fn try_from(value: BackgroundCallResolutionResult) -> Result { - let invocation_id = - restate_types::identifiers::InvocationId::from_slice(&value.invocation_id)?; + let invocation_id = restate_types::identifiers::InvocationId::try_from( + value + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, + )?; let invocation_target = restate_types::invocation::InvocationTarget::try_from( value @@ -1740,7 +1747,7 @@ pub mod v1 { { fn from(value: restate_types::journal::enriched::CallEnrichmentResult) -> Self { BackgroundCallResolutionResult { - invocation_id: value.invocation_id.into(), + invocation_id: Some(InvocationId::from(value.invocation_id)), invocation_target: Some(value.invocation_target.into()), span_context: Some(SpanContext::from(value.span_context)), } @@ -1769,8 +1776,10 @@ pub mod v1 { ) => crate::outbox_table::OutboxMessage::ServiceResponse( restate_types::invocation::InvocationResponse { entry_index: invocation_response.entry_index, - id: restate_types::identifiers::InvocationId::from_slice( - &invocation_response.invocation_id, + id: restate_types::identifiers::InvocationId::try_from( + 
invocation_response + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, result: restate_types::invocation::ResponseResult::try_from( invocation_response @@ -1782,8 +1791,10 @@ pub mod v1 { outbox_message::OutboxMessage::Kill(outbox_kill) => { crate::outbox_table::OutboxMessage::InvocationTermination( InvocationTermination::kill( - restate_types::identifiers::InvocationId::from_slice( - &outbox_kill.invocation_id, + restate_types::identifiers::InvocationId::try_from( + outbox_kill + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, ), ) @@ -1791,8 +1802,10 @@ pub mod v1 { outbox_message::OutboxMessage::Cancel(outbox_cancel) => { crate::outbox_table::OutboxMessage::InvocationTermination( InvocationTermination::cancel( - restate_types::identifiers::InvocationId::from_slice( - &outbox_cancel.invocation_id, + restate_types::identifiers::InvocationId::try_from( + outbox_cancel + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, ), ) @@ -1819,7 +1832,7 @@ pub mod v1 { outbox_message::OutboxMessage::ServiceInvocationResponse( OutboxServiceInvocationResponse { entry_index: invocation_response.entry_index, - invocation_id: invocation_response.id.into(), + invocation_id: Some(InvocationId::from(invocation_response.id)), response_result: Some(ResponseResult::from( invocation_response.result, )), @@ -1831,12 +1844,16 @@ pub mod v1 { ) => match invocation_termination.flavor { TerminationFlavor::Kill => { outbox_message::OutboxMessage::Kill(OutboxKill { - invocation_id: invocation_termination.invocation_id.into(), + invocation_id: Some(InvocationId::from( + invocation_termination.invocation_id, + )), }) } TerminationFlavor::Cancel => { outbox_message::OutboxMessage::Cancel(OutboxCancel { - invocation_id: invocation_termination.invocation_id.into(), + invocation_id: Some(InvocationId::from( + invocation_termination.invocation_id, + )), }) } }, @@ -1903,15 +1920,23 @@ pub mod v1 { Ok( match 
value.value.ok_or(ConversionError::missing_field("value"))? { timer::Value::CompleteSleepEntry(cse) => { - crate::timer_table::Timer::CompleteSleepEntry(cse.partition_key) + crate::timer_table::Timer::CompleteJournalEntry( + restate_types::identifiers::InvocationId::try_from( + cse.invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, + )?, + cse.entry_index, + ) } timer::Value::Invoke(si) => crate::timer_table::Timer::Invoke( restate_types::invocation::ServiceInvocation::try_from(si)?, ), timer::Value::CleanInvocationStatus(clean_invocation_status) => { crate::timer_table::Timer::CleanInvocationStatus( - restate_types::identifiers::InvocationId::from_slice( - &clean_invocation_status.invocation_id, + restate_types::identifiers::InvocationId::try_from( + clean_invocation_status + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, ) } @@ -1924,18 +1949,20 @@ pub mod v1 { fn from(value: crate::timer_table::Timer) -> Self { Timer { value: Some(match value { - crate::timer_table::Timer::CompleteSleepEntry(partition_key) => { - timer::Value::CompleteSleepEntry(timer::CompleteSleepEntry { - partition_key, - }) - } + crate::timer_table::Timer::CompleteJournalEntry( + invocation_id, + entry_index, + ) => timer::Value::CompleteSleepEntry(timer::CompleteSleepEntry { + invocation_id: Some(InvocationId::from(invocation_id)), + entry_index, + }), crate::timer_table::Timer::Invoke(si) => { timer::Value::Invoke(ServiceInvocation::from(si)) } crate::timer_table::Timer::CleanInvocationStatus(invocation_id) => { timer::Value::CleanInvocationStatus(timer::CleanInvocationStatus { - invocation_id: Bytes::copy_from_slice(&invocation_id.to_bytes()), + invocation_id: Some(InvocationId::from(invocation_id)), }) } }), @@ -2024,7 +2051,7 @@ pub mod v1 { impl From for IdempotencyMetadata { fn from(value: crate::idempotency_table::IdempotencyMetadata) -> Self { IdempotencyMetadata { - invocation_id: 
Bytes::copy_from_slice(&value.invocation_id.to_bytes()), + invocation_id: Some(InvocationId::from(value.invocation_id)), } } } @@ -2034,8 +2061,10 @@ pub mod v1 { fn try_from(value: IdempotencyMetadata) -> Result { Ok(crate::idempotency_table::IdempotencyMetadata { - invocation_id: restate_types::identifiers::InvocationId::from_slice( - &value.invocation_id, + invocation_id: restate_types::identifiers::InvocationId::try_from( + value + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, ) .map_err(|e| ConversionError::invalid_data(e))?, }) diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index 38b174029..a392c9397 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -14,14 +14,49 @@ use restate_types::identifiers::{ InvocationId, InvocationUuid, PartitionId, PartitionKey, WithPartitionKey, }; use restate_types::invocation::ServiceInvocation; +use restate_types::time::MillisSinceEpoch; use std::cmp::Ordering; use std::future::Future; +/// # Important +/// We use the [`TimerKey`] to read the timers in an absolute order. The timer service +/// relies on this order in order to process each timer exactly once. That is the +/// reason why the in-memory and in-rocksdb ordering of the TimerKey needs to be exactly +/// the same. 
#[derive(Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct TimerKey { pub timestamp: u64, - pub invocation_uuid: InvocationUuid, - pub journal_index: u32, + pub kind: TimerKeyKind, +} + +impl TimerKey { + fn complete_journal_entry( + timestamp: u64, + invocation_uuid: InvocationUuid, + journal_index: u32, + ) -> Self { + TimerKey { + timestamp, + kind: TimerKeyKind::CompleteJournalEntry { + invocation_uuid, + journal_index, + }, + } + } + + fn invoke(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { + TimerKey { + timestamp, + kind: TimerKeyKind::Invoke { invocation_uuid }, + } + } + + fn clean_invocation_status(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { + TimerKey { + timestamp, + kind: TimerKeyKind::CleanInvocationStatus { invocation_uuid }, + } + } } impl PartialOrd for TimerKey { @@ -34,23 +69,149 @@ impl Ord for TimerKey { fn cmp(&self, other: &Self) -> Ordering { self.timestamp .cmp(&other.timestamp) - .then_with(|| self.invocation_uuid.cmp(&other.invocation_uuid)) - .then_with(|| self.journal_index.cmp(&other.journal_index)) + .then_with(|| self.kind.cmp(&other.kind)) + } +} + +#[derive( + Clone, + Debug, + Eq, + PartialEq, + Hash, + serde::Serialize, + serde::Deserialize, + strum_macros::EnumDiscriminants, +)] +#[strum_discriminants(derive(strum_macros::VariantArray))] +pub enum TimerKeyKind { + /// Delayed invocation + Invoke { invocation_uuid: InvocationUuid }, + /// Completion of a journal entry + CompleteJournalEntry { + invocation_uuid: InvocationUuid, + journal_index: u32, + }, + /// Cleaning of invocation status + CleanInvocationStatus { invocation_uuid: InvocationUuid }, +} + +impl TimerKeyKind { + pub fn invocation_uuid(&self) -> InvocationUuid { + *match self { + TimerKeyKind::Invoke { invocation_uuid } => invocation_uuid, + TimerKeyKind::CompleteJournalEntry { + invocation_uuid, .. 
+ } => invocation_uuid, + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => invocation_uuid, + } + } +} + +impl PartialOrd for TimerKeyKind { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimerKeyKind { + fn cmp(&self, other: &Self) -> Ordering { + match self { + TimerKeyKind::Invoke { invocation_uuid } => match other { + TimerKeyKind::Invoke { + invocation_uuid: other_invocation_uuid, + } => invocation_uuid.cmp(other_invocation_uuid), + TimerKeyKind::CompleteJournalEntry { .. } + | TimerKeyKind::CleanInvocationStatus { .. } => Ordering::Less, + }, + TimerKeyKind::CompleteJournalEntry { + invocation_uuid, + journal_index, + } => match other { + TimerKeyKind::Invoke { .. } => Ordering::Greater, + TimerKeyKind::CompleteJournalEntry { + invocation_uuid: other_invocation_uuid, + journal_index: other_journal_index, + } => invocation_uuid + .cmp(other_invocation_uuid) + .then_with(|| journal_index.cmp(other_journal_index)), + TimerKeyKind::CleanInvocationStatus { .. } => Ordering::Less, + }, + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => match other { + TimerKeyKind::Invoke { .. } | TimerKeyKind::CompleteJournalEntry { .. 
} => { + Ordering::Greater + } + TimerKeyKind::CleanInvocationStatus { + invocation_uuid: other_invocation_uuid, + } => invocation_uuid.cmp(other_invocation_uuid), + }, + } + } +} + +impl restate_types::timer::TimerKey for TimerKey { + fn wake_up_time(&self) -> MillisSinceEpoch { + self.timestamp.into() } } #[derive(Clone, Debug, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum Timer { - CompleteSleepEntry(PartitionKey), Invoke(ServiceInvocation), + CompleteJournalEntry(InvocationId, u32), CleanInvocationStatus(InvocationId), } +impl Timer { + pub fn complete_journal_entry( + timestamp: u64, + invocation_id: InvocationId, + journal_index: u32, + ) -> (TimerKey, Self) { + ( + TimerKey::complete_journal_entry( + timestamp, + invocation_id.invocation_uuid(), + journal_index, + ), + Timer::CompleteJournalEntry(invocation_id, journal_index), + ) + } + + pub fn invoke(timestamp: u64, service_invocation: ServiceInvocation) -> (TimerKey, Self) { + ( + TimerKey::invoke( + timestamp, + service_invocation.invocation_id.invocation_uuid(), + ), + Timer::Invoke(service_invocation), + ) + } + + pub fn clean_invocation_status( + timestamp: u64, + invocation_id: InvocationId, + ) -> (TimerKey, Self) { + ( + TimerKey::clean_invocation_status(timestamp, invocation_id.invocation_uuid()), + Timer::CleanInvocationStatus(invocation_id), + ) + } + + pub fn invocation_id(&self) -> InvocationId { + match self { + Timer::Invoke(service_invocation) => service_invocation.invocation_id, + Timer::CompleteJournalEntry(invocation_id, _) => *invocation_id, + Timer::CleanInvocationStatus(invocation_id) => *invocation_id, + } + } +} + impl WithPartitionKey for Timer { fn partition_key(&self) -> PartitionKey { match self { - Timer::CompleteSleepEntry(partition_key) => *partition_key, + Timer::CompleteJournalEntry(invocation_id, _) => invocation_id.partition_key(), Timer::Invoke(service_invocation) => service_invocation.partition_key(), 
Timer::CleanInvocationStatus(invocation_id) => invocation_id.partition_key(), } diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index 997ba72dc..e79d9be56 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -294,6 +294,12 @@ impl From for InvocationUuid { } } +impl From for u128 { + fn from(value: InvocationUuid) -> Self { + value.0.into() + } +} + impl From for opentelemetry::trace::TraceId { fn from(value: InvocationUuid) -> Self { Self::from_bytes(value.to_bytes()) diff --git a/crates/types/src/timer.rs b/crates/types/src/timer.rs index 6d2d81ba6..6aca78582 100644 --- a/crates/types/src/timer.rs +++ b/crates/types/src/timer.rs @@ -19,7 +19,7 @@ pub trait Timer: Hash + Eq + Borrow { fn timer_key(&self) -> &Self::TimerKey; } -/// Timer key establishes an absolute order on [`Timer`]. Naturally, this should be key under +/// Timer key establishes an absolute order on [`Timer`]. Naturally, this should be the key under /// which the timer value is stored and can be retrieved. pub trait TimerKey: Ord + Clone + Hash + Debug { fn wake_up_time(&self) -> MillisSinceEpoch; diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index 6afb6f9ca..9f42e5bb9 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -20,7 +20,7 @@ use restate_types::{flexbuffers_storage_encode_decode, Version}; use crate::control::AnnounceLeader; use crate::effects::BuiltinServiceEffects; -use crate::timer::TimerValue; +use crate::timer::TimerKeyValue; use restate_types::logs::{LogId, Lsn, Payload}; use restate_types::partition_table::{FindPartition, PartitionTableError}; use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; @@ -134,9 +134,9 @@ pub enum Command { /// Invoker is reporting effect(s) from an ongoing invocation. 
InvokerEffect(restate_invoker_api::Effect), /// Timer has fired - Timer(TimerValue), + Timer(TimerKeyValue), /// Schedule timer - ScheduleTimer(TimerValue), + ScheduleTimer(TimerKeyValue), /// Another partition processor is reporting a response of an invocation we requested. InvocationResponse(InvocationResponse), /// A built-in invoker reporting effects from an invocation. diff --git a/crates/wal-protocol/src/timer.rs b/crates/wal-protocol/src/timer.rs index ec124cfd2..6ee182371 100644 --- a/crates/wal-protocol/src/timer.rs +++ b/crates/wal-protocol/src/timer.rs @@ -8,8 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_storage_api::timer_table::{Timer, TimerKey}; -use restate_types::identifiers::{EntryIndex, InvocationId, WithPartitionKey}; +use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind}; +use restate_types::identifiers::{EntryIndex, InvocationId}; use restate_types::invocation::ServiceInvocation; use restate_types::time::MillisSinceEpoch; use std::borrow::Borrow; @@ -18,60 +18,48 @@ use std::hash::{Hash, Hasher}; #[derive(Debug, Clone)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct TimerValue { - timer_key: TimerKeyWrapper, +pub struct TimerKeyValue { + timer_key: TimerKey, value: Timer, } -impl TimerValue { +impl TimerKeyValue { pub fn new(timer_key: TimerKey, value: Timer) -> Self { - Self { - timer_key: TimerKeyWrapper(timer_key), - value, - } + Self { timer_key, value } } - pub fn new_sleep( - invocation_id: InvocationId, + pub fn complete_journal_entry( wake_up_time: MillisSinceEpoch, + invocation_id: InvocationId, entry_index: EntryIndex, ) -> Self { - let timer_key = TimerKeyWrapper(TimerKey { - invocation_uuid: invocation_id.invocation_uuid(), - timestamp: wake_up_time.as_u64(), - journal_index: entry_index, - }); - - Self { - timer_key, - value: Timer::CompleteSleepEntry(invocation_id.partition_key()), - } + 
let (timer_key, value) = + Timer::complete_journal_entry(wake_up_time.as_u64(), invocation_id, entry_index); + + Self { timer_key, value } } - pub fn new_invoke( - invocation_id: InvocationId, + pub fn invoke(wake_up_time: MillisSinceEpoch, service_invocation: ServiceInvocation) -> Self { + let (timer_key, value) = Timer::invoke(wake_up_time.as_u64(), service_invocation); + + Self { timer_key, value } + } + + pub fn clean_invocation_status( wake_up_time: MillisSinceEpoch, - entry_index: EntryIndex, - service_invocation: ServiceInvocation, + invocation_id: InvocationId, ) -> Self { - let timer_key = TimerKeyWrapper(TimerKey { - invocation_uuid: invocation_id.invocation_uuid(), - timestamp: wake_up_time.as_u64(), - journal_index: entry_index, - }); - - Self { - timer_key, - value: Timer::Invoke(service_invocation), - } + let (timer_key, value) = + Timer::clean_invocation_status(wake_up_time.as_u64(), invocation_id); + Self { timer_key, value } } pub fn into_inner(self) -> (TimerKey, Timer) { - (self.timer_key.0, self.value) + (self.timer_key, self.value) } pub fn key(&self) -> &TimerKey { - &self.timer_key.0 + &self.timer_key } pub fn value(&self) -> &Timer { @@ -79,84 +67,64 @@ impl TimerValue { } pub fn invocation_id(&self) -> InvocationId { - InvocationId::from_parts(self.value.partition_key(), self.timer_key.0.invocation_uuid) + self.value.invocation_id() } pub fn wake_up_time(&self) -> MillisSinceEpoch { - MillisSinceEpoch::from(self.timer_key.0.timestamp) + MillisSinceEpoch::from(self.timer_key.timestamp) } } -impl Hash for TimerValue { +impl Hash for TimerKeyValue { fn hash(&self, state: &mut H) { Hash::hash(&self.timer_key, state); // We don't hash the value field. } } -impl PartialEq for TimerValue { +impl PartialEq for TimerKeyValue { fn eq(&self, other: &Self) -> bool { self.timer_key == other.timer_key } } -impl Eq for TimerValue {} - -/// New type wrapper to implement [`restate_types::timer::TimerKey`] for [`TimerKey`]. 
-/// -/// # Important -/// We use the [`TimerKey`] to read the timers in an absolute order. The timer service -/// relies on this order in order to process each timer exactly once. That is the -/// reason why the in-memory and in-rocksdb ordering of the TimerKey needs to be exactly -/// the same. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct TimerKeyWrapper(TimerKey); +impl Eq for TimerKeyValue {} -impl TimerKeyWrapper { - pub fn into_inner(self) -> TimerKey { - self.0 - } -} - -impl Borrow for TimerValue { - fn borrow(&self) -> &TimerKeyWrapper { +impl Borrow for TimerKeyValue { + fn borrow(&self) -> &TimerKey { &self.timer_key } } -impl restate_types::timer::Timer for TimerValue { - type TimerKey = TimerKeyWrapper; +impl restate_types::timer::Timer for TimerKeyValue { + type TimerKey = TimerKey; fn timer_key(&self) -> &Self::TimerKey { &self.timer_key } } -impl restate_types::timer::TimerKey for TimerKeyWrapper { - fn wake_up_time(&self) -> MillisSinceEpoch { - MillisSinceEpoch::from(self.0.timestamp) - } -} - -impl From for TimerKeyWrapper { - fn from(timer_key: TimerKey) -> Self { - TimerKeyWrapper(timer_key) - } -} - -impl fmt::Display for TimerKeyWrapper { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", TimerKeyDisplay(&self.0)) - } -} - // Helper to display timer key #[derive(Debug)] pub struct TimerKeyDisplay<'a>(pub &'a TimerKey); impl<'a> fmt::Display for TimerKeyDisplay<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[{}]({})", self.0.invocation_uuid, self.0.journal_index) + match self.0.kind { + TimerKeyKind::Invoke { invocation_uuid } => { + write!(f, "Delayed invocation '{}'", invocation_uuid) + } + TimerKeyKind::CompleteJournalEntry { + invocation_uuid, + journal_index, + } => write!( + f, + "Complete journal entry [{}] for '{}'", + journal_index, invocation_uuid + ), + 
TimerKeyKind::CleanInvocationStatus { invocation_uuid } => { + write!(f, "Clean invocation status '{}'", invocation_uuid) + } + } } } diff --git a/crates/worker/src/partition/action_effect_handler.rs b/crates/worker/src/partition/action_effect_handler.rs index 954cb5c19..560f99185 100644 --- a/crates/worker/src/partition/action_effect_handler.rs +++ b/crates/worker/src/partition/action_effect_handler.rs @@ -12,10 +12,9 @@ use super::leadership::ActionEffect; use restate_bifrost::Bifrost; use restate_core::metadata; use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; -use restate_storage_api::timer_table::{Timer, TimerKey}; use restate_types::identifiers::{PartitionId, PartitionKey, WithPartitionKey}; use restate_types::time::MillisSinceEpoch; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use restate_wal_protocol::{ append_envelope_to_bifrost, Command, Destination, Envelope, Header, Source, }; @@ -82,14 +81,9 @@ impl ActionEffectHandler { &mut self.bifrost, Envelope::new( header.clone(), - Command::ScheduleTimer(TimerValue::new( - TimerKey { - timestamp: MillisSinceEpoch::from(SystemTime::now() + duration) - .as_u64(), - invocation_uuid: invocation_id.invocation_uuid(), - journal_index: 0, - }, - Timer::CleanInvocationStatus(invocation_id), + Command::ScheduleTimer(TimerKeyValue::clean_invocation_status( + MillisSinceEpoch::from(SystemTime::now() + duration), + invocation_id, )), ), ) diff --git a/crates/worker/src/partition/leadership/action_collector.rs b/crates/worker/src/partition/leadership/action_collector.rs index b6e8f2ba9..8cfb7f1b8 100644 --- a/crates/worker/src/partition/leadership/action_collector.rs +++ b/crates/worker/src/partition/leadership/action_collector.rs @@ -11,7 +11,7 @@ use crate::partition::shuffle; use futures::{Stream, StreamExt}; use restate_types::identifiers::InvocationId; -use restate_wal_protocol::timer::TimerValue; +use 
restate_wal_protocol::timer::TimerKeyValue; use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; @@ -46,7 +46,7 @@ impl ActionEffectStream { pub(crate) enum ActionEffect { Invoker(restate_invoker_api::Effect), Shuffle(shuffle::OutboxTruncation), - Timer(TimerValue), + Timer(TimerKeyValue), ScheduleCleanupTimer(InvocationId, Duration), } diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index b12e92606..62b48fc07 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -37,12 +37,12 @@ use restate_partition_store::PartitionStore; use restate_storage_api::deduplication_table::EpochSequenceNumber; use restate_types::identifiers::{InvocationId, PartitionKey}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch}; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use super::storage::invoker::InvokerStorageReader; type PartitionStorage = storage::PartitionStorage; -type TimerService = restate_timer::TimerService; +type TimerService = restate_timer::TimerService; pub(crate) struct LeaderState { leader_epoch: LeaderEpoch, @@ -294,7 +294,7 @@ where } } - pub(crate) async fn run_timer(&mut self) -> TimerValue { + pub(crate) async fn run_timer(&mut self) -> TimerKeyValue { match self { LeadershipState::Follower { .. 
} => future::pending().await, LeadershipState::Leader { @@ -364,9 +364,7 @@ where message, } => shuffle_hint_tx.send(shuffle::NewOutboxMessage::new(seq_number, message)), Action::RegisterTimer { timer_value } => timer_service.as_mut().add_timer(timer_value), - Action::DeleteTimer { timer_key } => { - timer_service.as_mut().remove_timer(timer_key.into()) - } + Action::DeleteTimer { timer_key } => timer_service.as_mut().remove_timer(timer_key), Action::AckStoredEntry { invocation_id, entry_index, diff --git a/crates/worker/src/partition/state_machine/actions.rs b/crates/worker/src/partition/state_machine/actions.rs index 23573530d..8ed5cf514 100644 --- a/crates/worker/src/partition/state_machine/actions.rs +++ b/crates/worker/src/partition/state_machine/actions.rs @@ -16,7 +16,7 @@ use restate_types::ingress::IngressResponse; use restate_types::invocation::InvocationTarget; use restate_types::journal::Completion; use restate_types::message::MessageIndex; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use std::time::Duration; #[derive(Debug)] @@ -31,7 +31,7 @@ pub enum Action { message: OutboxMessage, }, RegisterTimer { - timer_value: TimerValue, + timer_value: TimerKeyValue, }, DeleteTimer { timer_key: TimerKey, diff --git a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs index 7279c1899..98a63158b 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs @@ -26,7 +26,7 @@ use restate_storage_api::invocation_status_table::{ use restate_storage_api::journal_table::{JournalEntry, ReadOnlyJournalTable}; use restate_storage_api::outbox_table::OutboxMessage; use restate_storage_api::service_status_table::VirtualObjectStatus; -use restate_storage_api::timer_table::{Timer, TimerKey}; +use restate_storage_api::timer_table::Timer; use 
restate_storage_api::Result as StorageResult; use restate_types::errors::{ InvocationError, InvocationErrorCode, CANCELED_INVOCATION_ERROR, GONE_INVOCATION_ERROR, @@ -51,7 +51,7 @@ use restate_types::message::MessageIndex; use restate_types::state_mut::ExternalStateMutation; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::effects::{BuiltinServiceEffect, BuiltinServiceEffects}; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use restate_wal_protocol::Command; use std::collections::HashSet; use std::fmt::{Debug, Formatter}; @@ -250,13 +250,7 @@ where if let Some(execution_time) = service_invocation.execution_time { let span_context = service_invocation.span_context.clone(); effects.register_timer( - TimerValue::new_invoke( - service_invocation.invocation_id, - execution_time, - // This entry_index here makes little sense - 0, - service_invocation, - ), + TimerKeyValue::invoke(execution_time, service_invocation), span_context, ); // The span will be created later on invocation @@ -754,11 +748,11 @@ where ProtobufRawEntryCodec::deserialize(EntryType::Sleep, entry)? 
); - let timer_key = TimerKey { - invocation_uuid: invocation_id.invocation_uuid(), + let (timer_key, _) = Timer::complete_journal_entry( + wake_up_time, + invocation_id, journal_index, - timestamp: wake_up_time, - }; + ); effects.delete_timer(timer_key); } @@ -804,20 +798,17 @@ where async fn on_timer( &mut self, - timer_value: TimerValue, + timer_value: TimerKeyValue, state: &mut State, effects: &mut Effects, ) -> Result<(), Error> { let (key, value) = timer_value.into_inner(); - let invocation_uuid = key.invocation_uuid; - let entry_index = key.journal_index; - effects.delete_timer(key); match value { - Timer::CompleteSleepEntry(partition_key) => { + Timer::CompleteJournalEntry(invocation_id, entry_index) => { Self::handle_completion( - InvocationId::from_parts(partition_key, invocation_uuid), + invocation_id, Completion { entry_index, result: CompletionResult::Empty, @@ -1306,9 +1297,9 @@ where journal_entry.deserialize_entry_ref::()? ); effects.register_timer( - TimerValue::new_sleep( - invocation_id, + TimerKeyValue::complete_journal_entry( MillisSinceEpoch::new(wake_up_time), + invocation_id, entry_index, ), invocation_metadata.journal_metadata.span_context.clone(), diff --git a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs index e0a118243..2cc5974c1 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs @@ -12,6 +12,7 @@ use restate_service_protocol::pb::protocol::SleepEntryMessage; use restate_storage_api::idempotency_table::IdempotencyMetadata; use restate_storage_api::inbox_table::SequenceNumberInboxEntry; use restate_storage_api::invocation_status_table::{JournalMetadata, StatusTimestamps}; +use restate_storage_api::timer_table::{TimerKey, TimerKeyKind}; use restate_storage_api::{Result as StorageResult, StorageError}; use 
restate_test_util::matchers::*; use restate_test_util::{assert_eq, let_assert}; @@ -762,7 +763,9 @@ fn forward_canceled_completion_matcher(entry_index: EntryIndex) -> impl Matcher< fn delete_timer(entry_index: EntryIndex) -> impl Matcher { pat!(Effect::DeleteTimer(pat!(TimerKey { - journal_index: eq(entry_index), + kind: pat!(TimerKeyKind::CompleteJournalEntry { + journal_index: eq(entry_index), + }), timestamp: eq(1337), }))) } diff --git a/crates/worker/src/partition/state_machine/effects.rs b/crates/worker/src/partition/state_machine/effects.rs index 910089e58..06770761f 100644 --- a/crates/worker/src/partition/state_machine/effects.rs +++ b/crates/worker/src/partition/state_machine/effects.rs @@ -33,7 +33,7 @@ use restate_types::message::MessageIndex; use restate_types::state_mut::ExternalStateMutation; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::timer::TimerKeyDisplay; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use std::collections::HashSet; use std::fmt; use std::time::Duration; @@ -99,7 +99,7 @@ pub(crate) enum Effect { // Timers RegisterTimer { - timer_value: TimerValue, + timer_value: TimerKeyValue, span_context: ServiceInvocationSpanContext, }, DeleteTimer(TimerKey), @@ -416,15 +416,15 @@ impl Effect { span_context, .. 
} => match timer_value.value() { - Timer::CompleteSleepEntry(_) => { + Timer::CompleteJournalEntry(_, entry_index) => { info_span_if_leader!( is_leader, span_context.is_sampled(), span_context.as_parent(), "sleep", - restate.invocation.id = %timer_value.invocation_id(), - restate.timer.key = %TimerKeyDisplay(timer_value.key()), + restate.journal.index = entry_index, restate.timer.wake_up_time = %timer_value.wake_up_time(), + restate.timer.key = %TimerKeyDisplay(timer_value.key()), // without converting to i64 this field will encode as a string // however, overflowing i64 seems unlikely restate.internal.end_time = i64::try_from(timer_value.wake_up_time().as_u64()).expect("wake up time should fit into i64"), @@ -432,8 +432,9 @@ impl Effect { debug_if_leader!( is_leader, - restate.timer.key = %TimerKeyDisplay(timer_value.key()), + restate.journal.index = entry_index, restate.timer.wake_up_time = %timer_value.wake_up_time(), + restate.timer.key = %TimerKeyDisplay(timer_value.key()), "Effect: Register Sleep timer" ) } @@ -443,19 +444,17 @@ impl Effect { is_leader, rpc.service = %service_invocation.invocation_target.service_name(), rpc.method = %service_invocation.invocation_target.handler_name(), - restate.invocation.id = %service_invocation.invocation_id, restate.invocation.target = %service_invocation.invocation_target, - restate.timer.key = %TimerKeyDisplay(timer_value.key()), restate.timer.wake_up_time = %timer_value.wake_up_time(), + restate.timer.key = %TimerKeyDisplay(timer_value.key()), "Effect: Register background invoke timer" ) } - Timer::CleanInvocationStatus(invocation_id) => { + Timer::CleanInvocationStatus(_) => { debug_if_leader!( is_leader, - restate.invocation.id = %invocation_id, - restate.timer.key = %TimerKeyDisplay(timer_value.key()), restate.timer.wake_up_time = %timer_value.wake_up_time(), + restate.timer.key = %TimerKeyDisplay(timer_value.key()), "Effect: Register cleanup invocation status timer" ) } @@ -828,7 +827,7 @@ impl Effects { pub(crate) 
fn register_timer( &mut self, - timer_value: TimerValue, + timer_value: TimerKeyValue, span_context: ServiceInvocationSpanContext, ) { self.effects.push(Effect::RegisterTimer { diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 65c9bee9a..ccccd3362 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -729,11 +729,11 @@ mod tests { IdempotencyMetadata, IdempotencyTable, ReadOnlyIdempotencyTable, }; use restate_storage_api::invocation_status_table::{CompletedInvocation, StatusTimestamps}; - use restate_storage_api::timer_table::{Timer, TimerKey}; + use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind}; use restate_types::errors::GONE_INVOCATION_ERROR; use restate_types::identifiers::IdempotencyId; use restate_types::invocation::{Idempotency, InvocationTarget}; - use restate_wal_protocol::timer::TimerValue; + use restate_wal_protocol::timer::TimerKeyValue; use test_log::test; #[test(tokio::test)] @@ -1120,11 +1120,12 @@ mod tests { // Send timer fired command let _ = state_machine - .apply(Command::Timer(TimerValue::new( + .apply(Command::Timer(TimerKeyValue::new( TimerKey { + kind: TimerKeyKind::Invoke { + invocation_uuid: invocation_id.invocation_uuid(), + }, timestamp: 0, - invocation_uuid: invocation_id.invocation_uuid(), - journal_index: 0, }, Timer::CleanInvocationStatus(invocation_id), ))) diff --git a/crates/worker/src/partition/storage/mod.rs b/crates/worker/src/partition/storage/mod.rs index 3d5926099..c7b3afab2 100644 --- a/crates/worker/src/partition/storage/mod.rs +++ b/crates/worker/src/partition/storage/mod.rs @@ -40,7 +40,7 @@ use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::CompletionResult; use restate_types::logs::Lsn; use restate_types::message::MessageIndex; -use restate_wal_protocol::timer::{TimerKeyWrapper, TimerValue}; +use 
restate_wal_protocol::timer::TimerKeyValue; use std::future::Future; use std::ops::RangeInclusive; @@ -666,22 +666,18 @@ where } } -impl TimerReader for PartitionStorage +impl TimerReader for PartitionStorage where for<'a> Storage: TimerTable + Send + Sync + 'a, { async fn get_timers( &mut self, num_timers: usize, - previous_timer_key: Option, - ) -> Vec { + previous_timer_key: Option, + ) -> Vec { self.storage - .next_timers_greater_than( - self.partition_id, - previous_timer_key.map(|t| t.into_inner()).as_ref(), - num_timers, - ) - .map(|result| result.map(|(timer_key, timer)| TimerValue::new(timer_key, timer))) + .next_timers_greater_than(self.partition_id, previous_timer_key.as_ref(), num_timers) + .map(|result| result.map(|(timer_key, timer)| TimerKeyValue::new(timer_key, timer))) // TODO: Update timer service to maintain transaction while reading the timer stream: See https://github.com/restatedev/restate/issues/273 // have to collect the stream because it depends on the local transaction .try_collect::>()