Adds construct_simple_protocol macro for simplifying the creation of protocols (#897)

* Make `on_message` take the message as `&mut Option<_>`

* Make `ConsensusGossip` implement `Specialization`

* Move `new_session` into `ConsensusGossip`

* Adds `construct_simple_protocol` macro for simplifying the creation of protocols
This commit is contained in:
Bastian Köcher
2018-10-12 13:10:36 +02:00
committed by Gav Wood
parent 671b0e0007
commit db427cb45c
7 changed files with 167 additions and 74 deletions
@@ -27,6 +27,9 @@ use runtime_primitives::generic::BlockId;
use message::{self, generic::Message as GenericMessage};
use protocol::Context;
use service::Roles;
use specialization::Specialization;
use StatusMessage;
use generic_message;
// TODO: Add additional spam/DoS attack protection.
const MESSAGE_LIFETIME: Duration = Duration::from_secs(600);
@@ -57,6 +60,7 @@ pub struct ConsensusGossip<B: BlockT> {
live_message_sinks: HashMap<B::Hash, mpsc::UnboundedSender<ConsensusMessage<B>>>,
messages: Vec<MessageEntry<B>>,
message_hashes: HashSet<B::Hash>,
session_start: Option<B::Hash>,
}
impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
@@ -67,6 +71,7 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
live_message_sinks: HashMap::new(),
messages: Default::default(),
message_hashes: Default::default(),
session_start: None
}
}
@@ -300,6 +305,55 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
self.register_message(hash, message);
self.propagate(protocol, generic, hash);
}
/// Note new consensus session.
pub fn new_session(&mut self, parent_hash: B::Hash) {
let old_session = self.session_start.take();
self.session_start = Some(parent_hash);
self.collect_garbage(|topic| old_session.as_ref().map_or(true, |h| topic != h));
}
}
impl<Block: BlockT> Specialization<Block> for ConsensusGossip<Block> where
Block::Header: HeaderT<Number=u64>
{
fn status(&self) -> Vec<u8> {
Vec::new()
}
fn on_connect(&mut self, ctx: &mut Context<Block>, who: NodeIndex, status: StatusMessage<Block>) {
self.new_peer(ctx, who, status.roles);
}
fn on_disconnect(&mut self, ctx: &mut Context<Block>, who: NodeIndex) {
self.peer_disconnected(ctx, who);
}
fn on_message(&mut self, ctx: &mut Context<Block>, who: NodeIndex, message: &mut Option<message::Message<Block>>) {
match message.take() {
Some(generic_message::Message::BftMessage(msg)) => {
trace!(target: "gossip", "BFT message from {}: {:?}", who, msg);
// TODO: check signature here? what if relevant block is unknown?
self.on_bft_message(ctx, who, msg)
}
r => *message = r,
}
}
fn on_abort(&mut self) {
self.abort();
}
fn maintain_peers(&mut self, _ctx: &mut Context<Block>) {
self.collect_garbage(|_| true);
}
fn on_block_imported(
&mut self,
_ctx: &mut Context<Block>,
_hash: <Block as BlockT>::Hash,
_header: &<Block as BlockT>::Header)
{}
}
#[cfg(test)]
+3
View File
@@ -49,6 +49,7 @@ extern crate substrate_test_client as test_client;
mod service;
mod sync;
#[macro_use]
mod protocol;
mod io;
mod config;
@@ -74,3 +75,5 @@ pub use message::{generic as generic_message, RequestId, BftMessage, LocalizedBf
pub use error::Error;
pub use config::{Roles, ProtocolConfig};
pub use on_demand::{OnDemand, OnDemandService, RemoteResponse};
#[doc(hidden)]
pub use runtime_primitives::traits::Block as BlockT;
+100 -1
View File
@@ -277,7 +277,7 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response),
GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(io, who, request),
GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(io, who, response),
other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, other),
other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, &mut Some(other)),
}
}
@@ -709,3 +709,102 @@ pub(crate) fn hash_message<B: BlockT>(message: &Message<B>) -> B::Hash {
let data = message.encode();
HashFor::<B>::hash(&data)
}
/// Construct a simple protocol that is composed of several sub protocols.
/// Each "sub protocol" needs to implement `Specialization` and needs to provide a `new()` function.
/// For more fine grained implementations, this macro is not usable.
///
/// # Example
///
/// ```nocompile
/// construct_simple_protocol! {
/// pub struct MyProtocol where Block = MyBlock {
/// consensus_gossip: ConsensusGossip<MyBlock>,
/// other_protocol: MyCoolStuff,
/// }
/// }
/// ```
///
/// You can also provide an optional parameter after `where Block = MyBlock`, so it looks like
/// `where Block = MyBlock, Status = consensus_gossip`. This will instruct the implementation to
/// use the `status()` function from the `ConsensusGossip` protocol. By default, `status()` returns
/// an empty vector.
#[macro_export]
macro_rules! construct_simple_protocol {
(
$( #[ $attr:meta ] )*
pub struct $protocol:ident where
Block = $block:ident
$( , Status = $status_protocol_name:ident )*
{
$( $sub_protocol_name:ident : $sub_protocol:ident $( <$protocol_block:ty> )*, )*
}
) => {
$( #[$attr] )*
pub struct $protocol {
$( $sub_protocol_name: $sub_protocol $( <$protocol_block> )*, )*
}
impl $protocol {
/// Instantiate a node protocol handler.
pub fn new() -> Self {
Self {
$( $sub_protocol_name: $sub_protocol::new(), )*
}
}
}
impl $crate::specialization::Specialization<$block> for $protocol {
fn status(&self) -> Vec<u8> {
$(
let status = self.$status_protocol_name.status();
if !status.is_empty() {
return status;
}
)*
Vec::new()
}
fn on_connect(
&mut self,
ctx: &mut $crate::Context<$block>,
who: $crate::NodeIndex,
status: $crate::StatusMessage<$block>
) {
$( self.$sub_protocol_name.on_connect(ctx, who, status); )*
}
fn on_disconnect(&mut self, ctx: &mut $crate::Context<$block>, who: $crate::NodeIndex) {
$( self.$sub_protocol_name.on_disconnect(ctx, who); )*
}
fn on_message(
&mut self,
ctx: &mut $crate::Context<$block>,
who: $crate::NodeIndex,
message: &mut Option<$crate::message::Message<$block>>
) {
$( self.$sub_protocol_name.on_message(ctx, who, message); )*
}
fn on_abort(&mut self) {
$( self.$sub_protocol_name.on_abort(); )*
}
fn maintain_peers(&mut self, ctx: &mut $crate::Context<$block>) {
$( self.$sub_protocol_name.maintain_peers(ctx); )*
}
fn on_block_imported(
&mut self,
ctx: &mut $crate::Context<$block>,
hash: <$block as $crate::BlockT>::Hash,
header: &<$block as $crate::BlockT>::Header
) {
$( self.$sub_protocol_name.on_block_imported(ctx, hash, header); )*
}
}
}
}
+1 -1
View File
@@ -35,7 +35,7 @@ pub trait Specialization<B: BlockT>: Send + Sync + 'static {
fn on_disconnect(&mut self, ctx: &mut Context<B>, who: NodeIndex);
/// Called when a network-specific message arrives.
fn on_message(&mut self, ctx: &mut Context<B>, who: NodeIndex, message: ::message::Message<B>);
fn on_message(&mut self, ctx: &mut Context<B>, who: NodeIndex, message: &mut Option<::message::Message<B>>);
/// Called on abort.
fn on_abort(&mut self) { }
+2 -2
View File
@@ -66,8 +66,8 @@ impl Specialization<Block> for DummySpecialization {
self.gossip.peer_disconnected(ctx, peer_id);
}
fn on_message(&mut self, ctx: &mut Context<Block>, peer_id: NodeIndex, message: ::message::Message<Block>) {
if let ::message::generic::Message::ChainSpecific(data) = message {
fn on_message(&mut self, ctx: &mut Context<Block>, peer_id: NodeIndex, message: &mut Option<::message::Message<Block>>) {
if let Some(::message::generic::Message::ChainSpecific(data)) = message.take() {
let gossip_message = GossipMessage::decode(&mut &data[..])
.expect("gossip messages all in known format; qed");
self.gossip.on_chain_specific(ctx, peer_id, data, gossip_message.topic)