Notification-based block pinning (#13157)

* Worker

* Reorganize and unpin onnotification drop

* Pin in state-db, pass block number

* Pin blocks in blockchain db

* Switch to reference counted LRU

* Disable pinning when we keep all blocks

* Fix pinning hint for state-db

* Remove pinning from backend layer

* Improve readability

* Add justifications to test

* Fix justification behaviour

* Remove debug prints

* Convert channels to tracing_unbounded

* Add comments to the test

* Documentation and Cleanup

* Move task start to client

* Simplify cache

* Improve test, remove unwanted log

* Add tracing logs, remove expect for block number

* Cleanup

* Add conversion method for unpin handle to Finalitynotification

* Revert unwanted changes

* Improve naming

* Make clippy happy

* Fix docs

Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>

* Use `NumberFor` instead of u64 in API

* Hand over weak reference to unpin worker task

* Unwanted

* &Hash -> Hash

* Remove number from interface, rename `_unpin_handle`, LOG_TARGET

* Move RwLock one layer up

* Apply code style suggestions

* Improve comments

* Replace lru crate by schnellru

* Only insert values for pinned items + better docs

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Improve comments, log target and test

Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Sebastian Kunert
2023-01-19 16:13:16 +01:00
committed by GitHub
parent 7a1958ca4e
commit 9be655701a
11 changed files with 962 additions and 107 deletions
+3 -1
View File
@@ -329,13 +329,15 @@ where
let executor = crate::client::LocalCallExecutor::new(
backend.clone(),
executor,
spawn_handle,
spawn_handle.clone(),
config.clone(),
execution_extensions,
)?;
crate::client::Client::new(
backend,
executor,
spawn_handle,
genesis_block_builder,
fork_blocks,
bad_blocks,
+67 -10
View File
@@ -22,7 +22,8 @@ use super::{
block_rules::{BlockRules, LookupResult as BlockLookupResult},
genesis::BuildGenesisBlock,
};
use log::{info, trace, warn};
use futures::{FutureExt, StreamExt};
use log::{error, info, trace, warn};
use parking_lot::{Mutex, RwLock};
use prometheus_endpoint::Registry;
use rand::Rng;
@@ -58,9 +59,12 @@ use sp_blockchain::{
use sp_consensus::{BlockOrigin, BlockStatus, Error as ConsensusError};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender};
use sp_core::storage::{
well_known_keys, ChildInfo, ChildType, PrefixedStorageKey, Storage, StorageChild, StorageData,
StorageKey,
use sp_core::{
storage::{
well_known_keys, ChildInfo, ChildType, PrefixedStorageKey, Storage, StorageChild,
StorageData, StorageKey,
},
traits::SpawnNamed,
};
#[cfg(feature = "test-helpers")]
use sp_keystore::SyncCryptoStorePtr;
@@ -88,9 +92,7 @@ use std::{
#[cfg(feature = "test-helpers")]
use {
super::call_executor::LocalCallExecutor,
sc_client_api::in_mem,
sp_core::traits::{CodeExecutor, SpawnNamed},
super::call_executor::LocalCallExecutor, sc_client_api::in_mem, sp_core::traits::CodeExecutor,
};
type NotificationSinks<T> = Mutex<Vec<TracingUnboundedSender<T>>>;
@@ -116,6 +118,7 @@ where
block_rules: BlockRules<Block>,
config: ClientConfig<Block>,
telemetry: Option<TelemetryHandle>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
_phantom: PhantomData<RA>,
}
@@ -246,7 +249,7 @@ where
let call_executor = LocalCallExecutor::new(
backend.clone(),
executor,
spawn_handle,
spawn_handle.clone(),
config.clone(),
extensions,
)?;
@@ -254,6 +257,7 @@ where
Client::new(
backend,
call_executor,
spawn_handle,
genesis_block_builder,
Default::default(),
Default::default(),
@@ -296,11 +300,20 @@ where
let ClientImportOperation { mut op, notify_imported, notify_finalized } = op;
let finality_notification = notify_finalized.map(|summary| summary.into());
let finality_notification = notify_finalized.map(|summary| {
FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone())
});
let (import_notification, storage_changes) = match notify_imported {
Some(mut summary) => {
let storage_changes = summary.storage_changes.take();
(Some(summary.into()), storage_changes)
(
Some(BlockImportNotification::from_summary(
summary,
self.unpin_worker_sender.clone(),
)),
storage_changes,
)
},
None => (None, None),
};
@@ -318,6 +331,27 @@ where
self.backend.commit_operation(op)?;
// We need to pin the block in the backend once
// for each notification. Once all notifications are
// dropped, the block will be unpinned automatically.
if let Some(ref notification) = finality_notification {
if let Err(err) = self.backend.pin_block(notification.hash) {
error!(
"Unable to pin block for finality notification. hash: {}, Error: {}",
notification.hash, err
);
};
}
if let Some(ref notification) = import_notification {
if let Err(err) = self.backend.pin_block(notification.hash) {
error!(
"Unable to pin block for import notification. hash: {}, Error: {}",
notification.hash, err
);
};
}
self.notify_finalized(finality_notification)?;
self.notify_imported(import_notification, storage_changes)?;
@@ -357,6 +391,7 @@ where
pub fn new<G>(
backend: Arc<B>,
executor: E,
spawn_handle: Box<dyn SpawnNamed>,
genesis_block_builder: G,
fork_blocks: ForkBlocks<Block>,
bad_blocks: BadBlocks<Block>,
@@ -369,6 +404,7 @@ where
Block,
BlockImportOperation = <B as backend::Backend<Block>>::BlockImportOperation,
>,
B: 'static,
{
let info = backend.blockchain().info();
if info.finalized_state.is_none() {
@@ -390,6 +426,26 @@ where
backend.commit_operation(op)?;
}
let (unpin_worker_sender, mut rx) =
tracing_unbounded::<Block::Hash>("unpin-worker-channel", 10_000);
let task_backend = Arc::downgrade(&backend);
spawn_handle.spawn(
"unpin-worker",
None,
async move {
while let Some(message) = rx.next().await {
if let Some(backend) = task_backend.upgrade() {
backend.unpin_block(message);
} else {
log::debug!("Terminating unpin-worker, backend reference was dropped.");
return
}
}
log::debug!("Terminating unpin-worker, stream terminated.")
}
.boxed(),
);
Ok(Client {
backend,
executor,
@@ -402,6 +458,7 @@ where
block_rules: BlockRules::new(fork_blocks, bad_blocks),
config,
telemetry,
unpin_worker_sender,
_phantom: Default::default(),
})
}