Millau -> Rialto messages relay (#399)

* Millau messages -> Rialto relay

* prepare for custom race strategy of delivery race

* custom strategy for delivery race

* update TODOs

* add reference to issue 457

* impl reconnect

* clippy

* fix check in test

* fmt

* removed obsolete TODO

* fixed another TODOs

* fmt

* use MAX_UNCONFIRMED_MESSAGES_AT_INBOUND_LANE const from primitives

* Update relays/messages-relay/src/message_lane_loop.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* added SubstrateMessagesProof typedef

* fix test

* removed comment

* additional_proof_required -> ProofParameters

* typo

* multiline literal

* clippy

* fix typo

* and_then -> await

* update_source_latest_confirmed_nonce

* Update relays/messages-relay/src/message_race_delivery.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2020-11-05 13:12:31 +03:00
committed by Bastian Köcher
parent 804ef55146
commit d4fc7bebdc
27 changed files with 1506 additions and 451 deletions
+5 -3
View File
@@ -29,6 +29,8 @@ mod metrics;
pub mod message_lane;
pub mod message_lane_loop;
pub mod message_race_delivery;
pub mod message_race_loop;
pub mod message_race_receiving;
mod message_race_delivery;
mod message_race_loop;
mod message_race_receiving;
mod message_race_strategy;
@@ -21,11 +21,11 @@
use relay_utils::HeaderId;
use num_traits::{One, Zero};
use num_traits::{CheckedSub, One, Zero};
use std::fmt::Debug;
/// One-way message lane.
pub trait MessageLane {
pub trait MessageLane: Clone + Send + Sync {
/// Name of the messages source.
const SOURCE_NAME: &'static str;
/// Name of the messages target.
@@ -33,30 +33,33 @@ pub trait MessageLane {
/// Message nonce type.
type MessageNonce: Clone
+ Send
+ Sync
+ Copy
+ Debug
+ Default
+ From<u32>
+ Into<u64>
+ Ord
+ CheckedSub
+ std::ops::Add<Output = Self::MessageNonce>
+ One
+ Zero;
/// Messages proof.
type MessagesProof: Clone;
type MessagesProof: Clone + Send + Sync;
/// Messages receiving proof.
type MessagesReceivingProof: Clone;
type MessagesReceivingProof: Clone + Send + Sync;
/// Number of the source header.
type SourceHeaderNumber: Clone + Debug + Default + Ord + PartialEq + Into<u64>;
type SourceHeaderNumber: Clone + Debug + Ord + PartialEq + Into<u64> + Send + Sync;
/// Hash of the source header.
type SourceHeaderHash: Clone + Debug + Default + PartialEq;
type SourceHeaderHash: Clone + Debug + Default + PartialEq + Send + Sync;
/// Number of the target header.
type TargetHeaderNumber: Clone + Debug + Default + Ord + PartialEq + Into<u64>;
type TargetHeaderNumber: Clone + Debug + Ord + PartialEq + Into<u64> + Send + Sync;
/// Hash of the target header.
type TargetHeaderHash: Clone + Debug + Default + PartialEq;
type TargetHeaderHash: Clone + Debug + Default + PartialEq + Send + Sync;
}
/// Source header id within given one-way message lane.
@@ -24,9 +24,6 @@
//! finalized header. I.e. when talking about headers in lane context, we
//! only care about finalized headers.
// Until there'll be actual message-lane in the runtime.
#![allow(dead_code)]
use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf};
use crate::message_race_delivery::run as run_message_delivery_race;
use crate::message_race_receiving::run as run_message_receiving_race;
@@ -42,14 +39,33 @@ use relay_utils::{
};
use std::{fmt::Debug, future::Future, ops::RangeInclusive, time::Duration};
/// Message lane loop configuration params.
#[derive(Debug, Clone)]
pub struct Params<MessageNonce> {
/// Id of lane this loop is servicing.
pub lane: LaneId,
/// Interval at which we ask target node about its updates.
pub source_tick: Duration,
/// Interval at which we ask target node about its updates.
pub target_tick: Duration,
/// Delay between moments when connection error happens and our reconnect attempt.
pub reconnect_delay: Duration,
/// The loop will auto-restart if there has been no updates during this period.
pub stall_timeout: Duration,
/// Message delivery race will stop delivering messages if there are `max_unconfirmed_nonces_at_target`
/// unconfirmed nonces on the target node. The race would continue once they're confirmed by the
/// receiving race.
pub max_unconfirmed_nonces_at_target: MessageNonce,
}
/// Source client trait.
#[async_trait(?Send)]
pub trait SourceClient<P: MessageLane>: Clone {
#[async_trait]
pub trait SourceClient<P: MessageLane>: Clone + Send + Sync {
/// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError;
/// Try to reconnect to source node.
fn reconnect(self) -> Self;
async fn reconnect(self) -> Result<Self, Self::Error>;
/// Returns state of the client.
async fn state(&self) -> Result<SourceClientState<P>, Self::Error>;
@@ -70,6 +86,7 @@ pub trait SourceClient<P: MessageLane>: Clone {
&self,
id: SourceHeaderIdOf<P>,
nonces: RangeInclusive<P::MessageNonce>,
include_outbound_lane_state: bool,
) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<P::MessageNonce>, P::MessagesProof), Self::Error>;
/// Submit messages receiving proof.
@@ -77,17 +94,17 @@ pub trait SourceClient<P: MessageLane>: Clone {
&self,
generated_at_block: TargetHeaderIdOf<P>,
proof: P::MessagesReceivingProof,
) -> Result<RangeInclusive<P::MessageNonce>, Self::Error>;
) -> Result<(), Self::Error>;
}
/// Target client trait.
#[async_trait(?Send)]
pub trait TargetClient<P: MessageLane>: Clone {
#[async_trait]
pub trait TargetClient<P: MessageLane>: Clone + Send + Sync {
/// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError;
/// Try to reconnect to source node.
fn reconnect(self) -> Self;
async fn reconnect(self) -> Result<Self, Self::Error>;
/// Returns state of the client.
async fn state(&self) -> Result<TargetClientState<P>, Self::Error>;
@@ -98,6 +115,12 @@ pub trait TargetClient<P: MessageLane>: Clone {
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessageNonce), Self::Error>;
/// Get nonce of latest confirmed message.
async fn latest_confirmed_received_nonce(
&self,
id: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessageNonce), Self::Error>;
/// Prove messages receiving at given block.
async fn prove_messages_receiving(
&self,
@@ -138,15 +161,10 @@ pub struct ClientsState<P: MessageLane> {
}
/// Run message lane service loop.
#[allow(clippy::too_many_arguments)]
pub fn run<P: MessageLane>(
lane: LaneId,
params: Params<P::MessageNonce>,
mut source_client: impl SourceClient<P>,
source_tick: Duration,
mut target_client: impl TargetClient<P>,
target_tick: Duration,
reconnect_delay: Duration,
stall_timeout: Duration,
metrics_params: Option<MetricsParams>,
exit_signal: impl Future<Output = ()>,
) {
@@ -162,7 +180,7 @@ pub fn run<P: MessageLane>(
"{}_to_{}_MessageLoop/{}",
P::SOURCE_NAME,
P::TARGET_NAME,
hex::encode(lane)
hex::encode(params.lane)
),
metrics_params,
&metrics_global,
@@ -171,11 +189,9 @@ pub fn run<P: MessageLane>(
loop {
let result = run_until_connection_lost(
params.clone(),
source_client.clone(),
source_tick,
target_client.clone(),
target_tick,
stall_timeout,
if metrics_enabled {
Some(&mut metrics_global)
} else {
@@ -192,15 +208,41 @@ pub fn run<P: MessageLane>(
match result {
Ok(()) => break,
Err(failed_client) => {
async_std::task::sleep(reconnect_delay).await;
Err(failed_client) => loop {
async_std::task::sleep(params.reconnect_delay).await;
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
source_client = source_client.reconnect();
source_client = match source_client.clone().reconnect().await {
Ok(source_client) => source_client,
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect {}. Going to retry in {}s: {:?}",
P::SOURCE_NAME,
params.reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
target_client = target_client.reconnect();
target_client = match target_client.clone().reconnect().await {
Ok(target_client) => target_client,
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect {}. Going to retry in {}s: {:?}",
P::TARGET_NAME,
params.reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
}
break;
},
}
log::debug!(
@@ -214,13 +256,10 @@ pub fn run<P: MessageLane>(
}
/// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received.
#[allow(clippy::too_many_arguments)]
async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: TargetClient<P>>(
params: Params<P::MessageNonce>,
source_client: SC,
source_tick: Duration,
target_client: TC,
target_tick: Duration,
stall_timeout: Duration,
mut metrics_global: Option<&mut GlobalMetrics>,
metrics_msg: Option<MessageLaneLoopMetrics>,
exit_signal: impl Future<Output = ()>,
@@ -230,14 +269,14 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
let mut source_state_required = true;
let source_state = source_client.state().fuse();
let source_go_offline_future = futures::future::Fuse::terminated();
let source_tick_stream = interval(source_tick).fuse();
let source_tick_stream = interval(params.source_tick).fuse();
let mut target_retry_backoff = retry_backoff();
let mut target_client_is_online = false;
let mut target_state_required = true;
let target_state = target_client.state().fuse();
let target_go_offline_future = futures::future::Fuse::terminated();
let target_tick_stream = interval(target_tick).fuse();
let target_tick_stream = interval(params.target_tick).fuse();
let (
(delivery_source_state_sender, delivery_source_state_receiver),
@@ -248,8 +287,9 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
delivery_source_state_receiver,
target_client.clone(),
delivery_target_state_receiver,
stall_timeout,
params.stall_timeout,
metrics_msg.clone(),
params.max_unconfirmed_nonces_at_target,
)
.fuse();
@@ -262,7 +302,7 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
receiving_source_state_receiver,
target_client.clone(),
receiving_target_state_receiver,
stall_timeout,
params.stall_timeout,
metrics_msg.clone(),
)
.fuse();
@@ -395,7 +435,7 @@ pub(crate) mod tests {
}
pub type TestMessageNonce = u64;
pub type TestMessagesProof = RangeInclusive<TestMessageNonce>;
pub type TestMessagesProof = (RangeInclusive<TestMessageNonce>, Option<TestMessageNonce>);
pub type TestMessagesReceivingProof = TestMessageNonce;
pub type TestSourceHeaderNumber = u64;
@@ -405,20 +445,15 @@ pub(crate) mod tests {
pub type TestTargetHeaderHash = u64;
#[derive(Debug)]
pub enum TestError {
Logic,
Connection,
}
pub struct TestError;
impl MaybeConnectionError for TestError {
fn is_connection_error(&self) -> bool {
match *self {
TestError::Logic => false,
TestError::Connection => true,
}
true
}
}
#[derive(Clone)]
pub struct TestMessageLane;
impl MessageLane for TestMessageLane {
@@ -449,33 +484,34 @@ pub(crate) mod tests {
is_target_reconnected: bool,
target_state: SourceClientState<TestMessageLane>,
target_latest_received_nonce: TestMessageNonce,
target_latest_confirmed_received_nonce: TestMessageNonce,
submitted_messages_proofs: Vec<TestMessagesProof>,
}
#[derive(Clone)]
pub struct TestSourceClient {
data: Arc<Mutex<TestClientData>>,
tick: Arc<dyn Fn(&mut TestClientData)>,
tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
}
#[async_trait(?Send)]
#[async_trait]
impl SourceClient<TestMessageLane> for TestSourceClient {
type Error = TestError;
fn reconnect(self) -> Self {
async fn reconnect(self) -> Result<Self, Self::Error> {
{
let mut data = self.data.lock();
(self.tick)(&mut *data);
data.is_source_reconnected = true;
}
self
Ok(self)
}
async fn state(&self) -> Result<SourceClientState<TestMessageLane>, Self::Error> {
let mut data = self.data.lock();
(self.tick)(&mut *data);
if data.is_source_fails {
return Err(TestError::Connection);
return Err(TestError);
}
Ok(data.source_state.clone())
}
@@ -487,7 +523,7 @@ pub(crate) mod tests {
let mut data = self.data.lock();
(self.tick)(&mut *data);
if data.is_source_fails {
return Err(TestError::Connection);
return Err(TestError);
}
Ok((id, data.source_latest_generated_nonce))
}
@@ -505,6 +541,7 @@ pub(crate) mod tests {
&self,
id: SourceHeaderIdOf<TestMessageLane>,
nonces: RangeInclusive<TestMessageNonce>,
include_outbound_lane_state: bool,
) -> Result<
(
SourceHeaderIdOf<TestMessageLane>,
@@ -513,46 +550,59 @@ pub(crate) mod tests {
),
Self::Error,
> {
Ok((id, nonces.clone(), nonces))
let mut data = self.data.lock();
(self.tick)(&mut *data);
Ok((
id,
nonces.clone(),
(
nonces,
if include_outbound_lane_state {
Some(data.source_latest_confirmed_received_nonce)
} else {
None
},
),
))
}
async fn submit_messages_receiving_proof(
&self,
_generated_at_block: TargetHeaderIdOf<TestMessageLane>,
proof: TestMessagesReceivingProof,
) -> Result<RangeInclusive<TestMessageNonce>, Self::Error> {
) -> Result<(), Self::Error> {
let mut data = self.data.lock();
(self.tick)(&mut *data);
data.submitted_messages_receiving_proofs.push(proof);
data.source_latest_confirmed_received_nonce = proof;
Ok(proof..=proof)
Ok(())
}
}
#[derive(Clone)]
pub struct TestTargetClient {
data: Arc<Mutex<TestClientData>>,
tick: Arc<dyn Fn(&mut TestClientData)>,
tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
}
#[async_trait(?Send)]
#[async_trait]
impl TargetClient<TestMessageLane> for TestTargetClient {
type Error = TestError;
fn reconnect(self) -> Self {
async fn reconnect(self) -> Result<Self, Self::Error> {
{
let mut data = self.data.lock();
(self.tick)(&mut *data);
data.is_target_reconnected = true;
}
self
Ok(self)
}
async fn state(&self) -> Result<TargetClientState<TestMessageLane>, Self::Error> {
let mut data = self.data.lock();
(self.tick)(&mut *data);
if data.is_target_fails {
return Err(TestError::Connection);
return Err(TestError);
}
Ok(data.target_state.clone())
}
@@ -564,11 +614,23 @@ pub(crate) mod tests {
let mut data = self.data.lock();
(self.tick)(&mut *data);
if data.is_target_fails {
return Err(TestError::Connection);
return Err(TestError);
}
Ok((id, data.target_latest_received_nonce))
}
async fn latest_confirmed_received_nonce(
&self,
id: TargetHeaderIdOf<TestMessageLane>,
) -> Result<(TargetHeaderIdOf<TestMessageLane>, TestMessageNonce), Self::Error> {
let mut data = self.data.lock();
(self.tick)(&mut *data);
if data.is_target_fails {
return Err(TestError);
}
Ok((id, data.target_latest_confirmed_received_nonce))
}
async fn prove_messages_receiving(
&self,
id: TargetHeaderIdOf<TestMessageLane>,
@@ -585,11 +647,14 @@ pub(crate) mod tests {
let mut data = self.data.lock();
(self.tick)(&mut *data);
if data.is_target_fails {
return Err(TestError::Connection);
return Err(TestError);
}
data.target_state.best_self =
HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1);
data.target_latest_received_nonce = *proof.end();
data.target_latest_received_nonce = *proof.0.end();
if let Some(target_latest_confirmed_received_nonce) = proof.1 {
data.target_latest_confirmed_received_nonce = target_latest_confirmed_received_nonce;
}
data.submitted_messages_proofs.push(proof);
Ok(nonces)
}
@@ -597,8 +662,8 @@ pub(crate) mod tests {
fn run_loop_test(
data: TestClientData,
source_tick: Arc<dyn Fn(&mut TestClientData)>,
target_tick: Arc<dyn Fn(&mut TestClientData)>,
source_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
target_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
exit_signal: impl Future<Output = ()>,
) -> TestClientData {
async_std::task::block_on(async {
@@ -613,17 +678,19 @@ pub(crate) mod tests {
tick: target_tick,
};
run(
[0, 0, 0, 0],
Params {
lane: [0, 0, 0, 0],
source_tick: Duration::from_millis(100),
target_tick: Duration::from_millis(100),
reconnect_delay: Duration::from_millis(0),
stall_timeout: Duration::from_millis(60),
max_unconfirmed_nonces_at_target: 100,
},
source_client,
Duration::from_millis(100),
target_client,
Duration::from_millis(100),
Duration::from_millis(0),
Duration::from_secs(60),
None,
exit_signal,
);
let result = data.lock().clone();
result
})
@@ -671,7 +738,7 @@ pub(crate) mod tests {
exit_receiver.into_future().map(|(_, _)| ()),
);
assert_eq!(result.submitted_messages_proofs, vec![1..=1],);
assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],);
}
#[test]
@@ -717,7 +784,13 @@ pub(crate) mod tests {
exit_receiver.into_future().map(|(_, _)| ()),
);
assert_eq!(result.submitted_messages_proofs, vec![1..=4, 5..=8, 9..=10],);
// there are no strict restrictions on when reward confirmation should come
// (because `max_unconfirmed_nonces_at_target` is `100` in tests and this confirmation
// depends on the state of both clients)
// => we do not check it here
assert_eq!(result.submitted_messages_proofs[0].0, 1..=4);
assert_eq!(result.submitted_messages_proofs[1].0, 5..=8);
assert_eq!(result.submitted_messages_proofs[2].0, 9..=10);
assert!(!result.submitted_messages_receiving_proofs.is_empty());
}
}
@@ -18,14 +18,15 @@ use crate::message_lane_loop::{
SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient,
TargetClientState,
};
use crate::message_race_loop::{MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient};
use crate::message_race_loop::{ClientNonces, MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient};
use crate::message_race_strategy::BasicStrategy;
use crate::metrics::MessageLaneLoopMetrics;
use async_trait::async_trait;
use futures::stream::FusedStream;
use num_traits::{One, Zero};
use relay_utils::{FailedClient, HeaderId};
use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration};
use num_traits::CheckedSub;
use relay_utils::FailedClient;
use std::{marker::PhantomData, ops::RangeInclusive, time::Duration};
/// Maximal number of messages to relay in single transaction.
const MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX: u32 = 4;
@@ -38,6 +39,7 @@ pub async fn run<P: MessageLane>(
target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
stall_timeout: Duration,
metrics_msg: Option<MessageLaneLoopMetrics>,
max_unconfirmed_nonces_at_target: P::MessageNonce,
) -> Result<(), FailedClient> {
crate::message_race_loop::run(
MessageDeliveryRaceSource {
@@ -53,7 +55,12 @@ pub async fn run<P: MessageLane>(
},
target_state_updates,
stall_timeout,
MessageDeliveryStrategy::<P>::new(MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX.into()),
MessageDeliveryStrategy::<P> {
max_unconfirmed_nonces_at_target,
source_nonces: None,
target_nonces: None,
strategy: BasicStrategy::new(MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX.into()),
},
)
.await
}
@@ -84,33 +91,46 @@ struct MessageDeliveryRaceSource<P: MessageLane, C> {
_phantom: PhantomData<P>,
}
#[async_trait(?Send)]
#[async_trait]
impl<P, C> SourceClient<MessageDeliveryRace<P>> for MessageDeliveryRaceSource<P, C>
where
P: MessageLane,
C: MessageLaneSourceClient<P>,
{
type Error = C::Error;
type ProofParameters = bool;
async fn latest_nonce(
async fn nonces(
&self,
at_block: SourceHeaderIdOf<P>,
) -> Result<(SourceHeaderIdOf<P>, P::MessageNonce), Self::Error> {
let result = self.client.latest_generated_nonce(at_block).await;
) -> Result<(SourceHeaderIdOf<P>, ClientNonces<P::MessageNonce>), Self::Error> {
let (at_block, latest_generated_nonce) = self.client.latest_generated_nonce(at_block).await?;
let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?;
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
if let Ok((_, source_latest_generated_nonce)) = result.as_ref() {
metrics_msg.update_target_latest_received_nonce::<P>(*source_latest_generated_nonce);
}
metrics_msg.update_source_latest_generated_nonce::<P>(latest_generated_nonce);
metrics_msg.update_source_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
}
result
Ok((
at_block,
ClientNonces {
latest_nonce: latest_generated_nonce,
confirmed_nonce: Some(latest_confirmed_nonce),
},
))
}
async fn generate_proof(
&self,
at_block: SourceHeaderIdOf<P>,
nonces: RangeInclusive<P::MessageNonce>,
proof_parameters: Self::ProofParameters,
) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<P::MessageNonce>, P::MessagesProof), Self::Error> {
self.client.prove_messages(at_block, nonces).await
let outbound_state_proof_required = proof_parameters;
self.client
.prove_messages(at_block, nonces, outbound_state_proof_required)
.await
}
}
@@ -121,7 +141,7 @@ struct MessageDeliveryRaceTarget<P: MessageLane, C> {
_phantom: PhantomData<P>,
}
#[async_trait(?Send)]
#[async_trait]
impl<P, C> TargetClient<MessageDeliveryRace<P>> for MessageDeliveryRaceTarget<P, C>
where
P: MessageLane,
@@ -129,17 +149,25 @@ where
{
type Error = C::Error;
async fn latest_nonce(
async fn nonces(
&self,
at_block: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessageNonce), Self::Error> {
let result = self.client.latest_received_nonce(at_block).await;
) -> Result<(TargetHeaderIdOf<P>, ClientNonces<P::MessageNonce>), Self::Error> {
let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?;
let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?;
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
if let Ok((_, target_latest_received_nonce)) = result.as_ref() {
metrics_msg.update_target_latest_received_nonce::<P>(*target_latest_received_nonce);
}
metrics_msg.update_target_latest_received_nonce::<P>(latest_received_nonce);
metrics_msg.update_target_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
}
result
Ok((
at_block,
ClientNonces {
latest_nonce: latest_received_nonce,
confirmed_nonce: Some(latest_confirmed_nonce),
},
))
}
async fn submit_proof(
@@ -155,7 +183,18 @@ where
}
/// Messages delivery strategy.
type MessageDeliveryStrategy<P> = DeliveryStrategy<
struct MessageDeliveryStrategy<P: MessageLane> {
/// Maximal unconfirmed nonces at target client.
max_unconfirmed_nonces_at_target: P::MessageNonce,
/// Latest nonces from the source client.
source_nonces: Option<ClientNonces<P::MessageNonce>>,
/// Target nonces from the source client.
target_nonces: Option<ClientNonces<P::MessageNonce>>,
/// Basic delivery strategy.
strategy: MessageDeliveryStrategyBase<P>,
}
type MessageDeliveryStrategyBase<P> = BasicStrategy<
<P as MessageLane>::SourceHeaderNumber,
<P as MessageLane>::SourceHeaderHash,
<P as MessageLane>::TargetHeaderNumber,
@@ -164,280 +203,90 @@ type MessageDeliveryStrategy<P> = DeliveryStrategy<
<P as MessageLane>::MessagesProof,
>;
/// Nonces delivery strategy.
#[derive(Debug)]
pub struct DeliveryStrategy<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, Nonce, Proof> {
/// All queued nonces.
source_queue: VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, Nonce)>,
/// Best nonce known to target node.
target_nonce: Nonce,
/// Max nonces to relay in single transaction.
max_nonces_to_relay_in_single_tx: Nonce,
/// Unused generic types dump.
_phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>,
}
impl<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, Nonce: Default, Proof>
DeliveryStrategy<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, Nonce, Proof>
impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessageNonce, P::MessagesProof>
for MessageDeliveryStrategy<P>
{
/// Create new delivery strategy.
pub fn new(max_nonces_to_relay_in_single_tx: Nonce) -> Self {
DeliveryStrategy {
source_queue: VecDeque::new(),
target_nonce: Default::default(),
max_nonces_to_relay_in_single_tx,
_phantom: Default::default(),
}
}
}
type ProofParameters = bool;
impl<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, Nonce, Proof>
RaceStrategy<
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
HeaderId<TargetHeaderHash, TargetHeaderNumber>,
Nonce,
Proof,
> for DeliveryStrategy<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, Nonce, Proof>
where
SourceHeaderHash: Clone,
SourceHeaderNumber: Clone + Ord,
Nonce: Clone + Copy + From<u32> + Ord + std::ops::Add<Output = Nonce> + One + Zero,
{
fn is_empty(&self) -> bool {
self.source_queue.is_empty()
self.strategy.is_empty()
}
fn source_nonce_updated(&mut self, at_block: HeaderId<SourceHeaderHash, SourceHeaderNumber>, nonce: Nonce) {
if nonce <= self.target_nonce {
return;
}
match self.source_queue.back() {
Some((_, prev_nonce)) if *prev_nonce < nonce => (),
Some(_) => return,
None => (),
}
self.source_queue.push_back((at_block, nonce))
fn source_nonces_updated(&mut self, at_block: SourceHeaderIdOf<P>, nonces: ClientNonces<P::MessageNonce>) {
self.source_nonces = Some(nonces.clone());
self.strategy.source_nonces_updated(at_block, nonces)
}
fn target_nonce_updated(
fn target_nonces_updated(
&mut self,
nonce: Nonce,
race_state: &mut RaceState<
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
HeaderId<TargetHeaderHash, TargetHeaderNumber>,
Nonce,
Proof,
>,
nonces: ClientNonces<P::MessageNonce>,
race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessageNonce, P::MessagesProof>,
) {
if nonce < self.target_nonce {
return;
}
while let Some(true) = self
.source_queue
.front()
.map(|(_, source_nonce)| *source_nonce <= nonce)
{
self.source_queue.pop_front();
}
let need_to_select_new_nonces = race_state
.nonces_to_submit
.as_ref()
.map(|(_, nonces, _)| *nonces.end() <= nonce)
.unwrap_or(false);
if need_to_select_new_nonces {
race_state.nonces_to_submit = None;
}
let need_new_nonces_to_submit = race_state
.nonces_submitted
.as_ref()
.map(|nonces| *nonces.end() <= nonce)
.unwrap_or(false);
if need_new_nonces_to_submit {
race_state.nonces_submitted = None;
}
self.target_nonce = nonce;
self.target_nonces = Some(nonces.clone());
self.strategy.target_nonces_updated(nonces, race_state)
}
fn select_nonces_to_deliver(
&mut self,
race_state: &RaceState<
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
HeaderId<TargetHeaderHash, TargetHeaderNumber>,
Nonce,
Proof,
>,
) -> Option<RangeInclusive<Nonce>> {
// if we have already selected nonces that we want to submit, do nothing
if race_state.nonces_to_submit.is_some() {
return None;
}
race_state: &RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessageNonce, P::MessagesProof>,
) -> Option<(RangeInclusive<P::MessageNonce>, Self::ProofParameters)> {
const CONFIRMED_NONCE_PROOF: &str = "\
ClientNonces are crafted by MessageDeliveryRace(Source|Target);\
MessageDeliveryRace(Source|Target) always fills confirmed_nonce field;\
qed";
// if we already submitted some nonces, do nothing
if race_state.nonces_submitted.is_some() {
return None;
}
let source_nonces = self.source_nonces.as_ref()?;
let target_nonces = self.target_nonces.as_ref()?;
// 1) we want to deliver all nonces, starting from `target_nonce + 1`
// 2) we want to deliver at most `self.max_nonces_to_relay_in_single_tx` nonces in this batch
// 3) we can't deliver new nonce until header, that has emitted this nonce, is finalized
// by target client
let nonces_begin = self.target_nonce + 1.into();
let best_header_at_target = &race_state.target_state.as_ref()?.best_peer;
let mut nonces_end = None;
let mut i = Zero::zero();
while i < self.max_nonces_to_relay_in_single_tx {
let nonce = nonces_begin + i;
// There's additional condition in the message delivery race: target would reject messages
// if there are too much unconfirmed messages at the inbound lane.
// if queue is empty, we don't need to prove anything
let (first_queued_at, first_queued_nonce) = match self.source_queue.front() {
Some((first_queued_at, first_queued_nonce)) => ((*first_queued_at).clone(), *first_queued_nonce),
None => break,
};
// https://github.com/paritytech/parity-bridges-common/issues/432
// TODO: message lane loop works with finalized blocks only, but we're submitting transactions that
// are updating best block (which may not be finalized yet). So all decisions that are made below
// may be outdated. This needs to be changed - all logic here must be built on top of best blocks.
// if header that has queued the message is not yet finalized at bridged chain,
// we can't prove anything
if first_queued_at.0 > best_header_at_target.0 {
break;
// The receiving race is responsible to deliver confirmations back to the source chain. So if
// there's a lot of unconfirmed messages, let's wait until it'll be able to do its job.
let latest_received_nonce_at_target = target_nonces.latest_nonce;
let latest_confirmed_nonce_at_source = source_nonces.confirmed_nonce.expect(CONFIRMED_NONCE_PROOF);
let confirmations_missing = latest_received_nonce_at_target.checked_sub(&latest_confirmed_nonce_at_source);
match confirmations_missing {
Some(confirmations_missing) if confirmations_missing > self.max_unconfirmed_nonces_at_target => {
log::debug!(
target: "bridge",
"Cannot deliver any more messages from {} to {}. Too many unconfirmed nonces \
at target: target.latest_received={:?}, source.latest_confirmed={:?}, max={:?}",
MessageDeliveryRace::<P>::source_name(),
MessageDeliveryRace::<P>::target_name(),
latest_received_nonce_at_target,
latest_confirmed_nonce_at_source,
self.max_unconfirmed_nonces_at_target,
);
return None;
}
// ok, we may deliver this nonce
nonces_end = Some(nonce);
// probably remove it from the queue?
if nonce == first_queued_nonce {
self.source_queue.pop_front();
}
i = i + One::one();
_ => (),
}
nonces_end.map(|nonces_end| RangeInclusive::new(nonces_begin, nonces_end))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message_lane_loop::{
tests::{header_id, TestMessageLane, TestMessageNonce, TestMessagesProof},
ClientState,
};
#[test]
fn strategy_is_empty_works() {
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::new(4);
assert_eq!(strategy.is_empty(), true);
strategy.source_nonce_updated(header_id(1), 1);
assert_eq!(strategy.is_empty(), false);
}
#[test]
fn source_nonce_is_never_lower_than_known_target_nonce() {
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::new(4);
strategy.target_nonce_updated(10, &mut Default::default());
strategy.source_nonce_updated(header_id(1), 5);
assert_eq!(strategy.source_queue, vec![]);
}
#[test]
fn source_nonce_is_never_lower_than_latest_known_source_nonce() {
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::new(4);
strategy.source_nonce_updated(header_id(1), 5);
strategy.source_nonce_updated(header_id(2), 3);
strategy.source_nonce_updated(header_id(2), 5);
assert_eq!(strategy.source_queue, vec![(header_id(1), 5)]);
}
#[test]
fn target_nonce_is_never_lower_than_latest_known_target_nonce() {
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::new(4);
strategy.target_nonce_updated(10, &mut Default::default());
strategy.target_nonce_updated(5, &mut Default::default());
assert_eq!(strategy.target_nonce, 10);
}
#[test]
fn updated_target_nonce_removes_queued_entries() {
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::new(4);
strategy.source_nonce_updated(header_id(1), 5);
strategy.source_nonce_updated(header_id(2), 10);
strategy.source_nonce_updated(header_id(3), 15);
strategy.source_nonce_updated(header_id(4), 20);
strategy.target_nonce_updated(15, &mut Default::default());
assert_eq!(strategy.source_queue, vec![(header_id(4), 20)]);
}
#[test]
fn selected_nonces_are_dropped_on_target_nonce_update() {
let mut state = RaceState::default();
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::new(4);
state.nonces_to_submit = Some((header_id(1), 5..=10, 5..=10));
strategy.target_nonce_updated(7, &mut state);
assert!(state.nonces_to_submit.is_some());
strategy.target_nonce_updated(10, &mut state);
assert!(state.nonces_to_submit.is_none());
}
#[test]
fn submitted_nonces_are_dropped_on_target_nonce_update() {
let mut state = RaceState::default();
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::new(4);
state.nonces_submitted = Some(5..=10);
strategy.target_nonce_updated(7, &mut state);
assert!(state.nonces_submitted.is_some());
strategy.target_nonce_updated(10, &mut state);
assert!(state.nonces_submitted.is_none());
}
#[test]
fn nothing_is_selected_if_something_is_already_selected() {
let mut state = RaceState::default();
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::new(4);
state.nonces_to_submit = Some((header_id(1), 1..=10, 1..=10));
strategy.source_nonce_updated(header_id(1), 10);
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
}
#[test]
fn nothing_is_selected_if_something_is_already_submitted() {
let mut state = RaceState::default();
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::new(4);
state.nonces_submitted = Some(1..=10);
strategy.source_nonce_updated(header_id(1), 10);
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
}
#[test]
fn select_nonces_to_deliver_works() {
let mut state = RaceState::<_, _, TestMessageNonce, TestMessagesProof>::default();
let mut strategy = MessageDeliveryStrategy::<TestMessageLane>::new(4);
strategy.source_nonce_updated(header_id(1), 1);
strategy.source_nonce_updated(header_id(2), 2);
strategy.source_nonce_updated(header_id(3), 6);
strategy.source_nonce_updated(header_id(5), 8);
state.target_state = Some(ClientState {
best_self: header_id(0),
best_peer: header_id(4),
});
assert_eq!(strategy.select_nonces_to_deliver(&state), Some(1..=4));
strategy.target_nonce_updated(4, &mut state);
assert_eq!(strategy.select_nonces_to_deliver(&state), Some(5..=6));
strategy.target_nonce_updated(6, &mut state);
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
state.target_state = Some(ClientState {
best_self: header_id(0),
best_peer: header_id(5),
});
assert_eq!(strategy.select_nonces_to_deliver(&state), Some(7..=8));
strategy.target_nonce_updated(8, &mut state);
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
// If we're here, then the confirmations race did it job && sending side now knows that messages
// have been delivered. Now let's select nonces that we want to deliver.
let selected_nonces = self.strategy.select_nonces_to_deliver(race_state)?.0;
// Ok - we have new nonces to deliver. But target may still reject new messages, because we haven't
// notified it that (some) messages have been confirmed. So we may want to include updated
// `source.latest_confirmed` in the proof.
//
// Important note: we're including outbound state lane proof whenever there are unconfirmed nonces
// on the target chain. Other strategy is to include it only if it's absolutely necessary.
let latest_confirmed_nonce_at_target = target_nonces.confirmed_nonce.expect(CONFIRMED_NONCE_PROOF);
let outbound_state_proof_required = latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source;
// https://github.com/paritytech/parity-bridges-common/issues/432
// https://github.com/paritytech/parity-bridges-common/issues/433
// TODO: number of messages must be no larger than:
// `max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target - latest_confirmed_nonce_at_target)`
Some((selected_nonces, outbound_state_proof_required))
}
}
@@ -20,9 +20,6 @@
//! associated data - like messages, lane state, etc) to the target node by
//! generating and submitting proof.
// Until there'll be actual message-lane in the runtime.
#![allow(dead_code)]
use crate::message_lane_loop::ClientState;
use async_trait::async_trait;
@@ -61,36 +58,49 @@ type SourceClientState<P> = ClientState<<P as MessageRace>::SourceHeaderId, <P a
/// State of race target client.
type TargetClientState<P> = ClientState<<P as MessageRace>::TargetHeaderId, <P as MessageRace>::SourceHeaderId>;
/// Nonces on the race client.
#[derive(Debug, Clone)]
pub struct ClientNonces<MessageNonce> {
/// Latest nonce that is known to the client.
pub latest_nonce: MessageNonce,
/// Latest nonce that is confirmed to the bridged client. This nonce only makes
/// sense in some races. In other races it is `None`.
pub confirmed_nonce: Option<MessageNonce>,
}
/// One of message lane clients, which is source client for the race.
#[async_trait(?Send)]
#[async_trait]
pub trait SourceClient<P: MessageRace> {
/// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError;
/// Additional proof parameters required to generate proof.
type ProofParameters;
/// Return latest nonce that is known to the source client.
async fn latest_nonce(
/// Return nonces that are known to the source client.
async fn nonces(
&self,
at_block: P::SourceHeaderId,
) -> Result<(P::SourceHeaderId, P::MessageNonce), Self::Error>;
) -> Result<(P::SourceHeaderId, ClientNonces<P::MessageNonce>), Self::Error>;
/// Generate proof for delivering to the target client.
async fn generate_proof(
&self,
at_block: P::SourceHeaderId,
nonces: RangeInclusive<P::MessageNonce>,
proof_parameters: Self::ProofParameters,
) -> Result<(P::SourceHeaderId, RangeInclusive<P::MessageNonce>, P::Proof), Self::Error>;
}
/// One of message lane clients, which is target client for the race.
#[async_trait(?Send)]
#[async_trait]
pub trait TargetClient<P: MessageRace> {
/// Type of error this clients returns.
type Error: std::fmt::Debug + MaybeConnectionError;
/// Return latest nonce that is known to the target client.
async fn latest_nonce(
/// Return nonces that are known to the target client.
async fn nonces(
&self,
at_block: P::TargetHeaderId,
) -> Result<(P::TargetHeaderId, P::MessageNonce), Self::Error>;
) -> Result<(P::TargetHeaderId, ClientNonces<P::MessageNonce>), Self::Error>;
/// Submit proof to the target client.
async fn submit_proof(
&self,
@@ -102,22 +112,26 @@ pub trait TargetClient<P: MessageRace> {
/// Race strategy.
pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, MessageNonce, Proof> {
/// Additional proof parameters required to generate proof.
type ProofParameters;
/// Should return true if nothing has to be synced.
fn is_empty(&self) -> bool;
/// Called when latest nonce is updated at source node of the race.
fn source_nonce_updated(&mut self, at_block: SourceHeaderId, nonce: MessageNonce);
/// Called when latest nonce is updated at target node of the race.
fn target_nonce_updated(
/// Called when nonces are updated at source node of the race.
fn source_nonces_updated(&mut self, at_block: SourceHeaderId, nonce: ClientNonces<MessageNonce>);
/// Called when nonces are updated at target node of the race.
fn target_nonces_updated(
&mut self,
nonce: MessageNonce,
nonces: ClientNonces<MessageNonce>,
race_state: &mut RaceState<SourceHeaderId, TargetHeaderId, MessageNonce, Proof>,
);
/// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated
/// data) from source to target node.
/// Additionally, parameters required to generate proof are returned.
fn select_nonces_to_deliver(
&mut self,
race_state: &RaceState<SourceHeaderId, TargetHeaderId, MessageNonce, Proof>,
) -> Option<RangeInclusive<MessageNonce>>;
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)>;
}
/// State of the race.
@@ -133,38 +147,44 @@ pub struct RaceState<SourceHeaderId, TargetHeaderId, MessageNonce, Proof> {
}
/// Run race loop until connection with target or source node is lost.
pub async fn run<P: MessageRace>(
race_source: impl SourceClient<P>,
pub async fn run<P: MessageRace, SC: SourceClient<P>>(
race_source: SC,
race_source_updated: impl FusedStream<Item = SourceClientState<P>>,
race_target: impl TargetClient<P>,
race_target_updated: impl FusedStream<Item = TargetClientState<P>>,
stall_timeout: Duration,
mut strategy: impl RaceStrategy<P::SourceHeaderId, P::TargetHeaderId, P::MessageNonce, P::Proof>,
mut strategy: impl RaceStrategy<
P::SourceHeaderId,
P::TargetHeaderId,
P::MessageNonce,
P::Proof,
ProofParameters = SC::ProofParameters,
>,
) -> Result<(), FailedClient> {
let mut race_state = RaceState::default();
let mut stall_countdown = Instant::now();
let mut source_retry_backoff = retry_backoff();
let mut source_client_is_online = true;
let mut source_latest_nonce_required = false;
let source_latest_nonce = futures::future::Fuse::terminated();
let mut source_nonces_required = false;
let source_nonces = futures::future::Fuse::terminated();
let source_generate_proof = futures::future::Fuse::terminated();
let source_go_offline_future = futures::future::Fuse::terminated();
let mut target_retry_backoff = retry_backoff();
let mut target_client_is_online = true;
let mut target_latest_nonce_required = false;
let target_latest_nonce = futures::future::Fuse::terminated();
let mut target_nonces_required = false;
let target_nonces = futures::future::Fuse::terminated();
let target_submit_proof = futures::future::Fuse::terminated();
let target_go_offline_future = futures::future::Fuse::terminated();
futures::pin_mut!(
race_source_updated,
source_latest_nonce,
source_nonces,
source_generate_proof,
source_go_offline_future,
race_target_updated,
target_latest_nonce,
target_nonces,
target_submit_proof,
target_go_offline_future,
);
@@ -175,7 +195,7 @@ pub async fn run<P: MessageRace>(
source_state = race_source_updated.next() => {
if let Some(source_state) = source_state {
if race_state.source_state.as_ref() != Some(&source_state) {
source_latest_nonce_required = true;
source_nonces_required = true;
race_state.source_state = Some(source_state);
}
}
@@ -183,53 +203,53 @@ pub async fn run<P: MessageRace>(
target_state = race_target_updated.next() => {
if let Some(target_state) = target_state {
if race_state.target_state.as_ref() != Some(&target_state) {
target_latest_nonce_required = true;
target_nonces_required = true;
race_state.target_state = Some(target_state);
}
}
},
// when nonces are updated
latest_nonce = source_latest_nonce => {
source_latest_nonce_required = false;
nonces = source_nonces => {
source_nonces_required = false;
source_client_is_online = process_future_result(
latest_nonce,
nonces,
&mut source_retry_backoff,
|(at_block, latest_nonce)| {
|(at_block, nonces)| {
log::debug!(
target: "bridge",
"Received latest nonce from {}: {:?}",
"Received nonces from {}: {:?}",
P::source_name(),
latest_nonce,
nonces,
);
strategy.source_nonce_updated(at_block, latest_nonce);
strategy.source_nonces_updated(at_block, nonces);
},
&mut source_go_offline_future,
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving latest nonce from {}", P::source_name()),
|| format!("Error retrieving nonces from {}", P::source_name()),
).fail_if_connection_error(FailedClient::Source)?;
},
latest_nonce = target_latest_nonce => {
target_latest_nonce_required = false;
nonces = target_nonces => {
target_nonces_required = false;
target_client_is_online = process_future_result(
latest_nonce,
nonces,
&mut target_retry_backoff,
|(_, latest_nonce)| {
|(_, nonces)| {
log::debug!(
target: "bridge",
"Received latest nonce from {}: {:?}",
"Received nonces from {}: {:?}",
P::target_name(),
latest_nonce,
nonces,
);
strategy.target_nonce_updated(latest_nonce, &mut race_state);
strategy.target_nonces_updated(nonces, &mut race_state);
},
&mut target_go_offline_future,
|delay| async_std::task::sleep(delay),
|| format!("Error retrieving latest nonce from {}", P::target_name()),
|| format!("Error retrieving nonces from {}", P::target_name()),
).fail_if_connection_error(FailedClient::Target)?;
},
@@ -288,26 +308,32 @@ pub async fn run<P: MessageRace>(
let nonces_to_deliver = race_state.source_state.as_ref().and_then(|source_state| {
strategy
.select_nonces_to_deliver(&race_state)
.map(|nonces_range| (source_state.best_self.clone(), nonces_range))
.map(|(nonces_range, proof_parameters)| {
(source_state.best_self.clone(), nonces_range, proof_parameters)
})
});
if let Some((at_block, nonces_range)) = nonces_to_deliver {
if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver {
log::debug!(
target: "bridge",
"Asking {} to prove nonces in range {:?}",
P::source_name(),
nonces_range,
);
source_generate_proof.set(race_source.generate_proof(at_block, nonces_range).fuse());
} else if source_latest_nonce_required {
log::debug!(target: "bridge", "Asking {} about latest generated message nonce", P::source_name());
source_generate_proof.set(
race_source
.generate_proof(at_block, nonces_range, proof_parameters)
.fuse(),
);
} else if source_nonces_required {
log::debug!(target: "bridge", "Asking {} about message nonces", P::source_name());
let at_block = race_state
.source_state
.as_ref()
.expect("source_latest_nonce_required is only true when source_state is Some; qed")
.expect("source_nonces_required is only true when source_state is Some; qed")
.best_self
.clone();
source_latest_nonce.set(race_source.latest_nonce(at_block).fuse());
source_nonces.set(race_source.nonces(at_block).fuse());
} else {
source_client_is_online = true;
}
@@ -329,15 +355,15 @@ pub async fn run<P: MessageRace>(
.fuse(),
);
}
if target_latest_nonce_required {
log::debug!(target: "bridge", "Asking {} about latest nonce", P::target_name());
if target_nonces_required {
log::debug!(target: "bridge", "Asking {} about message nonces", P::target_name());
let at_block = race_state
.target_state
.as_ref()
.expect("target_latest_nonce_required is only true when target_state is Some; qed")
.expect("target_nonces_required is only true when target_state is Some; qed")
.best_self
.clone();
target_latest_nonce.set(race_target.latest_nonce(at_block).fuse());
target_nonces.set(race_target.nonces(at_block).fuse());
} else {
target_client_is_online = true;
}
@@ -18,8 +18,8 @@ use crate::message_lane_loop::{
SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient,
TargetClientState,
};
use crate::message_race_delivery::DeliveryStrategy;
use crate::message_race_loop::{MessageRace, SourceClient, TargetClient};
use crate::message_race_loop::{ClientNonces, MessageRace, SourceClient, TargetClient};
use crate::message_race_strategy::BasicStrategy;
use crate::metrics::MessageLaneLoopMetrics;
use async_trait::async_trait;
@@ -28,7 +28,7 @@ use relay_utils::FailedClient;
use std::{marker::PhantomData, ops::RangeInclusive, time::Duration};
/// Message receiving confirmations delivery strategy.
type ReceivingConfirmationsDeliveryStrategy<P> = DeliveryStrategy<
type ReceivingConfirmationsBasicStrategy<P> = BasicStrategy<
<P as MessageLane>::TargetHeaderNumber,
<P as MessageLane>::TargetHeaderHash,
<P as MessageLane>::SourceHeaderNumber,
@@ -60,7 +60,7 @@ pub async fn run<P: MessageLane>(
},
source_state_updates,
stall_timeout,
ReceivingConfirmationsDeliveryStrategy::<P>::new(std::u32::MAX.into()),
ReceivingConfirmationsBasicStrategy::<P>::new(std::u32::MAX.into()),
)
.await
}
@@ -91,31 +91,38 @@ struct ReceivingConfirmationsRaceSource<P: MessageLane, C> {
_phantom: PhantomData<P>,
}
#[async_trait(?Send)]
#[async_trait]
impl<P, C> SourceClient<ReceivingConfirmationsRace<P>> for ReceivingConfirmationsRaceSource<P, C>
where
P: MessageLane,
C: MessageLaneTargetClient<P>,
{
type Error = C::Error;
type ProofParameters = ();
async fn latest_nonce(
async fn nonces(
&self,
at_block: TargetHeaderIdOf<P>,
) -> Result<(TargetHeaderIdOf<P>, P::MessageNonce), Self::Error> {
let result = self.client.latest_received_nonce(at_block).await;
) -> Result<(TargetHeaderIdOf<P>, ClientNonces<P::MessageNonce>), Self::Error> {
let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?;
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
if let Ok((_, target_latest_received_nonce)) = result.as_ref() {
metrics_msg.update_target_latest_received_nonce::<P>(*target_latest_received_nonce);
}
metrics_msg.update_target_latest_received_nonce::<P>(latest_received_nonce);
}
result
Ok((
at_block,
ClientNonces {
latest_nonce: latest_received_nonce,
confirmed_nonce: None,
},
))
}
#[allow(clippy::unit_arg)]
async fn generate_proof(
&self,
at_block: TargetHeaderIdOf<P>,
nonces: RangeInclusive<P::MessageNonce>,
_proof_parameters: Self::ProofParameters,
) -> Result<
(
TargetHeaderIdOf<P>,
@@ -138,7 +145,7 @@ struct ReceivingConfirmationsRaceTarget<P: MessageLane, C> {
_phantom: PhantomData<P>,
}
#[async_trait(?Send)]
#[async_trait]
impl<P, C> TargetClient<ReceivingConfirmationsRace<P>> for ReceivingConfirmationsRaceTarget<P, C>
where
P: MessageLane,
@@ -146,27 +153,32 @@ where
{
type Error = C::Error;
async fn latest_nonce(
async fn nonces(
&self,
at_block: SourceHeaderIdOf<P>,
) -> Result<(SourceHeaderIdOf<P>, P::MessageNonce), Self::Error> {
let result = self.client.latest_confirmed_received_nonce(at_block).await;
) -> Result<(SourceHeaderIdOf<P>, ClientNonces<P::MessageNonce>), Self::Error> {
let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?;
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
if let Ok((_, source_latest_confirmed_nonce)) = result.as_ref() {
metrics_msg.update_source_latest_confirmed_nonce::<P>(*source_latest_confirmed_nonce);
}
metrics_msg.update_source_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
}
result
Ok((
at_block,
ClientNonces {
latest_nonce: latest_confirmed_nonce,
confirmed_nonce: None,
},
))
}
async fn submit_proof(
&self,
generated_at_block: TargetHeaderIdOf<P>,
_nonces: RangeInclusive<P::MessageNonce>,
nonces: RangeInclusive<P::MessageNonce>,
proof: P::MessagesReceivingProof,
) -> Result<RangeInclusive<P::MessageNonce>, Self::Error> {
self.client
.submit_messages_receiving_proof(generated_at_block, proof)
.await
.await?;
Ok(nonces)
}
}
@@ -0,0 +1,334 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//! Basic delivery strategy. The strategy selects nonces if:
//!
//! 1) there are more nonces on the source side than on the target side;
//! 2) new nonces may be proved to target node (i.e. they have appeared at the
//! block, which is known to the target node).
use crate::message_race_loop::{ClientNonces, RaceState, RaceStrategy};
use num_traits::{One, Zero};
use relay_utils::HeaderId;
use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive};
/// Nonces delivery strategy.
#[derive(Debug)]
pub struct BasicStrategy<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, Nonce, Proof> {
/// All queued nonces.
source_queue: VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, Nonce)>,
/// Best nonce known to target node.
target_nonce: Nonce,
/// Max nonces to relay in single transaction.
max_nonces_to_relay_in_single_tx: Nonce,
/// Unused generic types dump.
_phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>,
}
impl<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, Nonce: Default, Proof>
BasicStrategy<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, Nonce, Proof>
{
/// Create new delivery strategy.
pub fn new(max_nonces_to_relay_in_single_tx: Nonce) -> Self {
BasicStrategy {
source_queue: VecDeque::new(),
target_nonce: Default::default(),
max_nonces_to_relay_in_single_tx,
_phantom: Default::default(),
}
}
}
impl<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, Nonce, Proof>
RaceStrategy<
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
HeaderId<TargetHeaderHash, TargetHeaderNumber>,
Nonce,
Proof,
> for BasicStrategy<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, Nonce, Proof>
where
SourceHeaderHash: Clone,
SourceHeaderNumber: Clone + Ord,
Nonce: Clone + Copy + From<u32> + Ord + std::ops::Add<Output = Nonce> + One + Zero,
{
type ProofParameters = ();
fn is_empty(&self) -> bool {
self.source_queue.is_empty()
}
fn source_nonces_updated(
&mut self,
at_block: HeaderId<SourceHeaderHash, SourceHeaderNumber>,
nonces: ClientNonces<Nonce>,
) {
let nonce = nonces.latest_nonce;
if nonce <= self.target_nonce {
return;
}
match self.source_queue.back() {
Some((_, prev_nonce)) if *prev_nonce < nonce => (),
Some(_) => return,
None => (),
}
self.source_queue.push_back((at_block, nonce))
}
fn target_nonces_updated(
&mut self,
nonces: ClientNonces<Nonce>,
race_state: &mut RaceState<
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
HeaderId<TargetHeaderHash, TargetHeaderNumber>,
Nonce,
Proof,
>,
) {
let nonce = nonces.latest_nonce;
if nonce < self.target_nonce {
return;
}
while let Some(true) = self
.source_queue
.front()
.map(|(_, source_nonce)| *source_nonce <= nonce)
{
self.source_queue.pop_front();
}
let need_to_select_new_nonces = race_state
.nonces_to_submit
.as_ref()
.map(|(_, nonces, _)| *nonces.end() <= nonce)
.unwrap_or(false);
if need_to_select_new_nonces {
race_state.nonces_to_submit = None;
}
let need_new_nonces_to_submit = race_state
.nonces_submitted
.as_ref()
.map(|nonces| *nonces.end() <= nonce)
.unwrap_or(false);
if need_new_nonces_to_submit {
race_state.nonces_submitted = None;
}
self.target_nonce = nonce;
}
fn select_nonces_to_deliver(
&mut self,
race_state: &RaceState<
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
HeaderId<TargetHeaderHash, TargetHeaderNumber>,
Nonce,
Proof,
>,
) -> Option<(RangeInclusive<Nonce>, Self::ProofParameters)> {
// if we have already selected nonces that we want to submit, do nothing
if race_state.nonces_to_submit.is_some() {
return None;
}
// if we already submitted some nonces, do nothing
if race_state.nonces_submitted.is_some() {
return None;
}
// 1) we want to deliver all nonces, starting from `target_nonce + 1`
// 2) we want to deliver at most `self.max_nonces_to_relay_in_single_tx` nonces in this batch
// 3) we can't deliver new nonce until header, that has emitted this nonce, is finalized
// by target client
let nonces_begin = self.target_nonce + 1.into();
let best_header_at_target = &race_state.target_state.as_ref()?.best_peer;
let mut nonces_end = None;
let mut i = Zero::zero();
// https://github.com/paritytech/parity-bridges-common/issues/433
// TODO: instead of limiting number of messages by number, provide custom limit callback here.
// In delivery race it'll be weight-based callback. In receiving race it'll be unlimited callback.
while i < self.max_nonces_to_relay_in_single_tx {
let nonce = nonces_begin + i;
// if queue is empty, we don't need to prove anything
let (first_queued_at, first_queued_nonce) = match self.source_queue.front() {
Some((first_queued_at, first_queued_nonce)) => ((*first_queued_at).clone(), *first_queued_nonce),
None => break,
};
// if header that has queued the message is not yet finalized at bridged chain,
// we can't prove anything
if first_queued_at.0 > best_header_at_target.0 {
break;
}
// ok, we may deliver this nonce
nonces_end = Some(nonce);
// probably remove it from the queue?
if nonce == first_queued_nonce {
self.source_queue.pop_front();
}
i = i + One::one();
}
nonces_end.map(|nonces_end| (RangeInclusive::new(nonces_begin, nonces_end), ()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message_lane::MessageLane;
use crate::message_lane_loop::{
tests::{header_id, TestMessageLane, TestMessageNonce, TestMessagesProof},
ClientState,
};
type BasicStrategy<P> = super::BasicStrategy<
<P as MessageLane>::SourceHeaderNumber,
<P as MessageLane>::SourceHeaderHash,
<P as MessageLane>::TargetHeaderNumber,
<P as MessageLane>::TargetHeaderHash,
<P as MessageLane>::MessageNonce,
<P as MessageLane>::MessagesProof,
>;
fn nonces(latest_nonce: TestMessageNonce) -> ClientNonces<TestMessageNonce> {
ClientNonces {
latest_nonce,
confirmed_nonce: None,
}
}
#[test]
fn strategy_is_empty_works() {
let mut strategy = BasicStrategy::<TestMessageLane>::new(4);
assert_eq!(strategy.is_empty(), true);
strategy.source_nonces_updated(header_id(1), nonces(1));
assert_eq!(strategy.is_empty(), false);
}
#[test]
fn source_nonce_is_never_lower_than_known_target_nonce() {
let mut strategy = BasicStrategy::<TestMessageLane>::new(4);
strategy.target_nonces_updated(nonces(10), &mut Default::default());
strategy.source_nonces_updated(header_id(1), nonces(5));
assert_eq!(strategy.source_queue, vec![]);
}
#[test]
fn source_nonce_is_never_lower_than_latest_known_source_nonce() {
let mut strategy = BasicStrategy::<TestMessageLane>::new(4);
strategy.source_nonces_updated(header_id(1), nonces(5));
strategy.source_nonces_updated(header_id(2), nonces(3));
strategy.source_nonces_updated(header_id(2), nonces(5));
assert_eq!(strategy.source_queue, vec![(header_id(1), 5)]);
}
#[test]
fn target_nonce_is_never_lower_than_latest_known_target_nonce() {
let mut strategy = BasicStrategy::<TestMessageLane>::new(4);
strategy.target_nonces_updated(nonces(10), &mut Default::default());
strategy.target_nonces_updated(nonces(5), &mut Default::default());
assert_eq!(strategy.target_nonce, 10);
}
#[test]
fn updated_target_nonce_removes_queued_entries() {
let mut strategy = BasicStrategy::<TestMessageLane>::new(4);
strategy.source_nonces_updated(header_id(1), nonces(5));
strategy.source_nonces_updated(header_id(2), nonces(10));
strategy.source_nonces_updated(header_id(3), nonces(15));
strategy.source_nonces_updated(header_id(4), nonces(20));
strategy.target_nonces_updated(nonces(15), &mut Default::default());
assert_eq!(strategy.source_queue, vec![(header_id(4), 20)]);
}
#[test]
fn selected_nonces_are_dropped_on_target_nonce_update() {
let mut state = RaceState::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new(4);
state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None)));
strategy.target_nonces_updated(nonces(7), &mut state);
assert!(state.nonces_to_submit.is_some());
strategy.target_nonces_updated(nonces(10), &mut state);
assert!(state.nonces_to_submit.is_none());
}
#[test]
fn submitted_nonces_are_dropped_on_target_nonce_update() {
let mut state = RaceState::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new(4);
state.nonces_submitted = Some(5..=10);
strategy.target_nonces_updated(nonces(7), &mut state);
assert!(state.nonces_submitted.is_some());
strategy.target_nonces_updated(nonces(10), &mut state);
assert!(state.nonces_submitted.is_none());
}
#[test]
fn nothing_is_selected_if_something_is_already_selected() {
let mut state = RaceState::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new(4);
state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None)));
strategy.source_nonces_updated(header_id(1), nonces(10));
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
}
#[test]
fn nothing_is_selected_if_something_is_already_submitted() {
let mut state = RaceState::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new(4);
state.nonces_submitted = Some(1..=10);
strategy.source_nonces_updated(header_id(1), nonces(10));
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
}
#[test]
fn select_nonces_to_deliver_works() {
let mut state = RaceState::<_, _, TestMessageNonce, TestMessagesProof>::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new(4);
strategy.source_nonces_updated(header_id(1), nonces(1));
strategy.source_nonces_updated(header_id(2), nonces(2));
strategy.source_nonces_updated(header_id(3), nonces(6));
strategy.source_nonces_updated(header_id(5), nonces(8));
state.target_state = Some(ClientState {
best_self: header_id(0),
best_peer: header_id(4),
});
assert_eq!(strategy.select_nonces_to_deliver(&state), Some((1..=4, ())));
strategy.target_nonces_updated(nonces(4), &mut state);
assert_eq!(strategy.select_nonces_to_deliver(&state), Some((5..=6, ())));
strategy.target_nonces_updated(nonces(6), &mut state);
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
state.target_state = Some(ClientState {
best_self: header_id(0),
best_peer: header_id(5),
});
assert_eq!(strategy.select_nonces_to_deliver(&state), Some((7..=8, ())));
strategy.target_nonces_updated(nonces(8), &mut state);
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
}
}