Minimal switch of substrate-node to GRANDPA /Aura (#1128)

* add beginnings of SRML grandpa library

* get srml-grandpa compiling

* tests for srml-grandpa

* add optional session integration to grandpa SRML

* start integration into node runtime

* Allow extracting pending change from header digest

* Make it compile on wasm

* make tests compile again

* Move Authority Key fetching into service, simplify service factory construction

* Generalize Authority Consensus Setup system

* Add Authority Setup Docs

* Allow CLI params to be extensible

 - move params to structopts
 - split parsing and default command execution
 - add custom config to node
 - extended parsing of custom config
 - extending params via structop's flatten

* Minor fixes on cli extension params:
 - added docs
 - re-add actual app name, rather than node-name
 - make strategy and subcommand optional

* better cli params

* synchronize GRANDPA and normal node authorities

* Implement grandpa::network for gossip consensus

* run_grandpa in Node

* Fix missed merge error

* Integrate grandpa import queue

* more specific type def

* link up linkhalf and import block

* make grandpa future send

* get compiling

* Fix new params convention and license header

* get it running

* rebuild node runtime WASM

* change logging level

* Update node/cli/src/params.rs

Co-Authored-By: rphmeier <rphmeier@gmail.com>

* Update node/cli/src/params.rs

Co-Authored-By: rphmeier <rphmeier@gmail.com>

* Update node/cli/src/lib.rs

Co-Authored-By: rphmeier <rphmeier@gmail.com>

* Update node/runtime/src/lib.rs

Co-Authored-By: rphmeier <rphmeier@gmail.com>

* Update node/cli/src/lib.rs

Co-Authored-By: rphmeier <rphmeier@gmail.com>

* Clean up and Fixme for mutable config

* Move GrandpaService Integration into grandpa, feature gated but on per default

* Fixing grandpa runtime module test

* Update wasm runtime hashes for tests

* GRANDPA: use post-header hash when logging scheduled changes

* add an extra bit of logging to authorities

* fixing missing constrain

* remove old code

* move `NewAuthorities` to an event in srml-grandpa

* fix node-executor tests to use grandpa log

* Remove GossipConsensus from tests, use newly provided sync-feature, fixes tests

* Update to latest wasm runtimes

* address grumbles

* address grumbles

* only derive deserialize when using std

* Clean up use of Deserialize
This commit is contained in:
Robert Habermeier
2018-11-21 18:42:50 +01:00
committed by GitHub
parent 84da9d4a02
commit 11fe84a742
59 changed files with 1694 additions and 696 deletions
+3 -55
View File
@@ -24,14 +24,11 @@ use rand::{self, Rng};
use network_libp2p::NodeIndex;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor};
use runtime_primitives::generic::BlockId;
use message::generic::{Message, ConsensusMessage};
pub use message::generic::{Message, ConsensusMessage};
use protocol::Context;
use config::Roles;
use specialization::NetworkSpecialization;
use StatusMessage;
use generic_message;
// TODO: Add additional spam/DoS attack protection.
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
const MESSAGE_LIFETIME: Duration = Duration::from_secs(600);
struct PeerConsensus<H> {
@@ -55,10 +52,7 @@ pub struct ConsensusGossip<B: BlockT> {
session_start: Option<B::Hash>,
}
impl<B: BlockT> ConsensusGossip<B>
where
B::Header: HeaderT<Number=u64>
{
impl<B: BlockT> ConsensusGossip<B> {
/// Create a new instance.
pub fn new() -> Self {
ConsensusGossip {
@@ -262,52 +256,6 @@ where
}
}
impl<Block: BlockT> NetworkSpecialization<Block> for ConsensusGossip<Block> where
Block::Header: HeaderT<Number=u64>
{
fn status(&self) -> Vec<u8> {
Vec::new()
}
fn on_connect(&mut self, ctx: &mut Context<Block>, who: NodeIndex, status: StatusMessage<Block>) {
self.new_peer(ctx, who, status.roles);
}
fn on_disconnect(&mut self, ctx: &mut Context<Block>, who: NodeIndex) {
self.peer_disconnected(ctx, who);
}
fn on_message(
&mut self,
ctx: &mut Context<Block>,
who: NodeIndex,
message: &mut Option<::message::Message<Block>>
) {
match message.take() {
Some(generic_message::Message::Consensus(topic, msg)) => {
trace!(target: "gossip", "Consensus message from {}: {:?}", who, msg);
self.on_incoming(ctx, who, topic, msg);
}
r => *message = r,
}
}
fn on_abort(&mut self) {
self.abort();
}
fn maintain_peers(&mut self, _ctx: &mut Context<Block>) {
self.collect_garbage(|_| true);
}
fn on_block_imported(
&mut self,
_ctx: &mut Context<Block>,
_hash: <Block as BlockT>::Hash,
_header: &<Block as BlockT>::Header)
{}
}
#[cfg(test)]
mod tests {
use runtime_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
+1 -1
View File
@@ -65,7 +65,7 @@ pub mod specialization;
pub mod test;
pub use chain::Client as ClientHandle;
pub use service::{Service, FetchFuture, TransactionPool, ManageNetwork, SyncProvider};
pub use service::{Service, FetchFuture, TransactionPool, ManageNetwork, SyncProvider, ExHashT};
pub use protocol::{ProtocolStatus, PeerInfo, Context};
pub use sync::{Status as SyncStatus, SyncState};
pub use network_libp2p::{NodeIndex, ProtocolId, Severity, Protocol};
+17
View File
@@ -27,6 +27,7 @@ use codec::{Encode, Decode};
use message::{self, Message};
use message::generic::Message as GenericMessage;
use consensus_gossip::ConsensusGossip;
use specialization::NetworkSpecialization;
use sync::{ChainSync, Status as SyncStatus, SyncState};
use service::{TransactionPool, ExHashT};
@@ -57,6 +58,7 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
genesis_hash: B::Hash,
sync: Arc<RwLock<ChainSync<B>>>,
specialization: RwLock<S>,
consensus_gossip: RwLock<ConsensusGossip<B>>,
context_data: ContextData<B, H>,
// Connected peers pending Status message.
handshaking_peers: RwLock<HashMap<NodeIndex, time::Instant>>,
@@ -207,6 +209,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
genesis_hash: info.chain.genesis_hash,
sync: Arc::new(RwLock::new(sync)),
specialization: RwLock::new(specialization),
consensus_gossip: RwLock::new(ConsensusGossip::new()),
handshaking_peers: RwLock::new(HashMap::new()),
transaction_pool: transaction_pool,
};
@@ -221,6 +224,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
&self.sync
}
pub(crate) fn consensus_gossip<'a>(&'a self) -> &'a RwLock<ConsensusGossip<B>> {
&self.consensus_gossip
}
/// Returns protocol status
pub fn status(&self) -> ProtocolStatus<B> {
let sync = self.sync.read();
@@ -278,6 +286,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response),
GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(io, who, request),
GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(io, who, response),
GenericMessage::Consensus(topic, msg) => {
self.consensus_gossip.write().on_incoming(&mut ProtocolContext::new(&self.context_data, io), who, topic, msg);
},
other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, &mut Some(other)),
}
}
@@ -297,6 +308,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: NodeIndex) {
trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_debug_info(peer));
// lock all the the peer lists so that add/remove peer events are in order
let mut sync = self.sync.write();
let mut spec = self.specialization.write();
@@ -309,6 +321,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
};
if removed {
let mut context = ProtocolContext::new(&self.context_data, io);
self.consensus_gossip.write().peer_disconnected(&mut context, peer);
sync.peer_disconnected(&mut context, peer);
spec.on_disconnect(&mut context, peer);
self.on_demand.as_ref().map(|s| s.on_disconnect(peer));
@@ -391,6 +404,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Perform time based maintenance.
pub fn tick(&self, io: &mut SyncIo) {
self.consensus_gossip.write().collect_garbage(|_| true);
self.maintain_peers(io);
self.on_demand.as_ref().map(|s| s.maintain_peers(io));
}
@@ -478,6 +492,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let mut context = ProtocolContext::new(&self.context_data, io);
self.on_demand.as_ref().map(|s| s.on_connect(who, status.roles, status.best_number));
self.sync.write().new_peer(&mut context, who);
self.consensus_gossip.write().new_peer(&mut context, who, status.roles);
self.specialization.write().on_connect(&mut context, who, status);
}
@@ -555,10 +570,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let mut spec = self.specialization.write();
let mut peers = self.context_data.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
let mut consensus_gossip = self.consensus_gossip.write();
sync.clear();
spec.on_abort();
peers.clear();
handshaking_peers.clear();
consensus_gossip.abort();
}
pub fn stop(&self) {
+10 -4
View File
@@ -19,11 +19,12 @@ use std::sync::Arc;
use std::{io, thread};
use std::time::Duration;
use futures::{self, Future, Stream, stream, sync::oneshot};
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use network_libp2p::{ProtocolId, PeerId, NetworkConfiguration, ErrorKind};
use network_libp2p::{start_service, Service as NetworkService, ServiceEvent as NetworkServiceEvent};
use network_libp2p::{RegisteredProtocol, parse_str_addr, Protocol as Libp2pProtocol};
use io::NetSyncIo;
use consensus_gossip::ConsensusGossip;
use protocol::{self, Protocol, ProtocolContext, Context, ProtocolStatus};
use config::Params;
use error::Error;
@@ -44,6 +45,7 @@ pub trait SyncProvider<B: BlockT>: Send + Sync {
fn status(&self) -> ProtocolStatus<B>;
}
/// Minimum Requirements for a Hash within Networking
pub trait ExHashT: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {}
impl<T> ExHashT for T where T: ::std::hash::Hash + Eq + ::std::fmt::Debug + Clone + Send + Sync + 'static {}
@@ -82,9 +84,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Service<B, S,
pub fn new<I: 'static + ImportQueue<B>>(
params: Params<B, S, H>,
protocol_id: ProtocolId,
import_queue: I,
import_queue: Arc<I>,
) -> Result<Arc<Service<B, S, H>>, Error> {
let import_queue = Arc::new(import_queue);
let handler = Arc::new(Protocol::new(
params.config,
params.chain,
@@ -101,7 +102,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Service<B, S,
network,
protocol_id,
handler,
bg_thread: Some(thread),
bg_thread: Some(thread)
});
// connect the import-queue to the network service.
@@ -131,6 +132,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Service<B, S,
{
self.handler.with_spec(&mut NetSyncIo::new(&self.network, self.protocol_id), f)
}
/// access the underlying consensus gossip handler
pub fn consensus_gossip<'a>(&'a self) -> &'a RwLock<ConsensusGossip<B>> {
self.handler.consensus_gossip()
}
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> ::consensus::SyncOracle for Service<B, S, H> {
+15 -20
View File
@@ -58,31 +58,23 @@ impl ExecuteInContext<Block> for DummyContextExecutor {
}
/// The test specialization.
pub struct DummySpecialization {
/// Consensus gossip handle.
pub gossip: ConsensusGossip<Block>,
}
pub struct DummySpecialization { }
impl NetworkSpecialization<Block> for DummySpecialization {
fn status(&self) -> Vec<u8> { vec![] }
fn on_connect(&mut self, ctx: &mut Context<Block>, peer_id: NodeIndex, status: ::message::Status<Block>) {
self.gossip.new_peer(ctx, peer_id, status.roles);
fn on_connect(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex, _status: ::message::Status<Block>) {
}
fn on_disconnect(&mut self, ctx: &mut Context<Block>, peer_id: NodeIndex) {
self.gossip.peer_disconnected(ctx, peer_id);
fn on_disconnect(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex) {
}
fn on_message(
&mut self,
ctx: &mut Context<Block>,
peer_id: NodeIndex,
message: &mut Option<::message::Message<Block>>
_ctx: &mut Context<Block>,
_peer_id: NodeIndex,
_message: &mut Option<::message::Message<Block>>
) {
if let Some(::message::generic::Message::Consensus(topic, data)) = message.take() {
self.gossip.on_incoming(ctx, peer_id, topic, data);
}
}
}
@@ -179,6 +171,10 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other);
}
pub fn consensus_gossip(&self) -> &RwLock<ConsensusGossip<Block>> {
self.sync.consensus_gossip()
}
/// Called on disconnect from other indicated peer.
fn on_disconnect(&self, other: NodeIndex) {
let mut io = TestIo::new(&self.queue, Some(other));
@@ -233,9 +229,10 @@ impl<V: 'static + Verifier<Block>, D> Peer<V, D> {
/// Push a message into the gossip network and relay to peers.
/// `TestNet::sync_step` needs to be called to ensure it's propagated.
pub fn gossip_message(&self, topic: Hash, data: Vec<u8>) {
self.sync.with_spec(&mut TestIo::new(&self.queue, None), |spec, ctx| {
spec.gossip.multicast(ctx, topic, data);
})
let gossip = self.sync.consensus_gossip();
self.sync.with_spec(&mut TestIo::new(&self.queue, None), move |_s, context|{
gossip.write().multicast(context, topic, data);
});
}
/// Add blocks to the peer -- edit the block before adding
@@ -363,9 +360,7 @@ pub trait TestNetFactory: Sized {
let (block_import, data) = self.make_block_import(client.clone());
let import_queue = Arc::new(SyncImportQueue::new(verifier, block_import));
let specialization = DummySpecialization {
gossip: ConsensusGossip::new(),
};
let specialization = DummySpecialization { };
let sync = Protocol::new(
config.clone(),
client.clone(),