diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index b2fca77787..2d17000bf4 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -278,7 +278,7 @@ dependencies = [ [[package]] name = "ethcore-bytes" version = "0.1.0" -source = "git+https://github.com/paritytech/parity.git#0a7cebe316c4dea69cd619908246ae13816adfc8" +source = "git+https://github.com/paritytech/parity.git#a59f6d9bd20b4131fd162a898b20238343db665b" [[package]] name = "ethcore-bytes" @@ -288,7 +288,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "ethcore-io" version = "1.9.0" -source = "git+https://github.com/paritytech/parity.git#0a7cebe316c4dea69cd619908246ae13816adfc8" +source = "git+https://github.com/paritytech/parity.git#a59f6d9bd20b4131fd162a898b20238343db665b" dependencies = [ "crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -300,7 +300,7 @@ dependencies = [ [[package]] name = "ethcore-logger" version = "1.9.0" -source = "git+https://github.com/paritytech/parity.git#0a7cebe316c4dea69cd619908246ae13816adfc8" +source = "git+https://github.com/paritytech/parity.git#a59f6d9bd20b4131fd162a898b20238343db665b" dependencies = [ "ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -332,7 +332,7 @@ dependencies = [ [[package]] name = "ethcore-network" version = "1.9.0" -source = "git+https://github.com/paritytech/parity.git#0a7cebe316c4dea69cd619908246ae13816adfc8" +source = "git+https://github.com/paritytech/parity.git#a59f6d9bd20b4131fd162a898b20238343db665b" dependencies = [ "ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -367,7 +367,7 @@ dependencies = [ [[package]] name = "ethcrypto" version = "0.1.0" -source = "git+https://github.com/paritytech/parity.git#0a7cebe316c4dea69cd619908246ae13816adfc8" +source = "git+https://github.com/paritytech/parity.git#a59f6d9bd20b4131fd162a898b20238343db665b" dependencies = [ "eth-secp256k1 0.5.7 (git+https://github.com/paritytech/rust-secp256k1)", "ethereum-types 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -392,7 +392,7 @@ dependencies = [ [[package]] name = "ethkey" version = "0.3.0" -source = "git+https://github.com/paritytech/parity.git#0a7cebe316c4dea69cd619908246ae13816adfc8" +source = "git+https://github.com/paritytech/parity.git#a59f6d9bd20b4131fd162a898b20238343db665b" dependencies = [ "byteorder 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "edit-distance 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -682,7 +682,7 @@ dependencies = [ [[package]] name = "keccak-hash" version = "0.1.0" -source = "git+https://github.com/paritytech/parity.git#0a7cebe316c4dea69cd619908246ae13816adfc8" +source = "git+https://github.com/paritytech/parity.git#a59f6d9bd20b4131fd162a898b20238343db665b" dependencies = [ "cc 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -916,7 +916,7 @@ dependencies = [ [[package]] name = "path" version = "0.1.0" -source = "git+https://github.com/paritytech/parity.git#0a7cebe316c4dea69cd619908246ae13816adfc8" +source = "git+https://github.com/paritytech/parity.git#a59f6d9bd20b4131fd162a898b20238343db665b" [[package]] name = "patricia-trie" @@ -955,7 +955,6 @@ version = "0.1.0" dependencies = [ "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-cli 0.1.0", - "polkadot-network 0.1.0", ] [[package]] @@ -1015,32 +1014,6 @@ dependencies = [ "triehash 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "polkadot-network" -version = "0.1.0" -dependencies = [ - "bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", - "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", - "ethcore-io 1.9.0 (git+https://github.com/paritytech/parity.git)", - "ethcore-network 1.9.0 (git+https://github.com/paritytech/parity.git)", - "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "ipnetwork 0.12.7 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "polkadot-primitives 0.1.0", - "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", - "semver 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", - "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", - "substrate-client 0.1.0", - "substrate-primitives 0.1.0", - "substrate-serializer 0.1.0", - "substrate-state-machine 0.1.0", -] - [[package]] name = "polkadot-primitives" version = "0.1.0" @@ -1211,7 +1184,7 @@ dependencies = [ [[package]] name = "rlp" version = "0.2.1" -source = "git+https://github.com/paritytech/parity.git#0a7cebe316c4dea69cd619908246ae13816adfc8" +source = "git+https://github.com/paritytech/parity.git#a59f6d9bd20b4131fd162a898b20238343db665b" dependencies = [ "byteorder 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "elastic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1355,14 +1328,6 @@ name = "smallvec" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "smallvec" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "smallvec" version = "0.6.0" @@ -1442,6 +1407,28 @@ dependencies = [ "triehash 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "substrate-network" +version = "0.1.0" +dependencies = [ + "bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "ethcore-io 1.9.0 (git+https://github.com/paritytech/parity.git)", + "ethcore-network 1.9.0 (git+https://github.com/paritytech/parity.git)", + "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", + "substrate-client 0.1.0", + "substrate-executor 0.1.0", + "substrate-primitives 0.1.0", + "substrate-serializer 0.1.0", + "substrate-state-machine 0.1.0", +] + [[package]] name = "substrate-primitives" version = "0.1.0" @@ -1989,7 +1976,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fdeff4cd9ecff59ec7e3744cbca73dfe5ac35c2aedb2cfba8a1c715a18912e9d" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" -"checksum smallvec 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ee4f357e8cd37bf8822e1b964e96fd39e2cb5a0424f8aaa284ccaccc2162411c" "checksum smallvec 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44db0ecb22921ef790d17ae13a3f6d15784183ff5f2a01aa32098c7498d2b4b9" "checksum snappy 0.1.0 (git+https://github.com/paritytech/rust-snappy)" = "" "checksum snappy-sys 0.1.0 (git+https://github.com/paritytech/rust-snappy)" = "" diff --git a/substrate/Cargo.toml b/substrate/Cargo.toml index 1c3d80e3c7..aecab804f1 100644 --- a/substrate/Cargo.toml +++ b/substrate/Cargo.toml @@ -10,7 +10,6 @@ authors = ["Parity Technologies "] [dependencies] error-chain = "0.11" polkadot-cli = { path = "polkadot/cli" } -polkadot-network = { path = "polkadot/network" } [workspace] members = [ @@ -18,7 +17,6 @@ members = [ "substrate/codec", "substrate/environmental", "substrate/executor", - "polkadot/network", "polkadot/candidate-agreement", "polkadot/cli", "polkadot/collator", @@ -26,6 +24,7 @@ members = [ "polkadot/runtime", "polkadot/primitives", "polkadot/validator", + "substrate/network", "substrate/primitives", "substrate/rpc", "substrate/rpc-servers", diff --git a/substrate/polkadot/network/src/test/mod.rs b/substrate/polkadot/network/src/test/mod.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/substrate/substrate/client/src/in_mem.rs b/substrate/substrate/client/src/in_mem.rs index fe8064c01b..0ed3499033 100644 --- a/substrate/substrate/client/src/in_mem.rs +++ b/substrate/substrate/client/src/in_mem.rs @@ -35,6 +35,7 @@ struct PendingBlock { is_best: bool, } +#[derive(PartialEq, Eq, Clone)] struct Block { header: block::Header, body: Option, @@ -46,6 +47,7 @@ pub struct BlockImportOperation { pending_state: state_machine::backend::InMemory, } +#[derive(Clone)] struct BlockchainStorage { blocks: HashMap, hashes: HashMap, @@ -59,6 +61,14 @@ pub struct Blockchain { storage: RwLock, } +impl Clone for Blockchain { + fn clone(&self) -> Blockchain { + Blockchain { + storage: RwLock::new(self.storage.read().clone()), + } + } +} + impl Blockchain { fn id(&self, id: BlockId) -> Option { match id { @@ -96,6 +106,21 @@ impl Blockchain { storage.genesis_hash = hash; } } + + /// Compare this blockchain with another in-mem blockchain + pub fn equals_to(&self, other: &Blockchain) -> bool { + self.canon_equals_to(other) && self.storage.read().blocks == other.storage.read().blocks + } + + /// Compare canonical chain to other canonical chain. + pub fn canon_equals_to(&self, other: &Blockchain) -> bool { + let this = self.storage.read(); + let other = other.storage.read(); + this.hashes == other.hashes + && this.best_hash == other.best_hash + && this.best_number == other.best_number + && this.genesis_hash == other.genesis_hash + } } impl blockchain::Backend for Blockchain { @@ -167,6 +192,39 @@ impl Backend { blockchain: Blockchain::new(), } } + + /// Generate and import a sequence of blocks. A user supplied function is allowed to modify each block header. Useful for testing. + pub fn generate_blocks(&self, count: usize, edit_header: F) where F: Fn(&mut block::Header) { + use backend::{Backend, BlockImportOperation}; + let info = blockchain::Backend::info(&self.blockchain).expect("In-memory backend never fails"); + let mut best_num = info.best_number; + let mut best_hash = info.best_hash; + let state_root = blockchain::Backend::header(&self.blockchain, BlockId::Hash(best_hash)) + .expect("In-memory backend never fails") + .expect("Best header always exists in the blockchain") + .state_root; + for _ in 0 .. count { + best_num = best_num + 1; + let mut header = block::Header { + parent_hash: best_hash, + number: best_num, + state_root: state_root, + transaction_root: Default::default(), + digest: Default::default(), + }; + edit_header(&mut header); + + let mut tx = self.begin_transaction(BlockId::Hash(best_hash)).expect("In-memory backend does not fail"); + best_hash = header_hash(&header); + tx.import_block(header, None, true).expect("In-memory backend does not fail"); + self.commit_transaction(tx).expect("In-memory backend does not fail"); + } + } + + /// Generate and import a sequence of blocks. Useful for testing. + pub fn push_blocks(&self, count: usize) { + self.generate_blocks(count, |_| {}) + } } impl backend::Backend for Backend { diff --git a/substrate/substrate/client/src/lib.rs b/substrate/substrate/client/src/lib.rs index 983ab2754f..76a7ef0ffc 100644 --- a/substrate/substrate/client/src/lib.rs +++ b/substrate/substrate/client/src/lib.rs @@ -139,22 +139,27 @@ impl Client where }) } - fn state_at(&self, hash: &block::HeaderHash) -> error::Result { - self.backend.state_at(BlockId::Hash(*hash)) + fn state_at(&self, id: BlockId) -> error::Result { + self.backend.state_at(id) } - /// Return single storage entry of contract under given address in state in a block of given hash. - pub fn storage(&self, hash: &block::HeaderHash, key: &StorageKey) -> error::Result { - Ok(self.state_at(hash)? + /// Expose backend reference. To be used in tests only + pub fn backend(&self) -> &B { + &self.backend + } + + /// Return single storage entry of contract under given address in state in a block of given id. + pub fn storage(&self, id: &BlockId, key: &StorageKey) -> error::Result { + Ok(self.state_at(*id)? .storage(&key.0) .map(|x| StorageData(x.to_vec()))?) } - /// Execute a call to a contract on top of state in a block of given hash. + /// Execute a call to a contract on top of state in a block of given id. /// /// No changes are made. - pub fn call(&self, hash: &block::HeaderHash, method: &str, call_data: &[u8]) -> error::Result { - let state = self.state_at(hash)?; + pub fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result { + let state = self.state_at(*id)?; let mut changes = state_machine::OverlayedChanges::default(); let _ = state_machine::execute( @@ -176,7 +181,7 @@ impl Client where blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent), } - let mut transaction = self.backend.begin_transaction(BlockId::Number(header.number))?; + let mut transaction = self.backend.begin_transaction(BlockId::Hash(header.parent_hash))?; let mut _state = transaction.state()?; // TODO: execute block on _state @@ -197,9 +202,9 @@ impl Client where } /// Get block status. - pub fn block_status(&self, hash: &block::HeaderHash) -> error::Result { + pub fn block_status(&self, id: &BlockId) -> error::Result { // TODO: more efficient implementation - match self.backend.blockchain().header(BlockId::Hash(*hash)).map_err(|e| error::Error::from_blockchain(Box::new(e)))?.is_some() { + match self.backend.blockchain().header(*id).map_err(|e| error::Error::from_blockchain(Box::new(e)))?.is_some() { true => Ok(BlockStatus::InChain), false => Ok(BlockStatus::Unknown), } @@ -210,8 +215,13 @@ impl Client where self.backend.blockchain().hash(block_number) } - /// Get block header by hash. - pub fn header(&self, hash: &block::HeaderHash) -> error::Result> { - self.backend.blockchain().header(BlockId::Hash(*hash)) + /// Get block header by id. + pub fn header(&self, id: &BlockId) -> error::Result> { + self.backend.blockchain().header(*id) + } + + /// Get block body by id. + pub fn body(&self, id: &BlockId) -> error::Result> { + self.backend.blockchain().body(*id) } } diff --git a/substrate/substrate/executor/src/wasm_executor.rs b/substrate/substrate/executor/src/wasm_executor.rs index e9ed255f19..60f437081a 100644 --- a/substrate/substrate/executor/src/wasm_executor.rs +++ b/substrate/substrate/executor/src/wasm_executor.rs @@ -330,9 +330,8 @@ impl CodeExecutor for WasmExecutor { mod tests { use super::*; use rustc_hex::FromHex; - use codec::{Slicable, Joiner}; + use codec::Slicable; use state_machine::TestExternalities; - use primitives::Header; #[test] fn returning_should_work() { diff --git a/substrate/polkadot/network/Cargo.toml b/substrate/substrate/network/Cargo.toml similarity index 79% rename from substrate/polkadot/network/Cargo.toml rename to substrate/substrate/network/Cargo.toml index e016be7d30..55edfb7d54 100644 --- a/substrate/polkadot/network/Cargo.toml +++ b/substrate/substrate/network/Cargo.toml @@ -1,6 +1,6 @@ [package] description = "Polkadot network protocol" -name = "polkadot-network" +name = "substrate-network" version = "0.1.0" license = "GPL-3.0" authors = ["Parity Technologies "] @@ -9,13 +9,8 @@ authors = ["Parity Technologies "] [dependencies] log = "0.3" -env_logger = "0.4" rand = "0.3" -heapsize = "0.4" -semver = "0.6" -smallvec = { version = "0.4", features = ["heapsizeof"] } parking_lot = "0.4" -ipnetwork = "0.12" error-chain = "0.11" bitflags = "1.0" serde = "1.0" @@ -27,4 +22,7 @@ substrate-primitives = { path = "../../substrate/primitives" } substrate-client = { path = "../../substrate/client" } substrate-state-machine = { path = "../../substrate/state-machine" } substrate-serializer = { path = "../../substrate/serializer" } -polkadot-primitives = { path = "../primitives" } + +[dev-dependencies] +substrate-executor = { path = "../../substrate/executor" } +env_logger = "0.4" diff --git a/substrate/polkadot/network/src/blocks.rs b/substrate/substrate/network/src/blocks.rs similarity index 98% rename from substrate/polkadot/network/src/blocks.rs rename to substrate/substrate/network/src/blocks.rs index 8d0118222b..7cb472ec54 100644 --- a/substrate/polkadot/network/src/blocks.rs +++ b/substrate/substrate/network/src/blocks.rs @@ -126,9 +126,10 @@ impl BlockCollection { // crop to peers best if range.start >= peer_best { + trace!(target: "sync", "Out of range for peer {} ({} vs {})", peer_id, range.start, peer_best); return None; } - range.end = cmp::min(peer_best, range.end); + range.end = cmp::min(peer_best + 1, range.end); self.peer_requests.insert(peer_id, range.start); self.blocks.insert(range.start, BlockRangeState::Downloading{ len: range.end - range.start, downloading: downloading + 1 }); @@ -233,7 +234,7 @@ mod test { bc.clear_peer_download(peer1); bc.insert(41, blocks[41..81].to_vec(), peer1); assert_eq!(bc.drain(1), vec![]); - assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(121 .. 150)); + assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(121 .. 151)); bc.clear_peer_download(peer0); bc.insert(1, blocks[1..11].to_vec(), peer0); diff --git a/substrate/polkadot/network/src/chain.rs b/substrate/substrate/network/src/chain.rs similarity index 65% rename from substrate/polkadot/network/src/chain.rs rename to substrate/substrate/network/src/chain.rs index 9390b727ec..709a988060 100644 --- a/substrate/polkadot/network/src/chain.rs +++ b/substrate/substrate/network/src/chain.rs @@ -16,7 +16,7 @@ //! Blockchain access trait -use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus}; +use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus, BlockId}; use client::error::Error; use state_machine; use primitives::block; @@ -29,29 +29,44 @@ pub trait Client : Send + Sync { fn info(&self) -> Result; /// Get block status. - fn block_status(&self, hash: &block::HeaderHash) -> Result; + fn block_status(&self, id: &BlockId) -> Result; /// Get block hash by number. fn block_hash(&self, block_number: block::Number) -> Result, Error>; + + /// Get block header. + fn header(&self, id: &BlockId) -> Result, Error>; + + /// Get block body. + fn body(&self, id: &BlockId) -> Result, Error>; } impl Client for PolkadotClient where B: client::backend::Backend + Send + Sync + 'static, E: state_machine::CodeExecutor + Send + Sync + 'static, -{ + Error: From<<::State as state_machine::backend::Backend>::Error>, { + fn import(&self, header: block::Header, body: Option) -> Result { - (self as &Client).import(header, body) + (self as &PolkadotClient).import_block(header, body) } fn info(&self) -> Result { - (self as &Client).info() + (self as &PolkadotClient).info() } - fn block_status(&self, hash: &block::HeaderHash) -> Result { - (self as &Client).block_status(hash) + fn block_status(&self, id: &BlockId) -> Result { + (self as &PolkadotClient).block_status(id) } fn block_hash(&self, block_number: block::Number) -> Result, Error> { - (self as &Client).block_hash(block_number) + (self as &PolkadotClient).block_hash(block_number) + } + + fn header(&self, id: &BlockId) -> Result, Error> { + (self as &PolkadotClient).header(id) + } + + fn body(&self, id: &BlockId) -> Result, Error> { + (self as &PolkadotClient).body(id) } } diff --git a/substrate/polkadot/network/src/config.rs b/substrate/substrate/network/src/config.rs similarity index 98% rename from substrate/polkadot/network/src/config.rs rename to substrate/substrate/network/src/config.rs index b826213280..e269d3cd51 100644 --- a/substrate/polkadot/network/src/config.rs +++ b/substrate/substrate/network/src/config.rs @@ -17,6 +17,7 @@ use service::Role; /// Protocol configuration +#[derive(Clone)] pub struct ProtocolConfig { pub roles: Role, } diff --git a/substrate/polkadot/network/src/error.rs b/substrate/substrate/network/src/error.rs similarity index 100% rename from substrate/polkadot/network/src/error.rs rename to substrate/substrate/network/src/error.rs diff --git a/substrate/polkadot/network/src/io.rs b/substrate/substrate/network/src/io.rs similarity index 100% rename from substrate/polkadot/network/src/io.rs rename to substrate/substrate/network/src/io.rs diff --git a/substrate/polkadot/network/src/lib.rs b/substrate/substrate/network/src/lib.rs similarity index 88% rename from substrate/polkadot/network/src/lib.rs rename to substrate/substrate/network/src/lib.rs index bdd7b41ca6..2897dbcba8 100644 --- a/substrate/polkadot/network/src/lib.rs +++ b/substrate/substrate/network/src/lib.rs @@ -21,21 +21,14 @@ extern crate ethcore_network as network; extern crate ethcore_io as core_io; -extern crate env_logger; extern crate rand; -extern crate semver; extern crate parking_lot; -extern crate smallvec; -extern crate ipnetwork; extern crate substrate_primitives as primitives; extern crate substrate_state_machine as state_machine; extern crate substrate_serializer as ser; +extern crate substrate_client as client; extern crate serde; extern crate serde_json; -// TODO: remove these two; split off dependent logic into polkadot-network and rename this crate -// to substrate-network. -extern crate polkadot_primitives as polkadot_primitives; -extern crate substrate_client as client; #[macro_use] extern crate serde_derive; #[macro_use] extern crate log; #[macro_use] extern crate bitflags; @@ -54,8 +47,14 @@ mod blocks; #[cfg(test)] mod test; +#[cfg(test)] +extern crate substrate_executor; +#[cfg(test)] +extern crate env_logger; + pub use service::Service; pub use protocol::{ProtocolStatus}; +pub use sync::{Status as SyncStatus, SyncState}; pub use network::{NonReservedPeerMode, ConnectionFilter, ConnectionDirection, NetworkConfiguration}; // TODO: move it elsewhere diff --git a/substrate/polkadot/network/src/message.rs b/substrate/substrate/network/src/message.rs similarity index 96% rename from substrate/polkadot/network/src/message.rs rename to substrate/substrate/network/src/message.rs index feefb56595..63c4691cfe 100644 --- a/substrate/polkadot/network/src/message.rs +++ b/substrate/substrate/network/src/message.rs @@ -20,7 +20,6 @@ use std::borrow::Borrow; use primitives::AuthorityId; use primitives::block::{Number as BlockNumber, HeaderHash, Header, Body}; use service::Role as RoleFlags; -use polkadot_primitives::parachain::Id as ParachainId; pub type RequestId = u64; type Bytes = Vec; @@ -152,7 +151,7 @@ pub struct Status { /// Validator address. Required for the validator role. pub validator_id: Option, /// Parachain id. Required for the collator role. - pub parachain_id: Option, + pub parachain_id: Option, } #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] @@ -168,7 +167,7 @@ pub struct BlockRequest { pub to: Option, /// Sequence direction. pub direction: Direction, - /// Maximum number of block to return. An implementation defined maximum is used when unspecified. + /// Maximum number of blocks to return. An implementation defined maximum is used when unspecified. pub max: Option, } diff --git a/substrate/polkadot/network/src/protocol.rs b/substrate/substrate/network/src/protocol.rs similarity index 79% rename from substrate/polkadot/network/src/protocol.rs rename to substrate/substrate/network/src/protocol.rs index bd9dd614fe..006ed39c3c 100644 --- a/substrate/polkadot/network/src/protocol.rs +++ b/substrate/substrate/network/src/protocol.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see .? use std::collections::{HashMap, HashSet, BTreeMap}; -use std::mem; +use std::{mem, cmp}; use std::sync::Arc; use parking_lot::RwLock; use serde_json; @@ -30,10 +30,15 @@ use config::ProtocolConfig; use chain::Client; use io::SyncIo; use error; +use client::BlockId; +use super::header_hash; const REQUEST_TIMEOUT_SEC: u64 = 15; const PROTOCOL_VERSION: u32 = 0; +// Maximum allowed entries in `BlockResponse` +const MAX_BLOCK_DATA_RESPONSE: u32 = 128; + // Lock must always be taken in order declared here. pub struct Protocol { config: ProtocolConfig, @@ -47,7 +52,7 @@ pub struct Protocol { } /// Syncing status and statistics -#[derive(Clone, Copy)] +#[derive(Clone)] pub struct ProtocolStatus { /// Sync status. pub sync: SyncStatus, @@ -74,7 +79,7 @@ struct Peer { /// Holds a set of transactions recently sent to this peer to avoid spamming. _last_sent_transactions: HashSet, /// Request counter, - request_id: message::RequestId, + next_request_id: message::RequestId, } #[derive(Debug)] @@ -157,7 +162,7 @@ impl Protocol { } }; if request.id != r.id { - trace!("Ignoring mismatched response packet from {}", peer_id); + trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", peer_id, request.id, r.id); return; } self.on_block_response(io, peer_id, request, r); @@ -173,10 +178,10 @@ impl Protocol { if let Some(ref mut peer) = peers.get_mut(&peer_id) { match &mut message { &mut Message::BlockRequest(ref mut r) => { + r.id = peer.next_request_id; + peer.next_request_id = peer.next_request_id + 1; peer.block_request = Some(r.clone()); peer.request_timestamp = Some(time::Instant::now()); - r.id = peer.request_id; - peer.request_id = peer.request_id + 1; }, _ => (), } @@ -209,12 +214,59 @@ impl Protocol { } } - pub fn on_block_request(&self, _io: &mut SyncIo, _peer_id: PeerId, _request: message::BlockRequest) { + pub fn on_block_request(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest) { + trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max); + let mut blocks = Vec::new(); + let mut id = match request.from { + message::FromBlock::Hash(h) => BlockId::Hash(h), + message::FromBlock::Number(n) => BlockId::Number(n), + }; + let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize; + // TODO: receipts, etc. + let (mut get_header, mut get_body) = (false, false); + for a in request.fields { + match a { + message::BlockAttribute::Header => get_header = true, + message::BlockAttribute::Body => get_body = true, + message::BlockAttribute::Receipt => unimplemented!(), + message::BlockAttribute::MessageQueue => unimplemented!(), + } + } + while let Some(header) = self.chain.header(&id).unwrap_or(None) { + if blocks.len() >= max{ + break; + } + let number = header.number; + let hash = header_hash(&header); + let block_data = message::BlockData { + hash: hash, + header: if get_header { Some(header) } else { None }, + body: if get_body { self.chain.body(&BlockId::Hash(hash)).unwrap_or(None) } else { None }, + receipt: None, + message_queue: None, + }; + blocks.push(block_data); + match request.direction { + message::Direction::Ascending => id = BlockId::Number(number + 1), + message::Direction::Descending => { + if number == 0 { + break; + } + id = BlockId::Number(number - 1) + } + } + } + let response = message::BlockResponse { + id: request.id, + blocks: blocks, + }; + self.send_message(io, peer, Message::BlockResponse(response)) } - pub fn on_block_response(&self, io: &mut SyncIo, peer_id: PeerId, request: message::BlockRequest, response: message::BlockResponse) { + pub fn on_block_response(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest, response: message::BlockResponse) { // TODO: validate response - self.sync.write().on_block_data(io, self, peer_id, request, response); + trace!(target: "sync", "BlockResponse {} from {} with {} blocks", response.id, peer, response.blocks.len()); + self.sync.write().on_block_data(io, self, peer, request, response); } pub fn tick(&self, io: &mut SyncIo) { @@ -231,7 +283,7 @@ impl Protocol { .filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r))) .chain(handshaking_peers.iter()) { if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC { - trace!(target:"sync", "Timeout {}", peer_id); + trace!(target: "sync", "Timeout {}", peer_id); io.disconnect_peer(*peer_id); aborting.push(*peer_id); } @@ -287,7 +339,7 @@ impl Protocol { block_request: None, request_timestamp: None, _last_sent_transactions: HashSet::new(), - request_id: 0, + next_request_id: 0, }; peers.insert(peer_id.clone(), peer); handshaking_peers.remove(&peer_id); diff --git a/substrate/polkadot/network/src/service.rs b/substrate/substrate/network/src/service.rs similarity index 100% rename from substrate/polkadot/network/src/service.rs rename to substrate/substrate/network/src/service.rs diff --git a/substrate/polkadot/network/src/sync.rs b/substrate/substrate/network/src/sync.rs similarity index 82% rename from substrate/polkadot/network/src/sync.rs rename to substrate/substrate/network/src/sync.rs index e7d2847b83..465bffcc31 100644 --- a/substrate/polkadot/network/src/sync.rs +++ b/substrate/substrate/network/src/sync.rs @@ -18,14 +18,14 @@ use std::collections::HashMap; use io::SyncIo; use protocol::Protocol; use network::PeerId; -use client::{ImportResult, BlockStatus, ClientInfo}; +use client::{ImportResult, BlockStatus, ClientInfo, BlockId}; use primitives::block::{HeaderHash, Number as BlockNumber, Header}; use blocks::{self, BlockCollection}; use message::{self, Message}; use super::header_hash; -// Maximum parallel requests for a block. -const MAX_BLOCK_DOWNLOAD: usize = 1; +// Maximum blocks to request in a single packet. +const MAX_BLOCKS_TO_REQUEST: usize = 128; struct PeerSync { pub common_hash: HeaderHash, @@ -36,7 +36,7 @@ struct PeerSync { } #[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub enum PeerSyncState { +enum PeerSyncState { AncestorSearch(BlockNumber), Available, DownloadingNew(BlockNumber), @@ -53,11 +53,26 @@ pub struct ChainSync { required_block_attributes: Vec, } +/// Reported sync state. +#[derive(Clone, Eq, PartialEq, Debug)] +pub enum SyncState { + /// Initial sync is complete, keep-up sync is active. + Idle, + /// Actively catching up with the chain. + Downloading +} + /// Syncing status and statistics -#[derive(Clone, Copy)] -pub struct Status; +#[derive(Clone)] +pub struct Status { + /// Current global sync state. + pub state: SyncState, + /// Target sync block number. + pub best_seen_block: Option, +} impl ChainSync { + /// Create a new instance. pub fn new(info: &ClientInfo) -> ChainSync { ChainSync { genesis_hash: info.chain.genesis_hash, @@ -71,12 +86,20 @@ impl ChainSync { /// Returns sync status pub fn status(&self) -> Status { - Status + let best_seen = self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number); + let state = match &best_seen { + &Some(n) if n > self.best_queued_number && n - self.best_queued_number > 5 => SyncState::Downloading, + _ => SyncState::Idle, + }; + Status { + state: state, + best_seen_block: best_seen, + } } pub fn new_peer(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { if let Some(info) = protocol.peer_info(peer_id) { - match (protocol.chain().block_status(&info.best_hash), info.best_number) { + match (protocol.chain().block_status(&BlockId::Hash(info.best_hash)), info.best_number) { (Err(e), _) => { debug!(target:"sync", "Error reading blockchain: {:?}", e); io.disconnect_peer(peer_id); @@ -90,15 +113,29 @@ impl ChainSync { io.disable_peer(peer_id); }, (Ok(BlockStatus::Unknown), _) => { - debug!(target:"sync", "New peer with unkown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number); - self.peers.insert(peer_id, PeerSync { - common_hash: self.genesis_hash, - common_number: 0, - best_hash: info.best_hash, - best_number: info.best_number, - state: PeerSyncState::AncestorSearch(info.best_number - 1), - }); - Self::request_ancestry(io, protocol, peer_id, info.best_number - 1) + let our_best = self.best_queued_number; + if our_best > 0 { + debug!(target:"sync", "New peer with unkown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number); + self.peers.insert(peer_id, PeerSync { + common_hash: self.genesis_hash, + common_number: 0, + best_hash: info.best_hash, + best_number: info.best_number, + state: PeerSyncState::AncestorSearch(our_best), + }); + Self::request_ancestry(io, protocol, peer_id, our_best) + } else { + // We are at genesis, just start downloading + debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number); + self.peers.insert(peer_id, PeerSync { + common_hash: self.genesis_hash, + common_number: 0, + best_hash: info.best_hash, + best_number: info.best_number, + state: PeerSyncState::Available, + }); + self.download_new(io, protocol, peer_id) + } }, (Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChain), _) => { debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number); @@ -277,7 +314,7 @@ impl ChainSync { fn is_known_or_already_downloading(&self, protocol: &Protocol, hash: &HeaderHash) -> bool { self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) - || protocol.chain().block_status(hash).ok().map_or(false, |s| s != BlockStatus::Unknown) + || protocol.chain().block_status(&BlockId::Hash(*hash)).ok().map_or(false, |s| s != BlockStatus::Unknown) } pub fn peer_disconnected(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { @@ -334,9 +371,11 @@ impl ChainSync { // Issue a request for a peer to download new blocks, if any are available fn download_new(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", peer_id, peer.common_number, peer.best_number); match peer.state { PeerSyncState::Available => { - if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCK_DOWNLOAD, peer.common_number, peer.best_number) { + if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) { + trace!(target: "sync", "Requesting blocks from {}, ({} to {})", peer_id, range.start, range.end); let request = message::BlockRequest { id: 0, fields: self.required_block_attributes.clone(), @@ -347,6 +386,8 @@ impl ChainSync { }; peer.state = PeerSyncState::DownloadingNew(range.start); protocol.send_message(io, peer_id, Message::BlockRequest(request)); + } else { + trace!(target: "sync", "Nothing to request"); } }, _ => (), diff --git a/substrate/substrate/network/src/test/mod.rs b/substrate/substrate/network/src/test/mod.rs new file mode 100644 index 0000000000..d5010ed2bf --- /dev/null +++ b/substrate/substrate/network/src/test/mod.rs @@ -0,0 +1,264 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +mod sync; + +use std::collections::{VecDeque, HashSet, HashMap}; +use std::sync::Arc; +use parking_lot::RwLock; +use client::{self, BlockId}; +use primitives::block; +use substrate_executor as executor; +use io::SyncIo; +use protocol::Protocol; +use config::ProtocolConfig; +use network::{PeerId, SessionInfo, Error as NetworkError}; + +pub struct TestIo<'p> { + pub queue: &'p RwLock>, + pub sender: Option, + pub to_disconnect: HashSet, + pub packets: Vec, + pub peers_info: HashMap, +} + +impl<'p> TestIo<'p> where { + pub fn new(queue: &'p RwLock>, sender: Option) -> TestIo<'p> { + TestIo { + queue: queue, + sender: sender, + to_disconnect: HashSet::new(), + packets: Vec::new(), + peers_info: HashMap::new(), + } + } +} + +impl<'p> Drop for TestIo<'p> { + fn drop(&mut self) { + self.queue.write().extend(self.packets.drain(..)); + } +} + +impl<'p> SyncIo for TestIo<'p> { + fn disable_peer(&mut self, peer_id: PeerId) { + self.disconnect_peer(peer_id); + } + + fn disconnect_peer(&mut self, peer_id: PeerId) { + self.to_disconnect.insert(peer_id); + } + + fn is_expired(&self) -> bool { + false + } + + fn send(&mut self, peer_id: PeerId, data: Vec) -> Result<(), NetworkError> { + self.packets.push(TestPacket { + data: data, + recipient: peer_id, + }); + Ok(()) + } + + fn peer_info(&self, peer_id: PeerId) -> String { + self.peers_info.get(&peer_id) + .cloned() + .unwrap_or_else(|| peer_id.to_string()) + } + + fn peer_session_info(&self, _peer_id: PeerId) -> Option { + None + } +} + +/// Mocked subprotocol packet +pub struct TestPacket { + pub data: Vec, + pub recipient: PeerId, +} + +pub struct Peer { + pub chain: Arc>, + pub sync: Protocol, + pub queue: RwLock>, +} + +impl Peer { + /// Called after blockchain has been populated to updated current state. + fn start(&self) { + // Update the sync state to the lates chain state. + let info = self.chain.info().expect("In-mem chain does not fail"); + let header = self.chain.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); + self.sync.on_block_imported(&header); + } + + /// Called on connection to other indicated peer. + fn on_connect(&self, other: PeerId) { + self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other); + } + + /// Called on disconnect from other indicated peer. + fn on_disconnect(&self, other: PeerId) { + let mut io = TestIo::new(&self.queue, Some(other)); + self.sync.on_peer_disconnected(&mut io, other); + } + + /// Receive a message from another peer. Return a set of peers to disconnect. + fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet { + let mut io = TestIo::new(&self.queue, Some(from)); + self.sync.handle_packet(&mut io, from, &msg.data); + self.flush(); + io.to_disconnect.clone() + } + + /// Produce the next pending message to send to another peer. + fn pending_message(&self) -> Option { + self.flush(); + self.queue.write().pop_front() + } + + /// Whether this peer is done syncing (has no messages to send). + fn is_done(&self) -> bool { + self.queue.read().is_empty() + } + + /// Execute a "sync step". This is called for each peer after it sends a packet. + fn sync_step(&self) { + self.flush(); + self.sync.tick(&mut TestIo::new(&self.queue, None)); + } + + /// Restart sync for a peer. + fn restart_sync(&self) { + self.sync.abort(); + } + + fn flush(&self) { + } +} + +pub struct TestNet { + pub peers: Vec>, + pub started: bool, + pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to) +} + +impl TestNet { + pub fn new(n: usize) -> Self { + Self::new_with_config(n, ProtocolConfig::default()) + } + + pub fn new_with_config(n: usize, config: ProtocolConfig) -> Self { + let mut net = TestNet { + peers: Vec::new(), + started: false, + disconnect_events: Vec::new(), + }; + let test_genesis_block = block::Header { + parent_hash: 0.into(), + number: 0, + state_root: 0.into(), + transaction_root: Default::default(), + digest: Default::default(), + }; + + for _ in 0..n { + let chain = Arc::new(client::new_in_mem(executor::WasmExecutor, + || (test_genesis_block.clone(), vec![])).unwrap()); + let sync = Protocol::new(config.clone(), chain.clone()).unwrap(); + net.peers.push(Arc::new(Peer { + sync: sync, + chain: chain, + queue: RwLock::new(VecDeque::new()), + })); + } + net + } + + pub fn peer(&self, i: usize) -> &Peer { + &self.peers[i] + } + + pub fn start(&mut self) { + if self.started { + return; + } + for peer in 0..self.peers.len() { + self.peers[peer].start(); + for client in 0..self.peers.len() { + if peer != client { + self.peers[peer].on_connect(client as PeerId); + } + } + } + self.started = true; + } + + pub fn sync_step(&mut self) { + for peer in 0..self.peers.len() { + let packet = self.peers[peer].pending_message(); + if let Some(packet) = packet { + let disconnecting = { + let recipient = packet.recipient; + trace!("--- {} -> {} ---", peer, recipient); + let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet); + for d in &to_disconnect { + // notify this that disconnecting peers are disconnecting + self.peers[recipient].on_disconnect(*d as PeerId); + self.disconnect_events.push((peer, *d)); + } + to_disconnect + }; + for d in &disconnecting { + // notify other peers that this peer is disconnecting + self.peers[*d].on_disconnect(peer as PeerId); + } + } + + self.sync_step_peer(peer); + } + } + + pub fn sync_step_peer(&mut self, peer_num: usize) { + self.peers[peer_num].sync_step(); + } + + pub fn restart_peer(&mut self, i: usize) { + self.peers[i].restart_sync(); + } + + pub fn sync(&mut self) -> u32 { + self.start(); + let mut total_steps = 0; + while !self.done() { + self.sync_step(); + total_steps += 1; + } + total_steps + } + + pub fn sync_steps(&mut self, count: usize) { + self.start(); + for _ in 0..count { + self.sync_step(); + } + } + + pub fn done(&self) -> bool { + self.peers.iter().all(|p| p.is_done()) + } +} diff --git a/substrate/substrate/network/src/test/sync.rs b/substrate/substrate/network/src/test/sync.rs new file mode 100644 index 0000000000..54167a2c5e --- /dev/null +++ b/substrate/substrate/network/src/test/sync.rs @@ -0,0 +1,87 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use client::backend::Backend; +use sync::SyncState; +use super::*; + +#[test] +fn sync_from_two_peers_works() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer(1).chain.backend().push_blocks(100); + net.peer(2).chain.backend().push_blocks(100); + net.sync(); + assert!(net.peer(0).chain.backend().blockchain().equals_to(net.peer(1).chain.backend().blockchain())); + let status = net.peer(0).sync.status(); + assert_eq!(status.sync.state, SyncState::Idle); +} + +#[test] +fn sync_from_two_peers_with_ancestry_search_works() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer(0).chain.backend().generate_blocks(10, |header| header.state_root = 42.into()); + net.peer(1).chain.backend().push_blocks(100); + net.peer(2).chain.backend().push_blocks(100); + net.restart_peer(0); + net.sync(); + assert!(net.peer(0).chain.backend().blockchain().canon_equals_to(net.peer(1).chain.backend().blockchain())); +} + +#[test] +fn sync_long_chain_works() { + let mut net = TestNet::new(2); + net.peer(1).chain.backend().push_blocks(5000); + net.sync_steps(3); + assert_eq!(net.peer(0).sync.status().sync.state, SyncState::Downloading); + net.sync(); + assert!(net.peer(0).chain.backend().blockchain().equals_to(net.peer(1).chain.backend().blockchain())); +} + +#[test] +fn sync_no_common_longer_chain_fails() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer(0).chain.backend().generate_blocks(200, |header| header.state_root = 42.into()); + net.peer(1).chain.backend().push_blocks(200); + net.sync(); + assert!(!net.peer(0).chain.backend().blockchain().canon_equals_to(net.peer(1).chain.backend().blockchain())); +} + +#[test] +fn sync_after_fork_works() { + ::env_logger::init().ok(); + let mut net = TestNet::new(3); + net.peer(0).chain.backend().push_blocks(30); + net.peer(1).chain.backend().push_blocks(30); + net.peer(2).chain.backend().push_blocks(30); + + net.peer(0).chain.backend().generate_blocks(10, |header| header.state_root = 42.into()); // fork + net.peer(1).chain.backend().push_blocks(20); + net.peer(2).chain.backend().push_blocks(20); + + net.peer(1).chain.backend().generate_blocks(10, |header| header.state_root = 42.into()); // second fork between 1 and 2 + net.peer(2).chain.backend().push_blocks(1); + + // peer 1 has the best chain + let peer1_chain = net.peer(1).chain.backend().blockchain().clone(); + net.sync(); + assert!(net.peer(0).chain.backend().blockchain().canon_equals_to(&peer1_chain)); + assert!(net.peer(1).chain.backend().blockchain().canon_equals_to(&peer1_chain)); + assert!(net.peer(2).chain.backend().blockchain().canon_equals_to(&peer1_chain)); +} + diff --git a/substrate/substrate/network/test/mod.rs b/substrate/substrate/network/test/mod.rs new file mode 100644 index 0000000000..43f3abb095 --- /dev/null +++ b/substrate/substrate/network/test/mod.rs @@ -0,0 +1,254 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +mod sync; + +use std::collections::{VecDeque, HashSet, HashMap}; +use std::sync::Arc; +use parking_lot::RwLock; +use client::{self, BlockId}; +use substrate_executor as executor; +use io::SyncIo; +use protocol::Protocol; +use config::ProtocolConfig; +use network::{PeerId, SessionInfo, Error as NetworkError}; + +pub struct TestIo<'p> { + pub queue: &'p RwLock>, + pub sender: Option, + pub to_disconnect: HashSet, + pub packets: Vec, + pub peers_info: HashMap, +} + +impl<'p> TestIo<'p> where { + pub fn new(queue: &'p RwLock>, sender: Option) -> TestIo<'p> { + TestIo { + queue: queue, + sender: sender, + to_disconnect: HashSet::new(), + packets: Vec::new(), + peers_info: HashMap::new(), + } + } +} + +impl<'p> Drop for TestIo<'p> { + fn drop(&mut self) { + self.queue.write().extend(self.packets.drain(..)); + } +} + +impl<'p> SyncIo for TestIo<'p> { + fn disable_peer(&mut self, peer_id: PeerId) { + self.disconnect_peer(peer_id); + } + + fn disconnect_peer(&mut self, peer_id: PeerId) { + self.to_disconnect.insert(peer_id); + } + + fn is_expired(&self) -> bool { + false + } + + fn send(&mut self, peer_id: PeerId, data: Vec) -> Result<(), NetworkError> { + self.packets.push(TestPacket { + data: data, + recipient: peer_id, + }); + Ok(()) + } + + fn peer_info(&self, peer_id: PeerId) -> String { + self.peers_info.get(&peer_id) + .cloned() + .unwrap_or_else(|| peer_id.to_string()) + } + + fn peer_session_info(&self, _peer_id: PeerId) -> Option { + None + } +} + +/// Mocked subprotocol packet +pub struct TestPacket { + pub data: Vec, + pub recipient: PeerId, +} + +pub struct Peer { + pub chain: Arc>, + pub sync: Protocol, + pub queue: RwLock>, +} + +impl Peer { + /// Called after blockchain has been populated to updated current state. + fn start(&self) { + // Update the sync state to the lates chain state. + let info = self.chain.info().expect("In-mem chain does not fail"); + let header = self.chain.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); + self.sync.on_block_imported(&header); + } + + /// Called on connection to other indicated peer. + fn on_connect(&self, other: PeerId) { + self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other); + } + + /// Called on disconnect from other indicated peer. + fn on_disconnect(&self, other: PeerId) { + let mut io = TestIo::new(&self.queue, Some(other)); + self.sync.on_peer_disconnected(&mut io, other); + } + + /// Receive a message from another peer. Return a set of peers to disconnect. + fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet { + let mut io = TestIo::new(&self.queue, Some(from)); + self.sync.handle_packet(&mut io, from, &msg.data); + self.flush(); + io.to_disconnect.clone() + } + + /// Produce the next pending message to send to another peer. + fn pending_message(&self) -> Option { + self.flush(); + self.queue.write().pop_front() + } + + /// Whether this peer is done syncing (has no messages to send). + fn is_done(&self) -> bool { + self.queue.read().is_empty() + } + + /// Execute a "sync step". This is called for each peer after it sends a packet. + fn sync_step(&self) { + self.flush(); + self.sync.tick(&mut TestIo::new(&self.queue, None)); + } + + /// Restart sync for a peer. + fn restart_sync(&self) { + self.sync.abort(); + } + + fn flush(&self) { + } +} + +pub struct TestNet { + pub peers: Vec>, + pub started: bool, + pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to) +} + +impl TestNet { + pub fn new(n: usize) -> Self { + Self::new_with_config(n, ProtocolConfig::default()) + } + + pub fn new_with_config(n: usize, config: ProtocolConfig) -> Self { + let mut net = TestNet { + peers: Vec::new(), + started: false, + disconnect_events: Vec::new(), + }; + for _ in 0..n { + let chain = Arc::new(client::new_in_mem(executor::executor()).unwrap()); + let sync = Protocol::new(config.clone(), chain.clone()).unwrap(); + net.peers.push(Arc::new(Peer { + sync: sync, + chain: chain, + queue: RwLock::new(VecDeque::new()), + })); + } + net + } + + pub fn peer(&self, i: usize) -> &Peer { + &self.peers[i] + } + + pub fn start(&mut self) { + if self.started { + return; + } + for peer in 0..self.peers.len() { + self.peers[peer].start(); + for client in 0..self.peers.len() { + if peer != client { + self.peers[peer].on_connect(client as PeerId); + } + } + } + self.started = true; + } + + pub fn sync_step(&mut self) { + for peer in 0..self.peers.len() { + let packet = self.peers[peer].pending_message(); + if let Some(packet) = packet { + let disconnecting = { + let recipient = packet.recipient; + trace!("--- {} -> {} ---", peer, recipient); + let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet); + for d in &to_disconnect { + // notify this that disconnecting peers are disconnecting + self.peers[recipient].on_disconnect(*d as PeerId); + self.disconnect_events.push((peer, *d)); + } + to_disconnect + }; + for d in &disconnecting { + // notify other peers that this peer is disconnecting + self.peers[*d].on_disconnect(peer as PeerId); + } + } + + self.sync_step_peer(peer); + } + } + + pub fn sync_step_peer(&mut self, peer_num: usize) { + self.peers[peer_num].sync_step(); + } + + pub fn restart_peer(&mut self, i: usize) { + self.peers[i].restart_sync(); + } + + pub fn sync(&mut self) -> u32 { + self.start(); + let mut total_steps = 0; + while !self.done() { + self.sync_step(); + total_steps += 1; + } + total_steps + } + + pub fn sync_steps(&mut self, count: usize) { + self.start(); + for _ in 0..count { + self.sync_step(); + } + } + + pub fn done(&self) -> bool { + self.peers.iter().all(|p| p.is_done()) + } +} diff --git a/substrate/substrate/rpc/src/chain/mod.rs b/substrate/substrate/rpc/src/chain/mod.rs index 473b53aff0..ef82769614 100644 --- a/substrate/substrate/rpc/src/chain/mod.rs +++ b/substrate/substrate/rpc/src/chain/mod.rs @@ -42,6 +42,6 @@ impl ChainApi for client::Client where client::error::Error: From<<::State as state_machine::backend::Backend>::Error>, { fn header(&self, hash: block::HeaderHash) -> Result> { - client::Client::header(self, &hash).chain_err(|| "Blockchain error") + client::Client::header(self, &client::BlockId::Hash(hash)).chain_err(|| "Blockchain error") } } diff --git a/substrate/substrate/rpc/src/state/mod.rs b/substrate/substrate/rpc/src/state/mod.rs index 2549f86a85..de643de7c7 100644 --- a/substrate/substrate/rpc/src/state/mod.rs +++ b/substrate/substrate/rpc/src/state/mod.rs @@ -21,7 +21,7 @@ mod error; #[cfg(test)] mod tests; -use client::{self, Client}; +use client::{self, Client, BlockId}; use primitives::block; use primitives::storage::{StorageKey, StorageData}; use state_machine; @@ -47,10 +47,10 @@ impl StateApi for Client where client::error::Error: From<<::State as state_machine::backend::Backend>::Error>, { fn storage(&self, key: StorageKey, block: block::HeaderHash) -> Result { - Ok(self.storage(&block, &key)?) + Ok(self.storage(&BlockId::Hash(block), &key)?) } fn call(&self, method: String, data: Vec, block: block::HeaderHash) -> Result> { - Ok(self.call(&block, &method, &data)?.return_data) + Ok(self.call(&BlockId::Hash(block), &method, &data)?.return_data) } }