Revert "Header-only sync for old forks (#3942)" (#4022)

This reverts commit 172359adad.
This commit is contained in:
Gavin Wood
2019-11-05 16:05:36 +01:00
committed by GitHub
parent 172359adad
commit 350e72dac5
11 changed files with 80 additions and 178 deletions
+15 -26
View File
@@ -894,12 +894,6 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
inmem
}
/// Returns total numbet of blocks (headers) in the block DB.
#[cfg(feature = "test-helpers")]
pub fn blocks_count(&self) -> u64 {
self.blockchain.db.iter(columns::HEADER).count() as u64
}
/// Read (from storage or cache) changes trie config.
///
/// Currently changes tries configuration is set up once (at genesis) and could not
@@ -1121,7 +1115,7 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
);
transaction.put(columns::HEADER, &lookup_key, &pending_block.header.encode());
if let Some(body) = &pending_block.body {
if let Some(body) = pending_block.body {
transaction.put(columns::BODY, &lookup_key, &body.encode());
}
if let Some(justification) = pending_block.justification {
@@ -1133,26 +1127,21 @@ impl<Block: BlockT<Hash=H256>> Backend<Block> {
transaction.put(columns::META, meta_keys::GENESIS_HASH, hash.as_ref());
}
let finalized = if pending_block.body.is_some() {
let mut changeset: state_db::ChangeSet<Vec<u8>> = state_db::ChangeSet::default();
for (key, (val, rc)) in operation.db_updates.drain() {
if rc > 0 {
changeset.inserted.push((key, val.to_vec()));
} else if rc < 0 {
changeset.deleted.push(key);
}
let mut changeset: state_db::ChangeSet<Vec<u8>> = state_db::ChangeSet::default();
for (key, (val, rc)) in operation.db_updates.drain() {
if rc > 0 {
changeset.inserted.push((key, val.to_vec()));
} else if rc < 0 {
changeset.deleted.push(key);
}
let number_u64 = number.saturated_into::<u64>();
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(&mut transaction, commit);
}
let number_u64 = number.saturated_into::<u64>();
let commit = self.storage.state_db.insert_block(&hash, number_u64, &pending_block.header.parent_hash(), changeset)
.map_err(|e: state_db::Error<io::Error>| client::error::Error::from(format!("State database error: {:?}", e)))?;
apply_state_commit(&mut transaction, commit);
// Check if need to finalize. Genesis is always finalized instantly.
let finalized = number_u64 == 0 || pending_block.leaf_state.is_final();
finalized
} else {
false
};
// Check if need to finalize. Genesis is always finalized instantly.
let finalized = number_u64 == 0 || pending_block.leaf_state.is_final();
let header = &pending_block.header;
let is_best = pending_block.leaf_state.is_best();
@@ -1592,7 +1581,7 @@ mod tests {
};
let mut op = backend.begin_operation().unwrap();
backend.begin_state_operation(&mut op, block_id).unwrap();
op.set_block_data(header, Some(Vec::new()), None, NewBlockState::Best).unwrap();
op.set_block_data(header, None, None, NewBlockState::Best).unwrap();
op.update_changes_trie((changes_trie_update, ChangesTrieCacheAction::Clear)).unwrap();
backend.commit_operation(op).unwrap();
+30 -37
View File
@@ -927,39 +927,22 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
BlockOrigin::Genesis | BlockOrigin::NetworkInitialSync | BlockOrigin::File => false,
};
let storage_changes = match &body {
Some(body) => {
self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?;
self.backend.begin_state_operation(&mut operation.op, BlockId::Hash(parent_hash))?;
// ensure parent block is finalized to maintain invariant that
// finality is called sequentially.
if finalized {
self.apply_finality_with_block_hash(operation, parent_hash, None, info.best_hash, make_notifications)?;
}
// ensure parent block is finalized to maintain invariant that
// finality is called sequentially.
if finalized {
self.apply_finality_with_block_hash(operation, parent_hash, None, info.best_hash, make_notifications)?;
}
// FIXME #1232: correct path logic for when to execute this function
let (storage_update, changes_update, storage_changes) = self.block_execution(
&operation.op,
&import_headers,
origin,
hash,
&body,
)?;
operation.op.update_cache(new_cache);
if let Some(storage_update) = storage_update {
operation.op.update_db_storage(storage_update)?;
}
if let Some(storage_changes) = storage_changes.clone() {
operation.op.update_storage(storage_changes.0, storage_changes.1)?;
}
if let Some(Some(changes_update)) = changes_update {
operation.op.update_changes_trie(changes_update)?;
}
storage_changes
},
None => Default::default()
};
// FIXME #1232: correct path logic for when to execute this function
let (storage_update, changes_update, storage_changes) = self.block_execution(
&operation.op,
&import_headers,
origin,
hash,
body.clone(),
)?;
let is_new_best = finalized || match fork_choice {
ForkChoiceStrategy::LongestChain => import_headers.post().number() > &info.best_number,
@@ -994,6 +977,17 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
leaf_state,
)?;
operation.op.update_cache(new_cache);
if let Some(storage_update) = storage_update {
operation.op.update_db_storage(storage_update)?;
}
if let Some(storage_changes) = storage_changes.clone() {
operation.op.update_storage(storage_changes.0, storage_changes.1)?;
}
if let Some(Some(changes_update)) = changes_update {
operation.op.update_changes_trie(changes_update)?;
}
operation.op.insert_aux(aux)?;
if make_notifications {
@@ -1020,7 +1014,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
import_headers: &PrePostHeader<Block::Header>,
origin: BlockOrigin,
hash: Block::Hash,
body: &[Block::Extrinsic],
body: Option<Vec<Block::Extrinsic>>,
) -> error::Result<(
Option<StorageUpdate<B, Block>>,
Option<Option<ChangesUpdate<Block>>>,
@@ -1058,7 +1052,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
let encoded_block = <Block as BlockT>::encode_from(
import_headers.pre(),
body,
&body.unwrap_or_default()
);
let (_, storage_update, changes_update) = self.executor
@@ -1529,7 +1523,7 @@ impl<'a, B, E, Block, RA> consensus::BlockImport<Block> for &'a Client<B, E, Blo
&mut self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
let BlockCheckParams { hash, number, parent_hash, header_only } = block;
let BlockCheckParams { hash, number, parent_hash } = block;
if let Some(h) = self.fork_blocks.as_ref().and_then(|x| x.get(&number)) {
if &hash != h {
@@ -1547,9 +1541,7 @@ impl<'a, B, E, Block, RA> consensus::BlockImport<Block> for &'a Client<B, E, Blo
.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
{
BlockStatus::InChainWithState | BlockStatus::Queued => {},
BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
BlockStatus::InChainPruned if header_only => {},
BlockStatus::InChainPruned => return Ok(ImportResult::MissingState),
BlockStatus::Unknown | BlockStatus::InChainPruned => return Ok(ImportResult::UnknownParent),
BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
}
@@ -1561,6 +1553,7 @@ impl<'a, B, E, Block, RA> consensus::BlockImport<Block> for &'a Client<B, E, Blo
BlockStatus::KnownBad => return Ok(ImportResult::KnownBad),
}
Ok(ImportResult::imported(false))
}
}
@@ -35,15 +35,11 @@ pub enum ImportResult {
KnownBad,
/// Block parent is not in the chain.
UnknownParent,
/// Parent state is missing.
MissingState,
}
/// Auxiliary data associated with an imported block result.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ImportedAux {
/// Only the header has been imported. Block body verification was skipped.
pub header_only: bool,
/// Clear all pending justification requests.
pub clear_justification_requests: bool,
/// Request a justification for the given block.
@@ -102,8 +98,6 @@ pub struct BlockCheckParams<Block: BlockT> {
pub number: NumberFor<Block>,
/// Parent hash of the block that we verify.
pub parent_hash: Block::Hash,
/// Don't check state availability
pub header_only: bool,
}
/// Data required to import a Block.
@@ -203,10 +203,6 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>(
Ok(BlockImportResult::ImportedKnown(number))
},
Ok(ImportResult::Imported(aux)) => Ok(BlockImportResult::ImportedUnknown(number, aux, peer.clone())),
Ok(ImportResult::MissingState) => {
debug!(target: "sync", "Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash);
Err(BlockImportError::UnknownParent)
},
Ok(ImportResult::UnknownParent) => {
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash);
Err(BlockImportError::UnknownParent)
@@ -221,12 +217,7 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>(
}
}
};
match import_error(import_handle.check_block(BlockCheckParams {
hash,
number,
parent_hash,
header_only: block.body.is_none(),
}))? {
match import_error(import_handle.check_block(BlockCheckParams { hash, number, parent_hash }))? {
BlockImportResult::ImportedUnknown { .. } => (),
r => return Ok(r), // Any other successful result means that the block is already imported.
}
@@ -680,7 +680,6 @@ pub mod tests {
bad_justification: false,
needs_finality_proof: false,
is_new_best: true,
header_only: false,
}));
}
@@ -693,7 +692,6 @@ pub mod tests {
bad_justification: false,
needs_finality_proof: false,
is_new_best: true,
header_only: false,
}));
}
@@ -707,7 +705,6 @@ pub mod tests {
bad_justification: false,
needs_finality_proof: true,
is_new_best: true,
header_only: false,
}));
}
@@ -724,7 +721,6 @@ pub mod tests {
bad_justification: false,
needs_finality_proof: true,
is_new_best: false,
header_only: false,
},
));
}
@@ -984,7 +984,6 @@ fn allows_reimporting_change_blocks() {
bad_justification: false,
needs_finality_proof: false,
is_new_best: true,
header_only: false,
}),
);
+19 -31
View File
@@ -158,7 +158,6 @@ pub struct PeerInfo<B: BlockT> {
}
struct ForkTarget<B: BlockT> {
header_only: bool,
number: NumberFor<B>,
parent_hash: Option<B::Hash>,
peers: HashSet<PeerId>,
@@ -481,7 +480,13 @@ impl<B: BlockT> ChainSync<B> {
return;
}
trace!(target: "sync", "Downloading requested old fork {:?}", hash);
let block_status = self.client.block_status(&BlockId::Number(number - One::one()))
.unwrap_or(BlockStatus::Unknown);
if block_status == BlockStatus::InChainPruned {
trace!(target: "sync", "Refusing to sync ancient block {:?}", hash);
return;
}
self.is_idle = false;
for peer_id in &peers {
if let Some(peer) = self.peers.get_mut(peer_id) {
@@ -502,7 +507,6 @@ impl<B: BlockT> ChainSync<B> {
number,
peers: Default::default(),
parent_hash: None,
header_only: true,
})
.peers.extend(peers);
}
@@ -567,7 +571,7 @@ impl<B: BlockT> ChainSync<B> {
let major_sync = self.status().state == SyncState::Downloading;
let blocks = &mut self.blocks;
let attrs = &self.required_block_attributes;
let fork_targets = &mut self.fork_targets;
let fork_targets = &self.fork_targets;
let mut have_requests = false;
let last_finalized = self.client.info().chain.finalized_number;
let best_queued = self.best_queued_number;
@@ -658,10 +662,10 @@ impl<B: BlockT> ChainSync<B> {
}).collect()
}
PeerSyncState::AncestorSearch(num, state) => {
let matching_hash = match (blocks.get(0), self.client.block_hash(*num)) {
let block_hash_match = match (blocks.get(0), self.client.block_hash(*num)) {
(Some(block), Ok(maybe_our_block_hash)) => {
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who);
maybe_our_block_hash.filter(|x| x == &block.hash)
maybe_our_block_hash.map_or(false, |x| x == block.hash)
},
(None, _) => {
debug!(target: "sync", "Invalid response when searching for ancestor from {}", who);
@@ -672,34 +676,27 @@ impl<B: BlockT> ChainSync<B> {
return Err(BadPeer(who, ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE))
}
};
if matching_hash.is_some() && peer.common_number < *num {
if block_hash_match && peer.common_number < *num {
peer.common_number = *num;
}
if matching_hash.is_none() && num.is_zero() {
if !block_hash_match && num.is_zero() {
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
return Err(BadPeer(who, GENESIS_MISMATCH_REPUTATION_CHANGE))
}
if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, matching_hash.is_some()) {
if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, block_hash_match) {
peer.state = PeerSyncState::AncestorSearch(next_num, next_state);
return Ok(OnBlockData::Request(who, ancestry_request::<B>(next_num)))
} else {
// Ancestry search is complete. Check if peer is on a stale fork unknown to us and
// add it to sync targets if necessary.
trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={:?} ({})",
trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={}",
self.best_queued_hash,
self.best_queued_number,
peer.best_hash,
peer.best_number,
matching_hash,
peer.common_number,
peer.common_number
);
let client = &self.client;
if peer.common_number < peer.best_number
&& peer.best_number < self.best_queued_number
&& matching_hash.and_then(
|h| client.block_status(&BlockId::Hash(h)).ok()
).unwrap_or(BlockStatus::Unknown) != BlockStatus::InChainPruned
{
if peer.common_number < peer.best_number && peer.best_number < self.best_queued_number {
trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who);
self.fork_targets
.entry(peer.best_hash.clone())
@@ -707,7 +704,6 @@ impl<B: BlockT> ChainSync<B> {
number: peer.best_number,
parent_hash: None,
peers: Default::default(),
header_only: false,
})
.peers.insert(who);
}
@@ -1089,7 +1085,6 @@ impl<B: BlockT> ChainSync<B> {
number,
parent_hash: Some(header.parent_hash().clone()),
peers: Default::default(),
header_only: false,
})
.peers.insert(who);
}
@@ -1255,35 +1250,28 @@ fn peer_block_request<B: BlockT>(
/// Get pending fork sync targets for a peer.
fn fork_sync_request<B: BlockT>(
id: &PeerId,
targets: &mut HashMap<B::Hash, ForkTarget<B>>,
targets: &HashMap<B::Hash, ForkTarget<B>>,
best_num: NumberFor<B>,
finalized: NumberFor<B>,
attributes: &message::BlockAttributes,
check_block: impl Fn(&B::Hash) -> BlockStatus,
) -> Option<(B::Hash, BlockRequest<B>)>
{
targets.retain(|hash, r| if r.number > finalized {
true
} else {
trace!(target: "sync", "Removed expired fork sync request {:?} (#{})", hash, r.number);
false
});
for (hash, r) in targets {
if !r.peers.contains(id) {
continue
}
if r.number <= best_num {
trace!(target: "sync", "Downloading requested fork {:?} from {}", hash, id);
let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block);
let mut count = (r.number - finalized).saturated_into::<u32>(); // up to the last finalized block
if parent_status != BlockStatus::Unknown {
// request only single block
count = 1;
}
let attributes = if r.header_only { BlockAttributes::HEADER } else { attributes.clone() };
trace!(target: "sync", "Downloading requested fork {:?} from {}, {} blocks", hash, id, count);
return Some((hash.clone(), message::generic::BlockRequest {
id: 0,
fields: attributes,
fields: attributes.clone(),
from: message::FromBlock::Hash(hash.clone()),
to: None,
direction: message::Direction::Descending,
@@ -37,7 +37,7 @@ fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock<Block>)
(client, hash, number, peer_id.clone(), IncomingBlock {
hash,
header,
body: Some(Vec::new()),
body: None,
justification,
origin: Some(peer_id.clone())
})
@@ -53,7 +53,7 @@ fn import_single_good_block_works() {
match import_single_block(&mut test_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org))
if *num == number && *aux == expected_aux && *org == Some(peer_id) => {}
r @ _ => panic!("{:?}", r)
_ => panic!()
}
}
+12 -13
View File
@@ -374,10 +374,16 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
}
}
/// Count the total number of imported blocks.
pub fn blocks_count(&self) -> u64 {
/// Count the current number of known blocks. Note that:
/// 1. this might be expensive as it creates an in-memory-copy of the chain
/// to count the blocks, thus if you have a different way of testing this
/// (e.g. `info.best_hash`) - use that.
/// 2. This is not always increasing nor accurate, as the
/// orphaned and proven-to-never-finalized blocks may be pruned at any time.
/// Therefore, this number can drop again.
pub fn blocks_count(&self) -> usize {
self.backend.as_ref().map(
|backend| backend.blocks_count()
|backend| backend.as_in_memory().blockchain().blocks_count()
).unwrap_or(0)
}
}
@@ -513,16 +519,9 @@ pub trait TestNetFactory: Sized {
net
}
fn add_full_peer(&mut self, config: &ProtocolConfig) {
self.add_full_peer_with_states(config, None)
}
/// Add a full peer.
fn add_full_peer_with_states(&mut self, config: &ProtocolConfig, keep_blocks: Option<u32>) {
let test_client_builder = match keep_blocks {
Some(keep_blocks) => TestClientBuilder::with_pruning_window(keep_blocks),
None => TestClientBuilder::with_default_backend(),
};
fn add_full_peer(&mut self, config: &ProtocolConfig) {
let test_client_builder = TestClientBuilder::with_default_backend();
let backend = test_client_builder.backend();
let (c, longest_chain) = test_client_builder.build_with_longest_chain();
let client = Arc::new(c);
@@ -680,7 +679,7 @@ pub trait TestNetFactory: Sized {
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
return Async::NotReady
}
match (highest, peer.client.info().chain.best_hash) {
match (highest, peer.client.info().chain.best_number) {
(None, b) => highest = Some(b),
(Some(ref a), ref b) if a == b => {},
(Some(_), _) => return Async::NotReady,
+1 -42
View File
@@ -236,14 +236,7 @@ fn sync_no_common_longer_chain_fails() {
let mut net = TestNet::new(3);
net.peer(0).push_blocks(20, true);
net.peer(1).push_blocks(20, false);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(0).is_major_syncing() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
net.block_until_sync(&mut runtime);
let peer1 = &net.peers()[1];
assert!(!net.peers()[0].blockchain_canon_equals(peer1));
}
@@ -599,37 +592,3 @@ fn can_sync_explicit_forks() {
Ok(Async::Ready(()))
})).unwrap();
}
#[test]
fn syncs_header_only_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(0);
let config = ProtocolConfig::default();
net.add_full_peer_with_states(&config, None);
net.add_full_peer_with_states(&config, Some(3));
net.peer(0).push_blocks(2, false);
net.peer(1).push_blocks(2, false);
net.peer(0).push_blocks(2, true);
let small_hash = net.peer(0).client().info().chain.best_hash;
let small_number = net.peer(0).client().info().chain.best_number;
net.peer(1).push_blocks(4, false);
net.block_until_sync(&mut runtime);
// Peer 1 won't sync the small fork because common block state is missing
assert_eq!(9, net.peer(0).blocks_count());
assert_eq!(7, net.peer(1).blocks_count());
// Request explicit header-only sync request for the ancient fork.
let first_peer_id = net.peer(0).id();
net.peer(1).set_sync_fork_request(vec![first_peer_id], small_hash, small_number);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() {
return Ok(Async::NotReady)
}
Ok(Async::Ready(()))
})).unwrap();
}
-6
View File
@@ -98,12 +98,6 @@ impl<Block, Executor, G: GenesisInit> TestClientBuilder<
pub fn backend(&self) -> Arc<Backend<Block>> {
self.backend.clone()
}
/// Create new `TestClientBuilder` with default backend and pruning window size
pub fn with_pruning_window(keep_blocks: u32) -> Self {
let backend = Arc::new(Backend::new_test(keep_blocks, 0));
Self::with_backend(backend)
}
}
impl<Executor, Backend, G: GenesisInit> TestClientBuilder<Executor, Backend, G> {