mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 12:51:02 +00:00
Use substrate codec for network messages (#333)
* Use substrate codec for network messages * Version bump * Removed redundant format
This commit is contained in:
committed by
Robert Habermeier
parent
0e40983f3b
commit
20f3e9f636
@@ -222,19 +222,19 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
|
||||
// TODO [rob]: collation node implementation
|
||||
// This isn't a thing. Different parachains will have their own collator executables and
|
||||
// maybe link to libpolkadot to get a light-client.
|
||||
service::Role::LIGHT
|
||||
service::Roles::LIGHT
|
||||
} else if matches.is_present("light") {
|
||||
info!("Starting (light)");
|
||||
config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible;
|
||||
service::Role::LIGHT
|
||||
service::Roles::LIGHT
|
||||
} else if matches.is_present("validator") || matches.is_present("dev") {
|
||||
info!("Starting validator");
|
||||
config.execution_strategy = service::ExecutionStrategy::Both;
|
||||
service::Role::AUTHORITY
|
||||
service::Roles::AUTHORITY
|
||||
} else {
|
||||
info!("Starting (heavy)");
|
||||
config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible;
|
||||
service::Role::FULL
|
||||
service::Roles::FULL
|
||||
};
|
||||
|
||||
if let Some(s) = matches.value_of("execution") {
|
||||
@@ -303,7 +303,7 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
|
||||
None
|
||||
};
|
||||
|
||||
match role == service::Role::LIGHT {
|
||||
match role == service::Roles::LIGHT {
|
||||
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?,
|
||||
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?,
|
||||
}
|
||||
|
||||
@@ -5,9 +5,6 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
||||
description = "Polkadot-specific networking protocol"
|
||||
|
||||
[dependencies]
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0"
|
||||
parking_lot = "0.4"
|
||||
polkadot-api = { path = "../api" }
|
||||
polkadot-consensus = { path = "../consensus" }
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
use polkadot_primitives::{AccountId, Hash};
|
||||
use polkadot_primitives::parachain::{Id as ParaId, Collation};
|
||||
use codec;
|
||||
|
||||
use futures::sync::oneshot;
|
||||
|
||||
@@ -27,12 +28,28 @@ use std::time::{Duration, Instant};
|
||||
const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5);
|
||||
|
||||
/// The role of the collator. Whether they're the primary or backup for this parachain.
|
||||
#[derive(PartialEq, Debug, Serialize, Deserialize)]
|
||||
#[derive(PartialEq, Debug, Clone, Copy)]
|
||||
pub enum Role {
|
||||
/// Primary collators should send collations whenever it's time.
|
||||
Primary,
|
||||
Primary = 0,
|
||||
/// Backup collators should not.
|
||||
Backup,
|
||||
Backup = 1,
|
||||
}
|
||||
|
||||
impl codec::Encode for Role {
|
||||
fn encode_to<T: codec::Output>(&self, dest: &mut T) {
|
||||
dest.push_byte(*self as u8);
|
||||
}
|
||||
}
|
||||
|
||||
impl codec::Decode for Role {
|
||||
fn decode<I: codec::Input>(input: &mut I) -> Option<Self> {
|
||||
match input.read_byte()? {
|
||||
x if x == Role::Primary as u8 => Some(Role::Primary),
|
||||
x if x == Role::Backup as u8 => Some(Role::Backup),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A maintenance action for the collator set.
|
||||
|
||||
@@ -26,6 +26,7 @@ use polkadot_api::{PolkadotApi, LocalPolkadotApi};
|
||||
use polkadot_consensus::{Network, SharedTable, Collators};
|
||||
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
|
||||
use polkadot_primitives::parachain::{Id as ParaId, Collation};
|
||||
use codec::Decode;
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::sync::mpsc;
|
||||
@@ -175,7 +176,7 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> MessageProcessTask<P> {
|
||||
}
|
||||
}
|
||||
ConsensusMessage::ChainSpecific(msg, _) => {
|
||||
if let Ok(Message::Statement(parent_hash, statement)) = ::serde_json::from_slice(&msg) {
|
||||
if let Some(Message::Statement(parent_hash, statement)) = Decode::decode(&mut msg.as_slice()) {
|
||||
if ::polkadot_consensus::check_statement(&statement.statement, &statement.signature, statement.sender, &parent_hash) {
|
||||
self.table_router.import_statement(statement);
|
||||
}
|
||||
|
||||
+60
-18
@@ -20,11 +20,6 @@
|
||||
//! parachain block and extrinsic data fetching, communication between collators and validators,
|
||||
//! and more.
|
||||
|
||||
extern crate serde;
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
extern crate serde_json;
|
||||
|
||||
extern crate substrate_bft as bft;
|
||||
extern crate substrate_codec as codec;
|
||||
extern crate substrate_network;
|
||||
@@ -47,7 +42,7 @@ mod collator_pool;
|
||||
mod router;
|
||||
pub mod consensus;
|
||||
|
||||
use codec::{Decode, Encode};
|
||||
use codec::{Decode, Encode, Input, Output};
|
||||
use futures::sync::oneshot;
|
||||
use parking_lot::Mutex;
|
||||
use polkadot_consensus::{Statement, SignedStatement, GenericStatement};
|
||||
@@ -188,7 +183,6 @@ impl CurrentConsensus {
|
||||
}
|
||||
|
||||
/// Polkadot-specific messages.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum Message {
|
||||
/// signed statement and localized parent hash.
|
||||
Statement(Hash, SignedStatement),
|
||||
@@ -205,8 +199,58 @@ pub enum Message {
|
||||
Collation(Hash, Collation),
|
||||
}
|
||||
|
||||
impl Encode for Message {
|
||||
fn encode_to<T: Output>(&self, dest: &mut T) {
|
||||
match *self {
|
||||
Message::Statement(ref h, ref s) => {
|
||||
dest.push_byte(0);
|
||||
dest.push(h);
|
||||
dest.push(s);
|
||||
}
|
||||
Message::SessionKey(ref h, ref k) => {
|
||||
dest.push_byte(1);
|
||||
dest.push(h);
|
||||
dest.push(k);
|
||||
}
|
||||
Message::RequestBlockData(ref id, ref d) => {
|
||||
dest.push_byte(2);
|
||||
dest.push(id);
|
||||
dest.push(d);
|
||||
}
|
||||
Message::BlockData(ref id, ref d) => {
|
||||
dest.push_byte(3);
|
||||
dest.push(id);
|
||||
dest.push(d);
|
||||
}
|
||||
Message::CollatorRole(ref r) => {
|
||||
dest.push_byte(4);
|
||||
dest.push(r);
|
||||
}
|
||||
Message::Collation(ref h, ref c) => {
|
||||
dest.push_byte(5);
|
||||
dest.push(h);
|
||||
dest.push(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Decode for Message {
|
||||
fn decode<I: Input>(input: &mut I) -> Option<Self> {
|
||||
match input.read_byte()? {
|
||||
0 => Some(Message::Statement(Decode::decode(input)?, Decode::decode(input)?)),
|
||||
1 => Some(Message::SessionKey(Decode::decode(input)?, Decode::decode(input)?)),
|
||||
2 => Some(Message::RequestBlockData(Decode::decode(input)?, Decode::decode(input)?)),
|
||||
3 => Some(Message::BlockData(Decode::decode(input)?, Decode::decode(input)?)),
|
||||
4 => Some(Message::CollatorRole(Decode::decode(input)?)),
|
||||
5 => Some(Message::Collation(Decode::decode(input)?, Decode::decode(input)?)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
|
||||
let encoded = ::serde_json::to_vec(&message).expect("serialization of messages infallible; qed");
|
||||
let encoded = message.encode();
|
||||
ctx.send_message(to, generic_message::Message::ChainSpecific(encoded))
|
||||
}
|
||||
|
||||
@@ -244,9 +288,7 @@ impl PolkadotProtocol {
|
||||
/// Send a statement to a validator.
|
||||
fn send_statement(&mut self, ctx: &mut Context<Block>, _val: SessionKey, parent_hash: Hash, statement: SignedStatement) {
|
||||
// TODO: something more targeted than gossip.
|
||||
let raw = ::serde_json::to_vec(&Message::Statement(parent_hash, statement))
|
||||
.expect("message serialization infallible; qed");
|
||||
|
||||
let raw = Message::Statement(parent_hash, statement).encode();
|
||||
self.consensus_gossip.multicast_chain_specific(ctx, raw, parent_hash);
|
||||
}
|
||||
|
||||
@@ -427,7 +469,7 @@ impl Specialization<Block> for PolkadotProtocol {
|
||||
);
|
||||
}
|
||||
|
||||
let validator = status.roles.iter().any(|r| *r == message::Role::Authority);
|
||||
let validator = status.roles.contains(substrate_network::Roles::AUTHORITY);
|
||||
let send_key = validator || local_status.collating_for.is_some();
|
||||
|
||||
self.peers.insert(peer_id, PeerInfo {
|
||||
@@ -436,7 +478,7 @@ impl Specialization<Block> for PolkadotProtocol {
|
||||
validator,
|
||||
});
|
||||
|
||||
self.consensus_gossip.new_peer(ctx, peer_id, &status.roles);
|
||||
self.consensus_gossip.new_peer(ctx, peer_id, status.roles);
|
||||
if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) {
|
||||
send_polkadot_message(
|
||||
ctx,
|
||||
@@ -497,11 +539,11 @@ impl Specialization<Block> for PolkadotProtocol {
|
||||
self.consensus_gossip.on_bft_message(ctx, peer_id, msg)
|
||||
}
|
||||
generic_message::Message::ChainSpecific(raw) => {
|
||||
match serde_json::from_slice(&raw) {
|
||||
Ok(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg),
|
||||
Err(e) => {
|
||||
trace!(target: "p_net", "Bad message from {}: {}", peer_id, e);
|
||||
ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason");
|
||||
match Message::decode(&mut raw.as_slice()) {
|
||||
Some(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg),
|
||||
None => {
|
||||
trace!(target: "p_net", "Bad message from {}", peer_id);
|
||||
ctx.disable_peer(peer_id, "Invalid polkadot protocol message format");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ use polkadot_primitives::{Block, Hash, SessionKey};
|
||||
use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData};
|
||||
use substrate_primitives::H512;
|
||||
use codec::Encode;
|
||||
use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, message::Message as SubstrateMessage, message::Role, specialization::Specialization, generic_message::Message as GenericMessage};
|
||||
use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage};
|
||||
|
||||
use std::sync::Arc;
|
||||
use futures::Future;
|
||||
@@ -62,7 +62,7 @@ impl TestContext {
|
||||
fn has_message(&self, to: PeerId, message: Message) -> bool {
|
||||
use substrate_network::generic_message::Message as GenericMessage;
|
||||
|
||||
let encoded = ::serde_json::to_vec(&message).unwrap();
|
||||
let encoded = message.encode();
|
||||
self.messages.iter().any(|&(ref peer, ref msg)| match msg {
|
||||
GenericMessage::ChainSpecific(ref data) => peer == &to && data == &encoded,
|
||||
_ => false,
|
||||
@@ -70,7 +70,7 @@ impl TestContext {
|
||||
}
|
||||
}
|
||||
|
||||
fn make_status(status: &Status, roles: Vec<Role>) -> FullStatus {
|
||||
fn make_status(status: &Status, roles: Roles) -> FullStatus {
|
||||
FullStatus {
|
||||
version: 1,
|
||||
roles,
|
||||
@@ -78,9 +78,6 @@ fn make_status(status: &Status, roles: Vec<Role>) -> FullStatus {
|
||||
best_hash: Default::default(),
|
||||
genesis_hash: Default::default(),
|
||||
chain_status: status.encode(),
|
||||
parachain_id: None,
|
||||
validator_id: None,
|
||||
validator_signature: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,7 +94,7 @@ fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus
|
||||
}
|
||||
|
||||
fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: PeerId, message: Message) {
|
||||
let encoded = ::serde_json::to_vec(&message).unwrap();
|
||||
let encoded = message.encode();
|
||||
protocol.on_message(ctx, from, GenericMessage::ChainSpecific(encoded));
|
||||
}
|
||||
|
||||
@@ -115,7 +112,7 @@ fn sends_session_key() {
|
||||
|
||||
{
|
||||
let mut ctx = TestContext::default();
|
||||
protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, vec![Role::Authority]));
|
||||
protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, Roles::AUTHORITY));
|
||||
assert!(ctx.messages.is_empty());
|
||||
}
|
||||
|
||||
@@ -129,7 +126,7 @@ fn sends_session_key() {
|
||||
|
||||
{
|
||||
let mut ctx = TestContext::default();
|
||||
protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, vec![]));
|
||||
protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, Roles::NONE));
|
||||
assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key)));
|
||||
}
|
||||
}
|
||||
@@ -171,7 +168,7 @@ fn fetches_from_those_with_knowledge() {
|
||||
// connect peer A
|
||||
{
|
||||
let mut ctx = TestContext::default();
|
||||
protocol.on_connect(&mut ctx, peer_a, make_status(&status, vec![Role::Authority]));
|
||||
protocol.on_connect(&mut ctx, peer_a, make_status(&status, Roles::AUTHORITY));
|
||||
assert!(ctx.has_message(peer_a, Message::SessionKey(parent_hash, local_key)));
|
||||
}
|
||||
|
||||
@@ -187,7 +184,7 @@ fn fetches_from_those_with_knowledge() {
|
||||
// peer B connects and sends session key. request already assigned to A
|
||||
{
|
||||
let mut ctx = TestContext::default();
|
||||
protocol.on_connect(&mut ctx, peer_b, make_status(&status, vec![Role::Authority]));
|
||||
protocol.on_connect(&mut ctx, peer_b, make_status(&status, Roles::AUTHORITY));
|
||||
on_message(&mut protocol, &mut ctx, peer_b, Message::SessionKey(parent_hash, b_key));
|
||||
assert!(!ctx.has_message(peer_b, Message::RequestBlockData(2, candidate_hash)));
|
||||
|
||||
@@ -220,7 +217,7 @@ fn remove_bad_collator() {
|
||||
|
||||
{
|
||||
let mut ctx = TestContext::default();
|
||||
protocol.on_connect(&mut ctx, peer_id, make_status(&status, vec![]));
|
||||
protocol.on_connect(&mut ctx, peer_id, make_status(&status, Roles::NONE));
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@@ -224,6 +224,22 @@ pub struct Collation {
|
||||
pub receipt: CandidateReceipt,
|
||||
}
|
||||
|
||||
impl Decode for Collation {
|
||||
fn decode<I: Input>(input: &mut I) -> Option<Self> {
|
||||
Some(Collation {
|
||||
block_data: Decode::decode(input)?,
|
||||
receipt: Decode::decode(input)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode for Collation {
|
||||
fn encode_to<T: Output>(&self, dest: &mut T) {
|
||||
dest.push(&self.block_data);
|
||||
dest.push(&self.receipt);
|
||||
}
|
||||
}
|
||||
|
||||
/// Parachain ingress queue message.
|
||||
#[derive(PartialEq, Eq, Clone)]
|
||||
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
|
||||
@@ -253,6 +269,18 @@ impl BlockData {
|
||||
}
|
||||
}
|
||||
|
||||
impl Decode for BlockData {
|
||||
fn decode<I: Input>(input: &mut I) -> Option<Self> {
|
||||
Some(BlockData(Decode::decode(input)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl Encode for BlockData {
|
||||
fn encode_to<T: Output>(&self, dest: &mut T) {
|
||||
dest.push(&self.0);
|
||||
}
|
||||
}
|
||||
|
||||
/// Parachain header raw bytes wrapper type.
|
||||
#[derive(PartialEq, Eq)]
|
||||
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
|
||||
|
||||
@@ -52,7 +52,7 @@ use client::Client;
|
||||
use polkadot_network::{PolkadotProtocol, consensus::ConsensusNetwork};
|
||||
use tokio::runtime::TaskExecutor;
|
||||
|
||||
pub use service::{Configuration, Role, PruningMode, ExtrinsicPoolOptions,
|
||||
pub use service::{Configuration, Roles, PruningMode, ExtrinsicPoolOptions,
|
||||
ErrorKind, Error, ComponentBlock, LightComponents, FullComponents};
|
||||
pub use client::ExecutionStrategy;
|
||||
|
||||
@@ -166,7 +166,7 @@ pub fn new_light(config: Configuration<GenesisConfig>, executor: TaskExecutor)
|
||||
pub fn new_full(config: Configuration<GenesisConfig>, executor: TaskExecutor)
|
||||
-> Result<Service<FullComponents<Factory>>, Error>
|
||||
{
|
||||
let is_validator = (config.roles & Role::AUTHORITY) == Role::AUTHORITY;
|
||||
let is_validator = (config.roles & Roles::AUTHORITY) == Roles::AUTHORITY;
|
||||
let service = service::Service::<FullComponents<Factory>>::new(config, executor.clone())?;
|
||||
// Spin consensus service if configured
|
||||
let consensus = if is_validator {
|
||||
|
||||
@@ -7,5 +7,3 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
||||
substrate-codec = { path = "../../substrate/codec" }
|
||||
substrate-primitives = { path = "../../substrate/primitives" }
|
||||
polkadot-primitives = { path = "../primitives" }
|
||||
serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
|
||||
@@ -70,7 +70,7 @@ pub trait Context {
|
||||
}
|
||||
|
||||
/// Statements circulated among peers.
|
||||
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(PartialEq, Eq, Debug, Clone)]
|
||||
pub enum Statement<C, D> {
|
||||
/// Broadcast by a authority to indicate that this is his candidate for
|
||||
/// inclusion.
|
||||
@@ -141,7 +141,7 @@ impl<C: Decode, D: Decode> Decode for Statement<C, D> {
|
||||
}
|
||||
|
||||
/// A signed statement.
|
||||
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
|
||||
#[derive(PartialEq, Eq, Debug, Clone)]
|
||||
pub struct SignedStatement<C, D, V, S> {
|
||||
/// The statement.
|
||||
pub statement: Statement<C, D>,
|
||||
@@ -151,6 +151,23 @@ pub struct SignedStatement<C, D, V, S> {
|
||||
pub sender: V,
|
||||
}
|
||||
|
||||
impl<C: Encode, D: Encode, V: Encode, S: Encode> Encode for SignedStatement<C, D, V, S> {
|
||||
fn encode_to<T: Output>(&self, dest: &mut T) {
|
||||
dest.push(&self.statement);
|
||||
dest.push(&self.signature);
|
||||
dest.push(&self.sender);
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: Decode, D: Decode, V: Decode, S: Decode> Decode for SignedStatement<C, D, V, S> {
|
||||
fn decode<I: Input>(value: &mut I) -> Option<Self> {
|
||||
Some(SignedStatement {
|
||||
statement: Decode::decode(value)?,
|
||||
signature: Decode::decode(value)?,
|
||||
sender: Decode::decode(value)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
/// Misbehavior: voting more than one way on candidate validity.
|
||||
///
|
||||
/// Since there are three possible ways to vote, a double vote is possible in
|
||||
|
||||
@@ -18,10 +18,6 @@ extern crate substrate_codec as codec;
|
||||
extern crate substrate_primitives;
|
||||
extern crate polkadot_primitives as primitives;
|
||||
|
||||
extern crate serde;
|
||||
#[macro_use]
|
||||
extern crate serde_derive;
|
||||
|
||||
pub mod generic;
|
||||
|
||||
pub use generic::Table;
|
||||
|
||||
Reference in New Issue
Block a user