Return correct hash for finalised transactions. (#858)

This commit is contained in:
Tomasz Drwięga
2018-10-01 18:49:48 +02:00
committed by Gav Wood
parent e8d88fcb7b
commit ec8dda0dd4
8 changed files with 98 additions and 42 deletions
+1 -1
View File
@@ -51,7 +51,7 @@ pub fn rpc_handler<Block: BlockT, ExHash, PendingExtrinsics, S, C, A, Y>(
PendingExtrinsics: serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
S: apis::state::StateApi<Block::Hash, Metadata=Metadata>,
C: apis::chain::ChainApi<Block::Hash, Block::Header, NumberFor<Block>, Block::Extrinsic, Metadata=Metadata>,
A: apis::author::AuthorApi<ExHash, Block::Extrinsic, PendingExtrinsics, Metadata=Metadata>,
A: apis::author::AuthorApi<ExHash, Block::Hash, Block::Extrinsic, PendingExtrinsics, Metadata=Metadata>,
Y: apis::system::SystemApi,
{
let mut io = pubsub::PubSubHandler::default();
+5 -4
View File
@@ -29,6 +29,7 @@ use transaction_pool::{
AllExtrinsics,
ExHash,
ExtrinsicFor,
HashOf,
};
use jsonrpc_macros::pubsub;
use jsonrpc_pubsub::SubscriptionId;
@@ -46,7 +47,7 @@ use self::error::Result;
build_rpc_trait! {
/// Substrate authoring RPC API
pub trait AuthorApi<Hash, Extrinsic, PendingExtrinsics> {
pub trait AuthorApi<Hash, BlockHash, Extrinsic, PendingExtrinsics> {
type Metadata;
/// Submit extrinsic for inclusion in block.
@@ -63,7 +64,7 @@ build_rpc_trait! {
#[pubsub(name = "author_extrinsicUpdate")] {
/// Submit an extrinsic to watch.
#[rpc(name = "author_submitAndWatchExtrinsic")]
fn watch_extrinsic(&self, Self::Metadata, pubsub::Subscriber<Status<Hash>>, Bytes);
fn watch_extrinsic(&self, Self::Metadata, pubsub::Subscriber<Status<Hash, BlockHash>>, Bytes);
/// Unsubscribe from extrinsic watching.
#[rpc(name = "author_unwatchExtrinsic")]
@@ -102,7 +103,7 @@ impl<B, E, P> Author<B, E, P> where
}
}
impl<B, E, P> AuthorApi<ExHash<P>, ExtrinsicFor<P>, AllExtrinsics<P>> for Author<B, E, P> where
impl<B, E, P> AuthorApi<ExHash<P>, HashOf<P::Block>, ExtrinsicFor<P>, AllExtrinsics<P>> for Author<B, E, P> where
B: client::backend::Backend<<P as PoolChainApi>::Block, Blake2Hasher> + Send + Sync + 'static,
E: client::CallExecutor<<P as PoolChainApi>::Block, Blake2Hasher> + Send + Sync + 'static,
P: PoolChainApi + Sync + Send + 'static,
@@ -130,7 +131,7 @@ impl<B, E, P> AuthorApi<ExHash<P>, ExtrinsicFor<P>, AllExtrinsics<P>> for Author
Ok(self.pool.all())
}
fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber<Status<ExHash<P>>>, xt: Bytes) {
fn watch_extrinsic(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber<Status<ExHash<P>, HashOf<P::Block>>>, xt: Bytes) {
let submit = || -> Result<_> {
let best_block_hash = self.client.info()?.chain.best_hash;
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..]).ok_or(error::Error::from(error::ErrorKind::BadFormat))?;
+5 -1
View File
@@ -23,7 +23,7 @@ use transaction_pool::{VerifiedTransaction, scoring, Transaction, ChainApi, Erro
use test_client::runtime::{Block, Extrinsic, Transfer};
use test_client;
use tokio::runtime;
use runtime_primitives::generic::BlockId;
use runtime_primitives::{traits, generic::BlockId};
#[derive(Clone, Debug)]
pub struct Verified
@@ -53,6 +53,10 @@ impl ChainApi for TestApi {
type Event = ();
type Ready = ();
fn latest_hash(&self) -> <Block as traits::Block>::Hash {
1.into()
}
fn verify_transaction(&self, _at: &BlockId<Block>, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error> {
Ok(Verified {
sender: uxt.transfer.from[31] as u64,
+1 -1
View File
@@ -42,7 +42,7 @@ mod pool;
mod rotator;
pub use listener::Listener;
pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics};
pub use pool::{Pool, ChainApi, EventStream, Verified, VerifiedFor, ExtrinsicFor, ExHash, AllExtrinsics, HashOf};
pub use txpool::scoring;
pub use txpool::{Error, ErrorKind};
pub use error::IntoPoolError;
+29 -11
View File
@@ -20,20 +20,38 @@ use std::{
collections::HashMap,
};
use txpool;
use watcher;
/// Extrinsic pool default listener.
#[derive(Default)]
pub struct Listener<H: ::std::hash::Hash + Eq> {
watchers: HashMap<H, watcher::Sender<H>>
/// Returns the hash of the latest block.
pub trait LatestHash {
type Hash: Clone;
/// Hash of the latest block.
fn latest_hash(&self) -> Self::Hash;
}
impl<H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default> Listener<H> {
/// Extrinsic pool default listener.
pub struct Listener<H: ::std::hash::Hash + Eq, C: LatestHash> {
watchers: HashMap<H, watcher::Sender<H, C::Hash>>,
chain: C,
}
impl<H, C> Listener<H, C> where
H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default,
C: LatestHash,
{
/// Creates a new listener with given latest hash provider.
pub fn new(chain: C) -> Self {
Listener {
watchers: Default::default(),
chain,
}
}
/// Creates a new watcher for given verified extrinsic.
///
/// The watcher can be used to subscribe to lifecycle events of that extrinsic.
pub fn create_watcher<T: txpool::VerifiedTransaction<Hash=H>>(&mut self, xt: Arc<T>) -> watcher::Watcher<H> {
pub fn create_watcher<T: txpool::VerifiedTransaction<Hash=H>>(&mut self, xt: Arc<T>) -> watcher::Watcher<H, C::Hash> {
let sender = self.watchers.entry(*xt.hash()).or_insert_with(watcher::Sender::default);
sender.new_watcher()
}
@@ -43,7 +61,7 @@ impl<H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default> Li
self.fire(hash, |watcher| watcher.broadcast(peers));
}
fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H>) {
fn fire<F>(&mut self, hash: &H, fun: F) where F: FnOnce(&mut watcher::Sender<H, C::Hash>) {
let clean = if let Some(h) = self.watchers.get_mut(hash) {
fun(h);
h.is_done()
@@ -57,9 +75,10 @@ impl<H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default> Li
}
}
impl<H, T> txpool::Listener<T> for Listener<H> where
impl<H, T, C> txpool::Listener<T> for Listener<H, C> where
H: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Default,
T: txpool::VerifiedTransaction<Hash=H>,
C: LatestHash,
{
fn added(&mut self, tx: &Arc<T>, old: Option<&Arc<T>>) {
if let Some(old) = old {
@@ -88,8 +107,7 @@ impl<H, T> txpool::Listener<T> for Listener<H> where
}
fn culled(&mut self, tx: &Arc<T>) {
// TODO [ToDr] latest block number?
let header_hash = Default::default();
let header_hash = self.chain.latest_hash();
self.fire(tx.hash(), |watcher| watcher.finalised(header_hash))
}
}
+31 -11
View File
@@ -26,11 +26,11 @@ use serde::{Serialize, de::DeserializeOwned};
use txpool::{self, Scoring, Readiness};
use error::IntoPoolError;
use listener::Listener;
use listener::{self, Listener};
use rotator::PoolRotator;
use watcher::Watcher;
use runtime_primitives::{generic::BlockId, traits::Block as BlockT};
use runtime_primitives::{generic::BlockId, traits};
/// Modification notification event stream type;
pub type EventStream = mpsc::UnboundedReceiver<()>;
@@ -38,7 +38,7 @@ pub type EventStream = mpsc::UnboundedReceiver<()>;
/// Extrinsic hash type for a pool.
pub type ExHash<A> = <A as ChainApi>::Hash;
/// Extrinsic type for a pool.
pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as BlockT>::Extrinsic;
pub type ExtrinsicFor<A> = <<A as ChainApi>::Block as traits::Block>::Extrinsic;
/// Verified extrinsic data for `ChainApi`.
pub type VerifiedFor<A> = Verified<ExtrinsicFor<A>, <A as ChainApi>::VEx>;
/// A collection of all extrinsics.
@@ -80,7 +80,7 @@ where
/// Concrete extrinsic validation and query logic.
pub trait ChainApi: Send + Sync {
/// Block type.
type Block: BlockT;
type Block: traits::Block;
/// Extrinsic hash type.
type Hash: ::std::hash::Hash + Eq + Copy + fmt::Debug + fmt::LowerHex + Serialize + DeserializeOwned + ::std::str::FromStr + Send + Sync + Default + 'static;
/// Extrinsic sender type.
@@ -96,6 +96,7 @@ pub trait ChainApi: Send + Sync {
type Score: ::std::cmp::Ord + Clone + Default + fmt::Debug + Send + Send + Sync + fmt::LowerHex;
/// Custom scoring update event type.
type Event: ::std::fmt::Debug;
/// Verify extrinsic at given block.
fn verify_transaction(&self, at: &BlockId<Self::Block>, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error>;
@@ -120,6 +121,20 @@ pub trait ChainApi: Send + Sync {
///
/// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits.
fn should_replace(old: &VerifiedFor<Self>, new: &VerifiedFor<Self>) -> txpool::scoring::Choice;
/// Returns hash of the latest block in chain.
fn latest_hash(&self) -> HashOf<Self::Block>;
}
/// Returns block's hash type.
pub type HashOf<B> = <B as traits::Block>::Hash;
impl<T: ChainApi> listener::LatestHash for Arc<T> {
type Hash = HashOf<T::Block>;
fn latest_hash(&self) -> HashOf<T::Block> {
ChainApi::latest_hash(&**self)
}
}
pub struct Ready<'a, 'b, B: 'a + ChainApi> {
@@ -177,11 +192,11 @@ const POOL_TIME: time::Duration = time::Duration::from_secs(60 * 5);
/// Extrinsics pool.
pub struct Pool<B: ChainApi> {
api: B,
api: Arc<B>,
pool: RwLock<txpool::Pool<
VerifiedFor<B>,
ScoringAdapter<B>,
Listener<B::Hash>,
Listener<B::Hash, Arc<B>>,
>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
rotator: PoolRotator<B::Hash>,
@@ -190,8 +205,9 @@ pub struct Pool<B: ChainApi> {
impl<B: ChainApi> Pool<B> {
/// Create a new transaction pool.
pub fn new(options: txpool::Options, api: B) -> Self {
let api = Arc::new(api);
Pool {
pool: RwLock::new(txpool::Pool::new(Listener::default(), ScoringAdapter::<B>(Default::default()), options)),
pool: RwLock::new(txpool::Pool::new(Listener::new(api.clone()), ScoringAdapter::<B>(Default::default()), options)),
import_notification_sinks: Default::default(),
api,
rotator: Default::default(),
@@ -253,7 +269,7 @@ impl<B: ChainApi> Pool<B> {
}
/// Import a single extrinsic and starts to watch their progress in the pool.
pub fn submit_and_watch(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Watcher<B::Hash>, B::Error> {
pub fn submit_and_watch(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<Watcher<B::Hash, HashOf<B::Block>>, B::Error> {
let xt = self.submit_at(at, Some(xt))?.pop().expect("One extrinsic passed; one result returned; qed");
Ok(self.pool.write().listener_mut().create_watcher(xt))
}
@@ -295,7 +311,7 @@ impl<B: ChainApi> Pool<B> {
/// Cull transactions from the queue and then compute the pending set.
pub fn cull_and_get_pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> Result<T, B::Error> where
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T,
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash, Arc<B>>>) -> T,
{
self.cull_from(at, None);
Ok(self.pending(at, f))
@@ -322,7 +338,7 @@ impl<B: ChainApi> Pool<B> {
/// Retrieve the pending set. Be careful to not leak the pool `ReadGuard` to prevent deadlocks.
pub fn pending<F, T>(&self, at: &BlockId<B::Block>, f: F) -> T where
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash>>) -> T,
F: FnOnce(txpool::PendingIterator<VerifiedFor<B>, Ready<B>, ScoringAdapter<B>, Listener<B::Hash, Arc<B>>>) -> T,
{
let ready = self.ready(at);
f(self.pool.read().pending(ready))
@@ -389,7 +405,7 @@ impl<VEx> txpool::Ready<VEx> for AlwaysReady {
#[cfg(test)]
pub mod tests {
use txpool;
use super::{VerifiedFor, ExtrinsicFor};
use super::{VerifiedFor, ExtrinsicFor, HashOf};
use std::collections::HashMap;
use std::cmp::Ordering;
use {Pool, ChainApi, scoring, Readiness};
@@ -443,6 +459,10 @@ pub mod tests {
type Score = u64;
type Event = ();
fn latest_hash(&self) -> HashOf<Self::Block> {
1.into()
}
fn verify_transaction(&self, _at: &BlockId, uxt: &ExtrinsicFor<Self>) -> Result<Self::VEx, Self::Error> {
let hash = BlakeTwo256::hash(&uxt.encode());
let xt = uxt.clone().check()?;
+22 -13
View File
@@ -24,9 +24,9 @@ use futures::{
/// Possible extrinsic status events
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Status<H> {
pub enum Status<H, H2> {
/// Extrinsic has been finalised in block with given hash.
Finalised(H),
Finalised(H2),
/// Some state change (perhaps another extrinsic was included) rendered this extrinsic invalid.
Usurped(H),
/// The extrinsic has been broadcast to the given peers.
@@ -39,30 +39,39 @@ pub enum Status<H> {
///
/// Represents a stream of status updates for particular extrinsic.
#[derive(Debug)]
pub struct Watcher<H> {
receiver: mpsc::UnboundedReceiver<Status<H>>,
pub struct Watcher<H, H2> {
receiver: mpsc::UnboundedReceiver<Status<H, H2>>,
}
impl<H> Watcher<H> {
impl<H, H2> Watcher<H, H2> {
/// Pipe the notifications to given sink.
///
/// Make sure to drive the future to completion.
pub fn into_stream(self) -> impl Stream<Item=Status<H>, Error=()> {
pub fn into_stream(self) -> impl Stream<Item=Status<H, H2>, Error=()> {
// we can safely ignore the error here, `UnboundedReceiver` never fails.
self.receiver.map_err(|_| ())
}
}
/// Sender part of the watcher. Exposed only for testing purposes.
#[derive(Debug, Default)]
pub struct Sender<H> {
receivers: Vec<mpsc::UnboundedSender<Status<H>>>,
#[derive(Debug)]
pub struct Sender<H, H2> {
receivers: Vec<mpsc::UnboundedSender<Status<H, H2>>>,
finalised: bool,
}
impl<H: Clone> Sender<H> {
impl<H, H2> Default for Sender<H, H2> {
fn default() -> Self {
Sender {
receivers: Default::default(),
finalised: Default::default(),
}
}
}
impl<H: Clone, H2: Clone> Sender<H, H2> {
/// Add a new watcher to this sender object.
pub fn new_watcher(&mut self) -> Watcher<H> {
pub fn new_watcher(&mut self) -> Watcher<H, H2> {
let (tx, receiver) = mpsc::unbounded();
self.receivers.push(tx);
Watcher {
@@ -76,7 +85,7 @@ impl<H: Clone> Sender<H> {
}
/// Extrinsic has been finalised in block with given hash.
pub fn finalised(&mut self, hash: H) {
pub fn finalised(&mut self, hash: H2) {
self.send(Status::Finalised(hash));
self.finalised = true;
}
@@ -97,7 +106,7 @@ impl<H: Clone> Sender<H> {
self.finalised || self.receivers.is_empty()
}
fn send(&mut self, status: Status<H>) {
fn send(&mut self, status: Status<H, H2>) {
self.receivers.retain(|sender| sender.unbounded_send(status.clone()).is_ok())
}
}
@@ -185,6 +185,10 @@ impl<C: Client> transaction_pool::ChainApi for ChainApi<C> {
type Score = u64;
type Event = ();
fn latest_hash(&self) -> <C::Block as BlockT>::Hash {
self.api.block_number_to_hash(self.api.current_height()).expect("Latest block number always has a hash; qed")
}
fn verify_transaction(&self, _at: &BlockId<Self::Block>, xt: &ExtrinsicFor<Self>) -> Result<Self::VEx> {
let encoded = xt.encode();
let uxt = UncheckedExtrinsic::decode(&mut encoded.as_slice()).ok_or_else(|| ErrorKind::InvalidExtrinsicFormat)?;