client/network: Use request response for block requests (#7478)

* client/network: Add scaffolding for finality req to use req resp
	#sc

* client/network/src/finality_requests: Remove

* client/network/src/behaviour: Pass request id down to sync

* client/network: Use request response for block requests

* client/network: Move handler logic into *_*_handler.rs

* client/network: Track ongoing finality requests in protocol.rs

* client/network: Remove commented out finalization initialization

* client/network: Add docs for request handlers

* client/network/finality_request_handler: Log errors

* client/network/block_request_handler: Log errors

* client/network: Format

* client/network: Handle block request failure

* protocols/network: Fix tests

* client/network/src/behaviour: Handle request sending errors

* client/network: Move response handling into custom method

* client/network/protocol: Handle block response errors

* client/network/protocol: Remove tracking of obsolete requests

* client/network/protocol: Remove block request start time tracking

This will be handled generically via request-responses.

* client/network/protocol: Refactor on_*_request_started

* client/network: Pass protocol config instead of protocol name

* client/network: Pass protocol config in tests

* client/network/config: Document request response configs

* client/network/src/_request_handler: Document protocol config gen

* client/network/src/protocol: Document Peer request values

* client/network: Rework request response to always use oneshot

* client/network: Unified metric reporting for all request protocols

* client/network: Move protobuf parsing into protocol.rs

* client/network/src/protocol: Return pending events after poll

* client/network: Improve error handling and documentation

* client/network/behaviour: Remove outdated error types

* Update client/network/src/block_request_handler.rs

Co-authored-by: Ashley <ashley.ruglys@gmail.com>

* Update client/network/src/finality_request_handler.rs

Co-authored-by: Ashley <ashley.ruglys@gmail.com>

* client/network/protocol: Reduce reputation on timeout

* client/network/protocol: Refine reputation changes

* client/network/block_request_handler: Set and explain queue length

* client/service: Deny block requests when light client

* client/service: Fix role matching

* client: Enforce line width

* client/network/request_responses: Fix unit tests

* client/network: Expose time to build response via metrics

* client/network/request_responses: Fix early connection closed error

* client/network/protocol: Fix line length

* client/network/protocol: Disconnect on most request failures

* client/network/protocol: Disconnect peer when oneshot is canceled

* client/network/protocol: Disconnect peer even when connection closed

* client/network/protocol: Remove debugging log line

* client/network/request_response: Use Clone::clone for error

* client/network/request_response: Remove outdated comment

With libp2p v0.33.0 libp2p-request-response properly sends inbound
failures on connections being closed.

Co-authored-by: Addie Wagenknecht <addie@nortd.com>
Co-authored-by: Ashley <ashley.ruglys@gmail.com>
This commit is contained in:
Max Inden
2021-01-05 19:20:54 +01:00
committed by GitHub
parent 92f596829d
commit 3f629f743b
17 changed files with 780 additions and 1299 deletions
+53 -100
View File
@@ -17,20 +17,22 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
config::{ProtocolId, Role}, block_requests, light_client_handler,
peer_info, request_responses, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
config::{ProtocolId, Role}, light_client_handler, peer_info, request_responses,
discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol},
ObservedRole, DhtEvent, ExHashT,
};
use bytes::Bytes;
use codec::Encode as _;
use futures::channel::oneshot;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::identify::IdentifyInfo;
use libp2p::kad::record;
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use log::debug;
use prost::Message;
use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, Justification};
use std::{
@@ -42,7 +44,7 @@ use std::{
};
pub use crate::request_responses::{
ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId, SendRequestError
ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId,
};
/// General behaviour of the network. Combines all protocols together.
@@ -58,8 +60,6 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
discovery: DiscoveryBehaviour,
/// Generic request-reponse protocols.
request_responses: request_responses::RequestResponsesBehaviour,
/// Block request handling.
block_requests: block_requests::BlockRequests<B>,
/// Light client request handling.
light_client_handler: light_client_handler::LightClientHandler<B>,
@@ -70,6 +70,11 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
/// Role of our local node, as originally passed from the configuration.
#[behaviour(ignore)]
role: Role,
/// Protocol name used to send out block requests via
/// [`request_responses::RequestResponsesBehaviour`].
#[behaviour(ignore)]
block_request_protocol_name: String,
}
/// Event generated by `Behaviour`.
@@ -93,34 +98,18 @@ pub enum BehaviourOut<B: BlockT> {
result: Result<Duration, ResponseFailure>,
},
/// A request initiated using [`Behaviour::send_request`] has succeeded or failed.
/// A request has succeeded or failed.
///
/// This event is generated for statistics purposes.
RequestFinished {
/// Request that has succeeded.
request_id: RequestId,
/// Response sent by the remote or reason for failure.
result: Result<Vec<u8>, RequestFailure>,
},
/// Started a new request with the given node.
///
/// This event is for statistics purposes only. The request and response handling are entirely
/// internal to the behaviour.
OpaqueRequestStarted {
/// Peer that we send a request to.
peer: PeerId,
/// Protocol name of the request.
protocol: String,
},
/// Finished, successfully or not, a previously-started request.
///
/// This event is for statistics purposes only. The request and response handling are entirely
/// internal to the behaviour.
OpaqueRequestFinished {
/// Who we were requesting.
peer: PeerId,
/// Protocol name of the request.
protocol: String,
/// How long before the response came or the request got cancelled.
request_duration: Duration,
/// Name of the protocol in question.
protocol: Cow<'static, str>,
/// Duration the request took.
duration: Duration,
/// Result of the request.
result: Result<(), RequestFailure>,
},
/// Opened a substream with the given node with the given notifications protocol.
@@ -180,21 +169,28 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
role: Role,
user_agent: String,
local_public_key: PublicKey,
block_requests: block_requests::BlockRequests<B>,
light_client_handler: light_client_handler::LightClientHandler<B>,
disco_config: DiscoveryConfig,
request_response_protocols: Vec<request_responses::ProtocolConfig>,
// Block request protocol config.
block_request_protocol_config: request_responses::ProtocolConfig,
// All remaining request protocol configs.
mut request_response_protocols: Vec<request_responses::ProtocolConfig>,
) -> Result<Self, request_responses::RegisterError> {
// Extract protocol name and add to `request_response_protocols`.
let block_request_protocol_name = block_request_protocol_config.name.to_string();
request_response_protocols.push(block_request_protocol_config);
Ok(Behaviour {
substrate,
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key),
discovery: disco_config.finish(),
request_responses:
request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?,
block_requests,
light_client_handler,
events: VecDeque::new(),
role,
block_request_protocol_name,
})
}
@@ -236,13 +232,14 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
}
/// Initiates sending a request.
///
/// An error is returned if we are not connected to the target peer of if the protocol doesn't
/// match one that has been registered.
pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec<u8>)
-> Result<RequestId, SendRequestError>
{
self.request_responses.send_request(target, protocol, request)
pub fn send_request(
&mut self,
target: &PeerId,
protocol: &str,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
) {
self.request_responses.send_request(target, protocol, request, pending_response)
}
/// Registers a new notifications protocol.
@@ -331,28 +328,20 @@ Behaviour<B, H> {
self.events.push_back(BehaviourOut::BlockImport(origin, blocks)),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) =>
self.events.push_back(BehaviourOut::JustificationImport(origin, hash, nb, justification)),
CustomMessageOutcome::BlockRequest { target, request } => {
match self.block_requests.send_request(&target, request) {
block_requests::SendRequestOutcome::Ok => {
self.events.push_back(BehaviourOut::OpaqueRequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_owned(),
});
},
block_requests::SendRequestOutcome::Replaced { request_duration, .. } => {
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: target.clone(),
protocol: self.block_requests.protocol_name().to_owned(),
request_duration,
});
self.events.push_back(BehaviourOut::OpaqueRequestStarted {
peer: target,
protocol: self.block_requests.protocol_name().to_owned(),
});
}
block_requests::SendRequestOutcome::NotConnected |
block_requests::SendRequestOutcome::EncodeError(_) => {},
CustomMessageOutcome::BlockRequest { target, request, pending_response } => {
let mut buf = Vec::with_capacity(request.encoded_len());
if let Err(err) = request.encode(&mut buf) {
log::warn!(
target: "sync",
"Failed to encode block request {:?}: {:?}",
request, err
);
return
}
self.request_responses.send_request(
&target, &self.block_request_protocol_name, buf, pending_response,
);
},
CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles, notifications_sink } => {
let role = reported_roles_to_observed_role(&self.role, &remote, roles);
@@ -401,51 +390,15 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<request_responses::Even
result,
});
}
request_responses::Event::RequestFinished { request_id, result } => {
request_responses::Event::RequestFinished { peer, protocol, duration, result } => {
self.events.push_back(BehaviourOut::RequestFinished {
request_id,
result,
peer, protocol, duration, result,
});
},
}
}
}
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B>> for Behaviour<B, H> {
fn inject_event(&mut self, event: block_requests::Event<B>) {
match event {
block_requests::Event::AnsweredRequest { peer, total_handling_time } => {
self.events.push_back(BehaviourOut::InboundRequest {
peer,
protocol: self.block_requests.protocol_name().to_owned().into(),
result: Ok(total_handling_time),
});
},
block_requests::Event::Response { peer, response, request_duration } => {
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_owned(),
request_duration,
});
let ev = self.substrate.on_block_response(peer, response);
self.inject_event(ev);
}
block_requests::Event::RequestCancelled { peer, request_duration, .. } |
block_requests::Event::RequestTimeout { peer, request_duration, .. } => {
// There doesn't exist any mechanism to report cancellations or timeouts yet, so
// we process them by disconnecting the node.
self.events.push_back(BehaviourOut::OpaqueRequestFinished {
peer: peer.clone(),
protocol: self.block_requests.protocol_name().to_owned(),
request_duration,
});
self.substrate.on_block_request_failed(&peer);
}
}
}
}
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<peer_info::PeerInfoEvent>
for Behaviour<B, H> {
fn inject_event(&mut self, event: peer_info::PeerInfoEvent) {
@@ -0,0 +1,220 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Helper for handling (i.e. answering) block requests from a remote peer via the
//! [`crate::request_responses::RequestResponsesBehaviour`].
use codec::{Encode, Decode};
use crate::chain::Client;
use crate::config::ProtocolId;
use crate::protocol::{message::BlockAttributes};
use crate::request_responses::{IncomingRequest, ProtocolConfig};
use crate::schema::v1::block_request::FromBlock;
use crate::schema::v1::{BlockResponse, Direction};
use futures::channel::{mpsc, oneshot};
use futures::stream::StreamExt;
use log::debug;
use prost::Message;
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{Block as BlockT, Header, One, Zero};
use std::cmp::min;
use std::sync::{Arc};
use std::time::Duration;
const LOG_TARGET: &str = "block-request-handler";
const MAX_BLOCKS_IN_RESPONSE: usize = 128;
const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
/// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests.
pub fn generate_protocol_config(protocol_id: ProtocolId) -> ProtocolConfig {
ProtocolConfig {
name: generate_protocol_name(protocol_id).into(),
max_request_size: 1024 * 1024,
max_response_size: 16 * 1024 * 1024,
request_timeout: Duration::from_secs(40),
inbound_queue: None,
}
}
/// Generate the block protocol name from chain specific protocol identifier.
fn generate_protocol_name(protocol_id: ProtocolId) -> String {
let mut s = String::new();
s.push_str("/");
s.push_str(protocol_id.as_ref());
s.push_str("/sync/2");
s
}
/// Handler for incoming block requests from a remote peer.
pub struct BlockRequestHandler<B> {
client: Arc<dyn Client<B>>,
request_receiver: mpsc::Receiver<IncomingRequest>,
}
impl <B: BlockT> BlockRequestHandler<B> {
/// Create a new [`BlockRequestHandler`].
pub fn new(protocol_id: ProtocolId, client: Arc<dyn Client<B>>) -> (Self, ProtocolConfig) {
// Rate of arrival multiplied with the waiting time in the queue equals the queue length.
//
// An average Polkadot sentry node serves less than 5 requests per second. The 95th percentile
// serving a request is less than 2 second. Thus one would estimate the queue length to be
// below 10.
//
// Choosing 20 as the queue length to give some additional buffer.
let (tx, request_receiver) = mpsc::channel(20);
let mut protocol_config = generate_protocol_config(protocol_id);
protocol_config.inbound_queue = Some(tx);
(Self { client, request_receiver }, protocol_config)
}
fn handle_request(
&self,
payload: Vec<u8>,
pending_response: oneshot::Sender<Vec<u8>>
) -> Result<(), HandleRequestError> {
let request = crate::schema::v1::BlockRequest::decode(&payload[..])?;
let from_block_id = match request.from_block.ok_or(HandleRequestError::MissingFromField)? {
FromBlock::Hash(ref h) => {
let h = Decode::decode(&mut h.as_ref())?;
BlockId::<B>::Hash(h)
}
FromBlock::Number(ref n) => {
let n = Decode::decode(&mut n.as_ref())?;
BlockId::<B>::Number(n)
}
};
let max_blocks = if request.max_blocks == 0 {
MAX_BLOCKS_IN_RESPONSE
} else {
min(request.max_blocks as usize, MAX_BLOCKS_IN_RESPONSE)
};
let direction = Direction::from_i32(request.direction)
.ok_or(HandleRequestError::ParseDirection)?;
let attributes = BlockAttributes::from_be_u32(request.fields)?;
let get_header = attributes.contains(BlockAttributes::HEADER);
let get_body = attributes.contains(BlockAttributes::BODY);
let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION);
let mut blocks = Vec::new();
let mut block_id = from_block_id;
let mut total_size: usize = 0;
while let Some(header) = self.client.header(block_id).unwrap_or(None) {
let number = *header.number();
let hash = header.hash();
let parent_hash = *header.parent_hash();
let justification = if get_justification {
self.client.justification(&BlockId::Hash(hash))?
} else {
None
};
let is_empty_justification = justification.as_ref().map(|j| j.is_empty()).unwrap_or(false);
let body = if get_body {
match self.client.block_body(&BlockId::Hash(hash))? {
Some(mut extrinsics) => extrinsics.iter_mut()
.map(|extrinsic| extrinsic.encode())
.collect(),
None => {
log::trace!(target: "sync", "Missing data for block request.");
break;
}
}
} else {
Vec::new()
};
let block_data = crate::schema::v1::BlockData {
hash: hash.encode(),
header: if get_header {
header.encode()
} else {
Vec::new()
},
body,
receipt: Vec::new(),
message_queue: Vec::new(),
justification: justification.unwrap_or_default(),
is_empty_justification,
};
total_size += block_data.body.len();
blocks.push(block_data);
if blocks.len() >= max_blocks as usize || total_size > MAX_BODY_BYTES {
break
}
match direction {
Direction::Ascending => {
block_id = BlockId::Number(number + One::one())
}
Direction::Descending => {
if number.is_zero() {
break
}
block_id = BlockId::Hash(parent_hash)
}
}
}
let res = BlockResponse { blocks };
let mut data = Vec::with_capacity(res.encoded_len());
res.encode(&mut data)?;
pending_response.send(data)
.map_err(|_| HandleRequestError::SendResponse)
}
/// Run [`BlockRequestHandler`].
pub async fn run(mut self) {
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;
match self.handle_request(payload, pending_response) {
Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
Err(e) => debug!(
target: LOG_TARGET,
"Failed to handle block request from {}: {}",
peer, e,
),
}
}
}
}
#[derive(derive_more::Display, derive_more::From)]
enum HandleRequestError {
#[display(fmt = "Failed to decode request: {}.", _0)]
DecodeProto(prost::DecodeError),
#[display(fmt = "Failed to encode response: {}.", _0)]
EncodeProto(prost::EncodeError),
#[display(fmt = "Failed to decode block hash: {}.", _0)]
DecodeScale(codec::Error),
#[display(fmt = "Missing `BlockRequest::from_block` field.")]
MissingFromField,
#[display(fmt = "Failed to parse BlockRequest::direction.")]
ParseDirection,
Client(sp_blockchain::Error),
#[display(fmt = "Failed to send response.")]
SendResponse,
}
@@ -1,859 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! `NetworkBehaviour` implementation which handles incoming block requests.
//!
//! Every request is coming in on a separate connection substream which gets
//! closed after we have sent the response back. Incoming requests are encoded
//! as protocol buffers (cf. `api.v1.proto`).
#![allow(unused)]
use bytes::Bytes;
use codec::{Encode, Decode};
use crate::{
chain::Client,
config::ProtocolId,
protocol::{message::{self, BlockAttributes}},
schema,
};
use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
use futures_timer::Delay;
use libp2p::{
core::{
ConnectedPoint,
Multiaddr,
PeerId,
connection::ConnectionId,
upgrade::{InboundUpgrade, OutboundUpgrade, ReadOneError, UpgradeInfo, Negotiated},
upgrade::{DeniedUpgrade, read_one, write_one}
},
swarm::{
NegotiatedSubstream,
NetworkBehaviour,
NetworkBehaviourAction,
NotifyHandler,
OneShotHandler,
OneShotHandlerConfig,
PollParameters,
SubstreamProtocol
}
};
use prost::Message;
use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}};
use std::{
cmp::min,
collections::{HashMap, VecDeque},
io,
iter,
marker::PhantomData,
pin::Pin,
sync::Arc,
time::Duration,
task::{Context, Poll}
};
use void::{Void, unreachable};
use wasm_timer::Instant;
// Type alias for convenience.
pub type Error = Box<dyn std::error::Error + 'static>;
/// Event generated by the block requests behaviour.
#[derive(Debug)]
pub enum Event<B: Block> {
/// A request came and we have successfully answered it.
AnsweredRequest {
/// Peer which has emitted the request.
peer: PeerId,
/// Time elapsed between when we received the request and when we sent back the response.
total_handling_time: Duration,
},
/// A response to a block request has arrived.
Response {
peer: PeerId,
response: message::BlockResponse<B>,
/// Time elapsed between the start of the request and the response.
request_duration: Duration,
},
/// A request has been cancelled because the peer has disconnected.
/// Disconnects can also happen as a result of violating the network protocol.
///
/// > **Note**: This event is NOT emitted if a request is overridden by calling `send_request`.
/// > For that, you must check the value returned by `send_request`.
RequestCancelled {
peer: PeerId,
/// Time elapsed between the start of the request and the cancellation.
request_duration: Duration,
},
/// A request has timed out.
RequestTimeout {
peer: PeerId,
/// Time elapsed between the start of the request and the timeout.
request_duration: Duration,
}
}
/// Configuration options for `BlockRequests`.
#[derive(Debug, Clone)]
pub struct Config {
max_block_data_response: u32,
max_block_body_bytes: usize,
max_request_len: usize,
max_response_len: usize,
inactivity_timeout: Duration,
request_timeout: Duration,
protocol: String,
}
impl Config {
/// Create a fresh configuration with the following options:
///
/// - max. block data in response = 128
/// - max. request size = 1 MiB
/// - max. response size = 16 MiB
/// - inactivity timeout = 15s
/// - request timeout = 40s
pub fn new(id: &ProtocolId) -> Self {
let mut c = Config {
max_block_data_response: 128,
max_block_body_bytes: 8 * 1024 * 1024,
max_request_len: 1024 * 1024,
max_response_len: 16 * 1024 * 1024,
inactivity_timeout: Duration::from_secs(15),
request_timeout: Duration::from_secs(40),
protocol: String::new(),
};
c.set_protocol(id);
c
}
/// Limit the max. number of block data in a response.
pub fn set_max_block_data_response(&mut self, v: u32) -> &mut Self {
self.max_block_data_response = v;
self
}
/// Limit the max. length of incoming block request bytes.
pub fn set_max_request_len(&mut self, v: usize) -> &mut Self {
self.max_request_len = v;
self
}
/// Limit the max. size of responses to our block requests.
pub fn set_max_response_len(&mut self, v: usize) -> &mut Self {
self.max_response_len = v;
self
}
/// Limit the max. duration the substream may remain inactive before closing it.
pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self {
self.inactivity_timeout = v;
self
}
/// Set the maximum total bytes of block bodies that are send in the response.
/// Note that at least one block is always sent regardless of the limit.
/// This should be lower than the value specified in `set_max_response_len`
/// accounting for headers, justifications and encoding overhead.
pub fn set_max_block_body_bytes(&mut self, v: usize) -> &mut Self {
self.max_block_body_bytes = v;
self
}
/// Set protocol to use for upgrade negotiation.
pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self {
let mut s = String::new();
s.push_str("/");
s.push_str(id.as_ref());
s.push_str("/sync/2");
self.protocol = s;
self
}
}
/// The block request handling behaviour.
pub struct BlockRequests<B: Block> {
/// This behaviour's configuration.
config: Config,
/// Blockchain client.
chain: Arc<dyn Client<B>>,
/// List of all active connections and the requests we've sent.
peers: HashMap<PeerId, Vec<Connection<B>>>,
/// Futures sending back the block request response. Returns the `PeerId` we sent back to, and
/// the total time the handling of this request took.
outgoing: FuturesUnordered<BoxFuture<'static, (PeerId, Duration)>>,
/// Events to return as soon as possible from `poll`.
pending_events: VecDeque<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>,
}
/// Local tracking of a libp2p connection.
#[derive(Debug)]
struct Connection<B: Block> {
id: ConnectionId,
ongoing_request: Option<OngoingRequest<B>>,
}
#[derive(Debug)]
struct OngoingRequest<B: Block> {
/// `Instant` when the request has been emitted. Used for diagnostic purposes.
emitted: Instant,
request: message::BlockRequest<B>,
timeout: Delay,
}
/// Outcome of calling `send_request`.
#[derive(Debug)]
#[must_use]
pub enum SendRequestOutcome<B: Block> {
/// Request has been emitted.
Ok,
/// The request has been emitted and has replaced an existing request.
Replaced {
/// The previously-emitted request.
previous: message::BlockRequest<B>,
/// Time that had elapsed since `previous` has been emitted.
request_duration: Duration,
},
/// Didn't start a request because we have no connection to this node.
/// If `send_request` returns that, it is as if the function had never been called.
NotConnected,
/// Error while serializing the request.
EncodeError(prost::EncodeError),
}
impl<B> BlockRequests<B>
where
B: Block,
{
pub fn new(cfg: Config, chain: Arc<dyn Client<B>>) -> Self {
BlockRequests {
config: cfg,
chain,
peers: HashMap::new(),
outgoing: FuturesUnordered::new(),
pending_events: VecDeque::new(),
}
}
/// Returns the libp2p protocol name used on the wire (e.g. `/foo/sync/2`).
pub fn protocol_name(&self) -> &str {
&self.config.protocol
}
/// Issue a new block request.
///
/// Cancels any existing request targeting the same `PeerId`.
///
/// If the response doesn't arrive in time, or if the remote answers improperly, the target
/// will be disconnected.
pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) -> SendRequestOutcome<B> {
// Determine which connection to send the request to.
let connection = if let Some(peer) = self.peers.get_mut(target) {
// We don't want to have multiple requests for any given node, so in priority try to
// find a connection with an existing request, to override it.
if let Some(entry) = peer.iter_mut().find(|c| c.ongoing_request.is_some()) {
entry
} else if let Some(entry) = peer.get_mut(0) {
entry
} else {
log::error!(
target: "sync",
"State inconsistency: empty list of peer connections"
);
return SendRequestOutcome::NotConnected;
}
} else {
return SendRequestOutcome::NotConnected;
};
let protobuf_rq = build_protobuf_block_request(
req.fields,
req.from.clone(),
req.to.clone(),
req.direction,
req.max,
);
let mut buf = Vec::with_capacity(protobuf_rq.encoded_len());
if let Err(err) = protobuf_rq.encode(&mut buf) {
log::warn!(
target: "sync",
"Failed to encode block request {:?}: {:?}",
protobuf_rq,
err
);
return SendRequestOutcome::EncodeError(err);
}
let previous_request = connection.ongoing_request.take();
connection.ongoing_request = Some(OngoingRequest {
emitted: Instant::now(),
request: req.clone(),
timeout: Delay::new(self.config.request_timeout),
});
log::trace!(target: "sync", "Enqueueing block request to {:?}: {:?}", target, protobuf_rq);
self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: target.clone(),
handler: NotifyHandler::One(connection.id),
event: OutboundProtocol {
request: buf,
original_request: req,
max_response_size: self.config.max_response_len,
protocol: self.config.protocol.as_bytes().to_vec().into(),
},
});
if let Some(previous_request) = previous_request {
log::debug!(
target: "sync",
"Replacing existing block request on connection {:?}",
connection.id
);
SendRequestOutcome::Replaced {
previous: previous_request.request,
request_duration: previous_request.emitted.elapsed(),
}
} else {
SendRequestOutcome::Ok
}
}
/// Callback, invoked when a new block request has been received from remote.
fn on_block_request
( &mut self
, peer: &PeerId
, request: &schema::v1::BlockRequest
) -> Result<schema::v1::BlockResponse, Error>
{
log::trace!(
target: "sync",
"Block request from peer {}: from block {:?} to block {:?}, max blocks {:?}",
peer,
request.from_block,
request.to_block,
request.max_blocks);
let from_block_id =
match request.from_block {
Some(schema::v1::block_request::FromBlock::Hash(ref h)) => {
let h = Decode::decode(&mut h.as_ref())?;
BlockId::<B>::Hash(h)
}
Some(schema::v1::block_request::FromBlock::Number(ref n)) => {
let n = Decode::decode(&mut n.as_ref())?;
BlockId::<B>::Number(n)
}
None => {
let msg = "missing `BlockRequest::from_block` field";
return Err(io::Error::new(io::ErrorKind::Other, msg).into())
}
};
let max_blocks =
if request.max_blocks == 0 {
self.config.max_block_data_response
} else {
min(request.max_blocks, self.config.max_block_data_response)
};
let direction =
if request.direction == schema::v1::Direction::Ascending as i32 {
schema::v1::Direction::Ascending
} else if request.direction == schema::v1::Direction::Descending as i32 {
schema::v1::Direction::Descending
} else {
let msg = format!("invalid `BlockRequest::direction` value: {}", request.direction);
return Err(io::Error::new(io::ErrorKind::Other, msg).into())
};
let attributes = BlockAttributes::from_be_u32(request.fields)?;
let get_header = attributes.contains(BlockAttributes::HEADER);
let get_body = attributes.contains(BlockAttributes::BODY);
let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION);
let mut blocks = Vec::new();
let mut block_id = from_block_id;
let mut total_size = 0;
while let Some(header) = self.chain.header(block_id).unwrap_or(None) {
if blocks.len() >= max_blocks as usize
|| (blocks.len() >= 1 && total_size > self.config.max_block_body_bytes)
{
break
}
let number = *header.number();
let hash = header.hash();
let parent_hash = *header.parent_hash();
let justification = if get_justification {
self.chain.justification(&BlockId::Hash(hash))?
} else {
None
};
let is_empty_justification = justification.as_ref().map(|j| j.is_empty()).unwrap_or(false);
let body = if get_body {
match self.chain.block_body(&BlockId::Hash(hash))? {
Some(mut extrinsics) => extrinsics.iter_mut()
.map(|extrinsic| extrinsic.encode())
.collect(),
None => {
log::trace!(target: "sync", "Missing data for block request.");
break;
}
}
} else {
Vec::new()
};
let block_data = schema::v1::BlockData {
hash: hash.encode(),
header: if get_header {
header.encode()
} else {
Vec::new()
},
body,
receipt: Vec::new(),
message_queue: Vec::new(),
justification: justification.unwrap_or_default(),
is_empty_justification,
};
total_size += block_data.body.len();
blocks.push(block_data);
match direction {
schema::v1::Direction::Ascending => {
block_id = BlockId::Number(number + One::one())
}
schema::v1::Direction::Descending => {
if number.is_zero() {
break
}
block_id = BlockId::Hash(parent_hash)
}
}
}
Ok(schema::v1::BlockResponse { blocks })
}
}
impl<B> NetworkBehaviour for BlockRequests<B>
where
B: Block
{
type ProtocolsHandler = OneShotHandler<InboundProtocol<B>, OutboundProtocol<B>, NodeEvent<B, NegotiatedSubstream>>;
type OutEvent = Event<B>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
let p = InboundProtocol {
max_request_len: self.config.max_request_len,
protocol: self.config.protocol.as_bytes().to_owned().into(),
marker: PhantomData,
};
let mut cfg = OneShotHandlerConfig::default();
cfg.keep_alive_timeout = self.config.inactivity_timeout;
cfg.outbound_substream_timeout = self.config.request_timeout;
OneShotHandler::new(SubstreamProtocol::new(p, ()), cfg)
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, _peer: &PeerId) {
}
fn inject_disconnected(&mut self, _peer: &PeerId) {
}
fn inject_connection_established(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) {
self.peers.entry(peer_id.clone())
.or_default()
.push(Connection {
id: *id,
ongoing_request: None,
});
}
fn inject_connection_closed(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) {
let mut needs_remove = false;
if let Some(entry) = self.peers.get_mut(peer_id) {
if let Some(pos) = entry.iter().position(|i| i.id == *id) {
let ongoing_request = entry.remove(pos).ongoing_request;
if let Some(ongoing_request) = ongoing_request {
log::debug!(
target: "sync",
"Connection {:?} with {} closed with ongoing sync request: {:?}",
id,
peer_id,
ongoing_request
);
let ev = Event::RequestCancelled {
peer: peer_id.clone(),
request_duration: ongoing_request.emitted.elapsed(),
};
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
if entry.is_empty() {
needs_remove = true;
}
} else {
log::error!(
target: "sync",
"State inconsistency: connection id not found in list"
);
}
} else {
log::error!(
target: "sync",
"State inconsistency: peer_id not found in list of connections"
);
}
if needs_remove {
self.peers.remove(peer_id);
}
}
fn inject_event(
&mut self,
peer: PeerId,
connection_id: ConnectionId,
node_event: NodeEvent<B, NegotiatedSubstream>
) {
match node_event {
NodeEvent::Request(request, mut stream, handling_start) => {
match self.on_block_request(&peer, &request) {
Ok(res) => {
log::trace!(
target: "sync",
"Enqueueing block response for peer {} with {} blocks",
peer, res.blocks.len()
);
let mut data = Vec::with_capacity(res.encoded_len());
if let Err(e) = res.encode(&mut data) {
log::debug!(
target: "sync",
"Error encoding block response for peer {}: {}",
peer, e
)
} else {
self.outgoing.push(async move {
if let Err(e) = write_one(&mut stream, data).await {
log::debug!(
target: "sync",
"Error writing block response: {}",
e
);
}
(peer, handling_start.elapsed())
}.boxed());
}
}
Err(e) => log::debug!(
target: "sync",
"Error handling block request from peer {}: {}", peer, e
)
}
}
NodeEvent::Response(original_request, response) => {
log::trace!(
target: "sync",
"Received block response from peer {} with {} blocks",
peer, response.blocks.len()
);
let request_duration = if let Some(connections) = self.peers.get_mut(&peer) {
if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) {
if let Some(ongoing_request) = &mut connection.ongoing_request {
if ongoing_request.request == original_request {
let request_duration = ongoing_request.emitted.elapsed();
connection.ongoing_request = None;
request_duration
} else {
// We're no longer interested in that request.
log::debug!(
target: "sync",
"Received response from {} to obsolete block request {:?}",
peer,
original_request
);
return;
}
} else {
// We remove from `self.peers` requests we're no longer interested in,
// so this can legitimately happen.
log::trace!(
target: "sync",
"Response discarded because it concerns an obsolete request"
);
return;
}
} else {
log::error!(
target: "sync",
"State inconsistency: response on non-existing connection {:?}",
connection_id
);
return;
}
} else {
log::error!(
target: "sync",
"State inconsistency: response on non-connected peer {}",
peer
);
return;
};
let blocks = response.blocks.into_iter().map(|block_data| {
Ok(message::BlockData::<B> {
hash: Decode::decode(&mut block_data.hash.as_ref())?,
header: if !block_data.header.is_empty() {
Some(Decode::decode(&mut block_data.header.as_ref())?)
} else {
None
},
body: if original_request.fields.contains(message::BlockAttributes::BODY) {
Some(block_data.body.iter().map(|body| {
Decode::decode(&mut body.as_ref())
}).collect::<Result<Vec<_>, _>>()?)
} else {
None
},
receipt: if !block_data.message_queue.is_empty() {
Some(block_data.receipt)
} else {
None
},
message_queue: if !block_data.message_queue.is_empty() {
Some(block_data.message_queue)
} else {
None
},
justification: if !block_data.justification.is_empty() {
Some(block_data.justification)
} else if block_data.is_empty_justification {
Some(Vec::new())
} else {
None
},
})
}).collect::<Result<Vec<_>, codec::Error>>();
match blocks {
Ok(blocks) => {
let id = original_request.id;
let ev = Event::Response {
peer,
response: message::BlockResponse::<B> { id, blocks },
request_duration,
};
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
}
Err(err) => {
log::debug!(
target: "sync",
"Failed to decode block response from peer {}: {}", peer, err
);
}
}
}
}
}
fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters)
-> Poll<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>
{
if let Some(ev) = self.pending_events.pop_front() {
return Poll::Ready(ev);
}
// Check the request timeouts.
for (peer, connections) in &mut self.peers {
for connection in connections {
let ongoing_request = match &mut connection.ongoing_request {
Some(rq) => rq,
None => continue,
};
if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) {
let original_request = ongoing_request.request.clone();
let request_duration = ongoing_request.emitted.elapsed();
connection.ongoing_request = None;
log::debug!(
target: "sync",
"Request timeout for {}: {:?}",
peer, original_request
);
let ev = Event::RequestTimeout {
peer: peer.clone(),
request_duration,
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
}
}
if let Poll::Ready(Some((peer, total_handling_time))) = self.outgoing.poll_next_unpin(cx) {
let ev = Event::AnsweredRequest {
peer,
total_handling_time,
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
Poll::Pending
}
}
/// Output type of inbound and outbound substream upgrades.
#[derive(Debug)]
pub enum NodeEvent<B: Block, T> {
/// Incoming request from remote, substream to use for the response, and when we started
/// handling this request.
Request(schema::v1::BlockRequest, T, Instant),
/// Incoming response from remote.
Response(message::BlockRequest<B>, schema::v1::BlockResponse),
}
/// Substream upgrade protocol.
///
/// We attempt to parse an incoming protobuf encoded request (cf. `Request`)
/// which will be handled by the `BlockRequests` behaviour, i.e. the request
/// will become visible via `inject_node_event` which then dispatches to the
/// relevant callback to process the message and prepare a response.
#[derive(Debug, Clone)]
pub struct InboundProtocol<B> {
/// The max. request length in bytes.
max_request_len: usize,
/// The protocol to use during upgrade negotiation.
protocol: Bytes,
/// Type of the block.
marker: PhantomData<B>,
}
impl<B: Block> UpgradeInfo for InboundProtocol<B> {
type Info = Bytes;
type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.protocol.clone())
}
}
impl<B, T> InboundUpgrade<T> for InboundProtocol<B>
where
B: Block,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static
{
type Output = NodeEvent<B, T>;
type Error = ReadOneError;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future {
// This `Instant` will be passed around until the processing of this request is done.
let handling_start = Instant::now();
let future = async move {
let len = self.max_request_len;
let vec = read_one(&mut s, len).await?;
match schema::v1::BlockRequest::decode(&vec[..]) {
Ok(r) => Ok(NodeEvent::Request(r, s, handling_start)),
Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)))
}
};
future.boxed()
}
}
/// Substream upgrade protocol.
///
/// Sends a request to remote and awaits the response.
#[derive(Debug, Clone)]
pub struct OutboundProtocol<B: Block> {
/// The serialized protobuf request.
request: Vec<u8>,
/// The original request. Passed back through the API when the response comes back.
original_request: message::BlockRequest<B>,
/// The max. response length in bytes.
max_response_size: usize,
/// The protocol to use for upgrade negotiation.
protocol: Bytes,
}
impl<B: Block> UpgradeInfo for OutboundProtocol<B> {
type Info = Bytes;
type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.protocol.clone())
}
}
impl<B, T> OutboundUpgrade<T> for OutboundProtocol<B>
where
B: Block,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static
{
type Output = NodeEvent<B, T>;
type Error = ReadOneError;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future {
async move {
write_one(&mut s, &self.request).await?;
let vec = read_one(&mut s, self.max_response_size).await?;
schema::v1::BlockResponse::decode(&vec[..])
.map(|r| NodeEvent::Response(self.original_request, r))
.map_err(|e| {
ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))
})
}.boxed()
}
}
/// Build protobuf block request message.
pub(crate) fn build_protobuf_block_request<Hash: Encode, Number: Encode>(
attributes: BlockAttributes,
from_block: message::FromBlock<Hash, Number>,
to_block: Option<Hash>,
direction: message::Direction,
max_blocks: Option<u32>,
) -> schema::v1::BlockRequest {
schema::v1::BlockRequest {
fields: attributes.to_be_u32(),
from_block: match from_block {
message::FromBlock::Hash(h) =>
Some(schema::v1::block_request::FromBlock::Hash(h.encode())),
message::FromBlock::Number(n) =>
Some(schema::v1::block_request::FromBlock::Number(n.encode())),
},
to_block: to_block.map(|h| h.encode()).unwrap_or_default(),
direction: match direction {
message::Direction::Ascending => schema::v1::Direction::Ascending as i32,
message::Direction::Descending => schema::v1::Direction::Descending as i32,
},
max_blocks: max_blocks.unwrap_or(0),
}
}
+12
View File
@@ -95,6 +95,18 @@ pub struct Params<B: BlockT, H: ExHashT> {
/// Registry for recording prometheus metrics to.
pub metrics_registry: Option<Registry>,
/// Request response configuration for the block request protocol.
///
/// [`RequestResponseConfig`] [`name`] is used to tag outgoing block requests with the correct
/// protocol name. In addition all of [`RequestResponseConfig`] is used to handle incoming block
/// requests, if enabled.
///
/// Can be constructed either via [`block_request_handler::generate_protocol_config`] allowing
/// outgoing but not incoming requests, or constructed via
/// [`block_request_handler::BlockRequestHandler::new`] allowing both outgoing and incoming
/// requests.
pub block_request_protocol_config: RequestResponseConfig,
}
/// Role of the local node.
+18 -4
View File
@@ -16,7 +16,9 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{config, gossip::QueuedSender, Event, NetworkService, NetworkWorker};
use crate::block_request_handler::BlockRequestHandler;
use crate::gossip::QueuedSender;
use crate::{config, Event, NetworkService, NetworkWorker};
use futures::prelude::*;
use sp_runtime::traits::{Block as BlockT, Header as _};
@@ -33,7 +35,7 @@ type TestNetworkService = NetworkService<
///
/// > **Note**: We return the events stream in order to not possibly lose events between the
/// > construction of the service and the moment the events stream is grabbed.
fn build_test_full_node(config: config::NetworkConfiguration)
fn build_test_full_node(network_config: config::NetworkConfiguration)
-> (Arc<TestNetworkService>, impl Stream<Item = Event>)
{
let client = Arc::new(
@@ -90,19 +92,31 @@ fn build_test_full_node(config: config::NetworkConfiguration)
None,
));
let protocol_id = config::ProtocolId::from("/test-protocol-name");
let block_request_protocol_config = {
let (handler, protocol_config) = BlockRequestHandler::new(
protocol_id.clone(),
client.clone(),
);
async_std::task::spawn(handler.run().boxed());
protocol_config
};
let worker = NetworkWorker::new(config::Params {
role: config::Role::Full,
executor: None,
network_config: config,
network_config,
chain: client.clone(),
on_demand: None,
transaction_pool: Arc::new(crate::config::EmptyTransactionPool),
protocol_id: config::ProtocolId::from("/test-protocol-name"),
protocol_id,
import_queue,
block_announce_validator: Box::new(
sp_consensus::block_validation::DefaultBlockAnnounceValidator,
),
metrics_registry: None,
block_request_protocol_config,
})
.unwrap();
+1 -1
View File
@@ -246,7 +246,6 @@
//!
mod behaviour;
mod block_requests;
mod chain;
mod peer_info;
mod discovery;
@@ -259,6 +258,7 @@ mod service;
mod transport;
mod utils;
pub mod block_request_handler;
pub mod config;
pub mod error;
pub mod gossip;
@@ -29,7 +29,6 @@
use bytes::Bytes;
use codec::{self, Encode, Decode};
use crate::{
block_requests::build_protobuf_block_request,
chain::Client,
config::ProtocolId,
protocol::message::{BlockAttributes, Direction, FromBlock},
@@ -1066,13 +1065,16 @@ fn retries<B: Block>(request: &Request<B>) -> usize {
fn serialize_request<B: Block>(request: &Request<B>) -> Result<Vec<u8>, prost::EncodeError> {
let request = match request {
Request::Body { request, .. } => {
let rq = build_protobuf_block_request::<_, NumberFor<B>>(
BlockAttributes::BODY,
FromBlock::Hash(request.header.hash()),
None,
Direction::Ascending,
Some(1),
);
let rq = schema::v1::BlockRequest {
fields: BlockAttributes::BODY.to_be_u32(),
from_block: Some(schema::v1::block_request::FromBlock::Hash(
request.header.hash().encode(),
)),
to_block: Default::default(),
direction: schema::v1::Direction::Ascending as i32,
max_blocks: 1,
};
let mut buf = Vec::with_capacity(rq.encoded_len());
rq.encode(&mut buf)?;
return Ok(buf);
+232 -175
View File
@@ -21,41 +21,43 @@ use crate::{
chain::Client,
config::{ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport},
error,
request_responses::RequestFailure,
utils::{interval, LruHashSet},
};
use bytes::{Bytes, BytesMut};
use futures::{prelude::*, stream::FuturesUnordered};
use generic_proto::{GenericProto, GenericProtoOut};
use libp2p::{Multiaddr, PeerId};
use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}};
use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use sp_consensus::{
BlockOrigin,
block_validation::BlockAnnounceValidator,
import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
use codec::{Decode, DecodeAll, Encode};
use sp_runtime::{generic::BlockId, Justification};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub
};
use sp_arithmetic::traits::SaturatedConversion;
use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
use generic_proto::{GenericProto, GenericProtoOut};
use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}};
use libp2p::request_response::OutboundFailure;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler};
use libp2p::{Multiaddr, PeerId};
use log::{log, Level, trace, debug, warn, error};
use message::{BlockAnnounce, Message};
use message::generic::{Message as GenericMessage, Roles};
use prometheus_endpoint::{
Registry, Gauge, Counter, GaugeVec,
PrometheusError, Opts, register, U64
};
use prost::Message as _;
use sp_consensus::{
BlockOrigin,
block_validation::BlockAnnounceValidator,
import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
use sp_runtime::{generic::BlockId, Justification};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub
};
use sp_arithmetic::traits::SaturatedConversion;
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time};
use log::{log, Level, trace, debug, warn, error};
use wasm_timer::Instant;
mod generic_proto;
@@ -65,7 +67,6 @@ pub mod sync;
pub use generic_proto::{NotificationsSink, Ready, NotifsHandlerError};
const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Interval at which we perform time based maintenance
const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
/// Interval at which we propagate transactions;
@@ -95,6 +96,8 @@ mod rep {
use sc_peerset::ReputationChange as Rep;
/// Reputation change when a peer doesn't respond in time to our messages.
pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
/// Reputation change when a peer refuses a request.
pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
/// Reputation change when we are a light client and a peer is behind us.
pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer");
/// Reputation change when a peer sends us any transaction.
@@ -110,8 +113,6 @@ mod rep {
pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");
/// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
/// We received an unexpected response.
pub const UNEXPECTED_RESPONSE: Rep = Rep::new_fatal("Unexpected response packet");
/// We received an unexpected transaction packet.
pub const UNEXPECTED_TRANSACTIONS: Rep = Rep::new_fatal("Unexpected transactions packet");
/// Peer has different genesis.
@@ -125,7 +126,6 @@ mod rep {
}
struct Metrics {
obsolete_requests: Gauge<U64>,
peers: Gauge<U64>,
queued_blocks: Gauge<U64>,
fork_targets: Gauge<U64>,
@@ -136,10 +136,6 @@ struct Metrics {
impl Metrics {
fn register(r: &Registry) -> Result<Self, PrometheusError> {
Ok(Metrics {
obsolete_requests: {
let g = Gauge::new("sync_obsolete_requests", "Number of obsolete requests")?;
register(g, r)?
},
peers: {
let g = Gauge::new("sync_peers", "Number of peers we sync with")?;
register(g, r)?
@@ -241,13 +237,14 @@ struct PacketStats {
}
/// Peer information
#[derive(Debug, Clone)]
#[derive(Debug)]
struct Peer<B: BlockT, H: ExHashT> {
info: PeerInfo<B>,
/// Current block request, if any.
block_request: Option<(Instant, message::BlockRequest<B>)>,
/// Requests we are no longer interested in.
obsolete_requests: HashMap<message::RequestId, Instant>,
/// Current block request, if any. Started by emitting [`CustomMessageOutcome::BlockRequest`].
block_request: Option<(
message::BlockRequest<B>,
oneshot::Receiver<Result<Vec<u8>, RequestFailure>>,
)>,
/// Holds a set of transactions known to this peer.
known_transactions: LruHashSet<H>,
/// Holds a set of blocks known to this peer.
@@ -640,8 +637,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
CustomMessageOutcome::None
}
fn update_peer_request(&mut self, who: &PeerId, request: &mut message::BlockRequest<B>) {
update_peer_request::<B, H>(&mut self.context_data.peers, who, request)
fn prepare_block_request(
&mut self,
who: PeerId,
request: message::BlockRequest<B>,
) -> CustomMessageOutcome<B> {
prepare_block_request::<B, H>(&mut self.context_data.peers, who, request)
}
/// Called by peer when it is disconnecting
@@ -674,52 +675,76 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
/// Must contain the same `PeerId` and request that have been emitted.
pub fn on_block_response(
&mut self,
peer: PeerId,
response: message::BlockResponse<B>,
peer_id: PeerId,
request: message::BlockRequest<B>,
response: crate::schema::v1::BlockResponse,
) -> CustomMessageOutcome<B> {
let request = if let Some(ref mut p) = self.context_data.peers.get_mut(&peer) {
if p.obsolete_requests.remove(&response.id).is_some() {
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
let blocks = response.blocks.into_iter().map(|block_data| {
Ok(message::BlockData::<B> {
hash: Decode::decode(&mut block_data.hash.as_ref())?,
header: if !block_data.header.is_empty() {
Some(Decode::decode(&mut block_data.header.as_ref())?)
} else {
None
},
body: if request.fields.contains(message::BlockAttributes::BODY) {
Some(block_data.body.iter().map(|body| {
Decode::decode(&mut body.as_ref())
}).collect::<Result<Vec<_>, _>>()?)
} else {
None
},
receipt: if !block_data.message_queue.is_empty() {
Some(block_data.receipt)
} else {
None
},
message_queue: if !block_data.message_queue.is_empty() {
Some(block_data.message_queue)
} else {
None
},
justification: if !block_data.justification.is_empty() {
Some(block_data.justification)
} else if block_data.is_empty_justification {
Some(Vec::new())
} else {
None
},
})
}).collect::<Result<Vec<_>, codec::Error>>();
let blocks = match blocks {
Ok(blocks) => blocks,
Err(err) => {
debug!(target: "sync", "Failed to decode block response from {}: {}", peer_id, err);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
return CustomMessageOutcome::None;
}
// Clear the request. If the response is invalid peer will be disconnected anyway.
match p.block_request.take() {
Some((_, request)) if request.id == response.id => request,
Some(_) => {
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
return CustomMessageOutcome::None;
}
None => {
trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
return CustomMessageOutcome::None;
}
}
} else {
trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
self.behaviour.disconnect_peer(&peer);
self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
return CustomMessageOutcome::None;
};
let block_response = message::BlockResponse::<B> {
id: request.id,
blocks,
};
let blocks_range = || match (
response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
block_response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target: "sync", "BlockResponse {} from {} with {} blocks {}",
response.id,
peer,
response.blocks.len(),
block_response.id,
peer_id,
block_response.blocks.len(),
blocks_range(),
);
if request.fields == message::BlockAttributes::JUSTIFICATION {
match self.sync.on_block_justification(peer, response) {
match self.sync.on_block_justification(peer_id, block_response) {
Ok(sync::OnBlockJustification::Nothing) => CustomMessageOutcome::None,
Ok(sync::OnBlockJustification::Import { peer, hash, number, justification }) =>
CustomMessageOutcome::JustificationImport(peer, hash, number, justification),
@@ -730,15 +755,11 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}
} else {
match self.sync.on_block_data(&peer, Some(request), response) {
match self.sync.on_block_data(&peer_id, Some(request), block_response) {
Ok(sync::OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(sync::OnBlockData::Request(peer, mut req)) => {
self.update_peer_request(&peer, &mut req);
CustomMessageOutcome::BlockRequest {
target: peer,
request: req,
}
Ok(sync::OnBlockData::Request(peer, req)) => {
self.prepare_block_request(peer, req)
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
@@ -749,52 +770,13 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}
/// Must be called in response to a [`CustomMessageOutcome::BlockRequest`] if it has failed.
pub fn on_block_request_failed(
&mut self,
peer: &PeerId,
) {
self.peerset_handle.report_peer(peer.clone(), rep::TIMEOUT);
self.behaviour.disconnect_peer(peer);
}
/// Perform time based maintenance.
///
/// > **Note**: This method normally doesn't have to be called except for testing purposes.
pub fn tick(&mut self) {
self.maintain_peers();
self.report_metrics()
}
fn maintain_peers(&mut self) {
let tick = Instant::now();
let mut aborting = Vec::new();
{
for (who, peer) in self.context_data.peers.iter() {
if peer.block_request.as_ref().map_or(false, |(t, _)| (tick - *t).as_secs() > REQUEST_TIMEOUT_SEC) {
log!(
target: "sync",
if self.important_peers.contains(who) { Level::Warn } else { Level::Trace },
"Request timeout {}", who
);
aborting.push(who.clone());
} else if peer.obsolete_requests.values().any(|t| (tick - *t).as_secs() > REQUEST_TIMEOUT_SEC) {
log!(
target: "sync",
if self.important_peers.contains(who) { Level::Warn } else { Level::Trace },
"Obsolete timeout {}", who
);
aborting.push(who.clone());
}
}
}
for p in aborting {
self.behaviour.disconnect_peer(&p);
self.peerset_handle.report_peer(p, rep::TIMEOUT);
}
}
/// Called on the first connection between two peers, after their exchange of handshake.
fn on_peer_connected(
&mut self,
@@ -870,7 +852,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
obsolete_requests: HashMap::new(),
};
self.context_data.peers.insert(who.clone(), peer);
@@ -881,12 +862,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
if info.roles.is_full() {
match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) {
Ok(None) => (),
Ok(Some(mut req)) => {
self.update_peer_request(&who, &mut req);
self.pending_messages.push_back(CustomMessageOutcome::BlockRequest {
target: who.clone(),
request: req,
});
Ok(Some(req)) => {
let event = self.prepare_block_request(who.clone(), req);
self.pending_messages.push_back(event);
},
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
@@ -1216,12 +1194,8 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
Ok(sync::OnBlockData::Import(origin, blocks)) => {
CustomMessageOutcome::BlockImport(origin, blocks)
},
Ok(sync::OnBlockData::Request(peer, mut req)) => {
self.update_peer_request(&peer, &mut req);
CustomMessageOutcome::BlockRequest {
target: peer,
request: req,
}
Ok(sync::OnBlockData::Request(peer, req)) => {
self.prepare_block_request(peer, req)
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
@@ -1268,12 +1242,10 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
);
for result in results {
match result {
Ok((id, mut req)) => {
update_peer_request(&mut self.context_data.peers, &id, &mut req);
self.pending_messages.push_back(CustomMessageOutcome::BlockRequest {
target: id,
request: req,
});
Ok((id, req)) => {
self.pending_messages.push_back(
prepare_block_request(&mut self.context_data.peers, id, req)
);
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
@@ -1316,13 +1288,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
use std::convert::TryInto;
if let Some(metrics) = &self.metrics {
let mut obsolete_requests: u64 = 0;
for peer in self.context_data.peers.values() {
let n = peer.obsolete_requests.len().try_into().unwrap_or(std::u64::MAX);
obsolete_requests = obsolete_requests.saturating_add(n);
}
metrics.obsolete_requests.set(obsolete_requests);
let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX);
metrics.peers.set(n);
@@ -1343,6 +1308,39 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}
fn prepare_block_request<B: BlockT, H: ExHashT>(
peers: &mut HashMap<PeerId, Peer<B, H>>,
who: PeerId,
mut request: message::BlockRequest<B>,
) -> CustomMessageOutcome<B> {
let (tx, rx) = oneshot::channel();
if let Some(ref mut peer) = peers.get_mut(&who) {
request.id = peer.next_request_id;
peer.next_request_id += 1;
peer.block_request = Some((request.clone(), rx));
}
let request = crate::schema::v1::BlockRequest {
fields: request.fields.to_be_u32(),
from_block: match request.from {
message::FromBlock::Hash(h) =>
Some(crate::schema::v1::block_request::FromBlock::Hash(h.encode())),
message::FromBlock::Number(n) =>
Some(crate::schema::v1::block_request::FromBlock::Number(n.encode())),
},
to_block: request.to.map(|h| h.encode()).unwrap_or_default(),
direction: request.direction as i32,
max_blocks: request.max.unwrap_or(0),
};
CustomMessageOutcome::BlockRequest {
target: who,
request: request,
pending_response: tx,
}
}
/// Outcome of an incoming custom message.
#[derive(Debug)]
#[must_use]
@@ -1367,33 +1365,16 @@ pub enum CustomMessageOutcome<B: BlockT> {
/// Messages have been received on one or more notifications protocols.
NotificationsReceived { remote: PeerId, messages: Vec<(Cow<'static, str>, Bytes)> },
/// A new block request must be emitted.
/// You must later call either [`Protocol::on_block_response`] or
/// [`Protocol::on_block_request_failed`].
/// Each peer can only have one active request. If a request already exists for this peer, it
/// must be silently discarded.
/// It is the responsibility of the handler to ensure that a timeout exists.
BlockRequest { target: PeerId, request: message::BlockRequest<B> },
BlockRequest {
target: PeerId,
request: crate::schema::v1::BlockRequest,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// Peer has a reported a new head of chain.
PeerNewBest(PeerId, NumberFor<B>),
None,
}
fn update_peer_request<B: BlockT, H: ExHashT>(
peers: &mut HashMap<PeerId, Peer<B, H>>,
who: &PeerId,
request: &mut message::BlockRequest<B>,
) {
if let Some(ref mut peer) = peers.get_mut(who) {
request.id = peer.next_request_id;
peer.next_request_id += 1;
if let Some((timestamp, request)) = peer.block_request.take() {
trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who);
peer.obsolete_requests.insert(request.id, timestamp);
}
peer.block_request = Some((Instant::now(), request.clone()));
}
}
impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
type ProtocolsHandler = <GenericProto as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = CustomMessageOutcome<B>;
@@ -1445,6 +1426,80 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
}
// Check for finished outgoing requests.
let mut finished_block_requests = Vec::new();
for (id, peer) in self.context_data.peers.iter_mut() {
if let Peer { block_request: Some((_, pending_response)), .. } = peer {
match pending_response.poll_unpin(cx) {
Poll::Ready(Ok(Ok(resp))) => {
let (req, _) = peer.block_request.take().unwrap();
let protobuf_response = match crate::schema::v1::BlockResponse::decode(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
trace!(target: "sync", "Failed to decode block request to peer {:?}: {:?}.", id, e);
self.peerset_handle.report_peer(id.clone(), rep::BAD_MESSAGE);
self.behaviour.disconnect_peer(id);
continue;
}
};
finished_block_requests.push((id.clone(), req, protobuf_response));
},
Poll::Ready(Ok(Err(e))) => {
peer.block_request.take();
trace!(target: "sync", "Block request to peer {:?} failed: {:?}.", id, e);
match e {
RequestFailure::Network(OutboundFailure::Timeout) => {
self.peerset_handle.report_peer(id.clone(), rep::TIMEOUT);
self.behaviour.disconnect_peer(id);
}
RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
self.peerset_handle.report_peer(id.clone(), rep::BAD_PROTOCOL);
self.behaviour.disconnect_peer(id);
}
RequestFailure::Network(OutboundFailure::DialFailure) => {
self.behaviour.disconnect_peer(id);
}
RequestFailure::Refused => {
self.peerset_handle.report_peer(id.clone(), rep::REFUSED);
self.behaviour.disconnect_peer(id);
}
RequestFailure::Network(OutboundFailure::ConnectionClosed)
| RequestFailure::NotConnected => {
self.behaviour.disconnect_peer(id);
},
RequestFailure::UnknownProtocol => {
debug_assert!(false, "Block request protocol should always be known.");
}
RequestFailure::Obsolete => {
debug_assert!(
false,
"Can not receive `RequestFailure::Obsolete` after dropping the \
response receiver.",
);
}
}
},
Poll::Ready(Err(oneshot::Canceled)) => {
peer.block_request.take();
trace!(
target: "sync",
"Block request to peer {:?} failed due to oneshot being canceled.",
id,
);
self.behaviour.disconnect_peer(id);
},
Poll::Pending => {},
}
}
}
for (id, req, protobuf_response) in finished_block_requests {
let ev = self.on_block_response(id, req, protobuf_response);
self.pending_messages.push_back(ev);
}
while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) {
self.tick();
}
@@ -1453,20 +1508,12 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
self.propagate_transactions();
}
for (id, mut r) in self.sync.block_requests() {
update_peer_request(&mut self.context_data.peers, &id, &mut r);
let event = CustomMessageOutcome::BlockRequest {
target: id.clone(),
request: r,
};
for (id, request) in self.sync.block_requests() {
let event = prepare_block_request(&mut self.context_data.peers, id.clone(), request);
self.pending_messages.push_back(event);
}
for (id, mut r) in self.sync.justification_requests() {
update_peer_request(&mut self.context_data.peers, &id, &mut r);
let event = CustomMessageOutcome::BlockRequest {
target: id,
request: r,
};
for (id, request) in self.sync.justification_requests() {
let event = prepare_block_request(&mut self.context_data.peers, id, request);
self.pending_messages.push_back(event);
}
if let Poll::Ready(Some((tx_hash, result))) = self.pending_transactions.poll_next_unpin(cx) {
@@ -1570,7 +1617,9 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
}
}
Some(Fallback::Transactions) => {
if let Ok(m) = message::Transactions::decode(&mut message.as_ref()) {
if let Ok(m) = <message::Transactions<B::Extrinsic> as Decode>::decode(
&mut message.as_ref(),
) {
self.on_transactions(peer_id, m);
} else {
warn!(target: "sub-libp2p", "Failed to decode transactions list");
@@ -1594,17 +1643,25 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
}
}
None => {
debug!(target: "sub-libp2p", "Received notification from unknown protocol {:?}", protocol_name);
debug!(
target: "sub-libp2p",
"Received notification from unknown protocol {:?}",
protocol_name,
);
CustomMessageOutcome::None
}
}
};
if let CustomMessageOutcome::None = outcome {
Poll::Pending
} else {
Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome))
if !matches!(outcome, CustomMessageOutcome::<B>::None) {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome));
}
if let Some(message) = self.pending_messages.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
}
Poll::Pending
}
fn inject_addr_reach_failure(
+129 -47
View File
@@ -137,11 +137,17 @@ pub enum Event {
/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
/// failed.
///
/// This event is generated for statistics purposes.
RequestFinished {
/// Request that has succeeded.
request_id: RequestId,
/// Response sent by the remote or reason for failure.
result: Result<Vec<u8>, RequestFailure>,
/// Peer that we send a request to.
peer: PeerId,
/// Name of the protocol in question.
protocol: Cow<'static, str>,
/// Duration the request took.
duration: Duration,
/// Result of the request.
result: Result<(), RequestFailure>
},
}
@@ -155,8 +161,11 @@ pub struct RequestResponsesBehaviour {
(RequestResponse<GenericCodec>, Option<mpsc::Sender<IncomingRequest>>)
>,
/// Pending requests, passed down to a [`RequestResponse`] behaviour, awaiting a reply.
pending_requests: HashMap<RequestId, (Instant, oneshot::Sender<Result<Vec<u8>, RequestFailure>>)>,
/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
/// response to send back to the remote.
/// start time and the response to send back to the remote.
pending_responses: stream::FuturesUnordered<
Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>
>,
@@ -203,6 +212,7 @@ impl RequestResponsesBehaviour {
Ok(Self {
protocols,
pending_requests: Default::default(),
pending_responses: Default::default(),
pending_responses_arrival_time: Default::default(),
})
@@ -212,17 +222,36 @@ impl RequestResponsesBehaviour {
///
/// An error is returned if we are not connected to the target peer or if the protocol doesn't
/// match one that has been registered.
pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec<u8>)
-> Result<RequestId, SendRequestError>
{
pub fn send_request(
&mut self,
target: &PeerId,
protocol: &str,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
) {
if let Some((protocol, _)) = self.protocols.get_mut(protocol) {
if protocol.is_connected(target) {
Ok(protocol.send_request(target, request))
let request_id = protocol.send_request(target, request);
self.pending_requests.insert(request_id, (Instant::now(), pending_response));
} else {
Err(SendRequestError::NotConnected)
if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
log::debug!(
target: "sub-libp2p",
"Not connected to peer {:?}. At the same time local \
node is no longer interested in the result.",
target,
);
};
}
} else {
Err(SendRequestError::UnknownProtocol)
if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
log::debug!(
target: "sub-libp2p",
"Unknown protocol {:?}. At the same time local \
node is no longer interested in the result.",
protocol,
);
};
}
}
}
@@ -440,6 +469,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
payload: request,
pending_response: tx,
});
} else {
debug_assert!(false, "Received message on outbound-only protocol.");
}
let protocol = protocol.clone();
@@ -463,29 +494,80 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// Received a response from a remote to one of our requests.
RequestResponseEvent::Message {
peer,
message: RequestResponseMessage::Response {
request_id,
response,
},
..
} => {
let out = Event::RequestFinished {
request_id,
result: response.map_err(|()| RequestFailure::Refused),
let (started, delivered) = match self.pending_requests.remove(&request_id) {
Some((started, pending_response)) => {
let delivered = pending_response.send(
response.map_err(|()| RequestFailure::Refused),
).map_err(|_| RequestFailure::Obsolete);
(started, delivered)
}
None => {
log::warn!(
target: "sub-libp2p",
"Received `RequestResponseEvent::Message` with unexpected request id {:?}",
request_id,
);
debug_assert!(false);
continue;
}
};
let out = Event::RequestFinished {
peer,
protocol: protocol.clone(),
duration: started.elapsed(),
result: delivered,
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
// One of our requests has failed.
RequestResponseEvent::OutboundFailure {
peer,
request_id,
error,
..
} => {
let started = match self.pending_requests.remove(&request_id) {
Some((started, pending_response)) => {
if pending_response.send(
Err(RequestFailure::Network(error.clone())),
).is_err() {
log::debug!(
target: "sub-libp2p",
"Request with id {:?} failed. At the same time local \
node is no longer interested in the result.",
request_id,
);
}
started
}
None => {
log::warn!(
target: "sub-libp2p",
"Received `RequestResponseEvent::Message` with unexpected request id {:?}",
request_id,
);
debug_assert!(false);
continue;
}
};
let out = Event::RequestFinished {
request_id,
peer,
protocol: protocol.clone(),
duration: started.elapsed(),
result: Err(RequestFailure::Network(error)),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
@@ -529,21 +611,18 @@ pub enum RegisterError {
DuplicateProtocol(#[error(ignore)] Cow<'static, str>),
}
/// Error when sending a request.
/// Error in a request.
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum SendRequestError {
pub enum RequestFailure {
/// We are not currently connected to the requested peer.
NotConnected,
/// Given protocol hasn't been registered.
UnknownProtocol,
}
/// Error in a request.
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum RequestFailure {
/// Remote has closed the substream before answering, thereby signaling that it considers the
/// request as valid, but refused to answer it.
Refused,
/// The remote replied, but the local node is no longer interested in the response.
Obsolete,
/// Problem on the network.
#[display(fmt = "Problem on the network")]
Network(#[error(ignore)] OutboundFailure),
@@ -685,7 +764,7 @@ impl RequestResponseCodec for GenericCodec {
#[cfg(test)]
mod tests {
use futures::channel::mpsc;
use futures::channel::{mpsc, oneshot};
use futures::executor::LocalPool;
use futures::prelude::*;
use futures::task::Spawn;
@@ -771,31 +850,32 @@ mod tests {
// Remove and run the remaining swarm.
let (mut swarm, _) = swarms.remove(0);
pool.run_until(async move {
let mut sent_request_id = None;
let mut response_receiver = None;
loop {
match swarm.next_event().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
let id = swarm.send_request(
let (sender, receiver) = oneshot::channel();
swarm.send_request(
&peer_id,
protocol_name,
b"this is a request".to_vec()
).unwrap();
assert!(sent_request_id.is_none());
sent_request_id = Some(id);
b"this is a request".to_vec(),
sender,
);
assert!(response_receiver.is_none());
response_receiver = Some(receiver);
}
SwarmEvent::Behaviour(super::Event::RequestFinished {
request_id,
result,
result, ..
}) => {
assert_eq!(Some(request_id), sent_request_id);
let result = result.unwrap();
assert_eq!(result, b"this is a response");
result.unwrap();
break;
}
_ => {}
}
}
assert_eq!(response_receiver.unwrap().await.unwrap().unwrap(), b"this is a response");
});
}
@@ -875,33 +955,35 @@ mod tests {
// Remove and run the remaining swarm.
let (mut swarm, _) = swarms.remove(0);
pool.run_until(async move {
let mut sent_request_id = None;
let mut response_receiver = None;
loop {
match swarm.next_event().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
let id = swarm.send_request(
let (sender, receiver) = oneshot::channel();
swarm.send_request(
&peer_id,
protocol_name,
b"this is a request".to_vec()
).unwrap();
assert!(sent_request_id.is_none());
sent_request_id = Some(id);
b"this is a request".to_vec(),
sender,
);
assert!(response_receiver.is_none());
response_receiver = Some(receiver);
}
SwarmEvent::Behaviour(super::Event::RequestFinished {
request_id,
result,
result, ..
}) => {
assert_eq!(Some(request_id), sent_request_id);
match result {
Err(super::RequestFailure::Network(super::OutboundFailure::ConnectionClosed)) => {},
_ => panic!()
}
assert!(result.is_err());
break;
}
_ => {}
}
}
match response_receiver.unwrap().await.unwrap().unwrap_err() {
super::RequestFailure::Network(super::OutboundFailure::ConnectionClosed) => {},
_ => panic!()
}
});
}
}
+35 -84
View File
@@ -38,7 +38,7 @@ use crate::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
on_demand_layer::AlwaysBadChecker,
light_client_handler, block_requests,
light_client_handler,
protocol::{
self,
NotifsHandlerError,
@@ -94,7 +94,6 @@ use std::{
},
task::Poll,
};
use wasm_timer::Instant;
pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure};
@@ -287,10 +286,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
params.network_config.client_version,
params.network_config.node_name
);
let block_requests = {
let config = block_requests::Config::new(&params.protocol_id);
block_requests::BlockRequests::new(config, params.chain.clone())
};
let light_client_handler = {
let config = light_client_handler::Config::new(&params.protocol_id);
light_client_handler::LightClientHandler::new(
@@ -329,9 +324,9 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
params.role,
user_agent,
local_public,
block_requests,
light_client_handler,
discovery_config,
params.block_request_protocol_config,
params.network_config.request_response_protocols,
);
@@ -430,7 +425,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
peers_notifications_sinks,
metrics,
boot_node_ids,
pending_requests: HashMap::with_capacity(128),
})
}
@@ -1231,13 +1225,6 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
metrics: Option<Metrics>,
/// The `PeerId`'s of all boot nodes.
boot_node_ids: Arc<HashSet<PeerId>>,
/// Requests started using [`NetworkService::request`]. Includes the channel to send back the
/// response, when the request has started, and the name of the protocol for diagnostic
/// purposes.
pending_requests: HashMap<
behaviour::RequestId,
(oneshot::Sender<Result<Vec<u8>, RequestFailure>>, Instant, String)
>,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Shared with the [`NetworkService`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, Cow<'static, str>), NotificationsSink>>>,
@@ -1310,29 +1297,7 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
ServiceToWorkerMsg::EventStream(sender) =>
this.event_streams.push(sender),
ServiceToWorkerMsg::Request { target, protocol, request, pending_response } => {
// Calling `send_request` can fail immediately in some circumstances.
// This is handled by sending back an error on the channel.
match this.network_service.send_request(&target, &protocol, request) {
Ok(request_id) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_started_total
.with_label_values(&[&protocol])
.inc();
}
this.pending_requests.insert(
request_id,
(pending_response, Instant::now(), protocol.to_string())
);
},
Err(behaviour::SendRequestError::NotConnected) => {
let err = RequestFailure::Network(OutboundFailure::ConnectionClosed);
let _ = pending_response.send(Err(err));
},
Err(behaviour::SendRequestError::UnknownProtocol) => {
let err = RequestFailure::Network(OutboundFailure::UnsupportedProtocols);
let _ = pending_response.send(Err(err));
},
}
this.network_service.send_request(&target, &protocol, request, pending_response);
},
ServiceToWorkerMsg::DisconnectPeer(who) =>
this.network_service.user_protocol_mut().disconnect_peer(&who),
@@ -1396,51 +1361,37 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
}
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { request_id, result })) => {
if let Some((send_back, started, protocol)) = this.pending_requests.remove(&request_id) {
if let Some(metrics) = this.metrics.as_ref() {
match &result {
Ok(_) => {
metrics.requests_out_success_total
.with_label_values(&[&protocol])
.observe(started.elapsed().as_secs_f64());
}
Err(err) => {
let reason = match err {
RequestFailure::Refused => "refused",
RequestFailure::Network(OutboundFailure::DialFailure) =>
"dial-failure",
RequestFailure::Network(OutboundFailure::Timeout) =>
"timeout",
RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
"connection-closed",
RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
"unsupported",
};
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
protocol, duration, result, ..
})) => {
if let Some(metrics) = this.metrics.as_ref() {
match result {
Ok(_) => {
metrics.requests_out_success_total
.with_label_values(&[&protocol])
.observe(duration.as_secs_f64());
}
Err(err) => {
let reason = match err {
RequestFailure::NotConnected => "not-connected",
RequestFailure::UnknownProtocol => "unknown-protocol",
RequestFailure::Refused => "refused",
RequestFailure::Obsolete => "obsolete",
RequestFailure::Network(OutboundFailure::DialFailure) =>
"dial-failure",
RequestFailure::Network(OutboundFailure::Timeout) =>
"timeout",
RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
"connection-closed",
RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
"unsupported",
};
metrics.requests_out_failure_total
.with_label_values(&[&protocol, reason])
.inc();
}
metrics.requests_out_failure_total
.with_label_values(&[&protocol, reason])
.inc();
}
}
let _ = send_back.send(result);
} else {
error!("Request not in pending_requests");
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestStarted { protocol, .. })) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_started_total
.with_label_values(&[&protocol])
.inc();
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestFinished { protocol, request_duration, .. })) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.requests_out_success_total
.with_label_values(&[&protocol])
.observe(request_duration.as_secs_f64());
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted(protocol))) => {
@@ -1567,11 +1518,11 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
let reason = match cause {
Some(ConnectionError::IO(_)) => "transport-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::B(
EitherError::A(PingFailure::Timeout))))))))) => "ping-timeout",
EitherError::A(EitherError::B(EitherError::A(
PingFailure::Timeout)))))))) => "ping-timeout",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged)))))))) => "sync-notifications-clogged",
EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged))))))) => "sync-notifications-clogged",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout",
None => "actively-closed",
@@ -78,7 +78,6 @@ pub struct Metrics {
pub requests_in_success_total: HistogramVec,
pub requests_out_failure_total: CounterVec<U64>,
pub requests_out_success_total: HistogramVec,
pub requests_out_started_total: CounterVec<U64>,
}
impl Metrics {
@@ -230,7 +229,8 @@ impl Metrics {
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_requests_in_success_total",
"Total number of requests received and answered"
"For successful incoming requests, time between receiving the request and \
starting to send the response"
),
buckets: prometheus::exponential_buckets(0.001, 2.0, 16)
.expect("parameters are always valid values; qed"),
@@ -248,20 +248,13 @@ impl Metrics {
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_requests_out_success_total",
"For successful requests, time between a request's start and finish"
"For successful outgoing requests, time between a request's start and finish"
),
buckets: prometheus::exponential_buckets(0.001, 2.0, 16)
.expect("parameters are always valid values; qed"),
},
&["protocol"]
)?, registry)?,
requests_out_started_total: prometheus::register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_out_started_total",
"Total number of requests emitted"
),
&["protocol"]
)?, registry)?,
})
}
}
+14 -1
View File
@@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{config, Event, NetworkService, NetworkWorker};
use crate::block_request_handler::BlockRequestHandler;
use libp2p::PeerId;
use futures::prelude::*;
@@ -91,6 +92,17 @@ fn build_test_full_node(config: config::NetworkConfiguration)
None,
));
let protocol_id = config::ProtocolId::from("/test-protocol-name");
let block_request_protocol_config = {
let (handler, protocol_config) = BlockRequestHandler::new(
protocol_id.clone(),
client.clone(),
);
async_std::task::spawn(handler.run().boxed());
protocol_config
};
let worker = NetworkWorker::new(config::Params {
role: config::Role::Full,
executor: None,
@@ -98,12 +110,13 @@ fn build_test_full_node(config: config::NetworkConfiguration)
chain: client.clone(),
on_demand: None,
transaction_pool: Arc::new(crate::config::EmptyTransactionPool),
protocol_id: config::ProtocolId::from("/test-protocol-name"),
protocol_id,
import_queue,
block_announce_validator: Box::new(
sp_consensus::block_validation::DefaultBlockAnnounceValidator,
),
metrics_registry: None,
block_request_protocol_config,
})
.unwrap();