diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 021eca7758..9a65cf8f4b 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4542,6 +4542,7 @@ dependencies = [ "bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", + "either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index 446e0e0118..8b34317470 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" [dependencies] bytes = "0.4" derive_more = "0.14.0" +either = "1.5.2" log = "0.4" parking_lot = "0.8.0" bitflags = "1.0" diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index c0f40d4332..9e876e7285 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -28,17 +28,13 @@ use runtime_primitives::traits::{ Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub, SaturatedConversion }; -use message::{ - BlockRequest as BlockRequestMessage, - FinalityProofRequest as FinalityProofRequestMessage, Message, -}; -use message::{BlockAttributes, Direction, FromBlock, RequestId}; +use message::{BlockAttributes, Direction, FromBlock, Message, RequestId}; use message::generic::{Message as GenericMessage, ConsensusMessage}; use event::Event; use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use on_demand::{OnDemandCore, OnDemandNetwork, RequestData}; use specialization::NetworkSpecialization; -use sync::{ChainSync, Context as SyncContext, SyncState}; +use sync::{ChainSync, SyncState}; use crate::service::{TransactionPool, ExHashT}; use crate::config::{BoxFinalityProofRequestBuilder, Roles}; use rustc_hex::ToHex; @@ -326,38 +322,6 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, } } -impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext for ProtocolContext<'a, B, H> { - fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.peerset_handle.report_peer(who, reputation) - } - - fn disconnect_peer(&mut self, who: PeerId) { - self.behaviour.disconnect_peer(&who) - } - - fn client(&self) -> &dyn Client { - &*self.context_data.chain - } - - fn send_finality_proof_request(&mut self, who: PeerId, request: FinalityProofRequestMessage) { - send_message( - self.behaviour, - &mut self.context_data.peers, - who, - GenericMessage::FinalityProofRequest(request) - ) - } - - fn send_block_request(&mut self, who: PeerId, request: BlockRequestMessage) { - send_message( - self.behaviour, - &mut self.context_data.peers, - who, - GenericMessage::BlockRequest(request) - ) - } -} - /// Data necessary to create a context. struct ContextData { // All connected peers @@ -394,7 +358,7 @@ impl, H: ExHashT> Protocol { peerset_config: peerset::PeersetConfig, ) -> error::Result<(Protocol, peerset::PeersetHandle)> { let info = chain.info(); - let sync = ChainSync::new(config.roles, &info, finality_proof_request_builder); + let sync = ChainSync::new(config.roles, chain.clone(), &info, finality_proof_request_builder); let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); let behaviour = CustomProto::new(protocol_id, versions, peerset); @@ -661,7 +625,7 @@ impl, H: ExHashT> Protocol { if peer_data.info.protocol_version > 2 { self.consensus_gossip.peer_disconnected(&mut context, peer.clone()); } - self.sync.peer_disconnected(&mut context, peer.clone()); + self.sync.peer_disconnected(peer.clone()); self.specialization.on_disconnect(&mut context, peer.clone()); self.on_demand_core.on_disconnect(OnDemandIn { behaviour: &mut self.behaviour, @@ -792,29 +756,29 @@ impl, H: ExHashT> Protocol { // TODO [andre]: move this logic to the import queue so that // justifications are imported asynchronously (#1482) if request.fields == message::BlockAttributes::JUSTIFICATION { - let outcome = self.sync.on_block_justification_data( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - peer, - response - ); - - if let Some((origin, hash, nb, just)) = outcome { - CustomMessageOutcome::JustificationImport(origin, hash, nb, just) - } else { - CustomMessageOutcome::None + match self.sync.on_block_justification(peer, response) { + Ok(sync::OnBlockJustification::Nothing) => CustomMessageOutcome::None, + Ok(sync::OnBlockJustification::Import { peer, hash, number, justification }) => + CustomMessageOutcome::JustificationImport(peer, hash, number, justification), + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu); + CustomMessageOutcome::None + } } - } else { - let outcome = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - peer, - request, - response - ); - if let Some((origin, blocks)) = outcome { - CustomMessageOutcome::BlockImport(origin, blocks) - } else { - CustomMessageOutcome::None + match self.sync.on_block_data(peer, request, response) { + Ok(sync::OnBlockData::Import(origin, blocks)) => + CustomMessageOutcome::BlockImport(origin, blocks), + Ok(sync::OnBlockData::Request(peer, req)) => { + self.send_message(peer, GenericMessage::BlockRequest(req)); + CustomMessageOutcome::None + } + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu); + CustomMessageOutcome::None + } } } } @@ -827,7 +791,6 @@ impl, H: ExHashT> Protocol { &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle) ); self.maintain_peers(); - self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)); self.on_demand_core.maintain_peers(OnDemandIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), @@ -953,8 +916,15 @@ impl, H: ExHashT> Protocol { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who.clone(), status.roles, status.best_number); + match self.sync.new_peer(who.clone(), info) { + Ok(None) => (), + Ok(Some(req)) => self.send_message(who.clone(), GenericMessage::BlockRequest(req)), + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu) + } + } let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - self.sync.new_peer(&mut context, who.clone(), info); if protocol_version > 2 { self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles); } @@ -1085,28 +1055,28 @@ impl, H: ExHashT> Protocol { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who.clone(), *header.number()); - let try_import = self.sync.on_block_announce( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - who.clone(), - hash, - &header, - ); - // try_import is only true when we have all data required to import block - // in the BlockAnnounce message. This is only when: - // 1) we're on light client; - // AND - // - EITHER 2.1) announced block is stale; - // - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e. - // there are no ascendants of this block scheduled for retrieval) - if !try_import { - return CustomMessageOutcome::None; + match self.sync.on_block_announce(who.clone(), hash, &header) { + sync::OnBlockAnnounce::Request(peer, req) => { + self.send_message(peer, GenericMessage::BlockRequest(req)); + return CustomMessageOutcome::None + } + sync::OnBlockAnnounce::Nothing => { + // try_import is only true when we have all data required to import block + // in the BlockAnnounce message. This is only when: + // 1) we're on light client; + // AND + // - EITHER 2.1) announced block is stale; + // - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e. + // there are no ascendants of this block scheduled for retrieval) + return CustomMessageOutcome::None + } + sync::OnBlockAnnounce::ImportHeader => () // We proceed with the import. } // to import header from announced block let's construct response to request that normally would have // been sent over network (but it is not in our case) let blocks_to_import = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who.clone(), message::generic::BlockRequest { id: 0, @@ -1131,8 +1101,16 @@ impl, H: ExHashT> Protocol { }, ); match blocks_to_import { - Some((origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), - None => CustomMessageOutcome::None, + Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), + Ok(sync::OnBlockData::Request(peer, req)) => { + self.send_message(peer, GenericMessage::BlockRequest(req)); + CustomMessageOutcome::None + } + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu); + CustomMessageOutcome::None + } } } @@ -1166,11 +1144,7 @@ impl, H: ExHashT> Protocol { /// Call this when a block has been finalized. The sync layer may have some additional /// requesting to perform. pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { - self.sync.on_block_finalized( - &hash, - *header.number(), - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - ); + self.sync.on_block_finalized(&hash, *header.number()) } fn on_remote_call_request( @@ -1217,9 +1191,7 @@ impl, H: ExHashT> Protocol { /// Uses `protocol` to queue a new justification request and tries to dispatch all pending /// requests. pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let mut context = - ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - self.sync.request_justification(&hash, number, &mut context); + self.sync.request_justification(&hash, number) } /// Clears all pending justification requests. @@ -1230,43 +1202,43 @@ impl, H: ExHashT> Protocol { /// A batch of blocks have been processed, with or without errors. /// Call this when a batch of blocks have been processed by the import queue, with or without /// errors. - pub fn blocks_processed( - &mut self, - processed_blocks: Vec, - has_error: bool - ) { - let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - self.sync.blocks_processed(&mut context, processed_blocks, has_error); + pub fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { + self.sync.on_blocks_processed(processed_blocks, has_error); } /// Restart the sync process. pub fn restart(&mut self) { let peers = self.context_data.peers.clone(); - let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - self.sync.restart(&mut context, |peer_id| peers.get(peer_id).map(|i| i.info.clone())); + for result in self.sync.restart(|peer_id| peers.get(peer_id).map(|i| i.info.clone())) { + match result { + Ok((id, req)) => { + let msg = GenericMessage::BlockRequest(req); + send_message(&mut self.behaviour, &mut self.context_data.peers, id, msg) + } + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu) + } + } + } } /// Notify about successful import of the given block. pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - self.sync.block_imported(hash, number) + trace!(target: "sync", "Block imported successfully {} ({})", number, hash) } /// Call this when a justification has been processed by the import queue, with or without /// errors. pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { - self.sync.justification_import_result(hash, number, success) + self.sync.on_justification_import(hash, number, success) } /// Request a finality proof for the given block. /// /// Queues a new finality proof request and tries to dispatch all pending requests. - pub fn request_finality_proof( - &mut self, - hash: &B::Hash, - number: NumberFor - ) { - let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - self.sync.request_finality_proof(&hash, number, &mut context); + pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { + self.sync.request_finality_proof(&hash, number) } pub fn finality_proof_import_result( @@ -1274,7 +1246,7 @@ impl, H: ExHashT> Protocol { request_block: (B::Hash, NumberFor), finalization_result: Result<(B::Hash, NumberFor), ()>, ) { - self.sync.finality_proof_import_result(request_block, finalization_result) + self.sync.on_finality_proof_import(request_block, finalization_result) } fn on_remote_call_response( @@ -1475,16 +1447,15 @@ impl, H: ExHashT> Protocol { response: message::FinalityProofResponse, ) -> CustomMessageOutcome { trace!(target: "sync", "Finality proof response from {} for {}", who, response.block); - let outcome = self.sync.on_block_finality_proof_data( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - who, - response, - ); - - if let Some((origin, hash, nb, proof)) = outcome { - CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) - } else { - CustomMessageOutcome::None + match self.sync.on_block_finality_proof(who, response) { + Ok(sync::OnBlockFinalityProof::Nothing) => CustomMessageOutcome::None, + Ok(sync::OnBlockFinalityProof::Import { peer, hash, number, proof }) => + CustomMessageOutcome::FinalityProofImport(peer, hash, number, proof), + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu); + CustomMessageOutcome::None + } } } @@ -1575,6 +1546,16 @@ Protocol { self.propagate_extrinsics(); } + for (id, r) in self.sync.block_requests() { + send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r)) + } + for (id, r) in self.sync.justification_requests() { + send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r)) + } + for (id, r) in self.sync.finality_proof_requests() { + send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::FinalityProofRequest(r)) + } + let event = match self.behaviour.poll(params) { Async::NotReady => return Async::NotReady, Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev, diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 8f5218b15c..75658a2f08 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -1,16 +1,16 @@ // Copyright 2017-2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. - +// // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. - +// // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. - +// // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . @@ -24,133 +24,67 @@ //! //! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on //! the network, or whenever a block has been successfully verified, call the appropriate method in -//! order to update it. You must also regularly call `tick()`. -//! -//! To each of these methods, you must pass a `Context` object that the `ChainSync` will use to -//! send its new outgoing requests. +//! order to update it. //! -use std::cmp::max; -use std::ops::Range; -use std::collections::{HashMap, VecDeque}; -use log::{debug, trace, warn, info, error}; -use crate::protocol::PeerInfo as ProtocolPeerInfo; -use libp2p::PeerId; -use client::{BlockStatus, ClientInfo}; -use consensus::{BlockOrigin, import_queue::IncomingBlock}; -use client::error::Error as ClientError; use blocks::BlockCollection; -use extra_requests::ExtraRequests; -use runtime_primitives::traits::{ - Block as BlockT, Header as HeaderT, NumberFor, Zero, One, - CheckedSub, SaturatedConversion +use client::{BlockStatus, ClientInfo, error::Error as ClientError}; +use consensus::{BlockOrigin, import_queue::IncomingBlock}; +use crate::{ + config::{Roles, BoxFinalityProofRequestBuilder}, + message::{self, generic::FinalityProofRequest, BlockAttributes, BlockRequest, BlockResponse, FinalityProofResponse}, + protocol }; -use runtime_primitives::{Justification, generic::BlockId}; -use crate::message; -use crate::config::{Roles, BoxFinalityProofRequestBuilder}; -use std::collections::HashSet; +use either::Either; +use extra_requests::ExtraRequests; +use libp2p::PeerId; +use log::{debug, trace, warn, info, error}; +use runtime_primitives::{ + Justification, + generic::BlockId, + traits::{Block, Header, NumberFor, Zero, One, CheckedSub, SaturatedConversion} +}; +use std::{fmt, ops::Range, collections::{HashMap, HashSet, VecDeque}, sync::Arc}; mod blocks; mod extra_requests; /// Maximum blocks to request in a single packet. const MAX_BLOCKS_TO_REQUEST: usize = 128; + /// Maximum blocks to store in the import queue. const MAX_IMPORTING_BLOCKS: usize = 2048; -/// We use a heuristic that with a high likelihood, by the time `MAJOR_SYNC_BLOCKS` have been -/// imported we'll be on the same chain as (or at least closer to) the peer so we want to delay the -/// ancestor search to not waste time doing that when we're so far behind. -const MAJOR_SYNC_BLOCKS: usize = 5; + +/// We use a heuristic that with a high likelihood, by the time +/// `MAJOR_SYNC_BLOCKS` have been imported we'll be on the same +/// chain as (or at least closer to) the peer so we want to delay +/// the ancestor search to not waste time doing that when we are +/// so far behind. +const MAJOR_SYNC_BLOCKS: u8 = 5; + /// Number of recently announced blocks to track for each peer. const ANNOUNCE_HISTORY_SIZE: usize = 64; + /// Max number of blocks to download for unknown forks. const MAX_UNKNOWN_FORK_DOWNLOAD_LEN: u32 = 32; -/// Reputation change when a peer sent us a status message that led to a database read error. + +/// Reputation change when a peer sent us a status message that led to a +/// database read error. const BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE: i32 = -(1 << 16); -/// Reputation change when a peer failed to answer our legitimate ancestry block search. + +/// Reputation change when a peer failed to answer our legitimate ancestry +/// block search. const ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE: i32 = -(1 << 9); -/// Reputation change when a peer sent us a status message with a different genesis than us. + +/// Reputation change when a peer sent us a status message with a different +/// genesis than us. const GENESIS_MISMATCH_REPUTATION_CHANGE: i32 = i32::min_value() + 1; -/// Context for a network-specific handler. -pub trait Context { - /// Get a reference to the client. - fn client(&self) -> &dyn crate::chain::Client; - - /// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or - /// irresponsible or appeared lazy. - fn report_peer(&mut self, who: PeerId, reputation: i32); - - /// Force disconnecting from a peer. Use this when a peer misbehaved. - fn disconnect_peer(&mut self, who: PeerId); - - /// Request a finality proof from a peer. - fn send_finality_proof_request(&mut self, who: PeerId, request: message::FinalityProofRequest); - - /// Request a block from a peer. - fn send_block_request(&mut self, who: PeerId, request: message::BlockRequest); -} - -#[derive(Debug, Clone)] -/// All the data we have about a Peer that we are trying to sync with -pub(crate) struct PeerSync { - /// The common number is the block number that is a common point of ancestry for both our chains - /// (as far as we know) - pub common_number: NumberFor, - /// The hash of the best block that we've seen for this peer - pub best_hash: B::Hash, - /// The number of the best block that we've seen for this peer - pub best_number: NumberFor, - /// The state of syncing this peer is in for us, generally categories into `Available` or "busy" - /// with something as defined by `PeerSyncState`. - pub state: PeerSyncState, - /// A queue of blocks that this peer has announced to us, should only contain - /// `ANNOUNCE_HISTORY_SIZE` entries. - pub recently_announced: VecDeque, -} - -/// The sync status of a peer we are trying to sync with -#[derive(Debug)] -pub(crate) struct PeerInfo { - /// Their best block hash. - pub best_hash: B::Hash, - /// Their best block number. - pub best_number: NumberFor, -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -/// The ancestor search state expresses which algorithm, and its stateful parameters, we are using to -/// try to find an ancestor block -pub(crate) enum AncestorSearchState { - /// Use exponential backoff to find an ancestor, then switch to binary search. - /// We keep track of the exponent. - ExponentialBackoff(NumberFor), - /// Using binary search to find the best ancestor. - /// We keep track of left and right bounds. - BinarySearch(NumberFor, NumberFor), -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -/// The state of syncing between a Peer and ourselves. Generally two categories, "busy" or -/// `Available`. If busy, the Enum defines what we are busy with. -pub(crate) enum PeerSyncState { - /// Searching for ancestors the Peer has in common with us. - AncestorSearch(NumberFor, AncestorSearchState), - /// Available for sync requests. - Available, - /// Actively downloading new blocks, starting from the given Number. - DownloadingNew(NumberFor), - /// Downloading a stale block with given Hash. Stale means that it's a block with a number that - /// is lower than our best number. It might be from a fork and not necessarily already imported. - DownloadingStale(B::Hash), - /// Downloading justification for given block hash. - DownloadingJustification(B::Hash), - /// Downloading finality proof for given block hash. - DownloadingFinalityProof(B::Hash), -} - -/// The main data structure to contain all the state for a chains active syncing strategy. -pub struct ChainSync { +/// The main data structure which contains all the state for a chains +/// active syncing strategy. +pub struct ChainSync { + /// Chain client. + client: Arc>, /// The active peers that we are using to sync and their PeerSync status peers: HashMap>, /// A `BlockCollection` of blocks that are being downloaded from peers @@ -161,17 +95,78 @@ pub struct ChainSync { best_queued_hash: B::Hash, /// The role of this node, e.g. light or full role: Roles, - /// What block attributes we require for this node, usually derived from what role we are, but - /// could be customized + /// What block attributes we require for this node, usually derived from + /// what role we are, but could be customized required_block_attributes: message::BlockAttributes, + /// Any extra finality proof requests. extra_finality_proofs: ExtraRequests, + /// Any extra justification requests. extra_justifications: ExtraRequests, - /// A set of hashes of blocks that are being downloaded or have been downloaded and are queued - /// for import. + /// A set of hashes of blocks that are being downloaded or have been + /// downloaded and are queued for import. queue_blocks: HashSet, - /// The best block number that we are currently importing + /// The best block number that we are currently importing. best_importing_number: NumberFor, - request_builder: Option>, + request_builder: Option> +} + +/// All the data we have about a Peer that we are trying to sync with +#[derive(Debug, Clone)] +pub struct PeerSync { + /// The common number is the block number that is a common point of + /// ancestry for both our chains (as far as we know). + pub common_number: NumberFor, + /// The hash of the best block that we've seen for this peer. + pub best_hash: B::Hash, + /// The number of the best block that we've seen for this peer. + pub best_number: NumberFor, + /// The state of syncing this peer is in for us, generally categories + /// into `Available` or "busy" with something as defined by `PeerSyncState`. + pub state: PeerSyncState, + /// A queue of blocks that this peer has announced to us, should only + /// contain `ANNOUNCE_HISTORY_SIZE` entries. + pub recently_announced: VecDeque +} + +/// The sync status of a peer we are trying to sync with +#[derive(Debug)] +pub struct PeerInfo { + /// Their best block hash. + pub best_hash: B::Hash, + /// Their best block number. + pub best_number: NumberFor +} + +/// The state of syncing between a Peer and ourselves. +/// +/// Generally two categories, "busy" or `Available`. If busy, the enum +/// defines what we are busy with. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum PeerSyncState { + /// Available for sync requests. + Available, + /// Searching for ancestors the Peer has in common with us. + AncestorSearch(NumberFor, AncestorSearchState), + /// Actively downloading new blocks, starting from the given Number. + DownloadingNew(NumberFor), + /// Downloading a stale block with given Hash. Stale means that it is a + /// block with a number that is lower than our best number. It might be + /// from a fork and not necessarily already imported. + DownloadingStale(B::Hash), + /// Downloading justification for given block hash. + DownloadingJustification(B::Hash), + /// Downloading finality proof for given block hash. + DownloadingFinalityProof(B::Hash) +} + +impl PeerSyncState { + pub fn is_available(&self) -> bool { + if let PeerSyncState::Available = self { + true + } else { + false + } + } } /// Reported sync state. @@ -183,32 +178,93 @@ pub enum SyncState { Downloading } -/// Syncing status and statistics +/// Syncing status and statistics. #[derive(Clone)] -pub struct Status { +pub struct Status { /// Current global sync state. pub state: SyncState, /// Target sync block number. pub best_seen_block: Option>, /// Number of peers participating in syncing. - pub num_peers: u32, + pub num_peers: u32 } -impl ChainSync { - /// Create a new instance. Pass the initial known state of the chain. - pub(crate) fn new( +/// A peer did not behave as expected and should be reported. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BadPeer(pub PeerId, pub i32); + +impl fmt::Display for BadPeer { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "bad peer {}; reputation change: {}", self.0, self.1) + } +} + +impl std::error::Error for BadPeer {} + +/// Result of [`ChainSync::on_block_data`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OnBlockData { + /// The block should be imported. + Import(BlockOrigin, Vec>), + /// A new block request needs to be made to the given peer. + Request(PeerId, BlockRequest) +} + +/// Result of [`ChainSync::on_block_announce`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OnBlockAnnounce { + /// The announcement does not require further handling. + Nothing, + /// The announcement header should be imported. + ImportHeader, + /// Another block request to the given peer is necessary. + Request(PeerId, BlockRequest) +} + +/// Result of [`ChainSync::on_block_justification`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OnBlockJustification { + /// The justification needs no further handling. + Nothing, + /// The justification should be imported. + Import { + peer: PeerId, + hash: B::Hash, + number: NumberFor, + justification: Justification + } +} + +/// Result of [`ChainSync::on_block_finality_proof`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OnBlockFinalityProof { + /// The proof needs no further handling. + Nothing, + /// The proof should be imported. + Import { + peer: PeerId, + hash: B::Hash, + number: NumberFor, + proof: Vec + } +} + +impl ChainSync { + /// Create a new instance. + pub fn new( role: Roles, + client: Arc>, info: &ClientInfo, request_builder: Option> ) -> Self { - let mut required_block_attributes = - message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION; + let mut required_block_attributes = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION; if role.is_full() { - required_block_attributes |= message::BlockAttributes::BODY; + required_block_attributes |= BlockAttributes::BODY } ChainSync { + client, peers: HashMap::new(), blocks: BlockCollection::new(), best_queued_hash: info.chain.best_hash, @@ -219,98 +275,85 @@ impl ChainSync { required_block_attributes, queue_blocks: Default::default(), best_importing_number: Zero::zero(), - request_builder, + request_builder } } - /// Returns the number for the best seen blocks among connected peers, if any - fn best_seen_block(&self) -> Option> { - self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number) + /// Returns the state of the sync of the given peer. + /// + /// Returns `None` if the peer is unknown. + pub fn peer_info(&self, who: &PeerId) -> Option> { + self.peers.get(who).map(|p| PeerInfo { best_hash: p.best_hash, best_number: p.best_number }) } - /// Returns the SyncState that we are currently in based on a provided `best_seen` block number. - /// A chain is classified as downloading if the provided best block is more than `MAJOR_SYNC_BLOCKS` - /// behind the best queued block. - fn state(&self, best_seen: &Option>) -> SyncState { - match best_seen { - &Some(n) if n > self.best_queued_number && n - self.best_queued_number > 5.into() => SyncState::Downloading, - _ => SyncState::Idle, - } - } + /// Returns the current sync status. + pub fn status(&self) -> Status { + let best_seen = self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number); + let sync_state = + if let Some(n) = best_seen { + // A chain is classified as downloading if the provided best block is + // more than `MAJOR_SYNC_BLOCKS` behind the best queued block. + if n > self.best_queued_number && n - self.best_queued_number > MAJOR_SYNC_BLOCKS.into() { + SyncState::Downloading + } else { + SyncState::Idle + } + } else { + SyncState::Idle + }; - /// Returns the state of the sync of the given peer. Returns `None` if the peer is unknown. - pub(crate) fn peer_info(&self, who: &PeerId) -> Option> { - self.peers.get(who).map(|peer| { - PeerInfo { - best_hash: peer.best_hash, - best_number: peer.best_number, - } - }) - } - - /// Returns sync status. - pub(crate) fn status(&self) -> Status { - let best_seen = self.best_seen_block(); - let state = self.state(&best_seen); Status { - state, + state: sync_state, best_seen_block: best_seen, - num_peers: self.peers.len() as u32, + num_peers: self.peers.len() as u32 } } - /// Handle new connected peer. Call this method whenever we connect to a new peer. - pub(crate) fn new_peer( - &mut self, - protocol: &mut dyn Context, - who: PeerId, - info: ProtocolPeerInfo - ) { - // there's nothing sync can get from the node that has no blockchain data - // (the opposite is not true, but all requests are served at protocol level) + /// Handle a new connected peer. + /// + /// Call this method whenever we connect to a new peer. + pub fn new_peer(&mut self, who: PeerId, info: protocol::PeerInfo) -> Result>, BadPeer> { + // There is nothing sync can get from the node that has no blockchain data. if !info.roles.is_full() { - return; + return Ok(None) } - - let status = block_status(&*protocol.client(), &self.queue_blocks, info.best_hash); - match (status, info.best_number) { - (Err(e), _) => { + match self.block_status(&info.best_hash) { + Err(e) => { debug!(target:"sync", "Error reading blockchain: {:?}", e); - protocol.report_peer(who.clone(), BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE); - protocol.disconnect_peer(who); - }, - (Ok(BlockStatus::KnownBad), _) => { + Err(BadPeer(who, BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE)) + } + Ok(BlockStatus::KnownBad) => { info!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number); - protocol.report_peer(who.clone(), i32::min_value()); - protocol.disconnect_peer(who); - }, - (Ok(BlockStatus::Unknown), b) if b.is_zero() => { - info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number); - protocol.report_peer(who.clone(), i32::min_value()); - protocol.disconnect_peer(who); - }, - (Ok(BlockStatus::Unknown), _) if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS => { + Err(BadPeer(who, i32::min_value())) + } + Ok(BlockStatus::Unknown) => { + if info.best_number.is_zero() { + info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number); + return Err(BadPeer(who, i32::min_value())) + } + // If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have // enough to do in the import queue that it's not worth kicking off // an ancestor search, which is what we do in the next match case below. - debug!( - target:"sync", - "New peer with unknown best hash {} ({}), assuming common block.", - self.best_queued_hash, - self.best_queued_number - ); - self.peers.insert(who, PeerSync { - common_number: self.best_queued_number, - best_hash: info.best_hash, - best_number: info.best_number, - state: PeerSyncState::Available, - recently_announced: Default::default(), - }); - } - (Ok(BlockStatus::Unknown), _) => { - let our_best = self.best_queued_number; - if our_best.is_zero() { - // We are at genesis, just start downloading + if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS.into() { + debug!( + target:"sync", + "New peer with unknown best hash {} ({}), assuming common block.", + self.best_queued_hash, + self.best_queued_number + ); + self.peers.insert(who, PeerSync { + common_number: self.best_queued_number, + best_hash: info.best_hash, + best_number: info.best_number, + state: PeerSyncState::Available, + recently_announced: Default::default() + }); + return Ok(None) + } + + // If we are at genesis, just start downloading. + if self.best_queued_number.is_zero() { debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number); self.peers.insert(who.clone(), PeerSync { common_number: Zero::zero(), @@ -319,30 +362,31 @@ impl ChainSync { state: PeerSyncState::Available, recently_announced: Default::default(), }); - self.download_new(protocol, who) - } else { - let common_best = ::std::cmp::min(our_best, info.best_number); - debug!(target:"sync", - "New peer with unknown best hash {} ({}), searching for common ancestor.", - info.best_hash, - info.best_number - ); - self.peers.insert(who.clone(), PeerSync { - common_number: Zero::zero(), - best_hash: info.best_hash, - best_number: info.best_number, - state: PeerSyncState::AncestorSearch( - common_best, - AncestorSearchState::ExponentialBackoff(One::one()) - ), - recently_announced: Default::default(), - }); - Self::request_ancestry(protocol, who, common_best) + return Ok(self.select_new_blocks(who).map(|(_, req)| req)) } - }, - (Ok(BlockStatus::Queued), _) | - (Ok(BlockStatus::InChainWithState), _) | - (Ok(BlockStatus::InChainPruned), _) => { + + let common_best = std::cmp::min(self.best_queued_number, info.best_number); + + debug!(target:"sync", + "New peer with unknown best hash {} ({}), searching for common ancestor.", + info.best_hash, + info.best_number + ); + + self.peers.insert(who, PeerSync { + common_number: Zero::zero(), + best_hash: info.best_hash, + best_number: info.best_number, + state: PeerSyncState::AncestorSearch( + common_best, + AncestorSearchState::ExponentialBackoff(One::one()) + ), + recently_announced: Default::default() + }); + + Ok(Some(ancestry_request::(common_best))) + } + Ok(BlockStatus::Queued) | Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => { debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number); self.peers.insert(who.clone(), PeerSync { common_number: info.best_number, @@ -351,337 +395,23 @@ impl ChainSync { state: PeerSyncState::Available, recently_announced: Default::default(), }); + Ok(None) } } } - /// This function handles the ancestor search strategy used. The goal is to find a common point - /// that both our chains agree on that is as close to the tip as possible. - /// The way this works is we first have an exponential backoff strategy, where we try to step - /// forward until we find a block hash mismatch. The size of the step doubles each step we take. - /// - /// When we've found a block hash mismatch we then fall back to a binary search between the two - /// last known points to find the common block closest to the tip. - fn handle_ancestor_search_state( - state: AncestorSearchState, - curr_block_num: NumberFor, - block_hash_match: bool, - ) -> Option<(AncestorSearchState, NumberFor)> { - let two = >::one() + >::one(); - match state { - AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => { - if block_hash_match && next_distance_to_tip == One::one() { - // We found the ancestor in the first step so there is no need to execute binary search. - return None; - } - if block_hash_match { - let left = curr_block_num; - let right = left + next_distance_to_tip / two; - let middle = left + (right - left) / two; - Some((AncestorSearchState::BinarySearch(left, right), middle)) - } else { - let next_block_num = curr_block_num.checked_sub(&next_distance_to_tip) - .unwrap_or_else(Zero::zero); - let next_distance_to_tip = next_distance_to_tip * two; - Some((AncestorSearchState::ExponentialBackoff(next_distance_to_tip), next_block_num)) - } - }, - AncestorSearchState::BinarySearch(mut left, mut right) => { - if left >= curr_block_num { - return None; - } - if block_hash_match { - left = curr_block_num; - } else { - right = curr_block_num; - } - assert!(right >= left); - let middle = left + (right - left) / two; - Some((AncestorSearchState::BinarySearch(left, right), middle)) - }, - } + /// Signal that `best_header` has been queued for import and update the + /// `ChainSync` state with that information. + pub fn update_chain_info(&mut self, best_header: &B::Header) { + self.on_block_queued(&best_header.hash(), *best_header.number()) } - /// Handle a response from the remote to a block request that we made. - /// - /// `request` must be the original request that triggered `response`. - /// - /// If this corresponds to a valid block, this outputs the block that must be imported in the - /// import queue. - #[must_use] - pub(crate) fn on_block_data( - &mut self, - protocol: &mut dyn Context, - who: PeerId, - request: message::BlockRequest, - response: message::BlockResponse - ) -> Option<(BlockOrigin, Vec>)> { - let new_blocks: Vec> = if let Some(ref mut peer) = self.peers.get_mut(&who) { - let mut blocks = response.blocks; - if request.direction == message::Direction::Descending { - trace!(target: "sync", "Reversing incoming block list"); - blocks.reverse(); - } - let peer_state = peer.state.clone(); - match peer_state { - PeerSyncState::DownloadingNew(start_block) => { - self.blocks.clear_peer_download(&who); - peer.state = PeerSyncState::Available; - self.blocks.insert(start_block, blocks, who); - self.blocks - .drain(self.best_queued_number + One::one()) - .into_iter() - .map(|block_data| { - IncomingBlock { - hash: block_data.block.hash, - header: block_data.block.header, - body: block_data.block.body, - justification: block_data.block.justification, - origin: block_data.origin, - } - }).collect() - }, - PeerSyncState::DownloadingStale(_) => { - peer.state = PeerSyncState::Available; - blocks.into_iter().map(|b| { - IncomingBlock { - hash: b.hash, - header: b.header, - body: b.body, - justification: b.justification, - origin: Some(who.clone()), - } - }).collect() - }, - PeerSyncState::AncestorSearch(num, state) => { - let block_hash_match = match (blocks.get(0), protocol.client().block_hash(num)) { - (Some(ref block), Ok(maybe_our_block_hash)) => { - trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who); - maybe_our_block_hash.map_or(false, |x| x == block.hash) - }, - (None, _) => { - debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); - protocol.report_peer(who.clone(), i32::min_value()); - protocol.disconnect_peer(who); - return None - }, - (_, Err(e)) => { - info!("Error answering legitimate blockchain query: {:?}", e); - protocol.report_peer(who.clone(), ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE); - protocol.disconnect_peer(who); - return None - }, - }; - if block_hash_match && peer.common_number < num { - peer.common_number = num; - } - if !block_hash_match && num.is_zero() { - trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); - protocol.report_peer(who.clone(), GENESIS_MISMATCH_REPUTATION_CHANGE); - protocol.disconnect_peer(who); - return None - } - if let Some((next_state, next_block_num)) = - Self::handle_ancestor_search_state(state, num, block_hash_match) { - peer.state = PeerSyncState::AncestorSearch(next_block_num, next_state); - Self::request_ancestry(protocol, who, next_block_num); - return None - } else { - peer.state = PeerSyncState::Available; - vec![] - } - }, - PeerSyncState::Available | - PeerSyncState::DownloadingJustification(..) | - PeerSyncState::DownloadingFinalityProof(..) => Vec::new(), - } - } else { - Vec::new() - }; - - let is_recent = new_blocks - .first() - .map(|block| self.peers.iter().any(|(_, peer)| peer.recently_announced.contains(&block.hash))) - .unwrap_or(false); - let origin = if is_recent { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; - - if let Some((hash, number)) = new_blocks.last() - .and_then(|b| b.header.as_ref().map(|h| (b.hash.clone(), *h.number()))) - { - trace!(target:"sync", "Accepted {} blocks ({:?}) with origin {:?}", new_blocks.len(), hash, origin); - self.block_queued(&hash, number); - } - self.maintain_sync(protocol); - let new_best_importing_number = new_blocks - .last() - .and_then(|b| b.header.as_ref().map(|h| h.number().clone())) - .unwrap_or_else(|| Zero::zero()); - self.queue_blocks - .extend(new_blocks.iter().map(|b| b.hash.clone())); - self.best_importing_number = max(new_best_importing_number, self.best_importing_number); - Some((origin, new_blocks)) - } - - /// Handle a response from the remote to a justification request that we made. - /// - /// `request` must be the original request that triggered `response`. - /// - /// Returns `Some` if this produces a justification that must be imported into the import - /// queue. - #[must_use] - pub(crate) fn on_block_justification_data( - &mut self, - protocol: &mut dyn Context, - who: PeerId, - response: message::BlockResponse, - ) -> Option<(PeerId, B::Hash, NumberFor, Justification)> - { - let peer = if let Some(peer) = self.peers.get_mut(&who) { - peer - } else { - error!(target: "sync", "Called on_block_justification_data with a bad peer ID"); - return None; - }; - - if let PeerSyncState::DownloadingJustification(hash) = peer.state { - peer.state = PeerSyncState::Available; - - // we only request one justification at a time - match response.blocks.into_iter().next() { - Some(response) => { - if hash != response.hash { - info!("Invalid block justification provided by {}: requested: {:?} got: {:?}", - who, hash, response.hash); - protocol.report_peer(who.clone(), i32::min_value()); - protocol.disconnect_peer(who); - return None; - } - return self.extra_justifications.on_response(who, response.justification) - } - None => { - // we might have asked the peer for a justification on a block that we thought it had - // (regardless of whether it had a justification for it or not). - trace!(target: "sync", "Peer {:?} provided empty response for justification request {:?}", - who, - hash, - ); - return None; - } - } - } - - self.maintain_sync(protocol); - None - } - - /// Handle new finality proof data. - pub(crate) fn on_block_finality_proof_data( - &mut self, - protocol: &mut dyn Context, - who: PeerId, - response: message::FinalityProofResponse, - ) -> Option<(PeerId, B::Hash, NumberFor, Vec)> { - let peer = if let Some(peer) = self.peers.get_mut(&who) { - peer - } else { - error!(target: "sync", "Called on_block_finality_proof_data with a bad peer ID"); - return None; - }; - - if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state { - peer.state = PeerSyncState::Available; - - // we only request one finality proof at a time - if hash != response.block { - info!( - "Invalid block finality proof provided: requested: {:?} got: {:?}", - hash, - response.block, - ); - - protocol.report_peer(who.clone(), i32::min_value()); - protocol.disconnect_peer(who); - return None; - } - - return self.extra_finality_proofs.on_response(who, response.proof) - } - - self.maintain_sync(protocol); - None - } - - /// A batch of blocks have been processed, with or without errors. - /// Call this when a batch of blocks have been processed by the import queue, with or without - /// errors. - pub fn blocks_processed(&mut self, protocol: &mut dyn Context, processed_blocks: Vec, has_error: bool) { - for hash in processed_blocks { - self.queue_blocks.remove(&hash); - } - if has_error { - self.best_importing_number = Zero::zero(); - } - self.maintain_sync(protocol) - } - - /// Maintain the sync process (download new blocks, fetch justifications). - fn maintain_sync(&mut self, protocol: &mut dyn Context) { - let peers: Vec = self.peers.keys().map(|p| p.clone()).collect(); - for peer in peers { - self.download_new(protocol, peer); - } - self.tick(protocol) - } - - /// Called periodically to perform any time-based actions. Must be called at a regular - /// interval. - pub fn tick(&mut self, protocol: &mut dyn Context) { - self.send_justification_requests(protocol); - self.send_finality_proof_request(protocol) - } - - fn send_justification_requests(&mut self, protocol: &mut dyn Context) { - let mut matcher = self.extra_justifications.matcher(); - while let Some((peer, request)) = matcher.next(&self.peers) { - self.peers.get_mut(&peer) - .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed") - .state = PeerSyncState::DownloadingJustification(request.0); - protocol.send_block_request(peer, message::generic::BlockRequest { - id: 0, - fields: message::BlockAttributes::JUSTIFICATION, - from: message::FromBlock::Hash(request.0), - to: None, - direction: message::Direction::Ascending, - max: Some(1) - }) - } - } - - fn send_finality_proof_request(&mut self, protocol: &mut dyn Context) { - let mut matcher = self.extra_finality_proofs.matcher(); - while let Some((peer, request)) = matcher.next(&self.peers) { - self.peers.get_mut(&peer) - .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed") - .state = PeerSyncState::DownloadingFinalityProof(request.0); - protocol.send_finality_proof_request(peer, message::generic::FinalityProofRequest { - id: 0, - block: request.0, - request: self.request_builder.as_mut() - .map(|builder| builder.build_request_data(&request.0)) - .unwrap_or_default() - }) - } - } - - /// Request a justification for the given block. - /// - /// Uses `protocol` to queue a new justification request and tries to dispatch all pending - /// requests. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor, protocol: &mut dyn Context) { + /// Schedule a justification request for the given block. + pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; self.extra_justifications.schedule((*hash, number), |base, block| { - protocol.client().is_descendent_of(base, block) - }); - self.send_justification_requests(protocol) + client.is_descendent_of(base, block) + }) } /// Clears all pending justification requests. @@ -689,53 +419,316 @@ impl ChainSync { self.extra_justifications.reset() } - /// Call this when a justification has been processed by the import queue, with or without - /// errors. - pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + /// Schedule a finality proof request for the given block. + pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; + self.extra_finality_proofs.schedule((*hash, number), |base, block| { + client.is_descendent_of(base, block) + }) + } + + /// Get an iterator over all scheduled justification requests. + pub fn justification_requests(&mut self) -> impl Iterator)> + '_ { + let peers = &mut self.peers; + let mut matcher = self.extra_justifications.matcher(); + std::iter::from_fn(move || { + if let Some((peer, request)) = matcher.next(&peers) { + peers.get_mut(&peer) + .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed") + .state = PeerSyncState::DownloadingJustification(request.0); + let req = message::generic::BlockRequest { + id: 0, + fields: BlockAttributes::JUSTIFICATION, + from: message::FromBlock::Hash(request.0), + to: None, + direction: message::Direction::Ascending, + max: Some(1) + }; + Some((peer, req)) + } else { + None + } + }) + } + + /// Get an iterator over all scheduled finality proof requests. + pub fn finality_proof_requests(&mut self) -> impl Iterator)> + '_ { + let peers = &mut self.peers; + let request_builder = &mut self.request_builder; + let mut matcher = self.extra_finality_proofs.matcher(); + std::iter::from_fn(move || { + if let Some((peer, request)) = matcher.next(&peers) { + peers.get_mut(&peer) + .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed") + .state = PeerSyncState::DownloadingFinalityProof(request.0); + let req = message::generic::FinalityProofRequest { + id: 0, + block: request.0, + request: request_builder.as_mut() + .map(|builder| builder.build_request_data(&request.0)) + .unwrap_or_default() + }; + Some((peer, req)) + } else { + None + } + }) + } + + /// Get an iterator over all block requests of all peers. + pub fn block_requests(&mut self) -> impl Iterator)> + '_ { + if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { + trace!(target: "sync", "Too many blocks in the queue."); + return Either::Left(std::iter::empty()) + } + let blocks = &mut self.blocks; + let attrs = &self.required_block_attributes; + let iter = self.peers.iter_mut().filter_map(move |(id, peer)| { + if !peer.state.is_available() { + trace!(target: "sync", "Peer {} is busy", id); + return None + } + if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs) { + peer.state = PeerSyncState::DownloadingNew(range.start); + trace!(target: "sync", "new block request for {}", id); + Some((id.clone(), req)) + } else { + trace!(target: "sync", "no new block request for {}", id); + None + } + }); + Either::Right(iter) + } + + /// Handle a response from the remote to a block request that we made. + /// + /// `request` must be the original request that triggered `response`. + /// + /// If this corresponds to a valid block, this outputs the block that + /// must be imported in the import queue. + pub fn on_block_data + (&mut self, who: PeerId, request: BlockRequest, response: BlockResponse) -> Result, BadPeer> + { + let new_blocks: Vec> = + if let Some(peer) = self.peers.get_mut(&who) { + let mut blocks = response.blocks; + if request.direction == message::Direction::Descending { + trace!(target: "sync", "Reversing incoming block list"); + blocks.reverse() + } + match &mut peer.state { + PeerSyncState::DownloadingNew(start_block) => { + self.blocks.clear_peer_download(&who); + self.blocks.insert(*start_block, blocks, who); + peer.state = PeerSyncState::Available; + self.blocks + .drain(self.best_queued_number + One::one()) + .into_iter() + .map(|block_data| { + IncomingBlock { + hash: block_data.block.hash, + header: block_data.block.header, + body: block_data.block.body, + justification: block_data.block.justification, + origin: block_data.origin, + } + }).collect() + } + PeerSyncState::DownloadingStale(_) => { + peer.state = PeerSyncState::Available; + blocks.into_iter().map(|b| { + IncomingBlock { + hash: b.hash, + header: b.header, + body: b.body, + justification: b.justification, + origin: Some(who.clone()), + } + }).collect() + } + PeerSyncState::AncestorSearch(num, state) => { + let block_hash_match = match (blocks.get(0), self.client.block_hash(*num)) { + (Some(block), Ok(maybe_our_block_hash)) => { + trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who); + maybe_our_block_hash.map_or(false, |x| x == block.hash) + }, + (None, _) => { + debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); + return Err(BadPeer(who, i32::min_value())) + }, + (_, Err(e)) => { + info!("Error answering legitimate blockchain query: {:?}", e); + return Err(BadPeer(who, ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE)) + } + }; + if block_hash_match && peer.common_number < *num { + peer.common_number = *num; + } + if !block_hash_match && num.is_zero() { + trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); + return Err(BadPeer(who, GENESIS_MISMATCH_REPUTATION_CHANGE)) + } + if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, block_hash_match) { + peer.state = PeerSyncState::AncestorSearch(next_num, next_state); + return Ok(OnBlockData::Request(who, ancestry_request::(next_num))) + } else { + peer.state = PeerSyncState::Available; + Vec::new() + } + } + + | PeerSyncState::Available + | PeerSyncState::DownloadingJustification(..) + | PeerSyncState::DownloadingFinalityProof(..) => Vec::new() + } + } else { + Vec::new() + }; + + let is_recent = new_blocks.first() + .map(|block| { + self.peers.iter().any(|(_, peer)| peer.recently_announced.contains(&block.hash)) + }) + .unwrap_or(false); + + let origin = + if is_recent { + BlockOrigin::NetworkBroadcast + } else { + BlockOrigin::NetworkInitialSync + }; + + if let Some((h, n)) = new_blocks.last().and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number()))) { + trace!(target:"sync", "Accepted {} blocks ({:?}) with origin {:?}", new_blocks.len(), h, origin); + self.on_block_queued(h, n) + } + + let new_best_importing_number = new_blocks.last() + .and_then(|b| b.header.as_ref().map(|h| *h.number())) + .unwrap_or_else(|| Zero::zero()); + + self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash)); + + self.best_importing_number = std::cmp::max(new_best_importing_number, self.best_importing_number); + + Ok(OnBlockData::Import(origin, new_blocks)) + } + + /// Handle a response from the remote to a justification request that we made. + /// + /// `request` must be the original request that triggered `response`. + /// + /// Returns `Some` if this produces a justification that must be imported + /// into the import queue. + pub fn on_block_justification + (&mut self, who: PeerId, response: BlockResponse) -> Result, BadPeer> + { + let peer = + if let Some(peer) = self.peers.get_mut(&who) { + peer + } else { + error!(target: "sync", "Called on_block_justification with a bad peer ID"); + return Ok(OnBlockJustification::Nothing) + }; + + if let PeerSyncState::DownloadingJustification(hash) = peer.state { + peer.state = PeerSyncState::Available; + + // We only request one justification at a time + debug_assert_eq!(1, response.blocks.len()); + + if let Some(block) = response.blocks.into_iter().next() { + if hash != block.hash { + info!( + "Invalid block justification provided by {}: requested: {:?} got: {:?}", who, hash, block.hash + ); + return Err(BadPeer(who, i32::min_value())) + } + if let Some((peer, hash, number, j)) = self.extra_justifications.on_response(who, block.justification) { + return Ok(OnBlockJustification::Import { peer, hash, number, justification: j }) + } + } else { + // we might have asked the peer for a justification on a block that we thought it had + // (regardless of whether it had a justification for it or not). + trace!(target: "sync", "Peer {:?} provided empty response for justification request {:?}", who, hash) + } + } + + Ok(OnBlockJustification::Nothing) + } + + /// Handle new finality proof data. + pub fn on_block_finality_proof + (&mut self, who: PeerId, resp: FinalityProofResponse) -> Result, BadPeer> + { + let peer = + if let Some(peer) = self.peers.get_mut(&who) { + peer + } else { + error!(target: "sync", "Called on_block_finality_proof_data with a bad peer ID"); + return Ok(OnBlockFinalityProof::Nothing) + }; + + if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state { + peer.state = PeerSyncState::Available; + + // We only request one finality proof at a time. + if hash != resp.block { + info!("Invalid block finality proof provided: requested: {:?} got: {:?}", hash, resp.block); + return Err(BadPeer(who, i32::min_value())) + } + + if let Some((peer, hash, number, p)) = self.extra_finality_proofs.on_response(who, resp.proof) { + return Ok(OnBlockFinalityProof::Import { peer, hash, number, proof: p }) + } + } + + Ok(OnBlockFinalityProof::Nothing) + } + + /// A batch of blocks have been processed, with or without errors. + /// + /// Call this when a batch of blocks have been processed by the import + /// queue, with or without errors. + pub fn on_blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { + for hash in processed_blocks { + self.queue_blocks.remove(&hash); + } + if has_error { + self.best_importing_number = Zero::zero() + } + } + + /// Call this when a justification has been processed by the import queue, + /// with or without errors. + pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; if !self.extra_justifications.try_finalize_root((hash, number), finalization_result, true) { debug!(target: "sync", "Got justification import result for unknown justification {:?} {:?} request.", hash, number, - ); + ) } } - /// Request a finality proof for the given block. - /// - /// Queues a new finality proof request and tries to dispatch all pending requests. - pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor, protocol: &mut dyn Context) { - self.extra_finality_proofs.schedule((*hash, number), |base, block| { - protocol.client().is_descendent_of(base, block) - }); - self.send_finality_proof_request(protocol) - } - - pub fn finality_proof_import_result( - &mut self, - request_block: (B::Hash, NumberFor), - finalization_result: Result<(B::Hash, NumberFor), ()>, - ) { - self.extra_finality_proofs.try_finalize_root(request_block, finalization_result, true); - } - - /// Log that a block has been successfully imported - pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - trace!(target: "sync", "Block imported successfully {} ({})", number, hash); + pub fn on_finality_proof_import(&mut self, req: (B::Hash, NumberFor), res: Result<(B::Hash, NumberFor), ()>) { + self.extra_finality_proofs.try_finalize_root(req, res, true); } /// Notify about finalization of the given block. - pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor, protocol: &mut dyn Context) { + pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; let r = self.extra_finality_proofs.on_block_finalized(hash, number, |base, block| { - protocol.client().is_descendent_of(base, block) + client.is_descendent_of(base, block) }); if let Err(err) = r { - warn!(target: "sync", "Error cleaning up pending extra finality proof data requests: {:?}", err); + warn!(target: "sync", "Error cleaning up pending extra finality proof data requests: {:?}", err) } + let client = &self.client; let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { - protocol.client().is_descendent_of(base, block) + client.is_descendent_of(base, block) }); if let Err(err) = r { @@ -743,9 +736,11 @@ impl ChainSync { } } - /// Called when a block has been queued for import. Updates our internal state for best queued - /// block and then goes through all peers to update our view of their state as well. - fn block_queued(&mut self, hash: &B::Hash, number: NumberFor) { + /// Called when a block has been queued for import. + /// + /// Updates our internal state for best queued block and then goes + /// through all peers to update our view of their state as well. + fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor) { if number > self.best_queued_number { self.best_queued_number = number; self.best_queued_hash = *hash; @@ -774,43 +769,29 @@ impl ChainSync { } } - /// Signal that `best_header` has been queued for import and update the `ChainSync` state with - /// that information. - pub(crate) fn update_chain_info(&mut self, best_header: &B::Header) { - let hash = best_header.hash(); - self.block_queued(&hash, best_header.number().clone()) - } - /// Call when a node announces a new block. /// - /// If true is returned, then the caller MUST try to import passed header (call `on_block_data`). - /// The network request isn't sent in this case. - /// Both hash and header is passed as an optimization to avoid rehashing the header. - #[must_use] - pub(crate) fn on_block_announce( - &mut self, - protocol: &mut dyn Context, - who: PeerId, - hash: B::Hash, - header: &B::Header, - ) -> bool { + /// If true is returned, then the caller MUST try to import passed + /// header (call `on_block_data`). The network request isn't sent + /// in this case. Both hash and header is passed as an optimization + /// to avoid rehashing the header. + pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, header: &B::Header) -> OnBlockAnnounce { let number = *header.number(); debug!(target: "sync", "Received block announcement with number {:?}", number); if number.is_zero() { warn!(target: "sync", "Ignored genesis block (#0) announcement from {}: {}", who, hash); - return false; + return OnBlockAnnounce::Nothing } - let parent_status = block_status(&*protocol.client(), &self.queue_blocks, header.parent_hash().clone()).ok() - .unwrap_or(BlockStatus::Unknown); + let parent_status = self.block_status(header.parent_hash()).ok().unwrap_or(BlockStatus::Unknown); let known_parent = parent_status != BlockStatus::Unknown; let ancient_parent = parent_status == BlockStatus::InChainPruned; - let known = self.is_known(protocol, &hash); + let known = self.is_known(&hash); let peer = if let Some(peer) = self.peers.get_mut(&who) { peer } else { error!(target: "sync", "Called on_block_announce with a bad peer ID"); - return false; + return OnBlockAnnounce::Nothing }; while peer.recently_announced.len() >= ANNOUNCE_HISTORY_SIZE { peer.recently_announced.pop_front(); @@ -822,7 +803,7 @@ impl ChainSync { peer.best_hash = hash; } if let PeerSyncState::AncestorSearch(_, _) = peer.state { - return false; + return OnBlockAnnounce::Nothing } // We assume that the announced block is the latest they have seen, and so our common number // is either one further ahead or it's the one they just announced, if we know about it. @@ -835,260 +816,284 @@ impl ChainSync { // known block case if known || self.is_already_downloading(&hash) { trace!(target: "sync", "Known block announce from {}: {}", who, hash); - return false; + return OnBlockAnnounce::Nothing } // stale block case let requires_additional_data = !self.role.is_light(); if number <= self.best_queued_number { if !(known_parent || self.is_already_downloading(header.parent_hash())) { - if protocol.client().block_status(&BlockId::Number(*header.number())) - .unwrap_or(BlockStatus::Unknown) == BlockStatus::InChainPruned - { + let block_status = self.client.block_status(&BlockId::Number(*header.number())) + .unwrap_or(BlockStatus::Unknown); + if block_status == BlockStatus::InChainPruned { trace!( target: "sync", - "Ignored unknown ancient block announced from {}: {} {:?}", - who, hash, header + "Ignored unknown ancient block announced from {}: {} {:?}", who, hash, header ); - return false; + return OnBlockAnnounce::Nothing } - trace!( target: "sync", - "Considering new unknown stale block announced from {}: {} {:?}", - who, hash, header + "Considering new unknown stale block announced from {}: {} {:?}", who, hash, header ); - let request = self.download_unknown_stale(&who, &hash); - match request { - Some(request) => if requires_additional_data { - protocol.send_block_request(who, request); - return false; + if let Some(request) = self.download_unknown_stale(&who, &hash) { + if requires_additional_data { + return OnBlockAnnounce::Request(who, request) } else { - return true; - }, - None => return false, + return OnBlockAnnounce::ImportHeader + } + } else { + return OnBlockAnnounce::Nothing } } else { if ancient_parent { - trace!( - target: "sync", - "Ignored ancient stale block announced from {}: {} {:?}", - who, hash, header - ); - return false; + trace!(target: "sync", "Ignored ancient stale block announced from {}: {} {:?}", who, hash, header); + return OnBlockAnnounce::Nothing } - - let request = self.download_stale(&who, &hash); - match request { - Some(request) => if requires_additional_data { - protocol.send_block_request(who, request); - return false; + if let Some(request) = self.download_stale(&who, &hash) { + if requires_additional_data { + return OnBlockAnnounce::Request(who, request) } else { - return true; - }, - None => return false, + return OnBlockAnnounce::ImportHeader + } + } else { + return OnBlockAnnounce::Nothing } } } if ancient_parent { trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header); - return false; + return OnBlockAnnounce::Nothing } trace!(target: "sync", "Considering new block announced from {}: {} {:?}", who, hash, header); + let (range, request) = match self.select_new_blocks(who.clone()) { Some((range, request)) => (range, request), - None => return false, + None => return OnBlockAnnounce::Nothing }; - let is_required_data_available = - !requires_additional_data && - range.end - range.start == One::one() && - range.start == *header.number(); + + let is_required_data_available = !requires_additional_data + && range.end - range.start == One::one() + && range.start == *header.number(); + if !is_required_data_available { - protocol.send_block_request(who, request); - return false; + return OnBlockAnnounce::Request(who, request) } - true - } - - /// Convenience function to iterate through all peers and see if there are any that we are - /// downloading this hash from. - fn is_already_downloading(&self, hash: &B::Hash) -> bool { - self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) - } - - /// Returns true if the block with given hash exists in the import queue with known status or is - /// already imported. - fn is_known(&self, protocol: &mut dyn Context, hash: &B::Hash) -> bool { - block_status(&*protocol.client(), &self.queue_blocks, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown) + OnBlockAnnounce::ImportHeader } /// Call when a peer has disconnected. - pub(crate) fn peer_disconnected(&mut self, protocol: &mut dyn Context, who: PeerId) { + pub fn peer_disconnected(&mut self, who: PeerId) { self.blocks.clear_peer_download(&who); self.peers.remove(&who); self.extra_justifications.peer_disconnected(&who); self.extra_finality_proofs.peer_disconnected(&who); - self.maintain_sync(protocol); } /// Restart the sync process. - pub(crate) fn restart( - &mut self, - protocol: &mut dyn Context, - mut peer_info: impl FnMut(&PeerId) -> Option> - ) { + pub fn restart<'a, F> + (&'a mut self, mut peer_info: F) -> impl Iterator), BadPeer>> + 'a + where F: FnMut(&PeerId) -> Option> + 'a + { self.queue_blocks.clear(); self.best_importing_number = Zero::zero(); self.blocks.clear(); - let info = protocol.client().info(); + let info = self.client.info(); self.best_queued_hash = info.chain.best_hash; self.best_queued_number = info.chain.best_number; debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash); - let ids: Vec = self.peers.drain().map(|(id, _)| id).collect(); - for id in ids { - if let Some(info) = peer_info(&id) { - self.new_peer(protocol, id, info); + let old_peers = std::mem::replace(&mut self.peers, HashMap::new()); + old_peers.into_iter().filter_map(move |(id, _)| { + let info = peer_info(&id)?; + match self.new_peer(id.clone(), info) { + Ok(None) => None, + Ok(Some(x)) => Some(Ok((id, x))), + Err(e) => Some(Err(e)) } - } + }) } - // Download old block with known parent. - fn download_stale( - &mut self, - who: &PeerId, - hash: &B::Hash, - ) -> Option> { + /// Download old block with known parent. + fn download_stale(&mut self, who: &PeerId, hash: &B::Hash) -> Option> { let peer = self.peers.get_mut(who)?; - match peer.state { - PeerSyncState::Available => { - peer.state = PeerSyncState::DownloadingStale(*hash); - Some(message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Hash(*hash), - to: None, - direction: message::Direction::Ascending, - max: Some(1), - }) - }, - _ => None, + if !peer.state.is_available() { + return None } - } - - // Download old block with unknown parent. - fn download_unknown_stale( - &mut self, - who: &PeerId, - hash: &B::Hash, - ) -> Option> { - let peer = self.peers.get_mut(who)?; - match peer.state { - PeerSyncState::Available => { - peer.state = PeerSyncState::DownloadingStale(*hash); - Some(message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Hash(*hash), - to: None, - direction: message::Direction::Descending, - max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN), - }) - }, - _ => None, - } - } - - // Issue a request for a peer to download new blocks, if any are available. - fn download_new(&mut self, protocol: &mut dyn Context, who: PeerId) { - if let Some((_, request)) = self.select_new_blocks(who.clone()) { - protocol.send_block_request(who, request); - } - } - - // Select a range of NEW blocks to download from peer. - fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range>, message::BlockRequest)> { - // when there are too many blocks in the queue => do not try to download new blocks - if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { - trace!(target: "sync", "Too many blocks in the queue."); - return None; - } - - let peer = self.peers.get_mut(&who)?; - match peer.state { - PeerSyncState::Available => { - trace!( - target: "sync", - "Considering new block download from {}, common block is {}, best is {:?}", - who, - peer.common_number, - peer.best_number, - ); - let range = self.blocks.needed_blocks( - who.clone(), - MAX_BLOCKS_TO_REQUEST, - peer.best_number, - peer.common_number - ); - match range { - Some(range) => { - trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end); - let from = message::FromBlock::Number(range.start); - let max = Some((range.end - range.start).saturated_into::()); - peer.state = PeerSyncState::DownloadingNew(range.start); - Some(( - range, - message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from, - to: None, - direction: message::Direction::Ascending, - max, - }, - )) - }, - None => { - trace!(target: "sync", "Nothing to request"); - None - }, - } - }, - _ => { - trace!(target: "sync", "Peer {} is busy", who); - None - }, - } - } - - /// Request the ancestry for a block. Sends a request for header and justification for the given - /// block number. Used during ancestry search. - fn request_ancestry(protocol: &mut dyn Context, who: PeerId, block: NumberFor) { - trace!(target: "sync", "Requesting ancestry block #{} from {}", block, who); - let request = message::generic::BlockRequest { + peer.state = PeerSyncState::DownloadingStale(*hash); + Some(message::generic::BlockRequest { id: 0, - fields: message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION, - from: message::FromBlock::Number(block), + fields: self.required_block_attributes.clone(), + from: message::FromBlock::Hash(*hash), to: None, direction: message::Direction::Ascending, max: Some(1), + }) + } + + /// Download old block with unknown parent. + fn download_unknown_stale(&mut self, who: &PeerId, hash: &B::Hash) -> Option> { + let peer = self.peers.get_mut(who)?; + if !peer.state.is_available() { + return None + } + peer.state = PeerSyncState::DownloadingStale(*hash); + Some(message::generic::BlockRequest { + id: 0, + fields: self.required_block_attributes.clone(), + from: message::FromBlock::Hash(*hash), + to: None, + direction: message::Direction::Descending, + max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN), + }) + } + + /// Select a range of new blocks to download from the given peer. + fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range>, BlockRequest)> { + // when there are too many blocks in the queue => do not try to download new blocks + if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { + trace!(target: "sync", "Too many blocks in the queue."); + return None + } + + let peer = self.peers.get_mut(&who)?; + + if !peer.state.is_available() { + trace!(target: "sync", "Peer {} is busy", who); + return None + } + + trace!( + target: "sync", + "Considering new block download from {}, common block is {}, best is {:?}", + who, + peer.common_number, + peer.best_number + ); + + if let Some((range, req)) = peer_block_request(&who, peer, &mut self.blocks, &self.required_block_attributes) { + trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end); + peer.state = PeerSyncState::DownloadingNew(range.start); + Some((range, req)) + } else { + trace!(target: "sync", "Nothing to request from {}", who); + None + } + } + + /// What is the status of the block corresponding to the given hash? + fn block_status(&self, hash: &B::Hash) -> Result { + if self.queue_blocks.contains(hash) { + return Ok(BlockStatus::Queued) + } + self.client.block_status(&BlockId::Hash(*hash)) + } + + /// Is the block corresponding to the given hash known? + fn is_known(&self, hash: &B::Hash) -> bool { + self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown) + } + + /// Is any peer downloading the given hash? + fn is_already_downloading(&self, hash: &B::Hash) -> bool { + self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) + } +} + +/// Request the ancestry for a block. Sends a request for header and justification for the given +/// block number. Used during ancestry search. +fn ancestry_request(block: NumberFor) -> BlockRequest { + message::generic::BlockRequest { + id: 0, + fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION, + from: message::FromBlock::Number(block), + to: None, + direction: message::Direction::Ascending, + max: Some(1) + } +} + +/// The ancestor search state expresses which algorithm, and its stateful parameters, we are using to +/// try to find an ancestor block +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum AncestorSearchState { + /// Use exponential backoff to find an ancestor, then switch to binary search. + /// We keep track of the exponent. + ExponentialBackoff(NumberFor), + /// Using binary search to find the best ancestor. + /// We keep track of left and right bounds. + BinarySearch(NumberFor, NumberFor), +} + +/// This function handles the ancestor search strategy used. The goal is to find a common point +/// that both our chains agree on that is as close to the tip as possible. +/// The way this works is we first have an exponential backoff strategy, where we try to step +/// forward until we find a block hash mismatch. The size of the step doubles each step we take. +/// +/// When we've found a block hash mismatch we then fall back to a binary search between the two +/// last known points to find the common block closest to the tip. +fn handle_ancestor_search_state( + state: &AncestorSearchState, + curr_block_num: NumberFor, + block_hash_match: bool +) -> Option<(AncestorSearchState, NumberFor)> { + let two = >::one() + >::one(); + match state { + AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => { + let next_distance_to_tip = *next_distance_to_tip; + if block_hash_match && next_distance_to_tip == One::one() { + // We found the ancestor in the first step so there is no need to execute binary search. + return None; + } + if block_hash_match { + let left = curr_block_num; + let right = left + next_distance_to_tip / two; + let middle = left + (right - left) / two; + Some((AncestorSearchState::BinarySearch(left, right), middle)) + } else { + let next_block_num = curr_block_num.checked_sub(&next_distance_to_tip) + .unwrap_or_else(Zero::zero); + let next_distance_to_tip = next_distance_to_tip * two; + Some((AncestorSearchState::ExponentialBackoff(next_distance_to_tip), next_block_num)) + } + } + AncestorSearchState::BinarySearch(mut left, mut right) => { + if left >= curr_block_num { + return None; + } + if block_hash_match { + left = curr_block_num; + } else { + right = curr_block_num; + } + assert!(right >= left); + let middle = left + (right - left) / two; + Some((AncestorSearchState::BinarySearch(left, right), middle)) + } + } +} + +/// Get a new block request for the peer if any. +fn peer_block_request( + id: &PeerId, + peer: &PeerSync, + blocks: &mut BlockCollection, + attrs: &message::BlockAttributes, +) -> Option<(Range>, BlockRequest)> { + if let Some(range) = blocks.needed_blocks(id.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) { + let request = message::generic::BlockRequest { + id: 0, + fields: attrs.clone(), + from: message::FromBlock::Number(range.start), + to: None, + direction: message::Direction::Ascending, + max: Some((range.end - range.start).saturated_into::()) }; - protocol.send_block_request(who, request); + Some((range, request)) + } else { + None } } -/// Returns the BlockStatus for given block hash, looking first in the import queue and then in the -/// provided chain. -fn block_status( - chain: &dyn crate::chain::Client, - queue_blocks: &HashSet, - hash: B::Hash) -> Result -{ - if queue_blocks.contains(&hash) { - return Ok(BlockStatus::Queued); - } - - chain.block_status(&BlockId::Hash(hash)) -}