diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 02ed1cf7bd..951f76d526 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -389,16 +389,23 @@ struct ActiveHeadData { session_index: sp_staking::SessionIndex, /// How many `Seconded` statements we've seen per validator. seconded_counts: HashMap, + /// A Jaeger span for this head, so we can attach data to it. + span: jaeger::JaegerSpan, } impl ActiveHeadData { - fn new(validators: Vec, session_index: sp_staking::SessionIndex) -> Self { + fn new( + validators: Vec, + session_index: sp_staking::SessionIndex, + relay_parent: &Hash, + ) -> Self { ActiveHeadData { candidates: Default::default(), statements: Default::default(), validators, session_index, seconded_counts: Default::default(), + span: jaeger::hash_span(&relay_parent, "statement-dist-active"), } } @@ -532,6 +539,15 @@ async fn circulate_statement_and_dependents( None => return, }; + let _span = { + let mut span = active_head.span.child("circulate-statement"); + span.add_string_tag( + "candidate-hash", + &format!("{:?}", statement.payload().candidate_hash().0), + ); + span + }; + // First circulate the statement directly to all peers needing it. // The borrow of `active_head` needs to encompass only this (Rust) statement. let outputs: Option<(CandidateHash, Vec)> = { @@ -674,7 +690,7 @@ async fn report_peer( // if we were not already aware of it, along with the corresponding relay-parent. // // This function checks the signature and ensures the statement is compatible with our -// view. +// view. It also notifies candidate backing if the statement was previously unknown. #[tracing::instrument(level = "trace", skip(peer_data, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))] async fn handle_incoming_message<'a>( peer: PeerId, @@ -708,6 +724,16 @@ async fn handle_incoming_message<'a>( } }; + let candidate_hash = statement.payload().candidate_hash(); + let handle_incoming_span = { + let mut span = active_head.span.child("handle-incoming"); + span.add_string_tag( + "candidate-hash", + &format!("{:?}", candidate_hash.0), + ); + span + }; + // check the signature on the statement. if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) { report_peer(ctx, peer, COST_INVALID_SIGNATURE).await; @@ -733,7 +759,7 @@ async fn handle_incoming_message<'a>( peer_data, ctx, relay_parent, - fingerprint.0.candidate_hash().clone(), + candidate_hash, &*active_head, metrics, ).await; @@ -753,6 +779,16 @@ async fn handle_incoming_message<'a>( } NotedStatement::Fresh(statement) => { report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await; + + let mut _span = handle_incoming_span.child("notify-backing"); + + // When we receive a new message from a peer, we forward it to the + // candidate backing subsystem. + let message = AllMessages::CandidateBacking( + CandidateBackingMessage::Statement(relay_parent, statement.statement.clone()) + ); + ctx.send_message(message).await; + Some((relay_parent, statement)) } } @@ -815,9 +851,9 @@ async fn handle_network_update( peers.remove(&peer); } NetworkBridgeEvent::PeerMessage(peer, message) => { - match peers.get_mut(&peer) { + let handled_incoming = match peers.get_mut(&peer) { Some(data) => { - let new_stored = handle_incoming_message( + handle_incoming_message( peer, data, &*our_view, @@ -826,21 +862,27 @@ async fn handle_network_update( message, metrics, statement_listeners, - ).await; - - if let Some((relay_parent, new)) = new_stored { - let mut _span = jaeger::hash_span(&relay_parent, "sending-statement"); - // When we receive a new message from a peer, we forward it to the - // candidate backing subsystem. - let message = AllMessages::CandidateBacking( - CandidateBackingMessage::Statement(relay_parent, new.statement.clone()) - ); - ctx.send_message(message).await; - } + ).await } - None => (), - } + None => None, + }; + // if we got a fresh message, we need to circulate it to all peers. + if let Some((relay_parent, statement)) = handled_incoming { + // we can ignore the set of peers who this function returns as now expecting + // dependent statements. + // + // we have the invariant in this subsystem that we never store a `Valid` or `Invalid` + // statement before a `Seconded` statement. `Seconded` statements are the only ones + // that require dependents. Thus, if this is a `Seconded` statement for a candidate we + // were not aware of before, we cannot have any dependent statements from the candidate. + let _ = circulate_statement( + peers, + ctx, + relay_parent, + statement, + ).await; + } } NetworkBridgeEvent::PeerViewChange(peer, view) => { match peers.get_mut(&peer) { @@ -935,7 +977,7 @@ impl StatementDistribution { }; active_heads.entry(relay_parent) - .or_insert(ActiveHeadData::new(validators, session_index)); + .or_insert(ActiveHeadData::new(validators, session_index, &relay_parent)); } } FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => { @@ -945,7 +987,6 @@ impl StatementDistribution { FromOverseer::Communication { msg } => match msg { StatementDistributionMessage::Share(relay_parent, statement) => { let _timer = metrics.time_share(); - let mut _span = jaeger::hash_span(&relay_parent, "circulate-statement"); inform_statement_listeners( &statement, @@ -1072,7 +1113,7 @@ mod tests { use futures::executor::{self, block_on}; use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore}; use sc_keystore::LocalKeystore; - use polkadot_node_network_protocol::view; + use polkadot_node_network_protocol::{view, ObservedRole}; #[test] fn active_head_accepts_only_2_seconded_per_validator() { @@ -1110,7 +1151,7 @@ mod tests { c }; - let mut head_data = ActiveHeadData::new(validators, session_index); + let mut head_data = ActiveHeadData::new(validators, session_index, &parent_hash); let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory()); let alice_public = SyncCryptoStore::sr25519_generate_new( @@ -1368,7 +1409,7 @@ mod tests { ).unwrap(); let new_head_data = { - let mut data = ActiveHeadData::new(validators, session_index); + let mut data = ActiveHeadData::new(validators, session_index, &hash_c); let noted = data.note_statement(block_on(SignedFullStatement::sign( &keystore, @@ -1586,4 +1627,162 @@ mod tests { ) }); } + + #[test] + fn receiving_from_one_sends_to_another_and_to_candidate_backing() { + let hash_a = Hash::repeat_byte(1); + + let candidate = { + let mut c = CommittedCandidateReceipt::default(); + c.descriptor.relay_parent = hash_a; + c.descriptor.para_id = 1.into(); + c + }; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + + let validators = vec![ + Sr25519Keyring::Alice.public().into(), + Sr25519Keyring::Bob.public().into(), + Sr25519Keyring::Charlie.public().into(), + ]; + + let session_index = 1; + + let pool = sp_core::testing::TaskExecutor::new(); + let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); + + let bg = async move { + let s = StatementDistribution { metrics: Default::default() }; + s.run(ctx).await.unwrap(); + }; + + let test_fut = async move { + // register our active heads. + handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: vec![hash_a].into(), + deactivated: vec![].into(), + }))).await; + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::Validators(tx)) + ) + if r == hash_a + => { + let _ = tx.send(Ok(validators)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx)) + ) + if r == hash_a + => { + let _ = tx.send(Ok(session_index)); + } + ); + + // notify of peers and view + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full) + ) + }).await; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full) + ) + }).await; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]) + ) + }).await; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a]) + ) + }).await; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(view![hash_a]) + ) + }).await; + + // receive a seconded statement from peer A. it should be propagated onwards to peer B and to + // candidate backing. + let statement = { + let signing_context = SigningContext { + parent_hash: hash_a, + session_index, + }; + + let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory()); + let alice_public = CryptoStore::sr25519_generate_new( + &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed()) + ).await.unwrap(); + + SignedFullStatement::sign( + &keystore, + Statement::Seconded(candidate), + &signing_context, + 0, + &alice_public.into(), + ).await.expect("should be signed") + }; + + handle.send(FromOverseer::Communication { + msg: StatementDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()), + ) + ) + }).await; + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(p, r) + ) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {} + ); + + assert_matches!( + handle.recv().await, + AllMessages::CandidateBacking( + CandidateBackingMessage::Statement(r, s) + ) if r == hash_a && s == statement => {} + ); + + assert_matches!( + handle.recv().await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage( + recipients, + protocol_v1::ValidationProtocol::StatementDistribution( + protocol_v1::StatementDistributionMessage::Statement(r, s) + ), + ) + ) => { + assert_eq!(recipients, vec![peer_b.clone()]); + assert_eq!(r, hash_a); + assert_eq!(s, statement); + } + ); + }; + + futures::pin_mut!(test_fut); + futures::pin_mut!(bg); + + executor::block_on(future::select(test_fut, bg)); + } } diff --git a/polkadot/node/subsystem/src/jaeger.rs b/polkadot/node/subsystem/src/jaeger.rs index cbd8e23ac5..35f5325116 100644 --- a/polkadot/node/subsystem/src/jaeger.rs +++ b/polkadot/node/subsystem/src/jaeger.rs @@ -159,9 +159,13 @@ pub fn pov_span(pov: &PoV, span_name: impl Into) -> JaegerSpan { /// Creates a `Span` referring to the given hash. All spans created with [`hash_span`] with the /// same hash (even from multiple different nodes) will be visible in the same view on Jaeger. +/// +/// This span automatically has the `relay-parent` tag set. #[inline(always)] pub fn hash_span(hash: &Hash, span_name: impl Into) -> JaegerSpan { - INSTANCE.read_recursive().span(|| { *hash }, span_name).into() + let mut span: JaegerSpan = INSTANCE.read_recursive().span(|| { *hash }, span_name).into(); + span.add_string_tag("relay-parent", &format!("{:?}", hash)); + span } /// Stateful convenience wrapper around [`mick_jaeger`]. diff --git a/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md b/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md index f5258b4155..9e15bc35e5 100644 --- a/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/backing/statement-distribution.md @@ -38,7 +38,7 @@ There is a very simple state machine which governs which messages we are willing A: Initial State. Receive `SignedFullStatement(Statement::Second)`: extract `Statement`, forward to Candidate Backing and PoV Distribution, proceed to B. Receive any other `SignedFullStatement` variant: drop it. -B: Receive any `SignedFullStatement`: check signature, forward to Candidate Backing. Receive `OverseerMessage::StopWork`: proceed to C. +B: Receive any `SignedFullStatement`: check signature and determine whether the statement is new to us. if new, forward to Candidate Backing and circulate to other peers. Receive `OverseerMessage::StopWork`: proceed to C. C: Receive any message for this block: drop it.