babe: replace usage of SharedEpochChanges with internal RPC (#13883)

* babe: replace usage of SharedEpochChanges with internal RPC

* babe-rpc: fix tests

* babe: use SinkExt::send instead of Sender::try_send

SinkExt::send provides backpressure in case the channel is full

* Update client/consensus/babe/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* babe: fix spawn

* babe: send handles backpressure

* babe: use testing::TaskExecutor

* babe-rpc: better error handling

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
André Silva
2023-04-18 10:38:04 +01:00
committed by GitHub
parent 818976d98e
commit e8e22b83b8
7 changed files with 172 additions and 176 deletions
+67 -70
View File
@@ -18,28 +18,29 @@
//! RPC api for babe.
use std::{collections::HashMap, sync::Arc};
use futures::TryFutureExt;
use jsonrpsee::{
core::{async_trait, Error as JsonRpseeError, RpcResult},
proc_macros::rpc,
types::{error::CallError, ErrorObject},
};
use sc_consensus_babe::{authorship, Epoch};
use sc_consensus_epochs::{descendent_query, Epoch as EpochT, SharedEpochChanges};
use sc_rpc_api::DenyUnsafe;
use serde::{Deserialize, Serialize};
use sc_consensus_babe::{authorship, BabeWorkerHandle};
use sc_consensus_epochs::Epoch as EpochT;
use sc_rpc_api::DenyUnsafe;
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppCrypto;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_consensus::{Error as ConsensusError, SelectChain};
use sp_consensus_babe::{
digests::PreDigest, AuthorityId, BabeApi as BabeRuntimeApi, BabeConfiguration,
};
use sp_consensus_babe::{digests::PreDigest, AuthorityId, BabeApi as BabeRuntimeApi};
use sp_core::crypto::ByteArray;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as _};
use std::{collections::HashMap, sync::Arc};
const BABE_ERROR: i32 = 9000;
/// Provides rpc methods for interacting with Babe.
#[rpc(client, server)]
@@ -54,12 +55,10 @@ pub trait BabeApi {
pub struct Babe<B: BlockT, C, SC> {
/// shared reference to the client.
client: Arc<C>,
/// shared reference to EpochChanges
shared_epoch_changes: SharedEpochChanges<B, Epoch>,
/// A handle to the BABE worker for issuing requests.
babe_worker_handle: BabeWorkerHandle<B>,
/// shared reference to the Keystore
keystore: KeystorePtr,
/// config (actually holds the slot duration)
babe_config: BabeConfiguration,
/// The SelectChain strategy
select_chain: SC,
/// Whether to deny unsafe calls
@@ -70,13 +69,12 @@ impl<B: BlockT, C, SC> Babe<B, C, SC> {
/// Creates a new instance of the Babe Rpc handler.
pub fn new(
client: Arc<C>,
shared_epoch_changes: SharedEpochChanges<B, Epoch>,
babe_worker_handle: BabeWorkerHandle<B>,
keystore: KeystorePtr,
babe_config: BabeConfiguration,
select_chain: SC,
deny_unsafe: DenyUnsafe,
) -> Self {
Self { client, shared_epoch_changes, keystore, babe_config, select_chain, deny_unsafe }
Self { client, babe_worker_handle, keystore, select_chain, deny_unsafe }
}
}
@@ -93,21 +91,21 @@ where
{
async fn epoch_authorship(&self) -> RpcResult<HashMap<AuthorityId, EpochAuthorship>> {
self.deny_unsafe.check_if_safe()?;
let header = self.select_chain.best_chain().map_err(Error::Consensus).await?;
let best_header = self.select_chain.best_chain().map_err(Error::SelectChain).await?;
let epoch_start = self
.client
.runtime_api()
.current_epoch_start(header.hash())
.map_err(|err| Error::StringError(format!("{:?}", err)))?;
.current_epoch_start(best_header.hash())
.map_err(|_| Error::FetchEpoch)?;
let epoch = self
.babe_worker_handle
.epoch_data_for_child_of(best_header.hash(), *best_header.number(), epoch_start)
.await
.map_err(|_| Error::FetchEpoch)?;
let epoch = epoch_data(
&self.shared_epoch_changes,
&self.client,
&self.babe_config,
*epoch_start,
&self.select_chain,
)
.await?;
let (epoch_start, epoch_end) = (epoch.start_slot(), epoch.end_slot());
let mut claims: HashMap<AuthorityId, EpochAuthorship> = HashMap::new();
@@ -159,59 +157,37 @@ pub struct EpochAuthorship {
secondary_vrf: Vec<u64>,
}
/// Errors encountered by the RPC
/// Top-level error type for the RPC handler.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Consensus error
#[error(transparent)]
Consensus(#[from] ConsensusError),
/// Errors that can be formatted as a String
#[error("{0}")]
StringError(String),
/// Failed to fetch the current best header.
#[error("Failed to fetch the current best header: {0}")]
SelectChain(ConsensusError),
/// Failed to fetch epoch data.
#[error("Failed to fetch epoch data")]
FetchEpoch,
}
impl From<Error> for JsonRpseeError {
fn from(error: Error) -> Self {
let error_code = match error {
Error::SelectChain(_) => 1,
Error::FetchEpoch => 2,
};
JsonRpseeError::Call(CallError::Custom(ErrorObject::owned(
1234,
BABE_ERROR + error_code,
error.to_string(),
None::<()>,
Some(format!("{:?}", error)),
)))
}
}
/// Fetches the epoch data for a given slot.
async fn epoch_data<B, C, SC>(
epoch_changes: &SharedEpochChanges<B, Epoch>,
client: &Arc<C>,
babe_config: &BabeConfiguration,
slot: u64,
select_chain: &SC,
) -> Result<Epoch, Error>
where
B: BlockT,
C: HeaderBackend<B> + HeaderMetadata<B, Error = BlockChainError> + 'static,
SC: SelectChain<B>,
{
let parent = select_chain.best_chain().await?;
epoch_changes
.shared_data()
.epoch_data_for_child_of(
descendent_query(&**client),
&parent.hash(),
*parent.number(),
slot.into(),
|slot| Epoch::genesis(babe_config, slot),
)
.map_err(|e| Error::Consensus(ConsensusError::ChainLookup(e.to_string())))?
.ok_or(Error::Consensus(ConsensusError::InvalidAuthoritiesSet))
}
#[cfg(test)]
mod tests {
use super::*;
use sc_consensus_babe::block_import;
use sp_core::crypto::key_types::BABE;
use sp_consensus_babe::inherents::InherentDataProvider;
use sp_core::{crypto::key_types::BABE, testing::TaskExecutor};
use sp_keyring::Sr25519Keyring;
use sp_keystore::{testing::MemoryKeystore, Keystore};
use substrate_test_runtime_client::{
@@ -233,14 +209,35 @@ mod tests {
let builder = TestClientBuilder::new();
let (client, longest_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let config = sc_consensus_babe::configuration(&*client).expect("config available");
let (_, link) = block_import(config.clone(), client.clone(), client.clone())
.expect("can initialize block-import");
let epoch_changes = link.epoch_changes().clone();
let task_executor = TaskExecutor::new();
let keystore = create_keystore(Sr25519Keyring::Alice);
Babe::new(client.clone(), epoch_changes, keystore, config, longest_chain, deny_unsafe)
let config = sc_consensus_babe::configuration(&*client).expect("config available");
let slot_duration = config.slot_duration();
let (block_import, link) =
sc_consensus_babe::block_import(config.clone(), client.clone(), client.clone())
.expect("can initialize block-import");
let (_, babe_worker_handle) = sc_consensus_babe::import_queue(
link.clone(),
block_import.clone(),
None,
client.clone(),
longest_chain.clone(),
move |_, _| async move {
Ok((InherentDataProvider::from_timestamp_and_slot_duration(
0.into(),
slot_duration,
),))
},
&task_executor,
None,
None,
)
.unwrap();
Babe::new(client.clone(), babe_worker_handle, keystore, longest_chain, deny_unsafe)
}
#[tokio::test]
+72 -68
View File
@@ -89,8 +89,8 @@ use prometheus_endpoint::Registry;
use schnorrkel::SignatureError;
use sc_client_api::{
backend::AuxStore, AuxDataOperations, Backend as BackendT, BlockchainEvents,
FinalityNotification, PreCommitActions, ProvideUncles, UsageProvider,
backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
PreCommitActions, UsageProvider,
};
use sc_consensus::{
block_import::{
@@ -338,6 +338,9 @@ pub enum Error<B: BlockT> {
/// Create inherents error.
#[error("Creating inherents failed: {0}")]
CreateInherents(sp_inherents::Error),
/// Background worker is not running and therefore requests cannot be answered.
#[error("Background worker is not running")]
BackgroundWorkerTerminated,
/// Client error
#[error(transparent)]
Client(sp_blockchain::Error),
@@ -475,9 +478,6 @@ pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
where
B: BlockT,
C: ProvideRuntimeApi<B>
+ ProvideUncles<B>
+ BlockchainEvents<B>
+ PreCommitActions<B>
+ HeaderBackend<B>
+ HeaderMetadata<B, Error = ClientError>
+ Send
@@ -498,8 +498,6 @@ where
BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
const HANDLE_BUFFER_SIZE: usize = 1024;
let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));
let worker = BabeSlotWorker {
@@ -529,17 +527,7 @@ where
create_inherent_data_providers,
);
let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
let answer_requests =
answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
let inner = future::select(Box::pin(slot_worker), Box::pin(answer_requests));
Ok(BabeWorker {
inner: Box::pin(inner.map(|_| ())),
slot_notification_sinks,
handle: BabeWorkerHandle(worker_tx),
})
Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
}
// Remove obsolete block's weight data by leveraging finality notifications.
@@ -593,42 +581,26 @@ async fn answer_requests<B: BlockT, C>(
client: Arc<C>,
epoch_changes: SharedEpochChanges<B, Epoch>,
) where
C: ProvideRuntimeApi<B>
+ ProvideUncles<B>
+ BlockchainEvents<B>
+ HeaderBackend<B>
+ HeaderMetadata<B, Error = ClientError>
+ Send
+ Sync
+ 'static,
C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
{
while let Some(request) = request_rx.next().await {
match request {
BabeRequest::EpochForChild(parent_hash, parent_number, slot_number, response) => {
BabeRequest::EpochData(response) => {
let _ = response.send(epoch_changes.shared_data().clone());
},
BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
let lookup = || {
let epoch_changes = epoch_changes.shared_data();
let epoch_descriptor = epoch_changes
.epoch_descriptor_for_child_of(
epoch_changes
.epoch_data_for_child_of(
descendent_query(&*client),
&parent_hash,
parent_number,
slot_number,
slot,
|slot| Epoch::genesis(&config, slot),
)
.map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
.ok_or(Error::<B>::FetchEpoch(parent_hash))?;
let viable_epoch = epoch_changes
.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
.ok_or(Error::<B>::FetchEpoch(parent_hash))?;
Ok(sp_consensus_babe::Epoch {
epoch_index: viable_epoch.as_ref().epoch_index,
start_slot: viable_epoch.as_ref().start_slot,
duration: viable_epoch.as_ref().duration,
authorities: viable_epoch.as_ref().authorities.clone(),
randomness: viable_epoch.as_ref().randomness,
config: viable_epoch.as_ref().config.clone(),
})
.ok_or(Error::<B>::FetchEpoch(parent_hash))
};
let _ = response.send(lookup());
@@ -638,17 +610,13 @@ async fn answer_requests<B: BlockT, C>(
}
/// Requests to the BABE service.
#[non_exhaustive]
pub enum BabeRequest<B: BlockT> {
enum BabeRequest<B: BlockT> {
/// Request all available epoch data.
EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
/// Request the epoch that a child of the given block, with the given slot number would have.
///
/// The parent block is identified by its hash and number.
EpochForChild(
B::Hash,
NumberFor<B>,
Slot,
oneshot::Sender<Result<sp_consensus_babe::Epoch, Error<B>>>,
),
EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
}
/// A handle to the BABE worker for issuing requests.
@@ -656,11 +624,41 @@ pub enum BabeRequest<B: BlockT> {
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);
impl<B: BlockT> BabeWorkerHandle<B> {
/// Send a request to the BABE service.
pub async fn send(&mut self, request: BabeRequest<B>) {
// Failure to send means that the service is down.
// This will manifest as the receiver of the request being dropped.
let _ = self.0.send(request).await;
async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
match self.0.clone().send(request).await {
Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
Err(err) => warn!(
target: LOG_TARGET,
"Unhandled error when sending request to worker: {:?}", err
),
_ => {},
}
Ok(())
}
/// Fetch all available epoch data.
pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
let (tx, rx) = oneshot::channel();
self.send_request(BabeRequest::EpochData(tx)).await?;
rx.await.or(Err(Error::BackgroundWorkerTerminated))
}
/// Fetch the epoch that a child of the given block, with the given slot number would have.
///
/// The parent block is identified by its hash and number.
pub async fn epoch_data_for_child_of(
&self,
parent_hash: B::Hash,
parent_number: NumberFor<B>,
slot: Slot,
) -> Result<Epoch, Error<B>> {
let (tx, rx) = oneshot::channel();
self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
.await?;
rx.await.or(Err(Error::BackgroundWorkerTerminated))?
}
}
@@ -669,7 +667,6 @@ impl<B: BlockT> BabeWorkerHandle<B> {
pub struct BabeWorker<B: BlockT> {
inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
slot_notification_sinks: SlotNotificationSinks<B>,
handle: BabeWorkerHandle<B>,
}
impl<B: BlockT> BabeWorker<B> {
@@ -684,11 +681,6 @@ impl<B: BlockT> BabeWorker<B> {
self.slot_notification_sinks.lock().push(sink);
stream
}
/// Get a handle to the worker.
pub fn handle(&self) -> BabeWorkerHandle<B> {
self.handle.clone()
}
}
impl<B: BlockT> Future for BabeWorker<B> {
@@ -1790,7 +1782,7 @@ pub fn import_queue<Block: BlockT, Client, SelectChain, Inner, CIDP>(
spawner: &impl sp_core::traits::SpawnEssentialNamed,
registry: Option<&Registry>,
telemetry: Option<TelemetryHandle>,
) -> ClientResult<DefaultImportQueue<Block, Client>>
) -> ClientResult<(DefaultImportQueue<Block, Client>, BabeWorkerHandle<Block>)>
where
Inner: BlockImport<
Block,
@@ -1811,16 +1803,28 @@ where
CIDP: CreateInherentDataProviders<Block, ()> + Send + Sync + 'static,
CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
{
const HANDLE_BUFFER_SIZE: usize = 1024;
let verifier = BabeVerifier {
select_chain,
create_inherent_data_providers,
config: babe_link.config,
epoch_changes: babe_link.epoch_changes,
config: babe_link.config.clone(),
epoch_changes: babe_link.epoch_changes.clone(),
telemetry,
client,
client: client.clone(),
};
Ok(BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry))
let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
let answer_requests =
answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());
Ok((
BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
BabeWorkerHandle(worker_tx),
))
}
/// Reverts protocol aux data to at most the last finalized block.
+23 -12
View File
@@ -41,20 +41,21 @@
#![deny(unused_crate_dependencies)]
use std::sync::Arc;
use jsonrpsee::{
core::{Error as JsonRpseeError, RpcResult},
core::{async_trait, Error as JsonRpseeError, RpcResult},
proc_macros::rpc,
types::{error::CallError, ErrorObject},
};
use sc_client_api::StorageData;
use sc_consensus_babe::{BabeWorkerHandle, Error as BabeError};
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::sync::Arc;
type SharedAuthoritySet<TBl> =
sc_consensus_grandpa::SharedAuthoritySet<<TBl as BlockT>::Hash, NumberFor<TBl>>;
type SharedEpochChanges<TBl> =
sc_consensus_epochs::SharedEpochChanges<TBl, sc_consensus_babe::Epoch>;
/// Error type used by this crate.
#[derive(Debug, thiserror::Error)]
@@ -66,6 +67,9 @@ pub enum Error<Block: BlockT> {
#[error("Failed to load the block weight for block {0:?}")]
LoadingBlockWeightFailed(Block::Hash),
#[error("Failed to load the BABE epoch data: {0}")]
LoadingEpochDataFailed(BabeError<Block>),
#[error("JsonRpc error: {0}")]
JsonRpc(String),
@@ -125,7 +129,7 @@ pub struct LightSyncState<Block: BlockT> {
pub trait SyncStateApi {
/// Returns the JSON serialized chainspec running the node, with a sync state.
#[method(name = "sync_state_genSyncSpec")]
fn system_gen_sync_spec(&self, raw: bool) -> RpcResult<serde_json::Value>;
async fn system_gen_sync_spec(&self, raw: bool) -> RpcResult<serde_json::Value>;
}
/// An api for sync state RPC calls.
@@ -133,7 +137,7 @@ pub struct SyncState<Block: BlockT, Client> {
chain_spec: Box<dyn sc_chain_spec::ChainSpec>,
client: Arc<Client>,
shared_authority_set: SharedAuthoritySet<Block>,
shared_epoch_changes: SharedEpochChanges<Block>,
babe_worker_handle: BabeWorkerHandle<Block>,
}
impl<Block, Client> SyncState<Block, Client>
@@ -146,18 +150,24 @@ where
chain_spec: Box<dyn sc_chain_spec::ChainSpec>,
client: Arc<Client>,
shared_authority_set: SharedAuthoritySet<Block>,
shared_epoch_changes: SharedEpochChanges<Block>,
babe_worker_handle: BabeWorkerHandle<Block>,
) -> Result<Self, Error<Block>> {
if sc_chain_spec::get_extension::<LightSyncStateExtension>(chain_spec.extensions())
.is_some()
{
Ok(Self { chain_spec, client, shared_authority_set, shared_epoch_changes })
Ok(Self { chain_spec, client, shared_authority_set, babe_worker_handle })
} else {
Err(Error::<Block>::LightSyncStateExtensionNotFound)
}
}
fn build_sync_state(&self) -> Result<LightSyncState<Block>, Error<Block>> {
async fn build_sync_state(&self) -> Result<LightSyncState<Block>, Error<Block>> {
let epoch_changes = self
.babe_worker_handle
.epoch_data()
.await
.map_err(Error::LoadingEpochDataFailed)?;
let finalized_hash = self.client.info().finalized_hash;
let finalized_header = self
.client
@@ -170,20 +180,21 @@ where
Ok(LightSyncState {
finalized_block_header: finalized_header,
babe_epoch_changes: self.shared_epoch_changes.shared_data().clone(),
babe_epoch_changes: epoch_changes,
babe_finalized_block_weight: finalized_block_weight,
grandpa_authority_set: self.shared_authority_set.clone_inner(),
})
}
}
#[async_trait]
impl<Block, Backend> SyncStateApiServer for SyncState<Block, Backend>
where
Block: BlockT,
Backend: HeaderBackend<Block> + sc_client_api::AuxStore + 'static,
{
fn system_gen_sync_spec(&self, raw: bool) -> RpcResult<serde_json::Value> {
let current_sync_state = self.build_sync_state()?;
async fn system_gen_sync_spec(&self, raw: bool) -> RpcResult<serde_json::Value> {
let current_sync_state = self.build_sync_state().await?;
let mut chain_spec = self.chain_spec.cloned_box();
let extension = sc_chain_spec::get_extension_mut::<LightSyncStateExtension>(