diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs index 59a5333a47..0c5140f2fe 100644 --- a/substrate/core/network/src/on_demand.rs +++ b/substrate/core/network/src/on_demand.rs @@ -413,16 +413,24 @@ impl OnDemandCore where None => return, }; - let last_peer = self.idle_peers.back().cloned(); - while !self.pending_requests.is_empty() { + let mut last_peer = self.idle_peers.back().cloned(); + let mut unhandled_requests = VecDeque::new(); + + loop { let peer = match self.idle_peers.pop_front() { Some(peer) => peer, - None => return, + None => break, }; // check if request can (optimistically) be processed by the peer let can_be_processed_by_peer = { - let request = self.pending_requests.front().expect("checked in loop condition; qed"); + let request = match self.pending_requests.front() { + Some(r) => r, + None => { + self.idle_peers.push_front(peer); + break; + }, + }; let peer_best_block = self.best_blocks.get(&peer) .expect("entries are inserted into best_blocks when peer is connected; entries are removed from best_blocks when peer is disconnected; @@ -436,12 +444,16 @@ impl OnDemandCore where // we have enumerated all peers and noone can handle request if Some(peer) == last_peer { - break; + let request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); + unhandled_requests.push_back(request); + last_peer = self.idle_peers.back().cloned(); } continue; } + last_peer = self.idle_peers.back().cloned(); + let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); request.timestamp = Instant::now(); trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer); @@ -449,6 +461,8 @@ impl OnDemandCore where service.execute_in_context(|ctx| ctx.send_message(peer, request.message())); self.active_peers.insert(peer, request); } + + self.pending_requests.append(&mut unhandled_requests); } } @@ -928,4 +942,55 @@ pub mod tests { assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true)); assert_eq!(on_demand.core.lock().pending_requests.len(), 0); } + + #[test] + fn does_not_loop_forever_after_dispatching_request_to_last_peer() { + // this test is a regression for a bug where the dispatch function would + // loop forever after dispatching a request to the last peer, since the + // last peer was not updated + let (_x, on_demand) = dummy(true); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + + on_demand.remote_header(RemoteHeaderRequest { + cht_root: Default::default(), + block: 250, + retry_count: None, + }); + on_demand.remote_header(RemoteHeaderRequest { + cht_root: Default::default(), + block: 250, + retry_count: None, + }); + + on_demand.on_connect(1, Roles::FULL, 200); + on_demand.on_connect(2, Roles::FULL, 200); + on_demand.on_connect(3, Roles::FULL, 250); + + assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::>()); + assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + } + + #[test] + fn tries_to_send_all_pending_requests() { + let (_x, on_demand) = dummy(true); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + + on_demand.remote_header(RemoteHeaderRequest { + cht_root: Default::default(), + block: 300, + retry_count: None, + }); + on_demand.remote_header(RemoteHeaderRequest { + cht_root: Default::default(), + block: 250, + retry_count: None, + }); + + on_demand.on_connect(1, Roles::FULL, 250); + + assert!(on_demand.core.lock().idle_peers.iter().cloned().collect::>().is_empty()); + assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + } }