Parachain improvements (#1905)

* Parachain improvements

- Set the parachains configuration in Rococo genesis
- Don't stop the overseer when a subsystem job is stopped
- Several small code changes

* Remove unused functionality

* Return error from the runtime instead of printing it

* Apply suggestions from code review

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Update primitives/src/v1.rs

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Update primitives/src/v1.rs

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Fix test

* Revert "Update primitives/src/v1.rs"

This reverts commit 11fce2785acd1de481ca57815b8e18400f09fd52.

* Revert "Update primitives/src/v1.rs"

This reverts commit d6439fed4f954360c89fb1e12b73954902c76a41.

* Revert "Return error from the runtime instead of printing it"

This reverts commit cb4b5c0830ac516a6d54b2c24197e9354f2b98cb.

* Revert "Fix test"

This reverts commit 0c5fa1b5566d4cd3c55a55d485e707165ce7a59e.

* Update runtime/parachains/src/runtime_api_impl/v1.rs

Co-authored-by: Sergei Shulepov <sergei@parity.io>

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Co-authored-by: Sergei Shulepov <sergei@parity.io>
This commit is contained in:
Bastian Köcher
2020-11-03 12:22:38 +01:00
committed by GitHub
parent 45c9aefd27
commit 002e1141a8
14 changed files with 122 additions and 175 deletions
+65 -146
View File
@@ -22,9 +22,6 @@
//!
//! This crate also reexports Prometheus metric types which are expected to be implemented by subsystems.
#![deny(unused_results)]
// #![deny(unused_crate_dependencies)] causes false positives
// https://github.com/rust-lang/rust/issues/57274
#![warn(missing_docs)]
use polkadot_node_subsystem::{
@@ -32,17 +29,10 @@ use polkadot_node_subsystem::{
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use futures::{
channel::{mpsc, oneshot},
future::Either,
prelude::*,
select,
stream::Stream,
task,
};
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream};
use futures_timer::Delay;
use parity_scale_codec::Encode;
use pin_project::{pin_project, pinned_drop};
use pin_project::pin_project;
use polkadot_primitives::v1::{
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
GroupRotationInfo, Hash, Id as ParaId, ValidationData, OccupiedCoreAssumption,
@@ -59,7 +49,7 @@ use sp_keystore::{
Error as KeystoreError,
};
use std::{
collections::HashMap,
collections::{HashMap, hash_map::Entry},
convert::{TryFrom, TryInto},
marker::Unpin,
pin::Pin,
@@ -81,7 +71,6 @@ pub mod reexports {
};
}
/// Duration a job will wait after sending a stop signal before hard-aborting.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs
@@ -111,9 +100,6 @@ pub enum Error {
/// The local node is not a validator.
#[error("Node is not a validator")]
NotAValidator,
/// The desired job is not present in the jobs list.
#[error("Relay parent {0} not of interest")]
JobNotFound(Hash),
/// Already forwarding errors to another sender
#[error("AlreadyForwarding")]
AlreadyForwarding,
@@ -415,12 +401,19 @@ pub trait ToJobTrait: TryFrom<AllMessages> {
fn relay_parent(&self) -> Option<Hash>;
}
struct AbortOnDrop(future::AbortHandle);
impl Drop for AbortOnDrop {
fn drop(&mut self) {
self.0.abort();
}
}
/// A JobHandle manages a particular job for a subsystem.
struct JobHandle<ToJob> {
abort_handle: future::AbortHandle,
_abort_handle: AbortOnDrop,
to_job: mpsc::Sender<ToJob>,
finished: oneshot::Receiver<()>,
outgoing_msgs_handle: usize,
}
impl<ToJob> JobHandle<ToJob> {
@@ -436,20 +429,13 @@ impl<ToJob: ToJobTrait> JobHandle<ToJob> {
/// If it hasn't shut itself down after `JOB_GRACEFUL_STOP_DURATION`, abort it.
async fn stop(mut self) {
// we don't actually care if the message couldn't be sent
if let Err(_) = self.to_job.send(ToJob::STOP).await {
// no need to wait further here: the job is either stalled or
// disconnected, and in either case, we can just abort it immediately
self.abort_handle.abort();
if self.to_job.send(ToJob::STOP).await.is_err() {
return;
}
let stop_timer = Delay::new(JOB_GRACEFUL_STOP_DURATION);
match future::select(stop_timer, self.finished).await {
Either::Left((_, _)) => {}
Either::Right((_, _)) => {
self.abort_handle.abort();
}
}
future::select(stop_timer, self.finished).await;
}
}
@@ -521,20 +507,6 @@ pub trait JobTrait: Unpin {
receiver: mpsc::Receiver<Self::ToJob>,
sender: mpsc::Sender<Self::FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
/// Handle a message which has no relay parent, and therefore can't be dispatched to a particular job
///
/// By default, this is implemented with a NOP function. However, if
/// ToJob occasionally has messages which do not correspond to a particular
/// parent relay hash, then this function will be spawned as a one-off
/// task to handle those messages.
// TODO: the API here is likely not precisely what we want; figure it out more
// once we're implementing a subsystem which actually needs this feature.
// In particular, we're quite likely to want this to return a future instead of
// interrupting the active thread for the duration of the handler.
fn handle_unanchored_msg(_msg: Self::ToJob) -> Result<(), Self::Error> {
Ok(())
}
}
/// Error which can be returned by the jobs manager
@@ -557,12 +529,12 @@ pub enum JobsError<JobError: 'static + std::error::Error> {
/// - Dispatches messages to the appropriate job for a given relay-parent.
/// - When dropped, aborts all remaining jobs.
/// - implements `Stream<Item=Job::FromJob>`, collecting all messages from subordinate jobs.
#[pin_project(PinnedDrop)]
#[pin_project]
pub struct Jobs<Spawner, Job: JobTrait> {
spawner: Spawner,
running: HashMap<Hash, JobHandle<Job::ToJob>>,
#[pin]
outgoing_msgs: StreamUnordered<mpsc::Receiver<Job::FromJob>>,
#[pin]
job: std::marker::PhantomData<Job>,
errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
}
@@ -602,7 +574,6 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (finished_tx, finished) = oneshot::channel();
// clone the error transmitter to move into the future
let err_tx = self.errors.clone();
let (future, abort_handle) = future::abortable(async move {
@@ -625,7 +596,6 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
}
});
// the spawn mechanism requires that the spawned future has no output
let future = async move {
// job errors are already handled within the future, meaning
// that any errors here are due to the abortable mechanism.
@@ -637,54 +607,33 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
};
self.spawner.spawn(Job::NAME, future.boxed());
// this handle lets us remove the appropriate receiver from self.outgoing_msgs
// when it's time to stop the job.
let outgoing_msgs_handle = self.outgoing_msgs.push(from_job_rx);
self.outgoing_msgs.push(from_job_rx);
let handle = JobHandle {
abort_handle,
_abort_handle: AbortOnDrop(abort_handle),
to_job: to_job_tx,
finished,
outgoing_msgs_handle,
};
let _ = self.running.insert(parent_hash, handle);
self.running.insert(parent_hash, handle);
Ok(())
}
/// Stop the job associated with this `parent_hash`.
pub async fn stop_job(&mut self, parent_hash: Hash) -> Result<(), Error> {
match self.running.remove(&parent_hash) {
Some(handle) => {
let _ = Pin::new(&mut self.outgoing_msgs).remove(handle.outgoing_msgs_handle);
handle.stop().await;
Ok(())
}
None => Err(Error::JobNotFound(parent_hash)),
pub async fn stop_job(&mut self, parent_hash: Hash) {
if let Some(handle) = self.running.remove(&parent_hash) {
handle.stop().await;
}
}
/// Send a message to the appropriate job for this `parent_hash`.
/// Will not return an error if the job is not running.
async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) -> Result<(), Error> {
match self.running.get_mut(&parent_hash) {
Some(job) => job.send_msg(msg).await?,
None => {
// don't bring down the subsystem, this can happen due to a race condition
},
}
Ok(())
}
}
// Note that on drop, we don't have the chance to gracefully spin down each of the remaining handles;
// we just abort them all. Still better than letting them dangle.
#[pinned_drop]
impl<Spawner, Job: JobTrait> PinnedDrop for Jobs<Spawner, Job> {
fn drop(self: Pin<&mut Self>) {
for job_handle in self.running.values() {
job_handle.abort_handle.abort();
async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) {
if let Entry::Occupied(mut job) = self.running.entry(parent_hash) {
if job.get_mut().send_msg(msg).await.is_err() {
log::debug!("failed to send message to job ({}), will remove it", Job::NAME);
job.remove();
}
}
}
}
@@ -696,18 +645,18 @@ where
{
type Item = Job::FromJob;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Option<Self::Item>> {
// pin-project the outgoing messages
let result = self.project().outgoing_msgs.poll_next(cx).map(|opt| {
opt.and_then(|(stream_yield, _)| match stream_yield {
StreamYield::Item(msg) => Some(msg),
StreamYield::Finished(_) => None,
})
});
// we don't want the stream to end if the jobs are empty at some point
match result {
task::Poll::Ready(None) => task::Poll::Pending,
otherwise => otherwise,
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
match Pin::new(&mut self.outgoing_msgs).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(r) => match r.map(|v| v.0) {
Some(StreamYield::Item(msg)) => return Poll::Ready(Some(msg)),
// If a job is finished, rerun the loop
Some(StreamYield::Finished(_)) => continue,
// Don't end if there are no jobs running
None => return Poll::Pending,
}
}
}
}
}
@@ -790,8 +739,18 @@ where
loop {
select! {
incoming = ctx.recv().fuse() => if Self::handle_incoming(incoming, &mut jobs, &run_args, &metrics, &mut err_tx).await { break },
outgoing = jobs.next().fuse() => Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await,
incoming = ctx.recv().fuse() =>
if Self::handle_incoming(
incoming,
&mut jobs,
&run_args,
&metrics,
&mut err_tx,
).await {
break
},
outgoing = jobs.next().fuse() =>
Self::handle_outgoing(outgoing, &mut ctx, &mut err_tx).await,
complete => break,
}
}
@@ -832,20 +791,14 @@ where
for hash in activated {
let metrics = metrics.clone();
if let Err(e) = jobs.spawn_job(hash, run_args.clone(), metrics) {
log::error!("Failed to spawn a job: {:?}", e);
let e = JobsError::Utility(e);
Self::fwd_err(Some(hash), e, err_tx).await;
log::error!("Failed to spawn a job({}): {:?}", Job::NAME, e);
Self::fwd_err(Some(hash), JobsError::Utility(e), err_tx).await;
return true;
}
}
for hash in deactivated {
if let Err(e) = jobs.stop_job(hash).await {
log::error!("Failed to stop a job: {:?}", e);
let e = JobsError::Utility(e);
Self::fwd_err(Some(hash), e, err_tx).await;
return true;
}
jobs.stop_job(hash).await;
}
}
Ok(Signal(Conclude)) => {
@@ -867,7 +820,7 @@ where
.forward(drain())
.await
{
log::error!("failed to stop all jobs on conclude signal: {:?}", e);
log::error!("failed to stop all jobs ({}) on conclude signal: {:?}", Job::NAME, e);
let e = Error::from(e);
Self::fwd_err(None, JobsError::Utility(e), err_tx).await;
}
@@ -877,30 +830,18 @@ where
Ok(Communication { msg }) => {
if let Ok(to_job) = <Job::ToJob>::try_from(msg) {
match to_job.relay_parent() {
Some(hash) => {
if let Err(err) = jobs.send_msg(hash, to_job).await {
log::error!("Failed to send a message to a job: {:?}", err);
let e = JobsError::Utility(err);
Self::fwd_err(Some(hash), e, err_tx).await;
return true;
}
}
None => {
if let Err(err) = Job::handle_unanchored_msg(to_job) {
log::error!("Failed to handle unhashed message: {:?}", err);
let e = JobsError::Job(err);
Self::fwd_err(None, e, err_tx).await;
return true;
}
}
Some(hash) => jobs.send_msg(hash, to_job).await,
None => log::debug!(
"Trying to send a message to a job ({}) without specifying a relay parent.",
Job::NAME,
),
}
}
}
Ok(Signal(BlockFinalized(_))) => {}
Err(err) => {
log::error!("error receiving message from subsystem context: {:?}", err);
let e = JobsError::Utility(Error::from(err));
Self::fwd_err(None, e, err_tx).await;
log::error!("error receiving message from subsystem context for job ({}): {:?}", Job::NAME, err);
Self::fwd_err(None, JobsError::Utility(Error::from(err)), err_tx).await;
return true;
}
}
@@ -1078,7 +1019,7 @@ impl<F: Future> Future for Timeout<F> {
#[cfg(test)]
mod tests {
use super::{Error as UtilError, JobManager, JobTrait, JobsError, TimeoutExt, ToJobTrait};
use super::{JobManager, JobTrait, JobsError, TimeoutExt, ToJobTrait};
use thiserror::Error;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateSelectionMessage},
@@ -1305,28 +1246,6 @@ mod tests {
});
}
#[test]
fn stopping_non_running_job_fails() {
let relay_parent: Hash = [0; 32].into();
let run_args = HashMap::new();
test_harness(run_args, |mut overseer_handle, err_rx| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::stop_work(relay_parent),
)))
.await;
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 1);
assert_eq!(errs[0].0, Some(relay_parent));
assert_matches!(
errs[0].1,
JobsError::Utility(UtilError::JobNotFound(match_relay_parent)) if relay_parent == match_relay_parent
);
});
}
#[test]
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
let relay_parent = Hash::repeat_byte(0x01);