Use Substrate state_getReadProof RPC method to get storage proofs (#893)

* use Substrate state_getReadProof method instead of pallet-bridge-messages-rpc

* Fix typo

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2021-04-14 09:12:38 +03:00
committed by Bastian Köcher
parent aa17c272f1
commit 0d60f42b5e
20 changed files with 130 additions and 560 deletions
@@ -25,18 +25,19 @@ use bp_messages::{LaneId, MessageNonce};
use bp_runtime::InstanceId;
use bridge_runtime_common::messages::target::FromBridgedChainMessagesProof;
use codec::{Decode, Encode};
use frame_support::weights::Weight;
use frame_support::{traits::Instance, weights::Weight};
use messages_relay::{
message_lane::{SourceHeaderIdOf, TargetHeaderIdOf},
message_lane_loop::{
ClientState, MessageProofParameters, MessageWeights, MessageWeightsMap, SourceClient, SourceClientState,
},
};
use pallet_bridge_messages::Config as MessagesConfig;
use relay_substrate_client::{Chain, Client, Error as SubstrateError, HashOf, HeaderIdOf};
use relay_utils::{relay_loop::Client as RelayClient, BlockNumberBase, HeaderId};
use sp_core::Bytes;
use sp_runtime::{traits::Header as HeaderT, DeserializeOwned};
use std::ops::RangeInclusive;
use std::{marker::PhantomData, ops::RangeInclusive};
/// Intermediate message proof returned by the source Substrate node. Includes everything
/// required to submit to the target node: cumulative dispatch weight of bundled messages and
@@ -44,14 +45,15 @@ use std::ops::RangeInclusive;
pub type SubstrateMessagesProof<C> = (Weight, FromBridgedChainMessagesProof<HashOf<C>>);
/// Substrate client as Substrate messages source.
pub struct SubstrateMessagesSource<C: Chain, P> {
pub struct SubstrateMessagesSource<C: Chain, P, R, I> {
client: Client<C>,
lane: P,
lane_id: LaneId,
instance: InstanceId,
_phantom: PhantomData<(R, I)>,
}
impl<C: Chain, P> SubstrateMessagesSource<C, P> {
impl<C: Chain, P, R, I> SubstrateMessagesSource<C, P, R, I> {
/// Create new Substrate headers source.
pub fn new(client: Client<C>, lane: P, lane_id: LaneId, instance: InstanceId) -> Self {
SubstrateMessagesSource {
@@ -59,23 +61,31 @@ impl<C: Chain, P> SubstrateMessagesSource<C, P> {
lane,
lane_id,
instance,
_phantom: Default::default(),
}
}
}
impl<C: Chain, P: SubstrateMessageLane> Clone for SubstrateMessagesSource<C, P> {
impl<C: Chain, P: SubstrateMessageLane, R, I> Clone for SubstrateMessagesSource<C, P, R, I> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
lane: self.lane.clone(),
lane_id: self.lane_id,
instance: self.instance,
_phantom: Default::default(),
}
}
}
#[async_trait]
impl<C: Chain, P: SubstrateMessageLane> RelayClient for SubstrateMessagesSource<C, P> {
impl<C, P, R, I> RelayClient for SubstrateMessagesSource<C, P, R, I>
where
C: Chain,
P: SubstrateMessageLane,
R: Send + Sync,
I: Send + Sync + Instance,
{
type Error = SubstrateError;
async fn reconnect(&mut self) -> Result<(), SubstrateError> {
@@ -84,7 +94,7 @@ impl<C: Chain, P: SubstrateMessageLane> RelayClient for SubstrateMessagesSource<
}
#[async_trait]
impl<C, P> SourceClient<P> for SubstrateMessagesSource<C, P>
impl<C, P, R, I> SourceClient<P> for SubstrateMessagesSource<C, P, R, I>
where
C: Chain,
C::Header: DeserializeOwned,
@@ -98,6 +108,8 @@ where
>,
P::TargetHeaderNumber: Decode,
P::TargetHeaderHash: Decode,
R: Send + Sync + MessagesConfig<I>,
I: Send + Sync + Instance,
{
async fn state(&self) -> Result<SourceClientState<P>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because
@@ -171,15 +183,22 @@ where
nonces: RangeInclusive<MessageNonce>,
proof_parameters: MessageProofParameters,
) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<MessageNonce>, P::MessagesProof), SubstrateError> {
let mut storage_keys = Vec::with_capacity(nonces.end().saturating_sub(*nonces.start()) as usize + 1);
let mut message_nonce = *nonces.start();
while message_nonce <= *nonces.end() {
let message_key = pallet_bridge_messages::storage_keys::message_key::<R, I>(&self.lane_id, message_nonce);
storage_keys.push(message_key);
message_nonce += 1;
}
if proof_parameters.outbound_state_proof_required {
storage_keys.push(pallet_bridge_messages::storage_keys::outbound_lane_data_key::<I>(
&self.lane_id,
));
}
let proof = self
.client
.prove_messages(
self.instance,
self.lane_id,
nonces.clone(),
proof_parameters.outbound_state_proof_required,
id.1,
)
.prove_storage(storage_keys, id.1)
.await?
.iter_nodes()
.collect();
@@ -26,15 +26,17 @@ use bp_messages::{LaneId, MessageNonce, UnrewardedRelayersState};
use bp_runtime::InstanceId;
use bridge_runtime_common::messages::source::FromBridgedChainMessagesDeliveryProof;
use codec::{Decode, Encode};
use frame_support::traits::Instance;
use messages_relay::{
message_lane::{SourceHeaderIdOf, TargetHeaderIdOf},
message_lane_loop::{TargetClient, TargetClientState},
};
use pallet_bridge_messages::Config as MessagesConfig;
use relay_substrate_client::{Chain, Client, Error as SubstrateError, HashOf};
use relay_utils::{relay_loop::Client as RelayClient, BlockNumberBase};
use sp_core::Bytes;
use sp_runtime::{traits::Header as HeaderT, DeserializeOwned};
use std::ops::RangeInclusive;
use std::{marker::PhantomData, ops::RangeInclusive};
/// Message receiving proof returned by the target Substrate node.
pub type SubstrateMessagesReceivingProof<C> = (
@@ -43,14 +45,15 @@ pub type SubstrateMessagesReceivingProof<C> = (
);
/// Substrate client as Substrate messages target.
pub struct SubstrateMessagesTarget<C: Chain, P> {
pub struct SubstrateMessagesTarget<C: Chain, P, R, I> {
client: Client<C>,
lane: P,
lane_id: LaneId,
instance: InstanceId,
_phantom: PhantomData<(R, I)>,
}
impl<C: Chain, P> SubstrateMessagesTarget<C, P> {
impl<C: Chain, P, R, I> SubstrateMessagesTarget<C, P, R, I> {
/// Create new Substrate headers target.
pub fn new(client: Client<C>, lane: P, lane_id: LaneId, instance: InstanceId) -> Self {
SubstrateMessagesTarget {
@@ -58,23 +61,31 @@ impl<C: Chain, P> SubstrateMessagesTarget<C, P> {
lane,
lane_id,
instance,
_phantom: Default::default(),
}
}
}
impl<C: Chain, P: SubstrateMessageLane> Clone for SubstrateMessagesTarget<C, P> {
impl<C: Chain, P: SubstrateMessageLane, R, I> Clone for SubstrateMessagesTarget<C, P, R, I> {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
lane: self.lane.clone(),
lane_id: self.lane_id,
instance: self.instance,
_phantom: Default::default(),
}
}
}
#[async_trait]
impl<C: Chain, P: SubstrateMessageLane> RelayClient for SubstrateMessagesTarget<C, P> {
impl<C, P, R, I> RelayClient for SubstrateMessagesTarget<C, P, R, I>
where
C: Chain,
P: SubstrateMessageLane,
R: Send + Sync,
I: Send + Sync + Instance,
{
type Error = SubstrateError;
async fn reconnect(&mut self) -> Result<(), SubstrateError> {
@@ -83,7 +94,7 @@ impl<C: Chain, P: SubstrateMessageLane> RelayClient for SubstrateMessagesTarget<
}
#[async_trait]
impl<C, P> TargetClient<P> for SubstrateMessagesTarget<C, P>
impl<C, P, R, I> TargetClient<P> for SubstrateMessagesTarget<C, P, R, I>
where
C: Chain,
C::Header: DeserializeOwned,
@@ -97,6 +108,8 @@ where
>,
P::SourceHeaderNumber: Decode,
P::SourceHeaderHash: Decode,
R: Send + Sync + MessagesConfig<I>,
I: Send + Sync + Instance,
{
async fn state(&self) -> Result<TargetClientState<P>, SubstrateError> {
// we can't continue to deliver messages if target node is out of sync, because
@@ -166,10 +179,13 @@ where
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessagesReceivingProof), SubstrateError> {
let (id, relayers_state) = self.unrewarded_relayers_state(id).await?;
let inbound_data_key = pallet_bridge_messages::storage_keys::inbound_lane_data_key::<R, I>(&self.lane_id);
let proof = self
.client
.prove_messages_delivery(self.instance, self.lane_id, id.1)
.await?;
.prove_storage(vec![inbound_data_key], id.1)
.await?
.iter_nodes()
.collect();
let proof = FromBridgedChainMessagesDeliveryProof {
bridged_header_hash: id.1,
storage_proof: proof,
@@ -126,10 +126,20 @@ impl SubstrateMessageLane for MillauMessagesToRialto {
}
/// Millau node as messages source.
type MillauSourceClient = SubstrateMessagesSource<Millau, MillauMessagesToRialto>;
type MillauSourceClient = SubstrateMessagesSource<
Millau,
MillauMessagesToRialto,
millau_runtime::Runtime,
millau_runtime::WithRialtoMessagesInstance,
>;
/// Rialto node as messages target.
type RialtoTargetClient = SubstrateMessagesTarget<Rialto, MillauMessagesToRialto>;
type RialtoTargetClient = SubstrateMessagesTarget<
Rialto,
MillauMessagesToRialto,
rialto_runtime::Runtime,
rialto_runtime::WithMillauMessagesInstance,
>;
/// Run Millau-to-Rialto messages sync.
pub async fn run(
@@ -196,8 +206,6 @@ pub async fn run(
)
.standalone_metric(StorageProofOverheadMetric::new(
millau_client.clone(),
(bp_runtime::RIALTO_BRIDGE_INSTANCE, lane_id),
millau_runtime::rialto_messages::inbound_lane_data_key(&lane_id),
"millau_storage_proof_overhead".into(),
"Millau storage proof overhead".into(),
))?
@@ -126,10 +126,20 @@ impl SubstrateMessageLane for RialtoMessagesToMillau {
}
/// Rialto node as messages source.
type RialtoSourceClient = SubstrateMessagesSource<Rialto, RialtoMessagesToMillau>;
type RialtoSourceClient = SubstrateMessagesSource<
Rialto,
RialtoMessagesToMillau,
rialto_runtime::Runtime,
rialto_runtime::WithMillauMessagesInstance,
>;
/// Millau node as messages target.
type MillauTargetClient = SubstrateMessagesTarget<Millau, RialtoMessagesToMillau>;
type MillauTargetClient = SubstrateMessagesTarget<
Millau,
RialtoMessagesToMillau,
millau_runtime::Runtime,
millau_runtime::WithRialtoMessagesInstance,
>;
/// Run Rialto-to-Millau messages sync.
pub async fn run(
@@ -195,8 +205,6 @@ pub async fn run(
)
.standalone_metric(StorageProofOverheadMetric::new(
rialto_client.clone(),
(bp_runtime::MILLAU_BRIDGE_INSTANCE, lane_id),
rialto_runtime::millau_messages::inbound_lane_data_key(&lane_id),
"rialto_storage_proof_overhead".into(),
"Rialto storage proof overhead".into(),
))?
@@ -34,6 +34,7 @@ sc-rpc-api = { git = "https://github.com/paritytech/substrate", branch = "master
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-finality-grandpa = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-std = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-version = { git = "https://github.com/paritytech/substrate", branch = "master" }
+10 -46
View File
@@ -17,12 +17,10 @@
//! Substrate node client.
use crate::chain::{Chain, ChainWithBalances};
use crate::rpc::{Substrate, SubstrateMessages};
use crate::rpc::Substrate;
use crate::{ConnectionParams, Error, Result};
use async_std::sync::{Arc, Mutex};
use bp_messages::{LaneId, MessageNonce};
use bp_runtime::InstanceId;
use codec::Decode;
use frame_system::AccountInfo;
use jsonrpsee_types::{jsonrpc::DeserializeOwned, traits::SubscriptionClient};
@@ -32,7 +30,6 @@ use pallet_balances::AccountData;
use sp_core::{storage::StorageKey, Bytes};
use sp_trie::StorageProof;
use sp_version::RuntimeVersion;
use std::ops::RangeInclusive;
const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities";
const MAX_SUBSCRIPTION_CAPACITY: usize = 4096;
@@ -175,12 +172,12 @@ impl<C: Chain> Client<C> {
/// Return runtime version.
pub async fn runtime_version(&self) -> Result<RuntimeVersion> {
Ok(Substrate::<C>::runtime_version(&*self.client).await?)
Ok(Substrate::<C>::state_runtime_version(&*self.client).await?)
}
/// Read value from runtime storage.
pub async fn storage_value<T: Decode>(&self, storage_key: StorageKey) -> Result<Option<T>> {
Substrate::<C>::get_storage(&*self.client, storage_key)
Substrate::<C>::state_get_storage(&*self.client, storage_key)
.await?
.map(|encoded_value| T::decode(&mut &encoded_value.0[..]).map_err(Error::ResponseParseFailed))
.transpose()
@@ -192,7 +189,7 @@ impl<C: Chain> Client<C> {
C: ChainWithBalances,
{
let storage_key = C::account_info_storage_key(&account);
let encoded_account_data = Substrate::<C>::get_storage(&*self.client, storage_key)
let encoded_account_data = Substrate::<C>::state_get_storage(&*self.client, storage_key)
.await?
.ok_or(Error::AccountDoesNotExist)?;
let decoded_account_data =
@@ -255,45 +252,12 @@ impl<C: Chain> Client<C> {
.map_err(Into::into)
}
/// Returns proof-of-message(s) in given inclusive range.
pub async fn prove_messages(
&self,
instance: InstanceId,
lane: LaneId,
range: RangeInclusive<MessageNonce>,
include_outbound_lane_state: bool,
at_block: C::Hash,
) -> Result<StorageProof> {
let encoded_trie_nodes = SubstrateMessages::<C>::prove_messages(
&*self.client,
instance,
lane,
*range.start(),
*range.end(),
include_outbound_lane_state,
Some(at_block),
)
.await
.map_err(Error::RpcError)?;
let decoded_trie_nodes: Vec<Vec<u8>> =
Decode::decode(&mut &encoded_trie_nodes[..]).map_err(Error::ResponseParseFailed)?;
Ok(StorageProof::new(decoded_trie_nodes))
}
/// Returns proof-of-message(s) delivery.
pub async fn prove_messages_delivery(
&self,
instance: InstanceId,
lane: LaneId,
at_block: C::Hash,
) -> Result<Vec<Vec<u8>>> {
let encoded_trie_nodes =
SubstrateMessages::<C>::prove_messages_delivery(&*self.client, instance, lane, Some(at_block))
.await
.map_err(Error::RpcError)?;
let decoded_trie_nodes: Vec<Vec<u8>> =
Decode::decode(&mut &encoded_trie_nodes[..]).map_err(Error::ResponseParseFailed)?;
Ok(decoded_trie_nodes)
/// Returns storage proof of given storage keys.
pub async fn prove_storage(&self, keys: Vec<StorageKey>, at_block: C::Hash) -> Result<StorageProof> {
Substrate::<C>::state_prove_storage(&*self.client, keys, Some(at_block))
.await
.map(|proof| StorageProof::new(proof.proof.into_iter().map(|b| b.0).collect()))
.map_err(Into::into)
}
/// Return new justifications stream.
@@ -36,6 +36,8 @@ pub enum Error {
UninitializedBridgePallet,
/// Account does not exist on the chain.
AccountDoesNotExist,
/// Runtime storage is missing mandatory ":code:" entry.
MissingMandatoryCodeEntry,
/// The client we're connected to is not synced, so we can't rely on its state.
ClientNotSynced(Health),
/// An error has happened when we have tried to parse storage proof.
@@ -51,6 +53,7 @@ impl std::error::Error for Error {
Self::ResponseParseFailed(ref e) => Some(e),
Self::UninitializedBridgePallet => None,
Self::AccountDoesNotExist => None,
Self::MissingMandatoryCodeEntry => None,
Self::ClientNotSynced(_) => None,
Self::StorageProofError(_) => None,
Self::Custom(_) => None,
@@ -85,6 +88,7 @@ impl std::fmt::Display for Error {
Self::ResponseParseFailed(e) => e.to_string(),
Self::UninitializedBridgePallet => "The Substrate bridge pallet has not been initialized yet.".into(),
Self::AccountDoesNotExist => "Account does not exist on the chain".into(),
Self::MissingMandatoryCodeEntry => "Mandatory :code: entry is missing from runtime storage".into(),
Self::StorageProofError(e) => format!("Error when parsing storage proof: {:?}", e),
Self::ClientNotSynced(health) => format!("Substrate client is not synced: {}", health),
Self::Custom(e) => e.clone(),
@@ -19,12 +19,10 @@ use crate::client::Client;
use crate::error::Error;
use async_trait::async_trait;
use bp_messages::LaneId;
use bp_runtime::InstanceId;
use relay_utils::metrics::{register, Gauge, Metrics, Registry, StandaloneMetrics, U64};
use sp_core::storage::StorageKey;
use sp_runtime::traits::Header as HeaderT;
use sp_trie::StorageProof;
use sp_storage::well_known_keys::CODE;
use std::time::Duration;
/// Storage proof overhead update interval (in blocks).
@@ -32,20 +30,11 @@ const UPDATE_INTERVAL_IN_BLOCKS: u32 = 100;
/// Metric that represents extra size of storage proof as unsigned integer gauge.
///
/// Regular Substrate node does not provide any RPC endpoints that return storage proofs.
/// So here we're using our own `pallet-bridge-messages-rpc` RPC API, which returns proof
/// of the inbound message lane state. Then we simply subtract size of this state from
/// the size of storage proof to compute metric value.
///
/// There are two things to keep in mind when using this metric:
///
/// 1) it'll only work on inbound lanes that have already accepted at least one message;
/// 2) the overhead may be slightly different for other values, but this metric gives a good estimation.
/// There's one thing to keep in mind when using this metric: the overhead may be slightly
/// different for other values, but this metric gives a good estimation.
#[derive(Debug)]
pub struct StorageProofOverheadMetric<C: Chain> {
client: Client<C>,
inbound_lane: (InstanceId, LaneId),
inbound_lane_data_key: StorageKey,
metric: Gauge<U64>,
}
@@ -53,8 +42,6 @@ impl<C: Chain> Clone for StorageProofOverheadMetric<C> {
fn clone(&self) -> Self {
StorageProofOverheadMetric {
client: self.client.clone(),
inbound_lane: self.inbound_lane,
inbound_lane_data_key: self.inbound_lane_data_key.clone(),
metric: self.metric.clone(),
}
}
@@ -62,17 +49,9 @@ impl<C: Chain> Clone for StorageProofOverheadMetric<C> {
impl<C: Chain> StorageProofOverheadMetric<C> {
/// Create new metric instance with given name and help.
pub fn new(
client: Client<C>,
inbound_lane: (InstanceId, LaneId),
inbound_lane_data_key: StorageKey,
name: String,
help: String,
) -> Self {
pub fn new(client: Client<C>, name: String, help: String) -> Self {
StorageProofOverheadMetric {
client,
inbound_lane,
inbound_lane_data_key,
metric: Gauge::new(name, help).expect(
"only fails if gauge options are customized;\
we use default options;\
@@ -82,32 +61,27 @@ impl<C: Chain> StorageProofOverheadMetric<C> {
}
/// Returns approximate storage proof size overhead.
///
/// Returs `Ok(None)` if inbound lane we're watching for has no state. This shouldn't be treated as error.
async fn compute_storage_proof_overhead(&self) -> Result<Option<usize>, Error> {
async fn compute_storage_proof_overhead(&self) -> Result<usize, Error> {
let best_header_hash = self.client.best_finalized_header_hash().await?;
let best_header = self.client.header_by_hash(best_header_hash).await?;
let storage_proof = self
.client
.prove_messages_delivery(self.inbound_lane.0, self.inbound_lane.1, best_header_hash)
.prove_storage(vec![StorageKey(CODE.to_vec())], best_header_hash)
.await?;
let storage_proof_size: usize = storage_proof.iter().map(|n| n.len()).sum();
let storage_proof_size: usize = storage_proof.clone().iter_nodes().map(|n| n.len()).sum();
let storage_value_reader = bp_runtime::StorageProofChecker::<C::Hasher>::new(
*best_header.state_root(),
StorageProof::new(storage_proof),
)
.map_err(Error::StorageProofError)?;
let storage_value_reader =
bp_runtime::StorageProofChecker::<C::Hasher>::new(*best_header.state_root(), storage_proof)
.map_err(Error::StorageProofError)?;
let maybe_encoded_storage_value = storage_value_reader
.read_value(&self.inbound_lane_data_key.0)
.read_value(CODE)
.map_err(Error::StorageProofError)?;
let encoded_storage_value_size = match maybe_encoded_storage_value {
Some(encoded_storage_value) => encoded_storage_value.len(),
None => return Ok(None),
};
let encoded_storage_value_size = maybe_encoded_storage_value
.ok_or(Error::MissingMandatoryCodeEntry)?
.len();
Ok(Some(storage_proof_size - encoded_storage_value_size))
Ok(storage_proof_size - encoded_storage_value_size)
}
}
@@ -129,7 +103,7 @@ impl<C: Chain> StandaloneMetrics for StorageProofOverheadMetric<C> {
&self.metric,
self.compute_storage_proof_overhead()
.await
.map(|v| v.map(|overhead| overhead as u64)),
.map(|overhead| Some(overhead as u64)),
);
}
}
+5 -24
View File
@@ -18,9 +18,7 @@
use crate::chain::Chain;
use bp_messages::{LaneId, MessageNonce};
use bp_runtime::InstanceId;
use sc_rpc_api::system::Health;
use sc_rpc_api::{state::ReadProof, system::Health};
use sp_core::{
storage::{StorageData, StorageKey},
Bytes,
@@ -46,27 +44,10 @@ jsonrpsee_proc_macros::rpc_client_api! {
#[rpc(method = "state_call", positional_params)]
fn state_call(method: String, data: Bytes, at_block: Option<C::Hash>) -> Bytes;
#[rpc(method = "state_getStorage", positional_params)]
fn get_storage(key: StorageKey) -> Option<StorageData>;
fn state_get_storage(key: StorageKey) -> Option<StorageData>;
#[rpc(method = "state_getReadProof", positional_params)]
fn state_prove_storage(keys: Vec<StorageKey>, hash: Option<C::Hash>) -> ReadProof<C::Hash>;
#[rpc(method = "state_getRuntimeVersion", positional_params)]
fn runtime_version() -> RuntimeVersion;
}
pub(crate) SubstrateMessages<C: Chain> {
#[rpc(method = "messages_proveMessages", positional_params)]
fn prove_messages(
instance: InstanceId,
lane: LaneId,
begin: MessageNonce,
end: MessageNonce,
include_outbound_lane_state: bool,
block: Option<C::Hash>,
) -> Bytes;
#[rpc(method = "messages_proveMessagesDelivery", positional_params)]
fn prove_messages_delivery(
instance: InstanceId,
lane: LaneId,
block: Option<C::Hash>,
) -> Bytes;
fn state_runtime_version() -> RuntimeVersion;
}
}