Extrinsics PubSub (#349)

* Extrinsic subscriptions.

* Handle RPC errors better.

* Add tests for extrinsics and unignored others.

* Handle client errors.

* Fix compilation.
This commit is contained in:
Tomasz Drwięga
2018-07-17 23:57:08 +02:00
committed by Gav Wood
parent 7dcbf77c9d
commit 7ce2a8552f
19 changed files with 278 additions and 71 deletions
+74 -10
View File
@@ -19,11 +19,18 @@
use std::sync::Arc;
use client::{self, Client};
use extrinsic_pool::api::{Error, ExtrinsicPool};
use codec::Codec;
use extrinsic_pool::{
api::{Error, ExtrinsicPool},
watcher::Status,
};
use jsonrpc_macros::pubsub;
use jsonrpc_pubsub::SubscriptionId;
use primitives::Bytes;
use runtime_primitives::{generic, traits::Block as BlockT};
use rpc::futures::{Sink, Stream, Future};
use runtime_primitives::{generic, traits};
use subscriptions::Subscriptions;
use tokio::runtime::TaskExecutor;
pub mod error;
@@ -35,44 +42,67 @@ use self::error::Result;
build_rpc_trait! {
/// Substrate authoring RPC API
pub trait AuthorApi<Hash, Extrinsic> {
type Metadata;
/// Submit extrinsic for inclusion in block.
#[rpc(name = "author_submitRichExtrinsic")]
fn submit_rich_extrinsic(&self, Extrinsic) -> Result<Hash>;
/// Submit hex-encoded extrinsic for inclusion in block.
#[rpc(name = "author_submitExtrinsic")]
fn submit_extrinsic(&self, Bytes) -> Result<Hash>;
#[pubsub(name = "author_extrinsicUpdate")] {
/// Submit an extrinsic to watch.
#[rpc(name = "author_submitAndWatchExtrinsic")]
fn watch_extrinsic(&self, Self::Metadata, pubsub::Subscriber<Status<Hash>>, Bytes);
/// Unsubscribe from extrinsic watching.
#[rpc(name = "author_unwatchExtrinsic")]
fn unwatch_extrinsic(&self, SubscriptionId) -> Result<bool>;
}
}
}
/// Authoring API
pub struct Author<B, E, Block: BlockT, P> {
pub struct Author<B, E, Block: traits::Block, P> {
/// Substrate client
client: Arc<Client<B, E, Block>>,
/// Extrinsic pool
pool: Arc<P>,
/// Subscriptions manager
subscriptions: Subscriptions,
}
impl<B, E, Block: BlockT, P> Author<B, E, Block, P> {
impl<B, E, Block: traits::Block, P> Author<B, E, Block, P> {
/// Create new instance of Authoring API.
pub fn new(client: Arc<Client<B, E, Block>>, pool: Arc<P>) -> Self {
Author { client, pool }
pub fn new(client: Arc<Client<B, E, Block>>, pool: Arc<P>, executor: TaskExecutor) -> Self {
Author {
client,
pool,
subscriptions: Subscriptions::new(executor),
}
}
}
impl<B, E, Block, P, Ex, Hash> AuthorApi<Hash, Ex> for Author<B, E, Block, P> where
B: client::backend::Backend<Block> + Send + Sync + 'static,
E: client::CallExecutor<Block> + Send + Sync + 'static,
Block: BlockT + 'static,
Block: traits::Block + 'static,
Hash: traits::MaybeSerializeDebug + Sync + Send + 'static,
P: ExtrinsicPool<Ex, generic::BlockId<Block>, Hash>,
P::Error: 'static,
Ex: Codec,
{
type Metadata = ::metadata::Metadata;
fn submit_extrinsic(&self, xt: Bytes) -> Result<Hash> {
self.submit_rich_extrinsic(Ex::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?)
let dxt = Ex::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?;
self.submit_rich_extrinsic(dxt)
}
fn submit_rich_extrinsic(&self, xt: Ex) -> Result<Hash> {
let best_block_hash = self.client.info().unwrap().chain.best_hash;
let best_block_hash = self.client.info()?.chain.best_hash;
self.pool
.submit(generic::BlockId::hash(best_block_hash), vec![xt])
.map(|mut res| res.pop().expect("One extrinsic passed; one result back; qed"))
@@ -81,4 +111,38 @@ impl<B, E, Block, P, Ex, Hash> AuthorApi<Hash, Ex> for Author<B, E, Block, P> wh
.unwrap_or_else(|e| error::ErrorKind::Verification(Box::new(e)).into())
)
}
fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber<Status<Hash>>, xt: Bytes) {
let submit = || -> Result<_> {
let best_block_hash = self.client.info()?.chain.best_hash;
let dxt = Ex::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?;
self.pool
.submit_and_watch(generic::BlockId::hash(best_block_hash), dxt)
.map_err(|e| e.into_pool_error()
.map(Into::into)
.unwrap_or_else(|e| error::ErrorKind::Verification(Box::new(e)).into())
)
};
let watcher = match submit() {
Ok(watcher) => watcher,
Err(err) => {
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
let _ = subscriber.reject(err.into());
return;
},
};
self.subscriptions.add(subscriber, move |sink| {
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(watcher.into_stream().map(Ok))
.map(|_| ())
})
}
fn unwatch_extrinsic(&self, id: SubscriptionId) -> Result<bool> {
Ok(self.subscriptions.cancel(id))
}
}