From 9201f8abf38a7a12e6514c42690ec4a53a38e69a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 18 Mar 2020 20:15:57 +0100 Subject: [PATCH] client/network-gossip/src/bridge: Finish when network event stream closes (#5282) * client/network-gossip/src/bridge: Finish when network event stream closes Previously within `GossipEngine::poll` one would poll the `network_event_stream` ignoring all messages other than `Poll::Ready(Some())`. Ignoring `Poll::Ready(None)` leads to a panic on the next poll of the stream, given that it is not fused. By design `network_event_stream` does not close unless an unbounded send into it fails, or the `NetworkWorker` gets shut down. > The stream never ends (unless the `NetworkWorker` gets shut down). > (client/network/src/service.rs) An `unbounded_send` failing on an unbounded channel is unlikely. The `NetworkWorker` shutting down is not unlikely. In such a case the `GossipEngine` should shut down as well. With this patch a `GossipEngine` finishes on `Poll::Ready(None)` returned from `network_event_stream`. * client/finality-grandpa/communication: Error on gossip engine finished Have `NetworkBridge::poll` return `Poll::Ready(Err)` instead of `Poll::Ready(Ok)` to be consistent with the handling of the neighbor packet worker stream and the gossip validator report stream. Both `Err` as well as `Ok` shut down the `NetworkBridge` as well as the `VoterWorker`. * client/network-gossip/src/bridge: Add regression test * client/network-gossip: Move substrate test client to dev dependencies * client/network-gossip: Remove TODO Addressed in a follow-up pull request. * client/network-gossip/bridge: Put match on newline after loop * client/finality-grandpa/src/observer: Fix regression test Make sure the event stream sender side is not dropped till the end. 
--- substrate/Cargo.lock | 1 + .../finality-grandpa/src/communication/mod.rs | 5 +- .../client/finality-grandpa/src/observer.rs | 13 +- substrate/client/network-gossip/Cargo.toml | 3 + substrate/client/network-gossip/src/bridge.rs | 131 ++++++++++++++---- 5 files changed, 117 insertions(+), 36 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 70610b770b..47ac61f036 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -6274,6 +6274,7 @@ dependencies = [ "lru", "sc-network", "sp-runtime", + "substrate-test-runtime-client", "wasm-timer", ] diff --git a/substrate/client/finality-grandpa/src/communication/mod.rs b/substrate/client/finality-grandpa/src/communication/mod.rs index 3c6a8d7648..52bfdbc818 100644 --- a/substrate/client/finality-grandpa/src/communication/mod.rs +++ b/substrate/client/finality-grandpa/src/communication/mod.rs @@ -451,8 +451,9 @@ impl> Future for NetworkBridge { } match self.gossip_engine.lock().poll_unpin(cx) { - // The gossip engine future finished. We should do the same. - Poll::Ready(()) => return Poll::Ready(Ok(())), + Poll::Ready(()) => return Poll::Ready( + Err(Error::Network("Gossip engine future finished.".into())) + ), Poll::Pending => {}, } diff --git a/substrate/client/finality-grandpa/src/observer.rs b/substrate/client/finality-grandpa/src/observer.rs index 971c213290..07eaf1db9f 100644 --- a/substrate/client/finality-grandpa/src/observer.rs +++ b/substrate/client/finality-grandpa/src/observer.rs @@ -425,21 +425,16 @@ mod tests { tester.trigger_gossip_validator_reputation_change(&peer_id); executor::block_on(async move { + // Poll the observer once and have it forward the reputation change from the gossip + // validator to the test network. + assert!(observer.now_or_never().is_none()); + // Ignore initial event stream request by gossip engine. 
match tester.events.next().now_or_never() { Some(Some(Event::EventStream(_))) => {}, _ => panic!("expected event stream request"), }; - assert!( - tester.events.next().now_or_never().is_none(), - "expect no further network events", - ); - - // Poll the observer once and have it forward the reputation change from the gossip - // validator to the test network. - assert!(observer.now_or_never().is_none()); - assert_matches!(tester.events.next().now_or_never(), Some(Some(Event::Report(_, _)))); }); } diff --git a/substrate/client/network-gossip/Cargo.toml b/substrate/client/network-gossip/Cargo.toml index c06c7bbd71..57e18124ae 100644 --- a/substrate/client/network-gossip/Cargo.toml +++ b/substrate/client/network-gossip/Cargo.toml @@ -19,3 +19,6 @@ lru = "0.4.3" sc-network = { version = "0.8.0-alpha.4", path = "../network" } sp-runtime = { version = "2.0.0-alpha.4", path = "../../primitives/runtime" } wasm-timer = "0.2" + +[dev-dependencies] +substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../test-utils/runtime/client" } diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index c06cb6268c..85e06a1d6f 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -148,33 +148,40 @@ impl Future for GossipEngine { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = &mut *self; - while let Poll::Ready(Some(event)) = this.network_event_stream.poll_next_unpin(cx) { - match event { - Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => { - if msg_engine_id != this.engine_id { - continue; + loop { + match this.network_event_stream.poll_next_unpin(cx) { + Poll::Ready(Some(event)) => match event { + Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => { + if msg_engine_id != this.engine_id { + continue; + } + this.state_machine.new_peer(&mut *this.network, remote, roles); } - 
this.state_machine.new_peer(&mut *this.network, remote, roles); + Event::NotificationStreamClosed { remote, engine_id: msg_engine_id } => { + if msg_engine_id != this.engine_id { + continue; + } + this.state_machine.peer_disconnected(&mut *this.network, remote); + }, + Event::NotificationsReceived { remote, messages } => { + let engine_id = this.engine_id.clone(); + this.state_machine.on_incoming( + &mut *this.network, + remote, + messages.into_iter() + .filter_map(|(engine, data)| if engine == engine_id { + Some(ConsensusMessage { + engine_id: engine, data: data.to_vec(), + }) + } else { None }) + .collect() + ); + }, + Event::Dht(_) => {} } - Event::NotificationStreamClosed { remote, engine_id: msg_engine_id } => { - if msg_engine_id != this.engine_id { - continue; - } - this.state_machine.peer_disconnected(&mut *this.network, remote); - }, - Event::NotificationsReceived { remote, messages } => { - let engine_id = this.engine_id.clone(); - this.state_machine.on_incoming( - &mut *this.network, - remote, - messages.into_iter() - .filter_map(|(engine, data)| if engine == engine_id { - Some(ConsensusMessage { engine_id: engine, data: data.to_vec() }) - } else { None }) - .collect() - ); - }, - Event::Dht(_) => {} + // The network event stream closed. Do the same for [`GossipValidator`]. + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => break, } } @@ -186,3 +193,77 @@ impl Future for GossipEngine { Poll::Pending } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ValidationResult, ValidatorContext}; + use substrate_test_runtime_client::runtime::Block; + + struct TestNetwork {} + + impl Network for Arc { + fn event_stream(&self) -> Pin + Send>> { + let (_tx, rx) = futures::channel::mpsc::channel(0); + + // Return rx and drop tx. Thus the given channel will yield `Poll::Ready(None)` on first + // poll. 
+ Box::pin(rx) + } + + fn report_peer(&self, _: PeerId, _: ReputationChange) { + unimplemented!(); + } + + fn disconnect_peer(&self, _: PeerId) { + unimplemented!(); + } + + fn write_notification(&self, _: PeerId, _: ConsensusEngineId, _: Vec) { + unimplemented!(); + } + + fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, [u8]>) {} + + fn announce(&self, _: B::Hash, _: Vec) { + unimplemented!(); + } + } + + struct TestValidator {} + + impl Validator for TestValidator { + fn validate( + &self, + _: &mut dyn ValidatorContext, + _: &PeerId, + _: &[u8] + ) -> ValidationResult { + unimplemented!(); + } + } + + /// Regression test for the case where the `GossipEngine.network_event_stream` closes. One + /// should not ignore a `Poll::Ready(None)` as `poll_next_unpin` will panic on subsequent calls. + /// + /// See https://github.com/paritytech/substrate/issues/5000 for details. + #[test] + fn returns_when_network_event_stream_closes() { + let mut gossip_engine = GossipEngine::::new( + Arc::new(TestNetwork{}), + [1, 2, 3, 4], + "my_protocol".as_bytes(), + Arc::new(TestValidator{}), + ); + + futures::executor::block_on(futures::future::poll_fn(move |ctx| { + if let Poll::Pending = gossip_engine.poll_unpin(ctx) { + panic!( + "Expected gossip engine to finish on first poll, given that \ + `GossipEngine.network_event_stream` closes right away." + ) + } + Poll::Ready(()) + })) + } +}