diff --git a/polkadot/node/core/proposer/src/lib.rs b/polkadot/node/core/proposer/src/lib.rs index 48971ad074..e5b4c61341 100644 --- a/polkadot/node/core/proposer/src/lib.rs +++ b/polkadot/node/core/proposer/src/lib.rs @@ -37,7 +37,7 @@ use prometheus_endpoint::Registry as PrometheusRegistry; use std::{fmt, pin::Pin, sync::Arc, time}; /// How long proposal can take before we give up and err out -const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_secs(2); +const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(2500); /// Custom Proposer factory for Polkadot pub struct ProposerFactory { diff --git a/polkadot/node/core/provisioner/Cargo.toml b/polkadot/node/core/provisioner/Cargo.toml index c5fccfd43b..57034696fb 100644 --- a/polkadot/node/core/provisioner/Cargo.toml +++ b/polkadot/node/core/provisioner/Cargo.toml @@ -13,8 +13,8 @@ thiserror = "1.0.22" polkadot-primitives = { path = "../../../primitives" } polkadot-node-subsystem = { path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } +futures-timer = "3.0.2" [dev-dependencies] sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } -futures-timer = "3.0.2" diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index c013e11b8f..560764453a 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -38,9 +38,45 @@ use polkadot_primitives::v1::{ }; use std::{pin::Pin, collections::BTreeMap}; use thiserror::Error; +use futures_timer::Delay; + +/// How long to wait before proposing. +const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(2000); const LOG_TARGET: &str = "provisioner"; +enum InherentAfter { + Ready, + Wait(Delay), +} + +impl InherentAfter { + fn new_from_now() -> Self { + InherentAfter::Wait(Delay::new(PRE_PROPOSE_TIMEOUT)) + } + + fn is_ready(&self) -> bool { + match *self { + InherentAfter::Ready => true, + InherentAfter::Wait(_) => false, + } + } + + async fn ready(&mut self) { + match *self { + InherentAfter::Ready => { + // Make sure we never end the returned future. + // This is required because the `select!` that calls this future will end in a busy loop. + futures::pending!() + }, + InherentAfter::Wait(ref mut d) => { + d.await; + *self = InherentAfter::Ready; + }, + } + } +} + struct ProvisioningJob { relay_parent: Hash, sender: mpsc::Sender, @@ -49,6 +85,8 @@ struct ProvisioningJob { backed_candidates: Vec, signed_bitfields: Vec, metrics: Metrics, + inherent_after: InherentAfter, + awaiting_inherent: Vec> } #[derive(Debug, Error)] @@ -92,7 +130,12 @@ impl JobTrait for ProvisioningJob { sender: mpsc::Sender, ) -> Pin> + Send>> { async move { - let job = ProvisioningJob::new(relay_parent, metrics, sender, receiver); + let job = ProvisioningJob::new( + relay_parent, + metrics, + sender, + receiver, + ); // it isn't necessary to break run_loop into its own function, // but it's convenient to separate the concerns in this way @@ -117,6 +160,8 @@ impl ProvisioningJob { backed_candidates: Vec::new(), signed_bitfields: Vec::new(), metrics, + inherent_after: InherentAfter::new_from_now(), + awaiting_inherent: Vec::new(), } } @@ -126,70 +171,89 @@ impl ProvisioningJob { }; loop { - match self.receiver.next().await { - Some(RequestInherentData(_, return_sender)) => { - let _timer = self.metrics.time_request_inherent_data(); + futures::select! { + msg = self.receiver.next().fuse() => match msg { + Some(RequestInherentData(_, return_sender)) => { + let _timer = self.metrics.time_request_inherent_data(); - if let Err(err) = send_inherent_data( - self.relay_parent, - &self.signed_bitfields, - &self.backed_candidates, - return_sender, - self.sender.clone(), - ) - .await - { - tracing::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data"); - self.metrics.on_inherent_data_request(Err(())); - } else { - self.metrics.on_inherent_data_request(Ok(())); - } - } - Some(RequestBlockAuthorshipData(_, sender)) => { - self.provisionable_data_channels.push(sender) - } - Some(ProvisionableData(_, data)) => { - let _timer = self.metrics.time_provisionable_data(); - - let mut bad_indices = Vec::new(); - for (idx, channel) in self.provisionable_data_channels.iter_mut().enumerate() { - match channel.send(data.clone()).await { - Ok(_) => {} - Err(_) => bad_indices.push(idx), + if self.inherent_after.is_ready() { + self.send_inherent_data(vec![return_sender]).await; + } else { + self.awaiting_inherent.push(return_sender); } } - self.note_provisionable_data(data); + Some(RequestBlockAuthorshipData(_, sender)) => { + self.provisionable_data_channels.push(sender) + } + Some(ProvisionableData(_, data)) => { + let _timer = self.metrics.time_provisionable_data(); - // clean up our list of channels by removing the bad indices - // start by reversing it for efficient pop - bad_indices.reverse(); - // Vec::retain would be nicer here, but it doesn't provide - // an easy API for retaining by index, so we re-collect instead. - self.provisionable_data_channels = self - .provisionable_data_channels - .into_iter() - .enumerate() - .filter(|(idx, _)| { - if bad_indices.is_empty() { - return true; + let mut bad_indices = Vec::new(); + for (idx, channel) in self.provisionable_data_channels.iter_mut().enumerate() { + match channel.send(data.clone()).await { + Ok(_) => {} + Err(_) => bad_indices.push(idx), } - let tail = bad_indices[bad_indices.len() - 1]; - let retain = *idx != tail; - if *idx >= tail { - let _ = bad_indices.pop(); - } - retain - }) - .map(|(_, item)| item) - .collect(); + } + self.note_provisionable_data(data); + + // clean up our list of channels by removing the bad indices + // start by reversing it for efficient pop + bad_indices.reverse(); + // Vec::retain would be nicer here, but it doesn't provide + // an easy API for retaining by index, so we re-collect instead. + self.provisionable_data_channels = self + .provisionable_data_channels + .into_iter() + .enumerate() + .filter(|(idx, _)| { + if bad_indices.is_empty() { + return true; + } + let tail = bad_indices[bad_indices.len() - 1]; + let retain = *idx != tail; + if *idx >= tail { + let _ = bad_indices.pop(); + } + retain + }) + .map(|(_, item)| item) + .collect(); + } + None => break, + }, + _ = self.inherent_after.ready().fuse() => { + let return_senders = std::mem::take(&mut self.awaiting_inherent); + if !return_senders.is_empty() { + self.send_inherent_data(return_senders).await; + } } - None => break, } } Ok(()) } + async fn send_inherent_data( + &mut self, + return_senders: Vec>, + ) { + if let Err(err) = send_inherent_data( + self.relay_parent, + &self.signed_bitfields, + &self.backed_candidates, + return_senders, + &mut self.sender, + ) + .await + { + tracing::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data"); + self.metrics.on_inherent_data_request(Err(())); + } else { + self.metrics.on_inherent_data_request(Ok(())); + } + } + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] fn note_provisionable_data(&mut self, provisionable_data: ProvisionableData) { match provisionable_data { @@ -223,15 +287,15 @@ type CoreAvailability = BitVec; /// When we're choosing bitfields to include, the rule should be simple: /// maximize availability. So basically, include all bitfields. And then /// choose a coherent set of candidates along with that. -#[tracing::instrument(level = "trace", skip(return_sender, from_job), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(return_senders, from_job), fields(subsystem = LOG_TARGET))] async fn send_inherent_data( relay_parent: Hash, bitfields: &[SignedAvailabilityBitfield], candidates: &[BackedCandidate], - return_sender: oneshot::Sender, - mut from_job: mpsc::Sender, + return_senders: Vec>, + from_job: &mut mpsc::Sender, ) -> Result<(), Error> { - let availability_cores = request_availability_cores(relay_parent, &mut from_job) + let availability_cores = request_availability_cores(relay_parent, from_job) .await? .await??; @@ -241,13 +305,15 @@ async fn send_inherent_data( &bitfields, candidates, relay_parent, - &mut from_job, + from_job, ) .await?; - return_sender - .send((bitfields, candidates)) - .map_err(|_data| Error::InherentDataReturnChannel)?; + let res = (bitfields, candidates); + for return_sender in return_senders { + return_sender.send(res.clone()).map_err(|_data| Error::InherentDataReturnChannel)?; + } + Ok(()) } diff --git a/polkadot/node/core/runtime-api/src/lib.rs b/polkadot/node/core/runtime-api/src/lib.rs index 38b78648f1..16fd080ff2 100644 --- a/polkadot/node/core/runtime-api/src/lib.rs +++ b/polkadot/node/core/runtime-api/src/lib.rs @@ -243,7 +243,7 @@ mod tests { sp_api::mock_impl_runtime_apis! { impl ParachainHost for MockRuntimeApi { - type Error = String; + type Error = sp_api::ApiError; fn validators(&self) -> Vec { self.validators.clone() diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 7e05595d0b..0253d3c137 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -136,6 +136,7 @@ impl Network for Arc> { sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed() } + #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] fn action_sink<'a>(&'a mut self) -> Pin + Send + 'a>> { @@ -153,10 +154,13 @@ impl Network for Arc> { fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> { match action { - NetworkAction::ReputationChange(peer, cost_benefit) => self.0.report_peer( - peer, - cost_benefit, - ), + NetworkAction::ReputationChange(peer, cost_benefit) => { + tracing::debug!("reputation: {:?} for {}", cost_benefit, peer); + self.0.report_peer( + peer, + cost_benefit, + ) + } NetworkAction::WriteNotification(peer, peer_set, message) => { match peer_set { PeerSet::Validation => self.0.write_notification( diff --git a/polkadot/node/test/service/tests/build-blocks.rs b/polkadot/node/test/service/tests/build-blocks.rs index fa03e04228..777f266a03 100644 --- a/polkadot/node/test/service/tests/build-blocks.rs +++ b/polkadot/node/test/service/tests/build-blocks.rs @@ -21,6 +21,8 @@ use sp_keyring::Sr25519Keyring; #[substrate_test_utils::test] async fn ensure_test_service_build_blocks(task_executor: TaskExecutor) { + sc_cli::init_logger("", Default::default(), None).expect("Sets up logger"); + let mut alice = run_validator_node( task_executor.clone(), Sr25519Keyring::Alice,