Split NetworkBridge and break cycles with Unbounded (#2736)

* overseer: pass messages directly between subsystems

* test that message is held on to

* Update node/overseer/src/lib.rs

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* give every subsystem an unbounded sender too

* remove metered_channel::name

1. we don't provide good names
2. these names are never used anywhere

* unused mut

* remove unnecessary &mut

* subsystem unbounded_send

* remove unused MaybeTimer

We have channel size metrics that serve the same purpose better now and the implementation of message timing was pretty ugly.

* remove comment

* split up senders and receivers

* update metrics

* fix tests

* fix test subsystem context

* use SubsystemSender in jobs system now

* refactor of awful jobs code

* expose public `run` on JobSubsystem

* update candidate backing to new jobs & use unbounded

* bitfield signing

* candidate-selection

* provisioner

* approval voting: send unbounded for assignment/approvals

* async not needed

* begin bridge split

* split up network tasks into background worker

* port over network bridge

* Update node/network/bridge/src/lib.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* rename ValidationWorkerNotifications

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
Robert Habermeier
2021-03-29 01:18:53 +02:00
committed by GitHub
parent 6f464a360f
commit 8ebbe19d10
16 changed files with 1191 additions and 1346 deletions
+3
View File
@@ -5632,6 +5632,7 @@ version = "0.1.0"
dependencies = [
"futures 0.3.13",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-keystore",
@@ -5647,6 +5648,7 @@ dependencies = [
"futures 0.3.13",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-core",
@@ -5720,6 +5722,7 @@ dependencies = [
"futures 0.3.13",
"futures-timer 3.0.2",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-application-crypto",
+12 -10
View File
@@ -564,10 +564,10 @@ async fn handle_actions(
let block_hash = indirect_cert.block_hash;
let validator_index = indirect_cert.validator;
ctx.send_message(ApprovalDistributionMessage::DistributeAssignment(
ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
indirect_cert,
candidate_index,
).into()).await;
).into());
launch_approval(
ctx,
@@ -712,7 +712,7 @@ async fn handle_background_request(
) -> SubsystemResult<Vec<Action>> {
match request {
BackgroundRequest::ApprovalVote(vote_request) => {
issue_approval(ctx, state, metrics, vote_request).await
issue_approval(ctx, state, metrics, vote_request)
}
BackgroundRequest::CandidateValidation(
validation_data,
@@ -1724,7 +1724,7 @@ async fn launch_approval(
// Issue and import a local approval vote. Should only be invoked after approval checks
// have been done.
async fn issue_approval(
fn issue_approval(
ctx: &mut impl SubsystemContext,
state: &State<impl DBReader>,
metrics: &Metrics,
@@ -1830,12 +1830,14 @@ async fn issue_approval(
metrics.on_approval_produced();
// dispatch to approval distribution.
ctx.send_message(ApprovalDistributionMessage::DistributeApproval(IndirectSignedApprovalVote {
block_hash,
candidate_index: candidate_index as _,
validator: validator_index,
signature: sig,
}).into()).await;
ctx.send_unbounded_message(
ApprovalDistributionMessage::DistributeApproval(IndirectSignedApprovalVote {
block_hash,
candidate_index: candidate_index as _,
validator: validator_index,
signature: sig,
}
).into());
Ok(actions)
}
+146 -145
View File
@@ -35,7 +35,8 @@ use polkadot_node_primitives::{
Statement, SignedFullStatement, ValidationResult,
};
use polkadot_subsystem::{
PerLeafSpan, Stage, jaeger,
PerLeafSpan, Stage, SubsystemSender,
jaeger,
messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage,
CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage,
@@ -50,8 +51,8 @@ use polkadot_node_subsystem_util::{
request_validators,
request_from_runtime,
Validator,
delegated_subsystem,
FromJobCommand,
JobSender,
metrics::{self, prometheus},
};
use statement_table::{
@@ -68,8 +69,9 @@ use thiserror::Error;
const LOG_TARGET: &str = "parachain::candidate-backing";
/// Errors that can occur in candidate backing.
#[derive(Debug, Error)]
enum Error {
pub enum Error {
#[error("Candidate is not found")]
CandidateNotFound,
#[error("Signature is invalid")]
@@ -142,11 +144,9 @@ impl ValidatedCandidateCommand {
}
/// Holds all data needed for candidate backing job operation.
struct CandidateBackingJob {
pub struct CandidateBackingJob {
/// The hash of the relay parent on top of which this job is doing its work.
parent: Hash,
/// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJobCommand>,
/// The `ParaId` assigned to this validator
assignment: Option<ParaId>,
/// The collator required to author the candidate, if any.
@@ -294,23 +294,20 @@ fn table_attested_to_backed(
}
async fn store_available_data(
tx_from: &mut mpsc::Sender<FromJobCommand>,
sender: &mut JobSender<impl SubsystemSender>,
id: Option<ValidatorIndex>,
n_validators: u32,
candidate_hash: CandidateHash,
available_data: AvailableData,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
tx_from.send(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
id,
n_validators,
available_data,
tx,
)
).into()
).await?;
sender.send_message(AvailabilityStoreMessage::StoreAvailableData(
candidate_hash,
id,
n_validators,
available_data,
tx,
).into()).await;
let _ = rx.await.map_err(Error::StoreAvailableData)?;
@@ -321,9 +318,9 @@ async fn store_available_data(
//
// This will compute the erasure root internally and compare it to the expected erasure root.
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
#[tracing::instrument(level = "trace", skip(tx_from, pov, span), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(sender, pov, span), fields(subsystem = LOG_TARGET))]
async fn make_pov_available(
tx_from: &mut mpsc::Sender<FromJobCommand>,
sender: &mut JobSender<impl SubsystemSender>,
validator_index: Option<ValidatorIndex>,
n_validators: usize,
pov: Arc<PoV>,
@@ -361,7 +358,7 @@ async fn make_pov_available(
);
store_available_data(
tx_from,
sender,
validator_index,
n_validators as u32,
candidate_hash,
@@ -373,7 +370,7 @@ async fn make_pov_available(
}
async fn request_pov(
tx_from: &mut mpsc::Sender<FromJobCommand>,
sender: &mut JobSender<impl SubsystemSender>,
relay_parent: Hash,
from_validator: ValidatorIndex,
candidate_hash: CandidateHash,
@@ -381,35 +378,33 @@ async fn request_pov(
) -> Result<Arc<PoV>, Error> {
let (tx, rx) = oneshot::channel();
tx_from.send(FromJobCommand::SendMessage(AllMessages::AvailabilityDistribution(
AvailabilityDistributionMessage::FetchPoV {
relay_parent,
from_validator,
candidate_hash,
pov_hash,
tx,
}
))).await?;
sender.send_message(AvailabilityDistributionMessage::FetchPoV {
relay_parent,
from_validator,
candidate_hash,
pov_hash,
tx,
}.into()).await;
let pov = rx.await.map_err(|_| Error::FetchPoV)?;
Ok(Arc::new(pov))
}
async fn request_candidate_validation(
tx_from: &mut mpsc::Sender<FromJobCommand>,
sender: &mut JobSender<impl SubsystemSender>,
candidate: CandidateDescriptor,
pov: Arc<PoV>,
) -> Result<ValidationResult, Error> {
let (tx, rx) = oneshot::channel();
tx_from.send(AllMessages::CandidateValidation(
sender.send_message(AllMessages::CandidateValidation(
CandidateValidationMessage::ValidateFromChainState(
candidate,
pov,
tx,
)
).into()
).await?;
).await;
match rx.await {
Ok(Ok(validation_result)) => Ok(validation_result),
@@ -420,8 +415,8 @@ async fn request_candidate_validation(
type BackgroundValidationResult = Result<(CandidateReceipt, CandidateCommitments, Arc<PoV>), CandidateReceipt>;
struct BackgroundValidationParams<F> {
tx_from: mpsc::Sender<FromJobCommand>,
struct BackgroundValidationParams<S, F> {
sender: JobSender<S>,
tx_command: mpsc::Sender<ValidatedCandidateCommand>,
candidate: CandidateReceipt,
relay_parent: Hash,
@@ -433,10 +428,13 @@ struct BackgroundValidationParams<F> {
}
async fn validate_and_make_available(
params: BackgroundValidationParams<impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync>,
params: BackgroundValidationParams<
impl SubsystemSender,
impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
>
) -> Result<(), Error> {
let BackgroundValidationParams {
mut tx_from,
mut sender,
mut tx_command,
candidate,
relay_parent,
@@ -456,12 +454,12 @@ async fn validate_and_make_available(
} => {
let _span = span.as_ref().map(|s| s.child("request-pov"));
match request_pov(
&mut tx_from,
relay_parent,
from_validator,
candidate_hash,
pov_hash,
).await {
&mut sender,
relay_parent,
from_validator,
candidate_hash,
pov_hash,
).await {
Err(Error::FetchPoV) => {
tx_command.send(ValidatedCandidateCommand::AttestNoPoV(candidate.hash())).await.map_err(Error::Mpsc)?;
return Ok(())
@@ -478,7 +476,7 @@ async fn validate_and_make_available(
.with_pov(&pov)
.with_para_id(candidate.descriptor().para_id)
});
request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?
request_candidate_validation(&mut sender, candidate.descriptor.clone(), pov.clone()).await?
};
let expected_commitments_hash = candidate.commitments_hash;
@@ -502,7 +500,7 @@ async fn validate_and_make_available(
Err(candidate)
} else {
let erasure_valid = make_pov_available(
&mut tx_from,
&mut sender,
validator_index,
n_validators,
pov.clone(),
@@ -544,6 +542,7 @@ impl CandidateBackingJob {
/// Run asynchronously.
async fn run_loop(
mut self,
mut sender: JobSender<impl SubsystemSender>,
mut rx_to: mpsc::Receiver<CandidateBackingMessage>,
span: PerLeafSpan,
) -> Result<(), Error> {
@@ -552,7 +551,7 @@ impl CandidateBackingJob {
validated_command = self.background_validation.next() => {
let _span = span.child("process-validation-result");
if let Some(c) = validated_command {
self.handle_validated_candidate_command(&span, c).await?;
self.handle_validated_candidate_command(&span, &mut sender, c).await?;
} else {
panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed");
}
@@ -563,7 +562,7 @@ impl CandidateBackingJob {
// we intentionally want spans created in `process_msg` to descend from the
// `span`, which is longer-lived than this ephemeral timing span.
let _timing_span = span.child("process-message");
self.process_msg(&span, msg).await?;
self.process_msg(&span, &mut sender, msg).await?;
}
}
}
@@ -572,10 +571,11 @@ impl CandidateBackingJob {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, parent_span, sender), fields(subsystem = LOG_TARGET))]
async fn handle_validated_candidate_command(
&mut self,
parent_span: &jaeger::Span,
sender: &mut JobSender<impl SubsystemSender>,
command: ValidatedCandidateCommand,
) -> Result<(), Error> {
let candidate_hash = command.candidate_hash();
@@ -596,15 +596,20 @@ impl CandidateBackingJob {
commitments,
});
if let Some(stmt) = self.sign_import_and_distribute_statement(
sender,
statement,
parent_span,
).await? {
self.issue_candidate_seconded_message(stmt).await?;
sender.send_message(
CandidateSelectionMessage::Seconded(self.parent, stmt).into()
).await;
}
}
}
Err(candidate) => {
self.issue_candidate_invalid_message(candidate).await?;
sender.send_message(
CandidateSelectionMessage::Invalid(self.parent, candidate).into()
).await;
}
}
}
@@ -615,7 +620,7 @@ impl CandidateBackingJob {
if !self.issued_statements.contains(&candidate_hash) {
if res.is_ok() {
let statement = Statement::Valid(candidate_hash);
self.sign_import_and_distribute_statement(statement, &parent_span).await?;
self.sign_import_and_distribute_statement(sender, statement, &parent_span).await?;
}
self.issued_statements.insert(candidate_hash);
}
@@ -627,7 +632,7 @@ impl CandidateBackingJob {
// Ok, another try:
let c_span = span.as_ref().map(|s| s.child("try"));
let attesting = attesting.clone();
self.kick_off_validation_work(attesting, c_span).await?
self.kick_off_validation_work(sender, attesting, c_span).await?
}
} else {
@@ -643,10 +648,12 @@ impl CandidateBackingJob {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self, params), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, sender, params), fields(subsystem = LOG_TARGET))]
async fn background_validate_and_make_available(
&mut self,
sender: &mut JobSender<impl SubsystemSender>,
params: BackgroundValidationParams<
impl SubsystemSender,
impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync
>,
) -> Result<(), Error> {
@@ -658,36 +665,19 @@ impl CandidateBackingJob {
tracing::error!(target: LOG_TARGET, "Failed to validate and make available: {:?}", e);
}
};
self.tx_from.send(FromJobCommand::Spawn("Backing Validation", bg.boxed())).await?;
sender.send_command(FromJobCommand::Spawn("Backing Validation", bg.boxed())).await?;
}
Ok(())
}
async fn issue_candidate_invalid_message(
&mut self,
candidate: CandidateReceipt,
) -> Result<(), Error> {
self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Invalid(self.parent, candidate)).into()).await?;
Ok(())
}
async fn issue_candidate_seconded_message(
&mut self,
statement: SignedFullStatement,
) -> Result<(), Error> {
self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Seconded(self.parent, statement)).into()).await?;
Ok(())
}
/// Kick off background validation with intent to second.
#[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, parent_span, sender, pov), fields(subsystem = LOG_TARGET))]
async fn validate_and_second(
&mut self,
parent_span: &jaeger::Span,
root_span: &jaeger::Span,
sender: &mut JobSender<impl SubsystemSender>,
candidate: &CandidateReceipt,
pov: Arc<PoV>,
) -> Result<(), Error> {
@@ -695,7 +685,9 @@ impl CandidateBackingJob {
if self.required_collator.as_ref()
.map_or(false, |c| c != &candidate.descriptor().collator)
{
self.issue_candidate_invalid_message(candidate.clone()).await?;
sender.send_message(
CandidateSelectionMessage::Invalid(self.parent, candidate.clone()).into()
).await;
return Ok(());
}
@@ -715,29 +707,36 @@ impl CandidateBackingJob {
"Validate and second candidate",
);
self.background_validate_and_make_available(BackgroundValidationParams {
tx_from: self.tx_from.clone(),
tx_command: self.background_validation_tx.clone(),
candidate: candidate.clone(),
relay_parent: self.parent,
pov: PoVData::Ready(pov),
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
span,
make_command: ValidatedCandidateCommand::Second,
}).await?;
let bg_sender = sender.clone();
self.background_validate_and_make_available(
sender,
BackgroundValidationParams {
sender: bg_sender,
tx_command: self.background_validation_tx.clone(),
candidate: candidate.clone(),
relay_parent: self.parent,
pov: PoVData::Ready(pov),
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
span,
make_command: ValidatedCandidateCommand::Second,
}
).await?;
Ok(())
}
async fn sign_import_and_distribute_statement(
&mut self,
sender: &mut JobSender<impl SubsystemSender>,
statement: Statement,
parent_span: &jaeger::Span,
) -> Result<Option<SignedFullStatement>, Error> {
if let Some(signed_statement) = self.sign_statement(statement).await {
self.import_statement(&signed_statement, parent_span).await?;
self.distribute_signed_statement(signed_statement.clone()).await?;
self.import_statement(sender, &signed_statement, parent_span).await?;
let smsg = StatementDistributionMessage::Share(self.parent, signed_statement.clone());
sender.send_unbounded_message(smsg.into());
Ok(Some(signed_statement))
} else {
Ok(None)
@@ -745,26 +744,25 @@ impl CandidateBackingJob {
}
/// Check whether any new misbehaviors have occurred and issue the necessary messages.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
#[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))]
async fn issue_new_misbehaviors(&mut self, sender: &mut JobSender<impl SubsystemSender>) {
// collect the misbehaviors to avoid double mutable self borrow issues
let misbehaviors: Vec<_> = self.table.drain_misbehaviors().collect();
for (validator_id, report) in misbehaviors {
self.send_to_provisioner(
sender.send_message(
ProvisionerMessage::ProvisionableData(
self.parent,
ProvisionableData::MisbehaviorReport(self.parent, validator_id, report)
)
).await?
).into()
).await;
}
Ok(())
}
/// Import a statement into the statement table and return the summary of the import.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))]
async fn import_statement(
&mut self,
sender: &mut JobSender<impl SubsystemSender>,
statement: &SignedFullStatement,
parent_span: &jaeger::Span,
) -> Result<Option<TableSummary>, Error> {
@@ -807,7 +805,7 @@ impl CandidateBackingJob {
self.parent,
ProvisionableData::BackedCandidate(backed.receipt()),
);
self.send_to_provisioner(message).await?;
sender.send_message(message.into()).await;
span.as_ref().map(|s| s.child("backed"));
span
@@ -821,7 +819,7 @@ impl CandidateBackingJob {
None
};
self.issue_new_misbehaviors().await?;
self.issue_new_misbehaviors(sender).await;
// It is important that the child span is dropped before its parent span (`unbacked_span`)
drop(import_statement_span);
@@ -830,8 +828,13 @@ impl CandidateBackingJob {
Ok(summary)
}
#[tracing::instrument(level = "trace", skip(self, root_span), fields(subsystem = LOG_TARGET))]
async fn process_msg(&mut self, root_span: &jaeger::Span, msg: CandidateBackingMessage) -> Result<(), Error> {
#[tracing::instrument(level = "trace", skip(self, root_span, sender), fields(subsystem = LOG_TARGET))]
async fn process_msg(
&mut self,
root_span: &jaeger::Span,
sender: &mut JobSender<impl SubsystemSender>,
msg: CandidateBackingMessage,
) -> Result<(), Error> {
match msg {
CandidateBackingMessage::Second(relay_parent, candidate, pov) => {
let _timer = self.metrics.time_process_second();
@@ -856,7 +859,7 @@ impl CandidateBackingJob {
if !self.issued_statements.contains(&candidate_hash) {
let pov = Arc::new(pov);
self.validate_and_second(&span, &root_span, &candidate, pov).await?;
self.validate_and_second(&span, &root_span, sender, &candidate, pov).await?;
}
}
}
@@ -868,7 +871,7 @@ impl CandidateBackingJob {
.with_relay_parent(_relay_parent);
self.check_statement_signature(&statement)?;
match self.maybe_validate_and_import(&span, &root_span, statement).await {
match self.maybe_validate_and_import(&span, &root_span, sender, statement).await {
Err(Error::ValidationFailed(_)) => return Ok(()),
Err(e) => return Err(e),
Ok(()) => (),
@@ -893,9 +896,10 @@ impl CandidateBackingJob {
}
/// Kick off validation work and distribute the result as a signed statement.
#[tracing::instrument(level = "trace", skip(self, attesting, span), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, sender, attesting, span), fields(subsystem = LOG_TARGET))]
async fn kick_off_validation_work(
&mut self,
sender: &mut JobSender<impl SubsystemSender>,
attesting: AttestingData,
span: Option<jaeger::Span>,
) -> Result<(), Error> {
@@ -925,34 +929,38 @@ impl CandidateBackingJob {
return Ok(());
}
let bg_sender = sender.clone();
let pov = PoVData::FetchFromValidator {
from_validator: attesting.from_validator,
candidate_hash,
pov_hash: attesting.pov_hash,
};
self.background_validate_and_make_available(BackgroundValidationParams {
tx_from: self.tx_from.clone(),
tx_command: self.background_validation_tx.clone(),
candidate: attesting.candidate,
relay_parent: self.parent,
pov,
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
span,
make_command: ValidatedCandidateCommand::Attest,
}).await
self.background_validate_and_make_available(
sender,
BackgroundValidationParams {
sender: bg_sender,
tx_command: self.background_validation_tx.clone(),
candidate: attesting.candidate,
relay_parent: self.parent,
pov,
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
span,
make_command: ValidatedCandidateCommand::Attest,
},
).await
}
/// Import the statement and kick off validation work if it is a part of our assignment.
#[tracing::instrument(level = "trace", skip(self, parent_span), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, parent_span, root_span, sender), fields(subsystem = LOG_TARGET))]
async fn maybe_validate_and_import(
&mut self,
parent_span: &jaeger::Span,
root_span: &jaeger::Span,
sender: &mut JobSender<impl SubsystemSender>,
statement: SignedFullStatement,
) -> Result<(), Error> {
if let Some(summary) = self.import_statement(&statement, parent_span).await? {
if let Some(summary) = self.import_statement(sender, &statement, parent_span).await? {
if Some(summary.group_id) != self.assignment {
return Ok(())
}
@@ -989,7 +997,7 @@ impl CandidateBackingJob {
attesting.backing.push(statement.validator_index());
return Ok(())
} else {
// No job, so start another try with current validator:
// No job, so start another with current validator:
attesting.from_validator = statement.validator_index();
(attesting.clone(), span.as_ref().map(|s| s.child("try")))
}
@@ -1000,6 +1008,7 @@ impl CandidateBackingJob {
};
self.kick_off_validation_work(
sender,
attesting,
span,
).await?;
@@ -1089,20 +1098,6 @@ impl CandidateBackingJob {
fn remove_unbacked_span(&mut self, hash: &CandidateHash) -> Option<jaeger::Span> {
self.unbacked_candidates.remove(hash)
}
async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
self.tx_from.send(AllMessages::from(msg).into()).await?;
Ok(())
}
async fn distribute_signed_statement(&mut self, s: SignedFullStatement) -> Result<(), Error> {
let smsg = StatementDistributionMessage::Share(self.parent, s);
self.tx_from.send(AllMessages::from(smsg).into()).await?;
Ok(())
}
}
impl util::JobTrait for CandidateBackingJob {
@@ -1113,14 +1108,14 @@ impl util::JobTrait for CandidateBackingJob {
const NAME: &'static str = "CandidateBackingJob";
#[tracing::instrument(skip(span, keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))]
fn run(
#[tracing::instrument(skip(span, keystore, metrics, rx_to, sender), fields(subsystem = LOG_TARGET))]
fn run<S: SubsystemSender>(
parent: Hash,
span: Arc<jaeger::Span>,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
rx_to: mpsc::Receiver<Self::ToJob>,
mut tx_from: mpsc::Sender<FromJobCommand>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
macro_rules! try_runtime_api {
@@ -1147,14 +1142,14 @@ impl util::JobTrait for CandidateBackingJob {
let _span = span.child("runtime-apis");
let (validators, groups, session_index, cores) = futures::try_join!(
try_runtime_api!(request_validators(parent, &mut tx_from).await),
try_runtime_api!(request_validator_groups(parent, &mut tx_from).await),
try_runtime_api!(request_session_index_for_child(parent, &mut tx_from).await),
try_runtime_api!(request_from_runtime(
request_validators(parent, &mut sender).await,
request_validator_groups(parent, &mut sender).await,
request_session_index_for_child(parent, &mut sender).await,
request_from_runtime(
parent,
&mut tx_from,
&mut sender,
|tx| RuntimeApiRequest::AvailabilityCores(tx),
).await),
).await,
).map_err(Error::JoinMultiple)?;
let validators = try_runtime_api!(validators);
@@ -1231,7 +1226,6 @@ impl util::JobTrait for CandidateBackingJob {
let (background_tx, background_rx) = mpsc::channel(16);
let job = CandidateBackingJob {
parent,
tx_from,
assignment,
required_collator,
issued_statements: HashSet::new(),
@@ -1249,7 +1243,7 @@ impl util::JobTrait for CandidateBackingJob {
};
drop(_span);
job.run_loop(rx_to, span).await
job.run_loop(sender, rx_to, span).await
}.boxed()
}
}
@@ -1345,7 +1339,9 @@ impl metrics::Metrics for Metrics {
}
}
delegated_subsystem!(CandidateBackingJob(SyncCryptoStorePtr, Metrics) <- CandidateBackingMessage as CandidateBackingSubsystem);
/// The candidate backing subsystem.
pub type CandidateBackingSubsystem<Spawner>
= polkadot_node_subsystem_util::JobSubsystem<CandidateBackingJob, Spawner>;
#[cfg(test)]
mod tests {
@@ -1363,6 +1359,7 @@ mod tests {
use sp_keystore::{CryptoStore, SyncCryptoStore};
use statement_table::v1::Misbehavior;
use std::collections::HashMap;
use sp_tracing as _;
fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
val_ids.iter().map(|v| v.public().into()).collect()
@@ -1479,7 +1476,11 @@ mod tests {
let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone());
let subsystem = CandidateBackingSubsystem::run(context, keystore, Metrics(None), pool.clone());
let subsystem = CandidateBackingSubsystem::new(
pool.clone(),
keystore,
Metrics(None),
).run(context);
let test_fut = test(TestHarness {
virtual_overseer,
@@ -2670,7 +2671,7 @@ mod tests {
// Test whether we retry on failed PoV fetching.
#[test]
fn retry_works() {
sp_tracing::try_init_simple();
// sp_tracing::try_init_simple();
let test_state = TestState::default();
test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;
@@ -13,3 +13,6 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
wasm-timer = "0.2.5"
thiserror = "1.0.23"
[dev-dependencies]
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
+35 -34
View File
@@ -23,15 +23,16 @@
use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future};
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
use polkadot_node_subsystem::{
jaeger, PerLeafSpan,
jaeger, PerLeafSpan, SubsystemSender,
messages::{
AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
AvailabilityStoreMessage, BitfieldDistributionMessage,
BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest,
},
errors::RuntimeApiError,
};
use polkadot_node_subsystem_util::{
self as util, JobManager, JobTrait, Validator, FromJobCommand, metrics::{self, prometheus},
self as util, JobSubsystem, JobTrait, Validator, metrics::{self, prometheus},
JobSender,
};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use std::{pin::Pin, time::Duration, iter::FromIterator, sync::Arc};
@@ -73,7 +74,7 @@ pub enum Error {
async fn get_core_availability(
core: &CoreState,
validator_idx: ValidatorIndex,
sender: &Mutex<&mut mpsc::Sender<FromJobCommand>>,
sender: &Mutex<&mut impl SubsystemSender>,
span: &jaeger::Span,
) -> Result<bool, Error> {
if let &CoreState::Occupied(ref core) = core {
@@ -83,14 +84,14 @@ async fn get_core_availability(
sender
.lock()
.await
.send(
AllMessages::from(AvailabilityStoreMessage::QueryChunkAvailability(
.send_message(
AvailabilityStoreMessage::QueryChunkAvailability(
core.candidate_hash,
validator_idx,
tx,
)).into(),
).into(),
)
.await?;
.await;
let res = rx.await.map_err(Into::into);
@@ -111,12 +112,15 @@ async fn get_core_availability(
/// delegates to the v1 runtime API
async fn get_availability_cores(
relay_parent: Hash,
sender: &mut mpsc::Sender<FromJobCommand>,
sender: &mut impl SubsystemSender,
) -> Result<Vec<CoreState>, Error> {
let (tx, rx) = oneshot::channel();
sender
.send(AllMessages::from(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx))).into())
.await?;
.send_message(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx),
).into())
.await;
match rx.await {
Ok(Ok(out)) => Ok(out),
Ok(Err(runtime_err)) => Err(runtime_err.into()),
@@ -133,7 +137,7 @@ async fn construct_availability_bitfield(
relay_parent: Hash,
span: &jaeger::Span,
validator_idx: ValidatorIndex,
sender: &mut mpsc::Sender<FromJobCommand>,
sender: &mut impl SubsystemSender,
) -> Result<AvailabilityBitfield, Error> {
// get the set of availability cores from the runtime
let availability_cores = {
@@ -223,13 +227,13 @@ impl JobTrait for BitfieldSigningJob {
/// Run a job for the parent block indicated
#[tracing::instrument(skip(span, keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
fn run<S: SubsystemSender>(
relay_parent: Hash,
span: Arc<jaeger::Span>,
keystore: Self::RunArgs,
metrics: Self::Metrics,
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
mut sender: mpsc::Sender<FromJobCommand>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let metrics = metrics.clone();
async move {
@@ -239,7 +243,7 @@ impl JobTrait for BitfieldSigningJob {
// now do all the work we can before we need to wait for the availability store
// if we're not a validator, we can just succeed effortlessly
let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await {
let validator = match Validator::new(relay_parent, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
@@ -260,7 +264,7 @@ impl JobTrait for BitfieldSigningJob {
relay_parent,
&span_availability,
validator.index(),
&mut sender,
sender.subsystem_sender(),
).await
{
Err(Error::Runtime(runtime_err)) => {
@@ -295,26 +299,27 @@ impl JobTrait for BitfieldSigningJob {
let _span = span.child("gossip");
sender
.send(
AllMessages::from(
BitfieldDistributionMessage::DistributeBitfield(relay_parent, signed_bitfield),
).into(),
)
.await
.map_err(Into::into)
.send_message(BitfieldDistributionMessage::DistributeBitfield(
relay_parent,
signed_bitfield,
).into())
.await;
Ok(())
}
.boxed()
}
}
/// BitfieldSigningSubsystem manages a number of bitfield signing jobs.
pub type BitfieldSigningSubsystem<Spawner, Context> = JobManager<Spawner, Context, BitfieldSigningJob>;
pub type BitfieldSigningSubsystem<Spawner> = JobSubsystem<BitfieldSigningJob, Spawner>;
#[cfg(test)]
mod tests {
use super::*;
use futures::{pin_mut, executor::block_on};
use polkadot_primitives::v1::{CandidateHash, OccupiedCore};
use polkadot_node_subsystem::messages::AllMessages;
fn occupied_core(para_id: u32, candidate_hash: CandidateHash) -> CoreState {
CoreState::Occupied(OccupiedCore {
@@ -332,10 +337,10 @@ mod tests {
#[test]
fn construct_availability_bitfield_works() {
block_on(async move {
let (mut sender, mut receiver) = mpsc::channel(10);
let relay_parent = Hash::default();
let validator_index = ValidatorIndex(1u32);
let (mut sender, mut receiver) = polkadot_node_subsystem_test_helpers::sender_receiver();
let future = construct_availability_bitfield(
relay_parent,
&jaeger::Span::Disabled,
@@ -350,18 +355,14 @@ mod tests {
loop {
futures::select! {
m = receiver.next() => match m.unwrap() {
FromJobCommand::SendMessage(
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx)),
),
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(rp, RuntimeApiRequest::AvailabilityCores(tx)),
) => {
assert_eq!(relay_parent, rp);
tx.send(Ok(vec![CoreState::Free, occupied_core(1, hash_a), occupied_core(2, hash_b)])).unwrap();
},
FromJobCommand::SendMessage(
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunkAvailability(c_hash, vidx, tx),
),
}
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunkAvailability(c_hash, vidx, tx),
) => {
assert_eq!(validator_index, vidx);
@@ -18,3 +18,4 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
+100 -129
View File
@@ -25,16 +25,16 @@ use futures::{
};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_subsystem::{
jaeger, PerLeafSpan,
jaeger, PerLeafSpan, SubsystemSender,
errors::ChainApiError,
messages::{
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
RuntimeApiRequest,
},
};
use polkadot_node_subsystem_util::{
self as util, request_from_runtime, request_validator_groups, delegated_subsystem,
JobTrait, FromJobCommand, Validator, metrics::{self, prometheus},
self as util, request_from_runtime, request_validator_groups, JobSubsystem,
JobTrait, JobSender, Validator, metrics::{self, prometheus},
};
use polkadot_primitives::v1::{
CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV, BlockNumber,
@@ -45,22 +45,24 @@ use thiserror::Error;
const LOG_TARGET: &'static str = "parachain::candidate-selection";
struct CandidateSelectionJob {
/// A per-block job in the candidate selection subsystem.
pub struct CandidateSelectionJob {
assignment: ParaId,
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
metrics: Metrics,
seconded_candidate: Option<CollatorId>,
}
/// Errors in the candidate selection subsystem.
#[derive(Debug, Error)]
enum Error {
#[error(transparent)]
Sending(#[from] mpsc::SendError),
pub enum Error {
/// An error in utilities.
#[error(transparent)]
Util(#[from] util::Error),
/// An error receiving on a oneshot channel.
#[error(transparent)]
OneshotRecv(#[from] oneshot::Canceled),
/// An error interacting with the chain API.
#[error(transparent)]
ChainApi(#[from] ChainApiError),
}
@@ -94,13 +96,13 @@ impl JobTrait for CandidateSelectionJob {
const NAME: &'static str = "CandidateSelectionJob";
#[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
fn run<S: SubsystemSender>(
relay_parent: Hash,
span: Arc<jaeger::Span>,
keystore: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
mut sender: mpsc::Sender<FromJobCommand>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let span = PerLeafSpan::new(span, "candidate-selection");
async move {
@@ -108,12 +110,12 @@ impl JobTrait for CandidateSelectionJob {
.with_relay_parent(relay_parent)
.with_stage(jaeger::Stage::CandidateSelection);
let (groups, cores) = futures::try_join!(
try_runtime_api!(request_validator_groups(relay_parent, &mut sender).await),
try_runtime_api!(request_from_runtime(
request_validator_groups(relay_parent, &mut sender).await,
request_from_runtime(
relay_parent,
&mut sender,
|tx| RuntimeApiRequest::AvailabilityCores(tx),
).await),
).await,
)?;
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
@@ -126,7 +128,7 @@ impl JobTrait for CandidateSelectionJob {
let n_cores = cores.len();
let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await {
let validator = match Validator::new(relay_parent, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
@@ -198,20 +200,20 @@ impl JobTrait for CandidateSelectionJob {
drop(assignment_span);
CandidateSelectionJob::new(assignment, metrics, sender, receiver).run_loop(&span).await
CandidateSelectionJob::new(assignment, metrics, receiver)
.run_loop(&span, sender.subsystem_sender())
.await
}.boxed()
}
}
impl CandidateSelectionJob {
pub fn new(
fn new(
assignment: ParaId,
metrics: Metrics,
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
) -> Self {
Self {
sender,
receiver,
metrics,
assignment,
@@ -219,7 +221,11 @@ impl CandidateSelectionJob {
}
}
async fn run_loop(&mut self, span: &jaeger::Span) -> Result<(), Error> {
async fn run_loop(
&mut self,
span: &jaeger::Span,
sender: &mut impl SubsystemSender,
) -> Result<(), Error> {
let span = span.child("run-loop")
.with_stage(jaeger::Stage::CandidateSelection);
@@ -231,7 +237,7 @@ impl CandidateSelectionJob {
collator_id,
)) => {
let _span = span.child("handle-collation");
self.handle_collation(relay_parent, para_id, collator_id).await;
self.handle_collation(sender, relay_parent, para_id, collator_id).await;
}
Some(CandidateSelectionMessage::Invalid(
_relay_parent,
@@ -241,28 +247,26 @@ impl CandidateSelectionJob {
.with_stage(jaeger::Stage::CandidateSelection)
.with_candidate(candidate_receipt.hash())
.with_relay_parent(_relay_parent);
self.handle_invalid(candidate_receipt).await;
self.handle_invalid(sender, candidate_receipt).await;
}
Some(CandidateSelectionMessage::Seconded(_relay_parent, statement)) => {
let _span = span.child("handle-seconded")
.with_stage(jaeger::Stage::CandidateSelection)
.with_candidate(statement.payload().candidate_hash())
.with_relay_parent(_relay_parent);
self.handle_seconded(statement).await;
self.handle_seconded(sender, statement).await;
}
None => break,
}
}
// closing the sender here means that we don't deadlock in tests
self.sender.close_channel();
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))]
async fn handle_collation(
&mut self,
sender: &mut impl SubsystemSender,
relay_parent: Hash,
para_id: ParaId,
collator_id: CollatorId,
@@ -276,13 +280,7 @@ impl CandidateSelectionJob {
collator_id,
para_id,
);
if let Err(err) = forward_invalidity_note(&collator_id, &mut self.sender).await {
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"failed to forward invalidity note",
);
}
forward_invalidity_note(&collator_id, sender).await;
return;
}
@@ -292,7 +290,7 @@ impl CandidateSelectionJob {
relay_parent,
para_id,
collator_id.clone(),
self.sender.clone(),
sender,
).await {
Ok(response) => response,
Err(err) => {
@@ -305,21 +303,23 @@ impl CandidateSelectionJob {
}
};
match second_candidate(
second_candidate(
relay_parent,
candidate_receipt,
pov,
&mut self.sender,
sender,
&self.metrics,
).await {
Err(err) => tracing::warn!(target: LOG_TARGET, err = ?err, "failed to second a candidate"),
Ok(()) => self.seconded_candidate = Some(collator_id),
}
).await;
self.seconded_candidate = Some(collator_id);
}
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn handle_invalid(&mut self, candidate_receipt: CandidateReceipt) {
#[tracing::instrument(level = "trace", skip(self, sender), fields(subsystem = LOG_TARGET))]
async fn handle_invalid(
&mut self,
sender: &mut impl SubsystemSender,
candidate_receipt: CandidateReceipt,
) {
let _timer = self.metrics.time_handle_invalid();
let received_from = match &self.seconded_candidate {
@@ -338,21 +338,15 @@ impl CandidateSelectionJob {
"received invalidity note for candidate",
);
let result =
if let Err(err) = forward_invalidity_note(received_from, &mut self.sender).await {
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"failed to forward invalidity note",
);
Err(())
} else {
Ok(())
};
self.metrics.on_invalid_selection(result);
forward_invalidity_note(received_from, sender).await;
self.metrics.on_invalid_selection();
}
async fn handle_seconded(&mut self, statement: SignedFullStatement) {
async fn handle_seconded(
&mut self,
sender: &mut impl SubsystemSender,
statement: SignedFullStatement,
) {
let received_from = match &self.seconded_candidate {
Some(peer) => peer,
None => {
@@ -369,27 +363,13 @@ impl CandidateSelectionJob {
"received seconded note for candidate",
);
if let Err(e) = self.sender
.send(AllMessages::from(CollatorProtocolMessage::NoteGoodCollation(received_from.clone())).into()).await
{
tracing::debug!(
target: LOG_TARGET,
error = ?e,
"failed to note good collator"
);
}
sender
.send_message(CollatorProtocolMessage::NoteGoodCollation(received_from.clone()).into())
.await;
if let Err(e) = self.sender
.send(AllMessages::from(
CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement)
).into()).await
{
tracing::debug!(
target: LOG_TARGET,
error = ?e,
"failed to notify collator about seconded collation"
);
}
sender.send_message(
CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement).into()
).await;
}
}
@@ -401,17 +381,18 @@ async fn get_collation(
relay_parent: Hash,
para_id: ParaId,
collator_id: CollatorId,
mut sender: mpsc::Sender<FromJobCommand>,
sender: &mut impl SubsystemSender,
) -> Result<(CandidateReceipt, PoV), Error> {
let (tx, rx) = oneshot::channel();
sender
.send(AllMessages::from(CollatorProtocolMessage::FetchCollation(
.send_message(CollatorProtocolMessage::FetchCollation(
relay_parent,
collator_id,
para_id,
tx,
)).into())
.await?;
).into())
.await;
rx.await.map_err(Into::into)
}
@@ -419,45 +400,33 @@ async fn second_candidate(
relay_parent: Hash,
candidate_receipt: CandidateReceipt,
pov: PoV,
sender: &mut mpsc::Sender<FromJobCommand>,
sender: &mut impl SubsystemSender,
metrics: &Metrics,
) -> Result<(), Error> {
match sender
.send(AllMessages::from(CandidateBackingMessage::Second(
) {
sender
.send_message(CandidateBackingMessage::Second(
relay_parent,
candidate_receipt,
pov,
)).into())
.await
{
Err(err) => {
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to send a seconding message");
metrics.on_second(Err(()));
Err(err.into())
}
Ok(_) => {
metrics.on_second(Ok(()));
Ok(())
}
}
).into())
.await;
metrics.on_second();
}
async fn forward_invalidity_note(
received_from: &CollatorId,
sender: &mut mpsc::Sender<FromJobCommand>,
) -> Result<(), Error> {
sender: &mut impl SubsystemSender,
) {
sender
.send(AllMessages::from(CollatorProtocolMessage::ReportCollator(
received_from.clone(),
)).into())
.send_message(CollatorProtocolMessage::ReportCollator(received_from.clone()).into())
.await
.map_err(Into::into)
}
#[derive(Clone)]
struct MetricsInner {
seconds: prometheus::CounterVec<prometheus::U64>,
invalid_selections: prometheus::CounterVec<prometheus::U64>,
seconds: prometheus::Counter<prometheus::U64>,
invalid_selections: prometheus::Counter<prometheus::U64>,
handle_collation: prometheus::Histogram,
handle_invalid: prometheus::Histogram,
}
@@ -467,17 +436,15 @@ struct MetricsInner {
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_second(&self, result: Result<(), ()>) {
fn on_second(&self) {
if let Some(metrics) = &self.0 {
let label = if result.is_ok() { "succeeded" } else { "failed" };
metrics.seconds.with_label_values(&[label]).inc();
metrics.seconds.inc();
}
}
fn on_invalid_selection(&self, result: Result<(), ()>) {
fn on_invalid_selection(&self) {
if let Some(metrics) = &self.0 {
let label = if result.is_ok() { "succeeded" } else { "failed" };
metrics.invalid_selections.with_label_values(&[label]).inc();
metrics.invalid_selections.inc();
}
}
@@ -496,22 +463,20 @@ impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
seconds: prometheus::register(
prometheus::CounterVec::new(
prometheus::Counter::with_opts(
prometheus::Opts::new(
"candidate_selection_seconds_total",
"Number of Candidate Selection subsystem seconding events.",
),
&["success"],
)?,
registry,
)?,
invalid_selections: prometheus::register(
prometheus::CounterVec::new(
prometheus::Counter::with_opts(
prometheus::Opts::new(
"candidate_selection_invalid_selections_total",
"Number of Candidate Selection subsystem seconding selections which proved to be invalid.",
),
&["success"],
)?,
registry,
)?,
@@ -538,13 +503,15 @@ impl metrics::Metrics for Metrics {
}
}
delegated_subsystem!(CandidateSelectionJob(SyncCryptoStorePtr, Metrics) <- CandidateSelectionMessage as CandidateSelectionSubsystem);
/// The candidate selection subsystem.
pub type CandidateSelectionSubsystem<Spawner> = JobSubsystem<CandidateSelectionJob, Spawner>;
#[cfg(test)]
mod tests {
use super::*;
use futures::lock::Mutex;
use polkadot_primitives::v1::BlockData;
use polkadot_node_subsystem::messages::AllMessages;
use sp_core::crypto::Public;
use std::sync::Arc;
@@ -554,15 +521,14 @@ mod tests {
postconditions: Postconditions,
) where
Preconditions: FnOnce(&mut CandidateSelectionJob),
TestBuilder: FnOnce(mpsc::Sender<CandidateSelectionMessage>, mpsc::Receiver<FromJobCommand>) -> Test,
TestBuilder: FnOnce(mpsc::Sender<CandidateSelectionMessage>, mpsc::UnboundedReceiver<AllMessages>) -> Test,
Test: Future<Output = ()>,
Postconditions: FnOnce(CandidateSelectionJob, Result<(), Error>),
{
let (to_job_tx, to_job_rx) = mpsc::channel(0);
let (from_job_tx, from_job_rx) = mpsc::channel(0);
let (mut from_job_tx, from_job_rx) = polkadot_node_subsystem_test_helpers::sender_receiver();
let mut job = CandidateSelectionJob {
assignment: 123.into(),
sender: from_job_tx,
receiver: to_job_rx,
metrics: Default::default(),
seconded_candidate: None,
@@ -570,9 +536,13 @@ mod tests {
preconditions(&mut job);
let span = jaeger::Span::Disabled;
let (_, job_result) = futures::executor::block_on(future::join(
let (_, (job, job_result)) = futures::executor::block_on(future::join(
test(to_job_tx, from_job_rx),
job.run_loop(&span),
async move {
let res = job.run_loop(&span, &mut from_job_tx).await;
drop(from_job_tx);
(job, res)
},
));
postconditions(job, job_result);
@@ -609,12 +579,12 @@ mod tests {
while let Some(msg) = from_job.next().await {
match msg {
FromJobCommand::SendMessage(AllMessages::CollatorProtocol(CollatorProtocolMessage::FetchCollation(
AllMessages::CollatorProtocol(CollatorProtocolMessage::FetchCollation(
got_relay_parent,
collator_id,
got_para_id,
return_sender,
))) => {
)) => {
assert_eq!(got_relay_parent, relay_parent);
assert_eq!(got_para_id, para_id);
assert_eq!(collator_id, collator_id_clone);
@@ -623,11 +593,11 @@ mod tests {
.send((candidate_receipt.clone(), pov.clone()))
.unwrap();
}
FromJobCommand::SendMessage(AllMessages::CandidateBacking(CandidateBackingMessage::Second(
AllMessages::CandidateBacking(CandidateBackingMessage::Second(
got_relay_parent,
got_candidate_receipt,
got_pov,
))) => {
)) => {
assert_eq!(got_relay_parent, relay_parent);
assert_eq!(got_candidate_receipt, candidate_receipt);
assert_eq!(got_pov, pov);
@@ -674,11 +644,11 @@ mod tests {
while let Some(msg) = from_job.next().await {
match msg {
FromJobCommand::SendMessage(AllMessages::CandidateBacking(CandidateBackingMessage::Second(
AllMessages::CandidateBacking(CandidateBackingMessage::Second(
_got_relay_parent,
_got_candidate_receipt,
_got_pov,
))) => {
)) => {
*was_seconded_clone.lock().await = true;
}
other => panic!("unexpected message from job: {:?}", other),
@@ -713,13 +683,14 @@ mod tests {
.send(CandidateSelectionMessage::Invalid(relay_parent, candidate_receipt))
.await
.unwrap();
std::mem::drop(to_job);
while let Some(msg) = from_job.next().await {
match msg {
FromJobCommand::SendMessage(AllMessages::CollatorProtocol(CollatorProtocolMessage::ReportCollator(
AllMessages::CollatorProtocol(CollatorProtocolMessage::ReportCollator(
got_collator_id,
))) => {
)) => {
assert_eq!(got_collator_id, collator_id_clone);
*sent_report_clone.lock().await = true;
@@ -17,3 +17,4 @@ futures-timer = "3.0.2"
[dev-dependencies]
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
+34 -29
View File
@@ -25,14 +25,14 @@ use futures::{
prelude::*,
};
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, jaeger,
errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, SubsystemSender, jaeger,
messages::{
AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData,
CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData,
ProvisionerMessage,
},
};
use polkadot_node_subsystem_util::{
self as util, delegated_subsystem, FromJobCommand,
self as util, JobSubsystem, JobSender,
request_availability_cores, request_persisted_validation_data, JobTrait, metrics::{self, prometheus},
};
use polkadot_primitives::v1::{
@@ -80,9 +80,9 @@ impl InherentAfter {
}
}
struct ProvisioningJob {
/// A per-relay-parent job for the provisioning subsystem.
pub struct ProvisioningJob {
relay_parent: Hash,
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<ProvisionerMessage>,
backed_candidates: Vec<CandidateReceipt>,
signed_bitfields: Vec<SignedAvailabilityBitfield>,
@@ -91,8 +91,10 @@ struct ProvisioningJob {
awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>
}
/// Errors in the provisioner.
#[derive(Debug, Error)]
enum Error {
#[allow(missing_docs)]
pub enum Error {
#[error(transparent)]
Util(#[from] util::Error),
@@ -139,38 +141,35 @@ impl JobTrait for ProvisioningJob {
//
// this function is in charge of creating and executing the job's main loop
#[tracing::instrument(skip(span, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
fn run<S: SubsystemSender>(
relay_parent: Hash,
span: Arc<jaeger::Span>,
_run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<ProvisionerMessage>,
sender: mpsc::Sender<FromJobCommand>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = ProvisioningJob::new(
relay_parent,
metrics,
sender,
receiver,
);
job.run_loop(PerLeafSpan::new(span, "provisioner")).await
job.run_loop(sender.subsystem_sender(), PerLeafSpan::new(span, "provisioner")).await
}
.boxed()
}
}
impl ProvisioningJob {
pub fn new(
fn new(
relay_parent: Hash,
metrics: Metrics,
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<ProvisionerMessage>,
) -> Self {
Self {
relay_parent,
sender,
receiver,
backed_candidates: Vec::new(),
signed_bitfields: Vec::new(),
@@ -180,7 +179,11 @@ impl ProvisioningJob {
}
}
async fn run_loop(mut self, span: PerLeafSpan) -> Result<(), Error> {
async fn run_loop(
mut self,
sender: &mut impl SubsystemSender,
span: PerLeafSpan,
) -> Result<(), Error> {
use ProvisionerMessage::{
ProvisionableData, RequestInherentData,
};
@@ -192,7 +195,7 @@ impl ProvisioningJob {
let _timer = self.metrics.time_request_inherent_data();
if self.inherent_after.is_ready() {
self.send_inherent_data(vec![return_sender]).await;
self.send_inherent_data(sender, vec![return_sender]).await;
} else {
self.awaiting_inherent.push(return_sender);
}
@@ -209,7 +212,7 @@ impl ProvisioningJob {
let _span = span.child("send-inherent-data");
let return_senders = std::mem::take(&mut self.awaiting_inherent);
if !return_senders.is_empty() {
self.send_inherent_data(return_senders).await;
self.send_inherent_data(sender, return_senders).await;
}
}
}
@@ -220,6 +223,7 @@ impl ProvisioningJob {
async fn send_inherent_data(
&mut self,
sender: &mut impl SubsystemSender,
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
) {
if let Err(err) = send_inherent_data(
@@ -227,7 +231,7 @@ impl ProvisioningJob {
&self.signed_bitfields,
&self.backed_candidates,
return_senders,
&mut self.sender,
sender,
)
.await
{
@@ -279,10 +283,10 @@ async fn send_inherent_data(
bitfields: &[SignedAvailabilityBitfield],
candidates: &[CandidateReceipt],
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
from_job: &mut mpsc::Sender<FromJobCommand>,
from_job: &mut impl SubsystemSender,
) -> Result<(), Error> {
let availability_cores = request_availability_cores(relay_parent, from_job)
.await?
.await
.await.map_err(|err| Error::CanceledAvailabilityCores(err))??;
let bitfields = select_availability_bitfields(&availability_cores, bitfields);
@@ -351,7 +355,7 @@ async fn select_candidates(
bitfields: &[SignedAvailabilityBitfield],
candidates: &[CandidateReceipt],
relay_parent: Hash,
sender: &mut mpsc::Sender<FromJobCommand>,
sender: &mut impl SubsystemSender,
) -> Result<Vec<BackedCandidate>, Error> {
let block_number = get_block_number_under_construction(relay_parent, sender).await?;
@@ -388,7 +392,7 @@ async fn select_candidates(
assumption,
sender,
)
.await?
.await
.await.map_err(|err| Error::CanceledPersistedValidationData(err))??
{
Some(v) => v,
@@ -418,11 +422,11 @@ async fn select_candidates(
// now get the backed candidates corresponding to these candidate receipts
let (tx, rx) = oneshot::channel();
sender.send(AllMessages::CandidateBacking(CandidateBackingMessage::GetBackedCandidates(
sender.send_message(CandidateBackingMessage::GetBackedCandidates(
relay_parent,
selected_candidates.clone(),
tx,
)).into()).await.map_err(|err| Error::GetBackedCandidatesSend(err))?;
).into()).await;
let mut candidates = rx.await.map_err(|err| Error::CanceledBackedCandidates(err))?;
// `selected_candidates` is generated in ascending order by core index, and `GetBackedCandidates`
@@ -470,16 +474,16 @@ async fn select_candidates(
#[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))]
async fn get_block_number_under_construction(
relay_parent: Hash,
sender: &mut mpsc::Sender<FromJobCommand>,
sender: &mut impl SubsystemSender,
) -> Result<BlockNumber, Error> {
let (tx, rx) = oneshot::channel();
sender
.send(AllMessages::from(ChainApiMessage::BlockNumber(
.send_message(ChainApiMessage::BlockNumber(
relay_parent,
tx,
)).into())
.await
.map_err(|e| Error::ChainApiMessageSend(e))?;
).into())
.await;
match rx.await.map_err(|err| Error::CanceledBlockNumber(err))? {
Ok(Some(n)) => Ok(n + 1),
Ok(None) => Ok(0),
@@ -596,7 +600,8 @@ impl metrics::Metrics for Metrics {
}
delegated_subsystem!(ProvisioningJob((), Metrics) <- ProvisionerMessage as ProvisioningSubsystem);
/// The provisioning subsystem.
pub type ProvisioningSubsystem<Spawner> = JobSubsystem<ProvisioningJob, Spawner>;
#[cfg(test)]
mod tests;
+17 -34
View File
@@ -191,7 +191,6 @@ mod select_availability_bitfields {
}
mod select_candidates {
use futures_timer::Delay;
use super::super::*;
use super::{build_occupied_core, occupied_core, scheduled_core, default_bitvec};
use polkadot_node_subsystem::messages::{
@@ -201,6 +200,7 @@ mod select_candidates {
use polkadot_primitives::v1::{
BlockNumber, CandidateDescriptor, PersistedValidationData, CommittedCandidateReceipt, CandidateCommitments,
};
use polkadot_node_subsystem_test_helpers::TestSubsystemSender;
const BLOCK_UNDER_PRODUCTION: BlockNumber = 128;
@@ -208,12 +208,12 @@ mod select_candidates {
overseer_factory: OverseerFactory,
test_factory: TestFactory,
) where
OverseerFactory: FnOnce(mpsc::Receiver<FromJobCommand>) -> Overseer,
OverseerFactory: FnOnce(mpsc::UnboundedReceiver<AllMessages>) -> Overseer,
Overseer: Future<Output = ()>,
TestFactory: FnOnce(mpsc::Sender<FromJobCommand>) -> Test,
TestFactory: FnOnce(TestSubsystemSender) -> Test,
Test: Future<Output = ()>,
{
let (tx, rx) = mpsc::channel(64);
let (tx, rx) = polkadot_node_subsystem_test_helpers::sender_receiver();
let overseer = overseer_factory(rx);
let test = test_factory(tx);
@@ -298,24 +298,27 @@ mod select_candidates {
]
}
async fn mock_overseer(mut receiver: mpsc::Receiver<FromJobCommand>, expected: Vec<BackedCandidate>) {
async fn mock_overseer(
mut receiver: mpsc::UnboundedReceiver<AllMessages>,
expected: Vec<BackedCandidate>,
) {
use ChainApiMessage::BlockNumber;
use RuntimeApiMessage::Request;
while let Some(from_job) = receiver.next().await {
match from_job {
FromJobCommand::SendMessage(AllMessages::ChainApi(BlockNumber(_relay_parent, tx))) => {
AllMessages::ChainApi(BlockNumber(_relay_parent, tx)) => {
tx.send(Ok(Some(BLOCK_UNDER_PRODUCTION - 1))).unwrap()
}
FromJobCommand::SendMessage(AllMessages::RuntimeApi(Request(
AllMessages::RuntimeApi(Request(
_parent_hash,
PersistedValidationDataReq(_para_id, _assumption, tx),
))) => tx.send(Ok(Some(Default::default()))).unwrap(),
FromJobCommand::SendMessage(AllMessages::RuntimeApi(Request(_parent_hash, AvailabilityCores(tx)))) => {
)) => tx.send(Ok(Some(Default::default()))).unwrap(),
AllMessages::RuntimeApi(Request(_parent_hash, AvailabilityCores(tx))) => {
tx.send(Ok(mock_availability_cores())).unwrap()
}
FromJobCommand::SendMessage(
AllMessages::CandidateBacking(CandidateBackingMessage::GetBackedCandidates(_, _, sender))
AllMessages::CandidateBacking(
CandidateBackingMessage::GetBackedCandidates(_, _, sender)
) => {
let _ = sender.send(expected.clone());
}
@@ -324,29 +327,9 @@ mod select_candidates {
}
}
#[test]
fn handles_overseer_failure() {
let overseer = |rx: mpsc::Receiver<FromJobCommand>| async move {
// drop the receiver so it closes and the sender can't send, then just sleep long enough that
// this is almost certainly not the first of the two futures to complete
std::mem::drop(rx);
Delay::new(std::time::Duration::from_secs(1)).await;
};
let test = |mut tx: mpsc::Sender<FromJobCommand>| async move {
// wait so that the overseer can drop the rx before we attempt to send
Delay::new(std::time::Duration::from_millis(50)).await;
let result = select_candidates(&[], &[], &[], Default::default(), &mut tx).await;
println!("{:?}", result);
assert!(std::matches!(result, Err(Error::ChainApiMessageSend(_))));
};
test_harness(overseer, test);
}
#[test]
fn can_succeed() {
test_harness(|r| mock_overseer(r, Vec::new()), |mut tx: mpsc::Sender<FromJobCommand>| async move {
test_harness(|r| mock_overseer(r, Vec::new()), |mut tx: TestSubsystemSender| async move {
select_candidates(&[], &[], &[], Default::default(), &mut tx).await.unwrap();
})
}
@@ -411,7 +394,7 @@ mod select_candidates {
})
.collect();
test_harness(|r| mock_overseer(r, expected_backed), |mut tx: mpsc::Sender<FromJobCommand>| async move {
test_harness(|r| mock_overseer(r, expected_backed), |mut tx: TestSubsystemSender| async move {
let result =
select_candidates(&mock_cores, &[], &candidates, Default::default(), &mut tx)
.await.unwrap();
@@ -470,7 +453,7 @@ mod select_candidates {
})
.collect();
test_harness(|r| mock_overseer(r, expected_backed), |mut tx: mpsc::Sender<FromJobCommand>| async move {
test_harness(|r| mock_overseer(r, expected_backed), |mut tx: TestSubsystemSender| async move {
let result =
select_candidates(&mock_cores, &[], &candidates, Default::default(), &mut tx)
.await.unwrap();
+1 -1
View File
@@ -15,10 +15,10 @@ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-network-protocol = { path = "../protocol" }
strum = "0.20.0"
parking_lot = "0.11.1"
[dev-dependencies]
assert_matches = "1.4.0"
parking_lot = "0.11.1"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
polkadot-node-subsystem-util = { path = "../../subsystem-util"}
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
-223
View File
@@ -1,223 +0,0 @@
// Copyright 2020-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//
use futures::channel::mpsc;
use parity_scale_codec::Decode;
use polkadot_node_network_protocol::{
peer_set::PeerSet, v1 as protocol_v1, PeerId, UnifiedReputationChange,
};
use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockNumber};
use polkadot_subsystem::messages::{AllMessages, NetworkBridgeMessage};
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
use sc_network::{Event as NetworkEvent, IfDisconnected};
use polkadot_node_network_protocol::{request_response::Requests, ObservedRole};
use super::multiplexer::RequestMultiplexError;
use super::{WireMessage, MALFORMED_MESSAGE_COST};
/// Internal type combining all actions a `NetworkBridge` might perform.
///
/// Both messages coming from the network (`NetworkEvent`) and messages coming from other
/// subsystems (`FromOverseer`) will be converted to `Action` in `run_network` before being
/// processed.
#[derive(Debug)]
pub(crate) enum Action {
	/// Ask network to send a validation message.
	SendValidationMessages(Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>),
	/// Ask network to send a collation message.
	SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>),
	/// Ask network to send requests.
	SendRequests(Vec<Requests>, IfDisconnected),
	/// Ask network to connect to validators.
	ConnectToValidators {
		// Validators to discover and connect to.
		validator_ids: Vec<AuthorityDiscoveryId>,
		// The peer-set on which the connections should be established.
		peer_set: PeerSet,
		// Channel over which each successfully established connection is reported.
		connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
	},
	/// Report a peer to the network implementation (decreasing/increasing its reputation).
	ReportPeer(PeerId, UnifiedReputationChange),
	/// Disconnect a peer from the given peer-set without affecting their reputation.
	DisconnectPeer(PeerId, PeerSet),
	/// A subsystem updates us on the relay chain leaves we consider active.
	///
	/// Implementation will send `WireMessage::ViewUpdate` message to peers as appropriate to the
	/// change.
	ActiveLeaves(ActiveLeavesUpdate),
	/// A subsystem updates our view on the latest finalized block.
	///
	/// This information is used for view updates, see also `ActiveLeaves`.
	BlockFinalized(BlockNumber),
	/// Network tells us about a new peer.
	PeerConnected(PeerSet, PeerId, ObservedRole),
	/// Network tells us about a peer that left.
	PeerDisconnected(PeerSet, PeerId),
	/// Messages from the network targeted to other subsystems.
	///
	/// Carries the originating peer plus the decoded validation- and collation-protocol
	/// messages received from it in one notification batch.
	PeerMessages(
		PeerId,
		Vec<WireMessage<protocol_v1::ValidationProtocol>>,
		Vec<WireMessage<protocol_v1::CollationProtocol>>,
	),
	/// Send a message to another subsystem or the overseer.
	///
	/// Used for handling incoming requests.
	SendMessage(AllMessages),
	/// Abort with reason.
	Abort(AbortReason),
	/// No operation — produced for events that require no handling (e.g. ignored
	/// network events or unknown protocol names).
	Nop,
}
/// Why the network bridge's main loop is shutting down.
#[derive(Debug)]
pub(crate) enum AbortReason {
	/// Received an error from the overseer.
	SubsystemError(polkadot_subsystem::SubsystemError),
	/// The stream of incoming events concluded.
	EventStreamConcluded,
	/// The stream of incoming requests concluded.
	RequestStreamConcluded,
	/// We received `OverseerSignal::Conclude`, i.e. an orderly shutdown request.
	OverseerConcluded,
}
impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>> for Action {
	/// Translate the result of polling the overseer into an `Action`.
	///
	/// Overseer signals map to lifecycle actions (`ActiveLeaves`, `BlockFinalized`,
	/// `Abort`), communication messages map one-to-one onto their network-facing
	/// counterparts, and a polling error aborts the bridge.
	fn from(
		res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>,
	) -> Self {
		match res {
			Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => {
				Action::ActiveLeaves(active_leaves)
			}
			Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) => {
				Action::BlockFinalized(number)
			}
			Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => {
				Action::Abort(AbortReason::OverseerConcluded)
			}
			Ok(FromOverseer::Communication { msg }) => match msg {
				NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
				NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => {
					Action::DisconnectPeer(peer, peer_set)
				}
				// Single-message sends are normalized into the batched `Action` variants.
				NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
					Action::SendValidationMessages(vec![(peers, msg)])
				}
				NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
					Action::SendCollationMessages(vec![(peers, msg)])
				}
				NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => Action::SendRequests(reqs, if_disconnected),
				NetworkBridgeMessage::SendValidationMessages(msgs) => {
					Action::SendValidationMessages(msgs)
				}
				NetworkBridgeMessage::SendCollationMessages(msgs) => {
					Action::SendCollationMessages(msgs)
				}
				NetworkBridgeMessage::ConnectToValidators {
					validator_ids,
					peer_set,
					connected,
				} => Action::ConnectToValidators {
					validator_ids,
					peer_set,
					connected,
				},
			},
			// Polling the overseer failed: shut the bridge down.
			Err(e) => Action::Abort(AbortReason::SubsystemError(e)),
		}
	}
}
impl From<Option<NetworkEvent>> for Action {
	/// Translate a raw network event (or end-of-stream) into an `Action`.
	///
	/// Events that belong to protocols the bridge does not manage, as well as
	/// DHT and sync events, are ignored via `Action::Nop`. Undecodable
	/// notifications from a peer produce a `ReportPeer` with
	/// `MALFORMED_MESSAGE_COST`.
	fn from(event: Option<NetworkEvent>) -> Action {
		match event {
			// The event stream ending means the bridge must shut down.
			None => Action::Abort(AbortReason::EventStreamConcluded),
			// Not related to the peer-set protocols we drive.
			Some(NetworkEvent::Dht(_))
			| Some(NetworkEvent::SyncConnected { .. })
			| Some(NetworkEvent::SyncDisconnected { .. }) => Action::Nop,
			Some(NetworkEvent::NotificationStreamOpened {
				remote,
				protocol,
				role,
			}) => match PeerSet::try_from_protocol_name(&protocol) {
				Some(peer_set) => Action::PeerConnected(peer_set, remote, role.into()),
				// Unknown protocol: not one of ours.
				None => Action::Nop,
			},
			Some(NetworkEvent::NotificationStreamClosed { remote, protocol }) => {
				match PeerSet::try_from_protocol_name(&protocol) {
					Some(peer_set) => Action::PeerDisconnected(peer_set, remote),
					None => Action::Nop,
				}
			}
			Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
				// Decode everything that arrived on the validation protocol;
				// a single malformed message poisons the whole batch.
				let decoded_validation: Result<Vec<_>, _> = messages
					.iter()
					.filter(|(protocol, _)| {
						protocol == &PeerSet::Validation.into_protocol_name()
					})
					.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
					.collect();
				// Same for the collation protocol.
				let decoded_collation: Result<Vec<_>, _> = messages
					.iter()
					.filter(|(protocol, _)| {
						protocol == &PeerSet::Collation.into_protocol_name()
					})
					.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
					.collect();

				match (decoded_validation, decoded_collation) {
					// Nothing addressed to us on either protocol.
					(Ok(v), Ok(c)) if v.is_empty() && c.is_empty() => Action::Nop,
					(Ok(v), Ok(c)) => Action::PeerMessages(remote, v, c),
					// At least one message failed to decode: punish the peer.
					_ => Action::ReportPeer(remote, MALFORMED_MESSAGE_COST),
				}
			}
		}
	}
}
impl From<Option<Result<AllMessages, RequestMultiplexError>>> for Action {
	/// Translate the next item of the multiplexed request stream into an
	/// `Action`.
	///
	/// A successfully demultiplexed request is forwarded to the addressed
	/// subsystem; a multiplexing error reports the offending peer; stream
	/// exhaustion aborts the bridge.
	fn from(event: Option<Result<AllMessages, RequestMultiplexError>>) -> Self {
		match event {
			Some(Ok(msg)) => Action::SendMessage(msg),
			Some(Err(err)) => Action::ReportPeer(err.peer, MALFORMED_MESSAGE_COST),
			None => Action::Abort(AbortReason::RequestStreamConcluded),
		}
	}
}
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -99,7 +99,7 @@ pub enum NetworkAction {
/// An abstraction over networking for the purposes of this subsystem.
#[async_trait]
pub trait Network: Send + 'static {
pub trait Network: Clone + Send + 'static {
/// Get a stream of all events occurring on the network. This may include events unrelated
/// to the Polkadot protocol - the user of this function should filter only for events related
/// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME)
@@ -162,6 +162,15 @@ pub struct TestSubsystemSender {
tx: mpsc::UnboundedSender<AllMessages>,
}
/// Construct a sender/receiver pair.
pub fn sender_receiver() -> (TestSubsystemSender, mpsc::UnboundedReceiver<AllMessages>) {
let (tx, rx) = mpsc::unbounded();
(
TestSubsystemSender { tx },
rx,
)
}
#[async_trait::async_trait]
impl SubsystemSender for TestSubsystemSender {
async fn send_message(&mut self, msg: AllMessages) {
+222 -364
View File
@@ -27,7 +27,8 @@
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender, BoundToRelayParent},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemSender,
ActiveLeavesUpdate, OverseerSignal,
};
use polkadot_node_jaeger as jaeger;
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream};
@@ -100,20 +101,20 @@ pub enum Error {
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
/// Request some data from the `RuntimeApi`.
pub async fn request_from_runtime<RequestBuilder, Response, FromJob>(
pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
parent: Hash,
sender: &mut mpsc::Sender<FromJob>,
sender: &mut Sender,
request_builder: RequestBuilder,
) -> Result<RuntimeApiReceiver<Response>, Error>
) -> RuntimeApiReceiver<Response>
where
RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
FromJob: From<AllMessages>,
Sender: SubsystemSender,
{
let (tx, rx) = oneshot::channel();
sender.send(AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx))).into()).await?;
sender.send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into()).await;
Ok(rx)
rx
}
/// Construct specialized request functions for the runtime.
@@ -132,16 +133,13 @@ macro_rules! specialize_requests {
#[doc = "Request `"]
#[doc = $doc_name]
#[doc = "` from the runtime"]
pub async fn $func_name<FromJob>(
pub async fn $func_name(
parent: Hash,
$(
$param_name: $param_ty,
)*
sender: &mut mpsc::Sender<FromJob>,
) -> Result<RuntimeApiReceiver<$return_ty>, Error>
where
FromJob: From<AllMessages>,
{
sender: &mut impl SubsystemSender,
) -> RuntimeApiReceiver<$return_ty> {
request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
$( $param_name, )* tx
)).await
@@ -282,20 +280,17 @@ pub struct Validator {
impl Validator {
/// Get a struct representing this node's validator if this node is in fact a validator in the context of the given block.
pub async fn new<FromJob>(
pub async fn new(
parent: Hash,
keystore: SyncCryptoStorePtr,
mut sender: mpsc::Sender<FromJob>,
) -> Result<Self, Error>
where
FromJob: From<AllMessages>,
{
sender: &mut JobSender<impl SubsystemSender>,
) -> Result<Self, Error> {
// Note: request_validators and request_session_index_for_child do not and cannot
// run concurrently: they both have a mutable handle to the same sender.
// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
let (validators, session_index) = futures::try_join!(
request_validators(parent, &mut sender).await?,
request_session_index_for_child(parent, &mut sender).await?,
request_validators(parent, sender).await,
request_session_index_for_child(parent, sender).await,
)?;
let signing_context = SigningContext {
@@ -429,27 +424,76 @@ pub mod metrics {
/// Commands from a job to the broader subsystem.
pub enum FromJobCommand {
/// Send a message to another subsystem.
SendMessage(AllMessages),
/// Spawn a child task on the executor.
Spawn(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
/// Spawn a blocking child task on the executor's dedicated thread pool.
SpawnBlocking(&'static str, Pin<Box<dyn Future<Output = ()> + Send>>),
}
impl fmt::Debug for FromJobCommand {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::SendMessage(msg) => write!(fmt, "FromJobCommand::SendMessage({:?})", msg),
Self::Spawn(name, _) => write!(fmt, "FromJobCommand::Spawn({})", name),
Self::SpawnBlocking(name, _) => write!(fmt, "FromJobCommand::SpawnBlocking({})", name),
}
/// A sender for messages from jobs, as well as commands to the overseer.
#[derive(Clone)]
pub struct JobSender<S> {
sender: S,
from_job: mpsc::Sender<FromJobCommand>,
}
impl<S: SubsystemSender> JobSender<S> {
/// Get access to the underlying subsystem sender.
pub fn subsystem_sender(&mut self) -> &mut S {
&mut self.sender
}
/// Send a direct message to some other `Subsystem`, routed based on message type.
pub async fn send_message(&mut self, msg: AllMessages) {
self.sender.send_message(msg).await
}
/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
pub async fn send_messages<T>(&mut self, msgs: T)
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
self.sender.send_messages(msgs).await
}
/// Send a message onto the unbounded queue of some other `Subsystem`, routed based on message
/// type.
///
/// This function should be used only when there is some other bounding factor on the messages
/// sent with it. Otherwise, it risks a memory leak.
pub fn send_unbounded_message(&mut self, msg: AllMessages) {
self.sender.send_unbounded_message(msg)
}
/// Send a command to the subsystem, to be relayed onwards to the overseer.
pub async fn send_command(&mut self, msg: FromJobCommand) -> Result<(), mpsc::SendError> {
self.from_job.send(msg).await
}
}
impl From<AllMessages> for FromJobCommand {
fn from(msg: AllMessages) -> Self {
Self::SendMessage(msg)
#[async_trait::async_trait]
impl<S: SubsystemSender> SubsystemSender for JobSender<S> {
async fn send_message(&mut self, msg: AllMessages) {
self.sender.send_message(msg).await
}
async fn send_messages<T>(&mut self, msgs: T)
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
self.sender.send_messages(msgs).await
}
fn send_unbounded_message(&mut self, msg: AllMessages) {
self.sender.send_unbounded_message(msg)
}
}
impl fmt::Debug for FromJobCommand {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Spawn(name, _) => write!(fmt, "FromJobCommand::Spawn({})", name),
Self::SpawnBlocking(name, _) => write!(fmt, "FromJobCommand::SpawnBlocking({})", name),
}
}
}
@@ -457,7 +501,7 @@ impl From<AllMessages> for FromJobCommand {
///
/// Jobs are instantiated and killed automatically on appropriate overseer messages.
/// Other messages are passed along to and from the job via the overseer to other subsystems.
pub trait JobTrait: Unpin {
pub trait JobTrait: Unpin + Sized {
/// Message type used to send messages to the job.
type ToJob: 'static + BoundToRelayParent + Send;
/// Job runtime error.
@@ -479,13 +523,13 @@ pub trait JobTrait: Unpin {
/// Run a job for the given relay `parent`.
///
/// The job should be ended when `receiver` returns `None`.
fn run(
fn run<S: SubsystemSender>(
parent: Hash,
span: Arc<jaeger::Span>,
run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<Self::ToJob>,
sender: mpsc::Sender<FromJobCommand>,
sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>>;
}
@@ -493,7 +537,7 @@ pub trait JobTrait: Unpin {
///
/// Wraps the utility error type and the job-specific error
#[derive(Debug, Error)]
pub enum JobsError<JobError: 'static + std::error::Error> {
pub enum JobsError<JobError: std::fmt::Debug + std::error::Error + 'static> {
/// utility error
#[error("Utility")]
Utility(#[source] Error),
@@ -508,61 +552,50 @@ pub enum JobsError<JobError: 'static + std::error::Error> {
/// - Closes old jobs for a given relay-parent on demand.
/// - Dispatches messages to the appropriate job for a given relay-parent.
/// - When dropped, aborts all remaining jobs.
/// - implements `Stream<Item=Job::FromJob>`, collecting all messages from subordinate jobs.
/// - implements `Stream<Item=FromJobCommand>`, collecting all messages from subordinate jobs.
#[pin_project]
pub struct Jobs<Spawner, Job: JobTrait> {
struct Jobs<Spawner, ToJob> {
spawner: Spawner,
running: HashMap<Hash, JobHandle<Job::ToJob>>,
running: HashMap<Hash, JobHandle<ToJob>>,
outgoing_msgs: StreamUnordered<mpsc::Receiver<FromJobCommand>>,
#[pin]
job: std::marker::PhantomData<Job>,
errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
}
impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
impl<Spawner: SpawnNamed, ToJob: Send + 'static> Jobs<Spawner, ToJob> {
/// Create a new Jobs manager which handles spawning appropriate jobs.
pub fn new(spawner: Spawner) -> Self {
Self {
spawner,
running: HashMap::new(),
outgoing_msgs: StreamUnordered::new(),
job: std::marker::PhantomData,
errors: None,
}
}
/// Monitor errors which may occur during handling of a spawned job.
///
/// By default, an error in a job is simply logged. Once this is called,
/// the error is forwarded onto the provided channel.
///
/// Errors if the error channel already exists.
pub fn forward_errors(
&mut self,
tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>,
) -> Result<(), Error> {
if self.errors.is_some() {
return Err(Error::AlreadyForwarding);
}
self.errors = Some(tx);
Ok(())
}
/// Spawn a new job for this `parent_hash`, with whatever args are appropriate.
fn spawn_job(
fn spawn_job<Job, Sender>(
&mut self,
parent_hash: Hash,
span: Arc<jaeger::Span>,
run_args: Job::RunArgs,
metrics: Job::Metrics,
) -> Result<(), Error> {
sender: Sender,
)
where Job: JobTrait<ToJob = ToJob>, Sender: SubsystemSender,
{
let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let err_tx = self.errors.clone();
let (future, abort_handle) = future::abortable(async move {
if let Err(e) = Job::run(parent_hash, span, run_args, metrics, to_job_rx, from_job_tx).await {
if let Err(e) = Job::run(
parent_hash,
span,
run_args,
metrics,
to_job_rx,
JobSender {
sender,
from_job: from_job_tx,
},
).await {
tracing::error!(
job = Job::NAME,
parent_hash = %parent_hash,
@@ -570,19 +603,13 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
"job finished with an error",
);
if let Some(mut err_tx) = err_tx {
// if we can't send the notification of error on the error channel, then
// there's no point trying to propagate this error onto the channel too
// all we can do is warn that error propagation has failed
if let Err(e) = err_tx.send((Some(parent_hash), JobsError::Job(e))).await {
tracing::warn!(err = ?e, "failed to forward error");
}
}
return Err(e);
}
Ok(())
});
self.spawner.spawn(Job::NAME, future.map(drop).boxed());
self.outgoing_msgs.push(from_job_rx);
let handle = JobHandle {
@@ -591,8 +618,6 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
};
self.running.insert(parent_hash, handle);
Ok(())
}
/// Stop the job associated with this `parent_hash`.
@@ -601,7 +626,7 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
}
/// Send a message to the appropriate job for this `parent_hash`.
async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) {
async fn send_msg(&mut self, parent_hash: Hash, msg: ToJob) {
if let Entry::Occupied(mut job) = self.running.entry(parent_hash) {
if job.get_mut().send_msg(msg).await.is_err() {
job.remove();
@@ -610,10 +635,9 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
}
}
impl<Spawner, Job> Stream for Jobs<Spawner, Job>
impl<Spawner, ToJob> Stream for Jobs<Spawner, ToJob>
where
Spawner: SpawnNamed,
Job: JobTrait,
{
type Item = FromJobCommand;
@@ -633,107 +657,123 @@ where
}
}
impl<Spawner, Job> stream::FusedStream for Jobs<Spawner, Job>
impl<Spawner, ToJob> stream::FusedStream for Jobs<Spawner, ToJob>
where
Spawner: SpawnNamed,
Job: JobTrait,
{
fn is_terminated(&self) -> bool {
false
}
}
/// A basic implementation of a subsystem.
///
/// This struct is responsible for handling message traffic between
/// this subsystem and the overseer. It spawns and kills jobs on the
/// appropriate Overseer messages, and dispatches standard traffic to
/// the appropriate job the rest of the time.
pub struct JobManager<Spawner, Context, Job: JobTrait> {
/// Parameters to a job subsystem.
struct JobSubsystemParams<Spawner, RunArgs, Metrics> {
/// A spawner for sub-tasks.
spawner: Spawner,
run_args: Job::RunArgs,
metrics: Job::Metrics,
context: std::marker::PhantomData<Context>,
job: std::marker::PhantomData<Job>,
errors: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
/// Arguments to each job.
run_args: RunArgs,
/// Metrics for the subsystem.
metrics: Metrics,
}
impl<Spawner, Context, Job> JobManager<Spawner, Context, Job>
where
Spawner: SpawnNamed + Clone + Send + Unpin,
Context: SubsystemContext,
Job: 'static + JobTrait,
Job::RunArgs: Clone,
Job::ToJob: From<<Context as SubsystemContext>::Message> + Sync,
{
/// Creates a new `Subsystem`.
/// A subsystem which wraps jobs.
///
/// Conceptually, this is very simple: it just loops forever.
///
/// - On incoming overseer messages, it starts or stops jobs as appropriate.
/// - On other incoming messages, if they can be converted into Job::ToJob and
/// include a hash, then they're forwarded to the appropriate individual job.
/// - On outgoing messages from the jobs, it forwards them to the overseer.
pub struct JobSubsystem<Job: JobTrait, Spawner> {
params: JobSubsystemParams<Spawner, Job::RunArgs, Job::Metrics>,
_marker: std::marker::PhantomData<Job>,
}
impl<Job: JobTrait, Spawner> JobSubsystem<Job, Spawner> {
/// Create a new `JobSubsystem`.
pub fn new(spawner: Spawner, run_args: Job::RunArgs, metrics: Job::Metrics) -> Self {
Self {
spawner,
run_args,
metrics,
context: std::marker::PhantomData,
job: std::marker::PhantomData,
errors: None,
JobSubsystem {
params: JobSubsystemParams {
spawner,
run_args,
metrics,
},
_marker: std::marker::PhantomData,
}
}
/// Monitor errors which may occur during handling of a spawned job.
///
/// By default, an error in a job is simply logged. Once this is called,
/// the error is forwarded onto the provided channel.
///
/// Errors if the error channel already exists.
pub fn forward_errors(
&mut self,
tx: mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>,
) -> Result<(), Error> {
if self.errors.is_some() {
return Err(Error::AlreadyForwarding);
}
self.errors = Some(tx);
Ok(())
}
/// Run the subsystem to completion.
pub async fn run<Context>(self, mut ctx: Context)
where
Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
Context: SubsystemContext,
Job: 'static + JobTrait + Send,
Job::RunArgs: Clone + Sync,
Job::ToJob: From<<Context as SubsystemContext>::Message> + Sync,
Job::Metrics: Sync,
{
let JobSubsystem {
params: JobSubsystemParams {
spawner,
run_args,
metrics,
},
..
} = self;
/// Run this subsystem
///
/// Conceptually, this is very simple: it just loops forever.
///
/// - On incoming overseer messages, it starts or stops jobs as appropriate.
/// - On other incoming messages, if they can be converted into Job::ToJob and
/// include a hash, then they're forwarded to the appropriate individual job.
/// - On outgoing messages from the jobs, it forwards them to the overseer.
///
/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
/// Otherwise, most are logged and then discarded.
pub async fn run(
mut ctx: Context,
run_args: Job::RunArgs,
metrics: Job::Metrics,
spawner: Spawner,
mut err_tx: Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
) {
let mut jobs = Jobs::new(spawner.clone());
if let Some(ref err_tx) = err_tx {
jobs.forward_errors(err_tx.clone())
.expect("we never call this twice in this context; qed");
}
let mut jobs = Jobs::new(spawner);
loop {
select! {
incoming = ctx.recv().fuse() =>
if Self::handle_incoming(
incoming,
&mut jobs,
&run_args,
&metrics,
&mut err_tx,
).await {
break
},
incoming = ctx.recv().fuse() => {
match incoming {
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated,
deactivated,
}))) => {
for activated in activated {
let sender: Context::Sender = ctx.sender().clone();
jobs.spawn_job::<Job, _>(
activated.hash,
activated.span,
run_args.clone(),
metrics.clone(),
sender,
)
}
for hash in deactivated {
jobs.stop_job(hash).await;
}
}
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => {
jobs.running.clear();
break;
}
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(..))) => {}
Ok(FromOverseer::Communication { msg }) => {
if let Ok(to_job) = <Job::ToJob>::try_from(msg) {
jobs.send_msg(to_job.relay_parent(), to_job).await;
}
}
Err(err) => {
tracing::error!(
job = Job::NAME,
err = ?err,
"error receiving message from subsystem context for job",
);
break;
}
}
}
outgoing = jobs.next() => {
if let Err(e) = Self::handle_from_job(outgoing, &mut ctx).await {
let res = match outgoing.expect("the Jobs stream never ends; qed") {
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await,
FromJobCommand::SpawnBlocking(name, task)
=> ctx.spawn_blocking(name, task).await,
};
if let Err(e) = res {
tracing::warn!(err = ?e, "failed to handle command from job");
}
}
@@ -741,97 +781,9 @@ where
}
}
}
/// Forward a given error to the higher context using the given error channel.
async fn fwd_err(
hash: Option<Hash>,
err: JobsError<Job::Error>,
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
) {
if let Some(err_tx) = err_tx {
// if we can't send on the error transmission channel, we can't do anything useful about it
// still, we can at least log the failure
if let Err(e) = err_tx.send((hash, err)).await {
tracing::warn!(err = ?e, "failed to forward error");
}
}
}
/// Handle an incoming message.
///
/// Returns `true` when this job manager should shutdown.
async fn handle_incoming(
incoming: SubsystemResult<FromOverseer<Context::Message>>,
jobs: &mut Jobs<Spawner, Job>,
run_args: &Job::RunArgs,
metrics: &Job::Metrics,
err_tx: &mut Option<mpsc::Sender<(Option<Hash>, JobsError<Job::Error>)>>,
) -> bool {
use polkadot_node_subsystem::ActiveLeavesUpdate;
use polkadot_node_subsystem::FromOverseer::{Communication, Signal};
use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};
match incoming {
Ok(Signal(ActiveLeaves(ActiveLeavesUpdate {
activated,
deactivated,
}))) => {
for activated in activated {
let metrics = metrics.clone();
if let Err(e) = jobs.spawn_job(activated.hash, activated.span, run_args.clone(), metrics) {
tracing::error!(
job = Job::NAME,
err = ?e,
"failed to spawn a job",
);
Self::fwd_err(Some(activated.hash), JobsError::Utility(e), err_tx).await;
return true;
}
}
for hash in deactivated {
jobs.stop_job(hash).await;
}
}
Ok(Signal(Conclude)) => {
jobs.running.clear();
return true;
}
Ok(Communication { msg }) => {
if let Ok(to_job) = <Job::ToJob>::try_from(msg) {
jobs.send_msg(to_job.relay_parent(), to_job).await;
}
}
Ok(Signal(BlockFinalized(..))) => {}
Err(err) => {
tracing::error!(
job = Job::NAME,
err = ?err,
"error receiving message from subsystem context for job",
);
Self::fwd_err(None, JobsError::Utility(Error::from(err)), err_tx).await;
return true;
}
}
false
}
// handle a command from a job.
async fn handle_from_job(
outgoing: Option<FromJobCommand>,
ctx: &mut Context,
) -> SubsystemResult<()> {
match outgoing.expect("the Jobs stream never ends; qed") {
FromJobCommand::SendMessage(msg) => ctx.send_message(msg).await,
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await?,
FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task).await?,
}
Ok(())
}
}
impl<Spawner, Context, Job> Subsystem<Context> for JobManager<Spawner, Context, Job>
impl<Context, Job, Spawner> Subsystem<Context> for JobSubsystem<Job, Spawner>
where
Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
Context: SubsystemContext,
@@ -841,13 +793,8 @@ where
Job::Metrics: Sync,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let spawner = self.spawner.clone();
let run_args = self.run_args.clone();
let metrics = self.metrics.clone();
let errors = self.errors;
let future = Box::pin(async move {
Self::run(ctx, run_args, metrics, spawner, errors).await;
self.run(ctx).await;
Ok(())
});
@@ -858,90 +805,6 @@ where
}
}
/// Create a delegated subsystem
///
/// It is possible to create a type which implements `Subsystem` by simply doing:
///
/// ```ignore
/// pub type ExampleSubsystem<Spawner, Context> = JobManager<Spawner, Context, ExampleJob>;
/// ```
///
/// However, doing this requires that job itself and all types which comprise it (i.e. `ToJob`, `FromJob`, `Error`, `RunArgs`)
/// are public, to avoid exposing private types in public interfaces. It's possible to delegate instead, which
/// can reduce the total number of public types exposed, i.e.
///
/// ```ignore
/// type Manager<Spawner, Context> = JobManager<Spawner, Context, ExampleJob>;
/// pub struct ExampleSubsystem {
/// manager: Manager<Spawner, Context>,
/// }
///
/// impl<Spawner, Context> Subsystem<Context> for ExampleSubsystem<Spawner, Context> { ... }
/// ```
///
/// This dramatically reduces the number of public types in the crate; the only things which must be public are now
///
/// - `struct ExampleSubsystem` (defined by this macro)
/// - `type ToJob` (because it appears in a trait bound)
/// - `type RunArgs` (because it appears in a function signature)
///
/// Implementing this all manually is of course possible, but it's tedious; why bother? This macro exists for
/// the purpose of doing it automatically:
///
/// ```ignore
/// delegated_subsystem!(ExampleJob(ExampleRunArgs) <- ExampleToJob as ExampleSubsystem);
/// ```
#[macro_export]
macro_rules! delegated_subsystem {
($job:ident($run_args:ty, $metrics:ty) <- $to_job:ty as $subsystem:ident) => {
delegated_subsystem!($job($run_args, $metrics) <- $to_job as $subsystem; stringify!($subsystem));
};
($job:ident($run_args:ty, $metrics:ty) <- $to_job:ty as $subsystem:ident; $subsystem_name:expr) => {
#[doc = "Manager type for the "]
#[doc = $subsystem_name]
type Manager<Spawner, Context> = $crate::JobManager<Spawner, Context, $job>;
#[doc = "An implementation of the "]
#[doc = $subsystem_name]
pub struct $subsystem<Spawner, Context> {
manager: Manager<Spawner, Context>,
}
impl<Spawner, Context> $subsystem<Spawner, Context>
where
Spawner: Clone + $crate::reexports::SpawnNamed + Send + Unpin,
Context: $crate::reexports::SubsystemContext,
$to_job: From<<Context as $crate::reexports::SubsystemContext>::Message>,
{
#[doc = "Creates a new "]
#[doc = $subsystem_name]
pub fn new(spawner: Spawner, run_args: $run_args, metrics: $metrics) -> Self {
$subsystem {
manager: $crate::JobManager::new(spawner, run_args, metrics)
}
}
/// Run this subsystem
#[tracing::instrument(skip(ctx, run_args, metrics, spawner), fields(subsystem = $subsystem_name))]
pub async fn run(ctx: Context, run_args: $run_args, metrics: $metrics, spawner: Spawner) {
<Manager<Spawner, Context>>::run(ctx, run_args, metrics, spawner, None).await
}
}
impl<Spawner, Context> $crate::reexports::Subsystem<Context> for $subsystem<Spawner, Context>
where
Spawner: $crate::reexports::SpawnNamed + Send + Clone + Unpin + 'static,
Context: $crate::reexports::SubsystemContext,
$to_job: From<<Context as $crate::reexports::SubsystemContext>::Message>,
{
fn start(self, ctx: Context) -> $crate::reexports::SpawnedSubsystem {
self.manager.start(ctx)
}
}
};
}
/// A future that wraps another future with a `Delay` allowing for time-limited futures.
#[pin_project]
pub struct Timeout<F: Future> {
@@ -1088,24 +951,22 @@ mod tests {
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
fn run(
fn run<S: SubsystemSender>(
_: Hash,
_: Arc<jaeger::Span>,
run_args: Self::RunArgs,
_metrics: Self::Metrics,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
mut sender: mpsc::Sender<FromJobCommand>,
mut sender: JobSender<S>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = FakeCandidateSelectionJob { receiver };
if run_args {
sender.send(FromJobCommand::SendMessage(
CandidateSelectionMessage::Invalid(
Default::default(),
Default::default(),
).into(),
)).await?;
sender.send_message(CandidateSelectionMessage::Invalid(
Default::default(),
Default::default(),
).into()).await;
}
// it isn't necessary to break run_loop into its own function,
@@ -1132,15 +993,15 @@ mod tests {
}
// with the job defined, it's straightforward to get a subsystem implementation.
type FakeCandidateSelectionSubsystem<Spawner, Context> =
JobManager<Spawner, Context, FakeCandidateSelectionJob>;
type FakeCandidateSelectionSubsystem<Spawner> =
JobSubsystem<FakeCandidateSelectionJob, Spawner>;
// this type lets us pretend to be the overseer
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CandidateSelectionMessage>;
fn test_harness<T: Future<Output = ()>>(
run_args: bool,
test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T,
test: impl FnOnce(OverseerHandle) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
@@ -1152,10 +1013,13 @@ mod tests {
let pool = sp_core::testing::TaskExecutor::new();
let (context, overseer_handle) = make_subsystem_context(pool.clone());
let (err_tx, err_rx) = mpsc::channel(16);
let subsystem = FakeCandidateSelectionSubsystem::run(context, run_args, (), pool, Some(err_tx));
let test_future = test(overseer_handle, err_rx);
let subsystem = FakeCandidateSelectionSubsystem::new(
pool,
run_args,
(),
).run(context);
let test_future = test(overseer_handle);
futures::pin_mut!(subsystem, test_future);
@@ -1171,7 +1035,7 @@ mod tests {
fn starting_and_stopping_job_works() {
let relay_parent: Hash = [0; 32].into();
test_harness(true, |mut overseer_handle, err_rx| async move {
test_harness(true, |mut overseer_handle| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
@@ -1194,9 +1058,6 @@ mod tests {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0);
});
}
@@ -1204,7 +1065,7 @@ mod tests {
fn sending_to_a_non_running_job_do_not_stop_the_subsystem() {
let relay_parent = Hash::repeat_byte(0x01);
test_harness(true, |mut overseer_handle, err_rx| async move {
test_harness(true, |mut overseer_handle| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(ActivatedLeaf {
@@ -1231,9 +1092,6 @@ mod tests {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::Conclude))
.await;
let errs: Vec<_> = err_rx.collect().await;
assert_eq!(errs.len(), 0);
});
}