diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index dc2ccd9a53..61147a4b22 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -631,7 +631,6 @@ fn transition_3_voters_twice_1_observer() { .for_each(move |_| { net.lock().send_import_notifications(); net.lock().send_finality_notifications(); - net.lock().sync(); Ok(()) }) .map(|_| ()) diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index a58bddfd46..d31bf43716 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -88,8 +88,12 @@ struct Peer { best_number: ::Number, /// Pending block request if any block_request: Option>, - /// Request timestamp - request_timestamp: Option, + /// Pending block request timestamp + block_request_timestamp: Option, + /// Pending block justification request if any + justification_request: Option>, + /// Pending block justification request timestamp + justification_request_timestamp: Option, /// Holds a set of transactions known to this peer. known_extrinsics: HashSet, /// Holds a set of blocks known to this peer. @@ -98,6 +102,18 @@ struct Peer { next_request_id: message::RequestId, } +impl Peer { + fn min_request_timestamp(&self) -> Option<&time::Instant> { + match (self.block_request_timestamp, self.justification_request_timestamp) { + (Some(t1), Some(t2)) if t1 < t2 => self.block_request_timestamp.as_ref(), + (Some(_), Some(_)) => self.justification_request_timestamp.as_ref(), + (Some(_), None) => self.block_request_timestamp.as_ref(), + (None, Some(_)) => self.justification_request_timestamp.as_ref(), + _ => None, + } + } +} + /// Info about a peer's known state. #[derive(Debug)] pub struct PeerInfo { @@ -256,6 +272,72 @@ impl, H: ExHashT> Protocol { }).collect() } + fn handle_response(&self, io: &mut SyncIo, who: NodeIndex, response: &message::BlockResponse) -> Option> { + let mut peers = self.context_data.peers.write(); + let request = if let Some(ref mut peer) = peers.get_mut(&who) { + match (peer.block_request.take(), peer.justification_request.take()) { + (Some(block_request), Some(justification_request)) => { + if block_request.id == response.id { + peer.block_request_timestamp = None; + peer.justification_request = Some(justification_request); + block_request + } else if justification_request.id == response.id { + peer.justification_request_timestamp = None; + peer.block_request = Some(block_request); + justification_request + } else { + peer.justification_request_timestamp = None; + peer.block_request_timestamp = None; + trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} or {} got {})", + who, + block_request.id, + justification_request.id, + response.id, + ); + return None; + } + }, + (Some(block_request), None) => { + if block_request.id == response.id { + peer.block_request_timestamp = None; + block_request + } else { + peer.block_request_timestamp = None; + trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", + who, + block_request.id, + response.id, + ); + return None; + } + }, + (None, Some(justification_request)) => { + if justification_request.id == response.id { + peer.justification_request_timestamp = None; + justification_request + } else { + peer.justification_request_timestamp = None; + trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", + who, + justification_request.id, + response.id, + ); + return None; + } + }, + (None, None) => { + io.report_peer(who, Severity::Bad("Unexpected response packet received from peer")); + return None; + }, + } + } else { + io.report_peer(who, Severity::Bad("Unexpected packet received from peer")); + return None; + }; + + Some(request) + } + pub fn handle_packet(&self, io: &mut SyncIo, who: NodeIndex, mut data: &[u8]) { let message: Message = match Decode::decode(&mut data) { Some(m) => m, @@ -270,29 +352,9 @@ impl, H: ExHashT> Protocol { GenericMessage::Status(s) => self.on_status_message(io, who, s), GenericMessage::BlockRequest(r) => self.on_block_request(io, who, r), GenericMessage::BlockResponse(r) => { - let request = { - let mut peers = self.context_data.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&who) { - peer.request_timestamp = None; - match peer.block_request.take() { - Some(r) => r, - None => { - io.report_peer(who, Severity::Bad("Unexpected response packet received from peer")); - return; - } - } - } else { - io.report_peer(who, Severity::Bad("Unexpected packet received from peer")); - return; - } - }; - - if request.id != r.id { - trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", who, request.id, r.id); - return; + if let Some(request) = self.handle_response(io, who, &r) { + self.on_block_response(io, who, request, r); } - - self.on_block_response(io, who, request, r); }, GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, who, announce), GenericMessage::Transactions(m) => self.on_extrinsics(io, who, m), @@ -444,7 +506,7 @@ impl, H: ExHashT> Protocol { (Some(first), Some(_)) => format!(" ({})", first), _ => Default::default(), }; - trace!(target: "sync", "BlockResponse {} from {} with {} blocks{}", + trace!(target: "sync", "BlockResponse {} from {} with {} blocks {}", response.id, peer, response.blocks.len(), blocks_range); // TODO [andre]: move this logic to the import queue so that @@ -487,7 +549,7 @@ impl, H: ExHashT> Protocol { let peers = self.context_data.peers.read(); let handshaking_peers = self.handshaking_peers.read(); for (who, timestamp) in peers.iter() - .filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r))) + .filter_map(|(id, peer)| peer.min_request_timestamp().map(|r| (id, r))) .chain(handshaking_peers.iter()) { if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC { @@ -551,7 +613,9 @@ impl, H: ExHashT> Protocol { best_hash: status.best_hash, best_number: status.best_number, block_request: None, - request_timestamp: None, + block_request_timestamp: None, + justification_request: None, + justification_request_timestamp: None, known_extrinsics: HashSet::new(), known_blocks: HashSet::new(), next_request_id: 0, @@ -812,8 +876,14 @@ fn send_message(peers: &RwLock (), diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index 015c51046a..cf93b020b4 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -93,10 +93,10 @@ impl PendingJustifications { let mut available_peers = peers.iter().filter_map(|(peer, sync)| { // don't request to any peers that already have pending requests - if let PeerSyncState::Available = sync.state { - Some((*peer, sync.best_number)) - } else { + if self.peer_requests.contains_key(&peer) { None + } else { + Some((*peer, sync.best_number)) } }).collect::>();