diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index cfc4f6ed86..6502399a64 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4606,6 +4606,7 @@ dependencies = [ "bitvec", "derive_more 0.99.9", "futures 0.3.5", + "log 0.4.8", "polkadot-erasure-coding", "polkadot-node-primitives", "polkadot-node-subsystem", diff --git a/polkadot/node/core/backing/Cargo.toml b/polkadot/node/core/backing/Cargo.toml index a660df3c31..22f8dcc734 100644 --- a/polkadot/node/core/backing/Cargo.toml +++ b/polkadot/node/core/backing/Cargo.toml @@ -17,6 +17,7 @@ erasure-coding = { package = "polkadot-erasure-coding", path = "../../../erasure statement-table = { package = "polkadot-statement-table", path = "../../../statement-table" } derive_more = "0.99.9" bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] } +log = "0.4.8" [dev-dependencies] sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 837bcac767..4d2a3817be 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -32,7 +32,7 @@ use polkadot_primitives::v1::{ CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId, ValidatorIndex, SigningContext, PoV, OmittedValidationData, CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt, - CandidateCommitments, + CandidateCommitments, CoreState, CoreIndex, }; use polkadot_node_primitives::{ FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, @@ -44,12 +44,14 @@ use polkadot_subsystem::{ AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData, ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed, + RuntimeApiRequest, }, util::{ self, - request_signing_context, + request_session_index_for_child, request_validator_groups, request_validators, + request_from_runtime, Validator, }, }; @@ -680,19 +682,56 @@ impl util::JobTrait for CandidateBackingJob { mut tx_from: mpsc::Sender, ) -> Pin> + Send>> { async move { - let (validators, roster, signing_context) = futures::try_join!( + macro_rules! try_runtime_api { + ($x: expr) => { + match $x { + Ok(x) => x, + Err(e) => { + log::warn!( + target: "candidate_backing", + "Failed to fetch runtime API data for job: {:?}", + e, + ); + + // We can't do candidate validation work if we don't have the + // requisite runtime API data. But these errors should not take + // down the node. + return Ok(()); + } + } + } + } + + let (validators, groups, session_index, cores) = futures::try_join!( request_validators(parent, &mut tx_from).await?, request_validator_groups(parent, &mut tx_from).await?, - request_signing_context(parent, &mut tx_from).await?, + request_session_index_for_child(parent, &mut tx_from).await?, + request_from_runtime( + parent, + &mut tx_from, + |tx| RuntimeApiRequest::AvailabilityCores(tx), + ).await?, )?; + let validators = try_runtime_api!(validators); + let (validator_groups, group_rotation_info) = try_runtime_api!(groups); + let session_index = try_runtime_api!(session_index); + let cores = try_runtime_api!(cores); + + let signing_context = SigningContext { parent_hash: parent, session_index }; let validator = Validator::construct(&validators, signing_context, keystore.clone())?; let mut groups = HashMap::new(); - for assignment in roster.scheduled { - if let Some(g) = roster.validator_groups.get(assignment.group_idx.0 as usize) { - groups.insert(assignment.para_id, g.clone()); + let n_cores = cores.len(); + for (idx, core) in cores.into_iter().enumerate() { + // Ignore prospective assignments on occupied cores for the time being. + if let CoreState::Scheduled(scheduled) = core { + let core_index = CoreIndex(idx as _); + let group_index = group_rotation_info.group_for_core(core_index, n_cores); + if let Some(g) = validator_groups.get(group_index.0 as usize) { + groups.insert(scheduled.para_id, g.clone()); + } } } @@ -779,12 +818,12 @@ mod tests { use assert_matches::assert_matches; use futures::{executor, future, Future}; use polkadot_primitives::v1::{ - AssignmentKind, BlockData, CandidateCommitments, CollatorId, CoreAssignment, CoreIndex, - LocalValidationData, GlobalValidationData, GroupIndex, HeadData, - ValidatorPair, ValidityAttestation, + ScheduledCore, BlockData, CandidateCommitments, CollatorId, + LocalValidationData, GlobalValidationData, HeadData, + ValidatorPair, ValidityAttestation, GroupRotationInfo, }; use polkadot_subsystem::{ - messages::{RuntimeApiRequest, SchedulerRoster}, + messages::RuntimeApiRequest, ActiveLeavesUpdate, FromOverseer, OverseerSignal, }; use sp_keyring::Sr25519Keyring; @@ -801,7 +840,8 @@ mod tests { validator_public: Vec, global_validation_data: GlobalValidationData, local_validation_data: LocalValidationData, - roster: SchedulerRoster, + validator_groups: (Vec>, GroupRotationInfo), + availability_cores: Vec, head_data: HashMap, signing_context: SigningContext, relay_parent: Hash, @@ -830,53 +870,39 @@ mod tests { let validator_public = validator_pubkeys(&validators); - let chain_a_assignment = CoreAssignment { - core: CoreIndex::from(0), - para_id: chain_a, - kind: AssignmentKind::Parachain, - group_idx: GroupIndex::from(0), - }; - - let chain_b_assignment = CoreAssignment { - core: CoreIndex::from(1), - para_id: chain_b, - kind: AssignmentKind::Parachain, - group_idx: GroupIndex::from(1), + let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]]; + let group_rotation_info = GroupRotationInfo { + session_start_block: 0, + group_rotation_frequency: 100, + now: 1, }; let thread_collator: CollatorId = Sr25519Keyring::Two.public().into(); - - let thread_a_assignment = CoreAssignment { - core: CoreIndex::from(2), - para_id: thread_a, - kind: AssignmentKind::Parathread(thread_collator.clone(), 0), - group_idx: GroupIndex::from(2), - }; - - let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]]; - - let parent_hash_1 = [1; 32].into(); - - let roster = SchedulerRoster { - validator_groups, - scheduled: vec![ - chain_a_assignment, - chain_b_assignment, - thread_a_assignment, - ], - upcoming: vec![], - availability_cores: vec![], - }; - let signing_context = SigningContext { - session_index: 1, - parent_hash: parent_hash_1, - }; + let availability_cores = vec![ + CoreState::Scheduled(ScheduledCore { + para_id: chain_a, + collator: None, + }), + CoreState::Scheduled(ScheduledCore { + para_id: chain_b, + collator: None, + }), + CoreState::Scheduled(ScheduledCore { + para_id: thread_a, + collator: Some(thread_collator.clone()), + }), + ]; let mut head_data = HashMap::new(); head_data.insert(chain_a, HeadData(vec![4, 5, 6])); let relay_parent = Hash::from([5; 32]); + let signing_context = SigningContext { + session_index: 1, + parent_hash: relay_parent, + }; + let local_validation_data = LocalValidationData { parent_head: HeadData(vec![7, 8, 9]), balance: Default::default(), @@ -895,7 +921,8 @@ mod tests { keystore, validators, validator_public, - roster, + validator_groups: (validator_groups, group_rotation_info), + availability_cores, head_data, local_validation_data, global_validation_data, @@ -984,7 +1011,7 @@ mod tests { AllMessages::RuntimeApi( RuntimeApiMessage::Request(parent, RuntimeApiRequest::Validators(tx)) ) if parent == test_state.relay_parent => { - tx.send(test_state.validator_public.clone()).unwrap(); + tx.send(Ok(test_state.validator_public.clone())).unwrap(); } ); @@ -994,19 +1021,29 @@ mod tests { AllMessages::RuntimeApi( RuntimeApiMessage::Request(parent, RuntimeApiRequest::ValidatorGroups(tx)) ) if parent == test_state.relay_parent => { - tx.send(test_state.roster.clone()).unwrap(); + tx.send(Ok(test_state.validator_groups.clone())).unwrap(); } ); - // Check that subsystem job issues a request for the signing context. + // Check that subsystem job issues a request for the session index for child. assert_matches!( virtual_overseer.recv().await, AllMessages::RuntimeApi( - RuntimeApiMessage::Request(parent, RuntimeApiRequest::SigningContext(tx)) + RuntimeApiMessage::Request(parent, RuntimeApiRequest::SessionIndexForChild(tx)) ) if parent == test_state.relay_parent => { - tx.send(test_state.signing_context.clone()).unwrap(); + tx.send(Ok(test_state.signing_context.session_index)).unwrap(); } ); + + // Check that subsystem job issues a request for the availability cores. + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx)) + ) if parent == test_state.relay_parent => { + tx.send(Ok(test_state.availability_cores.clone())).unwrap(); + } + ); } // Test that a `CandidateBackingMessage::Second` issues validation work diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index 806e3c1418..601f0b41b9 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -30,7 +30,7 @@ use polkadot_node_subsystem::{ }, util::{self, JobManager, JobTrait, ToJobTrait, Validator}, }; -use polkadot_primitives::v1::{AvailabilityBitfield, CoreOccupied, Hash}; +use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash}; use std::{convert::TryFrom, pin::Pin, time::Duration}; use wasm_timer::{Delay, Instant}; @@ -130,8 +130,7 @@ pub enum Error { // this function exists mainly to collect a bunch of potential error points into one. async fn get_core_availability( relay_parent: Hash, - idx: usize, - core: Option, + core: CoreState, sender: &mpsc::Sender, ) -> Result { use messages::{ @@ -144,18 +143,23 @@ async fn get_core_availability( // we have to (cheaply) clone this sender so we can mutate it to actually send anything let mut sender = sender.clone(); - // REVIEW: is it safe to ignore parathreads here, or do they also figure in the availability mapping? - if let Some(CoreOccupied::Parachain) = core { + if let CoreState::Occupied(core) = core { let (tx, rx) = oneshot::channel(); sender .send(RuntimeApi(Request( relay_parent, - CandidatePendingAvailability(idx.into(), tx), + CandidatePendingAvailability(core.para_id, tx), ))) .await?; + let committed_candidate_receipt = match rx.await? { - Some(ccr) => ccr, - None => return Ok(false), + Ok(Some(ccr)) => ccr, + Ok(None) => return Ok(false), + Err(e) => { + // Don't take down the node on runtime API errors. + log::warn!(target: "bitfield_signing", "Encountered a runtime API error: {:?}", e); + return Ok(false); + } }; let (tx, rx) = oneshot::channel(); sender @@ -171,35 +175,41 @@ async fn get_core_availability( // the way this function works is not intuitive: // -// - get the scheduler roster so we have a list of cores, in order. +// - get the availability cores so we have a list of cores, in order. // - for each occupied core, fetch `candidate_pending_availability` from runtime // - from there, we can get the `CandidateDescriptor` // - from there, we can send a `AvailabilityStore::QueryPoV` and set the indexed bit to 1 if it returns Some(_) async fn construct_availability_bitfield( relay_parent: Hash, sender: &mut mpsc::Sender, -) -> Result { +) -> Result, Error> { use futures::lock::Mutex; - use messages::RuntimeApiRequest::ValidatorGroups; + use messages::RuntimeApiRequest::AvailabilityCores; use FromJob::RuntimeApi; use RuntimeApiMessage::Request; - // request the validator groups so we can get the scheduler roster + // request the availability cores metadata from runtime. let (tx, rx) = oneshot::channel(); sender - .send(RuntimeApi(Request(relay_parent, ValidatorGroups(tx)))) + .send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx)))) .await?; // we now need sender to be immutable so we can copy the reference to multiple concurrent closures let sender = &*sender; - // wait for the scheduler roster - let scheduler_roster = rx.await?; + // wait for the cores + let availability_cores = match rx.await? { + Ok(a) => a, + Err(e) => { + log::warn!(target: "bitfield_signing", "Encountered a runtime API error: {:?}", e); + return Ok(None); + } + }; // prepare outputs let out = - Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; scheduler_roster.availability_cores.len())); + Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len())); // in principle, we know that we never want concurrent access to the _same_ bit within the vec; // we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding // any need to ever wait to lock this mutex. @@ -213,9 +223,9 @@ async fn construct_availability_bitfield( // // In principle, this work is all concurrent, not parallel. In practice, we can't guarantee it, which is why // we need the mutexes and explicit references above. - stream::iter(scheduler_roster.availability_cores.into_iter().enumerate()) + stream::iter(availability_cores.into_iter().enumerate()) .for_each_concurrent(None, |(idx, core)| async move { - let availability = match get_core_availability(relay_parent, idx, core, sender).await { + let availability = match get_core_availability(relay_parent, core, sender).await { Ok(availability) => availability, Err(err) => { errs_ref.lock().await.push(err); @@ -228,7 +238,7 @@ async fn construct_availability_bitfield( let errs = errs.into_inner(); if errs.is_empty() { - Ok(out.into_inner().into()) + Ok(Some(out.into_inner().into())) } else { Err(errs.into()) } @@ -264,7 +274,13 @@ impl JobTrait for BitfieldSigningJob { // wait a bit before doing anything else Delay::new_at(wait_until).await?; - let bitfield = construct_availability_bitfield(relay_parent, &mut sender).await?; + let bitfield = + match construct_availability_bitfield(relay_parent, &mut sender).await? + { + None => return Ok(()), + Some(b) => b, + }; + let signed_bitfield = validator.sign(bitfield); // make an anonymous scope to contain some use statements to simplify creating the outbound message diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 11c9563cb0..66f9ca9509 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -161,17 +161,23 @@ impl BitfieldDistribution { for relay_parent in activated { trace!(target: "bitd", "Start {:?}", relay_parent); // query basic system parameters once - let (validator_set, signing_context) = - query_basics(&mut ctx, relay_parent).await?; - - let _ = state.per_relay_parent.insert( - relay_parent, - PerRelayParentData { - signing_context, - validator_set, - ..Default::default() - }, - ); + if let Some((validator_set, signing_context)) = + query_basics(&mut ctx, relay_parent).await? + { + // If our runtime API fails, we don't take down the node, + // but we might alter peers' reputations erroneously as a result + // of not having the correct bookkeeping. If we have lost a race + // with state pruning, it is unlikely that peers will be sending + // us anything to do with this relay-parent anyway. + let _ = state.per_relay_parent.insert( + relay_parent, + PerRelayParentData { + signing_context, + validator_set, + ..Default::default() + }, + ); + } } for relay_parent in deactivated { @@ -562,12 +568,12 @@ where async fn query_basics( ctx: &mut Context, relay_parent: Hash, -) -> SubsystemResult<(Vec, SigningContext)> +) -> SubsystemResult, SigningContext)>> where Context: SubsystemContext, { let (validators_tx, validators_rx) = oneshot::channel(); - let (signing_tx, signing_rx) = oneshot::channel(); + let (session_tx, session_rx) = oneshot::channel(); let query_validators = AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent.clone(), @@ -576,13 +582,22 @@ where let query_signing = AllMessages::RuntimeApi(RuntimeApiMessage::Request( relay_parent.clone(), - RuntimeApiRequest::SigningContext(signing_tx), + RuntimeApiRequest::SessionIndexForChild(session_tx), )); ctx.send_messages(std::iter::once(query_validators).chain(std::iter::once(query_signing))) .await?; - Ok((validators_rx.await?, signing_rx.await?)) + match (validators_rx.await?, session_rx.await?) { + (Ok(v), Ok(s)) => Ok(Some(( + v, + SigningContext { parent_hash: relay_parent, session_index: s }, + ))), + (Err(e), _) | (_, Err(e)) => { + warn!(target: "bitd", "Failed to fetch basics from runtime API: {:?}", e); + Ok(None) + } + } } #[cfg(test)] diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index 24f14df3b0..4409d7f1ea 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -115,10 +115,27 @@ async fn handle_signal( RuntimeApiRequest::Validators(vals_tx), ))).await?; + let n_validators = match vals_rx.await? { + Ok(v) => v.len(), + Err(e) => { + log::warn!(target: "pov_distribution", + "Error fetching validators from runtime API for active leaf: {:?}", + e + ); + + // Not adding bookkeeping here might make us behave funny, but we + // shouldn't take down the node on spurious runtime API errors. + // + // and this is "behave funny" as in be bad at our job, but not in any + // slashable or security-related way. + continue; + } + }; + state.relay_parent_state.insert(relay_parent, BlockBasedState { known: HashMap::new(), fetching: HashMap::new(), - n_validators: vals_rx.await?.len(), + n_validators: n_validators, }); } diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 86a3efbe91..5cc67cfea0 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -847,17 +847,37 @@ async fn run( let (session_tx, session_rx) = oneshot::channel(); let val_message = AllMessages::RuntimeApi( - RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Validators(val_tx)), + RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(val_tx), + ), ); let session_message = AllMessages::RuntimeApi( - RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::SigningContext(session_tx)), + RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(session_tx), + ), ); ctx.send_messages( std::iter::once(val_message).chain(std::iter::once(session_message)) ).await?; - (val_rx.await?, session_rx.await?.session_index) + match (val_rx.await?, session_rx.await?) { + (Ok(v), Ok(s)) => (v, s), + (Err(e), _) | (_, Err(e)) => { + log::warn!( + target: "statement_distribution", + "Failed to fetch runtime API data for active leaf: {:?}", + e, + ); + + // Lacking this bookkeeping might make us behave funny, although + // not in any slashable way. But we shouldn't take down the node + // on what are likely spurious runtime API errors. + continue; + } + } }; active_heads.entry(relay_parent) diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 710ba77ce9..d6bd4f45d8 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -25,11 +25,13 @@ use futures::channel::{mpsc, oneshot}; use polkadot_primitives::v1::{ - BlockNumber, Hash, CommittedCandidateReceipt, + Hash, CommittedCandidateReceipt, CandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId, - SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex, - CoreAssignment, CoreOccupied, HeadData, CandidateDescriptor, - ValidatorSignature, OmittedValidationData, AvailableData, + SignedAvailabilityBitfield, ValidatorId, ValidationCode, ValidatorIndex, + CoreAssignment, CoreOccupied, CandidateDescriptor, + ValidatorSignature, OmittedValidationData, AvailableData, GroupRotationInfo, + CoreState, LocalValidationData, GlobalValidationData, OccupiedCoreAssumption, + CandidateEvent, SessionIndex, }; use polkadot_node_primitives::{ MisbehaviorReport, SignedFullStatement, View, ProtocolId, ValidationResult, @@ -286,23 +288,39 @@ pub struct SchedulerRoster { pub availability_cores: Vec>, } +/// A description of an error causing the runtime API request to be unservable. +#[derive(Debug, Clone)] +pub struct RuntimeApiError(String); + +/// A sender for the result of a runtime API request. +pub type RuntimeApiSender = oneshot::Sender>; + /// A request to the Runtime API subsystem. #[derive(Debug)] pub enum RuntimeApiRequest { /// Get the current validator set. - Validators(oneshot::Sender>), - /// Get the assignments of validators to cores. - ValidatorGroups(oneshot::Sender), - /// Get a signing context for bitfields and statements. - SigningContext(oneshot::Sender), - /// Get the validation code for a specific para, assuming execution under given block number, and - /// an optional block number representing an intermediate parablock executed in the context of - /// that block. - ValidationCode(ParaId, BlockNumber, Option, oneshot::Sender), - /// Get head data for a specific para. - HeadData(ParaId, oneshot::Sender), + Validators(RuntimeApiSender>), + /// Get the validator groups and group rotation info. + ValidatorGroups(RuntimeApiSender<(Vec>, GroupRotationInfo)>), + /// Get information on all availability cores. + AvailabilityCores(RuntimeApiSender>), + /// Get the global validation data. + GlobalValidationData(RuntimeApiSender), + /// Get the local validation data for a particular para, taking the given + /// `OccupiedCoreAssumption`, which will inform on how the validation data should be computed + /// if the para currently occupies a core. + LocalValidationData( + ParaId, + OccupiedCoreAssumption, + RuntimeApiSender>, + ), + /// Get the session index that a child of the block will have. + SessionIndexForChild(RuntimeApiSender), /// Get a the candidate pending availability for a particular parachain by parachain / core index - CandidatePendingAvailability(ParaId, oneshot::Sender>), + CandidatePendingAvailability(ParaId, RuntimeApiSender>), + /// Get all events concerning candidates (backing, inclusion, time-out) in the parent of + /// the block in whose state this request is executed. + CandidateEvents(RuntimeApiSender>), } /// A message to the Runtime API subsystem. diff --git a/polkadot/node/subsystem/src/util.rs b/polkadot/node/subsystem/src/util.rs index ced6b21bd9..ce31bebeda 100644 --- a/polkadot/node/subsystem/src/util.rs +++ b/polkadot/node/subsystem/src/util.rs @@ -21,7 +21,9 @@ //! this module. use crate::{ - messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, SchedulerRoster}, + messages::{ + AllMessages, RuntimeApiError, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender, + }, FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, }; use futures::{ @@ -37,8 +39,8 @@ use keystore::KeyStorePtr; use parity_scale_codec::Encode; use pin_project::{pin_project, pinned_drop}; use polkadot_primitives::v1::{ - EncodeAs, Hash, HeadData, Id as ParaId, Signed, SigningContext, - ValidatorId, ValidatorIndex, ValidatorPair, + EncodeAs, Hash, Signed, SigningContext, SessionIndex, + ValidatorId, ValidatorIndex, ValidatorPair, GroupRotationInfo, }; use sp_core::{ Pair, @@ -70,6 +72,9 @@ pub enum Error { /// A subsystem error #[from] Subsystem(SubsystemError), + /// An error in the runtime API. + #[from] + RuntimeApi(RuntimeApiError), /// The type system wants this even though it doesn't make sense #[from] Infallible(std::convert::Infallible), @@ -83,14 +88,17 @@ pub enum Error { AlreadyForwarding, } +/// A type alias for Runtime API receivers. +pub type RuntimeApiReceiver = oneshot::Receiver>; + /// Request some data from the `RuntimeApi`. pub async fn request_from_runtime( parent: Hash, sender: &mut mpsc::Sender, request_builder: RequestBuilder, -) -> Result, Error> +) -> Result, Error> where - RequestBuilder: FnOnce(oneshot::Sender) -> RuntimeApiRequest, + RequestBuilder: FnOnce(RuntimeApiSender) -> RuntimeApiRequest, FromJob: TryFrom, >::Error: std::fmt::Debug, { @@ -111,7 +119,7 @@ where pub async fn request_validators( parent: Hash, s: &mut mpsc::Sender, -) -> Result>, Error> +) -> Result>, Error> where FromJob: TryFrom, >::Error: std::fmt::Debug, @@ -119,11 +127,11 @@ where request_from_runtime(parent, s, |tx| RuntimeApiRequest::Validators(tx)).await } -/// Request the scheduler roster from `RuntimeApi`. +/// Request the validator groups. pub async fn request_validator_groups( parent: Hash, s: &mut mpsc::Sender, -) -> Result, Error> +) -> Result>, GroupRotationInfo)>, Error> where FromJob: TryFrom, >::Error: std::fmt::Debug, @@ -131,29 +139,18 @@ where request_from_runtime(parent, s, |tx| RuntimeApiRequest::ValidatorGroups(tx)).await } -/// Request a `SigningContext` from the `RuntimeApi`. -pub async fn request_signing_context( +/// Request the session index of the child block. +pub async fn request_session_index_for_child( parent: Hash, s: &mut mpsc::Sender, -) -> Result, Error> +) -> Result, Error> where FromJob: TryFrom, >::Error: std::fmt::Debug, { - request_from_runtime(parent, s, |tx| RuntimeApiRequest::SigningContext(tx)).await -} - -/// Request `HeadData` for some `ParaId` from `RuntimeApi`. -pub async fn request_head_data( - parent: Hash, - s: &mut mpsc::Sender, - id: ParaId, -) -> Result, Error> -where - FromJob: TryFrom, - >::Error: std::fmt::Debug, -{ - request_from_runtime(parent, s, |tx| RuntimeApiRequest::HeadData(id, tx)).await + request_from_runtime(parent, s, |tx| { + RuntimeApiRequest::SessionIndexForChild(tx) + }).await } /// From the given set of validators, find the first key we can sign with, if any. @@ -185,14 +182,21 @@ impl Validator { FromJob: TryFrom, >::Error: std::fmt::Debug, { - // Note: request_validators and request_signing_context do not and cannot run concurrently: they both - // have a mutable handle to the same sender. + // Note: request_validators and request_session_index_for_child do not and cannot + // run concurrently: they both have a mutable handle to the same sender. // However, each of them returns a oneshot::Receiver, and those are resolved concurrently. - let (validators, signing_context) = futures::try_join!( + let (validators, session_index) = futures::try_join!( request_validators(parent, &mut sender).await?, - request_signing_context(parent, &mut sender).await?, + request_session_index_for_child(parent, &mut sender).await?, )?; + let signing_context = SigningContext { + session_index: session_index?, + parent_hash: parent, + }; + + let validators = validators?; + Self::construct(&validators, signing_context, keystore) }