// This file is part of Substrate. // Copyright (C) Parity Technologies (UK) Ltd. // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with this program. If not, see . use crate::{PeerSync, PeerSyncState}; use fork_tree::ForkTree; use libp2p::PeerId; use log::{debug, trace, warn}; use sc_network_common::sync::metrics::Metrics; use sp_blockchain::Error as ClientError; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; use std::{ collections::{HashMap, HashSet, VecDeque}, time::{Duration, Instant}, }; // Time to wait before trying to get the same extra data from the same peer. const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10); /// Pending extra data request for the given block (hash and number). type ExtraRequest = (::Hash, NumberFor); /// Manages pending block extra data (e.g. justification) requests. /// /// Multiple extras may be requested for competing forks, or for the same branch /// at different (increasing) heights. This structure will guarantee that extras /// are fetched in-order, and that obsolete changes are pruned (when finalizing a /// competing fork). #[derive(Debug)] pub(crate) struct ExtraRequests { tree: ForkTree, ()>, /// best finalized block number that we have seen since restart best_seen_finalized_number: NumberFor, /// requests which have been queued for later processing pending_requests: VecDeque>, /// requests which are currently underway to some peer active_requests: HashMap>, /// previous requests without response failed_requests: HashMap, Vec<(PeerId, Instant)>>, /// successful requests importing_requests: HashSet>, /// the name of this type of extra request (useful for logging.) request_type_name: &'static str, } impl ExtraRequests { pub(crate) fn new(request_type_name: &'static str) -> Self { Self { tree: ForkTree::new(), best_seen_finalized_number: Zero::zero(), pending_requests: VecDeque::new(), active_requests: HashMap::new(), failed_requests: HashMap::new(), importing_requests: HashSet::new(), request_type_name, } } /// Reset all state as if returned from `new`. pub(crate) fn reset(&mut self) { self.tree = ForkTree::new(); self.pending_requests.clear(); self.active_requests.clear(); self.failed_requests.clear(); } /// Returns an iterator-like struct that yields peers which extra /// requests can be sent to. pub(crate) fn matcher(&mut self) -> Matcher { Matcher::new(self) } /// Queue an extra data request to be considered by the `Matcher`. pub(crate) fn schedule(&mut self, request: ExtraRequest, is_descendent_of: F) where F: Fn(&B::Hash, &B::Hash) -> Result, { match self.tree.import(request.0, request.1, (), &is_descendent_of) { Ok(true) => { // this is a new root so we add it to the current `pending_requests` self.pending_requests.push_back((request.0, request.1)); }, Err(fork_tree::Error::Revert) => { // we have finalized further than the given request, presumably // by some other part of the system (not sync). we can safely // ignore the `Revert` error. }, Err(err) => { debug!(target: "sync", "Failed to insert request {:?} into tree: {}", request, err); }, _ => (), } } /// Retry any pending request if a peer disconnected. pub(crate) fn peer_disconnected(&mut self, who: &PeerId) { if let Some(request) = self.active_requests.remove(who) { self.pending_requests.push_front(request); } } /// Processes the response for the request previously sent to the given peer. pub(crate) fn on_response( &mut self, who: PeerId, resp: Option, ) -> Option<(PeerId, B::Hash, NumberFor, R)> { // we assume that the request maps to the given response, this is // currently enforced by the outer network protocol before passing on // messages to chain sync. if let Some(request) = self.active_requests.remove(&who) { if let Some(r) = resp { trace!(target: "sync", "Queuing import of {} from {:?} for {:?}", self.request_type_name, who, request, ); self.importing_requests.insert(request); return Some((who, request.0, request.1, r)) } else { trace!(target: "sync", "Empty {} response from {:?} for {:?}", self.request_type_name, who, request, ); } self.failed_requests.entry(request).or_default().push((who, Instant::now())); self.pending_requests.push_front(request); } else { trace!(target: "sync", "No active {} request to {:?}", self.request_type_name, who, ); } None } /// Removes any pending extra requests for blocks lower than the given best finalized. pub(crate) fn on_block_finalized( &mut self, best_finalized_hash: &B::Hash, best_finalized_number: NumberFor, is_descendent_of: F, ) -> Result<(), fork_tree::Error> where F: Fn(&B::Hash, &B::Hash) -> Result, { let request = (*best_finalized_hash, best_finalized_number); if self.try_finalize_root::<()>(request, Ok(request), false) { return Ok(()) } if best_finalized_number > self.best_seen_finalized_number { // we receive finality notification only for the finalized branch head. match self.tree.finalize_with_ancestors( best_finalized_hash, best_finalized_number, &is_descendent_of, ) { Err(fork_tree::Error::Revert) => { // we might have finalized further already in which case we // will get a `Revert` error which we can safely ignore. }, Err(err) => return Err(err), Ok(_) => {}, } self.best_seen_finalized_number = best_finalized_number; } let roots = self.tree.roots().collect::>(); self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &()))); self.active_requests.retain(|_, (h, n)| roots.contains(&(h, n, &()))); self.failed_requests.retain(|(h, n), _| roots.contains(&(h, n, &()))); Ok(()) } /// Try to finalize pending root. /// /// Returns true if import of this request has been scheduled. pub(crate) fn try_finalize_root( &mut self, request: ExtraRequest, result: Result, E>, reschedule_on_failure: bool, ) -> bool { if !self.importing_requests.remove(&request) { return false } let (finalized_hash, finalized_number) = match result { Ok(req) => (req.0, req.1), Err(_) => { if reschedule_on_failure { self.pending_requests.push_front(request); } return true }, }; if self.tree.finalize_root(&finalized_hash).is_none() { warn!(target: "sync", "‼️ Imported {:?} {:?} which isn't a root in the tree: {:?}", finalized_hash, finalized_number, self.tree.roots().collect::>() ); return true } self.failed_requests.clear(); self.active_requests.clear(); self.pending_requests.clear(); self.pending_requests.extend(self.tree.roots().map(|(&h, &n, _)| (h, n))); self.best_seen_finalized_number = finalized_number; true } /// Returns an iterator over all active (in-flight) requests and associated peer id. #[cfg(test)] pub(crate) fn active_requests(&self) -> impl Iterator)> { self.active_requests.iter() } /// Returns an iterator over all scheduled pending requests. #[cfg(test)] pub(crate) fn pending_requests(&self) -> impl Iterator> { self.pending_requests.iter() } /// Get some key metrics. pub(crate) fn metrics(&self) -> Metrics { Metrics { pending_requests: self.pending_requests.len().try_into().unwrap_or(std::u32::MAX), active_requests: self.active_requests.len().try_into().unwrap_or(std::u32::MAX), failed_requests: self.failed_requests.len().try_into().unwrap_or(std::u32::MAX), importing_requests: self.importing_requests.len().try_into().unwrap_or(std::u32::MAX), } } } /// Matches peers with pending extra requests. #[derive(Debug)] pub(crate) struct Matcher<'a, B: BlockT> { /// Length of pending requests collection. /// Used to ensure we do not loop more than once over all pending requests. remaining: usize, extras: &'a mut ExtraRequests, } impl<'a, B: BlockT> Matcher<'a, B> { fn new(extras: &'a mut ExtraRequests) -> Self { Self { remaining: extras.pending_requests.len(), extras } } /// Finds a peer to which a pending request can be sent. /// /// Peers are filtered according to the current known best block (i.e. we won't /// send an extra request for block #10 to a peer at block #2), and we also /// throttle requests to the same peer if a previous request yielded no results. /// /// This method returns as soon as it finds a peer that should be able to answer /// our request. If no request is pending or no peer can handle it, `None` is /// returned instead. /// /// # Note /// /// The returned `PeerId` (if any) is guaranteed to come from the given `peers` /// argument. pub(crate) fn next( &mut self, peers: &HashMap>, ) -> Option<(PeerId, ExtraRequest)> { if self.remaining == 0 { return None } // clean up previously failed requests so we can retry again for requests in self.extras.failed_requests.values_mut() { requests.retain(|(_, instant)| instant.elapsed() < EXTRA_RETRY_WAIT); } while let Some(request) = self.extras.pending_requests.pop_front() { for (peer, sync) in peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available) { // only ask peers that have synced at least up to the block number that we're asking // the extra for if sync.best_number < request.1 { continue } // don't request to any peers that already have pending requests if self.extras.active_requests.contains_key(peer) { continue } // only ask if the same request has not failed for this peer before if self .extras .failed_requests .get(&request) .map(|rr| rr.iter().any(|i| &i.0 == peer)) .unwrap_or(false) { continue } self.extras.active_requests.insert(*peer, request); trace!(target: "sync", "Sending {} request to {:?} for {:?}", self.extras.request_type_name, peer, request, ); return Some((*peer, request)) } self.extras.pending_requests.push_back(request); self.remaining -= 1; if self.remaining == 0 { break } } None } } #[cfg(test)] mod tests { use super::*; use crate::PeerSync; use quickcheck::{Arbitrary, Gen, QuickCheck}; use sp_blockchain::Error as ClientError; use sp_test_primitives::{Block, BlockNumber, Hash}; use std::collections::{HashMap, HashSet}; #[test] fn requests_are_processed_in_order() { fn property(mut peers: ArbitraryPeers) { let mut requests = ExtraRequests::::new("test"); let num_peers_available = peers.0.values().filter(|s| s.state == PeerSyncState::Available).count(); for i in 0..num_peers_available { requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0])) } let pending = requests.pending_requests.clone(); let mut m = requests.matcher(); for p in &pending { let (peer, r) = m.next(&peers.0).unwrap(); assert_eq!(p, &r); peers.0.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0); } } QuickCheck::new().quickcheck(property as fn(ArbitraryPeers)) } #[test] fn new_roots_schedule_new_request() { fn property(data: Vec) { let mut requests = ExtraRequests::::new("test"); for (i, number) in data.into_iter().enumerate() { let hash = [i as u8; 32].into(); let pending = requests.pending_requests.len(); let is_root = requests.tree.roots().any(|(&h, &n, _)| hash == h && number == n); requests.schedule((hash, number), |a, b| Ok(a[0] >= b[0])); if !is_root { assert_eq!(1 + pending, requests.pending_requests.len()) } } } QuickCheck::new().quickcheck(property as fn(Vec)) } #[test] fn disconnecting_implies_rescheduling() { fn property(mut peers: ArbitraryPeers) -> bool { let mut requests = ExtraRequests::::new("test"); let num_peers_available = peers.0.values().filter(|s| s.state == PeerSyncState::Available).count(); for i in 0..num_peers_available { requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0])) } let mut m = requests.matcher(); while let Some((peer, r)) = m.next(&peers.0) { peers.0.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0); } assert!(requests.pending_requests.is_empty()); let active_peers = requests.active_requests.keys().cloned().collect::>(); let previously_active = requests.active_requests.values().cloned().collect::>(); for peer in &active_peers { requests.peer_disconnected(peer) } assert!(requests.active_requests.is_empty()); previously_active == requests.pending_requests.iter().cloned().collect::>() } QuickCheck::new().quickcheck(property as fn(ArbitraryPeers) -> bool) } #[test] fn no_response_reschedules() { fn property(mut peers: ArbitraryPeers) { let mut requests = ExtraRequests::::new("test"); let num_peers_available = peers.0.values().filter(|s| s.state == PeerSyncState::Available).count(); for i in 0..num_peers_available { requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0])) } let mut m = requests.matcher(); while let Some((peer, r)) = m.next(&peers.0) { peers.0.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0); } let active = requests.active_requests.iter().map(|(&p, &r)| (p, r)).collect::>(); for (peer, req) in &active { assert!(requests.failed_requests.get(req).is_none()); assert!(!requests.pending_requests.contains(req)); assert!(requests.on_response::<()>(*peer, None).is_none()); assert!(requests.pending_requests.contains(req)); assert_eq!( 1, requests .failed_requests .get(req) .unwrap() .iter() .filter(|(p, _)| p == peer) .count() ) } } QuickCheck::new().quickcheck(property as fn(ArbitraryPeers)) } #[test] fn request_is_rescheduled_when_earlier_block_is_finalized() { sp_tracing::try_init_simple(); let mut finality_proofs = ExtraRequests::::new("test"); let hash4 = [4; 32].into(); let hash5 = [5; 32].into(); let hash6 = [6; 32].into(); let hash7 = [7; 32].into(); fn is_descendent_of(base: &Hash, target: &Hash) -> Result { Ok(target[0] >= base[0]) } // make #4 last finalized block finality_proofs.tree.import(hash4, 4, (), &is_descendent_of).unwrap(); finality_proofs.tree.finalize_root(&hash4); // schedule request for #6 finality_proofs.schedule((hash6, 6), is_descendent_of); // receive finality proof for #5 finality_proofs.importing_requests.insert((hash6, 6)); finality_proofs.on_block_finalized(&hash5, 5, is_descendent_of).unwrap(); finality_proofs.try_finalize_root::<()>((hash6, 6), Ok((hash5, 5)), true); // ensure that request for #6 is still pending assert_eq!(finality_proofs.pending_requests.iter().collect::>(), vec![&(hash6, 6)]); // receive finality proof for #7 finality_proofs.importing_requests.insert((hash6, 6)); finality_proofs.on_block_finalized(&hash6, 6, is_descendent_of).unwrap(); finality_proofs.on_block_finalized(&hash7, 7, is_descendent_of).unwrap(); finality_proofs.try_finalize_root::<()>((hash6, 6), Ok((hash7, 7)), true); // ensure that there's no request for #6 assert_eq!( finality_proofs.pending_requests.iter().collect::>(), Vec::<&(Hash, u64)>::new() ); } #[test] fn ancestor_roots_are_finalized_when_finality_notification_is_missed() { let mut finality_proofs = ExtraRequests::::new("test"); let hash4 = [4; 32].into(); let hash5 = [5; 32].into(); fn is_descendent_of(base: &Hash, target: &Hash) -> Result { Ok(target[0] >= base[0]) } // schedule request for #4 finality_proofs.schedule((hash4, 4), is_descendent_of); // receive finality notification for #5 (missing notification for #4!!!) finality_proofs.importing_requests.insert((hash4, 5)); finality_proofs.on_block_finalized(&hash5, 5, is_descendent_of).unwrap(); assert_eq!(finality_proofs.tree.roots().count(), 0); } // Some Arbitrary instances to allow easy construction of random peer sets: #[derive(Debug, Clone)] struct ArbitraryPeerSyncState(PeerSyncState); impl Arbitrary for ArbitraryPeerSyncState { fn arbitrary(g: &mut Gen) -> Self { let s = match u8::arbitrary(g) % 4 { 0 => PeerSyncState::Available, // TODO: 1 => PeerSyncState::AncestorSearch(g.gen(), AncestorSearchState), 1 => PeerSyncState::DownloadingNew(BlockNumber::arbitrary(g)), 2 => PeerSyncState::DownloadingStale(Hash::random()), _ => PeerSyncState::DownloadingJustification(Hash::random()), }; ArbitraryPeerSyncState(s) } } #[derive(Debug, Clone)] struct ArbitraryPeerSync(PeerSync); impl Arbitrary for ArbitraryPeerSync { fn arbitrary(g: &mut Gen) -> Self { let ps = PeerSync { peer_id: PeerId::random(), common_number: u64::arbitrary(g), best_hash: Hash::random(), best_number: u64::arbitrary(g), state: ArbitraryPeerSyncState::arbitrary(g).0, }; ArbitraryPeerSync(ps) } } #[derive(Debug, Clone)] struct ArbitraryPeers(HashMap>); impl Arbitrary for ArbitraryPeers { fn arbitrary(g: &mut Gen) -> Self { let mut peers = HashMap::with_capacity(g.size()); for _ in 0..g.size() { let ps = ArbitraryPeerSync::arbitrary(g).0; peers.insert(ps.peer_id, ps); } ArbitraryPeers(peers) } } }