Preparing light client structure [v2] (#150)

* light client structure + remote call requests
This commit is contained in:
Svyatoslav Nikolsky
2018-05-28 10:27:59 +03:00
committed by GitHub
parent 4808077674
commit 58ba901ccc
39 changed files with 1835 additions and 550 deletions
+8
View File
@@ -922,6 +922,11 @@ name = "libc"
version = "0.2.36" version = "0.2.36"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "linked-hash-map"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "local-encoding" name = "local-encoding"
version = "0.2.0" version = "0.2.0"
@@ -1398,6 +1403,7 @@ dependencies = [
"substrate-network 0.1.0", "substrate-network 0.1.0",
"substrate-primitives 0.1.0", "substrate-primitives 0.1.0",
"substrate-runtime-io 0.1.0", "substrate-runtime-io 0.1.0",
"substrate-state-machine 0.1.0",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@@ -1952,6 +1958,7 @@ dependencies = [
"ethcore-network 1.12.0 (git+https://github.com/paritytech/parity.git)", "ethcore-network 1.12.0 (git+https://github.com/paritytech/parity.git)",
"ethcore-network-devp2p 1.12.0 (git+https://github.com/paritytech/parity.git)", "ethcore-network-devp2p 1.12.0 (git+https://github.com/paritytech/parity.git)",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2779,6 +2786,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c8f31047daa365f19be14b47c29df4f7c3b581832407daabe6ae77397619237d" "checksum lazy_static 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c8f31047daa365f19be14b47c29df4f7c3b581832407daabe6ae77397619237d"
"checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef" "checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef"
"checksum libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "1e5d97d6708edaa407429faa671b942dc0f2727222fb6b6539bf1db936e4b121" "checksum libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "1e5d97d6708edaa407429faa671b942dc0f2727222fb6b6539bf1db936e4b121"
"checksum linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "70fb39025bc7cdd76305867c4eccf2f2dcf6e9a57f5b21a93e1c2d86cd03ec9e"
"checksum local-encoding 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e1ceb20f39ff7ae42f3ff9795f3986b1daad821caaa1e1732a0944103a5a1a66" "checksum local-encoding 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e1ceb20f39ff7ae42f3ff9795f3986b1daad821caaa1e1732a0944103a5a1a66"
"checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" "checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b"
"checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2" "checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2"
+11 -6
View File
@@ -82,9 +82,12 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
// Create client // Create client
let executor = demo_executor::Executor::new(); let executor = demo_executor::Executor::new();
let mut storage = Default::default();
let god_key = hex!["3d866ec8a9190c8343c2fc593d21d8a6d0c5c4763aaab2349de3a6111d64d124"];
struct GenesisBuilder;
impl client::GenesisBuilder for GenesisBuilder {
fn build(self) -> (primitives::Header, Vec<(Vec<u8>, Vec<u8>)>) {
let god_key = hex!["3d866ec8a9190c8343c2fc593d21d8a6d0c5c4763aaab2349de3a6111d64d124"];
let genesis_config = GenesisConfig { let genesis_config = GenesisConfig {
consensus: Some(ConsensusConfig { consensus: Some(ConsensusConfig {
code: vec![], // TODO code: vec![], // TODO
@@ -126,12 +129,14 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
voting_period: 7 * 120 * 24, // 7 day voting period for council members. voting_period: 7 * 120 * 24, // 7 day voting period for council members.
}), }),
}; };
let prepare_genesis = || {
storage = genesis_config.build_externalities(); let storage = genesis_config.build_externalities();
let block = genesis::construct_genesis_block(&storage); let block = genesis::construct_genesis_block(&storage);
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
}; }
let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?); }
let client = Arc::new(client::new_in_mem(executor, GenesisBuilder)?);
let mut core = ::tokio_core::reactor::Core::new().expect("Unable to spawn event loop."); let mut core = ::tokio_core::reactor::Core::new().expect("Unable to spawn event loop.");
let _rpc_servers = { let _rpc_servers = {
+352
View File
@@ -0,0 +1,352 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Strongly typed API for full Polkadot client.
use client::backend::{Backend, LocalBackend};
use client::{self, Client, LocalCallExecutor};
use polkadot_executor::Executor as LocalDispatch;
use substrate_executor::{NativeExecutionDispatch, NativeExecutor};
use state_machine::{self, OverlayedChanges};
use primitives::{AccountId, BlockId, Hash, Index, SessionKey, Timestamp};
use primitives::parachain::{DutyRoster, CandidateReceipt, Id as ParaId};
use runtime::{self, Block, Header, UncheckedExtrinsic, Extrinsic, Call, TimestampCall, ParachainsCall};
use {CheckedBlockId, BlockBuilder, PolkadotApi, LocalPolkadotApi, ErrorKind, Error, Result};
/// A checked block ID used for the substrate-client implementation of CheckedBlockId;
#[derive(Debug, Clone, Copy)]
pub struct CheckedId(pub BlockId);
impl CheckedBlockId for CheckedId {
fn block_id(&self) -> &BlockId {
&self.0
}
}
// set up the necessary scaffolding to execute a set of calls to the runtime.
// this creates a new block on top of the given ID and initialises it.
macro_rules! with_runtime {
($client: ident, $at: expr, $exec: expr) => {{
let parent = $at.block_id();
let header = Header {
parent_hash: $client.block_hash_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))?,
number: $client.block_number_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))? + 1,
state_root: Default::default(),
extrinsics_root: Default::default(),
digest: Default::default(),
};
$client.state_at(parent).map_err(Error::from).and_then(|state| {
let mut changes = Default::default();
let mut ext = state_machine::Ext::new(&mut changes, &state);
::substrate_executor::with_native_environment(&mut ext, || {
::runtime::Executive::initialise_block(&header);
($exec)()
}).map_err(Into::into)
})
}}
}
/// A polkadot block builder.
#[derive(Debug, Clone)]
pub struct ClientBlockBuilder<S> {
parent: BlockId,
changes: OverlayedChanges,
state: S,
header: Header,
timestamp: Timestamp,
extrinsics: Vec<UncheckedExtrinsic>,
}
impl<S: state_machine::Backend> ClientBlockBuilder<S>
where S::Error: Into<client::error::Error>
{
// initialises a block, ready to allow extrinsics to be applied.
fn initialise_block(&mut self) -> Result<()> {
let result = {
let mut ext = state_machine::Ext::new(&mut self.changes, &self.state);
let h = self.header.clone();
::substrate_executor::with_native_environment(
&mut ext,
|| runtime::Executive::initialise_block(&h),
).map_err(Into::into)
};
match result {
Ok(_) => {
self.changes.commit_prospective();
Ok(())
}
Err(e) => {
self.changes.discard_prospective();
Err(e)
}
}
}
// executes a extrinsic, inherent or otherwise, without appending to the list.
fn apply_extrinsic(&mut self, extrinsic: UncheckedExtrinsic) -> Result<()> {
let result = {
let mut ext = state_machine::Ext::new(&mut self.changes, &self.state);
::substrate_executor::with_native_environment(
&mut ext,
move || runtime::Executive::apply_extrinsic(extrinsic),
).map_err(Into::into)
};
match result {
Ok(_) => {
self.changes.commit_prospective();
Ok(())
}
Err(e) => {
self.changes.discard_prospective();
Err(e)
}
}
}
}
impl<S: state_machine::Backend> BlockBuilder for ClientBlockBuilder<S>
where S::Error: Into<client::error::Error>
{
fn push_extrinsic(&mut self, extrinsic: UncheckedExtrinsic) -> Result<()> {
// Check that this is not an "inherent" extrinsic.
if extrinsic.signature == Default::default() {
bail!(ErrorKind::PushedInherentTransaction(extrinsic));
} else {
self.apply_extrinsic(extrinsic.clone())?;
self.extrinsics.push(extrinsic);
Ok(())
}
}
fn bake(mut self) -> Block {
let mut ext = state_machine::Ext::new(&mut self.changes, &self.state);
let final_header = ::substrate_executor::with_native_environment(
&mut ext,
move || runtime::Executive::finalise_block()
).expect("all inherent extrinsics pushed; all other extrinsics executed correctly; qed");
Block {
header: final_header,
extrinsics: self.extrinsics,
}
}
}
impl<B: LocalBackend> PolkadotApi for Client<B, LocalCallExecutor<B, NativeExecutor<LocalDispatch>>>
where ::client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>
{
type CheckedBlockId = CheckedId;
type BlockBuilder = ClientBlockBuilder<B::State>;
fn check_id(&self, id: BlockId) -> Result<CheckedId> {
// bail if the code is not the same as the natively linked.
if self.code_at(&id)? != LocalDispatch::native_equivalent() {
bail!("This node is out of date. Block authoring may not work correctly. Bailing.")
}
Ok(CheckedId(id))
}
fn session_keys(&self, at: &CheckedId) -> Result<Vec<SessionKey>> {
with_runtime!(self, at, ::runtime::Consensus::authorities)
}
fn validators(&self, at: &CheckedId) -> Result<Vec<AccountId>> {
with_runtime!(self, at, ::runtime::Session::validators)
}
fn random_seed(&self, at: &CheckedId) -> Result<Hash> {
with_runtime!(self, at, ::runtime::System::random_seed)
}
fn duty_roster(&self, at: &CheckedId) -> Result<DutyRoster> {
with_runtime!(self, at, ::runtime::Parachains::calculate_duty_roster)
}
fn timestamp(&self, at: &CheckedId) -> Result<Timestamp> {
with_runtime!(self, at, ::runtime::Timestamp::now)
}
fn evaluate_block(&self, at: &CheckedId, block: Block) -> Result<bool> {
use substrate_executor::error::ErrorKind as ExecErrorKind;
let res = with_runtime!(self, at, || ::runtime::Executive::execute_block(block));
match res {
Ok(()) => Ok(true),
Err(err) => match err.kind() {
&ErrorKind::Executor(ExecErrorKind::Runtime) => Ok(false),
_ => Err(err)
}
}
}
fn index(&self, at: &CheckedId, account: AccountId) -> Result<Index> {
with_runtime!(self, at, || ::runtime::System::account_index(account))
}
fn active_parachains(&self, at: &CheckedId) -> Result<Vec<ParaId>> {
with_runtime!(self, at, ::runtime::Parachains::active_parachains)
}
fn parachain_code(&self, at: &CheckedId, parachain: ParaId) -> Result<Option<Vec<u8>>> {
with_runtime!(self, at, || ::runtime::Parachains::parachain_code(parachain))
}
fn parachain_head(&self, at: &CheckedId, parachain: ParaId) -> Result<Option<Vec<u8>>> {
with_runtime!(self, at, || ::runtime::Parachains::parachain_head(parachain))
}
fn build_block(&self, parent: &CheckedId, timestamp: Timestamp, parachains: Vec<CandidateReceipt>) -> Result<Self::BlockBuilder> {
let parent = parent.block_id();
let header = Header {
parent_hash: self.block_hash_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))?,
number: self.block_number_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))? + 1,
state_root: Default::default(),
extrinsics_root: Default::default(),
digest: Default::default(),
};
let extrinsics = vec![
UncheckedExtrinsic {
extrinsic: Extrinsic {
signed: Default::default(),
index: Default::default(),
function: Call::Timestamp(TimestampCall::set(timestamp)),
},
signature: Default::default(),
},
UncheckedExtrinsic {
extrinsic: Extrinsic {
signed: Default::default(),
index: Default::default(),
function: Call::Parachains(ParachainsCall::set_heads(parachains)),
},
signature: Default::default(),
}
];
let mut builder = ClientBlockBuilder {
parent: *parent,
changes: OverlayedChanges::default(),
state: self.state_at(parent)?,
header,
timestamp,
extrinsics: extrinsics.clone(),
};
builder.initialise_block()?;
for inherent in extrinsics {
builder.apply_extrinsic(inherent)?;
}
Ok(builder)
}
}
impl<B: LocalBackend> LocalPolkadotApi for Client<B, LocalCallExecutor<B, NativeExecutor<LocalDispatch>>>
where ::client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>
{}
#[cfg(test)]
mod tests {
use super::*;
use keyring::Keyring;
use codec::Slicable;
use client::{self, LocalCallExecutor};
use client::in_mem::Backend as InMemory;
use substrate_executor::NativeExecutionDispatch;
use substrate_primitives::{self, Header};
use runtime::{GenesisConfig, ConsensusConfig, SessionConfig, BuildExternalities};
fn validators() -> Vec<AccountId> {
vec![
Keyring::One.to_raw_public(),
Keyring::Two.to_raw_public(),
]
}
fn client() -> Client<InMemory, LocalCallExecutor<InMemory, NativeExecutor<LocalDispatch>>> {
struct GenesisBuilder;
impl client::GenesisBuilder for GenesisBuilder {
fn build(self) -> (Header, Vec<(Vec<u8>, Vec<u8>)>) {
let genesis_config = GenesisConfig {
consensus: Some(ConsensusConfig {
code: LocalDispatch::native_equivalent().to_vec(),
authorities: validators(),
}),
system: None,
session: Some(SessionConfig {
validators: validators(),
session_length: 100,
}),
council: Some(Default::default()),
democracy: Some(Default::default()),
parachains: Some(Default::default()),
staking: Some(Default::default()),
};
let storage = genesis_config.build_externalities();
let block = ::client::genesis::construct_genesis_block(&storage);
(substrate_primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
}
}
::client::new_in_mem(LocalDispatch::new(), GenesisBuilder).unwrap()
}
#[test]
fn gets_session_and_validator_keys() {
let client = client();
let id = client.check_id(BlockId::Number(0)).unwrap();
assert_eq!(client.session_keys(&id).unwrap(), validators());
assert_eq!(client.validators(&id).unwrap(), validators());
}
#[test]
fn build_block() {
let client = client();
let id = client.check_id(BlockId::Number(0)).unwrap();
let block_builder = client.build_block(&id, 1_000_000, Vec::new()).unwrap();
let block = block_builder.bake();
assert_eq!(block.header.number, 1);
assert!(block.header.extrinsics_root != Default::default());
}
#[test]
fn fails_to_check_id_for_unknown_block() {
assert!(client().check_id(BlockId::Number(100)).is_err());
}
#[test]
fn gets_random_seed_with_genesis() {
let client = client();
let id = client.check_id(BlockId::Number(0)).unwrap();
assert!(client.random_seed(&id).is_ok());
}
}
+8 -317
View File
@@ -34,14 +34,12 @@ extern crate error_chain;
#[cfg(test)] #[cfg(test)]
extern crate substrate_keyring as keyring; extern crate substrate_keyring as keyring;
use client::backend::Backend; pub mod full;
use client::Client; pub mod light;
use polkadot_executor::Executor as LocalDispatch;
use substrate_executor::{NativeExecutionDispatch, NativeExecutor};
use state_machine::OverlayedChanges;
use primitives::{AccountId, BlockId, Hash, Index, SessionKey, Timestamp}; use primitives::{AccountId, BlockId, Hash, Index, SessionKey, Timestamp};
use primitives::parachain::{DutyRoster, CandidateReceipt, Id as ParaId}; use primitives::parachain::{DutyRoster, CandidateReceipt, Id as ParaId};
use runtime::{Block, Header, UncheckedExtrinsic, Extrinsic, Call, TimestampCall, ParachainsCall}; use runtime::{Block, UncheckedExtrinsic};
error_chain! { error_chain! {
errors { errors {
@@ -152,315 +150,8 @@ pub trait PolkadotApi {
fn build_block(&self, parent: &Self::CheckedBlockId, timestamp: Timestamp, parachains: Vec<CandidateReceipt>) -> Result<Self::BlockBuilder>; fn build_block(&self, parent: &Self::CheckedBlockId, timestamp: Timestamp, parachains: Vec<CandidateReceipt>) -> Result<Self::BlockBuilder>;
} }
/// A checked block ID used for the substrate-client implementation of CheckedBlockId; /// Mark for all Polkadot API implementations, that are making use of state data, stored locally.
#[derive(Debug, Clone, Copy)] pub trait LocalPolkadotApi: PolkadotApi {}
pub struct CheckedId(BlockId);
impl CheckedBlockId for CheckedId { /// Mark for all Polkadot API implementations, that are fetching required state data from remote nodes.
fn block_id(&self) -> &BlockId { pub trait RemotePolkadotApi: PolkadotApi {}
&self.0
}
}
// set up the necessary scaffolding to execute a set of calls to the runtime.
// this creates a new block on top of the given ID and initialises it.
macro_rules! with_runtime {
($client: ident, $at: expr, $exec: expr) => {{
let parent = $at.block_id();
let header = Header {
parent_hash: $client.block_hash_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))?,
number: $client.block_number_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))? + 1,
state_root: Default::default(),
extrinsics_root: Default::default(),
digest: Default::default(),
};
$client.state_at(parent).map_err(Error::from).and_then(|state| {
let mut changes = Default::default();
let mut ext = state_machine::Ext::new(&mut changes, &state);
::substrate_executor::with_native_environment(&mut ext, || {
::runtime::Executive::initialise_block(&header);
($exec)()
}).map_err(Into::into)
})
}}
}
impl<B: Backend> PolkadotApi for Client<B, NativeExecutor<LocalDispatch>>
where ::client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>
{
type CheckedBlockId = CheckedId;
type BlockBuilder = ClientBlockBuilder<B::State>;
fn check_id(&self, id: BlockId) -> Result<CheckedId> {
// bail if the code is not the same as the natively linked.
if self.code_at(&id)? != LocalDispatch::native_equivalent() {
bail!("This node is out of date. Block authoring may not work correctly. Bailing.")
}
Ok(CheckedId(id))
}
fn session_keys(&self, at: &CheckedId) -> Result<Vec<SessionKey>> {
with_runtime!(self, at, ::runtime::Consensus::authorities)
}
fn validators(&self, at: &CheckedId) -> Result<Vec<AccountId>> {
with_runtime!(self, at, ::runtime::Session::validators)
}
fn random_seed(&self, at: &CheckedId) -> Result<Hash> {
with_runtime!(self, at, ::runtime::System::random_seed)
}
fn duty_roster(&self, at: &CheckedId) -> Result<DutyRoster> {
with_runtime!(self, at, ::runtime::Parachains::calculate_duty_roster)
}
fn timestamp(&self, at: &CheckedId) -> Result<Timestamp> {
with_runtime!(self, at, ::runtime::Timestamp::now)
}
fn evaluate_block(&self, at: &CheckedId, block: Block) -> Result<bool> {
use substrate_executor::error::ErrorKind as ExecErrorKind;
let res = with_runtime!(self, at, || ::runtime::Executive::execute_block(block));
match res {
Ok(()) => Ok(true),
Err(err) => match err.kind() {
&ErrorKind::Executor(ExecErrorKind::Runtime) => Ok(false),
_ => Err(err)
}
}
}
fn index(&self, at: &CheckedId, account: AccountId) -> Result<Index> {
with_runtime!(self, at, || ::runtime::System::account_index(account))
}
fn active_parachains(&self, at: &CheckedId) -> Result<Vec<ParaId>> {
with_runtime!(self, at, ::runtime::Parachains::active_parachains)
}
fn parachain_code(&self, at: &CheckedId, parachain: ParaId) -> Result<Option<Vec<u8>>> {
with_runtime!(self, at, || ::runtime::Parachains::parachain_code(parachain))
}
fn parachain_head(&self, at: &CheckedId, parachain: ParaId) -> Result<Option<Vec<u8>>> {
with_runtime!(self, at, || ::runtime::Parachains::parachain_head(parachain))
}
fn build_block(&self, parent: &CheckedId, timestamp: Timestamp, parachains: Vec<CandidateReceipt>) -> Result<Self::BlockBuilder> {
let parent = parent.block_id();
let header = Header {
parent_hash: self.block_hash_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))?,
number: self.block_number_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))? + 1,
state_root: Default::default(),
extrinsics_root: Default::default(),
digest: Default::default(),
};
let extrinsics = vec![
UncheckedExtrinsic {
extrinsic: Extrinsic {
signed: Default::default(),
index: Default::default(),
function: Call::Timestamp(TimestampCall::set(timestamp)),
},
signature: Default::default(),
},
UncheckedExtrinsic {
extrinsic: Extrinsic {
signed: Default::default(),
index: Default::default(),
function: Call::Parachains(ParachainsCall::set_heads(parachains)),
},
signature: Default::default(),
}
];
let mut builder = ClientBlockBuilder {
parent: *parent,
changes: OverlayedChanges::default(),
state: self.state_at(parent)?,
header,
timestamp,
extrinsics: extrinsics.clone(),
};
builder.initialise_block()?;
for inherent in extrinsics {
builder.apply_extrinsic(inherent)?;
}
Ok(builder)
}
}
/// A polkadot block builder.
#[derive(Debug, Clone)]
pub struct ClientBlockBuilder<S> {
parent: BlockId,
changes: OverlayedChanges,
state: S,
header: Header,
timestamp: Timestamp,
extrinsics: Vec<UncheckedExtrinsic>,
}
impl<S: state_machine::Backend> ClientBlockBuilder<S>
where S::Error: Into<client::error::Error>
{
// initialises a block, ready to allow extrinsics to be applied.
fn initialise_block(&mut self) -> Result<()> {
let result = {
let mut ext = state_machine::Ext::new(&mut self.changes, &self.state);
let h = self.header.clone();
::substrate_executor::with_native_environment(
&mut ext,
|| runtime::Executive::initialise_block(&h),
).map_err(Into::into)
};
match result {
Ok(_) => {
self.changes.commit_prospective();
Ok(())
}
Err(e) => {
self.changes.discard_prospective();
Err(e)
}
}
}
// executes a extrinsic, inherent or otherwise, without appending to the list.
fn apply_extrinsic(&mut self, extrinsic: UncheckedExtrinsic) -> Result<()> {
let result = {
let mut ext = state_machine::Ext::new(&mut self.changes, &self.state);
::substrate_executor::with_native_environment(
&mut ext,
move || runtime::Executive::apply_extrinsic(extrinsic),
).map_err(Into::into)
};
match result {
Ok(_) => {
self.changes.commit_prospective();
Ok(())
}
Err(e) => {
self.changes.discard_prospective();
Err(e)
}
}
}
}
impl<S: state_machine::Backend> BlockBuilder for ClientBlockBuilder<S>
where S::Error: Into<client::error::Error>
{
fn push_extrinsic(&mut self, extrinsic: UncheckedExtrinsic) -> Result<()> {
// Check that this is not an "inherent" extrinsic.
if extrinsic.signature == Default::default() {
bail!(ErrorKind::PushedInherentTransaction(extrinsic));
} else {
self.apply_extrinsic(extrinsic.clone())?;
self.extrinsics.push(extrinsic);
Ok(())
}
}
fn bake(mut self) -> Block {
let mut ext = state_machine::Ext::new(&mut self.changes, &self.state);
let final_header = ::substrate_executor::with_native_environment(
&mut ext,
move || runtime::Executive::finalise_block()
).expect("all inherent extrinsics pushed; all other extrinsics executed correctly; qed");
Block {
header: final_header,
extrinsics: self.extrinsics,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use keyring::Keyring;
use codec::Slicable;
use client::in_mem::Backend as InMemory;
use substrate_executor::NativeExecutionDispatch;
use runtime::{GenesisConfig, ConsensusConfig, SessionConfig, BuildExternalities};
fn validators() -> Vec<AccountId> {
vec![
Keyring::One.to_raw_public(),
Keyring::Two.to_raw_public(),
]
}
fn client() -> Client<InMemory, NativeExecutor<LocalDispatch>> {
let genesis_config = GenesisConfig {
consensus: Some(ConsensusConfig {
code: LocalDispatch::native_equivalent().to_vec(),
authorities: validators(),
}),
system: None,
session: Some(SessionConfig {
validators: validators(),
session_length: 100,
}),
council: Some(Default::default()),
democracy: Some(Default::default()),
parachains: Some(Default::default()),
staking: Some(Default::default()),
};
::client::new_in_mem(
LocalDispatch::new(),
|| {
let storage = genesis_config.build_externalities();
let block = ::client::genesis::construct_genesis_block(&storage);
(substrate_primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
}
).unwrap()
}
#[test]
fn gets_session_and_validator_keys() {
let client = client();
let id = client.check_id(BlockId::Number(0)).unwrap();
assert_eq!(client.session_keys(&id).unwrap(), validators());
assert_eq!(client.validators(&id).unwrap(), validators());
}
#[test]
fn build_block() {
let client = client();
let id = client.check_id(BlockId::Number(0)).unwrap();
let block_builder = client.build_block(&id, 1_000_000, Vec::new()).unwrap();
let block = block_builder.bake();
assert_eq!(block.header.number, 1);
assert!(block.header.extrinsics_root != Default::default());
}
#[test]
fn fails_to_check_id_for_unknown_block() {
assert!(client().check_id(BlockId::Number(100)).is_err());
}
#[test]
fn gets_random_seed_with_genesis() {
let client = client();
let id = client.check_id(BlockId::Number(0)).unwrap();
assert!(client.random_seed(&id).is_ok());
}
}
+106
View File
@@ -0,0 +1,106 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Strongly typed API for light Polkadot client.
use std::sync::Arc;
use client::backend::{Backend, RemoteBackend};
use client::{Client, CallExecutor};
use codec::Slicable;
use state_machine;
use primitives::{AccountId, BlockId, Hash, Index, SessionKey, Timestamp};
use primitives::parachain::{DutyRoster, CandidateReceipt, Id as ParaId};
use runtime::{Block, UncheckedExtrinsic};
use full::CheckedId;
use {PolkadotApi, RemotePolkadotApi, BlockBuilder, CheckedBlockId, Result, ErrorKind};
/// Remote polkadot API implementation.
pub struct RemotePolkadotApiWrapper<B: Backend, E: CallExecutor>(pub Arc<Client<B, E>>);
/// Block builder for light client.
pub struct LightBlockBuilder;
impl<B: Backend, E: CallExecutor> PolkadotApi for RemotePolkadotApiWrapper<B, E>
where ::client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>
{
type CheckedBlockId = CheckedId;
type BlockBuilder = LightBlockBuilder;
fn check_id(&self, id: BlockId) -> Result<CheckedId> {
Ok(CheckedId(id))
}
fn session_keys(&self, at: &CheckedId) -> Result<Vec<SessionKey>> {
self.0.executor().call(at.block_id(), "authorities", &[])
.and_then(|r| Vec::<SessionKey>::decode(&mut &r.return_data[..])
.ok_or("error decoding session keys".into()))
.map_err(Into::into)
}
fn validators(&self, _at: &CheckedId) -> Result<Vec<AccountId>> {
Err(ErrorKind::UnknownRuntime.into())
}
fn random_seed(&self, _at: &Self::CheckedBlockId) -> Result<Hash> {
Err(ErrorKind::UnknownRuntime.into())
}
fn duty_roster(&self, _at: &CheckedId) -> Result<DutyRoster> {
Err(ErrorKind::UnknownRuntime.into())
}
fn timestamp(&self, _at: &CheckedId) -> Result<Timestamp> {
Err(ErrorKind::UnknownRuntime.into())
}
fn evaluate_block(&self, _at: &CheckedId, _block: Block) -> Result<bool> {
Err(ErrorKind::UnknownRuntime.into())
}
fn index(&self, _at: &CheckedId, _account: AccountId) -> Result<Index> {
Err(ErrorKind::UnknownRuntime.into())
}
fn active_parachains(&self, _at: &Self::CheckedBlockId) -> Result<Vec<ParaId>> {
Err(ErrorKind::UnknownRuntime.into())
}
fn parachain_code(&self, _at: &Self::CheckedBlockId, _parachain: ParaId) -> Result<Option<Vec<u8>>> {
Err(ErrorKind::UnknownRuntime.into())
}
fn parachain_head(&self, _at: &Self::CheckedBlockId, _parachain: ParaId) -> Result<Option<Vec<u8>>> {
Err(ErrorKind::UnknownRuntime.into())
}
fn build_block(&self, _parent: &CheckedId, _timestamp: Timestamp, _parachains: Vec<CandidateReceipt>) -> Result<Self::BlockBuilder> {
Err(ErrorKind::UnknownRuntime.into())
}
}
impl<B: RemoteBackend, E: CallExecutor> RemotePolkadotApi for RemotePolkadotApiWrapper<B, E>
where ::client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>
{}
impl BlockBuilder for LightBlockBuilder {
fn push_extrinsic(&mut self, _extrinsic: UncheckedExtrinsic) -> Result<()> {
Err(ErrorKind::UnknownRuntime.into())
}
fn bake(self) -> Block {
unimplemented!()
}
}
+4
View File
@@ -37,6 +37,10 @@ args:
long: validator long: validator
help: Enable validator mode help: Enable validator mode
takes_value: false takes_value: false
- light:
long: light
help: Run in light client mode
takes_value: false
- port: - port:
long: port long: port
value_name: PORT value_name: PORT
+8 -2
View File
@@ -23,12 +23,18 @@ use tokio_core::reactor;
use network::{SyncState, SyncProvider}; use network::{SyncState, SyncProvider};
use runtime_support::Hashable; use runtime_support::Hashable;
use primitives::block::HeaderHash; use primitives::block::HeaderHash;
use client::BlockchainEvents; use state_machine;
use client::{self, BlockchainEvents};
const TIMER_INTERVAL_MS: u64 = 5000; const TIMER_INTERVAL_MS: u64 = 5000;
/// Spawn informant on the event loop /// Spawn informant on the event loop
pub fn start(service: &Service, handle: reactor::Handle) { pub fn start<B, E>(service: &Service<B, E>, handle: reactor::Handle)
where
B: client::backend::Backend + Send + Sync + 'static,
E: client::CallExecutor + Send + Sync + 'static,
client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>
{
let interval = reactor::Interval::new_at(Instant::now(), Duration::from_millis(TIMER_INTERVAL_MS), &handle) let interval = reactor::Interval::new_at(Instant::now(), Duration::from_millis(TIMER_INTERVAL_MS), &handle)
.expect("Error creating informant timer"); .expect("Error creating informant timer");
+28 -15
View File
@@ -107,16 +107,7 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
I: IntoIterator<Item = T>, I: IntoIterator<Item = T>,
T: Into<std::ffi::OsString> + Clone, T: Into<std::ffi::OsString> + Clone,
{ {
let mut core = reactor::Core::new().expect("tokio::Core could not be created"); let core = reactor::Core::new().expect("tokio::Core could not be created");
let exit = {
// can't use signal directly here because CtrlC takes only `Fn`.
let (exit_send, exit) = mpsc::channel(1);
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).wait().expect("Error sending exit notification");
});
exit
};
let yaml = load_yaml!("./cli.yml"); let yaml = load_yaml!("./cli.yml");
let matches = match clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args) { let matches = match clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args) {
@@ -152,10 +143,12 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
if matches.is_present("collator") { if matches.is_present("collator") {
info!("Starting collator."); info!("Starting collator.");
role = service::Role::COLLATOR; role = service::Role::COLLATOR;
} } else if matches.is_present("validator") {
else if matches.is_present("validator") {
info!("Starting validator."); info!("Starting validator.");
role = service::Role::VALIDATOR; role = service::Role::VALIDATOR;
} else if matches.is_present("light") {
info!("Starting light.");
role = service::Role::LIGHT;
} }
match matches.value_of("chain") { match matches.value_of("chain") {
@@ -195,13 +188,33 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect(); config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect();
let service = service::Service::new(config)?; match role == service::Role::LIGHT {
true => run_until_exit(core, service::new_light(config)?, &matches),
false => run_until_exit(core, service::new_full(config)?, &matches),
}
}
fn run_until_exit<B, E>(mut core: reactor::Core, service: service::Service<B, E>, matches: &clap::ArgMatches) -> error::Result<()>
where
B: client::backend::Backend + Send + Sync + 'static,
E: client::CallExecutor + Send + Sync + 'static,
client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>
{
let exit = {
// can't use signal directly here because CtrlC takes only `Fn`.
let (exit_send, exit) = mpsc::channel(1);
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).wait().expect("Error sending exit notification");
});
exit
};
informant::start(&service, core.handle()); informant::start(&service, core.handle());
let _rpc_servers = { let _rpc_servers = {
let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?; let http_address = parse_address("127.0.0.1:9933", "rpc-port", matches)?;
let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?; let ws_address = parse_address("127.0.0.1:9944", "ws-port", matches)?;
let handler = || { let handler = || {
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
+6 -4
View File
@@ -29,7 +29,7 @@ use ed25519;
use futures::prelude::*; use futures::prelude::*;
use futures::{future, Canceled}; use futures::{future, Canceled};
use parking_lot::Mutex; use parking_lot::Mutex;
use polkadot_api::PolkadotApi; use polkadot_api::LocalPolkadotApi;
use polkadot_primitives::AccountId; use polkadot_primitives::AccountId;
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use primitives::{Hash, AuthorityId}; use primitives::{Hash, AuthorityId};
@@ -233,15 +233,17 @@ pub struct Service {
impl Service { impl Service {
/// Create and start a new instance. /// Create and start a new instance.
pub fn new<C>( pub fn new<A, C>(
client: Arc<C>, client: Arc<C>,
api: Arc<A>,
network: Arc<net::ConsensusService>, network: Arc<net::ConsensusService>,
transaction_pool: Arc<Mutex<TransactionPool>>, transaction_pool: Arc<Mutex<TransactionPool>>,
parachain_empty_duration: Duration, parachain_empty_duration: Duration,
key: ed25519::Pair, key: ed25519::Pair,
) -> Service ) -> Service
where where
C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static, A: LocalPolkadotApi + Send + Sync + 'static,
C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + Send + Sync + 'static,
{ {
let (signal, exit) = ::exit_future::signal(); let (signal, exit) = ::exit_future::signal();
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
@@ -249,7 +251,7 @@ impl Service {
let key = Arc::new(key); let key = Arc::new(key);
let factory = ProposerFactory { let factory = ProposerFactory {
client: client.clone(), client: api.clone(),
transaction_pool: transaction_pool.clone(), transaction_pool: transaction_pool.clone(),
network: Network(network.clone()), network: Network(network.clone()),
collators: NoCollators, collators: NoCollators,
+1
View File
@@ -27,3 +27,4 @@ substrate-client = { path = "../../substrate/client" }
substrate-client-db = { path = "../../substrate/client/db" } substrate-client-db = { path = "../../substrate/client/db" }
substrate-codec = { path = "../../substrate/codec" } substrate-codec = { path = "../../substrate/codec" }
substrate-executor = { path = "../../substrate/executor" } substrate-executor = { path = "../../substrate/executor" }
substrate-state-machine = { path = "../../substrate/state-machine" }
+113 -36
View File
@@ -35,6 +35,7 @@ extern crate substrate_network as network;
extern crate substrate_codec as codec; extern crate substrate_codec as codec;
extern crate substrate_client_db as client_db; extern crate substrate_client_db as client_db;
extern crate substrate_executor; extern crate substrate_executor;
extern crate substrate_state_machine as state_machine;
extern crate exit_future; extern crate exit_future;
extern crate tokio_core; extern crate tokio_core;
@@ -55,7 +56,7 @@ use futures::prelude::*;
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use codec::Slicable; use codec::Slicable;
use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash}; use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash, Header};
use primitives::{AuthorityId, hashing}; use primitives::{AuthorityId, hashing};
use transaction_pool::TransactionPool; use transaction_pool::TransactionPool;
use substrate_executor::NativeExecutor; use substrate_executor::NativeExecutor;
@@ -64,31 +65,39 @@ use keystore::Store as Keystore;
use polkadot_api::PolkadotApi; use polkadot_api::PolkadotApi;
use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig, use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig,
SessionConfig, StakingConfig, BuildExternalities}; SessionConfig, StakingConfig, BuildExternalities};
use client::{genesis, BlockchainEvents}; use client::backend::Backend;
use client::{genesis, Client, BlockchainEvents, CallExecutor};
use network::ManageNetwork; use network::ManageNetwork;
use exit_future::Signal; use exit_future::Signal;
pub use self::error::{ErrorKind, Error}; pub use self::error::{ErrorKind, Error};
pub use config::{Configuration, Role, ChainSpec}; pub use config::{Configuration, Role, ChainSpec};
type Client = client::Client<client_db::Backend, NativeExecutor<LocalDispatch>>; type CodeExecutor = NativeExecutor<LocalDispatch>;
/// Polkadot service. /// Polkadot service.
pub struct Service { pub struct Service<B, E> {
thread: Option<thread::JoinHandle<()>>, thread: Option<thread::JoinHandle<()>>,
client: Arc<Client>, client: Arc<Client<B, E>>,
network: Arc<network::Service>, network: Arc<network::Service>,
transaction_pool: Arc<Mutex<TransactionPool>>, transaction_pool: Arc<Mutex<TransactionPool>>,
signal: Option<Signal>, signal: Option<Signal>,
_consensus: Option<consensus::Service>, _consensus: Option<consensus::Service>,
} }
struct TransactionPoolAdapter { struct TransactionPoolAdapter<B, E, A> where A: Send + Sync, E: Send + Sync {
pool: Arc<Mutex<TransactionPool>>, pool: Arc<Mutex<TransactionPool>>,
client: Arc<Client>, client: Arc<Client<B, E>>,
api: Arc<A>,
} }
impl network::TransactionPool for TransactionPoolAdapter { impl<B, E, A> network::TransactionPool for TransactionPoolAdapter<B, E, A>
where
B: Backend + Send + Sync,
E: client::CallExecutor + Send + Sync,
client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>,
A: PolkadotApi + Send + Sync,
{
fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> { fn transactions(&self) -> Vec<(ExtrinsicHash, Vec<u8>)> {
let best_block = match self.client.info() { let best_block = match self.client.info() {
Ok(info) => info.chain.best_hash, Ok(info) => info.chain.best_hash,
@@ -97,10 +106,11 @@ impl network::TransactionPool for TransactionPoolAdapter {
return Vec::new(); return Vec::new();
} }
}; };
let id = self.client.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed.");
let id = self.api.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed.");
let mut pool = self.pool.lock(); let mut pool = self.pool.lock();
pool.cull(None, transaction_pool::Ready::create(id, &*self.client)); pool.cull(None, transaction_pool::Ready::create(id.clone(), &*self.api));
pool.pending(transaction_pool::Ready::create(id, &*self.client)).map(|t| { pool.pending(transaction_pool::Ready::create(id, &*self.api)).map(|t| {
let hash = ::primitives::Hash::from(&t.hash()[..]); let hash = ::primitives::Hash::from(&t.hash()[..]);
let tx = codec::Slicable::encode(t.as_transaction()); let tx = codec::Slicable::encode(t.as_transaction());
(hash, tx) (hash, tx)
@@ -257,16 +267,89 @@ fn local_testnet_config() -> ChainConfig {
]) ])
} }
impl Service { struct GenesisBuilder {
config: GenesisConfig,
}
impl client::GenesisBuilder for GenesisBuilder {
fn build(self) -> (Header, Vec<(Vec<u8>, Vec<u8>)>) {
let storage = self.config.build_externalities();
let block = genesis::construct_genesis_block(&storage);
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
}
}
/// Creates light client and register protocol with the network service
pub fn new_light(config: Configuration) -> Result<Service<client::light::Backend, client::RemoteCallExecutor<client::light::Backend, network::OnDemand<network::Service>>>, error::Error> {
Service::new(move |_, executor, genesis_builder: GenesisBuilder| {
let client_backend = client::light::new_light_backend();
let fetch_checker = Arc::new(client::light::new_fetch_checker(client_backend.clone(), executor));
let fetcher = Arc::new(network::OnDemand::new(fetch_checker));
let client = client::light::new_light(client_backend, fetcher.clone(), genesis_builder)?;
Ok((Arc::new(client), Some(fetcher)))
},
|client| Arc::new(polkadot_api::light::RemotePolkadotApiWrapper(client.clone())),
|_client, _network, _tx_pool, _keystore| Ok(None),
config)
}
/// Creates full client and register protocol with the network service
pub fn new_full(config: Configuration) -> Result<Service<client_db::Backend, client::LocalCallExecutor<client_db::Backend, CodeExecutor>>, error::Error> {
let is_validator = (config.roles & Role::VALIDATOR) == Role::VALIDATOR;
Service::new(|db_settings, executor, genesis_builder: GenesisBuilder|
Ok((Arc::new(client_db::new_client(db_settings, executor, genesis_builder)?), None)),
|client| client,
|client, network, tx_pool, keystore| {
if !is_validator {
return Ok(None);
}
// Load the first available key. Code above makes sure it exisis.
let key = keystore.load(&keystore.contents()?[0], "")?;
info!("Using authority key {:?}", key.public());
Ok(Some(consensus::Service::new(
client.clone(),
client.clone(),
network.clone(),
tx_pool.clone(),
::std::time::Duration::from_millis(4000), // TODO: dynamic
key,
)))
},
config)
}
impl<B, E> Service<B, E>
where
B: Backend + Send + Sync + 'static,
E: CallExecutor + Send + Sync + 'static,
client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>
{
/// Creates and register protocol with the network service /// Creates and register protocol with the network service
pub fn new(mut config: Configuration) -> Result<Service, error::Error> { fn new<F, G, C, A>(client_creator: F, api_creator: G, consensus_creator: C, mut config: Configuration) -> Result<Self, error::Error>
where
F: FnOnce(
client_db::DatabaseSettings,
CodeExecutor,
GenesisBuilder,
) -> Result<(Arc<Client<B, E>>, Option<Arc<network::OnDemand<network::Service>>>), error::Error>,
G: Fn(
Arc<Client<B, E>>,
) -> Arc<A>,
C: Fn(
Arc<Client<B, E>>,
Arc<network::Service>,
Arc<Mutex<TransactionPool>>,
&Keystore
) -> Result<Option<consensus::Service>, error::Error>,
A: PolkadotApi + Send + Sync + 'static,
{
use std::sync::Barrier; use std::sync::Barrier;
let (signal, exit) = ::exit_future::signal(); let (signal, exit) = ::exit_future::signal();
// Create client // Create client
let executor = polkadot_executor::Executor::new(); let executor = polkadot_executor::Executor::new();
let mut storage = Default::default();
let mut keystore = Keystore::open(config.keystore_path.into())?; let mut keystore = Keystore::open(config.keystore_path.into())?;
for seed in &config.keys { for seed in &config.keys {
@@ -285,10 +368,8 @@ impl Service {
}; };
config.network.boot_nodes.extend(boot_nodes); config.network.boot_nodes.extend(boot_nodes);
let prepare_genesis = || { let genesis_builder = GenesisBuilder {
storage = genesis_config.build_externalities(); config: genesis_config,
let block = genesis::construct_genesis_block(&storage);
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
}; };
let db_settings = client_db::DatabaseSettings { let db_settings = client_db::DatabaseSettings {
@@ -296,13 +377,15 @@ impl Service {
path: config.database_path.into(), path: config.database_path.into(),
}; };
let client = Arc::new(client_db::new_client(db_settings, executor, prepare_genesis)?); let (client, on_demand) = client_creator(db_settings, executor, genesis_builder)?;
let api = api_creator(client.clone());
let best_header = client.best_block_header()?; let best_header = client.best_block_header()?;
info!("Starting Polkadot. Best block is #{}", best_header.number); info!("Starting Polkadot. Best block is #{}", best_header.number);
let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool))); let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool)));
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
pool: transaction_pool.clone(), pool: transaction_pool.clone(),
client: client.clone(), client: client.clone(),
api: api.clone(),
}); });
let network_params = network::Params { let network_params = network::Params {
config: network::ProtocolConfig { config: network::ProtocolConfig {
@@ -310,11 +393,13 @@ impl Service {
}, },
network_config: config.network, network_config: config.network,
chain: client.clone(), chain: client.clone(),
on_demand: on_demand.clone().map(|d| d as Arc<network::OnDemandService>),
transaction_pool: transaction_pool_adapter, transaction_pool: transaction_pool_adapter,
}; };
let network = network::Service::new(network_params)?; let network = network::Service::new(network_params)?;
let barrier = ::std::sync::Arc::new(Barrier::new(2)); let barrier = ::std::sync::Arc::new(Barrier::new(2));
on_demand.map(|on_demand| on_demand.set_service_link(Arc::downgrade(&network)));
let thread = { let thread = {
let client = client.clone(); let client = client.clone();
@@ -347,20 +432,7 @@ impl Service {
barrier.wait(); barrier.wait();
// Spin consensus service if configured // Spin consensus service if configured
let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR { let consensus_service = consensus_creator(client.clone(), network.clone(), transaction_pool.clone(), &keystore)?;
// Load the first available key. Code above makes sure it exisis.
let key = keystore.load(&keystore.contents()?[0], "")?;
info!("Using authority key {:?}", key.public());
Some(consensus::Service::new(
client.clone(),
network.clone(),
transaction_pool.clone(),
::std::time::Duration::from_millis(4000), // TODO: dynamic
key,
))
} else {
None
};
Ok(Service { Ok(Service {
thread: Some(thread), thread: Some(thread),
@@ -373,7 +445,7 @@ impl Service {
} }
/// Get shared client instance. /// Get shared client instance.
pub fn client(&self) -> Arc<Client> { pub fn client(&self) -> Arc<Client<B, E>> {
self.client.clone() self.client.clone()
} }
@@ -396,7 +468,12 @@ fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) {
} }
/// Produce a task which prunes any finalized transactions from the pool. /// Produce a task which prunes any finalized transactions from the pool.
pub fn prune_imported(client: &Client, pool: &Mutex<TransactionPool>, hash: HeaderHash) { pub fn prune_imported<B, E>(client: &Client<B, E>, pool: &Mutex<TransactionPool>, hash: HeaderHash)
where
B: Backend + Send + Sync,
E: CallExecutor + Send + Sync,
client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>
{
let id = BlockId::Hash(hash); let id = BlockId::Hash(hash);
match client.body(&id) { match client.body(&id) {
Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]), Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]),
@@ -405,7 +482,7 @@ pub fn prune_imported(client: &Client, pool: &Mutex<TransactionPool>, hash: Head
} }
} }
impl Drop for Service { impl<B, E> Drop for Service<B, E> {
fn drop(&mut self) { fn drop(&mut self) {
self.network.stop_network(); self.network.stop_network();
+10 -7
View File
@@ -65,14 +65,15 @@ pub struct DatabaseSettings {
pub fn new_client<E, F>( pub fn new_client<E, F>(
settings: DatabaseSettings, settings: DatabaseSettings,
executor: E, executor: E,
build_genesis: F genesis_builder: F,
) -> Result<client::Client<Backend, E>, client::error::Error> ) -> Result<client::Client<Backend, client::LocalCallExecutor<Backend, E>>, client::error::Error>
where where
E: CodeExecutor, E: CodeExecutor,
F: FnOnce() -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>) F: client::GenesisBuilder,
{ {
let backend = Backend::new(&settings)?; let backend = Arc::new(Backend::new(&settings)?);
Ok(client::Client::new(backend, executor, build_genesis)?) let executor = client::LocalCallExecutor::new(backend.clone(), executor);
Ok(client::Client::new(backend, executor, genesis_builder)?)
} }
mod columns { mod columns {
@@ -265,8 +266,8 @@ pub struct BlockImportOperation {
impl client::backend::BlockImportOperation for BlockImportOperation { impl client::backend::BlockImportOperation for BlockImportOperation {
type State = DbState; type State = DbState;
fn state(&self) -> Result<&Self::State, client::error::Error> { fn state(&self) -> Result<Option<&Self::State>, client::error::Error> {
Ok(&self.old_state) Ok(Some(&self.old_state))
} }
fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_best: bool) -> Result<(), client::error::Error> { fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_best: bool) -> Result<(), client::error::Error> {
@@ -534,6 +535,8 @@ impl client::backend::Backend for Backend {
} }
} }
impl client::backend::LocalBackend for Backend {}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
+9 -3
View File
@@ -26,8 +26,8 @@ pub trait BlockImportOperation {
/// Associated state backend type. /// Associated state backend type.
type State: StateBackend; type State: StateBackend;
/// Returns pending state. /// Returns pending state. Returns None for backends with locally-unavailable state data.
fn state(&self) -> error::Result<&Self::State>; fn state(&self) -> error::Result<Option<&Self::State>>;
/// Append block data to the transaction. /// Append block data to the transaction.
fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_new_best: bool) -> error::Result<()>; fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_new_best: bool) -> error::Result<()>;
/// Inject storage data into the database. /// Inject storage data into the database.
@@ -44,7 +44,7 @@ pub trait BlockImportOperation {
/// ///
/// The same applies for live `BlockImportOperation`s: while an import operation building on a parent `P` /// The same applies for live `BlockImportOperation`s: while an import operation building on a parent `P`
/// is alive, the state for `P` should not be pruned. /// is alive, the state for `P` should not be pruned.
pub trait Backend { pub trait Backend: Send + Sync {
/// Associated block insertion operation type. /// Associated block insertion operation type.
type BlockImportOperation: BlockImportOperation; type BlockImportOperation: BlockImportOperation;
/// Associated blockchain backend type. /// Associated blockchain backend type.
@@ -62,3 +62,9 @@ pub trait Backend {
/// Returns state backend with post-state of given block. /// Returns state backend with post-state of given block.
fn state_at(&self, block: BlockId) -> error::Result<Self::State>; fn state_at(&self, block: BlockId) -> error::Result<Self::State>;
} }
/// Mark for all Backend implementations, that are making use of state data, stored locally.
pub trait LocalBackend: Backend {}
/// Mark for all Backend implementations, that are fetching required state data from remote nodes.
pub trait RemoteBackend: Backend {}
@@ -18,16 +18,16 @@
use std::vec::Vec; use std::vec::Vec;
use codec::{Joiner, Slicable}; use codec::{Joiner, Slicable};
use state_machine::{self, CodeExecutor}; use state_machine;
use primitives::{Header, Block}; use primitives::{Header, Block};
use primitives::block::{Id as BlockId, Extrinsic}; use primitives::block::{Id as BlockId, Extrinsic};
use {backend, error, Client}; use {backend, error, Client, CallExecutor};
use triehash::ordered_trie_root; use triehash::ordered_trie_root;
/// Utility for building new (valid) blocks from a stream of transactions. /// Utility for building new (valid) blocks from a stream of transactions.
pub struct BlockBuilder<B, E> where pub struct BlockBuilder<B, E> where
B: backend::Backend, B: backend::Backend,
E: CodeExecutor + Clone, E: CallExecutor + Clone,
error::Error: From<<<B as backend::Backend>::State as state_machine::backend::Backend>::Error>, error::Error: From<<<B as backend::Backend>::State as state_machine::backend::Backend>::Error>,
{ {
header: Header, header: Header,
@@ -39,7 +39,7 @@ pub struct BlockBuilder<B, E> where
impl<B, E> BlockBuilder<B, E> where impl<B, E> BlockBuilder<B, E> where
B: backend::Backend, B: backend::Backend,
E: CodeExecutor + Clone, E: CallExecutor + Clone,
error::Error: From<<<B as backend::Backend>::State as state_machine::backend::Backend>::Error>, error::Error: From<<<B as backend::Backend>::State as state_machine::backend::Backend>::Error>,
{ {
/// Create a new instance of builder from the given client, building on the latest block. /// Create a new instance of builder from the given client, building on the latest block.
@@ -59,7 +59,7 @@ impl<B, E> BlockBuilder<B, E> where
digest: Default::default(), digest: Default::default(),
}, },
transactions: Default::default(), transactions: Default::default(),
executor: client.clone_executor(), executor: client.executor().clone(),
state: client.state_at(block_id)?, state: client.state_at(block_id)?,
changes: Default::default(), changes: Default::default(),
}) })
@@ -69,7 +69,10 @@ impl<B, E> BlockBuilder<B, E> where
/// can be validly executed (by executing it); if it is invalid, it'll be returned along with /// can be validly executed (by executing it); if it is invalid, it'll be returned along with
/// the error. Otherwise, it will return a mutable reference to self (in order to chain). /// the error. Otherwise, it will return a mutable reference to self (in order to chain).
pub fn push(&mut self, tx: Extrinsic) -> error::Result<()> { pub fn push(&mut self, tx: Extrinsic) -> error::Result<()> {
let (output, _) = state_machine::execute(&self.state, &mut self.changes, &self.executor, "execute_transaction", let (output, _) = self.executor.call_at_state(
&self.state,
&mut self.changes,
"execute_transaction",
&vec![].and(&self.header).and(&tx))?; &vec![].and(&self.header).and(&tx))?;
self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid"); self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid");
self.transactions.push(tx); self.transactions.push(tx);
@@ -79,7 +82,10 @@ impl<B, E> BlockBuilder<B, E> where
/// Consume the builder to return a valid `Block` containing all pushed transactions. /// Consume the builder to return a valid `Block` containing all pushed transactions.
pub fn bake(mut self) -> error::Result<Block> { pub fn bake(mut self) -> error::Result<Block> {
self.header.extrinsics_root = ordered_trie_root(self.transactions.iter().map(Slicable::encode)).0.into(); self.header.extrinsics_root = ordered_trie_root(self.transactions.iter().map(Slicable::encode)).0.into();
let (output, _) = state_machine::execute(&self.state, &mut self.changes, &self.executor, "finalise_block", let (output, _) = self.executor.call_at_state(
&self.state,
&mut self.changes,
"finalise_block",
&self.header.encode())?; &self.header.encode())?;
self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid"); self.header = Header::decode(&mut &output[..]).expect("Header came straight out of runtime so must be valid");
Ok(Block { Ok(Block {
@@ -0,0 +1,203 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use futures::{IntoFuture, Future};
use primitives::block::Id as BlockId;
use state_machine::{self, OverlayedChanges, Backend as StateBackend, CodeExecutor};
use state_machine::backend::InMemory as InMemoryStateBackend;
use triehash::trie_root;
use backend;
use blockchain::Backend as ChainBackend;
use error;
use light::{Fetcher, RemoteCallRequest};
/// Information regarding the result of a call.
#[derive(Debug)]
pub struct CallResult {
/// The data that was returned from the call.
pub return_data: Vec<u8>,
/// The changes made to the state by the call.
pub changes: OverlayedChanges,
}
/// Method call executor.
pub trait CallExecutor {
/// Externalities error type.
type Error: state_machine::Error;
/// Execute a call to a contract on top of state in a block of given hash.
///
/// No changes are made.
fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> Result<CallResult, error::Error>;
/// Execute a call to a contract on top of given state.
///
/// No changes are made.
fn call_at_state<S: state_machine::Backend>(&self, state: &S, overlay: &mut OverlayedChanges, method: &str, call_data: &[u8]) -> Result<(Vec<u8>, S::Transaction), error::Error>;
}
/// Call executor that executes methods locally, querying all required
/// data from local backend.
pub struct LocalCallExecutor<B, E> {
backend: Arc<B>,
executor: E,
}
/// Call executor that executes methods on remote node, querying execution proof
/// and checking proof by re-executing locally.
pub struct RemoteCallExecutor<B, F> {
backend: Arc<B>,
fetcher: Arc<F>,
}
impl<B, E> LocalCallExecutor<B, E> {
/// Creates new instance of local call executor.
pub fn new(backend: Arc<B>, executor: E) -> Self {
LocalCallExecutor { backend, executor }
}
}
impl<B, E> Clone for LocalCallExecutor<B, E> where E: Clone {
fn clone(&self) -> Self {
LocalCallExecutor {
backend: self.backend.clone(),
executor: self.executor.clone(),
}
}
}
impl<B, E> CallExecutor for LocalCallExecutor<B, E>
where
B: backend::LocalBackend,
E: CodeExecutor,
error::Error: From<<<B as backend::Backend>::State as StateBackend>::Error>,
{
type Error = E::Error;
fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result<CallResult> {
let mut changes = OverlayedChanges::default();
let (return_data, _) = self.call_at_state(&self.backend.state_at(*id)?, &mut changes, method, call_data)?;
Ok(CallResult{ return_data, changes })
}
fn call_at_state<S: state_machine::Backend>(&self, state: &S, changes: &mut OverlayedChanges, method: &str, call_data: &[u8]) -> error::Result<(Vec<u8>, S::Transaction)> {
state_machine::execute(
state,
changes,
&self.executor,
method,
call_data,
).map_err(Into::into)
}
}
impl<B, F> RemoteCallExecutor<B, F> {
/// Creates new instance of remote call executor.
pub fn new(backend: Arc<B>, fetcher: Arc<F>) -> Self {
RemoteCallExecutor { backend, fetcher }
}
}
impl<B, F> CallExecutor for RemoteCallExecutor<B, F>
where
B: backend::RemoteBackend,
F: Fetcher,
error::Error: From<<<B as backend::Backend>::State as StateBackend>::Error>,
{
type Error = error::Error;
fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result<CallResult> {
let block_hash = match *id {
BlockId::Hash(hash) => hash,
BlockId::Number(number) => self.backend.blockchain().hash(number)?
.ok_or_else(|| error::ErrorKind::UnknownBlock(BlockId::Number(number)))?,
};
self.fetcher.remote_call(RemoteCallRequest {
block: block_hash,
method: method.into(),
call_data: call_data.to_vec(),
}).into_future().wait()
}
fn call_at_state<S: state_machine::Backend>(&self, _state: &S, _changes: &mut OverlayedChanges, _method: &str, _call_data: &[u8]) -> error::Result<(Vec<u8>, S::Transaction)> {
Err(error::ErrorKind::NotAvailableOnLightClient.into())
}
}
/// Check remote execution proof.
pub fn check_execution_proof<B, E>(backend: &B, executor: &E, request: &RemoteCallRequest, remote_proof: (Vec<u8>, Vec<Vec<u8>>)) -> Result<CallResult, error::Error>
where
B: backend::RemoteBackend,
E: CodeExecutor,
error::Error: From<<<B as backend::Backend>::State as StateBackend>::Error>,
{
let (remote_result, remote_proof) = remote_proof;
let remote_state = state_from_execution_proof(remote_proof);
let remote_state_root = trie_root(remote_state.pairs().into_iter()).0;
let local_header = backend.blockchain().header(BlockId::Hash(request.block))?;
let local_header = local_header.ok_or_else(|| error::ErrorKind::UnknownBlock(BlockId::Hash(request.block)))?;
let local_state_root = local_header.state_root;
if remote_state_root != *local_state_root {
return Err(error::ErrorKind::InvalidExecutionProof.into());
}
let mut changes = OverlayedChanges::default();
let (local_result, _) = state_machine::execute(
&remote_state,
&mut changes,
executor,
&request.method,
&request.call_data,
)?;
if local_result != remote_result {
return Err(error::ErrorKind::InvalidExecutionProof.into());
}
Ok(CallResult { return_data: local_result, changes })
}
/// Convert state to execution proof. Proof is simple the whole state (temporary).
// TODO [light]: this method must be removed after trie-based proofs are landed.
pub fn state_to_execution_proof<B: state_machine::Backend>(state: &B) -> Vec<Vec<u8>> {
state.pairs().into_iter()
.flat_map(|(k, v)| ::std::iter::once(k).chain(::std::iter::once(v)))
.collect()
}
/// Convert execution proof to in-memory state for check. Reverse function for state_to_execution_proof.
// TODO [light]: this method must be removed after trie-based proofs are landed.
fn state_from_execution_proof(proof: Vec<Vec<u8>>) -> InMemoryStateBackend {
let mut changes = Vec::new();
let mut proof_iter = proof.into_iter();
loop {
let key = proof_iter.next();
let value = proof_iter.next();
if let (Some(key), Some(value)) = (key, value) {
changes.push((key, Some(value)));
} else {
break;
}
}
InMemoryStateBackend::default().update(changes)
}
+57 -55
View File
@@ -16,25 +16,33 @@
//! Substrate Client //! Substrate Client
use std::sync::Arc;
use futures::sync::mpsc; use futures::sync::mpsc;
use parking_lot::Mutex; use parking_lot::Mutex;
use primitives::{self, block, AuthorityId}; use primitives::{self, block, AuthorityId};
use primitives::block::Id as BlockId; use primitives::block::Id as BlockId;
use primitives::storage::{StorageKey, StorageData}; use primitives::storage::{StorageKey, StorageData};
use runtime_support::Hashable; use runtime_support::Hashable;
use codec::{KeyedVec, Slicable}; use codec::Slicable;
use state_machine::{self, Ext, OverlayedChanges, Backend as StateBackend, CodeExecutor}; use state_machine::{self, Ext, OverlayedChanges, Backend as StateBackend, CodeExecutor};
use backend::{self, BlockImportOperation}; use backend::{self, BlockImportOperation};
use blockchain::{self, Info as ChainInfo, Backend as ChainBackend}; use blockchain::{self, Info as ChainInfo, Backend as ChainBackend};
use call_executor::{CallExecutor, LocalCallExecutor};
use {error, in_mem, block_builder, runtime_io, bft}; use {error, in_mem, block_builder, runtime_io, bft};
/// Type that implements `futures::Stream` of block import events. /// Type that implements `futures::Stream` of block import events.
pub type BlockchainEventStream = mpsc::UnboundedReceiver<BlockImportNotification>; pub type BlockchainEventStream = mpsc::UnboundedReceiver<BlockImportNotification>;
/// Polkadot Client genesis block builder.
pub trait GenesisBuilder {
/// Build genesis block.
fn build(self) -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>);
}
/// Polkadot Client /// Polkadot Client
pub struct Client<B, E> { pub struct Client<B, E> {
backend: B, backend: Arc<B>,
executor: E, executor: E,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification>>>, import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification>>>,
} }
@@ -63,14 +71,6 @@ pub struct ClientInfo {
pub best_queued_hash: Option<block::HeaderHash>, pub best_queued_hash: Option<block::HeaderHash>,
} }
/// Information regarding the result of a call.
pub struct CallResult {
/// The data that was returned from the call.
pub return_data: Vec<u8>,
/// The changes made to the state by the call.
pub changes: OverlayedChanges,
}
/// Block import result. /// Block import result.
#[derive(Debug)] #[derive(Debug)]
pub enum ImportResult { pub enum ImportResult {
@@ -146,32 +146,34 @@ impl JustifiedHeader {
/// Create an instance of in-memory client. /// Create an instance of in-memory client.
pub fn new_in_mem<E, F>( pub fn new_in_mem<E, F>(
executor: E, executor: E,
build_genesis: F genesis_builder: F
) -> error::Result<Client<in_mem::Backend, E>> ) -> error::Result<Client<in_mem::Backend, LocalCallExecutor<in_mem::Backend, E>>>
where where
E: CodeExecutor, E: CodeExecutor,
F: FnOnce() -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>) F: GenesisBuilder,
{ {
Client::new(in_mem::Backend::new(), executor, build_genesis) let backend = Arc::new(in_mem::Backend::new());
let executor = LocalCallExecutor::new(backend.clone(), executor);
Client::new(backend, executor, genesis_builder)
} }
impl<B, E> Client<B, E> where impl<B, E> Client<B, E> where
B: backend::Backend, B: backend::Backend,
E: CodeExecutor, E: CallExecutor,
error::Error: From<<<B as backend::Backend>::State as StateBackend>::Error>, error::Error: From<<<B as backend::Backend>::State as StateBackend>::Error>,
{ {
/// Creates new Polkadot Client with given blockchain and code executor. /// Creates new Polkadot Client with given blockchain and code executor.
pub fn new<F>( pub fn new<F>(
backend: B, backend: Arc<B>,
executor: E, executor: E,
build_genesis: F genesis_builder: F,
) -> error::Result<Self> ) -> error::Result<Self>
where where
F: FnOnce() -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>) F: GenesisBuilder
{ {
if backend.blockchain().header(BlockId::Number(0))?.is_none() { if backend.blockchain().header(BlockId::Number(0))?.is_none() {
trace!("Empty database, writing genesis block"); trace!("Empty database, writing genesis block");
let (genesis_header, genesis_store) = build_genesis(); let (genesis_header, genesis_store) = genesis_builder.build();
let mut op = backend.begin_operation(BlockId::Hash(block::HeaderHash::default()))?; let mut op = backend.begin_operation(BlockId::Hash(block::HeaderHash::default()))?;
op.reset_storage(genesis_store.into_iter())?; op.reset_storage(genesis_store.into_iter())?;
op.set_block_data(genesis_header, Some(vec![]), None, true)?; op.set_block_data(genesis_header, Some(vec![]), None, true)?;
@@ -190,7 +192,7 @@ impl<B, E> Client<B, E> where
} }
/// Expose backend reference. To be used in tests only /// Expose backend reference. To be used in tests only
pub fn backend(&self) -> &B { pub fn backend(&self) -> &Arc<B> {
&self.backend &self.backend
} }
@@ -207,36 +209,29 @@ impl<B, E> Client<B, E> where
self.storage(id, &StorageKey(b":code".to_vec())).map(|data| data.0) self.storage(id, &StorageKey(b":code".to_vec())).map(|data| data.0)
} }
/// Clone a new instance of Executor. /// Get the set of authorities at a given block.
pub fn clone_executor(&self) -> E where E: Clone {
self.executor.clone()
}
/// Get the current set of authorities from storage.
pub fn authorities_at(&self, id: &BlockId) -> error::Result<Vec<AuthorityId>> { pub fn authorities_at(&self, id: &BlockId) -> error::Result<Vec<AuthorityId>> {
let state = self.state_at(id)?; self.executor.call(id, "authorities",&[])
(0..u32::decode(&mut state.storage(b":auth:len")?.ok_or(error::ErrorKind::AuthLenEmpty)?.as_slice()).ok_or(error::ErrorKind::AuthLenInvalid)?) .and_then(|r| Vec::<AuthorityId>::decode(&mut &r.return_data[..])
.map(|i| state.storage(&i.to_keyed_vec(b":auth:")) .ok_or(error::ErrorKind::AuthLenInvalid.into()))
.map_err(|e| error::Error::from(e).into())
.and_then(|v| v.ok_or(error::ErrorKind::AuthEmpty(i)))
.and_then(|s| AuthorityId::decode(&mut s.as_slice()).ok_or(error::ErrorKind::AuthInvalid(i)))
.map_err(Into::into)
).collect()
} }
/// Execute a call to a contract on top of state in a block of given hash. /// Get call executor reference.
pub fn executor(&self) -> &E {
&self.executor
}
/// Execute a call to a contract on top of state in a block of given hash
/// AND returning execution proof.
/// ///
/// No changes are made. /// No changes are made.
pub fn call(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result<CallResult> { pub fn execution_proof(&self, id: &BlockId, method: &str, call_data: &[u8]) -> error::Result<(Vec<u8>, Vec<Vec<u8>>)> {
let mut changes = OverlayedChanges::default(); use call_executor::state_to_execution_proof;
let (return_data, _) = state_machine::execute(
&self.state_at(id)?, let result = self.executor.call(id, method, call_data);
&mut changes, let result = result?.return_data;
&self.executor, let proof = self.backend.state_at(*id).map(|state| state_to_execution_proof(&state))?;
method, Ok((result, proof))
call_data,
)?;
Ok(CallResult { return_data, changes })
} }
/// Set up the native execution environment to call into a native runtime code. /// Set up the native execution environment to call into a native runtime code.
@@ -297,21 +292,28 @@ impl<B, E> Client<B, E> where
} }
let mut transaction = self.backend.begin_operation(BlockId::Hash(header.parent_hash))?; let mut transaction = self.backend.begin_operation(BlockId::Hash(header.parent_hash))?;
let mut overlay = OverlayedChanges::default(); let storage_update = match transaction.state()? {
Some(transaction_state) => {
let (_out, storage_update) = state_machine::execute( let mut overlay = Default::default();
transaction.state()?, let (_, storage_update) = self.executor.call_at_state(
transaction_state,
&mut overlay, &mut overlay,
&self.executor,
"execute_block", "execute_block",
&block::Block { header: header.clone(), transactions: body.clone().unwrap_or_default().clone() }.encode() &block::Block { header: header.clone(), transactions: body.clone().unwrap_or_default().clone() }.encode(),
)?; )?;
Some(storage_update)
},
None => None,
};
let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1; let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1;
let hash: block::HeaderHash = header.blake2_256().into(); let hash: block::HeaderHash = header.blake2_256().into();
trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number, is_new_best, origin); trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number, is_new_best, origin);
transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?; transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?;
if let Some(storage_update) = storage_update {
transaction.update_storage(storage_update)?; transaction.update_storage(storage_update)?;
}
self.backend.commit_operation(transaction)?; self.backend.commit_operation(transaction)?;
if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast { if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast {
@@ -392,7 +394,7 @@ impl<B, E> Client<B, E> where
impl<B, E> bft::BlockImport for Client<B, E> impl<B, E> bft::BlockImport for Client<B, E>
where where
B: backend::Backend, B: backend::Backend,
E: state_machine::CodeExecutor, E: CallExecutor,
error::Error: From<<B::State as state_machine::backend::Backend>::Error> error::Error: From<<B::State as state_machine::backend::Backend>::Error>
{ {
fn import_block(&self, block: block::Block, justification: bft::Justification) { fn import_block(&self, block: block::Block, justification: bft::Justification) {
@@ -408,7 +410,7 @@ impl<B, E> bft::BlockImport for Client<B, E>
impl<B, E> bft::Authorities for Client<B, E> impl<B, E> bft::Authorities for Client<B, E>
where where
B: backend::Backend, B: backend::Backend,
E: state_machine::CodeExecutor, E: CallExecutor,
error::Error: From<<B::State as state_machine::backend::Backend>::Error> error::Error: From<<B::State as state_machine::backend::Backend>::Error>
{ {
fn authorities(&self, at: &BlockId) -> Result<Vec<AuthorityId>, bft::Error> { fn authorities(&self, at: &BlockId) -> Result<Vec<AuthorityId>, bft::Error> {
@@ -419,7 +421,7 @@ impl<B, E> bft::Authorities for Client<B, E>
impl<B, E> BlockchainEvents for Client<B, E> impl<B, E> BlockchainEvents for Client<B, E>
where where
B: backend::Backend, B: backend::Backend,
E: state_machine::CodeExecutor, E: CallExecutor,
error::Error: From<<B::State as state_machine::backend::Backend>::Error> error::Error: From<<B::State as state_machine::backend::Backend>::Error>
{ {
/// Get block import event stream. /// Get block import event stream.
@@ -433,7 +435,7 @@ impl<B, E> BlockchainEvents for Client<B, E>
impl<B, E> ChainHead for Client<B, E> impl<B, E> ChainHead for Client<B, E>
where where
B: backend::Backend, B: backend::Backend,
E: state_machine::CodeExecutor, E: CallExecutor,
error::Error: From<<B::State as state_machine::backend::Backend>::Error> error::Error: From<<B::State as state_machine::backend::Backend>::Error>
{ {
fn best_block_header(&self) -> error::Result<block::Header> { fn best_block_header(&self) -> error::Result<block::Header> {
+18
View File
@@ -81,6 +81,24 @@ error_chain! {
description("bad justification for header"), description("bad justification for header"),
display("bad justification for header: {}", h), display("bad justification for header: {}", h),
} }
/// Not available on light client.
NotAvailableOnLightClient {
description("not available on light client"),
display("This method is not currently available when running in light client mode"),
}
/// Invalid remote proof.
InvalidExecutionProof {
description("invalid execution proof"),
display("Remote node has responded with invalid execution proof"),
}
/// Invalid remote proof.
RemoteFetchCancelled {
description("remote fetch cancelled"),
display("Remote data fetch has been cancelled"),
}
} }
} }
+17 -16
View File
@@ -26,10 +26,6 @@ use primitives::block::{self, Id as BlockId, HeaderHash};
use blockchain::{self, BlockStatus}; use blockchain::{self, BlockStatus};
use state_machine::backend::{Backend as StateBackend, InMemory}; use state_machine::backend::{Backend as StateBackend, InMemory};
fn header_hash(header: &block::Header) -> block::HeaderHash {
header.blake2_256().into()
}
struct PendingBlock { struct PendingBlock {
block: Block, block: Block,
is_best: bool, is_best: bool,
@@ -65,14 +61,8 @@ impl Clone for Blockchain {
} }
impl Blockchain { impl Blockchain {
fn id(&self, id: BlockId) -> Option<HeaderHash> { /// Create new in-memory blockchain storage.
match id { pub fn new() -> Blockchain {
BlockId::Hash(h) => Some(h),
BlockId::Number(n) => self.storage.read().hashes.get(&n).cloned(),
}
}
fn new() -> Blockchain {
Blockchain { Blockchain {
storage: RwLock::new( storage: RwLock::new(
BlockchainStorage { BlockchainStorage {
@@ -85,7 +75,16 @@ impl Blockchain {
} }
} }
fn insert(&self, hash: HeaderHash, header: block::Header, justification: Option<primitives::bft::Justification>, body: Option<block::Body>, is_new_best: bool) { /// Get header hash of given block.
pub fn id(&self, id: BlockId) -> Option<HeaderHash> {
match id {
BlockId::Hash(h) => Some(h),
BlockId::Number(n) => self.storage.read().hashes.get(&n).cloned(),
}
}
/// Insert block.
pub fn insert(&self, hash: HeaderHash, header: block::Header, justification: Option<primitives::bft::Justification>, body: Option<block::Body>, is_new_best: bool) {
let number = header.number; let number = header.number;
let mut storage = self.storage.write(); let mut storage = self.storage.write();
storage.blocks.insert(hash, Block { storage.blocks.insert(hash, Block {
@@ -163,8 +162,8 @@ pub struct BlockImportOperation {
impl backend::BlockImportOperation for BlockImportOperation { impl backend::BlockImportOperation for BlockImportOperation {
type State = InMemory; type State = InMemory;
fn state(&self) -> error::Result<&Self::State> { fn state(&self) -> error::Result<Option<&Self::State>> {
Ok(&self.old_state) Ok(Some(&self.old_state))
} }
fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_new_best: bool) -> error::Result<()> { fn set_block_data(&mut self, header: block::Header, body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_new_best: bool) -> error::Result<()> {
@@ -227,7 +226,7 @@ impl backend::Backend for Backend {
fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> { fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> {
if let Some(pending_block) = operation.pending_block { if let Some(pending_block) = operation.pending_block {
let hash = header_hash(&pending_block.block.header); let hash = pending_block.block.header.blake2_256().into();
let old_state = &operation.old_state; let old_state = &operation.old_state;
self.states.write().insert(hash, operation.new_state.unwrap_or_else(|| old_state.clone())); self.states.write().insert(hash, operation.new_state.unwrap_or_else(|| old_state.clone()));
self.blockchain.insert(hash, pending_block.block.header, pending_block.block.justification, pending_block.block.body, pending_block.is_best); self.blockchain.insert(hash, pending_block.block.header, pending_block.block.justification, pending_block.block.body, pending_block.is_best);
@@ -246,3 +245,5 @@ impl backend::Backend for Backend {
} }
} }
} }
impl backend::LocalBackend for Backend {}
+8 -2
View File
@@ -17,6 +17,7 @@
//! Substrate Client and associated logic. //! Substrate Client and associated logic.
#![warn(missing_docs)] #![warn(missing_docs)]
#![recursion_limit="128"]
extern crate substrate_bft as bft; extern crate substrate_bft as bft;
extern crate substrate_codec as codec; extern crate substrate_codec as codec;
@@ -43,12 +44,17 @@ pub mod backend;
pub mod in_mem; pub mod in_mem;
pub mod genesis; pub mod genesis;
pub mod block_builder; pub mod block_builder;
pub mod light;
mod call_executor;
mod client; mod client;
pub use client::{ pub use client::{
new_in_mem, new_in_mem,
BlockStatus, BlockOrigin, BlockchainEventStream, BlockchainEvents, BlockStatus, BlockOrigin, BlockchainEventStream, BlockchainEvents,
Client, ClientInfo, CallResult, ChainHead, Client, ClientInfo, ChainHead,
ImportResult, ImportResult, GenesisBuilder,
}; };
pub use blockchain::Info as ChainInfo; pub use blockchain::Info as ChainInfo;
pub use call_executor::{
CallResult, CallExecutor, LocalCallExecutor, RemoteCallExecutor,
};
+243
View File
@@ -0,0 +1,243 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Light client backend. Only stores headers and justifications of blocks.
//! Everything else is requested from full nodes on demand.
use std::sync::Arc;
use futures::future::IntoFuture;
use primitives;
use primitives::block::{self, Id as BlockId, HeaderHash};
use runtime_support::Hashable;
use state_machine::CodeExecutor;
use state_machine::backend::Backend as StateBackend;
use blockchain::{self, BlockStatus};
use backend;
use call_executor::{CallResult, RemoteCallExecutor, check_execution_proof};
use client::{Client, GenesisBuilder};
use error;
use in_mem::Blockchain as InMemBlockchain;
/// Remote call request.
pub struct RemoteCallRequest {
/// Call at state of given block.
pub block: HeaderHash,
/// Method to call.
pub method: String,
/// Call data.
pub call_data: Vec<u8>,
}
/// Light client data fetcher. Implementations of this trait must check if remote data
/// is correct (see FetchedDataChecker) and return already checked data.
pub trait Fetcher: Send + Sync {
/// Remote call result future.
type RemoteCallResult: IntoFuture<Item=CallResult, Error=error::Error>;
/// Fetch remote call result.
fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult;
}
/// Light client remote data checker.
pub trait FetchChecker: Send + Sync {
/// Check remote method execution proof.
fn check_execution_proof(&self, request: &RemoteCallRequest, remote_proof: (Vec<u8>, Vec<Vec<u8>>)) -> error::Result<CallResult>;
}
/// Light client backend.
pub struct Backend {
blockchain: Blockchain,
}
/// Light client blockchain.
pub struct Blockchain {
storage: InMemBlockchain,
}
/// Block (header and justification) import operation.
pub struct BlockImportOperation {
pending_block: Option<PendingBlock>,
}
/// On-demand state.
#[derive(Clone)]
pub struct OnDemandState {
/// Hash of the block, state is valid for.
_block: HeaderHash,
}
/// Remote data checker.
pub struct LightDataChecker<E> {
/// Backend reference.
backend: Arc<Backend>,
/// Executor.
executor: E,
}
struct PendingBlock {
header: block::Header,
justification: Option<primitives::bft::Justification>,
is_best: bool,
}
impl backend::Backend for Backend {
type BlockImportOperation = BlockImportOperation;
type Blockchain = Blockchain;
type State = OnDemandState;
fn begin_operation(&self, _block: BlockId) -> error::Result<Self::BlockImportOperation> {
Ok(BlockImportOperation {
pending_block: None,
})
}
fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> {
if let Some(pending_block) = operation.pending_block {
let hash = pending_block.header.blake2_256().into();
self.blockchain.storage.insert(hash, pending_block.header, pending_block.justification, None, pending_block.is_best);
}
Ok(())
}
fn blockchain(&self) -> &Blockchain {
&self.blockchain
}
fn state_at(&self, block: BlockId) -> error::Result<Self::State> {
Ok(OnDemandState {
_block: self.blockchain.storage.id(block).ok_or(error::ErrorKind::UnknownBlock(block))?,
})
}
}
impl backend::RemoteBackend for Backend {}
impl backend::BlockImportOperation for BlockImportOperation {
type State = OnDemandState;
fn state(&self) -> error::Result<Option<&Self::State>> {
// None means 'locally-stateless' backend
Ok(None)
}
fn set_block_data(&mut self, header: block::Header, _body: Option<block::Body>, justification: Option<primitives::bft::Justification>, is_new_best: bool) -> error::Result<()> {
assert!(self.pending_block.is_none(), "Only one block per operation is allowed");
self.pending_block = Some(PendingBlock {
header,
justification,
is_best: is_new_best,
});
Ok(())
}
fn update_storage(&mut self, _update: <Self::State as StateBackend>::Transaction) -> error::Result<()> {
// we're not storing anything locally => ignore changes
Ok(())
}
fn reset_storage<I: Iterator<Item=(Vec<u8>, Vec<u8>)>>(&mut self, _iter: I) -> error::Result<()> {
// we're not storing anything locally => ignore changes
Ok(())
}
}
impl blockchain::Backend for Blockchain {
fn header(&self, id: BlockId) -> error::Result<Option<block::Header>> {
self.storage.header(id)
}
fn body(&self, _id: BlockId) -> error::Result<Option<block::Body>> {
// TODO [light]: fetch from remote node
Ok(None)
}
fn justification(&self, id: BlockId) -> error::Result<Option<primitives::bft::Justification>> {
self.storage.justification(id)
}
fn info(&self) -> error::Result<blockchain::Info> {
self.storage.info()
}
fn status(&self, id: BlockId) -> error::Result<BlockStatus> {
self.storage.status(id)
}
fn hash(&self, number: block::Number) -> error::Result<Option<block::HeaderHash>> {
self.storage.hash(number)
}
}
impl StateBackend for OnDemandState {
type Error = error::Error;
type Transaction = ();
fn storage(&self, _key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
// TODO [light]: fetch from remote node
Err(error::ErrorKind::NotAvailableOnLightClient.into())
}
fn storage_root<I>(&self, _delta: I) -> ([u8; 32], Self::Transaction)
where I: IntoIterator<Item=(Vec<u8>, Option<Vec<u8>>)> {
([0; 32], ())
}
fn pairs(&self) -> Vec<(Vec<u8>, Vec<u8>)> {
// whole state is not available on light node
Vec::new()
}
}
impl<E> FetchChecker for LightDataChecker<E>
where
E: CodeExecutor,
{
fn check_execution_proof(&self, request: &RemoteCallRequest, remote_proof: (Vec<u8>, Vec<Vec<u8>>)) -> error::Result<CallResult> {
check_execution_proof(&*self.backend, &self.executor, request, remote_proof)
}
}
/// Create an instance of light client backend.
pub fn new_light_backend() -> Arc<Backend> {
let storage = InMemBlockchain::new();
let blockchain = Blockchain { storage };
Arc::new(Backend { blockchain })
}
/// Create an instance of light client.
pub fn new_light<F, B>(
backend: Arc<Backend>,
fetcher: Arc<F>,
genesis_builder: B,
) -> error::Result<Client<Backend, RemoteCallExecutor<Backend, F>>>
where
F: Fetcher,
B: GenesisBuilder,
{
let executor = RemoteCallExecutor::new(backend.clone(), fetcher);
Client::new(backend, executor, genesis_builder)
}
/// Create an instance of fetch data checker.
pub fn new_fetch_checker<E>(
backend: Arc<Backend>,
executor: E,
) -> LightDataChecker<E>
where
E: CodeExecutor,
{
LightDataChecker { backend, executor }
}
+1
View File
@@ -17,6 +17,7 @@ serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
futures = "0.1.17" futures = "0.1.17"
linked-hash-map = "0.5"
ethcore-network = { git = "https://github.com/paritytech/parity.git" } ethcore-network = { git = "https://github.com/paritytech/parity.git" }
ethcore-network-devp2p = { git = "https://github.com/paritytech/parity.git" } ethcore-network-devp2p = { git = "https://github.com/paritytech/parity.git" }
ethcore-io = { git = "https://github.com/paritytech/parity.git" } ethcore-io = { git = "https://github.com/paritytech/parity.git" }
+9 -2
View File
@@ -16,7 +16,7 @@
//! Blockchain access trait //! Blockchain access trait
use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin}; use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin, CallExecutor};
use client::error::Error; use client::error::Error;
use state_machine; use state_machine;
use primitives::block::{self, Id as BlockId}; use primitives::block::{self, Id as BlockId};
@@ -43,11 +43,14 @@ pub trait Client: Send + Sync {
/// Get block justification. /// Get block justification.
fn justification(&self, id: &BlockId) -> Result<Option<Justification>, Error>; fn justification(&self, id: &BlockId) -> Result<Option<Justification>, Error>;
/// Get method execution proof.
fn execution_proof(&self, block: &block::HeaderHash, method: &str, data: &[u8]) -> Result<(Vec<u8>, Vec<Vec<u8>>), Error>;
} }
impl<B, E> Client for PolkadotClient<B, E> where impl<B, E> Client for PolkadotClient<B, E> where
B: client::backend::Backend + Send + Sync + 'static, B: client::backend::Backend + Send + Sync + 'static,
E: state_machine::CodeExecutor + Send + Sync + 'static, E: CallExecutor + Send + Sync + 'static,
Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>, { Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>, {
fn import(&self, is_best: bool, header: block::Header, justification: Justification, body: Option<block::Body>) -> Result<ImportResult, Error> { fn import(&self, is_best: bool, header: block::Header, justification: Justification, body: Option<block::Body>) -> Result<ImportResult, Error> {
@@ -80,4 +83,8 @@ impl<B, E> Client for PolkadotClient<B, E> where
fn justification(&self, id: &BlockId) -> Result<Option<Justification>, Error> { fn justification(&self, id: &BlockId) -> Result<Option<Justification>, Error> {
(self as &PolkadotClient<B, E>).justification(id) (self as &PolkadotClient<B, E>).justification(id)
} }
fn execution_proof(&self, block: &block::HeaderHash, method: &str, data: &[u8]) -> Result<(Vec<u8>, Vec<Vec<u8>>), Error> {
(self as &PolkadotClient<B, E>).execution_proof(&BlockId::Hash(block.clone()), method, data)
}
} }
+3
View File
@@ -22,6 +22,7 @@
extern crate ethcore_network_devp2p as network_devp2p; extern crate ethcore_network_devp2p as network_devp2p;
extern crate ethcore_network as network; extern crate ethcore_network as network;
extern crate ethcore_io as core_io; extern crate ethcore_io as core_io;
extern crate linked_hash_map;
extern crate rand; extern crate rand;
extern crate parking_lot; extern crate parking_lot;
extern crate substrate_primitives as primitives; extern crate substrate_primitives as primitives;
@@ -53,6 +54,7 @@ mod config;
mod chain; mod chain;
mod blocks; mod blocks;
mod consensus; mod consensus;
mod on_demand;
pub mod error; pub mod error;
#[cfg(test)] mod test; #[cfg(test)] mod test;
@@ -66,6 +68,7 @@ pub use network_devp2p::{ConnectionFilter, ConnectionDirection};
pub use message::{Statement, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal}; pub use message::{Statement, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal};
pub use error::Error; pub use error::Error;
pub use config::{Role, ProtocolConfig}; pub use config::{Role, ProtocolConfig};
pub use on_demand::{OnDemand, OnDemandService, Response as OnDemandResponse};
// TODO: move it elsewhere // TODO: move it elsewhere
fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash { fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash {
@@ -247,6 +247,10 @@ pub enum Message {
CandidateResponse(CandidateResponse), CandidateResponse(CandidateResponse),
/// BFT Consensus statement. /// BFT Consensus statement.
BftMessage(LocalizedBftMessage), BftMessage(LocalizedBftMessage),
/// Remote method call request.
RemoteCallRequest(RemoteCallRequest),
/// Remote method call response.
RemoteCallResponse(RemoteCallResponse),
} }
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
@@ -319,3 +323,27 @@ pub struct BlockAnnounce {
/// New block header. /// New block header.
pub header: Header, pub header: Header,
} }
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// Remote call request.
pub struct RemoteCallRequest {
/// Unique request id.
pub id: RequestId,
/// Block at which to perform call.
pub block: HeaderHash,
/// Method name.
pub method: String,
/// Call data.
pub data: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// Remote call response.
pub struct RemoteCallResponse {
/// Id of a request this response was made for.
pub id: RequestId,
/// Method return value.
pub value: Vec<u8>,
/// Execution proof.
pub proof: Vec<Vec<u8>>,
}
@@ -0,0 +1,423 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
//! On-demand requests service.
use std::collections::VecDeque;
use std::sync::{Arc, Weak};
use std::time::{Instant, Duration};
use futures::{Future, Poll};
use futures::sync::oneshot::{channel, Receiver, Sender};
use linked_hash_map::LinkedHashMap;
use linked_hash_map::Entry;
use parking_lot::Mutex;
use client;
use client::light::{Fetcher, FetchChecker, RemoteCallRequest};
use io::SyncIo;
use message;
use network::PeerId;
use service;
/// Remote request timeout.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// On-demand service API.
pub trait OnDemandService: Send + Sync {
/// When new node is connected.
fn on_connect(&self, peer: PeerId, role: service::Role);
/// When node is disconnected.
fn on_disconnect(&self, peer: PeerId);
/// Maintain peers requests.
fn maintain_peers(&self, io: &mut SyncIo);
/// When response is received from remote node.
fn on_remote_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse);
}
/// On-demand requests service. Dispatches requests to appropriate peers.
pub struct OnDemand<E: service::ExecuteInContext> {
core: Mutex<OnDemandCore<E>>,
checker: Arc<FetchChecker>,
}
/// On-demand response.
pub struct Response {
receiver: Receiver<client::CallResult>,
}
#[derive(Default)]
struct OnDemandCore<E: service::ExecuteInContext> {
service: Weak<E>,
next_request_id: u64,
pending_requests: VecDeque<Request>,
active_peers: LinkedHashMap<PeerId, Request>,
idle_peers: VecDeque<PeerId>,
}
struct Request {
id: u64,
timestamp: Instant,
sender: Sender<client::CallResult>,
request: RemoteCallRequest,
}
impl Future for Response {
type Item = client::CallResult;
type Error = client::error::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.receiver.poll()
.map_err(|_| client::error::ErrorKind::RemoteFetchCancelled.into())
}
}
impl<E> OnDemand<E> where E: service::ExecuteInContext {
/// Creates new on-demand service.
pub fn new(checker: Arc<FetchChecker>) -> Self {
OnDemand {
checker,
core: Mutex::new(OnDemandCore {
service: Weak::new(),
next_request_id: 0,
pending_requests: VecDeque::new(),
active_peers: LinkedHashMap::new(),
idle_peers: VecDeque::new(),
})
}
}
/// Sets weak reference to network service.
pub fn set_service_link(&self, service: Weak<E>) {
self.core.lock().service = service;
}
/// Execute method call on remote node, returning execution result and proof.
pub fn remote_call(&self, request: RemoteCallRequest) -> Response {
let (sender, receiver) = channel();
let result = Response {
receiver: receiver,
};
{
let mut core = self.core.lock();
core.insert(sender, request);
core.dispatch();
}
result
}
}
impl<E> OnDemandService for OnDemand<E> where E: service::ExecuteInContext {
fn on_connect(&self, peer: PeerId, role: service::Role) {
if !role.intersects(service::Role::FULL | service::Role::COLLATOR | service::Role::VALIDATOR) { // TODO: correct?
return;
}
let mut core = self.core.lock();
core.add_peer(peer);
core.dispatch();
}
fn on_disconnect(&self, peer: PeerId) {
let mut core = self.core.lock();
core.remove_peer(peer);
core.dispatch();
}
fn maintain_peers(&self, io: &mut SyncIo) {
let mut core = self.core.lock();
for bad_peer in core.maintain_peers() {
trace!(target: "sync", "Remote request timeout for peer {}", bad_peer);
io.disconnect_peer(bad_peer);
}
core.dispatch();
}
fn on_remote_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse) {
let mut core = self.core.lock();
match core.remove(peer, response.id) {
Some(request) => match self.checker.check_execution_proof(&request.request, (response.value, response.proof)) {
Ok(response) => {
// we do not bother if receiver has been dropped already
let _ = request.sender.send(response);
},
Err(error) => {
trace!(target: "sync", "Failed to check remote response from peer {}: {}", peer, error);
io.disconnect_peer(peer);
core.remove_peer(peer);
core.insert(request.sender, request.request);
},
},
None => {
trace!(target: "sync", "Invalid remote response from peer {}", peer);
io.disconnect_peer(peer);
core.remove_peer(peer);
},
}
core.dispatch();
}
}
impl<E> Fetcher for OnDemand<E> where E: service::ExecuteInContext {
type RemoteCallResult = Response;
fn remote_call(&self, request: RemoteCallRequest) -> Self::RemoteCallResult {
self.remote_call(request)
}
}
impl<E> OnDemandCore<E> where E: service::ExecuteInContext {
pub fn add_peer(&mut self, peer: PeerId) {
self.idle_peers.push_back(peer);
}
pub fn remove_peer(&mut self, peer: PeerId) {
if let Some(request) = self.active_peers.remove(&peer) {
self.pending_requests.push_front(request);
return;
}
if let Some(idle_index) = self.idle_peers.iter().position(|i| *i == peer) {
self.idle_peers.swap_remove_back(idle_index);
}
}
pub fn maintain_peers(&mut self) -> Vec<PeerId> {
let now = Instant::now();
let mut bad_peers = Vec::new();
loop {
match self.active_peers.front() {
Some((_, request)) if now - request.timestamp >= REQUEST_TIMEOUT => (),
_ => return bad_peers,
}
let (bad_peer, request) = self.active_peers.pop_front().expect("front() is Some as checked above");
self.pending_requests.push_front(request);
bad_peers.push(bad_peer);
}
}
pub fn insert(&mut self, sender: Sender<client::CallResult>, request: RemoteCallRequest) {
let request_id = self.next_request_id;
self.next_request_id += 1;
self.pending_requests.push_back(Request {
id: request_id,
timestamp: Instant::now(),
sender,
request,
});
}
pub fn remove(&mut self, peer: PeerId, id: u64) -> Option<Request> {
match self.active_peers.entry(peer) {
Entry::Occupied(entry) => match entry.get().id == id {
true => {
self.idle_peers.push_back(peer);
Some(entry.remove())
},
false => None,
},
Entry::Vacant(_) => None,
}
}
pub fn dispatch(&mut self) {
let service = match self.service.upgrade() {
Some(service) => service,
None => return,
};
while !self.pending_requests.is_empty() {
let peer = match self.idle_peers.pop_front() {
Some(peer) => peer,
None => return,
};
let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed");
request.timestamp = Instant::now();
trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer);
service.execute_in_context(|ctx, protocol| {
let message = message::RemoteCallRequest {
id: request.id,
block: request.request.block,
method: request.request.method.clone(),
data: request.request.call_data.clone(),
};
protocol.send_message(ctx, peer, message::Message::RemoteCallRequest(message))
});
self.active_peers.insert(peer, request);
}
}
}
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;
use futures::Future;
use parking_lot::RwLock;
use client;
use client::light::{FetchChecker, RemoteCallRequest};
use io::NetSyncIo;
use message;
use network::PeerId;
use protocol::Protocol;
use service::{Role, ExecuteInContext};
use test::TestIo;
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService};
struct DummyExecutor;
struct DummyFetchChecker { ok: bool }
impl ExecuteInContext for DummyExecutor {
fn execute_in_context<F: Fn(&mut NetSyncIo, &Protocol)>(&self, _closure: F) {}
}
impl FetchChecker for DummyFetchChecker {
fn check_execution_proof(&self, _request: &RemoteCallRequest, remote_proof: (Vec<u8>, Vec<Vec<u8>>)) -> client::error::Result<client::CallResult> {
match self.ok {
true => Ok(client::CallResult {
return_data: remote_proof.0,
changes: Default::default(),
}),
false => Err(client::error::ErrorKind::Backend("Test error".into()).into()),
}
}
}
fn dummy(ok: bool) -> (Arc<DummyExecutor>, Arc<OnDemand<DummyExecutor>>) {
let executor = Arc::new(DummyExecutor);
let service = Arc::new(OnDemand::new(Arc::new(DummyFetchChecker { ok })));
service.set_service_link(Arc::downgrade(&executor));
(executor, service)
}
fn total_peers(on_demand: &OnDemand<DummyExecutor>) -> usize {
let core = on_demand.core.lock();
core.idle_peers.len() + core.active_peers.len()
}
fn receive_response(on_demand: &OnDemand<DummyExecutor>, network: &mut TestIo, peer: PeerId, id: message::RequestId) {
on_demand.on_remote_response(network, peer, message::RemoteCallResponse {
id: id,
value: vec![1],
proof: vec![vec![2]],
});
}
#[test]
fn knows_about_peers_roles() {
let (_, on_demand) = dummy(true);
on_demand.on_connect(0, Role::LIGHT);
on_demand.on_connect(1, Role::FULL);
on_demand.on_connect(2, Role::COLLATOR);
on_demand.on_connect(3, Role::VALIDATOR);
assert_eq!(vec![1, 2, 3], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
}
#[test]
fn disconnects_from_idle_peer() {
let (_, on_demand) = dummy(true);
on_demand.on_connect(0, Role::FULL);
assert_eq!(1, total_peers(&*on_demand));
on_demand.on_disconnect(0);
assert_eq!(0, total_peers(&*on_demand));
}
#[test]
fn disconnects_from_timeouted_peer() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Role::FULL);
on_demand.on_connect(1, Role::FULL);
assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert!(on_demand.core.lock().active_peers.is_empty());
on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
assert_eq!(vec![1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert_eq!(vec![0], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>());
on_demand.core.lock().active_peers[&0].timestamp = Instant::now() - REQUEST_TIMEOUT - REQUEST_TIMEOUT;
on_demand.maintain_peers(&mut network);
assert!(on_demand.core.lock().idle_peers.is_empty());
assert_eq!(vec![1], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>());
assert!(network.to_disconnect.contains(&0));
}
#[test]
fn disconnects_from_peer_on_response_with_wrong_id() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Role::FULL);
on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
receive_response(&*on_demand, &mut network, 0, 1);
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}
#[test]
fn disconnects_from_peer_on_incorrect_response() {
let (_x, on_demand) = dummy(false);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Role::FULL);
on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
receive_response(&*on_demand, &mut network, 0, 0);
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
}
#[test]
fn disconnects_from_peer_on_unexpected_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Role::FULL);
receive_response(&*on_demand, &mut network, 0, 0);
assert!(network.to_disconnect.contains(&0));
}
#[test]
fn receives_remote_response() {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Role::FULL);
let response = on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
let thread = ::std::thread::spawn(move || {
let result = response.wait().unwrap();
assert_eq!(result.return_data, vec![1]);
});
receive_response(&*on_demand, &mut network, 0, 0);
thread.join().unwrap();
}
}
+35 -6
View File
@@ -32,6 +32,7 @@ use consensus::Consensus;
use service::{Role, TransactionPool, StatementStream, BftMessageStream}; use service::{Role, TransactionPool, StatementStream, BftMessageStream};
use config::ProtocolConfig; use config::ProtocolConfig;
use chain::Client; use chain::Client;
use on_demand::OnDemandService;
use io::SyncIo; use io::SyncIo;
use error; use error;
use super::header_hash; use super::header_hash;
@@ -46,6 +47,7 @@ const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
pub struct Protocol { pub struct Protocol {
config: ProtocolConfig, config: ProtocolConfig,
chain: Arc<Client>, chain: Arc<Client>,
on_demand: Option<Arc<OnDemandService>>,
genesis_hash: HeaderHash, genesis_hash: HeaderHash,
sync: RwLock<ChainSync>, sync: RwLock<ChainSync>,
consensus: Mutex<Consensus>, consensus: Mutex<Consensus>,
@@ -112,14 +114,16 @@ pub struct TransactionStats {
impl Protocol { impl Protocol {
/// Create a new instance. /// Create a new instance.
pub fn new(config: ProtocolConfig, chain: Arc<Client>, transaction_pool: Arc<TransactionPool>) -> error::Result<Protocol> { pub fn new(config: ProtocolConfig, chain: Arc<Client>, on_demand: Option<Arc<OnDemandService>>, transaction_pool: Arc<TransactionPool>) -> error::Result<Protocol> {
let info = chain.info()?; let info = chain.info()?;
let best_hash = info.chain.best_hash; let best_hash = info.chain.best_hash;
let sync = ChainSync::new(config.roles, &info);
let protocol = Protocol { let protocol = Protocol {
config: config, config: config,
chain: chain, chain: chain,
on_demand: on_demand,
genesis_hash: info.chain.genesis_hash, genesis_hash: info.chain.genesis_hash,
sync: RwLock::new(ChainSync::new(&info)), sync: RwLock::new(sync),
consensus: Mutex::new(Consensus::new(best_hash)), consensus: Mutex::new(Consensus::new(best_hash)),
peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()),
handshaking_peers: RwLock::new(HashMap::new()), handshaking_peers: RwLock::new(HashMap::new()),
@@ -185,6 +189,8 @@ impl Protocol {
Message::CandidateResponse(r) => self.on_candidate_response(io, peer_id, r), Message::CandidateResponse(r) => self.on_candidate_response(io, peer_id, r),
Message::BftMessage(m) => self.on_bft_message(io, peer_id, m, blake2_256(data).into()), Message::BftMessage(m) => self.on_bft_message(io, peer_id, m, blake2_256(data).into()),
Message::Transactions(m) => self.on_transactions(io, peer_id, m), Message::Transactions(m) => self.on_transactions(io, peer_id, m),
Message::RemoteCallRequest(request) => self.on_remote_call_request(io, peer_id, request),
Message::RemoteCallResponse(response) => self.on_remote_call_response(io, peer_id, response)
} }
} }
@@ -232,6 +238,7 @@ impl Protocol {
if removed { if removed {
self.consensus.lock().peer_disconnected(io, self, peer); self.consensus.lock().peer_disconnected(io, self, peer);
self.sync.write().peer_disconnected(io, self, peer); self.sync.write().peer_disconnected(io, self, peer);
self.on_demand.as_ref().map(|s| s.on_disconnect(peer));
} }
} }
@@ -345,6 +352,7 @@ impl Protocol {
/// Perform time based maintenance. /// Perform time based maintenance.
pub fn tick(&self, io: &mut SyncIo) { pub fn tick(&self, io: &mut SyncIo) {
self.maintain_peers(io); self.maintain_peers(io);
self.on_demand.as_ref().map(|s| s.maintain_peers(io));
self.consensus.lock().collect_garbage(None); self.consensus.lock().collect_garbage(None);
} }
@@ -388,8 +396,6 @@ impl Protocol {
return; return;
} }
let mut sync = self.sync.write();
let mut consensus = self.consensus.lock();
{ {
let mut peers = self.peers.write(); let mut peers = self.peers.write();
let mut handshaking_peers = self.handshaking_peers.write(); let mut handshaking_peers = self.handshaking_peers.write();
@@ -423,8 +429,10 @@ impl Protocol {
handshaking_peers.remove(&peer_id); handshaking_peers.remove(&peer_id);
debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id)); debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id));
} }
sync.new_peer(io, self, peer_id);
consensus.new_peer(io, self, peer_id, &status.roles); self.sync.write().new_peer(io, self, peer_id);
self.consensus.lock().new_peer(io, self, peer_id, &status.roles);
self.on_demand.as_ref().map(|s| s.on_connect(peer_id, message::Role::as_flags(&status.roles)));
} }
/// Called when peer sends us new transactions /// Called when peer sends us new transactions
@@ -523,6 +531,27 @@ impl Protocol {
self.consensus.lock().collect_garbage(Some((hash, &header))); self.consensus.lock().collect_garbage(Some((hash, &header)));
} }
fn on_remote_call_request(&self, io: &mut SyncIo, peer_id: PeerId, request: message::RemoteCallRequest) {
trace!(target: "sync", "Remote request {} from {} ({} at {})", request.id, peer_id, request.method, request.block);
let (value, proof) = match self.chain.execution_proof(&request.block, &request.method, &request.data) {
Ok((value, proof)) => (value, proof),
Err(error) => {
trace!(target: "sync", "Remote request {} from {} ({} at {}) failed with: {}",
request.id, peer_id, request.method, request.block, error);
(Default::default(), Default::default())
},
};
self.send_message(io, peer_id, message::Message::RemoteCallResponse(message::RemoteCallResponse {
id: request.id, value, proof,
}));
}
fn on_remote_call_response(&self, io: &mut SyncIo, peer_id: PeerId, response: message::RemoteCallResponse) {
trace!(target: "sync", "Remote response {} from {}", response.id, peer_id);
self.on_demand.as_ref().map(|s| s.on_remote_response(io, peer_id, response));
}
pub fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> { pub fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> {
BTreeMap::new() BTreeMap::new()
} }
+18 -1
View File
@@ -31,6 +31,7 @@ use config::{ProtocolConfig};
use error::Error; use error::Error;
use chain::Client; use chain::Client;
use message::{Statement, LocalizedBftMessage}; use message::{Statement, LocalizedBftMessage};
use on_demand::OnDemandService;
/// Polkadot devp2p protocol id /// Polkadot devp2p protocol id
pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot"; pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot";
@@ -107,6 +108,12 @@ pub trait ConsensusService: Send + Sync {
fn send_bft_message(&self, message: LocalizedBftMessage); fn send_bft_message(&self, message: LocalizedBftMessage);
} }
/// Service able to execute closure in the network context.
pub trait ExecuteInContext: Send + Sync {
/// Execute closure in network context.
fn execute_in_context<F: Fn(&mut NetSyncIo, &Protocol)>(&self, closure: F);
}
/// devp2p Protocol handler /// devp2p Protocol handler
struct ProtocolHandler { struct ProtocolHandler {
protocol: Protocol, protocol: Protocol,
@@ -137,6 +144,8 @@ pub struct Params {
pub network_config: NetworkConfiguration, pub network_config: NetworkConfiguration,
/// Polkadot relay chain access point. /// Polkadot relay chain access point.
pub chain: Arc<Client>, pub chain: Arc<Client>,
/// On-demand service reference.
pub on_demand: Option<Arc<OnDemandService>>,
/// Transaction pool. /// Transaction pool.
pub transaction_pool: Arc<TransactionPool>, pub transaction_pool: Arc<TransactionPool>,
} }
@@ -156,7 +165,7 @@ impl Service {
let sync = Arc::new(Service { let sync = Arc::new(Service {
network: service, network: service,
handler: Arc::new(ProtocolHandler { handler: Arc::new(ProtocolHandler {
protocol: Protocol::new(params.config, params.chain.clone(), params.transaction_pool)?, protocol: Protocol::new(params.config, params.chain, params.on_demand, params.transaction_pool)?,
}), }),
}); });
@@ -200,6 +209,14 @@ impl Drop for Service {
} }
} }
impl ExecuteInContext for Service {
fn execute_in_context<F: Fn(&mut NetSyncIo, &Protocol)>(&self, closure: F) {
self.network.with_context(DOT_PROTOCOL_ID, |context| {
closure(&mut NetSyncIo::new(context), &self.handler.protocol)
});
}
}
impl SyncProvider for Service { impl SyncProvider for Service {
/// Get sync status /// Get sync status
fn status(&self) -> ProtocolStatus { fn status(&self) -> ProtocolStatus {
+11 -2
View File
@@ -22,6 +22,7 @@ use client::{ImportResult, BlockStatus, ClientInfo};
use primitives::block::{HeaderHash, Number as BlockNumber, Header, Id as BlockId}; use primitives::block::{HeaderHash, Number as BlockNumber, Header, Id as BlockId};
use blocks::{self, BlockCollection}; use blocks::{self, BlockCollection};
use message::{self, Message}; use message::{self, Message};
use service::Role;
use super::header_hash; use super::header_hash;
// Maximum blocks to request in a single packet. // Maximum blocks to request in a single packet.
@@ -73,14 +74,22 @@ pub struct Status {
impl ChainSync { impl ChainSync {
/// Create a new instance. /// Create a new instance.
pub fn new(info: &ClientInfo) -> ChainSync { pub fn new(role: Role, info: &ClientInfo) -> ChainSync {
let mut required_block_attributes = vec![
message::BlockAttribute::Header,
message::BlockAttribute::Justification
];
if role.intersects(Role::FULL | Role::VALIDATOR | Role::COLLATOR) {
required_block_attributes.push(message::BlockAttribute::Body);
}
ChainSync { ChainSync {
genesis_hash: info.chain.genesis_hash, genesis_hash: info.chain.genesis_hash,
peers: HashMap::new(), peers: HashMap::new(),
blocks: BlockCollection::new(), blocks: BlockCollection::new(),
best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash), best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash),
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number), best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
required_block_attributes: vec![message::BlockAttribute::Header, message::BlockAttribute::Body, message::BlockAttribute::Justification], required_block_attributes: required_block_attributes,
} }
} }
+1 -1
View File
@@ -227,7 +227,7 @@ impl TestNet {
for _ in 0..n { for _ in 0..n {
let client = Arc::new(test_client::new()); let client = Arc::new(test_client::new());
let tx_pool = Arc::new(EmptyTransactionPool); let tx_pool = Arc::new(EmptyTransactionPool);
let sync = Protocol::new(config.clone(), client.clone(), tx_pool).unwrap(); let sync = Protocol::new(config.clone(), client.clone(), None, tx_pool).unwrap();
net.peers.push(Arc::new(Peer { net.peers.push(Arc::new(Peer {
sync: sync, sync: sync,
client: client, client: client,
+1 -1
View File
@@ -81,7 +81,7 @@ impl<B, E> Chain<B, E> {
impl<B, E> ChainApi for Chain<B, E> where impl<B, E> ChainApi for Chain<B, E> where
B: client::backend::Backend + Send + Sync + 'static, B: client::backend::Backend + Send + Sync + 'static,
E: state_machine::CodeExecutor + Send + Sync + 'static, E: client::CallExecutor + Send + Sync + 'static,
client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>, client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>,
{ {
type Metadata = ::metadata::Metadata; type Metadata = ::metadata::Metadata;
+2 -3
View File
@@ -28,13 +28,12 @@ fn should_return_header() {
client: Arc::new(test_client::new()), client: Arc::new(test_client::new()),
subscriptions: Subscriptions::new(remote), subscriptions: Subscriptions::new(remote),
}; };
assert_matches!( assert_matches!(
client.header(client.client.genesis_hash()), client.header(client.client.genesis_hash()),
Ok(Some(ref x)) if x == &block::Header { Ok(Some(ref x)) if x == &block::Header {
parent_hash: 0.into(), parent_hash: 0.into(),
number: 0, number: 0,
state_root: "6da331d07a82d99f4debaafb0110a2e36244ed34162f9a7f6312a23fd52989ed".into(), state_root: "0c81ab6cfac8c8d7201d78cb699b6b79d714462a4ba00abcacce22444babe315".into(),
extrinsics_root: "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421".into(), extrinsics_root: "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421".into(),
digest: Default::default(), digest: Default::default(),
} }
@@ -70,7 +69,7 @@ fn should_notify_about_latest_block() {
// assert notification send to transport // assert notification send to transport
let (notification, next) = core.run(transport.into_future()).unwrap(); let (notification, next) = core.run(transport.into_future()).unwrap();
assert_eq!(notification, Some( assert_eq!(notification, Some(
r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"digest":{"logs":[]},"extrinsicsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","number":1,"parentHash":"0x4c4ab196ed07bbd5b8c901ae5092d9d3990cbb4d44421af8e988af7d3c2a4226","stateRoot":"0x75b634da2a0d272e8a5145ab704406d3b50676c7739f977f2ccb2d0e5a0cdbd0"},"subscription":0}}"#.to_owned() r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"digest":{"logs":[]},"extrinsicsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","number":1,"parentHash":"0x72ae67388233893fb4594f13df56d4e654aa8721763bcd0bd4e187fee7b2f349","stateRoot":"0x2e1f2f1c53ffb1767fe1abf4fe5953cc87c7650d4af2d4393d1f72324f2cc5d7"},"subscription":0}}"#.to_owned()
)); ));
// no more notifications on this channel // no more notifications on this channel
assert_eq!(core.run(next.into_future()).unwrap().0, None); assert_eq!(core.run(next.into_future()).unwrap().0, None);
+3 -3
View File
@@ -22,7 +22,7 @@ mod error;
mod tests; mod tests;
use std::sync::Arc; use std::sync::Arc;
use client::{self, Client}; use client::{self, Client, CallExecutor};
use primitives::{block, Hash, blake2_256}; use primitives::{block, Hash, blake2_256};
use primitives::storage::{StorageKey, StorageData}; use primitives::storage::{StorageKey, StorageData};
use primitives::hexdisplay::HexDisplay; use primitives::hexdisplay::HexDisplay;
@@ -69,7 +69,7 @@ build_rpc_trait! {
impl<B, E> StateApi for Arc<Client<B, E>> where impl<B, E> StateApi for Arc<Client<B, E>> where
B: client::backend::Backend + Send + Sync + 'static, B: client::backend::Backend + Send + Sync + 'static,
E: state_machine::CodeExecutor + Send + Sync + 'static, E: CallExecutor + Send + Sync + 'static,
client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>, client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>,
{ {
fn storage_at(&self, key: StorageKey, block: block::HeaderHash) -> Result<StorageData> { fn storage_at(&self, key: StorageKey, block: block::HeaderHash) -> Result<StorageData> {
@@ -79,7 +79,7 @@ impl<B, E> StateApi for Arc<Client<B, E>> where
fn call_at(&self, method: String, data: Vec<u8>, block: block::HeaderHash) -> Result<Vec<u8>> { fn call_at(&self, method: String, data: Vec<u8>, block: block::HeaderHash) -> Result<Vec<u8>> {
trace!(target: "rpc", "Calling runtime at {:?} for method {} ({})", block, method, HexDisplay::from(&data)); trace!(target: "rpc", "Calling runtime at {:?} for method {} ({})", block, method, HexDisplay::from(&data));
Ok(self.as_ref().call(&block::Id::Hash(block), &method, &data)?.return_data) Ok(self.as_ref().executor().call(&block::Id::Hash(block), &method, &data)?.return_data)
} }
fn storage_hash_at(&self, key: StorageKey, block: block::HeaderHash) -> Result<Hash> { fn storage_hash_at(&self, key: StorageKey, block: block::HeaderHash) -> Result<Hash> {
@@ -39,7 +39,7 @@ pub trait TestClient {
impl TestClient for Client<Backend, Executor> { impl TestClient for Client<Backend, Executor> {
fn new_for_tests() -> Self { fn new_for_tests() -> Self {
client::new_in_mem(NativeExecutor::new(), prepare_genesis).unwrap() client::new_in_mem(NativeExecutor::new(), GenesisBuilder).unwrap()
} }
fn justify_and_import(&self, origin: client::BlockOrigin, block: block::Block) -> client::error::Result<()> { fn justify_and_import(&self, origin: client::BlockOrigin, block: block::Block) -> client::error::Result<()> {
@@ -95,7 +95,10 @@ fn genesis_config() -> GenesisConfig {
], 1000) ], 1000)
} }
fn prepare_genesis() -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>) { struct GenesisBuilder;
impl client::GenesisBuilder for GenesisBuilder {
fn build(self) -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>) {
let mut storage = genesis_config().genesis_map(); let mut storage = genesis_config().genesis_map();
let block = client::genesis::construct_genesis_block(&storage); let block = client::genesis::construct_genesis_block(&storage);
storage.extend(additional_storage_with_genesis(&block)); storage.extend(additional_storage_with_genesis(&block));
@@ -106,3 +109,4 @@ fn prepare_genesis() -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>) {
storage.into_iter().collect() storage.into_iter().collect()
) )
} }
}
+1 -1
View File
@@ -46,7 +46,7 @@ pub use self::native_executor::NativeExecutor;
pub type Backend = client::in_mem::Backend; pub type Backend = client::in_mem::Backend;
/// Test client executor. /// Test client executor.
pub type Executor = executor::NativeExecutor<NativeExecutor>; pub type Executor = client::LocalCallExecutor<Backend, executor::NativeExecutor<NativeExecutor>>;
/// Creates new client instance used for tests. /// Creates new client instance used for tests.
pub fn new() -> client::Client<Backend, Executor> { pub fn new() -> client::Client<Backend, Executor> {
@@ -68,6 +68,7 @@ pub mod api {
use system; use system;
impl_stubs!( impl_stubs!(
authorities => |()| system::authorities(),
execute_block => |block| system::execute_block(block), execute_block => |block| system::execute_block(block),
execute_transaction => |(header, utx)| system::execute_transaction(utx, header), execute_transaction => |(header, utx)| system::execute_transaction(utx, header),
finalise_block => |header| system::finalise_block(header) finalise_block => |header| system::finalise_block(header)
@@ -18,6 +18,7 @@
//! and depositing logs. //! and depositing logs.
use rstd::prelude::*; use rstd::prelude::*;
use primitives::AuthorityId;
use runtime_io::{storage_root, enumerated_trie_root, ed25519_verify}; use runtime_io::{storage_root, enumerated_trie_root, ed25519_verify};
use runtime_support::{Hashable, storage}; use runtime_support::{Hashable, storage};
use codec::{KeyedVec, Slicable}; use codec::{KeyedVec, Slicable};
@@ -26,6 +27,8 @@ use super::{AccountId, UncheckedTransaction, H256 as Hash, Block, Header};
const NONCE_OF: &[u8] = b"nonce:"; const NONCE_OF: &[u8] = b"nonce:";
const BALANCE_OF: &[u8] = b"balance:"; const BALANCE_OF: &[u8] = b"balance:";
const LATEST_BLOCK_HASH: &[u8] = b"latest"; const LATEST_BLOCK_HASH: &[u8] = b"latest";
const AUTHORITY_AT: &'static[u8] = b":auth:";
const AUTHORITY_COUNT: &'static[u8] = b":auth:len";
pub fn latest_block_hash() -> Hash { pub fn latest_block_hash() -> Hash {
storage::get(LATEST_BLOCK_HASH).expect("There must always be a latest block") storage::get(LATEST_BLOCK_HASH).expect("There must always be a latest block")
@@ -39,6 +42,14 @@ pub fn nonce_of(who: AccountId) -> u64 {
storage::get_or(&who.to_keyed_vec(NONCE_OF), 0) storage::get_or(&who.to_keyed_vec(NONCE_OF), 0)
} }
/// Get authorities ar given block.
pub fn authorities() -> Vec<AuthorityId> {
let len: u32 = storage::unhashed::get(AUTHORITY_COUNT).expect("There are always authorities in test-runtime");
(0..len)
.map(|i| storage::unhashed::get(&i.to_keyed_vec(AUTHORITY_AT)).expect("Authority is properly encoded in test-runtime"))
.collect()
}
/// Actually execute all transitioning for `block`. /// Actually execute all transitioning for `block`.
pub fn execute_block(block: Block) { pub fn execute_block(block: Block) {
let ref header = block.header; let ref header = block.header;
Binary file not shown.