From 327d11025ed48f1488a1487d15b6e1a733c5b359 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 6 May 2020 14:56:46 +0200 Subject: [PATCH] client/network-gossip/bridge: Use bounded channel (#5748) * client/network-gossip/bridge: Use bounded channel Instead of returning an unbounded channel on `GossipEngine::messages_for` return a bounded channel. For now the channel length is determined by the amount of past messages cached in the `ConsensusGossip`. With a bounded channel, one can't just fire-and-forget style send into it, but has to first check whether the channel is ready. Thus this commit restructures `GossipEngine::poll` and introduces a `ForwardingState` into `GossipEngine`. * client/network-gossip/bridge: Add quickcheck for different size channels --- substrate/Cargo.lock | 3 +- substrate/client/network-gossip/Cargo.toml | 3 +- substrate/client/network-gossip/src/bridge.rs | 398 +++++++++++++++--- 3 files changed, 336 insertions(+), 68 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index fad0e21d69..56fa41cb08 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -6509,9 +6509,10 @@ dependencies = [ "libp2p", "log", "lru", + "quickcheck", + "rand 0.7.3", "sc-network", "sp-runtime", - "sp-utils", "substrate-test-runtime-client", "wasm-timer", ] diff --git a/substrate/client/network-gossip/Cargo.toml b/substrate/client/network-gossip/Cargo.toml index c6714375fe..4b02a6f70e 100644 --- a/substrate/client/network-gossip/Cargo.toml +++ b/substrate/client/network-gossip/Cargo.toml @@ -21,9 +21,10 @@ log = "0.4.8" lru = "0.4.3" sc-network = { version = "0.8.0-dev", path = "../network" } sp-runtime = { version = "2.0.0-dev", path = "../../primitives/runtime" } -sp-utils = { version = "2.0.0-dev", path = "../../primitives/utils" } wasm-timer = "0.2" [dev-dependencies] async-std = "1.5" +quickcheck = "0.9.0" +rand = "0.7.2" substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../test-utils/runtime/client" } diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index 26e49fce8a..df2a5c8e7e 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -20,13 +20,13 @@ use crate::state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENA use sc_network::{Event, ReputationChange}; use futures::prelude::*; +use futures::channel::mpsc::{channel, Sender, Receiver}; use libp2p::PeerId; use log::trace; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; -use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; use std::{ borrow::Cow, - collections::{HashMap, hash_map::Entry}, + collections::{HashMap, VecDeque}, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -43,7 +43,23 @@ pub struct GossipEngine { /// Incoming events from the network. network_event_stream: Pin + Send>>, /// Outgoing events to the consumer. - message_sinks: HashMap>>, + message_sinks: HashMap>>, + /// Buffered messages (see [`ForwardingState`]). + forwarding_state: ForwardingState, +} + +/// A gossip engine receives messages from the network via the `network_event_stream` and forwards +/// them to upper layers via the `message sinks`. In the scenario where messages have been received +/// from the network but a subscribed message sink is not yet ready to receive the messages, the +/// messages are buffered. To model this process a gossip engine can be in two states. +enum ForwardingState { + /// The gossip engine is currently not forwarding any messages and will poll the network for + /// more messages to forward. + Idle, + /// The gossip engine is in the progress of forwarding messages and thus will not poll the + /// network for more messages until it has send all current messages into the subscribed message + /// sinks. + Busy(VecDeque<(B::Hash, TopicNotification)>), } impl Unpin for GossipEngine {} @@ -69,6 +85,7 @@ impl GossipEngine { network_event_stream, message_sinks: HashMap::new(), + forwarding_state: ForwardingState::Idle, } } @@ -96,12 +113,19 @@ impl GossipEngine { /// Get data of valid, incoming messages for a topic (but might have expired meanwhile). pub fn messages_for(&mut self, topic: B::Hash) - -> TracingUnboundedReceiver + -> Receiver { - let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for"); + let past_messages = self.state_machine.messages_for(topic).collect::>(); + // The channel length is not critical for correctness. By the implementation of `channel` + // each sender is guaranteed a single buffer slot, making it a non-rendezvous channel and + // thus preventing direct dead-locks. A minimum channel length of 10 is an estimate based on + // the fact that despite `NotificationsReceived` having a `Vec` of messages, it only ever + // contains a single message. + let (mut tx, rx) = channel(usize::max(past_messages.len(), 10)); - for notification in self.state_machine.messages_for(topic) { - tx.unbounded_send(notification).expect("receiver known to be live; qed"); + for notification in past_messages{ + tx.try_send(notification) + .expect("receiver known to be live, and buffer size known to suffice; qed"); } self.message_sinks.entry(topic).or_default().push(tx); @@ -152,65 +176,106 @@ impl Future for GossipEngine { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = &mut *self; - loop { - match this.network_event_stream.poll_next_unpin(cx) { - Poll::Ready(Some(event)) => match event { - Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, role } => { - if msg_engine_id != this.engine_id { - continue; - } - this.state_machine.new_peer(&mut *this.network, remote, role); - } - Event::NotificationStreamClosed { remote, engine_id: msg_engine_id } => { - if msg_engine_id != this.engine_id { - continue; - } - this.state_machine.peer_disconnected(&mut *this.network, remote); - }, - Event::NotificationsReceived { remote, messages } => { - let messages = messages.into_iter().filter_map(|(engine, data)| { - if engine == this.engine_id { - Some(data.to_vec()) - } else { - None - } - }).collect(); - - let to_forward = this.state_machine.on_incoming( - &mut *this.network, - remote, - messages, - ); - - for (topic, notification) in to_forward { - if let Entry::Occupied(mut entry) = this.message_sinks.entry(topic) { - trace!( - target: "gossip", - "Pushing consensus message to sinks for {}.", topic, - ); - entry.get_mut().retain(move |sink| { - if let Err(e) = sink.unbounded_send(notification.clone()) { - trace!( - target: "gossip", - "Error broadcasting message notification: {:?}", e, - ); - } - !sink.is_closed() - }); - if entry.get().is_empty() { - entry.remove_entry(); + 'outer: loop { + match &mut this.forwarding_state { + ForwardingState::Idle => { + match this.network_event_stream.poll_next_unpin(cx) { + Poll::Ready(Some(event)) => match event { + Event::NotificationStreamOpened { remote, engine_id, role } => { + if engine_id != this.engine_id { + continue; } + this.state_machine.new_peer(&mut *this.network, remote, role); + } + Event::NotificationStreamClosed { remote, engine_id } => { + if engine_id != this.engine_id { + continue; + } + this.state_machine.peer_disconnected(&mut *this.network, remote); + }, + Event::NotificationsReceived { remote, messages } => { + let messages = messages.into_iter().filter_map(|(engine, data)| { + if engine == this.engine_id { + Some(data.to_vec()) + } else { + None + } + }).collect(); + + let to_forward = this.state_machine.on_incoming( + &mut *this.network, + remote, + messages, + ); + + this.forwarding_state = ForwardingState::Busy(to_forward.into()); + }, + Event::Dht(_) => {} + } + // The network event stream closed. Do the same for [`GossipValidator`]. + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => break, + } + } + ForwardingState::Busy(to_forward) => { + let (topic, notification) = match to_forward.pop_front() { + Some(n) => n, + None => { + this.forwarding_state = ForwardingState::Idle; + continue; + } + }; + + let sinks = match this.message_sinks.get_mut(&topic) { + Some(sinks) => sinks, + None => { + continue; + }, + }; + + // Make sure all sinks for the given topic are ready. + for sink in sinks.iter_mut() { + match sink.poll_ready(cx) { + Poll::Ready(Ok(())) => {}, + // Receiver has been dropped. Ignore for now, filtered out in (1). + Poll::Ready(Err(_)) => {}, + Poll::Pending => { + // Push back onto queue for later. + to_forward.push_front((topic, notification)); + break 'outer; } } - }, - Event::Dht(_) => {} + } + + // Filter out all closed sinks. + sinks.retain(|sink| !sink.is_closed()); // (1) + + if sinks.is_empty() { + this.message_sinks.remove(&topic); + continue; + } + + trace!( + target: "gossip", + "Pushing consensus message to sinks for {}.", topic, + ); + + // Send the notification on each sink. + for sink in sinks { + match sink.start_send(notification.clone()) { + Ok(()) => {}, + Err(e) if e.is_full() => unreachable!( + "Previously ensured that all sinks are ready; qed.", + ), + // Receiver got dropped. Will be removed in next iteration (See (1)). + Err(_) => {}, + } + } } - // The network event stream closed. Do the same for [`GossipValidator`]. - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => break, } } + while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) { this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL); this.state_machine.tick(&mut *this.network); @@ -229,9 +294,12 @@ impl Future for GossipEngine { mod tests { use async_std::task::spawn; use crate::{ValidationResult, ValidatorContext}; - use futures::{channel::mpsc::{channel, Sender}, executor::block_on_stream}; + use futures::{channel::mpsc::{unbounded, UnboundedSender}, executor::{block_on, block_on_stream}, future::poll_fn}; + use quickcheck::{Arbitrary, Gen, QuickCheck}; + use rand::Rng; use sc_network::ObservedRole; use sp_runtime::{testing::H256, traits::{Block as BlockT}}; + use std::convert::TryInto; use std::sync::{Arc, Mutex}; use substrate_test_runtime_client::runtime::Block; use super::*; @@ -243,12 +311,12 @@ mod tests { #[derive(Clone, Default)] struct TestNetworkInner { - event_senders: Vec>, + event_senders: Vec>, } impl Network for TestNetwork { fn event_stream(&self) -> Pin + Send>> { - let (tx, rx) = channel(100); + let (tx, rx) = unbounded(); self.inner.lock().unwrap().event_senders.push(tx); Box::pin(rx) @@ -301,7 +369,7 @@ mod tests { // Drop network event stream sender side. drop(network.inner.lock().unwrap().event_senders.pop()); - futures::executor::block_on(futures::future::poll_fn(move |ctx| { + block_on(poll_fn(move |ctx| { if let Poll::Pending = gossip_engine.poll_unpin(ctx) { panic!( "Expected gossip engine to finish on first poll, given that \ @@ -339,7 +407,7 @@ mod tests { engine_id: engine_id.clone(), role: ObservedRole::Authority, } - ).unwrap(); + ).expect("Event stream is unbounded; qed."); let messages = vec![vec![1], vec![2]]; let events = messages.iter().cloned().map(|m| { @@ -350,7 +418,7 @@ mod tests { }).collect::>(); // Send first event before subscribing. - event_sender.start_send(events[0].clone()).unwrap(); + event_sender.start_send(events[0].clone()).expect("Event stream is unbounded; qed."); let mut subscribers = vec![]; for _ in 0..2 { @@ -358,7 +426,7 @@ mod tests { } // Send second event after subscribing. - event_sender.start_send(events[1].clone()).unwrap(); + event_sender.start_send(events[1].clone()).expect("Event stream is unbounded; qed."); spawn(gossip_engine); @@ -379,4 +447,202 @@ mod tests { } } } + + #[test] + fn forwarding_to_different_size_and_topic_channels() { + #[derive(Clone, Debug)] + struct ChannelLengthAndTopic{ + length: usize, + topic: H256, + } + + impl Arbitrary for ChannelLengthAndTopic { + fn arbitrary(g: &mut G) -> Self { + Self { + length: g.gen_range(0, 100), + // Make sure channel topics and message topics overlap by choosing a small + // range. + topic: H256::from_low_u64_ne(g.gen_range(0, 10)), + } + } + } + + #[derive(Clone, Debug)] + struct Message { + topic: H256, + } + + impl Arbitrary for Message{ + fn arbitrary(g: &mut G) -> Self { + Self { + // Make sure channel topics and message topics overlap by choosing a small + // range. + topic: H256::from_low_u64_ne(g.gen_range(0, 10)), + } + } + } + + /// Validator that always returns `ProcessAndKeep` interpreting the first 32 bytes of data + /// as the message topic. + struct TestValidator; + + impl Validator for TestValidator { + fn validate( + &self, + _context: &mut dyn ValidatorContext, + _sender: &PeerId, + data: &[u8], + ) -> ValidationResult { + ValidationResult::ProcessAndKeep(H256::from_slice(&data[0..32])) + } + } + + fn prop(channels: Vec, notifications: Vec>) { + let engine_id = [1, 2, 3, 4]; + let remote_peer = PeerId::random(); + let network = TestNetwork::default(); + + let num_channels_per_topic = channels.iter() + .fold(HashMap::new(), |mut acc, ChannelLengthAndTopic { topic, .. }| { + acc.entry(topic).and_modify(|e| *e += 1).or_insert(1); + acc + }); + + let expected_msgs_per_topic_all_chan = notifications.iter() + .fold(HashMap::new(), |mut acc, messages| { + for message in messages { + acc.entry(message.topic).and_modify(|e| *e += 1).or_insert(1); + } + acc + }) + .into_iter() + // Messages are cloned for each channel with the corresponding topic, thus multiply + // with the amount of channels per topic. If there is no channel for a given topic, + // don't expect any messages for the topic to be received. + .map(|(topic, num)| (topic, num_channels_per_topic.get(&topic).unwrap_or(&0) * num)) + .collect::>(); + + let mut gossip_engine = GossipEngine::::new( + network.clone(), + engine_id.clone(), + "my_protocol".as_bytes(), + Arc::new(TestValidator{}), + ); + + // Create channels. + let (txs, mut rxs) = channels.iter() + .map(|ChannelLengthAndTopic { length, topic }| { + (topic.clone(), channel(*length)) + }) + .fold((vec![], vec![]), |mut acc, (topic, (tx, rx))| { + acc.0.push((topic, tx)); acc.1.push((topic, rx)); + acc + }); + + // Insert sender sides into `gossip_engine`. + for (topic, tx) in txs { + match gossip_engine.message_sinks.get_mut(&topic) { + Some(entry) => entry.push(tx), + None => {gossip_engine.message_sinks.insert(topic, vec![tx]);}, + } + } + + + let mut event_sender = network.inner.lock() + .unwrap() + .event_senders + .pop() + .unwrap(); + + // Register the remote peer. + event_sender.start_send( + Event::NotificationStreamOpened { + remote: remote_peer.clone(), + engine_id: engine_id.clone(), + role: ObservedRole::Authority, + } + ).expect("Event stream is unbounded; qed."); + + // Send messages into the network event stream. + for (i_notification, messages) in notifications.iter().enumerate() { + let messages = messages.into_iter().enumerate() + .map(|(i_message, Message { topic })| { + // Embed the topic in the first 256 bytes of the message to be extracted by + // the [`TestValidator`] later on. + let mut message = topic.as_bytes().to_vec(); + + // Make sure the message is unique via `i_notification` and `i_message` to + // ensure [`ConsensusBridge`] does not deduplicate it. + message.push(i_notification.try_into().unwrap()); + message.push(i_message.try_into().unwrap()); + + (engine_id, message.into()) + }).collect(); + + event_sender.start_send(Event::NotificationsReceived { + remote: remote_peer.clone(), + messages, + }).expect("Event stream is unbounded; qed."); + } + + let mut received_msgs_per_topic_all_chan = HashMap::::new(); + + // Poll both gossip engine and each receiver and track the amount of received messages. + block_on(poll_fn(|cx| { + loop { + if let Poll::Ready(()) = gossip_engine.poll_unpin(cx) { + unreachable!( + "Event stream sender side is not dropped, thus gossip engine does not \ + terminate", + ); + } + + let mut progress = false; + + for (topic, rx) in rxs.iter_mut() { + match rx.poll_next_unpin(cx) { + Poll::Ready(Some(_)) => { + progress = true; + received_msgs_per_topic_all_chan.entry(*topic) + .and_modify(|e| *e += 1) + .or_insert(1); + }, + Poll::Ready(None) => unreachable!( + "Sender side of channel is never dropped", + ), + Poll::Pending => {}, + } + } + + if !progress { + break; + } + } + Poll::Ready(()) + })); + + // Compare amount of expected messages with amount of received messages. + for (expected_topic, expected_num) in expected_msgs_per_topic_all_chan.iter() { + assert_eq!( + received_msgs_per_topic_all_chan.get(&expected_topic).unwrap_or(&0), + expected_num, + ); + } + for (received_topic, received_num) in expected_msgs_per_topic_all_chan.iter() { + assert_eq!( + expected_msgs_per_topic_all_chan.get(&received_topic).unwrap_or(&0), + received_num, + ); + } + } + + // Past regressions. + prop(vec![], vec![vec![Message{ topic: H256::default()}]]); + prop( + vec![ChannelLengthAndTopic {length: 71, topic: H256::default()}], + vec![vec![Message{ topic: H256::default()}]], + ); + + QuickCheck::new().quickcheck(prop as fn(_, _)) + } }