mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-09 01:18:00 +00:00
Don't send ActiveLeaves from leaves in db on startup in Overseer (#6727)
* Don't send `ActiveLeaves` from leaves in db on startup in Overseer. Wait for fresh leaves instead. * Don't pass initial set of leaves to Overseer * Fix compilation error in subsystem-test-helpers
This commit is contained in:
committed by
GitHub
parent
dfb60f32cd
commit
ed6fa5499c
@@ -190,7 +190,6 @@ where
|
||||
.span_per_active_leaf(Default::default())
|
||||
.active_leaves(Default::default())
|
||||
.known_leaves(LruCache::new(KNOWN_LEAVES_CACHE_SIZE))
|
||||
.leaves(Default::default())
|
||||
.spawner(SpawnGlue(spawner))
|
||||
.metrics(metrics)
|
||||
.supports_parachains(supports_parachains);
|
||||
|
||||
@@ -612,11 +612,6 @@ pub struct Overseer<SupportsParachains> {
|
||||
/// Stores the [`jaeger::Span`] per active leaf.
|
||||
pub span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>,
|
||||
|
||||
/// A set of leaves that `Overseer` starts working with.
|
||||
///
|
||||
/// Drained at the beginning of `run` and never used again.
|
||||
pub leaves: Vec<(Hash, BlockNumber)>,
|
||||
|
||||
/// The set of the "active leaves".
|
||||
pub active_leaves: HashMap<Hash, BlockNumber>,
|
||||
|
||||
@@ -728,16 +723,6 @@ where
|
||||
let metrics = self.metrics.clone();
|
||||
spawn_metronome_metrics(&mut self, metrics)?;
|
||||
|
||||
// Notify about active leaves on startup before starting the loop
|
||||
for (hash, number) in std::mem::take(&mut self.leaves) {
|
||||
let _ = self.active_leaves.insert(hash, number);
|
||||
if let Some((span, status)) = self.on_head_activated(&hash, None).await {
|
||||
let update =
|
||||
ActiveLeavesUpdate::start_work(ActivatedLeaf { hash, number, status, span });
|
||||
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
select! {
|
||||
msg = self.events_rx.select_next_some() => {
|
||||
|
||||
@@ -47,14 +47,6 @@ use sp_core::crypto::Pair as _;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn block_info_to_pair(blocks: impl IntoIterator<Item = BlockInfo>) -> Vec<(Hash, BlockNumber)> {
|
||||
Vec::from_iter(
|
||||
blocks
|
||||
.into_iter()
|
||||
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
|
||||
)
|
||||
}
|
||||
|
||||
type SpawnedSubsystem = crate::gen::SpawnedSubsystem<SubsystemError>;
|
||||
|
||||
struct TestSubsystem1(metered::MeteredSender<usize>);
|
||||
@@ -223,20 +215,16 @@ fn overseer_metrics_work() {
|
||||
executor::block_on(async move {
|
||||
let first_block_hash = [1; 32].into();
|
||||
let second_block_hash = [2; 32].into();
|
||||
let third_block_hash = [3; 32].into();
|
||||
|
||||
let first_block =
|
||||
BlockInfo { hash: first_block_hash, parent_hash: [0; 32].into(), number: 1 };
|
||||
let second_block =
|
||||
BlockInfo { hash: second_block_hash, parent_hash: first_block_hash, number: 2 };
|
||||
let third_block =
|
||||
BlockInfo { hash: third_block_hash, parent_hash: second_block_hash, number: 3 };
|
||||
|
||||
let registry = prometheus::Registry::new();
|
||||
let (overseer, handle) =
|
||||
dummy_overseer_builder(spawner, MockSupportsParachains, Some(®istry))
|
||||
.unwrap()
|
||||
.leaves(block_info_to_pair(vec![first_block]))
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
@@ -245,8 +233,8 @@ fn overseer_metrics_work() {
|
||||
|
||||
pin_mut!(overseer_fut);
|
||||
|
||||
handle.block_imported(first_block).await;
|
||||
handle.block_imported(second_block).await;
|
||||
handle.block_imported(third_block).await;
|
||||
handle
|
||||
.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg()))
|
||||
.await;
|
||||
@@ -256,8 +244,8 @@ fn overseer_metrics_work() {
|
||||
res = overseer_fut => {
|
||||
assert!(res.is_ok());
|
||||
let metrics = extract_metrics(®istry);
|
||||
assert_eq!(metrics["activated"], 3);
|
||||
assert_eq!(metrics["deactivated"], 2);
|
||||
assert_eq!(metrics["activated"], 2);
|
||||
assert_eq!(metrics["deactivated"], 1);
|
||||
assert_eq!(metrics["relayed"], 1);
|
||||
},
|
||||
complete => (),
|
||||
@@ -379,14 +367,11 @@ fn overseer_start_stop_works() {
|
||||
executor::block_on(async move {
|
||||
let first_block_hash = [1; 32].into();
|
||||
let second_block_hash = [2; 32].into();
|
||||
let third_block_hash = [3; 32].into();
|
||||
|
||||
let first_block =
|
||||
BlockInfo { hash: first_block_hash, parent_hash: [0; 32].into(), number: 1 };
|
||||
let second_block =
|
||||
BlockInfo { hash: second_block_hash, parent_hash: first_block_hash, number: 2 };
|
||||
let third_block =
|
||||
BlockInfo { hash: third_block_hash, parent_hash: second_block_hash, number: 3 };
|
||||
|
||||
let (tx_5, mut rx_5) = metered::channel(64);
|
||||
let (tx_6, mut rx_6) = metered::channel(64);
|
||||
@@ -395,7 +380,6 @@ fn overseer_start_stop_works() {
|
||||
.unwrap()
|
||||
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
|
||||
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
|
||||
.leaves(block_info_to_pair(vec![first_block]))
|
||||
.build()
|
||||
.unwrap();
|
||||
let mut handle = Handle::new(handle);
|
||||
@@ -406,16 +390,19 @@ fn overseer_start_stop_works() {
|
||||
let mut ss5_results = Vec::new();
|
||||
let mut ss6_results = Vec::new();
|
||||
|
||||
handle.block_imported(first_block).await;
|
||||
handle.block_imported(second_block).await;
|
||||
handle.block_imported(third_block).await;
|
||||
|
||||
let expected_heartbeats = vec![
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||
hash: first_block_hash,
|
||||
number: 1,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
status: LeafStatus::Fresh,
|
||||
})),
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: Some(ActivatedLeaf {
|
||||
hash: first_block_hash,
|
||||
number: 1,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
status: LeafStatus::Fresh,
|
||||
}),
|
||||
deactivated: Default::default(),
|
||||
}),
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: Some(ActivatedLeaf {
|
||||
hash: second_block_hash,
|
||||
@@ -425,15 +412,6 @@ fn overseer_start_stop_works() {
|
||||
}),
|
||||
deactivated: [first_block_hash].as_ref().into(),
|
||||
}),
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: Some(ActivatedLeaf {
|
||||
hash: third_block_hash,
|
||||
number: 3,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
status: LeafStatus::Fresh,
|
||||
}),
|
||||
deactivated: [second_block_hash].as_ref().into(),
|
||||
}),
|
||||
];
|
||||
|
||||
loop {
|
||||
@@ -494,7 +472,6 @@ fn overseer_finalize_works() {
|
||||
.unwrap()
|
||||
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
|
||||
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
|
||||
.leaves(block_info_to_pair(vec![first_block, second_block]))
|
||||
.build()
|
||||
.unwrap();
|
||||
let mut handle = Handle::new(handle);
|
||||
@@ -505,22 +482,32 @@ fn overseer_finalize_works() {
|
||||
let mut ss5_results = Vec::new();
|
||||
let mut ss6_results = Vec::new();
|
||||
|
||||
// activate two blocks
|
||||
handle.block_imported(first_block).await;
|
||||
handle.block_imported(second_block).await;
|
||||
|
||||
// this should stop work on both forks we started with earlier.
|
||||
handle.block_finalized(third_block).await;
|
||||
|
||||
let expected_heartbeats = vec![
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||
hash: first_block_hash,
|
||||
number: 1,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
status: LeafStatus::Fresh,
|
||||
})),
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
|
||||
hash: second_block_hash,
|
||||
number: 2,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
status: LeafStatus::Fresh,
|
||||
})),
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: Some(ActivatedLeaf {
|
||||
hash: first_block_hash,
|
||||
number: 1,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
status: LeafStatus::Fresh,
|
||||
}),
|
||||
deactivated: Default::default(),
|
||||
}),
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: Some(ActivatedLeaf {
|
||||
hash: second_block_hash,
|
||||
number: 2,
|
||||
span: Arc::new(jaeger::Span::Disabled),
|
||||
status: LeafStatus::Fresh,
|
||||
}),
|
||||
deactivated: Default::default(),
|
||||
}),
|
||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
deactivated: [first_block_hash, second_block_hash].as_ref().into(),
|
||||
..Default::default()
|
||||
@@ -590,7 +577,6 @@ fn overseer_finalize_leaf_preserves_it() {
|
||||
.unwrap()
|
||||
.replace_candidate_validation(move |_| TestSubsystem5(tx_5))
|
||||
.replace_candidate_backing(move |_| TestSubsystem6(tx_6))
|
||||
.leaves(block_info_to_pair(vec![first_block.clone(), second_block]))
|
||||
.build()
|
||||
.unwrap();
|
||||
let mut handle = Handle::new(handle);
|
||||
@@ -601,6 +587,8 @@ fn overseer_finalize_leaf_preserves_it() {
|
||||
let mut ss5_results = Vec::new();
|
||||
let mut ss6_results = Vec::new();
|
||||
|
||||
handle.block_imported(first_block.clone()).await;
|
||||
handle.block_imported(second_block).await;
|
||||
// This should stop work on the second block, but only the
|
||||
// second block.
|
||||
handle.block_finalized(first_block).await;
|
||||
|
||||
@@ -49,7 +49,6 @@ use {
|
||||
polkadot_node_network_protocol::{
|
||||
peer_set::PeerSetProtocolNames, request_response::ReqProtocolNames,
|
||||
},
|
||||
polkadot_overseer::BlockInfo,
|
||||
sc_client_api::BlockBackend,
|
||||
sp_core::traits::SpawnNamed,
|
||||
sp_trie::PrefixedMemoryDB,
|
||||
@@ -129,10 +128,6 @@ pub use {rococo_runtime, rococo_runtime_constants};
|
||||
#[cfg(feature = "westend-native")]
|
||||
pub use {westend_runtime, westend_runtime_constants};
|
||||
|
||||
/// The maximum number of active leaves we forward to the [`Overseer`] on startup.
|
||||
#[cfg(any(test, feature = "full-node"))]
|
||||
const MAX_ACTIVE_LEAVES: usize = 4;
|
||||
|
||||
/// Provides the header and block number for a hash.
|
||||
///
|
||||
/// Decouples `sc_client_api::Backend` and `sp_blockchain::HeaderBackend`.
|
||||
@@ -668,56 +663,6 @@ impl IsCollator {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the active leaves the overseer should start with.
|
||||
#[cfg(feature = "full-node")]
|
||||
async fn active_leaves<RuntimeApi, ExecutorDispatch>(
|
||||
select_chain: &impl SelectChain<Block>,
|
||||
client: &FullClient<RuntimeApi, ExecutorDispatch>,
|
||||
) -> Result<Vec<BlockInfo>, Error>
|
||||
where
|
||||
RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi, ExecutorDispatch>>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
RuntimeApi::RuntimeApi:
|
||||
RuntimeApiCollection<StateBackend = sc_client_api::StateBackendFor<FullBackend, Block>>,
|
||||
ExecutorDispatch: NativeExecutionDispatch + 'static,
|
||||
{
|
||||
let best_block = select_chain.best_chain().await?;
|
||||
|
||||
let mut leaves = select_chain
|
||||
.leaves()
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.filter_map(|hash| {
|
||||
let number = HeaderBackend::number(client, hash).ok()??;
|
||||
|
||||
// Only consider leaves that are in maximum an uncle of the best block.
|
||||
if number < best_block.number().saturating_sub(1) {
|
||||
return None
|
||||
} else if hash == best_block.hash() {
|
||||
return None
|
||||
};
|
||||
|
||||
let parent_hash = client.header(hash).ok()??.parent_hash;
|
||||
|
||||
Some(BlockInfo { hash, parent_hash, number })
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sort by block number and get the maximum number of leaves
|
||||
leaves.sort_by_key(|b| b.number);
|
||||
|
||||
leaves.push(BlockInfo {
|
||||
hash: best_block.hash(),
|
||||
parent_hash: *best_block.parent_hash(),
|
||||
number: *best_block.number(),
|
||||
});
|
||||
|
||||
Ok(leaves.into_iter().rev().take(MAX_ACTIVE_LEAVES).collect())
|
||||
}
|
||||
|
||||
pub const AVAILABILITY_CONFIG: AvailabilityConfig = AvailabilityConfig {
|
||||
col_data: parachains_db::REAL_COLUMNS.col_availability_data,
|
||||
col_meta: parachains_db::REAL_COLUMNS.col_availability_meta,
|
||||
@@ -1009,10 +954,6 @@ where
|
||||
|
||||
let overseer_client = client.clone();
|
||||
let spawner = task_manager.spawn_handle();
|
||||
// Cannot use the `RelayChainSelection`, since that'd require a setup _and running_ overseer
|
||||
// which we are about to setup.
|
||||
let active_leaves =
|
||||
futures::executor::block_on(active_leaves(select_chain.as_longest_chain(), &*client))?;
|
||||
|
||||
let authority_discovery_service = if auth_or_collator || overseer_enable_anyways {
|
||||
use futures::StreamExt;
|
||||
@@ -1068,7 +1009,6 @@ where
|
||||
.generate::<service::SpawnTaskHandle, FullClient<RuntimeApi, ExecutorDispatch>>(
|
||||
overseer_connector,
|
||||
OverseerGenArgs {
|
||||
leaves: active_leaves,
|
||||
keystore,
|
||||
runtime_client: overseer_client.clone(),
|
||||
parachains_db,
|
||||
|
||||
@@ -34,8 +34,8 @@ pub use polkadot_overseer::{
|
||||
HeadSupportsParachains,
|
||||
};
|
||||
use polkadot_overseer::{
|
||||
metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MetricsTrait,
|
||||
Overseer, OverseerConnector, OverseerHandle, SpawnGlue,
|
||||
metrics::Metrics as OverseerMetrics, InitializedOverseerBuilder, MetricsTrait, Overseer,
|
||||
OverseerConnector, OverseerHandle, SpawnGlue,
|
||||
};
|
||||
|
||||
use polkadot_primitives::runtime_api::ParachainHost;
|
||||
@@ -81,8 +81,6 @@ where
|
||||
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
|
||||
Spawner: 'static + SpawnNamed + Clone + Unpin,
|
||||
{
|
||||
/// Set of initial relay chain leaves to track.
|
||||
pub leaves: Vec<BlockInfo>,
|
||||
/// The keystore to use for i.e. validator keys.
|
||||
pub keystore: Arc<LocalKeystore>,
|
||||
/// Runtime client generic, providing the `ProvieRuntimeApi` trait besides others.
|
||||
@@ -131,7 +129,6 @@ where
|
||||
/// with all default values.
|
||||
pub fn prepared_overseer_builder<Spawner, RuntimeClient>(
|
||||
OverseerGenArgs {
|
||||
leaves,
|
||||
keystore,
|
||||
runtime_client,
|
||||
parachains_db,
|
||||
@@ -309,11 +306,6 @@ where
|
||||
Metrics::register(registry)?,
|
||||
))
|
||||
.chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db))
|
||||
.leaves(Vec::from_iter(
|
||||
leaves
|
||||
.into_iter()
|
||||
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)),
|
||||
))
|
||||
.activation_external_listeners(Default::default())
|
||||
.span_per_active_leaf(Default::default())
|
||||
.active_leaves(Default::default())
|
||||
|
||||
@@ -457,7 +457,6 @@ mod tests {
|
||||
dummy_overseer_builder(spawner.clone(), AlwaysSupportsParachains, None)
|
||||
.unwrap()
|
||||
.replace_collator_protocol(|_| ForwardSubsystem(tx))
|
||||
.leaves(vec![])
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user