Message relay fixes for nightly stuck (#532)

* fixed missing else

* really wake up when retry timeout is completed

* do not query weights if target nonce is unknown

* fix compilation
This commit is contained in:
Svyatoslav Nikolsky
2020-11-27 10:44:49 +03:00
committed by Bastian Köcher
parent 889b22bee5
commit 4f661d2fe0
5 changed files with 175 additions and 47 deletions
@@ -103,6 +103,9 @@ pub trait SourceClient<P: MessageLane>: Clone + Send + Sync {
) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>; ) -> Result<(SourceHeaderIdOf<P>, MessageNonce), Self::Error>;
/// Returns mapping of message nonces, generated on this client, to their weights. /// Returns mapping of message nonces, generated on this client, to their weights.
///
/// Some weights may be missing from returned map, if corresponding messages were pruned at
/// the source chain.
async fn generated_messages_weights( async fn generated_messages_weights(
&self, &self,
id: SourceHeaderIdOf<P>, id: SourceHeaderIdOf<P>,
@@ -223,11 +223,11 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
self.strategy.is_empty() self.strategy.is_empty()
} }
fn best_at_source(&self) -> MessageNonce { fn best_at_source(&self) -> Option<MessageNonce> {
self.strategy.best_at_source() self.strategy.best_at_source()
} }
fn best_at_target(&self) -> MessageNonce { fn best_at_target(&self) -> Option<MessageNonce> {
self.strategy.best_at_target() self.strategy.best_at_target()
} }
@@ -144,9 +144,14 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof> {
/// Should return true if nothing has to be synced. /// Should return true if nothing has to be synced.
fn is_empty(&self) -> bool; fn is_empty(&self) -> bool;
/// Return best nonce at source node. /// Return best nonce at source node.
fn best_at_source(&self) -> MessageNonce; ///
/// `Some` is returned only if we are sure that the value is greater or equal
/// than the result of `best_at_target`.
fn best_at_source(&self) -> Option<MessageNonce>;
/// Return best nonce at target node. /// Return best nonce at target node.
fn best_at_target(&self) -> MessageNonce; ///
/// May return `None` if value is yet unknown.
fn best_at_target(&self) -> Option<MessageNonce>;
/// Called when nonces are updated at source node of the race. /// Called when nonces are updated at source node of the race.
fn source_nonces_updated(&mut self, at_block: SourceHeaderId, nonces: SourceClientNonces<Self::SourceNoncesRange>); fn source_nonces_updated(&mut self, at_block: SourceHeaderId, nonces: SourceClientNonces<Self::SourceNoncesRange>);
@@ -334,7 +339,15 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
async_std::task::sleep, async_std::task::sleep,
|| format!("Error submitting proof {}", P::target_name()), || format!("Error submitting proof {}", P::target_name()),
).fail_if_connection_error(FailedClient::Target)?; ).fail_if_connection_error(FailedClient::Target)?;
} },
// when we're ready to retry request
_ = source_go_offline_future => {
source_client_is_online = true;
},
_ = target_go_offline_future => {
target_client_is_online = true;
},
} }
progress_context = print_race_progress::<P, _>(progress_context, &strategy); progress_context = print_race_progress::<P, _>(progress_context, &strategy);
@@ -350,6 +363,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
source_client_is_online = false; source_client_is_online = false;
let nonces_to_deliver = select_nonces_to_deliver(&race_state, &mut strategy); let nonces_to_deliver = select_nonces_to_deliver(&race_state, &mut strategy);
let best_at_source = strategy.best_at_source();
if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver { if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver {
log::debug!( log::debug!(
@@ -364,7 +378,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
.generate_proof(at_block, nonces_range, proof_parameters) .generate_proof(at_block, nonces_range, proof_parameters)
.fuse(), .fuse(),
); );
} else if source_nonces_required { } else if source_nonces_required && best_at_source.is_some() {
log::debug!(target: "bridge", "Asking {} about message nonces", P::source_name()); log::debug!(target: "bridge", "Asking {} about message nonces", P::source_name());
let at_block = race_state let at_block = race_state
.best_finalized_source_header_id_at_source .best_finalized_source_header_id_at_source
@@ -374,7 +388,11 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
best_finalized_source_header_id_at_source is Some; qed", best_finalized_source_header_id_at_source is Some; qed",
) )
.clone(); .clone();
source_nonces.set(race_source.nonces(at_block, strategy.best_at_source()).fuse()); source_nonces.set(
race_source
.nonces(at_block, best_at_source.expect("guaranteed by if condition; qed"))
.fuse(),
);
} else { } else {
source_client_is_online = true; source_client_is_online = true;
} }
@@ -395,8 +413,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
.submit_proof(at_block.clone(), nonces_range.clone(), proof.clone()) .submit_proof(at_block.clone(), nonces_range.clone(), proof.clone())
.fuse(), .fuse(),
); );
} } else if target_nonces_required {
if target_nonces_required {
log::debug!(target: "bridge", "Asking {} about message nonces", P::target_name()); log::debug!(target: "bridge", "Asking {} about message nonces", P::target_name());
let at_block = race_state let at_block = race_state
.best_target_header_id .best_target_header_id
@@ -35,8 +35,8 @@ pub struct BasicStrategy<
> { > {
/// All queued nonces. /// All queued nonces.
source_queue: VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)>, source_queue: VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)>,
/// Best nonce known to target node. /// Best nonce known to target node. `None` if it has not been received yet.
target_nonce: MessageNonce, target_nonce: Option<MessageNonce>,
/// Unused generic types dump. /// Unused generic types dump.
_phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>, _phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>,
} }
@@ -52,7 +52,7 @@ where
pub fn new() -> Self { pub fn new() -> Self {
BasicStrategy { BasicStrategy {
source_queue: VecDeque::new(), source_queue: VecDeque::new(),
target_nonce: Default::default(), target_nonce: None,
_phantom: Default::default(), _phantom: Default::default(),
} }
} }
@@ -74,6 +74,9 @@ where
>, >,
mut selector: impl FnMut(SourceNoncesRange) -> Option<SourceNoncesRange>, mut selector: impl FnMut(SourceNoncesRange) -> Option<SourceNoncesRange>,
) -> Option<RangeInclusive<MessageNonce>> { ) -> Option<RangeInclusive<MessageNonce>> {
// if we do not know best nonce at target node, we can't select anything
let target_nonce = self.target_nonce?;
// if we have already selected nonces that we want to submit, do nothing // if we have already selected nonces that we want to submit, do nothing
if race_state.nonces_to_submit.is_some() { if race_state.nonces_to_submit.is_some() {
return None; return None;
@@ -128,7 +131,7 @@ where
} }
} }
nonces_end.map(|nonces_end| RangeInclusive::new(self.target_nonce + 1, nonces_end)) nonces_end.map(|nonces_end| RangeInclusive::new(target_nonce + 1, nonces_end))
} }
} }
@@ -147,17 +150,16 @@ where
self.source_queue.is_empty() self.source_queue.is_empty()
} }
fn best_at_source(&self) -> MessageNonce { fn best_at_source(&self) -> Option<MessageNonce> {
std::cmp::max( let best_in_queue = self.source_queue.back().map(|(_, range)| range.end());
self.source_queue match (best_in_queue, self.target_nonce) {
.back() (Some(best_in_queue), Some(target_nonce)) if best_in_queue > target_nonce => Some(best_in_queue),
.map(|(_, range)| range.end()) (_, Some(target_nonce)) => Some(target_nonce),
.unwrap_or(self.target_nonce), (_, None) => None,
self.target_nonce, }
)
} }
fn best_at_target(&self) -> MessageNonce { fn best_at_target(&self) -> Option<MessageNonce> {
self.target_nonce self.target_nonce
} }
@@ -166,11 +168,16 @@ where
at_block: HeaderId<SourceHeaderHash, SourceHeaderNumber>, at_block: HeaderId<SourceHeaderHash, SourceHeaderNumber>,
nonces: SourceClientNonces<SourceNoncesRange>, nonces: SourceClientNonces<SourceNoncesRange>,
) { ) {
let prev_best_at_source = self.best_at_source(); let best_in_queue = self
.source_queue
.back()
.map(|(_, range)| range.end())
.or(self.target_nonce)
.unwrap_or_default();
self.source_queue.extend( self.source_queue.extend(
nonces nonces
.new_nonces .new_nonces
.greater_than(prev_best_at_source) .greater_than(best_in_queue)
.into_iter() .into_iter()
.map(move |range| (at_block.clone(), range)), .map(move |range| (at_block.clone(), range)),
) )
@@ -187,9 +194,11 @@ where
) { ) {
let nonce = nonces.latest_nonce; let nonce = nonces.latest_nonce;
if nonce < self.target_nonce { if let Some(target_nonce) = self.target_nonce {
if nonce < target_nonce {
return; return;
} }
}
while let Some(true) = self.source_queue.front().map(|(_, range)| range.begin() <= nonce) { while let Some(true) = self.source_queue.front().map(|(_, range)| range.begin() <= nonce) {
let maybe_subrange = self let maybe_subrange = self
@@ -220,7 +229,7 @@ where
race_state.nonces_submitted = None; race_state.nonces_submitted = None;
} }
self.target_nonce = nonce; self.target_nonce = Some(nonce);
} }
fn select_nonces_to_deliver( fn select_nonces_to_deliver(
@@ -278,12 +287,12 @@ mod tests {
#[test] #[test]
fn best_at_source_is_never_lower_than_target_nonce() { fn best_at_source_is_never_lower_than_target_nonce() {
let mut strategy = BasicStrategy::<TestMessageLane>::new(); let mut strategy = BasicStrategy::<TestMessageLane>::new();
assert_eq!(strategy.best_at_source(), 0); assert_eq!(strategy.best_at_source(), None);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=5)); strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
assert_eq!(strategy.best_at_source(), 5); assert_eq!(strategy.best_at_source(), None);
strategy.target_nonces_updated(target_nonces(10), &mut Default::default()); strategy.target_nonces_updated(target_nonces(10), &mut Default::default());
assert_eq!(strategy.source_queue, vec![]); assert_eq!(strategy.source_queue, vec![]);
assert_eq!(strategy.best_at_source(), 10); assert_eq!(strategy.best_at_source(), Some(10));
} }
#[test] #[test]
@@ -306,9 +315,11 @@ mod tests {
#[test] #[test]
fn target_nonce_is_never_lower_than_latest_known_target_nonce() { fn target_nonce_is_never_lower_than_latest_known_target_nonce() {
let mut strategy = BasicStrategy::<TestMessageLane>::new(); let mut strategy = BasicStrategy::<TestMessageLane>::new();
assert_eq!(strategy.target_nonce, None);
strategy.target_nonces_updated(target_nonces(10), &mut Default::default()); strategy.target_nonces_updated(target_nonces(10), &mut Default::default());
assert_eq!(strategy.target_nonce, Some(10));
strategy.target_nonces_updated(target_nonces(5), &mut Default::default()); strategy.target_nonces_updated(target_nonces(5), &mut Default::default());
assert_eq!(strategy.target_nonce, 10); assert_eq!(strategy.target_nonce, Some(10));
} }
#[test] #[test]
@@ -351,6 +362,7 @@ mod tests {
let mut state = RaceState::default(); let mut state = RaceState::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new(); let mut strategy = BasicStrategy::<TestMessageLane>::new();
state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None))); state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None)));
strategy.target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=10)); strategy.source_nonces_updated(header_id(1), source_nonces(1..=10));
assert_eq!(strategy.select_nonces_to_deliver(&state), None); assert_eq!(strategy.select_nonces_to_deliver(&state), None);
} }
@@ -360,6 +372,7 @@ mod tests {
let mut state = RaceState::default(); let mut state = RaceState::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new(); let mut strategy = BasicStrategy::<TestMessageLane>::new();
state.nonces_submitted = Some(1..=10); state.nonces_submitted = Some(1..=10);
strategy.target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=10)); strategy.source_nonces_updated(header_id(1), source_nonces(1..=10));
assert_eq!(strategy.select_nonces_to_deliver(&state), None); assert_eq!(strategy.select_nonces_to_deliver(&state), None);
} }
@@ -368,6 +381,7 @@ mod tests {
fn select_nonces_to_deliver_works() { fn select_nonces_to_deliver_works() {
let mut state = RaceState::<_, _, TestMessagesProof>::default(); let mut state = RaceState::<_, _, TestMessagesProof>::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new(); let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=1)); strategy.source_nonces_updated(header_id(1), source_nonces(1..=1));
strategy.source_nonces_updated(header_id(2), source_nonces(2..=2)); strategy.source_nonces_updated(header_id(2), source_nonces(2..=2));
strategy.source_nonces_updated(header_id(3), source_nonces(3..=6)); strategy.source_nonces_updated(header_id(3), source_nonces(3..=6));
@@ -388,6 +402,7 @@ mod tests {
fn select_nonces_to_deliver_able_to_split_ranges_with_selector() { fn select_nonces_to_deliver_able_to_split_ranges_with_selector() {
let mut state = RaceState::<_, _, TestMessagesProof>::default(); let mut state = RaceState::<_, _, TestMessagesProof>::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new(); let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=100)); strategy.source_nonces_updated(header_id(1), source_nonces(1..=100));
state.best_finalized_source_header_id_at_source = Some(header_id(1)); state.best_finalized_source_header_id_at_source = Some(header_id(1));
+109 -16
View File
@@ -150,23 +150,11 @@ where
Some(id.1), Some(id.1),
) )
.await?; .await?;
let weights: Vec<(MessageNonce, Weight)> =
Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?;
let mut expected_nonce = *nonces.start(); make_message_weights_map::<C>(
let mut weights_map = MessageWeightsMap::new(); Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?,
for (nonce, weight) in weights { nonces,
if nonce != expected_nonce { )
return Err(SubstrateError::Custom(format!(
"Unexpected nonce in messages_dispatch_weight call result. Expected {}, got {}",
expected_nonce, nonce
)));
}
weights_map.insert(nonce, weight);
expected_nonce += 1;
}
Ok(weights_map)
} }
async fn prove_messages( async fn prove_messages(
@@ -245,3 +233,108 @@ where
best_finalized_peer_at_best_self: peer_on_self_best_finalized_id, best_finalized_peer_at_best_self: peer_on_self_best_finalized_id,
}) })
} }
fn make_message_weights_map<C: Chain>(
weights: Vec<(MessageNonce, Weight)>,
nonces: RangeInclusive<MessageNonce>,
) -> Result<MessageWeightsMap, SubstrateError> {
let make_missing_nonce_error = |expected_nonce| {
Err(SubstrateError::Custom(format!(
"Missing nonce {} in messages_dispatch_weight call result. Expected all nonces from {:?}",
expected_nonce, nonces,
)))
};
let mut weights_map = MessageWeightsMap::new();
// this is actually prevented by external logic
if nonces.is_empty() {
return Ok(weights_map);
}
// check if last nonce is missing - loop below is not checking this
let last_nonce_is_missing = weights
.last()
.map(|(last_nonce, _)| last_nonce != nonces.end())
.unwrap_or(true);
if last_nonce_is_missing {
return make_missing_nonce_error(*nonces.end());
}
let mut expected_nonce = *nonces.start();
let mut is_at_head = true;
for (nonce, weight) in weights {
match (nonce == expected_nonce, is_at_head) {
(true, _) => (),
(false, true) => {
// this may happen if some messages were already pruned from the source node
//
// this is not critical error and will be auto-resolved by messages lane (and target node)
log::info!(
target: "bridge",
"Some messages are missing from the {} node: {:?}. Target node may be out of sync?",
C::NAME,
expected_nonce..nonce,
);
}
(false, false) => {
// some nonces are missing from the middle/tail of the range
//
// this is critical error, because we can't miss any nonces
return make_missing_nonce_error(expected_nonce);
}
}
weights_map.insert(nonce, weight);
expected_nonce = nonce + 1;
is_at_head = false;
}
Ok(weights_map)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn make_message_weights_map_succeeds_if_no_messages_are_missing() {
assert_eq!(
make_message_weights_map::<relay_rialto_client::Rialto>(vec![(1, 0), (2, 0), (3, 0)], 1..=3).unwrap(),
vec![(1, 0), (2, 0), (3, 0)].into_iter().collect(),
);
}
#[test]
fn make_message_weights_map_succeeds_if_head_messages_are_missing() {
assert_eq!(
make_message_weights_map::<relay_rialto_client::Rialto>(vec![(2, 0), (3, 0)], 1..=3).unwrap(),
vec![(2, 0), (3, 0)].into_iter().collect(),
);
}
#[test]
fn make_message_weights_map_fails_if_mid_messages_are_missing() {
assert!(matches!(
make_message_weights_map::<relay_rialto_client::Rialto>(vec![(1, 0), (3, 0)], 1..=3),
Err(SubstrateError::Custom(_))
));
}
#[test]
fn make_message_weights_map_fails_if_tail_messages_are_missing() {
assert!(matches!(
make_message_weights_map::<relay_rialto_client::Rialto>(vec![(1, 0), (2, 0)], 1..=3),
Err(SubstrateError::Custom(_))
));
}
#[test]
fn make_message_weights_map_fails_if_all_messages_are_missing() {
assert!(matches!(
make_message_weights_map::<relay_rialto_client::Rialto>(vec![], 1..=3),
Err(SubstrateError::Custom(_))
));
}
}