core/authority-discovery: Enable authorities to discover each other (#3452)

With the *authority-discovery* module an authoritative node makes itself
discoverable and is able to discover other authorities. Once discovered, a node
can directly connect to other authorities instead of multi-hop gossiping
information.

1. **Making itself discoverable**

    1. Retrieve its external addresses

    2. Adds its network peer id to the addresses

    3. Sign the above

    4. Put the signature and the addresses on the libp2p Kademlia DHT

2. **Discovering other authorities**

    1. Retrieve the current set of authorities

    2. Start DHT queries for the ids of the authorities

    3. Validate the signatures of the retrieved key value pairs

    4. Add the retrieved external addresses as ~reserved~ priority nodes to the
       peerset


* node/runtime: Add authority-discovery as session handler

The srml/authority-discovery module implements the OneSessionHandler in
order to keep its authority set in sync. This commit adds the module to
the set of session handlers.

* core/network: Make network worker return Dht events on poll

Instead of network worker implement the Future trait, have it implement
the Stream interface returning Dht events.

For now these events are ignored in build_network_future but will be
used by the core/authority-discovery module in subsequent commits.

* *: Add scaffolding and integration for core/authority-discovery module

* core/authority-discovery: Implement module logic itself
This commit is contained in:
Max Inden
2019-09-06 17:43:03 +02:00
committed by GitHub
parent ece0b57d8d
commit 027d88796b
19 changed files with 1041 additions and 62 deletions
+102
View File
@@ -876,6 +876,11 @@ dependencies = [
"static_assertions 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fixedbitset"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "flate2"
version = "1.0.9"
@@ -2236,6 +2241,11 @@ dependencies = [
"ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "multimap"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "multistream-select"
version = "0.5.1"
@@ -2324,6 +2334,7 @@ dependencies = [
"srml-system 2.0.0",
"srml-timestamp 2.0.0",
"structopt 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-authority-discovery 2.0.0",
"substrate-basic-authorship 2.0.0",
"substrate-cli 2.0.0",
"substrate-client 2.0.0",
@@ -2934,6 +2945,14 @@ name = "percent-encoding"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "petgraph"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "pin-utils"
version = "0.1.0-alpha.4"
@@ -3005,6 +3024,54 @@ dependencies = [
"unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "prost"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "prost-build"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)",
"prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"which 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "prost-derive"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)",
"quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)",
"syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "prost-types"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "protobuf"
version = "2.8.0"
@@ -4440,6 +4507,32 @@ dependencies = [
"substrate-test-runtime-client 2.0.0",
]
[[package]]
name = "substrate-authority-discovery"
version = "2.0.0"
dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)",
"libp2p 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-scale-codec 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)",
"sr-primitives 2.0.0",
"substrate-authority-discovery-primitives 2.0.0",
"substrate-client 2.0.0",
"substrate-keystore 2.0.0",
"substrate-network 2.0.0",
"substrate-peerset 2.0.0",
"substrate-primitives 2.0.0",
"substrate-test-runtime-client 2.0.0",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "substrate-authority-discovery-primitives"
version = "2.0.0"
@@ -5134,6 +5227,8 @@ dependencies = [
"sr-io 2.0.0",
"sr-primitives 2.0.0",
"substrate-application-crypto 2.0.0",
"substrate-authority-discovery 2.0.0",
"substrate-authority-discovery-primitives 2.0.0",
"substrate-client 2.0.0",
"substrate-client-db 2.0.0",
"substrate-consensus-babe-primitives 2.0.0",
@@ -6433,6 +6528,7 @@ dependencies = [
"checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa"
"checksum finality-grandpa 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9681c1f75941ea47584573dd2bc10558b2067d460612945887e00744e43393be"
"checksum fixed-hash 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "516877b7b9a1cc2d0293cbce23cd6203f0edbfd4090e6ca4489fecb5aa73050e"
"checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33"
"checksum flate2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "550934ad4808d5d39365e5d61727309bf18b3b02c6c56b729cb92e7dd84bc3d8"
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
@@ -6561,6 +6657,7 @@ dependencies = [
"checksum mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "46e73a04c2fa6250b8d802134d56d554a9ec2922bf977777c805ea5def61ce40"
"checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125"
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
"checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151"
"checksum multistream-select 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e8f3cb4c93f2d79811fc11fa01faab99d8b7b8cbe024b602c27434ff2b08a59d"
"checksum names 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef320dab323286b50fb5cdda23f61c796a72a89998ab565ca32525c5c556f2da"
"checksum native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b2df1a4c22fd44a62147fd8f13dd0f95c9d8ca7b2610299b2a2f9cf8964274e"
@@ -6607,6 +6704,7 @@ dependencies = [
"checksum peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
"checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
"checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
"checksum petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f"
"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587"
"checksum pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c1d2cfa5a714db3b5f24f0915e74fcdf91d09d496ba61329705dda7774d2af"
"checksum ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e3cbf9f658cdb5000fcf6f362b8ea2ba154b9f146a61c7a20d647034c6b6561b"
@@ -6616,6 +6714,10 @@ dependencies = [
"checksum proc-macro-hack 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)" = "982a35d1194084ba319d65c4a68d24ca28f5fdb5b8bc20899e4eef8641ea5178"
"checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759"
"checksum proc-macro2 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "175a40b9cf564ce9bf050654633dbf339978706b8ead1a907bb970b63185dd95"
"checksum prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23"
"checksum prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e"
"checksum prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5e7dc378b94ac374644181a2247cebf59a6ec1c88b49ac77f3a94b86b79d0e11"
"checksum prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1de482a366941c8d56d19b650fac09ca08508f2a696119ee7513ad590c8bac6f"
"checksum protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8aefcec9f142b524d98fc81d07827743be89dd6586a1ba6ab21fa66a500b3fa5"
"checksum pwasm-utils 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "efb0dcbddbb600f47a7098d33762a00552c671992171637f5bb310b37fe1f0e4"
"checksum quick-error 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5fb6ccf8db7bbcb9c2eae558db5ab4f3da1c2a87e4e597ed394726bc8ea6ca1d"
+1
View File
@@ -64,6 +64,7 @@ members = [
"core/utils/fork-tree",
"core/utils/wasm-builder",
"core/utils/wasm-builder-runner",
"core/authority-discovery",
"srml/support",
"srml/support/procedural",
"srml/support/procedural/tools",
@@ -0,0 +1,32 @@
[package]
name = "substrate-authority-discovery"
version = "2.0.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
build = "build.rs"
[build-dependencies]
prost-build = "0.5"
[dependencies]
authority-discovery-primitives = { package = "substrate-authority-discovery-primitives", path = "./primitives", default-features = false }
bytes = "0.4"
client = { package = "substrate-client", path = "../../core/client" }
codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" }
derive_more = "0.14.0"
futures = "0.1"
keystore = { package = "substrate-keystore", path = "../../core/keystore" }
libp2p = { version = "0.12.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
log = "0.4"
network = { package = "substrate-network", path = "../../core/network" }
primitives = { package = "substrate-primitives", path = "../primitives" }
prost = "0.5"
serde_json = "1.0"
sr-primitives = { path = "../../core/sr-primitives" }
tokio-timer = "0.2"
[dev-dependencies]
parking_lot = { version = "0.9.0" }
peerset = { package = "substrate-peerset", path = "../../core/peerset" }
test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" }
tokio = { version = "0.1"}
@@ -0,0 +1,3 @@
fn main() {
prost_build::compile_protos(&["src/schema/dht.proto"], &["src/schema"]).unwrap();
}
@@ -19,9 +19,15 @@
#![cfg_attr(not(feature = "std"), no_std)]
use client::decl_runtime_apis;
use codec::Codec;
use rstd::vec::Vec;
#[derive(codec::Encode, codec::Decode, Eq, PartialEq, Clone)]
#[cfg_attr(feature = "std", derive(Debug, Hash))]
pub struct Signature(pub Vec<u8>);
#[derive(codec::Encode, codec::Decode, Eq, PartialEq, Clone)]
#[cfg_attr(feature = "std", derive(Debug, Hash))]
pub struct AuthorityId(pub Vec<u8>);
decl_runtime_apis! {
/// The authority discovery api.
///
@@ -29,21 +35,15 @@ decl_runtime_apis! {
/// own authority identifier, to retrieve identifiers of the current authority
/// set, as well as sign and verify Kademlia Dht external address payloads
/// from and to other authorities.
pub trait AuthorityDiscoveryApi<AuthorityId: Codec> {
/// Returns own authority identifier iff it is part of the current authority
/// set, otherwise this function returns None. The restriction might be
/// softened in the future in case a consumer needs to learn own authority
/// identifier.
fn authority_id() -> Option<AuthorityId>;
pub trait AuthorityDiscoveryApi {
/// Retrieve authority identifiers of the current authority set.
fn authorities() -> Vec<AuthorityId>;
/// Sign the given payload with the private key corresponding to the given authority id.
fn sign(payload: Vec<u8>, authority_id: AuthorityId) -> Option<Vec<u8>>;
fn sign(payload: &Vec<u8>) -> Option<(Signature, AuthorityId)>;
/// Verify the given signature for the given payload with the given
/// authority identifier.
fn verify(payload: Vec<u8>, signature: Vec<u8>, authority_id: AuthorityId) -> bool;
fn verify(payload: &Vec<u8>, signature: &Signature, authority_id: &AuthorityId) -> bool;
}
}
@@ -0,0 +1,47 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Authority discovery errors.
/// AuthorityDiscovery Result.
pub type Result<T> = std::result::Result<T, Error>;
/// Error type for the authority discovery module.
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
/// Failed to verify a dht payload with the given signature.
VerifyingDhtPayload,
/// Failed to hash the authority id to be used as a dht key.
HashingAuthorityId(libp2p::core::multiaddr::multihash::EncodeError),
/// Failed calling into the Substrate runtime.
CallingRuntime(client::error::Error),
/// Failed signing the dht payload via the Substrate runtime.
SigningDhtPayload,
/// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure it
/// is actually an authority, we match the hash against the hash of the authority id of all other authorities. This
/// error is the result of the above failing.
MatchingHashedAuthorityIdWithAuthorityId,
/// Failed to set the authority discovery peerset priority group in the peerset module.
SettingPeersetPriorityGroup(String),
/// Failed to encode a dht payload.
Encoding(prost::EncodeError),
/// Failed to decode a dht payload.
Decoding(prost::DecodeError),
/// Failed to parse a libp2p multi address.
ParsingMultiaddress(libp2p::core::multiaddr::Error),
/// Tokio timer error.
PollingTokioTimer(tokio_timer::Error)
}
@@ -0,0 +1,698 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
#![warn(missing_docs)]
//! Substrate authority discovery.
//!
//! This crate enables Substrate authorities to directly connect to other authorities. [`AuthorityDiscovery`] implements
//! the Future trait. By polling [`AuthorityDiscovery`] an authority:
//!
//!
//! 1. **Makes itself discoverable**
//!
//! 1. Retrieves its external addresses.
//!
//! 2. Adds its network peer id to the addresses.
//!
//! 3. Signs the above.
//!
//! 4. Puts the signature and the addresses on the libp2p Kademlia DHT.
//!
//!
//! 2. **Discovers other authorities**
//!
//! 1. Retrieves the current set of authorities.
//!
//! 2. Starts DHT queries for the ids of the authorities.
//!
//! 3. Validates the signatures of the retrieved key value pairs.
//!
//! 4. Adds the retrieved external addresses as priority nodes to the peerset.
use authority_discovery_primitives::{AuthorityDiscoveryApi, AuthorityId, Signature};
use client::blockchain::HeaderBackend;
use error::{Error, Result};
use futures::{prelude::*, sync::mpsc::Receiver};
use log::{debug, error, log_enabled, warn};
use network::specialization::NetworkSpecialization;
use network::{DhtEvent, ExHashT};
use prost::Message;
use sr_primitives::generic::BlockId;
use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi};
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, Instant};
mod error;
/// Dht payload schemas generated from Protobuf definitions via Prost crate in build.rs.
mod schema {
include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs"));
}
/// An `AuthorityDiscovery` makes a given authority discoverable and discovers other authorities.
pub struct AuthorityDiscovery<Client, Network, Block>
where
Block: BlockT + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi>::Api: AuthorityDiscoveryApi<Block>,
{
client: Arc<Client>,
network: Arc<Network>,
/// Channel we receive Dht events on.
dht_event_rx: Receiver<DhtEvent>,
/// Interval to be proactive, publishing own addresses.
publish_interval: tokio_timer::Interval,
/// Interval on which to query for addresses of other authorities.
query_interval: tokio_timer::Interval,
/// The network peerset interface for priority groups lets us only set an entire group, but we retrieve the
/// addresses of other authorities one by one from the network. To use the peerset interface we need to cache the
/// addresses and always overwrite the entire peerset priority group. To ensure this map doesn't grow indefinitely
/// `purge_old_authorities_from_cache` function is called each time we add a new entry.
address_cache: HashMap<AuthorityId, Vec<libp2p::Multiaddr>>,
phantom: PhantomData<Block>,
}
impl<Client, Network, Block> AuthorityDiscovery<Client, Network, Block>
where
Block: BlockT + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi>::Api: AuthorityDiscoveryApi<Block>,
{
/// Return a new authority discovery.
pub fn new(
client: Arc<Client>,
network: Arc<Network>,
dht_event_rx: futures::sync::mpsc::Receiver<DhtEvent>,
) -> AuthorityDiscovery<Client, Network, Block> {
// Kademlia's default time-to-live for Dht records is 36h, republishing records every 24h. Given that a node
// could restart at any point in time, one can not depend on the republishing process, thus publishing own
// external addresses should happen on an interval < 36h.
let publish_interval =
tokio_timer::Interval::new(Instant::now(), Duration::from_secs(12 * 60 * 60));
// External addresses of other authorities can change at any given point in time. The interval on which to query
// for external addresses of other authorities is a trade off between efficiency and performance.
let query_interval =
tokio_timer::Interval::new(Instant::now(), Duration::from_secs(10 * 60));
let address_cache = HashMap::new();
AuthorityDiscovery {
client,
network,
dht_event_rx,
publish_interval,
query_interval,
address_cache,
phantom: PhantomData,
}
}
fn publish_own_ext_addresses(&mut self) -> Result<()> {
let id = BlockId::hash(self.client.info().best_hash);
let addresses = self
.network
.external_addresses()
.into_iter()
.map(|a| {
a.with(libp2p::core::multiaddr::Protocol::P2p(
self.network.local_peer_id().into(),
))
})
.map(|a| a.to_vec())
.collect();
let mut serialized_addresses = vec![];
schema::AuthorityAddresses { addresses }
.encode(&mut serialized_addresses)
.map_err(Error::Encoding)?;
let (signature, authority_id) = self
.client
.runtime_api()
.sign(&id, &serialized_addresses)
.map_err(Error::CallingRuntime)?
.ok_or(Error::SigningDhtPayload)?;
let mut signed_addresses = vec![];
schema::SignedAuthorityAddresses {
addresses: serialized_addresses,
signature: signature.0,
}
.encode(&mut signed_addresses)
.map_err(Error::Encoding)?;
self.network.put_value(
hash_authority_id(authority_id.0.as_ref())?,
signed_addresses,
);
Ok(())
}
fn request_addresses_of_others(&mut self) -> Result<()> {
let id = BlockId::hash(self.client.info().best_hash);
let authorities = self
.client
.runtime_api()
.authorities(&id)
.map_err(Error::CallingRuntime)?;
for authority_id in authorities.iter() {
self.network
.get_value(&hash_authority_id(authority_id.0.as_ref())?);
}
Ok(())
}
fn handle_dht_events(&mut self) -> Result<()> {
while let Ok(Async::Ready(Some(event))) = self.dht_event_rx.poll() {
match event {
DhtEvent::ValueFound(v) => {
if log_enabled!(log::Level::Debug) {
let hashes = v.iter().map(|(hash, _value)| hash.clone());
debug!(target: "sub-authority-discovery", "Value for hash '{:?}' found on Dht.", hashes);
}
self.handle_dht_value_found_event(v)?;
}
DhtEvent::ValueNotFound(hash) => {
warn!(target: "sub-authority-discovery", "Value for hash '{:?}' not found on Dht.", hash)
}
DhtEvent::ValuePut(hash) => {
debug!(target: "sub-authority-discovery", "Successfully put hash '{:?}' on Dht.", hash)
}
DhtEvent::ValuePutFailed(hash) => {
warn!(target: "sub-authority-discovery", "Failed to put hash '{:?}' on Dht.", hash)
}
}
}
Ok(())
}
fn handle_dht_value_found_event(
&mut self,
values: Vec<(libp2p::kad::record::Key, Vec<u8>)>,
) -> Result<()> {
debug!(target: "sub-authority-discovery", "Got Dht value from network.");
let id = BlockId::hash(self.client.info().best_hash);
// From the Dht we only get the hashed authority id. In order to retrieve the actual authority id and to ensure
// it is actually an authority, we match the hash against the hash of the authority id of all other authorities.
let authorities = self.client.runtime_api().authorities(&id)?;
self.purge_old_authorities_from_cache(&authorities);
let authorities = authorities
.into_iter()
.map(|a| hash_authority_id(a.0.as_ref()).map(|h| (h, a)))
.collect::<Result<HashMap<_, _>>>()?;
for (key, value) in values.iter() {
// Check if the event origins from an authority in the current authority set.
let authority_id: &AuthorityId = authorities
.get(key)
.ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?;
let schema::SignedAuthorityAddresses {
signature,
addresses,
} = schema::SignedAuthorityAddresses::decode(value).map_err(Error::Decoding)?;
let signature = Signature(signature);
let is_verified = self
.client
.runtime_api()
.verify(&id, &addresses, &signature, &authority_id.clone())
.map_err(Error::CallingRuntime)?;
if !is_verified {
return Err(Error::VerifyingDhtPayload);
}
let addresses: Vec<libp2p::Multiaddr> = schema::AuthorityAddresses::decode(addresses)
.map(|a| a.addresses)
.map_err(Error::Decoding)?
.into_iter()
.map(|a| a.try_into())
.collect::<std::result::Result<_, _>>()
.map_err(Error::ParsingMultiaddress)?;
self.address_cache.insert(authority_id.clone(), addresses);
}
// Let's update the peerset priority group with the all the addresses we have in our cache.
let addresses = HashSet::from_iter(
self.address_cache
.iter()
.map(|(_peer_id, addresses)| addresses.clone())
.flatten(),
);
debug!(target: "sub-authority-discovery", "Applying priority group {:#?} to peerset.", addresses);
self.network
.set_priority_group("authorities".to_string(), addresses)
.map_err(Error::SettingPeersetPriorityGroup)?;
Ok(())
}
fn purge_old_authorities_from_cache(&mut self, current_authorities: &Vec<AuthorityId>) {
self.address_cache
.retain(|peer_id, _addresses| current_authorities.contains(peer_id))
}
}
impl<Client, Network, Block> futures::Future for AuthorityDiscovery<Client, Network, Block>
where
Block: BlockT + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi>::Api: AuthorityDiscoveryApi<Block>,
{
type Item = ();
type Error = ();
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
let mut inner = || -> Result<()> {
// Process incoming events before triggering new ones.
self.handle_dht_events()?;
if let Async::Ready(_) = self
.publish_interval
.poll()
.map_err(Error::PollingTokioTimer)?
{
// Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the
// function calls within this block do a `return`, we don't call `interval.poll` again and thereby the
// underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval
// tick.
while let Async::Ready(_) = self
.publish_interval
.poll()
.map_err(Error::PollingTokioTimer)?
{}
self.publish_own_ext_addresses()?;
}
if let Async::Ready(_) = self
.query_interval
.poll()
.map_err(Error::PollingTokioTimer)?
{
// Make sure to call interval.poll until it returns Async::NotReady once. Otherwise, in case one of the
// function calls within this block do a `return`, we don't call `interval.poll` again and thereby the
// underlying Tokio task is never registered with Tokio's Reactor to be woken up on the next interval
// tick.
while let Async::Ready(_) = self
.query_interval
.poll()
.map_err(Error::PollingTokioTimer)?
{}
self.request_addresses_of_others()?;
}
Ok(())
};
match inner() {
Ok(()) => {}
Err(e) => error!(target: "sub-authority-discovery", "Poll failure: {:?}", e),
};
// Make sure to always return NotReady as this is a long running task with the same lifetime as the node itself.
Ok(futures::Async::NotReady)
}
}
/// NetworkProvider provides AuthorityDiscovery with all necessary hooks into the underlying Substrate networking. Using
/// this trait abstraction instead of NetworkService directly is necessary to unit test AuthorityDiscovery.
pub trait NetworkProvider {
/// Returns the local external addresses.
fn external_addresses(&self) -> Vec<libp2p::Multiaddr>;
/// Returns the network identity of the node.
fn local_peer_id(&self) -> libp2p::PeerId;
/// Modify a peerset priority group.
fn set_priority_group(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
) -> std::result::Result<(), String>;
/// Start putting a value in the Dht.
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>);
/// Start getting a value from the Dht.
fn get_value(&self, key: &libp2p::kad::record::Key);
}
impl<B, S, H> NetworkProvider for network::NetworkService<B, S, H>
where
B: BlockT + 'static,
S: NetworkSpecialization<B>,
H: ExHashT,
{
fn external_addresses(&self) -> Vec<libp2p::Multiaddr> {
self.external_addresses()
}
fn local_peer_id(&self) -> libp2p::PeerId {
self.local_peer_id()
}
fn set_priority_group(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
) -> std::result::Result<(), String> {
self.set_priority_group(group_id, peers)
}
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>) {
self.put_value(key, value)
}
fn get_value(&self, key: &libp2p::kad::record::Key) {
self.get_value(key)
}
}
fn hash_authority_id(id: &[u8]) -> Result<libp2p::kad::record::Key> {
libp2p::multihash::encode(libp2p::multihash::Hash::SHA2256, id)
.map(|k| libp2p::kad::record::Key::new(&k))
.map_err(Error::HashingAuthorityId)
}
#[cfg(test)]
mod tests {
use super::*;
use client::runtime_api::{ApiExt, Core, RuntimeVersion};
use futures::future::poll_fn;
use primitives::{ExecutionContext, NativeOrEncoded};
use sr_primitives::traits::Zero;
use sr_primitives::traits::{ApiRef, Block as BlockT, NumberFor, ProvideRuntimeApi};
use std::sync::{Arc, Mutex};
use test_client::runtime::Block;
use tokio::runtime::current_thread;
#[derive(Clone)]
struct TestApi {}
impl ProvideRuntimeApi for TestApi {
type Api = RuntimeApi;
fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
RuntimeApi {}.into()
}
}
/// Blockchain database header backend. Does not perform any validation.
impl<Block: BlockT> HeaderBackend<Block> for TestApi {
fn header(
&self,
_id: BlockId<Block>,
) -> std::result::Result<Option<Block::Header>, client::error::Error> {
Ok(None)
}
fn info(&self) -> client::blockchain::Info<Block> {
client::blockchain::Info {
best_hash: Default::default(),
best_number: Zero::zero(),
finalized_hash: Default::default(),
finalized_number: Zero::zero(),
genesis_hash: Default::default(),
}
}
fn status(
&self,
_id: BlockId<Block>,
) -> std::result::Result<client::blockchain::BlockStatus, client::error::Error> {
Ok(client::blockchain::BlockStatus::Unknown)
}
fn number(
&self,
_hash: Block::Hash,
) -> std::result::Result<Option<NumberFor<Block>>, client::error::Error> {
Ok(None)
}
fn hash(
&self,
_number: NumberFor<Block>,
) -> std::result::Result<Option<Block::Hash>, client::error::Error> {
Ok(None)
}
}
struct RuntimeApi {}
impl Core<Block> for RuntimeApi {
fn Core_version_runtime_api_impl(
&self,
_: &BlockId<Block>,
_: ExecutionContext,
_: Option<()>,
_: Vec<u8>,
) -> std::result::Result<NativeOrEncoded<RuntimeVersion>, client::error::Error> {
unimplemented!("Not required for testing!")
}
fn Core_execute_block_runtime_api_impl(
&self,
_: &BlockId<Block>,
_: ExecutionContext,
_: Option<(Block)>,
_: Vec<u8>,
) -> std::result::Result<NativeOrEncoded<()>, client::error::Error> {
unimplemented!("Not required for testing!")
}
fn Core_initialize_block_runtime_api_impl(
&self,
_: &BlockId<Block>,
_: ExecutionContext,
_: Option<&<Block as BlockT>::Header>,
_: Vec<u8>,
) -> std::result::Result<NativeOrEncoded<()>, client::error::Error> {
unimplemented!("Not required for testing!")
}
}
impl ApiExt<Block> for RuntimeApi {
fn map_api_result<F: FnOnce(&Self) -> std::result::Result<R, E>, R, E>(
&self,
_: F,
) -> std::result::Result<R, E> {
unimplemented!("Not required for testing!")
}
fn runtime_version_at(
&self,
_: &BlockId<Block>,
) -> std::result::Result<RuntimeVersion, client::error::Error> {
unimplemented!("Not required for testing!")
}
fn record_proof(&mut self) {
unimplemented!("Not required for testing!")
}
fn extract_proof(&mut self) -> Option<Vec<Vec<u8>>> {
unimplemented!("Not required for testing!")
}
}
impl AuthorityDiscoveryApi<Block> for RuntimeApi {
fn AuthorityDiscoveryApi_authorities_runtime_api_impl(
&self,
_: &BlockId<Block>,
_: ExecutionContext,
_: Option<()>,
_: Vec<u8>,
) -> std::result::Result<NativeOrEncoded<Vec<AuthorityId>>, client::error::Error> {
return Ok(NativeOrEncoded::Native(vec![
AuthorityId("test-authority-id-1".as_bytes().to_vec()),
AuthorityId("test-authority-id-2".as_bytes().to_vec()),
]));
}
fn AuthorityDiscoveryApi_sign_runtime_api_impl(
&self,
_: &BlockId<Block>,
_: ExecutionContext,
_: Option<&std::vec::Vec<u8>>,
_: Vec<u8>,
) -> std::result::Result<
NativeOrEncoded<Option<(Signature, AuthorityId)>>,
client::error::Error,
> {
return Ok(NativeOrEncoded::Native(Some((
Signature("test-signature-1".as_bytes().to_vec()),
AuthorityId("test-authority-id-1".as_bytes().to_vec()),
))));
}
fn AuthorityDiscoveryApi_verify_runtime_api_impl(
&self,
_: &BlockId<Block>,
_: ExecutionContext,
args: Option<(&Vec<u8>, &Signature, &AuthorityId)>,
_: Vec<u8>,
) -> std::result::Result<NativeOrEncoded<bool>, client::error::Error> {
if *args.unwrap().1 == Signature("test-signature-1".as_bytes().to_vec()) {
return Ok(NativeOrEncoded::Native(true));
}
return Ok(NativeOrEncoded::Native(false));
}
}
#[derive(Default)]
struct TestNetwork {
// Whenever functions on `TestNetwork` are called, the function arguments are added to the vectors below.
pub put_value_call: Arc<Mutex<Vec<(libp2p::kad::record::Key, Vec<u8>)>>>,
pub get_value_call: Arc<Mutex<Vec<libp2p::kad::record::Key>>>,
pub set_priority_group_call: Arc<Mutex<Vec<(String, HashSet<libp2p::Multiaddr>)>>>,
}
impl NetworkProvider for TestNetwork {
fn external_addresses(&self) -> Vec<libp2p::Multiaddr> {
vec![]
}
fn local_peer_id(&self) -> libp2p::PeerId {
libp2p::PeerId::random()
}
fn set_priority_group(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
) -> std::result::Result<(), String> {
self.set_priority_group_call
.lock()
.unwrap()
.push((group_id, peers));
Ok(())
}
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>) {
self.put_value_call.lock().unwrap().push((key, value));
}
fn get_value(&self, key: &libp2p::kad::record::Key) {
self.get_value_call.lock().unwrap().push(key.clone());
}
}
#[test]
fn publish_own_ext_addresses_puts_record_on_dht() {
let (_dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000);
let test_api = Arc::new(TestApi {});
let network: Arc<TestNetwork> = Arc::new(Default::default());
let mut authority_discovery =
AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx);
authority_discovery.publish_own_ext_addresses().unwrap();
// Expect authority discovery to put a new record onto the dht.
assert_eq!(network.put_value_call.lock().unwrap().len(), 1);
}
#[test]
fn request_addresses_of_others_triggers_dht_get_query() {
let (_dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000);
let test_api = Arc::new(TestApi {});
let network: Arc<TestNetwork> = Arc::new(Default::default());
let mut authority_discovery =
AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx);
authority_discovery.request_addresses_of_others().unwrap();
// Expect authority discovery to request new records from the dht.
assert_eq!(network.get_value_call.lock().unwrap().len(), 2);
}
#[test]
fn handle_dht_events_with_value_found_should_call_set_priority_group() {
// Create authority discovery.
let (mut dht_event_tx, dht_event_rx) = futures::sync::mpsc::channel(1000);
let test_api = Arc::new(TestApi {});
let network: Arc<TestNetwork> = Arc::new(Default::default());
let mut authority_discovery =
AuthorityDiscovery::new(test_api, network.clone(), dht_event_rx);
// Create sample dht event.
let authority_id_1 = hash_authority_id("test-authority-id-1".as_bytes()).unwrap();
let address_1: libp2p::Multiaddr = "/ip6/2001:db8::".parse().unwrap();
let mut serialized_addresses = vec![];
schema::AuthorityAddresses {
addresses: vec![address_1.to_vec()],
}
.encode(&mut serialized_addresses)
.unwrap();
let mut signed_addresses = vec![];
schema::SignedAuthorityAddresses {
addresses: serialized_addresses,
signature: "test-signature-1".as_bytes().to_vec(),
}
.encode(&mut signed_addresses)
.unwrap();
let dht_event = network::DhtEvent::ValueFound(vec![(authority_id_1, signed_addresses)]);
dht_event_tx.try_send(dht_event).unwrap();
// Make authority discovery handle the event.
let f = || {
authority_discovery.handle_dht_events().unwrap();
// Expect authority discovery to set the priority set.
assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1);
assert_eq!(
network.set_priority_group_call.lock().unwrap()[0],
(
"authorities".to_string(),
HashSet::from_iter(vec![address_1.clone()].into_iter())
)
);
Ok(Async::Ready(()))
};
let mut runtime = current_thread::Runtime::new().unwrap();
runtime.block_on(poll_fn::<(), (), _>(f)).unwrap();
}
}
@@ -0,0 +1,14 @@
syntax = "proto3";
package authority_discovery;
// First we need to serialize the addresses in order to be able to sign them.
message AuthorityAddresses {
repeated bytes addresses = 1;
}
// Then we need to serialize addresses and signature to send them over the wire.
message SignedAuthorityAddresses {
bytes addresses = 1;
bytes signature = 2;
}
+1
View File
@@ -192,6 +192,7 @@ pub use service::{
NetworkStateInfo,
};
pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization};
pub use protocol::event::{Event, DhtEvent};
pub use protocol::sync::SyncState;
pub use libp2p::{Multiaddr, PeerId};
#[doc(inline)]
@@ -20,6 +20,7 @@
use libp2p::kad::record::Key;
/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
pub enum DhtEvent {
/// The value was found.
ValueFound(Vec<(Key, Vec<u8>)>),
@@ -35,6 +36,7 @@ pub enum DhtEvent {
}
/// Type for events generated by networking layer.
#[derive(Debug, Clone)]
pub enum Event {
/// Event generated by a DHT.
Dht(DhtEvent),
+7 -6
View File
@@ -612,11 +612,11 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
light_client_rqs: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
}
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for NetworkWorker<B, S, H> {
type Item = ();
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Stream for NetworkWorker<B, S, H> {
type Item = Event;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Poll the import queue for actions to perform.
let _ = futures03::future::poll_fn(|cx| {
self.import_queue.poll_actions(cx, &mut NetworkLink {
@@ -636,7 +636,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
// Process the next message coming from the `NetworkService`.
let msg = match self.from_worker.poll() {
Ok(Async::Ready(Some(msg))) => msg,
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(None)),
Ok(Async::NotReady) => break,
};
@@ -677,8 +677,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome,
Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => {
self.network_service.user_protocol_mut()
.on_event(Event::Dht(ev));
CustomMessageOutcome::None
.on_event(Event::Dht(ev.clone()));
return Ok(Async::Ready(Some(Event::Dht(ev))));
},
Ok(Async::Ready(None)) => CustomMessageOutcome::None,
Err(err) => {
+2
View File
@@ -31,12 +31,14 @@ client = { package = "substrate-client", path = "../../core/client" }
client_db = { package = "substrate-client-db", path = "../../core/client/db", features = ["kvdb-rocksdb"] }
codec = { package = "parity-scale-codec", version = "1.0.0" }
substrate-executor = { path = "../../core/executor" }
substrate-authority-discovery = { path = "../../core/authority-discovery"}
transaction_pool = { package = "substrate-transaction-pool", path = "../../core/transaction-pool" }
rpc-servers = { package = "substrate-rpc-servers", path = "../../core/rpc-servers" }
rpc = { package = "substrate-rpc", path = "../../core/rpc" }
tel = { package = "substrate-telemetry", path = "../../core/telemetry" }
offchain = { package = "substrate-offchain", path = "../../core/offchain" }
parity-multiaddr = { package = "parity-multiaddr", version = "0.5.0" }
authority-discovery-primitives = { package = "substrate-authority-discovery-primitives", path = "../authority-discovery/primitives", default-features = false }
[dev-dependencies]
substrate-test-runtime-client = { path = "../test-runtime/client" }
+41 -2
View File
@@ -28,7 +28,7 @@ use futures::{prelude::*, sync::mpsc};
use futures03::{FutureExt as _, compat::Compat, StreamExt as _, TryStreamExt as _};
use keystore::{Store as Keystore, KeyStorePtr};
use log::{info, warn};
use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo};
use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent};
use network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization};
use parking_lot::{Mutex, RwLock};
use primitives::{Blake2Hasher, H256, Hasher};
@@ -76,6 +76,7 @@ pub struct ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFpr
transaction_pool: Arc<TExPool>,
rpc_extensions: TRpc,
rpc_builder: TRpcB,
dht_event_tx: Option<mpsc::Sender<DhtEvent>>,
marker: PhantomData<(TBl, TRtApi)>,
}
@@ -197,6 +198,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage {
transaction_pool: Arc::new(()),
rpc_extensions: Default::default(),
rpc_builder,
dht_event_tx: None,
marker: PhantomData,
})
}
@@ -266,6 +268,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage {
transaction_pool: Arc::new(()),
rpc_extensions: Default::default(),
rpc_builder,
dht_event_tx: None,
marker: PhantomData,
})
}
@@ -312,6 +315,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -354,6 +358,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -380,6 +385,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -421,6 +427,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -479,6 +486,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -516,6 +524,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: Arc::new(transaction_pool),
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -542,9 +551,36 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Adds a dht event sender to builder to be used by the network to send dht events to the authority discovery
/// module.
pub fn with_dht_event_tx(
self,
dht_event_tx: mpsc::Sender<DhtEvent>,
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, TRpcB, Backend>, Error> {
Ok(ServiceBuilder {
config: self.config,
client: self.client,
backend: self.backend,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: Some(dht_event_tx),
marker: self.marker,
})
}
}
/// RPC handlers builder.
@@ -798,6 +834,7 @@ ServiceBuilder<
network_protocol,
transaction_pool,
rpc_extensions,
dht_event_tx,
rpc_builder,
) = (
self.client,
@@ -811,6 +848,7 @@ ServiceBuilder<
self.network_protocol,
self.transaction_pool,
self.rpc_extensions,
self.dht_event_tx,
self.rpc_builder,
);
@@ -829,7 +867,8 @@ ServiceBuilder<
finality_proof_provider,
network_protocol,
transaction_pool,
rpc_extensions
rpc_extensions,
dht_event_tx,
))
},
|h, c, tx| maintain_transaction_pool(h, c, tx),
+22 -8
View File
@@ -39,7 +39,7 @@ use client::{runtime_api::BlockT, Client};
use exit_future::Signal;
use futures::prelude::*;
use futures03::stream::{StreamExt as _, TryStreamExt as _};
use network::{NetworkService, NetworkState, specialization::NetworkSpecialization};
use network::{NetworkService, NetworkState, specialization::NetworkSpecialization, Event, DhtEvent};
use log::{log, warn, debug, error, Level};
use codec::{Encode, Decode};
use primitives::{Blake2Hasher, H256};
@@ -154,7 +154,8 @@ macro_rules! new_impl {
finality_proof_provider,
network_protocol,
transaction_pool,
rpc_extensions
rpc_extensions,
dht_event_tx,
) = $build_components(&$config)?;
let import_queue = Box::new(import_queue);
let chain_info = client.info().chain;
@@ -357,12 +358,14 @@ macro_rules! new_impl {
let rpc_handlers = gen_handler();
let rpc = start_rpc_servers(&$config, gen_handler)?;
let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(
network_mut,
client.clone(),
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes
has_bootnodes,
dht_event_tx,
)
.map_err(|_| ())
.select(exit.clone())
@@ -653,6 +656,7 @@ fn build_network_future<
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<(NetworkStatus<B>, NetworkState)>>>>,
rpc_rx: futures03::channel::mpsc::UnboundedReceiver<rpc::system::Request<B>>,
should_have_peers: bool,
dht_event_tx: Option<mpsc::Sender<DhtEvent>>,
) -> impl Future<Item = (), Error = ()> {
// Compatibility shim while we're transitionning to stable Futures.
// See https://github.com/paritytech/substrate/issues/3099
@@ -730,11 +734,21 @@ fn build_network_future<
}
// Main network polling.
match network.poll() {
Ok(Async::NotReady) => {}
Err(err) => warn!(target: "service", "Error in network: {:?}", err),
Ok(Async::Ready(())) => warn!(target: "service", "Network service finished"),
}
while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| {
warn!(target: "service", "Error in network: {:?}", err);
}) {
// Given that core/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht
// events are being passed on to the authority-discovery module. In the future there might be multiple
// consumers of these events. In that case this would need to be refactored to properly dispatch the events,
// e.g. via a subscriber model.
if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) {
if e.is_full() {
warn!(target: "service", "Dht event channel to authority discovery is full, dropping event.");
} else if e.is_disconnected() {
warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event.");
}
}
};
// Now some diagnostic for performances.
let polling_dur = before_polling.elapsed();
+2 -1
View File
@@ -46,7 +46,8 @@ system = { package = "srml-system", path = "../../srml/system" }
balances = { package = "srml-balances", path = "../../srml/balances" }
support = { package = "srml-support", path = "../../srml/support", default-features = false }
im_online = { package = "srml-im-online", path = "../../srml/im-online", default-features = false }
authority-discovery = { package = "srml-authority-discovery", path = "../../srml/authority-discovery", default-features = false }
sr-authority-discovery = { package = "srml-authority-discovery", path = "../../srml/authority-discovery", default-features = false }
authority-discovery = { package = "substrate-authority-discovery", path = "../../core/authority-discovery"}
[dev-dependencies]
keystore = { package = "substrate-keystore", path = "../../core/keystore" }
+17
View File
@@ -103,6 +103,8 @@ macro_rules! new_full_start {
macro_rules! new_full {
($config:expr) => {{
use futures::Future;
use futures::sync::mpsc;
use network::DhtEvent;
let (
is_authority,
@@ -118,10 +120,18 @@ macro_rules! new_full {
let (builder, mut import_setup, inherent_data_providers, mut tasks_to_spawn) = new_full_start!($config);
// Dht event channel from the network to the authority discovery module. Use bounded channel to ensure
// back-pressure. Authority discovery is triggering one event per authority within the current authority set.
// This estimates the authority set size to be somewhere below 10 000 thereby setting the channel buffer size to
// 10 000.
let (dht_event_tx, dht_event_rx) =
mpsc::channel::<DhtEvent>(10000);
let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))?
.with_finality_proof_provider(|client, backend|
Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, client)) as _)
)?
.with_dht_event_tx(dht_event_tx)?
.build()?;
let (block_import, link_half, babe_link) = import_setup.take()
@@ -162,6 +172,13 @@ macro_rules! new_full {
let babe = babe::start_babe(babe_config)?;
let select = babe.select(service.on_exit()).then(|_| Ok(()));
service.spawn_task(Box::new(select));
let authority_discovery = authority_discovery::AuthorityDiscovery::new(
service.client(),
service.network(),
dht_event_rx,
);
service.spawn_task(Box::new(authority_discovery));
}
let config = grandpa::Config {
+26 -12
View File
@@ -47,7 +47,9 @@ use elections::VoteIndex;
use version::NativeVersion;
use primitives::OpaqueMetadata;
use grandpa::{AuthorityId as GrandpaId, AuthorityWeight as GrandpaWeight};
use im_online::sr25519::{AuthorityId as ImOnlineId};
use im_online::sr25519::{AuthorityId as ImOnlineId, AuthoritySignature as ImOnlineSignature};
use authority_discovery_primitives::{AuthorityId as EncodedAuthorityId, Signature as EncodedSignature};
use codec::{Encode, Decode};
use system::offchain::TransactionSubmitter;
#[cfg(any(feature = "std", test))]
@@ -191,7 +193,7 @@ impl authorship::Trait for Runtime {
type EventHandler = Staking;
}
type SessionHandlers = (Grandpa, Babe, ImOnline);
type SessionHandlers = (Grandpa, Babe, ImOnline, AuthorityDiscovery);
impl_opaque_keys! {
pub struct SessionKeys {
@@ -617,20 +619,32 @@ impl_runtime_apis! {
}
}
impl authority_discovery_primitives::AuthorityDiscoveryApi<Block, ImOnlineId> for Runtime {
fn authority_id() -> Option<ImOnlineId> {
AuthorityDiscovery::authority_id()
}
fn authorities() -> Vec<ImOnlineId> {
AuthorityDiscovery::authorities()
impl authority_discovery_primitives::AuthorityDiscoveryApi<Block> for Runtime {
fn authorities() -> Vec<EncodedAuthorityId> {
AuthorityDiscovery::authorities().into_iter()
.map(|id| id.encode())
.map(EncodedAuthorityId)
.collect()
}
fn sign(payload: Vec<u8>, authority_id: ImOnlineId) -> Option<Vec<u8>> {
AuthorityDiscovery::sign(payload, authority_id)
fn sign(payload: &Vec<u8>) -> Option<(EncodedSignature, EncodedAuthorityId)> {
AuthorityDiscovery::sign(payload).map(|(sig, id)| {
(EncodedSignature(sig.encode()), EncodedAuthorityId(id.encode()))
})
}
fn verify(payload: Vec<u8>, signature: Vec<u8>, public_key: ImOnlineId) -> bool {
AuthorityDiscovery::verify(payload, signature, public_key)
fn verify(payload: &Vec<u8>, signature: &EncodedSignature, authority_id: &EncodedAuthorityId) -> bool {
let signature = match ImOnlineSignature::decode(&mut &signature.0[..]) {
Ok(s) => s,
_ => return false,
};
let authority_id = match ImOnlineId::decode(&mut &authority_id.0[..]) {
Ok(id) => id,
_ => return false,
};
AuthorityDiscovery::verify(payload, signature, authority_id)
}
}
+13 -22
View File
@@ -29,13 +29,14 @@
#![cfg_attr(not(feature = "std"), no_std)]
use app_crypto::RuntimeAppPublic;
use codec::{Decode, Encode};
use rstd::prelude::*;
use support::{decl_module, decl_storage, StorageValue};
pub trait Trait: system::Trait + session::Trait + im_online::Trait {}
type AuthorityIdFor<T> = <T as im_online::Trait>::AuthorityId;
type AuthoritySignatureFor<T> =
<<T as im_online::Trait>::AuthorityId as RuntimeAppPublic>::Signature;
decl_storage! {
trait Store for Module<T: Trait> as AuthorityDiscovery {
@@ -58,7 +59,7 @@ impl<T: Trait> Module<T> {
/// set, otherwise this function returns None. The restriction might be
/// softened in the future in case a consumer needs to learn own authority
/// identifier.
pub fn authority_id() -> Option<AuthorityIdFor<T>> {
fn authority_id() -> Option<AuthorityIdFor<T>> {
let authorities = Keys::<T>::get();
let local_keys = <AuthorityIdFor<T>>::all();
@@ -78,20 +79,19 @@ impl<T: Trait> Module<T> {
}
/// Sign the given payload with the private key corresponding to the given authority id.
pub fn sign(payload: Vec<u8>, authority_id: AuthorityIdFor<T>) -> Option<Vec<u8>> {
authority_id.sign(&payload).map(|s| s.encode())
pub fn sign(payload: &Vec<u8>) -> Option<(AuthoritySignatureFor<T>, AuthorityIdFor<T>)> {
let authority_id = Module::<T>::authority_id()?;
authority_id.sign(payload).map(|s| (s, authority_id))
}
/// Verify the given signature for the given payload with the given
/// authority identifier.
pub fn verify(
payload: Vec<u8>,
signature: Vec<u8>,
payload: &Vec<u8>,
signature: AuthoritySignatureFor<T>,
authority_id: AuthorityIdFor<T>,
) -> bool {
<AuthorityIdFor<T> as RuntimeAppPublic>::Signature::decode(&mut &signature[..])
.map(|s| authority_id.verify(&payload, &s))
.unwrap_or(false)
authority_id.verify(payload, &signature)
}
fn initialize_keys(keys: &[AuthorityIdFor<T>]) {
@@ -158,10 +158,7 @@ mod tests {
pub struct TestOnSessionEnding;
impl session::OnSessionEnding<AuthorityId> for TestOnSessionEnding {
fn on_session_ending(
_: SessionIndex,
_: SessionIndex,
) -> Option<Vec<AuthorityId>> {
fn on_session_ending(_: SessionIndex, _: SessionIndex) -> Option<Vec<AuthorityId>> {
None
}
}
@@ -351,19 +348,13 @@ mod tests {
externalities.set_keystore(key_store);
with_externalities(&mut externalities, || {
let authority_id = AuthorityDiscovery::authority_id().expect("authority id");
let payload = String::from("test payload").into_bytes();
let sig =
AuthorityDiscovery::sign(payload.clone(), authority_id.clone()).expect("signature");
let (sig, authority_id) = AuthorityDiscovery::sign(&payload).expect("signature");
assert!(AuthorityDiscovery::verify(
payload,
sig.clone(),
authority_id.clone()
));
assert!(AuthorityDiscovery::verify(&payload, sig.clone(), authority_id.clone(),));
assert!(!AuthorityDiscovery::verify(
String::from("other payload").into_bytes(),
&String::from("other payload").into_bytes(),
sig,
authority_id
))
+1 -1
View File
@@ -67,7 +67,7 @@
// Ensure we're `no_std` when compiling for Wasm.
#![cfg_attr(not(feature = "std"), no_std)]
use app_crypto::{AppPublic, RuntimeAppPublic};
use app_crypto::{AppPublic, RuntimeAppPublic, AppSignature};
use codec::{Encode, Decode};
use primitives::offchain::{OpaqueNetworkState, StorageKind};
use rstd::prelude::*;