mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 03:01:07 +00:00
Keep the table router for the lifetime of the validation instance alive (#1175)
This pr ensures that the table router stays alive for the lifetime of the validation instance. This is required to ensure that the node responds onto gossip messages for the particular relay chain round. Before, the table router was only kept alive for relay chain nodes that were assigned to a Parachain, however the lifetime was also relative short. This lead to bugs where a relay chain node did not include PoVBlock's, because it did not receive them (rejected them on receive, because it was not listening on the particular round).
This commit is contained in:
@@ -503,7 +503,10 @@ impl ProtocolHandler {
|
||||
}
|
||||
|
||||
fn on_connect(&mut self, peer: PeerId, role: ObservedRole) {
|
||||
let claimed_validator = matches!(role, ObservedRole::OurSentry | ObservedRole::OurGuardedAuthority | ObservedRole::Authority);
|
||||
let claimed_validator = matches!(
|
||||
role,
|
||||
ObservedRole::OurSentry | ObservedRole::OurGuardedAuthority | ObservedRole::Authority
|
||||
);
|
||||
|
||||
self.peers.insert(peer.clone(), PeerData {
|
||||
claimed_validator,
|
||||
@@ -978,7 +981,11 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
|
||||
);
|
||||
}
|
||||
ServiceToWorkerMsg::AwaitCollation(relay_parent, para_id, sender) => {
|
||||
debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
|
||||
debug!(
|
||||
target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}",
|
||||
para_id,
|
||||
relay_parent,
|
||||
);
|
||||
self.protocol_handler.await_collation(relay_parent, para_id, sender)
|
||||
}
|
||||
ServiceToWorkerMsg::NoteBadCollator(collator) => {
|
||||
|
||||
@@ -47,6 +47,9 @@ pub enum Error {
|
||||
/// Proposer destroyed before finishing proposing or evaluating
|
||||
#[display(fmt = "Proposer destroyed before finishing proposing or evaluating")]
|
||||
PrematureDestruction,
|
||||
/// Failed to build the table router.
|
||||
#[display(fmt = "Failed to build the table router: {}", _0)]
|
||||
CouldNotBuildTableRouter(String),
|
||||
/// Timer failed
|
||||
#[display(fmt = "Timer failed: {}", _0)]
|
||||
Timer(std::io::Error),
|
||||
|
||||
@@ -208,7 +208,7 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
|
||||
relay_parent,
|
||||
&keystore,
|
||||
max_block_data_size,
|
||||
));
|
||||
).await);
|
||||
}
|
||||
Message::NotifyImport(notification) => {
|
||||
let relay_parent = notification.hash;
|
||||
@@ -217,7 +217,7 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
|
||||
relay_parent,
|
||||
&keystore,
|
||||
max_block_data_size,
|
||||
);
|
||||
).await;
|
||||
|
||||
if let Err(e) = res {
|
||||
warn!(
|
||||
@@ -299,8 +299,17 @@ fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc
|
||||
.map(|pair| Arc::new(pair))
|
||||
}
|
||||
|
||||
/// A live instance that is related to a relay chain validation round.
|
||||
///
|
||||
/// It stores the `instance_handle` and the `_table_router`.
|
||||
struct LiveInstance<TR> {
|
||||
instance_handle: ValidationInstanceHandle,
|
||||
/// Make sure we keep the table router alive, to respond/receive consensus messages.
|
||||
_table_router: TR,
|
||||
}
|
||||
|
||||
/// Constructs parachain-agreement instances.
|
||||
pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
|
||||
pub(crate) struct ParachainValidationInstances<N: Network, P, SP, CF> {
|
||||
/// The client instance.
|
||||
client: Arc<P>,
|
||||
/// The backing network handle.
|
||||
@@ -311,7 +320,7 @@ pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
|
||||
availability_store: AvailabilityStore,
|
||||
/// Live agreements. Maps relay chain parent hashes to attestation
|
||||
/// instances.
|
||||
live_instances: HashMap<Hash, ValidationInstanceHandle>,
|
||||
live_instances: HashMap<Hash, LiveInstance<N::TableRouter>>,
|
||||
/// The underlying validation pool of processes to use.
|
||||
/// Only `None` in tests.
|
||||
validation_pool: Option<ValidationPool>,
|
||||
@@ -339,7 +348,7 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
|
||||
///
|
||||
/// Additionally, this will trigger broadcast of data to the new block's duty
|
||||
/// roster.
|
||||
fn get_or_instantiate(
|
||||
async fn get_or_instantiate(
|
||||
&mut self,
|
||||
parent_hash: Hash,
|
||||
keystore: &KeyStorePtr,
|
||||
@@ -347,8 +356,8 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
|
||||
) -> Result<ValidationInstanceHandle, Error> {
|
||||
use primitives::Pair;
|
||||
|
||||
if let Some(tracker) = self.live_instances.get(&parent_hash) {
|
||||
return Ok(tracker.clone());
|
||||
if let Some(instance) = self.live_instances.get(&parent_hash) {
|
||||
return Ok(instance.instance_handle.clone());
|
||||
}
|
||||
|
||||
let id = BlockId::hash(parent_hash);
|
||||
@@ -417,41 +426,39 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
|
||||
self.validation_pool.clone(),
|
||||
));
|
||||
|
||||
let build_router = self.network.build_table_router(
|
||||
// The router will join the consensus gossip network. This is important
|
||||
// to receive messages sent for the current round.
|
||||
let router = match self.network.build_table_router(
|
||||
table.clone(),
|
||||
&validators,
|
||||
);
|
||||
).await {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
warn!(target: "validation", "Failed to build router: {:?}", e);
|
||||
return Err(Error::CouldNotBuildTableRouter(format!("{:?}", e)))
|
||||
}
|
||||
};
|
||||
|
||||
let availability_store = self.availability_store.clone();
|
||||
let client = self.client.clone();
|
||||
let collation_fetch = self.collation_fetch.clone();
|
||||
|
||||
let res = self.spawner.spawn(async move {
|
||||
// It is important that we build the router as it launches tasks internally
|
||||
// that are required to receive gossip messages.
|
||||
let router = match build_router.await {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
warn!(target: "validation", "Failed to build router: {:?}", e);
|
||||
return
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) {
|
||||
let n_validators = validators.len();
|
||||
if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) {
|
||||
let n_validators = validators.len();
|
||||
let availability_store = self.availability_store.clone();
|
||||
let client = self.client.clone();
|
||||
let collation_fetch = self.collation_fetch.clone();
|
||||
let router = router.clone();
|
||||
|
||||
let res = self.spawner.spawn(
|
||||
launch_work(
|
||||
move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators),
|
||||
availability_store,
|
||||
router,
|
||||
n_validators,
|
||||
index,
|
||||
).await;
|
||||
}
|
||||
});
|
||||
),
|
||||
);
|
||||
|
||||
if let Err(e) = res {
|
||||
error!(target: "validation", "Failed to create router and launch work: {:?}", e);
|
||||
if let Err(e) = res {
|
||||
error!(target: "validation", "Failed to launch work: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
let tracker = ValidationInstanceHandle {
|
||||
@@ -459,7 +466,11 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
|
||||
started: Instant::now(),
|
||||
};
|
||||
|
||||
self.live_instances.insert(parent_hash, tracker.clone());
|
||||
let live_instance = LiveInstance {
|
||||
instance_handle: tracker.clone(),
|
||||
_table_router: router,
|
||||
};
|
||||
self.live_instances.insert(parent_hash, live_instance);
|
||||
|
||||
Ok(tracker)
|
||||
}
|
||||
@@ -721,8 +732,9 @@ mod tests {
|
||||
validation_pool: None,
|
||||
};
|
||||
|
||||
parachain_validation.get_or_instantiate(Default::default(), &keystore, None)
|
||||
executor::block_on(parachain_validation.get_or_instantiate(Default::default(), &keystore, None))
|
||||
.expect("Creates new validation round");
|
||||
assert!(parachain_validation.live_instances.contains_key(&Default::default()));
|
||||
|
||||
let mut events = executor::block_on_stream(events);
|
||||
|
||||
@@ -760,8 +772,9 @@ mod tests {
|
||||
validation_pool: None,
|
||||
};
|
||||
|
||||
parachain_validation.get_or_instantiate(Default::default(), &keystore, None)
|
||||
executor::block_on(parachain_validation.get_or_instantiate(Default::default(), &keystore, None))
|
||||
.expect("Creates new validation round");
|
||||
assert!(parachain_validation.live_instances.contains_key(&Default::default()));
|
||||
|
||||
let mut events = executor::block_on_stream(events);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user