bitfield-dist: fix state update on gossip (#2817)

* bitfield-dist: fix state update on gossip

* fixes

* doc fixes

* oops

* 2 lines of code change
This commit is contained in:
Andronik Ordian
2021-04-05 00:25:40 +02:00
committed by GitHub
parent bfc8f4fcf3
commit 4df29e71ab
3 changed files with 133 additions and 86 deletions
@@ -33,7 +33,7 @@ use polkadot_subsystem::{
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
MIN_GOSSIP_PEERS,
self as util, MIN_GOSSIP_PEERS,
};
use polkadot_node_primitives::{SignedFullStatement};
use polkadot_primitives::v1::{
@@ -158,25 +158,21 @@ struct PeerRelayParentKnowledge {
}
impl PeerRelayParentKnowledge {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
/// Updates our view of the peer's knowledge with this statement's fingerprint based
/// on something that we would like to send to the peer.
///
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
/// NOTE: assumes `self.can_send` returned true before this call.
///
/// If the peer can accept the statement, this returns `Some` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// This returns `true` if this is the first time the peer has become aware of a
/// candidate with the given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option<bool> {
let already_known = self.sent_statements.contains(fingerprint)
|| self.received_statements.contains(fingerprint);
if already_known {
return None;
}
fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool {
debug_assert!(
self.can_send(fingerprint),
"send is only called after `can_send` returns true; qed",
);
let new_known = match fingerprint.0 {
CompactStatement::Seconded(ref h) => {
@@ -186,20 +182,36 @@ impl PeerRelayParentKnowledge {
self.known_candidates.insert(h.clone())
},
CompactStatement::Valid(ref h) => {
// The peer can only accept Valid and Invalid statements for which it is aware
// of the corresponding candidate.
if !self.known_candidates.contains(h) {
return None;
}
CompactStatement::Valid(_) => {
false
}
};
self.sent_statements.insert(fingerprint.clone());
Some(new_known)
new_known
}
/// This returns `true` if the peer cannot accept this statement, without altering internal
/// state, `false` otherwise.
fn can_send(&self, fingerprint: &(CompactStatement, ValidatorIndex)) -> bool {
let already_known = self.sent_statements.contains(fingerprint)
|| self.received_statements.contains(fingerprint);
if already_known {
return false;
}
match fingerprint.0 {
CompactStatement::Valid(ref h) => {
// The peer can only accept Valid and Invalid statements for which it is aware
// of the corresponding candidate.
self.known_candidates.contains(h)
}
CompactStatement::Seconded(_) => {
true
},
}
}
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
@@ -274,24 +286,41 @@ struct PeerData {
}
impl PeerData {
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based
/// Updates our view of the peer's knowledge with this statement's fingerprint based
/// on something that we would like to send to the peer.
///
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
/// NOTE: assumes `self.can_send` returned true before this call.
///
/// If the peer can accept the statement, this returns `Some` and updates the internal state.
/// Once the knowledge has incorporated a statement, it cannot be incorporated again.
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// This returns `true` if this is the first time the peer has become aware of a
/// candidate with the given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn send(
&mut self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
) -> Option<bool> {
self.view_knowledge.get_mut(relay_parent).map_or(None, |k| k.send(fingerprint))
) -> bool {
debug_assert!(
self.can_send(relay_parent, fingerprint),
"send is only called after `can_send` returns true; qed",
);
self.view_knowledge
.get_mut(relay_parent)
.expect("send is only called after `can_send` returns true; qed")
.send(fingerprint)
}
/// This returns `None` if the peer cannot accept this statement, without altering internal
/// state.
fn can_send(
&self,
relay_parent: &Hash,
fingerprint: &(CompactStatement, ValidatorIndex),
) -> bool {
self.view_knowledge
.get(relay_parent)
.map_or(false, |k| k.can_send(fingerprint))
}
/// Attempt to update our view of the peer's knowledge with this statement's fingerprint based on
@@ -623,12 +652,21 @@ async fn circulate_statement(
) -> Vec<PeerId> {
let fingerprint = stored.fingerprint();
let len_sqrt = (peers.len() as f64).sqrt() as usize;
let cap = std::cmp::max(MIN_GOSSIP_PEERS, len_sqrt);
let peers_to_send: HashMap<PeerId, bool> = peers.iter_mut().filter_map(|(peer, data)| {
data.send(&relay_parent, &fingerprint).map(|new| (peer.clone(), new))
}).take(cap).collect();
let peers_to_send: Vec<PeerId> = peers.iter().filter_map(|(peer, data)| {
if data.can_send(&relay_parent, &fingerprint) {
Some(peer.clone())
} else {
None
}
}).collect();
let peers_to_send = util::choose_random_sqrt_subset(peers_to_send, MIN_GOSSIP_PEERS);
let peers_to_send: Vec<(PeerId, bool)> = peers_to_send.into_iter()
.map(|peer_id| {
let new = peers.get_mut(&peer_id)
.expect("a subset is taken above, so it exists; qed")
.send(&relay_parent, &fingerprint);
(peer_id, new)
}).collect();
// Send all these peers the initial statement.
if !peers_to_send.is_empty() {
@@ -638,10 +676,10 @@ async fn circulate_statement(
?peers_to_send,
?relay_parent,
statement = ?stored.statement,
"Sending statement"
"Sending statement",
);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send.keys().cloned().collect(),
peers_to_send.iter().map(|(p, _)| p.clone()).collect(),
payload,
))).await;
}
@@ -665,26 +703,29 @@ async fn send_statements_about(
metrics: &Metrics,
) {
for statement in active_head.statements_about(candidate_hash) {
if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
let payload = statement_message(
relay_parent,
statement.statement.clone(),
);
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
?candidate_hash,
statement = ?statement.statement,
"Sending statement"
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
metrics.on_statement_distributed();
let fingerprint = statement.fingerprint();
if !peer_data.can_send(&relay_parent, &fingerprint) {
continue;
}
peer_data.send(&relay_parent, &fingerprint);
let payload = statement_message(
relay_parent,
statement.statement.clone(),
);
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
?candidate_hash,
statement = ?statement.statement,
"Sending statement",
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
metrics.on_statement_distributed();
}
}
@@ -699,25 +740,28 @@ async fn send_statements(
metrics: &Metrics,
) {
for statement in active_head.statements() {
if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() {
let payload = statement_message(
relay_parent,
statement.statement.clone(),
);
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
statement = ?statement.statement,
"Sending statement"
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
metrics.on_statement_distributed();
let fingerprint = statement.fingerprint();
if !peer_data.can_send(&relay_parent, &fingerprint) {
continue;
}
peer_data.send(&relay_parent, &fingerprint);
let payload = statement_message(
relay_parent,
statement.statement.clone(),
);
tracing::trace!(
target: LOG_TARGET,
?peer,
?relay_parent,
statement = ?statement.statement,
"Sending statement"
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
metrics.on_statement_distributed();
}
}
@@ -1331,7 +1375,7 @@ mod tests {
let hash_a = CandidateHash([1; 32].into());
// Sending an un-pinned statement should not work and should have no effect.
assert!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))).is_none());
assert!(!knowledge.can_send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))));
assert!(!knowledge.known_candidates.contains(&hash_a));
assert!(knowledge.sent_statements.is_empty());
assert!(knowledge.received_statements.is_empty());
@@ -1339,8 +1383,8 @@ mod tests {
assert!(knowledge.received_message_count.is_empty());
// Make the peer aware of the candidate.
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), Some(true));
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), Some(false));
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))), true);
assert_eq!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(1))), false);
assert!(knowledge.known_candidates.contains(&hash_a));
assert_eq!(knowledge.sent_statements.len(), 2);
assert!(knowledge.received_statements.is_empty());
@@ -1348,7 +1392,7 @@ mod tests {
assert!(knowledge.received_message_count.get(&hash_a).is_none());
// And now it should accept the dependent message.
assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), Some(false));
assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), ValidatorIndex(0))), false);
assert!(knowledge.known_candidates.contains(&hash_a));
assert_eq!(knowledge.sent_statements.len(), 3);
assert!(knowledge.received_statements.is_empty());
@@ -1362,7 +1406,7 @@ mod tests {
let hash_a = CandidateHash([1; 32].into());
assert!(knowledge.receive(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0)), 3).unwrap());
assert!(knowledge.send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))).is_none());
assert!(!knowledge.can_send(&(CompactStatement::Seconded(hash_a), ValidatorIndex(0))));
}
#[test]