From d004789b0426739eb883b128cbf7571cb1969614 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Thu, 11 Jan 2024 16:50:44 +0200 Subject: [PATCH] backend(fix): Remove only finalized blocks from the event window (#1356) * backend(fix): Remove only finalized blocks from the event window Signed-off-by: Alexandru Vasile * backend: Improve documentation Signed-off-by: Alexandru Vasile * backend/tests: Check new block is delivered after finalized event Signed-off-by: Alexandru Vasile * backend: More docs about finalzied / prunning and new block / best block Signed-off-by: Alexandru Vasile --------- Signed-off-by: Alexandru Vasile --- .../backend/unstable/follow_stream_driver.rs | 82 ++++++++++++++++--- 1 file changed, 70 insertions(+), 12 deletions(-) diff --git a/subxt/src/backend/unstable/follow_stream_driver.rs b/subxt/src/backend/unstable/follow_stream_driver.rs index f058e2d250..028065f941 100644 --- a/subxt/src/backend/unstable/follow_stream_driver.rs +++ b/subxt/src/backend/unstable/follow_stream_driver.rs @@ -7,7 +7,7 @@ use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEve use crate::config::BlockHash; use crate::error::Error; use futures::stream::{Stream, StreamExt}; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::ops::DerefMut; use std::pin::Pin; use std::sync::{Arc, Mutex}; @@ -165,9 +165,8 @@ struct SharedState { done: bool, next_id: usize, subscribers: HashMap>, - // Keep a buffer of all events from last finalized block so that new - // subscriptions can be handed this info first. - block_events_from_last_finalized: VecDeque>>, + /// Keep a buffer of all events that should be handed to a new subscription. + block_events_for_new_subscriptions: VecDeque>>, // Keep track of the subscription ID we send out on new subs. current_subscription_id: Option, // Keep track of the init message we send out on new subs. @@ -186,7 +185,7 @@ impl Default for Shared { current_init_message: None, current_subscription_id: None, seen_runtime_events: HashMap::new(), - block_events_from_last_finalized: VecDeque::new(), + block_events_for_new_subscriptions: VecDeque::new(), }))) } } @@ -251,7 +250,7 @@ impl Shared { // New subscriptions will be given this init message: shared.current_init_message = Some(ev.clone()); // Clear block cache (since a new finalized block hash is seen): - shared.block_events_from_last_finalized.clear(); + shared.block_events_for_new_subscriptions.clear(); } FollowStreamMsg::Event(FollowEvent::Finalized(finalized_ev)) => { // Update the init message that we'll hand out to new subscriptions. If the init message @@ -275,8 +274,30 @@ impl Shared { } } - // New finalized block, so clear the cache of older block events. - shared.block_events_from_last_finalized.clear(); + // The last finalized block will be reported as Initialized by our driver, + // therefore there is no need to report NewBlock and BestBlock events for it. + // If the Finalized event reported multiple finalized hashes, we only care about + // the state at the head of the chain, therefore it is correct to remove those as well. + // Idem for the pruned hashes; they will never be reported again and we remove + // them from the window of events. + let to_remove: HashSet = finalized_ev + .finalized_block_hashes + .iter() + .chain(finalized_ev.pruned_block_hashes.iter()) + .map(|h| h.hash()) + .collect(); + + shared + .block_events_for_new_subscriptions + .retain(|ev| match ev { + FollowEvent::NewBlock(new_block_ev) => { + !to_remove.contains(&new_block_ev.block_hash.hash()) + } + FollowEvent::BestBlockChanged(best_block_ev) => { + !to_remove.contains(&best_block_ev.best_block_hash.hash()) + } + _ => true, + }); } FollowStreamMsg::Event(FollowEvent::NewBlock(new_block_ev)) => { // If a new runtime is seen, note it so that when a block is finalized, we @@ -288,15 +309,15 @@ impl Shared { } shared - .block_events_from_last_finalized + .block_events_for_new_subscriptions .push_back(FollowEvent::NewBlock(new_block_ev)); } FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => { - shared.block_events_from_last_finalized.push_back(ev); + shared.block_events_for_new_subscriptions.push_back(ev); } FollowStreamMsg::Event(FollowEvent::Stop) => { // On a stop event, clear everything. Wait for resubscription and new ready/initialised events. - shared.block_events_from_last_finalized.clear(); + shared.block_events_for_new_subscriptions.clear(); shared.current_subscription_id = None; shared.current_init_message = None; } @@ -334,7 +355,7 @@ impl Shared { init_msg.clone(), ))); } - for ev in &shared.block_events_from_last_finalized { + for ev in &shared.block_events_for_new_subscriptions { local_items.push_back(FollowStreamMsg::Event(ev.clone())); } @@ -485,4 +506,41 @@ mod test { ]; assert_eq!(evs, expected); } + + #[tokio::test] + async fn subscribers_receive_new_blocks_before_subscribing() { + let mut driver = test_follow_stream_driver_getter( + || { + [ + Ok(ev_initialized(0)), + Ok(ev_new_block(0, 1)), + Ok(ev_best_block(1)), + Ok(ev_new_block(1, 2)), + Ok(ev_new_block(2, 3)), + Ok(ev_finalized([1])), + Err(Error::Other("ended".to_owned())), + ] + }, + 10, + ); + + // Skip to the first finalized block F1. + let _r = driver.next().await.unwrap(); + let _i0 = driver.next().await.unwrap(); + let _n1 = driver.next().await.unwrap(); + let _b1 = driver.next().await.unwrap(); + let _n2 = driver.next().await.unwrap(); + let _n3 = driver.next().await.unwrap(); + let _f1 = driver.next().await.unwrap(); + + // THEN subscribe; and make sure new block 1 and 2 are received. + let evs: Vec<_> = driver.handle().subscribe().take(4).collect().await; + let expected = vec![ + FollowStreamMsg::Ready("sub_id_0".into()), + FollowStreamMsg::Event(ev_initialized_ref(1)), + FollowStreamMsg::Event(ev_new_block_ref(1, 2)), + FollowStreamMsg::Event(ev_new_block_ref(2, 3)), + ]; + assert_eq!(evs, expected); + } }