From dc41558b6ef69aad59bb4cfcd9c19c91c1d5c3a9 Mon Sep 17 00:00:00 2001
From: Toralf Wittner
Date: Tue, 18 Jun 2019 10:35:28 +0200
Subject: [PATCH] Change `network::sync::extra_requests` to not send. (#2890)

* Change network::sync::extra_requests to not send.

  Instead it only maintains the invariants and leaves the actual I/O
  part to the parent module (i.e. `sync`).

* Update Cargo.lock.
---
 substrate/Cargo.lock                            |  13 +
 substrate/core/network/Cargo.toml               |   2 +
 substrate/core/network/src/protocol.rs          |   3 +-
 substrate/core/network/src/protocol/sync.rs     | 137 ++--
 .../src/protocol/sync/extra_requests.rs         | 636 +++++++++---------
 5 files changed, 417 insertions(+), 374 deletions(-)

diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock
index 70780e409c..4f9a55e87d 100644
--- a/substrate/Cargo.lock
+++ b/substrate/Cargo.lock
@@ -2733,6 +2733,17 @@ name = "quick-error"
 version = "1.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
+[[package]]
+name = "quickcheck"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "env_logger 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand_core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "quote"
 version = "0.6.12"
@@ -4280,6 +4291,7 @@ dependencies = [
 "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "parity-codec 3.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+"quickcheck 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)",
 "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
 "rustc-hex 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde 1.0.92 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -5896,6 +5908,7 @@ dependencies = [
 "checksum pwasm-utils 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "efb0dcbddbb600f47a7098d33762a00552c671992171637f5bb310b37fe1f0e4"
 "checksum quick-error 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5fb6ccf8db7bbcb9c2eae558db5ab4f3da1c2a87e4e597ed394726bc8ea6ca1d"
 "checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0"
+"checksum quickcheck 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)" = "9c35d9c36a562f37eca96e79f66d5fd56eefbc22560dacc4a864cabd2d277456"
 "checksum quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "faf4799c5d274f3868a4aae320a0a182cbd2baee377b378f080e16a23e9d80db"
 "checksum rand 0.3.23 (registry+https://github.com/rust-lang/crates.io-index)" = "64ac302d8f83c0c1974bf758f6b041c6c8ada916fbb44a609158ca8b064cc76c"
 "checksum rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"

diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml
index 832a9dda88..56a69039c3 100644
--- a/substrate/core/network/Cargo.toml
+++ b/substrate/core/network/Cargo.toml
@@ -46,6 +46,8 @@ zeroize = "0.6.0"
 [dev-dependencies]
 env_logger = { version = "0.6" }
 keyring = { package = "substrate-keyring", path = "../../core/keyring" }
+quickcheck = "0.8.5"
+rand = "0.6.5"
 test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" }
"substrate-test-runtime-client", path = "../../core/test-runtime/client" } test_runtime = { package = "substrate-test-runtime", path = "../../core/test-runtime" } consensus = { package = "substrate-consensus-common", path = "../../core/consensus/common", features = ["test-helpers"] } diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 4b300e7a3e..13be819fae 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -765,8 +765,7 @@ impl, H: ExHashT> Protocol { let outcome = self.sync.on_block_justification_data( &mut ProtocolContext::new(&mut self.context_data, network_out), peer, - request, - response, + response ); if let Some((origin, hash, nb, just)) = outcome { diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 751e3f872e..a6d87f33ff 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -40,7 +40,7 @@ use client::{BlockStatus, ClientInfo}; use consensus::{BlockOrigin, import_queue::{IncomingBlock, SharedFinalityProofRequestBuilder}}; use client::error::Error as ClientError; use blocks::BlockCollection; -use extra_requests::ExtraRequestsAggregator; +use extra_requests::ExtraRequests; use runtime_primitives::traits::{ Block as BlockT, Header as HeaderT, NumberFor, Zero, One, CheckedSub, SaturatedConversion @@ -89,7 +89,7 @@ pub trait Context { fn send_block_request(&mut self, who: PeerId, request: message::BlockRequest); } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct PeerSync { pub common_number: NumberFor, pub best_hash: B::Hash, @@ -98,8 +98,8 @@ pub(crate) struct PeerSync { pub recently_announced: VecDeque, } -#[derive(Debug)] /// Peer sync status. +#[derive(Debug)] pub(crate) struct PeerInfo { /// Their best block hash. pub best_hash: B::Hash, @@ -129,16 +129,17 @@ pub(crate) enum PeerSyncState { /// Relay chain sync strategy. pub struct ChainSync { - _genesis_hash: B::Hash, peers: HashMap>, blocks: BlockCollection, best_queued_number: NumberFor, best_queued_hash: B::Hash, role: Roles, required_block_attributes: message::BlockAttributes, - extra_requests: ExtraRequestsAggregator, + extra_finality_proofs: ExtraRequests, + extra_justifications: ExtraRequests, queue_blocks: HashSet, best_importing_number: NumberFor, + request_builder: Option>, } /// Reported sync state. @@ -179,26 +180,26 @@ impl Status { impl ChainSync { /// Create a new instance. Pass the initial known state of the chain. 
-    pub(crate) fn new(
-        role: Roles,
-        info: &ClientInfo<B>,
-    ) -> Self {
-        let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION;
+    pub(crate) fn new(role: Roles, info: &ClientInfo<B>) -> Self {
+        let mut required_block_attributes =
+            message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION;
+
         if role.is_full() {
             required_block_attributes |= message::BlockAttributes::BODY;
         }
 
         ChainSync {
-            _genesis_hash: info.chain.genesis_hash,
             peers: HashMap::new(),
             blocks: BlockCollection::new(),
             best_queued_hash: info.chain.best_hash,
             best_queued_number: info.chain.best_number,
-            extra_requests: ExtraRequestsAggregator::new(),
+            extra_finality_proofs: ExtraRequests::new(),
+            extra_justifications: ExtraRequests::new(),
             role,
             required_block_attributes,
             queue_blocks: Default::default(),
             best_importing_number: Zero::zero(),
+            request_builder: None,
         }
     }
 
@@ -499,9 +500,9 @@ impl<B: BlockT> ChainSync<B> {
         &mut self,
         protocol: &mut dyn Context<B>,
         who: PeerId,
-        _request: message::BlockRequest<B>,
         response: message::BlockResponse<B>,
-    ) -> Option<(PeerId, B::Hash, NumberFor<B>, Justification)> {
+    ) -> Option<(PeerId, B::Hash, NumberFor<B>, Justification)>
+    {
         let peer = if let Some(peer) = self.peers.get_mut(&who) {
             peer
         } else {
@@ -522,12 +523,8 @@ impl<B: BlockT> ChainSync<B> {
                     protocol.disconnect_peer(who);
                     return None;
                 }
-
-                return self.extra_requests.justifications().on_response(
-                    who,
-                    response.justification,
-                );
-            },
+                return self.extra_justifications.on_response(who, response.justification)
+            }
             None => {
                 // we might have asked the peer for a justification on a block that we thought it had
                 // (regardless of whether it had a justification for it or not).
                 trace!(target: "sync",
                     "Peer {:?} provided empty response for justification request {:?}",
                     who,
                     hash,
                 );
                 return None;
-            },
+            }
         }
     }
 
             return None;
         }
 
-        return self.extra_requests.finality_proofs().on_response(
-            who,
-            response.proof,
-        );
+        return self.extra_finality_proofs.on_response(who, response.proof)
     }
 
     self.maintain_sync(protocol);
@@ -603,13 +597,47 @@
         for peer in peers {
             self.download_new(protocol, peer);
         }
-        self.extra_requests.dispatch(&mut self.peers, protocol);
+        self.tick(protocol)
     }
 
     /// Called periodically to perform any time-based actions. Must be called at a regular
     /// interval.
     pub fn tick(&mut self, protocol: &mut dyn Context<B>) {
-        self.extra_requests.dispatch(&mut self.peers, protocol);
+        self.send_justification_requests(protocol);
+        self.send_finality_proof_request(protocol)
+    }
+
+    fn send_justification_requests(&mut self, protocol: &mut dyn Context<B>) {
+        let mut matcher = self.extra_justifications.matcher();
+        while let Some((peer, request)) = matcher.next(&self.peers) {
+            self.peers.get_mut(&peer)
+                .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed")
+                .state = PeerSyncState::DownloadingJustification(request.0);
+            protocol.send_block_request(peer, message::generic::BlockRequest {
+                id: 0,
+                fields: message::BlockAttributes::JUSTIFICATION,
+                from: message::FromBlock::Hash(request.0),
+                to: None,
+                direction: message::Direction::Ascending,
+                max: Some(1)
+            })
+        }
+    }
+
+    fn send_finality_proof_request(&mut self, protocol: &mut dyn Context<B>) {
+        let mut matcher = self.extra_finality_proofs.matcher();
+        while let Some((peer, request)) = matcher.next(&self.peers) {
+            self.peers.get_mut(&peer)
+                .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed")
+                .state = PeerSyncState::DownloadingFinalityProof(request.0);
+            protocol.send_finality_proof_request(peer, message::generic::FinalityProofRequest {
+                id: 0,
+                block: request.0,
+                request: self.request_builder.as_ref()
+                    .map(|builder| builder.build_request_data(&request.0))
+                    .unwrap_or_default()
+            })
+        }
     }
 
     /// Request a justification for the given block.
@@ -617,24 +645,22 @@
     /// Uses `protocol` to queue a new justification request and tries to dispatch all pending
     /// requests.
     pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut dyn Context<B>) {
-        self.extra_requests.justifications().queue_request(
-            (*hash, number),
-            |base, block| protocol.client().is_descendent_of(base, block),
-        );
-
-        self.extra_requests.justifications().dispatch(&mut self.peers, protocol);
+        self.extra_justifications.schedule((*hash, number), |base, block| {
+            protocol.client().is_descendent_of(base, block)
+        });
+        self.send_justification_requests(protocol)
     }
 
     /// Clears all pending justification requests.
     pub fn clear_justification_requests(&mut self) {
-        self.extra_requests.justifications().clear();
+        self.extra_justifications.reset()
     }
 
     /// Call this when a justification has been processed by the import queue, with or without
     /// errors.
     pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
         let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
-        if !self.extra_requests.justifications().on_import_result((hash, number), finalization_result) {
+        if !self.extra_justifications.try_finalize_root((hash, number), finalization_result, true) {
             debug!(target: "sync",
                 "Got justification import result for unknown justification {:?} {:?} request.",
                 hash,
                 number,
             );
         }
     }
 
@@ -646,12 +672,10 @@
     ///
     /// Queues a new finality proof request and tries to dispatch all pending requests.
     pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut dyn Context<B>) {
-        self.extra_requests.finality_proofs().queue_request(
-            (*hash, number),
-            |base, block| protocol.client().is_descendent_of(base, block),
-        );
-
-        self.extra_requests.finality_proofs().dispatch(&mut self.peers, protocol);
+        self.extra_finality_proofs.schedule((*hash, number), |base, block| {
+            protocol.client().is_descendent_of(base, block)
+        });
+        self.send_finality_proof_request(protocol)
     }
 
     pub fn finality_proof_import_result(
         &mut self,
         request_block: (B::Hash, NumberFor<B>),
         finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
     ) {
-        self.extra_requests.finality_proofs().on_import_result(request_block, finalization_result);
+        self.extra_finality_proofs.try_finalize_root(request_block, finalization_result, true);
     }
 
-    pub fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder<B>) {
-        self.extra_requests.finality_proofs().essence().0 = Some(request_builder);
+    pub fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder<B>) {
+        self.request_builder = Some(builder)
     }
 
     /// Notify about successful import of the given block.
@@ -673,13 +697,21 @@
     /// Notify about finalization of the given block.
     pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>, protocol: &mut dyn Context<B>) {
-        if let Err(err) = self.extra_requests.on_block_finalized(
-            hash,
-            number,
-            &|base, block| protocol.client().is_descendent_of(base, block),
-        ) {
-            warn!(target: "sync", "Error cleaning up pending extra data requests: {:?}", err);
-        };
+        let r = self.extra_finality_proofs.on_block_finalized(hash, number, |base, block| {
+            protocol.client().is_descendent_of(base, block)
+        });
+
+        if let Err(err) = r {
+            warn!(target: "sync", "Error cleaning up pending extra finality proof data requests: {:?}", err);
+        }
+
+        let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| {
+            protocol.client().is_descendent_of(base, block)
+        });
+
+        if let Err(err) = r {
+            warn!(target: "sync", "Error cleaning up pending extra justification data requests: {:?}", err);
+        }
     }
 
     fn block_queued(&mut self, hash: &B::Hash, number: NumberFor<B>) {
@@ -859,7 +891,8 @@
     pub(crate) fn peer_disconnected(&mut self, protocol: &mut dyn Context<B>, who: PeerId) {
         self.blocks.clear_peer_download(&who);
         self.peers.remove(&who);
-        self.extra_requests.peer_disconnected(who);
+        self.extra_justifications.peer_disconnected(&who);
+        self.extra_finality_proofs.peer_disconnected(&who);
         self.maintain_sync(protocol);
     }
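The division of labour that the sync.rs hunks above establish is: `ExtraRequests` (rewritten below) only decides what to request and from whom, while `ChainSync` performs the actual sending in `tick`, `send_justification_requests` and `send_finality_proof_request`. The following self-contained sketch mirrors the shape of that API (`schedule`/`matcher`/`on_response`) with simplified stand-in types — `Tracker`, `String` peer ids and `u64` block numbers are illustrative assumptions, not code from this patch:

    // Sketch of the pattern this commit introduces: the request tracker only
    // mutates its own bookkeeping, and the caller performs the network I/O.
    use std::collections::{HashMap, VecDeque};

    type Request = (u64, [u8; 32]); // (block number, block hash)

    #[derive(Default)]
    struct Tracker {
        pending: VecDeque<Request>,
        active: HashMap<String, Request>, // peer id -> request in flight
    }

    impl Tracker {
        /// Queue a request; like `ExtraRequests::schedule`, no message is sent here.
        fn schedule(&mut self, r: Request) {
            self.pending.push_back(r)
        }

        /// Pick the next (peer, request) pair; like `Matcher::next`, still no I/O.
        fn next_match(&mut self, peers: &HashMap<String, u64>) -> Option<(String, Request)> {
            let r = *self.pending.front()?;
            for (peer, best_block) in peers {
                if *best_block >= r.0 && !self.active.contains_key(peer) {
                    self.pending.pop_front();
                    self.active.insert(peer.clone(), r);
                    return Some((peer.clone(), r));
                }
            }
            None
        }

        /// An empty response re-queues the request so another peer can be tried.
        fn on_response(&mut self, peer: &str, data: Option<Vec<u8>>) -> Option<(Request, Vec<u8>)> {
            let r = self.active.remove(peer)?;
            match data {
                Some(d) => Some((r, d)),
                None => {
                    self.pending.push_front(r);
                    None
                }
            }
        }
    }

    fn main() {
        let mut peers = HashMap::new();
        peers.insert("alice".to_string(), 100u64); // alice's best block is #100

        let mut tracker = Tracker::default();
        tracker.schedule((42, [0; 32]));

        // The caller -- `ChainSync::tick` in the real code -- owns the I/O side:
        while let Some((peer, request)) = tracker.next_match(&peers) {
            println!("would send request for block #{} to {}", request.0, peer);
        }

        // An empty response puts the request back into the pending queue.
        assert!(tracker.on_response("alice", None).is_none());
        assert_eq!(tracker.pending.len(), 1);
    }

Because the tracker never touches the network, its bookkeeping can be driven and inspected directly; the quickcheck properties in the rewritten `extra_requests.rs` below exploit exactly that.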
diff --git a/substrate/core/network/src/protocol/sync/extra_requests.rs b/substrate/core/network/src/protocol/sync/extra_requests.rs
index 589a5d3787..c4f6de05a2 100644
--- a/substrate/core/network/src/protocol/sync/extra_requests.rs
+++ b/substrate/core/network/src/protocol/sync/extra_requests.rs
@@ -14,294 +14,118 @@
 // You should have received a copy of the GNU General Public License
 // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
 
-use std::collections::{HashMap, HashSet, VecDeque};
-use std::time::{Duration, Instant};
-use log::{trace, warn};
 use client::error::Error as ClientError;
-use consensus::import_queue::SharedFinalityProofRequestBuilder;
+use crate::protocol::sync::{PeerSync, PeerSyncState};
 use fork_tree::ForkTree;
 use libp2p::PeerId;
-use runtime_primitives::Justification;
+use log::warn;
 use runtime_primitives::traits::{Block as BlockT, NumberFor};
-use crate::protocol::message;
-use crate::protocol::sync::{Context, PeerSync, PeerSyncState};
+use std::collections::{HashMap, HashSet, VecDeque};
+use std::time::{Duration, Instant};
 
 // Time to wait before trying to get the same extra data from the same peer.
 const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10);
 
 /// Pending extra data request for the given block (hash and number).
-type ExtraRequest<B> = (<B as BlockT>::Hash, NumberFor<B>);
-
-/// Extra requests processor.
-pub(crate) trait ExtraRequestsEssence<B: BlockT> {
-    type Response;
-
-    /// Name of request type to display in logs.
-    fn type_name(&self) -> &'static str;
-    /// Send network message corresponding to the request.
-    fn send_network_request(&self, protocol: &mut dyn Context<B>, peer: PeerId, request: ExtraRequest<B>);
-    /// Create peer state for peer that is downloading extra data.
-    fn peer_downloading_state(&self, block: B::Hash) -> PeerSyncState<B>;
-}
-
-/// Manages all extra data requests required for sync.
-pub(crate) struct ExtraRequestsAggregator<B: BlockT> {
-    /// Manages justifications requests.
-    justifications: ExtraRequests<B, JustificationsRequestsEssence>,
-    /// Manages finality proof requests.
-    finality_proofs: ExtraRequests<B, FinalityProofRequestsEssence<B>>,
-}
-
-impl<B: BlockT> ExtraRequestsAggregator<B> {
-    pub(crate) fn new() -> Self {
-        ExtraRequestsAggregator {
-            justifications: ExtraRequests::new(JustificationsRequestsEssence),
-            finality_proofs: ExtraRequests::new(FinalityProofRequestsEssence(None)),
-        }
-    }
-
-    pub(crate) fn justifications(&mut self) -> &mut ExtraRequests<B, JustificationsRequestsEssence> {
-        &mut self.justifications
-    }
-
-    pub(crate) fn finality_proofs(&mut self) -> &mut ExtraRequests<B, FinalityProofRequestsEssence<B>> {
-        &mut self.finality_proofs
-    }
-
-    /// Dispatches all possible pending requests to the given peers.
-    pub(crate) fn dispatch(&mut self, peers: &mut HashMap<PeerId, PeerSync<B>>, protocol: &mut dyn Context<B>) {
-        self.justifications.dispatch(peers, protocol);
-        self.finality_proofs.dispatch(peers, protocol);
-    }
-
-    /// Removes any pending extra requests for blocks lower than the
-    /// given best finalized.
-    pub(crate) fn on_block_finalized<F>(
-        &mut self,
-        best_finalized_hash: &B::Hash,
-        best_finalized_number: NumberFor<B>,
-        is_descendent_of: &F,
-    ) -> Result<(), fork_tree::Error<ClientError>>
-        where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
-    {
-        self.justifications.on_block_finalized(best_finalized_hash, best_finalized_number, is_descendent_of)?;
-        self.finality_proofs.on_block_finalized(best_finalized_hash, best_finalized_number, is_descendent_of)?;
-        Ok(())
-    }
-
-    /// Retry any pending request if a peer disconnected.
-    pub(crate) fn peer_disconnected(&mut self, who: PeerId) {
-        self.justifications.peer_disconnected(&who);
-        self.finality_proofs.peer_disconnected(&who);
-    }
-}
+pub(crate) type ExtraRequest<B> = (<B as BlockT>::Hash, NumberFor<B>);
 
 /// Manages pending block extra data (e.g. justification) requests.
+///
 /// Multiple extras may be requested for competing forks, or for the same branch
 /// at different (increasing) heights. This structure will guarantee that extras
 /// are fetched in-order, and that obsolete changes are pruned (when finalizing a
 /// competing fork).
-pub(crate) struct ExtraRequests<B: BlockT, Essence> {
+#[derive(Debug)]
+pub(crate) struct ExtraRequests<B: BlockT> {
     tree: ForkTree<B::Hash, NumberFor<B>, ()>,
+    /// requests which have been queued for later processing
     pending_requests: VecDeque<ExtraRequest<B>>,
-    peer_requests: HashMap<PeerId, ExtraRequest<B>>,
-    previous_requests: HashMap<ExtraRequest<B>, Vec<(PeerId, Instant)>>,
+    /// requests which are currently underway to some peer
+    active_requests: HashMap<PeerId, ExtraRequest<B>>,
+    /// previous requests without response
+    failed_requests: HashMap<ExtraRequest<B>, Vec<(PeerId, Instant)>>,
+    /// successful requests
    importing_requests: HashSet<ExtraRequest<B>>,
-    essence: Essence,
 }
 
-impl<B: BlockT, Essence: ExtraRequestsEssence<B>> ExtraRequests<B, Essence> {
-    fn new(essence: Essence) -> Self {
+impl<B: BlockT> ExtraRequests<B> {
+    pub(crate) fn new() -> Self {
         ExtraRequests {
             tree: ForkTree::new(),
             pending_requests: VecDeque::new(),
-            peer_requests: HashMap::new(),
-            previous_requests: HashMap::new(),
+            active_requests: HashMap::new(),
+            failed_requests: HashMap::new(),
             importing_requests: HashSet::new(),
-            essence,
         }
     }
 
-    /// Get mutable reference to the requests essence.
-    pub(crate) fn essence(&mut self) -> &mut Essence {
-        &mut self.essence
+    /// Reset all state as if returned from `new`.
+    pub(crate) fn reset(&mut self) {
+        self.tree = ForkTree::new();
+        self.pending_requests.clear();
+        self.active_requests.clear();
+        self.failed_requests.clear();
     }
 
-    /// Dispatches all possible pending requests to the given peers. Peers are
-    /// filtered according to the current known best block (i.e. we won't send a
-    /// extra request for block #10 to a peer at block #2), and we also
-    /// throttle requests to the same peer if a previous justification request
-    /// yielded no results.
-    pub(crate) fn dispatch(&mut self, peers: &mut HashMap<PeerId, PeerSync<B>>, protocol: &mut dyn Context<B>) {
-        if self.pending_requests.is_empty() {
-            return;
-        }
-
-        let initial_pending_requests = self.pending_requests.len();
-
-        // clean up previous failed requests so we can retry again
-        for (_, requests) in self.previous_requests.iter_mut() {
-            requests.retain(|(_, instant)| instant.elapsed() < EXTRA_RETRY_WAIT);
-        }
-
-        let mut available_peers = peers.iter().filter_map(|(peer, sync)| {
-            // don't request to any peers that already have pending requests or are unavailable
-            if sync.state != PeerSyncState::Available || self.peer_requests.contains_key(&peer) {
-                None
-            } else {
-                Some((peer.clone(), sync.best_number))
-            }
-        }).collect::<VecDeque<_>>();
-
-        let mut last_peer = available_peers.back().map(|p| p.0.clone());
-        let mut unhandled_requests = VecDeque::new();
-
-        loop {
-            let (peer, peer_best_number) = match available_peers.pop_front() {
-                Some(p) => p,
-                _ => break,
-            };
-
-            // only ask peers that have synced past the block number that we're
-            // asking the extra for and to whom we haven't already made
-            // the same request recently
-            let peer_eligible = {
-                let request = match self.pending_requests.front() {
-                    Some(r) => r.clone(),
-                    _ => break,
-                };
-
-                peer_best_number >= request.1 &&
-                    !self.previous_requests
-                        .get(&request)
-                        .map(|requests| requests.iter().any(|i| i.0 == peer))
-                        .unwrap_or(false)
-            };
-
-            if !peer_eligible {
-                available_peers.push_back((peer.clone(), peer_best_number));
-
-                // we tried all peers and none can answer this request
-                if Some(peer) == last_peer {
-                    last_peer = available_peers.back().map(|p| p.0.clone());
-
-                    let request = self.pending_requests.pop_front()
-                        .expect("verified to be Some in the beginning of the loop; qed");
-
-                    unhandled_requests.push_back(request);
-                }
-
-                continue;
-            }
-
-            last_peer = available_peers.back().map(|p| p.0.clone());
-
-            let request = self.pending_requests.pop_front()
-                .expect("verified to be Some in the beginning of the loop; qed");
-
-            self.peer_requests.insert(peer.clone(), request);
-
-            peers.get_mut(&peer)
-                .expect("peer was is taken from available_peers; available_peers is a subset of peers; qed")
-                .state = self.essence.peer_downloading_state(request.0.clone());
-
-            trace!(target: "sync", "Requesting {} for block #{} from {}", self.essence.type_name(), request.0, peer);
-            self.essence.send_network_request(protocol, peer, request);
-        }
-
-        self.pending_requests.append(&mut unhandled_requests);
-
-        trace!(target: "sync", "Dispatched {} {} requests ({} pending)",
-            initial_pending_requests - self.pending_requests.len(),
-            self.essence.type_name(),
-            self.pending_requests.len(),
-        );
+    /// Returns an iterator-like struct that yields peers which extra
+    /// requests can be sent to.
+    pub(crate) fn matcher(&mut self) -> Matcher<B> {
+        Matcher::new(self)
     }
 
-    /// Queue a extra data request (without dispatching it).
-    pub(crate) fn queue_request<F>(&mut self, request: ExtraRequest<B>, is_descendent_of: F)
+    /// Queue an extra data request to be considered by the `Matcher`.
+    pub(crate) fn schedule<F>(&mut self, request: ExtraRequest<B>, is_descendent_of: F)
         where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
     {
-        match self.tree.import(request.0.clone(), request.1.clone(), (), &is_descendent_of) {
+        match self.tree.import(request.0, request.1, (), &is_descendent_of) {
             Ok(true) => {
                 // this is a new root so we add it to the current `pending_requests`
-                self.pending_requests.push_back((request.0, request.1));
-            },
+                self.pending_requests.push_back((request.0, request.1))
+            }
             Err(err) => {
-                warn!(target: "sync", "Failed to insert requested {} {:?} {:?} into tree: {:?}",
-                    self.essence.type_name(),
-                    request.0,
-                    request.1,
-                    err,
-                );
-                return;
-            },
-            _ => {},
+                warn!(target: "sync", "Failed to insert request {:?} into tree: {:?}", request, err);
+                return
+            }
+            _ => ()
         }
     }
 
     /// Retry any pending request if a peer disconnected.
-    fn peer_disconnected(&mut self, who: &PeerId) {
-        if let Some(request) = self.peer_requests.remove(who) {
-            self.pending_requests.push_front(request);
+    pub(crate) fn peer_disconnected(&mut self, who: &PeerId) {
+        if let Some(request) = self.active_requests.remove(who) {
+            self.pending_requests.push_front(request)
         }
     }
 
-    /// Process the import result of an extra.
-    /// Queues a retry in case the import failed.
-    /// Returns true if import has been queued.
-    pub(crate) fn on_import_result(
-        &mut self,
-        request: (B::Hash, NumberFor<B>),
-        finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
-    ) -> bool {
-        self.try_finalize_root(request, finalization_result, true)
-    }
-
-    /// Processes the response for the request previously sent to the given
-    /// peer. Queues a retry in case the given justification
-    /// was `None`.
-    pub(crate) fn on_response(
-        &mut self,
-        who: PeerId,
-        response: Option<Essence::Response>,
-    ) -> Option<(PeerId, B::Hash, NumberFor<B>, Essence::Response)> {
+    /// Processes the response for the request previously sent to the given peer.
+    pub(crate) fn on_response<R>(&mut self, who: PeerId, resp: Option<R>) -> Option<(PeerId, B::Hash, NumberFor<B>, R)> {
         // we assume that the request maps to the given response, this is
         // currently enforced by the outer network protocol before passing on
         // messages to chain sync.
-        if let Some(request) = self.peer_requests.remove(&who) {
-            if let Some(response) = response {
+        if let Some(request) = self.active_requests.remove(&who) {
+            if let Some(r) = resp {
                 self.importing_requests.insert(request);
-                return Some((who, request.0, request.1, response));
+                return Some((who, request.0, request.1, r))
             }
-
-            self.previous_requests
-                .entry(request)
-                .or_insert(Vec::new())
-                .push((who, Instant::now()));
+            self.failed_requests.entry(request).or_insert(Vec::new()).push((who, Instant::now()));
             self.pending_requests.push_front(request);
         }
-
         None
     }
 
-    /// Removes any pending extra requests for blocks lower than the
-    /// given best finalized.
-    fn on_block_finalized<F>(
+    /// Removes any pending extra requests for blocks lower than the given best finalized.
+    pub(crate) fn on_block_finalized<F>(
         &mut self,
         best_finalized_hash: &B::Hash,
         best_finalized_number: NumberFor<B>,
-        is_descendent_of: F,
+        is_descendent_of: F
     ) -> Result<(), fork_tree::Error<ClientError>>
         where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
     {
-        let is_scheduled_root = self.try_finalize_root(
-            (*best_finalized_hash, best_finalized_number),
-            Ok((*best_finalized_hash, best_finalized_number)),
-            false,
-        );
-        if is_scheduled_root {
-            return Ok(());
+        let request = (*best_finalized_hash, best_finalized_number);
+
+        if self.try_finalize_root::<()>(request, Ok(request), false) {
+            return Ok(())
         }
 
         self.tree.finalize(best_finalized_hash, best_finalized_number, &is_descendent_of)?;
 
@@ -309,121 +133,250 @@ impl<B: BlockT, Essence: ExtraRequestsEssence<B>> ExtraRequests<B, Essence> {
         let roots = self.tree.roots().collect::<HashSet<_>>();
 
         self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &())));
-        self.peer_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
-        self.previous_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));
+        self.active_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
+        self.failed_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));
 
         Ok(())
     }
 
-    /// Clear all data.
-    pub(crate) fn clear(&mut self) {
-        self.tree = ForkTree::new();
-        self.pending_requests.clear();
-        self.peer_requests.clear();
-        self.previous_requests.clear();
-    }
-
     /// Try to finalize pending root.
+    ///
     /// Returns true if import of this request has been scheduled.
-    fn try_finalize_root(
+    pub(crate) fn try_finalize_root<E>(
         &mut self,
-        request: (B::Hash, NumberFor<B>),
-        finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
-        reschedule_on_failure: bool,
-    ) -> bool {
+        request: ExtraRequest<B>,
+        result: Result<ExtraRequest<B>, E>,
+        reschedule_on_failure: bool
+    ) -> bool
+    {
         if !self.importing_requests.remove(&request) {
-            return false;
+            return false
         }
 
-        let (finalized_hash, finalized_number) = match finalization_result {
-            Ok((finalized_hash, finalized_number)) => (finalized_hash, finalized_number),
+        let (finalized_hash, finalized_number) = match result {
+            Ok(req) => (req.0, req.1),
             Err(_) => {
                 if reschedule_on_failure {
                     self.pending_requests.push_front(request);
                 }
-                return true;
-            },
+                return true
+            }
         };
 
         if self.tree.finalize_root(&finalized_hash).is_none() {
-            warn!(target: "sync", "Imported {} for {:?} {:?} which isn't a root in the tree: {:?}",
-                self.essence.type_name(),
+            warn!(target: "sync", "Imported {:?} {:?} which isn't a root in the tree: {:?}",
                 finalized_hash,
                 finalized_number,
-                self.tree.roots().collect::<Vec<_>>(),
+                self.tree.roots().collect::<Vec<_>>()
             );
-            return true;
-        };
+            return true
+        }
 
-        self.previous_requests.clear();
-        self.peer_requests.clear();
-        self.pending_requests =
-            self.tree.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect();
+        self.failed_requests.clear();
+        self.active_requests.clear();
+        self.pending_requests.clear();
+        self.pending_requests.extend(self.tree.roots().map(|(&h, &n, _)| (h, n)));
 
         true
     }
 }
 
-pub(crate) struct JustificationsRequestsEssence;
-
-impl<B: BlockT> ExtraRequestsEssence<B> for JustificationsRequestsEssence {
-    type Response = Justification;
-
-    fn type_name(&self) -> &'static str {
-        "justification"
-    }
-
-    fn send_network_request(&self, protocol: &mut dyn Context<B>, peer: PeerId, request: ExtraRequest<B>) {
-        protocol.send_block_request(peer, message::generic::BlockRequest {
-            id: 0,
-            fields: message::BlockAttributes::JUSTIFICATION,
-            from: message::FromBlock::Hash(request.0),
-            to: None,
-            direction: message::Direction::Ascending,
-            max: Some(1),
-        })
-    }
-
-    fn peer_downloading_state(&self, block: B::Hash) -> PeerSyncState<B> {
-        PeerSyncState::DownloadingJustification(block)
-    }
+/// Matches peers with pending extra requests.
+#[derive(Debug)]
+pub(crate) struct Matcher<'a, B: BlockT> {
+    /// Length of pending requests collection.
+    /// Used to ensure we do not loop more than once over all pending requests.
+    remaining: usize,
+    extras: &'a mut ExtraRequests<B>
 }
 
-pub(crate) struct FinalityProofRequestsEssence<B: BlockT>(pub Option<SharedFinalityProofRequestBuilder<B>>);
-
-impl<B: BlockT> ExtraRequestsEssence<B> for FinalityProofRequestsEssence<B> {
-    type Response = Vec<u8>;
-
-    fn type_name(&self) -> &'static str {
-        "finality proof"
+impl<'a, B: BlockT> Matcher<'a, B> {
+    fn new(extras: &'a mut ExtraRequests<B>) -> Self {
+        Matcher {
+            remaining: extras.pending_requests.len(),
+            extras
+        }
     }
 
-    fn send_network_request(&self, protocol: &mut dyn Context<B>, peer: PeerId, request: ExtraRequest<B>) {
-        protocol.send_finality_proof_request(peer, message::generic::FinalityProofRequest {
-            id: 0,
-            block: request.0,
-            request: self.0.as_ref()
-                .map(|builder| builder.build_request_data(&request.0))
-                .unwrap_or_default(),
-        })
-    }
+    /// Finds a peer to which a pending request can be sent.
+    ///
+    /// Peers are filtered according to the current known best block (i.e. we won't
+    /// send an extra request for block #10 to a peer at block #2), and we also
+    /// throttle requests to the same peer if a previous request yielded no results.
+    ///
+    /// This method returns as soon as it finds a peer that should be able to answer
+    /// our request. If no request is pending or no peer can handle it, `None` is
+    /// returned instead.
+    ///
+    /// # Note
+    ///
+    /// The returned `PeerId` (if any) is guaranteed to come from the given `peers`
+    /// argument.
+    pub(crate) fn next(&mut self, peers: &HashMap<PeerId, PeerSync<B>>) -> Option<(PeerId, ExtraRequest<B>)> {
+        if self.remaining == 0 {
+            return None
+        }
 
-    fn peer_downloading_state(&self, block: B::Hash) -> PeerSyncState<B> {
-        PeerSyncState::DownloadingFinalityProof(block)
+        // clean up previously failed requests so we can retry again
+        for requests in self.extras.failed_requests.values_mut() {
+            requests.retain(|(_, instant)| instant.elapsed() < EXTRA_RETRY_WAIT);
+        }
+
+        while let Some(request) = self.extras.pending_requests.pop_front() {
+            for (peer, sync) in peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available) {
+                // only ask peers that have synced at least up to the block number that we're asking the extra for
+                if sync.best_number < request.1 {
+                    continue
+                }
+                // don't request to any peers that already have pending requests
+                if self.extras.active_requests.contains_key(peer) {
+                    continue
+                }
+                // only ask if the same request has not failed for this peer before
+                if self.extras.failed_requests.get(&request).map(|rr| rr.iter().any(|i| &i.0 == peer)).unwrap_or(false) {
+                    continue
+                }
+                self.extras.active_requests.insert(peer.clone(), request);
+                return Some((peer.clone(), request))
+            }
+
+            self.extras.pending_requests.push_back(request);
+            self.remaining -= 1;
+
+            if self.remaining == 0 {
+                break
+            }
+        }
+
+        None
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::protocol::sync::PeerSync;
     use client::error::Error as ClientError;
-    use test_client::runtime::{Block, Hash};
-    use super::ExtraRequestsAggregator;
+    use quickcheck::{Arbitrary, Gen, QuickCheck, StdThreadGen};
+    use rand::Rng;
+    use std::collections::{HashMap, HashSet};
+    use super::*;
+    use test_client::runtime::{Block, BlockNumber, Hash};
+
+    #[test]
+    fn requests_are_processed_in_order() {
+        fn property(mut peers: ArbitraryPeers) {
+            let mut requests = ExtraRequests::<Block>::new();
+
+            let num_peers_available = peers.0.values()
+                .filter(|s| s.state == PeerSyncState::Available).count();
+
+            for i in 0 .. num_peers_available {
+                requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
+            }
+
+            let pending = requests.pending_requests.clone();
+            let mut m = requests.matcher();
+
+            for p in &pending {
+                let (peer, r) = m.next(&peers.0).unwrap();
+                assert_eq!(p, &r);
+                peers.0.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0);
+            }
+        }
+
+        QuickCheck::with_gen(StdThreadGen::new(19))
+            .quickcheck(property as fn(ArbitraryPeers))
+    }
+
+    #[test]
+    fn new_roots_schedule_new_request() {
+        fn property(data: Vec<BlockNumber>) {
+            let mut requests = ExtraRequests::<Block>::new();
+            for (i, number) in data.into_iter().enumerate() {
+                let hash = [i as u8; 32].into();
+                let pending = requests.pending_requests.len();
+                let is_root = requests.tree.roots().any(|(&h, &n, _)| hash == h && number == n);
+                requests.schedule((hash, number), |a, b| Ok(a[0] >= b[0]));
+                if !is_root {
+                    assert_eq!(1 + pending, requests.pending_requests.len())
+                }
+            }
+        }
+        QuickCheck::new().quickcheck(property as fn(Vec<BlockNumber>))
+    }
+
+    #[test]
+    fn disconnecting_implies_rescheduling() {
+        fn property(mut peers: ArbitraryPeers) -> bool {
+            let mut requests = ExtraRequests::<Block>::new();
+
+            let num_peers_available = peers.0.values()
+                .filter(|s| s.state == PeerSyncState::Available).count();
+
+            for i in 0 .. num_peers_available {
+                requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
+            }
+
+            let mut m = requests.matcher();
+            while let Some((peer, r)) = m.next(&peers.0) {
+                peers.0.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0);
+            }
+
+            assert!(requests.pending_requests.is_empty());
+
+            let active_peers = requests.active_requests.keys().cloned().collect::<Vec<_>>();
+            let previously_active = requests.active_requests.values().cloned().collect::<HashSet<_>>();
+
+            for peer in &active_peers {
+                requests.peer_disconnected(peer)
+            }
+
+            assert!(requests.active_requests.is_empty());
+
+            previously_active == requests.pending_requests.iter().cloned().collect::<HashSet<_>>()
+        }
+
+        QuickCheck::with_gen(StdThreadGen::new(19))
+            .quickcheck(property as fn(ArbitraryPeers) -> bool)
+    }
+
+    #[test]
+    fn no_response_reschedules() {
+        fn property(mut peers: ArbitraryPeers) {
+            let mut requests = ExtraRequests::<Block>::new();
+
+            let num_peers_available = peers.0.values()
+                .filter(|s| s.state == PeerSyncState::Available).count();
+
+            for i in 0 .. num_peers_available {
+                requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
+            }
+
+            let mut m = requests.matcher();
+            while let Some((peer, r)) = m.next(&peers.0) {
+                peers.0.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0);
+            }
+
+            let active = requests.active_requests.iter().map(|(p, &r)| (p.clone(), r)).collect::<Vec<_>>();
+
+            for (peer, req) in &active {
+                assert!(requests.failed_requests.get(req).is_none());
+                assert!(!requests.pending_requests.contains(req));
+                assert!(requests.on_response::<()>(peer.clone(), None).is_none());
+                assert!(requests.pending_requests.contains(req));
+                assert_eq!(1, requests.failed_requests.get(req).unwrap().iter().filter(|(p, _)| p == peer).count())
+            }
+        }
+
+        QuickCheck::with_gen(StdThreadGen::new(19))
+            .quickcheck(property as fn(ArbitraryPeers))
+    }
 
     #[test]
     fn request_is_rescheduled_when_earlier_block_is_finalized() {
         let _ = ::env_logger::try_init();
 
-        let mut extra_requests = ExtraRequestsAggregator::<Block>::new();
+        let mut finality_proofs = ExtraRequests::<Block>::new();
 
         let hash4 = [4; 32].into();
         let hash5 = [5; 32].into();
@@ -435,33 +388,76 @@ mod tests {
         }
 
         // make #4 last finalized block
-        extra_requests.finality_proofs().tree.import(hash4, 4, (), &is_descendent_of).unwrap();
-        extra_requests.finality_proofs().tree.finalize_root(&hash4);
+        finality_proofs.tree.import(hash4, 4, (), &is_descendent_of).unwrap();
+        finality_proofs.tree.finalize_root(&hash4);
 
         // schedule request for #6
-        extra_requests.finality_proofs().queue_request((hash6, 6), is_descendent_of);
+        finality_proofs.schedule((hash6, 6), is_descendent_of);
 
         // receive finality proof for #5
-        extra_requests.finality_proofs().importing_requests.insert((hash6, 6));
-        extra_requests.finality_proofs().on_block_finalized(&hash5, 5, is_descendent_of).unwrap();
-        extra_requests.finality_proofs().on_import_result((hash6, 6), Ok((hash5, 5)));
+        finality_proofs.importing_requests.insert((hash6, 6));
+        finality_proofs.on_block_finalized(&hash5, 5, is_descendent_of).unwrap();
+        finality_proofs.try_finalize_root::<()>((hash6, 6), Ok((hash5, 5)), true);
 
         // ensure that request for #6 is still pending
-        assert_eq!(
-            extra_requests.finality_proofs().pending_requests.iter().collect::<Vec<_>>(),
-            vec![&(hash6, 6)],
-        );
+        assert_eq!(finality_proofs.pending_requests.iter().collect::<Vec<_>>(), vec![&(hash6, 6)]);
 
         // receive finality proof for #7
-        extra_requests.finality_proofs().importing_requests.insert((hash6, 6));
-        extra_requests.finality_proofs().on_block_finalized(&hash6, 6, is_descendent_of).unwrap();
-        extra_requests.finality_proofs().on_block_finalized(&hash7, 7, is_descendent_of).unwrap();
-        extra_requests.finality_proofs().on_import_result((hash6, 6), Ok((hash7, 7)));
+        finality_proofs.importing_requests.insert((hash6, 6));
+        finality_proofs.on_block_finalized(&hash6, 6, is_descendent_of).unwrap();
+        finality_proofs.on_block_finalized(&hash7, 7, is_descendent_of).unwrap();
+        finality_proofs.try_finalize_root::<()>((hash6, 6), Ok((hash7, 7)), true);
 
         // ensure that there's no request for #6
-        assert_eq!(
-            extra_requests.finality_proofs().pending_requests.iter().collect::<Vec<_>>(),
-            Vec::<&(Hash, u64)>::new(),
-        );
+        assert_eq!(finality_proofs.pending_requests.iter().collect::<Vec<_>>(), Vec::<&(Hash, u64)>::new());
     }
+
+    // Some Arbitrary instances to allow easy construction of random peer sets:
+
+    #[derive(Debug, Clone)]
+    struct ArbitraryPeerSyncState(PeerSyncState<Block>);
+
+    impl Arbitrary for ArbitraryPeerSyncState {
+        fn arbitrary<G: Gen>(g: &mut G) -> Self {
+            let s = match g.gen::<u32>() % 5 {
+                0 => PeerSyncState::Available,
+                // TODO: 1 => PeerSyncState::AncestorSearch(g.gen(), AncestorSearchState),
+                1 => PeerSyncState::DownloadingNew(g.gen::<BlockNumber>()),
+                2 => PeerSyncState::DownloadingStale(Hash::random()),
+                3 => PeerSyncState::DownloadingJustification(Hash::random()),
+                _ => PeerSyncState::DownloadingFinalityProof(Hash::random())
+            };
+            ArbitraryPeerSyncState(s)
+        }
+    }
+
+    #[derive(Debug, Clone)]
+    struct ArbitraryPeerSync(PeerSync<Block>);
+
+    impl Arbitrary for ArbitraryPeerSync {
+        fn arbitrary<G: Gen>(g: &mut G) -> Self {
+            let ps = PeerSync {
+                common_number: g.gen(),
+                best_hash: Hash::random(),
+                best_number: g.gen(),
+                state: ArbitraryPeerSyncState::arbitrary(g).0,
+                recently_announced: Default::default()
+            };
+            ArbitraryPeerSync(ps)
+        }
+    }
+
+    #[derive(Debug, Clone)]
+    struct ArbitraryPeers(HashMap<PeerId, PeerSync<Block>>);
+
+    impl Arbitrary for ArbitraryPeers {
+        fn arbitrary<G: Gen>(g: &mut G) -> Self {
+            let mut peers = HashMap::with_capacity(g.size());
+            for _ in 0 .. g.size() {
+                peers.insert(PeerId::random(), ArbitraryPeerSync::arbitrary(g).0);
+            }
+            ArbitraryPeers(peers)
+        }
+    }
+}
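A closing note on the test strategy: quickcheck can only generate values of types that implement `Arbitrary`, and Rust's orphan rule prevents the test module from implementing a foreign trait for foreign types such as `PeerSync` or `HashMap<PeerId, PeerSync<B>>` — hence the `ArbitraryPeerSyncState`/`ArbitraryPeerSync`/`ArbitraryPeers` newtype wrappers above. A minimal standalone example of the same newtype pattern with quickcheck 0.8 (the version added to the dev-dependencies); the `ArbitraryEvenNumber` type is hypothetical and not part of the patch:

    use quickcheck::{Arbitrary, Gen, QuickCheck};
    use rand::Rng; // `Gen: Rng` in quickcheck 0.8, so rand's methods drive generation

    // Newtype wrapper: `Arbitrary` cannot be constrained on plain `u64` here.
    #[derive(Debug, Clone)]
    struct ArbitraryEvenNumber(u64);

    impl Arbitrary for ArbitraryEvenNumber {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            // clear the lowest bit to make the number even
            ArbitraryEvenNumber(g.gen::<u64>() & !1)
        }
    }

    fn main() {
        fn property(n: ArbitraryEvenNumber) -> bool {
            n.0 % 2 == 0
        }
        QuickCheck::new().quickcheck(property as fn(ArbitraryEvenNumber) -> bool)
    }

The `StdThreadGen::new(19)` used in the tests bounds the generator's size parameter, which `ArbitraryPeers` reads back via `Gen::size` to keep the random peer maps small enough for the matcher loops to stay fast.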