Extra timeout handling in block_requests (#5794)

This commit is contained in:
Pierre Krieger
2020-04-27 12:17:26 +02:00
committed by GitHub
parent fbc4e7f055
commit 636ddd95d2
2 changed files with 229 additions and 12 deletions
@@ -30,6 +30,7 @@ use crate::{
protocol::{api, message::{self, BlockAttributes}}
};
use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
use futures_timer::Delay;
use libp2p::{
core::{
ConnectedPoint,
@@ -54,10 +55,11 @@ use prost::Message;
use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}};
use std::{
cmp::min,
collections::VecDeque,
collections::{HashMap, VecDeque},
io,
iter,
marker::PhantomData,
pin::Pin,
sync::Arc,
time::Duration,
task::{Context, Poll}
@@ -77,6 +79,19 @@ pub enum Event<B: Block> {
original_request: message::BlockRequest<B>,
response: message::BlockResponse<B>,
},
/// A request has been cancelled because the peer has disconnected.
/// Disconnects can also happen as a result of violating the network protocol.
RequestCancelled {
peer: PeerId,
/// The original request passed to `send_request`.
original_request: message::BlockRequest<B>,
},
/// A request has timed out.
RequestTimeout {
peer: PeerId,
/// The original request passed to `send_request`.
original_request: message::BlockRequest<B>,
}
}
/// Configuration options for `BlockRequests`.
@@ -86,6 +101,7 @@ pub struct Config {
max_request_len: usize,
max_response_len: usize,
inactivity_timeout: Duration,
request_timeout: Duration,
protocol: Bytes,
}
@@ -96,12 +112,14 @@ impl Config {
/// - max. request size = 1 MiB
/// - max. response size = 16 MiB
/// - inactivity timeout = 15s
/// - request timeout = 40s
pub fn new(id: &ProtocolId) -> Self {
let mut c = Config {
max_block_data_response: 128,
max_request_len: 1024 * 1024,
max_response_len: 16 * 1024 * 1024,
inactivity_timeout: Duration::from_secs(15),
request_timeout: Duration::from_secs(40),
protocol: Bytes::new(),
};
c.set_protocol(id);
@@ -149,12 +167,27 @@ pub struct BlockRequests<B: Block> {
config: Config,
/// Blockchain client.
chain: Arc<dyn Client<B>>,
/// List of all active connections and the requests we've sent.
peers: HashMap<PeerId, Vec<Connection<B>>>,
/// Futures sending back the block request response.
outgoing: FuturesUnordered<BoxFuture<'static, ()>>,
/// Events to return as soon as possible from `poll`.
pending_events: VecDeque<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>,
}
/// Local tracking of a libp2p connection.
#[derive(Debug)]
struct Connection<B: Block> {
id: ConnectionId,
ongoing_request: Option<OngoingRequest<B>>,
}
#[derive(Debug)]
struct OngoingRequest<B: Block> {
request: message::BlockRequest<B>,
timeout: Delay,
}
impl<B> BlockRequests<B>
where
B: Block,
@@ -163,6 +196,7 @@ where
BlockRequests {
config: cfg,
chain,
peers: HashMap::new(),
outgoing: FuturesUnordered::new(),
pending_events: VecDeque::new(),
}
@@ -170,9 +204,30 @@ where
/// Issue a new block request.
///
/// Cancels any existing request targeting the same `PeerId`.
///
/// If the response doesn't arrive in time, or if the remote answers improperly, the target
/// will be disconnected.
pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) {
// Determine which connection to send the request to.
let connection = if let Some(peer) = self.peers.get_mut(target) {
// We don't want to have multiple requests for any given node, so in priority try to
// find a connection with an existing request, to override it.
if let Some(entry) = peer.iter_mut().find(|c| c.ongoing_request.is_some()) {
entry
} else if let Some(entry) = peer.get_mut(0) {
entry
} else {
log::error!(
target: "sync",
"State inconsistency: empty list of peer connections"
);
return;
}
} else {
return;
};
let protobuf_rq = api::v1::BlockRequest {
fields: u32::from_be_bytes([req.fields.bits(), 0, 0, 0]),
from_block: match req.from {
@@ -191,14 +246,31 @@ where
let mut buf = Vec::with_capacity(protobuf_rq.encoded_len());
if let Err(err) = protobuf_rq.encode(&mut buf) {
log::warn!("failed to encode block request {:?}: {:?}", protobuf_rq, err);
log::warn!(
target: "sync",
"Failed to encode block request {:?}: {:?}",
protobuf_rq,
err
);
return;
}
log::trace!("enqueueing block request to {:?}: {:?}", target, protobuf_rq);
if let Some(rq) = &connection.ongoing_request {
log::debug!(
target: "sync",
"Replacing existing block request on connection {:?}",
connection.id
);
}
connection.ongoing_request = Some(OngoingRequest {
request: req.clone(),
timeout: Delay::new(self.config.request_timeout),
});
log::trace!(target: "sync", "Enqueueing block request to {:?}: {:?}", target, protobuf_rq);
self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: target.clone(),
handler: NotifyHandler::Any,
handler: NotifyHandler::One(connection.id),
event: OutboundProtocol {
request: buf,
original_request: req,
@@ -215,7 +287,9 @@ where
, request: &api::v1::BlockRequest
) -> Result<api::v1::BlockResponse, Error>
{
log::trace!("block request from peer {}: from block {:?} to block {:?}, max blocks {:?}",
log::trace!(
target: "sync",
"Block request from peer {}: from block {:?} to block {:?}, max blocks {:?}",
peer,
request.from_block,
request.to_block,
@@ -332,6 +406,7 @@ where
};
let mut cfg = OneShotHandlerConfig::default();
cfg.inactive_timeout = self.config.inactivity_timeout;
cfg.substream_timeout = self.config.request_timeout;
OneShotHandler::new(SubstreamProtocol::new(p), cfg)
}
@@ -345,34 +420,138 @@ where
fn inject_disconnected(&mut self, _peer: &PeerId) {
}
fn inject_connection_established(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) {
self.peers.entry(peer_id.clone())
.or_default()
.push(Connection {
id: *id,
ongoing_request: None,
});
}
fn inject_connection_closed(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) {
let mut needs_remove = false;
if let Some(entry) = self.peers.get_mut(peer_id) {
if let Some(pos) = entry.iter().position(|i| i.id == *id) {
let ongoing_request = entry.remove(pos).ongoing_request;
if let Some(ongoing_request) = ongoing_request {
log::debug!(
target: "sync",
"Connection {:?} with {} closed with ongoing sync request: {:?}",
id,
peer_id,
ongoing_request
);
let ev = Event::RequestCancelled {
peer: peer_id.clone(),
original_request: ongoing_request.request.clone(),
};
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
if entry.is_empty() {
needs_remove = true;
}
} else {
log::error!(
target: "sync",
"State inconsistency: connection id not found in list"
);
}
} else {
log::error!(
target: "sync",
"State inconsistency: peer_id not found in list of connections"
);
}
if needs_remove {
self.peers.remove(peer_id);
}
}
fn inject_event(
&mut self,
peer: PeerId,
connection: ConnectionId,
connection_id: ConnectionId,
node_event: NodeEvent<B, NegotiatedSubstream>
) {
match node_event {
NodeEvent::Request(request, mut stream) => {
match self.on_block_request(&peer, &request) {
Ok(res) => {
log::trace!("enqueueing block response for peer {} with {} blocks", peer, res.blocks.len());
log::trace!(
target: "sync",
"Enqueueing block response for peer {} with {} blocks",
peer, res.blocks.len()
);
let mut data = Vec::with_capacity(res.encoded_len());
if let Err(e) = res.encode(&mut data) {
log::debug!("error encoding block response for peer {}: {}", peer, e)
log::debug!(
target: "sync",
"Error encoding block response for peer {}: {}",
peer, e
)
} else {
let future = async move {
if let Err(e) = write_one(&mut stream, data).await {
log::debug!("error writing block response: {}", e)
log::debug!(
target: "sync",
"Error writing block response: {}",
e
);
}
};
self.outgoing.push(future.boxed())
}
}
Err(e) => log::debug!("error handling block request from peer {}: {}", peer, e)
Err(e) => log::debug!(
target: "sync",
"Error handling block request from peer {}: {}", peer, e
)
}
}
NodeEvent::Response(original_request, response) => {
log::trace!("received block response from peer {} with {} blocks", peer, response.blocks.len());
log::trace!(
target: "sync",
"Received block response from peer {} with {} blocks",
peer, response.blocks.len()
);
if let Some(connections) = self.peers.get_mut(&peer) {
if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) {
if let Some(ongoing_request) = &mut connection.ongoing_request {
if ongoing_request.request == original_request {
connection.ongoing_request = None;
} else {
// We're no longer interested in that request.
log::debug!(
target: "sync",
"Received response from {} to obsolete block request {:?}",
peer,
original_request
);
return;
}
} else {
// We remove from `self.peers` requests we're no longer interested in,
// so this can legitimately happen.
return;
}
} else {
log::error!(
target: "sync",
"State inconsistency: response on non-existing connection {:?}",
connection_id
);
return;
}
} else {
log::error!(
target: "sync",
"State inconsistency: response on non-connected peer {}",
peer
);
return;
}
let blocks = response.blocks.into_iter().map(|block_data| {
Ok(message::BlockData::<B> {
hash: Decode::decode(&mut block_data.hash.as_ref())?,
@@ -419,7 +598,10 @@ where
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
Err(err) => {
log::debug!("failed to decode block response from peer {}: {}", peer, err);
log::debug!(
target: "sync",
"Failed to decode block response from peer {}: {}", peer, err
);
}
}
}
@@ -433,6 +615,31 @@ where
return Poll::Ready(ev);
}
// Check the request timeouts.
for (peer, connections) in &mut self.peers {
for connection in connections {
let ongoing_request = match &mut connection.ongoing_request {
Some(rq) => rq,
None => continue,
};
if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) {
let original_request = ongoing_request.request.clone();
connection.ongoing_request = None;
log::debug!(
target: "sync",
"Request timeout for {}: {:?}",
peer, original_request
);
let ev = Event::RequestTimeout {
peer: peer.clone(),
original_request,
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
}
}
while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {}
Poll::Pending
}