mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-08 11:18:01 +00:00
Split tx pool and offchain notification handling (#6231)
Instead of having the tx pool and offchain worker being feed from the same import notification stream, this pr splits them to use two different streams. The first advantage of this split is that the tx pool will not be spawned anymore in another task and instead will directly process the notification in the same task. This has the advantage of being faster when the system is being under load, as the tx pool will not be waiting for being scheduled to handle the notification.
This commit is contained in:
@@ -1025,63 +1025,74 @@ ServiceBuilder<
|
||||
|
||||
let spawn_handle = task_manager.spawn_handle();
|
||||
|
||||
// Inform the tx pool about imported and finalized blocks.
|
||||
{
|
||||
// block notifications
|
||||
let txpool = Arc::downgrade(&transaction_pool);
|
||||
|
||||
let mut import_stream = client.import_notification_stream().map(|n| ChainEvent::NewBlock {
|
||||
id: BlockId::Hash(n.hash),
|
||||
header: n.header,
|
||||
retracted: n.retracted,
|
||||
is_new_best: n.is_new_best,
|
||||
}).fuse();
|
||||
let mut finality_stream = client.finality_notification_stream()
|
||||
.map(|n| ChainEvent::Finalized::<TBl> { hash: n.hash })
|
||||
.fuse();
|
||||
|
||||
let events = async move {
|
||||
loop {
|
||||
let evt = futures::select! {
|
||||
evt = import_stream.next() => evt,
|
||||
evt = finality_stream.next() => evt,
|
||||
complete => return,
|
||||
};
|
||||
|
||||
let txpool = txpool.upgrade();
|
||||
if let Some((txpool, evt)) = txpool.and_then(|tp| evt.map(|evt| (tp, evt))) {
|
||||
txpool.maintain(evt).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
spawn_handle.spawn(
|
||||
"txpool-notifications",
|
||||
events,
|
||||
);
|
||||
}
|
||||
|
||||
// Inform the offchain worker about new imported blocks
|
||||
{
|
||||
let offchain = offchain_workers.as_ref().map(Arc::downgrade);
|
||||
let notifications_spawn_handle = task_manager.spawn_handle();
|
||||
let network_state_info: Arc<dyn NetworkStateInfo + Send + Sync> = network.clone();
|
||||
let is_validator = config.role.is_authority();
|
||||
|
||||
let (import_stream, finality_stream) = (
|
||||
client.import_notification_stream().map(|n| ChainEvent::NewBlock {
|
||||
id: BlockId::Hash(n.hash),
|
||||
header: n.header,
|
||||
retracted: n.retracted,
|
||||
is_new_best: n.is_new_best,
|
||||
}),
|
||||
client.finality_notification_stream().map(|n| ChainEvent::Finalized {
|
||||
hash: n.hash
|
||||
})
|
||||
);
|
||||
let events = futures::stream::select(import_stream, finality_stream)
|
||||
.for_each(move |event| {
|
||||
// offchain worker is only interested in block import events
|
||||
if let ChainEvent::NewBlock { ref header, is_new_best, .. } = event {
|
||||
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
|
||||
match offchain {
|
||||
Some(offchain) if is_new_best => {
|
||||
notifications_spawn_handle.spawn(
|
||||
"offchain-on-block",
|
||||
offchain.on_block_imported(
|
||||
&header,
|
||||
network_state_info.clone(),
|
||||
is_validator,
|
||||
),
|
||||
);
|
||||
},
|
||||
Some(_) => log::debug!(
|
||||
target: "sc_offchain",
|
||||
"Skipping offchain workers for non-canon block: {:?}",
|
||||
header,
|
||||
),
|
||||
_ => {},
|
||||
}
|
||||
};
|
||||
|
||||
let txpool = txpool.upgrade();
|
||||
if let Some(txpool) = txpool.as_ref() {
|
||||
let events = client.import_notification_stream().for_each(move |n| {
|
||||
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
|
||||
match offchain {
|
||||
Some(offchain) if n.is_new_best => {
|
||||
notifications_spawn_handle.spawn(
|
||||
"txpool-maintain",
|
||||
txpool.maintain(event),
|
||||
"offchain-on-block",
|
||||
offchain.on_block_imported(
|
||||
&n.header,
|
||||
network_state_info.clone(),
|
||||
is_validator,
|
||||
),
|
||||
);
|
||||
}
|
||||
},
|
||||
Some(_) => log::debug!(
|
||||
target: "sc_offchain",
|
||||
"Skipping offchain workers for non-canon block: {:?}",
|
||||
n.header,
|
||||
),
|
||||
_ => {},
|
||||
}
|
||||
|
||||
ready(())
|
||||
});
|
||||
ready(())
|
||||
});
|
||||
|
||||
spawn_handle.spawn(
|
||||
"txpool-and-offchain-notif",
|
||||
"offchain-notifications",
|
||||
events,
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user