diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 699e3320ff..adfdfb4b63 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -208,11 +208,9 @@ impl LockImportRun for Client let ClientImportOperation { op, notify_imported, notify_finalized } = op; self.backend.commit_operation(op)?; - self.notify_finalized(notify_finalized)?; - if let Some(notify_imported) = notify_imported { - self.notify_imported(notify_imported)?; - } + self.notify_finalized(notify_finalized)?; + self.notify_imported(notify_imported)?; Ok(r) }; @@ -919,6 +917,15 @@ impl Client where ) -> sp_blockchain::Result<()> { let mut sinks = self.finality_notification_sinks.lock(); + if notify_finalized.is_empty() { + // cleanup any closed finality notification sinks + // since we won't be running the loop below which + // would also remove any closed sinks. + sinks.retain(|sink| !sink.is_closed()); + + return Ok(()); + } + for finalized_hash in notify_finalized { let header = self.header(&BlockId::Hash(finalized_hash))? .expect("header already known to exist in DB because it is indicated in the tree route; qed"); @@ -939,7 +946,27 @@ impl Client where Ok(()) } - fn notify_imported(&self, notify_import: ImportSummary) -> sp_blockchain::Result<()> { + fn notify_imported( + &self, + notify_import: Option>, + ) -> sp_blockchain::Result<()> { + let notify_import = match notify_import { + Some(notify_import) => notify_import, + None => { + // cleanup any closed import notification sinks since we won't + // be sending any notifications below which would remove any + // closed sinks. this is necessary since during initial sync we + // won't send any import notifications which could lead to a + // temporary leak of closed/discarded notification sinks (e.g. + // from consensus code). + self.import_notification_sinks + .lock() + .retain(|sink| !sink.is_closed()); + + return Ok(()); + } + }; + if let Some(storage_changes) = notify_import.storage_changes { // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? self.storage_notifications.lock() @@ -3410,4 +3437,79 @@ pub(crate) mod tests { .collect(); assert_eq!(res, [hex!("cf722c0832b5231d35e29f319ff27389f5032bfc7bfc3ba5ed7839f2042fb99f").to_vec()]); } + + #[test] + fn cleans_up_closed_notification_sinks_on_block_import() { + use substrate_test_runtime_client::GenesisInit; + + // NOTE: we need to build the client here instead of using the client + // provided by test_runtime_client otherwise we can't access the private + // `import_notification_sinks` and `finality_notification_sinks` fields. + let mut client = + new_in_mem::< + _, + substrate_test_runtime_client::runtime::Block, + _, + substrate_test_runtime_client::runtime::RuntimeApi + >( + substrate_test_runtime_client::new_native_executor(), + &substrate_test_runtime_client::GenesisParameters::default().genesis_storage(), + None, + None, + ) + .unwrap(); + + type TestClient = Client< + in_mem::Backend, + LocalCallExecutor, sc_executor::NativeExecutor>, + substrate_test_runtime_client::runtime::Block, + substrate_test_runtime_client::runtime::RuntimeApi, + >; + + let import_notif1 = client.import_notification_stream(); + let import_notif2 = client.import_notification_stream(); + let finality_notif1 = client.finality_notification_stream(); + let finality_notif2 = client.finality_notification_stream(); + + // for some reason I can't seem to use `ClientBlockImportExt` + let bake_and_import_block = |client: &mut TestClient, origin| { + let block = client + .new_block(Default::default()) + .unwrap() + .build() + .unwrap() + .block; + + let (header, extrinsics) = block.deconstruct(); + let mut import = BlockImportParams::new(origin, header); + import.body = Some(extrinsics); + import.fork_choice = Some(ForkChoiceStrategy::LongestChain); + client.import_block(import, Default::default()).unwrap(); + }; + + // after importing a block we should still have 4 notification sinks + // (2 import + 2 finality) + bake_and_import_block(&mut client, BlockOrigin::Own); + assert_eq!(client.import_notification_sinks.lock().len(), 2); + assert_eq!(client.finality_notification_sinks.lock().len(), 2); + + // if we drop one import notification receiver and one finality + // notification receiver + drop(import_notif2); + drop(finality_notif2); + + // the sinks should be cleaned up after block import + bake_and_import_block(&mut client, BlockOrigin::Own); + assert_eq!(client.import_notification_sinks.lock().len(), 1); + assert_eq!(client.finality_notification_sinks.lock().len(), 1); + + // the same thing should happen if block import happens during initial + // sync + drop(import_notif1); + drop(finality_notif1); + + bake_and_import_block(&mut client, BlockOrigin::NetworkInitialSync); + assert_eq!(client.import_notification_sinks.lock().len(), 0); + assert_eq!(client.finality_notification_sinks.lock().len(), 0); + } }