Sync tx submission with chainHead_follow (#1305)

* sync tx submission with chainHead_follow

* make it compile

* add small comment
This commit is contained in:
James Wilson
2023-12-04 16:48:32 +00:00
committed by GitHub
parent 14b71279ba
commit c3b433123d
+47 -13
View File
@@ -436,10 +436,26 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
&self, &self,
extrinsic: &[u8], extrinsic: &[u8],
) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> { ) -> Result<StreamOfResults<TransactionStatus<T::Hash>>, Error> {
// First, subscribe to all new block hashes // We care about new and finalized block hashes.
let mut new_blocks = self.follow_handle.subscribe().events().filter_map(|ev| { enum SeenBlock<Ref> {
New(Ref),
Finalized(Vec<Ref>),
}
enum SeenBlockMarker {
New,
Finalized,
}
// First, subscribe to all new and finalized block refs.
// - we subscribe to new refs so that when we see `BestChainBlockIncluded`, we
// can try to return a block ref for the best block.
// - we subscribe to finalized refs so that when we see `Finalized`, we can
// guarantee that when we return here, the finalized block we report has been
// reported from chainHead_follow already.
let mut seen_blocks_sub = self.follow_handle.subscribe().events().filter_map(|ev| {
std::future::ready(match ev { std::future::ready(match ev {
FollowEvent::NewBlock(ev) => Some(ev.block_hash), FollowEvent::NewBlock(ev) => Some(SeenBlock::New(ev.block_hash)),
FollowEvent::Finalized(ev) => Some(SeenBlock::Finalized(ev.finalized_block_hashes)),
_ => None, _ => None,
}) })
}); });
@@ -453,8 +469,9 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
let mut seen_blocks = HashMap::new(); let mut seen_blocks = HashMap::new();
let mut done = false; let mut done = false;
// If we see the finalized event, we start waiting until we find a block that // If we see the finalized event, we start waiting until we find a finalized block that
// matches, so we can guarantee to return a pinned block hash. // matches, so we can guarantee to return a pinned block hash and be properly in sync
// with chainHead_follow.
let mut finalized_hash: Option<T::Hash> = None; let mut finalized_hash: Option<T::Hash> = None;
// Now we can attempt to associate tx events with pinned blocks. // Now we can attempt to associate tx events with pinned blocks.
@@ -465,17 +482,34 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
return Poll::Ready(None); return Poll::Ready(None);
} }
// Save any pinned blocks. Keep doing this until no more, so that we always have the most uptodate // Make a note of new or finalized blocks that have come in since we started the TX.
// pinned blocks when we are looking at our tx events. if let Poll::Ready(Some(seen_block)) = seen_blocks_sub.poll_next_unpin(cx) {
if let Poll::Ready(Some(block_ref)) = new_blocks.poll_next_unpin(cx) { match seen_block {
seen_blocks.insert(block_ref.hash(), block_ref); SeenBlock::New(block_ref) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
if finalized_hash.is_none() {
seen_blocks
.insert(block_ref.hash(), (SeenBlockMarker::New, block_ref));
}
}
SeenBlock::Finalized(block_refs) => {
for block_ref in block_refs {
seen_blocks.insert(
block_ref.hash(),
(SeenBlockMarker::Finalized, block_ref),
);
}
}
}
continue; continue;
} }
// If we have a finalized hash, we are done looking for tx events and we are just waiting // If we have a finalized hash, we are done looking for tx events and we are just waiting
// for a pinned block with a matching hash (which must appear eventually given it's finalized). // for a pinned block with a matching hash (which must appear eventually given it's finalized).
if let Some(hash) = &finalized_hash { if let Some(hash) = &finalized_hash {
if let Some(block_ref) = seen_blocks.remove(hash) { if let Some((SeenBlockMarker::Finalized, block_ref)) = seen_blocks.remove(hash)
{
// Found it! Hand back the event with a pinned block. We're done. // Found it! Hand back the event with a pinned block. We're done.
done = true; done = true;
let ev = TransactionStatus::InFinalizedBlock { let ev = TransactionStatus::InFinalizedBlock {
@@ -483,7 +517,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
}; };
return Poll::Ready(Some(Ok(ev))); return Poll::Ready(Some(Ok(ev)));
} else { } else {
// Keep waiting for more new blocks until we find it (get rid of any other block refs // Keep waiting for more finalized blocks until we find it (get rid of any other block refs
// now, since none of them were what we were looking for anyway). // now, since none of them were what we were looking for anyway).
seen_blocks.clear(); seen_blocks.clear();
continue; continue;
@@ -517,8 +551,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// block that likely isn't accessible. We have no guarantee that a best // block that likely isn't accessible. We have no guarantee that a best
// block on the node a tx was sent to will ever be known about on the // block on the node a tx was sent to will ever be known about on the
// chainHead_follow subscription. // chainHead_follow subscription.
let block_ref = match seen_blocks.get(&block.hash).cloned() { let block_ref = match seen_blocks.get(&block.hash) {
Some(block_ref) => block_ref.into(), Some((_, block_ref)) => block_ref.clone().into(),
None => BlockRef::from_hash(block.hash), None => BlockRef::from_hash(block.hash),
}; };
TransactionStatus::InBestBlock { hash: block_ref } TransactionStatus::InBestBlock { hash: block_ref }