mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 22:11:06 +00:00
Metadata V16: Be more dynamic over which hasher is used. (#1974)
* Use DynamicHasher256 to support Blake2 or Keccack depending on chain * remove Config::Hash associated type, replace with HashFor<Config> alias * Fix doc links * fix wasm tests * Don't strip system pallet associated types. check System.Hashing, not Hash. Rename BlockHash trait to Hash * Tweak comment * fmt * fix merge * Fix typo
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::config::{Config, HashFor};
|
||||
use crate::error::Error;
|
||||
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
|
||||
use std::future::Future;
|
||||
@@ -99,7 +99,7 @@ impl<Hash> FollowStream<Hash> {
|
||||
}
|
||||
|
||||
/// Create a new [`FollowStream`] given the RPC methods.
|
||||
pub fn from_methods<T: Config>(methods: ChainHeadRpcMethods<T>) -> FollowStream<T::Hash> {
|
||||
pub fn from_methods<T: Config>(methods: ChainHeadRpcMethods<T>) -> FollowStream<HashFor<T>> {
|
||||
FollowStream {
|
||||
stream_getter: Box::new(move || {
|
||||
let methods = methods.clone();
|
||||
@@ -115,7 +115,7 @@ impl<Hash> FollowStream<Hash> {
|
||||
};
|
||||
// Map stream errors into the higher level subxt one:
|
||||
let stream = stream.map_err(|e| e.into());
|
||||
let stream: FollowEventStream<T::Hash> = Box::pin(stream);
|
||||
let stream: FollowEventStream<HashFor<T>> = Box::pin(stream);
|
||||
// Return both:
|
||||
Ok((stream, sub_id))
|
||||
})
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// see LICENSE for license details.
|
||||
|
||||
use super::follow_stream_unpin::{BlockRef, FollowStreamMsg, FollowStreamUnpin};
|
||||
use crate::config::BlockHash;
|
||||
use crate::config::Hash;
|
||||
use crate::error::{Error, RpcError};
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
@@ -18,15 +18,15 @@ use subxt_rpcs::methods::chain_head::{FollowEvent, Initialized, RuntimeEvent};
|
||||
/// blocks since then, as if they were each creating a unique `chainHead_follow` subscription). This
|
||||
/// is the "top" layer of our follow stream subscriptions, and the one that's interacted with elsewhere.
|
||||
#[derive(Debug)]
|
||||
pub struct FollowStreamDriver<Hash: BlockHash> {
|
||||
inner: FollowStreamUnpin<Hash>,
|
||||
shared: Shared<Hash>,
|
||||
pub struct FollowStreamDriver<H: Hash> {
|
||||
inner: FollowStreamUnpin<H>,
|
||||
shared: Shared<H>,
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> FollowStreamDriver<Hash> {
|
||||
impl<H: Hash> FollowStreamDriver<H> {
|
||||
/// Create a new [`FollowStreamDriver`]. This must be polled by some executor
|
||||
/// in order for any progress to be made. Things can subscribe to events.
|
||||
pub fn new(follow_unpin: FollowStreamUnpin<Hash>) -> Self {
|
||||
pub fn new(follow_unpin: FollowStreamUnpin<H>) -> Self {
|
||||
Self {
|
||||
inner: follow_unpin,
|
||||
shared: Shared::default(),
|
||||
@@ -34,14 +34,14 @@ impl<Hash: BlockHash> FollowStreamDriver<Hash> {
|
||||
}
|
||||
|
||||
/// Return a handle from which we can create new subscriptions to follow events.
|
||||
pub fn handle(&self) -> FollowStreamDriverHandle<Hash> {
|
||||
pub fn handle(&self) -> FollowStreamDriverHandle<H> {
|
||||
FollowStreamDriverHandle {
|
||||
shared: self.shared.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> Stream for FollowStreamDriver<Hash> {
|
||||
impl<H: Hash> Stream for FollowStreamDriver<H> {
|
||||
type Item = Result<(), Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
@@ -65,13 +65,13 @@ impl<Hash: BlockHash> Stream for FollowStreamDriver<Hash> {
|
||||
/// A handle that can be used to create subscribers, but that doesn't
|
||||
/// itself subscribe to events.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FollowStreamDriverHandle<Hash: BlockHash> {
|
||||
shared: Shared<Hash>,
|
||||
pub struct FollowStreamDriverHandle<H: Hash> {
|
||||
shared: Shared<H>,
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> FollowStreamDriverHandle<Hash> {
|
||||
impl<H: Hash> FollowStreamDriverHandle<H> {
|
||||
/// Subscribe to follow events.
|
||||
pub fn subscribe(&self) -> FollowStreamDriverSubscription<Hash> {
|
||||
pub fn subscribe(&self) -> FollowStreamDriverSubscription<H> {
|
||||
self.shared.subscribe()
|
||||
}
|
||||
}
|
||||
@@ -82,15 +82,15 @@ impl<Hash: BlockHash> FollowStreamDriverHandle<Hash> {
|
||||
/// runtime information, and then any new/best block events and so on received since
|
||||
/// the latest finalized block.
|
||||
#[derive(Debug)]
|
||||
pub struct FollowStreamDriverSubscription<Hash: BlockHash> {
|
||||
pub struct FollowStreamDriverSubscription<H: Hash> {
|
||||
id: usize,
|
||||
done: bool,
|
||||
shared: Shared<Hash>,
|
||||
local_items: VecDeque<FollowStreamMsg<BlockRef<Hash>>>,
|
||||
shared: Shared<H>,
|
||||
local_items: VecDeque<FollowStreamMsg<BlockRef<H>>>,
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> Stream for FollowStreamDriverSubscription<Hash> {
|
||||
type Item = FollowStreamMsg<BlockRef<Hash>>;
|
||||
impl<H: Hash> Stream for FollowStreamDriverSubscription<H> {
|
||||
type Item = FollowStreamMsg<BlockRef<H>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if self.done {
|
||||
@@ -122,7 +122,7 @@ impl<Hash: BlockHash> Stream for FollowStreamDriverSubscription<Hash> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> FollowStreamDriverSubscription<Hash> {
|
||||
impl<H: Hash> FollowStreamDriverSubscription<H> {
|
||||
/// Return the current subscription ID. If the subscription has stopped, then this will
|
||||
/// wait until a new subscription has started with a new ID.
|
||||
pub async fn subscription_id(self) -> Option<String> {
|
||||
@@ -138,18 +138,18 @@ impl<Hash: BlockHash> FollowStreamDriverSubscription<Hash> {
|
||||
}
|
||||
|
||||
/// Subscribe to the follow events, ignoring any other messages.
|
||||
pub fn events(self) -> impl Stream<Item = FollowEvent<BlockRef<Hash>>> + Send + Sync {
|
||||
pub fn events(self) -> impl Stream<Item = FollowEvent<BlockRef<H>>> + Send + Sync {
|
||||
self.filter_map(|ev| std::future::ready(ev.into_event()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> Clone for FollowStreamDriverSubscription<Hash> {
|
||||
impl<H: Hash> Clone for FollowStreamDriverSubscription<H> {
|
||||
fn clone(&self) -> Self {
|
||||
self.shared.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> Drop for FollowStreamDriverSubscription<Hash> {
|
||||
impl<H: Hash> Drop for FollowStreamDriverSubscription<H> {
|
||||
fn drop(&mut self) {
|
||||
self.shared.remove_sub(self.id);
|
||||
}
|
||||
@@ -159,25 +159,25 @@ impl<Hash: BlockHash> Drop for FollowStreamDriverSubscription<Hash> {
|
||||
/// events to any subscribers, and subscribers will access it to pull the
|
||||
/// events destined for themselves.
|
||||
#[derive(Debug, Clone)]
|
||||
struct Shared<Hash: BlockHash>(Arc<Mutex<SharedState<Hash>>>);
|
||||
struct Shared<H: Hash>(Arc<Mutex<SharedState<H>>>);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SharedState<Hash: BlockHash> {
|
||||
struct SharedState<H: Hash> {
|
||||
done: bool,
|
||||
next_id: usize,
|
||||
subscribers: HashMap<usize, SubscriberDetails<Hash>>,
|
||||
subscribers: HashMap<usize, SubscriberDetails<H>>,
|
||||
/// Keep a buffer of all events that should be handed to a new subscription.
|
||||
block_events_for_new_subscriptions: VecDeque<FollowEvent<BlockRef<Hash>>>,
|
||||
block_events_for_new_subscriptions: VecDeque<FollowEvent<BlockRef<H>>>,
|
||||
// Keep track of the subscription ID we send out on new subs.
|
||||
current_subscription_id: Option<String>,
|
||||
// Keep track of the init message we send out on new subs.
|
||||
current_init_message: Option<Initialized<BlockRef<Hash>>>,
|
||||
current_init_message: Option<Initialized<BlockRef<H>>>,
|
||||
// Runtime events by block hash; we need to track these to know
|
||||
// whether the runtime has changed when we see a finalized block notification.
|
||||
seen_runtime_events: HashMap<Hash, RuntimeEvent>,
|
||||
seen_runtime_events: HashMap<H, RuntimeEvent>,
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> Default for Shared<Hash> {
|
||||
impl<H: Hash> Default for Shared<H> {
|
||||
fn default() -> Self {
|
||||
Shared(Arc::new(Mutex::new(SharedState {
|
||||
next_id: 1,
|
||||
@@ -191,7 +191,7 @@ impl<Hash: BlockHash> Default for Shared<Hash> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> Shared<Hash> {
|
||||
impl<H: Hash> Shared<H> {
|
||||
/// Set the shared state to "done"; no more items will be handed to it.
|
||||
pub fn done(&self) {
|
||||
let mut shared = self.0.lock().unwrap();
|
||||
@@ -216,7 +216,7 @@ impl<Hash: BlockHash> Shared<Hash> {
|
||||
&self,
|
||||
sub_id: usize,
|
||||
waker: &Waker,
|
||||
) -> Option<VecDeque<FollowStreamMsg<BlockRef<Hash>>>> {
|
||||
) -> Option<VecDeque<FollowStreamMsg<BlockRef<H>>>> {
|
||||
let mut shared = self.0.lock().unwrap();
|
||||
|
||||
let is_done = shared.done;
|
||||
@@ -236,7 +236,7 @@ impl<Hash: BlockHash> Shared<Hash> {
|
||||
}
|
||||
|
||||
/// Push a new item out to subscribers.
|
||||
pub fn push_item(&self, item: FollowStreamMsg<BlockRef<Hash>>) {
|
||||
pub fn push_item(&self, item: FollowStreamMsg<BlockRef<H>>) {
|
||||
let mut shared = self.0.lock().unwrap();
|
||||
let shared = shared.deref_mut();
|
||||
|
||||
@@ -289,7 +289,7 @@ impl<Hash: BlockHash> Shared<Hash> {
|
||||
// the state at the head of the chain, therefore it is correct to remove those as well.
|
||||
// Idem for the pruned hashes; they will never be reported again and we remove
|
||||
// them from the window of events.
|
||||
let to_remove: HashSet<Hash> = finalized_ev
|
||||
let to_remove: HashSet<H> = finalized_ev
|
||||
.finalized_block_hashes
|
||||
.iter()
|
||||
.chain(finalized_ev.pruned_block_hashes.iter())
|
||||
@@ -337,7 +337,7 @@ impl<Hash: BlockHash> Shared<Hash> {
|
||||
}
|
||||
|
||||
/// Create a new subscription.
|
||||
pub fn subscribe(&self) -> FollowStreamDriverSubscription<Hash> {
|
||||
pub fn subscribe(&self) -> FollowStreamDriverSubscription<H> {
|
||||
let mut shared = self.0.lock().unwrap();
|
||||
|
||||
let id = shared.next_id;
|
||||
@@ -382,30 +382,30 @@ impl<Hash: BlockHash> Shared<Hash> {
|
||||
/// Details for a given subscriber: any items it's not yet claimed,
|
||||
/// and a way to wake it up when there are more items for it.
|
||||
#[derive(Debug)]
|
||||
struct SubscriberDetails<Hash: BlockHash> {
|
||||
items: VecDeque<FollowStreamMsg<BlockRef<Hash>>>,
|
||||
struct SubscriberDetails<H: Hash> {
|
||||
items: VecDeque<FollowStreamMsg<BlockRef<H>>>,
|
||||
waker: Option<Waker>,
|
||||
}
|
||||
|
||||
/// A stream that subscribes to finalized blocks
|
||||
/// and indicates whether a block was missed if was restarted.
|
||||
#[derive(Debug)]
|
||||
pub struct FollowStreamFinalizedHeads<Hash: BlockHash, F> {
|
||||
stream: FollowStreamDriverSubscription<Hash>,
|
||||
pub struct FollowStreamFinalizedHeads<H: Hash, F> {
|
||||
stream: FollowStreamDriverSubscription<H>,
|
||||
sub_id: Option<String>,
|
||||
last_seen_block: Option<BlockRef<Hash>>,
|
||||
last_seen_block: Option<BlockRef<H>>,
|
||||
f: F,
|
||||
is_done: bool,
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash, F> Unpin for FollowStreamFinalizedHeads<Hash, F> {}
|
||||
impl<H: Hash, F> Unpin for FollowStreamFinalizedHeads<H, F> {}
|
||||
|
||||
impl<Hash, F> FollowStreamFinalizedHeads<Hash, F>
|
||||
impl<H, F> FollowStreamFinalizedHeads<H, F>
|
||||
where
|
||||
Hash: BlockHash,
|
||||
F: Fn(FollowEvent<BlockRef<Hash>>) -> Vec<BlockRef<Hash>>,
|
||||
H: Hash,
|
||||
F: Fn(FollowEvent<BlockRef<H>>) -> Vec<BlockRef<H>>,
|
||||
{
|
||||
pub fn new(stream: FollowStreamDriverSubscription<Hash>, f: F) -> Self {
|
||||
pub fn new(stream: FollowStreamDriverSubscription<H>, f: F) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
sub_id: None,
|
||||
@@ -416,12 +416,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash, F> Stream for FollowStreamFinalizedHeads<Hash, F>
|
||||
impl<H, F> Stream for FollowStreamFinalizedHeads<H, F>
|
||||
where
|
||||
Hash: BlockHash,
|
||||
F: Fn(FollowEvent<BlockRef<Hash>>) -> Vec<BlockRef<Hash>>,
|
||||
H: Hash,
|
||||
F: Fn(FollowEvent<BlockRef<H>>) -> Vec<BlockRef<H>>,
|
||||
{
|
||||
type Item = Result<(String, Vec<BlockRef<Hash>>), Error>;
|
||||
type Item = Result<(String, Vec<BlockRef<H>>), Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
if self.is_done {
|
||||
@@ -493,14 +493,14 @@ mod test_utils {
|
||||
use super::*;
|
||||
|
||||
/// Return a `FollowStreamDriver`
|
||||
pub fn test_follow_stream_driver_getter<Hash, F, I>(
|
||||
pub fn test_follow_stream_driver_getter<H, F, I>(
|
||||
events: F,
|
||||
max_life: usize,
|
||||
) -> FollowStreamDriver<Hash>
|
||||
) -> FollowStreamDriver<H>
|
||||
where
|
||||
Hash: BlockHash + 'static,
|
||||
H: Hash + 'static,
|
||||
F: Fn() -> I + Send + 'static,
|
||||
I: IntoIterator<Item = Result<FollowEvent<Hash>, Error>>,
|
||||
I: IntoIterator<Item = Result<FollowEvent<H>, Error>>,
|
||||
{
|
||||
let (stream, _) = test_unpin_stream_getter(events, max_life);
|
||||
FollowStreamDriver::new(stream)
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
|
||||
use super::follow_stream::FollowStream;
|
||||
use super::ChainHeadRpcMethods;
|
||||
use crate::config::{BlockHash, Config};
|
||||
use crate::config::{Config, Hash, HashFor};
|
||||
use crate::error::Error;
|
||||
use futures::stream::{FuturesUnordered, Stream, StreamExt};
|
||||
use subxt_rpcs::methods::chain_head::{
|
||||
@@ -27,11 +27,11 @@ pub use super::follow_stream::FollowStreamMsg;
|
||||
/// result). Put simply, it tries to keep every block pinned as long as possible until the block is no longer
|
||||
/// used anywhere.
|
||||
#[derive(Debug)]
|
||||
pub struct FollowStreamUnpin<Hash: BlockHash> {
|
||||
pub struct FollowStreamUnpin<H: Hash> {
|
||||
// The underlying stream of events.
|
||||
inner: FollowStream<Hash>,
|
||||
inner: FollowStream<H>,
|
||||
// A method to call to unpin a block, given a block hash and a subscription ID.
|
||||
unpin_method: UnpinMethodHolder<Hash>,
|
||||
unpin_method: UnpinMethodHolder<H>,
|
||||
// Futures for sending unpin events that we'll poll to completion as
|
||||
// part of polling the stream as a whole.
|
||||
unpin_futs: FuturesUnordered<UnpinFut>,
|
||||
@@ -46,14 +46,14 @@ pub struct FollowStreamUnpin<Hash: BlockHash> {
|
||||
// The longest period a block can be pinned for.
|
||||
max_block_life: usize,
|
||||
// The currently seen and pinned blocks.
|
||||
pinned: HashMap<Hash, PinnedDetails<Hash>>,
|
||||
pinned: HashMap<H, PinnedDetails<H>>,
|
||||
// Shared state about blocks we've flagged to unpin from elsewhere
|
||||
unpin_flags: UnpinFlags<Hash>,
|
||||
unpin_flags: UnpinFlags<H>,
|
||||
}
|
||||
|
||||
// Just a wrapper to make implementing debug on the whole thing easier.
|
||||
struct UnpinMethodHolder<Hash>(UnpinMethod<Hash>);
|
||||
impl<Hash> std::fmt::Debug for UnpinMethodHolder<Hash> {
|
||||
struct UnpinMethodHolder<H>(UnpinMethod<H>);
|
||||
impl<H> std::fmt::Debug for UnpinMethodHolder<H> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
@@ -63,15 +63,15 @@ impl<Hash> std::fmt::Debug for UnpinMethodHolder<Hash> {
|
||||
}
|
||||
|
||||
/// The type of the unpin method that we need to provide.
|
||||
pub type UnpinMethod<Hash> = Box<dyn FnMut(Hash, Arc<str>) -> UnpinFut + Send>;
|
||||
pub type UnpinMethod<H> = Box<dyn FnMut(H, Arc<str>) -> UnpinFut + Send>;
|
||||
|
||||
/// The future returned from [`UnpinMethod`].
|
||||
pub type UnpinFut = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
|
||||
|
||||
impl<Hash: BlockHash> std::marker::Unpin for FollowStreamUnpin<Hash> {}
|
||||
impl<H: Hash> std::marker::Unpin for FollowStreamUnpin<H> {}
|
||||
|
||||
impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
|
||||
type Item = Result<FollowStreamMsg<BlockRef<Hash>>, Error>;
|
||||
impl<H: Hash> Stream for FollowStreamUnpin<H> {
|
||||
type Item = Result<FollowStreamMsg<BlockRef<H>>, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = self.as_mut();
|
||||
@@ -253,11 +253,11 @@ impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
|
||||
impl<H: Hash> FollowStreamUnpin<H> {
|
||||
/// Create a new [`FollowStreamUnpin`].
|
||||
pub fn new(
|
||||
follow_stream: FollowStream<Hash>,
|
||||
unpin_method: UnpinMethod<Hash>,
|
||||
follow_stream: FollowStream<H>,
|
||||
unpin_method: UnpinMethod<H>,
|
||||
max_block_life: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
@@ -274,11 +274,11 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
|
||||
|
||||
/// Create a new [`FollowStreamUnpin`] given the RPC methods.
|
||||
pub fn from_methods<T: Config>(
|
||||
follow_stream: FollowStream<T::Hash>,
|
||||
follow_stream: FollowStream<HashFor<T>>,
|
||||
methods: ChainHeadRpcMethods<T>,
|
||||
max_block_life: usize,
|
||||
) -> FollowStreamUnpin<T::Hash> {
|
||||
let unpin_method = Box::new(move |hash: T::Hash, sub_id: Arc<str>| {
|
||||
) -> FollowStreamUnpin<HashFor<T>> {
|
||||
let unpin_method = Box::new(move |hash: HashFor<T>, sub_id: Arc<str>| {
|
||||
let methods = methods.clone();
|
||||
let fut: UnpinFut = Box::pin(async move {
|
||||
// We ignore any errors trying to unpin at the moment.
|
||||
@@ -291,14 +291,14 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
|
||||
}
|
||||
|
||||
/// Is the block hash currently pinned.
|
||||
pub fn is_pinned(&self, hash: &Hash) -> bool {
|
||||
pub fn is_pinned(&self, hash: &H) -> bool {
|
||||
self.pinned.contains_key(hash)
|
||||
}
|
||||
|
||||
/// Pin a block, or return the reference to an already-pinned block. If the block has been registered to
|
||||
/// be unpinned, we'll clear those flags, so that it won't be unpinned. If the unpin request has already
|
||||
/// been sent though, then the block will be unpinned.
|
||||
fn pin_block_at(&mut self, rel_block_age: usize, hash: Hash) -> BlockRef<Hash> {
|
||||
fn pin_block_at(&mut self, rel_block_age: usize, hash: H) -> BlockRef<H> {
|
||||
self.pin_block_at_setting_unpinnable_flag(rel_block_age, hash, false)
|
||||
}
|
||||
|
||||
@@ -306,16 +306,16 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
|
||||
///
|
||||
/// This is the same as [`Self::pin_block_at`], except that it also marks the block as being unpinnable now,
|
||||
/// which should be done for any block that will no longer be seen in future events.
|
||||
fn pin_unpinnable_block_at(&mut self, rel_block_age: usize, hash: Hash) -> BlockRef<Hash> {
|
||||
fn pin_unpinnable_block_at(&mut self, rel_block_age: usize, hash: H) -> BlockRef<H> {
|
||||
self.pin_block_at_setting_unpinnable_flag(rel_block_age, hash, true)
|
||||
}
|
||||
|
||||
fn pin_block_at_setting_unpinnable_flag(
|
||||
&mut self,
|
||||
rel_block_age: usize,
|
||||
hash: Hash,
|
||||
hash: H,
|
||||
can_be_unpinned: bool,
|
||||
) -> BlockRef<Hash> {
|
||||
) -> BlockRef<H> {
|
||||
let entry = self
|
||||
.pinned
|
||||
.entry(hash)
|
||||
@@ -390,10 +390,10 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
|
||||
|
||||
// The set of block hashes that can be unpinned when ready.
|
||||
// BlockRefs write to this when they are dropped.
|
||||
type UnpinFlags<Hash> = Arc<Mutex<HashSet<Hash>>>;
|
||||
type UnpinFlags<H> = Arc<Mutex<HashSet<H>>>;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PinnedDetails<Hash: BlockHash> {
|
||||
struct PinnedDetails<H: Hash> {
|
||||
/// Relatively speaking, how old is the block? When we start following
|
||||
/// blocks, the first finalized block gets an age of 0, the second an age
|
||||
/// of 1 and so on.
|
||||
@@ -401,7 +401,7 @@ struct PinnedDetails<Hash: BlockHash> {
|
||||
/// A block ref we can hand out to keep blocks pinned.
|
||||
/// Because we store one here until it's unpinned, the live count
|
||||
/// will only drop to 1 when no external refs are left.
|
||||
block_ref: BlockRef<Hash>,
|
||||
block_ref: BlockRef<H>,
|
||||
/// Has this block showed up in the list of pruned blocks, or has it
|
||||
/// been finalized? In this case, it can now been pinned as it won't
|
||||
/// show up again in future events (except as a "parent block" of some
|
||||
@@ -411,21 +411,21 @@ struct PinnedDetails<Hash: BlockHash> {
|
||||
|
||||
/// All blocks reported will be wrapped in this.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BlockRef<Hash: BlockHash> {
|
||||
inner: Arc<BlockRefInner<Hash>>,
|
||||
pub struct BlockRef<H: Hash> {
|
||||
inner: Arc<BlockRefInner<H>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct BlockRefInner<Hash> {
|
||||
hash: Hash,
|
||||
unpin_flags: UnpinFlags<Hash>,
|
||||
struct BlockRefInner<H> {
|
||||
hash: H,
|
||||
unpin_flags: UnpinFlags<H>,
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> BlockRef<Hash> {
|
||||
impl<H: Hash> BlockRef<H> {
|
||||
/// For testing purposes only, create a BlockRef from a hash
|
||||
/// that isn't pinned.
|
||||
#[cfg(test)]
|
||||
pub fn new(hash: Hash) -> Self {
|
||||
pub fn new(hash: H) -> Self {
|
||||
BlockRef {
|
||||
inner: Arc::new(BlockRefInner {
|
||||
hash,
|
||||
@@ -435,24 +435,24 @@ impl<Hash: BlockHash> BlockRef<Hash> {
|
||||
}
|
||||
|
||||
/// Return the hash for this block.
|
||||
pub fn hash(&self) -> Hash {
|
||||
pub fn hash(&self) -> H {
|
||||
self.inner.hash
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> PartialEq for BlockRef<Hash> {
|
||||
impl<H: Hash> PartialEq for BlockRef<H> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.inner.hash == other.inner.hash
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> PartialEq<Hash> for BlockRef<Hash> {
|
||||
fn eq(&self, other: &Hash) -> bool {
|
||||
impl<H: Hash> PartialEq<H> for BlockRef<H> {
|
||||
fn eq(&self, other: &H) -> bool {
|
||||
&self.inner.hash == other
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash> Drop for BlockRef<Hash> {
|
||||
impl<H: Hash> Drop for BlockRef<H> {
|
||||
fn drop(&mut self) {
|
||||
// PinnedDetails keeps one ref, so if this is the second ref, it's the
|
||||
// only "external" one left and we should ask to unpin it now. if it's
|
||||
@@ -472,23 +472,23 @@ pub(super) mod test_utils {
|
||||
use super::*;
|
||||
use crate::config::substrate::H256;
|
||||
|
||||
pub type UnpinRx<Hash> = std::sync::mpsc::Receiver<(Hash, Arc<str>)>;
|
||||
pub type UnpinRx<H> = std::sync::mpsc::Receiver<(H, Arc<str>)>;
|
||||
|
||||
/// Get a [`FollowStreamUnpin`] from an iterator over events.
|
||||
pub fn test_unpin_stream_getter<Hash, F, I>(
|
||||
pub fn test_unpin_stream_getter<H, F, I>(
|
||||
events: F,
|
||||
max_life: usize,
|
||||
) -> (FollowStreamUnpin<Hash>, UnpinRx<Hash>)
|
||||
) -> (FollowStreamUnpin<H>, UnpinRx<H>)
|
||||
where
|
||||
Hash: BlockHash + 'static,
|
||||
H: Hash + 'static,
|
||||
F: Fn() -> I + Send + 'static,
|
||||
I: IntoIterator<Item = Result<FollowEvent<Hash>, Error>>,
|
||||
I: IntoIterator<Item = Result<FollowEvent<H>, Error>>,
|
||||
{
|
||||
// Unpin requests will come here so that we can look out for them.
|
||||
let (unpin_tx, unpin_rx) = std::sync::mpsc::channel();
|
||||
|
||||
let follow_stream = FollowStream::new(test_stream_getter(events));
|
||||
let unpin_method: UnpinMethod<Hash> = Box::new(move |hash, sub_id| {
|
||||
let unpin_method: UnpinMethod<H> = Box::new(move |hash, sub_id| {
|
||||
unpin_tx.send((hash, sub_id)).unwrap();
|
||||
Box::pin(std::future::ready(()))
|
||||
});
|
||||
@@ -498,11 +498,11 @@ pub(super) mod test_utils {
|
||||
}
|
||||
|
||||
/// Assert that the unpinned blocks sent from the `UnpinRx` channel match the items given.
|
||||
pub fn assert_from_unpin_rx<Hash: BlockHash + 'static>(
|
||||
unpin_rx: &UnpinRx<Hash>,
|
||||
items: impl IntoIterator<Item = Hash>,
|
||||
pub fn assert_from_unpin_rx<H: Hash + 'static>(
|
||||
unpin_rx: &UnpinRx<H>,
|
||||
items: impl IntoIterator<Item = H>,
|
||||
) {
|
||||
let expected_hashes = HashSet::<Hash>::from_iter(items);
|
||||
let expected_hashes = HashSet::<H>::from_iter(items);
|
||||
for i in 0..expected_hashes.len() {
|
||||
let Ok((hash, _)) = unpin_rx.try_recv() else {
|
||||
panic!("Another unpin event is expected, but failed to pull item {i} from channel");
|
||||
|
||||
@@ -21,9 +21,8 @@ use crate::backend::{
|
||||
utils::retry, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, StreamOf,
|
||||
StreamOfResults, TransactionStatus,
|
||||
};
|
||||
use crate::config::BlockHash;
|
||||
use crate::config::{Config, Hash, HashFor};
|
||||
use crate::error::{Error, RpcError};
|
||||
use crate::Config;
|
||||
use async_trait::async_trait;
|
||||
use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
|
||||
use futures::future::Either;
|
||||
@@ -130,12 +129,13 @@ impl<T: Config> ChainHeadBackendBuilder<T> {
|
||||
// Construct the underlying follow_stream layers:
|
||||
let rpc_methods = ChainHeadRpcMethods::new(client.into());
|
||||
let follow_stream =
|
||||
follow_stream::FollowStream::<T::Hash>::from_methods(rpc_methods.clone());
|
||||
let follow_stream_unpin = follow_stream_unpin::FollowStreamUnpin::<T::Hash>::from_methods(
|
||||
follow_stream,
|
||||
rpc_methods.clone(),
|
||||
self.max_block_life,
|
||||
);
|
||||
follow_stream::FollowStream::<HashFor<T>>::from_methods(rpc_methods.clone());
|
||||
let follow_stream_unpin =
|
||||
follow_stream_unpin::FollowStreamUnpin::<HashFor<T>>::from_methods(
|
||||
follow_stream,
|
||||
rpc_methods.clone(),
|
||||
self.max_block_life,
|
||||
);
|
||||
let follow_stream_driver = FollowStreamDriver::new(follow_stream_unpin);
|
||||
|
||||
// Wrap these into the backend and driver that we'll expose.
|
||||
@@ -193,11 +193,11 @@ impl<T: Config> ChainHeadBackendBuilder<T> {
|
||||
/// backend to make progress.
|
||||
#[derive(Debug)]
|
||||
pub struct ChainHeadBackendDriver<T: Config> {
|
||||
driver: FollowStreamDriver<T::Hash>,
|
||||
driver: FollowStreamDriver<HashFor<T>>,
|
||||
}
|
||||
|
||||
impl<T: Config> Stream for ChainHeadBackendDriver<T> {
|
||||
type Item = <FollowStreamDriver<T::Hash> as Stream>::Item;
|
||||
type Item = <FollowStreamDriver<HashFor<T>> as Stream>::Item;
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
@@ -212,7 +212,7 @@ pub struct ChainHeadBackend<T: Config> {
|
||||
// RPC methods we'll want to call:
|
||||
methods: ChainHeadRpcMethods<T>,
|
||||
// A handle to the chainHead_follow subscription:
|
||||
follow_handle: FollowStreamDriverHandle<T::Hash>,
|
||||
follow_handle: FollowStreamDriverHandle<HashFor<T>>,
|
||||
// How long to wait until giving up on transactions:
|
||||
transaction_timeout_secs: usize,
|
||||
// Don't synchronise blocks with chainHead_follow when submitting txs:
|
||||
@@ -229,11 +229,11 @@ impl<T: Config> ChainHeadBackend<T> {
|
||||
async fn stream_headers<F>(
|
||||
&self,
|
||||
f: F,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error>
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error>
|
||||
where
|
||||
F: Fn(
|
||||
FollowEvent<follow_stream_unpin::BlockRef<T::Hash>>,
|
||||
) -> Vec<follow_stream_unpin::BlockRef<T::Hash>>
|
||||
FollowEvent<follow_stream_unpin::BlockRef<HashFor<T>>>,
|
||||
) -> Vec<follow_stream_unpin::BlockRef<HashFor<T>>>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
@@ -275,9 +275,9 @@ impl<T: Config> ChainHeadBackend<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<Hash: BlockHash + 'static> BlockRefT for follow_stream_unpin::BlockRef<Hash> {}
|
||||
impl<Hash: BlockHash + 'static> From<follow_stream_unpin::BlockRef<Hash>> for BlockRef<Hash> {
|
||||
fn from(b: follow_stream_unpin::BlockRef<Hash>) -> Self {
|
||||
impl<H: Hash + 'static> BlockRefT for follow_stream_unpin::BlockRef<H> {}
|
||||
impl<H: Hash + 'static> From<follow_stream_unpin::BlockRef<H>> for BlockRef<H> {
|
||||
fn from(b: follow_stream_unpin::BlockRef<H>) -> Self {
|
||||
BlockRef::new(b.hash(), b)
|
||||
}
|
||||
}
|
||||
@@ -289,7 +289,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
async fn storage_fetch_values(
|
||||
&self,
|
||||
keys: Vec<Vec<u8>>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<StreamOfResults<StorageResponse>, Error> {
|
||||
retry(|| async {
|
||||
let queries = keys.iter().map(|key| StorageQuery {
|
||||
@@ -324,7 +324,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
async fn storage_fetch_descendant_keys(
|
||||
&self,
|
||||
key: Vec<u8>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<StreamOfResults<Vec<u8>>, Error> {
|
||||
retry(|| async {
|
||||
// Ask for hashes, and then just ignore them and return the keys that come back.
|
||||
@@ -350,7 +350,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
async fn storage_fetch_descendant_values(
|
||||
&self,
|
||||
key: Vec<u8>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<StreamOfResults<StorageResponse>, Error> {
|
||||
retry(|| async {
|
||||
let query = StorageQuery {
|
||||
@@ -386,7 +386,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn genesis_hash(&self) -> Result<T::Hash, Error> {
|
||||
async fn genesis_hash(&self) -> Result<HashFor<T>, Error> {
|
||||
retry(|| async {
|
||||
let genesis_hash = self.methods.chainspec_v1_genesis_hash().await?;
|
||||
Ok(genesis_hash)
|
||||
@@ -394,7 +394,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
|
||||
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, Error> {
|
||||
retry(|| async {
|
||||
let sub_id = get_subscription_id(&self.follow_handle).await?;
|
||||
let header = self.methods.chainhead_v1_header(&sub_id, at).await?;
|
||||
@@ -403,7 +403,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error> {
|
||||
async fn block_body(&self, at: HashFor<T>) -> Result<Option<Vec<Vec<u8>>>, Error> {
|
||||
retry(|| async {
|
||||
let sub_id = get_subscription_id(&self.follow_handle).await?;
|
||||
|
||||
@@ -432,8 +432,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<T::Hash>, Error> {
|
||||
let next_ref: Option<BlockRef<T::Hash>> = self
|
||||
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<HashFor<T>>, Error> {
|
||||
let next_ref: Option<BlockRef<HashFor<T>>> = self
|
||||
.follow_handle
|
||||
.subscribe()
|
||||
.events()
|
||||
@@ -543,7 +543,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
|
||||
async fn stream_all_block_headers(
|
||||
&self,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
|
||||
_hasher: T::Hasher,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error> {
|
||||
// TODO: https://github.com/paritytech/subxt/issues/1568
|
||||
//
|
||||
// It's possible that blocks may be silently missed if
|
||||
@@ -560,7 +561,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
|
||||
async fn stream_best_block_headers(
|
||||
&self,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
|
||||
_hasher: T::Hasher,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error> {
|
||||
// TODO: https://github.com/paritytech/subxt/issues/1568
|
||||
//
|
||||
// It's possible that blocks may be silently missed if
|
||||
@@ -575,7 +577,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
|
||||
async fn stream_finalized_block_headers(
|
||||
&self,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
|
||||
_hasher: T::Hasher,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error> {
|
||||
self.stream_headers(|ev| match ev {
|
||||
FollowEvent::Initialized(init) => init.finalized_block_hashes,
|
||||
FollowEvent::Finalized(ev) => ev.finalized_block_hashes,
|
||||
@@ -587,12 +590,12 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
async fn submit_transaction(
|
||||
&self,
|
||||
extrinsic: &[u8],
|
||||
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
|
||||
) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, Error> {
|
||||
// Submit a transaction. This makes no attempt to sync with follow events,
|
||||
async fn submit_transaction_ignoring_follow_events<T: Config>(
|
||||
extrinsic: &[u8],
|
||||
methods: &ChainHeadRpcMethods<T>,
|
||||
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
|
||||
) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, Error> {
|
||||
let tx_progress = methods
|
||||
.transactionwatch_v1_submit_and_watch(extrinsic)
|
||||
.await?
|
||||
@@ -633,8 +636,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
extrinsic: &[u8],
|
||||
transaction_timeout_secs: u64,
|
||||
methods: &ChainHeadRpcMethods<T>,
|
||||
follow_handle: &FollowStreamDriverHandle<T::Hash>,
|
||||
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
|
||||
follow_handle: &FollowStreamDriverHandle<HashFor<T>>,
|
||||
) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, Error> {
|
||||
// We care about new and finalized block hashes.
|
||||
enum SeenBlockMarker {
|
||||
New,
|
||||
@@ -655,7 +658,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
// If we see the finalized event, we start waiting until we find a finalized block that
|
||||
// matches, so we can guarantee to return a pinned block hash and be properly in sync
|
||||
// with chainHead_follow.
|
||||
let mut finalized_hash: Option<T::Hash> = None;
|
||||
let mut finalized_hash: Option<HashFor<T>> = None;
|
||||
|
||||
// Record the start time so that we can time out if things appear to take too long.
|
||||
let start_instant = web_time::Instant::now();
|
||||
@@ -818,7 +821,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
&self,
|
||||
method: &str,
|
||||
call_parameters: Option<&[u8]>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<Vec<u8>, Error> {
|
||||
retry(|| async {
|
||||
let sub_id = get_subscription_id(&self.follow_handle).await?;
|
||||
@@ -856,8 +859,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
|
||||
}
|
||||
|
||||
/// A helper to obtain a subscription ID.
|
||||
async fn get_subscription_id<Hash: BlockHash>(
|
||||
follow_handle: &FollowStreamDriverHandle<Hash>,
|
||||
async fn get_subscription_id<H: Hash>(
|
||||
follow_handle: &FollowStreamDriverHandle<H>,
|
||||
) -> Result<String, Error> {
|
||||
let Some(sub_id) = follow_handle.subscribe().subscription_id().await else {
|
||||
return Err(RpcError::SubscriptionDropped.into());
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
|
||||
use super::follow_stream_driver::FollowStreamDriverHandle;
|
||||
use super::follow_stream_unpin::BlockRef;
|
||||
use crate::config::Config;
|
||||
use crate::config::{Config, HashFor};
|
||||
use crate::error::{Error, RpcError};
|
||||
use futures::{FutureExt, Stream, StreamExt};
|
||||
use std::collections::VecDeque;
|
||||
@@ -24,7 +24,7 @@ pub struct StorageItems<T: Config> {
|
||||
buffered_responses: VecDeque<StorageResult>,
|
||||
continue_call: ContinueFutGetter,
|
||||
continue_fut: Option<ContinueFut>,
|
||||
follow_event_stream: FollowEventStream<T::Hash>,
|
||||
follow_event_stream: FollowEventStream<HashFor<T>>,
|
||||
}
|
||||
|
||||
impl<T: Config> StorageItems<T> {
|
||||
@@ -33,8 +33,8 @@ impl<T: Config> StorageItems<T> {
|
||||
// needed, and stop when done.
|
||||
pub async fn from_methods(
|
||||
queries: impl Iterator<Item = StorageQuery<&[u8]>>,
|
||||
at: T::Hash,
|
||||
follow_handle: &FollowStreamDriverHandle<T::Hash>,
|
||||
at: HashFor<T>,
|
||||
follow_handle: &FollowStreamDriverHandle<HashFor<T>>,
|
||||
methods: ChainHeadRpcMethods<T>,
|
||||
) -> Result<Self, Error> {
|
||||
let sub_id = super::get_subscription_id(follow_handle).await?;
|
||||
@@ -76,7 +76,7 @@ impl<T: Config> StorageItems<T> {
|
||||
fn new(
|
||||
operation_id: Arc<str>,
|
||||
continue_call: ContinueFutGetter,
|
||||
follow_event_stream: FollowEventStream<T::Hash>,
|
||||
follow_event_stream: FollowEventStream<HashFor<T>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
done: false,
|
||||
|
||||
+27
-22
@@ -11,7 +11,10 @@ use crate::backend::{
|
||||
Backend, BlockRef, RuntimeVersion, StorageResponse, StreamOf, StreamOfResults,
|
||||
TransactionStatus,
|
||||
};
|
||||
use crate::{config::Header, Config, Error};
|
||||
use crate::{
|
||||
config::{Config, HashFor, Header},
|
||||
Error,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use futures::TryStreamExt;
|
||||
use futures::{future, future::Either, stream, Future, FutureExt, Stream, StreamExt};
|
||||
@@ -97,11 +100,11 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
async fn storage_fetch_values(
|
||||
&self,
|
||||
keys: Vec<Vec<u8>>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<StreamOfResults<StorageResponse>, Error> {
|
||||
fn get_entry<T: Config>(
|
||||
key: Vec<u8>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
methods: LegacyRpcMethods<T>,
|
||||
) -> impl Future<Output = Result<Option<StorageResponse>, Error>> {
|
||||
retry(move || {
|
||||
@@ -134,7 +137,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
async fn storage_fetch_descendant_keys(
|
||||
&self,
|
||||
key: Vec<u8>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<StreamOfResults<Vec<u8>>, Error> {
|
||||
let keys = StorageFetchDescendantKeysStream {
|
||||
at,
|
||||
@@ -165,7 +168,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
async fn storage_fetch_descendant_values(
|
||||
&self,
|
||||
key: Vec<u8>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<StreamOfResults<StorageResponse>, Error> {
|
||||
let keys_stream = StorageFetchDescendantKeysStream {
|
||||
at,
|
||||
@@ -184,7 +187,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
})))
|
||||
}
|
||||
|
||||
async fn genesis_hash(&self) -> Result<T::Hash, Error> {
|
||||
async fn genesis_hash(&self) -> Result<HashFor<T>, Error> {
|
||||
retry(|| async {
|
||||
let hash = self.methods.genesis_hash().await?;
|
||||
Ok(hash)
|
||||
@@ -192,7 +195,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
|
||||
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, Error> {
|
||||
retry(|| async {
|
||||
let header = self.methods.chain_get_header(Some(at)).await?;
|
||||
Ok(header)
|
||||
@@ -200,7 +203,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error> {
|
||||
async fn block_body(&self, at: HashFor<T>) -> Result<Option<Vec<Vec<u8>>>, Error> {
|
||||
retry(|| async {
|
||||
let Some(details) = self.methods.chain_get_block(Some(at)).await? else {
|
||||
return Ok(None);
|
||||
@@ -212,7 +215,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<T::Hash>, Error> {
|
||||
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<HashFor<T>>, Error> {
|
||||
retry(|| async {
|
||||
let hash = self.methods.chain_get_finalized_head().await?;
|
||||
Ok(BlockRef::from_hash(hash))
|
||||
@@ -270,16 +273,16 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
|
||||
async fn stream_all_block_headers(
|
||||
&self,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
|
||||
hasher: T::Hasher,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error> {
|
||||
let methods = self.methods.clone();
|
||||
|
||||
let retry_sub = retry_stream(move || {
|
||||
let methods = methods.clone();
|
||||
Box::pin(async move {
|
||||
let sub = methods.chain_subscribe_all_heads().await?;
|
||||
let sub = sub.map_err(|e| e.into()).map(|r| {
|
||||
let sub = sub.map_err(|e| e.into()).map(move |r| {
|
||||
r.map(|h| {
|
||||
let hash = h.hash();
|
||||
let hash = h.hash_with(hasher);
|
||||
(h, BlockRef::from_hash(hash))
|
||||
})
|
||||
});
|
||||
@@ -293,16 +296,17 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
|
||||
async fn stream_best_block_headers(
|
||||
&self,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
|
||||
hasher: T::Hasher,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error> {
|
||||
let methods = self.methods.clone();
|
||||
|
||||
let retry_sub = retry_stream(move || {
|
||||
let methods = methods.clone();
|
||||
Box::pin(async move {
|
||||
let sub = methods.chain_subscribe_new_heads().await?;
|
||||
let sub = sub.map_err(|e| e.into()).map(|r| {
|
||||
let sub = sub.map_err(|e| e.into()).map(move |r| {
|
||||
r.map(|h| {
|
||||
let hash = h.hash();
|
||||
let hash = h.hash_with(hasher);
|
||||
(h, BlockRef::from_hash(hash))
|
||||
})
|
||||
});
|
||||
@@ -316,7 +320,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
|
||||
async fn stream_finalized_block_headers(
|
||||
&self,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
|
||||
hasher: T::Hasher,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error> {
|
||||
let this = self.clone();
|
||||
|
||||
let retry_sub = retry_stream(move || {
|
||||
@@ -338,9 +343,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
sub,
|
||||
last_finalized_block_num,
|
||||
);
|
||||
let sub = sub.map(|r| {
|
||||
let sub = sub.map(move |r| {
|
||||
r.map(|h| {
|
||||
let hash = h.hash();
|
||||
let hash = h.hash_with(hasher);
|
||||
(h, BlockRef::from_hash(hash))
|
||||
})
|
||||
});
|
||||
@@ -356,7 +361,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
async fn submit_transaction(
|
||||
&self,
|
||||
extrinsic: &[u8],
|
||||
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
|
||||
) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, Error> {
|
||||
let sub = self
|
||||
.methods
|
||||
.author_submit_and_watch_extrinsic(extrinsic)
|
||||
@@ -417,7 +422,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
|
||||
&self,
|
||||
method: &str,
|
||||
call_parameters: Option<&[u8]>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<Vec<u8>, Error> {
|
||||
retry(|| async {
|
||||
let res = self
|
||||
@@ -484,7 +489,7 @@ where
|
||||
pub struct StorageFetchDescendantKeysStream<T: Config> {
|
||||
methods: LegacyRpcMethods<T>,
|
||||
key: Vec<u8>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
// How many entries to ask for each time.
|
||||
storage_page_size: u32,
|
||||
// What key do we start paginating from? None = from the beginning.
|
||||
|
||||
+24
-22
@@ -10,9 +10,9 @@ pub mod chain_head;
|
||||
pub mod legacy;
|
||||
pub mod utils;
|
||||
|
||||
use crate::config::{Config, HashFor};
|
||||
use crate::error::Error;
|
||||
use crate::metadata::Metadata;
|
||||
use crate::Config;
|
||||
use async_trait::async_trait;
|
||||
use codec::{Decode, Encode};
|
||||
use futures::{Stream, StreamExt};
|
||||
@@ -82,37 +82,37 @@ pub trait Backend<T: Config>: sealed::Sealed + Send + Sync + 'static {
|
||||
async fn storage_fetch_values(
|
||||
&self,
|
||||
keys: Vec<Vec<u8>>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<StreamOfResults<StorageResponse>, Error>;
|
||||
|
||||
/// Fetch keys underneath the given key from storage.
|
||||
async fn storage_fetch_descendant_keys(
|
||||
&self,
|
||||
key: Vec<u8>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<StreamOfResults<Vec<u8>>, Error>;
|
||||
|
||||
/// Fetch values underneath the given key from storage.
|
||||
async fn storage_fetch_descendant_values(
|
||||
&self,
|
||||
key: Vec<u8>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<StreamOfResults<StorageResponse>, Error>;
|
||||
|
||||
/// Fetch the genesis hash
|
||||
async fn genesis_hash(&self) -> Result<T::Hash, Error>;
|
||||
async fn genesis_hash(&self) -> Result<HashFor<T>, Error>;
|
||||
|
||||
/// Get a block header
|
||||
async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error>;
|
||||
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, Error>;
|
||||
|
||||
/// Return the extrinsics found in the block. Each extrinsic is represented
|
||||
/// by a vector of bytes which has _not_ been SCALE decoded (in other words, the
|
||||
/// first bytes in the vector will decode to the compact encoded length of the extrinsic)
|
||||
async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error>;
|
||||
async fn block_body(&self, at: HashFor<T>) -> Result<Option<Vec<Vec<u8>>>, Error>;
|
||||
|
||||
/// Get the most recent finalized block hash.
|
||||
/// Note: needed only in blocks client for finalized block stream; can prolly be removed.
|
||||
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<T::Hash>, Error>;
|
||||
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<HashFor<T>>, Error>;
|
||||
|
||||
/// Get information about the current runtime.
|
||||
async fn current_runtime_version(&self) -> Result<RuntimeVersion, Error>;
|
||||
@@ -123,30 +123,33 @@ pub trait Backend<T: Config>: sealed::Sealed + Send + Sync + 'static {
|
||||
/// A stream of all new block headers as they arrive.
|
||||
async fn stream_all_block_headers(
|
||||
&self,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error>;
|
||||
hasher: T::Hasher,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error>;
|
||||
|
||||
/// A stream of best block headers.
|
||||
async fn stream_best_block_headers(
|
||||
&self,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error>;
|
||||
hasher: T::Hasher,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error>;
|
||||
|
||||
/// A stream of finalized block headers.
|
||||
async fn stream_finalized_block_headers(
|
||||
&self,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error>;
|
||||
hasher: T::Hasher,
|
||||
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error>;
|
||||
|
||||
/// Submit a transaction. This will return a stream of events about it.
|
||||
async fn submit_transaction(
|
||||
&self,
|
||||
bytes: &[u8],
|
||||
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error>;
|
||||
) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, Error>;
|
||||
|
||||
/// Make a call to some runtime API.
|
||||
async fn call(
|
||||
&self,
|
||||
method: &str,
|
||||
call_parameters: Option<&[u8]>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<Vec<u8>, Error>;
|
||||
}
|
||||
|
||||
@@ -157,7 +160,7 @@ pub trait BackendExt<T: Config>: Backend<T> {
|
||||
async fn storage_fetch_value(
|
||||
&self,
|
||||
key: Vec<u8>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<Option<Vec<u8>>, Error> {
|
||||
self.storage_fetch_values(vec![key], at)
|
||||
.await?
|
||||
@@ -173,7 +176,7 @@ pub trait BackendExt<T: Config>: Backend<T> {
|
||||
&self,
|
||||
method: &str,
|
||||
call_parameters: Option<&[u8]>,
|
||||
at: T::Hash,
|
||||
at: HashFor<T>,
|
||||
) -> Result<D, Error> {
|
||||
let bytes = self.call(method, call_parameters, at).await?;
|
||||
let res = D::decode(&mut &*bytes)?;
|
||||
@@ -181,7 +184,7 @@ pub trait BackendExt<T: Config>: Backend<T> {
|
||||
}
|
||||
|
||||
/// Return the metadata at some version.
|
||||
async fn metadata_at_version(&self, version: u32, at: T::Hash) -> Result<Metadata, Error> {
|
||||
async fn metadata_at_version(&self, version: u32, at: HashFor<T>) -> Result<Metadata, Error> {
|
||||
let param = version.encode();
|
||||
|
||||
let opaque: Option<frame_metadata::OpaqueMetadata> = self
|
||||
@@ -196,7 +199,7 @@ pub trait BackendExt<T: Config>: Backend<T> {
|
||||
}
|
||||
|
||||
/// Return V14 metadata from the legacy `Metadata_metadata` call.
|
||||
async fn legacy_metadata(&self, at: T::Hash) -> Result<Metadata, Error> {
|
||||
async fn legacy_metadata(&self, at: HashFor<T>) -> Result<Metadata, Error> {
|
||||
let opaque: frame_metadata::OpaqueMetadata =
|
||||
self.call_decoding("Metadata_metadata", None, at).await?;
|
||||
let metadata: Metadata = Decode::decode(&mut &opaque.0[..])?;
|
||||
@@ -412,7 +415,6 @@ mod test {
|
||||
// Define dummy config
|
||||
enum Conf {}
|
||||
impl Config for Conf {
|
||||
type Hash = H256;
|
||||
type AccountId = crate::utils::AccountId32;
|
||||
type Address = crate::utils::MultiAddress<Self::AccountId, ()>;
|
||||
type Signature = crate::utils::MultiSignature;
|
||||
@@ -540,7 +542,7 @@ mod test {
|
||||
/// - `call`
|
||||
/// The test covers them because they follow the simple pattern of:
|
||||
/// ```no_run
|
||||
/// async fn THE_THING(&self) -> Result<T::Hash, Error> {
|
||||
/// async fn THE_THING(&self) -> Result<HashFor<T>, Error> {
|
||||
/// retry(|| <DO THE THING> ).await
|
||||
/// }
|
||||
/// ```
|
||||
@@ -573,7 +575,7 @@ mod test {
|
||||
/// ```no_run
|
||||
/// async fn stream_the_thing(
|
||||
/// &self,
|
||||
/// ) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
|
||||
/// ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, Error> {
|
||||
/// let methods = self.methods.clone();
|
||||
/// let retry_sub = retry_stream(move || {
|
||||
/// let methods = methods.clone();
|
||||
@@ -692,7 +694,7 @@ mod test {
|
||||
serde_json::from_value(spec).expect("Mock runtime spec should be the right shape")
|
||||
}
|
||||
|
||||
type FollowEvent = chain_head::FollowEvent<<Conf as Config>::Hash>;
|
||||
type FollowEvent = chain_head::FollowEvent<HashFor<Conf>>;
|
||||
|
||||
/// Build a mock client which can handle `chainHead_v1_follow` subscriptions.
|
||||
/// Messages from the provided receiver are sent to the latest active subscription.
|
||||
@@ -747,7 +749,7 @@ mod test {
|
||||
async move {
|
||||
if let Some(id) = id {
|
||||
let follow_event =
|
||||
FollowEvent::Initialized(Initialized::<<Conf as Config>::Hash> {
|
||||
FollowEvent::Initialized(Initialized::<HashFor<Conf>> {
|
||||
finalized_block_hashes: vec![random_hash()],
|
||||
finalized_block_runtime: Some(chain_head::RuntimeEvent::Valid(
|
||||
RuntimeVersionEvent {
|
||||
|
||||
Reference in New Issue
Block a user