mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 12:51:02 +00:00
Fix statement distribution: forward statements to other peers. (#2146)
* add candidate hash statement circulation span * add relay-parent to hash-span * Some typos and misspellings in docs I found, during my studies. (#2144) * Fix stale link to overseer docs * Some typos and mispellings in docs/comments I found during studying how Polkadot works. * Rococo V1 (#2141) * Update to latest master and use 30 minutes sessions * add bootnodes to chainspec * Update Substrate * Update chain-spec * Update Cargo.lock * GENESIS * Change session length to one hour * Bump spec_version to not fuck anything up ;) Co-authored-by: Erin Grasmick <erin@parity.io> * avoid creating duplicate unbacked spans when we see extra statements (#2145) * improve jaeger spans for statement distribution * tweak and add failing test for repropagation * make a change that gets the test passing * guide: clarify * remove semicolon Co-authored-by: Robert Klotzner <eskimor@users.noreply.github.com> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> Co-authored-by: Erin Grasmick <erin@parity.io>
This commit is contained in:
committed by
GitHub
parent
e95be77eb6
commit
01d271caeb
@@ -389,16 +389,23 @@ struct ActiveHeadData {
|
||||
session_index: sp_staking::SessionIndex,
|
||||
/// How many `Seconded` statements we've seen per validator.
|
||||
seconded_counts: HashMap<ValidatorIndex, usize>,
|
||||
/// A Jaeger span for this head, so we can attach data to it.
|
||||
span: jaeger::JaegerSpan,
|
||||
}
|
||||
|
||||
impl ActiveHeadData {
|
||||
fn new(validators: Vec<ValidatorId>, session_index: sp_staking::SessionIndex) -> Self {
|
||||
fn new(
|
||||
validators: Vec<ValidatorId>,
|
||||
session_index: sp_staking::SessionIndex,
|
||||
relay_parent: &Hash,
|
||||
) -> Self {
|
||||
ActiveHeadData {
|
||||
candidates: Default::default(),
|
||||
statements: Default::default(),
|
||||
validators,
|
||||
session_index,
|
||||
seconded_counts: Default::default(),
|
||||
span: jaeger::hash_span(&relay_parent, "statement-dist-active"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -532,6 +539,15 @@ async fn circulate_statement_and_dependents(
|
||||
None => return,
|
||||
};
|
||||
|
||||
let _span = {
|
||||
let mut span = active_head.span.child("circulate-statement");
|
||||
span.add_string_tag(
|
||||
"candidate-hash",
|
||||
&format!("{:?}", statement.payload().candidate_hash().0),
|
||||
);
|
||||
span
|
||||
};
|
||||
|
||||
// First circulate the statement directly to all peers needing it.
|
||||
// The borrow of `active_head` needs to encompass only this (Rust) statement.
|
||||
let outputs: Option<(CandidateHash, Vec<PeerId>)> = {
|
||||
@@ -674,7 +690,7 @@ async fn report_peer(
|
||||
// if we were not already aware of it, along with the corresponding relay-parent.
|
||||
//
|
||||
// This function checks the signature and ensures the statement is compatible with our
|
||||
// view.
|
||||
// view. It also notifies candidate backing if the statement was previously unknown.
|
||||
#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))]
|
||||
async fn handle_incoming_message<'a>(
|
||||
peer: PeerId,
|
||||
@@ -708,6 +724,16 @@ async fn handle_incoming_message<'a>(
|
||||
}
|
||||
};
|
||||
|
||||
let candidate_hash = statement.payload().candidate_hash();
|
||||
let handle_incoming_span = {
|
||||
let mut span = active_head.span.child("handle-incoming");
|
||||
span.add_string_tag(
|
||||
"candidate-hash",
|
||||
&format!("{:?}", candidate_hash.0),
|
||||
);
|
||||
span
|
||||
};
|
||||
|
||||
// check the signature on the statement.
|
||||
if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) {
|
||||
report_peer(ctx, peer, COST_INVALID_SIGNATURE).await;
|
||||
@@ -733,7 +759,7 @@ async fn handle_incoming_message<'a>(
|
||||
peer_data,
|
||||
ctx,
|
||||
relay_parent,
|
||||
fingerprint.0.candidate_hash().clone(),
|
||||
candidate_hash,
|
||||
&*active_head,
|
||||
metrics,
|
||||
).await;
|
||||
@@ -753,6 +779,16 @@ async fn handle_incoming_message<'a>(
|
||||
}
|
||||
NotedStatement::Fresh(statement) => {
|
||||
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await;
|
||||
|
||||
let mut _span = handle_incoming_span.child("notify-backing");
|
||||
|
||||
// When we receive a new message from a peer, we forward it to the
|
||||
// candidate backing subsystem.
|
||||
let message = AllMessages::CandidateBacking(
|
||||
CandidateBackingMessage::Statement(relay_parent, statement.statement.clone())
|
||||
);
|
||||
ctx.send_message(message).await;
|
||||
|
||||
Some((relay_parent, statement))
|
||||
}
|
||||
}
|
||||
@@ -815,9 +851,9 @@ async fn handle_network_update(
|
||||
peers.remove(&peer);
|
||||
}
|
||||
NetworkBridgeEvent::PeerMessage(peer, message) => {
|
||||
match peers.get_mut(&peer) {
|
||||
let handled_incoming = match peers.get_mut(&peer) {
|
||||
Some(data) => {
|
||||
let new_stored = handle_incoming_message(
|
||||
handle_incoming_message(
|
||||
peer,
|
||||
data,
|
||||
&*our_view,
|
||||
@@ -826,21 +862,27 @@ async fn handle_network_update(
|
||||
message,
|
||||
metrics,
|
||||
statement_listeners,
|
||||
).await;
|
||||
|
||||
if let Some((relay_parent, new)) = new_stored {
|
||||
let mut _span = jaeger::hash_span(&relay_parent, "sending-statement");
|
||||
// When we receive a new message from a peer, we forward it to the
|
||||
// candidate backing subsystem.
|
||||
let message = AllMessages::CandidateBacking(
|
||||
CandidateBackingMessage::Statement(relay_parent, new.statement.clone())
|
||||
);
|
||||
ctx.send_message(message).await;
|
||||
}
|
||||
).await
|
||||
}
|
||||
None => (),
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
// if we got a fresh message, we need to circulate it to all peers.
|
||||
if let Some((relay_parent, statement)) = handled_incoming {
|
||||
// we can ignore the set of peers who this function returns as now expecting
|
||||
// dependent statements.
|
||||
//
|
||||
// we have the invariant in this subsystem that we never store a `Valid` or `Invalid`
|
||||
// statement before a `Seconded` statement. `Seconded` statements are the only ones
|
||||
// that require dependents. Thus, if this is a `Seconded` statement for a candidate we
|
||||
// were not aware of before, we cannot have any dependent statements from the candidate.
|
||||
let _ = circulate_statement(
|
||||
peers,
|
||||
ctx,
|
||||
relay_parent,
|
||||
statement,
|
||||
).await;
|
||||
}
|
||||
}
|
||||
NetworkBridgeEvent::PeerViewChange(peer, view) => {
|
||||
match peers.get_mut(&peer) {
|
||||
@@ -935,7 +977,7 @@ impl StatementDistribution {
|
||||
};
|
||||
|
||||
active_heads.entry(relay_parent)
|
||||
.or_insert(ActiveHeadData::new(validators, session_index));
|
||||
.or_insert(ActiveHeadData::new(validators, session_index, &relay_parent));
|
||||
}
|
||||
}
|
||||
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {
|
||||
@@ -945,7 +987,6 @@ impl StatementDistribution {
|
||||
FromOverseer::Communication { msg } => match msg {
|
||||
StatementDistributionMessage::Share(relay_parent, statement) => {
|
||||
let _timer = metrics.time_share();
|
||||
let mut _span = jaeger::hash_span(&relay_parent, "circulate-statement");
|
||||
|
||||
inform_statement_listeners(
|
||||
&statement,
|
||||
@@ -1072,7 +1113,7 @@ mod tests {
|
||||
use futures::executor::{self, block_on};
|
||||
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
|
||||
use sc_keystore::LocalKeystore;
|
||||
use polkadot_node_network_protocol::view;
|
||||
use polkadot_node_network_protocol::{view, ObservedRole};
|
||||
|
||||
#[test]
|
||||
fn active_head_accepts_only_2_seconded_per_validator() {
|
||||
@@ -1110,7 +1151,7 @@ mod tests {
|
||||
c
|
||||
};
|
||||
|
||||
let mut head_data = ActiveHeadData::new(validators, session_index);
|
||||
let mut head_data = ActiveHeadData::new(validators, session_index, &parent_hash);
|
||||
|
||||
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
|
||||
let alice_public = SyncCryptoStore::sr25519_generate_new(
|
||||
@@ -1368,7 +1409,7 @@ mod tests {
|
||||
).unwrap();
|
||||
|
||||
let new_head_data = {
|
||||
let mut data = ActiveHeadData::new(validators, session_index);
|
||||
let mut data = ActiveHeadData::new(validators, session_index, &hash_c);
|
||||
|
||||
let noted = data.note_statement(block_on(SignedFullStatement::sign(
|
||||
&keystore,
|
||||
@@ -1586,4 +1627,162 @@ mod tests {
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn receiving_from_one_sends_to_another_and_to_candidate_backing() {
|
||||
let hash_a = Hash::repeat_byte(1);
|
||||
|
||||
let candidate = {
|
||||
let mut c = CommittedCandidateReceipt::default();
|
||||
c.descriptor.relay_parent = hash_a;
|
||||
c.descriptor.para_id = 1.into();
|
||||
c
|
||||
};
|
||||
|
||||
let peer_a = PeerId::random();
|
||||
let peer_b = PeerId::random();
|
||||
|
||||
let validators = vec![
|
||||
Sr25519Keyring::Alice.public().into(),
|
||||
Sr25519Keyring::Bob.public().into(),
|
||||
Sr25519Keyring::Charlie.public().into(),
|
||||
];
|
||||
|
||||
let session_index = 1;
|
||||
|
||||
let pool = sp_core::testing::TaskExecutor::new();
|
||||
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
|
||||
|
||||
let bg = async move {
|
||||
let s = StatementDistribution { metrics: Default::default() };
|
||||
s.run(ctx).await.unwrap();
|
||||
};
|
||||
|
||||
let test_fut = async move {
|
||||
// register our active heads.
|
||||
handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||
activated: vec![hash_a].into(),
|
||||
deactivated: vec![].into(),
|
||||
}))).await;
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(r, RuntimeApiRequest::Validators(tx))
|
||||
)
|
||||
if r == hash_a
|
||||
=> {
|
||||
let _ = tx.send(Ok(validators));
|
||||
}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::RuntimeApi(
|
||||
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
|
||||
)
|
||||
if r == hash_a
|
||||
=> {
|
||||
let _ = tx.send(Ok(session_index));
|
||||
}
|
||||
);
|
||||
|
||||
// notify of peers and view
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full)
|
||||
)
|
||||
}).await;
|
||||
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
|
||||
)
|
||||
}).await;
|
||||
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a])
|
||||
)
|
||||
}).await;
|
||||
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a])
|
||||
)
|
||||
}).await;
|
||||
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::OurViewChange(view![hash_a])
|
||||
)
|
||||
}).await;
|
||||
|
||||
// receive a seconded statement from peer A. it should be propagated onwards to peer B and to
|
||||
// candidate backing.
|
||||
let statement = {
|
||||
let signing_context = SigningContext {
|
||||
parent_hash: hash_a,
|
||||
session_index,
|
||||
};
|
||||
|
||||
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
|
||||
let alice_public = CryptoStore::sr25519_generate_new(
|
||||
&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
|
||||
).await.unwrap();
|
||||
|
||||
SignedFullStatement::sign(
|
||||
&keystore,
|
||||
Statement::Seconded(candidate),
|
||||
&signing_context,
|
||||
0,
|
||||
&alice_public.into(),
|
||||
).await.expect("should be signed")
|
||||
};
|
||||
|
||||
handle.send(FromOverseer::Communication {
|
||||
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
|
||||
NetworkBridgeEvent::PeerMessage(
|
||||
peer_a.clone(),
|
||||
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()),
|
||||
)
|
||||
)
|
||||
}).await;
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::ReportPeer(p, r)
|
||||
) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::CandidateBacking(
|
||||
CandidateBackingMessage::Statement(r, s)
|
||||
) if r == hash_a && s == statement => {}
|
||||
);
|
||||
|
||||
assert_matches!(
|
||||
handle.recv().await,
|
||||
AllMessages::NetworkBridge(
|
||||
NetworkBridgeMessage::SendValidationMessage(
|
||||
recipients,
|
||||
protocol_v1::ValidationProtocol::StatementDistribution(
|
||||
protocol_v1::StatementDistributionMessage::Statement(r, s)
|
||||
),
|
||||
)
|
||||
) => {
|
||||
assert_eq!(recipients, vec![peer_b.clone()]);
|
||||
assert_eq!(r, hash_a);
|
||||
assert_eq!(s, statement);
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
futures::pin_mut!(test_fut);
|
||||
futures::pin_mut!(bg);
|
||||
|
||||
executor::block_on(future::select(test_fut, bg));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,9 +159,13 @@ pub fn pov_span(pov: &PoV, span_name: impl Into<String>) -> JaegerSpan {
|
||||
|
||||
/// Creates a `Span` referring to the given hash. All spans created with [`hash_span`] with the
|
||||
/// same hash (even from multiple different nodes) will be visible in the same view on Jaeger.
|
||||
///
|
||||
/// This span automatically has the `relay-parent` tag set.
|
||||
#[inline(always)]
|
||||
pub fn hash_span(hash: &Hash, span_name: impl Into<String>) -> JaegerSpan {
|
||||
INSTANCE.read_recursive().span(|| { *hash }, span_name).into()
|
||||
let mut span: JaegerSpan = INSTANCE.read_recursive().span(|| { *hash }, span_name).into();
|
||||
span.add_string_tag("relay-parent", &format!("{:?}", hash));
|
||||
span
|
||||
}
|
||||
|
||||
/// Stateful convenience wrapper around [`mick_jaeger`].
|
||||
|
||||
@@ -38,7 +38,7 @@ There is a very simple state machine which governs which messages we are willing
|
||||
|
||||
A: Initial State. Receive `SignedFullStatement(Statement::Second)`: extract `Statement`, forward to Candidate Backing and PoV Distribution, proceed to B. Receive any other `SignedFullStatement` variant: drop it.
|
||||
|
||||
B: Receive any `SignedFullStatement`: check signature, forward to Candidate Backing. Receive `OverseerMessage::StopWork`: proceed to C.
|
||||
B: Receive any `SignedFullStatement`: check signature and determine whether the statement is new to us. if new, forward to Candidate Backing and circulate to other peers. Receive `OverseerMessage::StopWork`: proceed to C.
|
||||
|
||||
C: Receive any message for this block: drop it.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user