mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 21:01:02 +00:00
client/network-gossip/src/bridge: Finish when network event stream closes (#5282)
* client/network-gossip/src/bridge: Finish when network even stream closes Previously within `<GossipEngine as Future>::poll` one would poll the `network_event_stream` ignoring all messages other than `Poll::Ready(Some())`. Ignoring `Poll::Ready(None)` leads to a panic on the next poll of the stream, gien that it is not fused. By design `network_event_stream` does not close unless an unbounded send into it fails, or the `NetworkWorker` gets shut down. > The stream never ends (unless the `NetworkWorker` gets shut down). > (client/network/src/service.rs) An `unbounded_send` to fail on an unbounded channel is unlikely. The `NetworkWorker` shutting down is not unlikely. In such case the `GossipEngine` should shut down as well. With this patch a `<GossipEngine as Future>` finishes on `Poll::Ready(None)` returned from `network_event_stream`. * client/finality-grandpa/communication: Error on gossip engine finished Have `<NetworkBridge as Future>::poll` return `Poll::Ready(Err)` instead of `Poll::Ready(Ok)` to be consistent with the handling of the neighbor packet worker stream and the gossip validator report stream. Both `Err` as well as `Ok` shut down the `NetworkBridge` as well as the `VoterWorker`. * client/network-gossip/src/bridge: Add regression test * client/network-gossip: Move substrate test client to dev dependencies * client/network-gossip: Remove TODO Addressed in a follow up pull request. * client/network-gossip/bridge: Put match on newline after loop * client/finality-grandpa/src/observer: Fix regression test Make sure the event stream sender side is not dropped till the end.
This commit is contained in:
Generated
+1
@@ -6274,6 +6274,7 @@ dependencies = [
|
|||||||
"lru",
|
"lru",
|
||||||
"sc-network",
|
"sc-network",
|
||||||
"sp-runtime",
|
"sp-runtime",
|
||||||
|
"substrate-test-runtime-client",
|
||||||
"wasm-timer",
|
"wasm-timer",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -451,8 +451,9 @@ impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
match self.gossip_engine.lock().poll_unpin(cx) {
|
match self.gossip_engine.lock().poll_unpin(cx) {
|
||||||
// The gossip engine future finished. We should do the same.
|
Poll::Ready(()) => return Poll::Ready(
|
||||||
Poll::Ready(()) => return Poll::Ready(Ok(())),
|
Err(Error::Network("Gossip engine future finished.".into()))
|
||||||
|
),
|
||||||
Poll::Pending => {},
|
Poll::Pending => {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -425,21 +425,16 @@ mod tests {
|
|||||||
tester.trigger_gossip_validator_reputation_change(&peer_id);
|
tester.trigger_gossip_validator_reputation_change(&peer_id);
|
||||||
|
|
||||||
executor::block_on(async move {
|
executor::block_on(async move {
|
||||||
|
// Poll the observer once and have it forward the reputation change from the gossip
|
||||||
|
// validator to the test network.
|
||||||
|
assert!(observer.now_or_never().is_none());
|
||||||
|
|
||||||
// Ignore initial event stream request by gossip engine.
|
// Ignore initial event stream request by gossip engine.
|
||||||
match tester.events.next().now_or_never() {
|
match tester.events.next().now_or_never() {
|
||||||
Some(Some(Event::EventStream(_))) => {},
|
Some(Some(Event::EventStream(_))) => {},
|
||||||
_ => panic!("expected event stream request"),
|
_ => panic!("expected event stream request"),
|
||||||
};
|
};
|
||||||
|
|
||||||
assert!(
|
|
||||||
tester.events.next().now_or_never().is_none(),
|
|
||||||
"expect no further network events",
|
|
||||||
);
|
|
||||||
|
|
||||||
// Poll the observer once and have it forward the reputation change from the gossip
|
|
||||||
// validator to the test network.
|
|
||||||
assert!(observer.now_or_never().is_none());
|
|
||||||
|
|
||||||
assert_matches!(tester.events.next().now_or_never(), Some(Some(Event::Report(_, _))));
|
assert_matches!(tester.events.next().now_or_never(), Some(Some(Event::Report(_, _))));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,3 +19,6 @@ lru = "0.4.3"
|
|||||||
sc-network = { version = "0.8.0-alpha.4", path = "../network" }
|
sc-network = { version = "0.8.0-alpha.4", path = "../network" }
|
||||||
sp-runtime = { version = "2.0.0-alpha.4", path = "../../primitives/runtime" }
|
sp-runtime = { version = "2.0.0-alpha.4", path = "../../primitives/runtime" }
|
||||||
wasm-timer = "0.2"
|
wasm-timer = "0.2"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../test-utils/runtime/client" }
|
||||||
|
|||||||
@@ -148,8 +148,9 @@ impl<B: BlockT> Future for GossipEngine<B> {
|
|||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
let this = &mut *self;
|
let this = &mut *self;
|
||||||
|
|
||||||
while let Poll::Ready(Some(event)) = this.network_event_stream.poll_next_unpin(cx) {
|
loop {
|
||||||
match event {
|
match this.network_event_stream.poll_next_unpin(cx) {
|
||||||
|
Poll::Ready(Some(event)) => match event {
|
||||||
Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => {
|
Event::NotificationStreamOpened { remote, engine_id: msg_engine_id, roles } => {
|
||||||
if msg_engine_id != this.engine_id {
|
if msg_engine_id != this.engine_id {
|
||||||
continue;
|
continue;
|
||||||
@@ -169,13 +170,19 @@ impl<B: BlockT> Future for GossipEngine<B> {
|
|||||||
remote,
|
remote,
|
||||||
messages.into_iter()
|
messages.into_iter()
|
||||||
.filter_map(|(engine, data)| if engine == engine_id {
|
.filter_map(|(engine, data)| if engine == engine_id {
|
||||||
Some(ConsensusMessage { engine_id: engine, data: data.to_vec() })
|
Some(ConsensusMessage {
|
||||||
|
engine_id: engine, data: data.to_vec(),
|
||||||
|
})
|
||||||
} else { None })
|
} else { None })
|
||||||
.collect()
|
.collect()
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
Event::Dht(_) => {}
|
Event::Dht(_) => {}
|
||||||
}
|
}
|
||||||
|
// The network event stream closed. Do the same for [`GossipValidator`].
|
||||||
|
Poll::Ready(None) => return Poll::Ready(()),
|
||||||
|
Poll::Pending => break,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
|
while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
|
||||||
@@ -186,3 +193,77 @@ impl<B: BlockT> Future for GossipEngine<B> {
|
|||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::{ValidationResult, ValidatorContext};
|
||||||
|
use substrate_test_runtime_client::runtime::Block;
|
||||||
|
|
||||||
|
struct TestNetwork {}
|
||||||
|
|
||||||
|
impl<B: BlockT> Network<B> for Arc<TestNetwork> {
|
||||||
|
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
|
||||||
|
let (_tx, rx) = futures::channel::mpsc::channel(0);
|
||||||
|
|
||||||
|
// Return rx and drop tx. Thus the given channel will yield `Poll::Ready(None)` on first
|
||||||
|
// poll.
|
||||||
|
Box::pin(rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn report_peer(&self, _: PeerId, _: ReputationChange) {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disconnect_peer(&self, _: PeerId) {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_notification(&self, _: PeerId, _: ConsensusEngineId, _: Vec<u8>) {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, [u8]>) {}
|
||||||
|
|
||||||
|
fn announce(&self, _: B::Hash, _: Vec<u8>) {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TestValidator {}
|
||||||
|
|
||||||
|
impl<B: BlockT> Validator<B> for TestValidator {
|
||||||
|
fn validate(
|
||||||
|
&self,
|
||||||
|
_: &mut dyn ValidatorContext<B>,
|
||||||
|
_: &PeerId,
|
||||||
|
_: &[u8]
|
||||||
|
) -> ValidationResult<B::Hash> {
|
||||||
|
unimplemented!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Regression test for the case where the `GossipEngine.network_event_stream` closes. One
|
||||||
|
/// should not ignore a `Poll::Ready(None)` as `poll_next_unpin` will panic on subsequent calls.
|
||||||
|
///
|
||||||
|
/// See https://github.com/paritytech/substrate/issues/5000 for details.
|
||||||
|
#[test]
|
||||||
|
fn returns_when_network_event_stream_closes() {
|
||||||
|
let mut gossip_engine = GossipEngine::<Block>::new(
|
||||||
|
Arc::new(TestNetwork{}),
|
||||||
|
[1, 2, 3, 4],
|
||||||
|
"my_protocol".as_bytes(),
|
||||||
|
Arc::new(TestValidator{}),
|
||||||
|
);
|
||||||
|
|
||||||
|
futures::executor::block_on(futures::future::poll_fn(move |ctx| {
|
||||||
|
if let Poll::Pending = gossip_engine.poll_unpin(ctx) {
|
||||||
|
panic!(
|
||||||
|
"Expected gossip engine to finish on first poll, given that \
|
||||||
|
`GossipEngine.network_event_stream` closes right away."
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Poll::Ready(())
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user