refactor overseer into proc-macro based pattern (#2962)

This commit is contained in:
Bernhard Schuster
2021-07-08 21:09:26 +02:00
committed by GitHub
parent 2510bfc5d7
commit 3c9104daff
119 changed files with 5675 additions and 3864 deletions
@@ -17,7 +17,10 @@
//! A utility for fetching all unknown blocks based on a new chain-head hash.
use polkadot_node_subsystem::{
messages::ChainApiMessage, SubsystemSender,
messages::ChainApiMessage,
};
use polkadot_node_subsystem::{
SubsystemSender,
};
use polkadot_primitives::v1::{Hash, Header, BlockNumber};
use futures::prelude::*;
@@ -34,13 +37,15 @@ use futures::channel::oneshot;
/// then the returned list will be empty.
///
/// This may be somewhat expensive when first recovering from major sync.
pub async fn determine_new_blocks<E>(
ctx: &mut impl SubsystemSender,
pub async fn determine_new_blocks<E, Sender>(
sender: &mut Sender,
is_known: impl Fn(&Hash) -> Result<bool, E>,
head: Hash,
header: &Header,
lower_bound_number: BlockNumber,
) -> Result<Vec<(Hash, Header)>, E> {
) -> Result<Vec<(Hash, Header)>, E> where
Sender: SubsystemSender,
{
const ANCESTRY_STEP: usize = 4;
let min_block_needed = lower_bound_number + 1;
@@ -87,7 +92,7 @@ pub async fn determine_new_blocks<E>(
let batch_hashes = if ancestry_step == 1 {
vec![last_header.parent_hash]
} else {
ctx.send_message(ChainApiMessage::Ancestors {
sender.send_message(ChainApiMessage::Ancestors {
hash: *last_hash,
k: ancestry_step,
response_channel: tx,
@@ -105,8 +110,8 @@ pub async fn determine_new_blocks<E>(
.map(|_| oneshot::channel())
.unzip::<_, _, Vec<_>, Vec<_>>();
for (hash, sender) in batch_hashes.iter().cloned().zip(batch_senders) {
ctx.send_message(ChainApiMessage::BlockHeader(hash, sender).into()).await;
for (hash, batched_sender) in batch_hashes.iter().cloned().zip(batch_senders) {
sender.send_message(ChainApiMessage::BlockHeader(hash, batched_sender).into()).await;
}
let mut requests = futures::stream::FuturesOrdered::new();
@@ -156,7 +161,7 @@ mod tests {
use super::*;
use std::collections::{HashSet, HashMap};
use sp_core::testing::TaskExecutor;
use polkadot_node_subsystem::{messages::AllMessages, SubsystemContext};
use polkadot_overseer::{AllMessages, SubsystemContext};
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use assert_matches::assert_matches;
@@ -606,7 +611,7 @@ mod tests {
}
);
for _ in 0..2 {
for _ in 0_u8..2 {
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::BlockHeader(h, tx)) => {
+98 -172
View File
@@ -25,21 +25,46 @@
#![warn(missing_docs)]
use polkadot_node_subsystem::{
overseer,
errors::RuntimeApiError,
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender, BoundToRelayParent},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemSender,
messages::{
AllMessages,
RuntimeApiMessage,
RuntimeApiRequest,
RuntimeApiSender,
BoundToRelayParent,
},
ActiveLeavesUpdate, OverseerSignal,
SubsystemSender,
errors::{
SubsystemError,
},
SubsystemContext,
SpawnedSubsystem,
FromOverseer,
};
pub use overseer::{
Subsystem,
TimeoutExt,
gen::OverseerError,
gen::Timeout,
};
pub use polkadot_node_metrics::{
Metronome,
metrics,
};
use polkadot_node_jaeger as jaeger;
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::{Stream, SelectAll}};
use futures_timer::Delay;
use parity_scale_codec::Encode;
use pin_project::pin_project;
use polkadot_primitives::v1::{
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, SessionInfo,
AuthorityDiscoveryId, GroupIndex,
AuthorityDiscoveryId, GroupIndex,
};
use sp_core::{traits::SpawnNamed, Public};
use sp_application_crypto::AppKey;
@@ -59,8 +84,8 @@ pub use error_handling::{Fault, unwrap_non_fatal};
/// These reexports are required so that external crates can use the `delegated_subsystem` macro properly.
pub mod reexports {
pub use sp_core::traits::SpawnNamed;
pub use polkadot_node_subsystem::{
pub use polkadot_overseer::gen::{
SpawnNamed,
SpawnedSubsystem,
Subsystem,
SubsystemContext,
@@ -112,6 +137,12 @@ pub enum Error {
AlreadyForwarding,
}
impl From<OverseerError> for Error {
fn from(e: OverseerError) -> Self {
Self::from(SubsystemError::from(e))
}
}
/// A type alias for Runtime API receivers.
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
@@ -148,13 +179,14 @@ macro_rules! specialize_requests {
#[doc = "Request `"]
#[doc = $doc_name]
#[doc = "` from the runtime"]
pub async fn $func_name(
pub async fn $func_name (
parent: Hash,
$(
$param_name: $param_ty,
)*
sender: &mut impl SubsystemSender,
) -> RuntimeApiReceiver<$return_ty> {
) -> RuntimeApiReceiver<$return_ty>
{
request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
$( $param_name, )* tx
)).await
@@ -268,7 +300,8 @@ impl Validator {
parent: Hash,
keystore: SyncCryptoStorePtr,
sender: &mut impl SubsystemSender,
) -> Result<Self, Error> {
) -> Result<Self, Error>
{
// Note: request_validators and request_session_index_for_child do not and cannot
// run concurrently: they both have a mutable handle to the same sender.
// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
@@ -352,41 +385,6 @@ impl<ToJob> JobHandle<ToJob> {
}
}
/// Prometheus re-exports together with the [`Metrics`] trait.
pub mod metrics {
    /// Re-export of the Substrate Prometheus types under the shorter name `prometheus`.
    pub use substrate_prometheus_endpoint as prometheus;

    /// Prometheus metrics specific to a subsystem or a job.
    ///
    /// Usually implemented as a wrapper for `Option<ActualMetrics>`
    /// to ensure `Default` bounds or as a dummy type ().
    /// Prometheus metrics internally hold an `Arc` reference, so cloning them is fine.
    pub trait Metrics: Default + Clone {
        /// Attempt to register the metrics in the given Prometheus registry.
        fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError>;

        /// Convenience method to register metrics in the optional Prometheus registry.
        ///
        /// Without a registry this returns `Default::default()`; with one it returns
        /// whatever `try_register` returns for that registry.
        fn register(registry: Option<&prometheus::Registry>) -> Result<Self, prometheus::PrometheusError> {
            registry.map_or_else(|| Ok(Self::default()), Self::try_register)
        }
    }

    // No-op implementation for users that do not collect any metrics.
    impl Metrics for () {
        fn try_register(_: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
            Ok(())
        }
    }
}
/// Commands from a job to the broader subsystem.
pub enum FromJobCommand {
/// Spawn a child task on the executor.
@@ -396,12 +394,22 @@ pub enum FromJobCommand {
}
/// A sender for messages from jobs, as well as commands to the overseer.
#[derive(Clone)]
pub struct JobSender<S> {
pub struct JobSender<S: SubsystemSender> {
sender: S,
from_job: mpsc::Sender<FromJobCommand>,
}
// A custom `Clone` impl, since `#[derive(Clone)]` would add a `Clone` bound on the
// type parameter, which is not needed here.
impl<S: SubsystemSender> Clone for JobSender<S> {
    /// Produces a new `JobSender` by cloning both the subsystem sender and the
    /// job-command channel handle.
    fn clone(&self) -> Self {
        let Self { sender, from_job } = self;
        Self {
            sender: sender.clone(),
            from_job: from_job.clone(),
        }
    }
}
impl<S: SubsystemSender> JobSender<S> {
/// Get access to the underlying subsystem sender.
pub fn subsystem_sender(&mut self) -> &mut S {
@@ -409,15 +417,17 @@ impl<S: SubsystemSender> JobSender<S> {
}
/// Send a direct message to some other `Subsystem`, routed based on message type.
pub async fn send_message(&mut self, msg: AllMessages) {
self.sender.send_message(msg).await
pub async fn send_message(&mut self, msg: impl Into<AllMessages>) {
self.sender.send_message(msg.into()).await
}
/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
pub async fn send_messages<T>(&mut self, msgs: T)
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
pub async fn send_messages<T, M>(&mut self, msgs: T)
where
T: IntoIterator<Item = M> + Send, T::IntoIter: Send,
M: Into<AllMessages>,
{
self.sender.send_messages(msgs).await
self.sender.send_messages(msgs.into_iter().map(|m| m.into())).await
}
@@ -426,8 +436,8 @@ impl<S: SubsystemSender> JobSender<S> {
///
/// This function should be used only when there is some other bounding factor on the messages
/// sent with it. Otherwise, it risks a memory leak.
pub fn send_unbounded_message(&mut self, msg: AllMessages) {
self.sender.send_unbounded_message(msg)
pub fn send_unbounded_message(&mut self, msg: impl Into<AllMessages>) {
self.sender.send_unbounded_message(msg.into())
}
/// Send a command to the subsystem, to be relayed onwards to the overseer.
@@ -436,20 +446,25 @@ impl<S: SubsystemSender> JobSender<S> {
}
}
#[async_trait::async_trait]
impl<S: SubsystemSender> SubsystemSender for JobSender<S> {
async fn send_message(&mut self, msg: AllMessages) {
self.sender.send_message(msg).await
impl<S, M> overseer::SubsystemSender<M> for JobSender<S>
where
M: Send + 'static + Into<AllMessages>,
S: SubsystemSender + Clone,
{
async fn send_message(&mut self, msg: M) {
self.sender.send_message(msg.into()).await
}
async fn send_messages<T>(&mut self, msgs: T)
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
where T: IntoIterator<Item = M> + Send, T::IntoIter: Send
{
self.sender.send_messages(msgs).await
self.sender.send_messages(msgs.into_iter().map(|m| m.into())).await
}
fn send_unbounded_message(&mut self, msg: AllMessages) {
self.sender.send_unbounded_message(msg)
fn send_unbounded_message(&mut self, msg: M) {
self.sender.send_unbounded_message(msg.into())
}
}
@@ -525,7 +540,11 @@ struct Jobs<Spawner, ToJob> {
outgoing_msgs: SelectAll<mpsc::Receiver<FromJobCommand>>,
}
impl<Spawner: SpawnNamed, ToJob: Send + 'static> Jobs<Spawner, ToJob> {
impl<Spawner, ToJob> Jobs<Spawner, ToJob>
where
Spawner: SpawnNamed,
ToJob: Send + 'static,
{
/// Create a new Jobs manager which handles spawning appropriate jobs.
pub fn new(spawner: Spawner) -> Self {
Self {
@@ -544,7 +563,9 @@ impl<Spawner: SpawnNamed, ToJob: Send + 'static> Jobs<Spawner, ToJob> {
metrics: Job::Metrics,
sender: Sender,
)
where Job: JobTrait<ToJob = ToJob>, Sender: SubsystemSender,
where
Job: JobTrait<ToJob = ToJob>,
Sender: SubsystemSender,
{
let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
@@ -664,11 +685,12 @@ impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
pub async fn run<Context>(self, mut ctx: Context)
where
Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
Context: SubsystemContext,
Context: SubsystemContext<Message=<Job as JobTrait>::ToJob, Signal=OverseerSignal>,
<Context as SubsystemContext>::Sender: SubsystemSender,
Job: 'static + JobTrait + Send,
Job::RunArgs: Clone + Sync,
Job::ToJob: From<<Context as SubsystemContext>::Message> + Sync,
Job::Metrics: Sync,
<Job as JobTrait>::RunArgs: Clone + Sync,
<Job as JobTrait>::ToJob: Sync + From<<Context as polkadot_overseer::SubsystemContext>::Message>,
<Job as JobTrait>::Metrics: Sync,
{
let JobSubsystem {
params: JobSubsystemParams {
@@ -679,7 +701,7 @@ impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
..
} = self;
let mut jobs = Jobs::new(spawner);
let mut jobs = Jobs::<Spawner, Job::ToJob>::new(spawner);
loop {
select! {
@@ -690,7 +712,7 @@ impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
deactivated,
}))) => {
for activated in activated {
let sender: Context::Sender = ctx.sender().clone();
let sender = ctx.sender().clone();
jobs.spawn_job::<Job, _>(
activated.hash,
activated.span,
@@ -710,7 +732,7 @@ impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
}
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(..))) => {}
Ok(FromOverseer::Communication { msg }) => {
if let Ok(to_job) = <Job::ToJob>::try_from(msg) {
if let Ok(to_job) = <<Context as SubsystemContext>::Message>::try_from(msg) {
jobs.send_msg(to_job.relay_parent(), to_job).await;
}
}
@@ -725,10 +747,12 @@ impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
}
}
outgoing = jobs.next() => {
// TODO verify the introduced .await here is not a problem
// TODO it should only wait for the spawn to complete
// TODO but not for anything beyond that
let res = match outgoing.expect("the Jobs stream never ends; qed") {
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task),
FromJobCommand::SpawnBlocking(name, task)
=> ctx.spawn_blocking(name, task),
FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task),
};
if let Err(e) = res {
@@ -741,13 +765,13 @@ impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
}
}
impl<Context, Job, Spawner> Subsystem<Context> for JobSubsystem<Job, Spawner>
impl<Context, Job, Spawner> Subsystem<Context, SubsystemError> for JobSubsystem<Job, Spawner>
where
Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
Context: SubsystemContext,
Context: SubsystemContext<Message=Job::ToJob,Signal=OverseerSignal>,
Job: 'static + JobTrait + Send,
Job::RunArgs: Clone + Sync,
Job::ToJob: From<<Context as SubsystemContext>::Message> + Sync,
<Job as JobTrait>::ToJob: Sync + From<<Context as polkadot_overseer::SubsystemContext>::Message>,
Job::Metrics: Sync,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
@@ -762,101 +786,3 @@ where
}
}
}
/// A future that wraps another future with a `Delay` allowing for time-limited futures.
///
/// Polling resolves to `None` once the delay is ready, or to `Some(output)` when the
/// wrapped future completes while the delay is still pending.
#[pin_project]
pub struct Timeout<F: Future> {
// The wrapped future whose completion is being awaited.
#[pin]
future: F,
// The timer that bounds how long `future` may take.
#[pin]
delay: Delay,
}
/// Extension trait that gives every `Future` a `timeout` combinator.
pub trait TimeoutExt: Future {
    /// Wraps `self` in a [`Timeout`] bounded by `duration`.
    ///
    /// The original future is consumed and a new `Future` is returned.
    fn timeout(self, duration: Duration) -> Timeout<Self>
    where
        Self: Sized,
    {
        let delay = Delay::new(duration);
        Timeout { future: self, delay }
    }
}

// Blanket impl: every `Future` gets `.timeout(..)` for free.
impl<F: Future> TimeoutExt for F {}
impl<F: Future> Future for Timeout<F> {
    /// `None` when the delay fired, `Some(output)` when the wrapped future finished.
    type Output = Option<F::Output>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let this = self.project();

        // The delay is polled first, so an expired deadline takes precedence over
        // a wrapped future that might also be ready on this poll.
        if this.delay.poll(ctx).is_ready() {
            return Poll::Ready(None);
        }

        // `Ready(output)` becomes `Ready(Some(output))`; `Pending` stays `Pending`.
        this.future.poll(ctx).map(Some)
    }
}
// Internal state of a `Metronome`, driving its `poll_next` implementation.
#[derive(Copy, Clone)]
enum MetronomeState {
// The delay is armed; polling checks whether it has fired.
Snooze,
// A tick was just emitted; the delay must be reset before it is polled again.
SetAlarm,
}
/// Create a stream of ticks with a defined cycle duration.
///
/// Each tick is yielded as `()`; after a tick the delay is reset to the same period.
pub struct Metronome {
// Timer for the tick currently being waited on.
delay: Delay,
// Duration the timer is reset to after each tick.
period: Duration,
// Whether the timer is armed or needs to be reset first.
state: MetronomeState,
}
impl Metronome {
    /// Create a new metronome source with a defined cycle duration.
    ///
    /// `cycle` is used both for the initial delay and as the period the delay is
    /// reset to after every tick. The metronome starts in the `Snooze` state,
    /// i.e. with the delay already armed.
    pub fn new(cycle: Duration) -> Self {
        // `cycle` already is a `Duration`; no conversion is required.
        Self {
            period: cycle,
            delay: Delay::new(cycle),
            state: MetronomeState::Snooze,
        }
    }
}
impl futures::Stream for Metronome {
    /// A tick carries no payload.
    type Item = ();

    /// Yields `Some(())` every time the internal delay fires.
    ///
    /// The stream never yields `None`: it returns either `Poll::Ready(Some(()))`
    /// or `Poll::Pending`.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // A tick was emitted on the previous successful poll: re-arm the timer
        // before waiting on it again.
        if let MetronomeState::SetAlarm = self.state {
            // `Duration` is `Copy`; read it into a local so the mutable access to
            // `self.delay` below does not overlap with reading `self.period`.
            let period = self.period;
            self.delay.reset(period);
            self.state = MetronomeState::Snooze;
        }

        // Armed timer: emit a tick only once it has fired.
        if Pin::new(&mut self.delay).poll(cx).is_ready() {
            self.state = MetronomeState::SetAlarm;
            Poll::Ready(Some(()))
        } else {
            Poll::Pending
        }
    }
}
@@ -20,10 +20,12 @@
//! care about the state of particular blocks.
use polkadot_primitives::v1::{Hash, Header, SessionInfo, SessionIndex};
use polkadot_node_subsystem::{
SubsystemContext,
overseer,
messages::{RuntimeApiMessage, RuntimeApiRequest},
errors::RuntimeApiError,
SubsystemContext,
};
use futures::channel::oneshot;
@@ -147,7 +149,7 @@ impl RollingSessionWindow {
/// some backwards drift in session index is acceptable.
pub async fn cache_session_info_for_head(
&mut self,
ctx: &mut impl SubsystemContext,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
block_header: &Header,
) -> Result<SessionWindowUpdate, SessionsUnavailable> {
@@ -162,7 +164,7 @@ impl RollingSessionWindow {
ctx.send_message(RuntimeApiMessage::Request(
if block_header.number == 0 { block_hash } else { block_header.parent_hash },
RuntimeApiRequest::SessionIndexForChild(s_tx),
).into()).await;
)).await;
match s_rx.await {
Ok(Ok(s)) => s,
@@ -263,7 +265,7 @@ impl RollingSessionWindow {
}
async fn load_all_sessions(
ctx: &mut impl SubsystemContext,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
block_hash: Hash,
start: SessionIndex,
end_inclusive: SessionIndex,
@@ -274,7 +276,7 @@ async fn load_all_sessions(
ctx.send_message(RuntimeApiMessage::Request(
block_hash,
RuntimeApiRequest::SessionInfo(i, tx),
).into()).await;
)).await;
let session_info = match rx.await {
Ok(Ok(Some(s))) => s,
@@ -295,7 +297,7 @@ async fn load_all_sessions(
mod tests {
use super::*;
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem::messages::{AllMessages, AvailabilityRecoveryMessage};
use sp_core::testing::TaskExecutor;
use assert_matches::assert_matches;
@@ -331,7 +333,7 @@ mod tests {
};
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let (mut ctx, mut handle) = make_subsystem_context::<AvailabilityRecoveryMessage, _>(pool.clone());
let hash = header.hash();
@@ -26,6 +26,7 @@ use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_primitives::v1::{CoreState, EncodeAs, GroupIndex, GroupRotationInfo, Hash, OccupiedCore, SessionIndex, SessionInfo, Signed, SigningContext, UncheckedSigned, ValidatorId, ValidatorIndex};
use polkadot_node_subsystem::SubsystemContext;
use crate::{
request_session_index_for_child, request_session_info,
request_availability_cores,
@@ -107,13 +108,11 @@ impl RuntimeInfo {
}
/// Get `ExtendedSessionInfo` by relay parent hash.
pub async fn get_session_info<'a, Context>(
pub async fn get_session_info<'a>(
&'a mut self,
ctx: &mut Context,
ctx: &mut impl SubsystemContext,
parent: Hash,
) -> Result<&'a ExtendedSessionInfo>
where
Context: SubsystemContext,
{
let session_index = self.get_session_index(ctx, parent).await?;
@@ -124,14 +123,12 @@ impl RuntimeInfo {
///
/// `request_session_info` still requires the parent to be passed in, so we take the parent
/// in addition to the `SessionIndex`.
pub async fn get_session_info_by_index<'a, Context>(
pub async fn get_session_info_by_index<'a>(
&'a mut self,
ctx: &mut Context,
ctx: &mut impl SubsystemContext,
parent: Hash,
session_index: SessionIndex,
) -> Result<&'a ExtendedSessionInfo>
where
Context: SubsystemContext,
{
if !self.session_info_cache.contains(&session_index) {
let session_info =
@@ -225,7 +222,7 @@ pub fn check_signature<Payload, RealPayload>(
session_info: &SessionInfo,
relay_parent: Hash,
signed: UncheckedSigned<Payload, RealPayload>,
) -> std::result::Result<Signed<Payload, RealPayload>, UncheckedSigned<Payload, RealPayload>>
) -> std::result::Result<Signed<Payload, RealPayload>, UncheckedSigned<Payload, RealPayload>>
where
Payload: EncodeAs<RealPayload> + Clone,
RealPayload: Encode + Clone,
@@ -243,7 +240,7 @@ where
/// Request availability cores from the runtime.
pub async fn get_availability_cores<Context>(ctx: &mut Context, relay_parent: Hash)
-> Result<Vec<CoreState>>
-> Result<Vec<CoreState>>
where
Context: SubsystemContext,
{
@@ -276,8 +273,8 @@ where
/// Get group rotation info based on the given relay_parent.
pub async fn get_group_rotation_info<Context>(ctx: &mut Context, relay_parent: Hash)
-> Result<GroupRotationInfo>
where
Context: SubsystemContext
where
Context: SubsystemContext,
{
// We drop `groups` here as we don't need them, because of `RuntimeInfo`. Ideally we would not
// fetch them in the first place.
+1 -1
View File
@@ -75,7 +75,7 @@ impl JobTrait for FakeCollatorProtocolJob {
sender.send_message(CollatorProtocolMessage::Invalid(
Default::default(),
Default::default(),
).into()).await;
)).await;
}
// it isn't necessary to break run_loop into its own function,