chainHead based backend implementation (#1161)

* add follow_stream impl

* follow_stream_unpin first draft

* add tests for follow_stream_unpin

* more tests and fixes for follow_stream_unpin

* first pass follow_stream_driver

* follow_stream_driver: add tests, fix things, buffer events from last finalized

* First pass finishing Backend impl

* Fix test compile issues

* clippy fixes

* clippy fix and consistify light_client

* revert lightclient tweak

* revert other lightclient thing

* cargo fmt

* start testing unstable backend behind feature flag

* more test fixes and move test-runtime metadata path just in case

* fix compile error

* ensure transaction progress stream actually used and fix another test

* cargo fmt

* CI tweak

* improve some comments and address some feedback bits

* update CI to use our own nightly binary

* wait for finalized block perhaps
This commit is contained in:
James Wilson
2023-09-26 16:58:30 +01:00
committed by GitHub
parent 00cce68371
commit cf7e2db1b7
43 changed files with 2682 additions and 250 deletions
+14 -11
View File
@@ -2,14 +2,12 @@ name: Daily compatibility check against latest substrate
on:
schedule:
# Run at 8am every day
# Run at 8am every day, well after the new binary is built
- cron: "0 8 * * *"
env:
CARGO_TERM_COLOR: always
# Use latest substrate for nightly runs:
SUBSTRATE_URL: https://releases.parity.io/substrate/x86_64-debian:bullseye/latest/substrate/substrate
jobs:
tests:
@@ -19,13 +17,19 @@ jobs:
- name: Checkout sources
uses: actions/checkout@v4
- name: Download Substrate
- name: Download substrate-node binary
id: download-artifact
uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0
with:
workflow: build-substrate.yml
name: nightly-substrate-binary
- name: Prepare substrate-node binary
run: |
curl $SUBSTRATE_URL --output substrate-node --location
chmod +x substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
mv substrate-node ~/.local/bin
chmod u+x ./substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
cp ./substrate-node ~/.local/bin
- name: Install Rust stable toolchain
uses: actions-rs/toolchain@v1
@@ -43,8 +47,7 @@ jobs:
command: test
args: --all-targets --workspace
# If the previous step fails, create a new Github issue
# to nofity us about it.
# If any previous step fails, create a new Github issue to notify us about it.
- if: ${{ failure() }}
uses: JasonEtco/create-an-issue@e27dddc79c92bc6e4562f268fffa5ed752639abd # v2.9.1
env:
+141 -67
View File
@@ -18,8 +18,6 @@ concurrency:
env:
CARGO_TERM_COLOR: always
# TODO: Currently pointing at latest substrate; is there a suitable binary we can pin to here?
SUBSTRATE_URL: https://releases.parity.io/substrate/x86_64-debian:bullseye/latest/substrate/substrate
# Increase wasm test timeout from 20 seconds (default) to 1 minute.
WASM_BINDGEN_TEST_TIMEOUT: 60
@@ -31,13 +29,19 @@ jobs:
- name: Checkout sources
uses: actions/checkout@v4
- name: Download Substrate
- name: Download substrate-node binary
id: download-artifact
uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0
with:
workflow: build-substrate.yml
name: nightly-substrate-binary
- name: Prepare substrate-node binary
run: |
curl $SUBSTRATE_URL --output substrate-node --location
chmod +x substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
mv substrate-node ~/.local/bin
chmod u+x ./substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
cp ./substrate-node ~/.local/bin
- name: Install Rust stable toolchain
uses: actions-rs/toolchain@v1
@@ -137,13 +141,19 @@ jobs:
- name: Checkout sources
uses: actions/checkout@v4
- name: Download Substrate
- name: Download substrate-node binary
id: download-artifact
uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0
with:
workflow: build-substrate.yml
name: nightly-substrate-binary
- name: Prepare substrate-node binary
run: |
curl $SUBSTRATE_URL --output substrate-node --location
chmod +x substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
mv substrate-node ~/.local/bin
chmod u+x ./substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
cp ./substrate-node ~/.local/bin
- name: Install Rust stable toolchain
uses: actions-rs/toolchain@v1
@@ -165,19 +175,25 @@ jobs:
args: --doc
tests:
name: "Test non-wasm"
name: "Test (Native)"
runs-on: ubuntu-latest-16-cores
steps:
- name: Checkout sources
uses: actions/checkout@v4
- name: Download Substrate
- name: Download substrate-node binary
id: download-artifact
uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0
with:
workflow: build-substrate.yml
name: nightly-substrate-binary
- name: Prepare substrate-node binary
run: |
curl $SUBSTRATE_URL --output substrate-node --location
chmod +x substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
mv substrate-node ~/.local/bin
chmod u+x ./substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
cp ./substrate-node ~/.local/bin
- name: Install Rust stable toolchain
uses: actions-rs/toolchain@v1
@@ -198,21 +214,67 @@ jobs:
command: nextest
args: run --workspace
unstable_backend_tests:
name: "Test (Unstable Backend)"
runs-on: ubuntu-latest-16-cores
steps:
- name: Checkout sources
uses: actions/checkout@v3
- name: Download substrate-node binary
id: download-artifact
uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0
with:
workflow: build-substrate.yml
name: nightly-substrate-binary
- name: Prepare substrate-node binary
run: |
chmod u+x ./substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
cp ./substrate-node ~/.local/bin
- name: Install Rust stable toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- name: Rust Cache
uses: Swatinem/rust-cache@e207df5d269b42b69c8bc5101da26f7d31feddb4 # v2.6.2
- name: Install cargo-nextest
run: cargo install cargo-nextest
- name: Run tests
uses: actions-rs/cargo@v1.0.3
with:
command: nextest
args: run --workspace --features unstable-backend-client
light_client_tests:
name: "Test Light Client"
name: "Test (Light Client)"
runs-on: ubuntu-latest-16-cores
timeout-minutes: 25
steps:
- name: Checkout sources
uses: actions/checkout@v4
- name: Download Substrate
- name: Download substrate-node binary
id: download-artifact
uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0
with:
workflow: build-substrate.yml
name: nightly-substrate-binary
- name: Prepare substrate-node binary
run: |
curl $SUBSTRATE_URL --output substrate-node --location
chmod +x substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
mv substrate-node ~/.local/bin
chmod u+x ./substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
cp ./substrate-node ~/.local/bin
- name: Install Rust stable toolchain
uses: actions-rs/toolchain@v1
@@ -230,40 +292,8 @@ jobs:
command: test
args: --release --package integration-tests --features unstable-light-client
clippy:
name: Cargo clippy
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v4
- name: Download Substrate
run: |
curl $SUBSTRATE_URL --output substrate-node --location
chmod +x substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
mv substrate-node ~/.local/bin
- name: Install Rust stable toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
components: clippy
override: true
- name: Rust Cache
uses: Swatinem/rust-cache@a95ba195448af2da9b00fb742d14ffaaf3c21f43 # v2.7.0
- name: Run clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --all-targets -- -D warnings
wasm_tests:
name: Test wasm
name: Test (WASM)
runs-on: ubuntu-latest
steps:
@@ -281,13 +311,19 @@ jobs:
- name: Rust Cache
uses: Swatinem/rust-cache@a95ba195448af2da9b00fb742d14ffaaf3c21f43 # v2.7.0
- name: Download Substrate
- name: Download substrate-node binary
id: download-artifact
uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0
with:
workflow: build-substrate.yml
name: nightly-substrate-binary
- name: Prepare substrate-node binary
run: |
curl $SUBSTRATE_URL --output substrate-node --location
chmod +x substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
mv substrate-node ~/.local/bin
chmod u+x ./substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
cp ./substrate-node ~/.local/bin
- name: Run subxt WASM tests
run: |
@@ -314,3 +350,41 @@ jobs:
wasm-pack test --headless --firefox
wasm-pack test --headless --chrome
working-directory: signer/wasm-tests
clippy:
name: Cargo clippy
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v4
- name: Download substrate-node binary
id: download-artifact
uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0
with:
workflow: build-substrate.yml
name: nightly-substrate-binary
- name: Prepare substrate-node binary
run: |
chmod u+x ./substrate-node
./substrate-node --version
mkdir -p ~/.local/bin
cp ./substrate-node ~/.local/bin
- name: Install Rust stable toolchain
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
components: clippy
override: true
- name: Rust Cache
uses: Swatinem/rust-cache@a95ba195448af2da9b00fb742d14ffaaf3c21f43 # v2.7.0
- name: Run clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --all-targets -- -D warnings
-1
View File
@@ -427,7 +427,6 @@ pub fn get_custom_value_hash(
pub fn get_storage_hash(pallet: &PalletMetadata, entry_name: &str) -> Option<[u8; HASH_LEN]> {
let storage = pallet.storage()?;
let entry = storage.entry_by_name(entry_name)?;
let hash = get_storage_entry_hash(pallet.types, entry, &mut HashMap::new());
Some(hash)
}
+1 -1
View File
@@ -12,7 +12,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a balance transfer extrinsic.
let dest = dev::bob().public_key().into();
let balance_transfer_tx = polkadot::tx().balances().transfer(dest, 10_000);
let balance_transfer_tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000);
// Submit the balance transfer extrinsic from Alice, and wait for it to be successful
// and in a finalized block. We get back the extrinsic events if all is well.
+1 -1
View File
@@ -12,7 +12,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a balance transfer extrinsic.
let dest = dev::bob().public_key().into();
let balance_transfer_tx = polkadot::tx().balances().transfer(dest, 10_000);
let balance_transfer_tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000);
// Submit the balance transfer extrinsic from Alice, and then monitor the
// progress of it.
+1 -1
View File
@@ -12,7 +12,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a balance transfer extrinsic.
let dest = dev::bob().public_key().into();
let tx = polkadot::tx().balances().transfer(dest, 10_000);
let tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000);
let latest_block = api.blocks().at_latest().await?;
@@ -26,7 +26,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a balance transfer extrinsic.
let dest = dev::bob().public_key().into();
let balance_transfer_tx = polkadot::tx().balances().transfer(dest, 10_000);
let balance_transfer_tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000);
// Submit the balance transfer extrinsic from Alice, and wait for it to be successful
// and in a finalized block. We get back the extrinsic events if all is well.
+7 -11
View File
@@ -19,6 +19,7 @@ use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
// Expose the RPC methods.
pub use rpc_methods::LegacyRpcMethods;
/// The legacy backend.
@@ -125,15 +126,6 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
Ok(BlockRef::from_hash(hash))
}
async fn latest_best_block_ref(&self) -> Result<BlockRef<T::Hash>, Error> {
let hash = self
.methods
.chain_get_block_hash(None)
.await?
.ok_or_else(|| Error::Other("Latest best block doesn't exist".into()))?;
Ok(BlockRef::from_hash(hash))
}
async fn current_runtime_version(&self) -> Result<RuntimeVersion, Error> {
let details = self.methods.state_get_runtime_version(None).await?;
Ok(RuntimeVersion {
@@ -231,7 +223,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
})
}
RpcTransactionStatus::InBlock(hash) => {
Some(TransactionStatus::InBestBlock { hash })
Some(TransactionStatus::InBestBlock {
hash: BlockRef::from_hash(hash),
})
}
// These 5 mean that the stream will very likely end:
RpcTransactionStatus::FinalityTimeout(_) => {
@@ -240,7 +234,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
})
}
RpcTransactionStatus::Finalized(hash) => {
Some(TransactionStatus::InFinalizedBlock { hash })
Some(TransactionStatus::InFinalizedBlock {
hash: BlockRef::from_hash(hash),
})
}
RpcTransactionStatus::Usurped(_) => Some(TransactionStatus::Invalid {
message: "Transaction was usurped by another with the same nonce"
+4 -6
View File
@@ -65,10 +65,6 @@ pub trait Backend<T: Config>: sealed::Sealed + Send + Sync + 'static {
/// Note: needed only in blocks client for finalized block stream; can prolly be removed.
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<T::Hash>, Error>;
/// Get the most recent best block hash.
/// Note: needed only in blocks client for finalized block stream; can prolly be removed.
async fn latest_best_block_ref(&self) -> Result<BlockRef<T::Hash>, Error>;
/// Get information about the current runtime.
async fn current_runtime_version(&self) -> Result<RuntimeVersion, Error>;
@@ -314,15 +310,17 @@ pub enum TransactionStatus<Hash> {
/// Number of peers it's been broadcast to.
num_peers: u32,
},
/// Transaction is no longer in a best block.
NoLongerInBestBlock,
/// Transaction has been included in block with given hash.
InBestBlock {
/// Block hash the transaction is in.
hash: Hash,
hash: BlockRef<Hash>,
},
/// Transaction has been finalized by a finality-gadget, e.g GRANDPA
InFinalizedBlock {
/// Block hash the transaction is in.
hash: Hash,
hash: BlockRef<Hash>,
},
/// Something went wrong in the node.
Error {
+314
View File
@@ -0,0 +1,314 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::rpc_methods::{FollowEvent, UnstableRpcMethods};
use crate::config::Config;
use crate::error::Error;
use futures::{FutureExt, Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A `Stream` whose goal is to remain subscribed to `chainHead_follow`. It will re-subscribe if the subscription
/// is ended for any reason, and it will return the current `subscription_id` as an event, along with the other
/// follow events.
pub struct FollowStream<Hash> {
    // Using this and not just keeping a copy of the RPC methods
    // around means that we can test this in isolation with dummy streams.
    stream_getter: FollowEventStreamGetter<Hash>,
    // Current position in the subscribe/receive/restart state machine; see `InnerStreamState`.
    stream: InnerStreamState<Hash>,
}
impl<Hash> std::fmt::Debug for FollowStream<Hash> {
    // Manual impl: `stream_getter` is a boxed closure with no `Debug` impl,
    // so it is rendered as a ".." placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FollowStream")
            .field("stream_getter", &"..")
            .field("stream", &self.stream)
            .finish()
    }
}
/// A getter function that returns a [`FollowEventStreamFut<Hash>`].
pub type FollowEventStreamGetter<Hash> = Box<dyn FnMut() -> FollowEventStreamFut<Hash> + Send>;
/// The future which will return a stream of follow events and the subscription ID for it.
pub type FollowEventStreamFut<Hash> = Pin<
    Box<dyn Future<Output = Result<(FollowEventStream<Hash>, String), Error>> + Send + 'static>,
>;
/// The stream of follow events.
pub type FollowEventStream<Hash> =
    Pin<Box<dyn Stream<Item = Result<FollowEvent<Hash>, Error>> + Send + 'static>>;
/// Either a ready message with the current subscription ID, or
/// an event from the stream itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FollowStreamMsg<Hash> {
    /// The stream is ready (and has a subscription ID).
    /// Emitted once per (re-)subscription, before any events.
    Ready(String),
    /// An event from the stream.
    Event(FollowEvent<Hash>),
}
impl<Hash> FollowStreamMsg<Hash> {
    /// Consume the message, returning the inner event if there is one,
    /// or `None` for a "ready" notification.
    pub fn into_event(self) -> Option<FollowEvent<Hash>> {
        if let FollowStreamMsg::Event(ev) = self {
            Some(ev)
        } else {
            None
        }
    }
}
enum InnerStreamState<Hash> {
    /// We've just created the stream; we'll start Initializing it.
    New,
    /// We're fetching the inner subscription. Move to Ready when we have one.
    Initializing(FollowEventStreamFut<Hash>),
    /// Report back the subscription ID here, and then start ReceivingEvents.
    /// (The `Option` exists only so that the contents can be `take`n out by value.)
    Ready(Option<(FollowEventStream<Hash>, String)>),
    /// We are polling for, and receiving events from the stream.
    ReceivingEvents(FollowEventStream<Hash>),
    /// We received a stop event. We'll send one on and restart the stream.
    Stopped,
    /// The stream is finished and will not restart (likely due to an error).
    Finished,
}
impl<Hash> std::fmt::Debug for InnerStreamState<Hash> {
    /// Render just the state name; inner futures/streams are not `Debug`,
    /// so they are elided as "(..)".
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::New => "New",
            Self::Initializing(_) => "Initializing(..)",
            Self::Ready(_) => "Ready(..)",
            Self::ReceivingEvents(_) => "ReceivingEvents(..)",
            Self::Stopped => "Stopped",
            Self::Finished => "Finished",
        };
        f.write_str(label)
    }
}
impl<Hash> FollowStream<Hash> {
    /// Create a new [`FollowStream`] given a function which returns the stream.
    pub fn new(stream_getter: FollowEventStreamGetter<Hash>) -> Self {
        Self {
            stream_getter,
            stream: InnerStreamState::New,
        }
    }
    /// Create a new [`FollowStream`] given the RPC methods.
    pub fn from_methods<T: Config>(methods: UnstableRpcMethods<T>) -> FollowStream<T::Hash> {
        FollowStream {
            stream_getter: Box::new(move || {
                let methods = methods.clone();
                Box::pin(async move {
                    // Make the RPC call:
                    // NOTE(review): the `true` argument presumably asks the node to include
                    // runtime details in events — confirm against `chainhead_unstable_follow`.
                    let stream = methods.chainhead_unstable_follow(true).await?;
                    // Extract the subscription ID:
                    let Some(sub_id) = stream.subscription_id().map(ToOwned::to_owned) else {
                        return Err(Error::Other(
                            "Subscription ID expected for chainHead_follow response, but not given"
                                .to_owned(),
                        ));
                    };
                    // Return both:
                    let stream: FollowEventStream<T::Hash> = Box::pin(stream);
                    Ok((stream, sub_id))
                })
            }),
            stream: InnerStreamState::New,
        }
    }
}
// The fields are a `Box`ed closure and state holding `Pin<Box<..>>` trait objects,
// which are themselves `Unpin`, so it's fine to mark the whole struct `Unpin`
// irrespective of `Hash`.
impl<Hash> std::marker::Unpin for FollowStream<Hash> {}
impl<Hash> Stream for FollowStream<Hash> {
    type Item = Result<FollowStreamMsg<Hash>, Error>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Safe to `get_mut`: we implement `Unpin` for this type above.
        let this = self.get_mut();
        // Loop so that state transitions take effect immediately; we only return
        // once there's an item to yield, an inner poll is Pending, or we're done.
        loop {
            match &mut this.stream {
                InnerStreamState::New => {
                    let fut = (this.stream_getter)();
                    this.stream = InnerStreamState::Initializing(fut);
                    continue;
                }
                InnerStreamState::Initializing(fut) => {
                    match fut.poll_unpin(cx) {
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                        Poll::Ready(Ok(sub_with_id)) => {
                            this.stream = InnerStreamState::Ready(Some(sub_with_id));
                            continue;
                        }
                        Poll::Ready(Err(e)) => {
                            // Finish forever if there's an error, passing it on.
                            this.stream = InnerStreamState::Finished;
                            return Poll::Ready(Some(Err(e)));
                        }
                    }
                }
                InnerStreamState::Ready(stream) => {
                    // We never set the Option to `None`; we just have an Option so
                    // that we can take ownership of the contents easily here.
                    let (sub, sub_id) = stream.take().expect("should always be Some");
                    this.stream = InnerStreamState::ReceivingEvents(sub);
                    return Poll::Ready(Some(Ok(FollowStreamMsg::Ready(sub_id))));
                }
                InnerStreamState::ReceivingEvents(stream) => {
                    match stream.poll_next_unpin(cx) {
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                        Poll::Ready(None) => {
                            // No error happened but the stream ended; restart and
                            // pass on a Stop message anyway.
                            this.stream = InnerStreamState::Stopped;
                            continue;
                        }
                        Poll::Ready(Some(Ok(ev))) => {
                            if let FollowEvent::Stop = ev {
                                // A stop event means the stream has ended, so start
                                // over after passing on the stop message.
                                this.stream = InnerStreamState::Stopped;
                                continue;
                            }
                            return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev))));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            // Finish forever if there's an error, passing it on.
                            this.stream = InnerStreamState::Finished;
                            return Poll::Ready(Some(Err(e)));
                        }
                    }
                }
                InnerStreamState::Stopped => {
                    // Emit the Stop event, then go back to New so the next poll
                    // re-subscribes from scratch.
                    this.stream = InnerStreamState::New;
                    return Poll::Ready(Some(Ok(FollowStreamMsg::Event(FollowEvent::Stop))));
                }
                InnerStreamState::Finished => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
