Logs for large messages + metrics on size. (#5143)

* Logs for large messages + metrics on size.

* Fix typo
This commit is contained in:
Robert Klotzner
2022-03-17 20:57:35 +01:00
committed by GitHub
parent dd6fdf0f14
commit 569599853d
4 changed files with 67 additions and 18 deletions
@@ -900,8 +900,16 @@ async fn circulate_statement_and_dependents(
match active_head.note_statement(statement) {
NotedStatement::Fresh(stored) => Some((
*stored.compact().candidate_hash(),
circulate_statement(gossip_peers, peers, ctx, relay_parent, stored, priority_peers)
.await,
circulate_statement(
gossip_peers,
peers,
ctx,
relay_parent,
stored,
priority_peers,
metrics,
)
.await,
)),
_ => None,
}
@@ -930,11 +938,18 @@ async fn circulate_statement_and_dependents(
}
}
/// Create a network message from a given statement.
fn statement_message(
relay_parent: Hash,
statement: SignedFullStatement,
metrics: &Metrics,
) -> protocol_v1::ValidationProtocol {
let msg = if is_statement_large(&statement) {
let (is_large, size) = is_statement_large(&statement);
if let Some(size) = size {
metrics.on_created_message(size);
}
let msg = if is_large {
protocol_v1::StatementDistributionMessage::LargeStatement(StatementMetadata {
relay_parent,
candidate_hash: statement.payload().candidate_hash(),
@@ -949,23 +964,24 @@ fn statement_message(
}
/// Check whether a statement should be treated as a large statement.
fn is_statement_large(statement: &SignedFullStatement) -> bool {
///
/// Also returns the encoded size of the statement if it is `Seconded`, otherwise `None`.
fn is_statement_large(statement: &SignedFullStatement) -> (bool, Option<usize>) {
match &statement.payload() {
Statement::Seconded(committed) => {
let size = statement.as_unchecked().encoded_size();
// Runtime upgrades will always be large and even if not - no harm done.
if committed.commitments.new_validation_code.is_some() {
return true
return (true, Some(size))
}
// No runtime upgrade, now we need to be more nuanced:
let size = statement.as_unchecked().encoded_size();
// Half max size seems to be a good threshold to start not using notifications:
let threshold =
PeerSet::Validation.get_info(IsAuthority::Yes).max_notification_size as usize / 2;
size >= threshold
(size >= threshold, Some(size))
},
Statement::Valid(_) => false,
Statement::Valid(_) => (false, None),
}
}
@@ -978,6 +994,7 @@ async fn circulate_statement<'a>(
relay_parent: Hash,
stored: StoredStatement<'a>,
mut priority_peers: Vec<PeerId>,
metrics: &Metrics,
) -> Vec<PeerId> {
let fingerprint = stored.fingerprint();
@@ -1032,7 +1049,7 @@ async fn circulate_statement<'a>(
// Send all these peers the initial statement.
if !peers_to_send.is_empty() {
let payload = statement_message(relay_parent, stored.statement.clone());
let payload = statement_message(relay_parent, stored.statement.clone(), metrics);
gum::trace!(
target: LOG_TARGET,
?peers_to_send,
@@ -1069,7 +1086,7 @@ async fn send_statements_about(
continue
}
peer_data.send(&relay_parent, &fingerprint);
let payload = statement_message(relay_parent, statement.statement.clone());
let payload = statement_message(relay_parent, statement.statement.clone(), metrics);
gum::trace!(
target: LOG_TARGET,
@@ -1104,7 +1121,7 @@ async fn send_statements(
continue
}
peer_data.send(&relay_parent, &fingerprint);
let payload = statement_message(relay_parent, statement.statement.clone());
let payload = statement_message(relay_parent, statement.statement.clone(), metrics);
gum::trace!(
target: LOG_TARGET,
@@ -1288,8 +1305,16 @@ async fn handle_incoming_message_and_circulate<'a>(
// statement before a `Seconded` statement. `Seconded` statements are the only ones
// that require dependents. Thus, if this is a `Seconded` statement for a candidate we
// were not aware of before, we cannot have any dependent statements from the candidate.
let _ = circulate_statement(gossip_peers, peers, ctx, relay_parent, statement, Vec::new())
.await;
let _ = circulate_statement(
gossip_peers,
peers,
ctx,
relay_parent,
statement,
Vec::new(),
metrics,
)
.await;
}
}
@@ -1897,7 +1922,7 @@ impl StatementDistributionSubsystem {
let _timer = metrics.time_share();
// Make sure we have data in cache:
if is_statement_large(&statement) {
if is_statement_large(&statement).0 {
if let Statement::Seconded(committed) = &statement.payload() {
let active_head = active_heads
.get_mut(&relay_parent)
@@ -25,6 +25,7 @@ struct MetricsInner {
share: prometheus::Histogram,
network_bridge_update_v1: prometheus::Histogram,
statements_unexpected: prometheus::CounterVec<prometheus::U64>,
created_message_size: prometheus::Gauge<prometheus::U64>,
}
/// Statement Distribution metrics.
@@ -97,6 +98,13 @@ impl Metrics {
metrics.statements_unexpected.with_label_values(&["large"]).inc();
}
}
/// Record the size of a newly created message.
///
/// The size is written to the `created_message_size` gauge, so every call replaces the
/// value reported by the previous one. Does nothing when `self.0` is `None`.
pub fn on_created_message(&self, size: usize) {
    // Bail out early if there is no inner metrics instance to report to.
    let inner = match &self.0 {
        Some(inner) => inner,
        None => return,
    };
    inner.created_message_size.set(size as u64);
}
}
impl metrics::Metrics for Metrics {
@@ -159,6 +167,13 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
created_message_size: prometheus::register(
prometheus::Gauge::with_opts(prometheus::Opts::new(
"polkadot_parachain_statement_distribution_created_message_size",
"Size of created messages containing Seconded statements.",
))?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
@@ -83,6 +83,13 @@ pub async fn fetch(
.with_relay_parent(relay_parent)
.with_stage(Stage::StatementDistribution);
gum::debug!(
target: LOG_TARGET,
?candidate_hash,
?relay_parent,
"Fetch for large statement started",
);
// Peers we already tried (and failed).
let mut tried_peers = Vec::new();
// Peers left for trying out.
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use super::{metrics::Metrics, *};
use assert_matches::assert_matches;
use futures::executor::{self, block_on};
use futures_timer::Delay;
@@ -539,7 +539,8 @@ fn peer_view_update_sends_messages() {
for statement in active_head.statements_about(candidate_hash) {
let message = handle.recv().await;
let expected_to = vec![peer.clone()];
let expected_payload = statement_message(hash_c, statement.statement.clone());
let expected_payload =
statement_message(hash_c, statement.statement.clone(), &Metrics::default());
assert_matches!(
message,
@@ -638,6 +639,7 @@ fn circulated_statement_goes_to_all_peers_with_view() {
hash_b,
statement,
Vec::new(),
&Metrics::default(),
)
.await;
@@ -680,7 +682,7 @@ fn circulated_statement_goes_to_all_peers_with_view() {
assert_eq!(
payload,
statement_message(hash_b, statement.statement.clone()),
statement_message(hash_b, statement.statement.clone(), &Metrics::default()),
);
}
)