mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-09 19:57:59 +00:00
Support the subscription of every imported block (#13372)
* Support the subscription of every import block Close #13315 * Clean up any closed block import notification sinks * Apply review suggestions * Nit * `every_block_import_notification_sinks` -> `every_import_notification_sinks` * Apply review suggestions
This commit is contained in:
@@ -48,6 +48,19 @@ pub type TransactionForSB<B, Block> = <B as StateBackend<HashFor<Block>>>::Trans
|
||||
/// Extracts the transaction for the given backend.
|
||||
pub type TransactionFor<B, Block> = TransactionForSB<StateBackendFor<B, Block>, Block>;
|
||||
|
||||
/// Describes which block import notification stream should be notified.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum ImportNotificationAction {
|
||||
/// Notify only when the node has synced to the tip or there is a re-org.
|
||||
RecentBlock,
|
||||
/// Notify for every single block no matter what the sync state is.
|
||||
EveryBlock,
|
||||
/// Both block import notifications above should be fired.
|
||||
Both,
|
||||
/// No block import notification should be fired.
|
||||
None,
|
||||
}
|
||||
|
||||
/// Import operation summary.
|
||||
///
|
||||
/// Contains information about the block that just got imported,
|
||||
@@ -67,6 +80,8 @@ pub struct ImportSummary<Block: BlockT> {
|
||||
///
|
||||
/// If `None`, there was no re-org while importing.
|
||||
pub tree_route: Option<sp_blockchain::TreeRoute<Block>>,
|
||||
/// What notify action to take for this import.
|
||||
pub import_notification_action: ImportNotificationAction,
|
||||
}
|
||||
|
||||
/// Finalization operation summary.
|
||||
|
||||
@@ -59,10 +59,16 @@ pub trait BlockOf {
|
||||
|
||||
/// A source of blockchain events.
|
||||
pub trait BlockchainEvents<Block: BlockT> {
|
||||
/// Get block import event stream. Not guaranteed to be fired for every
|
||||
/// imported block.
|
||||
/// Get block import event stream.
|
||||
///
|
||||
/// Not guaranteed to be fired for every imported block, only fired when the node
|
||||
/// has synced to the tip or there is a re-org. Use `every_import_notification_stream()`
|
||||
/// if you want a notification of every imported block regardless.
|
||||
fn import_notification_stream(&self) -> ImportNotifications<Block>;
|
||||
|
||||
/// Get a stream of every imported block.
|
||||
fn every_import_notification_stream(&self) -> ImportNotifications<Block>;
|
||||
|
||||
/// Get a stream of finality notifications. Not guaranteed to be fired for every
|
||||
/// finalized block.
|
||||
fn finality_notification_stream(&self) -> FinalityNotifications<Block>;
|
||||
|
||||
@@ -265,6 +265,10 @@ impl BlockchainEvents<Block> for MockClient {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
|
||||
self.client.lock().finality_notification_stream()
|
||||
}
|
||||
|
||||
@@ -31,7 +31,7 @@ use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof};
|
||||
use sc_client_api::{
|
||||
backend::{
|
||||
self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer,
|
||||
ImportSummary, LockImportRun, NewBlockState, StorageProvider,
|
||||
ImportNotificationAction, ImportSummary, LockImportRun, NewBlockState, StorageProvider,
|
||||
},
|
||||
client::{
|
||||
BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo,
|
||||
@@ -106,6 +106,7 @@ where
|
||||
executor: E,
|
||||
storage_notifications: StorageNotifications<Block>,
|
||||
import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
|
||||
every_import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
|
||||
finality_notification_sinks: NotificationSinks<FinalityNotification<Block>>,
|
||||
// Collects auxiliary operations to be performed atomically together with
|
||||
// block import operations.
|
||||
@@ -304,19 +305,22 @@ where
|
||||
FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone())
|
||||
});
|
||||
|
||||
let (import_notification, storage_changes) = match notify_imported {
|
||||
Some(mut summary) => {
|
||||
let storage_changes = summary.storage_changes.take();
|
||||
(
|
||||
Some(BlockImportNotification::from_summary(
|
||||
summary,
|
||||
self.unpin_worker_sender.clone(),
|
||||
)),
|
||||
storage_changes,
|
||||
)
|
||||
},
|
||||
None => (None, None),
|
||||
};
|
||||
let (import_notification, storage_changes, import_notification_action) =
|
||||
match notify_imported {
|
||||
Some(mut summary) => {
|
||||
let import_notification_action = summary.import_notification_action;
|
||||
let storage_changes = summary.storage_changes.take();
|
||||
(
|
||||
Some(BlockImportNotification::from_summary(
|
||||
summary,
|
||||
self.unpin_worker_sender.clone(),
|
||||
)),
|
||||
storage_changes,
|
||||
import_notification_action,
|
||||
)
|
||||
},
|
||||
None => (None, None, ImportNotificationAction::None),
|
||||
};
|
||||
|
||||
if let Some(ref notification) = finality_notification {
|
||||
for action in self.finality_actions.lock().iter_mut() {
|
||||
@@ -353,7 +357,7 @@ where
|
||||
}
|
||||
|
||||
self.notify_finalized(finality_notification)?;
|
||||
self.notify_imported(import_notification, storage_changes)?;
|
||||
self.notify_imported(import_notification, import_notification_action, storage_changes)?;
|
||||
|
||||
Ok(r)
|
||||
};
|
||||
@@ -451,6 +455,7 @@ where
|
||||
executor,
|
||||
storage_notifications: StorageNotifications::new(prometheus_registry),
|
||||
import_notification_sinks: Default::default(),
|
||||
every_import_notification_sinks: Default::default(),
|
||||
finality_notification_sinks: Default::default(),
|
||||
import_actions: Default::default(),
|
||||
finality_actions: Default::default(),
|
||||
@@ -769,11 +774,15 @@ where
|
||||
|
||||
operation.op.insert_aux(aux)?;
|
||||
|
||||
// We only notify when we are already synced to the tip of the chain
|
||||
let should_notify_every_block = !self.every_import_notification_sinks.lock().is_empty();
|
||||
|
||||
// Notify when we are already synced to the tip of the chain
|
||||
// or if this import triggers a re-org
|
||||
if make_notifications || tree_route.is_some() {
|
||||
let should_notify_recent_block = make_notifications || tree_route.is_some();
|
||||
|
||||
if should_notify_every_block || should_notify_recent_block {
|
||||
let header = import_headers.into_post();
|
||||
if finalized {
|
||||
if finalized && should_notify_recent_block {
|
||||
let mut summary = match operation.notify_finalized.take() {
|
||||
Some(mut summary) => {
|
||||
summary.header = header.clone();
|
||||
@@ -810,6 +819,16 @@ where
|
||||
operation.notify_finalized = Some(summary);
|
||||
}
|
||||
|
||||
let import_notification_action = if should_notify_every_block {
|
||||
if should_notify_recent_block {
|
||||
ImportNotificationAction::Both
|
||||
} else {
|
||||
ImportNotificationAction::EveryBlock
|
||||
}
|
||||
} else {
|
||||
ImportNotificationAction::RecentBlock
|
||||
};
|
||||
|
||||
operation.notify_imported = Some(ImportSummary {
|
||||
hash,
|
||||
origin,
|
||||
@@ -817,6 +836,7 @@ where
|
||||
is_new_best,
|
||||
storage_changes,
|
||||
tree_route,
|
||||
import_notification_action,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1012,6 +1032,7 @@ where
|
||||
fn notify_imported(
|
||||
&self,
|
||||
notification: Option<BlockImportNotification<Block>>,
|
||||
import_notification_action: ImportNotificationAction,
|
||||
storage_changes: Option<(StorageCollection, ChildStorageCollection)>,
|
||||
) -> sp_blockchain::Result<()> {
|
||||
let notification = match notification {
|
||||
@@ -1024,22 +1045,59 @@ where
|
||||
// temporary leak of closed/discarded notification sinks (e.g.
|
||||
// from consensus code).
|
||||
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
|
||||
|
||||
self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
|
||||
|
||||
return Ok(())
|
||||
},
|
||||
};
|
||||
|
||||
if let Some(storage_changes) = storage_changes {
|
||||
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
|
||||
self.storage_notifications.trigger(
|
||||
¬ification.hash,
|
||||
storage_changes.0.into_iter(),
|
||||
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
|
||||
);
|
||||
}
|
||||
let trigger_storage_changes_notification = || {
|
||||
if let Some(storage_changes) = storage_changes {
|
||||
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
|
||||
self.storage_notifications.trigger(
|
||||
¬ification.hash,
|
||||
storage_changes.0.into_iter(),
|
||||
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
self.import_notification_sinks
|
||||
.lock()
|
||||
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
|
||||
match import_notification_action {
|
||||
ImportNotificationAction::Both => {
|
||||
trigger_storage_changes_notification();
|
||||
self.import_notification_sinks
|
||||
.lock()
|
||||
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
|
||||
|
||||
self.every_import_notification_sinks
|
||||
.lock()
|
||||
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
|
||||
},
|
||||
ImportNotificationAction::RecentBlock => {
|
||||
trigger_storage_changes_notification();
|
||||
self.import_notification_sinks
|
||||
.lock()
|
||||
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
|
||||
|
||||
self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
|
||||
},
|
||||
ImportNotificationAction::EveryBlock => {
|
||||
self.every_import_notification_sinks
|
||||
.lock()
|
||||
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
|
||||
|
||||
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
|
||||
},
|
||||
ImportNotificationAction::None => {
|
||||
// This branch is unreachable in fact because the block import notification must be
|
||||
// Some(_) instead of None (it's already handled at the beginning of this function)
|
||||
// at this point.
|
||||
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
|
||||
|
||||
self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
|
||||
},
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1944,6 +2002,12 @@ where
|
||||
stream
|
||||
}
|
||||
|
||||
fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
|
||||
let (sink, stream) = tracing_unbounded("mpsc_every_import_notification_stream", 100_000);
|
||||
self.every_import_notification_sinks.lock().push(sink);
|
||||
stream
|
||||
}
|
||||
|
||||
fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
|
||||
let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream", 100_000);
|
||||
self.finality_notification_sinks.lock().push(sink);
|
||||
|
||||
Reference in New Issue
Block a user