Better error handling in approval-voting (#2603)

* make approval voting resilient to dropped requests

* some more

* skip whole chain if encountering spurious error
This commit is contained in:
Robert Habermeier
2021-03-10 14:53:12 -06:00
committed by GitHub
parent b360360544
commit 40a584bebc
2 changed files with 81 additions and 50 deletions
@@ -190,12 +190,16 @@ async fn determine_new_blocks(
Ok(ancestry)
}
// Sessions unavailable in state to cache.
#[derive(Debug)]
struct SessionsUnavailable;
async fn load_all_sessions(
ctx: &mut impl SubsystemContext,
block_hash: Hash,
start: SessionIndex,
end_inclusive: SessionIndex,
) -> SubsystemResult<Option<Vec<SessionInfo>>> {
) -> Result<Vec<SessionInfo>, SessionsUnavailable> {
let mut v = Vec::new();
for i in start..=end_inclusive {
let (tx, rx)= oneshot::channel();
@@ -214,22 +218,17 @@ async fn load_all_sessions(
block_hash,
);
return Ok(None);
return Err(SessionsUnavailable);
}
Ok(Err(e)) => return Err(SubsystemError::with_origin("approval-voting", e)),
Err(e) => return Err(SubsystemError::with_origin("approval-voting", e)),
Ok(Err(_)) | Err(_) => return Err(SessionsUnavailable),
};
v.push(session_info);
}
Ok(Some(v))
Ok(v)
}
// Sessions unavailable in state to cache.
#[derive(Debug)]
struct SessionsUnavailable;
// When inspecting a new import notification, updates the session info cache to match
// the session of the imported block.
//
@@ -242,7 +241,7 @@ async fn cache_session_info_for_head(
session_window: &mut RollingSessionWindow,
block_hash: Hash,
block_header: &Header,
) -> SubsystemResult<Result<(), SessionsUnavailable>> {
) -> Result<(), SessionsUnavailable> {
let session_index = {
let (s_tx, s_rx) = oneshot::channel();
@@ -254,9 +253,9 @@ async fn cache_session_info_for_head(
RuntimeApiRequest::SessionIndexForChild(s_tx),
).into()).await;
match s_rx.await? {
Ok(s) => s,
Err(e) => return Err(SubsystemError::with_origin("approval-voting", e)),
match s_rx.await {
Ok(Ok(s)) => s,
Ok(Err(_)) | Err(_) => return Err(SessionsUnavailable),
}
};
@@ -271,17 +270,17 @@ async fn cache_session_info_for_head(
window_start, session_index,
);
match load_all_sessions(ctx, block_hash, window_start, session_index).await? {
None => {
match load_all_sessions(ctx, block_hash, window_start, session_index).await {
Err(SessionsUnavailable) => {
tracing::warn!(
target: LOG_TARGET,
"Could not load sessions {}..={} from block {:?} in session {}",
window_start, session_index, block_hash, session_index,
);
return Ok(Err(SessionsUnavailable));
return Err(SessionsUnavailable);
},
Some(s) => {
Ok(s) => {
session_window.earliest_session = Some(window_start);
session_window.session_info = s;
}
@@ -291,7 +290,7 @@ async fn cache_session_info_for_head(
let latest = session_window.latest_session().expect("latest always exists if earliest does; qed");
// Either cached or ancient.
if session_index <= latest { return Ok(Ok(())) }
if session_index <= latest { return Ok(()) }
let old_window_end = latest;
@@ -311,17 +310,17 @@ async fn cache_session_info_for_head(
latest + 1
};
match load_all_sessions(ctx, block_hash, fresh_start, session_index).await? {
None => {
match load_all_sessions(ctx, block_hash, fresh_start, session_index).await {
Err(SessionsUnavailable) => {
tracing::warn!(
target: LOG_TARGET,
"Could not load sessions {}..={} from block {:?} in session {}",
latest + 1, session_index, block_hash, session_index,
);
return Ok(Err(SessionsUnavailable));
return Err(SessionsUnavailable);
}
Some(s) => {
Ok(s) => {
let outdated = std::cmp::min(overlap_start as usize, session_window.session_info.len());
session_window.session_info.drain(..outdated);
session_window.session_info.extend(s);
@@ -335,7 +334,7 @@ async fn cache_session_info_for_head(
}
}
Ok(Ok(()))
Ok(())
}
struct ImportedBlockInfo {
@@ -539,7 +538,13 @@ pub(crate) async fn handle_new_head(
match h_rx.await? {
Err(e) => {
return Err(SubsystemError::with_origin("approval-voting", e));
tracing::debug!(
target: LOG_TARGET,
"Chain API subsystem temporarily unreachable {}",
e,
);
return Ok(Vec::new());
}
Ok(None) => {
tracing::warn!(target: LOG_TARGET, "Missing header for new head {}", head);
@@ -555,7 +560,7 @@ pub(crate) async fn handle_new_head(
&mut state.session_window,
head,
&header,
).await?
).await
{
tracing::warn!(
target: LOG_TARGET,
@@ -582,13 +587,35 @@ pub(crate) async fn handle_new_head(
let mut imported_candidates = Vec::with_capacity(new_blocks.len());
// `determine_new_blocks` gives us a vec in backwards order. we want to move forwards.
for (block_hash, block_header) in new_blocks.into_iter().rev() {
let env = ImportedBlockInfoEnv {
session_window: &state.session_window,
assignment_criteria: &*state.assignment_criteria,
keystore: &state.keystore,
};
let imported_blocks_and_info = {
let mut imported_blocks_and_info = Vec::with_capacity(new_blocks.len());
for (block_hash, block_header) in new_blocks.into_iter().rev() {
let env = ImportedBlockInfoEnv {
session_window: &state.session_window,
assignment_criteria: &*state.assignment_criteria,
keystore: &state.keystore,
};
match imported_block_info(ctx, env, block_hash, &block_header).await? {
Some(i) => imported_blocks_and_info.push((block_hash, block_header, i)),
None => {
// Such errors are likely spurious, but this prevents us from getting gaps
// in the approval-db.
tracing::warn!(
target: LOG_TARGET,
"Unable to gather info about imported block {:?}. Skipping chain.",
(block_hash, block_header.number),
);
return Ok(Vec::new());
},
};
}
imported_blocks_and_info
};
for (block_hash, block_header, imported_block_info) in imported_blocks_and_info {
let ImportedBlockInfo {
included_candidates,
session_index,
@@ -596,10 +623,7 @@ pub(crate) async fn handle_new_head(
n_validators,
relay_vrf_story,
slot,
} = match imported_block_info(ctx, env, block_hash, &block_header).await? {
Some(i) => i,
None => continue,
};
} = imported_block_info;
let session_info = state.session_window.session_info(session_index)
.expect("imported_block_info requires session to be available; qed");
@@ -1772,7 +1796,7 @@ mod tests {
&mut window,
hash,
&header,
).await.unwrap().unwrap();
).await.unwrap();
assert_eq!(window.earliest_session, Some(expected_start_session));
assert_eq!(
@@ -1953,7 +1977,7 @@ mod tests {
&mut window,
hash,
&header,
).await.unwrap();
).await;
assert_matches!(res, Err(SessionsUnavailable));
})
@@ -2020,7 +2044,7 @@ mod tests {
&mut window,
hash,
&header,
).await.unwrap().unwrap();
).await.unwrap();
assert_eq!(window.earliest_session, Some(session));
assert_eq!(
+19 -12
View File
@@ -665,10 +665,10 @@ async fn handle_approved_ancestor(
ctx.send_message(ChainApiMessage::BlockNumber(target, tx).into()).await;
match rx.await? {
Ok(Some(n)) => n,
Ok(None) => return Ok(None),
Err(_) => return Ok(None),
match rx.await {
Ok(Ok(Some(n))) => n,
Ok(Ok(None)) => return Ok(None),
Ok(Err(_)) | Err(_) => return Ok(None),
}
};
@@ -689,9 +689,9 @@ async fn handle_approved_ancestor(
response_channel: tx,
}.into()).await;
match rx.await? {
Ok(a) => a,
Err(_) => return Ok(None),
match rx.await {
Ok(Ok(a)) => a,
Err(_) | Ok(Err(_)) => return Ok(None),
}
} else {
Vec::new()
@@ -1406,11 +1406,18 @@ async fn launch_approval(
ChainApiMessage::BlockNumber(candidate.descriptor.relay_parent, context_num_tx).into()
).await;
let in_context_number = match context_num_rx.await?
.map_err(|e| SubsystemError::with_origin("chain-api", e))?
{
Some(n) => n,
None => return Ok(()),
let in_context_number = match context_num_rx.await {
Ok(Ok(Some(n))) => n,
Ok(Ok(None)) | Ok(Err(_)) | Err(_) => {
tracing::warn!(
target: LOG_TARGET,
"Could not launch approval work for candidate {:?}: Number of block {} unknown",
(candidate_hash, candidate.descriptor.para_id),
candidate.descriptor.relay_parent,
);
return Ok(());
}
};
ctx.send_message(