mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-19 10:01:02 +00:00
subxt: Expose chainHeadFollow on the backend and test order of blocks
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
@@ -19,9 +19,13 @@ use std::collections::VecDeque;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use crate::backend::unstable::UnstableBlockRef;
|
||||||
|
|
||||||
// Expose the RPC methods.
|
// Expose the RPC methods.
|
||||||
pub use rpc_methods::LegacyRpcMethods;
|
pub use rpc_methods::LegacyRpcMethods;
|
||||||
|
|
||||||
|
use super::unstable::rpc_methods::FollowEvent;
|
||||||
|
|
||||||
/// The legacy backend.
|
/// The legacy backend.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct LegacyBackend<T> {
|
pub struct LegacyBackend<T> {
|
||||||
@@ -41,6 +45,13 @@ impl<T: Config> super::sealed::Sealed for LegacyBackend<T> {}
|
|||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||||
|
/// ChainHead follow
|
||||||
|
async fn chain_head_follow(
|
||||||
|
&self,
|
||||||
|
) -> Result<StreamOfResults<FollowEvent<UnstableBlockRef<T::Hash>>>, Error> {
|
||||||
|
panic!("Unimplemented")
|
||||||
|
}
|
||||||
|
|
||||||
async fn storage_fetch_values(
|
async fn storage_fetch_values(
|
||||||
&self,
|
&self,
|
||||||
keys: Vec<Vec<u8>>,
|
keys: Vec<Vec<u8>>,
|
||||||
|
|||||||
@@ -19,6 +19,10 @@ use futures::{Stream, StreamExt};
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use crate::backend::unstable::UnstableBlockRef;
|
||||||
|
|
||||||
|
use self::unstable::rpc_methods::FollowEvent;
|
||||||
|
|
||||||
/// Prevent the backend trait being implemented externally.
|
/// Prevent the backend trait being implemented externally.
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub(crate) mod sealed {
|
pub(crate) mod sealed {
|
||||||
@@ -99,6 +103,11 @@ pub trait Backend<T: Config>: sealed::Sealed + Send + Sync + 'static {
|
|||||||
call_parameters: Option<&[u8]>,
|
call_parameters: Option<&[u8]>,
|
||||||
at: T::Hash,
|
at: T::Hash,
|
||||||
) -> Result<Vec<u8>, Error>;
|
) -> Result<Vec<u8>, Error>;
|
||||||
|
|
||||||
|
/// ChainHead follow
|
||||||
|
async fn chain_head_follow(
|
||||||
|
&self,
|
||||||
|
) -> Result<StreamOfResults<FollowEvent<UnstableBlockRef<T::Hash>>>, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// helpeful utility methods derived from those provided on [`Backend`]
|
/// helpeful utility methods derived from those provided on [`Backend`]
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ mod follow_stream_driver;
|
|||||||
mod follow_stream_unpin;
|
mod follow_stream_unpin;
|
||||||
mod storage_items;
|
mod storage_items;
|
||||||
|
|
||||||
|
pub use follow_stream_unpin::BlockRef as UnstableBlockRef;
|
||||||
|
|
||||||
pub mod rpc_methods;
|
pub mod rpc_methods;
|
||||||
|
|
||||||
use self::rpc_methods::{
|
use self::rpc_methods::{
|
||||||
@@ -25,6 +27,7 @@ use crate::backend::{
|
|||||||
rpc::RpcClient, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, StreamOf,
|
rpc::RpcClient, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, StreamOf,
|
||||||
StreamOfResults, TransactionStatus,
|
StreamOfResults, TransactionStatus,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::config::BlockHash;
|
use crate::config::BlockHash;
|
||||||
use crate::error::{Error, RpcError};
|
use crate::error::{Error, RpcError};
|
||||||
use crate::Config;
|
use crate::Config;
|
||||||
@@ -332,6 +335,18 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
|
|||||||
next_ref.ok_or_else(|| RpcError::SubscriptionDropped.into())
|
next_ref.ok_or_else(|| RpcError::SubscriptionDropped.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn chain_head_follow(
|
||||||
|
&self,
|
||||||
|
) -> Result<StreamOfResults<FollowEvent<UnstableBlockRef<T::Hash>>>, Error> {
|
||||||
|
let stream = self
|
||||||
|
.follow_handle
|
||||||
|
.subscribe()
|
||||||
|
.events()
|
||||||
|
.map(|event| Ok(event));
|
||||||
|
|
||||||
|
Ok(StreamOf(Box::pin(stream)))
|
||||||
|
}
|
||||||
|
|
||||||
async fn current_runtime_version(&self) -> Result<RuntimeVersion, Error> {
|
async fn current_runtime_version(&self) -> Result<RuntimeVersion, Error> {
|
||||||
// Just start a stream of version infos, and return the first value we get from it.
|
// Just start a stream of version infos, and return the first value we get from it.
|
||||||
let runtime_version = self.stream_runtime_version().await?.next().await;
|
let runtime_version = self.stream_runtime_version().await?.next().await;
|
||||||
|
|||||||
@@ -298,15 +298,14 @@ async fn chainhead_unstable_follow_order_of_blocks() {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut tracked_blocks = HashMap::new();
|
let mut tracked_blocks = HashMap::new();
|
||||||
|
|
||||||
println!("Initialized finalized={:?}", finalized);
|
|
||||||
tracked_blocks.insert(finalized, true);
|
tracked_blocks.insert(finalized, true);
|
||||||
|
|
||||||
|
let mut events = Vec::with_capacity(100);
|
||||||
|
|
||||||
let mut num_blocks = 0;
|
let mut num_blocks = 0;
|
||||||
while let Some(event) = blocks.next().await {
|
while let Some(event) = blocks.next().await {
|
||||||
let event = event.unwrap();
|
let event = event.unwrap();
|
||||||
|
events.push(event.clone());
|
||||||
println!("event = {:?}\n", event);
|
|
||||||
|
|
||||||
match event {
|
match event {
|
||||||
FollowEvent::Initialized(_) => panic!("Unexpected"),
|
FollowEvent::Initialized(_) => panic!("Unexpected"),
|
||||||
@@ -316,14 +315,14 @@ async fn chainhead_unstable_follow_order_of_blocks() {
|
|||||||
|
|
||||||
if tracked_blocks.contains_key(&hash) {
|
if tracked_blocks.contains_key(&hash) {
|
||||||
panic!(
|
panic!(
|
||||||
"NewBlock block={:?} parent={:?} already tracked tracked={:#?}",
|
"NewBlock block={:?} parent={:?} already tracked tracked={:#?}\n events={:#?}",
|
||||||
hash, parent, tracked_blocks
|
hash, parent, tracked_blocks, events,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if !tracked_blocks.contains_key(&parent) {
|
if !tracked_blocks.contains_key(&parent) {
|
||||||
panic!(
|
panic!(
|
||||||
"NewBlock PARENT NOT TRACKED block={:?} parent={:?} tracked={:#?}",
|
"NewBlock PARENT NOT TRACKED block={:?} parent={:?} tracked={:#?}, events={:#?}",
|
||||||
hash, parent, tracked_blocks
|
hash, parent, tracked_blocks, events
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -334,8 +333,8 @@ async fn chainhead_unstable_follow_order_of_blocks() {
|
|||||||
|
|
||||||
if !tracked_blocks.contains_key(&hash) {
|
if !tracked_blocks.contains_key(&hash) {
|
||||||
panic!(
|
panic!(
|
||||||
"BestBlockChanged not tracked block={:?} tracked={:#?}",
|
"BestBlockChanged not tracked block={:?} tracked={:#?} events={:#?}",
|
||||||
hash, tracked_blocks
|
hash, tracked_blocks, events,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -345,8 +344,8 @@ async fn chainhead_unstable_follow_order_of_blocks() {
|
|||||||
for hash in hashes {
|
for hash in hashes {
|
||||||
if !tracked_blocks.contains_key(&hash) {
|
if !tracked_blocks.contains_key(&hash) {
|
||||||
panic!(
|
panic!(
|
||||||
"Finalized block={:?} not tracked tracked={:#?}",
|
"Finalized block={:?} not tracked tracked={:#?} events={:#?}",
|
||||||
hash, tracked_blocks
|
hash, tracked_blocks, events,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -354,7 +353,88 @@ async fn chainhead_unstable_follow_order_of_blocks() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
num_blocks += 1;
|
num_blocks += 1;
|
||||||
if num_blocks > 10 {
|
if num_blocks > 40 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => continue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn unstable_backend_follow_order_of_blocks() {
|
||||||
|
let ctx = test_context().await;
|
||||||
|
|
||||||
|
let api = ctx.unstable_client().await;
|
||||||
|
let backend = api.backend();
|
||||||
|
let mut blocks = backend.chain_head_follow().await.unwrap();
|
||||||
|
|
||||||
|
let event = blocks.next().await.unwrap().unwrap();
|
||||||
|
|
||||||
|
let finalized = match event {
|
||||||
|
FollowEvent::Initialized(init) => init.finalized_block_hash,
|
||||||
|
_ => panic!("Unexpected event"),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut tracked_blocks = HashMap::new();
|
||||||
|
|
||||||
|
println!("Initialized finalized={:?}", finalized);
|
||||||
|
tracked_blocks.insert(finalized.hash(), true);
|
||||||
|
|
||||||
|
let mut events = Vec::with_capacity(100);
|
||||||
|
let mut num_blocks = 0;
|
||||||
|
while let Some(event) = blocks.next().await {
|
||||||
|
let event = event.unwrap();
|
||||||
|
events.push(event.clone());
|
||||||
|
|
||||||
|
match event {
|
||||||
|
FollowEvent::Initialized(_) => panic!("Unexpected"),
|
||||||
|
FollowEvent::NewBlock(new) => {
|
||||||
|
let hash = new.block_hash.hash();
|
||||||
|
let parent = new.parent_block_hash.hash();
|
||||||
|
|
||||||
|
if tracked_blocks.contains_key(&hash) {
|
||||||
|
panic!(
|
||||||
|
"NewBlock block={:?} parent={:?} already tracked tracked={:#?} events={:#?}",
|
||||||
|
hash, parent, tracked_blocks, events,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if !tracked_blocks.contains_key(&parent) {
|
||||||
|
panic!(
|
||||||
|
"NewBlock PARENT NOT TRACKED block={:?} parent={:?} tracked={:#?} events={:#?}",
|
||||||
|
hash, parent, tracked_blocks, events,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
tracked_blocks.insert(hash, false);
|
||||||
|
}
|
||||||
|
FollowEvent::BestBlockChanged(best) => {
|
||||||
|
let hash = best.best_block_hash.hash();
|
||||||
|
|
||||||
|
if !tracked_blocks.contains_key(&hash) {
|
||||||
|
panic!(
|
||||||
|
"BestBlockChanged not tracked block={:?} tracked={:#?} events={:#?}",
|
||||||
|
hash, tracked_blocks, events,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
FollowEvent::Finalized(fin) => {
|
||||||
|
let hashes = fin.finalized_block_hashes;
|
||||||
|
|
||||||
|
for hash in hashes {
|
||||||
|
if !tracked_blocks.contains_key(&hash.hash()) {
|
||||||
|
panic!(
|
||||||
|
"Finalized block={:?} not tracked tracked={:#?} events={:#?}",
|
||||||
|
hash, tracked_blocks, events,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
tracked_blocks.insert(hash.hash(), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
num_blocks += 1;
|
||||||
|
if num_blocks > 40 {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user