diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index 85e06a1d6f..0b3a58bba2 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -17,7 +17,6 @@ use crate::{Network, Validator}; use crate::state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENANCE_INTERVAL}; -use sc_network::message::generic::ConsensusMessage; use sc_network::{Event, ReputationChange}; use futures::{prelude::*, channel::mpsc}; @@ -77,12 +76,7 @@ impl GossipEngine { topic: B::Hash, message: Vec, ) { - let message = ConsensusMessage { - engine_id: self.engine_id, - data: message, - }; - - self.state_machine.register_message(topic, message); + self.state_machine.register_message(topic, self.engine_id, message); } /// Broadcast all messages with given topic. @@ -114,22 +108,14 @@ impl GossipEngine { message: Vec, force: bool, ) { - let message = ConsensusMessage { - engine_id: self.engine_id, - data: message, - }; - - self.state_machine.multicast(&mut *self.network, topic, message, force) + self.state_machine.multicast(&mut *self.network, topic, self.engine_id, message, force) } /// Send addressed message to the given peers. The message is not kept or multicast /// later on. pub fn send_message(&mut self, who: Vec, data: Vec) { for who in &who { - self.state_machine.send_message(&mut *self.network, who, ConsensusMessage { - engine_id: self.engine_id, - data: data.clone(), - }); + self.state_machine.send_message(&mut *self.network, who, self.engine_id, data.clone()); } } @@ -170,9 +156,7 @@ impl Future for GossipEngine { remote, messages.into_iter() .filter_map(|(engine, data)| if engine == engine_id { - Some(ConsensusMessage { - engine_id: engine, data: data.to_vec(), - }) + Some((engine, data.to_vec())) } else { None }) .collect() ); diff --git a/substrate/client/network-gossip/src/state_machine.rs b/substrate/client/network-gossip/src/state_machine.rs index 675f021365..a800f1e3aa 100644 --- a/substrate/client/network-gossip/src/state_machine.rs +++ b/substrate/client/network-gossip/src/state_machine.rs @@ -26,7 +26,6 @@ use lru::LruCache; use libp2p::PeerId; use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; use sp_runtime::ConsensusEngineId; -pub use sc_network::message::generic::{Message, ConsensusMessage}; use sc_network::config::Roles; use wasm_timer::Instant; @@ -67,7 +66,8 @@ pub struct TopicNotification { struct MessageEntry { message_hash: B::Hash, topic: B::Hash, - message: ConsensusMessage, + engine_id: ConsensusEngineId, + message: Vec, sender: Option, } @@ -89,7 +89,8 @@ impl<'g, 'p, B: BlockT> ValidatorContext for NetworkContext<'g, 'p, B> { self.gossip.multicast( self.network, topic, - ConsensusMessage{ data: message, engine_id: self.engine_id.clone() }, + self.engine_id.clone(), + message, force, ); } @@ -113,11 +114,10 @@ fn propagate<'a, B: BlockT, I>( validators: &HashMap>>, ) // (msg_hash, topic, message) - where I: Clone + IntoIterator, + where I: Clone + IntoIterator)>, { let mut check_fns = HashMap::new(); - let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| { - let engine_id = message.engine_id; + let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, engine_id: ConsensusEngineId, message: &Vec| { let check_fn = match check_fns.entry(engine_id) { Entry::Occupied(entry) => entry.into_mut(), Entry::Vacant(vacant) => match validators.get(&engine_id) { @@ -126,11 +126,11 @@ fn propagate<'a, B: BlockT, I>( } }; - (check_fn)(who, intent, topic, &message.data) + (check_fn)(who, intent, topic, &message) }; for (id, ref mut peer) in peers.iter_mut() { - for (message_hash, topic, message) in messages.clone() { + for (message_hash, topic, engine_id, message) in messages.clone() { let intent = match intent { MessageIntent::Broadcast { .. } => if peer.known_messages.contains(&message_hash) { @@ -149,14 +149,14 @@ fn propagate<'a, B: BlockT, I>( other => other, }; - if !message_allowed(id, intent, &topic, &message) { + if !message_allowed(id, intent, &topic, engine_id, &message) { continue; } peer.known_messages.insert(message_hash.clone()); trace!(target: "gossip", "Propagating to {}: {:?}", id, message); - network.write_notification(id.clone(), message.engine_id, message.data.clone()); + network.write_notification(id.clone(), engine_id, message.clone()); } } } @@ -225,13 +225,15 @@ impl ConsensusGossip { &mut self, message_hash: B::Hash, topic: B::Hash, - message: ConsensusMessage, + engine_id: ConsensusEngineId, + message: Vec, sender: Option, ) { if self.known_messages.put(message_hash.clone(), ()).is_none() { self.messages.push(MessageEntry { message_hash, topic, + engine_id, message, sender, }); @@ -246,10 +248,11 @@ impl ConsensusGossip { pub fn register_message( &mut self, topic: B::Hash, - message: ConsensusMessage, + engine_id: ConsensusEngineId, + message: Vec, ) { - let message_hash = HashFor::::hash(&message.data[..]); - self.register_message_hashed(message_hash, topic, message, None); + let message_hash = HashFor::::hash(&message[..]); + self.register_message_hashed(message_hash, topic, engine_id, message, None); } /// Call when a peer has been disconnected to stop tracking gossip status. @@ -273,7 +276,7 @@ impl ConsensusGossip { /// Rebroadcast all messages to all peers. fn rebroadcast(&mut self, network: &mut dyn Network) { let messages = self.messages.iter() - .map(|entry| (&entry.message_hash, &entry.topic, &entry.message)); + .map(|entry| (&entry.message_hash, &entry.topic, entry.engine_id, &entry.message)); propagate(network, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators); } @@ -281,7 +284,9 @@ impl ConsensusGossip { pub fn broadcast_topic(&mut self, network: &mut dyn Network, topic: B::Hash, force: bool) { let messages = self.messages.iter() .filter_map(|entry| - if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None } + if entry.topic == topic { + Some((&entry.message_hash, &entry.topic, entry.engine_id, &entry.message)) + } else { None } ); let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; propagate(network, messages, intent, &mut self.peers, &self.validators); @@ -301,7 +306,7 @@ impl ConsensusGossip { let mut check_fns = HashMap::new(); let mut message_expired = move |entry: &MessageEntry| { - let engine_id = entry.message.engine_id; + let engine_id = entry.engine_id; let check_fn = match check_fns.entry(engine_id) { Entry::Occupied(entry) => entry.into_mut(), Entry::Vacant(vacant) => match validators.get(&engine_id) { @@ -310,7 +315,7 @@ impl ConsensusGossip { } }; - (check_fn)(entry.topic, &entry.message.data) + (check_fn)(entry.topic, &entry.message) }; self.messages.retain(|entry| !message_expired(entry)); @@ -332,10 +337,10 @@ impl ConsensusGossip { { let (tx, rx) = mpsc::unbounded(); for entry in self.messages.iter_mut() - .filter(|e| e.topic == topic && e.message.engine_id == engine_id) + .filter(|e| e.topic == topic && e.engine_id == engine_id) { tx.unbounded_send(TopicNotification { - message: entry.message.data.clone(), + message: entry.message.clone(), sender: entry.sender.clone(), }) .expect("receiver known to be live; qed"); @@ -346,22 +351,22 @@ impl ConsensusGossip { rx } - /// Handle an incoming ConsensusMessage for topic by who via protocol. Discard message if topic - /// already known, the message is old, its source peers isn't a registered peer or the connection - /// to them is broken. Return `Some(topic, message)` if it was added to the internal queue, `None` + /// Handle an incoming message for topic by who via protocol. Discard message if topic already + /// known, the message is old, its source peers isn't a registered peer or the connection to + /// them is broken. Return `Some(topic, message)` if it was added to the internal queue, `None` /// in all other cases. pub fn on_incoming( &mut self, network: &mut dyn Network, who: PeerId, - messages: Vec, + messages: Vec<(ConsensusEngineId, Vec)>, ) { if !messages.is_empty() { trace!(target: "gossip", "Received {} messages from peer {}", messages.len(), who); } - for message in messages { - let message_hash = HashFor::::hash(&message.data[..]); + for (engine_id, message) in messages { + let message_hash = HashFor::::hash(&message[..]); if self.known_messages.contains(&message_hash) { trace!(target:"gossip", "Ignored already known message from {}", who); @@ -369,13 +374,12 @@ impl ConsensusGossip { continue; } - let engine_id = message.engine_id; // validate the message let validation = self.validators.get(&engine_id) .cloned() .map(|v| { let mut context = NetworkContext { gossip: self, network, engine_id }; - v.validate(&mut context, &who, &message.data) + v.validate(&mut context, &who, &message) }); let validation_result = match validation { @@ -398,7 +402,7 @@ impl ConsensusGossip { debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); entry.get_mut().retain(|sink| { if let Err(e) = sink.unbounded_send(TopicNotification { - message: message.data.clone(), + message: message.clone(), sender: Some(who.clone()) }) { trace!(target: "gossip", "Error broadcasting message notification: {:?}", e); @@ -410,7 +414,7 @@ impl ConsensusGossip { } } if keep { - self.register_message_hashed(message_hash, topic, message, Some(who.clone())); + self.register_message_hashed(message_hash, topic, engine_id, message, Some(who.clone())); } } else { trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); @@ -438,7 +442,7 @@ impl ConsensusGossip { }; if let Some(ref mut peer) = self.peers.get_mut(who) { - for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) { + for entry in self.messages.iter().filter(|m| m.topic == topic && m.engine_id == engine_id) { let intent = if force { MessageIntent::ForcedBroadcast } else { @@ -449,14 +453,14 @@ impl ConsensusGossip { continue; } - if !message_allowed(who, intent, &entry.topic, &entry.message.data) { + if !message_allowed(who, intent, &entry.topic, &entry.message) { continue; } peer.known_messages.insert(entry.message_hash.clone()); trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message); - network.write_notification(who.clone(), engine_id, entry.message.data.clone()); + network.write_notification(who.clone(), engine_id, entry.message.clone()); } } } @@ -466,13 +470,14 @@ impl ConsensusGossip { &mut self, network: &mut dyn Network, topic: B::Hash, - message: ConsensusMessage, + engine_id: ConsensusEngineId, + message: Vec, force: bool, ) { - let message_hash = HashFor::::hash(&message.data); - self.register_message_hashed(message_hash, topic, message.clone(), None); + let message_hash = HashFor::::hash(&message); + self.register_message_hashed(message_hash, topic, engine_id, message.clone(), None); let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; - propagate(network, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators); + propagate(network, iter::once((&message_hash, &topic, engine_id, &message)), intent, &mut self.peers, &self.validators); } /// Send addressed message to a peer. The message is not kept or multicast @@ -481,19 +486,20 @@ impl ConsensusGossip { &mut self, network: &mut dyn Network, who: &PeerId, - message: ConsensusMessage, + engine_id: ConsensusEngineId, + message: Vec, ) { let peer = match self.peers.get_mut(who) { None => return, Some(peer) => peer, }; - let message_hash = HashFor::::hash(&message.data); + let message_hash = HashFor::::hash(&message); trace!(target: "gossip", "Sending direct to {}: {:?}", who, message); peer.known_messages.insert(message_hash); - network.write_notification(who.clone(), message.engine_id, message.data); + network.write_notification(who.clone(), engine_id, message); } } @@ -513,7 +519,8 @@ mod tests { $consensus.messages.push(MessageEntry { message_hash: $hash, topic: $topic, - message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]}, + engine_id: [0, 0, 0, 0], + message: $m, sender: None, }); } @@ -588,13 +595,14 @@ mod tests { let mut consensus = ConsensusGossip::::new(); consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); - let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; + let engine_id = [0, 0, 0, 0]; + let message = vec![4, 5, 6]; let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(topic, message.clone()); + consensus.register_message(topic, engine_id, message.clone()); let mut stream = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic)); - assert_eq!(stream.next(), Some(TopicNotification { message: message.data, sender: None })); + assert_eq!(stream.next(), Some(TopicNotification { message: message, sender: None })); } #[test] @@ -602,11 +610,11 @@ mod tests { let mut consensus = ConsensusGossip::::new(); let topic = [1; 32].into(); - let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; - let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; + let msg_a = vec![1, 2, 3]; + let msg_b = vec![4, 5, 6]; - consensus.register_message(topic, msg_a); - consensus.register_message(topic, msg_b); + consensus.register_message(topic, [0, 0, 0, 0], msg_a); + consensus.register_message(topic, [0, 0, 0, 0], msg_b); assert_eq!(consensus.messages.len(), 2); } @@ -616,17 +624,16 @@ mod tests { let mut consensus = ConsensusGossip::::new(); consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); - let data = vec![4, 5, 6]; - let message = ConsensusMessage { data: data.clone(), engine_id: [0, 0, 0, 0] }; + let message = vec![4, 5, 6]; let topic = HashFor::::hash(&[1, 2, 3]); - consensus.register_message(topic, message.clone()); + consensus.register_message(topic, [0, 0, 0, 0], message.clone()); let mut stream1 = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic)); let mut stream2 = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic)); - assert_eq!(stream1.next(), Some(TopicNotification { message: data.clone(), sender: None })); - assert_eq!(stream2.next(), Some(TopicNotification { message: data, sender: None })); + assert_eq!(stream1.next(), Some(TopicNotification { message: message.clone(), sender: None })); + assert_eq!(stream2.next(), Some(TopicNotification { message, sender: None })); } #[test] @@ -635,11 +642,11 @@ mod tests { consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); let topic = [1; 32].into(); - let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; - let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] }; + let msg_a = vec![1, 2, 3]; + let msg_b = vec![4, 5, 6]; - consensus.register_message(topic, msg_a); - consensus.register_message(topic, msg_b); + consensus.register_message(topic, [0, 0, 0, 0], msg_a); + consensus.register_message(topic, [0, 0, 0, 1], msg_b); let mut stream = block_on_stream(consensus.messages_for([0, 0, 0, 0], topic)); diff --git a/substrate/client/network/src/lib.rs b/substrate/client/network/src/lib.rs index a5397a4e3e..f29fb00a4f 100644 --- a/substrate/client/network/src/lib.rs +++ b/substrate/client/network/src/lib.rs @@ -183,13 +183,6 @@ pub use libp2p::{Multiaddr, PeerId}; #[doc(inline)] pub use libp2p::multiaddr; -// Note: these re-exports shouldn't be part of the public API of the crate and will be removed in -// the future. -#[doc(hidden)] -pub use protocol::message; -#[doc(hidden)] -pub use protocol::message::Status as StatusMessage; - pub use sc_peerset::ReputationChange; /// Extension trait for `NetworkBehaviour` that also accepts discovering nodes. diff --git a/substrate/client/network/src/protocol/generic_proto/tests.rs b/substrate/client/network/src/protocol/generic_proto/tests.rs index b8436e2c7f..632e06ec39 100644 --- a/substrate/client/network/src/protocol/generic_proto/tests.rs +++ b/substrate/client/network/src/protocol/generic_proto/tests.rs @@ -26,7 +26,7 @@ use libp2p::{PeerId, Multiaddr, Transport}; use rand::seq::SliceRandom; use std::{error, io, task::Context, task::Poll, time::Duration}; use std::collections::HashSet; -use crate::message::{generic::BlockResponse, Message}; +use crate::protocol::message::{generic::BlockResponse, Message}; use crate::protocol::generic_proto::{GenericProto, GenericProtoOut}; use sp_test_primitives::Block; diff --git a/substrate/client/network/src/protocol/light_dispatch.rs b/substrate/client/network/src/protocol/light_dispatch.rs index 8146172e15..22d26075b3 100644 --- a/substrate/client/network/src/protocol/light_dispatch.rs +++ b/substrate/client/network/src/protocol/light_dispatch.rs @@ -689,7 +689,7 @@ pub mod tests { ChangesProof, RemoteCallRequest, RemoteReadRequest, RemoteReadChildRequest, RemoteChangesRequest, RemoteBodyRequest}; use crate::config::Roles; - use crate::message::{self, BlockAttributes, Direction, FromBlock, RequestId}; + use crate::protocol::message::{self, BlockAttributes, Direction, FromBlock, RequestId}; use libp2p::PeerId; use super::{REQUEST_TIMEOUT, LightDispatch, LightDispatchNetwork, RequestData, StorageProof}; use sp_test_primitives::{Block, Header}; diff --git a/substrate/client/network/src/protocol/sync/blocks.rs b/substrate/client/network/src/protocol/sync/blocks.rs index 31b798ace2..359287701e 100644 --- a/substrate/client/network/src/protocol/sync/blocks.rs +++ b/substrate/client/network/src/protocol/sync/blocks.rs @@ -212,7 +212,7 @@ impl BlockCollection { #[cfg(test)] mod test { use super::{BlockCollection, BlockData, BlockRangeState}; - use crate::{message, PeerId}; + use crate::{protocol::message, PeerId}; use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper}; use sp_core::H256;