client/network: Use request response for light client requests (#7895)

* client/network: Re-enable light_client_handler.rs unit tests

* client/network: Add scaffolding for light client using req-resp

* client/network: Make it compile

* client/network: Rename OutEvent SendRequest

* client/network: Restructure light client request client and handler

* client/network: Rename light client request client to sender

* client/network: Remove light client prepare_request

* client/network/src/light: Rework configuration

* client/network: Formatting

* client/network/light: Remove RequestId

* client/network/light: Make request functions methods

* client/network/light: Refactor request wrapping

* client/network/light: Fix warnings

* client/network/light: Serialize request in method

* client/network/light: Make returning response a method

* client/network/light: Depend on request response to timeout requests

* client/network: Fix test compilation

* client/network/light: Re-enable connection test

* client/network/light: Re-enable timeout test

* client/network/light: Re-enable incorrect_response test

* client/network/light: Re-enable wrong_response_type test

* client/network/light: Re-enable retry_count_failures test

* client/network/light: Re-enable issue_request tests

* client/network/light: Re-enable send_receive tests

* client/network/light: Deduplicate test logic

* client/network/light: Remove unused imports

* client/network/light: Handle request failure

* client/network/light: Move generate_protocol_config

* client/network: Fix test compilation

* client/network: Rename light client request client to sender

* client/network: Handle too-many-requests error

* client/network: Update outdated comments

* client/network/light: Choose any peer if none has best block defined

* .maintain: Replace sentry-node with local-docker-test-network

Sentry nodes are deprecated. Thus there is no need for
`.maintain/sentry-node` to spin up a sentry node test environment.
Instead this commit rewrites the setup to contain two full-connected
validators and one light client.

With the steps below one can now spin up a local test network with
two validators, one light-client, Prometheus and Grafana.

- cargo build --release
- sudo docker-compose -f .maintain/local-docker-test-network/docker-compose.yml up

* client/network/light: Handle oneshot cancellation

* client/network/light: Do not reduce retry count on missing peer

* client/network/request-response: Assert in debug request id to be unique

* client/network/light: Choose same limit as block request protocol

* client/network: Report reputation changes via response

Allow request response protocol handlers to issue reputation changes, by
sending them back along with the response payload.

* client/network: Remove resolved TODOs
This commit is contained in:
Max Inden
2021-02-01 16:59:47 +01:00
committed by GitHub
parent c83bca67b5
commit 3006100977
15 changed files with 2235 additions and 2136 deletions
+40 -22
View File
@@ -17,14 +17,15 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
config::{ProtocolId, Role}, light_client_handler, peer_info, request_responses,
config::{ProtocolId, Role},
discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol},
peer_info, request_responses, light_client_requests,
ObservedRole, DhtEvent, ExHashT,
};
use bytes::Bytes;
use futures::channel::oneshot;
use futures::{channel::oneshot, stream::StreamExt};
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::identify::IdentifyInfo;
@@ -59,8 +60,6 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
discovery: DiscoveryBehaviour,
/// Generic request-reponse protocols.
request_responses: request_responses::RequestResponsesBehaviour,
/// Light client request handling.
light_client_handler: light_client_handler::LightClientHandler<B>,
/// Queue of events to produce for the outside.
#[behaviour(ignore)]
@@ -70,6 +69,10 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
#[behaviour(ignore)]
role: Role,
/// Light client request handling.
#[behaviour(ignore)]
light_client_request_sender: light_client_requests::sender::LightClientRequestSender<B>,
/// Protocol name used to send out block requests via
/// [`request_responses::RequestResponsesBehaviour`].
#[behaviour(ignore)]
@@ -174,10 +177,10 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
role: Role,
user_agent: String,
local_public_key: PublicKey,
light_client_handler: light_client_handler::LightClientHandler<B>,
light_client_request_sender: light_client_requests::sender::LightClientRequestSender<B>,
disco_config: DiscoveryConfig,
// Block request protocol config.
block_request_protocol_config: request_responses::ProtocolConfig,
light_client_request_protocol_config: request_responses::ProtocolConfig,
// All remaining request protocol configs.
mut request_response_protocols: Vec<request_responses::ProtocolConfig>,
) -> Result<Self, request_responses::RegisterError> {
@@ -185,13 +188,15 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
let block_request_protocol_name = block_request_protocol_config.name.to_string();
request_response_protocols.push(block_request_protocol_config);
request_response_protocols.push(light_client_request_protocol_config);
Ok(Behaviour {
substrate,
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key),
discovery: disco_config.finish(),
request_responses:
request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?,
light_client_handler,
light_client_request_sender,
events: VecDeque::new(),
role,
@@ -268,8 +273,11 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
}
/// Issue a light client request.
pub fn light_client_request(&mut self, r: light_client_handler::Request<B>) -> Result<(), light_client_handler::Error> {
self.light_client_handler.request(r)
pub fn light_client_request(
&mut self,
r: light_client_requests::sender::Request<B>,
) -> Result<(), light_client_requests::sender::SendRequestError> {
self.light_client_request_sender.request(r)
}
}
@@ -289,13 +297,6 @@ fn reported_roles_to_observed_role(local_role: &Role, remote: &PeerId, roles: Ro
}
}
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<void::Void> for
Behaviour<B, H> {
fn inject_event(&mut self, event: void::Void) {
void::unreachable(event)
}
}
impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for
Behaviour<B, H> {
fn inject_event(&mut self, event: CustomMessageOutcome<B>) {
@@ -343,12 +344,16 @@ Behaviour<B, H> {
self.events.push_back(BehaviourOut::NotificationsReceived { remote, messages });
},
CustomMessageOutcome::PeerNewBest(peer_id, number) => {
self.light_client_handler.update_best_block(&peer_id, number);
self.light_client_request_sender.update_best_block(&peer_id, number);
}
CustomMessageOutcome::SyncConnected(peer_id) => {
self.light_client_request_sender.inject_connected(peer_id);
self.events.push_back(BehaviourOut::SyncConnected(peer_id))
}
CustomMessageOutcome::SyncDisconnected(peer_id) => {
self.light_client_request_sender.inject_disconnected(peer_id);
self.events.push_back(BehaviourOut::SyncDisconnected(peer_id))
}
CustomMessageOutcome::SyncConnected(peer_id) =>
self.events.push_back(BehaviourOut::SyncConnected(peer_id)),
CustomMessageOutcome::SyncDisconnected(peer_id) =>
self.events.push_back(BehaviourOut::SyncDisconnected(peer_id)),
CustomMessageOutcome::None => {}
}
}
@@ -443,7 +448,20 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
}
impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
fn poll<TEv>(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
fn poll<TEv>(
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
use light_client_requests::sender::OutEvent;
while let Poll::Ready(Some(event)) = self.light_client_request_sender.poll_next_unpin(cx) {
match event {
OutEvent::SendRequest { target, request, pending_response, protocol_name } => {
self.request_responses.send_request(&target, &protocol_name, request, pending_response)
}
}
}
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}
@@ -39,7 +39,7 @@ const MAX_BLOCKS_IN_RESPONSE: usize = 128;
const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
/// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests.
pub fn generate_protocol_config(protocol_id: ProtocolId) -> ProtocolConfig {
pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
ProtocolConfig {
name: generate_protocol_name(protocol_id).into(),
max_request_size: 1024 * 1024,
@@ -50,7 +50,10 @@ pub fn generate_protocol_config(protocol_id: ProtocolId) -> ProtocolConfig {
}
/// Generate the block protocol name from chain specific protocol identifier.
fn generate_protocol_name(protocol_id: ProtocolId) -> String {
//
// Visibility `pub(crate)` to allow `crate::light_client_requests::sender` to generate block request
// protocol name and send block requests.
pub(crate) fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
let mut s = String::new();
s.push_str("/");
s.push_str(protocol_id.as_ref());
@@ -66,7 +69,7 @@ pub struct BlockRequestHandler<B> {
impl <B: BlockT> BlockRequestHandler<B> {
/// Create a new [`BlockRequestHandler`].
pub fn new(protocol_id: ProtocolId, client: Arc<dyn Client<B>>) -> (Self, ProtocolConfig) {
pub fn new(protocol_id: &ProtocolId, client: Arc<dyn Client<B>>) -> (Self, ProtocolConfig) {
// Rate of arrival multiplied with the waiting time in the queue equals the queue length.
//
// An average Polkadot sentry node serves less than 5 requests per second. The 95th percentile
@@ -82,6 +85,22 @@ impl <B: BlockT> BlockRequestHandler<B> {
(Self { client, request_receiver }, protocol_config)
}
/// Run [`BlockRequestHandler`].
pub async fn run(mut self) {
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;
match self.handle_request(payload, pending_response) {
Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
Err(e) => debug!(
target: LOG_TARGET,
"Failed to handle block request from {}: {}",
peer, e,
),
}
}
}
fn handle_request(
&self,
payload: Vec<u8>,
@@ -186,22 +205,6 @@ impl <B: BlockT> BlockRequestHandler<B> {
reputation_changes: Vec::new(),
}).map_err(|_| HandleRequestError::SendResponse)
}
/// Run [`BlockRequestHandler`].
pub async fn run(mut self) {
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;
match self.handle_request(payload, pending_response) {
Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
Err(e) => debug!(
target: LOG_TARGET,
"Failed to handle block request from {}: {}",
peer, e,
),
}
}
}
}
#[derive(derive_more::Display, derive_more::From)]
+8
View File
@@ -111,6 +111,14 @@ pub struct Params<B: BlockT, H: ExHashT> {
/// [`block_request_handler::BlockRequestHandler::new`] allowing both outgoing and incoming
/// requests.
pub block_request_protocol_config: RequestResponseConfig,
/// Request response configuration for the light client request protocol.
///
/// Can be constructed either via [`light_client_requests::generate_protocol_config`] allowing
/// outgoing but not incoming requests, or constructed via
/// [`light_client_requests::handler::LightClientRequestHandler::new`] allowing both outgoing
/// and incoming requests.
pub light_client_request_protocol_config: RequestResponseConfig,
}
/// Role of the local node.
+12 -1
View File
@@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::block_request_handler::BlockRequestHandler;
use crate::light_client_requests::handler::LightClientRequestHandler;
use crate::gossip::QueuedSender;
use crate::{config, Event, NetworkService, NetworkWorker};
@@ -96,7 +97,16 @@ fn build_test_full_node(network_config: config::NetworkConfiguration)
let block_request_protocol_config = {
let (handler, protocol_config) = BlockRequestHandler::new(
protocol_id.clone(),
&protocol_id,
client.clone(),
);
async_std::task::spawn(handler.run().boxed());
protocol_config
};
let light_client_request_protocol_config = {
let (handler, protocol_config) = LightClientRequestHandler::new(
&protocol_id,
client.clone(),
);
async_std::task::spawn(handler.run().boxed());
@@ -117,6 +127,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration)
),
metrics_registry: None,
block_request_protocol_config,
light_client_request_protocol_config,
})
.unwrap();
+1 -1
View File
@@ -249,7 +249,6 @@ mod behaviour;
mod chain;
mod peer_info;
mod discovery;
mod light_client_handler;
mod on_demand_layer;
mod protocol;
mod request_responses;
@@ -259,6 +258,7 @@ mod transport;
mod utils;
pub mod block_request_handler;
pub mod light_client_requests;
pub mod config;
pub mod error;
pub mod gossip;
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,334 @@
// This file is part of Substrate.
// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Helpers for outgoing and incoming light client requests.
/// For outgoing light client requests.
pub mod sender;
/// For incoming light client requests.
pub mod handler;
use crate::config::ProtocolId;
use crate::request_responses::ProtocolConfig;
use std::time::Duration;
/// Generate the light client protocol name from chain specific protocol identifier.
fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
let mut s = String::new();
s.push_str("/");
s.push_str(protocol_id.as_ref());
s.push_str("/light/2");
s
}
/// Generates a [`ProtocolConfig`] for the light client request protocol, refusing incoming requests.
pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
ProtocolConfig {
name: generate_protocol_name(protocol_id).into(),
max_request_size: 1 * 1024 * 1024,
max_response_size: 16 * 1024 * 1024,
request_timeout: Duration::from_secs(15),
inbound_queue: None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::request_responses::IncomingRequest;
use crate::config::ProtocolId;
use assert_matches::assert_matches;
use futures::executor::{block_on, LocalPool};
use futures::task::Spawn;
use futures::{channel::oneshot, prelude::*};
use libp2p::PeerId;
use sc_client_api::StorageProof;
use sc_client_api::light::{RemoteCallRequest, RemoteChangesRequest, RemoteHeaderRequest};
use sc_client_api::light::{self, RemoteReadRequest, RemoteBodyRequest, ChangesProof};
use sc_client_api::{FetchChecker, RemoteReadChildRequest};
use sp_blockchain::Error as ClientError;
use sp_core::storage::ChildInfo;
use sp_runtime::generic::Header;
use sp_runtime::traits::{BlakeTwo256, Block as BlockT, NumberFor};
use std::collections::HashMap;
use std::sync::Arc;
pub struct DummyFetchChecker<B> {
pub ok: bool,
pub _mark: std::marker::PhantomData<B>,
}
impl<B: BlockT> FetchChecker<B> for DummyFetchChecker<B> {
fn check_header_proof(
&self,
_request: &RemoteHeaderRequest<B::Header>,
header: Option<B::Header>,
_remote_proof: StorageProof,
) -> Result<B::Header, ClientError> {
match self.ok {
true if header.is_some() => Ok(header.unwrap()),
_ => Err(ClientError::Backend("Test error".into())),
}
}
fn check_read_proof(
&self,
request: &RemoteReadRequest<B::Header>,
_: StorageProof,
) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError> {
match self.ok {
true => Ok(request
.keys
.iter()
.cloned()
.map(|k| (k, Some(vec![42])))
.collect()),
false => Err(ClientError::Backend("Test error".into())),
}
}
fn check_read_child_proof(
&self,
request: &RemoteReadChildRequest<B::Header>,
_: StorageProof,
) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError> {
match self.ok {
true => Ok(request
.keys
.iter()
.cloned()
.map(|k| (k, Some(vec![42])))
.collect()),
false => Err(ClientError::Backend("Test error".into())),
}
}
fn check_execution_proof(
&self,
_: &RemoteCallRequest<B::Header>,
_: StorageProof,
) -> Result<Vec<u8>, ClientError> {
match self.ok {
true => Ok(vec![42]),
false => Err(ClientError::Backend("Test error".into())),
}
}
fn check_changes_proof(
&self,
_: &RemoteChangesRequest<B::Header>,
_: ChangesProof<B::Header>,
) -> Result<Vec<(NumberFor<B>, u32)>, ClientError> {
match self.ok {
true => Ok(vec![(100u32.into(), 2)]),
false => Err(ClientError::Backend("Test error".into())),
}
}
fn check_body_proof(
&self,
_: &RemoteBodyRequest<B::Header>,
body: Vec<B::Extrinsic>,
) -> Result<Vec<B::Extrinsic>, ClientError> {
match self.ok {
true => Ok(body),
false => Err(ClientError::Backend("Test error".into())),
}
}
}
pub fn protocol_id() -> ProtocolId {
ProtocolId::from("test")
}
pub fn peerset() -> (sc_peerset::Peerset, sc_peerset::PeersetHandle) {
let cfg = sc_peerset::SetConfig {
in_peers: 128,
out_peers: 128,
bootnodes: Default::default(),
reserved_only: false,
reserved_nodes: Default::default(),
};
sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets: vec![cfg] })
}
pub fn dummy_header() -> sp_test_primitives::Header {
sp_test_primitives::Header {
parent_hash: Default::default(),
number: 0,
state_root: Default::default(),
extrinsics_root: Default::default(),
digest: Default::default(),
}
}
type Block =
sp_runtime::generic::Block<Header<u64, BlakeTwo256>, substrate_test_runtime::Extrinsic>;
fn send_receive(request: sender::Request<Block>, pool: &LocalPool) {
let client = Arc::new(substrate_test_runtime_client::new());
let (handler, protocol_config) = handler::LightClientRequestHandler::new(&protocol_id(), client);
pool.spawner().spawn_obj(handler.run().boxed().into()).unwrap();
let (_peer_set, peer_set_handle) = peerset();
let mut sender = sender::LightClientRequestSender::<Block>::new(
&protocol_id(),
Arc::new(crate::light_client_requests::tests::DummyFetchChecker {
ok: true,
_mark: std::marker::PhantomData,
}),
peer_set_handle,
);
sender.inject_connected(PeerId::random());
sender.request(request).unwrap();
let sender::OutEvent::SendRequest { pending_response, request, .. } = block_on(sender.next()).unwrap();
let (tx, rx) = oneshot::channel();
block_on(protocol_config.inbound_queue.unwrap().send(IncomingRequest {
peer: PeerId::random(),
payload: request,
pending_response: tx,
})).unwrap();
pool.spawner().spawn_obj(async move {
pending_response.send(Ok(rx.await.unwrap().result.unwrap())).unwrap();
}.boxed().into()).unwrap();
pool.spawner().spawn_obj(sender.for_each(|_| future::ready(())).boxed().into()).unwrap();
}
#[test]
fn send_receive_call() {
let chan = oneshot::channel();
let request = light::RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
};
let mut pool = LocalPool::new();
send_receive(sender::Request::Call {
request,
sender: chan.0,
}, &pool);
assert_eq!(vec![42], pool.run_until(chan.1).unwrap().unwrap());
// ^--- from `DummyFetchChecker::check_execution_proof`
}
#[test]
fn send_receive_read() {
let chan = oneshot::channel();
let request = light::RemoteReadRequest {
header: dummy_header(),
block: Default::default(),
keys: vec![b":key".to_vec()],
retry_count: None,
};
let mut pool = LocalPool::new();
send_receive(sender::Request::Read {
request,
sender: chan.0,
}, &pool);
assert_eq!(
Some(vec![42]),
pool.run_until(chan.1)
.unwrap()
.unwrap()
.remove(&b":key"[..])
.unwrap()
);
// ^--- from `DummyFetchChecker::check_read_proof`
}
#[test]
fn send_receive_read_child() {
let chan = oneshot::channel();
let child_info = ChildInfo::new_default(&b":child_storage:default:sub"[..]);
let request = light::RemoteReadChildRequest {
header: dummy_header(),
block: Default::default(),
storage_key: child_info.prefixed_storage_key(),
keys: vec![b":key".to_vec()],
retry_count: None,
};
let mut pool = LocalPool::new();
send_receive(sender::Request::ReadChild {
request,
sender: chan.0,
}, &pool);
assert_eq!(
Some(vec![42]),
pool.run_until(chan.1)
.unwrap()
.unwrap()
.remove(&b":key"[..])
.unwrap()
);
// ^--- from `DummyFetchChecker::check_read_child_proof`
}
#[test]
fn send_receive_header() {
sp_tracing::try_init_simple();
let chan = oneshot::channel();
let request = light::RemoteHeaderRequest {
cht_root: Default::default(),
block: 1,
retry_count: None,
};
let mut pool = LocalPool::new();
send_receive(sender::Request::Header {
request,
sender: chan.0,
}, &pool);
// The remote does not know block 1:
assert_matches!(
pool.run_until(chan.1).unwrap(),
Err(ClientError::RemoteFetchFailed)
);
}
#[test]
fn send_receive_changes() {
let chan = oneshot::channel();
let request = light::RemoteChangesRequest {
changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange {
zero: (0, Default::default()),
end: None,
config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)),
}],
first_block: (1, Default::default()),
last_block: (100, Default::default()),
max_block: (100, Default::default()),
tries_roots: (1, Default::default(), Vec::new()),
key: Vec::new(),
storage_key: None,
retry_count: None,
};
let mut pool = LocalPool::new();
send_receive(sender::Request::Changes {
request,
sender: chan.0,
}, &pool);
assert_eq!(vec![(100, 2)], pool.run_until(chan.1).unwrap().unwrap());
// ^--- from `DummyFetchChecker::check_changes_proof`
}
}
@@ -0,0 +1,399 @@
// This file is part of Substrate.
// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Helper for incoming light client requests.
//!
//! Handle (i.e. answer) incoming light client requests from a remote peer received via
//! [`crate::request_responses::RequestResponsesBehaviour`] with [`LightClientRequestHandler`].
use codec::{self, Encode, Decode};
use crate::{
chain::Client,
config::ProtocolId,
schema,
PeerId,
};
use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
use futures::{channel::mpsc, prelude::*};
use prost::Message;
use sc_client_api::{
StorageProof,
light
};
use sc_peerset::ReputationChange;
use sp_core::{
storage::{ChildInfo, ChildType,StorageKey, PrefixedStorageKey},
hexdisplay::HexDisplay,
};
use sp_runtime::{
traits::{Block, Zero},
generic::BlockId,
};
use std::{
collections::{BTreeMap},
sync::Arc,
};
use log::debug;
const LOG_TARGET: &str = "light-client-request-handler";
/// Handler for incoming light client requests from a remote peer.
pub struct LightClientRequestHandler<B: Block> {
request_receiver: mpsc::Receiver<IncomingRequest>,
/// Blockchain client.
client: Arc<dyn Client<B>>,
}
impl<B: Block> LightClientRequestHandler<B> {
/// Create a new [`BlockRequestHandler`].
pub fn new(
protocol_id: &ProtocolId,
client: Arc<dyn Client<B>>,
) -> (Self, ProtocolConfig) {
// For now due to lack of data on light client request handling in production systems, this
// value is chosen to match the block request limit.
let (tx, request_receiver) = mpsc::channel(20);
let mut protocol_config = super::generate_protocol_config(protocol_id);
protocol_config.inbound_queue = Some(tx);
(Self { client, request_receiver }, protocol_config)
}
/// Run [`LightClientRequestHandler`].
pub async fn run(mut self) {
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;
match self.handle_request(peer, payload) {
Ok(response_data) => {
let response = OutgoingResponse { result: Ok(response_data), reputation_changes: Vec::new() };
match pending_response.send(response) {
Ok(()) => debug!(
target: LOG_TARGET,
"Handled light client request from {}.",
peer,
),
Err(_) => debug!(
target: LOG_TARGET,
"Failed to handle light client request from {}: {}",
peer, HandleRequestError::SendResponse,
),
};
} ,
Err(e) => {
debug!(
target: LOG_TARGET,
"Failed to handle light client request from {}: {}",
peer, e,
);
let reputation_changes = match e {
HandleRequestError::BadRequest(_) => {
vec![ReputationChange::new(-(1 << 12), "bad request")]
}
_ => Vec::new(),
};
let response = OutgoingResponse { result: Err(()), reputation_changes };
if pending_response.send(response).is_err() {
debug!(
target: LOG_TARGET,
"Failed to handle light client request from {}: {}",
peer, HandleRequestError::SendResponse,
);
};
},
}
}
}
fn handle_request(
&mut self,
peer: PeerId,
payload: Vec<u8>,
) -> Result<Vec<u8>, HandleRequestError> {
let request = schema::v1::light::Request::decode(&payload[..])?;
let response = match &request.request {
Some(schema::v1::light::request::Request::RemoteCallRequest(r)) =>
self.on_remote_call_request(&peer, r)?,
Some(schema::v1::light::request::Request::RemoteReadRequest(r)) =>
self.on_remote_read_request(&peer, r)?,
Some(schema::v1::light::request::Request::RemoteHeaderRequest(r)) =>
self.on_remote_header_request(&peer, r)?,
Some(schema::v1::light::request::Request::RemoteReadChildRequest(r)) =>
self.on_remote_read_child_request(&peer, r)?,
Some(schema::v1::light::request::Request::RemoteChangesRequest(r)) =>
self.on_remote_changes_request(&peer, r)?,
None => {
return Err(HandleRequestError::BadRequest("Remote request without request data."));
}
};
let mut data = Vec::new();
response.encode(&mut data)?;
Ok(data)
}
fn on_remote_call_request(
&mut self,
peer: &PeerId,
request: &schema::v1::light::RemoteCallRequest,
) -> Result<schema::v1::light::Response, HandleRequestError> {
log::trace!(
"Remote call request from {} ({} at {:?}).",
peer, request.method, request.block,
);
let block = Decode::decode(&mut request.block.as_ref())?;
let proof = match self.client.execution_proof(
&BlockId::Hash(block),
&request.method, &request.data,
) {
Ok((_, proof)) => proof,
Err(e) => {
log::trace!(
"remote call request from {} ({} at {:?}) failed with: {}",
peer, request.method, request.block, e,
);
StorageProof::empty()
}
};
let response = {
let r = schema::v1::light::RemoteCallResponse { proof: proof.encode() };
schema::v1::light::response::Response::RemoteCallResponse(r)
};
Ok(schema::v1::light::Response { response: Some(response) })
}
fn on_remote_read_request(
&mut self,
peer: &PeerId,
request: &schema::v1::light::RemoteReadRequest,
) -> Result<schema::v1::light::Response, HandleRequestError> {
if request.keys.is_empty() {
log::debug!("Invalid remote read request sent by {}.", peer);
return Err(HandleRequestError::BadRequest("Remote read request without keys."))
}
log::trace!(
"Remote read request from {} ({} at {:?}).",
peer, fmt_keys(request.keys.first(), request.keys.last()), request.block,
);
let block = Decode::decode(&mut request.block.as_ref())?;
let proof = match self.client.read_proof(
&BlockId::Hash(block),
&mut request.keys.iter().map(AsRef::as_ref),
) {
Ok(proof) => proof,
Err(error) => {
log::trace!(
"remote read request from {} ({} at {:?}) failed with: {}",
peer, fmt_keys(request.keys.first(), request.keys.last()), request.block, error,
);
StorageProof::empty()
}
};
let response = {
let r = schema::v1::light::RemoteReadResponse { proof: proof.encode() };
schema::v1::light::response::Response::RemoteReadResponse(r)
};
Ok(schema::v1::light::Response { response: Some(response) })
}
fn on_remote_read_child_request(
&mut self,
peer: &PeerId,
request: &schema::v1::light::RemoteReadChildRequest,
) -> Result<schema::v1::light::Response, HandleRequestError> {
if request.keys.is_empty() {
log::debug!("Invalid remote child read request sent by {}.", peer);
return Err(HandleRequestError::BadRequest("Remove read child request without keys."))
}
log::trace!(
"Remote read child request from {} ({} {} at {:?}).",
peer,
HexDisplay::from(&request.storage_key),
fmt_keys(request.keys.first(), request.keys.last()),
request.block,
);
let block = Decode::decode(&mut request.block.as_ref())?;
let prefixed_key = PrefixedStorageKey::new_ref(&request.storage_key);
let child_info = match ChildType::from_prefixed_key(prefixed_key) {
Some((ChildType::ParentKeyId, storage_key)) => Ok(ChildInfo::new_default(storage_key)),
None => Err(sp_blockchain::Error::InvalidChildStorageKey),
};
let proof = match child_info.and_then(|child_info| self.client.read_child_proof(
&BlockId::Hash(block),
&child_info,
&mut request.keys.iter().map(AsRef::as_ref)
)) {
Ok(proof) => proof,
Err(error) => {
log::trace!(
"remote read child request from {} ({} {} at {:?}) failed with: {}",
peer,
HexDisplay::from(&request.storage_key),
fmt_keys(request.keys.first(), request.keys.last()),
request.block,
error,
);
StorageProof::empty()
}
};
let response = {
let r = schema::v1::light::RemoteReadResponse { proof: proof.encode() };
schema::v1::light::response::Response::RemoteReadResponse(r)
};
Ok(schema::v1::light::Response { response: Some(response) })
}
fn on_remote_header_request(
&mut self,
peer: &PeerId,
request: &schema::v1::light::RemoteHeaderRequest,
) -> Result<schema::v1::light::Response, HandleRequestError> {
log::trace!("Remote header proof request from {} ({:?}).", peer, request.block);
let block = Decode::decode(&mut request.block.as_ref())?;
let (header, proof) = match self.client.header_proof(&BlockId::Number(block)) {
Ok((header, proof)) => (header.encode(), proof),
Err(error) => {
log::trace!(
"Remote header proof request from {} ({:?}) failed with: {}.",
peer, request.block, error
);
(Default::default(), StorageProof::empty())
}
};
let response = {
let r = schema::v1::light::RemoteHeaderResponse { header, proof: proof.encode() };
schema::v1::light::response::Response::RemoteHeaderResponse(r)
};
Ok(schema::v1::light::Response { response: Some(response) })
}
fn on_remote_changes_request(
&mut self,
peer: &PeerId,
request: &schema::v1::light::RemoteChangesRequest,
) -> Result<schema::v1::light::Response, HandleRequestError> {
log::trace!(
"Remote changes proof request from {} for key {} ({:?}..{:?}).",
peer,
if !request.storage_key.is_empty() {
format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&request.key))
} else {
HexDisplay::from(&request.key).to_string()
},
request.first,
request.last,
);
let first = Decode::decode(&mut request.first.as_ref())?;
let last = Decode::decode(&mut request.last.as_ref())?;
let min = Decode::decode(&mut request.min.as_ref())?;
let max = Decode::decode(&mut request.max.as_ref())?;
let key = StorageKey(request.key.clone());
let storage_key = if request.storage_key.is_empty() {
None
} else {
Some(PrefixedStorageKey::new_ref(&request.storage_key))
};
let proof = match self.client.key_changes_proof(first, last, min, max, storage_key, &key) {
Ok(proof) => proof,
Err(error) => {
log::trace!(
"Remote changes proof request from {} for key {} ({:?}..{:?}) failed with: {}.",
peer,
format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&key.0)),
request.first,
request.last,
error,
);
light::ChangesProof::<B::Header> {
max_block: Zero::zero(),
proof: Vec::new(),
roots: BTreeMap::new(),
roots_proof: StorageProof::empty(),
}
}
};
let response = {
let r = schema::v1::light::RemoteChangesResponse {
max: proof.max_block.encode(),
proof: proof.proof,
roots: proof.roots.into_iter()
.map(|(k, v)| schema::v1::light::Pair { fst: k.encode(), snd: v.encode() })
.collect(),
roots_proof: proof.roots_proof.encode(),
};
schema::v1::light::response::Response::RemoteChangesResponse(r)
};
Ok(schema::v1::light::Response { response: Some(response) })
}
}
#[derive(derive_more::Display, derive_more::From)]
enum HandleRequestError {
#[display(fmt = "Failed to decode request: {}.", _0)]
DecodeProto(prost::DecodeError),
#[display(fmt = "Failed to encode response: {}.", _0)]
EncodeProto(prost::EncodeError),
#[display(fmt = "Failed to send response.")]
SendResponse,
/// A bad request has been received.
#[display(fmt = "bad request: {}", _0)]
BadRequest(&'static str),
/// Encoding or decoding of some data failed.
#[display(fmt = "codec error: {}", _0)]
Codec(codec::Error),
}
fn fmt_keys(first: Option<&Vec<u8>>, last: Option<&Vec<u8>>) -> String {
if let (Some(first), Some(last)) = (first, last) {
if first == last {
HexDisplay::from(first).to_string()
} else {
format!("{}..{}", HexDisplay::from(first), HexDisplay::from(last))
}
} else {
String::from("n/a")
}
}
File diff suppressed because it is too large Load Diff
+10 -10
View File
@@ -18,7 +18,7 @@
//! On-demand requests service.
use crate::light_client_handler;
use crate::light_client_requests;
use futures::{channel::oneshot, prelude::*};
use parking_lot::Mutex;
@@ -45,10 +45,10 @@ pub struct OnDemand<B: BlockT> {
/// Note that a better alternative would be to use a MPMC queue here, and add a `poll` method
/// from the `OnDemand`. However there exists no popular implementation of MPMC channels in
/// asynchronous Rust at the moment
requests_queue: Mutex<Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>>,
requests_queue: Mutex<Option<TracingUnboundedReceiver<light_client_requests::sender::Request<B>>>>,
/// Sending side of `requests_queue`.
requests_send: TracingUnboundedSender<light_client_handler::Request<B>>,
requests_send: TracingUnboundedSender<light_client_requests::sender::Request<B>>,
}
@@ -149,7 +149,7 @@ where
/// If this function returns `None`, that means that the receiver has already been extracted in
/// the past, and therefore that something already handles the requests.
pub(crate) fn extract_receiver(&self)
-> Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>
-> Option<TracingUnboundedReceiver<light_client_requests::sender::Request<B>>>
{
self.requests_queue.lock().take()
}
@@ -170,7 +170,7 @@ where
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_handler::Request::Header { request, sender });
.unbounded_send(light_client_requests::sender::Request::Header { request, sender });
RemoteResponse { receiver }
}
@@ -178,7 +178,7 @@ where
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_handler::Request::Read { request, sender });
.unbounded_send(light_client_requests::sender::Request::Read { request, sender });
RemoteResponse { receiver }
}
@@ -189,7 +189,7 @@ where
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_handler::Request::ReadChild { request, sender });
.unbounded_send(light_client_requests::sender::Request::ReadChild { request, sender });
RemoteResponse { receiver }
}
@@ -197,7 +197,7 @@ where
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_handler::Request::Call { request, sender });
.unbounded_send(light_client_requests::sender::Request::Call { request, sender });
RemoteResponse { receiver }
}
@@ -208,7 +208,7 @@ where
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_handler::Request::Changes { request, sender });
.unbounded_send(light_client_requests::sender::Request::Changes { request, sender });
RemoteResponse { receiver }
}
@@ -216,7 +216,7 @@ where
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_handler::Request::Body { request, sender });
.unbounded_send(light_client_requests::sender::Request::Body { request, sender });
RemoteResponse { receiver }
}
}
@@ -281,10 +281,11 @@ impl RequestResponsesBehaviour {
if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
if protocol.is_connected(target) {
let request_id = protocol.send_request(target, request);
self.pending_requests.insert(
let prev_req_id = self.pending_requests.insert(
(protocol_name.to_string().into(), request_id).into(),
(Instant::now(), pending_response),
);
debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
} else {
if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
log::debug!(
+19 -15
View File
@@ -38,7 +38,7 @@ use crate::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
on_demand_layer::AlwaysBadChecker,
light_client_handler,
light_client_requests,
protocol::{
self,
NotifsHandlerError,
@@ -254,11 +254,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
params.network_config.client_version,
params.network_config.node_name
);
let light_client_handler = {
let config = light_client_handler::Config::new(&params.protocol_id);
light_client_handler::LightClientHandler::new(
config,
params.chain,
let light_client_request_sender = {
light_client_requests::sender::LightClientRequestSender::new(
&params.protocol_id,
checker,
peerset_handle.clone(),
)
@@ -339,9 +338,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
params.role,
user_agent,
local_public,
light_client_handler,
light_client_request_sender,
discovery_config,
params.block_request_protocol_config,
params.light_client_request_protocol_config,
params.network_config.request_response_protocols,
);
@@ -1286,7 +1286,7 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
/// Messages from the [`NetworkService`] that must be processed.
from_service: TracingUnboundedReceiver<ServiceToWorkerMsg<B, H>>,
/// Receiver for queries from the light client that must be processed.
light_client_rqs: Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>,
light_client_rqs: Option<TracingUnboundedReceiver<light_client_requests::sender::Request<B>>>,
/// Senders for events that happen on the network.
event_streams: out_events::OutChannels,
/// Prometheus network metrics.
@@ -1312,10 +1312,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
// Check for new incoming light client requests.
if let Some(light_client_rqs) = this.light_client_rqs.as_mut() {
while let Poll::Ready(Some(rq)) = light_client_rqs.poll_next_unpin(cx) {
// This can error if there are too many queued requests already.
if this.network_service.light_client_request(rq).is_err() {
log::warn!("Couldn't start light client request: too many pending requests");
let result = this.network_service.light_client_request(rq);
match result {
Ok(()) => {},
Err(light_client_requests::sender::SendRequestError::TooManyRequests) => {
log::warn!("Couldn't start light client request: too many pending requests");
}
}
if let Some(metrics) = this.metrics.as_ref() {
metrics.issued_light_requests.inc();
}
@@ -1608,11 +1612,11 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
let reason = match cause {
Some(ConnectionError::IO(_)) => "transport-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::B(EitherError::A(
PingFailure::Timeout)))))))) => "ping-timeout",
EitherError::B(EitherError::A(
PingFailure::Timeout))))))) => "ping-timeout",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
EitherError::A(EitherError::A(
NotifsHandlerError::SyncNotificationsClogged))))))) => "sync-notifications-clogged",
EitherError::A(
NotifsHandlerError::SyncNotificationsClogged)))))) => "sync-notifications-clogged",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout",
None => "actively-closed",
+12 -1
View File
@@ -18,6 +18,7 @@
use crate::{config, Event, NetworkService, NetworkWorker};
use crate::block_request_handler::BlockRequestHandler;
use crate::light_client_requests::handler::LightClientRequestHandler;
use libp2p::PeerId;
use futures::prelude::*;
@@ -96,7 +97,16 @@ fn build_test_full_node(config: config::NetworkConfiguration)
let block_request_protocol_config = {
let (handler, protocol_config) = BlockRequestHandler::new(
protocol_id.clone(),
&protocol_id,
client.clone(),
);
async_std::task::spawn(handler.run().boxed());
protocol_config
};
let light_client_request_protocol_config = {
let (handler, protocol_config) = LightClientRequestHandler::new(
&protocol_id,
client.clone(),
);
async_std::task::spawn(handler.run().boxed());
@@ -117,6 +127,7 @@ fn build_test_full_node(config: config::NetworkConfiguration)
),
metrics_registry: None,
block_request_protocol_config,
light_client_request_protocol_config,
})
.unwrap();
+14 -3
View File
@@ -30,6 +30,7 @@ use std::{
use libp2p::build_multiaddr;
use log::trace;
use sc_network::block_request_handler::{self, BlockRequestHandler};
use sc_network::light_client_requests::{self, handler::LightClientRequestHandler};
use sp_blockchain::{
HeaderBackend, Result as ClientResult,
well_known_cache_keys::{self, Id as CacheKeyId},
@@ -726,7 +727,13 @@ pub trait TestNetFactory: Sized {
let protocol_id = ProtocolId::from("test-protocol-name");
let block_request_protocol_config = {
let (handler, protocol_config) = BlockRequestHandler::new(protocol_id.clone(), client.clone());
let (handler, protocol_config) = BlockRequestHandler::new(&protocol_id, client.clone());
self.spawn_task(handler.run().boxed());
protocol_config
};
let light_client_request_protocol_config = {
let (handler, protocol_config) = LightClientRequestHandler::new(&protocol_id, client.clone());
self.spawn_task(handler.run().boxed());
protocol_config
};
@@ -744,6 +751,7 @@ pub trait TestNetFactory: Sized {
.unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)),
metrics_registry: None,
block_request_protocol_config,
light_client_request_protocol_config,
}).unwrap();
trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id());
@@ -813,11 +821,13 @@ pub trait TestNetFactory: Sized {
let protocol_id = ProtocolId::from("test-protocol-name");
// Add block request handler.
let block_request_protocol_config = block_request_handler::generate_protocol_config(
protocol_id.clone(),
&protocol_id,
);
let light_client_request_protocol_config =
light_client_requests::generate_protocol_config(&protocol_id);
let network = NetworkWorker::new(sc_network::config::Params {
role: Role::Light,
executor: None,
@@ -830,6 +840,7 @@ pub trait TestNetFactory: Sized {
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
block_request_protocol_config,
light_client_request_protocol_config,
}).unwrap();
self.mut_peers(|peers| {
+19 -2
View File
@@ -43,6 +43,7 @@ use log::{info, warn};
use sc_network::config::{Role, OnDemand};
use sc_network::NetworkService;
use sc_network::block_request_handler::{self, BlockRequestHandler};
use sc_network::light_client_requests::{self, handler::LightClientRequestHandler};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{
Block as BlockT, HashFor, Zero, BlockIdTo,
@@ -869,11 +870,11 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
let block_request_protocol_config = {
if matches!(config.role, Role::Light) {
// Allow outgoing requests but deny incoming requests.
block_request_handler::generate_protocol_config(protocol_id.clone())
block_request_handler::generate_protocol_config(&protocol_id)
} else {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = BlockRequestHandler::new(
protocol_id.clone(),
&protocol_id,
client.clone(),
);
spawn_handle.spawn("block_request_handler", handler.run());
@@ -881,6 +882,21 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
}
};
let light_client_request_protocol_config = {
if matches!(config.role, Role::Light) {
// Allow outgoing requests but deny incoming requests.
light_client_requests::generate_protocol_config(&protocol_id)
} else {
// Allow both outgoing and incoming requests.
let (handler, protocol_config) = LightClientRequestHandler::new(
&protocol_id,
client.clone(),
);
spawn_handle.spawn("light_client_request_handler", handler.run());
protocol_config
}
};
let network_params = sc_network::config::Params {
role: config.role.clone(),
executor: {
@@ -898,6 +914,7 @@ pub fn build_network<TBl, TExPool, TImpQu, TCl>(
block_announce_validator,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_request_protocol_config,
light_client_request_protocol_config,
};
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();