diff --git a/node/core/dispute-coordinator/src/error.rs b/node/core/dispute-coordinator/src/error.rs index 662db12ab6fd..cbda3dc1d121 100644 --- a/node/core/dispute-coordinator/src/error.rs +++ b/node/core/dispute-coordinator/src/error.rs @@ -18,7 +18,7 @@ use fatality::Nested; use futures::channel::oneshot; use polkadot_node_subsystem::{errors::ChainApiError, SubsystemError}; -use polkadot_node_subsystem_util::{rolling_session_window::SessionsUnavailable, runtime}; +use polkadot_node_subsystem_util::runtime; use crate::{db, participation, LOG_TARGET}; use parity_scale_codec::Error as CodecError; @@ -96,8 +96,8 @@ pub enum Error { Codec(#[from] CodecError), /// `RollingSessionWindow` was not able to retrieve `SessionInfo`s. - #[error("Sessions unavailable in `RollingSessionWindow`: {0}")] - RollingSessionWindow(#[from] SessionsUnavailable), + #[error("Session can't be fetched via `RuntimeInfo`")] + SessionInfo, #[error(transparent)] QueueError(#[from] participation::QueueError), diff --git a/node/core/dispute-coordinator/src/import.rs b/node/core/dispute-coordinator/src/import.rs index 4968058a13fc..0c53211b358f 100644 --- a/node/core/dispute-coordinator/src/import.rs +++ b/node/core/dispute-coordinator/src/import.rs @@ -31,9 +31,10 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use polkadot_node_primitives::{ disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp, }; -use polkadot_node_subsystem_util::rolling_session_window::RollingSessionWindow; +use polkadot_node_subsystem::overseer; +use polkadot_node_subsystem_util::runtime::RuntimeInfo; use polkadot_primitives::{ - CandidateReceipt, DisputeStatement, IndexedVec, SessionIndex, SessionInfo, + CandidateReceipt, DisputeStatement, Hash, IndexedVec, SessionIndex, SessionInfo, ValidDisputeStatementKind, ValidatorId, ValidatorIndex, ValidatorPair, ValidatorSignature, }; use sc_keystore::LocalKeystore; @@ -50,18 +51,29 @@ pub struct CandidateEnvironment<'a> { controlled_indices: HashSet, } +#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] impl<'a> CandidateEnvironment<'a> { /// Create `CandidateEnvironment`. /// /// Return: `None` in case session is outside of session window. - pub fn new( + pub async fn new( keystore: &LocalKeystore, - session_window: &'a RollingSessionWindow, + ctx: &mut Context, + runtime_info: &'a mut RuntimeInfo, session_index: SessionIndex, - ) -> Option { - let session = session_window.session_info(session_index)?; - let controlled_indices = find_controlled_validator_indices(keystore, &session.validators); - Some(Self { session_index, session, controlled_indices }) + relay_parent: Hash, + ) -> Option> { + let session_info = match runtime_info + .get_session_info_by_index(ctx.sender(), relay_parent, session_index) + .await + { + Ok(extended_session_info) => &extended_session_info.session_info, + Err(_) => return None, + }; + + let controlled_indices = + find_controlled_validator_indices(keystore, &session_info.validators); + Some(Self { session_index, session: session_info, controlled_indices }) } /// Validators in the candidate's session. 
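Note on the `CandidateEnvironment` change above: `new` is now `async` and resolves the `SessionInfo` on demand through `RuntimeInfo`, which is why callers additionally pass an overseer context and a relay parent to anchor the runtime query, and why the returned environment's `'a` lifetime is now tied to the `runtime_info` borrow rather than to a rolling window. A minimal sketch of the new call shape, assuming `keystore`, `ctx`, `runtime_info`, `session` and `relay_parent` are in scope as they are at the call sites updated later in this diff:

```rust
// Sketch only - mirrors the call sites updated in `initialized.rs` below.
let env = match CandidateEnvironment::new(
	&keystore,
	ctx,
	&mut runtime_info,
	session,
	relay_parent,
)
.await
{
	Some(env) => env,
	// Any failure to fetch the session collapses to `None`, just as an
	// out-of-window session did with `RollingSessionWindow`.
	None => return Ok(ImportStatementsResult::InvalidImport),
};
```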
diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index e050de0cdae3..cd94770da888 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -26,7 +26,8 @@ use futures::{ use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ - disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, Timestamp, + disputes::ValidCandidateVotes, CandidateVotes, DisputeStatus, SignedDisputeStatement, + Timestamp, DISPUTE_WINDOW, }; use polkadot_node_subsystem::{ messages::{ @@ -35,17 +36,16 @@ use polkadot_node_subsystem::{ }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, }; -use polkadot_node_subsystem_util::rolling_session_window::{ - RollingSessionWindow, SessionWindowUpdate, SessionsUnavailable, -}; +use polkadot_node_subsystem_util::runtime::RuntimeInfo; use polkadot_primitives::{ BlockNumber, CandidateHash, CandidateReceipt, CompactStatement, DisputeStatement, - DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, SessionInfo, - ValidDisputeStatementKind, ValidatorId, ValidatorIndex, + DisputeStatementSet, Hash, ScrapedOnChainVotes, SessionIndex, ValidDisputeStatementKind, + ValidatorId, ValidatorIndex, }; use crate::{ - error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, + db, + error::{log_error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, import::{CandidateEnvironment, CandidateVoteState}, is_potential_spam, metrics::Metrics, @@ -55,7 +55,7 @@ use crate::{ use super::{ backend::Backend, - db, make_dispute_message, + make_dispute_message, participation::{ self, Participation, ParticipationPriority, ParticipationRequest, ParticipationStatement, WorkerMessageReceiver, }, @@ -65,23 +65,31 @@ OverlayedBackend, }; +// Initial data for `dispute-coordinator`. It is provided only at first start. +pub struct InitialData { + pub participations: Vec<(ParticipationPriority, ParticipationRequest)>, + pub votes: Vec<ScrapedOnChainVotes>, + pub leaf: ActivatedLeaf, +} + /// After the first active leaves update we transition to `Initialized` state. /// /// Before the first active leaves update we can't really do much. We cannot check incoming -/// statements for validity, we cannot query orderings, we have no valid `RollingSessionWindow`, +/// statements for validity, we cannot query orderings, we have no valid `SessionInfo`, /// ... -pub struct Initialized { +pub(crate) struct Initialized { keystore: Arc<LocalKeystore>, - rolling_session_window: RollingSessionWindow, - highest_session: SessionIndex, + runtime_info: RuntimeInfo, + /// This is the highest `SessionIndex` seen via `ActiveLeavesUpdate`. It doesn't matter if it was + /// cached successfully or not. It is used to detect ancient disputes. + highest_session_seen: SessionIndex, + /// Will be set to `true` if an error occurred during the last caching attempt. + gaps_in_cache: bool, spam_slots: SpamSlots, participation: Participation, scraper: ChainScraper, participation_receiver: WorkerMessageReceiver, metrics: Metrics, - // This tracks only rolling session window failures. - // It can be a `Vec` if the need to track more arises. - error: Option<SessionsUnavailable>, } #[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] @@ -89,55 +97,46 @@ impl Initialized { /// Make initialized subsystem, ready to `run`.
pub fn new( subsystem: DisputeCoordinatorSubsystem, - rolling_session_window: RollingSessionWindow, + runtime_info: RuntimeInfo, spam_slots: SpamSlots, scraper: ChainScraper, + highest_session_seen: SessionIndex, + gaps_in_cache: bool, ) -> Self { let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem; let (participation_sender, participation_receiver) = mpsc::channel(1); let participation = Participation::new(participation_sender, metrics.clone()); - let highest_session = rolling_session_window.latest_session(); Self { keystore, - rolling_session_window, - highest_session, + runtime_info, + highest_session_seen, + gaps_in_cache, spam_slots, scraper, participation, participation_receiver, metrics, - error: None, } } /// Run the initialized subsystem. /// - /// Optionally supply initial participations and a first leaf to process. + /// `initial_data` is optional. It is passed on first start and is `None` on subsystem restarts. pub async fn run( mut self, mut ctx: Context, mut backend: B, - mut participations: Vec<(ParticipationPriority, ParticipationRequest)>, - mut votes: Vec, - mut first_leaf: Option, + mut initial_data: Option, clock: Box, ) -> FatalResult<()> where B: Backend, { loop { - let res = self - .run_until_error( - &mut ctx, - &mut backend, - &mut participations, - &mut votes, - &mut first_leaf, - &*clock, - ) - .await; + let res = + self.run_until_error(&mut ctx, &mut backend, &mut initial_data, &*clock).await; if let Ok(()) = res { gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); return Ok(()) @@ -155,23 +154,29 @@ impl Initialized { &mut self, ctx: &mut Context, backend: &mut B, - participations: &mut Vec<(ParticipationPriority, ParticipationRequest)>, - on_chain_votes: &mut Vec, - first_leaf: &mut Option, + initial_data: &mut Option, clock: &dyn Clock, ) -> Result<()> where B: Backend, { - for (priority, request) in participations.drain(..) { - self.participation.queue_participation(ctx, priority, request).await?; - } - + if let Some(InitialData { participations, votes: on_chain_votes, leaf: first_leaf }) = + initial_data.take() { + for (priority, request) in participations { + self.participation.queue_participation(ctx, priority, request).await?; + } + let mut overlay_db = OverlayedBackend::new(backend); - for votes in on_chain_votes.drain(..) { + for votes in on_chain_votes { let _ = self - .process_on_chain_votes(ctx, &mut overlay_db, votes, clock.now()) + .process_on_chain_votes( + ctx, + &mut overlay_db, + votes, + clock.now(), + first_leaf.hash, + ) .await .map_err(|error| { gum::warn!( @@ -185,9 +190,7 @@ impl Initialized { let ops = overlay_db.into_write_ops(); backend.write(ops)?; } - } - if let Some(first_leaf) = first_leaf.take() { // Also provide first leaf to participation for good measure. 
self.participation .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf)) @@ -283,37 +286,56 @@ impl Initialized { self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { - match self - .rolling_session_window - .cache_session_info_for_head(ctx.sender(), new_leaf.hash) - .await - { - Err(e) => { - gum::warn!( + let session_idx = + self.runtime_info.get_session_index_for_child(ctx.sender(), new_leaf.hash).await; + + match session_idx { + Ok(session_idx) + if self.gaps_in_cache || session_idx > self.highest_session_seen => + { + // If error has occurred during last session caching - fetch the whole window + // Otherwise - cache only the new sessions + let lower_bound = if self.gaps_in_cache { + session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1) + } else { + self.highest_session_seen + 1 + }; + + // There is a new session. Perform a dummy fetch to cache it. + for idx in lower_bound..=session_idx { + if let Err(err) = self + .runtime_info + .get_session_info_by_index(ctx.sender(), new_leaf.hash, idx) + .await + { + gum::debug!( + target: LOG_TARGET, + session_idx, + leaf_hash = ?new_leaf.hash, + ?err, + "Error caching SessionInfo on ActiveLeaves update" + ); + self.gaps_in_cache = true; + } + } + + self.highest_session_seen = session_idx; + + db::v1::note_earliest_session( + overlay_db, + session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1), + )?; + self.spam_slots.prune_old(session_idx.saturating_sub(DISPUTE_WINDOW.get() - 1)); + }, + Ok(_) => { /* no new session => nothing to cache */ }, + Err(err) => { + gum::debug!( target: LOG_TARGET, - err = ?e, - "Failed to update session cache for disputes", + ?err, + "Failed to update session cache for disputes - can't fetch session index", ); - self.error = Some(e); - }, - Ok(SessionWindowUpdate::Advanced { - new_window_end: window_end, - new_window_start, - .. - }) => { - self.error = None; - let session = window_end; - if self.highest_session < session { - gum::trace!(target: LOG_TARGET, session, "Observed new session. Pruning"); - - self.highest_session = session; - - db::v1::note_earliest_session(overlay_db, new_window_start)?; - self.spam_slots.prune_old(new_window_start); - } }, - Ok(SessionWindowUpdate::Unchanged) => {}, - }; + } gum::trace!( target: LOG_TARGET, @@ -325,15 +347,16 @@ impl Initialized { // The `runtime-api` subsystem has an internal queue which serializes the execution, // so there is no point in running these in parallel for votes in scraped_updates.on_chain_votes { - let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err( - |error| { + let _ = self + .process_on_chain_votes(ctx, overlay_db, votes, now, new_leaf.hash) + .await + .map_err(|error| { gum::warn!( target: LOG_TARGET, ?error, "Skipping scraping block due to error", ); - }, - ); + }); } } @@ -349,6 +372,7 @@ impl Initialized { overlay_db: &mut OverlayedBackend<'_, impl Backend>, votes: ScrapedOnChainVotes, now: u64, + block_hash: Hash, ) -> Result<()> { let ScrapedOnChainVotes { session, backing_validators_per_candidate, disputes } = votes; @@ -360,23 +384,24 @@ impl Initialized { // the new active leaf as if we received them via gossip. for (candidate_receipt, backers) in backing_validators_per_candidate { // Obtain the session info, for sake of `ValidatorId`s - // either from the rolling session window. - // Must be called _after_ `fn cache_session_info_for_head` - // which guarantees that the session info is available - // for the current session. 
- let session_info: &SessionInfo = - if let Some(session_info) = self.rolling_session_window.session_info(session) { - session_info - } else { + let relay_parent = candidate_receipt.descriptor.relay_parent; + let session_info = match self + .runtime_info + .get_session_info_by_index(ctx.sender(), relay_parent, session) + .await + { + Ok(extended_session_info) => &extended_session_info.session_info, + Err(err) => { gum::warn!( target: LOG_TARGET, ?session, - "Could not retrieve session info from rolling session window", + ?err, + "Could not retrieve session info from RuntimeInfo", ); return Ok(()) - }; + }, + }; - let relay_parent = candidate_receipt.descriptor.relay_parent; let candidate_hash = candidate_receipt.hash(); gum::trace!( target: LOG_TARGET, @@ -470,18 +495,23 @@ impl Initialized { ?session, "Importing dispute votes from chain for candidate" ); - let session_info = - if let Some(session_info) = self.rolling_session_window.session_info(session) { - session_info - } else { + let session_info = match self + .runtime_info + .get_session_info_by_index(ctx.sender(), block_hash, session) + .await + { + Ok(extended_session_info) => &extended_session_info.session_info, + Err(err) => { gum::warn!( target: LOG_TARGET, ?candidate_hash, ?session, - "Could not retrieve session info from rolling session window for recently concluded dispute" + ?err, + "Could not retrieve session info for recently concluded dispute" ); continue - }; + }, + }; let statements = statements .into_iter() @@ -593,9 +623,6 @@ impl Initialized { } }, DisputeCoordinatorMessage::RecentDisputes(tx) => { - // Return error if session information is missing. - self.ensure_available_session_info()?; - gum::trace!(target: LOG_TARGET, "Loading recent disputes from db"); let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { disputes @@ -609,11 +636,7 @@ impl Initialized { ); }, DisputeCoordinatorMessage::ActiveDisputes(tx) => { - // Return error if session information is missing. - self.ensure_available_session_info()?; - gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::ActiveDisputes"); - let recent_disputes = if let Some(disputes) = overlay_db.load_recent_disputes()? { disputes } else { @@ -629,11 +652,7 @@ impl Initialized { ); }, DisputeCoordinatorMessage::QueryCandidateVotes(query, tx) => { - // Return error if session information is missing. - self.ensure_available_session_info()?; - gum::trace!(target: LOG_TARGET, "DisputeCoordinatorMessage::QueryCandidateVotes"); - let mut query_output = Vec::new(); for (session_index, candidate_hash) in query { if let Some(v) = @@ -673,8 +692,6 @@ impl Initialized { block_descriptions, tx, } => { - // Return error if session information is missing. - self.ensure_available_session_info()?; gum::trace!( target: LOG_TARGET, "DisputeCoordinatorMessage::DetermineUndisputedChain" @@ -694,15 +711,6 @@ impl Initialized { Ok(Box::new(|| Ok(()))) } - // Helper function for checking subsystem errors in message processing. - fn ensure_available_session_info(&self) -> Result<()> { - if let Some(subsystem_error) = self.error.clone() { - return Err(Error::RollingSessionWindow(subsystem_error)) - } - - Ok(()) - } - // We use fatal result rather than result here. Reason being, We for example increase // spam slots in this function. 
If then the import fails for some non fatal and // unrelated reason, we should likely actually decrement previously incremented spam @@ -717,16 +725,39 @@ impl Initialized { now: Timestamp, ) -> FatalResult { gum::trace!(target: LOG_TARGET, ?statements, "In handle import statements"); - if !self.rolling_session_window.contains(session) { + if self.session_is_ancient(session) { // It is not valid to participate in an ancient dispute (spam?) or too new. return Ok(ImportStatementsResult::InvalidImport) } + let candidate_hash = candidate_receipt.hash(); + let votes_in_db = overlay_db.load_candidate_votes(session, &candidate_hash)?; + let relay_parent = match &candidate_receipt { + MaybeCandidateReceipt::Provides(candidate_receipt) => + candidate_receipt.descriptor().relay_parent, + MaybeCandidateReceipt::AssumeBackingVotePresent(candidate_hash) => match &votes_in_db { + Some(votes) => votes.candidate_receipt.descriptor().relay_parent, + None => { + gum::warn!( + target: LOG_TARGET, + session, + ?candidate_hash, + "Cannot obtain relay parent without `CandidateReceipt` available!" + ); + return Ok(ImportStatementsResult::InvalidImport) + }, + }, + }; + let env = match CandidateEnvironment::new( &self.keystore, - &self.rolling_session_window, + ctx, + &mut self.runtime_info, session, - ) { + relay_parent, + ) + .await + { None => { gum::warn!( target: LOG_TARGET, @@ -739,8 +770,6 @@ impl Initialized { Some(env) => env, }; - let candidate_hash = candidate_receipt.hash(); - gum::trace!( target: LOG_TARGET, ?candidate_hash, @@ -757,10 +786,7 @@ impl Initialized { // There is one exception: A sufficiently sophisticated attacker could prevent // us from seeing the backing votes by withholding arbitrary blocks, and hence we do // not have a `CandidateReceipt` available. - let old_state = match overlay_db - .load_candidate_votes(session, &candidate_hash)? - .map(CandidateVotes::from) - { + let old_state = match votes_in_db.map(CandidateVotes::from) { Some(votes) => CandidateVoteState::new(votes, &env, now), None => if let MaybeCandidateReceipt::Provides(candidate_receipt) = candidate_receipt { @@ -1120,12 +1146,17 @@ impl Initialized { ?now, "Issuing local statement for candidate!" ); + // Load environment: let env = match CandidateEnvironment::new( &self.keystore, - &self.rolling_session_window, + ctx, + &mut self.runtime_info, session, - ) { + candidate_receipt.descriptor.relay_parent, + ) + .await + { None => { gum::warn!( target: LOG_TARGET, @@ -1175,10 +1206,10 @@ impl Initialized { statements.push((signed_dispute_statement, *index)); }, Ok(None) => {}, - Err(e) => { + Err(err) => { gum::error!( target: LOG_TARGET, - err = ?e, + ?err, "Encountered keystore error while signing dispute statement", ); }, @@ -1233,6 +1264,10 @@ impl Initialized { Ok(()) } + + fn session_is_ancient(&self, session_idx: SessionIndex) -> bool { + return session_idx < self.highest_session_seen.saturating_sub(DISPUTE_WINDOW.get() - 1) + } } /// Messages to be handled in this subsystem. diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 6ad7b13517a8..7379b392f312 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -24,7 +24,7 @@ //! validation results as well as a sink for votes received by other subsystems. When importing a dispute vote from //! another node, this will trigger dispute participation to recover and validate the block. 
-use std::sync::Arc; +use std::{num::NonZeroUsize, sync::Arc}; use futures::FutureExt; @@ -33,6 +33,7 @@ use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement, + DISPUTE_WINDOW, }; use polkadot_node_subsystem::{ messages::DisputeDistributionMessage, overseer, ActivatedLeaf, FromOrchestra, OverseerSignal, @@ -40,12 +41,14 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_util::{ database::Database, - rolling_session_window::{DatabaseParams, RollingSessionWindow}, + runtime::{Config as RuntimeInfoConfig, RuntimeInfo}, +}; +use polkadot_primitives::{ + DisputeStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidatorIndex, }; -use polkadot_primitives::{DisputeStatement, ScrapedOnChainVotes, SessionInfo, ValidatorIndex}; use crate::{ - error::{FatalResult, JfyiError, Result}, + error::{FatalResult, Result}, metrics::Metrics, status::{get_active_with_status, SystemClock}, }; @@ -65,7 +68,7 @@ pub(crate) mod error; /// Subsystem after receiving the first active leaf. mod initialized; -use initialized::Initialized; +use initialized::{InitialData, Initialized}; /// Provider of data scraped from chain. /// @@ -187,7 +190,7 @@ impl DisputeCoordinatorSubsystem { }; initialized - .run(ctx, backend, participations, votes, Some(first_leaf), clock) + .run(ctx, backend, Some(InitialData { participations, votes, leaf: first_leaf }), clock) .await } @@ -210,31 +213,32 @@ impl DisputeCoordinatorSubsystem { B: Backend + 'static, { loop { - let db_params = - DatabaseParams { db: self.store.clone(), db_column: self.config.col_session_data }; - - let (first_leaf, rolling_session_window) = - match get_rolling_session_window(ctx, db_params).await { - Ok(Some(update)) => update, - Ok(None) => { - gum::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); - return Ok(None) - }, - Err(e) => { - e.split()?.log(); - continue - }, - }; + let first_leaf = match wait_for_first_leaf(ctx).await { + Ok(Some(activated_leaf)) => activated_leaf, + Ok(None) => continue, + Err(e) => { + e.split()?.log(); + continue + }, + }; + // `RuntimeInfo` cache should match `DISPUTE_WINDOW` so that we can + // keep all sessions for a dispute window + let mut runtime_info = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: NonZeroUsize::new(DISPUTE_WINDOW.get() as usize) + .expect("DISPUTE_WINDOW can't be 0; qed."), + }); let mut overlay_db = OverlayedBackend::new(&mut backend); - let (participations, votes, spam_slots, ordering_provider) = match self - .handle_startup( - ctx, - first_leaf.clone(), - &rolling_session_window, - &mut overlay_db, - clock, - ) + let ( + participations, + votes, + spam_slots, + ordering_provider, + highest_session_seen, + gaps_in_cache, + ) = match self + .handle_startup(ctx, first_leaf.clone(), &mut runtime_info, &mut overlay_db, clock) .await { Ok(v) => v, @@ -252,7 +256,14 @@ impl DisputeCoordinatorSubsystem { participations, votes, first_leaf, - Initialized::new(self, rolling_session_window, spam_slots, ordering_provider), + Initialized::new( + self, + runtime_info, + spam_slots, + ordering_provider, + highest_session_seen, + gaps_in_cache, + ), backend, ))) } @@ -267,7 +278,7 @@ impl DisputeCoordinatorSubsystem { &self, ctx: &mut Context, initial_head: ActivatedLeaf, - rolling_session_window: &RollingSessionWindow, + runtime_info: &mut RuntimeInfo, overlay_db: &mut OverlayedBackend<'_, impl Backend>, clock: &dyn Clock, ) -> Result<( @@ 
-275,10 +286,9 @@ impl DisputeCoordinatorSubsystem { Vec, SpamSlots, ChainScraper, + SessionIndex, + bool, )> { - // Prune obsolete disputes: - db::v1::note_earliest_session(overlay_db, rolling_session_window.earliest_session())?; - let now = clock.now(); let active_disputes = match overlay_db.load_recent_disputes() { @@ -292,23 +302,63 @@ impl DisputeCoordinatorSubsystem { }, }; + // We assume the highest session is the passed leaf. If we can't get the session index + // we can't initialize the subsystem so we'll wait for a new leaf + let highest_session = runtime_info + .get_session_index_for_child(ctx.sender(), initial_head.hash) + .await?; + + let mut gap_in_cache = false; + // Cache the sessions. A failure to fetch a session here is not that critical so we + // won't abort the initialization + for idx in highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1)..=highest_session { + if let Err(e) = runtime_info + .get_session_info_by_index(ctx.sender(), initial_head.hash, idx) + .await + { + gum::debug!( + target: LOG_TARGET, + leaf_hash = ?initial_head.hash, + session_idx = idx, + err = ?e, + "Can't cache SessionInfo during subsystem initialization. Skipping session." + ); + gap_in_cache = true; + continue + }; + } + + // Prune obsolete disputes: + db::v1::note_earliest_session( + overlay_db, + highest_session.saturating_sub(DISPUTE_WINDOW.get() - 1), + )?; + let mut participation_requests = Vec::new(); let mut spam_disputes: UnconfirmedDisputes = UnconfirmedDisputes::new(); + let leaf_hash = initial_head.hash; let (scraper, votes) = ChainScraper::new(ctx.sender(), initial_head).await?; for ((session, ref candidate_hash), _) in active_disputes { - let env = - match CandidateEnvironment::new(&self.keystore, &rolling_session_window, session) { - None => { - gum::warn!( - target: LOG_TARGET, - session, - "We are lacking a `SessionInfo` for handling db votes on startup." - ); + let env = match CandidateEnvironment::new( + &self.keystore, + ctx, + runtime_info, + highest_session, + leaf_hash, + ) + .await + { + None => { + gum::warn!( + target: LOG_TARGET, + session, + "We are lacking a `SessionInfo` for handling db votes on startup." + ); - continue - }, - Some(env) => env, - }; + continue + }, + Some(env) => env, + }; let votes: CandidateVotes = match overlay_db.load_candidate_votes(session, candidate_hash) { @@ -370,26 +420,14 @@ impl DisputeCoordinatorSubsystem { } } - Ok((participation_requests, votes, SpamSlots::recover_from_state(spam_disputes), scraper)) - } -} - -/// Wait for `ActiveLeavesUpdate` on startup, returns `None` if `Conclude` signal came first. -#[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] -async fn get_rolling_session_window( - ctx: &mut Context, - db_params: DatabaseParams, -) -> Result> { - if let Some(leaf) = { wait_for_first_leaf(ctx) }.await? 
{ - let sender = ctx.sender().clone(); - Ok(Some(( - leaf.clone(), - RollingSessionWindow::new(sender, leaf.hash, db_params) - .await - .map_err(JfyiError::RollingSessionWindow)?, - ))) - } else { - Ok(None) + Ok(( + participation_requests, + votes, + SpamSlots::recover_from_state(spam_disputes), + scraper, + highest_session, + gap_in_cache, + )) } } diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 1792075e1978..b685660d62b7 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -281,14 +281,8 @@ impl TestState { ))) .await; - self.handle_sync_queries( - virtual_overseer, - block_hash, - block_number, - session, - candidate_events, - ) - .await; + self.handle_sync_queries(virtual_overseer, block_hash, session, candidate_events) + .await; } /// Returns any sent `DisputeMessage`s. @@ -296,7 +290,6 @@ impl TestState { &mut self, virtual_overseer: &mut VirtualOverseer, block_hash: Hash, - block_number: BlockNumber, session: SessionIndex, candidate_events: Vec, ) -> Vec { @@ -334,61 +327,24 @@ impl TestState { assert_eq!(h, block_hash); let _ = tx.send(Ok(session)); - // Queries for fetching earliest unfinalized block session. See `RollingSessionWindow`. + // Queries for session caching - see `handle_startup` if self.known_session.is_none() { - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( - s_tx, - )) => { - let _ = s_tx.send(Ok(block_number)); - } - ); - - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( - number, - s_tx, - )) => { - assert_eq!(block_number, number); - let _ = s_tx.send(Ok(Some(block_hash))); - } - ); - - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) => { - assert_eq!(h, block_hash); - let _ = s_tx.send(Ok(session)); - } - ); - } - - // No queries, if subsystem knows about this session already. - if self.known_session == Some(session) { - continue - } - self.known_session = Some(session); - - loop { - // answer session info queries until the current session is reached. - assert_matches!( - overseer_recv(virtual_overseer).await, - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - h, - RuntimeApiRequest::SessionInfo(session_index, tx), + for i in 0..=session { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionInfo(session_index, tx), )) => { - assert_eq!(h, block_hash); - - let _ = tx.send(Ok(Some(self.session_info()))); - if session_index == session { break } + assert_eq!(h, block_hash); + assert_eq!(session_index, i); + let _ = tx.send(Ok(Some(self.session_info()))); + } + ); } - ); } + + self.known_session = Some(session); }, AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(tx)) => { assert!( @@ -481,9 +437,8 @@ impl TestState { let events = if n == 1 { std::mem::take(&mut initial_events) } else { Vec::new() }; - let mut new_messages = self - .handle_sync_queries(virtual_overseer, *leaf, n as BlockNumber, session, events) - .await; + let mut new_messages = + self.handle_sync_queries(virtual_overseer, *leaf, session, events).await; messages.append(&mut new_messages); } messages
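Taken together, the `initialized.rs` and `lib.rs` changes replace the persisted `RollingSessionWindow` with an in-memory `RuntimeInfo` cache and the following policy: on startup, pre-fetch the last `DISPUTE_WINDOW` sessions; on each new leaf, cache any newly observed sessions (or re-fetch the whole window if a previous attempt left gaps); track the highest session seen; and reject vote imports for sessions older than that window. Below is a standalone model of that policy, not part of the patch: plain integer types stand in for `SessionIndex`/`SessionInfo`, a closure stands in for `RuntimeInfo::get_session_info_by_index`, and `DISPUTE_WINDOW = 6` is an assumed value for illustration.

```rust
/// Assumed window size for illustration; the subsystem imports the real
/// `DISPUTE_WINDOW` constant from `polkadot_node_primitives`.
const DISPUTE_WINDOW: u32 = 6;

struct SessionTracker {
    highest_session_seen: u32,
    gaps_in_cache: bool,
}

impl SessionTracker {
    /// First session of the dispute window ending at `session_idx`.
    fn window_start(session_idx: u32) -> u32 {
        session_idx.saturating_sub(DISPUTE_WINDOW - 1)
    }

    /// Mirrors the `ActiveLeavesUpdate` handling: cache new sessions, or the
    /// whole window if a previous attempt left gaps.
    fn on_new_session_index(
        &mut self,
        session_idx: u32,
        mut fetch: impl FnMut(u32) -> Result<(), ()>,
    ) {
        if self.gaps_in_cache || session_idx > self.highest_session_seen {
            let lower_bound = if self.gaps_in_cache {
                Self::window_start(session_idx)
            } else {
                self.highest_session_seen + 1
            };
            for idx in lower_bound..=session_idx {
                if fetch(idx).is_err() {
                    // A failed fetch is only logged; the gap is remembered so the
                    // whole window is retried on the next leaf.
                    self.gaps_in_cache = true;
                }
            }
            self.highest_session_seen = session_idx;
            // At this point the subsystem also calls `note_earliest_session` and
            // `SpamSlots::prune_old` with `window_start(session_idx)`.
        }
    }

    /// Equivalent of `session_is_ancient` in `initialized.rs`; such imports are
    /// answered with `ImportStatementsResult::InvalidImport`.
    fn session_is_ancient(&self, session_idx: u32) -> bool {
        session_idx < Self::window_start(self.highest_session_seen)
    }
}

fn main() {
    let mut tracker = SessionTracker { highest_session_seen: 10, gaps_in_cache: false };
    // A leaf in session 12 only triggers fetches for sessions 11 and 12.
    tracker.on_new_session_index(12, |idx| {
        println!("caching session {idx}");
        Ok(())
    });
    assert!(tracker.session_is_ancient(6)); // window is [7, 12]
    assert!(!tracker.session_is_ancient(7));
}
```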