From 11d8c2f51f97ce5f908f9145f38e37fedb810d18 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 29 Apr 2024 13:42:13 +0200 Subject: [PATCH 1/7] Replace custom InvocationId encoding with Protobuf in storage This commit changes how we store the InvocationId in the partition storage from a custom encoding to using Protobuf. Thereby, we will be able to evolve the InvocationId in the future if needed. This fixes #1478. --- .../proto/dev/restate/storage/v1/domain.proto | 22 ++--- crates/storage-api/src/storage.rs | 99 +++++++++++-------- 2 files changed, 71 insertions(+), 50 deletions(-) diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index 8b44c7fda..833698fb0 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -68,7 +68,7 @@ message JournalMeta { message Source { message Service { - bytes invocation_id = 1; + InvocationId invocation_id = 1; InvocationTarget invocation_target = 2; } @@ -221,7 +221,7 @@ message Header { } message ServiceInvocation { - bytes invocation_id = 1; + InvocationId invocation_id = 1; InvocationTarget invocation_target = 2; bytes argument = 3; ServiceInvocationResponseSink response_sink = 4; @@ -240,7 +240,7 @@ message StateMutation { message InboxEntry { message Invocation { - bytes invocation_id = 1; + InvocationId invocation_id = 1; ServiceId service_id = 2; } @@ -252,7 +252,7 @@ message InboxEntry { message InvocationResolutionResult { message Success { - bytes invocation_id = 1; + InvocationId invocation_id = 1; InvocationTarget invocation_target = 2; SpanContext span_context = 3; } @@ -264,7 +264,7 @@ message InvocationResolutionResult { } message BackgroundCallResolutionResult { - bytes invocation_id = 1; + InvocationId invocation_id = 1; InvocationTarget invocation_target = 2; SpanContext span_context = 3; } @@ -311,7 +311,7 @@ message EnrichedEntryHeader { } message CompleteAwakeable { - bytes invocation_id = 1; + InvocationId invocation_id = 1; uint32 entry_index = 2; } @@ -404,17 +404,17 @@ message OutboxMessage { } message OutboxServiceInvocationResponse { - bytes invocation_id = 1; + InvocationId invocation_id = 1; uint32 entry_index = 2; ResponseResult response_result = 3; } message OutboxKill { - bytes invocation_id = 1; + InvocationId invocation_id = 1; } message OutboxCancel { - bytes invocation_id = 1; + InvocationId invocation_id = 1; } oneof outbox_message { @@ -437,7 +437,7 @@ message Timer { } message CleanInvocationStatus { - bytes invocation_id = 1; + InvocationId invocation_id = 1; } oneof value { @@ -478,7 +478,7 @@ message DedupSequenceNumber { // --------------------------------------------------------------------- message IdempotencyMetadata { - bytes invocation_id = 1; + InvocationId invocation_id = 1; } message IdempotentRequestMetadata { diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index 26e39d5af..1ffe522c5 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -757,8 +757,10 @@ pub mod v1 { { source::Source::Ingress(_) => restate_types::invocation::Source::Ingress, source::Source::Service(service) => restate_types::invocation::Source::Service( - restate_types::identifiers::InvocationId::from_slice( - &service.invocation_id, + restate_types::identifiers::InvocationId::try_from( + service + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, restate_types::invocation::InvocationTarget::try_from( service @@ -781,7 +783,7 @@ pub mod v1 { invocation_id, invocation_target, ) => source::Source::Service(source::Service { - invocation_id: invocation_id.into(), + invocation_id: Some(InvocationId::from(invocation_id)), invocation_target: Some(InvocationTarget::from(invocation_target)), }), restate_types::invocation::Source::Internal => source::Source::Internal(()), @@ -806,8 +808,10 @@ pub mod v1 { .service_id .ok_or(ConversionError::missing_field("service_id"))?, )?, - restate_types::identifiers::InvocationId::from_slice( - &invocation.invocation_id, + restate_types::identifiers::InvocationId::try_from( + invocation + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, ) } @@ -829,7 +833,7 @@ pub mod v1 { crate::inbox_table::InboxEntry::Invocation(service_id, invocation_id) => { inbox_entry::Entry::Invocation(inbox_entry::Invocation { service_id: Some(service_id.into()), - invocation_id: invocation_id.into(), + invocation_id: Some(InvocationId::from(invocation_id)), }) } crate::inbox_table::InboxEntry::StateMutation(state_mutation) => { @@ -859,8 +863,9 @@ pub mod v1 { idempotency, } = value; - let invocation_id = - restate_types::identifiers::InvocationId::from_slice(&invocation_id)?; + let invocation_id = restate_types::identifiers::InvocationId::try_from( + invocation_id.ok_or(ConversionError::missing_field("invocation_id"))?, + )?; let invocation_target = restate_types::invocation::InvocationTarget::try_from( invocation_target.ok_or(ConversionError::missing_field("invocation_target"))?, @@ -911,7 +916,6 @@ pub mod v1 { impl From for ServiceInvocation { fn from(value: restate_types::invocation::ServiceInvocation) -> Self { - let invocation_id = Bytes::copy_from_slice(&value.invocation_id.to_bytes()); let invocation_target = InvocationTarget::from(value.invocation_target); let span_context = SpanContext::from(value.span_context); let response_sink = ServiceInvocationResponseSink::from(value.response_sink); @@ -919,7 +923,7 @@ pub mod v1 { let headers = value.headers.into_iter().map(Into::into).collect(); ServiceInvocation { - invocation_id, + invocation_id: Some(InvocationId::from(value.invocation_id)), invocation_target: Some(invocation_target), span_context: Some(span_context), response_sink: Some(response_sink), @@ -1535,11 +1539,11 @@ pub mod v1 { }) => { restate_types::journal::enriched::EnrichedEntryHeader::CompleteAwakeable { enrichment_result: AwakeableEnrichmentResult { - invocation_id: - restate_types::identifiers::InvocationId::from_slice( - &invocation_id, - ) - .map_err(ConversionError::invalid_data)?, + invocation_id: restate_types::identifiers::InvocationId::try_from( + invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, + ) + .map_err(ConversionError::invalid_data)?, entry_index, }, } @@ -1618,9 +1622,7 @@ pub mod v1 { enrichment_result, .. } => enriched_entry_header::Kind::CompleteAwakeable(CompleteAwakeable { - invocation_id: Bytes::copy_from_slice( - &enrichment_result.invocation_id.to_bytes(), - ), + invocation_id: Some(InvocationId::from(enrichment_result.invocation_id)), entry_index: enrichment_result.entry_index, }), restate_types::journal::enriched::EnrichedEntryHeader::Run { .. } => { @@ -1649,8 +1651,10 @@ pub mod v1 { { invocation_resolution_result::Result::None(_) => None, invocation_resolution_result::Result::Success(success) => { - let invocation_id = restate_types::identifiers::InvocationId::from_slice( - &success.invocation_id, + let invocation_id = restate_types::identifiers::InvocationId::try_from( + success + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?; let invocation_target = @@ -1692,7 +1696,7 @@ pub mod v1 { span_context, } => invocation_resolution_result::Result::Success( invocation_resolution_result::Success { - invocation_id: invocation_id.into(), + invocation_id: Some(InvocationId::from(invocation_id)), invocation_target: Some(invocation_target.into()), span_context: Some(SpanContext::from(span_context)), }, @@ -1712,8 +1716,11 @@ pub mod v1 { type Error = ConversionError; fn try_from(value: BackgroundCallResolutionResult) -> Result { - let invocation_id = - restate_types::identifiers::InvocationId::from_slice(&value.invocation_id)?; + let invocation_id = restate_types::identifiers::InvocationId::try_from( + value + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, + )?; let invocation_target = restate_types::invocation::InvocationTarget::try_from( value @@ -1740,7 +1747,7 @@ pub mod v1 { { fn from(value: restate_types::journal::enriched::CallEnrichmentResult) -> Self { BackgroundCallResolutionResult { - invocation_id: value.invocation_id.into(), + invocation_id: Some(InvocationId::from(value.invocation_id)), invocation_target: Some(value.invocation_target.into()), span_context: Some(SpanContext::from(value.span_context)), } @@ -1769,8 +1776,10 @@ pub mod v1 { ) => crate::outbox_table::OutboxMessage::ServiceResponse( restate_types::invocation::InvocationResponse { entry_index: invocation_response.entry_index, - id: restate_types::identifiers::InvocationId::from_slice( - &invocation_response.invocation_id, + id: restate_types::identifiers::InvocationId::try_from( + invocation_response + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, result: restate_types::invocation::ResponseResult::try_from( invocation_response @@ -1782,8 +1791,10 @@ pub mod v1 { outbox_message::OutboxMessage::Kill(outbox_kill) => { crate::outbox_table::OutboxMessage::InvocationTermination( InvocationTermination::kill( - restate_types::identifiers::InvocationId::from_slice( - &outbox_kill.invocation_id, + restate_types::identifiers::InvocationId::try_from( + outbox_kill + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, ), ) @@ -1791,8 +1802,10 @@ pub mod v1 { outbox_message::OutboxMessage::Cancel(outbox_cancel) => { crate::outbox_table::OutboxMessage::InvocationTermination( InvocationTermination::cancel( - restate_types::identifiers::InvocationId::from_slice( - &outbox_cancel.invocation_id, + restate_types::identifiers::InvocationId::try_from( + outbox_cancel + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, ), ) @@ -1819,7 +1832,7 @@ pub mod v1 { outbox_message::OutboxMessage::ServiceInvocationResponse( OutboxServiceInvocationResponse { entry_index: invocation_response.entry_index, - invocation_id: invocation_response.id.into(), + invocation_id: Some(InvocationId::from(invocation_response.id)), response_result: Some(ResponseResult::from( invocation_response.result, )), @@ -1831,12 +1844,16 @@ pub mod v1 { ) => match invocation_termination.flavor { TerminationFlavor::Kill => { outbox_message::OutboxMessage::Kill(OutboxKill { - invocation_id: invocation_termination.invocation_id.into(), + invocation_id: Some(InvocationId::from( + invocation_termination.invocation_id, + )), }) } TerminationFlavor::Cancel => { outbox_message::OutboxMessage::Cancel(OutboxCancel { - invocation_id: invocation_termination.invocation_id.into(), + invocation_id: Some(InvocationId::from( + invocation_termination.invocation_id, + )), }) } }, @@ -1910,8 +1927,10 @@ pub mod v1 { ), timer::Value::CleanInvocationStatus(clean_invocation_status) => { crate::timer_table::Timer::CleanInvocationStatus( - restate_types::identifiers::InvocationId::from_slice( - &clean_invocation_status.invocation_id, + restate_types::identifiers::InvocationId::try_from( + clean_invocation_status + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, )?, ) } @@ -1935,7 +1954,7 @@ pub mod v1 { } crate::timer_table::Timer::CleanInvocationStatus(invocation_id) => { timer::Value::CleanInvocationStatus(timer::CleanInvocationStatus { - invocation_id: Bytes::copy_from_slice(&invocation_id.to_bytes()), + invocation_id: Some(InvocationId::from(invocation_id)), }) } }), @@ -2024,7 +2043,7 @@ pub mod v1 { impl From for IdempotencyMetadata { fn from(value: crate::idempotency_table::IdempotencyMetadata) -> Self { IdempotencyMetadata { - invocation_id: Bytes::copy_from_slice(&value.invocation_id.to_bytes()), + invocation_id: Some(InvocationId::from(value.invocation_id)), } } } @@ -2034,8 +2053,10 @@ pub mod v1 { fn try_from(value: IdempotencyMetadata) -> Result { Ok(crate::idempotency_table::IdempotencyMetadata { - invocation_id: restate_types::identifiers::InvocationId::from_slice( - &value.invocation_id, + invocation_id: restate_types::identifiers::InvocationId::try_from( + value + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, ) .map_err(|e| ConversionError::invalid_data(e))?, }) From 0f325fd43af6d4a501953bfcc53f87c07b33f018 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 29 Apr 2024 13:59:26 +0200 Subject: [PATCH 2/7] Make Timer::CompleteSleepEntry value self-contained This commit makes the Timer::CompleteSleepEntry contain the full InvocationId and the journal entry index to make it self-contained. This simplifies the change of the TimerKey in a follow-up commit. --- .../tests/timer_table_test/mod.rs | 29 ++++++++++--------- .../proto/dev/restate/storage/v1/domain.proto | 3 +- crates/storage-api/src/storage.rs | 20 +++++++++---- crates/storage-api/src/timer_table/mod.rs | 4 +-- crates/wal-protocol/src/timer.rs | 2 +- .../state_machine/command_interpreter/mod.rs | 7 ++--- .../src/partition/state_machine/effects.rs | 15 ++++++---- 7 files changed, 45 insertions(+), 35 deletions(-) diff --git a/crates/partition-store/tests/timer_table_test/mod.rs b/crates/partition-store/tests/timer_table_test/mod.rs index 6b5d5990e..2376199a9 100644 --- a/crates/partition-store/tests/timer_table_test/mod.rs +++ b/crates/partition-store/tests/timer_table_test/mod.rs @@ -13,12 +13,13 @@ use futures_util::StreamExt; use restate_partition_store::PartitionStore; use restate_storage_api::timer_table::{Timer, TimerKey, TimerTable}; use restate_storage_api::Transaction; -use restate_types::identifiers::{InvocationUuid, PartitionId, ServiceId}; +use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionId, ServiceId}; use restate_types::invocation::ServiceInvocation; use std::pin::pin; -const FIXTURE_INVOCATION: InvocationUuid = +const FIXTURE_INVOCATION_UUID: InvocationUuid = InvocationUuid::from_parts(1706027034946, 12345678900001); +const FIXTURE_INVOCATION: InvocationId = InvocationId::from_parts(1337, FIXTURE_INVOCATION_UUID); const PARTITION1337: PartitionId = PartitionId::new_unchecked(1337); @@ -26,22 +27,22 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, + invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), journal_index: 0, timestamp: 0, }, - Timer::CompleteSleepEntry(1337), + Timer::CompleteSleepEntry(FIXTURE_INVOCATION, 0), ) .await; txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, + invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), journal_index: 1, timestamp: 0, }, - Timer::CompleteSleepEntry(1337), + Timer::CompleteSleepEntry(FIXTURE_INVOCATION, 1), ) .await; @@ -51,7 +52,7 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, + invocation_uuid: service_invocation.invocation_id.invocation_uuid(), journal_index: 2, timestamp: 1, }, @@ -65,22 +66,22 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, + invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, timestamp: 0, }, - Timer::CompleteSleepEntry(1336), + Timer::CompleteSleepEntry(InvocationId::from_parts(1336, FIXTURE_INVOCATION_UUID), 0), ) .await; txn.add_timer( PartitionId::from(1338), &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, + invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, timestamp: 0, }, - Timer::CompleteSleepEntry(1338), + Timer::CompleteSleepEntry(InvocationId::from_parts(1338, FIXTURE_INVOCATION_UUID), 0), ) .await; } @@ -98,7 +99,7 @@ async fn demo_how_to_find_first_timers_in_a_partition(txn: &mut T async fn find_timers_greater_than(txn: &mut T) { let timer_key = &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, + invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, timestamp: 0, }; @@ -123,7 +124,7 @@ async fn delete_the_first_timer(txn: &mut T) { txn.delete_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, + invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, timestamp: 0, }, @@ -133,7 +134,7 @@ async fn delete_the_first_timer(txn: &mut T) { async fn verify_next_timer_after_deletion(txn: &mut T) { let timer_key = &TimerKey { - invocation_uuid: FIXTURE_INVOCATION, + invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, timestamp: 0, }; diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index 833698fb0..4efcd3e58 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -433,7 +433,8 @@ message OutboxMessage { message Timer { message CompleteSleepEntry { - uint64 partition_key = 1; + InvocationId invocation_id = 1; + uint32 entry_index = 2; } message CleanInvocationStatus { diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index 1ffe522c5..fa6cdf5bc 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -1920,7 +1920,13 @@ pub mod v1 { Ok( match value.value.ok_or(ConversionError::missing_field("value"))? { timer::Value::CompleteSleepEntry(cse) => { - crate::timer_table::Timer::CompleteSleepEntry(cse.partition_key) + crate::timer_table::Timer::CompleteSleepEntry( + restate_types::identifiers::InvocationId::try_from( + cse.invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, + )?, + cse.entry_index, + ) } timer::Value::Invoke(si) => crate::timer_table::Timer::Invoke( restate_types::invocation::ServiceInvocation::try_from(si)?, @@ -1943,11 +1949,13 @@ pub mod v1 { fn from(value: crate::timer_table::Timer) -> Self { Timer { value: Some(match value { - crate::timer_table::Timer::CompleteSleepEntry(partition_key) => { - timer::Value::CompleteSleepEntry(timer::CompleteSleepEntry { - partition_key, - }) - } + crate::timer_table::Timer::CompleteSleepEntry( + invocation_id, + entry_index, + ) => timer::Value::CompleteSleepEntry(timer::CompleteSleepEntry { + invocation_id: Some(InvocationId::from(invocation_id)), + entry_index, + }), crate::timer_table::Timer::Invoke(si) => { timer::Value::Invoke(ServiceInvocation::from(si)) diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index 38b174029..747a8febb 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -42,7 +42,7 @@ impl Ord for TimerKey { #[derive(Clone, Debug, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum Timer { - CompleteSleepEntry(PartitionKey), + CompleteSleepEntry(InvocationId, u32), Invoke(ServiceInvocation), CleanInvocationStatus(InvocationId), } @@ -50,7 +50,7 @@ pub enum Timer { impl WithPartitionKey for Timer { fn partition_key(&self) -> PartitionKey { match self { - Timer::CompleteSleepEntry(partition_key) => *partition_key, + Timer::CompleteSleepEntry(invocation_id, _) => invocation_id.partition_key(), Timer::Invoke(service_invocation) => service_invocation.partition_key(), Timer::CleanInvocationStatus(invocation_id) => invocation_id.partition_key(), } diff --git a/crates/wal-protocol/src/timer.rs b/crates/wal-protocol/src/timer.rs index ec124cfd2..3bbfea252 100644 --- a/crates/wal-protocol/src/timer.rs +++ b/crates/wal-protocol/src/timer.rs @@ -44,7 +44,7 @@ impl TimerValue { Self { timer_key, - value: Timer::CompleteSleepEntry(invocation_id.partition_key()), + value: Timer::CompleteSleepEntry(invocation_id, entry_index), } } diff --git a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs index 7279c1899..bec830f89 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs @@ -809,15 +809,12 @@ where effects: &mut Effects, ) -> Result<(), Error> { let (key, value) = timer_value.into_inner(); - let invocation_uuid = key.invocation_uuid; - let entry_index = key.journal_index; - effects.delete_timer(key); match value { - Timer::CompleteSleepEntry(partition_key) => { + Timer::CompleteSleepEntry(invocation_id, entry_index) => { Self::handle_completion( - InvocationId::from_parts(partition_key, invocation_uuid), + invocation_id, Completion { entry_index, result: CompletionResult::Empty, diff --git a/crates/worker/src/partition/state_machine/effects.rs b/crates/worker/src/partition/state_machine/effects.rs index 910089e58..0f77e67cf 100644 --- a/crates/worker/src/partition/state_machine/effects.rs +++ b/crates/worker/src/partition/state_machine/effects.rs @@ -416,15 +416,16 @@ impl Effect { span_context, .. } => match timer_value.value() { - Timer::CompleteSleepEntry(_) => { + Timer::CompleteSleepEntry(invocation_id, entry_index) => { info_span_if_leader!( is_leader, span_context.is_sampled(), span_context.as_parent(), "sleep", - restate.invocation.id = %timer_value.invocation_id(), - restate.timer.key = %TimerKeyDisplay(timer_value.key()), + restate.invocation.id = %invocation_id, + restate.journal.index = entry_index, restate.timer.wake_up_time = %timer_value.wake_up_time(), + restate.timer.key = %TimerKeyDisplay(timer_value.key()), // without converting to i64 this field will encode as a string // however, overflowing i64 seems unlikely restate.internal.end_time = i64::try_from(timer_value.wake_up_time().as_u64()).expect("wake up time should fit into i64"), @@ -432,8 +433,10 @@ impl Effect { debug_if_leader!( is_leader, - restate.timer.key = %TimerKeyDisplay(timer_value.key()), + restate.journal.index = entry_index, + restate.invocation.id = %invocation_id, restate.timer.wake_up_time = %timer_value.wake_up_time(), + restate.timer.key = %TimerKeyDisplay(timer_value.key()), "Effect: Register Sleep timer" ) } @@ -445,8 +448,8 @@ impl Effect { rpc.method = %service_invocation.invocation_target.handler_name(), restate.invocation.id = %service_invocation.invocation_id, restate.invocation.target = %service_invocation.invocation_target, - restate.timer.key = %TimerKeyDisplay(timer_value.key()), restate.timer.wake_up_time = %timer_value.wake_up_time(), + restate.timer.key = %TimerKeyDisplay(timer_value.key()), "Effect: Register background invoke timer" ) } @@ -454,8 +457,8 @@ impl Effect { debug_if_leader!( is_leader, restate.invocation.id = %invocation_id, - restate.timer.key = %TimerKeyDisplay(timer_value.key()), restate.timer.wake_up_time = %timer_value.wake_up_time(), + restate.timer.key = %TimerKeyDisplay(timer_value.key()), "Effect: Register cleanup invocation status timer" ) } From 2bcec0f4406125451e52a54deea9ed39959ae853 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 29 Apr 2024 14:07:05 +0200 Subject: [PATCH 3/7] Remove TimerKeyWrapper since it is no longer needed The TimerKey struct now directly implements the TimerKey trait used by the timer service. --- crates/storage-api/src/timer_table/mod.rs | 12 ++++ crates/wal-protocol/src/timer.rs | 64 ++++--------------- crates/worker/src/partition/leadership/mod.rs | 4 +- crates/worker/src/partition/storage/mod.rs | 10 +-- 4 files changed, 29 insertions(+), 61 deletions(-) diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index 747a8febb..7661886c8 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -14,9 +14,15 @@ use restate_types::identifiers::{ InvocationId, InvocationUuid, PartitionId, PartitionKey, WithPartitionKey, }; use restate_types::invocation::ServiceInvocation; +use restate_types::time::MillisSinceEpoch; use std::cmp::Ordering; use std::future::Future; +/// # Important +/// We use the [`TimerKey`] to read the timers in an absolute order. The timer service +/// relies on this order in order to process each timer exactly once. That is the +/// reason why the in-memory and in-rocksdb ordering of the TimerKey needs to be exactly +/// the same. #[derive(Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct TimerKey { pub timestamp: u64, @@ -39,6 +45,12 @@ impl Ord for TimerKey { } } +impl restate_types::timer::TimerKey for TimerKey { + fn wake_up_time(&self) -> MillisSinceEpoch { + self.timestamp.into() + } +} + #[derive(Clone, Debug, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum Timer { diff --git a/crates/wal-protocol/src/timer.rs b/crates/wal-protocol/src/timer.rs index 3bbfea252..fe088fe7a 100644 --- a/crates/wal-protocol/src/timer.rs +++ b/crates/wal-protocol/src/timer.rs @@ -19,16 +19,13 @@ use std::hash::{Hash, Hasher}; #[derive(Debug, Clone)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct TimerValue { - timer_key: TimerKeyWrapper, + timer_key: TimerKey, value: Timer, } impl TimerValue { pub fn new(timer_key: TimerKey, value: Timer) -> Self { - Self { - timer_key: TimerKeyWrapper(timer_key), - value, - } + Self { timer_key, value } } pub fn new_sleep( @@ -36,11 +33,11 @@ impl TimerValue { wake_up_time: MillisSinceEpoch, entry_index: EntryIndex, ) -> Self { - let timer_key = TimerKeyWrapper(TimerKey { + let timer_key = TimerKey { invocation_uuid: invocation_id.invocation_uuid(), timestamp: wake_up_time.as_u64(), journal_index: entry_index, - }); + }; Self { timer_key, @@ -54,11 +51,11 @@ impl TimerValue { entry_index: EntryIndex, service_invocation: ServiceInvocation, ) -> Self { - let timer_key = TimerKeyWrapper(TimerKey { + let timer_key = TimerKey { invocation_uuid: invocation_id.invocation_uuid(), timestamp: wake_up_time.as_u64(), journal_index: entry_index, - }); + }; Self { timer_key, @@ -67,11 +64,11 @@ impl TimerValue { } pub fn into_inner(self) -> (TimerKey, Timer) { - (self.timer_key.0, self.value) + (self.timer_key, self.value) } pub fn key(&self) -> &TimerKey { - &self.timer_key.0 + &self.timer_key } pub fn value(&self) -> &Timer { @@ -79,11 +76,11 @@ impl TimerValue { } pub fn invocation_id(&self) -> InvocationId { - InvocationId::from_parts(self.value.partition_key(), self.timer_key.0.invocation_uuid) + InvocationId::from_parts(self.value.partition_key(), self.timer_key.invocation_uuid) } pub fn wake_up_time(&self) -> MillisSinceEpoch { - MillisSinceEpoch::from(self.timer_key.0.timestamp) + MillisSinceEpoch::from(self.timer_key.timestamp) } } @@ -102,55 +99,20 @@ impl PartialEq for TimerValue { impl Eq for TimerValue {} -/// New type wrapper to implement [`restate_types::timer::TimerKey`] for [`TimerKey`]. -/// -/// # Important -/// We use the [`TimerKey`] to read the timers in an absolute order. The timer service -/// relies on this order in order to process each timer exactly once. That is the -/// reason why the in-memory and in-rocksdb ordering of the TimerKey needs to be exactly -/// the same. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct TimerKeyWrapper(TimerKey); - -impl TimerKeyWrapper { - pub fn into_inner(self) -> TimerKey { - self.0 - } -} - -impl Borrow for TimerValue { - fn borrow(&self) -> &TimerKeyWrapper { +impl Borrow for TimerValue { + fn borrow(&self) -> &TimerKey { &self.timer_key } } impl restate_types::timer::Timer for TimerValue { - type TimerKey = TimerKeyWrapper; + type TimerKey = TimerKey; fn timer_key(&self) -> &Self::TimerKey { &self.timer_key } } -impl restate_types::timer::TimerKey for TimerKeyWrapper { - fn wake_up_time(&self) -> MillisSinceEpoch { - MillisSinceEpoch::from(self.0.timestamp) - } -} - -impl From for TimerKeyWrapper { - fn from(timer_key: TimerKey) -> Self { - TimerKeyWrapper(timer_key) - } -} - -impl fmt::Display for TimerKeyWrapper { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", TimerKeyDisplay(&self.0)) - } -} - // Helper to display timer key #[derive(Debug)] pub struct TimerKeyDisplay<'a>(pub &'a TimerKey); diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index b12e92606..5b272d2b3 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -364,9 +364,7 @@ where message, } => shuffle_hint_tx.send(shuffle::NewOutboxMessage::new(seq_number, message)), Action::RegisterTimer { timer_value } => timer_service.as_mut().add_timer(timer_value), - Action::DeleteTimer { timer_key } => { - timer_service.as_mut().remove_timer(timer_key.into()) - } + Action::DeleteTimer { timer_key } => timer_service.as_mut().remove_timer(timer_key), Action::AckStoredEntry { invocation_id, entry_index, diff --git a/crates/worker/src/partition/storage/mod.rs b/crates/worker/src/partition/storage/mod.rs index 3d5926099..e295cddec 100644 --- a/crates/worker/src/partition/storage/mod.rs +++ b/crates/worker/src/partition/storage/mod.rs @@ -40,7 +40,7 @@ use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::CompletionResult; use restate_types::logs::Lsn; use restate_types::message::MessageIndex; -use restate_wal_protocol::timer::{TimerKeyWrapper, TimerValue}; +use restate_wal_protocol::timer::TimerValue; use std::future::Future; use std::ops::RangeInclusive; @@ -673,14 +673,10 @@ where async fn get_timers( &mut self, num_timers: usize, - previous_timer_key: Option, + previous_timer_key: Option, ) -> Vec { self.storage - .next_timers_greater_than( - self.partition_id, - previous_timer_key.map(|t| t.into_inner()).as_ref(), - num_timers, - ) + .next_timers_greater_than(self.partition_id, previous_timer_key.as_ref(), num_timers) .map(|result| result.map(|(timer_key, timer)| TimerValue::new(timer_key, timer))) // TODO: Update timer service to maintain transaction while reading the timer stream: See https://github.com/restatedev/restate/issues/273 // have to collect the stream because it depends on the local transaction From 0b5d40e3ce97030221906a3c1d3f11bd970832c8 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 29 Apr 2024 15:38:48 +0200 Subject: [PATCH 4/7] Introduce TimerKind to support different types of timers The TimerKind is used to store differently typed timers. Currently, we support journal scoped timers that are scoped to a certain journal entry. Additionally, we support invocation scoped timers that are scoped to an invocation (e.g. only a single timer for a given wake up time for a given invocation can exist). This fixes #1417. --- Cargo.lock | 1 + crates/partition-store/src/keys.rs | 70 +++++++ crates/partition-store/src/timer_table/mod.rs | 188 +++++++++++++++--- .../tests/timer_table_test/mod.rs | 69 +++++-- crates/storage-api/Cargo.toml | 1 + crates/storage-api/src/timer_table/mod.rs | 92 ++++++++- crates/types/src/identifiers.rs | 6 + crates/types/src/timer.rs | 2 +- crates/wal-protocol/src/timer.rs | 46 +++-- .../src/partition/action_effect_handler.rs | 12 +- .../state_machine/command_interpreter/mod.rs | 10 +- .../command_interpreter/tests.rs | 5 +- .../worker/src/partition/state_machine/mod.rs | 7 +- 13 files changed, 418 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7373fe5de..06070ace8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6066,6 +6066,7 @@ dependencies = [ "prost-types", "restate-types", "serde", + "strum 0.26.2", "strum_macros 0.26.2", "thiserror", ] diff --git a/crates/partition-store/src/keys.rs b/crates/partition-store/src/keys.rs index 279db7d7b..991d5cf88 100644 --- a/crates/partition-store/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -12,6 +12,7 @@ use anyhow::anyhow; use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytestring::ByteString; use prost::encoding::encoded_len_varint; +use std::mem; use strum_macros::EnumIter; /// Every table key needs to have a key kind. This allows to multiplex different keys in the same @@ -305,6 +306,7 @@ macro_rules! define_table_key { use crate::TableKind; pub(crate) use define_table_key; use restate_storage_api::deduplication_table::ProducerId; +use restate_storage_api::timer_table::TimerKind; use restate_storage_api::StorageError; use restate_types::identifiers::{InvocationUuid, PartitionId}; @@ -485,6 +487,74 @@ impl KeyCodec for ProducerId { } } +impl KeyCodec for TimerKind { + fn encode(&self, target: &mut B) { + assert!( + self.serialized_length() <= target.remaining_mut(), + "serialization buffer has not enough space to serialize TimerKind: '{}' bytes required", + self.serialized_length() + ); + match self { + TimerKind::Invocation { invocation_uuid } => { + target.put_u8(0); + invocation_uuid.encode(target); + } + TimerKind::Journal { + invocation_uuid, + journal_index, + } => { + target.put_u8(1); + invocation_uuid.encode(target); + journal_index.encode(target); + } + } + } + + fn decode(source: &mut B) -> crate::partition_store::Result { + if source.remaining() < mem::size_of::() { + return Err(StorageError::Generic(anyhow!( + "TimerKind discriminator byte is missing" + ))); + } + + Ok(match source.get_u8() { + 0 => { + let invocation_uuid = InvocationUuid::decode(source)?; + TimerKind::Invocation { invocation_uuid } + } + 1 => { + let invocation_uuid = InvocationUuid::decode(source)?; + let journal_index = u32::decode(source)?; + TimerKind::Journal { + invocation_uuid, + journal_index, + } + } + i => { + return Err(StorageError::Generic(anyhow!( + "Unknown discriminator for TimerKind: '{}'", + i + ))) + } + }) + } + + fn serialized_length(&self) -> usize { + 1 + match self { + TimerKind::Invocation { invocation_uuid } => { + KeyCodec::serialized_length(invocation_uuid) + } + TimerKind::Journal { + invocation_uuid, + journal_index, + } => { + KeyCodec::serialized_length(invocation_uuid) + + KeyCodec::serialized_length(journal_index) + } + } + } +} + #[inline] fn write_delimited(source: impl AsRef<[u8]>, target: &mut B) { let source = source.as_ref(); diff --git a/crates/partition-store/src/timer_table/mod.rs b/crates/partition-store/src/timer_table/mod.rs index a2ed766d5..5d2e225af 100644 --- a/crates/partition-store/src/timer_table/mod.rs +++ b/crates/partition-store/src/timer_table/mod.rs @@ -15,7 +15,7 @@ use crate::{PartitionStore, RocksDBTransaction, StorageAccess}; use crate::{TableScan, TableScanIterationDecision}; use futures::Stream; use futures_util::stream; -use restate_storage_api::timer_table::{Timer, TimerKey, TimerTable}; +use restate_storage_api::timer_table::{Timer, TimerKey, TimerKind, TimerTable}; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationUuid, PartitionId}; use restate_types::storage::StorageCodec; @@ -26,8 +26,7 @@ define_table_key!( TimersKey( partition_id: PartitionId, timestamp: u64, - invocation_id: InvocationUuid, - journal_index: u32 + kind: TimerKind, ) ); @@ -36,8 +35,7 @@ fn write_timer_key(partition_id: PartitionId, timer_key: &TimerKey) -> TimersKey TimersKey::default() .partition_id(partition_id) .timestamp(timer_key.timestamp) - .invocation_id(timer_key.invocation_uuid) - .journal_index(timer_key.journal_index) + .kind(timer_key.kind.clone()) } #[inline] @@ -48,9 +46,8 @@ fn timer_key_from_key_slice(slice: &[u8]) -> Result { return Err(StorageError::DataIntegrityError); } let timer_key = TimerKey { - invocation_uuid: key.invocation_id.unwrap(), - journal_index: key.journal_index.unwrap(), timestamp: key.timestamp.unwrap(), + kind: key.kind.unwrap(), }; Ok(timer_key) @@ -71,11 +68,35 @@ fn exclusive_start_key_range( timer_key: Option<&TimerKey>, ) -> TableScan { if let Some(timer_key) = timer_key { - let mut lower_bound = write_timer_key(partition_id, timer_key); - - let next_index = lower_bound.journal_index.map(|i| i + 1).unwrap_or(1); + let next_timer_key = match timer_key.kind { + TimerKind::Invocation { invocation_uuid } => { + let invocation_uuid_value: u128 = invocation_uuid.into(); + TimerKey { + timestamp: timer_key.timestamp, + kind: TimerKind::Invocation { + invocation_uuid: InvocationUuid::from( + invocation_uuid_value + .checked_add(1) + .expect("invocation_uuid should be smaller than u128::MAX"), + ), + }, + } + } + TimerKind::Journal { + invocation_uuid, + journal_index, + } => TimerKey { + timestamp: timer_key.timestamp, + kind: TimerKind::Journal { + invocation_uuid, + journal_index: journal_index + .checked_add(1) + .expect("journal index should be smaller than u64::MAX"), + }, + }, + }; - lower_bound.journal_index = Some(next_index); + let lower_bound = write_timer_key(partition_id, &next_timer_key); let upper_bound = TimersKey::default() .partition_id(partition_id) @@ -174,16 +195,35 @@ mod tests { use super::*; use crate::timer_table::TimerKey; use rand::Rng; + use restate_storage_api::timer_table::TimerKindDiscriminants; use restate_types::identifiers::InvocationUuid; + use strum::VariantArray; const FIXTURE_INVOCATION: InvocationUuid = InvocationUuid::from_parts(1706027034946, 12345678900001); #[test] - fn round_trip() { + fn round_trip_journal_kind() { + let key = TimerKey { + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 1448, + }, + timestamp: 87654321, + }; + + let key_bytes = write_timer_key(PartitionId::from(1337), &key).serialize(); + let got = timer_key_from_key_slice(&key_bytes).expect("should not fail"); + + assert_eq!(got, key); + } + + #[test] + fn round_trip_invocation_kind() { let key = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 1448, + kind: TimerKind::Invocation { + invocation_uuid: FIXTURE_INVOCATION, + }, timestamp: 87654321, }; @@ -195,38 +235,83 @@ mod tests { #[test] fn test_lexicographical_sorting_by_timestamp() { + let kinds = [ + TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 0, + }, + TimerKind::Invocation { + invocation_uuid: FIXTURE_INVOCATION, + }, + ]; + + for first_kind in &kinds { + for second_kind in &kinds { + let a = TimerKey { + kind: first_kind.clone(), + timestamp: 300, + }; + let b = TimerKey { + kind: second_kind.clone(), + timestamp: 301, + }; + assert_in_range(a, b); + } + } + } + + #[test] + fn test_lexicographical_sorting_by_invocation_journal_kind() { + // Higher random part should be sorted correctly in bytes let a = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 0, + }, + timestamp: 300, + }; + let b = TimerKey { + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION.increment_random(), + journal_index: 0, + }, timestamp: 300, }; + assert_in_range(a.clone(), b); + + // Also ensure that higher timestamp is sorted correctly let b = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, - timestamp: 301, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), + journal_index: 0, + }, + timestamp: 300, }; assert_in_range(a, b); } #[test] - fn test_lexicographical_sorting_by_invocation() { + fn test_lexicographical_sorting_by_invocation_invocation_kind() { // Higher random part should be sorted correctly in bytes let a = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKind::Invocation { + invocation_uuid: FIXTURE_INVOCATION, + }, timestamp: 300, }; let b = TimerKey { - invocation_uuid: FIXTURE_INVOCATION.increment_random(), - journal_index: 0, + kind: TimerKind::Invocation { + invocation_uuid: FIXTURE_INVOCATION.increment_random(), + }, timestamp: 300, }; assert_in_range(a.clone(), b); // Also ensure that higher timestamp is sorted correctly let b = TimerKey { - invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), - journal_index: 0, + kind: TimerKind::Invocation { + invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), + }, timestamp: 300, }; assert_in_range(a, b); @@ -235,20 +320,46 @@ mod tests { #[test] fn test_lexicographical_sorting_by_journal_index() { let a = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 0, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 0, + }, + timestamp: 300, + }; + let b = TimerKey { + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 1, + }, + timestamp: 300, + }; + assert_in_range(a, b); + } + + #[test] + fn test_lexicographical_sorting_journal_invocation_kind() { + let a = TimerKey { + kind: TimerKind::Invocation { + invocation_uuid: FIXTURE_INVOCATION, + }, timestamp: 300, }; + let b = TimerKey { - invocation_uuid: FIXTURE_INVOCATION, - journal_index: 1, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION, + journal_index: 0, + }, timestamp: 300, }; + assert_in_range(a, b); } #[track_caller] fn assert_in_range(key_a: TimerKey, key_b: TimerKey) { + assert!(key_a < key_b); + let key_a_bytes = write_timer_key(PartitionId::from(1), &key_a).serialize(); let key_b_bytes = write_timer_key(PartitionId::from(1), &key_b).serialize(); @@ -296,9 +407,22 @@ mod tests { } pub fn random_timer_key() -> TimerKey { + let kind = { + match TimerKindDiscriminants::VARIANTS + [rand::thread_rng().gen_range(0..TimerKindDiscriminants::VARIANTS.len())] + { + TimerKindDiscriminants::Invocation => TimerKind::Invocation { + invocation_uuid: InvocationUuid::new(), + }, + TimerKindDiscriminants::Journal => TimerKind::Journal { + invocation_uuid: InvocationUuid::new(), + journal_index: rand::thread_rng().gen_range(0..2 ^ 16), + }, + } + }; + TimerKey { - invocation_uuid: InvocationUuid::new(), - journal_index: rand::thread_rng().gen_range(0..2 ^ 16), + kind, timestamp: rand::thread_rng().gen_range(0..2 ^ 16), } } diff --git a/crates/partition-store/tests/timer_table_test/mod.rs b/crates/partition-store/tests/timer_table_test/mod.rs index 2376199a9..e056f6436 100644 --- a/crates/partition-store/tests/timer_table_test/mod.rs +++ b/crates/partition-store/tests/timer_table_test/mod.rs @@ -10,8 +10,10 @@ use crate::mock_service_invocation; use futures_util::StreamExt; +use googletest::matchers::eq; +use googletest::{assert_that, pat}; use restate_partition_store::PartitionStore; -use restate_storage_api::timer_table::{Timer, TimerKey, TimerTable}; +use restate_storage_api::timer_table::{Timer, TimerKey, TimerKind, TimerTable}; use restate_storage_api::Transaction; use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionId, ServiceId}; use restate_types::invocation::ServiceInvocation; @@ -27,8 +29,10 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), - journal_index: 0, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), + journal_index: 0, + }, timestamp: 0, }, Timer::CompleteSleepEntry(FIXTURE_INVOCATION, 0), @@ -38,8 +42,10 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), - journal_index: 1, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), + journal_index: 1, + }, timestamp: 0, }, Timer::CompleteSleepEntry(FIXTURE_INVOCATION, 1), @@ -52,8 +58,9 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: service_invocation.invocation_id.invocation_uuid(), - journal_index: 2, + kind: TimerKind::Invocation { + invocation_uuid: service_invocation.invocation_id.invocation_uuid(), + }, timestamp: 1, }, Timer::Invoke(service_invocation), @@ -66,8 +73,10 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION_UUID, - journal_index: 0, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION_UUID, + journal_index: 0, + }, timestamp: 0, }, Timer::CompleteSleepEntry(InvocationId::from_parts(1336, FIXTURE_INVOCATION_UUID), 0), @@ -77,8 +86,10 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PartitionId::from(1338), &TimerKey { - invocation_uuid: FIXTURE_INVOCATION_UUID, - journal_index: 0, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION_UUID, + journal_index: 0, + }, timestamp: 0, }, Timer::CompleteSleepEntry(InvocationId::from_parts(1338, FIXTURE_INVOCATION_UUID), 0), @@ -99,8 +110,10 @@ async fn demo_how_to_find_first_timers_in_a_partition(txn: &mut T async fn find_timers_greater_than(txn: &mut T) { let timer_key = &TimerKey { - invocation_uuid: FIXTURE_INVOCATION_UUID, - journal_index: 0, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION_UUID, + journal_index: 0, + }, timestamp: 0, }; let mut stream = pin!(txn.next_timers_greater_than(PARTITION1337, Some(timer_key), usize::MAX)); @@ -108,13 +121,19 @@ async fn find_timers_greater_than(txn: &mut T) { if let Some(Ok((key, _))) = stream.next().await { // make sure that we skip the first timer that has a journal_index of 0 // take a look at populate_data once again. - assert_eq!(key.journal_index, 1); + assert_that!( + key.kind, + pat!(TimerKind::Journal { + journal_index: eq(1), + }) + ); } else { panic!("test failure"); } if let Some(Ok((key, _))) = stream.next().await { - assert_eq!(key.journal_index, 2); + assert_that!(key.kind, pat!(TimerKind::Invocation { .. })); + assert_eq!(key.timestamp, 1); } else { panic!("test failure"); } @@ -124,8 +143,10 @@ async fn delete_the_first_timer(txn: &mut T) { txn.delete_timer( PARTITION1337, &TimerKey { - invocation_uuid: FIXTURE_INVOCATION_UUID, - journal_index: 0, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION_UUID, + journal_index: 0, + }, timestamp: 0, }, ) @@ -134,16 +155,22 @@ async fn delete_the_first_timer(txn: &mut T) { async fn verify_next_timer_after_deletion(txn: &mut T) { let timer_key = &TimerKey { - invocation_uuid: FIXTURE_INVOCATION_UUID, - journal_index: 0, + kind: TimerKind::Journal { + invocation_uuid: FIXTURE_INVOCATION_UUID, + journal_index: 0, + }, timestamp: 0, }; let mut stream = pin!(txn.next_timers_greater_than(PARTITION1337, Some(timer_key), usize::MAX,)); if let Some(Ok((key, _))) = stream.next().await { - // make sure that we skip the first timer - assert_eq!(key.journal_index, 1); + assert_that!( + key.kind, + pat!(TimerKind::Journal { + journal_index: eq(1) + }) + ); } else { panic!("test failure"); } diff --git a/crates/storage-api/Cargo.toml b/crates/storage-api/Cargo.toml index 8b26beac0..20010498a 100644 --- a/crates/storage-api/Cargo.toml +++ b/crates/storage-api/Cargo.toml @@ -23,6 +23,7 @@ opentelemetry = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } serde = { workspace = true, optional = true } +strum = { workspace = true } strum_macros = { workspace = true } thiserror = { workspace = true } diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index 7661886c8..7f23ae6fd 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -26,8 +26,30 @@ use std::future::Future; #[derive(Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct TimerKey { pub timestamp: u64, - pub invocation_uuid: InvocationUuid, - pub journal_index: u32, + pub kind: TimerKind, +} + +impl TimerKey { + pub fn new_journal_entry( + timestamp: u64, + invocation_uuid: InvocationUuid, + journal_index: u32, + ) -> Self { + TimerKey { + timestamp, + kind: TimerKind::Journal { + invocation_uuid, + journal_index, + }, + } + } + + pub fn new_invocation(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { + TimerKey { + timestamp, + kind: TimerKind::Invocation { invocation_uuid }, + } + } } impl PartialOrd for TimerKey { @@ -40,8 +62,70 @@ impl Ord for TimerKey { fn cmp(&self, other: &Self) -> Ordering { self.timestamp .cmp(&other.timestamp) - .then_with(|| self.invocation_uuid.cmp(&other.invocation_uuid)) - .then_with(|| self.journal_index.cmp(&other.journal_index)) + .then_with(|| self.kind.cmp(&other.kind)) + } +} + +#[derive( + Clone, + Debug, + Eq, + PartialEq, + Hash, + serde::Serialize, + serde::Deserialize, + strum_macros::EnumDiscriminants, +)] +#[strum_discriminants(derive(strum_macros::VariantArray))] +pub enum TimerKind { + /// Invocation-scoped timers (e.g. a service invocation or clean up of invocation state) + Invocation { invocation_uuid: InvocationUuid }, + /// Journal-scoped timers (e.g. completing a sleep journal entry) + Journal { + invocation_uuid: InvocationUuid, + journal_index: u32, + }, +} + +impl TimerKind { + pub fn invocation_uuid(&self) -> InvocationUuid { + *match self { + TimerKind::Invocation { invocation_uuid } => invocation_uuid, + TimerKind::Journal { + invocation_uuid, .. + } => invocation_uuid, + } + } +} + +impl PartialOrd for TimerKind { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimerKind { + fn cmp(&self, other: &Self) -> Ordering { + match self { + TimerKind::Invocation { invocation_uuid } => match other { + TimerKind::Invocation { + invocation_uuid: other_invocation_uuid, + } => invocation_uuid.cmp(other_invocation_uuid), + TimerKind::Journal { .. } => Ordering::Less, + }, + TimerKind::Journal { + invocation_uuid, + journal_index, + } => match other { + TimerKind::Invocation { .. } => Ordering::Greater, + TimerKind::Journal { + invocation_uuid: other_invocation_uuid, + journal_index: other_journal_index, + } => invocation_uuid + .cmp(other_invocation_uuid) + .then_with(|| journal_index.cmp(other_journal_index)), + }, + } } } diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index 997ba72dc..e79d9be56 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -294,6 +294,12 @@ impl From for InvocationUuid { } } +impl From for u128 { + fn from(value: InvocationUuid) -> Self { + value.0.into() + } +} + impl From for opentelemetry::trace::TraceId { fn from(value: InvocationUuid) -> Self { Self::from_bytes(value.to_bytes()) diff --git a/crates/types/src/timer.rs b/crates/types/src/timer.rs index 6d2d81ba6..6aca78582 100644 --- a/crates/types/src/timer.rs +++ b/crates/types/src/timer.rs @@ -19,7 +19,7 @@ pub trait Timer: Hash + Eq + Borrow { fn timer_key(&self) -> &Self::TimerKey; } -/// Timer key establishes an absolute order on [`Timer`]. Naturally, this should be key under +/// Timer key establishes an absolute order on [`Timer`]. Naturally, this should be the key under /// which the timer value is stored and can be retrieved. pub trait TimerKey: Ord + Clone + Hash + Debug { fn wake_up_time(&self) -> MillisSinceEpoch; diff --git a/crates/wal-protocol/src/timer.rs b/crates/wal-protocol/src/timer.rs index fe088fe7a..be7935a1d 100644 --- a/crates/wal-protocol/src/timer.rs +++ b/crates/wal-protocol/src/timer.rs @@ -8,7 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_storage_api::timer_table::{Timer, TimerKey}; +use restate_storage_api::timer_table::{Timer, TimerKey, TimerKind}; use restate_types::identifiers::{EntryIndex, InvocationId, WithPartitionKey}; use restate_types::invocation::ServiceInvocation; use restate_types::time::MillisSinceEpoch; @@ -33,11 +33,11 @@ impl TimerValue { wake_up_time: MillisSinceEpoch, entry_index: EntryIndex, ) -> Self { - let timer_key = TimerKey { - invocation_uuid: invocation_id.invocation_uuid(), - timestamp: wake_up_time.as_u64(), - journal_index: entry_index, - }; + let timer_key = TimerKey::new_journal_entry( + wake_up_time.as_u64(), + invocation_id.invocation_uuid(), + entry_index, + ); Self { timer_key, @@ -48,14 +48,10 @@ impl TimerValue { pub fn new_invoke( invocation_id: InvocationId, wake_up_time: MillisSinceEpoch, - entry_index: EntryIndex, service_invocation: ServiceInvocation, ) -> Self { - let timer_key = TimerKey { - invocation_uuid: invocation_id.invocation_uuid(), - timestamp: wake_up_time.as_u64(), - journal_index: entry_index, - }; + let timer_key = + TimerKey::new_invocation(wake_up_time.as_u64(), invocation_id.invocation_uuid()); Self { timer_key, @@ -63,6 +59,19 @@ impl TimerValue { } } + pub fn new_clean_invocation_status( + invocation_id: InvocationId, + wake_up_time: MillisSinceEpoch, + ) -> Self { + TimerValue { + timer_key: TimerKey::new_invocation( + wake_up_time.as_u64(), + invocation_id.invocation_uuid(), + ), + value: Timer::CleanInvocationStatus(invocation_id), + } + } + pub fn into_inner(self) -> (TimerKey, Timer) { (self.timer_key, self.value) } @@ -76,7 +85,10 @@ impl TimerValue { } pub fn invocation_id(&self) -> InvocationId { - InvocationId::from_parts(self.value.partition_key(), self.timer_key.invocation_uuid) + InvocationId::from_parts( + self.value.partition_key(), + self.timer_key.kind.invocation_uuid(), + ) } pub fn wake_up_time(&self) -> MillisSinceEpoch { @@ -119,6 +131,12 @@ pub struct TimerKeyDisplay<'a>(pub &'a TimerKey); impl<'a> fmt::Display for TimerKeyDisplay<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[{}]({})", self.0.invocation_uuid, self.0.journal_index) + match self.0.kind { + TimerKind::Invocation { invocation_uuid } => write!(f, "{}", invocation_uuid), + TimerKind::Journal { + invocation_uuid, + journal_index, + } => write!(f, "{}[{}]", invocation_uuid, journal_index), + } } } diff --git a/crates/worker/src/partition/action_effect_handler.rs b/crates/worker/src/partition/action_effect_handler.rs index 954cb5c19..b3ab9d7c7 100644 --- a/crates/worker/src/partition/action_effect_handler.rs +++ b/crates/worker/src/partition/action_effect_handler.rs @@ -12,7 +12,6 @@ use super::leadership::ActionEffect; use restate_bifrost::Bifrost; use restate_core::metadata; use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; -use restate_storage_api::timer_table::{Timer, TimerKey}; use restate_types::identifiers::{PartitionId, PartitionKey, WithPartitionKey}; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::timer::TimerValue; @@ -82,14 +81,9 @@ impl ActionEffectHandler { &mut self.bifrost, Envelope::new( header.clone(), - Command::ScheduleTimer(TimerValue::new( - TimerKey { - timestamp: MillisSinceEpoch::from(SystemTime::now() + duration) - .as_u64(), - invocation_uuid: invocation_id.invocation_uuid(), - journal_index: 0, - }, - Timer::CleanInvocationStatus(invocation_id), + Command::ScheduleTimer(TimerValue::new_clean_invocation_status( + invocation_id, + MillisSinceEpoch::from(SystemTime::now() + duration), )), ), ) diff --git a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs index bec830f89..a069b798d 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs @@ -253,8 +253,6 @@ where TimerValue::new_invoke( service_invocation.invocation_id, execution_time, - // This entry_index here makes little sense - 0, service_invocation, ), span_context, @@ -754,11 +752,11 @@ where ProtobufRawEntryCodec::deserialize(EntryType::Sleep, entry)? ); - let timer_key = TimerKey { - invocation_uuid: invocation_id.invocation_uuid(), + let timer_key = TimerKey::new_journal_entry( + wake_up_time, + invocation_id.invocation_uuid(), journal_index, - timestamp: wake_up_time, - }; + ); effects.delete_timer(timer_key); } diff --git a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs index e0a118243..b137970b2 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs @@ -12,6 +12,7 @@ use restate_service_protocol::pb::protocol::SleepEntryMessage; use restate_storage_api::idempotency_table::IdempotencyMetadata; use restate_storage_api::inbox_table::SequenceNumberInboxEntry; use restate_storage_api::invocation_status_table::{JournalMetadata, StatusTimestamps}; +use restate_storage_api::timer_table::TimerKind; use restate_storage_api::{Result as StorageResult, StorageError}; use restate_test_util::matchers::*; use restate_test_util::{assert_eq, let_assert}; @@ -762,7 +763,9 @@ fn forward_canceled_completion_matcher(entry_index: EntryIndex) -> impl Matcher< fn delete_timer(entry_index: EntryIndex) -> impl Matcher { pat!(Effect::DeleteTimer(pat!(TimerKey { - journal_index: eq(entry_index), + kind: pat!(TimerKind::Journal { + journal_index: eq(entry_index), + }), timestamp: eq(1337), }))) } diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 65c9bee9a..f34d273ad 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -729,7 +729,7 @@ mod tests { IdempotencyMetadata, IdempotencyTable, ReadOnlyIdempotencyTable, }; use restate_storage_api::invocation_status_table::{CompletedInvocation, StatusTimestamps}; - use restate_storage_api::timer_table::{Timer, TimerKey}; + use restate_storage_api::timer_table::{Timer, TimerKey, TimerKind}; use restate_types::errors::GONE_INVOCATION_ERROR; use restate_types::identifiers::IdempotencyId; use restate_types::invocation::{Idempotency, InvocationTarget}; @@ -1122,9 +1122,10 @@ mod tests { let _ = state_machine .apply(Command::Timer(TimerValue::new( TimerKey { + kind: TimerKind::Invocation { + invocation_uuid: invocation_id.invocation_uuid(), + }, timestamp: 0, - invocation_uuid: invocation_id.invocation_uuid(), - journal_index: 0, }, Timer::CleanInvocationStatus(invocation_id), ))) From f881aa21f488829de57185a5d20342cdb4417d7b Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 29 Apr 2024 17:05:12 +0200 Subject: [PATCH 5/7] Remove restate.invocation.id from timer events in effects logger --- crates/worker/src/partition/state_machine/effects.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/worker/src/partition/state_machine/effects.rs b/crates/worker/src/partition/state_machine/effects.rs index 0f77e67cf..c64b0938e 100644 --- a/crates/worker/src/partition/state_machine/effects.rs +++ b/crates/worker/src/partition/state_machine/effects.rs @@ -422,7 +422,6 @@ impl Effect { span_context.is_sampled(), span_context.as_parent(), "sleep", - restate.invocation.id = %invocation_id, restate.journal.index = entry_index, restate.timer.wake_up_time = %timer_value.wake_up_time(), restate.timer.key = %TimerKeyDisplay(timer_value.key()), @@ -434,7 +433,6 @@ impl Effect { debug_if_leader!( is_leader, restate.journal.index = entry_index, - restate.invocation.id = %invocation_id, restate.timer.wake_up_time = %timer_value.wake_up_time(), restate.timer.key = %TimerKeyDisplay(timer_value.key()), "Effect: Register Sleep timer" @@ -446,7 +444,6 @@ impl Effect { is_leader, rpc.service = %service_invocation.invocation_target.service_name(), rpc.method = %service_invocation.invocation_target.handler_name(), - restate.invocation.id = %service_invocation.invocation_id, restate.invocation.target = %service_invocation.invocation_target, restate.timer.wake_up_time = %timer_value.wake_up_time(), restate.timer.key = %TimerKeyDisplay(timer_value.key()), @@ -456,7 +453,6 @@ impl Effect { Timer::CleanInvocationStatus(invocation_id) => { debug_if_leader!( is_leader, - restate.invocation.id = %invocation_id, restate.timer.wake_up_time = %timer_value.wake_up_time(), restate.timer.key = %TimerKeyDisplay(timer_value.key()), "Effect: Register cleanup invocation status timer" From 75653c576a3327fabf578e2099d3f4533e5b7f8f Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 29 Apr 2024 17:45:08 +0200 Subject: [PATCH 6/7] Introduce distinction TimerKind::Invoke and TimerKind::CleanInvocationStatus --- crates/partition-store/src/keys.rs | 25 ++- crates/partition-store/src/timer_table/mod.rs | 154 +++++++++++++----- .../tests/timer_table_test/mod.rs | 30 ++-- crates/storage-api/src/storage.rs | 4 +- crates/storage-api/src/timer_table/mod.rs | 56 +++++-- crates/wal-protocol/src/timer.rs | 30 ++-- .../src/partition/action_effect_handler.rs | 2 +- .../state_machine/command_interpreter/mod.rs | 8 +- .../command_interpreter/tests.rs | 2 +- .../src/partition/state_machine/effects.rs | 4 +- .../worker/src/partition/state_machine/mod.rs | 2 +- 11 files changed, 212 insertions(+), 105 deletions(-) diff --git a/crates/partition-store/src/keys.rs b/crates/partition-store/src/keys.rs index 991d5cf88..79338e1ea 100644 --- a/crates/partition-store/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -495,11 +495,11 @@ impl KeyCodec for TimerKind { self.serialized_length() ); match self { - TimerKind::Invocation { invocation_uuid } => { + TimerKind::Invoke { invocation_uuid } => { target.put_u8(0); invocation_uuid.encode(target); } - TimerKind::Journal { + TimerKind::CompleteJournalEntry { invocation_uuid, journal_index, } => { @@ -507,6 +507,10 @@ impl KeyCodec for TimerKind { invocation_uuid.encode(target); journal_index.encode(target); } + TimerKind::CleanInvocationStatus { invocation_uuid } => { + target.put_u8(2); + invocation_uuid.encode(target); + } } } @@ -520,16 +524,20 @@ impl KeyCodec for TimerKind { Ok(match source.get_u8() { 0 => { let invocation_uuid = InvocationUuid::decode(source)?; - TimerKind::Invocation { invocation_uuid } + TimerKind::Invoke { invocation_uuid } } 1 => { let invocation_uuid = InvocationUuid::decode(source)?; let journal_index = u32::decode(source)?; - TimerKind::Journal { + TimerKind::CompleteJournalEntry { invocation_uuid, journal_index, } } + 2 => { + let invocation_uuid = InvocationUuid::decode(source)?; + TimerKind::CleanInvocationStatus { invocation_uuid } + } i => { return Err(StorageError::Generic(anyhow!( "Unknown discriminator for TimerKind: '{}'", @@ -541,16 +549,17 @@ impl KeyCodec for TimerKind { fn serialized_length(&self) -> usize { 1 + match self { - TimerKind::Invocation { invocation_uuid } => { - KeyCodec::serialized_length(invocation_uuid) - } - TimerKind::Journal { + TimerKind::Invoke { invocation_uuid } => KeyCodec::serialized_length(invocation_uuid), + TimerKind::CompleteJournalEntry { invocation_uuid, journal_index, } => { KeyCodec::serialized_length(invocation_uuid) + KeyCodec::serialized_length(journal_index) } + TimerKind::CleanInvocationStatus { invocation_uuid } => { + KeyCodec::serialized_length(invocation_uuid) + } } } } diff --git a/crates/partition-store/src/timer_table/mod.rs b/crates/partition-store/src/timer_table/mod.rs index 5d2e225af..fd5e0f755 100644 --- a/crates/partition-store/src/timer_table/mod.rs +++ b/crates/partition-store/src/timer_table/mod.rs @@ -69,31 +69,36 @@ fn exclusive_start_key_range( ) -> TableScan { if let Some(timer_key) = timer_key { let next_timer_key = match timer_key.kind { - TimerKind::Invocation { invocation_uuid } => { - let invocation_uuid_value: u128 = invocation_uuid.into(); + TimerKind::Invoke { invocation_uuid } => { + let incremented_invocation_uuid = increment_invocation_uuid(invocation_uuid); TimerKey { timestamp: timer_key.timestamp, - kind: TimerKind::Invocation { - invocation_uuid: InvocationUuid::from( - invocation_uuid_value - .checked_add(1) - .expect("invocation_uuid should be smaller than u128::MAX"), - ), + kind: TimerKind::Invoke { + invocation_uuid: incremented_invocation_uuid, }, } } - TimerKind::Journal { + TimerKind::CompleteJournalEntry { invocation_uuid, journal_index, } => TimerKey { timestamp: timer_key.timestamp, - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid, journal_index: journal_index .checked_add(1) .expect("journal index should be smaller than u64::MAX"), }, }, + TimerKind::CleanInvocationStatus { invocation_uuid } => { + let incremented_invocation_uuid = increment_invocation_uuid(invocation_uuid); + TimerKey { + timestamp: timer_key.timestamp, + kind: TimerKind::CleanInvocationStatus { + invocation_uuid: incremented_invocation_uuid, + }, + } + } }; let lower_bound = write_timer_key(partition_id, &next_timer_key); @@ -108,6 +113,15 @@ fn exclusive_start_key_range( } } +fn increment_invocation_uuid(invocation_uuid: InvocationUuid) -> InvocationUuid { + let invocation_uuid_value: u128 = invocation_uuid.into(); + InvocationUuid::from( + invocation_uuid_value + .checked_add(1) + .expect("invocation_uuid should be smaller than u128::MAX"), + ) +} + fn add_timer( storage: &mut S, partition_id: PartitionId, @@ -203,9 +217,9 @@ mod tests { InvocationUuid::from_parts(1706027034946, 12345678900001); #[test] - fn round_trip_journal_kind() { + fn round_trip_complete_journal_entry_kind() { let key = TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 1448, }, @@ -219,9 +233,24 @@ mod tests { } #[test] - fn round_trip_invocation_kind() { + fn round_trip_invoke_kind() { + let key = TimerKey { + kind: TimerKind::Invoke { + invocation_uuid: FIXTURE_INVOCATION, + }, + timestamp: 87654321, + }; + + let key_bytes = write_timer_key(PartitionId::from(1337), &key).serialize(); + let got = timer_key_from_key_slice(&key_bytes).expect("should not fail"); + + assert_eq!(got, key); + } + + #[test] + fn round_trip_clean_invocation_status_kind() { let key = TimerKey { - kind: TimerKind::Invocation { + kind: TimerKind::CleanInvocationStatus { invocation_uuid: FIXTURE_INVOCATION, }, timestamp: 87654321, @@ -236,11 +265,14 @@ mod tests { #[test] fn test_lexicographical_sorting_by_timestamp() { let kinds = [ - TimerKind::Journal { + TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 0, }, - TimerKind::Invocation { + TimerKind::Invoke { + invocation_uuid: FIXTURE_INVOCATION, + }, + TimerKind::CleanInvocationStatus { invocation_uuid: FIXTURE_INVOCATION, }, ]; @@ -255,117 +287,152 @@ mod tests { kind: second_kind.clone(), timestamp: 301, }; - assert_in_range(a, b); + assert_in_range(&a, &b); } } } #[test] - fn test_lexicographical_sorting_by_invocation_journal_kind() { + fn test_lexicographical_sorting_by_invocation_uuid_complete_journal_entry_kind() { // Higher random part should be sorted correctly in bytes let a = TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 0, }, timestamp: 300, }; let b = TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION.increment_random(), journal_index: 0, }, timestamp: 300, }; - assert_in_range(a.clone(), b); + assert_in_range(&a, &b); // Also ensure that higher timestamp is sorted correctly let b = TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), journal_index: 0, }, timestamp: 300, }; - assert_in_range(a, b); + assert_in_range(&a, &b); } #[test] - fn test_lexicographical_sorting_by_invocation_invocation_kind() { + fn test_lexicographical_sorting_by_invocation_uuid_invoke_kind() { // Higher random part should be sorted correctly in bytes let a = TimerKey { - kind: TimerKind::Invocation { + kind: TimerKind::Invoke { invocation_uuid: FIXTURE_INVOCATION, }, timestamp: 300, }; let b = TimerKey { - kind: TimerKind::Invocation { + kind: TimerKind::Invoke { invocation_uuid: FIXTURE_INVOCATION.increment_random(), }, timestamp: 300, }; - assert_in_range(a.clone(), b); + assert_in_range(&a, &b); // Also ensure that higher timestamp is sorted correctly let b = TimerKey { - kind: TimerKind::Invocation { + kind: TimerKind::Invoke { invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), }, timestamp: 300, }; - assert_in_range(a, b); + assert_in_range(&a, &b); + } + + #[test] + fn test_lexicographical_sorting_by_invocation_uuid_clean_invoation_status_kind() { + // Higher random part should be sorted correctly in bytes + let a = TimerKey { + kind: TimerKind::CleanInvocationStatus { + invocation_uuid: FIXTURE_INVOCATION, + }, + timestamp: 300, + }; + let b = TimerKey { + kind: TimerKind::CleanInvocationStatus { + invocation_uuid: FIXTURE_INVOCATION.increment_random(), + }, + timestamp: 300, + }; + assert_in_range(&a, &b); + + // Also ensure that higher timestamp is sorted correctly + let b = TimerKey { + kind: TimerKind::CleanInvocationStatus { + invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), + }, + timestamp: 300, + }; + assert_in_range(&a, &b); } #[test] fn test_lexicographical_sorting_by_journal_index() { let a = TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 0, }, timestamp: 300, }; let b = TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 1, }, timestamp: 300, }; - assert_in_range(a, b); + assert_in_range(&a, &b); } #[test] - fn test_lexicographical_sorting_journal_invocation_kind() { + fn test_lexicographical_sorting_timer_kind() { let a = TimerKey { - kind: TimerKind::Invocation { + kind: TimerKind::Invoke { invocation_uuid: FIXTURE_INVOCATION, }, timestamp: 300, }; let b = TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 0, }, timestamp: 300, }; - assert_in_range(a, b); + let c = TimerKey { + kind: TimerKind::CleanInvocationStatus { + invocation_uuid: FIXTURE_INVOCATION, + }, + timestamp: 300, + }; + + assert_in_range(&a, &b); + assert_in_range(&b, &c); } #[track_caller] - fn assert_in_range(key_a: TimerKey, key_b: TimerKey) { + fn assert_in_range(key_a: &TimerKey, key_b: &TimerKey) { assert!(key_a < key_b); - let key_a_bytes = write_timer_key(PartitionId::from(1), &key_a).serialize(); - let key_b_bytes = write_timer_key(PartitionId::from(1), &key_b).serialize(); + let key_a_bytes = write_timer_key(PartitionId::from(1), key_a).serialize(); + let key_b_bytes = write_timer_key(PartitionId::from(1), key_b).serialize(); assert!(less_than(&key_a_bytes, &key_b_bytes)); - let (low, high) = match exclusive_start_key_range(PartitionId::from(1), Some(&key_a)) { + let (low, high) = match exclusive_start_key_range(PartitionId::from(1), Some(key_a)) { TableScan::KeyRangeInclusiveInSinglePartition(p, low, high) if *p == 1 => (low, high), _ => panic!(""), }; @@ -411,13 +478,16 @@ mod tests { match TimerKindDiscriminants::VARIANTS [rand::thread_rng().gen_range(0..TimerKindDiscriminants::VARIANTS.len())] { - TimerKindDiscriminants::Invocation => TimerKind::Invocation { + TimerKindDiscriminants::Invoke => TimerKind::Invoke { invocation_uuid: InvocationUuid::new(), }, - TimerKindDiscriminants::Journal => TimerKind::Journal { + TimerKindDiscriminants::CompleteJournalEntry => TimerKind::CompleteJournalEntry { invocation_uuid: InvocationUuid::new(), journal_index: rand::thread_rng().gen_range(0..2 ^ 16), }, + TimerKindDiscriminants::CleanInvocationStatus => TimerKind::CleanInvocationStatus { + invocation_uuid: InvocationUuid::new(), + }, } }; diff --git a/crates/partition-store/tests/timer_table_test/mod.rs b/crates/partition-store/tests/timer_table_test/mod.rs index e056f6436..592910718 100644 --- a/crates/partition-store/tests/timer_table_test/mod.rs +++ b/crates/partition-store/tests/timer_table_test/mod.rs @@ -29,26 +29,26 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), journal_index: 0, }, timestamp: 0, }, - Timer::CompleteSleepEntry(FIXTURE_INVOCATION, 0), + Timer::CompleteJournalEntry(FIXTURE_INVOCATION, 0), ) .await; txn.add_timer( PARTITION1337, &TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), journal_index: 1, }, timestamp: 0, }, - Timer::CompleteSleepEntry(FIXTURE_INVOCATION, 1), + Timer::CompleteJournalEntry(FIXTURE_INVOCATION, 1), ) .await; @@ -58,7 +58,7 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - kind: TimerKind::Invocation { + kind: TimerKind::Invoke { invocation_uuid: service_invocation.invocation_id.invocation_uuid(), }, timestamp: 1, @@ -73,26 +73,26 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, }, timestamp: 0, }, - Timer::CompleteSleepEntry(InvocationId::from_parts(1336, FIXTURE_INVOCATION_UUID), 0), + Timer::CompleteJournalEntry(InvocationId::from_parts(1336, FIXTURE_INVOCATION_UUID), 0), ) .await; txn.add_timer( PartitionId::from(1338), &TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, }, timestamp: 0, }, - Timer::CompleteSleepEntry(InvocationId::from_parts(1338, FIXTURE_INVOCATION_UUID), 0), + Timer::CompleteJournalEntry(InvocationId::from_parts(1338, FIXTURE_INVOCATION_UUID), 0), ) .await; } @@ -110,7 +110,7 @@ async fn demo_how_to_find_first_timers_in_a_partition(txn: &mut T async fn find_timers_greater_than(txn: &mut T) { let timer_key = &TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, }, @@ -123,7 +123,7 @@ async fn find_timers_greater_than(txn: &mut T) { // take a look at populate_data once again. assert_that!( key.kind, - pat!(TimerKind::Journal { + pat!(TimerKind::CompleteJournalEntry { journal_index: eq(1), }) ); @@ -132,7 +132,7 @@ async fn find_timers_greater_than(txn: &mut T) { } if let Some(Ok((key, _))) = stream.next().await { - assert_that!(key.kind, pat!(TimerKind::Invocation { .. })); + assert_that!(key.kind, pat!(TimerKind::Invoke { .. })); assert_eq!(key.timestamp, 1); } else { panic!("test failure"); @@ -143,7 +143,7 @@ async fn delete_the_first_timer(txn: &mut T) { txn.delete_timer( PARTITION1337, &TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, }, @@ -155,7 +155,7 @@ async fn delete_the_first_timer(txn: &mut T) { async fn verify_next_timer_after_deletion(txn: &mut T) { let timer_key = &TimerKey { - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, }, @@ -167,7 +167,7 @@ async fn verify_next_timer_after_deletion(txn: &mut T) { if let Some(Ok((key, _))) = stream.next().await { assert_that!( key.kind, - pat!(TimerKind::Journal { + pat!(TimerKind::CompleteJournalEntry { journal_index: eq(1) }) ); diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index fa6cdf5bc..d0beee92b 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -1920,7 +1920,7 @@ pub mod v1 { Ok( match value.value.ok_or(ConversionError::missing_field("value"))? { timer::Value::CompleteSleepEntry(cse) => { - crate::timer_table::Timer::CompleteSleepEntry( + crate::timer_table::Timer::CompleteJournalEntry( restate_types::identifiers::InvocationId::try_from( cse.invocation_id .ok_or(ConversionError::missing_field("invocation_id"))?, @@ -1949,7 +1949,7 @@ pub mod v1 { fn from(value: crate::timer_table::Timer) -> Self { Timer { value: Some(match value { - crate::timer_table::Timer::CompleteSleepEntry( + crate::timer_table::Timer::CompleteJournalEntry( invocation_id, entry_index, ) => timer::Value::CompleteSleepEntry(timer::CompleteSleepEntry { diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index 7f23ae6fd..3075f9e0c 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -30,24 +30,31 @@ pub struct TimerKey { } impl TimerKey { - pub fn new_journal_entry( + pub fn complete_journal_entry( timestamp: u64, invocation_uuid: InvocationUuid, journal_index: u32, ) -> Self { TimerKey { timestamp, - kind: TimerKind::Journal { + kind: TimerKind::CompleteJournalEntry { invocation_uuid, journal_index, }, } } - pub fn new_invocation(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { + pub fn invoke(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { TimerKey { timestamp, - kind: TimerKind::Invocation { invocation_uuid }, + kind: TimerKind::Invoke { invocation_uuid }, + } + } + + pub fn clean_invocation_status(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { + TimerKey { + timestamp, + kind: TimerKind::CleanInvocationStatus { invocation_uuid }, } } } @@ -78,22 +85,25 @@ impl Ord for TimerKey { )] #[strum_discriminants(derive(strum_macros::VariantArray))] pub enum TimerKind { - /// Invocation-scoped timers (e.g. a service invocation or clean up of invocation state) - Invocation { invocation_uuid: InvocationUuid }, - /// Journal-scoped timers (e.g. completing a sleep journal entry) - Journal { + /// Delayed invocation + Invoke { invocation_uuid: InvocationUuid }, + /// Completion of a journal entry + CompleteJournalEntry { invocation_uuid: InvocationUuid, journal_index: u32, }, + /// Cleaning of invocation status + CleanInvocationStatus { invocation_uuid: InvocationUuid }, } impl TimerKind { pub fn invocation_uuid(&self) -> InvocationUuid { *match self { - TimerKind::Invocation { invocation_uuid } => invocation_uuid, - TimerKind::Journal { + TimerKind::Invoke { invocation_uuid } => invocation_uuid, + TimerKind::CompleteJournalEntry { invocation_uuid, .. } => invocation_uuid, + TimerKind::CleanInvocationStatus { invocation_uuid } => invocation_uuid, } } } @@ -107,23 +117,33 @@ impl PartialOrd for TimerKind { impl Ord for TimerKind { fn cmp(&self, other: &Self) -> Ordering { match self { - TimerKind::Invocation { invocation_uuid } => match other { - TimerKind::Invocation { + TimerKind::Invoke { invocation_uuid } => match other { + TimerKind::Invoke { invocation_uuid: other_invocation_uuid, } => invocation_uuid.cmp(other_invocation_uuid), - TimerKind::Journal { .. } => Ordering::Less, + TimerKind::CompleteJournalEntry { .. } + | TimerKind::CleanInvocationStatus { .. } => Ordering::Less, }, - TimerKind::Journal { + TimerKind::CompleteJournalEntry { invocation_uuid, journal_index, } => match other { - TimerKind::Invocation { .. } => Ordering::Greater, - TimerKind::Journal { + TimerKind::Invoke { .. } => Ordering::Greater, + TimerKind::CompleteJournalEntry { invocation_uuid: other_invocation_uuid, journal_index: other_journal_index, } => invocation_uuid .cmp(other_invocation_uuid) .then_with(|| journal_index.cmp(other_journal_index)), + TimerKind::CleanInvocationStatus { .. } => Ordering::Less, + }, + TimerKind::CleanInvocationStatus { invocation_uuid } => match other { + TimerKind::Invoke { .. } | TimerKind::CompleteJournalEntry { .. } => { + Ordering::Greater + } + TimerKind::CleanInvocationStatus { + invocation_uuid: other_invocation_uuid, + } => invocation_uuid.cmp(other_invocation_uuid), }, } } @@ -138,15 +158,15 @@ impl restate_types::timer::TimerKey for TimerKey { #[derive(Clone, Debug, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum Timer { - CompleteSleepEntry(InvocationId, u32), Invoke(ServiceInvocation), + CompleteJournalEntry(InvocationId, u32), CleanInvocationStatus(InvocationId), } impl WithPartitionKey for Timer { fn partition_key(&self) -> PartitionKey { match self { - Timer::CompleteSleepEntry(invocation_id, _) => invocation_id.partition_key(), + Timer::CompleteJournalEntry(invocation_id, _) => invocation_id.partition_key(), Timer::Invoke(service_invocation) => service_invocation.partition_key(), Timer::CleanInvocationStatus(invocation_id) => invocation_id.partition_key(), } diff --git a/crates/wal-protocol/src/timer.rs b/crates/wal-protocol/src/timer.rs index be7935a1d..b8b9feb52 100644 --- a/crates/wal-protocol/src/timer.rs +++ b/crates/wal-protocol/src/timer.rs @@ -28,12 +28,12 @@ impl TimerValue { Self { timer_key, value } } - pub fn new_sleep( + pub fn complete_journal_entry( invocation_id: InvocationId, wake_up_time: MillisSinceEpoch, entry_index: EntryIndex, ) -> Self { - let timer_key = TimerKey::new_journal_entry( + let timer_key = TimerKey::complete_journal_entry( wake_up_time.as_u64(), invocation_id.invocation_uuid(), entry_index, @@ -41,17 +41,16 @@ impl TimerValue { Self { timer_key, - value: Timer::CompleteSleepEntry(invocation_id, entry_index), + value: Timer::CompleteJournalEntry(invocation_id, entry_index), } } - pub fn new_invoke( + pub fn invoke( invocation_id: InvocationId, wake_up_time: MillisSinceEpoch, service_invocation: ServiceInvocation, ) -> Self { - let timer_key = - TimerKey::new_invocation(wake_up_time.as_u64(), invocation_id.invocation_uuid()); + let timer_key = TimerKey::invoke(wake_up_time.as_u64(), invocation_id.invocation_uuid()); Self { timer_key, @@ -59,12 +58,12 @@ impl TimerValue { } } - pub fn new_clean_invocation_status( + pub fn clean_invocation_status( invocation_id: InvocationId, wake_up_time: MillisSinceEpoch, ) -> Self { TimerValue { - timer_key: TimerKey::new_invocation( + timer_key: TimerKey::clean_invocation_status( wake_up_time.as_u64(), invocation_id.invocation_uuid(), ), @@ -132,11 +131,20 @@ pub struct TimerKeyDisplay<'a>(pub &'a TimerKey); impl<'a> fmt::Display for TimerKeyDisplay<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.0.kind { - TimerKind::Invocation { invocation_uuid } => write!(f, "{}", invocation_uuid), - TimerKind::Journal { + TimerKind::Invoke { invocation_uuid } => { + write!(f, "Delayed invocation '{}'", invocation_uuid) + } + TimerKind::CompleteJournalEntry { invocation_uuid, journal_index, - } => write!(f, "{}[{}]", invocation_uuid, journal_index), + } => write!( + f, + "Complete journal entry [{}] for '{}'", + journal_index, invocation_uuid + ), + TimerKind::CleanInvocationStatus { invocation_uuid } => { + write!(f, "Clean invocation status '{}'", invocation_uuid) + } } } } diff --git a/crates/worker/src/partition/action_effect_handler.rs b/crates/worker/src/partition/action_effect_handler.rs index b3ab9d7c7..fe9979a9f 100644 --- a/crates/worker/src/partition/action_effect_handler.rs +++ b/crates/worker/src/partition/action_effect_handler.rs @@ -81,7 +81,7 @@ impl ActionEffectHandler { &mut self.bifrost, Envelope::new( header.clone(), - Command::ScheduleTimer(TimerValue::new_clean_invocation_status( + Command::ScheduleTimer(TimerValue::clean_invocation_status( invocation_id, MillisSinceEpoch::from(SystemTime::now() + duration), )), diff --git a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs index a069b798d..cdbcf22a4 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs @@ -250,7 +250,7 @@ where if let Some(execution_time) = service_invocation.execution_time { let span_context = service_invocation.span_context.clone(); effects.register_timer( - TimerValue::new_invoke( + TimerValue::invoke( service_invocation.invocation_id, execution_time, service_invocation, @@ -752,7 +752,7 @@ where ProtobufRawEntryCodec::deserialize(EntryType::Sleep, entry)? ); - let timer_key = TimerKey::new_journal_entry( + let timer_key = TimerKey::complete_journal_entry( wake_up_time, invocation_id.invocation_uuid(), journal_index, @@ -810,7 +810,7 @@ where effects.delete_timer(key); match value { - Timer::CompleteSleepEntry(invocation_id, entry_index) => { + Timer::CompleteJournalEntry(invocation_id, entry_index) => { Self::handle_completion( invocation_id, Completion { @@ -1301,7 +1301,7 @@ where journal_entry.deserialize_entry_ref::()? ); effects.register_timer( - TimerValue::new_sleep( + TimerValue::complete_journal_entry( invocation_id, MillisSinceEpoch::new(wake_up_time), entry_index, diff --git a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs index b137970b2..ee1e3e976 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs @@ -763,7 +763,7 @@ fn forward_canceled_completion_matcher(entry_index: EntryIndex) -> impl Matcher< fn delete_timer(entry_index: EntryIndex) -> impl Matcher { pat!(Effect::DeleteTimer(pat!(TimerKey { - kind: pat!(TimerKind::Journal { + kind: pat!(TimerKind::CompleteJournalEntry { journal_index: eq(entry_index), }), timestamp: eq(1337), diff --git a/crates/worker/src/partition/state_machine/effects.rs b/crates/worker/src/partition/state_machine/effects.rs index c64b0938e..2666a02ad 100644 --- a/crates/worker/src/partition/state_machine/effects.rs +++ b/crates/worker/src/partition/state_machine/effects.rs @@ -416,7 +416,7 @@ impl Effect { span_context, .. } => match timer_value.value() { - Timer::CompleteSleepEntry(invocation_id, entry_index) => { + Timer::CompleteJournalEntry(_, entry_index) => { info_span_if_leader!( is_leader, span_context.is_sampled(), @@ -450,7 +450,7 @@ impl Effect { "Effect: Register background invoke timer" ) } - Timer::CleanInvocationStatus(invocation_id) => { + Timer::CleanInvocationStatus(_) => { debug_if_leader!( is_leader, restate.timer.wake_up_time = %timer_value.wake_up_time(), diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index f34d273ad..410104fd6 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -1122,7 +1122,7 @@ mod tests { let _ = state_machine .apply(Command::Timer(TimerValue::new( TimerKey { - kind: TimerKind::Invocation { + kind: TimerKind::Invoke { invocation_uuid: invocation_id.invocation_uuid(), }, timestamp: 0, From 6ea99f10cf1970e29ad23df36337e739a4a77f20 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 30 Apr 2024 17:45:54 +0200 Subject: [PATCH 7/7] Make the relationship between Timer and TimerKey more explicit This commit introduces factory methods on the Timer which creates the right timer kind and corresponding timer key. This prevents the accidental mixup of Timers with their keys. --- crates/partition-store/src/keys.rs | 24 ++--- crates/partition-store/src/timer_table/mod.rs | 82 ++++++++-------- .../tests/timer_table_test/mod.rs | 24 ++--- crates/storage-api/src/timer_table/mod.rs | 95 ++++++++++++++----- crates/wal-protocol/src/lib.rs | 6 +- crates/wal-protocol/src/timer.rs | 70 +++++--------- .../src/partition/action_effect_handler.rs | 6 +- .../partition/leadership/action_collector.rs | 4 +- crates/worker/src/partition/leadership/mod.rs | 6 +- .../src/partition/state_machine/actions.rs | 4 +- .../state_machine/command_interpreter/mod.rs | 20 ++-- .../command_interpreter/tests.rs | 4 +- .../src/partition/state_machine/effects.rs | 6 +- .../worker/src/partition/state_machine/mod.rs | 8 +- crates/worker/src/partition/storage/mod.rs | 8 +- 15 files changed, 197 insertions(+), 170 deletions(-) diff --git a/crates/partition-store/src/keys.rs b/crates/partition-store/src/keys.rs index 79338e1ea..c42d48516 100644 --- a/crates/partition-store/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -306,7 +306,7 @@ macro_rules! define_table_key { use crate::TableKind; pub(crate) use define_table_key; use restate_storage_api::deduplication_table::ProducerId; -use restate_storage_api::timer_table::TimerKind; +use restate_storage_api::timer_table::TimerKeyKind; use restate_storage_api::StorageError; use restate_types::identifiers::{InvocationUuid, PartitionId}; @@ -487,7 +487,7 @@ impl KeyCodec for ProducerId { } } -impl KeyCodec for TimerKind { +impl KeyCodec for TimerKeyKind { fn encode(&self, target: &mut B) { assert!( self.serialized_length() <= target.remaining_mut(), @@ -495,11 +495,11 @@ impl KeyCodec for TimerKind { self.serialized_length() ); match self { - TimerKind::Invoke { invocation_uuid } => { + TimerKeyKind::Invoke { invocation_uuid } => { target.put_u8(0); invocation_uuid.encode(target); } - TimerKind::CompleteJournalEntry { + TimerKeyKind::CompleteJournalEntry { invocation_uuid, journal_index, } => { @@ -507,7 +507,7 @@ impl KeyCodec for TimerKind { invocation_uuid.encode(target); journal_index.encode(target); } - TimerKind::CleanInvocationStatus { invocation_uuid } => { + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => { target.put_u8(2); invocation_uuid.encode(target); } @@ -524,19 +524,19 @@ impl KeyCodec for TimerKind { Ok(match source.get_u8() { 0 => { let invocation_uuid = InvocationUuid::decode(source)?; - TimerKind::Invoke { invocation_uuid } + TimerKeyKind::Invoke { invocation_uuid } } 1 => { let invocation_uuid = InvocationUuid::decode(source)?; let journal_index = u32::decode(source)?; - TimerKind::CompleteJournalEntry { + TimerKeyKind::CompleteJournalEntry { invocation_uuid, journal_index, } } 2 => { let invocation_uuid = InvocationUuid::decode(source)?; - TimerKind::CleanInvocationStatus { invocation_uuid } + TimerKeyKind::CleanInvocationStatus { invocation_uuid } } i => { return Err(StorageError::Generic(anyhow!( @@ -549,15 +549,17 @@ impl KeyCodec for TimerKind { fn serialized_length(&self) -> usize { 1 + match self { - TimerKind::Invoke { invocation_uuid } => KeyCodec::serialized_length(invocation_uuid), - TimerKind::CompleteJournalEntry { + TimerKeyKind::Invoke { invocation_uuid } => { + KeyCodec::serialized_length(invocation_uuid) + } + TimerKeyKind::CompleteJournalEntry { invocation_uuid, journal_index, } => { KeyCodec::serialized_length(invocation_uuid) + KeyCodec::serialized_length(journal_index) } - TimerKind::CleanInvocationStatus { invocation_uuid } => { + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => { KeyCodec::serialized_length(invocation_uuid) } } diff --git a/crates/partition-store/src/timer_table/mod.rs b/crates/partition-store/src/timer_table/mod.rs index fd5e0f755..9e3b2aaf4 100644 --- a/crates/partition-store/src/timer_table/mod.rs +++ b/crates/partition-store/src/timer_table/mod.rs @@ -15,7 +15,7 @@ use crate::{PartitionStore, RocksDBTransaction, StorageAccess}; use crate::{TableScan, TableScanIterationDecision}; use futures::Stream; use futures_util::stream; -use restate_storage_api::timer_table::{Timer, TimerKey, TimerKind, TimerTable}; +use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind, TimerTable}; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationUuid, PartitionId}; use restate_types::storage::StorageCodec; @@ -26,7 +26,7 @@ define_table_key!( TimersKey( partition_id: PartitionId, timestamp: u64, - kind: TimerKind, + kind: TimerKeyKind, ) ); @@ -69,32 +69,32 @@ fn exclusive_start_key_range( ) -> TableScan { if let Some(timer_key) = timer_key { let next_timer_key = match timer_key.kind { - TimerKind::Invoke { invocation_uuid } => { + TimerKeyKind::Invoke { invocation_uuid } => { let incremented_invocation_uuid = increment_invocation_uuid(invocation_uuid); TimerKey { timestamp: timer_key.timestamp, - kind: TimerKind::Invoke { + kind: TimerKeyKind::Invoke { invocation_uuid: incremented_invocation_uuid, }, } } - TimerKind::CompleteJournalEntry { + TimerKeyKind::CompleteJournalEntry { invocation_uuid, journal_index, } => TimerKey { timestamp: timer_key.timestamp, - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid, journal_index: journal_index .checked_add(1) .expect("journal index should be smaller than u64::MAX"), }, }, - TimerKind::CleanInvocationStatus { invocation_uuid } => { + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => { let incremented_invocation_uuid = increment_invocation_uuid(invocation_uuid); TimerKey { timestamp: timer_key.timestamp, - kind: TimerKind::CleanInvocationStatus { + kind: TimerKeyKind::CleanInvocationStatus { invocation_uuid: incremented_invocation_uuid, }, } @@ -209,7 +209,7 @@ mod tests { use super::*; use crate::timer_table::TimerKey; use rand::Rng; - use restate_storage_api::timer_table::TimerKindDiscriminants; + use restate_storage_api::timer_table::TimerKeyKindDiscriminants; use restate_types::identifiers::InvocationUuid; use strum::VariantArray; @@ -219,7 +219,7 @@ mod tests { #[test] fn round_trip_complete_journal_entry_kind() { let key = TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 1448, }, @@ -235,7 +235,7 @@ mod tests { #[test] fn round_trip_invoke_kind() { let key = TimerKey { - kind: TimerKind::Invoke { + kind: TimerKeyKind::Invoke { invocation_uuid: FIXTURE_INVOCATION, }, timestamp: 87654321, @@ -250,7 +250,7 @@ mod tests { #[test] fn round_trip_clean_invocation_status_kind() { let key = TimerKey { - kind: TimerKind::CleanInvocationStatus { + kind: TimerKeyKind::CleanInvocationStatus { invocation_uuid: FIXTURE_INVOCATION, }, timestamp: 87654321, @@ -265,14 +265,14 @@ mod tests { #[test] fn test_lexicographical_sorting_by_timestamp() { let kinds = [ - TimerKind::CompleteJournalEntry { + TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 0, }, - TimerKind::Invoke { + TimerKeyKind::Invoke { invocation_uuid: FIXTURE_INVOCATION, }, - TimerKind::CleanInvocationStatus { + TimerKeyKind::CleanInvocationStatus { invocation_uuid: FIXTURE_INVOCATION, }, ]; @@ -296,14 +296,14 @@ mod tests { fn test_lexicographical_sorting_by_invocation_uuid_complete_journal_entry_kind() { // Higher random part should be sorted correctly in bytes let a = TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 0, }, timestamp: 300, }; let b = TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION.increment_random(), journal_index: 0, }, @@ -313,7 +313,7 @@ mod tests { // Also ensure that higher timestamp is sorted correctly let b = TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), journal_index: 0, }, @@ -326,13 +326,13 @@ mod tests { fn test_lexicographical_sorting_by_invocation_uuid_invoke_kind() { // Higher random part should be sorted correctly in bytes let a = TimerKey { - kind: TimerKind::Invoke { + kind: TimerKeyKind::Invoke { invocation_uuid: FIXTURE_INVOCATION, }, timestamp: 300, }; let b = TimerKey { - kind: TimerKind::Invoke { + kind: TimerKeyKind::Invoke { invocation_uuid: FIXTURE_INVOCATION.increment_random(), }, timestamp: 300, @@ -341,7 +341,7 @@ mod tests { // Also ensure that higher timestamp is sorted correctly let b = TimerKey { - kind: TimerKind::Invoke { + kind: TimerKeyKind::Invoke { invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), }, timestamp: 300, @@ -353,13 +353,13 @@ mod tests { fn test_lexicographical_sorting_by_invocation_uuid_clean_invoation_status_kind() { // Higher random part should be sorted correctly in bytes let a = TimerKey { - kind: TimerKind::CleanInvocationStatus { + kind: TimerKeyKind::CleanInvocationStatus { invocation_uuid: FIXTURE_INVOCATION, }, timestamp: 300, }; let b = TimerKey { - kind: TimerKind::CleanInvocationStatus { + kind: TimerKeyKind::CleanInvocationStatus { invocation_uuid: FIXTURE_INVOCATION.increment_random(), }, timestamp: 300, @@ -368,7 +368,7 @@ mod tests { // Also ensure that higher timestamp is sorted correctly let b = TimerKey { - kind: TimerKind::CleanInvocationStatus { + kind: TimerKeyKind::CleanInvocationStatus { invocation_uuid: FIXTURE_INVOCATION.increment_timestamp(), }, timestamp: 300, @@ -379,14 +379,14 @@ mod tests { #[test] fn test_lexicographical_sorting_by_journal_index() { let a = TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 0, }, timestamp: 300, }; let b = TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 1, }, @@ -398,14 +398,14 @@ mod tests { #[test] fn test_lexicographical_sorting_timer_kind() { let a = TimerKey { - kind: TimerKind::Invoke { + kind: TimerKeyKind::Invoke { invocation_uuid: FIXTURE_INVOCATION, }, timestamp: 300, }; let b = TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION, journal_index: 0, }, @@ -413,7 +413,7 @@ mod tests { }; let c = TimerKey { - kind: TimerKind::CleanInvocationStatus { + kind: TimerKeyKind::CleanInvocationStatus { invocation_uuid: FIXTURE_INVOCATION, }, timestamp: 300, @@ -475,19 +475,23 @@ mod tests { pub fn random_timer_key() -> TimerKey { let kind = { - match TimerKindDiscriminants::VARIANTS - [rand::thread_rng().gen_range(0..TimerKindDiscriminants::VARIANTS.len())] + match TimerKeyKindDiscriminants::VARIANTS + [rand::thread_rng().gen_range(0..TimerKeyKindDiscriminants::VARIANTS.len())] { - TimerKindDiscriminants::Invoke => TimerKind::Invoke { - invocation_uuid: InvocationUuid::new(), - }, - TimerKindDiscriminants::CompleteJournalEntry => TimerKind::CompleteJournalEntry { - invocation_uuid: InvocationUuid::new(), - journal_index: rand::thread_rng().gen_range(0..2 ^ 16), - }, - TimerKindDiscriminants::CleanInvocationStatus => TimerKind::CleanInvocationStatus { + TimerKeyKindDiscriminants::Invoke => TimerKeyKind::Invoke { invocation_uuid: InvocationUuid::new(), }, + TimerKeyKindDiscriminants::CompleteJournalEntry => { + TimerKeyKind::CompleteJournalEntry { + invocation_uuid: InvocationUuid::new(), + journal_index: rand::thread_rng().gen_range(0..2 ^ 16), + } + } + TimerKeyKindDiscriminants::CleanInvocationStatus => { + TimerKeyKind::CleanInvocationStatus { + invocation_uuid: InvocationUuid::new(), + } + } } }; diff --git a/crates/partition-store/tests/timer_table_test/mod.rs b/crates/partition-store/tests/timer_table_test/mod.rs index 592910718..731469f07 100644 --- a/crates/partition-store/tests/timer_table_test/mod.rs +++ b/crates/partition-store/tests/timer_table_test/mod.rs @@ -13,7 +13,7 @@ use futures_util::StreamExt; use googletest::matchers::eq; use googletest::{assert_that, pat}; use restate_partition_store::PartitionStore; -use restate_storage_api::timer_table::{Timer, TimerKey, TimerKind, TimerTable}; +use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind, TimerTable}; use restate_storage_api::Transaction; use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionId, ServiceId}; use restate_types::invocation::ServiceInvocation; @@ -29,7 +29,7 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), journal_index: 0, }, @@ -42,7 +42,7 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION.invocation_uuid(), journal_index: 1, }, @@ -58,7 +58,7 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - kind: TimerKind::Invoke { + kind: TimerKeyKind::Invoke { invocation_uuid: service_invocation.invocation_id.invocation_uuid(), }, timestamp: 1, @@ -73,7 +73,7 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PARTITION1337, &TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, }, @@ -86,7 +86,7 @@ async fn populate_data(txn: &mut T) { txn.add_timer( PartitionId::from(1338), &TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, }, @@ -110,7 +110,7 @@ async fn demo_how_to_find_first_timers_in_a_partition(txn: &mut T async fn find_timers_greater_than(txn: &mut T) { let timer_key = &TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, }, @@ -123,7 +123,7 @@ async fn find_timers_greater_than(txn: &mut T) { // take a look at populate_data once again. assert_that!( key.kind, - pat!(TimerKind::CompleteJournalEntry { + pat!(TimerKeyKind::CompleteJournalEntry { journal_index: eq(1), }) ); @@ -132,7 +132,7 @@ async fn find_timers_greater_than(txn: &mut T) { } if let Some(Ok((key, _))) = stream.next().await { - assert_that!(key.kind, pat!(TimerKind::Invoke { .. })); + assert_that!(key.kind, pat!(TimerKeyKind::Invoke { .. })); assert_eq!(key.timestamp, 1); } else { panic!("test failure"); @@ -143,7 +143,7 @@ async fn delete_the_first_timer(txn: &mut T) { txn.delete_timer( PARTITION1337, &TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, }, @@ -155,7 +155,7 @@ async fn delete_the_first_timer(txn: &mut T) { async fn verify_next_timer_after_deletion(txn: &mut T) { let timer_key = &TimerKey { - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid: FIXTURE_INVOCATION_UUID, journal_index: 0, }, @@ -167,7 +167,7 @@ async fn verify_next_timer_after_deletion(txn: &mut T) { if let Some(Ok((key, _))) = stream.next().await { assert_that!( key.kind, - pat!(TimerKind::CompleteJournalEntry { + pat!(TimerKeyKind::CompleteJournalEntry { journal_index: eq(1) }) ); diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index 3075f9e0c..a392c9397 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -26,35 +26,35 @@ use std::future::Future; #[derive(Clone, Debug, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct TimerKey { pub timestamp: u64, - pub kind: TimerKind, + pub kind: TimerKeyKind, } impl TimerKey { - pub fn complete_journal_entry( + fn complete_journal_entry( timestamp: u64, invocation_uuid: InvocationUuid, journal_index: u32, ) -> Self { TimerKey { timestamp, - kind: TimerKind::CompleteJournalEntry { + kind: TimerKeyKind::CompleteJournalEntry { invocation_uuid, journal_index, }, } } - pub fn invoke(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { + fn invoke(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { TimerKey { timestamp, - kind: TimerKind::Invoke { invocation_uuid }, + kind: TimerKeyKind::Invoke { invocation_uuid }, } } - pub fn clean_invocation_status(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { + fn clean_invocation_status(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { TimerKey { timestamp, - kind: TimerKind::CleanInvocationStatus { invocation_uuid }, + kind: TimerKeyKind::CleanInvocationStatus { invocation_uuid }, } } } @@ -84,7 +84,7 @@ impl Ord for TimerKey { strum_macros::EnumDiscriminants, )] #[strum_discriminants(derive(strum_macros::VariantArray))] -pub enum TimerKind { +pub enum TimerKeyKind { /// Delayed invocation Invoke { invocation_uuid: InvocationUuid }, /// Completion of a journal entry @@ -96,52 +96,52 @@ pub enum TimerKind { CleanInvocationStatus { invocation_uuid: InvocationUuid }, } -impl TimerKind { +impl TimerKeyKind { pub fn invocation_uuid(&self) -> InvocationUuid { *match self { - TimerKind::Invoke { invocation_uuid } => invocation_uuid, - TimerKind::CompleteJournalEntry { + TimerKeyKind::Invoke { invocation_uuid } => invocation_uuid, + TimerKeyKind::CompleteJournalEntry { invocation_uuid, .. } => invocation_uuid, - TimerKind::CleanInvocationStatus { invocation_uuid } => invocation_uuid, + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => invocation_uuid, } } } -impl PartialOrd for TimerKind { +impl PartialOrd for TimerKeyKind { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for TimerKind { +impl Ord for TimerKeyKind { fn cmp(&self, other: &Self) -> Ordering { match self { - TimerKind::Invoke { invocation_uuid } => match other { - TimerKind::Invoke { + TimerKeyKind::Invoke { invocation_uuid } => match other { + TimerKeyKind::Invoke { invocation_uuid: other_invocation_uuid, } => invocation_uuid.cmp(other_invocation_uuid), - TimerKind::CompleteJournalEntry { .. } - | TimerKind::CleanInvocationStatus { .. } => Ordering::Less, + TimerKeyKind::CompleteJournalEntry { .. } + | TimerKeyKind::CleanInvocationStatus { .. } => Ordering::Less, }, - TimerKind::CompleteJournalEntry { + TimerKeyKind::CompleteJournalEntry { invocation_uuid, journal_index, } => match other { - TimerKind::Invoke { .. } => Ordering::Greater, - TimerKind::CompleteJournalEntry { + TimerKeyKind::Invoke { .. } => Ordering::Greater, + TimerKeyKind::CompleteJournalEntry { invocation_uuid: other_invocation_uuid, journal_index: other_journal_index, } => invocation_uuid .cmp(other_invocation_uuid) .then_with(|| journal_index.cmp(other_journal_index)), - TimerKind::CleanInvocationStatus { .. } => Ordering::Less, + TimerKeyKind::CleanInvocationStatus { .. } => Ordering::Less, }, - TimerKind::CleanInvocationStatus { invocation_uuid } => match other { - TimerKind::Invoke { .. } | TimerKind::CompleteJournalEntry { .. } => { + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => match other { + TimerKeyKind::Invoke { .. } | TimerKeyKind::CompleteJournalEntry { .. } => { Ordering::Greater } - TimerKind::CleanInvocationStatus { + TimerKeyKind::CleanInvocationStatus { invocation_uuid: other_invocation_uuid, } => invocation_uuid.cmp(other_invocation_uuid), }, @@ -163,6 +163,51 @@ pub enum Timer { CleanInvocationStatus(InvocationId), } +impl Timer { + pub fn complete_journal_entry( + timestamp: u64, + invocation_id: InvocationId, + journal_index: u32, + ) -> (TimerKey, Self) { + ( + TimerKey::complete_journal_entry( + timestamp, + invocation_id.invocation_uuid(), + journal_index, + ), + Timer::CompleteJournalEntry(invocation_id, journal_index), + ) + } + + pub fn invoke(timestamp: u64, service_invocation: ServiceInvocation) -> (TimerKey, Self) { + ( + TimerKey::invoke( + timestamp, + service_invocation.invocation_id.invocation_uuid(), + ), + Timer::Invoke(service_invocation), + ) + } + + pub fn clean_invocation_status( + timestamp: u64, + invocation_id: InvocationId, + ) -> (TimerKey, Self) { + ( + TimerKey::clean_invocation_status(timestamp, invocation_id.invocation_uuid()), + Timer::CleanInvocationStatus(invocation_id), + ) + } + + pub fn invocation_id(&self) -> InvocationId { + match self { + Timer::Invoke(service_invocation) => service_invocation.invocation_id, + Timer::CompleteJournalEntry(invocation_id, _) => *invocation_id, + Timer::CleanInvocationStatus(invocation_id) => *invocation_id, + } + } +} + impl WithPartitionKey for Timer { fn partition_key(&self) -> PartitionKey { match self { diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index 6afb6f9ca..9f42e5bb9 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -20,7 +20,7 @@ use restate_types::{flexbuffers_storage_encode_decode, Version}; use crate::control::AnnounceLeader; use crate::effects::BuiltinServiceEffects; -use crate::timer::TimerValue; +use crate::timer::TimerKeyValue; use restate_types::logs::{LogId, Lsn, Payload}; use restate_types::partition_table::{FindPartition, PartitionTableError}; use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError}; @@ -134,9 +134,9 @@ pub enum Command { /// Invoker is reporting effect(s) from an ongoing invocation. InvokerEffect(restate_invoker_api::Effect), /// Timer has fired - Timer(TimerValue), + Timer(TimerKeyValue), /// Schedule timer - ScheduleTimer(TimerValue), + ScheduleTimer(TimerKeyValue), /// Another partition processor is reporting a response of an invocation we requested. InvocationResponse(InvocationResponse), /// A built-in invoker reporting effects from an invocation. diff --git a/crates/wal-protocol/src/timer.rs b/crates/wal-protocol/src/timer.rs index b8b9feb52..6ee182371 100644 --- a/crates/wal-protocol/src/timer.rs +++ b/crates/wal-protocol/src/timer.rs @@ -8,8 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_storage_api::timer_table::{Timer, TimerKey, TimerKind}; -use restate_types::identifiers::{EntryIndex, InvocationId, WithPartitionKey}; +use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind}; +use restate_types::identifiers::{EntryIndex, InvocationId}; use restate_types::invocation::ServiceInvocation; use restate_types::time::MillisSinceEpoch; use std::borrow::Borrow; @@ -18,57 +18,40 @@ use std::hash::{Hash, Hasher}; #[derive(Debug, Clone)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct TimerValue { +pub struct TimerKeyValue { timer_key: TimerKey, value: Timer, } -impl TimerValue { +impl TimerKeyValue { pub fn new(timer_key: TimerKey, value: Timer) -> Self { Self { timer_key, value } } pub fn complete_journal_entry( - invocation_id: InvocationId, wake_up_time: MillisSinceEpoch, + invocation_id: InvocationId, entry_index: EntryIndex, ) -> Self { - let timer_key = TimerKey::complete_journal_entry( - wake_up_time.as_u64(), - invocation_id.invocation_uuid(), - entry_index, - ); - - Self { - timer_key, - value: Timer::CompleteJournalEntry(invocation_id, entry_index), - } + let (timer_key, value) = + Timer::complete_journal_entry(wake_up_time.as_u64(), invocation_id, entry_index); + + Self { timer_key, value } } - pub fn invoke( - invocation_id: InvocationId, - wake_up_time: MillisSinceEpoch, - service_invocation: ServiceInvocation, - ) -> Self { - let timer_key = TimerKey::invoke(wake_up_time.as_u64(), invocation_id.invocation_uuid()); + pub fn invoke(wake_up_time: MillisSinceEpoch, service_invocation: ServiceInvocation) -> Self { + let (timer_key, value) = Timer::invoke(wake_up_time.as_u64(), service_invocation); - Self { - timer_key, - value: Timer::Invoke(service_invocation), - } + Self { timer_key, value } } pub fn clean_invocation_status( - invocation_id: InvocationId, wake_up_time: MillisSinceEpoch, + invocation_id: InvocationId, ) -> Self { - TimerValue { - timer_key: TimerKey::clean_invocation_status( - wake_up_time.as_u64(), - invocation_id.invocation_uuid(), - ), - value: Timer::CleanInvocationStatus(invocation_id), - } + let (timer_key, value) = + Timer::clean_invocation_status(wake_up_time.as_u64(), invocation_id); + Self { timer_key, value } } pub fn into_inner(self) -> (TimerKey, Timer) { @@ -84,10 +67,7 @@ impl TimerValue { } pub fn invocation_id(&self) -> InvocationId { - InvocationId::from_parts( - self.value.partition_key(), - self.timer_key.kind.invocation_uuid(), - ) + self.value.invocation_id() } pub fn wake_up_time(&self) -> MillisSinceEpoch { @@ -95,28 +75,28 @@ impl TimerValue { } } -impl Hash for TimerValue { +impl Hash for TimerKeyValue { fn hash(&self, state: &mut H) { Hash::hash(&self.timer_key, state); // We don't hash the value field. } } -impl PartialEq for TimerValue { +impl PartialEq for TimerKeyValue { fn eq(&self, other: &Self) -> bool { self.timer_key == other.timer_key } } -impl Eq for TimerValue {} +impl Eq for TimerKeyValue {} -impl Borrow for TimerValue { +impl Borrow for TimerKeyValue { fn borrow(&self) -> &TimerKey { &self.timer_key } } -impl restate_types::timer::Timer for TimerValue { +impl restate_types::timer::Timer for TimerKeyValue { type TimerKey = TimerKey; fn timer_key(&self) -> &Self::TimerKey { @@ -131,10 +111,10 @@ pub struct TimerKeyDisplay<'a>(pub &'a TimerKey); impl<'a> fmt::Display for TimerKeyDisplay<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.0.kind { - TimerKind::Invoke { invocation_uuid } => { + TimerKeyKind::Invoke { invocation_uuid } => { write!(f, "Delayed invocation '{}'", invocation_uuid) } - TimerKind::CompleteJournalEntry { + TimerKeyKind::CompleteJournalEntry { invocation_uuid, journal_index, } => write!( @@ -142,7 +122,7 @@ impl<'a> fmt::Display for TimerKeyDisplay<'a> { "Complete journal entry [{}] for '{}'", journal_index, invocation_uuid ), - TimerKind::CleanInvocationStatus { invocation_uuid } => { + TimerKeyKind::CleanInvocationStatus { invocation_uuid } => { write!(f, "Clean invocation status '{}'", invocation_uuid) } } diff --git a/crates/worker/src/partition/action_effect_handler.rs b/crates/worker/src/partition/action_effect_handler.rs index fe9979a9f..560f99185 100644 --- a/crates/worker/src/partition/action_effect_handler.rs +++ b/crates/worker/src/partition/action_effect_handler.rs @@ -14,7 +14,7 @@ use restate_core::metadata; use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; use restate_types::identifiers::{PartitionId, PartitionKey, WithPartitionKey}; use restate_types::time::MillisSinceEpoch; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use restate_wal_protocol::{ append_envelope_to_bifrost, Command, Destination, Envelope, Header, Source, }; @@ -81,9 +81,9 @@ impl ActionEffectHandler { &mut self.bifrost, Envelope::new( header.clone(), - Command::ScheduleTimer(TimerValue::clean_invocation_status( - invocation_id, + Command::ScheduleTimer(TimerKeyValue::clean_invocation_status( MillisSinceEpoch::from(SystemTime::now() + duration), + invocation_id, )), ), ) diff --git a/crates/worker/src/partition/leadership/action_collector.rs b/crates/worker/src/partition/leadership/action_collector.rs index b6e8f2ba9..8cfb7f1b8 100644 --- a/crates/worker/src/partition/leadership/action_collector.rs +++ b/crates/worker/src/partition/leadership/action_collector.rs @@ -11,7 +11,7 @@ use crate::partition::shuffle; use futures::{Stream, StreamExt}; use restate_types::identifiers::InvocationId; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; @@ -46,7 +46,7 @@ impl ActionEffectStream { pub(crate) enum ActionEffect { Invoker(restate_invoker_api::Effect), Shuffle(shuffle::OutboxTruncation), - Timer(TimerValue), + Timer(TimerKeyValue), ScheduleCleanupTimer(InvocationId, Duration), } diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 5b272d2b3..62b48fc07 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -37,12 +37,12 @@ use restate_partition_store::PartitionStore; use restate_storage_api::deduplication_table::EpochSequenceNumber; use restate_types::identifiers::{InvocationId, PartitionKey}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionLeaderEpoch}; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use super::storage::invoker::InvokerStorageReader; type PartitionStorage = storage::PartitionStorage; -type TimerService = restate_timer::TimerService; +type TimerService = restate_timer::TimerService; pub(crate) struct LeaderState { leader_epoch: LeaderEpoch, @@ -294,7 +294,7 @@ where } } - pub(crate) async fn run_timer(&mut self) -> TimerValue { + pub(crate) async fn run_timer(&mut self) -> TimerKeyValue { match self { LeadershipState::Follower { .. } => future::pending().await, LeadershipState::Leader { diff --git a/crates/worker/src/partition/state_machine/actions.rs b/crates/worker/src/partition/state_machine/actions.rs index 23573530d..8ed5cf514 100644 --- a/crates/worker/src/partition/state_machine/actions.rs +++ b/crates/worker/src/partition/state_machine/actions.rs @@ -16,7 +16,7 @@ use restate_types::ingress::IngressResponse; use restate_types::invocation::InvocationTarget; use restate_types::journal::Completion; use restate_types::message::MessageIndex; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use std::time::Duration; #[derive(Debug)] @@ -31,7 +31,7 @@ pub enum Action { message: OutboxMessage, }, RegisterTimer { - timer_value: TimerValue, + timer_value: TimerKeyValue, }, DeleteTimer { timer_key: TimerKey, diff --git a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs index cdbcf22a4..98a63158b 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs @@ -26,7 +26,7 @@ use restate_storage_api::invocation_status_table::{ use restate_storage_api::journal_table::{JournalEntry, ReadOnlyJournalTable}; use restate_storage_api::outbox_table::OutboxMessage; use restate_storage_api::service_status_table::VirtualObjectStatus; -use restate_storage_api::timer_table::{Timer, TimerKey}; +use restate_storage_api::timer_table::Timer; use restate_storage_api::Result as StorageResult; use restate_types::errors::{ InvocationError, InvocationErrorCode, CANCELED_INVOCATION_ERROR, GONE_INVOCATION_ERROR, @@ -51,7 +51,7 @@ use restate_types::message::MessageIndex; use restate_types::state_mut::ExternalStateMutation; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::effects::{BuiltinServiceEffect, BuiltinServiceEffects}; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use restate_wal_protocol::Command; use std::collections::HashSet; use std::fmt::{Debug, Formatter}; @@ -250,11 +250,7 @@ where if let Some(execution_time) = service_invocation.execution_time { let span_context = service_invocation.span_context.clone(); effects.register_timer( - TimerValue::invoke( - service_invocation.invocation_id, - execution_time, - service_invocation, - ), + TimerKeyValue::invoke(execution_time, service_invocation), span_context, ); // The span will be created later on invocation @@ -752,9 +748,9 @@ where ProtobufRawEntryCodec::deserialize(EntryType::Sleep, entry)? ); - let timer_key = TimerKey::complete_journal_entry( + let (timer_key, _) = Timer::complete_journal_entry( wake_up_time, - invocation_id.invocation_uuid(), + invocation_id, journal_index, ); @@ -802,7 +798,7 @@ where async fn on_timer( &mut self, - timer_value: TimerValue, + timer_value: TimerKeyValue, state: &mut State, effects: &mut Effects, ) -> Result<(), Error> { @@ -1301,9 +1297,9 @@ where journal_entry.deserialize_entry_ref::()? ); effects.register_timer( - TimerValue::complete_journal_entry( - invocation_id, + TimerKeyValue::complete_journal_entry( MillisSinceEpoch::new(wake_up_time), + invocation_id, entry_index, ), invocation_metadata.journal_metadata.span_context.clone(), diff --git a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs index ee1e3e976..2cc5974c1 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs @@ -12,7 +12,7 @@ use restate_service_protocol::pb::protocol::SleepEntryMessage; use restate_storage_api::idempotency_table::IdempotencyMetadata; use restate_storage_api::inbox_table::SequenceNumberInboxEntry; use restate_storage_api::invocation_status_table::{JournalMetadata, StatusTimestamps}; -use restate_storage_api::timer_table::TimerKind; +use restate_storage_api::timer_table::{TimerKey, TimerKeyKind}; use restate_storage_api::{Result as StorageResult, StorageError}; use restate_test_util::matchers::*; use restate_test_util::{assert_eq, let_assert}; @@ -763,7 +763,7 @@ fn forward_canceled_completion_matcher(entry_index: EntryIndex) -> impl Matcher< fn delete_timer(entry_index: EntryIndex) -> impl Matcher { pat!(Effect::DeleteTimer(pat!(TimerKey { - kind: pat!(TimerKind::CompleteJournalEntry { + kind: pat!(TimerKeyKind::CompleteJournalEntry { journal_index: eq(entry_index), }), timestamp: eq(1337), diff --git a/crates/worker/src/partition/state_machine/effects.rs b/crates/worker/src/partition/state_machine/effects.rs index 2666a02ad..06770761f 100644 --- a/crates/worker/src/partition/state_machine/effects.rs +++ b/crates/worker/src/partition/state_machine/effects.rs @@ -33,7 +33,7 @@ use restate_types::message::MessageIndex; use restate_types::state_mut::ExternalStateMutation; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::timer::TimerKeyDisplay; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use std::collections::HashSet; use std::fmt; use std::time::Duration; @@ -99,7 +99,7 @@ pub(crate) enum Effect { // Timers RegisterTimer { - timer_value: TimerValue, + timer_value: TimerKeyValue, span_context: ServiceInvocationSpanContext, }, DeleteTimer(TimerKey), @@ -827,7 +827,7 @@ impl Effects { pub(crate) fn register_timer( &mut self, - timer_value: TimerValue, + timer_value: TimerKeyValue, span_context: ServiceInvocationSpanContext, ) { self.effects.push(Effect::RegisterTimer { diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 410104fd6..ccccd3362 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -729,11 +729,11 @@ mod tests { IdempotencyMetadata, IdempotencyTable, ReadOnlyIdempotencyTable, }; use restate_storage_api::invocation_status_table::{CompletedInvocation, StatusTimestamps}; - use restate_storage_api::timer_table::{Timer, TimerKey, TimerKind}; + use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind}; use restate_types::errors::GONE_INVOCATION_ERROR; use restate_types::identifiers::IdempotencyId; use restate_types::invocation::{Idempotency, InvocationTarget}; - use restate_wal_protocol::timer::TimerValue; + use restate_wal_protocol::timer::TimerKeyValue; use test_log::test; #[test(tokio::test)] @@ -1120,9 +1120,9 @@ mod tests { // Send timer fired command let _ = state_machine - .apply(Command::Timer(TimerValue::new( + .apply(Command::Timer(TimerKeyValue::new( TimerKey { - kind: TimerKind::Invoke { + kind: TimerKeyKind::Invoke { invocation_uuid: invocation_id.invocation_uuid(), }, timestamp: 0, diff --git a/crates/worker/src/partition/storage/mod.rs b/crates/worker/src/partition/storage/mod.rs index e295cddec..c7b3afab2 100644 --- a/crates/worker/src/partition/storage/mod.rs +++ b/crates/worker/src/partition/storage/mod.rs @@ -40,7 +40,7 @@ use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::CompletionResult; use restate_types::logs::Lsn; use restate_types::message::MessageIndex; -use restate_wal_protocol::timer::TimerValue; +use restate_wal_protocol::timer::TimerKeyValue; use std::future::Future; use std::ops::RangeInclusive; @@ -666,7 +666,7 @@ where } } -impl TimerReader for PartitionStorage +impl TimerReader for PartitionStorage where for<'a> Storage: TimerTable + Send + Sync + 'a, { @@ -674,10 +674,10 @@ where &mut self, num_timers: usize, previous_timer_key: Option, - ) -> Vec { + ) -> Vec { self.storage .next_timers_greater_than(self.partition_id, previous_timer_key.as_ref(), num_timers) - .map(|result| result.map(|(timer_key, timer)| TimerValue::new(timer_key, timer))) + .map(|result| result.map(|(timer_key, timer)| TimerKeyValue::new(timer_key, timer))) // TODO: Update timer service to maintain transaction while reading the timer stream: See https://github.com/restatedev/restate/issues/273 // have to collect the stream because it depends on the local transaction .try_collect::>()