mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Rework consensus instance communication with the network worker (#958)
Up to now, consensus instances used the main channel to communicate with the background network worker. This led to a race condition when a local collation was sent and the router was dropped before the local-collation future had been driven to completion. This PR changes the communication between the worker and the consensus instances so that each instance uses its own channel. This has the advantage that we no longer need an extra `DropConsensusNetworking` message, as the consensus networking is dropped automatically when the last sender is dropped.
This commit is contained in:
@@ -19,6 +19,8 @@
|
||||
//! This manages routing for parachain statements, parachain block and outgoing message
|
||||
//! data fetching, communication between collators and validators, and more.
|
||||
|
||||
#![recursion_limit="256"]
|
||||
|
||||
use polkadot_primitives::{Block, Hash, BlakeTwo256, HashT};
|
||||
|
||||
pub mod legacy;
|
||||
|
||||
@@ -26,7 +26,8 @@ use codec::{Decode, Encode};
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use futures::future::Either;
|
||||
use futures::prelude::*;
|
||||
use futures::task::{Spawn, SpawnExt};
|
||||
use futures::task::{Spawn, SpawnExt, Context, Poll};
|
||||
use futures::stream::{FuturesUnordered, StreamFuture};
|
||||
use log::{debug, trace};
|
||||
|
||||
use polkadot_primitives::{
|
||||
@@ -76,8 +77,7 @@ enum ServiceToWorkerMsg {
|
||||
PeerDisconnected(PeerId),
|
||||
|
||||
// service messages.
|
||||
BuildConsensusNetworking(Arc<SharedTable>, Vec<ValidatorId>),
|
||||
DropConsensusNetworking(Hash),
|
||||
BuildConsensusNetworking(mpsc::Receiver<ServiceToWorkerMsg>, Arc<SharedTable>, Vec<ValidatorId>),
|
||||
SubmitValidatedCollation(
|
||||
AbridgedCandidateReceipt,
|
||||
PoVBlock,
|
||||
@@ -782,6 +782,21 @@ fn send_peer_collations(
|
||||
}
|
||||
}
|
||||
|
||||
/// Receives messages associated to a certain consensus networking instance.
|
||||
struct ConsensusNetworkingReceiver {
|
||||
receiver: mpsc::Receiver<ServiceToWorkerMsg>,
|
||||
/// The relay parent of this consensus network.
|
||||
relay_parent: Hash,
|
||||
}
|
||||
|
||||
impl Stream for ConsensusNetworkingReceiver {
|
||||
type Item = ServiceToWorkerMsg;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
Pin::new(&mut self.receiver).poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
struct Worker<Api, Sp, Gossip> {
|
||||
protocol_handler: ProtocolHandler,
|
||||
api: Arc<Api>,
|
||||
@@ -790,6 +805,7 @@ struct Worker<Api, Sp, Gossip> {
|
||||
background_to_main_sender: mpsc::Sender<BackgroundToWorkerMsg>,
|
||||
background_receiver: mpsc::Receiver<BackgroundToWorkerMsg>,
|
||||
service_receiver: mpsc::Receiver<ServiceToWorkerMsg>,
|
||||
consensus_networking_receivers: FuturesUnordered<StreamFuture<ConsensusNetworkingReceiver>>,
|
||||
}
|
||||
|
||||
impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
||||
@@ -801,6 +817,7 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
||||
// spawns a background task to spawn consensus networking.
|
||||
fn build_consensus_networking(
|
||||
&mut self,
|
||||
receiver: mpsc::Receiver<ServiceToWorkerMsg>,
|
||||
table: Arc<SharedTable>,
|
||||
authorities: Vec<ValidatorId>,
|
||||
) {
|
||||
@@ -832,6 +849,9 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
||||
},
|
||||
);
|
||||
|
||||
let relay_parent = table.signing_context().parent_hash;
|
||||
self.consensus_networking_receivers.push(ConsensusNetworkingReceiver { receiver, relay_parent }.into_future());
|
||||
|
||||
// glue the incoming messages, shared table, and validation
|
||||
// work together.
|
||||
let _ = self.executor.spawn(statement_import_loop(
|
||||
@@ -855,12 +875,8 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
||||
ServiceToWorkerMsg::PeerMessage(remote, messages) => {
|
||||
self.protocol_handler.on_raw_messages(remote, messages)
|
||||
}
|
||||
|
||||
ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) => {
|
||||
self.build_consensus_networking(table, authorities);
|
||||
}
|
||||
ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => {
|
||||
self.protocol_handler.drop_consensus_networking(&relay_parent);
|
||||
ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities) => {
|
||||
self.build_consensus_networking(receiver, table, authorities);
|
||||
}
|
||||
ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => {
|
||||
let relay_parent = receipt.relay_parent;
|
||||
@@ -985,6 +1001,16 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
||||
Some(msg) => self.handle_service_message(msg),
|
||||
None => return,
|
||||
},
|
||||
consensus_service_msg = self.consensus_networking_receivers.next() => match consensus_service_msg {
|
||||
Some((Some(msg), receiver)) => {
|
||||
self.handle_service_message(msg);
|
||||
self.consensus_networking_receivers.push(receiver.into_future());
|
||||
},
|
||||
Some((None, receiver)) => {
|
||||
self.protocol_handler.drop_consensus_networking(&receiver.relay_parent);
|
||||
},
|
||||
None => {},
|
||||
},
|
||||
background_msg = self.background_receiver.next() => match background_msg {
|
||||
Some(msg) => self.handle_background_message(msg),
|
||||
None => return,
|
||||
@@ -1017,6 +1043,7 @@ async fn worker_loop<Api, Sp>(
|
||||
background_to_main_sender: background_tx,
|
||||
background_receiver: background_rx,
|
||||
service_receiver: receiver,
|
||||
consensus_networking_receivers: Default::default(),
|
||||
};
|
||||
|
||||
worker.main_loop().await
|
||||
@@ -1296,24 +1323,6 @@ struct RouterInner {
|
||||
sender: mpsc::Sender<ServiceToWorkerMsg>,
|
||||
}
|
||||
|
||||
impl Drop for RouterInner {
|
||||
fn drop(&mut self) {
|
||||
let res = self.sender.try_send(
|
||||
ServiceToWorkerMsg::DropConsensusNetworking(self.relay_parent)
|
||||
);
|
||||
|
||||
if let Err(e) = res {
|
||||
assert!(
|
||||
!e.is_full(),
|
||||
"futures 0.3 guarantees at least one free slot in the capacity \
|
||||
per sender; this is the first message sent via this sender; \
|
||||
therefore we will not have to wait for capacity; qed"
|
||||
);
|
||||
// other error variants (disconnection) are fine here.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Service {
|
||||
/// Register an availability-store that the network can query.
|
||||
pub fn register_availability_store(&self, store: av_store::Store) {
|
||||
@@ -1379,14 +1388,15 @@ impl ParachainNetwork for Service {
|
||||
let relay_parent = table.signing_context().parent_hash.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
let (router_sender, receiver) = mpsc::channel(0);
|
||||
sender.send(
|
||||
ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities)
|
||||
ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities)
|
||||
).await?;
|
||||
|
||||
Ok(Router {
|
||||
inner: Arc::new(RouterInner {
|
||||
relay_parent,
|
||||
sender,
|
||||
sender: router_sender,
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -54,6 +54,7 @@ type GossipStreamEntry = (mpsc::UnboundedReceiver<TopicNotification>, oneshot::S
|
||||
#[derive(Default, Clone)]
|
||||
struct MockGossip {
|
||||
inner: Arc<Mutex<HashMap<Hash, GossipStreamEntry>>>,
|
||||
gossip_messages: Arc<Mutex<HashMap<Hash, GossipMessage>>>,
|
||||
}
|
||||
|
||||
impl MockGossip {
|
||||
@@ -102,8 +103,8 @@ impl crate::legacy::GossipService for MockGossip {
|
||||
})
|
||||
}
|
||||
|
||||
fn gossip_message(&self, _topic: Hash, _message: GossipMessage) {
|
||||
|
||||
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
|
||||
self.gossip_messages.lock().insert(topic, message);
|
||||
}
|
||||
|
||||
fn send_message(&self, _who: PeerId, _message: GossipMessage) {
|
||||
@@ -250,22 +251,6 @@ fn test_setup(config: Config) -> (
|
||||
(service, mock_gossip, pool, worker_task)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn router_inner_drop_sends_worker_message() {
|
||||
let parent = [1; 32].into();
|
||||
|
||||
let (sender, mut receiver) = mpsc::channel(0);
|
||||
drop(RouterInner {
|
||||
relay_parent: parent,
|
||||
sender,
|
||||
});
|
||||
|
||||
match receiver.try_next() {
|
||||
Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x),
|
||||
_ => panic!("message not sent"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn worker_task_shuts_down_when_sender_dropped() {
|
||||
let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
|
||||
@@ -274,6 +259,30 @@ fn worker_task_shuts_down_when_sender_dropped() {
|
||||
let _ = pool.run_until(worker_task);
|
||||
}
|
||||
|
||||
/// Given the async nature of `select!` that is being used in the main loop of the worker
|
||||
/// and that consensus instances use their own channels, we don't know when the synchronize message
|
||||
/// is handled. This helper function checks multiple times that the given instance is dropped. Even
|
||||
/// if the first round fails, the second one should be successful as the consensus instance drop
|
||||
/// should be already handled this time.
|
||||
fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) {
|
||||
let mut try_counter = 0;
|
||||
let max_tries = 3;
|
||||
|
||||
while try_counter < max_tries {
|
||||
let dropped = pool.run_until(service.synchronize(move |proto| {
|
||||
!proto.consensus_instances.contains_key(&instance)
|
||||
}));
|
||||
|
||||
if dropped {
|
||||
return;
|
||||
}
|
||||
|
||||
try_counter += 1;
|
||||
}
|
||||
|
||||
panic!("Consensus instance `{}` wasn't dropped!", instance);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn consensus_instances_cleaned_up() {
|
||||
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
|
||||
@@ -300,11 +309,61 @@ fn consensus_instances_cleaned_up() {
|
||||
|
||||
drop(router);
|
||||
|
||||
wait_for_instance_drop(&mut service, &mut pool, relay_parent);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collation_is_received_with_dropped_router() {
|
||||
let (mut service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
|
||||
let relay_parent = [0; 32].into();
|
||||
let topic = crate::legacy::gossip::attestation_topic(relay_parent);
|
||||
|
||||
let signing_context = SigningContext {
|
||||
session_index: Default::default(),
|
||||
parent_hash: relay_parent,
|
||||
};
|
||||
let table = Arc::new(SharedTable::new(
|
||||
vec![Sr25519Keyring::Alice.public().into()],
|
||||
HashMap::new(),
|
||||
Some(Arc::new(Sr25519Keyring::Alice.pair().into())),
|
||||
signing_context,
|
||||
AvailabilityStore::new_in_memory(service.clone()),
|
||||
None,
|
||||
));
|
||||
|
||||
pool.spawner().spawn_local(worker_task).unwrap();
|
||||
|
||||
let router = pool.run_until(
|
||||
service.build_table_router(table, &[])
|
||||
).unwrap();
|
||||
|
||||
let receipt = AbridgedCandidateReceipt { relay_parent, ..Default::default() };
|
||||
let local_collation_future = router.local_collation(
|
||||
receipt,
|
||||
PoVBlock { block_data: BlockData(Vec::new()) },
|
||||
(0, &[]),
|
||||
);
|
||||
|
||||
// Drop the router and make sure that the consensus instance is still alive
|
||||
drop(router);
|
||||
|
||||
assert!(pool.run_until(service.synchronize(move |proto| {
|
||||
!proto.consensus_instances.contains_key(&relay_parent)
|
||||
proto.consensus_instances.contains_key(&relay_parent)
|
||||
})));
|
||||
|
||||
// The gossip message should still be unknown
|
||||
assert!(!gossip.gossip_messages.lock().contains_key(&topic));
|
||||
|
||||
pool.run_until(local_collation_future).unwrap();
|
||||
|
||||
// Make sure the instance is now dropped and the message was gossiped
|
||||
wait_for_instance_drop(&mut service, &mut pool, relay_parent);
|
||||
assert!(pool.run_until(service.synchronize(move |_| {
|
||||
gossip.gossip_messages.lock().contains_key(&topic)
|
||||
})));
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn validator_peer_cleaned_up() {
|
||||
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
|
||||
|
||||
@@ -75,9 +75,10 @@ pub type ValidatorId = validator_app::Public;
|
||||
/// Index of the validator is used as a lightweight replacement of the `ValidatorId` when appropriate.
|
||||
pub type ValidatorIndex = u32;
|
||||
|
||||
/// A Parachain validator keypair.
|
||||
#[cfg(feature = "std")]
|
||||
pub type ValidatorPair = validator_app::Pair;
|
||||
application_crypto::with_pair! {
|
||||
/// A Parachain validator keypair.
|
||||
pub type ValidatorPair = validator_app::Pair;
|
||||
}
|
||||
|
||||
/// Signature with which parachain validators sign blocks.
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user