Bridges subtree sync (#3022)

* Squashed 'bridges/' changes from edf33a2c85..277f0d5496

277f0d5496 Dynamic fees for bridges-v1 (#2294)
b1c51f7dd2 Finality loop refactoring (#2357)
620db2b10f Add equivocation detector crate and implement clients (#2348) (#2353)
3fe4b13eb4 Add basic equivocation detection pipeline schema (#2338) (#2341)

git-subtree-dir: bridges
git-subtree-split: 277f0d54961c800b231d8123c6445f378b1deb89

* [dynfees] Rococo/Wococo does not need congestion and dynamic fees (for now)

* Fix

* ".git/.scripts/commands/fmt/fmt.sh"

* Forgotten bridges/Cargo.lock

---------

Co-authored-by: command-bot <>
This commit is contained in:
Branislav Kontur
2023-08-17 09:15:24 +02:00
committed by GitHub
parent 840ca86741
commit 061eee1382
40 changed files with 2177 additions and 250 deletions
@@ -17,7 +17,7 @@
use crate::messages::{
source::FromBridgedChainMessagesDeliveryProof, target::FromBridgedChainMessagesProof,
};
use bp_messages::{InboundLaneData, LaneId, MessageNonce};
use bp_messages::{target_chain::MessageDispatch, InboundLaneData, LaneId, MessageNonce};
use frame_support::{
dispatch::CallableCallFor,
traits::{Get, IsSubType},
@@ -77,7 +77,12 @@ impl ReceiveMessagesProofInfo {
///
/// - or there are no bundled messages, but the inbound lane is blocked by too many unconfirmed
/// messages and/or unrewarded relayers.
fn is_obsolete(&self) -> bool {
fn is_obsolete(&self, is_dispatcher_active: bool) -> bool {
// if dispatcher is inactive, we don't accept any delivery transactions
if !is_dispatcher_active {
return true
}
// transactions with zero bundled nonces are not allowed, unless they're message
// delivery transactions, which brings reward confirmations required to unblock
// the lane
@@ -275,7 +280,9 @@ impl<
fn check_obsolete_call(&self) -> TransactionValidity {
match self.call_info() {
Some(CallInfo::ReceiveMessagesProof(proof_info)) if proof_info.is_obsolete() => {
Some(CallInfo::ReceiveMessagesProof(proof_info))
if proof_info.is_obsolete(T::MessageDispatch::is_active()) =>
{
log::trace!(
target: pallet_bridge_messages::LOG_TARGET,
"Rejecting obsolete messages delivery transaction: {:?}",
@@ -327,8 +334,8 @@ mod tests {
},
messages_call_ext::MessagesCallSubType,
mock::{
MaxUnconfirmedMessagesAtInboundLane, MaxUnrewardedRelayerEntriesAtInboundLane,
TestRuntime, ThisChainRuntimeCall,
DummyMessageDispatch, MaxUnconfirmedMessagesAtInboundLane,
MaxUnrewardedRelayerEntriesAtInboundLane, TestRuntime, ThisChainRuntimeCall,
},
};
use bp_messages::{DeliveredMessages, UnrewardedRelayer, UnrewardedRelayersState};
@@ -435,6 +442,18 @@ mod tests {
});
}
#[test]
fn extension_reject_call_when_dispatcher_is_inactive() {
sp_io::TestExternalities::new(Default::default()).execute_with(|| {
// when current best delivered is message#10 and we're trying to deliver message 11..=15
// => tx is accepted, but we have inactive dispatcher, so...
deliver_message_10();
DummyMessageDispatch::deactivate();
assert!(!validate_message_delivery(11, 15));
});
}
#[test]
fn extension_rejects_empty_delivery_with_rewards_confirmations_if_there_are_free_relayer_and_message_slots(
) {
@@ -22,16 +22,22 @@
//! `XcmRouter` <- `MessageDispatch` <- `InboundMessageQueue`
use bp_messages::{
source_chain::MessagesBridge,
source_chain::{MessagesBridge, OnMessagesDelivered},
target_chain::{DispatchMessage, MessageDispatch},
LaneId,
LaneId, MessageNonce,
};
use bp_runtime::messages::MessageDispatchResult;
use bp_xcm_bridge_hub_router::XcmChannelStatusProvider;
use codec::{Decode, Encode};
use frame_support::{dispatch::Weight, CloneNoBound, EqNoBound, PartialEqNoBound};
use pallet_bridge_messages::WeightInfoExt as MessagesPalletWeights;
use frame_support::{dispatch::Weight, traits::Get, CloneNoBound, EqNoBound, PartialEqNoBound};
use pallet_bridge_messages::{
Config as MessagesConfig, OutboundLanesCongestedSignals, Pallet as MessagesPallet,
WeightInfoExt as MessagesPalletWeights,
};
use scale_info::TypeInfo;
use sp_runtime::SaturatedConversion;
use sp_std::{fmt::Debug, marker::PhantomData};
use xcm::prelude::*;
use xcm_builder::{DispatchBlob, DispatchBlobError, HaulBlob, HaulBlobError};
/// Plain "XCM" payload, which we transfer through bridge
@@ -46,16 +52,25 @@ pub enum XcmBlobMessageDispatchResult {
}
/// [`XcmBlobMessageDispatch`] is responsible for dispatching received messages
pub struct XcmBlobMessageDispatch<DispatchBlob, Weights> {
_marker: sp_std::marker::PhantomData<(DispatchBlob, Weights)>,
///
/// It needs to be used at the target bridge hub.
pub struct XcmBlobMessageDispatch<DispatchBlob, Weights, Channel> {
_marker: sp_std::marker::PhantomData<(DispatchBlob, Weights, Channel)>,
}
impl<BlobDispatcher: DispatchBlob, Weights: MessagesPalletWeights> MessageDispatch
for XcmBlobMessageDispatch<BlobDispatcher, Weights>
impl<
BlobDispatcher: DispatchBlob,
Weights: MessagesPalletWeights,
Channel: XcmChannelStatusProvider,
> MessageDispatch for XcmBlobMessageDispatch<BlobDispatcher, Weights, Channel>
{
type DispatchPayload = XcmAsPlainPayload;
type DispatchLevelResult = XcmBlobMessageDispatchResult;
fn is_active() -> bool {
!Channel::is_congested()
}
fn dispatch_weight(message: &mut DispatchMessage<Self::DispatchPayload>) -> Weight {
match message.data.payload {
Ok(ref payload) => {
@@ -106,40 +121,374 @@ impl<BlobDispatcher: DispatchBlob, Weights: MessagesPalletWeights> MessageDispat
}
}
/// A pair of sending chain location and message lane, used by this chain to send messages
/// over the bridge.
pub struct SenderAndLane {
/// Sending chain relative location.
pub location: MultiLocation,
/// Message lane, used by the sending chain.
pub lane: LaneId,
}
impl SenderAndLane {
/// Create new object using provided location and lane.
pub fn new(location: MultiLocation, lane: LaneId) -> Self {
SenderAndLane { location, lane }
}
}
/// [`XcmBlobHauler`] is responsible for sending messages to the bridge "point-to-point link" from
/// one side, where on the other it can be dispatched by [`XcmBlobMessageDispatch`].
pub trait XcmBlobHauler {
/// Runtime message sender adapter.
type MessageSender: MessagesBridge<XcmAsPlainPayload>;
/// Runtime that has messages pallet deployed.
type Runtime: MessagesConfig<Self::MessagesInstance>;
/// Instance of the messages pallet that is used to send messages.
type MessagesInstance: 'static;
/// Returns lane used by this hauler.
type SenderAndLane: Get<SenderAndLane>;
/// Return message lane (as "point-to-point link") used to deliver XCM messages.
fn xcm_lane() -> LaneId;
/// Actual XCM message sender (`HRMP` or `UMP`) to the source chain
/// location (`Self::SenderAndLane::get().location`).
type ToSourceChainSender: SendXcm;
/// An XCM message that is sent to the sending chain when the bridge queue becomes congested.
type CongestedMessage: Get<Option<Xcm<()>>>;
/// An XCM message that is sent to the sending chain when the bridge queue becomes not
/// congested.
type UncongestedMessage: Get<Option<Xcm<()>>>;
/// Returns `true` if we want to handle congestion.
fn supports_congestion_detection() -> bool {
Self::CongestedMessage::get().is_some() || Self::UncongestedMessage::get().is_some()
}
}
/// XCM bridge adapter which connects [`XcmBlobHauler`] with [`XcmBlobHauler::MessageSender`] and
/// makes sure that XCM blob is sent to the [`pallet_bridge_messages`] queue to be relayed.
/// XCM bridge adapter which connects [`XcmBlobHauler`] with [`pallet_bridge_messages`] and
/// makes sure that XCM blob is sent to the outbound lane to be relayed.
///
/// It needs to be used at the source bridge hub.
pub struct XcmBlobHaulerAdapter<XcmBlobHauler>(sp_std::marker::PhantomData<XcmBlobHauler>);
impl<H: XcmBlobHauler> HaulBlob for XcmBlobHaulerAdapter<H> {
impl<H: XcmBlobHauler> HaulBlob for XcmBlobHaulerAdapter<H>
where
H::Runtime: MessagesConfig<H::MessagesInstance, OutboundPayload = XcmAsPlainPayload>,
{
fn haul_blob(blob: sp_std::prelude::Vec<u8>) -> Result<(), HaulBlobError> {
let lane = H::xcm_lane();
H::MessageSender::send_message(lane, blob)
.map(|artifacts| (lane, artifacts.nonce).using_encoded(sp_io::hashing::blake2_256))
.map(|result| {
let sender_and_lane = H::SenderAndLane::get();
MessagesPallet::<H::Runtime, H::MessagesInstance>::send_message(sender_and_lane.lane, blob)
.map(|artifacts| {
log::info!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"haul_blob result - ok: {:?} on lane: {:?}",
result,
lane
)
"haul_blob result - ok: {:?} on lane: {:?}. Enqueued messages: {}",
artifacts.nonce,
sender_and_lane.lane,
artifacts.enqueued_messages,
);
// notify XCM queue manager about updated lane state
LocalXcmQueueManager::<H>::on_bridge_message_enqueued(
&sender_and_lane,
artifacts.enqueued_messages,
);
})
.map_err(|error| {
log::error!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"haul_blob result - error: {:?} on lane: {:?}",
error,
lane
sender_and_lane.lane,
);
HaulBlobError::Transport("MessageSenderError")
})
}
}
impl<H: XcmBlobHauler> OnMessagesDelivered for XcmBlobHaulerAdapter<H> {
fn on_messages_delivered(lane: LaneId, enqueued_messages: MessageNonce) {
let sender_and_lane = H::SenderAndLane::get();
if sender_and_lane.lane != lane {
return
}
// notify XCM queue manager about updated lane state
LocalXcmQueueManager::<H>::on_bridge_messages_delivered(
&sender_and_lane,
enqueued_messages,
);
}
}
/// Manager of local XCM queues (and indirectly - underlying transport channels) that
/// controls the queue state.
///
/// It needs to be used at the source bridge hub.
pub struct LocalXcmQueueManager<H>(PhantomData<H>);
/// Maximal number of messages in the outbound bridge queue. Once we reach this limit, we
/// send a "congestion" XCM message to the sending chain.
const OUTBOUND_LANE_CONGESTED_THRESHOLD: MessageNonce = 8_192;
/// After we have sent "congestion" XCM message to the sending chain, we wait until number
/// of messages in the outbound bridge queue drops to this count, before sending `uncongestion`
/// XCM message.
const OUTBOUND_LANE_UNCONGESTED_THRESHOLD: MessageNonce = 1_024;
impl<H: XcmBlobHauler> LocalXcmQueueManager<H> {
/// Must be called whenever we push a message to the bridge lane.
pub fn on_bridge_message_enqueued(
sender_and_lane: &SenderAndLane,
enqueued_messages: MessageNonce,
) {
// skip if we dont want to handle congestion
if !H::supports_congestion_detection() {
return
}
// if we have already sent the congestion signal, we don't want to do anything
if Self::is_congested_signal_sent(sender_and_lane.lane) {
return
}
// if the bridge queue is not congested, we don't want to do anything
let is_congested = enqueued_messages > OUTBOUND_LANE_CONGESTED_THRESHOLD;
if !is_congested {
return
}
log::info!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"Sending 'congested' XCM message to {:?} to avoid overloading lane {:?}: there are\
{} messages queued at the bridge queue",
sender_and_lane.location,
sender_and_lane.lane,
enqueued_messages,
);
if let Err(e) = Self::send_congested_signal(sender_and_lane) {
log::info!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"Failed to send the 'congested' XCM message to {:?}: {:?}",
sender_and_lane.location,
e,
);
}
}
/// Must be called whenever we receive a message delivery confirmation.
pub fn on_bridge_messages_delivered(
sender_and_lane: &SenderAndLane,
enqueued_messages: MessageNonce,
) {
// skip if we dont want to handle congestion
if !H::supports_congestion_detection() {
return
}
// if we have not sent the congestion signal before, we don't want to do anything
if !Self::is_congested_signal_sent(sender_and_lane.lane) {
return
}
// if the bridge queue is still congested, we don't want to do anything
let is_congested = enqueued_messages > OUTBOUND_LANE_UNCONGESTED_THRESHOLD;
if is_congested {
return
}
log::info!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"Sending 'uncongested' XCM message to {:?}. Lane {:?}: there are\
{} messages queued at the bridge queue",
sender_and_lane.location,
sender_and_lane.lane,
enqueued_messages,
);
if let Err(e) = Self::send_uncongested_signal(sender_and_lane) {
log::info!(
target: crate::LOG_TARGET_BRIDGE_DISPATCH,
"Failed to send the 'uncongested' XCM message to {:?}: {:?}",
sender_and_lane.location,
e,
);
}
}
/// Returns true if we have sent "congested" signal to the `sending_chain_location`.
fn is_congested_signal_sent(lane: LaneId) -> bool {
OutboundLanesCongestedSignals::<H::Runtime, H::MessagesInstance>::get(lane)
}
/// Send congested signal to the `sending_chain_location`.
fn send_congested_signal(sender_and_lane: &SenderAndLane) -> Result<(), SendError> {
if let Some(msg) = H::CongestedMessage::get() {
send_xcm::<H::ToSourceChainSender>(sender_and_lane.location, msg)?;
OutboundLanesCongestedSignals::<H::Runtime, H::MessagesInstance>::insert(
sender_and_lane.lane,
true,
);
}
Ok(())
}
/// Send `uncongested` signal to the `sending_chain_location`.
fn send_uncongested_signal(sender_and_lane: &SenderAndLane) -> Result<(), SendError> {
if let Some(msg) = H::UncongestedMessage::get() {
send_xcm::<H::ToSourceChainSender>(sender_and_lane.location, msg)?;
OutboundLanesCongestedSignals::<H::Runtime, H::MessagesInstance>::remove(
sender_and_lane.lane,
);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::*;
use bp_messages::OutboundLaneData;
use frame_support::parameter_types;
use pallet_bridge_messages::OutboundLanes;
parameter_types! {
pub TestSenderAndLane: SenderAndLane = SenderAndLane {
location: MultiLocation::new(1, X1(Parachain(1000))),
lane: TEST_LANE_ID,
};
pub DummyXcmMessage: Xcm<()> = Xcm::new();
}
struct DummySendXcm;
impl DummySendXcm {
fn messages_sent() -> u32 {
frame_support::storage::unhashed::get(b"DummySendXcm").unwrap_or(0)
}
}
impl SendXcm for DummySendXcm {
type Ticket = ();
fn validate(
_destination: &mut Option<MultiLocation>,
_message: &mut Option<Xcm<()>>,
) -> SendResult<Self::Ticket> {
Ok(((), Default::default()))
}
fn deliver(_ticket: Self::Ticket) -> Result<XcmHash, SendError> {
let messages_sent: u32 = Self::messages_sent();
frame_support::storage::unhashed::put(b"DummySendXcm", &(messages_sent + 1));
Ok(XcmHash::default())
}
}
struct TestBlobHauler;
impl XcmBlobHauler for TestBlobHauler {
type Runtime = TestRuntime;
type MessagesInstance = ();
type SenderAndLane = TestSenderAndLane;
type ToSourceChainSender = DummySendXcm;
type CongestedMessage = DummyXcmMessage;
type UncongestedMessage = DummyXcmMessage;
}
type TestBlobHaulerAdapter = XcmBlobHaulerAdapter<TestBlobHauler>;
fn fill_up_lane_to_congestion() {
OutboundLanes::<TestRuntime, ()>::insert(
TEST_LANE_ID,
OutboundLaneData {
oldest_unpruned_nonce: 0,
latest_received_nonce: 0,
latest_generated_nonce: OUTBOUND_LANE_CONGESTED_THRESHOLD,
},
);
}
#[test]
fn congested_signal_is_not_sent_twice() {
run_test(|| {
fill_up_lane_to_congestion();
// next sent message leads to congested signal
TestBlobHaulerAdapter::haul_blob(vec![42]).unwrap();
assert_eq!(DummySendXcm::messages_sent(), 1);
// next sent message => we don't sent another congested signal
TestBlobHaulerAdapter::haul_blob(vec![42]).unwrap();
assert_eq!(DummySendXcm::messages_sent(), 1);
});
}
#[test]
fn congested_signal_is_not_sent_when_outbound_lane_is_not_congested() {
run_test(|| {
TestBlobHaulerAdapter::haul_blob(vec![42]).unwrap();
assert_eq!(DummySendXcm::messages_sent(), 0);
});
}
#[test]
fn congested_signal_is_sent_when_outbound_lane_is_congested() {
run_test(|| {
fill_up_lane_to_congestion();
// next sent message leads to congested signal
TestBlobHaulerAdapter::haul_blob(vec![42]).unwrap();
assert_eq!(DummySendXcm::messages_sent(), 1);
assert!(LocalXcmQueueManager::<TestBlobHauler>::is_congested_signal_sent(TEST_LANE_ID));
});
}
#[test]
fn uncongested_signal_is_not_sent_when_messages_are_delivered_at_other_lane() {
run_test(|| {
LocalXcmQueueManager::<TestBlobHauler>::send_congested_signal(&TestSenderAndLane::get()).unwrap();
assert_eq!(DummySendXcm::messages_sent(), 1);
// when we receive a delivery report for other lane, we don't send an uncongested signal
TestBlobHaulerAdapter::on_messages_delivered(LaneId([42, 42, 42, 42]), 0);
assert_eq!(DummySendXcm::messages_sent(), 1);
});
}
#[test]
fn uncongested_signal_is_not_sent_when_we_havent_send_congested_signal_before() {
run_test(|| {
TestBlobHaulerAdapter::on_messages_delivered(TEST_LANE_ID, 0);
assert_eq!(DummySendXcm::messages_sent(), 0);
});
}
#[test]
fn uncongested_signal_is_not_sent_if_outbound_lane_is_still_congested() {
run_test(|| {
LocalXcmQueueManager::<TestBlobHauler>::send_congested_signal(&TestSenderAndLane::get()).unwrap();
assert_eq!(DummySendXcm::messages_sent(), 1);
TestBlobHaulerAdapter::on_messages_delivered(
TEST_LANE_ID,
OUTBOUND_LANE_UNCONGESTED_THRESHOLD + 1,
);
assert_eq!(DummySendXcm::messages_sent(), 1);
});
}
#[test]
fn uncongested_signal_is_sent_if_outbound_lane_is_uncongested() {
run_test(|| {
LocalXcmQueueManager::<TestBlobHauler>::send_congested_signal(&TestSenderAndLane::get()).unwrap();
assert_eq!(DummySendXcm::messages_sent(), 1);
TestBlobHaulerAdapter::on_messages_delivered(
TEST_LANE_ID,
OUTBOUND_LANE_UNCONGESTED_THRESHOLD,
);
assert_eq!(DummySendXcm::messages_sent(), 2);
});
}
}
+37 -3
View File
@@ -33,10 +33,15 @@ use crate::messages::{
};
use bp_header_chain::{ChainWithGrandpa, HeaderChain};
use bp_messages::{target_chain::ForbidInboundMessages, LaneId, MessageNonce};
use bp_messages::{
target_chain::{DispatchMessage, MessageDispatch},
LaneId, MessageNonce,
};
use bp_parachains::SingleParaStoredHeaderDataBuilder;
use bp_relayers::PayRewardFromAccount;
use bp_runtime::{Chain, ChainId, Parachain, UnderlyingChainProvider};
use bp_runtime::{
messages::MessageDispatchResult, Chain, ChainId, Parachain, UnderlyingChainProvider,
};
use codec::{Decode, Encode};
use frame_support::{
parameter_types,
@@ -245,9 +250,10 @@ impl pallet_bridge_messages::Config for TestRuntime {
(),
ConstU64<100_000>,
>;
type OnMessagesDelivered = ();
type SourceHeaderChain = SourceHeaderChainAdapter<OnThisChainBridge>;
type MessageDispatch = ForbidInboundMessages<(), FromBridgedChainMessagePayload>;
type MessageDispatch = DummyMessageDispatch;
type BridgedChainId = BridgedChainId;
}
@@ -259,6 +265,34 @@ impl pallet_bridge_relayers::Config for TestRuntime {
type WeightInfo = ();
}
/// Dummy message dispatcher.
pub struct DummyMessageDispatch;
impl DummyMessageDispatch {
pub fn deactivate() {
frame_support::storage::unhashed::put(&b"inactive"[..], &false);
}
}
impl MessageDispatch for DummyMessageDispatch {
type DispatchPayload = Vec<u8>;
type DispatchLevelResult = ();
fn is_active() -> bool {
frame_support::storage::unhashed::take::<bool>(&b"inactive"[..]) != Some(false)
}
fn dispatch_weight(_message: &mut DispatchMessage<Self::DispatchPayload>) -> Weight {
Weight::zero()
}
fn dispatch(
_: DispatchMessage<Self::DispatchPayload>,
) -> MessageDispatchResult<Self::DispatchLevelResult> {
MessageDispatchResult { unspent_weight: Weight::zero(), dispatch_level_result: () }
}
}
/// Bridge that is deployed on `ThisChain` and allows sending/receiving messages to/from
/// `BridgedChain`.
#[derive(Debug, PartialEq, Eq)]
@@ -494,8 +494,7 @@ where
};
// compute total number of messages in transaction
let bundled_messages =
parsed_call.messages_call_info().bundled_messages().checked_len().unwrap_or(0);
let bundled_messages = parsed_call.messages_call_info().bundled_messages().saturating_len();
// a quick check to avoid invalid high-priority transactions
if bundled_messages > Runtime::MaxUnconfirmedMessagesAtInboundLane::get() {