diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs
index 7eeb33131d..2b0b19b876 100644
--- a/substrate/client/network-gossip/src/bridge.rs
+++ b/substrate/client/network-gossip/src/bridge.rs
@@ -14,10 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see .
-use crate::Network;
-use crate::state_machine::{ConsensusGossip, Validator, TopicNotification};
+use crate::{Network, Validator};
+use crate::state_machine::{ConsensusGossip, TopicNotification};
-use sc_network::Context;
use sc_network::message::generic::ConsensusMessage;
use sc_network::{Event, ReputationChange};
@@ -36,37 +35,29 @@ pub struct GossipEngine {
struct GossipEngineInner {
state_machine: ConsensusGossip,
- context: Box + Send>,
- context_ext: Box + Send>,
+ network: Box + Send>,
}
impl GossipEngine {
/// Create a new instance.
pub fn new + Send + Clone + 'static>(
- network: N,
+ mut network: N,
executor: &impl futures::task::Spawn,
engine_id: ConsensusEngineId,
validator: Arc>,
) -> Self where B: 'static {
let mut state_machine = ConsensusGossip::new();
- let mut context = Box::new(ContextOverService {
- network: network.clone(),
- });
- let context_ext = Box::new(ContextOverService {
- network: network.clone(),
- });
// We grab the event stream before registering the notifications protocol, otherwise we
// might miss events.
let event_stream = network.event_stream();
network.register_notifications_protocol(engine_id);
- state_machine.register_validator(&mut *context, engine_id, validator);
+ state_machine.register_validator(&mut network, engine_id, validator);
let inner = Arc::new(Mutex::new(GossipEngineInner {
state_machine,
- context,
- context_ext,
+ network: Box::new(network),
}));
let gossip_engine = GossipEngine {
@@ -82,7 +73,7 @@ impl GossipEngine {
if let Some(inner) = inner.upgrade() {
let mut inner = inner.lock();
let inner = &mut *inner;
- inner.state_machine.tick(&mut *inner.context);
+ inner.state_machine.tick(&mut *inner.network);
} else {
// We reach this branch if the `Arc` has no reference
// left. We can now let the task end.
@@ -107,7 +98,7 @@ impl GossipEngine {
}
let mut inner = inner.lock();
let inner = &mut *inner;
- inner.state_machine.new_peer(&mut *inner.context, remote, roles);
+ inner.state_machine.new_peer(&mut *inner.network, remote, roles);
}
Event::NotificationsStreamClosed { remote, engine_id: msg_engine_id } => {
if msg_engine_id != engine_id {
@@ -115,13 +106,13 @@ impl GossipEngine {
}
let mut inner = inner.lock();
let inner = &mut *inner;
- inner.state_machine.peer_disconnected(&mut *inner.context, remote);
+ inner.state_machine.peer_disconnected(&mut *inner.network, remote);
},
Event::NotificationsReceived { remote, messages } => {
let mut inner = inner.lock();
let inner = &mut *inner;
inner.state_machine.on_incoming(
- &mut *inner.context,
+ &mut *inner.network,
remote,
messages.into_iter()
.filter_map(|(engine, data)| if engine == engine_id {
@@ -144,7 +135,7 @@ impl GossipEngine {
}
pub fn report(&self, who: PeerId, reputation: ReputationChange) {
- self.inner.lock().context.report_peer(who, reputation);
+ self.inner.lock().network.report_peer(who, reputation);
}
/// Registers a message without propagating it to any peers. The message
@@ -169,7 +160,7 @@ impl GossipEngine {
pub fn broadcast_topic(&self, topic: B::Hash, force: bool) {
let mut inner = self.inner.lock();
let inner = &mut *inner;
- inner.state_machine.broadcast_topic(&mut *inner.context, topic, force);
+ inner.state_machine.broadcast_topic(&mut *inner.network, topic, force);
}
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile).
@@ -188,7 +179,7 @@ impl GossipEngine {
) {
let mut inner = self.inner.lock();
let inner = &mut *inner;
- inner.state_machine.send_topic(&mut *inner.context, who, topic, self.engine_id, force)
+ inner.state_machine.send_topic(&mut *inner.network, who, topic, self.engine_id, force)
}
/// Multicast a message to all peers.
@@ -205,7 +196,7 @@ impl GossipEngine {
let mut inner = self.inner.lock();
let inner = &mut *inner;
- inner.state_machine.multicast(&mut *inner.context, topic, message, force)
+ inner.state_machine.multicast(&mut *inner.network, topic, message, force)
}
/// Send addressed message to the given peers. The message is not kept or multicast
@@ -215,7 +206,7 @@ impl GossipEngine {
let inner = &mut *inner;
for who in &who {
- inner.state_machine.send_message(&mut *inner.context, who, ConsensusMessage {
+ inner.state_machine.send_message(&mut *inner.network, who, ConsensusMessage {
engine_id: self.engine_id,
data: data.clone(),
});
@@ -227,7 +218,7 @@ impl GossipEngine {
/// Note: this method isn't strictly related to gossiping and should eventually be moved
/// somewhere else.
pub fn announce(&self, block: B::Hash, associated_data: Vec) {
- self.inner.lock().context_ext.announce(block, associated_data);
+ self.inner.lock().network.announce(block, associated_data);
}
}
@@ -239,40 +230,3 @@ impl Clone for GossipEngine {
}
}
}
-
-struct ContextOverService {
- network: N,
-}
-
-impl> Context for ContextOverService {
- fn report_peer(&mut self, who: PeerId, reputation: ReputationChange) {
- self.network.report_peer(who, reputation);
- }
-
- fn disconnect_peer(&mut self, who: PeerId) {
- self.network.disconnect_peer(who)
- }
-
- fn send_consensus(&mut self, who: PeerId, messages: Vec) {
- for message in messages {
- self.network.write_notification(who.clone(), message.engine_id, message.data);
- }
- }
-
- fn send_chain_specific(&mut self, _: PeerId, _: Vec) {
- log::error!(
- target: "sub-libp2p",
- "send_chain_specific has been called in a context where it shouldn't"
- );
- }
-}
-
-trait ContextExt {
- fn announce(&self, block: B::Hash, associated_data: Vec);
-}
-
-impl> ContextExt for ContextOverService {
- fn announce(&self, block: B::Hash, associated_data: Vec) {
- Network::announce(&self.network, block, associated_data)
- }
-}
diff --git a/substrate/client/network-gossip/src/lib.rs b/substrate/client/network-gossip/src/lib.rs
index f7b360f939..705a27210a 100644
--- a/substrate/client/network-gossip/src/lib.rs
+++ b/substrate/client/network-gossip/src/lib.rs
@@ -55,9 +55,8 @@
//! used to inform peers of a current view of protocol state.
pub use self::bridge::GossipEngine;
-pub use self::state_machine::{TopicNotification, MessageIntent};
-pub use self::state_machine::{Validator, ValidatorContext, ValidationResult};
-pub use self::state_machine::DiscardAll;
+pub use self::state_machine::TopicNotification;
+pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext, ValidationResult};
use futures::prelude::*;
use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange};
@@ -66,6 +65,7 @@ use std::sync::Arc;
mod bridge;
mod state_machine;
+mod validator;
/// Abstraction over a network.
pub trait Network {
diff --git a/substrate/client/network-gossip/src/state_machine.rs b/substrate/client/network-gossip/src/state_machine.rs
index 3e54e452db..d1931b1bd2 100644
--- a/substrate/client/network-gossip/src/state_machine.rs
+++ b/substrate/client/network-gossip/src/state_machine.rs
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see .
+use crate::{Network, MessageIntent, Validator, ValidatorContext, ValidationResult};
+
use std::collections::{HashMap, HashSet, hash_map::Entry};
use std::sync::Arc;
use std::iter;
@@ -25,7 +27,6 @@ use libp2p::PeerId;
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use sp_runtime::ConsensusEngineId;
pub use sc_network::message::generic::{Message, ConsensusMessage};
-use sc_network::Context;
use sc_network::config::Roles;
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
@@ -67,62 +68,23 @@ struct MessageEntry {
sender: Option,
}
-/// The reason for sending out the message.
-#[derive(Eq, PartialEq, Copy, Clone)]
-#[cfg_attr(test, derive(Debug))]
-pub enum MessageIntent {
- /// Requested broadcast.
- Broadcast,
- /// Requested broadcast to all peers.
- ForcedBroadcast,
- /// Periodic rebroadcast of all messages to all peers.
- PeriodicRebroadcast,
-}
-
-/// Message validation result.
-pub enum ValidationResult {
- /// Message should be stored and propagated under given topic.
- ProcessAndKeep(H),
- /// Message should be processed, but not propagated.
- ProcessAndDiscard(H),
- /// Message should be ignored.
- Discard,
-}
-
-impl MessageIntent {
- fn broadcast() -> MessageIntent {
- MessageIntent::Broadcast
- }
-}
-
-/// Validation context. Allows reacting to incoming messages by sending out further messages.
-pub trait ValidatorContext {
- /// Broadcast all messages with given topic to peers that do not have it yet.
- fn broadcast_topic(&mut self, topic: B::Hash, force: bool);
- /// Broadcast a message to all peers that have not received it previously.
- fn broadcast_message(&mut self, topic: B::Hash, message: Vec, force: bool);
- /// Send addressed message to a peer.
- fn send_message(&mut self, who: &PeerId, message: Vec);
- /// Send all messages with given topic to a peer.
- fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool);
-}
-
+/// Local implementation of `ValidatorContext`.
struct NetworkContext<'g, 'p, B: BlockT> {
gossip: &'g mut ConsensusGossip,
- protocol: &'p mut dyn Context,
+ network: &'p mut dyn Network,
engine_id: ConsensusEngineId,
}
impl<'g, 'p, B: BlockT> ValidatorContext for NetworkContext<'g, 'p, B> {
/// Broadcast all messages with given topic to peers that do not have it yet.
fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
- self.gossip.broadcast_topic(self.protocol, topic, force);
+ self.gossip.broadcast_topic(self.network, topic, force);
}
/// Broadcast a message to all peers that have not received it previously.
fn broadcast_message(&mut self, topic: B::Hash, message: Vec, force: bool) {
self.gossip.multicast(
- self.protocol,
+ self.network,
topic,
ConsensusMessage{ data: message, engine_id: self.engine_id.clone() },
force,
@@ -131,20 +93,17 @@ impl<'g, 'p, B: BlockT> ValidatorContext for NetworkContext<'g, 'p, B> {
/// Send addressed message to a peer.
fn send_message(&mut self, who: &PeerId, message: Vec) {
- self.protocol.send_consensus(who.clone(), vec![ConsensusMessage {
- engine_id: self.engine_id,
- data: message,
- }]);
+ self.network.write_notification(who.clone(), self.engine_id, message);
}
/// Send all messages with given topic to a peer.
fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) {
- self.gossip.send_topic(self.protocol, who, topic, self.engine_id, force);
+ self.gossip.send_topic(self.network, who, topic, self.engine_id, force);
}
}
fn propagate<'a, B: BlockT, I>(
- protocol: &mut dyn Context,
+ network: &mut dyn Network,
messages: I,
intent: MessageIntent,
peers: &mut HashMap>,
@@ -168,7 +127,6 @@ fn propagate<'a, B: BlockT, I>(
};
for (id, ref mut peer) in peers.iter_mut() {
- let mut batch = Vec::new();
for (message_hash, topic, message) in messages.clone() {
let intent = match intent {
MessageIntent::Broadcast { .. } =>
@@ -195,38 +153,8 @@ fn propagate<'a, B: BlockT, I>(
peer.known_messages.insert(message_hash.clone());
trace!(target: "gossip", "Propagating to {}: {:?}", id, message);
- batch.push(message.clone())
+ network.write_notification(id.clone(), message.engine_id, message.data.clone());
}
- protocol.send_consensus(id.clone(), batch);
- }
-}
-
-/// Validates consensus messages.
-pub trait Validator: Send + Sync {
- /// New peer is connected.
- fn new_peer(&self, _context: &mut dyn ValidatorContext, _who: &PeerId, _roles: Roles) {
- }
-
- /// New connection is dropped.
- fn peer_disconnected(&self, _context: &mut dyn ValidatorContext, _who: &PeerId) {
- }
-
- /// Validate consensus message.
- fn validate(
- &self,
- context: &mut dyn ValidatorContext,
- sender: &PeerId,
- data: &[u8]
- ) -> ValidationResult;
-
- /// Produce a closure for validating messages on a given topic.
- fn message_expired<'a>(&'a self) -> Box bool + 'a> {
- Box::new(move |_topic, _data| false)
- }
-
- /// Produce a closure for filtering egress messages.
- fn message_allowed<'a>(&'a self) -> Box bool + 'a> {
- Box::new(move |_who, _intent, _topic, _data| true)
}
}
@@ -256,14 +184,14 @@ impl ConsensusGossip {
/// Register message validator for a message type.
pub fn register_validator(
&mut self,
- protocol: &mut dyn Context,
+ network: &mut dyn Network,
engine_id: ConsensusEngineId,
validator: Arc>
) {
self.register_validator_internal(engine_id, validator.clone());
let peers: Vec<_> = self.peers.iter().map(|(id, peer)| (id.clone(), peer.roles)).collect();
for (id, roles) in peers {
- let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
+ let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
validator.new_peer(&mut context, &id, roles);
}
}
@@ -273,7 +201,7 @@ impl ConsensusGossip {
}
/// Handle new connected peer.
- pub fn new_peer(&mut self, protocol: &mut dyn Context, who: PeerId, roles: Roles) {
+ pub fn new_peer(&mut self, network: &mut dyn Network, who: PeerId, roles: Roles) {
// light nodes are not valid targets for consensus gossip messages
if !roles.is_full() {
return;
@@ -285,7 +213,7 @@ impl ConsensusGossip {
roles,
});
for (engine_id, v) in self.validators.clone() {
- let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
+ let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
v.new_peer(&mut context, &who, roles);
}
}
@@ -322,37 +250,37 @@ impl ConsensusGossip {
}
/// Call when a peer has been disconnected to stop tracking gossip status.
- pub fn peer_disconnected(&mut self, protocol: &mut dyn Context, who: PeerId) {
+ pub fn peer_disconnected(&mut self, network: &mut dyn Network, who: PeerId) {
for (engine_id, v) in self.validators.clone() {
- let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() };
+ let mut context = NetworkContext { gossip: self, network, engine_id: engine_id.clone() };
v.peer_disconnected(&mut context, &who);
}
}
/// Perform periodic maintenance
- pub fn tick(&mut self, protocol: &mut dyn Context) {
+ pub fn tick(&mut self, network: &mut dyn Network) {
self.collect_garbage();
if time::Instant::now() >= self.next_broadcast {
- self.rebroadcast(protocol);
+ self.rebroadcast(network);
self.next_broadcast = time::Instant::now() + REBROADCAST_INTERVAL;
}
}
/// Rebroadcast all messages to all peers.
- fn rebroadcast(&mut self, protocol: &mut dyn Context) {
+ fn rebroadcast(&mut self, network: &mut dyn Network) {
let messages = self.messages.iter()
.map(|entry| (&entry.message_hash, &entry.topic, &entry.message));
- propagate(protocol, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators);
+ propagate(network, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators);
}
/// Broadcast all messages with given topic.
- pub fn broadcast_topic(&mut self, protocol: &mut dyn Context, topic: B::Hash, force: bool) {
+ pub fn broadcast_topic(&mut self, network: &mut dyn Network, topic: B::Hash, force: bool) {
let messages = self.messages.iter()
.filter_map(|entry|
if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None }
);
- let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() };
- propagate(protocol, messages, intent, &mut self.peers, &self.validators);
+ let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
+ propagate(network, messages, intent, &mut self.peers, &self.validators);
}
/// Prune old or no longer relevant consensus messages. Provide a predicate
@@ -420,7 +348,7 @@ impl ConsensusGossip {
/// in all other cases.
pub fn on_incoming(
&mut self,
- protocol: &mut dyn Context,
+ network: &mut dyn Network,
who: PeerId,
messages: Vec,
) {
@@ -430,7 +358,7 @@ impl ConsensusGossip {
if self.known_messages.contains(&message_hash) {
trace!(target:"gossip", "Ignored already known message from {}", who);
- protocol.report_peer(who.clone(), rep::DUPLICATE_GOSSIP);
+ network.report_peer(who.clone(), rep::DUPLICATE_GOSSIP);
continue;
}
@@ -439,7 +367,7 @@ impl ConsensusGossip {
let validation = self.validators.get(&engine_id)
.cloned()
.map(|v| {
- let mut context = NetworkContext { gossip: self, protocol, engine_id };
+ let mut context = NetworkContext { gossip: self, network, engine_id };
v.validate(&mut context, &who, &message.data)
});
@@ -449,14 +377,14 @@ impl ConsensusGossip {
Some(ValidationResult::Discard) => None,
None => {
trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who);
- protocol.report_peer(who.clone(), rep::UNKNOWN_GOSSIP);
- protocol.disconnect_peer(who.clone());
+ network.report_peer(who.clone(), rep::UNKNOWN_GOSSIP);
+ network.disconnect_peer(who.clone());
continue;
}
};
if let Some((topic, keep)) = validation_result {
- protocol.report_peer(who.clone(), rep::GOSSIP_SUCCESS);
+ network.report_peer(who.clone(), rep::GOSSIP_SUCCESS);
if let Some(ref mut peer) = self.peers.get_mut(&who) {
peer.known_messages.insert(message_hash);
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) {
@@ -479,7 +407,7 @@ impl ConsensusGossip {
}
} else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
- protocol.report_peer(who.clone(), rep::UNREGISTERED_TOPIC);
+ network.report_peer(who.clone(), rep::UNREGISTERED_TOPIC);
}
} else {
trace!(target:"gossip", "Handled valid one hop message from peer {}", who);
@@ -490,7 +418,7 @@ impl ConsensusGossip {
/// Send all messages with given topic to a peer.
pub fn send_topic(
&mut self,
- protocol: &mut dyn Context,
+ network: &mut dyn Network,
who: &PeerId,
topic: B::Hash,
engine_id: ConsensusEngineId,
@@ -503,7 +431,6 @@ impl ConsensusGossip {
};
if let Some(ref mut peer) = self.peers.get_mut(who) {
- let mut batch = Vec::new();
for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) {
let intent = if force {
MessageIntent::ForcedBroadcast
@@ -522,34 +449,30 @@ impl ConsensusGossip {
peer.known_messages.insert(entry.message_hash.clone());
trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message);
- batch.push(ConsensusMessage {
- engine_id: engine_id.clone(),
- data: entry.message.data.clone(),
- });
+ network.write_notification(who.clone(), engine_id, entry.message.data.clone());
}
- protocol.send_consensus(who.clone(), batch);
}
}
/// Multicast a message to all peers.
pub fn multicast(
&mut self,
- protocol: &mut dyn Context,
+ network: &mut dyn Network,
topic: B::Hash,
message: ConsensusMessage,
force: bool,
) {
let message_hash = HashFor::::hash(&message.data);
self.register_message_hashed(message_hash, topic, message.clone(), None);
- let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::broadcast() };
- propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
+ let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
+ propagate(network, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
}
/// Send addressed message to a peer. The message is not kept or multicast
/// later on.
pub fn send_message(
&mut self,
- protocol: &mut dyn Context,
+ network: &mut dyn Network,
who: &PeerId,
message: ConsensusMessage,
) {
@@ -563,29 +486,7 @@ impl ConsensusGossip {
trace!(target: "gossip", "Sending direct to {}: {:?}", who, message);
peer.known_messages.insert(message_hash);
- protocol.send_consensus(who.clone(), vec![message.clone()]);
- }
-}
-
-/// A gossip message validator that discards all messages.
-pub struct DiscardAll;
-
-impl Validator for DiscardAll {
- fn validate(
- &self,
- _context: &mut dyn ValidatorContext,
- _sender: &PeerId,
- _data: &[u8],
- ) -> ValidationResult {
- ValidationResult::Discard
- }
-
- fn message_expired<'a>(&'a self) -> Box bool + 'a> {
- Box::new(move |_topic, _data| true)
- }
-
- fn message_allowed<'a>(&'a self) -> Box bool + 'a> {
- Box::new(move |_who, _intent, _topic, _data| false)
+ network.write_notification(who.clone(), message.engine_id, message.data);
}
}
diff --git a/substrate/client/network-gossip/src/validator.rs b/substrate/client/network-gossip/src/validator.rs
new file mode 100644
index 0000000000..74b5307ee9
--- /dev/null
+++ b/substrate/client/network-gossip/src/validator.rs
@@ -0,0 +1,103 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+use sc_network::{config::Roles, PeerId};
+use sp_runtime::traits::Block as BlockT;
+
+/// Validates consensus messages.
+pub trait Validator: Send + Sync {
+ /// New peer is connected.
+ fn new_peer(&self, _context: &mut dyn ValidatorContext, _who: &PeerId, _roles: Roles) {
+ }
+
+ /// New connection is dropped.
+ fn peer_disconnected(&self, _context: &mut dyn ValidatorContext, _who: &PeerId) {
+ }
+
+ /// Validate consensus message.
+ fn validate(
+ &self,
+ context: &mut dyn ValidatorContext,
+ sender: &PeerId,
+ data: &[u8]
+ ) -> ValidationResult;
+
+ /// Produce a closure for validating messages on a given topic.
+ fn message_expired<'a>(&'a self) -> Box bool + 'a> {
+ Box::new(move |_topic, _data| false)
+ }
+
+ /// Produce a closure for filtering egress messages.
+ fn message_allowed<'a>(&'a self) -> Box bool + 'a> {
+ Box::new(move |_who, _intent, _topic, _data| true)
+ }
+}
+
+/// Validation context. Allows reacting to incoming messages by sending out further messages.
+pub trait ValidatorContext {
+ /// Broadcast all messages with given topic to peers that do not have it yet.
+ fn broadcast_topic(&mut self, topic: B::Hash, force: bool);
+ /// Broadcast a message to all peers that have not received it previously.
+ fn broadcast_message(&mut self, topic: B::Hash, message: Vec, force: bool);
+ /// Send addressed message to a peer.
+ fn send_message(&mut self, who: &PeerId, message: Vec);
+ /// Send all messages with given topic to a peer.
+ fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool);
+}
+
+/// The reason for sending out the message.
+#[derive(Eq, PartialEq, Copy, Clone)]
+#[cfg_attr(test, derive(Debug))]
+pub enum MessageIntent {
+ /// Requested broadcast.
+ Broadcast,
+ /// Requested broadcast to all peers.
+ ForcedBroadcast,
+ /// Periodic rebroadcast of all messages to all peers.
+ PeriodicRebroadcast,
+}
+
+/// Message validation result.
+pub enum ValidationResult {
+ /// Message should be stored and propagated under given topic.
+ ProcessAndKeep(H),
+ /// Message should be processed, but not propagated.
+ ProcessAndDiscard(H),
+ /// Message should be ignored.
+ Discard,
+}
+
+/// A gossip message validator that discards all messages.
+pub struct DiscardAll;
+
+impl Validator for DiscardAll {
+ fn validate(
+ &self,
+ _context: &mut dyn ValidatorContext,
+ _sender: &PeerId,
+ _data: &[u8],
+ ) -> ValidationResult {
+ ValidationResult::Discard
+ }
+
+ fn message_expired<'a>(&'a self) -> Box bool + 'a> {
+ Box::new(move |_topic, _data| true)
+ }
+
+ fn message_allowed<'a>(&'a self) -> Box bool + 'a> {
+ Box::new(move |_who, _intent, _topic, _data| false)
+ }
+}
diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs
index 983cdd25a8..b712ebe515 100644
--- a/substrate/client/network/src/protocol.rs
+++ b/substrate/client/network/src/protocol.rs
@@ -80,8 +80,6 @@ pub(crate) const MIN_VERSION: u32 = 3;
// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
-// Maximum allowed entries in `ConsensusBatch`
-const MAX_CONSENSUS_MESSAGES: usize = 256;
/// When light node connects to the full node and the full node is behind light node
/// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it unuseful
/// and disconnect to free connection slot.
@@ -327,9 +325,6 @@ pub trait Context {
/// Force disconnecting from a peer. Use this when a peer misbehaved.
fn disconnect_peer(&mut self, who: PeerId);
- /// Send a consensus message to a peer.
- fn send_consensus(&mut self, who: PeerId, messages: Vec);
-
/// Send a chain-specific message to a peer.
fn send_chain_specific(&mut self, who: PeerId, message: Vec);
}
@@ -360,35 +355,6 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B,
self.behaviour.disconnect_peer(&who)
}
- fn send_consensus(&mut self, who: PeerId, messages: Vec) {
- if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 4) {
- let mut batch = Vec::new();
- let len = messages.len();
- for (index, message) in messages.into_iter().enumerate() {
- batch.reserve(MAX_CONSENSUS_MESSAGES);
- batch.push(message);
- if batch.len() == MAX_CONSENSUS_MESSAGES || index == len - 1 {
- send_message:: (
- self.behaviour,
- &mut self.context_data.stats,
- &who,
- GenericMessage::ConsensusBatch(std::mem::replace(&mut batch, Vec::new())),
- )
- }
- }
- } else {
- // Backwards compatibility
- for message in messages {
- send_message:: (
- self.behaviour,
- &mut self.context_data.stats,
- &who,
- GenericMessage::Consensus(message)
- )
- }
- }
- }
-
fn send_chain_specific(&mut self, who: PeerId, message: Vec) {
send_message:: (
self.behaviour,