epoch-changes: separate epoch header with epoch data (#4881)

* fork-tree: prune returns all pruned node data

* epoch-changes: split EpochHeader vs epoch data

* EpochChanges::viable_epoch and add missing comments

* Incoperate the new epoch_changes interface for BABE

* Fix BABE tests

* Fix fork-tree pruning issue

* Fix tests

* Fix pruning algorithm

* fork-tree: implement map function for mapping one value type to another

* Add migration script for new epoch changes scheme

* Update utils/fork-tree/src/lib.rs

Co-Authored-By: André Silva <andre.beat@gmail.com>

* Update client/consensus/slots/src/lib.rs

Co-Authored-By: André Silva <andre.beat@gmail.com>

* Remove authorities_len.is_none check, which is duplicate of unwrap_or(false)

* Update client/consensus/epochs/src/lib.rs

Co-Authored-By: André Silva <andre.beat@gmail.com>

* Update client/consensus/epochs/src/lib.rs

Co-Authored-By: André Silva <andre.beat@gmail.com>

* No trailing ; for return statement

* Use VERSION_KEY for migration

* Fix issues that removed nodes are not properly added into removed list

* Add comments indicating end_slot is non-inclusive

* fork-tree: use &mut F for map type declaration

* Add tests for v0 epoch_changes migration

* Fix babe RPC tests

Co-authored-by: André Silva <andre.beat@gmail.com>
This commit is contained in:
Wei Tang
2020-03-16 22:06:13 +01:00
committed by GitHub
parent abe391a0a7
commit 846a9ce8c6
10 changed files with 764 additions and 212 deletions
+104 -15
View File
@@ -25,10 +25,12 @@ use sc_client_api::backend::AuxStore;
use sp_blockchain::{Result as ClientResult, Error as ClientError};
use sp_runtime::traits::Block as BlockT;
use sp_consensus_babe::BabeBlockWeight;
use sc_consensus_epochs::{EpochChangesFor, SharedEpochChanges};
use sc_consensus_epochs::{EpochChangesFor, SharedEpochChanges, migration::EpochChangesForV0};
use crate::Epoch;
const BABE_EPOCH_CHANGES: &[u8] = b"babe_epoch_changes";
const BABE_EPOCH_CHANGES_VERSION: &[u8] = b"babe_epoch_changes_version";
const BABE_EPOCH_CHANGES_KEY: &[u8] = b"babe_epoch_changes";
const BABE_EPOCH_CHANGES_CURRENT_VERSION: u32 = 1;
fn block_weight_key<H: Encode>(block_hash: H) -> Vec<u8> {
(b"block_weight", block_hash).encode()
@@ -52,14 +54,30 @@ fn load_decode<B, T>(backend: &B, key: &[u8]) -> ClientResult<Option<T>>
pub(crate) fn load_epoch_changes<Block: BlockT, B: AuxStore>(
backend: &B,
) -> ClientResult<SharedEpochChanges<Block, Epoch>> {
let epoch_changes = load_decode::<_, EpochChangesFor<Block, Epoch>>(backend, BABE_EPOCH_CHANGES)?
.map(|v| Arc::new(Mutex::new(v)))
.unwrap_or_else(|| {
info!(target: "babe",
"Creating empty BABE epoch changes on what appears to be first startup."
);
SharedEpochChanges::<Block, Epoch>::default()
});
let version = load_decode::<_, u32>(backend, BABE_EPOCH_CHANGES_VERSION)?;
let maybe_epoch_changes = match version {
None => load_decode::<_, EpochChangesForV0<Block, Epoch>>(
backend,
BABE_EPOCH_CHANGES_KEY,
)?.map(|v0| v0.migrate()),
Some(BABE_EPOCH_CHANGES_CURRENT_VERSION) => load_decode::<_, EpochChangesFor<Block, Epoch>>(
backend,
BABE_EPOCH_CHANGES_KEY,
)?,
Some(other) => {
return Err(ClientError::Backend(
format!("Unsupported BABE DB version: {:?}", other)
))
},
};
let epoch_changes = Arc::new(Mutex::new(maybe_epoch_changes.unwrap_or_else(|| {
info!(target: "babe",
"Creating empty BABE epoch changes on what appears to be first startup."
);
EpochChangesFor::<Block, Epoch>::default()
})));
// rebalance the tree after deserialization. this isn't strictly necessary
// since the tree is now rebalanced on every update operation. but since the
@@ -77,10 +95,13 @@ pub(crate) fn write_epoch_changes<Block: BlockT, F, R>(
) -> R where
F: FnOnce(&[(&'static [u8], &[u8])]) -> R,
{
let encoded_epoch_changes = epoch_changes.encode();
write_aux(
&[(BABE_EPOCH_CHANGES, encoded_epoch_changes.as_slice())],
)
BABE_EPOCH_CHANGES_CURRENT_VERSION.using_encoded(|version| {
let encoded_epoch_changes = epoch_changes.encode();
write_aux(
&[(BABE_EPOCH_CHANGES_KEY, encoded_epoch_changes.as_slice()),
(BABE_EPOCH_CHANGES_VERSION, version)],
)
})
}
/// Write the cumulative chain-weight of a block ot aux storage.
@@ -91,7 +112,6 @@ pub(crate) fn write_block_weight<H: Encode, F, R>(
) -> R where
F: FnOnce(&[(Vec<u8>, &[u8])]) -> R,
{
let key = block_weight_key(block_hash);
block_weight.using_encoded(|s|
write_aux(
@@ -107,3 +127,72 @@ pub(crate) fn load_block_weight<H: Encode, B: AuxStore>(
) -> ClientResult<Option<BabeBlockWeight>> {
load_decode(backend, block_weight_key(block_hash).as_slice())
}
#[cfg(test)]
mod test {
use super::*;
use crate::Epoch;
use fork_tree::ForkTree;
use substrate_test_runtime_client;
use sp_core::H256;
use sp_runtime::traits::NumberFor;
use sc_consensus_epochs::{PersistedEpoch, PersistedEpochHeader, EpochHeader};
use sp_consensus::Error as ConsensusError;
use sc_network_test::Block as TestBlock;
#[test]
fn load_decode_from_v0_epoch_changes() {
let epoch = Epoch {
start_slot: 0,
authorities: vec![],
randomness: [0; 32],
epoch_index: 1,
duration: 100,
};
let client = substrate_test_runtime_client::new();
let mut v0_tree = ForkTree::<H256, NumberFor<TestBlock>, _>::new();
v0_tree.import::<_, ConsensusError>(
Default::default(),
Default::default(),
PersistedEpoch::Regular(epoch),
&|_, _| Ok(false), // Test is single item only so this can be set to false.
).unwrap();
client.insert_aux(
&[(BABE_EPOCH_CHANGES_KEY,
&EpochChangesForV0::<TestBlock, Epoch>::from_raw(v0_tree).encode()[..])],
&[],
).unwrap();
assert_eq!(
load_decode::<_, u32>(&client, BABE_EPOCH_CHANGES_VERSION).unwrap(),
None,
);
let epoch_changes = load_epoch_changes::<TestBlock, _>(&client).unwrap();
assert!(
epoch_changes.lock()
.tree()
.iter()
.map(|(_, _, epoch)| epoch.clone())
.collect::<Vec<_>>() ==
vec![PersistedEpochHeader::Regular(EpochHeader {
start_slot: 0,
end_slot: 100,
})],
); // PersistedEpochHeader does not implement Debug, so we use assert! directly.
write_epoch_changes::<TestBlock, _, _>(
&epoch_changes.lock(),
|values| {
client.insert_aux(values, &[]).unwrap();
},
);
assert_eq!(
load_decode::<_, u32>(&client, BABE_EPOCH_CHANGES_VERSION).unwrap(),
Some(1),
);
}
}
+55 -38
View File
@@ -78,7 +78,7 @@ use sp_runtime::{
generic::{BlockId, OpaqueDigestItemId}, Justification,
traits::{Block as BlockT, Header, DigestItemFor, Zero},
};
use sp_api::ProvideRuntimeApi;
use sp_api::{ProvideRuntimeApi, NumberFor};
use sc_keystore::KeyStorePtr;
use parking_lot::Mutex;
use sp_core::Pair;
@@ -104,7 +104,7 @@ use sc_consensus_slots::{
SlotWorker, SlotInfo, SlotCompatible, StorageChanges, CheckedHeader, check_equivocation,
};
use sc_consensus_epochs::{
descendent_query, ViableEpoch, SharedEpochChanges, EpochChangesFor, Epoch as EpochT
descendent_query, SharedEpochChanges, EpochChangesFor, Epoch as EpochT, ViableEpochDescriptor,
};
use sp_blockchain::{
Result as ClientResult, Error as ClientError,
@@ -231,9 +231,9 @@ macro_rules! babe_info {
/// Intermediate value passed to block importer.
pub struct BabeIntermediate {
/// The epoch data, if available.
pub epoch: ViableEpoch<Epoch>,
pub struct BabeIntermediate<B: BlockT> {
/// The epoch descriptor.
pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
}
/// Intermediate key for Babe engine.
@@ -402,7 +402,7 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
SO: SyncOracle + Send + Clone,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
type EpochData = ViableEpoch<Epoch>;
type EpochData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;
type Claim = (PreDigest, AuthorityPair);
type SyncOracle = SO;
type CreateProposer = Pin<Box<
@@ -424,31 +424,35 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
parent: &B::Header,
slot_number: u64,
) -> Result<Self::EpochData, ConsensusError> {
self.epoch_changes.lock().epoch_for_child_of(
self.epoch_changes.lock().epoch_descriptor_for_child_of(
descendent_query(&*self.client),
&parent.hash(),
parent.number().clone(),
slot_number,
|slot| self.config.genesis_epoch(slot)
)
.map_err(|e| ConsensusError::ChainLookup(format!("{:?}", e)))?
.ok_or(sp_consensus::Error::InvalidAuthoritiesSet)
}
fn authorities_len(&self, epoch_data: &Self::EpochData) -> usize {
epoch_data.as_ref().authorities.len()
fn authorities_len(&self, epoch_descriptor: &Self::EpochData) -> Option<usize> {
self.epoch_changes.lock()
.viable_epoch(&epoch_descriptor, |slot| self.config.genesis_epoch(slot))
.map(|epoch| epoch.as_ref().authorities.len())
}
fn claim_slot(
&self,
_parent_header: &B::Header,
slot_number: SlotNumber,
epoch_data: &ViableEpoch<Epoch>,
epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
) -> Option<Self::Claim> {
debug!(target: "babe", "Attempting to claim slot {}", slot_number);
let s = authorship::claim_slot(
slot_number,
epoch_data.as_ref(),
self.epoch_changes.lock().viable_epoch(
&epoch_descriptor,
|slot| self.config.genesis_epoch(slot)
)?.as_ref(),
&*self.config,
&self.keystore,
);
@@ -478,7 +482,7 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
Self::Claim,
Self::EpochData,
) -> sp_consensus::BlockImportParams<B, I::Transaction> + Send> {
Box::new(|header, header_hash, body, storage_changes, (_, pair), epoch| {
Box::new(|header, header_hash, body, storage_changes, (_, pair), epoch_descriptor| {
// sign the pre-sealed hash of the block and then
// add it to a digest item.
let signature = pair.sign(header_hash.as_ref());
@@ -490,7 +494,7 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
import_block.storage_changes = Some(storage_changes);
import_block.intermediates.insert(
Cow::from(INTERMEDIATE_KEY),
Box::new(BabeIntermediate { epoch }) as Box<dyn Any>,
Box::new(BabeIntermediate::<B> { epoch_descriptor }) as Box<dyn Any>,
);
import_block
@@ -729,18 +733,19 @@ impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client> where
.map_err(Error::<Block>::FetchParentHeader)?;
let pre_digest = find_pre_digest::<Block>(&header)?;
let epoch = {
let epoch_changes = self.epoch_changes.lock();
epoch_changes.epoch_for_child_of(
descendent_query(&*self.client),
&parent_hash,
parent_header_metadata.number,
pre_digest.slot_number(),
|slot| self.config.genesis_epoch(slot),
)
.map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
.ok_or_else(|| Error::<Block>::FetchEpoch(parent_hash))?
};
let epoch_changes = self.epoch_changes.lock();
let epoch_descriptor = epoch_changes.epoch_descriptor_for_child_of(
descendent_query(&*self.client),
&parent_hash,
parent_header_metadata.number,
pre_digest.slot_number(),
)
.map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
.ok_or_else(|| Error::<Block>::FetchEpoch(parent_hash))?;
let viable_epoch = epoch_changes.viable_epoch(
&epoch_descriptor,
|slot| self.config.genesis_epoch(slot)
).ok_or_else(|| Error::<Block>::FetchEpoch(parent_hash))?;
// We add one to the current slot to allow for some small drift.
// FIXME #1019 in the future, alter this queue to allow deferring of headers
@@ -748,7 +753,7 @@ impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client> where
header: header.clone(),
pre_digest: Some(pre_digest.clone()),
slot_now: slot_now + 1,
epoch: epoch.as_ref(),
epoch: viable_epoch.as_ref(),
config: &self.config,
};
@@ -808,7 +813,7 @@ impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client> where
import_block.justification = justification;
import_block.intermediates.insert(
Cow::from(INTERMEDIATE_KEY),
Box::new(BabeIntermediate { epoch }) as Box<dyn Any>,
Box::new(BabeIntermediate::<Block> { epoch_descriptor }) as Box<dyn Any>,
);
import_block.post_hash = Some(hash);
@@ -946,7 +951,7 @@ impl<Block, Client, Inner> BlockImport<Block> for BabeBlockImport<Block, Client,
// if this is the first block in its chain for that epoch.
//
// also provides the total weight of the chain, including the imported block.
let (epoch, first_in_epoch, parent_weight) = {
let (epoch_descriptor, first_in_epoch, parent_weight) = {
let parent_weight = if *parent_header.number() == Zero::zero() {
0
} else {
@@ -957,13 +962,13 @@ impl<Block, Client, Inner> BlockImport<Block> for BabeBlockImport<Block, Client,
))?
};
let intermediate = block.take_intermediate::<BabeIntermediate>(
let intermediate = block.take_intermediate::<BabeIntermediate<Block>>(
INTERMEDIATE_KEY
)?;
let epoch = intermediate.epoch;
let first_in_epoch = parent_slot < epoch.as_ref().start_slot;
(epoch, first_in_epoch, parent_weight)
let epoch_descriptor = intermediate.epoch_descriptor;
let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
(epoch_descriptor, first_in_epoch, parent_weight)
};
let total_weight = parent_weight + pre_digest.added_weight();
@@ -994,12 +999,23 @@ impl<Block, Client, Inner> BlockImport<Block> for BabeBlockImport<Block, Client,
let info = self.client.info();
if let Some(next_epoch_descriptor) = next_epoch_digest {
let next_epoch = epoch.increment(next_epoch_descriptor);
old_epoch_changes = Some(epoch_changes.clone());
let viable_epoch = epoch_changes.viable_epoch(
&epoch_descriptor,
|slot| self.config.genesis_epoch(slot),
).ok_or_else(|| {
ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
})?;
babe_info!("New epoch {} launching at block {} (block slot {} >= start slot {}).",
epoch.as_ref().epoch_index, hash, slot_number, epoch.as_ref().start_slot);
viable_epoch.as_ref().epoch_index,
hash,
slot_number,
viable_epoch.as_ref().start_slot);
let next_epoch = viable_epoch.increment(next_epoch_descriptor);
babe_info!("Next epoch starts at slot {}", next_epoch.as_ref().start_slot);
// prune the tree of epochs not part of the finalized chain or
@@ -1227,7 +1243,8 @@ pub mod test_helpers {
HeaderMetadata<B, Error = ClientError>,
C::Api: BabeApi<B>,
{
let epoch = link.epoch_changes.lock().epoch_for_child_of(
let epoch_changes = link.epoch_changes.lock();
let epoch = epoch_changes.epoch_data_for_child_of(
descendent_query(client),
&parent.hash(),
parent.number().clone(),
@@ -1237,7 +1254,7 @@ pub mod test_helpers {
authorship::claim_slot(
slot_number,
epoch.as_ref(),
&epoch,
&link.config,
keystore,
).map(|(digest, _)| digest)
+7 -9
View File
@@ -122,7 +122,7 @@ impl DummyProposer {
// figure out if we should add a consensus digest, since the test runtime
// doesn't.
let epoch_changes = self.factory.epoch_changes.lock();
let epoch = epoch_changes.epoch_for_child_of(
let epoch = epoch_changes.epoch_data_for_child_of(
descendent_query(&*self.factory.client),
&self.parent_hash,
self.parent_number,
@@ -130,8 +130,7 @@ impl DummyProposer {
|slot| self.factory.config.genesis_epoch(slot),
)
.expect("client has data to find epoch")
.expect("can compute epoch for baked block")
.into_inner();
.expect("can compute epoch for baked block");
let first_in_epoch = self.parent_slot < epoch.start_slot;
if first_in_epoch {
@@ -421,7 +420,7 @@ fn run_one_test(
panic!("Verification failed for {:?}: {}", h, e);
}
}
Poll::<()>::Pending
}),
future::select(future::join_all(import_notifications), future::join_all(babe_futures))
@@ -566,12 +565,11 @@ fn propose_and_import_block<Transaction>(
let mut block = futures::executor::block_on(proposer.propose_with(pre_digest)).unwrap().block;
let epoch = proposer_factory.epoch_changes.lock().epoch_for_child_of(
let epoch_descriptor = proposer_factory.epoch_changes.lock().epoch_descriptor_for_child_of(
descendent_query(&*proposer_factory.client),
&parent_hash,
*parent.number(),
slot_number,
|slot| proposer_factory.config.genesis_epoch(slot)
).unwrap().unwrap();
let seal = {
@@ -595,7 +593,7 @@ fn propose_and_import_block<Transaction>(
import.body = Some(block.extrinsics);
import.intermediates.insert(
Cow::from(INTERMEDIATE_KEY),
Box::new(BabeIntermediate { epoch }) as Box<dyn Any>,
Box::new(BabeIntermediate::<TestBlock> { epoch_descriptor }) as Box<dyn Any>,
);
import.fork_choice = Some(ForkChoiceStrategy::LongestChain);
let import_result = block_import.import_block(import, Default::default()).unwrap();
@@ -637,13 +635,13 @@ fn importing_block_one_sets_genesis_epoch() {
let genesis_epoch = data.link.config.genesis_epoch(999);
let epoch_changes = data.link.epoch_changes.lock();
let epoch_for_second_block = epoch_changes.epoch_for_child_of(
let epoch_for_second_block = epoch_changes.epoch_data_for_child_of(
descendent_query(&*client),
&block_hash,
1,
1000,
|slot| data.link.config.genesis_epoch(slot),
).unwrap().unwrap().into_inner();
).unwrap().unwrap();
assert_eq!(epoch_for_second_block, genesis_epoch);
}