small improvements for parachains consensus (#2040)

* introduce a waiting period before selecting candidates and bitfields

* add network_bridge=debug tracing for rep

* change to 2.5s timeout in proposer

* pass timeout to proposer

* move timeout back to provisioner

* grumbles

* Update node/core/provisioner/src/lib.rs

* Fix nitpicks

* Fix bug

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Robert Habermeier
2020-11-30 16:52:30 -05:00
committed by GitHub
parent 0f4395fc44
commit 414acdfc54
6 changed files with 140 additions and 68 deletions
+127 -61
View File
@@ -38,9 +38,45 @@ use polkadot_primitives::v1::{
};
use std::{pin::Pin, collections::BTreeMap};
use thiserror::Error;
use futures_timer::Delay;
/// How long to wait before proposing.
const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(2000);
const LOG_TARGET: &str = "provisioner";
enum InherentAfter {
Ready,
Wait(Delay),
}
impl InherentAfter {
fn new_from_now() -> Self {
InherentAfter::Wait(Delay::new(PRE_PROPOSE_TIMEOUT))
}
fn is_ready(&self) -> bool {
match *self {
InherentAfter::Ready => true,
InherentAfter::Wait(_) => false,
}
}
async fn ready(&mut self) {
match *self {
InherentAfter::Ready => {
// Make sure we never end the returned future.
// This is required because the `select!` that calls this future will end in a busy loop.
futures::pending!()
},
InherentAfter::Wait(ref mut d) => {
d.await;
*self = InherentAfter::Ready;
},
}
}
}
struct ProvisioningJob {
relay_parent: Hash,
sender: mpsc::Sender<FromJobCommand>,
@@ -49,6 +85,8 @@ struct ProvisioningJob {
backed_candidates: Vec<BackedCandidate>,
signed_bitfields: Vec<SignedAvailabilityBitfield>,
metrics: Metrics,
inherent_after: InherentAfter,
awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>
}
#[derive(Debug, Error)]
@@ -92,7 +130,12 @@ impl JobTrait for ProvisioningJob {
sender: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = ProvisioningJob::new(relay_parent, metrics, sender, receiver);
let job = ProvisioningJob::new(
relay_parent,
metrics,
sender,
receiver,
);
// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
@@ -117,6 +160,8 @@ impl ProvisioningJob {
backed_candidates: Vec::new(),
signed_bitfields: Vec::new(),
metrics,
inherent_after: InherentAfter::new_from_now(),
awaiting_inherent: Vec::new(),
}
}
@@ -126,70 +171,89 @@ impl ProvisioningJob {
};
loop {
match self.receiver.next().await {
Some(RequestInherentData(_, return_sender)) => {
let _timer = self.metrics.time_request_inherent_data();
futures::select! {
msg = self.receiver.next().fuse() => match msg {
Some(RequestInherentData(_, return_sender)) => {
let _timer = self.metrics.time_request_inherent_data();
if let Err(err) = send_inherent_data(
self.relay_parent,
&self.signed_bitfields,
&self.backed_candidates,
return_sender,
self.sender.clone(),
)
.await
{
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
self.metrics.on_inherent_data_request(Err(()));
} else {
self.metrics.on_inherent_data_request(Ok(()));
}
}
Some(RequestBlockAuthorshipData(_, sender)) => {
self.provisionable_data_channels.push(sender)
}
Some(ProvisionableData(_, data)) => {
let _timer = self.metrics.time_provisionable_data();
let mut bad_indices = Vec::new();
for (idx, channel) in self.provisionable_data_channels.iter_mut().enumerate() {
match channel.send(data.clone()).await {
Ok(_) => {}
Err(_) => bad_indices.push(idx),
if self.inherent_after.is_ready() {
self.send_inherent_data(vec![return_sender]).await;
} else {
self.awaiting_inherent.push(return_sender);
}
}
self.note_provisionable_data(data);
Some(RequestBlockAuthorshipData(_, sender)) => {
self.provisionable_data_channels.push(sender)
}
Some(ProvisionableData(_, data)) => {
let _timer = self.metrics.time_provisionable_data();
// clean up our list of channels by removing the bad indices
// start by reversing it for efficient pop
bad_indices.reverse();
// Vec::retain would be nicer here, but it doesn't provide
// an easy API for retaining by index, so we re-collect instead.
self.provisionable_data_channels = self
.provisionable_data_channels
.into_iter()
.enumerate()
.filter(|(idx, _)| {
if bad_indices.is_empty() {
return true;
let mut bad_indices = Vec::new();
for (idx, channel) in self.provisionable_data_channels.iter_mut().enumerate() {
match channel.send(data.clone()).await {
Ok(_) => {}
Err(_) => bad_indices.push(idx),
}
let tail = bad_indices[bad_indices.len() - 1];
let retain = *idx != tail;
if *idx >= tail {
let _ = bad_indices.pop();
}
retain
})
.map(|(_, item)| item)
.collect();
}
self.note_provisionable_data(data);
// clean up our list of channels by removing the bad indices
// start by reversing it for efficient pop
bad_indices.reverse();
// Vec::retain would be nicer here, but it doesn't provide
// an easy API for retaining by index, so we re-collect instead.
self.provisionable_data_channels = self
.provisionable_data_channels
.into_iter()
.enumerate()
.filter(|(idx, _)| {
if bad_indices.is_empty() {
return true;
}
let tail = bad_indices[bad_indices.len() - 1];
let retain = *idx != tail;
if *idx >= tail {
let _ = bad_indices.pop();
}
retain
})
.map(|(_, item)| item)
.collect();
}
None => break,
},
_ = self.inherent_after.ready().fuse() => {
let return_senders = std::mem::take(&mut self.awaiting_inherent);
if !return_senders.is_empty() {
self.send_inherent_data(return_senders).await;
}
}
None => break,
}
}
Ok(())
}
async fn send_inherent_data(
&mut self,
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
) {
if let Err(err) = send_inherent_data(
self.relay_parent,
&self.signed_bitfields,
&self.backed_candidates,
return_senders,
&mut self.sender,
)
.await
{
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
self.metrics.on_inherent_data_request(Err(()));
} else {
self.metrics.on_inherent_data_request(Ok(()));
}
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn note_provisionable_data(&mut self, provisionable_data: ProvisionableData) {
match provisionable_data {
@@ -223,15 +287,15 @@ type CoreAvailability = BitVec<bitvec::order::Lsb0, u8>;
/// When we're choosing bitfields to include, the rule should be simple:
/// maximize availability. So basically, include all bitfields. And then
/// choose a coherent set of candidates along with that.
#[tracing::instrument(level = "trace", skip(return_sender, from_job), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(return_senders, from_job), fields(subsystem = LOG_TARGET))]
async fn send_inherent_data(
relay_parent: Hash,
bitfields: &[SignedAvailabilityBitfield],
candidates: &[BackedCandidate],
return_sender: oneshot::Sender<ProvisionerInherentData>,
mut from_job: mpsc::Sender<FromJobCommand>,
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
from_job: &mut mpsc::Sender<FromJobCommand>,
) -> Result<(), Error> {
let availability_cores = request_availability_cores(relay_parent, &mut from_job)
let availability_cores = request_availability_cores(relay_parent, from_job)
.await?
.await??;
@@ -241,13 +305,15 @@ async fn send_inherent_data(
&bitfields,
candidates,
relay_parent,
&mut from_job,
from_job,
)
.await?;
return_sender
.send((bitfields, candidates))
.map_err(|_data| Error::InherentDataReturnChannel)?;
let res = (bitfields, candidates);
for return_sender in return_senders {
return_sender.send(res.clone()).map_err(|_data| Error::InherentDataReturnChannel)?;
}
Ok(())
}