Warp sync part II (#9284)

* Gap sync

* Gap epoch test

* Simplified network requests

* Update client/db/src/utils.rs

Co-authored-by: cheme <emericchevalier.pro@gmail.com>

* Fixed v1 migration and added some comments

* Next epoch is always regular

* Removed fork tree change

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Added a comment and converted assert to error

Co-authored-by: cheme <emericchevalier.pro@gmail.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Arkadiy Paronyan
2021-10-07 11:31:39 +02:00
committed by GitHub
parent 9f1c3acb7d
commit e6ff531d0b
29 changed files with 800 additions and 169 deletions
@@ -179,6 +179,7 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
let warp_sync = Arc::new(sc_finality_grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
grandpa_link.shared_authority_set().clone(),
Vec::default(),
));
let (network, system_rpc_tx, network_starter) =
@@ -409,6 +410,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
let warp_sync = Arc::new(sc_finality_grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
grandpa_link.shared_authority_set().clone(),
Vec::default(),
));
let (network, system_rpc_tx, network_starter) =
+2
View File
@@ -244,6 +244,7 @@ pub fn new_full_base(
let warp_sync = Arc::new(grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
import_setup.1.shared_authority_set().clone(),
Vec::default(),
));
let (network, system_rpc_tx, network_starter) =
@@ -531,6 +532,7 @@ pub fn new_light_base(
let warp_sync = Arc::new(grandpa::warp_proof::NetworkProvider::new(
backend.clone(),
grandpa_link.shared_authority_set().clone(),
Vec::default(),
));
let (network, system_rpc_tx, network_starter) =
+1
View File
@@ -369,6 +369,7 @@ impl<Block: BlockT> HeaderBackend<Block> for Blockchain<Block> {
None
},
number_leaves: storage.leaves.count(),
block_gap: None,
}
}
@@ -73,6 +73,7 @@ impl<Block: BlockT> HeaderBackend<Block> for TestApi {
genesis_hash: Default::default(),
number_leaves: Default::default(),
finalized_state: None,
block_gap: None,
}
}
@@ -21,7 +21,7 @@ use crate::{
params::{BlockNumberOrHash, ImportParams, SharedParams},
CliConfiguration,
};
use sc_client_api::{BlockBackend, UsageProvider};
use sc_client_api::{BlockBackend, HeaderBackend};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use std::{fmt::Debug, str::FromStr, sync::Arc};
use structopt::StructOpt;
@@ -53,7 +53,7 @@ impl CheckBlockCmd {
pub async fn run<B, C, IQ>(&self, client: Arc<C>, import_queue: IQ) -> error::Result<()>
where
B: BlockT + for<'de> serde::Deserialize<'de>,
C: BlockBackend<B> + UsageProvider<B> + Send + Sync + 'static,
C: BlockBackend<B> + HeaderBackend<B> + Send + Sync + 'static,
IQ: sc_service::ImportQueue<B> + 'static,
B::Hash: FromStr,
<B::Hash as FromStr>::Err: Debug,
@@ -21,7 +21,7 @@ use crate::{
params::{ImportParams, SharedParams},
CliConfiguration,
};
use sc_client_api::UsageProvider;
use sc_client_api::HeaderBackend;
use sc_service::chain_ops::import_blocks;
use sp_runtime::traits::Block as BlockT;
use std::{
@@ -68,7 +68,7 @@ impl ImportBlocksCmd {
/// Run the import-blocks command
pub async fn run<B, C, IQ>(&self, client: Arc<C>, import_queue: IQ) -> error::Result<()>
where
C: UsageProvider<B> + Send + Sync + 'static,
C: HeaderBackend<B> + Send + Sync + 'static,
B: BlockT + for<'de> serde::Deserialize<'de>,
IQ: sc_service::ImportQueue<B> + 'static,
{
@@ -23,14 +23,17 @@ use log::info;
use crate::{migration::EpochV0, Epoch};
use sc_client_api::backend::AuxStore;
use sc_consensus_epochs::{migration::EpochChangesForV0, EpochChangesFor, SharedEpochChanges};
use sc_consensus_epochs::{
migration::{EpochChangesV0For, EpochChangesV1For},
EpochChangesFor, SharedEpochChanges,
};
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_consensus_babe::{BabeBlockWeight, BabeGenesisConfiguration};
use sp_runtime::traits::Block as BlockT;
const BABE_EPOCH_CHANGES_VERSION: &[u8] = b"babe_epoch_changes_version";
const BABE_EPOCH_CHANGES_KEY: &[u8] = b"babe_epoch_changes";
const BABE_EPOCH_CHANGES_CURRENT_VERSION: u32 = 2;
const BABE_EPOCH_CHANGES_CURRENT_VERSION: u32 = 3;
/// The aux storage key used to store the block weight of the given block hash.
pub fn block_weight_key<H: Encode>(block_hash: H) -> Vec<u8> {
@@ -60,11 +63,16 @@ pub fn load_epoch_changes<Block: BlockT, B: AuxStore>(
let maybe_epoch_changes = match version {
None =>
load_decode::<_, EpochChangesForV0<Block, EpochV0>>(backend, BABE_EPOCH_CHANGES_KEY)?
load_decode::<_, EpochChangesV0For<Block, EpochV0>>(backend, BABE_EPOCH_CHANGES_KEY)?
.map(|v0| v0.migrate().map(|_, _, epoch| epoch.migrate(config))),
Some(1) =>
load_decode::<_, EpochChangesFor<Block, EpochV0>>(backend, BABE_EPOCH_CHANGES_KEY)?
.map(|v1| v1.map(|_, _, epoch| epoch.migrate(config))),
load_decode::<_, EpochChangesV1For<Block, EpochV0>>(backend, BABE_EPOCH_CHANGES_KEY)?
.map(|v1| v1.migrate().map(|_, _, epoch| epoch.migrate(config))),
Some(2) => {
// v2 still uses `EpochChanges` v1 format but with a different `Epoch` type.
load_decode::<_, EpochChangesV1For<Block, Epoch>>(backend, BABE_EPOCH_CHANGES_KEY)?
.map(|v2| v2.migrate())
},
Some(BABE_EPOCH_CHANGES_CURRENT_VERSION) =>
load_decode::<_, EpochChangesFor<Block, Epoch>>(backend, BABE_EPOCH_CHANGES_KEY)?,
Some(other) =>
@@ -164,7 +172,7 @@ mod test {
.insert_aux(
&[(
BABE_EPOCH_CHANGES_KEY,
&EpochChangesForV0::<TestBlock, EpochV0>::from_raw(v0_tree).encode()[..],
&EpochChangesV0For::<TestBlock, EpochV0>::from_raw(v0_tree).encode()[..],
)],
&[],
)
@@ -202,6 +210,6 @@ mod test {
client.insert_aux(values, &[]).unwrap();
});
assert_eq!(load_decode::<_, u32>(&client, BABE_EPOCH_CHANGES_VERSION).unwrap(), Some(2));
assert_eq!(load_decode::<_, u32>(&client, BABE_EPOCH_CHANGES_VERSION).unwrap(), Some(3));
}
}
+9 -2
View File
@@ -1578,8 +1578,12 @@ where
*block.header.parent_hash(),
next_epoch,
)
.map_err(|e| ConsensusError::ClientImport(format!("{:?}", e)))?;
.map_err(|e| {
ConsensusError::ClientImport(format!(
"Error importing epoch changes: {:?}",
e
))
})?;
Ok(())
};
@@ -1667,6 +1671,9 @@ where
Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
{
let info = client.info();
if info.block_gap.is_none() {
epoch_changes.clear_gap();
}
let finalized_slot = {
let finalized_header = client
+295 -31
View File
@@ -78,11 +78,11 @@ where
///
/// Once an epoch is created, it must have a known `start_slot` and `end_slot`, which cannot be
/// changed. Consensus engine may modify any other data in the epoch, if needed.
pub trait Epoch {
pub trait Epoch: std::fmt::Debug {
/// Descriptor for the next epoch.
type NextEpochDescriptor;
/// Type of the slot number.
type Slot: Ord + Copy;
type Slot: Ord + Copy + std::fmt::Debug;
/// The starting slot of the epoch.
fn start_slot(&self) -> Self::Slot;
@@ -228,7 +228,7 @@ impl<Hash, Number, E: Epoch> ViableEpochDescriptor<Hash, Number, E> {
}
/// Persisted epoch stored in EpochChanges.
#[derive(Clone, Encode, Decode)]
#[derive(Clone, Encode, Decode, Debug)]
pub enum PersistedEpoch<E: Epoch> {
/// Genesis persisted epoch data. epoch_0, epoch_1.
Genesis(E, E),
@@ -246,8 +246,23 @@ impl<'a, E: Epoch> From<&'a PersistedEpoch<E>> for PersistedEpochHeader<E> {
}
}
impl<E: Epoch> PersistedEpoch<E> {
/// Map the epoch to a different type using a conversion function.
pub fn map<B, F, Hash, Number>(self, h: &Hash, n: &Number, f: &mut F) -> PersistedEpoch<B>
where
B: Epoch<Slot = E::Slot>,
F: FnMut(&Hash, &Number, E) -> B,
{
match self {
PersistedEpoch::Genesis(epoch_0, epoch_1) =>
PersistedEpoch::Genesis(f(h, n, epoch_0), f(h, n, epoch_1)),
PersistedEpoch::Regular(epoch_n) => PersistedEpoch::Regular(f(h, n, epoch_n)),
}
}
}
/// Persisted epoch header stored in ForkTree.
#[derive(Encode, Decode, PartialEq, Eq)]
#[derive(Encode, Decode, PartialEq, Eq, Debug)]
pub enum PersistedEpochHeader<E: Epoch> {
/// Genesis persisted epoch header. epoch_0, epoch_1.
Genesis(EpochHeader<E>, EpochHeader<E>),
@@ -264,6 +279,25 @@ impl<E: Epoch> Clone for PersistedEpochHeader<E> {
}
}
impl<E: Epoch> PersistedEpochHeader<E> {
/// Map the epoch header to a different type.
pub fn map<B>(self) -> PersistedEpochHeader<B>
where
B: Epoch<Slot = E::Slot>,
{
match self {
PersistedEpochHeader::Genesis(epoch_0, epoch_1) => PersistedEpochHeader::Genesis(
EpochHeader { start_slot: epoch_0.start_slot, end_slot: epoch_0.end_slot },
EpochHeader { start_slot: epoch_1.start_slot, end_slot: epoch_1.end_slot },
),
PersistedEpochHeader::Regular(epoch_n) => PersistedEpochHeader::Regular(EpochHeader {
start_slot: epoch_n.start_slot,
end_slot: epoch_n.end_slot,
}),
}
}
}
/// A fresh, incremented epoch to import into the underlying fork-tree.
///
/// Create this with `ViableEpoch::increment`.
@@ -279,6 +313,106 @@ impl<E: Epoch> AsRef<E> for IncrementedEpoch<E> {
}
}
/// A pair of epochs for the gap block download validation.
/// Block gap is created after the warp sync is complete. Blocks
/// are imported both at the tip of the chain and at the start of the gap.
/// This holds a pair of epochs that are required to validate headers
/// at the start of the gap. Since gap download does not allow forks we don't
/// need to keep a tree of epochs.
#[derive(Clone, Encode, Decode, Debug)]
pub struct GapEpochs<Hash, Number, E: Epoch> {
current: (Hash, Number, PersistedEpoch<E>),
next: Option<(Hash, Number, E)>,
}
impl<Hash, Number, E> GapEpochs<Hash, Number, E>
where
Hash: Copy + PartialEq + std::fmt::Debug,
Number: Copy + PartialEq + std::fmt::Debug,
E: Epoch,
{
/// Check if given slot matches one of the gap epochs.
/// Returns epoch identifier if it does.
fn matches(
&self,
slot: E::Slot,
) -> Option<(Hash, Number, EpochHeader<E>, EpochIdentifierPosition)> {
match &self.current {
(_, _, PersistedEpoch::Genesis(epoch_0, _))
if slot >= epoch_0.start_slot() && slot < epoch_0.end_slot() =>
return Some((
self.current.0,
self.current.1,
epoch_0.into(),
EpochIdentifierPosition::Genesis0,
)),
(_, _, PersistedEpoch::Genesis(_, epoch_1))
if slot >= epoch_1.start_slot() && slot < epoch_1.end_slot() =>
return Some((
self.current.0,
self.current.1,
epoch_1.into(),
EpochIdentifierPosition::Genesis1,
)),
(_, _, PersistedEpoch::Regular(epoch_n))
if slot >= epoch_n.start_slot() && slot < epoch_n.end_slot() =>
return Some((
self.current.0,
self.current.1,
epoch_n.into(),
EpochIdentifierPosition::Regular,
)),
_ => {},
};
match &self.next {
Some((h, n, epoch_n)) if slot >= epoch_n.start_slot() && slot < epoch_n.end_slot() =>
Some((*h, *n, epoch_n.into(), EpochIdentifierPosition::Regular)),
_ => None,
}
}
/// Returns epoch data if it matches given identifier.
pub fn epoch(&self, id: &EpochIdentifier<Hash, Number>) -> Option<&E> {
match (&self.current, &self.next) {
((h, n, e), _) if h == &id.hash && n == &id.number => match e {
PersistedEpoch::Genesis(ref epoch_0, _)
if id.position == EpochIdentifierPosition::Genesis0 =>
Some(epoch_0),
PersistedEpoch::Genesis(_, ref epoch_1)
if id.position == EpochIdentifierPosition::Genesis1 =>
Some(epoch_1),
PersistedEpoch::Regular(ref epoch_n)
if id.position == EpochIdentifierPosition::Regular =>
Some(epoch_n),
_ => None,
},
(_, Some((h, n, e)))
if h == &id.hash &&
n == &id.number && id.position == EpochIdentifierPosition::Regular =>
Some(e),
_ => None,
}
}
/// Import a new gap epoch, potentially replacing an old epoch.
fn import(&mut self, slot: E::Slot, hash: Hash, number: Number, epoch: E) -> Result<(), E> {
match (&mut self.current, &mut self.next) {
((_, _, PersistedEpoch::Genesis(_, epoch_1)), _) if slot == epoch_1.end_slot() => {
self.next = Some((hash, number, epoch));
Ok(())
},
(_, Some((_, _, epoch_n))) if slot == epoch_n.end_slot() => {
let (cur_h, cur_n, cur_epoch) =
self.next.take().expect("Already matched as `Some`");
self.current = (cur_h, cur_n, PersistedEpoch::Regular(cur_epoch));
self.next = Some((hash, number, epoch));
Ok(())
},
_ => Err(epoch),
}
}
}
/// Tree of all epoch changes across all *seen* forks. Data stored in tree is
/// the hash and block number of the block signaling the epoch change, and the
/// epoch that was signalled at that block.
@@ -294,10 +428,14 @@ impl<E: Epoch> AsRef<E> for IncrementedEpoch<E> {
/// same DAG entry, pinned to a specific block #1.
///
/// Further epochs (epoch_2, ..., epoch_n) each get their own entry.
#[derive(Clone, Encode, Decode)]
///
/// Also maintains a pair of epochs for the start of the gap,
/// as long as there's an active gap download after a warp sync.
#[derive(Clone, Encode, Decode, Debug)]
pub struct EpochChanges<Hash, Number, E: Epoch> {
inner: ForkTree<Hash, Number, PersistedEpochHeader<E>>,
epochs: BTreeMap<(Hash, Number), PersistedEpoch<E>>,
gap: Option<GapEpochs<Hash, Number, E>>,
}
// create a fake header hash which hasn't been included in the chain.
@@ -315,14 +453,14 @@ where
Number: Ord,
{
fn default() -> Self {
EpochChanges { inner: ForkTree::new(), epochs: BTreeMap::new() }
EpochChanges { inner: ForkTree::new(), epochs: BTreeMap::new(), gap: None }
}
}
impl<Hash, Number, E: Epoch> EpochChanges<Hash, Number, E>
where
Hash: PartialEq + Ord + AsRef<[u8]> + AsMut<[u8]> + Copy,
Number: Ord + One + Zero + Add<Output = Number> + Sub<Output = Number> + Copy,
Hash: PartialEq + Ord + AsRef<[u8]> + AsMut<[u8]> + Copy + std::fmt::Debug,
Number: Ord + One + Zero + Add<Output = Number> + Sub<Output = Number> + Copy + std::fmt::Debug,
{
/// Create a new epoch change.
pub fn new() -> Self {
@@ -335,6 +473,11 @@ where
self.inner.rebalance()
}
/// Clear gap epochs if any.
pub fn clear_gap(&mut self) {
self.gap = None;
}
/// Map the epoch changes from one storing data to a different one.
pub fn map<B, F>(self, mut f: F) -> EpochChanges<Hash, Number, B>
where
@@ -342,31 +485,15 @@ where
F: FnMut(&Hash, &Number, E) -> B,
{
EpochChanges {
inner: self.inner.map(&mut |_, _, header| match header {
PersistedEpochHeader::Genesis(epoch_0, epoch_1) => PersistedEpochHeader::Genesis(
EpochHeader { start_slot: epoch_0.start_slot, end_slot: epoch_0.end_slot },
EpochHeader { start_slot: epoch_1.start_slot, end_slot: epoch_1.end_slot },
),
PersistedEpochHeader::Regular(epoch_n) =>
PersistedEpochHeader::Regular(EpochHeader {
start_slot: epoch_n.start_slot,
end_slot: epoch_n.end_slot,
}),
inner: self.inner.map(&mut |_, _, header: PersistedEpochHeader<E>| header.map()),
gap: self.gap.map(|GapEpochs { current: (h, n, header), next }| GapEpochs {
current: (h, n, header.map(&h, &n, &mut f)),
next: next.map(|(h, n, e)| (h, n, f(&h, &n, e))),
}),
epochs: self
.epochs
.into_iter()
.map(|((hash, number), epoch)| {
let bepoch = match epoch {
PersistedEpoch::Genesis(epoch_0, epoch_1) => PersistedEpoch::Genesis(
f(&hash, &number, epoch_0),
f(&hash, &number, epoch_1),
),
PersistedEpoch::Regular(epoch_n) =>
PersistedEpoch::Regular(f(&hash, &number, epoch_n)),
};
((hash, number), bepoch)
})
.map(|((hash, number), epoch)| ((hash, number), epoch.map(&hash, &number, &mut f)))
.collect(),
}
}
@@ -402,6 +529,9 @@ where
/// Get a reference to an epoch with given identifier.
pub fn epoch(&self, id: &EpochIdentifier<Hash, Number>) -> Option<&E> {
if let Some(e) = &self.gap.as_ref().and_then(|gap| gap.epoch(id)) {
return Some(e)
}
self.epochs.get(&(id.hash, id.number)).and_then(|v| match v {
PersistedEpoch::Genesis(ref epoch_0, _)
if id.position == EpochIdentifierPosition::Genesis0 =>
@@ -537,6 +667,15 @@ where
return Ok(Some(ViableEpochDescriptor::UnimportedGenesis(slot)))
}
if let Some(gap) = &self.gap {
if let Some((hash, number, hdr, position)) = gap.matches(slot) {
return Ok(Some(ViableEpochDescriptor::Signaled(
EpochIdentifier { position, hash, number },
hdr,
)))
}
}
// We want to find the deepest node in the tree which is an ancestor
// of our block and where the start slot of the epoch was before the
// slot of our block. The genesis special-case doesn't need to look
@@ -598,13 +737,30 @@ where
) -> Result<(), fork_tree::Error<D::Error>> {
let is_descendent_of =
descendent_of_builder.build_is_descendent_of(Some((hash, parent_hash)));
let header = PersistedEpochHeader::<E>::from(&epoch.0);
let slot = epoch.as_ref().start_slot();
let IncrementedEpoch(mut epoch) = epoch;
let header = PersistedEpochHeader::<E>::from(&epoch);
if let Some(gap) = &mut self.gap {
if let PersistedEpoch::Regular(e) = epoch {
epoch = match gap.import(slot, hash.clone(), number.clone(), e) {
Ok(()) => return Ok(()),
Err(e) => PersistedEpoch::Regular(e),
}
}
} else if !self.epochs.is_empty() && matches!(epoch, PersistedEpoch::Genesis(_, _)) {
// There's a genesis epoch imported when we already have an active epoch.
// This happens after the warp sync as the ancient blocks download start.
// We need to start tracking gap epochs here.
self.gap = Some(GapEpochs { current: (hash, number, epoch), next: None });
return Ok(())
}
let res = self.inner.import(hash, number, header, &is_descendent_of);
match res {
Ok(_) | Err(fork_tree::Error::Duplicate) => {
self.epochs.insert((hash, number), epoch.0);
self.epochs.insert((hash, number), epoch);
Ok(())
},
Err(e) => Err(e),
@@ -916,4 +1072,112 @@ mod tests {
assert!(epoch_for_x_child_before_genesis.is_none());
}
}
#[test]
fn gap_epochs_advance() {
// 0 - 1 - 2 - 3 - .... 42 - 43
let is_descendent_of = |base: &Hash, block: &Hash| -> Result<bool, TestError> {
match (base, *block) {
(b"0", _) => Ok(true),
(b"1", b) => Ok(b == *b"0"),
(b"2", b) => Ok(b == *b"1"),
(b"3", b) => Ok(b == *b"2"),
_ => Ok(false),
}
};
let duration = 100;
let make_genesis = |slot| Epoch { start_slot: slot, duration };
let mut epoch_changes = EpochChanges::new();
let next_descriptor = ();
let epoch42 = Epoch { start_slot: 42, duration: 100 };
let epoch43 = Epoch { start_slot: 43, duration: 100 };
epoch_changes.reset(*b"0", *b"1", 4200, epoch42, epoch43);
assert!(epoch_changes.gap.is_none());
// Import a new genesis epoch, this should crate the gap.
let genesis_epoch_a_descriptor = epoch_changes
.epoch_descriptor_for_child_of(&is_descendent_of, b"0", 0, 100)
.unwrap()
.unwrap();
let incremented_epoch = epoch_changes
.viable_epoch(&genesis_epoch_a_descriptor, &make_genesis)
.unwrap()
.increment(next_descriptor.clone());
epoch_changes
.import(&is_descendent_of, *b"1", 1, *b"0", incremented_epoch)
.unwrap();
assert!(epoch_changes.gap.is_some());
let genesis_epoch = epoch_changes
.epoch_descriptor_for_child_of(&is_descendent_of, b"0", 0, 100)
.unwrap()
.unwrap();
assert_eq!(genesis_epoch, ViableEpochDescriptor::UnimportedGenesis(100));
// Import more epochs and check that gap advances.
let import_epoch_1 =
epoch_changes.viable_epoch(&genesis_epoch, &make_genesis).unwrap().increment(());
let epoch_1 = import_epoch_1.as_ref().clone();
epoch_changes
.import(&is_descendent_of, *b"1", 1, *b"0", import_epoch_1)
.unwrap();
let genesis_epoch_data = epoch_changes.epoch_data(&genesis_epoch, &make_genesis).unwrap();
let end_slot = genesis_epoch_data.end_slot();
let x = epoch_changes
.epoch_data_for_child_of(&is_descendent_of, b"1", 1, end_slot, &make_genesis)
.unwrap()
.unwrap();
assert_eq!(x, epoch_1);
assert_eq!(epoch_changes.gap.as_ref().unwrap().current.0, *b"1");
assert!(epoch_changes.gap.as_ref().unwrap().next.is_none());
let epoch_1_desriptor = epoch_changes
.epoch_descriptor_for_child_of(&is_descendent_of, b"1", 1, end_slot)
.unwrap()
.unwrap();
let epoch_1 = epoch_changes.epoch_data(&epoch_1_desriptor, &make_genesis).unwrap();
let import_epoch_2 = epoch_changes
.viable_epoch(&epoch_1_desriptor, &make_genesis)
.unwrap()
.increment(());
let epoch_2 = import_epoch_2.as_ref().clone();
epoch_changes
.import(&is_descendent_of, *b"2", 2, *b"1", import_epoch_2)
.unwrap();
let end_slot = epoch_1.end_slot();
let x = epoch_changes
.epoch_data_for_child_of(&is_descendent_of, b"2", 2, end_slot, &make_genesis)
.unwrap()
.unwrap();
assert_eq!(epoch_changes.gap.as_ref().unwrap().current.0, *b"1");
assert_eq!(epoch_changes.gap.as_ref().unwrap().next.as_ref().unwrap().0, *b"2");
assert_eq!(x, epoch_2);
let epoch_2_desriptor = epoch_changes
.epoch_descriptor_for_child_of(&is_descendent_of, b"2", 2, end_slot)
.unwrap()
.unwrap();
let import_epoch_3 = epoch_changes
.viable_epoch(&epoch_2_desriptor, &make_genesis)
.unwrap()
.increment(());
epoch_changes
.import(&is_descendent_of, *b"3", 3, *b"2", import_epoch_3)
.unwrap();
assert_eq!(epoch_changes.gap.as_ref().unwrap().current.0, *b"2");
epoch_changes.clear_gap();
assert!(epoch_changes.gap.is_none());
}
}
@@ -30,9 +30,19 @@ pub struct EpochChangesV0<Hash, Number, E: Epoch> {
inner: ForkTree<Hash, Number, PersistedEpoch<E>>,
}
/// Type alias for legacy definition of epoch changes.
pub type EpochChangesForV0<Block, Epoch> =
/// Legacy definition of epoch changes.
#[derive(Clone, Encode, Decode)]
pub struct EpochChangesV1<Hash, Number, E: Epoch> {
inner: ForkTree<Hash, Number, PersistedEpochHeader<E>>,
epochs: BTreeMap<(Hash, Number), PersistedEpoch<E>>,
}
/// Type alias for v0 definition of epoch changes.
pub type EpochChangesV0For<Block, Epoch> =
EpochChangesV0<<Block as BlockT>::Hash, NumberFor<Block>, Epoch>;
/// Type alias for v1 and v2 definition of epoch changes.
pub type EpochChangesV1For<Block, Epoch> =
EpochChangesV1<<Block as BlockT>::Hash, NumberFor<Block>, Epoch>;
impl<Hash, Number, E: Epoch> EpochChangesV0<Hash, Number, E>
where
@@ -54,6 +64,17 @@ where
header
});
EpochChanges { inner, epochs }
EpochChanges { inner, epochs, gap: None }
}
}
impl<Hash, Number, E: Epoch> EpochChangesV1<Hash, Number, E>
where
Hash: PartialEq + Ord + Copy,
Number: Ord + Copy,
{
/// Migrate the type into current epoch changes definition.
pub fn migrate(self) -> EpochChanges<Hash, Number, E> {
EpochChanges { inner: self.inner, epochs: self.epochs, gap: None }
}
}
+46 -3
View File
@@ -502,6 +502,11 @@ impl<Block: BlockT> BlockchainDb<Block> {
}
}
fn update_block_gap(&self, gap: Option<(NumberFor<Block>, NumberFor<Block>)>) {
let mut meta = self.meta.write();
meta.block_gap = gap;
}
// Get block changes trie root, if available.
fn changes_trie_root(&self, block: BlockId<Block>) -> ClientResult<Option<Block::Hash>> {
self.header(block).map(|header| {
@@ -538,6 +543,7 @@ impl<Block: BlockT> sc_client_api::blockchain::HeaderBackend<Block> for Blockcha
finalized_number: meta.finalized_number,
finalized_state: meta.finalized_state.clone(),
number_leaves: self.leaves.read().count(),
block_gap: meta.block_gap,
}
}
@@ -1388,9 +1394,10 @@ impl<Block: BlockT> Backend<Block> {
operation.apply_offchain(&mut transaction);
let mut meta_updates = Vec::with_capacity(operation.finalized_blocks.len());
let mut last_finalized_hash = self.blockchain.meta.read().finalized_hash;
let mut last_finalized_num = self.blockchain.meta.read().finalized_number;
let best_num = self.blockchain.meta.read().best_number;
let (best_num, mut last_finalized_hash, mut last_finalized_num, mut block_gap) = {
let meta = self.blockchain.meta.read();
(meta.best_number, meta.finalized_hash, meta.finalized_number, meta.block_gap.clone())
};
let mut changes_trie_cache_ops = None;
for (block, justification) in operation.finalized_blocks {
@@ -1639,6 +1646,41 @@ impl<Block: BlockT> Backend<Block> {
children,
);
}
if let Some((mut start, end)) = block_gap {
if number == start {
start += One::one();
utils::insert_number_to_key_mapping(
&mut transaction,
columns::KEY_LOOKUP,
number,
hash,
)?;
}
if start > end {
transaction.remove(columns::META, meta_keys::BLOCK_GAP);
block_gap = None;
debug!(target: "db", "Removed block gap.");
} else {
block_gap = Some((start, end));
debug!(target: "db", "Update block gap. {:?}", block_gap);
transaction.set(
columns::META,
meta_keys::BLOCK_GAP,
&(start, end).encode(),
);
}
} else if number > best_num + One::one() &&
number > One::one() && self
.blockchain
.header(BlockId::hash(parent_hash))?
.is_none()
{
let gap = (best_num + One::one(), number - One::one());
transaction.set(columns::META, meta_keys::BLOCK_GAP, &gap.encode());
block_gap = Some(gap);
debug!(target: "db", "Detected block gap {:?}", block_gap);
}
}
meta_updates.push(MetaUpdate {
@@ -1716,6 +1758,7 @@ impl<Block: BlockT> Backend<Block> {
for m in meta_updates {
self.blockchain.update_meta(m);
}
self.blockchain.update_block_gap(block_gap);
Ok(())
}
+1
View File
@@ -157,6 +157,7 @@ where
None
},
number_leaves: 1,
block_gap: None,
}
}
+11 -1
View File
@@ -54,6 +54,8 @@ pub mod meta_keys {
pub const FINALIZED_BLOCK: &[u8; 5] = b"final";
/// Last finalized state key.
pub const FINALIZED_STATE: &[u8; 6] = b"fstate";
/// Block gap.
pub const BLOCK_GAP: &[u8; 3] = b"gap";
/// Meta information prefix for list-based caches.
pub const CACHE_META_PREFIX: &[u8; 5] = b"cache";
/// Meta information for changes tries key.
@@ -81,6 +83,8 @@ pub struct Meta<N, H> {
pub genesis_hash: H,
/// Finalized state, if any
pub finalized_state: Option<(H, N)>,
/// Block gap, start and end inclusive, if any.
pub block_gap: Option<(N, N)>,
}
/// A block lookup key: used for canonical lookup from block number to hash
@@ -527,6 +531,7 @@ where
finalized_number: Zero::zero(),
genesis_hash: Default::default(),
finalized_state: None,
block_gap: None,
}),
};
@@ -541,7 +546,7 @@ where
"Opened blockchain db, fetched {} = {:?} ({})",
desc,
hash,
header.number()
header.number(),
);
Ok((hash, *header.number()))
} else {
@@ -558,6 +563,10 @@ where
} else {
None
};
let block_gap = db
.get(COLUMN_META, meta_keys::BLOCK_GAP)
.and_then(|d| Decode::decode(&mut d.as_slice()).ok());
debug!(target: "db", "block_gap={:?}", block_gap);
Ok(Meta {
best_hash,
@@ -566,6 +575,7 @@ where
finalized_number,
genesis_hash,
finalized_state,
block_gap,
})
}
@@ -168,7 +168,7 @@ pub struct AuthoritySet<H, N> {
/// Track at which blocks the set id changed. This is useful when we need to prove finality for
/// a given block since we can figure out what set the block belongs to and when the set
/// started/ended.
authority_set_changes: AuthoritySetChanges<N>,
pub(crate) authority_set_changes: AuthoritySetChanges<N>,
}
impl<H, N> AuthoritySet<H, N>
@@ -714,6 +714,17 @@ impl<N: Ord + Clone> AuthoritySetChanges<N> {
}
}
pub(crate) fn insert(&mut self, block_number: N) {
let idx = self
.0
.binary_search_by_key(&block_number, |(_, n)| n.clone())
.unwrap_or_else(|b| b);
let set_id = if idx == 0 { 0 } else { self.0[idx - 1].0 + 1 };
assert!(idx == self.0.len() || self.0[idx].0 != set_id);
self.0.insert(idx, (set_id, block_number));
}
/// Returns an iterator over all historical authority set changes starting at the given block
/// number (excluded). The iterator yields a tuple representing the set id and the block number
/// of the last block in that set.
@@ -1632,6 +1643,18 @@ mod tests {
assert_eq!(authorities.pending_forced_changes.first().unwrap().canon_hash, "D");
}
#[test]
fn authority_set_changes_insert() {
let mut authority_set_changes = AuthoritySetChanges::empty();
authority_set_changes.append(0, 41);
authority_set_changes.append(1, 81);
authority_set_changes.append(4, 121);
authority_set_changes.insert(101);
assert_eq!(authority_set_changes.get_set_id(100), AuthoritySetChangeId::Set(2, 101));
assert_eq!(authority_set_changes.get_set_id(101), AuthoritySetChangeId::Set(2, 101));
}
#[test]
fn authority_set_changes_for_complete_data() {
let mut authority_set_changes = AuthoritySetChanges::empty();
@@ -551,6 +551,32 @@ where
return self.import_state(block, new_cache).await
}
if number <= self.inner.info().finalized_number {
// Importing an old block. Just save justifications and authority set changes
if self.check_new_change(&block.header, hash).is_some() {
if block.justifications.is_none() {
return Err(ConsensusError::ClientImport(
"Justification required when importing \
an old block with authority set change."
.into(),
))
}
assert!(block.justifications.is_some());
let mut authority_set = self.authority_set.inner_locked();
authority_set.authority_set_changes.insert(number);
crate::aux_schema::update_authority_set::<Block, _, _>(
&authority_set,
None,
|insert| {
block
.auxiliary
.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
},
);
}
return (&*self.inner).import_block(block, new_cache).await
}
// on initial sync we will restrict logging under info to avoid spam.
let initial_sync = block.origin == BlockOrigin::NetworkInitialSync;
@@ -31,7 +31,7 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor, One},
};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
/// Warp proof processing error.
#[derive(Debug, derive_more::Display, derive_more::From)]
@@ -194,6 +194,7 @@ impl<Block: BlockT> WarpSyncProof<Block> {
&self,
set_id: SetId,
authorities: AuthorityList,
hard_forks: &HashMap<(Block::Hash, NumberFor<Block>), (SetId, AuthorityList)>,
) -> Result<(SetId, AuthorityList), Error>
where
NumberFor<Block>: BlockNumberOps,
@@ -202,26 +203,34 @@ impl<Block: BlockT> WarpSyncProof<Block> {
let mut current_authorities = authorities;
for (fragment_num, proof) in self.proofs.iter().enumerate() {
proof
.justification
.verify(current_set_id, &current_authorities)
.map_err(|err| Error::InvalidProof(err.to_string()))?;
let hash = proof.header.hash();
let number = *proof.header.number();
if proof.justification.target().1 != proof.header.hash() {
return Err(Error::InvalidProof(
"Mismatch between header and justification".to_owned(),
))
}
if let Some((set_id, list)) = hard_forks.get(&(hash.clone(), number)) {
current_set_id = *set_id;
current_authorities = list.clone();
} else {
proof
.justification
.verify(current_set_id, &current_authorities)
.map_err(|err| Error::InvalidProof(err.to_string()))?;
if let Some(scheduled_change) = find_scheduled_change::<Block>(&proof.header) {
current_authorities = scheduled_change.next_authorities;
current_set_id += 1;
} else if fragment_num != self.proofs.len() - 1 || !self.is_finished {
// Only the last fragment of the last proof message is allowed to be missing
// the authority set change.
return Err(Error::InvalidProof(
"Header is missing authority set change digest".to_string(),
))
if proof.justification.target().1 != hash {
return Err(Error::InvalidProof(
"Mismatch between header and justification".to_owned(),
))
}
if let Some(scheduled_change) = find_scheduled_change::<Block>(&proof.header) {
current_authorities = scheduled_change.next_authorities;
current_set_id += 1;
} else if fragment_num != self.proofs.len() - 1 || !self.is_finished {
// Only the last fragment of the last proof message is allowed to be missing the
// authority set change.
return Err(Error::InvalidProof(
"Header is missing authority set change digest".to_string(),
))
}
}
}
Ok((current_set_id, current_authorities))
@@ -235,6 +244,7 @@ where
{
backend: Arc<Backend>,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
hard_forks: HashMap<(Block::Hash, NumberFor<Block>), (SetId, AuthorityList)>,
}
impl<Block: BlockT, Backend: ClientBackend<Block>> NetworkProvider<Block, Backend>
@@ -245,8 +255,13 @@ where
pub fn new(
backend: Arc<Backend>,
authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
hard_forks: Vec<(SetId, (Block::Hash, NumberFor<Block>), AuthorityList)>,
) -> Self {
NetworkProvider { backend, authority_set }
NetworkProvider {
backend,
authority_set,
hard_forks: hard_forks.into_iter().map(|(s, hn, list)| (hn, (s, list))).collect(),
}
}
}
@@ -283,7 +298,7 @@ where
.map(|p| p.header.clone())
.ok_or_else(|| "Empty proof".to_string())?;
let (next_set_id, next_authorities) =
proof.verify(set_id, authorities).map_err(Box::new)?;
proof.verify(set_id, authorities, &self.hard_forks).map_err(Box::new)?;
if proof.is_finished {
Ok(VerificationResult::<Block>::Complete(next_set_id, next_authorities, last_header))
} else {
@@ -417,7 +432,8 @@ mod tests {
WarpSyncProof::generate(&*backend, genesis_hash, &authority_set_changes).unwrap();
// verifying the proof should yield the last set id and authorities
let (new_set_id, new_authorities) = warp_sync_proof.verify(0, genesis_authorities).unwrap();
let (new_set_id, new_authorities) =
warp_sync_proof.verify(0, genesis_authorities, &Default::default()).unwrap();
let expected_authorities = current_authorities
.iter()
+9 -3
View File
@@ -20,7 +20,7 @@ use crate::OutputFormat;
use ansi_term::Colour;
use log::info;
use sc_client_api::ClientInfo;
use sc_network::{NetworkStatus, SyncState};
use sc_network::{NetworkStatus, SyncState, WarpSyncPhase, WarpSyncProgress};
use sp_runtime::traits::{Block as BlockT, CheckedDiv, NumberFor, Saturating, Zero};
use std::{
convert::{TryFrom, TryInto},
@@ -97,11 +97,17 @@ impl<B: BlockT> InformantDisplay<B> {
net_status.state_sync,
net_status.warp_sync,
) {
(
_,
_,
_,
Some(WarpSyncProgress { phase: WarpSyncPhase::DownloadingBlocks(n), .. }),
) => ("", "Block history".into(), format!(", #{}", n)),
(_, _, _, Some(warp)) => (
"",
"Warping".into(),
format!(
", {}, ({:.2}) Mib",
", {}, {:.2} Mib",
warp.phase,
(warp.total_bytes as f32) / (1024f32 * 1024f32)
),
@@ -110,7 +116,7 @@ impl<B: BlockT> InformantDisplay<B> {
"⚙️ ",
"Downloading state".into(),
format!(
", {}%, ({:.2}) Mib",
", {}%, {:.2} Mib",
state.percentage,
(state.size as f32) / (1024f32 * 1024f32)
),
+1 -1
View File
@@ -328,5 +328,5 @@ pub struct NetworkStatus<B: BlockT> {
/// State sync in progress.
pub state_sync: Option<protocol::sync::StateDownloadProgress>,
/// Warp sync in progress.
pub warp_sync: Option<protocol::sync::WarpSyncProgress>,
pub warp_sync: Option<protocol::sync::WarpSyncProgress<B>>,
}
+2 -6
View File
@@ -710,8 +710,7 @@ impl<B: BlockT> Protocol<B> {
match self.sync.on_state_data(&peer_id, response) {
Ok(sync::OnStateData::Import(origin, block)) =>
CustomMessageOutcome::BlockImport(origin, vec![block]),
Ok(sync::OnStateData::Request(peer, req)) =>
prepare_state_request::<B>(&mut self.peers, peer, req),
Ok(sync::OnStateData::Continue) => CustomMessageOutcome::None,
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
@@ -728,10 +727,7 @@ impl<B: BlockT> Protocol<B> {
response: crate::warp_request_handler::EncodedProof,
) -> CustomMessageOutcome<B> {
match self.sync.on_warp_sync_data(&peer_id, response) {
Ok(sync::OnWarpSyncData::WarpProofRequest(peer, req)) =>
prepare_warp_sync_request::<B>(&mut self.peers, peer, req),
Ok(sync::OnWarpSyncData::StateRequest(peer, req)) =>
prepare_state_request::<B>(&mut self.peers, peer, req),
Ok(()) => CustomMessageOutcome::None,
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
+179 -43
View File
@@ -182,6 +182,12 @@ impl Default for PendingRequests {
}
}
struct GapSync<B: BlockT> {
blocks: BlockCollection<B>,
best_queued_number: NumberFor<B>,
target: NumberFor<B>,
}
/// The main data structure which contains all the state for a chains
/// active syncing strategy.
pub struct ChainSync<B: BlockT> {
@@ -226,6 +232,8 @@ pub struct ChainSync<B: BlockT> {
/// Enable importing existing blocks. This is used used after the state download to
/// catch up to the latest state while re-importing blocks.
import_existing: bool,
/// Gap download process.
gap_sync: Option<GapSync<B>>,
}
/// All the data we have about a Peer that we are trying to sync with
@@ -298,6 +306,8 @@ pub enum PeerSyncState<B: BlockT> {
DownloadingState,
/// Downloading warp proof.
DownloadingWarpProof,
/// Actively downloading block history after warp sync.
DownloadingGap(NumberFor<B>),
}
impl<B: BlockT> PeerSyncState<B> {
@@ -326,7 +336,7 @@ pub struct StateDownloadProgress {
/// Reported warp sync phase.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WarpSyncPhase {
pub enum WarpSyncPhase<B: BlockT> {
/// Waiting for peers to connect.
AwaitingPeers,
/// Downloading and verifying grandpa warp proofs.
@@ -335,24 +345,27 @@ pub enum WarpSyncPhase {
DownloadingState,
/// Importing state.
ImportingState,
/// Downloading block history.
DownloadingBlocks(NumberFor<B>),
}
impl fmt::Display for WarpSyncPhase {
impl<B: BlockT> fmt::Display for WarpSyncPhase<B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::AwaitingPeers => write!(f, "Waiting for peers"),
Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"),
Self::DownloadingState => write!(f, "Downloading state"),
Self::ImportingState => write!(f, "Importing state"),
Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n),
}
}
}
/// Reported warp sync progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct WarpSyncProgress {
pub struct WarpSyncProgress<B: BlockT> {
/// Estimated download percentage.
pub phase: WarpSyncPhase,
pub phase: WarpSyncPhase<B>,
/// Total bytes downloaded so far.
pub total_bytes: u64,
}
@@ -371,7 +384,7 @@ pub struct Status<B: BlockT> {
/// State sync status in progress, if any.
pub state_sync: Option<StateDownloadProgress>,
/// Warp sync in progress, if any.
pub warp_sync: Option<WarpSyncProgress>,
pub warp_sync: Option<WarpSyncProgress<B>>,
}
/// A peer did not behave as expected and should be reported.
@@ -413,16 +426,7 @@ pub enum OnStateData<B: BlockT> {
/// The block and state that should be imported.
Import(BlockOrigin, IncomingBlock<B>),
/// A new state request needs to be made to the given peer.
Request(PeerId, StateRequest),
}
/// Result of [`ChainSync::on_warp_sync_data`].
#[derive(Debug)]
pub enum OnWarpSyncData<B: BlockT> {
/// Warp proof request is issued.
WarpProofRequest(PeerId, warp::WarpProofRequest<B>),
/// A new state request needs to be made to the given peer.
StateRequest(PeerId, StateRequest),
Continue,
}
/// Result of [`ChainSync::poll_block_announce_validation`].
@@ -555,6 +559,7 @@ impl<B: BlockT> ChainSync<B> {
warp_sync: None,
warp_sync_provider,
import_existing: false,
gap_sync: None,
};
sync.reset_sync_start_point()?;
Ok(sync)
@@ -608,10 +613,14 @@ impl<B: BlockT> ChainSync<B> {
SyncState::Idle
};
let warp_sync_progress = match (&self.warp_sync, &self.mode) {
(None, SyncMode::Warp) =>
let warp_sync_progress = match (&self.warp_sync, &self.mode, &self.gap_sync) {
(_, _, Some(gap_sync)) => Some(WarpSyncProgress {
phase: WarpSyncPhase::DownloadingBlocks(gap_sync.best_queued_number),
total_bytes: 0,
}),
(None, SyncMode::Warp, _) =>
Some(WarpSyncProgress { phase: WarpSyncPhase::AwaitingPeers, total_bytes: 0 }),
(Some(sync), _) => Some(sync.progress()),
(Some(sync), _, _) => Some(sync.progress()),
_ => None,
};
@@ -686,17 +695,6 @@ impl<B: BlockT> ChainSync<B> {
return Ok(None)
}
if let SyncMode::Warp = &self.mode {
if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none()
{
log::debug!(target: "sync", "Starting warp state sync.");
if let Some(provider) = &self.warp_sync_provider {
self.warp_sync =
Some(WarpSync::new(self.client.clone(), provider.clone()));
}
}
}
// If we are at genesis, just start downloading.
let (state, req) = if self.best_queued_number.is_zero() {
debug!(
@@ -739,6 +737,17 @@ impl<B: BlockT> ChainSync<B> {
},
);
if let SyncMode::Warp = &self.mode {
if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none()
{
log::debug!(target: "sync", "Starting warp state sync.");
if let Some(provider) = &self.warp_sync_provider {
self.warp_sync =
Some(WarpSync::new(self.client.clone(), provider.clone()));
}
}
}
Ok(req)
},
Ok(BlockStatus::Queued) |
@@ -869,10 +878,13 @@ impl<B: BlockT> ChainSync<B> {
/// Get an iterator over all block requests of all peers.
pub fn block_requests(&mut self) -> impl Iterator<Item = (&PeerId, BlockRequest<B>)> + '_ {
if self.pending_requests.is_empty() || self.state_sync.is_some() || self.warp_sync.is_some()
if self.pending_requests.is_empty() ||
self.state_sync.is_some() ||
self.mode == SyncMode::Warp
{
return Either::Left(std::iter::empty())
}
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
trace!(target: "sync", "Too many blocks in the queue.");
return Either::Left(std::iter::empty())
@@ -888,6 +900,7 @@ impl<B: BlockT> ChainSync<B> {
let queue = &self.queue_blocks;
let pending_requests = self.pending_requests.take();
let max_parallel = if major_sync { 1 } else { self.max_parallel_downloads };
let gap_sync = &mut self.gap_sync;
let iter = self.peers.iter_mut().filter_map(move |(id, peer)| {
if !peer.state.is_available() || !pending_requests.contains(id) {
return None
@@ -947,6 +960,26 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Downloading fork {:?} from {}", hash, id);
peer.state = PeerSyncState::DownloadingStale(hash);
Some((id, req))
} else if let Some((range, req)) = gap_sync.as_mut().and_then(|sync| {
peer_gap_block_request(
id,
peer,
&mut sync.blocks,
attrs,
sync.target,
sync.best_queued_number,
)
}) {
peer.state = PeerSyncState::DownloadingGap(range.start);
trace!(
target: "sync",
"New gap block request for {}, (best:{}, common:{}) {:?}",
id,
peer.best_number,
peer.common_number,
req,
);
Some((id, req))
} else {
None
}
@@ -966,9 +999,9 @@ impl<B: BlockT> ChainSync<B> {
}
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.common_number >= sync.target_block_num() {
trace!(target: "sync", "New StateRequest for {}", id);
peer.state = PeerSyncState::DownloadingState;
let request = sync.next_request();
trace!(target: "sync", "New StateRequest for {}: {:?}", id, request);
return Some((*id, request))
}
}
@@ -982,7 +1015,7 @@ impl<B: BlockT> ChainSync<B> {
{
for (id, peer) in self.peers.iter_mut() {
if peer.state.is_available() && peer.best_number >= target {
trace!(target: "sync", "New StateRequest for {}", id);
trace!(target: "sync", "New StateRequest for {}: {:?}", id, request);
peer.state = PeerSyncState::DownloadingState;
return Some((*id, request))
}
@@ -1039,6 +1072,7 @@ impl<B: BlockT> ChainSync<B> {
response: BlockResponse<B>,
) -> Result<OnBlockData<B>, BadPeer> {
self.downloaded_blocks += response.blocks.len();
let mut gap = false;
let new_blocks: Vec<IncomingBlock<B>> = if let Some(peer) = self.peers.get_mut(who) {
let mut blocks = response.blocks;
if request
@@ -1061,6 +1095,43 @@ impl<B: BlockT> ChainSync<B> {
}
self.drain_blocks()
},
PeerSyncState::DownloadingGap(start_block) => {
let start_block = *start_block;
peer.state = PeerSyncState::Available;
if let Some(gap_sync) = &mut self.gap_sync {
gap_sync.blocks.clear_peer_download(who);
validate_blocks::<B>(&blocks, who, Some(request))?;
gap_sync.blocks.insert(start_block, blocks, who.clone());
gap = true;
gap_sync
.blocks
.drain(gap_sync.best_queued_number + One::one())
.into_iter()
.map(|block_data| {
let justifications = block_data.block.justifications.or(
legacy_justification_mapping(
block_data.block.justification,
),
);
IncomingBlock {
hash: block_data.block.hash,
header: block_data.block.header,
body: block_data.block.body,
indexed_body: block_data.block.indexed_body,
justifications,
origin: block_data.origin,
allow_missing_state: true,
import_existing: self.import_existing,
skip_execution: true,
state: None,
}
})
.collect()
} else {
debug!(target: "sync", "Unexpected gap block response from {}", who);
return Err(BadPeer(who.clone(), rep::NO_BLOCK))
}
},
PeerSyncState::DownloadingStale(_) => {
peer.state = PeerSyncState::Available;
if blocks.is_empty() {
@@ -1212,7 +1283,7 @@ impl<B: BlockT> ChainSync<B> {
return Err(BadPeer(*who, rep::NOT_REQUESTED))
};
Ok(self.validate_and_queue_blocks(new_blocks))
Ok(self.validate_and_queue_blocks(new_blocks, gap))
}
/// Handle a response from the remote to a state request that we made.
@@ -1223,6 +1294,11 @@ impl<B: BlockT> ChainSync<B> {
who: &PeerId,
response: StateResponse,
) -> Result<OnStateData<B>, BadPeer> {
if let Some(peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingState = peer.state {
peer.state = PeerSyncState::Available;
}
}
let import_result = if let Some(sync) = &mut self.state_sync {
debug!(
target: "sync",
@@ -1261,11 +1337,10 @@ impl<B: BlockT> ChainSync<B> {
skip_execution: self.skip_execution(),
state: Some(state),
};
debug!(target: "sync", "State sync is complete. Import is queued");
debug!(target: "sync", "State download is complete. Import is queued");
Ok(OnStateData::Import(origin, block))
},
state::ImportResult::Continue(request) =>
Ok(OnStateData::Request(who.clone(), request)),
state::ImportResult::Continue => Ok(OnStateData::Continue),
state::ImportResult::BadResponse => {
debug!(target: "sync", "Bad state data received from {}", who);
Err(BadPeer(*who, rep::BAD_BLOCK))
@@ -1280,7 +1355,12 @@ impl<B: BlockT> ChainSync<B> {
&mut self,
who: &PeerId,
response: warp::EncodedProof,
) -> Result<OnWarpSyncData<B>, BadPeer> {
) -> Result<(), BadPeer> {
if let Some(peer) = self.peers.get_mut(&who) {
if let PeerSyncState::DownloadingWarpProof = peer.state {
peer.state = PeerSyncState::Available;
}
}
let import_result = if let Some(sync) = &mut self.warp_sync {
debug!(
target: "sync",
@@ -1295,10 +1375,7 @@ impl<B: BlockT> ChainSync<B> {
};
match import_result {
warp::WarpProofImportResult::StateRequest(request) =>
Ok(OnWarpSyncData::StateRequest(*who, request)),
warp::WarpProofImportResult::WarpProofRequest(request) =>
Ok(OnWarpSyncData::WarpProofRequest(*who, request)),
warp::WarpProofImportResult::Success => Ok(()),
warp::WarpProofImportResult::BadResponse => {
debug!(target: "sync", "Bad proof data received from {}", who);
Err(BadPeer(*who, rep::BAD_BLOCK))
@@ -1309,6 +1386,7 @@ impl<B: BlockT> ChainSync<B> {
fn validate_and_queue_blocks(
&mut self,
mut new_blocks: Vec<IncomingBlock<B>>,
gap: bool,
) -> OnBlockData<B> {
let orig_len = new_blocks.len();
new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash));
@@ -1320,7 +1398,7 @@ impl<B: BlockT> ChainSync<B> {
);
}
let origin = if self.status().state != SyncState::Downloading {
let origin = if !gap && self.status().state != SyncState::Downloading {
BlockOrigin::NetworkBroadcast
} else {
BlockOrigin::NetworkInitialSync
@@ -1494,6 +1572,15 @@ impl<B: BlockT> ChainSync<B> {
self.mode = SyncMode::Full;
output.extend(self.restart());
}
let gap_sync_complete =
self.gap_sync.as_ref().map_or(false, |s| s.target == number);
if gap_sync_complete {
info!(
target: "sync",
"Block history download is complete."
);
self.gap_sync = None;
}
},
Err(BlockImportError::IncompleteHeader(who)) =>
if let Some(peer) = who {
@@ -1601,6 +1688,11 @@ impl<B: BlockT> ChainSync<B> {
if self.fork_targets.remove(&hash).is_some() {
trace!(target: "sync", "Completed fork sync {:?}", hash);
}
if let Some(gap_sync) = &mut self.gap_sync {
if number > gap_sync.best_queued_number && number <= gap_sync.target {
gap_sync.best_queued_number = number;
}
}
if number > self.best_queued_number {
self.best_queued_number = number;
self.best_queued_hash = *hash;
@@ -1954,6 +2046,9 @@ impl<B: BlockT> ChainSync<B> {
/// import, so this functions checks for such blocks and returns them.
pub fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<B>> {
self.blocks.clear_peer_download(who);
if let Some(gap_sync) = &mut self.gap_sync {
gap_sync.blocks.clear_peer_download(who)
}
self.peers.remove(who);
self.extra_justifications.peer_disconnected(who);
self.pending_requests.set_all();
@@ -1963,7 +2058,7 @@ impl<B: BlockT> ChainSync<B> {
});
let blocks = self.drain_blocks();
if !blocks.is_empty() {
Some(self.validate_and_queue_blocks(blocks))
Some(self.validate_and_queue_blocks(blocks, false))
} else {
None
}
@@ -2043,6 +2138,14 @@ impl<B: BlockT> ChainSync<B> {
}
}
}
if let Some((start, end)) = info.block_gap {
debug!(target: "sync", "Starting gap sync #{} - #{}", start, end);
self.gap_sync = Some(GapSync {
best_queued_number: start - One::one(),
target: end,
blocks: BlockCollection::new(),
});
}
trace!(target: "sync", "Restarted sync at #{} ({:?})", self.best_queued_number, self.best_queued_hash);
Ok(())
}
@@ -2250,6 +2353,39 @@ fn peer_block_request<B: BlockT>(
Some((range, request))
}
/// Get a new block request for the peer if any.
fn peer_gap_block_request<B: BlockT>(
id: &PeerId,
peer: &PeerSync<B>,
blocks: &mut BlockCollection<B>,
attrs: message::BlockAttributes,
target: NumberFor<B>,
common_number: NumberFor<B>,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
let range = blocks.needed_blocks(
id.clone(),
MAX_BLOCKS_TO_REQUEST,
std::cmp::min(peer.best_number, target),
common_number,
1,
MAX_DOWNLOAD_AHEAD,
)?;
// The end is not part of the range.
let last = range.end.saturating_sub(One::one());
let from = message::FromBlock::Number(last);
let request = message::generic::BlockRequest {
id: 0,
fields: attrs.clone(),
from,
to: None,
direction: message::Direction::Descending,
max: Some((range.end - range.start).saturated_into::<u32>()),
};
Some((range, request))
}
/// Get pending fork sync targets for a peer.
fn fork_sync_request<B: BlockT>(
id: &PeerId,
@@ -47,8 +47,8 @@ pub struct StateSync<B: BlockT> {
pub enum ImportResult<B: BlockT> {
/// State is complete and ready for import.
Import(B::Hash, B::Header, ImportedState<B>),
/// Continue dowloading.
Continue(StateRequest),
/// Continue downloading.
Continue,
/// Bad state chunk.
BadResponse,
}
@@ -134,7 +134,7 @@ impl<B: BlockT> StateSync<B> {
ImportedState { block: self.target_block, state: std::mem::take(&mut self.state) },
)
} else {
ImportResult::Continue(self.next_request())
ImportResult::Continue
}
}
@@ -37,11 +37,9 @@ enum Phase<B: BlockT> {
}
/// Import warp proof result.
pub enum WarpProofImportResult<B: BlockT> {
/// Start downloading state data.
StateRequest(StateRequest),
/// Continue dowloading warp sync proofs.
WarpProofRequest(WarpProofRequest<B>),
pub enum WarpProofImportResult {
/// Import was successful.
Success,
/// Bad proof.
BadResponse,
}
@@ -69,7 +67,7 @@ impl<B: BlockT> WarpSync<B> {
Self { client, warp_sync_provider, phase, total_proof_bytes: 0 }
}
/// Validate and import a state reponse.
/// Validate and import a state response.
pub fn import_state(&mut self, response: StateResponse) -> ImportResult<B> {
match &mut self.phase {
Phase::WarpProof { .. } => {
@@ -80,19 +78,15 @@ impl<B: BlockT> WarpSync<B> {
}
}
/// Validate and import a warp proof reponse.
pub fn import_warp_proof(&mut self, response: EncodedProof) -> WarpProofImportResult<B> {
/// Validate and import a warp proof response.
pub fn import_warp_proof(&mut self, response: EncodedProof) -> WarpProofImportResult {
match &mut self.phase {
Phase::State(_) => {
log::debug!(target: "sync", "Unexpected warp proof response");
WarpProofImportResult::BadResponse
},
Phase::WarpProof { set_id, authorities, last_hash } => {
match self.warp_sync_provider.verify(
&response,
*set_id,
std::mem::take(authorities),
) {
match self.warp_sync_provider.verify(&response, *set_id, authorities.clone()) {
Err(e) => {
log::debug!(target: "sync", "Bad warp proof response: {:?}", e);
return WarpProofImportResult::BadResponse
@@ -103,17 +97,14 @@ impl<B: BlockT> WarpSync<B> {
*authorities = new_authorities;
*last_hash = new_last_hash.clone();
self.total_proof_bytes += response.0.len() as u64;
WarpProofImportResult::WarpProofRequest(WarpProofRequest {
begin: new_last_hash,
})
WarpProofImportResult::Success
},
Ok(VerificationResult::Complete(new_set_id, _, header)) => {
log::debug!(target: "sync", "Verified complete proof, set_id={:?}", new_set_id);
self.total_proof_bytes += response.0.len() as u64;
let state_sync = StateSync::new(self.client.clone(), header, false);
let request = state_sync.next_request();
self.phase = Phase::State(state_sync);
WarpProofImportResult::StateRequest(request)
WarpProofImportResult::Success
},
}
},
@@ -161,7 +152,7 @@ impl<B: BlockT> WarpSync<B> {
}
/// Returns state sync estimated progress (percentage, bytes)
pub fn progress(&self) -> WarpSyncProgress {
pub fn progress(&self) -> WarpSyncProgress<B> {
match &self.phase {
Phase::WarpProof { .. } => WarpSyncProgress {
phase: WarpSyncPhase::DownloadingWarpProofs,
@@ -23,10 +23,11 @@ use futures::{
stream::StreamExt,
};
use log::debug;
use sp_finality_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::Block as BlockT;
use std::{sync::Arc, time::Duration};
pub use sp_finality_grandpa::{AuthorityList, SetId};
/// Scale-encoded warp sync proof response.
pub struct EncodedProof(pub Vec<u8>);
@@ -55,7 +56,7 @@ pub trait WarpSyncProvider<B: BlockT>: Send + Sync {
&self,
start: B::Hash,
) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>>;
/// Verify warp proof agains current set of authorities.
/// Verify warp proof against current set of authorities.
fn verify(
&self,
proof: &EncodedProof,
+40 -3
View File
@@ -55,7 +55,7 @@ use sc_network::{
},
light_client_requests::{self, handler::LightClientRequestHandler},
state_request_handler::{self, StateRequestHandler},
Multiaddr, NetworkService, NetworkWorker,
warp_request_handler, Multiaddr, NetworkService, NetworkWorker,
};
use sc_service::client::Client;
use sp_blockchain::{
@@ -68,6 +68,7 @@ use sp_consensus::{
};
use sp_core::H256;
use sp_runtime::{
codec::{Decode, Encode},
generic::{BlockId, OpaqueDigestItemId},
traits::{Block as BlockT, Header as HeaderT, NumberFor},
Justification, Justifications,
@@ -652,6 +653,33 @@ impl<B: BlockT> VerifierAdapter<B> {
}
}
struct TestWarpSyncProvider<B: BlockT>(Arc<dyn HeaderBackend<B>>);
impl<B: BlockT> warp_request_handler::WarpSyncProvider<B> for TestWarpSyncProvider<B> {
fn generate(
&self,
_start: B::Hash,
) -> Result<warp_request_handler::EncodedProof, Box<dyn std::error::Error + Send + Sync>> {
let info = self.0.info();
let best_header = self.0.header(BlockId::hash(info.best_hash)).unwrap().unwrap();
Ok(warp_request_handler::EncodedProof(best_header.encode()))
}
fn verify(
&self,
proof: &warp_request_handler::EncodedProof,
_set_id: warp_request_handler::SetId,
_authorities: warp_request_handler::AuthorityList,
) -> Result<warp_request_handler::VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>>
{
let warp_request_handler::EncodedProof(encoded) = proof;
let header = B::Header::decode(&mut encoded.as_slice()).unwrap();
Ok(warp_request_handler::VerificationResult::Complete(0, Default::default(), header))
}
fn current_authorities(&self) -> warp_request_handler::AuthorityList {
Default::default()
}
}
/// Configuration for a full peer.
#[derive(Default)]
pub struct FullPeerConfig {
@@ -737,7 +765,7 @@ where
(Some(keep_blocks), false) => TestClientBuilder::with_pruning_window(keep_blocks),
(None, false) => TestClientBuilder::with_default_backend(),
};
if matches!(config.sync_mode, SyncMode::Fast { .. }) {
if matches!(config.sync_mode, SyncMode::Fast { .. } | SyncMode::Warp) {
test_client_builder = test_client_builder.set_no_genesis();
}
let backend = test_client_builder.backend();
@@ -816,6 +844,15 @@ where
protocol_config
};
let warp_sync = Arc::new(TestWarpSyncProvider(client.clone()));
let warp_protocol_config = {
let (handler, protocol_config) =
warp_request_handler::RequestHandler::new(protocol_id.clone(), warp_sync.clone());
self.spawn_task(handler.run().boxed());
protocol_config
};
let network = NetworkWorker::new(sc_network::config::Params {
role: if config.is_authority { Role::Authority } else { Role::Full },
executor: None,
@@ -835,7 +872,7 @@ where
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
warp_sync: None,
warp_sync: Some((warp_sync, warp_protocol_config)),
})
.unwrap();
+32
View File
@@ -1202,6 +1202,38 @@ fn syncs_indexed_blocks() {
.is_some());
}
#[test]
fn warp_sync() {
sp_tracing::try_init_simple();
let mut net = TestNet::new(0);
// Create 3 synced peers and 1 peer trying to warp sync.
net.add_full_peer_with_config(Default::default());
net.add_full_peer_with_config(Default::default());
net.add_full_peer_with_config(Default::default());
net.add_full_peer_with_config(FullPeerConfig {
sync_mode: SyncMode::Warp,
..Default::default()
});
let gap_end = net.peer(0).push_blocks(63, false);
net.peer(0).push_blocks(1, false);
net.peer(1).push_blocks(64, false);
net.peer(2).push_blocks(64, false);
// Wait for peer 1 to sync state.
net.block_until_sync();
assert!(!net.peer(3).client().has_state_at(&BlockId::Number(1)));
assert!(net.peer(3).client().has_state_at(&BlockId::Number(64)));
// Wait for peer 1 download block history
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(3).has_block(&gap_end) {
Poll::Ready(())
} else {
Poll::Pending
}
}));
}
#[test]
fn syncs_huge_blocks() {
use sp_core::storage::well_known_keys::HEAP_PAGES;
@@ -19,7 +19,7 @@
use crate::error::Error;
use codec::Encode;
use futures::{future, prelude::*};
use sc_client_api::{BlockBackend, UsageProvider};
use sc_client_api::{BlockBackend, HeaderBackend};
use sc_consensus::import_queue::ImportQueue;
use sp_runtime::{generic::BlockId, traits::Block as BlockT};
@@ -33,7 +33,7 @@ pub fn check_block<B, IQ, C>(
block_id: BlockId<B>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
where
C: BlockBackend<B> + UsageProvider<B> + Send + Sync + 'static,
C: BlockBackend<B> + HeaderBackend<B> + Send + Sync + 'static,
B: BlockT + for<'de> serde::Deserialize<'de>,
IQ: ImportQueue<B> + 'static,
{
@@ -22,7 +22,7 @@ use futures::{future, prelude::*};
use futures_timer::Delay;
use log::{info, warn};
use sc_chain_spec::ChainSpec;
use sc_client_api::UsageProvider;
use sc_client_api::HeaderBackend;
use sc_consensus::import_queue::{
BlockImportError, BlockImportStatus, ImportQueue, IncomingBlock, Link,
};
@@ -296,7 +296,7 @@ pub fn import_blocks<B, IQ, C>(
binary: bool,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
where
C: UsageProvider<B> + Send + Sync + 'static,
C: HeaderBackend<B> + Send + Sync + 'static,
B: BlockT + for<'de> serde::Deserialize<'de>,
IQ: ImportQueue<B> + 'static,
{
@@ -438,7 +438,7 @@ where
info!(
"🎉 Imported {} blocks. Best: #{}",
read_block_count,
client.usage_info().chain.best_number
client.info().best_number
);
return Poll::Ready(Ok(()))
} else {
@@ -469,7 +469,7 @@ where
queue.poll_actions(cx, &mut link);
let best_number = client.usage_info().chain.best_number;
let best_number = client.info().best_number;
speedometer.notify_user(best_number);
if link.has_error {
+14 -9
View File
@@ -684,8 +684,6 @@ where
..
} = import_block;
assert!(justifications.is_some() && finalized || justifications.is_none());
if !intermediates.is_empty() {
return Err(Error::IncompletePipeline)
}
@@ -779,11 +777,17 @@ where
}
let info = self.backend.blockchain().info();
let gap_block = info
.block_gap
.map_or(false, |(start, _)| *import_headers.post().number() == start);
assert!(justifications.is_some() && finalized || justifications.is_none() || gap_block);
// the block is lower than our last finalized block so it must revert
// finality, refusing import.
if status == blockchain::BlockStatus::Unknown &&
*import_headers.post().number() <= info.finalized_number
*import_headers.post().number() <= info.finalized_number &&
!gap_block
{
return Err(sp_blockchain::Error::NotInFinalizedChain)
}
@@ -854,12 +858,13 @@ where
None => None,
};
let is_new_best = finalized ||
match fork_choice {
ForkChoiceStrategy::LongestChain =>
import_headers.post().number() > &info.best_number,
ForkChoiceStrategy::Custom(v) => v,
};
let is_new_best = !gap_block &&
(finalized ||
match fork_choice {
ForkChoiceStrategy::LongestChain =>
import_headers.post().number() > &info.best_number,
ForkChoiceStrategy::Custom(v) => v,
});
let leaf_state = if finalized {
NewBlockState::Final
@@ -281,6 +281,8 @@ pub struct Info<Block: BlockT> {
pub finalized_state: Option<(Block::Hash, <<Block as BlockT>::Header as HeaderT>::Number)>,
/// Number of concurrent leave forks.
pub number_leaves: usize,
/// Missing blocks after warp sync. (start, end).
pub block_gap: Option<(NumberFor<Block>, NumberFor<Block>)>,
}
/// Block status.