mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 12:51:02 +00:00
ensure all spawned futures are exit-guarded (#59)
This commit is contained in:
committed by
GitHub
parent
5e04c22522
commit
b2db7857a7
@@ -246,7 +246,7 @@ impl<P, E> IntoExit for CollationNode<P, E> where
|
|||||||
|
|
||||||
impl<P, E> Worker for CollationNode<P, E> where
|
impl<P, E> Worker for CollationNode<P, E> where
|
||||||
P: ParachainContext + Send + 'static,
|
P: ParachainContext + Send + 'static,
|
||||||
E: Future<Item=(),Error=()> + Send + 'static
|
E: Future<Item=(),Error=()> + Clone + Send + 'static
|
||||||
{
|
{
|
||||||
type Work = Box<Future<Item=(),Error=()> + Send>;
|
type Work = Box<Future<Item=(),Error=()> + Send>;
|
||||||
|
|
||||||
@@ -267,6 +267,7 @@ impl<P, E> Worker for CollationNode<P, E> where
|
|||||||
let client = service.client();
|
let client = service.client();
|
||||||
let network = service.network();
|
let network = service.network();
|
||||||
|
|
||||||
|
let inner_exit = exit.clone();
|
||||||
let work = client.import_notification_stream()
|
let work = client.import_notification_stream()
|
||||||
.for_each(move |notification| {
|
.for_each(move |notification| {
|
||||||
macro_rules! try_fr {
|
macro_rules! try_fr {
|
||||||
@@ -325,7 +326,7 @@ impl<P, E> Worker for CollationNode<P, E> where
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
tokio::spawn(silenced);
|
tokio::spawn(silenced.select(inner_exit.clone()).then(|_| Ok(())));
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -137,28 +137,31 @@ pub(crate) fn start<C, N, P>(
|
|||||||
let consensus = parachain_consensus.clone();
|
let consensus = parachain_consensus.clone();
|
||||||
let key = key.clone();
|
let key = key.clone();
|
||||||
|
|
||||||
client.import_notification_stream().for_each(move |notification| {
|
client.import_notification_stream()
|
||||||
let parent_hash = notification.hash;
|
.for_each(move |notification| {
|
||||||
if notification.is_new_best {
|
let parent_hash = notification.hash;
|
||||||
let res = client
|
if notification.is_new_best {
|
||||||
.runtime_api()
|
let res = client
|
||||||
.authorities(&BlockId::hash(parent_hash))
|
.runtime_api()
|
||||||
.map_err(Into::into)
|
.authorities(&BlockId::hash(parent_hash))
|
||||||
.and_then(|authorities| {
|
.map_err(Into::into)
|
||||||
consensus.get_or_instantiate(
|
.and_then(|authorities| {
|
||||||
parent_hash,
|
consensus.get_or_instantiate(
|
||||||
&authorities,
|
parent_hash,
|
||||||
key.clone(),
|
&authorities,
|
||||||
)
|
key.clone(),
|
||||||
});
|
)
|
||||||
|
});
|
||||||
|
|
||||||
if let Err(e) = res {
|
if let Err(e) = res {
|
||||||
warn!("Unable to start parachain consensus on top of {:?}: {}",
|
warn!("Unable to start parachain consensus on top of {:?}: {}",
|
||||||
parent_hash, e);
|
parent_hash, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
Ok(())
|
||||||
Ok(())
|
})
|
||||||
})
|
.select(exit.clone())
|
||||||
|
.then(|_| Ok(()))
|
||||||
};
|
};
|
||||||
|
|
||||||
let prune_old_sessions = {
|
let prune_old_sessions = {
|
||||||
@@ -180,6 +183,8 @@ pub(crate) fn start<C, N, P>(
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
.map_err(|e| warn!("Timer error {:?}", e))
|
.map_err(|e| warn!("Timer error {:?}", e))
|
||||||
|
.select(exit.clone())
|
||||||
|
.then(|_| Ok(()))
|
||||||
};
|
};
|
||||||
|
|
||||||
runtime.spawn(notifications);
|
runtime.spawn(notifications);
|
||||||
|
|||||||
@@ -41,14 +41,17 @@ use router::Router;
|
|||||||
|
|
||||||
// task that processes all gossipped consensus messages,
|
// task that processes all gossipped consensus messages,
|
||||||
// checking signatures
|
// checking signatures
|
||||||
struct MessageProcessTask<P> {
|
struct MessageProcessTask<P, E> {
|
||||||
inner_stream: mpsc::UnboundedReceiver<ConsensusMessage>,
|
inner_stream: mpsc::UnboundedReceiver<ConsensusMessage>,
|
||||||
parent_hash: Hash,
|
parent_hash: Hash,
|
||||||
table_router: Router<P>,
|
table_router: Router<P>,
|
||||||
|
exit: E,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<P: ProvideRuntimeApi + Send + Sync + 'static> MessageProcessTask<P>
|
impl<P, E> MessageProcessTask<P, E> where
|
||||||
where P::Api: ParachainHost<Block>,
|
P: ProvideRuntimeApi + Send + Sync + 'static,
|
||||||
|
P::Api: ParachainHost<Block>,
|
||||||
|
E: Future<Item=(),Error=()> + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
fn process_message(&self, msg: ConsensusMessage) -> Option<Async<()>> {
|
fn process_message(&self, msg: ConsensusMessage) -> Option<Async<()>> {
|
||||||
use polkadot_consensus::SignedStatement;
|
use polkadot_consensus::SignedStatement;
|
||||||
@@ -61,7 +64,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> MessageProcessTask<P>
|
|||||||
statement.sender,
|
statement.sender,
|
||||||
&self.parent_hash
|
&self.parent_hash
|
||||||
) {
|
) {
|
||||||
self.table_router.import_statement(statement);
|
self.table_router.import_statement(statement, self.exit.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -69,8 +72,10 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> MessageProcessTask<P>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<P: ProvideRuntimeApi + Send + Sync + 'static> Future for MessageProcessTask<P>
|
impl<P, E> Future for MessageProcessTask<P, E> where
|
||||||
where P::Api: ParachainHost<Block>,
|
P: ProvideRuntimeApi + Send + Sync + 'static,
|
||||||
|
P::Api: ParachainHost<Block>,
|
||||||
|
E: Future<Item=(),Error=()> + Clone + Send + 'static,
|
||||||
{
|
{
|
||||||
type Item = ();
|
type Item = ();
|
||||||
type Error = ();
|
type Error = ();
|
||||||
@@ -90,30 +95,34 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Future for MessageProcessTask
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Wrapper around the network service
|
/// Wrapper around the network service
|
||||||
pub struct ConsensusNetwork<P> {
|
pub struct ConsensusNetwork<P, E> {
|
||||||
network: Arc<NetworkService>,
|
network: Arc<NetworkService>,
|
||||||
api: Arc<P>,
|
api: Arc<P>,
|
||||||
|
exit: E,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<P> ConsensusNetwork<P> {
|
impl<P, E> ConsensusNetwork<P, E> {
|
||||||
/// Create a new consensus networking object.
|
/// Create a new consensus networking object.
|
||||||
pub fn new(network: Arc<NetworkService>, api: Arc<P>) -> Self {
|
pub fn new(network: Arc<NetworkService>, exit: E, api: Arc<P>) -> Self {
|
||||||
ConsensusNetwork { network, api }
|
ConsensusNetwork { network, exit, api }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<P> Clone for ConsensusNetwork<P> {
|
impl<P, E: Clone> Clone for ConsensusNetwork<P, E> {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
ConsensusNetwork {
|
ConsensusNetwork {
|
||||||
network: self.network.clone(),
|
network: self.network.clone(),
|
||||||
|
exit: self.exit.clone(),
|
||||||
api: self.api.clone(),
|
api: self.api.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A long-lived network which can create parachain statement routing processes on demand.
|
/// A long-lived network which can create parachain statement routing processes on demand.
|
||||||
impl<P: ProvideRuntimeApi + Send + Sync + 'static> Network for ConsensusNetwork<P>
|
impl<P, E> Network for ConsensusNetwork<P,E> where
|
||||||
where P::Api: ParachainHost<Block>,
|
P: ProvideRuntimeApi + Send + Sync + 'static,
|
||||||
|
P::Api: ParachainHost<Block>,
|
||||||
|
E: Clone + Future<Item=(),Error=()> + Send + 'static,
|
||||||
{
|
{
|
||||||
type TableRouter = Router<P>;
|
type TableRouter = Router<P>;
|
||||||
|
|
||||||
@@ -122,7 +131,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Network for ConsensusNetwork<
|
|||||||
&self,
|
&self,
|
||||||
_validators: &[SessionKey],
|
_validators: &[SessionKey],
|
||||||
table: Arc<SharedTable>,
|
table: Arc<SharedTable>,
|
||||||
task_executor: TaskExecutor
|
task_executor: TaskExecutor,
|
||||||
) -> Self::TableRouter {
|
) -> Self::TableRouter {
|
||||||
let parent_hash = table.consensus_parent_hash().clone();
|
let parent_hash = table.consensus_parent_hash().clone();
|
||||||
|
|
||||||
@@ -139,22 +148,28 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Network for ConsensusNetwork<
|
|||||||
);
|
);
|
||||||
|
|
||||||
let attestation_topic = table_router.gossip_topic();
|
let attestation_topic = table_router.gossip_topic();
|
||||||
|
let exit = self.exit.clone();
|
||||||
|
|
||||||
// spin up a task in the background that processes all incoming statements
|
// spin up a task in the background that processes all incoming statements
|
||||||
// TODO: propagate statements on a timer?
|
// TODO: propagate statements on a timer?
|
||||||
let inner_stream = self.network.consensus_gossip().write().messages_for(attestation_topic);
|
let inner_stream = self.network.consensus_gossip().write().messages_for(attestation_topic);
|
||||||
task_executor.spawn(self.network.with_spec(|spec, ctx| {
|
let process_task = self.network
|
||||||
spec.new_consensus(ctx, parent_hash, CurrentConsensus {
|
.with_spec(|spec, ctx| {
|
||||||
knowledge,
|
spec.new_consensus(ctx, parent_hash, CurrentConsensus {
|
||||||
local_session_key,
|
knowledge,
|
||||||
});
|
local_session_key,
|
||||||
|
});
|
||||||
|
|
||||||
MessageProcessTask {
|
MessageProcessTask {
|
||||||
inner_stream,
|
inner_stream,
|
||||||
parent_hash,
|
parent_hash,
|
||||||
table_router: table_router.clone(),
|
table_router: table_router.clone(),
|
||||||
}
|
exit,
|
||||||
}));
|
}
|
||||||
|
})
|
||||||
|
.then(|_| Ok(()));
|
||||||
|
|
||||||
|
task_executor.spawn(process_task);
|
||||||
|
|
||||||
table_router
|
table_router
|
||||||
}
|
}
|
||||||
@@ -176,7 +191,7 @@ impl Future for AwaitingCollation {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<P: ProvideRuntimeApi + Send + Sync + 'static> Collators for ConsensusNetwork<P>
|
impl<P: ProvideRuntimeApi + Send + Sync + 'static, E: Clone> Collators for ConsensusNetwork<P, E>
|
||||||
where P::Api: ParachainHost<Block>,
|
where P::Api: ParachainHost<Block>,
|
||||||
{
|
{
|
||||||
type Error = NetworkDown;
|
type Error = NetworkDown;
|
||||||
|
|||||||
@@ -104,7 +104,9 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
|
|||||||
where P::Api: ParachainHost<Block>
|
where P::Api: ParachainHost<Block>
|
||||||
{
|
{
|
||||||
/// Import a statement whose signature has been checked already.
|
/// Import a statement whose signature has been checked already.
|
||||||
pub(crate) fn import_statement(&self, statement: SignedStatement) {
|
pub(crate) fn import_statement<Exit>(&self, statement: SignedStatement, exit: Exit)
|
||||||
|
where Exit: Future<Item=(),Error=()> + Clone + Send + 'static
|
||||||
|
{
|
||||||
trace!(target: "p_net", "importing consensus statement {:?}", statement.statement);
|
trace!(target: "p_net", "importing consensus statement {:?}", statement.statement);
|
||||||
|
|
||||||
// defer any statements for which we haven't imported the candidate yet
|
// defer any statements for which we haven't imported the candidate yet
|
||||||
@@ -143,14 +145,16 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
|
|||||||
for (producer, statement) in producers.into_iter().zip(statements) {
|
for (producer, statement) in producers.into_iter().zip(statements) {
|
||||||
self.knowledge.lock().note_statement(statement.sender, &statement.statement);
|
self.knowledge.lock().note_statement(statement.sender, &statement.statement);
|
||||||
|
|
||||||
if let Some(producer) = producer {
|
if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
|
||||||
trace!(target: "consensus", "driving statement work to completion");
|
trace!(target: "consensus", "driving statement work to completion");
|
||||||
self.dispatch_work(c_hash, producer);
|
self.task_executor.spawn(work.select(exit.clone()).then(|_| Ok(())));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dispatch_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>) where
|
fn create_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>)
|
||||||
|
-> impl Future<Item=(),Error=()>
|
||||||
|
where
|
||||||
D: Future<Item=BlockData,Error=io::Error> + Send + 'static,
|
D: Future<Item=BlockData,Error=io::Error> + Send + 'static,
|
||||||
E: Future<Item=Extrinsic,Error=io::Error> + Send + 'static,
|
E: Future<Item=Extrinsic,Error=io::Error> + Send + 'static,
|
||||||
{
|
{
|
||||||
@@ -173,13 +177,13 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
|
|||||||
let knowledge = self.knowledge.clone();
|
let knowledge = self.knowledge.clone();
|
||||||
let attestation_topic = self.attestation_topic.clone();
|
let attestation_topic = self.attestation_topic.clone();
|
||||||
|
|
||||||
let work = producer.prime(validate)
|
producer.prime(validate)
|
||||||
.map(move |produced| {
|
.map(move |produced| {
|
||||||
// store the data before broadcasting statements, so other peers can fetch.
|
// store the data before broadcasting statements, so other peers can fetch.
|
||||||
knowledge.lock().note_candidate(
|
knowledge.lock().note_candidate(
|
||||||
candidate_hash,
|
candidate_hash,
|
||||||
produced.block_data,
|
produced.block_data,
|
||||||
produced.extrinsic
|
produced.extrinsic,
|
||||||
);
|
);
|
||||||
|
|
||||||
if produced.validity.is_none() && produced.availability.is_none() {
|
if produced.validity.is_none() && produced.availability.is_none() {
|
||||||
@@ -204,9 +208,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e));
|
.map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e))
|
||||||
|
|
||||||
self.task_executor.spawn(work);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -205,7 +205,11 @@ construct_service_factory! {
|
|||||||
let client = service.client();
|
let client = service.client();
|
||||||
|
|
||||||
// collator connections and consensus network both fulfilled by this
|
// collator connections and consensus network both fulfilled by this
|
||||||
let consensus_network = ConsensusNetwork::new(service.network(), service.client());
|
let consensus_network = ConsensusNetwork::new(
|
||||||
|
service.network(),
|
||||||
|
service.on_exit(),
|
||||||
|
service.client(),
|
||||||
|
);
|
||||||
let proposer_factory = ::consensus::ProposerFactory::new(
|
let proposer_factory = ::consensus::ProposerFactory::new(
|
||||||
client.clone(),
|
client.clone(),
|
||||||
consensus_network.clone(),
|
consensus_network.clone(),
|
||||||
|
|||||||
Reference in New Issue
Block a user