Custom relay strategy (#1198)

* Add relayer strategy

* Add default relayer strategy

* default relayer strategy

* expose relayer strategy

* fix compile

* fix compile

* docs

* Rename Relayer to Relay, keep RelayerDecide

* split `DefaultRelayerStrategy` into `AltruisticRelayerStrategy` and `RationalRelayerStrategy`

* Remove relayer mode

* Remove unused import

* Rename `RelayerStrategy` to `RelayStrategy`

* Add missing docs

* clippy

* clippy

* clippy

* clippy

* Revert `relayer_mode` and add `MixStrategy`

* Add `EnforcementStrategy`

* fix bug and simplify relay strategy

* Update message_lane_loop.rs

* Update messages_target.rs

* clippy

* clippy

* clippy

* clippy

* clippy

* clippy

* clippy

* fix test

* fix test

* test

test

test

fix test
This commit is contained in:
fewensa
2021-11-09 17:01:06 +08:00
committed by Bastian Köcher
parent c2b38ba530
commit 19201175e6
17 changed files with 714 additions and 344 deletions
@@ -24,17 +24,13 @@
//! finalized header. I.e. when talking about headers in lane context, we
//! only care about finalized headers.
use crate::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_race_delivery::run as run_message_delivery_race,
message_race_receiving::run as run_message_receiving_race,
metrics::MessageLaneLoopMetrics,
};
use std::{collections::BTreeMap, fmt::Debug, future::Future, ops::RangeInclusive, time::Duration};
use async_trait::async_trait;
use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt};
use bp_messages::{LaneId, MessageNonce, UnrewardedRelayersState, Weight};
use bp_runtime::messages::DispatchFeePayment;
use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt};
use relay_utils::{
interval,
metrics::{GlobalMetrics, MetricsParams},
@@ -42,11 +38,18 @@ use relay_utils::{
relay_loop::Client as RelayClient,
retry_backoff, FailedClient,
};
use std::{collections::BTreeMap, fmt::Debug, future::Future, ops::RangeInclusive, time::Duration};
use crate::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_race_delivery::run as run_message_delivery_race,
message_race_receiving::run as run_message_receiving_race,
metrics::MessageLaneLoopMetrics,
relay_strategy::RelayStrategy,
};
/// Message lane loop configuration params.
#[derive(Debug, Clone)]
pub struct Params {
pub struct Params<Strategy: RelayStrategy> {
/// Id of lane this loop is servicing.
pub lane: LaneId,
/// Interval at which we ask target node about its updates.
@@ -58,7 +61,7 @@ pub struct Params {
/// The loop will auto-restart if there has been no updates during this period.
pub stall_timeout: Duration,
/// Message delivery race parameters.
pub delivery_params: MessageDeliveryParams,
pub delivery_params: MessageDeliveryParams<Strategy>,
}
/// Relayer operating mode.
@@ -73,7 +76,7 @@ pub enum RelayerMode {
/// Message delivery race parameters.
#[derive(Debug, Clone)]
pub struct MessageDeliveryParams {
pub struct MessageDeliveryParams<Strategy: RelayStrategy> {
/// Maximal number of unconfirmed relayer entries at the inbound lane. If there's that number
/// of entries in the `InboundLaneData::relayers` set, all new messages will be rejected until
/// reward payment will be proved (by including outbound lane state to the message delivery
@@ -89,8 +92,8 @@ pub struct MessageDeliveryParams {
pub max_messages_weight_in_single_batch: Weight,
/// Maximal cumulative size of relayed messages in single delivery transaction.
pub max_messages_size_in_single_batch: u32,
/// Relayer operating mode.
pub relayer_mode: RelayerMode,
/// Relay strategy
pub relay_strategy: Strategy,
}
/// Message details.
@@ -257,8 +260,8 @@ pub fn metrics_prefix<P: MessageLane>(lane: &LaneId) -> String {
}
/// Run message lane service loop.
pub async fn run<P: MessageLane>(
params: Params,
pub async fn run<P: MessageLane, Strategy: RelayStrategy>(
params: Params<Strategy>,
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
metrics_params: MetricsParams,
@@ -286,8 +289,13 @@ pub async fn run<P: MessageLane>(
/// Run one-way message delivery loop until connection with target or source node is lost, or exit
/// signal is received.
async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: TargetClient<P>>(
params: Params,
async fn run_until_connection_lost<
P: MessageLane,
Strategy: RelayStrategy,
SC: SourceClient<P>,
TC: TargetClient<P>,
>(
params: Params<Strategy>,
source_client: SC,
target_client: TC,
metrics_msg: Option<MessageLaneLoopMetrics>,
@@ -449,11 +457,16 @@ async fn run_until_connection_lost<P: MessageLane, SC: SourceClient<P>, TC: Targ
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use std::sync::Arc;
use futures::stream::StreamExt;
use parking_lot::Mutex;
use relay_utils::{HeaderId, MaybeConnectionError};
use std::sync::Arc;
use crate::relay_strategy::AltruisticStrategy;
use super::*;
pub fn header_id(number: TestSourceHeaderNumber) -> TestSourceHeaderId {
HeaderId(number, number)
@@ -807,7 +820,7 @@ pub(crate) mod tests {
max_messages_in_single_batch: 4,
max_messages_weight_in_single_batch: 4,
max_messages_size_in_single_batch: 4,
relayer_mode: RelayerMode::Altruistic,
relay_strategy: AltruisticStrategy,
},
},
source_client,