mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 01:11:10 +00:00
Improve ondemand request dispatch (#1349)
* core: fix bug on ondemand dispatch after a request was dispatched to the last peer the dispatch would loop forever on subsequent requests that aren't able to be fulfilled by any of the peers, since the last peer wasn't updated. * core: try to dispatch all pending ondemand requests
This commit is contained in:
@@ -413,16 +413,24 @@ impl<B, E> OnDemandCore<B, E> where
|
||||
None => return,
|
||||
};
|
||||
|
||||
let last_peer = self.idle_peers.back().cloned();
|
||||
while !self.pending_requests.is_empty() {
|
||||
let mut last_peer = self.idle_peers.back().cloned();
|
||||
let mut unhandled_requests = VecDeque::new();
|
||||
|
||||
loop {
|
||||
let peer = match self.idle_peers.pop_front() {
|
||||
Some(peer) => peer,
|
||||
None => return,
|
||||
None => break,
|
||||
};
|
||||
|
||||
// check if request can (optimistically) be processed by the peer
|
||||
let can_be_processed_by_peer = {
|
||||
let request = self.pending_requests.front().expect("checked in loop condition; qed");
|
||||
let request = match self.pending_requests.front() {
|
||||
Some(r) => r,
|
||||
None => {
|
||||
self.idle_peers.push_front(peer);
|
||||
break;
|
||||
},
|
||||
};
|
||||
let peer_best_block = self.best_blocks.get(&peer)
|
||||
.expect("entries are inserted into best_blocks when peer is connected;
|
||||
entries are removed from best_blocks when peer is disconnected;
|
||||
@@ -436,12 +444,16 @@ impl<B, E> OnDemandCore<B, E> where
|
||||
|
||||
// we have enumerated all peers and noone can handle request
|
||||
if Some(peer) == last_peer {
|
||||
break;
|
||||
let request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
|
||||
unhandled_requests.push_back(request);
|
||||
last_peer = self.idle_peers.back().cloned();
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
last_peer = self.idle_peers.back().cloned();
|
||||
|
||||
let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
|
||||
request.timestamp = Instant::now();
|
||||
trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer);
|
||||
@@ -449,6 +461,8 @@ impl<B, E> OnDemandCore<B, E> where
|
||||
service.execute_in_context(|ctx| ctx.send_message(peer, request.message()));
|
||||
self.active_peers.insert(peer, request);
|
||||
}
|
||||
|
||||
self.pending_requests.append(&mut unhandled_requests);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -928,4 +942,55 @@ pub mod tests {
|
||||
assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true));
|
||||
assert_eq!(on_demand.core.lock().pending_requests.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn does_not_loop_forever_after_dispatching_request_to_last_peer() {
|
||||
// this test is a regression for a bug where the dispatch function would
|
||||
// loop forever after dispatching a request to the last peer, since the
|
||||
// last peer was not updated
|
||||
let (_x, on_demand) = dummy(true);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let mut network = TestIo::new(&queue, None);
|
||||
|
||||
on_demand.remote_header(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 250,
|
||||
retry_count: None,
|
||||
});
|
||||
on_demand.remote_header(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 250,
|
||||
retry_count: None,
|
||||
});
|
||||
|
||||
on_demand.on_connect(1, Roles::FULL, 200);
|
||||
on_demand.on_connect(2, Roles::FULL, 200);
|
||||
on_demand.on_connect(3, Roles::FULL, 250);
|
||||
|
||||
assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
|
||||
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tries_to_send_all_pending_requests() {
|
||||
let (_x, on_demand) = dummy(true);
|
||||
let queue = RwLock::new(VecDeque::new());
|
||||
let mut network = TestIo::new(&queue, None);
|
||||
|
||||
on_demand.remote_header(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 300,
|
||||
retry_count: None,
|
||||
});
|
||||
on_demand.remote_header(RemoteHeaderRequest {
|
||||
cht_root: Default::default(),
|
||||
block: 250,
|
||||
retry_count: None,
|
||||
});
|
||||
|
||||
on_demand.on_connect(1, Roles::FULL, 250);
|
||||
|
||||
assert!(on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>().is_empty());
|
||||
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user