mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-17 03:11:01 +00:00
Make communication_for exit when we end a round (#313)
* Make `communication_for` exit when we end a round * Fix compilation
This commit is contained in:
@@ -330,6 +330,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
|
|||||||
&self,
|
&self,
|
||||||
table: Arc<SharedTable>,
|
table: Arc<SharedTable>,
|
||||||
authorities: &[ValidatorId],
|
authorities: &[ValidatorId],
|
||||||
|
exit: exit_future::Exit,
|
||||||
) -> Self::BuildTableRouter {
|
) -> Self::BuildTableRouter {
|
||||||
let parent_hash = *table.consensus_parent_hash();
|
let parent_hash = *table.consensus_parent_hash();
|
||||||
let local_session_key = table.session_key();
|
let local_session_key = table.session_key();
|
||||||
@@ -354,7 +355,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
|
|||||||
let table_router_clone = table_router.clone();
|
let table_router_clone = table_router.clone();
|
||||||
let work = table_router.checked_statements()
|
let work = table_router.checked_statements()
|
||||||
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
|
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
|
||||||
executor.spawn(work);
|
executor.spawn(work.select(exit).map(|_| ()).map_err(|_| ()));
|
||||||
|
|
||||||
table_router
|
table_router
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -22,9 +22,7 @@ use parity_codec::{Decode, HasCompact};
|
|||||||
use srml_support::{decl_storage, decl_module, fail, ensure};
|
use srml_support::{decl_storage, decl_module, fail, ensure};
|
||||||
|
|
||||||
use bitvec::{bitvec, BigEndian};
|
use bitvec::{bitvec, BigEndian};
|
||||||
use sr_primitives::traits::{
|
use sr_primitives::traits::{Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One};
|
||||||
Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One, Zero,
|
|
||||||
};
|
|
||||||
use primitives::{Hash, Balance, parachain::{
|
use primitives::{Hash, Balance, parachain::{
|
||||||
self, Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement, AccountIdConversion,
|
self, Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement, AccountIdConversion,
|
||||||
ParachainDispatchOrigin, UpwardMessage, BlockIngressRoots,
|
ParachainDispatchOrigin, UpwardMessage, BlockIngressRoots,
|
||||||
@@ -243,9 +241,11 @@ decl_storage! {
|
|||||||
config(parachains): Vec<(ParaId, Vec<u8>, Vec<u8>)>;
|
config(parachains): Vec<(ParaId, Vec<u8>, Vec<u8>)>;
|
||||||
config(_phdata): PhantomData<T>;
|
config(_phdata): PhantomData<T>;
|
||||||
build(|storage: &mut StorageOverlay, _: &mut ChildrenStorageOverlay, config: &GenesisConfig<T>| {
|
build(|storage: &mut StorageOverlay, _: &mut ChildrenStorageOverlay, config: &GenesisConfig<T>| {
|
||||||
|
use sr_primitives::traits::Zero;
|
||||||
|
|
||||||
let mut p = config.parachains.clone();
|
let mut p = config.parachains.clone();
|
||||||
p.sort_unstable_by_key(|&(ref id, _, _)| id.clone());
|
p.sort_unstable_by_key(|&(ref id, _, _)| *id);
|
||||||
p.dedup_by_key(|&mut (ref id, _, _)| id.clone());
|
p.dedup_by_key(|&mut (ref id, _, _)| *id);
|
||||||
|
|
||||||
let only_ids: Vec<_> = p.iter().map(|&(ref id, _, _)| id).cloned().collect();
|
let only_ids: Vec<_> = p.iter().map(|&(ref id, _, _)| id).cloned().collect();
|
||||||
|
|
||||||
|
|||||||
@@ -147,6 +147,7 @@ pub trait Network {
|
|||||||
&self,
|
&self,
|
||||||
table: Arc<SharedTable>,
|
table: Arc<SharedTable>,
|
||||||
authorities: &[SessionKey],
|
authorities: &[SessionKey],
|
||||||
|
exit: exit_future::Exit,
|
||||||
) -> Self::BuildTableRouter;
|
) -> Self::BuildTableRouter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -313,11 +314,14 @@ impl<C, N, P> ParachainValidation<C, N, P> where
|
|||||||
let (group_info, local_duty) = make_group_info(
|
let (group_info, local_duty) = make_group_info(
|
||||||
duty_roster,
|
duty_roster,
|
||||||
&authorities,
|
&authorities,
|
||||||
sign_with.public().into(),
|
sign_with.public(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
info!("Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
|
info!(
|
||||||
parent_hash, local_duty.validation);
|
"Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
|
||||||
|
parent_hash,
|
||||||
|
local_duty.validation,
|
||||||
|
);
|
||||||
|
|
||||||
let active_parachains = self.client.runtime_api().active_parachains(&id)?;
|
let active_parachains = self.client.runtime_api().active_parachains(&id)?;
|
||||||
|
|
||||||
@@ -331,25 +335,23 @@ impl<C, N, P> ParachainValidation<C, N, P> where
|
|||||||
self.extrinsic_store.clone(),
|
self.extrinsic_store.clone(),
|
||||||
max_block_data_size,
|
max_block_data_size,
|
||||||
));
|
));
|
||||||
|
|
||||||
|
let (_drop_signal, exit) = exit_future::signal();
|
||||||
|
|
||||||
let router = self.network.communication_for(
|
let router = self.network.communication_for(
|
||||||
table.clone(),
|
table.clone(),
|
||||||
&authorities,
|
&authorities,
|
||||||
|
exit.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let drop_signal = match local_duty.validation {
|
if let Chain::Parachain(id) = local_duty.validation {
|
||||||
Chain::Parachain(id) => Some(self.launch_work(
|
self.launch_work(parent_hash, id, router, max_block_data_size, exit);
|
||||||
parent_hash,
|
}
|
||||||
id,
|
|
||||||
router,
|
|
||||||
max_block_data_size,
|
|
||||||
)),
|
|
||||||
Chain::Relay => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let tracker = Arc::new(AttestationTracker {
|
let tracker = Arc::new(AttestationTracker {
|
||||||
table,
|
table,
|
||||||
started: Instant::now(),
|
started: Instant::now(),
|
||||||
_drop_signal: drop_signal
|
_drop_signal,
|
||||||
});
|
});
|
||||||
|
|
||||||
live_instances.insert(parent_hash, tracker.clone());
|
live_instances.insert(parent_hash, tracker.clone());
|
||||||
@@ -369,10 +371,10 @@ impl<C, N, P> ParachainValidation<C, N, P> where
|
|||||||
validation_para: ParaId,
|
validation_para: ParaId,
|
||||||
build_router: N::BuildTableRouter,
|
build_router: N::BuildTableRouter,
|
||||||
max_block_data_size: Option<u64>,
|
max_block_data_size: Option<u64>,
|
||||||
) -> exit_future::Signal {
|
exit: exit_future::Exit,
|
||||||
|
) {
|
||||||
use extrinsic_store::Data;
|
use extrinsic_store::Data;
|
||||||
|
|
||||||
let (signal, exit) = exit_future::signal();
|
|
||||||
let (collators, client) = (self.collators.clone(), self.client.clone());
|
let (collators, client) = (self.collators.clone(), self.client.clone());
|
||||||
let extrinsic_store = self.extrinsic_store.clone();
|
let extrinsic_store = self.extrinsic_store.clone();
|
||||||
|
|
||||||
@@ -428,16 +430,15 @@ impl<C, N, P> ParachainValidation<C, N, P> where
|
|||||||
.then(|_| Ok(()));
|
.then(|_| Ok(()));
|
||||||
|
|
||||||
// spawn onto thread pool.
|
// spawn onto thread pool.
|
||||||
if let Err(_) = self.handle.execute(Box::new(cancellable_work)) {
|
if self.handle.execute(Box::new(cancellable_work)).is_err() {
|
||||||
error!("Failed to spawn cancellable work task");
|
error!("Failed to spawn cancellable work task");
|
||||||
}
|
}
|
||||||
signal
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parachain validation for a single block.
|
/// Parachain validation for a single block.
|
||||||
struct AttestationTracker {
|
struct AttestationTracker {
|
||||||
_drop_signal: Option<exit_future::Signal>,
|
_drop_signal: exit_future::Signal,
|
||||||
table: Arc<SharedTable>,
|
table: Arc<SharedTable>,
|
||||||
started: Instant,
|
started: Instant,
|
||||||
}
|
}
|
||||||
@@ -544,7 +545,7 @@ impl<C, N, P, SC, TxApi> consensus::Environment<Block> for ProposerFactory<C, N,
|
|||||||
parent_id,
|
parent_id,
|
||||||
parent_number: parent_header.number,
|
parent_number: parent_header.number,
|
||||||
transaction_pool: self.transaction_pool.clone(),
|
transaction_pool: self.transaction_pool.clone(),
|
||||||
slot_duration: self.aura_slot_duration.clone(),
|
slot_duration: self.aura_slot_duration,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user