Use substrate codec for network messages (#333)

* Use substrate codec for network messages

* Version bump

* Removed redundant format
This commit is contained in:
Arkadiy Paronyan
2018-07-16 15:28:31 +02:00
committed by Robert Habermeier
parent 747537f6fd
commit f84ad85c62
24 changed files with 617 additions and 316 deletions
-12
View File
@@ -1917,9 +1917,6 @@ dependencies = [
"polkadot-consensus 0.1.0",
"polkadot-primitives 0.1.0",
"rhododendron 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.22 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-bft 0.1.0",
"substrate-codec 0.1.0",
"substrate-network 0.1.0",
@@ -2014,8 +2011,6 @@ name = "polkadot-statement-table"
version = "0.1.0"
dependencies = [
"polkadot-primitives 0.1.0",
"serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-codec 0.1.0",
"substrate-primitives 0.1.0",
]
@@ -2765,19 +2760,12 @@ dependencies = [
"linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.64 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.22 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-bft 0.1.0",
"substrate-client 0.1.0",
"substrate-codec 0.1.0",
"substrate-keyring 0.1.0",
"substrate-network-libp2p 0.1.0",
"substrate-primitives 0.1.0",
"substrate-runtime-primitives 0.1.0",
"substrate-runtime-support 0.1.0",
"substrate-serializer 0.1.0",
"substrate-test-client 0.1.0",
]
+5 -5
View File
@@ -222,19 +222,19 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
// TODO [rob]: collation node implementation
// This isn't a thing. Different parachains will have their own collator executables and
// maybe link to libpolkadot to get a light-client.
service::Role::LIGHT
service::Roles::LIGHT
} else if matches.is_present("light") {
info!("Starting (light)");
config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible;
service::Role::LIGHT
service::Roles::LIGHT
} else if matches.is_present("validator") || matches.is_present("dev") {
info!("Starting validator");
config.execution_strategy = service::ExecutionStrategy::Both;
service::Role::AUTHORITY
service::Roles::AUTHORITY
} else {
info!("Starting (heavy)");
config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible;
service::Role::FULL
service::Roles::FULL
};
if let Some(s) = matches.value_of("execution") {
@@ -303,7 +303,7 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
None
};
match role == service::Role::LIGHT {
match role == service::Roles::LIGHT {
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?,
}
-3
View File
@@ -5,9 +5,6 @@ authors = ["Parity Technologies <admin@parity.io>"]
description = "Polkadot-specific networking protocol"
[dependencies]
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
parking_lot = "0.4"
polkadot-api = { path = "../api" }
polkadot-consensus = { path = "../consensus" }
@@ -18,6 +18,7 @@
use polkadot_primitives::{AccountId, Hash};
use polkadot_primitives::parachain::{Id as ParaId, Collation};
use codec;
use futures::sync::oneshot;
@@ -27,12 +28,28 @@ use std::time::{Duration, Instant};
const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5);
/// The role of the collator. Whether they're the primary or backup for this parachain.
#[derive(PartialEq, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum Role {
/// Primary collators should send collations whenever it's time.
Primary,
Primary = 0,
/// Backup collators should not.
Backup,
Backup = 1,
}
impl codec::Encode for Role {
fn encode_to<T: codec::Output>(&self, dest: &mut T) {
dest.push_byte(*self as u8);
}
}
impl codec::Decode for Role {
fn decode<I: codec::Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
x if x == Role::Primary as u8 => Some(Role::Primary),
x if x == Role::Backup as u8 => Some(Role::Backup),
_ => None,
}
}
}
/// A maintenance action for the collator set.
+2 -1
View File
@@ -26,6 +26,7 @@ use polkadot_api::{PolkadotApi, LocalPolkadotApi};
use polkadot_consensus::{Network, SharedTable, Collators};
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation};
use codec::Decode;
use futures::prelude::*;
use futures::sync::mpsc;
@@ -175,7 +176,7 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> MessageProcessTask<P> {
}
}
ConsensusMessage::ChainSpecific(msg, _) => {
if let Ok(Message::Statement(parent_hash, statement)) = ::serde_json::from_slice(&msg) {
if let Some(Message::Statement(parent_hash, statement)) = Decode::decode(&mut msg.as_slice()) {
if ::polkadot_consensus::check_statement(&statement.statement, &statement.signature, statement.sender, &parent_hash) {
self.table_router.import_statement(statement);
}
+60 -18
View File
@@ -20,11 +20,6 @@
//! parachain block and extrinsic data fetching, communication between collators and validators,
//! and more.
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate substrate_bft as bft;
extern crate substrate_codec as codec;
extern crate substrate_network;
@@ -47,7 +42,7 @@ mod collator_pool;
mod router;
pub mod consensus;
use codec::{Decode, Encode};
use codec::{Decode, Encode, Input, Output};
use futures::sync::oneshot;
use parking_lot::Mutex;
use polkadot_consensus::{Statement, SignedStatement, GenericStatement};
@@ -188,7 +183,6 @@ impl CurrentConsensus {
}
/// Polkadot-specific messages.
#[derive(Serialize, Deserialize)]
pub enum Message {
/// signed statement and localized parent hash.
Statement(Hash, SignedStatement),
@@ -205,8 +199,58 @@ pub enum Message {
Collation(Hash, Collation),
}
impl Encode for Message {
fn encode_to<T: Output>(&self, dest: &mut T) {
match *self {
Message::Statement(ref h, ref s) => {
dest.push_byte(0);
dest.push(h);
dest.push(s);
}
Message::SessionKey(ref h, ref k) => {
dest.push_byte(1);
dest.push(h);
dest.push(k);
}
Message::RequestBlockData(ref id, ref d) => {
dest.push_byte(2);
dest.push(id);
dest.push(d);
}
Message::BlockData(ref id, ref d) => {
dest.push_byte(3);
dest.push(id);
dest.push(d);
}
Message::CollatorRole(ref r) => {
dest.push_byte(4);
dest.push(r);
}
Message::Collation(ref h, ref c) => {
dest.push_byte(5);
dest.push(h);
dest.push(c);
}
}
}
}
impl Decode for Message {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
0 => Some(Message::Statement(Decode::decode(input)?, Decode::decode(input)?)),
1 => Some(Message::SessionKey(Decode::decode(input)?, Decode::decode(input)?)),
2 => Some(Message::RequestBlockData(Decode::decode(input)?, Decode::decode(input)?)),
3 => Some(Message::BlockData(Decode::decode(input)?, Decode::decode(input)?)),
4 => Some(Message::CollatorRole(Decode::decode(input)?)),
5 => Some(Message::Collation(Decode::decode(input)?, Decode::decode(input)?)),
_ => None,
}
}
}
fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
let encoded = ::serde_json::to_vec(&message).expect("serialization of messages infallible; qed");
let encoded = message.encode();
ctx.send_message(to, generic_message::Message::ChainSpecific(encoded))
}
@@ -244,9 +288,7 @@ impl PolkadotProtocol {
/// Send a statement to a validator.
fn send_statement(&mut self, ctx: &mut Context<Block>, _val: SessionKey, parent_hash: Hash, statement: SignedStatement) {
// TODO: something more targeted than gossip.
let raw = ::serde_json::to_vec(&Message::Statement(parent_hash, statement))
.expect("message serialization infallible; qed");
let raw = Message::Statement(parent_hash, statement).encode();
self.consensus_gossip.multicast_chain_specific(ctx, raw, parent_hash);
}
@@ -427,7 +469,7 @@ impl Specialization<Block> for PolkadotProtocol {
);
}
let validator = status.roles.iter().any(|r| *r == message::Role::Authority);
let validator = status.roles.contains(substrate_network::Roles::AUTHORITY);
let send_key = validator || local_status.collating_for.is_some();
self.peers.insert(peer_id, PeerInfo {
@@ -436,7 +478,7 @@ impl Specialization<Block> for PolkadotProtocol {
validator,
});
self.consensus_gossip.new_peer(ctx, peer_id, &status.roles);
self.consensus_gossip.new_peer(ctx, peer_id, status.roles);
if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) {
send_polkadot_message(
ctx,
@@ -497,11 +539,11 @@ impl Specialization<Block> for PolkadotProtocol {
self.consensus_gossip.on_bft_message(ctx, peer_id, msg)
}
generic_message::Message::ChainSpecific(raw) => {
match serde_json::from_slice(&raw) {
Ok(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg),
Err(e) => {
trace!(target: "p_net", "Bad message from {}: {}", peer_id, e);
ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason");
match Message::decode(&mut raw.as_slice()) {
Some(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg),
None => {
trace!(target: "p_net", "Bad message from {}", peer_id);
ctx.disable_peer(peer_id, "Invalid polkadot protocol message format");
}
}
}
+9 -12
View File
@@ -24,7 +24,7 @@ use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData};
use substrate_primitives::H512;
use codec::Encode;
use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, message::Message as SubstrateMessage, message::Role, specialization::Specialization, generic_message::Message as GenericMessage};
use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage};
use std::sync::Arc;
use futures::Future;
@@ -62,7 +62,7 @@ impl TestContext {
fn has_message(&self, to: PeerId, message: Message) -> bool {
use substrate_network::generic_message::Message as GenericMessage;
let encoded = ::serde_json::to_vec(&message).unwrap();
let encoded = message.encode();
self.messages.iter().any(|&(ref peer, ref msg)| match msg {
GenericMessage::ChainSpecific(ref data) => peer == &to && data == &encoded,
_ => false,
@@ -70,7 +70,7 @@ impl TestContext {
}
}
fn make_status(status: &Status, roles: Vec<Role>) -> FullStatus {
fn make_status(status: &Status, roles: Roles) -> FullStatus {
FullStatus {
version: 1,
roles,
@@ -78,9 +78,6 @@ fn make_status(status: &Status, roles: Vec<Role>) -> FullStatus {
best_hash: Default::default(),
genesis_hash: Default::default(),
chain_status: status.encode(),
parachain_id: None,
validator_id: None,
validator_signature: None,
}
}
@@ -97,7 +94,7 @@ fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus
}
fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: PeerId, message: Message) {
let encoded = ::serde_json::to_vec(&message).unwrap();
let encoded = message.encode();
protocol.on_message(ctx, from, GenericMessage::ChainSpecific(encoded));
}
@@ -115,7 +112,7 @@ fn sends_session_key() {
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, vec![Role::Authority]));
protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, Roles::AUTHORITY));
assert!(ctx.messages.is_empty());
}
@@ -129,7 +126,7 @@ fn sends_session_key() {
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, vec![]));
protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, Roles::NONE));
assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key)));
}
}
@@ -171,7 +168,7 @@ fn fetches_from_those_with_knowledge() {
// connect peer A
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_a, make_status(&status, vec![Role::Authority]));
protocol.on_connect(&mut ctx, peer_a, make_status(&status, Roles::AUTHORITY));
assert!(ctx.has_message(peer_a, Message::SessionKey(parent_hash, local_key)));
}
@@ -187,7 +184,7 @@ fn fetches_from_those_with_knowledge() {
// peer B connects and sends session key. request already assigned to A
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_b, make_status(&status, vec![Role::Authority]));
protocol.on_connect(&mut ctx, peer_b, make_status(&status, Roles::AUTHORITY));
on_message(&mut protocol, &mut ctx, peer_b, Message::SessionKey(parent_hash, b_key));
assert!(!ctx.has_message(peer_b, Message::RequestBlockData(2, candidate_hash)));
@@ -220,7 +217,7 @@ fn remove_bad_collator() {
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_id, make_status(&status, vec![]));
protocol.on_connect(&mut ctx, peer_id, make_status(&status, Roles::NONE));
}
{
@@ -224,6 +224,22 @@ pub struct Collation {
pub receipt: CandidateReceipt,
}
impl Decode for Collation {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(Collation {
block_data: Decode::decode(input)?,
receipt: Decode::decode(input)?,
})
}
}
impl Encode for Collation {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.block_data);
dest.push(&self.receipt);
}
}
/// Parachain ingress queue message.
#[derive(PartialEq, Eq, Clone)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
@@ -253,6 +269,18 @@ impl BlockData {
}
}
impl Decode for BlockData {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(BlockData(Decode::decode(input)?))
}
}
impl Encode for BlockData {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.0);
}
}
/// Parachain header raw bytes wrapper type.
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
+2 -2
View File
@@ -52,7 +52,7 @@ use client::Client;
use polkadot_network::{PolkadotProtocol, consensus::ConsensusNetwork};
use tokio::runtime::TaskExecutor;
pub use service::{Configuration, Role, PruningMode, ExtrinsicPoolOptions,
pub use service::{Configuration, Roles, PruningMode, ExtrinsicPoolOptions,
ErrorKind, Error, ComponentBlock, LightComponents, FullComponents};
pub use client::ExecutionStrategy;
@@ -166,7 +166,7 @@ pub fn new_light(config: Configuration<GenesisConfig>, executor: TaskExecutor)
pub fn new_full(config: Configuration<GenesisConfig>, executor: TaskExecutor)
-> Result<Service<FullComponents<Factory>>, Error>
{
let is_validator = (config.roles & Role::AUTHORITY) == Role::AUTHORITY;
let is_validator = (config.roles & Roles::AUTHORITY) == Roles::AUTHORITY;
let service = service::Service::<FullComponents<Factory>>::new(config, executor.clone())?;
// Spin consensus service if configured
let consensus = if is_validator {
@@ -7,5 +7,3 @@ authors = ["Parity Technologies <admin@parity.io>"]
substrate-codec = { path = "../../substrate/codec" }
substrate-primitives = { path = "../../substrate/primitives" }
polkadot-primitives = { path = "../primitives" }
serde = "1.0"
serde_derive = "1.0"
@@ -70,7 +70,7 @@ pub trait Context {
}
/// Statements circulated among peers.
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum Statement<C, D> {
/// Broadcast by a authority to indicate that this is his candidate for
/// inclusion.
@@ -141,7 +141,7 @@ impl<C: Decode, D: Decode> Decode for Statement<C, D> {
}
/// A signed statement.
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct SignedStatement<C, D, V, S> {
/// The statement.
pub statement: Statement<C, D>,
@@ -151,6 +151,23 @@ pub struct SignedStatement<C, D, V, S> {
pub sender: V,
}
impl<C: Encode, D: Encode, V: Encode, S: Encode> Encode for SignedStatement<C, D, V, S> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.statement);
dest.push(&self.signature);
dest.push(&self.sender);
}
}
impl<C: Decode, D: Decode, V: Decode, S: Decode> Decode for SignedStatement<C, D, V, S> {
fn decode<I: Input>(value: &mut I) -> Option<Self> {
Some(SignedStatement {
statement: Decode::decode(value)?,
signature: Decode::decode(value)?,
sender: Decode::decode(value)?,
})
}
}
/// Misbehavior: voting more than one way on candidate validity.
///
/// Since there are three possible ways to vote, a double vote is possible in
@@ -18,10 +18,6 @@ extern crate substrate_codec as codec;
extern crate substrate_primitives;
extern crate polkadot_primitives as primitives;
extern crate serde;
#[macro_use]
extern crate serde_derive;
pub mod generic;
pub use generic::Table;
-7
View File
@@ -9,23 +9,16 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
log = "0.3"
rand = "0.3"
parking_lot = "0.4"
error-chain = "0.12"
bitflags = "1.0"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
futures = "0.1.17"
linked-hash-map = "0.5"
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ed25519 = { path = "../../substrate/ed25519" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-client = { path = "../../substrate/client" }
substrate-serializer = { path = "../../substrate/serializer" }
substrate-runtime-support = { path = "../../substrate/runtime-support" }
substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" }
substrate-bft = { path = "../../substrate/bft" }
substrate-codec = { path = "../../substrate/codec" }
substrate-network-libp2p = { path = "../../substrate/network-libp2p" }
+3 -3
View File
@@ -14,19 +14,19 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
pub use service::Role;
pub use service::Roles;
/// Protocol configuration
#[derive(Clone)]
pub struct ProtocolConfig {
/// Assigned roles.
pub roles: Role,
pub roles: Roles,
}
impl Default for ProtocolConfig {
fn default() -> ProtocolConfig {
ProtocolConfig {
roles: Role::FULL,
roles: Roles::FULL,
}
}
}
@@ -25,6 +25,7 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use runtime_primitives::generic::BlockId;
use message::{self, generic::Message as GenericMessage};
use protocol::Context;
use service::Roles;
// TODO: Add additional spam/DoS attack protection.
const MESSAGE_LIFETIME: Duration = Duration::from_secs(600);
@@ -73,8 +74,8 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> {
}
/// Handle new connected peer.
pub fn new_peer(&mut self, protocol: &mut Context<B>, peer_id: PeerId, roles: &[message::Role]) {
if roles.iter().any(|r| *r == message::Role::Validator) {
pub fn new_peer(&mut self, protocol: &mut Context<B>, peer_id: PeerId, roles: Roles) {
if roles.contains(Roles::AUTHORITY) {
trace!(target:"gossip", "Registering authority {}", peer_id);
// Send out all known messages.
// TODO: limit by size
+2 -8
View File
@@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
#![warn(unused_extern_crates)]
#![warn(missing_docs)]
//! Substrate-specific P2P networking: synchronizing blocks, propagating BFT messages.
@@ -21,21 +22,14 @@
extern crate ethcore_io as core_io;
extern crate linked_hash_map;
extern crate rand;
extern crate parking_lot;
extern crate substrate_primitives as primitives;
extern crate substrate_serializer as ser;
extern crate substrate_client as client;
extern crate substrate_runtime_support as runtime_support;
extern crate substrate_runtime_primitives as runtime_primitives;
extern crate substrate_network_libp2p as network_libp2p;
extern crate substrate_bft;
extern crate substrate_codec as codec;
extern crate serde;
extern crate serde_json;
extern crate futures;
extern crate ed25519;
#[macro_use] extern crate serde_derive;
#[macro_use] extern crate log;
#[macro_use] extern crate bitflags;
#[macro_use] extern crate error_chain;
@@ -67,5 +61,5 @@ pub use sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, PeerId, ProtocolId, ConnectionFilter, ConnectionDirection};
pub use message::{generic as generic_message, RequestId, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal, Status as StatusMessage};
pub use error::Error;
pub use config::{Role, ProtocolConfig};
pub use config::{Roles, ProtocolConfig};
pub use on_demand::{OnDemand, OnDemandService, RemoteCallResponse};
+412 -162
View File
@@ -17,9 +17,8 @@
//! Network packet message types. These get serialized and put into the lower level protocol payload.
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT};
use service::Role as RoleFlags;
pub use self::generic::{BlockAnnounce, RemoteCallRequest, ConsensusVote, SignedConsensusVote, FromBlock, Body};
use codec::{Encode, Decode, Input, Output};
pub use self::generic::{BlockAnnounce, RemoteCallRequest, ConsensusVote, SignedConsensusVote, FromBlock};
/// A unique ID of a request.
pub type RequestId = u64;
@@ -86,76 +85,34 @@ pub type SignedConsensusMessage<B> = generic::SignedConsensusProposal<
/// A set of transactions.
pub type Transactions<E> = Vec<E>;
/// Configured node role.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Role {
/// Full node with no additional responsibilities.
Full,
/// Light client.
Light,
/// Parachain validator.
Authority,
/// Same as `Authority`
Validator,
}
impl Role {
/// Convert enum to service flags.
pub fn as_flags(roles: &[Role]) -> RoleFlags {
let mut flags = RoleFlags::NONE;
for r in roles {
match *r {
Role::Full => flags = flags | RoleFlags::FULL,
Role::Light => flags = flags | RoleFlags::LIGHT,
Role::Authority | Role::Validator => flags = flags | RoleFlags::AUTHORITY,
}
}
flags
}
}
impl From<RoleFlags> for Vec<Role> where {
fn from(flags: RoleFlags) -> Vec<Role> {
let mut roles = Vec::new();
if !(flags & RoleFlags::FULL).is_empty() {
roles.push(Role::Full);
}
if !(flags & RoleFlags::LIGHT).is_empty() {
roles.push(Role::Light);
}
if !(flags & RoleFlags::AUTHORITY).is_empty() {
roles.push(Role::Validator);
}
roles
}
}
/// Bits of block data and associated artefacts to request.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Copy, Clone)]
pub enum BlockAttribute {
/// Include block header.
Header,
/// Include block body.
Body,
/// Include block receipt.
Receipt,
/// Include block message queue.
MessageQueue,
/// Include a justification for the block.
Justification,
bitflags! {
/// Node roles bitmask.
pub struct BlockAttributes: u8 {
/// Include block header.
const HEADER = 0b00000001;
/// Include block body.
const BODY = 0b00000010;
/// Include block receipt.
const RECEIPT = 0b00000100;
/// Include block message queue.
const MESSAGE_QUEUE = 0b00001000;
/// Include a justification for the block.
const JUSTIFICATION = 0b00010000;
}
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
/// Block enumeration direction.
pub enum Direction {
/// Enumerate in ascending order (from child to parent).
Ascending,
Ascending = 0,
/// Enumerate in descendfing order (from parent to canonical child).
Descending,
Descending = 1,
}
/// Remote call response.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RemoteCallResponse {
/// Id of a request this response was made for.
pub id: RequestId,
@@ -163,106 +120,75 @@ pub struct RemoteCallResponse {
pub proof: Vec<Vec<u8>>,
}
impl Encode for RemoteCallResponse {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.id);
dest.push(&self.proof);
}
}
impl Decode for RemoteCallResponse {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(RemoteCallResponse {
id: Decode::decode(input)?,
proof: Decode::decode(input)?,
})
}
}
/// Generic types.
pub mod generic {
use primitives::AuthorityId;
use codec::{Codec, Decode, Encode};
use codec::{Decode, Encode, Input, Output};
use runtime_primitives::bft::Justification;
use ed25519;
use primitives::Signature;
use service::Roles;
use super::{BlockAttributes, RemoteCallResponse, RequestId, Transactions, Direction};
use super::{Role, BlockAttribute, RemoteCallResponse, RequestId, Transactions, Direction};
use primitives::bytes;
/// Emulates Poc-1 extrinsic primitive.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct V1Extrinsic(#[serde(with="bytes")] pub Vec<u8>);
// Alternative block format for poc-1 compatibility.
// TODO: remove this after poc-2
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[serde(untagged)]
/// Serialized block body type.
pub enum Body<Extrinsic> {
/// Poc-1. Extrinsics as bytes.
V1(Vec<V1Extrinsic>),
/// Poc-2 or later. A structured type.
Extrinsics(Vec<Extrinsic>),
}
impl<Extrinsic> Body<Extrinsic> where Extrinsic: Codec {
/// Extracts extrinsic from the body.
pub fn to_extrinsics(self) -> Vec<Extrinsic> {
match self {
Body::Extrinsics(e) => e,
Body::V1(e) => {
e.into_iter().filter_map(|bytes| {
let bytes = bytes.0.encode();
Decode::decode(&mut bytes.as_slice())
}).collect()
}
}
}
}
/// Emulates Poc-1 justification format.
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct V1Justification<H> {
/// The round consensus was reached in.
pub round_number: u32,
/// The hash of the header justified.
pub hash: H,
/// The signatures and signers of the hash.
pub signatures: Vec<([u8; 32], Signature)>
}
// TODO: remove this after poc-2
/// Justification back compat
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BlockJustification<H> {
/// Poc-1 format.
V1(V1Justification<H>),
/// Poc-2 format.
V2(Justification<H>),
}
impl<H> BlockJustification<H> {
/// Convert to PoC-2 justification format.
pub fn to_justification(self) -> Justification<H> {
match self {
BlockJustification::V2(j) => j,
BlockJustification::V1(j) => {
Justification {
round_number: j.round_number,
hash: j.hash,
signatures: j.signatures.into_iter().map(|(a, s)| (a.into(), s)).collect(),
}
}
}
}
}
/// Block data sent in the response.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BlockData<Header, Hash, Extrinsic> {
/// Block header hash.
pub hash: Hash,
/// Block header if requested.
pub header: Option<Header>,
/// Block body if requested.
pub body: Option<Body<Extrinsic>>,
pub body: Option<Vec<Extrinsic>>,
/// Block receipt if requested.
pub receipt: Option<Vec<u8>>,
/// Block message queue if requested.
pub message_queue: Option<Vec<u8>>,
/// Justification if requested.
pub justification: Option<BlockJustification<Hash>>,
pub justification: Option<Justification<Hash>>,
}
impl<Header: Encode, Hash: Encode, Extrinsic: Encode> Encode for BlockData<Header, Hash, Extrinsic> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.hash);
dest.push(&self.header);
dest.push(&self.body);
dest.push(&self.receipt);
dest.push(&self.message_queue);
dest.push(&self.justification);
}
}
impl<Header: Decode, Hash: Decode, Extrinsic: Decode> Decode for BlockData<Header, Hash, Extrinsic> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(BlockData {
hash: Decode::decode(input)?,
header: Decode::decode(input)?,
body: Decode::decode(input)?,
receipt: Decode::decode(input)?,
message_queue: Decode::decode(input)?,
justification: Decode::decode(input)?,
})
}
}
/// Identifies starting point of a block sequence.
#[serde(untagged)]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum FromBlock<Hash, Number> {
/// Start with given hash.
Hash(Hash),
@@ -270,8 +196,33 @@ pub mod generic {
Number(Number),
}
impl<Hash: Encode, Number: Encode> Encode for FromBlock<Hash, Number> {
fn encode_to<T: Output>(&self, dest: &mut T) {
match *self {
FromBlock::Hash(ref h) => {
dest.push_byte(0);
dest.push(h);
}
FromBlock::Number(ref n) => {
dest.push_byte(1);
dest.push(n);
}
}
}
}
impl<Hash: Decode, Number: Decode> Decode for FromBlock<Hash, Number> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
0 => Some(FromBlock::Hash(Decode::decode(input)?)),
1 => Some(FromBlock::Number(Decode::decode(input)?)),
_ => None,
}
}
}
/// Communication that can occur between participants in consensus.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BftMessage<Block, Hash> {
/// A consensus message (proposal or vote)
Consensus(SignedConsensusMessage<Block, Hash>),
@@ -279,8 +230,33 @@ pub mod generic {
Auxiliary(Justification<Hash>),
}
impl<Block: Encode, Hash: Encode> Encode for BftMessage<Block, Hash> {
fn encode_to<T: Output>(&self, dest: &mut T) {
match *self {
BftMessage::Consensus(ref h) => {
dest.push_byte(0);
dest.push(h);
}
BftMessage::Auxiliary(ref n) => {
dest.push_byte(1);
dest.push(n);
}
}
}
}
impl<Block: Decode, Hash: Decode> Decode for BftMessage<Block, Hash> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
0 => Some(BftMessage::Consensus(Decode::decode(input)?)),
1 => Some(BftMessage::Auxiliary(Decode::decode(input)?)),
_ => None,
}
}
}
/// BFT Consensus message with parent header hash attached to it.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct LocalizedBftMessage<Block, Hash> {
/// Consensus message.
pub message: BftMessage<Block, Hash>,
@@ -288,8 +264,24 @@ pub mod generic {
pub parent_hash: Hash,
}
impl<Block: Encode, Hash: Encode> Encode for LocalizedBftMessage<Block, Hash> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.message);
dest.push(&self.parent_hash);
}
}
impl<Block: Decode, Hash: Decode> Decode for LocalizedBftMessage<Block, Hash> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(LocalizedBftMessage {
message: Decode::decode(input)?,
parent_hash: Decode::decode(input)?,
})
}
}
/// A localized proposal message. Contains two signed pieces of data.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SignedConsensusProposal<Block, Hash> {
/// The round number.
pub round_number: u32,
@@ -305,8 +297,32 @@ pub mod generic {
pub full_signature: ed25519::Signature,
}
impl<Block: Encode, Hash: Encode> Encode for SignedConsensusProposal<Block, Hash> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.round_number);
dest.push(&self.proposal);
dest.push(&self.digest);
dest.push(&self.sender);
dest.push(&self.digest_signature);
dest.push(&self.full_signature);
}
}
impl<Block: Decode, Hash: Decode> Decode for SignedConsensusProposal<Block, Hash> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(SignedConsensusProposal {
round_number: Decode::decode(input)?,
proposal: Decode::decode(input)?,
digest: Decode::decode(input)?,
sender: Decode::decode(input)?,
digest_signature: Decode::decode(input)?,
full_signature: Decode::decode(input)?,
})
}
}
/// A localized vote message, including the sender.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SignedConsensusVote<H> {
/// The message sent.
pub vote: ConsensusVote<H>,
@@ -316,8 +332,26 @@ pub mod generic {
pub signature: ed25519::Signature,
}
impl<Hash: Encode> Encode for SignedConsensusVote<Hash> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.vote);
dest.push(&self.sender);
dest.push(&self.signature);
}
}
impl<Hash: Decode> Decode for SignedConsensusVote<Hash> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(SignedConsensusVote {
vote: Decode::decode(input)?,
sender: Decode::decode(input)?,
signature: Decode::decode(input)?,
})
}
}
/// Votes during a consensus round.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ConsensusVote<H> {
/// Prepare to vote for proposal with digest D.
Prepare(u32, H),
@@ -327,8 +361,40 @@ pub mod generic {
AdvanceRound(u32),
}
impl<Hash: Encode> Encode for ConsensusVote<Hash> {
fn encode_to<T: Output>(&self, dest: &mut T) {
match *self {
ConsensusVote::Prepare(ref r, ref h) => {
dest.push_byte(0);
dest.push(r);
dest.push(h);
}
ConsensusVote::Commit(ref r, ref h) => {
dest.push_byte(1);
dest.push(r);
dest.push(h);
}
ConsensusVote::AdvanceRound(ref r) => {
dest.push_byte(2);
dest.push(r);
}
}
}
}
impl<Hash: Decode> Decode for ConsensusVote<Hash> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
0 => Some(ConsensusVote::Prepare(Decode::decode(input)?, Decode::decode(input)?)),
1 => Some(ConsensusVote::Commit(Decode::decode(input)?, Decode::decode(input)?)),
2 => Some(ConsensusVote::AdvanceRound(Decode::decode(input)?)),
_ => None,
}
}
}
/// A localized message.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SignedConsensusMessage<Block, Hash> {
/// A proposal.
Propose(SignedConsensusProposal<Block, Hash>),
@@ -336,8 +402,33 @@ pub mod generic {
Vote(SignedConsensusVote<Hash>),
}
impl<Block: Encode, Hash: Encode> Encode for SignedConsensusMessage<Block, Hash> {
fn encode_to<T: Output>(&self, dest: &mut T) {
match *self {
SignedConsensusMessage::Propose(ref m) => {
dest.push_byte(0);
dest.push(m);
}
SignedConsensusMessage::Vote(ref m) => {
dest.push_byte(1);
dest.push(m);
}
}
}
}
impl<Block: Decode, Hash: Decode> Decode for SignedConsensusMessage<Block, Hash> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
0 => Some(SignedConsensusMessage::Propose(Decode::decode(input)?)),
1 => Some(SignedConsensusMessage::Vote(Decode::decode(input)?)),
_ => None,
}
}
}
/// A network message.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Message<Block, Header, Hash, Number, Extrinsic> {
/// Status packet.
Status(Status<Hash, Number>),
@@ -359,13 +450,77 @@ pub mod generic {
ChainSpecific(Vec<u8>),
}
impl<Block: Encode, Header: Encode, Hash: Encode, Number: Encode, Extrinsic: Encode> Encode
for Message<Block, Header, Hash, Number, Extrinsic>
{
fn encode_to<T: Output>(&self, dest: &mut T) {
match *self {
Message::Status(ref m) => {
dest.push_byte(0);
dest.push(m);
}
Message::BlockRequest(ref m) => {
dest.push_byte(1);
dest.push(m);
}
Message::BlockResponse(ref m) => {
dest.push_byte(2);
dest.push(m);
}
Message::BlockAnnounce(ref m) => {
dest.push_byte(3);
dest.push(m);
}
Message::Transactions(ref m) => {
dest.push_byte(4);
dest.push(m);
}
Message::BftMessage(ref m) => {
dest.push_byte(5);
dest.push(m);
}
Message::RemoteCallRequest(ref m) => {
dest.push_byte(6);
dest.push(m);
}
Message::RemoteCallResponse(ref m) => {
dest.push_byte(7);
dest.push(m);
}
Message::ChainSpecific(ref m) => {
dest.push_byte(255);
dest.push(m);
}
}
}
}
impl<Block: Decode, Header: Decode, Hash: Decode, Number: Decode, Extrinsic: Decode> Decode
for Message<Block, Header, Hash, Number, Extrinsic>
{
fn decode<I: Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
0 => Some(Message::Status(Decode::decode(input)?)),
1 => Some(Message::BlockRequest(Decode::decode(input)?)),
2 => Some(Message::BlockResponse(Decode::decode(input)?)),
3 => Some(Message::BlockAnnounce(Decode::decode(input)?)),
4 => Some(Message::Transactions(Decode::decode(input)?)),
5 => Some(Message::BftMessage(Decode::decode(input)?)),
6 => Some(Message::RemoteCallResponse(Decode::decode(input)?)),
7 => Some(Message::RemoteCallResponse(Decode::decode(input)?)),
255 => Some(Message::ChainSpecific(Decode::decode(input)?)),
_ => None,
}
}
}
/// Status sent on connection.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Status<Hash, Number> {
/// Protocol version.
pub version: u32,
/// Supported roles.
pub roles: Vec<Role>,
pub roles: Roles,
/// Best block number.
pub best_number: Number,
/// Best block hash.
@@ -373,23 +528,40 @@ pub mod generic {
/// Genesis block hash.
pub genesis_hash: Hash,
/// Chain-specific status.
#[serde(skip)]
pub chain_status: Vec<u8>,
/// Signatue of `best_hash` made with validator address. Required for the validator role.
pub validator_signature: Option<ed25519::Signature>,
/// Validator address. Required for the validator role.
pub validator_id: Option<AuthorityId>,
/// Parachain id. Required for the collator role.
pub parachain_id: Option<u64>,
}
impl<Hash: Encode, Number: Encode> Encode for Status<Hash, Number> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.version);
dest.push_byte(self.roles.bits());
dest.push(&self.best_number);
dest.push(&self.best_hash);
dest.push(&self.genesis_hash);
dest.push(&self.chain_status);
}
}
impl<Hash: Decode, Number: Decode> Decode for Status<Hash, Number> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(Status {
version: Decode::decode(input)?,
roles: Roles::from_bits(input.read_byte()?)?,
best_number: Decode::decode(input)?,
best_hash: Decode::decode(input)?,
genesis_hash: Decode::decode(input)?,
chain_status: Decode::decode(input)?,
})
}
}
/// Request block data from a peer.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BlockRequest<Hash, Number> {
/// Unique request id.
pub id: RequestId,
/// Bits of block data to request.
pub fields: Vec<BlockAttribute>,
pub fields: BlockAttributes,
/// Start from this block.
pub from: FromBlock<Hash, Number>,
/// End at this block. An implementation defined maximum is used when unspecified.
@@ -400,8 +572,36 @@ pub mod generic {
pub max: Option<u32>,
}
impl<Hash: Encode, Number: Encode> Encode for BlockRequest<Hash, Number> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.id);
dest.push_byte(self.fields.bits());
dest.push(&self.from);
dest.push(&self.to);
dest.push_byte(self.direction as u8);
dest.push(&self.max);
}
}
impl<Hash: Decode, Number: Decode> Decode for BlockRequest<Hash, Number> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(BlockRequest {
id: Decode::decode(input)?,
fields: BlockAttributes::from_bits(input.read_byte()?)?,
from: Decode::decode(input)?,
to: Decode::decode(input)?,
direction: match input.read_byte()? {
x if x == Direction::Ascending as u8 => Some(Direction::Ascending),
x if x == Direction::Descending as u8 => Some(Direction::Descending),
_ => None,
}?,
max: Decode::decode(input)?,
})
}
}
/// Response to `BlockRequest`
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BlockResponse<Header, Hash, Extrinsic> {
/// Id of a request this response was made for.
pub id: RequestId,
@@ -409,14 +609,44 @@ pub mod generic {
pub blocks: Vec<BlockData<Header, Hash, Extrinsic>>,
}
impl<Header: Encode, Hash: Encode, Extrinsic: Encode> Encode for BlockResponse<Header, Hash, Extrinsic> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.id);
dest.push(&self.blocks)
}
}
impl<Header: Decode, Hash: Decode, Extrinsic: Decode> Decode for BlockResponse<Header, Hash, Extrinsic> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(BlockResponse {
id: Decode::decode(input)?,
blocks: Decode::decode(input)?,
})
}
}
/// Announce a new complete relay chain block on the network.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BlockAnnounce<H> {
/// New block header.
pub header: H,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
impl<Header: Encode> Encode for BlockAnnounce<Header> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.header);
}
}
impl<Header: Decode> Decode for BlockAnnounce<Header> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(BlockAnnounce {
header: Decode::decode(input)?,
})
}
}
#[derive(Debug, PartialEq, Eq, Clone)]
/// Remote call request.
pub struct RemoteCallRequest<H> {
/// Unique request id.
@@ -428,4 +658,24 @@ pub mod generic {
/// Call data.
pub data: Vec<u8>,
}
impl<Hash: Encode> Encode for RemoteCallRequest<Hash> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.id);
dest.push(&self.block);
dest.push(self.method.as_bytes());
dest.push(&self.data);
}
}
impl<Hash: Decode> Decode for RemoteCallRequest<Hash> {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(RemoteCallRequest {
id: Decode::decode(input)?,
block: Decode::decode(input)?,
method: String::from_utf8_lossy(&Vec::decode(input)?).into(),
data: Decode::decode(input)?,
})
}
}
}
+14 -14
View File
@@ -38,7 +38,7 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
/// On-demand service API.
pub trait OnDemandService<Block: BlockT>: Send + Sync {
/// When new node is connected.
fn on_connect(&self, peer: PeerId, role: service::Role);
fn on_connect(&self, peer: PeerId, role: service::Roles);
/// When node is disconnected.
fn on_disconnect(&self, peer: PeerId);
@@ -168,8 +168,8 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where
E: service::ExecuteInContext<B>,
B::Header: HeaderT,
{
fn on_connect(&self, peer: PeerId, role: service::Role) {
if !role.intersects(service::Role::FULL | service::Role::AUTHORITY) { // TODO: correct?
fn on_connect(&self, peer: PeerId, role: service::Roles) {
if !role.intersects(service::Roles::FULL | service::Roles::AUTHORITY) { // TODO: correct?
return;
}
@@ -326,7 +326,7 @@ mod tests {
use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest};
use message;
use network_libp2p::PeerId;
use service::{Role, ExecuteInContext};
use service::{Roles, ExecuteInContext};
use test::TestIo;
use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService};
use test_client::runtime::{Block, Hash};
@@ -372,16 +372,16 @@ mod tests {
#[test]
fn knows_about_peers_roles() {
let (_, on_demand) = dummy(true);
on_demand.on_connect(0, Role::LIGHT);
on_demand.on_connect(1, Role::FULL);
on_demand.on_connect(2, Role::AUTHORITY);
on_demand.on_connect(0, Roles::LIGHT);
on_demand.on_connect(1, Roles::FULL);
on_demand.on_connect(2, Roles::AUTHORITY);
assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
}
#[test]
fn disconnects_from_idle_peer() {
let (_, on_demand) = dummy(true);
on_demand.on_connect(0, Role::FULL);
on_demand.on_connect(0, Roles::FULL);
assert_eq!(1, total_peers(&*on_demand));
on_demand.on_disconnect(0);
assert_eq!(0, total_peers(&*on_demand));
@@ -393,8 +393,8 @@ mod tests {
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Role::FULL);
on_demand.on_connect(1, Role::FULL);
on_demand.on_connect(0, Roles::FULL);
on_demand.on_connect(1, Roles::FULL);
assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>());
assert!(on_demand.core.lock().active_peers.is_empty());
@@ -414,7 +414,7 @@ mod tests {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Role::FULL);
on_demand.on_connect(0, Roles::FULL);
on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
receive_call_response(&*on_demand, &mut network, 0, 1);
@@ -429,7 +429,7 @@ mod tests {
let mut network = TestIo::new(&queue, None);
on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
on_demand.on_connect(0, Role::FULL);
on_demand.on_connect(0, Roles::FULL);
receive_call_response(&*on_demand, &mut network, 0, 0);
assert!(network.to_disconnect.contains(&0));
assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
@@ -440,7 +440,7 @@ mod tests {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Role::FULL);
on_demand.on_connect(0, Roles::FULL);
receive_call_response(&*on_demand, &mut network, 0, 0);
assert!(network.to_disconnect.contains(&0));
@@ -451,7 +451,7 @@ mod tests {
let (_x, on_demand) = dummy(true);
let queue = RwLock::new(VecDeque::new());
let mut network = TestIo::new(&queue, None);
on_demand.on_connect(0, Role::FULL);
on_demand.on_connect(0, Roles::FULL);
let response = on_demand.remote_call(RemoteCallRequest { block: Default::default(), method: "test".into(), call_data: vec![] });
let thread = ::std::thread::spawn(move || {
+21 -32
View File
@@ -19,16 +19,16 @@ use std::{mem, cmp};
use std::sync::Arc;
use std::time;
use parking_lot::RwLock;
use serde_json;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, As};
use runtime_primitives::generic::BlockId;
use network_libp2p::PeerId;
use codec::{Encode, Decode};
use message::{self, Message};
use message::generic::Message as GenericMessage;
use specialization::Specialization;
use sync::{ChainSync, Status as SyncStatus, SyncState};
use service::{Role, TransactionPool};
use service::{Roles, TransactionPool};
use config::ProtocolConfig;
use chain::Client;
use on_demand::OnDemandService;
@@ -38,7 +38,7 @@ use error;
const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Current protocol version.
pub (crate) const CURRENT_VERSION: u32 = 0;
pub (crate) const CURRENT_VERSION: u32 = 1;
/// Current packet count.
pub (crate) const CURRENT_PACKET_COUNT: u8 = 1;
@@ -74,7 +74,7 @@ struct Peer<B: BlockT> {
/// Protocol version
protocol_version: u32,
/// Roles
roles: Role,
roles: Roles,
/// Peer best block hash
best_hash: B::Hash,
/// Peer best block number
@@ -95,7 +95,7 @@ struct Peer<B: BlockT> {
#[derive(Debug)]
pub struct PeerInfo<B: BlockT> {
/// Roles
pub roles: Role,
pub roles: Roles,
/// Protocol version
pub protocol_version: u32,
/// Peer best block hash
@@ -233,12 +233,12 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
}
}
pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, data: &[u8]) {
let message: Message<B> = match serde_json::from_slice(data) {
Ok(m) => m,
Err(e) => {
trace!(target: "sync", "Invalid packet: {}", String::from_utf8_lossy(data));
io.disable_peer(peer_id, &format!("Peer sent us a packet with invalid format ({})", e));
pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, mut data: &[u8]) {
let message: Message<B> = match Decode::decode(&mut data) {
Some(m) => m,
None => {
trace!(target: "sync", "Invalid packet from {}", peer_id);
io.disable_peer(peer_id, "Peer sent us a packet with invalid format");
return;
}
};
@@ -319,16 +319,9 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
};
let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize;
// TODO: receipts, etc.
let (mut get_header, mut get_body, mut get_justification) = (false, false, false);
for a in request.fields {
match a {
message::BlockAttribute::Header => get_header = true,
message::BlockAttribute::Body => get_body = true,
message::BlockAttribute::Receipt => unimplemented!(),
message::BlockAttribute::MessageQueue => unimplemented!(),
message::BlockAttribute::Justification => get_justification = true,
}
}
let get_header = request.fields.contains(message::BlockAttributes::HEADER);
let get_body = request.fields.contains(message::BlockAttributes::BODY);
let get_justification = request.fields.contains(message::BlockAttributes::JUSTIFICATION);
while let Some(header) = self.context_data.chain.header(&id).unwrap_or(None) {
if blocks.len() >= max{
break;
@@ -339,10 +332,10 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
let block_data = message::generic::BlockData {
hash: hash,
header: if get_header { Some(header) } else { None },
body: (if get_body { self.context_data.chain.body(&BlockId::Hash(hash)).unwrap_or(None) } else { None }).map(|body| message::Body::Extrinsics(body)),
body: if get_body { self.context_data.chain.body(&BlockId::Hash(hash)).unwrap_or(None) } else { None },
receipt: None,
message_queue: None,
justification: justification.map(|j| message::generic::BlockJustification::V2(j)),
justification,
};
blocks.push(block_data);
match request.direction {
@@ -435,7 +428,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
let peer = Peer {
protocol_version: status.version,
roles: message::Role::as_flags(&status.roles),
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number,
block_request: None,
@@ -452,7 +445,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
let mut context = ProtocolContext::new(&self.context_data, io);
self.sync.write().new_peer(&mut context, peer_id);
self.specialization.write().on_connect(&mut context, peer_id, status.clone());
self.on_demand.as_ref().map(|s| s.on_connect(peer_id, message::Role::as_flags(&status.roles)));
self.on_demand.as_ref().map(|s| s.on_connect(peer_id, status.roles));
}
/// Called when peer sends us new extrinsics
@@ -521,10 +514,6 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
best_number: info.chain.best_number,
best_hash: info.chain.best_hash,
chain_status: self.specialization.read().status(),
parachain_id: None,
validator_id: None,
validator_signature: None,
};
self.send_message(io, peer_id, GenericMessage::Status(status))
}
@@ -562,7 +551,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
);
// blocks are not announced by light clients
if self.config.roles & Role::LIGHT == Role::LIGHT {
if self.config.roles & Roles::LIGHT == Roles::LIGHT {
return;
}
@@ -621,7 +610,7 @@ fn send_message<B: BlockT>(peers: &RwLock<HashMap<PeerId, Peer<B>>>, io: &mut Sy
},
_ => (),
}
let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed");
let data = message.encode();
if let Err(e) = io.send(peer_id, data) {
debug!(target:"sync", "Error sending message: {:?}", e);
io.disconnect_peer(peer_id);
@@ -630,6 +619,6 @@ fn send_message<B: BlockT>(peers: &RwLock<HashMap<PeerId, Peer<B>>>, io: &mut Sy
/// Hash a message.
pub(crate) fn hash_message<B: BlockT>(message: &Message<B>) -> B::Hash {
let data = serde_json::to_vec(&message).expect("Serializer is infallible; qed");
let data = message.encode();
HashFor::<B>::hash(&data)
}
+1 -1
View File
@@ -46,7 +46,7 @@ const PROPAGATE_TIMEOUT: Duration = Duration::from_millis(5000);
bitflags! {
/// Node roles bitmask.
pub struct Role: u32 {
pub struct Roles: u8 {
/// No network.
const NONE = 0b00000000;
/// Full node, does not participate in consensus.
+10 -13
View File
@@ -22,7 +22,7 @@ use blocks::{self, BlockCollection};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, As, NumberFor};
use runtime_primitives::generic::BlockId;
use message::{self, generic::Message as GenericMessage};
use service::Role;
use service::Roles;
// Maximum blocks to request in a single packet.
const MAX_BLOCKS_TO_REQUEST: usize = 128;
@@ -50,7 +50,7 @@ pub struct ChainSync<B: BlockT> {
blocks: BlockCollection<B>,
best_queued_number: NumberFor<B>,
best_queued_hash: B::Hash,
required_block_attributes: Vec<message::BlockAttribute>,
required_block_attributes: message::BlockAttributes,
}
/// Reported sync state.
@@ -73,13 +73,10 @@ pub struct Status<B: BlockT> {
impl<B: BlockT> ChainSync<B> {
/// Create a new instance.
pub(crate) fn new(role: Role, info: &ClientInfo<B>) -> Self {
let mut required_block_attributes = vec![
message::BlockAttribute::Header,
message::BlockAttribute::Justification
];
if role.intersects(Role::FULL) {
required_block_attributes.push(message::BlockAttribute::Body);
pub(crate) fn new(role: Roles, info: &ClientInfo<B>) -> Self {
let mut required_block_attributes = message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION;
if role.intersects(Roles::FULL) {
required_block_attributes |= message::BlockAttributes::BODY;
}
ChainSync {
@@ -88,7 +85,7 @@ impl<B: BlockT> ChainSync<B> {
blocks: BlockCollection::new(),
best_queued_hash: info.best_queued_hash.unwrap_or(info.chain.best_hash),
best_queued_number: info.best_queued_number.unwrap_or(info.chain.best_number),
required_block_attributes: required_block_attributes,
required_block_attributes,
}
}
@@ -253,8 +250,8 @@ impl<B: BlockT> ChainSync<B> {
let result = protocol.client().import(
is_best,
header,
justification.to_justification(),
block.body.map(|b| b.to_extrinsics()),
justification,
block.body,
);
match result {
Ok(ImportResult::AlreadyInChain) => {
@@ -447,7 +444,7 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Requesting ancestry block #{} from {}", block, peer_id);
let request = message::generic::BlockRequest {
id: 0,
fields: vec![message::BlockAttribute::Header, message::BlockAttribute::Justification],
fields: message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION,
from: message::FromBlock::Number(block),
to: None,
direction: message::Direction::Ascending,
+2 -6
View File
@@ -17,11 +17,10 @@
use client::backend::Backend;
use client::blockchain::HeaderBackend as BlockchainHeaderBackend;
use sync::SyncState;
use {Role};
use Roles;
use super::*;
#[test]
#[ignore]
fn sync_from_two_peers_works() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
@@ -34,7 +33,6 @@ fn sync_from_two_peers_works() {
}
#[test]
#[ignore]
fn sync_from_two_peers_with_ancestry_search_works() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
@@ -47,7 +45,6 @@ fn sync_from_two_peers_with_ancestry_search_works() {
}
#[test]
#[ignore]
fn sync_long_chain_works() {
let mut net = TestNet::new(2);
net.peer(1).push_blocks(500, false);
@@ -68,7 +65,6 @@ fn sync_no_common_longer_chain_fails() {
}
#[test]
#[ignore]
fn sync_after_fork_works() {
::env_logger::init().ok();
let mut net = TestNet::new(3);
@@ -99,7 +95,7 @@ fn blocks_are_not_announced_by_light_nodes() {
// full peer0 is connected to light peer
// light peer1 is connected to full peer2
let mut light_config = ProtocolConfig::default();
light_config.roles = Role::LIGHT;
light_config.roles = Roles::LIGHT;
net.add_peer(&ProtocolConfig::default());
net.add_peer(&light_config);
net.add_peer(&ProtocolConfig::default());
+3 -3
View File
@@ -19,7 +19,7 @@
use extrinsic_pool;
use chain_spec::ChainSpec;
pub use client::ExecutionStrategy;
pub use network::Role;
pub use network::Roles;
pub use network::NetworkConfiguration;
pub use client_db::PruningMode;
use runtime_primitives::BuildStorage;
@@ -28,7 +28,7 @@ use serde::{Serialize, de::DeserializeOwned};
/// Service configuration.
pub struct Configuration<G: Serialize + DeserializeOwned + BuildStorage> {
/// Node roles.
pub roles: Role,
pub roles: Roles,
/// Extrinsic pool configuration.
pub extrinsic_pool: extrinsic_pool::txpool::Options,
/// Network configuration.
@@ -57,7 +57,7 @@ impl<G: Serialize + DeserializeOwned + BuildStorage> Configuration<G> {
let mut configuration = Configuration {
chain_spec,
name: Default::default(),
roles: Role::FULL,
roles: Roles::FULL,
extrinsic_pool: Default::default(),
network: Default::default(),
keystore_path: Default::default(),
+1 -1
View File
@@ -59,7 +59,7 @@ use exit_future::Signal;
use tokio::runtime::TaskExecutor;
pub use self::error::{ErrorKind, Error};
pub use config::{Configuration, Role, PruningMode};
pub use config::{Configuration, Roles, PruningMode};
pub use chain_spec::ChainSpec;
pub use extrinsic_pool::txpool::{Options as ExtrinsicPoolOptions};
pub use extrinsic_pool::api::{ExtrinsicPool as ExtrinsicPoolApi};