BABE slot and epoch event notifications (#6563)

* BabeWorker -> BabeSlotWorker

* SlotWorker::notify_slot: similar to claim_slot, but called no matter authoring

* Wrap the future with a new struct BabeWorker

* Add type definition slot_notification_sinks

* Function slot_notification_streams for the receiver side

* Get a handle of slot_notification_sinks in BabeSlotWorker

* Implement notify_slot

* Switch to use bounded mpsc

* Do not drop the sink when channel is full

Only skip sending the message and emit a warning, because it is recoverable.

* Fix future type bounds

* Add must_use and sink type alias
This commit is contained in:
Wei Tang
2020-07-30 17:07:27 +02:00
committed by GitHub
parent 9ac30f7116
commit b6dedd9016
4 changed files with 92 additions and 10 deletions
+2
View File
@@ -6389,6 +6389,7 @@ dependencies = [
"pdqselect",
"rand 0.7.3",
"rand_chacha 0.2.2",
"retain_mut",
"sc-block-builder",
"sc-client-api",
"sc-consensus-epochs",
@@ -6415,6 +6416,7 @@ dependencies = [
"sp-keyring",
"sp-runtime",
"sp-timestamp",
"sp-utils",
"sp-version",
"substrate-prometheus-endpoint",
"substrate-test-runtime-client",
@@ -37,6 +37,7 @@ sp-consensus-vrf = { version = "0.8.0-rc5", path = "../../../primitives/consensu
sc-consensus-uncles = { version = "0.8.0-rc5", path = "../uncles" }
sc-consensus-slots = { version = "0.8.0-rc5", path = "../slots" }
sp-runtime = { version = "2.0.0-rc5", path = "../../../primitives/runtime" }
sp-utils = { version = "2.0.0-rc5", path = "../../../primitives/utils" }
fork-tree = { version = "2.0.0-rc5", path = "../../../utils/fork-tree" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.8.0-rc5"}
futures = "0.3.4"
@@ -48,6 +49,7 @@ rand = "0.7.2"
merlin = "2.0"
pdqselect = "0.1.0"
derive_more = "0.99.2"
retain_mut = "0.1.1"
[dev-dependencies]
sp-keyring = { version = "2.0.0-rc5", path = "../../../primitives/keyring" }
+77 -10
View File
@@ -106,6 +106,8 @@ use sc_client_api::{
BlockchainEvents, ProvideUncles,
};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use futures::channel::mpsc::{channel, Sender, Receiver};
use retain_mut::RetainMut;
use futures::prelude::*;
use log::{debug, info, log, trace, warn};
@@ -370,7 +372,7 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
babe_link,
can_author_with,
}: BabeParams<B, C, E, I, SO, SC, CAW>) -> Result<
impl futures::Future<Output=()>,
BabeWorker<B>,
sp_consensus::Error,
> where
B: BlockT,
@@ -378,16 +380,18 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
+ HeaderBackend<B> + HeaderMetadata<B, Error = ClientError> + Send + Sync + 'static,
C::Api: BabeApi<B>,
SC: SelectChain<B> + 'static,
E: Environment<B, Error = Error> + Send + Sync,
E: Environment<B, Error = Error> + Send + Sync + 'static,
E::Proposer: Proposer<B, Error = Error, Transaction = sp_api::TransactionFor<C, B>>,
I: BlockImport<B, Error = ConsensusError, Transaction = sp_api::TransactionFor<C, B>> + Send
+ Sync + 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
SO: SyncOracle + Send + Sync + Clone,
CAW: CanAuthorWith<B> + Send,
SO: SyncOracle + Send + Sync + Clone + 'static,
CAW: CanAuthorWith<B> + Send + 'static,
{
let config = babe_link.config;
let worker = BabeWorker {
let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));
let worker = BabeSlotWorker {
client: client.clone(),
block_import: Arc::new(Mutex::new(block_import)),
env,
@@ -395,6 +399,7 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
force_authoring,
keystore,
epoch_changes: babe_link.epoch_changes.clone(),
slot_notification_sinks: slot_notification_sinks.clone(),
config: config.clone(),
};
@@ -406,7 +411,7 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
)?;
info!(target: "babe", "👶 Starting BABE Authorship worker");
Ok(sc_consensus_slots::start_slot_worker(
let inner = sc_consensus_slots::start_slot_worker(
config.0,
select_chain,
worker,
@@ -414,10 +419,49 @@ pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
inherent_data_providers,
babe_link.time_source,
can_author_with,
))
);
Ok(BabeWorker {
inner: Box::pin(inner),
slot_notification_sinks,
})
}
struct BabeWorker<B: BlockT, C, E, I, SO> {
/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn futures::Future<Output=()> + Send + 'static>>,
slot_notification_sinks: Arc<Mutex<Vec<Sender<(u64, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)>>>>,
}
impl<B: BlockT> BabeWorker<B> {
/// Return an event stream of notifications for when new slot happens, and the corresponding
/// epoch descriptor.
pub fn slot_notification_stream(
&self
) -> Receiver<(u64, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
const CHANNEL_BUFFER_SIZE: usize = 1024;
let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
self.slot_notification_sinks.lock().push(sink);
stream
}
}
impl<B: BlockT> futures::Future for BabeWorker<B> {
type Output = ();
fn poll(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context
) -> futures::task::Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}
/// Slot notification sinks.
type SlotNotificationSinks<B> = Arc<Mutex<Vec<Sender<(u64, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>>;
struct BabeSlotWorker<B: BlockT, C, E, I, SO> {
client: Arc<C>,
block_import: Arc<Mutex<I>>,
env: E,
@@ -425,10 +469,11 @@ struct BabeWorker<B: BlockT, C, E, I, SO> {
force_authoring: bool,
keystore: KeyStorePtr,
epoch_changes: SharedEpochChanges<B, Epoch>,
slot_notification_sinks: SlotNotificationSinks<B>,
config: Config,
}
impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWorker<B, C, E, I, SO> where
impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeSlotWorker<B, C, E, I, SO> where
B: BlockT,
C: ProvideRuntimeApi<B> +
ProvideCache<B> +
@@ -502,6 +547,28 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
s
}
fn notify_slot(
&self,
_parent_header: &B::Header,
slot_number: SlotNumber,
epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
) {
self.slot_notification_sinks.lock()
.retain_mut(|sink| {
match sink.try_send((slot_number, epoch_descriptor.clone())) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
warn!(target: "babe", "Trying to notify a slot but the channel is full");
true
} else {
false
}
},
}
});
}
fn pre_digest_data(
&self,
_slot_number: u64,
@@ -599,7 +666,7 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
}
}
impl<B, C, E, I, Error, SO> SlotWorker<B> for BabeWorker<B, C, E, I, SO> where
impl<B, C, E, I, Error, SO> SlotWorker<B> for BabeSlotWorker<B, C, E, I, SO> where
B: BlockT,
C: ProvideRuntimeApi<B> +
ProvideCache<B> +
@@ -104,6 +104,15 @@ pub trait SimpleSlotWorker<B: BlockT> {
epoch_data: &Self::EpochData,
) -> Option<Self::Claim>;
/// Notifies the given slot. Similar to `claim_slot`, but will be called no matter whether we
/// need to author blocks or not.
fn notify_slot(
&self,
_header: &B::Header,
_slot_number: u64,
_epoch_data: &Self::EpochData,
) { }
/// Return the pre digest data to include in a block authored with the given claim.
fn pre_digest_data(
&self,
@@ -191,6 +200,8 @@ pub trait SimpleSlotWorker<B: BlockT> {
}
};
self.notify_slot(&chain_head, slot_number, &epoch_data);
let authorities_len = self.authorities_len(&epoch_data);
if !self.force_authoring() &&