From 3963bb58ffc4cb0168ae5bef6d07b33e3af87c6d Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Fri, 18 Oct 2019 16:44:40 +0200 Subject: [PATCH] Persist block announcements (#3826) * Persist block announcements * Renamed sync requests to fork targets * Fixed pruning detection condition --- substrate/core/network/src/protocol.rs | 11 +- substrate/core/network/src/protocol/sync.rs | 209 +++++++++----------- substrate/core/network/src/test/mod.rs | 2 + substrate/core/network/src/test/sync.rs | 11 ++ 4 files changed, 105 insertions(+), 128 deletions(-) diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index dc9e6688e7..2e036cf118 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -1121,18 +1121,13 @@ impl, H: ExHashT> Protocol { }; match self.sync.on_block_announce(who.clone(), hash, &announce, is_their_best) { - sync::OnBlockAnnounce::Request(peer, req) => { - self.send_message(peer, GenericMessage::BlockRequest(req)); - return CustomMessageOutcome::None - } sync::OnBlockAnnounce::Nothing => { - // try_import is only true when we have all data required to import block + // `on_block_announce` returns `OnBlockAnnounce::ImportHeader` + // when we have all data required to import the block // in the BlockAnnounce message. This is only when: // 1) we're on light client; // AND - // - EITHER 2.1) announced block is stale; - // - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e. - // there are no ascendants of this block scheduled for retrieval) + // 2) parent block is already imported and not pruned. return CustomMessageOutcome::None } sync::OnBlockAnnounce::ImportHeader => () // We proceed with the import. diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 9f9db92289..bd8a9fe27f 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -69,9 +69,6 @@ const MAJOR_SYNC_BLOCKS: u8 = 5; /// Number of recently announced blocks to track for each peer. const ANNOUNCE_HISTORY_SIZE: usize = 64; -/// Max number of blocks to download for unknown forks. -const MAX_UNKNOWN_FORK_DOWNLOAD_LEN: u32 = 32; - /// Reputation change when a peer sent us a status message that led to a /// database read error. const BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE: i32 = -(1 << 16); @@ -125,8 +122,8 @@ pub struct ChainSync { best_importing_number: NumberFor, /// Finality proof handler. request_builder: Option>, - /// Explicit sync requests. - sync_requests: HashMap>, + /// Fork sync targets. + fork_targets: HashMap>, /// A flag that caches idle state with no pending requests. is_idle: bool, /// A type to check incoming block announcements. @@ -160,8 +157,9 @@ pub struct PeerInfo { pub best_number: NumberFor } -struct SyncRequest { +struct ForkTarget { number: NumberFor, + parent_hash: Option, peers: HashSet, } @@ -242,13 +240,11 @@ pub enum OnBlockData { /// Result of [`ChainSync::on_block_announce`]. #[derive(Debug, Clone, PartialEq, Eq)] -pub enum OnBlockAnnounce { +pub enum OnBlockAnnounce { /// The announcement does not require further handling. Nothing, /// The announcement header should be imported. ImportHeader, - /// Another block request to the given peer is necessary. - Request(PeerId, BlockRequest) } /// Result of [`ChainSync::on_block_justification`]. @@ -307,7 +303,7 @@ impl ChainSync { queue_blocks: Default::default(), best_importing_number: Zero::zero(), request_builder, - sync_requests: Default::default(), + fork_targets: Default::default(), is_idle: false, block_announce_validator, } @@ -462,7 +458,7 @@ impl ChainSync { // The implementation is similar to on_block_announce with unknown parent hash. pub fn set_sync_fork_request(&mut self, peers: Vec, hash: &B::Hash, number: NumberFor) { if peers.is_empty() { - if let Some(_) = self.sync_requests.remove(hash) { + if let Some(_) = self.fork_targets.remove(hash) { debug!(target: "sync", "Cleared sync request for block {:?} with {:?}", hash, peers); } return; @@ -494,11 +490,12 @@ impl ChainSync { } } - self.sync_requests + self.fork_targets .entry(hash.clone()) - .or_insert_with(|| SyncRequest { + .or_insert_with(|| ForkTarget { number, peers: Default::default(), + parent_hash: None, }) .peers.extend(peers); } @@ -562,17 +559,30 @@ impl ChainSync { } let blocks = &mut self.blocks; let attrs = &self.required_block_attributes; - let sync_requests = &self.sync_requests; + let fork_targets = &self.fork_targets; let mut have_requests = false; let last_finalized = self.client.info().chain.finalized_number; let best_queued = self.best_queued_number; + let client = &self.client; + let queue = &self.queue_blocks; let iter = self.peers.iter_mut().filter_map(move |(id, peer)| { if !peer.state.is_available() { trace!(target: "sync", "Peer {} is busy", id); return None } - if let Some((hash, req)) = explicit_sync_request(id, sync_requests, best_queued, last_finalized, attrs) { - trace!(target: "sync", "Downloading explicitly requested block {:?} from {}", hash, id); + if let Some((hash, req)) = fork_sync_request( + id, + fork_targets, + best_queued, + last_finalized, + attrs, + |hash| if queue.contains(hash) { + BlockStatus::Queued + } else { + client.block_status(&BlockId::Hash(*hash)).unwrap_or(BlockStatus::Unknown) + }, + ) { + trace!(target: "sync", "Downloading fork {:?} from {}", hash, id); peer.state = PeerSyncState::DownloadingStale(hash); have_requests = true; Some((id.clone(), req)) @@ -665,6 +675,26 @@ impl ChainSync { peer.state = PeerSyncState::AncestorSearch(next_num, next_state); return Ok(OnBlockData::Request(who, ancestry_request::(next_num))) } else { + // Ancestry search is complete. Check if peer is on a stale fork unknown to us and + // add it to sync targets if necessary. + trace!(target: "sync", "Ancestry search complete. Ours={} ({}), Theirs={} ({}), Common={}", + self.best_queued_hash, + self.best_queued_number, + peer.best_hash, + peer.best_number, + peer.common_number + ); + if peer.common_number < peer.best_number && peer.best_number < self.best_queued_number { + trace!(target: "sync", "Added fork target {} for {}" , peer.best_hash, who); + self.fork_targets + .entry(peer.best_hash.clone()) + .or_insert_with(|| ForkTarget { + number: peer.best_number, + parent_hash: None, + peers: Default::default(), + }) + .peers.insert(who); + } peer.state = PeerSyncState::Available; Vec::new() } @@ -922,14 +952,14 @@ impl ChainSync { self.best_queued_number = number; self.best_queued_hash = *hash; } - if let Some(_) = self.sync_requests.remove(&hash) { - trace!(target: "sync", "Completed explicit sync request {:?}", hash); + if let Some(_) = self.fork_targets.remove(&hash) { + trace!(target: "sync", "Completed fork sync {:?}", hash); } // Update common blocks for (n, peer) in self.peers.iter_mut() { if let PeerSyncState::AncestorSearch(_, _) = peer.state { - // Abort search. - peer.state = PeerSyncState::Available; + // Wait for ancestry search to complete first. + continue; } let new_common_number = if peer.best_number >= number { number @@ -952,12 +982,12 @@ impl ChainSync { /// Call when a node announces a new block. /// - /// If true is returned, then the caller MUST try to import passed + /// If `OnBlockAnnounce::ImportHeader` is returned, then the caller MUST try to import passed /// header (call `on_block_data`). The network request isn't sent /// in this case. Both hash and header is passed as an optimization /// to avoid rehashing the header. pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, announce: &BlockAnnounce, is_best: bool) - -> OnBlockAnnounce + -> OnBlockAnnounce { let header = &announce.header; let number = *header.number(); @@ -1001,6 +1031,9 @@ impl ChainSync { // known block case if known || self.is_already_downloading(&hash) { trace!(target: "sync", "Known block announce from {}: {}", who, hash); + if let Some(target) = self.fork_targets.get_mut(&hash) { + target.peers.insert(who); + } return OnBlockAnnounce::Nothing } @@ -1009,79 +1042,42 @@ impl ChainSync { match self.block_announce_validator.validate(&header, assoc_data) { Ok(Validation::Success) => (), Ok(Validation::Failure) => { - debug!(target: "sync", "block announcement validation of block {} from {} failed", hash, who); + debug!(target: "sync", "Block announcement validation of block {} from {} failed", hash, who); return OnBlockAnnounce::Nothing } Err(e) => { - error!(target: "sync", "block announcement validation errored: {}", e); + error!(target: "sync", "Block announcement validation errored: {}", e); return OnBlockAnnounce::Nothing } } - // stale block case - let requires_additional_data = !self.role.is_light(); - if number <= self.best_queued_number { - if !(known_parent || self.is_already_downloading(header.parent_hash())) { - let block_status = self.client.block_status(&BlockId::Number(*header.number())) - .unwrap_or(BlockStatus::Unknown); - if block_status == BlockStatus::InChainPruned { - trace!( - target: "sync", - "Ignored unknown ancient block announced from {}: {} {:?}", who, hash, header - ); - return OnBlockAnnounce::Nothing - } - trace!( - target: "sync", - "Considering new unknown stale block announced from {}: {} {:?}", who, hash, header - ); - if let Some(request) = self.download_unknown_stale(&who, &hash) { - if requires_additional_data { - return OnBlockAnnounce::Request(who, request) - } else { - return OnBlockAnnounce::ImportHeader - } - } else { - return OnBlockAnnounce::Nothing - } - } else { - if ancient_parent { - trace!(target: "sync", "Ignored ancient stale block announced from {}: {} {:?}", who, hash, header); - return OnBlockAnnounce::Nothing - } - if let Some(request) = self.download_stale(&who, &hash) { - if requires_additional_data { - return OnBlockAnnounce::Request(who, request) - } else { - return OnBlockAnnounce::ImportHeader - } - } else { - return OnBlockAnnounce::Nothing - } - } - } - if ancient_parent { trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header); return OnBlockAnnounce::Nothing } - trace!(target: "sync", "Considering new block announced from {}: {} {:?}", who, hash, header); - - let (range, request) = match self.select_new_blocks(who.clone()) { - Some((range, request)) => (range, request), - None => return OnBlockAnnounce::Nothing - }; - - let is_required_data_available = !requires_additional_data - && range.end - range.start == One::one() - && range.start == *header.number(); - - if !is_required_data_available { - return OnBlockAnnounce::Request(who, request) + let requires_additional_data = !self.role.is_light() || !known_parent; + if !requires_additional_data { + trace!(target: "sync", "Importing new header announced from {}: {} {:?}", who, hash, header); + return OnBlockAnnounce::ImportHeader } - OnBlockAnnounce::ImportHeader + if number <= self.best_queued_number { + trace!( + target: "sync", + "Added sync target for block announced from {}: {} {:?}", who, hash, header + ); + self.fork_targets + .entry(hash.clone()) + .or_insert_with(|| ForkTarget { + number, + parent_hash: Some(header.parent_hash().clone()), + peers: Default::default(), + }) + .peers.insert(who); + } + + OnBlockAnnounce::Nothing } /// Call when a peer has disconnected. @@ -1117,40 +1113,6 @@ impl ChainSync { }) } - /// Download old block with known parent. - fn download_stale(&mut self, who: &PeerId, hash: &B::Hash) -> Option> { - let peer = self.peers.get_mut(who)?; - if !peer.state.is_available() { - return None - } - peer.state = PeerSyncState::DownloadingStale(*hash); - Some(message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Hash(*hash), - to: None, - direction: message::Direction::Ascending, - max: Some(1), - }) - } - - /// Download old block with unknown parent. - fn download_unknown_stale(&mut self, who: &PeerId, hash: &B::Hash) -> Option> { - let peer = self.peers.get_mut(who)?; - if !peer.state.is_available() { - return None - } - peer.state = PeerSyncState::DownloadingStale(*hash); - Some(message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Hash(*hash), - to: None, - direction: message::Direction::Descending, - max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN), - }) - } - /// Select a range of new blocks to download from the given peer. fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range>, BlockRequest)> { // when there are too many blocks in the queue => do not try to download new blocks @@ -1298,28 +1260,35 @@ fn peer_block_request( } } -/// Get pending explicit sync request for a peer. -fn explicit_sync_request( +/// Get pending fork sync targets for a peer. +fn fork_sync_request( id: &PeerId, - requests: &HashMap>, + targets: &HashMap>, best_num: NumberFor, finalized: NumberFor, attributes: &message::BlockAttributes, + check_block: impl Fn(&B::Hash) -> BlockStatus, ) -> Option<(B::Hash, BlockRequest)> { - for (hash, r) in requests { + for (hash, r) in targets { if !r.peers.contains(id) { continue } if r.number <= best_num { trace!(target: "sync", "Downloading requested fork {:?} from {}", hash, id); + let parent_status = r.parent_hash.as_ref().map_or(BlockStatus::Unknown, check_block); + let mut count = (r.number - finalized).saturated_into::(); // up to the last finalized block + if parent_status != BlockStatus::Unknown { + // request only single block + count = 1; + } return Some((hash.clone(), message::generic::BlockRequest { id: 0, fields: attributes.clone(), from: message::FromBlock::Hash(hash.clone()), to: None, direction: message::Direction::Descending, - max: Some((r.number - finalized).saturated_into::()), // up to the last finalized block + max: Some(count), })) } } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 2c87ba1ac8..92e747280b 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -704,7 +704,9 @@ pub trait TestNetFactory: Sized { fn poll(&mut self) { self.mut_peers(|peers| { for peer in peers { + trace!(target: "sync", "-- Polling {}", peer.id()); peer.network.poll().unwrap(); + trace!(target: "sync", "-- Polling complete {}", peer.id()); // We poll `imported_blocks_stream`. while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() { diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index b5b137a31a..b1b2b9d407 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -457,6 +457,17 @@ fn can_sync_small_non_best_forks() { } Ok(Async::Ready(())) })).unwrap(); + net.block_until_sync(&mut runtime); + + let another_fork = net.peer(0).push_blocks_at(BlockId::Number(35), 2, true); + net.peer(0).announce_block(another_fork, Vec::new()); + runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> { + net.poll(); + if net.peer(1).client().header(&BlockId::Hash(another_fork)).unwrap().is_none() { + return Ok(Async::NotReady) + } + Ok(Async::Ready(())) + })).unwrap(); } #[test]