mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 12:51:05 +00:00
client: fix notification sinks leak during initial sync (#5161)
* client: fix notification sinks leak during initial sync * client: add test for notification sink cleanup * Make it compile * Further cleanup * client: fix test for notification sinks cleanup Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
@@ -208,11 +208,9 @@ impl<B, E, Block, RA> LockImportRun<Block, B> for Client<B, E, Block, RA>
|
||||
|
||||
let ClientImportOperation { op, notify_imported, notify_finalized } = op;
|
||||
self.backend.commit_operation(op)?;
|
||||
self.notify_finalized(notify_finalized)?;
|
||||
|
||||
if let Some(notify_imported) = notify_imported {
|
||||
self.notify_imported(notify_imported)?;
|
||||
}
|
||||
self.notify_finalized(notify_finalized)?;
|
||||
self.notify_imported(notify_imported)?;
|
||||
|
||||
Ok(r)
|
||||
};
|
||||
@@ -919,6 +917,15 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
|
||||
) -> sp_blockchain::Result<()> {
|
||||
let mut sinks = self.finality_notification_sinks.lock();
|
||||
|
||||
if notify_finalized.is_empty() {
|
||||
// cleanup any closed finality notification sinks
|
||||
// since we won't be running the loop below which
|
||||
// would also remove any closed sinks.
|
||||
sinks.retain(|sink| !sink.is_closed());
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for finalized_hash in notify_finalized {
|
||||
let header = self.header(&BlockId::Hash(finalized_hash))?
|
||||
.expect("header already known to exist in DB because it is indicated in the tree route; qed");
|
||||
@@ -939,7 +946,27 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn notify_imported(&self, notify_import: ImportSummary<Block>) -> sp_blockchain::Result<()> {
|
||||
fn notify_imported(
|
||||
&self,
|
||||
notify_import: Option<ImportSummary<Block>>,
|
||||
) -> sp_blockchain::Result<()> {
|
||||
let notify_import = match notify_import {
|
||||
Some(notify_import) => notify_import,
|
||||
None => {
|
||||
// cleanup any closed import notification sinks since we won't
|
||||
// be sending any notifications below which would remove any
|
||||
// closed sinks. this is necessary since during initial sync we
|
||||
// won't send any import notifications which could lead to a
|
||||
// temporary leak of closed/discarded notification sinks (e.g.
|
||||
// from consensus code).
|
||||
self.import_notification_sinks
|
||||
.lock()
|
||||
.retain(|sink| !sink.is_closed());
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(storage_changes) = notify_import.storage_changes {
|
||||
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
|
||||
self.storage_notifications.lock()
|
||||
@@ -3410,4 +3437,79 @@ pub(crate) mod tests {
|
||||
.collect();
|
||||
assert_eq!(res, [hex!("cf722c0832b5231d35e29f319ff27389f5032bfc7bfc3ba5ed7839f2042fb99f").to_vec()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cleans_up_closed_notification_sinks_on_block_import() {
|
||||
use substrate_test_runtime_client::GenesisInit;
|
||||
|
||||
// NOTE: we need to build the client here instead of using the client
|
||||
// provided by test_runtime_client otherwise we can't access the private
|
||||
// `import_notification_sinks` and `finality_notification_sinks` fields.
|
||||
let mut client =
|
||||
new_in_mem::<
|
||||
_,
|
||||
substrate_test_runtime_client::runtime::Block,
|
||||
_,
|
||||
substrate_test_runtime_client::runtime::RuntimeApi
|
||||
>(
|
||||
substrate_test_runtime_client::new_native_executor(),
|
||||
&substrate_test_runtime_client::GenesisParameters::default().genesis_storage(),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
type TestClient = Client<
|
||||
in_mem::Backend<Block>,
|
||||
LocalCallExecutor<in_mem::Backend<Block>, sc_executor::NativeExecutor<LocalExecutor>>,
|
||||
substrate_test_runtime_client::runtime::Block,
|
||||
substrate_test_runtime_client::runtime::RuntimeApi,
|
||||
>;
|
||||
|
||||
let import_notif1 = client.import_notification_stream();
|
||||
let import_notif2 = client.import_notification_stream();
|
||||
let finality_notif1 = client.finality_notification_stream();
|
||||
let finality_notif2 = client.finality_notification_stream();
|
||||
|
||||
// for some reason I can't seem to use `ClientBlockImportExt`
|
||||
let bake_and_import_block = |client: &mut TestClient, origin| {
|
||||
let block = client
|
||||
.new_block(Default::default())
|
||||
.unwrap()
|
||||
.build()
|
||||
.unwrap()
|
||||
.block;
|
||||
|
||||
let (header, extrinsics) = block.deconstruct();
|
||||
let mut import = BlockImportParams::new(origin, header);
|
||||
import.body = Some(extrinsics);
|
||||
import.fork_choice = Some(ForkChoiceStrategy::LongestChain);
|
||||
client.import_block(import, Default::default()).unwrap();
|
||||
};
|
||||
|
||||
// after importing a block we should still have 4 notification sinks
|
||||
// (2 import + 2 finality)
|
||||
bake_and_import_block(&mut client, BlockOrigin::Own);
|
||||
assert_eq!(client.import_notification_sinks.lock().len(), 2);
|
||||
assert_eq!(client.finality_notification_sinks.lock().len(), 2);
|
||||
|
||||
// if we drop one import notification receiver and one finality
|
||||
// notification receiver
|
||||
drop(import_notif2);
|
||||
drop(finality_notif2);
|
||||
|
||||
// the sinks should be cleaned up after block import
|
||||
bake_and_import_block(&mut client, BlockOrigin::Own);
|
||||
assert_eq!(client.import_notification_sinks.lock().len(), 1);
|
||||
assert_eq!(client.finality_notification_sinks.lock().len(), 1);
|
||||
|
||||
// the same thing should happen if block import happens during initial
|
||||
// sync
|
||||
drop(import_notif1);
|
||||
drop(finality_notif1);
|
||||
|
||||
bake_and_import_block(&mut client, BlockOrigin::NetworkInitialSync);
|
||||
assert_eq!(client.import_notification_sinks.lock().len(), 0);
|
||||
assert_eq!(client.finality_notification_sinks.lock().len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user