Networking tests and fixes (#61)

* BlockId in client interface

* Sync fixes and tests

* Updated to latest primitives

* Updated dependencies

* Updated for new (old) primitives

* Network as workspace member

* substrate-network

* Removed obsolete file

* begin_transaction on hash
This commit is contained in:
Arkadiy Paronyan
2018-02-08 17:49:55 +01:00
committed by GitHub
parent 1a8fe81cfe
commit 7496062052
12 changed files with 0 additions and 1692 deletions
-30
View File
@@ -1,30 +0,0 @@
[package]
description = "Polkadot network protocol"
name = "polkadot-network"
version = "0.1.0"
license = "GPL-3.0"
authors = ["Parity Technologies <admin@parity.io>"]
[lib]
[dependencies]
log = "0.3"
env_logger = "0.4"
rand = "0.3"
heapsize = "0.4"
semver = "0.6"
smallvec = { version = "0.4", features = ["heapsizeof"] }
parking_lot = "0.4"
ipnetwork = "0.12"
error-chain = "0.11"
bitflags = "1.0"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
ethcore-network = { git = "https://github.com/paritytech/parity.git" }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-client = { path = "../../substrate/client" }
substrate-state-machine = { path = "../../substrate/state-machine" }
substrate-serializer = { path = "../../substrate/serializer" }
polkadot-primitives = { path = "../primitives" }
-262
View File
@@ -1,262 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use std::mem;
use std::cmp;
use std::ops::Range;
use std::collections::{HashMap, BTreeMap};
use std::collections::hash_map::Entry;
use network::PeerId;
use primitives::block::Number as BlockNumber;
use message;
const MAX_PARALLEL_DOWNLOADS: u32 = 1;
/// Block data with origin.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData {
pub block: message::BlockData,
pub origin: PeerId,
}
#[derive(Debug)]
enum BlockRangeState {
Downloading {
len: BlockNumber,
downloading: u32,
},
Complete(Vec<BlockData>),
}
impl BlockRangeState {
pub fn len(&self) -> BlockNumber {
match *self {
BlockRangeState::Downloading { len, .. } => len,
BlockRangeState::Complete(ref blocks) => blocks.len() as BlockNumber,
}
}
}
/// A collection of blocks being downloaded.
#[derive(Default)]
pub struct BlockCollection {
/// Downloaded blocks.
blocks: BTreeMap<BlockNumber, BlockRangeState>,
peer_requests: HashMap<PeerId, BlockNumber>,
}
impl BlockCollection {
/// Create a new instance.
pub fn new() -> BlockCollection {
BlockCollection {
blocks: BTreeMap::new(),
peer_requests: HashMap::new(),
}
}
/// Clear everything.
pub fn clear(&mut self) {
self.blocks.clear();
self.peer_requests.clear();
}
/// Insert a set of blocks into collection.
pub fn insert(&mut self, start: BlockNumber, blocks: Vec<message::BlockData>, peer_id: PeerId) {
if blocks.is_empty() {
return;
}
match self.blocks.get(&start) {
Some(&BlockRangeState::Downloading { .. }) => {
trace!(target: "sync", "Ignored block data still marked as being downloaded: {}", start);
debug_assert!(false);
return;
},
Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => {
trace!(target: "sync", "Ignored block data already downloaded: {}", start);
return;
},
_ => (),
}
self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter().map(|b| BlockData { origin: peer_id, block: b }).collect()));
}
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
pub fn needed_blocks(&mut self, peer_id: PeerId, count: usize, peer_best: BlockNumber, common: BlockNumber) -> Option<Range<BlockNumber>> {
// First block number that we need to download
let first_different = common + 1;
let count = count as BlockNumber;
let (mut range, downloading) = {
let mut downloading_iter = self.blocks.iter().peekable();
let mut prev: Option<(&BlockNumber, &BlockRangeState)> = None;
loop {
let next = downloading_iter.next();
break match &(prev, next) {
&(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _) if downloading < MAX_PARALLEL_DOWNLOADS =>
(*start .. *start + *len, downloading),
&(Some((start, r)), Some((next_start, _))) if start + r.len() < *next_start =>
(*start + r.len() .. cmp::min(*next_start, *start + count), 0), // gap
&(Some((start, r)), None) =>
(start + r.len() .. start + r.len() + count, 0), // last range
&(None, None) =>
(first_different .. first_different + count, 0), // empty
&(None, Some((start, _))) if *start > first_different =>
(first_different .. cmp::min(first_different + count, *start), 0), // gap at the start
_ => {
prev = next;
continue
},
}
}
};
// crop to peers best
if range.start >= peer_best {
return None;
}
range.end = cmp::min(peer_best, range.end);
self.peer_requests.insert(peer_id, range.start);
self.blocks.insert(range.start, BlockRangeState::Downloading{ len: range.end - range.start, downloading: downloading + 1 });
Some(range)
}
/// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain.
pub fn drain(&mut self, from: BlockNumber) -> Vec<BlockData> {
let mut drained = Vec::new();
let mut ranges = Vec::new();
{
let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
&mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => {
prev = *start + blocks.len() as BlockNumber;
let mut blocks = mem::replace(blocks, Vec::new());
drained.append(&mut blocks);
ranges.push(*start);
},
_ => break,
}
}
}
for r in ranges {
self.blocks.remove(&r);
}
trace!(target: "sync", "Drained {} blocks", drained.len());
drained
}
pub fn clear_peer_download(&mut self, peer_id: PeerId) {
match self.peer_requests.entry(peer_id) {
Entry::Occupied(entry) => {
let start = entry.remove();
let remove = match self.blocks.get_mut(&start) {
Some(&mut BlockRangeState::Downloading { ref mut downloading, .. }) if *downloading > 1 => {
*downloading = *downloading - 1;
false
},
Some(&mut BlockRangeState::Downloading { .. }) => {
true
},
_ => {
debug_assert!(false);
false
}
};
if remove {
self.blocks.remove(&start);
}
},
_ => (),
}
}
}
#[cfg(test)]
mod test {
use super::{BlockCollection, BlockData};
use message;
use primitives::block::HeaderHash;
fn is_empty(bc: &BlockCollection) -> bool {
bc.blocks.is_empty() &&
bc.peer_requests.is_empty()
}
fn generate_blocks(n: usize) -> Vec<message::BlockData> {
(0 .. n).map(|_| message::BlockData {
hash: HeaderHash::random(),
header: None,
body: None,
message_queue: None,
receipt: None,
}).collect()
}
#[test]
fn create_clear() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
bc.insert(1, generate_blocks(100), 0);
assert!(!is_empty(&bc));
bc.clear();
assert!(is_empty(&bc));
}
#[test]
fn insert_blocks() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
let peer0 = 0;
let peer1 = 1;
let peer2 = 2;
let blocks = generate_blocks(150);
assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(1 .. 41));
assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(41 .. 81));
assert_eq!(bc.needed_blocks(peer2, 40, 150, 0), Some(81 .. 121));
bc.clear_peer_download(peer1);
bc.insert(41, blocks[41..81].to_vec(), peer1);
assert_eq!(bc.drain(1), vec![]);
assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(121 .. 150));
bc.clear_peer_download(peer0);
bc.insert(1, blocks[1..11].to_vec(), peer0);
assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(11 .. 41));
assert_eq!(bc.drain(1), blocks[1..11].iter().map(|b| BlockData { block: b.clone(), origin: 0 }).collect::<Vec<_>>());
bc.clear_peer_download(peer0);
bc.insert(11, blocks[11..41].to_vec(), peer0);
let drained = bc.drain(12);
assert_eq!(drained[..30], blocks[11..41].iter().map(|b| BlockData { block: b.clone(), origin: 0 }).collect::<Vec<_>>()[..]);
assert_eq!(drained[30..], blocks[41..81].iter().map(|b| BlockData { block: b.clone(), origin: 1 }).collect::<Vec<_>>()[..]);
bc.clear_peer_download(peer2);
assert_eq!(bc.needed_blocks(peer2, 40, 150, 80), Some(81 .. 121));
bc.clear_peer_download(peer2);
bc.insert(81, blocks[81..121].to_vec(), peer2);
bc.clear_peer_download(peer1);
bc.insert(121, blocks[121..150].to_vec(), peer1);
assert_eq!(bc.drain(80), vec![]);
let drained = bc.drain(81);
assert_eq!(drained[..40], blocks[81..121].iter().map(|b| BlockData { block: b.clone(), origin: 2 }).collect::<Vec<_>>()[..]);
assert_eq!(drained[40..], blocks[121..150].iter().map(|b| BlockData { block: b.clone(), origin: 1 }).collect::<Vec<_>>()[..]);
}
}
-57
View File
@@ -1,57 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Blockchain access trait
use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus};
use client::error::Error;
use state_machine;
use primitives::block;
pub trait Client : Send + Sync {
/// Given a hash return a header
fn import(&self, header: block::Header, body: Option<block::Body>) -> Result<ImportResult, Error>;
/// Get blockchain info.
fn info(&self) -> Result<ClientInfo, Error>;
/// Get block status.
fn block_status(&self, hash: &block::HeaderHash) -> Result<BlockStatus, Error>;
/// Get block hash by number.
fn block_hash(&self, block_number: block::Number) -> Result<Option<block::HeaderHash>, Error>;
}
impl<B, E> Client for PolkadotClient<B, E> where
B: client::backend::Backend + Send + Sync + 'static,
E: state_machine::CodeExecutor + Send + Sync + 'static,
{
fn import(&self, header: block::Header, body: Option<block::Body>) -> Result<ImportResult, Error> {
(self as &Client).import(header, body)
}
fn info(&self) -> Result<ClientInfo, Error> {
(self as &Client).info()
}
fn block_status(&self, hash: &block::HeaderHash) -> Result<BlockStatus, Error> {
(self as &Client).block_status(hash)
}
fn block_hash(&self, block_number: block::Number) -> Result<Option<block::HeaderHash>, Error> {
(self as &Client).block_hash(block_number)
}
}
-30
View File
@@ -1,30 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use service::Role;
/// Protocol configuration
pub struct ProtocolConfig {
pub roles: Role,
}
impl Default for ProtocolConfig {
fn default() -> ProtocolConfig {
ProtocolConfig {
roles: Role::FULL,
}
}
}
-31
View File
@@ -1,31 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use network::Error as NetworkError;
use client;
error_chain! {
foreign_links {
Network(NetworkError) #[doc = "Devp2p error."];
}
links {
Client(client::error::Error, client::error::ErrorKind);
}
errors {
}
}
-78
View File
@@ -1,78 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use network::{NetworkContext, PeerId, Error as NetworkError, SessionInfo};
/// IO interface for the syncing handler.
/// Provides peer connection management and an interface to the blockchain client.
pub trait SyncIo {
/// Disable a peer
fn disable_peer(&mut self, peer_id: PeerId);
/// Disconnect peer
fn disconnect_peer(&mut self, peer_id: PeerId);
/// Send a packet to a peer.
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError>;
/// Returns peer identifier string
fn peer_info(&self, peer_id: PeerId) -> String {
peer_id.to_string()
}
/// Returns information on p2p session
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo>;
/// Check if the session is expired
fn is_expired(&self) -> bool;
}
/// Wraps `NetworkContext` and the blockchain client
pub struct NetSyncIo<'s, 'h> where 'h: 's {
network: &'s NetworkContext<'h>,
}
impl<'s, 'h> NetSyncIo<'s, 'h> {
/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
pub fn new(network: &'s NetworkContext<'h>) -> NetSyncIo<'s, 'h> {
NetSyncIo {
network: network,
}
}
}
impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
fn disable_peer(&mut self, peer_id: PeerId) {
self.network.disable_peer(peer_id);
}
fn disconnect_peer(&mut self, peer_id: PeerId) {
self.network.disconnect_peer(peer_id);
}
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError>{
self.network.send(peer_id, 0, data)
}
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo> {
self.network.session_info(peer_id)
}
fn is_expired(&self) -> bool {
self.network.is_expired()
}
fn peer_info(&self, peer_id: PeerId) -> String {
self.network.peer_client_version(peer_id)
}
}
-64
View File
@@ -1,64 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
#![warn(missing_docs)]
//! Implements polkadot protocol version as specified here:
//! https://github.com/paritytech/polkadot/wiki/Network-protocol
extern crate ethcore_network as network;
extern crate ethcore_io as core_io;
extern crate env_logger;
extern crate rand;
extern crate semver;
extern crate parking_lot;
extern crate smallvec;
extern crate ipnetwork;
extern crate substrate_primitives as primitives;
extern crate substrate_state_machine as state_machine;
extern crate substrate_serializer as ser;
extern crate serde;
extern crate serde_json;
// TODO: remove these two; split off dependent logic into polkadot-network and rename this crate
// to substrate-network.
extern crate polkadot_primitives as polkadot_primitives;
extern crate substrate_client as client;
#[macro_use] extern crate serde_derive;
#[macro_use] extern crate log;
#[macro_use] extern crate bitflags;
#[macro_use] extern crate error_chain;
mod service;
mod sync;
mod protocol;
mod io;
mod message;
mod error;
mod config;
mod chain;
mod blocks;
#[cfg(test)]
mod test;
pub use service::Service;
pub use protocol::{ProtocolStatus};
pub use network::{NonReservedPeerMode, ConnectionFilter, ConnectionDirection, NetworkConfiguration};
// TODO: move it elsewhere
fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash {
primitives::hashing::blake2_256(&ser::to_vec(header)).into()
}
-189
View File
@@ -1,189 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
//! Network packet message types. These get serialized and put into the lower level protocol payload.
use std::borrow::Borrow;
use primitives::AuthorityId;
use primitives::block::{Number as BlockNumber, HeaderHash, Header, Body};
use service::Role as RoleFlags;
use polkadot_primitives::parachain::Id as ParachainId;
pub type RequestId = u64;
type Bytes = Vec<u8>;
type Signature = ::primitives::hash::H256; //TODO:
/// Configured node role.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Role {
/// Full relay chain client with no additional responsibilities.
Full,
/// Relay chain light client.
Light,
/// Parachain validator.
Validator,
/// Parachain collator.
Collator,
}
impl<T> From<T> for RoleFlags where T: Borrow<[Role]> {
fn from(roles: T) -> RoleFlags {
let mut flags = RoleFlags::NONE;
let roles: &[Role] = roles.borrow();
for r in roles {
match *r {
Role::Full => flags = flags | RoleFlags::FULL,
Role::Light => flags = flags | RoleFlags::LIGHT,
Role::Validator => flags = flags | RoleFlags::VALIDATOR,
Role::Collator => flags = flags | RoleFlags::COLLATOR,
}
}
flags
}
}
impl From<RoleFlags> for Vec<Role> where {
fn from(flags: RoleFlags) -> Vec<Role> {
let mut roles = Vec::new();
if !(flags & RoleFlags::FULL).is_empty() {
roles.push(Role::Full);
}
if !(flags & RoleFlags::LIGHT).is_empty() {
roles.push(Role::Light);
}
if !(flags & RoleFlags::VALIDATOR).is_empty() {
roles.push(Role::Validator);
}
if !(flags & RoleFlags::COLLATOR).is_empty() {
roles.push(Role::Collator);
}
roles
}
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Copy, Clone)]
/// Bits of block data and associated artefacts to request.
pub enum BlockAttribute {
/// Include block header.
Header,
/// Include block body.
Body,
/// Include block receipt.
Receipt,
/// Include block message queue.
MessageQueue,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// Block data sent in the response.
pub struct BlockData {
/// Block header hash.
pub hash: HeaderHash,
/// Block header if requested.
pub header: Option<Header>,
/// Block body if requested.
pub body: Option<Body>,
/// Block receipt if requested.
pub receipt: Option<Bytes>,
/// Block message queue if requested.
pub message_queue: Option<Bytes>,
}
#[serde(untagged)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// Identifies starting point of a block sequence.
pub enum FromBlock {
/// Start with given hash.
Hash(HeaderHash),
/// Start with given block number.
Number(BlockNumber),
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// Block enumeration direction.
pub enum Direction {
/// Enumerate in ascending order (from child to parent).
Ascending,
/// Enumerate in descendfing order (from parent to canonical child).
Descending,
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
/// A network message.
pub enum Message {
/// Status packet.
Status(Status),
/// Block request.
BlockRequest(BlockRequest),
/// Block response.
BlockResponse(BlockResponse),
/// Block announce.
BlockAnnounce(BlockAnnounce),
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Status {
/// Protocol version.
pub version: u32,
/// Supported roles.
pub roles: Vec<Role>,
/// Best block number.
pub best_number: BlockNumber,
/// Best block hash.
pub best_hash: HeaderHash,
/// Genesis block hash.
pub genesis_hash: HeaderHash,
/// Signatue of `best_hash` made with validator address. Required for the validator role.
pub validator_signature: Option<Signature>,
/// Validator address. Required for the validator role.
pub validator_id: Option<AuthorityId>,
/// Parachain id. Required for the collator role.
pub parachain_id: Option<ParachainId>,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// Request block data from a peer.
pub struct BlockRequest {
/// Unique request id.
pub id: RequestId,
/// Bits of block data to request.
pub fields: Vec<BlockAttribute>,
/// Start from this block.
pub from: FromBlock,
/// End at this block. An implementation defined maximum is used when unspecified.
pub to: Option<HeaderHash>,
/// Sequence direction.
pub direction: Direction,
/// Maximum number of block to return. An implementation defined maximum is used when unspecified.
pub max: Option<u32>,
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
/// Response to `BlockRequest`
pub struct BlockResponse {
/// Id of a request this response was made for.
pub id: RequestId,
/// Block data for the requested sequence.
pub blocks: Vec<BlockData>,
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
/// Announce a new complete relay chain block on the network.
pub struct BlockAnnounce {
/// New block header.
pub header: Header,
}
-344
View File
@@ -1,344 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use std::collections::{HashMap, HashSet, BTreeMap};
use std::mem;
use std::sync::Arc;
use parking_lot::RwLock;
use serde_json;
use std::time;
use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header};
use network::{PeerId, NodeId};
use message::{self, Message};
use sync::{ChainSync, Status as SyncStatus};
use service::Role;
use config::ProtocolConfig;
use chain::Client;
use io::SyncIo;
use error;
const REQUEST_TIMEOUT_SEC: u64 = 15;
const PROTOCOL_VERSION: u32 = 0;
// Lock must always be taken in order declared here.
pub struct Protocol {
config: ProtocolConfig,
chain: Arc<Client>,
genesis_hash: HeaderHash,
sync: RwLock<ChainSync>,
/// All connected peers
peers: RwLock<HashMap<PeerId, Peer>>,
/// Connected peers pending Status message.
handshaking_peers: RwLock<HashMap<PeerId, time::Instant>>,
}
/// Syncing status and statistics
#[derive(Clone, Copy)]
pub struct ProtocolStatus {
/// Sync status.
pub sync: SyncStatus,
/// Total number of connected peers
pub num_peers: usize,
/// Total number of active peers.
pub num_active_peers: usize,
}
/// Peer information
struct Peer {
/// Protocol version
protocol_version: u32,
/// Roles
roles: Role,
/// Peer best block hash
best_hash: HeaderHash,
/// Peer best block number
best_number: BlockNumber,
/// Pending block request if any
block_request: Option<message::BlockRequest>,
/// Request timestamp
request_timestamp: Option<time::Instant>,
/// Holds a set of transactions recently sent to this peer to avoid spamming.
_last_sent_transactions: HashSet<TransactionHash>,
/// Request counter,
request_id: message::RequestId,
}
#[derive(Debug)]
pub struct PeerInfo {
/// Roles
pub roles: Role,
/// Protocol version
pub protocol_version: u32,
/// Peer best block hash
pub best_hash: HeaderHash,
/// Peer best block number
pub best_number: BlockNumber,
}
/// Transaction stats
#[derive(Debug)]
pub struct TransactionStats {
/// Block number where this TX was first seen.
pub first_seen: u64,
/// Peers it was propagated to.
pub propagated_to: BTreeMap<NodeId, usize>,
}
impl Protocol {
/// Create a new instance.
pub fn new(config: ProtocolConfig, chain: Arc<Client>) -> error::Result<Protocol> {
let info = chain.info()?;
let protocol = Protocol {
config: config,
chain: chain,
genesis_hash: info.chain.genesis_hash,
sync: RwLock::new(ChainSync::new(&info)),
peers: RwLock::new(HashMap::new()),
handshaking_peers: RwLock::new(HashMap::new()),
};
Ok(protocol)
}
/// Returns protocol status
pub fn status(&self) -> ProtocolStatus {
let sync = self.sync.read();
let peers = self.peers.read();
ProtocolStatus {
sync: sync.status(),
num_peers: peers.values().count(),
num_active_peers: peers.values().filter(|p| p.block_request.is_some()).count(),
}
}
pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, data: &[u8]) {
let message: Message = match serde_json::from_slice(data) {
Ok(m) => m,
Err(e) => {
debug!("Invalid packet from {}: {}", peer_id, e);
io.disable_peer(peer_id);
return;
}
};
match message {
Message::Status(s) => self.on_status_message(io, peer_id, s),
Message::BlockRequest(r) => self.on_block_request(io, peer_id, r),
Message::BlockResponse(r) => {
let request = {
let mut peers = self.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
peer.request_timestamp = None;
match mem::replace(&mut peer.block_request, None) {
Some(r) => r,
None => {
debug!("Unexpected response packet from {}", peer_id);
io.disable_peer(peer_id);
return;
}
}
} else {
debug!("Unexpected packet from {}", peer_id);
io.disable_peer(peer_id);
return;
}
};
if request.id != r.id {
trace!("Ignoring mismatched response packet from {}", peer_id);
return;
}
self.on_block_response(io, peer_id, request, r);
},
Message::BlockAnnounce(announce) => {
self.on_block_announce(io, peer_id, announce);
}
}
}
pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, mut message: Message) {
let mut peers = self.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
match &mut message {
&mut Message::BlockRequest(ref mut r) => {
peer.block_request = Some(r.clone());
peer.request_timestamp = Some(time::Instant::now());
r.id = peer.request_id;
peer.request_id = peer.request_id + 1;
},
_ => (),
}
}
let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed");
if let Err(e) = io.send(peer_id, data) {
debug!(target:"sync", "Error sending message: {:?}", e);
io.disconnect_peer(peer_id);
}
}
/// Called when a new peer is connected
pub fn on_peer_connected(&self, io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "Connected {}: {}", peer_id, io.peer_info(peer_id));
self.handshaking_peers.write().insert(peer_id, time::Instant::now());
self.send_status(io, peer_id);
}
/// Called by peer when it is disconnecting
pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_info(peer));
let removed = {
let mut peers = self.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
handshaking_peers.remove(&peer);
peers.remove(&peer).is_some()
};
if removed {
self.sync.write().peer_disconnected(io, self, peer);
}
}
pub fn on_block_request(&self, _io: &mut SyncIo, _peer_id: PeerId, _request: message::BlockRequest) {
}
pub fn on_block_response(&self, io: &mut SyncIo, peer_id: PeerId, request: message::BlockRequest, response: message::BlockResponse) {
// TODO: validate response
self.sync.write().on_block_data(io, self, peer_id, request, response);
}
pub fn tick(&self, io: &mut SyncIo) {
self.maintain_peers(io);
}
fn maintain_peers(&self, io: &mut SyncIo) {
let tick = time::Instant::now();
let mut aborting = Vec::new();
{
let peers = self.peers.read();
let handshaking_peers = self.handshaking_peers.read();
for (peer_id, timestamp) in peers.iter()
.filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r)))
.chain(handshaking_peers.iter()) {
if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC {
trace!(target:"sync", "Timeout {}", peer_id);
io.disconnect_peer(*peer_id);
aborting.push(*peer_id);
}
}
}
for p in aborting {
self.on_peer_disconnected(io, p);
}
}
pub fn peer_info(&self, peer: PeerId) -> Option<PeerInfo> {
self.peers.read().get(&peer).map(|p| {
PeerInfo {
roles: p.roles,
protocol_version: p.protocol_version,
best_hash: p.best_hash,
best_number: p.best_number,
}
})
}
/// Called by peer to report status
fn on_status_message(&self, io: &mut SyncIo, peer_id: PeerId, status: message::Status) {
trace!(target: "sync", "New peer {} {:?}", peer_id, status);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
return;
}
{
let mut peers = self.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
if peers.contains_key(&peer_id) {
debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
return;
}
if status.genesis_hash != self.genesis_hash {
io.disable_peer(peer_id);
trace!(target: "sync", "Peer {} genesis hash mismatch (ours: {}, theirs: {})", peer_id, self.genesis_hash, status.genesis_hash);
return;
}
if status.version != PROTOCOL_VERSION {
io.disable_peer(peer_id);
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, status.version);
return;
}
let peer = Peer {
protocol_version: status.version,
roles: status.roles.into(),
best_hash: status.best_hash,
best_number: status.best_number,
block_request: None,
request_timestamp: None,
_last_sent_transactions: HashSet::new(),
request_id: 0,
};
peers.insert(peer_id.clone(), peer);
handshaking_peers.remove(&peer_id);
debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id));
}
self.sync.write().new_peer(io, self, peer_id);
}
/// Send Status message
fn send_status(&self, io: &mut SyncIo, peer_id: PeerId) {
if let Ok(info) = self.chain.info() {
let status = message::Status {
version: PROTOCOL_VERSION,
genesis_hash: info.chain.genesis_hash,
roles: self.config.roles.into(),
best_number: info.chain.best_number,
best_hash: info.chain.best_hash,
validator_signature: None,
validator_id: None,
parachain_id: None,
};
self.send_message(io, peer_id, Message::Status(status))
}
}
pub fn abort(&self) {
let mut sync = self.sync.write();
let mut peers = self.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
sync.clear();
peers.clear();
handshaking_peers.clear();
}
pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce) {
let header = announce.header;
self.sync.write().on_block_announce(io, self, peer_id, &header);
}
pub fn on_block_imported(&self, header: &Header) {
self.sync.write().update_chain_info(&header);
}
pub fn on_new_transactions(&self) {
}
pub fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
BTreeMap::new()
}
pub fn chain(&self) -> &Client {
&*self.chain
}
}
-239
View File
@@ -1,239 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use std::sync::Arc;
use std::collections::{BTreeMap};
use std::io;
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use primitives::block::{TransactionHash, Header};
use core_io::{TimerToken};
use io::NetSyncIo;
use protocol::{Protocol, ProtocolStatus, PeerInfo as ProtocolPeerInfo, TransactionStats};
use config::{ProtocolConfig};
use error::Error;
use chain::Client;
/// Polkadot devp2p protocol id
pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot";
bitflags! {
pub struct Role: u32 {
const NONE = 0b00000000;
const FULL = 0b00000001;
const LIGHT = 0b00000010;
const VALIDATOR = 0b00000100;
const COLLATOR = 0b00001000;
}
}
/// Sync status
pub trait SyncProvider: Send + Sync {
/// Get sync status
fn status(&self) -> ProtocolStatus;
/// Get peers information
fn peers(&self) -> Vec<PeerInfo>;
/// Get this node id if available.
fn node_id(&self) -> Option<String>;
/// Returns propagation count for pending transactions.
fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats>;
}
/// Peer connection information
#[derive(Debug)]
pub struct PeerInfo {
/// Public node id
pub id: Option<String>,
/// Node client ID
pub client_version: String,
/// Capabilities
pub capabilities: Vec<String>,
/// Remote endpoint address
pub remote_address: String,
/// Local endpoint address
pub local_address: String,
/// Dot protocol info.
pub dot_info: Option<ProtocolPeerInfo>,
}
/// Service initialization parameters.
pub struct Params {
/// Configuration.
pub config: ProtocolConfig,
/// Network layer configuration.
pub network_config: NetworkConfiguration,
/// Polkadot relay chain access point.
pub chain: Arc<Client>,
}
/// Polkadot network service. Handles network IO and manages connectivity.
pub struct Service {
/// Network service
network: NetworkService,
/// Devp2p protocol handler
handler: Arc<ProtocolHandler>,
}
impl Service {
/// Creates and register protocol with the network service
pub fn new(params: Params) -> Result<Arc<Service>, Error> {
let service = NetworkService::new(params.network_config.clone(), None)?;
let sync = Arc::new(Service {
network: service,
handler: Arc::new(ProtocolHandler {
protocol: Protocol::new(params.config, params.chain.clone())?,
}),
});
Ok(sync)
}
/// Called when a new block is imported by the client.
pub fn on_block_imported(&self, header: &Header) {
self.handler.protocol.on_block_imported(header)
}
/// Called when new transactons are imported by the client.
pub fn on_new_transactions(&self) {
self.handler.protocol.on_new_transactions()
}
fn start(&self) {
match self.network.start().map_err(Into::into) {
Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse =>
warn!("Network port {:?} is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set.")),
Err(err) => warn!("Error starting network: {}", err),
_ => {},
};
self.network.register_protocol(self.handler.clone(), DOT_PROTOCOL_ID, 1, &[0u8])
.unwrap_or_else(|e| warn!("Error registering polkadot protocol: {:?}", e));
}
fn stop(&self) {
self.handler.protocol.abort();
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
}
}
impl SyncProvider for Service {
/// Get sync status
fn status(&self) -> ProtocolStatus {
self.handler.protocol.status()
}
/// Get sync peers
fn peers(&self) -> Vec<PeerInfo> {
self.network.with_context_eval(DOT_PROTOCOL_ID, |ctx| {
let peer_ids = self.network.connected_peers();
peer_ids.into_iter().filter_map(|peer_id| {
let session_info = match ctx.session_info(peer_id) {
None => return None,
Some(info) => info,
};
Some(PeerInfo {
id: session_info.id.map(|id| format!("{:x}", id)),
client_version: session_info.client_version,
capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(),
remote_address: session_info.remote_address,
local_address: session_info.local_address,
dot_info: self.handler.protocol.peer_info(peer_id),
})
}).collect()
}).unwrap_or_else(Vec::new)
}
fn node_id(&self) -> Option<String> {
self.network.external_url()
}
fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
self.handler.protocol.transactions_stats()
}
}
struct ProtocolHandler {
/// Protocol handler
protocol: Protocol,
}
impl NetworkProtocolHandler for ProtocolHandler {
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
io.register_timer(0, 1000).expect("Error registering sync timer");
}
fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) {
self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data);
}
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer);
}
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer);
}
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
self.protocol.tick(&mut NetSyncIo::new(io));
}
}
/// Trait for managing network
pub trait ManageNetwork : Send + Sync {
/// Set to allow unreserved peers to connect
fn accept_unreserved_peers(&self);
/// Set to deny unreserved peers to connect
fn deny_unreserved_peers(&self);
/// Remove reservation for the peer
fn remove_reserved_peer(&self, peer: String) -> Result<(), String>;
/// Add reserved peer
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
/// Start network
fn start_network(&self);
/// Stop network
fn stop_network(&self);
}
impl ManageNetwork for Service {
fn accept_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Accept);
}
fn deny_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Deny);
}
fn remove_reserved_peer(&self, peer: String) -> Result<(), String> {
self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
}
fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
}
fn start_network(&self) {
self.start();
}
fn stop_network(&self) {
self.stop();
}
}
-368
View File
@@ -1,368 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use std::collections::HashMap;
use io::SyncIo;
use protocol::Protocol;
use network::PeerId;
use client::{ImportResult, BlockStatus, ClientInfo};
use primitives::block::{HeaderHash, Number as BlockNumber, Header};
use blocks::{self, BlockCollection};
use message::{self, Message};
use super::header_hash;
// Maximum parallel requests for a block.
const MAX_BLOCK_DOWNLOAD: usize = 1;
struct PeerSync {
pub common_hash: HeaderHash,
pub common_number: BlockNumber,
pub best_hash: HeaderHash,
pub best_number: BlockNumber,
pub state: PeerSyncState,
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum PeerSyncState {
AncestorSearch(BlockNumber),
Available,
DownloadingNew(BlockNumber),
DownloadingStale(HeaderHash),
}
/// Relay chain sync strategy.
pub struct ChainSync {
genesis_hash: HeaderHash,
peers: HashMap<PeerId, PeerSync>,
blocks: BlockCollection,
best_queued_number: BlockNumber,
best_queued_hash: HeaderHash,
required_block_attributes: Vec<message::BlockAttribute>,
}
/// Syncing status and statistics
#[derive(Clone, Copy)]
pub struct Status;
impl ChainSync {
pub fn new(info: &ClientInfo) -> ChainSync {
ChainSync {
genesis_hash: info.chain.genesis_hash,
peers: HashMap::new(),
blocks: BlockCollection::new(),
best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash),
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
required_block_attributes: vec![message::BlockAttribute::Header, message::BlockAttribute::Body],
}
}
/// Returns sync status
pub fn status(&self) -> Status {
Status
}
pub fn new_peer(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) {
if let Some(info) = protocol.peer_info(peer_id) {
match (protocol.chain().block_status(&info.best_hash), info.best_number) {
(Err(e), _) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
io.disconnect_peer(peer_id);
},
(Ok(BlockStatus::KnownBad), _) => {
debug!(target:"sync", "New peer with known bad best block {} ({}).", info.best_hash, info.best_number);
io.disable_peer(peer_id);
},
(Ok(BlockStatus::Unknown), 0) => {
debug!(target:"sync", "New peer with unkown genesis hash {} ({}).", info.best_hash, info.best_number);
io.disable_peer(peer_id);
},
(Ok(BlockStatus::Unknown), _) => {
debug!(target:"sync", "New peer with unkown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number);
self.peers.insert(peer_id, PeerSync {
common_hash: self.genesis_hash,
common_number: 0,
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::AncestorSearch(info.best_number - 1),
});
Self::request_ancestry(io, protocol, peer_id, info.best_number - 1)
},
(Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChain), _) => {
debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
self.peers.insert(peer_id, PeerSync {
common_hash: info.best_hash,
common_number: info.best_number,
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::Available,
});
}
}
}
}
pub fn on_block_data(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, _request: message::BlockRequest, response: message::BlockResponse) {
let count = response.blocks.len();
let mut imported: usize = 0;
let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
match peer.state {
PeerSyncState::DownloadingNew(start_block) => {
self.blocks.clear_peer_download(peer_id);
peer.state = PeerSyncState::Available;
self.blocks.insert(start_block, response.blocks, peer_id);
self.blocks.drain(self.best_queued_number + 1)
},
PeerSyncState::DownloadingStale(_) => {
peer.state = PeerSyncState::Available;
response.blocks.into_iter().map(|b| blocks::BlockData {
origin: peer_id,
block: b
}).collect()
},
PeerSyncState::AncestorSearch(n) => {
match response.blocks.get(0) {
Some(ref block) => {
match protocol.chain().block_hash(n) {
Ok(Some(block_hash)) if block_hash == block.hash => {
peer.common_hash = block.hash;
peer.common_number = n;
peer.state = PeerSyncState::Available;
trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", peer_id, block.hash, n);
vec![]
},
Ok(_) if n > 0 => {
let n = n - 1;
peer.state = PeerSyncState::AncestorSearch(n);
Self::request_ancestry(io, protocol, peer_id, n);
return;
},
Ok(_) => { // genesis mismatch
io.disable_peer(peer_id);
return;
},
Err(e) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
io.disconnect_peer(peer_id);
return;
}
}
},
None => {
trace!(target:"sync", "Invalid response when searching for ancestor from {}", peer_id);
io.disconnect_peer(peer_id);
return;
}
}
},
PeerSyncState::Available => Vec::new(),
}
} else {
vec![]
};
// Blocks in the response/drain should be in ascending order.
for block in new_blocks {
let origin = block.origin;
let block = block.block;
if let Some(header) = block.header {
let number = header.number;
let hash = header_hash(&header);
let parent = header.parent_hash;
let result = protocol.chain().import(header, block.body);
match result {
Ok(ImportResult::AlreadyInChain) => {
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
self.block_imported(&hash, number);
},
Ok(ImportResult::AlreadyQueued) => {
trace!(target: "sync", "Block already queued {}: {:?}", number, hash);
self.block_imported(&hash, number);
},
Ok(ImportResult::Queued) => {
trace!(target: "sync", "Block queued {}: {:?}", number, hash);
self.block_imported(&hash, number);
imported = imported + 1;
},
Ok(ImportResult::UnknownParent) => {
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent);
self.restart(io, protocol);
return;
},
Ok(ImportResult::KnownBad) => {
debug!(target: "sync", "Bad block {}: {:?}", number, hash);
io.disable_peer(origin); //TODO: use persistent ID
self.restart(io, protocol);
return;
}
Err(e) => {
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
self.restart(io, protocol);
return;
}
}
}
}
trace!(target: "sync", "Imported {} of {}", imported, count);
self.maintain_sync(io, protocol);
}
fn maintain_sync(&mut self, io: &mut SyncIo, protocol: &Protocol) {
let peers: Vec<PeerId> = self.peers.keys().map(|p| *p).collect();
for peer in peers {
self.download_new(io, protocol, peer);
}
}
fn block_imported(&mut self, hash: &HeaderHash, number: BlockNumber) {
if number > self.best_queued_number {
self.best_queued_number = number;
self.best_queued_hash = *hash;
}
// Update common blocks
for (_, peer) in self.peers.iter_mut() {
if peer.best_number >= number {
peer.common_number = number;
peer.common_hash = *hash;
}
}
}
pub fn update_chain_info(&mut self, best_header: &Header ) {
let hash = header_hash(&best_header);
self.block_imported(&hash, best_header.number)
}
pub fn on_block_announce(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, header: &Header) {
let hash = header_hash(&header);
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if header.number > peer.best_number {
peer.best_number = header.number;
peer.best_hash = hash;
}
} else {
return;
}
if !self.is_known_or_already_downloading(protocol, &hash) {
let stale = header.number <= self.best_queued_number;
if stale {
if !self.is_known_or_already_downloading(protocol, &header.parent_hash) {
trace!(target: "sync", "Ignoring unkown stale block announce from {}: {} {:?}", peer_id, hash, header);
} else {
trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", peer_id, hash, header);
self.download_stale(io, protocol, peer_id, &hash);
}
} else {
trace!(target: "sync", "Downloading new block announced from {}: {} {:?}", peer_id, hash, header);
self.download_new(io, protocol, peer_id);
}
} else {
trace!(target: "sync", "Known block announce from {}: {}", peer_id, hash);
}
}
fn is_known_or_already_downloading(&self, protocol: &Protocol, hash: &HeaderHash) -> bool {
self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
|| protocol.chain().block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
}
pub fn peer_disconnected(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) {
self.blocks.clear_peer_download(peer_id);
self.peers.remove(&peer_id);
self.maintain_sync(io, protocol);
}
pub fn restart(&mut self, io: &mut SyncIo, protocol: &Protocol) {
self.blocks.clear();
let ids: Vec<PeerId> = self.peers.keys().map(|p| *p).collect();
for id in ids {
self.new_peer(io, protocol, id);
}
match protocol.chain().info() {
Ok(info) => {
self.best_queued_hash = info.best_queued_hash.unwrap_or(info.chain.best_hash);
self.best_queued_number = info.best_queued_number.unwrap_or(info.chain.best_number);
},
Err(e) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
self.best_queued_hash = self.genesis_hash;
self.best_queued_number = 0;
}
}
}
pub fn clear(&mut self) {
self.blocks.clear();
self.peers.clear();
}
// Download old block.
fn download_stale(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, hash: &HeaderHash) {
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
match peer.state {
PeerSyncState::Available => {
let request = message::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from: message::FromBlock::Hash(*hash),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
};
peer.state = PeerSyncState::DownloadingStale(*hash);
protocol.send_message(io, peer_id, Message::BlockRequest(request));
},
_ => (),
}
}
}
// Issue a request for a peer to download new blocks, if any are available
fn download_new(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) {
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
match peer.state {
PeerSyncState::Available => {
if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCK_DOWNLOAD, peer.common_number, peer.best_number) {
let request = message::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from: message::FromBlock::Number(range.start),
to: None,
direction: message::Direction::Ascending,
max: Some((range.end - range.start) as u32),
};
peer.state = PeerSyncState::DownloadingNew(range.start);
protocol.send_message(io, peer_id, Message::BlockRequest(request));
}
},
_ => (),
}
}
}
fn request_ancestry(io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, block: BlockNumber) {
let request = message::BlockRequest {
id: 0,
fields: vec![message::BlockAttribute::Header],
from: message::FromBlock::Number(block),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
};
protocol.send_message(io, peer_id, Message::BlockRequest(request));
}
}
View File