Periodically call Peerset::alloc_slots on all sets (#9025)

* Periodically call alloc_slots on all slots

* Add test
This commit is contained in:
Pierre Krieger
2021-06-07 15:00:03 +02:00
committed by GitHub
parent d9c1836987
commit 5c14dd3f32
+54 -1
View File
@@ -39,7 +39,7 @@ use futures::prelude::*;
use log::{debug, error, trace};
use serde_json::json;
use std::{collections::HashMap, pin::Pin, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;
use wasm_timer::{Delay, Instant};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
pub use libp2p::PeerId;
@@ -252,6 +252,9 @@ pub struct Peerset {
created: Instant,
/// Last time when we updated the reputations of connected nodes.
latest_time_update: Instant,
/// Next time to do a periodic call to `alloc_slots` with all sets. This is done once per
/// second, to match the period of the reputation updates.
next_periodic_alloc_slots: Delay,
}
impl Peerset {
@@ -279,6 +282,7 @@ impl Peerset {
message_queue: VecDeque::new(),
created: now,
latest_time_update: now,
next_periodic_alloc_slots: Delay::new(Duration::new(0, 0)),
}
};
@@ -699,6 +703,14 @@ impl Stream for Peerset {
return Poll::Ready(Some(message));
}
if let Poll::Ready(_) = Future::poll(Pin::new(&mut self.next_periodic_alloc_slots), cx) {
self.next_periodic_alloc_slots = Delay::new(Duration::new(1, 0));
for set_index in 0..self.data.num_sets() {
self.alloc_slots(SetId(set_index));
}
}
let action = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(event)) => event,
@@ -907,4 +919,45 @@ mod tests {
futures::executor::block_on(fut);
}
#[test]
fn test_relloc_after_banned() {
let (mut peerset, handle) = Peerset::from_config(PeersetConfig {
sets: vec![SetConfig {
in_peers: 25,
out_peers: 25,
bootnodes: vec![],
reserved_nodes: Default::default(),
reserved_only: false,
}],
});
// We ban a node by setting its reputation under the threshold.
let peer_id = PeerId::random();
handle.report_peer(peer_id.clone(), ReputationChange::new(BANNED_THRESHOLD - 1, ""));
let fut = futures::future::poll_fn(move |cx| {
// We need one polling for the message to be processed.
assert_eq!(Stream::poll_next(Pin::new(&mut peerset), cx), Poll::Pending);
// Check that an incoming connection from that node gets refused.
// This is already tested in other tests, but it is done again here because it doesn't
// hurt.
peerset.incoming(SetId::from(0), peer_id.clone(), IncomingIndex(1));
if let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
assert_eq!(msg.unwrap(), Message::Reject(IncomingIndex(1)));
} else {
panic!()
}
// Wait for the peerset to change its mind and actually connect to it.
while let Poll::Ready(msg) = Stream::poll_next(Pin::new(&mut peerset), cx) {
assert_eq!(msg.unwrap(), Message::Connect { set_id: SetId::from(0), peer_id });
}
Poll::Ready(())
});
futures::executor::block_on(fut);
}
}