mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 19:17:58 +00:00
Move around stuff in sc_network (#5847)
This commit is contained in:
@@ -1,819 +0,0 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
//
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! `NetworkBehaviour` implementation which handles incoming block requests.
|
||||
//!
|
||||
//! Every request is coming in on a separate connection substream which gets
|
||||
//! closed after we have sent the response back. Incoming requests are encoded
|
||||
//! as protocol buffers (cf. `api.v1.proto`).
|
||||
|
||||
#![allow(unused)]
|
||||
|
||||
use bytes::Bytes;
|
||||
use codec::{Encode, Decode};
|
||||
use crate::{
|
||||
chain::Client,
|
||||
config::ProtocolId,
|
||||
protocol::{api, message::{self, BlockAttributes}}
|
||||
};
|
||||
use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
|
||||
use futures_timer::Delay;
|
||||
use libp2p::{
|
||||
core::{
|
||||
ConnectedPoint,
|
||||
Multiaddr,
|
||||
PeerId,
|
||||
connection::ConnectionId,
|
||||
upgrade::{InboundUpgrade, OutboundUpgrade, ReadOneError, UpgradeInfo, Negotiated},
|
||||
upgrade::{DeniedUpgrade, read_one, write_one}
|
||||
},
|
||||
swarm::{
|
||||
NegotiatedSubstream,
|
||||
NetworkBehaviour,
|
||||
NetworkBehaviourAction,
|
||||
NotifyHandler,
|
||||
OneShotHandler,
|
||||
OneShotHandlerConfig,
|
||||
PollParameters,
|
||||
SubstreamProtocol
|
||||
}
|
||||
};
|
||||
use prost::Message;
|
||||
use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}};
|
||||
use std::{
|
||||
cmp::min,
|
||||
collections::{HashMap, VecDeque},
|
||||
io,
|
||||
iter,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
task::{Context, Poll}
|
||||
};
|
||||
use void::{Void, unreachable};
|
||||
use wasm_timer::Instant;
|
||||
|
||||
// Type alias for convenience.
|
||||
pub type Error = Box<dyn std::error::Error + 'static>;
|
||||
|
||||
/// Event generated by the block requests behaviour.
|
||||
#[derive(Debug)]
|
||||
pub enum Event<B: Block> {
|
||||
/// A request came and we answered it.
|
||||
AnsweredRequest {
|
||||
/// Peer which has emitted the request.
|
||||
peer: PeerId,
|
||||
/// Time it took to compute the response.
|
||||
response_build_time: Duration,
|
||||
},
|
||||
|
||||
/// A response to a block request has arrived.
|
||||
Response {
|
||||
peer: PeerId,
|
||||
/// The original request passed to `send_request`.
|
||||
original_request: message::BlockRequest<B>,
|
||||
response: message::BlockResponse<B>,
|
||||
/// Time elapsed between the start of the request and the response.
|
||||
request_duration: Duration,
|
||||
},
|
||||
|
||||
/// A request has been cancelled because the peer has disconnected.
|
||||
/// Disconnects can also happen as a result of violating the network protocol.
|
||||
///
|
||||
/// > **Note**: This event is NOT emitted if a request is overridden by calling `send_request`.
|
||||
/// > For that, you must check the value returned by `send_request`.
|
||||
RequestCancelled {
|
||||
peer: PeerId,
|
||||
/// The original request passed to `send_request`.
|
||||
original_request: message::BlockRequest<B>,
|
||||
/// Time elapsed between the start of the request and the cancellation.
|
||||
request_duration: Duration,
|
||||
},
|
||||
|
||||
/// A request has timed out.
|
||||
RequestTimeout {
|
||||
peer: PeerId,
|
||||
/// The original request passed to `send_request`.
|
||||
original_request: message::BlockRequest<B>,
|
||||
/// Time elapsed between the start of the request and the timeout.
|
||||
request_duration: Duration,
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration options for `BlockRequests`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
max_block_data_response: u32,
|
||||
max_request_len: usize,
|
||||
max_response_len: usize,
|
||||
inactivity_timeout: Duration,
|
||||
request_timeout: Duration,
|
||||
protocol: Bytes,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
/// Create a fresh configuration with the following options:
|
||||
///
|
||||
/// - max. block data in response = 128
|
||||
/// - max. request size = 1 MiB
|
||||
/// - max. response size = 16 MiB
|
||||
/// - inactivity timeout = 15s
|
||||
/// - request timeout = 40s
|
||||
pub fn new(id: &ProtocolId) -> Self {
|
||||
let mut c = Config {
|
||||
max_block_data_response: 128,
|
||||
max_request_len: 1024 * 1024,
|
||||
max_response_len: 16 * 1024 * 1024,
|
||||
inactivity_timeout: Duration::from_secs(15),
|
||||
request_timeout: Duration::from_secs(40),
|
||||
protocol: Bytes::new(),
|
||||
};
|
||||
c.set_protocol(id);
|
||||
c
|
||||
}
|
||||
|
||||
/// Limit the max. number of block data in a response.
|
||||
pub fn set_max_block_data_response(&mut self, v: u32) -> &mut Self {
|
||||
self.max_block_data_response = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the max. length of incoming block request bytes.
|
||||
pub fn set_max_request_len(&mut self, v: usize) -> &mut Self {
|
||||
self.max_request_len = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the max. size of responses to our block requests.
|
||||
pub fn set_max_response_len(&mut self, v: usize) -> &mut Self {
|
||||
self.max_response_len = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the max. duration the substream may remain inactive before closing it.
|
||||
pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self {
|
||||
self.inactivity_timeout = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set protocol to use for upgrade negotiation.
|
||||
pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self {
|
||||
let mut v = Vec::new();
|
||||
v.extend_from_slice(b"/");
|
||||
v.extend_from_slice(id.as_bytes());
|
||||
v.extend_from_slice(b"/sync/2");
|
||||
self.protocol = v.into();
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// The block request handling behaviour.
|
||||
pub struct BlockRequests<B: Block> {
|
||||
/// This behaviour's configuration.
|
||||
config: Config,
|
||||
/// Blockchain client.
|
||||
chain: Arc<dyn Client<B>>,
|
||||
/// List of all active connections and the requests we've sent.
|
||||
peers: HashMap<PeerId, Vec<Connection<B>>>,
|
||||
/// Futures sending back the block request response.
|
||||
outgoing: FuturesUnordered<BoxFuture<'static, ()>>,
|
||||
/// Events to return as soon as possible from `poll`.
|
||||
pending_events: VecDeque<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>,
|
||||
}
|
||||
|
||||
/// Local tracking of a libp2p connection.
|
||||
#[derive(Debug)]
|
||||
struct Connection<B: Block> {
|
||||
id: ConnectionId,
|
||||
ongoing_request: Option<OngoingRequest<B>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct OngoingRequest<B: Block> {
|
||||
/// `Instant` when the request has been emitted. Used for diagnostic purposes.
|
||||
emitted: Instant,
|
||||
request: message::BlockRequest<B>,
|
||||
timeout: Delay,
|
||||
}
|
||||
|
||||
/// Outcome of calling `send_request`.
|
||||
#[derive(Debug)]
|
||||
#[must_use]
|
||||
pub enum SendRequestOutcome<B: Block> {
|
||||
/// Request has been emitted.
|
||||
Ok,
|
||||
/// The request has been emitted and has replaced an existing request.
|
||||
Replaced {
|
||||
/// The previously-emitted request.
|
||||
previous: message::BlockRequest<B>,
|
||||
/// Time that had elapsed since `previous` has been emitted.
|
||||
request_duration: Duration,
|
||||
},
|
||||
/// Didn't start a request because we have no connection to this node.
|
||||
/// If `send_request` returns that, it is as if the function had never been called.
|
||||
NotConnected,
|
||||
/// Error while serializing the request.
|
||||
EncodeError(prost::EncodeError),
|
||||
}
|
||||
|
||||
impl<B> BlockRequests<B>
|
||||
where
|
||||
B: Block,
|
||||
{
|
||||
pub fn new(cfg: Config, chain: Arc<dyn Client<B>>) -> Self {
|
||||
BlockRequests {
|
||||
config: cfg,
|
||||
chain,
|
||||
peers: HashMap::new(),
|
||||
outgoing: FuturesUnordered::new(),
|
||||
pending_events: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the libp2p protocol name used on the wire (e.g. `/foo/sync/2`).
|
||||
pub fn protocol_name(&self) -> &[u8] {
|
||||
&self.config.protocol
|
||||
}
|
||||
|
||||
/// Issue a new block request.
|
||||
///
|
||||
/// Cancels any existing request targeting the same `PeerId`.
|
||||
///
|
||||
/// If the response doesn't arrive in time, or if the remote answers improperly, the target
|
||||
/// will be disconnected.
|
||||
pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) -> SendRequestOutcome<B> {
|
||||
// Determine which connection to send the request to.
|
||||
let connection = if let Some(peer) = self.peers.get_mut(target) {
|
||||
// We don't want to have multiple requests for any given node, so in priority try to
|
||||
// find a connection with an existing request, to override it.
|
||||
if let Some(entry) = peer.iter_mut().find(|c| c.ongoing_request.is_some()) {
|
||||
entry
|
||||
} else if let Some(entry) = peer.get_mut(0) {
|
||||
entry
|
||||
} else {
|
||||
log::error!(
|
||||
target: "sync",
|
||||
"State inconsistency: empty list of peer connections"
|
||||
);
|
||||
return SendRequestOutcome::NotConnected;
|
||||
}
|
||||
} else {
|
||||
return SendRequestOutcome::NotConnected;
|
||||
};
|
||||
|
||||
let protobuf_rq = api::v1::BlockRequest {
|
||||
fields: u32::from_be_bytes([req.fields.bits(), 0, 0, 0]),
|
||||
from_block: match req.from {
|
||||
message::FromBlock::Hash(h) =>
|
||||
Some(api::v1::block_request::FromBlock::Hash(h.encode())),
|
||||
message::FromBlock::Number(n) =>
|
||||
Some(api::v1::block_request::FromBlock::Number(n.encode())),
|
||||
},
|
||||
to_block: req.to.map(|h| h.encode()).unwrap_or_default(),
|
||||
direction: match req.direction {
|
||||
message::Direction::Ascending => api::v1::Direction::Ascending as i32,
|
||||
message::Direction::Descending => api::v1::Direction::Descending as i32,
|
||||
},
|
||||
max_blocks: req.max.unwrap_or(0),
|
||||
};
|
||||
|
||||
let mut buf = Vec::with_capacity(protobuf_rq.encoded_len());
|
||||
if let Err(err) = protobuf_rq.encode(&mut buf) {
|
||||
log::warn!(
|
||||
target: "sync",
|
||||
"Failed to encode block request {:?}: {:?}",
|
||||
protobuf_rq,
|
||||
err
|
||||
);
|
||||
return SendRequestOutcome::EncodeError(err);
|
||||
}
|
||||
|
||||
let previous_request = connection.ongoing_request.take();
|
||||
connection.ongoing_request = Some(OngoingRequest {
|
||||
emitted: Instant::now(),
|
||||
request: req.clone(),
|
||||
timeout: Delay::new(self.config.request_timeout),
|
||||
});
|
||||
|
||||
log::trace!(target: "sync", "Enqueueing block request to {:?}: {:?}", target, protobuf_rq);
|
||||
self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id: target.clone(),
|
||||
handler: NotifyHandler::One(connection.id),
|
||||
event: OutboundProtocol {
|
||||
request: buf,
|
||||
original_request: req,
|
||||
max_response_size: self.config.max_response_len,
|
||||
protocol: self.config.protocol.clone(),
|
||||
},
|
||||
});
|
||||
|
||||
if let Some(previous_request) = previous_request {
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
"Replacing existing block request on connection {:?}",
|
||||
connection.id
|
||||
);
|
||||
SendRequestOutcome::Replaced {
|
||||
previous: previous_request.request,
|
||||
request_duration: previous_request.emitted.elapsed(),
|
||||
}
|
||||
} else {
|
||||
SendRequestOutcome::Ok
|
||||
}
|
||||
}
|
||||
|
||||
/// Callback, invoked when a new block request has been received from remote.
|
||||
fn on_block_request
|
||||
( &mut self
|
||||
, peer: &PeerId
|
||||
, request: &api::v1::BlockRequest
|
||||
) -> Result<api::v1::BlockResponse, Error>
|
||||
{
|
||||
log::trace!(
|
||||
target: "sync",
|
||||
"Block request from peer {}: from block {:?} to block {:?}, max blocks {:?}",
|
||||
peer,
|
||||
request.from_block,
|
||||
request.to_block,
|
||||
request.max_blocks);
|
||||
|
||||
let from_block_id =
|
||||
match request.from_block {
|
||||
Some(api::v1::block_request::FromBlock::Hash(ref h)) => {
|
||||
let h = Decode::decode(&mut h.as_ref())?;
|
||||
BlockId::<B>::Hash(h)
|
||||
}
|
||||
Some(api::v1::block_request::FromBlock::Number(ref n)) => {
|
||||
let n = Decode::decode(&mut n.as_ref())?;
|
||||
BlockId::<B>::Number(n)
|
||||
}
|
||||
None => {
|
||||
let msg = "missing `BlockRequest::from_block` field";
|
||||
return Err(io::Error::new(io::ErrorKind::Other, msg).into())
|
||||
}
|
||||
};
|
||||
|
||||
let max_blocks =
|
||||
if request.max_blocks == 0 {
|
||||
self.config.max_block_data_response
|
||||
} else {
|
||||
min(request.max_blocks, self.config.max_block_data_response)
|
||||
};
|
||||
|
||||
let direction =
|
||||
if request.direction == api::v1::Direction::Ascending as i32 {
|
||||
api::v1::Direction::Ascending
|
||||
} else if request.direction == api::v1::Direction::Descending as i32 {
|
||||
api::v1::Direction::Descending
|
||||
} else {
|
||||
let msg = format!("invalid `BlockRequest::direction` value: {}", request.direction);
|
||||
return Err(io::Error::new(io::ErrorKind::Other, msg).into())
|
||||
};
|
||||
|
||||
let attributes = BlockAttributes::decode(&mut request.fields.to_be_bytes().as_ref())?;
|
||||
let get_header = attributes.contains(BlockAttributes::HEADER);
|
||||
let get_body = attributes.contains(BlockAttributes::BODY);
|
||||
let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION);
|
||||
|
||||
let mut blocks = Vec::new();
|
||||
let mut block_id = from_block_id;
|
||||
while let Some(header) = self.chain.header(block_id).unwrap_or(None) {
|
||||
if blocks.len() >= max_blocks as usize {
|
||||
break
|
||||
}
|
||||
|
||||
let number = header.number().clone();
|
||||
let hash = header.hash();
|
||||
let parent_hash = header.parent_hash().clone();
|
||||
let justification = if get_justification {
|
||||
self.chain.justification(&BlockId::Hash(hash))?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let is_empty_justification = justification.as_ref().map(|j| j.is_empty()).unwrap_or(false);
|
||||
|
||||
let block_data = api::v1::BlockData {
|
||||
hash: hash.encode(),
|
||||
header: if get_header {
|
||||
header.encode()
|
||||
} else {
|
||||
Vec::new()
|
||||
},
|
||||
body: if get_body {
|
||||
self.chain.block_body(&BlockId::Hash(hash))?
|
||||
.unwrap_or(Vec::new())
|
||||
.iter_mut()
|
||||
.map(|extrinsic| extrinsic.encode())
|
||||
.collect()
|
||||
} else {
|
||||
Vec::new()
|
||||
},
|
||||
receipt: Vec::new(),
|
||||
message_queue: Vec::new(),
|
||||
justification: justification.unwrap_or(Vec::new()),
|
||||
is_empty_justification,
|
||||
};
|
||||
|
||||
blocks.push(block_data);
|
||||
|
||||
match direction {
|
||||
api::v1::Direction::Ascending => {
|
||||
block_id = BlockId::Number(number + One::one())
|
||||
}
|
||||
api::v1::Direction::Descending => {
|
||||
if number.is_zero() {
|
||||
break
|
||||
}
|
||||
block_id = BlockId::Hash(parent_hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(api::v1::BlockResponse { blocks })
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> NetworkBehaviour for BlockRequests<B>
|
||||
where
|
||||
B: Block
|
||||
{
|
||||
type ProtocolsHandler = OneShotHandler<InboundProtocol<B>, OutboundProtocol<B>, NodeEvent<B, NegotiatedSubstream>>;
|
||||
type OutEvent = Event<B>;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
let p = InboundProtocol {
|
||||
max_request_len: self.config.max_request_len,
|
||||
protocol: self.config.protocol.clone(),
|
||||
marker: PhantomData,
|
||||
};
|
||||
let mut cfg = OneShotHandlerConfig::default();
|
||||
cfg.inactive_timeout = self.config.inactivity_timeout;
|
||||
cfg.substream_timeout = self.config.request_timeout;
|
||||
OneShotHandler::new(SubstreamProtocol::new(p), cfg)
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, _peer: &PeerId) {
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, _peer: &PeerId) {
|
||||
}
|
||||
|
||||
fn inject_connection_established(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) {
|
||||
self.peers.entry(peer_id.clone())
|
||||
.or_default()
|
||||
.push(Connection {
|
||||
id: *id,
|
||||
ongoing_request: None,
|
||||
});
|
||||
}
|
||||
|
||||
fn inject_connection_closed(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) {
|
||||
let mut needs_remove = false;
|
||||
if let Some(entry) = self.peers.get_mut(peer_id) {
|
||||
if let Some(pos) = entry.iter().position(|i| i.id == *id) {
|
||||
let ongoing_request = entry.remove(pos).ongoing_request;
|
||||
if let Some(ongoing_request) = ongoing_request {
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
"Connection {:?} with {} closed with ongoing sync request: {:?}",
|
||||
id,
|
||||
peer_id,
|
||||
ongoing_request
|
||||
);
|
||||
let ev = Event::RequestCancelled {
|
||||
peer: peer_id.clone(),
|
||||
original_request: ongoing_request.request.clone(),
|
||||
request_duration: ongoing_request.emitted.elapsed(),
|
||||
};
|
||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
if entry.is_empty() {
|
||||
needs_remove = true;
|
||||
}
|
||||
} else {
|
||||
log::error!(
|
||||
target: "sync",
|
||||
"State inconsistency: connection id not found in list"
|
||||
);
|
||||
}
|
||||
} else {
|
||||
log::error!(
|
||||
target: "sync",
|
||||
"State inconsistency: peer_id not found in list of connections"
|
||||
);
|
||||
}
|
||||
if needs_remove {
|
||||
self.peers.remove(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_event(
|
||||
&mut self,
|
||||
peer: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
node_event: NodeEvent<B, NegotiatedSubstream>
|
||||
) {
|
||||
match node_event {
|
||||
NodeEvent::Request(request, mut stream) => {
|
||||
let before_answer_build = Instant::now();
|
||||
|
||||
match self.on_block_request(&peer, &request) {
|
||||
Ok(res) => {
|
||||
log::trace!(
|
||||
target: "sync",
|
||||
"Enqueueing block response for peer {} with {} blocks",
|
||||
peer, res.blocks.len()
|
||||
);
|
||||
let mut data = Vec::with_capacity(res.encoded_len());
|
||||
if let Err(e) = res.encode(&mut data) {
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
"Error encoding block response for peer {}: {}",
|
||||
peer, e
|
||||
)
|
||||
} else {
|
||||
let future = async move {
|
||||
if let Err(e) = write_one(&mut stream, data).await {
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
"Error writing block response: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
};
|
||||
self.outgoing.push(future.boxed())
|
||||
}
|
||||
}
|
||||
Err(e) => log::debug!(
|
||||
target: "sync",
|
||||
"Error handling block request from peer {}: {}", peer, e
|
||||
)
|
||||
}
|
||||
|
||||
let ev = Event::AnsweredRequest {
|
||||
peer: peer.clone(),
|
||||
response_build_time: before_answer_build.elapsed(),
|
||||
};
|
||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
NodeEvent::Response(original_request, response) => {
|
||||
log::trace!(
|
||||
target: "sync",
|
||||
"Received block response from peer {} with {} blocks",
|
||||
peer, response.blocks.len()
|
||||
);
|
||||
let request_duration = if let Some(connections) = self.peers.get_mut(&peer) {
|
||||
if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) {
|
||||
if let Some(ongoing_request) = &mut connection.ongoing_request {
|
||||
if ongoing_request.request == original_request {
|
||||
let request_duration = ongoing_request.emitted.elapsed();
|
||||
connection.ongoing_request = None;
|
||||
request_duration
|
||||
} else {
|
||||
// We're no longer interested in that request.
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
"Received response from {} to obsolete block request {:?}",
|
||||
peer,
|
||||
original_request
|
||||
);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// We remove from `self.peers` requests we're no longer interested in,
|
||||
// so this can legitimately happen.
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
log::error!(
|
||||
target: "sync",
|
||||
"State inconsistency: response on non-existing connection {:?}",
|
||||
connection_id
|
||||
);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
log::error!(
|
||||
target: "sync",
|
||||
"State inconsistency: response on non-connected peer {}",
|
||||
peer
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
let blocks = response.blocks.into_iter().map(|block_data| {
|
||||
Ok(message::BlockData::<B> {
|
||||
hash: Decode::decode(&mut block_data.hash.as_ref())?,
|
||||
header: if !block_data.header.is_empty() {
|
||||
Some(Decode::decode(&mut block_data.header.as_ref())?)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
body: if original_request.fields.contains(message::BlockAttributes::BODY) {
|
||||
Some(block_data.body.iter().map(|body| {
|
||||
Decode::decode(&mut body.as_ref())
|
||||
}).collect::<Result<Vec<_>, _>>()?)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
receipt: if !block_data.message_queue.is_empty() {
|
||||
Some(block_data.receipt)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
message_queue: if !block_data.message_queue.is_empty() {
|
||||
Some(block_data.message_queue)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
justification: if !block_data.justification.is_empty() {
|
||||
Some(block_data.justification)
|
||||
} else if block_data.is_empty_justification {
|
||||
Some(Vec::new())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
})
|
||||
}).collect::<Result<Vec<_>, codec::Error>>();
|
||||
|
||||
match blocks {
|
||||
Ok(blocks) => {
|
||||
let id = original_request.id;
|
||||
let ev = Event::Response {
|
||||
peer,
|
||||
original_request,
|
||||
response: message::BlockResponse::<B> { id, blocks },
|
||||
request_duration,
|
||||
};
|
||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
Err(err) => {
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
"Failed to decode block response from peer {}: {}", peer, err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters)
|
||||
-> Poll<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>
|
||||
{
|
||||
if let Some(ev) = self.pending_events.pop_front() {
|
||||
return Poll::Ready(ev);
|
||||
}
|
||||
|
||||
// Check the request timeouts.
|
||||
for (peer, connections) in &mut self.peers {
|
||||
for connection in connections {
|
||||
let ongoing_request = match &mut connection.ongoing_request {
|
||||
Some(rq) => rq,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) {
|
||||
let original_request = ongoing_request.request.clone();
|
||||
let request_duration = ongoing_request.emitted.elapsed();
|
||||
connection.ongoing_request = None;
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
"Request timeout for {}: {:?}",
|
||||
peer, original_request
|
||||
);
|
||||
let ev = Event::RequestTimeout {
|
||||
peer: peer.clone(),
|
||||
original_request,
|
||||
request_duration,
|
||||
};
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
/// Output type of inbound and outbound substream upgrades.
|
||||
#[derive(Debug)]
|
||||
pub enum NodeEvent<B: Block, T> {
|
||||
/// Incoming request from remote and substream to use for the response.
|
||||
Request(api::v1::BlockRequest, T),
|
||||
/// Incoming response from remote.
|
||||
Response(message::BlockRequest<B>, api::v1::BlockResponse),
|
||||
}
|
||||
|
||||
/// Substream upgrade protocol.
|
||||
///
|
||||
/// We attempt to parse an incoming protobuf encoded request (cf. `Request`)
|
||||
/// which will be handled by the `BlockRequests` behaviour, i.e. the request
|
||||
/// will become visible via `inject_node_event` which then dispatches to the
|
||||
/// relevant callback to process the message and prepare a response.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct InboundProtocol<B> {
|
||||
/// The max. request length in bytes.
|
||||
max_request_len: usize,
|
||||
/// The protocol to use during upgrade negotiation.
|
||||
protocol: Bytes,
|
||||
/// Type of the block.
|
||||
marker: PhantomData<B>,
|
||||
}
|
||||
|
||||
impl<B: Block> UpgradeInfo for InboundProtocol<B> {
|
||||
type Info = Bytes;
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
iter::once(self.protocol.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, T> InboundUpgrade<T> for InboundProtocol<B>
|
||||
where
|
||||
B: Block,
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static
|
||||
{
|
||||
type Output = NodeEvent<B, T>;
|
||||
type Error = ReadOneError;
|
||||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future {
|
||||
let future = async move {
|
||||
let len = self.max_request_len;
|
||||
let vec = read_one(&mut s, len).await?;
|
||||
match api::v1::BlockRequest::decode(&vec[..]) {
|
||||
Ok(r) => Ok(NodeEvent::Request(r, s)),
|
||||
Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)))
|
||||
}
|
||||
};
|
||||
future.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
/// Substream upgrade protocol.
|
||||
///
|
||||
/// Sends a request to remote and awaits the response.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OutboundProtocol<B: Block> {
|
||||
/// The serialized protobuf request.
|
||||
request: Vec<u8>,
|
||||
/// The original request. Passed back through the API when the response comes back.
|
||||
original_request: message::BlockRequest<B>,
|
||||
/// The max. response length in bytes.
|
||||
max_response_size: usize,
|
||||
/// The protocol to use for upgrade negotiation.
|
||||
protocol: Bytes,
|
||||
}
|
||||
|
||||
impl<B: Block> UpgradeInfo for OutboundProtocol<B> {
|
||||
type Info = Bytes;
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
iter::once(self.protocol.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, T> OutboundUpgrade<T> for OutboundProtocol<B>
|
||||
where
|
||||
B: Block,
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static
|
||||
{
|
||||
type Output = NodeEvent<B, T>;
|
||||
type Error = ReadOneError;
|
||||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future {
|
||||
async move {
|
||||
write_one(&mut s, &self.request).await?;
|
||||
let vec = read_one(&mut s, self.max_response_size).await?;
|
||||
|
||||
api::v1::BlockResponse::decode(&vec[..])
|
||||
.map(|r| NodeEvent::Response(self.original_request, r))
|
||||
.map_err(|e| {
|
||||
ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))
|
||||
})
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
@@ -1,402 +0,0 @@
|
||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
//
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! `NetworkBehaviour` implementation which handles incoming finality proof requests.
|
||||
//!
|
||||
//! Every request is coming in on a separate connection substream which gets
|
||||
//! closed after we have sent the response back. Incoming requests are encoded
|
||||
//! as protocol buffers (cf. `finality.v1.proto`).
|
||||
|
||||
#![allow(unused)]
|
||||
|
||||
use bytes::Bytes;
|
||||
use codec::{Encode, Decode};
|
||||
use crate::{
|
||||
chain::FinalityProofProvider,
|
||||
config::ProtocolId,
|
||||
protocol::{api, message}
|
||||
};
|
||||
use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
|
||||
use libp2p::{
|
||||
core::{
|
||||
ConnectedPoint,
|
||||
Multiaddr,
|
||||
PeerId,
|
||||
connection::ConnectionId,
|
||||
upgrade::{InboundUpgrade, OutboundUpgrade, ReadOneError, UpgradeInfo, Negotiated},
|
||||
upgrade::{DeniedUpgrade, read_one, write_one}
|
||||
},
|
||||
swarm::{
|
||||
NegotiatedSubstream,
|
||||
NetworkBehaviour,
|
||||
NetworkBehaviourAction,
|
||||
NotifyHandler,
|
||||
OneShotHandler,
|
||||
OneShotHandlerConfig,
|
||||
PollParameters,
|
||||
SubstreamProtocol
|
||||
}
|
||||
};
|
||||
use prost::Message;
|
||||
use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}};
|
||||
use std::{
|
||||
cmp::min,
|
||||
collections::VecDeque,
|
||||
io,
|
||||
iter,
|
||||
marker::PhantomData,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
task::{Context, Poll}
|
||||
};
|
||||
use void::{Void, unreachable};
|
||||
|
||||
// Type alias for convenience.
|
||||
pub type Error = Box<dyn std::error::Error + 'static>;
|
||||
|
||||
/// Event generated by the finality proof requests behaviour.
|
||||
#[derive(Debug)]
|
||||
pub enum Event<B: Block> {
|
||||
/// A response to a finality proof request has arrived.
|
||||
Response {
|
||||
peer: PeerId,
|
||||
/// Block hash originally passed to `send_request`.
|
||||
block_hash: B::Hash,
|
||||
/// Finality proof returned by the remote.
|
||||
proof: Vec<u8>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Configuration options for `FinalityProofRequests`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
max_request_len: usize,
|
||||
max_response_len: usize,
|
||||
inactivity_timeout: Duration,
|
||||
protocol: Bytes,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
/// Create a fresh configuration with the following options:
|
||||
///
|
||||
/// - max. request size = 1 MiB
|
||||
/// - max. response size = 1 MiB
|
||||
/// - inactivity timeout = 15s
|
||||
pub fn new(id: &ProtocolId) -> Self {
|
||||
let mut c = Config {
|
||||
max_request_len: 1024 * 1024,
|
||||
max_response_len: 1024 * 1024,
|
||||
inactivity_timeout: Duration::from_secs(15),
|
||||
protocol: Bytes::new(),
|
||||
};
|
||||
c.set_protocol(id);
|
||||
c
|
||||
}
|
||||
|
||||
/// Limit the max. length of incoming finality proof request bytes.
|
||||
pub fn set_max_request_len(&mut self, v: usize) -> &mut Self {
|
||||
self.max_request_len = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the max. length of incoming finality proof response bytes.
|
||||
pub fn set_max_response_len(&mut self, v: usize) -> &mut Self {
|
||||
self.max_response_len = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the max. duration the substream may remain inactive before closing it.
|
||||
pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self {
|
||||
self.inactivity_timeout = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set protocol to use for upgrade negotiation.
|
||||
pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self {
|
||||
let mut v = Vec::new();
|
||||
v.extend_from_slice(b"/");
|
||||
v.extend_from_slice(id.as_bytes());
|
||||
v.extend_from_slice(b"/finality-proof/1");
|
||||
self.protocol = v.into();
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// The finality proof request handling behaviour.
|
||||
pub struct FinalityProofRequests<B: Block> {
|
||||
/// This behaviour's configuration.
|
||||
config: Config,
|
||||
/// How to construct finality proofs.
|
||||
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
|
||||
/// Futures sending back the finality proof request responses.
|
||||
outgoing: FuturesUnordered<BoxFuture<'static, ()>>,
|
||||
/// Events to return as soon as possible from `poll`.
|
||||
pending_events: VecDeque<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>,
|
||||
}
|
||||
|
||||
impl<B> FinalityProofRequests<B>
|
||||
where
|
||||
B: Block,
|
||||
{
|
||||
/// Initializes the behaviour.
|
||||
///
|
||||
/// If the proof provider is `None`, then the behaviour will not support the finality proof
|
||||
/// requests protocol.
|
||||
pub fn new(cfg: Config, finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>) -> Self {
|
||||
FinalityProofRequests {
|
||||
config: cfg,
|
||||
finality_proof_provider,
|
||||
outgoing: FuturesUnordered::new(),
|
||||
pending_events: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Issue a new finality proof request.
|
||||
///
|
||||
/// If the response doesn't arrive in time, or if the remote answers improperly, the target
|
||||
/// will be disconnected.
|
||||
pub fn send_request(&mut self, target: &PeerId, block_hash: B::Hash, request: Vec<u8>) {
|
||||
let protobuf_rq = api::v1::finality::FinalityProofRequest {
|
||||
block_hash: block_hash.encode(),
|
||||
request,
|
||||
};
|
||||
|
||||
let mut buf = Vec::with_capacity(protobuf_rq.encoded_len());
|
||||
if let Err(err) = protobuf_rq.encode(&mut buf) {
|
||||
log::warn!("failed to encode finality proof request {:?}: {:?}", protobuf_rq, err);
|
||||
return;
|
||||
}
|
||||
|
||||
log::trace!("enqueueing finality proof request to {:?}: {:?}", target, protobuf_rq);
|
||||
self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
|
||||
peer_id: target.clone(),
|
||||
handler: NotifyHandler::Any,
|
||||
event: OutboundProtocol {
|
||||
request: buf,
|
||||
block_hash,
|
||||
max_response_size: self.config.max_response_len,
|
||||
protocol: self.config.protocol.clone(),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/// Callback, invoked when a new finality request has been received from remote.
|
||||
fn on_finality_request(&mut self, peer: &PeerId, request: &api::v1::finality::FinalityProofRequest)
|
||||
-> Result<api::v1::finality::FinalityProofResponse, Error>
|
||||
{
|
||||
let block_hash = Decode::decode(&mut request.block_hash.as_ref())?;
|
||||
|
||||
log::trace!(target: "sync", "Finality proof request from {} for {}", peer, block_hash);
|
||||
|
||||
// Note that an empty Vec is sent if no proof is available.
|
||||
let finality_proof = if let Some(provider) = &self.finality_proof_provider {
|
||||
provider
|
||||
.prove_finality(block_hash, &request.request)?
|
||||
.unwrap_or(Vec::new())
|
||||
} else {
|
||||
log::error!("Answering a finality proof request while finality provider is empty");
|
||||
return Err(From::from("Empty finality proof provider".to_string()))
|
||||
};
|
||||
|
||||
Ok(api::v1::finality::FinalityProofResponse { proof: finality_proof })
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> NetworkBehaviour for FinalityProofRequests<B>
|
||||
where
|
||||
B: Block
|
||||
{
|
||||
type ProtocolsHandler = OneShotHandler<InboundProtocol<B>, OutboundProtocol<B>, NodeEvent<B, NegotiatedSubstream>>;
|
||||
type OutEvent = Event<B>;
|
||||
|
||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||
let p = InboundProtocol {
|
||||
max_request_len: self.config.max_request_len,
|
||||
protocol: if self.finality_proof_provider.is_some() {
|
||||
Some(self.config.protocol.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
marker: PhantomData,
|
||||
};
|
||||
let mut cfg = OneShotHandlerConfig::default();
|
||||
cfg.inactive_timeout = self.config.inactivity_timeout;
|
||||
OneShotHandler::new(SubstreamProtocol::new(p), cfg)
|
||||
}
|
||||
|
||||
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
fn inject_connected(&mut self, _peer: &PeerId) {
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, _peer: &PeerId) {
|
||||
}
|
||||
|
||||
fn inject_event(
|
||||
&mut self,
|
||||
peer: PeerId,
|
||||
connection: ConnectionId,
|
||||
event: NodeEvent<B, NegotiatedSubstream>
|
||||
) {
|
||||
match event {
|
||||
NodeEvent::Request(request, mut stream) => {
|
||||
match self.on_finality_request(&peer, &request) {
|
||||
Ok(res) => {
|
||||
log::trace!("enqueueing finality response for peer {}", peer);
|
||||
let mut data = Vec::with_capacity(res.encoded_len());
|
||||
if let Err(e) = res.encode(&mut data) {
|
||||
log::debug!("error encoding finality response for peer {}: {}", peer, e)
|
||||
} else {
|
||||
let future = async move {
|
||||
if let Err(e) = write_one(&mut stream, data).await {
|
||||
log::debug!("error writing finality response: {}", e)
|
||||
}
|
||||
};
|
||||
self.outgoing.push(future.boxed())
|
||||
}
|
||||
}
|
||||
Err(e) => log::debug!("error handling finality request from peer {}: {}", peer, e)
|
||||
}
|
||||
}
|
||||
NodeEvent::Response(response, block_hash) => {
|
||||
let ev = Event::Response {
|
||||
peer,
|
||||
block_hash,
|
||||
proof: response.proof,
|
||||
};
|
||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters)
|
||||
-> Poll<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>
|
||||
{
|
||||
if let Some(ev) = self.pending_events.pop_front() {
|
||||
return Poll::Ready(ev);
|
||||
}
|
||||
|
||||
while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
/// Output type of inbound and outbound substream upgrades.
|
||||
#[derive(Debug)]
|
||||
pub enum NodeEvent<B: Block, T> {
|
||||
/// Incoming request from remote and substream to use for the response.
|
||||
Request(api::v1::finality::FinalityProofRequest, T),
|
||||
/// Incoming response from remote.
|
||||
Response(api::v1::finality::FinalityProofResponse, B::Hash),
|
||||
}
|
||||
|
||||
/// Substream upgrade protocol.
|
||||
///
|
||||
/// We attempt to parse an incoming protobuf encoded request (cf. `Request`)
|
||||
/// which will be handled by the `FinalityProofRequests` behaviour, i.e. the request
|
||||
/// will become visible via `inject_node_event` which then dispatches to the
|
||||
/// relevant callback to process the message and prepare a response.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct InboundProtocol<B> {
|
||||
/// The max. request length in bytes.
|
||||
max_request_len: usize,
|
||||
/// The protocol to use during upgrade negotiation. If `None`, then the incoming protocol
|
||||
/// is simply disabled.
|
||||
protocol: Option<Bytes>,
|
||||
/// Marker to pin the block type.
|
||||
marker: PhantomData<B>,
|
||||
}
|
||||
|
||||
impl<B: Block> UpgradeInfo for InboundProtocol<B> {
|
||||
type Info = Bytes;
|
||||
// This iterator will return either 0 elements if `self.protocol` is `None`, or 1 element if
|
||||
// it is `Some`.
|
||||
type InfoIter = std::option::IntoIter<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
self.protocol.clone().into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, T> InboundUpgrade<T> for InboundProtocol<B>
|
||||
where
|
||||
B: Block,
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static
|
||||
{
|
||||
type Output = NodeEvent<B, T>;
|
||||
type Error = ReadOneError;
|
||||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future {
|
||||
async move {
|
||||
let len = self.max_request_len;
|
||||
let vec = read_one(&mut s, len).await?;
|
||||
match api::v1::finality::FinalityProofRequest::decode(&vec[..]) {
|
||||
Ok(r) => Ok(NodeEvent::Request(r, s)),
|
||||
Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)))
|
||||
}
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
/// Substream upgrade protocol.
|
||||
///
|
||||
/// Sends a request to remote and awaits the response.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OutboundProtocol<B: Block> {
|
||||
/// The serialized protobuf request.
|
||||
request: Vec<u8>,
|
||||
/// Block hash that has been requested.
|
||||
block_hash: B::Hash,
|
||||
/// The max. response length in bytes.
|
||||
max_response_size: usize,
|
||||
/// The protocol to use for upgrade negotiation.
|
||||
protocol: Bytes,
|
||||
}
|
||||
|
||||
impl<B: Block> UpgradeInfo for OutboundProtocol<B> {
|
||||
type Info = Bytes;
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
iter::once(self.protocol.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, T> OutboundUpgrade<T> for OutboundProtocol<B>
|
||||
where
|
||||
B: Block,
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static
|
||||
{
|
||||
type Output = NodeEvent<B, T>;
|
||||
type Error = ReadOneError;
|
||||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future {
|
||||
async move {
|
||||
write_one(&mut s, &self.request).await?;
|
||||
let vec = read_one(&mut s, self.max_response_size).await?;
|
||||
|
||||
api::v1::finality::FinalityProofResponse::decode(&vec[..])
|
||||
.map(|r| NodeEvent::Response(r, self.block_hash))
|
||||
.map_err(|e| {
|
||||
ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))
|
||||
})
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,60 +0,0 @@
|
||||
// Schema definition for block request/response messages.
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package api.v1;
|
||||
|
||||
// Block enumeration direction.
|
||||
enum Direction {
|
||||
// Enumerate in ascending order (from child to parent).
|
||||
Ascending = 0;
|
||||
// Enumerate in descending order (from parent to canonical child).
|
||||
Descending = 1;
|
||||
}
|
||||
|
||||
// Request block data from a peer.
|
||||
message BlockRequest {
|
||||
// Bits of block data to request.
|
||||
uint32 fields = 1;
|
||||
// Start from this block.
|
||||
oneof from_block {
|
||||
// Start with given hash.
|
||||
bytes hash = 2;
|
||||
// Start with given block number.
|
||||
bytes number = 3;
|
||||
}
|
||||
// End at this block. An implementation defined maximum is used when unspecified.
|
||||
bytes to_block = 4; // optional
|
||||
// Sequence direction.
|
||||
Direction direction = 5;
|
||||
// Maximum number of blocks to return. An implementation defined maximum is used when unspecified.
|
||||
uint32 max_blocks = 6; // optional
|
||||
}
|
||||
|
||||
// Response to `BlockRequest`
|
||||
message BlockResponse {
|
||||
// Block data for the requested sequence.
|
||||
repeated BlockData blocks = 1;
|
||||
}
|
||||
|
||||
// Block data sent in the response.
|
||||
message BlockData {
|
||||
// Block header hash.
|
||||
bytes hash = 1;
|
||||
// Block header if requested.
|
||||
bytes header = 2; // optional
|
||||
// Block body if requested.
|
||||
repeated bytes body = 3; // optional
|
||||
// Block receipt if requested.
|
||||
bytes receipt = 4; // optional
|
||||
// Block message queue if requested.
|
||||
bytes message_queue = 5; // optional
|
||||
// Justification if requested.
|
||||
bytes justification = 6; // optional
|
||||
// True if justification should be treated as present but empty.
|
||||
// This hack is unfortunately necessary because shortcomings in the protobuf format otherwise
|
||||
// doesn't make in possible to differentiate between a lack of justification and an empty
|
||||
// justification.
|
||||
bool is_empty_justification = 7; // optional, false if absent
|
||||
}
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
// Schema definition for finality proof request/responses.
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package api.v1.finality;
|
||||
|
||||
// Request a finality proof from a peer.
|
||||
message FinalityProofRequest {
|
||||
// SCALE-encoded hash of the block to request.
|
||||
bytes block_hash = 1;
|
||||
// Opaque chain-specific additional request data.
|
||||
bytes request = 2;
|
||||
}
|
||||
|
||||
// Response to a finality proof request.
|
||||
message FinalityProofResponse {
|
||||
// Opaque chain-specific finality proof. Empty if no such proof exists.
|
||||
bytes proof = 1; // optional
|
||||
}
|
||||
@@ -1,120 +0,0 @@
|
||||
// Schema definition for light client messages.
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package api.v1.light;
|
||||
|
||||
// A pair of arbitrary bytes.
|
||||
message Pair {
|
||||
// The first element of the pair.
|
||||
bytes fst = 1;
|
||||
// The second element of the pair.
|
||||
bytes snd = 2;
|
||||
}
|
||||
|
||||
// Enumerate all possible light client request messages.
|
||||
message Request {
|
||||
oneof request {
|
||||
RemoteCallRequest remote_call_request = 1;
|
||||
RemoteReadRequest remote_read_request = 2;
|
||||
RemoteHeaderRequest remote_header_request = 3;
|
||||
RemoteReadChildRequest remote_read_child_request = 4;
|
||||
RemoteChangesRequest remote_changes_request = 5;
|
||||
}
|
||||
}
|
||||
|
||||
// Enumerate all possible light client response messages.
|
||||
message Response {
|
||||
oneof response {
|
||||
RemoteCallResponse remote_call_response = 1;
|
||||
RemoteReadResponse remote_read_response = 2;
|
||||
RemoteHeaderResponse remote_header_response = 3;
|
||||
RemoteChangesResponse remote_changes_response = 4;
|
||||
}
|
||||
}
|
||||
|
||||
// Remote call request.
|
||||
message RemoteCallRequest {
|
||||
// Block at which to perform call.
|
||||
bytes block = 2;
|
||||
// Method name.
|
||||
string method = 3;
|
||||
// Call data.
|
||||
bytes data = 4;
|
||||
}
|
||||
|
||||
// Remote call response.
|
||||
message RemoteCallResponse {
|
||||
// Execution proof.
|
||||
bytes proof = 2;
|
||||
}
|
||||
|
||||
// Remote storage read request.
|
||||
message RemoteReadRequest {
|
||||
// Block at which to perform call.
|
||||
bytes block = 2;
|
||||
// Storage keys.
|
||||
repeated bytes keys = 3;
|
||||
}
|
||||
|
||||
// Remote read response.
|
||||
message RemoteReadResponse {
|
||||
// Read proof.
|
||||
bytes proof = 2;
|
||||
}
|
||||
|
||||
// Remote storage read child request.
|
||||
message RemoteReadChildRequest {
|
||||
// Block at which to perform call.
|
||||
bytes block = 2;
|
||||
// Child Storage key, this is relative
|
||||
// to the child type storage location.
|
||||
bytes storage_key = 3;
|
||||
// Storage keys.
|
||||
repeated bytes keys = 6;
|
||||
}
|
||||
|
||||
// Remote header request.
|
||||
message RemoteHeaderRequest {
|
||||
// Block number to request header for.
|
||||
bytes block = 2;
|
||||
}
|
||||
|
||||
// Remote header response.
|
||||
message RemoteHeaderResponse {
|
||||
// Header. None if proof generation has failed (e.g. header is unknown).
|
||||
bytes header = 2; // optional
|
||||
// Header proof.
|
||||
bytes proof = 3;
|
||||
}
|
||||
|
||||
/// Remote changes request.
|
||||
message RemoteChangesRequest {
|
||||
// Hash of the first block of the range (including first) where changes are requested.
|
||||
bytes first = 2;
|
||||
// Hash of the last block of the range (including last) where changes are requested.
|
||||
bytes last = 3;
|
||||
// Hash of the first block for which the requester has the changes trie root. All other
|
||||
// affected roots must be proved.
|
||||
bytes min = 4;
|
||||
// Hash of the last block that we can use when querying changes.
|
||||
bytes max = 5;
|
||||
// Storage child node key which changes are requested.
|
||||
bytes storage_key = 6; // optional
|
||||
// Storage key which changes are requested.
|
||||
bytes key = 7;
|
||||
}
|
||||
|
||||
// Remote changes response.
|
||||
message RemoteChangesResponse {
|
||||
// Proof has been generated using block with this number as a max block. Should be
|
||||
// less than or equal to the RemoteChangesRequest::max block number.
|
||||
bytes max = 2;
|
||||
// Changes proof.
|
||||
repeated bytes proof = 3;
|
||||
// Changes tries roots missing on the requester' node.
|
||||
repeated Pair roots = 4;
|
||||
// Missing changes tries roots proof.
|
||||
bytes roots_proof = 5;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user