diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 710ce8d7b3..525ea78d9d 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -6389,6 +6389,7 @@ dependencies = [ "pdqselect", "rand 0.7.3", "rand_chacha 0.2.2", + "retain_mut", "sc-block-builder", "sc-client-api", "sc-consensus-epochs", @@ -6415,6 +6416,7 @@ dependencies = [ "sp-keyring", "sp-runtime", "sp-timestamp", + "sp-utils", "sp-version", "substrate-prometheus-endpoint", "substrate-test-runtime-client", diff --git a/substrate/client/consensus/babe/Cargo.toml b/substrate/client/consensus/babe/Cargo.toml index 6ac8ca165e..1b6b705139 100644 --- a/substrate/client/consensus/babe/Cargo.toml +++ b/substrate/client/consensus/babe/Cargo.toml @@ -37,6 +37,7 @@ sp-consensus-vrf = { version = "0.8.0-rc5", path = "../../../primitives/consensu sc-consensus-uncles = { version = "0.8.0-rc5", path = "../uncles" } sc-consensus-slots = { version = "0.8.0-rc5", path = "../slots" } sp-runtime = { version = "2.0.0-rc5", path = "../../../primitives/runtime" } +sp-utils = { version = "2.0.0-rc5", path = "../../../primitives/utils" } fork-tree = { version = "2.0.0-rc5", path = "../../../utils/fork-tree" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc5"} futures = "0.3.4" @@ -48,6 +49,7 @@ rand = "0.7.2" merlin = "2.0" pdqselect = "0.1.0" derive_more = "0.99.2" +retain_mut = "0.1.1" [dev-dependencies] sp-keyring = { version = "2.0.0-rc5", path = "../../../primitives/keyring" } diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index e6f880fe34..951d1467b4 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -106,6 +106,8 @@ use sc_client_api::{ BlockchainEvents, ProvideUncles, }; use sp_block_builder::BlockBuilder as BlockBuilderApi; +use futures::channel::mpsc::{channel, Sender, Receiver}; +use retain_mut::RetainMut; use futures::prelude::*; use log::{debug, info, log, trace, warn}; @@ -370,7 +372,7 @@ pub fn start_babe(BabeParams { babe_link, can_author_with, }: BabeParams) -> Result< - impl futures::Future, + BabeWorker, sp_consensus::Error, > where B: BlockT, @@ -378,16 +380,18 @@ pub fn start_babe(BabeParams { + HeaderBackend + HeaderMetadata + Send + Sync + 'static, C::Api: BabeApi, SC: SelectChain + 'static, - E: Environment + Send + Sync, + E: Environment + Send + Sync + 'static, E::Proposer: Proposer>, I: BlockImport> + Send + Sync + 'static, Error: std::error::Error + Send + From + From + 'static, - SO: SyncOracle + Send + Sync + Clone, - CAW: CanAuthorWith + Send, + SO: SyncOracle + Send + Sync + Clone + 'static, + CAW: CanAuthorWith + Send + 'static, { let config = babe_link.config; - let worker = BabeWorker { + let slot_notification_sinks = Arc::new(Mutex::new(Vec::new())); + + let worker = BabeSlotWorker { client: client.clone(), block_import: Arc::new(Mutex::new(block_import)), env, @@ -395,6 +399,7 @@ pub fn start_babe(BabeParams { force_authoring, keystore, epoch_changes: babe_link.epoch_changes.clone(), + slot_notification_sinks: slot_notification_sinks.clone(), config: config.clone(), }; @@ -406,7 +411,7 @@ pub fn start_babe(BabeParams { )?; info!(target: "babe", "👶 Starting BABE Authorship worker"); - Ok(sc_consensus_slots::start_slot_worker( + let inner = sc_consensus_slots::start_slot_worker( config.0, select_chain, worker, @@ -414,10 +419,49 @@ pub fn start_babe(BabeParams { inherent_data_providers, babe_link.time_source, can_author_with, - )) + ); + Ok(BabeWorker { + inner: Box::pin(inner), + slot_notification_sinks, + }) } -struct BabeWorker { +/// Worker for Babe which implements `Future`. This must be polled. +#[must_use] +pub struct BabeWorker { + inner: Pin + Send + 'static>>, + slot_notification_sinks: Arc, Epoch>)>>>>, +} + +impl BabeWorker { + /// Return an event stream of notifications for when new slot happens, and the corresponding + /// epoch descriptor. + pub fn slot_notification_stream( + &self + ) -> Receiver<(u64, ViableEpochDescriptor, Epoch>)> { + const CHANNEL_BUFFER_SIZE: usize = 1024; + + let (sink, stream) = channel(CHANNEL_BUFFER_SIZE); + self.slot_notification_sinks.lock().push(sink); + stream + } +} + +impl futures::Future for BabeWorker { + type Output = (); + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut futures::task::Context + ) -> futures::task::Poll { + self.inner.as_mut().poll(cx) + } +} + +/// Slot notification sinks. +type SlotNotificationSinks = Arc::Hash, NumberFor, Epoch>)>>>>; + +struct BabeSlotWorker { client: Arc, block_import: Arc>, env: E, @@ -425,10 +469,11 @@ struct BabeWorker { force_authoring: bool, keystore: KeyStorePtr, epoch_changes: SharedEpochChanges, + slot_notification_sinks: SlotNotificationSinks, config: Config, } -impl sc_consensus_slots::SimpleSlotWorker for BabeWorker where +impl sc_consensus_slots::SimpleSlotWorker for BabeSlotWorker where B: BlockT, C: ProvideRuntimeApi + ProvideCache + @@ -502,6 +547,28 @@ impl sc_consensus_slots::SimpleSlotWorker for BabeWork s } + fn notify_slot( + &self, + _parent_header: &B::Header, + slot_number: SlotNumber, + epoch_descriptor: &ViableEpochDescriptor, Epoch>, + ) { + self.slot_notification_sinks.lock() + .retain_mut(|sink| { + match sink.try_send((slot_number, epoch_descriptor.clone())) { + Ok(()) => true, + Err(e) => { + if e.is_full() { + warn!(target: "babe", "Trying to notify a slot but the channel is full"); + true + } else { + false + } + }, + } + }); + } + fn pre_digest_data( &self, _slot_number: u64, @@ -599,7 +666,7 @@ impl sc_consensus_slots::SimpleSlotWorker for BabeWork } } -impl SlotWorker for BabeWorker where +impl SlotWorker for BabeSlotWorker where B: BlockT, C: ProvideRuntimeApi + ProvideCache + diff --git a/substrate/client/consensus/slots/src/lib.rs b/substrate/client/consensus/slots/src/lib.rs index 7687d3114b..7d346ffe39 100644 --- a/substrate/client/consensus/slots/src/lib.rs +++ b/substrate/client/consensus/slots/src/lib.rs @@ -104,6 +104,15 @@ pub trait SimpleSlotWorker { epoch_data: &Self::EpochData, ) -> Option; + /// Notifies the given slot. Similar to `claim_slot`, but will be called no matter whether we + /// need to author blocks or not. + fn notify_slot( + &self, + _header: &B::Header, + _slot_number: u64, + _epoch_data: &Self::EpochData, + ) { } + /// Return the pre digest data to include in a block authored with the given claim. fn pre_digest_data( &self, @@ -191,6 +200,8 @@ pub trait SimpleSlotWorker { } }; + self.notify_slot(&chain_head, slot_number, &epoch_data); + let authorities_len = self.authorities_len(&epoch_data); if !self.force_authoring() &&