mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 22:41:06 +00:00
Remove queueing from message-lane (#352)
* remove queueing from message-lane * also remove queueing from RPCs * another trace * new clippy
This commit is contained in:
committed by
Bastian Köcher
parent
c2791c2772
commit
d918bcb6f8
@@ -184,7 +184,7 @@ fn prepare_deposit_details<T: Trait<I>, I: Instance>(
|
|||||||
) -> Result<DepositDetails<T, I>, Error<T, I>> {
|
) -> Result<DepositDetails<T, I>, Error<T, I>> {
|
||||||
// ensure that transaction is included in finalized block that we know of
|
// ensure that transaction is included in finalized block that we know of
|
||||||
let transaction = <T as Trait<I>>::PeerBlockchain::verify_transaction_inclusion_proof(proof)
|
let transaction = <T as Trait<I>>::PeerBlockchain::verify_transaction_inclusion_proof(proof)
|
||||||
.ok_or_else(|| Error::<T, I>::UnfinalizedTransaction)?;
|
.ok_or(Error::<T, I>::UnfinalizedTransaction)?;
|
||||||
|
|
||||||
// parse transaction
|
// parse transaction
|
||||||
let transaction =
|
let transaction =
|
||||||
|
|||||||
@@ -122,7 +122,7 @@ pub fn accept_aura_header_into_pool<S: Storage>(
|
|||||||
validator_checks(config, &best_context.validators_set().validators, header, header_step);
|
validator_checks(config, &best_context.validators_set().validators, header, header_step);
|
||||||
if let Err(error) = validators_check_result {
|
if let Err(error) = validators_check_result {
|
||||||
find_next_validators_signal(storage, &best_context)
|
find_next_validators_signal(storage, &best_context)
|
||||||
.ok_or_else(|| error)
|
.ok_or(error)
|
||||||
.and_then(|next_validators| validator_checks(config, &next_validators, header, header_step))?;
|
.and_then(|next_validators| validator_checks(config, &next_validators, header, header_step))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -41,10 +41,6 @@ pub type MessagesProof = Bytes;
|
|||||||
/// SCALE-encoded trie nodes array `Vec<Vec<u8>>`.
|
/// SCALE-encoded trie nodes array `Vec<Vec<u8>>`.
|
||||||
pub type MessagesReceivingProof = Bytes;
|
pub type MessagesReceivingProof = Bytes;
|
||||||
|
|
||||||
/// Trie-based storage proof that the message(s) with given key(s) have been processed by the bridged chain.
|
|
||||||
/// SCALE-encoded trie nodes array `Vec<Vec<u8>>`.
|
|
||||||
pub type MessagesProcessingProof = Bytes;
|
|
||||||
|
|
||||||
/// Runtime adapter.
|
/// Runtime adapter.
|
||||||
pub trait Runtime: Send + Sync + 'static {
|
pub trait Runtime: Send + Sync + 'static {
|
||||||
/// Return runtime storage key for given message. May return None if instance is unknown.
|
/// Return runtime storage key for given message. May return None if instance is unknown.
|
||||||
@@ -75,15 +71,6 @@ pub trait MessageLaneApi<BlockHash> {
|
|||||||
lane: LaneId,
|
lane: LaneId,
|
||||||
block: Option<BlockHash>,
|
block: Option<BlockHash>,
|
||||||
) -> FutureResult<MessagesReceivingProof>;
|
) -> FutureResult<MessagesReceivingProof>;
|
||||||
|
|
||||||
/// Returns proof-of-message(s) processing.
|
|
||||||
#[rpc(name = "messageLane_proveMessagesProcessing")]
|
|
||||||
fn prove_messages_processing(
|
|
||||||
&self,
|
|
||||||
instance: InstanceId,
|
|
||||||
lane: LaneId,
|
|
||||||
block: Option<BlockHash>,
|
|
||||||
) -> FutureResult<MessagesProcessingProof>;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Implements the MessageLaneApi trait for interacting with message lanes.
|
/// Implements the MessageLaneApi trait for interacting with message lanes.
|
||||||
@@ -150,25 +137,6 @@ where
|
|||||||
.map_err(Into::into),
|
.map_err(Into::into),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn prove_messages_processing(
|
|
||||||
&self,
|
|
||||||
instance: InstanceId,
|
|
||||||
lane: LaneId,
|
|
||||||
block: Option<Block::Hash>,
|
|
||||||
) -> FutureResult<MessagesProcessingProof> {
|
|
||||||
Box::new(
|
|
||||||
prove_keys_read(
|
|
||||||
self.backend.clone(),
|
|
||||||
block,
|
|
||||||
vec![self.runtime.inbound_lane_data_key(&instance, &lane)],
|
|
||||||
)
|
|
||||||
.boxed()
|
|
||||||
.compat()
|
|
||||||
.map(serialize_storage_proof)
|
|
||||||
.map_err(Into::into),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn prove_keys_read<Block, Backend>(
|
async fn prove_keys_read<Block, Backend>(
|
||||||
|
|||||||
@@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
//! Everything about incoming messages receival.
|
//! Everything about incoming messages receival.
|
||||||
|
|
||||||
use bp_message_lane::{InboundLaneData, LaneId, Message, MessageKey, MessageNonce, MessageResult, OnMessageReceived};
|
use bp_message_lane::{InboundLaneData, LaneId, Message, MessageKey, MessageNonce, OnMessageReceived};
|
||||||
|
|
||||||
/// Inbound lane storage.
|
/// Inbound lane storage.
|
||||||
pub trait InboundLaneStorage {
|
pub trait InboundLaneStorage {
|
||||||
@@ -29,12 +29,6 @@ pub trait InboundLaneStorage {
|
|||||||
fn data(&self) -> InboundLaneData;
|
fn data(&self) -> InboundLaneData;
|
||||||
/// Update lane data in the storage.
|
/// Update lane data in the storage.
|
||||||
fn set_data(&mut self, data: InboundLaneData);
|
fn set_data(&mut self, data: InboundLaneData);
|
||||||
/// Returns saved inbound message payload.
|
|
||||||
fn message(&self, nonce: &MessageNonce) -> Option<Self::Payload>;
|
|
||||||
/// Save inbound message in the storage.
|
|
||||||
fn save_message(&mut self, nonce: MessageNonce, payload: Self::Payload);
|
|
||||||
/// Remove inbound message from the storage.
|
|
||||||
fn remove_message(&mut self, nonce: &MessageNonce);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Inbound messages lane.
|
/// Inbound messages lane.
|
||||||
@@ -49,11 +43,10 @@ impl<S: InboundLaneStorage> InboundLane<S> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Receive new message.
|
/// Receive new message.
|
||||||
pub fn receive_message(
|
pub fn receive_message<P: OnMessageReceived<S::Payload>>(
|
||||||
&mut self,
|
&mut self,
|
||||||
nonce: MessageNonce,
|
nonce: MessageNonce,
|
||||||
payload: S::Payload,
|
payload: S::Payload,
|
||||||
processor: &mut impl OnMessageReceived<S::Payload>,
|
|
||||||
) -> bool {
|
) -> bool {
|
||||||
let mut data = self.storage.data();
|
let mut data = self.storage.data();
|
||||||
let is_correct_message = nonce == data.latest_received_nonce + 1;
|
let is_correct_message = nonce == data.latest_received_nonce + 1;
|
||||||
@@ -61,70 +54,19 @@ impl<S: InboundLaneStorage> InboundLane<S> {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
let is_process_required = is_correct_message && data.oldest_unprocessed_nonce == nonce;
|
|
||||||
data.latest_received_nonce = nonce;
|
data.latest_received_nonce = nonce;
|
||||||
self.storage.set_data(data);
|
self.storage.set_data(data);
|
||||||
|
|
||||||
let payload_to_save = match is_process_required {
|
P::on_message_received(Message {
|
||||||
true => {
|
key: MessageKey {
|
||||||
let message = Message {
|
lane_id: self.storage.id(),
|
||||||
key: MessageKey {
|
nonce,
|
||||||
lane_id: self.storage.id(),
|
},
|
||||||
nonce,
|
payload,
|
||||||
},
|
});
|
||||||
payload,
|
|
||||||
};
|
|
||||||
match processor.on_message_received(message) {
|
|
||||||
MessageResult::Processed => None,
|
|
||||||
MessageResult::NotProcessed(message) => Some(message.payload),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
false => Some(payload),
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(payload_to_save) = payload_to_save {
|
|
||||||
self.storage.save_message(nonce, payload_to_save);
|
|
||||||
}
|
|
||||||
|
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process stored lane messages.
|
|
||||||
///
|
|
||||||
/// Stops processing either when all messages are processed, or when processor returns
|
|
||||||
/// MessageResult::NotProcessed.
|
|
||||||
pub fn process_messages(&mut self, processor: &mut impl OnMessageReceived<S::Payload>) {
|
|
||||||
let mut anything_processed = false;
|
|
||||||
let mut data = self.storage.data();
|
|
||||||
while data.oldest_unprocessed_nonce <= data.latest_received_nonce {
|
|
||||||
let nonce = data.oldest_unprocessed_nonce;
|
|
||||||
let payload = self
|
|
||||||
.storage
|
|
||||||
.message(&nonce)
|
|
||||||
.expect("message is referenced by lane; referenced message is not pruned; qed");
|
|
||||||
let message = Message {
|
|
||||||
key: MessageKey {
|
|
||||||
lane_id: self.storage.id(),
|
|
||||||
nonce,
|
|
||||||
},
|
|
||||||
payload,
|
|
||||||
};
|
|
||||||
|
|
||||||
let process_result = processor.on_message_received(message);
|
|
||||||
if let MessageResult::NotProcessed(_) = process_result {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.storage.remove_message(&nonce);
|
|
||||||
|
|
||||||
anything_processed = true;
|
|
||||||
data.oldest_unprocessed_nonce += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if anything_processed {
|
|
||||||
self.storage.set_data(data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -132,105 +74,24 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::{
|
use crate::{
|
||||||
inbound_lane,
|
inbound_lane,
|
||||||
mock::{
|
mock::{run_test, TestRuntime, REGULAR_PAYLOAD, TEST_LANE_ID},
|
||||||
run_test, TestMessageProcessor, TestPayload, TestRuntime, PAYLOAD_TO_QUEUE, REGULAR_PAYLOAD, TEST_LANE_ID,
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn fails_to_receive_message_with_incorrect_nonce() {
|
fn fails_to_receive_message_with_incorrect_nonce() {
|
||||||
run_test(|| {
|
run_test(|| {
|
||||||
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
||||||
assert!(!lane.receive_message(10, REGULAR_PAYLOAD, &mut TestMessageProcessor));
|
assert!(!lane.receive_message::<()>(10, REGULAR_PAYLOAD));
|
||||||
assert!(lane.storage.message(&10).is_none());
|
|
||||||
assert_eq!(lane.storage.data().latest_received_nonce, 0);
|
assert_eq!(lane.storage.data().latest_received_nonce, 0);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn correct_message_is_queued_if_some_other_messages_are_queued() {
|
fn correct_message_is_processed_instantly() {
|
||||||
run_test(|| {
|
run_test(|| {
|
||||||
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
||||||
assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
|
assert!(lane.receive_message::<()>(1, REGULAR_PAYLOAD));
|
||||||
assert!(lane.storage.message(&1).is_some());
|
|
||||||
assert!(lane.receive_message(2, REGULAR_PAYLOAD, &mut TestMessageProcessor));
|
|
||||||
assert!(lane.storage.message(&2).is_some());
|
|
||||||
assert_eq!(lane.storage.data().latest_received_nonce, 2);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn correct_message_is_queued_if_processor_wants_to_queue() {
|
|
||||||
run_test(|| {
|
|
||||||
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
|
||||||
assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
|
|
||||||
assert!(lane.storage.message(&1).is_some());
|
|
||||||
assert_eq!(lane.storage.data().latest_received_nonce, 1);
|
assert_eq!(lane.storage.data().latest_received_nonce, 1);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn correct_message_is_not_queued_if_processed_instantly() {
|
|
||||||
run_test(|| {
|
|
||||||
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
|
||||||
assert!(lane.receive_message(1, REGULAR_PAYLOAD, &mut TestMessageProcessor));
|
|
||||||
assert!(lane.storage.message(&1).is_none());
|
|
||||||
assert_eq!(lane.storage.data().latest_received_nonce, 1);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_message_does_nothing_when_lane_is_empty() {
|
|
||||||
run_test(|| {
|
|
||||||
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
|
||||||
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1);
|
|
||||||
lane.process_messages(&mut TestMessageProcessor);
|
|
||||||
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_message_works() {
|
|
||||||
run_test(|| {
|
|
||||||
pub struct QueueByNonce(MessageNonce);
|
|
||||||
|
|
||||||
impl OnMessageReceived<TestPayload> for QueueByNonce {
|
|
||||||
fn on_message_received(&mut self, message: Message<TestPayload>) -> MessageResult<TestPayload> {
|
|
||||||
if message.key.nonce == self.0 {
|
|
||||||
MessageResult::NotProcessed(message)
|
|
||||||
} else {
|
|
||||||
MessageResult::Processed
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
|
||||||
assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
|
|
||||||
assert!(lane.receive_message(2, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
|
|
||||||
assert!(lane.receive_message(3, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
|
|
||||||
assert!(lane.receive_message(4, REGULAR_PAYLOAD, &mut TestMessageProcessor));
|
|
||||||
|
|
||||||
assert!(lane.storage.message(&1).is_some());
|
|
||||||
assert!(lane.storage.message(&2).is_some());
|
|
||||||
assert!(lane.storage.message(&3).is_some());
|
|
||||||
assert!(lane.storage.message(&4).is_some());
|
|
||||||
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1);
|
|
||||||
|
|
||||||
lane.process_messages(&mut QueueByNonce(3));
|
|
||||||
|
|
||||||
assert!(lane.storage.message(&1).is_none());
|
|
||||||
assert!(lane.storage.message(&2).is_none());
|
|
||||||
assert!(lane.storage.message(&3).is_some());
|
|
||||||
assert!(lane.storage.message(&4).is_some());
|
|
||||||
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 3);
|
|
||||||
|
|
||||||
lane.process_messages(&mut QueueByNonce(10));
|
|
||||||
|
|
||||||
assert!(lane.storage.message(&1).is_none());
|
|
||||||
assert!(lane.storage.message(&2).is_none());
|
|
||||||
assert!(lane.storage.message(&3).is_none());
|
|
||||||
assert!(lane.storage.message(&4).is_none());
|
|
||||||
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 5);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,15 +58,13 @@ pub trait Trait<I = DefaultInstance>: frame_system::Trait {
|
|||||||
/// for it.
|
/// for it.
|
||||||
type MaxMessagesToPruneAtOnce: Get<MessageNonce>;
|
type MaxMessagesToPruneAtOnce: Get<MessageNonce>;
|
||||||
/// Called when message has been received.
|
/// Called when message has been received.
|
||||||
type OnMessageReceived: Default + OnMessageReceived<Self::Payload>;
|
type OnMessageReceived: OnMessageReceived<Self::Payload>;
|
||||||
}
|
}
|
||||||
|
|
||||||
decl_storage! {
|
decl_storage! {
|
||||||
trait Store for Module<T: Trait<I>, I: Instance = DefaultInstance> as MessageLane {
|
trait Store for Module<T: Trait<I>, I: Instance = DefaultInstance> as MessageLane {
|
||||||
/// Map of lane id => inbound lane data.
|
/// Map of lane id => inbound lane data.
|
||||||
InboundLanes: map hasher(blake2_128_concat) LaneId => InboundLaneData;
|
InboundLanes: map hasher(blake2_128_concat) LaneId => InboundLaneData;
|
||||||
/// All stored (unprocessed) inbound messages.
|
|
||||||
InboundMessages: map hasher(blake2_128_concat) MessageKey => Option<T::Payload>;
|
|
||||||
/// Map of lane id => outbound lane data.
|
/// Map of lane id => outbound lane data.
|
||||||
OutboundLanes: map hasher(blake2_128_concat) LaneId => OutboundLaneData;
|
OutboundLanes: map hasher(blake2_128_concat) LaneId => OutboundLaneData;
|
||||||
/// All queued outbound messages.
|
/// All queued outbound messages.
|
||||||
@@ -80,10 +78,8 @@ decl_event!(
|
|||||||
{
|
{
|
||||||
/// Message has been accepted and is waiting to be delivered.
|
/// Message has been accepted and is waiting to be delivered.
|
||||||
MessageAccepted(LaneId, MessageNonce),
|
MessageAccepted(LaneId, MessageNonce),
|
||||||
/// Messages in the inclusive range have been delivered to the bridged chain.
|
/// Messages in the inclusive range have been delivered and processed by the bridged chain.
|
||||||
MessagesDelivered(LaneId, MessageNonce, MessageNonce),
|
MessagesDelivered(LaneId, MessageNonce, MessageNonce),
|
||||||
/// Messages in the inclusive range have been processed by the bridged chain.
|
|
||||||
MessagesProcessed(LaneId, MessageNonce, MessageNonce),
|
|
||||||
/// Phantom member, never used.
|
/// Phantom member, never used.
|
||||||
Dummy(PhantomData<(AccountId, I)>),
|
Dummy(PhantomData<(AccountId, I)>),
|
||||||
}
|
}
|
||||||
@@ -126,10 +122,9 @@ impl<T: Trait<I>, I: Instance> Module<T, I> {
|
|||||||
/// will be rejected.
|
/// will be rejected.
|
||||||
pub fn receive_messages(messages: Vec<Message<T::Payload>>) -> MessageNonce {
|
pub fn receive_messages(messages: Vec<Message<T::Payload>>) -> MessageNonce {
|
||||||
let mut correct_messages = 0;
|
let mut correct_messages = 0;
|
||||||
let mut processor = T::OnMessageReceived::default();
|
|
||||||
for message in messages {
|
for message in messages {
|
||||||
let mut lane = inbound_lane::<T, I>(message.key.lane_id);
|
let mut lane = inbound_lane::<T, I>(message.key.lane_id);
|
||||||
if lane.receive_message(message.key.nonce, message.payload, &mut processor) {
|
if lane.receive_message::<T::OnMessageReceived>(message.key.nonce, message.payload) {
|
||||||
correct_messages += 1;
|
correct_messages += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -137,14 +132,6 @@ impl<T: Trait<I>, I: Instance> Module<T, I> {
|
|||||||
correct_messages
|
correct_messages
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process stored lane messages.
|
|
||||||
///
|
|
||||||
/// Stops processing either when all messages are processed, or when processor returns
|
|
||||||
/// MessageResult::NotProcessed.
|
|
||||||
pub fn process_lane_messages(lane_id: &LaneId, processor: &mut impl OnMessageReceived<T::Payload>) {
|
|
||||||
inbound_lane::<T, I>(*lane_id).process_messages(processor);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Receive TRUSTED proof of message receival.
|
/// Receive TRUSTED proof of message receival.
|
||||||
///
|
///
|
||||||
/// Trusted here means that the function itself doesn't check whether the bridged chain has
|
/// Trusted here means that the function itself doesn't check whether the bridged chain has
|
||||||
@@ -164,23 +151,6 @@ impl<T: Trait<I>, I: Instance> Module<T, I> {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Receive TRUSTED proof of message processing.
|
|
||||||
///
|
|
||||||
/// Trusted here means that the function itself doesn't check whether the bridged chain has
|
|
||||||
/// actually processed these messages.
|
|
||||||
pub fn confirm_processing(lane_id: &LaneId, latest_processed_nonce: MessageNonce) {
|
|
||||||
let mut lane = outbound_lane::<T, I>(*lane_id);
|
|
||||||
let processed_range = lane.confirm_processing(latest_processed_nonce);
|
|
||||||
|
|
||||||
if let Some(processed_range) = processed_range {
|
|
||||||
Self::deposit_event(RawEvent::MessagesProcessed(
|
|
||||||
*lane_id,
|
|
||||||
processed_range.0,
|
|
||||||
processed_range.1,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates new inbound lane object, backed by runtime storage.
|
/// Creates new inbound lane object, backed by runtime storage.
|
||||||
@@ -219,30 +189,6 @@ impl<T: Trait<I>, I: Instance> InboundLaneStorage for RuntimeInboundLaneStorage<
|
|||||||
fn set_data(&mut self, data: InboundLaneData) {
|
fn set_data(&mut self, data: InboundLaneData) {
|
||||||
InboundLanes::<I>::insert(&self.lane_id, data)
|
InboundLanes::<I>::insert(&self.lane_id, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn message(&self, nonce: &MessageNonce) -> Option<Self::Payload> {
|
|
||||||
InboundMessages::<T, I>::get(MessageKey {
|
|
||||||
lane_id: self.lane_id,
|
|
||||||
nonce: *nonce,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn save_message(&mut self, nonce: MessageNonce, payload: T::Payload) {
|
|
||||||
InboundMessages::<T, I>::insert(
|
|
||||||
MessageKey {
|
|
||||||
lane_id: self.lane_id,
|
|
||||||
nonce,
|
|
||||||
},
|
|
||||||
payload,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn remove_message(&mut self, nonce: &MessageNonce) {
|
|
||||||
InboundMessages::<T, I>::remove(MessageKey {
|
|
||||||
lane_id: self.lane_id,
|
|
||||||
nonce: *nonce,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Runtime outbound lane storage.
|
/// Runtime outbound lane storage.
|
||||||
|
|||||||
@@ -14,7 +14,7 @@
|
|||||||
// You should have received a copy of the GNU General Public License
|
// You should have received a copy of the GNU General Public License
|
||||||
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
use bp_message_lane::{LaneId, Message, MessageResult, OnMessageReceived};
|
use bp_message_lane::LaneId;
|
||||||
use frame_support::{impl_outer_event, impl_outer_origin, parameter_types, weights::Weight};
|
use frame_support::{impl_outer_event, impl_outer_origin, parameter_types, weights::Weight};
|
||||||
use sp_core::H256;
|
use sp_core::H256;
|
||||||
use sp_runtime::{
|
use sp_runtime::{
|
||||||
@@ -89,32 +89,15 @@ impl Trait for TestRuntime {
|
|||||||
type Event = TestEvent;
|
type Event = TestEvent;
|
||||||
type Payload = TestPayload;
|
type Payload = TestPayload;
|
||||||
type MaxMessagesToPruneAtOnce = MaxMessagesToPruneAtOnce;
|
type MaxMessagesToPruneAtOnce = MaxMessagesToPruneAtOnce;
|
||||||
type OnMessageReceived = TestMessageProcessor;
|
type OnMessageReceived = ();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Lane that we're using in tests.
|
/// Lane that we're using in tests.
|
||||||
pub const TEST_LANE_ID: LaneId = [0, 0, 0, 1];
|
pub const TEST_LANE_ID: LaneId = [0, 0, 0, 1];
|
||||||
|
|
||||||
/// Regular message payload that is not PAYLOAD_TO_QUEUE.
|
/// Regular message payload.
|
||||||
pub const REGULAR_PAYLOAD: TestPayload = 0;
|
pub const REGULAR_PAYLOAD: TestPayload = 0;
|
||||||
|
|
||||||
/// All messages with this payload are queued by TestMessageProcessor.
|
|
||||||
pub const PAYLOAD_TO_QUEUE: TestPayload = 42;
|
|
||||||
|
|
||||||
/// Message processor that immediately handles all messages except messages with PAYLOAD_TO_QUEUE payload.
|
|
||||||
#[derive(Debug, Default)]
|
|
||||||
pub struct TestMessageProcessor;
|
|
||||||
|
|
||||||
impl OnMessageReceived<TestPayload> for TestMessageProcessor {
|
|
||||||
fn on_message_received(&mut self, message: Message<TestPayload>) -> MessageResult<TestPayload> {
|
|
||||||
if message.payload == PAYLOAD_TO_QUEUE {
|
|
||||||
MessageResult::NotProcessed(message)
|
|
||||||
} else {
|
|
||||||
MessageResult::Processed
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Run message lane test.
|
/// Run message lane test.
|
||||||
pub fn run_test<T>(test: impl FnOnce() -> T) -> T {
|
pub fn run_test<T>(test: impl FnOnce() -> T) -> T {
|
||||||
sp_io::TestExternalities::new(Default::default()).execute_with(test)
|
sp_io::TestExternalities::new(Default::default()).execute_with(test)
|
||||||
|
|||||||
@@ -80,25 +80,6 @@ impl<S: OutboundLaneStorage> OutboundLane<S> {
|
|||||||
Some((prev_latest_received_nonce + 1, latest_received_nonce))
|
Some((prev_latest_received_nonce + 1, latest_received_nonce))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Confirm message processing.
|
|
||||||
///
|
|
||||||
/// Returns `None` if confirmation is wrong/duplicate.
|
|
||||||
/// Returns `Some` with inclusive ranges of message nonces that have been processed.
|
|
||||||
pub fn confirm_processing(&mut self, latest_processed_nonce: MessageNonce) -> Option<(MessageNonce, MessageNonce)> {
|
|
||||||
let mut data = self.storage.data();
|
|
||||||
// wait for recieval confirmation first
|
|
||||||
if latest_processed_nonce <= data.latest_processed_nonce || latest_processed_nonce > data.latest_received_nonce
|
|
||||||
{
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
let prev_latest_processed_nonce = data.latest_processed_nonce;
|
|
||||||
data.latest_processed_nonce = latest_processed_nonce;
|
|
||||||
self.storage.set_data(data);
|
|
||||||
|
|
||||||
Some((prev_latest_processed_nonce + 1, latest_processed_nonce))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Prune at most `max_messages_to_prune` already received messages.
|
/// Prune at most `max_messages_to_prune` already received messages.
|
||||||
///
|
///
|
||||||
/// Returns number of pruned messages.
|
/// Returns number of pruned messages.
|
||||||
@@ -191,60 +172,6 @@ mod tests {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn confirm_processing_works() {
|
|
||||||
run_test(|| {
|
|
||||||
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
|
||||||
assert_eq!(lane.send_message(REGULAR_PAYLOAD), 1);
|
|
||||||
assert_eq!(lane.send_message(REGULAR_PAYLOAD), 2);
|
|
||||||
assert_eq!(lane.send_message(REGULAR_PAYLOAD), 3);
|
|
||||||
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
|
|
||||||
assert_eq!(lane.storage.data().latest_processed_nonce, 0);
|
|
||||||
assert_eq!(lane.confirm_receival(3), Some((1, 3)));
|
|
||||||
assert_eq!(lane.confirm_processing(2), Some((1, 2)));
|
|
||||||
assert_eq!(lane.storage.data().latest_processed_nonce, 2);
|
|
||||||
assert_eq!(lane.confirm_processing(3), Some((3, 3)));
|
|
||||||
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
|
|
||||||
assert_eq!(lane.storage.data().latest_processed_nonce, 3);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn confirm_processing_rejects_nonce_lesser_than_latest_processed() {
|
|
||||||
run_test(|| {
|
|
||||||
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
|
||||||
lane.send_message(REGULAR_PAYLOAD);
|
|
||||||
lane.send_message(REGULAR_PAYLOAD);
|
|
||||||
lane.send_message(REGULAR_PAYLOAD);
|
|
||||||
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
|
|
||||||
assert_eq!(lane.storage.data().latest_processed_nonce, 0);
|
|
||||||
assert_eq!(lane.confirm_receival(3), Some((1, 3)));
|
|
||||||
assert_eq!(lane.confirm_processing(3), Some((1, 3)));
|
|
||||||
assert_eq!(lane.confirm_processing(3), None);
|
|
||||||
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
|
|
||||||
assert_eq!(lane.storage.data().latest_processed_nonce, 3);
|
|
||||||
|
|
||||||
assert_eq!(lane.confirm_processing(2), None);
|
|
||||||
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
|
|
||||||
assert_eq!(lane.storage.data().latest_processed_nonce, 3);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn confirm_processing_rejects_nonce_larger_than_last_received() {
|
|
||||||
run_test(|| {
|
|
||||||
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
|
|
||||||
lane.send_message(REGULAR_PAYLOAD);
|
|
||||||
lane.send_message(REGULAR_PAYLOAD);
|
|
||||||
lane.send_message(REGULAR_PAYLOAD);
|
|
||||||
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
|
|
||||||
assert_eq!(lane.storage.data().latest_processed_nonce, 0);
|
|
||||||
assert_eq!(lane.confirm_processing(2), None);
|
|
||||||
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
|
|
||||||
assert_eq!(lane.storage.data().latest_processed_nonce, 0);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn prune_messages_works() {
|
fn prune_messages_works() {
|
||||||
run_test(|| {
|
run_test(|| {
|
||||||
|
|||||||
@@ -49,45 +49,26 @@ pub struct Message<Payload> {
|
|||||||
pub payload: Payload,
|
pub payload: Payload,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Message processing result.
|
|
||||||
pub enum MessageResult<Payload> {
|
|
||||||
/// Message has been processed and should not be queued.
|
|
||||||
Processed,
|
|
||||||
/// Message has NOT been processed and should be queued for processing later.
|
|
||||||
NotProcessed(Message<Payload>),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Called when inbound message is received.
|
/// Called when inbound message is received.
|
||||||
pub trait OnMessageReceived<Payload> {
|
pub trait OnMessageReceived<Payload> {
|
||||||
/// Called when inbound message is received.
|
/// Called when inbound message is received.
|
||||||
///
|
///
|
||||||
/// It is up to the implementers of this trait to determine whether the message
|
/// It is up to the implementers of this trait to determine whether the message
|
||||||
/// is invalid (i.e. improperly encoded, has too large weight, ...) or not. And,
|
/// is invalid (i.e. improperly encoded, has too large weight, ...) or not.
|
||||||
/// if message is invalid, then it should be dropped immediately (by returning
|
fn on_message_received(message: Message<Payload>);
|
||||||
/// `MessageResult::Processed`), or it'll block the lane forever.
|
}
|
||||||
fn on_message_received(&mut self, message: Message<Payload>) -> MessageResult<Payload>;
|
|
||||||
|
impl<Payload> OnMessageReceived<Payload> for () {
|
||||||
|
fn on_message_received(_message: Message<Payload>) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Inbound lane data.
|
/// Inbound lane data.
|
||||||
#[derive(Encode, Decode, Clone)]
|
#[derive(Default, Encode, Decode, Clone)]
|
||||||
pub struct InboundLaneData {
|
pub struct InboundLaneData {
|
||||||
/// Nonce of oldest message that we haven't processed yet. May point to not-yet-received message if
|
|
||||||
/// lane is currently empty.
|
|
||||||
pub oldest_unprocessed_nonce: MessageNonce,
|
|
||||||
/// Nonce of latest message that we have received from bridged chain.
|
/// Nonce of latest message that we have received from bridged chain.
|
||||||
pub latest_received_nonce: MessageNonce,
|
pub latest_received_nonce: MessageNonce,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for InboundLaneData {
|
|
||||||
fn default() -> Self {
|
|
||||||
InboundLaneData {
|
|
||||||
// it is 1 because we're processing everything in [oldest_unprocessed_nonce; latest_received_nonce]
|
|
||||||
oldest_unprocessed_nonce: 1,
|
|
||||||
latest_received_nonce: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Outbound lane data.
|
/// Outbound lane data.
|
||||||
#[derive(Encode, Decode, Clone)]
|
#[derive(Encode, Decode, Clone)]
|
||||||
pub struct OutboundLaneData {
|
pub struct OutboundLaneData {
|
||||||
@@ -96,8 +77,6 @@ pub struct OutboundLaneData {
|
|||||||
pub oldest_unpruned_nonce: MessageNonce,
|
pub oldest_unpruned_nonce: MessageNonce,
|
||||||
/// Nonce of latest message, received by bridged chain.
|
/// Nonce of latest message, received by bridged chain.
|
||||||
pub latest_received_nonce: MessageNonce,
|
pub latest_received_nonce: MessageNonce,
|
||||||
/// Nonce of latest message, processed by bridged chain.
|
|
||||||
pub latest_processed_nonce: MessageNonce,
|
|
||||||
/// Nonce of latest message, generated by us.
|
/// Nonce of latest message, generated by us.
|
||||||
pub latest_generated_nonce: MessageNonce,
|
pub latest_generated_nonce: MessageNonce,
|
||||||
}
|
}
|
||||||
@@ -108,7 +87,6 @@ impl Default for OutboundLaneData {
|
|||||||
// it is 1 because we're pruning everything in [oldest_unpruned_nonce; latest_received_nonce]
|
// it is 1 because we're pruning everything in [oldest_unpruned_nonce; latest_received_nonce]
|
||||||
oldest_unpruned_nonce: 1,
|
oldest_unpruned_nonce: 1,
|
||||||
latest_received_nonce: 0,
|
latest_received_nonce: 0,
|
||||||
latest_processed_nonce: 0,
|
|
||||||
latest_generated_nonce: 0,
|
latest_generated_nonce: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -116,11 +94,9 @@ impl Default for OutboundLaneData {
|
|||||||
|
|
||||||
decl_runtime_apis! {
|
decl_runtime_apis! {
|
||||||
/// Outbound message lane API.
|
/// Outbound message lane API.
|
||||||
pub trait OutboundLaneApi<Payload: Decode> {
|
pub trait OutboundLaneApi {
|
||||||
/// Returns nonce of the latest message, received by bridged chain.
|
/// Returns nonce of the latest message, received by bridged chain.
|
||||||
fn latest_received_nonce(lane: LaneId) -> MessageNonce;
|
fn latest_received_nonce(lane: LaneId) -> MessageNonce;
|
||||||
/// Returns nonce of the latest message, processed by bridged chain.
|
|
||||||
fn latest_processed_nonce(lane: LaneId) -> MessageNonce;
|
|
||||||
/// Returns nonce of the latest message, generated by given lane.
|
/// Returns nonce of the latest message, generated by given lane.
|
||||||
fn latest_generated_nonce(lane: LaneId) -> MessageNonce;
|
fn latest_generated_nonce(lane: LaneId) -> MessageNonce;
|
||||||
}
|
}
|
||||||
@@ -129,7 +105,5 @@ decl_runtime_apis! {
|
|||||||
pub trait InboundLaneApi {
|
pub trait InboundLaneApi {
|
||||||
/// Returns nonce of the latest message, received by given lane.
|
/// Returns nonce of the latest message, received by given lane.
|
||||||
fn latest_received_nonce(lane: LaneId) -> MessageNonce;
|
fn latest_received_nonce(lane: LaneId) -> MessageNonce;
|
||||||
/// Returns nonce of the latest message, processed by given lane.
|
|
||||||
fn latest_processed_nonce(lane: LaneId) -> MessageNonce;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user