diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 2ef890ba26..7a3066c54a 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -987,7 +987,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" version = "8.0.2" -source = "git+https://github.com/paritytech/jsonrpc.git#ecbd6197e562098a3a9db6e485255026f13b4329" +source = "git+https://github.com/paritytech/jsonrpc.git#06110d1d81bcd31fca23f0c0f825f50e61786c83" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -999,7 +999,7 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#ecbd6197e562098a3a9db6e485255026f13b4329" +source = "git+https://github.com/paritytech/jsonrpc.git#06110d1d81bcd31fca23f0c0f825f50e61786c83" dependencies = [ "hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", @@ -1012,7 +1012,7 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#ecbd6197e562098a3a9db6e485255026f13b4329" +source = "git+https://github.com/paritytech/jsonrpc.git#06110d1d81bcd31fca23f0c0f825f50e61786c83" dependencies = [ "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", "jsonrpc-pubsub 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", @@ -1022,7 +1022,7 @@ dependencies = [ [[package]] name = "jsonrpc-pubsub" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#ecbd6197e562098a3a9db6e485255026f13b4329" +source = "git+https://github.com/paritytech/jsonrpc.git#06110d1d81bcd31fca23f0c0f825f50e61786c83" dependencies = [ "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1032,7 +1032,7 @@ dependencies = [ [[package]] name = "jsonrpc-server-utils" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#ecbd6197e562098a3a9db6e485255026f13b4329" +source = "git+https://github.com/paritytech/jsonrpc.git#06110d1d81bcd31fca23f0c0f825f50e61786c83" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "globset 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1045,7 +1045,7 @@ dependencies = [ [[package]] name = "jsonrpc-ws-server" version = "8.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#ecbd6197e562098a3a9db6e485255026f13b4329" +source = "git+https://github.com/paritytech/jsonrpc.git#06110d1d81bcd31fca23f0c0f825f50e61786c83" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", @@ -2702,7 +2702,7 @@ dependencies = [ "hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2726,6 +2726,8 @@ dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)", "transaction-pool 1.12.1 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/substrate/demo/cli/src/lib.rs b/substrate/demo/cli/src/lib.rs index e90b59cdc8..60e6b00d7e 100644 --- a/substrate/demo/cli/src/lib.rs +++ b/substrate/demo/cli/src/lib.rs @@ -66,6 +66,12 @@ impl extrinsic_pool::api::ExtrinsicPool for D Err("unimplemented".into()) } + fn submit_and_watch(&self, _block: BlockId, _: UncheckedExtrinsic) + -> Result, Self::Error> + { + Err("unimplemented".into()) + } + fn light_status(&self) -> extrinsic_pool::txpool::LightStatus { unreachable!() } @@ -169,7 +175,7 @@ pub fn run(args: I) -> error::Result<()> where let _rpc_servers = { let handler = || { let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor()); - let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool)); + let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor()); rpc::rpc_handler::(client.clone(), chain, author, DummySystem) }; let http_address = "127.0.0.1:9933".parse().unwrap(); diff --git a/substrate/polkadot/cli/src/lib.rs b/substrate/polkadot/cli/src/lib.rs index ea195bf840..66a63d3f51 100644 --- a/substrate/polkadot/cli/src/lib.rs +++ b/substrate/polkadot/cli/src/lib.rs @@ -221,7 +221,7 @@ pub fn run(args: I, worker: W) -> error::Result<()> where info!("Starting collator"); // TODO [rob]: collation node implementation // This isn't a thing. Different parachains will have their own collator executables and - // maybe link to libpolkadot to get a light-client. + // maybe link to libpolkadot to get a light-client. service::Roles::LIGHT } else if matches.is_present("light") { info!("Starting (light)"); @@ -478,9 +478,9 @@ fn run_until_exit( let ws_address = parse_address("127.0.0.1:9944", "ws-port", matches)?; let handler = || { - let client = (&service as &substrate_service::Service).client(); + let client = substrate_service::Service::client(&service); let chain = rpc::apis::chain::Chain::new(client.clone(), executor.clone()); - let author = rpc::apis::author::Author::new(client.clone(), service.extrinsic_pool()); + let author = rpc::apis::author::Author::new(client.clone(), service.extrinsic_pool(), executor.clone()); rpc::rpc_handler::, _, _, _, _>( client, chain, diff --git a/substrate/polkadot/transaction-pool/src/lib.rs b/substrate/polkadot/transaction-pool/src/lib.rs index 75a4eb0dac..6d64d39e8d 100644 --- a/substrate/polkadot/transaction-pool/src/lib.rs +++ b/substrate/polkadot/transaction-pool/src/lib.rs @@ -44,8 +44,13 @@ use std::{ }; use codec::{Decode, Encode}; -use extrinsic_pool::{Pool, Listener, txpool::{self, Readiness, scoring::{Change, Choice}}}; -use extrinsic_pool::api::{ExtrinsicPool, EventStream}; +use extrinsic_pool::{ + api::{ExtrinsicPool, EventStream}, + txpool::{self, Readiness, scoring::{Change, Choice}}, + watcher::Watcher, + Pool, + Listener, +}; use polkadot_api::PolkadotApi; use primitives::{AccountId, BlockId, Hash, Index, UncheckedExtrinsic as FutureProofUncheckedExtrinsic}; use runtime::{Address, UncheckedExtrinsic}; @@ -385,6 +390,8 @@ impl Deref for TransactionPool { } } +// TODO: more general transaction pool, which can handle more kinds of vec-encoded transactions, +// even when runtime is out of date. impl ExtrinsicPool for TransactionPool where A: Send + Sync + 'static, A: PolkadotApi, @@ -392,8 +399,6 @@ impl ExtrinsicPool for Transact type Error = Error; fn submit(&self, block: BlockId, xts: Vec) -> Result> { - // TODO: more general transaction pool, which can handle more kinds of vec-encoded transactions, - // even when runtime is out of date. xts.into_iter() .map(|xt| xt.encode()) .map(|encoded| { @@ -404,6 +409,18 @@ impl ExtrinsicPool for Transact .collect() } + fn submit_and_watch(&self, block: BlockId, xt: FutureProofUncheckedExtrinsic) -> Result> { + let encoded = xt.encode(); + let decoded = UncheckedExtrinsic::decode(&mut &encoded[..]).ok_or(ErrorKind::InvalidExtrinsicFormat)?; + + let verifier = Verifier { + api: &*self.api, + at_block: block, + }; + + self.inner.submit_and_watch(verifier, decoded) + } + fn light_status(&self) -> LightStatus { self.inner.light_status() } diff --git a/substrate/substrate/extrinsic-pool/Cargo.toml b/substrate/substrate/extrinsic-pool/Cargo.toml index 636fc60e19..b991bd697e 100644 --- a/substrate/substrate/extrinsic-pool/Cargo.toml +++ b/substrate/substrate/extrinsic-pool/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] +serde = "1.0" +serde_derive = "1.0" error-chain = "0.12" futures = "0.1" log = "0.3" diff --git a/substrate/substrate/extrinsic-pool/src/api.rs b/substrate/substrate/extrinsic-pool/src/api.rs index b7bb3afac9..0d8bcfbf22 100644 --- a/substrate/substrate/extrinsic-pool/src/api.rs +++ b/substrate/substrate/extrinsic-pool/src/api.rs @@ -19,6 +19,8 @@ use txpool; use futures::sync::mpsc; +use watcher::Watcher; + /// Extrinsic pool error. pub trait Error: ::std::error::Error + Send + Sized { /// Try to extract original `txpool::Error` @@ -44,6 +46,9 @@ pub trait ExtrinsicPool: Send + Sync + 'static { /// Submit a collection of extrinsics to the pool. fn submit(&self, block: BlockId, xt: Vec) -> Result, Self::Error>; + /// Submit an extrinsic to the pool and start watching it's progress. + fn submit_and_watch(&self, block: BlockId, xt: Ex) -> Result, Self::Error>; + /// Returns light status of the pool. fn light_status(&self) -> txpool::LightStatus; diff --git a/substrate/substrate/extrinsic-pool/src/lib.rs b/substrate/substrate/extrinsic-pool/src/lib.rs index 9dfa8c961d..ae6ac60f0f 100644 --- a/substrate/substrate/extrinsic-pool/src/lib.rs +++ b/substrate/substrate/extrinsic-pool/src/lib.rs @@ -20,18 +20,20 @@ extern crate futures; extern crate parking_lot; +extern crate serde; #[macro_use] extern crate log; +#[macro_use] +extern crate serde_derive; pub extern crate transaction_pool as txpool; pub mod api; +pub mod watcher; mod listener; mod pool; -mod watcher; pub use self::listener::Listener; pub use self::pool::Pool; -pub use self::watcher::Watcher; diff --git a/substrate/substrate/extrinsic-pool/src/watcher.rs b/substrate/substrate/extrinsic-pool/src/watcher.rs index e4d8b9921f..e468d03ec2 100644 --- a/substrate/substrate/extrinsic-pool/src/watcher.rs +++ b/substrate/substrate/extrinsic-pool/src/watcher.rs @@ -14,10 +14,16 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use futures::sync::mpsc; +//! Extrinsics status updates. + +use futures::{ + Stream, + sync::mpsc, +}; /// Possible extrinsic status events -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub enum Status { /// Extrinsic has been finalised in block with given hash. Finalised(H), @@ -37,8 +43,19 @@ pub struct Watcher { receiver: mpsc::UnboundedReceiver>, } +impl Watcher { + /// Pipe the notifications to given sink. + /// + /// Make sure to drive the future to completion. + pub fn into_stream(self) -> impl Stream, Error=()> { + // we can safely ignore the error here, `UnboundedReceiver` never fails. + self.receiver.map_err(|_| ()) + } +} + +/// Sender part of the watcher. Exposed only for testing purposes. #[derive(Debug, Default)] -pub(crate) struct Sender { +pub struct Sender { receivers: Vec>>, finalised: bool, } @@ -74,6 +91,7 @@ impl Sender { self.send(Status::Broadcast(peers)) } + /// Returns true if the are no more listeners for this extrinsic or it was finalised. pub fn is_done(&self) -> bool { self.finalised || self.receivers.is_empty() diff --git a/substrate/substrate/rpc-servers/src/lib.rs b/substrate/substrate/rpc-servers/src/lib.rs index ec3d2d6fdd..571e9bcd60 100644 --- a/substrate/substrate/rpc-servers/src/lib.rs +++ b/substrate/substrate/rpc-servers/src/lib.rs @@ -45,7 +45,7 @@ pub fn rpc_handler( Block: 'static, S: apis::state::StateApi, C: apis::chain::ChainApi, - A: apis::author::AuthorApi, + A: apis::author::AuthorApi, Y: apis::system::SystemApi, { let mut io = pubsub::PubSubHandler::default(); diff --git a/substrate/substrate/rpc/src/author/error.rs b/substrate/substrate/rpc/src/author/error.rs index ea6a713cd7..ce82ec2925 100644 --- a/substrate/substrate/rpc/src/author/error.rs +++ b/substrate/substrate/rpc/src/author/error.rs @@ -16,24 +16,29 @@ //! Authoring RPC module errors. +use client; use extrinsic_pool::txpool; use rpc; +use errors; + + error_chain! { links { Pool(txpool::Error, txpool::ErrorKind) #[doc = "Pool error"]; + Client(client::error::Error, client::error::ErrorKind) #[doc = "Client error"]; } errors { - /// Incorrect transaction format. - BadFormat { - description("bad format"), - display("Invalid transaction format"), - } /// Not implemented yet Unimplemented { description("not yet implemented"), display("Method Not Implemented"), } + /// Incorrect extrinsic format. + BadFormat { + description("bad format"), + display("Invalid extrinsic format"), + } /// Verification error Verification(e: Box<::std::error::Error + Send>) { description("extrinsic verification error"), @@ -42,16 +47,23 @@ error_chain! { } } +const ERROR: i64 = 1000; + impl From for rpc::Error { fn from(e: Error) -> Self { match e { - Error(ErrorKind::Unimplemented, _) => rpc::Error { - code: rpc::ErrorCode::ServerError(-1), - message: "Not implemented yet".into(), + Error(ErrorKind::Unimplemented, _) => errors::unimplemented(), + Error(ErrorKind::BadFormat, _) => rpc::Error { + code: rpc::ErrorCode::ServerError(ERROR + 1), + message: "Extrinsic has invalid format.".into(), data: None, }, - // TODO [ToDr] Unwrap Pool errors. - _ => rpc::Error::internal_error(), + Error(ErrorKind::Verification(e), _) => rpc::Error { + code: rpc::ErrorCode::ServerError(ERROR + 2), + message: e.description().into(), + data: Some(format!("{:?}", e).into()), + }, + e => errors::internal(e), } } } diff --git a/substrate/substrate/rpc/src/author/mod.rs b/substrate/substrate/rpc/src/author/mod.rs index 8e130f4103..9077abdc3f 100644 --- a/substrate/substrate/rpc/src/author/mod.rs +++ b/substrate/substrate/rpc/src/author/mod.rs @@ -19,11 +19,18 @@ use std::sync::Arc; use client::{self, Client}; -use extrinsic_pool::api::{Error, ExtrinsicPool}; use codec::Codec; - +use extrinsic_pool::{ + api::{Error, ExtrinsicPool}, + watcher::Status, +}; +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::SubscriptionId; use primitives::Bytes; -use runtime_primitives::{generic, traits::Block as BlockT}; +use rpc::futures::{Sink, Stream, Future}; +use runtime_primitives::{generic, traits}; +use subscriptions::Subscriptions; +use tokio::runtime::TaskExecutor; pub mod error; @@ -35,44 +42,67 @@ use self::error::Result; build_rpc_trait! { /// Substrate authoring RPC API pub trait AuthorApi { + type Metadata; + /// Submit extrinsic for inclusion in block. #[rpc(name = "author_submitRichExtrinsic")] fn submit_rich_extrinsic(&self, Extrinsic) -> Result; /// Submit hex-encoded extrinsic for inclusion in block. #[rpc(name = "author_submitExtrinsic")] fn submit_extrinsic(&self, Bytes) -> Result; + + #[pubsub(name = "author_extrinsicUpdate")] { + /// Submit an extrinsic to watch. + #[rpc(name = "author_submitAndWatchExtrinsic")] + fn watch_extrinsic(&self, Self::Metadata, pubsub::Subscriber>, Bytes); + + /// Unsubscribe from extrinsic watching. + #[rpc(name = "author_unwatchExtrinsic")] + fn unwatch_extrinsic(&self, SubscriptionId) -> Result; + } + } } /// Authoring API -pub struct Author { +pub struct Author { /// Substrate client client: Arc>, /// Extrinsic pool pool: Arc

