Revert "Watch existing extrinsics RPC (#3873)" (#3931)

This reverts commit d149c2f719.
This commit is contained in:
Gavin Wood
2019-10-27 13:19:06 +01:00
committed by GitHub
parent d149c2f719
commit d41a5ce00f
15 changed files with 73 additions and 194 deletions
+4 -4
View File
@@ -5,13 +5,13 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
jsonrpc-core = "14.0"
pubsub = { package = "jsonrpc-pubsub", version = "14.0" }
jsonrpc-core = "13.2.0"
pubsub = { package = "jsonrpc-pubsub", version = "13.2.0" }
log = "0.4.8"
serde = "1.0.101"
serde_json = "1.0.41"
sr-primitives = { path = "../sr-primitives" }
[target.'cfg(not(target_os = "unknown"))'.dependencies]
http = { package = "jsonrpc-http-server", version = "14.0" }
ws = { package = "jsonrpc-ws-server", version = "14.0" }
http = { package = "jsonrpc-http-server", version = "13.2.0" }
ws = { package = "jsonrpc-ws-server", version = "13.2.0" }
+2 -2
View File
@@ -9,10 +9,10 @@ api = { package = "substrate-rpc-api", path = "./api" }
client = { package = "substrate-client", path = "../client" }
codec = { package = "parity-scale-codec", version = "1.0.0" }
futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
jsonrpc-pubsub = "14.0"
jsonrpc-pubsub = "13.1.0"
log = "0.4.8"
primitives = { package = "substrate-primitives", path = "../primitives" }
rpc = { package = "jsonrpc-core", version = "14.0" }
rpc = { package = "jsonrpc-core", version = "13.0.0" }
runtime_version = { package = "sr-version", path = "../sr-version" }
serde_json = "1.0.41"
session = { package = "substrate-session", path = "../session" }
+4 -4
View File
@@ -8,10 +8,10 @@ edition = "2018"
codec = { package = "parity-scale-codec", version = "1.0.0" }
derive_more = "0.15.0"
futures03 = { package = "futures-preview", version = "0.3.0-alpha.19", features = ["compat"] }
jsonrpc-core = "14.0"
jsonrpc-core-client = "14.0"
jsonrpc-derive = "14.0"
jsonrpc-pubsub = "14.0"
jsonrpc-core = "13.2.0"
jsonrpc-core-client = "13.2.0"
jsonrpc-derive = "13.2.0"
jsonrpc-pubsub = "13.2.0"
log = "0.4.8"
parking_lot = "0.9.0"
primitives = { package = "substrate-primitives", path = "../../primitives" }
+1 -13
View File
@@ -67,7 +67,7 @@ pub trait AuthorApi<Hash, BlockHash> {
subscribe,
name = "author_submitAndWatchExtrinsic"
)]
fn submit_and_watch_extrinsic(&self,
fn watch_extrinsic(&self,
metadata: Self::Metadata,
subscriber: Subscriber<Status<Hash, BlockHash>>,
bytes: Bytes
@@ -83,16 +83,4 @@ pub trait AuthorApi<Hash, BlockHash> {
metadata: Option<Self::Metadata>,
id: SubscriptionId
) -> Result<bool>;
/// Watch multiple extrinsics (own or from network)
#[pubsub(
subscription = "author_extrinsicUpdate",
subscribe,
name = "author_watchExtrinsic"
)]
fn watch_extrinsic(&self,
metadata: Self::Metadata,
subscriber: Subscriber<Status<Hash, BlockHash>>,
hash: Hash,
);
}
-17
View File
@@ -153,23 +153,6 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
}
fn watch_extrinsic(&self,
_metadata: Self::Metadata,
subscriber: Subscriber<Status<ExHash<P>, BlockHash<P>>>,
hash: ExHash<P>,
) {
let watcher = self.pool.watch(hash).into_stream().map(|v| Ok::<_, ()>(Ok(v)));
let subscriptions = self.subscriptions.clone();
subscriptions.add(subscriber,
move |sink| {
sink.sink_map_err(|_| unimplemented!())
.send_all(Compat::new(watcher))
.map(|_| ())
}
);
}
fn submit_and_watch_extrinsic(&self,
_metadata: Self::Metadata,
subscriber: Subscriber<Status<ExHash<P>, BlockHash<P>>>,
xt: Bytes
+2 -49
View File
@@ -115,7 +115,7 @@ fn should_watch_extrinsic() {
let (subscriber, id_rx, data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");
// when
p.submit_and_watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 0).encode().into());
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 0).encode().into());
// then
assert_eq!(setup.runtime.block_on(id_rx), Ok(Ok(1.into())));
@@ -142,53 +142,6 @@ fn should_watch_extrinsic() {
);
}
#[test]
fn should_watch_existing_extrinsic() {
// Initial setup is 1 submitted extrinsic
let mut runtime = runtime::Runtime::new().unwrap();
let client = Arc::new(test_client::new());
let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone())));
let keystore = KeyStore::new();
let p = Author {
client,
pool: pool.clone(),
subscriptions: Subscriptions::new(Arc::new(runtime.executor())),
keystore: keystore.clone(),
};
let (subscriber, id_rx, data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");
let xt = uxt(AccountKeyring::Alice, 0).encode();
let xt_hash: H256 = blake2_256(&xt).into();
p.submit_extrinsic(xt.into()).wait().expect("Failed to submit extrinsic");
// Then we track it
p.watch_extrinsic(Default::default(), subscriber, xt_hash.into());
assert_eq!(runtime.block_on(id_rx), Ok(Ok(1.into())));
// Add replacement
let replacement = {
let tx = Transfer {
amount: 5,
nonce: 0,
from: AccountKeyring::Alice.into(),
to: Default::default(),
};
tx.into_signed_tx()
}.encode();
let replacement_hash = blake2_256(&replacement);
AuthorApi::submit_extrinsic(&p, replacement.into()).wait().unwrap();
// And check if the tracked one received usurped event
assert_eq!(
runtime.block_on(data.into_future()).unwrap().0,
Some(format!(
r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":{{"usurped":"0x{}"}},"subscription":1}}}}"#,
HexDisplay::from(&replacement_hash))
)
);
}
#[test]
fn should_return_watch_validation_error() {
//given
@@ -198,7 +151,7 @@ fn should_return_watch_validation_error() {
let (subscriber, id_rx, _data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");
// when
p.submit_and_watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 179).encode().into());
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 179).encode().into());
// then
let res = setup.runtime.block_on(id_rx).unwrap();
@@ -165,17 +165,6 @@ impl<B: ChainApi> Pool<B> {
)
}
/// Watch existing transaction
///
/// Get notified when some existing transaction is finished verifying or gets finalized
/// in a new block.
pub fn watch(
&self,
hash: ExHash<B>,
) -> Watcher<ExHash<B>, BlockHash<B>> {
self.validated_pool.watch(hash)
}
/// Prunes ready transactions.
///
/// Used to clear the pool from transactions that were part of recently imported block.
@@ -794,6 +783,7 @@ mod tests {
// when
pool.validated_pool.remove_invalid(&[*watcher.hash()]);
// then
let mut stream = futures::executor::block_on_stream(watcher.into_stream());
assert_eq!(stream.next(), Some(watcher::Status::Ready));
@@ -917,37 +907,5 @@ mod tests {
assert_eq!(pool.status().ready, 1);
assert_eq!(pool.status().future, 0);
}
#[test]
fn should_watch_existing() {
let limit = Limit {
count: 1,
total_bytes: 1000,
};
let pool = Pool::new(Options {
ready: limit.clone(),
future: limit.clone(),
}, TestApi::default());
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 0,
});
let hash = block_on(pool.submit_one(&BlockId::Number(0), xt)).expect("Failed to submit");
assert_eq!(pool.status().ready, 1);
let watcher = pool.watch(hash);
block_on(pool.prune_tags(&BlockId::Number(2), vec![], vec![hash]))
.expect("Failed to prune tags");
let mut stream = futures::executor::block_on_stream(
watcher.into_stream()
);
assert_eq!(stream.next(), Some(watcher::Status::Finalized(H256::from_low_u64_be(2).into())));
assert_eq!(stream.next(), None);
}
}
}
@@ -171,7 +171,7 @@ impl<B: ChainApi> ValidatedPool<B> {
match tx {
ValidatedTransaction::Valid(tx) => {
let hash = self.api.hash_and_length(&tx.data).0;
let watcher = self.watch(hash);
let watcher = self.listener.write().create_watcher(hash);
self.submit(std::iter::once(ValidatedTransaction::Valid(tx)))
.pop()
.expect("One extrinsic passed; one result returned; qed")
@@ -182,11 +182,6 @@ impl<B: ChainApi> ValidatedPool<B> {
}
}
/// Watch some existing transaction with known hash.
pub fn watch(&self, hash: ExHash<B>) -> Watcher<ExHash<B>, BlockHash<B>> {
self.listener.write().create_watcher(hash)
}
/// For each extrinsic, returns tags that it provides (if known), or None (if it is unknown).
pub fn extrinsics_tags(&self, extrinsics: &[ExtrinsicFor<B>]) -> (Vec<ExHash<B>>, Vec<Option<Vec<Tag>>>) {
let hashes = extrinsics.iter().map(|extrinsic| self.api.hash_and_length(extrinsic).0).collect::<Vec<_>>();
@@ -130,6 +130,7 @@ impl<H: Clone, H2: Clone> Sender<H, H2> {
self.send(Status::Broadcast(peers))
}
/// Returns true if the are no more listeners for this extrinsic or it was finalized.
pub fn is_done(&self) -> bool {
self.finalized || self.receivers.is_empty()