Generalize the Consensus Infrastructure (#883)

* Split out Consensus
* Supply ImportQueue through network-service
  - simplify ImportQueue.import_blocks
  - remove Deadlock on import_block
  - Adding Verifier-Trait
  - Implement import_queue provisioning in service; allow cli to import
* Allow to actually customize import queue
* Consensus Gossip: Cache Message hash per Topic
This commit is contained in:
Benjamin Kampmann
2018-10-16 13:40:33 +02:00
committed by GitHub
parent a24e61cb29
commit ac4bcf879f
61 changed files with 1937 additions and 3306 deletions
+11 -8
View File
@@ -28,8 +28,10 @@ const MAX_PARALLEL_DOWNLOADS: u32 = 1;
/// Block data with origin.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
/// The Block Message from the wire
pub block: message::BlockData<B>,
pub origin: NodeIndex,
/// The peer we received this block from, if any
pub origin: Option<NodeIndex>,
}
#[derive(Debug)]
@@ -92,7 +94,8 @@ impl<B: BlockT> BlockCollection<B> {
_ => (),
}
self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter().map(|b| BlockData { origin: who, block: b }).collect()));
self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter()
.map(|b| BlockData { origin: Some(who), block: b }).collect()));
}
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
@@ -244,14 +247,14 @@ mod test {
bc.insert(1, blocks[1..11].to_vec(), peer0);
assert_eq!(bc.needed_blocks(peer0, 40, 150, 0), Some(11 .. 41));
assert_eq!(bc.drain(1), blocks[1..11].iter().map(|b| BlockData { block: b.clone(), origin: 0 }).collect::<Vec<_>>());
assert_eq!(bc.drain(1), blocks[1..11].iter().map(|b| BlockData { block: b.clone(), origin: Some(0) }).collect::<Vec<_>>());
bc.clear_peer_download(peer0);
bc.insert(11, blocks[11..41].to_vec(), peer0);
let drained = bc.drain(12);
assert_eq!(drained[..30], blocks[11..41].iter().map(|b| BlockData { block: b.clone(), origin: 0 }).collect::<Vec<_>>()[..]);
assert_eq!(drained[30..], blocks[41..81].iter().map(|b| BlockData { block: b.clone(), origin: 1 }).collect::<Vec<_>>()[..]);
assert_eq!(drained[..30], blocks[11..41].iter().map(|b| BlockData { block: b.clone(), origin: Some(0) }).collect::<Vec<_>>()[..]);
assert_eq!(drained[30..], blocks[41..81].iter().map(|b| BlockData { block: b.clone(), origin: Some(1) }).collect::<Vec<_>>()[..]);
bc.clear_peer_download(peer2);
assert_eq!(bc.needed_blocks(peer2, 40, 150, 80), Some(81 .. 121));
@@ -262,8 +265,8 @@ mod test {
assert_eq!(bc.drain(80), vec![]);
let drained = bc.drain(81);
assert_eq!(drained[..40], blocks[81..121].iter().map(|b| BlockData { block: b.clone(), origin: 2 }).collect::<Vec<_>>()[..]);
assert_eq!(drained[40..], blocks[121..150].iter().map(|b| BlockData { block: b.clone(), origin: 1 }).collect::<Vec<_>>()[..]);
assert_eq!(drained[..40], blocks[81..121].iter().map(|b| BlockData { block: b.clone(), origin: Some(2) }).collect::<Vec<_>>()[..]);
assert_eq!(drained[40..], blocks[121..150].iter().map(|b| BlockData { block: b.clone(), origin: Some(1) }).collect::<Vec<_>>()[..]);
}
#[test]
@@ -273,7 +276,7 @@ mod test {
len: 128,
downloading: 1,
});
let blocks = generate_blocks(10).into_iter().map(|b| BlockData { block: b, origin: 0 }).collect();
let blocks = generate_blocks(10).into_iter().map(|b| BlockData { block: b, origin: None }).collect();
bc.blocks.insert(114305, BlockRangeState::Complete(blocks));
assert_eq!(bc.needed_blocks(0, 128, 10000, 000), Some(1 .. 100));
+9 -24
View File
@@ -16,24 +16,18 @@
//! Blockchain access trait
use client::{self, Client as SubstrateClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin, CallExecutor};
use client::{self, Client as SubstrateClient, ImportBlock, ImportResult, ClientInfo, BlockStatus, CallExecutor};
use client::error::Error;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use runtime_primitives::generic::BlockId;
use runtime_primitives::bft::Justification;
use primitives::{Blake2Hasher};
use runtime_primitives::Justification;
use primitives::{Blake2Hasher, AuthorityId};
/// Local client abstraction for the network.
pub trait Client<Block: BlockT>: Send + Sync {
/// Import a new block. Parent is supposed to be existing in the blockchain.
fn import(
&self,
origin: BlockOrigin,
header: Block::Header,
justification: Justification<Block::Hash>,
body: Option<Vec<Block::Extrinsic>>,
finalized: bool,
) -> Result<ImportResult, Error>;
fn import(&self, block: ImportBlock<Block>, new_authorities: Option<Vec<AuthorityId>>)
-> Result<ImportResult, Error>;
/// Get blockchain info.
fn info(&self) -> Result<ClientInfo<Block>, Error>;
@@ -51,7 +45,7 @@ pub trait Client<Block: BlockT>: Send + Sync {
fn body(&self, id: &BlockId<Block>) -> Result<Option<Vec<Block::Extrinsic>>, Error>;
/// Get block justification.
fn justification(&self, id: &BlockId<Block>) -> Result<Option<Justification<Block::Hash>>, Error>;
fn justification(&self, id: &BlockId<Block>) -> Result<Option<Justification>, Error>;
/// Get block header proof.
fn header_proof(&self, block_number: <Block::Header as HeaderT>::Number) -> Result<(Block::Header, Vec<Vec<u8>>), Error>;
@@ -77,17 +71,8 @@ impl<B, E, Block> Client<Block> for SubstrateClient<B, E, Block> where
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
Block: BlockT,
{
fn import(
&self,
origin: BlockOrigin,
header: Block::Header,
justification: Justification<Block::Hash>,
body: Option<Vec<Block::Extrinsic>>,
finalized: bool,
) -> Result<ImportResult, Error> {
// TODO: defer justification check and add finality.
let justified_header = self.check_justification(header, justification.into())?;
(self as &SubstrateClient<B, E, Block>).import_block(origin, justified_header, body, finalized)
fn import(&self, block: ImportBlock<Block>, new_authorities: Option<Vec<AuthorityId>>) -> Result<ImportResult, Error> {
(self as &SubstrateClient<B, E, Block>).import_block(block, new_authorities)
}
fn info(&self) -> Result<ClientInfo<Block>, Error> {
@@ -110,7 +95,7 @@ impl<B, E, Block> Client<Block> for SubstrateClient<B, E, Block> where
(self as &SubstrateClient<B, E, Block>).body(id)
}
fn justification(&self, id: &BlockId<Block>) -> Result<Option<Justification<Block::Hash>>, Error> {
fn justification(&self, id: &BlockId<Block>) -> Result<Option<Justification>, Error> {
(self as &SubstrateClient<B, E, Block>).justification(id)
}
+130 -193
View File
@@ -22,9 +22,9 @@ use futures::sync::mpsc;
use std::time::{Instant, Duration};
use rand::{self, Rng};
use network_libp2p::NodeIndex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor};
use runtime_primitives::generic::BlockId;
use message::{self, generic::Message as GenericMessage};
use message::generic::{Message, ConsensusMessage};
use protocol::Context;
use service::Roles;
use specialization::Specialization;
@@ -39,38 +39,33 @@ struct PeerConsensus<H> {
is_authority: bool,
}
/// Consensus messages.
#[derive(Debug, Clone, PartialEq)]
pub enum ConsensusMessage<B: BlockT> {
/// A message concerning BFT agreement
Bft(message::LocalizedBftMessage<B>),
/// A message concerning some chain-specific aspect of consensus
ChainSpecific(Vec<u8>, B::Hash),
}
struct MessageEntry<B: BlockT> {
hash: B::Hash,
message: ConsensusMessage<B>,
topic: B::Hash,
message_hash: B::Hash,
message: ConsensusMessage,
instant: Instant,
}
/// Consensus network protocol handler. Manages statements and candidate requests.
pub struct ConsensusGossip<B: BlockT> {
peers: HashMap<NodeIndex, PeerConsensus<B::Hash>>,
live_message_sinks: HashMap<B::Hash, mpsc::UnboundedSender<ConsensusMessage<B>>>,
peers: HashMap<NodeIndex, PeerConsensus<(B::Hash, B::Hash)>>,
live_message_sinks: HashMap<B::Hash, mpsc::UnboundedSender<ConsensusMessage>>,
messages: Vec<MessageEntry<B>>,
message_hashes: HashSet<B::Hash>,
known_messages: HashSet<(B::Hash, B::Hash)>,
session_start: Option<B::Hash>,
}
impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
impl<B: BlockT> ConsensusGossip<B>
where
B::Header: HeaderT<Number=u64>
{
/// Create a new instance.
pub fn new() -> Self {
ConsensusGossip {
peers: HashMap::new(),
live_message_sinks: HashMap::new(),
messages: Default::default(),
message_hashes: Default::default(),
known_messages: Default::default(),
session_start: None
}
}
@@ -88,13 +83,8 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
// TODO: limit by size
let mut known_messages = HashSet::new();
for entry in self.messages.iter() {
known_messages.insert(entry.hash);
let message = match entry.message {
ConsensusMessage::Bft(ref bft) => GenericMessage::BftMessage(bft.clone()),
ConsensusMessage::ChainSpecific(ref msg, _) => GenericMessage::ChainSpecific(msg.clone()),
};
protocol.send_message(who, message);
known_messages.insert((entry.topic, entry.message_hash));
protocol.send_message(who, Message::Consensus(entry.topic.clone(), entry.message.clone()));
}
self.peers.insert(who, PeerConsensus {
known_messages,
@@ -109,9 +99,17 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
}
}
fn propagate(&mut self, protocol: &mut Context<B>, message: message::Message<B>, hash: B::Hash) {
fn propagate<F>(
&mut self,
protocol: &mut Context<B>,
message_hash: B::Hash,
topic: B::Hash,
get_message: F,
)
where F: Fn() -> ConsensusMessage,
{
let mut non_authorities: Vec<_> = self.peers.iter()
.filter_map(|(id, ref peer)| if !peer.is_authority && !peer.known_messages.contains(&hash) { Some(*id) } else { None })
.filter_map(|(id, ref peer)| if !peer.is_authority && !peer.known_messages.contains(&(topic, message_hash)) { Some(*id) } else { None })
.collect();
rand::thread_rng().shuffle(&mut non_authorities);
@@ -123,79 +121,33 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
for (id, ref mut peer) in self.peers.iter_mut() {
if peer.is_authority {
if peer.known_messages.insert(hash.clone()) {
if peer.known_messages.insert((topic.clone(), message_hash.clone())) {
let message = get_message();
trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message);
protocol.send_message(*id, message.clone());
protocol.send_message(*id, Message::Consensus(topic, message));
}
}
else if non_authorities.contains(&id) {
} else if non_authorities.contains(&id) {
let message = get_message();
trace!(target:"gossip", "Propagating to {}: {:?}", id, message);
peer.known_messages.insert(hash.clone());
protocol.send_message(*id, message.clone());
peer.known_messages.insert((topic.clone(), message_hash.clone()));
protocol.send_message(*id, Message::Consensus(topic, message));
}
}
}
fn register_message(&mut self, hash: B::Hash, message: ConsensusMessage<B>) {
if self.message_hashes.insert(hash) {
fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, get_message: F)
where F: Fn() -> ConsensusMessage
{
if self.known_messages.insert((topic, message_hash)) {
self.messages.push(MessageEntry {
hash,
topic,
message_hash,
instant: Instant::now(),
message,
message: get_message(),
});
}
}
/// Handles incoming BFT message, passing to stream and repropagating.
pub fn on_bft_message(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: message::LocalizedBftMessage<B>) {
if let Some((hash, message)) = self.handle_incoming(protocol, who, ConsensusMessage::Bft(message)) {
// propagate to other peers.
self.multicast(protocol, message, Some(hash));
}
}
/// Handles incoming chain-specific message and repropagates
pub fn on_chain_specific(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: Vec<u8>, topic: B::Hash) {
debug!(target: "gossip", "received chain-specific gossip message");
if let Some((hash, message)) = self.handle_incoming(protocol, who, ConsensusMessage::ChainSpecific(message, topic)) {
debug!(target: "gossip", "handled incoming chain-specific message");
// propagate to other peers.
self.multicast(protocol, message, Some(hash));
}
}
/// Get a stream of messages relevant to consensus for the given topic.
pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver<ConsensusMessage<B>> {
let (sink, stream) = mpsc::unbounded();
for entry in self.messages.iter() {
let message_matches = match entry.message {
ConsensusMessage::Bft(ref msg) => msg.parent_hash == topic,
ConsensusMessage::ChainSpecific(_, ref h) => h == &topic,
};
if message_matches {
sink.unbounded_send(entry.message.clone()).expect("receiving end known to be open; qed");
}
}
self.live_message_sinks.insert(topic, sink);
stream
}
/// Multicast a chain-specific message to other authorities.
pub fn multicast_chain_specific(&mut self, protocol: &mut Context<B>, message: Vec<u8>, topic: B::Hash) {
trace!(target:"gossip", "sending chain-specific message");
self.multicast(protocol, ConsensusMessage::ChainSpecific(message, topic), None);
}
/// Multicast a BFT message to other authorities
pub fn multicast_bft_message(&mut self, protocol: &mut Context<B>, message: message::LocalizedBftMessage<B>) {
// Broadcast message to all authorities.
trace!(target:"gossip", "Broadcasting BFT message {:?}", message);
self.multicast(protocol, ConsensusMessage::Bft(message), None);
}
/// Call when a peer has been disconnected to stop tracking gossip status.
pub fn peer_disconnected(&mut self, _protocol: &mut Context<B>, who: NodeIndex) {
self.peers.remove(&who);
@@ -206,19 +158,14 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
pub fn collect_garbage<P: Fn(&B::Hash) -> bool>(&mut self, predicate: P) {
self.live_message_sinks.retain(|_, sink| !sink.is_closed());
let hashes = &mut self.message_hashes;
let hashes = &mut self.known_messages;
let before = self.messages.len();
let now = Instant::now();
self.messages.retain(|entry| {
let topic = match entry.message {
ConsensusMessage::Bft(ref msg) => &msg.parent_hash,
ConsensusMessage::ChainSpecific(_, ref h) => h,
};
if entry.instant + MESSAGE_LIFETIME >= now && predicate(topic) {
if entry.instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic) {
true
} else {
hashes.remove(&entry.hash);
hashes.remove(&(entry.topic, entry.message_hash));
false
}
});
@@ -228,35 +175,32 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
}
}
fn handle_incoming(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: ConsensusMessage<B>) -> Option<(B::Hash, ConsensusMessage<B>)> {
let (hash, topic, message) = match message {
ConsensusMessage::Bft(msg) => {
let parent = msg.parent_hash;
let generic = GenericMessage::BftMessage(msg);
(
::protocol::hash_message(&generic),
parent,
match generic {
GenericMessage::BftMessage(msg) => ConsensusMessage::Bft(msg),
_ => panic!("`generic` is known to be the `BftMessage` variant; qed"),
}
)
}
ConsensusMessage::ChainSpecific(msg, topic) => {
let generic = GenericMessage::ChainSpecific(msg);
(
::protocol::hash_message::<B>(&generic),
topic,
match generic {
GenericMessage::ChainSpecific(msg) => ConsensusMessage::ChainSpecific(msg, topic),
_ => panic!("`generic` is known to be the `ChainSpecific` variant; qed"),
}
)
}
};
/// Get all incoming messages for a topic.
pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver<ConsensusMessage> {
let (tx, rx) = mpsc::unbounded();
for entry in self.messages.iter().filter(|e| e.topic == topic) {
tx.unbounded_send(entry.message.clone()).expect("receiver known to be live; qed");
}
self.live_message_sinks.insert(topic, tx);
if self.message_hashes.contains(&hash) {
trace!(target:"gossip", "Ignored already known message from {}", who);
rx
}
/// Handle an incoming ConsensusMessage for the given topic, received from `who` via `protocol`.
/// Discard the message if it is already known, it is too old, its source isn't a registered
/// peer, or the connection to that peer is broken. Return `Some((topic, message))` if it was
/// added to the internal queue, `None` in all other cases.
pub fn on_incoming(
&mut self,
protocol: &mut Context<B>,
who: NodeIndex,
topic: B::Hash,
message: ConsensusMessage,
) -> Option<(B::Hash, ConsensusMessage)> {
let message_hash = HashFor::<B>::hash(&message[..]);
if self.known_messages.contains(&(topic, message_hash)) {
trace!(target:"gossip", "Ignored already known message from {} in {}", who, topic);
return None;
}
@@ -274,11 +218,12 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
(Ok(_), Ok(None)) => {},
}
if let Some(ref mut peer) = self.peers.get_mut(&who) {
use std::collections::hash_map::Entry;
peer.known_messages.insert(hash);
peer.known_messages.insert((topic, message_hash));
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) {
debug!(target: "gossip", "Pushing relevant consensus message to sink.");
debug!(target: "gossip", "Pushing consensus message to sink for {}.", topic);
if let Err(e) = entry.get().unbounded_send(message.clone()) {
trace!(target:"gossip", "Error broadcasting message notification: {:?}", e);
}
@@ -292,18 +237,21 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
return None;
}
Some((hash, message))
self.multicast_inner(protocol, message_hash, topic, || message.clone());
Some((topic, message))
}
fn multicast(&mut self, protocol: &mut Context<B>, message: ConsensusMessage<B>, hash: Option<B::Hash>) {
let generic = match message {
ConsensusMessage::Bft(ref message) => GenericMessage::BftMessage(message.clone()),
ConsensusMessage::ChainSpecific(ref message, _) => GenericMessage::ChainSpecific(message.clone()),
};
/// Multicast a message to all peers.
pub fn multicast(&mut self, protocol: &mut Context<B>, topic: B::Hash, message: ConsensusMessage) {
let message_hash = HashFor::<B>::hash(&message);
self.multicast_inner(protocol, message_hash, topic, || message.clone());
}
let hash = hash.unwrap_or_else(|| ::protocol::hash_message(&generic));
self.register_message(hash, message);
self.propagate(protocol, generic, hash);
fn multicast_inner<F>(&mut self, protocol: &mut Context<B>, message_hash: B::Hash, topic: B::Hash, get_message: F)
where F: Fn() -> ConsensusMessage
{
self.register_message(message_hash, topic, &get_message);
self.propagate(protocol, message_hash, topic, get_message);
}
/// Note new consensus session.
@@ -329,12 +277,16 @@ impl<Block: BlockT> Specialization<Block> for ConsensusGossip<Block> where
self.peer_disconnected(ctx, who);
}
fn on_message(&mut self, ctx: &mut Context<Block>, who: NodeIndex, message: &mut Option<message::Message<Block>>) {
fn on_message(
&mut self,
ctx: &mut Context<Block>,
who: NodeIndex,
message: &mut Option<::message::Message<Block>>
) {
match message.take() {
Some(generic_message::Message::BftMessage(msg)) => {
trace!(target: "gossip", "BFT message from {}: {:?}", who, msg);
// TODO: check signature here? what if relevant block is unknown?
self.on_bft_message(ctx, who, msg)
Some(generic_message::Message::Consensus(topic, msg)) => {
trace!(target: "gossip", "Consensus message from {}: {:?}", who, msg);
self.on_incoming(ctx, who, topic, msg);
}
r => *message = r,
}
@@ -358,10 +310,8 @@ impl<Block: BlockT> Specialization<Block> for ConsensusGossip<Block> where
#[cfg(test)]
mod tests {
use runtime_primitives::bft::Justification;
use runtime_primitives::testing::{H256, Header, Block as RawBlock};
use runtime_primitives::testing::{H256, Block as RawBlock};
use std::time::Instant;
use message::{self, generic};
use super::*;
type Block = RawBlock<u64>;
@@ -374,90 +324,77 @@ mod tests {
let now = Instant::now();
let m1_hash = H256::random();
let m2_hash = H256::random();
let m1 = ConsensusMessage::Bft(message::LocalizedBftMessage {
parent_hash: prev_hash,
message: message::generic::BftMessage::Auxiliary(Justification {
round_number: 0,
hash: Default::default(),
signatures: Default::default(),
}),
});
let m2 = ConsensusMessage::ChainSpecific(vec![1, 2, 3], best_hash);
let m1 = vec![1, 2, 3];
let m2 = vec![4, 5, 6];
macro_rules! push_msg {
($hash:expr, $now: expr, $m:expr) => {
($topic:expr, $hash: expr, $now: expr, $m:expr) => {
consensus.messages.push(MessageEntry {
hash: $hash,
topic: $topic,
message_hash: $hash,
instant: $now,
message: $m,
})
}
}
push_msg!(m1_hash, now, m1);
push_msg!(m2_hash, now, m2.clone());
consensus.message_hashes.insert(m1_hash);
consensus.message_hashes.insert(m2_hash);
push_msg!(prev_hash, m1_hash, now, m1);
push_msg!(best_hash, m2_hash, now, m2.clone());
consensus.known_messages.insert((prev_hash, m1_hash));
consensus.known_messages.insert((best_hash, m2_hash));
// nothing to collect
consensus.collect_garbage(|_topic| true);
consensus.collect_garbage(|_t| true);
assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.message_hashes.len(), 2);
// random header, nothing should be cleared
let mut header = Header {
parent_hash: H256::default(),
number: 0,
state_root: H256::default(),
extrinsics_root: H256::default(),
digest: Default::default(),
};
assert_eq!(consensus.known_messages.len(), 2);
// nothing to collect with default.
consensus.collect_garbage(|&topic| topic != Default::default());
assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.message_hashes.len(), 2);
assert_eq!(consensus.known_messages.len(), 2);
// header that matches one of the messages
header.parent_hash = prev_hash;
// topic that was used in one message.
consensus.collect_garbage(|topic| topic != &prev_hash);
assert_eq!(consensus.messages.len(), 1);
assert_eq!(consensus.message_hashes.len(), 1);
assert!(consensus.message_hashes.contains(&m2_hash));
assert_eq!(consensus.known_messages.len(), 1);
assert!(consensus.known_messages.contains(&(best_hash, m2_hash)));
// make timestamp expired
consensus.messages.clear();
push_msg!(m2_hash, now - MESSAGE_LIFETIME, m2);
push_msg!(best_hash, m2_hash, now - MESSAGE_LIFETIME, m2);
consensus.collect_garbage(|_topic| true);
assert!(consensus.messages.is_empty());
assert!(consensus.message_hashes.is_empty());
assert!(consensus.known_messages.is_empty());
}
#[test]
fn message_stream_include_those_sent_before_asking_for_stream() {
use futures::Stream;
let mut consensus = ConsensusGossip::new();
let mut consensus = ConsensusGossip::<Block>::new();
let bft_message = generic::BftMessage::Consensus(generic::SignedConsensusMessage::Vote(generic::SignedConsensusVote {
vote: generic::ConsensusVote::AdvanceRound(0),
sender: [0; 32].into(),
signature: Default::default(),
}));
let message = vec![1, 2, 3];
let parent_hash = [1; 32].into();
let message_hash = HashFor::<Block>::hash(&message);
let topic = HashFor::<Block>::hash(&[1,2,3]);
let localized = ::message::LocalizedBftMessage::<Block> {
message: bft_message,
parent_hash: parent_hash,
};
let message = generic::Message::BftMessage(localized.clone());
let message_hash = ::protocol::hash_message::<Block>(&message);
let message = ConsensusMessage::Bft(localized);
consensus.register_message(message_hash, message.clone());
let stream = consensus.messages_for(parent_hash);
consensus.register_message(message_hash, topic, || message.clone());
let stream = consensus.messages_for(topic);
assert_eq!(stream.wait().next(), Some(Ok(message)));
}
#[test]
fn can_keep_multiple_messages_per_topic() {
let mut consensus = ConsensusGossip::<Block>::new();
let topic = [1; 32].into();
let msg_a = vec![1, 2, 3];
let msg_b = vec![4, 5, 6];
consensus.register_message(HashFor::<Block>::hash(&msg_a), topic, || msg_a.clone());
consensus.register_message(HashFor::<Block>::hash(&msg_b), topic, || msg_b.clone());
assert_eq!(consensus.messages.len(), 2);
}
}
+395 -243
View File
@@ -14,27 +14,69 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Blocks import queue.
//! Import Queue primitive: something which can verify and import blocks.
//!
//! This serves as an intermediate and abstracted step between synchronization
//! and import. Each mode of consensus will have its own requirements for block verification.
//! Some algorithms can verify in parallel, while others only sequentially.
//!
//! The `ImportQueue` trait allows such verification strategies to be instantiated.
//! The `BasicQueue` and `BasicVerifier` types allow serial queues to be
//! instantiated simply.
use std::collections::{HashSet, VecDeque};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::{Condvar, Mutex, RwLock};
use client::{BlockOrigin, ImportResult};
pub use client::{BlockOrigin, ImportBlock, ImportResult};
use network_libp2p::{NodeIndex, Severity};
use primitives::AuthorityId;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
use blocks::BlockData;
pub use blocks::BlockData;
use chain::Client;
use error::{ErrorKind, Error};
use protocol::Context;
use service::ExecuteInContext;
use sync::ChainSync;
#[cfg(any(test, feature = "test-helpers"))]
use std::cell::RefCell;
/// Verify a justification of a block
pub trait Verifier<B: BlockT>: Send + Sync + Sized {
/// Verify the given data and return the ImportBlock together with an optional
/// new set of validators to import. On failure, return an error message that
/// is presented to the user in the logs.
fn verify(
&self,
origin: BlockOrigin,
header: B::Header,
justification: Vec<u8>,
body: Option<Vec<B::Extrinsic>>
) -> Result<(ImportBlock<B>, Option<Vec<AuthorityId>>), String>;
}
/// Blocks import queue API.
pub trait ImportQueue<B: BlockT>: Send + Sync {
/// Start background work for the queue as necessary.
///
/// This is called automatically by the network service when synchronization
/// begins.
fn start<E>(
&self,
_sync: Weak<RwLock<ChainSync<B>>>,
_service: Weak<E>,
_chain: Weak<Client<B>>
) -> Result<(), Error> where
Self: Sized,
E: 'static + ExecuteInContext<B>,
{
Ok(())
}
/// Clear the queue when sync is restarting.
fn clear(&self);
/// Clears the import queue and stops importing.
@@ -44,7 +86,7 @@ pub trait ImportQueue<B: BlockT>: Send + Sync {
/// Is block with given hash currently in the queue.
fn is_importing(&self, hash: &B::Hash) -> bool;
/// Import bunch of blocks.
fn import_blocks(&self, sync: &mut ChainSync<B>, protocol: &mut Context<B>, blocks: (BlockOrigin, Vec<BlockData<B>>));
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<BlockData<B>>);
}
/// Import queue status. It isn't completely accurate.
@@ -55,11 +97,12 @@ pub struct ImportQueueStatus<B: BlockT> {
pub best_importing_number: <<B as BlockT>::Header as HeaderT>::Number,
}
/// Blocks import queue that is importing blocks in the separate thread.
pub struct AsyncImportQueue<B: BlockT> {
/// Basic block import queue that is importing blocks sequentially in a separate thread,
/// with pluggable verification.
pub struct BasicQueue<B: BlockT, V: 'static + Verifier<B>> {
handle: Mutex<Option<::std::thread::JoinHandle<()>>>,
data: Arc<AsyncImportQueueData<B>>,
instant_finality: bool,
verifier: Arc<V>,
}
/// Locks order: queue, queue_blocks, best_importing_number
@@ -71,29 +114,19 @@ struct AsyncImportQueueData<B: BlockT> {
is_stopping: AtomicBool,
}
impl<B: BlockT> AsyncImportQueue<B> {
pub fn new(instant_finality: bool) -> Self {
impl<B: BlockT, V: Verifier<B>> BasicQueue<B, V> {
/// Instantiate a new basic queue, with given verifier.
pub fn new(verifier: Arc<V>) -> Self {
Self {
handle: Mutex::new(None),
data: Arc::new(AsyncImportQueueData::new()),
instant_finality,
verifier,
}
}
pub fn start<E: 'static + ExecuteInContext<B>>(&self, sync: Weak<RwLock<ChainSync<B>>>, service: Weak<E>, chain: Weak<Client<B>>) -> Result<(), Error> {
debug_assert!(self.handle.lock().is_none());
let qdata = self.data.clone();
let instant_finality = self.instant_finality;
*self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || {
import_thread(sync, service, chain, qdata, instant_finality)
}).map_err(|err| Error::from(ErrorKind::Io(err)))?);
Ok(())
}
}
impl<B: BlockT> AsyncImportQueueData<B> {
pub fn new() -> Self {
fn new() -> Self {
Self {
signal: Default::default(),
queue: Mutex::new(VecDeque::new()),
@@ -104,7 +137,23 @@ impl<B: BlockT> AsyncImportQueueData<B> {
}
}
impl<B: BlockT> ImportQueue<B> for AsyncImportQueue<B> {
impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> {
fn start<E: 'static + ExecuteInContext<B>>(
&self,
sync: Weak<RwLock<ChainSync<B>>>,
service: Weak<E>,
chain: Weak<Client<B>>
) -> Result<(), Error> {
debug_assert!(self.handle.lock().is_none());
let qdata = self.data.clone();
let verifier = self.verifier.clone();
*self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || {
import_thread(sync, service, chain, qdata, verifier)
}).map_err(|err| Error::from(ErrorKind::Io(err)))?);
Ok(())
}
fn clear(&self) {
let mut queue = self.data.queue.lock();
let mut queue_blocks = self.data.queue_blocks.write();
@@ -135,39 +184,39 @@ impl<B: BlockT> ImportQueue<B> for AsyncImportQueue<B> {
self.data.queue_blocks.read().contains(hash)
}
fn import_blocks(&self, _sync: &mut ChainSync<B>, _protocol: &mut Context<B>, blocks: (BlockOrigin, Vec<BlockData<B>>)) {
if blocks.1.is_empty() {
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<BlockData<B>>) {
if blocks.is_empty() {
return;
}
trace!(target:"sync", "Scheduling {} blocks for import", blocks.1.len());
trace!(target:"sync", "Scheduling {} blocks for import", blocks.len());
let mut queue = self.data.queue.lock();
let mut queue_blocks = self.data.queue_blocks.write();
let mut best_importing_number = self.data.best_importing_number.write();
let new_best_importing_number = blocks.1.last().and_then(|b| b.block.header.as_ref().map(|h| h.number().clone())).unwrap_or_else(|| Zero::zero());
queue_blocks.extend(blocks.1.iter().map(|b| b.block.hash.clone()));
let new_best_importing_number = blocks.last().and_then(|b| b.block.header.as_ref().map(|h| h.number().clone())).unwrap_or_else(|| Zero::zero());
queue_blocks.extend(blocks.iter().map(|b| b.block.hash.clone()));
if new_best_importing_number > *best_importing_number {
*best_importing_number = new_best_importing_number;
}
queue.push_back(blocks);
queue.push_back((origin, blocks));
self.data.signal.notify_one();
}
}
impl<B: BlockT> Drop for AsyncImportQueue<B> {
impl<B: BlockT, V: 'static + Verifier<B>> Drop for BasicQueue<B, V> {
fn drop(&mut self) {
self.stop();
}
}
/// Blocks import thread.
fn import_thread<B: BlockT, E: ExecuteInContext<B>>(
fn import_thread<B: BlockT, E: ExecuteInContext<B>, V: Verifier<B>>(
sync: Weak<RwLock<ChainSync<B>>>,
service: Weak<E>,
chain: Weak<Client<B>>,
qdata: Arc<AsyncImportQueueData<B>>,
instant_finality: bool,
verifier: Arc<V>
) {
trace!(target: "sync", "Starting import thread");
loop {
@@ -191,10 +240,10 @@ fn import_thread<B: BlockT, E: ExecuteInContext<B>>(
(Some(sync), Some(service), Some(chain)) => {
let blocks_hashes: Vec<B::Hash> = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect();
if !import_many_blocks(
&mut SyncLink::Indirect(&sync, &*chain, &*service),
&mut SyncLink{chain: &sync, client: &*chain, context: &*service},
Some(&*qdata),
new_blocks,
instant_finality,
verifier.clone(),
) {
break;
}
@@ -210,7 +259,6 @@ fn import_thread<B: BlockT, E: ExecuteInContext<B>>(
trace!(target: "sync", "Stopping import thread");
}
/// ChainSync link trait.
trait SyncLinkApi<B: BlockT> {
/// Get chain reference.
@@ -227,200 +275,30 @@ trait SyncLinkApi<B: BlockT> {
fn restart(&mut self);
}
/// Link with the ChainSync service.
enum SyncLink<'a, B: 'a + BlockT, E: 'a + ExecuteInContext<B>> {
/// Indirect link (through service).
Indirect(&'a RwLock<ChainSync<B>>, &'a Client<B>, &'a E),
/// Direct references are given.
#[cfg(any(test, feature = "test-helpers"))]
Direct(&'a mut ChainSync<B>, &'a mut Context<B>),
}
/// Block import successful result.
#[derive(Debug, PartialEq)]
enum BlockImportResult<H: ::std::fmt::Debug + PartialEq, N: ::std::fmt::Debug + PartialEq> {
/// Imported known block.
ImportedKnown(H, N),
/// Imported unknown block.
ImportedUnknown(H, N),
}
/// Block import error.
#[derive(Debug, PartialEq)]
enum BlockImportError {
/// Disconnect from peer and continue import of next bunch of blocks.
Disconnect(NodeIndex),
/// Disconnect from peer and restart sync.
DisconnectAndRestart(NodeIndex),
/// Restart sync.
Restart,
}
/// Import a bunch of blocks.
fn import_many_blocks<'a, B: BlockT>(
link: &mut SyncLinkApi<B>,
qdata: Option<&AsyncImportQueueData<B>>,
blocks: (BlockOrigin, Vec<BlockData<B>>),
instant_finality: bool,
) -> bool
{
let (blocks_origin, blocks) = blocks;
let count = blocks.len();
let mut imported = 0;
let blocks_range = match (
blocks.first().and_then(|b| b.block.header.as_ref().map(|h| h.number())),
blocks.last().and_then(|b| b.block.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target:"sync", "Starting import of {} blocks{}", count, blocks_range);
// Blocks in the response/drain should be in ascending order.
for block in blocks {
let import_result = import_single_block(
link.chain(),
blocks_origin.clone(),
block,
instant_finality,
);
let is_import_failed = import_result.is_err();
imported += process_import_result(link, import_result);
if is_import_failed {
qdata.map(|qdata| *qdata.best_importing_number.write() = Zero::zero());
return true;
}
if qdata.map(|qdata| qdata.is_stopping.load(Ordering::SeqCst)).unwrap_or_default() {
return false;
}
}
trace!(target: "sync", "Imported {} of {}", imported, count);
link.maintain_sync();
true
}
/// Single block import function.
fn import_single_block<B: BlockT>(
chain: &Client<B>,
block_origin: BlockOrigin,
block: BlockData<B>,
instant_finality: bool,
) -> Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
{
let origin = block.origin;
let block = block.block;
match (block.header, block.justification) {
(Some(header), Some(justification)) => {
let number = header.number().clone();
let hash = header.hash();
let parent = header.parent_hash().clone();
let result = chain.import(
block_origin,
header,
justification,
block.body,
instant_finality,
);
match result {
Ok(ImportResult::AlreadyInChain) => {
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(hash, number))
},
Ok(ImportResult::AlreadyQueued) => {
trace!(target: "sync", "Block already queued {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(hash, number))
},
Ok(ImportResult::Queued) => {
trace!(target: "sync", "Block queued {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedUnknown(hash, number))
},
Ok(ImportResult::UnknownParent) => {
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent);
Err(BlockImportError::Restart)
},
Ok(ImportResult::KnownBad) => {
debug!(target: "sync", "Peer gave us a bad block {}: {:?}", number, hash);
Err(BlockImportError::DisconnectAndRestart(origin)) //TODO: use persistent ID
}
Err(e) => {
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
Err(BlockImportError::Restart)
}
}
},
(None, _) => {
debug!(target: "sync", "Header {} was not provided by {} ", block.hash, origin);
Err(BlockImportError::Disconnect(origin)) //TODO: use persistent ID
},
(_, None) => {
debug!(target: "sync", "Justification set for block {} was not provided by {} ", block.hash, origin);
Err(BlockImportError::Disconnect(origin)) //TODO: use persistent ID
}
}
}
/// Process single block import result.
fn process_import_result<'a, B: BlockT>(
link: &mut SyncLinkApi<B>,
result: Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
) -> usize
{
match result {
Ok(BlockImportResult::ImportedKnown(hash, number)) => {
link.block_imported(&hash, number);
1
},
Ok(BlockImportResult::ImportedUnknown(hash, number)) => {
link.block_imported(&hash, number);
1
},
Err(BlockImportError::Disconnect(who)) => {
// TODO: FIXME: @arkpar BlockImport shouldn't be trying to manage the peer set.
// This should contain an actual reason.
link.useless_peer(who, "Import result was stated Disconnect");
0
},
Err(BlockImportError::DisconnectAndRestart(who)) => {
// TODO: FIXME: @arkpar BlockImport shouldn't be trying to manage the peer set.
// This should contain an actual reason.
link.note_useless_and_restart_sync(who, "Import result was stated DisconnectAndRestart");
0
},
Err(BlockImportError::Restart) => {
link.restart();
0
},
}
struct SyncLink<'a, B: 'a + BlockT, E: 'a + ExecuteInContext<B>> {
pub chain: &'a RwLock<ChainSync<B>>,
pub client: &'a Client<B>,
pub context: &'a E,
}
impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext<B>> SyncLink<'a, B, E> {
/// Execute closure with locked ChainSync.
/// Execute closure with locked ChainSync.
fn with_sync<F: Fn(&mut ChainSync<B>, &mut Context<B>)>(&mut self, closure: F) {
match *self {
#[cfg(any(test, feature = "test-helpers"))]
SyncLink::Direct(ref mut sync, ref mut protocol) =>
closure(*sync, *protocol),
SyncLink::Indirect(ref sync, _, ref service) =>
service.execute_in_context(move |protocol| {
let mut sync = sync.write();
closure(&mut *sync, protocol)
}),
}
let service = self.context;
let sync = self.chain;
service.execute_in_context(move |protocol| {
let mut sync = sync.write();
closure(&mut *sync, protocol)
});
}
}
impl<'a, B: 'static + BlockT, E: ExecuteInContext<B>> SyncLinkApi<B> for SyncLink<'a, B, E> {
impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext<B>> SyncLinkApi<B> for SyncLink<'a, B, E> {
fn chain(&self) -> &Client<B> {
match *self {
#[cfg(any(test, feature = "test-helpers"))]
SyncLink::Direct(_, ref protocol) => protocol.client(),
SyncLink::Indirect(_, ref chain, _) => *chain,
}
self.client
}
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
@@ -447,14 +325,288 @@ impl<'a, B: 'static + BlockT, E: ExecuteInContext<B>> SyncLinkApi<B> for SyncLin
}
}
/// Block import successful result.
#[derive(Debug, PartialEq)]
enum BlockImportResult<H: ::std::fmt::Debug + PartialEq, N: ::std::fmt::Debug + PartialEq> {
/// Imported known block.
ImportedKnown(H, N),
/// Imported unknown block.
ImportedUnknown(H, N),
}
/// Block import error.
#[derive(Debug, PartialEq)]
enum BlockImportError {
/// Block missed header, can't be imported
IncompleteHeader(Option<NodeIndex>),
/// Block missed justification, can't be imported
IncompleteJustification(Option<NodeIndex>),
/// Block verification failed, can't be imported
VerificationFailed(Option<NodeIndex>, String),
/// Block is known to be Bad
BadBlock(Option<NodeIndex>),
/// Block has an unknown parent
UnknownParent,
/// Other Error.
Error,
}
/// Import a bunch of blocks.
fn import_many_blocks<'a, B: BlockT, V: Verifier<B>>(
link: &mut SyncLinkApi<B>,
qdata: Option<&AsyncImportQueueData<B>>,
blocks: (BlockOrigin, Vec<BlockData<B>>),
verifier: Arc<V>
) -> bool
{
let (blocks_origin, blocks) = blocks;
let count = blocks.len();
let mut imported = 0;
let blocks_range = match (
blocks.first().and_then(|b| b.block.header.as_ref().map(|h| h.number())),
blocks.last().and_then(|b| b.block.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target:"sync", "Starting import of {} blocks {}", count, blocks_range);
// Blocks in the response/drain should be in ascending order.
for block in blocks {
let import_result = import_single_block(
link.chain(),
blocks_origin.clone(),
block,
verifier.clone(),
);
let is_import_failed = import_result.is_err();
imported += process_import_result(link, import_result);
if is_import_failed {
qdata.map(|qdata| *qdata.best_importing_number.write() = Zero::zero());
return true;
}
if qdata.map(|qdata| qdata.is_stopping.load(Ordering::SeqCst)).unwrap_or_default() {
return false;
}
}
trace!(target: "sync", "Imported {} of {}", imported, count);
link.maintain_sync();
true
}
/// Single block import function.
fn import_single_block<B: BlockT, V: Verifier<B>>(
chain: &Client<B>,
block_origin: BlockOrigin,
block: BlockData<B>,
verifier: Arc<V>
) -> Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
{
let peer = block.origin;
let block = block.block;
let (header, justification) = match (block.header, block.justification) {
(Some(header), Some(justification)) => (header, justification),
(None, _) => {
if let Some(peer) = peer {
debug!(target: "sync", "Header {} was not provided by {} ", block.hash, peer);
} else {
debug!(target: "sync", "Header {} was not provided ", block.hash);
}
return Err(BlockImportError::IncompleteHeader(peer)) //TODO: use persistent ID
},
(_, None) => {
if let Some(peer) = peer {
debug!(target: "sync", "Justification set for block {} was not provided by {} ", block.hash, peer);
} else {
debug!(target: "sync", "Justification set for block {} was not provided", block.hash);
}
return Err(BlockImportError::IncompleteJustification(peer)) //TODO: use persistent ID
}
};
let number = header.number().clone();
let hash = header.hash();
let parent = header.parent_hash().clone();
let (import_block, new_authorities) = verifier.verify(block_origin, header, justification, block.body)
.map_err(|msg| {
if let Some(peer) = peer {
trace!(target: "sync", "Verifying {}({}) from {} failed: {}", number, hash, peer, msg);
} else {
trace!(target: "sync", "Verifying {}({}) failed: {}", number, hash, msg);
}
BlockImportError::VerificationFailed(peer, msg)
})?;
match chain.import(import_block, new_authorities) {
Ok(ImportResult::AlreadyInChain) => {
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(hash, number))
},
Ok(ImportResult::AlreadyQueued) => {
trace!(target: "sync", "Block already queued {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(hash, number))
},
Ok(ImportResult::Queued) => {
trace!(target: "sync", "Block queued {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedUnknown(hash, number))
},
Ok(ImportResult::UnknownParent) => {
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent);
Err(BlockImportError::UnknownParent)
},
Ok(ImportResult::KnownBad) => {
debug!(target: "sync", "Peer gave us a bad block {}: {:?}", number, hash);
Err(BlockImportError::BadBlock(peer)) //TODO: use persistent ID
}
Err(e) => {
debug!(target: "sync", "Error importing block {}: {:?}: {:?}", number, hash, e);
Err(BlockImportError::Error)
}
}
}
/// Process single block import result.
fn process_import_result<'a, B: BlockT>(
link: &mut SyncLinkApi<B>,
result: Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
) -> usize
{
match result {
Ok(BlockImportResult::ImportedKnown(hash, number)) => {
link.block_imported(&hash, number);
1
},
Ok(BlockImportResult::ImportedUnknown(hash, number)) => {
link.block_imported(&hash, number);
1
},
Err(BlockImportError::IncompleteJustification(who)) => {
if let Some(peer) = who {
link.useless_peer(peer, "Sent block with incomplete justification to import");
}
0
},
Err(BlockImportError::IncompleteHeader(who)) => {
if let Some(peer) = who {
link.useless_peer(peer, "Sent block with incomplete header to import");
}
0
},
Err(BlockImportError::VerificationFailed(who, e)) => {
if let Some(peer) = who {
link.useless_peer(peer, &format!("Verification failed: {}", e));
}
0
},
Err(BlockImportError::BadBlock(who)) => {
if let Some(peer) = who {
link.note_useless_and_restart_sync(peer, "Sent us a bad block");
}
0
},
Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => {
link.restart();
0
},
}
}
#[cfg(any(test, feature = "test-helpers"))]
struct ImportCB<B: BlockT>(RefCell<Option<Box<dyn Fn(BlockOrigin, Vec<BlockData<B>>) -> bool>>>);
#[cfg(any(test, feature = "test-helpers"))]
impl<B: BlockT> ImportCB<B> {
fn new() -> Self {
ImportCB(RefCell::new(None))
}
fn set<F>(&self, cb: Box<F>)
where F: 'static + Fn(BlockOrigin, Vec<BlockData<B>>) -> bool
{
*self.0.borrow_mut() = Some(cb);
}
fn call(&self, origin: BlockOrigin, data: Vec<BlockData<B>>) -> bool {
let b = self.0.borrow();
b.as_ref().expect("The Callback has been set before. qed.")(origin, data)
}
}
#[cfg(any(test, feature = "test-helpers"))]
unsafe impl<B: BlockT> Send for ImportCB<B> {}
#[cfg(any(test, feature = "test-helpers"))]
unsafe impl<B: BlockT> Sync for ImportCB<B> {}
#[cfg(any(test, feature = "test-helpers"))]
/// A Verifier that accepts all blocks and passes them on with the configured
/// finality to be imported.
pub struct PassThroughVerifier(pub bool);
#[cfg(any(test, feature = "test-helpers"))]
/// This Verifiyer accepts all data as valid
impl<B: BlockT> Verifier<B> for PassThroughVerifier {
fn verify(
&self,
origin: BlockOrigin,
header: B::Header,
justification: Vec<u8>,
body: Option<Vec<B::Extrinsic>>
) -> Result<(ImportBlock<B>, Option<Vec<AuthorityId>>), String> {
Ok((ImportBlock {
origin,
header,
body,
finalized: self.0,
external_justification: justification,
internal_justification: vec![],
auxiliary: Vec::new(),
}, None))
}
}
#[cfg(any(test, feature = "test-helpers"))]
/// Blocks import queue that is importing blocks in the same thread.
/// The boolean value indicates whether blocks should be imported without instant finality.
pub struct SyncImportQueue<B: BlockT, V: Verifier<B>>(Arc<V>, ImportCB<B>);
#[cfg(any(test, feature = "test-helpers"))]
pub struct SyncImportQueue(pub bool);
impl<B: BlockT, V: Verifier<B>> SyncImportQueue<B, V> {
/// Create a new SyncImportQueue wrapping the given Verifier
pub fn new(verifier: Arc<V>) -> Self {
SyncImportQueue(verifier, ImportCB::new())
}
}
#[cfg(any(test, feature = "test-helpers"))]
impl<B: 'static + BlockT> ImportQueue<B> for SyncImportQueue {
impl<B: 'static + BlockT, V: 'static + Verifier<B>> ImportQueue<B> for SyncImportQueue<B, V>
{
fn start<E: 'static + ExecuteInContext<B>>(
&self,
sync: Weak<RwLock<ChainSync<B>>>,
service: Weak<E>,
chain: Weak<Client<B>>
) -> Result<(), Error> {
let v = self.0.clone();
self.1.set(Box::new(move | origin, new_blocks | {
let verifier = v.clone();
match (sync.upgrade(), service.upgrade(), chain.upgrade()) {
(Some(sync), Some(service), Some(chain)) =>
import_many_blocks(
&mut SyncLink{chain: &sync, client: &*chain, context: &*service},
None,
(origin, new_blocks),
verifier,
),
_ => false
}
}));
Ok(())
}
fn clear(&self) { }
fn stop(&self) { }
@@ -470,14 +622,8 @@ impl<B: 'static + BlockT> ImportQueue<B> for SyncImportQueue {
false
}
fn import_blocks(&self, sync: &mut ChainSync<B>, protocol: &mut Context<B>, blocks: (BlockOrigin, Vec<BlockData<B>>)) {
struct DummyExecuteInContext;
impl<B: 'static + BlockT> ExecuteInContext<B> for DummyExecuteInContext {
fn execute_in_context<F: Fn(&mut Context<B>)>(&self, _closure: F) { }
}
import_many_blocks(&mut SyncLink::Direct::<_, DummyExecuteInContext>(sync, protocol), None, blocks, self.0);
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<BlockData<B>>) {
self.1.call(origin, blocks);
}
}
@@ -540,14 +686,14 @@ pub mod tests {
justification: client.justification(&BlockId::Number(1)).unwrap(),
};
(client, hash, number, BlockData { block, origin: 0 })
(client, hash, number, BlockData { block, origin: Some(0) })
}
#[test]
fn import_single_good_block_works() {
let (_, hash, number, block) = prepare_good_block();
assert_eq!(
import_single_block(&test_client::new(), BlockOrigin::File, block, true),
import_single_block(&test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))),
Ok(BlockImportResult::ImportedUnknown(hash, number))
);
}
@@ -556,7 +702,7 @@ pub mod tests {
fn import_single_good_known_block_is_ignored() {
let (client, hash, number, block) = prepare_good_block();
assert_eq!(
import_single_block(&client, BlockOrigin::File, block, true),
import_single_block(&client, BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))),
Ok(BlockImportResult::ImportedKnown(hash, number))
);
}
@@ -566,8 +712,8 @@ pub mod tests {
let (_, _, _, mut block) = prepare_good_block();
block.block.header = None;
assert_eq!(
import_single_block(&test_client::new(), BlockOrigin::File, block, true),
Err(BlockImportError::Disconnect(0))
import_single_block(&test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))),
Err(BlockImportError::IncompleteHeader(Some(0)))
);
}
@@ -576,8 +722,8 @@ pub mod tests {
let (_, _, _, mut block) = prepare_good_block();
block.block.justification = None;
assert_eq!(
import_single_block(&test_client::new(), BlockOrigin::File, block, true),
Err(BlockImportError::Disconnect(0))
import_single_block(&test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))),
Err(BlockImportError::IncompleteJustification(Some(0)))
);
}
@@ -598,18 +744,22 @@ pub mod tests {
assert_eq!(link.imported, 1);
let mut link = TestLink::new();
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::Disconnect(0))), 0);
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::IncompleteHeader(Some(0)))), 0);
assert_eq!(link.total(), 1);
assert_eq!(link.disconnects, 1);
let mut link = TestLink::new();
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::DisconnectAndRestart(0))), 0);
assert_eq!(link.total(), 2);
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::IncompleteJustification(Some(0)))), 0);
assert_eq!(link.total(), 1);
assert_eq!(link.disconnects, 1);
let mut link = TestLink::new();
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::UnknownParent)), 0);
assert_eq!(link.total(), 1);
assert_eq!(link.restarts, 1);
let mut link = TestLink::new();
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::Restart)), 0);
assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::Error)), 0);
assert_eq!(link.total(), 1);
assert_eq!(link.restarts, 1);
}
@@ -618,18 +768,20 @@ pub mod tests {
fn import_many_blocks_stops_when_stopping() {
let (_, _, _, block) = prepare_good_block();
let qdata = AsyncImportQueueData::new();
let verifier = Arc::new(PassThroughVerifier(true));
qdata.is_stopping.store(true, Ordering::SeqCst);
assert!(!import_many_blocks(
&mut TestLink::new(),
Some(&qdata),
(BlockOrigin::File, vec![block.clone(), block]),
true
verifier
));
}
#[test]
fn async_import_queue_drops() {
let queue = AsyncImportQueue::new(true);
let verifier = Arc::new(PassThroughVerifier(true));
let queue = BasicQueue::new(verifier);
let service = Arc::new(DummyExecutor);
let chain = Arc::new(test_client::new());
queue.start(Weak::new(), Arc::downgrade(&service), Arc::downgrade(&chain) as Weak<Client<Block>>).unwrap();
+3 -4
View File
@@ -56,7 +56,7 @@ mod config;
mod chain;
mod blocks;
mod on_demand;
mod import_queue;
pub mod import_queue;
pub mod consensus_gossip;
pub mod error;
pub mod message;
@@ -66,12 +66,11 @@ pub mod specialization;
pub mod test;
pub use chain::Client as ClientHandle;
pub use service::{Service, FetchFuture, ConsensusService, BftMessageStream,
TransactionPool, Params, ManageNetwork, SyncProvider};
pub use service::{Service, FetchFuture, TransactionPool, Params, ManageNetwork, SyncProvider};
pub use protocol::{ProtocolStatus, PeerInfo, Context};
pub use sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, NodeIndex, ProtocolId, Severity, Protocol};
pub use message::{generic as generic_message, RequestId, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal, Status as StatusMessage};
pub use message::{generic as generic_message, RequestId, Status as StatusMessage};
pub use error::Error;
pub use config::{Roles, ProtocolConfig};
pub use on_demand::{OnDemand, OnDemandService, RemoteResponse};
+8 -97
View File
@@ -22,7 +22,7 @@ pub use self::generic::{
BlockAnnounce, RemoteCallRequest, RemoteReadRequest,
RemoteHeaderRequest, RemoteHeaderResponse,
RemoteChangesRequest, RemoteChangesResponse,
ConsensusVote, SignedConsensusVote, FromBlock
FromBlock
};
/// A unique ID of a request.
@@ -30,7 +30,6 @@ pub type RequestId = u64;
/// Type alias for using the message type using block type parameters.
pub type Message<B> = generic::Message<
B,
<B as BlockT>::Header,
<B as BlockT>::Hash,
<<B as BlockT>::Header as HeaderT>::Number,
@@ -49,11 +48,6 @@ pub type BlockRequest<B> = generic::BlockRequest<
<<B as BlockT>::Header as HeaderT>::Number,
>;
/// Type alias for using the localized bft message type using block type parameters.
pub type LocalizedBftMessage<B> = generic::LocalizedBftMessage<
B,
<B as BlockT>::Hash,
>;
/// Type alias for using the BlockData type using block type parameters.
pub type BlockData<B> = generic::BlockData<
@@ -69,24 +63,6 @@ pub type BlockResponse<B> = generic::BlockResponse<
<B as BlockT>::Extrinsic,
>;
/// Type alias for using the BftMessage type using block type parameters.
pub type BftMessage<B> = generic::BftMessage<
B,
<B as BlockT>::Hash,
>;
/// Type alias for using the SignedConsensusProposal type using block type parameters.
pub type SignedConsensusProposal<B> = generic::SignedConsensusProposal<
B,
<B as BlockT>::Hash,
>;
/// Type alias for using the SignedConsensusProposal type using block type parameters.
pub type SignedConsensusMessage<B> = generic::SignedConsensusProposal<
B,
<B as BlockT>::Hash,
>;
/// A set of transactions.
pub type Transactions<E> = Vec<E>;
@@ -148,13 +124,14 @@ pub struct RemoteReadResponse {
/// Generic types.
pub mod generic {
use primitives::{AuthorityId, ed25519};
use runtime_primitives::bft::Justification;
use runtime_primitives::Justification;
use service::Roles;
use super::{
BlockAttributes, RemoteCallResponse, RemoteReadResponse,
RequestId, Transactions, Direction
};
/// Consensus is opaque to us
pub type ConsensusMessage = Vec<u8>;
/// Block data sent in the response.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
@@ -170,7 +147,7 @@ pub mod generic {
/// Block message queue if requested.
pub message_queue: Option<Vec<u8>>,
/// Justification if requested.
pub justification: Option<Justification<Hash>>,
pub justification: Option<Justification>,
}
/// Identifies starting point of a block sequence.
@@ -182,75 +159,9 @@ pub mod generic {
Number(Number),
}
/// Communication that can occur between participants in consensus.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum BftMessage<Block, Hash> {
/// A consensus message (proposal or vote)
Consensus(SignedConsensusMessage<Block, Hash>),
/// Auxiliary communication (just proof-of-lock for now).
Auxiliary(Justification<Hash>),
}
/// BFT Consensus message with parent header hash attached to it.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct LocalizedBftMessage<Block, Hash> {
/// Consensus message.
pub message: BftMessage<Block, Hash>,
/// Parent header hash.
pub parent_hash: Hash,
}
/// A localized proposal message. Contains two signed pieces of data.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct SignedConsensusProposal<Block, Hash> {
/// The round number.
pub round_number: u32,
/// The proposal sent.
pub proposal: Block,
/// The digest of the proposal.
pub digest: Hash,
/// The sender of the proposal
pub sender: AuthorityId,
/// The signature on the message (propose, round number, digest)
pub digest_signature: ed25519::Signature,
/// The signature on the message (propose, round number, proposal)
pub full_signature: ed25519::Signature,
}
/// A localized vote message, including the sender.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct SignedConsensusVote<H> {
/// The message sent.
pub vote: ConsensusVote<H>,
/// The sender of the message
pub sender: AuthorityId,
/// The signature of the message.
pub signature: ed25519::Signature,
}
/// Votes during a consensus round.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum ConsensusVote<H> {
/// Prepare to vote for proposal with digest D.
Prepare(u32, H),
/// Commit to proposal with digest D..
Commit(u32, H),
/// Propose advancement to a new round.
AdvanceRound(u32),
}
/// A localized message.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum SignedConsensusMessage<Block, Hash> {
/// A proposal.
Propose(SignedConsensusProposal<Block, Hash>),
/// A vote.
Vote(SignedConsensusVote<Hash>),
}
/// A network message.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum Message<Block, Header, Hash, Number, Extrinsic> {
pub enum Message<Header, Hash, Number, Extrinsic> {
/// Status packet.
Status(Status<Hash, Number>),
/// Block request.
@@ -261,8 +172,8 @@ pub mod generic {
BlockAnnounce(BlockAnnounce<Header>),
/// Transactions.
Transactions(Transactions<Extrinsic>),
/// BFT Consensus statement.
BftMessage(LocalizedBftMessage<Block, Hash>),
/// Consensus protocol message.
Consensus(Hash, ConsensusMessage), // topic, opaque Vec<u8>
/// Remote method call request.
RemoteCallRequest(RemoteCallRequest<Hash>),
/// Remote method call response.
+17 -11
View File
@@ -20,7 +20,7 @@ use std::sync::Arc;
use std::time;
use parking_lot::RwLock;
use rustc_hex::ToHex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, NumberFor, As, Zero};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, As, Zero};
use runtime_primitives::generic::BlockId;
use network_libp2p::{NodeIndex, Severity};
use codec::{Encode, Decode};
@@ -181,15 +181,15 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
pub(crate) struct ContextData<B: BlockT, H: ExHashT> {
// All connected peers
peers: RwLock<HashMap<NodeIndex, Peer<B, H>>>,
chain: Arc<Client<B>>,
pub chain: Arc<Client<B>>,
}
impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Create a new instance.
pub fn new(
pub fn new<I: 'static + ImportQueue<B>>(
config: ProtocolConfig,
chain: Arc<Client<B>>,
import_queue: Arc<ImportQueue<B>>,
import_queue: Arc<I>,
on_demand: Option<Arc<OnDemandService<B>>>,
transaction_pool: Arc<TransactionPool<H, B>>,
specialization: S,
@@ -373,7 +373,19 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
trace!(target: "sync", "BlockResponse {} from {} with {} blocks{}",
response.id, peer, response.blocks.len(), blocks_range);
self.sync.write().on_block_data(&mut ProtocolContext::new(&self.context_data, io), peer, request, response);
// import_queue.import_blocks also acquires sync.write();
// Break the cycle by doing these separately from the outside;
let new_blocks = {
let mut sync = self.sync.write();
sync.on_block_data(&mut ProtocolContext::new(&self.context_data, io), peer, request, response)
};
if let Some((origin, new_blocks)) = new_blocks {
let import_queue = self.sync.read().import_queue();
import_queue.import_blocks(origin, new_blocks);
}
}
/// Perform time based maintenance.
@@ -704,12 +716,6 @@ fn send_message<B: BlockT, H: ExHashT>(peers: &RwLock<HashMap<NodeIndex, Peer<B,
io.send(who, message.encode());
}
/// Hash a message.
pub(crate) fn hash_message<B: BlockT>(message: &Message<B>) -> B::Hash {
let data = message.encode();
HashFor::<B>::hash(&data)
}
/// Construct a simple protocol that is composed of several sub protocols.
/// Each "sub protocol" needs to implement `Specialization` and needs to provide a `new()` function.
/// For more fine grained implementations, this macro is not usable.
+9 -20
View File
@@ -18,7 +18,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::{io, thread};
use std::time::Duration;
use futures::{self, Future, Stream, stream, sync::{oneshot, mpsc}};
use futures::{self, Future, Stream, stream, sync::oneshot};
use parking_lot::Mutex;
use network_libp2p::{ProtocolId, PeerId, NetworkConfiguration, ErrorKind};
use network_libp2p::{start_service, Service as NetworkService, ServiceEvent as NetworkServiceEvent};
@@ -28,17 +28,14 @@ use protocol::{self, Protocol, ProtocolContext, Context, ProtocolStatus};
use config::{ProtocolConfig};
use error::Error;
use chain::Client;
use message::LocalizedBftMessage;
use specialization::Specialization;
use on_demand::OnDemandService;
use import_queue::AsyncImportQueue;
use import_queue::ImportQueue;
use runtime_primitives::traits::{Block as BlockT};
use tokio::{runtime::Runtime, timer::Interval};
/// Type that represents fetch completion future.
pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
/// Type that represents bft messages stream.
pub type BftMessageStream<B> = mpsc::UnboundedReceiver<LocalizedBftMessage<B>>;
const TICK_TIMEOUT: Duration = Duration::from_millis(1000);
const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000);
@@ -90,18 +87,6 @@ pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
}
/// ConsensusService
pub trait ConsensusService<B: BlockT>: Send + Sync {
/// Maintain connectivity to given addresses.
fn connect_to_authorities(&self, addresses: &[String]);
/// Get BFT message stream for messages corresponding to consensus on given
/// parent hash.
fn bft_messages(&self, parent_hash: B::Hash) -> BftMessageStream<B>;
/// Send out a BFT message.
fn send_bft_message(&self, message: LocalizedBftMessage<B>);
}
/// Service able to execute closure in the network context.
pub trait ExecuteInContext<B: BlockT>: Send + Sync {
/// Execute closure in network context.
@@ -140,10 +125,13 @@ pub struct Service<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> {
impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
/// Creates and register protocol with the network service
pub fn new(params: Params<B, S, H>, protocol_id: ProtocolId) -> Result<Arc<Service<B, S, H>>, Error> {
pub fn new<I: 'static + ImportQueue<B>>(
params: Params<B, S, H>,
protocol_id: ProtocolId,
import_queue: I,
) -> Result<Arc<Service<B, S, H>>, Error> {
let chain = params.chain.clone();
// TODO: non-instant finality.
let import_queue = Arc::new(AsyncImportQueue::new(true));
let import_queue = Arc::new(import_queue);
let handler = Arc::new(Protocol::new(
params.config,
params.chain,
@@ -155,6 +143,7 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> {
let versions = [(protocol::CURRENT_VERSION as u8)];
let registered = RegisteredProtocol::new(protocol_id, &versions[..]);
let (thread, network) = start_thread(params.network_config, handler.clone(), registered)?;
let sync = Arc::new(Service {
network,
protocol_id,
+14 -9
View File
@@ -171,7 +171,13 @@ impl<B: BlockT> ChainSync<B> {
}
}
pub(crate) fn on_block_data(&mut self, protocol: &mut Context<B>, who: NodeIndex, _request: message::BlockRequest<B>, response: message::BlockResponse<B>) {
pub(crate) fn on_block_data(
&mut self,
protocol: &mut Context<B>,
who: NodeIndex,
_request: message::BlockRequest<B>,
response: message::BlockResponse<B>
) -> Option<(BlockOrigin, Vec<blocks::BlockData<B>>)> {
let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&who) {
match peer.state {
PeerSyncState::DownloadingNew(start_block) => {
@@ -184,7 +190,7 @@ impl<B: BlockT> ChainSync<B> {
PeerSyncState::DownloadingStale(_) => {
peer.state = PeerSyncState::Available;
response.blocks.into_iter().map(|b| blocks::BlockData {
origin: who,
origin: Some(who),
block: b
}).collect()
},
@@ -207,23 +213,23 @@ impl<B: BlockT> ChainSync<B> {
let n = n - As::sa(1);
peer.state = PeerSyncState::AncestorSearch(n);
Self::request_ancestry(protocol, who, n);
return;
return None;
},
Ok(_) => { // genesis mismatch
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who);
protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer"));
return;
return None;
},
Err(e) => {
protocol.report_peer(who, Severity::Useless(&format!("Error answering legitimate blockchain query: {:?}", e)));
return;
return None;
}
}
},
None => {
trace!(target:"sync", "Invalid response when searching for ancestor from {}", who);
protocol.report_peer(who, Severity::Bad("Invalid response when searching for ancestor"));
return;
return None;
}
}
},
@@ -236,7 +242,6 @@ impl<B: BlockT> ChainSync<B> {
let best_seen = self.best_seen_block();
let is_best = new_blocks.first().and_then(|b| b.block.header.as_ref()).map(|h| best_seen.as_ref().map_or(false, |n| h.number() >= n));
let origin = if is_best.unwrap_or_default() { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync };
let import_queue = self.import_queue.clone();
if let Some((hash, number)) = new_blocks.last()
.and_then(|b| b.block.header.as_ref().map(|h|(b.block.hash.clone(), *h.number())))
{
@@ -245,8 +250,8 @@ impl<B: BlockT> ChainSync<B> {
self.best_queued_hash = hash;
}
}
import_queue.import_blocks(self, protocol, (origin, new_blocks));
self.maintain_sync(protocol);
Some((origin, new_blocks))
}
pub fn maintain_sync(&mut self, protocol: &mut Context<B>) {
@@ -263,7 +268,7 @@ impl<B: BlockT> ChainSync<B> {
}
// Update common blocks
for (_, peer) in self.peers.iter_mut() {
trace!("Updating peer info ours={}, theirs={}", number, peer.best_number);
trace!(target: "sync", "Updating peer info ours={}, theirs={}", number, peer.best_number);
if peer.best_number >= number {
peer.common_number = number;
peer.common_hash = *hash;
+51 -27
View File
@@ -27,34 +27,40 @@ use client;
use client::block_builder::BlockBuilder;
use runtime_primitives::generic::BlockId;
use io::SyncIo;
use protocol::{Context, Protocol};
use protocol::{Context, Protocol, ProtocolContext};
use primitives::{Blake2Hasher};
use config::ProtocolConfig;
use service::TransactionPool;
use network_libp2p::{NodeIndex, PeerId, Severity};
use keyring::Keyring;
use codec::{Encode, Decode};
use import_queue::SyncImportQueue;
use codec::Encode;
use import_queue::{SyncImportQueue, PassThroughVerifier};
use test_client::{self, TestClient};
use specialization::Specialization;
use consensus_gossip::ConsensusGossip;
use import_queue::ImportQueue;
use service::ExecuteInContext;
pub use test_client::runtime::{Block, Hash, Transfer, Extrinsic};
struct DummyContextExecutor(Arc<Protocol<Block, DummySpecialization, Hash>>, Arc<RwLock<VecDeque<TestPacket>>>);
unsafe impl Send for DummyContextExecutor {}
unsafe impl Sync for DummyContextExecutor {}
impl ExecuteInContext<Block> for DummyContextExecutor {
fn execute_in_context<F: Fn(&mut Context<Block>)>(&self, closure: F) {
let mut io = TestIo::new(&self.1, None);
let mut context = ProtocolContext::new(&self.0.context_data(), &mut io);
closure(&mut context);
}
}
/// The test specialization.
pub struct DummySpecialization {
/// Consensus gossip handle.
pub gossip: ConsensusGossip<Block>,
}
#[derive(Encode, Decode)]
pub struct GossipMessage {
/// The topic to classify under.
pub topic: Hash,
/// The data to send.
pub data: Vec<u8>,
}
impl Specialization<Block> for DummySpecialization {
fn status(&self) -> Vec<u8> { vec![] }
@@ -66,11 +72,14 @@ impl Specialization<Block> for DummySpecialization {
self.gossip.peer_disconnected(ctx, peer_id);
}
fn on_message(&mut self, ctx: &mut Context<Block>, peer_id: NodeIndex, message: &mut Option<::message::Message<Block>>) {
if let Some(::message::generic::Message::ChainSpecific(data)) = message.take() {
let gossip_message = GossipMessage::decode(&mut &data[..])
.expect("gossip messages all in known format; qed");
self.gossip.on_chain_specific(ctx, peer_id, data, gossip_message.topic)
fn on_message(
&mut self,
ctx: &mut Context<Block>,
peer_id: NodeIndex,
message: &mut Option<::message::Message<Block>>
) {
if let Some(::message::generic::Message::Consensus(topic, data)) = message.take() {
self.gossip.on_incoming(ctx, peer_id, topic, data);
}
}
}
@@ -128,16 +137,31 @@ pub struct TestPacket {
pub struct Peer {
client: Arc<client::Client<test_client::Backend, test_client::Executor, Block>>,
pub sync: Protocol<Block, DummySpecialization, Hash>,
pub queue: RwLock<VecDeque<TestPacket>>,
pub sync: Arc<Protocol<Block, DummySpecialization, Hash>>,
pub queue: Arc<RwLock<VecDeque<TestPacket>>>,
import_queue: Arc<SyncImportQueue<Block, PassThroughVerifier>>,
executor: Arc<DummyContextExecutor>,
}
impl Peer {
fn new(
client: Arc<client::Client<test_client::Backend, test_client::Executor, Block>>,
sync: Arc<Protocol<Block, DummySpecialization, Hash>>,
queue: Arc<RwLock<VecDeque<TestPacket>>>,
import_queue: Arc<SyncImportQueue<Block, PassThroughVerifier>>,
) -> Self {
let executor = Arc::new(DummyContextExecutor(sync.clone(), queue.clone()));
Peer { client, sync, queue, import_queue, executor}
}
/// Called after blockchain has been populated to update current state.
fn start(&self) {
// Update the sync state to the latest chain state.
let info = self.client.info().expect("In-mem client does not fail");
let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
self.import_queue.start(
Arc::downgrade(&self.sync.sync()),
Arc::downgrade(&self.executor),
Arc::downgrade(&self.sync.context_data().chain)).expect("Test ImportQueue always starts");
self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header);
}
@@ -189,8 +213,7 @@ impl Peer {
/// `TestNet::sync_step` needs to be called to ensure it's propagated.
pub fn gossip_message(&self, topic: Hash, data: Vec<u8>) {
self.sync.with_spec(&mut TestIo::new(&self.queue, None), |spec, ctx| {
let message = GossipMessage { topic, data }.encode();
spec.gossip.multicast_chain_specific(ctx, message, topic);
spec.gossip.multicast(ctx, topic, data);
})
}
@@ -284,24 +307,25 @@ impl TestNet {
pub fn add_peer(&mut self, config: &ProtocolConfig) {
let client = Arc::new(test_client::new());
let tx_pool = Arc::new(EmptyTransactionPool);
let import_queue = Arc::new(SyncImportQueue(false));
let import_queue = Arc::new(SyncImportQueue::new(Arc::new(PassThroughVerifier(false))));
let specialization = DummySpecialization {
gossip: ConsensusGossip::new(),
};
let sync = Protocol::new(
config.clone(),
client.clone(),
import_queue,
import_queue.clone(),
None,
tx_pool,
specialization
).unwrap();
self.peers.push(Arc::new(Peer {
sync: sync,
client: client,
queue: RwLock::new(VecDeque::new()),
}));
self.peers.push(Arc::new(Peer::new(
client,
Arc::new(sync),
Arc::new(RwLock::new(VecDeque::new())),
import_queue
)));
}
/// Get reference to peer.