Kill the light client, CHTs and change tries. (#10080)

* Remove light client, change tries and CHTs

* Update tests

* fmt

* Restore changes_root

* Fixed benches

* Cargo fmt

* fmt

* fmt
This commit is contained in:
Arkadiy Paronyan
2021-11-12 14:15:01 +01:00
committed by GitHub
parent 112b7dac47
commit 4cbbf0cf43
141 changed files with 532 additions and 17807 deletions
+8 -42
View File
@@ -20,14 +20,14 @@ use crate::{
bitswap::Bitswap,
config::ProtocolId,
discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
light_client_requests, peer_info,
peer_info,
protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol},
request_responses, DhtEvent, ObservedRole,
};
use bytes::Bytes;
use codec::Encode;
use futures::{channel::oneshot, stream::StreamExt};
use futures::channel::oneshot;
use libp2p::{
core::{Multiaddr, PeerId, PublicKey},
identify::IdentifyInfo,
@@ -76,10 +76,6 @@ pub struct Behaviour<B: BlockT> {
#[behaviour(ignore)]
events: VecDeque<BehaviourOut<B>>,
/// Light client request handling.
#[behaviour(ignore)]
light_client_request_sender: light_client_requests::sender::LightClientRequestSender<B>,
/// Protocol name used to send out block requests via
/// [`request_responses::RequestResponsesBehaviour`].
#[behaviour(ignore)]
@@ -198,7 +194,6 @@ impl<B: BlockT> Behaviour<B> {
substrate: Protocol<B>,
user_agent: String,
local_public_key: PublicKey,
light_client_request_sender: light_client_requests::sender::LightClientRequestSender<B>,
disco_config: DiscoveryConfig,
block_request_protocol_config: request_responses::ProtocolConfig,
state_request_protocol_config: request_responses::ProtocolConfig,
@@ -233,7 +228,6 @@ impl<B: BlockT> Behaviour<B> {
request_response_protocols.into_iter(),
peerset,
)?,
light_client_request_sender,
events: VecDeque::new(),
block_request_protocol_name,
state_request_protocol_name,
@@ -316,14 +310,6 @@ impl<B: BlockT> Behaviour<B> {
pub fn put_value(&mut self, key: record::Key, value: Vec<u8>) {
self.discovery.put_value(key, value);
}
/// Issue a light client request.
pub fn light_client_request(
&mut self,
r: light_client_requests::sender::Request<B>,
) -> Result<(), light_client_requests::sender::SendRequestError> {
self.light_client_request_sender.request(r)
}
}
fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole {
@@ -436,17 +422,11 @@ impl<B: BlockT> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for Behavi
CustomMessageOutcome::NotificationsReceived { remote, messages } => {
self.events.push_back(BehaviourOut::NotificationsReceived { remote, messages });
},
CustomMessageOutcome::PeerNewBest(peer_id, number) => {
self.light_client_request_sender.update_best_block(&peer_id, number);
},
CustomMessageOutcome::SyncConnected(peer_id) => {
self.light_client_request_sender.inject_connected(peer_id);
self.events.push_back(BehaviourOut::SyncConnected(peer_id))
},
CustomMessageOutcome::SyncDisconnected(peer_id) => {
self.light_client_request_sender.inject_disconnected(peer_id);
self.events.push_back(BehaviourOut::SyncDisconnected(peer_id))
},
CustomMessageOutcome::PeerNewBest(_peer_id, _number) => {},
CustomMessageOutcome::SyncConnected(peer_id) =>
self.events.push_back(BehaviourOut::SyncConnected(peer_id)),
CustomMessageOutcome::SyncDisconnected(peer_id) =>
self.events.push_back(BehaviourOut::SyncDisconnected(peer_id)),
CustomMessageOutcome::None => {},
}
}
@@ -534,23 +514,9 @@ impl<B: BlockT> NetworkBehaviourEventProcess<DiscoveryOut> for Behaviour<B> {
impl<B: BlockT> Behaviour<B> {
fn poll<TEv>(
&mut self,
cx: &mut Context,
_cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
use light_client_requests::sender::OutEvent;
while let Poll::Ready(Some(event)) = self.light_client_request_sender.poll_next_unpin(cx) {
match event {
OutEvent::SendRequest { target, request, pending_response, protocol_name } =>
self.request_responses.send_request(
&target,
&protocol_name,
request,
pending_response,
IfDisconnected::ImmediateError,
),
}
}
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}
-6
View File
@@ -23,7 +23,6 @@
pub use crate::{
chain::Client,
on_demand_layer::{AlwaysBadChecker, OnDemand},
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
@@ -83,11 +82,6 @@ pub struct Params<B: BlockT, H: ExHashT> {
/// Client that contains the blockchain.
pub chain: Arc<dyn Client<B>>,
/// The `OnDemand` object acts as a "receiver" for block data requests from the client.
/// If `Some`, the network worker will process these requests and answer them.
/// Normally used only for light clients.
pub on_demand: Option<Arc<OnDemand<B>>>,
/// Pool of transactions.
///
/// The network worker will fetch transactions from this object in order to propagate them on
-1
View File
@@ -247,7 +247,6 @@
mod behaviour;
mod chain;
mod discovery;
mod on_demand_layer;
mod peer_info;
mod protocol;
mod request_responses;
@@ -20,8 +20,6 @@
/// For incoming light client requests.
pub mod handler;
/// For outgoing light client requests.
pub mod sender;
use crate::{config::ProtocolId, request_responses::ProtocolConfig};
@@ -47,269 +45,3 @@ pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
inbound_queue: None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{config::ProtocolId, request_responses::IncomingRequest};
use assert_matches::assert_matches;
use futures::{
channel::oneshot,
executor::{block_on, LocalPool},
prelude::*,
task::Spawn,
};
use libp2p::PeerId;
use sc_client_api::{
light::{
self, ChangesProof, RemoteBodyRequest, RemoteCallRequest, RemoteChangesRequest,
RemoteHeaderRequest, RemoteReadRequest,
},
FetchChecker, RemoteReadChildRequest, StorageProof,
};
use sp_blockchain::Error as ClientError;
use sp_core::storage::ChildInfo;
use sp_runtime::{
generic::Header,
traits::{BlakeTwo256, Block as BlockT, NumberFor},
};
use std::{collections::HashMap, sync::Arc};
pub struct DummyFetchChecker<B> {
pub ok: bool,
pub _mark: std::marker::PhantomData<B>,
}
impl<B: BlockT> FetchChecker<B> for DummyFetchChecker<B> {
fn check_header_proof(
&self,
_request: &RemoteHeaderRequest<B::Header>,
header: Option<B::Header>,
_remote_proof: StorageProof,
) -> Result<B::Header, ClientError> {
match self.ok {
true if header.is_some() => Ok(header.unwrap()),
_ => Err(ClientError::Backend("Test error".into())),
}
}
fn check_read_proof(
&self,
request: &RemoteReadRequest<B::Header>,
_: StorageProof,
) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError> {
match self.ok {
true => Ok(request.keys.iter().cloned().map(|k| (k, Some(vec![42]))).collect()),
false => Err(ClientError::Backend("Test error".into())),
}
}
fn check_read_child_proof(
&self,
request: &RemoteReadChildRequest<B::Header>,
_: StorageProof,
) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError> {
match self.ok {
true => Ok(request.keys.iter().cloned().map(|k| (k, Some(vec![42]))).collect()),
false => Err(ClientError::Backend("Test error".into())),
}
}
fn check_execution_proof(
&self,
_: &RemoteCallRequest<B::Header>,
_: StorageProof,
) -> Result<Vec<u8>, ClientError> {
match self.ok {
true => Ok(vec![42]),
false => Err(ClientError::Backend("Test error".into())),
}
}
fn check_changes_proof(
&self,
_: &RemoteChangesRequest<B::Header>,
_: ChangesProof<B::Header>,
) -> Result<Vec<(NumberFor<B>, u32)>, ClientError> {
match self.ok {
true => Ok(vec![(100u32.into(), 2)]),
false => Err(ClientError::Backend("Test error".into())),
}
}
fn check_body_proof(
&self,
_: &RemoteBodyRequest<B::Header>,
body: Vec<B::Extrinsic>,
) -> Result<Vec<B::Extrinsic>, ClientError> {
match self.ok {
true => Ok(body),
false => Err(ClientError::Backend("Test error".into())),
}
}
}
pub fn protocol_id() -> ProtocolId {
ProtocolId::from("test")
}
pub fn peerset() -> (sc_peerset::Peerset, sc_peerset::PeersetHandle) {
let cfg = sc_peerset::SetConfig {
in_peers: 128,
out_peers: 128,
bootnodes: Default::default(),
reserved_only: false,
reserved_nodes: Default::default(),
};
sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets: vec![cfg] })
}
pub fn dummy_header() -> sp_test_primitives::Header {
sp_test_primitives::Header {
parent_hash: Default::default(),
number: 0,
state_root: Default::default(),
extrinsics_root: Default::default(),
digest: Default::default(),
}
}
type Block =
sp_runtime::generic::Block<Header<u64, BlakeTwo256>, substrate_test_runtime::Extrinsic>;
fn send_receive(request: sender::Request<Block>, pool: &LocalPool) {
let client = Arc::new(substrate_test_runtime_client::new());
let (handler, protocol_config) =
handler::LightClientRequestHandler::new(&protocol_id(), client);
pool.spawner().spawn_obj(handler.run().boxed().into()).unwrap();
let (_peer_set, peer_set_handle) = peerset();
let mut sender = sender::LightClientRequestSender::<Block>::new(
&protocol_id(),
Arc::new(crate::light_client_requests::tests::DummyFetchChecker {
ok: true,
_mark: std::marker::PhantomData,
}),
peer_set_handle,
);
sender.inject_connected(PeerId::random());
sender.request(request).unwrap();
let sender::OutEvent::SendRequest { pending_response, request, .. } =
block_on(sender.next()).unwrap();
let (tx, rx) = oneshot::channel();
block_on(protocol_config.inbound_queue.unwrap().send(IncomingRequest {
peer: PeerId::random(),
payload: request,
pending_response: tx,
}))
.unwrap();
pool.spawner()
.spawn_obj(
async move {
pending_response.send(Ok(rx.await.unwrap().result.unwrap())).unwrap();
}
.boxed()
.into(),
)
.unwrap();
pool.spawner()
.spawn_obj(sender.for_each(|_| future::ready(())).boxed().into())
.unwrap();
}
#[test]
fn send_receive_call() {
let chan = oneshot::channel();
let request = light::RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: None,
};
let mut pool = LocalPool::new();
send_receive(sender::Request::Call { request, sender: chan.0 }, &pool);
assert_eq!(vec![42], pool.run_until(chan.1).unwrap().unwrap());
// ^--- from `DummyFetchChecker::check_execution_proof`
}
#[test]
fn send_receive_read() {
let chan = oneshot::channel();
let request = light::RemoteReadRequest {
header: dummy_header(),
block: Default::default(),
keys: vec![b":key".to_vec()],
retry_count: None,
};
let mut pool = LocalPool::new();
send_receive(sender::Request::Read { request, sender: chan.0 }, &pool);
assert_eq!(
Some(vec![42]),
pool.run_until(chan.1).unwrap().unwrap().remove(&b":key"[..]).unwrap()
);
// ^--- from `DummyFetchChecker::check_read_proof`
}
#[test]
fn send_receive_read_child() {
let chan = oneshot::channel();
let child_info = ChildInfo::new_default(&b":child_storage:default:sub"[..]);
let request = light::RemoteReadChildRequest {
header: dummy_header(),
block: Default::default(),
storage_key: child_info.prefixed_storage_key(),
keys: vec![b":key".to_vec()],
retry_count: None,
};
let mut pool = LocalPool::new();
send_receive(sender::Request::ReadChild { request, sender: chan.0 }, &pool);
assert_eq!(
Some(vec![42]),
pool.run_until(chan.1).unwrap().unwrap().remove(&b":key"[..]).unwrap()
);
// ^--- from `DummyFetchChecker::check_read_child_proof`
}
#[test]
fn send_receive_header() {
sp_tracing::try_init_simple();
let chan = oneshot::channel();
let request = light::RemoteHeaderRequest {
cht_root: Default::default(),
block: 1,
retry_count: None,
};
let mut pool = LocalPool::new();
send_receive(sender::Request::Header { request, sender: chan.0 }, &pool);
// The remote does not know block 1:
assert_matches!(pool.run_until(chan.1).unwrap(), Err(ClientError::RemoteFetchFailed));
}
#[test]
fn send_receive_changes() {
let chan = oneshot::channel();
let request = light::RemoteChangesRequest {
changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange {
zero: (0, Default::default()),
end: None,
config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)),
}],
first_block: (1, Default::default()),
last_block: (100, Default::default()),
max_block: (100, Default::default()),
tries_roots: (1, Default::default(), Vec::new()),
key: Vec::new(),
storage_key: None,
retry_count: None,
};
let mut pool = LocalPool::new();
send_receive(sender::Request::Changes { request, sender: chan.0 }, &pool);
assert_eq!(vec![(100, 2)], pool.run_until(chan.1).unwrap().unwrap());
// ^--- from `DummyFetchChecker::check_changes_proof`
}
}
@@ -32,17 +32,14 @@ use codec::{self, Decode, Encode};
use futures::{channel::mpsc, prelude::*};
use log::{debug, trace};
use prost::Message;
use sc_client_api::{light, StorageProof};
use sc_client_api::StorageProof;
use sc_peerset::ReputationChange;
use sp_core::{
hexdisplay::HexDisplay,
storage::{ChildInfo, ChildType, PrefixedStorageKey, StorageKey},
storage::{ChildInfo, ChildType, PrefixedStorageKey},
};
use sp_runtime::{
generic::BlockId,
traits::{Block, Zero},
};
use std::{collections::BTreeMap, sync::Arc};
use sp_runtime::{generic::BlockId, traits::Block};
use std::sync::Arc;
const LOG_TARGET: &str = "light-client-request-handler";
@@ -137,12 +134,12 @@ impl<B: Block> LightClientRequestHandler<B> {
self.on_remote_call_request(&peer, r)?,
Some(schema::v1::light::request::Request::RemoteReadRequest(r)) =>
self.on_remote_read_request(&peer, r)?,
Some(schema::v1::light::request::Request::RemoteHeaderRequest(r)) =>
self.on_remote_header_request(&peer, r)?,
Some(schema::v1::light::request::Request::RemoteHeaderRequest(_r)) =>
return Err(HandleRequestError::BadRequest("Not supported.")),
Some(schema::v1::light::request::Request::RemoteReadChildRequest(r)) =>
self.on_remote_read_child_request(&peer, r)?,
Some(schema::v1::light::request::Request::RemoteChangesRequest(r)) =>
self.on_remote_changes_request(&peer, r)?,
Some(schema::v1::light::request::Request::RemoteChangesRequest(_r)) =>
return Err(HandleRequestError::BadRequest("Not supported.")),
None =>
return Err(HandleRequestError::BadRequest("Remote request without request data.")),
};
@@ -285,106 +282,6 @@ impl<B: Block> LightClientRequestHandler<B> {
Ok(schema::v1::light::Response { response: Some(response) })
}
fn on_remote_header_request(
&mut self,
peer: &PeerId,
request: &schema::v1::light::RemoteHeaderRequest,
) -> Result<schema::v1::light::Response, HandleRequestError> {
trace!("Remote header proof request from {} ({:?}).", peer, request.block);
let block = Decode::decode(&mut request.block.as_ref())?;
let (header, proof) = match self.client.header_proof(&BlockId::Number(block)) {
Ok((header, proof)) => (header.encode(), proof),
Err(error) => {
trace!(
"Remote header proof request from {} ({:?}) failed with: {}.",
peer,
request.block,
error
);
(Default::default(), StorageProof::empty())
},
};
let response = {
let r = schema::v1::light::RemoteHeaderResponse { header, proof: proof.encode() };
schema::v1::light::response::Response::RemoteHeaderResponse(r)
};
Ok(schema::v1::light::Response { response: Some(response) })
}
fn on_remote_changes_request(
&mut self,
peer: &PeerId,
request: &schema::v1::light::RemoteChangesRequest,
) -> Result<schema::v1::light::Response, HandleRequestError> {
trace!(
"Remote changes proof request from {} for key {} ({:?}..{:?}).",
peer,
if !request.storage_key.is_empty() {
format!(
"{} : {}",
HexDisplay::from(&request.storage_key),
HexDisplay::from(&request.key)
)
} else {
HexDisplay::from(&request.key).to_string()
},
request.first,
request.last,
);
let first = Decode::decode(&mut request.first.as_ref())?;
let last = Decode::decode(&mut request.last.as_ref())?;
let min = Decode::decode(&mut request.min.as_ref())?;
let max = Decode::decode(&mut request.max.as_ref())?;
let key = StorageKey(request.key.clone());
let storage_key = if request.storage_key.is_empty() {
None
} else {
Some(PrefixedStorageKey::new_ref(&request.storage_key))
};
let proof =
match self.client.key_changes_proof(first, last, min, max, storage_key, &key) {
Ok(proof) => proof,
Err(error) => {
trace!(
"Remote changes proof request from {} for key {} ({:?}..{:?}) failed with: {}.",
peer,
format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&key.0)),
request.first,
request.last,
error,
);
light::ChangesProof::<B::Header> {
max_block: Zero::zero(),
proof: Vec::new(),
roots: BTreeMap::new(),
roots_proof: StorageProof::empty(),
}
},
};
let response = {
let r = schema::v1::light::RemoteChangesResponse {
max: proof.max_block.encode(),
proof: proof.proof,
roots: proof
.roots
.into_iter()
.map(|(k, v)| schema::v1::light::Pair { fst: k.encode(), snd: v.encode() })
.collect(),
roots_proof: proof.roots_proof.encode(),
};
schema::v1::light::response::Response::RemoteChangesResponse(r)
};
Ok(schema::v1::light::Response { response: Some(response) })
}
}
#[derive(derive_more::Display, derive_more::From)]
File diff suppressed because it is too large Load Diff
@@ -1,241 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! On-demand requests service.
use crate::light_client_requests;
use futures::{channel::oneshot, prelude::*};
use parking_lot::Mutex;
use sc_client_api::{
ChangesProof, FetchChecker, Fetcher, RemoteBodyRequest, RemoteCallRequest,
RemoteChangesRequest, RemoteHeaderRequest, RemoteReadChildRequest, RemoteReadRequest,
StorageProof,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_blockchain::Error as ClientError;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use std::{
collections::HashMap,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
/// Implements the `Fetcher` trait of the client. Makes it possible for the light client to perform
/// network requests for some state.
///
/// This implementation stores all the requests in a queue. The network, in parallel, is then
/// responsible for pulling elements out of that queue and fulfilling them.
pub struct OnDemand<B: BlockT> {
/// Objects that checks whether what has been retrieved is correct.
checker: Arc<dyn FetchChecker<B>>,
/// Queue of requests. Set to `Some` at initialization, then extracted by the network.
///
/// Note that a better alternative would be to use a MPMC queue here, and add a `poll` method
/// from the `OnDemand`. However there exists no popular implementation of MPMC channels in
/// asynchronous Rust at the moment
requests_queue:
Mutex<Option<TracingUnboundedReceiver<light_client_requests::sender::Request<B>>>>,
/// Sending side of `requests_queue`.
requests_send: TracingUnboundedSender<light_client_requests::sender::Request<B>>,
}
#[derive(Debug, thiserror::Error)]
#[error("AlwaysBadChecker")]
struct ErrorAlwaysBadChecker;
impl Into<ClientError> for ErrorAlwaysBadChecker {
fn into(self) -> ClientError {
ClientError::Application(Box::new(self))
}
}
/// Dummy implementation of `FetchChecker` that always assumes that responses are bad.
///
/// Considering that it is the responsibility of the client to build the fetcher, it can use this
/// implementation if it knows that it will never perform any request.
#[derive(Default, Clone)]
pub struct AlwaysBadChecker;
impl<Block: BlockT> FetchChecker<Block> for AlwaysBadChecker {
fn check_header_proof(
&self,
_request: &RemoteHeaderRequest<Block::Header>,
_remote_header: Option<Block::Header>,
_remote_proof: StorageProof,
) -> Result<Block::Header, ClientError> {
Err(ErrorAlwaysBadChecker.into())
}
fn check_read_proof(
&self,
_request: &RemoteReadRequest<Block::Header>,
_remote_proof: StorageProof,
) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError> {
Err(ErrorAlwaysBadChecker.into())
}
fn check_read_child_proof(
&self,
_request: &RemoteReadChildRequest<Block::Header>,
_remote_proof: StorageProof,
) -> Result<HashMap<Vec<u8>, Option<Vec<u8>>>, ClientError> {
Err(ErrorAlwaysBadChecker.into())
}
fn check_execution_proof(
&self,
_request: &RemoteCallRequest<Block::Header>,
_remote_proof: StorageProof,
) -> Result<Vec<u8>, ClientError> {
Err(ErrorAlwaysBadChecker.into())
}
fn check_changes_proof(
&self,
_request: &RemoteChangesRequest<Block::Header>,
_remote_proof: ChangesProof<Block::Header>,
) -> Result<Vec<(NumberFor<Block>, u32)>, ClientError> {
Err(ErrorAlwaysBadChecker.into())
}
fn check_body_proof(
&self,
_request: &RemoteBodyRequest<Block::Header>,
_body: Vec<Block::Extrinsic>,
) -> Result<Vec<Block::Extrinsic>, ClientError> {
Err(ErrorAlwaysBadChecker.into())
}
}
impl<B: BlockT> OnDemand<B>
where
B::Header: HeaderT,
{
/// Creates new on-demand service.
pub fn new(checker: Arc<dyn FetchChecker<B>>) -> Self {
let (requests_send, requests_queue) = tracing_unbounded("mpsc_ondemand");
let requests_queue = Mutex::new(Some(requests_queue));
Self { checker, requests_queue, requests_send }
}
/// Get checker reference.
pub fn checker(&self) -> &Arc<dyn FetchChecker<B>> {
&self.checker
}
/// Extracts the queue of requests.
///
/// Whenever one of the methods of the `Fetcher` trait is called, an element is pushed on this
/// channel.
///
/// If this function returns `None`, that means that the receiver has already been extracted in
/// the past, and therefore that something already handles the requests.
pub(crate) fn extract_receiver(
&self,
) -> Option<TracingUnboundedReceiver<light_client_requests::sender::Request<B>>> {
self.requests_queue.lock().take()
}
}
impl<B> Fetcher<B> for OnDemand<B>
where
B: BlockT,
B::Header: HeaderT,
{
type RemoteHeaderResult = RemoteResponse<B::Header>;
type RemoteReadResult = RemoteResponse<HashMap<Vec<u8>, Option<Vec<u8>>>>;
type RemoteCallResult = RemoteResponse<Vec<u8>>;
type RemoteChangesResult = RemoteResponse<Vec<(NumberFor<B>, u32)>>;
type RemoteBodyResult = RemoteResponse<Vec<B::Extrinsic>>;
fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_requests::sender::Request::Header { request, sender });
RemoteResponse { receiver }
}
fn remote_read(&self, request: RemoteReadRequest<B::Header>) -> Self::RemoteReadResult {
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_requests::sender::Request::Read { request, sender });
RemoteResponse { receiver }
}
fn remote_read_child(
&self,
request: RemoteReadChildRequest<B::Header>,
) -> Self::RemoteReadResult {
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_requests::sender::Request::ReadChild { request, sender });
RemoteResponse { receiver }
}
fn remote_call(&self, request: RemoteCallRequest<B::Header>) -> Self::RemoteCallResult {
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_requests::sender::Request::Call { request, sender });
RemoteResponse { receiver }
}
fn remote_changes(
&self,
request: RemoteChangesRequest<B::Header>,
) -> Self::RemoteChangesResult {
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_requests::sender::Request::Changes { request, sender });
RemoteResponse { receiver }
}
fn remote_body(&self, request: RemoteBodyRequest<B::Header>) -> Self::RemoteBodyResult {
let (sender, receiver) = oneshot::channel();
let _ = self
.requests_send
.unbounded_send(light_client_requests::sender::Request::Body { request, sender });
RemoteResponse { receiver }
}
}
/// Future for an on-demand remote call response.
pub struct RemoteResponse<T> {
receiver: oneshot::Receiver<Result<T, ClientError>>,
}
impl<T> Future for RemoteResponse<T> {
type Output = Result<T, ClientError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.receiver.poll_unpin(cx) {
Poll::Ready(Ok(res)) => Poll::Ready(res),
Poll::Ready(Err(_)) => Poll::Ready(Err(ClientError::RemoteFetchCancelled)),
Poll::Pending => Poll::Pending,
}
}
}
-37
View File
@@ -33,11 +33,9 @@ use crate::{
config::{parse_str_addr, Params, TransportConfig},
discovery::DiscoveryConfig,
error::Error,
light_client_requests,
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
on_demand_layer::AlwaysBadChecker,
protocol::{
self,
event::Event,
@@ -238,12 +236,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
}
})?;
let checker = params
.on_demand
.as_ref()
.map(|od| od.checker().clone())
.unwrap_or_else(|| Arc::new(AlwaysBadChecker));
let num_connected = Arc::new(AtomicUsize::new(0));
let is_major_syncing = Arc::new(AtomicBool::new(false));
@@ -255,14 +247,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
params.network_config.client_version, params.network_config.node_name
);
let light_client_request_sender = {
light_client_requests::sender::LightClientRequestSender::new(
&params.protocol_id,
checker,
peerset_handle.clone(),
)
};
let discovery_config = {
let mut config = DiscoveryConfig::new(local_public.clone());
config.with_permanent_addresses(known_addresses);
@@ -347,7 +331,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
protocol,
user_agent,
local_public,
light_client_request_sender,
discovery_config,
params.block_request_protocol_config,
params.state_request_protocol_config,
@@ -447,7 +430,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
service,
import_queue: params.import_queue,
from_service,
light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()),
event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
peers_notifications_sinks,
tx_handler_controller,
@@ -1464,8 +1446,6 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
import_queue: Box<dyn ImportQueue<B>>,
/// Messages from the [`NetworkService`] that must be processed.
from_service: TracingUnboundedReceiver<ServiceToWorkerMsg<B, H>>,
/// Receiver for queries from the light client that must be processed.
light_client_rqs: Option<TracingUnboundedReceiver<light_client_requests::sender::Request<B>>>,
/// Senders for events that happen on the network.
event_streams: out_events::OutChannels,
/// Prometheus network metrics.
@@ -1489,23 +1469,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.import_queue
.poll_actions(cx, &mut NetworkLink { protocol: &mut this.network_service });
// Check for new incoming light client requests.
if let Some(light_client_rqs) = this.light_client_rqs.as_mut() {
while let Poll::Ready(Some(rq)) = light_client_rqs.poll_next_unpin(cx) {
let result = this.network_service.behaviour_mut().light_client_request(rq);
match result {
Ok(()) => {},
Err(light_client_requests::sender::SendRequestError::TooManyRequests) => {
warn!("Couldn't start light client request: too many pending requests");
},
}
if let Some(metrics) = this.metrics.as_ref() {
metrics.issued_light_requests.inc();
}
}
}
// At the time of writing of this comment, due to a high volume of messages, the network
// worker sometimes takes a long time to process the loop below. When that happens, the
// rest of the polling is frozen. In order to avoid negative side-effects caused by this
@@ -116,7 +116,6 @@ fn build_test_full_node(
}),
network_config: config,
chain: client.clone(),
on_demand: None,
transaction_pool: Arc::new(crate::config::EmptyTransactionPool),
protocol_id,
import_queue,
+27 -158
View File
@@ -48,13 +48,13 @@ use sc_consensus::{
};
pub use sc_network::config::EmptyTransactionPool;
use sc_network::{
block_request_handler::{self, BlockRequestHandler},
block_request_handler::BlockRequestHandler,
config::{
MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode,
ProtocolConfig, ProtocolId, Role, SyncMode, TransportConfig,
},
light_client_requests::{self, handler::LightClientRequestHandler},
state_request_handler::{self, StateRequestHandler},
light_client_requests::handler::LightClientRequestHandler,
state_request_handler::StateRequestHandler,
warp_request_handler, Multiaddr, NetworkService, NetworkWorker,
};
use sc_service::client::Client;
@@ -133,25 +133,20 @@ pub type PeersFullClient = Client<
Block,
substrate_test_runtime_client::runtime::RuntimeApi,
>;
pub type PeersLightClient = Client<
substrate_test_runtime_client::LightBackend,
substrate_test_runtime_client::LightExecutor,
Block,
substrate_test_runtime_client::runtime::RuntimeApi,
>;
#[derive(Clone)]
pub enum PeersClient {
Full(Arc<PeersFullClient>, Arc<substrate_test_runtime_client::Backend>),
Light(Arc<PeersLightClient>, Arc<substrate_test_runtime_client::LightBackend>),
pub struct PeersClient {
client: Arc<PeersFullClient>,
backend: Arc<substrate_test_runtime_client::Backend>,
}
impl PeersClient {
pub fn as_full(&self) -> Option<Arc<PeersFullClient>> {
match *self {
PeersClient::Full(ref client, _) => Some(client.clone()),
_ => None,
}
pub fn as_client(&self) -> Arc<PeersFullClient> {
self.client.clone()
}
pub fn as_backend(&self) -> Arc<substrate_test_runtime_client::Backend> {
self.backend.clone()
}
pub fn as_block_import(&self) -> BlockImportAdapter<Self> {
@@ -159,27 +154,18 @@ impl PeersClient {
}
pub fn get_aux(&self, key: &[u8]) -> ClientResult<Option<Vec<u8>>> {
match *self {
PeersClient::Full(ref client, _) => client.get_aux(key),
PeersClient::Light(ref client, _) => client.get_aux(key),
}
self.client.get_aux(key)
}
pub fn info(&self) -> BlockchainInfo<Block> {
match *self {
PeersClient::Full(ref client, _) => client.chain_info(),
PeersClient::Light(ref client, _) => client.chain_info(),
}
self.client.info()
}
pub fn header(
&self,
block: &BlockId<Block>,
) -> ClientResult<Option<<Block as BlockT>::Header>> {
match *self {
PeersClient::Full(ref client, _) => client.header(block),
PeersClient::Light(ref client, _) => client.header(block),
}
self.client.header(block)
}
pub fn has_state_at(&self, block: &BlockId<Block>) -> bool {
@@ -187,33 +173,19 @@ impl PeersClient {
Some(header) => header,
None => return false,
};
match self {
PeersClient::Full(_client, backend) =>
backend.have_state_at(&header.hash(), *header.number()),
PeersClient::Light(_client, backend) =>
backend.have_state_at(&header.hash(), *header.number()),
}
self.backend.have_state_at(&header.hash(), *header.number())
}
pub fn justifications(&self, block: &BlockId<Block>) -> ClientResult<Option<Justifications>> {
match *self {
PeersClient::Full(ref client, _) => client.justifications(block),
PeersClient::Light(ref client, _) => client.justifications(block),
}
self.client.justifications(block)
}
pub fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
match *self {
PeersClient::Full(ref client, _) => client.finality_notification_stream(),
PeersClient::Light(ref client, _) => client.finality_notification_stream(),
}
self.client.finality_notification_stream()
}
pub fn import_notification_stream(&self) -> ImportNotifications<Block> {
match *self {
PeersClient::Full(ref client, _) => client.import_notification_stream(),
PeersClient::Light(ref client, _) => client.import_notification_stream(),
}
self.client.import_notification_stream()
}
pub fn finalize_block(
@@ -222,12 +194,7 @@ impl PeersClient {
justification: Option<Justification>,
notify: bool,
) -> ClientResult<()> {
match *self {
PeersClient::Full(ref client, ref _backend) =>
client.finalize_block(id, justification, notify),
PeersClient::Light(ref client, ref _backend) =>
client.finalize_block(id, justification, notify),
}
self.client.finalize_block(id, justification, notify)
}
}
@@ -240,10 +207,7 @@ impl BlockImport<Block> for PeersClient {
&mut self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
match self {
PeersClient::Full(client, _) => client.check_block(block).await,
PeersClient::Light(client, _) => client.check_block(block).await,
}
self.client.check_block(block).await
}
async fn import_block(
@@ -251,12 +215,7 @@ impl BlockImport<Block> for PeersClient {
block: BlockImportParams<Block, ()>,
cache: HashMap<well_known_cache_keys::Id, Vec<u8>>,
) -> Result<ImportResult, Self::Error> {
match self {
PeersClient::Full(client, _) =>
client.import_block(block.clear_storage_changes_and_mutate(), cache).await,
PeersClient::Light(client, _) =>
client.import_block(block.clear_storage_changes_and_mutate(), cache).await,
}
self.client.import_block(block.clear_storage_changes_and_mutate(), cache).await
}
}
@@ -370,8 +329,7 @@ where
BlockBuilder<Block, PeersFullClient, substrate_test_runtime_client::Backend>,
) -> Block,
{
let full_client =
self.client.as_full().expect("blocks could only be generated by full clients");
let full_client = self.client.as_client();
let mut at = full_client.header(&at).unwrap().unwrap().hash();
for _ in 0..count {
let builder =
@@ -779,11 +737,11 @@ where
let (c, longest_chain) = test_client_builder.build_with_longest_chain();
let client = Arc::new(c);
let (block_import, justification_import, data) =
self.make_block_import(PeersClient::Full(client.clone(), backend.clone()));
let (block_import, justification_import, data) = self
.make_block_import(PeersClient { client: client.clone(), backend: backend.clone() });
let verifier = self.make_verifier(
PeersClient::Full(client.clone(), backend.clone()),
PeersClient { client: client.clone(), backend: backend.clone() },
&Default::default(),
&data,
);
@@ -868,7 +826,6 @@ where
}),
network_config,
chain: client.clone(),
on_demand: None,
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id,
import_queue,
@@ -899,7 +856,7 @@ where
peers.push(Peer {
data,
client: PeersClient::Full(client.clone(), backend.clone()),
client: PeersClient { client: client.clone(), backend: backend.clone() },
select_chain: Some(longest_chain),
backend: Some(backend),
imported_blocks_stream,
@@ -912,94 +869,6 @@ where
});
}
/// Add a light peer.
fn add_light_peer(&mut self) {
let (c, backend) = substrate_test_runtime_client::new_light();
let client = Arc::new(c);
let (block_import, justification_import, data) =
self.make_block_import(PeersClient::Light(client.clone(), backend.clone()));
let verifier = self.make_verifier(
PeersClient::Light(client.clone(), backend.clone()),
&Default::default(),
&data,
);
let verifier = VerifierAdapter::new(verifier);
let import_queue = Box::new(BasicQueue::new(
verifier.clone(),
Box::new(block_import.clone()),
justification_import,
&sp_core::testing::TaskExecutor::new(),
None,
));
let listen_addr = build_multiaddr![Memory(rand::random::<u64>())];
let mut network_config =
NetworkConfiguration::new("test-node", "test-client", Default::default(), None);
network_config.transport = TransportConfig::MemoryOnly;
network_config.listen_addresses = vec![listen_addr.clone()];
network_config.allow_non_globals_in_dht = true;
let protocol_id = ProtocolId::from("test-protocol-name");
let block_request_protocol_config =
block_request_handler::generate_protocol_config(&protocol_id);
let state_request_protocol_config =
state_request_handler::generate_protocol_config(&protocol_id);
let light_client_request_protocol_config =
light_client_requests::generate_protocol_config(&protocol_id);
let network = NetworkWorker::new(sc_network::config::Params {
role: Role::Light,
executor: None,
transactions_handler_executor: Box::new(|task| {
async_std::task::spawn(task);
}),
network_config,
chain: client.clone(),
on_demand: None,
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id,
import_queue,
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
block_request_protocol_config,
state_request_protocol_config,
light_client_request_protocol_config,
warp_sync: None,
})
.unwrap();
self.mut_peers(|peers| {
for peer in peers.iter_mut() {
peer.network.add_known_address(
network.service().local_peer_id().clone(),
listen_addr.clone(),
);
}
let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse());
let finality_notification_stream =
Box::pin(client.finality_notification_stream().fuse());
peers.push(Peer {
data,
verifier,
select_chain: None,
backend: None,
block_import,
client: PeersClient::Light(client, backend),
imported_blocks_stream,
finality_notification_stream,
network,
listen_addr,
});
});
}
/// Used to spawn background tasks, e.g. the block request protocol handler.
fn spawn_task(&self, f: BoxFuture<'static, ()>) {
async_std::task::spawn(f);
+3 -102
View File
@@ -20,7 +20,6 @@ use super::*;
use futures::{executor::block_on, Future};
use sp_consensus::{block_validation::Validation, BlockOrigin};
use sp_runtime::Justifications;
use std::time::Duration;
use substrate_test_runtime::Header;
fn test_ancestor_search_when_common_is(n: usize) {
@@ -391,35 +390,6 @@ fn own_blocks_are_announced() {
(net.peers()[2].blockchain_canon_equals(peer0));
}
#[test]
fn blocks_are_not_announced_by_light_nodes() {
sp_tracing::try_init_simple();
let mut net = TestNet::new(0);
// full peer0 is connected to light peer
// light peer1 is connected to full peer2
net.add_full_peer();
net.add_light_peer();
// Sync between 0 and 1.
net.peer(0).push_blocks(1, false);
assert_eq!(net.peer(0).client.info().best_number, 1);
net.block_until_sync();
assert_eq!(net.peer(1).client.info().best_number, 1);
// Add another node and remove node 0.
net.add_full_peer();
net.peers.remove(0);
// Poll for a few seconds and make sure 1 and 2 (now 0 and 1) don't sync together.
let mut delay = futures_timer::Delay::new(Duration::from_secs(5));
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
Pin::new(&mut delay).poll(cx)
}));
assert_eq!(net.peer(1).client.info().best_number, 0);
}
#[test]
fn can_sync_small_non_best_forks() {
sp_tracing::try_init_simple();
@@ -483,72 +453,6 @@ fn can_sync_small_non_best_forks() {
}));
}
#[test]
fn can_not_sync_from_light_peer() {
sp_tracing::try_init_simple();
// given the network with 1 full nodes (#0) and 1 light node (#1)
let mut net = TestNet::new(1);
net.add_light_peer();
// generate some blocks on #0
net.peer(0).push_blocks(1, false);
// and let the light client sync from this node
net.block_until_sync();
// ensure #0 && #1 have the same best block
let full0_info = net.peer(0).client.info();
let light_info = net.peer(1).client.info();
assert_eq!(full0_info.best_number, 1);
assert_eq!(light_info.best_number, 1);
assert_eq!(light_info.best_hash, full0_info.best_hash);
// add new full client (#2) && remove #0
net.add_full_peer();
net.peers.remove(0);
// ensure that the #2 (now #1) fails to sync block #1 even after 5 seconds
let mut test_finished = futures_timer::Delay::new(Duration::from_secs(5));
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
Pin::new(&mut test_finished).poll(cx)
}));
}
#[test]
fn light_peer_imports_header_from_announce() {
sp_tracing::try_init_simple();
fn import_with_announce(net: &mut TestNet, hash: H256) {
net.peer(0).announce_block(hash, None);
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).client().header(&BlockId::Hash(hash)).unwrap().is_some() {
Poll::Ready(())
} else {
Poll::Pending
}
}));
}
// given the network with 1 full nodes (#0) and 1 light node (#1)
let mut net = TestNet::new(1);
net.add_light_peer();
// let them connect to each other
net.block_until_sync();
// check that NEW block is imported from announce message
let new_hash = net.peer(0).push_blocks(1, false);
import_with_announce(&mut net, new_hash);
// check that KNOWN STALE block is imported from announce message
let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true);
import_with_announce(&mut net, known_stale_hash);
}
#[test]
fn can_sync_explicit_forks() {
sp_tracing::try_init_simple();
@@ -1210,16 +1114,14 @@ fn syncs_indexed_blocks() {
assert!(net
.peer(0)
.client()
.as_full()
.unwrap()
.as_client()
.indexed_transaction(&indexed_key)
.unwrap()
.is_some());
assert!(net
.peer(1)
.client()
.as_full()
.unwrap()
.as_client()
.indexed_transaction(&indexed_key)
.unwrap()
.is_none());
@@ -1228,8 +1130,7 @@ fn syncs_indexed_blocks() {
assert!(net
.peer(1)
.client()
.as_full()
.unwrap()
.as_client()
.indexed_transaction(&indexed_key)
.unwrap()
.is_some());