mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 14:11:09 +00:00
Improve Network Spans (#2169)
* utility functions for erasure-coding threshold * add candidate-hash tag to candidate jaeger spans * debug implementation for jaeger span * add a span to each live candidate in availability dist. * availability span covers only our piece * fix tests * keep span alive slightly longer * remove spammy bitfield-gossip-received log * Revert "remove spammy bitfield-gossip-received log" This reverts commit 831a2db506d66f64ea516af3caf891e8643f5c43. * add claimed validator to bitfield-gossip span * add peer-id to handle-incoming span * add peer-id to availability distribution span * Update node/network/availability-distribution/src/lib.rs Co-authored-by: Bernhard Schuster <bernhard@ahoi.io> * Update erasure-coding/src/lib.rs Co-authored-by: Bernhard Schuster <bernhard@ahoi.io> * Update node/subsystem/src/jaeger.rs Co-authored-by: Bernhard Schuster <bernhard@ahoi.io> Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
This commit is contained in:
committed by
GitHub
parent
a864eaa093
commit
41102a6ff9
@@ -119,12 +119,18 @@ impl CodeParams {
|
||||
.expect("this struct is not created with invalid shard number; qed")
|
||||
}
|
||||
}
|
||||
|
||||
fn code_params(n_validators: usize) -> Result<CodeParams, Error> {
|
||||
/// Returns the maximum number of allowed, faulty chunks
|
||||
/// which does not prevent recovery given all other pieces
|
||||
/// are correct.
|
||||
const fn n_faulty(n_validators: usize) -> Result<usize, Error> {
|
||||
if n_validators > MAX_VALIDATORS { return Err(Error::TooManyValidators) }
|
||||
if n_validators <= 1 { return Err(Error::NotEnoughValidators) }
|
||||
|
||||
let n_faulty = n_validators.saturating_sub(1) / 3;
|
||||
Ok(n_validators.saturating_sub(1) / 3)
|
||||
}
|
||||
|
||||
fn code_params(n_validators: usize) -> Result<CodeParams, Error> {
|
||||
let n_faulty = n_faulty(n_validators)?;
|
||||
let n_good = n_validators - n_faulty;
|
||||
|
||||
Ok(CodeParams {
|
||||
@@ -133,6 +139,13 @@ fn code_params(n_validators: usize) -> Result<CodeParams, Error> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Obtain a threshold of chunks that should be enough to recover the data.
|
||||
pub fn recovery_threshold(n_validators: usize) -> Result<usize, Error> {
|
||||
let n_faulty = n_faulty(n_validators)?;
|
||||
|
||||
Ok(n_faulty + 1)
|
||||
}
|
||||
|
||||
/// Obtain erasure-coded chunks for v0 `AvailableData`, one for each validator.
|
||||
///
|
||||
/// Works only up to 65536 validators, and `n_validators` must be non-zero.
|
||||
|
||||
@@ -121,7 +121,7 @@ impl From<AvailabilityGossipMessage> for protocol_v1::AvailabilityDistributionMe
|
||||
|
||||
/// Data used to track information of peers and relay parents the
|
||||
/// overseer ordered us to work on.
|
||||
#[derive(Default, Clone, Debug)]
|
||||
#[derive(Debug, Default)]
|
||||
struct ProtocolState {
|
||||
/// Track all active peers and their views
|
||||
/// to determine what is relevant to them.
|
||||
@@ -142,7 +142,7 @@ struct ProtocolState {
|
||||
per_candidate: HashMap<CandidateHash, PerCandidate>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[derive(Debug)]
|
||||
struct PerCandidate {
|
||||
/// A Candidate and a set of known erasure chunks in form of messages to be gossiped / distributed if the peer view wants that.
|
||||
/// This is _across_ peers and not specific to a particular one.
|
||||
@@ -166,13 +166,30 @@ struct PerCandidate {
|
||||
|
||||
/// The set of relay chain blocks this appears to be live in.
|
||||
live_in: HashSet<Hash>,
|
||||
|
||||
/// A Jaeger span relating to this candidate.
|
||||
span: jaeger::JaegerSpan,
|
||||
}
|
||||
|
||||
impl PerCandidate {
|
||||
/// Returns `true` iff the given `validator_index` is required by the given `peer`.
|
||||
fn message_required_by_peer(&self, peer: &PeerId, validator_index: &ValidatorIndex) -> bool {
|
||||
self.received_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true)
|
||||
&& self.sent_messages.get(peer).map(|v| !v.contains(validator_index)).unwrap_or(true)
|
||||
fn message_required_by_peer(&self, peer: &PeerId, validator_index: ValidatorIndex) -> bool {
|
||||
self.received_messages.get(peer).map(|v| !v.contains(&validator_index)).unwrap_or(true)
|
||||
&& self.sent_messages.get(peer).map(|v| !v.contains(&validator_index)).unwrap_or(true)
|
||||
}
|
||||
|
||||
/// Add a chunk to the message vault. Overwrites anything that was already present.
|
||||
fn add_message(&mut self, chunk_index: u32, message: AvailabilityGossipMessage) {
|
||||
let _ = self.message_vault.insert(chunk_index, message);
|
||||
}
|
||||
|
||||
/// Clean up the span if we've got our own chunk.
|
||||
fn drop_span_after_own_availability(&mut self) {
|
||||
if let Some(validator_index) = self.validator_index {
|
||||
if self.message_vault.contains_key(&validator_index) {
|
||||
self.span = jaeger::JaegerSpan::Disabled;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,12 +212,10 @@ impl ProtocolState {
|
||||
&'a self,
|
||||
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
|
||||
) -> HashSet<CandidateHash> {
|
||||
relay_parents
|
||||
.into_iter()
|
||||
.filter_map(|r| self.per_relay_parent.get(r))
|
||||
.map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned())
|
||||
.flatten()
|
||||
.collect()
|
||||
cached_live_candidates_unioned(
|
||||
&self.per_relay_parent,
|
||||
relay_parents
|
||||
)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(candidates), fields(subsystem = LOG_TARGET))]
|
||||
@@ -218,16 +233,32 @@ impl ProtocolState {
|
||||
|
||||
// register the relation of relay_parent to candidate..
|
||||
for (receipt_hash, fetched) in candidates {
|
||||
let per_candidate = self.per_candidate.entry(receipt_hash).or_default();
|
||||
let candidate_entry = match self.per_candidate.entry(receipt_hash) {
|
||||
Entry::Occupied(e) => e.into_mut(),
|
||||
Entry::Vacant(e) => {
|
||||
if let FetchedLiveCandidate::Fresh(descriptor) = fetched {
|
||||
e.insert(PerCandidate {
|
||||
message_vault: HashMap::new(),
|
||||
received_messages: HashMap::new(),
|
||||
sent_messages: HashMap::new(),
|
||||
validators: validators.clone(),
|
||||
validator_index,
|
||||
descriptor,
|
||||
live_in: HashSet::new(),
|
||||
span: if validator_index.is_some() {
|
||||
jaeger::candidate_hash_span(&receipt_hash, "pending-availability")
|
||||
} else {
|
||||
jaeger::JaegerSpan::Disabled
|
||||
},
|
||||
})
|
||||
} else {
|
||||
tracing::warn!(target: LOG_TARGET, "No `per_candidate` but not fresh. logic error");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Cached candidates already have entries and thus don't need this
|
||||
// information to be set.
|
||||
if let FetchedLiveCandidate::Fresh(descriptor) = fetched {
|
||||
per_candidate.validator_index = validator_index.clone();
|
||||
per_candidate.validators = validators.clone();
|
||||
per_candidate.descriptor = descriptor;
|
||||
}
|
||||
per_candidate.live_in.insert(relay_parent);
|
||||
candidate_entry.live_in.insert(relay_parent);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,6 +290,18 @@ impl ProtocolState {
|
||||
}
|
||||
}
|
||||
|
||||
fn cached_live_candidates_unioned<'a>(
|
||||
per_relay_parent: &'a HashMap<Hash, PerRelayParent>,
|
||||
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
|
||||
) -> HashSet<CandidateHash> {
|
||||
relay_parents
|
||||
.into_iter()
|
||||
.filter_map(|r| per_relay_parent.get(r))
|
||||
.map(|per_relay_parent| per_relay_parent.live_candidates.iter().cloned())
|
||||
.flatten()
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Deal with network bridge updates and track what needs to be tracked
|
||||
/// which depends on the message type received.
|
||||
#[tracing::instrument(level = "trace", skip(ctx, keystore, metrics), fields(subsystem = LOG_TARGET))]
|
||||
@@ -297,8 +340,6 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
let mut _span = jaeger::hash_span(&gossiped_availability.candidate_hash.0, "availability-message-received");
|
||||
|
||||
process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics)
|
||||
.await?;
|
||||
}
|
||||
@@ -344,9 +385,11 @@ where
|
||||
// handle all candidates
|
||||
for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) {
|
||||
// If we are not a validator for this candidate, let's skip it.
|
||||
if state.per_candidate.entry(candidate_hash).or_default().validator_index.is_none() {
|
||||
continue
|
||||
}
|
||||
match state.per_candidate.get(&candidate_hash) {
|
||||
None => continue,
|
||||
Some(c) if c.validator_index.is_none() => continue,
|
||||
Some(_) => {},
|
||||
};
|
||||
|
||||
// check if the availability is present in the store exists
|
||||
if !query_data_availability(ctx, candidate_hash).await? {
|
||||
@@ -367,12 +410,18 @@ where
|
||||
.map(|(peer, _view)| peer.clone())
|
||||
.collect();
|
||||
|
||||
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
|
||||
let per_candidate = state.per_candidate.get_mut(&candidate_hash)
|
||||
.expect("existence checked above; qed");
|
||||
|
||||
let validator_count = per_candidate.validators.len();
|
||||
|
||||
// distribute all erasure messages to interested peers
|
||||
for chunk_index in 0u32..(validator_count as u32) {
|
||||
let _span = {
|
||||
let mut span = per_candidate.span.child("load-and-distribute");
|
||||
span.add_string_tag("chunk-index", &format!("{}", chunk_index));
|
||||
span
|
||||
};
|
||||
let message = if let Some(message) = per_candidate.message_vault.get(&chunk_index) {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
@@ -389,10 +438,15 @@ where
|
||||
"Retrieved chunk from availability storage",
|
||||
);
|
||||
|
||||
AvailabilityGossipMessage {
|
||||
|
||||
let msg = AvailabilityGossipMessage {
|
||||
candidate_hash,
|
||||
erasure_chunk,
|
||||
}
|
||||
};
|
||||
|
||||
per_candidate.add_message(chunk_index, msg.clone());
|
||||
|
||||
msg
|
||||
} else {
|
||||
tracing::error!(
|
||||
target: LOG_TARGET,
|
||||
@@ -407,12 +461,15 @@ where
|
||||
|
||||
let peers = peers
|
||||
.iter()
|
||||
.filter(|peer| per_candidate.message_required_by_peer(peer, &chunk_index))
|
||||
.filter(|peer| per_candidate.message_required_by_peer(peer, chunk_index))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;
|
||||
}
|
||||
|
||||
// traces are better if we wait until the loop is done to drop.
|
||||
per_candidate.drop_span_after_own_availability();
|
||||
}
|
||||
|
||||
// cleanup the removed relay parents and their states
|
||||
@@ -442,10 +499,6 @@ where
|
||||
.insert(message.erasure_chunk.index);
|
||||
}
|
||||
|
||||
per_candidate
|
||||
.message_vault
|
||||
.insert(message.erasure_chunk.index, message.clone());
|
||||
|
||||
if !peers.is_empty() {
|
||||
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
|
||||
peers.clone(),
|
||||
@@ -482,7 +535,10 @@ where
|
||||
|
||||
// Send all messages we've seen before and the peer is now interested in.
|
||||
for candidate_hash in added_candidates {
|
||||
let per_candidate = state.per_candidate.entry(candidate_hash).or_default();
|
||||
let per_candidate = match state.per_candidate.get_mut(&candidate_hash) {
|
||||
Some(p) => p,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
// obtain the relevant chunk indices not sent yet
|
||||
let messages = ((0 as ValidatorIndex)..(per_candidate.validators.len() as ValidatorIndex))
|
||||
@@ -493,7 +549,7 @@ where
|
||||
per_candidate
|
||||
.message_vault
|
||||
.get(&erasure_chunk_index)
|
||||
.filter(|_| per_candidate.message_required_by_peer(&origin, &erasure_chunk_index))
|
||||
.filter(|_| per_candidate.message_required_by_peer(&origin, erasure_chunk_index))
|
||||
})
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>();
|
||||
@@ -540,12 +596,10 @@ where
|
||||
let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter());
|
||||
|
||||
// check if the candidate is of interest
|
||||
let descriptor = if live_candidates.contains(&message.candidate_hash) {
|
||||
let candidate_entry = if live_candidates.contains(&message.candidate_hash) {
|
||||
state.per_candidate
|
||||
.get(&message.candidate_hash)
|
||||
.get_mut(&message.candidate_hash)
|
||||
.expect("All live candidates are contained in per_candidate; qed")
|
||||
.descriptor
|
||||
.clone()
|
||||
} else {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
@@ -557,105 +611,140 @@ where
|
||||
return Ok(())
|
||||
};
|
||||
|
||||
// check the merkle proof against the erasure root in the candidate descriptor.
|
||||
let anticipated_hash = match branch_hash(
|
||||
&descriptor.erasure_root,
|
||||
&message.erasure_chunk.proof,
|
||||
message.erasure_chunk.index as usize,
|
||||
) {
|
||||
Ok(hash) => hash,
|
||||
Err(e) => {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?message.candidate_hash,
|
||||
peer = %origin,
|
||||
error = ?e,
|
||||
"Failed to calculate chunk merkle proof",
|
||||
);
|
||||
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
|
||||
let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
|
||||
if anticipated_hash != erasure_chunk_hash {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?message.candidate_hash,
|
||||
peer = %origin,
|
||||
"Peer send chunk with invalid merkle proof",
|
||||
);
|
||||
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let erasure_chunk_index = &message.erasure_chunk.index;
|
||||
|
||||
{
|
||||
let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default();
|
||||
|
||||
// Handle a duplicate before doing expensive checks.
|
||||
if let Some(existing) = candidate_entry.message_vault.get(&message.erasure_chunk.index) {
|
||||
let span = candidate_entry.span.child("handle-duplicate");
|
||||
// check if this particular erasure chunk was already sent by that peer before
|
||||
{
|
||||
let received_set = per_candidate
|
||||
let _span = span.child("check-entry");
|
||||
let received_set = candidate_entry
|
||||
.received_messages
|
||||
.entry(origin.clone())
|
||||
.or_default();
|
||||
if !received_set.insert(*erasure_chunk_index) {
|
||||
|
||||
if !received_set.insert(message.erasure_chunk.index) {
|
||||
modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// insert into known messages and change reputation
|
||||
if per_candidate
|
||||
.message_vault
|
||||
.insert(*erasure_chunk_index, message.clone())
|
||||
.is_some()
|
||||
// check that the message content matches what we have already before rewarding
|
||||
// the peer.
|
||||
{
|
||||
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
|
||||
} else {
|
||||
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await;
|
||||
|
||||
// save the chunk for our index
|
||||
if Some(*erasure_chunk_index) == per_candidate.validator_index {
|
||||
if store_chunk(
|
||||
ctx,
|
||||
message.candidate_hash,
|
||||
descriptor.relay_parent,
|
||||
message.erasure_chunk.index,
|
||||
message.erasure_chunk.clone(),
|
||||
).await?.is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to store erasure chunk to availability store"
|
||||
);
|
||||
}
|
||||
let _span = span.child("check-accurate");
|
||||
if existing == &message {
|
||||
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
|
||||
} else {
|
||||
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let span = {
|
||||
let mut span = candidate_entry.span.child("process-new-chunk");
|
||||
span.add_string_tag("peer-id", &origin.to_base58());
|
||||
span
|
||||
};
|
||||
|
||||
// check the merkle proof against the erasure root in the candidate descriptor.
|
||||
let anticipated_hash = {
|
||||
let _span = span.child("check-merkle-root");
|
||||
match branch_hash(
|
||||
&candidate_entry.descriptor.erasure_root,
|
||||
&message.erasure_chunk.proof,
|
||||
message.erasure_chunk.index as usize,
|
||||
) {
|
||||
Ok(hash) => hash,
|
||||
Err(e) => {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?message.candidate_hash,
|
||||
peer = %origin,
|
||||
error = ?e,
|
||||
"Failed to calculate chunk merkle proof",
|
||||
);
|
||||
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
|
||||
return Ok(());
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
let _span = span.child("check-chunk-hash");
|
||||
let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
|
||||
if anticipated_hash != erasure_chunk_hash {
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
candidate_hash = ?message.candidate_hash,
|
||||
peer = %origin,
|
||||
"Peer sent chunk with invalid merkle proof",
|
||||
);
|
||||
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// insert into known messages and change reputation. we've guaranteed
|
||||
// above that the message vault doesn't contain any message under this
|
||||
// chunk index already.
|
||||
|
||||
candidate_entry
|
||||
.received_messages
|
||||
.entry(origin.clone())
|
||||
.or_default()
|
||||
.insert(message.erasure_chunk.index);
|
||||
|
||||
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await;
|
||||
|
||||
// save the chunk for our index
|
||||
if Some(message.erasure_chunk.index) == candidate_entry.validator_index {
|
||||
let _span = span.child("store-our-chunk");
|
||||
if store_chunk(
|
||||
ctx,
|
||||
message.candidate_hash,
|
||||
candidate_entry.descriptor.relay_parent,
|
||||
message.erasure_chunk.index,
|
||||
message.erasure_chunk.clone(),
|
||||
).await?.is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to store erasure chunk to availability store"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
candidate_entry.add_message(message.erasure_chunk.index, message.clone());
|
||||
candidate_entry.drop_span_after_own_availability();
|
||||
}
|
||||
|
||||
// condense the peers to the peers with interest on the candidate
|
||||
let peers = state
|
||||
.peer_views
|
||||
.clone()
|
||||
.into_iter()
|
||||
.filter(|(_, view)| {
|
||||
// peers view must contain the candidate hash too
|
||||
state
|
||||
.cached_live_candidates_unioned(view.heads.iter())
|
||||
.contains(&message.candidate_hash)
|
||||
})
|
||||
.map(|(peer, _)| -> PeerId { peer.clone() })
|
||||
.collect::<Vec<_>>();
|
||||
let peers = {
|
||||
let _span = span.child("determine-recipient-peers");
|
||||
let per_relay_parent = &state.per_relay_parent;
|
||||
|
||||
let per_candidate = state.per_candidate.entry(message.candidate_hash).or_default();
|
||||
|
||||
let peers = peers
|
||||
.into_iter()
|
||||
.filter(|peer| per_candidate.message_required_by_peer(peer, erasure_chunk_index))
|
||||
.collect::<Vec<_>>();
|
||||
state
|
||||
.peer_views
|
||||
.clone()
|
||||
.into_iter()
|
||||
.filter(|(_, view)| {
|
||||
// peers view must contain the candidate hash too
|
||||
cached_live_candidates_unioned(
|
||||
per_relay_parent,
|
||||
view.heads.iter(),
|
||||
).contains(&message.candidate_hash)
|
||||
})
|
||||
.map(|(peer, _)| -> PeerId { peer.clone() })
|
||||
.filter(|peer| candidate_entry.message_required_by_peer(peer, message.erasure_chunk.index))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
drop(span);
|
||||
// gossip that message to interested peers
|
||||
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;
|
||||
send_tracked_gossip_messages_to_peers(ctx, candidate_entry, metrics, peers, iter::once(message)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -681,7 +770,14 @@ impl AvailabilityDistributionSubsystem {
|
||||
where
|
||||
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
|
||||
{
|
||||
let mut state = ProtocolState::default();
|
||||
let mut state = ProtocolState {
|
||||
peer_views: HashMap::new(),
|
||||
view: Default::default(),
|
||||
live_under: HashMap::new(),
|
||||
per_relay_parent: HashMap::new(),
|
||||
per_candidate: HashMap::new(),
|
||||
};
|
||||
|
||||
self.run_inner(ctx, &mut state).await
|
||||
}
|
||||
|
||||
|
||||
@@ -50,6 +50,19 @@ fn chunk_protocol_message(
|
||||
)
|
||||
}
|
||||
|
||||
fn make_per_candidate() -> PerCandidate {
|
||||
PerCandidate {
|
||||
live_in: HashSet::new(),
|
||||
message_vault: HashMap::new(),
|
||||
received_messages: HashMap::new(),
|
||||
sent_messages: HashMap::new(),
|
||||
validators: Vec::new(),
|
||||
validator_index: None,
|
||||
descriptor: Default::default(),
|
||||
span: jaeger::JaegerSpan::Disabled,
|
||||
}
|
||||
}
|
||||
|
||||
struct TestHarness {
|
||||
virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
|
||||
}
|
||||
@@ -1024,9 +1037,10 @@ fn remove_relay_parent_only_removes_per_candidate_if_final() {
|
||||
live_candidates: std::iter::once(candidate_hash_a).collect(),
|
||||
});
|
||||
|
||||
state.per_candidate.insert(candidate_hash_a, PerCandidate {
|
||||
live_in: vec![hash_a, hash_b].into_iter().collect(),
|
||||
..Default::default()
|
||||
state.per_candidate.insert(candidate_hash_a, {
|
||||
let mut per_candidate = make_per_candidate();
|
||||
per_candidate.live_in = vec![hash_a, hash_b].into_iter().collect();
|
||||
per_candidate
|
||||
});
|
||||
|
||||
state.remove_relay_parent(&hash_a);
|
||||
@@ -1052,6 +1066,8 @@ fn add_relay_parent_includes_all_live_candidates() {
|
||||
let candidate_hash_a = CandidateHash([10u8; 32].into());
|
||||
let candidate_hash_b = CandidateHash([11u8; 32].into());
|
||||
|
||||
state.per_candidate.insert(candidate_hash_b, make_per_candidate());
|
||||
|
||||
let candidates = vec![
|
||||
(candidate_hash_a, FetchedLiveCandidate::Fresh(Default::default())),
|
||||
(candidate_hash_b, FetchedLiveCandidate::Cached),
|
||||
|
||||
@@ -495,8 +495,16 @@ where
|
||||
NetworkBridgeEvent::PeerMessage(remote, message) => {
|
||||
match message {
|
||||
protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => {
|
||||
let mut _span = jaeger::hash_span(&relay_parent, "bitfield-gossip-received");
|
||||
_span.add_string_tag("peer-id", &remote.to_base58());
|
||||
let mut _span = {
|
||||
let mut span = jaeger::hash_span(&relay_parent, "bitfield-gossip-received");
|
||||
span.add_string_tag("peer-id", &remote.to_base58());
|
||||
span.add_string_tag(
|
||||
"claimed-validator",
|
||||
&format!("{}", bitfield.validator_index()),
|
||||
);
|
||||
span
|
||||
};
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, peer_id = %remote, "received bitfield gossip from peer");
|
||||
let gossiped_bitfield = BitfieldGossipMessage {
|
||||
relay_parent,
|
||||
|
||||
@@ -731,6 +731,10 @@ async fn handle_incoming_message<'a>(
|
||||
"candidate-hash",
|
||||
&format!("{:?}", candidate_hash.0),
|
||||
);
|
||||
span.add_string_tag(
|
||||
"peer-id",
|
||||
&peer.to_base58(),
|
||||
);
|
||||
span
|
||||
};
|
||||
|
||||
|
||||
@@ -129,6 +129,12 @@ impl JaegerSpan {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for JaegerSpan {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "<jaeger span>")
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Option<mick_jaeger::Span>> for JaegerSpan {
|
||||
fn from(src: Option<mick_jaeger::Span>) -> Self {
|
||||
if let Some(span) = src {
|
||||
@@ -146,9 +152,12 @@ impl From<mick_jaeger::Span> for JaegerSpan {
|
||||
}
|
||||
|
||||
/// Shortcut for [`candidate_hash_span`] with the hash of the `Candidate` block.
|
||||
#[inline(always)]
|
||||
pub fn candidate_hash_span(candidate_hash: &CandidateHash, span_name: impl Into<String>) -> JaegerSpan {
|
||||
INSTANCE.read_recursive().span(|| { candidate_hash.0 }, span_name).into()
|
||||
let mut span: JaegerSpan = INSTANCE.read_recursive()
|
||||
.span(|| { candidate_hash.0 }, span_name).into();
|
||||
|
||||
span.add_string_tag("candidate-hash", &format!("{:?}", candidate_hash.0));
|
||||
span
|
||||
}
|
||||
|
||||
/// Shortcut for [`hash_span`] with the hash of the `PoV`.
|
||||
|
||||
Reference in New Issue
Block a user