Move modules around in network (#2822)

* Rename modules in protocol

* Line widths
This commit is contained in:
Pierre Krieger
2019-06-07 15:30:45 +02:00
committed by Gavin Wood
parent 693ea3cb42
commit 4d3396d095
12 changed files with 89 additions and 46 deletions
@@ -0,0 +1,705 @@
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Utility for gossip of network messages between authorities.
//! Handles chain-specific and standard BFT messages.
use std::collections::{HashMap, HashSet, hash_map::Entry};
use std::sync::Arc;
use std::iter;
use std::time;
use log::{trace, debug};
use futures::sync::mpsc;
use lru_cache::LruCache;
use network_libp2p::PeerId;
use runtime_primitives::traits::{Block as BlockT, Hash, HashFor};
use runtime_primitives::ConsensusEngineId;
pub use crate::message::generic::{Message, ConsensusMessage};
use crate::protocol::Context;
use crate::config::Roles;
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096;
const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30);
/// Reputation change when a peer sends us a gossip message that we didn't know about.
const GOSSIP_SUCCESS_REPUTATION_CHANGE: i32 = 1 << 4;
/// Reputation change when a peer sends us a gossip message that we already knew about.
const DUPLICATE_GOSSIP_REPUTATION_CHANGE: i32 = -(1 << 2);
/// Reputation change when a peer sends us a gossip message for an unknown engine, whatever that
/// means.
const UNKNOWN_GOSSIP_REPUTATION_CHANGE: i32 = -(1 << 6);
/// Reputation change when a peer sends a message from a topic it isn't registered on.
const UNREGISTERED_TOPIC_REPUTATION_CHANGE: i32 = -(1 << 10);
struct PeerConsensus<H> {
known_messages: HashSet<H>,
roles: Roles,
}
/// Topic stream message with sender.
#[derive(Debug, Eq, PartialEq)]
pub struct TopicNotification {
/// Message data.
pub message: Vec<u8>,
/// Sender if available.
pub sender: Option<PeerId>,
}
struct MessageEntry<B: BlockT> {
message_hash: B::Hash,
topic: B::Hash,
message: ConsensusMessage,
}
/// Consensus message destination.
pub enum MessageRecipient {
/// Send to all peers.
BroadcastToAll,
/// Send to peers that don't have that message already.
BroadcastNew,
/// Send to specific peer.
Peer(PeerId),
}
/// The reason for sending out the message.
#[derive(Eq, PartialEq, Copy, Clone)]
pub enum MessageIntent {
/// Requested broadcast
Broadcast,
/// Requested broadcast to all peers.
ForcedBroadcast,
/// Periodic rebroadcast of all messages to all peers.
PeriodicRebroadcast,
}
/// Message validation result.
pub enum ValidationResult<H> {
/// Message should be stored and propagated under given topic.
ProcessAndKeep(H),
/// Message should be processed, but not propagated.
ProcessAndDiscard(H),
/// Message should be ignored.
Discard,
}
/// Validation context. Allows reacting to incoming messages by sending out further messages.
pub trait ValidatorContext<B: BlockT> {
/// Broadcast all messages with given topic to peers that do not have it yet.
fn broadcast_topic(&mut self, topic: B::Hash, force: bool);
/// Broadcast a message to all peers that have not received it previously.
fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool);
/// Send addressed message to a peer.
fn send_message(&mut self, who: &PeerId, message: Vec<u8>);
/// Send all messages with given topic to a peer.
fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool);
}
struct NetworkContext<'g, 'p, B: BlockT> {
gossip: &'g mut ConsensusGossip<B>,
protocol: &'p mut dyn Context<B>,
engine_id: ConsensusEngineId,
}
impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
/// Broadcast all messages with given topic to peers that do not have it yet.
fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
self.gossip.broadcast_topic(self.protocol, topic, force);
}
/// Broadcast a message to all peers that have not received it previously.
fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
self.gossip.multicast(
self.protocol,
topic,
ConsensusMessage{ data: message, engine_id: self.engine_id.clone() },
force,
);
}
/// Send addressed message to a peer.
fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
self.protocol.send_consensus(who.clone(), ConsensusMessage {
engine_id: self.engine_id,
data: message,
});
}
/// Send all messages with given topic to a peer.
fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
self.gossip.send_topic(self.protocol, who, topic, self.engine_id, force);
}
}
fn propagate<'a, B: BlockT, I>(
protocol: &mut dyn Context<B>,
messages: I,
intent: MessageIntent,
peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
validators: &HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>,
)
where I: IntoIterator<Item=(&'a B::Hash, &'a B::Hash, &'a ConsensusMessage)>, // (msg_hash, topic, message)
{
let mut check_fns = HashMap::new();
let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| {
let engine_id = message.engine_id;
let check_fn = match check_fns.entry(engine_id) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(vacant) => match validators.get(&engine_id) {
None => return false, // treat all messages with no validator as not allowed
Some(validator) => vacant.insert(validator.message_allowed()),
}
};
(check_fn)(who, intent, topic, &message.data)
};
for (message_hash, topic, message) in messages {
for (id, ref mut peer) in peers.iter_mut() {
let intent = match intent {
MessageIntent::Broadcast =>
if peer.known_messages.contains(&message_hash) {
continue
} else {
MessageIntent::Broadcast
},
MessageIntent::PeriodicRebroadcast =>
if peer.known_messages.contains(&message_hash) {
MessageIntent::PeriodicRebroadcast
} else {
// peer doesn't know message, so the logic should treat it as an
// initial broadcast.
MessageIntent::Broadcast
},
other => other,
};
if !message_allowed(id, intent, &topic, &message) {
continue
}
peer.known_messages.insert(message_hash.clone());
trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
protocol.send_consensus(id.clone(), message.clone());
}
}
}
/// Validates consensus messages.
pub trait Validator<B: BlockT>: Send + Sync {
/// New peer is connected.
fn new_peer(&self, _context: &mut dyn ValidatorContext<B>, _who: &PeerId, _roles: Roles) {
}
/// New connection is dropped.
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, _who: &PeerId) {
}
/// Validate consensus message.
fn validate(
&self,
context: &mut dyn ValidatorContext<B>,
sender: &PeerId,
data: &[u8]
) -> ValidationResult<B::Hash>;
/// Produce a closure for validating messages on a given topic.
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
Box::new(move |_topic, _data| false)
}
/// Produce a closure for filtering egress messages.
fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
Box::new(move |_who, _intent, _topic, _data| true)
}
}
/// Consensus network protocol handler. Manages statements and candidate requests.
pub struct ConsensusGossip<B: BlockT> {
peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec<mpsc::UnboundedSender<TopicNotification>>>,
messages: Vec<MessageEntry<B>>,
known_messages: LruCache<B::Hash, ()>,
validators: HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>,
next_broadcast: time::Instant,
}
impl<B: BlockT> ConsensusGossip<B> {
/// Create a new instance.
pub fn new() -> Self {
ConsensusGossip {
peers: HashMap::new(),
live_message_sinks: HashMap::new(),
messages: Default::default(),
known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE),
validators: Default::default(),
next_broadcast: time::Instant::now() + REBROADCAST_INTERVAL,
}
}
/// Closes all notification streams.
pub fn abort(&mut self) {
self.live_message_sinks.clear();
}
/// Register message validator for a message type.
pub fn register_validator(
&mut self,
protocol: &mut dyn Context<B>,
engine_id: ConsensusEngineId,
validator: Arc<dyn Validator<B>>
) {
self.register_validator_internal(engine_id, validator.clone());
let peers: Vec<_> = self.peers.iter().map(|(id, peer)| (id.clone(), peer.roles)).collect();
for (id, roles) in peers {
let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
validator.new_peer(&mut context, &id, roles);
}
}
fn register_validator_internal(&mut self, engine_id: ConsensusEngineId, validator: Arc<dyn Validator<B>>) {
self.validators.insert(engine_id, validator.clone());
}
/// Handle new connected peer.
pub fn new_peer(&mut self, protocol: &mut dyn Context<B>, who: PeerId, roles: Roles) {
// light nodes are not valid targets for consensus gossip messages
if !roles.is_full() {
return;
}
trace!(target:"gossip", "Registering {:?} {}", roles, who);
self.peers.insert(who.clone(), PeerConsensus {
known_messages: HashSet::new(),
roles,
});
for (engine_id, v) in self.validators.clone() {
let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
v.new_peer(&mut context, &who, roles);
}
}
fn register_message_hashed(
&mut self,
message_hash: B::Hash,
topic: B::Hash,
message: ConsensusMessage,
) {
if self.known_messages.insert(message_hash.clone(), ()).is_none() {
self.messages.push(MessageEntry {
message_hash,
topic,
message,
});
}
}
/// Registers a message without propagating it to any peers. The message
/// becomes available to new peers or when the service is asked to gossip
/// the message's topic. No validation is performed on the message, if the
/// message is already expired it should be dropped on the next garbage
/// collection.
pub fn register_message(
&mut self,
topic: B::Hash,
message: ConsensusMessage,
) {
let message_hash = HashFor::<B>::hash(&message.data[..]);
self.register_message_hashed(message_hash, topic, message);
}
/// Call when a peer has been disconnected to stop tracking gossip status.
pub fn peer_disconnected(&mut self, protocol: &mut dyn Context<B>, who: PeerId) {
for (engine_id, v) in self.validators.clone() {
let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
v.peer_disconnected(&mut context, &who);
}
}
/// Perform periodic maintenance
pub fn tick(&mut self, protocol: &mut dyn Context<B>) {
self.collect_garbage();
if time::Instant::now() >= self.next_broadcast {
self.rebroadcast(protocol);
self.next_broadcast = time::Instant::now() + REBROADCAST_INTERVAL;
}
}
/// Rebroadcast all messages to all peers.
fn rebroadcast(&mut self, protocol: &mut dyn Context<B>) {
let messages = self.messages.iter()
.map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
propagate(protocol, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators);
}
/// Broadcast all messages with given topic.
pub fn broadcast_topic(&mut self, protocol: &mut dyn Context<B>, topic: B::Hash, force: bool) {
let messages = self.messages.iter()
.filter_map(|entry|
if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None }
);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
propagate(protocol, messages, intent, &mut self.peers, &self.validators);
}
/// Prune old or no longer relevant consensus messages. Provide a predicate
/// for pruning, which returns `false` when the items with a given topic should be pruned.
pub fn collect_garbage(&mut self) {
self.live_message_sinks.retain(|_, sinks| {
sinks.retain(|sink| !sink.is_closed());
!sinks.is_empty()
});
let known_messages = &mut self.known_messages;
let before = self.messages.len();
let validators = &self.validators;
let mut check_fns = HashMap::new();
let mut message_expired = move |entry: &MessageEntry<B>| {
let engine_id = entry.message.engine_id;
let check_fn = match check_fns.entry(engine_id) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(vacant) => match validators.get(&engine_id) {
None => return true, // treat all messages with no validator as expired
Some(validator) => vacant.insert(validator.message_expired()),
}
};
(check_fn)(entry.topic, &entry.message.data)
};
self.messages.retain(|entry| !message_expired(entry));
trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)",
before - self.messages.len(),
self.messages.len(),
known_messages.len(),
);
for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| known_messages.contains_key(h));
}
}
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile)
pub fn messages_for(&mut self, engine_id: ConsensusEngineId, topic: B::Hash)
-> mpsc::UnboundedReceiver<TopicNotification>
{
let (tx, rx) = mpsc::unbounded();
for entry in self.messages.iter_mut()
.filter(|e| e.topic == topic && e.message.engine_id == engine_id)
{
tx.unbounded_send(TopicNotification {
message: entry.message.data.clone(),
sender: None,
})
.expect("receiver known to be live; qed");
}
self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx);
rx
}
/// Handle an incoming ConsensusMessage for topic by who via protocol. Discard message if topic
/// already known, the message is old, its source peers isn't a registered peer or the connection
/// to them is broken. Return `Some(topic, message)` if it was added to the internal queue, `None`
/// in all other cases.
pub fn on_incoming(
&mut self,
protocol: &mut dyn Context<B>,
who: PeerId,
message: ConsensusMessage,
) {
let message_hash = HashFor::<B>::hash(&message.data[..]);
if self.known_messages.contains_key(&message_hash) {
trace!(target:"gossip", "Ignored already known message from {}", who);
protocol.report_peer(who.clone(), DUPLICATE_GOSSIP_REPUTATION_CHANGE);
return;
}
let engine_id = message.engine_id;
//validate the message
let validation = self.validators.get(&engine_id)
.cloned()
.map(|v| {
let mut context = NetworkContext { gossip: self, protocol, engine_id };
v.validate(&mut context, &who, &message.data)
});
let validation_result = match validation {
Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)),
Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)),
Some(ValidationResult::Discard) => None,
None => {
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
protocol.report_peer(who.clone(), UNKNOWN_GOSSIP_REPUTATION_CHANGE);
protocol.disconnect_peer(who);
return;
}
};
if let Some((topic, keep)) = validation_result {
protocol.report_peer(who.clone(), GOSSIP_SUCCESS_REPUTATION_CHANGE);
if let Some(ref mut peer) = self.peers.get_mut(&who) {
peer.known_messages.insert(message_hash);
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
entry.get_mut().retain(|sink| {
if let Err(e) = sink.unbounded_send(TopicNotification {
message: message.data.clone(),
sender: Some(who.clone())
}) {
trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
}
!sink.is_closed()
});
if entry.get().is_empty() {
entry.remove_entry();
}
}
if keep {
self.register_message_hashed(message_hash, topic, message);
}
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
protocol.report_peer(who.clone(), UNREGISTERED_TOPIC_REPUTATION_CHANGE);
}
} else {
trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
}
}
/// Send all messages with given topic to a peer.
pub fn send_topic(
&mut self,
protocol: &mut dyn Context<B>,
who: &PeerId,
topic: B::Hash,
engine_id: ConsensusEngineId,
force: bool
) {
let validator = self.validators.get(&engine_id);
let mut message_allowed = match validator {
None => return, // treat all messages with no validator as not allowed
Some(validator) => validator.message_allowed(),
};
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
if let Some(ref mut peer) = self.peers.get_mut(who) {
for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
if !force && peer.known_messages.contains(&entry.message_hash) {
continue
}
if !message_allowed(who, intent, &entry.topic, &entry.message.data) {
continue
}
peer.known_messages.insert(entry.message_hash.clone());
trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
protocol.send_consensus(who.clone(), ConsensusMessage {
engine_id: engine_id.clone(),
data: entry.message.data.clone(),
});
}
}
}
/// Multicast a message to all peers.
pub fn multicast(
&mut self,
protocol: &mut dyn Context<B>,
topic: B::Hash,
message: ConsensusMessage,
force: bool,
) {
let message_hash = HashFor::<B>::hash(&message.data);
self.register_message_hashed(message_hash, topic, message.clone());
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
}
/// Send addressed message to a peer. The message is not kept or multicast
/// later on.
pub fn send_message(
&mut self,
protocol: &mut dyn Context<B>,
who: &PeerId,
message: ConsensusMessage,
) {
let peer = match self.peers.get_mut(who) {
None => return,
Some(peer) => peer,
};
let message_hash = HashFor::<B>::hash(&message.data);
trace!(target: "gossip", "Sending direct to {}: {:?}", who, message);
peer.known_messages.insert(message_hash);
protocol.send_consensus(who.clone(), message.clone());
}
}
#[cfg(test)]
mod tests {
use runtime_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
use futures::Stream;
use super::*;
type Block = RawBlock<ExtrinsicWrapper<u64>>;
macro_rules! push_msg {
($consensus:expr, $topic:expr, $hash: expr, $m:expr) => {
if $consensus.known_messages.insert($hash, ()).is_none() {
$consensus.messages.push(MessageEntry {
message_hash: $hash,
topic: $topic,
message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]},
});
}
}
}
struct AllowAll;
impl Validator<Block> for AllowAll {
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
_sender: &PeerId,
_data: &[u8],
) -> ValidationResult<H256> {
ValidationResult::ProcessAndKeep(H256::default())
}
}
#[test]
fn collects_garbage() {
struct AllowOne;
impl Validator<Block> for AllowOne {
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
_sender: &PeerId,
data: &[u8],
) -> ValidationResult<H256> {
if data[0] == 1 {
ValidationResult::ProcessAndKeep(H256::default())
} else {
ValidationResult::Discard
}
}
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(H256, &[u8]) -> bool + 'a> {
Box::new(move |_topic, data| data[0] != 1 )
}
}
let prev_hash = H256::random();
let best_hash = H256::random();
let mut consensus = ConsensusGossip::<Block>::new();
let m1_hash = H256::random();
let m2_hash = H256::random();
let m1 = vec![1, 2, 3];
let m2 = vec![4, 5, 6];
push_msg!(consensus, prev_hash, m1_hash, m1);
push_msg!(consensus, best_hash, m2_hash, m2.clone());
consensus.known_messages.insert(m1_hash, ());
consensus.known_messages.insert(m2_hash, ());
let test_engine_id = Default::default();
consensus.register_validator_internal(test_engine_id, Arc::new(AllowAll));
consensus.collect_garbage();
assert_eq!(consensus.messages.len(), 2);
assert_eq!(consensus.known_messages.len(), 2);
consensus.register_validator_internal(test_engine_id, Arc::new(AllowOne));
// m2 is expired
consensus.collect_garbage();
assert_eq!(consensus.messages.len(), 1);
// known messages are only pruned based on size.
assert_eq!(consensus.known_messages.len(), 2);
assert!(consensus.known_messages.contains_key(&m2_hash));
}
#[test]
fn message_stream_include_those_sent_before_asking_for_stream() {
use futures::Stream;
let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
let topic = HashFor::<Block>::hash(&[1,2,3]);
consensus.register_message(topic, message.clone());
let stream = consensus.messages_for([0, 0, 0, 0], topic);
assert_eq!(stream.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None })));
}
#[test]
fn can_keep_multiple_messages_per_topic() {
let mut consensus = ConsensusGossip::<Block>::new();
let topic = [1; 32].into();
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
consensus.register_message(topic, msg_a);
consensus.register_message(topic, msg_b);
assert_eq!(consensus.messages.len(), 2);
}
#[test]
fn can_keep_multiple_subscribers_per_topic() {
let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
let topic = HashFor::<Block>::hash(&[1,2,3]);
consensus.register_message(topic, message.clone());
let stream1 = consensus.messages_for([0, 0, 0, 0], topic);
let stream2 = consensus.messages_for([0, 0, 0, 0], topic);
assert_eq!(stream1.wait().next(), Some(Ok(TopicNotification { message: message.data.clone(), sender: None })));
assert_eq!(stream2.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None })));
}
#[test]
fn topics_are_localized_to_engine_id() {
let mut consensus = ConsensusGossip::<Block>::new();
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let topic = [1; 32].into();
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] };
consensus.register_message(topic, msg_a);
consensus.register_message(topic, msg_b);
let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait();
assert_eq!(stream.next(), Some(Ok(TopicNotification { message: vec![1, 2, 3], sender: None })));
let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic));
assert_eq!(stream.next(), None);
}
}
@@ -0,0 +1,389 @@
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Network packet message types. These get serialized and put into the lower level protocol payload.
use bitflags::bitflags;
use runtime_primitives::{ConsensusEngineId, traits::{Block as BlockT, Header as HeaderT}};
use parity_codec::{Encode, Decode, Input, Output};
pub use self::generic::{
BlockAnnounce, RemoteCallRequest, RemoteReadRequest,
RemoteHeaderRequest, RemoteHeaderResponse,
RemoteChangesRequest, RemoteChangesResponse,
FinalityProofRequest, FinalityProofResponse,
FromBlock, RemoteReadChildRequest,
};
/// A unique ID of a request.
pub type RequestId = u64;
/// Type alias for using the message type using block type parameters.
pub type Message<B> = generic::Message<
<B as BlockT>::Header,
<B as BlockT>::Hash,
<<B as BlockT>::Header as HeaderT>::Number,
<B as BlockT>::Extrinsic,
>;
/// Type alias for using the status type using block type parameters.
pub type Status<B> = generic::Status<
<B as BlockT>::Hash,
<<B as BlockT>::Header as HeaderT>::Number,
>;
/// Type alias for using the block request type using block type parameters.
pub type BlockRequest<B> = generic::BlockRequest<
<B as BlockT>::Hash,
<<B as BlockT>::Header as HeaderT>::Number,
>;
/// Type alias for using the BlockData type using block type parameters.
pub type BlockData<B> = generic::BlockData<
<B as BlockT>::Header,
<B as BlockT>::Hash,
<B as BlockT>::Extrinsic,
>;
/// Type alias for using the BlockResponse type using block type parameters.
pub type BlockResponse<B> = generic::BlockResponse<
<B as BlockT>::Header,
<B as BlockT>::Hash,
<B as BlockT>::Extrinsic,
>;
/// A set of transactions.
pub type Transactions<E> = Vec<E>;
// Bits of block data and associated artifacts to request.
bitflags! {
/// Node roles bitmask.
pub struct BlockAttributes: u8 {
/// Include block header.
const HEADER = 0b00000001;
/// Include block body.
const BODY = 0b00000010;
/// Include block receipt.
const RECEIPT = 0b00000100;
/// Include block message queue.
const MESSAGE_QUEUE = 0b00001000;
/// Include a justification for the block.
const JUSTIFICATION = 0b00010000;
}
}
impl Encode for BlockAttributes {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push_byte(self.bits())
}
}
impl Decode for BlockAttributes {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Self::from_bits(input.read_byte()?)
}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy, Encode, Decode)]
/// Block enumeration direction.
pub enum Direction {
/// Enumerate in ascending order (from child to parent).
Ascending = 0,
/// Enumerate in descendfing order (from parent to canonical child).
Descending = 1,
}
/// Remote call response.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct RemoteCallResponse {
/// Id of a request this response was made for.
pub id: RequestId,
/// Execution proof.
pub proof: Vec<Vec<u8>>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote read response.
pub struct RemoteReadResponse {
/// Id of a request this response was made for.
pub id: RequestId,
/// Read proof.
pub proof: Vec<Vec<u8>>,
}
/// Generic types.
pub mod generic {
use parity_codec::{Encode, Decode};
use network_libp2p::CustomMessage;
use runtime_primitives::Justification;
use crate::config::Roles;
use super::{
RemoteReadResponse, Transactions, Direction,
RequestId, BlockAttributes, RemoteCallResponse, ConsensusEngineId,
};
/// Consensus is mostly opaque to us
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct ConsensusMessage {
/// Identifies consensus engine.
pub engine_id: ConsensusEngineId,
/// Message payload.
pub data: Vec<u8>,
}
/// Block data sent in the response.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct BlockData<Header, Hash, Extrinsic> {
/// Block header hash.
pub hash: Hash,
/// Block header if requested.
pub header: Option<Header>,
/// Block body if requested.
pub body: Option<Vec<Extrinsic>>,
/// Block receipt if requested.
pub receipt: Option<Vec<u8>>,
/// Block message queue if requested.
pub message_queue: Option<Vec<u8>>,
/// Justification if requested.
pub justification: Option<Justification>,
}
/// Identifies starting point of a block sequence.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum FromBlock<Hash, Number> {
/// Start with given hash.
Hash(Hash),
/// Start with given block number.
Number(Number),
}
/// A network message.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub enum Message<Header, Hash, Number, Extrinsic> {
/// Status packet.
Status(Status<Hash, Number>),
/// Block request.
BlockRequest(BlockRequest<Hash, Number>),
/// Block response.
BlockResponse(BlockResponse<Header, Hash, Extrinsic>),
/// Block announce.
BlockAnnounce(BlockAnnounce<Header>),
/// Transactions.
Transactions(Transactions<Extrinsic>),
/// Consensus protocol message.
Consensus(ConsensusMessage),
/// Remote method call request.
RemoteCallRequest(RemoteCallRequest<Hash>),
/// Remote method call response.
RemoteCallResponse(RemoteCallResponse),
/// Remote storage read request.
RemoteReadRequest(RemoteReadRequest<Hash>),
/// Remote storage read response.
RemoteReadResponse(RemoteReadResponse),
/// Remote header request.
RemoteHeaderRequest(RemoteHeaderRequest<Number>),
/// Remote header response.
RemoteHeaderResponse(RemoteHeaderResponse<Header>),
/// Remote changes request.
RemoteChangesRequest(RemoteChangesRequest<Hash>),
/// Remote changes reponse.
RemoteChangesResponse(RemoteChangesResponse<Number, Hash>),
/// Remote child storage read request.
RemoteReadChildRequest(RemoteReadChildRequest<Hash>),
/// Finality proof request.
FinalityProofRequest(FinalityProofRequest<Hash>),
/// Finality proof reponse.
FinalityProofResponse(FinalityProofResponse<Hash>),
/// Chain-specific message.
#[codec(index = "255")]
ChainSpecific(Vec<u8>),
}
impl<Header, Hash, Number, Extrinsic> CustomMessage for Message<Header, Hash, Number, Extrinsic>
where Self: Decode + Encode
{
fn into_bytes(self) -> Vec<u8> {
self.encode()
}
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
Decode::decode(&mut &bytes[..]).ok_or(())
}
}
/// Status sent on connection.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct Status<Hash, Number> {
/// Protocol version.
pub version: u32,
/// Minimum supported version.
pub min_supported_version: u32,
/// Supported roles.
pub roles: Roles,
/// Best block number.
pub best_number: Number,
/// Best block hash.
pub best_hash: Hash,
/// Genesis block hash.
pub genesis_hash: Hash,
/// Chain-specific status.
pub chain_status: Vec<u8>,
}
/// Request block data from a peer.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct BlockRequest<Hash, Number> {
/// Unique request id.
pub id: RequestId,
/// Bits of block data to request.
pub fields: BlockAttributes,
/// Start from this block.
pub from: FromBlock<Hash, Number>,
/// End at this block. An implementation defined maximum is used when unspecified.
pub to: Option<Hash>,
/// Sequence direction.
pub direction: Direction,
/// Maximum number of blocks to return. An implementation defined maximum is used when unspecified.
pub max: Option<u32>,
}
/// Response to `BlockRequest`
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct BlockResponse<Header, Hash, Extrinsic> {
/// Id of a request this response was made for.
pub id: RequestId,
/// Block data for the requested sequence.
pub blocks: Vec<BlockData<Header, Hash, Extrinsic>>,
}
/// Announce a new complete relay chain block on the network.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct BlockAnnounce<H> {
/// New block header.
pub header: H,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote call request.
pub struct RemoteCallRequest<H> {
/// Unique request id.
pub id: RequestId,
/// Block at which to perform call.
pub block: H,
/// Method name.
pub method: String,
/// Call data.
pub data: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote storage read request.
pub struct RemoteReadRequest<H> {
/// Unique request id.
pub id: RequestId,
/// Block at which to perform call.
pub block: H,
/// Storage key.
pub key: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote storage read child request.
pub struct RemoteReadChildRequest<H> {
/// Unique request id.
pub id: RequestId,
/// Block at which to perform call.
pub block: H,
/// Child Storage key.
pub storage_key: Vec<u8>,
/// Storage key.
pub key: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote header request.
pub struct RemoteHeaderRequest<N> {
/// Unique request id.
pub id: RequestId,
/// Block number to request header for.
pub block: N,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote header response.
pub struct RemoteHeaderResponse<Header> {
/// Id of a request this response was made for.
pub id: RequestId,
/// Header. None if proof generation has failed (e.g. header is unknown).
pub header: Option<Header>,
/// Header proof.
pub proof: Vec<Vec<u8>>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote changes request.
pub struct RemoteChangesRequest<H> {
/// Unique request id.
pub id: RequestId,
/// Hash of the first block of the range (including first) where changes are requested.
pub first: H,
/// Hash of the last block of the range (including last) where changes are requested.
pub last: H,
/// Hash of the first block for which the requester has the changes trie root. All other
/// affected roots must be proved.
pub min: H,
/// Hash of the last block that we can use when querying changes.
pub max: H,
/// Storage key which changes are requested.
pub key: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote changes response.
pub struct RemoteChangesResponse<N, H> {
/// Id of a request this response was made for.
pub id: RequestId,
/// Proof has been generated using block with this number as a max block. Should be
/// less than or equal to the RemoteChangesRequest::max block number.
pub max: N,
/// Changes proof.
pub proof: Vec<Vec<u8>>,
/// Changes tries roots missing on the requester' node.
pub roots: Vec<(N, H)>,
/// Missing changes tries roots proof.
pub roots_proof: Vec<Vec<u8>>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Finality proof request.
pub struct FinalityProofRequest<H> {
/// Unique request id.
pub id: RequestId,
/// Hash of the block to request proof for.
pub block: H,
/// Additional data blob (that both requester and provider understood) required for proving finality.
pub request: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Finality proof response.
pub struct FinalityProofResponse<H> {
/// Id of a request this response was made for.
pub id: RequestId,
/// Hash of the block (the same as in the FinalityProofRequest).
pub block: H,
/// Finality proof (if available).
pub proof: Option<Vec<u8>>,
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,151 @@
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Specializations of the substrate network protocol to allow more complex forms of communication.
use crate::PeerId;
use runtime_primitives::traits::Block as BlockT;
use crate::protocol::Context;
/// A specialization of the substrate network protocol. Handles events and sends messages.
pub trait NetworkSpecialization<B: BlockT>: Send + Sync + 'static {
/// Get the current specialization-status.
fn status(&self) -> Vec<u8>;
/// Called when a peer successfully handshakes.
fn on_connect(&mut self, ctx: &mut dyn Context<B>, who: PeerId, status: crate::message::Status<B>);
/// Called when a peer is disconnected. If the peer ID is unknown, it should be ignored.
fn on_disconnect(&mut self, ctx: &mut dyn Context<B>, who: PeerId);
/// Called when a network-specific message arrives.
fn on_message(
&mut self,
ctx: &mut dyn Context<B>,
who: PeerId,
message: &mut Option<crate::message::Message<B>>
);
/// Called on abort.
#[deprecated(note = "This method is never called; aborting corresponds to dropping the object")]
fn on_abort(&mut self) { }
/// Called periodically to maintain peers and handle timeouts.
fn maintain_peers(&mut self, _ctx: &mut dyn Context<B>) { }
/// Called when a block is _imported_ at the head of the chain (not during major sync).
/// Not guaranteed to be called for every block, but will be most of the after major sync.
fn on_block_imported(&mut self, _ctx: &mut dyn Context<B>, _hash: B::Hash, _header: &B::Header) { }
}
/// Construct a simple protocol that is composed of several sub protocols.
/// Each "sub protocol" needs to implement `Specialization` and needs to provide a `new()` function.
/// For more fine grained implementations, this macro is not usable.
///
/// # Example
///
/// ```nocompile
/// construct_simple_protocol! {
/// pub struct MyProtocol where Block = MyBlock {
/// consensus_gossip: ConsensusGossip<MyBlock>,
/// other_protocol: MyCoolStuff,
/// }
/// }
/// ```
///
/// You can also provide an optional parameter after `where Block = MyBlock`, so it looks like
/// `where Block = MyBlock, Status = consensus_gossip`. This will instruct the implementation to
/// use the `status()` function from the `ConsensusGossip` protocol. By default, `status()` returns
/// an empty vector.
#[macro_export]
macro_rules! construct_simple_protocol {
(
$( #[ $attr:meta ] )*
pub struct $protocol:ident where
Block = $block:ident
$( , Status = $status_protocol_name:ident )*
{
$( $sub_protocol_name:ident : $sub_protocol:ident $( <$protocol_block:ty> )*, )*
}
) => {
$( #[$attr] )*
pub struct $protocol {
$( $sub_protocol_name: $sub_protocol $( <$protocol_block> )*, )*
}
impl $protocol {
/// Instantiate a node protocol handler.
pub fn new() -> Self {
Self {
$( $sub_protocol_name: $sub_protocol::new(), )*
}
}
}
impl $crate::specialization::NetworkSpecialization<$block> for $protocol {
fn status(&self) -> Vec<u8> {
$(
let status = self.$status_protocol_name.status();
if !status.is_empty() {
return status;
}
)*
Vec::new()
}
fn on_connect(
&mut self,
_ctx: &mut $crate::Context<$block>,
_who: $crate::PeerId,
_status: $crate::StatusMessage<$block>
) {
$( self.$sub_protocol_name.on_connect(_ctx, _who, _status); )*
}
fn on_disconnect(&mut self, _ctx: &mut $crate::Context<$block>, _who: $crate::PeerId) {
$( self.$sub_protocol_name.on_disconnect(_ctx, _who); )*
}
fn on_message(
&mut self,
_ctx: &mut $crate::Context<$block>,
_who: $crate::PeerId,
_message: &mut Option<$crate::message::Message<$block>>
) {
$( self.$sub_protocol_name.on_message(_ctx, _who, _message); )*
}
fn on_abort(&mut self) {
$( self.$sub_protocol_name.on_abort(); )*
}
fn maintain_peers(&mut self, _ctx: &mut $crate::Context<$block>) {
$( self.$sub_protocol_name.maintain_peers(_ctx); )*
}
fn on_block_imported(
&mut self,
_ctx: &mut $crate::Context<$block>,
_hash: <$block as $crate::BlockT>::Hash,
_header: &<$block as $crate::BlockT>::Header
) {
$( self.$sub_protocol_name.on_block_imported(_ctx, _hash, _header); )*
}
}
}
}
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,298 @@
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::mem;
use std::cmp;
use std::ops::Range;
use std::collections::{HashMap, BTreeMap};
use std::collections::hash_map::Entry;
use log::trace;
use network_libp2p::PeerId;
use runtime_primitives::traits::{Block as BlockT, NumberFor, One};
use crate::message;
const MAX_PARALLEL_DOWNLOADS: u32 = 1;
/// Block data with origin.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
/// The Block Message from the wire
pub block: message::BlockData<B>,
/// The peer, we received this from
pub origin: Option<PeerId>,
}
#[derive(Debug)]
enum BlockRangeState<B: BlockT> {
Downloading {
len: NumberFor<B>,
downloading: u32,
},
Complete(Vec<BlockData<B>>),
}
impl<B: BlockT> BlockRangeState<B> {
pub fn len(&self) -> NumberFor<B> {
match *self {
BlockRangeState::Downloading { len, .. } => len,
BlockRangeState::Complete(ref blocks) => (blocks.len() as u32).into(),
}
}
}
/// A collection of blocks being downloaded.
#[derive(Default)]
pub struct BlockCollection<B: BlockT> {
/// Downloaded blocks.
blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>,
peer_requests: HashMap<PeerId, NumberFor<B>>,
}
impl<B: BlockT> BlockCollection<B> {
/// Create a new instance.
pub fn new() -> Self {
BlockCollection {
blocks: BTreeMap::new(),
peer_requests: HashMap::new(),
}
}
/// Clear everything.
pub fn clear(&mut self) {
self.blocks.clear();
self.peer_requests.clear();
}
/// Insert a set of blocks into collection.
pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
if blocks.is_empty() {
return;
}
match self.blocks.get(&start) {
Some(&BlockRangeState::Downloading { .. }) => {
trace!(target: "sync", "Ignored block data still marked as being downloaded: {}", start);
debug_assert!(false);
return;
},
Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => {
trace!(target: "sync", "Ignored block data already downloaded: {}", start);
return;
},
_ => (),
}
self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter()
.map(|b| BlockData { origin: Some(who.clone()), block: b }).collect()));
}
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
pub fn needed_blocks(&mut self, who: PeerId, count: usize, peer_best: NumberFor<B>, common: NumberFor<B>)
-> Option<Range<NumberFor<B>>> {
// First block number that we need to download
let first_different = common + <NumberFor<B>>::one();
let count = (count as u32).into();
let (mut range, downloading) = {
let mut downloading_iter = self.blocks.iter().peekable();
let mut prev: Option<(&NumberFor<B>, &BlockRangeState<B>)> = None;
loop {
let next = downloading_iter.next();
break match &(prev, next) {
&(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
if downloading < MAX_PARALLEL_DOWNLOADS =>
(*start .. *start + *len, downloading),
&(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start =>
(*start + r.len() .. cmp::min(*next_start, *start + r.len() + count), 0), // gap
&(Some((start, r)), None) =>
(*start + r.len() .. *start + r.len() + count, 0), // last range
&(None, None) =>
(first_different .. first_different + count, 0), // empty
&(None, Some((start, _))) if *start > first_different =>
(first_different .. cmp::min(first_different + count, *start), 0), // gap at the start
_ => {
prev = next;
continue
},
}
}
};
// crop to peers best
if range.start > peer_best {
trace!(target: "sync", "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
return None;
}
range.end = cmp::min(peer_best + One::one(), range.end);
self.peer_requests.insert(who, range.start);
self.blocks.insert(range.start, BlockRangeState::Downloading {
len: range.end - range.start,
downloading: downloading + 1
});
if range.end <= range.start {
panic!("Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
range, count, peer_best, common, self.blocks);
}
Some(range)
}
/// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain.
pub fn drain(&mut self, from: NumberFor<B>) -> Vec<BlockData<B>> {
let mut drained = Vec::new();
let mut ranges = Vec::new();
{
let mut prev = from;
for (start, range_data) in &mut self.blocks {
match range_data {
&mut BlockRangeState::Complete(ref mut blocks) if *start <= prev => {
prev = *start + (blocks.len() as u32).into();
let mut blocks = mem::replace(blocks, Vec::new());
drained.append(&mut blocks);
ranges.push(*start);
},
_ => break,
}
}
}
for r in ranges {
self.blocks.remove(&r);
}
trace!(target: "sync", "Drained {} blocks", drained.len());
drained
}
pub fn clear_peer_download(&mut self, who: &PeerId) {
match self.peer_requests.entry(who.clone()) {
Entry::Occupied(entry) => {
let start = entry.remove();
let remove = match self.blocks.get_mut(&start) {
Some(&mut BlockRangeState::Downloading { ref mut downloading, .. }) if *downloading > 1 => {
*downloading = *downloading - 1;
false
},
Some(&mut BlockRangeState::Downloading { .. }) => {
true
},
_ => {
debug_assert!(false);
false
}
};
if remove {
self.blocks.remove(&start);
}
},
_ => (),
}
}
}
#[cfg(test)]
mod test {
use super::{BlockCollection, BlockData, BlockRangeState};
use crate::{message, PeerId};
use runtime_primitives::testing::{Block as RawBlock, ExtrinsicWrapper};
use primitives::H256;
type Block = RawBlock<ExtrinsicWrapper<u64>>;
fn is_empty(bc: &BlockCollection<Block>) -> bool {
bc.blocks.is_empty() &&
bc.peer_requests.is_empty()
}
fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
(0 .. n).map(|_| message::generic::BlockData {
hash: H256::random(),
header: None,
body: None,
message_queue: None,
receipt: None,
justification: None,
}).collect()
}
#[test]
fn create_clear() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
bc.insert(1, generate_blocks(100), PeerId::random());
assert!(!is_empty(&bc));
bc.clear();
assert!(is_empty(&bc));
}
#[test]
fn insert_blocks() {
let mut bc = BlockCollection::new();
assert!(is_empty(&bc));
let peer0 = PeerId::random();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
let blocks = generate_blocks(150);
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(1 .. 41));
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0), Some(41 .. 81));
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 0), Some(81 .. 121));
bc.clear_peer_download(&peer1);
bc.insert(41, blocks[41..81].to_vec(), peer1.clone());
assert_eq!(bc.drain(1), vec![]);
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0), Some(121 .. 151));
bc.clear_peer_download(&peer0);
bc.insert(1, blocks[1..11].to_vec(), peer0.clone());
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(11 .. 41));
assert_eq!(bc.drain(1), blocks[1..11].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::<Vec<_>>());
bc.clear_peer_download(&peer0);
bc.insert(11, blocks[11..41].to_vec(), peer0.clone());
let drained = bc.drain(12);
assert_eq!(drained[..30], blocks[11..41].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::<Vec<_>>()[..]);
assert_eq!(drained[30..], blocks[41..81].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::<Vec<_>>()[..]);
bc.clear_peer_download(&peer2);
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80), Some(81 .. 121));
bc.clear_peer_download(&peer2);
bc.insert(81, blocks[81..121].to_vec(), peer2.clone());
bc.clear_peer_download(&peer1);
bc.insert(121, blocks[121..150].to_vec(), peer1.clone());
assert_eq!(bc.drain(80), vec![]);
let drained = bc.drain(81);
assert_eq!(drained[..40], blocks[81..121].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer2.clone()) }).collect::<Vec<_>>()[..]);
assert_eq!(drained[40..], blocks[121..150].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::<Vec<_>>()[..]);
}
#[test]
fn large_gap() {
let mut bc: BlockCollection<Block> = BlockCollection::new();
bc.blocks.insert(100, BlockRangeState::Downloading {
len: 128,
downloading: 1,
});
let blocks = generate_blocks(10).into_iter().map(|b| BlockData { block: b, origin: None }).collect();
bc.blocks.insert(114305, BlockRangeState::Complete(blocks));
let peer0 = PeerId::random();
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 000), Some(1 .. 100));
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 600), Some(100 + 128 .. 100 + 128 + 128));
}
}
@@ -0,0 +1,467 @@
// Copyright 2017-2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};
use log::{trace, warn};
use client::error::Error as ClientError;
use consensus::import_queue::SharedFinalityProofRequestBuilder;
use fork_tree::ForkTree;
use network_libp2p::PeerId;
use runtime_primitives::Justification;
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use crate::protocol::message;
use crate::protocol::sync::{Context, PeerSync, PeerSyncState};
// Time to wait before trying to get the same extra data from the same peer.
const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10);
/// Pending extra data request for the given block (hash and number).
type ExtraRequest<B> = (<B as BlockT>::Hash, NumberFor<B>);
/// Extra requests processor.
pub(crate) trait ExtraRequestsEssence<B: BlockT> {
type Response;
/// Name of request type to display in logs.
fn type_name(&self) -> &'static str;
/// Send network message corresponding to the request.
fn send_network_request(&self, protocol: &mut dyn Context<B>, peer: PeerId, request: ExtraRequest<B>);
/// Create peer state for peer that is downloading extra data.
fn peer_downloading_state(&self, block: B::Hash) -> PeerSyncState<B>;
}
/// Manages all extra data requests required for sync.
pub(crate) struct ExtraRequestsAggregator<B: BlockT> {
/// Manages justifications requests.
justifications: ExtraRequests<B, JustificationsRequestsEssence>,
/// Manages finality proof requests.
finality_proofs: ExtraRequests<B, FinalityProofRequestsEssence<B>>,
}
impl<B: BlockT> ExtraRequestsAggregator<B> {
pub(crate) fn new() -> Self {
ExtraRequestsAggregator {
justifications: ExtraRequests::new(JustificationsRequestsEssence),
finality_proofs: ExtraRequests::new(FinalityProofRequestsEssence(None)),
}
}
pub(crate) fn justifications(&mut self) -> &mut ExtraRequests<B, JustificationsRequestsEssence> {
&mut self.justifications
}
pub(crate) fn finality_proofs(&mut self) -> &mut ExtraRequests<B, FinalityProofRequestsEssence<B>> {
&mut self.finality_proofs
}
/// Dispatches all possible pending requests to the given peers.
pub(crate) fn dispatch(&mut self, peers: &mut HashMap<PeerId, PeerSync<B>>, protocol: &mut dyn Context<B>) {
self.justifications.dispatch(peers, protocol);
self.finality_proofs.dispatch(peers, protocol);
}
/// Removes any pending extra requests for blocks lower than the
/// given best finalized.
pub(crate) fn on_block_finalized<F>(
&mut self,
best_finalized_hash: &B::Hash,
best_finalized_number: NumberFor<B>,
is_descendent_of: &F,
) -> Result<(), fork_tree::Error<ClientError>>
where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
{
self.justifications.on_block_finalized(best_finalized_hash, best_finalized_number, is_descendent_of)?;
self.finality_proofs.on_block_finalized(best_finalized_hash, best_finalized_number, is_descendent_of)?;
Ok(())
}
/// Retry any pending request if a peer disconnected.
pub(crate) fn peer_disconnected(&mut self, who: PeerId) {
self.justifications.peer_disconnected(&who);
self.finality_proofs.peer_disconnected(&who);
}
}
/// Manages pending block extra data (e.g. justification) requests.
/// Multiple extras may be requested for competing forks, or for the same branch
/// at different (increasing) heights. This structure will guarantee that extras
/// are fetched in-order, and that obsolete changes are pruned (when finalizing a
/// competing fork).
pub(crate) struct ExtraRequests<B: BlockT, Essence> {
tree: ForkTree<B::Hash, NumberFor<B>, ()>,
pending_requests: VecDeque<ExtraRequest<B>>,
peer_requests: HashMap<PeerId, ExtraRequest<B>>,
previous_requests: HashMap<ExtraRequest<B>, Vec<(PeerId, Instant)>>,
importing_requests: HashSet<ExtraRequest<B>>,
essence: Essence,
}
impl<B: BlockT, Essence: ExtraRequestsEssence<B>> ExtraRequests<B, Essence> {
fn new(essence: Essence) -> Self {
ExtraRequests {
tree: ForkTree::new(),
pending_requests: VecDeque::new(),
peer_requests: HashMap::new(),
previous_requests: HashMap::new(),
importing_requests: HashSet::new(),
essence,
}
}
/// Get mutable reference to the requests essence.
pub(crate) fn essence(&mut self) -> &mut Essence {
&mut self.essence
}
/// Dispatches all possible pending requests to the given peers. Peers are
/// filtered according to the current known best block (i.e. we won't send a
/// extra request for block #10 to a peer at block #2), and we also
/// throttle requests to the same peer if a previous justification request
/// yielded no results.
pub(crate) fn dispatch(&mut self, peers: &mut HashMap<PeerId, PeerSync<B>>, protocol: &mut dyn Context<B>) {
if self.pending_requests.is_empty() {
return;
}
let initial_pending_requests = self.pending_requests.len();
// clean up previous failed requests so we can retry again
for (_, requests) in self.previous_requests.iter_mut() {
requests.retain(|(_, instant)| instant.elapsed() < EXTRA_RETRY_WAIT);
}
let mut available_peers = peers.iter().filter_map(|(peer, sync)| {
// don't request to any peers that already have pending requests or are unavailable
if sync.state != PeerSyncState::Available || self.peer_requests.contains_key(&peer) {
None
} else {
Some((peer.clone(), sync.best_number))
}
}).collect::<VecDeque<_>>();
let mut last_peer = available_peers.back().map(|p| p.0.clone());
let mut unhandled_requests = VecDeque::new();
loop {
let (peer, peer_best_number) = match available_peers.pop_front() {
Some(p) => p,
_ => break,
};
// only ask peers that have synced past the block number that we're
// asking the extra for and to whom we haven't already made
// the same request recently
let peer_eligible = {
let request = match self.pending_requests.front() {
Some(r) => r.clone(),
_ => break,
};
peer_best_number >= request.1 &&
!self.previous_requests
.get(&request)
.map(|requests| requests.iter().any(|i| i.0 == peer))
.unwrap_or(false)
};
if !peer_eligible {
available_peers.push_back((peer.clone(), peer_best_number));
// we tried all peers and none can answer this request
if Some(peer) == last_peer {
last_peer = available_peers.back().map(|p| p.0.clone());
let request = self.pending_requests.pop_front()
.expect("verified to be Some in the beginning of the loop; qed");
unhandled_requests.push_back(request);
}
continue;
}
last_peer = available_peers.back().map(|p| p.0.clone());
let request = self.pending_requests.pop_front()
.expect("verified to be Some in the beginning of the loop; qed");
self.peer_requests.insert(peer.clone(), request);
peers.get_mut(&peer)
.expect("peer was is taken from available_peers; available_peers is a subset of peers; qed")
.state = self.essence.peer_downloading_state(request.0.clone());
trace!(target: "sync", "Requesting {} for block #{} from {}", self.essence.type_name(), request.0, peer);
self.essence.send_network_request(protocol, peer, request);
}
self.pending_requests.append(&mut unhandled_requests);
trace!(target: "sync", "Dispatched {} {} requests ({} pending)",
initial_pending_requests - self.pending_requests.len(),
self.essence.type_name(),
self.pending_requests.len(),
);
}
/// Queue a extra data request (without dispatching it).
pub(crate) fn queue_request<F>(&mut self, request: ExtraRequest<B>, is_descendent_of: F)
where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
{
match self.tree.import(request.0.clone(), request.1.clone(), (), &is_descendent_of) {
Ok(true) => {
// this is a new root so we add it to the current `pending_requests`
self.pending_requests.push_back((request.0, request.1));
},
Err(err) => {
warn!(target: "sync", "Failed to insert requested {} {:?} {:?} into tree: {:?}",
self.essence.type_name(),
request.0,
request.1,
err,
);
return;
},
_ => {},
}
}
/// Retry any pending request if a peer disconnected.
fn peer_disconnected(&mut self, who: &PeerId) {
if let Some(request) = self.peer_requests.remove(who) {
self.pending_requests.push_front(request);
}
}
/// Process the import result of an extra.
/// Queues a retry in case the import failed.
/// Returns true if import has been queued.
pub(crate) fn on_import_result(
&mut self,
request: (B::Hash, NumberFor<B>),
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
) -> bool {
self.try_finalize_root(request, finalization_result, true)
}
/// Processes the response for the request previously sent to the given
/// peer. Queues a retry in case the given justification
/// was `None`.
pub(crate) fn on_response(
&mut self,
who: PeerId,
response: Option<Essence::Response>,
) -> Option<(PeerId, B::Hash, NumberFor<B>, Essence::Response)> {
// we assume that the request maps to the given response, this is
// currently enforced by the outer network protocol before passing on
// messages to chain sync.
if let Some(request) = self.peer_requests.remove(&who) {
if let Some(response) = response {
self.importing_requests.insert(request);
return Some((who, request.0, request.1, response));
}
self.previous_requests
.entry(request)
.or_insert(Vec::new())
.push((who, Instant::now()));
self.pending_requests.push_front(request);
}
None
}
/// Removes any pending extra requests for blocks lower than the
/// given best finalized.
fn on_block_finalized<F>(
&mut self,
best_finalized_hash: &B::Hash,
best_finalized_number: NumberFor<B>,
is_descendent_of: F,
) -> Result<(), fork_tree::Error<ClientError>>
where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
{
let is_scheduled_root = self.try_finalize_root(
(*best_finalized_hash, best_finalized_number),
Ok((*best_finalized_hash, best_finalized_number)),
false,
);
if is_scheduled_root {
return Ok(());
}
self.tree.finalize(best_finalized_hash, best_finalized_number, &is_descendent_of)?;
let roots = self.tree.roots().collect::<HashSet<_>>();
self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &())));
self.peer_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
self.previous_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));
Ok(())
}
/// Clear all data.
pub(crate) fn clear(&mut self) {
self.tree = ForkTree::new();
self.pending_requests.clear();
self.peer_requests.clear();
self.previous_requests.clear();
}
/// Try to finalize pending root.
/// Returns true if import of this request has been scheduled.
fn try_finalize_root(
&mut self,
request: (B::Hash, NumberFor<B>),
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
reschedule_on_failure: bool,
) -> bool {
if !self.importing_requests.remove(&request) {
return false;
}
let (finalized_hash, finalized_number) = match finalization_result {
Ok((finalized_hash, finalized_number)) => (finalized_hash, finalized_number),
Err(_) => {
if reschedule_on_failure {
self.pending_requests.push_front(request);
}
return true;
},
};
if self.tree.finalize_root(&finalized_hash).is_none() {
warn!(target: "sync", "Imported {} for {:?} {:?} which isn't a root in the tree: {:?}",
self.essence.type_name(),
finalized_hash,
finalized_number,
self.tree.roots().collect::<Vec<_>>(),
);
return true;
};
self.previous_requests.clear();
self.peer_requests.clear();
self.pending_requests =
self.tree.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect();
true
}
}
pub(crate) struct JustificationsRequestsEssence;
impl<B: BlockT> ExtraRequestsEssence<B> for JustificationsRequestsEssence {
type Response = Justification;
fn type_name(&self) -> &'static str {
"justification"
}
fn send_network_request(&self, protocol: &mut dyn Context<B>, peer: PeerId, request: ExtraRequest<B>) {
protocol.send_block_request(peer, message::generic::BlockRequest {
id: 0,
fields: message::BlockAttributes::JUSTIFICATION,
from: message::FromBlock::Hash(request.0),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
})
}
fn peer_downloading_state(&self, block: B::Hash) -> PeerSyncState<B> {
PeerSyncState::DownloadingJustification(block)
}
}
pub(crate) struct FinalityProofRequestsEssence<B: BlockT>(pub Option<SharedFinalityProofRequestBuilder<B>>);
impl<B: BlockT> ExtraRequestsEssence<B> for FinalityProofRequestsEssence<B> {
type Response = Vec<u8>;
fn type_name(&self) -> &'static str {
"finality proof"
}
fn send_network_request(&self, protocol: &mut dyn Context<B>, peer: PeerId, request: ExtraRequest<B>) {
protocol.send_finality_proof_request(peer, message::generic::FinalityProofRequest {
id: 0,
block: request.0,
request: self.0.as_ref()
.map(|builder| builder.build_request_data(&request.0))
.unwrap_or_default(),
})
}
fn peer_downloading_state(&self, block: B::Hash) -> PeerSyncState<B> {
PeerSyncState::DownloadingFinalityProof(block)
}
}
#[cfg(test)]
mod tests {
use client::error::Error as ClientError;
use test_client::runtime::{Block, Hash};
use super::ExtraRequestsAggregator;
#[test]
fn request_is_rescheduled_when_earlier_block_is_finalized() {
let _ = ::env_logger::try_init();
let mut extra_requests = ExtraRequestsAggregator::<Block>::new();
let hash4 = [4; 32].into();
let hash5 = [5; 32].into();
let hash6 = [6; 32].into();
let hash7 = [7; 32].into();
fn is_descendent_of(base: &Hash, target: &Hash) -> Result<bool, ClientError> {
Ok(target[0] >= base[0])
}
// make #4 last finalized block
extra_requests.finality_proofs().tree.import(hash4, 4, (), &is_descendent_of).unwrap();
extra_requests.finality_proofs().tree.finalize_root(&hash4);
// schedule request for #6
extra_requests.finality_proofs().queue_request((hash6, 6), is_descendent_of);
// receive finality proof for #5
extra_requests.finality_proofs().importing_requests.insert((hash6, 6));
extra_requests.finality_proofs().on_block_finalized(&hash5, 5, is_descendent_of).unwrap();
extra_requests.finality_proofs().on_import_result((hash6, 6), Ok((hash5, 5)));
// ensure that request for #6 is still pending
assert_eq!(
extra_requests.finality_proofs().pending_requests.iter().collect::<Vec<_>>(),
vec![&(hash6, 6)],
);
// receive finality proof for #7
extra_requests.finality_proofs().importing_requests.insert((hash6, 6));
extra_requests.finality_proofs().on_block_finalized(&hash6, 6, is_descendent_of).unwrap();
extra_requests.finality_proofs().on_block_finalized(&hash7, 7, is_descendent_of).unwrap();
extra_requests.finality_proofs().on_import_result((hash6, 6), Ok((hash7, 7)));
// ensure that there's no request for #6
assert_eq!(
extra_requests.finality_proofs().pending_requests.iter().collect::<Vec<_>>(),
Vec::<&(Hash, u64)>::new(),
);
}
}
@@ -0,0 +1,76 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use linked_hash_set::LinkedHashSet;
use std::{hash::Hash, num::NonZeroUsize};
/// Wrapper around `LinkedHashSet` which grows bounded.
///
/// In the limit, for each element inserted the oldest existing element will be removed.
#[derive(Debug, Clone)]
pub(crate) struct LruHashSet<T: Hash + Eq> {
set: LinkedHashSet<T>,
limit: NonZeroUsize
}
impl<T: Hash + Eq> LruHashSet<T> {
/// Create a new `LruHashSet` with the given (exclusive) limit.
pub(crate) fn new(limit: NonZeroUsize) -> Self {
Self { set: LinkedHashSet::new(), limit }
}
/// Insert element into the set.
///
/// Returns `true` if this is a new element to the set, `false` otherwise.
/// Maintains the limit of the set by removing the oldest entry if necessary.
/// Inserting the same element will update its LRU position.
pub(crate) fn insert(&mut self, e: T) -> bool {
if self.set.insert(e) {
if self.set.len() == usize::from(self.limit) {
self.set.pop_front(); // remove oldest entry
}
return true
}
false
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn maintains_limit() {
let three = NonZeroUsize::new(3).unwrap();
let mut set = LruHashSet::<u8>::new(three);
// First element.
assert!(set.insert(1));
assert_eq!(vec![&1], set.set.iter().collect::<Vec<_>>());
// Second element.
assert!(set.insert(2));
assert_eq!(vec![&1, &2], set.set.iter().collect::<Vec<_>>());
// Inserting the same element updates its LRU position.
assert!(!set.insert(1));
assert_eq!(vec![&2, &1], set.set.iter().collect::<Vec<_>>());
// We reached the limit. The next element forces the oldest one out.
assert!(set.insert(3));
assert_eq!(vec![&1, &3], set.set.iter().collect::<Vec<_>>());
}
}