rpc: Implement transaction RPC API (#12328)

* rpc/tx: Add transaction structures for serialization

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Add public facing `TransactionEvent`

To circumvent the fact that serde does not allow mixing
`#[serde(tag = "event")]` with
`#[serde(tag = "event", content = "block")]`
the public facing subscription structure is serialized
and deserialized to an intermmediate representation.

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Add trait for the `transaction` API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Convert RPC errors to transaction events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Implement `transaction` RPC methods

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tx-pool: Propagate tx index to events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tx-pool: Adjust testing to reflect tx index in events

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Convert tx-pool events for the new RPC spec

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Convert tx-pool `FinalityTimeout` event to `Dropped`

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* service: Enable the `transaction` API

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Add tests for tx event encoding and decoding

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tx: Add indentation for subscriptions

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Fix documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Serialize usize to hex

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tx-pool: Rename closure parameters

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* service: Separate RPC spec versions

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Use `H256` for testing block's hash

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Serialize numbers as string

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tx-pool: Backward compatibility with RPC v1

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update client/rpc-spec-v2/src/transaction/transaction.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* rpc/tx: Remove comment about serde clone

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* rpc/tx: Use RPC custom error code for invalid tx format

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update client/rpc-spec-v2/src/transaction/event.rs

Co-authored-by: James Wilson <james@jsdw.me>

* rpc/tx: Adjust internal structures for serialization/deserialization

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Co-authored-by: James Wilson <james@jsdw.me>
This commit is contained in:
Alexandru Vasile
2022-10-11 11:49:12 +03:00
committed by GitHub
parent 896a9fc7cb
commit 5f18aaadcd
16 changed files with 873 additions and 36 deletions
@@ -0,0 +1,37 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! API trait for transactions.
use crate::transaction::event::TransactionEvent;
use jsonrpsee::proc_macros::rpc;
use sp_core::Bytes;
#[rpc(client, server)]
pub trait TransactionApi<Hash: Clone> {
/// Submit an extrinsic to watch.
///
/// See [`TransactionEvent`](crate::transaction::event::TransactionEvent) for details on
/// transaction life cycle.
#[subscription(
name = "transaction_unstable_submitAndWatch" => "transaction_unstable_submitExtrinsic",
unsubscribe = "transaction_unstable_unwatch",
item = TransactionEvent<Hash>,
)]
fn submit_and_watch(&self, bytes: Bytes);
}
@@ -0,0 +1,100 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Transaction RPC errors.
//!
//! Errors are interpreted as transaction events for subscriptions.
use crate::transaction::event::{TransactionError, TransactionEvent};
use sc_transaction_pool_api::error::Error as PoolError;
use sp_runtime::transaction_validity::InvalidTransaction;
/// Transaction RPC errors.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Transaction pool error.
#[error("Transaction pool error: {}", .0)]
Pool(#[from] PoolError),
/// Verification error.
#[error("Extrinsic verification error: {}", .0)]
Verification(Box<dyn std::error::Error + Send + Sync>),
}
impl<Hash> From<Error> for TransactionEvent<Hash> {
fn from(e: Error) -> Self {
match e {
Error::Verification(e) => TransactionEvent::Invalid(TransactionError {
error: format!("Verification error: {}", e),
}),
Error::Pool(PoolError::InvalidTransaction(InvalidTransaction::Custom(e))) =>
TransactionEvent::Invalid(TransactionError {
error: format!("Invalid transaction with custom error: {}", e),
}),
Error::Pool(PoolError::InvalidTransaction(e)) => {
let msg: &str = e.into();
TransactionEvent::Invalid(TransactionError {
error: format!("Invalid transaction: {}", msg),
})
},
Error::Pool(PoolError::UnknownTransaction(e)) => {
let msg: &str = e.into();
TransactionEvent::Invalid(TransactionError {
error: format!("Unknown transaction validity: {}", msg),
})
},
Error::Pool(PoolError::TemporarilyBanned) =>
TransactionEvent::Invalid(TransactionError {
error: "Transaction is temporarily banned".into(),
}),
Error::Pool(PoolError::AlreadyImported(_)) =>
TransactionEvent::Invalid(TransactionError {
error: "Transaction is already imported".into(),
}),
Error::Pool(PoolError::TooLowPriority { old, new }) =>
TransactionEvent::Invalid(TransactionError {
error: format!(
"The priority of the transactin is too low (pool {} > current {})",
old, new
),
}),
Error::Pool(PoolError::CycleDetected) => TransactionEvent::Invalid(TransactionError {
error: "The transaction contains a cyclic dependency".into(),
}),
Error::Pool(PoolError::ImmediatelyDropped) =>
TransactionEvent::Invalid(TransactionError {
error: "The transaction could not enter the pool because of the limit".into(),
}),
Error::Pool(PoolError::Unactionable) => TransactionEvent::Invalid(TransactionError {
error: "Transaction cannot be propagated and the local node does not author blocks"
.into(),
}),
Error::Pool(PoolError::NoTagsProvided) => TransactionEvent::Invalid(TransactionError {
error: "Transaction does not provide any tags, so the pool cannot identify it"
.into(),
}),
Error::Pool(PoolError::InvalidBlockId(_)) =>
TransactionEvent::Invalid(TransactionError {
error: "The provided block ID is not valid".into(),
}),
Error::Pool(PoolError::RejectedFutureTransaction) =>
TransactionEvent::Invalid(TransactionError {
error: "The pool is not accepting future transactions".into(),
}),
}
}
}
@@ -0,0 +1,353 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! The transaction's event returned as json compatible object.
use serde::{Deserialize, Serialize};
/// The transaction was broadcasted to a number of peers.
///
/// # Note
///
/// The RPC does not guarantee that the peers have received the
/// transaction.
///
/// When the number of peers is zero, the event guarantees that
/// shutting down the local node will lead to the transaction
/// not being included in the chain.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransactionBroadcasted {
/// The number of peers the transaction was broadcasted to.
#[serde(with = "as_string")]
pub num_peers: usize,
}
/// The transaction was included in a block of the chain.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransactionBlock<Hash> {
/// The hash of the block the transaction was included into.
pub hash: Hash,
/// The index (zero-based) of the transaction within the body of the block.
#[serde(with = "as_string")]
pub index: usize,
}
/// The transaction could not be processed due to an error.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransactionError {
/// Reason of the error.
pub error: String,
}
/// The transaction was dropped because of exceeding limits.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TransactionDropped {
/// True if the transaction was broadcasted to other peers and
/// may still be included in the block.
pub broadcasted: bool,
/// Reason of the event.
pub error: String,
}
/// Possible transaction status events.
///
/// The status events can be grouped based on their kinds as:
///
/// 1. Runtime validated the transaction:
/// - `Validated`
///
/// 2. Inside the `Ready` queue:
/// - `Broadcast`
///
/// 3. Leaving the pool:
/// - `BestChainBlockIncluded`
/// - `Invalid`
///
/// 4. Block finalized:
/// - `Finalized`
///
/// 5. At any time:
/// - `Dropped`
/// - `Error`
///
/// The subscription's stream is considered finished whenever the following events are
/// received: `Finalized`, `Error`, `Invalid` or `Dropped`. However, the user is allowed
/// to unsubscribe at any moment.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
// We need to manually specify the trait bounds for the `Hash` trait to ensure `into` and
// `from` still work.
#[serde(bound(
serialize = "Hash: Serialize + Clone",
deserialize = "Hash: Deserialize<'de> + Clone"
))]
#[serde(into = "TransactionEventIR<Hash>", from = "TransactionEventIR<Hash>")]
pub enum TransactionEvent<Hash> {
/// The transaction was validated by the runtime.
Validated,
/// The transaction was broadcasted to a number of peers.
Broadcasted(TransactionBroadcasted),
/// The transaction was included in a best block of the chain.
///
/// # Note
///
/// This may contain `None` if the block is no longer a best
/// block of the chain.
BestChainBlockIncluded(Option<TransactionBlock<Hash>>),
/// The transaction was included in a finalized block.
Finalized(TransactionBlock<Hash>),
/// The transaction could not be processed due to an error.
Error(TransactionError),
/// The transaction is marked as invalid.
Invalid(TransactionError),
/// The client was not capable of keeping track of this transaction.
Dropped(TransactionDropped),
}
/// Intermediate representation (IR) for the transaction events
/// that handles block events only.
///
/// The block events require a JSON compatible interpretation similar to:
///
/// ```json
/// { event: "EVENT", block: { hash: "0xFF", index: 0 } }
/// ```
///
/// This IR is introduced to circumvent that the block events need to
/// be serialized/deserialized with "tag" and "content", while other
/// events only require "tag".
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event", content = "block")]
enum TransactionEventBlockIR<Hash> {
/// The transaction was included in the best block of the chain.
BestChainBlockIncluded(Option<TransactionBlock<Hash>>),
/// The transaction was included in a finalized block of the chain.
Finalized(TransactionBlock<Hash>),
}
/// Intermediate representation (IR) for the transaction events
/// that handles non-block events only.
///
/// The non-block events require a JSON compatible interpretation similar to:
///
/// ```json
/// { event: "EVENT", num_peers: 0 }
/// ```
///
/// This IR is introduced to circumvent that the block events need to
/// be serialized/deserialized with "tag" and "content", while other
/// events only require "tag".
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "event")]
enum TransactionEventNonBlockIR {
Validated,
Broadcasted(TransactionBroadcasted),
Error(TransactionError),
Invalid(TransactionError),
Dropped(TransactionDropped),
}
/// Intermediate representation (IR) used for serialization/deserialization of the
/// [`TransactionEvent`] in a JSON compatible format.
///
/// Serde cannot mix `#[serde(tag = "event")]` with `#[serde(tag = "event", content = "block")]`
/// for specific enum variants. Therefore, this IR is introduced to circumvent this
/// restriction, while exposing a simplified [`TransactionEvent`] for users of the
/// rust ecosystem.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(bound(serialize = "Hash: Serialize", deserialize = "Hash: Deserialize<'de>"))]
#[serde(rename_all = "camelCase")]
#[serde(untagged)]
enum TransactionEventIR<Hash> {
Block(TransactionEventBlockIR<Hash>),
NonBlock(TransactionEventNonBlockIR),
}
impl<Hash> From<TransactionEvent<Hash>> for TransactionEventIR<Hash> {
fn from(value: TransactionEvent<Hash>) -> Self {
match value {
TransactionEvent::Validated =>
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Validated),
TransactionEvent::Broadcasted(event) =>
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Broadcasted(event)),
TransactionEvent::BestChainBlockIncluded(event) =>
TransactionEventIR::Block(TransactionEventBlockIR::BestChainBlockIncluded(event)),
TransactionEvent::Finalized(event) =>
TransactionEventIR::Block(TransactionEventBlockIR::Finalized(event)),
TransactionEvent::Error(event) =>
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Error(event)),
TransactionEvent::Invalid(event) =>
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Invalid(event)),
TransactionEvent::Dropped(event) =>
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Dropped(event)),
}
}
}
impl<Hash> From<TransactionEventIR<Hash>> for TransactionEvent<Hash> {
fn from(value: TransactionEventIR<Hash>) -> Self {
match value {
TransactionEventIR::NonBlock(status) => match status {
TransactionEventNonBlockIR::Validated => TransactionEvent::Validated,
TransactionEventNonBlockIR::Broadcasted(event) =>
TransactionEvent::Broadcasted(event),
TransactionEventNonBlockIR::Error(event) => TransactionEvent::Error(event),
TransactionEventNonBlockIR::Invalid(event) => TransactionEvent::Invalid(event),
TransactionEventNonBlockIR::Dropped(event) => TransactionEvent::Dropped(event),
},
TransactionEventIR::Block(block) => match block {
TransactionEventBlockIR::Finalized(event) => TransactionEvent::Finalized(event),
TransactionEventBlockIR::BestChainBlockIncluded(event) =>
TransactionEvent::BestChainBlockIncluded(event),
},
}
}
}
/// Serialize and deserialize helper as string.
mod as_string {
use super::*;
use serde::{Deserializer, Serializer};
pub fn serialize<S: Serializer>(data: &usize, serializer: S) -> Result<S::Ok, S::Error> {
data.to_string().serialize(serializer)
}
pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<usize, D::Error> {
String::deserialize(deserializer)?
.parse()
.map_err(|e| serde::de::Error::custom(format!("Parsing failed: {}", e)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_core::H256;
#[test]
fn validated_event() {
let event: TransactionEvent<()> = TransactionEvent::Validated;
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"validated"}"#;
assert_eq!(ser, exp);
let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn broadcasted_event() {
let event: TransactionEvent<()> =
TransactionEvent::Broadcasted(TransactionBroadcasted { num_peers: 2 });
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"broadcasted","numPeers":"2"}"#;
assert_eq!(ser, exp);
let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn best_chain_event() {
let event: TransactionEvent<()> = TransactionEvent::BestChainBlockIncluded(None);
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"bestChainBlockIncluded","block":null}"#;
assert_eq!(ser, exp);
let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
let event: TransactionEvent<H256> =
TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
hash: H256::from_low_u64_be(1),
index: 2,
}));
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"bestChainBlockIncluded","block":{"hash":"0x0000000000000000000000000000000000000000000000000000000000000001","index":"2"}}"#;
assert_eq!(ser, exp);
let event_dec: TransactionEvent<H256> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn finalized_event() {
let event: TransactionEvent<H256> = TransactionEvent::Finalized(TransactionBlock {
hash: H256::from_low_u64_be(1),
index: 10,
});
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"finalized","block":{"hash":"0x0000000000000000000000000000000000000000000000000000000000000001","index":"10"}}"#;
assert_eq!(ser, exp);
let event_dec: TransactionEvent<H256> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn error_event() {
let event: TransactionEvent<()> =
TransactionEvent::Error(TransactionError { error: "abc".to_string() });
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"error","error":"abc"}"#;
assert_eq!(ser, exp);
let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn invalid_event() {
let event: TransactionEvent<()> =
TransactionEvent::Invalid(TransactionError { error: "abc".to_string() });
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"invalid","error":"abc"}"#;
assert_eq!(ser, exp);
let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
#[test]
fn dropped_event() {
let event: TransactionEvent<()> = TransactionEvent::Dropped(TransactionDropped {
broadcasted: true,
error: "abc".to_string(),
});
let ser = serde_json::to_string(&event).unwrap();
let exp = r#"{"event":"dropped","broadcasted":true,"error":"abc"}"#;
assert_eq!(ser, exp);
let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
assert_eq!(event_dec, event);
}
}
@@ -0,0 +1,38 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Substrate transaction API.
//!
//! The transaction methods allow submitting a transaction and subscribing to
//! its status updates generated by the chain.
//!
//! # Note
//!
//! Methods are prefixed by `transaction`.
pub mod api;
pub mod error;
pub mod event;
pub mod transaction;
pub use api::TransactionApiServer;
pub use event::{
TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError,
TransactionEvent,
};
pub use transaction::Transaction;
@@ -0,0 +1,208 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! API implementation for submitting transactions.
use crate::{
transaction::{
api::TransactionApiServer,
error::Error,
event::{
TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError,
TransactionEvent,
},
},
SubscriptionTaskExecutor,
};
use jsonrpsee::{
core::async_trait,
types::{
error::{CallError, ErrorObject},
SubscriptionResult,
},
SubscriptionSink,
};
use sc_transaction_pool_api::{
error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
TransactionStatus,
};
use std::sync::Arc;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_core::Bytes;
use sp_runtime::{generic, traits::Block as BlockT};
use codec::Decode;
use futures::{FutureExt, StreamExt, TryFutureExt};
/// An API for transaction RPC calls.
pub struct Transaction<Pool, Client> {
/// Substrate client.
client: Arc<Client>,
/// Transactions pool.
pool: Arc<Pool>,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
}
impl<Pool, Client> Transaction<Pool, Client> {
/// Creates a new [`Transaction`].
pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
Transaction { client, pool, executor }
}
}
/// Currently we treat all RPC transactions as externals.
///
/// Possibly in the future we could allow opt-in for special treatment
/// of such transactions, so that the block authors can inject
/// some unique transactions via RPC and have them included in the pool.
const TX_SOURCE: TransactionSource = TransactionSource::External;
/// Extrinsic has an invalid format.
///
/// # Note
///
/// This is similar to the old `author` API error code.
const BAD_FORMAT: i32 = 1001;
#[async_trait]
impl<Pool, Client> TransactionApiServer<BlockHash<Pool>> for Transaction<Pool, Client>
where
Pool: TransactionPool + Sync + Send + 'static,
Pool::Hash: Unpin,
<Pool::Block as BlockT>::Hash: Unpin,
Client: HeaderBackend<Pool::Block> + ProvideRuntimeApi<Pool::Block> + Send + Sync + 'static,
{
fn submit_and_watch(&self, mut sink: SubscriptionSink, xt: Bytes) -> SubscriptionResult {
// This is the only place where the RPC server can return an error for this
// subscription. Other defects must be signaled as events to the sink.
let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
Ok(decoded_extrinsic) => decoded_extrinsic,
Err(e) => {
let err = CallError::Custom(ErrorObject::owned(
BAD_FORMAT,
format!("Extrinsic has invalid format: {}", e),
None::<()>,
));
let _ = sink.reject(err);
return Ok(())
},
};
let best_block_hash = self.client.info().best_hash;
let submit = self
.pool
.submit_and_watch(
&generic::BlockId::hash(best_block_hash),
TX_SOURCE,
decoded_extrinsic,
)
.map_err(|e| {
e.into_pool_error()
.map(Error::from)
.unwrap_or_else(|e| Error::Verification(Box::new(e)))
});
let fut = async move {
match submit.await {
Ok(stream) => {
let mut state = TransactionState::new();
let stream =
stream.filter_map(|event| async move { state.handle_event(event) });
sink.pipe_from_stream(stream.boxed()).await;
},
Err(err) => {
// We have not created an `Watcher` for the tx. Make sure the
// error is still propagated as an event.
let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();
sink.pipe_from_stream(futures::stream::once(async { event }).boxed()).await;
},
};
};
self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(())
}
}
/// The transaction's state that needs to be preserved between
/// multiple events generated by the transaction-pool.
///
/// # Note
///
/// In the future, the RPC server can submit only the last event when multiple
/// identical events happen in a row.
#[derive(Clone, Copy)]
struct TransactionState {
/// True if the transaction was previously broadcasted.
broadcasted: bool,
}
impl TransactionState {
/// Construct a new [`TransactionState`].
pub fn new() -> Self {
TransactionState { broadcasted: false }
}
/// Handle events generated by the transaction-pool and convert them
/// to the new API expected state.
#[inline]
pub fn handle_event<Hash: Clone, BlockHash: Clone>(
&mut self,
event: TransactionStatus<Hash, BlockHash>,
) -> Option<TransactionEvent<BlockHash>> {
match event {
TransactionStatus::Ready | TransactionStatus::Future =>
Some(TransactionEvent::<BlockHash>::Validated),
TransactionStatus::Broadcast(peers) => {
// Set the broadcasted flag once if we submitted the transaction to
// at least one peer.
self.broadcasted = self.broadcasted || !peers.is_empty();
Some(TransactionEvent::Broadcasted(TransactionBroadcasted {
num_peers: peers.len(),
}))
},
TransactionStatus::InBlock((hash, index)) =>
Some(TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
hash,
index,
}))),
TransactionStatus::Retracted(_) => Some(TransactionEvent::BestChainBlockIncluded(None)),
TransactionStatus::FinalityTimeout(_) =>
Some(TransactionEvent::Dropped(TransactionDropped {
broadcasted: self.broadcasted,
error: "Maximum number of finality watchers has been reached".into(),
})),
TransactionStatus::Finalized((hash, index)) =>
Some(TransactionEvent::Finalized(TransactionBlock { hash, index })),
TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError {
error: "Extrinsic was rendered invalid by another extrinsic".into(),
})),
TransactionStatus::Dropped => Some(TransactionEvent::Invalid(TransactionError {
error: "Extrinsic dropped from the pool due to exceeding limits".into(),
})),
TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError {
error: "Extrinsic marked as invalid".into(),
})),
}
}
}