Phase 1 of repo reorg (#719)

* Remove unneeded script

* Rename Substrate Demo -> Substrate

* Rename demo -> node

* Build wasm from last rename.

* Merge ed25519 into substrate-primitives

* Minor tweak

* Rename substrate -> core

* Move substrate-runtime-support to core/runtime/support

* Rename/move substrate-runtime-version

* Move codec up a level

* Rename substrate-codec -> parity-codec

* Move environmental up a level

* Move pwasm-* up to top, ready for removal

* Remove requirement of s-r-support from s-r-primitives

* Move core/runtime/primitives into core/runtime-primitives

* Remove s-r-support dep from s-r-version

* Remove dep of s-r-support from bft

* Remove dep of s-r-support from node/consensus

* Sever all other core deps from s-r-support

* Forgot the no_std directive

* Rename non-SRML modules to sr-* to avoid match clashes

* Move runtime/* to srml/*

* Rename substrate-runtime-* -> srml-*

* Move srml to top-level
This commit is contained in:
Gav Wood
2018-09-12 11:13:31 +02:00
committed by Arkadiy Paronyan
parent 8fe5aa4c81
commit 1e01162505
374 changed files with 2845 additions and 2902 deletions
+282
View File
@@ -0,0 +1,282 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::mem;
use std::cmp;
use std::ops::Range;
use std::collections::{HashMap, BTreeMap};
use std::collections::hash_map::Entry;
use network_libp2p::NodeIndex;
use runtime_primitives::traits::{Block as BlockT, NumberFor, As};
use message;
const MAX_PARALLEL_DOWNLOADS: u32 = 1;
/// Block data with origin.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
pub block: message::BlockData<B>,
pub origin: NodeIndex,
}
#[derive(Debug)]
enum BlockRangeState<B: BlockT> {
Downloading {
len: NumberFor<B>,
downloading: u32,
},
Complete(Vec<BlockData<B>>),
}
impl<B: BlockT> BlockRangeState<B> {
pub fn len(&self) -> NumberFor<B> {
match *self {
BlockRangeState::Downloading { len, .. } => len,
BlockRangeState::Complete(ref blocks) => As::sa(blocks.len() as u64),
}
}
}
/// A collection of blocks being downloaded.
#[derive(Default)]
pub struct BlockCollection<B: BlockT> {
/// Downloaded blocks.
blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
peer_requests: HashMap<NodeIndex, NumberFor<B>>,
}
impl<B: BlockT> BlockCollection<B> {
/// Create a new instance.
pub fn new() -> Self {
BlockCollection {
blocks: BTreeMap::new(),
peer_requests: HashMap::new(),
}
}
/// Clear everything.
pub fn clear(&mut self) {
self.blocks.clear();
self.peer_requests.clear();
}
/// Insert a set of blocks into collection.
pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: NodeIndex) {
if blocks.is_empty() {
return;
}
match self.blocks.get(&start) {
Some(&BlockRangeState::Downloading { .. }) => {
trace!(target: "sync", "Ignored block data still marked as being downloaded: {}", start);
debug_assert!(false);
return;
},
Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => {
trace!(target: "sync", "Ignored block data already downloaded: {}", start);
return;
},
_ => (),
}
self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter().map(|b| BlockData { origin: who, block: b }).collect()));
}
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
pub fn needed_blocks(&mut self, who: NodeIndex, count: usize, peer_best: NumberFor<B>, common: NumberFor<B>) -> Option<Range<NumberFor<B>>> {
// First block number that we need to download
let first_different = common + As::sa(1);
let count = As::sa(count as u64);
let (mut range, downloading) = {
let mut downloading_iter = self.blocks.iter().peekable();
let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
loop {
let next = downloading_iter.next();
break match &(prev, next) {
&(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _) if downloading < MAX_PARALLEL_DOWNLOADS =>
(*start .. *start + *len, downloading),
&(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start =>
(*start + r.len() .. cmp::min(*next_start, *start + r.len() + count), 0), // gap
&(Some((start, r)), None) =>
(*start + r.len() .. *start + r.len() + count, 0), // last range
&(None, None) =>
(first_different .. first_different + count, 0), // empty
&(None, Some((start, _))) if *start > first_different =>
(first_different .. cmp::min(first_different + count, *start), 0), // gap at the start
_ => {
prev = next;
continue
},
}
}
};
// crop to peers best
if range.start > peer_best {
trace!(target: "sync", "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
return None;
}
range.end = cmp::min(peer_best + As::sa(1), range.end);
self.peer_requests.insert(who, range.start);
self.blocks.insert(range.start, BlockRangeState::Downloading { len: range.end - range.start, downloading: downloading + 1 });
if range.end <= range.start {
panic!("Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}", range, count, peer_best, common, self.blocks);
}
Some(range)
}
/// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain.
pub fn drain(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut drained = Vec::new();
let mut ranges = Vec::new();
{
let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
&mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => {
prev = *start + As::sa(blocks.len() as u64);
let mut blocks = mem::replace(blocks, Vec::new());
drained.append(&mut blocks);
ranges.push(*start);
},
_ => break,
}
}
}
for r in ranges {
self.blocks.remove(&r);
}
trace!(target: "sync", "Drained {} blocks", drained.len());
drained
}
pub fn clear_peer_download(&mut self, who: NodeIndex) {
match self.peer_requests.entry(who) {
Entry::Occupied(entry) => {
let start = entry.remove();
let remove = match self.blocks.get_mut(&start) {
Some(&mut BlockRangeState::Downloading { ref mut downloading, .. }) if *downloading > 1 => {
*downloading = *downloading - 1;
false
},
Some(&mut BlockRangeState::Downloading { .. }) => {
true
},
_ => {
debug_assert!(false);
false
}
};
if remove {
self.blocks.remove(&start);
}
},
_ => (),
}
}
}
#[cfg(test)]
mod test {
use super::{BlockCollection, BlockData, BlockRangeState};
use message;
use runtime_primitives::testing::Block as RawBlock;
use primitives::H256;
type Block = RawBlock<u64>;
fn is_empty(bc: &BlockCollection<Block>) -> bool {
bc.blocks.is_empty() &&
bc.peer_requests.is_empty()
}
fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
(0 .. n).map(|_| message::generic::BlockData {
hash: H256::random(),
header: None,
body: None,
message_queue: None,
receipt: None,
justification: None,
}).collect()
}
#[test]
fn create_clear() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
bc.insert(1, generate_blocks(100), 0);
assert!(!is_empty(&bc));
bc.clear();
assert!(is_empty(&bc));
}
#[test]
fn insert_blocks() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
let peer0 = 0;
let peer1 = 1;
let peer2 = 2;
let blocks = generate_blocks(150);
assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(1 .. 41));
assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(41 .. 81));
assert_eq!(bc.needed_blocks(peer2, 40, 150, 0), Some(81 .. 121));
bc.clear_peer_download(peer1);
bc.insert(41, blocks[41..81].to_vec(), peer1);
assert_eq!(bc.drain(1), vec![]);
assert_eq!(bc.needed_blocks(peer1, 40, 150, 0), Some(121 .. 151));
bc.clear_peer_download(peer0);
bc.insert(1, blocks[1..11].to_vec(), peer0);
assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(11 .. 41));
assert_eq!(bc.drain(1), blocks[1..11].iter().map(|b| BlockData { block: b.clone(), origin: 0 }).collect::<Vec<_>>());
bc.clear_peer_download(peer0);
bc.insert(11, blocks[11..41].to_vec(), peer0);
let drained = bc.drain(12);
assert_eq!(drained[..30], blocks[11..41].iter().map(|b| BlockData { block: b.clone(), origin: 0 }).collect::<Vec<_>>()[..]);
assert_eq!(drained[30..], blocks[41..81].iter().map(|b| BlockData { block: b.clone(), origin: 1 }).collect::<Vec<_>>()[..]);
bc.clear_peer_download(peer2);
assert_eq!(bc.needed_blocks(peer2, 40, 150, 80), Some(81 .. 121));
bc.clear_peer_download(peer2);
bc.insert(81, blocks[81..121].to_vec(), peer2);
bc.clear_peer_download(peer1);
bc.insert(121, blocks[121..150].to_vec(), peer1);
assert_eq!(bc.drain(80), vec![]);
let drained = bc.drain(81);
assert_eq!(drained[..40], blocks[81..121].iter().map(|b| BlockData { block: b.clone(), origin: 2 }).collect::<Vec<_>>()[..]);
assert_eq!(drained[40..], blocks[121..150].iter().map(|b| BlockData { block: b.clone(), origin: 1 }).collect::<Vec<_>>()[..]);
}
#[test]
fn large_gap() {
let mut bc: BlockCollection<Block> = BlockCollection::new();
bc.blocks.insert(100, BlockRangeState::Downloading {
len: 128,
downloading: 1,
});
let blocks = generate_blocks(10).into_iter().map(|b| BlockData { block: b, origin: 0 }).collect();
bc.blocks.insert(114305, BlockRangeState::Complete(blocks));
assert_eq!(bc.needed_blocks(0, 128, 10000, 000), Some(1 .. 100));
assert_eq!(bc.needed_blocks(0, 128, 10000, 600), Some(100 + 128 .. 100 + 128 + 128));
}
}
+105
View File
@@ -0,0 +1,105 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Blockchain access trait
use client::{self, Client as SubstrateClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin, CallExecutor};
use client::error::Error;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use runtime_primitives::generic::BlockId;
use runtime_primitives::bft::Justification;
use primitives::{Blake2Hasher, RlpCodec};
/// Local client abstraction for the network.
pub trait Client<Block: BlockT>: Send + Sync {
/// Import a new block. Parent is supposed to be existing in the blockchain.
fn import(&self, origin: BlockOrigin, header: Block::Header, justification: Justification<Block::Hash>, body: Option<Vec<Block::Extrinsic>>) -> Result<ImportResult, Error>;
/// Get blockchain info.
fn info(&self) -> Result<ClientInfo<Block>, Error>;
/// Get block status.
fn block_status(&self, id: &BlockId<Block>) -> Result<BlockStatus, Error>;
/// Get block hash by number.
fn block_hash(&self, block_number: <Block::Header as HeaderT>::Number) -> Result<Option<Block::Hash>, Error>;
/// Get block header.
fn header(&self, id: &BlockId<Block>) -> Result<Option<Block::Header>, Error>;
/// Get block body.
fn body(&self, id: &BlockId<Block>) -> Result<Option<Vec<Block::Extrinsic>>, Error>;
/// Get block justification.
fn justification(&self, id: &BlockId<Block>) -> Result<Option<Justification<Block::Hash>>, Error>;
/// Get block header proof.
fn header_proof(&self, block_number: <Block::Header as HeaderT>::Number) -> Result<(Block::Header, Vec<Vec<u8>>), Error>;
/// Get storage read execution proof.
fn read_proof(&self, block: &Block::Hash, key: &[u8]) -> Result<Vec<Vec<u8>>, Error>;
/// Get method execution proof.
fn execution_proof(&self, block: &Block::Hash, method: &str, data: &[u8]) -> Result<(Vec<u8>, Vec<Vec<u8>>), Error>;
}
impl<B, E, Block> Client<Block> for SubstrateClient<B, E, Block> where
B: client::backend::Backend<Block, Blake2Hasher, RlpCodec> + Send + Sync + 'static,
E: CallExecutor<Block, Blake2Hasher, RlpCodec> + Send + Sync + 'static,
Block: BlockT,
{
fn import(&self, origin: BlockOrigin, header: Block::Header, justification: Justification<Block::Hash>, body: Option<Vec<Block::Extrinsic>>) -> Result<ImportResult, Error> {
// TODO: defer justification check.
let justified_header = self.check_justification(header, justification.into())?;
(self as &SubstrateClient<B, E, Block>).import_block(origin, justified_header, body)
}
fn info(&self) -> Result<ClientInfo<Block>, Error> {
(self as &SubstrateClient<B, E, Block>).info()
}
fn block_status(&self, id: &BlockId<Block>) -> Result<BlockStatus, Error> {
(self as &SubstrateClient<B, E, Block>).block_status(id)
}
fn block_hash(&self, block_number: <Block::Header as HeaderT>::Number) -> Result<Option<Block::Hash>, Error> {
(self as &SubstrateClient<B, E, Block>).block_hash(block_number)
}
fn header(&self, id: &BlockId<Block>) -> Result<Option<Block::Header>, Error> {
(self as &SubstrateClient<B, E, Block>).header(id)
}
fn body(&self, id: &BlockId<Block>) -> Result<Option<Vec<Block::Extrinsic>>, Error> {
(self as &SubstrateClient<B, E, Block>).body(id)
}
fn justification(&self, id: &BlockId<Block>) -> Result<Option<Justification<Block::Hash>>, Error> {
(self as &SubstrateClient<B, E, Block>).justification(id)
}
fn header_proof(&self, block_number: <Block::Header as HeaderT>::Number) -> Result<(Block::Header, Vec<Vec<u8>>), Error> {
(self as &SubstrateClient<B, E, Block>).header_proof(&BlockId::Number(block_number))
}
fn read_proof(&self, block: &Block::Hash, key: &[u8]) -> Result<Vec<Vec<u8>>, Error> {
(self as &SubstrateClient<B, E, Block>).read_proof(&BlockId::Hash(block.clone()), key)
}
fn execution_proof(&self, block: &Block::Hash, method: &str, data: &[u8]) -> Result<(Vec<u8>, Vec<Vec<u8>>), Error> {
(self as &SubstrateClient<B, E, Block>).execution_proof(&BlockId::Hash(block.clone()), method, data)
}
}
+32
View File
@@ -0,0 +1,32 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
pub use service::Roles;
/// Protocol configuration
#[derive(Clone)]
pub struct ProtocolConfig {
/// Assigned roles.
pub roles: Roles,
}
impl Default for ProtocolConfig {
fn default() -> ProtocolConfig {
ProtocolConfig {
roles: Roles::FULL,
}
}
}
@@ -0,0 +1,382 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Utility for gossip of network messages between authorities.
//! Handles chain-specific and standard BFT messages.
use std::collections::{HashMap, HashSet};
use futures::sync::mpsc;
use std::time::{Instant, Duration};
use network_libp2p::NodeIndex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use runtime_primitives::generic::BlockId;
use message::{self, generic::Message as GenericMessage};
use protocol::Context;
use service::Roles;
// TODO: Add additional spam/DoS attack protection.
const MESSAGE_LIFETIME: Duration = Duration::from_secs(600);
struct PeerConsensus<H> {
known_messages: HashSet<H>,
}
/// Consensus messages.
#[derive(Debug, Clone, PartialEq)]
pub enum ConsensusMessage<B: BlockT> {
/// A message concerning BFT agreement
Bft(message::LocalizedBftMessage<B>),
/// A message concerning some chain-specific aspect of consensus
ChainSpecific(Vec<u8>, B::Hash),
}
struct MessageEntry<B: BlockT> {
hash: B::Hash,
message: ConsensusMessage<B>,
instant: Instant,
}
/// Consensus network protocol handler. Manages statements and candidate requests.
pub struct ConsensusGossip<B: BlockT> {
peers: HashMap<NodeIndex, PeerConsensus<B::Hash>>,
message_sink: Option<(mpsc::UnboundedSender<ConsensusMessage<B>>, B::Hash)>,
messages: Vec<MessageEntry<B>>,
message_hashes: HashSet<B::Hash>,
}
impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
/// Create a new instance.
pub fn new() -> Self {
ConsensusGossip {
peers: HashMap::new(),
message_sink: None,
messages: Default::default(),
message_hashes: Default::default(),
}
}
/// Closes all notification streams.
pub fn abort(&mut self) {
self.message_sink = None;
}
/// Handle new connected peer.
pub fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex, roles: Roles) {
if roles.intersects(Roles::AUTHORITY | Roles::FULL) {
trace!(target:"gossip", "Registering {:?} {}", roles, who);
// Send out all known messages.
// TODO: limit by size
let mut known_messages = HashSet::new();
for entry in self.messages.iter() {
known_messages.insert(entry.hash);
let message = match entry.message {
ConsensusMessage::Bft(ref bft) => GenericMessage::BftMessage(bft.clone()),
ConsensusMessage::ChainSpecific(ref msg, _) => GenericMessage::ChainSpecific(msg.clone()),
};
protocol.send_message(who, message);
}
self.peers.insert(who, PeerConsensus {
known_messages,
});
}
}
fn propagate(&mut self, protocol: &mut Context<B>, message: message::Message<B>, hash: B::Hash) {
for (id, ref mut peer) in self.peers.iter_mut() {
if peer.known_messages.insert(hash.clone()) {
trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
protocol.send_message(*id, message.clone());
}
}
}
fn register_message(&mut self, hash: B::Hash, message: ConsensusMessage<B>) {
if self.message_hashes.insert(hash) {
self.messages.push(MessageEntry {
hash,
instant: Instant::now(),
message,
});
}
}
/// Handles incoming BFT message, passing to stream and repropagating.
pub fn on_bft_message(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: message::LocalizedBftMessage<B>) {
if let Some((hash, message)) = self.handle_incoming(protocol, who, ConsensusMessage::Bft(message)) {
// propagate to other peers.
self.multicast(protocol, message, Some(hash));
}
}
/// Handles incoming chain-specific message and repropagates
pub fn on_chain_specific(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: Vec<u8>, parent_hash: B::Hash) {
debug!(target: "gossip", "received chain-specific gossip message");
if let Some((hash, message)) = self.handle_incoming(protocol, who, ConsensusMessage::ChainSpecific(message, parent_hash)) {
debug!(target: "gossip", "handled incoming chain-specific message");
// propagate to other peers.
self.multicast(protocol, message, Some(hash));
}
}
/// Get a stream of messages relevant to consensus on top of a given parent hash.
pub fn messages_for(&mut self, parent_hash: B::Hash) -> mpsc::UnboundedReceiver<ConsensusMessage<B>> {
let (sink, stream) = mpsc::unbounded();
for entry in self.messages.iter() {
let message_matches = match entry.message {
ConsensusMessage::Bft(ref msg) => msg.parent_hash == parent_hash,
ConsensusMessage::ChainSpecific(_, ref h) => h == &parent_hash,
};
if message_matches {
sink.unbounded_send(entry.message.clone()).expect("receiving end known to be open; qed");
}
}
self.message_sink = Some((sink, parent_hash));
stream
}
/// Multicast a chain-specific message to other authorities.
pub fn multicast_chain_specific(&mut self, protocol: &mut Context<B>, message: Vec<u8>, parent_hash: B::Hash) {
trace!(target:"gossip", "sending chain-specific message");
self.multicast(protocol, ConsensusMessage::ChainSpecific(message, parent_hash), None);
}
/// Multicast a BFT message to other authorities
pub fn multicast_bft_message(&mut self, protocol: &mut Context<B>, message: message::LocalizedBftMessage<B>) {
// Broadcast message to all authorities.
trace!(target:"gossip", "Broadcasting BFT message {:?}", message);
self.multicast(protocol, ConsensusMessage::Bft(message), None);
}
/// Call when a peer has been disconnected to stop tracking gossip status.
pub fn peer_disconnected(&mut self, _protocol: &mut Context<B>, who: NodeIndex) {
self.peers.remove(&who);
}
/// Prune old or no longer relevant consensus messages.
/// Supply an optional block hash where consensus is known to have concluded.
pub fn collect_garbage(&mut self, best_hash: Option<&B::Hash>) {
let hashes = &mut self.message_hashes;
let before = self.messages.len();
let now = Instant::now();
self.messages.retain(|entry| {
if entry.instant + MESSAGE_LIFETIME >= now &&
best_hash.map_or(true, |parent_hash|
match entry.message {
ConsensusMessage::Bft(ref msg) => &msg.parent_hash != parent_hash,
ConsensusMessage::ChainSpecific(_, ref h) => h != parent_hash,
})
{
true
} else {
hashes.remove(&entry.hash);
false
}
});
if self.messages.len() != before {
trace!(target:"gossip", "Cleaned up {} stale messages", before - self.messages.len());
}
for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| hashes.contains(h));
}
}
fn handle_incoming(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: ConsensusMessage<B>) -> Option<(B::Hash, ConsensusMessage<B>)> {
let (hash, parent, message) = match message {
ConsensusMessage::Bft(msg) => {
let parent = msg.parent_hash;
let generic = GenericMessage::BftMessage(msg);
(
::protocol::hash_message(&generic),
parent,
match generic {
GenericMessage::BftMessage(msg) => ConsensusMessage::Bft(msg),
_ => panic!("`generic` is known to be the `BftMessage` variant; qed"),
}
)
}
ConsensusMessage::ChainSpecific(msg, parent) => {
let generic = GenericMessage::ChainSpecific(msg);
(
::protocol::hash_message::<B>(&generic),
parent,
match generic {
GenericMessage::ChainSpecific(msg) => ConsensusMessage::ChainSpecific(msg, parent),
_ => panic!("`generic` is known to be the `ChainSpecific` variant; qed"),
}
)
}
};
if self.message_hashes.contains(&hash) {
trace!(target:"gossip", "Ignored already known message from {}", who);
return None;
}
match (protocol.client().info(), protocol.client().header(&BlockId::Hash(parent))) {
(_, Err(e)) | (Err(e), _) => {
debug!(target:"gossip", "Error reading blockchain: {:?}", e);
return None;
},
(Ok(info), Ok(Some(header))) => {
if header.number() < &info.chain.best_number {
trace!(target:"gossip", "Ignored ancient message from {}, hash={}", who, parent);
return None;
}
},
(Ok(_), Ok(None)) => {},
}
if let Some(ref mut peer) = self.peers.get_mut(&who) {
peer.known_messages.insert(hash);
if let Some((sink, parent_hash)) = self.message_sink.take() {
if parent == parent_hash {
debug!(target: "gossip", "Pushing relevant consensus message to sink.");
if let Err(e) = sink.unbounded_send(message.clone()) {
trace!(target:"gossip", "Error broadcasting message notification: {:?}", e);
}
}
self.message_sink = Some((sink, parent_hash));
}
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
return None;
}
Some((hash, message))
}
fn multicast(&mut self, protocol: &mut Context<B>, message: ConsensusMessage<B>, hash: Option<B::Hash>) {
let generic = match message {
ConsensusMessage::Bft(ref message) => GenericMessage::BftMessage(message.clone()),
ConsensusMessage::ChainSpecific(ref message, _) => GenericMessage::ChainSpecific(message.clone()),
};
let hash = hash.unwrap_or_else(|| ::protocol::hash_message(&generic));
self.register_message(hash, message);
self.propagate(protocol, generic, hash);
}
}
#[cfg(test)]
mod tests {
use runtime_primitives::bft::Justification;
use runtime_primitives::testing::{H256, Header, Block as RawBlock};
use std::time::Instant;
use message::{self, generic};
use super::*;
type Block = RawBlock<u64>;
#[test]
fn collects_garbage() {
let prev_hash = H256::random();
let best_hash = H256::random();
let mut consensus = ConsensusGossip::<Block>::new();
let now = Instant::now();
let m1_hash = H256::random();
let m2_hash = H256::random();
let m1 = ConsensusMessage::Bft(message::LocalizedBftMessage {
parent_hash: prev_hash,
message: message::generic::BftMessage::Auxiliary(Justification {
round_number: 0,
hash: Default::default(),
signatures: Default::default(),
}),
});
let m2 = ConsensusMessage::ChainSpecific(vec![1, 2, 3], best_hash);
macro_rules! push_msg {
($hash:expr, $now: expr, $m:expr) => {
consensus.messages.push(MessageEntry {
hash: $hash,
instant: $now,
message: $m,
})
}
}
push_msg!(m1_hash, now, m1);
push_msg!(m2_hash, now, m2.clone());
consensus.message_hashes.insert(m1_hash);
consensus.message_hashes.insert(m2_hash);
// nothing to collect
consensus.collect_garbage(None);
assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.message_hashes.len(), 2);
// random header, nothing should be cleared
let mut header = Header {
parent_hash: H256::default(),
number: 0,
state_root: H256::default(),
extrinsics_root: H256::default(),
digest: Default::default(),
};
consensus.collect_garbage(Some(&H256::default()));
assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.message_hashes.len(), 2);
// header that matches one of the messages
header.parent_hash = prev_hash;
consensus.collect_garbage(Some(&prev_hash));
assert_eq!(consensus.messages.len(), 1);
assert_eq!(consensus.message_hashes.len(), 1);
assert!(consensus.message_hashes.contains(&m2_hash));
// make timestamp expired
consensus.messages.clear();
push_msg!(m2_hash, now - MESSAGE_LIFETIME, m2);
consensus.collect_garbage(None);
assert!(consensus.messages.is_empty());
assert!(consensus.message_hashes.is_empty());
}
#[test]
fn message_stream_include_those_sent_before_asking_for_stream() {
use futures::Stream;
let mut consensus = ConsensusGossip::new();
let bft_message = generic::BftMessage::Consensus(generic::SignedConsensusMessage::Vote(generic::SignedConsensusVote {
vote: generic::ConsensusVote::AdvanceRound(0),
sender: [0; 32].into(),
signature: Default::default(),
}));
let parent_hash = [1; 32].into();
let localized = ::message::LocalizedBftMessage::<Block> {
message: bft_message,
parent_hash: parent_hash,
};
let message = generic::Message::BftMessage(localized.clone());
let message_hash = ::protocol::hash_message::<Block>(&message);
let message = ConsensusMessage::Bft(localized);
consensus.register_message(message_hash, message.clone());
let stream = consensus.messages_for(parent_hash);
assert_eq!(stream.wait().next(), Some(Ok(message)));
}
}
+35
View File
@@ -0,0 +1,35 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Polkadot service possible errors.
use std::io::Error as IoError;
use network_libp2p::Error as NetworkError;
use client;
error_chain! {
foreign_links {
Network(NetworkError) #[doc = "Devp2p error."];
Io(IoError) #[doc = "IO error."];
}
links {
Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"];
}
errors {
}
}
+593
View File
@@ -0,0 +1,593 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Blocks import queue.
use std::collections::{HashSet, VecDeque};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::{Condvar, Mutex, RwLock};
use client::{BlockOrigin, ImportResult};
use network_libp2p::{NodeIndex, Severity};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
use blocks::BlockData;
use chain::Client;
use error::{ErrorKind, Error};
use protocol::Context;
use service::ExecuteInContext;
use sync::ChainSync;
/// Blocks import queue API.
pub trait ImportQueue<B: BlockT>: Send + Sync {
/// Clear the queue when sync is restarting.
fn clear(&self);
/// Clears the import queue and stops importing.
fn stop(&self);
/// Get queue status.
fn status(&self) -> ImportQueueStatus<B>;
/// Is block with given hash currently in the queue.
fn is_importing(&self, hash: &B::Hash) -> bool;
/// Import bunch of blocks.
fn import_blocks(&self, sync: &mut ChainSync<B>, protocol: &mut Context<B>, blocks: (BlockOrigin, Vec<BlockData<B>>));
}
/// Import queue status. It isn't completely accurate.
pub struct ImportQueueStatus<B: BlockT> {
/// Number of blocks that are currently in the queue.
pub importing_count: usize,
/// The number of the best block that was ever in the queue since start/last failure.
pub best_importing_number: <<B as BlockT>::Header as HeaderT>::Number,
}
/// Blocks import queue that is importing blocks in the separate thread.
pub struct AsyncImportQueue<B: BlockT> {
handle: Mutex<Option<::std::thread::JoinHandle<()>>>,
data: Arc<AsyncImportQueueData<B>>,
}
/// Locks order: queue, queue_blocks, best_importing_number
struct AsyncImportQueueData<B: BlockT> {
signal: Condvar,
queue: Mutex<VecDeque<(BlockOrigin, Vec<BlockData<B>>)>>,
queue_blocks: RwLock<HashSet<B::Hash>>,
best_importing_number: RwLock<<<B as BlockT>::Header as HeaderT>::Number>,
is_stopping: AtomicBool,
}
impl<B: BlockT> AsyncImportQueue<B> {
pub fn new() -> Self {
Self {
handle: Mutex::new(None),
data: Arc::new(AsyncImportQueueData::new()),
}
}
pub fn start<E: 'static + ExecuteInContext<B>>(&self, sync: Weak<RwLock<ChainSync<B>>>, service: Weak<E>, chain: Weak<Client<B>>) -> Result<(), Error> {
debug_assert!(self.handle.lock().is_none());
let qdata = self.data.clone();
*self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || {
import_thread(sync, service, chain, qdata)
}).map_err(|err| Error::from(ErrorKind::Io(err)))?);
Ok(())
}
}
impl<B: BlockT> AsyncImportQueueData<B> {
pub fn new() -> Self {
Self {
signal: Default::default(),
queue: Mutex::new(VecDeque::new()),
queue_blocks: RwLock::new(HashSet::new()),
best_importing_number: RwLock::new(Zero::zero()),
is_stopping: Default::default(),
}
}
}
impl<B: BlockT> ImportQueue<B> for AsyncImportQueue<B> {
fn clear(&self) {
let mut queue = self.data.queue.lock();
let mut queue_blocks = self.data.queue_blocks.write();
let mut best_importing_number = self.data.best_importing_number.write();
queue_blocks.clear();
queue.clear();
*best_importing_number = Zero::zero();
}
fn stop(&self) {
self.clear();
if let Some(handle) = self.handle.lock().take() {
self.data.is_stopping.store(true, Ordering::SeqCst);
self.data.signal.notify_one();
let _ = handle.join();
}
}
fn status(&self) -> ImportQueueStatus<B> {
ImportQueueStatus {
importing_count: self.data.queue_blocks.read().len(),
best_importing_number: *self.data.best_importing_number.read(),
}
}
fn is_importing(&self, hash: &B::Hash) -> bool {
self.data.queue_blocks.read().contains(hash)
}
fn import_blocks(&self, _sync: &mut ChainSync<B>, _protocol: &mut Context<B>, blocks: (BlockOrigin, Vec<BlockData<B>>)) {
if blocks.1.is_empty() {
return;
}
trace!(target:"sync", "Scheduling {} blocks for import", blocks.1.len());
let mut queue = self.data.queue.lock();
let mut queue_blocks = self.data.queue_blocks.write();
let mut best_importing_number = self.data.best_importing_number.write();
let new_best_importing_number = blocks.1.last().and_then(|b| b.block.header.as_ref().map(|h| h.number().clone())).unwrap_or_else(|| Zero::zero());
queue_blocks.extend(blocks.1.iter().map(|b| b.block.hash.clone()));
if new_best_importing_number > *best_importing_number {
*best_importing_number = new_best_importing_number;
}
queue.push_back(blocks);
self.data.signal.notify_one();
}
}
impl<B: BlockT> Drop for AsyncImportQueue<B> {
fn drop(&mut self) {
self.stop();
}
}
/// Blocks import thread.
fn import_thread<B: BlockT, E: ExecuteInContext<B>>(sync: Weak<RwLock<ChainSync<B>>>, service: Weak<E>, chain: Weak<Client<B>>, qdata: Arc<AsyncImportQueueData<B>>) {
trace!(target: "sync", "Starting import thread");
loop {
if qdata.is_stopping.load(Ordering::SeqCst) {
break;
}
let new_blocks = {
let mut queue_lock = qdata.queue.lock();
if queue_lock.is_empty() {
qdata.signal.wait(&mut queue_lock);
}
match queue_lock.pop_front() {
Some(new_blocks) => new_blocks,
None => break,
}
};
match (sync.upgrade(), service.upgrade(), chain.upgrade()) {
(Some(sync), Some(service), Some(chain)) => {
let blocks_hashes: Vec<B::Hash> = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect();
if !import_many_blocks(&mut SyncLink::Indirect(&sync, &*chain, &*service), Some(&*qdata), new_blocks) {
break;
}
let mut queue_blocks = qdata.queue_blocks.write();
for blocks_hash in blocks_hashes {
queue_blocks.remove(&blocks_hash);
}
},
_ => break,
}
}
trace!(target: "sync", "Stopping import thread");
}
/// ChainSync link trait.
trait SyncLinkApi<B: BlockT> {
/// Get chain reference.
fn chain(&self) -> &Client<B>;
/// Block imported.
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>);
/// Maintain sync.
fn maintain_sync(&mut self);
/// Disconnect from peer.
fn useless_peer(&mut self, who: NodeIndex, reason: &str);
/// Disconnect from peer and restart sync.
fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str);
/// Restart sync.
fn restart(&mut self);
}
/// Link with the ChainSync service.
enum SyncLink<'a, B: 'a + BlockT, E: 'a + ExecuteInContext<B>> {
/// Indirect link (through service).
Indirect(&'a RwLock<ChainSync<B>>, &'a Client<B>, &'a E),
/// Direct references are given.
#[cfg(test)]
Direct(&'a mut ChainSync<B>, &'a mut Context<B>),
}
/// Block import successful result.
#[derive(Debug, PartialEq)]
enum BlockImportResult<H: ::std::fmt::Debug + PartialEq, N: ::std::fmt::Debug + PartialEq> {
/// Imported known block.
ImportedKnown(H, N),
/// Imported unknown block.
ImportedUnknown(H, N),
}
/// Block import error.
#[derive(Debug, PartialEq)]
enum BlockImportError {
/// Disconnect from peer and continue import of next bunch of blocks.
Disconnect(NodeIndex),
/// Disconnect from peer and restart sync.
DisconnectAndRestart(NodeIndex),
/// Restart sync.
Restart,
}
/// Import a bunch of blocks.
fn import_many_blocks<'a, B: BlockT>(
link: &mut SyncLinkApi<B>,
qdata: Option<&AsyncImportQueueData<B>>,
blocks: (BlockOrigin, Vec<BlockData<B>>)
) -> bool
{
let (blocks_origin, blocks) = blocks;
let count = blocks.len();
let mut imported = 0;
let blocks_range = match (
blocks.first().and_then(|b| b.block.header.as_ref().map(|h| h.number())),
blocks.last().and_then(|b| b.block.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target:"sync", "Starting import of {} blocks{}", count, blocks_range);
// Blocks in the response/drain should be in ascending order.
for block in blocks {
let import_result = import_single_block(link.chain(), blocks_origin.clone(), block);
let is_import_failed = import_result.is_err();
imported += process_import_result(link, import_result);
if is_import_failed {
qdata.map(|qdata| *qdata.best_importing_number.write() = Zero::zero());
return true;
}
if qdata.map(|qdata| qdata.is_stopping.load(Ordering::SeqCst)).unwrap_or_default() {
return false;
}
}
trace!(target: "sync", "Imported {} of {}", imported, count);
link.maintain_sync();
true
}
/// Single block import function.
fn import_single_block<B: BlockT>(
chain: &Client<B>,
block_origin: BlockOrigin,
block: BlockData<B>
) -> Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
{
let origin = block.origin;
let block = block.block;
match (block.header, block.justification) {
(Some(header), Some(justification)) => {
let number = header.number().clone();
let hash = header.hash();
let parent = header.parent_hash().clone();
let result = chain.import(
block_origin,
header,
justification,
block.body,
);
match result {
Ok(ImportResult::AlreadyInChain) => {
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(hash, number))
},
Ok(ImportResult::AlreadyQueued) => {
trace!(target: "sync", "Block already queued {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(hash, number))
},
Ok(ImportResult::Queued) => {
trace!(target: "sync", "Block queued {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedUnknown(hash, number))
},
Ok(ImportResult::UnknownParent) => {
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent);
Err(BlockImportError::Restart)
},
Ok(ImportResult::KnownBad) => {
debug!(target: "sync", "Peer gave us a bad block {}: {:?}", number, hash);
Err(BlockImportError::DisconnectAndRestart(origin)) //TODO: use persistent ID
}
Err(e) => {
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
Err(BlockImportError::Restart)
}
}
},
(None, _) => {
debug!(target: "sync", "Header {} was not provided by {} ", block.hash, origin);
Err(BlockImportError::Disconnect(origin)) //TODO: use persistent ID
},
(_, None) => {
debug!(target: "sync", "Justification set for block {} was not provided by {} ", block.hash, origin);
Err(BlockImportError::Disconnect(origin)) //TODO: use persistent ID
}
}
}
/// Process single block import result.
fn process_import_result<'a, B: BlockT>(
link: &mut SyncLinkApi<B>,
result: Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
) -> usize
{
match result {
Ok(BlockImportResult::ImportedKnown(hash, number)) => {
link.block_imported(&hash, number);
1
},
Ok(BlockImportResult::ImportedUnknown(hash, number)) => {
link.block_imported(&hash, number);
1
},
Err(BlockImportError::Disconnect(who)) => {
// TODO: FIXME: @arkpar BlockImport shouldn't be trying to manage the peer set.
// This should contain an actual reason.
link.useless_peer(who, "Import result was stated Disconnect");
0
},
Err(BlockImportError::DisconnectAndRestart(who)) => {
// TODO: FIXME: @arkpar BlockImport shouldn't be trying to manage the peer set.
// This should contain an actual reason.
link.note_useless_and_restart_sync(who, "Import result was stated DisconnectAndRestart");
0
},
Err(BlockImportError::Restart) => {
link.restart();
0
},
}
}
impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext<B>> SyncLink<'a, B, E> {
/// Execute closure with locked ChainSync.
fn with_sync<F: Fn(&mut ChainSync<B>, &mut Context<B>)>(&mut self, closure: F) {
match *self {
#[cfg(test)]
SyncLink::Direct(ref mut sync, ref mut protocol) =>
closure(*sync, *protocol),
SyncLink::Indirect(ref sync, _, ref service) =>
service.execute_in_context(move |protocol| {
let mut sync = sync.write();
closure(&mut *sync, protocol)
}),
}
}
}
impl<'a, B: 'static + BlockT, E: ExecuteInContext<B>> SyncLinkApi<B> for SyncLink<'a, B, E> {
fn chain(&self) -> &Client<B> {
match *self {
#[cfg(test)]
SyncLink::Direct(_, ref protocol) => protocol.client(),
SyncLink::Indirect(_, ref chain, _) => *chain,
}
}
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.with_sync(|sync, _| sync.block_imported(&hash, number))
}
fn maintain_sync(&mut self) {
self.with_sync(|sync, protocol| sync.maintain_sync(protocol))
}
fn useless_peer(&mut self, who: NodeIndex, reason: &str) {
self.with_sync(|_, protocol| protocol.report_peer(who, Severity::Useless(reason)))
}
fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str) {
self.with_sync(|sync, protocol| {
protocol.report_peer(who, Severity::Useless(reason)); // is this actually malign or just useless?
sync.restart(protocol);
})
}
fn restart(&mut self) {
self.with_sync(|sync, protocol| sync.restart(protocol))
}
}
#[cfg(test)]
pub mod tests {
use client;
use message;
use test_client::{self, TestClient};
use test_client::runtime::{Block, Hash};
use on_demand::tests::DummyExecutor;
use runtime_primitives::generic::BlockId;
use super::*;
/// Blocks import queue that is importing blocks in the same thread.
pub struct SyncImportQueue;
struct DummyExecuteInContext;
impl<B: 'static + BlockT> ExecuteInContext<B> for DummyExecuteInContext {
fn execute_in_context<F: Fn(&mut Context<B>)>(&self, _closure: F) { }
}
impl<B: 'static + BlockT> ImportQueue<B> for SyncImportQueue {
fn clear(&self) { }
fn stop(&self) { }
fn status(&self) -> ImportQueueStatus<B> {
ImportQueueStatus {
importing_count: 0,
best_importing_number: Zero::zero(),
}
}
fn is_importing(&self, _hash: &B::Hash) -> bool {
false
}
fn import_blocks(&self, sync: &mut ChainSync<B>, protocol: &mut Context<B>, blocks: (BlockOrigin, Vec<BlockData<B>>)) {
import_many_blocks(&mut SyncLink::Direct::<_, DummyExecuteInContext>(sync, protocol), None, blocks);
}
}
struct TestLink {
chain: Arc<Client<Block>>,
imported: usize,
maintains: usize,
disconnects: usize,
restarts: usize,
}
impl TestLink {
fn new() -> TestLink {
TestLink {
chain: Arc::new(test_client::new()),
imported: 0,
maintains: 0,
disconnects: 0,
restarts: 0,
}
}
fn total(&self) -> usize {
self.imported + self.maintains + self.disconnects + self.restarts
}
}
impl SyncLinkApi<Block> for TestLink {
fn chain(&self) -> &Client<Block> { &*self.chain }
fn block_imported(&mut self, _hash: &Hash, _number: NumberFor<Block>) { self.imported += 1; }
fn maintain_sync(&mut self) { self.maintains += 1; }
fn useless_peer(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; }
fn note_useless_and_restart_sync(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; self.restarts += 1; }
fn restart(&mut self) { self.restarts += 1; }
}
fn prepare_good_block() -> (client::Client<test_client::Backend, test_client::Executor, Block>, Hash, u64, BlockData<Block>) {
let client = test_client::new();
let block = client.new_block().unwrap().bake().unwrap();
client.justify_and_import(BlockOrigin::File, block).unwrap();
let (hash, number) = (client.block_hash(1).unwrap().unwrap(), 1);
let block = message::BlockData::<Block> {
hash: client.block_hash(1).unwrap().unwrap(),
header: client.header(&BlockId::Number(1)).unwrap(),
body: None,
receipt: None,
message_queue: None,
justification: client.justification(&BlockId::Number(1)).unwrap(),
};
(client, hash, number, BlockData { block, origin: 0 })
}
#[test]
fn import_single_good_block_works() {
let (_, hash, number, block) = prepare_good_block();
assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Ok(BlockImportResult::ImportedUnknown(hash, number)));
}
#[test]
fn import_single_good_known_block_is_ignored() {
let (client, hash, number, block) = prepare_good_block();
assert_eq!(import_single_block(&client, BlockOrigin::File, block), Ok(BlockImportResult::ImportedKnown(hash, number)));
}
#[test]
fn import_single_good_block_without_header_fails() {
let (_, _, _, mut block) = prepare_good_block();
block.block.header = None;
assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Err(BlockImportError::Disconnect(0)));
}
#[test]
fn import_single_good_block_without_justification_fails() {
let (_, _, _, mut block) = prepare_good_block();
block.block.justification = None;
assert_eq!(import_single_block(&test_client::new(), BlockOrigin::File, block), Err(BlockImportError::Disconnect(0)));
}
#[test]
fn process_import_result_works() {
let mut link = TestLink::new();
assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1);
assert_eq!(link.total(), 1);
let mut link = TestLink::new();
assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1);
assert_eq!(link.total(), 1);
assert_eq!(link.imported, 1);
let mut link = TestLink::new();
assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1);
assert_eq!(link.total(), 1);
assert_eq!(link.imported, 1);
let mut link = TestLink::new();
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::Disconnect(0))), 0);
assert_eq!(link.total(), 1);
assert_eq!(link.disconnects, 1);
let mut link = TestLink::new();
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::DisconnectAndRestart(0))), 0);
assert_eq!(link.total(), 2);
assert_eq!(link.disconnects, 1);
assert_eq!(link.restarts, 1);
let mut link = TestLink::new();
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::Restart)), 0);
assert_eq!(link.total(), 1);
assert_eq!(link.restarts, 1);
}
#[test]
fn import_many_blocks_stops_when_stopping() {
let (_, _, _, block) = prepare_good_block();
let qdata = AsyncImportQueueData::new();
qdata.is_stopping.store(true, Ordering::SeqCst);
assert!(!import_many_blocks(&mut TestLink::new(), Some(&qdata), (BlockOrigin::File, vec![block.clone(), block])));
}
#[test]
fn async_import_queue_drops() {
let queue = AsyncImportQueue::new();
let service = Arc::new(DummyExecutor);
let chain = Arc::new(test_client::new());
queue.start(Weak::new(), Arc::downgrade(&service), Arc::downgrade(&chain) as Weak<Client<Block>>).unwrap();
drop(queue);
}
}
+72
View File
@@ -0,0 +1,72 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use network_libp2p::{NetworkContext, Severity, NodeIndex, SessionInfo};
/// IO interface for the syncing handler.
/// Provides peer connection management and an interface to the blockchain client.
pub trait SyncIo {
/// Report a peer for misbehaviour.
fn report_peer(&mut self, who: NodeIndex, reason: Severity);
/// Send a packet to a peer.
fn send(&mut self, who: NodeIndex, data: Vec<u8>);
/// Returns peer identifier string
fn peer_info(&self, who: NodeIndex) -> String {
who.to_string()
}
/// Returns information on p2p session
fn peer_session_info(&self, who: NodeIndex) -> Option<SessionInfo>;
/// Check if the session is expired
fn is_expired(&self) -> bool;
}
/// Wraps `NetworkContext` and the blockchain client
pub struct NetSyncIo<'s> {
network: &'s NetworkContext,
}
impl<'s> NetSyncIo<'s> {
/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
pub fn new(network: &'s NetworkContext) -> NetSyncIo<'s> {
NetSyncIo {
network: network,
}
}
}
impl<'s> SyncIo for NetSyncIo<'s> {
fn report_peer(&mut self, who: NodeIndex, reason: Severity) {
self.network.report_peer(who, reason);
}
fn send(&mut self, who: NodeIndex, data: Vec<u8>) {
self.network.send(who, 0, data)
}
fn peer_session_info(&self, who: NodeIndex) -> Option<SessionInfo> {
self.network.session_info(who)
}
fn is_expired(&self) -> bool {
self.network.is_expired()
}
fn peer_info(&self, who: NodeIndex) -> String {
self.network.peer_client_version(who)
}
}
+69
View File
@@ -0,0 +1,69 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
#![warn(unused_extern_crates)]
#![warn(missing_docs)]
// tag::description[]
//! Substrate-specific P2P networking: synchronizing blocks, propagating BFT messages.
//! Allows attachment of an optional subprotocol for chain-specific requests.
// end::description[]
extern crate ethcore_io as core_io;
extern crate linked_hash_map;
extern crate parking_lot;
extern crate substrate_primitives as primitives;
extern crate substrate_client as client;
extern crate sr_primitives as runtime_primitives;
extern crate substrate_network_libp2p as network_libp2p;
extern crate parity_codec as codec;
extern crate futures;
extern crate rustc_hex;
#[macro_use] extern crate log;
#[macro_use] extern crate bitflags;
#[macro_use] extern crate error_chain;
#[macro_use] extern crate parity_codec_derive;
#[cfg(test)] extern crate env_logger;
#[cfg(test)] extern crate substrate_keyring as keyring;
#[cfg(test)] extern crate substrate_test_client as test_client;
mod service;
mod sync;
mod protocol;
mod io;
mod config;
mod chain;
mod blocks;
mod on_demand;
mod import_queue;
pub mod consensus_gossip;
pub mod error;
pub mod message;
pub mod specialization;
#[cfg(test)] mod test;
pub use chain::Client as ClientHandle;
pub use service::{Service, FetchFuture, ConsensusService, BftMessageStream,
TransactionPool, Params, ManageNetwork, SyncProvider};
pub use protocol::{ProtocolStatus, PeerInfo, Context};
pub use sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, NodeIndex, ProtocolId, ConnectionFilter, ConnectionDirection, Severity};
pub use message::{generic as generic_message, RequestId, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal, Status as StatusMessage};
pub use error::Error;
pub use config::{Roles, ProtocolConfig};
pub use on_demand::{OnDemand, OnDemandService, RemoteResponse};
+375
View File
@@ -0,0 +1,375 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Network packet message types. These get serialized and put into the lower level protocol payload.
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use codec::{Encode, Decode, Input, Output};
pub use self::generic::{
BlockAnnounce, RemoteCallRequest, RemoteReadRequest,
RemoteHeaderRequest, RemoteHeaderResponse, ConsensusVote,
SignedConsensusVote, FromBlock
};
/// A unique ID of a request.
pub type RequestId = u64;
/// Type alias for using the message type using block type parameters.
pub type Message<B> = generic::Message<
B,
<B as BlockT>::Header,
<B as BlockT>::Hash,
<<B as BlockT>::Header as HeaderT>::Number,
<B as BlockT>::Extrinsic,
>;
/// Type alias for using the status type using block type parameters.
pub type Status<B> = generic::Status<
<B as BlockT>::Hash,
<<B as BlockT>::Header as HeaderT>::Number,
>;
/// Type alias for using the block request type using block type parameters.
pub type BlockRequest<B> = generic::BlockRequest<
<B as BlockT>::Hash,
<<B as BlockT>::Header as HeaderT>::Number,
>;
/// Type alias for using the localized bft message type using block type parameters.
pub type LocalizedBftMessage<B> = generic::LocalizedBftMessage<
B,
<B as BlockT>::Hash,
>;
/// Type alias for using the BlockData type using block type parameters.
pub type BlockData<B> = generic::BlockData<
<B as BlockT>::Header,
<B as BlockT>::Hash,
<B as BlockT>::Extrinsic,
>;
/// Type alias for using the BlockResponse type using block type parameters.
pub type BlockResponse<B> = generic::BlockResponse<
<B as BlockT>::Header,
<B as BlockT>::Hash,
<B as BlockT>::Extrinsic,
>;
/// Type alias for using the BftMessage type using block type parameters.
pub type BftMessage<B> = generic::BftMessage<
B,
<B as BlockT>::Hash,
>;
/// Type alias for using the SignedConsensusProposal type using block type parameters.
pub type SignedConsensusProposal<B> = generic::SignedConsensusProposal<
B,
<B as BlockT>::Hash,
>;
/// Type alias for using the SignedConsensusProposal type using block type parameters.
pub type SignedConsensusMessage<B> = generic::SignedConsensusProposal<
B,
<B as BlockT>::Hash,
>;
/// A set of transactions.
pub type Transactions<E> = Vec<E>;
/// Bits of block data and associated artefacts to request.
bitflags! {
/// Node roles bitmask.
pub struct BlockAttributes: u8 {
/// Include block header.
const HEADER = 0b00000001;
/// Include block body.
const BODY = 0b00000010;
/// Include block receipt.
const RECEIPT = 0b00000100;
/// Include block message queue.
const MESSAGE_QUEUE = 0b00001000;
/// Include a justification for the block.
const JUSTIFICATION = 0b00010000;
}
}
impl Encode for BlockAttributes {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push_byte(self.bits())
}
}
impl Decode for BlockAttributes {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Self::from_bits(input.read_byte()?)
}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy, Encode, Decode)]
/// Block enumeration direction.
pub enum Direction {
/// Enumerate in ascending order (from child to parent).
Ascending = 0,
/// Enumerate in descendfing order (from parent to canonical child).
Descending = 1,
}
/// Remote call response.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct RemoteCallResponse {
/// Id of a request this response was made for.
pub id: RequestId,
/// Execution proof.
pub proof: Vec<Vec<u8>>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote read response.
pub struct RemoteReadResponse {
/// Id of a request this response was made for.
pub id: RequestId,
/// Read proof.
pub proof: Vec<Vec<u8>>,
}
/// Generic types.
pub mod generic {
use primitives::{AuthorityId, ed25519};
use runtime_primitives::bft::Justification;
use service::Roles;
use super::{
BlockAttributes, RemoteCallResponse, RemoteReadResponse,
RequestId, Transactions, Direction
};
/// Block data sent in the response.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct BlockData<Header, Hash, Extrinsic> {
/// Block header hash.
pub hash: Hash,
/// Block header if requested.
pub header: Option<Header>,
/// Block body if requested.
pub body: Option<Vec<Extrinsic>>,
/// Block receipt if requested.
pub receipt: Option<Vec<u8>>,
/// Block message queue if requested.
pub message_queue: Option<Vec<u8>>,
/// Justification if requested.
pub justification: Option<Justification<Hash>>,
}
/// Identifies starting point of a block sequence.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum FromBlock<Hash, Number> {
/// Start with given hash.
Hash(Hash),
/// Start with given block number.
Number(Number),
}
/// Communication that can occur between participants in consensus.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum BftMessage<Block, Hash> {
/// A consensus message (proposal or vote)
Consensus(SignedConsensusMessage<Block, Hash>),
/// Auxiliary communication (just proof-of-lock for now).
Auxiliary(Justification<Hash>),
}
/// BFT Consensus message with parent header hash attached to it.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct LocalizedBftMessage<Block, Hash> {
/// Consensus message.
pub message: BftMessage<Block, Hash>,
/// Parent header hash.
pub parent_hash: Hash,
}
/// A localized proposal message. Contains two signed pieces of data.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct SignedConsensusProposal<Block, Hash> {
/// The round number.
pub round_number: u32,
/// The proposal sent.
pub proposal: Block,
/// The digest of the proposal.
pub digest: Hash,
/// The sender of the proposal
pub sender: AuthorityId,
/// The signature on the message (propose, round number, digest)
pub digest_signature: ed25519::Signature,
/// The signature on the message (propose, round number, proposal)
pub full_signature: ed25519::Signature,
}
/// A localized vote message, including the sender.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct SignedConsensusVote<H> {
/// The message sent.
pub vote: ConsensusVote<H>,
/// The sender of the message
pub sender: AuthorityId,
/// The signature of the message.
pub signature: ed25519::Signature,
}
/// Votes during a consensus round.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum ConsensusVote<H> {
/// Prepare to vote for proposal with digest D.
Prepare(u32, H),
/// Commit to proposal with digest D..
Commit(u32, H),
/// Propose advancement to a new round.
AdvanceRound(u32),
}
/// A localized message.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum SignedConsensusMessage<Block, Hash> {
/// A proposal.
Propose(SignedConsensusProposal<Block, Hash>),
/// A vote.
Vote(SignedConsensusVote<Hash>),
}
/// A network message.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum Message<Block, Header, Hash, Number, Extrinsic> {
/// Status packet.
Status(Status<Hash, Number>),
/// Block request.
BlockRequest(BlockRequest<Hash, Number>),
/// Block response.
BlockResponse(BlockResponse<Header, Hash, Extrinsic>),
/// Block announce.
BlockAnnounce(BlockAnnounce<Header>),
/// Transactions.
Transactions(Transactions<Extrinsic>),
/// BFT Consensus statement.
BftMessage(LocalizedBftMessage<Block, Hash>),
/// Remote method call request.
RemoteCallRequest(RemoteCallRequest<Hash>),
/// Remote method call response.
RemoteCallResponse(RemoteCallResponse),
/// Remote storage read request.
RemoteReadRequest(RemoteReadRequest<Hash>),
/// Remote storage read response.
RemoteReadResponse(RemoteReadResponse),
/// Remote header request.
RemoteHeaderRequest(RemoteHeaderRequest<Number>),
/// Remote header response.
RemoteHeaderResponse(RemoteHeaderResponse<Header>),
/// Chain-specific message
#[codec(index = "255")]
ChainSpecific(Vec<u8>),
}
/// Status sent on connection.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct Status<Hash, Number> {
/// Protocol version.
pub version: u32,
/// Supported roles.
pub roles: Roles,
/// Best block number.
pub best_number: Number,
/// Best block hash.
pub best_hash: Hash,
/// Genesis block hash.
pub genesis_hash: Hash,
/// Chain-specific status.
pub chain_status: Vec<u8>,
}
/// Request block data from a peer.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct BlockRequest<Hash, Number> {
/// Unique request id.
pub id: RequestId,
/// Bits of block data to request.
pub fields: BlockAttributes,
/// Start from this block.
pub from: FromBlock<Hash, Number>,
/// End at this block. An implementation defined maximum is used when unspecified.
pub to: Option<Hash>,
/// Sequence direction.
pub direction: Direction,
/// Maximum number of blocks to return. An implementation defined maximum is used when unspecified.
pub max: Option<u32>,
}
/// Response to `BlockRequest`
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct BlockResponse<Header, Hash, Extrinsic> {
/// Id of a request this response was made for.
pub id: RequestId,
/// Block data for the requested sequence.
pub blocks: Vec<BlockData<Header, Hash, Extrinsic>>,
}
/// Announce a new complete relay chain block on the network.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct BlockAnnounce<H> {
/// New block header.
pub header: H,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote call request.
pub struct RemoteCallRequest<H> {
/// Unique request id.
pub id: RequestId,
/// Block at which to perform call.
pub block: H,
/// Method name.
pub method: String,
/// Call data.
pub data: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote storage read request.
pub struct RemoteReadRequest<H> {
/// Unique request id.
pub id: RequestId,
/// Block at which to perform call.
pub block: H,
/// Storage key.
pub key: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote header request.
pub struct RemoteHeaderRequest<N> {
/// Unique request id.
pub id: RequestId,
/// Block number to request header for.
pub block: N,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote header response.
pub struct RemoteHeaderResponse<Header> {
/// Id of a request this response was made for.
pub id: RequestId,
/// Header. None if proof generation has failed (e.g. header is unknown).
pub header: Option<Header>,
/// Header proof.
pub proof: Vec<Vec<u8>>,
}
}
+736
View File
@@ -0,0 +1,736 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! On-demand requests service.
use std::collections::VecDeque;
use std::sync::{Arc, Weak};
use std::time::{Instant, Duration};
use futures::{Async, Future, Poll};
use futures::sync::oneshot::{channel, Receiver, Sender};
use linked_hash_map::LinkedHashMap;
use linked_hash_map::Entry;
use parking_lot::Mutex;
use client;
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest};
use io::SyncIo;
use message;
use network_libp2p::{Severity, NodeIndex};
use service;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
/// Remote request timeout.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// Default request retry count.
const RETRY_COUNT: usize = 1;
/// On-demand service API.
pub trait OnDemandService<Block: BlockT>: Send + Sync {
/// When new node is connected.
fn on_connect(&self, peer: NodeIndex, role: service::Roles);
/// When node is disconnected.
fn on_disconnect(&self, peer: NodeIndex);
/// Maintain peers requests.
fn maintain_peers(&self, io: &mut SyncIo);
/// When header response is received from remote node.
fn on_remote_header_response(
&self,
io: &mut SyncIo,
peer: NodeIndex,
response: message::RemoteHeaderResponse<Block::Header>
);
/// When read response is received from remote node.
fn on_remote_read_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteReadResponse);
/// When call response is received from remote node.
fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse);
}
/// On-demand requests service. Dispatches requests to appropriate peers.
pub struct OnDemand<B: BlockT, E: service::ExecuteInContext<B>> {
core: Mutex<OnDemandCore<B, E>>,
checker: Arc<FetchChecker<B>>,
}
/// On-demand remote call response.
pub struct RemoteResponse<T> {
receiver: Receiver<Result<T, client::error::Error>>,
}
#[derive(Default)]
struct OnDemandCore<B: BlockT, E: service::ExecuteInContext<B>> {
service: Weak<E>,
next_request_id: u64,
pending_requests: VecDeque<Request<B>>,
active_peers: LinkedHashMap<NodeIndex, Request<B>>,
idle_peers: VecDeque<NodeIndex>,
}
struct Request<Block: BlockT> {
id: u64,
timestamp: Instant,
retry_count: usize,
data: RequestData<Block>,
}
enum RequestData<Block: BlockT> {
RemoteHeader(RemoteHeaderRequest<Block::Header>, Sender<Result<Block::Header, client::error::Error>>),
RemoteRead(RemoteReadRequest<Block::Header>, Sender<Result<Option<Vec<u8>>, client::error::Error>>),
RemoteCall(RemoteCallRequest<Block::Header>, Sender<Result<client::CallResult, client::error::Error>>),
}
enum Accept<Block: BlockT> {
Ok,
CheckFailed(client::error::Error, RequestData<Block>),
Unexpected(RequestData<Block>),
}
impl<T> Future for RemoteResponse<T> {
type Item = T;
type Error = client::error::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.receiver.poll()
.map_err(|_| client::error::ErrorKind::RemoteFetchCancelled.into())
.and_then(|r| match r {
Async::Ready(Ok(ready)) => Ok(Async::Ready(ready)),
Async::Ready(Err(error)) => Err(error),
Async::NotReady => Ok(Async::NotReady),
})
}
}
impl<B: BlockT, E> OnDemand<B, E> where
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
/// Creates new on-demand service.
pub fn new(checker: Arc<FetchChecker<B>>) -> Self {
OnDemand {
checker,
core: Mutex::new(OnDemandCore {
service: Weak::new(),
next_request_id: 0,
pending_requests: VecDeque::new(),
active_peers: LinkedHashMap::new(),
idle_peers: VecDeque::new(),
})
}
}
/// Sets weak reference to network service.
pub fn set_service_link(&self, service: Weak<E>) {
self.core.lock().service = service;
}
/// Schedule && dispatch all scheduled requests.
fn schedule_request<R>(&self, retry_count: Option<usize>, data: RequestData<B>, result: R) -> R {
let mut core = self.core.lock();
core.insert(retry_count.unwrap_or(RETRY_COUNT), data);
core.dispatch();
result
}
/// Try to accept response from given peer.
fn accept_response<F: FnOnce(Request<B>) -> Accept<B>>(&self, rtype: &str, io: &mut SyncIo, peer: NodeIndex, request_id: u64, try_accept: F) {
let mut core = self.core.lock();
let request = match core.remove(peer, request_id) {
Some(request) => request,
None => {
io.report_peer(peer, Severity::Bad(&format!("Invalid remote {} response from peer", rtype)));
core.remove_peer(peer);
return;
},
};
let retry_count = request.retry_count;
let (retry_count, retry_request_data) = match try_accept(request) {
Accept::Ok => (retry_count, None),
Accept::CheckFailed(error, retry_request_data) => {
io.report_peer(peer, Severity::Bad(&format!("Failed to check remote {} response from peer: {}", rtype, error)));
core.remove_peer(peer);
if retry_count > 0 {
(retry_count - 1, Some(retry_request_data))
} else {
trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype);
retry_request_data.fail(client::error::ErrorKind::RemoteFetchFailed.into());
(0, None)
}
},
Accept::Unexpected(retry_request_data) => {
io.report_peer(peer, Severity::Bad(&format!("Unexpected response to remote {} from peer", rtype)));
core.remove_peer(peer);
(retry_count, Some(retry_request_data))
},
};
if let Some(request_data) = retry_request_data {
core.insert(retry_count, request_data);
}
core.dispatch();
}
}
impl<B, E> OnDemandService<B> for OnDemand<B, E> where
B: BlockT,
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
fn on_connect(&self, peer: NodeIndex, role: service::Roles) {
if !role.intersects(service::Roles::FULL | service::Roles::AUTHORITY) { // TODO: correct?
return;
}
let mut core = self.core.lock();
core.add_peer(peer);
core.dispatch();
}
fn on_disconnect(&self, peer: NodeIndex) {
let mut core = self.core.lock();
core.remove_peer(peer);
core.dispatch();
}
fn maintain_peers(&self, io: &mut SyncIo) {
let mut core = self.core.lock();
for bad_peer in core.maintain_peers() {
io.report_peer(bad_peer, Severity::Timeout);
}
core.dispatch();
}
fn on_remote_header_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteHeaderResponse<B::Header>) {
self.accept_response("header", io, peer, response.id, |request| match request.data {
RequestData::RemoteHeader(request, sender) => match self.checker.check_header_proof(&request, response.header, response.proof) {
Ok(response) => {
// we do not bother if receiver has been dropped already
let _ = sender.send(Ok(response));
Accept::Ok
},
Err(error) => Accept::CheckFailed(error, RequestData::RemoteHeader(request, sender)),
},
data @ _ => Accept::Unexpected(data),
})
}
fn on_remote_read_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteReadResponse) {
self.accept_response("read", io, peer, response.id, |request| match request.data {
RequestData::RemoteRead(request, sender) => match self.checker.check_read_proof(&request, response.proof) {
Ok(response) => {
// we do not bother if receiver has been dropped already
let _ = sender.send(Ok(response));
Accept::Ok
},
Err(error) => Accept::CheckFailed(error, RequestData::RemoteRead(request, sender)),
},
data @ _ => Accept::Unexpected(data),
})
}
fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse) {
self.accept_response("call", io, peer, response.id, |request| match request.data {
RequestData::RemoteCall(request, sender) => match self.checker.check_execution_proof(&request, response.proof) {
Ok(response) => {
// we do not bother if receiver has been dropped already
let _ = sender.send(Ok(response));
Accept::Ok
},
Err(error) => Accept::CheckFailed(error, RequestData::RemoteCall(request, sender)),
},
data @ _ => Accept::Unexpected(data),
})
}
}
impl<B, E> Fetcher<B> for OnDemand<B, E> where
B: BlockT,
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
type RemoteHeaderResult = RemoteResponse<B::Header>;
type RemoteReadResult = RemoteResponse<Option<Vec<u8>>>;
type RemoteCallResult = RemoteResponse<client::CallResult>;
fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
let (sender, receiver) = channel();
self.schedule_request(request.retry_count.clone(), RequestData::RemoteHeader(request, sender),
RemoteResponse { receiver })
}
fn remote_read(&self, request: RemoteReadRequest<B::Header>) -> Self::RemoteReadResult {
let (sender, receiver) = channel();
self.schedule_request(request.retry_count.clone(), RequestData::RemoteRead(request, sender),
RemoteResponse { receiver })
}
fn remote_call(&self, request: RemoteCallRequest<B::Header>) -> Self::RemoteCallResult {
let (sender, receiver) = channel();
self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender),
RemoteResponse { receiver })
}
}
impl<B, E> OnDemandCore<B, E> where
B: BlockT,
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
pub fn add_peer(&mut self, peer: NodeIndex) {
self.idle_peers.push_back(peer);
}
pub fn remove_peer(&mut self, peer: NodeIndex) {
if let Some(request) = self.active_peers.remove(&peer) {
self.pending_requests.push_front(request);
return;
}
if let Some(idle_index) = self.idle_peers.iter().position(|i| *i == peer) {
self.idle_peers.swap_remove_back(idle_index);
}
}
pub fn maintain_peers(&mut self) -> Vec<NodeIndex> {
let now = Instant::now();
let mut bad_peers = Vec::new();
loop {
match self.active_peers.front() {
Some((_, request)) if now - request.timestamp >= REQUEST_TIMEOUT => (),
_ => return bad_peers,
}
let (bad_peer, request) = self.active_peers.pop_front().expect("front() is Some as checked above");
self.pending_requests.push_front(request);
bad_peers.push(bad_peer);
}
}
pub fn insert(&mut self, retry_count: usize, data: RequestData<B>) {
let request_id = self.next_request_id;
self.next_request_id += 1;
self.pending_requests.push_back(Request {
id: request_id,
timestamp: Instant::now(),
retry_count,
data,
});
}
pub fn remove(&mut self, peer: NodeIndex, id: u64) -> Option<Request<B>> {
match self.active_peers.entry(peer) {
Entry::Occupied(entry) => match entry.get().id == id {
true => {
self.idle_peers.push_back(peer);
Some(entry.remove())
},
false => None,
},
Entry::Vacant(_) => None,
}
}
pub fn dispatch(&mut self) {
let service = match self.service.upgrade() {
Some(service) => service,
None => return,
};
while !self.pending_requests.is_empty() {
let peer = match self.idle_peers.pop_front() {
Some(peer) => peer,
None => return,
};
let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
request.timestamp = Instant::now();
trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer);
service.execute_in_context(|ctx| ctx.send_message(peer, request.message()));
self.active_peers.insert(peer, request);
}
}
}
impl<Block: BlockT> Request<Block> {
pub fn message(&self) -> message::Message<Block> {
match self.data {
RequestData::RemoteHeader(ref data, _) => message::generic::Message::RemoteHeaderRequest(
message::RemoteHeaderRequest {
id: self.id,
block: data.block,
}),
RequestData::RemoteRead(ref data, _) => message::generic::Message::RemoteReadRequest(
message::RemoteReadRequest {
id: self.id,
block: data.block,
key: data.key.clone(),
}),
RequestData::RemoteCall(ref data, _) => message::generic::Message::RemoteCallRequest(
message::RemoteCallRequest {
id: self.id,
block: data.block,
method: data.method.clone(),
data: data.call_data.clone(),
}),
}
}
}
impl<Block: BlockT> RequestData<Block> {
pub fn fail(self, error: client::error::Error) {
// don't care if anyone is listening
match self {
RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); },
}
}
}
#[cfg(test)]
pub mod tests {
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;
use futures::Future;
use parking_lot::RwLock;
use client;
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest};
use message;
use network_libp2p::NodeIndex;
use service::{Roles, ExecuteInContext};
use test::TestIo;
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService};
use test_client::runtime::{Block, Header};
pub struct DummyExecutor;
struct DummyFetchChecker { ok: bool }
impl ExecuteInContext<Block> for DummyExecutor {
fn execute_in_context<F: Fn(&mut ::protocol::Context<Block>)>(&self, _closure: F) {}
}
impl FetchChecker<Block> for DummyFetchChecker {
fn check_header_proof(
&self,
_request: &RemoteHeaderRequest<Header>,
header: Option<Header>,
_remote_proof: Vec<Vec<u8>>
) -> client::error::Result<Header> {
match self.ok {
true if header.is_some() => Ok(header.unwrap()),
_ => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
}
}
fn check_read_proof(&self, _request: &RemoteReadRequest<Header>, _remote_proof: Vec<Vec<u8>>) -> client::error::Result<Option<Vec<u8>>> {
match self.ok {
true => Ok(Some(vec![42])),
false => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
}
}
fn check_execution_proof(&self, _request: &RemoteCallRequest<Header>, _remote_proof: Vec<Vec<u8>>) -> client::error::Result<client::CallResult> {
match self.ok {
true => Ok(client::CallResult {
return_data: vec![42],
changes: Default::default(),
}),
false => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
}
}
}
fn dummy(ok: bool) -> (Arc<DummyExecutor>, Arc<OnDemand<Block, DummyExecutor>>) {
let executor = Arc::new(DummyExecutor);
let service = Arc::new(OnDemand::new(Arc::new(DummyFetchChecker { ok })));
service.set_service_link(Arc::downgrade(&executor));
(executor, service)
}
fn total_peers(on_demand: &OnDemand<Block, DummyExecutor>) -> usize {
let core = on_demand.core.lock();
core.idle_peers.len() + core.active_peers.len()
}
fn receive_call_response(on_demand: &OnDemand<Block, DummyExecutor>, network: &mut TestIo, peer: NodeIndex, id: message::RequestId) {
on_demand.on_remote_call_response(network, peer, message::RemoteCallResponse {
id: id,
proof: vec![vec![2]],
});
}
fn dummy_header() -> Header {
Header {
parent_hash: Default::default(),
number: 0,
state_root: Default::default(),
extrinsics_root: Default::default(),
digest: Default::default(),
}
}
#[test]
fn knows_about_peers_roles() {
let (_, on_demand) = dummy(true);
on_demand.on_connect(0, Roles::LIGHT);
on_demand.on_connect(1, Roles::FULL);
on_demand.on_connect(2, Roles::AUTHORITY);
assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
}
#[test]
fn disconnects_from_idle_peer() {
let (_, on_demand) = dummy(true);
on_demand.on_connect(0, Roles::FULL);
assert_eq!(1, total_peers(&*on_demand));
on_demand.on_disconnect(0);
assert_eq!(0, total_peers(&*on_demand));
}
#[test]
fn disconnects_from_timeouted_peer() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(1, Roles::FULL);
assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert!(on_demand.core.lock().active_peers.is_empty());
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>());
on_demand.core.lock().active_peers[&0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT;
on_demand.maintain_peers(&mut network);
assert!(on_demand.core.lock().idle_peers.is_empty());
assert_eq!(vec![1], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>());
assert!(network.to_disconnect.contains(&0));
}
#[test]
fn disconnects_from_peer_on_response_with_wrong_id() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
receive_call_response(&*on_demand, &mut network, 0, 1);
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}
#[test]
fn disconnects_from_peer_on_incorrect_response() {
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(1),
});
on_demand.on_connect(0, Roles::FULL);
receive_call_response(&*on_demand, &mut network, 0, 0);
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}
#[test]
fn disconnects_from_peer_on_unexpected_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
receive_call_response(&*on_demand, &mut network, 0, 0);
assert!(network.to_disconnect.contains(&0));
}
#[test]
fn disconnects_from_peer_on_wrong_response_type() {
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(1),
});
on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse {
id: 0,
proof: vec![vec![2]],
});
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}
#[test]
fn receives_remote_failure_after_retry_count_failures() {
use parking_lot::{Condvar, Mutex};
let retry_count = 2;
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
for i in 0..retry_count+1 {
on_demand.on_connect(i, Roles::FULL);
}
let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new()));
let thread_sync = sync.clone();
let response = on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(retry_count)
});
let thread = ::std::thread::spawn(move || {
let &(ref current, ref finished_at, ref finished) = &*thread_sync;
let _ = response.wait().unwrap_err();
*finished_at.lock() = *current.lock();
finished.notify_one();
});
let &(ref current, ref finished_at, ref finished) = &*sync;
for i in 0..retry_count+1 {
let mut current = current.lock();
*current = *current + 1;
receive_call_response(&*on_demand, &mut network, i, i as u64);
}
let mut finished_at = finished_at.lock();
assert!(!finished.wait_for(&mut finished_at, ::std::time::Duration::from_millis(1000)).timed_out());
assert_eq!(*finished_at, retry_count + 1);
thread.join().unwrap();
}
#[test]
fn receives_remote_call_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
let response = on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result.return_data, vec![42]);
});
receive_call_response(&*on_demand, &mut network, 0, 0);
thread.join().unwrap();
}
#[test]
fn receives_remote_read_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
let response = on_demand.remote_read(RemoteReadRequest {
header: dummy_header(),
block: Default::default(),
key: b":key".to_vec(),
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result, Some(vec![42]));
});
on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse {
id: 0,
proof: vec![vec![2]],
});
thread.join().unwrap();
}
#[test]
fn receives_remote_header_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
let response = on_demand.remote_header(RemoteHeaderRequest {
cht_root: Default::default(),
block: 1,
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result.hash(), "80729accb7bb10ff9c637a10e8bb59f21c52571aa7b46544c5885ca89ed190f4".into());
});
on_demand.on_remote_header_response(&mut network, 0, message::RemoteHeaderResponse {
id: 0,
header: Some(Header {
parent_hash: Default::default(),
number: 1,
state_root: Default::default(),
extrinsics_root: Default::default(),
digest: Default::default(),
}),
proof: vec![vec![2]],
});
thread.join().unwrap();
}
}
+679
View File
@@ -0,0 +1,679 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashMap, HashSet};
use std::{mem, cmp};
use std::sync::Arc;
use std::time;
use parking_lot::RwLock;
use rustc_hex::ToHex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, NumberFor, As};
use runtime_primitives::generic::BlockId;
use network_libp2p::{NodeIndex, Severity};
use codec::{Encode, Decode};
use message::{self, Message};
use message::generic::Message as GenericMessage;
use specialization::Specialization;
use sync::{ChainSync, Status as SyncStatus, SyncState};
use service::{Roles, TransactionPool, ExHashT};
use import_queue::ImportQueue;
use config::ProtocolConfig;
use chain::Client;
use on_demand::OnDemandService;
use io::SyncIo;
use error;
const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Current protocol version.
pub (crate) const CURRENT_VERSION: u32 = 1;
/// Current packet count.
pub (crate) const CURRENT_PACKET_COUNT: u8 = 1;
// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: Specialization<B>, H: ExHashT> {
config: ProtocolConfig,
on_demand: Option<Arc<OnDemandService<B>>>,
genesis_hash: B::Hash,
sync: Arc<RwLock<ChainSync<B>>>,
specialization: RwLock<S>,
context_data: ContextData<B, H>,
// Connected peers pending Status message.
handshaking_peers: RwLock<HashMap<NodeIndex, time::Instant>>,
transaction_pool: Arc<TransactionPool<H, B>>,
}
/// Syncing status and statistics
#[derive(Clone)]
pub struct ProtocolStatus<B: BlockT> {
/// Sync status.
pub sync: SyncStatus<B>,
/// Total number of connected peers
pub num_peers: usize,
/// Total number of active peers.
pub num_active_peers: usize,
}
/// Peer information
struct Peer<B: BlockT, H: ExHashT> {
/// Protocol version
protocol_version: u32,
/// Roles
roles: Roles,
/// Peer best block hash
best_hash: B::Hash,
/// Peer best block number
best_number: <B::Header as HeaderT>::Number,
/// Pending block request if any
block_request: Option<message::BlockRequest<B>>,
/// Request timestamp
request_timestamp: Option<time::Instant>,
/// Holds a set of transactions known to this peer.
known_extrinsics: HashSet<H>,
/// Holds a set of blocks known to this peer.
known_blocks: HashSet<B::Hash>,
/// Request counter,
next_request_id: message::RequestId,
}
/// Info about a peer's known state.
#[derive(Debug)]
pub struct PeerInfo<B: BlockT> {
/// Roles
pub roles: Roles,
/// Protocol version
pub protocol_version: u32,
/// Peer best block hash
pub best_hash: B::Hash,
/// Peer best block number
pub best_number: <B::Header as HeaderT>::Number,
}
/// Context for a network-specific handler.
pub trait Context<B: BlockT> {
/// Get a reference to the client.
fn client(&self) -> &::chain::Client<B>;
/// Point out that a peer has been malign or irresponsible or appeared lazy.
fn report_peer(&mut self, who: NodeIndex, reason: Severity);
/// Get peer info.
fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>>;
/// Send a message to a peer.
fn send_message(&mut self, who: NodeIndex, data: ::message::Message<B>);
}
/// Protocol context.
pub(crate) struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
io: &'a mut SyncIo,
context_data: &'a ContextData<B, H>,
}
impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> {
pub(crate) fn new(context_data: &'a ContextData<B, H>, io: &'a mut SyncIo) -> Self {
ProtocolContext {
io,
context_data,
}
}
/// Send a message to a peer.
pub fn send_message(&mut self, who: NodeIndex, message: Message<B>) {
send_message(&self.context_data.peers, self.io, who, message)
}
/// Point out that a peer has been malign or irresponsible or appeared lazy.
pub fn report_peer(&mut self, who: NodeIndex, reason: Severity) {
self.io.report_peer(who, reason);
}
/// Get peer info.
pub fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>> {
self.context_data.peers.read().get(&peer).map(|p| {
PeerInfo {
roles: p.roles,
protocol_version: p.protocol_version,
best_hash: p.best_hash,
best_number: p.best_number,
}
})
}
}
impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, H> {
fn send_message(&mut self, who: NodeIndex, message: Message<B>) {
ProtocolContext::send_message(self, who, message);
}
fn report_peer(&mut self, who: NodeIndex, reason: Severity) {
ProtocolContext::report_peer(self, who, reason);
}
fn peer_info(&self, who: NodeIndex) -> Option<PeerInfo<B>> {
ProtocolContext::peer_info(self, who)
}
fn client(&self) -> &Client<B> {
&*self.context_data.chain
}
}
/// Data necessary to create a context.
pub(crate) struct ContextData<B: BlockT, H: ExHashT> {
// All connected peers
peers: RwLock<HashMap<NodeIndex, Peer<B, H>>>,
chain: Arc<Client<B>>,
}
impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Create a new instance.
pub fn new(
config: ProtocolConfig,
chain: Arc<Client<B>>,
import_queue: Arc<ImportQueue<B>>,
on_demand: Option<Arc<OnDemandService<B>>>,
transaction_pool: Arc<TransactionPool<H, B>>,
specialization: S,
) -> error::Result<Self> {
let info = chain.info()?;
let sync = ChainSync::new(config.roles, &info, import_queue);
let protocol = Protocol {
config: config,
context_data: ContextData {
peers: RwLock::new(HashMap::new()),
chain,
},
on_demand,
genesis_hash: info.chain.genesis_hash,
sync: Arc::new(RwLock::new(sync)),
specialization: RwLock::new(specialization),
handshaking_peers: RwLock::new(HashMap::new()),
transaction_pool: transaction_pool,
};
Ok(protocol)
}
pub(crate) fn context_data(&self) -> &ContextData<B, H> {
&self.context_data
}
pub(crate) fn sync(&self) -> &Arc<RwLock<ChainSync<B>>> {
&self.sync
}
/// Returns protocol status
pub fn status(&self) -> ProtocolStatus<B> {
let sync = self.sync.read();
let peers = self.context_data.peers.read();
ProtocolStatus {
sync: sync.status(),
num_peers: peers.values().count(),
num_active_peers: peers.values().filter(|p| p.block_request.is_some()).count(),
}
}
pub fn handle_packet(&self, io: &mut SyncIo, who: NodeIndex, mut data: &[u8]) {
let message: Message<B> = match Decode::decode(&mut data) {
Some(m) => m,
None => {
trace!(target: "sync", "Invalid packet from {}", who);
io.report_peer(who, Severity::Bad("Peer sent us a packet with invalid format"));
return;
}
};
match message {
GenericMessage::Status(s) => self.on_status_message(io, who, s),
GenericMessage::BlockRequest(r) => self.on_block_request(io, who, r),
GenericMessage::BlockResponse(r) => {
let request = {
let mut peers = self.context_data.peers.write();
if let Some(ref mut peer) = peers.get_mut(&who) {
peer.request_timestamp = None;
match mem::replace(&mut peer.block_request, None) {
Some(r) => r,
None => {
io.report_peer(who, Severity::Bad("Unexpected response packet received from peer"));
return;
}
}
} else {
io.report_peer(who, Severity::Bad("Unexpected packet received from peer"));
return;
}
};
if request.id != r.id {
trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", who, request.id, r.id);
return;
}
self.on_block_response(io, who, request, r);
},
GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, who, announce),
GenericMessage::Transactions(m) => self.on_extrinsics(io, who, m),
GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(io, who, request),
GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, who, response),
GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(io, who, request),
GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(io, who, response),
GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(io, who, request),
GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response),
other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, other),
}
}
pub fn send_message(&self, io: &mut SyncIo, who: NodeIndex, message: Message<B>) {
send_message::<B, H>(&self.context_data.peers, io, who, message)
}
/// Called when a new peer is connected
pub fn on_peer_connected(&self, io: &mut SyncIo, who: NodeIndex) {
trace!(target: "sync", "Connected {}: {}", who, io.peer_info(who));
self.handshaking_peers.write().insert(who, time::Instant::now());
self.send_status(io, who);
}
/// Called by peer when it is disconnecting
pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: NodeIndex) {
trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_info(peer));
// lock all the the peer lists so that add/remove peer events are in order
let mut sync = self.sync.write();
let mut spec = self.specialization.write();
let removed = {
let mut peers = self.context_data.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
handshaking_peers.remove(&peer);
peers.remove(&peer).is_some()
};
if removed {
let mut context = ProtocolContext::new(&self.context_data, io);
sync.peer_disconnected(&mut context, peer);
spec.on_disconnect(&mut context, peer);
self.on_demand.as_ref().map(|s| s.on_disconnect(peer));
}
}
fn on_block_request(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest<B>) {
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max);
let mut blocks = Vec::new();
let mut id = match request.from {
message::FromBlock::Hash(h) => BlockId::Hash(h),
message::FromBlock::Number(n) => BlockId::Number(n),
};
let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize;
// TODO: receipts, etc.
let get_header = request.fields.contains(message::BlockAttributes::HEADER);
let get_body = request.fields.contains(message::BlockAttributes::BODY);
let get_justification = request.fields.contains(message::BlockAttributes::JUSTIFICATION);
while let Some(header) = self.context_data.chain.header(&id).unwrap_or(None) {
if blocks.len() >= max {
break;
}
let number = header.number().clone();
let hash = header.hash();
let justification = if get_justification { self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None) } else { None };
let block_data = message::generic::BlockData {
hash: hash,
header: if get_header { Some(header) } else { None },
body: if get_body { self.context_data.chain.body(&BlockId::Hash(hash)).unwrap_or(None) } else { None },
receipt: None,
message_queue: None,
justification,
};
blocks.push(block_data);
match request.direction {
message::Direction::Ascending => id = BlockId::Number(number + As::sa(1)),
message::Direction::Descending => {
if number == As::sa(0) {
break;
}
id = BlockId::Number(number - As::sa(1))
}
}
}
let response = message::generic::BlockResponse {
id: request.id,
blocks: blocks,
};
trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len());
self.send_message(io, peer, GenericMessage::BlockResponse(response))
}
fn on_block_response(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest<B>, response: message::BlockResponse<B>) {
// TODO: validate response
let blocks_range = match (
response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target: "sync", "BlockResponse {} from {} with {} blocks{}",
response.id, peer, response.blocks.len(), blocks_range);
self.sync.write().on_block_data(&mut ProtocolContext::new(&self.context_data, io), peer, request, response);
}
/// Perform time based maintenance.
pub fn tick(&self, io: &mut SyncIo) {
self.maintain_peers(io);
self.on_demand.as_ref().map(|s| s.maintain_peers(io));
}
fn maintain_peers(&self, io: &mut SyncIo) {
let tick = time::Instant::now();
let mut aborting = Vec::new();
{
let peers = self.context_data.peers.read();
let handshaking_peers = self.handshaking_peers.read();
for (who, timestamp) in peers.iter()
.filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r)))
.chain(handshaking_peers.iter()) {
if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC {
trace!(target: "sync", "Timeout {}", who);
aborting.push(*who);
}
}
}
self.specialization.write().maintain_peers(&mut ProtocolContext::new(&self.context_data, io));
for p in aborting {
io.report_peer(p, Severity::Timeout);
}
}
pub fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>> {
self.context_data.peers.read().get(&peer).map(|p| {
PeerInfo {
roles: p.roles,
protocol_version: p.protocol_version,
best_hash: p.best_hash,
best_number: p.best_number,
}
})
}
/// Called by peer to report status
fn on_status_message(&self, io: &mut SyncIo, who: NodeIndex, status: message::Status<B>) {
trace!(target: "sync", "New peer {} {:?}", who, status);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", who, io.peer_info(who));
return;
}
{
let mut peers = self.context_data.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
if peers.contains_key(&who) {
debug!(target: "sync", "Unexpected status packet from {}:{}", who, io.peer_info(who));
return;
}
if status.genesis_hash != self.genesis_hash {
io.report_peer(who, Severity::Bad(&format!("Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash)));
return;
}
if status.version != CURRENT_VERSION {
io.report_peer(who, Severity::Bad(&format!("Peer using unsupported protocol version {}", status.version)));
return;
}
let peer = Peer {
protocol_version: status.version,
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number,
block_request: None,
request_timestamp: None,
known_extrinsics: HashSet::new(),
known_blocks: HashSet::new(),
next_request_id: 0,
};
peers.insert(who.clone(), peer);
handshaking_peers.remove(&who);
debug!(target: "sync", "Connected {} {}", who, io.peer_info(who));
}
let mut context = ProtocolContext::new(&self.context_data, io);
self.sync.write().new_peer(&mut context, who);
self.specialization.write().on_connect(&mut context, who, status.clone());
self.on_demand.as_ref().map(|s| s.on_connect(who, status.roles));
}
/// Called when peer sends us new extrinsics
fn on_extrinsics(&self, _io: &mut SyncIo, who: NodeIndex, extrinsics: message::Transactions<B::Extrinsic>) {
// Accept extrinsics only when fully synced
if self.sync.read().status().state != SyncState::Idle {
trace!(target: "sync", "{} Ignoring extrinsics while syncing", who);
return;
}
trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who);
let mut peers = self.context_data.peers.write();
if let Some(ref mut peer) = peers.get_mut(&who) {
for t in extrinsics {
if let Some(hash) = self.transaction_pool.import(&t) {
peer.known_extrinsics.insert(hash);
}
}
}
}
/// Called when we propagate ready extrinsics to peers.
pub fn propagate_extrinsics(&self, io: &mut SyncIo) {
debug!(target: "sync", "Propagating extrinsics");
// Accept transactions only when fully synced
if self.sync.read().status().state != SyncState::Idle {
return;
}
let extrinsics = self.transaction_pool.transactions();
let mut propagated_to = HashMap::new();
let mut peers = self.context_data.peers.write();
for (who, ref mut peer) in peers.iter_mut() {
let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
.iter()
.cloned()
.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
.unzip();
if !to_send.is_empty() {
let node_id = io.peer_session_info(*who).map(|info| match info.id {
Some(id) => format!("{}@{:x}", info.remote_address, id),
None => info.remote_address.clone(),
});
if let Some(id) = node_id {
for hash in hashes {
propagated_to.entry(hash).or_insert_with(Vec::new).push(id.clone());
}
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
self.send_message(io, *who, GenericMessage::Transactions(to_send));
}
}
self.transaction_pool.on_broadcasted(propagated_to);
}
/// Send Status message
fn send_status(&self, io: &mut SyncIo, who: NodeIndex) {
if let Ok(info) = self.context_data.chain.info() {
let status = message::generic::Status {
version: CURRENT_VERSION,
genesis_hash: info.chain.genesis_hash,
roles: self.config.roles.into(),
best_number: info.chain.best_number,
best_hash: info.chain.best_hash,
chain_status: self.specialization.read().status(),
};
self.send_message(io, who, GenericMessage::Status(status))
}
}
pub fn abort(&self) {
let mut sync = self.sync.write();
let mut spec = self.specialization.write();
let mut peers = self.context_data.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
sync.clear();
spec.on_abort();
peers.clear();
handshaking_peers.clear();
}
pub fn stop(&self) {
// stop processing import requests first (without holding a sync lock)
let import_queue = self.sync.read().import_queue();
import_queue.stop();
// and then clear all the sync data
self.abort();
}
pub fn on_block_announce(&self, io: &mut SyncIo, who: NodeIndex, announce: message::BlockAnnounce<B::Header>) {
let header = announce.header;
let hash = header.hash();
{
let mut peers = self.context_data.peers.write();
if let Some(ref mut peer) = peers.get_mut(&who) {
peer.known_blocks.insert(hash.clone());
}
}
self.sync.write().on_block_announce(&mut ProtocolContext::new(&self.context_data, io), who, hash, &header);
}
pub fn on_block_imported(&self, io: &mut SyncIo, hash: B::Hash, header: &B::Header) {
self.sync.write().update_chain_info(&header);
self.specialization.write().on_block_imported(
&mut ProtocolContext::new(&self.context_data, io),
hash.clone(),
header
);
// blocks are not announced by light clients
if self.config.roles & Roles::LIGHT == Roles::LIGHT {
return;
}
// send out block announcements
let mut peers = self.context_data.peers.write();
for (who, ref mut peer) in peers.iter_mut() {
if peer.known_blocks.insert(hash.clone()) {
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
self.send_message(io, *who, GenericMessage::BlockAnnounce(message::BlockAnnounce {
header: header.clone()
}));
}
}
}
fn on_remote_call_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteCallRequest<B::Hash>) {
trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, who, request.method, request.block);
let proof = match self.context_data.chain.execution_proof(&request.block, &request.method, &request.data) {
Ok((_, proof)) => proof,
Err(error) => {
trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}",
request.id, who, request.method, request.block, error);
Default::default()
},
};
self.send_message(io, who, GenericMessage::RemoteCallResponse(message::RemoteCallResponse {
id: request.id, proof,
}));
}
fn on_remote_call_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteCallResponse) {
trace!(target: "sync", "Remote call response {} from {}", response.id, who);
self.on_demand.as_ref().map(|s| s.on_remote_call_response(io, who, response));
}
fn on_remote_read_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteReadRequest<B::Hash>) {
trace!(target: "sync", "Remote read request {} from {} ({} at {})",
request.id, who, request.key.to_hex(), request.block);
let proof = match self.context_data.chain.read_proof(&request.block, &request.key) {
Ok(proof) => proof,
Err(error) => {
trace!(target: "sync", "Remote read request {} from {} ({} at {}) failed with: {}",
request.id, who, request.key.to_hex(), request.block, error);
Default::default()
},
};
self.send_message(io, who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
id: request.id, proof,
}));
}
fn on_remote_read_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteReadResponse) {
trace!(target: "sync", "Remote read response {} from {}", response.id, who);
self.on_demand.as_ref().map(|s| s.on_remote_read_response(io, who, response));
}
fn on_remote_header_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteHeaderRequest<NumberFor<B>>) {
trace!(target: "sync", "Remote header proof request {} from {} ({})",
request.id, who, request.block);
let (header, proof) = match self.context_data.chain.header_proof(request.block) {
Ok((header, proof)) => (Some(header), proof),
Err(error) => {
trace!(target: "sync", "Remote header proof request {} from {} ({}) failed with: {}",
request.id, who, request.block, error);
(Default::default(), Default::default())
},
};
self.send_message(io, who, GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse {
id: request.id, header, proof,
}));
}
fn on_remote_header_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteHeaderResponse<B::Header>) {
trace!(target: "sync", "Remote header proof response {} from {}", response.id, who);
self.on_demand.as_ref().map(|s| s.on_remote_header_response(io, who, response));
}
/// Execute a closure with access to a network context and specialization.
pub fn with_spec<F, U>(&self, io: &mut SyncIo, f: F) -> U
where F: FnOnce(&mut S, &mut Context<B>) -> U
{
f(&mut* self.specialization.write(), &mut ProtocolContext::new(&self.context_data, io))
}
}
fn send_message<B: BlockT, H: ExHashT>(peers: &RwLock<HashMap<NodeIndex, Peer<B, H>>>, io: &mut SyncIo, who: NodeIndex, mut message: Message<B>) {
match &mut message {
&mut GenericMessage::BlockRequest(ref mut r) => {
let mut peers = peers.write();
if let Some(ref mut peer) = peers.get_mut(&who) {
r.id = peer.next_request_id;
peer.next_request_id = peer.next_request_id + 1;
peer.block_request = Some(r.clone());
peer.request_timestamp = Some(time::Instant::now());
}
},
_ => (),
}
io.send(who, message.encode());
}
/// Hash a message.
pub(crate) fn hash_message<B: BlockT>(message: &Message<B>) -> B::Hash {
let data = message.encode();
HashFor::<B>::hash(&data)
}
+340
View File
@@ -0,0 +1,340 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::sync::Arc;
use std::io;
use std::time::Duration;
use futures::sync::{oneshot, mpsc};
use network_libp2p::{NetworkProtocolHandler, NetworkContext, NodeIndex, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use network_libp2p::{NetworkService};
use core_io::{TimerToken};
use io::NetSyncIo;
use protocol::{Protocol, ProtocolContext, Context, ProtocolStatus, PeerInfo as ProtocolPeerInfo};
use config::{ProtocolConfig};
use error::Error;
use chain::Client;
use message::LocalizedBftMessage;
use specialization::Specialization;
use on_demand::OnDemandService;
use import_queue::AsyncImportQueue;
use runtime_primitives::traits::{Block as BlockT};
/// Type that represents fetch completion future.
pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
/// Type that represents bft messages stream.
pub type BftMessageStream<B> = mpsc::UnboundedReceiver<LocalizedBftMessage<B>>;
const TICK_TOKEN: TimerToken = 0;
const TICK_TIMEOUT: Duration = Duration::from_millis(1000);
const PROPAGATE_TOKEN: TimerToken = 1;
const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000);
bitflags! {
/// Node roles bitmask.
pub struct Roles: u8 {
/// No network.
const NONE = 0b00000000;
/// Full node, does not participate in consensus.
const FULL = 0b00000001;
/// Light client node.
const LIGHT = 0b00000010;
/// Act as an authority
const AUTHORITY = 0b00000100;
}
}
impl ::codec::Encode for Roles {
fn encode_to<T: ::codec::Output>(&self, dest: &mut T) {
dest.push_byte(self.bits())
}
}
impl ::codec::Decode for Roles {
fn decode<I: ::codec::Input>(input: &mut I) -> Option<Self> {
Self::from_bits(input.read_byte()?)
}
}
/// Sync status
pub trait SyncProvider<B: BlockT>: Send + Sync {
/// Get sync status
fn status(&self) -> ProtocolStatus<B>;
/// Get peers information
fn peers(&self) -> Vec<PeerInfo<B>>;
/// Get this node id if available.
fn node_id(&self) -> Option<String>;
}
pub trait ExHashT: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {}
impl<T> ExHashT for T where T: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {}
/// Transaction pool interface
pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
/// Get transactions from the pool that are ready to be propagated.
fn transactions(&self) -> Vec<(H, B::Extrinsic)>;
/// Import a transaction into the pool.
fn import(&self, transaction: &B::Extrinsic) -> Option<H>;
/// Notify the pool about transactions broadcast.
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
}
/// ConsensusService
pub trait ConsensusService<B: BlockT>: Send + Sync {
/// Maintain connectivity to given addresses.
fn connect_to_authorities(&self, addresses: &[String]);
/// Get BFT message stream for messages corresponding to consensus on given
/// parent hash.
fn bft_messages(&self, parent_hash: B::Hash) -> BftMessageStream<B>;
/// Send out a BFT message.
fn send_bft_message(&self, message: LocalizedBftMessage<B>);
}
/// Service able to execute closure in the network context.
pub trait ExecuteInContext<B: BlockT>: Send + Sync {
/// Execute closure in network context.
fn execute_in_context<F: Fn(&mut Context<B>)>(&self, closure: F);
}
/// Network protocol handler
struct ProtocolHandler<B: BlockT, S: Specialization<B>, H: ExHashT> {
protocol: Protocol<B, S, H>,
}
/// Peer connection information
#[derive(Debug)]
pub struct PeerInfo<B: BlockT> {
/// Public node id
pub id: Option<String>,
/// Node client ID
pub client_version: String,
/// Capabilities
pub capabilities: Vec<String>,
/// Remote endpoint address
pub remote_address: String,
/// Local endpoint address
pub local_address: String,
/// Dot protocol info.
pub dot_info: Option<ProtocolPeerInfo<B>>,
}
/// Service initialization parameters.
pub struct Params<B: BlockT, S, H: ExHashT> {
/// Configuration.
pub config: ProtocolConfig,
/// Network layer configuration.
pub network_config: NetworkConfiguration,
/// Polkadot relay chain access point.
pub chain: Arc<Client<B>>,
/// On-demand service reference.
pub on_demand: Option<Arc<OnDemandService<B>>>,
/// Transaction pool.
pub transaction_pool: Arc<TransactionPool<H, B>>,
/// Protocol specialization.
pub specialization: S,
}
/// Polkadot network service. Handles network IO and manages connectivity.
pub struct Service<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> {
/// Network service
network: NetworkService,
/// Devp2p protocol handler
handler: Arc<ProtocolHandler<B, S, H>>,
/// Devp2p protocol ID.
protocol_id: ProtocolId,
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
/// Creates and register protocol with the network service
pub fn new(params: Params<B, S, H>, protocol_id: ProtocolId) -> Result<Arc<Service<B, S, H>>, Error> {
let chain = params.chain.clone();
let import_queue = Arc::new(AsyncImportQueue::new());
let handler = Arc::new(ProtocolHandler {
protocol: Protocol::new(
params.config,
params.chain,
import_queue.clone(),
params.on_demand,
params.transaction_pool,
params.specialization,
)?,
});
let versions = [(::protocol::CURRENT_VERSION as u8, ::protocol::CURRENT_PACKET_COUNT)];
let protocols = vec![(handler.clone() as Arc<_>, protocol_id, &versions[..])];
let service = match NetworkService::new(params.network_config.clone(), protocols) {
Ok(service) => service,
Err(err) => {
match err.kind() {
ErrorKind::Io(ref e) if e.kind() == io::ErrorKind::AddrInUse =>
warn!("Network port is already in use, make sure that another instance of Polkadot client is not running or change the port using the --port option."),
_ => warn!("Error starting network: {}", err),
};
return Err(err.into())
},
};
let sync = Arc::new(Service {
network: service,
protocol_id,
handler,
});
import_queue.start(
Arc::downgrade(sync.handler.protocol.sync()),
Arc::downgrade(&sync),
Arc::downgrade(&chain)
)?;
Ok(sync)
}
/// Called when a new block is imported by the client.
pub fn on_block_imported(&self, hash: B::Hash, header: &B::Header) {
self.network.with_context(self.protocol_id, |context| {
self.handler.protocol.on_block_imported(&mut NetSyncIo::new(context), hash, header)
});
}
/// Called when new transactons are imported by the client.
pub fn trigger_repropagate(&self) {
self.network.with_context(self.protocol_id, |context| {
self.handler.protocol.propagate_extrinsics(&mut NetSyncIo::new(context));
});
}
/// Execute a closure with the chain-specific network specialization.
/// If the network is unavailable, this will return `None`.
pub fn with_spec<F, U>(&self, f: F) -> Option<U>
where F: FnOnce(&mut S, &mut Context<B>) -> U
{
let mut res = None;
self.network.with_context(self.protocol_id, |context| {
res = Some(self.handler.protocol.with_spec(&mut NetSyncIo::new(context), f))
});
res
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H:ExHashT> Drop for Service<B, S, H> {
fn drop(&mut self) {
self.handler.protocol.stop();
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ExecuteInContext<B> for Service<B, S, H> {
fn execute_in_context<F: Fn(&mut ::protocol::Context<B>)>(&self, closure: F) {
self.network.with_context(self.protocol_id, |context| {
closure(&mut ProtocolContext::new(self.handler.protocol.context_data(), &mut NetSyncIo::new(context)))
});
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> SyncProvider<B> for Service<B, S, H> {
/// Get sync status
fn status(&self) -> ProtocolStatus<B> {
self.handler.protocol.status()
}
/// Get sync peers
fn peers(&self) -> Vec<PeerInfo<B>> {
self.network.with_context_eval(self.protocol_id, |ctx| {
let peer_ids = self.network.connected_peers();
peer_ids.into_iter().filter_map(|who| {
let session_info = match ctx.session_info(who) {
None => return None,
Some(info) => info,
};
Some(PeerInfo {
id: session_info.id.map(|id| format!("{:x}", id)),
client_version: session_info.client_version,
capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(),
remote_address: session_info.remote_address,
local_address: session_info.local_address,
dot_info: self.handler.protocol.peer_info(who),
})
}).collect()
}).unwrap_or_else(Vec::new)
}
fn node_id(&self) -> Option<String> {
self.network.external_url()
}
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> NetworkProtocolHandler for ProtocolHandler<B, S, H> {
fn initialize(&self, io: &NetworkContext) {
io.register_timer(TICK_TOKEN, TICK_TIMEOUT)
.expect("Error registering sync timer");
io.register_timer(PROPAGATE_TOKEN, PROPAGATE_TIMEOUT)
.expect("Error registering transaction propagation timer");
}
fn read(&self, io: &NetworkContext, peer: &NodeIndex, _packet_id: u8, data: &[u8]) {
self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data);
}
fn connected(&self, io: &NetworkContext, peer: &NodeIndex) {
self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer);
}
fn disconnected(&self, io: &NetworkContext, peer: &NodeIndex) {
self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer);
}
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
match timer {
TICK_TOKEN => self.protocol.tick(&mut NetSyncIo::new(io)),
PROPAGATE_TOKEN => self.protocol.propagate_extrinsics(&mut NetSyncIo::new(io)),
_ => {}
}
}
}
/// Trait for managing network
pub trait ManageNetwork: Send + Sync {
/// Set to allow unreserved peers to connect
fn accept_unreserved_peers(&self);
/// Set to deny unreserved peers to connect
fn deny_unreserved_peers(&self);
/// Remove reservation for the peer
fn remove_reserved_peer(&self, peer: String) -> Result<(), String>;
/// Add reserved peer
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
}
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> ManageNetwork for Service<B, S, H> {
fn accept_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Accept);
}
fn deny_unreserved_peers(&self) {
self.network.set_non_reserved_mode(NonReservedPeerMode::Deny);
}
fn remove_reserved_peer(&self, peer: String) -> Result<(), String> {
self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
}
fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
}
}
@@ -0,0 +1,48 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Specializations of the substrate network protocol to allow more complex forms of communication.
use ::NodeIndex;
use runtime_primitives::traits::Block as BlockT;
use protocol::Context;
/// A specialization of the substrate network protocol. Handles events and sends messages.
pub trait Specialization<B: BlockT>: Send + Sync + 'static {
/// Get the current specialization-status.
fn status(&self) -> Vec<u8>;
/// Called on start-up.
fn on_start(&mut self) { }
/// Called when a peer successfully handshakes.
fn on_connect(&mut self, ctx: &mut Context<B>, who: NodeIndex, status: ::message::Status<B>);
/// Called when a peer is disconnected. If the peer ID is unknown, it should be ignored.
fn on_disconnect(&mut self, ctx: &mut Context<B>, who: NodeIndex);
/// Called when a network-specific message arrives.
fn on_message(&mut self, ctx: &mut Context<B>, who: NodeIndex, message: ::message::Message<B>);
/// Called on abort.
fn on_abort(&mut self) { }
/// Called periodically to maintain peers and handle timeouts.
fn maintain_peers(&mut self, _ctx: &mut Context<B>) { }
/// Called when a block is _imported_ at the head of the chain (not during major sync).
fn on_block_imported(&mut self, _ctx: &mut Context<B>, _hash: B::Hash, _header: &B::Header) { }
}
+428
View File
@@ -0,0 +1,428 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::sync::Arc;
use protocol::Context;
use network_libp2p::{Severity, NodeIndex};
use client::{BlockStatus, BlockOrigin, ClientInfo};
use client::error::Error as ClientError;
use blocks::{self, BlockCollection};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor};
use runtime_primitives::generic::BlockId;
use message::{self, generic::Message as GenericMessage};
use service::Roles;
use import_queue::ImportQueue;
// Maximum blocks to request in a single packet.
const MAX_BLOCKS_TO_REQUEST: usize = 128;
// Maximum blocks to store in the import queue.
const MAX_IMPORTING_BLOCKS: usize = 2048;
struct PeerSync<B: BlockT> {
pub common_hash: B::Hash,
pub common_number: NumberFor<B>,
pub best_hash: B::Hash,
pub best_number: NumberFor<B>,
pub state: PeerSyncState<B>,
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum PeerSyncState<B: BlockT> {
AncestorSearch(NumberFor<B>),
Available,
DownloadingNew(NumberFor<B>),
DownloadingStale(B::Hash),
}
/// Relay chain sync strategy.
pub struct ChainSync<B: BlockT> {
genesis_hash: B::Hash,
peers: HashMap<NodeIndex, PeerSync<B>>,
blocks: BlockCollection<B>,
best_queued_number: NumberFor<B>,
best_queued_hash: B::Hash,
required_block_attributes: message::BlockAttributes,
import_queue: Arc<ImportQueue<B>>,
}
/// Reported sync state.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum SyncState {
/// Initial sync is complete, keep-up sync is active.
Idle,
/// Actively catching up with the chain.
Downloading
}
/// Syncing status and statistics
#[derive(Clone)]
pub struct Status<B: BlockT> {
/// Current global sync state.
pub state: SyncState,
/// Target sync block number.
pub best_seen_block: Option<NumberFor<B>>,
}
impl<B: BlockT> ChainSync<B> {
/// Create a new instance.
pub(crate) fn new(role: Roles, info: &ClientInfo<B>, import_queue: Arc<ImportQueue<B>>) -> Self {
let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION;
if role.intersects(Roles::FULL | Roles::AUTHORITY) {
required_block_attributes |= message::BlockAttributes::BODY;
}
ChainSync {
genesis_hash: info.chain.genesis_hash,
peers: HashMap::new(),
blocks: BlockCollection::new(),
best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash),
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
required_block_attributes,
import_queue,
}
}
fn best_seen_block(&self) -> Option<NumberFor<B>> {
self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number)
}
/// Returns import queue reference.
pub(crate) fn import_queue(&self) -> Arc<ImportQueue<B>> {
self.import_queue.clone()
}
/// Returns sync status.
pub(crate) fn status(&self) -> Status<B> {
let best_seen = self.best_seen_block();
let state = match &best_seen {
&Some(n) if n > self.best_queued_number && n - self.best_queued_number > As::sa(5) => SyncState::Downloading,
_ => SyncState::Idle,
};
Status {
state: state,
best_seen_block: best_seen,
}
}
/// Handle new connected peer.
pub(crate) fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
if let Some(info) = protocol.peer_info(who) {
match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) {
(Err(e), _) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
protocol.report_peer(who, Severity::Useless(&format!("Error legimimately reading blockchain status: {:?}", e)));
},
(Ok(BlockStatus::KnownBad), _) => {
protocol.report_peer(who, Severity::Bad(&format!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number)));
},
(Ok(BlockStatus::Unknown), b) if b == As::sa(0) => {
protocol.report_peer(who, Severity::Bad(&format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number)));
},
(Ok(BlockStatus::Unknown), _) => {
let our_best = self.best_queued_number;
if our_best > As::sa(0) {
debug!(target:"sync", "New peer with unknown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number);
self.peers.insert(who, PeerSync {
common_hash: self.genesis_hash,
common_number: As::sa(0),
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::AncestorSearch(our_best),
});
Self::request_ancestry(protocol, who, our_best)
} else {
// We are at genesis, just start downloading
debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number);
self.peers.insert(who, PeerSync {
common_hash: self.genesis_hash,
common_number: As::sa(0),
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::Available,
});
self.download_new(protocol, who)
}
},
(Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChain), _) => {
debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
self.peers.insert(who, PeerSync {
common_hash: info.best_hash,
common_number: info.best_number,
best_hash: info.best_hash,
best_number: info.best_number,
state: PeerSyncState::Available,
});
}
}
}
}
pub(crate) fn on_block_data(&mut self, protocol: &mut Context<B>, who: NodeIndex, _request: message::BlockRequest<B>, response: message::BlockResponse<B>) {
let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&who) {
match peer.state {
PeerSyncState::DownloadingNew(start_block) => {
self.blocks.clear_peer_download(who);
peer.state = PeerSyncState::Available;
self.blocks.insert(start_block, response.blocks, who);
self.blocks.drain(self.best_queued_number + As::sa(1))
},
PeerSyncState::DownloadingStale(_) => {
peer.state = PeerSyncState::Available;
response.blocks.into_iter().map(|b| blocks::BlockData {
origin: who,
block: b
}).collect()
},
PeerSyncState::AncestorSearch(n) => {
match response.blocks.get(0) {
Some(ref block) => {
trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", n, block.hash, who);
match protocol.client().block_hash(n) {
Ok(Some(block_hash)) if block_hash == block.hash => {
if peer.common_number < n {
peer.common_hash = block.hash;
peer.common_number = n;
}
peer.state = PeerSyncState::Available;
trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", who, block.hash, n);
vec![]
},
Ok(our_best) if n > As::sa(0) => {
trace!(target:"sync", "Ancestry block mismatch for peer {}: theirs: {} ({}), ours: {:?}", who, block.hash, n, our_best);
let n = n - As::sa(1);
peer.state = PeerSyncState::AncestorSearch(n);
Self::request_ancestry(protocol, who, n);
return;
},
Ok(_) => { // genesis mismatch
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer"));
return;
},
Err(e) => {
protocol.report_peer(who, Severity::Useless(&format!("Error answering legitimate blockchain query: {:?}", e)));
return;
}
}
},
None => {
trace!(target:"sync", "Invalid response when searching for ancestor from {}", who);
protocol.report_peer(who, Severity::Bad("Invalid response when searching for ancestor"));
return;
}
}
},
PeerSyncState::Available => Vec::new(),
}
} else {
vec![]
};
let best_seen = self.best_seen_block();
let is_best = new_blocks.first().and_then(|b| b.block.header.as_ref()).map(|h| best_seen.as_ref().map_or(false, |n| h.number() >= n));
let origin = if is_best.unwrap_or_default() { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync };
let import_queue = self.import_queue.clone();
if let Some((hash, number)) = new_blocks.last()
.and_then(|b| b.block.header.as_ref().map(|h|(b.block.hash.clone(), *h.number())))
{
if number > self.best_queued_number {
self.best_queued_number = number;
self.best_queued_hash = hash;
}
}
import_queue.import_blocks(self, protocol, (origin, new_blocks));
self.maintain_sync(protocol);
}
pub fn maintain_sync(&mut self, protocol: &mut Context<B>) {
let peers: Vec<NodeIndex> = self.peers.keys().map(|p| *p).collect();
for peer in peers {
self.download_new(protocol, peer);
}
}
pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
if number > self.best_queued_number {
self.best_queued_number = number;
self.best_queued_hash = *hash;
}
// Update common blocks
for (_, peer) in self.peers.iter_mut() {
trace!("Updating peer info ours={}, theirs={}", number, peer.best_number);
if peer.best_number >= number {
peer.common_number = number;
peer.common_hash = *hash;
}
}
}
pub(crate) fn update_chain_info(&mut self, best_header: &B::Header) {
let hash = best_header.hash();
self.block_imported(&hash, best_header.number().clone())
}
pub(crate) fn on_block_announce(&mut self, protocol: &mut Context<B>, who: NodeIndex, hash: B::Hash, header: &B::Header) {
let number = *header.number();
if let Some(ref mut peer) = self.peers.get_mut(&who) {
if number > peer.best_number {
peer.best_number = number;
peer.best_hash = hash;
}
if number <= self.best_queued_number && number > peer.common_number {
peer.common_number = number
}
} else {
return;
}
if !self.is_known_or_already_downloading(protocol, &hash) {
let stale = number <= self.best_queued_number;
if stale {
if !self.is_known_or_already_downloading(protocol, header.parent_hash()) {
trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", who, hash, header);
} else {
trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", who, hash, header);
self.download_stale(protocol, who, &hash);
}
} else {
trace!(target: "sync", "Downloading new block announced from {}: {} {:?}", who, hash, header);
self.download_new(protocol, who);
}
} else {
trace!(target: "sync", "Known block announce from {}: {}", who, hash);
}
}
fn is_known_or_already_downloading(&self, protocol: &mut Context<B>, hash: &B::Hash) -> bool {
self.peers.iter().any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash))
|| block_status(&*protocol.client(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown)
}
pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
self.blocks.clear_peer_download(who);
self.peers.remove(&who);
self.maintain_sync(protocol);
}
pub(crate) fn restart(&mut self, protocol: &mut Context<B>) {
self.import_queue.clear();
self.blocks.clear();
let ids: Vec<NodeIndex> = self.peers.keys().map(|p| *p).collect();
for id in ids {
self.new_peer(protocol, id);
}
match protocol.client().info() {
Ok(info) => {
self.best_queued_hash = info.best_queued_hash.unwrap_or(info.chain.best_hash);
self.best_queued_number = info.best_queued_number.unwrap_or(info.chain.best_number);
},
Err(e) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
self.best_queued_hash = self.genesis_hash;
self.best_queued_number = As::sa(0);
}
}
}
pub(crate) fn clear(&mut self) {
self.blocks.clear();
self.peers.clear();
}
// Download old block.
fn download_stale(&mut self, protocol: &mut Context<B>, who: NodeIndex, hash: &B::Hash) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
match peer.state {
PeerSyncState::Available => {
let request = message::generic::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from: message::FromBlock::Hash(*hash),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
};
peer.state = PeerSyncState::DownloadingStale(*hash);
protocol.send_message(who, GenericMessage::BlockRequest(request));
},
_ => (),
}
}
}
// Issue a request for a peer to download new blocks, if any are available
fn download_new(&mut self, protocol: &mut Context<B>, who: NodeIndex) {
if let Some(ref mut peer) = self.peers.get_mut(&who) {
let import_status = self.import_queue.status();
// when there are too many blocks in the queue => do not try to download new blocks
if import_status.importing_count > MAX_IMPORTING_BLOCKS {
return;
}
// we should not download already queued blocks
let common_number = ::std::cmp::max(peer.common_number, import_status.best_importing_number);
trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", who, common_number, peer.best_number);
match peer.state {
PeerSyncState::Available => {
if let Some(range) = self.blocks.needed_blocks(who, MAX_BLOCKS_TO_REQUEST, peer.best_number, common_number) {
trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end);
let request = message::generic::BlockRequest {
id: 0,
fields: self.required_block_attributes.clone(),
from: message::FromBlock::Number(range.start),
to: None,
direction: message::Direction::Ascending,
max: Some((range.end - range.start).as_() as u32),
};
peer.state = PeerSyncState::DownloadingNew(range.start);
protocol.send_message(who, GenericMessage::BlockRequest(request));
} else {
trace!(target: "sync", "Nothing to request");
}
},
_ => (),
}
}
}
fn request_ancestry(protocol: &mut Context<B>, who: NodeIndex, block: NumberFor<B>) {
trace!(target: "sync", "Requesting ancestry block #{} from {}", block, who);
let request = message::generic::BlockRequest {
id: 0,
fields: message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION,
from: message::FromBlock::Number(block),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
};
protocol.send_message(who, GenericMessage::BlockRequest(request));
}
}
/// Get block status, taking into account import queue.
fn block_status<B: BlockT>(
chain: &::chain::Client<B>,
queue: &ImportQueue<B>,
hash: B::Hash) -> Result<BlockStatus, ClientError>
{
if queue.is_importing(&hash) {
return Ok(BlockStatus::Queued);
}
chain.block_status(&BlockId::Hash(hash))
}
+330
View File
@@ -0,0 +1,330 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
mod sync;
use std::collections::{VecDeque, HashSet, HashMap};
use std::sync::Arc;
use parking_lot::RwLock;
use client;
use client::block_builder::BlockBuilder;
use runtime_primitives::traits::Block as BlockT;
use runtime_primitives::generic::BlockId;
use io::SyncIo;
use protocol::{Context, Protocol};
use primitives::{Blake2Hasher, RlpCodec};
use config::ProtocolConfig;
use service::TransactionPool;
use network_libp2p::{NodeIndex, SessionInfo, Severity};
use keyring::Keyring;
use codec::Encode;
use import_queue::tests::SyncImportQueue;
use test_client::{self, TestClient};
use test_client::runtime::{Block, Hash, Transfer, Extrinsic};
use specialization::Specialization;
pub struct DummySpecialization;
impl Specialization<Block> for DummySpecialization {
fn status(&self) -> Vec<u8> { vec![] }
fn on_connect(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex, _status: ::message::Status<Block>) {
}
fn on_disconnect(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex) {
}
fn on_message(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex, _message: ::message::Message<Block>) {
}
}
pub struct TestIo<'p> {
queue: &'p RwLock<VecDeque<TestPacket>>,
pub to_disconnect: HashSet<NodeIndex>,
packets: Vec<TestPacket>,
peers_info: HashMap<NodeIndex, String>,
_sender: Option<NodeIndex>,
}
impl<'p> TestIo<'p> where {
pub fn new(queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<NodeIndex>) -> TestIo<'p> {
TestIo {
queue: queue,
_sender: sender,
to_disconnect: HashSet::new(),
packets: Vec::new(),
peers_info: HashMap::new(),
}
}
}
impl<'p> Drop for TestIo<'p> {
fn drop(&mut self) {
self.queue.write().extend(self.packets.drain(..));
}
}
impl<'p> SyncIo for TestIo<'p> {
fn report_peer(&mut self, who: NodeIndex, _reason: Severity) {
self.to_disconnect.insert(who);
}
fn is_expired(&self) -> bool {
false
}
fn send(&mut self, who: NodeIndex, data: Vec<u8>) {
self.packets.push(TestPacket {
data: data,
recipient: who,
});
}
fn peer_info(&self, who: NodeIndex) -> String {
self.peers_info.get(&who)
.cloned()
.unwrap_or_else(|| who.to_string())
}
fn peer_session_info(&self, _peer_id: NodeIndex) -> Option<SessionInfo> {
None
}
}
/// Mocked subprotocol packet
pub struct TestPacket {
data: Vec<u8>,
recipient: NodeIndex,
}
pub struct Peer {
client: Arc<client::Client<test_client::Backend, test_client::Executor, Block>>,
pub sync: Protocol<Block, DummySpecialization, Hash>,
pub queue: RwLock<VecDeque<TestPacket>>,
}
impl Peer {
/// Called after blockchain has been populated to updated current state.
fn start(&self) {
// Update the sync state to the latest chain state.
let info = self.client.info().expect("In-mem client does not fail");
let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header);
}
/// Called on connection to other indicated peer.
fn on_connect(&self, other: NodeIndex) {
self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other);
}
/// Called on disconnect from other indicated peer.
fn on_disconnect(&self, other: NodeIndex) {
let mut io = TestIo::new(&self.queue, Some(other));
self.sync.on_peer_disconnected(&mut io, other);
}
/// Receive a message from another peer. Return a set of peers to disconnect.
fn receive_message(&self, from: NodeIndex, msg: TestPacket) -> HashSet<NodeIndex> {
let mut io = TestIo::new(&self.queue, Some(from));
self.sync.handle_packet(&mut io, from, &msg.data);
self.flush();
io.to_disconnect.clone()
}
/// Produce the next pending message to send to another peer.
fn pending_message(&self) -> Option<TestPacket> {
self.flush();
self.queue.write().pop_front()
}
/// Whether this peer is done syncing (has no messages to send).
fn is_done(&self) -> bool {
self.queue.read().is_empty()
}
/// Execute a "sync step". This is called for each peer after it sends a packet.
fn sync_step(&self) {
self.flush();
self.sync.tick(&mut TestIo::new(&self.queue, None));
}
/// Restart sync for a peer.
fn restart_sync(&self) {
self.sync.abort();
}
fn flush(&self) {
}
fn generate_blocks<F>(&self, count: usize, mut edit_block: F)
where F: FnMut(&mut BlockBuilder<test_client::Backend, test_client::Executor, Block, Blake2Hasher, RlpCodec>)
{
for _ in 0 .. count {
let mut builder = self.client.new_block().unwrap();
edit_block(&mut builder);
let block = builder.bake().unwrap();
trace!("Generating {}, (#{})", block.hash(), block.header.number);
self.client.justify_and_import(client::BlockOrigin::File, block).unwrap();
}
}
fn push_blocks(&self, count: usize, with_tx: bool) {
let mut nonce = 0;
if with_tx {
self.generate_blocks(count, |builder| {
let transfer = Transfer {
from: Keyring::Alice.to_raw_public().into(),
to: Keyring::Alice.to_raw_public().into(),
amount: 1,
nonce,
};
let signature = Keyring::from_raw_public(transfer.from.0).unwrap().sign(&transfer.encode()).into();
builder.push(Extrinsic { transfer, signature }).unwrap();
nonce = nonce + 1;
});
} else {
self.generate_blocks(count, |_| ());
}
}
}
pub struct EmptyTransactionPool;
impl TransactionPool<Hash, Block> for EmptyTransactionPool {
fn transactions(&self) -> Vec<(Hash, Extrinsic)> {
Vec::new()
}
fn import(&self, _transaction: &Extrinsic) -> Option<Hash> {
None
}
fn on_broadcasted(&self, _: HashMap<Hash, Vec<String>>) {}
}
pub struct TestNet {
peers: Vec<Arc<Peer>>,
started: bool,
disconnect_events: Vec<(NodeIndex, NodeIndex)>, //disconnected (initiated by, to)
}
impl TestNet {
fn new(n: usize) -> Self {
Self::new_with_config(n, ProtocolConfig::default())
}
fn new_with_config(n: usize, config: ProtocolConfig) -> Self {
let mut net = TestNet {
peers: Vec::new(),
started: false,
disconnect_events: Vec::new(),
};
for _ in 0..n {
net.add_peer(&config);
}
net
}
pub fn add_peer(&mut self, config: &ProtocolConfig) {
let client = Arc::new(test_client::new());
let tx_pool = Arc::new(EmptyTransactionPool);
let import_queue = Arc::new(SyncImportQueue);
let sync = Protocol::new(config.clone(), client.clone(), import_queue, None, tx_pool, DummySpecialization).unwrap();
self.peers.push(Arc::new(Peer {
sync: sync,
client: client,
queue: RwLock::new(VecDeque::new()),
}));
}
pub fn peer(&self, i: usize) -> &Peer {
&self.peers[i]
}
fn start(&mut self) {
if self.started {
return;
}
for peer in 0..self.peers.len() {
self.peers[peer].start();
for client in 0..self.peers.len() {
if peer != client {
self.peers[peer].on_connect(client as NodeIndex);
}
}
}
self.started = true;
}
fn sync_step(&mut self) {
for peer in 0..self.peers.len() {
let packet = self.peers[peer].pending_message();
if let Some(packet) = packet {
let disconnecting = {
let recipient = packet.recipient;
trace!("--- {} -> {} ---", peer, recipient);
let to_disconnect = self.peers[recipient].receive_message(peer as NodeIndex, packet);
for d in &to_disconnect {
// notify this that disconnecting peers are disconnecting
self.peers[recipient].on_disconnect(*d as NodeIndex);
self.disconnect_events.push((peer, *d));
}
to_disconnect
};
for d in &disconnecting {
// notify other peers that this peer is disconnecting
self.peers[*d].on_disconnect(peer as NodeIndex);
}
}
self.sync_step_peer(peer);
}
}
fn sync_step_peer(&mut self, peer_num: usize) {
self.peers[peer_num].sync_step();
}
fn restart_peer(&mut self, i: usize) {
self.peers[i].restart_sync();
}
fn sync(&mut self) -> u32 {
self.start();
let mut total_steps = 0;
while !self.done() {
self.sync_step();
total_steps += 1;
}
total_steps
}
fn sync_steps(&mut self, count: usize) {
self.start();
for _ in 0..count {
self.sync_step();
}
}
fn done(&self) -> bool {
self.peers.iter().all(|p| p.is_done())
}
}
+121
View File
@@ -0,0 +1,121 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use client::backend::Backend;
use client::blockchain::HeaderBackend as BlockchainHeaderBackend;
use sync::SyncState;
use Roles;
use super::*;
#[test]
fn sync_from_two_peers_works() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.sync();
assert!(net.peer(0).client.backend().blockchain().equals_to(net.peer(1).client.backend().blockchain()));
let status = net.peer(0).sync.status();
assert_eq!(status.sync.state, SyncState::Idle);
}
#[test]
fn sync_from_two_peers_with_ancestry_search_works() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.restart_peer(0);
net.sync();
assert!(net.peer(0).client.backend().blockchain().canon_equals_to(net.peer(1).client.backend().blockchain()));
}
#[test]
fn sync_long_chain_works() {
let mut net = TestNet::new(2);
net.peer(1).push_blocks(500, false);
net.sync_steps(3);
assert_eq!(net.peer(0).sync.status().sync.state, SyncState::Downloading);
net.sync();
assert!(net.peer(0).client.backend().blockchain().equals_to(net.peer(1).client.backend().blockchain()));
}
#[test]
fn sync_no_common_longer_chain_fails() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(20, true);
net.peer(1).push_blocks(20, false);
net.sync();
assert!(!net.peer(0).client.backend().blockchain().canon_equals_to(net.peer(1).client.backend().blockchain()));
}
#[test]
fn sync_after_fork_works() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
net.peer(2).push_blocks(30, false);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(20, false);
net.peer(2).push_blocks(20, false);
net.peer(1).push_blocks(10, true);
net.peer(2).push_blocks(1, false);
// peer 1 has the best chain
let peer1_chain = net.peer(1).client.backend().blockchain().clone();
net.sync();
assert!(net.peer(0).client.backend().blockchain().canon_equals_to(&peer1_chain));
assert!(net.peer(1).client.backend().blockchain().canon_equals_to(&peer1_chain));
assert!(net.peer(2).client.backend().blockchain().canon_equals_to(&peer1_chain));
}
#[test]
fn blocks_are_not_announced_by_light_nodes() {
::env_logger::init().ok();
let mut net = TestNet::new(0);
// full peer0 is connected to light peer
// light peer1 is connected to full peer2
let mut light_config = ProtocolConfig::default();
light_config.roles = Roles::LIGHT;
net.add_peer(&ProtocolConfig::default());
net.add_peer(&light_config);
net.add_peer(&ProtocolConfig::default());
net.peer(0).push_blocks(1, false);
net.peer(0).start();
net.peer(1).start();
net.peer(2).start();
net.peer(0).on_connect(1);
net.peer(1).on_connect(2);
// generate block at peer0 && run sync
while !net.done() {
net.sync_step();
}
// peer 0 has the best chain
// peer 1 has the best chain
// peer 2 has genesis-chain only
assert_eq!(net.peer(0).client.backend().blockchain().info().unwrap().best_number, 1);
assert_eq!(net.peer(1).client.backend().blockchain().info().unwrap().best_number, 1);
assert_eq!(net.peer(2).client.backend().blockchain().info().unwrap().best_number, 0);
}