have some lenience on outdated messages in statement distribution (#5150)

* have some lenience on outdated messages in statement distribution

* fmt
This commit is contained in:
asynchronous rob
2022-03-18 11:12:26 -05:00
committed by GitHub
parent a97ee7b122
commit be2c91555e
@@ -58,7 +58,7 @@ use indexmap::{map::Entry as IEntry, IndexMap};
use sp_keystore::SyncCryptoStorePtr; use sp_keystore::SyncCryptoStorePtr;
use util::runtime::RuntimeInfo; use util::runtime::RuntimeInfo;
use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
use fatality::Nested; use fatality::Nested;
@@ -153,6 +153,27 @@ impl StatementDistributionSubsystem {
} }
} }
#[derive(Default)]
struct RecentOutdatedHeads {
buf: VecDeque<Hash>,
}
impl RecentOutdatedHeads {
fn note_outdated(&mut self, hash: Hash) {
const MAX_BUF_LEN: usize = 10;
self.buf.push_back(hash);
while self.buf.len() > MAX_BUF_LEN {
let _ = self.buf.pop_front();
}
}
fn is_recent_outdated(&self, hash: &Hash) -> bool {
self.buf.contains(hash)
}
}
/// Tracks our impression of a single peer's view of the candidates a validator has seconded /// Tracks our impression of a single peer's view of the candidates a validator has seconded
/// for a given relay-parent. /// for a given relay-parent.
/// ///
@@ -1287,6 +1308,7 @@ async fn handle_incoming_message_and_circulate<'a>(
gossip_peers: &HashSet<PeerId>, gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>, peers: &mut HashMap<PeerId, PeerData>,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>, active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
message: protocol_v1::StatementDistributionMessage, message: protocol_v1::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>, req_sender: &mpsc::Sender<RequesterMessage>,
@@ -1294,8 +1316,17 @@ async fn handle_incoming_message_and_circulate<'a>(
) { ) {
let handled_incoming = match peers.get_mut(&peer) { let handled_incoming = match peers.get_mut(&peer) {
Some(data) => Some(data) =>
handle_incoming_message(peer, data, active_heads, ctx, message, req_sender, metrics) handle_incoming_message(
.await, peer,
data,
active_heads,
recent_outdated_heads,
ctx,
message,
req_sender,
metrics,
)
.await,
None => None, None => None,
}; };
@@ -1331,6 +1362,7 @@ async fn handle_incoming_message<'a>(
peer: PeerId, peer: PeerId,
peer_data: &mut PeerData, peer_data: &mut PeerData,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>, active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
message: protocol_v1::StatementDistributionMessage, message: protocol_v1::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>, req_sender: &mpsc::Sender<RequesterMessage>,
@@ -1347,7 +1379,11 @@ async fn handle_incoming_message<'a>(
%relay_parent, %relay_parent,
"our view out-of-sync with active heads; head not found", "our view out-of-sync with active heads; head not found",
); );
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;
if !recent_outdated_heads.is_recent_outdated(&relay_parent) {
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;
}
return None return None
}, },
}; };
@@ -1556,6 +1592,7 @@ async fn handle_network_update(
gossip_peers: &mut HashSet<PeerId>, gossip_peers: &mut HashSet<PeerId>,
authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>, authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: &mut HashMap<Hash, ActiveHeadData>, active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
req_sender: &mpsc::Sender<RequesterMessage>, req_sender: &mpsc::Sender<RequesterMessage>,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>, update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
@@ -1612,6 +1649,7 @@ async fn handle_network_update(
gossip_peers, gossip_peers,
peers, peers,
active_heads, active_heads,
&*recent_outdated_heads,
ctx, ctx,
message, message,
req_sender, req_sender,
@@ -1653,6 +1691,7 @@ impl StatementDistributionSubsystem {
let mut gossip_peers: HashSet<PeerId> = HashSet::new(); let mut gossip_peers: HashSet<PeerId> = HashSet::new();
let mut authorities: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new(); let mut authorities: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new(); let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut recent_outdated_heads = RecentOutdatedHeads::default();
let mut runtime = RuntimeInfo::new(Some(self.keystore.clone())); let mut runtime = RuntimeInfo::new(Some(self.keystore.clone()));
@@ -1684,6 +1723,7 @@ impl StatementDistributionSubsystem {
&mut gossip_peers, &mut gossip_peers,
&mut authorities, &mut authorities,
&mut active_heads, &mut active_heads,
&mut recent_outdated_heads,
&req_sender, &req_sender,
result?, result?,
) )
@@ -1701,6 +1741,7 @@ impl StatementDistributionSubsystem {
&gossip_peers, &gossip_peers,
&mut peers, &mut peers,
&mut active_heads, &mut active_heads,
&recent_outdated_heads,
&req_sender, &req_sender,
result.ok_or(FatalError::RequesterReceiverFinished)?, result.ok_or(FatalError::RequesterReceiverFinished)?,
) )
@@ -1767,6 +1808,7 @@ impl StatementDistributionSubsystem {
gossip_peers: &HashSet<PeerId>, gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>, peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>, active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
req_sender: &mpsc::Sender<RequesterMessage>, req_sender: &mpsc::Sender<RequesterMessage>,
message: RequesterMessage, message: RequesterMessage,
) -> JfyiErrorResult<()> { ) -> JfyiErrorResult<()> {
@@ -1814,6 +1856,7 @@ impl StatementDistributionSubsystem {
gossip_peers, gossip_peers,
peers, peers,
active_heads, active_heads,
recent_outdated_heads,
ctx, ctx,
message, message,
req_sender, req_sender,
@@ -1874,6 +1917,7 @@ impl StatementDistributionSubsystem {
gossip_peers: &mut HashSet<PeerId>, gossip_peers: &mut HashSet<PeerId>,
authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>, authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: &mut HashMap<Hash, ActiveHeadData>, active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &mut RecentOutdatedHeads,
req_sender: &mpsc::Sender<RequesterMessage>, req_sender: &mpsc::Sender<RequesterMessage>,
message: FromOverseer<StatementDistributionMessage>, message: FromOverseer<StatementDistributionMessage>,
) -> Result<bool> { ) -> Result<bool> {
@@ -1893,6 +1937,8 @@ impl StatementDistributionSubsystem {
hash = ?deactivated, hash = ?deactivated,
"Deactivating leaf", "Deactivating leaf",
); );
recent_outdated_heads.note_outdated(deactivated);
} }
} }
@@ -1985,6 +2031,7 @@ impl StatementDistributionSubsystem {
gossip_peers, gossip_peers,
authorities, authorities,
active_heads, active_heads,
&*recent_outdated_heads,
ctx, ctx,
req_sender, req_sender,
event, event,