diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index 0f09ef436a..a70dd877ff 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -179,6 +179,7 @@ pub fn new_full(mut config: Configuration) -> Result let warp_sync = Arc::new(sc_finality_grandpa::warp_proof::NetworkProvider::new( backend.clone(), grandpa_link.shared_authority_set().clone(), + Vec::default(), )); let (network, system_rpc_tx, network_starter) = @@ -409,6 +410,7 @@ pub fn new_light(mut config: Configuration) -> Result let warp_sync = Arc::new(sc_finality_grandpa::warp_proof::NetworkProvider::new( backend.clone(), grandpa_link.shared_authority_set().clone(), + Vec::default(), )); let (network, system_rpc_tx, network_starter) = diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index b1a3bd4722..ec5497ab47 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -244,6 +244,7 @@ pub fn new_full_base( let warp_sync = Arc::new(grandpa::warp_proof::NetworkProvider::new( backend.clone(), import_setup.1.shared_authority_set().clone(), + Vec::default(), )); let (network, system_rpc_tx, network_starter) = @@ -531,6 +532,7 @@ pub fn new_light_base( let warp_sync = Arc::new(grandpa::warp_proof::NetworkProvider::new( backend.clone(), grandpa_link.shared_authority_set().clone(), + Vec::default(), )); let (network, system_rpc_tx, network_starter) = diff --git a/substrate/client/api/src/in_mem.rs b/substrate/client/api/src/in_mem.rs index e8fce19f81..2f4327dfc4 100644 --- a/substrate/client/api/src/in_mem.rs +++ b/substrate/client/api/src/in_mem.rs @@ -369,6 +369,7 @@ impl HeaderBackend for Blockchain { None }, number_leaves: storage.leaves.count(), + block_gap: None, } } diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs index 
f10d2751cc..3c1610256f 100644 --- a/substrate/client/authority-discovery/src/worker/tests.rs +++ b/substrate/client/authority-discovery/src/worker/tests.rs @@ -73,6 +73,7 @@ impl HeaderBackend for TestApi { genesis_hash: Default::default(), number_leaves: Default::default(), finalized_state: None, + block_gap: None, } } diff --git a/substrate/client/cli/src/commands/check_block_cmd.rs b/substrate/client/cli/src/commands/check_block_cmd.rs index 07a76319dc..de0d1132ce 100644 --- a/substrate/client/cli/src/commands/check_block_cmd.rs +++ b/substrate/client/cli/src/commands/check_block_cmd.rs @@ -21,7 +21,7 @@ use crate::{ params::{BlockNumberOrHash, ImportParams, SharedParams}, CliConfiguration, }; -use sc_client_api::{BlockBackend, UsageProvider}; +use sc_client_api::{BlockBackend, HeaderBackend}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use std::{fmt::Debug, str::FromStr, sync::Arc}; use structopt::StructOpt; @@ -53,7 +53,7 @@ impl CheckBlockCmd { pub async fn run(&self, client: Arc, import_queue: IQ) -> error::Result<()> where B: BlockT + for<'de> serde::Deserialize<'de>, - C: BlockBackend + UsageProvider + Send + Sync + 'static, + C: BlockBackend + HeaderBackend + Send + Sync + 'static, IQ: sc_service::ImportQueue + 'static, B::Hash: FromStr, ::Err: Debug, diff --git a/substrate/client/cli/src/commands/import_blocks_cmd.rs b/substrate/client/cli/src/commands/import_blocks_cmd.rs index 9b211b88d5..19187f2859 100644 --- a/substrate/client/cli/src/commands/import_blocks_cmd.rs +++ b/substrate/client/cli/src/commands/import_blocks_cmd.rs @@ -21,7 +21,7 @@ use crate::{ params::{ImportParams, SharedParams}, CliConfiguration, }; -use sc_client_api::UsageProvider; +use sc_client_api::HeaderBackend; use sc_service::chain_ops::import_blocks; use sp_runtime::traits::Block as BlockT; use std::{ @@ -68,7 +68,7 @@ impl ImportBlocksCmd { /// Run the import-blocks command pub async fn run(&self, client: Arc, import_queue: IQ) -> error::Result<()> where - C: 
UsageProvider + Send + Sync + 'static, + C: HeaderBackend + Send + Sync + 'static, B: BlockT + for<'de> serde::Deserialize<'de>, IQ: sc_service::ImportQueue + 'static, { diff --git a/substrate/client/consensus/babe/src/aux_schema.rs b/substrate/client/consensus/babe/src/aux_schema.rs index b18220c3e3..d5b8a218a5 100644 --- a/substrate/client/consensus/babe/src/aux_schema.rs +++ b/substrate/client/consensus/babe/src/aux_schema.rs @@ -23,14 +23,17 @@ use log::info; use crate::{migration::EpochV0, Epoch}; use sc_client_api::backend::AuxStore; -use sc_consensus_epochs::{migration::EpochChangesForV0, EpochChangesFor, SharedEpochChanges}; +use sc_consensus_epochs::{ + migration::{EpochChangesV0For, EpochChangesV1For}, + EpochChangesFor, SharedEpochChanges, +}; use sp_blockchain::{Error as ClientError, Result as ClientResult}; use sp_consensus_babe::{BabeBlockWeight, BabeGenesisConfiguration}; use sp_runtime::traits::Block as BlockT; const BABE_EPOCH_CHANGES_VERSION: &[u8] = b"babe_epoch_changes_version"; const BABE_EPOCH_CHANGES_KEY: &[u8] = b"babe_epoch_changes"; -const BABE_EPOCH_CHANGES_CURRENT_VERSION: u32 = 2; +const BABE_EPOCH_CHANGES_CURRENT_VERSION: u32 = 3; /// The aux storage key used to store the block weight of the given block hash. pub fn block_weight_key(block_hash: H) -> Vec { @@ -60,11 +63,16 @@ pub fn load_epoch_changes( let maybe_epoch_changes = match version { None => - load_decode::<_, EpochChangesForV0>(backend, BABE_EPOCH_CHANGES_KEY)? + load_decode::<_, EpochChangesV0For>(backend, BABE_EPOCH_CHANGES_KEY)? .map(|v0| v0.migrate().map(|_, _, epoch| epoch.migrate(config))), Some(1) => - load_decode::<_, EpochChangesFor>(backend, BABE_EPOCH_CHANGES_KEY)? - .map(|v1| v1.map(|_, _, epoch| epoch.migrate(config))), + load_decode::<_, EpochChangesV1For>(backend, BABE_EPOCH_CHANGES_KEY)? + .map(|v1| v1.migrate().map(|_, _, epoch| epoch.migrate(config))), + Some(2) => { + // v2 still uses `EpochChanges` v1 format but with a different `Epoch` type. 
+ load_decode::<_, EpochChangesV1For>(backend, BABE_EPOCH_CHANGES_KEY)? + .map(|v2| v2.migrate()) + }, Some(BABE_EPOCH_CHANGES_CURRENT_VERSION) => load_decode::<_, EpochChangesFor>(backend, BABE_EPOCH_CHANGES_KEY)?, Some(other) => @@ -164,7 +172,7 @@ mod test { .insert_aux( &[( BABE_EPOCH_CHANGES_KEY, - &EpochChangesForV0::::from_raw(v0_tree).encode()[..], + &EpochChangesV0For::::from_raw(v0_tree).encode()[..], )], &[], ) @@ -202,6 +210,6 @@ mod test { client.insert_aux(values, &[]).unwrap(); }); - assert_eq!(load_decode::<_, u32>(&client, BABE_EPOCH_CHANGES_VERSION).unwrap(), Some(2)); + assert_eq!(load_decode::<_, u32>(&client, BABE_EPOCH_CHANGES_VERSION).unwrap(), Some(3)); } } diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index a0b6bde025..1fde788041 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -1578,8 +1578,12 @@ where *block.header.parent_hash(), next_epoch, ) - .map_err(|e| ConsensusError::ClientImport(format!("{:?}", e)))?; - + .map_err(|e| { + ConsensusError::ClientImport(format!( + "Error importing epoch changes: {:?}", + e + )) + })?; Ok(()) }; @@ -1667,6 +1671,9 @@ where Client: HeaderBackend + HeaderMetadata, { let info = client.info(); + if info.block_gap.is_none() { + epoch_changes.clear_gap(); + } let finalized_slot = { let finalized_header = client diff --git a/substrate/client/consensus/epochs/src/lib.rs b/substrate/client/consensus/epochs/src/lib.rs index f3cfc55bae..661cb900ae 100644 --- a/substrate/client/consensus/epochs/src/lib.rs +++ b/substrate/client/consensus/epochs/src/lib.rs @@ -78,11 +78,11 @@ where /// /// Once an epoch is created, it must have a known `start_slot` and `end_slot`, which cannot be /// changed. Consensus engine may modify any other data in the epoch, if needed. -pub trait Epoch { +pub trait Epoch: std::fmt::Debug { /// Descriptor for the next epoch. type NextEpochDescriptor; /// Type of the slot number. 
- type Slot: Ord + Copy; + type Slot: Ord + Copy + std::fmt::Debug; /// The starting slot of the epoch. fn start_slot(&self) -> Self::Slot; @@ -228,7 +228,7 @@ impl ViableEpochDescriptor { } /// Persisted epoch stored in EpochChanges. -#[derive(Clone, Encode, Decode)] +#[derive(Clone, Encode, Decode, Debug)] pub enum PersistedEpoch { /// Genesis persisted epoch data. epoch_0, epoch_1. Genesis(E, E), @@ -246,8 +246,23 @@ impl<'a, E: Epoch> From<&'a PersistedEpoch> for PersistedEpochHeader { } } +impl PersistedEpoch { + /// Map the epoch to a different type using a conversion function. + pub fn map(self, h: &Hash, n: &Number, f: &mut F) -> PersistedEpoch + where + B: Epoch, + F: FnMut(&Hash, &Number, E) -> B, + { + match self { + PersistedEpoch::Genesis(epoch_0, epoch_1) => + PersistedEpoch::Genesis(f(h, n, epoch_0), f(h, n, epoch_1)), + PersistedEpoch::Regular(epoch_n) => PersistedEpoch::Regular(f(h, n, epoch_n)), + } + } +} + /// Persisted epoch header stored in ForkTree. -#[derive(Encode, Decode, PartialEq, Eq)] +#[derive(Encode, Decode, PartialEq, Eq, Debug)] pub enum PersistedEpochHeader { /// Genesis persisted epoch header. epoch_0, epoch_1. Genesis(EpochHeader, EpochHeader), @@ -264,6 +279,25 @@ impl Clone for PersistedEpochHeader { } } +impl PersistedEpochHeader { + /// Map the epoch header to a different type. + pub fn map(self) -> PersistedEpochHeader + where + B: Epoch, + { + match self { + PersistedEpochHeader::Genesis(epoch_0, epoch_1) => PersistedEpochHeader::Genesis( + EpochHeader { start_slot: epoch_0.start_slot, end_slot: epoch_0.end_slot }, + EpochHeader { start_slot: epoch_1.start_slot, end_slot: epoch_1.end_slot }, + ), + PersistedEpochHeader::Regular(epoch_n) => PersistedEpochHeader::Regular(EpochHeader { + start_slot: epoch_n.start_slot, + end_slot: epoch_n.end_slot, + }), + } + } +} + /// A fresh, incremented epoch to import into the underlying fork-tree. /// /// Create this with `ViableEpoch::increment`. 
@@ -279,6 +313,106 @@ impl AsRef for IncrementedEpoch { } } +/// A pair of epochs for the gap block download validation. +/// Block gap is created after the warp sync is complete. Blocks +/// are imported both at the tip of the chain and at the start of the gap. +/// This holds a pair of epochs that are required to validate headers +/// at the start of the gap. Since gap download does not allow forks we don't +/// need to keep a tree of epochs. +#[derive(Clone, Encode, Decode, Debug)] +pub struct GapEpochs { + current: (Hash, Number, PersistedEpoch), + next: Option<(Hash, Number, E)>, +} + +impl GapEpochs +where + Hash: Copy + PartialEq + std::fmt::Debug, + Number: Copy + PartialEq + std::fmt::Debug, + E: Epoch, +{ + /// Check if given slot matches one of the gap epochs. + /// Returns epoch identifier if it does. + fn matches( + &self, + slot: E::Slot, + ) -> Option<(Hash, Number, EpochHeader, EpochIdentifierPosition)> { + match &self.current { + (_, _, PersistedEpoch::Genesis(epoch_0, _)) + if slot >= epoch_0.start_slot() && slot < epoch_0.end_slot() => + return Some(( + self.current.0, + self.current.1, + epoch_0.into(), + EpochIdentifierPosition::Genesis0, + )), + (_, _, PersistedEpoch::Genesis(_, epoch_1)) + if slot >= epoch_1.start_slot() && slot < epoch_1.end_slot() => + return Some(( + self.current.0, + self.current.1, + epoch_1.into(), + EpochIdentifierPosition::Genesis1, + )), + (_, _, PersistedEpoch::Regular(epoch_n)) + if slot >= epoch_n.start_slot() && slot < epoch_n.end_slot() => + return Some(( + self.current.0, + self.current.1, + epoch_n.into(), + EpochIdentifierPosition::Regular, + )), + _ => {}, + }; + match &self.next { + Some((h, n, epoch_n)) if slot >= epoch_n.start_slot() && slot < epoch_n.end_slot() => + Some((*h, *n, epoch_n.into(), EpochIdentifierPosition::Regular)), + _ => None, + } + } + + /// Returns epoch data if it matches given identifier. 
+ pub fn epoch(&self, id: &EpochIdentifier) -> Option<&E> { + match (&self.current, &self.next) { + ((h, n, e), _) if h == &id.hash && n == &id.number => match e { + PersistedEpoch::Genesis(ref epoch_0, _) + if id.position == EpochIdentifierPosition::Genesis0 => + Some(epoch_0), + PersistedEpoch::Genesis(_, ref epoch_1) + if id.position == EpochIdentifierPosition::Genesis1 => + Some(epoch_1), + PersistedEpoch::Regular(ref epoch_n) + if id.position == EpochIdentifierPosition::Regular => + Some(epoch_n), + _ => None, + }, + (_, Some((h, n, e))) + if h == &id.hash && + n == &id.number && id.position == EpochIdentifierPosition::Regular => + Some(e), + _ => None, + } + } + + /// Import a new gap epoch, potentially replacing an old epoch. + fn import(&mut self, slot: E::Slot, hash: Hash, number: Number, epoch: E) -> Result<(), E> { + match (&mut self.current, &mut self.next) { + ((_, _, PersistedEpoch::Genesis(_, epoch_1)), _) if slot == epoch_1.end_slot() => { + self.next = Some((hash, number, epoch)); + Ok(()) + }, + (_, Some((_, _, epoch_n))) if slot == epoch_n.end_slot() => { + let (cur_h, cur_n, cur_epoch) = + self.next.take().expect("Already matched as `Some`"); + self.current = (cur_h, cur_n, PersistedEpoch::Regular(cur_epoch)); + self.next = Some((hash, number, epoch)); + Ok(()) + }, + _ => Err(epoch), + } + } +} + /// Tree of all epoch changes across all *seen* forks. Data stored in tree is /// the hash and block number of the block signaling the epoch change, and the /// epoch that was signalled at that block. @@ -294,10 +428,14 @@ impl AsRef for IncrementedEpoch { /// same DAG entry, pinned to a specific block #1. /// /// Further epochs (epoch_2, ..., epoch_n) each get their own entry. -#[derive(Clone, Encode, Decode)] +/// +/// Also maintains a pair of epochs for the start of the gap, +/// as long as there's an active gap download after a warp sync. 
+#[derive(Clone, Encode, Decode, Debug)] pub struct EpochChanges { inner: ForkTree>, epochs: BTreeMap<(Hash, Number), PersistedEpoch>, + gap: Option>, } // create a fake header hash which hasn't been included in the chain. @@ -315,14 +453,14 @@ where Number: Ord, { fn default() -> Self { - EpochChanges { inner: ForkTree::new(), epochs: BTreeMap::new() } + EpochChanges { inner: ForkTree::new(), epochs: BTreeMap::new(), gap: None } } } impl EpochChanges where - Hash: PartialEq + Ord + AsRef<[u8]> + AsMut<[u8]> + Copy, - Number: Ord + One + Zero + Add + Sub + Copy, + Hash: PartialEq + Ord + AsRef<[u8]> + AsMut<[u8]> + Copy + std::fmt::Debug, + Number: Ord + One + Zero + Add + Sub + Copy + std::fmt::Debug, { /// Create a new epoch change. pub fn new() -> Self { @@ -335,6 +473,11 @@ where self.inner.rebalance() } + /// Clear gap epochs if any. + pub fn clear_gap(&mut self) { + self.gap = None; + } + /// Map the epoch changes from one storing data to a different one. pub fn map(self, mut f: F) -> EpochChanges where @@ -342,31 +485,15 @@ where F: FnMut(&Hash, &Number, E) -> B, { EpochChanges { - inner: self.inner.map(&mut |_, _, header| match header { - PersistedEpochHeader::Genesis(epoch_0, epoch_1) => PersistedEpochHeader::Genesis( - EpochHeader { start_slot: epoch_0.start_slot, end_slot: epoch_0.end_slot }, - EpochHeader { start_slot: epoch_1.start_slot, end_slot: epoch_1.end_slot }, - ), - PersistedEpochHeader::Regular(epoch_n) => - PersistedEpochHeader::Regular(EpochHeader { - start_slot: epoch_n.start_slot, - end_slot: epoch_n.end_slot, - }), + inner: self.inner.map(&mut |_, _, header: PersistedEpochHeader| header.map()), + gap: self.gap.map(|GapEpochs { current: (h, n, header), next }| GapEpochs { + current: (h, n, header.map(&h, &n, &mut f)), + next: next.map(|(h, n, e)| (h, n, f(&h, &n, e))), }), epochs: self .epochs .into_iter() - .map(|((hash, number), epoch)| { - let bepoch = match epoch { - PersistedEpoch::Genesis(epoch_0, epoch_1) => PersistedEpoch::Genesis( 
- f(&hash, &number, epoch_0), - f(&hash, &number, epoch_1), - ), - PersistedEpoch::Regular(epoch_n) => - PersistedEpoch::Regular(f(&hash, &number, epoch_n)), - }; - ((hash, number), bepoch) - }) + .map(|((hash, number), epoch)| ((hash, number), epoch.map(&hash, &number, &mut f))) .collect(), } } @@ -402,6 +529,9 @@ where /// Get a reference to an epoch with given identifier. pub fn epoch(&self, id: &EpochIdentifier) -> Option<&E> { + if let Some(e) = &self.gap.as_ref().and_then(|gap| gap.epoch(id)) { + return Some(e) + } self.epochs.get(&(id.hash, id.number)).and_then(|v| match v { PersistedEpoch::Genesis(ref epoch_0, _) if id.position == EpochIdentifierPosition::Genesis0 => @@ -537,6 +667,15 @@ where return Ok(Some(ViableEpochDescriptor::UnimportedGenesis(slot))) } + if let Some(gap) = &self.gap { + if let Some((hash, number, hdr, position)) = gap.matches(slot) { + return Ok(Some(ViableEpochDescriptor::Signaled( + EpochIdentifier { position, hash, number }, + hdr, + ))) + } + } + // We want to find the deepest node in the tree which is an ancestor // of our block and where the start slot of the epoch was before the // slot of our block. The genesis special-case doesn't need to look @@ -598,13 +737,30 @@ where ) -> Result<(), fork_tree::Error> { let is_descendent_of = descendent_of_builder.build_is_descendent_of(Some((hash, parent_hash))); - let header = PersistedEpochHeader::::from(&epoch.0); + let slot = epoch.as_ref().start_slot(); + let IncrementedEpoch(mut epoch) = epoch; + let header = PersistedEpochHeader::::from(&epoch); + + if let Some(gap) = &mut self.gap { + if let PersistedEpoch::Regular(e) = epoch { + epoch = match gap.import(slot, hash.clone(), number.clone(), e) { + Ok(()) => return Ok(()), + Err(e) => PersistedEpoch::Regular(e), + } + } + } else if !self.epochs.is_empty() && matches!(epoch, PersistedEpoch::Genesis(_, _)) { + // There's a genesis epoch imported when we already have an active epoch. 
+ // This happens after the warp sync as the ancient blocks download starts. + // We need to start tracking gap epochs here. + self.gap = Some(GapEpochs { current: (hash, number, epoch), next: None }); + return Ok(()) + } let res = self.inner.import(hash, number, header, &is_descendent_of); match res { Ok(_) | Err(fork_tree::Error::Duplicate) => { - self.epochs.insert((hash, number), epoch.0); + self.epochs.insert((hash, number), epoch); Ok(()) }, Err(e) => Err(e), @@ -916,4 +1072,112 @@ mod tests { assert!(epoch_for_x_child_before_genesis.is_none()); } } + + #[test] + fn gap_epochs_advance() { + // 0 - 1 - 2 - 3 - .... 42 - 43 + let is_descendent_of = |base: &Hash, block: &Hash| -> Result { + match (base, *block) { + (b"0", _) => Ok(true), + (b"1", b) => Ok(b == *b"0"), + (b"2", b) => Ok(b == *b"1"), + (b"3", b) => Ok(b == *b"2"), + _ => Ok(false), + } + }; + + let duration = 100; + + let make_genesis = |slot| Epoch { start_slot: slot, duration }; + + let mut epoch_changes = EpochChanges::new(); + let next_descriptor = (); + + let epoch42 = Epoch { start_slot: 42, duration: 100 }; + let epoch43 = Epoch { start_slot: 43, duration: 100 }; + epoch_changes.reset(*b"0", *b"1", 4200, epoch42, epoch43); + assert!(epoch_changes.gap.is_none()); + + // Import a new genesis epoch, this should create the gap. 
+ let genesis_epoch_a_descriptor = epoch_changes + .epoch_descriptor_for_child_of(&is_descendent_of, b"0", 0, 100) + .unwrap() + .unwrap(); + + let incremented_epoch = epoch_changes + .viable_epoch(&genesis_epoch_a_descriptor, &make_genesis) + .unwrap() + .increment(next_descriptor.clone()); + + epoch_changes + .import(&is_descendent_of, *b"1", 1, *b"0", incremented_epoch) + .unwrap(); + assert!(epoch_changes.gap.is_some()); + + let genesis_epoch = epoch_changes + .epoch_descriptor_for_child_of(&is_descendent_of, b"0", 0, 100) + .unwrap() + .unwrap(); + + assert_eq!(genesis_epoch, ViableEpochDescriptor::UnimportedGenesis(100)); + + // Import more epochs and check that gap advances. + let import_epoch_1 = + epoch_changes.viable_epoch(&genesis_epoch, &make_genesis).unwrap().increment(()); + + let epoch_1 = import_epoch_1.as_ref().clone(); + epoch_changes + .import(&is_descendent_of, *b"1", 1, *b"0", import_epoch_1) + .unwrap(); + let genesis_epoch_data = epoch_changes.epoch_data(&genesis_epoch, &make_genesis).unwrap(); + let end_slot = genesis_epoch_data.end_slot(); + let x = epoch_changes + .epoch_data_for_child_of(&is_descendent_of, b"1", 1, end_slot, &make_genesis) + .unwrap() + .unwrap(); + + assert_eq!(x, epoch_1); + assert_eq!(epoch_changes.gap.as_ref().unwrap().current.0, *b"1"); + assert!(epoch_changes.gap.as_ref().unwrap().next.is_none()); + + let epoch_1_desriptor = epoch_changes + .epoch_descriptor_for_child_of(&is_descendent_of, b"1", 1, end_slot) + .unwrap() + .unwrap(); + let epoch_1 = epoch_changes.epoch_data(&epoch_1_desriptor, &make_genesis).unwrap(); + let import_epoch_2 = epoch_changes + .viable_epoch(&epoch_1_desriptor, &make_genesis) + .unwrap() + .increment(()); + let epoch_2 = import_epoch_2.as_ref().clone(); + epoch_changes + .import(&is_descendent_of, *b"2", 2, *b"1", import_epoch_2) + .unwrap(); + + let end_slot = epoch_1.end_slot(); + let x = epoch_changes + .epoch_data_for_child_of(&is_descendent_of, b"2", 2, end_slot, &make_genesis) + 
.unwrap() + .unwrap(); + assert_eq!(epoch_changes.gap.as_ref().unwrap().current.0, *b"1"); + assert_eq!(epoch_changes.gap.as_ref().unwrap().next.as_ref().unwrap().0, *b"2"); + assert_eq!(x, epoch_2); + + let epoch_2_desriptor = epoch_changes + .epoch_descriptor_for_child_of(&is_descendent_of, b"2", 2, end_slot) + .unwrap() + .unwrap(); + let import_epoch_3 = epoch_changes + .viable_epoch(&epoch_2_desriptor, &make_genesis) + .unwrap() + .increment(()); + epoch_changes + .import(&is_descendent_of, *b"3", 3, *b"2", import_epoch_3) + .unwrap(); + + assert_eq!(epoch_changes.gap.as_ref().unwrap().current.0, *b"2"); + + epoch_changes.clear_gap(); + assert!(epoch_changes.gap.is_none()); + } } diff --git a/substrate/client/consensus/epochs/src/migration.rs b/substrate/client/consensus/epochs/src/migration.rs index 49e08240df..e4b685c6ff 100644 --- a/substrate/client/consensus/epochs/src/migration.rs +++ b/substrate/client/consensus/epochs/src/migration.rs @@ -30,9 +30,19 @@ pub struct EpochChangesV0 { inner: ForkTree>, } -/// Type alias for legacy definition of epoch changes. -pub type EpochChangesForV0 = +/// Legacy definition of epoch changes. +#[derive(Clone, Encode, Decode)] +pub struct EpochChangesV1 { + inner: ForkTree>, + epochs: BTreeMap<(Hash, Number), PersistedEpoch>, +} + +/// Type alias for v0 definition of epoch changes. +pub type EpochChangesV0For = EpochChangesV0<::Hash, NumberFor, Epoch>; +/// Type alias for v1 and v2 definition of epoch changes. +pub type EpochChangesV1For = + EpochChangesV1<::Hash, NumberFor, Epoch>; impl EpochChangesV0 where @@ -54,6 +64,17 @@ where header }); - EpochChanges { inner, epochs } + EpochChanges { inner, epochs, gap: None } + } +} + +impl EpochChangesV1 +where + Hash: PartialEq + Ord + Copy, + Number: Ord + Copy, +{ + /// Migrate the type into current epoch changes definition. 
+ pub fn migrate(self) -> EpochChanges { + EpochChanges { inner: self.inner, epochs: self.epochs, gap: None } } } diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index 66adb64c01..549ef4012a 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -502,6 +502,11 @@ impl BlockchainDb { } } + fn update_block_gap(&self, gap: Option<(NumberFor, NumberFor)>) { + let mut meta = self.meta.write(); + meta.block_gap = gap; + } + // Get block changes trie root, if available. fn changes_trie_root(&self, block: BlockId) -> ClientResult> { self.header(block).map(|header| { @@ -538,6 +543,7 @@ impl sc_client_api::blockchain::HeaderBackend for Blockcha finalized_number: meta.finalized_number, finalized_state: meta.finalized_state.clone(), number_leaves: self.leaves.read().count(), + block_gap: meta.block_gap, } } @@ -1388,9 +1394,10 @@ impl Backend { operation.apply_offchain(&mut transaction); let mut meta_updates = Vec::with_capacity(operation.finalized_blocks.len()); - let mut last_finalized_hash = self.blockchain.meta.read().finalized_hash; - let mut last_finalized_num = self.blockchain.meta.read().finalized_number; - let best_num = self.blockchain.meta.read().best_number; + let (best_num, mut last_finalized_hash, mut last_finalized_num, mut block_gap) = { + let meta = self.blockchain.meta.read(); + (meta.best_number, meta.finalized_hash, meta.finalized_number, meta.block_gap.clone()) + }; let mut changes_trie_cache_ops = None; for (block, justification) in operation.finalized_blocks { @@ -1639,6 +1646,41 @@ impl Backend { children, ); } + + if let Some((mut start, end)) = block_gap { + if number == start { + start += One::one(); + utils::insert_number_to_key_mapping( + &mut transaction, + columns::KEY_LOOKUP, + number, + hash, + )?; + } + if start > end { + transaction.remove(columns::META, meta_keys::BLOCK_GAP); + block_gap = None; + debug!(target: "db", "Removed block gap."); + } else { + block_gap = Some((start, 
end)); + debug!(target: "db", "Update block gap. {:?}", block_gap); + transaction.set( + columns::META, + meta_keys::BLOCK_GAP, + &(start, end).encode(), + ); + } + } else if number > best_num + One::one() && + number > One::one() && self + .blockchain + .header(BlockId::hash(parent_hash))? + .is_none() + { + let gap = (best_num + One::one(), number - One::one()); + transaction.set(columns::META, meta_keys::BLOCK_GAP, &gap.encode()); + block_gap = Some(gap); + debug!(target: "db", "Detected block gap {:?}", block_gap); + } } meta_updates.push(MetaUpdate { @@ -1716,6 +1758,7 @@ impl Backend { for m in meta_updates { self.blockchain.update_meta(m); } + self.blockchain.update_block_gap(block_gap); Ok(()) } diff --git a/substrate/client/db/src/light.rs b/substrate/client/db/src/light.rs index bf2da5c61d..48cf0489cf 100644 --- a/substrate/client/db/src/light.rs +++ b/substrate/client/db/src/light.rs @@ -157,6 +157,7 @@ where None }, number_leaves: 1, + block_gap: None, } } diff --git a/substrate/client/db/src/utils.rs b/substrate/client/db/src/utils.rs index ea22c774f4..0e895eaaf3 100644 --- a/substrate/client/db/src/utils.rs +++ b/substrate/client/db/src/utils.rs @@ -54,6 +54,8 @@ pub mod meta_keys { pub const FINALIZED_BLOCK: &[u8; 5] = b"final"; /// Last finalized state key. pub const FINALIZED_STATE: &[u8; 6] = b"fstate"; + /// Block gap. + pub const BLOCK_GAP: &[u8; 3] = b"gap"; /// Meta information prefix for list-based caches. pub const CACHE_META_PREFIX: &[u8; 5] = b"cache"; /// Meta information for changes tries key. @@ -81,6 +83,8 @@ pub struct Meta { pub genesis_hash: H, /// Finalized state, if any pub finalized_state: Option<(H, N)>, + /// Block gap, start and end inclusive, if any. 
+ pub block_gap: Option<(N, N)>, } /// A block lookup key: used for canonical lookup from block number to hash @@ -527,6 +531,7 @@ where finalized_number: Zero::zero(), genesis_hash: Default::default(), finalized_state: None, + block_gap: None, }), }; @@ -541,7 +546,7 @@ where "Opened blockchain db, fetched {} = {:?} ({})", desc, hash, - header.number() + header.number(), ); Ok((hash, *header.number())) } else { @@ -558,6 +563,10 @@ where } else { None }; + let block_gap = db + .get(COLUMN_META, meta_keys::BLOCK_GAP) + .and_then(|d| Decode::decode(&mut d.as_slice()).ok()); + debug!(target: "db", "block_gap={:?}", block_gap); Ok(Meta { best_hash, @@ -566,6 +575,7 @@ where finalized_number, genesis_hash, finalized_state, + block_gap, }) } diff --git a/substrate/client/finality-grandpa/src/authorities.rs b/substrate/client/finality-grandpa/src/authorities.rs index 6e5dfdd05e..6eb13099aa 100644 --- a/substrate/client/finality-grandpa/src/authorities.rs +++ b/substrate/client/finality-grandpa/src/authorities.rs @@ -168,7 +168,7 @@ pub struct AuthoritySet { /// Track at which blocks the set id changed. This is useful when we need to prove finality for /// a given block since we can figure out what set the block belongs to and when the set /// started/ended. - authority_set_changes: AuthoritySetChanges, + pub(crate) authority_set_changes: AuthoritySetChanges, } impl AuthoritySet @@ -714,6 +714,17 @@ impl AuthoritySetChanges { } } + pub(crate) fn insert(&mut self, block_number: N) { + let idx = self + .0 + .binary_search_by_key(&block_number, |(_, n)| n.clone()) + .unwrap_or_else(|b| b); + + let set_id = if idx == 0 { 0 } else { self.0[idx - 1].0 + 1 }; + assert!(idx == self.0.len() || self.0[idx].0 != set_id); + self.0.insert(idx, (set_id, block_number)); + } + /// Returns an iterator over all historical authority set changes starting at the given block /// number (excluded). 
The iterator yields a tuple representing the set id and the block number /// of the last block in that set. @@ -1632,6 +1643,18 @@ mod tests { assert_eq!(authorities.pending_forced_changes.first().unwrap().canon_hash, "D"); } + #[test] + fn authority_set_changes_insert() { + let mut authority_set_changes = AuthoritySetChanges::empty(); + authority_set_changes.append(0, 41); + authority_set_changes.append(1, 81); + authority_set_changes.append(4, 121); + + authority_set_changes.insert(101); + assert_eq!(authority_set_changes.get_set_id(100), AuthoritySetChangeId::Set(2, 101)); + assert_eq!(authority_set_changes.get_set_id(101), AuthoritySetChangeId::Set(2, 101)); + } + #[test] fn authority_set_changes_for_complete_data() { let mut authority_set_changes = AuthoritySetChanges::empty(); diff --git a/substrate/client/finality-grandpa/src/import.rs b/substrate/client/finality-grandpa/src/import.rs index 1c4d1b4e97..d54f7234b4 100644 --- a/substrate/client/finality-grandpa/src/import.rs +++ b/substrate/client/finality-grandpa/src/import.rs @@ -551,6 +551,32 @@ where return self.import_state(block, new_cache).await } + if number <= self.inner.info().finalized_number { + // Importing an old block. Just save justifications and authority set changes + if self.check_new_change(&block.header, hash).is_some() { + if block.justifications.is_none() { + return Err(ConsensusError::ClientImport( + "Justification required when importing \ + an old block with authority set change." + .into(), + )) + } + assert!(block.justifications.is_some()); + let mut authority_set = self.authority_set.inner_locked(); + authority_set.authority_set_changes.insert(number); + crate::aux_schema::update_authority_set::( + &authority_set, + None, + |insert| { + block + .auxiliary + .extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec())))) + }, + ); + } + return (&*self.inner).import_block(block, new_cache).await + } + // on initial sync we will restrict logging under info to avoid spam. 
let initial_sync = block.origin == BlockOrigin::NetworkInitialSync; diff --git a/substrate/client/finality-grandpa/src/warp_proof.rs b/substrate/client/finality-grandpa/src/warp_proof.rs index 34eaa49cdf..3c1fa4892f 100644 --- a/substrate/client/finality-grandpa/src/warp_proof.rs +++ b/substrate/client/finality-grandpa/src/warp_proof.rs @@ -31,7 +31,7 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT, NumberFor, One}, }; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; /// Warp proof processing error. #[derive(Debug, derive_more::Display, derive_more::From)] @@ -194,6 +194,7 @@ impl WarpSyncProof { &self, set_id: SetId, authorities: AuthorityList, + hard_forks: &HashMap<(Block::Hash, NumberFor), (SetId, AuthorityList)>, ) -> Result<(SetId, AuthorityList), Error> where NumberFor: BlockNumberOps, @@ -202,26 +203,34 @@ impl WarpSyncProof { let mut current_authorities = authorities; for (fragment_num, proof) in self.proofs.iter().enumerate() { - proof - .justification - .verify(current_set_id, ¤t_authorities) - .map_err(|err| Error::InvalidProof(err.to_string()))?; + let hash = proof.header.hash(); + let number = *proof.header.number(); - if proof.justification.target().1 != proof.header.hash() { - return Err(Error::InvalidProof( - "Mismatch between header and justification".to_owned(), - )) - } + if let Some((set_id, list)) = hard_forks.get(&(hash.clone(), number)) { + current_set_id = *set_id; + current_authorities = list.clone(); + } else { + proof + .justification + .verify(current_set_id, ¤t_authorities) + .map_err(|err| Error::InvalidProof(err.to_string()))?; - if let Some(scheduled_change) = find_scheduled_change::(&proof.header) { - current_authorities = scheduled_change.next_authorities; - current_set_id += 1; - } else if fragment_num != self.proofs.len() - 1 || !self.is_finished { - // Only the last fragment of the last proof message is allowed to be missing - // the authority set change. 
- return Err(Error::InvalidProof( - "Header is missing authority set change digest".to_string(), - )) + if proof.justification.target().1 != hash { + return Err(Error::InvalidProof( + "Mismatch between header and justification".to_owned(), + )) + } + + if let Some(scheduled_change) = find_scheduled_change::(&proof.header) { + current_authorities = scheduled_change.next_authorities; + current_set_id += 1; + } else if fragment_num != self.proofs.len() - 1 || !self.is_finished { + // Only the last fragment of the last proof message is allowed to be missing the + // authority set change. + return Err(Error::InvalidProof( + "Header is missing authority set change digest".to_string(), + )) + } } } Ok((current_set_id, current_authorities)) @@ -235,6 +244,7 @@ where { backend: Arc, authority_set: SharedAuthoritySet>, + hard_forks: HashMap<(Block::Hash, NumberFor), (SetId, AuthorityList)>, } impl> NetworkProvider @@ -245,8 +255,13 @@ where pub fn new( backend: Arc, authority_set: SharedAuthoritySet>, + hard_forks: Vec<(SetId, (Block::Hash, NumberFor), AuthorityList)>, ) -> Self { - NetworkProvider { backend, authority_set } + NetworkProvider { + backend, + authority_set, + hard_forks: hard_forks.into_iter().map(|(s, hn, list)| (hn, (s, list))).collect(), + } } } @@ -283,7 +298,7 @@ where .map(|p| p.header.clone()) .ok_or_else(|| "Empty proof".to_string())?; let (next_set_id, next_authorities) = - proof.verify(set_id, authorities).map_err(Box::new)?; + proof.verify(set_id, authorities, &self.hard_forks).map_err(Box::new)?; if proof.is_finished { Ok(VerificationResult::::Complete(next_set_id, next_authorities, last_header)) } else { @@ -417,7 +432,8 @@ mod tests { WarpSyncProof::generate(&*backend, genesis_hash, &authority_set_changes).unwrap(); // verifying the proof should yield the last set id and authorities - let (new_set_id, new_authorities) = warp_sync_proof.verify(0, genesis_authorities).unwrap(); + let (new_set_id, new_authorities) = + warp_sync_proof.verify(0, 
genesis_authorities, &Default::default()).unwrap(); let expected_authorities = current_authorities .iter() diff --git a/substrate/client/informant/src/display.rs b/substrate/client/informant/src/display.rs index 1f23856101..6496172b80 100644 --- a/substrate/client/informant/src/display.rs +++ b/substrate/client/informant/src/display.rs @@ -20,7 +20,7 @@ use crate::OutputFormat; use ansi_term::Colour; use log::info; use sc_client_api::ClientInfo; -use sc_network::{NetworkStatus, SyncState}; +use sc_network::{NetworkStatus, SyncState, WarpSyncPhase, WarpSyncProgress}; use sp_runtime::traits::{Block as BlockT, CheckedDiv, NumberFor, Saturating, Zero}; use std::{ convert::{TryFrom, TryInto}, @@ -97,11 +97,17 @@ impl InformantDisplay { net_status.state_sync, net_status.warp_sync, ) { + ( + _, + _, + _, + Some(WarpSyncProgress { phase: WarpSyncPhase::DownloadingBlocks(n), .. }), + ) => ("⏩", "Block history".into(), format!(", #{}", n)), (_, _, _, Some(warp)) => ( "⏩", "Warping".into(), format!( - ", {}, ({:.2}) Mib", + ", {}, {:.2} Mib", warp.phase, (warp.total_bytes as f32) / (1024f32 * 1024f32) ), @@ -110,7 +116,7 @@ impl InformantDisplay { "⚙️ ", "Downloading state".into(), format!( - ", {}%, ({:.2}) Mib", + ", {}%, {:.2} Mib", state.percentage, (state.size as f32) / (1024f32 * 1024f32) ), diff --git a/substrate/client/network/src/lib.rs b/substrate/client/network/src/lib.rs index 51bc370265..2f81ddfa1f 100644 --- a/substrate/client/network/src/lib.rs +++ b/substrate/client/network/src/lib.rs @@ -328,5 +328,5 @@ pub struct NetworkStatus { /// State sync in progress. pub state_sync: Option, /// Warp sync in progress. 
- pub warp_sync: Option, + pub warp_sync: Option>, } diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index e22d96f32a..70a17dc44b 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -710,8 +710,7 @@ impl Protocol { match self.sync.on_state_data(&peer_id, response) { Ok(sync::OnStateData::Import(origin, block)) => CustomMessageOutcome::BlockImport(origin, vec![block]), - Ok(sync::OnStateData::Request(peer, req)) => - prepare_state_request::(&mut self.peers, peer, req), + Ok(sync::OnStateData::Continue) => CustomMessageOutcome::None, Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu); @@ -728,10 +727,7 @@ impl Protocol { response: crate::warp_request_handler::EncodedProof, ) -> CustomMessageOutcome { match self.sync.on_warp_sync_data(&peer_id, response) { - Ok(sync::OnWarpSyncData::WarpProofRequest(peer, req)) => - prepare_warp_sync_request::(&mut self.peers, peer, req), - Ok(sync::OnWarpSyncData::StateRequest(peer, req)) => - prepare_state_request::(&mut self.peers, peer, req), + Ok(()) => CustomMessageOutcome::None, Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu); diff --git a/substrate/client/network/src/protocol/sync.rs b/substrate/client/network/src/protocol/sync.rs index 07f5f76fce..7f85c2b637 100644 --- a/substrate/client/network/src/protocol/sync.rs +++ b/substrate/client/network/src/protocol/sync.rs @@ -182,6 +182,12 @@ impl Default for PendingRequests { } } +struct GapSync { + blocks: BlockCollection, + best_queued_number: NumberFor, + target: NumberFor, +} + /// The main data structure which contains all the state for a chains /// active syncing strategy. pub struct ChainSync { @@ -226,6 +232,8 @@ pub struct ChainSync { /// Enable importing existing blocks. 
This is used used after the state download to /// catch up to the latest state while re-importing blocks. import_existing: bool, + /// Gap download process. + gap_sync: Option>, } /// All the data we have about a Peer that we are trying to sync with @@ -298,6 +306,8 @@ pub enum PeerSyncState { DownloadingState, /// Downloading warp proof. DownloadingWarpProof, + /// Actively downloading block history after warp sync. + DownloadingGap(NumberFor), } impl PeerSyncState { @@ -326,7 +336,7 @@ pub struct StateDownloadProgress { /// Reported warp sync phase. #[derive(Clone, Eq, PartialEq, Debug)] -pub enum WarpSyncPhase { +pub enum WarpSyncPhase { /// Waiting for peers to connect. AwaitingPeers, /// Downloading and verifying grandpa warp proofs. @@ -335,24 +345,27 @@ pub enum WarpSyncPhase { DownloadingState, /// Importing state. ImportingState, + /// Downloading block history. + DownloadingBlocks(NumberFor), } -impl fmt::Display for WarpSyncPhase { +impl fmt::Display for WarpSyncPhase { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Self::AwaitingPeers => write!(f, "Waiting for peers"), Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"), Self::DownloadingState => write!(f, "Downloading state"), Self::ImportingState => write!(f, "Importing state"), + Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n), } } } /// Reported warp sync progress. #[derive(Clone, Eq, PartialEq, Debug)] -pub struct WarpSyncProgress { +pub struct WarpSyncProgress { /// Estimated download percentage. - pub phase: WarpSyncPhase, + pub phase: WarpSyncPhase, /// Total bytes downloaded so far. pub total_bytes: u64, } @@ -371,7 +384,7 @@ pub struct Status { /// State sync status in progress, if any. pub state_sync: Option, /// Warp sync in progress, if any. - pub warp_sync: Option, + pub warp_sync: Option>, } /// A peer did not behave as expected and should be reported. 
@@ -413,16 +426,7 @@ pub enum OnStateData { /// The block and state that should be imported. Import(BlockOrigin, IncomingBlock), /// A new state request needs to be made to the given peer. - Request(PeerId, StateRequest), -} - -/// Result of [`ChainSync::on_warp_sync_data`]. -#[derive(Debug)] -pub enum OnWarpSyncData { - /// Warp proof request is issued. - WarpProofRequest(PeerId, warp::WarpProofRequest), - /// A new state request needs to be made to the given peer. - StateRequest(PeerId, StateRequest), + Continue, } /// Result of [`ChainSync::poll_block_announce_validation`]. @@ -555,6 +559,7 @@ impl ChainSync { warp_sync: None, warp_sync_provider, import_existing: false, + gap_sync: None, }; sync.reset_sync_start_point()?; Ok(sync) @@ -608,10 +613,14 @@ impl ChainSync { SyncState::Idle }; - let warp_sync_progress = match (&self.warp_sync, &self.mode) { - (None, SyncMode::Warp) => + let warp_sync_progress = match (&self.warp_sync, &self.mode, &self.gap_sync) { + (_, _, Some(gap_sync)) => Some(WarpSyncProgress { + phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number), + total_bytes: 0, + }), + (None, SyncMode::Warp, _) => Some(WarpSyncProgress { phase: WarpSyncPhase::AwaitingPeers, total_bytes: 0 }), - (Some(sync), _) => Some(sync.progress()), + (Some(sync), _, _) => Some(sync.progress()), _ => None, }; @@ -686,17 +695,6 @@ impl ChainSync { return Ok(None) } - if let SyncMode::Warp = &self.mode { - if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() - { - log::debug!(target: "sync", "Starting warp state sync."); - if let Some(provider) = &self.warp_sync_provider { - self.warp_sync = - Some(WarpSync::new(self.client.clone(), provider.clone())); - } - } - } - // If we are at genesis, just start downloading. 
let (state, req) = if self.best_queued_number.is_zero() { debug!( @@ -739,6 +737,17 @@ impl ChainSync { }, ); + if let SyncMode::Warp = &self.mode { + if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() + { + log::debug!(target: "sync", "Starting warp state sync."); + if let Some(provider) = &self.warp_sync_provider { + self.warp_sync = + Some(WarpSync::new(self.client.clone(), provider.clone())); + } + } + } + Ok(req) }, Ok(BlockStatus::Queued) | @@ -869,10 +878,13 @@ impl ChainSync { /// Get an iterator over all block requests of all peers. pub fn block_requests(&mut self) -> impl Iterator)> + '_ { - if self.pending_requests.is_empty() || self.state_sync.is_some() || self.warp_sync.is_some() + if self.pending_requests.is_empty() || + self.state_sync.is_some() || + self.mode == SyncMode::Warp { return Either::Left(std::iter::empty()) } + if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { trace!(target: "sync", "Too many blocks in the queue."); return Either::Left(std::iter::empty()) @@ -888,6 +900,7 @@ impl ChainSync { let queue = &self.queue_blocks; let pending_requests = self.pending_requests.take(); let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads }; + let gap_sync = &mut self.gap_sync; let iter = self.peers.iter_mut().filter_map(move |(id, peer)| { if !peer.state.is_available() || !pending_requests.contains(id) { return None @@ -947,6 +960,26 @@ impl ChainSync { trace!(target: "sync", "Downloading fork {:?} from {}", hash, id); peer.state = PeerSyncState::DownloadingStale(hash); Some((id, req)) + } else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| { + peer_gap_block_request( + id, + peer, + &mut sync.blocks, + attrs, + sync.target, + sync.best_queued_number, + ) + }) { + peer.state = PeerSyncState::DownloadingGap(range.start); + trace!( + target: "sync", + "New gap block request for {}, (best:{}, common:{}) {:?}", + id, + peer.best_number, + peer.common_number, + req, + ); + Some((id, req)) 
} else { None } @@ -966,9 +999,9 @@ impl ChainSync { } for (id, peer) in self.peers.iter_mut() { if peer.state.is_available() && peer.common_number >= sync.target_block_num() { - trace!(target: "sync", "New StateRequest for {}", id); peer.state = PeerSyncState::DownloadingState; let request = sync.next_request(); + trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); return Some((*id, request)) } } @@ -982,7 +1015,7 @@ impl ChainSync { { for (id, peer) in self.peers.iter_mut() { if peer.state.is_available() && peer.best_number >= target { - trace!(target: "sync", "New StateRequest for {}", id); + trace!(target: "sync", "New StateRequest for {}: {:?}", id, request); peer.state = PeerSyncState::DownloadingState; return Some((*id, request)) } @@ -1039,6 +1072,7 @@ impl ChainSync { response: BlockResponse, ) -> Result, BadPeer> { self.downloaded_blocks += response.blocks.len(); + let mut gap = false; let new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(who) { let mut blocks = response.blocks; if request @@ -1061,6 +1095,43 @@ impl ChainSync { } self.drain_blocks() }, + PeerSyncState::DownloadingGap(start_block) => { + let start_block = *start_block; + peer.state = PeerSyncState::Available; + if let Some(gap_sync) = &mut self.gap_sync { + gap_sync.blocks.clear_peer_download(who); + validate_blocks::(&blocks, who, Some(request))?; + gap_sync.blocks.insert(start_block, blocks, who.clone()); + gap = true; + gap_sync + .blocks + .drain(gap_sync.best_queued_number + One::one()) + .into_iter() + .map(|block_data| { + let justifications = block_data.block.justifications.or( + legacy_justification_mapping( + block_data.block.justification, + ), + ); + IncomingBlock { + hash: block_data.block.hash, + header: block_data.block.header, + body: block_data.block.body, + indexed_body: block_data.block.indexed_body, + justifications, + origin: block_data.origin, + allow_missing_state: true, + import_existing: self.import_existing, + skip_execution: true, + 
state: None, + } + }) + .collect() + } else { + debug!(target: "sync", "Unexpected gap block response from {}", who); + return Err(BadPeer(who.clone(), rep::NO_BLOCK)) + } + }, PeerSyncState::DownloadingStale(_) => { peer.state = PeerSyncState::Available; if blocks.is_empty() { @@ -1212,7 +1283,7 @@ impl ChainSync { return Err(BadPeer(*who, rep::NOT_REQUESTED)) }; - Ok(self.validate_and_queue_blocks(new_blocks)) + Ok(self.validate_and_queue_blocks(new_blocks, gap)) } /// Handle a response from the remote to a state request that we made. @@ -1223,6 +1294,11 @@ impl ChainSync { who: &PeerId, response: StateResponse, ) -> Result, BadPeer> { + if let Some(peer) = self.peers.get_mut(&who) { + if let PeerSyncState::DownloadingState = peer.state { + peer.state = PeerSyncState::Available; + } + } let import_result = if let Some(sync) = &mut self.state_sync { debug!( target: "sync", @@ -1261,11 +1337,10 @@ impl ChainSync { skip_execution: self.skip_execution(), state: Some(state), }; - debug!(target: "sync", "State sync is complete. Import is queued"); + debug!(target: "sync", "State download is complete. 
Import is queued"); Ok(OnStateData::Import(origin, block)) }, - state::ImportResult::Continue(request) => - Ok(OnStateData::Request(who.clone(), request)), + state::ImportResult::Continue => Ok(OnStateData::Continue), state::ImportResult::BadResponse => { debug!(target: "sync", "Bad state data received from {}", who); Err(BadPeer(*who, rep::BAD_BLOCK)) @@ -1280,7 +1355,12 @@ impl ChainSync { &mut self, who: &PeerId, response: warp::EncodedProof, - ) -> Result, BadPeer> { + ) -> Result<(), BadPeer> { + if let Some(peer) = self.peers.get_mut(&who) { + if let PeerSyncState::DownloadingWarpProof = peer.state { + peer.state = PeerSyncState::Available; + } + } let import_result = if let Some(sync) = &mut self.warp_sync { debug!( target: "sync", @@ -1295,10 +1375,7 @@ impl ChainSync { }; match import_result { - warp::WarpProofImportResult::StateRequest(request) => - Ok(OnWarpSyncData::StateRequest(*who, request)), - warp::WarpProofImportResult::WarpProofRequest(request) => - Ok(OnWarpSyncData::WarpProofRequest(*who, request)), + warp::WarpProofImportResult::Success => Ok(()), warp::WarpProofImportResult::BadResponse => { debug!(target: "sync", "Bad proof data received from {}", who); Err(BadPeer(*who, rep::BAD_BLOCK)) @@ -1309,6 +1386,7 @@ impl ChainSync { fn validate_and_queue_blocks( &mut self, mut new_blocks: Vec>, + gap: bool, ) -> OnBlockData { let orig_len = new_blocks.len(); new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash)); @@ -1320,7 +1398,7 @@ impl ChainSync { ); } - let origin = if self.status().state != SyncState::Downloading { + let origin = if !gap && self.status().state != SyncState::Downloading { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync @@ -1494,6 +1572,15 @@ impl ChainSync { self.mode = SyncMode::Full; output.extend(self.restart()); } + let gap_sync_complete = + self.gap_sync.as_ref().map_or(false, |s| s.target == number); + if gap_sync_complete { + info!( + target: "sync", + "Block history download is complete." 
+ ); + self.gap_sync = None; + } }, Err(BlockImportError::IncompleteHeader(who)) => if let Some(peer) = who { @@ -1601,6 +1688,11 @@ impl ChainSync { if self.fork_targets.remove(&hash).is_some() { trace!(target: "sync", "Completed fork sync {:?}", hash); } + if let Some(gap_sync) = &mut self.gap_sync { + if number > gap_sync.best_queued_number && number <= gap_sync.target { + gap_sync.best_queued_number = number; + } + } if number > self.best_queued_number { self.best_queued_number = number; self.best_queued_hash = *hash; @@ -1954,6 +2046,9 @@ impl ChainSync { /// import, so this functions checks for such blocks and returns them. pub fn peer_disconnected(&mut self, who: &PeerId) -> Option> { self.blocks.clear_peer_download(who); + if let Some(gap_sync) = &mut self.gap_sync { + gap_sync.blocks.clear_peer_download(who) + } self.peers.remove(who); self.extra_justifications.peer_disconnected(who); self.pending_requests.set_all(); @@ -1963,7 +2058,7 @@ impl ChainSync { }); let blocks = self.drain_blocks(); if !blocks.is_empty() { - Some(self.validate_and_queue_blocks(blocks)) + Some(self.validate_and_queue_blocks(blocks, false)) } else { None } @@ -2043,6 +2138,14 @@ impl ChainSync { } } } + if let Some((start, end)) = info.block_gap { + debug!(target: "sync", "Starting gap sync #{} - #{}", start, end); + self.gap_sync = Some(GapSync { + best_queued_number: start - One::one(), + target: end, + blocks: BlockCollection::new(), + }); + } trace!(target: "sync", "Restarted sync at #{} ({:?})", self.best_queued_number, self.best_queued_hash); Ok(()) } @@ -2250,6 +2353,39 @@ fn peer_block_request( Some((range, request)) } +/// Get a new block request for the peer if any. 
+fn peer_gap_block_request( + id: &PeerId, + peer: &PeerSync, + blocks: &mut BlockCollection, + attrs: message::BlockAttributes, + target: NumberFor, + common_number: NumberFor, +) -> Option<(Range>, BlockRequest)> { + let range = blocks.needed_blocks( + id.clone(), + MAX_BLOCKS_TO_REQUEST, + std::cmp::min(peer.best_number, target), + common_number, + 1, + MAX_DOWNLOAD_AHEAD, + )?; + + // The end is not part of the range. + let last = range.end.saturating_sub(One::one()); + let from = message::FromBlock::Number(last); + + let request = message::generic::BlockRequest { + id: 0, + fields: attrs.clone(), + from, + to: None, + direction: message::Direction::Descending, + max: Some((range.end - range.start).saturated_into::()), + }; + Some((range, request)) +} + /// Get pending fork sync targets for a peer. fn fork_sync_request( id: &PeerId, diff --git a/substrate/client/network/src/protocol/sync/state.rs b/substrate/client/network/src/protocol/sync/state.rs index d2e4463f98..e644ba1013 100644 --- a/substrate/client/network/src/protocol/sync/state.rs +++ b/substrate/client/network/src/protocol/sync/state.rs @@ -47,8 +47,8 @@ pub struct StateSync { pub enum ImportResult { /// State is complete and ready for import. Import(B::Hash, B::Header, ImportedState), - /// Continue dowloading. - Continue(StateRequest), + /// Continue downloading. + Continue, /// Bad state chunk. BadResponse, } @@ -134,7 +134,7 @@ impl StateSync { ImportedState { block: self.target_block, state: std::mem::take(&mut self.state) }, ) } else { - ImportResult::Continue(self.next_request()) + ImportResult::Continue } } diff --git a/substrate/client/network/src/protocol/sync/warp.rs b/substrate/client/network/src/protocol/sync/warp.rs index 32bd5cb9ed..bbf8a28da1 100644 --- a/substrate/client/network/src/protocol/sync/warp.rs +++ b/substrate/client/network/src/protocol/sync/warp.rs @@ -37,11 +37,9 @@ enum Phase { } /// Import warp proof result. 
-pub enum WarpProofImportResult { - /// Start downloading state data. - StateRequest(StateRequest), - /// Continue dowloading warp sync proofs. - WarpProofRequest(WarpProofRequest), +pub enum WarpProofImportResult { + /// Import was successful. + Success, /// Bad proof. BadResponse, } @@ -69,7 +67,7 @@ impl WarpSync { Self { client, warp_sync_provider, phase, total_proof_bytes: 0 } } - /// Validate and import a state reponse. + /// Validate and import a state response. pub fn import_state(&mut self, response: StateResponse) -> ImportResult { match &mut self.phase { Phase::WarpProof { .. } => { @@ -80,19 +78,15 @@ impl WarpSync { } } - /// Validate and import a warp proof reponse. - pub fn import_warp_proof(&mut self, response: EncodedProof) -> WarpProofImportResult { + /// Validate and import a warp proof response. + pub fn import_warp_proof(&mut self, response: EncodedProof) -> WarpProofImportResult { match &mut self.phase { Phase::State(_) => { log::debug!(target: "sync", "Unexpected warp proof response"); WarpProofImportResult::BadResponse }, Phase::WarpProof { set_id, authorities, last_hash } => { - match self.warp_sync_provider.verify( - &response, - *set_id, - std::mem::take(authorities), - ) { + match self.warp_sync_provider.verify(&response, *set_id, authorities.clone()) { Err(e) => { log::debug!(target: "sync", "Bad warp proof response: {:?}", e); return WarpProofImportResult::BadResponse @@ -103,17 +97,14 @@ impl WarpSync { *authorities = new_authorities; *last_hash = new_last_hash.clone(); self.total_proof_bytes += response.0.len() as u64; - WarpProofImportResult::WarpProofRequest(WarpProofRequest { - begin: new_last_hash, - }) + WarpProofImportResult::Success }, Ok(VerificationResult::Complete(new_set_id, _, header)) => { log::debug!(target: "sync", "Verified complete proof, set_id={:?}", new_set_id); self.total_proof_bytes += response.0.len() as u64; let state_sync = StateSync::new(self.client.clone(), header, false); - let request = 
state_sync.next_request(); self.phase = Phase::State(state_sync); - WarpProofImportResult::StateRequest(request) + WarpProofImportResult::Success }, } }, @@ -161,7 +152,7 @@ impl WarpSync { } /// Returns state sync estimated progress (percentage, bytes) - pub fn progress(&self) -> WarpSyncProgress { + pub fn progress(&self) -> WarpSyncProgress { match &self.phase { Phase::WarpProof { .. } => WarpSyncProgress { phase: WarpSyncPhase::DownloadingWarpProofs, diff --git a/substrate/client/network/src/warp_request_handler.rs b/substrate/client/network/src/warp_request_handler.rs index 2ab95bb385..ca5a93b752 100644 --- a/substrate/client/network/src/warp_request_handler.rs +++ b/substrate/client/network/src/warp_request_handler.rs @@ -23,10 +23,11 @@ use futures::{ stream::StreamExt, }; use log::debug; -use sp_finality_grandpa::{AuthorityList, SetId}; use sp_runtime::traits::Block as BlockT; use std::{sync::Arc, time::Duration}; +pub use sp_finality_grandpa::{AuthorityList, SetId}; + /// Scale-encoded warp sync proof response. pub struct EncodedProof(pub Vec); @@ -55,7 +56,7 @@ pub trait WarpSyncProvider: Send + Sync { &self, start: B::Hash, ) -> Result>; - /// Verify warp proof agains current set of authorities. + /// Verify warp proof against current set of authorities. 
fn verify( &self, proof: &EncodedProof, diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index e547fa376b..fb0012aaf5 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -55,7 +55,7 @@ use sc_network::{ }, light_client_requests::{self, handler::LightClientRequestHandler}, state_request_handler::{self, StateRequestHandler}, - Multiaddr, NetworkService, NetworkWorker, + warp_request_handler, Multiaddr, NetworkService, NetworkWorker, }; use sc_service::client::Client; use sp_blockchain::{ @@ -68,6 +68,7 @@ use sp_consensus::{ }; use sp_core::H256; use sp_runtime::{ + codec::{Decode, Encode}, generic::{BlockId, OpaqueDigestItemId}, traits::{Block as BlockT, Header as HeaderT, NumberFor}, Justification, Justifications, @@ -652,6 +653,33 @@ impl VerifierAdapter { } } +struct TestWarpSyncProvider(Arc>); + +impl warp_request_handler::WarpSyncProvider for TestWarpSyncProvider { + fn generate( + &self, + _start: B::Hash, + ) -> Result> { + let info = self.0.info(); + let best_header = self.0.header(BlockId::hash(info.best_hash)).unwrap().unwrap(); + Ok(warp_request_handler::EncodedProof(best_header.encode())) + } + fn verify( + &self, + proof: &warp_request_handler::EncodedProof, + _set_id: warp_request_handler::SetId, + _authorities: warp_request_handler::AuthorityList, + ) -> Result, Box> + { + let warp_request_handler::EncodedProof(encoded) = proof; + let header = B::Header::decode(&mut encoded.as_slice()).unwrap(); + Ok(warp_request_handler::VerificationResult::Complete(0, Default::default(), header)) + } + fn current_authorities(&self) -> warp_request_handler::AuthorityList { + Default::default() + } +} + /// Configuration for a full peer. 
#[derive(Default)] pub struct FullPeerConfig { @@ -737,7 +765,7 @@ where (Some(keep_blocks), false) => TestClientBuilder::with_pruning_window(keep_blocks), (None, false) => TestClientBuilder::with_default_backend(), }; - if matches!(config.sync_mode, SyncMode::Fast { .. }) { + if matches!(config.sync_mode, SyncMode::Fast { .. } | SyncMode::Warp) { test_client_builder = test_client_builder.set_no_genesis(); } let backend = test_client_builder.backend(); @@ -816,6 +844,15 @@ where protocol_config }; + let warp_sync = Arc::new(TestWarpSyncProvider(client.clone())); + + let warp_protocol_config = { + let (handler, protocol_config) = + warp_request_handler::RequestHandler::new(protocol_id.clone(), warp_sync.clone()); + self.spawn_task(handler.run().boxed()); + protocol_config + }; + let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: None, @@ -835,7 +872,7 @@ where block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, - warp_sync: None, + warp_sync: Some((warp_sync, warp_protocol_config)), }) .unwrap(); diff --git a/substrate/client/network/test/src/sync.rs b/substrate/client/network/test/src/sync.rs index f413b705e5..f3af7f8ff6 100644 --- a/substrate/client/network/test/src/sync.rs +++ b/substrate/client/network/test/src/sync.rs @@ -1202,6 +1202,38 @@ fn syncs_indexed_blocks() { .is_some()); } +#[test] +fn warp_sync() { + sp_tracing::try_init_simple(); + let mut net = TestNet::new(0); + // Create 3 synced peers and 1 peer trying to warp sync. 
+ net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(Default::default()); + net.add_full_peer_with_config(FullPeerConfig { + sync_mode: SyncMode::Warp, + ..Default::default() + }); + let gap_end = net.peer(0).push_blocks(63, false); + net.peer(0).push_blocks(1, false); + net.peer(1).push_blocks(64, false); + net.peer(2).push_blocks(64, false); + // Wait for peer 1 to sync state. + net.block_until_sync(); + assert!(!net.peer(3).client().has_state_at(&BlockId::Number(1))); + assert!(net.peer(3).client().has_state_at(&BlockId::Number(64))); + + // Wait for peer 1 download block history + block_on(futures::future::poll_fn::<(), _>(|cx| { + net.poll(cx); + if net.peer(3).has_block(&gap_end) { + Poll::Ready(()) + } else { + Poll::Pending + } + })); +} + #[test] fn syncs_huge_blocks() { use sp_core::storage::well_known_keys::HEAP_PAGES; diff --git a/substrate/client/service/src/chain_ops/check_block.rs b/substrate/client/service/src/chain_ops/check_block.rs index 4728e01454..5e2a9faaf0 100644 --- a/substrate/client/service/src/chain_ops/check_block.rs +++ b/substrate/client/service/src/chain_ops/check_block.rs @@ -19,7 +19,7 @@ use crate::error::Error; use codec::Encode; use futures::{future, prelude::*}; -use sc_client_api::{BlockBackend, UsageProvider}; +use sc_client_api::{BlockBackend, HeaderBackend}; use sc_consensus::import_queue::ImportQueue; use sp_runtime::{generic::BlockId, traits::Block as BlockT}; @@ -33,7 +33,7 @@ pub fn check_block( block_id: BlockId, ) -> Pin> + Send>> where - C: BlockBackend + UsageProvider + Send + Sync + 'static, + C: BlockBackend + HeaderBackend + Send + Sync + 'static, B: BlockT + for<'de> serde::Deserialize<'de>, IQ: ImportQueue + 'static, { diff --git a/substrate/client/service/src/chain_ops/import_blocks.rs b/substrate/client/service/src/chain_ops/import_blocks.rs index 1ba9e0bd61..a408a06a81 100644 --- 
a/substrate/client/service/src/chain_ops/import_blocks.rs +++ b/substrate/client/service/src/chain_ops/import_blocks.rs @@ -22,7 +22,7 @@ use futures::{future, prelude::*}; use futures_timer::Delay; use log::{info, warn}; use sc_chain_spec::ChainSpec; -use sc_client_api::UsageProvider; +use sc_client_api::HeaderBackend; use sc_consensus::import_queue::{ BlockImportError, BlockImportStatus, ImportQueue, IncomingBlock, Link, }; @@ -296,7 +296,7 @@ pub fn import_blocks( binary: bool, ) -> Pin> + Send>> where - C: UsageProvider + Send + Sync + 'static, + C: HeaderBackend + Send + Sync + 'static, B: BlockT + for<'de> serde::Deserialize<'de>, IQ: ImportQueue + 'static, { @@ -438,7 +438,7 @@ where info!( "🎉 Imported {} blocks. Best: #{}", read_block_count, - client.usage_info().chain.best_number + client.info().best_number ); return Poll::Ready(Ok(())) } else { @@ -469,7 +469,7 @@ where queue.poll_actions(cx, &mut link); - let best_number = client.usage_info().chain.best_number; + let best_number = client.info().best_number; speedometer.notify_user(best_number); if link.has_error { diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index f7d93d036a..d35c0462b8 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -684,8 +684,6 @@ where .. } = import_block; - assert!(justifications.is_some() && finalized || justifications.is_none()); - if !intermediates.is_empty() { return Err(Error::IncompletePipeline) } @@ -779,11 +777,17 @@ where } let info = self.backend.blockchain().info(); + let gap_block = info + .block_gap + .map_or(false, |(start, _)| *import_headers.post().number() == start); + + assert!(justifications.is_some() && finalized || justifications.is_none() || gap_block); // the block is lower than our last finalized block so it must revert // finality, refusing import. 
if status == blockchain::BlockStatus::Unknown && - *import_headers.post().number() <= info.finalized_number + *import_headers.post().number() <= info.finalized_number && + !gap_block { return Err(sp_blockchain::Error::NotInFinalizedChain) } @@ -854,12 +858,13 @@ where None => None, }; - let is_new_best = finalized || - match fork_choice { - ForkChoiceStrategy::LongestChain => - import_headers.post().number() > &info.best_number, - ForkChoiceStrategy::Custom(v) => v, - }; + let is_new_best = !gap_block && + (finalized || + match fork_choice { + ForkChoiceStrategy::LongestChain => + import_headers.post().number() > &info.best_number, + ForkChoiceStrategy::Custom(v) => v, + }); let leaf_state = if finalized { NewBlockState::Final diff --git a/substrate/primitives/blockchain/src/backend.rs b/substrate/primitives/blockchain/src/backend.rs index bb34a0449b..fc70ce845d 100644 --- a/substrate/primitives/blockchain/src/backend.rs +++ b/substrate/primitives/blockchain/src/backend.rs @@ -281,6 +281,8 @@ pub struct Info { pub finalized_state: Option<(Block::Hash, <::Header as HeaderT>::Number)>, /// Number of concurrent leave forks. pub number_leaves: usize, + /// Missing blocks after warp sync. (start, end). + pub block_gap: Option<(NumberFor, NumberFor)>, } /// Block status.