mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 18:37:59 +00:00
Detect obsolete block responses (#6077)
This commit is contained in:
@@ -548,30 +548,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
self.sync.update_chain_info(&info.best_hash, info.best_number);
|
||||
}
|
||||
|
||||
/// Accepts a response from the legacy substream and determines what the corresponding
|
||||
/// request was.
|
||||
fn handle_response(
|
||||
&mut self,
|
||||
who: PeerId,
|
||||
response: &message::BlockResponse<B>
|
||||
) -> Option<message::BlockRequest<B>> {
|
||||
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
|
||||
if peer.obsolete_requests.remove(&response.id).is_some() {
|
||||
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", who, response.id);
|
||||
return None;
|
||||
}
|
||||
// Clear the request. If the response is invalid peer will be disconnected anyway.
|
||||
let request = peer.block_request.take();
|
||||
if request.as_ref().map_or(false, |(_, r)| r.id == response.id) {
|
||||
return request.map(|(_, r)| r)
|
||||
}
|
||||
trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id);
|
||||
self.peerset_handle.report_peer(who.clone(), rep::UNEXPECTED_RESPONSE);
|
||||
self.behaviour.disconnect_peer(&who);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn update_peer_info(&mut self, who: &PeerId) {
|
||||
if let Some(info) = self.sync.peer_info(who) {
|
||||
if let Some(ref mut peer) = self.context_data.peers.get_mut(who) {
|
||||
@@ -609,11 +585,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
GenericMessage::Status(s) => return self.on_status_message(who, s),
|
||||
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
|
||||
GenericMessage::BlockResponse(r) => {
|
||||
if let Some(request) = self.handle_response(who.clone(), &r) {
|
||||
let outcome = self.on_block_response(who.clone(), request, r);
|
||||
self.update_peer_info(&who);
|
||||
return outcome
|
||||
}
|
||||
let outcome = self.on_block_response(who.clone(), r);
|
||||
self.update_peer_info(&who);
|
||||
return outcome
|
||||
},
|
||||
GenericMessage::BlockAnnounce(announce) => {
|
||||
let outcome = self.on_block_announce(who.clone(), announce);
|
||||
@@ -705,6 +679,10 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
);
|
||||
}
|
||||
|
||||
fn update_peer_request(&mut self, who: &PeerId, request: &mut message::BlockRequest<B>) {
|
||||
update_peer_request::<B, H>(&mut self.context_data.peers, who, request)
|
||||
}
|
||||
|
||||
/// Called when a new peer is connected
|
||||
pub fn on_peer_connected(&mut self, who: PeerId) {
|
||||
trace!(target: "sync", "Connecting {}", who);
|
||||
@@ -844,9 +822,34 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
pub fn on_block_response(
|
||||
&mut self,
|
||||
peer: PeerId,
|
||||
request: message::BlockRequest<B>,
|
||||
response: message::BlockResponse<B>,
|
||||
) -> CustomMessageOutcome<B> {
|
||||
let request = if let Some(ref mut p) = self.context_data.peers.get_mut(&peer) {
|
||||
if p.obsolete_requests.remove(&response.id).is_some() {
|
||||
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
|
||||
return CustomMessageOutcome::None;
|
||||
}
|
||||
// Clear the request. If the response is invalid peer will be disconnected anyway.
|
||||
match p.block_request.take() {
|
||||
Some((_, request)) if request.id == response.id => request,
|
||||
Some(_) => {
|
||||
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
|
||||
return CustomMessageOutcome::None;
|
||||
}
|
||||
None => {
|
||||
trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
|
||||
self.behaviour.disconnect_peer(&peer);
|
||||
self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
|
||||
return CustomMessageOutcome::None;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
|
||||
self.behaviour.disconnect_peer(&peer);
|
||||
self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
|
||||
return CustomMessageOutcome::None;
|
||||
};
|
||||
|
||||
let blocks_range = || match (
|
||||
response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
|
||||
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
|
||||
@@ -891,8 +894,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
match self.sync.on_block_data(&peer, Some(request), response) {
|
||||
Ok(sync::OnBlockData::Import(origin, blocks)) =>
|
||||
CustomMessageOutcome::BlockImport(origin, blocks),
|
||||
Ok(sync::OnBlockData::Request(peer, req)) => {
|
||||
Ok(sync::OnBlockData::Request(peer, mut req)) => {
|
||||
if self.use_new_block_requests_protocol {
|
||||
self.update_peer_request(&peer, &mut req);
|
||||
CustomMessageOutcome::BlockRequest {
|
||||
target: peer,
|
||||
request: req,
|
||||
@@ -1076,8 +1080,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
if info.roles.is_full() {
|
||||
match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) {
|
||||
Ok(None) => (),
|
||||
Ok(Some(req)) => {
|
||||
Ok(Some(mut req)) => {
|
||||
if self.use_new_block_requests_protocol {
|
||||
self.update_peer_request(&who, &mut req);
|
||||
self.pending_messages.push_back(CustomMessageOutcome::BlockRequest {
|
||||
target: who.clone(),
|
||||
request: req,
|
||||
@@ -1413,8 +1418,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
Ok(sync::OnBlockData::Import(origin, blocks)) => {
|
||||
CustomMessageOutcome::BlockImport(origin, blocks)
|
||||
},
|
||||
Ok(sync::OnBlockData::Request(peer, req)) => {
|
||||
Ok(sync::OnBlockData::Request(peer, mut req)) => {
|
||||
if self.use_new_block_requests_protocol {
|
||||
self.update_peer_request(&peer, &mut req);
|
||||
CustomMessageOutcome::BlockRequest {
|
||||
target: peer,
|
||||
request: req,
|
||||
@@ -1520,8 +1526,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
|
||||
);
|
||||
for result in results {
|
||||
match result {
|
||||
Ok((id, req)) => {
|
||||
Ok((id, mut req)) => {
|
||||
if self.use_new_block_requests_protocol {
|
||||
update_peer_request(&mut self.context_data.peers, &id, &mut req);
|
||||
self.pending_messages.push_back(CustomMessageOutcome::BlockRequest {
|
||||
target: id,
|
||||
request: req,
|
||||
@@ -1935,6 +1942,22 @@ fn send_request<B: BlockT, H: ExHashT>(
|
||||
send_message::<B>(behaviour, stats, who, None, message)
|
||||
}
|
||||
|
||||
fn update_peer_request<B: BlockT, H: ExHashT>(
|
||||
peers: &mut HashMap<PeerId, Peer<B, H>>,
|
||||
who: &PeerId,
|
||||
request: &mut message::BlockRequest<B>,
|
||||
) {
|
||||
if let Some(ref mut peer) = peers.get_mut(who) {
|
||||
request.id = peer.next_request_id;
|
||||
peer.next_request_id += 1;
|
||||
if let Some((timestamp, request)) = peer.block_request.take() {
|
||||
trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who);
|
||||
peer.obsolete_requests.insert(request.id, timestamp);
|
||||
}
|
||||
peer.block_request = Some((Instant::now(), request.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
fn send_message<B: BlockT>(
|
||||
behaviour: &mut GenericProto,
|
||||
stats: &mut HashMap<&'static str, PacketStats>,
|
||||
@@ -2012,8 +2035,9 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
|
||||
self.propagate_extrinsics();
|
||||
}
|
||||
|
||||
for (id, r) in self.sync.block_requests() {
|
||||
for (id, mut r) in self.sync.block_requests() {
|
||||
if self.use_new_block_requests_protocol {
|
||||
update_peer_request(&mut self.context_data.peers, &id, &mut r);
|
||||
let event = CustomMessageOutcome::BlockRequest {
|
||||
target: id.clone(),
|
||||
request: r,
|
||||
@@ -2029,8 +2053,9 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
|
||||
)
|
||||
}
|
||||
}
|
||||
for (id, r) in self.sync.justification_requests() {
|
||||
for (id, mut r) in self.sync.justification_requests() {
|
||||
if self.use_new_block_requests_protocol {
|
||||
update_peer_request(&mut self.context_data.peers, &id, &mut r);
|
||||
let event = CustomMessageOutcome::BlockRequest {
|
||||
target: id,
|
||||
request: r,
|
||||
|
||||
Reference in New Issue
Block a user