Prune messages from on-idle callback (#1650)

* prune messages from on-idle callback

* no more secondary lanes at deployments

* clippy

* Update modules/messages/src/lib.rs

Co-authored-by: Adrian Catangiu <adrian@parity.io>

* sub -> add

* more tests + check that message is sent using one of ActiveOutboundLanes

* ensure spent_weight is correct

Co-authored-by: Adrian Catangiu <adrian@parity.io>
This commit is contained in:
Svyatoslav Nikolsky
2022-11-21 13:29:26 +03:00
committed by Bastian Köcher
parent 8e660dd74e
commit eabfea6229
13 changed files with 322 additions and 110 deletions
+233 -20
View File
@@ -65,6 +65,7 @@ use bp_messages::{
use bp_runtime::{BasicOperatingMode, ChainId, OwnedBridgeModule, Size};
use codec::{Decode, Encode, MaxEncodedLen};
use frame_support::{dispatch::PostDispatchInfo, ensure, fail, traits::Get};
use sp_runtime::traits::UniqueSaturatedFrom;
use sp_std::{
cell::RefCell, collections::vec_deque::VecDeque, marker::PhantomData, ops::RangeInclusive,
prelude::*,
@@ -107,10 +108,8 @@ pub mod pallet {
#[pallet::constant]
type BridgedChainId: Get<ChainId>;
/// Maximal number of messages that may be pruned during maintenance. Maintenance occurs
/// whenever new message is sent. The reason is that if you want to use lane, you should
/// be ready to pay for its maintenance.
type MaxMessagesToPruneAtOnce: Get<MessageNonce>;
/// Get all active outbound lanes that the message pallet is serving.
type ActiveOutboundLanes: Get<&'static [LaneId]>;
/// Maximal number of unrewarded relayer entries at inbound lane. Unrewarded means that the
/// relayer has delivered messages, but either confirmations haven't been delivered back to
/// the source chain, or we haven't received reward confirmations yet.
@@ -191,6 +190,40 @@ pub mod pallet {
type OperatingModeStorage = PalletOperatingMode<T, I>;
}
#[pallet::hooks]
impl<T: Config<I>, I: 'static> Hooks<BlockNumberFor<T>> for Pallet<T, I>
where
u32: TryFrom<<T as frame_system::Config>::BlockNumber>,
{
fn on_idle(_block: T::BlockNumber, remaining_weight: Weight) -> Weight {
// we'll need at least to read outbound lane state, kill a message and update lane state
let db_weight = T::DbWeight::get();
if !remaining_weight.all_gte(db_weight.reads_writes(1, 2)) {
return Weight::zero()
}
// messages from lane with index `i` in `ActiveOutboundLanes` are pruned when
// `System::block_number() % lanes.len() == i`. Otherwise we need to read lane states on
// every block, wasting the whole `remaining_weight` for nothing and causing starvation
// of the last lane pruning
let active_lanes = T::ActiveOutboundLanes::get();
let active_lanes_len = (active_lanes.len() as u32).into();
let active_lane_index = u32::unique_saturated_from(
frame_system::Pallet::<T>::block_number() % active_lanes_len,
);
let active_lane_id = active_lanes[active_lane_index as usize];
// first db read - outbound lane state
let mut active_lane = outbound_lane::<T, I>(active_lane_id);
let mut used_weight = db_weight.reads(1);
// and here we'll have writes
used_weight += active_lane.prune_messages(db_weight, remaining_weight - used_weight);
// we already checked we have enough `remaining_weight` to cover this `used_weight`
used_weight
}
}
#[pallet::call]
impl<T: Config<I>, I: 'static> Pallet<T, I> {
/// Change `PalletOwner`.
@@ -469,6 +502,8 @@ pub mod pallet {
pub enum Error<T, I = ()> {
/// Pallet is not in Normal operating mode.
NotOperatingNormally,
/// The outbound lane is inactive.
InactiveOutboundLane,
/// The message is too large to be sent over the bridge.
MessageIsTooLarge,
/// Message has been treated as invalid by chain verifier.
@@ -614,8 +649,8 @@ fn send_message<T: Config<I>, I: 'static>(
> {
ensure_normal_operating_mode::<T, I>()?;
// initially, actual (post-dispatch) weight is equal to pre-dispatch weight
let mut actual_weight = frame_support::weights::Weight::zero(); // TODO (https://github.com/paritytech/parity-bridges-common/issues/1647): remove this
// let's check if outbound lane is active
ensure!(T::ActiveOutboundLanes::get().contains(&lane_id), Error::<T, I>::InactiveOutboundLane,);
// let's first check if message can be delivered to target chain
T::TargetHeaderChain::verify_message(&payload).map_err(|err| {
@@ -653,15 +688,6 @@ fn send_message<T: Config<I>, I: 'static>(
);
let nonce = lane.send_message(encoded_payload);
// message sender pays for pruning at most `MaxMessagesToPruneAtOnce` messages
// the cost of pruning every message is roughly single db write
// => lets refund sender if less than `MaxMessagesToPruneAtOnce` messages pruned
let max_messages_to_prune = T::MaxMessagesToPruneAtOnce::get();
let pruned_messages = lane.prune_messages(max_messages_to_prune);
if let Some(extra_messages) = max_messages_to_prune.checked_sub(pruned_messages) {
actual_weight = actual_weight.saturating_sub(T::DbWeight::get().writes(extra_messages));
}
log::trace!(
target: LOG_TARGET,
"Accepted message {} to lane {:?}. Message size: {:?}",
@@ -672,6 +698,14 @@ fn send_message<T: Config<I>, I: 'static>(
Pallet::<T, I>::deposit_event(Event::MessageAccepted { lane_id, nonce });
// we may introduce benchmarks for that, but no heavy ops planned here apart from
// db reads and writes. There are currently 2 db reads and 2 db writes:
// - one db read for operation mode check (`ensure_normal_operating_mode`);
// - one db read for outbound lane state (`outbound_lane`);
// - one db write for outbound lane state (`send_message`);
// - one db write for the message (`send_message`);
let actual_weight = T::DbWeight::get().reads_writes(2, 2);
Ok(SendMessageArtifacts { nonce, weight: actual_weight })
}
@@ -851,17 +885,18 @@ fn verify_and_decode_messages_proof<Chain: SourceHeaderChain, DispatchPayload: D
mod tests {
use super::*;
use crate::mock::{
message, message_payload, run_test, unrewarded_relayer, RuntimeEvent as TestEvent,
RuntimeOrigin, TestMessageDeliveryAndDispatchPayment, TestMessagesDeliveryProof,
TestMessagesProof, TestRuntime, MAX_OUTBOUND_PAYLOAD_SIZE,
PAYLOAD_REJECTED_BY_TARGET_CHAIN, REGULAR_PAYLOAD, TEST_LANE_ID, TEST_RELAYER_A,
TEST_RELAYER_B,
message, message_payload, run_test, unrewarded_relayer, DbWeight,
RuntimeEvent as TestEvent, RuntimeOrigin, TestMessageDeliveryAndDispatchPayment,
TestMessagesDeliveryProof, TestMessagesProof, TestRuntime, MAX_OUTBOUND_PAYLOAD_SIZE,
PAYLOAD_REJECTED_BY_TARGET_CHAIN, REGULAR_PAYLOAD, TEST_LANE_ID, TEST_LANE_ID_2,
TEST_LANE_ID_3, TEST_RELAYER_A, TEST_RELAYER_B,
};
use bp_messages::{UnrewardedRelayer, UnrewardedRelayersState};
use bp_test_utils::generate_owned_bridge_module_tests;
use frame_support::{
assert_noop, assert_ok,
storage::generator::{StorageMap, StorageValue},
traits::Hooks,
weights::Weight,
};
use frame_system::{EventRecord, Pallet as System, Phase};
@@ -1686,6 +1721,184 @@ mod tests {
});
}
#[test]
fn on_idle_callback_respects_remaining_weight() {
run_test(|| {
send_regular_message();
send_regular_message();
send_regular_message();
send_regular_message();
assert_ok!(Pallet::<TestRuntime>::receive_messages_delivery_proof(
RuntimeOrigin::signed(1),
TestMessagesDeliveryProof(Ok((
TEST_LANE_ID,
InboundLaneData {
last_confirmed_nonce: 4,
relayers: vec![unrewarded_relayer(1, 4, TEST_RELAYER_A)]
.into_iter()
.collect(),
},
))),
UnrewardedRelayersState {
unrewarded_relayer_entries: 1,
messages_in_oldest_entry: 4,
total_messages: 4,
last_delivered_nonce: 4,
},
));
// all 4 messages may be pruned now
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().latest_received_nonce,
4
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce,
1
);
System::<TestRuntime>::set_block_number(2);
// if passed wight is too low to do anything
let dbw = DbWeight::get();
assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(1, 1)),
Weight::zero(),
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce,
1
);
// if passed wight is enough to prune single message
assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(1, 2)),
dbw.reads_writes(1, 2),
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce,
2
);
// if passed wight is enough to prune two more messages
assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(1, 3)),
dbw.reads_writes(1, 3),
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce,
4
);
// if passed wight is enough to prune many messages
assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(100, 100)),
dbw.reads_writes(1, 2),
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce,
5
);
});
}
#[test]
fn on_idle_callback_is_rotating_lanes_to_prune() {
run_test(|| {
// send + receive confirmation for lane 1
send_regular_message();
receive_messages_delivery_proof();
// send + receive confirmation for lane 2
assert_ok!(send_message::<TestRuntime, ()>(
RuntimeOrigin::signed(1),
TEST_LANE_ID_2,
REGULAR_PAYLOAD,
));
assert_ok!(Pallet::<TestRuntime>::receive_messages_delivery_proof(
RuntimeOrigin::signed(1),
TestMessagesDeliveryProof(Ok((
TEST_LANE_ID_2,
InboundLaneData {
last_confirmed_nonce: 1,
relayers: vec![unrewarded_relayer(1, 1, TEST_RELAYER_A)]
.into_iter()
.collect(),
},
))),
UnrewardedRelayersState {
unrewarded_relayer_entries: 1,
messages_in_oldest_entry: 1,
total_messages: 1,
last_delivered_nonce: 1,
},
));
// nothing is pruned yet
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().latest_received_nonce,
1
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce,
1
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID_2).data().latest_received_nonce,
1
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID_2).data().oldest_unpruned_nonce,
1
);
// in block#2.on_idle lane messages of lane 1 are pruned
let dbw = DbWeight::get();
System::<TestRuntime>::set_block_number(2);
assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(100, 100)),
dbw.reads_writes(1, 2),
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce,
2
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID_2).data().oldest_unpruned_nonce,
1
);
// in block#3.on_idle lane messages of lane 2 are pruned
System::<TestRuntime>::set_block_number(3);
assert_eq!(
Pallet::<TestRuntime, ()>::on_idle(0, dbw.reads_writes(100, 100)),
dbw.reads_writes(1, 2),
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID).data().oldest_unpruned_nonce,
2
);
assert_eq!(
outbound_lane::<TestRuntime, ()>(TEST_LANE_ID_2).data().oldest_unpruned_nonce,
2
);
});
}
#[test]
fn outbound_message_from_unconfigured_lane_is_rejected() {
run_test(|| {
assert_noop!(
send_message::<TestRuntime, ()>(
RuntimeOrigin::signed(1),
TEST_LANE_ID_3,
REGULAR_PAYLOAD,
),
Error::<TestRuntime, ()>::InactiveOutboundLane,
);
});
}
generate_owned_bridge_module_tests!(
MessagesOperatingMode::Basic(BasicOperatingMode::Normal),
MessagesOperatingMode::Basic(BasicOperatingMode::Halted)
+8 -1
View File
@@ -141,12 +141,13 @@ parameter_types! {
pub const MaxUnrewardedRelayerEntriesAtInboundLane: u64 = 16;
pub const MaxUnconfirmedMessagesAtInboundLane: u64 = 32;
pub const TestBridgedChainId: bp_runtime::ChainId = *b"test";
pub const ActiveOutboundLanes: &'static [LaneId] = &[TEST_LANE_ID, TEST_LANE_ID_2];
}
impl Config for TestRuntime {
type RuntimeEvent = RuntimeEvent;
type WeightInfo = ();
type MaxMessagesToPruneAtOnce = MaxMessagesToPruneAtOnce;
type ActiveOutboundLanes = ActiveOutboundLanes;
type MaxUnrewardedRelayerEntriesAtInboundLane = MaxUnrewardedRelayerEntriesAtInboundLane;
type MaxUnconfirmedMessagesAtInboundLane = MaxUnconfirmedMessagesAtInboundLane;
@@ -192,6 +193,12 @@ pub const TEST_ERROR: &str = "Test error";
/// Lane that we're using in tests.
pub const TEST_LANE_ID: LaneId = [0, 0, 0, 1];
/// Secondary lane that we're using in tests.
pub const TEST_LANE_ID_2: LaneId = [0, 0, 0, 2];
/// Inactive outbound lane.
pub const TEST_LANE_ID_3: LaneId = [0, 0, 0, 3];
/// Regular message payload.
pub const REGULAR_PAYLOAD: TestPayload = message_payload(0, 50);
+46 -12
View File
@@ -23,7 +23,11 @@ use bp_messages::{
DeliveredMessages, DispatchResultsBitVec, LaneId, MessageNonce, MessagePayload,
OutboundLaneData, UnrewardedRelayer,
};
use frame_support::{BoundedVec, RuntimeDebug};
use frame_support::{
weights::{RuntimeDbWeight, Weight},
BoundedVec, RuntimeDebug,
};
use num_traits::Zero;
use sp_std::collections::vec_deque::VecDeque;
/// Outbound lane storage.
@@ -147,24 +151,32 @@ impl<S: OutboundLaneStorage> OutboundLane<S> {
/// Prune at most `max_messages_to_prune` already received messages.
///
/// Returns number of pruned messages.
pub fn prune_messages(&mut self, max_messages_to_prune: MessageNonce) -> MessageNonce {
let mut pruned_messages = 0;
/// Returns weight, consumed by messages pruning and lane state update.
pub fn prune_messages(
&mut self,
db_weight: RuntimeDbWeight,
mut remaining_weight: Weight,
) -> Weight {
let write_weight = db_weight.writes(1);
let two_writes_weight = write_weight + write_weight;
let mut spent_weight = Weight::zero();
let mut data = self.storage.data();
while pruned_messages < max_messages_to_prune &&
while remaining_weight.all_gte(two_writes_weight) &&
data.oldest_unpruned_nonce <= data.latest_received_nonce
{
self.storage.remove_message(&data.oldest_unpruned_nonce);
pruned_messages += 1;
spent_weight += write_weight;
remaining_weight -= write_weight;
data.oldest_unpruned_nonce += 1;
}
if pruned_messages > 0 {
if !spent_weight.is_zero() {
spent_weight += write_weight;
self.storage.set_data(data);
}
pruned_messages
spent_weight
}
}
@@ -246,6 +258,7 @@ mod tests {
},
outbound_lane,
};
use frame_support::weights::constants::RocksDbWeight;
use sp_std::ops::RangeInclusive;
fn unrewarded_relayers(
@@ -413,27 +426,48 @@ mod tests {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
// when lane is empty, nothing is pruned
assert_eq!(lane.prune_messages(100), 0);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
Weight::zero()
);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// when nothing is confirmed, nothing is pruned
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
lane.send_message(outbound_message_data(REGULAR_PAYLOAD));
assert_eq!(lane.prune_messages(100), 0);
assert!(lane.storage.message(&1).is_some());
assert!(lane.storage.message(&2).is_some());
assert!(lane.storage.message(&3).is_some());
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
Weight::zero()
);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// after confirmation, some messages are received
assert_eq!(
lane.confirm_delivery(2, 2, &unrewarded_relayers(1..=2)),
ReceivalConfirmationResult::ConfirmedMessages(delivered_messages(1..=2)),
);
assert_eq!(lane.prune_messages(100), 2);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
RocksDbWeight::get().writes(3),
);
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_some());
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3);
// after last message is confirmed, everything is pruned
assert_eq!(
lane.confirm_delivery(1, 3, &unrewarded_relayers(3..=3)),
ReceivalConfirmationResult::ConfirmedMessages(delivered_messages(3..=3)),
);
assert_eq!(lane.prune_messages(100), 1);
assert_eq!(
lane.prune_messages(RocksDbWeight::get(), RocksDbWeight::get().writes(101)),
RocksDbWeight::get().writes(2),
);
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_none());
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}