diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000000..9fb23ce694 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,13 @@ +# EditorConfig helps developers define and maintain consistent +# coding styles between different editors and IDEs +# editorconfig.org + +root = true + +[*] +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space +indent_size = 4 diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 0000000000..224f58aed2 --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1,67 @@ +max_width = 90 # changed +hard_tabs = false +tab_spaces = 4 +newline_style = "Auto" +use_small_heuristics = "Default" +indent_style = "Block" +wrap_comments = false +format_code_in_doc_comments = false +comment_width = 80 +normalize_comments = true # changed +normalize_doc_attributes = false +license_template_path = "LICENSE_TEMPLATE" # changed +format_strings = false +format_macro_matchers = false +format_macro_bodies = true +empty_item_single_line = true +struct_lit_single_line = true +fn_single_line = false +where_single_line = false +imports_indent = "Block" +imports_layout = "Vertical" # changed +merge_imports = true # changed +reorder_imports = true +reorder_modules = true +reorder_impl_items = false +type_punctuation_density = "Wide" +space_before_colon = false +space_after_colon = true +spaces_around_ranges = false +binop_separator = "Front" +remove_nested_parens = true +combine_control_expr = false # changed +overflow_delimited_expr = false +struct_field_align_threshold = 0 +enum_discrim_align_threshold = 0 +match_arm_blocks = true +force_multiline_blocks = true # changed +fn_args_layout = "Tall" +brace_style = "SameLineWhere" +control_brace_style = "AlwaysSameLine" +trailing_semicolon = false # changed +trailing_comma = "Vertical" +match_block_trailing_comma = false +blank_lines_upper_bound = 1 +blank_lines_lower_bound = 0 +edition = "2018" # changed +version = "One" +merge_derives = true +use_try_shorthand = true # changed +use_field_init_shorthand = true # changed +force_explicit_abi = true +condense_wildcard_suffixes = false +color = "Auto" +unstable_features = true # changed +disable_all_formatting = false +skip_children = false +hide_parse_errors = false +error_on_line_overflow = false +error_on_unformatted = false +report_todo = "Always" +report_fixme = "Always" +ignore = [] + +# Below are `rustfmt` internal settings +# +# emit_mode = "Files" +# make_backup = false diff --git a/Cargo.toml b/Cargo.toml index 5ee31c446b..accecaea14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,16 +18,19 @@ env_logger = "0.6" log = "0.4" futures = "0.1.28" jsonrpc-core-client = { version = "12.1.0", features = ["ws"] } -node-runtime = { git = "https://github.com/paritytech/substrate/", package = "node-runtime", features = ["std"] } +num-traits = { version = "0.2", default-features = false } parity-codec = { version = "4.1", default-features = false, features = ["derive", "full"] } runtime_support = { git = "https://github.com/paritytech/substrate/", package = "srml-support" } runtime_primitives = { git = "https://github.com/paritytech/substrate/", package = "sr-primitives" } serde = { version = "1.0", features = ["derive"] } -srml-balances = { git = "https://github.com/paritytech/substrate/", package = "srml-balances", default-features = false } -srml-system = { git = "https://github.com/paritytech/substrate/", package = "srml-system", default-features = false } +srml-system = { git = "https://github.com/paritytech/substrate/", package = "srml-system" } substrate-rpc = { git = "https://github.com/paritytech/substrate/", package = "substrate-rpc" } -substrate-keyring = { git = "https://github.com/paritytech/substrate/", package = "substrate-keyring" } substrate-primitives = { git = "https://github.com/paritytech/substrate/", package = "substrate-primitives" } transaction_pool = { git = "https://github.com/paritytech/substrate/", package = "substrate-transaction-pool" } tokio = "0.1.21" url = "1.7" + +[dev-dependencies] +node_runtime = { git = "https://github.com/paritytech/substrate/", package = "node-runtime" } +srml_balances = { git = "https://github.com/paritytech/substrate/", package = "srml-balances" } +substrate-keyring = { git = "https://github.com/paritytech/substrate/", package = "substrate-keyring" } diff --git a/README.md b/README.md index afe65dc8dc..9d18ccc0dc 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# subext +# subxt -A library to submit extrinsics to a [substrate](https://github.com/paritytech/substrate) node via RPC. +A library to **sub**mit e**xt**rinsics to a [substrate](https://github.com/paritytech/substrate) node via RPC. **IMPORTANT NOTE:** WORK IN PROGRESS! Do not expect this to be working (or supported). diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000000..cc2d90d637 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,35 @@ +// Copyright 2018-2019 Parity Technologies (UK) Ltd. +// This file is part of substrate-subxt. +// +// ink! is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// ink! is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with substrate-subxt. If not, see . + +use jsonrpc_core_client::RpcError; +use std::io::Error as IoError; +use substrate_primitives::crypto::SecretStringError; + +#[derive(Debug, derive_more::From)] +pub enum Error { + Io(IoError), + Rpc(RpcError), + SecretString(SecretStringError), + Other(String), +} + +impl From<&str> for Error { + fn from(error: &str) -> Self { + Error::Other(error.into()) + } +} + +pub type Result = std::result::Result; \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 0cfb2f5e68..906ba61762 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,380 +14,78 @@ // You should have received a copy of the GNU General Public License // along with substrate-subxt. If not, see . -use futures::{ - future::{ - self, - Future, - IntoFuture, - }, - stream::Stream, -}; -use jsonrpc_core_client::{ - transports::ws, - RpcChannel, - RpcError, - TypedSubscriptionStream, -}; -use log; -use node_runtime::{ - Call, - Runtime, - UncheckedExtrinsic, -}; -use parity_codec::{ - Decode, - Encode, -}; +use crate::error::Result; +use futures::future::Future; +use jsonrpc_core_client::transports::ws; +use parity_codec::Codec; -use runtime_primitives::generic::Era; -use runtime_support::StorageMap; -use serde::{ - self, - de::Error as DeError, - Deserialize, -}; -use substrate_primitives::{ - blake2_256, - sr25519::Pair, - crypto::SecretStringError, - storage::{ - StorageChangeSet, - StorageKey, - }, - twox_128, - Pair as _, - H256, -}; -use substrate_rpc::{ - author::AuthorClient, - chain::{ - number::NumberOrHex, - ChainClient, - }, - state::StateClient, -}; -use std::io::Error as IoError; -use transaction_pool::txpool::watcher::Status; +use runtime_primitives::traits::SignedExtension; +use substrate_primitives::Pair; use url::Url; -#[derive(Debug, derive_more::From)] -pub enum Error { - Io(IoError), - Rpc(RpcError), - SecretString(SecretStringError), - Other(String), -} - -impl From<&str> for Error { - fn from(error: &str) -> Self { - Error::Other(error.into()) - } -} - -pub type Result = std::result::Result; - -type AccountId = ::AccountId; -type BlockNumber = ::BlockNumber; -type Index = ::Index; -type Hash = ::Hash; -type Event = ::Event; -type EventRecord = srml_system::EventRecord; - -/// Copy of runtime_primitives::OpaqueExtrinsic to allow a local Deserialize impl -#[derive(PartialEq, Eq, Clone, Default, Encode, Decode)] -pub struct OpaqueExtrinsic(pub Vec); - -impl std::fmt::Debug for OpaqueExtrinsic { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - fmt, - "{}", - substrate_primitives::hexdisplay::HexDisplay::from(&self.0) - ) - } -} - -impl<'a> serde::Deserialize<'a> for OpaqueExtrinsic { - fn deserialize(de: D) -> std::result::Result - where - D: serde::Deserializer<'a>, - { - let r = substrate_primitives::bytes::deserialize(de)?; - Decode::decode(&mut &r[..]) - .ok_or(DeError::custom("Invalid value passed into decode")) - } -} - -/// Copy of runtime_primitives::generic::Block with Deserialize implemented -#[derive(PartialEq, Eq, Clone, Encode, Decode, Debug, Deserialize)] -pub struct Block { - // not included: pub header: Header, - /// The accompanying extrinsics. - pub extrinsics: Vec, -} - -/// Copy of runtime_primitives::generic::SignedBlock with Deserialize implemented -#[derive(PartialEq, Eq, Clone, Encode, Decode, Debug, Deserialize)] -pub struct SignedBlock { - /// Full block. - pub block: Block, -} +mod error; +mod rpc; /// Captures data for when an extrinsic is successfully included in a block #[derive(Debug)] -pub struct ExtrinsicSuccess { - pub block: Hash, - pub extrinsic: Hash, - pub events: Vec, -} - -/// Client for substrate rpc interfaces -struct Rpc { - state: StateClient, - chain: ChainClient, - author: AuthorClient, -} - -/// Allows connecting to all inner interfaces on the same RpcChannel -impl From for Rpc { - fn from(channel: RpcChannel) -> Rpc { - Rpc { - state: channel.clone().into(), - chain: channel.clone().into(), - author: channel.into(), - } - } -} - -impl Rpc { - /// Fetch the latest nonce for the given `AccountId` - fn fetch_nonce( - &self, - account: &AccountId, - ) -> impl Future { - let account_nonce_key = >::key_for(account); - let storage_key = blake2_256(&account_nonce_key).to_vec(); - - self.state - .storage(StorageKey(storage_key), None) - .map(|data| { - data.map_or(0, |d| { - Decode::decode(&mut &d.0[..]).expect("Account nonce is valid Index") - }) - }) - .map_err(Into::into) - } - - /// Fetch the genesis hash - fn fetch_genesis_hash(&self) -> impl Future, Error = RpcError> { - self.chain.block_hash(Some(NumberOrHex::Number(0))) - } - - /// Subscribe to substrate System Events - fn subscribe_events( - &self, - ) -> impl Future>, Error = RpcError> - { - let events_key = b"System Events"; - let storage_key = twox_128(events_key); - log::debug!("Events storage key {:?}", storage_key); - - self.state - .subscribe_storage(Some(vec![StorageKey(storage_key.to_vec())])) - } - - /// Submit an extrinsic, waiting for it to be finalized. - /// If successful, returns the block hash. - fn submit_and_watch( - self, - extrinsic: UncheckedExtrinsic, - ) -> impl Future { - self.author - .watch_extrinsic(extrinsic.encode().into()) - .map_err(Into::into) - .and_then(|stream| { - stream - .filter_map(|status| { - match status { - Status::Future | Status::Ready | Status::Broadcast(_) => None, // ignore in progress extrinsic for now - Status::Finalized(block_hash) => Some(Ok(block_hash)), - Status::Usurped(_) => Some(Err("Extrinsic Usurped".into())), - Status::Dropped => Some(Err("Extrinsic Dropped".into())), - Status::Invalid => Some(Err("Extrinsic Invalid".into())), - } - }) - .into_future() - .map_err(|(e,_)| e.into()) - .and_then(|(result, _)| { - result - .ok_or(Error::from("Stream terminated")) - .and_then(|r| r) - .into_future() - }) - }) - } - - /// Create and submit an extrinsic and return corresponding Event if successful - fn create_and_submit_extrinsic( - self, - signer: Pair, - call: Call, - ) -> impl Future { - let account_nonce = self.fetch_nonce(&signer.public()).map_err(Into::into); - let genesis_hash = - self.fetch_genesis_hash() - .map_err(Into::into) - .and_then(|genesis_hash| { - future::result(genesis_hash.ok_or("Genesis hash not found".into())) - }); - let events = self.subscribe_events().map_err(Into::into); - - account_nonce.join3(genesis_hash, events).and_then( - move |(index, genesis_hash, events)| { - let extrinsic = - create_and_sign_extrinsic(index, call, genesis_hash, &signer); - let ext_hash = - H256(extrinsic.using_encoded(|encoded| blake2_256(encoded))); - log::info!("Submitting Extrinsic `{:?}`", ext_hash); - - let chain = self.chain.clone(); - self.submit_and_watch(extrinsic) - .and_then(move |bh| { - log::info!("Fetching block {:?}", bh); - chain - .block(Some(bh)) - .map(move |b| (bh, b)) - .map_err(Into::into) - }) - .and_then(|(h, b)| { - b.ok_or(format!("Failed to find block {:#x}", h).into()) - .map(|b| (h, b)) - .into_future() - }) - .and_then(move |(bh, sb)| { - log::info!( - "Found block {:?}, with {} extrinsics", - bh, - sb.block.extrinsics.len() - ); - wait_for_block_events(ext_hash, &sb, bh, events) - }) - }, - ) - } -} - -/// Creates and signs an Extrinsic for the supplied `Call` -fn create_and_sign_extrinsic( - index: Index, - function: Call, - genesis_hash: Hash, - signer: &Pair, -) -> UncheckedExtrinsic { - log::info!( - "Creating Extrinsic with genesis hash {:?} and account nonce {:?}", - genesis_hash, - index - ); - - let extra = |nonce, fee| { - ( - srml_system::CheckGenesis::::new(), - srml_system::CheckEra::::from(Era::Immortal), - srml_system::CheckNonce::::from(nonce), - srml_system::CheckWeight::::new(), - srml_balances::TakeFees::::from(fee), - ) - }; - - let raw_payload = (function, extra(index, 0), genesis_hash); - let signature = raw_payload.using_encoded(|payload| { - if payload.len() > 256 { - signer.sign(&blake2_256(payload)[..]) - } else { - signer.sign(payload) - } - }); - - UncheckedExtrinsic::new_signed( - raw_payload.0, - signer.public().into(), - signature.into(), - extra(index, 0), - ) -} - -/// Waits for events for the block triggered by the extrinsic -fn wait_for_block_events( - ext_hash: H256, - signed_block: &SignedBlock, - block_hash: H256, - events: TypedSubscriptionStream>, -) -> impl Future { - let ext_index = signed_block - .block - .extrinsics - .iter() - .position(|ext| { - let hash = H256(ext.using_encoded(|encoded| blake2_256(encoded))); - hash == ext_hash - }) - .ok_or(format!("Failed to find Extrinsic with hash {:?}", ext_hash).into()) - .into_future(); - - let block_hash = block_hash.clone(); - let block_events = events - .map(|event| { - let records = event - .changes - .iter() - .filter_map(|(_key, data)| { - data.as_ref() - .and_then(|data| Decode::decode(&mut &data.0[..])) - }) - .flat_map(|events: Vec| events) - .collect::>(); - log::debug!("Block {:?}, Events {:?}", event.block, records.len()); - (event.block, records) - }) - .filter(move |(event_block, _)| *event_block == block_hash) - .into_future() - .map_err(|(e, _)| e.into()) - .map(|(events, _)| events); - - block_events - .join(ext_index) - .map(move |(events, ext_index)| { - let events: Vec = events - .iter() - .flat_map(|(_, events)| events) - .filter_map(|e| { - if let srml_system::Phase::ApplyExtrinsic(i) = e.phase { - if i as usize == ext_index { - Some(e.event.clone()) - } else { - None - } - } else { - None - } - }) - .collect::>(); - ExtrinsicSuccess { - block: block_hash, - extrinsic: ext_hash, - events, - } - }) +pub struct ExtrinsicSuccess { + pub block: T::Hash, + pub extrinsic: T::Hash, + pub events: Vec, } /// Creates, signs and submits an Extrinsic with the given `Call` to a substrate node. -pub fn submit(url: &Url, signer: Pair, call: Call) -> Result { +pub fn submit( + url: &Url, + signer: P, + call: C, + extra: E, +) -> Result> +where + T: srml_system::Trait, + P: Pair, + P::Signature: Codec, + P::Public: Into, + C: Codec + Send + 'static, + E: Fn(T::Index) -> SE + Send + 'static, + SE: SignedExtension + 'static, +{ let submit = ws::connect(url.as_str()) .expect("Url is a valid url; qed") .map_err(Into::into) - .and_then(|rpc: Rpc| rpc.create_and_submit_extrinsic(signer, call)); + .and_then(|rpc: rpc::Rpc| { + rpc.create_and_submit_extrinsic(signer, call, extra) + }); let mut rt = tokio::runtime::Runtime::new()?; rt.block_on(submit) } + +#[cfg(test)] +pub mod tests { + use node_runtime::Runtime; + use runtime_primitives::generic::Era; + use substrate_primitives::crypto::Pair as _; + + #[test] #[ignore] // requires locally running substrate node + fn node_runtime_balance_transfer() { + let url = url::Url::parse("ws://localhost:9944").unwrap(); + let signer = substrate_keyring::AccountKeyring::Alice.pair(); + + let dest = substrate_keyring::AccountKeyring::Bob.pair().public(); + let transfer = srml_balances::Call::transfer(dest.into(), 10_000); + let call = node_runtime::Call::Balances(transfer); + + let extra = |nonce| { + ( + srml_system::CheckGenesis::::new(), + srml_system::CheckEra::::from(Era::Immortal), + srml_system::CheckNonce::::from(nonce), + srml_system::CheckWeight::::new(), + srml_balances::TakeFees::::from(0), + ) + }; + let result = super::submit::(&url, signer, call, extra); + assert!(result.is_ok()) + } +} diff --git a/src/rpc.rs b/src/rpc.rs new file mode 100644 index 0000000000..91885f1b8a --- /dev/null +++ b/src/rpc.rs @@ -0,0 +1,345 @@ +// Copyright 2018-2019 Parity Technologies (UK) Ltd. +// This file is part of substrate-subxt. +// +// ink! is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// ink! is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with substrate-subxt. If not, see . + +use crate::{error::Error, ExtrinsicSuccess}; +use futures::{ + future::{self, Future, IntoFuture}, + stream::Stream, +}; +use jsonrpc_core_client::{RpcChannel, RpcError, TypedSubscriptionStream}; +use log; +use num_traits::bounds::Bounded; +use parity_codec::{Codec, Decode, Encode}; + +use runtime_primitives::{ + generic::UncheckedExtrinsic, + traits::{Hash as _, SignedExtension}, +}; +use runtime_support::StorageMap; +use serde::{self, de::Error as DeError, Deserialize}; +use std::marker::PhantomData; +use substrate_primitives::{ + blake2_256, + storage::{StorageChangeSet, StorageKey}, + twox_128, Pair, +}; +use substrate_rpc::{ + author::AuthorClient, + chain::{number::NumberOrHex, ChainClient}, + state::StateClient, +}; +use transaction_pool::txpool::watcher::Status; + +/// Copy of runtime_primitives::OpaqueExtrinsic to allow a local Deserialize impl +#[derive(PartialEq, Eq, Clone, Default, Encode, Decode)] +pub struct OpaqueExtrinsic(pub Vec); + +impl std::fmt::Debug for OpaqueExtrinsic { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + fmt, + "{}", + substrate_primitives::hexdisplay::HexDisplay::from(&self.0) + ) + } +} + +impl<'a> serde::Deserialize<'a> for OpaqueExtrinsic { + fn deserialize(de: D) -> std::result::Result + where + D: serde::Deserializer<'a>, + { + let r = substrate_primitives::bytes::deserialize(de)?; + Decode::decode(&mut &r[..]) + .ok_or(DeError::custom("Invalid value passed into decode")) + } +} + +/// Copy of runtime_primitives::generic::Block with Deserialize implemented +#[derive(PartialEq, Eq, Clone, Encode, Decode, Debug, Deserialize)] +pub struct Block { + // not included: pub header: Header, + /// The accompanying extrinsics. + pub extrinsics: Vec, +} + +/// Copy of runtime_primitives::generic::SignedBlock with Deserialize implemented +#[derive(PartialEq, Eq, Clone, Encode, Decode, Debug, Deserialize)] +pub struct SignedBlock { + /// Full block. + pub block: Block, +} + +/// Client for substrate rpc interfaces +pub struct Rpc { + state: StateClient, + chain: ChainClient, + author: AuthorClient, + _phantom: PhantomData<(C, P, E, SE)>, +} + +/// Allows connecting to all inner interfaces on the same RpcChannel +impl From for Rpc +where + T: srml_system::Trait, +{ + fn from(channel: RpcChannel) -> Rpc { + Rpc { + state: channel.clone().into(), + chain: channel.clone().into(), + author: channel.into(), + _phantom: PhantomData, + } + } +} + +impl Rpc +where + T: srml_system::Trait, + C: Codec + Send, + P: Pair, + P::Signature: Codec, + P::Public: Into, + E: Fn(T::Index) -> SE + Send, + SE: SignedExtension + Encode, +{ + /// Fetch the latest nonce for the given `AccountId` + fn fetch_nonce( + &self, + account: &T::AccountId, + ) -> impl Future::Index, Error = RpcError> { + let account_nonce_key = >::key_for(account); + let storage_key = blake2_256(&account_nonce_key).to_vec(); + + self.state + .storage(StorageKey(storage_key), None) + .map(|data| { + data.map_or(Default::default(), |d| { + Decode::decode(&mut &d.0[..]).expect("Account nonce is valid Index") + }) + }) + .map_err(Into::into) + } + + /// Fetch the genesis hash + fn fetch_genesis_hash( + &self, + ) -> impl Future, Error = RpcError> { + let block_zero = T::BlockNumber::min_value(); + self.chain.block_hash(Some(NumberOrHex::Number(block_zero))) + } + + /// Subscribe to substrate System Events + fn subscribe_events( + &self, + ) -> impl Future>, Error = RpcError> + { + let events_key = b"System Events"; + let storage_key = twox_128(events_key); + log::debug!("Events storage key {:?}", storage_key); + + self.state + .subscribe_storage(Some(vec![StorageKey(storage_key.to_vec())])) + } + + /// Submit an extrinsic, waiting for it to be finalized. + /// If successful, returns the block hash. + fn submit_and_watch( + self, + extrinsic: UncheckedExtrinsic, + ) -> impl Future { + self.author + .watch_extrinsic(extrinsic.encode().into()) + .map_err(Into::into) + .and_then(|stream| { + stream + .filter_map(|status| { + match status { + Status::Future | Status::Ready | Status::Broadcast(_) => None, // ignore in progress extrinsic for now + Status::Finalized(block_hash) => Some(Ok(block_hash)), + Status::Usurped(_) => Some(Err("Extrinsic Usurped".into())), + Status::Dropped => Some(Err("Extrinsic Dropped".into())), + Status::Invalid => Some(Err("Extrinsic Invalid".into())), + } + }) + .into_future() + .map_err(|(e, _)| e.into()) + .and_then(|(result, _)| { + result + .ok_or(Error::from("Stream terminated")) + .and_then(|r| r) + .into_future() + }) + }) + } + + /// Create and submit an extrinsic and return corresponding Event if successful + pub fn create_and_submit_extrinsic( + self, + signer: P, + call: C, + extra: E, + ) -> impl Future, Error = Error> { + let account_nonce = self + .fetch_nonce(&signer.public().into()) + .map_err(Into::into); + let genesis_hash = + self.fetch_genesis_hash() + .map_err(Into::into) + .and_then(|genesis_hash| { + future::result(genesis_hash.ok_or("Genesis hash not found".into())) + }); + let events = self.subscribe_events().map_err(Into::into); + + account_nonce.join3(genesis_hash, events).and_then( + move |(index, genesis_hash, events)| { + let extrinsic = Self::create_and_sign_extrinsic( + index, + call, + genesis_hash, + &signer, + extra, + ); + let ext_hash = T::Hashing::hash_of(&extrinsic); + + log::info!("Submitting Extrinsic `{:?}`", ext_hash); + + let chain = self.chain.clone(); + self.submit_and_watch(extrinsic) + .and_then(move |bh| { + log::info!("Fetching block {:?}", bh); + chain + .block(Some(bh)) + .map(move |b| (bh, b)) + .map_err(Into::into) + }) + .and_then(|(h, b)| { + b.ok_or(format!("Failed to find block {:?}", h).into()) + .map(|b| (h, b)) + .into_future() + }) + .and_then(move |(bh, sb)| { + log::info!( + "Found block {:?}, with {} extrinsics", + bh, + sb.block.extrinsics.len() + ); + wait_for_block_events::(ext_hash, &sb, bh, events) + }) + }, + ) + } + + /// Creates and signs an Extrinsic for the supplied `Call` + fn create_and_sign_extrinsic( + index: T::Index, + function: C, + genesis_hash: T::Hash, + signer: &P, + extra: E, + ) -> UncheckedExtrinsic { + log::info!( + "Creating Extrinsic with genesis hash {:?} and account nonce {:?}", + genesis_hash, + index + ); + + let raw_payload = (function, extra(index), genesis_hash); + let signature = raw_payload.using_encoded(|payload| { + if payload.len() > 256 { + signer.sign(&blake2_256(payload)[..]) + } else { + signer.sign(payload) + } + }); + + UncheckedExtrinsic::new_signed( + raw_payload.0, + signer.public().into(), + signature.into(), + extra(index), + ) + } +} + +/// Waits for events for the block triggered by the extrinsic +fn wait_for_block_events( + ext_hash: T::Hash, + signed_block: &SignedBlock, + block_hash: T::Hash, + events: TypedSubscriptionStream>, +) -> impl Future, Error = Error> +where + T: srml_system::Trait, +{ + let ext_index = signed_block + .block + .extrinsics + .iter() + .position(|ext| { + let hash = T::Hashing::hash_of(ext); + hash == ext_hash + }) + .ok_or(format!("Failed to find Extrinsic with hash {:?}", ext_hash).into()) + .into_future(); + + let block_hash = block_hash.clone(); + let block_events = events + .map(|event| { + let records = event + .changes + .iter() + .filter_map(|(_key, data)| { + data.as_ref() + .and_then(|data| Decode::decode(&mut &data.0[..])) + }) + .flat_map(|events: Vec>| { + events + }) + .collect::>(); + log::debug!("Block {:?}, Events {:?}", event.block, records.len()); + (event.block, records) + }) + .filter(move |(event_block, _)| *event_block == block_hash) + .into_future() + .map_err(|(e, _)| e.into()) + .map(|(events, _)| events); + + block_events + .join(ext_index) + .map(move |(events, ext_index)| { + let events: Vec = events + .iter() + .flat_map(|(_, events)| events) + .filter_map(|e| { + if let srml_system::Phase::ApplyExtrinsic(i) = e.phase { + if i as usize == ext_index { + Some(e.event.clone()) + } else { + None + } + } else { + None + } + }) + .collect::>(); + ExtrinsicSuccess { + block: block_hash, + extrinsic: ext_hash, + events, + } + }) +}