Fetching changes proof from remote nodes (#769)

* changes_trie

* changs_trie: continue

* changes_trie: adding tests

* fixed TODO

* removed obsolete ExtrinsicChanges

* encodable ChangesTrieConfiguration

* removed polkadot fle

* fixed grumbles

* ext_storage_changes_root returns u32

* moved changes trie root to digest

* removed commented code

* read storage values from native code

* fixed grumbles

* fixed grumbles

* missing comma

* key changes proof generation + query

* fix grumbles

* check that changes trie config is not changed by block.finalize()

* fixed changes trie config check
This commit is contained in:
Svyatoslav Nikolsky
2018-09-29 11:47:29 +03:00
committed by Gav Wood
parent fdfd4672c1
commit c54350661d
20 changed files with 753 additions and 107 deletions
+20 -1
View File
@@ -18,7 +18,7 @@
use client::{self, Client as SubstrateClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin, CallExecutor};
use client::error::Error;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
use runtime_primitives::generic::BlockId;
use runtime_primitives::bft::Justification;
use primitives::{Blake2Hasher};
@@ -61,6 +61,15 @@ pub trait Client<Block: BlockT>: Send + Sync {
/// Get method execution proof.
fn execution_proof(&self, block: &Block::Hash, method: &str, data: &[u8]) -> Result<(Vec<u8>, Vec<Vec<u8>>), Error>;
/// Get key changes proof.
fn key_changes_proof(
&self,
first: Block::Hash,
last: Block::Hash,
max: Block::Hash,
key: &[u8]
) -> Result<(NumberFor<Block>, Vec<Vec<u8>>), Error>;
}
impl<B, E, Block> Client<Block> for SubstrateClient<B, E, Block> where
@@ -116,4 +125,14 @@ impl<B, E, Block> Client<Block> for SubstrateClient<B, E, Block> where
fn execution_proof(&self, block: &Block::Hash, method: &str, data: &[u8]) -> Result<(Vec<u8>, Vec<Vec<u8>>), Error> {
(self as &SubstrateClient<B, E, Block>).execution_proof(&BlockId::Hash(block.clone()), method, data)
}
fn key_changes_proof(
&self,
first: Block::Hash,
last: Block::Hash,
max: Block::Hash,
key: &[u8]
) -> Result<(NumberFor<Block>, Vec<Vec<u8>>), Error> {
(self as &SubstrateClient<B, E, Block>).key_changes_proof(first, last, max, key)
}
}
+34 -2
View File
@@ -20,8 +20,9 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use codec::{Encode, Decode, Input, Output};
pub use self::generic::{
BlockAnnounce, RemoteCallRequest, RemoteReadRequest,
RemoteHeaderRequest, RemoteHeaderResponse, ConsensusVote,
SignedConsensusVote, FromBlock
RemoteHeaderRequest, RemoteHeaderResponse,
RemoteChangesRequest, RemoteChangesResponse,
ConsensusVote, SignedConsensusVote, FromBlock
};
/// A unique ID of a request.
@@ -274,6 +275,10 @@ pub mod generic {
RemoteHeaderRequest(RemoteHeaderRequest<Number>),
/// Remote header response.
RemoteHeaderResponse(RemoteHeaderResponse<Header>),
/// Remote changes request.
RemoteChangesRequest(RemoteChangesRequest<Hash>),
/// Remote changes reponse.
RemoteChangesResponse(RemoteChangesResponse<Number>),
/// Chain-specific message
#[codec(index = "255")]
ChainSpecific(Vec<u8>),
@@ -372,4 +377,31 @@ pub mod generic {
/// Header proof.
pub proof: Vec<Vec<u8>>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote changes request.
pub struct RemoteChangesRequest<H> {
/// Unique request id.
pub id: RequestId,
/// Hash of the first block of the range (including first) where changes are requested.
pub first: H,
/// Hash of the last block of the range (including last) where changes are requested.
pub last: H,
/// Hash of the last block that we can use when querying changes.
pub max: H,
/// Storage key which changes are requested.
pub key: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote changes response.
pub struct RemoteChangesResponse<N> {
/// Id of a request this response was made for.
pub id: RequestId,
/// Proof has been generated using block with this number as a max block. Should be
/// less than or equal to the RemoteChangesRequest::max block number.
pub max: N,
/// Changes proof.
pub proof: Vec<Vec<u8>>,
}
}
+104 -27
View File
@@ -24,14 +24,14 @@ use futures::sync::oneshot::{channel, Receiver, Sender};
use linked_hash_map::LinkedHashMap;
use linked_hash_map::Entry;
use parking_lot::Mutex;
use client;
use client::{self, error::{Error as ClientError, ErrorKind as ClientErrorKind}};
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest};
RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest};
use io::SyncIo;
use message;
use network_libp2p::{Severity, NodeIndex};
use service;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
/// Remote request timeout.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
@@ -62,6 +62,14 @@ pub trait OnDemandService<Block: BlockT>: Send + Sync {
/// When call response is received from remote node.
fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse);
/// When changes response is received from remote node.
fn on_remote_changes_response(
&self,
io: &mut SyncIo,
peer: NodeIndex,
response: message::RemoteChangesResponse<NumberFor<Block>>
);
}
/// On-demand requests service. Dispatches requests to appropriate peers.
@@ -72,7 +80,7 @@ pub struct OnDemand<B: BlockT, E: service::ExecuteInContext<B>> {
/// On-demand remote call response.
pub struct RemoteResponse<T> {
receiver: Receiver<Result<T, client::error::Error>>,
receiver: Receiver<Result<T, ClientError>>,
}
#[derive(Default)]
@@ -92,24 +100,25 @@ struct Request<Block: BlockT> {
}
enum RequestData<Block: BlockT> {
RemoteHeader(RemoteHeaderRequest<Block::Header>, Sender<Result<Block::Header, client::error::Error>>),
RemoteRead(RemoteReadRequest<Block::Header>, Sender<Result<Option<Vec<u8>>, client::error::Error>>),
RemoteCall(RemoteCallRequest<Block::Header>, Sender<Result<client::CallResult, client::error::Error>>),
RemoteHeader(RemoteHeaderRequest<Block::Header>, Sender<Result<Block::Header, ClientError>>),
RemoteRead(RemoteReadRequest<Block::Header>, Sender<Result<Option<Vec<u8>>, ClientError>>),
RemoteCall(RemoteCallRequest<Block::Header>, Sender<Result<client::CallResult, ClientError>>),
RemoteChanges(RemoteChangesRequest<Block::Header>, Sender<Result<Vec<(NumberFor<Block>, u32)>, ClientError>>),
}
enum Accept<Block: BlockT> {
Ok,
CheckFailed(client::error::Error, RequestData<Block>),
CheckFailed(ClientError, RequestData<Block>),
Unexpected(RequestData<Block>),
}
impl<T> Future for RemoteResponse<T> {
type Item = T;
type Error = client::error::Error;
type Error = ClientError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.receiver.poll()
.map_err(|_| client::error::ErrorKind::RemoteFetchCancelled.into())
.map_err(|_| ClientErrorKind::RemoteFetchCancelled.into())
.and_then(|r| match r {
Async::Ready(Ok(ready)) => Ok(Async::Ready(ready)),
Async::Ready(Err(error)) => Err(error),
@@ -172,7 +181,7 @@ impl<B: BlockT, E> OnDemand<B, E> where
(retry_count - 1, Some(retry_request_data))
} else {
trace!(target: "sync", "Failed to get remote {} response for given number of retries", rtype);
retry_request_data.fail(client::error::ErrorKind::RemoteFetchFailed.into());
retry_request_data.fail(ClientErrorKind::RemoteFetchFailed.into());
(0, None)
}
},
@@ -262,6 +271,22 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where
data @ _ => Accept::Unexpected(data),
})
}
fn on_remote_changes_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteChangesResponse<NumberFor<B>>) {
self.accept_response("changes", io, peer, response.id, |request| match request.data {
RequestData::RemoteChanges(request, sender) => match self.checker.check_changes_proof(
&request, response.max, response.proof
) {
Ok(response) => {
// we do not bother if receiver has been dropped already
let _ = sender.send(Ok(response));
Accept::Ok
},
Err(error) => Accept::CheckFailed(error, RequestData::RemoteChanges(request, sender)),
},
data @ _ => Accept::Unexpected(data),
})
}
}
impl<B, E> Fetcher<B> for OnDemand<B, E> where
@@ -272,6 +297,7 @@ impl<B, E> Fetcher<B> for OnDemand<B, E> where
type RemoteHeaderResult = RemoteResponse<B::Header>;
type RemoteReadResult = RemoteResponse<Option<Vec<u8>>>;
type RemoteCallResult = RemoteResponse<client::CallResult>;
type RemoteChangesResult = RemoteResponse<Vec<(NumberFor<B>, u32)>>;
fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
let (sender, receiver) = channel();
@@ -290,6 +316,12 @@ impl<B, E> Fetcher<B> for OnDemand<B, E> where
self.schedule_request(request.retry_count.clone(), RequestData::RemoteCall(request, sender),
RemoteResponse { receiver })
}
fn remote_changes(&self, request: RemoteChangesRequest<B::Header>) -> Self::RemoteChangesResult {
let (sender, receiver) = channel();
self.schedule_request(request.retry_count.clone(), RequestData::RemoteChanges(request, sender),
RemoteResponse { receiver })
}
}
impl<B, E> OnDemandCore<B, E> where
@@ -377,35 +409,44 @@ impl<B, E> OnDemandCore<B, E> where
impl<Block: BlockT> Request<Block> {
pub fn message(&self) -> message::Message<Block> {
match self.data {
RequestData::RemoteHeader(ref data, _) => message::generic::Message::RemoteHeaderRequest(
message::RemoteHeaderRequest {
RequestData::RemoteHeader(ref data, _) =>
message::generic::Message::RemoteHeaderRequest(message::RemoteHeaderRequest {
id: self.id,
block: data.block,
}),
RequestData::RemoteRead(ref data, _) => message::generic::Message::RemoteReadRequest(
message::RemoteReadRequest {
RequestData::RemoteRead(ref data, _) =>
message::generic::Message::RemoteReadRequest(message::RemoteReadRequest {
id: self.id,
block: data.block,
key: data.key.clone(),
}),
RequestData::RemoteCall(ref data, _) => message::generic::Message::RemoteCallRequest(
message::RemoteCallRequest {
RequestData::RemoteCall(ref data, _) =>
message::generic::Message::RemoteCallRequest(message::RemoteCallRequest {
id: self.id,
block: data.block,
method: data.method.clone(),
data: data.call_data.clone(),
}),
RequestData::RemoteChanges(ref data, _) =>
message::generic::Message::RemoteChangesRequest(message::RemoteChangesRequest {
id: self.id,
first: data.first_block.1.clone(),
last: data.last_block.1.clone(),
max: data.max_block.1.clone(),
key: data.key.clone(),
}),
}
}
}
impl<Block: BlockT> RequestData<Block> {
pub fn fail(self, error: client::error::Error) {
pub fn fail(self, error: ClientError) {
// don't care if anyone is listening
match self {
RequestData::RemoteHeader(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteCall(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); },
RequestData::RemoteChanges(_, sender) => { let _ = sender.send(Err(error)); },
}
}
}
@@ -417,15 +458,15 @@ pub mod tests {
use std::time::Instant;
use futures::Future;
use parking_lot::RwLock;
use client;
use client::{self, error::{ErrorKind as ClientErrorKind, Result as ClientResult}};
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest};
RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest};
use message;
use network_libp2p::NodeIndex;
use service::{Roles, ExecuteInContext};
use test::TestIo;
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService};
use test_client::runtime::{Block, Header};
use test_client::runtime::{changes_trie_config, Block, Header};
pub struct DummyExecutor;
struct DummyFetchChecker { ok: bool }
@@ -440,27 +481,34 @@ pub mod tests {
_request: &RemoteHeaderRequest<Header>,
header: Option<Header>,
_remote_proof: Vec<Vec<u8>>
) -> client::error::Result<Header> {
) -> ClientResult<Header> {
match self.ok {
true if header.is_some() => Ok(header.unwrap()),
_ => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
_ => Err(ClientErrorKind::Backend("Test error".into()).into()),
}
}
fn check_read_proof(&self, _request: &RemoteReadRequest<Header>, _remote_proof: Vec<Vec<u8>>) -> client::error::Result<Option<Vec<u8>>> {
fn check_read_proof(&self, _: &RemoteReadRequest<Header>, _: Vec<Vec<u8>>) -> ClientResult<Option<Vec<u8>>> {
match self.ok {
true => Ok(Some(vec![42])),
false => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
false => Err(ClientErrorKind::Backend("Test error".into()).into()),
}
}
fn check_execution_proof(&self, _request: &RemoteCallRequest<Header>, _remote_proof: Vec<Vec<u8>>) -> client::error::Result<client::CallResult> {
fn check_execution_proof(&self, _: &RemoteCallRequest<Header>, _: Vec<Vec<u8>>) -> ClientResult<client::CallResult> {
match self.ok {
true => Ok(client::CallResult {
return_data: vec![42],
changes: Default::default(),
}),
false => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
false => Err(ClientErrorKind::Backend("Test error".into()).into()),
}
}
fn check_changes_proof(&self, _: &RemoteChangesRequest<Header>, _: u64, _: Vec<Vec<u8>>) -> ClientResult<Vec<(u64, u32)>> {
match self.ok {
true => Ok(vec![(100, 2)]),
false => Err(ClientErrorKind::Backend("Test error".into()).into()),
}
}
}
@@ -733,4 +781,33 @@ pub mod tests {
});
thread.join().unwrap();
}
#[test]
fn receives_remote_changes_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
let response = on_demand.remote_changes(RemoteChangesRequest {
changes_trie_config: changes_trie_config(),
first_block: (1, Default::default()),
last_block: (100, Default::default()),
max_block: (100, Default::default()),
tries_roots: vec![],
key: vec![],
retry_count: None,
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result, vec![(100, 2)]);
});
on_demand.on_remote_changes_response(&mut network, 0, message::RemoteChangesResponse {
id: 0,
max: 1000,
proof: vec![vec![2]],
});
thread.join().unwrap();
}
}
+26 -1
View File
@@ -20,7 +20,7 @@ use std::sync::Arc;
use std::time;
use parking_lot::RwLock;
use rustc_hex::ToHex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, NumberFor, As};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, NumberFor, As, Zero};
use runtime_primitives::generic::BlockId;
use network_libp2p::{NodeIndex, Severity};
use codec::{Encode, Decode};
@@ -273,6 +273,8 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(io, who, response),
GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(io, who, request),
GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response),
GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(io, who, request),
GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(io, who, response),
other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, other),
}
}
@@ -648,6 +650,29 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
self.on_demand.as_ref().map(|s| s.on_remote_header_response(io, who, response));
}
fn on_remote_changes_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteChangesRequest<B::Hash>) {
trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})",
request.id, who, request.key.to_hex(), request.first, request.last);
let (max, proof) = match self.context_data.chain.key_changes_proof(request.first, request.last, request.max, &request.key) {
Ok((max, proof)) => (max, proof),
Err(error) => {
trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{}) failed with: {}",
request.id, who, request.key.to_hex(), request.first, request.last, error);
(Zero::zero(), Default::default())
},
};
self.send_message(io, who, GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse {
id: request.id, max, proof,
}));
}
fn on_remote_changes_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteChangesResponse<NumberFor<B>>) {
trace!(target: "sync", "Remote changes proof response {} from {} (max={})",
response.id, who, response.max);
self.on_demand.as_ref().map(|s| s.on_remote_changes_response(io, who, response));
}
/// Execute a closure with access to a network context and specialization.
pub fn with_spec<F, U>(&self, io: &mut SyncIo, f: F) -> U
where F: FnOnce(&mut S, &mut Context<B>) -> U