From 0c72a8767bb621b1d9189f67f8726db2e5a7987c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 4 Mar 2021 13:01:18 -0600 Subject: [PATCH] babe: introduce a request-answering mechanic (#7833) * babe: introduce a request-answering mechanic * gromble * send method --- substrate/client/consensus/babe/src/lib.rs | 90 +++++++++++++++++++++- 1 file changed, 87 insertions(+), 3 deletions(-) diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index a8e533d2a8..5622df48db 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -102,6 +102,7 @@ use sc_client_api::{ }; use sp_block_builder::BlockBuilder as BlockBuilderApi; use futures::channel::mpsc::{channel, Sender, Receiver}; +use futures::channel::oneshot; use retain_mut::RetainMut; use futures::prelude::*; @@ -426,6 +427,8 @@ pub fn start_babe(BabeParams { CAW: CanAuthorWith + Send + 'static, BS: BackoffAuthoringBlocksStrategy> + Send + 'static, { + const HANDLE_BUFFER_SIZE: usize = 1024; + let config = babe_link.config; let slot_notification_sinks = Arc::new(Mutex::new(Vec::new())); @@ -444,14 +447,14 @@ pub fn start_babe(BabeParams { register_babe_inherent_data_provider(&inherent_data_providers, config.slot_duration())?; sc_consensus_uncles::register_uncles_inherent_data_provider( - client, + client.clone(), select_chain.clone(), &inherent_data_providers, )?; info!(target: "babe", "👶 Starting BABE Authorship worker"); let inner = sc_consensus_slots::start_slot_worker( - config.0, + config.0.clone(), select_chain, worker, sync_oracle, @@ -459,17 +462,93 @@ pub fn start_babe(BabeParams { babe_link.time_source, can_author_with, ); + + let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE); + + let answer_requests = answer_requests(worker_rx, config.0, client, babe_link.epoch_changes.clone()); Ok(BabeWorker { - inner: Box::pin(inner), + inner: Box::pin(future::join(inner, answer_requests).map(|_| ())), slot_notification_sinks, + handle: BabeWorkerHandle(worker_tx), }) } +async fn answer_requests( + mut request_rx: Receiver>, + genesis_config: sc_consensus_slots::SlotDuration, + client: Arc, + epoch_changes: SharedEpochChanges, +) + where C: ProvideRuntimeApi + ProvideCache + ProvideUncles + BlockchainEvents + + HeaderBackend + HeaderMetadata + Send + Sync + 'static, +{ + while let Some(request) = request_rx.next().await { + match request { + BabeRequest::EpochForChild(parent_hash, parent_number, slot_number, response) => { + let lookup = || { + let epoch_changes = epoch_changes.lock(); + let epoch_descriptor = epoch_changes.epoch_descriptor_for_child_of( + descendent_query(&*client), + &parent_hash, + parent_number, + slot_number, + ) + .map_err(|e| Error::::ForkTree(Box::new(e)))? + .ok_or_else(|| Error::::FetchEpoch(parent_hash))?; + + let viable_epoch = epoch_changes.viable_epoch( + &epoch_descriptor, + |slot| Epoch::genesis(&genesis_config, slot) + ).ok_or_else(|| Error::::FetchEpoch(parent_hash))?; + + Ok(sp_consensus_babe::Epoch { + epoch_index: viable_epoch.as_ref().epoch_index, + start_slot: viable_epoch.as_ref().start_slot, + duration: viable_epoch.as_ref().duration, + authorities: viable_epoch.as_ref().authorities.clone(), + randomness: viable_epoch.as_ref().randomness, + }) + }; + + let _ = response.send(lookup()); + } + } + } +} + +/// Requests to the BABE service. +#[non_exhaustive] +pub enum BabeRequest { + /// Request the epoch that a child of the given block, with the given slot number would have. + /// + /// The parent block is identified by its hash and number. + EpochForChild( + B::Hash, + NumberFor, + Slot, + oneshot::Sender>>, + ), +} + +/// A handle to the BABE worker for issuing requests. +#[derive(Clone)] +pub struct BabeWorkerHandle(Sender>); + +impl BabeWorkerHandle { + /// Send a request to the BABE service. + pub async fn send(&mut self, request: BabeRequest) { + // Failure to send means that the service is down. + // This will manifest as the receiver of the request being dropped. + let _ = self.0.send(request).await; + } +} + /// Worker for Babe which implements `Future`. This must be polled. #[must_use] pub struct BabeWorker { inner: Pin + Send + 'static>>, slot_notification_sinks: SlotNotificationSinks, + handle: BabeWorkerHandle, } impl BabeWorker { @@ -484,6 +563,11 @@ impl BabeWorker { self.slot_notification_sinks.lock().push(sink); stream } + + /// Get a handle to the worker. + pub fn handle(&self) -> BabeWorkerHandle { + self.handle.clone() + } } impl futures::Future for BabeWorker {