From 8ebbe19d104e016ec61d6254baca2d8a630fa767 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 29 Mar 2021 01:18:53 +0200 Subject: [PATCH] Split NetworkBridge and break cycles with Unbounded (#2736) * overseer: pass messages directly between subsystems * test that message is held on to * Update node/overseer/src/lib.rs Co-authored-by: Peter Goodspeed-Niklaus * give every subsystem an unbounded sender too * remove metered_channel::name 1. we don't provide good names 2. these names are never used anywhere * unused mut * remove unnecessary &mut * subsystem unbounded_send * remove unused MaybeTimer We have channel size metrics that serve the same purpose better now and the implementation of message timing was pretty ugly. * remove comment * split up senders and receivers * update metrics * fix tests * fix test subsystem context * use SubsystemSender in jobs system now * refactor of awful jobs code * expose public `run` on JobSubsystem * update candidate backing to new jobs & use unbounded * bitfield signing * candidate-selection * provisioner * approval voting: send unbounded for assignment/approvals * async not needed * begin bridge split * split up network tasks into background worker * port over network bridge * Update node/network/bridge/src/lib.rs Co-authored-by: Andronik Ordian * rename ValidationWorkerNotifications Co-authored-by: Peter Goodspeed-Niklaus Co-authored-by: Andronik Ordian --- polkadot/Cargo.lock | 3 + polkadot/node/core/approval-voting/src/lib.rs | 22 +- polkadot/node/core/backing/src/lib.rs | 291 +++--- .../node/core/bitfield-signing/Cargo.toml | 3 + .../node/core/bitfield-signing/src/lib.rs | 69 +- .../node/core/candidate-selection/Cargo.toml | 1 + .../node/core/candidate-selection/src/lib.rs | 229 ++-- polkadot/node/core/provisioner/Cargo.toml | 1 + polkadot/node/core/provisioner/src/lib.rs | 63 +- polkadot/node/core/provisioner/src/tests.rs | 51 +- polkadot/node/network/bridge/Cargo.toml | 2 +- polkadot/node/network/bridge/src/action.rs | 223 ---- polkadot/node/network/bridge/src/lib.rs | 982 +++++++++++------- polkadot/node/network/bridge/src/network.rs | 2 +- .../node/subsystem-test-helpers/src/lib.rs | 9 + polkadot/node/subsystem-util/src/lib.rs | 586 ++++------- 16 files changed, 1191 insertions(+), 1346 deletions(-) delete mode 100644 polkadot/node/network/bridge/src/action.rs diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 09cdbe1930..2d13d3012f 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5632,6 +5632,7 @@ version = "0.1.0" dependencies = [ "futures 0.3.13", "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", "polkadot-primitives", "sp-keystore", @@ -5647,6 +5648,7 @@ dependencies = [ "futures 0.3.13", "polkadot-node-primitives", "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", "polkadot-primitives", "sp-core", @@ -5720,6 +5722,7 @@ dependencies = [ "futures 0.3.13", "futures-timer 3.0.2", "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", "polkadot-primitives", "sp-application-crypto", diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index c079724a5a..61168d378e 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -564,10 +564,10 @@ async fn handle_actions( let block_hash = indirect_cert.block_hash; let validator_index = indirect_cert.validator; - 
ctx.send_message(ApprovalDistributionMessage::DistributeAssignment( + ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment( indirect_cert, candidate_index, - ).into()).await; + ).into()); launch_approval( ctx, @@ -712,7 +712,7 @@ async fn handle_background_request( ) -> SubsystemResult> { match request { BackgroundRequest::ApprovalVote(vote_request) => { - issue_approval(ctx, state, metrics, vote_request).await + issue_approval(ctx, state, metrics, vote_request) } BackgroundRequest::CandidateValidation( validation_data, @@ -1724,7 +1724,7 @@ async fn launch_approval( // Issue and import a local approval vote. Should only be invoked after approval checks // have been done. -async fn issue_approval( +fn issue_approval( ctx: &mut impl SubsystemContext, state: &State, metrics: &Metrics, @@ -1830,12 +1830,14 @@ async fn issue_approval( metrics.on_approval_produced(); // dispatch to approval distribution. - ctx.send_message(ApprovalDistributionMessage::DistributeApproval(IndirectSignedApprovalVote { - block_hash, - candidate_index: candidate_index as _, - validator: validator_index, - signature: sig, - }).into()).await; + ctx.send_unbounded_message( + ApprovalDistributionMessage::DistributeApproval(IndirectSignedApprovalVote { + block_hash, + candidate_index: candidate_index as _, + validator: validator_index, + signature: sig, + } + ).into()); Ok(actions) } diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 8121f0dd83..927ea955c2 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -35,7 +35,8 @@ use polkadot_node_primitives::{ Statement, SignedFullStatement, ValidationResult, }; use polkadot_subsystem::{ - PerLeafSpan, Stage, jaeger, + PerLeafSpan, Stage, SubsystemSender, + jaeger, messages::{ AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage, @@ -50,8 +51,8 @@ use polkadot_node_subsystem_util::{ request_validators, request_from_runtime, Validator, - delegated_subsystem, FromJobCommand, + JobSender, metrics::{self, prometheus}, }; use statement_table::{ @@ -68,8 +69,9 @@ use thiserror::Error; const LOG_TARGET: &str = "parachain::candidate-backing"; +/// Errors that can occur in candidate backing. #[derive(Debug, Error)] -enum Error { +pub enum Error { #[error("Candidate is not found")] CandidateNotFound, #[error("Signature is invalid")] @@ -142,11 +144,9 @@ impl ValidatedCandidateCommand { } /// Holds all data needed for candidate backing job operation. -struct CandidateBackingJob { +pub struct CandidateBackingJob { /// The hash of the relay parent on top of which this job is doing it's work. parent: Hash, - /// Outbound message channel sending part. - tx_from: mpsc::Sender, /// The `ParaId` assigned to this validator assignment: Option, /// The collator required to author the candidate, if any. 
@@ -294,23 +294,20 @@ fn table_attested_to_backed( } async fn store_available_data( - tx_from: &mut mpsc::Sender, + sender: &mut JobSender, id: Option, n_validators: u32, candidate_hash: CandidateHash, available_data: AvailableData, ) -> Result<(), Error> { let (tx, rx) = oneshot::channel(); - tx_from.send(AllMessages::AvailabilityStore( - AvailabilityStoreMessage::StoreAvailableData( - candidate_hash, - id, - n_validators, - available_data, - tx, - ) - ).into() - ).await?; + sender.send_message(AvailabilityStoreMessage::StoreAvailableData( + candidate_hash, + id, + n_validators, + available_data, + tx, + ).into()).await; let _ = rx.await.map_err(Error::StoreAvailableData)?; @@ -321,9 +318,9 @@ async fn store_available_data( // // This will compute the erasure root internally and compare it to the expected erasure root. // This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`. -#[tracing::instrument(level = "trace", skip(tx_from, pov, span), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(sender, pov, span), fields(subsystem = LOG_TARGET))] async fn make_pov_available( - tx_from: &mut mpsc::Sender, + sender: &mut JobSender, validator_index: Option, n_validators: usize, pov: Arc, @@ -361,7 +358,7 @@ async fn make_pov_available( ); store_available_data( - tx_from, + sender, validator_index, n_validators as u32, candidate_hash, @@ -373,7 +370,7 @@ async fn make_pov_available( } async fn request_pov( - tx_from: &mut mpsc::Sender, + sender: &mut JobSender, relay_parent: Hash, from_validator: ValidatorIndex, candidate_hash: CandidateHash, @@ -381,35 +378,33 @@ async fn request_pov( ) -> Result, Error> { let (tx, rx) = oneshot::channel(); - tx_from.send(FromJobCommand::SendMessage(AllMessages::AvailabilityDistribution( - AvailabilityDistributionMessage::FetchPoV { - relay_parent, - from_validator, - candidate_hash, - pov_hash, - tx, - } - ))).await?; + sender.send_message(AvailabilityDistributionMessage::FetchPoV { + relay_parent, + from_validator, + candidate_hash, + pov_hash, + tx, + }.into()).await; let pov = rx.await.map_err(|_| Error::FetchPoV)?; Ok(Arc::new(pov)) } async fn request_candidate_validation( - tx_from: &mut mpsc::Sender, + sender: &mut JobSender, candidate: CandidateDescriptor, pov: Arc, ) -> Result { let (tx, rx) = oneshot::channel(); - tx_from.send(AllMessages::CandidateValidation( + sender.send_message(AllMessages::CandidateValidation( CandidateValidationMessage::ValidateFromChainState( candidate, pov, tx, ) ).into() - ).await?; + ).await; match rx.await { Ok(Ok(validation_result)) => Ok(validation_result), @@ -420,8 +415,8 @@ async fn request_candidate_validation( type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc), CandidateReceipt>; -struct BackgroundValidationParams { - tx_from: mpsc::Sender, +struct BackgroundValidationParams { + sender: JobSender, tx_command: mpsc::Sender, candidate: CandidateReceipt, relay_parent: Hash, @@ -433,10 +428,13 @@ struct BackgroundValidationParams { } async fn validate_and_make_available( - params: BackgroundValidationParams ValidatedCandidateCommand + Sync>, + params: BackgroundValidationParams< + impl SubsystemSender, + impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync, + > ) -> Result<(), Error> { let BackgroundValidationParams { - mut tx_from, + mut sender, mut tx_command, candidate, relay_parent, @@ -456,12 +454,12 @@ async fn validate_and_make_available( } => { let _span = span.as_ref().map(|s| 
s.child("request-pov")); match request_pov( - &mut tx_from, - relay_parent, - from_validator, - candidate_hash, - pov_hash, - ).await { + &mut sender, + relay_parent, + from_validator, + candidate_hash, + pov_hash, + ).await { Err(Error::FetchPoV) => { tx_command.send(ValidatedCandidateCommand::AttestNoPoV(candidate.hash())).await.map_err(Error::Mpsc)?; return Ok(()) @@ -478,7 +476,7 @@ async fn validate_and_make_available( .with_pov(&pov) .with_para_id(candidate.descriptor().para_id) }); - request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await? + request_candidate_validation(&mut sender, candidate.descriptor.clone(), pov.clone()).await? }; let expected_commitments_hash = candidate.commitments_hash; @@ -502,7 +500,7 @@ async fn validate_and_make_available( Err(candidate) } else { let erasure_valid = make_pov_available( - &mut tx_from, + &mut sender, validator_index, n_validators, pov.clone(), @@ -544,6 +542,7 @@ impl CandidateBackingJob { /// Run asynchronously. async fn run_loop( mut self, + mut sender: JobSender, mut rx_to: mpsc::Receiver, span: PerLeafSpan, ) -> Result<(), Error> { @@ -552,7 +551,7 @@ impl CandidateBackingJob { validated_command = self.background_validation.next() => { let _span = span.child("process-validation-result"); if let Some(c) = validated_command { - self.handle_validated_candidate_command(&span, c).await?; + self.handle_validated_candidate_command(&span, &mut sender, c).await?; } else { panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed"); } @@ -563,7 +562,7 @@ impl CandidateBackingJob { // we intentionally want spans created in `process_msg` to descend from the // `span ` which is longer-lived than this ephemeral timing span. let _timing_span = span.child("process-message"); - self.process_msg(&span, msg).await?; + self.process_msg(&span, &mut sender, msg).await?; } } } @@ -572,10 +571,11 @@ impl CandidateBackingJob { Ok(()) } - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, parent_span, sender), fields(subsystem = LOG_TARGET))] async fn handle_validated_candidate_command( &mut self, parent_span: &jaeger::Span, + sender: &mut JobSender, command: ValidatedCandidateCommand, ) -> Result<(), Error> { let candidate_hash = command.candidate_hash(); @@ -596,15 +596,20 @@ impl CandidateBackingJob { commitments, }); if let Some(stmt) = self.sign_import_and_distribute_statement( + sender, statement, parent_span, ).await? { - self.issue_candidate_seconded_message(stmt).await?; + sender.send_message( + CandidateSelectionMessage::Seconded(self.parent, stmt).into() + ).await; } } } Err(candidate) => { - self.issue_candidate_invalid_message(candidate).await?; + sender.send_message( + CandidateSelectionMessage::Invalid(self.parent, candidate).into() + ).await; } } } @@ -615,7 +620,7 @@ impl CandidateBackingJob { if !self.issued_statements.contains(&candidate_hash) { if res.is_ok() { let statement = Statement::Valid(candidate_hash); - self.sign_import_and_distribute_statement(statement, &parent_span).await?; + self.sign_import_and_distribute_statement(sender, statement, &parent_span).await?; } self.issued_statements.insert(candidate_hash); } @@ -627,7 +632,7 @@ impl CandidateBackingJob { // Ok, another try: let c_span = span.as_ref().map(|s| s.child("try")); let attesting = attesting.clone(); - self.kick_off_validation_work(attesting, c_span).await? + self.kick_off_validation_work(sender, attesting, c_span).await? 
} } else { @@ -643,10 +648,12 @@ impl CandidateBackingJob { Ok(()) } - #[tracing::instrument(level = "trace", skip(self, params), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, sender, params), fields(subsystem = LOG_TARGET))] async fn background_validate_and_make_available( &mut self, + sender: &mut JobSender, params: BackgroundValidationParams< + impl SubsystemSender, impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync >, ) -> Result<(), Error> { @@ -658,36 +665,19 @@ impl CandidateBackingJob { tracing::error!(target: LOG_TARGET, "Failed to validate and make available: {:?}", e); } }; - self.tx_from.send(FromJobCommand::Spawn("Backing Validation", bg.boxed())).await?; + sender.send_command(FromJobCommand::Spawn("Backing Validation", bg.boxed())).await?; } Ok(()) } - async fn issue_candidate_invalid_message( - &mut self, - candidate: CandidateReceipt, - ) -> Result<(), Error> { - self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Invalid(self.parent, candidate)).into()).await?; - - Ok(()) - } - - async fn issue_candidate_seconded_message( - &mut self, - statement: SignedFullStatement, - ) -> Result<(), Error> { - self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Seconded(self.parent, statement)).into()).await?; - - Ok(()) - } - /// Kick off background validation with intent to second. - #[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, parent_span, sender, pov), fields(subsystem = LOG_TARGET))] async fn validate_and_second( &mut self, parent_span: &jaeger::Span, root_span: &jaeger::Span, + sender: &mut JobSender, candidate: &CandidateReceipt, pov: Arc, ) -> Result<(), Error> { @@ -695,7 +685,9 @@ impl CandidateBackingJob { if self.required_collator.as_ref() .map_or(false, |c| c != &candidate.descriptor().collator) { - self.issue_candidate_invalid_message(candidate.clone()).await?; + sender.send_message( + CandidateSelectionMessage::Invalid(self.parent, candidate.clone()).into() + ).await; return Ok(()); } @@ -715,29 +707,36 @@ impl CandidateBackingJob { "Validate and second candidate", ); - self.background_validate_and_make_available(BackgroundValidationParams { - tx_from: self.tx_from.clone(), - tx_command: self.background_validation_tx.clone(), - candidate: candidate.clone(), - relay_parent: self.parent, - pov: PoVData::Ready(pov), - validator_index: self.table_context.validator.as_ref().map(|v| v.index()), - n_validators: self.table_context.validators.len(), - span, - make_command: ValidatedCandidateCommand::Second, - }).await?; + let bg_sender = sender.clone(); + self.background_validate_and_make_available( + sender, + BackgroundValidationParams { + sender: bg_sender, + tx_command: self.background_validation_tx.clone(), + candidate: candidate.clone(), + relay_parent: self.parent, + pov: PoVData::Ready(pov), + validator_index: self.table_context.validator.as_ref().map(|v| v.index()), + n_validators: self.table_context.validators.len(), + span, + make_command: ValidatedCandidateCommand::Second, + } + ).await?; Ok(()) } async fn sign_import_and_distribute_statement( &mut self, + sender: &mut JobSender, statement: Statement, parent_span: &jaeger::Span, ) -> Result, Error> { if let Some(signed_statement) = self.sign_statement(statement).await { - self.import_statement(&signed_statement, parent_span).await?; - self.distribute_signed_statement(signed_statement.clone()).await?; + 
self.import_statement(sender, &signed_statement, parent_span).await?; + let smsg = StatementDistributionMessage::Share(self.parent, signed_statement.clone()); + sender.send_unbounded_message(smsg.into()); + Ok(Some(signed_statement)) } else { Ok(None) @@ -745,26 +744,25 @@ impl CandidateBackingJob { } /// Check if there have happened any new misbehaviors and issue necessary messages. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> { + #[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))] + async fn issue_new_misbehaviors(&mut self, sender: &mut JobSender) { // collect the misbehaviors to avoid double mutable self borrow issues let misbehaviors: Vec<_> = self.table.drain_misbehaviors().collect(); for (validator_id, report) in misbehaviors { - self.send_to_provisioner( + sender.send_message( ProvisionerMessage::ProvisionableData( self.parent, ProvisionableData::MisbehaviorReport(self.parent, validator_id, report) - ) - ).await? + ).into() + ).await; } - - Ok(()) } /// Import a statement into the statement table and return the summary of the import. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))] async fn import_statement( &mut self, + sender: &mut JobSender, statement: &SignedFullStatement, parent_span: &jaeger::Span, ) -> Result, Error> { @@ -807,7 +805,7 @@ impl CandidateBackingJob { self.parent, ProvisionableData::BackedCandidate(backed.receipt()), ); - self.send_to_provisioner(message).await?; + sender.send_message(message.into()).await; span.as_ref().map(|s| s.child("backed")); span @@ -821,7 +819,7 @@ impl CandidateBackingJob { None }; - self.issue_new_misbehaviors().await?; + self.issue_new_misbehaviors(sender).await; // It is important that the child span is dropped before its parent span (`unbacked_span`) drop(import_statement_span); @@ -830,8 +828,13 @@ impl CandidateBackingJob { Ok(summary) } - #[tracing::instrument(level = "trace", skip(self, root_span), fields(subsystem = LOG_TARGET))] - async fn process_msg(&mut self, root_span: &jaeger::Span, msg: CandidateBackingMessage) -> Result<(), Error> { + #[tracing::instrument(level = "trace", skip(self, root_span, sender), fields(subsystem = LOG_TARGET))] + async fn process_msg( + &mut self, + root_span: &jaeger::Span, + sender: &mut JobSender, + msg: CandidateBackingMessage, + ) -> Result<(), Error> { match msg { CandidateBackingMessage::Second(relay_parent, candidate, pov) => { let _timer = self.metrics.time_process_second(); @@ -856,7 +859,7 @@ impl CandidateBackingJob { if !self.issued_statements.contains(&candidate_hash) { let pov = Arc::new(pov); - self.validate_and_second(&span, &root_span, &candidate, pov).await?; + self.validate_and_second(&span, &root_span, sender, &candidate, pov).await?; } } } @@ -868,7 +871,7 @@ impl CandidateBackingJob { .with_relay_parent(_relay_parent); self.check_statement_signature(&statement)?; - match self.maybe_validate_and_import(&span, &root_span, statement).await { + match self.maybe_validate_and_import(&span, &root_span, sender, statement).await { Err(Error::ValidationFailed(_)) => return Ok(()), Err(e) => return Err(e), Ok(()) => (), @@ -893,9 +896,10 @@ impl CandidateBackingJob { } /// Kick off validation work and distribute the result as a signed statement. 
- #[tracing::instrument(level = "trace", skip(self, attesting, span), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, sender, attesting, span), fields(subsystem = LOG_TARGET))] async fn kick_off_validation_work( &mut self, + sender: &mut JobSender, attesting: AttestingData, span: Option, ) -> Result<(), Error> { @@ -925,34 +929,38 @@ impl CandidateBackingJob { return Ok(()); } + let bg_sender = sender.clone(); let pov = PoVData::FetchFromValidator { from_validator: attesting.from_validator, candidate_hash, pov_hash: attesting.pov_hash, }; - - self.background_validate_and_make_available(BackgroundValidationParams { - tx_from: self.tx_from.clone(), - tx_command: self.background_validation_tx.clone(), - candidate: attesting.candidate, - relay_parent: self.parent, - pov, - validator_index: self.table_context.validator.as_ref().map(|v| v.index()), - n_validators: self.table_context.validators.len(), - span, - make_command: ValidatedCandidateCommand::Attest, - }).await + self.background_validate_and_make_available( + sender, + BackgroundValidationParams { + sender: bg_sender, + tx_command: self.background_validation_tx.clone(), + candidate: attesting.candidate, + relay_parent: self.parent, + pov, + validator_index: self.table_context.validator.as_ref().map(|v| v.index()), + n_validators: self.table_context.validators.len(), + span, + make_command: ValidatedCandidateCommand::Attest, + }, + ).await } /// Import the statement and kick off validation work if it is a part of our assignment. - #[tracing::instrument(level = "trace", skip(self, parent_span), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, parent_span, root_span, sender), fields(subsystem = LOG_TARGET))] async fn maybe_validate_and_import( &mut self, parent_span: &jaeger::Span, root_span: &jaeger::Span, + sender: &mut JobSender, statement: SignedFullStatement, ) -> Result<(), Error> { - if let Some(summary) = self.import_statement(&statement, parent_span).await? { + if let Some(summary) = self.import_statement(sender, &statement, parent_span).await? 
{ if Some(summary.group_id) != self.assignment { return Ok(()) } @@ -989,7 +997,7 @@ impl CandidateBackingJob { attesting.backing.push(statement.validator_index()); return Ok(()) } else { - // No job, so start another try with current validator: + // No job, so start another with current validator: attesting.from_validator = statement.validator_index(); (attesting.clone(), span.as_ref().map(|s| s.child("try"))) } @@ -1000,6 +1008,7 @@ impl CandidateBackingJob { }; self.kick_off_validation_work( + sender, attesting, span, ).await?; @@ -1089,20 +1098,6 @@ impl CandidateBackingJob { fn remove_unbacked_span(&mut self, hash: &CandidateHash) -> Option { self.unbacked_candidates.remove(hash) } - - async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> { - self.tx_from.send(AllMessages::from(msg).into()).await?; - - Ok(()) - } - - async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> { - let smsg = StatementDistributionMessage::Share(self.parent, s); - - self.tx_from.send(AllMessages::from(smsg).into()).await?; - - Ok(()) - } } impl util::JobTrait for CandidateBackingJob { @@ -1113,14 +1108,14 @@ impl util::JobTrait for CandidateBackingJob { const NAME: &'static str = "CandidateBackingJob"; - #[tracing::instrument(skip(span, keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))] - fn run( + #[tracing::instrument(skip(span, keystore, metrics, rx_to, sender), fields(subsystem = LOG_TARGET))] + fn run( parent: Hash, span: Arc, keystore: SyncCryptoStorePtr, metrics: Metrics, rx_to: mpsc::Receiver, - mut tx_from: mpsc::Sender, + mut sender: JobSender, ) -> Pin> + Send>> { async move { macro_rules! try_runtime_api { @@ -1147,14 +1142,14 @@ impl util::JobTrait for CandidateBackingJob { let _span = span.child("runtime-apis"); let (validators, groups, session_index, cores) = futures::try_join!( - try_runtime_api!(request_validators(parent, &mut tx_from).await), - try_runtime_api!(request_validator_groups(parent, &mut tx_from).await), - try_runtime_api!(request_session_index_for_child(parent, &mut tx_from).await), - try_runtime_api!(request_from_runtime( + request_validators(parent, &mut sender).await, + request_validator_groups(parent, &mut sender).await, + request_session_index_for_child(parent, &mut sender).await, + request_from_runtime( parent, - &mut tx_from, + &mut sender, |tx| RuntimeApiRequest::AvailabilityCores(tx), - ).await), + ).await, ).map_err(Error::JoinMultiple)?; let validators = try_runtime_api!(validators); @@ -1231,7 +1226,6 @@ impl util::JobTrait for CandidateBackingJob { let (background_tx, background_rx) = mpsc::channel(16); let job = CandidateBackingJob { parent, - tx_from, assignment, required_collator, issued_statements: HashSet::new(), @@ -1249,7 +1243,7 @@ impl util::JobTrait for CandidateBackingJob { }; drop(_span); - job.run_loop(rx_to, span).await + job.run_loop(sender, rx_to, span).await }.boxed() } } @@ -1345,7 +1339,9 @@ impl metrics::Metrics for Metrics { } } -delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- CandidateBackingMessage as CandidateBackingSubsystem); +/// The candidate backing subsystem. 
+pub type CandidateBackingSubsystem + = polkadot_node_subsystem_util::JobSubsystem; #[cfg(test)] mod tests { @@ -1363,6 +1359,7 @@ mod tests { use sp_keystore::{CryptoStore, SyncCryptoStore}; use statement_table::v1::Misbehavior; use std::collections::HashMap; + use sp_tracing as _; fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec { val_ids.iter().map(|v| v.public().into()).collect() @@ -1479,7 +1476,11 @@ mod tests { let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone()); - let subsystem = CandidateBackingSubsystem::run(context, keystore, Metrics(None), pool.clone()); + let subsystem = CandidateBackingSubsystem::new( + pool.clone(), + keystore, + Metrics(None), + ).run(context); let test_fut = test(TestHarness { virtual_overseer, @@ -2670,7 +2671,7 @@ mod tests { // Test whether we retry on failed PoV fetching. #[test] fn retry_works() { - sp_tracing::try_init_simple(); + // sp_tracing::try_init_simple(); let test_state = TestState::default(); test_harness(test_state.keystore.clone(), |test_harness| async move { let TestHarness { mut virtual_overseer } = test_harness; diff --git a/polkadot/node/core/bitfield-signing/Cargo.toml b/polkadot/node/core/bitfield-signing/Cargo.toml index e336af9522..635aad8f01 100644 --- a/polkadot/node/core/bitfield-signing/Cargo.toml +++ b/polkadot/node/core/bitfield-signing/Cargo.toml @@ -13,3 +13,6 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } wasm-timer = "0.2.5" thiserror = "1.0.23" + +[dev-dependencies] +polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index ee2c34f0ef..39ef5622a5 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -23,15 +23,16 @@ use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future}; use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr}; use polkadot_node_subsystem::{ - jaeger, PerLeafSpan, + jaeger, PerLeafSpan, SubsystemSender, messages::{ - AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage, + AvailabilityStoreMessage, BitfieldDistributionMessage, BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest, }, errors::RuntimeApiError, }; use polkadot_node_subsystem_util::{ - self as util, JobManager, JobTrait, Validator, FromJobCommand, metrics::{self, prometheus}, + self as util, JobSubsystem, JobTrait, Validator, metrics::{self, prometheus}, + JobSender, }; use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex}; use std::{pin::Pin, time::Duration, iter::FromIterator, sync::Arc}; @@ -73,7 +74,7 @@ pub enum Error { async fn get_core_availability( core: &CoreState, validator_idx: ValidatorIndex, - sender: &Mutex<&mut mpsc::Sender>, + sender: &Mutex<&mut impl SubsystemSender>, span: &jaeger::Span, ) -> Result { if let &CoreState::Occupied(ref core) = core { @@ -83,14 +84,14 @@ async fn get_core_availability( sender .lock() .await - .send( - AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability( + .send_message( + AvailabilityStoreMessage::QueryChunkAvailability( core.candidate_hash, validator_idx, tx, - )).into(), + ).into(), ) - .await?; + .await; let res = rx.await.map_err(Into::into); @@ -111,12 +112,15 @@ async fn get_core_availability( /// delegates to the v1 runtime API 
async fn get_availability_cores( relay_parent: Hash, - sender: &mut mpsc::Sender, + sender: &mut impl SubsystemSender, ) -> Result, Error> { let (tx, rx) = oneshot::channel(); sender - .send(AllMessages::from(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx))).into()) - .await?; + .send_message(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx), + ).into()) + .await; match rx.await { Ok(Ok(out)) => Ok(out), Ok(Err(runtime_err)) => Err(runtime_err.into()), @@ -133,7 +137,7 @@ async fn construct_availability_bitfield( relay_parent: Hash, span: &jaeger::Span, validator_idx: ValidatorIndex, - sender: &mut mpsc::Sender, + sender: &mut impl SubsystemSender, ) -> Result { // get the set of availability cores from the runtime let availability_cores = { @@ -223,13 +227,13 @@ impl JobTrait for BitfieldSigningJob { /// Run a job for the parent block indicated #[tracing::instrument(skip(span, keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))] - fn run( + fn run( relay_parent: Hash, span: Arc, keystore: Self::RunArgs, metrics: Self::Metrics, _receiver: mpsc::Receiver, - mut sender: mpsc::Sender, + mut sender: JobSender, ) -> Pin> + Send>> { let metrics = metrics.clone(); async move { @@ -239,7 +243,7 @@ impl JobTrait for BitfieldSigningJob { // now do all the work we can before we need to wait for the availability store // if we're not a validator, we can just succeed effortlessly - let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await { + let validator = match Validator::new(relay_parent, keystore.clone(), &mut sender).await { Ok(validator) => validator, Err(util::Error::NotAValidator) => return Ok(()), Err(err) => return Err(Error::Util(err)), @@ -260,7 +264,7 @@ impl JobTrait for BitfieldSigningJob { relay_parent, &span_availability, validator.index(), - &mut sender, + sender.subsystem_sender(), ).await { Err(Error::Runtime(runtime_err)) => { @@ -295,26 +299,27 @@ impl JobTrait for BitfieldSigningJob { let _span = span.child("gossip"); sender - .send( - AllMessages::from( - BitfieldDistributionMessage::DistributeBitfield(relay_parent, signed_bitfield), - ).into(), - ) - .await - .map_err(Into::into) + .send_message(BitfieldDistributionMessage::DistributeBitfield( + relay_parent, + signed_bitfield, + ).into()) + .await; + + Ok(()) } .boxed() } } /// BitfieldSigningSubsystem manages a number of bitfield signing jobs. -pub type BitfieldSigningSubsystem = JobManager; +pub type BitfieldSigningSubsystem = JobSubsystem; #[cfg(test)] mod tests { use super::*; use futures::{pin_mut, executor::block_on}; use polkadot_primitives::v1::{CandidateHash, OccupiedCore}; + use polkadot_node_subsystem::messages::AllMessages; fn occupied_core(para_id: u32, candidate_hash: CandidateHash) -> CoreState { CoreState::Occupied(OccupiedCore { @@ -332,10 +337,10 @@ mod tests { #[test] fn construct_availability_bitfield_works() { block_on(async move { - let (mut sender, mut receiver) = mpsc::channel(10); let relay_parent = Hash::default(); let validator_index = ValidatorIndex(1u32); + let (mut sender, mut receiver) = polkadot_node_subsystem_test_helpers::sender_receiver(); let future = construct_availability_bitfield( relay_parent, &jaeger::Span::Disabled, @@ -350,18 +355,14 @@ mod tests { loop { futures::select! 
{ m = receiver.next() => match m.unwrap() { - FromJobCommand::SendMessage( - AllMessages::RuntimeApi( - RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx)), - ), + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx)), ) => { assert_eq!(relay_parent, rp); tx.send(Ok(vec![CoreState::Free, occupied_core(1, hash_a), occupied_core(2, hash_b)])).unwrap(); - }, - FromJobCommand::SendMessage( - AllMessages::AvailabilityStore( - AvailabilityStoreMessage::QueryChunkAvailability(c_hash, vidx, tx), - ), + } + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryChunkAvailability(c_hash, vidx, tx), ) => { assert_eq!(validator_index, vidx); diff --git a/polkadot/node/core/candidate-selection/Cargo.toml b/polkadot/node/core/candidate-selection/Cargo.toml index ee58baa1f8..ac7d257e2f 100644 --- a/polkadot/node/core/candidate-selection/Cargo.toml +++ b/polkadot/node/core/candidate-selection/Cargo.toml @@ -18,3 +18,4 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" } [dev-dependencies] sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs index 5b9e3a57c4..018a9aaa90 100644 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ b/polkadot/node/core/candidate-selection/src/lib.rs @@ -25,16 +25,16 @@ use futures::{ }; use sp_keystore::SyncCryptoStorePtr; use polkadot_node_subsystem::{ - jaeger, PerLeafSpan, + jaeger, PerLeafSpan, SubsystemSender, errors::ChainApiError, messages::{ - AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage, + CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage, RuntimeApiRequest, }, }; use polkadot_node_subsystem_util::{ - self as util, request_from_runtime, request_validator_groups, delegated_subsystem, - JobTrait, FromJobCommand, Validator, metrics::{self, prometheus}, + self as util, request_from_runtime, request_validator_groups, JobSubsystem, + JobTrait, JobSender, Validator, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{ CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV, BlockNumber, @@ -45,22 +45,24 @@ use thiserror::Error; const LOG_TARGET: &'static str = "parachain::candidate-selection"; -struct CandidateSelectionJob { +/// A per-block job in the candidate selection subsystem. +pub struct CandidateSelectionJob { assignment: ParaId, - sender: mpsc::Sender, receiver: mpsc::Receiver, metrics: Metrics, seconded_candidate: Option, } +/// Errors in the candidate selection subsystem. #[derive(Debug, Error)] -enum Error { - #[error(transparent)] - Sending(#[from] mpsc::SendError), +pub enum Error { + /// An error in utilities. #[error(transparent)] Util(#[from] util::Error), + /// An error receiving on a oneshot channel. #[error(transparent)] OneshotRecv(#[from] oneshot::Canceled), + /// An error interacting with the chain API. 
#[error(transparent)] ChainApi(#[from] ChainApiError), } @@ -94,13 +96,13 @@ impl JobTrait for CandidateSelectionJob { const NAME: &'static str = "CandidateSelectionJob"; #[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))] - fn run( + fn run( relay_parent: Hash, span: Arc, keystore: Self::RunArgs, metrics: Self::Metrics, receiver: mpsc::Receiver, - mut sender: mpsc::Sender, + mut sender: JobSender, ) -> Pin> + Send>> { let span = PerLeafSpan::new(span, "candidate-selection"); async move { @@ -108,12 +110,12 @@ impl JobTrait for CandidateSelectionJob { .with_relay_parent(relay_parent) .with_stage(jaeger::Stage::CandidateSelection); let (groups, cores) = futures::try_join!( - try_runtime_api!(request_validator_groups(relay_parent, &mut sender).await), - try_runtime_api!(request_from_runtime( + request_validator_groups(relay_parent, &mut sender).await, + request_from_runtime( relay_parent, &mut sender, |tx| RuntimeApiRequest::AvailabilityCores(tx), - ).await), + ).await, )?; let (validator_groups, group_rotation_info) = try_runtime_api!(groups); @@ -126,7 +128,7 @@ impl JobTrait for CandidateSelectionJob { let n_cores = cores.len(); - let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await { + let validator = match Validator::new(relay_parent, keystore.clone(), &mut sender).await { Ok(validator) => validator, Err(util::Error::NotAValidator) => return Ok(()), Err(err) => return Err(Error::Util(err)), @@ -198,20 +200,20 @@ impl JobTrait for CandidateSelectionJob { drop(assignment_span); - CandidateSelectionJob::new(assignment, metrics, sender, receiver).run_loop(&span).await + CandidateSelectionJob::new(assignment, metrics, receiver) + .run_loop(&span, sender.subsystem_sender()) + .await }.boxed() } } impl CandidateSelectionJob { - pub fn new( + fn new( assignment: ParaId, metrics: Metrics, - sender: mpsc::Sender, receiver: mpsc::Receiver, ) -> Self { Self { - sender, receiver, metrics, assignment, @@ -219,7 +221,11 @@ impl CandidateSelectionJob { } } - async fn run_loop(&mut self, span: &jaeger::Span) -> Result<(), Error> { + async fn run_loop( + &mut self, + span: &jaeger::Span, + sender: &mut impl SubsystemSender, + ) -> Result<(), Error> { let span = span.child("run-loop") .with_stage(jaeger::Stage::CandidateSelection); @@ -231,7 +237,7 @@ impl CandidateSelectionJob { collator_id, )) => { let _span = span.child("handle-collation"); - self.handle_collation(relay_parent, para_id, collator_id).await; + self.handle_collation(sender, relay_parent, para_id, collator_id).await; } Some(CandidateSelectionMessage::Invalid( _relay_parent, @@ -241,28 +247,26 @@ impl CandidateSelectionJob { .with_stage(jaeger::Stage::CandidateSelection) .with_candidate(candidate_receipt.hash()) .with_relay_parent(_relay_parent); - self.handle_invalid(candidate_receipt).await; + self.handle_invalid(sender, candidate_receipt).await; } Some(CandidateSelectionMessage::Seconded(_relay_parent, statement)) => { let _span = span.child("handle-seconded") .with_stage(jaeger::Stage::CandidateSelection) .with_candidate(statement.payload().candidate_hash()) .with_relay_parent(_relay_parent); - self.handle_seconded(statement).await; + self.handle_seconded(sender, statement).await; } None => break, } } - // closing the sender here means that we don't deadlock in tests - self.sender.close_channel(); - Ok(()) } - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, sender), 
fields(subsystem = LOG_TARGET))] async fn handle_collation( &mut self, + sender: &mut impl SubsystemSender, relay_parent: Hash, para_id: ParaId, collator_id: CollatorId, @@ -276,13 +280,7 @@ impl CandidateSelectionJob { collator_id, para_id, ); - if let Err(err) = forward_invalidity_note(&collator_id, &mut self.sender).await { - tracing::warn!( - target: LOG_TARGET, - err = ?err, - "failed to forward invalidity note", - ); - } + forward_invalidity_note(&collator_id, sender).await; return; } @@ -292,7 +290,7 @@ impl CandidateSelectionJob { relay_parent, para_id, collator_id.clone(), - self.sender.clone(), + sender, ).await { Ok(response) => response, Err(err) => { @@ -305,21 +303,23 @@ impl CandidateSelectionJob { } }; - match second_candidate( + second_candidate( relay_parent, candidate_receipt, pov, - &mut self.sender, + sender, &self.metrics, - ).await { - Err(err) => tracing::warn!(target: LOG_TARGET, err = ?err, "failed to second a candidate"), - Ok(()) => self.seconded_candidate = Some(collator_id), - } + ).await; + self.seconded_candidate = Some(collator_id); } } - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - async fn handle_invalid(&mut self, candidate_receipt: CandidateReceipt) { + #[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))] + async fn handle_invalid( + &mut self, + sender: &mut impl SubsystemSender, + candidate_receipt: CandidateReceipt, + ) { let _timer = self.metrics.time_handle_invalid(); let received_from = match &self.seconded_candidate { @@ -338,21 +338,15 @@ impl CandidateSelectionJob { "received invalidity note for candidate", ); - let result = - if let Err(err) = forward_invalidity_note(received_from, &mut self.sender).await { - tracing::warn!( - target: LOG_TARGET, - err = ?err, - "failed to forward invalidity note", - ); - Err(()) - } else { - Ok(()) - }; - self.metrics.on_invalid_selection(result); + forward_invalidity_note(received_from, sender).await; + self.metrics.on_invalid_selection(); } - async fn handle_seconded(&mut self, statement: SignedFullStatement) { + async fn handle_seconded( + &mut self, + sender: &mut impl SubsystemSender, + statement: SignedFullStatement, + ) { let received_from = match &self.seconded_candidate { Some(peer) => peer, None => { @@ -369,27 +363,13 @@ impl CandidateSelectionJob { "received seconded note for candidate", ); - if let Err(e) = self.sender - .send(AllMessages::from(CollatorProtocolMessage::NoteGoodCollation(received_from.clone())).into()).await - { - tracing::debug!( - target: LOG_TARGET, - error = ?e, - "failed to note good collator" - ); - } + sender + .send_message(CollatorProtocolMessage::NoteGoodCollation(received_from.clone()).into()) + .await; - if let Err(e) = self.sender - .send(AllMessages::from( - CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement) - ).into()).await - { - tracing::debug!( - target: LOG_TARGET, - error = ?e, - "failed to notify collator about seconded collation" - ); - } + sender.send_message( + CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement).into() + ).await; } } @@ -401,17 +381,18 @@ async fn get_collation( relay_parent: Hash, para_id: ParaId, collator_id: CollatorId, - mut sender: mpsc::Sender, + sender: &mut impl SubsystemSender, ) -> Result<(CandidateReceipt, PoV), Error> { let (tx, rx) = oneshot::channel(); sender - .send(AllMessages::from(CollatorProtocolMessage::FetchCollation( + .send_message(CollatorProtocolMessage::FetchCollation( 
relay_parent, collator_id, para_id, tx, - )).into()) - .await?; + ).into()) + .await; + rx.await.map_err(Into::into) } @@ -419,45 +400,33 @@ async fn second_candidate( relay_parent: Hash, candidate_receipt: CandidateReceipt, pov: PoV, - sender: &mut mpsc::Sender, + sender: &mut impl SubsystemSender, metrics: &Metrics, -) -> Result<(), Error> { - match sender - .send(AllMessages::from(CandidateBackingMessage::Second( +) { + sender + .send_message(CandidateBackingMessage::Second( relay_parent, candidate_receipt, pov, - )).into()) - .await - { - Err(err) => { - tracing::warn!(target: LOG_TARGET, err = ?err, "failed to send a seconding message"); - metrics.on_second(Err(())); - Err(err.into()) - } - Ok(_) => { - metrics.on_second(Ok(())); - Ok(()) - } - } + ).into()) + .await; + + metrics.on_second(); } async fn forward_invalidity_note( received_from: &CollatorId, - sender: &mut mpsc::Sender, -) -> Result<(), Error> { + sender: &mut impl SubsystemSender, +) { sender - .send(AllMessages::from(CollatorProtocolMessage::ReportCollator( - received_from.clone(), - )).into()) + .send_message(CollatorProtocolMessage::ReportCollator(received_from.clone()).into()) .await - .map_err(Into::into) } #[derive(Clone)] struct MetricsInner { - seconds: prometheus::CounterVec, - invalid_selections: prometheus::CounterVec, + seconds: prometheus::Counter, + invalid_selections: prometheus::Counter, handle_collation: prometheus::Histogram, handle_invalid: prometheus::Histogram, } @@ -467,17 +436,15 @@ struct MetricsInner { pub struct Metrics(Option); impl Metrics { - fn on_second(&self, result: Result<(), ()>) { + fn on_second(&self) { if let Some(metrics) = &self.0 { - let label = if result.is_ok() { "succeeded" } else { "failed" }; - metrics.seconds.with_label_values(&[label]).inc(); + metrics.seconds.inc(); } } - fn on_invalid_selection(&self, result: Result<(), ()>) { + fn on_invalid_selection(&self) { if let Some(metrics) = &self.0 { - let label = if result.is_ok() { "succeeded" } else { "failed" }; - metrics.invalid_selections.with_label_values(&[label]).inc(); + metrics.invalid_selections.inc(); } } @@ -496,22 +463,20 @@ impl metrics::Metrics for Metrics { fn try_register(registry: &prometheus::Registry) -> Result { let metrics = MetricsInner { seconds: prometheus::register( - prometheus::CounterVec::new( + prometheus::Counter::with_opts( prometheus::Opts::new( "candidate_selection_seconds_total", "Number of Candidate Selection subsystem seconding events.", ), - &["success"], )?, registry, )?, invalid_selections: prometheus::register( - prometheus::CounterVec::new( + prometheus::Counter::with_opts( prometheus::Opts::new( "candidate_selection_invalid_selections_total", "Number of Candidate Selection subsystem seconding selections which proved to be invalid.", ), - &["success"], )?, registry, )?, @@ -538,13 +503,15 @@ impl metrics::Metrics for Metrics { } } -delegated_subsystem!(CandidateSelectionJob(SyncCryptoStorePtr, Metrics) <- CandidateSelectionMessage as CandidateSelectionSubsystem); +/// The candidate selection subsystem. 
+pub type CandidateSelectionSubsystem = JobSubsystem; #[cfg(test)] mod tests { use super::*; use futures::lock::Mutex; use polkadot_primitives::v1::BlockData; + use polkadot_node_subsystem::messages::AllMessages; use sp_core::crypto::Public; use std::sync::Arc; @@ -554,15 +521,14 @@ mod tests { postconditions: Postconditions, ) where Preconditions: FnOnce(&mut CandidateSelectionJob), - TestBuilder: FnOnce(mpsc::Sender, mpsc::Receiver) -> Test, + TestBuilder: FnOnce(mpsc::Sender, mpsc::UnboundedReceiver) -> Test, Test: Future, Postconditions: FnOnce(CandidateSelectionJob, Result<(), Error>), { let (to_job_tx, to_job_rx) = mpsc::channel(0); - let (from_job_tx, from_job_rx) = mpsc::channel(0); + let (mut from_job_tx, from_job_rx) = polkadot_node_subsystem_test_helpers::sender_receiver(); let mut job = CandidateSelectionJob { assignment: 123.into(), - sender: from_job_tx, receiver: to_job_rx, metrics: Default::default(), seconded_candidate: None, @@ -570,9 +536,13 @@ mod tests { preconditions(&mut job); let span = jaeger::Span::Disabled; - let (_, job_result) = futures::executor::block_on(future::join( + let (_, (job, job_result)) = futures::executor::block_on(future::join( test(to_job_tx, from_job_rx), - job.run_loop(&span), + async move { + let res = job.run_loop(&span, &mut from_job_tx).await; + drop(from_job_tx); + (job, res) + }, )); postconditions(job, job_result); @@ -609,12 +579,12 @@ mod tests { while let Some(msg) = from_job.next().await { match msg { - FromJobCommand::SendMessage(AllMessages::CollatorProtocol(CollatorProtocolMessage::FetchCollation( + AllMessages::CollatorProtocol(CollatorProtocolMessage::FetchCollation( got_relay_parent, collator_id, got_para_id, return_sender, - ))) => { + )) => { assert_eq!(got_relay_parent, relay_parent); assert_eq!(got_para_id, para_id); assert_eq!(collator_id, collator_id_clone); @@ -623,11 +593,11 @@ mod tests { .send((candidate_receipt.clone(), pov.clone())) .unwrap(); } - FromJobCommand::SendMessage(AllMessages::CandidateBacking(CandidateBackingMessage::Second( + AllMessages::CandidateBacking(CandidateBackingMessage::Second( got_relay_parent, got_candidate_receipt, got_pov, - ))) => { + )) => { assert_eq!(got_relay_parent, relay_parent); assert_eq!(got_candidate_receipt, candidate_receipt); assert_eq!(got_pov, pov); @@ -674,11 +644,11 @@ mod tests { while let Some(msg) = from_job.next().await { match msg { - FromJobCommand::SendMessage(AllMessages::CandidateBacking(CandidateBackingMessage::Second( + AllMessages::CandidateBacking(CandidateBackingMessage::Second( _got_relay_parent, _got_candidate_receipt, _got_pov, - ))) => { + )) => { *was_seconded_clone.lock().await = true; } other => panic!("unexpected message from job: {:?}", other), @@ -713,13 +683,14 @@ mod tests { .send(CandidateSelectionMessage::Invalid(relay_parent, candidate_receipt)) .await .unwrap(); + std::mem::drop(to_job); while let Some(msg) = from_job.next().await { match msg { - FromJobCommand::SendMessage(AllMessages::CollatorProtocol(CollatorProtocolMessage::ReportCollator( + AllMessages::CollatorProtocol(CollatorProtocolMessage::ReportCollator( got_collator_id, - ))) => { + )) => { assert_eq!(got_collator_id, collator_id_clone); *sent_report_clone.lock().await = true; diff --git a/polkadot/node/core/provisioner/Cargo.toml b/polkadot/node/core/provisioner/Cargo.toml index bff63e230a..15f3866fe6 100644 --- a/polkadot/node/core/provisioner/Cargo.toml +++ b/polkadot/node/core/provisioner/Cargo.toml @@ -17,3 +17,4 @@ futures-timer = "3.0.2" [dev-dependencies] 
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } +polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index de5f3eb036..82a0033c52 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -25,14 +25,14 @@ use futures::{ prelude::*, }; use polkadot_node_subsystem::{ - errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, jaeger, + errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, SubsystemSender, jaeger, messages::{ - AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, + CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, ProvisionerMessage, }, }; use polkadot_node_subsystem_util::{ - self as util, delegated_subsystem, FromJobCommand, + self as util, JobSubsystem, JobSender, request_availability_cores, request_persisted_validation_data, JobTrait, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{ @@ -80,9 +80,9 @@ impl InherentAfter { } } -struct ProvisioningJob { +/// A per-relay-parent job for the provisioning subsystem. +pub struct ProvisioningJob { relay_parent: Hash, - sender: mpsc::Sender, receiver: mpsc::Receiver, backed_candidates: Vec, signed_bitfields: Vec, @@ -91,8 +91,10 @@ struct ProvisioningJob { awaiting_inherent: Vec> } +/// Errors in the provisioner. #[derive(Debug, Error)] -enum Error { +#[allow(missing_docs)] +pub enum Error { #[error(transparent)] Util(#[from] util::Error), @@ -139,38 +141,35 @@ impl JobTrait for ProvisioningJob { // // this function is in charge of creating and executing the job's main loop #[tracing::instrument(skip(span, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))] - fn run( + fn run( relay_parent: Hash, span: Arc, _run_args: Self::RunArgs, metrics: Self::Metrics, receiver: mpsc::Receiver, - sender: mpsc::Sender, + mut sender: JobSender, ) -> Pin> + Send>> { async move { let job = ProvisioningJob::new( relay_parent, metrics, - sender, receiver, ); - job.run_loop(PerLeafSpan::new(span, "provisioner")).await + job.run_loop(sender.subsystem_sender(), PerLeafSpan::new(span, "provisioner")).await } .boxed() } } impl ProvisioningJob { - pub fn new( + fn new( relay_parent: Hash, metrics: Metrics, - sender: mpsc::Sender, receiver: mpsc::Receiver, ) -> Self { Self { relay_parent, - sender, receiver, backed_candidates: Vec::new(), signed_bitfields: Vec::new(), @@ -180,7 +179,11 @@ impl ProvisioningJob { } } - async fn run_loop(mut self, span: PerLeafSpan) -> Result<(), Error> { + async fn run_loop( + mut self, + sender: &mut impl SubsystemSender, + span: PerLeafSpan, + ) -> Result<(), Error> { use ProvisionerMessage::{ ProvisionableData, RequestInherentData, }; @@ -192,7 +195,7 @@ impl ProvisioningJob { let _timer = self.metrics.time_request_inherent_data(); if self.inherent_after.is_ready() { - self.send_inherent_data(vec![return_sender]).await; + self.send_inherent_data(sender, vec![return_sender]).await; } else { self.awaiting_inherent.push(return_sender); } @@ -209,7 +212,7 @@ impl ProvisioningJob { let _span = span.child("send-inherent-data"); let return_senders = std::mem::take(&mut self.awaiting_inherent); if !return_senders.is_empty() { - self.send_inherent_data(return_senders).await; + self.send_inherent_data(sender, 
return_senders).await; } } } @@ -220,6 +223,7 @@ impl ProvisioningJob { async fn send_inherent_data( &mut self, + sender: &mut impl SubsystemSender, return_senders: Vec>, ) { if let Err(err) = send_inherent_data( @@ -227,7 +231,7 @@ impl ProvisioningJob { &self.signed_bitfields, &self.backed_candidates, return_senders, - &mut self.sender, + sender, ) .await { @@ -279,10 +283,10 @@ async fn send_inherent_data( bitfields: &[SignedAvailabilityBitfield], candidates: &[CandidateReceipt], return_senders: Vec>, - from_job: &mut mpsc::Sender, + from_job: &mut impl SubsystemSender, ) -> Result<(), Error> { let availability_cores = request_availability_cores(relay_parent, from_job) - .await? + .await .await.map_err(|err| Error::CanceledAvailabilityCores(err))??; let bitfields = select_availability_bitfields(&availability_cores, bitfields); @@ -351,7 +355,7 @@ async fn select_candidates( bitfields: &[SignedAvailabilityBitfield], candidates: &[CandidateReceipt], relay_parent: Hash, - sender: &mut mpsc::Sender, + sender: &mut impl SubsystemSender, ) -> Result, Error> { let block_number = get_block_number_under_construction(relay_parent, sender).await?; @@ -388,7 +392,7 @@ async fn select_candidates( assumption, sender, ) - .await? + .await .await.map_err(|err| Error::CanceledPersistedValidationData(err))?? { Some(v) => v, @@ -418,11 +422,11 @@ async fn select_candidates( // now get the backed candidates corresponding to these candidate receipts let (tx, rx) = oneshot::channel(); - sender.send(AllMessages::CandidateBacking(CandidateBackingMessage::GetBackedCandidates( + sender.send_message(CandidateBackingMessage::GetBackedCandidates( relay_parent, selected_candidates.clone(), tx, - )).into()).await.map_err(|err| Error::GetBackedCandidatesSend(err))?; + ).into()).await; let mut candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?; // `selected_candidates` is generated in ascending order by core index, and `GetBackedCandidates` @@ -470,16 +474,16 @@ async fn select_candidates( #[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))] async fn get_block_number_under_construction( relay_parent: Hash, - sender: &mut mpsc::Sender, + sender: &mut impl SubsystemSender, ) -> Result { let (tx, rx) = oneshot::channel(); sender - .send(AllMessages::from(ChainApiMessage::BlockNumber( + .send_message(ChainApiMessage::BlockNumber( relay_parent, tx, - )).into()) - .await - .map_err(|e| Error::ChainApiMessageSend(e))?; + ).into()) + .await; + match rx.await.map_err(|err| Error::CanceledBlockNumber(err))? { Ok(Some(n)) => Ok(n + 1), Ok(None) => Ok(0), @@ -596,7 +600,8 @@ impl metrics::Metrics for Metrics { } -delegated_subsystem!(ProvisioningJob((), Metrics) <- ProvisionerMessage as ProvisioningSubsystem); +/// The provisioning subsystem. 
+pub type ProvisioningSubsystem = JobSubsystem; #[cfg(test)] mod tests; diff --git a/polkadot/node/core/provisioner/src/tests.rs b/polkadot/node/core/provisioner/src/tests.rs index 777e1d1447..9b9409fbf5 100644 --- a/polkadot/node/core/provisioner/src/tests.rs +++ b/polkadot/node/core/provisioner/src/tests.rs @@ -191,7 +191,6 @@ mod select_availability_bitfields { } mod select_candidates { - use futures_timer::Delay; use super::super::*; use super::{build_occupied_core, occupied_core, scheduled_core, default_bitvec}; use polkadot_node_subsystem::messages::{ @@ -201,6 +200,7 @@ mod select_candidates { use polkadot_primitives::v1::{ BlockNumber, CandidateDescriptor, PersistedValidationData, CommittedCandidateReceipt, CandidateCommitments, }; + use polkadot_node_subsystem_test_helpers::TestSubsystemSender; const BLOCK_UNDER_PRODUCTION: BlockNumber = 128; @@ -208,12 +208,12 @@ mod select_candidates { overseer_factory: OverseerFactory, test_factory: TestFactory, ) where - OverseerFactory: FnOnce(mpsc::Receiver) -> Overseer, + OverseerFactory: FnOnce(mpsc::UnboundedReceiver) -> Overseer, Overseer: Future, - TestFactory: FnOnce(mpsc::Sender) -> Test, + TestFactory: FnOnce(TestSubsystemSender) -> Test, Test: Future, { - let (tx, rx) = mpsc::channel(64); + let (tx, rx) = polkadot_node_subsystem_test_helpers::sender_receiver(); let overseer = overseer_factory(rx); let test = test_factory(tx); @@ -298,24 +298,27 @@ mod select_candidates { ] } - async fn mock_overseer(mut receiver: mpsc::Receiver, expected: Vec) { + async fn mock_overseer( + mut receiver: mpsc::UnboundedReceiver, + expected: Vec, + ) { use ChainApiMessage::BlockNumber; use RuntimeApiMessage::Request; while let Some(from_job) = receiver.next().await { match from_job { - FromJobCommand::SendMessage(AllMessages::ChainApi(BlockNumber(_relay_parent, tx))) => { + AllMessages::ChainApi(BlockNumber(_relay_parent, tx)) => { tx.send(Ok(Some(BLOCK_UNDER_PRODUCTION - 1))).unwrap() } - FromJobCommand::SendMessage(AllMessages::RuntimeApi(Request( + AllMessages::RuntimeApi(Request( _parent_hash, PersistedValidationDataReq(_para_id, _assumption, tx), - ))) => tx.send(Ok(Some(Default::default()))).unwrap(), - FromJobCommand::SendMessage(AllMessages::RuntimeApi(Request(_parent_hash, AvailabilityCores(tx)))) => { + )) => tx.send(Ok(Some(Default::default()))).unwrap(), + AllMessages::RuntimeApi(Request(_parent_hash, AvailabilityCores(tx))) => { tx.send(Ok(mock_availability_cores())).unwrap() } - FromJobCommand::SendMessage( - AllMessages::CandidateBacking(CandidateBackingMessage::GetBackedCandidates(_, _, sender)) + AllMessages::CandidateBacking( + CandidateBackingMessage::GetBackedCandidates(_, _, sender) ) => { let _ = sender.send(expected.clone()); } @@ -324,29 +327,9 @@ mod select_candidates { } } - #[test] - fn handles_overseer_failure() { - let overseer = |rx: mpsc::Receiver| async move { - // drop the receiver so it closes and the sender can't send, then just sleep long enough that - // this is almost certainly not the first of the two futures to complete - std::mem::drop(rx); - Delay::new(std::time::Duration::from_secs(1)).await; - }; - - let test = |mut tx: mpsc::Sender| async move { - // wait so that the overseer can drop the rx before we attempt to send - Delay::new(std::time::Duration::from_millis(50)).await; - let result = select_candidates(&[], &[], &[], Default::default(), &mut tx).await; - println!("{:?}", result); - assert!(std::matches!(result, Err(Error::ChainApiMessageSend(_)))); - }; - - test_harness(overseer, test); - } - #[test] fn 
can_succeed() { - test_harness(|r| mock_overseer(r, Vec::new()), |mut tx: mpsc::Sender| async move { + test_harness(|r| mock_overseer(r, Vec::new()), |mut tx: TestSubsystemSender| async move { select_candidates(&[], &[], &[], Default::default(), &mut tx).await.unwrap(); }) } @@ -411,7 +394,7 @@ mod select_candidates { }) .collect(); - test_harness(|r| mock_overseer(r, expected_backed), |mut tx: mpsc::Sender| async move { + test_harness(|r| mock_overseer(r, expected_backed), |mut tx: TestSubsystemSender| async move { let result = select_candidates(&mock_cores, &[], &candidates, Default::default(), &mut tx) .await.unwrap(); @@ -470,7 +453,7 @@ mod select_candidates { }) .collect(); - test_harness(|r| mock_overseer(r, expected_backed), |mut tx: mpsc::Sender| async move { + test_harness(|r| mock_overseer(r, expected_backed), |mut tx: TestSubsystemSender| async move { let result = select_candidates(&mock_cores, &[], &candidates, Default::default(), &mut tx) .await.unwrap(); diff --git a/polkadot/node/network/bridge/Cargo.toml b/polkadot/node/network/bridge/Cargo.toml index 8afa884cf1..ea2528ed72 100644 --- a/polkadot/node/network/bridge/Cargo.toml +++ b/polkadot/node/network/bridge/Cargo.toml @@ -15,10 +15,10 @@ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-node-network-protocol = { path = "../protocol" } strum = "0.20.0" +parking_lot = "0.11.1" [dev-dependencies] assert_matches = "1.4.0" -parking_lot = "0.11.1" polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } polkadot-node-subsystem-util = { path = "../../subsystem-util"} sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/bridge/src/action.rs b/polkadot/node/network/bridge/src/action.rs deleted file mode 100644 index cabba1f9df..0000000000 --- a/polkadot/node/network/bridge/src/action.rs +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright 2020-2021 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . -// - -use futures::channel::mpsc; - -use parity_scale_codec::Decode; -use polkadot_node_network_protocol::{ - peer_set::PeerSet, v1 as protocol_v1, PeerId, UnifiedReputationChange, -}; -use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockNumber}; -use polkadot_subsystem::messages::{AllMessages, NetworkBridgeMessage}; -use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal}; -use sc_network::{Event as NetworkEvent, IfDisconnected}; - -use polkadot_node_network_protocol::{request_response::Requests, ObservedRole}; - -use super::multiplexer::RequestMultiplexError; -use super::{WireMessage, MALFORMED_MESSAGE_COST}; - -/// Internal type combining all actions a `NetworkBridge` might perform. 
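Stepping back to the provisioner test changes above: the harness now hands the test side a `TestSubsystemSender` and the mock overseer a plain `mpsc::UnboundedReceiver<AllMessages>`, so no `FromJobCommand` unwrapping is needed. A stripped-down sketch of that harness shape, using only the `futures` crate and a toy message type in place of the polkadot ones:

```rust
use futures::{channel::{mpsc, oneshot}, executor::block_on, future::join, SinkExt, StreamExt};

// Stand-in for `AllMessages`: a request carrying its reply channel.
#[derive(Debug)]
enum MockMsg {
    BlockNumber(oneshot::Sender<u32>),
}

fn main() {
    let (mut tx, mut rx) = mpsc::unbounded::<MockMsg>();

    // "Mock overseer": answer every request until the sender side is dropped.
    let overseer = async move {
        while let Some(MockMsg::BlockNumber(reply)) = rx.next().await {
            let _ = reply.send(127);
        }
    };

    // "Test": fire one request and assert on the answer.
    let test = async move {
        let (reply_tx, reply_rx) = oneshot::channel();
        tx.send(MockMsg::BlockNumber(reply_tx)).await.unwrap();
        assert_eq!(reply_rx.await.unwrap(), 127);
        drop(tx); // closes the channel so the overseer loop ends
    };

    block_on(join(overseer, test));
}
```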
-/// -/// Both messages coming from the network (`NetworkEvent`) and messages coming from other -/// subsystems (`FromOverseer`) will be converted to `Action` in `run_network` before being -/// processed. -#[derive(Debug)] -pub(crate) enum Action { - /// Ask network to send a validation message. - SendValidationMessages(Vec<(Vec, protocol_v1::ValidationProtocol)>), - - /// Ask network to send a collation message. - SendCollationMessages(Vec<(Vec, protocol_v1::CollationProtocol)>), - - /// Ask network to send requests. - SendRequests(Vec, IfDisconnected), - - /// Ask network to connect to validators. - ConnectToValidators { - validator_ids: Vec, - peer_set: PeerSet, - connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, - }, - - /// Report a peer to the network implementation (decreasing/increasing its reputation). - ReportPeer(PeerId, UnifiedReputationChange), - - /// Disconnect a peer from the given peer-set without affecting their reputation. - DisconnectPeer(PeerId, PeerSet), - - /// A subsystem updates us on the relay chain leaves we consider active. - /// - /// Implementation will send `WireMessage::ViewUpdate` message to peers as appropriate to the - /// change. - ActiveLeaves(ActiveLeavesUpdate), - - /// A subsystem updates our view on the latest finalized block. - /// - /// This information is used for view updates, see also `ActiveLeaves`. - BlockFinalized(BlockNumber), - - /// Network tells us about a new peer. - PeerConnected(PeerSet, PeerId, ObservedRole), - - /// Network tells us about a peer that left. - PeerDisconnected(PeerSet, PeerId), - - /// Messages from the network targeted to other subsystems. - PeerMessages( - PeerId, - Vec>, - Vec>, - ), - - /// Send a message to another subsystem or the overseer. - /// - /// Used for handling incoming requests. - SendMessage(AllMessages), - - /// Abort with reason. - Abort(AbortReason), - Nop, -} - -#[derive(Debug)] -pub(crate) enum AbortReason { - /// Received error from overseer: - SubsystemError(polkadot_subsystem::SubsystemError), - /// The stream of incoming events concluded. - EventStreamConcluded, - /// The stream of incoming requests concluded. 
- RequestStreamConcluded, - /// We received OverseerSignal::Conclude - OverseerConcluded, -} - -impl From>> for Action { - fn from( - res: polkadot_subsystem::SubsystemResult>, - ) -> Self { - match res { - Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => { - Action::ActiveLeaves(active_leaves) - } - Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) => { - Action::BlockFinalized(number) - } - Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => { - Action::Abort(AbortReason::OverseerConcluded) - } - Ok(FromOverseer::Communication { msg }) => match msg { - NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep), - NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => { - Action::DisconnectPeer(peer, peer_set) - } - NetworkBridgeMessage::SendValidationMessage(peers, msg) => { - Action::SendValidationMessages(vec![(peers, msg)]) - } - NetworkBridgeMessage::SendCollationMessage(peers, msg) => { - Action::SendCollationMessages(vec![(peers, msg)]) - } - NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => Action::SendRequests(reqs, if_disconnected), - NetworkBridgeMessage::SendValidationMessages(msgs) => { - Action::SendValidationMessages(msgs) - } - NetworkBridgeMessage::SendCollationMessages(msgs) => { - Action::SendCollationMessages(msgs) - } - NetworkBridgeMessage::ConnectToValidators { - validator_ids, - peer_set, - connected, - } => Action::ConnectToValidators { - validator_ids, - peer_set, - connected, - }, - }, - Err(e) => Action::Abort(AbortReason::SubsystemError(e)), - } - } -} - -impl From> for Action { - fn from(event: Option) -> Action { - match event { - None => Action::Abort(AbortReason::EventStreamConcluded), - Some(NetworkEvent::Dht(_)) - | Some(NetworkEvent::SyncConnected { .. }) - | Some(NetworkEvent::SyncDisconnected { .. 
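The `NotificationsReceived` conversion being deleted here survives almost unchanged in the new `handle_network_messages`: decode every payload for a peer-set and, on the first failure, report the peer as malformed. A minimal sketch of that decode-and-collect idiom, assuming `parity-scale-codec` with the `derive` feature and a toy wire type:

```rust
use parity_scale_codec::{Decode, Encode};

// Toy wire message; the real `WireMessage` is a SCALE-encoded enum.
#[derive(Encode, Decode, Debug, PartialEq)]
struct WireMessage(u32);

// Decode every raw payload; collecting into `Result` short-circuits on the
// first undecodable payload, which is the "report the peer" case.
fn decode_all(payloads: &[Vec<u8>]) -> Result<Vec<WireMessage>, parity_scale_codec::Error> {
    payloads
        .iter()
        .map(|bytes| WireMessage::decode(&mut bytes.as_slice()))
        .collect()
}

fn main() {
    let good = vec![WireMessage(1).encode(), WireMessage(2).encode()];
    assert_eq!(decode_all(&good).unwrap(), vec![WireMessage(1), WireMessage(2)]);

    let malformed = vec![vec![0xff]]; // too short to decode a u32
    assert!(decode_all(&malformed).is_err(), "peer would be reported");
}
```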
}) => Action::Nop, - Some(NetworkEvent::NotificationStreamOpened { - remote, - protocol, - role, - }) => { - let role = role.into(); - PeerSet::try_from_protocol_name(&protocol).map_or(Action::Nop, |peer_set| { - Action::PeerConnected(peer_set, remote, role) - }) - } - Some(NetworkEvent::NotificationStreamClosed { remote, protocol }) => { - PeerSet::try_from_protocol_name(&protocol).map_or(Action::Nop, |peer_set| { - Action::PeerDisconnected(peer_set, remote) - }) - } - Some(NetworkEvent::NotificationsReceived { remote, messages }) => { - let v_messages: Result, _> = messages - .iter() - .filter(|(protocol, _)| { - protocol == &PeerSet::Validation.into_protocol_name() - }) - .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) - .collect(); - - let v_messages = match v_messages { - Err(_) => return Action::ReportPeer(remote, MALFORMED_MESSAGE_COST), - Ok(v) => v, - }; - - let c_messages: Result, _> = messages - .iter() - .filter(|(protocol, _)| { - protocol == &PeerSet::Collation.into_protocol_name() - }) - .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) - .collect(); - - match c_messages { - Err(_) => Action::ReportPeer(remote, MALFORMED_MESSAGE_COST), - Ok(c_messages) => { - if v_messages.is_empty() && c_messages.is_empty() { - Action::Nop - } else { - Action::PeerMessages(remote, v_messages, c_messages) - } - } - } - } - } - } -} - -impl From>> for Action { - fn from(event: Option>) -> Self { - match event { - None => Action::Abort(AbortReason::RequestStreamConcluded), - Some(Err(err)) => Action::ReportPeer(err.peer, MALFORMED_MESSAGE_COST), - Some(Ok(msg)) => Action::SendMessage(msg), - } - } -} diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 62dc5d8172..e7c3620acd 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -21,11 +21,14 @@ use parity_scale_codec::{Encode, Decode}; +use parking_lot::Mutex; use futures::prelude::*; +use futures::channel::mpsc; +use sc_network::Event as NetworkEvent; use polkadot_subsystem::{ ActiveLeavesUpdate, ActivatedLeaf, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, - SubsystemResult, + SubsystemResult, SubsystemSender, OverseerSignal, FromOverseer, }; use polkadot_subsystem::messages::{ NetworkBridgeMessage, AllMessages, @@ -34,6 +37,7 @@ use polkadot_subsystem::messages::{ use polkadot_primitives::v1::{Hash, BlockNumber}; use polkadot_node_network_protocol::{ PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep, + ObservedRole, }; /// Peer set infos for network initialization. @@ -43,17 +47,10 @@ pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority}; use std::collections::{HashMap, hash_map}; use std::iter::ExactSizeIterator; -use std::time::Instant; +use std::sync::Arc; mod validator_discovery; -/// Internally used `Action` type. -/// -/// All requested `NetworkBridgeMessage` user actions and `NetworkEvent` network messages are -/// translated to `Action` before being processed by `run_network`. -mod action; -use action::{Action, AbortReason}; - /// Actual interfacing to the network based on the `Network` trait. /// /// Defines the `Network` trait with an implementation for an `Arc`. @@ -141,355 +138,565 @@ struct PeerData { view: View, } +#[derive(Debug)] +enum UnexpectedAbort { + /// Received error from overseer: + SubsystemError(polkadot_subsystem::SubsystemError), + /// The stream of incoming events concluded. 
+ EventStreamConcluded, + /// The stream of incoming requests concluded. + RequestStreamConcluded, +} + +impl From for UnexpectedAbort { + fn from(e: SubsystemError) -> Self { + UnexpectedAbort::SubsystemError(e) + } +} + +// notifications to be passed through to the validator discovery worker. +enum ValidatorDiscoveryNotification { + PeerConnected(PeerId, PeerSet), + PeerDisconnected(PeerId, PeerSet), +} + +#[derive(Default, Clone)] +struct Shared(Arc>); + +#[derive(Default)] +struct SharedInner { + local_view: View, + validation_peers: HashMap, + collation_peers: HashMap, +} + +async fn handle_subsystem_messages( + mut ctx: Context, + mut network_service: N, + mut authority_discovery_service: AD, + validator_discovery_notifications: mpsc::Receiver, + shared: Shared, +) -> Result<(), UnexpectedAbort> +where + Context: SubsystemContext, + N: Network + validator_discovery::Network, + AD: validator_discovery::AuthorityDiscovery, +{ + // This is kept sorted, descending, by block number. + let mut live_heads: Vec = Vec::with_capacity(MAX_VIEW_HEADS); + let mut finalized_number = 0; + let mut validator_discovery = validator_discovery::Service::::new(); + + let mut validator_discovery_notifications = validator_discovery_notifications.fuse(); + + loop { + futures::select! { + msg = ctx.recv().fuse() => match msg { + Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => { + let ActiveLeavesUpdate { activated, deactivated } = active_leaves; + tracing::trace!( + target: LOG_TARGET, + action = "ActiveLeaves", + num_activated = %activated.len(), + num_deactivated = %deactivated.len(), + ); + + for activated in activated { + let pos = live_heads + .binary_search_by(|probe| probe.number.cmp(&activated.number).reverse()) + .unwrap_or_else(|i| i); + + live_heads.insert(pos, activated); + } + live_heads.retain(|h| !deactivated.contains(&h.hash)); + + update_our_view( + &mut network_service, + &mut ctx, + &live_heads, + &shared, + finalized_number, + ).await?; + } + Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) => { + tracing::trace!( + target: LOG_TARGET, + action = "BlockFinalized" + ); + + debug_assert!(finalized_number < number); + + // we don't send the view updates here, but delay them until the next `ActiveLeaves` + // otherwise it might break assumptions of some of the subsystems + // that we never send the same `ActiveLeavesUpdate` + finalized_number = number; + } + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => { + return Ok(()); + } + Ok(FromOverseer::Communication { msg }) => match msg { + NetworkBridgeMessage::ReportPeer(peer, rep) => { + tracing::debug!( + target: LOG_TARGET, + action = "ReportPeer" + ); + network_service.report_peer(peer, rep).await? + } + NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => { + tracing::trace!( + target: LOG_TARGET, + action = "DisconnectPeer", + ?peer, + peer_set = ?peer_set, + ); + network_service.disconnect_peer(peer, peer_set).await?; + } + NetworkBridgeMessage::SendValidationMessage(peers, msg) => { + tracing::trace!( + target: LOG_TARGET, + action = "SendValidationMessages", + num_messages = 1, + ); + + send_message( + &mut network_service, + peers, + PeerSet::Validation, + WireMessage::ProtocolMessage(msg), + ).await? 
+ } + NetworkBridgeMessage::SendValidationMessages(msgs) => { + tracing::trace!( + target: LOG_TARGET, + action = "SendValidationMessages", + num_messages = %msgs.len(), + ); + + for (peers, msg) in msgs { + send_message( + &mut network_service, + peers, + PeerSet::Validation, + WireMessage::ProtocolMessage(msg), + ).await? + } + } + NetworkBridgeMessage::SendCollationMessage(peers, msg) => { + tracing::trace!( + target: LOG_TARGET, + action = "SendCollationMessages", + num_messages = 1, + ); + + send_message( + &mut network_service, + peers, + PeerSet::Collation, + WireMessage::ProtocolMessage(msg), + ).await? + } + NetworkBridgeMessage::SendCollationMessages(msgs) => { + tracing::trace!( + target: LOG_TARGET, + action = "SendCollationMessages", + num_messages = %msgs.len(), + ); + + for (peers, msg) in msgs { + send_message( + &mut network_service, + peers, + PeerSet::Collation, + WireMessage::ProtocolMessage(msg), + ).await? + } + } + NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => { + tracing::trace!( + target: LOG_TARGET, + action = "SendRequests", + num_requests = %reqs.len(), + ); + + for req in reqs { + network_service + .start_request(&mut authority_discovery_service, req, if_disconnected) + .await; + } + } + NetworkBridgeMessage::ConnectToValidators { + validator_ids, + peer_set, + connected, + } => { + tracing::trace!( + target: LOG_TARGET, + action = "ConnectToValidators", + peer_set = ?peer_set, + ids = ?validator_ids, + "Received a validator connection request", + ); + + let (ns, ads) = validator_discovery.on_request( + validator_ids, + peer_set, + connected, + network_service, + authority_discovery_service, + ).await; + + network_service = ns; + authority_discovery_service = ads; + } + } + Err(e) => return Err(e.into()), + }, + notification = validator_discovery_notifications.next().fuse() => match notification { + None => return Ok(()), + Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set)) => { + validator_discovery.on_peer_connected( + peer.clone(), + peer_set, + &mut authority_discovery_service, + ).await; + } + Some(ValidatorDiscoveryNotification::PeerDisconnected(peer, peer_set)) => { + validator_discovery.on_peer_disconnected(&peer, peer_set); + } + }, + } + } +} + +async fn handle_network_messages( + mut sender: impl SubsystemSender, + mut network_service: impl Network, + mut request_multiplexer: RequestMultiplexer, + mut validator_discovery_notifications: mpsc::Sender, + shared: Shared, +) -> Result<(), UnexpectedAbort> { + let mut network_stream = network_service.event_stream(); + + loop { + futures::select! { + network_event = network_stream.next().fuse() => match network_event { + None => return Err(UnexpectedAbort::EventStreamConcluded), + Some(NetworkEvent::Dht(_)) + | Some(NetworkEvent::SyncConnected { .. }) + | Some(NetworkEvent::SyncDisconnected { .. 
}) => {} + Some(NetworkEvent::NotificationStreamOpened { remote: peer, protocol, role }) => { + let role = ObservedRole::from(role); + let peer_set = match PeerSet::try_from_protocol_name(&protocol) { + None => continue, + Some(peer_set) => peer_set, + }; + + tracing::trace!( + target: LOG_TARGET, + action = "PeerConnected", + peer_set = ?peer_set, + peer = ?peer, + role = ?role + ); + + let local_view = { + let mut shared = shared.0.lock(); + let peer_map = match peer_set { + PeerSet::Validation => &mut shared.validation_peers, + PeerSet::Collation => &mut shared.collation_peers, + }; + + match peer_map.entry(peer.clone()) { + hash_map::Entry::Occupied(_) => continue, + hash_map::Entry::Vacant(vacant) => { + vacant.insert(PeerData { view: View::default() }); + } + } + + shared.local_view.clone() + }; + + // Failure here means that the other side of the network bridge + // has concluded and this future will be dropped in due course. + let _ = validator_discovery_notifications.send( + ValidatorDiscoveryNotification::PeerConnected(peer.clone(), peer_set) + ).await; + + + match peer_set { + PeerSet::Validation => { + dispatch_validation_events_to_all( + vec![ + NetworkBridgeEvent::PeerConnected(peer.clone(), role), + NetworkBridgeEvent::PeerViewChange( + peer.clone(), + View::default(), + ), + ], + &mut sender, + ).await; + + send_message( + &mut network_service, + vec![peer], + PeerSet::Validation, + WireMessage::::ViewUpdate( + local_view, + ), + ).await?; + } + PeerSet::Collation => { + dispatch_collation_events_to_all( + vec![ + NetworkBridgeEvent::PeerConnected(peer.clone(), role), + NetworkBridgeEvent::PeerViewChange( + peer.clone(), + View::default(), + ), + ], + &mut sender, + ).await; + + send_message( + &mut network_service, + vec![peer], + PeerSet::Collation, + WireMessage::::ViewUpdate( + local_view, + ), + ).await?; + } + } + } + Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => { + let peer_set = match PeerSet::try_from_protocol_name(&protocol) { + None => continue, + Some(peer_set) => peer_set, + }; + + tracing::trace!( + target: LOG_TARGET, + action = "PeerDisconnected", + peer_set = ?peer_set, + peer = ?peer + ); + + let was_connected = { + let mut shared = shared.0.lock(); + let peer_map = match peer_set { + PeerSet::Validation => &mut shared.validation_peers, + PeerSet::Collation => &mut shared.collation_peers, + }; + + peer_map.remove(&peer).is_some() + }; + + // Failure here means that the other side of the network bridge + // has concluded and this future will be dropped in due course. 
+ let _ = validator_discovery_notifications.send( + ValidatorDiscoveryNotification::PeerDisconnected(peer.clone(), peer_set) + ).await; + + if was_connected { + match peer_set { + PeerSet::Validation => dispatch_validation_event_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + &mut sender, + ).await, + PeerSet::Collation => dispatch_collation_event_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + &mut sender, + ).await, + } + } + } + Some(NetworkEvent::NotificationsReceived { remote, messages }) => { + let v_messages: Result, _> = messages + .iter() + .filter(|(protocol, _)| { + protocol == &PeerSet::Validation.into_protocol_name() + }) + .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) + .collect(); + + let v_messages = match v_messages { + Err(_) => { + tracing::debug!( + target: LOG_TARGET, + action = "ReportPeer" + ); + + network_service.report_peer(remote, MALFORMED_MESSAGE_COST).await?; + continue; + } + Ok(v) => v, + }; + + let c_messages: Result, _> = messages + .iter() + .filter(|(protocol, _)| { + protocol == &PeerSet::Collation.into_protocol_name() + }) + .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) + .collect(); + + match c_messages { + Err(_) => { + tracing::debug!( + target: LOG_TARGET, + action = "ReportPeer" + ); + + network_service.report_peer(remote, MALFORMED_MESSAGE_COST).await?; + continue; + } + Ok(c_messages) => { + if v_messages.is_empty() && c_messages.is_empty() { + continue; + } else { + tracing::trace!( + target: LOG_TARGET, + action = "PeerMessages", + peer = ?remote, + num_validation_messages = %v_messages.len(), + num_collation_messages = %c_messages.len() + ); + + if !v_messages.is_empty() { + let (events, reports) = handle_peer_messages( + remote.clone(), + &mut shared.0.lock().validation_peers, + v_messages, + ); + + for report in reports { + network_service.report_peer(remote.clone(), report).await?; + } + + dispatch_validation_events_to_all(events, &mut sender).await; + } + + if !c_messages.is_empty() { + let (events, reports) = handle_peer_messages( + remote.clone(), + &mut shared.0.lock().collation_peers, + c_messages, + ); + + for report in reports { + network_service.report_peer(remote.clone(), report).await?; + } + + + dispatch_collation_events_to_all(events, &mut sender).await; + } + } + } + } + } + }, + req_res_event = request_multiplexer.next().fuse() => match req_res_event { + None => return Err(UnexpectedAbort::RequestStreamConcluded), + Some(Err(err)) => { + sender.send_message(NetworkBridgeMessage::ReportPeer( + err.peer, + MALFORMED_MESSAGE_COST, + ).into()).await; + } + Some(Ok(msg)) => { + sender.send_message(msg).await; + } + }, + } + } +} + /// Main driver, processing network events and messages from other subsystems. +/// +/// THIS IS A HACK. We need to ensure we never hold the mutex across a `.await` boundary +/// and `parking_lot` currently does not provide `Send`, which helps us enforce that. +/// If this breaks, we need to find another way to protect ourselves. 
+///
+/// ```compile_fail
+/// #use parking_lot::MutexGuard;
+/// #fn is_send<T: Send>();
+/// #is_send::<MutexGuard<'static, ()>>();
+/// ```
 #[tracing::instrument(skip(bridge, ctx), fields(subsystem = LOG_TARGET))]
 async fn run_network<N, AD>(
-	mut bridge: NetworkBridge<N, AD>,
+	bridge: NetworkBridge<N, AD>,
 	mut ctx: impl SubsystemContext<Message = NetworkBridgeMessage>,
 ) -> SubsystemResult<()>
 where
 	N: Network + validator_discovery::Network,
 	AD: validator_discovery::AuthorityDiscovery,
 {
-	let mut event_stream = bridge.network_service.event_stream().fuse();
+	let shared = Shared::default();
 
-	// This is kept sorted, descending, by block number.
-	let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS);
-	let mut local_view = View::default();
-	let mut finalized_number = 0;
+	let NetworkBridge {
+		network_service,
+		request_multiplexer,
+		authority_discovery_service,
+	} = bridge;
 
-	let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new();
-	let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new();
+	let (validation_worker_tx, validation_worker_rx) = mpsc::channel(1024);
 
-	let mut validator_discovery = validator_discovery::Service::<N, AD>::new();
+	let (remote, network_event_handler) = handle_network_messages(
+		ctx.sender().clone(),
+		network_service.clone(),
+		request_multiplexer,
+		validation_worker_tx,
+		shared.clone(),
+	).remote_handle();
 
-	loop {
+	ctx.spawn("network-bridge-network-worker", Box::pin(remote)).await?;
 
-		let action = {
-			let subsystem_next = ctx.recv().fuse();
-			let mut net_event_next = event_stream.next().fuse();
-			let mut req_res_event_next = bridge.request_multiplexer.next();
-			futures::pin_mut!(subsystem_next);
+	let subsystem_event_handler = handle_subsystem_messages(
+		ctx,
+		network_service,
+		authority_discovery_service,
+		validation_worker_rx,
+		shared,
+	);
 
-			futures::select! {
-				subsystem_msg = subsystem_next => Action::from(subsystem_msg),
-				net_event = net_event_next => Action::from(net_event),
-				req_res_event = req_res_event_next => Action::from(req_res_event),
-			}
-		};
+	futures::pin_mut!(subsystem_event_handler);
 
-		// Used for logging purposes.
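As a concrete illustration of the discipline the `compile_fail` doctest above enforces, here is a sketch (with simplified stand-in types, not the bridge's real `Shared` state) of taking the `parking_lot` mutex in a tight scope and cloning out data before any `.await`:

```rust
use std::sync::Arc;
use parking_lot::Mutex;

#[derive(Default)]
struct SharedInner {
    local_view: Vec<u64>, // stand-in for the real `View`
}

// Take the lock in a tight scope, clone what the async part needs, and drop
// the guard before any `.await`. Because `parking_lot::MutexGuard` is not
// `Send`, accidentally holding it across an `.await` in a spawned task is a
// compile error rather than a latent deadlock.
async fn broadcast_view(shared: Arc<Mutex<SharedInner>>) {
    let view = {
        let inner = shared.lock();
        inner.local_view.clone()
    }; // guard dropped here
    send_to_peers(view).await; // no lock held across the await
}

async fn send_to_peers(_view: Vec<u64>) {
    // network I/O would happen here
}

fn main() {
    let shared = Arc::new(Mutex::new(SharedInner { local_view: vec![1, 2, 3] }));
    futures::executor::block_on(broadcast_view(shared));
}
```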
- let before_action_process = Instant::now(); + match futures::future::select(subsystem_event_handler, network_event_handler) + .await + .factor_first() + .0 + { + Ok(()) => Ok(()), + Err(UnexpectedAbort::SubsystemError(err)) => { + tracing::warn!( + target: LOG_TARGET, + err = ?err, + "Shutting down Network Bridge due to error" + ); - match action { - Action::Nop => {} - Action::Abort(reason) => match reason { - AbortReason::SubsystemError(err) => { - tracing::warn!( - target: LOG_TARGET, - err = ?err, - "Shutting down Network Bridge due to error" - ); - return Err(SubsystemError::Context(format!( - "Received SubsystemError from overseer: {:?}", - err - ))); - } - AbortReason::EventStreamConcluded => { - tracing::info!( - target: LOG_TARGET, - "Shutting down Network Bridge: underlying request stream concluded" - ); - return Err(SubsystemError::Context( - "Incoming network event stream concluded.".to_string(), - )); - } - AbortReason::RequestStreamConcluded => { - tracing::info!( - target: LOG_TARGET, - "Shutting down Network Bridge: underlying request stream concluded" - ); - return Err(SubsystemError::Context( - "Incoming network request stream concluded".to_string(), - )); - } - AbortReason::OverseerConcluded => return Ok(()), - } - - Action::SendValidationMessages(msgs) => { - tracing::trace!( - target: LOG_TARGET, - action = "SendValidationMessages", - num_messages = %msgs.len(), - ); - - for (peers, msg) in msgs { - send_message( - &mut bridge.network_service, - peers, - PeerSet::Validation, - WireMessage::ProtocolMessage(msg), - ).await? - } - } - - Action::SendCollationMessages(msgs) => { - tracing::trace!( - target: LOG_TARGET, - action = "SendCollationMessages", - num_messages = %msgs.len(), - ); - - for (peers, msg) in msgs { - send_message( - &mut bridge.network_service, - peers, - PeerSet::Collation, - WireMessage::ProtocolMessage(msg), - ).await? - } - } - - Action::SendRequests(reqs, if_disconnected) => { - tracing::trace!( - target: LOG_TARGET, - action = "SendRequests", - num_requests = %reqs.len(), - ); - - for req in reqs { - bridge - .network_service - .start_request(&mut bridge.authority_discovery_service, req, if_disconnected) - .await; - } - }, - - Action::ConnectToValidators { - validator_ids, - peer_set, - connected, - } => { - tracing::trace!( - target: LOG_TARGET, - action = "ConnectToValidators", - peer_set = ?peer_set, - ids = ?validator_ids, - "Received a validator connection request", - ); - let (ns, ads) = validator_discovery.on_request( - validator_ids, - peer_set, - connected, - bridge.network_service, - bridge.authority_discovery_service, - ).await; - bridge.network_service = ns; - bridge.authority_discovery_service = ads; - }, - - Action::ReportPeer(peer, rep) => { - tracing::debug!( - target: LOG_TARGET, - action = "ReportPeer" - ); - bridge.network_service.report_peer(peer, rep).await? 
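The replacement logic at the top of this hunk runs the two halves side by side: the network half is detached via `remote_handle`, and whichever future finishes first decides the exit. A self-contained sketch of that wiring, with toy futures and an assumed `Abort` type standing in for `UnexpectedAbort`:

```rust
use futures::{executor::block_on, future::{self, FutureExt}};

#[derive(Debug, PartialEq)]
enum Abort { EventStreamConcluded }

async fn network_half() -> Result<(), Abort> { Err(Abort::EventStreamConcluded) }
async fn subsystem_half() -> Result<(), Abort> { future::pending().await }

fn main() {
    // Detach the network half: `remote` is the actual work, `handle` resolves
    // with its result. In the real code `remote` goes to `ctx.spawn(..)`.
    let (remote, handle) = network_half().remote_handle();

    let combined = async {
        let subsystem = subsystem_half();
        futures::pin_mut!(subsystem);
        // Whichever half finishes first decides the overall result.
        future::select(subsystem, handle).await.factor_first().0
    };

    let ((), result) = block_on(future::join(remote, combined));
    assert_eq!(result, Err(Abort::EventStreamConcluded));
}
```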
- } - - Action::DisconnectPeer(peer, peer_set) => { - tracing::trace!( - target: LOG_TARGET, - action = "DisconnectPeer", - ?peer, - peer_set = ?peer_set, - ); - bridge.network_service.disconnect_peer(peer, peer_set); - } - - Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { - tracing::trace!( - target: LOG_TARGET, - action = "ActiveLeaves", - num_activated = %activated.len(), - num_deactivated = %deactivated.len(), - ); - - for activated in activated { - let pos = live_heads - .binary_search_by(|probe| probe.number.cmp(&activated.number).reverse()) - .unwrap_or_else(|i| i); - - live_heads.insert(pos, activated); - } - live_heads.retain(|h| !deactivated.contains(&h.hash)); - - update_our_view( - &mut bridge.network_service, - &mut ctx, - &live_heads, - &mut local_view, - finalized_number, - &validation_peers, - &collation_peers, - ).await?; - } - - Action::BlockFinalized(number) => { - tracing::trace!( - target: LOG_TARGET, - action = "BlockFinalized" - ); - - debug_assert!(finalized_number < number); - - // we don't send the view updates here, but delay them until the next `Action::ActiveLeaves` - // otherwise it might break assumptions of some of the subsystems - // that we never send the same `ActiveLeavesUpdate` - // this is fine, we will get `Action::ActiveLeaves` on block finalization anyway - finalized_number = number; - }, - - Action::PeerConnected(peer_set, peer, role) => { - tracing::trace!( - target: LOG_TARGET, - action = "PeerConnected", - peer_set = ?peer_set, - peer = ?peer, - role = ?role - ); - - let peer_map = match peer_set { - PeerSet::Validation => &mut validation_peers, - PeerSet::Collation => &mut collation_peers, - }; - - validator_discovery.on_peer_connected( - peer.clone(), - peer_set, - &mut bridge.authority_discovery_service, - ).await; - - match peer_map.entry(peer.clone()) { - hash_map::Entry::Occupied(_) => continue, - hash_map::Entry::Vacant(vacant) => { - let _ = vacant.insert(PeerData { - view: View::default(), - }); - - match peer_set { - PeerSet::Validation => { - dispatch_validation_events_to_all( - vec![ - NetworkBridgeEvent::PeerConnected(peer.clone(), role), - NetworkBridgeEvent::PeerViewChange( - peer.clone(), - View::default(), - ), - ], - &mut ctx, - ).await; - - send_message( - &mut bridge.network_service, - vec![peer], - PeerSet::Validation, - WireMessage::::ViewUpdate( - local_view.clone() - ), - ).await?; - } - PeerSet::Collation => { - dispatch_collation_events_to_all( - vec![ - NetworkBridgeEvent::PeerConnected(peer.clone(), role), - NetworkBridgeEvent::PeerViewChange( - peer.clone(), - View::default(), - ), - ], - &mut ctx, - ).await; - - send_message( - &mut bridge.network_service, - vec![peer], - PeerSet::Collation, - WireMessage::::ViewUpdate( - local_view.clone() - ), - ).await?; - } - } - } - } - } - Action::PeerDisconnected(peer_set, peer) => { - tracing::trace!( - target: LOG_TARGET, - action = "PeerDisconnected", - peer_set = ?peer_set, - peer = ?peer - ); - - let peer_map = match peer_set { - PeerSet::Validation => &mut validation_peers, - PeerSet::Collation => &mut collation_peers, - }; - - validator_discovery.on_peer_disconnected(&peer, peer_set); - - if peer_map.remove(&peer).is_some() { - match peer_set { - PeerSet::Validation => dispatch_validation_event_to_all( - NetworkBridgeEvent::PeerDisconnected(peer), - &mut ctx, - ).await, - PeerSet::Collation => dispatch_collation_event_to_all( - NetworkBridgeEvent::PeerDisconnected(peer), - &mut ctx, - ).await, - } - } - }, - Action::PeerMessages(peer, 
v_messages, c_messages) => {
-				tracing::trace!(
-					target: LOG_TARGET,
-					action = "PeerMessages",
-					peer = ?peer,
-					num_validation_messages = %v_messages.len(),
-					num_collation_messages = %c_messages.len()
-				);
-
-				if !v_messages.is_empty() {
-					let events = handle_peer_messages(
-						peer.clone(),
-						&mut validation_peers,
-						v_messages,
-						&mut bridge.network_service,
-					).await?;
-
-					dispatch_validation_events_to_all(events, &mut ctx).await;
-				}
-
-				if !c_messages.is_empty() {
-					let events = handle_peer_messages(
-						peer.clone(),
-						&mut collation_peers,
-						c_messages,
-						&mut bridge.network_service,
-					).await?;
-
-					dispatch_collation_events_to_all(events, &mut ctx).await;
-				}
-			},
-			Action::SendMessage(msg) => ctx.send_message(msg).await,
+			Err(SubsystemError::Context(format!(
+				"Received SubsystemError from overseer: {:?}",
+				err
+			)))
+		}
+		Err(UnexpectedAbort::EventStreamConcluded) => {
+			tracing::info!(
+				target: LOG_TARGET,
+				"Shutting down Network Bridge: underlying event stream concluded"
+			);
+			Err(SubsystemError::Context(
+				"Incoming network event stream concluded.".to_string(),
+			))
+		}
+		Err(UnexpectedAbort::RequestStreamConcluded) => {
+			tracing::info!(
+				target: LOG_TARGET,
+				"Shutting down Network Bridge: underlying request stream concluded"
+			);
+			Err(SubsystemError::Context(
+				"Incoming network request stream concluded".to_string(),
+			))
+		}
-
-		tracing::trace!(
-			target: LOG_TARGET,
-			elapsed = ?before_action_process.elapsed(),
-			"Processed action",
-		);
 	}
 }
@@ -500,35 +707,42 @@ fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_n
 	)
 }
 
-#[tracing::instrument(level = "trace", skip(net, ctx, validation_peers, collation_peers), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(net, ctx, shared), fields(subsystem = LOG_TARGET))]
 async fn update_our_view(
 	net: &mut impl Network,
 	ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
 	live_heads: &[ActivatedLeaf],
-	local_view: &mut View,
+	shared: &Shared,
 	finalized_number: BlockNumber,
-	validation_peers: &HashMap<PeerId, PeerData>,
-	collation_peers: &HashMap<PeerId, PeerData>,
 ) -> SubsystemResult<()> {
 	let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number);
 
-	// We only want to send a view update when the heads changed.
-	// A change in finalized block number only is _not_ sufficient.
-	if local_view.check_heads_eq(&new_view) {
-		return Ok(())
-	}
+	let (validation_peers, collation_peers) = {
+		let mut shared = shared.0.lock();
 
-	*local_view = new_view.clone();
+		// We only want to send a view update when the heads changed.
+		// A change in finalized block number only is _not_ sufficient.
+		if shared.local_view.check_heads_eq(&new_view) {
+			return Ok(())
+		}
+
+		shared.local_view = new_view.clone();
+
+		(
+			shared.validation_peers.keys().cloned().collect::<Vec<_>>(),
+			shared.collation_peers.keys().cloned().collect::<Vec<_>>(),
+		)
+	};
 
 	send_validation_message(
 		net,
-		validation_peers.keys().cloned(),
+		validation_peers,
 		WireMessage::ViewUpdate(new_view.clone()),
 	).await?;
 
 	send_collation_message(
 		net,
-		collation_peers.keys().cloned(),
+		collation_peers,
 		WireMessage::ViewUpdate(new_view),
 	).await?;
 
@@ -537,50 +751,47 @@ async fn update_our_view(
 		finalized_number,
 	);
 
-	dispatch_validation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view.clone()), ctx).await;
+	dispatch_validation_event_to_all_unbounded(
+		NetworkBridgeEvent::OurViewChange(our_view.clone()),
+		ctx.sender(),
+	);
 
-	dispatch_collation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view), ctx).await;
+	dispatch_collation_event_to_all_unbounded(
+		NetworkBridgeEvent::OurViewChange(our_view),
+		ctx.sender(),
+	);
 
 	Ok(())
 }
 
 // Handle messages on a specific peer-set. The peer is expected to be connected on that
 // peer-set.
-#[tracing::instrument(level = "trace", skip(peers, messages, net), fields(subsystem = LOG_TARGET))]
-async fn handle_peer_messages<M>(
+#[tracing::instrument(level = "trace", skip(peers, messages), fields(subsystem = LOG_TARGET))]
+fn handle_peer_messages<M>(
 	peer: PeerId,
 	peers: &mut HashMap<PeerId, PeerData>,
 	messages: Vec<WireMessage<M>>,
-	net: &mut impl Network,
-) -> SubsystemResult<Vec<NetworkBridgeEvent<M>>> {
+) -> (Vec<NetworkBridgeEvent<M>>, Vec<Rep>) {
 	let peer_data = match peers.get_mut(&peer) {
 		None => {
-			net.report_peer(peer, UNCONNECTED_PEERSET_COST).await?;
-
-			return Ok(Vec::new());
+			return (Vec::new(), vec![UNCONNECTED_PEERSET_COST]);
		},
		Some(d) => d,
	};

	let mut outgoing_messages = Vec::with_capacity(messages.len());
+	let mut reports = Vec::new();
+
	for message in messages {
		outgoing_messages.push(match message {
			WireMessage::ViewUpdate(new_view) => {
				if new_view.len() > MAX_VIEW_HEADS ||
					new_view.finalized_number < peer_data.view.finalized_number
				{
-					net.report_peer(
-						peer.clone(),
-						MALFORMED_VIEW_COST,
-					).await?;
-
+					reports.push(MALFORMED_VIEW_COST);
					continue
				} else if new_view.is_empty() {
-					net.report_peer(
-						peer.clone(),
-						EMPTY_VIEW_COST,
-					).await?;
-
+					reports.push(EMPTY_VIEW_COST);
					continue
				} else if new_view == peer_data.view {
					continue
@@ -599,7 +810,7 @@ async fn handle_peer_messages<M>(
		})
	}

-	Ok(outgoing_messages)
+	(outgoing_messages, reports)
}

#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
@@ -631,22 +842,40 @@ async fn send_collation_message(
 async fn dispatch_validation_event_to_all(
 	event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
-	ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
+	ctx: &mut impl SubsystemSender
 ) {
	dispatch_validation_events_to_all(std::iter::once(event), ctx).await
 }
 
 async fn dispatch_collation_event_to_all(
 	event: NetworkBridgeEvent<protocol_v1::CollationProtocol>,
-	ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
+	ctx: &mut impl SubsystemSender
 ) {
	dispatch_collation_events_to_all(std::iter::once(event), ctx).await
 }
 
+fn dispatch_validation_event_to_all_unbounded(
+	event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
+	ctx: &mut impl SubsystemSender
+) {
+	for msg in AllMessages::dispatch_iter(event) {
+		ctx.send_unbounded_message(msg);
+	}
+}
+
+fn dispatch_collation_event_to_all_unbounded(
+	event: NetworkBridgeEvent<protocol_v1::CollationProtocol>,
+	ctx: &mut impl SubsystemSender
+) {
+	if let Some(msg) = event.focus().ok().map(CollatorProtocolMessage::NetworkBridgeUpdateV1) {
+		ctx.send_unbounded_message(msg.into());
+	}
+}
+
 #[tracing::instrument(level = "trace", skip(events, ctx), fields(subsystem = LOG_TARGET))]
async fn dispatch_validation_events_to_all( events: I, - ctx: &mut impl SubsystemContext, + ctx: &mut impl SubsystemSender ) where I: IntoIterator>, @@ -658,7 +887,7 @@ async fn dispatch_validation_events_to_all( #[tracing::instrument(level = "trace", skip(events, ctx), fields(subsystem = LOG_TARGET))] async fn dispatch_collation_events_to_all( events: I, - ctx: &mut impl SubsystemContext, + ctx: &mut impl SubsystemSender ) where I: IntoIterator>, @@ -713,6 +942,7 @@ mod tests { use crate::validator_discovery::AuthorityDiscovery; // The subsystem's view of the network - only supports a single call to `event_stream`. + #[derive(Clone)] struct TestNetwork { net_events: Arc>>>, action_tx: metered::UnboundedMeteredSender, diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs index 54d8ca12a4..a4fb7150d0 100644 --- a/polkadot/node/network/bridge/src/network.rs +++ b/polkadot/node/network/bridge/src/network.rs @@ -99,7 +99,7 @@ pub enum NetworkAction { /// An abstraction over networking for the purposes of this subsystem. #[async_trait] -pub trait Network: Send + 'static { +pub trait Network: Clone + Send + 'static { /// Get a stream of all events occurring on the network. This may include events unrelated /// to the Polkadot protocol - the user of this function should filter only for events related /// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME) diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index 76fd86a3f6..955bc22ccc 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -162,6 +162,15 @@ pub struct TestSubsystemSender { tx: mpsc::UnboundedSender, } +/// Construct a sender/receiver pair. +pub fn sender_receiver() -> (TestSubsystemSender, mpsc::UnboundedReceiver) { + let (tx, rx) = mpsc::unbounded(); + ( + TestSubsystemSender { tx }, + rx, + ) +} + #[async_trait::async_trait] impl SubsystemSender for TestSubsystemSender { async fn send_message(&mut self, msg: AllMessages) { diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 4b8b3d2c91..0358cf7f9b 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -27,7 +27,8 @@ use polkadot_node_subsystem::{ errors::RuntimeApiError, messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender, BoundToRelayParent}, - FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, + FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemSender, + ActiveLeavesUpdate, OverseerSignal, }; use polkadot_node_jaeger as jaeger; use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream}; @@ -100,20 +101,20 @@ pub enum Error { pub type RuntimeApiReceiver = oneshot::Receiver>; /// Request some data from the `RuntimeApi`. 
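For orientation before the rewrite below: since a `SubsystemSender` send cannot fail, the helper no longer returns a `Result` and simply hands back the oneshot receiver; errors now surface only when awaiting the response. A minimal sketch of that fire-and-forget request shape, with toy types in place of the runtime-API messages:

```rust
use futures::{channel::{mpsc, oneshot}, executor::block_on, SinkExt, StreamExt};

// Stand-in for a runtime-API request carrying its reply channel.
struct Request(oneshot::Sender<u32>);

// The helper allocates the reply channel, fires the request on an unbounded
// (hence infallible in practice) sender, and hands back the receiver.
async fn request_magic_number(sender: &mut mpsc::UnboundedSender<Request>) -> oneshot::Receiver<u32> {
    let (tx, rx) = oneshot::channel();
    let _ = sender.send(Request(tx)).await;
    rx
}

fn main() {
    let (mut tx, mut rx) = mpsc::unbounded();
    let respond = async move {
        // "Runtime" side: answer a single request.
        if let Some(Request(reply)) = rx.next().await {
            let _ = reply.send(7);
        }
    };
    let ask = async move {
        let pending = request_magic_number(&mut tx).await;
        // The send itself could not fail; errors only show up here.
        assert_eq!(pending.await.unwrap(), 7);
    };
    block_on(futures::future::join(respond, ask));
}
```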
-pub async fn request_from_runtime<RequestBuilder, Response, FromJob>(
+pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
 	parent: Hash,
-	sender: &mut mpsc::Sender<FromJob>,
+	sender: &mut Sender,
 	request_builder: RequestBuilder,
-) -> Result<RuntimeApiReceiver<Response>, Error>
+) -> RuntimeApiReceiver<Response>
 where
 	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
-	FromJob: From<AllMessages>,
+	Sender: SubsystemSender,
 {
 	let (tx, rx) = oneshot::channel();
 
-	sender.send(AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx))).into()).await?;
+	sender.send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into()).await;
 
-	Ok(rx)
+	rx
 }
 
 /// Construct specialized request functions for the runtime.
@@ -132,16 +133,13 @@ macro_rules! specialize_requests {
 		#[doc = "Request `"]
 		#[doc = $doc_name]
 		#[doc = "` from the runtime"]
-		pub async fn $func_name<FromJob>(
+		pub async fn $func_name(
 			parent: Hash,
 			$(
 				$param_name: $param_ty,
 			)*
-			sender: &mut mpsc::Sender<FromJob>,
-		) -> Result<RuntimeApiReceiver<$return_ty>, Error>
-		where
-			FromJob: From<AllMessages>,
-		{
+			sender: &mut impl SubsystemSender,
+		) -> RuntimeApiReceiver<$return_ty> {
			request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
				$( $param_name, )* tx
			)).await
@@ -282,20 +280,17 @@ pub struct Validator {
 impl Validator {
 	/// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block.
-	pub async fn new<FromJob>(
+	pub async fn new<S: SubsystemSender>(
 		parent: Hash,
 		keystore: SyncCryptoStorePtr,
-		mut sender: mpsc::Sender<FromJob>,
-	) -> Result<Self, Error>
-	where
-		FromJob: From<AllMessages>,
-	{
+		sender: &mut JobSender<S>,
+	) -> Result<Self, Error> {
 		// Note: request_validators and request_session_index_for_child do not and cannot
 		// run concurrently: they both have a mutable handle to the same sender.
 		// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
 		let (validators, session_index) = futures::try_join!(
-			request_validators(parent, &mut sender).await?,
-			request_session_index_for_child(parent, &mut sender).await?,
+			request_validators(parent, sender).await,
+			request_session_index_for_child(parent, sender).await,
		)?;

		let signing_context = SigningContext {
@@ -429,27 +424,76 @@ pub mod metrics {
 
 /// Commands from a job to the broader subsystem.
 pub enum FromJobCommand {
-	/// Send a message to another subsystem.
-	SendMessage(AllMessages),
 	/// Spawn a child task on the executor.
 	Spawn(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
 	/// Spawn a blocking child task on the executor's dedicated thread pool.
 	SpawnBlocking(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
 }
 
-impl fmt::Debug for FromJobCommand {
-	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-		match self {
-			Self::SendMessage(msg) => write!(fmt, "FromJobCommand::SendMessage({:?})", msg),
-			Self::Spawn(name, _) => write!(fmt, "FromJobCommand::Spawn({})", name),
-			Self::SpawnBlocking(name, _) => write!(fmt, "FromJobCommand::SpawnBlocking({})", name),
-		}
+/// A sender for messages from jobs, as well as commands to the overseer.
+#[derive(Clone)]
+pub struct JobSender<S: SubsystemSender> {
+	sender: S,
+	from_job: mpsc::Sender<FromJobCommand>,
+}
+
+impl<S: SubsystemSender> JobSender<S> {
+	/// Get access to the underlying subsystem sender.
+	pub fn subsystem_sender(&mut self) -> &mut S {
+		&mut self.sender
+	}
+
+	/// Send a direct message to some other `Subsystem`, routed based on message type.
+	pub async fn send_message(&mut self, msg: AllMessages) {
+		self.sender.send_message(msg).await
+	}
+
+	/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
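The `JobSender` introduced above pairs a subsystem sender with a command channel back to the job manager. Its shape is easy to see with toy types; in this sketch `Msg` and `Command` stand in for `AllMessages` and `FromJobCommand`:

```rust
use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};

#[derive(Debug)] struct Msg(&'static str);     // stand-in for AllMessages
#[derive(Debug)] struct Command(&'static str); // stand-in for FromJobCommand

struct JobSender {
    sender: mpsc::UnboundedSender<Msg>, // messages to other subsystems
    from_job: mpsc::Sender<Command>,    // commands back to the job manager
}

impl JobSender {
    async fn send_message(&mut self, msg: Msg) {
        let _ = self.sender.send(msg).await;
    }
    async fn send_command(&mut self, cmd: Command) -> Result<(), mpsc::SendError> {
        self.from_job.send(cmd).await
    }
}

fn main() {
    let (msg_tx, mut msg_rx) = mpsc::unbounded();
    let (cmd_tx, mut cmd_rx) = mpsc::channel(4);
    let mut job_sender = JobSender { sender: msg_tx, from_job: cmd_tx };

    block_on(async {
        job_sender.send_message(Msg("to-subsystem")).await;
        job_sender.send_command(Command("spawn")).await.unwrap();
        assert_eq!(msg_rx.next().await.unwrap().0, "to-subsystem");
        assert_eq!(cmd_rx.next().await.unwrap().0, "spawn");
    });
}
```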
+ pub async fn send_messages(&mut self, msgs: T) + where T: IntoIterator + Send, T::IntoIter: Send + { + self.sender.send_messages(msgs).await + } + + + /// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message + /// type. + /// + /// This function should be used only when there is some other bounding factor on the messages + /// sent with it. Otherwise, it risks a memory leak. + pub fn send_unbounded_message(&mut self, msg: AllMessages) { + self.sender.send_unbounded_message(msg) + } + + /// Send a command to the subsystem, to be relayed onwards to the overseer. + pub async fn send_command(&mut self, msg: FromJobCommand) -> Result<(), mpsc::SendError> { + self.from_job.send(msg).await } } -impl From for FromJobCommand { - fn from(msg: AllMessages) -> Self { - Self::SendMessage(msg) +#[async_trait::async_trait] +impl SubsystemSender for JobSender { + async fn send_message(&mut self, msg: AllMessages) { + self.sender.send_message(msg).await + } + + async fn send_messages(&mut self, msgs: T) + where T: IntoIterator + Send, T::IntoIter: Send + { + self.sender.send_messages(msgs).await + } + + fn send_unbounded_message(&mut self, msg: AllMessages) { + self.sender.send_unbounded_message(msg) + } +} + +impl fmt::Debug for FromJobCommand { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Spawn(name, _) => write!(fmt, "FromJobCommand::Spawn({})", name), + Self::SpawnBlocking(name, _) => write!(fmt, "FromJobCommand::SpawnBlocking({})", name), + } } } @@ -457,7 +501,7 @@ impl From for FromJobCommand { /// /// Jobs are instantiated and killed automatically on appropriate overseer messages. /// Other messages are passed along to and from the job via the overseer to other subsystems. -pub trait JobTrait: Unpin { +pub trait JobTrait: Unpin + Sized { /// Message type used to send messages to the job. type ToJob: 'static + BoundToRelayParent + Send; /// Job runtime error. @@ -479,13 +523,13 @@ pub trait JobTrait: Unpin { /// Run a job for the given relay `parent`. /// /// The job should be ended when `receiver` returns `None`. - fn run( + fn run( parent: Hash, span: Arc, run_args: Self::RunArgs, metrics: Self::Metrics, receiver: mpsc::Receiver, - sender: mpsc::Sender, + sender: JobSender, ) -> Pin> + Send>>; } @@ -493,7 +537,7 @@ pub trait JobTrait: Unpin { /// /// Wraps the utility error type and the job-specific error #[derive(Debug, Error)] -pub enum JobsError { +pub enum JobsError { /// utility error #[error("Utility")] Utility(#[source] Error), @@ -508,61 +552,50 @@ pub enum JobsError { /// - Closes old jobs for a given relay-parent on demand. /// - Dispatches messages to the appropriate job for a given relay-parent. /// - When dropped, aborts all remaining jobs. -/// - implements `Stream`, collecting all messages from subordinate jobs. +/// - implements `Stream`, collecting all messages from subordinate jobs. #[pin_project] -pub struct Jobs { +struct Jobs { spawner: Spawner, - running: HashMap>, + running: HashMap>, outgoing_msgs: StreamUnordered>, - #[pin] - job: std::marker::PhantomData, - errors: Option, JobsError)>>, } -impl Jobs { +impl Jobs { /// Create a new Jobs manager which handles spawning appropriate jobs. pub fn new(spawner: Spawner) -> Self { Self { spawner, running: HashMap::new(), outgoing_msgs: StreamUnordered::new(), - job: std::marker::PhantomData, - errors: None, } } - /// Monitor errors which may occur during handling of a spawned job. - /// - /// By default, an error in a job is simply logged. 
Once this is called, - /// the error is forwarded onto the provided channel. - /// - /// Errors if the error channel already exists. - pub fn forward_errors( - &mut self, - tx: mpsc::Sender<(Option, JobsError)>, - ) -> Result<(), Error> { - if self.errors.is_some() { - return Err(Error::AlreadyForwarding); - } - self.errors = Some(tx); - Ok(()) - } - /// Spawn a new job for this `parent_hash`, with whatever args are appropriate. - fn spawn_job( + fn spawn_job( &mut self, parent_hash: Hash, span: Arc, run_args: Job::RunArgs, metrics: Job::Metrics, - ) -> Result<(), Error> { + sender: Sender, + ) + where Job: JobTrait, Sender: SubsystemSender, + { let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); - let err_tx = self.errors.clone(); - let (future, abort_handle) = future::abortable(async move { - if let Err(e) = Job::run(parent_hash, span, run_args, metrics, to_job_rx, from_job_tx).await { + if let Err(e) = Job::run( + parent_hash, + span, + run_args, + metrics, + to_job_rx, + JobSender { + sender, + from_job: from_job_tx, + }, + ).await { tracing::error!( job = Job::NAME, parent_hash = %parent_hash, @@ -570,19 +603,13 @@ impl Jobs { "job finished with an error", ); - if let Some(mut err_tx) = err_tx { - // if we can't send the notification of error on the error channel, then - // there's no point trying to propagate this error onto the channel too - // all we can do is warn that error propagation has failed - if let Err(e) = err_tx.send((Some(parent_hash), JobsError::Job(e))).await { - tracing::warn!(err = ?e, "failed to forward error"); - } - } + return Err(e); } + + Ok(()) }); self.spawner.spawn(Job::NAME, future.map(drop).boxed()); - self.outgoing_msgs.push(from_job_rx); let handle = JobHandle { @@ -591,8 +618,6 @@ impl Jobs { }; self.running.insert(parent_hash, handle); - - Ok(()) } /// Stop the job associated with this `parent_hash`. @@ -601,7 +626,7 @@ impl Jobs { } /// Send a message to the appropriate job for this `parent_hash`. - async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) { + async fn send_msg(&mut self, parent_hash: Hash, msg: ToJob) { if let Entry::Occupied(mut job) = self.running.entry(parent_hash) { if job.get_mut().send_msg(msg).await.is_err() { job.remove(); @@ -610,10 +635,9 @@ impl Jobs { } } -impl Stream for Jobs +impl Stream for Jobs where Spawner: SpawnNamed, - Job: JobTrait, { type Item = FromJobCommand; @@ -633,107 +657,123 @@ where } } -impl stream::FusedStream for Jobs +impl stream::FusedStream for Jobs where Spawner: SpawnNamed, - Job: JobTrait, { fn is_terminated(&self) -> bool { false } } - -/// A basic implementation of a subsystem. -/// -/// This struct is responsible for handling message traffic between -/// this subsystem and the overseer. It spawns and kills jobs on the -/// appropriate Overseer messages, and dispatches standard traffic to -/// the appropriate job the rest of the time. -pub struct JobManager { +/// Parameters to a job subsystem. +struct JobSubsystemParams { + /// A spawner for sub-tasks. spawner: Spawner, - run_args: Job::RunArgs, - metrics: Job::Metrics, - context: std::marker::PhantomData, - job: std::marker::PhantomData, - errors: Option, JobsError)>>, + /// Arguments to each job. + run_args: RunArgs, + /// Metrics for the subsystem. 
+ metrics: Metrics, } -impl JobManager -where - Spawner: SpawnNamed + Clone + Send + Unpin, - Context: SubsystemContext, - Job: 'static + JobTrait, - Job::RunArgs: Clone, - Job::ToJob: From<::Message> + Sync, -{ - /// Creates a new `Subsystem`. +/// A subsystem which wraps jobs. +/// +/// Conceptually, this is very simple: it just loops forever. +/// +/// - On incoming overseer messages, it starts or stops jobs as appropriate. +/// - On other incoming messages, if they can be converted into Job::ToJob and +/// include a hash, then they're forwarded to the appropriate individual job. +/// - On outgoing messages from the jobs, it forwards them to the overseer. +pub struct JobSubsystem { + params: JobSubsystemParams, + _marker: std::marker::PhantomData, +} + +impl JobSubsystem { + /// Create a new `JobSubsystem`. pub fn new(spawner: Spawner, run_args: Job::RunArgs, metrics: Job::Metrics) -> Self { - Self { - spawner, - run_args, - metrics, - context: std::marker::PhantomData, - job: std::marker::PhantomData, - errors: None, + JobSubsystem { + params: JobSubsystemParams { + spawner, + run_args, + metrics, + }, + _marker: std::marker::PhantomData, } } - /// Monitor errors which may occur during handling of a spawned job. - /// - /// By default, an error in a job is simply logged. Once this is called, - /// the error is forwarded onto the provided channel. - /// - /// Errors if the error channel already exists. - pub fn forward_errors( - &mut self, - tx: mpsc::Sender<(Option, JobsError)>, - ) -> Result<(), Error> { - if self.errors.is_some() { - return Err(Error::AlreadyForwarding); - } - self.errors = Some(tx); - Ok(()) - } + /// Run the subsystem to completion. + pub async fn run(self, mut ctx: Context) + where + Spawner: SpawnNamed + Send + Clone + Unpin + 'static, + Context: SubsystemContext, + Job: 'static + JobTrait + Send, + Job::RunArgs: Clone + Sync, + Job::ToJob: From<::Message> + Sync, + Job::Metrics: Sync, + { + let JobSubsystem { + params: JobSubsystemParams { + spawner, + run_args, + metrics, + }, + .. + } = self; - /// Run this subsystem - /// - /// Conceptually, this is very simple: it just loops forever. - /// - /// - On incoming overseer messages, it starts or stops jobs as appropriate. - /// - On other incoming messages, if they can be converted into Job::ToJob and - /// include a hash, then they're forwarded to the appropriate individual job. - /// - On outgoing messages from the jobs, it forwards them to the overseer. - /// - /// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur. - /// Otherwise, most are logged and then discarded. - pub async fn run( - mut ctx: Context, - run_args: Job::RunArgs, - metrics: Job::Metrics, - spawner: Spawner, - mut err_tx: Option, JobsError)>>, - ) { - let mut jobs = Jobs::new(spawner.clone()); - if let Some(ref err_tx) = err_tx { - jobs.forward_errors(err_tx.clone()) - .expect("we never call this twice in this context; qed"); - } + let mut jobs = Jobs::new(spawner); loop { select! 
{ - incoming = ctx.recv().fuse() => - if Self::handle_incoming( - incoming, - &mut jobs, - &run_args, - &metrics, - &mut err_tx, - ).await { - break - }, + incoming = ctx.recv().fuse() => { + match incoming { + Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated, + deactivated, + }))) => { + for activated in activated { + let sender: Context::Sender = ctx.sender().clone(); + jobs.spawn_job::( + activated.hash, + activated.span, + run_args.clone(), + metrics.clone(), + sender, + ) + } + + for hash in deactivated { + jobs.stop_job(hash).await; + } + } + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => { + jobs.running.clear(); + break; + } + Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(..))) => {} + Ok(FromOverseer::Communication { msg }) => { + if let Ok(to_job) = ::try_from(msg) { + jobs.send_msg(to_job.relay_parent(), to_job).await; + } + } + Err(err) => { + tracing::error!( + job = Job::NAME, + err = ?err, + "error receiving message from subsystem context for job", + ); + break; + } + } + } outgoing = jobs.next() => { - if let Err(e) = Self::handle_from_job(outgoing, &mut ctx).await { + let res = match outgoing.expect("the Jobs stream never ends; qed") { + FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await, + FromJobCommand::SpawnBlocking(name, task) + => ctx.spawn_blocking(name, task).await, + }; + + if let Err(e) = res { tracing::warn!(err = ?e, "failed to handle command from job"); } } @@ -741,97 +781,9 @@ where } } } - - /// Forward a given error to the higher context using the given error channel. - async fn fwd_err( - hash: Option, - err: JobsError, - err_tx: &mut Option, JobsError)>>, - ) { - if let Some(err_tx) = err_tx { - // if we can't send on the error transmission channel, we can't do anything useful about it - // still, we can at least log the failure - if let Err(e) = err_tx.send((hash, err)).await { - tracing::warn!(err = ?e, "failed to forward error"); - } - } - } - - /// Handle an incoming message. - /// - /// Returns `true` when this job manager should shutdown. - async fn handle_incoming( - incoming: SubsystemResult>, - jobs: &mut Jobs, - run_args: &Job::RunArgs, - metrics: &Job::Metrics, - err_tx: &mut Option, JobsError)>>, - ) -> bool { - use polkadot_node_subsystem::ActiveLeavesUpdate; - use polkadot_node_subsystem::FromOverseer::{Communication, Signal}; - use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude}; - - match incoming { - Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { - activated, - deactivated, - }))) => { - for activated in activated { - let metrics = metrics.clone(); - if let Err(e) = jobs.spawn_job(activated.hash, activated.span, run_args.clone(), metrics) { - tracing::error!( - job = Job::NAME, - err = ?e, - "failed to spawn a job", - ); - Self::fwd_err(Some(activated.hash), JobsError::Utility(e), err_tx).await; - return true; - } - } - - for hash in deactivated { - jobs.stop_job(hash).await; - } - } - Ok(Signal(Conclude)) => { - jobs.running.clear(); - return true; - } - Ok(Communication { msg }) => { - if let Ok(to_job) = ::try_from(msg) { - jobs.send_msg(to_job.relay_parent(), to_job).await; - } - } - Ok(Signal(BlockFinalized(..))) => {} - Err(err) => { - tracing::error!( - job = Job::NAME, - err = ?err, - "error receiving message from subsystem context for job", - ); - Self::fwd_err(None, JobsError::Utility(Error::from(err)), err_tx).await; - return true; - } - } - false - } - - // handle a command from a job. 
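The loop above folds the old `JobManager::handle_incoming` into the subsystem body: spawn a job per activated leaf, stop it on deactivation, clear everything on `Conclude`. The lifecycle bookkeeping reduces to a map of handles; a simplified sketch with toy types, not the real `Jobs`/`JobHandle` API:

```rust
use std::collections::HashMap;

type Hash = u64;

struct JobHandle; // dropping this aborts the job in the real code

struct Jobs {
    running: HashMap<Hash, JobHandle>,
}

impl Jobs {
    fn on_leaf_update(&mut self, activated: &[Hash], deactivated: &[Hash]) {
        for leaf in activated {
            self.running.entry(*leaf).or_insert(JobHandle);
        }
        for leaf in deactivated {
            self.running.remove(leaf); // handle dropped => job stops
        }
    }
}

fn main() {
    let mut jobs = Jobs { running: HashMap::new() };
    jobs.on_leaf_update(&[1, 2], &[]);
    jobs.on_leaf_update(&[3], &[1]);
    assert!(jobs.running.contains_key(&2) && jobs.running.contains_key(&3));
    assert!(!jobs.running.contains_key(&1));
}
```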
-
-	// handle a command from a job.
-	async fn handle_from_job(
-		outgoing: Option<FromJobCommand>,
-		ctx: &mut Context,
-	) -> SubsystemResult<()> {
-		match outgoing.expect("the Jobs stream never ends; qed") {
-			FromJobCommand::SendMessage(msg) => ctx.send_message(msg).await,
-			FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await?,
-			FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task).await?,
-		}
-
-		Ok(())
-	}
 }
 
-impl<Spawner, Context, Job> Subsystem<Context> for JobManager<Spawner, Context, Job>
+impl<Context, Job, Spawner> Subsystem<Context> for JobSubsystem<Job, Spawner>
 where
 	Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
 	Context: SubsystemContext,
@@ -841,13 +793,8 @@ where
 	Job::Metrics: Sync,
 {
 	fn start(self, ctx: Context) -> SpawnedSubsystem {
-		let spawner = self.spawner.clone();
-		let run_args = self.run_args.clone();
-		let metrics = self.metrics.clone();
-		let errors = self.errors;
-
 		let future = Box::pin(async move {
-			Self::run(ctx, run_args, metrics, spawner, errors).await;
+			self.run(ctx).await;
 			Ok(())
 		});
 
@@ -858,90 +805,6 @@ where
 	}
 }
 
-/// Create a delegated subsystem
-///
-/// It is possible to create a type which implements `Subsystem` by simply doing:
-///
-/// ```ignore
-/// pub type ExampleSubsystem<Spawner, Context> = JobManager<Spawner, Context, ExampleJob>;
-/// ```
-///
-/// However, doing this requires that job itself and all types which comprise it (i.e. `ToJob`, `FromJob`, `Error`, `RunArgs`)
-/// are public, to avoid exposing private types in public interfaces. It's possible to delegate instead, which
-/// can reduce the total number of public types exposed, i.e.
-///
-/// ```ignore
-/// type Manager<Spawner, Context> = JobManager<Spawner, Context, ExampleJob>;
-/// pub struct ExampleSubsystem<Spawner, Context> {
-/// 	manager: Manager<Spawner, Context>,
-/// }
-///
-/// impl<Spawner, Context> Subsystem<Context> for ExampleSubsystem<Spawner, Context> { ... }
-/// ```
-///
-/// This dramatically reduces the number of public types in the crate; the only things which must be public are now
-///
-/// - `struct ExampleSubsystem` (defined by this macro)
-/// - `type ToJob` (because it appears in a trait bound)
-/// - `type RunArgs` (because it appears in a function signature)
-///
-/// Implementing this all manually is of course possible, but it's tedious; why bother? This macro exists for
-/// the purpose of doing it automatically:
-///
-/// ```ignore
-/// delegated_subsystem!(ExampleJob(ExampleRunArgs) <- ExampleToJob as ExampleSubsystem);
-/// ```
-#[macro_export]
-macro_rules! delegated_subsystem {
-	($job:ident($run_args:ty, $metrics:ty) <- $to_job:ty as $subsystem:ident) => {
-		delegated_subsystem!($job($run_args, $metrics) <- $to_job as $subsystem; stringify!($subsystem));
-	};
-
-	($job:ident($run_args:ty, $metrics:ty) <- $to_job:ty as $subsystem:ident; $subsystem_name:expr) => {
-		#[doc = "Manager type for the "]
-		#[doc = $subsystem_name]
-		type Manager<Spawner, Context> = $crate::JobManager<Spawner, Context, $job>;
-
-		#[doc = "An implementation of the "]
-		#[doc = $subsystem_name]
-		pub struct $subsystem<Spawner, Context> {
-			manager: Manager<Spawner, Context>,
-		}
-
-		impl<Spawner, Context> $subsystem<Spawner, Context>
-		where
-			Spawner: Clone + $crate::reexports::SpawnNamed + Send + Unpin,
-			Context: $crate::reexports::SubsystemContext,
-			$to_job: From<<Context as $crate::reexports::SubsystemContext>::Message>,
-		{
-			#[doc = "Creates a new "]
-			#[doc = $subsystem_name]
-			pub fn new(spawner: Spawner, run_args: $run_args, metrics: $metrics) -> Self {
-				$subsystem {
-					manager: $crate::JobManager::new(spawner, run_args, metrics)
-				}
-			}
-
-			/// Run this subsystem
-			#[tracing::instrument(skip(ctx, run_args, metrics, spawner), fields(subsystem = $subsystem_name))]
-			pub async fn run(ctx: Context, run_args: $run_args, metrics: $metrics, spawner: Spawner) {
-				<Manager<Spawner, Context>>::run(ctx, run_args, metrics, spawner, None).await
-			}
-		}
-
-		impl<Spawner, Context> $crate::reexports::Subsystem<Context> for $subsystem<Spawner, Context>
-		where
-			Spawner: $crate::reexports::SpawnNamed + Send + Clone + Unpin + 'static,
-			Context: $crate::reexports::SubsystemContext,
-			$to_job: From<<Context as $crate::reexports::SubsystemContext>::Message>,
-		{
-			fn start(self, ctx: Context) -> $crate::reexports::SpawnedSubsystem {
-				self.manager.start(ctx)
-			}
-		}
-	};
-}
-
 /// A future that wraps another future with a `Delay` allowing for time-limited futures.
 #[pin_project]
 pub struct Timeout<F: Future> {
@@ -1088,24 +951,22 @@ mod tests {
 	/// Run a job for the parent block indicated
 	//
 	// this function is in charge of creating and executing the job's main loop
-	fn run(
+	fn run<S: SubsystemSender>(
 		_: Hash,
 		_: Arc<jaeger::Span>,
 		run_args: Self::RunArgs,
 		_metrics: Self::Metrics,
 		receiver: mpsc::Receiver<CandidateSelectionMessage>,
-		mut sender: mpsc::Sender<FromJobCommand>,
+		mut sender: JobSender<S>,
 	) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
 		async move {
 			let job = FakeCandidateSelectionJob { receiver };
 
 			if run_args {
-				sender.send(FromJobCommand::SendMessage(
-					CandidateSelectionMessage::Invalid(
-						Default::default(),
-						Default::default(),
-					).into(),
-				)).await?;
+				sender.send_message(CandidateSelectionMessage::Invalid(
+					Default::default(),
+					Default::default(),
+				).into()).await;
 			}
 
 			// it isn't necessary to break run_loop into its own function,
@@ -1132,15 +993,15 @@ mod tests {
 		}
 	}
 
 	// with the job defined, it's straightforward to get a subsystem implementation.
-	type FakeCandidateSelectionSubsystem<Spawner, Context> =
-		JobManager<Spawner, Context, FakeCandidateSelectionJob>;
+	type FakeCandidateSelectionSubsystem<Spawner> =
+		JobSubsystem<FakeCandidateSelectionJob, Spawner>;
 
 	// this type lets us pretend to be the overseer
 	type OverseerHandle = test_helpers::TestSubsystemContextHandle<CandidateSelectionMessage>;
 
 	fn test_harness<T: Future<Output = ()>>(
 		run_args: bool,
-		test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T,
+		test: impl FnOnce(OverseerHandle) -> T,
 	) {
 		let _ = env_logger::builder()
 			.is_test(true)
 			.try_init();
@@ -1152,10 +1013,13 @@ mod tests {
 		let pool = sp_core::testing::TaskExecutor::new();
 		let (context, overseer_handle) = make_subsystem_context(pool.clone());
-		let (err_tx, err_rx) = mpsc::channel(16);
-		let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, (), pool, Some(err_tx));
-		let test_future = test(overseer_handle, err_rx);
+		let subsystem = FakeCandidateSelectionSubsystem::new(
+			pool,
+			run_args,
+			(),
+		).run(context);
+		let test_future = test(overseer_handle);
 
 		futures::pin_mut!(subsystem, test_future);
 
@@ -1171,7 +1035,7 @@ mod tests {
 	fn starting_and_stopping_job_works() {
 		let relay_parent: Hash = [0; 32].into();
 
-		test_harness(true, |mut overseer_handle, err_rx| async move {
+		test_harness(true, |mut overseer_handle| async move {
 			overseer_handle
 				.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
 					ActiveLeavesUpdate::start_work(ActivatedLeaf {
@@ -1194,9 +1058,6 @@ mod tests {
 			overseer_handle
 				.send(FromOverseer::Signal(OverseerSignal::Conclude))
 				.await;
-
-			let errs: Vec<_> = err_rx.collect().await;
-			assert_eq!(errs.len(), 0);
 		});
 	}
 
@@ -1204,7 +1065,7 @@ mod tests {
 	fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
 		let relay_parent = Hash::repeat_byte(0x01);
 
-		test_harness(true, |mut overseer_handle, err_rx| async move {
+		test_harness(true, |mut overseer_handle| async move {
 			overseer_handle
 				.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
 					ActiveLeavesUpdate::start_work(ActivatedLeaf {
@@ -1231,9 +1092,6 @@ mod tests {
 			overseer_handle
 				.send(FromOverseer::Signal(OverseerSignal::Conclude))
 				.await;
-
-			let errs: Vec<_> = err_rx.collect().await;
-			assert_eq!(errs.len(), 0);
 		});
 	}
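Taken together, the test harness above is a compact illustration of the new API surface: define a job, alias it through `JobSubsystem`, construct it, and drive it. A rough sketch distilled from the test code; the fixture names come from this diff, and the surrounding wiring (`pool`, `context`, `run_args`) is assumed:

```rust
// A concrete subsystem is now a plain type alias over JobSubsystem, replacing
// the expansion that the removed `delegated_subsystem!` macro used to generate.
type FakeCandidateSelectionSubsystem<Spawner> =
	JobSubsystem<FakeCandidateSelectionJob, Spawner>;

// Construct it from a spawner, the job's run arguments, and its metrics...
let subsystem = FakeCandidateSelectionSubsystem::new(pool, run_args, ());

// ...then either hand it to the overseer via `Subsystem::start`, or, as the
// test harness does, drive it directly to completion on a subsystem context.
subsystem.run(context).await;
```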