diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index e2def15425..a65df59fd8 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -24,19 +24,12 @@ //! mod service; -mod sync; #[macro_use] mod protocol; mod chain; -mod blocks; -mod on_demand; mod on_demand_layer; -mod util; pub mod config; -pub mod consensus_gossip; pub mod error; -pub mod message; -pub mod specialization; #[cfg(any(test, feature = "test-helpers"))] pub mod test; @@ -46,8 +39,8 @@ pub use service::{ NetworkService, NetworkWorker, FetchFuture, TransactionPool, ManageNetwork, NetworkMsg, SyncProvider, ExHashT, ReportHandle, }; -pub use protocol::{ProtocolStatus, PeerInfo, Context}; -pub use sync::{Status as SyncStatus, SyncState}; +pub use protocol::{ProtocolStatus, PeerInfo, Context, consensus_gossip, message, specialization}; +pub use protocol::sync::{Status as SyncStatus, SyncState}; pub use network_libp2p::{ identity, multiaddr, ProtocolId, Multiaddr, @@ -57,7 +50,7 @@ pub use network_libp2p::{ }; pub use message::{generic as generic_message, RequestId, Status as StatusMessage}; pub use error::Error; -pub use on_demand::AlwaysBadChecker; +pub use protocol::on_demand::AlwaysBadChecker; pub use on_demand_layer::{OnDemand, RemoteResponse}; #[doc(hidden)] pub use runtime_primitives::traits::Block as BlockT; diff --git a/substrate/core/network/src/on_demand_layer.rs b/substrate/core/network/src/on_demand_layer.rs index e58e045f2e..86b3d6b7f4 100644 --- a/substrate/core/network/src/on_demand_layer.rs +++ b/substrate/core/network/src/on_demand_layer.rs @@ -16,7 +16,7 @@ //! On-demand requests service. -use crate::on_demand::RequestData; +use crate::protocol::on_demand::RequestData; use std::sync::Arc; use futures::{prelude::*, sync::mpsc, sync::oneshot}; use parking_lot::Mutex; diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 651f064513..576d37a82f 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -24,16 +24,16 @@ use runtime_primitives::traits::{ CheckedSub, SaturatedConversion }; use consensus::import_queue::SharedFinalityProofRequestBuilder; -use crate::message::{ - self, BlockRequest as BlockRequestMessage, +use message::{ + BlockRequest as BlockRequestMessage, FinalityProofRequest as FinalityProofRequestMessage, Message, }; -use crate::message::{BlockAttributes, Direction, FromBlock, RequestId}; -use crate::message::generic::{Message as GenericMessage, ConsensusMessage}; -use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; -use crate::on_demand::{OnDemandCore, OnDemandNetwork, RequestData}; -use crate::specialization::NetworkSpecialization; -use crate::sync::{ChainSync, Context as SyncContext, Status as SyncStatus, SyncState}; +use message::{BlockAttributes, Direction, FromBlock, RequestId}; +use message::generic::{Message as GenericMessage, ConsensusMessage}; +use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; +use on_demand::{OnDemandCore, OnDemandNetwork, RequestData}; +use specialization::NetworkSpecialization; +use sync::{ChainSync, Context as SyncContext, Status as SyncStatus, SyncState}; use crate::service::{TransactionPool, ExHashT}; use crate::config::Roles; use rustc_hex::ToHex; @@ -43,7 +43,15 @@ use std::{cmp, num::NonZeroUsize, time}; use log::{trace, debug, warn, error}; use crate::chain::{Client, FinalityProofProvider}; use client::light::fetcher::{FetchChecker, ChangesProof}; -use crate::{error, util::LruHashSet}; +use crate::error; +use util::LruHashSet; + +mod util; +pub mod consensus_gossip; +pub mod message; +pub mod on_demand; +pub mod specialization; +pub mod sync; const REQUEST_TIMEOUT_SEC: u64 = 40; /// Interval at which we perform time based maintenance diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/protocol/consensus_gossip.rs similarity index 99% rename from substrate/core/network/src/consensus_gossip.rs rename to substrate/core/network/src/protocol/consensus_gossip.rs index 9d654b5bfa..5de6fb2602 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/protocol/consensus_gossip.rs @@ -208,7 +208,12 @@ pub trait Validator: Send + Sync { } /// Validate consensus message. - fn validate(&self, context: &mut dyn ValidatorContext, sender: &PeerId, data: &[u8]) -> ValidationResult; + fn validate( + &self, + context: &mut dyn ValidatorContext, + sender: &PeerId, + data: &[u8] + ) -> ValidationResult; /// Produce a closure for validating messages on a given topic. fn message_expired<'a>(&'a self) -> Box bool + 'a> { diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/protocol/message.rs similarity index 100% rename from substrate/core/network/src/message.rs rename to substrate/core/network/src/protocol/message.rs diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/protocol/on_demand.rs similarity index 100% rename from substrate/core/network/src/on_demand.rs rename to substrate/core/network/src/protocol/on_demand.rs diff --git a/substrate/core/network/src/specialization.rs b/substrate/core/network/src/protocol/specialization.rs similarity index 100% rename from substrate/core/network/src/specialization.rs rename to substrate/core/network/src/protocol/specialization.rs diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/protocol/sync.rs similarity index 96% rename from substrate/core/network/src/sync.rs rename to substrate/core/network/src/protocol/sync.rs index f3f62cac01..4ec8f18e35 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/protocol/sync.rs @@ -39,8 +39,8 @@ use network_libp2p::PeerId; use client::{BlockStatus, ClientInfo}; use consensus::{BlockOrigin, import_queue::{IncomingBlock, SharedFinalityProofRequestBuilder}}; use client::error::Error as ClientError; -use crate::blocks::BlockCollection; -use crate::sync::extra_requests::ExtraRequestsAggregator; +use blocks::BlockCollection; +use extra_requests::ExtraRequestsAggregator; use runtime_primitives::traits::{ Block as BlockT, Header as HeaderT, NumberFor, Zero, One, CheckedSub, SaturatedConversion @@ -50,6 +50,7 @@ use crate::message; use crate::config::Roles; use std::collections::HashSet; +mod blocks; mod extra_requests; // Maximum blocks to request in a single packet. @@ -303,13 +304,18 @@ impl ChainSync { common_number: Zero::zero(), best_hash: info.best_hash, best_number: info.best_number, - state: PeerSyncState::AncestorSearch(common_best, AncestorSearchState::ExponentialBackoff(One::one())), + state: PeerSyncState::AncestorSearch( + common_best, + AncestorSearchState::ExponentialBackoff(One::one()) + ), recently_announced: Default::default(), }); Self::request_ancestry(protocol, who, common_best) } }, - (Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChainWithState), _) | (Ok(BlockStatus::InChainPruned), _) => { + (Ok(BlockStatus::Queued), _) | + (Ok(BlockStatus::InChainWithState), _) | + (Ok(BlockStatus::InChainPruned), _) => { debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number); self.peers.insert(who.clone(), PeerSync { common_number: info.best_number, @@ -441,7 +447,8 @@ impl ChainSync { protocol.disconnect_peer(who); return None } - if let Some((next_state, next_block_num)) = Self::handle_ancestor_search_state(state, num, block_hash_match) { + if let Some((next_state, next_block_num)) = + Self::handle_ancestor_search_state(state, num, block_hash_match) { peer.state = PeerSyncState::AncestorSearch(next_block_num, next_state); Self::request_ancestry(protocol, who, next_block_num); return None @@ -450,7 +457,9 @@ impl ChainSync { vec![] } }, - PeerSyncState::Available | PeerSyncState::DownloadingJustification(..) | PeerSyncState::DownloadingFinalityProof(..) => Vec::new(), + PeerSyncState::Available | + PeerSyncState::DownloadingJustification(..) | + PeerSyncState::DownloadingFinalityProof(..) => Vec::new(), } } else { Vec::new() @@ -770,11 +779,19 @@ impl ChainSync { if protocol.client().block_status(&BlockId::Number(*header.number())) .unwrap_or(BlockStatus::Unknown) == BlockStatus::InChainPruned { - trace!(target: "sync", "Ignored unknown ancient block announced from {}: {} {:?}", who, hash, header); + trace!( + target: "sync", + "Ignored unknown ancient block announced from {}: {} {:?}", + who, hash, header + ); return false; } - trace!(target: "sync", "Considering new unknown stale block announced from {}: {} {:?}", who, hash, header); + trace!( + target: "sync", + "Considering new unknown stale block announced from {}: {} {:?}", + who, hash, header + ); let request = self.download_unknown_stale(&who, &hash); match request { Some(request) => if requires_additional_data { @@ -787,7 +804,11 @@ impl ChainSync { } } else { if ancient_parent { - trace!(target: "sync", "Ignored ancient stale block announced from {}: {} {:?}", who, hash, header); + trace!( + target: "sync", + "Ignored ancient stale block announced from {}: {} {:?}", + who, hash, header + ); return false; } @@ -934,7 +955,12 @@ impl ChainSync { peer.common_number, peer.best_number, ); - let range = self.blocks.needed_blocks(who.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number); + let range = self.blocks.needed_blocks( + who.clone(), + MAX_BLOCKS_TO_REQUEST, + peer.best_number, + peer.common_number + ); match range { Some(range) => { trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end); diff --git a/substrate/core/network/src/blocks.rs b/substrate/core/network/src/protocol/sync/blocks.rs similarity index 89% rename from substrate/core/network/src/blocks.rs rename to substrate/core/network/src/protocol/sync/blocks.rs index d693afba17..66730fcc3e 100644 --- a/substrate/core/network/src/blocks.rs +++ b/substrate/core/network/src/protocol/sync/blocks.rs @@ -100,7 +100,8 @@ impl BlockCollection { } /// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded. - pub fn needed_blocks(&mut self, who: PeerId, count: usize, peer_best: NumberFor, common: NumberFor) -> Option>> { + pub fn needed_blocks(&mut self, who: PeerId, count: usize, peer_best: NumberFor, common: NumberFor) + -> Option>> { // First block number that we need to download let first_different = common + >::one(); let count = (count as u32).into(); @@ -110,7 +111,8 @@ impl BlockCollection { loop { let next = downloading_iter.next(); break match &(prev, next) { - &(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _) if downloading < MAX_PARALLEL_DOWNLOADS => + &(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _) + if downloading < MAX_PARALLEL_DOWNLOADS => (*start .. *start + *len, downloading), &(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start => (*start + r.len() .. cmp::min(*next_start, *start + r.len() + count), 0), // gap @@ -134,9 +136,13 @@ impl BlockCollection { } range.end = cmp::min(peer_best + One::one(), range.end); self.peer_requests.insert(who, range.start); - self.blocks.insert(range.start, BlockRangeState::Downloading { len: range.end - range.start, downloading: downloading + 1 }); + self.blocks.insert(range.start, BlockRangeState::Downloading { + len: range.end - range.start, + downloading: downloading + 1 + }); if range.end <= range.start { - panic!("Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}", range, count, peer_best, common, self.blocks); + panic!("Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}", + range, count, peer_best, common, self.blocks); } Some(range) } @@ -248,14 +254,17 @@ mod test { bc.insert(1, blocks[1..11].to_vec(), peer0.clone()); assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(11 .. 41)); - assert_eq!(bc.drain(1), blocks[1..11].iter().map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::>()); + assert_eq!(bc.drain(1), blocks[1..11].iter() + .map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::>()); bc.clear_peer_download(&peer0); bc.insert(11, blocks[11..41].to_vec(), peer0.clone()); let drained = bc.drain(12); - assert_eq!(drained[..30], blocks[11..41].iter().map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::>()[..]); - assert_eq!(drained[30..], blocks[41..81].iter().map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::>()[..]); + assert_eq!(drained[..30], blocks[11..41].iter() + .map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::>()[..]); + assert_eq!(drained[30..], blocks[41..81].iter() + .map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::>()[..]); bc.clear_peer_download(&peer2); assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80), Some(81 .. 121)); @@ -266,8 +275,10 @@ mod test { assert_eq!(bc.drain(80), vec![]); let drained = bc.drain(81); - assert_eq!(drained[..40], blocks[81..121].iter().map(|b| BlockData { block: b.clone(), origin: Some(peer2.clone()) }).collect::>()[..]); - assert_eq!(drained[40..], blocks[121..150].iter().map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::>()[..]); + assert_eq!(drained[..40], blocks[81..121].iter() + .map(|b| BlockData { block: b.clone(), origin: Some(peer2.clone()) }).collect::>()[..]); + assert_eq!(drained[40..], blocks[121..150].iter() + .map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::>()[..]); } #[test] diff --git a/substrate/core/network/src/sync/extra_requests.rs b/substrate/core/network/src/protocol/sync/extra_requests.rs similarity index 99% rename from substrate/core/network/src/sync/extra_requests.rs rename to substrate/core/network/src/protocol/sync/extra_requests.rs index ff8323166b..f41997a05c 100644 --- a/substrate/core/network/src/sync/extra_requests.rs +++ b/substrate/core/network/src/protocol/sync/extra_requests.rs @@ -23,8 +23,8 @@ use fork_tree::ForkTree; use network_libp2p::PeerId; use runtime_primitives::Justification; use runtime_primitives::traits::{Block as BlockT, NumberFor}; -use crate::message; -use crate::sync::{Context, PeerSync, PeerSyncState}; +use crate::protocol::message; +use crate::protocol::sync::{Context, PeerSync, PeerSyncState}; // Time to wait before trying to get the same extra data from the same peer. const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10); diff --git a/substrate/core/network/src/util.rs b/substrate/core/network/src/protocol/util.rs similarity index 100% rename from substrate/core/network/src/util.rs rename to substrate/core/network/src/protocol/util.rs diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 88c13b4806..6a328e854d 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -31,14 +31,14 @@ use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId use crate::AlwaysBadChecker; use crate::chain::FinalityProofProvider; -use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; -use crate::message::Message; -use crate::on_demand::RequestData; +use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; +use crate::protocol::message::Message; +use crate::protocol::on_demand::RequestData; use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer}; use crate::protocol::{ProtocolStatus, PeerInfo, NetworkOut}; use crate::config::Params; use crate::error::Error; -use crate::specialization::NetworkSpecialization; +use crate::protocol::specialization::NetworkSpecialization; /// Interval at which we send status updates on the SyncProvider status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000);