mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 17:01:09 +00:00
rpc-v2/tx: Remove the broadcast event from transaction_submitAndWatch (#3321)
This PR backports the changes from the rpc-v2 spec: https://github.com/paritytech/json-rpc-interface-spec/pull/134 The `Broadcasted` event has been removed: - it is hard to enforce a `Dropped { broadcasted: bool }` event in cases of a load-balancer being placed in front of an RPC server - when the server exists, it is impossible to guarantee this field if the server did not previously send a `Broadcasted` event - the number of peers reported by this event does not guarantee that peers are unique - the same peer can disconnect and reconnect, increasing this metric number - the number of peers that receive this transaction offers no guarantee about the transaction being included in the chain at a later time cc @paritytech/subxt-team --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: James Wilson <james@jsdw.me>
This commit is contained in:
@@ -20,23 +20,6 @@
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// The transaction was broadcasted to a number of peers.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// The RPC does not guarantee that the peers have received the
|
||||
/// transaction.
|
||||
///
|
||||
/// When the number of peers is zero, the event guarantees that
|
||||
/// shutting down the local node will lead to the transaction
|
||||
/// not being included in the chain.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TransactionBroadcasted {
|
||||
/// The number of peers the transaction was broadcasted to.
|
||||
pub num_peers: usize,
|
||||
}
|
||||
|
||||
/// The transaction was included in a block of the chain.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
@@ -59,9 +42,6 @@ pub struct TransactionError {
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TransactionDropped {
|
||||
/// True if the transaction was broadcasted to other peers and
|
||||
/// may still be included in the block.
|
||||
pub broadcasted: bool,
|
||||
/// Reason of the event.
|
||||
pub error: String,
|
||||
}
|
||||
@@ -70,20 +50,17 @@ pub struct TransactionDropped {
|
||||
///
|
||||
/// The status events can be grouped based on their kinds as:
|
||||
///
|
||||
/// 1. Runtime validated the transaction:
|
||||
/// 1. Runtime validated the transaction and it entered the pool:
|
||||
/// - `Validated`
|
||||
///
|
||||
/// 2. Inside the `Ready` queue:
|
||||
/// - `Broadcast`
|
||||
///
|
||||
/// 3. Leaving the pool:
|
||||
/// 2. Leaving the pool:
|
||||
/// - `BestChainBlockIncluded`
|
||||
/// - `Invalid`
|
||||
///
|
||||
/// 4. Block finalized:
|
||||
/// 3. Block finalized:
|
||||
/// - `Finalized`
|
||||
///
|
||||
/// 5. At any time:
|
||||
/// 4. At any time:
|
||||
/// - `Dropped`
|
||||
/// - `Error`
|
||||
///
|
||||
@@ -101,8 +78,6 @@ pub struct TransactionDropped {
|
||||
pub enum TransactionEvent<Hash> {
|
||||
/// The transaction was validated by the runtime.
|
||||
Validated,
|
||||
/// The transaction was broadcasted to a number of peers.
|
||||
Broadcasted(TransactionBroadcasted),
|
||||
/// The transaction was included in a best block of the chain.
|
||||
///
|
||||
/// # Note
|
||||
@@ -159,7 +134,6 @@ enum TransactionEventBlockIR<Hash> {
|
||||
#[serde(tag = "event")]
|
||||
enum TransactionEventNonBlockIR {
|
||||
Validated,
|
||||
Broadcasted(TransactionBroadcasted),
|
||||
Error(TransactionError),
|
||||
Invalid(TransactionError),
|
||||
Dropped(TransactionDropped),
|
||||
@@ -186,8 +160,6 @@ impl<Hash> From<TransactionEvent<Hash>> for TransactionEventIR<Hash> {
|
||||
match value {
|
||||
TransactionEvent::Validated =>
|
||||
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Validated),
|
||||
TransactionEvent::Broadcasted(event) =>
|
||||
TransactionEventIR::NonBlock(TransactionEventNonBlockIR::Broadcasted(event)),
|
||||
TransactionEvent::BestChainBlockIncluded(event) =>
|
||||
TransactionEventIR::Block(TransactionEventBlockIR::BestChainBlockIncluded(event)),
|
||||
TransactionEvent::Finalized(event) =>
|
||||
@@ -207,8 +179,6 @@ impl<Hash> From<TransactionEventIR<Hash>> for TransactionEvent<Hash> {
|
||||
match value {
|
||||
TransactionEventIR::NonBlock(status) => match status {
|
||||
TransactionEventNonBlockIR::Validated => TransactionEvent::Validated,
|
||||
TransactionEventNonBlockIR::Broadcasted(event) =>
|
||||
TransactionEvent::Broadcasted(event),
|
||||
TransactionEventNonBlockIR::Error(event) => TransactionEvent::Error(event),
|
||||
TransactionEventNonBlockIR::Invalid(event) => TransactionEvent::Invalid(event),
|
||||
TransactionEventNonBlockIR::Dropped(event) => TransactionEvent::Dropped(event),
|
||||
@@ -239,19 +209,6 @@ mod tests {
|
||||
assert_eq!(event_dec, event);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn broadcasted_event() {
|
||||
let event: TransactionEvent<()> =
|
||||
TransactionEvent::Broadcasted(TransactionBroadcasted { num_peers: 2 });
|
||||
let ser = serde_json::to_string(&event).unwrap();
|
||||
|
||||
let exp = r#"{"event":"broadcasted","numPeers":2}"#;
|
||||
assert_eq!(ser, exp);
|
||||
|
||||
let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
|
||||
assert_eq!(event_dec, event);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn best_chain_event() {
|
||||
let event: TransactionEvent<()> = TransactionEvent::BestChainBlockIncluded(None);
|
||||
@@ -320,13 +277,11 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn dropped_event() {
|
||||
let event: TransactionEvent<()> = TransactionEvent::Dropped(TransactionDropped {
|
||||
broadcasted: true,
|
||||
error: "abc".to_string(),
|
||||
});
|
||||
let event: TransactionEvent<()> =
|
||||
TransactionEvent::Dropped(TransactionDropped { error: "abc".to_string() });
|
||||
let ser = serde_json::to_string(&event).unwrap();
|
||||
|
||||
let exp = r#"{"event":"dropped","broadcasted":true,"error":"abc"}"#;
|
||||
let exp = r#"{"event":"dropped","error":"abc"}"#;
|
||||
assert_eq!(ser, exp);
|
||||
|
||||
let event_dec: TransactionEvent<()> = serde_json::from_str(exp).unwrap();
|
||||
|
||||
@@ -35,9 +35,6 @@ pub mod transaction;
|
||||
pub mod transaction_broadcast;
|
||||
|
||||
pub use api::{TransactionApiServer, TransactionBroadcastApiServer};
|
||||
pub use event::{
|
||||
TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError,
|
||||
TransactionEvent,
|
||||
};
|
||||
pub use event::{TransactionBlock, TransactionDropped, TransactionError, TransactionEvent};
|
||||
pub use transaction::Transaction;
|
||||
pub use transaction_broadcast::TransactionBroadcast;
|
||||
|
||||
@@ -22,10 +22,7 @@ use crate::{
|
||||
transaction::{
|
||||
api::TransactionApiServer,
|
||||
error::Error,
|
||||
event::{
|
||||
TransactionBlock, TransactionBroadcasted, TransactionDropped, TransactionError,
|
||||
TransactionEvent,
|
||||
},
|
||||
event::{TransactionBlock, TransactionDropped, TransactionError, TransactionEvent},
|
||||
},
|
||||
SubscriptionTaskExecutor,
|
||||
};
|
||||
@@ -113,9 +110,7 @@ where
|
||||
|
||||
match submit.await {
|
||||
Ok(stream) => {
|
||||
let mut state = TransactionState::new();
|
||||
let stream =
|
||||
stream.filter_map(move |event| async move { state.handle_event(event) });
|
||||
let stream = stream.filter_map(move |event| async move { handle_event(event) });
|
||||
pipe_from_stream(pending, stream.boxed()).await;
|
||||
},
|
||||
Err(err) => {
|
||||
@@ -131,66 +126,34 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// The transaction's state that needs to be preserved between
|
||||
/// multiple events generated by the transaction-pool.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// In the future, the RPC server can submit only the last event when multiple
|
||||
/// identical events happen in a row.
|
||||
#[derive(Clone, Copy)]
|
||||
struct TransactionState {
|
||||
/// True if the transaction was previously broadcasted.
|
||||
broadcasted: bool,
|
||||
}
|
||||
|
||||
impl TransactionState {
|
||||
/// Construct a new [`TransactionState`].
|
||||
pub fn new() -> Self {
|
||||
TransactionState { broadcasted: false }
|
||||
}
|
||||
|
||||
/// Handle events generated by the transaction-pool and convert them
|
||||
/// to the new API expected state.
|
||||
#[inline]
|
||||
pub fn handle_event<Hash: Clone, BlockHash: Clone>(
|
||||
&mut self,
|
||||
event: TransactionStatus<Hash, BlockHash>,
|
||||
) -> Option<TransactionEvent<BlockHash>> {
|
||||
match event {
|
||||
TransactionStatus::Ready | TransactionStatus::Future =>
|
||||
Some(TransactionEvent::<BlockHash>::Validated),
|
||||
TransactionStatus::Broadcast(peers) => {
|
||||
// Set the broadcasted flag once if we submitted the transaction to
|
||||
// at least one peer.
|
||||
self.broadcasted = self.broadcasted || !peers.is_empty();
|
||||
|
||||
Some(TransactionEvent::Broadcasted(TransactionBroadcasted {
|
||||
num_peers: peers.len(),
|
||||
}))
|
||||
},
|
||||
TransactionStatus::InBlock((hash, index)) =>
|
||||
Some(TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
|
||||
hash,
|
||||
index,
|
||||
}))),
|
||||
TransactionStatus::Retracted(_) => Some(TransactionEvent::BestChainBlockIncluded(None)),
|
||||
TransactionStatus::FinalityTimeout(_) =>
|
||||
Some(TransactionEvent::Dropped(TransactionDropped {
|
||||
broadcasted: self.broadcasted,
|
||||
error: "Maximum number of finality watchers has been reached".into(),
|
||||
})),
|
||||
TransactionStatus::Finalized((hash, index)) =>
|
||||
Some(TransactionEvent::Finalized(TransactionBlock { hash, index })),
|
||||
TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError {
|
||||
error: "Extrinsic was rendered invalid by another extrinsic".into(),
|
||||
/// Handle events generated by the transaction-pool and convert them
|
||||
/// to the new API expected state.
|
||||
#[inline]
|
||||
pub fn handle_event<Hash: Clone, BlockHash: Clone>(
|
||||
event: TransactionStatus<Hash, BlockHash>,
|
||||
) -> Option<TransactionEvent<BlockHash>> {
|
||||
match event {
|
||||
TransactionStatus::Ready | TransactionStatus::Future =>
|
||||
Some(TransactionEvent::<BlockHash>::Validated),
|
||||
TransactionStatus::InBlock((hash, index)) =>
|
||||
Some(TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock { hash, index }))),
|
||||
TransactionStatus::Retracted(_) => Some(TransactionEvent::BestChainBlockIncluded(None)),
|
||||
TransactionStatus::FinalityTimeout(_) =>
|
||||
Some(TransactionEvent::Dropped(TransactionDropped {
|
||||
error: "Maximum number of finality watchers has been reached".into(),
|
||||
})),
|
||||
TransactionStatus::Dropped => Some(TransactionEvent::Invalid(TransactionError {
|
||||
error: "Extrinsic dropped from the pool due to exceeding limits".into(),
|
||||
})),
|
||||
TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError {
|
||||
error: "Extrinsic marked as invalid".into(),
|
||||
})),
|
||||
}
|
||||
TransactionStatus::Finalized((hash, index)) =>
|
||||
Some(TransactionEvent::Finalized(TransactionBlock { hash, index })),
|
||||
TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError {
|
||||
error: "Extrinsic was rendered invalid by another extrinsic".into(),
|
||||
})),
|
||||
TransactionStatus::Dropped => Some(TransactionEvent::Invalid(TransactionError {
|
||||
error: "Extrinsic dropped from the pool due to exceeding limits".into(),
|
||||
})),
|
||||
TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError {
|
||||
error: "Extrinsic marked as invalid".into(),
|
||||
})),
|
||||
// These are the events that are not supported by the new API.
|
||||
TransactionStatus::Broadcast(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user