diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index 7e31f5000d..4d3f67ea06 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -23,8 +23,7 @@ //! several reasons, either a malicious collator that managed to include its own PoV and doesn't want //! to share it with the rest of the network or maybe a collator went down before it could distribute //! the block in the network. When something like this happens we can use the PoV recovery algorithm -//! implemented in this crate to recover a PoV and to propagate it with the rest of the network. This -//! protocol is only executed by the collators, to not overwhelm the relay chain validators. +//! implemented in this crate to recover a PoV and to propagate it with the rest of the network. //! //! It works in the following way: //! @@ -83,6 +82,26 @@ struct PendingCandidate { block_number: NumberFor, } +/// The delay between observing an unknown block and recovering this block. +#[derive(Clone, Copy)] +pub enum RecoveryDelay { + /// Start recovering the block in maximum of the given delay. + WithMax { max: Duration }, + /// Start recovering the block after at least `min` delay and in maximum `max` delay. + WithMinAndMax { min: Duration, max: Duration }, +} + +impl RecoveryDelay { + /// Return as [`Delay`]. + fn as_delay(self) -> Delay { + match self { + Self::WithMax { max } => Delay::new(max.mul_f64(thread_rng().gen())), + Self::WithMinAndMax { min, max } => + Delay::new(min + max.saturating_sub(min).mul_f64(thread_rng().gen())), + } + } +} + /// Encapsulates the logic of the pov recovery. pub struct PoVRecovery { /// All the pending candidates that we are waiting for to be imported or that need to be @@ -98,7 +117,7 @@ pub struct PoVRecovery { /// /// Uses parent -> blocks mapping. waiting_for_parent: HashMap>, - relay_chain_slot_duration: Duration, + recovery_delay: RecoveryDelay, parachain_client: Arc, parachain_import_queue: IQ, relay_chain_interface: RC, @@ -114,7 +133,7 @@ where /// Create a new instance. pub fn new( overseer_handle: OverseerHandle, - relay_chain_slot_duration: Duration, + recovery_delay: RecoveryDelay, parachain_client: Arc, parachain_import_queue: IQ, relay_chain_interface: RCInterface, @@ -124,7 +143,7 @@ where pending_candidates: HashMap::new(), next_candidate_to_recover: Default::default(), active_candidate_recovery: ActiveCandidateRecovery::new(overseer_handle), - relay_chain_slot_duration, + recovery_delay, waiting_for_parent: HashMap::new(), parachain_client, parachain_import_queue, @@ -186,9 +205,8 @@ where return } - // Wait some random time, with the maximum being the slot duration of the relay chain - // before we start to recover the candidate. - let delay = Delay::new(self.relay_chain_slot_duration.mul_f64(thread_rng().gen())); + // Delay the recovery by some random time to not spam the relay chain. + let delay = self.recovery_delay.as_delay(); self.next_candidate_to_recover.push( async move { delay.await; diff --git a/client/pov-recovery/tests/pov_recovery.rs b/client/pov-recovery/tests/pov_recovery.rs index c0240a48b9..0b536bfd71 100644 --- a/client/pov-recovery/tests/pov_recovery.rs +++ b/client/pov-recovery/tests/pov_recovery.rs @@ -73,16 +73,31 @@ async fn pov_recovery() { .build() .await; - // Run dave as parachain full node + // Run dave as parachain collator and eve as parachain full node // - // It will need to recover the pov blocks through availability recovery. - let dave = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Dave) + // They will need to recover the pov blocks through availability recovery. + let dave = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Dave) .enable_collator() .use_null_consensus() .connect_to_parachain_node(&charlie) .connect_to_relay_chain_nodes(vec![&alice, &bob]) + .wrap_announce_block(|_| { + // Never announce any block + Arc::new(|_, _| {}) + }) .build() .await; - dave.wait_for_blocks(7).await; + let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Eve) + .use_null_consensus() + .connect_to_parachain_node(&charlie) + .connect_to_relay_chain_nodes(vec![&alice, &bob]) + .wrap_announce_block(|_| { + // Never announce any block + Arc::new(|_, _| {}) + }) + .build() + .await; + + futures::future::join(dave.wait_for_blocks(7), eve.wait_for_blocks(7)).await; } diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 925c957c6f..5b050e75aa 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -54,7 +54,7 @@ pub struct StartCollatorParams<'a, Block: BlockT, BS, Client, RCInterface, Spawn pub parachain_consensus: Box>, pub import_queue: IQ, pub collator_key: CollatorPair, - pub slot_duration: Duration, + pub relay_chain_slot_duration: Duration, } /// Start a collator node for a parachain. @@ -74,7 +74,7 @@ pub async fn start_collator<'a, Block, BS, Client, Backend, RCInterface, Spawner parachain_consensus, import_queue, collator_key, - slot_duration, + relay_chain_slot_duration, }: StartCollatorParams<'a, Block, BS, Client, RCInterface, Spawner, IQ>, ) -> sc_service::error::Result<()> where @@ -111,7 +111,9 @@ where relay_chain_interface .overseer_handle() .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?, - slot_duration, + // We want that collators wait at maximum the relay chain slot duration before starting + // to recover blocks. + cumulus_client_pov_recovery::RecoveryDelay::WithMax { max: relay_chain_slot_duration }, client.clone(), import_queue, relay_chain_interface.clone(), @@ -140,26 +142,30 @@ where } /// Parameters given to [`start_full_node`]. -pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface> { +pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface, IQ> { pub para_id: ParaId, pub client: Arc, pub relay_chain_interface: RCInterface, pub task_manager: &'a mut TaskManager, pub announce_block: Arc>) + Send + Sync>, + pub relay_chain_slot_duration: Duration, + pub import_queue: IQ, } /// Start a full node for a parachain. /// /// A full node will only sync the given parachain and will follow the /// tip of the chain. -pub fn start_full_node( +pub fn start_full_node( StartFullNodeParams { client, announce_block, task_manager, relay_chain_interface, para_id, - }: StartFullNodeParams, + relay_chain_slot_duration, + import_queue, + }: StartFullNodeParams, ) -> sc_service::error::Result<()> where Block: BlockT, @@ -173,6 +179,7 @@ where for<'a> &'a Client: BlockImport, Backend: BackendT + 'static, RCInterface: RelayChainInterface + Clone + 'static, + IQ: ImportQueue + 'static, { let consensus = cumulus_client_consensus_common::run_parachain_consensus( para_id, @@ -185,10 +192,33 @@ where .spawn_essential_handle() .spawn("cumulus-consensus", None, consensus); + let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new( + relay_chain_interface + .overseer_handle() + .ok_or_else(|| "Polkadot full node did not provide an `OverseerHandle`!")?, + // Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and + // in maximum 5 minutes before starting to recover blocks. Collators should already start + // the recovery way before full nodes try to recover a certain block and then share the + // block with the network using "the normal way". Full nodes are just the "last resort" + // for block recovery. + cumulus_client_pov_recovery::RecoveryDelay::WithMinAndMax { + min: relay_chain_slot_duration * 25, + max: relay_chain_slot_duration * 50, + }, + client.clone(), + import_queue, + relay_chain_interface.clone(), + para_id, + ); + + task_manager + .spawn_essential_handle() + .spawn("cumulus-pov-recovery", None, pov_recovery.run()); + Ok(()) } -/// Prepare the parachain's node condifugration +/// Prepare the parachain's node configuration /// /// This function will disable the default announcement of Substrate for the parachain in favor /// of the one of Cumulus. diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index 3d87547ded..d0c0826e32 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -301,6 +301,8 @@ where Arc::new(move |hash, data| network.announce_block(hash, data)) }; + let relay_chain_slot_duration = Duration::from_secs(6); + if validator { let parachain_consensus = build_consensus( client.clone(), @@ -327,7 +329,7 @@ where parachain_consensus, import_queue, collator_key, - slot_duration: Duration::from_secs(6), + relay_chain_slot_duration, }; start_collator(params).await?; @@ -338,6 +340,8 @@ where task_manager: &mut task_manager, para_id: id, relay_chain_interface, + relay_chain_slot_duration, + import_queue, }; start_full_node(params)?; diff --git a/polkadot-parachains/src/service.rs b/polkadot-parachains/src/service.rs index c8001eef54..9f81b9af0d 100644 --- a/polkadot-parachains/src/service.rs +++ b/polkadot-parachains/src/service.rs @@ -389,6 +389,8 @@ where Arc::new(move |hash, data| network.announce_block(hash, data)) }; + let relay_chain_slot_duration = Duration::from_secs(6); + if validator { let parachain_consensus = build_consensus( client.clone(), @@ -415,7 +417,7 @@ where parachain_consensus, import_queue, collator_key, - slot_duration: Duration::from_secs(6), + relay_chain_slot_duration, }; start_collator(params).await?; @@ -426,6 +428,8 @@ where task_manager: &mut task_manager, para_id: id, relay_chain_interface, + relay_chain_slot_duration, + import_queue, }; start_full_node(params)?; @@ -576,6 +580,8 @@ where Arc::new(move |hash, data| network.announce_block(hash, data)) }; + let relay_chain_slot_duration = Duration::from_secs(6); + if validator { let parachain_consensus = build_consensus( client.clone(), @@ -602,7 +608,7 @@ where parachain_consensus, import_queue, collator_key, - slot_duration: Duration::from_secs(6), + relay_chain_slot_duration, }; start_collator(params).await?; @@ -613,6 +619,8 @@ where task_manager: &mut task_manager, para_id: id, relay_chain_interface, + relay_chain_slot_duration, + import_queue, }; start_full_node(params)?; diff --git a/test/service/Cargo.toml b/test/service/Cargo.toml index 0253713884..a6fb18f16c 100644 --- a/test/service/Cargo.toml +++ b/test/service/Cargo.toml @@ -23,7 +23,7 @@ sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "mast sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-rpc = { git = "https://github.com/paritytech/substrate", branch = "master" } -sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-service = { git = "https://github.com/paritytech/substrate", branch = "master", features = [ "wasmtime" ] } sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-arithmetic = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index 3839a107e4..319076be30 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -207,13 +207,13 @@ where if let Some(ref key) = collator_key { polkadot_service::IsCollator::Yes(key.clone()) } else { - polkadot_service::IsCollator::No + polkadot_service::IsCollator::Yes(CollatorPair::generate().0) }, None, ) .map_err(|e| match e { polkadot_service::Error::Sub(x) => x, - s => format!("{}", s).into(), + s => s.to_string().into(), })?; let client = params.client.clone(); @@ -325,7 +325,7 @@ where relay_chain_interface, collator_key, import_queue, - slot_duration: Duration::from_secs(6), + relay_chain_slot_duration: Duration::from_secs(6), }; start_collator(params).await?; @@ -336,6 +336,11 @@ where task_manager: &mut task_manager, para_id, relay_chain_interface, + import_queue, + // The slot duration is currently used internally only to configure + // the recovery delay of pov-recovery. We don't want to wait for too + // long on the full node to recover, so we reduce this time here. + relay_chain_slot_duration: Duration::from_millis(6), }; start_full_node(params)?;