Fix asynchronous transaction rejections. (#3817)

* Fix handling transaction pool errors.

* Add test.

* Review suggestions.
This commit is contained in:
Tomasz Drwięga
2019-10-15 11:16:10 +02:00
committed by Bastian Köcher
parent ed449bf415
commit 9fe8ee4197
4 changed files with 73 additions and 41 deletions
+36 -35
View File
@@ -26,10 +26,9 @@ use log::warn;
use client::{self, Client};
use rpc::futures::{
Sink, Future,
stream::Stream as _,
future::result,
};
use futures03::{StreamExt as _, compat::Compat};
use futures03::{StreamExt as _, compat::Compat, future::ready};
use api::Subscriptions;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use codec::{Encode, Decode};
@@ -162,42 +161,44 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
let best_block_hash = self.client.info().chain.best_hash;
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])
.map_err(error::Error::from)?;
Ok(self.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
.boxed()
.compat()
.map_err(|e| e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
))
Ok(
self.pool
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
.map_err(|e| e.into_pool_error()
.map(error::Error::from)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
)
)
};
let future_watcher = match submit() {
Ok(future_watcher) => future_watcher,
Err(err) => {
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
let _ = subscriber.reject(err.into());
return;
},
};
let subscriptions = self.subscriptions.clone();
let future = ready(submit())
.and_then(|res| res)
// convert the watcher into a `Stream`
.map(|res| res.map(|watcher| watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))))
// now handle the import result,
// start a new subscrition
.map(move |result| match result {
Ok(watcher) => {
subscriptions.add(subscriber, move |sink| {
sink
.sink_map_err(|_| unimplemented!())
.send_all(Compat::new(watcher))
.map(|_| ())
});
},
Err(err) => {
warn!("Failed to submit extrinsic: {}", err);
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
let _ = subscriber.reject(err.into());
},
});
// make 'future' watcher be a future with output = stream of watcher events
let future_watcher = future_watcher
.map_err(|err| { warn!("Failed to submit extrinsic: {}", err); })
.map(|watcher| Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))));
// convert a 'future' watcher into the stream with single element = stream of watcher events
let watcher_stream = future_watcher.into_stream();
// and now flatten the 'watcher_stream' so that we'll have the stream with watcher events
let watcher_stream = watcher_stream.flatten();
self.subscriptions.add(subscriber, move |sink| {
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(watcher_stream)
.map(|_| ())
});
let res = self.subscriptions.executor()
.execute(Box::new(Compat::new(future.map(|_| Ok(())))));
if res.is_err() {
warn!("Error spawning subscription RPC task.");
}
}
fn unwatch_extrinsic(&self, _metadata: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {
+29 -5
View File
@@ -19,18 +19,19 @@ use super::*;
use std::sync::Arc;
use assert_matches::assert_matches;
use codec::Encode;
use transaction_pool::{
txpool::Pool,
FullChainApi,
};
use primitives::{
H256, blake2_256, hexdisplay::HexDisplay, testing::{ED25519, SR25519, KeyStore}, ed25519,
crypto::Pair,
};
use rpc::futures::Stream as _;
use test_client::{
self, AccountKeyring, runtime::{Extrinsic, Transfer, SessionKeys}, DefaultTestClientBuilderExt,
TestClientBuilderExt,
};
use transaction_pool::{
txpool::Pool,
FullChainApi,
};
use tokio::runtime;
fn uxt(sender: AccountKeyring, nonce: u64) -> Extrinsic {
@@ -102,7 +103,7 @@ fn should_watch_extrinsic() {
subscriptions: Subscriptions::new(Arc::new(runtime.executor())),
keystore: keystore.clone(),
};
let (subscriber, id_rx, data) = ::jsonrpc_pubsub::typed::Subscriber::new_test("test");
let (subscriber, id_rx, data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");
// when
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 0).encode().into());
@@ -132,6 +133,29 @@ fn should_watch_extrinsic() {
);
}
#[test]
fn should_return_watch_validation_error() {
//given
let mut runtime = runtime::Runtime::new().unwrap();
let client = Arc::new(test_client::new());
let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone())));
let keystore = KeyStore::new();
let p = Author {
client,
pool: pool.clone(),
subscriptions: Subscriptions::new(Arc::new(runtime.executor())),
keystore: keystore.clone(),
};
let (subscriber, id_rx, _data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");
// when
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 179).encode().into());
// then
let res = runtime.block_on(id_rx).unwrap();
assert!(res.is_err(), "Expected the transaction to be rejected as invalid.");
}
#[test]
fn should_return_pending_extrinsics() {
let runtime = runtime::Runtime::new().unwrap();