Async keystore + Authority-Discovery async/await (#7000)

* Asyncify sign_with

* Asyncify generate/get keys

* Complete BareCryptoStore asyncification

* Cleanup

* Rebase

* Add Proxy

* Inject keystore proxy into extensions

* Implement some methods

* Await on send

* Cleanup

* Send result over the oneshot channel sender

* Process one future at a time

* Fix cargo stuff

* Asyncify sr25519_vrf_sign

* Cherry-pick and fix changes

* Introduce SyncCryptoStore

* SQUASH ME WITH THE first commit

* Implement into SyncCryptoStore

* Implement BareCryptoStore for KeystoreProxyAdapter

* authority-discovery

* AURA

* BABE

* finality-grandpa

* offchain-workers

* benchmarking-cli

* sp_io

* test-utils

* application-crypto

* Extensions and RPC

* Client Service

* bin

* Update cargo.lock

* Implement BareCryptoStore on proxy directly

* Simplify proxy setup

* Fix authority-discover

* Pass async keystore to authority-discovery

* Fix tests

* Use async keystore in authority-discovery

* Rename BareCryptoStore to CryptoStore

* WIP

* Remote mutable borrow in CryptoStore trait

* Implement Keystore with backends

* Remove Proxy implementation

* Fix service builder and keystore user-crates

* Fix tests

* Rework authority-discovery after refactoring

* futures::select!

* Fix multiple mut borrows in authority-discovery

* Merge fixes

* Require sync

* Restore Cargo.lock

* PR feedback - round 1

* Remove Keystore and use LocalKeystore directly

Also renamed KeystoreParams to KeystoreContainer

* Join

* Remove sync requirement

* Fix keystore tests

* Fix tests

* client/authority-discovery: Remove event stream dynamic dispatching

With authority-discovery moving from a poll based future to an `async`
future Rust has difficulties propagating the `Sync` trade through the
generated state machine.

Instead of using dynamic dispatching, use a trait parameter to specify
the DHT event stream.

* Make it compile

* Fix submit_transaction

* Fix block_on issue

* Use await in async context

* Fix manual seal keystore

* Fix authoring_blocks test

* fix aura authoring_blocks

* Try to fix tests for auth-discovery

* client/authority-discovery: Fix lookup_throttling test

* client/authority-discovery: Fix triggers_dht_get_query test

* Fix epoch_authorship_works

* client/authority-discovery: Remove timing assumption in unit test

* client/authority-discovery: Revert changes to termination test

* PR feedback

* Remove deadcode and mark test code

* Fix test_sync

* Use the correct keyring type

* Return when from_service stream is closed

* Convert SyncCryptoStore to a trait

* Fix line width

* Fix line width - take 2

* Remove unused import

* Fix keystore instantiation

* PR feedback

* Remove KeystoreContainer

* Revert "Remove KeystoreContainer"

This reverts commit ea4a37c7d74f9772b93d974e05e4498af6192730.

* Take a ref of keystore

* Move keystore to dev-dependencies

* Address some PR feedback

* Missed one

* Pass keystore reference - take 2

* client/finality-grandpa: Use `Arc<dyn CryptoStore>` instead of SyncXXX

Instead of using `SyncCryptoStorePtr` within `client/finality-grandpa`,
which is a type alias for `Arc<dyn SyncCryptoStore>`, use `Arc<dyn
CryptoStore>`. Benefits are:

1. No additional mental overhead of a `SyncCryptoStorePtr`.

2. Ability for new code to use the asynchronous methods of `CryptoStore`
instead of the synchronous `SyncCryptoStore` methods within
`client/finality-granpa` without the need for larger refactorings.

Note: This commit uses `Arc<dyn CryptoStore>` instead of
`CryptoStorePtr`, as I find the type signature more descriptive. This is
subjective and in no way required.

* Remove SyncCryptoStorePtr

* Remove KeystoreContainer & SyncCryptoStorePtr

* PR feedback

* *: Use CryptoStorePtr whereever possible

* *: Define SyncCryptoStore as a pure extension trait of CryptoStore

* Follow up to SyncCryptoStore extension trait

* Adjust docs for SyncCryptoStore as Ben suggested

* Cleanup unnecessary requirements

* sp-keystore

* Use async_std::task::block_on in keystore

* Fix block_on std requirement

* Update primitives/keystore/src/lib.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Fix wasm build

* Remove unused var

* Fix wasm compilation - take 2

* Revert async-std in keystore

* Fix indent

* Fix version and copyright

* Cleanup feature = "std"

* Auth Discovery: Ignore if from_service is cloed

* Max's suggestion

* Revert async-std usage for block_on

* Address PR feedback

* Fix example offchain worker build

* Address PR feedback

* Update Cargo.lock

* Move unused methods to test helper functions

* Restore accidentally deleted cargo.lock files

* Fix unused imports

Co-authored-by: Max Inden <mail@max-inden.de>
Co-authored-by: Shawn Tabrizi <shawntabrizi@gmail.com>
This commit is contained in:
Rakan Alhneiti
2020-10-08 22:56:35 +02:00
committed by GitHub
parent db8a0cafa9
commit 3aa4bfacfc
70 changed files with 2394 additions and 1762 deletions
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
#![warn(missing_docs)]
#![recursion_limit = "1024"]
//! Substrate authority discovery.
//!
//! This crate enables Substrate authorities to discover and directly connect to
@@ -26,7 +26,6 @@
pub use crate::{service::Service, worker::{NetworkProvider, Worker, Role}};
use std::pin::Pin;
use std::sync::Arc;
use futures::channel::{mpsc, oneshot};
@@ -45,19 +44,20 @@ mod tests;
mod worker;
/// Create a new authority discovery [`Worker`] and [`Service`].
pub fn new_worker_and_service<Client, Network, Block>(
pub fn new_worker_and_service<Client, Network, Block, DhtEventStream>(
client: Arc<Client>,
network: Arc<Network>,
sentry_nodes: Vec<MultiaddrWithPeerId>,
dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>,
dht_event_rx: DhtEventStream,
role: Role,
prometheus_registry: Option<prometheus_endpoint::Registry>,
) -> (Worker<Client, Network, Block>, Service)
) -> (Worker<Client, Network, Block, DhtEventStream>, Service)
where
Block: BlockT + Unpin + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi<Block>>::Api: AuthorityDiscoveryApi<Block, Error = sp_blockchain::Error>,
DhtEventStream: Stream<Item = DhtEvent> + Unpin,
{
let (to_worker, from_service) = mpsc::channel(0);
@@ -19,28 +19,29 @@
use crate::{new_worker_and_service, worker::{tests::{TestApi, TestNetwork}, Role}};
use std::sync::Arc;
use futures::prelude::*;
use futures::channel::mpsc::channel;
use futures::executor::LocalPool;
use futures::task::LocalSpawn;
use futures::{channel::mpsc::channel, executor::LocalPool, task::LocalSpawn};
use libp2p::core::{multiaddr::{Multiaddr, Protocol}, PeerId};
use sp_authority_discovery::AuthorityId;
use sp_core::crypto::key_types;
use sp_core::testing::KeyStore;
use sp_keystore::{CryptoStore, testing::KeyStore};
#[test]
fn get_addresses_and_authority_id() {
let (_dht_event_tx, dht_event_rx) = channel(0);
let network: Arc<TestNetwork> = Arc::new(Default::default());
let mut pool = LocalPool::new();
let key_store = KeyStore::new();
let remote_authority_id: AuthorityId = key_store
.write()
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
.unwrap()
.into();
let remote_authority_id: AuthorityId = pool.run_until(async {
key_store
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
.await
.unwrap()
.into()
});
let remote_peer_id = PeerId::random();
let remote_addr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333".parse::<Multiaddr>()
@@ -55,15 +56,13 @@ fn get_addresses_and_authority_id() {
test_api,
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
None,
);
worker.inject_addresses(remote_authority_id.clone(), vec![remote_addr.clone()]);
let mut pool = LocalPool::new();
pool.spawner().spawn_local_obj(Box::pin(worker).into()).unwrap();
pool.spawner().spawn_local_obj(Box::pin(worker.run()).into()).unwrap();
pool.run_until(async {
assert_eq!(
+144 -173
View File
@@ -19,13 +19,11 @@ use crate::{error::{Error, Result}, ServicetoWorkerMsg};
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::channel::mpsc;
use futures::task::{Context, Poll};
use futures::{Future, FutureExt, ready, Stream, StreamExt, stream::Fuse};
use futures::{FutureExt, Stream, StreamExt, stream::Fuse};
use futures_timer::Delay;
use addr_cache::AddrCache;
@@ -47,7 +45,7 @@ use sc_network::{
};
use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair};
use sp_core::crypto::{key_types, Pair};
use sp_core::traits::BareCryptoStorePtr;
use sp_keystore::CryptoStore;
use sp_runtime::{traits::Block as BlockT, generic::BlockId};
use sp_api::ProvideRuntimeApi;
@@ -77,7 +75,7 @@ const MAX_IN_FLIGHT_LOOKUPS: usize = 8;
/// Role an authority discovery module can run as.
pub enum Role {
/// Actual authority as well as a reference to its key store.
Authority(BareCryptoStorePtr),
Authority(Arc<dyn CryptoStore>),
/// Sentry node that guards an authority.
///
/// No reference to its key store needed, as sentry nodes don't have an identity to sign
@@ -115,7 +113,7 @@ pub enum Role {
/// When run as a sentry node, the [`Worker`] does not publish
/// any addresses to the DHT but still discovers validators and sentry nodes of
/// validators, i.e. only step 2 (Discovers other authorities) is executed.
pub struct Worker<Client, Network, Block>
pub struct Worker<Client, Network, Block, DhtEventStream>
where
Block: BlockT + 'static,
Network: NetworkProvider,
@@ -137,7 +135,7 @@ where
// - Some(vec![a, b, c, ...]): Valid addresses were specified.
sentry_nodes: Option<Vec<Multiaddr>>,
/// Channel we receive Dht events on.
dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>,
dht_event_rx: DhtEventStream,
/// Interval to be proactive, publishing own addresses.
publish_interval: Interval,
@@ -161,14 +159,14 @@ where
phantom: PhantomData<Block>,
}
impl<Client, Network, Block> Worker<Client, Network, Block>
impl<Client, Network, Block, DhtEventStream> Worker<Client, Network, Block, DhtEventStream>
where
Block: BlockT + Unpin + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi<Block>>::Api:
AuthorityDiscoveryApi<Block, Error = sp_blockchain::Error>,
Self: Future<Output = ()>,
DhtEventStream: Stream<Item = DhtEvent> + Unpin,
{
/// Return a new [`Worker`].
///
@@ -179,7 +177,7 @@ where
client: Arc<Client>,
network: Arc<Network>,
sentry_nodes: Vec<MultiaddrWithPeerId>,
dht_event_rx: Pin<Box<dyn Stream<Item = DhtEvent> + Send>>,
dht_event_rx: DhtEventStream,
role: Role,
prometheus_registry: Option<prometheus_endpoint::Registry>,
) -> Self {
@@ -247,6 +245,72 @@ where
}
}
/// Start the worker
pub async fn run(mut self) {
loop {
self.start_new_lookups();
futures::select! {
// Process incoming events.
event = self.dht_event_rx.next().fuse() => {
if let Some(event) = event {
self.handle_dht_event(event).await;
} else {
// This point is reached if the network has shut down, at which point there is not
// much else to do than to shut down the authority discovery as well.
return;
}
},
// Handle messages from [`Service`]. Ignore if sender side is closed.
msg = self.from_service.select_next_some() => {
self.process_message_from_service(msg);
},
// Set peerset priority group to a new random set of addresses.
_ = self.priority_group_set_interval.next().fuse() => {
if let Err(e) = self.set_priority_group() {
error!(
target: LOG_TARGET,
"Failed to set priority group: {:?}", e,
);
}
},
// Publish own addresses.
_ = self.publish_interval.next().fuse() => {
if let Err(e) = self.publish_ext_addresses().await {
error!(
target: LOG_TARGET,
"Failed to publish external addresses: {:?}", e,
);
}
},
// Request addresses of authorities.
_ = self.query_interval.next().fuse() => {
if let Err(e) = self.refill_pending_lookups_queue().await {
error!(
target: LOG_TARGET,
"Failed to request addresses of authorities: {:?}", e,
);
}
},
}
}
}
fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
match msg {
ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
let _ = sender.send(
self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
);
}
ServicetoWorkerMsg::GetAuthorityIdByPeerId(peer_id, sender) => {
let _ = sender.send(
self.addr_cache.get_authority_id_by_peer_id(&peer_id).map(Clone::clone),
);
}
}
}
fn addresses_to_publish(&self) -> impl ExactSizeIterator<Item = Multiaddr> {
match &self.sentry_nodes {
Some(addrs) => Either::Left(addrs.clone().into_iter()),
@@ -268,7 +332,7 @@ where
}
/// Publish either our own or if specified the public addresses of our sentry nodes.
fn publish_ext_addresses(&mut self) -> Result<()> {
async fn publish_ext_addresses(&mut self) -> Result<()> {
let key_store = match &self.role {
Role::Authority(key_store) => key_store,
// Only authority nodes can put addresses (their own or the ones of their sentry nodes)
@@ -291,18 +355,16 @@ where
.encode(&mut serialized_addresses)
.map_err(Error::EncodingProto)?;
let keys = Worker::get_own_public_keys_within_authority_set(
&key_store,
&self.client,
)?.into_iter().map(Into::into).collect::<Vec<_>>();
let keys = Worker::<Client, Network, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
key_store.clone(),
self.client.as_ref(),
).await?.into_iter().map(Into::into).collect::<Vec<_>>();
let signatures = key_store.read()
.sign_with_all(
key_types::AUTHORITY_DISCOVERY,
keys.clone(),
serialized_addresses.as_slice(),
)
.map_err(|_| Error::Signing)?;
let signatures = key_store.sign_with_all(
key_types::AUTHORITY_DISCOVERY,
keys.clone(),
serialized_addresses.as_slice(),
).await.map_err(|_| Error::Signing)?;
for (sign_result, key) in signatures.into_iter().zip(keys) {
let mut signed_addresses = vec![];
@@ -327,15 +389,14 @@ where
Ok(())
}
fn refill_pending_lookups_queue(&mut self) -> Result<()> {
async fn refill_pending_lookups_queue(&mut self) -> Result<()> {
let id = BlockId::hash(self.client.info().best_hash);
let local_keys = match &self.role {
Role::Authority(key_store) => {
key_store.read()
.sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
.into_iter()
.collect::<HashSet<_>>()
key_store.sr25519_public_keys(
key_types::AUTHORITY_DISCOVERY
).await.into_iter().collect::<HashSet<_>>()
},
Role::Sentry => HashSet::new(),
};
@@ -387,78 +448,68 @@ where
}
/// Handle incoming Dht events.
///
/// Returns either:
/// - Poll::Pending when there are no more events to handle or
/// - Poll::Ready(()) when the dht event stream terminated.
fn handle_dht_events(&mut self, cx: &mut Context) -> Poll<()>{
loop {
match ready!(self.dht_event_rx.poll_next_unpin(cx)) {
Some(DhtEvent::ValueFound(v)) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_found"]).inc();
}
if log_enabled!(log::Level::Debug) {
let hashes = v.iter().map(|(hash, _value)| hash.clone());
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' found on Dht.", hashes,
);
}
if let Err(e) = self.handle_dht_value_found_event(v) {
if let Some(metrics) = &self.metrics {
metrics.handle_value_found_event_failure.inc();
}
debug!(
target: LOG_TARGET,
"Failed to handle Dht value found event: {:?}", e,
);
}
async fn handle_dht_event(&mut self, event: DhtEvent) {
match event {
DhtEvent::ValueFound(v) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_found"]).inc();
}
Some(DhtEvent::ValueNotFound(hash)) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
}
if self.in_flight_lookups.remove(&hash).is_some() {
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' not found on Dht.", hash
)
} else {
debug!(
target: LOG_TARGET,
"Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
)
}
},
Some(DhtEvent::ValuePut(hash)) => {
if log_enabled!(log::Level::Debug) {
let hashes = v.iter().map(|(hash, _value)| hash.clone());
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' found on Dht.", hashes,
);
}
if let Err(e) = self.handle_dht_value_found_event(v) {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
metrics.handle_value_found_event_failure.inc();
}
debug!(
target: LOG_TARGET,
"Successfully put hash '{:?}' on Dht.", hash,
)
},
Some(DhtEvent::ValuePutFailed(hash)) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
}
"Failed to handle Dht value found event: {:?}", e,
);
}
}
DhtEvent::ValueNotFound(hash) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_not_found"]).inc();
}
if self.in_flight_lookups.remove(&hash).is_some() {
debug!(
target: LOG_TARGET,
"Failed to put hash '{:?}' on Dht.", hash
"Value for hash '{:?}' not found on Dht.", hash
)
},
None => {
debug!(target: LOG_TARGET, "Dht event stream terminated.");
return Poll::Ready(());
},
} else {
debug!(
target: LOG_TARGET,
"Received 'ValueNotFound' for unexpected hash '{:?}'.", hash
)
}
},
DhtEvent::ValuePut(hash) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
}
debug!(
target: LOG_TARGET,
"Successfully put hash '{:?}' on Dht.", hash,
)
},
DhtEvent::ValuePutFailed(hash) => {
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
}
debug!(
target: LOG_TARGET,
"Failed to put hash '{:?}' on Dht.", hash
)
}
}
}
@@ -541,7 +592,6 @@ where
);
}
}
Ok(())
}
@@ -551,12 +601,13 @@ where
// one for the upcoming session. In addition it could be participating in the current and (/ or)
// next authority set with two keys. The function does not return all of the local authority
// discovery public keys, but only the ones intersecting with the current or next authority set.
fn get_own_public_keys_within_authority_set(
key_store: &BareCryptoStorePtr,
async fn get_own_public_keys_within_authority_set(
key_store: Arc<dyn CryptoStore>,
client: &Client,
) -> Result<HashSet<AuthorityId>> {
let local_pub_keys = key_store.read()
let local_pub_keys = key_store
.sr25519_public_keys(key_types::AUTHORITY_DISCOVERY)
.await
.into_iter()
.collect::<HashSet<_>>();
@@ -609,86 +660,6 @@ where
}
}
impl<Client, Network, Block> Future for Worker<Client, Network, Block>
where
Block: BlockT + Unpin + 'static,
Network: NetworkProvider,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static + HeaderBackend<Block>,
<Client as ProvideRuntimeApi<Block>>::Api:
AuthorityDiscoveryApi<Block, Error = sp_blockchain::Error>,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// Process incoming events.
if let Poll::Ready(()) = self.handle_dht_events(cx) {
// `handle_dht_events` returns `Poll::Ready(())` when the Dht event stream terminated.
// Termination of the Dht event stream implies that the underlying network terminated,
// thus authority discovery should terminate as well.
return Poll::Ready(());
}
// Publish own addresses.
if let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {
// Register waker of underlying task for next interval.
while let Poll::Ready(_) = self.publish_interval.poll_next_unpin(cx) {}
if let Err(e) = self.publish_ext_addresses() {
error!(
target: LOG_TARGET,
"Failed to publish external addresses: {:?}", e,
);
}
}
// Request addresses of authorities, refilling the pending lookups queue.
if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {
// Register waker of underlying task for next interval.
while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {}
if let Err(e) = self.refill_pending_lookups_queue() {
error!(
target: LOG_TARGET,
"Failed to refill pending lookups queue: {:?}", e,
);
}
}
// Set peerset priority group to a new random set of addresses.
if let Poll::Ready(_) = self.priority_group_set_interval.poll_next_unpin(cx) {
// Register waker of underlying task for next interval.
while let Poll::Ready(_) = self.priority_group_set_interval.poll_next_unpin(cx) {}
if let Err(e) = self.set_priority_group() {
error!(
target: LOG_TARGET,
"Failed to set priority group: {:?}", e,
);
}
}
// Handle messages from [`Service`].
while let Poll::Ready(Some(msg)) = self.from_service.poll_next_unpin(cx) {
match msg {
ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
let _ = sender.send(
self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
);
}
ServicetoWorkerMsg::GetAuthorityIdByPeerId(peer_id, sender) => {
let _ = sender.send(
self.addr_cache.get_authority_id_by_peer_id(&peer_id).map(Clone::clone),
);
}
}
}
self.start_new_lookups();
Poll::Pending
}
}
/// NetworkProvider provides [`Worker`] with all necessary hooks into the
/// underlying Substrate networking. Using this trait abstraction instead of [`NetworkService`]
/// directly is necessary to unit test [`Worker`].
@@ -824,7 +795,7 @@ impl Metrics {
// Helper functions for unit testing.
#[cfg(test)]
impl<Client, Network, Block> Worker<Client, Network, Block>
impl<Block, Client, Network, DhtEventStream> Worker<Client, Network, Block, DhtEventStream>
where
Block: BlockT + 'static,
Network: NetworkProvider,
@@ -18,18 +18,19 @@
use crate::worker::schema;
use std::{iter::FromIterator, sync::{Arc, Mutex}};
use std::{iter::FromIterator, sync::{Arc, Mutex}, task::Poll};
use futures::channel::mpsc::channel;
use futures::channel::mpsc::{self, channel};
use futures::executor::{block_on, LocalPool};
use futures::future::{poll_fn, FutureExt};
use futures::future::FutureExt;
use futures::sink::SinkExt;
use futures::task::LocalSpawn;
use futures::poll;
use libp2p::{kad, core::multiaddr, PeerId};
use prometheus_endpoint::prometheus::default_registry;
use sp_api::{ProvideRuntimeApi, ApiRef};
use sp_core::{crypto::Public, testing::KeyStore};
use sp_core::crypto::Public;
use sp_keystore::{testing::KeyStore, CryptoStore};
use sp_runtime::traits::{Zero, Block as BlockT, NumberFor};
use substrate_test_runtime_client::runtime::Block;
@@ -166,6 +167,16 @@ sp_api::mock_impl_runtime_apis! {
}
}
#[derive(Debug)]
pub enum TestNetworkEvent {
GetCalled(kad::record::Key),
PutCalled(kad::record::Key, Vec<u8>),
SetPriorityGroupCalled {
group_id: String,
peers: HashSet<Multiaddr>
},
}
pub struct TestNetwork {
peer_id: PeerId,
external_addresses: Vec<Multiaddr>,
@@ -174,10 +185,19 @@ pub struct TestNetwork {
pub put_value_call: Arc<Mutex<Vec<(kad::record::Key, Vec<u8>)>>>,
pub get_value_call: Arc<Mutex<Vec<kad::record::Key>>>,
pub set_priority_group_call: Arc<Mutex<Vec<(String, HashSet<Multiaddr>)>>>,
event_sender: mpsc::UnboundedSender<TestNetworkEvent>,
event_receiver: Option<mpsc::UnboundedReceiver<TestNetworkEvent>>,
}
impl TestNetwork {
fn get_event_receiver(&mut self) -> Option<mpsc::UnboundedReceiver<TestNetworkEvent>> {
self.event_receiver.take()
}
}
impl Default for TestNetwork {
fn default() -> Self {
let (tx, rx) = mpsc::unbounded();
TestNetwork {
peer_id: PeerId::random(),
external_addresses: vec![
@@ -187,6 +207,8 @@ impl Default for TestNetwork {
put_value_call: Default::default(),
get_value_call: Default::default(),
set_priority_group_call: Default::default(),
event_sender: tx,
event_receiver: Some(rx),
}
}
}
@@ -200,14 +222,20 @@ impl NetworkProvider for TestNetwork {
self.set_priority_group_call
.lock()
.unwrap()
.push((group_id, peers));
.push((group_id.clone(), peers.clone()));
self.event_sender.clone().unbounded_send(TestNetworkEvent::SetPriorityGroupCalled {
group_id,
peers,
}).unwrap();
Ok(())
}
fn put_value(&self, key: kad::record::Key, value: Vec<u8>) {
self.put_value_call.lock().unwrap().push((key, value));
self.put_value_call.lock().unwrap().push((key.clone(), value.clone()));
self.event_sender.clone().unbounded_send(TestNetworkEvent::PutCalled(key, value)).unwrap();
}
fn get_value(&self, key: &kad::record::Key) {
self.get_value_call.lock().unwrap().push(key.clone());
self.event_sender.clone().unbounded_send(TestNetworkEvent::GetCalled(key.clone())).unwrap();
}
}
@@ -221,10 +249,10 @@ impl NetworkStateInfo for TestNetwork {
}
}
fn build_dht_event(
async fn build_dht_event(
addresses: Vec<Multiaddr>,
public_key: AuthorityId,
key_store: &BareCryptoStorePtr,
key_store: &KeyStore,
) -> (libp2p::kad::record::Key, Vec<u8>) {
let mut serialized_addresses = vec![];
schema::AuthorityAddresses {
@@ -233,12 +261,13 @@ fn build_dht_event(
.map_err(Error::EncodingProto)
.unwrap();
let signature = key_store.read()
let signature = key_store
.sign_with(
key_types::AUTHORITY_DISCOVERY,
&public_key.clone().into(),
serialized_addresses.as_slice(),
)
.await
.map_err(|_| Error::Signing)
.unwrap();
@@ -258,7 +287,7 @@ fn build_dht_event(
#[test]
fn new_registers_metrics() {
let (_dht_event_tx, dht_event_rx) = channel(1000);
let (_dht_event_tx, dht_event_rx) = mpsc::channel(1000);
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
let test_api = Arc::new(TestApi {
@@ -273,8 +302,8 @@ fn new_registers_metrics() {
test_api,
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
Some(registry.clone()),
);
@@ -289,12 +318,11 @@ fn triggers_dht_get_query() {
// Generate authority keys
let authority_1_key_pair = AuthorityPair::from_seed_slice(&[1; 32]).unwrap();
let authority_2_key_pair = AuthorityPair::from_seed_slice(&[2; 32]).unwrap();
let authorities = vec![authority_1_key_pair.public(), authority_2_key_pair.public()];
let test_api = Arc::new(TestApi {
authorities: vec![authority_1_key_pair.public(), authority_2_key_pair.public()],
});
let test_api = Arc::new(TestApi { authorities: authorities.clone() });
let network: Arc<TestNetwork> = Arc::new(Default::default());
let network = Arc::new(TestNetwork::default());
let key_store = KeyStore::new();
let (_to_worker, from_service) = mpsc::channel(0);
@@ -303,26 +331,24 @@ fn triggers_dht_get_query() {
test_api,
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
None,
);
worker.refill_pending_lookups_queue().unwrap();
futures::executor::block_on(futures::future::poll_fn(|cx| {
assert_eq!(Poll::Pending, worker.poll_unpin(cx));
Poll::Ready(())
}));
// Expect authority discovery to request new records from the dht.
assert_eq!(network.get_value_call.lock().unwrap().len(), 2);
futures::executor::block_on(async {
worker.refill_pending_lookups_queue().await.unwrap();
worker.start_new_lookups();
assert_eq!(network.get_value_call.lock().unwrap().len(), authorities.len());
})
}
#[test]
fn publish_discover_cycle() {
sp_tracing::try_init_simple();
let mut pool = LocalPool::new();
// Node A publishing its address.
let (_dht_event_tx, dht_event_rx) = channel(1000);
@@ -338,66 +364,66 @@ fn publish_discover_cycle() {
};
let key_store = KeyStore::new();
let node_a_public = key_store
.write()
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
.unwrap();
let test_api = Arc::new(TestApi {
authorities: vec![node_a_public.into()],
});
let (_to_worker, from_service) = mpsc::channel(0);
let mut worker = Worker::new(
from_service,
test_api,
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
None,
);
let _ = pool.spawner().spawn_local_obj(async move {
let node_a_public = key_store
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
.await
.unwrap();
let test_api = Arc::new(TestApi {
authorities: vec![node_a_public.into()],
});
worker.publish_ext_addresses().unwrap();
let (_to_worker, from_service) = mpsc::channel(0);
let mut worker = Worker::new(
from_service,
test_api,
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
None,
);
// Expect authority discovery to put a new record onto the dht.
assert_eq!(network.put_value_call.lock().unwrap().len(), 1);
worker.publish_ext_addresses().await.unwrap();
let dht_event = {
let (key, value) = network.put_value_call.lock().unwrap().pop().unwrap();
sc_network::DhtEvent::ValueFound(vec![(key, value)])
};
// Expect authority discovery to put a new record onto the dht.
assert_eq!(network.put_value_call.lock().unwrap().len(), 1);
// Node B discovering node A's address.
let dht_event = {
let (key, value) = network.put_value_call.lock().unwrap().pop().unwrap();
sc_network::DhtEvent::ValueFound(vec![(key, value)])
};
let (mut dht_event_tx, dht_event_rx) = channel(1000);
let test_api = Arc::new(TestApi {
// Make sure node B identifies node A as an authority.
authorities: vec![node_a_public.into()],
});
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
// Node B discovering node A's address.
let (_to_worker, from_service) = mpsc::channel(0);
let mut worker = Worker::new(
from_service,
test_api,
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
None,
);
let (mut dht_event_tx, dht_event_rx) = channel(1000);
let test_api = Arc::new(TestApi {
// Make sure node B identifies node A as an authority.
authorities: vec![node_a_public.into()],
});
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
dht_event_tx.try_send(dht_event).unwrap();
let (_to_worker, from_service) = mpsc::channel(0);
let mut worker = Worker::new(
from_service,
test_api,
network.clone(),
vec![],
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
None,
);
let f = |cx: &mut Context<'_>| -> Poll<()> {
worker.refill_pending_lookups_queue().unwrap();
dht_event_tx.try_send(dht_event.clone()).unwrap();
worker.refill_pending_lookups_queue().await.unwrap();
worker.start_new_lookups();
// Make authority discovery handle the event.
if let Poll::Ready(e) = worker.handle_dht_events(cx) {
panic!("Unexpected error: {:?}", e);
}
worker.handle_dht_event(dht_event).await;
worker.set_priority_group().unwrap();
// Expect authority discovery to set the priority set.
@@ -410,13 +436,12 @@ fn publish_discover_cycle() {
HashSet::from_iter(vec![node_a_multiaddr.clone()].into_iter())
)
);
}.boxed_local().into());
Poll::Ready(())
};
let _ = block_on(poll_fn(f));
pool.run();
}
/// Don't terminate when sender side of service channel is dropped. Terminate when network event
/// stream terminates.
#[test]
fn terminate_when_event_stream_terminates() {
let (dht_event_tx, dht_event_rx) = channel(1000);
@@ -426,91 +451,76 @@ fn terminate_when_event_stream_terminates() {
authorities: vec![],
});
let (_to_worker, from_service) = mpsc::channel(0);
let mut worker = Worker::new(
let (to_worker, from_service) = mpsc::channel(0);
let worker = Worker::new(
from_service,
test_api,
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
Box::pin(dht_event_rx),
Role::Authority(key_store.into()),
None,
);
).run();
futures::pin_mut!(worker);
block_on(async {
assert_eq!(Poll::Pending, poll!(&mut worker));
assert_eq!(Poll::Pending, futures::poll!(&mut worker));
// Simulate termination of the network through dropping the sender side of the dht event
// channel.
// Drop sender side of service channel.
drop(to_worker);
assert_eq!(
Poll::Pending, futures::poll!(&mut worker),
"Expect the authority discovery module not to terminate once the \
sender side of the service channel is closed.",
);
// Simulate termination of the network through dropping the sender side
// of the dht event channel.
drop(dht_event_tx);
assert_eq!(
Poll::Ready(()), poll!(&mut worker),
"Expect the authority discovery module to terminate once the sending side of the dht \
event channel is terminated.",
Poll::Ready(()), futures::poll!(&mut worker),
"Expect the authority discovery module to terminate once the \
sending side of the dht event channel is closed.",
);
});
}
});}
#[test]
fn continue_operating_when_service_channel_is_dropped() {
let (_dht_event_tx, dht_event_rx) = channel(0);
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
let test_api = Arc::new(TestApi {
authorities: vec![],
});
fn dont_stop_polling_dht_event_stream_after_bogus_event() {
let remote_multiaddr = {
let peer_id = PeerId::random();
let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
let (to_worker, from_service) = mpsc::channel(0);
let mut worker = Worker::new(
from_service,
test_api,
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
None,
);
address.with(multiaddr::Protocol::P2p(
peer_id.into(),
))
};
let remote_key_store = KeyStore::new();
let remote_public_key: AuthorityId = block_on(
remote_key_store.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None),
).unwrap().into();
block_on(async {
assert_eq!(Poll::Pending, poll!(&mut worker));
drop(to_worker);
for _ in 0..100 {
assert_eq!(
Poll::Pending, poll!(&mut worker),
"Expect authority discovery `Worker` not to panic when service channel is dropped.",
);
}
});
}
#[test]
fn dont_stop_polling_when_error_is_returned() {
#[derive(PartialEq, Debug)]
enum Event {
Processed,
End,
let (mut dht_event_tx, dht_event_rx) = channel(1);
let (network, mut network_events) = {
let mut n = TestNetwork::default();
let r = n.get_event_receiver().unwrap();
(Arc::new(n), r)
};
let (mut dht_event_tx, dht_event_rx) = channel(1000);
let (mut discovery_update_tx, mut discovery_update_rx) = channel(1000);
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
let test_api = Arc::new(TestApi {
authorities: vec![],
authorities: vec![remote_public_key.clone()],
});
let mut pool = LocalPool::new();
let (_to_worker, from_service) = mpsc::channel(0);
let (mut to_worker, from_service) = mpsc::channel(1);
let mut worker = Worker::new(
from_service,
test_api,
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(key_store),
Box::pin(dht_event_rx),
Role::Authority(Arc::new(key_store)),
None,
);
@@ -518,45 +528,43 @@ fn dont_stop_polling_when_error_is_returned() {
//
// As this is a local pool, only one future at a time will have the CPU and
// can make progress until the future returns `Pending`.
pool.spawner().spawn_local_obj(
futures::future::poll_fn(move |ctx| {
match std::pin::Pin::new(&mut worker).poll(ctx) {
Poll::Ready(()) => {},
Poll::Pending => {
discovery_update_tx.send(Event::Processed).now_or_never();
return Poll::Pending;
},
}
let _ = discovery_update_tx.send(Event::End).now_or_never().unwrap();
Poll::Ready(())
}).boxed_local().into(),
).expect("Spawns authority discovery");
let _ = pool.spawner().spawn_local_obj(async move {
// Refilling `pending_lookups` only happens every X minutes. Fast
// forward by calling `refill_pending_lookups_queue` directly.
worker.refill_pending_lookups_queue().await.unwrap();
worker.run().await
}.boxed_local().into());
pool.run_until(
// The future that drives the event stream
async {
// Send an event that should generate an error
let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never();
// Send the same event again to make sure that the event stream needs to be polled twice
// to be woken up again.
let _ = dht_event_tx.send(DhtEvent::ValueFound(Default::default())).now_or_never();
pool.run_until(async {
// Assert worker to trigger a lookup for the one and only authority.
assert!(matches!(
network_events.next().await,
Some(TestNetworkEvent::GetCalled(_))
));
// Now we call `await` and give the control to the authority discovery future.
assert_eq!(Some(Event::Processed), discovery_update_rx.next().await);
// Send an event that should generate an error
dht_event_tx.send(DhtEvent::ValueFound(Default::default())).await
.expect("Channel has capacity of 1.");
// Drop the event rx to stop the authority discovery. If it was polled correctly, it
// should end properly.
drop(dht_event_tx);
// Make previously triggered lookup succeed.
let dht_event = {
let (key, value) = build_dht_event(
vec![remote_multiaddr.clone()],
remote_public_key.clone(), &remote_key_store,
).await;
sc_network::DhtEvent::ValueFound(vec![(key, value)])
};
dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1.");
assert!(
discovery_update_rx.collect::<Vec<Event>>()
.await
.into_iter()
.any(|evt| evt == Event::End),
"The authority discovery should have ended",
);
}
);
// Expect authority discovery to function normally, now knowing the
// address for the remote node.
let (sender, addresses) = futures::channel::oneshot::channel();
to_worker.send(ServicetoWorkerMsg::GetAddressesByAuthorityId(
remote_public_key,
sender,
)).await.expect("Channel has capacity of 1.");
assert_eq!(Some(vec![remote_multiaddr]), addresses.await.unwrap());
});
}
/// In the scenario of a validator publishing the address of its sentry node to
@@ -565,9 +573,8 @@ fn dont_stop_polling_when_error_is_returned() {
#[test]
fn never_add_own_address_to_priority_group() {
let validator_key_store = KeyStore::new();
let validator_public = validator_key_store
.write()
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
let validator_public = block_on(validator_key_store
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None))
.unwrap();
let sentry_network: Arc<TestNetwork> = Arc::new(Default::default());
@@ -589,11 +596,11 @@ fn never_add_own_address_to_priority_group() {
))
};
let dht_event = build_dht_event(
let dht_event = block_on(build_dht_event(
vec![sentry_multiaddr, random_multiaddr.clone()],
validator_public.into(),
&validator_key_store,
);
));
let (_dht_event_tx, dht_event_rx) = channel(1);
let sentry_test_api = Arc::new(TestApi {
@@ -607,12 +614,12 @@ fn never_add_own_address_to_priority_group() {
sentry_test_api,
sentry_network.clone(),
vec![],
dht_event_rx.boxed(),
Box::pin(dht_event_rx),
Role::Sentry,
None,
);
sentry_worker.refill_pending_lookups_queue().unwrap();
block_on(sentry_worker.refill_pending_lookups_queue()).unwrap();
sentry_worker.start_new_lookups();
sentry_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
@@ -636,9 +643,8 @@ fn never_add_own_address_to_priority_group() {
#[test]
fn limit_number_of_addresses_added_to_cache_per_authority() {
let remote_key_store = KeyStore::new();
let remote_public = remote_key_store
.write()
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
let remote_public = block_on(remote_key_store
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None))
.unwrap();
let addresses = (0..100).map(|_| {
@@ -649,11 +655,11 @@ fn limit_number_of_addresses_added_to_cache_per_authority() {
))
}).collect();
let dht_event = build_dht_event(
let dht_event = block_on(build_dht_event(
addresses,
remote_public.into(),
&remote_key_store,
);
));
let (_dht_event_tx, dht_event_rx) = channel(1);
@@ -663,12 +669,12 @@ fn limit_number_of_addresses_added_to_cache_per_authority() {
Arc::new(TestApi { authorities: vec![remote_public.into()] }),
Arc::new(TestNetwork::default()),
vec![],
dht_event_rx.boxed(),
Box::pin(dht_event_rx),
Role::Sentry,
None,
);
worker.refill_pending_lookups_queue().unwrap();
block_on(worker.refill_pending_lookups_queue()).unwrap();
worker.start_new_lookups();
worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
@@ -681,9 +687,8 @@ fn limit_number_of_addresses_added_to_cache_per_authority() {
#[test]
fn do_not_cache_addresses_without_peer_id() {
let remote_key_store = KeyStore::new();
let remote_public = remote_key_store
.write()
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
let remote_public = block_on(remote_key_store
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None))
.unwrap();
let multiaddr_with_peer_id = {
@@ -695,14 +700,14 @@ fn do_not_cache_addresses_without_peer_id() {
let multiaddr_without_peer_id: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
let dht_event = build_dht_event(
let dht_event = block_on(build_dht_event(
vec![
multiaddr_with_peer_id.clone(),
multiaddr_without_peer_id,
],
remote_public.into(),
&remote_key_store,
);
));
let (_dht_event_tx, dht_event_rx) = channel(1);
let local_test_api = Arc::new(TestApi {
@@ -718,12 +723,12 @@ fn do_not_cache_addresses_without_peer_id() {
local_test_api,
local_network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(local_key_store),
Box::pin(dht_event_rx),
Role::Authority(Arc::new(local_key_store)),
None,
);
local_worker.refill_pending_lookups_queue().unwrap();
block_on(local_worker.refill_pending_lookups_queue()).unwrap();
local_worker.start_new_lookups();
local_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
@@ -753,8 +758,8 @@ fn addresses_to_publish_adds_p2p() {
}),
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(KeyStore::new()),
Box::pin(dht_event_rx),
Role::Authority(Arc::new(KeyStore::new())),
Some(prometheus_endpoint::Registry::new()),
);
@@ -788,8 +793,8 @@ fn addresses_to_publish_respects_existing_p2p_protocol() {
}),
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Authority(KeyStore::new()),
Box::pin(dht_event_rx),
Role::Authority(Arc::new(KeyStore::new())),
Some(prometheus_endpoint::Registry::new()),
);
@@ -811,10 +816,9 @@ fn lookup_throttling() {
};
let remote_key_store = KeyStore::new();
let remote_public_keys: Vec<AuthorityId> = (0..20).map(|_| {
remote_key_store
.write()
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
.unwrap().into()
block_on(remote_key_store
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None))
.unwrap().into()
}).collect();
let remote_hash_to_key = remote_public_keys.iter()
.map(|k| (hash_authority_id(k.as_ref()), k.clone()))
@@ -823,7 +827,9 @@ fn lookup_throttling() {
let (mut dht_event_tx, dht_event_rx) = channel(1);
let (_to_worker, from_service) = mpsc::channel(0);
let network = Arc::new(TestNetwork::default());
let mut network = TestNetwork::default();
let mut receiver = network.get_event_receiver().unwrap();
let network = Arc::new(network);
let mut worker = Worker::new(
from_service,
Arc::new(TestApi { authorities: remote_public_keys.clone() }),
@@ -831,50 +837,62 @@ fn lookup_throttling() {
vec![],
dht_event_rx.boxed(),
Role::Sentry,
None,
Some(default_registry().clone()),
);
futures::executor::block_on(futures::future::poll_fn(|cx| {
worker.refill_pending_lookups_queue().unwrap();
let mut pool = LocalPool::new();
let metrics = worker.metrics.clone().unwrap();
let _ = pool.spawner().spawn_local_obj(async move {
// Refilling `pending_lookups` only happens every X minutes. Fast
// forward by calling `refill_pending_lookups_queue` directly.
worker.refill_pending_lookups_queue().await.unwrap();
worker.run().await
}.boxed_local().into());
pool.run_until(async {
// Assert worker to trigger MAX_IN_FLIGHT_LOOKUPS lookups.
assert_eq!(Poll::Pending, worker.poll_unpin(cx));
assert_eq!(worker.pending_lookups.len(), remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS);
assert_eq!(worker.in_flight_lookups.len(), MAX_IN_FLIGHT_LOOKUPS);
for _ in 0..MAX_IN_FLIGHT_LOOKUPS {
assert!(matches!(receiver.next().await, Some(TestNetworkEvent::GetCalled(_))));
}
assert_eq!(
metrics.requests_pending.get(),
(remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS) as u64
);
assert_eq!(network.get_value_call.lock().unwrap().len(), MAX_IN_FLIGHT_LOOKUPS);
// Make first lookup succeed.
let remote_hash = network.get_value_call.lock().unwrap().pop().unwrap();
let remote_key: AuthorityId = remote_hash_to_key.get(&remote_hash).unwrap().clone();
let dht_event = {
let (key, value) = build_dht_event(vec![remote_multiaddr.clone()], remote_key, &remote_key_store);
let (key, value) = build_dht_event(
vec![remote_multiaddr.clone()],
remote_key,
&remote_key_store
).await;
sc_network::DhtEvent::ValueFound(vec![(key, value)])
};
dht_event_tx.try_send(dht_event).expect("Channel has capacity of 1.");
dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1.");
// Assert worker to trigger another lookup.
assert_eq!(Poll::Pending, worker.poll_unpin(cx));
assert_eq!(worker.pending_lookups.len(), remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS - 1);
assert_eq!(worker.in_flight_lookups.len(), MAX_IN_FLIGHT_LOOKUPS);
assert!(matches!(receiver.next().await, Some(TestNetworkEvent::GetCalled(_))));
assert_eq!(
metrics.requests_pending.get(),
(remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS - 1) as u64
);
assert_eq!(network.get_value_call.lock().unwrap().len(), MAX_IN_FLIGHT_LOOKUPS);
// Make second one fail.
let remote_hash = network.get_value_call.lock().unwrap().pop().unwrap();
let dht_event = sc_network::DhtEvent::ValueNotFound(remote_hash);
dht_event_tx.try_send(dht_event).expect("Channel has capacity of 1.");
dht_event_tx.send(dht_event).await.expect("Channel has capacity of 1.");
// Assert worker to trigger another lookup.
assert_eq!(Poll::Pending, worker.poll_unpin(cx));
assert_eq!(worker.pending_lookups.len(), remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS - 2);
assert_eq!(worker.in_flight_lookups.len(), MAX_IN_FLIGHT_LOOKUPS);
assert!(matches!(receiver.next().await, Some(TestNetworkEvent::GetCalled(_))));
assert_eq!(
metrics.requests_pending.get(),
(remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS - 2) as u64
);
assert_eq!(network.get_value_call.lock().unwrap().len(), MAX_IN_FLIGHT_LOOKUPS);
worker.refill_pending_lookups_queue().unwrap();
// Assert worker to restock pending lookups and forget about in-flight lookups.
assert_eq!(worker.pending_lookups.len(), remote_public_keys.len());
assert_eq!(worker.in_flight_lookups.len(), 0);
Poll::Ready(())
}));
}.boxed_local());
}