mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 08:51:09 +00:00
Networking tests and fixes (#61)
* BlockId in client interface * Sync fixes and tests * Updated to latest primitives * Updated dependencies * Updated for new (old) primitives * Network as workspace member * substrate-network * Removed obsolete file * begin_transaction on hash
This commit is contained in:
@@ -35,6 +35,7 @@ struct PendingBlock {
|
||||
is_best: bool,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Clone)]
|
||||
struct Block {
|
||||
header: block::Header,
|
||||
body: Option<block::Body>,
|
||||
@@ -46,6 +47,7 @@ pub struct BlockImportOperation {
|
||||
pending_state: state_machine::backend::InMemory,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct BlockchainStorage {
|
||||
blocks: HashMap<HeaderHash, Block>,
|
||||
hashes: HashMap<block::Number, HeaderHash>,
|
||||
@@ -59,6 +61,14 @@ pub struct Blockchain {
|
||||
storage: RwLock<BlockchainStorage>,
|
||||
}
|
||||
|
||||
impl Clone for Blockchain {
|
||||
fn clone(&self) -> Blockchain {
|
||||
Blockchain {
|
||||
storage: RwLock::new(self.storage.read().clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Blockchain {
|
||||
fn id(&self, id: BlockId) -> Option<HeaderHash> {
|
||||
match id {
|
||||
@@ -96,6 +106,21 @@ impl Blockchain {
|
||||
storage.genesis_hash = hash;
|
||||
}
|
||||
}
|
||||
|
||||
/// Compare this blockchain with another in-mem blockchain
|
||||
pub fn equals_to(&self, other: &Blockchain) -> bool {
|
||||
self.canon_equals_to(other) && self.storage.read().blocks == other.storage.read().blocks
|
||||
}
|
||||
|
||||
/// Compare canonical chain to other canonical chain.
|
||||
pub fn canon_equals_to(&self, other: &Blockchain) -> bool {
|
||||
let this = self.storage.read();
|
||||
let other = other.storage.read();
|
||||
this.hashes == other.hashes
|
||||
&& this.best_hash == other.best_hash
|
||||
&& this.best_number == other.best_number
|
||||
&& this.genesis_hash == other.genesis_hash
|
||||
}
|
||||
}
|
||||
|
||||
impl blockchain::Backend for Blockchain {
|
||||
@@ -167,6 +192,39 @@ impl Backend {
|
||||
blockchain: Blockchain::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate and import a sequence of blocks. A user supplied function is allowed to modify each block header. Useful for testing.
|
||||
pub fn generate_blocks<F>(&self, count: usize, edit_header: F) where F: Fn(&mut block::Header) {
|
||||
use backend::{Backend, BlockImportOperation};
|
||||
let info = blockchain::Backend::info(&self.blockchain).expect("In-memory backend never fails");
|
||||
let mut best_num = info.best_number;
|
||||
let mut best_hash = info.best_hash;
|
||||
let state_root = blockchain::Backend::header(&self.blockchain, BlockId::Hash(best_hash))
|
||||
.expect("In-memory backend never fails")
|
||||
.expect("Best header always exists in the blockchain")
|
||||
.state_root;
|
||||
for _ in 0 .. count {
|
||||
best_num = best_num + 1;
|
||||
let mut header = block::Header {
|
||||
parent_hash: best_hash,
|
||||
number: best_num,
|
||||
state_root: state_root,
|
||||
transaction_root: Default::default(),
|
||||
digest: Default::default(),
|
||||
};
|
||||
edit_header(&mut header);
|
||||
|
||||
let mut tx = self.begin_transaction(BlockId::Hash(best_hash)).expect("In-memory backend does not fail");
|
||||
best_hash = header_hash(&header);
|
||||
tx.import_block(header, None, true).expect("In-memory backend does not fail");
|
||||
self.commit_transaction(tx).expect("In-memory backend does not fail");
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate and import a sequence of blocks. Useful for testing.
|
||||
pub fn push_blocks(&self, count: usize) {
|
||||
self.generate_blocks(count, |_| {})
|
||||
}
|
||||
}
|
||||
|
||||
impl backend::Backend for Backend {
|
||||
|
||||
@@ -139,22 +139,27 @@ impl<B, E> Client<B, E> where
|
||||
})
|
||||
}
|
||||
|
||||
fn state_at(&self, hash: &block::HeaderHash) -> error::Result<B::State> {
|
||||
self.backend.state_at(BlockId::Hash(*hash))
|
||||
fn state_at(&self, id: BlockId) -> error::Result<B::State> {
|
||||
self.backend.state_at(id)
|
||||
}
|
||||
|
||||
/// Return single storage entry of contract under given address in state in a block of given hash.
|
||||
pub fn storage(&self, hash: &block::HeaderHash, key: &StorageKey) -> error::Result<StorageData> {
|
||||
Ok(self.state_at(hash)?
|
||||
/// Expose backend reference. To be used in tests only
|
||||
pub fn backend(&self) -> &B {
|
||||
&self.backend
|
||||
}
|
||||
|
||||
/// Return single storage entry of contract under given address in state in a block of given id.
|
||||
pub fn storage(&self, id: &BlockId, key: &StorageKey) -> error::Result<StorageData> {
|
||||
Ok(self.state_at(*id)?
|
||||
.storage(&key.0)
|
||||
.map(|x| StorageData(x.to_vec()))?)
|
||||
}
|
||||
|
||||
/// Execute a call to a contract on top of state in a block of given hash.
|
||||
/// Execute a call to a contract on top of state in a block of given id.
|
||||
///
|
||||
/// No changes are made.
|
||||
pub fn call(&self, hash: &block::HeaderHash, method: &str, call_data: &[u8]) -> error::Result<CallResult> {
|
||||
let state = self.state_at(hash)?;
|
||||
pub fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result<CallResult> {
|
||||
let state = self.state_at(*id)?;
|
||||
let mut changes = state_machine::OverlayedChanges::default();
|
||||
|
||||
let _ = state_machine::execute(
|
||||
@@ -176,7 +181,7 @@ impl<B, E> Client<B, E> where
|
||||
blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent),
|
||||
}
|
||||
|
||||
let mut transaction = self.backend.begin_transaction(BlockId::Number(header.number))?;
|
||||
let mut transaction = self.backend.begin_transaction(BlockId::Hash(header.parent_hash))?;
|
||||
let mut _state = transaction.state()?;
|
||||
// TODO: execute block on _state
|
||||
|
||||
@@ -197,9 +202,9 @@ impl<B, E> Client<B, E> where
|
||||
}
|
||||
|
||||
/// Get block status.
|
||||
pub fn block_status(&self, hash: &block::HeaderHash) -> error::Result<BlockStatus> {
|
||||
pub fn block_status(&self, id: &BlockId) -> error::Result<BlockStatus> {
|
||||
// TODO: more efficient implementation
|
||||
match self.backend.blockchain().header(BlockId::Hash(*hash)).map_err(|e| error::Error::from_blockchain(Box::new(e)))?.is_some() {
|
||||
match self.backend.blockchain().header(*id).map_err(|e| error::Error::from_blockchain(Box::new(e)))?.is_some() {
|
||||
true => Ok(BlockStatus::InChain),
|
||||
false => Ok(BlockStatus::Unknown),
|
||||
}
|
||||
@@ -210,8 +215,13 @@ impl<B, E> Client<B, E> where
|
||||
self.backend.blockchain().hash(block_number)
|
||||
}
|
||||
|
||||
/// Get block header by hash.
|
||||
pub fn header(&self, hash: &block::HeaderHash) -> error::Result<Option<block::Header>> {
|
||||
self.backend.blockchain().header(BlockId::Hash(*hash))
|
||||
/// Get block header by id.
|
||||
pub fn header(&self, id: &BlockId) -> error::Result<Option<block::Header>> {
|
||||
self.backend.blockchain().header(*id)
|
||||
}
|
||||
|
||||
/// Get block body by id.
|
||||
pub fn body(&self, id: &BlockId) -> error::Result<Option<block::Body>> {
|
||||
self.backend.blockchain().body(*id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -330,9 +330,8 @@ impl CodeExecutor for WasmExecutor {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use rustc_hex::FromHex;
|
||||
use codec::{Slicable, Joiner};
|
||||
use codec::Slicable;
|
||||
use state_machine::TestExternalities;
|
||||
use primitives::Header;
|
||||
|
||||
#[test]
|
||||
fn returning_should_work() {
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
[package]
|
||||
description = "Polkadot network protocol"
|
||||
name = "substrate-network"
|
||||
version = "0.1.0"
|
||||
license = "GPL-3.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
|
||||
[lib]
|
||||
|
||||
[dependencies]
|
||||
log = "0.3"
|
||||
rand = "0.3"
|
||||
parking_lot = "0.4"
|
||||
error-chain = "0.11"
|
||||
bitflags = "1.0"
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0"
|
||||
ethcore-network = { git = "https://github.com/paritytech/parity.git" }
|
||||
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
|
||||
substrate-primitives = { path = "../../substrate/primitives" }
|
||||
substrate-client = { path = "../../substrate/client" }
|
||||
substrate-state-machine = { path = "../../substrate/state-machine" }
|
||||
substrate-serializer = { path = "../../substrate/serializer" }
|
||||
|
||||
[dev-dependencies]
|
||||
substrate-executor = { path = "../../substrate/executor" }
|
||||
env_logger = "0.4"
|
||||
@@ -0,0 +1,263 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
use std::mem;
|
||||
use std::cmp;
|
||||
use std::ops::Range;
|
||||
use std::collections::{HashMap, BTreeMap};
|
||||
use std::collections::hash_map::Entry;
|
||||
use network::PeerId;
|
||||
use primitives::block::Number as BlockNumber;
|
||||
use message;
|
||||
|
||||
const MAX_PARALLEL_DOWNLOADS: u32 = 1;
|
||||
|
||||
/// Block data with origin.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct BlockData {
|
||||
pub block: message::BlockData,
|
||||
pub origin: PeerId,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum BlockRangeState {
|
||||
Downloading {
|
||||
len: BlockNumber,
|
||||
downloading: u32,
|
||||
},
|
||||
Complete(Vec<BlockData>),
|
||||
}
|
||||
|
||||
impl BlockRangeState {
|
||||
pub fn len(&self) -> BlockNumber {
|
||||
match *self {
|
||||
BlockRangeState::Downloading { len, .. } => len,
|
||||
BlockRangeState::Complete(ref blocks) => blocks.len() as BlockNumber,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A collection of blocks being downloaded.
|
||||
#[derive(Default)]
|
||||
pub struct BlockCollection {
|
||||
/// Downloaded blocks.
|
||||
blocks: BTreeMap<BlockNumber, BlockRangeState>,
|
||||
peer_requests: HashMap<PeerId, BlockNumber>,
|
||||
}
|
||||
|
||||
impl BlockCollection {
|
||||
/// Create a new instance.
|
||||
pub fn new() -> BlockCollection {
|
||||
BlockCollection {
|
||||
blocks: BTreeMap::new(),
|
||||
peer_requests: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Clear everything.
|
||||
pub fn clear(&mut self) {
|
||||
self.blocks.clear();
|
||||
self.peer_requests.clear();
|
||||
}
|
||||
|
||||
/// Insert a set of blocks into collection.
|
||||
pub fn insert(&mut self, start: BlockNumber, blocks: Vec<message::BlockData>, peer_id: PeerId) {
|
||||
if blocks.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
match self.blocks.get(&start) {
|
||||
Some(&BlockRangeState::Downloading { .. }) => {
|
||||
trace!(target: "sync", "Ignored block data still marked as being downloaded: {}", start);
|
||||
debug_assert!(false);
|
||||
return;
|
||||
},
|
||||
Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => {
|
||||
trace!(target: "sync", "Ignored block data already downloaded: {}", start);
|
||||
return;
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
|
||||
self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter().map(|b| BlockData { origin: peer_id, block: b }).collect()));
|
||||
}
|
||||
|
||||
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
|
||||
pub fn needed_blocks(&mut self, peer_id: PeerId, count: usize, peer_best: BlockNumber, common: BlockNumber) -> Option<Range<BlockNumber>> {
|
||||
// First block number that we need to download
|
||||
let first_different = common + 1;
|
||||
let count = count as BlockNumber;
|
||||
let (mut range, downloading) = {
|
||||
let mut downloading_iter = self.blocks.iter().peekable();
|
||||
let mut prev: Option<(&BlockNumber, &BlockRangeState)> = None;
|
||||
loop {
|
||||
let next = downloading_iter.next();
|
||||
break match &(prev, next) {
|
||||
&(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _) if downloading < MAX_PARALLEL_DOWNLOADS =>
|
||||
(*start .. *start + *len, downloading),
|
||||
&(Some((start, r)), Some((next_start, _))) if start + r.len() < *next_start =>
|
||||
(*start + r.len() .. cmp::min(*next_start, *start + count), 0), // gap
|
||||
&(Some((start, r)), None) =>
|
||||
(start + r.len() .. start + r.len() + count, 0), // last range
|
||||
&(None, None) =>
|
||||
(first_different .. first_different + count, 0), // empty
|
||||
&(None, Some((start, _))) if *start > first_different =>
|
||||
(first_different .. cmp::min(first_different + count, *start), 0), // gap at the start
|
||||
_ => {
|
||||
prev = next;
|
||||
continue
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// crop to peers best
|
||||
if range.start >= peer_best {
|
||||
trace!(target: "sync", "Out of range for peer {} ({} vs {})", peer_id, range.start, peer_best);
|
||||
return None;
|
||||
}
|
||||
range.end = cmp::min(peer_best + 1, range.end);
|
||||
|
||||
self.peer_requests.insert(peer_id, range.start);
|
||||
self.blocks.insert(range.start, BlockRangeState::Downloading{ len: range.end - range.start, downloading: downloading + 1 });
|
||||
Some(range)
|
||||
}
|
||||
|
||||
/// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain.
|
||||
pub fn drain(&mut self, from: BlockNumber) -> Vec<BlockData> {
|
||||
let mut drained = Vec::new();
|
||||
let mut ranges = Vec::new();
|
||||
{
|
||||
let mut prev = from;
|
||||
for (start, range_data) in &mut self.blocks {
|
||||
match range_data {
|
||||
&mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => {
|
||||
prev = *start + blocks.len() as BlockNumber;
|
||||
let mut blocks = mem::replace(blocks, Vec::new());
|
||||
drained.append(&mut blocks);
|
||||
ranges.push(*start);
|
||||
},
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
for r in ranges {
|
||||
self.blocks.remove(&r);
|
||||
}
|
||||
trace!(target: "sync", "Drained {} blocks", drained.len());
|
||||
drained
|
||||
}
|
||||
|
||||
pub fn clear_peer_download(&mut self, peer_id: PeerId) {
|
||||
match self.peer_requests.entry(peer_id) {
|
||||
Entry::Occupied(entry) => {
|
||||
let start = entry.remove();
|
||||
let remove = match self.blocks.get_mut(&start) {
|
||||
Some(&mut BlockRangeState::Downloading { ref mut downloading, .. }) if *downloading > 1 => {
|
||||
*downloading = *downloading - 1;
|
||||
false
|
||||
},
|
||||
Some(&mut BlockRangeState::Downloading { .. }) => {
|
||||
true
|
||||
},
|
||||
_ => {
|
||||
debug_assert!(false);
|
||||
false
|
||||
}
|
||||
};
|
||||
if remove {
|
||||
self.blocks.remove(&start);
|
||||
}
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::{BlockCollection, BlockData};
|
||||
use message;
|
||||
use primitives::block::HeaderHash;
|
||||
|
||||
fn is_empty(bc: &BlockCollection) -> bool {
|
||||
bc.blocks.is_empty() &&
|
||||
bc.peer_requests.is_empty()
|
||||
}
|
||||
|
||||
fn generate_blocks(n: usize) -> Vec<message::BlockData> {
|
||||
(0 .. n).map(|_| message::BlockData {
|
||||
hash: HeaderHash::random(),
|
||||
header: None,
|
||||
body: None,
|
||||
message_queue: None,
|
||||
receipt: None,
|
||||
}).collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn create_clear() {
|
||||
let mut bc = BlockCollection::new();
|
||||
assert!(is_empty(&bc));
|
||||
bc.insert(1, generate_blocks(100), 0);
|
||||
assert!(!is_empty(&bc));
|
||||
bc.clear();
|
||||
assert!(is_empty(&bc));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn insert_blocks() {
|
||||
let mut bc = BlockCollection::new();
|
||||
assert!(is_empty(&bc));
|
||||
let peer0 = 0;
|
||||
let peer1 = 1;
|
||||
let peer2 = 2;
|
||||
|
||||
let blocks = generate_blocks(150);
|
||||
assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(1 .. 41));
|
||||
assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(41 .. 81));
|
||||
assert_eq!(bc.needed_blocks(peer2, 40, 150, 0), Some(81 .. 121));
|
||||
|
||||
bc.clear_peer_download(peer1);
|
||||
bc.insert(41, blocks[41..81].to_vec(), peer1);
|
||||
assert_eq!(bc.drain(1), vec![]);
|
||||
assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(121 .. 151));
|
||||
bc.clear_peer_download(peer0);
|
||||
bc.insert(1, blocks[1..11].to_vec(), peer0);
|
||||
|
||||
assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(11 .. 41));
|
||||
assert_eq!(bc.drain(1), blocks[1..11].iter().map(|b| BlockData { block: b.clone(), origin: 0 }).collect::<Vec<_>>());
|
||||
|
||||
bc.clear_peer_download(peer0);
|
||||
bc.insert(11, blocks[11..41].to_vec(), peer0);
|
||||
|
||||
let drained = bc.drain(12);
|
||||
assert_eq!(drained[..30], blocks[11..41].iter().map(|b| BlockData { block: b.clone(), origin: 0 }).collect::<Vec<_>>()[..]);
|
||||
assert_eq!(drained[30..], blocks[41..81].iter().map(|b| BlockData { block: b.clone(), origin: 1 }).collect::<Vec<_>>()[..]);
|
||||
|
||||
bc.clear_peer_download(peer2);
|
||||
assert_eq!(bc.needed_blocks(peer2, 40, 150, 80), Some(81 .. 121));
|
||||
bc.clear_peer_download(peer2);
|
||||
bc.insert(81, blocks[81..121].to_vec(), peer2);
|
||||
bc.clear_peer_download(peer1);
|
||||
bc.insert(121, blocks[121..150].to_vec(), peer1);
|
||||
|
||||
assert_eq!(bc.drain(80), vec![]);
|
||||
let drained = bc.drain(81);
|
||||
assert_eq!(drained[..40], blocks[81..121].iter().map(|b| BlockData { block: b.clone(), origin: 2 }).collect::<Vec<_>>()[..]);
|
||||
assert_eq!(drained[40..], blocks[121..150].iter().map(|b| BlockData { block: b.clone(), origin: 1 }).collect::<Vec<_>>()[..]);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Blockchain access trait
|
||||
|
||||
use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus, BlockId};
|
||||
use client::error::Error;
|
||||
use state_machine;
|
||||
use primitives::block;
|
||||
|
||||
pub trait Client : Send + Sync {
|
||||
/// Given a hash return a header
|
||||
fn import(&self, header: block::Header, body: Option<block::Body>) -> Result<ImportResult, Error>;
|
||||
|
||||
/// Get blockchain info.
|
||||
fn info(&self) -> Result<ClientInfo, Error>;
|
||||
|
||||
/// Get block status.
|
||||
fn block_status(&self, id: &BlockId) -> Result<BlockStatus, Error>;
|
||||
|
||||
/// Get block hash by number.
|
||||
fn block_hash(&self, block_number: block::Number) -> Result<Option<block::HeaderHash>, Error>;
|
||||
|
||||
/// Get block header.
|
||||
fn header(&self, id: &BlockId) -> Result<Option<block::Header>, Error>;
|
||||
|
||||
/// Get block body.
|
||||
fn body(&self, id: &BlockId) -> Result<Option<block::Body>, Error>;
|
||||
}
|
||||
|
||||
impl<B, E> Client for PolkadotClient<B, E> where
|
||||
B: client::backend::Backend + Send + Sync + 'static,
|
||||
E: state_machine::CodeExecutor + Send + Sync + 'static,
|
||||
Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>, {
|
||||
|
||||
fn import(&self, header: block::Header, body: Option<block::Body>) -> Result<ImportResult, Error> {
|
||||
(self as &PolkadotClient<B, E>).import_block(header, body)
|
||||
}
|
||||
|
||||
fn info(&self) -> Result<ClientInfo, Error> {
|
||||
(self as &PolkadotClient<B, E>).info()
|
||||
}
|
||||
|
||||
fn block_status(&self, id: &BlockId) -> Result<BlockStatus, Error> {
|
||||
(self as &PolkadotClient<B, E>).block_status(id)
|
||||
}
|
||||
|
||||
fn block_hash(&self, block_number: block::Number) -> Result<Option<block::HeaderHash>, Error> {
|
||||
(self as &PolkadotClient<B, E>).block_hash(block_number)
|
||||
}
|
||||
|
||||
fn header(&self, id: &BlockId) -> Result<Option<block::Header>, Error> {
|
||||
(self as &PolkadotClient<B, E>).header(id)
|
||||
}
|
||||
|
||||
fn body(&self, id: &BlockId) -> Result<Option<block::Body>, Error> {
|
||||
(self as &PolkadotClient<B, E>).body(id)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
use service::Role;
|
||||
|
||||
/// Protocol configuration
|
||||
#[derive(Clone)]
|
||||
pub struct ProtocolConfig {
|
||||
pub roles: Role,
|
||||
}
|
||||
|
||||
impl Default for ProtocolConfig {
|
||||
fn default() -> ProtocolConfig {
|
||||
ProtocolConfig {
|
||||
roles: Role::FULL,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
use network::Error as NetworkError;
|
||||
use client;
|
||||
|
||||
error_chain! {
|
||||
foreign_links {
|
||||
Network(NetworkError) #[doc = "Devp2p error."];
|
||||
}
|
||||
|
||||
links {
|
||||
Client(client::error::Error, client::error::ErrorKind);
|
||||
}
|
||||
|
||||
errors {
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
use network::{NetworkContext, PeerId, Error as NetworkError, SessionInfo};
|
||||
|
||||
/// IO interface for the syncing handler.
|
||||
/// Provides peer connection management and an interface to the blockchain client.
|
||||
pub trait SyncIo {
|
||||
/// Disable a peer
|
||||
fn disable_peer(&mut self, peer_id: PeerId);
|
||||
/// Disconnect peer
|
||||
fn disconnect_peer(&mut self, peer_id: PeerId);
|
||||
/// Send a packet to a peer.
|
||||
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError>;
|
||||
/// Returns peer identifier string
|
||||
fn peer_info(&self, peer_id: PeerId) -> String {
|
||||
peer_id.to_string()
|
||||
}
|
||||
/// Returns information on p2p session
|
||||
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo>;
|
||||
/// Check if the session is expired
|
||||
fn is_expired(&self) -> bool;
|
||||
}
|
||||
|
||||
/// Wraps `NetworkContext` and the blockchain client
|
||||
pub struct NetSyncIo<'s, 'h> where 'h: 's {
|
||||
network: &'s NetworkContext<'h>,
|
||||
}
|
||||
|
||||
impl<'s, 'h> NetSyncIo<'s, 'h> {
|
||||
/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
|
||||
pub fn new(network: &'s NetworkContext<'h>) -> NetSyncIo<'s, 'h> {
|
||||
NetSyncIo {
|
||||
network: network,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
|
||||
fn disable_peer(&mut self, peer_id: PeerId) {
|
||||
self.network.disable_peer(peer_id);
|
||||
}
|
||||
|
||||
fn disconnect_peer(&mut self, peer_id: PeerId) {
|
||||
self.network.disconnect_peer(peer_id);
|
||||
}
|
||||
|
||||
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError>{
|
||||
self.network.send(peer_id, 0, data)
|
||||
}
|
||||
|
||||
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo> {
|
||||
self.network.session_info(peer_id)
|
||||
}
|
||||
|
||||
fn is_expired(&self) -> bool {
|
||||
self.network.is_expired()
|
||||
}
|
||||
|
||||
fn peer_info(&self, peer_id: PeerId) -> String {
|
||||
self.network.peer_client_version(peer_id)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
#![warn(missing_docs)]
|
||||
|
||||
//! Implements polkadot protocol version as specified here:
|
||||
//! https://github.com/paritytech/polkadot/wiki/Network-protocol
|
||||
|
||||
extern crate ethcore_network as network;
|
||||
extern crate ethcore_io as core_io;
|
||||
extern crate rand;
|
||||
extern crate parking_lot;
|
||||
extern crate substrate_primitives as primitives;
|
||||
extern crate substrate_state_machine as state_machine;
|
||||
extern crate substrate_serializer as ser;
|
||||
extern crate substrate_client as client;
|
||||
extern crate serde;
|
||||
extern crate serde_json;
|
||||
#[macro_use] extern crate serde_derive;
|
||||
#[macro_use] extern crate log;
|
||||
#[macro_use] extern crate bitflags;
|
||||
#[macro_use] extern crate error_chain;
|
||||
|
||||
mod service;
|
||||
mod sync;
|
||||
mod protocol;
|
||||
mod io;
|
||||
mod message;
|
||||
mod error;
|
||||
mod config;
|
||||
mod chain;
|
||||
mod blocks;
|
||||
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
|
||||
#[cfg(test)]
|
||||
extern crate substrate_executor;
|
||||
#[cfg(test)]
|
||||
extern crate env_logger;
|
||||
|
||||
pub use service::Service;
|
||||
pub use protocol::{ProtocolStatus};
|
||||
pub use sync::{Status as SyncStatus, SyncState};
|
||||
pub use network::{NonReservedPeerMode, ConnectionFilter, ConnectionDirection, NetworkConfiguration};
|
||||
|
||||
// TODO: move it elsewhere
|
||||
fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash {
|
||||
primitives::hashing::blake2_256(&ser::to_vec(header)).into()
|
||||
}
|
||||
@@ -0,0 +1,188 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
//! Network packet message types. These get serialized and put into the lower level protocol payload.
|
||||
|
||||
use std::borrow::Borrow;
|
||||
use primitives::AuthorityId;
|
||||
use primitives::block::{Number as BlockNumber, HeaderHash, Header, Body};
|
||||
use service::Role as RoleFlags;
|
||||
|
||||
pub type RequestId = u64;
|
||||
type Bytes = Vec<u8>;
|
||||
|
||||
type Signature = ::primitives::hash::H256; //TODO:
|
||||
|
||||
/// Configured node role.
|
||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum Role {
|
||||
/// Full relay chain client with no additional responsibilities.
|
||||
Full,
|
||||
/// Relay chain light client.
|
||||
Light,
|
||||
/// Parachain validator.
|
||||
Validator,
|
||||
/// Parachain collator.
|
||||
Collator,
|
||||
}
|
||||
|
||||
impl<T> From<T> for RoleFlags where T: Borrow<[Role]> {
|
||||
fn from(roles: T) -> RoleFlags {
|
||||
let mut flags = RoleFlags::NONE;
|
||||
let roles: &[Role] = roles.borrow();
|
||||
for r in roles {
|
||||
match *r {
|
||||
Role::Full => flags = flags | RoleFlags::FULL,
|
||||
Role::Light => flags = flags | RoleFlags::LIGHT,
|
||||
Role::Validator => flags = flags | RoleFlags::VALIDATOR,
|
||||
Role::Collator => flags = flags | RoleFlags::COLLATOR,
|
||||
}
|
||||
}
|
||||
flags
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RoleFlags> for Vec<Role> where {
|
||||
fn from(flags: RoleFlags) -> Vec<Role> {
|
||||
let mut roles = Vec::new();
|
||||
if !(flags & RoleFlags::FULL).is_empty() {
|
||||
roles.push(Role::Full);
|
||||
}
|
||||
if !(flags & RoleFlags::LIGHT).is_empty() {
|
||||
roles.push(Role::Light);
|
||||
}
|
||||
if !(flags & RoleFlags::VALIDATOR).is_empty() {
|
||||
roles.push(Role::Validator);
|
||||
}
|
||||
if !(flags & RoleFlags::COLLATOR).is_empty() {
|
||||
roles.push(Role::Collator);
|
||||
}
|
||||
roles
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Copy, Clone)]
|
||||
/// Bits of block data and associated artefacts to request.
|
||||
pub enum BlockAttribute {
|
||||
/// Include block header.
|
||||
Header,
|
||||
/// Include block body.
|
||||
Body,
|
||||
/// Include block receipt.
|
||||
Receipt,
|
||||
/// Include block message queue.
|
||||
MessageQueue,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
/// Block data sent in the response.
|
||||
pub struct BlockData {
|
||||
/// Block header hash.
|
||||
pub hash: HeaderHash,
|
||||
/// Block header if requested.
|
||||
pub header: Option<Header>,
|
||||
/// Block body if requested.
|
||||
pub body: Option<Body>,
|
||||
/// Block receipt if requested.
|
||||
pub receipt: Option<Bytes>,
|
||||
/// Block message queue if requested.
|
||||
pub message_queue: Option<Bytes>,
|
||||
}
|
||||
|
||||
#[serde(untagged)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
/// Identifies starting point of a block sequence.
|
||||
pub enum FromBlock {
|
||||
/// Start with given hash.
|
||||
Hash(HeaderHash),
|
||||
/// Start with given block number.
|
||||
Number(BlockNumber),
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
/// Block enumeration direction.
|
||||
pub enum Direction {
|
||||
/// Enumerate in ascending order (from child to parent).
|
||||
Ascending,
|
||||
/// Enumerate in descendfing order (from parent to canonical child).
|
||||
Descending,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
/// A network message.
|
||||
pub enum Message {
|
||||
/// Status packet.
|
||||
Status(Status),
|
||||
/// Block request.
|
||||
BlockRequest(BlockRequest),
|
||||
/// Block response.
|
||||
BlockResponse(BlockResponse),
|
||||
/// Block announce.
|
||||
BlockAnnounce(BlockAnnounce),
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct Status {
|
||||
/// Protocol version.
|
||||
pub version: u32,
|
||||
/// Supported roles.
|
||||
pub roles: Vec<Role>,
|
||||
/// Best block number.
|
||||
pub best_number: BlockNumber,
|
||||
/// Best block hash.
|
||||
pub best_hash: HeaderHash,
|
||||
/// Genesis block hash.
|
||||
pub genesis_hash: HeaderHash,
|
||||
/// Signatue of `best_hash` made with validator address. Required for the validator role.
|
||||
pub validator_signature: Option<Signature>,
|
||||
/// Validator address. Required for the validator role.
|
||||
pub validator_id: Option<AuthorityId>,
|
||||
/// Parachain id. Required for the collator role.
|
||||
pub parachain_id: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
/// Request block data from a peer.
|
||||
pub struct BlockRequest {
|
||||
/// Unique request id.
|
||||
pub id: RequestId,
|
||||
/// Bits of block data to request.
|
||||
pub fields: Vec<BlockAttribute>,
|
||||
/// Start from this block.
|
||||
pub from: FromBlock,
|
||||
/// End at this block. An implementation defined maximum is used when unspecified.
|
||||
pub to: Option<HeaderHash>,
|
||||
/// Sequence direction.
|
||||
pub direction: Direction,
|
||||
/// Maximum number of blocks to return. An implementation defined maximum is used when unspecified.
|
||||
pub max: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
/// Response to `BlockRequest`
|
||||
pub struct BlockResponse {
|
||||
/// Id of a request this response was made for.
|
||||
pub id: RequestId,
|
||||
/// Block data for the requested sequence.
|
||||
pub blocks: Vec<BlockData>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||
/// Announce a new complete relay chain block on the network.
|
||||
pub struct BlockAnnounce {
|
||||
/// New block header.
|
||||
pub header: Header,
|
||||
}
|
||||
@@ -0,0 +1,396 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
use std::collections::{HashMap, HashSet, BTreeMap};
|
||||
use std::{mem, cmp};
|
||||
use std::sync::Arc;
|
||||
use parking_lot::RwLock;
|
||||
use serde_json;
|
||||
use std::time;
|
||||
use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header};
|
||||
use network::{PeerId, NodeId};
|
||||
|
||||
use message::{self, Message};
|
||||
use sync::{ChainSync, Status as SyncStatus};
|
||||
use service::Role;
|
||||
use config::ProtocolConfig;
|
||||
use chain::Client;
|
||||
use io::SyncIo;
|
||||
use error;
|
||||
use client::BlockId;
|
||||
use super::header_hash;
|
||||
|
||||
const REQUEST_TIMEOUT_SEC: u64 = 15;
|
||||
const PROTOCOL_VERSION: u32 = 0;
|
||||
|
||||
// Maximum allowed entries in `BlockResponse`
|
||||
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
|
||||
|
||||
// Lock must always be taken in order declared here.
|
||||
pub struct Protocol {
|
||||
config: ProtocolConfig,
|
||||
chain: Arc<Client>,
|
||||
genesis_hash: HeaderHash,
|
||||
sync: RwLock<ChainSync>,
|
||||
/// All connected peers
|
||||
peers: RwLock<HashMap<PeerId, Peer>>,
|
||||
/// Connected peers pending Status message.
|
||||
handshaking_peers: RwLock<HashMap<PeerId, time::Instant>>,
|
||||
}
|
||||
|
||||
/// Syncing status and statistics
|
||||
#[derive(Clone)]
|
||||
pub struct ProtocolStatus {
|
||||
/// Sync status.
|
||||
pub sync: SyncStatus,
|
||||
/// Total number of connected peers
|
||||
pub num_peers: usize,
|
||||
/// Total number of active peers.
|
||||
pub num_active_peers: usize,
|
||||
}
|
||||
|
||||
/// Peer information
|
||||
struct Peer {
|
||||
/// Protocol version
|
||||
protocol_version: u32,
|
||||
/// Roles
|
||||
roles: Role,
|
||||
/// Peer best block hash
|
||||
best_hash: HeaderHash,
|
||||
/// Peer best block number
|
||||
best_number: BlockNumber,
|
||||
/// Pending block request if any
|
||||
block_request: Option<message::BlockRequest>,
|
||||
/// Request timestamp
|
||||
request_timestamp: Option<time::Instant>,
|
||||
/// Holds a set of transactions recently sent to this peer to avoid spamming.
|
||||
_last_sent_transactions: HashSet<TransactionHash>,
|
||||
/// Request counter,
|
||||
next_request_id: message::RequestId,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PeerInfo {
|
||||
/// Roles
|
||||
pub roles: Role,
|
||||
/// Protocol version
|
||||
pub protocol_version: u32,
|
||||
/// Peer best block hash
|
||||
pub best_hash: HeaderHash,
|
||||
/// Peer best block number
|
||||
pub best_number: BlockNumber,
|
||||
}
|
||||
|
||||
/// Transaction stats
|
||||
#[derive(Debug)]
|
||||
pub struct TransactionStats {
|
||||
/// Block number where this TX was first seen.
|
||||
pub first_seen: u64,
|
||||
/// Peers it was propagated to.
|
||||
pub propagated_to: BTreeMap<NodeId, usize>,
|
||||
}
|
||||
|
||||
impl Protocol {
|
||||
/// Create a new instance.
|
||||
pub fn new(config: ProtocolConfig, chain: Arc<Client>) -> error::Result<Protocol> {
|
||||
let info = chain.info()?;
|
||||
let protocol = Protocol {
|
||||
config: config,
|
||||
chain: chain,
|
||||
genesis_hash: info.chain.genesis_hash,
|
||||
sync: RwLock::new(ChainSync::new(&info)),
|
||||
peers: RwLock::new(HashMap::new()),
|
||||
handshaking_peers: RwLock::new(HashMap::new()),
|
||||
};
|
||||
Ok(protocol)
|
||||
}
|
||||
|
||||
/// Returns protocol status
|
||||
pub fn status(&self) -> ProtocolStatus {
|
||||
let sync = self.sync.read();
|
||||
let peers = self.peers.read();
|
||||
ProtocolStatus {
|
||||
sync: sync.status(),
|
||||
num_peers: peers.values().count(),
|
||||
num_active_peers: peers.values().filter(|p| p.block_request.is_some()).count(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, data: &[u8]) {
|
||||
let message: Message = match serde_json::from_slice(data) {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
debug!("Invalid packet from {}: {}", peer_id, e);
|
||||
io.disable_peer(peer_id);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match message {
|
||||
Message::Status(s) => self.on_status_message(io, peer_id, s),
|
||||
Message::BlockRequest(r) => self.on_block_request(io, peer_id, r),
|
||||
Message::BlockResponse(r) => {
|
||||
let request = {
|
||||
let mut peers = self.peers.write();
|
||||
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
|
||||
peer.request_timestamp = None;
|
||||
match mem::replace(&mut peer.block_request, None) {
|
||||
Some(r) => r,
|
||||
None => {
|
||||
debug!("Unexpected response packet from {}", peer_id);
|
||||
io.disable_peer(peer_id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("Unexpected packet from {}", peer_id);
|
||||
io.disable_peer(peer_id);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if request.id != r.id {
|
||||
trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", peer_id, request.id, r.id);
|
||||
return;
|
||||
}
|
||||
self.on_block_response(io, peer_id, request, r);
|
||||
},
|
||||
Message::BlockAnnounce(announce) => {
|
||||
self.on_block_announce(io, peer_id, announce);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, mut message: Message) {
|
||||
let mut peers = self.peers.write();
|
||||
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
|
||||
match &mut message {
|
||||
&mut Message::BlockRequest(ref mut r) => {
|
||||
r.id = peer.next_request_id;
|
||||
peer.next_request_id = peer.next_request_id + 1;
|
||||
peer.block_request = Some(r.clone());
|
||||
peer.request_timestamp = Some(time::Instant::now());
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed");
|
||||
if let Err(e) = io.send(peer_id, data) {
|
||||
debug!(target:"sync", "Error sending message: {:?}", e);
|
||||
io.disconnect_peer(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called when a new peer is connected
|
||||
pub fn on_peer_connected(&self, io: &mut SyncIo, peer_id: PeerId) {
|
||||
trace!(target: "sync", "Connected {}: {}", peer_id, io.peer_info(peer_id));
|
||||
self.handshaking_peers.write().insert(peer_id, time::Instant::now());
|
||||
self.send_status(io, peer_id);
|
||||
}
|
||||
|
||||
/// Called by peer when it is disconnecting
|
||||
pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: PeerId) {
|
||||
trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_info(peer));
|
||||
let removed = {
|
||||
let mut peers = self.peers.write();
|
||||
let mut handshaking_peers = self.handshaking_peers.write();
|
||||
handshaking_peers.remove(&peer);
|
||||
peers.remove(&peer).is_some()
|
||||
};
|
||||
if removed {
|
||||
self.sync.write().peer_disconnected(io, self, peer);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_block_request(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest) {
|
||||
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max);
|
||||
let mut blocks = Vec::new();
|
||||
let mut id = match request.from {
|
||||
message::FromBlock::Hash(h) => BlockId::Hash(h),
|
||||
message::FromBlock::Number(n) => BlockId::Number(n),
|
||||
};
|
||||
let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize;
|
||||
// TODO: receipts, etc.
|
||||
let (mut get_header, mut get_body) = (false, false);
|
||||
for a in request.fields {
|
||||
match a {
|
||||
message::BlockAttribute::Header => get_header = true,
|
||||
message::BlockAttribute::Body => get_body = true,
|
||||
message::BlockAttribute::Receipt => unimplemented!(),
|
||||
message::BlockAttribute::MessageQueue => unimplemented!(),
|
||||
}
|
||||
}
|
||||
while let Some(header) = self.chain.header(&id).unwrap_or(None) {
|
||||
if blocks.len() >= max{
|
||||
break;
|
||||
}
|
||||
let number = header.number;
|
||||
let hash = header_hash(&header);
|
||||
let block_data = message::BlockData {
|
||||
hash: hash,
|
||||
header: if get_header { Some(header) } else { None },
|
||||
body: if get_body { self.chain.body(&BlockId::Hash(hash)).unwrap_or(None) } else { None },
|
||||
receipt: None,
|
||||
message_queue: None,
|
||||
};
|
||||
blocks.push(block_data);
|
||||
match request.direction {
|
||||
message::Direction::Ascending => id = BlockId::Number(number + 1),
|
||||
message::Direction::Descending => {
|
||||
if number == 0 {
|
||||
break;
|
||||
}
|
||||
id = BlockId::Number(number - 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
let response = message::BlockResponse {
|
||||
id: request.id,
|
||||
blocks: blocks,
|
||||
};
|
||||
self.send_message(io, peer, Message::BlockResponse(response))
|
||||
}
|
||||
|
||||
pub fn on_block_response(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest, response: message::BlockResponse) {
|
||||
// TODO: validate response
|
||||
trace!(target: "sync", "BlockResponse {} from {} with {} blocks", response.id, peer, response.blocks.len());
|
||||
self.sync.write().on_block_data(io, self, peer, request, response);
|
||||
}
|
||||
|
||||
pub fn tick(&self, io: &mut SyncIo) {
|
||||
self.maintain_peers(io);
|
||||
}
|
||||
|
||||
fn maintain_peers(&self, io: &mut SyncIo) {
|
||||
let tick = time::Instant::now();
|
||||
let mut aborting = Vec::new();
|
||||
{
|
||||
let peers = self.peers.read();
|
||||
let handshaking_peers = self.handshaking_peers.read();
|
||||
for (peer_id, timestamp) in peers.iter()
|
||||
.filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r)))
|
||||
.chain(handshaking_peers.iter()) {
|
||||
if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC {
|
||||
trace!(target: "sync", "Timeout {}", peer_id);
|
||||
io.disconnect_peer(*peer_id);
|
||||
aborting.push(*peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
for p in aborting {
|
||||
self.on_peer_disconnected(io, p);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn peer_info(&self, peer: PeerId) -> Option<PeerInfo> {
|
||||
self.peers.read().get(&peer).map(|p| {
|
||||
PeerInfo {
|
||||
roles: p.roles,
|
||||
protocol_version: p.protocol_version,
|
||||
best_hash: p.best_hash,
|
||||
best_number: p.best_number,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Called by peer to report status
|
||||
fn on_status_message(&self, io: &mut SyncIo, peer_id: PeerId, status: message::Status) {
|
||||
trace!(target: "sync", "New peer {} {:?}", peer_id, status);
|
||||
if io.is_expired() {
|
||||
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
|
||||
return;
|
||||
}
|
||||
|
||||
{
|
||||
let mut peers = self.peers.write();
|
||||
let mut handshaking_peers = self.handshaking_peers.write();
|
||||
if peers.contains_key(&peer_id) {
|
||||
debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
|
||||
return;
|
||||
}
|
||||
if status.genesis_hash != self.genesis_hash {
|
||||
io.disable_peer(peer_id);
|
||||
trace!(target: "sync", "Peer {} genesis hash mismatch (ours: {}, theirs: {})", peer_id, self.genesis_hash, status.genesis_hash);
|
||||
return;
|
||||
}
|
||||
if status.version != PROTOCOL_VERSION {
|
||||
io.disable_peer(peer_id);
|
||||
trace!(target: "sync", "Peer {} unsupported eth protocol ({})", peer_id, status.version);
|
||||
return;
|
||||
}
|
||||
|
||||
let peer = Peer {
|
||||
protocol_version: status.version,
|
||||
roles: status.roles.into(),
|
||||
best_hash: status.best_hash,
|
||||
best_number: status.best_number,
|
||||
block_request: None,
|
||||
request_timestamp: None,
|
||||
_last_sent_transactions: HashSet::new(),
|
||||
next_request_id: 0,
|
||||
};
|
||||
peers.insert(peer_id.clone(), peer);
|
||||
handshaking_peers.remove(&peer_id);
|
||||
debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id));
|
||||
}
|
||||
self.sync.write().new_peer(io, self, peer_id);
|
||||
}
|
||||
|
||||
/// Send Status message
|
||||
fn send_status(&self, io: &mut SyncIo, peer_id: PeerId) {
|
||||
if let Ok(info) = self.chain.info() {
|
||||
let status = message::Status {
|
||||
version: PROTOCOL_VERSION,
|
||||
genesis_hash: info.chain.genesis_hash,
|
||||
roles: self.config.roles.into(),
|
||||
best_number: info.chain.best_number,
|
||||
best_hash: info.chain.best_hash,
|
||||
validator_signature: None,
|
||||
validator_id: None,
|
||||
parachain_id: None,
|
||||
};
|
||||
self.send_message(io, peer_id, Message::Status(status))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn abort(&self) {
|
||||
let mut sync = self.sync.write();
|
||||
let mut peers = self.peers.write();
|
||||
let mut handshaking_peers = self.handshaking_peers.write();
|
||||
sync.clear();
|
||||
peers.clear();
|
||||
handshaking_peers.clear();
|
||||
}
|
||||
|
||||
pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce) {
|
||||
let header = announce.header;
|
||||
self.sync.write().on_block_announce(io, self, peer_id, &header);
|
||||
}
|
||||
|
||||
pub fn on_block_imported(&self, header: &Header) {
|
||||
self.sync.write().update_chain_info(&header);
|
||||
}
|
||||
|
||||
pub fn on_new_transactions(&self) {
|
||||
}
|
||||
|
||||
pub fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
|
||||
BTreeMap::new()
|
||||
}
|
||||
|
||||
pub fn chain(&self) -> &Client {
|
||||
&*self.chain
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,239 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::collections::{BTreeMap};
|
||||
use std::io;
|
||||
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId,
|
||||
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
|
||||
use primitives::block::{TransactionHash, Header};
|
||||
use core_io::{TimerToken};
|
||||
use io::NetSyncIo;
|
||||
use protocol::{Protocol, ProtocolStatus, PeerInfo as ProtocolPeerInfo, TransactionStats};
|
||||
use config::{ProtocolConfig};
|
||||
use error::Error;
|
||||
use chain::Client;
|
||||
|
||||
/// Polkadot devp2p protocol id
|
||||
pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot";
|
||||
|
||||
bitflags! {
|
||||
pub struct Role: u32 {
|
||||
const NONE = 0b00000000;
|
||||
const FULL = 0b00000001;
|
||||
const LIGHT = 0b00000010;
|
||||
const VALIDATOR = 0b00000100;
|
||||
const COLLATOR = 0b00001000;
|
||||
}
|
||||
}
|
||||
|
||||
/// Sync status
|
||||
pub trait SyncProvider: Send + Sync {
|
||||
/// Get sync status
|
||||
fn status(&self) -> ProtocolStatus;
|
||||
/// Get peers information
|
||||
fn peers(&self) -> Vec<PeerInfo>;
|
||||
/// Get this node id if available.
|
||||
fn node_id(&self) -> Option<String>;
|
||||
/// Returns propagation count for pending transactions.
|
||||
fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats>;
|
||||
}
|
||||
|
||||
/// Peer connection information
|
||||
#[derive(Debug)]
|
||||
pub struct PeerInfo {
|
||||
/// Public node id
|
||||
pub id: Option<String>,
|
||||
/// Node client ID
|
||||
pub client_version: String,
|
||||
/// Capabilities
|
||||
pub capabilities: Vec<String>,
|
||||
/// Remote endpoint address
|
||||
pub remote_address: String,
|
||||
/// Local endpoint address
|
||||
pub local_address: String,
|
||||
/// Dot protocol info.
|
||||
pub dot_info: Option<ProtocolPeerInfo>,
|
||||
}
|
||||
|
||||
/// Service initialization parameters.
|
||||
pub struct Params {
|
||||
/// Configuration.
|
||||
pub config: ProtocolConfig,
|
||||
/// Network layer configuration.
|
||||
pub network_config: NetworkConfiguration,
|
||||
/// Polkadot relay chain access point.
|
||||
pub chain: Arc<Client>,
|
||||
}
|
||||
|
||||
/// Polkadot network service. Handles network IO and manages connectivity.
|
||||
pub struct Service {
|
||||
/// Network service
|
||||
network: NetworkService,
|
||||
/// Devp2p protocol handler
|
||||
handler: Arc<ProtocolHandler>,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
/// Creates and register protocol with the network service
|
||||
pub fn new(params: Params) -> Result<Arc<Service>, Error> {
|
||||
|
||||
let service = NetworkService::new(params.network_config.clone(), None)?;
|
||||
|
||||
let sync = Arc::new(Service {
|
||||
network: service,
|
||||
handler: Arc::new(ProtocolHandler {
|
||||
protocol: Protocol::new(params.config, params.chain.clone())?,
|
||||
}),
|
||||
});
|
||||
|
||||
Ok(sync)
|
||||
}
|
||||
|
||||
/// Called when a new block is imported by the client.
|
||||
pub fn on_block_imported(&self, header: &Header) {
|
||||
self.handler.protocol.on_block_imported(header)
|
||||
}
|
||||
|
||||
/// Called when new transactons are imported by the client.
|
||||
pub fn on_new_transactions(&self) {
|
||||
self.handler.protocol.on_new_transactions()
|
||||
}
|
||||
|
||||
fn start(&self) {
|
||||
match self.network.start().map_err(Into::into) {
|
||||
Err(ErrorKind::Io(ref e)) if e.kind() == io::ErrorKind::AddrInUse =>
|
||||
warn!("Network port {:?} is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option.", self.network.config().listen_address.expect("Listen address is not set.")),
|
||||
Err(err) => warn!("Error starting network: {}", err),
|
||||
_ => {},
|
||||
};
|
||||
self.network.register_protocol(self.handler.clone(), DOT_PROTOCOL_ID, 1, &[0u8])
|
||||
.unwrap_or_else(|e| warn!("Error registering polkadot protocol: {:?}", e));
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
self.handler.protocol.abort();
|
||||
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
|
||||
}
|
||||
}
|
||||
|
||||
impl SyncProvider for Service {
|
||||
/// Get sync status
|
||||
fn status(&self) -> ProtocolStatus {
|
||||
self.handler.protocol.status()
|
||||
}
|
||||
|
||||
/// Get sync peers
|
||||
fn peers(&self) -> Vec<PeerInfo> {
|
||||
self.network.with_context_eval(DOT_PROTOCOL_ID, |ctx| {
|
||||
let peer_ids = self.network.connected_peers();
|
||||
|
||||
peer_ids.into_iter().filter_map(|peer_id| {
|
||||
let session_info = match ctx.session_info(peer_id) {
|
||||
None => return None,
|
||||
Some(info) => info,
|
||||
};
|
||||
|
||||
Some(PeerInfo {
|
||||
id: session_info.id.map(|id| format!("{:x}", id)),
|
||||
client_version: session_info.client_version,
|
||||
capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(),
|
||||
remote_address: session_info.remote_address,
|
||||
local_address: session_info.local_address,
|
||||
dot_info: self.handler.protocol.peer_info(peer_id),
|
||||
})
|
||||
}).collect()
|
||||
}).unwrap_or_else(Vec::new)
|
||||
}
|
||||
|
||||
fn node_id(&self) -> Option<String> {
|
||||
self.network.external_url()
|
||||
}
|
||||
|
||||
fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
|
||||
self.handler.protocol.transactions_stats()
|
||||
}
|
||||
}
|
||||
|
||||
struct ProtocolHandler {
|
||||
/// Protocol handler
|
||||
protocol: Protocol,
|
||||
}
|
||||
|
||||
impl NetworkProtocolHandler for ProtocolHandler {
|
||||
fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) {
|
||||
io.register_timer(0, 1000).expect("Error registering sync timer");
|
||||
}
|
||||
|
||||
fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) {
|
||||
self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data);
|
||||
}
|
||||
|
||||
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer);
|
||||
}
|
||||
|
||||
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer);
|
||||
}
|
||||
|
||||
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
|
||||
self.protocol.tick(&mut NetSyncIo::new(io));
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for managing network
|
||||
pub trait ManageNetwork : Send + Sync {
|
||||
/// Set to allow unreserved peers to connect
|
||||
fn accept_unreserved_peers(&self);
|
||||
/// Set to deny unreserved peers to connect
|
||||
fn deny_unreserved_peers(&self);
|
||||
/// Remove reservation for the peer
|
||||
fn remove_reserved_peer(&self, peer: String) -> Result<(), String>;
|
||||
/// Add reserved peer
|
||||
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
|
||||
/// Start network
|
||||
fn start_network(&self);
|
||||
/// Stop network
|
||||
fn stop_network(&self);
|
||||
}
|
||||
|
||||
|
||||
impl ManageNetwork for Service {
|
||||
fn accept_unreserved_peers(&self) {
|
||||
self.network.set_non_reserved_mode(NonReservedPeerMode::Accept);
|
||||
}
|
||||
|
||||
fn deny_unreserved_peers(&self) {
|
||||
self.network.set_non_reserved_mode(NonReservedPeerMode::Deny);
|
||||
}
|
||||
|
||||
fn remove_reserved_peer(&self, peer: String) -> Result<(), String> {
|
||||
self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
|
||||
}
|
||||
|
||||
fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
|
||||
self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
|
||||
}
|
||||
|
||||
fn start_network(&self) {
|
||||
self.start();
|
||||
}
|
||||
|
||||
fn stop_network(&self) {
|
||||
self.stop();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,409 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
|
||||
|
||||
use std::collections::HashMap;
|
||||
use io::SyncIo;
|
||||
use protocol::Protocol;
|
||||
use network::PeerId;
|
||||
use client::{ImportResult, BlockStatus, ClientInfo, BlockId};
|
||||
use primitives::block::{HeaderHash, Number as BlockNumber, Header};
|
||||
use blocks::{self, BlockCollection};
|
||||
use message::{self, Message};
|
||||
use super::header_hash;
|
||||
|
||||
// Maximum blocks to request in a single packet.
|
||||
const MAX_BLOCKS_TO_REQUEST: usize = 128;
|
||||
|
||||
struct PeerSync {
|
||||
pub common_hash: HeaderHash,
|
||||
pub common_number: BlockNumber,
|
||||
pub best_hash: HeaderHash,
|
||||
pub best_number: BlockNumber,
|
||||
pub state: PeerSyncState,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
||||
enum PeerSyncState {
|
||||
AncestorSearch(BlockNumber),
|
||||
Available,
|
||||
DownloadingNew(BlockNumber),
|
||||
DownloadingStale(HeaderHash),
|
||||
}
|
||||
|
||||
/// Relay chain sync strategy.
|
||||
pub struct ChainSync {
|
||||
genesis_hash: HeaderHash,
|
||||
peers: HashMap<PeerId, PeerSync>,
|
||||
blocks: BlockCollection,
|
||||
best_queued_number: BlockNumber,
|
||||
best_queued_hash: HeaderHash,
|
||||
required_block_attributes: Vec<message::BlockAttribute>,
|
||||
}
|
||||
|
||||
/// Reported sync state.
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
pub enum SyncState {
|
||||
/// Initial sync is complete, keep-up sync is active.
|
||||
Idle,
|
||||
/// Actively catching up with the chain.
|
||||
Downloading
|
||||
}
|
||||
|
||||
/// Syncing status and statistics
|
||||
#[derive(Clone)]
|
||||
pub struct Status {
|
||||
/// Current global sync state.
|
||||
pub state: SyncState,
|
||||
/// Target sync block number.
|
||||
pub best_seen_block: Option<BlockNumber>,
|
||||
}
|
||||
|
||||
impl ChainSync {
|
||||
/// Create a new instance.
|
||||
pub fn new(info: &ClientInfo) -> ChainSync {
|
||||
ChainSync {
|
||||
genesis_hash: info.chain.genesis_hash,
|
||||
peers: HashMap::new(),
|
||||
blocks: BlockCollection::new(),
|
||||
best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash),
|
||||
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
|
||||
required_block_attributes: vec![message::BlockAttribute::Header, message::BlockAttribute::Body],
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns sync status
|
||||
pub fn status(&self) -> Status {
|
||||
let best_seen = self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number);
|
||||
let state = match &best_seen {
|
||||
&Some(n) if n > self.best_queued_number && n - self.best_queued_number > 5 => SyncState::Downloading,
|
||||
_ => SyncState::Idle,
|
||||
};
|
||||
Status {
|
||||
state: state,
|
||||
best_seen_block: best_seen,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_peer(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) {
|
||||
if let Some(info) = protocol.peer_info(peer_id) {
|
||||
match (protocol.chain().block_status(&BlockId::Hash(info.best_hash)), info.best_number) {
|
||||
(Err(e), _) => {
|
||||
debug!(target:"sync", "Error reading blockchain: {:?}", e);
|
||||
io.disconnect_peer(peer_id);
|
||||
},
|
||||
(Ok(BlockStatus::KnownBad), _) => {
|
||||
debug!(target:"sync", "New peer with known bad best block {} ({}).", info.best_hash, info.best_number);
|
||||
io.disable_peer(peer_id);
|
||||
},
|
||||
(Ok(BlockStatus::Unknown), 0) => {
|
||||
debug!(target:"sync", "New peer with unkown genesis hash {} ({}).", info.best_hash, info.best_number);
|
||||
io.disable_peer(peer_id);
|
||||
},
|
||||
(Ok(BlockStatus::Unknown), _) => {
|
||||
let our_best = self.best_queued_number;
|
||||
if our_best > 0 {
|
||||
debug!(target:"sync", "New peer with unkown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number);
|
||||
self.peers.insert(peer_id, PeerSync {
|
||||
common_hash: self.genesis_hash,
|
||||
common_number: 0,
|
||||
best_hash: info.best_hash,
|
||||
best_number: info.best_number,
|
||||
state: PeerSyncState::AncestorSearch(our_best),
|
||||
});
|
||||
Self::request_ancestry(io, protocol, peer_id, our_best)
|
||||
} else {
|
||||
// We are at genesis, just start downloading
|
||||
debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number);
|
||||
self.peers.insert(peer_id, PeerSync {
|
||||
common_hash: self.genesis_hash,
|
||||
common_number: 0,
|
||||
best_hash: info.best_hash,
|
||||
best_number: info.best_number,
|
||||
state: PeerSyncState::Available,
|
||||
});
|
||||
self.download_new(io, protocol, peer_id)
|
||||
}
|
||||
},
|
||||
(Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChain), _) => {
|
||||
debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
|
||||
self.peers.insert(peer_id, PeerSync {
|
||||
common_hash: info.best_hash,
|
||||
common_number: info.best_number,
|
||||
best_hash: info.best_hash,
|
||||
best_number: info.best_number,
|
||||
state: PeerSyncState::Available,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_block_data(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, _request: message::BlockRequest, response: message::BlockResponse) {
|
||||
let count = response.blocks.len();
|
||||
let mut imported: usize = 0;
|
||||
let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
match peer.state {
|
||||
PeerSyncState::DownloadingNew(start_block) => {
|
||||
self.blocks.clear_peer_download(peer_id);
|
||||
peer.state = PeerSyncState::Available;
|
||||
|
||||
self.blocks.insert(start_block, response.blocks, peer_id);
|
||||
self.blocks.drain(self.best_queued_number + 1)
|
||||
},
|
||||
PeerSyncState::DownloadingStale(_) => {
|
||||
peer.state = PeerSyncState::Available;
|
||||
response.blocks.into_iter().map(|b| blocks::BlockData {
|
||||
origin: peer_id,
|
||||
block: b
|
||||
}).collect()
|
||||
},
|
||||
PeerSyncState::AncestorSearch(n) => {
|
||||
match response.blocks.get(0) {
|
||||
Some(ref block) => {
|
||||
match protocol.chain().block_hash(n) {
|
||||
Ok(Some(block_hash)) if block_hash == block.hash => {
|
||||
peer.common_hash = block.hash;
|
||||
peer.common_number = n;
|
||||
peer.state = PeerSyncState::Available;
|
||||
trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", peer_id, block.hash, n);
|
||||
vec![]
|
||||
},
|
||||
Ok(_) if n > 0 => {
|
||||
let n = n - 1;
|
||||
peer.state = PeerSyncState::AncestorSearch(n);
|
||||
Self::request_ancestry(io, protocol, peer_id, n);
|
||||
return;
|
||||
},
|
||||
Ok(_) => { // genesis mismatch
|
||||
io.disable_peer(peer_id);
|
||||
return;
|
||||
},
|
||||
Err(e) => {
|
||||
debug!(target:"sync", "Error reading blockchain: {:?}", e);
|
||||
io.disconnect_peer(peer_id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
},
|
||||
None => {
|
||||
trace!(target:"sync", "Invalid response when searching for ancestor from {}", peer_id);
|
||||
io.disconnect_peer(peer_id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
},
|
||||
PeerSyncState::Available => Vec::new(),
|
||||
}
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
|
||||
// Blocks in the response/drain should be in ascending order.
|
||||
for block in new_blocks {
|
||||
let origin = block.origin;
|
||||
let block = block.block;
|
||||
if let Some(header) = block.header {
|
||||
let number = header.number;
|
||||
let hash = header_hash(&header);
|
||||
let parent = header.parent_hash;
|
||||
let result = protocol.chain().import(header, block.body);
|
||||
match result {
|
||||
Ok(ImportResult::AlreadyInChain) => {
|
||||
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
|
||||
self.block_imported(&hash, number);
|
||||
},
|
||||
Ok(ImportResult::AlreadyQueued) => {
|
||||
trace!(target: "sync", "Block already queued {}: {:?}", number, hash);
|
||||
self.block_imported(&hash, number);
|
||||
},
|
||||
Ok(ImportResult::Queued) => {
|
||||
trace!(target: "sync", "Block queued {}: {:?}", number, hash);
|
||||
self.block_imported(&hash, number);
|
||||
imported = imported + 1;
|
||||
},
|
||||
Ok(ImportResult::UnknownParent) => {
|
||||
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent);
|
||||
self.restart(io, protocol);
|
||||
return;
|
||||
},
|
||||
Ok(ImportResult::KnownBad) => {
|
||||
debug!(target: "sync", "Bad block {}: {:?}", number, hash);
|
||||
io.disable_peer(origin); //TODO: use persistent ID
|
||||
self.restart(io, protocol);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
|
||||
self.restart(io, protocol);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!(target: "sync", "Imported {} of {}", imported, count);
|
||||
self.maintain_sync(io, protocol);
|
||||
}
|
||||
|
||||
fn maintain_sync(&mut self, io: &mut SyncIo, protocol: &Protocol) {
|
||||
let peers: Vec<PeerId> = self.peers.keys().map(|p| *p).collect();
|
||||
for peer in peers {
|
||||
self.download_new(io, protocol, peer);
|
||||
}
|
||||
}
|
||||
|
||||
fn block_imported(&mut self, hash: &HeaderHash, number: BlockNumber) {
|
||||
if number > self.best_queued_number {
|
||||
self.best_queued_number = number;
|
||||
self.best_queued_hash = *hash;
|
||||
}
|
||||
// Update common blocks
|
||||
for (_, peer) in self.peers.iter_mut() {
|
||||
if peer.best_number >= number {
|
||||
peer.common_number = number;
|
||||
peer.common_hash = *hash;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_chain_info(&mut self, best_header: &Header ) {
|
||||
let hash = header_hash(&best_header);
|
||||
self.block_imported(&hash, best_header.number)
|
||||
}
|
||||
|
||||
pub fn on_block_announce(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, header: &Header) {
|
||||
let hash = header_hash(&header);
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
if header.number > peer.best_number {
|
||||
peer.best_number = header.number;
|
||||
peer.best_hash = hash;
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
|
||||
if !self.is_known_or_already_downloading(protocol, &hash) {
|
||||
let stale = header.number <= self.best_queued_number;
|
||||
if stale {
|
||||
if !self.is_known_or_already_downloading(protocol, &header.parent_hash) {
|
||||
trace!(target: "sync", "Ignoring unkown stale block announce from {}: {} {:?}", peer_id, hash, header);
|
||||
} else {
|
||||
trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", peer_id, hash, header);
|
||||
self.download_stale(io, protocol, peer_id, &hash);
|
||||
}
|
||||
} else {
|
||||
trace!(target: "sync", "Downloading new block announced from {}: {} {:?}", peer_id, hash, header);
|
||||
self.download_new(io, protocol, peer_id);
|
||||
}
|
||||
} else {
|
||||
trace!(target: "sync", "Known block announce from {}: {}", peer_id, hash);
|
||||
}
|
||||
}
|
||||
|
||||
fn is_known_or_already_downloading(&self, protocol: &Protocol, hash: &HeaderHash) -> bool {
|
||||
self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
|
||||
|| protocol.chain().block_status(&BlockId::Hash(*hash)).ok().map_or(false, |s| s != BlockStatus::Unknown)
|
||||
}
|
||||
|
||||
pub fn peer_disconnected(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) {
|
||||
self.blocks.clear_peer_download(peer_id);
|
||||
self.peers.remove(&peer_id);
|
||||
self.maintain_sync(io, protocol);
|
||||
}
|
||||
|
||||
pub fn restart(&mut self, io: &mut SyncIo, protocol: &Protocol) {
|
||||
self.blocks.clear();
|
||||
let ids: Vec<PeerId> = self.peers.keys().map(|p| *p).collect();
|
||||
for id in ids {
|
||||
self.new_peer(io, protocol, id);
|
||||
}
|
||||
match protocol.chain().info() {
|
||||
Ok(info) => {
|
||||
self.best_queued_hash = info.best_queued_hash.unwrap_or(info.chain.best_hash);
|
||||
self.best_queued_number = info.best_queued_number.unwrap_or(info.chain.best_number);
|
||||
},
|
||||
Err(e) => {
|
||||
debug!(target:"sync", "Error reading blockchain: {:?}", e);
|
||||
self.best_queued_hash = self.genesis_hash;
|
||||
self.best_queued_number = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) {
|
||||
self.blocks.clear();
|
||||
self.peers.clear();
|
||||
}
|
||||
|
||||
// Download old block.
|
||||
fn download_stale(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, hash: &HeaderHash) {
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
match peer.state {
|
||||
PeerSyncState::Available => {
|
||||
let request = message::BlockRequest {
|
||||
id: 0,
|
||||
fields: self.required_block_attributes.clone(),
|
||||
from: message::FromBlock::Hash(*hash),
|
||||
to: None,
|
||||
direction: message::Direction::Ascending,
|
||||
max: Some(1),
|
||||
};
|
||||
peer.state = PeerSyncState::DownloadingStale(*hash);
|
||||
protocol.send_message(io, peer_id, Message::BlockRequest(request));
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Issue a request for a peer to download new blocks, if any are available
|
||||
fn download_new(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) {
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
||||
trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", peer_id, peer.common_number, peer.best_number);
|
||||
match peer.state {
|
||||
PeerSyncState::Available => {
|
||||
if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) {
|
||||
trace!(target: "sync", "Requesting blocks from {}, ({} to {})", peer_id, range.start, range.end);
|
||||
let request = message::BlockRequest {
|
||||
id: 0,
|
||||
fields: self.required_block_attributes.clone(),
|
||||
from: message::FromBlock::Number(range.start),
|
||||
to: None,
|
||||
direction: message::Direction::Ascending,
|
||||
max: Some((range.end - range.start) as u32),
|
||||
};
|
||||
peer.state = PeerSyncState::DownloadingNew(range.start);
|
||||
protocol.send_message(io, peer_id, Message::BlockRequest(request));
|
||||
} else {
|
||||
trace!(target: "sync", "Nothing to request");
|
||||
}
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn request_ancestry(io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, block: BlockNumber) {
|
||||
let request = message::BlockRequest {
|
||||
id: 0,
|
||||
fields: vec![message::BlockAttribute::Header],
|
||||
from: message::FromBlock::Number(block),
|
||||
to: None,
|
||||
direction: message::Direction::Ascending,
|
||||
max: Some(1),
|
||||
};
|
||||
protocol.send_message(io, peer_id, Message::BlockRequest(request));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,264 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
mod sync;
|
||||
|
||||
use std::collections::{VecDeque, HashSet, HashMap};
|
||||
use std::sync::Arc;
|
||||
use parking_lot::RwLock;
|
||||
use client::{self, BlockId};
|
||||
use primitives::block;
|
||||
use substrate_executor as executor;
|
||||
use io::SyncIo;
|
||||
use protocol::Protocol;
|
||||
use config::ProtocolConfig;
|
||||
use network::{PeerId, SessionInfo, Error as NetworkError};
|
||||
|
||||
pub struct TestIo<'p> {
|
||||
pub queue: &'p RwLock<VecDeque<TestPacket>>,
|
||||
pub sender: Option<PeerId>,
|
||||
pub to_disconnect: HashSet<PeerId>,
|
||||
pub packets: Vec<TestPacket>,
|
||||
pub peers_info: HashMap<PeerId, String>,
|
||||
}
|
||||
|
||||
impl<'p> TestIo<'p> where {
|
||||
pub fn new(queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<PeerId>) -> TestIo<'p> {
|
||||
TestIo {
|
||||
queue: queue,
|
||||
sender: sender,
|
||||
to_disconnect: HashSet::new(),
|
||||
packets: Vec::new(),
|
||||
peers_info: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'p> Drop for TestIo<'p> {
|
||||
fn drop(&mut self) {
|
||||
self.queue.write().extend(self.packets.drain(..));
|
||||
}
|
||||
}
|
||||
|
||||
impl<'p> SyncIo for TestIo<'p> {
|
||||
fn disable_peer(&mut self, peer_id: PeerId) {
|
||||
self.disconnect_peer(peer_id);
|
||||
}
|
||||
|
||||
fn disconnect_peer(&mut self, peer_id: PeerId) {
|
||||
self.to_disconnect.insert(peer_id);
|
||||
}
|
||||
|
||||
fn is_expired(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError> {
|
||||
self.packets.push(TestPacket {
|
||||
data: data,
|
||||
recipient: peer_id,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn peer_info(&self, peer_id: PeerId) -> String {
|
||||
self.peers_info.get(&peer_id)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| peer_id.to_string())
|
||||
}
|
||||
|
||||
fn peer_session_info(&self, _peer_id: PeerId) -> Option<SessionInfo> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Mocked subprotocol packet
|
||||
pub struct TestPacket {
|
||||
pub data: Vec<u8>,
|
||||
pub recipient: PeerId,
|
||||
}
|
||||
|
||||
pub struct Peer {
|
||||
pub chain: Arc<client::Client<client::in_mem::Backend, executor::WasmExecutor>>,
|
||||
pub sync: Protocol,
|
||||
pub queue: RwLock<VecDeque<TestPacket>>,
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
/// Called after blockchain has been populated to updated current state.
|
||||
fn start(&self) {
|
||||
// Update the sync state to the lates chain state.
|
||||
let info = self.chain.info().expect("In-mem chain does not fail");
|
||||
let header = self.chain.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
|
||||
self.sync.on_block_imported(&header);
|
||||
}
|
||||
|
||||
/// Called on connection to other indicated peer.
|
||||
fn on_connect(&self, other: PeerId) {
|
||||
self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other);
|
||||
}
|
||||
|
||||
/// Called on disconnect from other indicated peer.
|
||||
fn on_disconnect(&self, other: PeerId) {
|
||||
let mut io = TestIo::new(&self.queue, Some(other));
|
||||
self.sync.on_peer_disconnected(&mut io, other);
|
||||
}
|
||||
|
||||
/// Receive a message from another peer. Return a set of peers to disconnect.
|
||||
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
|
||||
let mut io = TestIo::new(&self.queue, Some(from));
|
||||
self.sync.handle_packet(&mut io, from, &msg.data);
|
||||
self.flush();
|
||||
io.to_disconnect.clone()
|
||||
}
|
||||
|
||||
/// Produce the next pending message to send to another peer.
|
||||
fn pending_message(&self) -> Option<TestPacket> {
|
||||
self.flush();
|
||||
self.queue.write().pop_front()
|
||||
}
|
||||
|
||||
/// Whether this peer is done syncing (has no messages to send).
|
||||
fn is_done(&self) -> bool {
|
||||
self.queue.read().is_empty()
|
||||
}
|
||||
|
||||
/// Execute a "sync step". This is called for each peer after it sends a packet.
|
||||
fn sync_step(&self) {
|
||||
self.flush();
|
||||
self.sync.tick(&mut TestIo::new(&self.queue, None));
|
||||
}
|
||||
|
||||
/// Restart sync for a peer.
|
||||
fn restart_sync(&self) {
|
||||
self.sync.abort();
|
||||
}
|
||||
|
||||
fn flush(&self) {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestNet {
|
||||
pub peers: Vec<Arc<Peer>>,
|
||||
pub started: bool,
|
||||
pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to)
|
||||
}
|
||||
|
||||
impl TestNet {
|
||||
pub fn new(n: usize) -> Self {
|
||||
Self::new_with_config(n, ProtocolConfig::default())
|
||||
}
|
||||
|
||||
pub fn new_with_config(n: usize, config: ProtocolConfig) -> Self {
|
||||
let mut net = TestNet {
|
||||
peers: Vec::new(),
|
||||
started: false,
|
||||
disconnect_events: Vec::new(),
|
||||
};
|
||||
let test_genesis_block = block::Header {
|
||||
parent_hash: 0.into(),
|
||||
number: 0,
|
||||
state_root: 0.into(),
|
||||
transaction_root: Default::default(),
|
||||
digest: Default::default(),
|
||||
};
|
||||
|
||||
for _ in 0..n {
|
||||
let chain = Arc::new(client::new_in_mem(executor::WasmExecutor,
|
||||
|| (test_genesis_block.clone(), vec![])).unwrap());
|
||||
let sync = Protocol::new(config.clone(), chain.clone()).unwrap();
|
||||
net.peers.push(Arc::new(Peer {
|
||||
sync: sync,
|
||||
chain: chain,
|
||||
queue: RwLock::new(VecDeque::new()),
|
||||
}));
|
||||
}
|
||||
net
|
||||
}
|
||||
|
||||
pub fn peer(&self, i: usize) -> &Peer {
|
||||
&self.peers[i]
|
||||
}
|
||||
|
||||
pub fn start(&mut self) {
|
||||
if self.started {
|
||||
return;
|
||||
}
|
||||
for peer in 0..self.peers.len() {
|
||||
self.peers[peer].start();
|
||||
for client in 0..self.peers.len() {
|
||||
if peer != client {
|
||||
self.peers[peer].on_connect(client as PeerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.started = true;
|
||||
}
|
||||
|
||||
pub fn sync_step(&mut self) {
|
||||
for peer in 0..self.peers.len() {
|
||||
let packet = self.peers[peer].pending_message();
|
||||
if let Some(packet) = packet {
|
||||
let disconnecting = {
|
||||
let recipient = packet.recipient;
|
||||
trace!("--- {} -> {} ---", peer, recipient);
|
||||
let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet);
|
||||
for d in &to_disconnect {
|
||||
// notify this that disconnecting peers are disconnecting
|
||||
self.peers[recipient].on_disconnect(*d as PeerId);
|
||||
self.disconnect_events.push((peer, *d));
|
||||
}
|
||||
to_disconnect
|
||||
};
|
||||
for d in &disconnecting {
|
||||
// notify other peers that this peer is disconnecting
|
||||
self.peers[*d].on_disconnect(peer as PeerId);
|
||||
}
|
||||
}
|
||||
|
||||
self.sync_step_peer(peer);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sync_step_peer(&mut self, peer_num: usize) {
|
||||
self.peers[peer_num].sync_step();
|
||||
}
|
||||
|
||||
pub fn restart_peer(&mut self, i: usize) {
|
||||
self.peers[i].restart_sync();
|
||||
}
|
||||
|
||||
pub fn sync(&mut self) -> u32 {
|
||||
self.start();
|
||||
let mut total_steps = 0;
|
||||
while !self.done() {
|
||||
self.sync_step();
|
||||
total_steps += 1;
|
||||
}
|
||||
total_steps
|
||||
}
|
||||
|
||||
pub fn sync_steps(&mut self, count: usize) {
|
||||
self.start();
|
||||
for _ in 0..count {
|
||||
self.sync_step();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn done(&self) -> bool {
|
||||
self.peers.iter().all(|p| p.is_done())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use client::backend::Backend;
|
||||
use sync::SyncState;
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn sync_from_two_peers_works() {
|
||||
::env_logger::init().ok();
|
||||
let mut net = TestNet::new(3);
|
||||
net.peer(1).chain.backend().push_blocks(100);
|
||||
net.peer(2).chain.backend().push_blocks(100);
|
||||
net.sync();
|
||||
assert!(net.peer(0).chain.backend().blockchain().equals_to(net.peer(1).chain.backend().blockchain()));
|
||||
let status = net.peer(0).sync.status();
|
||||
assert_eq!(status.sync.state, SyncState::Idle);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_from_two_peers_with_ancestry_search_works() {
|
||||
::env_logger::init().ok();
|
||||
let mut net = TestNet::new(3);
|
||||
net.peer(0).chain.backend().generate_blocks(10, |header| header.state_root = 42.into());
|
||||
net.peer(1).chain.backend().push_blocks(100);
|
||||
net.peer(2).chain.backend().push_blocks(100);
|
||||
net.restart_peer(0);
|
||||
net.sync();
|
||||
assert!(net.peer(0).chain.backend().blockchain().canon_equals_to(net.peer(1).chain.backend().blockchain()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_long_chain_works() {
|
||||
let mut net = TestNet::new(2);
|
||||
net.peer(1).chain.backend().push_blocks(5000);
|
||||
net.sync_steps(3);
|
||||
assert_eq!(net.peer(0).sync.status().sync.state, SyncState::Downloading);
|
||||
net.sync();
|
||||
assert!(net.peer(0).chain.backend().blockchain().equals_to(net.peer(1).chain.backend().blockchain()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_no_common_longer_chain_fails() {
|
||||
::env_logger::init().ok();
|
||||
let mut net = TestNet::new(3);
|
||||
net.peer(0).chain.backend().generate_blocks(200, |header| header.state_root = 42.into());
|
||||
net.peer(1).chain.backend().push_blocks(200);
|
||||
net.sync();
|
||||
assert!(!net.peer(0).chain.backend().blockchain().canon_equals_to(net.peer(1).chain.backend().blockchain()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_after_fork_works() {
|
||||
::env_logger::init().ok();
|
||||
let mut net = TestNet::new(3);
|
||||
net.peer(0).chain.backend().push_blocks(30);
|
||||
net.peer(1).chain.backend().push_blocks(30);
|
||||
net.peer(2).chain.backend().push_blocks(30);
|
||||
|
||||
net.peer(0).chain.backend().generate_blocks(10, |header| header.state_root = 42.into()); // fork
|
||||
net.peer(1).chain.backend().push_blocks(20);
|
||||
net.peer(2).chain.backend().push_blocks(20);
|
||||
|
||||
net.peer(1).chain.backend().generate_blocks(10, |header| header.state_root = 42.into()); // second fork between 1 and 2
|
||||
net.peer(2).chain.backend().push_blocks(1);
|
||||
|
||||
// peer 1 has the best chain
|
||||
let peer1_chain = net.peer(1).chain.backend().blockchain().clone();
|
||||
net.sync();
|
||||
assert!(net.peer(0).chain.backend().blockchain().canon_equals_to(&peer1_chain));
|
||||
assert!(net.peer(1).chain.backend().blockchain().canon_equals_to(&peer1_chain));
|
||||
assert!(net.peer(2).chain.backend().blockchain().canon_equals_to(&peer1_chain));
|
||||
}
|
||||
|
||||
@@ -0,0 +1,254 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Polkadot.
|
||||
|
||||
// Polkadot is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Polkadot is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
mod sync;
|
||||
|
||||
use std::collections::{VecDeque, HashSet, HashMap};
|
||||
use std::sync::Arc;
|
||||
use parking_lot::RwLock;
|
||||
use client::{self, BlockId};
|
||||
use substrate_executor as executor;
|
||||
use io::SyncIo;
|
||||
use protocol::Protocol;
|
||||
use config::ProtocolConfig;
|
||||
use network::{PeerId, SessionInfo, Error as NetworkError};
|
||||
|
||||
pub struct TestIo<'p> {
|
||||
pub queue: &'p RwLock<VecDeque<TestPacket>>,
|
||||
pub sender: Option<PeerId>,
|
||||
pub to_disconnect: HashSet<PeerId>,
|
||||
pub packets: Vec<TestPacket>,
|
||||
pub peers_info: HashMap<PeerId, String>,
|
||||
}
|
||||
|
||||
impl<'p> TestIo<'p> where {
|
||||
pub fn new(queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<PeerId>) -> TestIo<'p> {
|
||||
TestIo {
|
||||
queue: queue,
|
||||
sender: sender,
|
||||
to_disconnect: HashSet::new(),
|
||||
packets: Vec::new(),
|
||||
peers_info: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'p> Drop for TestIo<'p> {
|
||||
fn drop(&mut self) {
|
||||
self.queue.write().extend(self.packets.drain(..));
|
||||
}
|
||||
}
|
||||
|
||||
impl<'p> SyncIo for TestIo<'p> {
|
||||
fn disable_peer(&mut self, peer_id: PeerId) {
|
||||
self.disconnect_peer(peer_id);
|
||||
}
|
||||
|
||||
fn disconnect_peer(&mut self, peer_id: PeerId) {
|
||||
self.to_disconnect.insert(peer_id);
|
||||
}
|
||||
|
||||
fn is_expired(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError> {
|
||||
self.packets.push(TestPacket {
|
||||
data: data,
|
||||
recipient: peer_id,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn peer_info(&self, peer_id: PeerId) -> String {
|
||||
self.peers_info.get(&peer_id)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| peer_id.to_string())
|
||||
}
|
||||
|
||||
fn peer_session_info(&self, _peer_id: PeerId) -> Option<SessionInfo> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Mocked subprotocol packet
|
||||
pub struct TestPacket {
|
||||
pub data: Vec<u8>,
|
||||
pub recipient: PeerId,
|
||||
}
|
||||
|
||||
pub struct Peer {
|
||||
pub chain: Arc<client::Client<client::in_mem::Backend, executor::DefaultExecutor>>,
|
||||
pub sync: Protocol,
|
||||
pub queue: RwLock<VecDeque<TestPacket>>,
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
/// Called after blockchain has been populated to updated current state.
|
||||
fn start(&self) {
|
||||
// Update the sync state to the lates chain state.
|
||||
let info = self.chain.info().expect("In-mem chain does not fail");
|
||||
let header = self.chain.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
|
||||
self.sync.on_block_imported(&header);
|
||||
}
|
||||
|
||||
/// Called on connection to other indicated peer.
|
||||
fn on_connect(&self, other: PeerId) {
|
||||
self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other);
|
||||
}
|
||||
|
||||
/// Called on disconnect from other indicated peer.
|
||||
fn on_disconnect(&self, other: PeerId) {
|
||||
let mut io = TestIo::new(&self.queue, Some(other));
|
||||
self.sync.on_peer_disconnected(&mut io, other);
|
||||
}
|
||||
|
||||
/// Receive a message from another peer. Return a set of peers to disconnect.
|
||||
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
|
||||
let mut io = TestIo::new(&self.queue, Some(from));
|
||||
self.sync.handle_packet(&mut io, from, &msg.data);
|
||||
self.flush();
|
||||
io.to_disconnect.clone()
|
||||
}
|
||||
|
||||
/// Produce the next pending message to send to another peer.
|
||||
fn pending_message(&self) -> Option<TestPacket> {
|
||||
self.flush();
|
||||
self.queue.write().pop_front()
|
||||
}
|
||||
|
||||
/// Whether this peer is done syncing (has no messages to send).
|
||||
fn is_done(&self) -> bool {
|
||||
self.queue.read().is_empty()
|
||||
}
|
||||
|
||||
/// Execute a "sync step". This is called for each peer after it sends a packet.
|
||||
fn sync_step(&self) {
|
||||
self.flush();
|
||||
self.sync.tick(&mut TestIo::new(&self.queue, None));
|
||||
}
|
||||
|
||||
/// Restart sync for a peer.
|
||||
fn restart_sync(&self) {
|
||||
self.sync.abort();
|
||||
}
|
||||
|
||||
fn flush(&self) {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestNet {
|
||||
pub peers: Vec<Arc<Peer>>,
|
||||
pub started: bool,
|
||||
pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to)
|
||||
}
|
||||
|
||||
impl TestNet {
|
||||
pub fn new(n: usize) -> Self {
|
||||
Self::new_with_config(n, ProtocolConfig::default())
|
||||
}
|
||||
|
||||
pub fn new_with_config(n: usize, config: ProtocolConfig) -> Self {
|
||||
let mut net = TestNet {
|
||||
peers: Vec::new(),
|
||||
started: false,
|
||||
disconnect_events: Vec::new(),
|
||||
};
|
||||
for _ in 0..n {
|
||||
let chain = Arc::new(client::new_in_mem(executor::executor()).unwrap());
|
||||
let sync = Protocol::new(config.clone(), chain.clone()).unwrap();
|
||||
net.peers.push(Arc::new(Peer {
|
||||
sync: sync,
|
||||
chain: chain,
|
||||
queue: RwLock::new(VecDeque::new()),
|
||||
}));
|
||||
}
|
||||
net
|
||||
}
|
||||
|
||||
pub fn peer(&self, i: usize) -> &Peer {
|
||||
&self.peers[i]
|
||||
}
|
||||
|
||||
pub fn start(&mut self) {
|
||||
if self.started {
|
||||
return;
|
||||
}
|
||||
for peer in 0..self.peers.len() {
|
||||
self.peers[peer].start();
|
||||
for client in 0..self.peers.len() {
|
||||
if peer != client {
|
||||
self.peers[peer].on_connect(client as PeerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.started = true;
|
||||
}
|
||||
|
||||
pub fn sync_step(&mut self) {
|
||||
for peer in 0..self.peers.len() {
|
||||
let packet = self.peers[peer].pending_message();
|
||||
if let Some(packet) = packet {
|
||||
let disconnecting = {
|
||||
let recipient = packet.recipient;
|
||||
trace!("--- {} -> {} ---", peer, recipient);
|
||||
let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet);
|
||||
for d in &to_disconnect {
|
||||
// notify this that disconnecting peers are disconnecting
|
||||
self.peers[recipient].on_disconnect(*d as PeerId);
|
||||
self.disconnect_events.push((peer, *d));
|
||||
}
|
||||
to_disconnect
|
||||
};
|
||||
for d in &disconnecting {
|
||||
// notify other peers that this peer is disconnecting
|
||||
self.peers[*d].on_disconnect(peer as PeerId);
|
||||
}
|
||||
}
|
||||
|
||||
self.sync_step_peer(peer);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sync_step_peer(&mut self, peer_num: usize) {
|
||||
self.peers[peer_num].sync_step();
|
||||
}
|
||||
|
||||
pub fn restart_peer(&mut self, i: usize) {
|
||||
self.peers[i].restart_sync();
|
||||
}
|
||||
|
||||
pub fn sync(&mut self) -> u32 {
|
||||
self.start();
|
||||
let mut total_steps = 0;
|
||||
while !self.done() {
|
||||
self.sync_step();
|
||||
total_steps += 1;
|
||||
}
|
||||
total_steps
|
||||
}
|
||||
|
||||
pub fn sync_steps(&mut self, count: usize) {
|
||||
self.start();
|
||||
for _ in 0..count {
|
||||
self.sync_step();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn done(&self) -> bool {
|
||||
self.peers.iter().all(|p| p.is_done())
|
||||
}
|
||||
}
|
||||
@@ -42,6 +42,6 @@ impl<B, E> ChainApi for client::Client<B, E> where
|
||||
client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>,
|
||||
{
|
||||
fn header(&self, hash: block::HeaderHash) -> Result<Option<block::Header>> {
|
||||
client::Client::header(self, &hash).chain_err(|| "Blockchain error")
|
||||
client::Client::header(self, &client::BlockId::Hash(hash)).chain_err(|| "Blockchain error")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ mod error;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use client::{self, Client};
|
||||
use client::{self, Client, BlockId};
|
||||
use primitives::block;
|
||||
use primitives::storage::{StorageKey, StorageData};
|
||||
use state_machine;
|
||||
@@ -47,10 +47,10 @@ impl<B, E> StateApi for Client<B, E> where
|
||||
client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>,
|
||||
{
|
||||
fn storage(&self, key: StorageKey, block: block::HeaderHash) -> Result<StorageData> {
|
||||
Ok(self.storage(&block, &key)?)
|
||||
Ok(self.storage(&BlockId::Hash(block), &key)?)
|
||||
}
|
||||
|
||||
fn call(&self, method: String, data: Vec<u8>, block: block::HeaderHash) -> Result<Vec<u8>> {
|
||||
Ok(self.call(&block, &method, &data)?.return_data)
|
||||
Ok(self.call(&BlockId::Hash(block), &method, &data)?.return_data)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user