Initial version of DummyOrdered pallet (#299)

* initial commit of DummyOrdered (aka message-lane) pallet

* API for relay

* cargo fmt --all

* some clippy + no_std

* more clippy + no_std

* inbound lane tests

* outbound lane tests

* cargo fmt --all

* prune old messages whenever outbound lane is updated

* do not care about MessageNonce overflow

* cargo fmt --all

* update crate docs

* MaxHeadersToPruneAtOnce -> MaxMessagesToPruneAtOnce

* MessageAction -> MessageResult

* cargo fmt --all

* fire MessageAccepted + MessagesDelivered

* confirm message processing

* cargo fmt --all

* clippy

* cargo fmt again

* Update modules/message-lane/src/lib.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* Update modules/message-lane/src/lib.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* use crate::*

* cargo fmt --all

* Storage -> S

* Update modules/message-lane/src/outbound_lane.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* add method doc

* Update modules/message-lane/src/inbound_lane.rs

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>

* added detailed module docs

* Update modules/message-lane/src/lib.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* updated OnMessageReceived docs

* prune only when new message is sent

* removed #![warn(missing_docs)]

* fixed merge with overlapped PR

Co-authored-by: Hernando Castano <HCastano@users.noreply.github.com>
Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
Svyatoslav Nikolsky
2020-08-31 11:14:12 +03:00
committed by Bastian Köcher
parent 5e86447d3e
commit f6d45a38da
8 changed files with 1130 additions and 0 deletions
+5
View File
@@ -50,6 +50,11 @@ version = "0.1.0"
default-features = false
path = "../../../modules/currency-exchange"
[dependencies.pallet-message-lane]
version = "0.1.0"
default-features = false
path = "../../../modules/message-lane"
[dependencies.frame-support]
version = "2.0.0-rc6"
tag = 'v2.0.0-rc6'
+56
View File
@@ -0,0 +1,56 @@
[package]
name = "pallet-message-lane"
description = "Module that allows bridged chains to exchange messages using lane concept."
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
bp-message-lane = { path = "../../primitives/message-lane", default-features = false }
codec = { package = "parity-scale-codec", version = "1.3.1", default-features = false }
# Substrate Based Dependencies
[dependencies.frame-support]
version = "2.0.0-rc6"
tag = 'v2.0.0-rc6'
default-features = false
git = "https://github.com/paritytech/substrate/"
[dependencies.frame-system]
version = "2.0.0-rc6"
tag = 'v2.0.0-rc6'
default-features = false
git = "https://github.com/paritytech/substrate/"
[dependencies.sp-std]
version = "2.0.0-rc6"
tag = 'v2.0.0-rc6'
default-features = false
git = "https://github.com/paritytech/substrate/"
[dev-dependencies.sp-core]
version = "2.0.0-rc6"
tag = 'v2.0.0-rc6'
git = "https://github.com/paritytech/substrate/"
[dev-dependencies.sp-io]
version = "2.0.0-rc6"
tag = 'v2.0.0-rc6'
git = "https://github.com/paritytech/substrate/"
[dev-dependencies.sp-runtime]
version = "2.0.0-rc6"
tag = 'v2.0.0-rc6'
git = "https://github.com/paritytech/substrate/"
[features]
default = ["std"]
std = [
"bp-message-lane/std",
"codec/std",
"frame-support/std",
"frame-system/std",
"sp-std/std"
]
@@ -0,0 +1,236 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Everything about incoming messages receival.
use bp_message_lane::{InboundLaneData, LaneId, Message, MessageKey, MessageNonce, MessageResult, OnMessageReceived};
/// Inbound lane storage.
pub trait InboundLaneStorage {
/// Message payload.
type Payload;
/// Lane id.
fn id(&self) -> LaneId;
/// Get lane data from the storage.
fn data(&self) -> InboundLaneData;
/// Update lane data in the storage.
fn set_data(&mut self, data: InboundLaneData);
/// Returns saved inbound message payload.
fn message(&self, nonce: &MessageNonce) -> Option<Self::Payload>;
/// Save inbound message in the storage.
fn save_message(&mut self, nonce: MessageNonce, payload: Self::Payload);
/// Remove inbound message from the storage.
fn remove_message(&mut self, nonce: &MessageNonce);
}
/// Inbound messages lane.
pub struct InboundLane<S> {
storage: S,
}
impl<S: InboundLaneStorage> InboundLane<S> {
/// Create new inbound lane backed by given storage.
pub fn new(storage: S) -> Self {
InboundLane { storage }
}
/// Receive new message.
pub fn receive_message(
&mut self,
nonce: MessageNonce,
payload: S::Payload,
processor: &mut impl OnMessageReceived<S::Payload>,
) -> bool {
let mut data = self.storage.data();
let is_correct_message = nonce == data.latest_received_nonce + 1;
if !is_correct_message {
return false;
}
let is_process_required = is_correct_message && data.oldest_unprocessed_nonce == nonce;
data.latest_received_nonce = nonce;
self.storage.set_data(data);
let payload_to_save = match is_process_required {
true => {
let message = Message {
key: MessageKey {
lane_id: self.storage.id(),
nonce,
},
payload,
};
match processor.on_message_received(message) {
MessageResult::Processed => None,
MessageResult::NotProcessed(message) => Some(message.payload),
}
}
false => Some(payload),
};
if let Some(payload_to_save) = payload_to_save {
self.storage.save_message(nonce, payload_to_save);
}
true
}
/// Process stored lane messages.
///
/// Stops processing either when all messages are processed, or when processor returns
/// MessageResult::NotProcessed.
pub fn process_messages(&mut self, processor: &mut impl OnMessageReceived<S::Payload>) {
let mut anything_processed = false;
let mut data = self.storage.data();
while data.oldest_unprocessed_nonce <= data.latest_received_nonce {
let nonce = data.oldest_unprocessed_nonce;
let payload = self
.storage
.message(&nonce)
.expect("message is referenced by lane; referenced message is not pruned; qed");
let message = Message {
key: MessageKey {
lane_id: self.storage.id(),
nonce,
},
payload,
};
let process_result = processor.on_message_received(message);
if let MessageResult::NotProcessed(_) = process_result {
break;
}
self.storage.remove_message(&nonce);
anything_processed = true;
data.oldest_unprocessed_nonce += 1;
}
if anything_processed {
self.storage.set_data(data);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
inbound_lane,
mock::{
run_test, TestMessageProcessor, TestPayload, TestRuntime, PAYLOAD_TO_QUEUE, REGULAR_PAYLOAD, TEST_LANE_ID,
},
};
#[test]
fn fails_to_receive_message_with_incorrect_nonce() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(!lane.receive_message(10, REGULAR_PAYLOAD, &mut TestMessageProcessor));
assert!(lane.storage.message(&10).is_none());
assert_eq!(lane.storage.data().latest_received_nonce, 0);
});
}
#[test]
fn correct_message_is_queued_if_some_other_messages_are_queued() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
assert!(lane.storage.message(&1).is_some());
assert!(lane.receive_message(2, REGULAR_PAYLOAD, &mut TestMessageProcessor));
assert!(lane.storage.message(&2).is_some());
assert_eq!(lane.storage.data().latest_received_nonce, 2);
});
}
#[test]
fn correct_message_is_queued_if_processor_wants_to_queue() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
assert!(lane.storage.message(&1).is_some());
assert_eq!(lane.storage.data().latest_received_nonce, 1);
});
}
#[test]
fn correct_message_is_not_queued_if_processed_instantly() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message(1, REGULAR_PAYLOAD, &mut TestMessageProcessor));
assert!(lane.storage.message(&1).is_none());
assert_eq!(lane.storage.data().latest_received_nonce, 1);
});
}
#[test]
fn process_message_does_nothing_when_lane_is_empty() {
run_test(|| {
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1);
lane.process_messages(&mut TestMessageProcessor);
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1);
});
}
#[test]
fn process_message_works() {
run_test(|| {
pub struct QueueByNonce(MessageNonce);
impl OnMessageReceived<TestPayload> for QueueByNonce {
fn on_message_received(&mut self, message: Message<TestPayload>) -> MessageResult<TestPayload> {
if message.key.nonce == self.0 {
MessageResult::NotProcessed(message)
} else {
MessageResult::Processed
}
}
}
let mut lane = inbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert!(lane.receive_message(1, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
assert!(lane.receive_message(2, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
assert!(lane.receive_message(3, PAYLOAD_TO_QUEUE, &mut TestMessageProcessor));
assert!(lane.receive_message(4, REGULAR_PAYLOAD, &mut TestMessageProcessor));
assert!(lane.storage.message(&1).is_some());
assert!(lane.storage.message(&2).is_some());
assert!(lane.storage.message(&3).is_some());
assert!(lane.storage.message(&4).is_some());
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 1);
lane.process_messages(&mut QueueByNonce(3));
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_some());
assert!(lane.storage.message(&4).is_some());
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 3);
lane.process_messages(&mut QueueByNonce(10));
assert!(lane.storage.message(&1).is_none());
assert!(lane.storage.message(&2).is_none());
assert!(lane.storage.message(&3).is_none());
assert!(lane.storage.message(&4).is_none());
assert_eq!(lane.storage.data().oldest_unprocessed_nonce, 5);
});
}
}
+281
View File
@@ -0,0 +1,281 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Runtime module that allows sending and receiving messages using lane concept:
//!
//! 1) the message is sent using `send_message()` call;
//! 2) every outbound message is assigned nonce;
//! 3) the messages are stored in the storage;
//! 4) external component (relay) delivers messages to bridged chain;
//! 5) messages are processed in order (ordered by assigned nonce);
//! 6) relay may send proof-of-receiving and proof-of-processing back to this chain.
//!
//! Once message is sent, its progress can be tracked by looking at module events.
//! The assigned nonce is reported using `MessageAccepted` event. When message is
//! accepted by the bridged chain, `MessagesDelivered` is fired. When message is
//! processedby the bridged chain, `MessagesProcessed` by the bridged chain.
#![cfg_attr(not(feature = "std"), no_std)]
use crate::inbound_lane::{InboundLane, InboundLaneStorage};
use crate::outbound_lane::{OutboundLane, OutboundLaneStorage};
use bp_message_lane::{
InboundLaneData, LaneId, Message, MessageKey, MessageNonce, OnMessageReceived, OutboundLaneData,
};
use frame_support::{decl_event, decl_module, decl_storage, traits::Get, Parameter, StorageMap};
use frame_system::ensure_signed;
use sp_std::{marker::PhantomData, prelude::*};
mod inbound_lane;
mod outbound_lane;
#[cfg(test)]
mod mock;
/// The module configuration trait
pub trait Trait<I = DefaultInstance>: frame_system::Trait {
/// They overarching event type.
type Event: From<Event> + Into<<Self as frame_system::Trait>::Event>;
/// Message payload.
type Payload: Parameter;
/// Maximal number of messages that may be pruned during maintenance. Maintenance occurs
/// whenever outbound lane is updated - i.e. when new message is sent, or receival is
/// confirmed. The reason is that if you want to use lane, you should be ready to pay
/// for it.
type MaxMessagesToPruneAtOnce: Get<MessageNonce>;
/// Called when message has been received.
type OnMessageReceived: Default + OnMessageReceived<Self::Payload>;
}
decl_storage! {
trait Store for Module<T: Trait<I>, I: Instance = DefaultInstance> as MessageLane {
/// Map of lane id => inbound lane data.
InboundLanes: map hasher(blake2_128_concat) LaneId => InboundLaneData;
/// All stored (unprocessed) inbound messages.
InboundMessages: map hasher(blake2_128_concat) MessageKey => Option<T::Payload>;
/// Map of lane id => outbound lane data.
OutboundLanes: map hasher(blake2_128_concat) LaneId => OutboundLaneData;
/// All queued outbound messages.
OutboundMessages: map hasher(blake2_128_concat) MessageKey => Option<T::Payload>;
}
}
decl_event!(
pub enum Event {
/// Message has been accepted and is waiting to be delivered.
MessageAccepted(LaneId, MessageNonce),
/// Messages in the inclusive range have been delivered to the bridged chain.
MessagesDelivered(LaneId, MessageNonce, MessageNonce),
/// Messages in the inclusive range have been processed by the bridged chain.
MessagesProcessed(LaneId, MessageNonce, MessageNonce),
}
);
decl_module! {
pub struct Module<T: Trait<I>, I: Instance = DefaultInstance> for enum Call where origin: T::Origin {
/// Deposit one of this module's events by using the default implementation.
fn deposit_event() = default;
/// Send message over lane.
#[weight = 0] // TODO: update me (https://github.com/paritytech/parity-bridges-common/issues/78)
pub fn send_message(
origin,
lane_id: LaneId,
payload: T::Payload,
) {
let _ = ensure_signed(origin)?;
let mut lane = outbound_lane::<T, I>(lane_id);
let nonce = lane.send_message(payload);
lane.prune_messages(T::MaxMessagesToPruneAtOnce::get());
Self::deposit_event(Event::MessageAccepted(lane_id, nonce));
}
}
}
impl<T: Trait<I>, I: Instance> Module<T, I> {
// =========================================================================================
// === Exposed mutables ====================================================================
// =========================================================================================
/// Receive new TRUSTED lane messages.
///
/// Trusted here means that the function itself doesn't check whether message has actually
/// been sent through the other end of the channel. We only check that we are receiving
/// and processing messages in order here.
///
/// Messages vector is required to be sorted by nonce within each lane. Otherise messages
/// will be rejected.
pub fn receive_messages(messages: Vec<Message<T::Payload>>) -> MessageNonce {
let mut correct_messages = 0;
let mut processor = T::OnMessageReceived::default();
for message in messages {
let mut lane = inbound_lane::<T, I>(message.key.lane_id);
if lane.receive_message(message.key.nonce, message.payload, &mut processor) {
correct_messages += 1;
}
}
correct_messages
}
/// Process stored lane messages.
///
/// Stops processing either when all messages are processed, or when processor returns
/// MessageResult::NotProcessed.
pub fn process_lane_messages(lane_id: &LaneId, processor: &mut impl OnMessageReceived<T::Payload>) {
inbound_lane::<T, I>(*lane_id).process_messages(processor);
}
/// Receive TRUSTED proof of message receival.
///
/// Trusted here means that the function itself doesn't check whether the bridged chain has
/// actually received these messages.
///
/// The caller may break the channel by providing `latest_received_nonce` that is larger
/// than actual one. Not-yet-sent messages may be pruned in this case.
pub fn confirm_receival(lane_id: &LaneId, latest_received_nonce: MessageNonce) {
let mut lane = outbound_lane::<T, I>(*lane_id);
let received_range = lane.confirm_receival(latest_received_nonce);
if let Some(received_range) = received_range {
Self::deposit_event(Event::MessagesDelivered(*lane_id, received_range.0, received_range.1));
}
}
/// Receive TRUSTED proof of message processing.
///
/// Trusted here means that the function itself doesn't check whether the bridged chain has
/// actually processed these messages.
pub fn confirm_processing(lane_id: &LaneId, latest_processed_nonce: MessageNonce) {
let mut lane = outbound_lane::<T, I>(*lane_id);
let processed_range = lane.confirm_processing(latest_processed_nonce);
if let Some(processed_range) = processed_range {
Self::deposit_event(Event::MessagesProcessed(*lane_id, processed_range.0, processed_range.1));
}
}
}
/// Creates new inbound lane object, backed by runtime storage.
fn inbound_lane<T: Trait<I>, I: Instance>(lane_id: LaneId) -> InboundLane<RuntimeInboundLaneStorage<T, I>> {
InboundLane::new(RuntimeInboundLaneStorage {
lane_id,
_phantom: Default::default(),
})
}
/// Creates new outbound lane object, backed by runtime storage.
fn outbound_lane<T: Trait<I>, I: Instance>(lane_id: LaneId) -> OutboundLane<RuntimeOutboundLaneStorage<T, I>> {
OutboundLane::new(RuntimeOutboundLaneStorage {
lane_id,
_phantom: Default::default(),
})
}
/// Runtime inbound lane storage.
struct RuntimeInboundLaneStorage<T, I = DefaultInstance> {
lane_id: LaneId,
_phantom: PhantomData<(T, I)>,
}
impl<T: Trait<I>, I: Instance> InboundLaneStorage for RuntimeInboundLaneStorage<T, I> {
type Payload = T::Payload;
fn id(&self) -> LaneId {
self.lane_id
}
fn data(&self) -> InboundLaneData {
InboundLanes::<I>::get(&self.lane_id)
}
fn set_data(&mut self, data: InboundLaneData) {
InboundLanes::<I>::insert(&self.lane_id, data)
}
fn message(&self, nonce: &MessageNonce) -> Option<Self::Payload> {
InboundMessages::<T, I>::get(MessageKey {
lane_id: self.lane_id,
nonce: *nonce,
})
}
fn save_message(&mut self, nonce: MessageNonce, payload: T::Payload) {
InboundMessages::<T, I>::insert(
MessageKey {
lane_id: self.lane_id,
nonce,
},
payload,
);
}
fn remove_message(&mut self, nonce: &MessageNonce) {
InboundMessages::<T, I>::remove(MessageKey {
lane_id: self.lane_id,
nonce: *nonce,
});
}
}
/// Runtime outbound lane storage.
struct RuntimeOutboundLaneStorage<T, I = DefaultInstance> {
lane_id: LaneId,
_phantom: PhantomData<(T, I)>,
}
impl<T: Trait<I>, I: Instance> OutboundLaneStorage for RuntimeOutboundLaneStorage<T, I> {
type Payload = T::Payload;
fn id(&self) -> LaneId {
self.lane_id
}
fn data(&self) -> OutboundLaneData {
OutboundLanes::<I>::get(&self.lane_id)
}
fn set_data(&mut self, data: OutboundLaneData) {
OutboundLanes::<I>::insert(&self.lane_id, data)
}
#[cfg(test)]
fn message(&self, nonce: &MessageNonce) -> Option<Self::Payload> {
OutboundMessages::<T, I>::get(MessageKey {
lane_id: self.lane_id,
nonce: *nonce,
})
}
fn save_message(&mut self, nonce: MessageNonce, payload: T::Payload) {
OutboundMessages::<T, I>::insert(
MessageKey {
lane_id: self.lane_id,
nonce,
},
payload,
);
}
fn remove_message(&mut self, nonce: &MessageNonce) {
OutboundMessages::<T, I>::remove(MessageKey {
lane_id: self.lane_id,
nonce: *nonce,
});
}
}
+121
View File
@@ -0,0 +1,121 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use bp_message_lane::{LaneId, Message, MessageResult, OnMessageReceived};
use frame_support::{impl_outer_event, impl_outer_origin, parameter_types, weights::Weight};
use sp_core::H256;
use sp_runtime::{
testing::Header as SubstrateHeader,
traits::{BlakeTwo256, IdentityLookup},
Perbill,
};
use crate::Trait;
pub type AccountId = u64;
pub type TestPayload = u64;
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct TestRuntime;
mod message_lane {
pub use crate::Event;
}
impl_outer_event! {
pub enum TestEvent for TestRuntime {
frame_system<T>,
message_lane,
}
}
impl_outer_origin! {
pub enum Origin for TestRuntime where system = frame_system {}
}
parameter_types! {
pub const BlockHashCount: u64 = 250;
pub const MaximumBlockWeight: Weight = 1024;
pub const MaximumBlockLength: u32 = 2 * 1024;
pub const AvailableBlockRatio: Perbill = Perbill::one();
}
impl frame_system::Trait for TestRuntime {
type Origin = Origin;
type Index = u64;
type Call = ();
type BlockNumber = u64;
type Hash = H256;
type Hashing = BlakeTwo256;
type AccountId = AccountId;
type Lookup = IdentityLookup<Self::AccountId>;
type Header = SubstrateHeader;
type Event = TestEvent;
type BlockHashCount = BlockHashCount;
type MaximumBlockWeight = MaximumBlockWeight;
type DbWeight = ();
type BlockExecutionWeight = ();
type ExtrinsicBaseWeight = ();
type MaximumExtrinsicWeight = ();
type AvailableBlockRatio = AvailableBlockRatio;
type MaximumBlockLength = MaximumBlockLength;
type Version = ();
type ModuleToIndex = ();
type AccountData = ();
type OnNewAccount = ();
type OnKilledAccount = ();
type BaseCallFilter = ();
type SystemWeightInfo = ();
}
parameter_types! {
pub const MaxMessagesToPruneAtOnce: u64 = 10;
}
impl Trait for TestRuntime {
type Event = TestEvent;
type Payload = TestPayload;
type MaxMessagesToPruneAtOnce = MaxMessagesToPruneAtOnce;
type OnMessageReceived = TestMessageProcessor;
}
/// Lane that we're using in tests.
pub const TEST_LANE_ID: LaneId = [0, 0, 0, 1];
/// Regular message payload that is not PAYLOAD_TO_QUEUE.
pub const REGULAR_PAYLOAD: TestPayload = 0;
/// All messages with this payload are queued by TestMessageProcessor.
pub const PAYLOAD_TO_QUEUE: TestPayload = 42;
/// Message processor that immediately handles all messages except messages with PAYLOAD_TO_QUEUE payload.
#[derive(Debug, Default)]
pub struct TestMessageProcessor;
impl OnMessageReceived<TestPayload> for TestMessageProcessor {
fn on_message_received(&mut self, message: Message<TestPayload>) -> MessageResult<TestPayload> {
if message.payload == PAYLOAD_TO_QUEUE {
MessageResult::NotProcessed(message)
} else {
MessageResult::Processed
}
}
}
/// Run message lane test.
pub fn run_test<T>(test: impl FnOnce() -> T) -> T {
sp_io::TestExternalities::new(Default::default()).execute_with(test)
}
@@ -0,0 +1,271 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Everything about outgoing messages sending.
use bp_message_lane::{LaneId, MessageNonce, OutboundLaneData};
/// Outbound lane storage.
pub trait OutboundLaneStorage {
/// Message payload.
type Payload;
/// Lane id.
fn id(&self) -> LaneId;
/// Get lane data from the storage.
fn data(&self) -> OutboundLaneData;
/// Update lane data in the storage.
fn set_data(&mut self, data: OutboundLaneData);
/// Returns saved outbound message payload.
#[cfg(test)]
fn message(&self, nonce: &MessageNonce) -> Option<Self::Payload>;
/// Save outbound message in the storage.
fn save_message(&mut self, nonce: MessageNonce, payload: Self::Payload);
/// Remove outbound message from the storage.
fn remove_message(&mut self, nonce: &MessageNonce);
}
/// Outbound messages lane.
pub struct OutboundLane<S> {
storage: S,
}
impl<S: OutboundLaneStorage> OutboundLane<S> {
/// Create new inbound lane backed by given storage.
pub fn new(storage: S) -> Self {
OutboundLane { storage }
}
/// Send message over lane.
///
/// Returns new message nonce.
pub fn send_message(&mut self, payload: S::Payload) -> MessageNonce {
let mut data = self.storage.data();
let nonce = data.latest_generated_nonce + 1;
data.latest_generated_nonce = nonce;
self.storage.save_message(nonce, payload);
self.storage.set_data(data);
nonce
}
/// Confirm message receival.
///
/// Returns `None` if confirmation is wrong/duplicate.
/// Returns `Some` with inclusive ranges of message nonces that have been received.
pub fn confirm_receival(&mut self, latest_received_nonce: MessageNonce) -> Option<(MessageNonce, MessageNonce)> {
let mut data = self.storage.data();
if latest_received_nonce <= data.latest_received_nonce || latest_received_nonce > data.latest_generated_nonce {
return None;
}
let prev_latest_received_nonce = data.latest_received_nonce;
data.latest_received_nonce = latest_received_nonce;
self.storage.set_data(data);
Some((prev_latest_received_nonce + 1, latest_received_nonce))
}
/// Confirm message processing.
///
/// Returns `None` if confirmation is wrong/duplicate.
/// Returns `Some` with inclusive ranges of message nonces that have been processed.
pub fn confirm_processing(&mut self, latest_processed_nonce: MessageNonce) -> Option<(MessageNonce, MessageNonce)> {
let mut data = self.storage.data();
// wait for recieval confirmation first
if latest_processed_nonce <= data.latest_processed_nonce || latest_processed_nonce > data.latest_received_nonce
{
return None;
}
let prev_latest_processed_nonce = data.latest_processed_nonce;
data.latest_processed_nonce = latest_processed_nonce;
self.storage.set_data(data);
Some((prev_latest_processed_nonce + 1, latest_processed_nonce))
}
/// Prune at most `max_messages_to_prune` already received messages.
///
/// Returns number of pruned messages.
pub fn prune_messages(&mut self, max_messages_to_prune: MessageNonce) -> MessageNonce {
let mut pruned_messages = 0;
let mut anything_changed = false;
let mut data = self.storage.data();
while pruned_messages < max_messages_to_prune && data.oldest_unpruned_nonce <= data.latest_received_nonce {
self.storage.remove_message(&data.oldest_unpruned_nonce);
anything_changed = true;
pruned_messages += 1;
data.oldest_unpruned_nonce += 1;
}
if anything_changed {
self.storage.set_data(data);
}
pruned_messages
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
mock::{run_test, TestRuntime, REGULAR_PAYLOAD, TEST_LANE_ID},
outbound_lane,
};
#[test]
fn send_message_works() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert_eq!(lane.storage.data().latest_generated_nonce, 0);
assert_eq!(lane.send_message(REGULAR_PAYLOAD), 1);
assert!(lane.storage.message(&1).is_some());
assert_eq!(lane.storage.data().latest_generated_nonce, 1);
});
}
#[test]
fn confirm_receival_works() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert_eq!(lane.send_message(REGULAR_PAYLOAD), 1);
assert_eq!(lane.send_message(REGULAR_PAYLOAD), 2);
assert_eq!(lane.send_message(REGULAR_PAYLOAD), 3);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 0);
assert_eq!(lane.confirm_receival(3), Some((1, 3)));
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
});
}
#[test]
fn confirm_receival_rejects_nonce_lesser_than_latest_received() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
lane.send_message(REGULAR_PAYLOAD);
lane.send_message(REGULAR_PAYLOAD);
lane.send_message(REGULAR_PAYLOAD);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 0);
assert_eq!(lane.confirm_receival(3), Some((1, 3)));
assert_eq!(lane.confirm_receival(3), None);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
assert_eq!(lane.confirm_receival(2), None);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 3);
});
}
#[test]
fn confirm_receival_rejects_nonce_larger_than_last_generated() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
lane.send_message(REGULAR_PAYLOAD);
lane.send_message(REGULAR_PAYLOAD);
lane.send_message(REGULAR_PAYLOAD);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 0);
assert_eq!(lane.confirm_receival(10), None);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_received_nonce, 0);
});
}
#[test]
fn confirm_processing_works() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
assert_eq!(lane.send_message(REGULAR_PAYLOAD), 1);
assert_eq!(lane.send_message(REGULAR_PAYLOAD), 2);
assert_eq!(lane.send_message(REGULAR_PAYLOAD), 3);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_processed_nonce, 0);
assert_eq!(lane.confirm_receival(3), Some((1, 3)));
assert_eq!(lane.confirm_processing(2), Some((1, 2)));
assert_eq!(lane.storage.data().latest_processed_nonce, 2);
assert_eq!(lane.confirm_processing(3), Some((3, 3)));
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_processed_nonce, 3);
});
}
#[test]
fn confirm_processing_rejects_nonce_lesser_than_latest_processed() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
lane.send_message(REGULAR_PAYLOAD);
lane.send_message(REGULAR_PAYLOAD);
lane.send_message(REGULAR_PAYLOAD);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_processed_nonce, 0);
assert_eq!(lane.confirm_receival(3), Some((1, 3)));
assert_eq!(lane.confirm_processing(3), Some((1, 3)));
assert_eq!(lane.confirm_processing(3), None);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_processed_nonce, 3);
assert_eq!(lane.confirm_processing(2), None);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_processed_nonce, 3);
});
}
#[test]
fn confirm_processing_rejects_nonce_larger_than_last_received() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
lane.send_message(REGULAR_PAYLOAD);
lane.send_message(REGULAR_PAYLOAD);
lane.send_message(REGULAR_PAYLOAD);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_processed_nonce, 0);
assert_eq!(lane.confirm_processing(2), None);
assert_eq!(lane.storage.data().latest_generated_nonce, 3);
assert_eq!(lane.storage.data().latest_processed_nonce, 0);
});
}
#[test]
fn prune_messages_works() {
run_test(|| {
let mut lane = outbound_lane::<TestRuntime, _>(TEST_LANE_ID);
// when lane is empty, nothing is pruned
assert_eq!(lane.prune_messages(100), 0);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// when nothing is confirmed, nothing is pruned
lane.send_message(REGULAR_PAYLOAD);
lane.send_message(REGULAR_PAYLOAD);
lane.send_message(REGULAR_PAYLOAD);
assert_eq!(lane.prune_messages(100), 0);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 1);
// after confirmation, some messages are received
assert_eq!(lane.confirm_receival(2), Some((1, 2)));
assert_eq!(lane.prune_messages(100), 2);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 3);
// after last message is confirmed, everything is pruned
assert_eq!(lane.confirm_receival(3), Some((3, 3)));
assert_eq!(lane.prune_messages(100), 1);
assert_eq!(lane.storage.data().oldest_unpruned_nonce, 4);
});
}
}
@@ -0,0 +1,25 @@
[package]
name = "bp-message-lane"
description = "Primitives of message lane module."
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
codec = { package = "parity-scale-codec", version = "1.3.1", default-features = false, features = ["derive"] }
# Substrate Based Dependencies
[dependencies.sp-api]
version = "2.0.0-rc6"
tag = 'v2.0.0-rc6'
default-features = false
git = "https://github.com/paritytech/substrate.git"
[features]
default = ["std"]
std = [
"codec/std",
"sp-api/std"
]
+135
View File
@@ -0,0 +1,135 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Primitives for sending and receiving Substrate <-> Substrate messages.
#![cfg_attr(not(feature = "std"), no_std)]
// RuntimeApi generated functions
#![allow(clippy::too_many_arguments)]
// Generated by `DecodeLimit::decode_with_depth_limit`
#![allow(clippy::unnecessary_mut_passed)]
use codec::{Decode, Encode};
use sp_api::decl_runtime_apis;
/// Lane identifier.
pub type LaneId = [u8; 4];
/// Message nonce. Valid messages will never have 0 nonce.
pub type MessageNonce = u64;
/// Message key (unique message identifier) as it is stored in the storage.
#[derive(Encode, Decode, Clone)]
pub struct MessageKey {
/// ID of the message lane.
pub lane_id: LaneId,
/// Message nonce.
pub nonce: MessageNonce,
}
/// Message as it is stored in the storage.
#[derive(Encode, Decode, Clone)]
pub struct Message<Payload> {
/// Message key.
pub key: MessageKey,
/// Message payload.
pub payload: Payload,
}
/// Message processing result.
pub enum MessageResult<Payload> {
/// Message has been processed and should not be queued.
Processed,
/// Message has NOT been processed and should be queued for processing later.
NotProcessed(Message<Payload>),
}
/// Called when inbound message is received.
pub trait OnMessageReceived<Payload> {
/// Called when inbound message is received.
///
/// It is up to the implementers of this trait to determine whether the message
/// is invalid (i.e. improperly encoded, has too large weight, ...) or not. And,
/// if message is invalid, then it should be dropped immediately (by returning
/// `MessageResult::Processed`), or it'll block the lane forever.
fn on_message_received(&mut self, message: Message<Payload>) -> MessageResult<Payload>;
}
/// Inbound lane data.
#[derive(Encode, Decode, Clone)]
pub struct InboundLaneData {
/// Nonce of oldest message that we haven't processed yet. May point to not-yet-received message if
/// lane is currently empty.
pub oldest_unprocessed_nonce: MessageNonce,
/// Nonce of latest message that we have received from bridged chain.
pub latest_received_nonce: MessageNonce,
}
impl Default for InboundLaneData {
fn default() -> Self {
InboundLaneData {
// it is 1 because we're processing everything in [oldest_unprocessed_nonce; latest_received_nonce]
oldest_unprocessed_nonce: 1,
latest_received_nonce: 0,
}
}
}
/// Outbound lane data.
#[derive(Encode, Decode, Clone)]
pub struct OutboundLaneData {
/// Nonce of oldest message that we haven't yet pruned. May point to not-yet-generated message if
/// all sent messages are already pruned.
pub oldest_unpruned_nonce: MessageNonce,
/// Nonce of latest message, received by bridged chain.
pub latest_received_nonce: MessageNonce,
/// Nonce of latest message, processed by bridged chain.
pub latest_processed_nonce: MessageNonce,
/// Nonce of latest message, generated by us.
pub latest_generated_nonce: MessageNonce,
}
impl Default for OutboundLaneData {
fn default() -> Self {
OutboundLaneData {
// it is 1 because we're pruning everything in [oldest_unpruned_nonce; latest_received_nonce]
oldest_unpruned_nonce: 1,
latest_received_nonce: 0,
latest_processed_nonce: 0,
latest_generated_nonce: 0,
}
}
}
decl_runtime_apis! {
/// Outbound message lane API.
pub trait OutboundLaneApi<Payload: Decode> {
/// Returns nonce of the latest message, received by bridged chain.
fn latest_received_nonce(lane: LaneId) -> MessageNonce;
/// Returns nonce of the latest message, processed by bridged chain.
fn latest_processed_nonce(lane: LaneId) -> MessageNonce;
/// Returns nonce of the latest message, generated by given lane.
fn latest_generated_nonce(lane: LaneId) -> MessageNonce;
}
/// Inbound message lane API.
pub trait InboundLaneApi {
/// Returns nonce of the latest message, received by given lane.
fn latest_received_nonce(lane: LaneId) -> MessageNonce;
/// Returns nonce of the latest message, processed by given lane.
fn latest_processed_nonce(lane: LaneId) -> MessageNonce;
}
}