diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index ebeae87d9c..fb5f3825bd 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -900,8 +900,16 @@ async fn circulate_statement_and_dependents( match active_head.note_statement(statement) { NotedStatement::Fresh(stored) => Some(( *stored.compact().candidate_hash(), - circulate_statement(gossip_peers, peers, ctx, relay_parent, stored, priority_peers) - .await, + circulate_statement( + gossip_peers, + peers, + ctx, + relay_parent, + stored, + priority_peers, + metrics, + ) + .await, )), _ => None, } @@ -930,11 +938,18 @@ async fn circulate_statement_and_dependents( } } +/// Create a network message from a given statement. fn statement_message( relay_parent: Hash, statement: SignedFullStatement, + metrics: &Metrics, ) -> protocol_v1::ValidationProtocol { - let msg = if is_statement_large(&statement) { + let (is_large, size) = is_statement_large(&statement); + if let Some(size) = size { + metrics.on_created_message(size); + } + + let msg = if is_large { protocol_v1::StatementDistributionMessage::LargeStatement(StatementMetadata { relay_parent, candidate_hash: statement.payload().candidate_hash(), @@ -949,23 +964,24 @@ fn statement_message( } /// Check whether a statement should be treated as large statement. -fn is_statement_large(statement: &SignedFullStatement) -> bool { +/// +/// Also report size of statement - if it is a `Seconded` statement, otherwise `None`. +fn is_statement_large(statement: &SignedFullStatement) -> (bool, Option) { match &statement.payload() { Statement::Seconded(committed) => { + let size = statement.as_unchecked().encoded_size(); // Runtime upgrades will always be large and even if not - no harm done. if committed.commitments.new_validation_code.is_some() { - return true + return (true, Some(size)) } - // No runtime upgrade, now we need to be more nuanced: - let size = statement.as_unchecked().encoded_size(); // Half max size seems to be a good threshold to start not using notifications: let threshold = PeerSet::Validation.get_info(IsAuthority::Yes).max_notification_size as usize / 2; - size >= threshold + (size >= threshold, Some(size)) }, - Statement::Valid(_) => false, + Statement::Valid(_) => (false, None), } } @@ -978,6 +994,7 @@ async fn circulate_statement<'a>( relay_parent: Hash, stored: StoredStatement<'a>, mut priority_peers: Vec, + metrics: &Metrics, ) -> Vec { let fingerprint = stored.fingerprint(); @@ -1032,7 +1049,7 @@ async fn circulate_statement<'a>( // Send all these peers the initial statement. if !peers_to_send.is_empty() { - let payload = statement_message(relay_parent, stored.statement.clone()); + let payload = statement_message(relay_parent, stored.statement.clone(), metrics); gum::trace!( target: LOG_TARGET, ?peers_to_send, @@ -1069,7 +1086,7 @@ async fn send_statements_about( continue } peer_data.send(&relay_parent, &fingerprint); - let payload = statement_message(relay_parent, statement.statement.clone()); + let payload = statement_message(relay_parent, statement.statement.clone(), metrics); gum::trace!( target: LOG_TARGET, @@ -1104,7 +1121,7 @@ async fn send_statements( continue } peer_data.send(&relay_parent, &fingerprint); - let payload = statement_message(relay_parent, statement.statement.clone()); + let payload = statement_message(relay_parent, statement.statement.clone(), metrics); gum::trace!( target: LOG_TARGET, @@ -1288,8 +1305,16 @@ async fn handle_incoming_message_and_circulate<'a>( // statement before a `Seconded` statement. `Seconded` statements are the only ones // that require dependents. Thus, if this is a `Seconded` statement for a candidate we // were not aware of before, we cannot have any dependent statements from the candidate. - let _ = circulate_statement(gossip_peers, peers, ctx, relay_parent, statement, Vec::new()) - .await; + let _ = circulate_statement( + gossip_peers, + peers, + ctx, + relay_parent, + statement, + Vec::new(), + metrics, + ) + .await; } } @@ -1897,7 +1922,7 @@ impl StatementDistributionSubsystem { let _timer = metrics.time_share(); // Make sure we have data in cache: - if is_statement_large(&statement) { + if is_statement_large(&statement).0 { if let Statement::Seconded(committed) = &statement.payload() { let active_head = active_heads .get_mut(&relay_parent) diff --git a/polkadot/node/network/statement-distribution/src/metrics.rs b/polkadot/node/network/statement-distribution/src/metrics.rs index 1cd6e468a1..a425518720 100644 --- a/polkadot/node/network/statement-distribution/src/metrics.rs +++ b/polkadot/node/network/statement-distribution/src/metrics.rs @@ -25,6 +25,7 @@ struct MetricsInner { share: prometheus::Histogram, network_bridge_update_v1: prometheus::Histogram, statements_unexpected: prometheus::CounterVec, + created_message_size: prometheus::Gauge, } /// Statement Distribution metrics. @@ -97,6 +98,13 @@ impl Metrics { metrics.statements_unexpected.with_label_values(&["large"]).inc(); } } + + /// Report size of a created message. + pub fn on_created_message(&self, size: usize) { + if let Some(metrics) = &self.0 { + metrics.created_message_size.set(size as u64); + } + } } impl metrics::Metrics for Metrics { @@ -159,6 +167,13 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + created_message_size: prometheus::register( + prometheus::Gauge::with_opts(prometheus::Opts::new( + "polkadot_parachain_statement_distribution_created_message_size", + "Size of created messages containing Seconded statements.", + ))?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } diff --git a/polkadot/node/network/statement-distribution/src/requester.rs b/polkadot/node/network/statement-distribution/src/requester.rs index 904333f046..5cff21117a 100644 --- a/polkadot/node/network/statement-distribution/src/requester.rs +++ b/polkadot/node/network/statement-distribution/src/requester.rs @@ -83,6 +83,13 @@ pub async fn fetch( .with_relay_parent(relay_parent) .with_stage(Stage::StatementDistribution); + gum::debug!( + target: LOG_TARGET, + ?candidate_hash, + ?relay_parent, + "Fetch for large statement started", + ); + // Peers we already tried (and failed). let mut tried_peers = Vec::new(); // Peers left for trying out. diff --git a/polkadot/node/network/statement-distribution/src/tests.rs b/polkadot/node/network/statement-distribution/src/tests.rs index 782c4104ad..10462fc1a5 100644 --- a/polkadot/node/network/statement-distribution/src/tests.rs +++ b/polkadot/node/network/statement-distribution/src/tests.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use super::*; +use super::{metrics::Metrics, *}; use assert_matches::assert_matches; use futures::executor::{self, block_on}; use futures_timer::Delay; @@ -539,7 +539,8 @@ fn peer_view_update_sends_messages() { for statement in active_head.statements_about(candidate_hash) { let message = handle.recv().await; let expected_to = vec![peer.clone()]; - let expected_payload = statement_message(hash_c, statement.statement.clone()); + let expected_payload = + statement_message(hash_c, statement.statement.clone(), &Metrics::default()); assert_matches!( message, @@ -638,6 +639,7 @@ fn circulated_statement_goes_to_all_peers_with_view() { hash_b, statement, Vec::new(), + &Metrics::default(), ) .await; @@ -680,7 +682,7 @@ fn circulated_statement_goes_to_all_peers_with_view() { assert_eq!( payload, - statement_message(hash_b, statement.statement.clone()), + statement_message(hash_b, statement.statement.clone(), &Metrics::default()), ); } )