Use best block state to craft message relay transactions (#499)

* use best block state to craft message relay transactions

* removed obsolete TODO

* Update relays/messages-relay/src/message_race_loop.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2020-11-23 10:28:56 +03:00
committed by Bastian Köcher
parent f88c71252a
commit d2ab81340a
6 changed files with 97 additions and 89 deletions
@@ -169,8 +169,10 @@ pub trait TargetClient<P: MessageLane>: Clone + Send + Sync {
pub struct ClientState<SelfHeaderId, PeerHeaderId> { pub struct ClientState<SelfHeaderId, PeerHeaderId> {
/// Best header id of this chain. /// Best header id of this chain.
pub best_self: SelfHeaderId, pub best_self: SelfHeaderId,
/// Best header id of the peer chain. /// Best finalized header id of this chain.
pub best_peer: PeerHeaderId, pub best_finalized_self: SelfHeaderId,
/// Best finalized header id of the peer chain read at the best block of this chain (at `best_finalized_self`).
pub best_finalized_peer_at_best_self: PeerHeaderId,
} }
/// State of source client in one-way message lane. /// State of source client in one-way message lane.
@@ -746,12 +748,14 @@ pub(crate) mod tests {
is_source_fails: true, is_source_fails: true,
source_state: ClientState { source_state: ClientState {
best_self: HeaderId(0, 0), best_self: HeaderId(0, 0),
best_peer: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
}, },
source_latest_generated_nonce: 1, source_latest_generated_nonce: 1,
target_state: ClientState { target_state: ClientState {
best_self: HeaderId(0, 0), best_self: HeaderId(0, 0),
best_peer: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
}, },
target_latest_received_nonce: 0, target_latest_received_nonce: 0,
..Default::default() ..Default::default()
@@ -766,9 +770,11 @@ pub(crate) mod tests {
if data.is_target_reconnected { if data.is_target_reconnected {
data.is_target_fails = false; data.is_target_fails = false;
} }
if data.target_state.best_peer.0 < 10 { if data.target_state.best_finalized_peer_at_best_self.0 < 10 {
data.target_state.best_peer = data.target_state.best_finalized_peer_at_best_self = HeaderId(
HeaderId(data.target_state.best_peer.0 + 1, data.target_state.best_peer.0 + 1); data.target_state.best_finalized_peer_at_best_self.0 + 1,
data.target_state.best_finalized_peer_at_best_self.0 + 1,
);
} }
if !data.submitted_messages_proofs.is_empty() { if !data.submitted_messages_proofs.is_empty() {
exit_sender.unbounded_send(()).unwrap(); exit_sender.unbounded_send(()).unwrap();
@@ -787,12 +793,14 @@ pub(crate) mod tests {
TestClientData { TestClientData {
source_state: ClientState { source_state: ClientState {
best_self: HeaderId(10, 10), best_self: HeaderId(10, 10),
best_peer: HeaderId(0, 0), best_finalized_self: HeaderId(10, 10),
best_finalized_peer_at_best_self: HeaderId(0, 0),
}, },
source_latest_generated_nonce: 10, source_latest_generated_nonce: 10,
target_state: ClientState { target_state: ClientState {
best_self: HeaderId(0, 0), best_self: HeaderId(0, 0),
best_peer: HeaderId(0, 0), best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: HeaderId(0, 0),
}, },
target_latest_received_nonce: 0, target_latest_received_nonce: 0,
..Default::default() ..Default::default()
@@ -800,12 +808,12 @@ pub(crate) mod tests {
Arc::new(|_: &mut TestClientData| {}), Arc::new(|_: &mut TestClientData| {}),
Arc::new(move |data: &mut TestClientData| { Arc::new(move |data: &mut TestClientData| {
// syncing source headers -> target chain (all at once) // syncing source headers -> target chain (all at once)
if data.target_state.best_peer.0 < data.source_state.best_self.0 { if data.target_state.best_finalized_peer_at_best_self.0 < data.source_state.best_finalized_self.0 {
data.target_state.best_peer = data.source_state.best_self; data.target_state.best_finalized_peer_at_best_self = data.source_state.best_finalized_self;
} }
// syncing target headers -> source chain (all at once) // syncing source headers -> target chain (all at once)
if data.source_state.best_peer.0 < data.target_state.best_self.0 { if data.source_state.best_finalized_peer_at_best_self.0 < data.target_state.best_finalized_self.0 {
data.source_state.best_peer = data.target_state.best_self; data.source_state.best_finalized_peer_at_best_self = data.target_state.best_finalized_self;
} }
// if target has received messages batch => increase blocks so that confirmations may be sent // if target has received messages batch => increase blocks so that confirmations may be sent
if data.target_latest_received_nonce == 4 if data.target_latest_received_nonce == 4
@@ -814,8 +822,10 @@ pub(crate) mod tests {
{ {
data.target_state.best_self = data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.0 + 1); HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.0 + 1);
data.target_state.best_finalized_self = data.target_state.best_self;
data.source_state.best_self = data.source_state.best_self =
HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.0 + 1); HeaderId(data.source_state.best_self.0 + 1, data.source_state.best_self.0 + 1);
data.source_state.best_finalized_self = data.source_state.best_self;
} }
// if source has received all messages receiving confirmations => increase source block so that confirmations may be sent // if source has received all messages receiving confirmations => increase source block so that confirmations may be sent
if data.source_latest_confirmed_received_nonce == 10 { if data.source_latest_confirmed_received_nonce == 10 {
@@ -264,11 +264,6 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
// There's additional condition in the message delivery race: target would reject messages // There's additional condition in the message delivery race: target would reject messages
// if there are too much unconfirmed messages at the inbound lane. // if there are too much unconfirmed messages at the inbound lane.
// https://github.com/paritytech/parity-bridges-common/issues/432
// TODO: message lane loop works with finalized blocks only, but we're submitting transactions that
// are updating best block (which may not be finalized yet). So all decisions that are made below
// may be outdated. This needs to be changed - all logic here must be built on top of best blocks.
// The receiving race is responsible to deliver confirmations back to the source chain. So if // The receiving race is responsible to deliver confirmations back to the source chain. So if
// there's a lot of unconfirmed messages, let's wait until it'll be able to do its job. // there's a lot of unconfirmed messages, let's wait until it'll be able to do its job.
let latest_received_nonce_at_target = target_nonces.latest_nonce; let latest_received_nonce_at_target = target_nonces.latest_nonce;
@@ -388,9 +383,8 @@ impl NoncesRange for MessageWeightsMap {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::message_lane_loop::{ use crate::message_lane_loop::tests::{
tests::{header_id, TestMessageLane, TestMessagesProof, TestSourceHeaderId, TestTargetHeaderId}, header_id, TestMessageLane, TestMessagesProof, TestSourceHeaderId, TestTargetHeaderId,
ClientState,
}; };
type TestRaceState = RaceState<TestSourceHeaderId, TestTargetHeaderId, TestMessagesProof>; type TestRaceState = RaceState<TestSourceHeaderId, TestTargetHeaderId, TestMessagesProof>;
@@ -398,14 +392,9 @@ mod tests {
fn prepare_strategy() -> (TestRaceState, TestStrategy) { fn prepare_strategy() -> (TestRaceState, TestStrategy) {
let mut race_state = RaceState { let mut race_state = RaceState {
source_state: Some(ClientState { best_finalized_source_header_id_at_source: Some(header_id(1)),
best_self: header_id(1), best_finalized_source_header_id_at_best_target: Some(header_id(1)),
best_peer: header_id(1), best_target_header_id: Some(header_id(1)),
}),
target_state: Some(ClientState {
best_self: header_id(1),
best_peer: header_id(1),
}),
nonces_to_submit: None, nonces_to_submit: None,
nonces_submitted: None, nonces_submitted: None,
}; };
@@ -168,10 +168,13 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof> {
/// State of the race. /// State of the race.
#[derive(Debug)] #[derive(Debug)]
pub struct RaceState<SourceHeaderId, TargetHeaderId, Proof> { pub struct RaceState<SourceHeaderId, TargetHeaderId, Proof> {
/// Source state, if known. /// Best finalized source header id at the source client.
pub source_state: Option<ClientState<SourceHeaderId, TargetHeaderId>>, pub best_finalized_source_header_id_at_source: Option<SourceHeaderId>,
/// Target state, if known. /// Best finalized source header id at the best block on the target
pub target_state: Option<ClientState<TargetHeaderId, SourceHeaderId>>, /// client (at the `best_finalized_source_header_id_at_best_target`).
pub best_finalized_source_header_id_at_best_target: Option<SourceHeaderId>,
/// Best header id at the target client.
pub best_target_header_id: Option<TargetHeaderId>,
/// Range of nonces that we have selected to submit. /// Range of nonces that we have selected to submit.
pub nonces_to_submit: Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Proof)>, pub nonces_to_submit: Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Proof)>,
/// Range of nonces that is currently submitted. /// Range of nonces that is currently submitted.
@@ -227,17 +230,23 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
// when headers ids are updated // when headers ids are updated
source_state = race_source_updated.next() => { source_state = race_source_updated.next() => {
if let Some(source_state) = source_state { if let Some(source_state) = source_state {
if race_state.source_state.as_ref() != Some(&source_state) { let is_source_state_updated = race_state.best_finalized_source_header_id_at_source.as_ref()
!= Some(&source_state.best_finalized_self);
if is_source_state_updated {
source_nonces_required = true; source_nonces_required = true;
race_state.source_state = Some(source_state); race_state.best_finalized_source_header_id_at_source = Some(source_state.best_finalized_self);
} }
} }
}, },
target_state = race_target_updated.next() => { target_state = race_target_updated.next() => {
if let Some(target_state) = target_state { if let Some(target_state) = target_state {
if race_state.target_state.as_ref() != Some(&target_state) { let is_target_state_updated = race_state.best_target_header_id.as_ref()
!= Some(&target_state.best_self);
if is_target_state_updated {
target_nonces_required = true; target_nonces_required = true;
race_state.target_state = Some(target_state); race_state.best_target_header_id = Some(target_state.best_self);
race_state.best_finalized_source_header_id_at_best_target
= Some(target_state.best_finalized_peer_at_best_self);
} }
} }
}, },
@@ -358,10 +367,12 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
} else if source_nonces_required { } else if source_nonces_required {
log::debug!(target: "bridge", "Asking {} about message nonces", P::source_name()); log::debug!(target: "bridge", "Asking {} about message nonces", P::source_name());
let at_block = race_state let at_block = race_state
.source_state .best_finalized_source_header_id_at_source
.as_ref() .as_ref()
.expect("source_nonces_required is only true when source_state is Some; qed") .expect(
.best_self "source_nonces_required is only true when\
best_finalized_source_header_id_at_source is Some; qed",
)
.clone(); .clone();
source_nonces.set(race_source.nonces(at_block, strategy.best_at_source()).fuse()); source_nonces.set(race_source.nonces(at_block, strategy.best_at_source()).fuse());
} else { } else {
@@ -388,10 +399,9 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
if target_nonces_required { if target_nonces_required {
log::debug!(target: "bridge", "Asking {} about message nonces", P::target_name()); log::debug!(target: "bridge", "Asking {} about message nonces", P::target_name());
let at_block = race_state let at_block = race_state
.target_state .best_target_header_id
.as_ref() .as_ref()
.expect("target_nonces_required is only true when target_state is Some; qed") .expect("target_nonces_required is only true when best_target_header_id is Some; qed")
.best_self
.clone(); .clone();
target_nonces.set(race_target.nonces(at_block).fuse()); target_nonces.set(race_target.nonces(at_block).fuse());
} else { } else {
@@ -404,8 +414,9 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>>(
impl<SourceHeaderId, TargetHeaderId, Proof> Default for RaceState<SourceHeaderId, TargetHeaderId, Proof> { impl<SourceHeaderId, TargetHeaderId, Proof> Default for RaceState<SourceHeaderId, TargetHeaderId, Proof> {
fn default() -> Self { fn default() -> Self {
RaceState { RaceState {
source_state: None, best_finalized_source_header_id_at_source: None,
target_state: None, best_finalized_source_header_id_at_best_target: None,
best_target_header_id: None,
nonces_to_submit: None, nonces_to_submit: None,
nonces_submitted: None, nonces_submitted: None,
} }
@@ -446,11 +457,20 @@ where
SourceHeaderId: Clone, SourceHeaderId: Clone,
Strategy: RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>, Strategy: RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>,
{ {
race_state.target_state.as_ref().and_then(|target_state| { race_state
strategy .best_finalized_source_header_id_at_best_target
.select_nonces_to_deliver(&race_state) .as_ref()
.map(|(nonces_range, proof_parameters)| (target_state.best_peer.clone(), nonces_range, proof_parameters)) .and_then(|best_finalized_source_header_id_at_best_target| {
}) strategy
.select_nonces_to_deliver(&race_state)
.map(|(nonces_range, proof_parameters)| {
(
best_finalized_source_header_id_at_best_target.clone(),
nonces_range,
proof_parameters,
)
})
})
} }
#[cfg(test)] #[cfg(test)]
@@ -468,14 +488,9 @@ mod tests {
// target node only knows about source' BEST_AT_TARGET block // target node only knows about source' BEST_AT_TARGET block
// source node has BEST_AT_SOURCE > BEST_AT_TARGET block // source node has BEST_AT_SOURCE > BEST_AT_TARGET block
let mut race_state = RaceState::<_, _, ()> { let mut race_state = RaceState::<_, _, ()> {
source_state: Some(ClientState { best_finalized_source_header_id_at_source: Some(HeaderId(BEST_AT_SOURCE, BEST_AT_SOURCE)),
best_self: HeaderId(BEST_AT_SOURCE, BEST_AT_SOURCE), best_finalized_source_header_id_at_best_target: Some(HeaderId(BEST_AT_TARGET, BEST_AT_TARGET)),
best_peer: HeaderId(0, 0), best_target_header_id: Some(HeaderId(0, 0)),
}),
target_state: Some(ClientState {
best_self: HeaderId(0, 0),
best_peer: HeaderId(BEST_AT_TARGET, BEST_AT_TARGET),
}),
nonces_to_submit: None, nonces_to_submit: None,
nonces_submitted: None, nonces_submitted: None,
}; };
@@ -88,9 +88,8 @@ where
// 2) we can't deliver new nonce until header, that has emitted this nonce, is finalized // 2) we can't deliver new nonce until header, that has emitted this nonce, is finalized
// by target client // by target client
// 3) selector is used for more complicated logic // 3) selector is used for more complicated logic
let best_header_at_target = &race_state.target_state.as_ref()?.best_peer; let best_header_at_target = &race_state.best_finalized_source_header_id_at_best_target.as_ref()?;
let mut nonces_end = None; let mut nonces_end = None;
while let Some((queued_at, queued_range)) = self.source_queue.pop_front() { while let Some((queued_at, queued_range)) = self.source_queue.pop_front() {
// select (sub) range to deliver // select (sub) range to deliver
let queued_range_begin = queued_range.begin(); let queued_range_begin = queued_range.begin();
@@ -241,10 +240,7 @@ where
mod tests { mod tests {
use super::*; use super::*;
use crate::message_lane::MessageLane; use crate::message_lane::MessageLane;
use crate::message_lane_loop::{ use crate::message_lane_loop::tests::{header_id, TestMessageLane, TestMessagesProof};
tests::{header_id, TestMessageLane, TestMessagesProof},
ClientState,
};
type SourceNoncesRange = RangeInclusive<MessageNonce>; type SourceNoncesRange = RangeInclusive<MessageNonce>;
@@ -374,21 +370,15 @@ mod tests {
let mut strategy = BasicStrategy::<TestMessageLane>::new(); let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.source_nonces_updated(header_id(1), source_nonces(1..=1)); strategy.source_nonces_updated(header_id(1), source_nonces(1..=1));
strategy.source_nonces_updated(header_id(2), source_nonces(2..=2)); strategy.source_nonces_updated(header_id(2), source_nonces(2..=2));
strategy.source_nonces_updated(header_id(3), source_nonces(6..=6)); strategy.source_nonces_updated(header_id(3), source_nonces(3..=6));
strategy.source_nonces_updated(header_id(5), source_nonces(8..=8)); strategy.source_nonces_updated(header_id(5), source_nonces(7..=8));
state.target_state = Some(ClientState { state.best_finalized_source_header_id_at_best_target = Some(header_id(4));
best_self: header_id(0),
best_peer: header_id(4),
});
assert_eq!(strategy.select_nonces_to_deliver(&state), Some((1..=6, ()))); assert_eq!(strategy.select_nonces_to_deliver(&state), Some((1..=6, ())));
strategy.target_nonces_updated(target_nonces(6), &mut state); strategy.target_nonces_updated(target_nonces(6), &mut state);
assert_eq!(strategy.select_nonces_to_deliver(&state), None); assert_eq!(strategy.select_nonces_to_deliver(&state), None);
state.target_state = Some(ClientState { state.best_finalized_source_header_id_at_best_target = Some(header_id(5));
best_self: header_id(0),
best_peer: header_id(5),
});
assert_eq!(strategy.select_nonces_to_deliver(&state), Some((7..=8, ()))); assert_eq!(strategy.select_nonces_to_deliver(&state), Some((7..=8, ())));
strategy.target_nonces_updated(target_nonces(8), &mut state); strategy.target_nonces_updated(target_nonces(8), &mut state);
assert_eq!(strategy.select_nonces_to_deliver(&state), None); assert_eq!(strategy.select_nonces_to_deliver(&state), None);
@@ -400,10 +390,10 @@ mod tests {
let mut strategy = BasicStrategy::<TestMessageLane>::new(); let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.source_nonces_updated(header_id(1), source_nonces(1..=100)); strategy.source_nonces_updated(header_id(1), source_nonces(1..=100));
state.target_state = Some(ClientState { state.best_finalized_source_header_id_at_source = Some(header_id(1));
best_self: header_id(0), state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
best_peer: header_id(1), state.best_target_header_id = Some(header_id(1));
});
assert_eq!( assert_eq!(
strategy.select_nonces_to_deliver_with_selector(&state, |_| Some(50..=100)), strategy.select_nonces_to_deliver_with_selector(&state, |_| Some(50..=100)),
Some(1..=49), Some(1..=49),
@@ -417,11 +407,9 @@ mod tests {
let mut strategy = BasicStrategy::<TestMessageLane>::new(); let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.source_nonces_updated(header_id(1), source_nonces(1..=100)); strategy.source_nonces_updated(header_id(1), source_nonces(1..=100));
strategy.target_nonces_updated(target_nonces(50), &mut state); strategy.target_nonces_updated(target_nonces(50), &mut state);
state.target_state = Some(ClientState { state.best_finalized_source_header_id_at_source = Some(header_id(1));
best_self: header_id(0), state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
best_peer: header_id(1), state.best_target_header_id = Some(header_id(1));
});
strategy.select_nonces_to_deliver_with_selector(&state, invalid_selector); strategy.select_nonces_to_deliver_with_selector(&state, invalid_selector);
} }
+2 -2
View File
@@ -64,7 +64,7 @@ impl MessageLaneLoopMetrics {
.set(source_client_state.best_self.0.into()); .set(source_client_state.best_self.0.into());
self.best_block_numbers self.best_block_numbers
.with_label_values(&["target_at_source"]) .with_label_values(&["target_at_source"])
.set(source_client_state.best_peer.0.into()); .set(source_client_state.best_finalized_peer_at_best_self.0.into());
} }
/// Update target client state metrics. /// Update target client state metrics.
@@ -74,7 +74,7 @@ impl MessageLaneLoopMetrics {
.set(target_client_state.best_self.0.into()); .set(target_client_state.best_self.0.into());
self.best_block_numbers self.best_block_numbers
.with_label_values(&["source_at_target"]) .with_label_values(&["source_at_target"])
.set(target_client_state.best_peer.0.into()); .set(target_client_state.best_finalized_peer_at_best_self.0.into());
} }
/// Update latest generated nonce at source. /// Update latest generated nonce at source.
@@ -234,13 +234,18 @@ where
let self_best_finalized_header = self_client.header_by_hash(self_best_finalized_header_hash).await?; let self_best_finalized_header = self_client.header_by_hash(self_best_finalized_header_hash).await?;
let self_best_finalized_id = HeaderId(*self_best_finalized_header.number(), self_best_finalized_header_hash); let self_best_finalized_id = HeaderId(*self_best_finalized_header.number(), self_best_finalized_header_hash);
// now let's read our best header on **this** chain
let self_best_header = self_client.best_header().await?;
let self_best_hash = self_best_header.hash();
let self_best_id = HeaderId(*self_best_header.number(), self_best_hash);
// now let's read id of best finalized peer header at our best finalized block // now let's read id of best finalized peer header at our best finalized block
let best_finalized_peer_on_self_method = format!("{}HeaderApi_finalized_block", bridged_chain_name); let best_finalized_peer_on_self_method = format!("{}HeaderApi_finalized_block", bridged_chain_name);
let encoded_best_finalized_peer_on_self = self_client let encoded_best_finalized_peer_on_self = self_client
.state_call( .state_call(
best_finalized_peer_on_self_method, best_finalized_peer_on_self_method,
Bytes(Vec::new()), Bytes(Vec::new()),
Some(self_best_finalized_header_hash), Some(self_best_hash),
) )
.await?; .await?;
let decoded_best_finalized_peer_on_self: (BridgedHeaderNumber, BridgedHeaderHash) = let decoded_best_finalized_peer_on_self: (BridgedHeaderNumber, BridgedHeaderHash) =
@@ -251,7 +256,8 @@ where
); );
Ok(ClientState { Ok(ClientState {
best_self: self_best_finalized_id, best_self: self_best_id,
best_peer: peer_on_self_best_finalized_id, best_finalized_self: self_best_finalized_id,
best_finalized_peer_at_best_self: peer_on_self_best_finalized_id,
}) })
} }