babe: introduce a request-answering mechanic (#7833)

* babe: introduce a request-answering mechanic

* gromble

* send method
This commit is contained in:
Robert Habermeier
2021-03-04 13:01:18 -06:00
committed by GitHub
parent ddbdfc9e57
commit 0c72a8767b
+87 -3
View File
@@ -102,6 +102,7 @@ use sc_client_api::{
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use futures::channel::mpsc::{channel, Sender, Receiver};
use futures::channel::oneshot;
use retain_mut::RetainMut;
use futures::prelude::*;
@@ -426,6 +427,8 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {
CAW: CanAuthorWith<B> + Send + 'static,
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + 'static,
{
const HANDLE_BUFFER_SIZE: usize = 1024;
let config = babe_link.config;
let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));
@@ -444,14 +447,14 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {
register_babe_inherent_data_provider(&inherent_data_providers, config.slot_duration())?;
sc_consensus_uncles::register_uncles_inherent_data_provider(
client,
client.clone(),
select_chain.clone(),
&inherent_data_providers,
)?;
info!(target: "babe", "👶 Starting BABE Authorship worker");
let inner = sc_consensus_slots::start_slot_worker(
config.0,
config.0.clone(),
select_chain,
worker,
sync_oracle,
@@ -459,17 +462,93 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, BS, Error>(BabeParams {
babe_link.time_source,
can_author_with,
);
let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
let answer_requests = answer_requests(worker_rx, config.0, client, babe_link.epoch_changes.clone());
Ok(BabeWorker {
inner: Box::pin(inner),
inner: Box::pin(future::join(inner, answer_requests).map(|_| ())),
slot_notification_sinks,
handle: BabeWorkerHandle(worker_tx),
})
}
async fn answer_requests<B: BlockT, C>(
mut request_rx: Receiver<BabeRequest<B>>,
genesis_config: sc_consensus_slots::SlotDuration<BabeGenesisConfiguration>,
client: Arc<C>,
epoch_changes: SharedEpochChanges<B, Epoch>,
)
where C: ProvideRuntimeApi<B> + ProvideCache<B> + ProvideUncles<B> + BlockchainEvents<B>
+ HeaderBackend<B> + HeaderMetadata<B, Error = ClientError> + Send + Sync + 'static,
{
while let Some(request) = request_rx.next().await {
match request {
BabeRequest::EpochForChild(parent_hash, parent_number, slot_number, response) => {
let lookup = || {
let epoch_changes = epoch_changes.lock();
let epoch_descriptor = epoch_changes.epoch_descriptor_for_child_of(
descendent_query(&*client),
&parent_hash,
parent_number,
slot_number,
)
.map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
.ok_or_else(|| Error::<B>::FetchEpoch(parent_hash))?;
let viable_epoch = epoch_changes.viable_epoch(
&epoch_descriptor,
|slot| Epoch::genesis(&genesis_config, slot)
).ok_or_else(|| Error::<B>::FetchEpoch(parent_hash))?;
Ok(sp_consensus_babe::Epoch {
epoch_index: viable_epoch.as_ref().epoch_index,
start_slot: viable_epoch.as_ref().start_slot,
duration: viable_epoch.as_ref().duration,
authorities: viable_epoch.as_ref().authorities.clone(),
randomness: viable_epoch.as_ref().randomness,
})
};
let _ = response.send(lookup());
}
}
}
}
/// Requests to the BABE service.
#[non_exhaustive]
pub enum BabeRequest<B: BlockT> {
/// Request the epoch that a child of the given block, with the given slot number would have.
///
/// The parent block is identified by its hash and number.
EpochForChild(
B::Hash,
NumberFor<B>,
Slot,
oneshot::Sender<Result<sp_consensus_babe::Epoch, Error<B>>>,
),
}
/// A handle to the BABE worker for issuing requests.
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);
impl<B: BlockT> BabeWorkerHandle<B> {
/// Send a request to the BABE service.
pub async fn send(&mut self, request: BabeRequest<B>) {
// Failure to send means that the service is down.
// This will manifest as the receiver of the request being dropped.
let _ = self.0.send(request).await;
}
}
/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output=()> + Send + 'static>>,
slot_notification_sinks: SlotNotificationSinks<B>,
handle: BabeWorkerHandle<B>,
}
impl<B: BlockT> BabeWorker<B> {
@@ -484,6 +563,11 @@ impl<B: BlockT> BabeWorker<B> {
self.slot_notification_sinks.lock().push(sink);
stream
}
/// Get a handle to the worker.
pub fn handle(&self) -> BabeWorkerHandle<B> {
self.handle.clone()
}
}
impl<B: BlockT> futures::Future for BabeWorker<B> {