backend: Detect chainHead stream termination and stop polling

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2024-01-10 11:48:44 +02:00
parent b3a467526d
commit d9169e2404
+96 -38
View File
@@ -553,6 +553,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
let mut mem_log = vec![];
let mut loop_times = 0;
let mut should_poll_blocks = true;
// Now we can attempt to associate tx events with pinned blocks.
let tx_stream = futures::stream::poll_fn(move |cx| {
loop {
@@ -575,52 +577,108 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
}
// Make a note of new or finalized blocks that have come in since we started the TX.
if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
match seen_block {
SeenBlock::New((block_ref, parent)) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
// if finalized_hash.is_none() {
seen_blocks.insert(
block_ref.hash(),
(
SeenBlockMarker::New,
block_ref,
parent,
Some(now.elapsed()),
None,
),
);
// }
}
SeenBlock::Finalized(block_refs) => {
for block_ref in block_refs {
// if !seen_blocks.contains_key(&block_ref.hash()) {
// panic!("Finalized before new Finalized {:#?}\n initBlock {:#?} \nMEMLOG{:#?}\n SeenBlocks{:#?} \n Other{:#?}", block_ref.hash(), unsafe { &FIN_BLOCK }, mem_log, seen_blocks, seen_other);
// }
if should_poll_blocks {
match seen_blocks_sub.poll_next_unpin(cx) {
Poll::Ready(Some(seen_block)) => {
match seen_block {
SeenBlock::New((block_ref, parent)) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
// if finalized_hash.is_none() {
seen_blocks.insert(
block_ref.hash(),
(
SeenBlockMarker::New,
block_ref,
parent,
Some(now.elapsed()),
None,
),
);
// }
}
SeenBlock::Finalized(block_refs) => {
for block_ref in block_refs {
// if !seen_blocks.contains_key(&block_ref.hash()) {
// panic!("Finalized before new Finalized {:#?}\n initBlock {:#?} \nMEMLOG{:#?}\n SeenBlocks{:#?} \n Other{:#?}", block_ref.hash(), unsafe { &FIN_BLOCK }, mem_log, seen_blocks, seen_other);
// }
let entry = seen_blocks.entry(block_ref.hash()).or_insert((
SeenBlockMarker::Finalized,
block_ref.clone(),
block_ref,
None,
Some(now.elapsed()),
));
entry.0 = SeenBlockMarker::Finalized;
entry.4 = Some(now.elapsed());
let entry =
seen_blocks.entry(block_ref.hash()).or_insert((
SeenBlockMarker::Finalized,
block_ref.clone(),
block_ref,
None,
Some(now.elapsed()),
));
entry.0 = SeenBlockMarker::Finalized;
entry.4 = Some(now.elapsed());
// .get_mut(&block_ref.hash())
// .expect("finalized block seen before new block")
// .0 = SeenBlockMarker::Finalized;
// .get_mut(&block_ref.hash())
// .expect("finalized block seen before new block")
// .0 = SeenBlockMarker::Finalized;
}
}
SeenBlock::Other(other) => {
seen_other.push((now.elapsed(), other));
}
}
continue;
}
SeenBlock::Other(other) => {
seen_other.push((now.elapsed(), other));
Poll::Ready(None) => {
should_poll_blocks = false;
println!(" seen_blocks_sub ended!");
}
Poll::Pending => (),
}
continue;
}
// if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
// match seen_block {
// SeenBlock::New((block_ref, parent)) => {
// // Optimization: once we have a `finalized_hash`, we only care about finalized
// // block refs now and can avoid bothering to save new blocks.
// // if finalized_hash.is_none() {
// seen_blocks.insert(
// block_ref.hash(),
// (
// SeenBlockMarker::New,
// block_ref,
// parent,
// Some(now.elapsed()),
// None,
// ),
// );
// // }
// }
// SeenBlock::Finalized(block_refs) => {
// for block_ref in block_refs {
// // if !seen_blocks.contains_key(&block_ref.hash()) {
// // panic!("Finalized before new Finalized {:#?}\n initBlock {:#?} \nMEMLOG{:#?}\n SeenBlocks{:#?} \n Other{:#?}", block_ref.hash(), unsafe { &FIN_BLOCK }, mem_log, seen_blocks, seen_other);
// // }
// let entry = seen_blocks.entry(block_ref.hash()).or_insert((
// SeenBlockMarker::Finalized,
// block_ref.clone(),
// block_ref,
// None,
// Some(now.elapsed()),
// ));
// entry.0 = SeenBlockMarker::Finalized;
// entry.4 = Some(now.elapsed());
// // .get_mut(&block_ref.hash())
// // .expect("finalized block seen before new block")
// // .0 = SeenBlockMarker::Finalized;
// }
// }
// SeenBlock::Other(other) => {
// seen_other.push((now.elapsed(), other));
// }
// }
// continue;
// }
// If we have a finalized hash, we are done looking for tx events and we are just waiting
// for a pinned block with a matching hash (which must appear eventually given it's finalized).
if let Some(hash) = &finalized_hash {