diff --git a/polkadot/network/Cargo.toml b/polkadot/network/Cargo.toml deleted file mode 100644 index e016be7d30..0000000000 --- a/polkadot/network/Cargo.toml +++ /dev/null @@ -1,30 +0,0 @@ -[package] -description = "Polkadot network protocol" -name = "polkadot-network" -version = "0.1.0" -license = "GPL-3.0" -authors = ["Parity Technologies "] - -[lib] - -[dependencies] -log = "0.3" -env_logger = "0.4" -rand = "0.3" -heapsize = "0.4" -semver = "0.6" -smallvec = { version = "0.4", features = ["heapsizeof"] } -parking_lot = "0.4" -ipnetwork = "0.12" -error-chain = "0.11" -bitflags = "1.0" -serde = "1.0" -serde_derive = "1.0" -serde_json = "1.0" -ethcore-network = { git = "https://github.com/paritytech/parity.git" } -ethcore-io = { git = "https://github.com/paritytech/parity.git" } -substrate-primitives = { path = "../../substrate/primitives" } -substrate-client = { path = "../../substrate/client" } -substrate-state-machine = { path = "../../substrate/state-machine" } -substrate-serializer = { path = "../../substrate/serializer" } -polkadot-primitives = { path = "../primitives" } diff --git a/polkadot/network/src/blocks.rs b/polkadot/network/src/blocks.rs deleted file mode 100644 index 8d0118222b..0000000000 --- a/polkadot/network/src/blocks.rs +++ /dev/null @@ -1,262 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see .? - -use std::mem; -use std::cmp; -use std::ops::Range; -use std::collections::{HashMap, BTreeMap}; -use std::collections::hash_map::Entry; -use network::PeerId; -use primitives::block::Number as BlockNumber; -use message; - -const MAX_PARALLEL_DOWNLOADS: u32 = 1; - -/// Block data with origin. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct BlockData { - pub block: message::BlockData, - pub origin: PeerId, -} - -#[derive(Debug)] -enum BlockRangeState { - Downloading { - len: BlockNumber, - downloading: u32, - }, - Complete(Vec), -} - -impl BlockRangeState { - pub fn len(&self) -> BlockNumber { - match *self { - BlockRangeState::Downloading { len, .. } => len, - BlockRangeState::Complete(ref blocks) => blocks.len() as BlockNumber, - } - } -} - -/// A collection of blocks being downloaded. -#[derive(Default)] -pub struct BlockCollection { - /// Downloaded blocks. - blocks: BTreeMap, - peer_requests: HashMap, -} - -impl BlockCollection { - /// Create a new instance. - pub fn new() -> BlockCollection { - BlockCollection { - blocks: BTreeMap::new(), - peer_requests: HashMap::new(), - } - } - - /// Clear everything. - pub fn clear(&mut self) { - self.blocks.clear(); - self.peer_requests.clear(); - } - - /// Insert a set of blocks into collection. - pub fn insert(&mut self, start: BlockNumber, blocks: Vec, peer_id: PeerId) { - if blocks.is_empty() { - return; - } - - match self.blocks.get(&start) { - Some(&BlockRangeState::Downloading { .. }) => { - trace!(target: "sync", "Ignored block data still marked as being downloaded: {}", start); - debug_assert!(false); - return; - }, - Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => { - trace!(target: "sync", "Ignored block data already downloaded: {}", start); - return; - }, - _ => (), - } - - self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter().map(|b| BlockData { origin: peer_id, block: b }).collect())); - } - - /// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded. - pub fn needed_blocks(&mut self, peer_id: PeerId, count: usize, peer_best: BlockNumber, common: BlockNumber) -> Option> { - // First block number that we need to download - let first_different = common + 1; - let count = count as BlockNumber; - let (mut range, downloading) = { - let mut downloading_iter = self.blocks.iter().peekable(); - let mut prev: Option<(&BlockNumber, &BlockRangeState)> = None; - loop { - let next = downloading_iter.next(); - break match &(prev, next) { - &(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _) if downloading < MAX_PARALLEL_DOWNLOADS => - (*start .. *start + *len, downloading), - &(Some((start, r)), Some((next_start, _))) if start + r.len() < *next_start => - (*start + r.len() .. cmp::min(*next_start, *start + count), 0), // gap - &(Some((start, r)), None) => - (start + r.len() .. start + r.len() + count, 0), // last range - &(None, None) => - (first_different .. first_different + count, 0), // empty - &(None, Some((start, _))) if *start > first_different => - (first_different .. cmp::min(first_different + count, *start), 0), // gap at the start - _ => { - prev = next; - continue - }, - } - } - }; - - // crop to peers best - if range.start >= peer_best { - return None; - } - range.end = cmp::min(peer_best, range.end); - - self.peer_requests.insert(peer_id, range.start); - self.blocks.insert(range.start, BlockRangeState::Downloading{ len: range.end - range.start, downloading: downloading + 1 }); - Some(range) - } - - /// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain. - pub fn drain(&mut self, from: BlockNumber) -> Vec { - let mut drained = Vec::new(); - let mut ranges = Vec::new(); - { - let mut prev = from; - for (start, range_data) in &mut self.blocks { - match range_data { - &mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => { - prev = *start + blocks.len() as BlockNumber; - let mut blocks = mem::replace(blocks, Vec::new()); - drained.append(&mut blocks); - ranges.push(*start); - }, - _ => break, - } - } - } - for r in ranges { - self.blocks.remove(&r); - } - trace!(target: "sync", "Drained {} blocks", drained.len()); - drained - } - - pub fn clear_peer_download(&mut self, peer_id: PeerId) { - match self.peer_requests.entry(peer_id) { - Entry::Occupied(entry) => { - let start = entry.remove(); - let remove = match self.blocks.get_mut(&start) { - Some(&mut BlockRangeState::Downloading { ref mut downloading, .. }) if *downloading > 1 => { - *downloading = *downloading - 1; - false - }, - Some(&mut BlockRangeState::Downloading { .. }) => { - true - }, - _ => { - debug_assert!(false); - false - } - }; - if remove { - self.blocks.remove(&start); - } - }, - _ => (), - } - } -} - -#[cfg(test)] -mod test { - use super::{BlockCollection, BlockData}; - use message; - use primitives::block::HeaderHash; - - fn is_empty(bc: &BlockCollection) -> bool { - bc.blocks.is_empty() && - bc.peer_requests.is_empty() - } - - fn generate_blocks(n: usize) -> Vec { - (0 .. n).map(|_| message::BlockData { - hash: HeaderHash::random(), - header: None, - body: None, - message_queue: None, - receipt: None, - }).collect() - } - - #[test] - fn create_clear() { - let mut bc = BlockCollection::new(); - assert!(is_empty(&bc)); - bc.insert(1, generate_blocks(100), 0); - assert!(!is_empty(&bc)); - bc.clear(); - assert!(is_empty(&bc)); - } - - #[test] - fn insert_blocks() { - let mut bc = BlockCollection::new(); - assert!(is_empty(&bc)); - let peer0 = 0; - let peer1 = 1; - let peer2 = 2; - - let blocks = generate_blocks(150); - assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(1 .. 41)); - assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(41 .. 81)); - assert_eq!(bc.needed_blocks(peer2, 40, 150, 0), Some(81 .. 121)); - - bc.clear_peer_download(peer1); - bc.insert(41, blocks[41..81].to_vec(), peer1); - assert_eq!(bc.drain(1), vec![]); - assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(121 .. 150)); - bc.clear_peer_download(peer0); - bc.insert(1, blocks[1..11].to_vec(), peer0); - - assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(11 .. 41)); - assert_eq!(bc.drain(1), blocks[1..11].iter().map(|b| BlockData { block: b.clone(), origin: 0 }).collect::>()); - - bc.clear_peer_download(peer0); - bc.insert(11, blocks[11..41].to_vec(), peer0); - - let drained = bc.drain(12); - assert_eq!(drained[..30], blocks[11..41].iter().map(|b| BlockData { block: b.clone(), origin: 0 }).collect::>()[..]); - assert_eq!(drained[30..], blocks[41..81].iter().map(|b| BlockData { block: b.clone(), origin: 1 }).collect::>()[..]); - - bc.clear_peer_download(peer2); - assert_eq!(bc.needed_blocks(peer2, 40, 150, 80), Some(81 .. 121)); - bc.clear_peer_download(peer2); - bc.insert(81, blocks[81..121].to_vec(), peer2); - bc.clear_peer_download(peer1); - bc.insert(121, blocks[121..150].to_vec(), peer1); - - assert_eq!(bc.drain(80), vec![]); - let drained = bc.drain(81); - assert_eq!(drained[..40], blocks[81..121].iter().map(|b| BlockData { block: b.clone(), origin: 2 }).collect::>()[..]); - assert_eq!(drained[40..], blocks[121..150].iter().map(|b| BlockData { block: b.clone(), origin: 1 }).collect::>()[..]); - } -} diff --git a/polkadot/network/src/chain.rs b/polkadot/network/src/chain.rs deleted file mode 100644 index 9390b727ec..0000000000 --- a/polkadot/network/src/chain.rs +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Blockchain access trait - -use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus}; -use client::error::Error; -use state_machine; -use primitives::block; - -pub trait Client : Send + Sync { - /// Given a hash return a header - fn import(&self, header: block::Header, body: Option) -> Result; - - /// Get blockchain info. - fn info(&self) -> Result; - - /// Get block status. - fn block_status(&self, hash: &block::HeaderHash) -> Result; - - /// Get block hash by number. - fn block_hash(&self, block_number: block::Number) -> Result, Error>; -} - -impl Client for PolkadotClient where - B: client::backend::Backend + Send + Sync + 'static, - E: state_machine::CodeExecutor + Send + Sync + 'static, -{ - fn import(&self, header: block::Header, body: Option) -> Result { - (self as &Client).import(header, body) - } - - fn info(&self) -> Result { - (self as &Client).info() - } - - fn block_status(&self, hash: &block::HeaderHash) -> Result { - (self as &Client).block_status(hash) - } - - fn block_hash(&self, block_number: block::Number) -> Result, Error> { - (self as &Client).block_hash(block_number) - } -} diff --git a/polkadot/network/src/config.rs b/polkadot/network/src/config.rs deleted file mode 100644 index b826213280..0000000000 --- a/polkadot/network/src/config.rs +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see .? - -use service::Role; - -/// Protocol configuration -pub struct ProtocolConfig { - pub roles: Role, -} - -impl Default for ProtocolConfig { - fn default() -> ProtocolConfig { - ProtocolConfig { - roles: Role::FULL, - } - } -} diff --git a/polkadot/network/src/error.rs b/polkadot/network/src/error.rs deleted file mode 100644 index 9583a29861..0000000000 --- a/polkadot/network/src/error.rs +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see .? - -use network::Error as NetworkError; -use client; - -error_chain! { - foreign_links { - Network(NetworkError) #[doc = "Devp2p error."]; - } - - links { - Client(client::error::Error, client::error::ErrorKind); - } - - errors { - } -} diff --git a/polkadot/network/src/io.rs b/polkadot/network/src/io.rs deleted file mode 100644 index 339e80e9ad..0000000000 --- a/polkadot/network/src/io.rs +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see .? - -use network::{NetworkContext, PeerId, Error as NetworkError, SessionInfo}; - -/// IO interface for the syncing handler. -/// Provides peer connection management and an interface to the blockchain client. -pub trait SyncIo { - /// Disable a peer - fn disable_peer(&mut self, peer_id: PeerId); - /// Disconnect peer - fn disconnect_peer(&mut self, peer_id: PeerId); - /// Send a packet to a peer. - fn send(&mut self, peer_id: PeerId, data: Vec) -> Result<(), NetworkError>; - /// Returns peer identifier string - fn peer_info(&self, peer_id: PeerId) -> String { - peer_id.to_string() - } - /// Returns information on p2p session - fn peer_session_info(&self, peer_id: PeerId) -> Option; - /// Check if the session is expired - fn is_expired(&self) -> bool; -} - -/// Wraps `NetworkContext` and the blockchain client -pub struct NetSyncIo<'s, 'h> where 'h: 's { - network: &'s NetworkContext<'h>, -} - -impl<'s, 'h> NetSyncIo<'s, 'h> { - /// Creates a new instance from the `NetworkContext` and the blockchain client reference. - pub fn new(network: &'s NetworkContext<'h>) -> NetSyncIo<'s, 'h> { - NetSyncIo { - network: network, - } - } -} - -impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { - fn disable_peer(&mut self, peer_id: PeerId) { - self.network.disable_peer(peer_id); - } - - fn disconnect_peer(&mut self, peer_id: PeerId) { - self.network.disconnect_peer(peer_id); - } - - fn send(&mut self, peer_id: PeerId, data: Vec) -> Result<(), NetworkError>{ - self.network.send(peer_id, 0, data) - } - - fn peer_session_info(&self, peer_id: PeerId) -> Option { - self.network.session_info(peer_id) - } - - fn is_expired(&self) -> bool { - self.network.is_expired() - } - - fn peer_info(&self, peer_id: PeerId) -> String { - self.network.peer_client_version(peer_id) - } -} - - diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs deleted file mode 100644 index bdd7b41ca6..0000000000 --- a/polkadot/network/src/lib.rs +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see .? - -#![warn(missing_docs)] - -//! Implements polkadot protocol version as specified here: -//! https://github.com/paritytech/polkadot/wiki/Network-protocol - -extern crate ethcore_network as network; -extern crate ethcore_io as core_io; -extern crate env_logger; -extern crate rand; -extern crate semver; -extern crate parking_lot; -extern crate smallvec; -extern crate ipnetwork; -extern crate substrate_primitives as primitives; -extern crate substrate_state_machine as state_machine; -extern crate substrate_serializer as ser; -extern crate serde; -extern crate serde_json; -// TODO: remove these two; split off dependent logic into polkadot-network and rename this crate -// to substrate-network. -extern crate polkadot_primitives as polkadot_primitives; -extern crate substrate_client as client; -#[macro_use] extern crate serde_derive; -#[macro_use] extern crate log; -#[macro_use] extern crate bitflags; -#[macro_use] extern crate error_chain; - -mod service; -mod sync; -mod protocol; -mod io; -mod message; -mod error; -mod config; -mod chain; -mod blocks; - -#[cfg(test)] -mod test; - -pub use service::Service; -pub use protocol::{ProtocolStatus}; -pub use network::{NonReservedPeerMode, ConnectionFilter, ConnectionDirection, NetworkConfiguration}; - -// TODO: move it elsewhere -fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash { - primitives::hashing::blake2_256(&ser::to_vec(header)).into() -} diff --git a/polkadot/network/src/message.rs b/polkadot/network/src/message.rs deleted file mode 100644 index feefb56595..0000000000 --- a/polkadot/network/src/message.rs +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see .? - -//! Network packet message types. These get serialized and put into the lower level protocol payload. - -use std::borrow::Borrow; -use primitives::AuthorityId; -use primitives::block::{Number as BlockNumber, HeaderHash, Header, Body}; -use service::Role as RoleFlags; -use polkadot_primitives::parachain::Id as ParachainId; - -pub type RequestId = u64; -type Bytes = Vec; - -type Signature = ::primitives::hash::H256; //TODO: - -/// Configured node role. -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub enum Role { - /// Full relay chain client with no additional responsibilities. - Full, - /// Relay chain light client. - Light, - /// Parachain validator. - Validator, - /// Parachain collator. - Collator, -} - -impl From for RoleFlags where T: Borrow<[Role]> { - fn from(roles: T) -> RoleFlags { - let mut flags = RoleFlags::NONE; - let roles: &[Role] = roles.borrow(); - for r in roles { - match *r { - Role::Full => flags = flags | RoleFlags::FULL, - Role::Light => flags = flags | RoleFlags::LIGHT, - Role::Validator => flags = flags | RoleFlags::VALIDATOR, - Role::Collator => flags = flags | RoleFlags::COLLATOR, - } - } - flags - } -} - -impl From for Vec where { - fn from(flags: RoleFlags) -> Vec { - let mut roles = Vec::new(); - if !(flags & RoleFlags::FULL).is_empty() { - roles.push(Role::Full); - } - if !(flags & RoleFlags::LIGHT).is_empty() { - roles.push(Role::Light); - } - if !(flags & RoleFlags::VALIDATOR).is_empty() { - roles.push(Role::Validator); - } - if !(flags & RoleFlags::COLLATOR).is_empty() { - roles.push(Role::Collator); - } - roles - } -} - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Copy, Clone)] -/// Bits of block data and associated artefacts to request. -pub enum BlockAttribute { - /// Include block header. - Header, - /// Include block body. - Body, - /// Include block receipt. - Receipt, - /// Include block message queue. - MessageQueue, -} - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -/// Block data sent in the response. -pub struct BlockData { - /// Block header hash. - pub hash: HeaderHash, - /// Block header if requested. - pub header: Option
, - /// Block body if requested. - pub body: Option, - /// Block receipt if requested. - pub receipt: Option, - /// Block message queue if requested. - pub message_queue: Option, -} - -#[serde(untagged)] -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -/// Identifies starting point of a block sequence. -pub enum FromBlock { - /// Start with given hash. - Hash(HeaderHash), - /// Start with given block number. - Number(BlockNumber), -} - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -/// Block enumeration direction. -pub enum Direction { - /// Enumerate in ascending order (from child to parent). - Ascending, - /// Enumerate in descendfing order (from parent to canonical child). - Descending, -} - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -/// A network message. -pub enum Message { - /// Status packet. - Status(Status), - /// Block request. - BlockRequest(BlockRequest), - /// Block response. - BlockResponse(BlockResponse), - /// Block announce. - BlockAnnounce(BlockAnnounce), -} - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct Status { - /// Protocol version. - pub version: u32, - /// Supported roles. - pub roles: Vec, - /// Best block number. - pub best_number: BlockNumber, - /// Best block hash. - pub best_hash: HeaderHash, - /// Genesis block hash. - pub genesis_hash: HeaderHash, - /// Signatue of `best_hash` made with validator address. Required for the validator role. - pub validator_signature: Option, - /// Validator address. Required for the validator role. - pub validator_id: Option, - /// Parachain id. Required for the collator role. - pub parachain_id: Option, -} - -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] -/// Request block data from a peer. -pub struct BlockRequest { - /// Unique request id. - pub id: RequestId, - /// Bits of block data to request. - pub fields: Vec, - /// Start from this block. - pub from: FromBlock, - /// End at this block. An implementation defined maximum is used when unspecified. - pub to: Option, - /// Sequence direction. - pub direction: Direction, - /// Maximum number of block to return. An implementation defined maximum is used when unspecified. - pub max: Option, -} - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -/// Response to `BlockRequest` -pub struct BlockResponse { - /// Id of a request this response was made for. - pub id: RequestId, - /// Block data for the requested sequence. - pub blocks: Vec, -} - -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -/// Announce a new complete relay chain block on the network. -pub struct BlockAnnounce { - /// New block header. - pub header: Header, -} diff --git a/polkadot/network/src/protocol.rs b/polkadot/network/src/protocol.rs deleted file mode 100644 index bd9dd614fe..0000000000 --- a/polkadot/network/src/protocol.rs +++ /dev/null @@ -1,344 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see .? - -use std::collections::{HashMap, HashSet, BTreeMap}; -use std::mem; -use std::sync::Arc; -use parking_lot::RwLock; -use serde_json; -use std::time; -use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header}; -use network::{PeerId, NodeId}; - -use message::{self, Message}; -use sync::{ChainSync, Status as SyncStatus}; -use service::Role; -use config::ProtocolConfig; -use chain::Client; -use io::SyncIo; -use error; - -const REQUEST_TIMEOUT_SEC: u64 = 15; -const PROTOCOL_VERSION: u32 = 0; - -// Lock must always be taken in order declared here. -pub struct Protocol { - config: ProtocolConfig, - chain: Arc, - genesis_hash: HeaderHash, - sync: RwLock, - /// All connected peers - peers: RwLock>, - /// Connected peers pending Status message. - handshaking_peers: RwLock>, -} - -/// Syncing status and statistics -#[derive(Clone, Copy)] -pub struct ProtocolStatus { - /// Sync status. - pub sync: SyncStatus, - /// Total number of connected peers - pub num_peers: usize, - /// Total number of active peers. - pub num_active_peers: usize, -} - -/// Peer information -struct Peer { - /// Protocol version - protocol_version: u32, - /// Roles - roles: Role, - /// Peer best block hash - best_hash: HeaderHash, - /// Peer best block number - best_number: BlockNumber, - /// Pending block request if any - block_request: Option, - /// Request timestamp - request_timestamp: Option, - /// Holds a set of transactions recently sent to this peer to avoid spamming. - _last_sent_transactions: HashSet, - /// Request counter, - request_id: message::RequestId, -} - -#[derive(Debug)] -pub struct PeerInfo { - /// Roles - pub roles: Role, - /// Protocol version - pub protocol_version: u32, - /// Peer best block hash - pub best_hash: HeaderHash, - /// Peer best block number - pub best_number: BlockNumber, -} - -/// Transaction stats -#[derive(Debug)] -pub struct TransactionStats { - /// Block number where this TX was first seen. - pub first_seen: u64, - /// Peers it was propagated to. - pub propagated_to: BTreeMap, -} - -impl Protocol { - /// Create a new instance. - pub fn new(config: ProtocolConfig, chain: Arc) -> error::Result { - let info = chain.info()?; - let protocol = Protocol { - config: config, - chain: chain, - genesis_hash: info.chain.genesis_hash, - sync: RwLock::new(ChainSync::new(&info)), - peers: RwLock::new(HashMap::new()), - handshaking_peers: RwLock::new(HashMap::new()), - }; - Ok(protocol) - } - - /// Returns protocol status - pub fn status(&self) -> ProtocolStatus { - let sync = self.sync.read(); - let peers = self.peers.read(); - ProtocolStatus { - sync: sync.status(), - num_peers: peers.values().count(), - num_active_peers: peers.values().filter(|p| p.block_request.is_some()).count(), - } - } - - pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, data: &[u8]) { - let message: Message = match serde_json::from_slice(data) { - Ok(m) => m, - Err(e) => { - debug!("Invalid packet from {}: {}", peer_id, e); - io.disable_peer(peer_id); - return; - } - }; - - match message { - Message::Status(s) => self.on_status_message(io, peer_id, s), - Message::BlockRequest(r) => self.on_block_request(io, peer_id, r), - Message::BlockResponse(r) => { - let request = { - let mut peers = self.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&peer_id) { - peer.request_timestamp = None; - match mem::replace(&mut peer.block_request, None) { - Some(r) => r, - None => { - debug!("Unexpected response packet from {}", peer_id); - io.disable_peer(peer_id); - return; - } - } - } else { - debug!("Unexpected packet from {}", peer_id); - io.disable_peer(peer_id); - return; - } - }; - if request.id != r.id { - trace!("Ignoring mismatched response packet from {}", peer_id); - return; - } - self.on_block_response(io, peer_id, request, r); - }, - Message::BlockAnnounce(announce) => { - self.on_block_announce(io, peer_id, announce); - } - } - } - - pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, mut message: Message) { - let mut peers = self.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&peer_id) { - match &mut message { - &mut Message::BlockRequest(ref mut r) => { - peer.block_request = Some(r.clone()); - peer.request_timestamp = Some(time::Instant::now()); - r.id = peer.request_id; - peer.request_id = peer.request_id + 1; - }, - _ => (), - } - } - let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed"); - if let Err(e) = io.send(peer_id, data) { - debug!(target:"sync", "Error sending message: {:?}", e); - io.disconnect_peer(peer_id); - } - } - - /// Called when a new peer is connected - pub fn on_peer_connected(&self, io: &mut SyncIo, peer_id: PeerId) { - trace!(target: "sync", "Connected {}: {}", peer_id, io.peer_info(peer_id)); - self.handshaking_peers.write().insert(peer_id, time::Instant::now()); - self.send_status(io, peer_id); - } - - /// Called by peer when it is disconnecting - pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: PeerId) { - trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_info(peer)); - let removed = { - let mut peers = self.peers.write(); - let mut handshaking_peers = self.handshaking_peers.write(); - handshaking_peers.remove(&peer); - peers.remove(&peer).is_some() - }; - if removed { - self.sync.write().peer_disconnected(io, self, peer); - } - } - - pub fn on_block_request(&self, _io: &mut SyncIo, _peer_id: PeerId, _request: message::BlockRequest) { - } - - pub fn on_block_response(&self, io: &mut SyncIo, peer_id: PeerId, request: message::BlockRequest, response: message::BlockResponse) { - // TODO: validate response - self.sync.write().on_block_data(io, self, peer_id, request, response); - } - - pub fn tick(&self, io: &mut SyncIo) { - self.maintain_peers(io); - } - - fn maintain_peers(&self, io: &mut SyncIo) { - let tick = time::Instant::now(); - let mut aborting = Vec::new(); - { - let peers = self.peers.read(); - let handshaking_peers = self.handshaking_peers.read(); - for (peer_id, timestamp) in peers.iter() - .filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r))) - .chain(handshaking_peers.iter()) { - if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC { - trace!(target:"sync", "Timeout {}", peer_id); - io.disconnect_peer(*peer_id); - aborting.push(*peer_id); - } - } - } - for p in aborting { - self.on_peer_disconnected(io, p); - } - } - - pub fn peer_info(&self, peer: PeerId) -> Option { - self.peers.read().get(&peer).map(|p| { - PeerInfo { - roles: p.roles, - protocol_version: p.protocol_version, - best_hash: p.best_hash, - best_number: p.best_number, - } - }) - } - - /// Called by peer to report status - fn on_status_message(&self, io: &mut SyncIo, peer_id: PeerId, status: message::Status) { - trace!(target: "sync", "New peer {} {:?}", peer_id, status); - if io.is_expired() { - trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id)); - return; - } - - { - let mut peers = self.peers.write(); - let mut handshaking_peers = self.handshaking_peers.write(); - if peers.contains_key(&peer_id) { - debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); - return; - } - if status.genesis_hash != self.genesis_hash { - io.disable_peer(peer_id); - trace!(target: "sync", "Peer {} genesis hash mismatch (ours: {}, theirs: {})", peer_id, self.genesis_hash, status.genesis_hash); - return; - } - if status.version != PROTOCOL_VERSION { - io.disable_peer(peer_id); - trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, status.version); - return; - } - - let peer = Peer { - protocol_version: status.version, - roles: status.roles.into(), - best_hash: status.best_hash, - best_number: status.best_number, - block_request: None, - request_timestamp: None, - _last_sent_transactions: HashSet::new(), - request_id: 0, - }; - peers.insert(peer_id.clone(), peer); - handshaking_peers.remove(&peer_id); - debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id)); - } - self.sync.write().new_peer(io, self, peer_id); - } - - /// Send Status message - fn send_status(&self, io: &mut SyncIo, peer_id: PeerId) { - if let Ok(info) = self.chain.info() { - let status = message::Status { - version: PROTOCOL_VERSION, - genesis_hash: info.chain.genesis_hash, - roles: self.config.roles.into(), - best_number: info.chain.best_number, - best_hash: info.chain.best_hash, - validator_signature: None, - validator_id: None, - parachain_id: None, - }; - self.send_message(io, peer_id, Message::Status(status)) - } - } - - pub fn abort(&self) { - let mut sync = self.sync.write(); - let mut peers = self.peers.write(); - let mut handshaking_peers = self.handshaking_peers.write(); - sync.clear(); - peers.clear(); - handshaking_peers.clear(); - } - - pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce) { - let header = announce.header; - self.sync.write().on_block_announce(io, self, peer_id, &header); - } - - pub fn on_block_imported(&self, header: &Header) { - self.sync.write().update_chain_info(&header); - } - - pub fn on_new_transactions(&self) { - } - - pub fn transactions_stats(&self) -> BTreeMap { - BTreeMap::new() - } - - pub fn chain(&self) -> &Client { - &*self.chain - } -} diff --git a/polkadot/network/src/service.rs b/polkadot/network/src/service.rs deleted file mode 100644 index e3832249db..0000000000 --- a/polkadot/network/src/service.rs +++ /dev/null @@ -1,239 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see .? - -use std::sync::Arc; -use std::collections::{BTreeMap}; -use std::io; -use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId, -NetworkConfiguration , NonReservedPeerMode, ErrorKind}; -use primitives::block::{TransactionHash, Header}; -use core_io::{TimerToken}; -use io::NetSyncIo; -use protocol::{Protocol, ProtocolStatus, PeerInfo as ProtocolPeerInfo, TransactionStats}; -use config::{ProtocolConfig}; -use error::Error; -use chain::Client; - -/// Polkadot devp2p protocol id -pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot"; - -bitflags! { - pub struct Role: u32 { - const NONE = 0b00000000; - const FULL = 0b00000001; - const LIGHT = 0b00000010; - const VALIDATOR = 0b00000100; - const COLLATOR = 0b00001000; - } -} - -/// Sync status -pub trait SyncProvider: Send + Sync { - /// Get sync status - fn status(&self) -> ProtocolStatus; - /// Get peers information - fn peers(&self) -> Vec; - /// Get this node id if available. - fn node_id(&self) -> Option; - /// Returns propagation count for pending transactions. - fn transactions_stats(&self) -> BTreeMap; -} - -/// Peer connection information -#[derive(Debug)] -pub struct PeerInfo { - /// Public node id - pub id: Option, - /// Node client ID - pub client_version: String, - /// Capabilities - pub capabilities: Vec, - /// Remote endpoint address - pub remote_address: String, - /// Local endpoint address - pub local_address: String, - /// Dot protocol info. - pub dot_info: Option, -} - -/// Service initialization parameters. -pub struct Params { - /// Configuration. - pub config: ProtocolConfig, - /// Network layer configuration. - pub network_config: NetworkConfiguration, - /// Polkadot relay chain access point. - pub chain: Arc, -} - -/// Polkadot network service. Handles network IO and manages connectivity. -pub struct Service { - /// Network service - network: NetworkService, - /// Devp2p protocol handler - handler: Arc, -} - -impl Service { - /// Creates and register protocol with the network service - pub fn new(params: Params) -> Result, Error> { - - let service = NetworkService::new(params.network_config.clone(), None)?; - - let sync = Arc::new(Service { - network: service, - handler: Arc::new(ProtocolHandler { - protocol: Protocol::new(params.config, params.chain.clone())?, - }), - }); - - Ok(sync) - } - - /// Called when a new block is imported by the client. - pub fn on_block_imported(&self, header: &Header) { - self.handler.protocol.on_block_imported(header) - } - - /// Called when new transactons are imported by the client. - pub fn on_new_transactions(&self) { - self.handler.protocol.on_new_transactions() - } - - fn start(&self) { - match self.network.start().map_err(Into::into) { - Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse => - warn!("Network port {:?} is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set.")), - Err(err) => warn!("Error starting network: {}", err), - _ => {}, - }; - self.network.register_protocol(self.handler.clone(), DOT_PROTOCOL_ID, 1, &[0u8]) - .unwrap_or_else(|e| warn!("Error registering polkadot protocol: {:?}", e)); - } - - fn stop(&self) { - self.handler.protocol.abort(); - self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); - } -} - -impl SyncProvider for Service { - /// Get sync status - fn status(&self) -> ProtocolStatus { - self.handler.protocol.status() - } - - /// Get sync peers - fn peers(&self) -> Vec { - self.network.with_context_eval(DOT_PROTOCOL_ID, |ctx| { - let peer_ids = self.network.connected_peers(); - - peer_ids.into_iter().filter_map(|peer_id| { - let session_info = match ctx.session_info(peer_id) { - None => return None, - Some(info) => info, - }; - - Some(PeerInfo { - id: session_info.id.map(|id| format!("{:x}", id)), - client_version: session_info.client_version, - capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(), - remote_address: session_info.remote_address, - local_address: session_info.local_address, - dot_info: self.handler.protocol.peer_info(peer_id), - }) - }).collect() - }).unwrap_or_else(Vec::new) - } - - fn node_id(&self) -> Option { - self.network.external_url() - } - - fn transactions_stats(&self) -> BTreeMap { - self.handler.protocol.transactions_stats() - } -} - -struct ProtocolHandler { - /// Protocol handler - protocol: Protocol, -} - -impl NetworkProtocolHandler for ProtocolHandler { - fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { - io.register_timer(0, 1000).expect("Error registering sync timer"); - } - - fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) { - self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data); - } - - fn connected(&self, io: &NetworkContext, peer: &PeerId) { - self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer); - } - - fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer); - } - - fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { - self.protocol.tick(&mut NetSyncIo::new(io)); - } -} - -/// Trait for managing network -pub trait ManageNetwork : Send + Sync { - /// Set to allow unreserved peers to connect - fn accept_unreserved_peers(&self); - /// Set to deny unreserved peers to connect - fn deny_unreserved_peers(&self); - /// Remove reservation for the peer - fn remove_reserved_peer(&self, peer: String) -> Result<(), String>; - /// Add reserved peer - fn add_reserved_peer(&self, peer: String) -> Result<(), String>; - /// Start network - fn start_network(&self); - /// Stop network - fn stop_network(&self); -} - - -impl ManageNetwork for Service { - fn accept_unreserved_peers(&self) { - self.network.set_non_reserved_mode(NonReservedPeerMode::Accept); - } - - fn deny_unreserved_peers(&self) { - self.network.set_non_reserved_mode(NonReservedPeerMode::Deny); - } - - fn remove_reserved_peer(&self, peer: String) -> Result<(), String> { - self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) - } - - fn add_reserved_peer(&self, peer: String) -> Result<(), String> { - self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) - } - - fn start_network(&self) { - self.start(); - } - - fn stop_network(&self) { - self.stop(); - } -} diff --git a/polkadot/network/src/sync.rs b/polkadot/network/src/sync.rs deleted file mode 100644 index e7d2847b83..0000000000 --- a/polkadot/network/src/sync.rs +++ /dev/null @@ -1,368 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see .? - -use std::collections::HashMap; -use io::SyncIo; -use protocol::Protocol; -use network::PeerId; -use client::{ImportResult, BlockStatus, ClientInfo}; -use primitives::block::{HeaderHash, Number as BlockNumber, Header}; -use blocks::{self, BlockCollection}; -use message::{self, Message}; -use super::header_hash; - -// Maximum parallel requests for a block. -const MAX_BLOCK_DOWNLOAD: usize = 1; - -struct PeerSync { - pub common_hash: HeaderHash, - pub common_number: BlockNumber, - pub best_hash: HeaderHash, - pub best_number: BlockNumber, - pub state: PeerSyncState, -} - -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub enum PeerSyncState { - AncestorSearch(BlockNumber), - Available, - DownloadingNew(BlockNumber), - DownloadingStale(HeaderHash), -} - -/// Relay chain sync strategy. -pub struct ChainSync { - genesis_hash: HeaderHash, - peers: HashMap, - blocks: BlockCollection, - best_queued_number: BlockNumber, - best_queued_hash: HeaderHash, - required_block_attributes: Vec, -} - -/// Syncing status and statistics -#[derive(Clone, Copy)] -pub struct Status; - -impl ChainSync { - pub fn new(info: &ClientInfo) -> ChainSync { - ChainSync { - genesis_hash: info.chain.genesis_hash, - peers: HashMap::new(), - blocks: BlockCollection::new(), - best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash), - best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number), - required_block_attributes: vec![message::BlockAttribute::Header, message::BlockAttribute::Body], - } - } - - /// Returns sync status - pub fn status(&self) -> Status { - Status - } - - pub fn new_peer(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { - if let Some(info) = protocol.peer_info(peer_id) { - match (protocol.chain().block_status(&info.best_hash), info.best_number) { - (Err(e), _) => { - debug!(target:"sync", "Error reading blockchain: {:?}", e); - io.disconnect_peer(peer_id); - }, - (Ok(BlockStatus::KnownBad), _) => { - debug!(target:"sync", "New peer with known bad best block {} ({}).", info.best_hash, info.best_number); - io.disable_peer(peer_id); - }, - (Ok(BlockStatus::Unknown), 0) => { - debug!(target:"sync", "New peer with unkown genesis hash {} ({}).", info.best_hash, info.best_number); - io.disable_peer(peer_id); - }, - (Ok(BlockStatus::Unknown), _) => { - debug!(target:"sync", "New peer with unkown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number); - self.peers.insert(peer_id, PeerSync { - common_hash: self.genesis_hash, - common_number: 0, - best_hash: info.best_hash, - best_number: info.best_number, - state: PeerSyncState::AncestorSearch(info.best_number - 1), - }); - Self::request_ancestry(io, protocol, peer_id, info.best_number - 1) - }, - (Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChain), _) => { - debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number); - self.peers.insert(peer_id, PeerSync { - common_hash: info.best_hash, - common_number: info.best_number, - best_hash: info.best_hash, - best_number: info.best_number, - state: PeerSyncState::Available, - }); - } - } - } - } - - pub fn on_block_data(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, _request: message::BlockRequest, response: message::BlockResponse) { - let count = response.blocks.len(); - let mut imported: usize = 0; - let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - match peer.state { - PeerSyncState::DownloadingNew(start_block) => { - self.blocks.clear_peer_download(peer_id); - peer.state = PeerSyncState::Available; - - self.blocks.insert(start_block, response.blocks, peer_id); - self.blocks.drain(self.best_queued_number + 1) - }, - PeerSyncState::DownloadingStale(_) => { - peer.state = PeerSyncState::Available; - response.blocks.into_iter().map(|b| blocks::BlockData { - origin: peer_id, - block: b - }).collect() - }, - PeerSyncState::AncestorSearch(n) => { - match response.blocks.get(0) { - Some(ref block) => { - match protocol.chain().block_hash(n) { - Ok(Some(block_hash)) if block_hash == block.hash => { - peer.common_hash = block.hash; - peer.common_number = n; - peer.state = PeerSyncState::Available; - trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", peer_id, block.hash, n); - vec![] - }, - Ok(_) if n > 0 => { - let n = n - 1; - peer.state = PeerSyncState::AncestorSearch(n); - Self::request_ancestry(io, protocol, peer_id, n); - return; - }, - Ok(_) => { // genesis mismatch - io.disable_peer(peer_id); - return; - }, - Err(e) => { - debug!(target:"sync", "Error reading blockchain: {:?}", e); - io.disconnect_peer(peer_id); - return; - } - } - }, - None => { - trace!(target:"sync", "Invalid response when searching for ancestor from {}", peer_id); - io.disconnect_peer(peer_id); - return; - } - } - }, - PeerSyncState::Available => Vec::new(), - } - } else { - vec![] - }; - - // Blocks in the response/drain should be in ascending order. - for block in new_blocks { - let origin = block.origin; - let block = block.block; - if let Some(header) = block.header { - let number = header.number; - let hash = header_hash(&header); - let parent = header.parent_hash; - let result = protocol.chain().import(header, block.body); - match result { - Ok(ImportResult::AlreadyInChain) => { - trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); - self.block_imported(&hash, number); - }, - Ok(ImportResult::AlreadyQueued) => { - trace!(target: "sync", "Block already queued {}: {:?}", number, hash); - self.block_imported(&hash, number); - }, - Ok(ImportResult::Queued) => { - trace!(target: "sync", "Block queued {}: {:?}", number, hash); - self.block_imported(&hash, number); - imported = imported + 1; - }, - Ok(ImportResult::UnknownParent) => { - debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent); - self.restart(io, protocol); - return; - }, - Ok(ImportResult::KnownBad) => { - debug!(target: "sync", "Bad block {}: {:?}", number, hash); - io.disable_peer(origin); //TODO: use persistent ID - self.restart(io, protocol); - return; - } - Err(e) => { - debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e); - self.restart(io, protocol); - return; - } - } - } - } - trace!(target: "sync", "Imported {} of {}", imported, count); - self.maintain_sync(io, protocol); - } - - fn maintain_sync(&mut self, io: &mut SyncIo, protocol: &Protocol) { - let peers: Vec = self.peers.keys().map(|p| *p).collect(); - for peer in peers { - self.download_new(io, protocol, peer); - } - } - - fn block_imported(&mut self, hash: &HeaderHash, number: BlockNumber) { - if number > self.best_queued_number { - self.best_queued_number = number; - self.best_queued_hash = *hash; - } - // Update common blocks - for (_, peer) in self.peers.iter_mut() { - if peer.best_number >= number { - peer.common_number = number; - peer.common_hash = *hash; - } - } - } - - pub fn update_chain_info(&mut self, best_header: &Header ) { - let hash = header_hash(&best_header); - self.block_imported(&hash, best_header.number) - } - - pub fn on_block_announce(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, header: &Header) { - let hash = header_hash(&header); - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - if header.number > peer.best_number { - peer.best_number = header.number; - peer.best_hash = hash; - } - } else { - return; - } - - if !self.is_known_or_already_downloading(protocol, &hash) { - let stale = header.number <= self.best_queued_number; - if stale { - if !self.is_known_or_already_downloading(protocol, &header.parent_hash) { - trace!(target: "sync", "Ignoring unkown stale block announce from {}: {} {:?}", peer_id, hash, header); - } else { - trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", peer_id, hash, header); - self.download_stale(io, protocol, peer_id, &hash); - } - } else { - trace!(target: "sync", "Downloading new block announced from {}: {} {:?}", peer_id, hash, header); - self.download_new(io, protocol, peer_id); - } - } else { - trace!(target: "sync", "Known block announce from {}: {}", peer_id, hash); - } - } - - fn is_known_or_already_downloading(&self, protocol: &Protocol, hash: &HeaderHash) -> bool { - self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) - || protocol.chain().block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown) - } - - pub fn peer_disconnected(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { - self.blocks.clear_peer_download(peer_id); - self.peers.remove(&peer_id); - self.maintain_sync(io, protocol); - } - - pub fn restart(&mut self, io: &mut SyncIo, protocol: &Protocol) { - self.blocks.clear(); - let ids: Vec = self.peers.keys().map(|p| *p).collect(); - for id in ids { - self.new_peer(io, protocol, id); - } - match protocol.chain().info() { - Ok(info) => { - self.best_queued_hash = info.best_queued_hash.unwrap_or(info.chain.best_hash); - self.best_queued_number = info.best_queued_number.unwrap_or(info.chain.best_number); - }, - Err(e) => { - debug!(target:"sync", "Error reading blockchain: {:?}", e); - self.best_queued_hash = self.genesis_hash; - self.best_queued_number = 0; - } - } - } - - pub fn clear(&mut self) { - self.blocks.clear(); - self.peers.clear(); - } - - // Download old block. - fn download_stale(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, hash: &HeaderHash) { - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - match peer.state { - PeerSyncState::Available => { - let request = message::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Hash(*hash), - to: None, - direction: message::Direction::Ascending, - max: Some(1), - }; - peer.state = PeerSyncState::DownloadingStale(*hash); - protocol.send_message(io, peer_id, Message::BlockRequest(request)); - }, - _ => (), - } - } - } - - // Issue a request for a peer to download new blocks, if any are available - fn download_new(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - match peer.state { - PeerSyncState::Available => { - if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCK_DOWNLOAD, peer.common_number, peer.best_number) { - let request = message::BlockRequest { - id: 0, - fields: self.required_block_attributes.clone(), - from: message::FromBlock::Number(range.start), - to: None, - direction: message::Direction::Ascending, - max: Some((range.end - range.start) as u32), - }; - peer.state = PeerSyncState::DownloadingNew(range.start); - protocol.send_message(io, peer_id, Message::BlockRequest(request)); - } - }, - _ => (), - } - } - } - - fn request_ancestry(io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, block: BlockNumber) { - let request = message::BlockRequest { - id: 0, - fields: vec![message::BlockAttribute::Header], - from: message::FromBlock::Number(block), - to: None, - direction: message::Direction::Ascending, - max: Some(1), - }; - protocol.send_message(io, peer_id, Message::BlockRequest(request)); - } -} diff --git a/polkadot/network/src/test/mod.rs b/polkadot/network/src/test/mod.rs deleted file mode 100644 index e69de29bb2..0000000000