mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 15:51:12 +00:00
backend(fix): Remove only finalized blocks from the event window (#1356)
* backend(fix): Remove only finalized blocks from the event window Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * backend: Improve documentation Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * backend/tests: Check new block is delivered after finalized event Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * backend: More docs about finalzied / prunning and new block / best block Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
@@ -7,7 +7,7 @@ use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEve
|
||||
use crate::config::BlockHash;
|
||||
use crate::error::Error;
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::ops::DerefMut;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
@@ -165,9 +165,8 @@ struct SharedState<Hash: BlockHash> {
|
||||
done: bool,
|
||||
next_id: usize,
|
||||
subscribers: HashMap<usize, SubscriberDetails<Hash>>,
|
||||
// Keep a buffer of all events from last finalized block so that new
|
||||
// subscriptions can be handed this info first.
|
||||
block_events_from_last_finalized: VecDeque<FollowEvent<BlockRef<Hash>>>,
|
||||
/// Keep a buffer of all events that should be handed to a new subscription.
|
||||
block_events_for_new_subscriptions: VecDeque<FollowEvent<BlockRef<Hash>>>,
|
||||
// Keep track of the subscription ID we send out on new subs.
|
||||
current_subscription_id: Option<String>,
|
||||
// Keep track of the init message we send out on new subs.
|
||||
@@ -186,7 +185,7 @@ impl<Hash: BlockHash> Default for Shared<Hash> {
|
||||
current_init_message: None,
|
||||
current_subscription_id: None,
|
||||
seen_runtime_events: HashMap::new(),
|
||||
block_events_from_last_finalized: VecDeque::new(),
|
||||
block_events_for_new_subscriptions: VecDeque::new(),
|
||||
})))
|
||||
}
|
||||
}
|
||||
@@ -251,7 +250,7 @@ impl<Hash: BlockHash> Shared<Hash> {
|
||||
// New subscriptions will be given this init message:
|
||||
shared.current_init_message = Some(ev.clone());
|
||||
// Clear block cache (since a new finalized block hash is seen):
|
||||
shared.block_events_from_last_finalized.clear();
|
||||
shared.block_events_for_new_subscriptions.clear();
|
||||
}
|
||||
FollowStreamMsg::Event(FollowEvent::Finalized(finalized_ev)) => {
|
||||
// Update the init message that we'll hand out to new subscriptions. If the init message
|
||||
@@ -275,8 +274,30 @@ impl<Hash: BlockHash> Shared<Hash> {
|
||||
}
|
||||
}
|
||||
|
||||
// New finalized block, so clear the cache of older block events.
|
||||
shared.block_events_from_last_finalized.clear();
|
||||
// The last finalized block will be reported as Initialized by our driver,
|
||||
// therefore there is no need to report NewBlock and BestBlock events for it.
|
||||
// If the Finalized event reported multiple finalized hashes, we only care about
|
||||
// the state at the head of the chain, therefore it is correct to remove those as well.
|
||||
// Idem for the pruned hashes; they will never be reported again and we remove
|
||||
// them from the window of events.
|
||||
let to_remove: HashSet<Hash> = finalized_ev
|
||||
.finalized_block_hashes
|
||||
.iter()
|
||||
.chain(finalized_ev.pruned_block_hashes.iter())
|
||||
.map(|h| h.hash())
|
||||
.collect();
|
||||
|
||||
shared
|
||||
.block_events_for_new_subscriptions
|
||||
.retain(|ev| match ev {
|
||||
FollowEvent::NewBlock(new_block_ev) => {
|
||||
!to_remove.contains(&new_block_ev.block_hash.hash())
|
||||
}
|
||||
FollowEvent::BestBlockChanged(best_block_ev) => {
|
||||
!to_remove.contains(&best_block_ev.best_block_hash.hash())
|
||||
}
|
||||
_ => true,
|
||||
});
|
||||
}
|
||||
FollowStreamMsg::Event(FollowEvent::NewBlock(new_block_ev)) => {
|
||||
// If a new runtime is seen, note it so that when a block is finalized, we
|
||||
@@ -288,15 +309,15 @@ impl<Hash: BlockHash> Shared<Hash> {
|
||||
}
|
||||
|
||||
shared
|
||||
.block_events_from_last_finalized
|
||||
.block_events_for_new_subscriptions
|
||||
.push_back(FollowEvent::NewBlock(new_block_ev));
|
||||
}
|
||||
FollowStreamMsg::Event(ev @ FollowEvent::BestBlockChanged(_)) => {
|
||||
shared.block_events_from_last_finalized.push_back(ev);
|
||||
shared.block_events_for_new_subscriptions.push_back(ev);
|
||||
}
|
||||
FollowStreamMsg::Event(FollowEvent::Stop) => {
|
||||
// On a stop event, clear everything. Wait for resubscription and new ready/initialised events.
|
||||
shared.block_events_from_last_finalized.clear();
|
||||
shared.block_events_for_new_subscriptions.clear();
|
||||
shared.current_subscription_id = None;
|
||||
shared.current_init_message = None;
|
||||
}
|
||||
@@ -334,7 +355,7 @@ impl<Hash: BlockHash> Shared<Hash> {
|
||||
init_msg.clone(),
|
||||
)));
|
||||
}
|
||||
for ev in &shared.block_events_from_last_finalized {
|
||||
for ev in &shared.block_events_for_new_subscriptions {
|
||||
local_items.push_back(FollowStreamMsg::Event(ev.clone()));
|
||||
}
|
||||
|
||||
@@ -485,4 +506,41 @@ mod test {
|
||||
];
|
||||
assert_eq!(evs, expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribers_receive_new_blocks_before_subscribing() {
|
||||
let mut driver = test_follow_stream_driver_getter(
|
||||
|| {
|
||||
[
|
||||
Ok(ev_initialized(0)),
|
||||
Ok(ev_new_block(0, 1)),
|
||||
Ok(ev_best_block(1)),
|
||||
Ok(ev_new_block(1, 2)),
|
||||
Ok(ev_new_block(2, 3)),
|
||||
Ok(ev_finalized([1])),
|
||||
Err(Error::Other("ended".to_owned())),
|
||||
]
|
||||
},
|
||||
10,
|
||||
);
|
||||
|
||||
// Skip to the first finalized block F1.
|
||||
let _r = driver.next().await.unwrap();
|
||||
let _i0 = driver.next().await.unwrap();
|
||||
let _n1 = driver.next().await.unwrap();
|
||||
let _b1 = driver.next().await.unwrap();
|
||||
let _n2 = driver.next().await.unwrap();
|
||||
let _n3 = driver.next().await.unwrap();
|
||||
let _f1 = driver.next().await.unwrap();
|
||||
|
||||
// THEN subscribe; and make sure new block 1 and 2 are received.
|
||||
let evs: Vec<_> = driver.handle().subscribe().take(4).collect().await;
|
||||
let expected = vec![
|
||||
FollowStreamMsg::Ready("sub_id_0".into()),
|
||||
FollowStreamMsg::Event(ev_initialized_ref(1)),
|
||||
FollowStreamMsg::Event(ev_new_block_ref(1, 2)),
|
||||
FollowStreamMsg::Event(ev_new_block_ref(2, 3)),
|
||||
];
|
||||
assert_eq!(evs, expected);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user