Keep sessions in window for the full unfinalized chain (#6054)

* Impl dynamic window size. Keep sessions for unfinalized chain

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* Stretch also in contructor plus  tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* review feedback

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* fix approval-voting tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

* grunting: dispute coordinator tests

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>

Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
This commit is contained in:
Andrei Sandu
2022-10-04 13:36:42 +03:00
committed by GitHub
parent ab8f04f827
commit 7114a8cfca
4 changed files with 435 additions and 14 deletions
@@ -1296,6 +1296,38 @@ pub(crate) mod tests {
}
);
// Caching of sesssions needs sessoion of first unfinalied block.
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, header.number);
let _ = s_tx.send(Ok(Some(header.hash())));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, header.hash());
let _ = s_tx.send(Ok(session));
}
);
// determine_new_blocks exits early as the parent_hash is in the DB
assert_matches!(
@@ -807,6 +807,37 @@ async fn import_block(
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(number));
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, number);
let _ = s_tx.send(Ok(Some(hashes[number as usize].0)));
}
);
assert_matches!(
overseer_recv(overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, hashes[number as usize].0);
let _ = s_tx.send(Ok(number.into()));
}
);
if !fork {
assert_matches!(
overseer_recv(overseer).await,
@@ -239,13 +239,15 @@ impl TestState {
)))
.await;
self.handle_sync_queries(virtual_overseer, block_hash, session).await;
self.handle_sync_queries(virtual_overseer, block_hash, block_number, session)
.await;
}
async fn handle_sync_queries(
&mut self,
virtual_overseer: &mut VirtualOverseer,
block_hash: Hash,
block_number: BlockNumber,
session: SessionIndex,
) {
// Order of messages is not fixed (different on initializing):
@@ -278,11 +280,45 @@ impl TestState {
finished_steps.got_session_information = true;
assert_eq!(h, block_hash);
let _ = tx.send(Ok(session));
// Queries for fetching earliest unfinalized block session. See `RollingSessionWindow`.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(block_number));
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
number,
s_tx,
)) => {
assert_eq!(block_number, number);
let _ = s_tx.send(Ok(Some(block_hash)));
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, block_hash);
let _ = s_tx.send(Ok(session));
}
);
// No queries, if subsystem knows about this session already.
if self.known_session == Some(session) {
continue
}
self.known_session = Some(session);
loop {
// answer session info queries until the current session is reached.
assert_matches!(
@@ -361,7 +397,8 @@ impl TestState {
)))
.await;
self.handle_sync_queries(virtual_overseer, *leaf, session).await;
self.handle_sync_queries(virtual_overseer, *leaf, n as BlockNumber, session)
.await;
}
}
@@ -20,15 +20,17 @@
//! care about the state of particular blocks.
pub use polkadot_node_primitives::{new_session_window_size, SessionWindowSize};
use polkadot_primitives::v2::{Hash, SessionIndex, SessionInfo};
use polkadot_primitives::v2::{BlockNumber, Hash, SessionIndex, SessionInfo};
use futures::channel::oneshot;
use polkadot_node_subsystem::{
errors::RuntimeApiError,
messages::{RuntimeApiMessage, RuntimeApiRequest},
errors::{ChainApiError, RuntimeApiError},
messages::{ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest},
overseer,
};
const LOG_TARGET: &str = "parachain::rolling-session-window";
/// Sessions unavailable in state to cache.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SessionsUnavailableReason {
@@ -38,9 +40,18 @@ pub enum SessionsUnavailableReason {
/// The runtime API itself returned an error.
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
/// The chain API itself returned an error.
#[error(transparent)]
ChainApi(#[from] ChainApiError),
/// Missing session info from runtime API for given `SessionIndex`.
#[error("Missing session index {0:?}")]
Missing(SessionIndex),
/// Missing last finalized block number.
#[error("Missing last finalized block number")]
MissingLastFinalizedBlock,
/// Missing last finalized block hash.
#[error("Missing last finalized block hash")]
MissingLastFinalizedBlockHash(BlockNumber),
}
/// Information about the sessions being fetched.
@@ -98,11 +109,18 @@ impl RollingSessionWindow {
block_hash: Hash,
) -> Result<Self, SessionsUnavailable>
where
Sender: overseer::SubsystemSender<RuntimeApiMessage>,
Sender: overseer::SubsystemSender<RuntimeApiMessage>
+ overseer::SubsystemSender<ChainApiMessage>,
{
let session_index = get_session_index_for_child(&mut sender, block_hash).await?;
let earliest_non_finalized_block_session =
Self::earliest_non_finalized_block_session(&mut sender).await?;
let window_start = session_index.saturating_sub(window_size.get() - 1);
// This will increase the session window to cover the full unfinalized chain.
let window_start = std::cmp::min(
session_index.saturating_sub(window_size.get() - 1),
earliest_non_finalized_block_session,
);
match load_all_sessions(&mut sender, block_hash, window_start, session_index).await {
Err(kind) => Err(SessionsUnavailable {
@@ -146,6 +164,87 @@ impl RollingSessionWindow {
self.earliest_session + (self.session_info.len() as SessionIndex).saturating_sub(1)
}
async fn earliest_non_finalized_block_session<Sender>(
sender: &mut Sender,
) -> Result<u32, SessionsUnavailable>
where
Sender: overseer::SubsystemSender<RuntimeApiMessage>
+ overseer::SubsystemSender<ChainApiMessage>,
{
let last_finalized_height = {
let (tx, rx) = oneshot::channel();
sender.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await;
match rx.await {
Ok(Ok(number)) => number,
Ok(Err(e)) =>
return Err(SessionsUnavailable {
kind: SessionsUnavailableReason::ChainApi(e),
info: None,
}),
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?err,
"Failed fetching last finalized block number"
);
return Err(SessionsUnavailable {
kind: SessionsUnavailableReason::MissingLastFinalizedBlock,
info: None,
})
},
}
};
let (tx, rx) = oneshot::channel();
// We want to get the session index for the child of the last finalized block.
sender
.send_message(ChainApiMessage::FinalizedBlockHash(last_finalized_height, tx))
.await;
let last_finalized_hash_parent = match rx.await {
Ok(Ok(maybe_hash)) => maybe_hash,
Ok(Err(e)) =>
return Err(SessionsUnavailable {
kind: SessionsUnavailableReason::ChainApi(e),
info: None,
}),
Err(err) => {
gum::warn!(target: LOG_TARGET, ?err, "Failed fetching last finalized block hash");
return Err(SessionsUnavailable {
kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash(
last_finalized_height,
),
info: None,
})
},
};
// Get the session in which the last finalized block was authored.
if let Some(last_finalized_hash_parent) = last_finalized_hash_parent {
let session =
match get_session_index_for_child(sender, last_finalized_hash_parent).await {
Ok(session_index) => session_index,
Err(err) => {
gum::warn!(
target: LOG_TARGET,
?err,
?last_finalized_hash_parent,
"Failed fetching session index"
);
return Err(err)
},
};
Ok(session)
} else {
return Err(SessionsUnavailable {
kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash(
last_finalized_height,
),
info: None,
})
}
}
/// When inspecting a new import notification, updates the session info cache to match
/// the session of the imported block's child.
///
@@ -153,12 +252,18 @@ impl RollingSessionWindow {
/// not change often and import notifications are expected to be typically increasing in session number.
///
/// some backwards drift in session index is acceptable.
pub async fn cache_session_info_for_head(
pub async fn cache_session_info_for_head<Sender>(
&mut self,
sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
sender: &mut Sender,
block_hash: Hash,
) -> Result<SessionWindowUpdate, SessionsUnavailable> {
) -> Result<SessionWindowUpdate, SessionsUnavailable>
where
Sender: overseer::SubsystemSender<RuntimeApiMessage>
+ overseer::SubsystemSender<ChainApiMessage>,
{
let session_index = get_session_index_for_child(sender, block_hash).await?;
let earliest_non_finalized_block_session =
Self::earliest_non_finalized_block_session(sender).await?;
let old_window_start = self.earliest_session;
@@ -171,7 +276,12 @@ impl RollingSessionWindow {
let old_window_end = latest;
let window_start = session_index.saturating_sub(self.window_size.get() - 1);
// Ensure we keep sessions up to last finalized block by adjusting the window start.
// This will increase the session window to cover the full unfinalized chain.
let window_start = std::cmp::min(
session_index.saturating_sub(self.window_size.get() - 1),
earliest_non_finalized_block_session,
);
// keep some of the old window, if applicable.
let overlap_start = window_start.saturating_sub(old_window_start);
@@ -319,6 +429,14 @@ mod tests {
parent_hash: Default::default(),
};
let finalized_header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 0,
state_root: Default::default(),
parent_hash: Default::default(),
};
let pool = TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<AvailabilityRecoveryMessage, _>(pool.clone());
@@ -358,6 +476,37 @@ mod tests {
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(finalized_header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, finalized_header.number);
let _ = s_tx.send(Ok(Some(finalized_header.hash())));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, finalized_header.hash());
let _ = s_tx.send(Ok(session));
}
);
for i in expect_requests_from..=session {
assert_matches!(
handle.recv().await,
@@ -486,9 +635,10 @@ mod tests {
}
#[test]
fn any_session_unavailable_for_caching_means_no_change() {
let session: SessionIndex = 6;
let start_session = session.saturating_sub(TEST_WINDOW_SIZE.get() - 1);
fn any_session_stretch_for_unfinalized_chain() {
// Session index of the tip of our fake test chain.
let session: SessionIndex = 100;
let genesis_session: SessionIndex = 0;
let header = Header {
digest: Default::default(),
@@ -498,6 +648,14 @@ mod tests {
parent_hash: Default::default(),
};
let finalized_header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 0,
state_root: Default::default(),
parent_hash: Default::default(),
};
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
@@ -523,6 +681,138 @@ mod tests {
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(finalized_header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, finalized_header.number);
let _ = s_tx.send(Ok(Some(finalized_header.hash())));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, finalized_header.hash());
let _ = s_tx.send(Ok(0));
}
);
// Unfinalized chain starts at geneisis block, so session 0 is how far we stretch.
for i in genesis_session..=session {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(j, s_tx),
)) => {
assert_eq!(h, hash);
assert_eq!(i, j);
let _ = s_tx.send(Ok(if i == session {
None
} else {
Some(dummy_session_info(i))
}));
}
);
}
});
futures::executor::block_on(futures::future::join(test_fut, aux_fut));
}
#[test]
fn any_session_unavailable_for_caching_means_no_change() {
let session: SessionIndex = 6;
let start_session = session.saturating_sub(TEST_WINDOW_SIZE.get() - 1);
let header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 5,
state_root: Default::default(),
parent_hash: Default::default(),
};
let finalized_header = Header {
digest: Default::default(),
extrinsics_root: Default::default(),
number: 0,
state_root: Default::default(),
parent_hash: Default::default(),
};
let pool = TaskExecutor::new();
let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone());
let hash = header.hash();
let test_fut = {
let sender = ctx.sender().clone();
Box::pin(async move {
let res = RollingSessionWindow::new(sender, TEST_WINDOW_SIZE, hash).await;
assert!(res.is_err());
})
};
let aux_fut = Box::pin(async move {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, hash);
let _ = s_tx.send(Ok(session));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(finalized_header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, finalized_header.number);
let _ = s_tx.send(Ok(Some(finalized_header.hash())));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, finalized_header.hash());
let _ = s_tx.send(Ok(session));
}
);
for i in start_session..=session {
assert_matches!(
handle.recv().await,
@@ -586,6 +876,37 @@ mod tests {
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(
s_tx,
)) => {
let _ = s_tx.send(Ok(header.number));
}
);
assert_matches!(
handle.recv().await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash(
block_number,
s_tx,
)) => {
assert_eq!(block_number, header.number);
let _ = s_tx.send(Ok(Some(header.hash())));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionIndexForChild(s_tx),
)) => {
assert_eq!(h, header.hash());
let _ = s_tx.send(Ok(session));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(