feat(light client): fetch block body from remote (#2527)

* feat(on_demand): block body request

* fix(light block req): no justific + one block

* fix(bad rebase)

* feat(protocol): add messages for `remote_body`

* fix(on demand body): remove needless `take()`

* fix(network): remove messages for `on_demand_body`

* fix(grumbles): use `hash` in `remote_body_requests`

As long as we can't compute `ordered_trie_root(body)` just compare that request.header.hash() == response.header.hash()

* fix(grumbles): `hdr.ext_root == trie_root(body)`

* fix(grumbles): propogate `Err` in `fn body()`

* fix(grumbles): Vec<Block::Extrinsic>

* fix(grumbles): util_fn for `not_impl` in tests

* fix(on remote body): tests `fetch` and `on_demand`

* docs(resolve todos)
This commit is contained in:
Niklas Adolfsson
2019-05-18 02:05:00 +02:00
committed by DemiMarie-parity
parent 55937d1f08
commit 009898f309
6 changed files with 328 additions and 27 deletions
+1 -1
View File
@@ -205,7 +205,7 @@ pub mod generic {
FinalityProofRequest(FinalityProofRequest<Hash>),
/// Finality proof reponse.
FinalityProofResponse(FinalityProofResponse<Hash>),
/// Chain-specific message
/// Chain-specific message.
#[codec(index = "255")]
ChainSpecific(Vec<u8>),
}
+166 -6
View File
@@ -22,13 +22,12 @@ use std::time::{Instant, Duration};
use log::{trace, info};
use futures::{Async, Future, Poll};
use futures::sync::oneshot::{channel, Receiver, Sender as OneShotSender};
use linked_hash_map::LinkedHashMap;
use linked_hash_map::Entry;
use linked_hash_map::{Entry, LinkedHashMap};
use parking_lot::Mutex;
use client::error::Error as ClientError;
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof,
RemoteReadChildRequest};
RemoteReadChildRequest, RemoteBodyRequest};
use crate::message;
use network_libp2p::PeerId;
use crate::config::Roles;
@@ -76,6 +75,16 @@ pub trait OnDemandService<Block: BlockT>: Send + Sync {
peer: PeerId,
response: message::RemoteChangesResponse<NumberFor<Block>, Block::Hash>
);
/// When body response is received from remote node.
fn on_remote_body_response(
&self,
peer: PeerId,
response: message::BlockResponse<Block>
);
/// Check whether a block response is an `on_demand` response
fn is_on_demand_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool;
}
/// Trait used by the `OnDemand` service to communicate messages back to the network.
@@ -139,6 +148,7 @@ struct Request<Block: BlockT> {
}
enum RequestData<Block: BlockT> {
RemoteBody(RemoteBodyRequest<Block::Header>, OneShotSender<Result<Vec<Block::Extrinsic>, ClientError>>),
RemoteHeader(RemoteHeaderRequest<Block::Header>, OneShotSender<Result<Block::Header, ClientError>>),
RemoteRead(RemoteReadRequest<Block::Header>, OneShotSender<Result<Option<Vec<u8>>, ClientError>>),
RemoteReadChild(
@@ -391,6 +401,41 @@ impl<B> OnDemandService<B> for OnDemand<B> where
data => Accept::Unexpected(data),
})
}
fn on_remote_body_response(&self, peer: PeerId, response: message::BlockResponse<B>) {
self.accept_response("body", peer, response.id, |request| match request.data {
RequestData::RemoteBody(request, sender) => {
let mut bodies: Vec<_> = response
.blocks
.into_iter()
.filter_map(|b| b.body)
.collect();
// Number of bodies are hardcoded to 1 for valid `RemoteBodyResponses`
if bodies.len() != 1 {
return Accept::CheckFailed(
"RemoteBodyResponse: invalid number of blocks".into(),
RequestData::RemoteBody(request, sender),
)
}
let body = bodies.remove(0);
match self.checker.check_body_proof(&request, body) {
Ok(body) => {
let _ = sender.send(Ok(body));
Accept::Ok
}
Err(error) => Accept::CheckFailed(error, RequestData::RemoteBody(request, sender)),
}
}
other => Accept::Unexpected(other),
})
}
fn is_on_demand_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool {
let core = self.core.lock();
core.is_pending_request(&peer, request_id)
}
}
impl<B> Fetcher<B> for OnDemand<B> where
@@ -401,6 +446,7 @@ impl<B> Fetcher<B> for OnDemand<B> where
type RemoteReadResult = RemoteResponse<Option<Vec<u8>>>;
type RemoteCallResult = RemoteResponse<Vec<u8>>;
type RemoteChangesResult = RemoteResponse<Vec<(NumberFor<B>, u32)>>;
type RemoteBodyResult = RemoteResponse<Vec<B::Extrinsic>>;
fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
let (sender, receiver) = channel();
@@ -440,12 +486,22 @@ impl<B> Fetcher<B> for OnDemand<B> where
self.schedule_request(request.retry_count.clone(), RequestData::RemoteChanges(request, sender),
RemoteResponse { receiver })
}
fn remote_body(&self, request: RemoteBodyRequest<B::Header>) -> Self::RemoteBodyResult {
let (sender, receiver) = channel();
self.schedule_request(request.retry_count.clone(), RequestData::RemoteBody(request, sender),
RemoteResponse { receiver })
}
}
impl<B> OnDemandCore<B> where
B: BlockT,
B::Header: HeaderT,
{
fn is_pending_request(&self, peer: &PeerId, request_id: message::RequestId) -> bool {
self.active_peers.get(&peer).map_or(false, |r| r.id == request_id)
}
pub fn add_peer(&mut self, peer: PeerId, best_number: NumberFor<B>) {
self.idle_peers.push_back(peer.clone());
self.best_blocks.insert(peer, best_number);
@@ -570,6 +626,7 @@ impl<Block: BlockT> Request<Block> {
RequestData::RemoteReadChild(ref data, _) => *data.header.number(),
RequestData::RemoteCall(ref data, _) => *data.header.number(),
RequestData::RemoteChanges(ref data, _) => data.max_block.0,
RequestData::RemoteBody(ref data, _) => *data.header.number(),
}
}
@@ -610,6 +667,16 @@ impl<Block: BlockT> Request<Block> {
max: data.max_block.1.clone(),
key: data.key.clone(),
}),
RequestData::RemoteBody(ref data, _) => {
message::generic::Message::BlockRequest(message::BlockRequest::<Block> {
id: self.id,
fields: message::BlockAttributes::BODY,
from: message::FromBlock::Hash(data.header.hash()),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
})
}
}
}
}
@@ -623,6 +690,7 @@ impl<Block: BlockT> RequestData<Block> {
RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteReadChild(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteChanges(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteBody(_, sender) => { let _ = sender.send(Err(error)); },
}
}
}
@@ -637,12 +705,12 @@ pub mod tests {
use client::{error::{Error as ClientError, Result as ClientResult}};
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
ChangesProof, RemoteCallRequest, RemoteReadRequest,
RemoteReadChildRequest, RemoteChangesRequest};
RemoteReadChildRequest, RemoteChangesRequest, RemoteBodyRequest};
use crate::config::Roles;
use crate::message;
use network_libp2p::PeerId;
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandNetwork, OnDemandService};
use test_client::runtime::{changes_trie_config, Block, Header};
use test_client::runtime::{changes_trie_config, Block, Extrinsic, Header};
pub struct DummyExecutor;
struct DummyFetchChecker { ok: bool }
@@ -685,12 +753,27 @@ pub mod tests {
}
}
fn check_changes_proof(&self, _: &RemoteChangesRequest<Header>, _: ChangesProof<Header>) -> ClientResult<Vec<(NumberFor<Block>, u32)>> {
fn check_changes_proof(
&self,
_: &RemoteChangesRequest<Header>,
_: ChangesProof<Header>
) -> ClientResult<Vec<(NumberFor<Block>, u32)>> {
match self.ok {
true => Ok(vec![(100, 2)]),
false => Err(ClientError::Backend("Test error".into())),
}
}
fn check_body_proof(
&self,
_: &RemoteBodyRequest<Header>,
body: Vec<Extrinsic>
) -> ClientResult<Vec<Extrinsic>> {
match self.ok {
true => Ok(body),
false => Err(ClientError::Backend("Test error".into())),
}
}
}
fn dummy(ok: bool) -> (Arc<DummyExecutor>, Arc<OnDemand<Block>>) {
@@ -1166,4 +1249,81 @@ pub mod tests {
assert!(on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>().is_empty());
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}
#[test]
fn remote_body_with_one_block_body_should_succeed() {
let (_x, on_demand) = dummy(true);
let network_interface = Arc::new(DummyNetwork::default());
let peer1 = PeerId::random();
on_demand.set_network_interface(Box::new(network_interface.clone()));
let header = dummy_header();
on_demand.on_connect(peer1.clone(), Roles::FULL, 250);
on_demand.remote_body(RemoteBodyRequest {
header: header.clone(),
retry_count: None,
});
assert!(on_demand.core.lock().pending_requests.is_empty());
assert_eq!(on_demand.core.lock().active_peers.len(), 1);
let block = message::BlockData::<Block> {
hash: primitives::H256::random(),
header: None,
body: Some(Vec::new()),
message_queue: None,
receipt: None,
justification: None,
};
let response = message::generic::BlockResponse {
id: 0,
blocks: vec![block],
};
on_demand.on_remote_body_response(peer1.clone(), response);
assert!(on_demand.core.lock().active_peers.is_empty());
assert_eq!(on_demand.core.lock().idle_peers.len(), 1);
}
#[test]
fn remote_body_with_three_bodies_should_fail() {
let (_x, on_demand) = dummy(true);
let network_interface = Arc::new(DummyNetwork::default());
let peer1 = PeerId::random();
on_demand.set_network_interface(Box::new(network_interface.clone()));
let header = dummy_header();
on_demand.on_connect(peer1.clone(), Roles::FULL, 250);
on_demand.remote_body(RemoteBodyRequest {
header: header.clone(),
retry_count: None,
});
assert!(on_demand.core.lock().pending_requests.is_empty());
assert_eq!(on_demand.core.lock().active_peers.len(), 1);
let response = {
let blocks: Vec<_> = (0..3).map(|_| message::BlockData::<Block> {
hash: primitives::H256::random(),
header: None,
body: Some(Vec::new()),
message_queue: None,
receipt: None,
justification: None,
}).collect();
message::generic::BlockResponse {
id: 0,
blocks,
}
};
on_demand.on_remote_body_response(peer1.clone(), response);
assert!(on_demand.core.lock().active_peers.is_empty());
assert!(on_demand.core.lock().idle_peers.is_empty(), "peer should be disconnected after bad response");
}
}
+19 -4
View File
@@ -324,6 +324,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Future for Protocol<B,
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
fn is_on_demand_response(&self, who: &PeerId, response_id: message::RequestId) -> bool {
self.on_demand.as_ref().map_or(false, |od| od.is_on_demand_response(&who, response_id))
}
fn handle_response(
&mut self,
who: PeerId,
@@ -365,10 +369,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::Status(s) => self.on_status_message(who, s),
GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
GenericMessage::BlockResponse(r) => {
if let Some(request) = self.handle_response(who.clone(), &r) {
let outcome = self.on_block_response(who.clone(), request, r);
self.update_peer_info(&who);
return outcome
// Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter.
if self.is_on_demand_response(&who, r.id) {
self.on_remote_body_response(who, r);
} else {
if let Some(request) = self.handle_response(who.clone(), &r) {
let outcome = self.on_block_response(who.clone(), request, r);
self.update_peer_info(&who);
return outcome
}
}
},
GenericMessage::BlockAnnounce(announce) => {
@@ -1202,6 +1211,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
CustomMessageOutcome::None
}
}
fn on_remote_body_response(&self, peer: PeerId, response: message::BlockResponse<B>) {
self.on_demand
.as_ref()
.map(|od| od.on_remote_body_response(peer, response));
}
}
/// Outcome of an incoming custom message.