mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 16:31:07 +00:00
feat: add low-level runtime upgrade API (#657)
* feat: add low-level runtime upgrade API * grumbles * cargo fmt * dont use apply_update in perform_runtime_upgrades * fix nit * remove metadata check * fix doc tests * fix final comments * Update subxt/src/client/online_client.rs * cargo fmt * Update subxt/src/client/online_client.rs
This commit is contained in:
@@ -16,6 +16,9 @@ pub use offline_client::{
|
||||
OfflineClientT,
|
||||
};
|
||||
pub use online_client::{
|
||||
ClientRuntimeUpdater,
|
||||
OnlineClient,
|
||||
OnlineClientT,
|
||||
RuntimeUpdaterStream,
|
||||
UpgradeError,
|
||||
};
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::{
|
||||
Rpc,
|
||||
RpcClientT,
|
||||
RuntimeVersion,
|
||||
Subscription,
|
||||
},
|
||||
storage::StorageClient,
|
||||
tx::TxClient,
|
||||
@@ -33,7 +34,7 @@ pub trait OnlineClientT<T: Config>: OfflineClientT<T> {
|
||||
}
|
||||
|
||||
/// A client that can be used to perform API calls (that is, either those
|
||||
/// requiriing an [`OfflineClientT`] or those requiring an [`OnlineClientT`]).
|
||||
/// requiring an [`OfflineClientT`] or those requiring an [`OnlineClientT`]).
|
||||
#[derive(Derivative)]
|
||||
#[derivative(Clone(bound = ""))]
|
||||
pub struct OnlineClient<T: Config> {
|
||||
@@ -102,7 +103,7 @@ impl<T: Config> OnlineClient<T> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Create an object which can be used to keep the runtime uptodate
|
||||
/// Create an object which can be used to keep the runtime up to date
|
||||
/// in a separate thread.
|
||||
///
|
||||
/// # Example
|
||||
@@ -114,10 +115,33 @@ impl<T: Config> OnlineClient<T> {
|
||||
///
|
||||
/// let client = OnlineClient::<PolkadotConfig>::new().await.unwrap();
|
||||
///
|
||||
/// // high level API.
|
||||
///
|
||||
/// let update_task = client.subscribe_to_updates();
|
||||
/// tokio::spawn(async move {
|
||||
/// update_task.perform_runtime_updates().await;
|
||||
/// });
|
||||
///
|
||||
///
|
||||
/// // low level API.
|
||||
///
|
||||
/// let updater = client.subscribe_to_updates();
|
||||
/// tokio::spawn(async move {
|
||||
/// let mut update_stream = updater.runtime_updates().await.unwrap();
|
||||
///
|
||||
/// while let Some(Ok(update)) = update_stream.next().await {
|
||||
/// let version = update.runtime_version().spec_version;
|
||||
///
|
||||
/// match updater.apply_update(update) {
|
||||
/// Ok(()) => {
|
||||
/// println!("Upgrade to version: {} successful", version)
|
||||
/// }
|
||||
/// Err(e) => {
|
||||
/// println!("Upgrade to version {} failed {:?}", version, e);
|
||||
/// }
|
||||
/// };
|
||||
/// }
|
||||
/// });
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn subscribe_to_updates(&self) -> ClientRuntimeUpdater<T> {
|
||||
@@ -209,34 +233,110 @@ impl<T: Config> ClientRuntimeUpdater<T> {
|
||||
&curr.runtime_version != new
|
||||
}
|
||||
|
||||
fn do_update(&self, update: Update) {
|
||||
let mut writable = self.0.inner.write();
|
||||
writable.metadata = update.metadata;
|
||||
writable.runtime_version = update.runtime_version;
|
||||
}
|
||||
|
||||
/// Tries to apply a new update.
|
||||
pub fn apply_update(&self, update: Update) -> Result<(), UpgradeError> {
|
||||
if !self.is_runtime_version_different(&update.runtime_version) {
|
||||
return Err(UpgradeError::SameVersion)
|
||||
}
|
||||
|
||||
self.do_update(update);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Performs runtime updates indefinitely unless encountering an error.
|
||||
///
|
||||
/// *Note:* This will run indefinitely until it errors, so the typical usage
|
||||
/// would be to run it in a separate background task.
|
||||
pub async fn perform_runtime_updates(&self) -> Result<(), Error> {
|
||||
// Obtain an update subscription to further detect changes in the runtime version of the node.
|
||||
let mut update_subscription = self.0.rpc.subscribe_runtime_version().await?;
|
||||
let mut runtime_version_stream = self.runtime_updates().await?;
|
||||
|
||||
while let Some(new_runtime_version) = update_subscription.next().await {
|
||||
// The Runtime Version obtained via subscription.
|
||||
let new_runtime_version = new_runtime_version?;
|
||||
while let Some(update) = runtime_version_stream.next().await {
|
||||
let update = update?;
|
||||
|
||||
// Ignore this update if there is no difference.
|
||||
if !self.is_runtime_version_different(&new_runtime_version) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Fetch new metadata.
|
||||
let new_metadata = self.0.rpc.metadata().await?;
|
||||
|
||||
// Do the update.
|
||||
let mut writable = self.0.inner.write();
|
||||
writable.metadata = new_metadata;
|
||||
writable.runtime_version = new_runtime_version;
|
||||
// This only fails if received the runtime version is the same the current runtime version
|
||||
// which might occur because that runtime subscriptions in substrate sends out the initial
|
||||
// value when they created and not only when runtime upgrades occurs.
|
||||
// Thus, fine to ignore here as it strictly speaking isn't really an error
|
||||
let _ = self.apply_update(update);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Low-level API to get runtime updates as a stream but it's doesn't check if the
|
||||
/// runtime version is newer or updates the runtime.
|
||||
///
|
||||
/// Instead that's up to the user of this API to decide when to update and
|
||||
/// to perform the actual updating.
|
||||
pub async fn runtime_updates(&self) -> Result<RuntimeUpdaterStream<T>, Error> {
|
||||
let stream = self.0.rpc().subscribe_runtime_version().await?;
|
||||
Ok(RuntimeUpdaterStream {
|
||||
stream,
|
||||
client: self.0.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Stream to perform runtime upgrades.
|
||||
pub struct RuntimeUpdaterStream<T: Config> {
|
||||
stream: Subscription<RuntimeVersion>,
|
||||
client: OnlineClient<T>,
|
||||
}
|
||||
|
||||
impl<T: Config> RuntimeUpdaterStream<T> {
|
||||
/// Get the next element of the stream.
|
||||
pub async fn next(&mut self) -> Option<Result<Update, Error>> {
|
||||
let maybe_runtime_version = self.stream.next().await?;
|
||||
|
||||
let runtime_version = match maybe_runtime_version {
|
||||
Ok(runtime_version) => runtime_version,
|
||||
Err(err) => return Some(Err(err)),
|
||||
};
|
||||
|
||||
let metadata = match self.client.rpc().metadata().await {
|
||||
Ok(metadata) => metadata,
|
||||
Err(err) => return Some(Err(err)),
|
||||
};
|
||||
|
||||
Some(Ok(Update {
|
||||
metadata,
|
||||
runtime_version,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// Error that can occur during upgrade.
|
||||
#[non_exhaustive]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum UpgradeError {
|
||||
/// The version is the same as the current version.
|
||||
SameVersion,
|
||||
}
|
||||
|
||||
/// Represents the state when a runtime upgrade occurred.
|
||||
pub struct Update {
|
||||
runtime_version: RuntimeVersion,
|
||||
metadata: Metadata,
|
||||
}
|
||||
|
||||
impl Update {
|
||||
/// Get the runtime version.
|
||||
pub fn runtime_version(&self) -> &RuntimeVersion {
|
||||
&self.runtime_version
|
||||
}
|
||||
|
||||
/// Get the metadata.
|
||||
pub fn metadata(&self) -> &Metadata {
|
||||
&self.metadata
|
||||
}
|
||||
}
|
||||
|
||||
// helpers for a jsonrpsee specific OnlineClient.
|
||||
|
||||
Reference in New Issue
Block a user