core: send justification requests in parallel to block requests (#1563)

* core: send justification requests in parallel to block requests

* core: fix grandpa test
This commit is contained in:
André Silva
2019-01-25 14:07:26 +00:00
committed by Robert Habermeier
parent 6e55812704
commit 5ddcbe0ca6
3 changed files with 102 additions and 33 deletions
+99 -29
View File
@@ -88,8 +88,12 @@ struct Peer<B: BlockT, H: ExHashT> {
best_number: <B::Header as HeaderT>::Number,
/// Pending block request if any
block_request: Option<message::BlockRequest<B>>,
/// Request timestamp
request_timestamp: Option<time::Instant>,
/// Pending block request timestamp
block_request_timestamp: Option<time::Instant>,
/// Pending block justification request if any
justification_request: Option<message::BlockRequest<B>>,
/// Pending block justification request timestamp
justification_request_timestamp: Option<time::Instant>,
/// Holds a set of transactions known to this peer.
known_extrinsics: HashSet<H>,
/// Holds a set of blocks known to this peer.
@@ -98,6 +102,18 @@ struct Peer<B: BlockT, H: ExHashT> {
next_request_id: message::RequestId,
}
impl<B: BlockT, H: ExHashT> Peer<B, H> {
fn min_request_timestamp(&self) -> Option<&time::Instant> {
match (self.block_request_timestamp, self.justification_request_timestamp) {
(Some(t1), Some(t2)) if t1 < t2 => self.block_request_timestamp.as_ref(),
(Some(_), Some(_)) => self.justification_request_timestamp.as_ref(),
(Some(_), None) => self.block_request_timestamp.as_ref(),
(None, Some(_)) => self.justification_request_timestamp.as_ref(),
_ => None,
}
}
}
/// Info about a peer's known state.
#[derive(Debug)]
pub struct PeerInfo<B: BlockT> {
@@ -256,6 +272,72 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}).collect()
}
fn handle_response(&self, io: &mut SyncIo, who: NodeIndex, response: &message::BlockResponse<B>) -> Option<message::BlockRequest<B>> {
let mut peers = self.context_data.peers.write();
let request = if let Some(ref mut peer) = peers.get_mut(&who) {
match (peer.block_request.take(), peer.justification_request.take()) {
(Some(block_request), Some(justification_request)) => {
if block_request.id == response.id {
peer.block_request_timestamp = None;
peer.justification_request = Some(justification_request);
block_request
} else if justification_request.id == response.id {
peer.justification_request_timestamp = None;
peer.block_request = Some(block_request);
justification_request
} else {
peer.justification_request_timestamp = None;
peer.block_request_timestamp = None;
trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} or {} got {})",
who,
block_request.id,
justification_request.id,
response.id,
);
return None;
}
},
(Some(block_request), None) => {
if block_request.id == response.id {
peer.block_request_timestamp = None;
block_request
} else {
peer.block_request_timestamp = None;
trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})",
who,
block_request.id,
response.id,
);
return None;
}
},
(None, Some(justification_request)) => {
if justification_request.id == response.id {
peer.justification_request_timestamp = None;
justification_request
} else {
peer.justification_request_timestamp = None;
trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})",
who,
justification_request.id,
response.id,
);
return None;
}
},
(None, None) => {
io.report_peer(who, Severity::Bad("Unexpected response packet received from peer"));
return None;
},
}
} else {
io.report_peer(who, Severity::Bad("Unexpected packet received from peer"));
return None;
};
Some(request)
}
pub fn handle_packet(&self, io: &mut SyncIo, who: NodeIndex, mut data: &[u8]) {
let message: Message<B> = match Decode::decode(&mut data) {
Some(m) => m,
@@ -270,29 +352,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::Status(s) => self.on_status_message(io, who, s),
GenericMessage::BlockRequest(r) => self.on_block_request(io, who, r),
GenericMessage::BlockResponse(r) => {
let request = {
let mut peers = self.context_data.peers.write();
if let Some(ref mut peer) = peers.get_mut(&who) {
peer.request_timestamp = None;
match peer.block_request.take() {
Some(r) => r,
None => {
io.report_peer(who, Severity::Bad("Unexpected response packet received from peer"));
return;
}
}
} else {
io.report_peer(who, Severity::Bad("Unexpected packet received from peer"));
return;
}
};
if request.id != r.id {
trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", who, request.id, r.id);
return;
if let Some(request) = self.handle_response(io, who, &r) {
self.on_block_response(io, who, request, r);
}
self.on_block_response(io, who, request, r);
},
GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, who, announce),
GenericMessage::Transactions(m) => self.on_extrinsics(io, who, m),
@@ -444,7 +506,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target: "sync", "BlockResponse {} from {} with {} blocks{}",
trace!(target: "sync", "BlockResponse {} from {} with {} blocks {}",
response.id, peer, response.blocks.len(), blocks_range);
// TODO [andre]: move this logic to the import queue so that
@@ -487,7 +549,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let peers = self.context_data.peers.read();
let handshaking_peers = self.handshaking_peers.read();
for (who, timestamp) in peers.iter()
.filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r)))
.filter_map(|(id, peer)| peer.min_request_timestamp().map(|r| (id, r)))
.chain(handshaking_peers.iter())
{
if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC {
@@ -551,7 +613,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
best_hash: status.best_hash,
best_number: status.best_number,
block_request: None,
request_timestamp: None,
block_request_timestamp: None,
justification_request: None,
justification_request_timestamp: None,
known_extrinsics: HashSet::new(),
known_blocks: HashSet::new(),
next_request_id: 0,
@@ -812,8 +876,14 @@ fn send_message<B: BlockT, H: ExHashT>(peers: &RwLock<HashMap<NodeIndex, Peer<B,
if let Some(ref mut peer) = peers.get_mut(&who) {
r.id = peer.next_request_id;
peer.next_request_id = peer.next_request_id + 1;
peer.block_request = Some(r.clone());
peer.request_timestamp = Some(time::Instant::now());
if r.fields == message::BlockAttributes::JUSTIFICATION {
peer.justification_request = Some(r.clone());
peer.justification_request_timestamp = Some(time::Instant::now());
} else {
peer.block_request = Some(r.clone());
peer.block_request_timestamp = Some(time::Instant::now());
}
}
},
_ => (),