mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
client/network-gossip: Move sink IO outside of state_machine (#5669)
* client/network-gossip: Move sink IO outside of state_machine `ConsensusGossip` is supposed to be a deterministic state machine. `GossipEngine` wrapping `ConsensusGossip` should handle IO operations. This commit moves the `message_sink` IO operations to `GossipEngine`. More specifically on incoming messages a `GossipEngine` calls `ConsensusGossip::on_incoming` to validate and register the messages. `ConsensusGossip` returns the valid messages which are then forwarded by `GossipEngine` to the upper layer via the `message_sinks`. * client/network-gossip: Adjust and extend tests * Update client/network-gossip/src/bridge.rs Co-authored-by: Benjamin Kampmann <ben.kampmann@googlemail.com>
This commit is contained in:
Generated
+1
@@ -6463,6 +6463,7 @@ dependencies = [
|
||||
name = "sc-network-gossip"
|
||||
version = "0.8.0-dev"
|
||||
dependencies = [
|
||||
"async-std",
|
||||
"futures 0.3.4",
|
||||
"futures-timer 3.0.2",
|
||||
"libp2p",
|
||||
|
||||
@@ -25,4 +25,5 @@ sp-utils = { version = "2.0.0-dev", path = "../../primitives/utils" }
|
||||
wasm-timer = "0.2"
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = "1.5"
|
||||
substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../test-utils/runtime/client" }
|
||||
|
||||
@@ -21,9 +21,16 @@ use sc_network::{Event, ReputationChange};
|
||||
|
||||
use futures::prelude::*;
|
||||
use libp2p::PeerId;
|
||||
use log::trace;
|
||||
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
|
||||
use std::{borrow::Cow, pin::Pin, sync::Arc, task::{Context, Poll}};
|
||||
use sp_utils::mpsc::TracingUnboundedReceiver;
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{HashMap, hash_map::Entry},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
/// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on
|
||||
/// top of it.
|
||||
@@ -31,8 +38,12 @@ pub struct GossipEngine<B: BlockT> {
|
||||
state_machine: ConsensusGossip<B>,
|
||||
network: Box<dyn Network<B> + Send>,
|
||||
periodic_maintenance_interval: futures_timer::Delay,
|
||||
network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
|
||||
engine_id: ConsensusEngineId,
|
||||
|
||||
/// Incoming events from the network.
|
||||
network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
|
||||
/// Outgoing events to the consumer.
|
||||
message_sinks: HashMap<B::Hash, Vec<TracingUnboundedSender<TopicNotification>>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> Unpin for GossipEngine<B> {}
|
||||
@@ -54,8 +65,10 @@ impl<B: BlockT> GossipEngine<B> {
|
||||
state_machine: ConsensusGossip::new(validator, engine_id),
|
||||
network: Box::new(network),
|
||||
periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
|
||||
network_event_stream,
|
||||
engine_id,
|
||||
|
||||
network_event_stream,
|
||||
message_sinks: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,7 +98,15 @@ impl<B: BlockT> GossipEngine<B> {
|
||||
pub fn messages_for(&mut self, topic: B::Hash)
|
||||
-> TracingUnboundedReceiver<TopicNotification>
|
||||
{
|
||||
self.state_machine.messages_for(topic)
|
||||
let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for");
|
||||
|
||||
for notification in self.state_machine.messages_for(topic) {
|
||||
tx.unbounded_send(notification).expect("receiver known to be live; qed");
|
||||
}
|
||||
|
||||
self.message_sinks.entry(topic).or_default().push(tx);
|
||||
|
||||
rx
|
||||
}
|
||||
|
||||
/// Send all messages with given topic to a peer.
|
||||
@@ -147,16 +168,40 @@ impl<B: BlockT> Future for GossipEngine<B> {
|
||||
this.state_machine.peer_disconnected(&mut *this.network, remote);
|
||||
},
|
||||
Event::NotificationsReceived { remote, messages } => {
|
||||
let engine_id = this.engine_id.clone();
|
||||
this.state_machine.on_incoming(
|
||||
let messages = messages.into_iter().filter_map(|(engine, data)| {
|
||||
if engine == this.engine_id {
|
||||
Some(data.to_vec())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let to_forward = this.state_machine.on_incoming(
|
||||
&mut *this.network,
|
||||
remote,
|
||||
messages.into_iter()
|
||||
.filter_map(|(engine, data)| if engine == engine_id {
|
||||
Some(data.to_vec())
|
||||
} else { None })
|
||||
.collect()
|
||||
messages,
|
||||
);
|
||||
|
||||
for (topic, notification) in to_forward.into_iter() {
|
||||
if let Entry::Occupied(mut entry) = this.message_sinks.entry(topic) {
|
||||
trace!(
|
||||
target: "gossip",
|
||||
"Pushing consensus message to sinks for {}.", topic,
|
||||
);
|
||||
entry.get_mut().retain(move |sink| {
|
||||
if let Err(e) = sink.unbounded_send(notification.clone()) {
|
||||
trace!(
|
||||
target: "gossip",
|
||||
"Error broadcasting message notification: {:?}", e,
|
||||
);
|
||||
}
|
||||
!sink.is_closed()
|
||||
});
|
||||
if entry.get().is_empty() {
|
||||
entry.remove_entry();
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Event::Dht(_) => {}
|
||||
}
|
||||
@@ -169,6 +214,11 @@ impl<B: BlockT> Future for GossipEngine<B> {
|
||||
while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
|
||||
this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL);
|
||||
this.state_machine.tick(&mut *this.network);
|
||||
|
||||
this.message_sinks.retain(|_, sinks| {
|
||||
sinks.retain(|sink| !sink.is_closed());
|
||||
!sinks.is_empty()
|
||||
});
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
@@ -177,23 +227,34 @@ impl<B: BlockT> Future for GossipEngine<B> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_std::task::spawn;
|
||||
use crate::{ValidationResult, ValidatorContext};
|
||||
use futures::{channel::mpsc::{channel, Sender}, executor::block_on_stream};
|
||||
use sc_network::ObservedRole;
|
||||
use sp_runtime::{testing::H256, traits::{Block as BlockT}};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use substrate_test_runtime_client::runtime::Block;
|
||||
use super::*;
|
||||
|
||||
struct TestNetwork {}
|
||||
#[derive(Clone, Default)]
|
||||
struct TestNetwork {
|
||||
inner: Arc<Mutex<TestNetworkInner>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> Network<B> for Arc<TestNetwork> {
|
||||
#[derive(Clone, Default)]
|
||||
struct TestNetworkInner {
|
||||
event_senders: Vec<Sender<Event>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> Network<B> for TestNetwork {
|
||||
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
|
||||
let (_tx, rx) = futures::channel::mpsc::channel(0);
|
||||
let (tx, rx) = channel(100);
|
||||
self.inner.lock().unwrap().event_senders.push(tx);
|
||||
|
||||
// Return rx and drop tx. Thus the given channel will yield `Poll::Ready(None)` on first
|
||||
// poll.
|
||||
Box::pin(rx)
|
||||
}
|
||||
|
||||
fn report_peer(&self, _: PeerId, _: ReputationChange) {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn disconnect_peer(&self, _: PeerId) {
|
||||
@@ -211,16 +272,15 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
struct TestValidator {}
|
||||
|
||||
impl<B: BlockT> Validator<B> for TestValidator {
|
||||
struct AllowAll;
|
||||
impl Validator<Block> for AllowAll {
|
||||
fn validate(
|
||||
&self,
|
||||
_: &mut dyn ValidatorContext<B>,
|
||||
_: &PeerId,
|
||||
_: &[u8]
|
||||
) -> ValidationResult<B::Hash> {
|
||||
unimplemented!();
|
||||
_context: &mut dyn ValidatorContext<Block>,
|
||||
_sender: &PeerId,
|
||||
_data: &[u8],
|
||||
) -> ValidationResult<H256> {
|
||||
ValidationResult::ProcessAndKeep(H256::default())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,13 +290,17 @@ mod tests {
|
||||
/// See https://github.com/paritytech/substrate/issues/5000 for details.
|
||||
#[test]
|
||||
fn returns_when_network_event_stream_closes() {
|
||||
let network = TestNetwork::default();
|
||||
let mut gossip_engine = GossipEngine::<Block>::new(
|
||||
Arc::new(TestNetwork{}),
|
||||
network.clone(),
|
||||
[1, 2, 3, 4],
|
||||
"my_protocol".as_bytes(),
|
||||
Arc::new(TestValidator{}),
|
||||
Arc::new(AllowAll{}),
|
||||
);
|
||||
|
||||
// Drop network event stream sender side.
|
||||
drop(network.inner.lock().unwrap().event_senders.pop());
|
||||
|
||||
futures::executor::block_on(futures::future::poll_fn(move |ctx| {
|
||||
if let Poll::Pending = gossip_engine.poll_unpin(ctx) {
|
||||
panic!(
|
||||
@@ -247,4 +311,72 @@ mod tests {
|
||||
Poll::Ready(())
|
||||
}))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() {
|
||||
let topic = H256::default();
|
||||
let engine_id = [1, 2, 3, 4];
|
||||
let remote_peer = PeerId::random();
|
||||
let network = TestNetwork::default();
|
||||
|
||||
let mut gossip_engine = GossipEngine::<Block>::new(
|
||||
network.clone(),
|
||||
engine_id.clone(),
|
||||
"my_protocol".as_bytes(),
|
||||
Arc::new(AllowAll{}),
|
||||
);
|
||||
|
||||
let mut event_sender = network.inner.lock()
|
||||
.unwrap()
|
||||
.event_senders
|
||||
.pop()
|
||||
.unwrap();
|
||||
|
||||
// Register the remote peer.
|
||||
event_sender.start_send(
|
||||
Event::NotificationStreamOpened {
|
||||
remote: remote_peer.clone(),
|
||||
engine_id: engine_id.clone(),
|
||||
role: ObservedRole::Authority,
|
||||
}
|
||||
).unwrap();
|
||||
|
||||
let messages = vec![vec![1], vec![2]];
|
||||
let events = messages.iter().cloned().map(|m| {
|
||||
Event::NotificationsReceived {
|
||||
remote: remote_peer.clone(),
|
||||
messages: vec![(engine_id, m.into())]
|
||||
}
|
||||
}).collect::<Vec<_>>();
|
||||
|
||||
// Send first event before subscribing.
|
||||
event_sender.start_send(events[0].clone()).unwrap();
|
||||
|
||||
let mut subscribers = vec![];
|
||||
for _ in 0..2 {
|
||||
subscribers.push(gossip_engine.messages_for(topic));
|
||||
}
|
||||
|
||||
// Send second event after subscribing.
|
||||
event_sender.start_send(events[1].clone()).unwrap();
|
||||
|
||||
spawn(gossip_engine);
|
||||
|
||||
let mut subscribers = subscribers.into_iter()
|
||||
.map(|s| block_on_stream(s))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Expect each subscriber to receive both events.
|
||||
for message in messages {
|
||||
for subscriber in subscribers.iter_mut() {
|
||||
assert_eq!(
|
||||
subscriber.next(),
|
||||
Some(TopicNotification {
|
||||
message: message.clone(),
|
||||
sender: Some(remote_peer.clone()),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
use crate::{Network, MessageIntent, Validator, ValidatorContext, ValidationResult};
|
||||
|
||||
use std::collections::{HashMap, HashSet, hash_map::Entry};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::iter;
|
||||
use std::time;
|
||||
@@ -24,7 +24,6 @@ use log::trace;
|
||||
use lru::LruCache;
|
||||
use libp2p::PeerId;
|
||||
use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
use sp_runtime::ConsensusEngineId;
|
||||
use sc_network::ObservedRole;
|
||||
use wasm_timer::Instant;
|
||||
@@ -51,7 +50,7 @@ struct PeerConsensus<H> {
|
||||
}
|
||||
|
||||
/// Topic stream message with sender.
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct TopicNotification {
|
||||
/// Message data.
|
||||
pub message: Vec<u8>,
|
||||
@@ -147,7 +146,6 @@ fn propagate<'a, B: BlockT, I>(
|
||||
/// Consensus network protocol handler. Manages statements and candidate requests.
|
||||
pub struct ConsensusGossip<B: BlockT> {
|
||||
peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
|
||||
live_message_sinks: HashMap<B::Hash, Vec<TracingUnboundedSender<TopicNotification>>>,
|
||||
messages: Vec<MessageEntry<B>>,
|
||||
known_messages: LruCache<B::Hash, ()>,
|
||||
engine_id: ConsensusEngineId,
|
||||
@@ -160,7 +158,6 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
pub fn new(validator: Arc<dyn Validator<B>>, engine_id: ConsensusEngineId) -> Self {
|
||||
ConsensusGossip {
|
||||
peers: HashMap::new(),
|
||||
live_message_sinks: HashMap::new(),
|
||||
messages: Default::default(),
|
||||
known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE),
|
||||
engine_id,
|
||||
@@ -256,11 +253,6 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
/// Prune old or no longer relevant consensus messages. Provide a predicate
|
||||
/// for pruning, which returns `false` when the items with a given topic should be pruned.
|
||||
pub fn collect_garbage(&mut self) {
|
||||
self.live_message_sinks.retain(|_, sinks| {
|
||||
sinks.retain(|sink| !sink.is_closed());
|
||||
!sinks.is_empty()
|
||||
});
|
||||
|
||||
let known_messages = &mut self.known_messages;
|
||||
let before = self.messages.len();
|
||||
|
||||
@@ -278,33 +270,24 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile)
|
||||
pub fn messages_for(&mut self, topic: B::Hash)
|
||||
-> TracingUnboundedReceiver<TopicNotification>
|
||||
{
|
||||
let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for");
|
||||
for entry in self.messages.iter_mut().filter(|e| e.topic == topic) {
|
||||
tx.unbounded_send(TopicNotification {
|
||||
message: entry.message.clone(),
|
||||
sender: entry.sender.clone(),
|
||||
})
|
||||
.expect("receiver known to be live; qed");
|
||||
}
|
||||
|
||||
self.live_message_sinks.entry(topic).or_default().push(tx);
|
||||
|
||||
rx
|
||||
/// Get valid messages received in the past for a topic (might have expired meanwhile).
|
||||
pub fn messages_for(&mut self, topic: B::Hash) -> impl Iterator<Item = TopicNotification> + '_ {
|
||||
self.messages.iter().filter(move |e| e.topic == topic).map(|entry| TopicNotification {
|
||||
message: entry.message.clone(),
|
||||
sender: entry.sender.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Handle an incoming message for topic by who via protocol. Discard message if topic already
|
||||
/// known, the message is old, its source peers isn't a registered peer or the connection to
|
||||
/// them is broken.
|
||||
/// Register incoming messages and return the ones that are new and valid (according to a gossip
|
||||
/// validator) and should thus be forwarded to the upper layers.
|
||||
pub fn on_incoming(
|
||||
&mut self,
|
||||
network: &mut dyn Network<B>,
|
||||
who: PeerId,
|
||||
messages: Vec<Vec<u8>>,
|
||||
) {
|
||||
) -> Vec<(B::Hash, TopicNotification)> {
|
||||
let mut to_forward = vec![];
|
||||
|
||||
if !messages.is_empty() {
|
||||
trace!(target: "gossip", "Received {} messages from peer {}", messages.len(), who);
|
||||
}
|
||||
@@ -335,23 +318,19 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
network.report_peer(who.clone(), rep::GOSSIP_SUCCESS);
|
||||
if let Some(ref mut peer) = self.peers.get_mut(&who) {
|
||||
peer.known_messages.insert(message_hash);
|
||||
if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) {
|
||||
trace!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
|
||||
entry.get_mut().retain(|sink| {
|
||||
if let Err(e) = sink.unbounded_send(TopicNotification {
|
||||
message: message.clone(),
|
||||
sender: Some(who.clone())
|
||||
}) {
|
||||
trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
|
||||
}
|
||||
!sink.is_closed()
|
||||
});
|
||||
if entry.get().is_empty() {
|
||||
entry.remove_entry();
|
||||
}
|
||||
}
|
||||
|
||||
to_forward.push((topic, TopicNotification {
|
||||
message: message.clone(),
|
||||
sender: Some(who.clone())
|
||||
}));
|
||||
|
||||
if keep {
|
||||
self.register_message_hashed(message_hash, topic, message, Some(who.clone()));
|
||||
self.register_message_hashed(
|
||||
message_hash,
|
||||
topic,
|
||||
message,
|
||||
Some(who.clone()),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
|
||||
@@ -361,6 +340,8 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
trace!(target:"gossip", "Discard message from peer {}", who);
|
||||
}
|
||||
}
|
||||
|
||||
to_forward
|
||||
}
|
||||
|
||||
/// Send all messages with given topic to a peer.
|
||||
@@ -437,7 +418,6 @@ impl<B: BlockT> ConsensusGossip<B> {
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use sp_runtime::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
|
||||
use futures::executor::block_on_stream;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -518,16 +498,18 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn message_stream_include_those_sent_before_asking_for_stream() {
|
||||
fn message_stream_include_those_sent_before_asking() {
|
||||
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]);
|
||||
|
||||
// Register message.
|
||||
let message = vec![4, 5, 6];
|
||||
let topic = HashFor::<Block>::hash(&[1,2,3]);
|
||||
|
||||
consensus.register_message(topic, message.clone());
|
||||
let mut stream = block_on_stream(consensus.messages_for(topic));
|
||||
|
||||
assert_eq!(stream.next(), Some(TopicNotification { message: message, sender: None }));
|
||||
assert_eq!(
|
||||
consensus.messages_for(topic).next(),
|
||||
Some(TopicNotification { message: message, sender: None }),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -544,22 +526,6 @@ mod tests {
|
||||
assert_eq!(consensus.messages.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn can_keep_multiple_subscribers_per_topic() {
|
||||
let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]);
|
||||
|
||||
let message = vec![4, 5, 6];
|
||||
let topic = HashFor::<Block>::hash(&[1, 2, 3]);
|
||||
|
||||
consensus.register_message(topic, message.clone());
|
||||
|
||||
let mut stream1 = block_on_stream(consensus.messages_for(topic));
|
||||
let mut stream2 = block_on_stream(consensus.messages_for(topic));
|
||||
|
||||
assert_eq!(stream1.next(), Some(TopicNotification { message: message.clone(), sender: None }));
|
||||
assert_eq!(stream2.next(), Some(TopicNotification { message, sender: None }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn peer_is_removed_on_disconnect() {
|
||||
struct TestNetwork;
|
||||
|
||||
Reference in New Issue
Block a user