// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Substrate. // // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! `NetworkBehaviour` implementation which handles incoming block requests. //! //! Every request is coming in on a separate connection substream which gets //! closed after we have sent the response back. Incoming requests are encoded //! as protocol buffers (cf. `api.v1.proto`). #![allow(unused)] use bytes::Bytes; use codec::{Encode, Decode}; use crate::{ chain::Client, config::ProtocolId, protocol::{message::{self, BlockAttributes}}, schema, }; use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered}; use futures_timer::Delay; use libp2p::{ core::{ ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId, upgrade::{InboundUpgrade, OutboundUpgrade, ReadOneError, UpgradeInfo, Negotiated}, upgrade::{DeniedUpgrade, read_one, write_one} }, swarm::{ NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, OneShotHandler, OneShotHandlerConfig, PollParameters, SubstreamProtocol } }; use prost::Message; use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}}; use std::{ cmp::min, collections::{HashMap, VecDeque}, io, iter, marker::PhantomData, pin::Pin, sync::Arc, time::Duration, task::{Context, Poll} }; use void::{Void, unreachable}; use wasm_timer::Instant; // Type alias for convenience. pub type Error = Box; /// Event generated by the block requests behaviour. #[derive(Debug)] pub enum Event { /// A request came and we have successfully answered it. AnsweredRequest { /// Peer which has emitted the request. peer: PeerId, /// Time elapsed between when we received the request and when we sent back the response. total_handling_time: Duration, }, /// A response to a block request has arrived. Response { peer: PeerId, response: message::BlockResponse, /// Time elapsed between the start of the request and the response. request_duration: Duration, }, /// A request has been cancelled because the peer has disconnected. /// Disconnects can also happen as a result of violating the network protocol. /// /// > **Note**: This event is NOT emitted if a request is overridden by calling `send_request`. /// > For that, you must check the value returned by `send_request`. RequestCancelled { peer: PeerId, /// Time elapsed between the start of the request and the cancellation. request_duration: Duration, }, /// A request has timed out. RequestTimeout { peer: PeerId, /// Time elapsed between the start of the request and the timeout. request_duration: Duration, } } /// Configuration options for `BlockRequests`. #[derive(Debug, Clone)] pub struct Config { max_block_data_response: u32, max_block_body_bytes: usize, max_request_len: usize, max_response_len: usize, inactivity_timeout: Duration, request_timeout: Duration, protocol: String, } impl Config { /// Create a fresh configuration with the following options: /// /// - max. block data in response = 128 /// - max. request size = 1 MiB /// - max. response size = 16 MiB /// - inactivity timeout = 15s /// - request timeout = 40s pub fn new(id: &ProtocolId) -> Self { let mut c = Config { max_block_data_response: 128, max_block_body_bytes: 8 * 1024 * 1024, max_request_len: 1024 * 1024, max_response_len: 16 * 1024 * 1024, inactivity_timeout: Duration::from_secs(15), request_timeout: Duration::from_secs(40), protocol: String::new(), }; c.set_protocol(id); c } /// Limit the max. number of block data in a response. pub fn set_max_block_data_response(&mut self, v: u32) -> &mut Self { self.max_block_data_response = v; self } /// Limit the max. length of incoming block request bytes. pub fn set_max_request_len(&mut self, v: usize) -> &mut Self { self.max_request_len = v; self } /// Limit the max. size of responses to our block requests. pub fn set_max_response_len(&mut self, v: usize) -> &mut Self { self.max_response_len = v; self } /// Limit the max. duration the substream may remain inactive before closing it. pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self { self.inactivity_timeout = v; self } /// Set the maximum total bytes of block bodies that are send in the response. /// Note that at least one block is always sent regardless of the limit. /// This should be lower than the value specified in `set_max_response_len` /// accounting for headers, justifications and encoding overhead. pub fn set_max_block_body_bytes(&mut self, v: usize) -> &mut Self { self.max_block_body_bytes = v; self } /// Set protocol to use for upgrade negotiation. pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self { let mut s = String::new(); s.push_str("/"); s.push_str(id.as_ref()); s.push_str("/sync/2"); self.protocol = s; self } } /// The block request handling behaviour. pub struct BlockRequests { /// This behaviour's configuration. config: Config, /// Blockchain client. chain: Arc>, /// List of all active connections and the requests we've sent. peers: HashMap>>, /// Futures sending back the block request response. Returns the `PeerId` we sent back to, and /// the total time the handling of this request took. outgoing: FuturesUnordered>, /// Events to return as soon as possible from `poll`. pending_events: VecDeque, Event>>, } /// Local tracking of a libp2p connection. #[derive(Debug)] struct Connection { id: ConnectionId, ongoing_request: Option>, } #[derive(Debug)] struct OngoingRequest { /// `Instant` when the request has been emitted. Used for diagnostic purposes. emitted: Instant, request: message::BlockRequest, timeout: Delay, } /// Outcome of calling `send_request`. #[derive(Debug)] #[must_use] pub enum SendRequestOutcome { /// Request has been emitted. Ok, /// The request has been emitted and has replaced an existing request. Replaced { /// The previously-emitted request. previous: message::BlockRequest, /// Time that had elapsed since `previous` has been emitted. request_duration: Duration, }, /// Didn't start a request because we have no connection to this node. /// If `send_request` returns that, it is as if the function had never been called. NotConnected, /// Error while serializing the request. EncodeError(prost::EncodeError), } impl BlockRequests where B: Block, { pub fn new(cfg: Config, chain: Arc>) -> Self { BlockRequests { config: cfg, chain, peers: HashMap::new(), outgoing: FuturesUnordered::new(), pending_events: VecDeque::new(), } } /// Returns the libp2p protocol name used on the wire (e.g. `/foo/sync/2`). pub fn protocol_name(&self) -> &str { &self.config.protocol } /// Issue a new block request. /// /// Cancels any existing request targeting the same `PeerId`. /// /// If the response doesn't arrive in time, or if the remote answers improperly, the target /// will be disconnected. pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest) -> SendRequestOutcome { // Determine which connection to send the request to. let connection = if let Some(peer) = self.peers.get_mut(target) { // We don't want to have multiple requests for any given node, so in priority try to // find a connection with an existing request, to override it. if let Some(entry) = peer.iter_mut().find(|c| c.ongoing_request.is_some()) { entry } else if let Some(entry) = peer.get_mut(0) { entry } else { log::error!( target: "sync", "State inconsistency: empty list of peer connections" ); return SendRequestOutcome::NotConnected; } } else { return SendRequestOutcome::NotConnected; }; let protobuf_rq = build_protobuf_block_request( req.fields, req.from.clone(), req.to.clone(), req.direction, req.max, ); let mut buf = Vec::with_capacity(protobuf_rq.encoded_len()); if let Err(err) = protobuf_rq.encode(&mut buf) { log::warn!( target: "sync", "Failed to encode block request {:?}: {:?}", protobuf_rq, err ); return SendRequestOutcome::EncodeError(err); } let previous_request = connection.ongoing_request.take(); connection.ongoing_request = Some(OngoingRequest { emitted: Instant::now(), request: req.clone(), timeout: Delay::new(self.config.request_timeout), }); log::trace!(target: "sync", "Enqueueing block request to {:?}: {:?}", target, protobuf_rq); self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: target.clone(), handler: NotifyHandler::One(connection.id), event: OutboundProtocol { request: buf, original_request: req, max_response_size: self.config.max_response_len, protocol: self.config.protocol.as_bytes().to_vec().into(), }, }); if let Some(previous_request) = previous_request { log::debug!( target: "sync", "Replacing existing block request on connection {:?}", connection.id ); SendRequestOutcome::Replaced { previous: previous_request.request, request_duration: previous_request.emitted.elapsed(), } } else { SendRequestOutcome::Ok } } /// Callback, invoked when a new block request has been received from remote. fn on_block_request ( &mut self , peer: &PeerId , request: &schema::v1::BlockRequest ) -> Result { log::trace!( target: "sync", "Block request from peer {}: from block {:?} to block {:?}, max blocks {:?}", peer, request.from_block, request.to_block, request.max_blocks); let from_block_id = match request.from_block { Some(schema::v1::block_request::FromBlock::Hash(ref h)) => { let h = Decode::decode(&mut h.as_ref())?; BlockId::::Hash(h) } Some(schema::v1::block_request::FromBlock::Number(ref n)) => { let n = Decode::decode(&mut n.as_ref())?; BlockId::::Number(n) } None => { let msg = "missing `BlockRequest::from_block` field"; return Err(io::Error::new(io::ErrorKind::Other, msg).into()) } }; let max_blocks = if request.max_blocks == 0 { self.config.max_block_data_response } else { min(request.max_blocks, self.config.max_block_data_response) }; let direction = if request.direction == schema::v1::Direction::Ascending as i32 { schema::v1::Direction::Ascending } else if request.direction == schema::v1::Direction::Descending as i32 { schema::v1::Direction::Descending } else { let msg = format!("invalid `BlockRequest::direction` value: {}", request.direction); return Err(io::Error::new(io::ErrorKind::Other, msg).into()) }; let attributes = BlockAttributes::from_be_u32(request.fields)?; let get_header = attributes.contains(BlockAttributes::HEADER); let get_body = attributes.contains(BlockAttributes::BODY); let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION); let mut blocks = Vec::new(); let mut block_id = from_block_id; let mut total_size = 0; while let Some(header) = self.chain.header(block_id).unwrap_or(None) { if blocks.len() >= max_blocks as usize || (blocks.len() >= 1 && total_size > self.config.max_block_body_bytes) { break } let number = *header.number(); let hash = header.hash(); let parent_hash = *header.parent_hash(); let justification = if get_justification { self.chain.justification(&BlockId::Hash(hash))? } else { None }; let is_empty_justification = justification.as_ref().map(|j| j.is_empty()).unwrap_or(false); let body = if get_body { match self.chain.block_body(&BlockId::Hash(hash))? { Some(mut extrinsics) => extrinsics.iter_mut() .map(|extrinsic| extrinsic.encode()) .collect(), None => { log::trace!(target: "sync", "Missing data for block request."); break; } } } else { Vec::new() }; let block_data = schema::v1::BlockData { hash: hash.encode(), header: if get_header { header.encode() } else { Vec::new() }, body, receipt: Vec::new(), message_queue: Vec::new(), justification: justification.unwrap_or_default(), is_empty_justification, }; total_size += block_data.body.len(); blocks.push(block_data); match direction { schema::v1::Direction::Ascending => { block_id = BlockId::Number(number + One::one()) } schema::v1::Direction::Descending => { if number.is_zero() { break } block_id = BlockId::Hash(parent_hash) } } } Ok(schema::v1::BlockResponse { blocks }) } } impl NetworkBehaviour for BlockRequests where B: Block { type ProtocolsHandler = OneShotHandler, OutboundProtocol, NodeEvent>; type OutEvent = Event; fn new_handler(&mut self) -> Self::ProtocolsHandler { let p = InboundProtocol { max_request_len: self.config.max_request_len, protocol: self.config.protocol.as_bytes().to_owned().into(), marker: PhantomData, }; let mut cfg = OneShotHandlerConfig::default(); cfg.keep_alive_timeout = self.config.inactivity_timeout; cfg.outbound_substream_timeout = self.config.request_timeout; OneShotHandler::new(SubstreamProtocol::new(p, ()), cfg) } fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { Vec::new() } fn inject_connected(&mut self, _peer: &PeerId) { } fn inject_disconnected(&mut self, _peer: &PeerId) { } fn inject_connection_established(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) { self.peers.entry(peer_id.clone()) .or_default() .push(Connection { id: *id, ongoing_request: None, }); } fn inject_connection_closed(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) { let mut needs_remove = false; if let Some(entry) = self.peers.get_mut(peer_id) { if let Some(pos) = entry.iter().position(|i| i.id == *id) { let ongoing_request = entry.remove(pos).ongoing_request; if let Some(ongoing_request) = ongoing_request { log::debug!( target: "sync", "Connection {:?} with {} closed with ongoing sync request: {:?}", id, peer_id, ongoing_request ); let ev = Event::RequestCancelled { peer: peer_id.clone(), request_duration: ongoing_request.emitted.elapsed(), }; self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); } if entry.is_empty() { needs_remove = true; } } else { log::error!( target: "sync", "State inconsistency: connection id not found in list" ); } } else { log::error!( target: "sync", "State inconsistency: peer_id not found in list of connections" ); } if needs_remove { self.peers.remove(peer_id); } } fn inject_event( &mut self, peer: PeerId, connection_id: ConnectionId, node_event: NodeEvent ) { match node_event { NodeEvent::Request(request, mut stream, handling_start) => { match self.on_block_request(&peer, &request) { Ok(res) => { log::trace!( target: "sync", "Enqueueing block response for peer {} with {} blocks", peer, res.blocks.len() ); let mut data = Vec::with_capacity(res.encoded_len()); if let Err(e) = res.encode(&mut data) { log::debug!( target: "sync", "Error encoding block response for peer {}: {}", peer, e ) } else { self.outgoing.push(async move { if let Err(e) = write_one(&mut stream, data).await { log::debug!( target: "sync", "Error writing block response: {}", e ); } (peer, handling_start.elapsed()) }.boxed()); } } Err(e) => log::debug!( target: "sync", "Error handling block request from peer {}: {}", peer, e ) } } NodeEvent::Response(original_request, response) => { log::trace!( target: "sync", "Received block response from peer {} with {} blocks", peer, response.blocks.len() ); let request_duration = if let Some(connections) = self.peers.get_mut(&peer) { if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) { if let Some(ongoing_request) = &mut connection.ongoing_request { if ongoing_request.request == original_request { let request_duration = ongoing_request.emitted.elapsed(); connection.ongoing_request = None; request_duration } else { // We're no longer interested in that request. log::debug!( target: "sync", "Received response from {} to obsolete block request {:?}", peer, original_request ); return; } } else { // We remove from `self.peers` requests we're no longer interested in, // so this can legitimately happen. log::trace!( target: "sync", "Response discarded because it concerns an obsolete request" ); return; } } else { log::error!( target: "sync", "State inconsistency: response on non-existing connection {:?}", connection_id ); return; } } else { log::error!( target: "sync", "State inconsistency: response on non-connected peer {}", peer ); return; }; let blocks = response.blocks.into_iter().map(|block_data| { Ok(message::BlockData:: { hash: Decode::decode(&mut block_data.hash.as_ref())?, header: if !block_data.header.is_empty() { Some(Decode::decode(&mut block_data.header.as_ref())?) } else { None }, body: if original_request.fields.contains(message::BlockAttributes::BODY) { Some(block_data.body.iter().map(|body| { Decode::decode(&mut body.as_ref()) }).collect::, _>>()?) } else { None }, receipt: if !block_data.message_queue.is_empty() { Some(block_data.receipt) } else { None }, message_queue: if !block_data.message_queue.is_empty() { Some(block_data.message_queue) } else { None }, justification: if !block_data.justification.is_empty() { Some(block_data.justification) } else if block_data.is_empty_justification { Some(Vec::new()) } else { None }, }) }).collect::, codec::Error>>(); match blocks { Ok(blocks) => { let id = original_request.id; let ev = Event::Response { peer, response: message::BlockResponse:: { id, blocks }, request_duration, }; self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); } Err(err) => { log::debug!( target: "sync", "Failed to decode block response from peer {}: {}", peer, err ); } } } } } fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) -> Poll, Event>> { if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ev); } // Check the request timeouts. for (peer, connections) in &mut self.peers { for connection in connections { let ongoing_request = match &mut connection.ongoing_request { Some(rq) => rq, None => continue, }; if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) { let original_request = ongoing_request.request.clone(); let request_duration = ongoing_request.emitted.elapsed(); connection.ongoing_request = None; log::debug!( target: "sync", "Request timeout for {}: {:?}", peer, original_request ); let ev = Event::RequestTimeout { peer: peer.clone(), request_duration, }; return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); } } } if let Poll::Ready(Some((peer, total_handling_time))) = self.outgoing.poll_next_unpin(cx) { let ev = Event::AnsweredRequest { peer, total_handling_time, }; return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); } Poll::Pending } } /// Output type of inbound and outbound substream upgrades. #[derive(Debug)] pub enum NodeEvent { /// Incoming request from remote, substream to use for the response, and when we started /// handling this request. Request(schema::v1::BlockRequest, T, Instant), /// Incoming response from remote. Response(message::BlockRequest, schema::v1::BlockResponse), } /// Substream upgrade protocol. /// /// We attempt to parse an incoming protobuf encoded request (cf. `Request`) /// which will be handled by the `BlockRequests` behaviour, i.e. the request /// will become visible via `inject_node_event` which then dispatches to the /// relevant callback to process the message and prepare a response. #[derive(Debug, Clone)] pub struct InboundProtocol { /// The max. request length in bytes. max_request_len: usize, /// The protocol to use during upgrade negotiation. protocol: Bytes, /// Type of the block. marker: PhantomData, } impl UpgradeInfo for InboundProtocol { type Info = Bytes; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { iter::once(self.protocol.clone()) } } impl InboundUpgrade for InboundProtocol where B: Block, T: AsyncRead + AsyncWrite + Unpin + Send + 'static { type Output = NodeEvent; type Error = ReadOneError; type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future { // This `Instant` will be passed around until the processing of this request is done. let handling_start = Instant::now(); let future = async move { let len = self.max_request_len; let vec = read_one(&mut s, len).await?; match schema::v1::BlockRequest::decode(&vec[..]) { Ok(r) => Ok(NodeEvent::Request(r, s, handling_start)), Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))) } }; future.boxed() } } /// Substream upgrade protocol. /// /// Sends a request to remote and awaits the response. #[derive(Debug, Clone)] pub struct OutboundProtocol { /// The serialized protobuf request. request: Vec, /// The original request. Passed back through the API when the response comes back. original_request: message::BlockRequest, /// The max. response length in bytes. max_response_size: usize, /// The protocol to use for upgrade negotiation. protocol: Bytes, } impl UpgradeInfo for OutboundProtocol { type Info = Bytes; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { iter::once(self.protocol.clone()) } } impl OutboundUpgrade for OutboundProtocol where B: Block, T: AsyncRead + AsyncWrite + Unpin + Send + 'static { type Output = NodeEvent; type Error = ReadOneError; type Future = BoxFuture<'static, Result>; fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future { async move { write_one(&mut s, &self.request).await?; let vec = read_one(&mut s, self.max_response_size).await?; schema::v1::BlockResponse::decode(&vec[..]) .map(|r| NodeEvent::Response(self.original_request, r)) .map_err(|e| { ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)) }) }.boxed() } } /// Build protobuf block request message. pub(crate) fn build_protobuf_block_request( attributes: BlockAttributes, from_block: message::FromBlock, to_block: Option, direction: message::Direction, max_blocks: Option, ) -> schema::v1::BlockRequest { schema::v1::BlockRequest { fields: attributes.to_be_u32(), from_block: match from_block { message::FromBlock::Hash(h) => Some(schema::v1::block_request::FromBlock::Hash(h.encode())), message::FromBlock::Number(n) => Some(schema::v1::block_request::FromBlock::Number(n.encode())), }, to_block: to_block.map(|h| h.encode()).unwrap_or_default(), direction: match direction { message::Direction::Ascending => schema::v1::Direction::Ascending as i32, message::Direction::Descending => schema::v1::Direction::Descending as i32, }, max_blocks: max_blocks.unwrap_or(0), } }