fix(runtime update): wait until upgrade on chain (#1321)

* fix(runtime update): wait until upgrade on chain

* address grumbles

* Update subxt/src/client/online_client.rs

* Update subxt/src/client/online_client.rs

* fix nits and debug logs

* remove debug logs
This commit is contained in:
Niklas Adolfsson
2024-01-08 15:48:20 +01:00
committed by GitHub
parent c5948dc967
commit 298869bd98
+64 -14
View File
@@ -8,7 +8,7 @@ use crate::{
backend::{
legacy::LegacyBackend, rpc::RpcClient, Backend, BackendExt, RuntimeVersion, StreamOfResults,
},
blocks::BlocksClient,
blocks::{BlockRef, BlocksClient},
constants::ConstantsClient,
error::Error,
events::EventsClient,
@@ -430,24 +430,18 @@ pub struct RuntimeUpdaterStream<T: Config> {
impl<T: Config> RuntimeUpdaterStream<T> {
/// Wait for the next runtime update.
pub async fn next(&mut self) -> Option<Result<Update, Error>> {
let maybe_runtime_version = self.stream.next().await?;
let runtime_version = match maybe_runtime_version {
let runtime_version = match self.stream.next().await? {
Ok(runtime_version) => runtime_version,
Err(err) => return Some(Err(err)),
};
let latest_block_ref = match self.client.backend().latest_finalized_block_ref().await {
Ok(block_ref) => block_ref,
Err(e) => return Some(Err(e)),
};
let at =
match wait_runtime_upgrade_in_finalized_block(&self.client, &runtime_version).await? {
Ok(at) => at,
Err(err) => return Some(Err(err)),
};
let metadata = match OnlineClient::fetch_metadata(
self.client.backend(),
latest_block_ref.hash(),
)
.await
{
let metadata = match OnlineClient::fetch_metadata(self.client.backend(), at.hash()).await {
Ok(metadata) => metadata,
Err(err) => return Some(Err(err)),
};
@@ -484,3 +478,59 @@ impl Update {
&self.metadata
}
}
/// Helper to wait until the runtime upgrade is applied on at finalized block.
async fn wait_runtime_upgrade_in_finalized_block<T: Config>(
client: &OnlineClient<T>,
runtime_version: &RuntimeVersion,
) -> Option<Result<BlockRef<T::Hash>, Error>> {
use scale_value::At;
let mut block_sub = match client.backend().stream_finalized_block_headers().await {
Ok(s) => s,
Err(err) => return Some(Err(err)),
};
let block_ref = loop {
let (_, block_ref) = match block_sub.next().await? {
Ok(n) => n,
Err(err) => return Some(Err(err)),
};
let key: Vec<scale_value::Value> = vec![];
let addr = crate::dynamic::storage("System", "LastRuntimeUpgrade", key);
let chunk = match client.storage().at(block_ref.hash()).fetch(&addr).await {
Ok(Some(v)) => v,
Ok(None) => {
// The storage `system::lastRuntimeUpgrade` should always exist.
// <https://github.com/paritytech/polkadot-sdk/blob/master/substrate/frame/system/src/lib.rs#L958>
unreachable!("The storage item `system::lastRuntimeUpgrade` should always exist")
}
Err(e) => return Some(Err(e)),
};
let scale_val = match chunk.to_value() {
Ok(v) => v,
Err(e) => return Some(Err(e)),
};
let Some(Ok(spec_version)) = scale_val
.at("spec_version")
.and_then(|v| v.as_u128())
.map(u32::try_from)
else {
return Some(Err(Error::Other(
"Decoding `RuntimeVersion::spec_version` as u32 failed".to_string(),
)));
};
// We are waiting for the chain to have the same spec version
// as sent out via the runtime subscription.
if spec_version == runtime_version.spec_version {
break block_ref;
}
};
Some(Ok(block_ref))
}