diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index ccf90cb64e..2e82af2664 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -2,14 +2,12 @@ name: Daily compatibility check against latest substrate on: schedule: - # Run at 8am every day + # Run at 8am every day, well after the new binary is built - cron: "0 8 * * *" env: CARGO_TERM_COLOR: always - # Use latest substrate for nightly runs: - SUBSTRATE_URL: https://releases.parity.io/substrate/x86_64-debian:bullseye/latest/substrate/substrate jobs: tests: @@ -19,13 +17,19 @@ jobs: - name: Checkout sources uses: actions/checkout@v4 - - name: Download Substrate + - name: Download substrate-node binary + id: download-artifact + uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0 + with: + workflow: build-substrate.yml + name: nightly-substrate-binary + + - name: Prepare substrate-node binary run: | - curl $SUBSTRATE_URL --output substrate-node --location - chmod +x substrate-node - ./substrate-node --version - mkdir -p ~/.local/bin - mv substrate-node ~/.local/bin + chmod u+x ./substrate-node + ./substrate-node --version + mkdir -p ~/.local/bin + cp ./substrate-node ~/.local/bin - name: Install Rust stable toolchain uses: actions-rs/toolchain@v1 @@ -43,8 +47,7 @@ jobs: command: test args: --all-targets --workspace - # If the previous step fails, create a new Github issue - # to nofity us about it. + # If any previous step fails, create a new Github issue to notify us about it. - if: ${{ failure() }} uses: JasonEtco/create-an-issue@e27dddc79c92bc6e4562f268fffa5ed752639abd # v2.9.1 env: diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index d55efb2b65..6cf003e7e0 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -18,8 +18,6 @@ concurrency: env: CARGO_TERM_COLOR: always - # TODO: Currently pointing at latest substrate; is there a suitable binary we can pin to here? - SUBSTRATE_URL: https://releases.parity.io/substrate/x86_64-debian:bullseye/latest/substrate/substrate # Increase wasm test timeout from 20 seconds (default) to 1 minute. WASM_BINDGEN_TEST_TIMEOUT: 60 @@ -31,13 +29,19 @@ jobs: - name: Checkout sources uses: actions/checkout@v4 - - name: Download Substrate + - name: Download substrate-node binary + id: download-artifact + uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0 + with: + workflow: build-substrate.yml + name: nightly-substrate-binary + + - name: Prepare substrate-node binary run: | - curl $SUBSTRATE_URL --output substrate-node --location - chmod +x substrate-node - ./substrate-node --version - mkdir -p ~/.local/bin - mv substrate-node ~/.local/bin + chmod u+x ./substrate-node + ./substrate-node --version + mkdir -p ~/.local/bin + cp ./substrate-node ~/.local/bin - name: Install Rust stable toolchain uses: actions-rs/toolchain@v1 @@ -137,13 +141,19 @@ jobs: - name: Checkout sources uses: actions/checkout@v4 - - name: Download Substrate + - name: Download substrate-node binary + id: download-artifact + uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0 + with: + workflow: build-substrate.yml + name: nightly-substrate-binary + + - name: Prepare substrate-node binary run: | - curl $SUBSTRATE_URL --output substrate-node --location - chmod +x substrate-node - ./substrate-node --version - mkdir -p ~/.local/bin - mv substrate-node ~/.local/bin + chmod u+x ./substrate-node + ./substrate-node --version + mkdir -p ~/.local/bin + cp ./substrate-node ~/.local/bin - name: Install Rust stable toolchain uses: actions-rs/toolchain@v1 @@ -165,19 +175,25 @@ jobs: args: --doc tests: - name: "Test non-wasm" + name: "Test (Native)" runs-on: ubuntu-latest-16-cores steps: - name: Checkout sources uses: actions/checkout@v4 - - name: Download Substrate + - name: Download substrate-node binary + id: download-artifact + uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0 + with: + workflow: build-substrate.yml + name: nightly-substrate-binary + + - name: Prepare substrate-node binary run: | - curl $SUBSTRATE_URL --output substrate-node --location - chmod +x substrate-node - ./substrate-node --version - mkdir -p ~/.local/bin - mv substrate-node ~/.local/bin + chmod u+x ./substrate-node + ./substrate-node --version + mkdir -p ~/.local/bin + cp ./substrate-node ~/.local/bin - name: Install Rust stable toolchain uses: actions-rs/toolchain@v1 @@ -198,21 +214,67 @@ jobs: command: nextest args: run --workspace + unstable_backend_tests: + name: "Test (Unstable Backend)" + runs-on: ubuntu-latest-16-cores + steps: + - name: Checkout sources + uses: actions/checkout@v3 + + - name: Download substrate-node binary + id: download-artifact + uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0 + with: + workflow: build-substrate.yml + name: nightly-substrate-binary + + - name: Prepare substrate-node binary + run: | + chmod u+x ./substrate-node + ./substrate-node --version + mkdir -p ~/.local/bin + cp ./substrate-node ~/.local/bin + + - name: Install Rust stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + - name: Rust Cache + uses: Swatinem/rust-cache@e207df5d269b42b69c8bc5101da26f7d31feddb4 # v2.6.2 + + - name: Install cargo-nextest + run: cargo install cargo-nextest + + - name: Run tests + uses: actions-rs/cargo@v1.0.3 + with: + command: nextest + args: run --workspace --features unstable-backend-client + light_client_tests: - name: "Test Light Client" + name: "Test (Light Client)" runs-on: ubuntu-latest-16-cores timeout-minutes: 25 steps: - name: Checkout sources uses: actions/checkout@v4 - - name: Download Substrate + - name: Download substrate-node binary + id: download-artifact + uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0 + with: + workflow: build-substrate.yml + name: nightly-substrate-binary + + - name: Prepare substrate-node binary run: | - curl $SUBSTRATE_URL --output substrate-node --location - chmod +x substrate-node - ./substrate-node --version - mkdir -p ~/.local/bin - mv substrate-node ~/.local/bin + chmod u+x ./substrate-node + ./substrate-node --version + mkdir -p ~/.local/bin + cp ./substrate-node ~/.local/bin - name: Install Rust stable toolchain uses: actions-rs/toolchain@v1 @@ -230,40 +292,8 @@ jobs: command: test args: --release --package integration-tests --features unstable-light-client - clippy: - name: Cargo clippy - runs-on: ubuntu-latest - steps: - - name: Checkout sources - uses: actions/checkout@v4 - - - name: Download Substrate - run: | - curl $SUBSTRATE_URL --output substrate-node --location - chmod +x substrate-node - ./substrate-node --version - mkdir -p ~/.local/bin - mv substrate-node ~/.local/bin - - - name: Install Rust stable toolchain - uses: actions-rs/toolchain@v1 - with: - profile: minimal - toolchain: stable - components: clippy - override: true - - - name: Rust Cache - uses: Swatinem/rust-cache@a95ba195448af2da9b00fb742d14ffaaf3c21f43 # v2.7.0 - - - name: Run clippy - uses: actions-rs/cargo@v1 - with: - command: clippy - args: --all-targets -- -D warnings - wasm_tests: - name: Test wasm + name: Test (WASM) runs-on: ubuntu-latest steps: @@ -281,13 +311,19 @@ jobs: - name: Rust Cache uses: Swatinem/rust-cache@a95ba195448af2da9b00fb742d14ffaaf3c21f43 # v2.7.0 - - name: Download Substrate + - name: Download substrate-node binary + id: download-artifact + uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0 + with: + workflow: build-substrate.yml + name: nightly-substrate-binary + + - name: Prepare substrate-node binary run: | - curl $SUBSTRATE_URL --output substrate-node --location - chmod +x substrate-node - ./substrate-node --version - mkdir -p ~/.local/bin - mv substrate-node ~/.local/bin + chmod u+x ./substrate-node + ./substrate-node --version + mkdir -p ~/.local/bin + cp ./substrate-node ~/.local/bin - name: Run subxt WASM tests run: | @@ -314,3 +350,41 @@ jobs: wasm-pack test --headless --firefox wasm-pack test --headless --chrome working-directory: signer/wasm-tests + + clippy: + name: Cargo clippy + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Download substrate-node binary + id: download-artifact + uses: dawidd6/action-download-artifact@268677152d06ba59fcec7a7f0b5d961b6ccd7e1e # v2.28.0 + with: + workflow: build-substrate.yml + name: nightly-substrate-binary + + - name: Prepare substrate-node binary + run: | + chmod u+x ./substrate-node + ./substrate-node --version + mkdir -p ~/.local/bin + cp ./substrate-node ~/.local/bin + + - name: Install Rust stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + components: clippy + override: true + + - name: Rust Cache + uses: Swatinem/rust-cache@a95ba195448af2da9b00fb742d14ffaaf3c21f43 # v2.7.0 + + - name: Run clippy + uses: actions-rs/cargo@v1 + with: + command: clippy + args: --all-targets -- -D warnings diff --git a/metadata/src/utils/validation.rs b/metadata/src/utils/validation.rs index 1ce551e026..6656c6dbfc 100644 --- a/metadata/src/utils/validation.rs +++ b/metadata/src/utils/validation.rs @@ -427,7 +427,6 @@ pub fn get_custom_value_hash( pub fn get_storage_hash(pallet: &PalletMetadata, entry_name: &str) -> Option<[u8; HASH_LEN]> { let storage = pallet.storage()?; let entry = storage.entry_by_name(entry_name)?; - let hash = get_storage_entry_hash(pallet.types, entry, &mut HashMap::new()); Some(hash) } diff --git a/subxt/examples/tx_basic.rs b/subxt/examples/tx_basic.rs index 53818b5ece..b1ffd30331 100644 --- a/subxt/examples/tx_basic.rs +++ b/subxt/examples/tx_basic.rs @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box> { // Build a balance transfer extrinsic. let dest = dev::bob().public_key().into(); - let balance_transfer_tx = polkadot::tx().balances().transfer(dest, 10_000); + let balance_transfer_tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000); // Submit the balance transfer extrinsic from Alice, and wait for it to be successful // and in a finalized block. We get back the extrinsic events if all is well. diff --git a/subxt/examples/tx_status_stream.rs b/subxt/examples/tx_status_stream.rs index 47fd58321e..134b739dbe 100644 --- a/subxt/examples/tx_status_stream.rs +++ b/subxt/examples/tx_status_stream.rs @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box> { // Build a balance transfer extrinsic. let dest = dev::bob().public_key().into(); - let balance_transfer_tx = polkadot::tx().balances().transfer(dest, 10_000); + let balance_transfer_tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000); // Submit the balance transfer extrinsic from Alice, and then monitor the // progress of it. diff --git a/subxt/examples/tx_with_params.rs b/subxt/examples/tx_with_params.rs index 90a02430b0..810597a5d1 100644 --- a/subxt/examples/tx_with_params.rs +++ b/subxt/examples/tx_with_params.rs @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box> { // Build a balance transfer extrinsic. let dest = dev::bob().public_key().into(); - let tx = polkadot::tx().balances().transfer(dest, 10_000); + let tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000); let latest_block = api.blocks().at_latest().await?; diff --git a/subxt/examples/unstable_light_client_tx_basic.rs b/subxt/examples/unstable_light_client_tx_basic.rs index 750a7c2c7a..7e0e9ad1bb 100644 --- a/subxt/examples/unstable_light_client_tx_basic.rs +++ b/subxt/examples/unstable_light_client_tx_basic.rs @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box> { // Build a balance transfer extrinsic. let dest = dev::bob().public_key().into(); - let balance_transfer_tx = polkadot::tx().balances().transfer(dest, 10_000); + let balance_transfer_tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000); // Submit the balance transfer extrinsic from Alice, and wait for it to be successful // and in a finalized block. We get back the extrinsic events if all is well. diff --git a/subxt/src/backend/legacy/mod.rs b/subxt/src/backend/legacy/mod.rs index acf903ca78..98f0f20559 100644 --- a/subxt/src/backend/legacy/mod.rs +++ b/subxt/src/backend/legacy/mod.rs @@ -19,6 +19,7 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; +// Expose the RPC methods. pub use rpc_methods::LegacyRpcMethods; /// The legacy backend. @@ -125,15 +126,6 @@ impl Backend for LegacyBackend { Ok(BlockRef::from_hash(hash)) } - async fn latest_best_block_ref(&self) -> Result, Error> { - let hash = self - .methods - .chain_get_block_hash(None) - .await? - .ok_or_else(|| Error::Other("Latest best block doesn't exist".into()))?; - Ok(BlockRef::from_hash(hash)) - } - async fn current_runtime_version(&self) -> Result { let details = self.methods.state_get_runtime_version(None).await?; Ok(RuntimeVersion { @@ -231,7 +223,9 @@ impl Backend for LegacyBackend { }) } RpcTransactionStatus::InBlock(hash) => { - Some(TransactionStatus::InBestBlock { hash }) + Some(TransactionStatus::InBestBlock { + hash: BlockRef::from_hash(hash), + }) } // These 5 mean that the stream will very likely end: RpcTransactionStatus::FinalityTimeout(_) => { @@ -240,7 +234,9 @@ impl Backend for LegacyBackend { }) } RpcTransactionStatus::Finalized(hash) => { - Some(TransactionStatus::InFinalizedBlock { hash }) + Some(TransactionStatus::InFinalizedBlock { + hash: BlockRef::from_hash(hash), + }) } RpcTransactionStatus::Usurped(_) => Some(TransactionStatus::Invalid { message: "Transaction was usurped by another with the same nonce" diff --git a/subxt/src/backend/mod.rs b/subxt/src/backend/mod.rs index a8d52cfa72..3fb9782c53 100644 --- a/subxt/src/backend/mod.rs +++ b/subxt/src/backend/mod.rs @@ -65,10 +65,6 @@ pub trait Backend: sealed::Sealed + Send + Sync + 'static { /// Note: needed only in blocks client for finalized block stream; can prolly be removed. async fn latest_finalized_block_ref(&self) -> Result, Error>; - /// Get the most recent best block hash. - /// Note: needed only in blocks client for finalized block stream; can prolly be removed. - async fn latest_best_block_ref(&self) -> Result, Error>; - /// Get information about the current runtime. async fn current_runtime_version(&self) -> Result; @@ -314,15 +310,17 @@ pub enum TransactionStatus { /// Number of peers it's been broadcast to. num_peers: u32, }, + /// Transaciton is no longer in a best block. + NoLongerInBestBlock, /// Transaction has been included in block with given hash. InBestBlock { /// Block hash the transaction is in. - hash: Hash, + hash: BlockRef, }, /// Transaction has been finalized by a finality-gadget, e.g GRANDPA InFinalizedBlock { /// Block hash the transaction is in. - hash: Hash, + hash: BlockRef, }, /// Something went wrong in the node. Error { diff --git a/subxt/src/backend/unstable/follow_stream.rs b/subxt/src/backend/unstable/follow_stream.rs new file mode 100644 index 0000000000..dbd2616328 --- /dev/null +++ b/subxt/src/backend/unstable/follow_stream.rs @@ -0,0 +1,314 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use super::rpc_methods::{FollowEvent, UnstableRpcMethods}; +use crate::config::Config; +use crate::error::Error; +use futures::{FutureExt, Stream, StreamExt}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A `Stream` whose goal is to remain subscribed to `chainHead_follow`. It will re-subscribe if the subscription +/// is ended for any reason, and it will return the current `subscription_id` as an event, along with the other +/// follow events. +pub struct FollowStream { + // Using this and not just keeping a copy of the RPC methods + // around means that we can test this in isolation with dummy streams. + stream_getter: FollowEventStreamGetter, + stream: InnerStreamState, +} + +impl std::fmt::Debug for FollowStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FollowStream") + .field("stream_getter", &"..") + .field("stream", &self.stream) + .finish() + } +} + +/// A getter function that returns an [`FollowEventStreamFut`]. +pub type FollowEventStreamGetter = Box FollowEventStreamFut + Send>; + +/// The future which will return a stream of follow events and the subscription ID for it. +pub type FollowEventStreamFut = Pin< + Box, String), Error>> + Send + 'static>, +>; + +/// The stream of follow events. +pub type FollowEventStream = + Pin, Error>> + Send + 'static>>; + +/// Either a ready message with the current subscription ID, or +/// an event from the stream itself. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FollowStreamMsg { + /// The stream is ready (and has a subscription ID) + Ready(String), + /// An event from the stream. + Event(FollowEvent), +} + +impl FollowStreamMsg { + /// Return an event, or none if the message is a "ready" one. + pub fn into_event(self) -> Option> { + match self { + FollowStreamMsg::Ready(_) => None, + FollowStreamMsg::Event(e) => Some(e), + } + } +} + +enum InnerStreamState { + /// We've just created the stream; we'll start Initializing it + New, + /// We're fetching the inner subscription. Move to Ready when we have one. + Initializing(FollowEventStreamFut), + /// Report back the subscription ID here, and then start ReceivingEvents. + Ready(Option<(FollowEventStream, String)>), + /// We are polling for, and receiving events from the stream. + ReceivingEvents(FollowEventStream), + /// We received a stop event. We'll send one on and restart the stream. + Stopped, + /// The stream is finished and will not restart (likely due to an error). + Finished, +} + +impl std::fmt::Debug for InnerStreamState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::New => write!(f, "New"), + Self::Initializing(_) => write!(f, "Initializing(..)"), + Self::Ready(_) => write!(f, "Ready(..)"), + Self::ReceivingEvents(_) => write!(f, "ReceivingEvents(..)"), + Self::Stopped => write!(f, "Stopped"), + Self::Finished => write!(f, "Finished"), + } + } +} + +impl FollowStream { + /// Create a new [`FollowStream`] given a function which returns the stream. + pub fn new(stream_getter: FollowEventStreamGetter) -> Self { + Self { + stream_getter, + stream: InnerStreamState::New, + } + } + + /// Create a new [`FollowStream`] given the RPC methods. + pub fn from_methods(methods: UnstableRpcMethods) -> FollowStream { + FollowStream { + stream_getter: Box::new(move || { + let methods = methods.clone(); + Box::pin(async move { + // Make the RPC call: + let stream = methods.chainhead_unstable_follow(true).await?; + // Extract the subscription ID: + let Some(sub_id) = stream.subscription_id().map(ToOwned::to_owned) else { + return Err(Error::Other( + "Subscription ID expected for chainHead_follow response, but not given" + .to_owned(), + )); + }; + // Return both: + let stream: FollowEventStream = Box::pin(stream); + Ok((stream, sub_id)) + }) + }), + stream: InnerStreamState::New, + } + } +} + +impl std::marker::Unpin for FollowStream {} + +impl Stream for FollowStream { + type Item = Result, Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + match &mut this.stream { + InnerStreamState::New => { + let fut = (this.stream_getter)(); + this.stream = InnerStreamState::Initializing(fut); + continue; + } + InnerStreamState::Initializing(fut) => { + match fut.poll_unpin(cx) { + Poll::Pending => { + return Poll::Pending; + } + Poll::Ready(Ok(sub_with_id)) => { + this.stream = InnerStreamState::Ready(Some(sub_with_id)); + continue; + } + Poll::Ready(Err(e)) => { + // Finish forever if there's an error, passing it on. + this.stream = InnerStreamState::Finished; + return Poll::Ready(Some(Err(e))); + } + } + } + InnerStreamState::Ready(stream) => { + // We never set the Option to `None`; we just have an Option so + // that we can take ownership of the contents easily here. + let (sub, sub_id) = stream.take().expect("should always be Some"); + this.stream = InnerStreamState::ReceivingEvents(sub); + return Poll::Ready(Some(Ok(FollowStreamMsg::Ready(sub_id)))); + } + InnerStreamState::ReceivingEvents(stream) => { + match stream.poll_next_unpin(cx) { + Poll::Pending => { + return Poll::Pending; + } + Poll::Ready(None) => { + // No error happened but the stream ended; restart and + // pass on a Stop message anyway. + this.stream = InnerStreamState::Stopped; + continue; + } + Poll::Ready(Some(Ok(ev))) => { + if let FollowEvent::Stop = ev { + // A stop event means the stream has ended, so start + // over after passing on the stop message. + this.stream = InnerStreamState::Stopped; + continue; + } + return Poll::Ready(Some(Ok(FollowStreamMsg::Event(ev)))); + } + Poll::Ready(Some(Err(e))) => { + // Finish forever if there's an error, passing it on. + this.stream = InnerStreamState::Finished; + return Poll::Ready(Some(Err(e))); + } + } + } + InnerStreamState::Stopped => { + this.stream = InnerStreamState::New; + return Poll::Ready(Some(Ok(FollowStreamMsg::Event(FollowEvent::Stop)))); + } + InnerStreamState::Finished => { + return Poll::Ready(None); + } + } + } + } +} + +#[cfg(test)] +pub(super) mod test_utils { + use super::*; + use crate::backend::unstable::rpc_methods::{ + BestBlockChanged, Finalized, Initialized, NewBlock, + }; + use crate::config::substrate::H256; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + /// Given some events, returns a follow stream getter that we can use in + /// place of the usual RPC method. + pub fn test_stream_getter(events: F) -> FollowEventStreamGetter + where + Hash: Send + 'static, + F: Fn() -> I + Send + 'static, + I: IntoIterator, Error>>, + { + let start_idx = Arc::new(AtomicUsize::new(0)); + + Box::new(move || { + // Start the events from where we left off last time. + let start_idx = start_idx.clone(); + let this_idx = start_idx.load(Ordering::Relaxed); + let events: Vec<_> = events().into_iter().skip(this_idx).collect(); + + Box::pin(async move { + // Increment start_idx for each event we see, so that if we get + // the stream again, we get only the remaining events for it. + let stream = futures::stream::iter(events).map(move |ev| { + start_idx.fetch_add(1, Ordering::Relaxed); + ev + }); + + let stream: FollowEventStream = Box::pin(stream); + Ok((stream, format!("sub_id_{this_idx}"))) + }) + }) + } + + /// An initialized event + pub fn ev_initialized(n: u64) -> FollowEvent { + FollowEvent::Initialized(Initialized { + finalized_block_hash: H256::from_low_u64_le(n), + finalized_block_runtime: None, + }) + } + + /// A new block event + pub fn ev_new_block(parent: u64, n: u64) -> FollowEvent { + FollowEvent::NewBlock(NewBlock { + parent_block_hash: H256::from_low_u64_le(parent), + block_hash: H256::from_low_u64_le(n), + new_runtime: None, + }) + } + + /// A best block event + pub fn ev_best_block(n: u64) -> FollowEvent { + FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: H256::from_low_u64_le(n), + }) + } + + /// A finalized event + pub fn ev_finalized(ns: impl IntoIterator) -> FollowEvent { + FollowEvent::Finalized(Finalized { + finalized_block_hashes: ns.into_iter().map(H256::from_low_u64_le).collect(), + pruned_block_hashes: vec![], + }) + } +} + +#[cfg(test)] +pub mod test { + use super::*; + use test_utils::{ev_initialized, ev_new_block, test_stream_getter}; + + #[tokio::test] + async fn follow_stream_provides_messages_until_error() { + // The events we'll get back on the stream. + let stream_getter = test_stream_getter(|| { + [ + Ok(ev_initialized(1)), + // Stop should lead to a drop and resubscribe: + Ok(FollowEvent::Stop), + Ok(FollowEvent::Stop), + Ok(ev_new_block(1, 2)), + // Nothing should be emitted after an error: + Err(Error::Other("ended".to_owned())), + Ok(ev_new_block(2, 3)), + ] + }); + + let s = FollowStream::new(stream_getter); + let out: Vec<_> = s.filter_map(|e| async move { e.ok() }).collect().await; + + // The expected response, given the above. + assert_eq!( + out, + vec![ + FollowStreamMsg::Ready("sub_id_0".to_owned()), + FollowStreamMsg::Event(ev_initialized(1)), + FollowStreamMsg::Event(FollowEvent::Stop), + FollowStreamMsg::Ready("sub_id_2".to_owned()), + FollowStreamMsg::Event(FollowEvent::Stop), + FollowStreamMsg::Ready("sub_id_3".to_owned()), + FollowStreamMsg::Event(ev_new_block(1, 2)), + ] + ); + } +} diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs new file mode 100644 index 0000000000..f058e2d250 --- /dev/null +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -0,0 +1,488 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use super::follow_stream_unpin::{BlockRef, FollowStreamMsg, FollowStreamUnpin}; +use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEvent}; +use crate::config::BlockHash; +use crate::error::Error; +use futures::stream::{Stream, StreamExt}; +use std::collections::{HashMap, VecDeque}; +use std::ops::DerefMut; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; + +/// A `Stream` which builds on `FollowStreamDriver`, and allows multiple subscribers to obtain events +/// from the single underlying subscription (each being provided an `Initialized` message and all new +/// blocks since then, as if they were each creating a unique `chainHead_follow` subscription). This +/// is the "top" layer of our follow stream subscriptions, and the one that's interacted with elsewhere. +#[derive(Debug)] +pub struct FollowStreamDriver { + inner: FollowStreamUnpin, + shared: Shared, +} + +impl FollowStreamDriver { + /// Create a new [`FollowStreamDriver`]. This must be polled by some executor + /// in order for any progress to be made. Things can subscribe to events. + pub fn new(follow_unpin: FollowStreamUnpin) -> Self { + Self { + inner: follow_unpin, + shared: Shared::default(), + } + } + + /// Return a handle from which we can create new subscriptions to follow events. + pub fn handle(&self) -> FollowStreamDriverHandle { + FollowStreamDriverHandle { + shared: self.shared.clone(), + } + } +} + +impl Stream for FollowStreamDriver { + type Item = Result<(), Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let item = match self.inner.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + // Mark ourselves as done so that everything can end. + self.shared.done(); + return Poll::Ready(None); + } + Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), + Poll::Ready(Some(Ok(item))) => item, + }; + + self.shared.push_item(item); + Poll::Ready(Some(Ok(()))) + } +} + +/// A handle that can be used to create subscribers, but that doesn't +/// itself subscribe to events. +#[derive(Debug, Clone)] +pub struct FollowStreamDriverHandle { + shared: Shared, +} + +impl FollowStreamDriverHandle { + /// Subscribe to follow events. + pub fn subscribe(&self) -> FollowStreamDriverSubscription { + self.shared.subscribe() + } +} + +/// A subscription to events from the [`FollowStreamDriver`]. All subscriptions +/// begin first with a `Ready` event containing the current subscription ID, and +/// then with an `Initialized` event containing the latest finalized block and latest +/// runtime information, and then any new/best block events and so on received since +/// the latest finalized block. +#[derive(Debug)] +pub struct FollowStreamDriverSubscription { + id: usize, + done: bool, + shared: Shared, + local_items: VecDeque>>, +} + +impl Stream for FollowStreamDriverSubscription { + type Item = FollowStreamMsg>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.done { + return Poll::Ready(None); + } + + loop { + if let Some(item) = self.local_items.pop_front() { + return Poll::Ready(Some(item)); + } + + let items = self.shared.take_items_and_save_waker(self.id, cx.waker()); + + // If no items left, mark locally as done (to avoid further locking) + // and return None to signal done-ness. + let Some(items) = items else { + self.done = true; + return Poll::Ready(None); + }; + + // No items? We've saved the waker so we'll be told when more come. + // Else, save the items locally and loop around to pop from them. + if items.is_empty() { + return Poll::Pending; + } else { + self.local_items = items; + } + } + } +} + +impl FollowStreamDriverSubscription { + /// Return the current subscription ID. If the subscription has stopped, then this will + /// wait until a new subscription has started with a new ID. + pub async fn subscription_id(self) -> Option { + let ready_event = self + .skip_while(|ev| std::future::ready(!matches!(ev, FollowStreamMsg::Ready(_)))) + .next() + .await?; + + match ready_event { + FollowStreamMsg::Ready(sub_id) => Some(sub_id), + _ => None, + } + } + + /// Subscribe to the follow events, ignoring any other messages. + pub fn events(self) -> impl Stream>> + Send + Sync { + self.filter_map(|ev| std::future::ready(ev.into_event())) + } +} + +impl Clone for FollowStreamDriverSubscription { + fn clone(&self) -> Self { + self.shared.subscribe() + } +} + +impl Drop for FollowStreamDriverSubscription { + fn drop(&mut self) { + self.shared.remove_sub(self.id); + } +} + +/// Locked shared state. The driver stream will access this state to push +/// events to any subscribers, and subscribers will access it to pull the +/// events destined for themselves. +#[derive(Debug, Clone)] +struct Shared(Arc>>); + +#[derive(Debug)] +struct SharedState { + done: bool, + next_id: usize, + subscribers: HashMap>, + // Keep a buffer of all events from last finalized block so that new + // subscriptions can be handed this info first. + block_events_from_last_finalized: VecDeque>>, + // Keep track of the subscription ID we send out on new subs. + current_subscription_id: Option, + // Keep track of the init message we send out on new subs. + current_init_message: Option>>, + // Runtime events by block hash; we need to track these to know + // whether the runtime has changed when we see a finalized block notification. + seen_runtime_events: HashMap, +} + +impl Default for Shared { + fn default() -> Self { + Shared(Arc::new(Mutex::new(SharedState { + next_id: 1, + done: false, + subscribers: HashMap::new(), + current_init_message: None, + current_subscription_id: None, + seen_runtime_events: HashMap::new(), + block_events_from_last_finalized: VecDeque::new(), + }))) + } +} + +impl Shared { + /// Set the shared state to "done"; no more items will be handed to it. + pub fn done(&self) { + let mut shared = self.0.lock().unwrap(); + shared.done = true; + } + + /// Cleanup a subscription. + pub fn remove_sub(&self, sub_id: usize) { + let mut shared = self.0.lock().unwrap(); + shared.subscribers.remove(&sub_id); + } + + /// Take items for some subscription ID and save the waker. + pub fn take_items_and_save_waker( + &self, + sub_id: usize, + waker: &Waker, + ) -> Option>>> { + let mut shared = self.0.lock().unwrap(); + + let is_done = shared.done; + let details = shared.subscribers.get_mut(&sub_id)?; + + // no more items to pull, and stream closed, so return None. + if details.items.is_empty() && is_done { + return None; + } + + // else, take whatever items, and save the waker if not done yet. + let items = std::mem::take(&mut details.items); + if !is_done { + details.waker = Some(waker.clone()); + } + Some(items) + } + + /// Push a new item out to subscribers. + pub fn push_item(&self, item: FollowStreamMsg>) { + let mut shared = self.0.lock().unwrap(); + let shared = shared.deref_mut(); + + // broadcast item to subscribers: + for details in shared.subscribers.values_mut() { + details.items.push_back(item.clone()); + if let Some(waker) = details.waker.take() { + waker.wake(); + } + } + + // Keep our buffer of ready/block events uptodate: + match item { + FollowStreamMsg::Ready(sub_id) => { + // Set new subscription ID when it comes in. + shared.current_subscription_id = Some(sub_id); + } + FollowStreamMsg::Event(FollowEvent::Initialized(ev)) => { + // New subscriptions will be given this init message: + shared.current_init_message = Some(ev.clone()); + // Clear block cache (since a new finalized block hash is seen): + shared.block_events_from_last_finalized.clear(); + } + FollowStreamMsg::Event(FollowEvent::Finalized(finalized_ev)) => { + // Update the init message that we'll hand out to new subscriptions. If the init message + // is `None` for some reason, we just ignore this step. + if let Some(init_message) = &mut shared.current_init_message { + // Find the latest runtime update that's been finalized. + let newest_runtime = finalized_ev + .finalized_block_hashes + .iter() + .rev() + .filter_map(|h| shared.seen_runtime_events.get(&h.hash()).cloned()) + .next(); + + shared.seen_runtime_events.clear(); + + if let Some(finalized) = finalized_ev.finalized_block_hashes.last() { + init_message.finalized_block_hash = finalized.clone(); + } + if let Some(runtime_ev) = newest_runtime { + init_message.finalized_block_runtime = Some(runtime_ev); + } + } + + // New finalized block, so clear the cache of older block events. + shared.block_events_from_last_finalized.clear(); + } + FollowStreamMsg::Event(FollowEvent::NewBlock(new_block_ev)) => { + // If a new runtime is seen, note it so that when a block is finalized, we + // can associate that with a runtime update having happened. + if let Some(runtime_event) = &new_block_ev.new_runtime { + shared + .seen_runtime_events + .insert(new_block_ev.block_hash.hash(), runtime_event.clone()); + } + + shared + .block_events_from_last_finalized + .push_back(FollowEvent::NewBlock(new_block_ev)); + } + FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => { + shared.block_events_from_last_finalized.push_back(ev); + } + FollowStreamMsg::Event(FollowEvent::Stop) => { + // On a stop event, clear everything. Wait for resubscription and new ready/initialised events. + shared.block_events_from_last_finalized.clear(); + shared.current_subscription_id = None; + shared.current_init_message = None; + } + _ => { + // We don't buffer any other events. + } + } + } + + /// Create a new subscription. + pub fn subscribe(&self) -> FollowStreamDriverSubscription { + let mut shared = self.0.lock().unwrap(); + + let id = shared.next_id; + shared.next_id += 1; + + shared.subscribers.insert( + id, + SubscriberDetails { + items: VecDeque::new(), + waker: None, + }, + ); + + // Any new subscription should start with a "Ready" message and then an "Initialized" + // message, and then any non-finalized block events since that. If these don't exist, + // it means the subscription is currently stopped, and we should expect new Ready/Init + // messages anyway once it restarts. + let mut local_items = VecDeque::new(); + if let Some(sub_id) = &shared.current_subscription_id { + local_items.push_back(FollowStreamMsg::Ready(sub_id.clone())); + } + if let Some(init_msg) = &shared.current_init_message { + local_items.push_back(FollowStreamMsg::Event(FollowEvent::Initialized( + init_msg.clone(), + ))); + } + for ev in &shared.block_events_from_last_finalized { + local_items.push_back(FollowStreamMsg::Event(ev.clone())); + } + + drop(shared); + + FollowStreamDriverSubscription { + id, + done: false, + shared: self.clone(), + local_items, + } + } +} + +/// Details for a given subscriber: any items it's not yet claimed, +/// and a way to wake it up when there are more items for it. +#[derive(Debug)] +struct SubscriberDetails { + items: VecDeque>>, + waker: Option, +} + +#[cfg(test)] +mod test_utils { + use super::super::follow_stream_unpin::test_utils::test_unpin_stream_getter; + use super::*; + + /// Return a `FollowStreamDriver` + pub fn test_follow_stream_driver_getter( + events: F, + max_life: usize, + ) -> FollowStreamDriver + where + Hash: BlockHash + 'static, + F: Fn() -> I + Send + 'static, + I: IntoIterator, Error>>, + { + let (stream, _) = test_unpin_stream_getter(events, max_life); + FollowStreamDriver::new(stream) + } +} + +#[cfg(test)] +mod test { + use super::super::follow_stream::test_utils::{ + ev_best_block, ev_finalized, ev_initialized, ev_new_block, + }; + use super::super::follow_stream_unpin::test_utils::{ + ev_best_block_ref, ev_finalized_ref, ev_initialized_ref, ev_new_block_ref, + }; + use super::test_utils::test_follow_stream_driver_getter; + use super::*; + + #[test] + fn follow_stream_driver_is_sendable() { + fn assert_send(_: T) {} + let stream_getter = test_follow_stream_driver_getter(|| [Ok(ev_initialized(1))], 10); + assert_send(stream_getter); + } + + #[tokio::test] + async fn subscribers_all_receive_events_and_finish_gracefully_on_error() { + let mut driver = test_follow_stream_driver_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(ev_new_block(0, 1)), + Ok(ev_best_block(1)), + Ok(ev_finalized([1])), + Err(Error::Other("ended".to_owned())), + ] + }, + 10, + ); + + let handle = driver.handle(); + + let a = handle.subscribe(); + let b = handle.subscribe(); + let c = handle.subscribe(); + + // Drive to completion (the sort of real life usage I'd expect): + tokio::spawn(async move { while driver.next().await.is_some() {} }); + + let a_vec: Vec<_> = a.collect().await; + let b_vec: Vec<_> = b.collect().await; + let c_vec: Vec<_> = c.collect().await; + + let expected = vec![ + FollowStreamMsg::Ready("sub_id_0".into()), + FollowStreamMsg::Event(ev_initialized_ref(0)), + FollowStreamMsg::Event(ev_new_block_ref(0, 1)), + FollowStreamMsg::Event(ev_best_block_ref(1)), + FollowStreamMsg::Event(ev_finalized_ref([1])), + ]; + + assert_eq!(a_vec, expected); + assert_eq!(b_vec, expected); + assert_eq!(c_vec, expected); + } + + #[tokio::test] + async fn subscribers_receive_block_events_from_last_finalised() { + let mut driver = test_follow_stream_driver_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(ev_new_block(0, 1)), + Ok(ev_best_block(1)), + Ok(ev_finalized([1])), + Ok(ev_new_block(1, 2)), + Ok(ev_new_block(2, 3)), + Err(Error::Other("ended".to_owned())), + ] + }, + 10, + ); + + // Skip past ready, init, new, best events. + let _r = driver.next().await.unwrap(); + let _i0 = driver.next().await.unwrap(); + let _n1 = driver.next().await.unwrap(); + let _b1 = driver.next().await.unwrap(); + + // THEN subscribe; subscription should still receive them: + let evs: Vec<_> = driver.handle().subscribe().take(4).collect().await; + let expected = vec![ + FollowStreamMsg::Ready("sub_id_0".into()), + FollowStreamMsg::Event(ev_initialized_ref(0)), + FollowStreamMsg::Event(ev_new_block_ref(0, 1)), + FollowStreamMsg::Event(ev_best_block_ref(1)), + ]; + assert_eq!(evs, expected); + + // Skip past finalized 1, new 2, new 3 events + let _f1 = driver.next().await.unwrap(); + let _n2 = driver.next().await.unwrap(); + let _n3 = driver.next().await.unwrap(); + + // THEN subscribe again; new subs will see an updated initialized message + // with the latest finalized block hash. + let evs: Vec<_> = driver.handle().subscribe().take(4).collect().await; + let expected = vec![ + FollowStreamMsg::Ready("sub_id_0".into()), + FollowStreamMsg::Event(ev_initialized_ref(1)), + FollowStreamMsg::Event(ev_new_block_ref(1, 2)), + FollowStreamMsg::Event(ev_new_block_ref(2, 3)), + ]; + assert_eq!(evs, expected); + } +} diff --git a/subxt/src/backend/unstable/follow_stream_unpin.rs b/subxt/src/backend/unstable/follow_stream_unpin.rs new file mode 100644 index 0000000000..7b54f642dc --- /dev/null +++ b/subxt/src/backend/unstable/follow_stream_unpin.rs @@ -0,0 +1,634 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use super::follow_stream::FollowStream; +use super::UnstableRpcMethods; +use crate::backend::unstable::rpc_methods::{ + BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, +}; +use crate::config::{BlockHash, Config}; +use crate::error::Error; +use futures::stream::{FuturesUnordered, Stream, StreamExt}; +use std::collections::{HashMap, HashSet}; +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; + +/// The type of stream item. +pub use super::follow_stream::FollowStreamMsg; + +/// A `Stream` which builds on `FollowStream`, and handles pinning. It replaces any block hash seen in +/// the follow events with a `BlockRef` which, when all clones are dropped, will lead to an "unpin" call +/// for that block hash being queued. It will also automatically unpin any blocks that exceed a given max +/// age, to try and prevent the underlying stream from ending (and _all_ blocks from being unpinned as a +/// result). Put simply, it tries to keep every block pinned as long as possible until the block is no longer +/// used anywhere. +#[derive(Debug)] +pub struct FollowStreamUnpin { + // The underlying stream of events. + inner: FollowStream, + // A method to call to unpin a block, given a block hash and a subscription ID. + unpin_method: UnpinMethodHolder, + // Futures for sending unpin events that we'll poll to completion as + // part of polling the stream as a whole. + unpin_futs: FuturesUnordered, + // Each new finalized block increments this. Allows us to track + // the age of blocks so that we can unpin old ones. + rel_block_num: usize, + // The latest ID of the FollowStream subscription, which we can use + // to unpin blocks. + subscription_id: Option>, + // The longest period a block can be pinned for. + max_block_life: usize, + // The currently seen and pinned blocks. + pinned: HashMap>, + // Shared state about blocks we've flagged to unpin from elsewhere + unpin_flags: UnpinFlags, +} + +// Just a wrapper to make implementing debug on the whole thing easier. +struct UnpinMethodHolder(UnpinMethod); +impl std::fmt::Debug for UnpinMethodHolder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "UnpinMethodHolder(Box) -> UnpinFut>)" + ) + } +} + +/// The type of the unpin method that we need to provide. +pub type UnpinMethod = Box) -> UnpinFut + Send>; + +/// The future returned from [`UnpinMethod`]. +pub type UnpinFut = Pin + Send + 'static>>; + +impl std::marker::Unpin for FollowStreamUnpin {} + +impl Stream for FollowStreamUnpin { + type Item = Result>, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut(); + + loop { + // Poll the unpin tasks while they are completing. if we get back None, then + // no tasks in the list, and if pending, we'll be woken when we can poll again. + if let Poll::Ready(Some(())) = this.unpin_futs.poll_next_unpin(cx) { + continue; + }; + + // Poll the inner stream for the next event. + let Poll::Ready(ev) = this.inner.poll_next_unpin(cx) else { + return Poll::Pending; + }; + + // No more progress to be made if inner stream done. + let Some(ev) = ev else { + return Poll::Ready(None); + }; + + // Error? just return it and do nothing further. + let ev = match ev { + Ok(ev) => ev, + Err(e) => { + return Poll::Ready(Some(Err(e))); + } + }; + + // React to any actual FollowEvent we get back. + let ev = match ev { + FollowStreamMsg::Ready(subscription_id) => { + // update the subscription ID we'll use to unpin things. + this.subscription_id = Some(subscription_id.clone().into()); + + FollowStreamMsg::Ready(subscription_id) + } + FollowStreamMsg::Event(FollowEvent::Initialized(details)) => { + // The first finalized block gets the starting block_num. + let rel_block_num = this.rel_block_num; + let block_ref = this.pin_block_at(rel_block_num, details.finalized_block_hash); + + FollowStreamMsg::Event(FollowEvent::Initialized(Initialized { + finalized_block_hash: block_ref, + finalized_block_runtime: details.finalized_block_runtime, + })) + } + FollowStreamMsg::Event(FollowEvent::NewBlock(details)) => { + // One bigger than our parent, and if no parent seen (maybe it was + // unpinned already), then one bigger than the last finalized block num + // as a best guess. + let parent_rel_block_num = this + .pinned + .get(&details.parent_block_hash) + .map(|p| p.rel_block_num) + .unwrap_or(this.rel_block_num); + + let block_ref = this.pin_block_at(parent_rel_block_num + 1, details.block_hash); + let parent_block_ref = + this.pin_block_at(parent_rel_block_num, details.parent_block_hash); + + FollowStreamMsg::Event(FollowEvent::NewBlock(NewBlock { + block_hash: block_ref, + parent_block_hash: parent_block_ref, + new_runtime: details.new_runtime, + })) + } + FollowStreamMsg::Event(FollowEvent::BestBlockChanged(details)) => { + // We expect this block to already exist, so it'll keep its existing block_num, + // but worst case it'll just get the current finalized block_num + 1. + let rel_block_num = this.rel_block_num + 1; + let block_ref = this.pin_block_at(rel_block_num, details.best_block_hash); + + FollowStreamMsg::Event(FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: block_ref, + })) + } + FollowStreamMsg::Event(FollowEvent::Finalized(details)) => { + let finalized_block_refs: Vec<_> = details + .finalized_block_hashes + .into_iter() + .enumerate() + .map(|(idx, hash)| { + // These blocks _should_ exist already and so will have a known block num, + // but if they don't, we just increment the num from the last finalized block + // we saw, which should be accurate. + let rel_block_num = this.rel_block_num + idx + 1; + this.pin_block_at(rel_block_num, hash) + }) + .collect(); + + // Our relative block height is increased by however many finalized + // blocks we've seen. + this.rel_block_num += finalized_block_refs.len(); + + let pruned_block_refs: Vec<_> = details + .pruned_block_hashes + .into_iter() + .map(|hash| { + // We should know about these, too, and if not we set their age to last_finalized + 1 + let rel_block_num = this.rel_block_num + 1; + this.pin_block_at(rel_block_num, hash) + }) + .collect(); + + // At this point, we also check to see which blocks we should submit unpin events + // for. When we see a block hash as finalized, we know that it won't be reported again + // (except as a parent hash of a new block), so we can safely make an unpin call for it + // without worrying about the hash being returned again despite the block not being pinned. + this.unpin_blocks(cx.waker()); + + FollowStreamMsg::Event(FollowEvent::Finalized(Finalized { + finalized_block_hashes: finalized_block_refs, + pruned_block_hashes: pruned_block_refs, + })) + } + FollowStreamMsg::Event(FollowEvent::Stop) => { + // clear out "old" things that are no longer applicable since + // the subscription has ended (a new one will be created under the hood). + this.pinned.clear(); + this.unpin_futs.clear(); + this.unpin_flags.lock().unwrap().clear(); + this.rel_block_num = 0; + + FollowStreamMsg::Event(FollowEvent::Stop) + } + // These events aren't intresting; we just forward them on: + FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) => { + FollowStreamMsg::Event(FollowEvent::OperationBodyDone(details)) + } + FollowStreamMsg::Event(FollowEvent::OperationCallDone(details)) => { + FollowStreamMsg::Event(FollowEvent::OperationCallDone(details)) + } + FollowStreamMsg::Event(FollowEvent::OperationStorageItems(details)) => { + FollowStreamMsg::Event(FollowEvent::OperationStorageItems(details)) + } + FollowStreamMsg::Event(FollowEvent::OperationWaitingForContinue(details)) => { + FollowStreamMsg::Event(FollowEvent::OperationWaitingForContinue(details)) + } + FollowStreamMsg::Event(FollowEvent::OperationStorageDone(details)) => { + FollowStreamMsg::Event(FollowEvent::OperationStorageDone(details)) + } + FollowStreamMsg::Event(FollowEvent::OperationInaccessible(details)) => { + FollowStreamMsg::Event(FollowEvent::OperationInaccessible(details)) + } + FollowStreamMsg::Event(FollowEvent::OperationError(details)) => { + FollowStreamMsg::Event(FollowEvent::OperationError(details)) + } + }; + + // Return our event. + return Poll::Ready(Some(Ok(ev))); + } + } +} + +impl FollowStreamUnpin { + /// Create a new [`FollowStreamUnpin`]. + pub fn new( + follow_stream: FollowStream, + unpin_method: UnpinMethod, + max_block_life: usize, + ) -> Self { + Self { + inner: follow_stream, + unpin_method: UnpinMethodHolder(unpin_method), + max_block_life, + pinned: Default::default(), + subscription_id: None, + rel_block_num: 0, + unpin_flags: Default::default(), + unpin_futs: Default::default(), + } + } + + /// Create a new [`FollowStreamUnpin`] given the RPC methods. + pub fn from_methods( + follow_stream: FollowStream, + methods: UnstableRpcMethods, + max_block_life: usize, + ) -> FollowStreamUnpin { + let unpin_method = Box::new(move |hash: T::Hash, sub_id: Arc| { + let methods = methods.clone(); + let fut: UnpinFut = Box::pin(async move { + // We ignore any errors trying to unpin at the moment. + let _ = methods.chainhead_unstable_unpin(&sub_id, hash).await; + }); + fut + }); + + FollowStreamUnpin::new(follow_stream, unpin_method, max_block_life) + } + + /// Is the block hash currently pinned. + pub fn is_pinned(&self, hash: &Hash) -> bool { + self.pinned.contains_key(hash) + } + + /// Pin a block, or return the reference to an already-pinned block. If the block has been registered to + /// be unpinned, we'll clear those flags, so that it won't be unpinned. If the unpin request has already + /// been sent though, then the block will be unpinned. + fn pin_block_at(&mut self, rel_block_num: usize, hash: Hash) -> BlockRef { + let entry = self + .pinned + .entry(hash) + // Only if there's already an entry do we need to clear any unpin flags set against it. + .and_modify(|_| { + self.unpin_flags.lock().unwrap().remove(&hash); + }) + // If there's not an entry already, make one and return it. + .or_insert_with(|| PinnedDetails { + rel_block_num, + block_ref: BlockRef { + inner: Arc::new(BlockRefInner { + hash, + unpin_flags: self.unpin_flags.clone(), + }), + }, + }); + + entry.block_ref.clone() + } + + /// Unpin any blocks that are either too old, or have the unpin flag set and are old enough. + fn unpin_blocks(&mut self, waker: &Waker) { + let unpin_flags = std::mem::take(&mut *self.unpin_flags.lock().unwrap()); + let rel_block_num = self.rel_block_num; + + // If we asked to unpin and there was no subscription_id, then there's nothing to + // do here, and we've cleared the flags now above anyway. + let Some(sub_id) = &self.subscription_id else { + return; + }; + + let mut blocks_to_unpin = vec![]; + for (hash, details) in &self.pinned { + if rel_block_num.saturating_sub(details.rel_block_num) >= self.max_block_life + || unpin_flags.contains(hash) + { + // The block is too old, or it's been flagged to be unpinned (no refs to it left) + blocks_to_unpin.push(*hash); + } + } + + // No need to call the waker etc if nothing to do: + if blocks_to_unpin.is_empty() { + return; + } + + for hash in blocks_to_unpin { + self.pinned.remove(&hash); + let fut = (self.unpin_method.0)(hash, sub_id.clone()); + self.unpin_futs.push(fut); + } + + // Any new futures pushed above need polling to start. We could + // just wait for the next stream event, but let's wake the task to + // have it polled sooner, just incase it's slow to receive things. + waker.wake_by_ref(); + } +} + +// The set of block hashes that can be unpinned when ready. +// BlockRefs write to this when they are dropped. +type UnpinFlags = Arc>>; + +#[derive(Debug)] +struct PinnedDetails { + /// How old is the block? + rel_block_num: usize, + /// A block ref we can hand out to keep blocks pinned. + /// Because we store one here until it's unpinned, the live count + /// will only drop to 1 when no external refs are left. + block_ref: BlockRef, +} + +/// All blocks reported will be wrapped in this. +#[derive(Debug, Clone)] +pub struct BlockRef { + inner: Arc>, +} + +#[derive(Debug)] +struct BlockRefInner { + hash: Hash, + unpin_flags: UnpinFlags, +} + +impl BlockRef { + /// For testing purposes only, create a BlockRef from a hash + /// that isn't pinned. + #[cfg(test)] + pub fn new(hash: Hash) -> Self { + BlockRef { + inner: Arc::new(BlockRefInner { + hash, + unpin_flags: Default::default(), + }), + } + } + + /// Return the hash for this block. + pub fn hash(&self) -> Hash { + self.inner.hash + } +} + +impl PartialEq for BlockRef { + fn eq(&self, other: &Self) -> bool { + self.inner.hash == other.inner.hash + } +} + +impl PartialEq for BlockRef { + fn eq(&self, other: &Hash) -> bool { + &self.inner.hash == other + } +} + +impl Drop for BlockRef { + fn drop(&mut self) { + // PinnedDetails keeps one ref, so if this is the second ref, it's the + // only "external" one left and we should ask to unpin it now. if it's + // the only ref remaining, it means that it's already been unpinned, so + // nothing to do here anyway. + if Arc::strong_count(&self.inner) == 2 { + if let Ok(mut unpin_flags) = self.inner.unpin_flags.lock() { + unpin_flags.insert(self.inner.hash); + } + } + } +} + +#[cfg(test)] +pub(super) mod test_utils { + use super::super::follow_stream::{test_utils::test_stream_getter, FollowStream}; + use super::*; + use crate::config::substrate::H256; + + pub type UnpinRx = std::sync::mpsc::Receiver<(Hash, Arc)>; + + /// Get a `FolowStreamUnpin` from an iterator over events. + pub fn test_unpin_stream_getter( + events: F, + max_life: usize, + ) -> (FollowStreamUnpin, UnpinRx) + where + Hash: BlockHash + 'static, + F: Fn() -> I + Send + 'static, + I: IntoIterator, Error>>, + { + // Unpin requests will come here so that we can look out for them. + let (unpin_tx, unpin_rx) = std::sync::mpsc::channel(); + + let follow_stream = FollowStream::new(test_stream_getter(events)); + let unpin_method: UnpinMethod = Box::new(move |hash, sub_id| { + unpin_tx.send((hash, sub_id)).unwrap(); + Box::pin(std::future::ready(())) + }); + + let follow_unpin = FollowStreamUnpin::new(follow_stream, unpin_method, max_life); + (follow_unpin, unpin_rx) + } + + /// An initialized event containing a BlockRef (useful for comparisons) + pub fn ev_initialized_ref(n: u64) -> FollowEvent> { + FollowEvent::Initialized(Initialized { + finalized_block_hash: BlockRef::new(H256::from_low_u64_le(n)), + finalized_block_runtime: None, + }) + } + + /// A new block event containing a BlockRef (useful for comparisons) + pub fn ev_new_block_ref(parent: u64, n: u64) -> FollowEvent> { + FollowEvent::NewBlock(NewBlock { + parent_block_hash: BlockRef::new(H256::from_low_u64_le(parent)), + block_hash: BlockRef::new(H256::from_low_u64_le(n)), + new_runtime: None, + }) + } + + /// A best block event containing a BlockRef (useful for comparisons) + pub fn ev_best_block_ref(n: u64) -> FollowEvent> { + FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: BlockRef::new(H256::from_low_u64_le(n)), + }) + } + + /// A finalized event containing a BlockRef (useful for comparisons) + pub fn ev_finalized_ref(ns: impl IntoIterator) -> FollowEvent> { + FollowEvent::Finalized(Finalized { + finalized_block_hashes: ns + .into_iter() + .map(|h| BlockRef::new(H256::from_low_u64_le(h))) + .collect(), + pruned_block_hashes: vec![], + }) + } +} + +#[cfg(test)] +mod test { + use super::super::follow_stream::test_utils::{ + ev_best_block, ev_finalized, ev_initialized, ev_new_block, + }; + use super::test_utils::{ev_new_block_ref, test_unpin_stream_getter}; + use super::*; + use crate::config::substrate::H256; + + #[tokio::test] + async fn hands_back_blocks() { + let (follow_unpin, _) = test_unpin_stream_getter( + || { + [ + Ok(ev_new_block(0, 1)), + Ok(ev_new_block(1, 2)), + Ok(ev_new_block(2, 3)), + Err(Error::Other("ended".to_owned())), + ] + }, + 10, + ); + + let out: Vec<_> = follow_unpin + .filter_map(|e| async move { e.ok() }) + .collect() + .await; + + assert_eq!( + out, + vec![ + FollowStreamMsg::Ready("sub_id_0".into()), + FollowStreamMsg::Event(ev_new_block_ref(0, 1)), + FollowStreamMsg::Event(ev_new_block_ref(1, 2)), + FollowStreamMsg::Event(ev_new_block_ref(2, 3)), + ] + ); + } + + #[tokio::test] + async fn unpins_old_blocks() { + let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(ev_finalized([1])), + Ok(ev_finalized([2])), + Ok(ev_finalized([3])), + Ok(ev_finalized([4])), + Ok(ev_finalized([5])), + Err(Error::Other("ended".to_owned())), + ] + }, + 3, + ); + + let _r = follow_unpin.next().await.unwrap().unwrap(); + let _i0 = follow_unpin.next().await.unwrap().unwrap(); + unpin_rx.try_recv().expect_err("nothing unpinned yet"); + let _f1 = follow_unpin.next().await.unwrap().unwrap(); + unpin_rx.try_recv().expect_err("nothing unpinned yet"); + let _f2 = follow_unpin.next().await.unwrap().unwrap(); + unpin_rx.try_recv().expect_err("nothing unpinned yet"); + let _f3 = follow_unpin.next().await.unwrap().unwrap(); + + // Max age is 3, so after block 3 finalized, block 0 becomes too old and is unpinned. + let (hash, _) = unpin_rx + .try_recv() + .expect("unpin call should have happened"); + assert_eq!(hash, H256::from_low_u64_le(0)); + + let _f4 = follow_unpin.next().await.unwrap().unwrap(); + + // Block 1 is now too old and is unpinned. + let (hash, _) = unpin_rx + .try_recv() + .expect("unpin call should have happened"); + assert_eq!(hash, H256::from_low_u64_le(1)); + + let _f5 = follow_unpin.next().await.unwrap().unwrap(); + + // Block 2 is now too old and is unpinned. + let (hash, _) = unpin_rx + .try_recv() + .expect("unpin call should have happened"); + assert_eq!(hash, H256::from_low_u64_le(2)); + } + + #[tokio::test] + async fn unpins_dropped_blocks() { + let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(ev_finalized([1])), + Ok(ev_finalized([2])), + Err(Error::Other("ended".to_owned())), + ] + }, + 10, + ); + + let _r = follow_unpin.next().await.unwrap().unwrap(); + let _i0 = follow_unpin.next().await.unwrap().unwrap(); + let f1 = follow_unpin.next().await.unwrap().unwrap(); + + // We don't care about block 1 any more; drop it. unpins happen at finalized evs. + drop(f1); + + let _f2 = follow_unpin.next().await.unwrap().unwrap(); + + // Check that we get the expected unpin event. + let (hash, _) = unpin_rx + .try_recv() + .expect("unpin call should have happened"); + assert_eq!(hash, H256::from_low_u64_le(1)); + + // Confirm that 0 and 2 are still pinned and that 1 isnt. + assert!(!follow_unpin.is_pinned(&H256::from_low_u64_le(1))); + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(0))); + assert!(follow_unpin.is_pinned(&H256::from_low_u64_le(2))); + } + + #[tokio::test] + async fn only_unpins_if_finalized_is_dropped() { + // If we drop the "new block" and "best block" BlockRefs, + // and then the block comes back as finalized (for instance), + // no unpin call should be made unless we also drop the finalized + // one. + let (mut follow_unpin, unpin_rx) = test_unpin_stream_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(ev_new_block(0, 1)), + Ok(ev_best_block(1)), + Ok(ev_finalized([1])), + Ok(ev_finalized([2])), + Err(Error::Other("ended".to_owned())), + ] + }, + 100, + ); + + let _r = follow_unpin.next().await.unwrap().unwrap(); + let _i0 = follow_unpin.next().await.unwrap().unwrap(); + let n1 = follow_unpin.next().await.unwrap().unwrap(); + drop(n1); + let b1 = follow_unpin.next().await.unwrap().unwrap(); + drop(b1); + let f1 = follow_unpin.next().await.unwrap().unwrap(); + + // even though we dropped our block 1 in the new/best events, it won't be unpinned + // because it occurred again in finalized event. + unpin_rx.try_recv().expect_err("nothing unpinned yet"); + + drop(f1); + let _f2 = follow_unpin.next().await.unwrap().unwrap(); + + // Since we dropped the finalized block 1, we'll now unpin it when next block finalized. + let (hash, _) = unpin_rx.try_recv().expect("unpin should have happened now"); + assert_eq!(hash, H256::from_low_u64_le(1)); + } +} diff --git a/subxt/src/backend/unstable/mod.rs b/subxt/src/backend/unstable/mod.rs index c29f8b9c5a..6610e5f903 100644 --- a/subxt/src/backend/unstable/mod.rs +++ b/subxt/src/backend/unstable/mod.rs @@ -11,6 +11,589 @@ //! Everything in this module is **unstable**, meaning that it could change without //! warning at any time. +mod follow_stream; +mod follow_stream_driver; +mod follow_stream_unpin; +mod storage_items; + pub mod rpc_methods; +use self::rpc_methods::{ + FollowEvent, MethodResponse, RuntimeEvent, StorageQuery, StorageQueryType, StorageResultType, +}; +use crate::backend::{ + rpc::RpcClient, Backend, BlockRef, BlockRefT, RuntimeVersion, StorageResponse, StreamOf, + StreamOfResults, TransactionStatus, +}; +use crate::config::BlockHash; +use crate::error::{Error, RpcError}; +use crate::Config; +use async_trait::async_trait; +use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle}; +use futures::{Stream, StreamExt}; +use std::collections::HashMap; +use std::sync::Arc; +use std::task::Poll; +use storage_items::StorageItems; + +// Expose the RPC methods. pub use rpc_methods::UnstableRpcMethods; + +/// Configure and build an [`UnstableBackend`]. +pub struct UnstableBackendBuilder { + max_block_life: usize, + _marker: std::marker::PhantomData, +} + +impl Default for UnstableBackendBuilder { + fn default() -> Self { + Self::new() + } +} + +impl UnstableBackendBuilder { + /// Create a new [`UnstableBackendBuilder`]. + pub fn new() -> Self { + Self { + max_block_life: usize::MAX, + _marker: std::marker::PhantomData, + } + } + + /// The age of a block is defined here as the difference between the current finalized block number + /// and the block number of a given block. Once the difference equals or exceeds the number given + /// here, the block is unpinned. + /// + /// By default, we will never automatically unpin blocks, but if the number of pinned blocks that we + /// keep hold of exceeds the number that the server can tolerate, then a `stop` event is generated and + /// we are forced to resubscribe, losing any pinned blocks. + pub fn max_block_life(mut self, max_block_life: usize) -> Self { + self.max_block_life = max_block_life; + self + } + + /// Given an [`RpcClient`] to use to make requests, this returns a tuple of an [`UnstableBackend`], + /// which implements the [`Backend`] trait, and an [`UnstableBackendDriver`] which must be polled in + /// order for the backend to make progress. + pub fn build(self, client: RpcClient) -> (UnstableBackend, UnstableBackendDriver) { + // Construct the underlying follow_stream layers: + let rpc_methods = UnstableRpcMethods::new(client); + let follow_stream = + follow_stream::FollowStream::::from_methods(rpc_methods.clone()); + let follow_stream_unpin = follow_stream_unpin::FollowStreamUnpin::::from_methods( + follow_stream, + rpc_methods.clone(), + self.max_block_life, + ); + let follow_stream_driver = FollowStreamDriver::new(follow_stream_unpin); + + // Wrap these into the backend and driver that we'll expose. + let backend = UnstableBackend { + methods: rpc_methods, + follow_handle: follow_stream_driver.handle(), + }; + let driver = UnstableBackendDriver { + driver: follow_stream_driver, + }; + + (backend, driver) + } +} + +/// Driver for the [`UnstableBackend`]. This must be polled in order for the +/// backend to make progress. +#[derive(Debug)] +pub struct UnstableBackendDriver { + driver: FollowStreamDriver, +} + +impl Stream for UnstableBackendDriver { + type Item = as Stream>::Item; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.driver.poll_next_unpin(cx) + } +} + +/// The unstable backend. +#[derive(Debug, Clone)] +pub struct UnstableBackend { + // RPC methods we'll want to call: + methods: UnstableRpcMethods, + // A handle to the chainHead_follow subscription: + follow_handle: FollowStreamDriverHandle, +} + +impl UnstableBackend { + /// Configure and construct an [`UnstableBackend`] and the associated [`UnstableBackendDriver`]. + pub fn builder() -> UnstableBackendBuilder { + UnstableBackendBuilder::new() + } + + /// Stream block headers based on the provided filter fn + async fn stream_headers( + &self, + f: F, + ) -> Result)>, Error> + where + F: Fn(FollowEvent>) -> I + Copy + Send + 'static, + I: IntoIterator> + Send + 'static, + ::IntoIter: Send, + { + let sub_id = get_subscription_id(&self.follow_handle).await?; + let sub_id = Arc::new(sub_id); + let methods = self.methods.clone(); + let headers = self.follow_handle.subscribe().events().flat_map(move |ev| { + let sub_id = sub_id.clone(); + let methods = methods.clone(); + + let block_refs = f(ev).into_iter(); + + futures::stream::iter(block_refs).filter_map(move |block_ref| { + let sub_id = sub_id.clone(); + let methods = methods.clone(); + + async move { + let res = methods + .chainhead_unstable_header(&sub_id, block_ref.hash()) + .await + .transpose()?; + + let header = match res { + Ok(header) => header, + Err(e) => return Some(Err(e)), + }; + + Some(Ok((header, block_ref.into()))) + } + }) + }); + + Ok(StreamOf(Box::pin(headers))) + } +} + +impl BlockRefT for follow_stream_unpin::BlockRef {} +impl From> for BlockRef { + fn from(b: follow_stream_unpin::BlockRef) -> Self { + BlockRef::new(b.hash(), b) + } +} + +impl super::sealed::Sealed for UnstableBackend {} + +#[async_trait] +impl Backend for UnstableBackend { + async fn storage_fetch_values( + &self, + keys: Vec>, + at: T::Hash, + ) -> Result, Error> { + let queries = keys.iter().map(|key| StorageQuery { + key: &**key, + query_type: StorageQueryType::Value, + }); + + let storage_items = + StorageItems::from_methods(queries, at, &self.follow_handle, self.methods.clone()) + .await?; + + let storage_result_stream = storage_items.filter_map(|val| async move { + let val = match val { + Ok(val) => val, + Err(e) => return Some(Err(e)), + }; + + let StorageResultType::Value(result) = val.result else { + return None; + }; + Some(Ok(StorageResponse { + key: val.key.0, + value: result.0, + })) + }); + + Ok(StreamOf(Box::pin(storage_result_stream))) + } + + async fn storage_fetch_descendant_keys( + &self, + key: Vec, + at: T::Hash, + ) -> Result>, Error> { + // Ask for hashes, and then just ignore them and return the keys that come back. + let query = StorageQuery { + key: &*key, + query_type: StorageQueryType::DescendantsHashes, + }; + + let storage_items = StorageItems::from_methods( + std::iter::once(query), + at, + &self.follow_handle, + self.methods.clone(), + ) + .await?; + + let storage_result_stream = storage_items.map(|val| val.map(|v| v.key.0)); + Ok(StreamOf(Box::pin(storage_result_stream))) + } + + async fn storage_fetch_descendant_values( + &self, + key: Vec, + at: T::Hash, + ) -> Result, Error> { + let query = StorageQuery { + key: &*key, + query_type: StorageQueryType::DescendantsValues, + }; + + let storage_items = StorageItems::from_methods( + std::iter::once(query), + at, + &self.follow_handle, + self.methods.clone(), + ) + .await?; + + let storage_result_stream = storage_items.filter_map(|val| async move { + let val = match val { + Ok(val) => val, + Err(e) => return Some(Err(e)), + }; + + let StorageResultType::Value(result) = val.result else { + return None; + }; + Some(Ok(StorageResponse { + key: val.key.0, + value: result.0, + })) + }); + + Ok(StreamOf(Box::pin(storage_result_stream))) + } + + async fn genesis_hash(&self) -> Result { + self.methods.chainspec_v1_genesis_hash().await + } + + async fn block_header(&self, at: T::Hash) -> Result, Error> { + let sub_id = get_subscription_id(&self.follow_handle).await?; + self.methods.chainhead_unstable_header(&sub_id, at).await + } + + async fn block_body(&self, at: T::Hash) -> Result>>, Error> { + let sub_id = get_subscription_id(&self.follow_handle).await?; + + // Subscribe to the body response and get our operationId back. + let follow_events = self.follow_handle.subscribe().events(); + let status = self.methods.chainhead_unstable_body(&sub_id, at).await?; + let operation_id = match status { + MethodResponse::LimitReached => { + return Err(RpcError::request_rejected("limit reached").into()) + } + MethodResponse::Started(s) => s.operation_id, + }; + + // Wait for the response to come back with the correct operationId. + let mut exts_stream = follow_events.filter_map(|ev| { + let FollowEvent::OperationBodyDone(body) = ev else { + return std::future::ready(None); + }; + if body.operation_id != operation_id { + return std::future::ready(None); + } + let exts: Vec<_> = body.value.into_iter().map(|ext| ext.0).collect(); + std::future::ready(Some(exts)) + }); + + Ok(exts_stream.next().await) + } + + async fn latest_finalized_block_ref(&self) -> Result, Error> { + let next_ref: Option> = self + .follow_handle + .subscribe() + .events() + .filter_map(|ev| { + let out = match ev { + FollowEvent::Initialized(init) => Some(init.finalized_block_hash.into()), + _ => None, + }; + std::future::ready(out) + }) + .next() + .await; + + next_ref.ok_or_else(|| RpcError::SubscriptionDropped.into()) + } + + async fn current_runtime_version(&self) -> Result { + // Just start a stream of version infos, and return the first value we get from it. + let runtime_version = self.stream_runtime_version().await?.next().await; + match runtime_version { + None => Err(Error::Rpc(RpcError::SubscriptionDropped)), + Some(Err(e)) => Err(e), + Some(Ok(version)) => Ok(version), + } + } + + async fn stream_runtime_version(&self) -> Result, Error> { + // Keep track of runtime details announced in new blocks, and then when blocks + // are finalized, find the latest of these that has runtime details, and clear the rest. + let mut runtimes = HashMap::new(); + let runtime_stream = self + .follow_handle + .subscribe() + .events() + .filter_map(move |ev| { + let output = match ev { + FollowEvent::Initialized(ev) => { + runtimes.clear(); + ev.finalized_block_runtime + } + FollowEvent::NewBlock(ev) => { + if let Some(runtime) = ev.new_runtime { + runtimes.insert(ev.block_hash.hash(), runtime); + } + None + } + FollowEvent::Finalized(ev) => { + let next_runtime = ev + .finalized_block_hashes + .iter() + .rev() + .filter_map(|h| runtimes.get(&h.hash()).cloned()) + .next(); + + runtimes.clear(); + next_runtime + } + _ => None, + }; + + let runtime_event = match output { + None => return std::future::ready(None), + Some(ev) => ev, + }; + + let runtime_details = match runtime_event { + RuntimeEvent::Invalid(err) => { + return std::future::ready(Some(Err(Error::Other(err.error)))) + } + RuntimeEvent::Valid(ev) => ev, + }; + + std::future::ready(Some(Ok(RuntimeVersion { + spec_version: runtime_details.spec.spec_version, + transaction_version: runtime_details.spec.transaction_version, + }))) + }); + + Ok(StreamOf(Box::pin(runtime_stream))) + } + + async fn stream_all_block_headers( + &self, + ) -> Result)>, Error> { + self.stream_headers(|ev| match ev { + FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash), + FollowEvent::NewBlock(ev) => Some(ev.block_hash), + _ => None, + }) + .await + } + + async fn stream_best_block_headers( + &self, + ) -> Result)>, Error> { + self.stream_headers(|ev| match ev { + FollowEvent::Initialized(ev) => Some(ev.finalized_block_hash), + FollowEvent::BestBlockChanged(ev) => Some(ev.best_block_hash), + _ => None, + }) + .await + } + + async fn stream_finalized_block_headers( + &self, + ) -> Result)>, Error> { + self.stream_headers(|ev| match ev { + FollowEvent::Initialized(ev) => { + vec![ev.finalized_block_hash] + } + FollowEvent::Finalized(ev) => ev.finalized_block_hashes, + _ => vec![], + }) + .await + } + + async fn submit_transaction( + &self, + extrinsic: &[u8], + ) -> Result>, Error> { + // First, subscribe to all new block hashes + let mut new_blocks = self.follow_handle.subscribe().events().filter_map(|ev| { + std::future::ready(match ev { + FollowEvent::NewBlock(ev) => Some(ev.block_hash), + _ => None, + }) + }); + + // Then, submit the transaction. + let mut tx_progress = self + .methods + .transaction_unstable_submit_and_watch(extrinsic) + .await?; + + let mut seen_blocks = HashMap::new(); + let mut done = false; + + // If we see the finalized event, we start waiting until we find a block that + // matches, so we can guarantee to return a pinned block hash. + let mut finalized_hash: Option = None; + + // Now we can attempt to associate tx events with pinned blocks. + let tx_stream = futures::stream::poll_fn(move |cx| { + loop { + // Bail early if no more tx events; we don't want to keep polling for pinned blocks. + if done { + return Poll::Ready(None); + } + + // Save any pinned blocks. Keep doing this until no more, so that we always have the most uptodate + // pinned blocks when we are looking at our tx events. + if let Poll::Ready(Some(block_ref)) = new_blocks.poll_next_unpin(cx) { + seen_blocks.insert(block_ref.hash(), block_ref); + continue; + } + + // If we have a finalized hash, we are done looking for tx events and we are just waiting + // for a pinned block with a matching hash (which must appear eventually given it's finalized). + if let Some(hash) = &finalized_hash { + if let Some(block_ref) = seen_blocks.remove(hash) { + // Found it! Hand back the event with a pinned block. We're done. + done = true; + let ev = TransactionStatus::InFinalizedBlock { + hash: block_ref.into(), + }; + return Poll::Ready(Some(Ok(ev))); + } else { + // Keep waiting for more new blocks until we find it (get rid of any other block refs + // now, since none of them were what we were looking for anyway). + seen_blocks.clear(); + continue; + } + } + + // Otherwise, we are still watching for tx events: + let ev = match tx_progress.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + done = true; + return Poll::Ready(None); + } + Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), + Poll::Ready(Some(Ok(ev))) => ev, + }; + + // When we get one, map it to the correct format (or for finalized ev, wait for the pinned block): + let ev = match ev { + rpc_methods::TransactionStatus::Finalized { block } => { + // We'll wait until we have seen this hash, to try to guarantee + // that when we return this event, the corresponding block is + // pinned and accessible. + finalized_hash = Some(block.hash); + continue; + } + rpc_methods::TransactionStatus::BestChainBlockIncluded { + block: Some(block), + } => { + // Look up a pinned block ref if we can, else return a non-pinned + // block that likely isn't accessible. We have no guarantee that a best + // block on the node a tx was sent to will ever be known about on the + // chainHead_follow subscription. + let block_ref = match seen_blocks.get(&block.hash).cloned() { + Some(block_ref) => block_ref.into(), + None => BlockRef::from_hash(block.hash), + }; + TransactionStatus::InBestBlock { hash: block_ref } + } + rpc_methods::TransactionStatus::BestChainBlockIncluded { block: None } => { + TransactionStatus::NoLongerInBestBlock + } + rpc_methods::TransactionStatus::Broadcasted { num_peers } => { + TransactionStatus::Broadcasted { num_peers } + } + rpc_methods::TransactionStatus::Dropped { error, .. } => { + TransactionStatus::Dropped { message: error } + } + rpc_methods::TransactionStatus::Error { error } => { + TransactionStatus::Dropped { message: error } + } + rpc_methods::TransactionStatus::Invalid { error } => { + TransactionStatus::Invalid { message: error } + } + rpc_methods::TransactionStatus::Validated => TransactionStatus::Validated, + }; + return Poll::Ready(Some(Ok(ev))); + } + }); + + Ok(StreamOf(Box::pin(tx_stream))) + } + + async fn call( + &self, + method: &str, + call_parameters: Option<&[u8]>, + at: T::Hash, + ) -> Result, Error> { + let sub_id = get_subscription_id(&self.follow_handle).await?; + + // Subscribe to the body response and get our operationId back. + let follow_events = self.follow_handle.subscribe().events(); + let call_parameters = call_parameters.unwrap_or(&[]); + let status = self + .methods + .chainhead_unstable_call(&sub_id, at, method, call_parameters) + .await?; + let operation_id = match status { + MethodResponse::LimitReached => { + return Err(RpcError::request_rejected("limit reached").into()) + } + MethodResponse::Started(s) => s.operation_id, + }; + + // Wait for the response to come back with the correct operationId. + let mut call_data_stream = follow_events.filter_map(|ev| { + let FollowEvent::OperationCallDone(body) = ev else { + return std::future::ready(None); + }; + if body.operation_id != operation_id { + return std::future::ready(None); + } + std::future::ready(Some(body.output.0)) + }); + + call_data_stream + .next() + .await + .ok_or_else(|| RpcError::SubscriptionDropped.into()) + } +} + +/// A helper to obtain a subscription ID. +async fn get_subscription_id( + follow_handle: &FollowStreamDriverHandle, +) -> Result { + let Some(sub_id) = follow_handle.subscribe().subscription_id().await else { + return Err(RpcError::SubscriptionDropped.into()); + }; + + Ok(sub_id) +} diff --git a/subxt/src/backend/unstable/rpc_methods.rs b/subxt/src/backend/unstable/rpc_methods.rs index 965dc1288b..5df83f4d97 100644 --- a/subxt/src/backend/unstable/rpc_methods.rs +++ b/subxt/src/backend/unstable/rpc_methods.rs @@ -7,10 +7,11 @@ //! methods exposed here. use crate::backend::rpc::{rpc_params, RpcClient, RpcSubscription}; +use crate::config::BlockHash; use crate::{Config, Error}; use futures::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::task::Poll; /// An interface to call the unstable RPC methods. This interface is instantiated with @@ -64,8 +65,8 @@ impl UnstableRpcMethods { pub async fn chainhead_unstable_follow( &self, with_runtime: bool, - ) -> Result>, Error> { - let subscription = self + ) -> Result, Error> { + let sub = self .client .subscribe( "chainHead_unstable_follow", @@ -74,7 +75,7 @@ impl UnstableRpcMethods { ) .await?; - Ok(subscription) + Ok(FollowSubscription { sub, done: false }) } /// Resumes a storage fetch started with chainHead_unstable_storage after it has generated an @@ -500,7 +501,7 @@ pub struct OperationBodyDone { /// The operation id of the event. pub operation_id: String, /// Array of hexadecimal-encoded scale-encoded extrinsics found in the block. - pub value: Vec, + pub value: Vec, } /// The response of the `chainHead_call` method. @@ -510,7 +511,7 @@ pub struct OperationCallDone { /// The operation id of the event. pub operation_id: String, /// Hexadecimal-encoded output of the runtime function call. - pub output: String, + pub output: Bytes, } /// The response of the `chainHead_call` method. @@ -520,7 +521,7 @@ pub struct OperationStorageItems { /// The operation id of the event. pub operation_id: String, /// The resulting items. - pub items: Vec, + pub items: VecDeque, } /// Indicate a problem during the operation. @@ -538,7 +539,7 @@ pub struct OperationError { #[serde(rename_all = "camelCase")] pub struct StorageResult { /// The hex-encoded key of the result. - pub key: String, + pub key: Bytes, /// The result of the query. #[serde(flatten)] pub result: StorageResultType, @@ -549,11 +550,11 @@ pub struct StorageResult { #[serde(rename_all = "camelCase")] pub enum StorageResultType { /// Fetch the value of the provided key. - Value(String), + Value(Bytes), /// Fetch the hash of the value of the provided key. - Hash(String), + Hash(Bytes), /// Fetch the closest descendant merkle value. - ClosestDescendantMerkleValue(String), + ClosestDescendantMerkleValue(Bytes), } /// The method respose of `chainHead_body`, `chainHead_call` and `chainHead_storage`. @@ -604,6 +605,44 @@ pub enum StorageQueryType { DescendantsHashes, } +/// A subscription which returns follow events, and ends when a Stop event occurs. +pub struct FollowSubscription { + sub: RpcSubscription>, + done: bool, +} + +impl FollowSubscription { + /// Fetch the next item in the stream. + pub async fn next(&mut self) -> Option<::Item> { + ::next(self).await + } + /// Fetch the subscription ID for the stream. + pub fn subscription_id(&self) -> Option<&str> { + self.sub.subscription_id() + } +} + +impl Stream for FollowSubscription { + type Item = > as Stream>::Item; + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + if self.done { + return Poll::Ready(None); + } + + let res = self.sub.poll_next_unpin(cx); + + if let Poll::Ready(Some(Ok(FollowEvent::Stop))) = &res { + // No more events will occur after this one. + self.done = true + } + + res + } +} + /// A subscription which returns transaction status events, stopping /// when no more events will be sent. pub struct TransactionSubscription { @@ -611,14 +650,14 @@ pub struct TransactionSubscription { done: bool, } -impl TransactionSubscription { +impl TransactionSubscription { /// Fetch the next item in the stream. pub async fn next(&mut self) -> Option<::Item> { - StreamExt::next(self).await + ::next(self).await } } -impl Stream for TransactionSubscription { +impl Stream for TransactionSubscription { type Item = > as Stream>::Item; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -694,10 +733,10 @@ pub enum TransactionStatus { #[derive(Debug, Clone, PartialEq, Eq, Deserialize)] pub struct TransactionBlockDetails { /// The block hash. - hash: Hash, + pub hash: Hash, /// The index of the transaction in the block. #[serde(with = "unsigned_number_as_string")] - index: u64, + pub index: u64, } /// Hex-serialized shim for `Vec`. diff --git a/subxt/src/backend/unstable/storage_items.rs b/subxt/src/backend/unstable/storage_items.rs new file mode 100644 index 0000000000..73790765ea --- /dev/null +++ b/subxt/src/backend/unstable/storage_items.rs @@ -0,0 +1,160 @@ +// Copyright 2019-2023 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use super::follow_stream_driver::FollowStreamDriverHandle; +use super::follow_stream_unpin::BlockRef; +use super::rpc_methods::{ + FollowEvent, MethodResponse, StorageQuery, StorageResult, UnstableRpcMethods, +}; +use crate::config::Config; +use crate::error::{Error, RpcError}; +use futures::{FutureExt, Stream, StreamExt}; +use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +/// Obtain a stream of storage items given some query. this handles continuing +/// and stopping under the hood, and returns a stream of `StorageResult`s. +pub struct StorageItems { + done: bool, + operation_id: Arc, + buffered_responses: VecDeque, + continue_call: ContinueFutGetter, + continue_fut: Option, + follow_event_stream: FollowEventStream, +} + +impl StorageItems { + // Subscribe to follow events, and return a stream of storage results + // given some storage queries. The stream will automatically resume as + // needed, and stop when done. + pub async fn from_methods( + queries: impl Iterator>, + at: T::Hash, + follow_handle: &FollowStreamDriverHandle, + methods: UnstableRpcMethods, + ) -> Result { + let sub_id = super::get_subscription_id(follow_handle).await?; + + // Subscribe to events and make the initial request to get an operation ID. + let follow_events = follow_handle.subscribe().events(); + let status = methods + .chainhead_unstable_storage(&sub_id, at, queries, None) + .await?; + let operation_id: Arc = match status { + MethodResponse::LimitReached => { + return Err(RpcError::request_rejected("limit reached").into()) + } + MethodResponse::Started(s) => s.operation_id.into(), + }; + + // A function which returns the call to continue the subscription: + let continue_call: ContinueFutGetter = { + let operation_id = operation_id.clone(); + Box::new(move || { + let sub_id = sub_id.clone(); + let operation_id = operation_id.clone(); + let methods = methods.clone(); + + Box::pin(async move { + methods + .chainhead_unstable_continue(&sub_id, &operation_id) + .await + }) + }) + }; + + Ok(StorageItems::new( + operation_id, + continue_call, + Box::pin(follow_events), + )) + } + + fn new( + operation_id: Arc, + continue_call: ContinueFutGetter, + follow_event_stream: FollowEventStream, + ) -> Self { + Self { + done: false, + buffered_responses: VecDeque::new(), + operation_id, + continue_call, + continue_fut: None, + follow_event_stream, + } + } +} + +pub type FollowEventStream = + Pin>> + Send + 'static>>; +pub type ContinueFutGetter = Box ContinueFut + Send + 'static>; +pub type ContinueFut = Pin> + Send + 'static>>; + +impl Stream for StorageItems { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if self.done { + return Poll::Ready(None); + } + + if let Some(item) = self.buffered_responses.pop_front() { + return Poll::Ready(Some(Ok(item))); + } + + if let Some(mut fut) = self.continue_fut.take() { + match fut.poll_unpin(cx) { + Poll::Pending => { + self.continue_fut = Some(fut); + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + self.done = true; + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(Ok(())) => { + // Finished; carry on. + } + } + } + + let ev = match self.follow_event_stream.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(ev)) => ev, + }; + + match ev { + FollowEvent::OperationWaitingForContinue(id) + if id.operation_id == *self.operation_id => + { + // Start a call to ask for more events + self.continue_fut = Some((self.continue_call)()); + continue; + } + FollowEvent::OperationStorageDone(id) if id.operation_id == *self.operation_id => { + // We're finished! + self.done = true; + return Poll::Ready(None); + } + FollowEvent::OperationStorageItems(items) + if items.operation_id == *self.operation_id => + { + // We have items; buffer them to emit next loops. + self.buffered_responses = items.items; + continue; + } + _ => { + // We don't care about this event; wait for the next. + continue; + } + } + } + } +} diff --git a/subxt/src/blocks/blocks_client.rs b/subxt/src/blocks/blocks_client.rs index 4a80f775fa..9ec031a35a 100644 --- a/subxt/src/blocks/blocks_client.rs +++ b/subxt/src/blocks/blocks_client.rs @@ -69,10 +69,10 @@ where ) -> impl Future, Error>> + Send + 'static { let client = self.client.clone(); async move { - // If a block ref isn't provided, we'll get the latest best block to use. + // If a block ref isn't provided, we'll get the latest finalized ref to use. let block_ref = match block_ref { Some(r) => r, - None => client.backend().latest_best_block_ref().await?, + None => client.backend().latest_finalized_block_ref().await?, }; let block_header = match client.backend().block_header(block_ref.hash()).await? { diff --git a/subxt/src/client/lightclient/builder.rs b/subxt/src/client/light_client/builder.rs similarity index 99% rename from subxt/src/client/lightclient/builder.rs rename to subxt/src/client/light_client/builder.rs index 8c02f2f761..5669654eee 100644 --- a/subxt/src/client/lightclient/builder.rs +++ b/subxt/src/client/light_client/builder.rs @@ -3,7 +3,7 @@ // see LICENSE for license details. use super::{rpc::LightClientRpc, LightClient, LightClientError}; -use crate::backend::rpc::RpcClient; +use crate::backend::{rpc::RpcClient, Backend}; use crate::{config::Config, error::Error, OnlineClient}; use std::num::NonZeroU32; use subxt_lightclient::{AddChainConfig, AddChainConfigJsonRpc, ChainId}; diff --git a/subxt/src/client/lightclient/mod.rs b/subxt/src/client/light_client/mod.rs similarity index 100% rename from subxt/src/client/lightclient/mod.rs rename to subxt/src/client/light_client/mod.rs diff --git a/subxt/src/client/lightclient/rpc.rs b/subxt/src/client/light_client/rpc.rs similarity index 100% rename from subxt/src/client/lightclient/rpc.rs rename to subxt/src/client/light_client/rpc.rs diff --git a/subxt/src/client/mod.rs b/subxt/src/client/mod.rs index a800ad603c..446787d6b1 100644 --- a/subxt/src/client/mod.rs +++ b/subxt/src/client/mod.rs @@ -12,7 +12,7 @@ mod offline_client; mod online_client; #[cfg(feature = "unstable-light-client")] -mod lightclient; +mod light_client; pub use offline_client::{OfflineClient, OfflineClientT}; pub use online_client::{ @@ -20,4 +20,4 @@ pub use online_client::{ }; #[cfg(feature = "unstable-light-client")] -pub use lightclient::{LightClient, LightClientBuilder, LightClientError}; +pub use light_client::{LightClient, LightClientBuilder, LightClientError}; diff --git a/subxt/src/client/online_client.rs b/subxt/src/client/online_client.rs index 6e042dde52..338643ebea 100644 --- a/subxt/src/client/online_client.rs +++ b/subxt/src/client/online_client.rs @@ -106,7 +106,7 @@ impl OnlineClient { /// Construct a new [`OnlineClient`] by providing an underlying [`Backend`] /// implementation to power it. Other details will be obtained from the chain. pub async fn from_backend>(backend: Arc) -> Result, Error> { - let latest_block = backend.latest_best_block_ref().await?; + let latest_block = backend.latest_finalized_block_ref().await?; let (genesis_hash, runtime_version, metadata) = future::join3( backend.genesis_hash(), @@ -437,7 +437,7 @@ impl RuntimeUpdaterStream { Err(err) => return Some(Err(err)), }; - let latest_block_ref = match self.client.backend().latest_best_block_ref().await { + let latest_block_ref = match self.client.backend().latest_finalized_block_ref().await { Ok(block_ref) => block_ref, Err(e) => return Some(Err(e)), }; diff --git a/subxt/src/config/mod.rs b/subxt/src/config/mod.rs index d6b05cf24a..960c58b6d7 100644 --- a/subxt/src/config/mod.rs +++ b/subxt/src/config/mod.rs @@ -32,16 +32,7 @@ pub use substrate::{SubstrateConfig, SubstrateExtrinsicParams, SubstrateExtrinsi // rather than having to `unsafe impl` them ourselves. pub trait Config: Sized + Send + Sync + 'static { /// The output of the `Hasher` function. - type Hash: Debug - + Copy - + Send - + Sync - + Decode - + AsRef<[u8]> - + Serialize - + DeserializeOwned - + Encode - + PartialEq; + type Hash: BlockHash; /// The account ID type. type AccountId: Debug + Clone + Encode; @@ -65,6 +56,38 @@ pub trait Config: Sized + Send + Sync + 'static { /// given some [`Config`], this return the other params needed for its `ExtrinsicParams`. pub type OtherParamsFor = <::ExtrinsicParams as ExtrinsicParams>::OtherParams; +/// Block hashes must conform to a bunch of things to be used in Subxt. +pub trait BlockHash: + Debug + + Copy + + Send + + Sync + + Decode + + AsRef<[u8]> + + Serialize + + DeserializeOwned + + Encode + + PartialEq + + Eq + + std::hash::Hash +{ +} +impl BlockHash for T where + T: Debug + + Copy + + Send + + Sync + + Decode + + AsRef<[u8]> + + Serialize + + DeserializeOwned + + Encode + + PartialEq + + Eq + + std::hash::Hash +{ +} + /// This represents the hasher used by a node to hash things like block headers /// and extrinsics. pub trait Hasher { diff --git a/subxt/src/error/mod.rs b/subxt/src/error/mod.rs index 527766f53d..d3c88ce6c5 100644 --- a/subxt/src/error/mod.rs +++ b/subxt/src/error/mod.rs @@ -108,11 +108,22 @@ pub enum RpcError { /// Error related to the RPC client. #[error("RPC error: {0}")] ClientError(Box), + /// This error signals that the request was rejected for some reason. + /// The specific reason is provided. + #[error("RPC error: request rejected")] + RequestRejected(String), /// The RPC subscription dropped. #[error("RPC error: subscription dropped.")] SubscriptionDropped, } +impl RpcError { + /// Create a `RequestRejected` error from anything that can be turned into a string. + pub fn request_rejected>(s: S) -> RpcError { + RpcError::RequestRejected(s.into()) + } +} + /// Block error #[derive(Clone, Debug, Eq, thiserror::Error, PartialEq)] #[non_exhaustive] diff --git a/subxt/src/events/events_client.rs b/subxt/src/events/events_client.rs index 1d750ed522..834e6cd64f 100644 --- a/subxt/src/events/events_client.rs +++ b/subxt/src/events/events_client.rs @@ -58,10 +58,10 @@ where // return a Future that's Send + 'static, rather than tied to &self. let client = self.client.clone(); async move { - // If a block ref isn't provided, we'll get the latest best block to use. + // If a block ref isn't provided, we'll get the latest finalized block to use. let block_ref = match block_ref { Some(r) => r, - None => client.backend().latest_best_block_ref().await?, + None => client.backend().latest_finalized_block_ref().await?, }; let event_bytes = get_event_bytes(client.backend(), block_ref.hash()).await?; diff --git a/subxt/src/runtime_api/runtime_client.rs b/subxt/src/runtime_api/runtime_client.rs index f75aa2d862..5770285b83 100644 --- a/subxt/src/runtime_api/runtime_client.rs +++ b/subxt/src/runtime_api/runtime_client.rs @@ -44,8 +44,8 @@ where // return a Future that's Send + 'static, rather than tied to &self. let client = self.client.clone(); async move { - // get the ref for the latest block and use that. - let block_ref = client.backend().latest_best_block_ref().await?; + // get the ref for the latest finalized block and use that. + let block_ref = client.backend().latest_finalized_block_ref().await?; Ok(RuntimeApi::new(client, block_ref)) } diff --git a/subxt/src/storage/storage_client.rs b/subxt/src/storage/storage_client.rs index 4fb980e596..55ab339aea 100644 --- a/subxt/src/storage/storage_client.rs +++ b/subxt/src/storage/storage_client.rs @@ -85,8 +85,8 @@ where // return a Future that's Send + 'static, rather than tied to &self. let client = self.client.clone(); async move { - // get the ref for the latest block and use that. - let block_ref = client.backend().latest_best_block_ref().await?; + // get the ref for the latest finalized block and use that. + let block_ref = client.backend().latest_finalized_block_ref().await?; Ok(Storage::new(client, block_ref)) } diff --git a/subxt/src/tx/tx_client.rs b/subxt/src/tx/tx_client.rs index 74d7192e7a..ff4e3416e8 100644 --- a/subxt/src/tx/tx_client.rs +++ b/subxt/src/tx/tx_client.rs @@ -169,7 +169,7 @@ where { /// Get the account nonce for a given account ID. pub async fn account_nonce(&self, account_id: &T::AccountId) -> Result { - let block_ref = self.client.backend().latest_best_block_ref().await?; + let block_ref = self.client.backend().latest_finalized_block_ref().await?; let account_nonce_bytes = self .client .backend() @@ -486,6 +486,7 @@ where TransactionStatus::Validated | TransactionStatus::Broadcasted { .. } | TransactionStatus::InBestBlock { .. } + | TransactionStatus::NoLongerInBestBlock | TransactionStatus::InFinalizedBlock { .. } => Ok(ext_hash), TransactionStatus::Error { message } => { Err(Error::Other(format!("Transaction error: {message}"))) @@ -509,7 +510,7 @@ where /// /// Returns `Ok` with a [`ValidationResult`], which is the result of attempting to dry run the extrinsic. pub async fn validate(&self) -> Result { - let latest_block_ref = self.client.backend().latest_best_block_ref().await?; + let latest_block_ref = self.client.backend().latest_finalized_block_ref().await?; self.validate_at(latest_block_ref).await } @@ -547,7 +548,7 @@ where pub async fn partial_fee_estimate(&self) -> Result { let mut params = self.encoded().to_vec(); (self.encoded().len() as u32).encode_to(&mut params); - let latest_block_ref = self.client.backend().latest_best_block_ref().await?; + let latest_block_ref = self.client.backend().latest_finalized_block_ref().await?; // destructuring RuntimeDispatchInfo, see type information // data layout: {weight_ref_time: Compact, weight_proof_size: Compact, class: u8, partial_fee: u128} diff --git a/subxt/src/tx/tx_progress.rs b/subxt/src/tx/tx_progress.rs index 6bf0956347..65164b1ffe 100644 --- a/subxt/src/tx/tx_progress.rs +++ b/subxt/src/tx/tx_progress.rs @@ -8,7 +8,7 @@ use std::task::Poll; use crate::utils::strip_compact_prefix; use crate::{ - backend::{StreamOfResults, TransactionStatus as BackendTxStatus}, + backend::{BlockRef, StreamOfResults, TransactionStatus as BackendTxStatus}, client::OnlineClientT, error::{DispatchError, Error, RpcError, TransactionError}, events::EventsClient, @@ -167,6 +167,7 @@ impl Stream for TxProgress { match status { BackendTxStatus::Validated => TxStatus::Validated, BackendTxStatus::Broadcasted { num_peers } => TxStatus::Broadcasted { num_peers }, + BackendTxStatus::NoLongerInBestBlock => TxStatus::NoLongerInBestBlock, BackendTxStatus::InBestBlock { hash } => { TxStatus::InBestBlock(TxInBlock::new(hash, self.ext_hash, self.client.clone())) } @@ -207,6 +208,8 @@ pub enum TxStatus { /// Number of peers it's been broadcast to. num_peers: u32, }, + /// Transaction is no longer in a best block. + NoLongerInBestBlock, /// Transaction has been included in block with given hash. InBestBlock(TxInBlock), /// Transaction has been finalized by a finality-gadget, e.g GRANDPA @@ -252,15 +255,15 @@ impl TxStatus { #[derive(Derivative)] #[derivative(Debug(bound = "C: std::fmt::Debug"))] pub struct TxInBlock { - block_hash: T::Hash, + block_ref: BlockRef, ext_hash: T::Hash, client: C, } impl TxInBlock { - pub(crate) fn new(block_hash: T::Hash, ext_hash: T::Hash, client: C) -> Self { + pub(crate) fn new(block_ref: BlockRef, ext_hash: T::Hash, client: C) -> Self { Self { - block_hash, + block_ref, ext_hash, client, } @@ -268,7 +271,7 @@ impl TxInBlock { /// Return the hash of the block that the transaction has made it into. pub fn block_hash(&self) -> T::Hash { - self.block_hash + self.block_ref.hash() } /// Return the hash of the extrinsic that was submitted. @@ -317,7 +320,7 @@ impl> TxInBlock { let block_body = self .client .backend() - .block_body(self.block_hash) + .block_body(self.block_ref.hash()) .await? .ok_or(Error::Transaction(TransactionError::BlockNotFound))?; @@ -336,7 +339,7 @@ impl> TxInBlock { .ok_or(Error::Transaction(TransactionError::BlockNotFound))?; let events = EventsClient::new(self.client.clone()) - .at(self.block_hash) + .at(self.block_ref.clone()) .await?; Ok(crate::blocks::ExtrinsicEvents::new( diff --git a/testing/integration-tests/Cargo.toml b/testing/integration-tests/Cargo.toml index 89f2d5dae3..48685bbc60 100644 --- a/testing/integration-tests/Cargo.toml +++ b/testing/integration-tests/Cargo.toml @@ -18,6 +18,10 @@ default = [] # Enable to run the tests with Light Client support. unstable-light-client = ["subxt/unstable-light-client"] +# Enable this to use the unstable backend in tests _instead of_ +# the default one which relies on the "old" RPC methods. +unstable-backend-client = [] + [dev-dependencies] assert_matches = { workspace = true } codec = { package = "parity-scale-codec", workspace = true, features = ["derive", "bit-vec"] } diff --git a/testing/integration-tests/src/full_client/blocks/mod.rs b/testing/integration-tests/src/full_client/blocks/mod.rs index ff473a7574..3ea80a4afe 100644 --- a/testing/integration-tests/src/full_client/blocks/mod.rs +++ b/testing/integration-tests/src/full_client/blocks/mod.rs @@ -5,30 +5,73 @@ use crate::{test_context, utils::node_runtime}; use codec::{Compact, Encode}; use futures::StreamExt; -use subxt::blocks::BlocksClient; use subxt_metadata::Metadata; use subxt_signer::sr25519::dev; -// Check that we can subscribe to non-finalized blocks. #[tokio::test] -async fn non_finalized_headers_subscription() -> Result<(), subxt::Error> { +async fn block_subscriptions_are_consistent_with_eachother() -> Result<(), subxt::Error> { let ctx = test_context().await; let api = ctx.client(); - let mut sub = api.blocks().subscribe_best().await?; + let mut all_sub = api.blocks().subscribe_all().await?; + let mut best_sub = api.blocks().subscribe_best().await?; + let mut finalized_sub = api.blocks().subscribe_finalized().await?; - // Wait for the next set of headers, and check that the - // associated block hash is the one we just finalized. - // (this can be a bit slow as we have to wait for finalization) - let header = sub.next().await.unwrap()?; - let block_hash = header.hash(); - let current_block_hash = api.backend().latest_best_block_ref().await?.hash(); + let mut finals = vec![]; + let mut bests = vec![]; + let mut alls = vec![]; + + // Finalization can run behind a bit; blocks that were reported a while ago can + // only just now be being finalized (in the new RPCs this isn't true and we'll be + // told about all of those blocks up front). So, first we wait until finalization reports + // a block that we've seen as new. + loop { + tokio::select! {biased; + Some(Ok(b)) = all_sub.next() => alls.push(b.hash()), + Some(Ok(b)) = best_sub.next() => bests.push(b.hash()), + Some(Ok(b)) = finalized_sub.next() => if alls.contains(&b.hash()) { break }, + } + } + + // Now, gather a couple more finalized blocks as well as anything else we hear about. + while finals.len() < 2 { + tokio::select! {biased; + Some(Ok(b)) = all_sub.next() => alls.push(b.hash()), + Some(Ok(b)) = best_sub.next() => bests.push(b.hash()), + Some(Ok(b)) = finalized_sub.next() => finals.push(b.hash()), + } + } + + // Check that the items in the first slice are found in the same order in the second slice. + fn are_same_order_in(a_items: &[T], b_items: &[T]) -> bool { + let mut b_idx = 0; + for a in a_items { + if let Some((idx, _)) = b_items[b_idx..] + .iter() + .enumerate() + .find(|(_idx, b)| a == *b) + { + b_idx += idx; + } else { + return false; + } + } + true + } + + // Final blocks and best blocks should both be subsets of _all_ of the blocks reported. + assert!( + are_same_order_in(&bests, &alls), + "Best set {bests:?} should be a subset of all: {alls:?}" + ); + assert!( + are_same_order_in(&finals, &alls), + "Final set {finals:?} should be a subset of all: {alls:?}" + ); - assert_eq!(block_hash, current_block_hash); Ok(()) } -// Check that we can subscribe to finalized blocks. #[tokio::test] async fn finalized_headers_subscription() -> Result<(), subxt::Error> { let ctx = test_context().await; @@ -36,13 +79,13 @@ async fn finalized_headers_subscription() -> Result<(), subxt::Error> { let mut sub = api.blocks().subscribe_finalized().await?; - // Wait for the next set of headers, and check that the - // associated block hash is the one we just finalized. - // (this can be a bit slow as we have to wait for finalization) - let header = sub.next().await.unwrap()?; - let finalized_hash = api.backend().latest_finalized_block_ref().await?.hash(); + // check that the finalized block reported lines up with the `latest_finalized_block_ref`. + for _ in 0..2 { + let header = sub.next().await.unwrap()?; + let finalized_hash = api.backend().latest_finalized_block_ref().await?.hash(); + assert_eq!(header.hash(), finalized_hash); + } - assert_eq!(header.hash(), finalized_hash); Ok(()) } @@ -117,17 +160,17 @@ async fn runtime_api_call() -> Result<(), subxt::Error> { } #[tokio::test] -async fn decode_extrinsics() { +async fn fetch_block_and_decode_extrinsic_details() { let ctx = test_context().await; let api = ctx.client(); let alice = dev::alice(); let bob = dev::bob(); - // Generate a block that has unsigned and signed transactions. + // Setup; put an extrinsic into a block: let tx = node_runtime::tx() .balances() - .transfer(bob.public_key().into(), 10_000); + .transfer_allow_death(bob.public_key().into(), 10_000); let signed_extrinsic = api .tx() @@ -143,19 +186,21 @@ async fn decode_extrinsics() { .await .unwrap(); + // Now, separately, download that block. Let's see what it contains.. let block_hash = in_block.block_hash(); - - let block = BlocksClient::new(api).at(block_hash).await.unwrap(); + let block = api.blocks().at(block_hash).await.unwrap(); let extrinsics = block.extrinsics().await.unwrap(); - assert_eq!(extrinsics.len(), 2); + assert_eq!(extrinsics.block_hash(), block_hash); + // `.has` should work and find a transfer call. assert!(extrinsics - .has::() + .has::() .unwrap()); + // `.find_first` should similarly work to find the transfer call: assert!(extrinsics - .find_first::() + .find_first::() .unwrap() .is_some()); @@ -164,7 +209,7 @@ async fn decode_extrinsics() { .map(|res| res.unwrap()) .collect::>(); - assert_eq!(block_extrinsics.len(), 2); + // All blocks contain a timestamp; check this first: let timestamp = block_extrinsics.get(0).unwrap(); timestamp.as_root_extrinsic::().unwrap(); timestamp @@ -172,9 +217,13 @@ async fn decode_extrinsics() { .unwrap(); assert!(!timestamp.is_signed()); + // Next we expect our transfer: let tx = block_extrinsics.get(1).unwrap(); tx.as_root_extrinsic::().unwrap(); - tx.as_extrinsic::() + let ext = tx + .as_extrinsic::() + .unwrap() .unwrap(); + assert_eq!(ext.value, 10_000); assert!(tx.is_signed()); } diff --git a/testing/integration-tests/src/full_client/client/mod.rs b/testing/integration-tests/src/full_client/client/mod.rs index 4740dab6ea..416fbfb19b 100644 --- a/testing/integration-tests/src/full_client/client/mod.rs +++ b/testing/integration-tests/src/full_client/client/mod.rs @@ -72,7 +72,7 @@ async fn transaction_validation() { let tx = node_runtime::tx() .balances() - .transfer(bob.public_key().into(), 10_000); + .transfer_allow_death(bob.public_key().into(), 10_000); let signed_extrinsic = api .tx() @@ -110,7 +110,7 @@ async fn validation_fails() { // The actual TX is not important; the account has no funds to pay for it. let tx = node_runtime::tx() .balances() - .transfer(to.public_key().into(), 1); + .transfer_allow_death(to.public_key().into(), 1); let signed_extrinsic = api .tx() @@ -232,7 +232,7 @@ async fn unsigned_extrinsic_is_same_shape_as_polkadotjs() { let tx = node_runtime::tx() .balances() - .transfer(dev::alice().public_key().into(), 12345000000000000); + .transfer_allow_death(dev::alice().public_key().into(), 12345000000000000); let actual_tx = api.tx().create_unsigned(&tx).unwrap(); @@ -242,10 +242,10 @@ async fn unsigned_extrinsic_is_same_shape_as_polkadotjs() { // - start local substrate node. // - open polkadot.js UI in browser and point at local node. // - open dev console (may need to refresh page now) and find the WS connection. - // - create a balances.transfer to ALICE with 12345 and "submit unsigned". + // - create a balances.transferAllowDeath to ALICE (doesn't matter who from) with 12345 and "submit unsigned". // - find the submitAndWatchExtrinsic call in the WS connection to get these bytes: let expected_tx_bytes = hex::decode( - "b004060700d43593c715fdd31c61141abd04a99fd6822c8558854ccde39a5684e7a56da27d0f0090c04bb6db2b" + "b004060000d43593c715fdd31c61141abd04a99fd6822c8558854ccde39a5684e7a56da27d0f0090c04bb6db2b" ) .unwrap(); @@ -261,7 +261,7 @@ async fn extrinsic_hash_is_same_as_returned() { let payload = node_runtime::tx() .balances() - .transfer(dev::alice().public_key().into(), 12345000000000000); + .transfer_allow_death(dev::alice().public_key().into(), 12345000000000000); let tx = api .tx() @@ -314,7 +314,7 @@ async fn partial_fee_estimate_correct() { let bob = dev::bob(); let tx = node_runtime::tx() .balances() - .transfer(bob.public_key().into(), 1_000_000_000_000); + .transfer_allow_death(bob.public_key().into(), 1_000_000_000_000); let signed_extrinsic = api .tx() @@ -326,7 +326,7 @@ async fn partial_fee_estimate_correct() { let partial_fee_1 = signed_extrinsic.partial_fee_estimate().await.unwrap(); // Method II: TransactionPaymentApi_query_fee_details + calculations - let latest_block_ref = api.backend().latest_best_block_ref().await.unwrap(); + let latest_block_ref = api.backend().latest_finalized_block_ref().await.unwrap(); let len_bytes: [u8; 4] = (signed_extrinsic.encoded().len() as u32).to_le_bytes(); let encoded_with_len = [signed_extrinsic.encoded(), &len_bytes[..]].concat(); let InclusionFee { diff --git a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs index 4c5fdddc8d..2252abc3e4 100644 --- a/testing/integration-tests/src/full_client/client/unstable_rpcs.rs +++ b/testing/integration-tests/src/full_client/client/unstable_rpcs.rs @@ -8,8 +8,8 @@ use crate::{test_context, utils::node_runtime}; use assert_matches::assert_matches; use codec::Encode; +use futures::Stream; use subxt::{ - backend::rpc::RpcSubscription, backend::unstable::rpc_methods::{ FollowEvent, Initialized, MethodResponse, RuntimeEvent, RuntimeVersionEvent, StorageQuery, StorageQueryType, @@ -153,7 +153,7 @@ async fn chainhead_unstable_storage() { event, FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && res.items.len() == 1 && - res.items[0].key == format!("0x{}", hex::encode(addr_bytes)) + res.items[0].key.0 == addr_bytes ); let event = next_operation_event(&mut blocks).await; @@ -218,8 +218,6 @@ async fn chainhead_unstable_unpin() { assert!(rpc.chainhead_unstable_unpin(sub_id, hash).await.is_err()); } -// Ignored until this is implemented in Substrate -#[ignore] #[tokio::test] async fn chainspec_v1_genesishash() { let ctx = test_context().await; @@ -232,22 +230,18 @@ async fn chainspec_v1_genesishash() { assert_eq!(a, b); } -// Ignored until this is implemented in Substrate -#[ignore] #[tokio::test] async fn chainspec_v1_chainname() { let ctx = test_context().await; let old_rpc = ctx.legacy_rpc_methods().await; let rpc = ctx.unstable_rpc_methods().await; - let a = old_rpc.system_name().await.unwrap(); + let a = old_rpc.system_chain().await.unwrap(); let b = rpc.chainspec_v1_chain_name().await.unwrap(); assert_eq!(a, b); } -// Ignored until this is implemented in Substrate -#[ignore] #[tokio::test] async fn chainspec_v1_properties() { let ctx = test_context().await; @@ -290,9 +284,14 @@ async fn transaction_unstable_submit_and_watch() { } /// Ignore block related events and obtain the next event related to an operation. -async fn next_operation_event( - sub: &mut RpcSubscription>, +async fn next_operation_event< + T: serde::de::DeserializeOwned, + S: Unpin + Stream, subxt::Error>>, +>( + sub: &mut S, ) -> FollowEvent { + use futures::StreamExt; + // Number of events to wait for the next operation event. const NUM_EVENTS: usize = 10; diff --git a/testing/integration-tests/src/full_client/frame/balances.rs b/testing/integration-tests/src/full_client/frame/balances.rs index 35e4a0ab03..d4ef0392de 100644 --- a/testing/integration-tests/src/full_client/frame/balances.rs +++ b/testing/integration-tests/src/full_client/frame/balances.rs @@ -41,7 +41,9 @@ async fn tx_basic_transfer() -> Result<(), subxt::Error> { .fetch_or_default(&bob_account_addr) .await?; - let tx = node_runtime::tx().balances().transfer(bob_address, 10_000); + let tx = node_runtime::tx() + .balances() + .transfer_allow_death(bob_address, 10_000); let events = api .tx() @@ -118,7 +120,7 @@ async fn tx_dynamic_transfer() -> Result<(), subxt::Error> { let tx = subxt::dynamic::tx( "Balances", - "transfer", + "transfer_allow_death", vec![ Value::unnamed_variant( "Id", @@ -206,14 +208,14 @@ async fn tx_dynamic_transfer() -> Result<(), subxt::Error> { } #[tokio::test] -async fn multiple_transfers_work_nonce_incremented() -> Result<(), subxt::Error> { +async fn multiple_sequential_transfers_work() -> Result<(), subxt::Error> { let alice = dev::alice(); let bob = dev::bob(); let bob_address: MultiAddress = bob.public_key().into(); let ctx = test_context().await; let api = ctx.client(); - let bob_account_addr = node_runtime::storage() + let bob_account_info_addr = node_runtime::storage() .system() .account(bob.public_key().to_account_id()); @@ -221,19 +223,19 @@ async fn multiple_transfers_work_nonce_incremented() -> Result<(), subxt::Error> .storage() .at_latest() .await? - .fetch_or_default(&bob_account_addr) + .fetch_or_default(&bob_account_info_addr) .await?; + // Do a transfer several times. If this works, it indicates that the + // nonce is properly incremented each time. let tx = node_runtime::tx() .balances() - .transfer(bob_address.clone(), 10_000); + .transfer_allow_death(bob_address.clone(), 10_000); for _ in 0..3 { api.tx() .sign_and_submit_then_watch_default(&tx, &alice) .await? - .wait_for_in_block() // Don't need to wait for finalization; this is quicker. - .await? - .wait_for_success() + .wait_for_finalized_success() .await?; } @@ -241,7 +243,7 @@ async fn multiple_transfers_work_nonce_incremented() -> Result<(), subxt::Error> .storage() .at_latest() .await? - .fetch_or_default(&bob_account_addr) + .fetch_or_default(&bob_account_info_addr) .await?; assert_eq!(bob_pre.data.free + 30_000, bob_post.data.free); @@ -317,10 +319,10 @@ async fn transfer_error() { let to_bob_tx = node_runtime::tx() .balances() - .transfer(bob_address, 100_000_000_000_000_000); + .transfer_allow_death(bob_address, 100_000_000_000_000_000); let to_alice_tx = node_runtime::tx() .balances() - .transfer(alice_addr, 100_000_000_000_000_000); + .transfer_allow_death(alice_addr, 100_000_000_000_000_000); api.tx() .sign_and_submit_then_watch_default(&to_bob_tx, &alice) @@ -360,7 +362,7 @@ async fn transfer_implicit_subscription() { let to_bob_tx = node_runtime::tx() .balances() - .transfer(bob.clone().into(), 10_000); + .transfer_allow_death(bob.clone().into(), 10_000); let event = api .tx() diff --git a/testing/integration-tests/src/full_client/frame/contracts.rs b/testing/integration-tests/src/full_client/frame/contracts.rs index fcd0ca9663..080c456ebb 100644 --- a/testing/integration-tests/src/full_client/frame/contracts.rs +++ b/testing/integration-tests/src/full_client/frame/contracts.rs @@ -203,7 +203,6 @@ async fn tx_call() { let info_addr = node_runtime::storage() .contracts() .contract_info_of(&contract); - let info_addr_iter = node_runtime::storage().contracts().contract_info_of_iter(); let contract_info = cxt .client() @@ -213,7 +212,13 @@ async fn tx_call() { .unwrap() .fetch(&info_addr) .await; - assert!(contract_info.is_ok()); + + assert!( + contract_info.is_ok(), + "Contract info is not ok, is: {contract_info:?}" + ); + + let info_addr_iter = node_runtime::storage().contracts().contract_info_of_iter(); let keys_and_values = cxt .client() diff --git a/testing/integration-tests/src/full_client/frame/sudo.rs b/testing/integration-tests/src/full_client/frame/sudo.rs index 30ae462fcb..fad808d563 100644 --- a/testing/integration-tests/src/full_client/frame/sudo.rs +++ b/testing/integration-tests/src/full_client/frame/sudo.rs @@ -23,7 +23,7 @@ async fn test_sudo() -> Result<(), subxt::Error> { let alice = dev::alice(); let bob = dev::bob().public_key().into(); - let call = Call::Balances(BalancesCall::transfer { + let call = Call::Balances(BalancesCall::transfer_allow_death { dest: bob, value: 10_000, }); @@ -49,7 +49,7 @@ async fn test_sudo_unchecked_weight() -> Result<(), subxt::Error> { let alice = dev::alice(); let bob = dev::bob().public_key().into(); - let call = Call::Balances(BalancesCall::transfer { + let call = Call::Balances(BalancesCall::transfer_allow_death { dest: bob, value: 10_000, }); diff --git a/testing/integration-tests/src/full_client/runtime_api/mod.rs b/testing/integration-tests/src/full_client/runtime_api/mod.rs index 9027f6c00d..2a637f9aff 100644 --- a/testing/integration-tests/src/full_client/runtime_api/mod.rs +++ b/testing/integration-tests/src/full_client/runtime_api/mod.rs @@ -59,7 +59,9 @@ async fn unchecked_extrinsic_encoding() -> Result<(), subxt::Error> { let bob_address = bob.public_key().to_address(); // Construct a tx from Alice to Bob. - let tx = node_runtime::tx().balances().transfer(bob_address, 10_000); + let tx = node_runtime::tx() + .balances() + .transfer_allow_death(bob_address, 10_000); let signed_extrinsic = api .tx() diff --git a/testing/integration-tests/src/lib.rs b/testing/integration-tests/src/lib.rs index 17a9af1f0b..0eeb3e607f 100644 --- a/testing/integration-tests/src/lib.rs +++ b/testing/integration-tests/src/lib.rs @@ -2,6 +2,11 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. +#[cfg(all(feature = "unstable-light-client", feature = "unstable-backend-client"))] +compile_error!( + "The features 'unstable-light-client' and 'unstable-backend-client' cannot be used together" +); + #[cfg(test)] pub mod utils; diff --git a/testing/integration-tests/src/light_client/mod.rs b/testing/integration-tests/src/light_client/mod.rs index c24c91d8bf..0694273ba5 100644 --- a/testing/integration-tests/src/light_client/mod.rs +++ b/testing/integration-tests/src/light_client/mod.rs @@ -40,11 +40,6 @@ type Client = LightClient; // Check that we can subscribe to non-finalized blocks. async fn non_finalized_headers_subscription(api: &Client) -> Result<(), subxt::Error> { let mut sub = api.blocks().subscribe_best().await?; - let header = sub.next().await.unwrap()?; - let block_hash = header.hash(); - let current_block_hash = api.backend().latest_best_block_ref().await.unwrap().hash(); - - assert_eq!(block_hash, current_block_hash); let _block = sub.next().await.unwrap()?; let _block = sub.next().await.unwrap()?; diff --git a/testing/integration-tests/src/utils/node_proc.rs b/testing/integration-tests/src/utils/node_proc.rs index 7bc390818d..b49fa8125a 100644 --- a/testing/integration-tests/src/utils/node_proc.rs +++ b/testing/integration-tests/src/utils/node_proc.rs @@ -3,6 +3,7 @@ // see LICENSE for license details. use std::ffi::{OsStr, OsString}; +use std::sync::Arc; use substrate_runner::SubstrateNode; use subxt::{ backend::{legacy, rpc, unstable}, @@ -118,9 +119,14 @@ impl TestNodeProcessBuilder { #[cfg(feature = "unstable-light-client")] let client = build_light_client(&proc).await; - // Connect to the node with a subxt client: - #[cfg(not(feature = "unstable-light-client"))] - let client = OnlineClient::from_url(ws_url.clone()).await; + #[cfg(feature = "unstable-backend-client")] + let client = build_unstable_client(&proc).await; + + #[cfg(all( + not(feature = "unstable-light-client"), + not(feature = "unstable-backend-client") + ))] + let client = build_legacy_client(&proc).await; match client { Ok(client) => Ok(TestNodeProcess { proc, client }), @@ -129,13 +135,59 @@ impl TestNodeProcessBuilder { } } +#[cfg(all( + not(feature = "unstable-light-client"), + not(feature = "unstable-backend-client") +))] +async fn build_legacy_client(proc: &SubstrateNode) -> Result, String> { + let ws_url = format!("ws://127.0.0.1:{}", proc.ws_port()); + + let rpc_client = rpc::RpcClient::from_url(ws_url) + .await + .map_err(|e| format!("Cannot construct RPC client: {e}"))?; + let backend = legacy::LegacyBackend::new(rpc_client); + let client = OnlineClient::from_backend(Arc::new(backend)) + .await + .map_err(|e| format!("Cannot construct OnlineClient from backend: {e}"))?; + + Ok(client) +} + +#[cfg(feature = "unstable-backend-client")] +async fn build_unstable_client(proc: &SubstrateNode) -> Result, String> { + let ws_url = format!("ws://127.0.0.1:{}", proc.ws_port()); + + let rpc_client = rpc::RpcClient::from_url(ws_url) + .await + .map_err(|e| format!("Cannot construct RPC client: {e}"))?; + + let (backend, mut driver) = unstable::UnstableBackend::builder().build(rpc_client); + + // The unstable backend needs driving: + tokio::spawn(async move { + use futures::StreamExt; + while let Some(val) = driver.next().await { + if let Err(e) = val { + eprintln!("Error driving unstable backend: {e}"); + break; + } + } + }); + + let client = OnlineClient::from_backend(Arc::new(backend)) + .await + .map_err(|e| format!("Cannot construct OnlineClient from backend: {e}"))?; + + Ok(client) +} + #[cfg(feature = "unstable-light-client")] -async fn build_light_client(proc: &SubstrateNode) -> Result, String> { +async fn build_light_client(proc: &SubstrateNode) -> Result, String> { // RPC endpoint. let ws_url = format!("ws://127.0.0.1:{}", proc.ws_port()); // Step 1. Wait for a few blocks to be produced using the subxt client. - let client = OnlineClient::::from_url(ws_url.clone()) + let client = OnlineClient::::from_url(ws_url.clone()) .await .map_err(|err| format!("Failed to connect to node rpc at {ws_url}: {err}"))?; diff --git a/testing/integration-tests/src/utils/wait_for_blocks.rs b/testing/integration-tests/src/utils/wait_for_blocks.rs index b21180a24a..a426fc239d 100644 --- a/testing/integration-tests/src/utils/wait_for_blocks.rs +++ b/testing/integration-tests/src/utils/wait_for_blocks.rs @@ -4,23 +4,13 @@ use subxt::{client::OnlineClientT, Config}; -/// Wait for blocks to be produced before running tests. Waiting for two blocks -/// (the genesis block and another one) seems to be enough to allow tests -/// like `validation_passes` to work properly. -/// -/// If the "unstable-light-client" feature flag is enabled, this will wait for -/// 5 blocks instead of two. The light client needs the extra blocks to avoid -/// errors caused by loading information that is not available in the first 2 blocks -/// (`Failed to load the block weight for block`). +/// Wait for blocks to be produced before running tests. Specifically, we +/// wait for one more finalized block to be produced, which is important because +/// the first finalized block doesn't have much state etc associated with it. pub async fn wait_for_blocks(api: &impl OnlineClientT) { - let mut sub = api.backend().stream_all_block_headers().await.unwrap(); + let mut sub = api.blocks().subscribe_finalized().await.unwrap(); + // The current finalized block: sub.next().await; + // The next one: sub.next().await; - - #[cfg(feature = "unstable-light-client")] - { - sub.next().await; - sub.next().await; - sub.next().await; - } } diff --git a/testing/test-runtime/build.rs b/testing/test-runtime/build.rs index 2cc3a6d670..23643e491d 100644 --- a/testing/test-runtime/build.rs +++ b/testing/test-runtime/build.rs @@ -63,7 +63,7 @@ async fn run() { // Save metadata to a file: let out_dir = env::var_os("OUT_DIR").unwrap(); - let metadata_path = Path::new(&out_dir).join("metadata.scale"); + let metadata_path = Path::new(&out_dir).join("test_node_runtime_metadata.scale"); fs::write(&metadata_path, metadata_bytes).expect("Couldn't write metadata output"); // Write out our expression to generate the runtime API to a file. Ideally, we'd just write this code diff --git a/testing/test-runtime/src/lib.rs b/testing/test-runtime/src/lib.rs index fb90001dc9..db922c32e6 100644 --- a/testing/test-runtime/src/lib.rs +++ b/testing/test-runtime/src/lib.rs @@ -5,6 +5,9 @@ #![allow(clippy::too_many_arguments)] /// The SCALE encoded metadata obtained from a local run of a substrate node. -pub static METADATA: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/metadata.scale")); +pub static METADATA: &[u8] = include_bytes!(concat!( + env!("OUT_DIR"), + "/test_node_runtime_metadata.scale" +)); include!(concat!(env!("OUT_DIR"), "/runtime.rs")); diff --git a/testing/ui-tests/src/correct/generic_params.rs b/testing/ui-tests/src/correct/generic_params.rs index 30f024fe12..3e8dab9aae 100644 --- a/testing/ui-tests/src/correct/generic_params.rs +++ b/testing/ui-tests/src/correct/generic_params.rs @@ -103,33 +103,33 @@ fn main() { // We assume Polkadot's config of MultiAddress here let _ = node_runtime::tx() .balances() - .transfer(CustomAddress(1337), 123); + .transfer_allow_death(CustomAddress(1337), 123); let _ = node_runtime2::tx() .balances() - .transfer(Generic(AccountId32::from([0x01;32])), 123); + .transfer_allow_death(Generic(AccountId32::from([0x01;32])), 123); let _ = node_runtime3::tx() .balances() - .transfer(Generic(()), 123); + .transfer_allow_death(Generic(()), 123); let _ = node_runtime4::tx() .balances() - .transfer(Second((), AccountId32::from([0x01;32])), 123); + .transfer_allow_death(Second((), AccountId32::from([0x01;32])), 123); let _ = node_runtime5::tx() .balances() - .transfer(Second(true, vec![1u8, 2u8]), 123); + .transfer_allow_death(Second(true, vec![1u8, 2u8]), 123); let _ = node_runtime6::tx() .balances() - .transfer(Second((), 1234u16), 123); + .transfer_allow_death(Second((), 1234u16), 123); let _ = node_runtime7::tx() .balances() - .transfer(subxt::utils::Static(DoesntImplEncodeDecodeAsType(1337)), 123); + .transfer_allow_death(subxt::utils::Static(DoesntImplEncodeDecodeAsType(1337)), 123); let _ = node_runtime8::tx() .balances() - .transfer(subxt::utils::Static(Second(AccountId32::from([0x01;32]), ())), 123); + .transfer_allow_death(subxt::utils::Static(Second(AccountId32::from([0x01;32]), ())), 123); }