mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-15 17:21:08 +00:00
client/network-gossip/bridge: Use bounded channel (#5748)
* client/network-gossip/bridge: Use bounded channel Instead of returning an unbounded channel on `GossipEngine::messages_for` return a bounded channel. For now the channel length is determined by the amount of past messages cached in the `ConsensusGossip`. With a bounded channel, one can't just fire-and-forget style send into it, but has to first check whether the channel is ready. Thus this commit restructures `GossipEngine::poll` and introduces a `ForwardingState` into `GossipEngine`. * client/network-gossip/bridge: Add quickcheck for different size channels
This commit is contained in:
Generated
+2
-1
@@ -6509,9 +6509,10 @@ dependencies = [
|
||||
"libp2p",
|
||||
"log",
|
||||
"lru",
|
||||
"quickcheck",
|
||||
"rand 0.7.3",
|
||||
"sc-network",
|
||||
"sp-runtime",
|
||||
"sp-utils",
|
||||
"substrate-test-runtime-client",
|
||||
"wasm-timer",
|
||||
]
|
||||
|
||||
@@ -21,9 +21,10 @@ log = "0.4.8"
|
||||
lru = "0.4.3"
|
||||
sc-network = { version = "0.8.0-dev", path = "../network" }
|
||||
sp-runtime = { version = "2.0.0-dev", path = "../../primitives/runtime" }
|
||||
sp-utils = { version = "2.0.0-dev", path = "../../primitives/utils" }
|
||||
wasm-timer = "0.2"
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = "1.5"
|
||||
quickcheck = "0.9.0"
|
||||
rand = "0.7.2"
|
||||
substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../test-utils/runtime/client" }
|
||||
|
||||
@@ -20,13 +20,13 @@ use crate::state_machine::{ConsensusGossip, TopicNotification, PERIODIC_MAINTENA
|
||||
use sc_network::{Event, ReputationChange};
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::channel::mpsc::{channel, Sender, Receiver};
|
||||
use libp2p::PeerId;
|
||||
use log::trace;
|
||||
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
|
||||
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{HashMap, hash_map::Entry},
|
||||
collections::{HashMap, VecDeque},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
@@ -43,7 +43,23 @@ pub struct GossipEngine<B: BlockT> {
|
||||
/// Incoming events from the network.
|
||||
network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
|
||||
/// Outgoing events to the consumer.
|
||||
message_sinks: HashMap<B::Hash, Vec<TracingUnboundedSender<TopicNotification>>>,
|
||||
message_sinks: HashMap<B::Hash, Vec<Sender<TopicNotification>>>,
|
||||
/// Buffered messages (see [`ForwardingState`]).
|
||||
forwarding_state: ForwardingState<B>,
|
||||
}
|
||||
|
||||
/// A gossip engine receives messages from the network via the `network_event_stream` and forwards
|
||||
/// them to upper layers via the `message sinks`. In the scenario where messages have been received
|
||||
/// from the network but a subscribed message sink is not yet ready to receive the messages, the
|
||||
/// messages are buffered. To model this process a gossip engine can be in two states.
|
||||
enum ForwardingState<B: BlockT> {
|
||||
/// The gossip engine is currently not forwarding any messages and will poll the network for
|
||||
/// more messages to forward.
|
||||
Idle,
|
||||
/// The gossip engine is in the progress of forwarding messages and thus will not poll the
|
||||
/// network for more messages until it has send all current messages into the subscribed message
|
||||
/// sinks.
|
||||
Busy(VecDeque<(B::Hash, TopicNotification)>),
|
||||
}
|
||||
|
||||
impl<B: BlockT> Unpin for GossipEngine<B> {}
|
||||
@@ -69,6 +85,7 @@ impl<B: BlockT> GossipEngine<B> {
|
||||
|
||||
network_event_stream,
|
||||
message_sinks: HashMap::new(),
|
||||
forwarding_state: ForwardingState::Idle,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,12 +113,19 @@ impl<B: BlockT> GossipEngine<B> {
|
||||
|
||||
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile).
|
||||
pub fn messages_for(&mut self, topic: B::Hash)
|
||||
-> TracingUnboundedReceiver<TopicNotification>
|
||||
-> Receiver<TopicNotification>
|
||||
{
|
||||
let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for");
|
||||
let past_messages = self.state_machine.messages_for(topic).collect::<Vec<_>>();
|
||||
// The channel length is not critical for correctness. By the implementation of `channel`
|
||||
// each sender is guaranteed a single buffer slot, making it a non-rendezvous channel and
|
||||
// thus preventing direct dead-locks. A minimum channel length of 10 is an estimate based on
|
||||
// the fact that despite `NotificationsReceived` having a `Vec` of messages, it only ever
|
||||
// contains a single message.
|
||||
let (mut tx, rx) = channel(usize::max(past_messages.len(), 10));
|
||||
|
||||
for notification in self.state_machine.messages_for(topic) {
|
||||
tx.unbounded_send(notification).expect("receiver known to be live; qed");
|
||||
for notification in past_messages{
|
||||
tx.try_send(notification)
|
||||
.expect("receiver known to be live, and buffer size known to suffice; qed");
|
||||
}
|
||||
|
||||
self.message_sinks.entry(topic).or_default().push(tx);
|
||||
@@ -152,65 +176,106 @@ impl<B: BlockT> Future for GossipEngine<B> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let this = &mut *self;
|
||||
|
||||
loop {
|
||||
match this.network_event_stream.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(event)) => match event {
|
||||
Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, role } => {
|
||||
if msg_engine_id != this.engine_id {
|
||||
continue;
|
||||
}
|
||||
this.state_machine.new_peer(&mut *this.network, remote, role);
|
||||
}
|
||||
Event::NotificationStreamClosed { remote, engine_id: msg_engine_id } => {
|
||||
if msg_engine_id != this.engine_id {
|
||||
continue;
|
||||
}
|
||||
this.state_machine.peer_disconnected(&mut *this.network, remote);
|
||||
},
|
||||
Event::NotificationsReceived { remote, messages } => {
|
||||
let messages = messages.into_iter().filter_map(|(engine, data)| {
|
||||
if engine == this.engine_id {
|
||||
Some(data.to_vec())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let to_forward = this.state_machine.on_incoming(
|
||||
&mut *this.network,
|
||||
remote,
|
||||
messages,
|
||||
);
|
||||
|
||||
for (topic, notification) in to_forward {
|
||||
if let Entry::Occupied(mut entry) = this.message_sinks.entry(topic) {
|
||||
trace!(
|
||||
target: "gossip",
|
||||
"Pushing consensus message to sinks for {}.", topic,
|
||||
);
|
||||
entry.get_mut().retain(move |sink| {
|
||||
if let Err(e) = sink.unbounded_send(notification.clone()) {
|
||||
trace!(
|
||||
target: "gossip",
|
||||
"Error broadcasting message notification: {:?}", e,
|
||||
);
|
||||
}
|
||||
!sink.is_closed()
|
||||
});
|
||||
if entry.get().is_empty() {
|
||||
entry.remove_entry();
|
||||
'outer: loop {
|
||||
match &mut this.forwarding_state {
|
||||
ForwardingState::Idle => {
|
||||
match this.network_event_stream.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(event)) => match event {
|
||||
Event::NotificationStreamOpened { remote, engine_id, role } => {
|
||||
if engine_id != this.engine_id {
|
||||
continue;
|
||||
}
|
||||
this.state_machine.new_peer(&mut *this.network, remote, role);
|
||||
}
|
||||
Event::NotificationStreamClosed { remote, engine_id } => {
|
||||
if engine_id != this.engine_id {
|
||||
continue;
|
||||
}
|
||||
this.state_machine.peer_disconnected(&mut *this.network, remote);
|
||||
},
|
||||
Event::NotificationsReceived { remote, messages } => {
|
||||
let messages = messages.into_iter().filter_map(|(engine, data)| {
|
||||
if engine == this.engine_id {
|
||||
Some(data.to_vec())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let to_forward = this.state_machine.on_incoming(
|
||||
&mut *this.network,
|
||||
remote,
|
||||
messages,
|
||||
);
|
||||
|
||||
this.forwarding_state = ForwardingState::Busy(to_forward.into());
|
||||
},
|
||||
Event::Dht(_) => {}
|
||||
}
|
||||
// The network event stream closed. Do the same for [`GossipValidator`].
|
||||
Poll::Ready(None) => return Poll::Ready(()),
|
||||
Poll::Pending => break,
|
||||
}
|
||||
}
|
||||
ForwardingState::Busy(to_forward) => {
|
||||
let (topic, notification) = match to_forward.pop_front() {
|
||||
Some(n) => n,
|
||||
None => {
|
||||
this.forwarding_state = ForwardingState::Idle;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let sinks = match this.message_sinks.get_mut(&topic) {
|
||||
Some(sinks) => sinks,
|
||||
None => {
|
||||
continue;
|
||||
},
|
||||
};
|
||||
|
||||
// Make sure all sinks for the given topic are ready.
|
||||
for sink in sinks.iter_mut() {
|
||||
match sink.poll_ready(cx) {
|
||||
Poll::Ready(Ok(())) => {},
|
||||
// Receiver has been dropped. Ignore for now, filtered out in (1).
|
||||
Poll::Ready(Err(_)) => {},
|
||||
Poll::Pending => {
|
||||
// Push back onto queue for later.
|
||||
to_forward.push_front((topic, notification));
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
},
|
||||
Event::Dht(_) => {}
|
||||
}
|
||||
|
||||
// Filter out all closed sinks.
|
||||
sinks.retain(|sink| !sink.is_closed()); // (1)
|
||||
|
||||
if sinks.is_empty() {
|
||||
this.message_sinks.remove(&topic);
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!(
|
||||
target: "gossip",
|
||||
"Pushing consensus message to sinks for {}.", topic,
|
||||
);
|
||||
|
||||
// Send the notification on each sink.
|
||||
for sink in sinks {
|
||||
match sink.start_send(notification.clone()) {
|
||||
Ok(()) => {},
|
||||
Err(e) if e.is_full() => unreachable!(
|
||||
"Previously ensured that all sinks are ready; qed.",
|
||||
),
|
||||
// Receiver got dropped. Will be removed in next iteration (See (1)).
|
||||
Err(_) => {},
|
||||
}
|
||||
}
|
||||
}
|
||||
// The network event stream closed. Do the same for [`GossipValidator`].
|
||||
Poll::Ready(None) => return Poll::Ready(()),
|
||||
Poll::Pending => break,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
|
||||
this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL);
|
||||
this.state_machine.tick(&mut *this.network);
|
||||
@@ -229,9 +294,12 @@ impl<B: BlockT> Future for GossipEngine<B> {
|
||||
mod tests {
|
||||
use async_std::task::spawn;
|
||||
use crate::{ValidationResult, ValidatorContext};
|
||||
use futures::{channel::mpsc::{channel, Sender}, executor::block_on_stream};
|
||||
use futures::{channel::mpsc::{unbounded, UnboundedSender}, executor::{block_on, block_on_stream}, future::poll_fn};
|
||||
use quickcheck::{Arbitrary, Gen, QuickCheck};
|
||||
use rand::Rng;
|
||||
use sc_network::ObservedRole;
|
||||
use sp_runtime::{testing::H256, traits::{Block as BlockT}};
|
||||
use std::convert::TryInto;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use substrate_test_runtime_client::runtime::Block;
|
||||
use super::*;
|
||||
@@ -243,12 +311,12 @@ mod tests {
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct TestNetworkInner {
|
||||
event_senders: Vec<Sender<Event>>,
|
||||
event_senders: Vec<UnboundedSender<Event>>,
|
||||
}
|
||||
|
||||
impl<B: BlockT> Network<B> for TestNetwork {
|
||||
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
|
||||
let (tx, rx) = channel(100);
|
||||
let (tx, rx) = unbounded();
|
||||
self.inner.lock().unwrap().event_senders.push(tx);
|
||||
|
||||
Box::pin(rx)
|
||||
@@ -301,7 +369,7 @@ mod tests {
|
||||
// Drop network event stream sender side.
|
||||
drop(network.inner.lock().unwrap().event_senders.pop());
|
||||
|
||||
futures::executor::block_on(futures::future::poll_fn(move |ctx| {
|
||||
block_on(poll_fn(move |ctx| {
|
||||
if let Poll::Pending = gossip_engine.poll_unpin(ctx) {
|
||||
panic!(
|
||||
"Expected gossip engine to finish on first poll, given that \
|
||||
@@ -339,7 +407,7 @@ mod tests {
|
||||
engine_id: engine_id.clone(),
|
||||
role: ObservedRole::Authority,
|
||||
}
|
||||
).unwrap();
|
||||
).expect("Event stream is unbounded; qed.");
|
||||
|
||||
let messages = vec![vec![1], vec![2]];
|
||||
let events = messages.iter().cloned().map(|m| {
|
||||
@@ -350,7 +418,7 @@ mod tests {
|
||||
}).collect::<Vec<_>>();
|
||||
|
||||
// Send first event before subscribing.
|
||||
event_sender.start_send(events[0].clone()).unwrap();
|
||||
event_sender.start_send(events[0].clone()).expect("Event stream is unbounded; qed.");
|
||||
|
||||
let mut subscribers = vec![];
|
||||
for _ in 0..2 {
|
||||
@@ -358,7 +426,7 @@ mod tests {
|
||||
}
|
||||
|
||||
// Send second event after subscribing.
|
||||
event_sender.start_send(events[1].clone()).unwrap();
|
||||
event_sender.start_send(events[1].clone()).expect("Event stream is unbounded; qed.");
|
||||
|
||||
spawn(gossip_engine);
|
||||
|
||||
@@ -379,4 +447,202 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn forwarding_to_different_size_and_topic_channels() {
|
||||
#[derive(Clone, Debug)]
|
||||
struct ChannelLengthAndTopic{
|
||||
length: usize,
|
||||
topic: H256,
|
||||
}
|
||||
|
||||
impl Arbitrary for ChannelLengthAndTopic {
|
||||
fn arbitrary<G: Gen>(g: &mut G) -> Self {
|
||||
Self {
|
||||
length: g.gen_range(0, 100),
|
||||
// Make sure channel topics and message topics overlap by choosing a small
|
||||
// range.
|
||||
topic: H256::from_low_u64_ne(g.gen_range(0, 10)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Message {
|
||||
topic: H256,
|
||||
}
|
||||
|
||||
impl Arbitrary for Message{
|
||||
fn arbitrary<G: Gen>(g: &mut G) -> Self {
|
||||
Self {
|
||||
// Make sure channel topics and message topics overlap by choosing a small
|
||||
// range.
|
||||
topic: H256::from_low_u64_ne(g.gen_range(0, 10)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Validator that always returns `ProcessAndKeep` interpreting the first 32 bytes of data
|
||||
/// as the message topic.
|
||||
struct TestValidator;
|
||||
|
||||
impl Validator<Block> for TestValidator {
|
||||
fn validate(
|
||||
&self,
|
||||
_context: &mut dyn ValidatorContext<Block>,
|
||||
_sender: &PeerId,
|
||||
data: &[u8],
|
||||
) -> ValidationResult<H256> {
|
||||
ValidationResult::ProcessAndKeep(H256::from_slice(&data[0..32]))
|
||||
}
|
||||
}
|
||||
|
||||
fn prop(channels: Vec<ChannelLengthAndTopic>, notifications: Vec<Vec<Message>>) {
|
||||
let engine_id = [1, 2, 3, 4];
|
||||
let remote_peer = PeerId::random();
|
||||
let network = TestNetwork::default();
|
||||
|
||||
let num_channels_per_topic = channels.iter()
|
||||
.fold(HashMap::new(), |mut acc, ChannelLengthAndTopic { topic, .. }| {
|
||||
acc.entry(topic).and_modify(|e| *e += 1).or_insert(1);
|
||||
acc
|
||||
});
|
||||
|
||||
let expected_msgs_per_topic_all_chan = notifications.iter()
|
||||
.fold(HashMap::new(), |mut acc, messages| {
|
||||
for message in messages {
|
||||
acc.entry(message.topic).and_modify(|e| *e += 1).or_insert(1);
|
||||
}
|
||||
acc
|
||||
})
|
||||
.into_iter()
|
||||
// Messages are cloned for each channel with the corresponding topic, thus multiply
|
||||
// with the amount of channels per topic. If there is no channel for a given topic,
|
||||
// don't expect any messages for the topic to be received.
|
||||
.map(|(topic, num)| (topic, num_channels_per_topic.get(&topic).unwrap_or(&0) * num))
|
||||
.collect::<HashMap<H256, _>>();
|
||||
|
||||
let mut gossip_engine = GossipEngine::<Block>::new(
|
||||
network.clone(),
|
||||
engine_id.clone(),
|
||||
"my_protocol".as_bytes(),
|
||||
Arc::new(TestValidator{}),
|
||||
);
|
||||
|
||||
// Create channels.
|
||||
let (txs, mut rxs) = channels.iter()
|
||||
.map(|ChannelLengthAndTopic { length, topic }| {
|
||||
(topic.clone(), channel(*length))
|
||||
})
|
||||
.fold((vec![], vec![]), |mut acc, (topic, (tx, rx))| {
|
||||
acc.0.push((topic, tx)); acc.1.push((topic, rx));
|
||||
acc
|
||||
});
|
||||
|
||||
// Insert sender sides into `gossip_engine`.
|
||||
for (topic, tx) in txs {
|
||||
match gossip_engine.message_sinks.get_mut(&topic) {
|
||||
Some(entry) => entry.push(tx),
|
||||
None => {gossip_engine.message_sinks.insert(topic, vec![tx]);},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
let mut event_sender = network.inner.lock()
|
||||
.unwrap()
|
||||
.event_senders
|
||||
.pop()
|
||||
.unwrap();
|
||||
|
||||
// Register the remote peer.
|
||||
event_sender.start_send(
|
||||
Event::NotificationStreamOpened {
|
||||
remote: remote_peer.clone(),
|
||||
engine_id: engine_id.clone(),
|
||||
role: ObservedRole::Authority,
|
||||
}
|
||||
).expect("Event stream is unbounded; qed.");
|
||||
|
||||
// Send messages into the network event stream.
|
||||
for (i_notification, messages) in notifications.iter().enumerate() {
|
||||
let messages = messages.into_iter().enumerate()
|
||||
.map(|(i_message, Message { topic })| {
|
||||
// Embed the topic in the first 256 bytes of the message to be extracted by
|
||||
// the [`TestValidator`] later on.
|
||||
let mut message = topic.as_bytes().to_vec();
|
||||
|
||||
// Make sure the message is unique via `i_notification` and `i_message` to
|
||||
// ensure [`ConsensusBridge`] does not deduplicate it.
|
||||
message.push(i_notification.try_into().unwrap());
|
||||
message.push(i_message.try_into().unwrap());
|
||||
|
||||
(engine_id, message.into())
|
||||
}).collect();
|
||||
|
||||
event_sender.start_send(Event::NotificationsReceived {
|
||||
remote: remote_peer.clone(),
|
||||
messages,
|
||||
}).expect("Event stream is unbounded; qed.");
|
||||
}
|
||||
|
||||
let mut received_msgs_per_topic_all_chan = HashMap::<H256, _>::new();
|
||||
|
||||
// Poll both gossip engine and each receiver and track the amount of received messages.
|
||||
block_on(poll_fn(|cx| {
|
||||
loop {
|
||||
if let Poll::Ready(()) = gossip_engine.poll_unpin(cx) {
|
||||
unreachable!(
|
||||
"Event stream sender side is not dropped, thus gossip engine does not \
|
||||
terminate",
|
||||
);
|
||||
}
|
||||
|
||||
let mut progress = false;
|
||||
|
||||
for (topic, rx) in rxs.iter_mut() {
|
||||
match rx.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(_)) => {
|
||||
progress = true;
|
||||
received_msgs_per_topic_all_chan.entry(*topic)
|
||||
.and_modify(|e| *e += 1)
|
||||
.or_insert(1);
|
||||
},
|
||||
Poll::Ready(None) => unreachable!(
|
||||
"Sender side of channel is never dropped",
|
||||
),
|
||||
Poll::Pending => {},
|
||||
}
|
||||
}
|
||||
|
||||
if !progress {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Poll::Ready(())
|
||||
}));
|
||||
|
||||
// Compare amount of expected messages with amount of received messages.
|
||||
for (expected_topic, expected_num) in expected_msgs_per_topic_all_chan.iter() {
|
||||
assert_eq!(
|
||||
received_msgs_per_topic_all_chan.get(&expected_topic).unwrap_or(&0),
|
||||
expected_num,
|
||||
);
|
||||
}
|
||||
for (received_topic, received_num) in expected_msgs_per_topic_all_chan.iter() {
|
||||
assert_eq!(
|
||||
expected_msgs_per_topic_all_chan.get(&received_topic).unwrap_or(&0),
|
||||
received_num,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Past regressions.
|
||||
prop(vec![], vec![vec![Message{ topic: H256::default()}]]);
|
||||
prop(
|
||||
vec![ChannelLengthAndTopic {length: 71, topic: H256::default()}],
|
||||
vec![vec![Message{ topic: H256::default()}]],
|
||||
);
|
||||
|
||||
QuickCheck::new().quickcheck(prop as fn(_, _))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user