Add block number to activated leaves and associated fixes (#2718)

* add number to `ActivatedLeavesUpdate`

* update subsystem util and overseer

* use new ActivatedLeaf everywhere

* sort view

* sorted and limited view in network bridge

* use live block hash only if it's newer

* grumples
This commit is contained in:
Robert Habermeier
2021-03-26 13:06:40 +01:00
committed by GitHub
parent a4ed8aaab2
commit 064df81ee4
20 changed files with 286 additions and 97 deletions
@@ -128,7 +128,7 @@ impl CollationGenerationSubsystem {
let metrics = self.metrics.clone();
if let Err(err) = handle_new_activations(
config.clone(),
activated.into_iter().map(|v| v.0),
activated.into_iter().map(|v| v.hash),
ctx,
metrics,
sender,
@@ -591,7 +591,8 @@ async fn handle_from_overseer(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
let mut actions = Vec::new();
for (head, _span) in update.activated {
for activated in update.activated {
let head = activated.hash;
match import::handle_new_head(
ctx,
state,
+2 -2
View File
@@ -551,9 +551,9 @@ where
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate { activated, .. })
) => {
for (activated, _span) in activated.into_iter() {
for activated in activated.into_iter() {
let _timer = subsystem.metrics.time_block_activated();
process_block_activated(ctx, subsystem, activated).await?;
process_block_activated(ctx, subsystem, activated.hash).await?;
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
+11 -3
View File
@@ -31,7 +31,7 @@ use polkadot_primitives::v1::{
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem::{
ActiveLeavesUpdate, errors::RuntimeApiError, jaeger, messages::AllMessages,
ActiveLeavesUpdate, errors::RuntimeApiError, jaeger, messages::AllMessages, ActivatedLeaf,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_keyring::Sr25519Keyring;
@@ -240,7 +240,11 @@ fn runtime_api_error_does_not_stop_the_subsystem() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: vec![(new_leaf, Arc::new(jaeger::Span::Disabled))].into(),
activated: vec![ActivatedLeaf {
hash: new_leaf,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}].into(),
deactivated: vec![].into(),
}),
).await;
@@ -885,7 +889,11 @@ async fn import_leaf(
overseer_signal(
virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: vec![(new_leaf, Arc::new(jaeger::Span::Disabled))].into(),
activated: vec![ActivatedLeaf {
hash: new_leaf,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}].into(),
deactivated: vec![].into(),
}),
).await;
+6 -5
View File
@@ -1249,7 +1249,7 @@ mod tests {
use polkadot_primitives::v1::{BlockData, GroupRotationInfo, HeadData, PersistedValidationData, ScheduledCore};
use polkadot_subsystem::{
messages::{RuntimeApiRequest, RuntimeApiMessage},
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
ActiveLeavesUpdate, FromOverseer, OverseerSignal, ActivatedLeaf,
};
use polkadot_node_primitives::InvalidCandidate;
use sp_keyring::Sr25519Keyring;
@@ -1428,10 +1428,11 @@ mod tests {
) {
// Start work on some new parent.
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(
test_state.relay_parent,
Arc::new(jaeger::Span::Disabled),
)))
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: test_state.relay_parent,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
})))
).await;
// Check that subsystem job issues a request for a validator set.
@@ -23,7 +23,6 @@ use std::collections::{
};
use std::iter::IntoIterator;
use std::pin::Pin;
use std::sync::Arc;
use futures::{
channel::mpsc,
@@ -36,7 +35,7 @@ use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_subsystem_util::request_availability_cores_ctx;
use polkadot_primitives::v1::{CandidateHash, CoreState, Hash, OccupiedCore};
use polkadot_subsystem::{
messages::AllMessages, ActiveLeavesUpdate, jaeger, SubsystemContext,
messages::AllMessages, ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
};
use super::{error::recv_runtime, session_cache::SessionCache, LOG_TARGET, Metrics};
@@ -121,12 +120,12 @@ impl Requester {
async fn start_requesting_chunks<Context>(
&mut self,
ctx: &mut Context,
new_heads: impl Iterator<Item = (Hash, Arc<jaeger::Span>)>,
new_heads: impl Iterator<Item = ActivatedLeaf>,
) -> super::Result<Option<NonFatalError>>
where
Context: SubsystemContext,
{
for (leaf, _) in new_heads {
for ActivatedLeaf { hash: leaf, .. } in new_heads {
let cores = match query_occupied_cores(ctx, leaf).await {
Err(err) => return Ok(Some(err)),
Ok(cores) => cores,
@@ -32,9 +32,11 @@ use sc_network as network;
use sc_network::IfDisconnected;
use sc_network::config as netconfig;
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal, messages::{AllMessages,
AvailabilityDistributionMessage, AvailabilityStoreMessage, NetworkBridgeMessage, RuntimeApiMessage,
RuntimeApiRequest}
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal, ActivatedLeaf,
messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, NetworkBridgeMessage,
RuntimeApiMessage, RuntimeApiRequest,
}
};
use polkadot_primitives::v1::{CandidateHash, CoreState, ErasureChunk, GroupIndex, Hash, Id
as ParaId, ScheduledCore, SessionInfo, ValidatorId,
@@ -169,7 +171,11 @@ impl TestState {
self
.relay_chain.iter().zip(advanced)
.map(|(old, new)| ActiveLeavesUpdate {
activated: smallvec![(new.clone(), Arc::new(jaeger::Span::Disabled))],
activated: smallvec![ActivatedLeaf {
hash: new.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![old.clone()],
}).collect::<Vec<_>>()
};
@@ -28,7 +28,7 @@ use rand::seq::SliceRandom;
use polkadot_primitives::v1::{
AuthorityDiscoveryId, AvailableData, CandidateReceipt, CandidateHash,
Hash, ErasureChunk, ValidatorId, ValidatorIndex,
SessionInfo, SessionIndex, BlakeTwo256, HashT, GroupIndex,
SessionInfo, SessionIndex, BlakeTwo256, HashT, GroupIndex, BlockNumber,
};
use polkadot_subsystem::{
SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer,
@@ -473,7 +473,7 @@ struct State {
interactions: HashMap<CandidateHash, InteractionHandle>,
/// A recent block hash for which state should be available.
live_block_hash: Hash,
live_block: (BlockNumber, Hash),
/// interaction communication. This is cloned and given to interactions that are spun up.
from_interaction_tx: mpsc::Sender<FromInteraction>,
@@ -491,7 +491,7 @@ impl Default for State {
Self {
interactions: HashMap::new(),
live_block_hash: Hash::default(),
live_block: (0, Hash::default()),
from_interaction_tx,
from_interaction_rx,
availability_lru: LruCache::new(LRU_SIZE),
@@ -521,9 +521,11 @@ async fn handle_signal(
match signal {
OverseerSignal::Conclude => Ok(true),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => {
// if activated is non-empty, set state.live_block_hash to the first block in Activated.
if let Some(hash) = activated.get(0) {
state.live_block_hash = hash.0;
// if activated is non-empty, set state.live_block to the highest block in `activated`
for activated in activated {
if activated.number > state.live_block.0 {
state.live_block = (activated.number, activated.hash)
}
}
Ok(false)
@@ -630,7 +632,7 @@ async fn handle_recover(
let _span = span.child("not-cached");
let session_info = request_session_info_ctx(
state.live_block_hash,
state.live_block.1,
session_index,
ctx,
).await?.await.map_err(error::Error::CanceledSessionInfo)??;
@@ -651,7 +653,7 @@ async fn handle_recover(
None => {
tracing::warn!(
target: LOG_TARGET,
"SessionInfo is `None` at {}", state.live_block_hash,
"SessionInfo is `None` at {:?}", state.live_block,
);
response_sender
.send(Err(RecoveryError::Unavailable))
@@ -32,7 +32,7 @@ use polkadot_primitives::v1::{
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger};
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger, ActivatedLeaf};
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>;
@@ -418,7 +418,11 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
activated: smallvec![ActivatedLeaf {
hash: test_state.current.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![],
}),
).await;
@@ -490,7 +494,11 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
activated: smallvec![ActivatedLeaf {
hash: test_state.current.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![],
}),
).await;
@@ -562,7 +570,11 @@ fn bad_merkle_path_leads_to_recovery_error() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
activated: smallvec![ActivatedLeaf {
hash: test_state.current.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![],
}),
).await;
@@ -612,7 +624,11 @@ fn wrong_chunk_index_leads_to_recovery_error() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
activated: smallvec![ActivatedLeaf {
hash: test_state.current.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![],
}),
).await;
@@ -682,7 +698,11 @@ fn invalid_erasure_coding_leads_to_invalid_error() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
activated: smallvec![ActivatedLeaf {
hash: test_state.current.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![],
}),
).await;
@@ -723,7 +743,11 @@ fn fast_path_backing_group_recovers() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
activated: smallvec![ActivatedLeaf {
hash: test_state.current.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![],
}),
).await;
@@ -768,7 +792,11 @@ fn no_answers_in_fast_path_causes_chunk_requests() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
activated: smallvec![ActivatedLeaf {
hash: test_state.current.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![],
}),
).await;
@@ -192,9 +192,11 @@ impl BitfieldDistribution {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => {
let _timer = self.metrics.time_active_leaves_update();
for (relay_parent, span) in activated {
for activated in activated {
let relay_parent = activated.hash;
tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "activated");
let span = PerLeafSpan::new(span, "bitfield-distribution");
let span = PerLeafSpan::new(activated.span, "bitfield-distribution");
let _span = span.child("query-basics");
// query validator set and signing context per relay_parent once only
+94 -18
View File
@@ -24,8 +24,8 @@ use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;
use polkadot_subsystem::{
ActiveLeavesUpdate, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
SubsystemResult, jaeger,
ActiveLeavesUpdate, ActivatedLeaf, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
SubsystemResult,
};
use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages,
@@ -43,7 +43,6 @@ pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use std::collections::{HashMap, hash_map};
use std::iter::ExactSizeIterator;
use std::sync::Arc;
use std::time::Instant;
mod validator_discovery;
@@ -154,8 +153,8 @@ where
{
let mut event_stream = bridge.network_service.event_stream().fuse();
// Most recent heads are at the back.
let mut live_heads: Vec<(Hash, Arc<jaeger::Span>)> = Vec::with_capacity(MAX_VIEW_HEADS);
// This is kept sorted, descending, by block number.
let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS);
let mut local_view = View::default();
let mut finalized_number = 0;
@@ -315,8 +314,14 @@ where
num_deactivated = %deactivated.len(),
);
live_heads.extend(activated);
live_heads.retain(|h| !deactivated.contains(&h.0));
for activated in activated {
let pos = live_heads
.binary_search_by(|probe| probe.number.cmp(&activated.number).reverse())
.unwrap_or_else(|i| i);
live_heads.insert(pos, activated);
}
live_heads.retain(|h| !deactivated.contains(&h.hash));
update_our_view(
&mut bridge.network_service,
@@ -490,8 +495,8 @@ where
fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_number: BlockNumber) -> View {
View::new(
live_heads.rev().take(MAX_VIEW_HEADS),
finalized_number
live_heads.take(MAX_VIEW_HEADS),
finalized_number,
)
}
@@ -499,13 +504,13 @@ fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_n
async fn update_our_view(
net: &mut impl Network,
ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
live_heads: &[(Hash, Arc<jaeger::Span>)],
live_heads: &[ActivatedLeaf],
local_view: &mut View,
finalized_number: BlockNumber,
validation_peers: &HashMap<PeerId, PeerData>,
collation_peers: &HashMap<PeerId, PeerData>,
) -> SubsystemResult<()> {
let new_view = construct_view(live_heads.iter().map(|v| v.0), finalized_number);
let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number);
// We only want to send a view update when the heads changed.
// A change in finalized block number only is _not_ sufficient.
@@ -527,7 +532,10 @@ async fn update_our_view(
WireMessage::ViewUpdate(new_view),
).await?;
let our_view = OurView::new(live_heads.iter().cloned(), finalized_number);
let our_view = OurView::new(
live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)),
finalized_number,
);
dispatch_validation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view.clone()), ctx).await;
@@ -684,7 +692,7 @@ mod tests {
use sc_network::{Event as NetworkEvent, IfDisconnected};
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
use polkadot_subsystem::{jaeger, ActiveLeavesUpdate, FromOverseer, OverseerSignal};
use polkadot_subsystem::messages::{
ApprovalDistributionMessage,
BitfieldDistributionMessage,
@@ -929,7 +937,11 @@ mod tests {
let head = Hash::repeat_byte(1);
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(head, Arc::new(jaeger::Span::Disabled)),
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: head,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
})
))
).await;
@@ -984,7 +996,11 @@ mod tests {
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)),
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: hash_a,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
})
))
).await;
@@ -1046,7 +1062,11 @@ mod tests {
// This should trigger the view update to our peers.
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)),
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: hash_a,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
})
))
).await;
@@ -1236,7 +1256,11 @@ mod tests {
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)),
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: hash_a,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
})
))
).await;
@@ -1429,7 +1453,11 @@ mod tests {
).await;
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(hash_b, Arc::new(jaeger::Span::Disabled)),
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: hash_b,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
})
))
).await;
@@ -1626,4 +1654,52 @@ mod tests {
}
assert_eq!(cnt, EXPECTED_COUNT);
}
#[test]
fn our_view_updates_decreasing_order_and_limited_to_max() {
test_harness(|test_harness| async move {
let TestHarness {
mut virtual_overseer,
..
} = test_harness;
// to show that we're still connected on the collation protocol, send a view update.
let hashes = (0..MAX_VIEW_HEADS * 3).map(|i| Hash::repeat_byte(i as u8));
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
// These are in reverse order, so the subsystem must sort internally to
// get the correct view.
ActiveLeavesUpdate {
activated: hashes.enumerate().map(|(i, h)| ActivatedLeaf {
hash: h,
number: i as _,
span: Arc::new(jaeger::Span::Disabled),
}).rev().collect(),
deactivated: Default::default(),
}
))
).await;
let view_heads = (MAX_VIEW_HEADS * 2 .. MAX_VIEW_HEADS * 3).rev()
.map(|i| (Hash::repeat_byte(i as u8), Arc::new(jaeger::Span::Disabled)) );
let our_view = OurView::new(
view_heads,
0,
);
assert_sends_validation_event_to_all(
NetworkBridgeEvent::OurViewChange(our_view.clone()),
&mut virtual_overseer,
).await;
assert_sends_collation_event_to_all(
NetworkBridgeEvent::OurViewChange(our_view),
&mut virtual_overseer,
).await;
});
}
}
@@ -955,7 +955,7 @@ mod tests {
use polkadot_subsystem::{
jaeger,
messages::{RuntimeApiMessage, RuntimeApiRequest},
ActiveLeavesUpdate,
ActiveLeavesUpdate, ActivatedLeaf,
};
use polkadot_subsystem_testhelpers as test_helpers;
@@ -1215,7 +1215,11 @@ mod tests {
overseer_signal(
virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [(test_state.relay_parent, Arc::new(jaeger::Span::Disabled))][..].into(),
activated: vec![ActivatedLeaf {
hash: test_state.relay_parent,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}].into(),
deactivated: [][..].into(),
}),
).await;
@@ -80,7 +80,7 @@ impl GossipSupport {
})) => {
tracing::trace!(target: LOG_TARGET, "active leaves signal");
let leaves = activated.into_iter().map(|(h, _)| h);
let leaves = activated.into_iter().map(|a| a.hash);
if let Err(e) = state.handle_active_leaves(&mut ctx, leaves).await {
tracing::debug!(target: LOG_TARGET, error = ?e);
}
@@ -153,10 +153,11 @@ async fn handle_signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
let _timer = state.metrics.time_handle_signal();
for (relay_parent, span) in activated {
let _span = span.child("pov-dist")
for activated in activated {
let _span = activated.span.child("pov-dist")
.with_stage(jaeger::Stage::PoVDistribution);
let relay_parent = activated.hash;
match request_validators_ctx(relay_parent, ctx).await {
Ok(vals_rx) => {
let n_validators = match vals_rx.await? {
@@ -25,7 +25,7 @@ use tracing::trace;
use sp_keyring::Sr25519Keyring;
use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId, ScheduledCore, SessionIndex, SessionInfo, ValidatorIndex};
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger};
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger, ActivatedLeaf};
use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_node_network_protocol::{view, our_view};
@@ -275,7 +275,11 @@ fn ask_validators_for_povs() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [(test_state.relay_parent, Arc::new(jaeger::Span::Disabled))][..].into(),
activated: vec![ActivatedLeaf {
hash: test_state.relay_parent,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}].into(),
deactivated: [][..].into(),
}),
).await;
@@ -447,7 +451,11 @@ fn ask_validators_for_povs() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [(next_leaf, Arc::new(jaeger::Span::Disabled))][..].into(),
activated: vec![ActivatedLeaf {
hash: next_leaf,
number: 2,
span: Arc::new(jaeger::Span::Disabled),
}].into(),
deactivated: [current.clone()][..].into(),
})
).await;
@@ -1016,8 +1016,9 @@ impl StatementDistribution {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => {
let _timer = metrics.time_active_leaves_update();
for (relay_parent, span) in activated {
let span = PerLeafSpan::new(span, "statement-distribution");
for activated in activated {
let relay_parent = activated.hash;
let span = PerLeafSpan::new(activated.span, "statement-distribution");
let (validators, session_index) = {
let (val_tx, val_rx) = oneshot::channel();
@@ -1187,7 +1188,7 @@ mod tests {
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
use sc_keystore::LocalKeystore;
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
use polkadot_subsystem::jaeger;
use polkadot_subsystem::{jaeger, ActivatedLeaf};
#[test]
fn active_head_accepts_only_2_seconded_per_validator() {
@@ -1743,7 +1744,11 @@ mod tests {
let test_fut = async move {
// register our active heads.
handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: vec![(hash_a, Arc::new(jaeger::Span::Disabled))].into(),
activated: vec![ActivatedLeaf {
hash: hash_a,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}].into(),
deactivated: vec![].into(),
}))).await;
+41 -12
View File
@@ -90,7 +90,7 @@ use polkadot_subsystem::messages::{
};
pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem, jaeger,
SpawnedSubsystem, ActiveLeavesUpdate, ActivatedLeaf, DummySubsystem, jaeger,
};
use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, metered, Metronome};
use polkadot_node_primitives::SpawnNamed;
@@ -1898,7 +1898,11 @@ where
for (hash, number) in std::mem::take(&mut self.leaves) {
let _ = self.active_leaves.insert(hash, number);
let span = self.on_head_activated(&hash, None);
update.activated.push((hash, span));
update.activated.push(ActivatedLeaf {
hash,
number,
span,
});
}
if !update.is_empty() {
@@ -1982,7 +1986,11 @@ where
};
let span = self.on_head_activated(&block.hash, Some(block.parent_hash));
let mut update = ActiveLeavesUpdate::start_work(block.hash, span);
let mut update = ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: block.hash,
number: block.number,
span
});
if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
debug_assert_eq!(block.number.saturating_sub(1), number);
@@ -2602,16 +2610,25 @@ mod tests {
handler.block_imported(third_block).await;
let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(
first_block_hash,
Arc::new(jaeger::Span::Disabled),
)),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
})),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [(second_block_hash, Arc::new(jaeger::Span::Disabled))].as_ref().into(),
activated: [ActivatedLeaf {
hash: second_block_hash,
number: 2,
span: Arc::new(jaeger::Span::Disabled),
}].as_ref().into(),
deactivated: [first_block_hash].as_ref().into(),
}),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [(third_block_hash, Arc::new(jaeger::Span::Disabled))].as_ref().into(),
activated: [ActivatedLeaf {
hash: third_block_hash,
number: 3,
span: Arc::new(jaeger::Span::Disabled),
}].as_ref().into(),
deactivated: [second_block_hash].as_ref().into(),
}),
];
@@ -2700,8 +2717,16 @@ mod tests {
let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [
(first_block_hash, Arc::new(jaeger::Span::Disabled)),
(second_block_hash, Arc::new(jaeger::Span::Disabled)),
ActivatedLeaf {
hash: first_block_hash,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
},
ActivatedLeaf {
hash: second_block_hash,
number: 2,
span: Arc::new(jaeger::Span::Disabled),
},
].as_ref().into(),
..Default::default()
}),
@@ -2788,7 +2813,11 @@ mod tests {
let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [
(imported_block.hash, Arc::new(jaeger::Span::Disabled)),
ActivatedLeaf {
hash: imported_block.hash,
number: imported_block.number,
span: Arc::new(jaeger::Span::Disabled)
}
].as_ref().into(),
..Default::default()
}),
+14 -6
View File
@@ -776,15 +776,15 @@ where
activated,
deactivated,
}))) => {
for (hash, span) in activated {
for activated in activated {
let metrics = metrics.clone();
if let Err(e) = jobs.spawn_job(hash, span, run_args.clone(), metrics) {
if let Err(e) = jobs.spawn_job(activated.hash, activated.span, run_args.clone(), metrics) {
tracing::error!(
job = Job::NAME,
err = ?e,
"failed to spawn a job",
);
Self::fwd_err(Some(hash), JobsError::Utility(e), err_tx).await;
Self::fwd_err(Some(activated.hash), JobsError::Utility(e), err_tx).await;
return true;
}
}
@@ -1048,7 +1048,7 @@ mod tests {
use polkadot_node_jaeger as jaeger;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
SpawnedSubsystem,
SpawnedSubsystem, ActivatedLeaf,
};
use assert_matches::assert_matches;
use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt};
@@ -1174,7 +1174,11 @@ mod tests {
test_harness(true, |mut overseer_handle, err_rx| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(relay_parent, Arc::new(jaeger::Span::Disabled)),
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: relay_parent,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
assert_matches!(
@@ -1203,7 +1207,11 @@ mod tests {
test_harness(true, |mut overseer_handle, err_rx| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(relay_parent, Arc::new(jaeger::Span::Disabled)),
ActiveLeavesUpdate::start_work(ActivatedLeaf {
hash: relay_parent,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}),
)))
.await;
+21 -10
View File
@@ -46,24 +46,35 @@ use self::messages::AllMessages;
/// If there are greater than this number of slots, then we fall back to a heap vector.
const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;
/// Activated leaf.
#[derive(Debug, Clone)]
pub struct ActivatedLeaf {
/// The block hash.
pub hash: Hash,
/// The block number.
pub number: BlockNumber,
/// An associated [`jaeger::Span`].
///
/// NOTE: Each span should only be kept active as long as the leaf is considered active and should be dropped
/// when the leaf is deactivated.
pub span: Arc<jaeger::Span>,
}
/// Changes in the set of active leaves: the parachain heads which we care to work on.
///
/// Note that the activated and deactivated fields indicate deltas, not complete sets.
#[derive(Clone, Default)]
pub struct ActiveLeavesUpdate {
/// New relay chain block hashes of interest and their associated [`jaeger::Span`].
///
/// NOTE: Each span should only be kept active as long as the leaf is considered active and should be dropped
/// when the leaf is deactivated.
pub activated: SmallVec<[(Hash, Arc<jaeger::Span>); ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
/// New relay chain blocks of interest.
pub activated: SmallVec<[ActivatedLeaf; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
/// Relay chain block hashes no longer of interest.
pub deactivated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
}
impl ActiveLeavesUpdate {
/// Create a ActiveLeavesUpdate with a single activated hash
pub fn start_work(hash: Hash, span: Arc<jaeger::Span>) -> Self {
Self { activated: [(hash, span)][..].into(), ..Default::default() }
pub fn start_work(activated: ActivatedLeaf) -> Self {
Self { activated: [activated][..].into(), ..Default::default() }
}
/// Create a ActiveLeavesUpdate with a single deactivated hash
@@ -83,17 +94,17 @@ impl PartialEq for ActiveLeavesUpdate {
/// Instead, it means equality when `activated` and `deactivated` are considered as sets.
fn eq(&self, other: &Self) -> bool {
self.activated.len() == other.activated.len() && self.deactivated.len() == other.deactivated.len()
&& self.activated.iter().all(|a| other.activated.iter().any(|o| a.0 == o.0))
&& self.activated.iter().all(|a| other.activated.iter().any(|o| a.hash == o.hash))
&& self.deactivated.iter().all(|a| other.deactivated.contains(a))
}
}
impl fmt::Debug for ActiveLeavesUpdate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
struct Activated<'a>(&'a [(Hash, Arc<jaeger::Span>)]);
struct Activated<'a>(&'a [ActivatedLeaf]);
impl fmt::Debug for Activated<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.0.iter().map(|e| e.0)).finish()
f.debug_list().entries(self.0.iter().map(|e| e.hash)).finish()
}
}
@@ -30,7 +30,7 @@ Indicates a change in active leaves. Activated leaves should have jobs, whereas
```rust
struct ActiveLeavesUpdate {
activated: [Hash], // in practice, these should probably be a SmallVec
activated: [(Hash, Number)], // in practice, these should probably be a SmallVec
deactivated: [Hash],
}
```