mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-28 19:07:57 +00:00
Mmr persist state (#12822)
client/mmr: persisting gadget state across runs Fixes #12780 * client/mmr: on init do canonicalization catch-up * client/mmr: add more tests * client/mmr: persist gadget progress in aux db * client/mmr: add more tests * client/mmr: replace async_std with tokio * remove leftover comment * address review comments Signed-off-by: acatangiu <adrian@parity.io>
This commit is contained in:
Generated
+2
@@ -4137,6 +4137,7 @@ dependencies = [
|
||||
"futures",
|
||||
"log",
|
||||
"parity-scale-codec",
|
||||
"parking_lot 0.12.1",
|
||||
"sc-block-builder",
|
||||
"sc-client-api",
|
||||
"sc-offchain",
|
||||
@@ -4148,6 +4149,7 @@ dependencies = [
|
||||
"sp-io",
|
||||
"sp-mmr-primitives",
|
||||
"sp-runtime",
|
||||
"sp-tracing",
|
||||
"substrate-test-runtime-client",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@@ -26,6 +26,8 @@ sc-offchain = { version = "4.0.0-dev", path = "../offchain" }
|
||||
sp-runtime = { version = "7.0.0", path = "../../primitives/runtime" }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = "1.17.0"
|
||||
parking_lot = "0.12.1"
|
||||
sc-block-builder = { version = "0.10.0-dev", path = "../block-builder" }
|
||||
sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" }
|
||||
substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" }
|
||||
tokio = "1.17.0"
|
||||
|
||||
@@ -0,0 +1,228 @@
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
//! Schema for MMR-gadget state persisted in the aux-db.
|
||||
|
||||
use crate::LOG_TARGET;
|
||||
use codec::{Decode, Encode};
|
||||
use log::{info, trace};
|
||||
use sc_client_api::backend::AuxStore;
|
||||
use sp_blockchain::{Error as ClientError, Result as ClientResult};
|
||||
use sp_runtime::traits::{Block, NumberFor};
|
||||
|
||||
const VERSION_KEY: &[u8] = b"mmr_auxschema_version";
|
||||
const GADGET_STATE: &[u8] = b"mmr_gadget_state";
|
||||
|
||||
const CURRENT_VERSION: u32 = 1;
|
||||
pub(crate) type PersistedState<B> = NumberFor<B>;
|
||||
|
||||
pub(crate) fn write_current_version<B: AuxStore>(backend: &B) -> ClientResult<()> {
|
||||
info!(target: LOG_TARGET, "write aux schema version {:?}", CURRENT_VERSION);
|
||||
AuxStore::insert_aux(backend, &[(VERSION_KEY, CURRENT_VERSION.encode().as_slice())], &[])
|
||||
}
|
||||
|
||||
/// Write gadget state.
|
||||
pub(crate) fn write_gadget_state<B: Block, BE: AuxStore>(
|
||||
backend: &BE,
|
||||
state: &PersistedState<B>,
|
||||
) -> ClientResult<()> {
|
||||
trace!(target: LOG_TARGET, "persisting {:?}", state);
|
||||
backend.insert_aux(&[(GADGET_STATE, state.encode().as_slice())], &[])
|
||||
}
|
||||
|
||||
fn load_decode<B: AuxStore, T: Decode>(backend: &B, key: &[u8]) -> ClientResult<Option<T>> {
|
||||
match backend.get_aux(key)? {
|
||||
None => Ok(None),
|
||||
Some(t) => T::decode(&mut &t[..])
|
||||
.map_err(|e| ClientError::Backend(format!("MMR aux DB is corrupted: {}", e)))
|
||||
.map(Some),
|
||||
}
|
||||
}
|
||||
|
||||
/// Load or initialize persistent data from backend.
|
||||
pub(crate) fn load_persistent<B, BE>(backend: &BE) -> ClientResult<Option<PersistedState<B>>>
|
||||
where
|
||||
B: Block,
|
||||
BE: AuxStore,
|
||||
{
|
||||
let version: Option<u32> = load_decode(backend, VERSION_KEY)?;
|
||||
|
||||
match version {
|
||||
None => (),
|
||||
Some(1) => return load_decode::<_, PersistedState<B>>(backend, GADGET_STATE),
|
||||
other =>
|
||||
return Err(ClientError::Backend(format!("Unsupported MMR aux DB version: {:?}", other))),
|
||||
}
|
||||
|
||||
// No persistent state found in DB.
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::test_utils::{
|
||||
run_test_with_mmr_gadget_pre_post_using_client, MmrBlock, MockClient, OffchainKeyType,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
use sp_core::offchain::{DbExternalities, StorageKind};
|
||||
use sp_mmr_primitives::utils::NodesUtils;
|
||||
use sp_runtime::generic::BlockId;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use substrate_test_runtime_client::{runtime::Block, Backend};
|
||||
|
||||
#[test]
|
||||
fn should_load_persistent_sanity_checks() {
|
||||
let client = MockClient::new();
|
||||
let backend = &*client.backend;
|
||||
|
||||
// version not available in db -> None
|
||||
assert_eq!(load_persistent::<Block, Backend>(backend).unwrap(), None);
|
||||
|
||||
// populate version in db
|
||||
write_current_version(backend).unwrap();
|
||||
// verify correct version is retrieved
|
||||
assert_eq!(load_decode(backend, VERSION_KEY).unwrap(), Some(CURRENT_VERSION));
|
||||
|
||||
// version is available in db but state isn't -> None
|
||||
assert_eq!(load_persistent::<Block, Backend>(backend).unwrap(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_persist_progress_across_runs() {
|
||||
sp_tracing::try_init_simple();
|
||||
|
||||
let client = Arc::new(MockClient::new());
|
||||
let backend = client.backend.clone();
|
||||
|
||||
// version not available in db -> None
|
||||
assert_eq!(load_decode::<Backend, Option<u32>>(&*backend, VERSION_KEY).unwrap(), None);
|
||||
// state not available in db -> None
|
||||
assert_eq!(load_persistent::<Block, Backend>(&*backend).unwrap(), None);
|
||||
// run the gadget while importing and finalizing 3 blocks
|
||||
run_test_with_mmr_gadget_pre_post_using_client(
|
||||
client.clone(),
|
||||
|_| async {},
|
||||
|client| async move {
|
||||
let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
|
||||
let a2 = client.import_block(&BlockId::Number(1), b"a2", Some(1)).await;
|
||||
let a3 = client.import_block(&BlockId::Number(2), b"a3", Some(2)).await;
|
||||
client.finalize_block(a3.hash(), Some(3));
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
// a1, a2, a3 were canonicalized
|
||||
client.assert_canonicalized(&[&a1, &a2, &a3]);
|
||||
},
|
||||
);
|
||||
|
||||
// verify previous progress was persisted and run the gadget again
|
||||
run_test_with_mmr_gadget_pre_post_using_client(
|
||||
client.clone(),
|
||||
|client| async move {
|
||||
let backend = &*client.backend;
|
||||
// check there is both version and best canon available in db before running gadget
|
||||
assert_eq!(load_decode(backend, VERSION_KEY).unwrap(), Some(CURRENT_VERSION));
|
||||
assert_eq!(load_persistent::<Block, Backend>(backend).unwrap(), Some(3));
|
||||
},
|
||||
|client| async move {
|
||||
let a4 = client.import_block(&BlockId::Number(3), b"a4", Some(3)).await;
|
||||
let a5 = client.import_block(&BlockId::Number(4), b"a5", Some(4)).await;
|
||||
let a6 = client.import_block(&BlockId::Number(5), b"a6", Some(5)).await;
|
||||
client.finalize_block(a6.hash(), Some(6));
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
// a4, a5, a6 were canonicalized
|
||||
client.assert_canonicalized(&[&a4, &a5, &a6]);
|
||||
// check persisted best canon was updated
|
||||
assert_eq!(load_persistent::<Block, Backend>(&*client.backend).unwrap(), Some(6));
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_resume_from_persisted_state() {
|
||||
sp_tracing::try_init_simple();
|
||||
|
||||
let client = Arc::new(MockClient::new());
|
||||
let blocks = Arc::new(Mutex::new(Vec::<MmrBlock>::new()));
|
||||
let blocks_clone = blocks.clone();
|
||||
|
||||
// run the gadget while importing and finalizing 3 blocks
|
||||
run_test_with_mmr_gadget_pre_post_using_client(
|
||||
client.clone(),
|
||||
|_| async {},
|
||||
|client| async move {
|
||||
let mut blocks = blocks_clone.lock();
|
||||
blocks.push(client.import_block(&BlockId::Number(0), b"a1", Some(0)).await);
|
||||
blocks.push(client.import_block(&BlockId::Number(1), b"a2", Some(1)).await);
|
||||
blocks.push(client.import_block(&BlockId::Number(2), b"a3", Some(2)).await);
|
||||
client.finalize_block(blocks.last().unwrap().hash(), Some(3));
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
// a1, a2, a3 were canonicalized
|
||||
let slice: Vec<&MmrBlock> = blocks.iter().collect();
|
||||
client.assert_canonicalized(&slice);
|
||||
|
||||
// now manually move them back to non-canon/temp location
|
||||
let mut offchain_db = client.offchain_db();
|
||||
for mmr_block in slice {
|
||||
for node in NodesUtils::right_branch_ending_in_leaf(mmr_block.leaf_idx.unwrap())
|
||||
{
|
||||
let canon_key = mmr_block.get_offchain_key(node, OffchainKeyType::Canon);
|
||||
let val = offchain_db
|
||||
.local_storage_get(StorageKind::PERSISTENT, &canon_key)
|
||||
.unwrap();
|
||||
offchain_db.local_storage_clear(StorageKind::PERSISTENT, &canon_key);
|
||||
|
||||
let temp_key = mmr_block.get_offchain_key(node, OffchainKeyType::Temp);
|
||||
offchain_db.local_storage_set(StorageKind::PERSISTENT, &temp_key, &val);
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
let blocks_clone = blocks.clone();
|
||||
// verify new gadget continues from block 4 and ignores 1, 2, 3 based on persisted state
|
||||
run_test_with_mmr_gadget_pre_post_using_client(
|
||||
client.clone(),
|
||||
|client| async move {
|
||||
let blocks = blocks_clone.lock();
|
||||
let slice: Vec<&MmrBlock> = blocks.iter().collect();
|
||||
|
||||
// verify persisted state says a1, a2, a3 were canonicalized,
|
||||
assert_eq!(load_persistent::<Block, Backend>(&*client.backend).unwrap(), Some(3));
|
||||
// but actually they are NOT canon (we manually reverted them earlier).
|
||||
client.assert_not_canonicalized(&slice);
|
||||
},
|
||||
|client| async move {
|
||||
let a4 = client.import_block(&BlockId::Number(3), b"a4", Some(3)).await;
|
||||
let a5 = client.import_block(&BlockId::Number(4), b"a5", Some(4)).await;
|
||||
let a6 = client.import_block(&BlockId::Number(5), b"a6", Some(5)).await;
|
||||
client.finalize_block(a6.hash(), Some(6));
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
let block_1_to_3 = blocks.lock();
|
||||
let slice: Vec<&MmrBlock> = block_1_to_3.iter().collect();
|
||||
// verify a1, a2, a3 are still NOT canon (skipped by gadget based on data in aux db)
|
||||
client.assert_not_canonicalized(&slice);
|
||||
// but a4, a5, a6 were canonicalized
|
||||
client.assert_canonicalized(&[&a4, &a5, &a6]);
|
||||
// check persisted best canon was updated
|
||||
assert_eq!(load_persistent::<Block, Backend>(&*client.backend).unwrap(), Some(6));
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -37,15 +37,15 @@
|
||||
|
||||
#![warn(missing_docs)]
|
||||
|
||||
mod aux_schema;
|
||||
mod offchain_mmr;
|
||||
#[cfg(test)]
|
||||
pub mod test_utils;
|
||||
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
|
||||
use crate::offchain_mmr::OffchainMmr;
|
||||
use beefy_primitives::MmrRootHash;
|
||||
use futures::StreamExt;
|
||||
use log::{error, trace, warn};
|
||||
|
||||
use log::{debug, error, trace, warn};
|
||||
use sc_client_api::{Backend, BlockchainEvents, FinalityNotifications};
|
||||
use sc_offchain::OffchainDb;
|
||||
use sp_api::ProvideRuntimeApi;
|
||||
@@ -55,50 +55,75 @@ use sp_runtime::{
|
||||
generic::BlockId,
|
||||
traits::{Block, Header, NumberFor},
|
||||
};
|
||||
|
||||
use crate::offchain_mmr::OffchainMMR;
|
||||
use beefy_primitives::MmrRootHash;
|
||||
use sp_core::offchain::OffchainStorage;
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
|
||||
/// Logging target for the mmr gadget.
|
||||
pub const LOG_TARGET: &str = "mmr";
|
||||
|
||||
struct OffchainMmrBuilder<B: Block, C, S> {
|
||||
struct OffchainMmrBuilder<B: Block, BE: Backend<B>, C> {
|
||||
backend: Arc<BE>,
|
||||
client: Arc<C>,
|
||||
offchain_db: OffchainDb<S>,
|
||||
offchain_db: OffchainDb<BE::OffchainStorage>,
|
||||
indexing_prefix: Vec<u8>,
|
||||
|
||||
_phantom: PhantomData<B>,
|
||||
}
|
||||
|
||||
impl<B, C, S> OffchainMmrBuilder<B, C, S>
|
||||
impl<B, BE, C> OffchainMmrBuilder<B, BE, C>
|
||||
where
|
||||
B: Block,
|
||||
BE: Backend<B>,
|
||||
C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B>,
|
||||
C::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
|
||||
S: OffchainStorage,
|
||||
{
|
||||
async fn try_build(
|
||||
self,
|
||||
finality_notifications: &mut FinalityNotifications<B>,
|
||||
) -> Option<OffchainMMR<C, B, S>> {
|
||||
) -> Option<OffchainMmr<B, BE, C>> {
|
||||
while let Some(notification) = finality_notifications.next().await {
|
||||
let best_block = *notification.header.number();
|
||||
match self.client.runtime_api().mmr_leaf_count(&BlockId::number(best_block)) {
|
||||
Ok(Ok(mmr_leaf_count)) => {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"pallet-mmr detected at block {:?} with mmr size {:?}",
|
||||
best_block,
|
||||
mmr_leaf_count
|
||||
);
|
||||
match utils::first_mmr_block_num::<B::Header>(best_block, mmr_leaf_count) {
|
||||
Ok(first_mmr_block) => {
|
||||
let mut offchain_mmr = OffchainMMR {
|
||||
debug!(
|
||||
target: LOG_TARGET,
|
||||
"pallet-mmr genesis computed at block {:?}", first_mmr_block,
|
||||
);
|
||||
let best_canonicalized =
|
||||
match offchain_mmr::load_or_init_best_canonicalized::<B, BE>(
|
||||
&*self.backend,
|
||||
first_mmr_block,
|
||||
) {
|
||||
Ok(best) => best,
|
||||
Err(e) => {
|
||||
error!(
|
||||
target: LOG_TARGET,
|
||||
"Error loading state from aux db: {:?}", e
|
||||
);
|
||||
return None
|
||||
},
|
||||
};
|
||||
let mut offchain_mmr = OffchainMmr {
|
||||
backend: self.backend,
|
||||
client: self.client,
|
||||
offchain_db: self.offchain_db,
|
||||
indexing_prefix: self.indexing_prefix,
|
||||
first_mmr_block,
|
||||
|
||||
_phantom: Default::default(),
|
||||
best_canonicalized,
|
||||
};
|
||||
// We need to make sure all blocks leading up to current notification
|
||||
// have also been canonicalized.
|
||||
offchain_mmr.canonicalize_catch_up(¬ification);
|
||||
// We have to canonicalize and prune the blocks in the finality
|
||||
// notification that lead to building the offchain-mmr as well.
|
||||
offchain_mmr.canonicalize_and_prune(¬ification);
|
||||
offchain_mmr.canonicalize_and_prune(notification);
|
||||
return Some(offchain_mmr)
|
||||
},
|
||||
Err(e) => {
|
||||
@@ -143,14 +168,14 @@ where
|
||||
C: BlockchainEvents<B> + HeaderBackend<B> + HeaderMetadata<B> + ProvideRuntimeApi<B>,
|
||||
C::Api: MmrApi<B, MmrRootHash, NumberFor<B>>,
|
||||
{
|
||||
async fn run(mut self, builder: OffchainMmrBuilder<B, C, BE::OffchainStorage>) {
|
||||
async fn run(mut self, builder: OffchainMmrBuilder<B, BE, C>) {
|
||||
let mut offchain_mmr = match builder.try_build(&mut self.finality_notifications).await {
|
||||
Some(offchain_mmr) => offchain_mmr,
|
||||
None => return,
|
||||
};
|
||||
|
||||
while let Some(notification) = self.finality_notifications.next().await {
|
||||
offchain_mmr.canonicalize_and_prune(¬ification);
|
||||
offchain_mmr.canonicalize_and_prune(notification);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,6 +199,7 @@ where
|
||||
};
|
||||
mmr_gadget
|
||||
.run(OffchainMmrBuilder {
|
||||
backend,
|
||||
client,
|
||||
offchain_db,
|
||||
indexing_prefix,
|
||||
|
||||
@@ -21,33 +21,57 @@
|
||||
|
||||
#![warn(missing_docs)]
|
||||
|
||||
use std::{marker::PhantomData, sync::Arc};
|
||||
|
||||
use log::{debug, error, warn};
|
||||
|
||||
use sc_client_api::FinalityNotification;
|
||||
use crate::{aux_schema, LOG_TARGET};
|
||||
use log::{debug, error, info, warn};
|
||||
use sc_client_api::{AuxStore, Backend, FinalityNotification};
|
||||
use sc_offchain::OffchainDb;
|
||||
use sp_blockchain::{CachedHeaderMetadata, ForkBackend, HeaderBackend, HeaderMetadata};
|
||||
use sp_core::offchain::{DbExternalities, OffchainStorage, StorageKind};
|
||||
use sp_core::offchain::{DbExternalities, StorageKind};
|
||||
use sp_mmr_primitives::{utils, utils::NodesUtils, NodeIndex};
|
||||
use sp_runtime::traits::{Block, Header};
|
||||
use sp_runtime::{
|
||||
traits::{Block, NumberFor, One},
|
||||
Saturating,
|
||||
};
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
|
||||
use crate::LOG_TARGET;
|
||||
|
||||
/// `OffchainMMR` exposes MMR offchain canonicalization and pruning logic.
|
||||
pub struct OffchainMMR<C, B: Block, S> {
|
||||
pub client: Arc<C>,
|
||||
pub offchain_db: OffchainDb<S>,
|
||||
pub indexing_prefix: Vec<u8>,
|
||||
pub first_mmr_block: <B::Header as Header>::Number,
|
||||
|
||||
pub _phantom: PhantomData<B>,
|
||||
pub(crate) fn load_or_init_best_canonicalized<B, BE>(
|
||||
backend: &BE,
|
||||
first_mmr_block: NumberFor<B>,
|
||||
) -> sp_blockchain::Result<NumberFor<B>>
|
||||
where
|
||||
BE: AuxStore,
|
||||
B: Block,
|
||||
{
|
||||
// Initialize gadget best_canon from AUX DB or from pallet genesis.
|
||||
if let Some(best) = aux_schema::load_persistent::<B, BE>(backend)? {
|
||||
info!(target: LOG_TARGET, "Loading MMR best canonicalized state from db: {:?}.", best);
|
||||
Ok(best)
|
||||
} else {
|
||||
let best = first_mmr_block.saturating_sub(One::one());
|
||||
info!(
|
||||
target: LOG_TARGET,
|
||||
"Loading MMR from pallet genesis on what appears to be the first startup: {:?}.", best
|
||||
);
|
||||
aux_schema::write_current_version(backend)?;
|
||||
aux_schema::write_gadget_state::<B, BE>(backend, &best)?;
|
||||
Ok(best)
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, S, B> OffchainMMR<C, B, S>
|
||||
/// `OffchainMMR` exposes MMR offchain canonicalization and pruning logic.
|
||||
pub struct OffchainMmr<B: Block, BE: Backend<B>, C> {
|
||||
pub backend: Arc<BE>,
|
||||
pub client: Arc<C>,
|
||||
pub offchain_db: OffchainDb<BE::OffchainStorage>,
|
||||
pub indexing_prefix: Vec<u8>,
|
||||
pub first_mmr_block: NumberFor<B>,
|
||||
pub best_canonicalized: NumberFor<B>,
|
||||
}
|
||||
|
||||
impl<B, BE, C> OffchainMmr<B, BE, C>
|
||||
where
|
||||
C: HeaderBackend<B> + HeaderMetadata<B>,
|
||||
S: OffchainStorage,
|
||||
BE: Backend<B>,
|
||||
B: Block,
|
||||
{
|
||||
fn node_temp_offchain_key(&self, pos: NodeIndex, parent_hash: B::Hash) -> Vec<u8> {
|
||||
@@ -77,7 +101,7 @@ where
|
||||
|
||||
fn right_branch_ending_in_block_or_log(
|
||||
&self,
|
||||
block_num: <B::Header as Header>::Number,
|
||||
block_num: NumberFor<B>,
|
||||
action: &str,
|
||||
) -> Option<Vec<NodeIndex>> {
|
||||
match utils::block_num_to_leaf_index::<B::Header>(block_num, self.first_mmr_block) {
|
||||
@@ -128,9 +152,9 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn canonicalize_branch(&mut self, block_hash: &B::Hash) {
|
||||
fn canonicalize_branch(&mut self, block_hash: B::Hash) {
|
||||
let action = "canonicalize";
|
||||
let header = match self.header_metadata_or_log(*block_hash, action) {
|
||||
let header = match self.header_metadata_or_log(block_hash, action) {
|
||||
Some(header) => header,
|
||||
_ => return,
|
||||
};
|
||||
@@ -148,6 +172,7 @@ where
|
||||
None => {
|
||||
// If we can't convert the block number to a leaf index, the chain state is probably
|
||||
// corrupted. We only log the error, hoping that the chain state will be fixed.
|
||||
self.best_canonicalized = header.number;
|
||||
return
|
||||
},
|
||||
};
|
||||
@@ -174,16 +199,58 @@ where
|
||||
);
|
||||
}
|
||||
}
|
||||
if self.best_canonicalized != header.number.saturating_sub(One::one()) {
|
||||
warn!(
|
||||
target: LOG_TARGET,
|
||||
"Detected canonicalization skip: best {:?} current {:?}.",
|
||||
self.best_canonicalized,
|
||||
header.number,
|
||||
);
|
||||
}
|
||||
self.best_canonicalized = header.number;
|
||||
}
|
||||
|
||||
/// In case of missed finality notifications (node restarts for example),
|
||||
/// make sure to also canon everything leading up to `notification.tree_route`.
|
||||
pub fn canonicalize_catch_up(&mut self, notification: &FinalityNotification<B>) {
|
||||
let first = notification.tree_route.first().unwrap_or(¬ification.hash);
|
||||
if let Some(mut header) = self.header_metadata_or_log(*first, "canonicalize") {
|
||||
let mut to_canon = VecDeque::<<B as Block>::Hash>::new();
|
||||
// Walk up the chain adding all blocks newer than `self.best_canonicalized`.
|
||||
loop {
|
||||
header = match self.header_metadata_or_log(header.parent, "canonicalize") {
|
||||
Some(header) => header,
|
||||
_ => break,
|
||||
};
|
||||
if header.number <= self.best_canonicalized {
|
||||
break
|
||||
}
|
||||
to_canon.push_front(header.hash);
|
||||
}
|
||||
// Canonicalize all blocks leading up to current finality notification.
|
||||
for hash in to_canon.drain(..) {
|
||||
self.canonicalize_branch(hash);
|
||||
}
|
||||
if let Err(e) =
|
||||
aux_schema::write_gadget_state::<B, BE>(&*self.backend, &self.best_canonicalized)
|
||||
{
|
||||
debug!(target: LOG_TARGET, "error saving state: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Move leafs and nodes added by finalized blocks in offchain db from _fork-aware key_ to
|
||||
/// _canonical key_.
|
||||
/// Prune leafs and nodes added by stale blocks in offchain db from _fork-aware key_.
|
||||
pub fn canonicalize_and_prune(&mut self, notification: &FinalityNotification<B>) {
|
||||
pub fn canonicalize_and_prune(&mut self, notification: FinalityNotification<B>) {
|
||||
// Move offchain MMR nodes for finalized blocks to canonical keys.
|
||||
for block_hash in notification.tree_route.iter().chain(std::iter::once(¬ification.hash))
|
||||
for hash in notification.tree_route.iter().chain(std::iter::once(¬ification.hash)) {
|
||||
self.canonicalize_branch(*hash);
|
||||
}
|
||||
if let Err(e) =
|
||||
aux_schema::write_gadget_state::<B, BE>(&*self.backend, &self.best_canonicalized)
|
||||
{
|
||||
self.canonicalize_branch(block_hash);
|
||||
debug!(target: LOG_TARGET, "error saving state: {:?}", e);
|
||||
}
|
||||
|
||||
// Remove offchain MMR nodes for stale forks.
|
||||
@@ -201,9 +268,10 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::test_utils::run_test_with_mmr_gadget;
|
||||
use crate::test_utils::{run_test_with_mmr_gadget, run_test_with_mmr_gadget_pre_post};
|
||||
use parking_lot::Mutex;
|
||||
use sp_runtime::generic::BlockId;
|
||||
use std::time::Duration;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
#[test]
|
||||
fn canonicalize_and_prune_works_correctly() {
|
||||
@@ -243,4 +311,51 @@ mod tests {
|
||||
client.assert_pruned(&[&b1, &b2, &b3, &a4]);
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn canonicalize_catchup_works_correctly() {
|
||||
let mmr_blocks = Arc::new(Mutex::new(vec![]));
|
||||
let mmr_blocks_ref = mmr_blocks.clone();
|
||||
run_test_with_mmr_gadget_pre_post(
|
||||
|client| async move {
|
||||
// G -> A1 -> A2
|
||||
// | |
|
||||
// | | -> finalized without gadget (missed notification)
|
||||
// |
|
||||
// | -> first mmr block
|
||||
|
||||
let a1 = client.import_block(&BlockId::Number(0), b"a1", Some(0)).await;
|
||||
let a2 = client.import_block(&BlockId::Hash(a1.hash()), b"a2", Some(1)).await;
|
||||
|
||||
client.finalize_block(a2.hash(), Some(2));
|
||||
|
||||
{
|
||||
let mut mmr_blocks = mmr_blocks_ref.lock();
|
||||
mmr_blocks.push(a1);
|
||||
mmr_blocks.push(a2);
|
||||
}
|
||||
},
|
||||
|client| async move {
|
||||
// G -> A1 -> A2 -> A3 -> A4
|
||||
// | | | |
|
||||
// | | | | -> finalized after starting gadget
|
||||
// | | |
|
||||
// | | | -> gadget start
|
||||
// | |
|
||||
// | | -> finalized before starting gadget (missed notification)
|
||||
// |
|
||||
// | -> first mmr block
|
||||
let blocks = mmr_blocks.lock();
|
||||
let a1 = blocks[0].clone();
|
||||
let a2 = blocks[1].clone();
|
||||
let a3 = client.import_block(&BlockId::Hash(a2.hash()), b"a3", Some(2)).await;
|
||||
let a4 = client.import_block(&BlockId::Hash(a3.hash()), b"a4", Some(3)).await;
|
||||
|
||||
client.finalize_block(a4.hash(), Some(4));
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
// expected finalized heads: a1, a2 _and_ a3, a4.
|
||||
client.assert_canonicalized(&[&a1, &a2, &a3, &a4]);
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,12 +16,8 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{
|
||||
future::Future,
|
||||
sync::{Arc, Mutex},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use crate::MmrGadget;
|
||||
use parking_lot::Mutex;
|
||||
use sc_block_builder::BlockBuilderProvider;
|
||||
use sc_client_api::{
|
||||
Backend as BackendT, BlockchainEvents, FinalityNotifications, ImportNotifications,
|
||||
@@ -41,33 +37,34 @@ use sp_runtime::{
|
||||
generic::BlockId,
|
||||
traits::{Block as BlockT, Header as HeaderT},
|
||||
};
|
||||
use std::{future::Future, sync::Arc, time::Duration};
|
||||
use substrate_test_runtime_client::{
|
||||
runtime::{Block, BlockNumber, Hash, Header},
|
||||
Backend, BlockBuilderExt, Client, ClientBlockImportExt, ClientExt, DefaultTestClientBuilderExt,
|
||||
TestClientBuilder, TestClientBuilderExt,
|
||||
};
|
||||
|
||||
use crate::MmrGadget;
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
type MmrHash = H256;
|
||||
|
||||
struct MockRuntimeApiData {
|
||||
num_blocks: BlockNumber,
|
||||
pub(crate) struct MockRuntimeApiData {
|
||||
pub(crate) num_blocks: BlockNumber,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MockRuntimeApi {
|
||||
data: Arc<Mutex<MockRuntimeApiData>>,
|
||||
pub(crate) struct MockRuntimeApi {
|
||||
pub(crate) data: Arc<Mutex<MockRuntimeApiData>>,
|
||||
}
|
||||
|
||||
impl MockRuntimeApi {
|
||||
pub const INDEXING_PREFIX: &'static [u8] = b"mmr_test";
|
||||
pub(crate) const INDEXING_PREFIX: &'static [u8] = b"mmr_test";
|
||||
}
|
||||
|
||||
pub struct MmrBlock {
|
||||
block: Block,
|
||||
leaf_idx: Option<LeafIndex>,
|
||||
leaf_data: Vec<u8>,
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct MmrBlock {
|
||||
pub(crate) block: Block,
|
||||
pub(crate) leaf_idx: Option<LeafIndex>,
|
||||
pub(crate) leaf_data: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -90,7 +87,7 @@ impl MmrBlock {
|
||||
OffchainKeyType::Temp => NodesUtils::node_temp_offchain_key::<Header>(
|
||||
MockRuntimeApi::INDEXING_PREFIX,
|
||||
node,
|
||||
*self.block.header.parent_hash(),
|
||||
self.parent_hash(),
|
||||
),
|
||||
OffchainKeyType::Canon =>
|
||||
NodesUtils::node_canon_offchain_key(MockRuntimeApi::INDEXING_PREFIX, node),
|
||||
@@ -98,14 +95,14 @@ impl MmrBlock {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MockClient {
|
||||
client: Mutex<Client<Backend>>,
|
||||
backend: Arc<Backend>,
|
||||
runtime_api_params: Arc<Mutex<MockRuntimeApiData>>,
|
||||
pub(crate) struct MockClient {
|
||||
pub(crate) client: Mutex<Client<Backend>>,
|
||||
pub(crate) backend: Arc<Backend>,
|
||||
pub(crate) runtime_api_params: Arc<Mutex<MockRuntimeApiData>>,
|
||||
}
|
||||
|
||||
impl MockClient {
|
||||
fn new() -> Self {
|
||||
pub(crate) fn new() -> Self {
|
||||
let client_builder = TestClientBuilder::new().enable_offchain_indexing_api();
|
||||
let (client, backend) = client_builder.build_with_backend();
|
||||
MockClient {
|
||||
@@ -115,7 +112,7 @@ impl MockClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn offchain_db(&self) -> OffchainDb<<Backend as BackendT<Block>>::OffchainStorage> {
|
||||
pub(crate) fn offchain_db(&self) -> OffchainDb<<Backend as BackendT<Block>>::OffchainStorage> {
|
||||
OffchainDb::new(self.backend.offchain_storage().unwrap())
|
||||
}
|
||||
|
||||
@@ -125,7 +122,7 @@ impl MockClient {
|
||||
name: &[u8],
|
||||
maybe_leaf_idx: Option<LeafIndex>,
|
||||
) -> MmrBlock {
|
||||
let mut client = self.client.lock().unwrap();
|
||||
let mut client = self.client.lock();
|
||||
|
||||
let mut block_builder = client.new_block_at(at, Default::default(), false).unwrap();
|
||||
// Make sure the block has a different hash than its siblings
|
||||
@@ -157,9 +154,9 @@ impl MockClient {
|
||||
}
|
||||
|
||||
pub fn finalize_block(&self, hash: Hash, maybe_num_mmr_blocks: Option<BlockNumber>) {
|
||||
let client = self.client.lock().unwrap();
|
||||
let client = self.client.lock();
|
||||
if let Some(num_mmr_blocks) = maybe_num_mmr_blocks {
|
||||
self.runtime_api_params.lock().unwrap().num_blocks = num_mmr_blocks;
|
||||
self.runtime_api_params.lock().num_blocks = num_mmr_blocks;
|
||||
}
|
||||
|
||||
client.finalize_block(hash, None).unwrap();
|
||||
@@ -216,7 +213,7 @@ impl HeaderMetadata<Block> for MockClient {
|
||||
type Error = <Client<Backend> as HeaderMetadata<Block>>::Error;
|
||||
|
||||
fn header_metadata(&self, hash: Hash) -> Result<CachedHeaderMetadata<Block>, Self::Error> {
|
||||
self.client.lock().unwrap().header_metadata(hash)
|
||||
self.client.lock().header_metadata(hash)
|
||||
}
|
||||
|
||||
fn insert_header_metadata(&self, _hash: Hash, _header_metadata: CachedHeaderMetadata<Block>) {
|
||||
@@ -230,23 +227,23 @@ impl HeaderMetadata<Block> for MockClient {
|
||||
|
||||
impl HeaderBackend<Block> for MockClient {
|
||||
fn header(&self, id: BlockId<Block>) -> sc_client_api::blockchain::Result<Option<Header>> {
|
||||
self.client.lock().unwrap().header(&id)
|
||||
self.client.lock().header(&id)
|
||||
}
|
||||
|
||||
fn info(&self) -> Info<Block> {
|
||||
self.client.lock().unwrap().info()
|
||||
self.client.lock().info()
|
||||
}
|
||||
|
||||
fn status(&self, id: BlockId<Block>) -> sc_client_api::blockchain::Result<BlockStatus> {
|
||||
self.client.lock().unwrap().status(id)
|
||||
self.client.lock().status(id)
|
||||
}
|
||||
|
||||
fn number(&self, hash: Hash) -> sc_client_api::blockchain::Result<Option<BlockNumber>> {
|
||||
self.client.lock().unwrap().number(hash)
|
||||
self.client.lock().number(hash)
|
||||
}
|
||||
|
||||
fn hash(&self, number: BlockNumber) -> sc_client_api::blockchain::Result<Option<Hash>> {
|
||||
self.client.lock().unwrap().hash(number)
|
||||
self.client.lock().hash(number)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -256,7 +253,7 @@ impl BlockchainEvents<Block> for MockClient {
|
||||
}
|
||||
|
||||
fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
|
||||
self.client.lock().unwrap().finality_notification_stream()
|
||||
self.client.lock().finality_notification_stream()
|
||||
}
|
||||
|
||||
fn storage_changes_notification_stream(
|
||||
@@ -283,7 +280,7 @@ sp_api::mock_impl_runtime_apis! {
|
||||
}
|
||||
|
||||
fn mmr_leaf_count(&self) -> Result<LeafIndex, mmr::Error> {
|
||||
Ok(self.data.lock().unwrap().num_blocks)
|
||||
Ok(self.data.lock().num_blocks)
|
||||
}
|
||||
|
||||
fn generate_proof(
|
||||
@@ -310,13 +307,38 @@ sp_api::mock_impl_runtime_apis! {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run_test_with_mmr_gadget<F, Fut>(f: F)
|
||||
pub(crate) fn run_test_with_mmr_gadget<F, Fut>(post_gadget: F)
|
||||
where
|
||||
F: FnOnce(Arc<MockClient>) -> Fut + 'static,
|
||||
Fut: Future<Output = ()>,
|
||||
{
|
||||
let runtime = tokio::runtime::Runtime::new().unwrap();
|
||||
run_test_with_mmr_gadget_pre_post(|_| async {}, post_gadget);
|
||||
}
|
||||
|
||||
pub(crate) fn run_test_with_mmr_gadget_pre_post<F, G, RetF, RetG>(pre_gadget: F, post_gadget: G)
|
||||
where
|
||||
F: FnOnce(Arc<MockClient>) -> RetF + 'static,
|
||||
G: FnOnce(Arc<MockClient>) -> RetG + 'static,
|
||||
RetF: Future<Output = ()>,
|
||||
RetG: Future<Output = ()>,
|
||||
{
|
||||
let client = Arc::new(MockClient::new());
|
||||
run_test_with_mmr_gadget_pre_post_using_client(client, pre_gadget, post_gadget)
|
||||
}
|
||||
|
||||
pub(crate) fn run_test_with_mmr_gadget_pre_post_using_client<F, G, RetF, RetG>(
|
||||
client: Arc<MockClient>,
|
||||
pre_gadget: F,
|
||||
post_gadget: G,
|
||||
) where
|
||||
F: FnOnce(Arc<MockClient>) -> RetF + 'static,
|
||||
G: FnOnce(Arc<MockClient>) -> RetG + 'static,
|
||||
RetF: Future<Output = ()>,
|
||||
RetG: Future<Output = ()>,
|
||||
{
|
||||
let client_clone = client.clone();
|
||||
let runtime = Runtime::new().unwrap();
|
||||
runtime.block_on(async move { pre_gadget(client_clone).await });
|
||||
|
||||
let client_clone = client.clone();
|
||||
runtime.spawn(async move {
|
||||
@@ -327,6 +349,6 @@ where
|
||||
runtime.block_on(async move {
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
f(client).await
|
||||
post_gadget(client).await
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user