Replace old headers with CHT in light clients (#512)

* storage proofs

* CHT
This commit is contained in:
Svyatoslav Nikolsky
2018-09-04 22:57:55 +03:00
committed by Gav Wood
parent a34e990cf2
commit 8eb4589ca6
19 changed files with 809 additions and 81 deletions
+1 -1
View File
@@ -2483,7 +2483,7 @@ dependencies = [
"substrate-state-machine 0.1.0",
"substrate-telemetry 0.3.0",
"substrate-test-client 0.1.0",
"triehash 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"triehash 0.2.0 (git+https://github.com/paritytech/parity-common)",
]
[[package]]
+1 -1
View File
@@ -8,7 +8,6 @@ error-chain = "0.12"
fnv = "1.0"
log = "0.3"
parking_lot = "0.4"
triehash = "0.1"
hex-literal = "0.1"
futures = "0.1.17"
ed25519 = { path = "../ed25519" }
@@ -27,6 +26,7 @@ substrate-telemetry = { path = "../telemetry" }
hashdb = { git = "https://github.com/paritytech/parity-common" }
patricia-trie = { git = "https://github.com/paritytech/parity-common" }
rlp = { git = "https://github.com/paritytech/parity-common" }
triehash = { git = "https://github.com/paritytech/parity-common" }
[dev-dependencies]
substrate-test-client = { path = "../test-client" }
+12 -3
View File
@@ -61,7 +61,8 @@ use runtime_primitives::BuildStorage;
use state_machine::backend::Backend as StateBackend;
use executor::RuntimeInfo;
use state_machine::{CodeExecutor, DBValue, ExecutionStrategy};
use utils::{Meta, db_err, meta_keys, number_to_db_key, open_database, read_db, read_id, read_meta};
use utils::{Meta, db_err, meta_keys, number_to_db_key, db_key_to_number, open_database,
read_db, read_id, read_meta};
use state_db::StateDb;
pub use state_db::PruningMode;
@@ -183,6 +184,14 @@ impl<Block: BlockT> client::blockchain::HeaderBackend<Block> for BlockchainDb<Bl
}
}
fn number(&self, hash: Block::Hash) -> Result<Option<<Block::Header as HeaderT>::Number>, client::error::Error> {
read_id::<Block>(&*self.db, columns::BLOCK_INDEX, BlockId::Hash(hash))
.and_then(|key| match key {
Some(key) => Ok(Some(db_key_to_number(&key)?)),
None => Ok(None),
})
}
fn hash(&self, number: <Block::Header as HeaderT>::Number) -> Result<Option<Block::Hash>, client::error::Error> {
read_db::<Block>(&*self.db, columns::BLOCK_INDEX, columns::HEADER, BlockId::Number(number)).map(|x|
x.map(|raw| HashFor::<Block>::hash(&raw[..])).map(Into::into)
@@ -397,13 +406,13 @@ impl<Block> client::backend::Backend<Block, KeccakHasher, RlpCodec> for Backend<
}
};
if let Some(finalizing_hash) = finalizing_hash {
trace!("Finalizing block #{} ({:?})", number_u64 - self.finalization_window, finalizing_hash);
trace!(target: "db", "Finalizing block #{} ({:?})", number_u64 - self.finalization_window, finalizing_hash);
let commit = self.storage.state_db.finalize_block(&finalizing_hash);
apply_state_commit(&mut transaction, commit);
}
}
debug!("DB Commit {:?} ({}), best = {}", hash, number, pending_block.is_best);
debug!(target: "db", "DB Commit {:?} ({}), best = {}", hash, number, pending_block.is_best);
self.storage.db.write(transaction).map_err(db_err)?;
self.blockchain.update_meta(hash, number, pending_block.is_best);
}
+104 -4
View File
@@ -23,14 +23,17 @@ use kvdb::{KeyValueDB, DBTransaction};
use client::blockchain::{BlockStatus, Cache as BlockchainCache,
HeaderBackend as BlockchainHeaderBackend, Info as BlockchainInfo};
use client::cht;
use client::error::{ErrorKind as ClientErrorKind, Result as ClientResult};
use client::light::blockchain::Storage as LightBlockchainStorage;
use codec::{Decode, Encode};
use primitives::AuthorityId;
use primitives::{AuthorityId, H256, KeccakHasher};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, Zero, As};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor,
Zero, One, As, NumberFor};
use cache::DbCache;
use utils::{meta_keys, Meta, db_err, number_to_db_key, open_database, read_db, read_id, read_meta};
use utils::{meta_keys, Meta, db_err, number_to_db_key, db_key_to_number, open_database,
read_db, read_id, read_meta};
use DatabaseSettings;
pub(crate) mod columns {
@@ -38,10 +41,11 @@ pub(crate) mod columns {
pub const BLOCK_INDEX: Option<u32> = Some(1);
pub const HEADER: Option<u32> = Some(2);
pub const AUTHORITIES: Option<u32> = Some(3);
pub const CHT: Option<u32> = Some(4);
}
/// Keep authorities for last 'AUTHORITIES_ENTRIES_TO_KEEP' blocks.
pub(crate) const AUTHORITIES_ENTRIES_TO_KEEP: u64 = 2048;
pub(crate) const AUTHORITIES_ENTRIES_TO_KEEP: u64 = cht::SIZE;
/// Light blockchain storage. Stores most recent headers + CHTs for older headers.
pub struct LightStorage<Block: BlockT> {
@@ -146,6 +150,14 @@ impl<Block> BlockchainHeaderBackend<Block> for LightStorage<Block>
}
}
fn number(&self, hash: Block::Hash) -> ClientResult<Option<<<Block as BlockT>::Header as HeaderT>::Number>> {
read_id::<Block>(&*self.db, columns::BLOCK_INDEX, BlockId::Hash(hash))
.and_then(|key| match key {
Some(key) => Ok(Some(db_key_to_number(&key)?)),
None => Ok(None),
})
}
fn hash(&self, number: <<Block as BlockT>::Header as HeaderT>::Number) -> ClientResult<Option<Block::Hash>> {
read_db::<Block>(&*self.db, columns::BLOCK_INDEX, columns::HEADER, BlockId::Number(number)).map(|x|
x.map(|raw| HashFor::<Block>::hash(&raw[..])).map(Into::into)
@@ -156,6 +168,7 @@ impl<Block> BlockchainHeaderBackend<Block> for LightStorage<Block>
impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
where
Block: BlockT,
Block::Hash: From<H256>,
{
fn import_header(&self, is_new_best: bool, header: Block::Header, authorities: Option<Vec<AuthorityId>>) -> ClientResult<()> {
let mut transaction = DBTransaction::new();
@@ -187,6 +200,27 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
None
};
// build new CHT if required
if let Some(new_cht_number) = cht::is_build_required(cht::SIZE, *header.number()) {
let new_cht_start: NumberFor<Block> = cht::start_number(cht::SIZE, new_cht_number);
let new_cht_root: Option<Block::Hash> = cht::compute_root::<Block::Header, KeccakHasher, _>(
cht::SIZE, new_cht_number, (new_cht_start.as_()..)
.map(|num| self.hash(As::sa(num)).unwrap_or_default()));
if let Some(new_cht_root) = new_cht_root {
transaction.put(columns::CHT, &number_to_db_key(new_cht_start), new_cht_root.as_ref());
let mut prune_block = new_cht_start;
let new_cht_end = cht::end_number(cht::SIZE, new_cht_number);
trace!(target: "db", "Replacing blocks [{}..{}] with CHT#{}", new_cht_start, new_cht_end, new_cht_number);
while prune_block <= new_cht_end {
transaction.delete(columns::HEADER, &number_to_db_key(prune_block));
prune_block += <<Block as BlockT>::Header as HeaderT>::Number::one();
}
}
}
debug!("Light DB Commit {:?} ({})", hash, number);
self.db.write(transaction).map_err(db_err)?;
self.update_meta(hash, number, is_new_best);
@@ -197,6 +231,16 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
Ok(())
}
fn cht_root(&self, cht_size: u64, block: <<Block as BlockT>::Header as HeaderT>::Number) -> ClientResult<Block::Hash> {
let no_cht_for_block = || ClientErrorKind::Backend(format!("CHT for block {} not exists", block)).into();
let cht_number = cht::block_to_cht_number(cht_size, block).ok_or_else(no_cht_for_block)?;
let cht_start = cht::start_number(cht_size, cht_number);
self.db.get(columns::CHT, &number_to_db_key(cht_start)).map_err(db_err)?
.ok_or_else(no_cht_for_block)
.and_then(|hash| Block::Hash::decode(&mut &*hash).ok_or_else(no_cht_for_block))
}
fn cache(&self) -> Option<&BlockchainCache<Block>> {
Some(&self.cache)
}
@@ -204,6 +248,7 @@ impl<Block> LightBlockchainStorage<Block> for LightStorage<Block>
#[cfg(test)]
pub(crate) mod tests {
use client::cht;
use runtime_primitives::testing::{H256 as Hash, Header, Block as RawBlock};
use super::*;
@@ -289,4 +334,59 @@ pub(crate) mod tests {
assert_eq!(db.db.iter(columns::HEADER).count(), 2);
assert_eq!(db.db.iter(columns::BLOCK_INDEX).count(), 2);
}
#[test]
fn ancient_headers_are_replaced_with_cht() {
let db = LightStorage::new_test();
// insert genesis block header (never pruned)
let mut prev_hash = insert_block(&db, &Default::default(), 0, None);
// insert SIZE blocks && ensure that nothing is pruned
for number in 0..cht::SIZE {
prev_hash = insert_block(&db, &prev_hash, 1 + number, None);
}
assert_eq!(db.db.iter(columns::HEADER).count(), (1 + cht::SIZE) as usize);
assert_eq!(db.db.iter(columns::CHT).count(), 0);
// insert next SIZE blocks && ensure that nothing is pruned
for number in 0..cht::SIZE {
prev_hash = insert_block(&db, &prev_hash, 1 + cht::SIZE + number, None);
}
assert_eq!(db.db.iter(columns::HEADER).count(), (1 + cht::SIZE + cht::SIZE) as usize);
assert_eq!(db.db.iter(columns::CHT).count(), 0);
// insert block #{2 * cht::SIZE + 1} && check that new CHT is created + headers of this CHT are pruned
insert_block(&db, &prev_hash, 1 + cht::SIZE + cht::SIZE, None);
assert_eq!(db.db.iter(columns::HEADER).count(), (1 + cht::SIZE + 1) as usize);
assert_eq!(db.db.iter(columns::CHT).count(), 1);
assert!((0..cht::SIZE).all(|i| db.db.get(columns::HEADER, &number_to_db_key(1 + i)).unwrap().is_none()));
}
#[test]
fn get_cht_fails_for_genesis_block() {
assert!(LightStorage::<Block>::new_test().cht_root(cht::SIZE, 0).is_err());
}
#[test]
fn get_cht_fails_for_non_existant_cht() {
assert!(LightStorage::<Block>::new_test().cht_root(cht::SIZE, (cht::SIZE / 2) as u64).is_err());
}
#[test]
fn get_cht_works() {
let db = LightStorage::new_test();
// insert 1 + SIZE + SIZE + 1 blocks so that CHT#0 is created
let mut prev_hash = Default::default();
for i in 0..1 + cht::SIZE + cht::SIZE + 1 {
prev_hash = insert_block(&db, &prev_hash, i as u64, None);
}
let cht_root_1 = db.cht_root(cht::SIZE, cht::start_number(cht::SIZE, 0)).unwrap();
let cht_root_2 = db.cht_root(cht::SIZE, (cht::start_number(cht::SIZE, 0) + cht::SIZE / 2) as u64).unwrap();
let cht_root_3 = db.cht_root(cht::SIZE, cht::end_number(cht::SIZE, 0)).unwrap();
assert_eq!(cht_root_1, cht_root_2);
assert_eq!(cht_root_2, cht_root_3);
}
}
@@ -31,6 +31,8 @@ pub trait HeaderBackend<Block: BlockT>: Send + Sync {
fn info(&self) -> Result<Info<Block>>;
/// Get block status.
fn status(&self, id: BlockId<Block>) -> Result<BlockStatus>;
/// Get block number by hash. Returns `None` if the header is not in the chain.
fn number(&self, hash: Block::Hash) -> Result<Option<<<Block as BlockT>::Header as HeaderT>::Number>>;
/// Get block hash by number. Returns `None` if the header is not in the chain.
fn hash(&self, number: NumberFor<Block>) -> Result<Option<Block::Hash>>;
+274
View File
@@ -0,0 +1,274 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Canonical hash trie definitions and helper functions.
//!
//! Each CHT is a trie mapping block numbers to canonical hash.
//! One is generated for every `SIZE` blocks, allowing us to discard those blocks in
//! favor of the trie root. When the "ancient" blocks need to be accessed, we simply
//! request an inclusion proof of a specific block number against the trie with the
//! root has. A correct proof implies that the claimed block is identical to the one
//! we discarded.
use hashdb;
use heapsize::HeapSizeOf;
use patricia_trie::NodeCodec;
use rlp::Encodable;
use triehash;
use primitives::H256;
use runtime_primitives::traits::{As, Header as HeaderT, SimpleArithmetic, One};
use state_machine::backend::InMemory as InMemoryState;
use state_machine::{prove_read, read_proof_check};
use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult};
/// The size of each CHT. This value is passed to every CHT-related function from
/// production code. Other values are passed from tests.
pub const SIZE: u64 = 2048;
/// Returns Some(cht_number) if CHT is need to be built when the block with given number is canonized.
pub fn is_build_required<N>(cht_size: u64, block_num: N) -> Option<N>
where
N: Clone + SimpleArithmetic,
{
let block_cht_num = block_to_cht_number(cht_size, block_num.clone())?;
let two = N::one() + N::one();
if block_cht_num < two {
return None;
}
let cht_start = start_number(cht_size, block_cht_num.clone());
if cht_start != block_num {
return None;
}
Some(block_cht_num - two)
}
/// Compute a CHT root from an iterator of block hashes. Fails if shorter than
/// SIZE items. The items are assumed to proceed sequentially from `start_number(cht_num)`.
/// Discards the trie's nodes.
pub fn compute_root<Header, Hasher, I>(
cht_size: u64,
cht_num: Header::Number,
hashes: I,
) -> Option<Header::Hash>
where
Header: HeaderT,
Header::Hash: From<Hasher::Out>,
Hasher: hashdb::Hasher,
Hasher::Out: Ord + Encodable,
I: IntoIterator<Item=Option<Header::Hash>>,
{
build_pairs::<Header, I>(cht_size, cht_num, hashes)
.map(|pairs| triehash::trie_root::<Hasher, _, _, _>(pairs).into())
}
/// Build CHT-based header proof.
pub fn build_proof<Header, Hasher, Codec, I>(
cht_size: u64,
cht_num: Header::Number,
block_num: Header::Number,
hashes: I
) -> Option<Vec<Vec<u8>>>
where
Header: HeaderT,
Hasher: hashdb::Hasher,
Hasher::Out: Ord + Encodable + HeapSizeOf,
Codec: NodeCodec<Hasher>,
I: IntoIterator<Item=Option<Header::Hash>>,
{
let transaction = build_pairs::<Header, I>(cht_size, cht_num, hashes)?
.into_iter()
.map(|(k, v)| (k, Some(v)))
.collect::<Vec<_>>();
let storage = InMemoryState::<Hasher, Codec>::default().update(transaction);
let (value, proof) = prove_read(storage, &encode_cht_key(block_num)).ok()?;
if value.is_none() {
None
} else {
Some(proof)
}
}
/// Check CHT-based header proof.
pub fn check_proof<Header, Hasher, Codec>(
local_root: Header::Hash,
local_number: Header::Number,
remote_hash: Header::Hash,
remote_proof: Vec<Vec<u8>>
) -> ClientResult<()>
where
Header: HeaderT,
Header::Hash: From<H256>,
Hasher: hashdb::Hasher,
Hasher::Out: Ord + Encodable + HeapSizeOf + From<Header::Hash>,
Codec: NodeCodec<Hasher>,
{
let local_cht_key = encode_cht_key(local_number);
let local_cht_value = read_proof_check::<Hasher, Codec>(local_root.into(), remote_proof,
&local_cht_key).map_err(|e| ClientError::from(e))?;
let local_cht_value = local_cht_value.ok_or_else(|| ClientErrorKind::InvalidHeaderProof)?;
let local_hash: Header::Hash = decode_cht_value(&local_cht_value).ok_or_else(|| ClientErrorKind::InvalidHeaderProof)?;
match local_hash == remote_hash {
true => Ok(()),
false => Err(ClientErrorKind::InvalidHeaderProof.into()),
}
}
/// Build pairs for computing CHT.
fn build_pairs<Header, I>(
cht_size: u64,
cht_num: Header::Number,
hashes: I
) -> Option<Vec<(Vec<u8>, Vec<u8>)>>
where
Header: HeaderT,
I: IntoIterator<Item=Option<Header::Hash>>,
{
let start_num = start_number(cht_size, cht_num);
let mut pairs = Vec::new();
let mut hash_number = start_num;
for hash in hashes.into_iter().take(cht_size as usize) {
pairs.push(hash.map(|hash| (
encode_cht_key(hash_number).to_vec(),
encode_cht_value(hash)
))?);
hash_number += Header::Number::one();
}
if pairs.len() as u64 == cht_size {
Some(pairs)
} else {
None
}
}
/// Get the starting block of a given CHT.
/// CHT 0 includes block 1...SIZE,
/// CHT 1 includes block SIZE + 1 ... 2*SIZE
/// More generally: CHT N includes block (1 + N*SIZE)...((N+1)*SIZE).
/// This is because the genesis hash is assumed to be known
/// and including it would be redundant.
pub fn start_number<N: SimpleArithmetic>(cht_size: u64, cht_num: N) -> N {
(cht_num * As::sa(cht_size)) + N::one()
}
/// Get the ending block of a given CHT.
pub fn end_number<N: SimpleArithmetic>(cht_size: u64, cht_num: N) -> N {
(cht_num + N::one()) * As::sa(cht_size)
}
/// Convert a block number to a CHT number.
/// Returns `None` for `block_num` == 0, `Some` otherwise.
pub fn block_to_cht_number<N: SimpleArithmetic>(cht_size: u64, block_num: N) -> Option<N> {
if block_num == N::zero() {
None
} else {
Some((block_num - N::one()) / As::sa(cht_size))
}
}
/// Convert header number into CHT key.
pub fn encode_cht_key<N: As<u64>>(number: N) -> Vec<u8> {
let number: u64 = number.as_();
vec![
(number >> 56) as u8,
((number >> 48) & 0xff) as u8,
((number >> 40) & 0xff) as u8,
((number >> 32) & 0xff) as u8,
((number >> 24) & 0xff) as u8,
((number >> 16) & 0xff) as u8,
((number >> 8) & 0xff) as u8,
(number & 0xff) as u8
]
}
/// Convert header hash into CHT value.
fn encode_cht_value<Hash: AsRef<[u8]>>(hash: Hash) -> Vec<u8> {
hash.as_ref().to_vec()
}
/// Convert CHT value into block header hash.
pub fn decode_cht_value<Hash: From<H256>>(value: &[u8]) -> Option<Hash> {
match value.len() {
32 => Some(H256::from_slice(&value[0..32]).into()),
_ => None,
}
}
#[cfg(test)]
mod tests {
use primitives::{KeccakHasher, RlpCodec};
use test_client::runtime::Header;
use super::*;
#[test]
fn is_build_required_works() {
assert_eq!(is_build_required(SIZE, 0), None);
assert_eq!(is_build_required(SIZE, 1), None);
assert_eq!(is_build_required(SIZE, SIZE), None);
assert_eq!(is_build_required(SIZE, SIZE + 1), None);
assert_eq!(is_build_required(SIZE, 2 * SIZE), None);
assert_eq!(is_build_required(SIZE, 2 * SIZE + 1), Some(0));
assert_eq!(is_build_required(SIZE, 3 * SIZE), None);
assert_eq!(is_build_required(SIZE, 3 * SIZE + 1), Some(1));
}
#[test]
fn start_number_works() {
assert_eq!(start_number(SIZE, 0), 1);
assert_eq!(start_number(SIZE, 1), SIZE + 1);
assert_eq!(start_number(SIZE, 2), SIZE + SIZE + 1);
}
#[test]
fn end_number_works() {
assert_eq!(end_number(SIZE, 0), SIZE);
assert_eq!(end_number(SIZE, 1), SIZE + SIZE);
assert_eq!(end_number(SIZE, 2), SIZE + SIZE + SIZE);
}
#[test]
fn build_pairs_fails_when_no_enough_blocks() {
assert!(build_pairs::<Header, _>(SIZE, 0, vec![Some(1.into()); SIZE as usize / 2]).is_none());
}
#[test]
fn build_pairs_fails_when_missing_block() {
assert!(build_pairs::<Header, _>(SIZE, 0, ::std::iter::repeat(Some(1.into())).take(SIZE as usize / 2)
.chain(::std::iter::once(None))
.chain(::std::iter::repeat(Some(2.into())).take(SIZE as usize / 2 - 1))).is_none());
}
#[test]
fn compute_root_works() {
assert!(compute_root::<Header, KeccakHasher, _>(SIZE, 42, vec![Some(1.into()); SIZE as usize]).is_some());
}
#[test]
fn build_proof_fails_when_querying_wrong_block() {
assert!(build_proof::<Header, KeccakHasher, RlpCodec, _>(
SIZE, 0, (SIZE * 1000) as u64, vec![Some(1.into()); SIZE as usize]).is_none());
}
#[test]
fn build_proof_works() {
assert!(build_proof::<Header, KeccakHasher, RlpCodec, _>(
SIZE, 0, (SIZE / 2) as u64, vec![Some(1.into()); SIZE as usize]).is_some());
}
}
+19 -1
View File
@@ -36,7 +36,7 @@ use blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend
use call_executor::{CallExecutor, LocalCallExecutor};
use executor::{RuntimeVersion, RuntimeInfo};
use notifications::{StorageNotifications, StorageEventStream};
use {error, in_mem, block_builder, runtime_io, bft, genesis};
use {cht, error, in_mem, block_builder, runtime_io, bft, genesis};
/// Type that implements `futures::Stream` of block import events.
pub type BlockchainEventStream<Block> = mpsc::UnboundedReceiver<BlockImportNotification<Block>>;
@@ -267,6 +267,24 @@ impl<B, E, Block> Client<B, E, Block> where
self.state_at(id).and_then(|state| self.executor.prove_at_state(state, &mut Default::default(), method, call_data))
}
/// Reads given header and generates CHT-based header proof.
pub fn header_proof(&self, id: &BlockId<Block>) -> error::Result<(Block::Header, Vec<Vec<u8>>)> {
self.header_proof_with_cht_size(id, cht::SIZE)
}
/// Reads given header and generates CHT-based header proof for CHT of given size.
pub fn header_proof_with_cht_size(&self, id: &BlockId<Block>, cht_size: u64) -> error::Result<(Block::Header, Vec<Vec<u8>>)> {
let proof_error = || error::ErrorKind::Backend(format!("Failed to generate header proof for {:?}", id));
let header = self.header(id)?.ok_or_else(|| error::ErrorKind::UnknownBlock(format!("{:?}", id)))?;
let block_num = *header.number();
let cht_num = cht::block_to_cht_number(cht_size, block_num).ok_or_else(proof_error)?;
let cht_start = cht::start_number(cht_size, cht_num);
let headers = (cht_start.as_()..).map(|num| self.block_hash(As::sa(num)).unwrap_or_default());
let proof = cht::build_proof::<Block::Header, KeccakHasher, RlpCodec, _>(cht_size, cht_num, block_num, headers)
.ok_or_else(proof_error)?;
Ok((header, proof))
}
/// Set up the native execution environment to call into a native runtime code.
pub fn using_environment<F: FnOnce() -> T, T>(
&self, f: F
+6
View File
@@ -94,6 +94,12 @@ error_chain! {
display("This method is not currently available when running in light client mode"),
}
/// Invalid remote header proof.
InvalidHeaderProof {
description("invalid header proof"),
display("Remote node has responded with invalid header proof"),
}
/// Invalid remote execution proof.
InvalidExecutionProof {
description("invalid execution proof"),
+1 -1
View File
@@ -69,7 +69,7 @@ mod tests {
Extrinsic { transfer: tx, signature }
}).collect::<Vec<_>>();
let extrinsics_root = ordered_trie_root(transactions.iter().map(Encode::encode)).0.into();
let extrinsics_root = ordered_trie_root::<KeccakHasher, _, _>(transactions.iter().map(Encode::encode)).into();
println!("root before: {:?}", extrinsics_root);
let mut header = Header {
+20 -1
View File
@@ -89,6 +89,7 @@ struct BlockchainStorage<Block: BlockT> {
best_hash: Block::Hash,
best_number: <<Block as BlockT>::Header as HeaderT>::Number,
genesis_hash: Block::Hash,
cht_roots: HashMap<NumberFor<Block>, Block::Hash>,
}
/// In-memory blockchain. Supports concurrent reads.
@@ -133,6 +134,7 @@ impl<Block: BlockT> Blockchain<Block> {
best_hash: Default::default(),
best_number: Zero::zero(),
genesis_hash: Default::default(),
cht_roots: HashMap::new(),
}));
Blockchain {
storage: storage.clone(),
@@ -179,6 +181,11 @@ impl<Block: BlockT> Blockchain<Block> {
&& this.best_number == other.best_number
&& this.genesis_hash == other.genesis_hash
}
/// Insert CHT root.
pub fn insert_cht_root(&self, block: NumberFor<Block>, cht_root: Block::Hash) {
self.storage.write().cht_roots.insert(block, cht_root);
}
}
impl<Block: BlockT> blockchain::HeaderBackend<Block> for Blockchain<Block> {
@@ -204,6 +211,10 @@ impl<Block: BlockT> blockchain::HeaderBackend<Block> for Blockchain<Block> {
}
}
fn number(&self, hash: Block::Hash) -> error::Result<Option<NumberFor<Block>>> {
Ok(self.storage.read().blocks.get(&hash).map(|b| *b.header().number()))
}
fn hash(&self, number: <<Block as BlockT>::Header as HeaderT>::Number) -> error::Result<Option<Block::Hash>> {
Ok(self.id(BlockId::Number(number)))
}
@@ -229,7 +240,10 @@ impl<Block: BlockT> blockchain::Backend<Block> for Blockchain<Block> {
}
}
impl<Block: BlockT> light::blockchain::Storage<Block> for Blockchain<Block> {
impl<Block: BlockT> light::blockchain::Storage<Block> for Blockchain<Block>
where
Block::Hash: From<[u8; 32]>,
{
fn import_header(
&self,
is_new_best: bool,
@@ -245,6 +259,11 @@ impl<Block: BlockT> light::blockchain::Storage<Block> for Blockchain<Block> {
Ok(())
}
fn cht_root(&self, _cht_size: u64, block: NumberFor<Block>) -> error::Result<Block::Hash> {
self.storage.read().cht_roots.get(&block).cloned()
.ok_or_else(|| error::ErrorKind::Backend(format!("CHT for block {} not exists", block)).into())
}
fn cache(&self) -> Option<&blockchain::Cache<Block>> {
Some(&self.cache)
}
+1
View File
@@ -49,6 +49,7 @@ extern crate heapsize;
pub mod error;
pub mod blockchain;
pub mod backend;
pub mod cht;
pub mod in_mem;
pub mod genesis;
pub mod block_builder;
@@ -226,59 +226,3 @@ where
None
}
}
#[cfg(test)]
pub mod tests {
use futures::future::{ok, err, FutureResult};
use parking_lot::Mutex;
use call_executor::CallResult;
use executor::NativeExecutionDispatch;
use error::Error as ClientError;
use test_client::{self, runtime::{Header, Block}};
use light::new_fetch_checker;
use light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest};
use super::*;
use primitives::{KeccakHasher, RlpCodec};
pub type OkCallFetcher = Mutex<CallResult>;
impl Fetcher<Block> for OkCallFetcher {
type RemoteReadResult = FutureResult<Option<Vec<u8>>, ClientError>;
type RemoteCallResult = FutureResult<CallResult, ClientError>;
fn remote_read(&self, _request: RemoteReadRequest<Header>) -> Self::RemoteReadResult {
err("Not implemented on test node".into())
}
fn remote_call(&self, _request: RemoteCallRequest<Header>) -> Self::RemoteCallResult {
ok((*self.lock()).clone())
}
}
#[test]
fn storage_read_proof_is_generated_and_checked() {
// prepare remote client
let remote_client = test_client::new();
let remote_block_id = BlockId::Number(0);
let remote_block_hash = remote_client.block_hash(0).unwrap().unwrap();
let mut remote_block_header = remote_client.header(&remote_block_id).unwrap().unwrap();
remote_block_header.state_root = remote_client.state_at(&remote_block_id)
.unwrap().storage_root(::std::iter::empty()).0.into();
// 'fetch' read proof from remote node
let authorities_len = remote_client.authorities_at(&remote_block_id).unwrap().len();
let remote_read_proof = remote_client.read_proof(&remote_block_id, b":auth:len").unwrap();
// check remote read proof locally
let local_executor = test_client::LocalExecutor::new();
let local_checker = new_fetch_checker::<_, KeccakHasher, RlpCodec>(local_executor);
let request = RemoteReadRequest {
block: remote_block_hash,
header: remote_block_header,
key: b":auth:len".to_vec(),
};
assert_eq!((&local_checker as &FetchChecker<Block>).check_read_proof(
&request,
remote_read_proof).unwrap().unwrap()[0], authorities_len as u8);
}
}
@@ -18,16 +18,18 @@
//! blocks. CHT roots are stored for headers of ancient blocks.
use std::sync::Weak;
use futures::{Future, IntoFuture};
use parking_lot::Mutex;
use primitives::AuthorityId;
use runtime_primitives::{bft::Justification, generic::BlockId};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero};
use blockchain::{Backend as BlockchainBackend, BlockStatus, Cache as BlockchainCache,
HeaderBackend as BlockchainHeaderBackend, Info as BlockchainInfo};
use error::Result as ClientResult;
use light::fetcher::Fetcher;
use cht;
use error::{ErrorKind as ClientErrorKind, Result as ClientResult};
use light::fetcher::{Fetcher, RemoteHeaderRequest};
/// Light client blockchain storage.
pub trait Storage<Block: BlockT>: BlockchainHeaderBackend<Block> {
@@ -39,6 +41,9 @@ pub trait Storage<Block: BlockT>: BlockchainHeaderBackend<Block> {
authorities: Option<Vec<AuthorityId>>
) -> ClientResult<()>;
/// Get CHT root for given block. Fails if the block is not pruned (not a part of any CHT).
fn cht_root(&self, cht_size: u64, block: NumberFor<Block>) -> ClientResult<Block::Hash>;
/// Get storage cache.
fn cache(&self) -> Option<&BlockchainCache<Block>>;
}
@@ -76,7 +81,31 @@ impl<S, F> Blockchain<S, F> {
impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Block: BlockT, S: Storage<Block>, F: Fetcher<Block> {
fn header(&self, id: BlockId<Block>) -> ClientResult<Option<Block::Header>> {
self.storage.header(id)
match self.storage.header(id)? {
Some(header) => Ok(Some(header)),
None => {
let number = match id {
BlockId::Hash(hash) => match self.storage.number(hash)? {
Some(number) => number,
None => return Ok(None),
},
BlockId::Number(number) => number,
};
// if the header is from future or genesis (we never prune genesis) => return
if number.is_zero() || self.storage.status(BlockId::Number(number))? != BlockStatus::InChain {
return Ok(None);
}
self.fetcher().upgrade().ok_or(ClientErrorKind::NotAvailableOnLightClient)?
.remote_header(RemoteHeaderRequest {
cht_root: self.storage.cht_root(cht::SIZE, number)?,
block: number,
})
.into_future().wait()
.map(Some)
}
}
}
fn info(&self) -> ClientResult<BlockchainInfo<Block>> {
@@ -87,6 +116,10 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc
self.storage.status(id)
}
fn number(&self, hash: Block::Hash) -> ClientResult<Option<NumberFor<Block>>> {
self.storage.number(hash)
}
fn hash(&self, number: <<Block as BlockT>::Header as HeaderT>::Number) -> ClientResult<Option<Block::Hash>> {
self.storage.hash(number)
}
+169 -3
View File
@@ -28,7 +28,8 @@ use state_machine::{CodeExecutor, read_proof_check};
use std::marker::PhantomData;
use call_executor::CallResult;
use error::{Error as ClientError, Result as ClientResult};
use cht;
use error::{Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult};
use light::call_executor::check_execution_proof;
/// Remote call request.
@@ -44,6 +45,15 @@ pub struct RemoteCallRequest<Header: HeaderT> {
pub call_data: Vec<u8>,
}
/// Remote canonical header request.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct RemoteHeaderRequest<Header: HeaderT> {
/// The root of CHT this block is included in.
pub cht_root: Header::Hash,
/// Number of the header to query.
pub block: Header::Number,
}
/// Remote storage read request.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RemoteReadRequest<Header: HeaderT> {
@@ -58,11 +68,15 @@ pub struct RemoteReadRequest<Header: HeaderT> {
/// Light client data fetcher. Implementations of this trait must check if remote data
/// is correct (see FetchedDataChecker) and return already checked data.
pub trait Fetcher<Block: BlockT>: Send + Sync {
/// Remote header future.
type RemoteHeaderResult: IntoFuture<Item=Block::Header, Error=ClientError>;
/// Remote storage read future.
type RemoteReadResult: IntoFuture<Item=Option<Vec<u8>>, Error=ClientError>;
/// Remote call result future.
type RemoteCallResult: IntoFuture<Item=CallResult, Error=ClientError>;
/// Fetch remote header.
fn remote_header(&self, request: RemoteHeaderRequest<Block::Header>) -> Self::RemoteHeaderResult;
/// Fetch remote storage value.
fn remote_read(&self, request: RemoteReadRequest<Block::Header>) -> Self::RemoteReadResult;
/// Fetch remote call result.
@@ -74,6 +88,13 @@ pub trait Fetcher<Block: BlockT>: Send + Sync {
/// Implementations of this trait should not use any blockchain data except that is
/// passed to its methods.
pub trait FetchChecker<Block: BlockT>: Send + Sync {
/// Check remote header proof.
fn check_header_proof(
&self,
request: &RemoteHeaderRequest<Block::Header>,
header: Option<Block::Header>,
remote_proof: Vec<Vec<u8>>
) -> ClientResult<Block::Header>;
/// Check remote storage read proof.
fn check_read_proof(
&self,
@@ -107,12 +128,29 @@ impl<E, H, C> LightDataChecker<E, H, C> {
impl<E, Block, H, C> FetchChecker<Block> for LightDataChecker<E, H, C>
where
Block: BlockT,
Block::Hash: Into<H::Out>,
Block::Hash: Into<H::Out> + From<H256>,
E: CodeExecutor<H>,
H: Hasher,
C: NodeCodec<H> + Sync + Send,
H::Out: Ord + Encodable + HeapSizeOf + From<H256>,
H::Out: Ord + Encodable + HeapSizeOf + From<Block::Hash> + From<H256>,
{
fn check_header_proof(
&self,
request: &RemoteHeaderRequest<Block::Header>,
remote_header: Option<Block::Header>,
remote_proof: Vec<Vec<u8>>
) -> ClientResult<Block::Header> {
let remote_header = remote_header.ok_or_else(||
ClientError::from(ClientErrorKind::InvalidHeaderProof))?;
let remote_header_hash = remote_header.hash();
cht::check_proof::<Block::Header, H, C>(
request.cht_root,
request.block,
remote_header_hash,
remote_proof)
.map(|_| remote_header)
}
fn check_read_proof(
&self,
request: &RemoteReadRequest<Block::Header>,
@@ -130,3 +168,131 @@ impl<E, Block, H, C> FetchChecker<Block> for LightDataChecker<E, H, C>
check_execution_proof::<_, _, H, C>(&self.executor, request, remote_proof)
}
}
#[cfg(test)]
pub mod tests {
use futures::future::{ok, err, FutureResult};
use parking_lot::Mutex;
use call_executor::CallResult;
use executor::{self, NativeExecutionDispatch};
use error::Error as ClientError;
use test_client::{self, TestClient, runtime::{Hash, Block, Header}};
use test_client::client::BlockOrigin;
use in_mem::{Blockchain as InMemoryBlockchain};
use light::fetcher::{Fetcher, FetchChecker, LightDataChecker,
RemoteCallRequest, RemoteHeaderRequest};
use primitives::{KeccakHasher, RlpCodec};
use runtime_primitives::generic::BlockId;
use state_machine::Backend;
use super::*;
pub type OkCallFetcher = Mutex<CallResult>;
impl Fetcher<Block> for OkCallFetcher {
type RemoteHeaderResult = FutureResult<Header, ClientError>;
type RemoteReadResult = FutureResult<Option<Vec<u8>>, ClientError>;
type RemoteCallResult = FutureResult<CallResult, ClientError>;
fn remote_header(&self, _request: RemoteHeaderRequest<Header>) -> Self::RemoteHeaderResult {
err("Not implemented on test node".into())
}
fn remote_read(&self, _request: RemoteReadRequest<Header>) -> Self::RemoteReadResult {
err("Not implemented on test node".into())
}
fn remote_call(&self, _request: RemoteCallRequest<Header>) -> Self::RemoteCallResult {
ok((*self.lock()).clone())
}
}
fn prepare_for_read_proof_check() -> (
LightDataChecker<executor::NativeExecutor<test_client::LocalExecutor>, KeccakHasher, RlpCodec>,
Header, Vec<Vec<u8>>, usize)
{
// prepare remote client
let remote_client = test_client::new();
let remote_block_id = BlockId::Number(0);
let remote_block_hash = remote_client.block_hash(0).unwrap().unwrap();
let mut remote_block_header = remote_client.header(&remote_block_id).unwrap().unwrap();
remote_block_header.state_root = remote_client.state_at(&remote_block_id).unwrap().storage_root(::std::iter::empty()).0.into();
// 'fetch' read proof from remote node
let authorities_len = remote_client.authorities_at(&remote_block_id).unwrap().len();
let remote_read_proof = remote_client.read_proof(&remote_block_id, b":auth:len").unwrap();
// check remote read proof locally
let local_storage = InMemoryBlockchain::<Block>::new();
local_storage.insert(remote_block_hash, remote_block_header.clone(), None, None, true);
let local_executor = test_client::LocalExecutor::new();
let local_checker = LightDataChecker::new(local_executor);
(local_checker, remote_block_header, remote_read_proof, authorities_len)
}
fn prepare_for_header_proof_check(insert_cht: bool) -> (
LightDataChecker<executor::NativeExecutor<test_client::LocalExecutor>, KeccakHasher, RlpCodec>,
Hash, Header, Vec<Vec<u8>>)
{
// prepare remote client
let remote_client = test_client::new();
let mut local_headers_hashes = Vec::new();
for i in 0..4 {
let builder = remote_client.new_block().unwrap();
remote_client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap();
local_headers_hashes.push(remote_client.block_hash(i + 1).unwrap());
}
// 'fetch' header proof from remote node
let remote_block_id = BlockId::Number(1);
let (remote_block_header, remote_header_proof) = remote_client.header_proof_with_cht_size(&remote_block_id, 4).unwrap();
// check remote read proof locally
let local_storage = InMemoryBlockchain::<Block>::new();
let local_cht_root = cht::compute_root::<Header, KeccakHasher, _>(4, 0, local_headers_hashes.into_iter()).unwrap();
if insert_cht {
local_storage.insert_cht_root(1, local_cht_root);
}
let local_executor = test_client::LocalExecutor::new();
let local_checker = LightDataChecker::new(local_executor);
(local_checker, local_cht_root, remote_block_header, remote_header_proof)
}
#[test]
fn storage_read_proof_is_generated_and_checked() {
let (local_checker, remote_block_header, remote_read_proof, authorities_len) = prepare_for_read_proof_check();
assert_eq!((&local_checker as &FetchChecker<Block>).check_read_proof(&RemoteReadRequest::<Header> {
block: remote_block_header.hash(),
header: remote_block_header,
key: b":auth:len".to_vec(),
}, remote_read_proof).unwrap().unwrap()[0], authorities_len as u8);
}
#[test]
fn header_proof_is_generated_and_checked() {
let (local_checker, local_cht_root, remote_block_header, remote_header_proof) = prepare_for_header_proof_check(true);
assert_eq!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: local_cht_root,
block: 1,
}, Some(remote_block_header.clone()), remote_header_proof).unwrap(), remote_block_header);
}
#[test]
fn check_header_proof_fails_if_cht_root_is_invalid() {
let (local_checker, _, mut remote_block_header, remote_header_proof) = prepare_for_header_proof_check(true);
remote_block_header.number = 100;
assert!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: Default::default(),
block: 1,
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
}
#[test]
fn check_header_proof_fails_if_invalid_header_provided() {
let (local_checker, local_cht_root, mut remote_block_header, remote_header_proof) = prepare_for_header_proof_check(true);
remote_block_header.number = 100;
assert!((&local_checker as &FetchChecker<Block>).check_header_proof(&RemoteHeaderRequest::<Header> {
cht_root: local_cht_root,
block: 1,
}, Some(remote_block_header.clone()), remote_header_proof).is_err());
}
}
+7
View File
@@ -46,6 +46,9 @@ pub trait Client<Block: BlockT>: Send + Sync {
/// Get block justification.
fn justification(&self, id: &BlockId<Block>) -> Result<Option<Justification<Block::Hash>>, Error>;
/// Get block header proof.
fn header_proof(&self, block_number: <Block::Header as HeaderT>::Number) -> Result<(Block::Header, Vec<Vec<u8>>), Error>;
/// Get storage read execution proof.
fn read_proof(&self, block: &Block::Hash, key: &[u8]) -> Result<Vec<Vec<u8>>, Error>;
@@ -88,6 +91,10 @@ impl<B, E, Block> Client<Block> for SubstrateClient<B, E, Block> where
(self as &SubstrateClient<B, E, Block>).justification(id)
}
fn header_proof(&self, block_number: <Block::Header as HeaderT>::Number) -> Result<(Block::Header, Vec<Vec<u8>>), Error> {
(self as &SubstrateClient<B, E, Block>).header_proof(&BlockId::Number(block_number))
}
fn read_proof(&self, block: &Block::Hash, key: &[u8]) -> Result<Vec<Vec<u8>>, Error> {
(self as &SubstrateClient<B, E, Block>).read_proof(&BlockId::Hash(block.clone()), key)
}
+26 -1
View File
@@ -20,7 +20,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use codec::{Encode, Decode, Input, Output};
pub use self::generic::{
BlockAnnounce, RemoteCallRequest, RemoteReadRequest,
ConsensusVote, SignedConsensusVote, FromBlock
RemoteHeaderRequest, RemoteHeaderResponse, ConsensusVote,
SignedConsensusVote, FromBlock
};
/// A unique ID of a request.
@@ -270,6 +271,10 @@ pub mod generic {
RemoteReadRequest(RemoteReadRequest<Hash>),
/// Remote storage read response.
RemoteReadResponse(RemoteReadResponse),
/// Remote header request.
RemoteHeaderRequest(RemoteHeaderRequest<Number>),
/// Remote header response.
RemoteHeaderResponse(RemoteHeaderResponse<Header>),
/// Chain-specific message
#[codec(index = "255")]
ChainSpecific(Vec<u8>),
@@ -348,4 +353,24 @@ pub mod generic {
/// Storage key.
pub key: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote header request.
pub struct RemoteHeaderRequest<N> {
/// Unique request id.
pub id: RequestId,
/// Block number to request header for.
pub block: N,
}
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
/// Remote header response.
pub struct RemoteHeaderResponse<Header> {
/// Id of a request this response was made for.
pub id: RequestId,
/// Header. None if proof generation has failed (e.g. header is unknown).
pub header: Option<Header>,
/// Header proof.
pub proof: Vec<Vec<u8>>,
}
}
+103 -3
View File
@@ -25,7 +25,8 @@ use linked_hash_map::LinkedHashMap;
use linked_hash_map::Entry;
use parking_lot::Mutex;
use client;
use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest, RemoteReadRequest};
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest};
use io::SyncIo;
use message;
use network_libp2p::{Severity, NodeIndex};
@@ -46,6 +47,14 @@ pub trait OnDemandService<Block: BlockT>: Send + Sync {
/// Maintain peers requests.
fn maintain_peers(&self, io: &mut SyncIo);
/// When header response is received from remote node.
fn on_remote_header_response(
&self,
io: &mut SyncIo,
peer: NodeIndex,
response: message::RemoteHeaderResponse<Block::Header>
);
/// When read response is received from remote node.
fn on_remote_read_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteReadResponse);
@@ -80,6 +89,7 @@ struct Request<Block: BlockT> {
}
enum RequestData<Block: BlockT> {
RemoteHeader(RemoteHeaderRequest<Block::Header>, Sender<Result<Block::Header, client::error::Error>>),
RemoteRead(RemoteReadRequest<Block::Header>, Sender<Result<Option<Vec<u8>>, client::error::Error>>),
RemoteCall(RemoteCallRequest<Block::Header>, Sender<Result<client::CallResult, client::error::Error>>),
}
@@ -198,6 +208,20 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where
core.dispatch();
}
fn on_remote_header_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteHeaderResponse<B::Header>) {
self.accept_response("header", io, peer, response.id, |request| match request.data {
RequestData::RemoteHeader(request, sender) => match self.checker.check_header_proof(&request, response.header, response.proof) {
Ok(response) => {
// we do not bother if receiver has been dropped already
let _ = sender.send(Ok(response));
Accept::Ok
},
Err(error) => Accept::CheckFailed(error, RequestData::RemoteHeader(request, sender)),
},
data @ _ => Accept::Unexpected(data),
})
}
fn on_remote_read_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteReadResponse) {
self.accept_response("read", io, peer, response.id, |request| match request.data {
RequestData::RemoteRead(request, sender) => match self.checker.check_read_proof(&request, response.proof) {
@@ -232,9 +256,16 @@ impl<B, E> Fetcher<B> for OnDemand<B, E> where
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
type RemoteHeaderResult = RemoteResponse<B::Header>;
type RemoteReadResult = RemoteResponse<Option<Vec<u8>>>;
type RemoteCallResult = RemoteResponse<client::CallResult>;
fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteHeader(request, sender),
RemoteResponse { receiver })
}
fn remote_read(&self, request: RemoteReadRequest<B::Header>) -> Self::RemoteReadResult {
let (sender, receiver) = channel();
self.schedule_request(RequestData::RemoteRead(request, sender),
@@ -332,6 +363,11 @@ impl<B, E> OnDemandCore<B, E> where
impl<Block: BlockT> Request<Block> {
pub fn message(&self) -> message::Message<Block> {
match self.data {
RequestData::RemoteHeader(ref data, _) => message::generic::Message::RemoteHeaderRequest(
message::RemoteHeaderRequest {
id: self.id,
block: data.block,
}),
RequestData::RemoteRead(ref data, _) => message::generic::Message::RemoteReadRequest(
message::RemoteReadRequest {
id: self.id,
@@ -357,7 +393,8 @@ pub mod tests {
use futures::Future;
use parking_lot::RwLock;
use client;
use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest, RemoteReadRequest};
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest};
use message;
use network_libp2p::NodeIndex;
use service::{Roles, ExecuteInContext};
@@ -373,6 +410,18 @@ pub mod tests {
}
impl FetchChecker<Block> for DummyFetchChecker {
fn check_header_proof(
&self,
_request: &RemoteHeaderRequest<Header>,
header: Option<Header>,
_remote_proof: Vec<Vec<u8>>
) -> client::error::Result<Header> {
match self.ok {
true if header.is_some() => Ok(header.unwrap()),
_ => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
}
}
fn check_read_proof(&self, _request: &RemoteReadRequest<Header>, _remote_proof: Vec<Vec<u8>>) -> client::error::Result<Option<Vec<u8>>> {
match self.ok {
true => Ok(Some(vec![42])),
@@ -513,7 +562,7 @@ pub mod tests {
}
#[test]
fn receives_remote_response() {
fn receives_remote_call_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
@@ -533,4 +582,55 @@ pub mod tests {
receive_call_response(&*on_demand, &mut network, 0, 0);
thread.join().unwrap();
}
#[test]
fn receives_remote_read_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
let response = on_demand.remote_read(RemoteReadRequest {
header: dummy_header(),
block: Default::default(),
key: b":key".to_vec()
});
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result, Some(vec![42]));
});
on_demand.on_remote_read_response(&mut network, 0, message::RemoteReadResponse {
id: 0,
proof: vec![vec![2]],
});
thread.join().unwrap();
}
#[test]
fn receives_remote_header_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Roles::FULL);
let response = on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), block: 1 });
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result.hash(), "80729accb7bb10ff9c637a10e8bb59f21c52571aa7b46544c5885ca89ed190f4".into());
});
on_demand.on_remote_header_response(&mut network, 0, message::RemoteHeaderResponse {
id: 0,
header: Some(Header {
parent_hash: Default::default(),
number: 1,
state_root: Default::default(),
extrinsics_root: Default::default(),
digest: Default::default(),
}),
proof: vec![vec![2]],
});
thread.join().unwrap();
}
}
+24 -1
View File
@@ -20,7 +20,7 @@ use std::sync::Arc;
use std::time;
use parking_lot::RwLock;
use rustc_hex::ToHex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, As};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, NumberFor, As};
use runtime_primitives::generic::BlockId;
use network_libp2p::{NodeIndex, Severity};
use codec::{Encode, Decode};
@@ -271,6 +271,8 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, who, response),
GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(io, who, request),
GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(io, who, response),
GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(io, who, request),
GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response),
other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, other),
}
}
@@ -625,6 +627,27 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
self.on_demand.as_ref().map(|s| s.on_remote_read_response(io, who, response));
}
fn on_remote_header_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteHeaderRequest<NumberFor<B>>) {
trace!(target: "sync", "Remote header proof request {} from {} ({})",
request.id, who, request.block);
let (header, proof) = match self.context_data.chain.header_proof(request.block) {
Ok((header, proof)) => (Some(header), proof),
Err(error) => {
trace!(target: "sync", "Remote header proof request {} from {} ({}) failed with: {}",
request.id, who, request.block, error);
(Default::default(), Default::default())
},
};
self.send_message(io, who, GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse {
id: request.id, header, proof,
}));
}
fn on_remote_header_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteHeaderResponse<B::Header>) {
trace!(target: "sync", "Remote header proof response {} from {}", response.id, who);
self.on_demand.as_ref().map(|s| s.on_remote_header_response(io, who, response));
}
/// Execute a closure with access to a network context and specialization.
pub fn with_spec<F, U>(&self, io: &mut SyncIo, f: F) -> U
where F: FnOnce(&mut S, &mut Context<B>) -> U
@@ -214,7 +214,8 @@ pub struct LightComponents<Factory: ServiceFactory> {
impl<Factory: ServiceFactory> Components for LightComponents<Factory>
where
<<Factory as ServiceFactory>::Block as BlockT>::Hash: Into<H256>,
<<Factory as ServiceFactory>::Block as BlockT>::Hash: From<H256>,
H256: From<<<Factory as ServiceFactory>::Block as BlockT>::Hash>,
{
type Factory = Factory;
type Executor = LightExecutor<Factory>;