diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index ae225184c5..f6c2df81d6 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -11122,6 +11122,7 @@ dependencies = [ "sp-runtime", "sp-state-machine", "sp-version", + "tokio", "zstd", ] diff --git a/substrate/utils/frame/try-runtime/cli/Cargo.toml b/substrate/utils/frame/try-runtime/cli/Cargo.toml index dd98854e84..56ead30644 100644 --- a/substrate/utils/frame/try-runtime/cli/Cargo.toml +++ b/substrate/utils/frame/try-runtime/cli/Cargo.toml @@ -32,3 +32,6 @@ sp-runtime = { version = "6.0.0", path = "../../../../primitives/runtime" } sp-state-machine = { version = "0.12.0", path = "../../../../primitives/state-machine" } sp-version = { version = "5.0.0", path = "../../../../primitives/version" } frame-try-runtime = { path = "../../../../frame/try-runtime" } + +[dev-dependencies] +tokio = "1.17.0" diff --git a/substrate/utils/frame/try-runtime/cli/src/commands/follow_chain.rs b/substrate/utils/frame/try-runtime/cli/src/commands/follow_chain.rs index 01fc1dae15..9f598694d0 100644 --- a/substrate/utils/frame/try-runtime/cli/src/commands/follow_chain.rs +++ b/substrate/utils/frame/try-runtime/cli/src/commands/follow_chain.rs @@ -20,16 +20,20 @@ use crate::{ state_machine_call_with_proof, SharedParams, LOG_TARGET, }; use jsonrpsee::{ - core::client::{Subscription, SubscriptionClientT}, + core::{ + async_trait, + client::{Client, Subscription, SubscriptionClientT}, + }, ws_client::WsClientBuilder, }; use parity_scale_codec::{Decode, Encode}; use remote_externalities::{rpc_api, Builder, Mode, OnlineConfig}; use sc_executor::NativeExecutionDispatch; use sc_service::Configuration; +use serde::de::DeserializeOwned; use sp_core::H256; -use sp_runtime::traits::{Block as BlockT, Header, NumberFor}; -use std::{fmt::Debug, str::FromStr}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; +use std::{collections::VecDeque, fmt::Debug, marker::PhantomData, str::FromStr}; const SUB: &str = "chain_subscribeFinalizedHeads"; const UN_SUB: &str = "chain_unsubscribeFinalizedHeads"; @@ -58,51 +62,182 @@ pub struct FollowChainCmd { try_state: frame_try_runtime::TryStateSelect, } +/// Start listening for with `SUB` at `url`. +/// +/// Returns a pair `(client, subscription)` - `subscription` alone will be useless, because it +/// relies on the related alive `client`. +async fn start_subscribing( + url: &str, +) -> sc_cli::Result<(Client, Subscription
)> { + let client = WsClientBuilder::default() + .connection_timeout(std::time::Duration::new(20, 0)) + .max_notifs_per_subscription(1024) + .max_request_body_size(u32::MAX) + .build(url) + .await + .map_err(|e| sc_cli::Error::Application(e.into()))?; + + log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", SUB, UN_SUB); + + let sub = client + .subscribe(SUB, None, UN_SUB) + .await + .map_err(|e| sc_cli::Error::Application(e.into()))?; + Ok((client, sub)) +} + +/// Abstraction over RPC calling for headers. +#[async_trait] +trait HeaderProvider +where + Block::Header: HeaderT, +{ + /// Awaits for the header of the block with hash `hash`. + async fn get_header(&mut self, hash: Block::Hash) -> Block::Header; +} + +struct RpcHeaderProvider { + uri: String, + _phantom: PhantomData, +} + +#[async_trait] +impl HeaderProvider for RpcHeaderProvider +where + Block::Header: DeserializeOwned, +{ + async fn get_header(&mut self, hash: Block::Hash) -> Block::Header { + rpc_api::get_header::(&self.uri, hash).await.unwrap() + } +} + +/// Abstraction over RPC subscription for finalized headers. +#[async_trait] +trait HeaderSubscription +where + Block::Header: HeaderT, +{ + /// Await for the next finalized header from the subscription. + /// + /// Returns `None` if either the subscription has been closed or there was an error when reading + /// an object from the client. + async fn next_header(&mut self) -> Option; +} + +#[async_trait] +impl HeaderSubscription for Subscription +where + Block::Header: DeserializeOwned, +{ + async fn next_header(&mut self) -> Option { + match self.next().await { + Some(Ok(header)) => Some(header), + None => { + log::warn!("subscription closed"); + None + }, + Some(Err(why)) => { + log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why); + None + }, + } + } +} + +/// Stream of all finalized headers. +/// +/// Returned headers are guaranteed to be ordered. There are no missing headers (even if some of +/// them lack justification). +struct FinalizedHeaders, HS: HeaderSubscription> { + header_provider: HP, + subscription: HS, + fetched_headers: VecDeque, + last_returned: Option<::Hash>, +} + +impl, HS: HeaderSubscription> + FinalizedHeaders +where + ::Header: DeserializeOwned, +{ + pub fn new(header_provider: HP, subscription: HS) -> Self { + Self { + header_provider, + subscription, + fetched_headers: VecDeque::new(), + last_returned: None, + } + } + + /// Reads next finalized header from the subscription. If some headers (without justification) + /// have been skipped, fetches them as well. Returns number of headers that have been fetched. + /// + /// All fetched headers are stored in `self.fetched_headers`. + async fn fetch(&mut self) -> usize { + let last_finalized = match self.subscription.next_header().await { + Some(header) => header, + None => return 0, + }; + + self.fetched_headers.push_front(last_finalized.clone()); + + let mut last_finalized_parent = *last_finalized.parent_hash(); + let last_returned = self.last_returned.unwrap_or(last_finalized_parent); + + while last_finalized_parent != last_returned { + let parent_header = self.header_provider.get_header(last_finalized_parent).await; + self.fetched_headers.push_front(parent_header.clone()); + last_finalized_parent = *parent_header.parent_hash(); + } + + self.fetched_headers.len() + } + + /// Get the next finalized header. + pub async fn next(&mut self) -> Option { + if self.fetched_headers.is_empty() { + self.fetch().await; + } + + if let Some(header) = self.fetched_headers.pop_front() { + self.last_returned = Some(header.hash()); + Some(header) + } else { + None + } + } +} + pub(crate) async fn follow_chain( shared: SharedParams, command: FollowChainCmd, config: Configuration, ) -> sc_cli::Result<()> where - Block: BlockT + serde::de::DeserializeOwned, + Block: BlockT + DeserializeOwned, Block::Hash: FromStr, - Block::Header: serde::de::DeserializeOwned, + Block::Header: DeserializeOwned, ::Err: Debug, NumberFor: FromStr, as FromStr>::Err: Debug, ExecDispatch: NativeExecutionDispatch + 'static, { let mut maybe_state_ext = None; - - let client = WsClientBuilder::default() - .connection_timeout(std::time::Duration::new(20, 0)) - .max_notifs_per_subscription(1024) - .max_request_body_size(u32::MAX) - .build(&command.uri) - .await - .unwrap(); - - log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", SUB, UN_SUB); - let mut subscription: Subscription = - client.subscribe(SUB, None, UN_SUB).await.unwrap(); + let (_client, subscription) = start_subscribing::(&command.uri).await?; let (code_key, code) = extract_code(&config.chain_spec)?; let executor = build_executor::(&shared, &config); let execution = shared.execution; - loop { - let header = match subscription.next().await { - Some(Ok(header)) => header, - None => { - log::warn!("subscription closed"); - break - }, - Some(Err(why)) => { - log::warn!("subscription returned error: {:?}. Probably decoding has failed.", why); - continue - }, - }; + let header_provider: RpcHeaderProvider = + RpcHeaderProvider { uri: command.uri.clone(), _phantom: PhantomData {} }; + let mut finalized_headers: FinalizedHeaders< + Block, + RpcHeaderProvider, + Subscription, + > = FinalizedHeaders::new(header_provider, subscription); + while let Some(header) = finalized_headers.next().await { let hash = header.hash(); let number = header.number(); @@ -193,3 +328,74 @@ where log::error!(target: LOG_TARGET, "ws subscription must have terminated."); Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use sp_runtime::testing::{Block as TBlock, ExtrinsicWrapper, Header}; + + type Block = TBlock>; + type BlockNumber = u64; + type Hash = H256; + + struct MockHeaderProvider(pub VecDeque); + + fn headers() -> Vec
{ + let mut headers = vec![Header::new_from_number(0)]; + for n in 1..11 { + headers.push(Header { + parent_hash: headers.last().unwrap().hash(), + ..Header::new_from_number(n) + }) + } + headers + } + + #[async_trait] + impl HeaderProvider for MockHeaderProvider { + async fn get_header(&mut self, _hash: Hash) -> Header { + let height = self.0.pop_front().unwrap(); + headers()[height as usize].clone() + } + } + + struct MockHeaderSubscription(pub VecDeque); + + #[async_trait] + impl HeaderSubscription for MockHeaderSubscription { + async fn next_header(&mut self) -> Option
{ + self.0.pop_front().map(|h| headers()[h as usize].clone()) + } + } + + #[tokio::test] + async fn finalized_headers_works_when_every_block_comes_from_subscription() { + let heights = vec![4, 5, 6, 7]; + + let provider = MockHeaderProvider(vec![].into()); + let subscription = MockHeaderSubscription(heights.clone().into()); + let mut headers = FinalizedHeaders::new(provider, subscription); + + for h in heights { + assert_eq!(h, headers.next().await.unwrap().number); + } + assert_eq!(None, headers.next().await); + } + + #[tokio::test] + async fn finalized_headers_come_from_subscription_and_provider_if_in_need() { + let all_heights = 3..11; + let heights_in_subscription = vec![3, 4, 6, 10]; + // Consecutive headers will be requested in the reversed order. + let heights_not_in_subscription = vec![5, 9, 8, 7]; + + let provider = MockHeaderProvider(heights_not_in_subscription.into()); + let subscription = MockHeaderSubscription(heights_in_subscription.into()); + let mut headers = FinalizedHeaders::new(provider, subscription); + + for h in all_heights { + assert_eq!(h, headers.next().await.unwrap().number); + } + assert_eq!(None, headers.next().await); + } +}