From 9ee79d5c5edcb17c40f4dd0dbc3b9e930e30f0ce Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Fri, 12 Jul 2019 20:37:38 +0200 Subject: [PATCH] Remove `sync::Context` trait. (#3105) Instead of passing a context around to each method, thereby introducing side-effecting I/O actions everywhere, with this PR `sync::ChainSync` only contains state which is updated by invoking various callback methods (`on_*`) and actionable items are returned as regular results from method calls, often iterators yielding requests that should be issued to peers. It is up to the caller to handle these in an appropriate way, currently `protocol` will send those as messages. --- substrate/Cargo.lock | 1 + substrate/core/network/Cargo.toml | 1 + substrate/core/network/src/protocol.rs | 217 ++- substrate/core/network/src/protocol/sync.rs | 1597 ++++++++++--------- 4 files changed, 902 insertions(+), 914 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 021eca7758..9a65cf8f4b 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4542,6 +4542,7 @@ dependencies = [ "bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", + "either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/network/Cargo.toml b/substrate/core/network/Cargo.toml index 446e0e0118..8b34317470 100644 --- a/substrate/core/network/Cargo.toml +++ b/substrate/core/network/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" [dependencies] bytes = "0.4" derive_more = "0.14.0" +either = "1.5.2" log = "0.4" parking_lot = "0.8.0" bitflags = "1.0" diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index c0f40d4332..9e876e7285 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -28,17 +28,13 @@ use runtime_primitives::traits::{ Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub, SaturatedConversion }; -use message::{ - BlockRequest as BlockRequestMessage, - FinalityProofRequest as FinalityProofRequestMessage, Message, -}; -use message::{BlockAttributes, Direction, FromBlock, RequestId}; +use message::{BlockAttributes, Direction, FromBlock, Message, RequestId}; use message::generic::{Message as GenericMessage, ConsensusMessage}; use event::Event; use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use on_demand::{OnDemandCore, OnDemandNetwork, RequestData}; use specialization::NetworkSpecialization; -use sync::{ChainSync, Context as SyncContext, SyncState}; +use sync::{ChainSync, SyncState}; use crate::service::{TransactionPool, ExHashT}; use crate::config::{BoxFinalityProofRequestBuilder, Roles}; use rustc_hex::ToHex; @@ -326,38 +322,6 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, } } -impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext for ProtocolContext<'a, B, H> { - fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.peerset_handle.report_peer(who, reputation) - } - - fn disconnect_peer(&mut self, who: PeerId) { - self.behaviour.disconnect_peer(&who) - } - - fn client(&self) -> &dyn Client { - &*self.context_data.chain - } - - fn send_finality_proof_request(&mut self, who: PeerId, request: FinalityProofRequestMessage) { - send_message( - self.behaviour, - &mut self.context_data.peers, - who, - GenericMessage::FinalityProofRequest(request) - ) - } - - fn send_block_request(&mut self, who: PeerId, request: BlockRequestMessage) { - send_message( - self.behaviour, - &mut self.context_data.peers, - who, - GenericMessage::BlockRequest(request) - ) - } -} - /// Data necessary to create a context. struct ContextData { // All connected peers @@ -394,7 +358,7 @@ impl, H: ExHashT> Protocol { peerset_config: peerset::PeersetConfig, ) -> error::Result<(Protocol, peerset::PeersetHandle)> { let info = chain.info(); - let sync = ChainSync::new(config.roles, &info, finality_proof_request_builder); + let sync = ChainSync::new(config.roles, chain.clone(), &info, finality_proof_request_builder); let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); let behaviour = CustomProto::new(protocol_id, versions, peerset); @@ -661,7 +625,7 @@ impl, H: ExHashT> Protocol { if peer_data.info.protocol_version > 2 { self.consensus_gossip.peer_disconnected(&mut context, peer.clone()); } - self.sync.peer_disconnected(&mut context, peer.clone()); + self.sync.peer_disconnected(peer.clone()); self.specialization.on_disconnect(&mut context, peer.clone()); self.on_demand_core.on_disconnect(OnDemandIn { behaviour: &mut self.behaviour, @@ -792,29 +756,29 @@ impl, H: ExHashT> Protocol { // TODO [andre]: move this logic to the import queue so that // justifications are imported asynchronously (#1482) if request.fields == message::BlockAttributes::JUSTIFICATION { - let outcome = self.sync.on_block_justification_data( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - peer, - response - ); - - if let Some((origin, hash, nb, just)) = outcome { - CustomMessageOutcome::JustificationImport(origin, hash, nb, just) - } else { - CustomMessageOutcome::None + match self.sync.on_block_justification(peer, response) { + Ok(sync::OnBlockJustification::Nothing) => CustomMessageOutcome::None, + Ok(sync::OnBlockJustification::Import { peer, hash, number, justification }) => + CustomMessageOutcome::JustificationImport(peer, hash, number, justification), + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu); + CustomMessageOutcome::None + } } - } else { - let outcome = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - peer, - request, - response - ); - if let Some((origin, blocks)) = outcome { - CustomMessageOutcome::BlockImport(origin, blocks) - } else { - CustomMessageOutcome::None + match self.sync.on_block_data(peer, request, response) { + Ok(sync::OnBlockData::Import(origin, blocks)) => + CustomMessageOutcome::BlockImport(origin, blocks), + Ok(sync::OnBlockData::Request(peer, req)) => { + self.send_message(peer, GenericMessage::BlockRequest(req)); + CustomMessageOutcome::None + } + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu); + CustomMessageOutcome::None + } } } } @@ -827,7 +791,6 @@ impl, H: ExHashT> Protocol { &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle) ); self.maintain_peers(); - self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)); self.on_demand_core.maintain_peers(OnDemandIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), @@ -953,8 +916,15 @@ impl, H: ExHashT> Protocol { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who.clone(), status.roles, status.best_number); + match self.sync.new_peer(who.clone(), info) { + Ok(None) => (), + Ok(Some(req)) => self.send_message(who.clone(), GenericMessage::BlockRequest(req)), + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu) + } + } let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - self.sync.new_peer(&mut context, who.clone(), info); if protocol_version > 2 { self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles); } @@ -1085,28 +1055,28 @@ impl, H: ExHashT> Protocol { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), }, who.clone(), *header.number()); - let try_import = self.sync.on_block_announce( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - who.clone(), - hash, - &header, - ); - // try_import is only true when we have all data required to import block - // in the BlockAnnounce message. This is only when: - // 1) we're on light client; - // AND - // - EITHER 2.1) announced block is stale; - // - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e. - // there are no ascendants of this block scheduled for retrieval) - if !try_import { - return CustomMessageOutcome::None; + match self.sync.on_block_announce(who.clone(), hash, &header) { + sync::OnBlockAnnounce::Request(peer, req) => { + self.send_message(peer, GenericMessage::BlockRequest(req)); + return CustomMessageOutcome::None + } + sync::OnBlockAnnounce::Nothing => { + // try_import is only true when we have all data required to import block + // in the BlockAnnounce message. This is only when: + // 1) we're on light client; + // AND + // - EITHER 2.1) announced block is stale; + // - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e. + // there are no ascendants of this block scheduled for retrieval) + return CustomMessageOutcome::None + } + sync::OnBlockAnnounce::ImportHeader => () // We proceed with the import. } // to import header from announced block let's construct response to request that normally would have // been sent over network (but it is not in our case) let blocks_to_import = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who.clone(), message::generic::BlockRequest { id: 0, @@ -1131,8 +1101,16 @@ impl, H: ExHashT> Protocol { }, ); match blocks_to_import { - Some((origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), - None => CustomMessageOutcome::None, + Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), + Ok(sync::OnBlockData::Request(peer, req)) => { + self.send_message(peer, GenericMessage::BlockRequest(req)); + CustomMessageOutcome::None + } + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu); + CustomMessageOutcome::None + } } } @@ -1166,11 +1144,7 @@ impl, H: ExHashT> Protocol { /// Call this when a block has been finalized. The sync layer may have some additional /// requesting to perform. pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { - self.sync.on_block_finalized( - &hash, - *header.number(), - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - ); + self.sync.on_block_finalized(&hash, *header.number()) } fn on_remote_call_request( @@ -1217,9 +1191,7 @@ impl, H: ExHashT> Protocol { /// Uses `protocol` to queue a new justification request and tries to dispatch all pending /// requests. pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - let mut context = - ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - self.sync.request_justification(&hash, number, &mut context); + self.sync.request_justification(&hash, number) } /// Clears all pending justification requests. @@ -1230,43 +1202,43 @@ impl, H: ExHashT> Protocol { /// A batch of blocks have been processed, with or without errors. /// Call this when a batch of blocks have been processed by the import queue, with or without /// errors. - pub fn blocks_processed( - &mut self, - processed_blocks: Vec, - has_error: bool - ) { - let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - self.sync.blocks_processed(&mut context, processed_blocks, has_error); + pub fn blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { + self.sync.on_blocks_processed(processed_blocks, has_error); } /// Restart the sync process. pub fn restart(&mut self) { let peers = self.context_data.peers.clone(); - let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - self.sync.restart(&mut context, |peer_id| peers.get(peer_id).map(|i| i.info.clone())); + for result in self.sync.restart(|peer_id| peers.get(peer_id).map(|i| i.info.clone())) { + match result { + Ok((id, req)) => { + let msg = GenericMessage::BlockRequest(req); + send_message(&mut self.behaviour, &mut self.context_data.peers, id, msg) + } + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu) + } + } + } } /// Notify about successful import of the given block. pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - self.sync.block_imported(hash, number) + trace!(target: "sync", "Block imported successfully {} ({})", number, hash) } /// Call this when a justification has been processed by the import queue, with or without /// errors. pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { - self.sync.justification_import_result(hash, number, success) + self.sync.on_justification_import(hash, number, success) } /// Request a finality proof for the given block. /// /// Queues a new finality proof request and tries to dispatch all pending requests. - pub fn request_finality_proof( - &mut self, - hash: &B::Hash, - number: NumberFor - ) { - let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); - self.sync.request_finality_proof(&hash, number, &mut context); + pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { + self.sync.request_finality_proof(&hash, number) } pub fn finality_proof_import_result( @@ -1274,7 +1246,7 @@ impl, H: ExHashT> Protocol { request_block: (B::Hash, NumberFor), finalization_result: Result<(B::Hash, NumberFor), ()>, ) { - self.sync.finality_proof_import_result(request_block, finalization_result) + self.sync.on_finality_proof_import(request_block, finalization_result) } fn on_remote_call_response( @@ -1475,16 +1447,15 @@ impl, H: ExHashT> Protocol { response: message::FinalityProofResponse, ) -> CustomMessageOutcome { trace!(target: "sync", "Finality proof response from {} for {}", who, response.block); - let outcome = self.sync.on_block_finality_proof_data( - &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), - who, - response, - ); - - if let Some((origin, hash, nb, proof)) = outcome { - CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) - } else { - CustomMessageOutcome::None + match self.sync.on_block_finality_proof(who, response) { + Ok(sync::OnBlockFinalityProof::Nothing) => CustomMessageOutcome::None, + Ok(sync::OnBlockFinalityProof::Import { peer, hash, number, proof }) => + CustomMessageOutcome::FinalityProofImport(peer, hash, number, proof), + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id); + self.peerset_handle.report_peer(id, repu); + CustomMessageOutcome::None + } } } @@ -1575,6 +1546,16 @@ Protocol { self.propagate_extrinsics(); } + for (id, r) in self.sync.block_requests() { + send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r)) + } + for (id, r) in self.sync.justification_requests() { + send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r)) + } + for (id, r) in self.sync.finality_proof_requests() { + send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::FinalityProofRequest(r)) + } + let event = match self.behaviour.poll(params) { Async::NotReady => return Async::NotReady, Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev, diff --git a/substrate/core/network/src/protocol/sync.rs b/substrate/core/network/src/protocol/sync.rs index 8f5218b15c..75658a2f08 100644 --- a/substrate/core/network/src/protocol/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -1,16 +1,16 @@ // Copyright 2017-2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. - +// // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. - +// // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. - +// // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . @@ -24,133 +24,67 @@ //! //! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on //! the network, or whenever a block has been successfully verified, call the appropriate method in -//! order to update it. You must also regularly call `tick()`. -//! -//! To each of these methods, you must pass a `Context` object that the `ChainSync` will use to -//! send its new outgoing requests. +//! order to update it. //! -use std::cmp::max; -use std::ops::Range; -use std::collections::{HashMap, VecDeque}; -use log::{debug, trace, warn, info, error}; -use crate::protocol::PeerInfo as ProtocolPeerInfo; -use libp2p::PeerId; -use client::{BlockStatus, ClientInfo}; -use consensus::{BlockOrigin, import_queue::IncomingBlock}; -use client::error::Error as ClientError; use blocks::BlockCollection; -use extra_requests::ExtraRequests; -use runtime_primitives::traits::{ - Block as BlockT, Header as HeaderT, NumberFor, Zero, One, - CheckedSub, SaturatedConversion +use client::{BlockStatus, ClientInfo, error::Error as ClientError}; +use consensus::{BlockOrigin, import_queue::IncomingBlock}; +use crate::{ + config::{Roles, BoxFinalityProofRequestBuilder}, + message::{self, generic::FinalityProofRequest, BlockAttributes, BlockRequest, BlockResponse, FinalityProofResponse}, + protocol }; -use runtime_primitives::{Justification, generic::BlockId}; -use crate::message; -use crate::config::{Roles, BoxFinalityProofRequestBuilder}; -use std::collections::HashSet; +use either::Either; +use extra_requests::ExtraRequests; +use libp2p::PeerId; +use log::{debug, trace, warn, info, error}; +use runtime_primitives::{ + Justification, + generic::BlockId, + traits::{Block, Header, NumberFor, Zero, One, CheckedSub, SaturatedConversion} +}; +use std::{fmt, ops::Range, collections::{HashMap, HashSet, VecDeque}, sync::Arc}; mod blocks; mod extra_requests; /// Maximum blocks to request in a single packet. const MAX_BLOCKS_TO_REQUEST: usize = 128; + /// Maximum blocks to store in the import queue. const MAX_IMPORTING_BLOCKS: usize = 2048; -/// We use a heuristic that with a high likelihood, by the time `MAJOR_SYNC_BLOCKS` have been -/// imported we'll be on the same chain as (or at least closer to) the peer so we want to delay the -/// ancestor search to not waste time doing that when we're so far behind. -const MAJOR_SYNC_BLOCKS: usize = 5; + +/// We use a heuristic that with a high likelihood, by the time +/// `MAJOR_SYNC_BLOCKS` have been imported we'll be on the same +/// chain as (or at least closer to) the peer so we want to delay +/// the ancestor search to not waste time doing that when we are +/// so far behind. +const MAJOR_SYNC_BLOCKS: u8 = 5; + /// Number of recently announced blocks to track for each peer. const ANNOUNCE_HISTORY_SIZE: usize = 64; + /// Max number of blocks to download for unknown forks. const MAX_UNKNOWN_FORK_DOWNLOAD_LEN: u32 = 32; -/// Reputation change when a peer sent us a status message that led to a database read error. + +/// Reputation change when a peer sent us a status message that led to a +/// database read error. const BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE: i32 = -(1 << 16); -/// Reputation change when a peer failed to answer our legitimate ancestry block search. + +/// Reputation change when a peer failed to answer our legitimate ancestry +/// block search. const ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE: i32 = -(1 << 9); -/// Reputation change when a peer sent us a status message with a different genesis than us. + +/// Reputation change when a peer sent us a status message with a different +/// genesis than us. const GENESIS_MISMATCH_REPUTATION_CHANGE: i32 = i32::min_value() + 1; -/// Context for a network-specific handler. -pub trait Context { - /// Get a reference to the client. - fn client(&self) -> &dyn crate::chain::Client; - - /// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or - /// irresponsible or appeared lazy. - fn report_peer(&mut self, who: PeerId, reputation: i32); - - /// Force disconnecting from a peer. Use this when a peer misbehaved. - fn disconnect_peer(&mut self, who: PeerId); - - /// Request a finality proof from a peer. - fn send_finality_proof_request(&mut self, who: PeerId, request: message::FinalityProofRequest); - - /// Request a block from a peer. - fn send_block_request(&mut self, who: PeerId, request: message::BlockRequest); -} - -#[derive(Debug, Clone)] -/// All the data we have about a Peer that we are trying to sync with -pub(crate) struct PeerSync { - /// The common number is the block number that is a common point of ancestry for both our chains - /// (as far as we know) - pub common_number: NumberFor, - /// The hash of the best block that we've seen for this peer - pub best_hash: B::Hash, - /// The number of the best block that we've seen for this peer - pub best_number: NumberFor, - /// The state of syncing this peer is in for us, generally categories into `Available` or "busy" - /// with something as defined by `PeerSyncState`. - pub state: PeerSyncState, - /// A queue of blocks that this peer has announced to us, should only contain - /// `ANNOUNCE_HISTORY_SIZE` entries. - pub recently_announced: VecDeque, -} - -/// The sync status of a peer we are trying to sync with -#[derive(Debug)] -pub(crate) struct PeerInfo { - /// Their best block hash. - pub best_hash: B::Hash, - /// Their best block number. - pub best_number: NumberFor, -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -/// The ancestor search state expresses which algorithm, and its stateful parameters, we are using to -/// try to find an ancestor block -pub(crate) enum AncestorSearchState { - /// Use exponential backoff to find an ancestor, then switch to binary search. - /// We keep track of the exponent. - ExponentialBackoff(NumberFor), - /// Using binary search to find the best ancestor. - /// We keep track of left and right bounds. - BinarySearch(NumberFor, NumberFor), -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -/// The state of syncing between a Peer and ourselves. Generally two categories, "busy" or -/// `Available`. If busy, the Enum defines what we are busy with. -pub(crate) enum PeerSyncState { - /// Searching for ancestors the Peer has in common with us. - AncestorSearch(NumberFor, AncestorSearchState), - /// Available for sync requests. - Available, - /// Actively downloading new blocks, starting from the given Number. - DownloadingNew(NumberFor), - /// Downloading a stale block with given Hash. Stale means that it's a block with a number that - /// is lower than our best number. It might be from a fork and not necessarily already imported. - DownloadingStale(B::Hash), - /// Downloading justification for given block hash. - DownloadingJustification(B::Hash), - /// Downloading finality proof for given block hash. - DownloadingFinalityProof(B::Hash), -} - -/// The main data structure to contain all the state for a chains active syncing strategy. -pub struct ChainSync { +/// The main data structure which contains all the state for a chains +/// active syncing strategy. +pub struct ChainSync { + /// Chain client. + client: Arc>, /// The active peers that we are using to sync and their PeerSync status peers: HashMap>, /// A `BlockCollection` of blocks that are being downloaded from peers @@ -161,17 +95,78 @@ pub struct ChainSync { best_queued_hash: B::Hash, /// The role of this node, e.g. light or full role: Roles, - /// What block attributes we require for this node, usually derived from what role we are, but - /// could be customized + /// What block attributes we require for this node, usually derived from + /// what role we are, but could be customized required_block_attributes: message::BlockAttributes, + /// Any extra finality proof requests. extra_finality_proofs: ExtraRequests, + /// Any extra justification requests. extra_justifications: ExtraRequests, - /// A set of hashes of blocks that are being downloaded or have been downloaded and are queued - /// for import. + /// A set of hashes of blocks that are being downloaded or have been + /// downloaded and are queued for import. queue_blocks: HashSet, - /// The best block number that we are currently importing + /// The best block number that we are currently importing. best_importing_number: NumberFor, - request_builder: Option>, + request_builder: Option> +} + +/// All the data we have about a Peer that we are trying to sync with +#[derive(Debug, Clone)] +pub struct PeerSync { + /// The common number is the block number that is a common point of + /// ancestry for both our chains (as far as we know). + pub common_number: NumberFor, + /// The hash of the best block that we've seen for this peer. + pub best_hash: B::Hash, + /// The number of the best block that we've seen for this peer. + pub best_number: NumberFor, + /// The state of syncing this peer is in for us, generally categories + /// into `Available` or "busy" with something as defined by `PeerSyncState`. + pub state: PeerSyncState, + /// A queue of blocks that this peer has announced to us, should only + /// contain `ANNOUNCE_HISTORY_SIZE` entries. + pub recently_announced: VecDeque +} + +/// The sync status of a peer we are trying to sync with +#[derive(Debug)] +pub struct PeerInfo { + /// Their best block hash. + pub best_hash: B::Hash, + /// Their best block number. + pub best_number: NumberFor +} + +/// The state of syncing between a Peer and ourselves. +/// +/// Generally two categories, "busy" or `Available`. If busy, the enum +/// defines what we are busy with. +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum PeerSyncState { + /// Available for sync requests. + Available, + /// Searching for ancestors the Peer has in common with us. + AncestorSearch(NumberFor, AncestorSearchState), + /// Actively downloading new blocks, starting from the given Number. + DownloadingNew(NumberFor), + /// Downloading a stale block with given Hash. Stale means that it is a + /// block with a number that is lower than our best number. It might be + /// from a fork and not necessarily already imported. + DownloadingStale(B::Hash), + /// Downloading justification for given block hash. + DownloadingJustification(B::Hash), + /// Downloading finality proof for given block hash. + DownloadingFinalityProof(B::Hash) +} + +impl PeerSyncState { + pub fn is_available(&self) -> bool { + if let PeerSyncState::Available = self { + true + } else { + false + } + } } /// Reported sync state. @@ -183,32 +178,93 @@ pub enum SyncState { Downloading } -/// Syncing status and statistics +/// Syncing status and statistics. #[derive(Clone)] -pub struct Status { +pub struct Status { /// Current global sync state. pub state: SyncState, /// Target sync block number. pub best_seen_block: Option>, /// Number of peers participating in syncing. - pub num_peers: u32, + pub num_peers: u32 } -impl ChainSync { - /// Create a new instance. Pass the initial known state of the chain. - pub(crate) fn new( +/// A peer did not behave as expected and should be reported. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BadPeer(pub PeerId, pub i32); + +impl fmt::Display for BadPeer { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "bad peer {}; reputation change: {}", self.0, self.1) + } +} + +impl std::error::Error for BadPeer {} + +/// Result of [`ChainSync::on_block_data`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OnBlockData { + /// The block should be imported. + Import(BlockOrigin, Vec>), + /// A new block request needs to be made to the given peer. + Request(PeerId, BlockRequest) +} + +/// Result of [`ChainSync::on_block_announce`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OnBlockAnnounce { + /// The announcement does not require further handling. + Nothing, + /// The announcement header should be imported. + ImportHeader, + /// Another block request to the given peer is necessary. + Request(PeerId, BlockRequest) +} + +/// Result of [`ChainSync::on_block_justification`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OnBlockJustification { + /// The justification needs no further handling. + Nothing, + /// The justification should be imported. + Import { + peer: PeerId, + hash: B::Hash, + number: NumberFor, + justification: Justification + } +} + +/// Result of [`ChainSync::on_block_finality_proof`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OnBlockFinalityProof { + /// The proof needs no further handling. + Nothing, + /// The proof should be imported. + Import { + peer: PeerId, + hash: B::Hash, + number: NumberFor, + proof: Vec + } +} + +impl ChainSync { + /// Create a new instance. + pub fn new( role: Roles, + client: Arc>, info: &ClientInfo, request_builder: Option> ) -> Self { - let mut required_block_attributes = - message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION; + let mut required_block_attributes = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION; if role.is_full() { - required_block_attributes |= message::BlockAttributes::BODY; + required_block_attributes |= BlockAttributes::BODY } ChainSync { + client, peers: HashMap::new(), blocks: BlockCollection::new(), best_queued_hash: info.chain.best_hash, @@ -219,98 +275,85 @@ impl ChainSync { required_block_attributes, queue_blocks: Default::default(), best_importing_number: Zero::zero(), - request_builder, + request_builder } } - /// Returns the number for the best seen blocks among connected peers, if any - fn best_seen_block(&self) -> Option> { - self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number) + /// Returns the state of the sync of the given peer. + /// + /// Returns `None` if the peer is unknown. + pub fn peer_info(&self, who: &PeerId) -> Option> { + self.peers.get(who).map(|p| PeerInfo { best_hash: p.best_hash, best_number: p.best_number }) } - /// Returns the SyncState that we are currently in based on a provided `best_seen` block number. - /// A chain is classified as downloading if the provided best block is more than `MAJOR_SYNC_BLOCKS` - /// behind the best queued block. - fn state(&self, best_seen: &Option>) -> SyncState { - match best_seen { - &Some(n) if n > self.best_queued_number && n - self.best_queued_number > 5.into() => SyncState::Downloading, - _ => SyncState::Idle, - } - } + /// Returns the current sync status. + pub fn status(&self) -> Status { + let best_seen = self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number); + let sync_state = + if let Some(n) = best_seen { + // A chain is classified as downloading if the provided best block is + // more than `MAJOR_SYNC_BLOCKS` behind the best queued block. + if n > self.best_queued_number && n - self.best_queued_number > MAJOR_SYNC_BLOCKS.into() { + SyncState::Downloading + } else { + SyncState::Idle + } + } else { + SyncState::Idle + }; - /// Returns the state of the sync of the given peer. Returns `None` if the peer is unknown. - pub(crate) fn peer_info(&self, who: &PeerId) -> Option> { - self.peers.get(who).map(|peer| { - PeerInfo { - best_hash: peer.best_hash, - best_number: peer.best_number, - } - }) - } - - /// Returns sync status. - pub(crate) fn status(&self) -> Status { - let best_seen = self.best_seen_block(); - let state = self.state(&best_seen); Status { - state, + state: sync_state, best_seen_block: best_seen, - num_peers: self.peers.len() as u32, + num_peers: self.peers.len() as u32 } } - /// Handle new connected peer. Call this method whenever we connect to a new peer. - pub(crate) fn new_peer( - &mut self, - protocol: &mut dyn Context, - who: PeerId, - info: ProtocolPeerInfo - ) { - // there's nothing sync can get from the node that has no blockchain data - // (the opposite is not true, but all requests are served at protocol level) + /// Handle a new connected peer. + /// + /// Call this method whenever we connect to a new peer. + pub fn new_peer(&mut self, who: PeerId, info: protocol::PeerInfo) -> Result>, BadPeer> { + // There is nothing sync can get from the node that has no blockchain data. if !info.roles.is_full() { - return; + return Ok(None) } - - let status = block_status(&*protocol.client(), &self.queue_blocks, info.best_hash); - match (status, info.best_number) { - (Err(e), _) => { + match self.block_status(&info.best_hash) { + Err(e) => { debug!(target:"sync", "Error reading blockchain: {:?}", e); - protocol.report_peer(who.clone(), BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE); - protocol.disconnect_peer(who); - }, - (Ok(BlockStatus::KnownBad), _) => { + Err(BadPeer(who, BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE)) + } + Ok(BlockStatus::KnownBad) => { info!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number); - protocol.report_peer(who.clone(), i32::min_value()); - protocol.disconnect_peer(who); - }, - (Ok(BlockStatus::Unknown), b) if b.is_zero() => { - info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number); - protocol.report_peer(who.clone(), i32::min_value()); - protocol.disconnect_peer(who); - }, - (Ok(BlockStatus::Unknown), _) if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS => { + Err(BadPeer(who, i32::min_value())) + } + Ok(BlockStatus::Unknown) => { + if info.best_number.is_zero() { + info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number); + return Err(BadPeer(who, i32::min_value())) + } + // If there are more than `MAJOR_SYNC_BLOCKS` in the import queue then we have // enough to do in the import queue that it's not worth kicking off // an ancestor search, which is what we do in the next match case below. - debug!( - target:"sync", - "New peer with unknown best hash {} ({}), assuming common block.", - self.best_queued_hash, - self.best_queued_number - ); - self.peers.insert(who, PeerSync { - common_number: self.best_queued_number, - best_hash: info.best_hash, - best_number: info.best_number, - state: PeerSyncState::Available, - recently_announced: Default::default(), - }); - } - (Ok(BlockStatus::Unknown), _) => { - let our_best = self.best_queued_number; - if our_best.is_zero() { - // We are at genesis, just start downloading + if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS.into() { + debug!( + target:"sync", + "New peer with unknown best hash {} ({}), assuming common block.", + self.best_queued_hash, + self.best_queued_number + ); + self.peers.insert(who, PeerSync { + common_number: self.best_queued_number, + best_hash: info.best_hash, + best_number: info.best_number, + state: PeerSyncState::Available, + recently_announced: Default::default() + }); + return Ok(None) + } + + // If we are at genesis, just start downloading. + if self.best_queued_number.is_zero() { debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number); self.peers.insert(who.clone(), PeerSync { common_number: Zero::zero(), @@ -319,30 +362,31 @@ impl ChainSync { state: PeerSyncState::Available, recently_announced: Default::default(), }); - self.download_new(protocol, who) - } else { - let common_best = ::std::cmp::min(our_best, info.best_number); - debug!(target:"sync", - "New peer with unknown best hash {} ({}), searching for common ancestor.", - info.best_hash, - info.best_number - ); - self.peers.insert(who.clone(), PeerSync { - common_number: Zero::zero(), - best_hash: info.best_hash, - best_number: info.best_number, - state: PeerSyncState::AncestorSearch( - common_best, - AncestorSearchState::ExponentialBackoff(One::one()) - ), - recently_announced: Default::default(), - }); - Self::request_ancestry(protocol, who, common_best) + return Ok(self.select_new_blocks(who).map(|(_, req)| req)) } - }, - (Ok(BlockStatus::Queued), _) | - (Ok(BlockStatus::InChainWithState), _) | - (Ok(BlockStatus::InChainPruned), _) => { + + let common_best = std::cmp::min(self.best_queued_number, info.best_number); + + debug!(target:"sync", + "New peer with unknown best hash {} ({}), searching for common ancestor.", + info.best_hash, + info.best_number + ); + + self.peers.insert(who, PeerSync { + common_number: Zero::zero(), + best_hash: info.best_hash, + best_number: info.best_number, + state: PeerSyncState::AncestorSearch( + common_best, + AncestorSearchState::ExponentialBackoff(One::one()) + ), + recently_announced: Default::default() + }); + + Ok(Some(ancestry_request::(common_best))) + } + Ok(BlockStatus::Queued) | Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => { debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number); self.peers.insert(who.clone(), PeerSync { common_number: info.best_number, @@ -351,337 +395,23 @@ impl ChainSync { state: PeerSyncState::Available, recently_announced: Default::default(), }); + Ok(None) } } } - /// This function handles the ancestor search strategy used. The goal is to find a common point - /// that both our chains agree on that is as close to the tip as possible. - /// The way this works is we first have an exponential backoff strategy, where we try to step - /// forward until we find a block hash mismatch. The size of the step doubles each step we take. - /// - /// When we've found a block hash mismatch we then fall back to a binary search between the two - /// last known points to find the common block closest to the tip. - fn handle_ancestor_search_state( - state: AncestorSearchState, - curr_block_num: NumberFor, - block_hash_match: bool, - ) -> Option<(AncestorSearchState, NumberFor)> { - let two = >::one() + >::one(); - match state { - AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => { - if block_hash_match && next_distance_to_tip == One::one() { - // We found the ancestor in the first step so there is no need to execute binary search. - return None; - } - if block_hash_match { - let left = curr_block_num; - let right = left + next_distance_to_tip / two; - let middle = left + (right - left) / two; - Some((AncestorSearchState::BinarySearch(left, right), middle)) - } else { - let next_block_num = curr_block_num.checked_sub(&next_distance_to_tip) - .unwrap_or_else(Zero::zero); - let next_distance_to_tip = next_distance_to_tip * two; - Some((AncestorSearchState::ExponentialBackoff(next_distance_to_tip), next_block_num)) - } - }, - AncestorSearchState::BinarySearch(mut left, mut right) => { - if left >= curr_block_num { - return None; - } - if block_hash_match { - left = curr_block_num; - } else { - right = curr_block_num; - } - assert!(right >= left); - let middle = left + (right - left) / two; - Some((AncestorSearchState::BinarySearch(left, right), middle)) - }, - } + /// Signal that `best_header` has been queued for import and update the + /// `ChainSync` state with that information. + pub fn update_chain_info(&mut self, best_header: &B::Header) { + self.on_block_queued(&best_header.hash(), *best_header.number()) } - /// Handle a response from the remote to a block request that we made. - /// - /// `request` must be the original request that triggered `response`. - /// - /// If this corresponds to a valid block, this outputs the block that must be imported in the - /// import queue. - #[must_use] - pub(crate) fn on_block_data( - &mut self, - protocol: &mut dyn Context, - who: PeerId, - request: message::BlockRequest, - response: message::BlockResponse - ) -> Option<(BlockOrigin, Vec>)> { - let new_blocks: Vec> = if let Some(ref mut peer) = self.peers.get_mut(&who) { - let mut blocks = response.blocks; - if request.direction == message::Direction::Descending { - trace!(target: "sync", "Reversing incoming block list"); - blocks.reverse(); - } - let peer_state = peer.state.clone(); - match peer_state { - PeerSyncState::DownloadingNew(start_block) => { - self.blocks.clear_peer_download(&who); - peer.state = PeerSyncState::Available; - self.blocks.insert(start_block, blocks, who); - self.blocks - .drain(self.best_queued_number + One::one()) - .into_iter() - .map(|block_data| { - IncomingBlock { - hash: block_data.block.hash, - header: block_data.block.header, - body: block_data.block.body, - justification: block_data.block.justification, - origin: block_data.origin, - } - }).collect() - }, - PeerSyncState::DownloadingStale(_) => { - peer.state = PeerSyncState::Available; - blocks.into_iter().map(|b| { - IncomingBlock { - hash: b.hash, - header: b.header, - body: b.body, - justification: b.justification, - origin: Some(who.clone()), - } - }).collect() - }, - PeerSyncState::AncestorSearch(num, state) => { - let block_hash_match = match (blocks.get(0), protocol.client().block_hash(num)) { - (Some(ref block), Ok(maybe_our_block_hash)) => { - trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who); - maybe_our_block_hash.map_or(false, |x| x == block.hash) - }, - (None, _) => { - debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); - protocol.report_peer(who.clone(), i32::min_value()); - protocol.disconnect_peer(who); - return None - }, - (_, Err(e)) => { - info!("Error answering legitimate blockchain query: {:?}", e); - protocol.report_peer(who.clone(), ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE); - protocol.disconnect_peer(who); - return None - }, - }; - if block_hash_match && peer.common_number < num { - peer.common_number = num; - } - if !block_hash_match && num.is_zero() { - trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); - protocol.report_peer(who.clone(), GENESIS_MISMATCH_REPUTATION_CHANGE); - protocol.disconnect_peer(who); - return None - } - if let Some((next_state, next_block_num)) = - Self::handle_ancestor_search_state(state, num, block_hash_match) { - peer.state = PeerSyncState::AncestorSearch(next_block_num, next_state); - Self::request_ancestry(protocol, who, next_block_num); - return None - } else { - peer.state = PeerSyncState::Available; - vec![] - } - }, - PeerSyncState::Available | - PeerSyncState::DownloadingJustification(..) | - PeerSyncState::DownloadingFinalityProof(..) => Vec::new(), - } - } else { - Vec::new() - }; - - let is_recent = new_blocks - .first() - .map(|block| self.peers.iter().any(|(_, peer)| peer.recently_announced.contains(&block.hash))) - .unwrap_or(false); - let origin = if is_recent { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; - - if let Some((hash, number)) = new_blocks.last() - .and_then(|b| b.header.as_ref().map(|h| (b.hash.clone(), *h.number()))) - { - trace!(target:"sync", "Accepted {} blocks ({:?}) with origin {:?}", new_blocks.len(), hash, origin); - self.block_queued(&hash, number); - } - self.maintain_sync(protocol); - let new_best_importing_number = new_blocks - .last() - .and_then(|b| b.header.as_ref().map(|h| h.number().clone())) - .unwrap_or_else(|| Zero::zero()); - self.queue_blocks - .extend(new_blocks.iter().map(|b| b.hash.clone())); - self.best_importing_number = max(new_best_importing_number, self.best_importing_number); - Some((origin, new_blocks)) - } - - /// Handle a response from the remote to a justification request that we made. - /// - /// `request` must be the original request that triggered `response`. - /// - /// Returns `Some` if this produces a justification that must be imported into the import - /// queue. - #[must_use] - pub(crate) fn on_block_justification_data( - &mut self, - protocol: &mut dyn Context, - who: PeerId, - response: message::BlockResponse, - ) -> Option<(PeerId, B::Hash, NumberFor, Justification)> - { - let peer = if let Some(peer) = self.peers.get_mut(&who) { - peer - } else { - error!(target: "sync", "Called on_block_justification_data with a bad peer ID"); - return None; - }; - - if let PeerSyncState::DownloadingJustification(hash) = peer.state { - peer.state = PeerSyncState::Available; - - // we only request one justification at a time - match response.blocks.into_iter().next() { - Some(response) => { - if hash != response.hash { - info!("Invalid block justification provided by {}: requested: {:?} got: {:?}", - who, hash, response.hash); - protocol.report_peer(who.clone(), i32::min_value()); - protocol.disconnect_peer(who); - return None; - } - return self.extra_justifications.on_response(who, response.justification) - } - None => { - // we might have asked the peer for a justification on a block that we thought it had - // (regardless of whether it had a justification for it or not). - trace!(target: "sync", "Peer {:?} provided empty response for justification request {:?}", - who, - hash, - ); - return None; - } - } - } - - self.maintain_sync(protocol); - None - } - - /// Handle new finality proof data. - pub(crate) fn on_block_finality_proof_data( - &mut self, - protocol: &mut dyn Context, - who: PeerId, - response: message::FinalityProofResponse, - ) -> Option<(PeerId, B::Hash, NumberFor, Vec)> { - let peer = if let Some(peer) = self.peers.get_mut(&who) { - peer - } else { - error!(target: "sync", "Called on_block_finality_proof_data with a bad peer ID"); - return None; - }; - - if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state { - peer.state = PeerSyncState::Available; - - // we only request one finality proof at a time - if hash != response.block { - info!( - "Invalid block finality proof provided: requested: {:?} got: {:?}", - hash, - response.block, - ); - - protocol.report_peer(who.clone(), i32::min_value()); - protocol.disconnect_peer(who); - return None; - } - - return self.extra_finality_proofs.on_response(who, response.proof) - } - - self.maintain_sync(protocol); - None - } - - /// A batch of blocks have been processed, with or without errors. - /// Call this when a batch of blocks have been processed by the import queue, with or without - /// errors. - pub fn blocks_processed(&mut self, protocol: &mut dyn Context, processed_blocks: Vec, has_error: bool) { - for hash in processed_blocks { - self.queue_blocks.remove(&hash); - } - if has_error { - self.best_importing_number = Zero::zero(); - } - self.maintain_sync(protocol) - } - - /// Maintain the sync process (download new blocks, fetch justifications). - fn maintain_sync(&mut self, protocol: &mut dyn Context) { - let peers: Vec = self.peers.keys().map(|p| p.clone()).collect(); - for peer in peers { - self.download_new(protocol, peer); - } - self.tick(protocol) - } - - /// Called periodically to perform any time-based actions. Must be called at a regular - /// interval. - pub fn tick(&mut self, protocol: &mut dyn Context) { - self.send_justification_requests(protocol); - self.send_finality_proof_request(protocol) - } - - fn send_justification_requests(&mut self, protocol: &mut dyn Context) { - let mut matcher = self.extra_justifications.matcher(); - while let Some((peer, request)) = matcher.next(&self.peers) { - self.peers.get_mut(&peer) - .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed") - .state = PeerSyncState::DownloadingJustification(request.0); - protocol.send_block_request(peer, message::generic::BlockRequest { - id: 0, - fields: message::BlockAttributes::JUSTIFICATION, - from: message::FromBlock::Hash(request.0), - to: None, - direction: message::Direction::Ascending, - max: Some(1) - }) - } - } - - fn send_finality_proof_request(&mut self, protocol: &mut dyn Context) { - let mut matcher = self.extra_finality_proofs.matcher(); - while let Some((peer, request)) = matcher.next(&self.peers) { - self.peers.get_mut(&peer) - .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed") - .state = PeerSyncState::DownloadingFinalityProof(request.0); - protocol.send_finality_proof_request(peer, message::generic::FinalityProofRequest { - id: 0, - block: request.0, - request: self.request_builder.as_mut() - .map(|builder| builder.build_request_data(&request.0)) - .unwrap_or_default() - }) - } - } - - /// Request a justification for the given block. - /// - /// Uses `protocol` to queue a new justification request and tries to dispatch all pending - /// requests. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor, protocol: &mut dyn Context) { + /// Schedule a justification request for the given block. + pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; self.extra_justifications.schedule((*hash, number), |base, block| { - protocol.client().is_descendent_of(base, block) - }); - self.send_justification_requests(protocol) + client.is_descendent_of(base, block) + }) } /// Clears all pending justification requests. @@ -689,53 +419,316 @@ impl ChainSync { self.extra_justifications.reset() } - /// Call this when a justification has been processed by the import queue, with or without - /// errors. - pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { + /// Schedule a finality proof request for the given block. + pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; + self.extra_finality_proofs.schedule((*hash, number), |base, block| { + client.is_descendent_of(base, block) + }) + } + + /// Get an iterator over all scheduled justification requests. + pub fn justification_requests(&mut self) -> impl Iterator)> + '_ { + let peers = &mut self.peers; + let mut matcher = self.extra_justifications.matcher(); + std::iter::from_fn(move || { + if let Some((peer, request)) = matcher.next(&peers) { + peers.get_mut(&peer) + .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed") + .state = PeerSyncState::DownloadingJustification(request.0); + let req = message::generic::BlockRequest { + id: 0, + fields: BlockAttributes::JUSTIFICATION, + from: message::FromBlock::Hash(request.0), + to: None, + direction: message::Direction::Ascending, + max: Some(1) + }; + Some((peer, req)) + } else { + None + } + }) + } + + /// Get an iterator over all scheduled finality proof requests. + pub fn finality_proof_requests(&mut self) -> impl Iterator)> + '_ { + let peers = &mut self.peers; + let request_builder = &mut self.request_builder; + let mut matcher = self.extra_finality_proofs.matcher(); + std::iter::from_fn(move || { + if let Some((peer, request)) = matcher.next(&peers) { + peers.get_mut(&peer) + .expect("`Matcher::next` guarantees the `PeerId` comes from the given peers; qed") + .state = PeerSyncState::DownloadingFinalityProof(request.0); + let req = message::generic::FinalityProofRequest { + id: 0, + block: request.0, + request: request_builder.as_mut() + .map(|builder| builder.build_request_data(&request.0)) + .unwrap_or_default() + }; + Some((peer, req)) + } else { + None + } + }) + } + + /// Get an iterator over all block requests of all peers. + pub fn block_requests(&mut self) -> impl Iterator)> + '_ { + if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { + trace!(target: "sync", "Too many blocks in the queue."); + return Either::Left(std::iter::empty()) + } + let blocks = &mut self.blocks; + let attrs = &self.required_block_attributes; + let iter = self.peers.iter_mut().filter_map(move |(id, peer)| { + if !peer.state.is_available() { + trace!(target: "sync", "Peer {} is busy", id); + return None + } + if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs) { + peer.state = PeerSyncState::DownloadingNew(range.start); + trace!(target: "sync", "new block request for {}", id); + Some((id.clone(), req)) + } else { + trace!(target: "sync", "no new block request for {}", id); + None + } + }); + Either::Right(iter) + } + + /// Handle a response from the remote to a block request that we made. + /// + /// `request` must be the original request that triggered `response`. + /// + /// If this corresponds to a valid block, this outputs the block that + /// must be imported in the import queue. + pub fn on_block_data + (&mut self, who: PeerId, request: BlockRequest, response: BlockResponse) -> Result, BadPeer> + { + let new_blocks: Vec> = + if let Some(peer) = self.peers.get_mut(&who) { + let mut blocks = response.blocks; + if request.direction == message::Direction::Descending { + trace!(target: "sync", "Reversing incoming block list"); + blocks.reverse() + } + match &mut peer.state { + PeerSyncState::DownloadingNew(start_block) => { + self.blocks.clear_peer_download(&who); + self.blocks.insert(*start_block, blocks, who); + peer.state = PeerSyncState::Available; + self.blocks + .drain(self.best_queued_number + One::one()) + .into_iter() + .map(|block_data| { + IncomingBlock { + hash: block_data.block.hash, + header: block_data.block.header, + body: block_data.block.body, + justification: block_data.block.justification, + origin: block_data.origin, + } + }).collect() + } + PeerSyncState::DownloadingStale(_) => { + peer.state = PeerSyncState::Available; + blocks.into_iter().map(|b| { + IncomingBlock { + hash: b.hash, + header: b.header, + body: b.body, + justification: b.justification, + origin: Some(who.clone()), + } + }).collect() + } + PeerSyncState::AncestorSearch(num, state) => { + let block_hash_match = match (blocks.get(0), self.client.block_hash(*num)) { + (Some(block), Ok(maybe_our_block_hash)) => { + trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", num, block.hash, who); + maybe_our_block_hash.map_or(false, |x| x == block.hash) + }, + (None, _) => { + debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); + return Err(BadPeer(who, i32::min_value())) + }, + (_, Err(e)) => { + info!("Error answering legitimate blockchain query: {:?}", e); + return Err(BadPeer(who, ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE)) + } + }; + if block_hash_match && peer.common_number < *num { + peer.common_number = *num; + } + if !block_hash_match && num.is_zero() { + trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); + return Err(BadPeer(who, GENESIS_MISMATCH_REPUTATION_CHANGE)) + } + if let Some((next_state, next_num)) = handle_ancestor_search_state(state, *num, block_hash_match) { + peer.state = PeerSyncState::AncestorSearch(next_num, next_state); + return Ok(OnBlockData::Request(who, ancestry_request::(next_num))) + } else { + peer.state = PeerSyncState::Available; + Vec::new() + } + } + + | PeerSyncState::Available + | PeerSyncState::DownloadingJustification(..) + | PeerSyncState::DownloadingFinalityProof(..) => Vec::new() + } + } else { + Vec::new() + }; + + let is_recent = new_blocks.first() + .map(|block| { + self.peers.iter().any(|(_, peer)| peer.recently_announced.contains(&block.hash)) + }) + .unwrap_or(false); + + let origin = + if is_recent { + BlockOrigin::NetworkBroadcast + } else { + BlockOrigin::NetworkInitialSync + }; + + if let Some((h, n)) = new_blocks.last().and_then(|b| b.header.as_ref().map(|h| (&b.hash, *h.number()))) { + trace!(target:"sync", "Accepted {} blocks ({:?}) with origin {:?}", new_blocks.len(), h, origin); + self.on_block_queued(h, n) + } + + let new_best_importing_number = new_blocks.last() + .and_then(|b| b.header.as_ref().map(|h| *h.number())) + .unwrap_or_else(|| Zero::zero()); + + self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash)); + + self.best_importing_number = std::cmp::max(new_best_importing_number, self.best_importing_number); + + Ok(OnBlockData::Import(origin, new_blocks)) + } + + /// Handle a response from the remote to a justification request that we made. + /// + /// `request` must be the original request that triggered `response`. + /// + /// Returns `Some` if this produces a justification that must be imported + /// into the import queue. + pub fn on_block_justification + (&mut self, who: PeerId, response: BlockResponse) -> Result, BadPeer> + { + let peer = + if let Some(peer) = self.peers.get_mut(&who) { + peer + } else { + error!(target: "sync", "Called on_block_justification with a bad peer ID"); + return Ok(OnBlockJustification::Nothing) + }; + + if let PeerSyncState::DownloadingJustification(hash) = peer.state { + peer.state = PeerSyncState::Available; + + // We only request one justification at a time + debug_assert_eq!(1, response.blocks.len()); + + if let Some(block) = response.blocks.into_iter().next() { + if hash != block.hash { + info!( + "Invalid block justification provided by {}: requested: {:?} got: {:?}", who, hash, block.hash + ); + return Err(BadPeer(who, i32::min_value())) + } + if let Some((peer, hash, number, j)) = self.extra_justifications.on_response(who, block.justification) { + return Ok(OnBlockJustification::Import { peer, hash, number, justification: j }) + } + } else { + // we might have asked the peer for a justification on a block that we thought it had + // (regardless of whether it had a justification for it or not). + trace!(target: "sync", "Peer {:?} provided empty response for justification request {:?}", who, hash) + } + } + + Ok(OnBlockJustification::Nothing) + } + + /// Handle new finality proof data. + pub fn on_block_finality_proof + (&mut self, who: PeerId, resp: FinalityProofResponse) -> Result, BadPeer> + { + let peer = + if let Some(peer) = self.peers.get_mut(&who) { + peer + } else { + error!(target: "sync", "Called on_block_finality_proof_data with a bad peer ID"); + return Ok(OnBlockFinalityProof::Nothing) + }; + + if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state { + peer.state = PeerSyncState::Available; + + // We only request one finality proof at a time. + if hash != resp.block { + info!("Invalid block finality proof provided: requested: {:?} got: {:?}", hash, resp.block); + return Err(BadPeer(who, i32::min_value())) + } + + if let Some((peer, hash, number, p)) = self.extra_finality_proofs.on_response(who, resp.proof) { + return Ok(OnBlockFinalityProof::Import { peer, hash, number, proof: p }) + } + } + + Ok(OnBlockFinalityProof::Nothing) + } + + /// A batch of blocks have been processed, with or without errors. + /// + /// Call this when a batch of blocks have been processed by the import + /// queue, with or without errors. + pub fn on_blocks_processed(&mut self, processed_blocks: Vec, has_error: bool) { + for hash in processed_blocks { + self.queue_blocks.remove(&hash); + } + if has_error { + self.best_importing_number = Zero::zero() + } + } + + /// Call this when a justification has been processed by the import queue, + /// with or without errors. + pub fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor, success: bool) { let finalization_result = if success { Ok((hash, number)) } else { Err(()) }; if !self.extra_justifications.try_finalize_root((hash, number), finalization_result, true) { debug!(target: "sync", "Got justification import result for unknown justification {:?} {:?} request.", hash, number, - ); + ) } } - /// Request a finality proof for the given block. - /// - /// Queues a new finality proof request and tries to dispatch all pending requests. - pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor, protocol: &mut dyn Context) { - self.extra_finality_proofs.schedule((*hash, number), |base, block| { - protocol.client().is_descendent_of(base, block) - }); - self.send_finality_proof_request(protocol) - } - - pub fn finality_proof_import_result( - &mut self, - request_block: (B::Hash, NumberFor), - finalization_result: Result<(B::Hash, NumberFor), ()>, - ) { - self.extra_finality_proofs.try_finalize_root(request_block, finalization_result, true); - } - - /// Log that a block has been successfully imported - pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - trace!(target: "sync", "Block imported successfully {} ({})", number, hash); + pub fn on_finality_proof_import(&mut self, req: (B::Hash, NumberFor), res: Result<(B::Hash, NumberFor), ()>) { + self.extra_finality_proofs.try_finalize_root(req, res, true); } /// Notify about finalization of the given block. - pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor, protocol: &mut dyn Context) { + pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor) { + let client = &self.client; let r = self.extra_finality_proofs.on_block_finalized(hash, number, |base, block| { - protocol.client().is_descendent_of(base, block) + client.is_descendent_of(base, block) }); if let Err(err) = r { - warn!(target: "sync", "Error cleaning up pending extra finality proof data requests: {:?}", err); + warn!(target: "sync", "Error cleaning up pending extra finality proof data requests: {:?}", err) } + let client = &self.client; let r = self.extra_justifications.on_block_finalized(hash, number, |base, block| { - protocol.client().is_descendent_of(base, block) + client.is_descendent_of(base, block) }); if let Err(err) = r { @@ -743,9 +736,11 @@ impl ChainSync { } } - /// Called when a block has been queued for import. Updates our internal state for best queued - /// block and then goes through all peers to update our view of their state as well. - fn block_queued(&mut self, hash: &B::Hash, number: NumberFor) { + /// Called when a block has been queued for import. + /// + /// Updates our internal state for best queued block and then goes + /// through all peers to update our view of their state as well. + fn on_block_queued(&mut self, hash: &B::Hash, number: NumberFor) { if number > self.best_queued_number { self.best_queued_number = number; self.best_queued_hash = *hash; @@ -774,43 +769,29 @@ impl ChainSync { } } - /// Signal that `best_header` has been queued for import and update the `ChainSync` state with - /// that information. - pub(crate) fn update_chain_info(&mut self, best_header: &B::Header) { - let hash = best_header.hash(); - self.block_queued(&hash, best_header.number().clone()) - } - /// Call when a node announces a new block. /// - /// If true is returned, then the caller MUST try to import passed header (call `on_block_data`). - /// The network request isn't sent in this case. - /// Both hash and header is passed as an optimization to avoid rehashing the header. - #[must_use] - pub(crate) fn on_block_announce( - &mut self, - protocol: &mut dyn Context, - who: PeerId, - hash: B::Hash, - header: &B::Header, - ) -> bool { + /// If true is returned, then the caller MUST try to import passed + /// header (call `on_block_data`). The network request isn't sent + /// in this case. Both hash and header is passed as an optimization + /// to avoid rehashing the header. + pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, header: &B::Header) -> OnBlockAnnounce { let number = *header.number(); debug!(target: "sync", "Received block announcement with number {:?}", number); if number.is_zero() { warn!(target: "sync", "Ignored genesis block (#0) announcement from {}: {}", who, hash); - return false; + return OnBlockAnnounce::Nothing } - let parent_status = block_status(&*protocol.client(), &self.queue_blocks, header.parent_hash().clone()).ok() - .unwrap_or(BlockStatus::Unknown); + let parent_status = self.block_status(header.parent_hash()).ok().unwrap_or(BlockStatus::Unknown); let known_parent = parent_status != BlockStatus::Unknown; let ancient_parent = parent_status == BlockStatus::InChainPruned; - let known = self.is_known(protocol, &hash); + let known = self.is_known(&hash); let peer = if let Some(peer) = self.peers.get_mut(&who) { peer } else { error!(target: "sync", "Called on_block_announce with a bad peer ID"); - return false; + return OnBlockAnnounce::Nothing }; while peer.recently_announced.len() >= ANNOUNCE_HISTORY_SIZE { peer.recently_announced.pop_front(); @@ -822,7 +803,7 @@ impl ChainSync { peer.best_hash = hash; } if let PeerSyncState::AncestorSearch(_, _) = peer.state { - return false; + return OnBlockAnnounce::Nothing } // We assume that the announced block is the latest they have seen, and so our common number // is either one further ahead or it's the one they just announced, if we know about it. @@ -835,260 +816,284 @@ impl ChainSync { // known block case if known || self.is_already_downloading(&hash) { trace!(target: "sync", "Known block announce from {}: {}", who, hash); - return false; + return OnBlockAnnounce::Nothing } // stale block case let requires_additional_data = !self.role.is_light(); if number <= self.best_queued_number { if !(known_parent || self.is_already_downloading(header.parent_hash())) { - if protocol.client().block_status(&BlockId::Number(*header.number())) - .unwrap_or(BlockStatus::Unknown) == BlockStatus::InChainPruned - { + let block_status = self.client.block_status(&BlockId::Number(*header.number())) + .unwrap_or(BlockStatus::Unknown); + if block_status == BlockStatus::InChainPruned { trace!( target: "sync", - "Ignored unknown ancient block announced from {}: {} {:?}", - who, hash, header + "Ignored unknown ancient block announced from {}: {} {:?}", who, hash, header ); - return false; + return OnBlockAnnounce::Nothing } - trace!( target: "sync", - "Considering new unknown stale block announced from {}: {} {:?}", - who, hash, header + "Considering new unknown stale block announced from {}: {} {:?}", who, hash, header ); - let request = self.download_unknown_stale(&who, &hash); - match request { - Some(request) => if requires_additional_data { - protocol.send_block_request(who, request); - return false; + if let Some(request) = self.download_unknown_stale(&who, &hash) { + if requires_additional_data { + return OnBlockAnnounce::Request(who, request) } else { - return true; - }, - None => return false, + return OnBlockAnnounce::ImportHeader + } + } else { + return OnBlockAnnounce::Nothing } } else { if ancient_parent { - trace!( - target: "sync", - "Ignored ancient stale block announced from {}: {} {:?}", - who, hash, header - ); - return false; + trace!(target: "sync", "Ignored ancient stale block announced from {}: {} {:?}", who, hash, header); + return OnBlockAnnounce::Nothing } - - let request = self.download_stale(&who, &hash); - match request { - Some(request) => if requires_additional_data { - protocol.send_block_request(who, request); - return false; + if let Some(request) = self.download_stale(&who, &hash) { + if requires_additional_data { + return OnBlockAnnounce::Request(who, request) } else { - return true; - }, - None => return false, + return OnBlockAnnounce::ImportHeader + } + } else { + return OnBlockAnnounce::Nothing } } } if ancient_parent { trace!(target: "sync", "Ignored ancient block announced from {}: {} {:?}", who, hash, header); - return false; + return OnBlockAnnounce::Nothing } trace!(target: "sync", "Considering new block announced from {}: {} {:?}", who, hash, header); + let (range, request) = match self.select_new_blocks(who.clone()) { Some((range, request)) => (range, request), - None => return false, + None => return OnBlockAnnounce::Nothing }; - let is_required_data_available = - !requires_additional_data && - range.end - range.start == One::one() && - range.start == *header.number(); + + let is_required_data_available = !requires_additional_data + && range.end - range.start == One::one() + && range.start == *header.number(); + if !is_required_data_available { - protocol.send_block_request(who, request); - return false; + return OnBlockAnnounce::Request(who, request) } - true - } - - /// Convenience function to iterate through all peers and see if there are any that we are - /// downloading this hash from. - fn is_already_downloading(&self, hash: &B::Hash) -> bool { - self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) - } - - /// Returns true if the block with given hash exists in the import queue with known status or is - /// already imported. - fn is_known(&self, protocol: &mut dyn Context, hash: &B::Hash) -> bool { - block_status(&*protocol.client(), &self.queue_blocks, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown) + OnBlockAnnounce::ImportHeader } /// Call when a peer has disconnected. - pub(crate) fn peer_disconnected(&mut self, protocol: &mut dyn Context, who: PeerId) { + pub fn peer_disconnected(&mut self, who: PeerId) { self.blocks.clear_peer_download(&who); self.peers.remove(&who); self.extra_justifications.peer_disconnected(&who); self.extra_finality_proofs.peer_disconnected(&who); - self.maintain_sync(protocol); } /// Restart the sync process. - pub(crate) fn restart( - &mut self, - protocol: &mut dyn Context, - mut peer_info: impl FnMut(&PeerId) -> Option> - ) { + pub fn restart<'a, F> + (&'a mut self, mut peer_info: F) -> impl Iterator), BadPeer>> + 'a + where F: FnMut(&PeerId) -> Option> + 'a + { self.queue_blocks.clear(); self.best_importing_number = Zero::zero(); self.blocks.clear(); - let info = protocol.client().info(); + let info = self.client.info(); self.best_queued_hash = info.chain.best_hash; self.best_queued_number = info.chain.best_number; debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash); - let ids: Vec = self.peers.drain().map(|(id, _)| id).collect(); - for id in ids { - if let Some(info) = peer_info(&id) { - self.new_peer(protocol, id, info); + let old_peers = std::mem::replace(&mut self.peers, HashMap::new()); + old_peers.into_iter().filter_map(move |(id, _)| { + let info = peer_info(&id)?; + match self.new_peer(id.clone(), info) { + Ok(None) => None, + Ok(Some(x)) => Some(Ok((id, x))), + Err(e) => Some(Err(e)) } - } + }) } - // Download old block with known parent. - fn download_stale( - &mut self, - who: &PeerId, - hash: &B::Hash, - ) -> Option> { + /// Download old block with known parent. + fn download_stale(&mut self, who: &PeerId, hash: &B::Hash) -> Option> { let peer = self.peers.get_mut(who)?; - match peer.state { - PeerSyncState::Available => { - peer.state = PeerSyncState::DownloadingStale(*hash); - Some(message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Hash(*hash), - to: None, - direction: message::Direction::Ascending, - max: Some(1), - }) - }, - _ => None, + if !peer.state.is_available() { + return None } - } - - // Download old block with unknown parent. - fn download_unknown_stale( - &mut self, - who: &PeerId, - hash: &B::Hash, - ) -> Option> { - let peer = self.peers.get_mut(who)?; - match peer.state { - PeerSyncState::Available => { - peer.state = PeerSyncState::DownloadingStale(*hash); - Some(message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Hash(*hash), - to: None, - direction: message::Direction::Descending, - max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN), - }) - }, - _ => None, - } - } - - // Issue a request for a peer to download new blocks, if any are available. - fn download_new(&mut self, protocol: &mut dyn Context, who: PeerId) { - if let Some((_, request)) = self.select_new_blocks(who.clone()) { - protocol.send_block_request(who, request); - } - } - - // Select a range of NEW blocks to download from peer. - fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range>, message::BlockRequest)> { - // when there are too many blocks in the queue => do not try to download new blocks - if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { - trace!(target: "sync", "Too many blocks in the queue."); - return None; - } - - let peer = self.peers.get_mut(&who)?; - match peer.state { - PeerSyncState::Available => { - trace!( - target: "sync", - "Considering new block download from {}, common block is {}, best is {:?}", - who, - peer.common_number, - peer.best_number, - ); - let range = self.blocks.needed_blocks( - who.clone(), - MAX_BLOCKS_TO_REQUEST, - peer.best_number, - peer.common_number - ); - match range { - Some(range) => { - trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end); - let from = message::FromBlock::Number(range.start); - let max = Some((range.end - range.start).saturated_into::()); - peer.state = PeerSyncState::DownloadingNew(range.start); - Some(( - range, - message::generic::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from, - to: None, - direction: message::Direction::Ascending, - max, - }, - )) - }, - None => { - trace!(target: "sync", "Nothing to request"); - None - }, - } - }, - _ => { - trace!(target: "sync", "Peer {} is busy", who); - None - }, - } - } - - /// Request the ancestry for a block. Sends a request for header and justification for the given - /// block number. Used during ancestry search. - fn request_ancestry(protocol: &mut dyn Context, who: PeerId, block: NumberFor) { - trace!(target: "sync", "Requesting ancestry block #{} from {}", block, who); - let request = message::generic::BlockRequest { + peer.state = PeerSyncState::DownloadingStale(*hash); + Some(message::generic::BlockRequest { id: 0, - fields: message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION, - from: message::FromBlock::Number(block), + fields: self.required_block_attributes.clone(), + from: message::FromBlock::Hash(*hash), to: None, direction: message::Direction::Ascending, max: Some(1), + }) + } + + /// Download old block with unknown parent. + fn download_unknown_stale(&mut self, who: &PeerId, hash: &B::Hash) -> Option> { + let peer = self.peers.get_mut(who)?; + if !peer.state.is_available() { + return None + } + peer.state = PeerSyncState::DownloadingStale(*hash); + Some(message::generic::BlockRequest { + id: 0, + fields: self.required_block_attributes.clone(), + from: message::FromBlock::Hash(*hash), + to: None, + direction: message::Direction::Descending, + max: Some(MAX_UNKNOWN_FORK_DOWNLOAD_LEN), + }) + } + + /// Select a range of new blocks to download from the given peer. + fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range>, BlockRequest)> { + // when there are too many blocks in the queue => do not try to download new blocks + if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { + trace!(target: "sync", "Too many blocks in the queue."); + return None + } + + let peer = self.peers.get_mut(&who)?; + + if !peer.state.is_available() { + trace!(target: "sync", "Peer {} is busy", who); + return None + } + + trace!( + target: "sync", + "Considering new block download from {}, common block is {}, best is {:?}", + who, + peer.common_number, + peer.best_number + ); + + if let Some((range, req)) = peer_block_request(&who, peer, &mut self.blocks, &self.required_block_attributes) { + trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end); + peer.state = PeerSyncState::DownloadingNew(range.start); + Some((range, req)) + } else { + trace!(target: "sync", "Nothing to request from {}", who); + None + } + } + + /// What is the status of the block corresponding to the given hash? + fn block_status(&self, hash: &B::Hash) -> Result { + if self.queue_blocks.contains(hash) { + return Ok(BlockStatus::Queued) + } + self.client.block_status(&BlockId::Hash(*hash)) + } + + /// Is the block corresponding to the given hash known? + fn is_known(&self, hash: &B::Hash) -> bool { + self.block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown) + } + + /// Is any peer downloading the given hash? + fn is_already_downloading(&self, hash: &B::Hash) -> bool { + self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) + } +} + +/// Request the ancestry for a block. Sends a request for header and justification for the given +/// block number. Used during ancestry search. +fn ancestry_request(block: NumberFor) -> BlockRequest { + message::generic::BlockRequest { + id: 0, + fields: BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION, + from: message::FromBlock::Number(block), + to: None, + direction: message::Direction::Ascending, + max: Some(1) + } +} + +/// The ancestor search state expresses which algorithm, and its stateful parameters, we are using to +/// try to find an ancestor block +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +pub enum AncestorSearchState { + /// Use exponential backoff to find an ancestor, then switch to binary search. + /// We keep track of the exponent. + ExponentialBackoff(NumberFor), + /// Using binary search to find the best ancestor. + /// We keep track of left and right bounds. + BinarySearch(NumberFor, NumberFor), +} + +/// This function handles the ancestor search strategy used. The goal is to find a common point +/// that both our chains agree on that is as close to the tip as possible. +/// The way this works is we first have an exponential backoff strategy, where we try to step +/// forward until we find a block hash mismatch. The size of the step doubles each step we take. +/// +/// When we've found a block hash mismatch we then fall back to a binary search between the two +/// last known points to find the common block closest to the tip. +fn handle_ancestor_search_state( + state: &AncestorSearchState, + curr_block_num: NumberFor, + block_hash_match: bool +) -> Option<(AncestorSearchState, NumberFor)> { + let two = >::one() + >::one(); + match state { + AncestorSearchState::ExponentialBackoff(next_distance_to_tip) => { + let next_distance_to_tip = *next_distance_to_tip; + if block_hash_match && next_distance_to_tip == One::one() { + // We found the ancestor in the first step so there is no need to execute binary search. + return None; + } + if block_hash_match { + let left = curr_block_num; + let right = left + next_distance_to_tip / two; + let middle = left + (right - left) / two; + Some((AncestorSearchState::BinarySearch(left, right), middle)) + } else { + let next_block_num = curr_block_num.checked_sub(&next_distance_to_tip) + .unwrap_or_else(Zero::zero); + let next_distance_to_tip = next_distance_to_tip * two; + Some((AncestorSearchState::ExponentialBackoff(next_distance_to_tip), next_block_num)) + } + } + AncestorSearchState::BinarySearch(mut left, mut right) => { + if left >= curr_block_num { + return None; + } + if block_hash_match { + left = curr_block_num; + } else { + right = curr_block_num; + } + assert!(right >= left); + let middle = left + (right - left) / two; + Some((AncestorSearchState::BinarySearch(left, right), middle)) + } + } +} + +/// Get a new block request for the peer if any. +fn peer_block_request( + id: &PeerId, + peer: &PeerSync, + blocks: &mut BlockCollection, + attrs: &message::BlockAttributes, +) -> Option<(Range>, BlockRequest)> { + if let Some(range) = blocks.needed_blocks(id.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) { + let request = message::generic::BlockRequest { + id: 0, + fields: attrs.clone(), + from: message::FromBlock::Number(range.start), + to: None, + direction: message::Direction::Ascending, + max: Some((range.end - range.start).saturated_into::()) }; - protocol.send_block_request(who, request); + Some((range, request)) + } else { + None } } -/// Returns the BlockStatus for given block hash, looking first in the import queue and then in the -/// provided chain. -fn block_status( - chain: &dyn crate::chain::Client, - queue_blocks: &HashSet, - hash: B::Hash) -> Result -{ - if queue_blocks.contains(&hash) { - return Ok(BlockStatus::Queued); - } - - chain.block_status(&BlockId::Hash(hash)) -}