#[cfg(test)]
pub(super) mod test_utils {
    use super::*;
    use crate::backend::unstable::rpc_methods::{
        BestBlockChanged, Finalized, Initialized, NewBlock,
    };
    use crate::config::substrate::H256;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    /// Given some events, returns a follow stream getter that we can use in
    /// place of the usual RPC method.
    pub fn test_stream_getter<Hash, F, I>(events: F) -> FollowEventStreamGetter<Hash>
    where
        Hash: Send + 'static,
        F: Fn() -> I + Send + 'static,
        I: IntoIterator<Item = Result<FollowEvent<Hash>, Error>>,
    {
        // Shared cursor into the event list, so it persists across re-subscriptions.
        let start_idx = Arc::new(AtomicUsize::new(0));
        Box::new(move || {
            // Start the events from where we left off last time.
            let start_idx = start_idx.clone();
            let this_idx = start_idx.load(Ordering::Relaxed);
            let events: Vec<_> = events().into_iter().skip(this_idx).collect();
            Box::pin(async move {
                // Increment start_idx for each event we see, so that if we get
                // the stream again, we get only the remaining events for it.
                let stream = futures::stream::iter(events).map(move |ev| {
                    start_idx.fetch_add(1, Ordering::Relaxed);
                    ev
                });
                let stream: FollowEventStream<Hash> = Box::pin(stream);
                // Subscription IDs encode the resume position, e.g. "sub_id_0".
                Ok((stream, format!("sub_id_{this_idx}")))
            })
        })
    }
    /// An initialized event.
    pub fn ev_initialized(n: u64) -> FollowEvent<H256> {
        FollowEvent::Initialized(Initialized {
            finalized_block_hash: H256::from_low_u64_le(n),
            finalized_block_runtime: None,
        })
    }
    /// A new block event.
    pub fn ev_new_block(parent: u64, n: u64) -> FollowEvent<H256> {
        FollowEvent::NewBlock(NewBlock {
            parent_block_hash: H256::from_low_u64_le(parent),
            block_hash: H256::from_low_u64_le(n),
            new_runtime: None,
        })
    }
    /// A best block event.
    pub fn ev_best_block(n: u64) -> FollowEvent<H256> {
        FollowEvent::BestBlockChanged(BestBlockChanged {
            best_block_hash: H256::from_low_u64_le(n),
        })
    }
    /// A finalized event.
    pub fn ev_finalized(ns: impl IntoIterator<Item = u64>) -> FollowEvent<H256> {
        FollowEvent::Finalized(Finalized {
            finalized_block_hashes: ns.into_iter().map(H256::from_low_u64_le).collect(),
            pruned_block_hashes: vec![],
        })
    }
}
#[cfg(test)]
pub mod test {
    use super::*;
    use test_utils::{ev_initialized, ev_new_block, test_stream_getter};
    #[tokio::test]
    async fn follow_stream_provides_messages_until_error() {
        // The events we'll get back on the stream.
        let stream_getter = test_stream_getter(|| {
            [
                Ok(ev_initialized(1)),
                // Stop should lead to a drop and resubscribe:
                Ok(FollowEvent::Stop),
                Ok(FollowEvent::Stop),
                Ok(ev_new_block(1, 2)),
                // Nothing should be emitted after an error:
                Err(Error::Other("ended".to_owned())),
                Ok(ev_new_block(2, 3)),
            ]
        });
        let s = FollowStream::new(stream_getter);
        // Drop the error item; the stream finishes after it, so collect terminates.
        let out: Vec<_> = s.filter_map(|e| async move { e.ok() }).collect().await;
        // The expected response, given the above. Note that each resubscription
        // begins with a fresh `Ready` message carrying the new subscription ID.
        assert_eq!(
            out,
            vec![
                FollowStreamMsg::Ready("sub_id_0".to_owned()),
                FollowStreamMsg::Event(ev_initialized(1)),
                FollowStreamMsg::Event(FollowEvent::Stop),
                FollowStreamMsg::Ready("sub_id_2".to_owned()),
                FollowStreamMsg::Event(FollowEvent::Stop),
                FollowStreamMsg::Ready("sub_id_3".to_owned()),
                FollowStreamMsg::Event(ev_new_block(1, 2)),
            ]
        );
    }
}
@@ -0,0 +1,488 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::follow_stream_unpin::{BlockRef, FollowStreamMsg, FollowStreamUnpin};
use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEvent};
use crate::config::BlockHash;
use crate::error::Error;
use futures::stream::{Stream, StreamExt};
use std::collections::{HashMap, VecDeque};
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
/// A `Stream` which builds on `FollowStreamDriver`, and allows multiple subscribers to obtain events
/// from the single underlying subscription (each being provided an `Initialized` message and all new
/// blocks since then, as if they were each creating a unique `chainHead_follow` subscription). This
/// is the "top" layer of our follow stream subscriptions, and the one that's interacted with elsewhere.
#[derive(Debug)]
pub struct FollowStreamDriver<Hash: BlockHash> {
    // The underlying stream of follow events being driven.
    inner: FollowStreamUnpin<Hash>,
    // State shared with every handle/subscription created from this driver.
    shared: Shared<Hash>,
}
impl<Hash: BlockHash> FollowStreamDriver<Hash> {
    /// Create a new [`FollowStreamDriver`]. This must be polled by some executor
    /// in order for any progress to be made. Things can subscribe to events.
    pub fn new(follow_unpin: FollowStreamUnpin<Hash>) -> Self {
        Self {
            inner: follow_unpin,
            shared: Shared::default(),
        }
    }
    /// Return a handle from which we can create new subscriptions to follow events.
    /// Handles are cheap: they just clone the shared state.
    pub fn handle(&self) -> FollowStreamDriverHandle<Hash> {
        FollowStreamDriverHandle {
            shared: self.shared.clone(),
        }
    }
}
impl<Hash: BlockHash> Stream for FollowStreamDriver<Hash> {
    // Yields `Ok(())` once per event pushed out; the events themselves are
    // distributed to subscribers via the shared state, not via this item type.
    type Item = Result<(), Error>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let item = match self.inner.poll_next_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => {
                // Mark ourselves as done so that everything can end.
                self.shared.done();
                return Poll::Ready(None);
            }
            Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
            Poll::Ready(Some(Ok(item))) => item,
        };
        self.shared.push_item(item);
        Poll::Ready(Some(Ok(())))
    }
}
/// A handle that can be used to create subscribers, but that doesn't
/// itself subscribe to events.
#[derive(Debug, Clone)]
pub struct FollowStreamDriverHandle<Hash: BlockHash> {
    // Shared state; cloning the handle just clones the inner `Arc`.
    shared: Shared<Hash>,
}
impl<Hash: BlockHash> FollowStreamDriverHandle<Hash> {
    /// Subscribe to follow events. Each call registers a brand new
    /// subscriber in the shared state.
    pub fn subscribe(&self) -> FollowStreamDriverSubscription<Hash> {
        self.shared.subscribe()
    }
}
/// A subscription to events from the [`FollowStreamDriver`]. All subscriptions
/// begin first with a `Ready` event containing the current subscription ID, and
/// then with an `Initialized` event containing the latest finalized block and latest
/// runtime information, and then any new/best block events and so on received since
/// the latest finalized block.
#[derive(Debug)]
pub struct FollowStreamDriverSubscription<Hash: BlockHash> {
    // Unique ID identifying this subscriber in the shared state.
    id: usize,
    // Set once the driver has finished and our local queue is drained.
    done: bool,
    // Shared state through which the driver hands us items.
    shared: Shared<Hash>,
    // Items already pulled out of the shared state, waiting to be yielded.
    local_items: VecDeque<FollowStreamMsg<BlockRef<Hash>>>,
}
impl<Hash: BlockHash> Stream for FollowStreamDriverSubscription<Hash> {
    type Item = FollowStreamMsg<BlockRef<Hash>>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.done {
            return Poll::Ready(None);
        }
        loop {
            // First drain anything we've already pulled from the shared state.
            if let Some(item) = self.local_items.pop_front() {
                return Poll::Ready(Some(item));
            }
            let items = self.shared.take_items_and_save_waker(self.id, cx.waker());
            // If no items left, mark locally as done (to avoid further locking)
            // and return None to signal done-ness.
            let Some(items) = items else {
                self.done = true;
                return Poll::Ready(None);
            };
            // No items? We've saved the waker so we'll be told when more come.
            // Else, save the items locally and loop around to pop from them.
            if items.is_empty() {
                return Poll::Pending;
            } else {
                self.local_items = items;
            }
        }
    }
}
impl<Hash: BlockHash> FollowStreamDriverSubscription<Hash> {
    /// Return the current subscription ID. If the subscription has stopped, then this will
    /// wait until a new subscription has started with a new ID.
    pub async fn subscription_id(self) -> Option<String> {
        // Skip forward to the next `Ready` message; `None` if the stream ends first.
        let ready_event = self
            .skip_while(|ev| std::future::ready(!matches!(ev, FollowStreamMsg::Ready(_))))
            .next()
            .await?;
        match ready_event {
            FollowStreamMsg::Ready(sub_id) => Some(sub_id),
            // Unreachable in practice given the skip_while above; be defensive anyway.
            _ => None,
        }
    }
    /// Subscribe to the follow events, ignoring any other messages.
    pub fn events(self) -> impl Stream<Item = FollowEvent<BlockRef<Hash>>> + Send + Sync {
        self.filter_map(|ev| std::future::ready(ev.into_event()))
    }
}
impl<Hash: BlockHash> Clone for FollowStreamDriverSubscription<Hash> {
    // NOTE: cloning registers a brand new subscription (fresh ID, replayed
    // Ready/Initialized/block events) rather than copying buffered state.
    fn clone(&self) -> Self {
        self.shared.subscribe()
    }
}
impl<Hash: BlockHash> Drop for FollowStreamDriverSubscription<Hash> {
    // Deregister from the shared state so the driver stops buffering items for us.
    fn drop(&mut self) {
        self.shared.remove_sub(self.id);
    }
}
/// Locked shared state. The driver stream will access this state to push
/// events to any subscribers, and subscribers will access it to pull the
/// events destined for themselves.
#[derive(Debug, Clone)]
struct Shared<Hash: BlockHash>(Arc<Mutex<SharedState<Hash>>>);
#[derive(Debug)]
struct SharedState<Hash: BlockHash> {
    // Set once the driver stream has ended; subscribers finish after draining.
    done: bool,
    // The next subscriber ID to hand out.
    next_id: usize,
    // Per-subscriber event queues and wakers, keyed by subscriber ID.
    subscribers: HashMap<usize, SubscriberDetails<Hash>>,
    // Keep a buffer of all events from last finalized block so that new
    // subscriptions can be handed this info first.
    block_events_from_last_finalized: VecDeque<FollowEvent<BlockRef<Hash>>>,
    // Keep track of the subscription ID we send out on new subs.
    current_subscription_id: Option<String>,
    // Keep track of the init message we send out on new subs.
    current_init_message: Option<Initialized<BlockRef<Hash>>>,
    // Runtime events by block hash; we need to track these to know
    // whether the runtime has changed when we see a finalized block notification.
    seen_runtime_events: HashMap<Hash, RuntimeEvent>,
}
impl<Hash: BlockHash> Default for Shared<Hash> {
    fn default() -> Self {
        // Subscriber IDs start at 1; everything else starts empty/unset.
        Shared(Arc::new(Mutex::new(SharedState {
            next_id: 1,
            done: false,
            subscribers: HashMap::new(),
            current_init_message: None,
            current_subscription_id: None,
            seen_runtime_events: HashMap::new(),
            block_events_from_last_finalized: VecDeque::new(),
        })))
    }
}
impl<Hash: BlockHash> Shared<Hash> {
/// Set the shared state to "done"; no more items will be handed to it.
pub fn done(&self) {
    let mut shared = self.0.lock().unwrap();
    shared.done = true;
}
/// Cleanup a subscription: drop its queued items and saved waker.
pub fn remove_sub(&self, sub_id: usize) {
    let mut shared = self.0.lock().unwrap();
    shared.subscribers.remove(&sub_id);
}
/// Take items for some subscription ID and save the waker.
/// Returns `None` if the subscriber is unknown, or if the stream is done
/// and there is nothing left to drain.
pub fn take_items_and_save_waker(
    &self,
    sub_id: usize,
    waker: &Waker,
) -> Option<VecDeque<FollowStreamMsg<BlockRef<Hash>>>> {
    let mut shared = self.0.lock().unwrap();
    let is_done = shared.done;
    let details = shared.subscribers.get_mut(&sub_id)?;
    // no more items to pull, and stream closed, so return None.
    if details.items.is_empty() && is_done {
        return None;
    }
    // else, take whatever items, and save the waker if not done yet.
    let items = std::mem::take(&mut details.items);
    if !is_done {
        details.waker = Some(waker.clone());
    }
    Some(items)
}
/// Push a new item out to subscribers, and update the buffered state that
/// new subscribers are replayed on subscription.
pub fn push_item(&self, item: FollowStreamMsg<BlockRef<Hash>>) {
    let mut shared = self.0.lock().unwrap();
    // Reborrow the guard so we can borrow separate fields independently below.
    let shared = shared.deref_mut();
    // broadcast item to subscribers:
    for details in shared.subscribers.values_mut() {
        details.items.push_back(item.clone());
        if let Some(waker) = details.waker.take() {
            waker.wake();
        }
    }
    // Keep our buffer of ready/block events up to date:
    match item {
        FollowStreamMsg::Ready(sub_id) => {
            // Set new subscription ID when it comes in.
            shared.current_subscription_id = Some(sub_id);
        }
        FollowStreamMsg::Event(FollowEvent::Initialized(ev)) => {
            // New subscriptions will be given this init message:
            shared.current_init_message = Some(ev.clone());
            // Clear block cache (since a new finalized block hash is seen):
            shared.block_events_from_last_finalized.clear();
        }
        FollowStreamMsg::Event(FollowEvent::Finalized(finalized_ev)) => {
            // Update the init message that we'll hand out to new subscriptions. If the init message
            // is `None` for some reason, we just ignore this step.
            if let Some(init_message) = &mut shared.current_init_message {
                // Find the latest runtime update that's been finalized.
                let newest_runtime = finalized_ev
                    .finalized_block_hashes
                    .iter()
                    .rev()
                    .filter_map(|h| shared.seen_runtime_events.get(&h.hash()).cloned())
                    .next();
                shared.seen_runtime_events.clear();
                if let Some(finalized) = finalized_ev.finalized_block_hashes.last() {
                    init_message.finalized_block_hash = finalized.clone();
                }
                if let Some(runtime_ev) = newest_runtime {
                    init_message.finalized_block_runtime = Some(runtime_ev);
                }
            }
            // New finalized block, so clear the cache of older block events.
            shared.block_events_from_last_finalized.clear();
        }
        FollowStreamMsg::Event(FollowEvent::NewBlock(new_block_ev)) => {
            // If a new runtime is seen, note it so that when a block is finalized, we
            // can associate that with a runtime update having happened.
            if let Some(runtime_event) = &new_block_ev.new_runtime {
                shared
                    .seen_runtime_events
                    .insert(new_block_ev.block_hash.hash(), runtime_event.clone());
            }
            shared
                .block_events_from_last_finalized
                .push_back(FollowEvent::NewBlock(new_block_ev));
        }
        FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => {
            shared.block_events_from_last_finalized.push_back(ev);
        }
        FollowStreamMsg::Event(FollowEvent::Stop) => {
            // On a stop event, clear everything. Wait for resubscription and new ready/initialized events.
            shared.block_events_from_last_finalized.clear();
            shared.current_subscription_id = None;
            shared.current_init_message = None;
        }
        _ => {
            // We don't buffer any other events.
        }
    }
}
/// Create a new subscription, pre-seeded with catch-up messages so it sees
/// the same state as a subscriber that has been attached all along.
pub fn subscribe(&self) -> FollowStreamDriverSubscription<Hash> {
    let mut shared = self.0.lock().unwrap();

    // Allocate an ID and register the subscriber so that it receives
    // anything pushed from now on.
    let id = shared.next_id;
    shared.next_id += 1;
    shared.subscribers.insert(
        id,
        SubscriberDetails {
            items: VecDeque::new(),
            waker: None,
        },
    );

    // Any new subscription should start with a "Ready" message and then an "Initialized"
    // message, and then any non-finalized block events since that. If these don't exist,
    // it means the subscription is currently stopped, and we should expect new Ready/Init
    // messages anyway once it restarts.
    let ready = shared
        .current_subscription_id
        .iter()
        .map(|sub_id| FollowStreamMsg::Ready(sub_id.clone()));
    let init = shared
        .current_init_message
        .iter()
        .map(|msg| FollowStreamMsg::Event(FollowEvent::Initialized(msg.clone())));
    let block_evs = shared
        .block_events_from_last_finalized
        .iter()
        .map(|ev| FollowStreamMsg::Event(ev.clone()));
    let local_items: VecDeque<_> = ready.chain(init).chain(block_evs).collect();

    drop(shared);

    FollowStreamDriverSubscription {
        id,
        done: false,
        shared: self.clone(),
        local_items,
    }
}
}
/// Details for a given subscriber: any items it's not yet claimed,
/// and a way to wake it up when there are more items for it.
#[derive(Debug)]
struct SubscriberDetails<Hash: BlockHash> {
    // Events pushed since this subscriber last drained its queue, oldest first.
    items: VecDeque<FollowStreamMsg<BlockRef<Hash>>>,
    // Waker saved by `take_items_and_save_waker`; woken when a new item is pushed.
    waker: Option<Waker>,
}
#[cfg(test)]
mod test_utils {
    use super::super::follow_stream_unpin::test_utils::test_unpin_stream_getter;
    use super::*;
    /// Return a `FollowStreamDriver` fed by the given events, for use in tests.
    pub fn test_follow_stream_driver_getter<Hash, F, I>(
        events: F,
        max_life: usize,
    ) -> FollowStreamDriver<Hash>
    where
        Hash: BlockHash + 'static,
        F: Fn() -> I + Send + 'static,
        I: IntoIterator<Item = Result<FollowEvent<Hash>, Error>>,
    {
        // Build the underlying unpin stream (the unpin-call receiver is
        // discarded here) and wrap it in the driver under test.
        let (stream, _) = test_unpin_stream_getter(events, max_life);
        FollowStreamDriver::new(stream)
    }
}
#[cfg(test)]
mod test {
    use super::super::follow_stream::test_utils::{
        ev_best_block, ev_finalized, ev_initialized, ev_new_block,
    };
    use super::super::follow_stream_unpin::test_utils::{
        ev_best_block_ref, ev_finalized_ref, ev_initialized_ref, ev_new_block_ref,
    };
    use super::test_utils::test_follow_stream_driver_getter;
    use super::*;
    #[test]
    fn follow_stream_driver_is_sendable() {
        // The driver must be Send so it can be spawned onto a multithreaded executor.
        fn assert_send<T: Send + 'static>(_: T) {}
        let stream_getter = test_follow_stream_driver_getter(|| [Ok(ev_initialized(1))], 10);
        assert_send(stream_getter);
    }
    #[tokio::test]
    async fn subscribers_all_receive_events_and_finish_gracefully_on_error() {
        let mut driver = test_follow_stream_driver_getter(
            || {
                [
                    Ok(ev_initialized(0)),
                    Ok(ev_new_block(0, 1)),
                    Ok(ev_best_block(1)),
                    Ok(ev_finalized([1])),
                    Err(Error::Other("ended".to_owned())),
                ]
            },
            10,
        );
        let handle = driver.handle();
        let a = handle.subscribe();
        let b = handle.subscribe();
        let c = handle.subscribe();
        // Drive to completion (the sort of real life usage I'd expect):
        tokio::spawn(async move { while driver.next().await.is_some() {} });
        let a_vec: Vec<_> = a.collect().await;
        let b_vec: Vec<_> = b.collect().await;
        let c_vec: Vec<_> = c.collect().await;
        // All three subscribers should see the same sequence; the trailing
        // error ends the streams rather than being delivered.
        let expected = vec![
            FollowStreamMsg::Ready("sub_id_0".into()),
            FollowStreamMsg::Event(ev_initialized_ref(0)),
            FollowStreamMsg::Event(ev_new_block_ref(0, 1)),
            FollowStreamMsg::Event(ev_best_block_ref(1)),
            FollowStreamMsg::Event(ev_finalized_ref([1])),
        ];
        assert_eq!(a_vec, expected);
        assert_eq!(b_vec, expected);
        assert_eq!(c_vec, expected);
    }
    #[tokio::test]
    async fn subscribers_receive_block_events_from_last_finalised() {
        let mut driver = test_follow_stream_driver_getter(
            || {
                [
                    Ok(ev_initialized(0)),
                    Ok(ev_new_block(0, 1)),
                    Ok(ev_best_block(1)),
                    Ok(ev_finalized([1])),
                    Ok(ev_new_block(1, 2)),
                    Ok(ev_new_block(2, 3)),
                    Err(Error::Other("ended".to_owned())),
                ]
            },
            10,
        );
        // Skip past ready, init, new, best events.
        let _r = driver.next().await.unwrap();
        let _i0 = driver.next().await.unwrap();
        let _n1 = driver.next().await.unwrap();
        let _b1 = driver.next().await.unwrap();
        // THEN subscribe; subscription should still receive them:
        let evs: Vec<_> = driver.handle().subscribe().take(4).collect().await;
        let expected = vec![
            FollowStreamMsg::Ready("sub_id_0".into()),
            FollowStreamMsg::Event(ev_initialized_ref(0)),
            FollowStreamMsg::Event(ev_new_block_ref(0, 1)),
            FollowStreamMsg::Event(ev_best_block_ref(1)),
        ];
        assert_eq!(evs, expected);
        // Skip past finalized 1, new 2, new 3 events
        let _f1 = driver.next().await.unwrap();
        let _n2 = driver.next().await.unwrap();
        let _n3 = driver.next().await.unwrap();
        // THEN subscribe again; new subs will see an updated initialized message
        // with the latest finalized block hash.
        let evs: Vec<_> = driver.handle().subscribe().take(4).collect().await;
        let expected = vec![
            FollowStreamMsg::Ready("sub_id_0".into()),
            FollowStreamMsg::Event(ev_initialized_ref(1)),
            FollowStreamMsg::Event(ev_new_block_ref(1, 2)),
            FollowStreamMsg::Event(ev_new_block_ref(2, 3)),
        ];
        assert_eq!(evs, expected);
    }
}
@@ -0,0 +1,634 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::follow_stream::FollowStream;
use super::UnstableRpcMethods;
use crate::backend::unstable::rpc_methods::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock,
};
use crate::config::{BlockHash, Config};
use crate::error::Error;
use futures::stream::{FuturesUnordered, Stream, StreamExt};
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
/// The type of stream item.
pub use super::follow_stream::FollowStreamMsg;
/// A `Stream` which builds on `FollowStream`, and handles pinning. It replaces any block hash seen in
/// the follow events with a `BlockRef` which, when all clones are dropped, will lead to an "unpin" call
/// for that block hash being queued. It will also automatically unpin any blocks that exceed a given max
/// age, to try and prevent the underlying stream from ending (and _all_ blocks from being unpinned as a
/// result). Put simply, it tries to keep every block pinned as long as possible until the block is no longer
/// used anywhere.
#[derive(Debug)]
pub struct FollowStreamUnpin<Hash: BlockHash> {
    // The underlying stream of events.
    inner: FollowStream<Hash>,
    // A method to call to unpin a block, given a block hash and a subscription ID.
    unpin_method: UnpinMethodHolder<Hash>,
    // Futures for sending unpin events that we'll poll to completion as
    // part of polling the stream as a whole.
    unpin_futs: FuturesUnordered<UnpinFut>,
    // Each new finalized block increments this. Allows us to track
    // the age of blocks so that we can unpin old ones.
    rel_block_num: usize,
    // The latest ID of the FollowStream subscription, which we can use
    // to unpin blocks.
    subscription_id: Option<Arc<str>>,
    // The longest period a block can be pinned for, measured in finalized blocks.
    max_block_life: usize,
    // The currently seen and pinned blocks.
    pinned: HashMap<Hash, PinnedDetails<Hash>>,
    // Shared state about blocks we've flagged to unpin from elsewhere
    // (written by `BlockRef::drop`, drained by `unpin_blocks`).
    unpin_flags: UnpinFlags<Hash>,
}
// Just a wrapper around the boxed unpin callback so that we can implement
// Debug for the whole stream (boxed closures have no Debug impl of their own).
struct UnpinMethodHolder<Hash>(UnpinMethod<Hash>);
impl<Hash> std::fmt::Debug for UnpinMethodHolder<Hash> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The boxed callback has no useful Debug output, so print its type instead.
        f.write_str("UnpinMethodHolder(Box<dyn FnMut(Hash, Arc<str>) -> UnpinFut>)")
    }
}
/// The type of the unpin method that we need to provide.
pub type UnpinMethod<Hash> = Box<dyn FnMut(Hash, Arc<str>) -> UnpinFut + Send>;
/// The future returned from [`UnpinMethod`].
pub type UnpinFut = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
// Mark as Unpin unconditionally so that `Pin<&mut Self>` can be used freely
// in `poll_next` below without any pin projection.
impl<Hash: BlockHash> std::marker::Unpin for FollowStreamUnpin<Hash> {}
impl<Hash: BlockHash> Stream for FollowStreamUnpin<Hash> {
    type Item = Result<FollowStreamMsg<BlockRef<Hash>>, Error>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.as_mut();
        loop {
            // Poll the unpin tasks while they are completing. if we get back None, then
            // no tasks in the list, and if pending, we'll be woken when we can poll again.
            if let Poll::Ready(Some(())) = this.unpin_futs.poll_next_unpin(cx) {
                continue;
            };
            // Poll the inner stream for the next event.
            let Poll::Ready(ev) = this.inner.poll_next_unpin(cx) else {
                return Poll::Pending;
            };
            // No more progress to be made if inner stream done.
            let Some(ev) = ev else {
                return Poll::Ready(None);
            };
            // Error? just return it and do nothing further.
            let ev = match ev {
                Ok(ev) => ev,
                Err(e) => {
                    return Poll::Ready(Some(Err(e)));
                }
            };
            // React to any actual FollowEvent we get back, swapping raw block hashes
            // for pinned `BlockRef`s and tracking relative block numbers as we go.
            let ev = match ev {
                FollowStreamMsg::Ready(subscription_id) => {
                    // update the subscription ID we'll use to unpin things.
                    this.subscription_id = Some(subscription_id.clone().into());
                    FollowStreamMsg::Ready(subscription_id)
                }
                FollowStreamMsg::Event(FollowEvent::Initialized(details)) => {
                    // The first finalized block gets the starting block_num.
                    let rel_block_num = this.rel_block_num;
                    let block_ref = this.pin_block_at(rel_block_num, details.finalized_block_hash);
                    FollowStreamMsg::Event(FollowEvent::Initialized(Initialized {
                        finalized_block_hash: block_ref,
                        finalized_block_runtime: details.finalized_block_runtime,
                    }))
                }
                FollowStreamMsg::Event(FollowEvent::NewBlock(details)) => {
                    // One bigger than our parent, and if no parent seen (maybe it was
                    // unpinned already), then one bigger than the last finalized block num
                    // as a best guess.
                    let parent_rel_block_num = this
                        .pinned
                        .get(&details.parent_block_hash)
                        .map(|p| p.rel_block_num)
                        .unwrap_or(this.rel_block_num);
                    let block_ref = this.pin_block_at(parent_rel_block_num + 1, details.block_hash);
                    let parent_block_ref =
                        this.pin_block_at(parent_rel_block_num, details.parent_block_hash);
                    FollowStreamMsg::Event(FollowEvent::NewBlock(NewBlock {
                        block_hash: block_ref,
                        parent_block_hash: parent_block_ref,
                        new_runtime: details.new_runtime,
                    }))
                }
                FollowStreamMsg::Event(FollowEvent::BestBlockChanged(details)) => {
                    // We expect this block to already exist, so it'll keep its existing block_num,
                    // but worst case it'll just get the current finalized block_num + 1.
                    let rel_block_num = this.rel_block_num + 1;
                    let block_ref = this.pin_block_at(rel_block_num, details.best_block_hash);
                    FollowStreamMsg::Event(FollowEvent::BestBlockChanged(BestBlockChanged {
                        best_block_hash: block_ref,
                    }))
                }
                FollowStreamMsg::Event(FollowEvent::Finalized(details)) => {
                    let finalized_block_refs: Vec<_> = details
                        .finalized_block_hashes
                        .into_iter()
                        .enumerate()
                        .map(|(idx, hash)| {
                            // These blocks _should_ exist already and so will have a known block num,
                            // but if they don't, we just increment the num from the last finalized block
                            // we saw, which should be accurate.
                            let rel_block_num = this.rel_block_num + idx + 1;
                            this.pin_block_at(rel_block_num, hash)
                        })
                        .collect();
                    // Our relative block height is increased by however many finalized
                    // blocks we've seen.
                    this.rel_block_num += finalized_block_refs.len();
                    let pruned_block_refs: Vec<_> = details
                        .pruned_block_hashes
                        .into_iter()
                        .map(|hash| {
                            // We should know about these, too, and if not we set their age to last_finalized + 1
                            let rel_block_num = this.rel_block_num + 1;
                            this.pin_block_at(rel_block_num, hash)
                        })
                        .collect();
                    // At this point, we also check to see which blocks we should submit unpin events
                    // for. When we see a block hash as finalized, we know that it won't be reported again
                    // (except as a parent hash of a new block), so we can safely make an unpin call for it
                    // without worrying about the hash being returned again despite the block not being pinned.
                    this.unpin_blocks(cx.waker());
                    FollowStreamMsg::Event(FollowEvent::Finalized(Finalized {
                        finalized_block_hashes: finalized_block_refs,
                        pruned_block_hashes: pruned_block_refs,
                    }))
                }
                FollowStreamMsg::Event(FollowEvent::Stop) => {
                    // clear out "old" things that are no longer applicable since
                    // the subscription has ended (a new one will be created under the hood).
                    this.pinned.clear();
                    this.unpin_futs.clear();
                    this.unpin_flags.lock().unwrap().clear();
                    this.rel_block_num = 0;
                    FollowStreamMsg::Event(FollowEvent::Stop)
                }
                // These events aren't interesting; we just forward them on:
                FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationCallDone(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationCallDone(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationStorageItems(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationStorageItems(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationWaitingForContinue(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationWaitingForContinue(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationStorageDone(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationStorageDone(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationInaccessible(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationInaccessible(details))
                }
                FollowStreamMsg::Event(FollowEvent::OperationError(details)) => {
                    FollowStreamMsg::Event(FollowEvent::OperationError(details))
                }
            };
            // Return our event.
            return Poll::Ready(Some(Ok(ev)));
        }
    }
}
impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
    /// Create a new [`FollowStreamUnpin`].
    ///
    /// - `follow_stream`: the underlying stream of follow events.
    /// - `unpin_method`: called with a block hash and subscription ID to unpin a block.
    /// - `max_block_life`: blocks this many finalized blocks older than the current
    ///   finalized block are force-unpinned.
    pub fn new(
        follow_stream: FollowStream<Hash>,
        unpin_method: UnpinMethod<Hash>,
        max_block_life: usize,
    ) -> Self {
        Self {
            inner: follow_stream,
            unpin_method: UnpinMethodHolder(unpin_method),
            max_block_life,
            pinned: Default::default(),
            subscription_id: None,
            rel_block_num: 0,
            unpin_flags: Default::default(),
            unpin_futs: Default::default(),
        }
    }
    /// Create a new [`FollowStreamUnpin`] given the RPC methods.
    pub fn from_methods<T: Config>(
        follow_stream: FollowStream<T::Hash>,
        methods: UnstableRpcMethods<T>,
        max_block_life: usize,
    ) -> FollowStreamUnpin<T::Hash> {
        let unpin_method = Box::new(move |hash: T::Hash, sub_id: Arc<str>| {
            let methods = methods.clone();
            let fut: UnpinFut = Box::pin(async move {
                // We ignore any errors trying to unpin at the moment.
                let _ = methods.chainhead_unstable_unpin(&sub_id, hash).await;
            });
            fut
        });
        FollowStreamUnpin::new(follow_stream, unpin_method, max_block_life)
    }
    /// Is the block hash currently pinned.
    pub fn is_pinned(&self, hash: &Hash) -> bool {
        self.pinned.contains_key(hash)
    }
    /// Pin a block, or return the reference to an already-pinned block. If the block has been registered to
    /// be unpinned, we'll clear those flags, so that it won't be unpinned. If the unpin request has already
    /// been sent though, then the block will be unpinned.
    fn pin_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef<Hash> {
        let entry = self
            .pinned
            .entry(hash)
            // Only if there's already an entry do we need to clear any unpin flags set against it.
            .and_modify(|_| {
                self.unpin_flags.lock().unwrap().remove(&hash);
            })
            // If there's not an entry already, make one and return it.
            .or_insert_with(|| PinnedDetails {
                rel_block_num,
                block_ref: BlockRef {
                    inner: Arc::new(BlockRefInner {
                        hash,
                        unpin_flags: self.unpin_flags.clone(),
                    }),
                },
            });
        entry.block_ref.clone()
    }
    /// Unpin any blocks that are either too old, or have the unpin flag set and are old enough.
    fn unpin_blocks(&mut self, waker: &Waker) {
        // Take the flagged hashes under the lock, leaving an empty set behind.
        let unpin_flags = std::mem::take(&mut *self.unpin_flags.lock().unwrap());
        let rel_block_num = self.rel_block_num;
        // If we asked to unpin and there was no subscription_id, then there's nothing to
        // do here, and we've cleared the flags now above anyway.
        let Some(sub_id) = &self.subscription_id else {
            return;
        };
        let mut blocks_to_unpin = vec![];
        for (hash, details) in &self.pinned {
            if rel_block_num.saturating_sub(details.rel_block_num) >= self.max_block_life
                || unpin_flags.contains(hash)
            {
                // The block is too old, or it's been flagged to be unpinned (no refs to it left)
                blocks_to_unpin.push(*hash);
            }
        }
        // No need to call the waker etc if nothing to do:
        if blocks_to_unpin.is_empty() {
            return;
        }
        for hash in blocks_to_unpin {
            self.pinned.remove(&hash);
            let fut = (self.unpin_method.0)(hash, sub_id.clone());
            self.unpin_futs.push(fut);
        }
        // Any new futures pushed above need polling to start. We could
        // just wait for the next stream event, but let's wake the task to
        // have it polled sooner, just incase it's slow to receive things.
        waker.wake_by_ref();
    }
}
// The set of block hashes that can be unpinned when ready.
// BlockRefs write to this when they are dropped. Shared behind an
// Arc<Mutex> so that a `BlockRef` dropped anywhere can flag its hash.
type UnpinFlags<Hash> = Arc<Mutex<HashSet<Hash>>>;
#[derive(Debug)]
struct PinnedDetails<Hash: BlockHash> {
    /// How old is the block? (relative block number at the time it was pinned)
    rel_block_num: usize,
    /// A block ref we can hand out to keep blocks pinned.
    /// Because we store one here until it's unpinned, the live count
    /// will only drop to 1 when no external refs are left.
    block_ref: BlockRef<Hash>,
}
/// All blocks reported will be wrapped in this. Cloning it clones the inner
/// `Arc`; once every clone has been dropped, the block is flagged for unpinning.
#[derive(Debug, Clone)]
pub struct BlockRef<Hash: BlockHash> {
    inner: Arc<BlockRefInner<Hash>>,
}
#[derive(Debug)]
struct BlockRefInner<Hash> {
    // The hash this ref keeps pinned.
    hash: Hash,
    // Shared set that `Drop` writes this hash into when the last external ref goes.
    unpin_flags: UnpinFlags<Hash>,
}
impl<Hash: BlockHash> BlockRef<Hash> {
    /// For testing purposes only, create a BlockRef from a hash
    /// that isn't pinned.
    #[cfg(test)]
    pub fn new(hash: Hash) -> Self {
        BlockRef {
            inner: Arc::new(BlockRefInner {
                hash,
                // A fresh, private flag set: nothing drains it, so dropping
                // this ref has no effect on any real pinned state.
                unpin_flags: Default::default(),
            }),
        }
    }
    /// Return the hash for this block.
    pub fn hash(&self) -> Hash {
        self.inner.hash
    }
}
impl<Hash: BlockHash> PartialEq for BlockRef<Hash> {
    /// Two `BlockRef`s are equal when they wrap the same hash; the
    /// pinning bookkeeping is ignored for comparison purposes.
    fn eq(&self, other: &Self) -> bool {
        self.inner.hash == other.inner.hash
    }
}
impl<Hash: BlockHash> PartialEq<Hash> for BlockRef<Hash> {
    /// A `BlockRef` can be compared directly against a plain hash.
    fn eq(&self, other: &Hash) -> bool {
        &self.inner.hash == other
    }
}
impl<Hash: BlockHash> Drop for BlockRef<Hash> {
    fn drop(&mut self) {
        // PinnedDetails keeps one ref, so if this is the second ref, it's the
        // only "external" one left and we should ask to unpin it now. if it's
        // the only ref remaining, it means that it's already been unpinned, so
        // nothing to do here anyway.
        // NOTE(review): if two external refs are dropped concurrently, both may
        // observe a strong count > 2 and neither flags the unpin; the block would
        // then only be unpinned once it ages out via max_block_life. Confirm
        // whether that's an acceptable trade-off here.
        if Arc::strong_count(&self.inner) == 2 {
            if let Ok(mut unpin_flags) = self.inner.unpin_flags.lock() {
                unpin_flags.insert(self.inner.hash);
            }
        }
    }
}
#[cfg(test)]
pub(super) mod test_utils {
    use super::super::follow_stream::{test_utils::test_stream_getter, FollowStream};
    use super::*;
    use crate::config::substrate::H256;
    /// Receives a `(block_hash, subscription_id)` pair for every unpin call made.
    pub type UnpinRx<Hash> = std::sync::mpsc::Receiver<(Hash, Arc<str>)>;
    /// Get a `FollowStreamUnpin` from an iterator over events.
    pub fn test_unpin_stream_getter<Hash, F, I>(
        events: F,
        max_life: usize,
    ) -> (FollowStreamUnpin<Hash>, UnpinRx<Hash>)
    where
        Hash: BlockHash + 'static,
        F: Fn() -> I + Send + 'static,
        I: IntoIterator<Item = Result<FollowEvent<Hash>, Error>>,
    {
        // Unpin requests will come here so that we can look out for them.
        let (unpin_tx, unpin_rx) = std::sync::mpsc::channel();
        let follow_stream = FollowStream::new(test_stream_getter(events));
        let unpin_method: UnpinMethod<Hash> = Box::new(move |hash, sub_id| {
            unpin_tx.send((hash, sub_id)).unwrap();
            // The unpin "RPC" resolves immediately in tests.
            Box::pin(std::future::ready(()))
        });
        let follow_unpin = FollowStreamUnpin::new(follow_stream, unpin_method, max_life);
        (follow_unpin, unpin_rx)
    }
    /// An initialized event containing a BlockRef (useful for comparisons)
    pub fn ev_initialized_ref(n: u64) -> FollowEvent<BlockRef<H256>> {
        FollowEvent::Initialized(Initialized {
            finalized_block_hash: BlockRef::new(H256::from_low_u64_le(n)),
            finalized_block_runtime: None,
        })
    }
    /// A new block event containing a BlockRef (useful for comparisons)
    pub fn ev_new_block_ref(parent: u64, n: u64) -> FollowEvent<BlockRef<H256>> {
        FollowEvent::NewBlock(NewBlock {
            parent_block_hash: BlockRef::new(H256::from_low_u64_le(parent)),
            block_hash: BlockRef::new(H256::from_low_u64_le(n)),
            new_runtime: None,
        })
    }
    /// A best block event containing a BlockRef (useful for comparisons)
    pub fn ev_best_block_ref(n: u64) -> FollowEvent<BlockRef<H256>> {
        FollowEvent::BestBlockChanged(BestBlockChanged {
            best_block_hash: BlockRef::new(H256::from_low_u64_le(n)),
        })
    }
    /// A finalized event containing a BlockRef (useful for comparisons)
    pub fn ev_finalized_ref(ns: impl IntoIterator<Item = u64>) -> FollowEvent<BlockRef<H256>> {
        FollowEvent::Finalized(Finalized {
            finalized_block_hashes: ns
                .into_iter()
                .map(|h| BlockRef::new(H256::from_low_u64_le(h)))
                .collect(),
            pruned_block_hashes: vec![],
        })
    }
}
#[cfg(test)]
mod test {
    use super::super::follow_stream::test_utils::{
        ev_best_block, ev_finalized, ev_initialized, ev_new_block,
    };
    use super::test_utils::{ev_new_block_ref, test_unpin_stream_getter};
    use super::*;
    use crate::config::substrate::H256;
    #[tokio::test]
    async fn hands_back_blocks() {
        let (follow_unpin, _) = test_unpin_stream_getter(
            || {
                [
                    Ok(ev_new_block(0, 1)),
                    Ok(ev_new_block(1, 2)),
                    Ok(ev_new_block(2, 3)),
                    Err(Error::Other("ended".to_owned())),
                ]
            },
            10,
        );
        // The trailing error is filtered out; only successful events are compared.
        let out: Vec<_> = follow_unpin
            .filter_map(|e| async move { e.ok() })
            .collect()
            .await;
        assert_eq!(
            out,
            vec![
                FollowStreamMsg::Ready("sub_id_0".into()),
                FollowStreamMsg::Event(ev_new_block_ref(0, 1)),
                FollowStreamMsg::Event(ev_new_block_ref(1, 2)),
                FollowStreamMsg::Event(ev_new_block_ref(2, 3)),
            ]
        );
    }
    #[tokio::test]
    async fn unpins_old_blocks() {
        let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
            || {
                [
                    Ok(ev_initialized(0)),
                    Ok(ev_finalized([1])),
                    Ok(ev_finalized([2])),
                    Ok(ev_finalized([3])),
                    Ok(ev_finalized([4])),
                    Ok(ev_finalized([5])),
                    Err(Error::Other("ended".to_owned())),
                ]
            },
            3,
        );
        let _r = follow_unpin.next().await.unwrap().unwrap();
        let _i0 = follow_unpin.next().await.unwrap().unwrap();
        unpin_rx.try_recv().expect_err("nothing unpinned yet");
        let _f1 = follow_unpin.next().await.unwrap().unwrap();
        unpin_rx.try_recv().expect_err("nothing unpinned yet");
        let _f2 = follow_unpin.next().await.unwrap().unwrap();
        unpin_rx.try_recv().expect_err("nothing unpinned yet");
        let _f3 = follow_unpin.next().await.unwrap().unwrap();
        // Max age is 3, so after block 3 finalized, block 0 becomes too old and is unpinned.
        let (hash, _) = unpin_rx
            .try_recv()
            .expect("unpin call should have happened");
        assert_eq!(hash, H256::from_low_u64_le(0));
        let _f4 = follow_unpin.next().await.unwrap().unwrap();
        // Block 1 is now too old and is unpinned.
        let (hash, _) = unpin_rx
            .try_recv()
            .expect("unpin call should have happened");
        assert_eq!(hash, H256::from_low_u64_le(1));
        let _f5 = follow_unpin.next().await.unwrap().unwrap();
        // Block 2 is now too old and is unpinned.
        let (hash, _) = unpin_rx
            .try_recv()
            .expect("unpin call should have happened");
        assert_eq!(hash, H256::from_low_u64_le(2));
    }
    #[tokio::test]
    async fn unpins_dropped_blocks() {
        let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
            || {
                [
                    Ok(ev_initialized(0)),
                    Ok(ev_finalized([1])),
                    Ok(ev_finalized([2])),
                    Err(Error::Other("ended".to_owned())),
                ]
            },
            10,
        );
        let _r = follow_unpin.next().await.unwrap().unwrap();
        let _i0 = follow_unpin.next().await.unwrap().unwrap();
        let f1 = follow_unpin.next().await.unwrap().unwrap();
        // We don't care about block 1 any more; drop it. unpins happen at finalized evs.
        drop(f1);
        let _f2 = follow_unpin.next().await.unwrap().unwrap();
        // Check that we get the expected unpin event.
        let (hash, _) = unpin_rx
            .try_recv()
            .expect("unpin call should have happened");
        assert_eq!(hash, H256::from_low_u64_le(1));
        // Confirm that 0 and 2 are still pinned and that 1 isnt.
        assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1)));
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(0)));
        assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2)));
    }
    #[tokio::test]
    async fn only_unpins_if_finalized_is_dropped() {
        // If we drop the "new block" and "best block" BlockRefs,
        // and then the block comes back as finalized (for instance),
        // no unpin call should be made unless we also drop the finalized
        // one.
        let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter(
            || {
                [
                    Ok(ev_initialized(0)),
                    Ok(ev_new_block(0, 1)),
                    Ok(ev_best_block(1)),
                    Ok(ev_finalized([1])),
                    Ok(ev_finalized([2])),
                    Err(Error::Other("ended".to_owned())),
                ]
            },
            100,
        );
        let _r = follow_unpin.next().await.unwrap().unwrap();
        let _i0 = follow_unpin.next().await.unwrap().unwrap();
        let n1 = follow_unpin.next().await.unwrap().unwrap();
        drop(n1);
        let b1 = follow_unpin.next().await.unwrap().unwrap();
        drop(b1);
        let f1 = follow_unpin.next().await.unwrap().unwrap();
        // even though we dropped our block 1 in the new/best events, it won't be unpinned
        // because it occurred again in finalized event.
        unpin_rx.try_recv().expect_err("nothing unpinned yet");
        drop(f1);
        let _f2 = follow_unpin.next().await.unwrap().unwrap();
        // Since we dropped the finalized block 1, we'll now unpin it when next block finalized.
        let (hash, _) = unpin_rx.try_recv().expect("unpin should have happened now");
        assert_eq!(hash, H256::from_low_u64_le(1));
    }
}
+583
View File
@@ -11,6 +11,589 @@
//! Everything in this module is **unstable**, meaning that it could change without
//! warning at any time.
mod follow_stream;
mod follow_stream_driver;
mod follow_stream_unpin;
mod storage_items;
pub mod rpc_methods;
use self::rpc_methods::{
FollowEvent, MethodResponse, RuntimeEvent, StorageQuery, StorageQueryType, StorageResultType,
};
use crate::backend::{
rpc::RpcClient, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, StreamOf,
StreamOfResults, TransactionStatus,
};
use crate::config::BlockHash;
use crate::error::{Error, RpcError};
use crate::Config;
use async_trait::async_trait;
use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
use futures::{Stream, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::task::Poll;
use storage_items::StorageItems;
// Expose the RPC methods.
pub use rpc_methods::UnstableRpcMethods;
/// Configure and build an [`UnstableBackend`].
pub struct UnstableBackendBuilder<T> {
    // How many finalized blocks a pinned block may lag behind before
    // being unpinned automatically (see `max_block_life`).
    max_block_life: usize,
    // Ties the builder to a specific chain `Config` without storing one.
    _marker: std::marker::PhantomData<T>,
}
impl<T: Config> Default for UnstableBackendBuilder<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Config> UnstableBackendBuilder<T> {
    /// Create a new [`UnstableBackendBuilder`].
    pub fn new() -> Self {
        Self {
            // usize::MAX means blocks are never unpinned due to age by default.
            max_block_life: usize::MAX,
            _marker: std::marker::PhantomData,
        }
    }
    /// The age of a block is defined here as the difference between the current finalized block number
    /// and the block number of a given block. Once the difference equals or exceeds the number given
    /// here, the block is unpinned.
    ///
    /// By default, we will never automatically unpin blocks, but if the number of pinned blocks that we
    /// keep hold of exceeds the number that the server can tolerate, then a `stop` event is generated and
    /// we are forced to resubscribe, losing any pinned blocks.
    pub fn max_block_life(mut self, max_block_life: usize) -> Self {
        self.max_block_life = max_block_life;
        self
    }
    /// Given an [`RpcClient`] to use to make requests, this returns a tuple of an [`UnstableBackend`],
    /// which implements the [`Backend`] trait, and an [`UnstableBackendDriver`] which must be polled in
    /// order for the backend to make progress.
    pub fn build(self, client: RpcClient) -> (UnstableBackend<T>, UnstableBackendDriver<T>) {
        // Construct the underlying follow_stream layers:
        // raw follow events -> pin-managed events -> multi-subscriber driver.
        let rpc_methods = UnstableRpcMethods::new(client);
        let follow_stream =
            follow_stream::FollowStream::<T::Hash>::from_methods(rpc_methods.clone());
        let follow_stream_unpin = follow_stream_unpin::FollowStreamUnpin::<T::Hash>::from_methods(
            follow_stream,
            rpc_methods.clone(),
            self.max_block_life,
        );
        let follow_stream_driver = FollowStreamDriver::new(follow_stream_unpin);
        // Wrap these into the backend and driver that we'll expose.
        let backend = UnstableBackend {
            methods: rpc_methods,
            follow_handle: follow_stream_driver.handle(),
        };
        let driver = UnstableBackendDriver {
            driver: follow_stream_driver,
        };
        (backend, driver)
    }
}
/// Driver for the [`UnstableBackend`]. This must be polled in order for the
/// backend to make progress.
#[derive(Debug)]
pub struct UnstableBackendDriver<T: Config> {
    // The underlying FollowStreamDriver that this polls on behalf of the backend.
    driver: FollowStreamDriver<T::Hash>,
}
impl<T: Config> Stream for UnstableBackendDriver<T> {
    type Item = <FollowStreamDriver<T::Hash> as Stream>::Item;
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Simply delegate to the inner driver stream.
        self.driver.poll_next_unpin(cx)
    }
}
/// The unstable backend. Construct one via [`UnstableBackend::builder`], and
/// remember to poll the accompanying [`UnstableBackendDriver`].
#[derive(Debug, Clone)]
pub struct UnstableBackend<T: Config> {
    // RPC methods we'll want to call:
    methods: UnstableRpcMethods<T>,
    // A handle to the chainHead_follow subscription:
    follow_handle: FollowStreamDriverHandle<T::Hash>,
}
impl<T: Config> UnstableBackend<T> {
    /// Configure and construct an [`UnstableBackend`] and the associated [`UnstableBackendDriver`].
    pub fn builder() -> UnstableBackendBuilder<T> {
        UnstableBackendBuilder::new()
    }
    /// Stream block headers based on the provided filter fn.
    ///
    /// `f` maps each follow event to the block refs whose headers should be
    /// fetched; the resulting stream yields each fetched header together with
    /// the pinned block ref it belongs to.
    async fn stream_headers<F, I>(
        &self,
        f: F,
    ) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error>
    where
        F: Fn(FollowEvent<follow_stream_unpin::BlockRef<T::Hash>>) -> I + Copy + Send + 'static,
        I: IntoIterator<Item = follow_stream_unpin::BlockRef<T::Hash>> + Send + 'static,
        <I as IntoIterator>::IntoIter: Send,
    {
        let sub_id = get_subscription_id(&self.follow_handle).await?;
        let sub_id = Arc::new(sub_id);
        let methods = self.methods.clone();
        // For each follow event, fetch the header of every block ref `f` hands back.
        let headers = self.follow_handle.subscribe().events().flat_map(move |ev| {
            let sub_id = sub_id.clone();
            let methods = methods.clone();
            let block_refs = f(ev).into_iter();
            futures::stream::iter(block_refs).filter_map(move |block_ref| {
                let sub_id = sub_id.clone();
                let methods = methods.clone();
                async move {
                    // `transpose()?` skips blocks for which no header was returned.
                    let res = methods
                        .chainhead_unstable_header(&sub_id, block_ref.hash())
                        .await
                        .transpose()?;
                    let header = match res {
                        Ok(header) => header,
                        Err(e) => return Some(Err(e)),
                    };
                    Some(Ok((header, block_ref.into())))
                }
            })
        });
        Ok(StreamOf(Box::pin(headers)))
    }
}
// Allow the unpin-aware BlockRef to be used as a generic backend block ref.
impl<Hash: BlockHash + 'static> BlockRefT for follow_stream_unpin::BlockRef<Hash> {}
impl<Hash: BlockHash + 'static> From<follow_stream_unpin::BlockRef<Hash>> for BlockRef<Hash> {
    fn from(b: follow_stream_unpin::BlockRef<Hash>) -> Self {
        // Store the unpin-aware ref inside the generic one so the block stays
        // pinned for as long as the generic ref is alive.
        BlockRef::new(b.hash(), b)
    }
}
// Only this crate may implement `Backend` for this type.
impl<T: Config> super::sealed::Sealed for UnstableBackend<T> {}
#[async_trait]
impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
async fn storage_fetch_values(
    &self,
    keys: Vec<Vec<u8>>,
    at: T::Hash,
) -> Result<StreamOfResults<StorageResponse>, Error> {
    // One `Value` query per key, all submitted as a single storage request.
    let queries = keys.iter().map(|key| StorageQuery {
        key: &**key,
        query_type: StorageQueryType::Value,
    });
    let storage_items =
        StorageItems::from_methods(queries, at, &self.follow_handle, self.methods.clone())
            .await?;
    // Keep only plain value results, mapping each into a StorageResponse;
    // errors are passed straight through.
    let storage_result_stream = storage_items.filter_map(|val| async move {
        let val = match val {
            Ok(val) => val,
            Err(e) => return Some(Err(e)),
        };
        let StorageResultType::Value(result) = val.result else {
            return None;
        };
        Some(Ok(StorageResponse {
            key: val.key.0,
            value: result.0,
        }))
    });
    Ok(StreamOf(Box::pin(storage_result_stream)))
}
async fn storage_fetch_descendant_keys(
&self,
key: Vec<u8>,
at: T::Hash,
) -> Result<StreamOfResults<Vec<u8>>, Error> {
// Ask for hashes, and then just ignore them and return the keys that come back.
let query = StorageQuery {
key: &*key,
query_type: StorageQueryType::DescendantsHashes,
};
let storage_items = StorageItems::from_methods(
std::iter::once(query),
at,
&self.follow_handle,
self.methods.clone(),
)
.await?;
let storage_result_stream = storage_items.map(|val| val.map(|v| v.key.0));
Ok(StreamOf(Box::pin(storage_result_stream)))
}
async fn storage_fetch_descendant_values(
&self,
key: Vec<u8>,
at: T::Hash,
) -> Result<StreamOfResults<StorageResponse>, Error> {
let query = StorageQuery {
key: &*key,
query_type: StorageQueryType::DescendantsValues,
};
let storage_items = StorageItems::from_methods(
std::iter::once(query),
at,
&self.follow_handle,
self.methods.clone(),
)
.await?;
let storage_result_stream = storage_items.filter_map(|val| async move {
let val = match val {
Ok(val) => val,
Err(e) => return Some(Err(e)),
};
let StorageResultType::Value(result) = val.result else {
return None;
};
Some(Ok(StorageResponse {
key: val.key.0,
value: result.0,
}))
});
Ok(StreamOf(Box::pin(storage_result_stream)))
}
async fn genesis_hash(&self) -> Result<T::Hash, Error> {
self.methods.chainspec_v1_genesis_hash().await
}
async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
let sub_id = get_subscription_id(&self.follow_handle).await?;
self.methods.chainhead_unstable_header(&sub_id, at).await
}
async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error> {
let sub_id = get_subscription_id(&self.follow_handle).await?;
// Subscribe to the body response and get our operationId back.
let follow_events = self.follow_handle.subscribe().events();
let status = self.methods.chainhead_unstable_body(&sub_id, at).await?;
let operation_id = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
}
MethodResponse::Started(s) => s.operation_id,
};
// Wait for the response to come back with the correct operationId.
let mut exts_stream = follow_events.filter_map(|ev| {
let FollowEvent::OperationBodyDone(body) = ev else {
return std::future::ready(None);
};
if body.operation_id != operation_id {
return std::future::ready(None);
}
let exts: Vec<_> = body.value.into_iter().map(|ext| ext.0).collect();
std::future::ready(Some(exts))
});
Ok(exts_stream.next().await)
}
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<T::Hash>, Error> {
let next_ref: Option<BlockRef<T::Hash>> = self
.follow_handle
.subscribe()
.events()
.filter_map(|ev| {
let out = match ev {
FollowEvent::Initialized(init) => Some(init.finalized_block_hash.into()),
_ => None,
};
std::future::ready(out)
})
.next()
.await;
next_ref.ok_or_else(|| RpcError::SubscriptionDropped.into())
}
async fn current_runtime_version(&self) -> Result<RuntimeVersion, Error> {
// Just start a stream of version infos, and return the first value we get from it.
let runtime_version = self.stream_runtime_version().await?.next().await;
match runtime_version {
None => Err(Error::Rpc(RpcError::SubscriptionDropped)),
Some(Err(e)) => Err(e),
Some(Ok(version)) => Ok(version),
}
}
async fn stream_runtime_version(&self) -> Result<StreamOfResults<RuntimeVersion>, Error> {
// Keep track of runtime details announced in new blocks, and then when blocks
// are finalized, find the latest of these that has runtime details, and clear the rest.
let mut runtimes = HashMap::new();
let runtime_stream = self
.follow_handle
.subscribe()
.events()
.filter_map(move |ev| {
let output = match ev {
FollowEvent::Initialized(ev) => {
runtimes.clear();
ev.finalized_block_runtime
}
FollowEvent::NewBlock(ev) => {
if let Some(runtime) = ev.new_runtime {
runtimes.insert(ev.block_hash.hash(), runtime);
}
None
}
FollowEvent::Finalized(ev) => {
let next_runtime = ev
.finalized_block_hashes
.iter()
.rev()
.filter_map(|h| runtimes.get(&h.hash()).cloned())
.next();
runtimes.clear();
next_runtime
}
_ => None,
};
let runtime_event = match output {
None => return std::future::ready(None),
Some(ev) => ev,
};
let runtime_details = match runtime_event {
RuntimeEvent::Invalid(err) => {
return std::future::ready(Some(Err(Error::Other(err.error))))
}
RuntimeEvent::Valid(ev) => ev,
};
std::future::ready(Some(Ok(RuntimeVersion {
spec_version: runtime_details.spec.spec_version,
transaction_version: runtime_details.spec.transaction_version,
})))
});
Ok(StreamOf(Box::pin(runtime_stream)))
}
async fn stream_all_block_headers(
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash),
FollowEvent::NewBlock(ev) => Some(ev.block_hash),
_ => None,
})
.await
}
async fn stream_best_block_headers(
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash),
FollowEvent::BestBlockChanged(ev) => Some(ev.best_block_hash),
_ => None,
})
.await
}
async fn stream_finalized_block_headers(
&self,
) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
self.stream_headers(|ev| match ev {
FollowEvent::Initialized(ev) => {
vec![ev.finalized_block_hash]
}
FollowEvent::Finalized(ev) => ev.finalized_block_hashes,
_ => vec![],
})
.await
}
async fn submit_transaction(
&self,
extrinsic: &[u8],
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
// First, subscribe to all new block hashes
let mut new_blocks = self.follow_handle.subscribe().events().filter_map(|ev| {
std::future::ready(match ev {
FollowEvent::NewBlock(ev) => Some(ev.block_hash),
_ => None,
})
});
// Then, submit the transaction.
let mut tx_progress = self
.methods
.transaction_unstable_submit_and_watch(extrinsic)
.await?;
let mut seen_blocks = HashMap::new();
let mut done = false;
// If we see the finalized event, we start waiting until we find a block that
// matches, so we can guarantee to return a pinned block hash.
let mut finalized_hash: Option<T::Hash> = None;
// Now we can attempt to associate tx events with pinned blocks.
let tx_stream = futures::stream::poll_fn(move |cx| {
loop {
// Bail early if no more tx events; we don't want to keep polling for pinned blocks.
if done {
return Poll::Ready(None);
}
// Save any pinned blocks. Keep doing this until no more, so that we always have the most uptodate
// pinned blocks when we are looking at our tx events.
if let Poll::Ready(Some(block_ref)) = new_blocks.poll_next_unpin(cx) {
seen_blocks.insert(block_ref.hash(), block_ref);
continue;
}
// If we have a finalized hash, we are done looking for tx events and we are just waiting
// for a pinned block with a matching hash (which must appear eventually given it's finalized).
if let Some(hash) = &finalized_hash {
if let Some(block_ref) = seen_blocks.remove(hash) {
// Found it! Hand back the event with a pinned block. We're done.
done = true;
let ev = TransactionStatus::InFinalizedBlock {
hash: block_ref.into(),
};
return Poll::Ready(Some(Ok(ev)));
} else {
// Keep waiting for more new blocks until we find it (get rid of any other block refs
// now, since none of them were what we were looking for anyway).
seen_blocks.clear();
continue;
}
}
// Otherwise, we are still watching for tx events:
let ev = match tx_progress.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
done = true;
return Poll::Ready(None);
}
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(ev))) => ev,
};
// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
let ev = match ev {
rpc_methods::TransactionStatus::Finalized { block } => {
// We'll wait until we have seen this hash, to try to guarantee
// that when we return this event, the corresponding block is
// pinned and accessible.
finalized_hash = Some(block.hash);
continue;
}
rpc_methods::TransactionStatus::BestChainBlockIncluded {
block: Some(block),
} => {
// Look up a pinned block ref if we can, else return a non-pinned
// block that likely isn't accessible. We have no guarantee that a best
// block on the node a tx was sent to will ever be known about on the
// chainHead_follow subscription.
let block_ref = match seen_blocks.get(&block.hash).cloned() {
Some(block_ref) => block_ref.into(),
None => BlockRef::from_hash(block.hash),
};
TransactionStatus::InBestBlock { hash: block_ref }
}
rpc_methods::TransactionStatus::BestChainBlockIncluded { block: None } => {
TransactionStatus::NoLongerInBestBlock
}
rpc_methods::TransactionStatus::Broadcasted { num_peers } => {
TransactionStatus::Broadcasted { num_peers }
}
rpc_methods::TransactionStatus::Dropped { error, .. } => {
TransactionStatus::Dropped { message: error }
}
rpc_methods::TransactionStatus::Error { error } => {
TransactionStatus::Dropped { message: error }
}
rpc_methods::TransactionStatus::Invalid { error } => {
TransactionStatus::Invalid { message: error }
}
rpc_methods::TransactionStatus::Validated => TransactionStatus::Validated,
};
return Poll::Ready(Some(Ok(ev)));
}
});
Ok(StreamOf(Box::pin(tx_stream)))
}
async fn call(
&self,
method: &str,
call_parameters: Option<&[u8]>,
at: T::Hash,
) -> Result<Vec<u8>, Error> {
let sub_id = get_subscription_id(&self.follow_handle).await?;
// Subscribe to the body response and get our operationId back.
let follow_events = self.follow_handle.subscribe().events();
let call_parameters = call_parameters.unwrap_or(&[]);
let status = self
.methods
.chainhead_unstable_call(&sub_id, at, method, call_parameters)
.await?;
let operation_id = match status {
MethodResponse::LimitReached => {
return Err(RpcError::request_rejected("limit reached").into())
}
MethodResponse::Started(s) => s.operation_id,
};
// Wait for the response to come back with the correct operationId.
let mut call_data_stream = follow_events.filter_map(|ev| {
let FollowEvent::OperationCallDone(body) = ev else {
return std::future::ready(None);
};
if body.operation_id != operation_id {
return std::future::ready(None);
}
std::future::ready(Some(body.output.0))
});
call_data_stream
.next()
.await
.ok_or_else(|| RpcError::SubscriptionDropped.into())
}
}
/// A helper to obtain a subscription ID.
async fn get_subscription_id<Hash: BlockHash>(
follow_handle: &FollowStreamDriverHandle<Hash>,
) -> Result<String, Error> {
let Some(sub_id) = follow_handle.subscribe().subscription_id().await else {
return Err(RpcError::SubscriptionDropped.into());
};
Ok(sub_id)
}
+55 -16
View File
@@ -7,10 +7,11 @@
//! methods exposed here.
use crate::backend::rpc::{rpc_params, RpcClient, RpcSubscription};
use crate::config::BlockHash;
use crate::{Config, Error};
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::task::Poll;
/// An interface to call the unstable RPC methods. This interface is instantiated with
@@ -64,8 +65,8 @@ impl<T: Config> UnstableRpcMethods<T> {
pub async fn chainhead_unstable_follow(
&self,
with_runtime: bool,
) -> Result<RpcSubscription<FollowEvent<T::Hash>>, Error> {
let subscription = self
) -> Result<FollowSubscription<T::Hash>, Error> {
let sub = self
.client
.subscribe(
"chainHead_unstable_follow",
@@ -74,7 +75,7 @@ impl<T: Config> UnstableRpcMethods<T> {
)
.await?;
Ok(subscription)
Ok(FollowSubscription { sub, done: false })
}
/// Resumes a storage fetch started with chainHead_unstable_storage after it has generated an
@@ -500,7 +501,7 @@ pub struct OperationBodyDone {
/// The operation id of the event.
pub operation_id: String,
/// Array of hexadecimal-encoded scale-encoded extrinsics found in the block.
pub value: Vec<String>,
pub value: Vec<Bytes>,
}
/// The response of the `chainHead_call` method.
@@ -510,7 +511,7 @@ pub struct OperationCallDone {
/// The operation id of the event.
pub operation_id: String,
/// Hexadecimal-encoded output of the runtime function call.
pub output: String,
pub output: Bytes,
}
/// The response of the `chainHead_call` method.
@@ -520,7 +521,7 @@ pub struct OperationStorageItems {
/// The operation id of the event.
pub operation_id: String,
/// The resulting items.
pub items: Vec<StorageResult>,
pub items: VecDeque<StorageResult>,
}
/// Indicate a problem during the operation.
@@ -538,7 +539,7 @@ pub struct OperationError {
#[serde(rename_all = "camelCase")]
pub struct StorageResult {
/// The hex-encoded key of the result.
pub key: String,
pub key: Bytes,
/// The result of the query.
#[serde(flatten)]
pub result: StorageResultType,
@@ -549,11 +550,11 @@ pub struct StorageResult {
#[serde(rename_all = "camelCase")]
pub enum StorageResultType {
/// Fetch the value of the provided key.
Value(String),
Value(Bytes),
/// Fetch the hash of the value of the provided key.
Hash(String),
Hash(Bytes),
/// Fetch the closest descendant merkle value.
ClosestDescendantMerkleValue(String),
ClosestDescendantMerkleValue(Bytes),
}
/// The method respose of `chainHead_body`, `chainHead_call` and `chainHead_storage`.
@@ -604,6 +605,44 @@ pub enum StorageQueryType {
DescendantsHashes,
}
/// A subscription which returns follow events, and ends when a Stop event occurs.
pub struct FollowSubscription<Hash> {
sub: RpcSubscription<FollowEvent<Hash>>,
done: bool,
}
impl<Hash: BlockHash> FollowSubscription<Hash> {
/// Fetch the next item in the stream.
pub async fn next(&mut self) -> Option<<Self as Stream>::Item> {
<Self as StreamExt>::next(self).await
}
/// Fetch the subscription ID for the stream.
pub fn subscription_id(&self) -> Option<&str> {
self.sub.subscription_id()
}
}
impl<Hash: BlockHash> Stream for FollowSubscription<Hash> {
type Item = <RpcSubscription<FollowEvent<Hash>> as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}
let res = self.sub.poll_next_unpin(cx);
if let Poll::Ready(Some(Ok(FollowEvent::Stop))) = &res {
// No more events will occur after this one.
self.done = true
}
res
}
}
/// A subscription which returns transaction status events, stopping
/// when no more events will be sent.
pub struct TransactionSubscription<Hash> {
@@ -611,14 +650,14 @@ pub struct TransactionSubscription<Hash> {
done: bool,
}
impl<Hash: serde::de::DeserializeOwned> TransactionSubscription<Hash> {
impl<Hash: BlockHash> TransactionSubscription<Hash> {
/// Fetch the next item in the stream.
pub async fn next(&mut self) -> Option<<Self as Stream>::Item> {
StreamExt::next(self).await
<Self as StreamExt>::next(self).await
}
}
impl<Hash: serde::de::DeserializeOwned> Stream for TransactionSubscription<Hash> {
impl<Hash: BlockHash> Stream for TransactionSubscription<Hash> {
type Item = <RpcSubscription<TransactionStatus<Hash>> as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
@@ -694,10 +733,10 @@ pub enum TransactionStatus<Hash> {
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
pub struct TransactionBlockDetails<Hash> {
/// The block hash.
hash: Hash,
pub hash: Hash,
/// The index of the transaction in the block.
#[serde(with = "unsigned_number_as_string")]
index: u64,
pub index: u64,
}
/// Hex-serialized shim for `Vec<u8>`.
+160
View File
@@ -0,0 +1,160 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::follow_stream_driver::FollowStreamDriverHandle;
use super::follow_stream_unpin::BlockRef;
use super::rpc_methods::{
FollowEvent, MethodResponse, StorageQuery, StorageResult, UnstableRpcMethods,
};
use crate::config::Config;
use crate::error::{Error, RpcError};
use futures::{FutureExt, Stream, StreamExt};
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
/// Obtain a stream of storage items given some query. This handles continuing
/// and stopping under the hood, and returns a stream of `StorageResult`s.
pub struct StorageItems<T: Config> {
    // Set once the operation has finished or errored; the stream then ends.
    done: bool,
    // The operation ID handed back by the node; used to match follow events to us.
    operation_id: Arc<str>,
    // Items received from the node but not yet handed to the consumer.
    buffered_responses: VecDeque<StorageResult>,
    // Produces a fresh future which asks the node to continue the operation.
    continue_call: ContinueFutGetter,
    // The in-flight "continue" call, if any.
    continue_fut: Option<ContinueFut>,
    // Follow events for the subscription the operation was started on.
    follow_event_stream: FollowEventStream<T::Hash>,
}
impl<T: Config> StorageItems<T> {
    /// Subscribe to follow events, and return a stream of storage results
    /// given some storage queries. The stream will automatically resume as
    /// needed, and stop when done.
    pub async fn from_methods(
        queries: impl Iterator<Item = StorageQuery<&[u8]>>,
        at: T::Hash,
        follow_handle: &FollowStreamDriverHandle<T::Hash>,
        methods: UnstableRpcMethods<T>,
    ) -> Result<Self, Error> {
        let sub_id = super::get_subscription_id(follow_handle).await?;

        // Subscribe to events and make the initial request to get an operation ID.
        // NOTE: we subscribe *before* issuing the storage request, so that no
        // events for the resulting operation ID can be missed.
        let follow_events = follow_handle.subscribe().events();
        let status = methods
            .chainhead_unstable_storage(&sub_id, at, queries, None)
            .await?;
        let operation_id: Arc<str> = match status {
            // The node refused to start the operation (too many in flight).
            MethodResponse::LimitReached => {
                return Err(RpcError::request_rejected("limit reached").into())
            }
            MethodResponse::Started(s) => s.operation_id.into(),
        };

        // A function which returns the call to continue the subscription:
        let continue_call: ContinueFutGetter = {
            let operation_id = operation_id.clone();
            Box::new(move || {
                // Clone what the async move block below needs to own.
                let sub_id = sub_id.clone();
                let operation_id = operation_id.clone();
                let methods = methods.clone();
                Box::pin(async move {
                    methods
                        .chainhead_unstable_continue(&sub_id, &operation_id)
                        .await
                })
            })
        };

        Ok(StorageItems::new(
            operation_id,
            continue_call,
            Box::pin(follow_events),
        ))
    }

    // Assemble a `StorageItems` from its parts; see `from_methods` above.
    fn new(
        operation_id: Arc<str>,
        continue_call: ContinueFutGetter,
        follow_event_stream: FollowEventStream<T::Hash>,
    ) -> Self {
        Self {
            done: false,
            buffered_responses: VecDeque::new(),
            operation_id,
            continue_call,
            continue_fut: None,
            follow_event_stream,
        }
    }
}
/// The stream of follow events that a storage operation listens on.
pub type FollowEventStream<Hash> =
    Pin<Box<dyn Stream<Item = FollowEvent<BlockRef<Hash>>> + Send + 'static>>;
/// A factory producing a fresh "continue" call future on demand.
pub type ContinueFutGetter = Box<dyn Fn() -> ContinueFut + Send + 'static>;
/// An in-flight `chainHead_continue` call.
pub type ContinueFut = Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'static>>;
impl<T: Config> Stream for StorageItems<T> {
    type Item = Result<StorageResult, Error>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Once done (finished or errored), the stream yields nothing more.
            if self.done {
                return Poll::Ready(None);
            }

            // Hand out any items we've already received, one at a time.
            if let Some(item) = self.buffered_responses.pop_front() {
                return Poll::Ready(Some(Ok(item)));
            }

            // If a "continue" call is in flight, drive it to completion before
            // looking at any more follow events.
            if let Some(mut fut) = self.continue_fut.take() {
                match fut.poll_unpin(cx) {
                    Poll::Pending => {
                        // Not ready yet; put the future back and wait to be woken.
                        self.continue_fut = Some(fut);
                        return Poll::Pending;
                    }
                    Poll::Ready(Err(e)) => {
                        // The continue call failed; emit the error and end the stream.
                        self.done = true;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Ready(Ok(())) => {
                        // Finished; carry on.
                    }
                }
            }

            let ev = match self.follow_event_stream.poll_next_unpin(cx) {
                Poll::Pending => return Poll::Pending,
                // The follow subscription ended, so no more items can arrive.
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(ev)) => ev,
            };

            // Only react to events carrying our operation ID; everything else
            // belongs to some other concurrent operation.
            match ev {
                FollowEvent::OperationWaitingForContinue(id)
                    if id.operation_id == *self.operation_id =>
                {
                    // Start a call to ask for more events
                    self.continue_fut = Some((self.continue_call)());
                    continue;
                }
                FollowEvent::OperationStorageDone(id) if id.operation_id == *self.operation_id => {
                    // We're finished!
                    self.done = true;
                    return Poll::Ready(None);
                }
                FollowEvent::OperationStorageItems(items)
                    if items.operation_id == *self.operation_id =>
                {
                    // We have items; buffer them to emit next loops.
                    self.buffered_responses = items.items;
                    continue;
                }
                _ => {
                    // We don't care about this event; wait for the next.
                    continue;
                }
            }
        }
    }
}
+2 -2
View File
@@ -69,10 +69,10 @@ where
) -> impl Future<Output = Result<Block<T, Client>, Error>> + Send + 'static {
let client = self.client.clone();
async move {
// If a block ref isn't provided, we'll get the latest best block to use.
// If a block ref isn't provided, we'll get the latest finalized ref to use.
let block_ref = match block_ref {
Some(r) => r,
None => client.backend().latest_best_block_ref().await?,
None => client.backend().latest_finalized_block_ref().await?,
};
let block_header = match client.backend().block_header(block_ref.hash()).await? {
@@ -3,7 +3,7 @@
// see LICENSE for license details.
use super::{rpc::LightClientRpc, LightClient, LightClientError};
use crate::backend::rpc::RpcClient;
use crate::backend::{rpc::RpcClient, Backend};
use crate::{config::Config, error::Error, OnlineClient};
use std::num::NonZeroU32;
use subxt_lightclient::{AddChainConfig, AddChainConfigJsonRpc, ChainId};
+2 -2
View File
@@ -12,7 +12,7 @@ mod offline_client;
mod online_client;
#[cfg(feature = "unstable-light-client")]
mod lightclient;
mod light_client;
pub use offline_client::{OfflineClient, OfflineClientT};
pub use online_client::{
@@ -20,4 +20,4 @@ pub use online_client::{
};
#[cfg(feature = "unstable-light-client")]
pub use lightclient::{LightClient, LightClientBuilder, LightClientError};
pub use light_client::{LightClient, LightClientBuilder, LightClientError};
+2 -2
View File
@@ -106,7 +106,7 @@ impl<T: Config> OnlineClient<T> {
/// Construct a new [`OnlineClient`] by providing an underlying [`Backend`]
/// implementation to power it. Other details will be obtained from the chain.
pub async fn from_backend<B: Backend<T>>(backend: Arc<B>) -> Result<OnlineClient<T>, Error> {
let latest_block = backend.latest_best_block_ref().await?;
let latest_block = backend.latest_finalized_block_ref().await?;
let (genesis_hash, runtime_version, metadata) = future::join3(
backend.genesis_hash(),
@@ -437,7 +437,7 @@ impl<T: Config> RuntimeUpdaterStream<T> {
Err(err) => return Some(Err(err)),
};
let latest_block_ref = match self.client.backend().latest_best_block_ref().await {
let latest_block_ref = match self.client.backend().latest_finalized_block_ref().await {
Ok(block_ref) => block_ref,
Err(e) => return Some(Err(e)),
};
+33 -10
View File
@@ -32,16 +32,7 @@ pub use substrate::{SubstrateConfig, SubstrateExtrinsicParams, SubstrateExtrinsi
// rather than having to `unsafe impl` them ourselves.
pub trait Config: Sized + Send + Sync + 'static {
/// The output of the `Hasher` function.
type Hash: Debug
+ Copy
+ Send
+ Sync
+ Decode
+ AsRef<[u8]>
+ Serialize
+ DeserializeOwned
+ Encode
+ PartialEq;
type Hash: BlockHash;
/// The account ID type.
type AccountId: Debug + Clone + Encode;
@@ -65,6 +56,38 @@ pub trait Config: Sized + Send + Sync + 'static {
/// given some [`Config`], this return the other params needed for its `ExtrinsicParams`.
pub type OtherParamsFor<T> = <<T as Config>::ExtrinsicParams as ExtrinsicParams<T>>::OtherParams;
/// Block hashes must conform to a bunch of things to be used in Subxt.
pub trait BlockHash:
Debug
+ Copy
+ Send
+ Sync
+ Decode
+ AsRef<[u8]>
+ Serialize
+ DeserializeOwned
+ Encode
+ PartialEq
+ Eq
+ std::hash::Hash
{
}
impl<T> BlockHash for T where
T: Debug
+ Copy
+ Send
+ Sync
+ Decode
+ AsRef<[u8]>
+ Serialize
+ DeserializeOwned
+ Encode
+ PartialEq
+ Eq
+ std::hash::Hash
{
}
/// This represents the hasher used by a node to hash things like block headers
/// and extrinsics.
pub trait Hasher {
+11
View File
@@ -108,11 +108,22 @@ pub enum RpcError {
/// Error related to the RPC client.
#[error("RPC error: {0}")]
ClientError(Box<dyn std::error::Error + Send + Sync + 'static>),
/// This error signals that the request was rejected for some reason.
/// The specific reason is provided.
#[error("RPC error: request rejected")]
RequestRejected(String),
/// The RPC subscription dropped.
#[error("RPC error: subscription dropped.")]
SubscriptionDropped,
}
impl RpcError {
/// Create a `RequestRejected` error from anything that can be turned into a string.
pub fn request_rejected<S: Into<String>>(s: S) -> RpcError {
RpcError::RequestRejected(s.into())
}
}
/// Block error
#[derive(Clone, Debug, Eq, thiserror::Error, PartialEq)]
#[non_exhaustive]
+2 -2
View File
@@ -58,10 +58,10 @@ where
// return a Future that's Send + 'static, rather than tied to &self.
let client = self.client.clone();
async move {
// If a block ref isn't provided, we'll get the latest best block to use.
// If a block ref isn't provided, we'll get the latest finalized block to use.
let block_ref = match block_ref {
Some(r) => r,
None => client.backend().latest_best_block_ref().await?,
None => client.backend().latest_finalized_block_ref().await?,
};
let event_bytes = get_event_bytes(client.backend(), block_ref.hash()).await?;
+2 -2
View File
@@ -44,8 +44,8 @@ where
// return a Future that's Send + 'static, rather than tied to &self.
let client = self.client.clone();
async move {
// get the ref for the latest block and use that.
let block_ref = client.backend().latest_best_block_ref().await?;
// get the ref for the latest finalized block and use that.
let block_ref = client.backend().latest_finalized_block_ref().await?;
Ok(RuntimeApi::new(client, block_ref))
}
+2 -2
View File
@@ -85,8 +85,8 @@ where
// return a Future that's Send + 'static, rather than tied to &self.
let client = self.client.clone();
async move {
// get the ref for the latest block and use that.
let block_ref = client.backend().latest_best_block_ref().await?;
// get the ref for the latest finalized block and use that.
let block_ref = client.backend().latest_finalized_block_ref().await?;
Ok(Storage::new(client, block_ref))
}
+4 -3
View File
@@ -169,7 +169,7 @@ where
{
/// Get the account nonce for a given account ID.
pub async fn account_nonce(&self, account_id: &T::AccountId) -> Result<u64, Error> {
let block_ref = self.client.backend().latest_best_block_ref().await?;
let block_ref = self.client.backend().latest_finalized_block_ref().await?;
let account_nonce_bytes = self
.client
.backend()
@@ -486,6 +486,7 @@ where
TransactionStatus::Validated
| TransactionStatus::Broadcasted { .. }
| TransactionStatus::InBestBlock { .. }
| TransactionStatus::NoLongerInBestBlock
| TransactionStatus::InFinalizedBlock { .. } => Ok(ext_hash),
TransactionStatus::Error { message } => {
Err(Error::Other(format!("Transaction error: {message}")))
@@ -509,7 +510,7 @@ where
///
/// Returns `Ok` with a [`ValidationResult`], which is the result of attempting to dry run the extrinsic.
pub async fn validate(&self) -> Result<ValidationResult, Error> {
let latest_block_ref = self.client.backend().latest_best_block_ref().await?;
let latest_block_ref = self.client.backend().latest_finalized_block_ref().await?;
self.validate_at(latest_block_ref).await
}
@@ -547,7 +548,7 @@ where
pub async fn partial_fee_estimate(&self) -> Result<u128, Error> {
let mut params = self.encoded().to_vec();
(self.encoded().len() as u32).encode_to(&mut params);
let latest_block_ref = self.client.backend().latest_best_block_ref().await?;
let latest_block_ref = self.client.backend().latest_finalized_block_ref().await?;
// destructuring RuntimeDispatchInfo, see type information <https://paritytech.github.io/substrate/master/pallet_transaction_payment_rpc_runtime_api/struct.RuntimeDispatchInfo.html>
// data layout: {weight_ref_time: Compact<u64>, weight_proof_size: Compact<u64>, class: u8, partial_fee: u128}
+10 -7
View File
@@ -8,7 +8,7 @@ use std::task::Poll;
use crate::utils::strip_compact_prefix;
use crate::{
backend::{StreamOfResults, TransactionStatus as BackendTxStatus},
backend::{BlockRef, StreamOfResults, TransactionStatus as BackendTxStatus},
client::OnlineClientT,
error::{DispatchError, Error, RpcError, TransactionError},
events::EventsClient,
@@ -167,6 +167,7 @@ impl<T: Config, C: Clone> Stream for TxProgress<T, C> {
match status {
BackendTxStatus::Validated => TxStatus::Validated,
BackendTxStatus::Broadcasted { num_peers } => TxStatus::Broadcasted { num_peers },
BackendTxStatus::NoLongerInBestBlock => TxStatus::NoLongerInBestBlock,
BackendTxStatus::InBestBlock { hash } => {
TxStatus::InBestBlock(TxInBlock::new(hash, self.ext_hash, self.client.clone()))
}
@@ -207,6 +208,8 @@ pub enum TxStatus<T: Config, C> {
/// Number of peers it's been broadcast to.
num_peers: u32,
},
/// Transaction is no longer in a best block.
NoLongerInBestBlock,
/// Transaction has been included in block with given hash.
InBestBlock(TxInBlock<T, C>),
/// Transaction has been finalized by a finality-gadget, e.g GRANDPA
@@ -252,15 +255,15 @@ impl<T: Config, C> TxStatus<T, C> {
#[derive(Derivative)]
#[derivative(Debug(bound = "C: std::fmt::Debug"))]
pub struct TxInBlock<T: Config, C> {
block_hash: T::Hash,
block_ref: BlockRef<T::Hash>,
ext_hash: T::Hash,
client: C,
}
impl<T: Config, C> TxInBlock<T, C> {
pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self {
pub(crate) fn new(block_ref: BlockRef<T::Hash>, ext_hash: T::Hash, client: C) -> Self {
Self {
block_hash,
block_ref,
ext_hash,
client,
}
@@ -268,7 +271,7 @@ impl<T: Config, C> TxInBlock<T, C> {
/// Return the hash of the block that the transaction has made it into.
pub fn block_hash(&self) -> T::Hash {
self.block_hash
self.block_ref.hash()
}
/// Return the hash of the extrinsic that was submitted.
@@ -317,7 +320,7 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
let block_body = self
.client
.backend()
.block_body(self.block_hash)
.block_body(self.block_ref.hash())
.await?
.ok_or(Error::Transaction(TransactionError::BlockNotFound))?;
@@ -336,7 +339,7 @@ impl<T: Config, C: OnlineClientT<T>> TxInBlock<T, C> {
.ok_or(Error::Transaction(TransactionError::BlockNotFound))?;
let events = EventsClient::new(self.client.clone())
.at(self.block_hash)
.at(self.block_ref.clone())
.await?;
Ok(crate::blocks::ExtrinsicEvents::new(
+4
View File
@@ -18,6 +18,10 @@ default = []
# Enable to run the tests with Light Client support.
unstable-light-client = ["subxt/unstable-light-client"]
# Enable this to use the unstable backend in tests _instead of_
# the default one which relies on the "old" RPC methods.
unstable-backend-client = []
[dev-dependencies]
assert_matches = { workspace = true }
codec = { package = "parity-scale-codec", workspace = true, features = ["derive", "bit-vec"] }
@@ -5,30 +5,73 @@
use crate::{test_context, utils::node_runtime};
use codec::{Compact, Encode};
use futures::StreamExt;
use subxt::blocks::BlocksClient;
use subxt_metadata::Metadata;
use subxt_signer::sr25519::dev;
// Check that we can subscribe to non-finalized blocks.
#[tokio::test]
async fn non_finalized_headers_subscription() -> Result<(), subxt::Error> {
async fn block_subscriptions_are_consistent_with_eachother() -> Result<(), subxt::Error> {
let ctx = test_context().await;
let api = ctx.client();
let mut sub = api.blocks().subscribe_best().await?;
let mut all_sub = api.blocks().subscribe_all().await?;
let mut best_sub = api.blocks().subscribe_best().await?;
let mut finalized_sub = api.blocks().subscribe_finalized().await?;
// Wait for the next set of headers, and check that the
// associated block hash is the one we just finalized.
// (this can be a bit slow as we have to wait for finalization)
let header = sub.next().await.unwrap()?;
let block_hash = header.hash();
let current_block_hash = api.backend().latest_best_block_ref().await?.hash();
let mut finals = vec![];
let mut bests = vec![];
let mut alls = vec![];
// Finalization can run behind a bit; blocks that were reported a while ago can
// only just now be being finalized (in the new RPCs this isn't true and we'll be
// told about all of those blocks up front). So, first we wait until finalization reports
// a block that we've seen as new.
loop {
tokio::select! {biased;
Some(Ok(b)) = all_sub.next() => alls.push(b.hash()),
Some(Ok(b)) = best_sub.next() => bests.push(b.hash()),
Some(Ok(b)) = finalized_sub.next() => if alls.contains(&b.hash()) { break },
}
}
// Now, gather a couple more finalized blocks as well as anything else we hear about.
while finals.len() < 2 {
tokio::select! {biased;
Some(Ok(b)) = all_sub.next() => alls.push(b.hash()),
Some(Ok(b)) = best_sub.next() => bests.push(b.hash()),
Some(Ok(b)) = finalized_sub.next() => finals.push(b.hash()),
}
}
// Check that every item in `a_items` appears in `b_items` in the same
// relative order — i.e. `a_items` is a subsequence of `b_items`. Each
// element of `b_items` can satisfy at most one element of `a_items`, so
// duplicates in `a_items` require matching duplicates in `b_items`.
fn are_same_order_in<T: PartialEq>(a_items: &[T], b_items: &[T]) -> bool {
    // Index into `b_items` past which we're still allowed to match.
    let mut b_idx = 0;
    for a in a_items {
        // Only search forward from the last match so ordering is enforced.
        match b_items[b_idx..].iter().position(|b| a == b) {
            // Advance *past* the matched element so it can't be reused
            // for the next `a` (the previous `b_idx += idx` allowed that).
            Some(idx) => b_idx += idx + 1,
            None => return false,
        }
    }
    true
}
// Final blocks and best blocks should both be subsets of _all_ of the blocks reported.
assert!(
are_same_order_in(&bests, &alls),
"Best set {bests:?} should be a subset of all: {alls:?}"
);
assert!(
are_same_order_in(&finals, &alls),
"Final set {finals:?} should be a subset of all: {alls:?}"
);
assert_eq!(block_hash, current_block_hash);
Ok(())
}
// Check that we can subscribe to finalized blocks.
#[tokio::test]
async fn finalized_headers_subscription() -> Result<(), subxt::Error> {
let ctx = test_context().await;
@@ -36,13 +79,13 @@ async fn finalized_headers_subscription() -> Result<(), subxt::Error> {
let mut sub = api.blocks().subscribe_finalized().await?;
// Wait for the next set of headers, and check that the
// associated block hash is the one we just finalized.
// (this can be a bit slow as we have to wait for finalization)
let header = sub.next().await.unwrap()?;
let finalized_hash = api.backend().latest_finalized_block_ref().await?.hash();
// check that the finalized block reported lines up with the `latest_finalized_block_ref`.
for _ in 0..2 {
let header = sub.next().await.unwrap()?;
let finalized_hash = api.backend().latest_finalized_block_ref().await?.hash();
assert_eq!(header.hash(), finalized_hash);
}
assert_eq!(header.hash(), finalized_hash);
Ok(())
}
@@ -117,17 +160,17 @@ async fn runtime_api_call() -> Result<(), subxt::Error> {
}
#[tokio::test]
async fn decode_extrinsics() {
async fn fetch_block_and_decode_extrinsic_details() {
let ctx = test_context().await;
let api = ctx.client();
let alice = dev::alice();
let bob = dev::bob();
// Generate a block that has unsigned and signed transactions.
// Setup; put an extrinsic into a block:
let tx = node_runtime::tx()
.balances()
.transfer(bob.public_key().into(), 10_000);
.transfer_allow_death(bob.public_key().into(), 10_000);
let signed_extrinsic = api
.tx()
@@ -143,19 +186,21 @@ async fn decode_extrinsics() {
.await
.unwrap();
// Now, separately, download that block. Let's see what it contains..
let block_hash = in_block.block_hash();
let block = BlocksClient::new(api).at(block_hash).await.unwrap();
let block = api.blocks().at(block_hash).await.unwrap();
let extrinsics = block.extrinsics().await.unwrap();
assert_eq!(extrinsics.len(), 2);
assert_eq!(extrinsics.block_hash(), block_hash);
// `.has` should work and find a transfer call.
assert!(extrinsics
.has::<node_runtime::balances::calls::types::Transfer>()
.has::<node_runtime::balances::calls::types::TransferAllowDeath>()
.unwrap());
// `.find_first` should similarly work to find the transfer call:
assert!(extrinsics
.find_first::<node_runtime::balances::calls::types::Transfer>()
.find_first::<node_runtime::balances::calls::types::TransferAllowDeath>()
.unwrap()
.is_some());
@@ -164,7 +209,7 @@ async fn decode_extrinsics() {
.map(|res| res.unwrap())
.collect::<Vec<_>>();
assert_eq!(block_extrinsics.len(), 2);
// All blocks contain a timestamp; check this first:
let timestamp = block_extrinsics.get(0).unwrap();
timestamp.as_root_extrinsic::<node_runtime::Call>().unwrap();
timestamp
@@ -172,9 +217,13 @@ async fn decode_extrinsics() {
.unwrap();
assert!(!timestamp.is_signed());
// Next we expect our transfer:
let tx = block_extrinsics.get(1).unwrap();
tx.as_root_extrinsic::<node_runtime::Call>().unwrap();
tx.as_extrinsic::<node_runtime::balances::calls::types::Transfer>()
let ext = tx
.as_extrinsic::<node_runtime::balances::calls::types::TransferAllowDeath>()
.unwrap()
.unwrap();
assert_eq!(ext.value, 10_000);
assert!(tx.is_signed());
}
@@ -72,7 +72,7 @@ async fn transaction_validation() {
let tx = node_runtime::tx()
.balances()
.transfer(bob.public_key().into(), 10_000);
.transfer_allow_death(bob.public_key().into(), 10_000);
let signed_extrinsic = api
.tx()
@@ -110,7 +110,7 @@ async fn validation_fails() {
// The actual TX is not important; the account has no funds to pay for it.
let tx = node_runtime::tx()
.balances()
.transfer(to.public_key().into(), 1);
.transfer_allow_death(to.public_key().into(), 1);
let signed_extrinsic = api
.tx()
@@ -232,7 +232,7 @@ async fn unsigned_extrinsic_is_same_shape_as_polkadotjs() {
let tx = node_runtime::tx()
.balances()
.transfer(dev::alice().public_key().into(), 12345000000000000);
.transfer_allow_death(dev::alice().public_key().into(), 12345000000000000);
let actual_tx = api.tx().create_unsigned(&tx).unwrap();
@@ -242,10 +242,10 @@ async fn unsigned_extrinsic_is_same_shape_as_polkadotjs() {
// - start local substrate node.
// - open polkadot.js UI in browser and point at local node.
// - open dev console (may need to refresh page now) and find the WS connection.
// - create a balances.transfer to ALICE with 12345 and "submit unsigned".
// - create a balances.transferAllowDeath to ALICE (doesn't matter who from) with 12345 and "submit unsigned".
// - find the submitAndWatchExtrinsic call in the WS connection to get these bytes:
let expected_tx_bytes = hex::decode(
"b004060700d43593c715fdd31c61141abd04a99fd6822c8558854ccde39a5684e7a56da27d0f0090c04bb6db2b"
"b004060000d43593c715fdd31c61141abd04a99fd6822c8558854ccde39a5684e7a56da27d0f0090c04bb6db2b"
)
.unwrap();
@@ -261,7 +261,7 @@ async fn extrinsic_hash_is_same_as_returned() {
let payload = node_runtime::tx()
.balances()
.transfer(dev::alice().public_key().into(), 12345000000000000);
.transfer_allow_death(dev::alice().public_key().into(), 12345000000000000);
let tx = api
.tx()
@@ -314,7 +314,7 @@ async fn partial_fee_estimate_correct() {
let bob = dev::bob();
let tx = node_runtime::tx()
.balances()
.transfer(bob.public_key().into(), 1_000_000_000_000);
.transfer_allow_death(bob.public_key().into(), 1_000_000_000_000);
let signed_extrinsic = api
.tx()
@@ -326,7 +326,7 @@ async fn partial_fee_estimate_correct() {
let partial_fee_1 = signed_extrinsic.partial_fee_estimate().await.unwrap();
// Method II: TransactionPaymentApi_query_fee_details + calculations
let latest_block_ref = api.backend().latest_best_block_ref().await.unwrap();
let latest_block_ref = api.backend().latest_finalized_block_ref().await.unwrap();
let len_bytes: [u8; 4] = (signed_extrinsic.encoded().len() as u32).to_le_bytes();
let encoded_with_len = [signed_extrinsic.encoded(), &len_bytes[..]].concat();
let InclusionFee {
@@ -8,8 +8,8 @@
use crate::{test_context, utils::node_runtime};
use assert_matches::assert_matches;
use codec::Encode;
use futures::Stream;
use subxt::{
backend::rpc::RpcSubscription,
backend::unstable::rpc_methods::{
FollowEvent, Initialized, MethodResponse, RuntimeEvent, RuntimeVersionEvent, StorageQuery,
StorageQueryType,
@@ -153,7 +153,7 @@ async fn chainhead_unstable_storage() {
event,
FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id &&
res.items.len() == 1 &&
res.items[0].key == format!("0x{}", hex::encode(addr_bytes))
res.items[0].key.0 == addr_bytes
);
let event = next_operation_event(&mut blocks).await;
@@ -218,8 +218,6 @@ async fn chainhead_unstable_unpin() {
assert!(rpc.chainhead_unstable_unpin(sub_id, hash).await.is_err());
}
// Ignored until this is implemented in Substrate
#[ignore]
#[tokio::test]
async fn chainspec_v1_genesishash() {
let ctx = test_context().await;
@@ -232,22 +230,18 @@ async fn chainspec_v1_genesishash() {
assert_eq!(a, b);
}
// Ignored until this is implemented in Substrate
#[ignore]
#[tokio::test]
async fn chainspec_v1_chainname() {
    let ctx = test_context().await;
    let old_rpc = ctx.legacy_rpc_methods().await;
    let rpc = ctx.unstable_rpc_methods().await;

    // The new `chainSpec_v1_chainName` method should agree with the
    // legacy `system_chain` RPC method.
    // (The previous dead `let a = old_rpc.system_name()…` binding was
    // immediately shadowed and only performed a redundant RPC call.)
    let a = old_rpc.system_chain().await.unwrap();
    let b = rpc.chainspec_v1_chain_name().await.unwrap();
    assert_eq!(a, b);
}
// Ignored until this is implemented in Substrate
#[ignore]
#[tokio::test]
async fn chainspec_v1_properties() {
let ctx = test_context().await;
@@ -290,9 +284,14 @@ async fn transaction_unstable_submit_and_watch() {
}
/// Ignore block related events and obtain the next event related to an operation.
async fn next_operation_event<T: serde::de::DeserializeOwned>(
sub: &mut RpcSubscription<FollowEvent<T>>,
async fn next_operation_event<
T: serde::de::DeserializeOwned,
S: Unpin + Stream<Item = Result<FollowEvent<T>, subxt::Error>>,
>(
sub: &mut S,
) -> FollowEvent<T> {
use futures::StreamExt;
// Number of events to wait for the next operation event.
const NUM_EVENTS: usize = 10;
@@ -41,7 +41,9 @@ async fn tx_basic_transfer() -> Result<(), subxt::Error> {
.fetch_or_default(&bob_account_addr)
.await?;
let tx = node_runtime::tx().balances().transfer(bob_address, 10_000);
let tx = node_runtime::tx()
.balances()
.transfer_allow_death(bob_address, 10_000);
let events = api
.tx()
@@ -118,7 +120,7 @@ async fn tx_dynamic_transfer() -> Result<(), subxt::Error> {
let tx = subxt::dynamic::tx(
"Balances",
"transfer",
"transfer_allow_death",
vec![
Value::unnamed_variant(
"Id",
@@ -206,14 +208,14 @@ async fn tx_dynamic_transfer() -> Result<(), subxt::Error> {
}
#[tokio::test]
async fn multiple_transfers_work_nonce_incremented() -> Result<(), subxt::Error> {
async fn multiple_sequential_transfers_work() -> Result<(), subxt::Error> {
let alice = dev::alice();
let bob = dev::bob();
let bob_address: MultiAddress<AccountId32, u32> = bob.public_key().into();
let ctx = test_context().await;
let api = ctx.client();
let bob_account_addr = node_runtime::storage()
let bob_account_info_addr = node_runtime::storage()
.system()
.account(bob.public_key().to_account_id());
@@ -221,19 +223,19 @@ async fn multiple_transfers_work_nonce_incremented() -> Result<(), subxt::Error>
.storage()
.at_latest()
.await?
.fetch_or_default(&bob_account_addr)
.fetch_or_default(&bob_account_info_addr)
.await?;
// Do a transfer several times. If this works, it indicates that the
// nonce is properly incremented each time.
let tx = node_runtime::tx()
.balances()
.transfer(bob_address.clone(), 10_000);
.transfer_allow_death(bob_address.clone(), 10_000);
for _ in 0..3 {
api.tx()
.sign_and_submit_then_watch_default(&tx, &alice)
.await?
.wait_for_in_block() // Don't need to wait for finalization; this is quicker.
.await?
.wait_for_success()
.wait_for_finalized_success()
.await?;
}
@@ -241,7 +243,7 @@ async fn multiple_transfers_work_nonce_incremented() -> Result<(), subxt::Error>
.storage()
.at_latest()
.await?
.fetch_or_default(&bob_account_addr)
.fetch_or_default(&bob_account_info_addr)
.await?;
assert_eq!(bob_pre.data.free + 30_000, bob_post.data.free);
@@ -317,10 +319,10 @@ async fn transfer_error() {
let to_bob_tx = node_runtime::tx()
.balances()
.transfer(bob_address, 100_000_000_000_000_000);
.transfer_allow_death(bob_address, 100_000_000_000_000_000);
let to_alice_tx = node_runtime::tx()
.balances()
.transfer(alice_addr, 100_000_000_000_000_000);
.transfer_allow_death(alice_addr, 100_000_000_000_000_000);
api.tx()
.sign_and_submit_then_watch_default(&to_bob_tx, &alice)
@@ -360,7 +362,7 @@ async fn transfer_implicit_subscription() {
let to_bob_tx = node_runtime::tx()
.balances()
.transfer(bob.clone().into(), 10_000);
.transfer_allow_death(bob.clone().into(), 10_000);
let event = api
.tx()
@@ -203,7 +203,6 @@ async fn tx_call() {
let info_addr = node_runtime::storage()
.contracts()
.contract_info_of(&contract);
let info_addr_iter = node_runtime::storage().contracts().contract_info_of_iter();
let contract_info = cxt
.client()
@@ -213,7 +212,13 @@ async fn tx_call() {
.unwrap()
.fetch(&info_addr)
.await;
assert!(contract_info.is_ok());
assert!(
contract_info.is_ok(),
"Contract info is not ok, is: {contract_info:?}"
);
let info_addr_iter = node_runtime::storage().contracts().contract_info_of_iter();
let keys_and_values = cxt
.client()
@@ -23,7 +23,7 @@ async fn test_sudo() -> Result<(), subxt::Error> {
let alice = dev::alice();
let bob = dev::bob().public_key().into();
let call = Call::Balances(BalancesCall::transfer {
let call = Call::Balances(BalancesCall::transfer_allow_death {
dest: bob,
value: 10_000,
});
@@ -49,7 +49,7 @@ async fn test_sudo_unchecked_weight() -> Result<(), subxt::Error> {
let alice = dev::alice();
let bob = dev::bob().public_key().into();
let call = Call::Balances(BalancesCall::transfer {
let call = Call::Balances(BalancesCall::transfer_allow_death {
dest: bob,
value: 10_000,
});
@@ -59,7 +59,9 @@ async fn unchecked_extrinsic_encoding() -> Result<(), subxt::Error> {
let bob_address = bob.public_key().to_address();
// Construct a tx from Alice to Bob.
let tx = node_runtime::tx().balances().transfer(bob_address, 10_000);
let tx = node_runtime::tx()
.balances()
.transfer_allow_death(bob_address, 10_000);
let signed_extrinsic = api
.tx()
+5
View File
@@ -2,6 +2,11 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
#[cfg(all(feature = "unstable-light-client", feature = "unstable-backend-client"))]
compile_error!(
"The features 'unstable-light-client' and 'unstable-backend-client' cannot be used together"
);
#[cfg(test)]
pub mod utils;
@@ -40,11 +40,6 @@ type Client = LightClient<PolkadotConfig>;
// Check that we can subscribe to non-finalized blocks.
async fn non_finalized_headers_subscription(api: &Client) -> Result<(), subxt::Error> {
let mut sub = api.blocks().subscribe_best().await?;
let header = sub.next().await.unwrap()?;
let block_hash = header.hash();
let current_block_hash = api.backend().latest_best_block_ref().await.unwrap().hash();
assert_eq!(block_hash, current_block_hash);
let _block = sub.next().await.unwrap()?;
let _block = sub.next().await.unwrap()?;
@@ -3,6 +3,7 @@
// see LICENSE for license details.
use std::ffi::{OsStr, OsString};
use std::sync::Arc;
use substrate_runner::SubstrateNode;
use subxt::{
backend::{legacy, rpc, unstable},
@@ -118,9 +119,14 @@ impl TestNodeProcessBuilder {
#[cfg(feature = "unstable-light-client")]
let client = build_light_client(&proc).await;
// Connect to the node with a subxt client:
#[cfg(not(feature = "unstable-light-client"))]
let client = OnlineClient::from_url(ws_url.clone()).await;
#[cfg(feature = "unstable-backend-client")]
let client = build_unstable_client(&proc).await;
#[cfg(all(
not(feature = "unstable-light-client"),
not(feature = "unstable-backend-client")
))]
let client = build_legacy_client(&proc).await;
match client {
Ok(client) => Ok(TestNodeProcess { proc, client }),
@@ -129,13 +135,59 @@ impl TestNodeProcessBuilder {
}
}
#[cfg(all(
    not(feature = "unstable-light-client"),
    not(feature = "unstable-backend-client")
))]
/// Build an `OnlineClient` backed by the legacy RPC backend, connecting
/// to the given node over its websocket port.
async fn build_legacy_client<T: Config>(proc: &SubstrateNode) -> Result<OnlineClient<T>, String> {
    let url = format!("ws://127.0.0.1:{}", proc.ws_port());
    // Open a raw RPC connection to the node first.
    let rpc = rpc::RpcClient::from_url(url)
        .await
        .map_err(|e| format!("Cannot construct RPC client: {e}"))?;
    // Wrap it in the legacy backend and hand that to the client.
    OnlineClient::from_backend(Arc::new(legacy::LegacyBackend::new(rpc)))
        .await
        .map_err(|e| format!("Cannot construct OnlineClient from backend: {e}"))
}
#[cfg(feature = "unstable-backend-client")]
/// Build an `OnlineClient` backed by the new chainHead-based ("unstable")
/// backend, spawning a background task to drive the backend's event loop.
async fn build_unstable_client<T: Config>(proc: &SubstrateNode) -> Result<OnlineClient<T>, String> {
    let url = format!("ws://127.0.0.1:{}", proc.ws_port());
    let rpc_client = rpc::RpcClient::from_url(url)
        .await
        .map_err(|e| format!("Cannot construct RPC client: {e}"))?;

    let (backend, mut driver) = unstable::UnstableBackend::builder().build(rpc_client);

    // The unstable backend needs driving:
    tokio::spawn(async move {
        use futures::StreamExt;
        loop {
            match driver.next().await {
                // The driver stream ended; nothing left to do.
                None => break,
                // Stop driving on the first error we see.
                Some(Err(e)) => {
                    eprintln!("Error driving unstable backend: {e}");
                    break;
                }
                // Progress was made; keep polling.
                Some(Ok(_)) => {}
            }
        }
    });

    OnlineClient::from_backend(Arc::new(backend))
        .await
        .map_err(|e| format!("Cannot construct OnlineClient from backend: {e}"))
}
#[cfg(feature = "unstable-light-client")]
async fn build_light_client<R: Config>(proc: &SubstrateNode) -> Result<LightClient<R>, String> {
async fn build_light_client<T: Config>(proc: &SubstrateNode) -> Result<LightClient<T>, String> {
// RPC endpoint.
let ws_url = format!("ws://127.0.0.1:{}", proc.ws_port());
// Step 1. Wait for a few blocks to be produced using the subxt client.
let client = OnlineClient::<R>::from_url(ws_url.clone())
let client = OnlineClient::<T>::from_url(ws_url.clone())
.await
.map_err(|err| format!("Failed to connect to node rpc at {ws_url}: {err}"))?;
@@ -4,23 +4,13 @@
use subxt::{client::OnlineClientT, Config};
/// Wait for blocks to be produced before running tests. Waiting for two blocks
/// (the genesis block and another one) seems to be enough to allow tests
/// like `validation_passes` to work properly.
///
/// If the "unstable-light-client" feature flag is enabled, this will wait for
/// 5 blocks instead of two. The light client needs the extra blocks to avoid
/// errors caused by loading information that is not available in the first 2 blocks
/// (`Failed to load the block weight for block`).
/// Wait for blocks to be produced before running tests. Specifically, we
/// wait for one more finalized block to be produced, which is important because
/// the first finalized block doesn't have much state etc associated with it.
pub async fn wait_for_blocks<C: Config>(api: &impl OnlineClientT<C>) {
let mut sub = api.backend().stream_all_block_headers().await.unwrap();
let mut sub = api.blocks().subscribe_finalized().await.unwrap();
// The current finalized block:
sub.next().await;
// The next one:
sub.next().await;
#[cfg(feature = "unstable-light-client")]
{
sub.next().await;
sub.next().await;
sub.next().await;
}
}
+1 -1
View File
@@ -63,7 +63,7 @@ async fn run() {
// Save metadata to a file:
let out_dir = env::var_os("OUT_DIR").unwrap();
let metadata_path = Path::new(&out_dir).join("metadata.scale");
let metadata_path = Path::new(&out_dir).join("test_node_runtime_metadata.scale");
fs::write(&metadata_path, metadata_bytes).expect("Couldn't write metadata output");
// Write out our expression to generate the runtime API to a file. Ideally, we'd just write this code
+4 -1
View File
@@ -5,6 +5,9 @@
#![allow(clippy::too_many_arguments)]
/// The SCALE encoded metadata obtained from a local run of a substrate node.
pub static METADATA: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/metadata.scale"));
pub static METADATA: &[u8] = include_bytes!(concat!(
env!("OUT_DIR"),
"/test_node_runtime_metadata.scale"
));
include!(concat!(env!("OUT_DIR"), "/runtime.rs"));
@@ -103,33 +103,33 @@ fn main() {
// We assume Polkadot's config of MultiAddress<AccountId32, ()> here
let _ = node_runtime::tx()
.balances()
.transfer(CustomAddress(1337), 123);
.transfer_allow_death(CustomAddress(1337), 123);
let _ = node_runtime2::tx()
.balances()
.transfer(Generic(AccountId32::from([0x01;32])), 123);
.transfer_allow_death(Generic(AccountId32::from([0x01;32])), 123);
let _ = node_runtime3::tx()
.balances()
.transfer(Generic(()), 123);
.transfer_allow_death(Generic(()), 123);
let _ = node_runtime4::tx()
.balances()
.transfer(Second((), AccountId32::from([0x01;32])), 123);
.transfer_allow_death(Second((), AccountId32::from([0x01;32])), 123);
let _ = node_runtime5::tx()
.balances()
.transfer(Second(true, vec![1u8, 2u8]), 123);
.transfer_allow_death(Second(true, vec![1u8, 2u8]), 123);
let _ = node_runtime6::tx()
.balances()
.transfer(Second((), 1234u16), 123);
.transfer_allow_death(Second((), 1234u16), 123);
let _ = node_runtime7::tx()
.balances()
.transfer(subxt::utils::Static(DoesntImplEncodeDecodeAsType(1337)), 123);
.transfer_allow_death(subxt::utils::Static(DoesntImplEncodeDecodeAsType(1337)), 123);
let _ = node_runtime8::tx()
.balances()
.transfer(subxt::utils::Static(Second(AccountId32::from([0x01;32]), ())), 123);
.transfer_allow_death(subxt::utils::Static(Second(AccountId32::from([0x01;32]), ())), 123);
}