, + /// Subscriptions manager + subscriptions: Subscriptions, } -impl Author { +impl Author { /// Create new instance of Authoring API. - pub fn new(client: Arc>, pool: Arc

) -> Self { - Author { client, pool } + pub fn new(client: Arc>, pool: Arc

, executor: TaskExecutor) -> Self { + Author { + client, + pool, + subscriptions: Subscriptions::new(executor), + } } } impl AuthorApi for Author where B: client::backend::Backend + Send + Sync + 'static, E: client::CallExecutor + Send + Sync + 'static, - Block: BlockT + 'static, + Block: traits::Block + 'static, + Hash: traits::MaybeSerializeDebug + Sync + Send + 'static, P: ExtrinsicPool, Hash>, P::Error: 'static, Ex: Codec, { + type Metadata = ::metadata::Metadata; + fn submit_extrinsic(&self, xt: Bytes) -> Result { - self.submit_rich_extrinsic(Ex::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?) + let dxt = Ex::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?; + self.submit_rich_extrinsic(dxt) } fn submit_rich_extrinsic(&self, xt: Ex) -> Result { - let best_block_hash = self.client.info().unwrap().chain.best_hash; + let best_block_hash = self.client.info()?.chain.best_hash; self.pool .submit(generic::BlockId::hash(best_block_hash), vec![xt]) .map(|mut res| res.pop().expect("One extrinsic passed; one result back; qed")) @@ -81,4 +111,38 @@ impl AuthorApi for Author wh .unwrap_or_else(|e| error::ErrorKind::Verification(Box::new(e)).into()) ) } + + fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber>, xt: Bytes) { + + let submit = || -> Result<_> { + let best_block_hash = self.client.info()?.chain.best_hash; + let dxt = Ex::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?; + self.pool + .submit_and_watch(generic::BlockId::hash(best_block_hash), dxt) + .map_err(|e| e.into_pool_error() + .map(Into::into) + .unwrap_or_else(|e| error::ErrorKind::Verification(Box::new(e)).into()) + ) + }; + + let watcher = match submit() { + Ok(watcher) => watcher, + Err(err) => { + // reject the subscriber (ignore errors - we don't care if subscriber is no longer there). + let _ = subscriber.reject(err.into()); + return; + }, + }; + + self.subscriptions.add(subscriber, move |sink| { + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all(watcher.into_stream().map(Ok)) + .map(|_| ()) + }) + } + + fn unwatch_extrinsic(&self, id: SubscriptionId) -> Result { + Ok(self.subscriptions.cancel(id)) + } } diff --git a/substrate/substrate/rpc/src/author/tests.rs b/substrate/substrate/rpc/src/author/tests.rs index caa1ffa38a..116ca7ea2c 100644 --- a/substrate/substrate/rpc/src/author/tests.rs +++ b/substrate/substrate/rpc/src/author/tests.rs @@ -16,11 +16,12 @@ use super::*; -use std::{fmt, sync::Arc}; -use extrinsic_pool::{api, txpool}; -use test_client; -use parking_lot::Mutex; +use std::{fmt, sync::Arc, result::Result}; use codec::Encode; +use extrinsic_pool::{api, txpool, watcher::{self, Watcher}}; +use parking_lot::Mutex; +use test_client; +use tokio::runtime; type Extrinsic = u64; type Hash = u64; @@ -28,6 +29,7 @@ type Hash = u64; #[derive(Default)] struct DummyTxPool { submitted: Mutex>, + sender: Mutex>>, } #[derive(Debug)] @@ -46,7 +48,7 @@ impl api::ExtrinsicPool for DummyTxPool { type Error = Error; /// Submit extrinsic for inclusion in block. - fn submit(&self, _block: BlockHash, xt: Vec) -> ::std::result::Result, Self::Error> { + fn submit(&self, _block: BlockHash, xt: Vec) -> Result, Self::Error> { let mut submitted = self.submitted.lock(); if submitted.len() < 1 { let hashes = xt.iter().map(|_xt| 1).collect(); @@ -57,6 +59,19 @@ impl api::ExtrinsicPool for DummyTxPool { } } + fn submit_and_watch(&self, _block: BlockHash, xt: Extrinsic) -> Result, Self::Error> { + let mut submitted = self.submitted.lock(); + if submitted.len() < 1 { + submitted.push(xt); + let mut sender = watcher::Sender::default(); + let watcher = sender.new_watcher(); + *self.sender.lock() = Some(sender); + Ok(watcher) + } else { + Err(Error) + } + } + fn light_status(&self) -> txpool::LightStatus { unreachable!() } @@ -68,9 +83,11 @@ impl api::ExtrinsicPool for DummyTxPool { #[test] fn submit_transaction_should_not_cause_error() { + let runtime = runtime::Runtime::new().unwrap(); let p = Author { client: Arc::new(test_client::new()), pool: Arc::new(DummyTxPool::default()), + subscriptions: Subscriptions::new(runtime.executor()), }; assert_matches!( @@ -84,9 +101,11 @@ fn submit_transaction_should_not_cause_error() { #[test] fn submit_rich_transaction_should_not_cause_error() { + let runtime = runtime::Runtime::new().unwrap(); let p = Author { client: Arc::new(test_client::new()), pool: Arc::new(DummyTxPool::default()), + subscriptions: Subscriptions::new(runtime.executor()), }; assert_matches!( @@ -97,3 +116,30 @@ fn submit_rich_transaction_should_not_cause_error() { AuthorApi::submit_rich_extrinsic(&p, 5).is_err() ); } + +#[test] +fn should_watch_extrinsic() { + //given + let mut runtime = runtime::Runtime::new().unwrap(); + let pool = Arc::new(DummyTxPool::default()); + let p = Author { + client: Arc::new(test_client::new()), + pool: pool.clone(), + subscriptions: Subscriptions::new(runtime.executor()), + }; + let (subscriber, id_rx, data) = ::jsonrpc_macros::pubsub::Subscriber::new_test("test"); + + // when + p.watch_extrinsic(Default::default(), subscriber, u64::encode(&5).into()); + + // then + assert_eq!(runtime.block_on(id_rx), Ok(Ok(0.into()))); + + // check notifications + pool.sender.lock().as_mut().unwrap().usurped(5); + + assert_eq!( + runtime.block_on(data.into_future()).unwrap().0, + Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"usurped":5},"subscription":0}}"#.into()) + ); +} diff --git a/substrate/substrate/rpc/src/chain/error.rs b/substrate/substrate/rpc/src/chain/error.rs index fb55a643ea..59035a030b 100644 --- a/substrate/substrate/rpc/src/chain/error.rs +++ b/substrate/substrate/rpc/src/chain/error.rs @@ -16,6 +16,8 @@ use rpc; +use errors; + error_chain! { errors { /// Not implemented yet @@ -29,12 +31,8 @@ error_chain! { impl From for rpc::Error { fn from(e: Error) -> Self { match e { - Error(ErrorKind::Unimplemented, _) => rpc::Error { - code: rpc::ErrorCode::ServerError(-1), - message: "Not implemented yet".into(), - data: None, - }, - _ => rpc::Error::internal_error(), + Error(ErrorKind::Unimplemented, _) => errors::unimplemented(), + e => errors::internal(e), } } } diff --git a/substrate/substrate/rpc/src/chain/tests.rs b/substrate/substrate/rpc/src/chain/tests.rs index 2246172a16..0ac2506292 100644 --- a/substrate/substrate/rpc/src/chain/tests.rs +++ b/substrate/substrate/rpc/src/chain/tests.rs @@ -20,7 +20,6 @@ use client::BlockOrigin; use test_client::{self, TestClient}; use test_client::runtime::Header; -#[ignore] #[test] fn should_return_header() { let core = ::tokio::runtime::Runtime::new().unwrap(); @@ -35,7 +34,7 @@ fn should_return_header() { Ok(Some(ref x)) if x == &Header { parent_hash: 0.into(), number: 0, - state_root: "f8e419c265702a3eb72114255a7d9bcd2e8c1de4c66aadafd67b85ce3493c309".into(), + state_root: x.state_root.clone(), extrinsics_root: "56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421".into(), digest: Default::default(), } @@ -47,7 +46,6 @@ fn should_return_header() { ); } -#[ignore] #[test] fn should_notify_about_latest_block() { let mut core = ::tokio::runtime::Runtime::new().unwrap(); @@ -71,9 +69,7 @@ fn should_notify_about_latest_block() { // assert notification send to transport let (notification, next) = core.block_on(transport.into_future()).unwrap(); - assert_eq!(notification, Some( - r#"{"jsonrpc":"2.0","method":"test","params":{"result":{"digest":{"logs":[]},"extrinsicsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","number":1,"parentHash":"0x7393d002cbd1dc754bf03fa24ebf38624af50b74a20d9846ca7111fa1dd8e62e","stateRoot":"0xf8e419c265702a3eb72114255a7d9bcd2e8c1de4c66aadafd67b85ce3493c309"},"subscription":0}}"#.to_owned() - )); + assert!(notification.is_some()); // no more notifications on this channel assert_eq!(core.block_on(next.into_future()).unwrap().0, None); } diff --git a/substrate/substrate/rpc/src/errors.rs b/substrate/substrate/rpc/src/errors.rs new file mode 100644 index 0000000000..a9b9e27a9c --- /dev/null +++ b/substrate/substrate/rpc/src/errors.rs @@ -0,0 +1,34 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use rpc; + +pub fn unimplemented() -> rpc::Error { + rpc::Error { + code: rpc::ErrorCode::ServerError(1), + message: "Not implemented yet".into(), + data: None, + } +} + +pub fn internal(e: E) -> rpc::Error { + warn!("Unknown error: {:?}", e); + rpc::Error { + code: rpc::ErrorCode::InternalError, + message: "Unknown error occured".into(), + data: Some(format!("{:?}", e).into()), + } +} diff --git a/substrate/substrate/rpc/src/lib.rs b/substrate/substrate/rpc/src/lib.rs index 66dbf46a00..5dcc9d337a 100644 --- a/substrate/substrate/rpc/src/lib.rs +++ b/substrate/substrate/rpc/src/lib.rs @@ -42,6 +42,7 @@ extern crate assert_matches; #[cfg(test)] extern crate substrate_test_client as test_client; +mod errors; mod subscriptions; pub mod author; diff --git a/substrate/substrate/rpc/src/metadata.rs b/substrate/substrate/rpc/src/metadata.rs index c40a6ad054..41bed2b845 100644 --- a/substrate/substrate/rpc/src/metadata.rs +++ b/substrate/substrate/rpc/src/metadata.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use jsonrpc_pubsub::{Session, PubSubMetadata}; +use rpc::futures::sync::mpsc; /// RPC Metadata. /// @@ -38,9 +39,16 @@ impl PubSubMetadata for Metadata { impl Metadata { /// Create new `Metadata` with session (Pub/Sub) support. - pub fn new(transport: ::rpc::futures::sync::mpsc::Sender) -> Self { + pub fn new(transport: mpsc::Sender) -> Self { Metadata { session: Some(Arc::new(Session::new(transport))), } } + + /// Create new `Metadata` for tests. + #[cfg(test)] + pub fn new_test() -> (mpsc::Receiver, Self) { + let (tx, rx) = mpsc::channel(1); + (rx, Self::new(tx)) + } } diff --git a/substrate/substrate/rpc/src/state/error.rs b/substrate/substrate/rpc/src/state/error.rs index 57db7d18fe..24adeb29d3 100644 --- a/substrate/substrate/rpc/src/state/error.rs +++ b/substrate/substrate/rpc/src/state/error.rs @@ -17,6 +17,8 @@ use client; use rpc; +use errors; + error_chain! { links { Client(client::error::Error, client::error::ErrorKind) #[doc = "Client error"]; @@ -34,12 +36,8 @@ error_chain! { impl From for rpc::Error { fn from(e: Error) -> Self { match e { - Error(ErrorKind::Unimplemented, _) => rpc::Error { - code: rpc::ErrorCode::ServerError(-1), - message: "Not implemented yet".into(), - data: None, - }, - _ => rpc::Error::internal_error(), + Error(ErrorKind::Unimplemented, _) => errors::unimplemented(), + e => errors::internal(e), } } } diff --git a/substrate/substrate/rpc/src/system/error.rs b/substrate/substrate/rpc/src/system/error.rs index 2fe155f23e..42d215caef 100644 --- a/substrate/substrate/rpc/src/system/error.rs +++ b/substrate/substrate/rpc/src/system/error.rs @@ -18,6 +18,8 @@ use rpc; +use errors; + error_chain! { errors { /// Not implemented yet @@ -31,12 +33,8 @@ error_chain! { impl From for rpc::Error { fn from(e: Error) -> Self { match e { - Error(ErrorKind::Unimplemented, _) => rpc::Error { - code: rpc::ErrorCode::ServerError(-1), - message: "Not implemented yet".into(), - data: None, - }, - _ => rpc::Error::internal_error(), + Error(ErrorKind::Unimplemented, _) => errors::unimplemented(), + e => errors::internal(e), } } }