core/authority-discovery: Enable authorities to discover each other (#3452)

With the *authority-discovery* module an authoritative node makes itself
discoverable and is able to discover other authorities. Once discovered, a node
can directly connect to other authorities instead of multi-hop gossiping
information.

1. **Making itself discoverable**

    1. Retrieve its external addresses

    2. Adds its network peer id to the addresses

    3. Sign the above

    4. Put the signature and the addresses on the libp2p Kademlia DHT

2. **Discovering other authorities**

    1. Retrieve the current set of authorities

    2. Start DHT queries for the ids of the authorities

    3. Validate the signatures of the retrieved key value pairs

    4. Add the retrieved external addresses as ~reserved~ priority nodes to the
       peerset


* node/runtime: Add authority-discovery as session handler

The srml/authority-discovery module implements the OneSessionHandler in
order to keep its authority set in sync. This commit adds the module to
the set of session handlers.

* core/network: Make network worker return Dht events on poll

Instead of network worker implement the Future trait, have it implement
the Stream interface returning Dht events.

For now these events are ignored in build_network_future but will be
used by the core/authority-discovery module in subsequent commits.

* *: Add scaffolding and integration for core/authority-discovery module

* core/authority-discovery: Implement module logic itself
This commit is contained in:
Max Inden
2019-09-06 17:43:03 +02:00
committed by GitHub
parent ece0b57d8d
commit 027d88796b
19 changed files with 1041 additions and 62 deletions
+41 -2
View File
@@ -28,7 +28,7 @@ use futures::{prelude::*, sync::mpsc};
use futures03::{FutureExt as _, compat::Compat, StreamExt as _, TryStreamExt as _};
use keystore::{Store as Keystore, KeyStorePtr};
use log::{info, warn};
use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo};
use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo, DhtEvent};
use network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization};
use parking_lot::{Mutex, RwLock};
use primitives::{Blake2Hasher, H256, Hasher};
@@ -76,6 +76,7 @@ pub struct ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFpr
transaction_pool: Arc<TExPool>,
rpc_extensions: TRpc,
rpc_builder: TRpcB,
dht_event_tx: Option<mpsc::Sender<DhtEvent>>,
marker: PhantomData<(TBl, TRtApi)>,
}
@@ -197,6 +198,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage {
transaction_pool: Arc::new(()),
rpc_extensions: Default::default(),
rpc_builder,
dht_event_tx: None,
marker: PhantomData,
})
}
@@ -266,6 +268,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage {
transaction_pool: Arc::new(()),
rpc_extensions: Default::default(),
rpc_builder,
dht_event_tx: None,
marker: PhantomData,
})
}
@@ -312,6 +315,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -354,6 +358,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -380,6 +385,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -421,6 +427,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -479,6 +486,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -516,6 +524,7 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: Arc::new(transaction_pool),
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
@@ -542,9 +551,36 @@ impl<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPo
transaction_pool: self.transaction_pool,
rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: self.dht_event_tx,
marker: self.marker,
})
}
/// Adds a dht event sender to builder to be used by the network to send dht events to the authority discovery
/// module.
pub fn with_dht_event_tx(
self,
dht_event_tx: mpsc::Sender<DhtEvent>,
) -> Result<ServiceBuilder<TBl, TRtApi, TCfg, TGen, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
TNetP, TExPool, TRpc, TRpcB, Backend>, Error> {
Ok(ServiceBuilder {
config: self.config,
client: self.client,
backend: self.backend,
keystore: self.keystore,
fetcher: self.fetcher,
select_chain: self.select_chain,
import_queue: self.import_queue,
finality_proof_request_builder: self.finality_proof_request_builder,
finality_proof_provider: self.finality_proof_provider,
network_protocol: self.network_protocol,
transaction_pool: self.transaction_pool,
rpc_extensions: self.rpc_extensions,
rpc_builder: self.rpc_builder,
dht_event_tx: Some(dht_event_tx),
marker: self.marker,
})
}
}
/// RPC handlers builder.
@@ -798,6 +834,7 @@ ServiceBuilder<
network_protocol,
transaction_pool,
rpc_extensions,
dht_event_tx,
rpc_builder,
) = (
self.client,
@@ -811,6 +848,7 @@ ServiceBuilder<
self.network_protocol,
self.transaction_pool,
self.rpc_extensions,
self.dht_event_tx,
self.rpc_builder,
);
@@ -829,7 +867,8 @@ ServiceBuilder<
finality_proof_provider,
network_protocol,
transaction_pool,
rpc_extensions
rpc_extensions,
dht_event_tx,
))
},
|h, c, tx| maintain_transaction_pool(h, c, tx),
+22 -8
View File
@@ -39,7 +39,7 @@ use client::{runtime_api::BlockT, Client};
use exit_future::Signal;
use futures::prelude::*;
use futures03::stream::{StreamExt as _, TryStreamExt as _};
use network::{NetworkService, NetworkState, specialization::NetworkSpecialization};
use network::{NetworkService, NetworkState, specialization::NetworkSpecialization, Event, DhtEvent};
use log::{log, warn, debug, error, Level};
use codec::{Encode, Decode};
use primitives::{Blake2Hasher, H256};
@@ -154,7 +154,8 @@ macro_rules! new_impl {
finality_proof_provider,
network_protocol,
transaction_pool,
rpc_extensions
rpc_extensions,
dht_event_tx,
) = $build_components(&$config)?;
let import_queue = Box::new(import_queue);
let chain_info = client.info().chain;
@@ -357,12 +358,14 @@ macro_rules! new_impl {
let rpc_handlers = gen_handler();
let rpc = start_rpc_servers(&$config, gen_handler)?;
let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(
network_mut,
client.clone(),
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes
has_bootnodes,
dht_event_tx,
)
.map_err(|_| ())
.select(exit.clone())
@@ -653,6 +656,7 @@ fn build_network_future<
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<(NetworkStatus<B>, NetworkState)>>>>,
rpc_rx: futures03::channel::mpsc::UnboundedReceiver<rpc::system::Request<B>>,
should_have_peers: bool,
dht_event_tx: Option<mpsc::Sender<DhtEvent>>,
) -> impl Future<Item = (), Error = ()> {
// Compatibility shim while we're transitionning to stable Futures.
// See https://github.com/paritytech/substrate/issues/3099
@@ -730,11 +734,21 @@ fn build_network_future<
}
// Main network polling.
match network.poll() {
Ok(Async::NotReady) => {}
Err(err) => warn!(target: "service", "Error in network: {:?}", err),
Ok(Async::Ready(())) => warn!(target: "service", "Network service finished"),
}
while let Ok(Async::Ready(Some(Event::Dht(event)))) = network.poll().map_err(|err| {
warn!(target: "service", "Error in network: {:?}", err);
}) {
// Given that core/authority-discovery is the only upper stack consumer of Dht events at the moment, all Dht
// events are being passed on to the authority-discovery module. In the future there might be multiple
// consumers of these events. In that case this would need to be refactored to properly dispatch the events,
// e.g. via a subscriber model.
if let Some(Err(e)) = dht_event_tx.as_ref().map(|c| c.clone().try_send(event)) {
if e.is_full() {
warn!(target: "service", "Dht event channel to authority discovery is full, dropping event.");
} else if e.is_disconnected() {
warn!(target: "service", "Dht event channel to authority discovery is disconnected, dropping event.");
}
}
};
// Now some diagnostic for performances.
let polling_dur = before_polling.elapsed();