Filter peers on light nodes (#862)

* do not send OnDemand request to node who, we believe, can't process it

* ignore peers with too old best block on light nodes

* non-weird temporary

* fix condition

* compilation
This commit is contained in:
Svyatoslav Nikolsky
2018-10-05 20:51:28 +03:00
committed by Robert Habermeier
parent 36d71cb88d
commit f851dcf41c
2 changed files with 141 additions and 22 deletions
+124 -20
View File
@@ -16,7 +16,7 @@
//! On-demand requests service.
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Weak};
use std::time::{Instant, Duration};
use futures::{Async, Future, Poll};
@@ -41,7 +41,10 @@ const RETRY_COUNT: usize = 1;
/// On-demand service API.
pub trait OnDemandService<Block: BlockT>: Send + Sync {
/// When new node is connected.
fn on_connect(&self, peer: NodeIndex, role: service::Roles);
fn on_connect(&self, peer: NodeIndex, role: service::Roles, best_number: NumberFor<Block>);
/// When block is announced by the peer.
fn on_block_announce(&self, peer: NodeIndex, best_number: NumberFor<Block>);
/// When node is disconnected.
fn on_disconnect(&self, peer: NodeIndex);
@@ -90,6 +93,7 @@ struct OnDemandCore<B: BlockT, E: service::ExecuteInContext<B>> {
pending_requests: VecDeque<Request<B>>,
active_peers: LinkedHashMap<NodeIndex, Request<B>>,
idle_peers: VecDeque<NodeIndex>,
best_blocks: HashMap<NodeIndex, NumberFor<B>>,
}
struct Request<Block: BlockT> {
@@ -141,6 +145,7 @@ impl<B: BlockT, E> OnDemand<B, E> where
pending_requests: VecDeque::new(),
active_peers: LinkedHashMap::new(),
idle_peers: VecDeque::new(),
best_blocks: HashMap::new(),
})
}
}
@@ -206,13 +211,19 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
fn on_connect(&self, peer: NodeIndex, role: service::Roles) {
fn on_connect(&self, peer: NodeIndex, role: service::Roles, best_number: NumberFor<B>) {
if !role.intersects(service::Roles::FULL | service::Roles::AUTHORITY) { // TODO: correct?
return;
}
let mut core = self.core.lock();
core.add_peer(peer);
core.add_peer(peer, best_number);
core.dispatch();
}
fn on_block_announce(&self, peer: NodeIndex, best_number: NumberFor<B>) {
let mut core = self.core.lock();
core.update_peer(peer, best_number);
core.dispatch();
}
@@ -329,11 +340,18 @@ impl<B, E> OnDemandCore<B, E> where
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
pub fn add_peer(&mut self, peer: NodeIndex) {
pub fn add_peer(&mut self, peer: NodeIndex, best_number: NumberFor<B>) {
self.idle_peers.push_back(peer);
self.best_blocks.insert(peer, best_number);
}
pub fn update_peer(&mut self, peer: NodeIndex, best_number: NumberFor<B>) {
self.best_blocks.insert(peer, best_number);
}
pub fn remove_peer(&mut self, peer: NodeIndex) {
self.best_blocks.remove(&peer);
if let Some(request) = self.active_peers.remove(&peer) {
self.pending_requests.push_front(request);
return;
@@ -390,12 +408,35 @@ impl<B, E> OnDemandCore<B, E> where
None => return,
};
let last_peer = self.idle_peers.back().cloned();
while !self.pending_requests.is_empty() {
let peer = match self.idle_peers.pop_front() {
Some(peer) => peer,
None => return,
};
// check if request can (optimistically) be processed by the peer
let can_be_processed_by_peer = {
let request = self.pending_requests.front().expect("checked in loop condition; qed");
let peer_best_block = self.best_blocks.get(&peer)
.expect("entries are inserted into best_blocks when peer is connected;
entries are removed from best_blocks when peer is disconnected;
peer is in idle_peers and thus connected; qed");
request.required_block() <= *peer_best_block
};
if !can_be_processed_by_peer {
// return peer to the back of the queue
self.idle_peers.push_back(peer);
// we have enumerated all peers and noone can handle request
if Some(peer) == last_peer {
break;
}
continue;
}
let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
request.timestamp = Instant::now();
trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer);
@@ -407,6 +448,15 @@ impl<B, E> OnDemandCore<B, E> where
}
impl<Block: BlockT> Request<Block> {
pub fn required_block(&self) -> NumberFor<Block> {
match self.data {
RequestData::RemoteHeader(ref data, _) => data.block,
RequestData::RemoteRead(ref data, _) => *data.header.number(),
RequestData::RemoteCall(ref data, _) => *data.header.number(),
RequestData::RemoteChanges(ref data, _) => data.max_block.0,
}
}
pub fn message(&self) -> message::Message<Block> {
match self.data {
RequestData::RemoteHeader(ref data, _) =>
@@ -545,19 +595,24 @@ pub mod tests {
#[test]
fn knows_about_peers_roles() {
let (_, on_demand) = dummy(true);
on_demand.on_connect(0, Roles::LIGHT);
on_demand.on_connect(1, Roles::FULL);
on_demand.on_connect(2, Roles::AUTHORITY);
on_demand.on_connect(0, Roles::LIGHT, 1000);
on_demand.on_connect(1, Roles::FULL, 2000);
on_demand.on_connect(2, Roles::AUTHORITY, 3000);
assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert_eq!(on_demand.core.lock().best_blocks.get(&1), Some(&2000));
assert_eq!(on_demand.core.lock().best_blocks.get(&2), Some(&3000));
}
#[test]
fn disconnects_from_idle_peer() {
let (_, on_demand) = dummy(true);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(0, Roles::FULL, 100);
assert_eq!(1, total_peers(&*on_demand));
assert!(!on_demand.core.lock().best_blocks.is_empty());
on_demand.on_disconnect(0);
assert_eq!(0, total_peers(&*on_demand));
assert!(on_demand.core.lock().best_blocks.is_empty());
}
#[test]
@@ -566,8 +621,8 @@ pub mod tests {
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(1, Roles::FULL);
on_demand.on_connect(0, Roles::FULL, 1000);
on_demand.on_connect(1, Roles::FULL, 1000);
assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert!(on_demand.core.lock().active_peers.is_empty());
@@ -593,7 +648,7 @@ pub mod tests {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(0, Roles::FULL, 1000);
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
@@ -620,7 +675,7 @@ pub mod tests {
retry_count: Some(1),
});
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(0, Roles::FULL, 1000);
receive_call_response(&*on_demand, &mut network, 0, 0);
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
@@ -631,7 +686,7 @@ pub mod tests {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(0, Roles::FULL, 1000);
receive_call_response(&*on_demand, &mut network, 0, 0);
assert!(network.to_disconnect.contains(&0));
@@ -642,7 +697,7 @@ pub mod tests {
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(0, Roles::FULL, 1000);
on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
@@ -669,7 +724,7 @@ pub mod tests {
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
for i in 0..retry_count+1 {
on_demand.on_connect(i, Roles::FULL);
on_demand.on_connect(i, Roles::FULL, 1000);
}
let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new()));
@@ -708,7 +763,7 @@ pub mod tests {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(0, Roles::FULL, 1000);
let response = on_demand.remote_call(RemoteCallRequest {
block: Default::default(),
@@ -731,7 +786,7 @@ pub mod tests {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(0, Roles::FULL, 1000);
let response = on_demand.remote_read(RemoteReadRequest {
header: dummy_header(),
@@ -756,7 +811,7 @@ pub mod tests {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(0, Roles::FULL, 1000);
let response = on_demand.remote_header(RemoteHeaderRequest {
cht_root: Default::default(),
@@ -787,7 +842,7 @@ pub mod tests {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(0, Roles::FULL, 1000);
let response = on_demand.remote_changes(RemoteChangesRequest {
changes_trie_config: changes_trie_config(),
@@ -810,4 +865,53 @@ pub mod tests {
});
thread.join().unwrap();
}
#[test]
fn does_not_sends_request_to_peer_who_has_no_required_block() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(1, Roles::FULL, 100);
on_demand.remote_header(RemoteHeaderRequest {
cht_root: Default::default(),
block: 200,
retry_count: None,
});
on_demand.remote_header(RemoteHeaderRequest {
cht_root: Default::default(),
block: 250,
retry_count: None,
});
on_demand.remote_header(RemoteHeaderRequest {
cht_root: Default::default(),
block: 250,
retry_count: None,
});
on_demand.on_connect(2, Roles::FULL, 150);
assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert_eq!(on_demand.core.lock().pending_requests.len(), 3);
on_demand.on_block_announce(1, 250);
assert_eq!(vec![2], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert_eq!(on_demand.core.lock().pending_requests.len(), 2);
on_demand.on_block_announce(2, 250);
assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
on_demand.on_remote_header_response(&mut network, 1, message::RemoteHeaderResponse {
id: 0,
header: Some(dummy_header()),
proof: vec![],
});
assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true));
assert_eq!(on_demand.core.lock().pending_requests.len(), 0);
}
}
+17 -2
View File
@@ -46,6 +46,10 @@ pub (crate) const CURRENT_PACKET_COUNT: u8 = 1;
// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
/// When light node connects to the full node and the full node is behind light node
/// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it unuseful
/// and disconnect to free connection slot.
const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: Specialization<B>, H: ExHashT> {
@@ -436,6 +440,16 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
io.report_peer(who, Severity::Bad(&format!("Peer using unsupported protocol version {}", status.version)));
return;
}
if self.config.roles & Roles::LIGHT == Roles::LIGHT {
let self_best_block = self.context_data.chain.info().ok()
.and_then(|info| info.best_queued_number)
.unwrap_or_else(|| Zero::zero());
let blocks_difference = self_best_block.as_().checked_sub(status.best_number.as_()).unwrap_or(0);
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
io.report_peer(who, Severity::Useless("Peer is far behind us and will unable to serve light requests"));
return;
}
}
let peer = Peer {
protocol_version: status.version,
@@ -454,9 +468,9 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
}
let mut context = ProtocolContext::new(&self.context_data, io);
self.on_demand.as_ref().map(|s| s.on_connect(who, status.roles, status.best_number));
self.sync.write().new_peer(&mut context, who);
self.specialization.write().on_connect(&mut context, who, status.clone());
self.on_demand.as_ref().map(|s| s.on_connect(who, status.roles));
self.specialization.write().on_connect(&mut context, who, status);
}
/// Called when peer sends us new extrinsics
@@ -559,6 +573,7 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
peer.known_blocks.insert(hash.clone());
}
}
self.on_demand.as_ref().map(|s| s.on_block_announce(who, *header.number()));
self.sync.write().on_block_announce(&mut ProtocolContext::new(&self.context_data, io), who, hash, &header);
}