Clean-ups in the network-gossip crate (#4542)

* Remove usage of sc_network::Context trait

* Remove Context::send_consensus

* Pass &mut dyn Network instead of &dyn Network

* Move Validator traits and related to separate module
This commit is contained in:
Pierre Krieger
2020-01-09 19:24:51 +01:00
committed by Gavin Wood
parent b61b3095ee
commit bc3d283e78
5 changed files with 159 additions and 235 deletions
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{Network, MessageIntent, Validator, ValidatorContext, ValidationResult};
use std::collections::{HashMap, HashSet, hash_map::Entry};
use std::sync::Arc;
use std::iter;
@@ -25,7 +27,6 @@ use libp2p::PeerId;
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use sp_runtime::ConsensusEngineId;
pub use sc_network::message::generic::{Message, ConsensusMessage};
use sc_network::Context;
use sc_network::config::Roles;
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
@@ -67,62 +68,23 @@ struct MessageEntry<B: BlockT> {
sender: Option<PeerId>,
}
/// The reason for sending out the message.
#[derive(Eq, PartialEq, Copy, Clone)]
#[cfg_attr(test, derive(Debug))]
pub enum MessageIntent {
/// Requested broadcast.
Broadcast,
/// Requested broadcast to all peers.
ForcedBroadcast,
/// Periodic rebroadcast of all messages to all peers.
PeriodicRebroadcast,
}
/// Message validation result.
pub enum ValidationResult<H> {
/// Message should be stored and propagated under given topic.
ProcessAndKeep(H),
/// Message should be processed, but not propagated.
ProcessAndDiscard(H),
/// Message should be ignored.
Discard,
}
impl MessageIntent {
fn broadcast() -> MessageIntent {
MessageIntent::Broadcast
}
}
/// Validation context. Allows reacting to incoming messages by sending out further messages.
pub trait ValidatorContext<B: BlockT> {
/// Broadcast all messages with given topic to peers that do not have it yet.
fn broadcast_topic(&mut self, topic: B::Hash, force: bool);
/// Broadcast a message to all peers that have not received it previously.
fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool);
/// Send addressed message to a peer.
fn send_message(&mut self, who: &PeerId, message: Vec<u8>);
/// Send all messages with given topic to a peer.
fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool);
}
/// Local implementation of `ValidatorContext`.
struct NetworkContext<'g, 'p, B: BlockT> {
gossip: &'g mut ConsensusGossip<B>,
protocol: &'p mut dyn Context<B>,
network: &'p mut dyn Network<B>,
engine_id: ConsensusEngineId,
}
impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
/// Broadcast all messages with given topic to peers that do not have it yet.
fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
self.gossip.broadcast_topic(self.protocol, topic, force);
self.gossip.broadcast_topic(self.network, topic, force);
}
/// Broadcast a message to all peers that have not received it previously.
fn broadcast_message(&mut self, topic: B::Hash, message: Vec<u8>, force: bool) {
self.gossip.multicast(
self.protocol,
self.network,
topic,
ConsensusMessage{ data: message, engine_id: self.engine_id.clone() },
force,
@@ -131,20 +93,17 @@ impl<'g, 'p, B: BlockT> ValidatorContext<B> for NetworkContext<'g, 'p, B> {
/// Send addressed message to a peer.
fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
self.protocol.send_consensus(who.clone(), vec![ConsensusMessage {
engine_id: self.engine_id,
data: message,
}]);
self.network.write_notification(who.clone(), self.engine_id, message);
}
/// Send all messages with given topic to a peer.
fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
self.gossip.send_topic(self.protocol, who, topic, self.engine_id, force);
self.gossip.send_topic(self.network, who, topic, self.engine_id, force);
}
}
fn propagate<'a, B: BlockT, I>(
protocol: &mut dyn Context<B>,
network: &mut dyn Network<B>,
messages: I,
intent: MessageIntent,
peers: &mut HashMap<PeerId, PeerConsensus<B::Hash>>,
@@ -168,7 +127,6 @@ fn propagate<'a, B: BlockT, I>(
};
for (id, ref mut peer) in peers.iter_mut() {
let mut batch = Vec::new();
for (message_hash, topic, message) in messages.clone() {
let intent = match intent {
MessageIntent::Broadcast { .. } =>
@@ -195,38 +153,8 @@ fn propagate<'a, B: BlockT, I>(
peer.known_messages.insert(message_hash.clone());
trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
batch.push(message.clone())
network.write_notification(id.clone(), message.engine_id, message.data.clone());
}
protocol.send_consensus(id.clone(), batch);
}
}
/// Validates consensus messages.
pub trait Validator<B: BlockT>: Send + Sync {
/// New peer is connected.
fn new_peer(&self, _context: &mut dyn ValidatorContext<B>, _who: &PeerId, _roles: Roles) {
}
/// New connection is dropped.
fn peer_disconnected(&self, _context: &mut dyn ValidatorContext<B>, _who: &PeerId) {
}
/// Validate consensus message.
fn validate(
&self,
context: &mut dyn ValidatorContext<B>,
sender: &PeerId,
data: &[u8]
) -> ValidationResult<B::Hash>;
/// Produce a closure for validating messages on a given topic.
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
Box::new(move |_topic, _data| false)
}
/// Produce a closure for filtering egress messages.
fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
Box::new(move |_who, _intent, _topic, _data| true)
}
}
@@ -256,14 +184,14 @@ impl<B: BlockT> ConsensusGossip<B> {
/// Register message validator for a message type.
pub fn register_validator(
&mut self,
protocol: &mut dyn Context<B>,
network: &mut dyn Network<B>,
engine_id: ConsensusEngineId,
validator: Arc<dyn Validator<B>>
) {
self.register_validator_internal(engine_id, validator.clone());
let peers: Vec<_> = self.peers.iter().map(|(id, peer)| (id.clone(), peer.roles)).collect();
for (id, roles) in peers {
let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
validator.new_peer(&mut context, &id, roles);
}
}
@@ -273,7 +201,7 @@ impl<B: BlockT> ConsensusGossip<B> {
}
/// Handle new connected peer.
pub fn new_peer(&mut self, protocol: &mut dyn Context<B>, who: PeerId, roles: Roles) {
pub fn new_peer(&mut self, network: &mut dyn Network<B>, who: PeerId, roles: Roles) {
// light nodes are not valid targets for consensus gossip messages
if !roles.is_full() {
return;
@@ -285,7 +213,7 @@ impl<B: BlockT> ConsensusGossip<B> {
roles,
});
for (engine_id, v) in self.validators.clone() {
let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
v.new_peer(&mut context, &who, roles);
}
}
@@ -322,37 +250,37 @@ impl<B: BlockT> ConsensusGossip<B> {
}
/// Call when a peer has been disconnected to stop tracking gossip status.
pub fn peer_disconnected(&mut self, protocol: &mut dyn Context<B>, who: PeerId) {
pub fn peer_disconnected(&mut self, network: &mut dyn Network<B>, who: PeerId) {
for (engine_id, v) in self.validators.clone() {
let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
v.peer_disconnected(&mut context, &who);
}
}
/// Perform periodic maintenance
pub fn tick(&mut self, protocol: &mut dyn Context<B>) {
pub fn tick(&mut self, network: &mut dyn Network<B>) {
self.collect_garbage();
if time::Instant::now() >= self.next_broadcast {
self.rebroadcast(protocol);
self.rebroadcast(network);
self.next_broadcast = time::Instant::now() + REBROADCAST_INTERVAL;
}
}
/// Rebroadcast all messages to all peers.
fn rebroadcast(&mut self, protocol: &mut dyn Context<B>) {
fn rebroadcast(&mut self, network: &mut dyn Network<B>) {
let messages = self.messages.iter()
.map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
propagate(protocol, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators);
propagate(network, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators);
}
/// Broadcast all messages with given topic.
pub fn broadcast_topic(&mut self, protocol: &mut dyn Context<B>, topic: B::Hash, force: bool) {
pub fn broadcast_topic(&mut self, network: &mut dyn Network<B>, topic: B::Hash, force: bool) {
let messages = self.messages.iter()
.filter_map(|entry|
if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None }
);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() };
propagate(protocol, messages, intent, &mut self.peers, &self.validators);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
propagate(network, messages, intent, &mut self.peers, &self.validators);
}
/// Prune old or no longer relevant consensus messages. Provide a predicate
@@ -420,7 +348,7 @@ impl<B: BlockT> ConsensusGossip<B> {
/// in all other cases.
pub fn on_incoming(
&mut self,
protocol: &mut dyn Context<B>,
network: &mut dyn Network<B>,
who: PeerId,
messages: Vec<ConsensusMessage>,
) {
@@ -430,7 +358,7 @@ impl<B: BlockT> ConsensusGossip<B> {
if self.known_messages.contains(&message_hash) {
trace!(target:"gossip", "Ignored already known message from {}", who);
protocol.report_peer(who.clone(), rep::DUPLICATE_GOSSIP);
network.report_peer(who.clone(), rep::DUPLICATE_GOSSIP);
continue;
}
@@ -439,7 +367,7 @@ impl<B: BlockT> ConsensusGossip<B> {
let validation = self.validators.get(&engine_id)
.cloned()
.map(|v| {
let mut context = NetworkContext { gossip: self, protocol, engine_id };
let mut context = NetworkContext { gossip: self, network, engine_id };
v.validate(&mut context, &who, &message.data)
});
@@ -449,14 +377,14 @@ impl<B: BlockT> ConsensusGossip<B> {
Some(ValidationResult::Discard) => None,
None => {
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
protocol.report_peer(who.clone(), rep::UNKNOWN_GOSSIP);
protocol.disconnect_peer(who.clone());
network.report_peer(who.clone(), rep::UNKNOWN_GOSSIP);
network.disconnect_peer(who.clone());
continue;
}
};
if let Some((topic, keep)) = validation_result {
protocol.report_peer(who.clone(), rep::GOSSIP_SUCCESS);
network.report_peer(who.clone(), rep::GOSSIP_SUCCESS);
if let Some(ref mut peer) = self.peers.get_mut(&who) {
peer.known_messages.insert(message_hash);
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
@@ -479,7 +407,7 @@ impl<B: BlockT> ConsensusGossip<B> {
}
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
protocol.report_peer(who.clone(), rep::UNREGISTERED_TOPIC);
network.report_peer(who.clone(), rep::UNREGISTERED_TOPIC);
}
} else {
trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
@@ -490,7 +418,7 @@ impl<B: BlockT> ConsensusGossip<B> {
/// Send all messages with given topic to a peer.
pub fn send_topic(
&mut self,
protocol: &mut dyn Context<B>,
network: &mut dyn Network<B>,
who: &PeerId,
topic: B::Hash,
engine_id: ConsensusEngineId,
@@ -503,7 +431,6 @@ impl<B: BlockT> ConsensusGossip<B> {
};
if let Some(ref mut peer) = self.peers.get_mut(who) {
let mut batch = Vec::new();
for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
let intent = if force {
MessageIntent::ForcedBroadcast
@@ -522,34 +449,30 @@ impl<B: BlockT> ConsensusGossip<B> {
peer.known_messages.insert(entry.message_hash.clone());
trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
batch.push(ConsensusMessage {
engine_id: engine_id.clone(),
data: entry.message.data.clone(),
});
network.write_notification(who.clone(), engine_id, entry.message.data.clone());
}
protocol.send_consensus(who.clone(), batch);
}
}
/// Multicast a message to all peers.
pub fn multicast(
&mut self,
protocol: &mut dyn Context<B>,
network: &mut dyn Network<B>,
topic: B::Hash,
message: ConsensusMessage,
force: bool,
) {
let message_hash = HashFor::<B>::hash(&message.data);
self.register_message_hashed(message_hash, topic, message.clone(), None);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() };
propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
propagate(network, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
}
/// Send addressed message to a peer. The message is not kept or multicast
/// later on.
pub fn send_message(
&mut self,
protocol: &mut dyn Context<B>,
network: &mut dyn Network<B>,
who: &PeerId,
message: ConsensusMessage,
) {
@@ -563,29 +486,7 @@ impl<B: BlockT> ConsensusGossip<B> {
trace!(target: "gossip", "Sending direct to {}: {:?}", who, message);
peer.known_messages.insert(message_hash);
protocol.send_consensus(who.clone(), vec![message.clone()]);
}
}
/// A gossip message validator that discards all messages.
pub struct DiscardAll;
impl<B: BlockT> Validator<B> for DiscardAll {
fn validate(
&self,
_context: &mut dyn ValidatorContext<B>,
_sender: &PeerId,
_data: &[u8],
) -> ValidationResult<B::Hash> {
ValidationResult::Discard
}
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(B::Hash, &[u8]) -> bool + 'a> {
Box::new(move |_topic, _data| true)
}
fn message_allowed<'a>(&'a self) -> Box<dyn FnMut(&PeerId, MessageIntent, &B::Hash, &[u8]) -> bool + 'a> {
Box::new(move |_who, _intent, _topic, _data| false)
network.write_notification(who.clone(), message.engine_id, message.data);
}
}