client/authority-discovery: Throttle DHT requests (#7018)

* client/authority-discovery: Throttle DHT requests

Instead of passing one DHT query for each authority down to the network
every query interval, only pass MAX_IN_FLIGHT_LOOKUPS at a given point
in time, triggering new ones when previous ones return.

* client/authority-discovery/worker/test: Fix wrong constant
This commit is contained in:
Max Inden
2020-09-07 12:17:28 +02:00
committed by GitHub
parent aba092e8be
commit f677b40ed7
3 changed files with 254 additions and 163 deletions
@@ -221,6 +221,41 @@ impl NetworkStateInfo for TestNetwork {
}
}
fn build_dht_event(
addresses: Vec<Multiaddr>,
public_key: AuthorityId,
key_store: &BareCryptoStorePtr,
) -> (libp2p::kad::record::Key, Vec<u8>) {
let mut serialized_addresses = vec![];
schema::AuthorityAddresses {
addresses: addresses.into_iter().map(|a| a.to_vec()).collect()
}.encode(&mut serialized_addresses)
.map_err(Error::EncodingProto)
.unwrap();
let signature = key_store.read()
.sign_with(
key_types::AUTHORITY_DISCOVERY,
&public_key.clone().into(),
serialized_addresses.as_slice(),
)
.map_err(|_| Error::Signing)
.unwrap();
let mut signed_addresses = vec![];
schema::SignedAuthorityAddresses {
addresses: serialized_addresses.clone(),
signature,
}
.encode(&mut signed_addresses)
.map_err(Error::EncodingProto)
.unwrap();
let key = hash_authority_id(&public_key.to_raw_vec());
let value = signed_addresses;
(key, value)
}
#[test]
fn new_registers_metrics() {
let (_dht_event_tx, dht_event_rx) = channel(1000);
@@ -247,7 +282,7 @@ fn new_registers_metrics() {
}
#[test]
fn request_addresses_of_others_triggers_dht_get_query() {
fn triggers_dht_get_query() {
let _ = ::env_logger::try_init();
let (_dht_event_tx, dht_event_rx) = channel(1000);
@@ -262,7 +297,6 @@ fn request_addresses_of_others_triggers_dht_get_query() {
let network: Arc<TestNetwork> = Arc::new(Default::default());
let key_store = KeyStore::new();
let (_to_worker, from_service) = mpsc::channel(0);
let mut worker = Worker::new(
from_service,
@@ -274,7 +308,12 @@ fn request_addresses_of_others_triggers_dht_get_query() {
None,
);
worker.request_addresses_of_others().unwrap();
worker.refill_pending_lookups_queue().unwrap();
futures::executor::block_on(futures::future::poll_fn(|cx| {
assert_eq!(Poll::Pending, worker.poll_unpin(cx));
Poll::Ready(())
}));
// Expect authority discovery to request new records from the dht.
assert_eq!(network.get_value_call.lock().unwrap().len(), 2);
@@ -352,6 +391,9 @@ fn publish_discover_cycle() {
dht_event_tx.try_send(dht_event).unwrap();
let f = |cx: &mut Context<'_>| -> Poll<()> {
worker.refill_pending_lookups_queue().unwrap();
worker.start_new_lookups();
// Make authority discovery handle the event.
if let Poll::Ready(e) = worker.handle_dht_events(cx) {
panic!("Unexpected error: {:?}", e);
@@ -547,40 +589,11 @@ fn never_add_own_address_to_priority_group() {
))
};
let dht_event = {
let addresses = vec![
sentry_multiaddr.to_vec(),
random_multiaddr.to_vec(),
];
let mut serialized_addresses = vec![];
schema::AuthorityAddresses { addresses }
.encode(&mut serialized_addresses)
.map_err(Error::EncodingProto)
.unwrap();
let signature = validator_key_store.read()
.sign_with(
key_types::AUTHORITY_DISCOVERY,
&validator_public.clone().into(),
serialized_addresses.as_slice(),
)
.map_err(|_| Error::Signing)
.unwrap();
let mut signed_addresses = vec![];
schema::SignedAuthorityAddresses {
addresses: serialized_addresses.clone(),
signature,
}
.encode(&mut signed_addresses)
.map_err(Error::EncodingProto)
.unwrap();
let key = hash_authority_id(&validator_public.to_raw_vec());
let value = signed_addresses;
(key, value)
};
let dht_event = build_dht_event(
vec![sentry_multiaddr, random_multiaddr.clone()],
validator_public.into(),
&validator_key_store,
);
let (_dht_event_tx, dht_event_rx) = channel(1);
let sentry_test_api = Arc::new(TestApi {
@@ -599,6 +612,9 @@ fn never_add_own_address_to_priority_group() {
None,
);
sentry_worker.refill_pending_lookups_queue().unwrap();
sentry_worker.start_new_lookups();
sentry_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
sentry_worker.set_priority_group().unwrap();
@@ -625,43 +641,19 @@ fn limit_number_of_addresses_added_to_cache_per_authority() {
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
.unwrap();
let dht_event = {
let addresses = (0..100).map(|_| {
let peer_id = PeerId::random();
let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
address.with(multiaddr::Protocol::P2p(
peer_id.into(),
)).to_vec()
}).collect();
let addresses = (0..100).map(|_| {
let peer_id = PeerId::random();
let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
address.with(multiaddr::Protocol::P2p(
peer_id.into(),
))
}).collect();
let mut serialized_addresses = vec![];
schema::AuthorityAddresses { addresses }
.encode(&mut serialized_addresses)
.map_err(Error::EncodingProto)
.unwrap();
let signature = remote_key_store.read()
.sign_with(
key_types::AUTHORITY_DISCOVERY,
&remote_public.clone().into(),
serialized_addresses.as_slice(),
)
.map_err(|_| Error::Signing)
.unwrap();
let mut signed_addresses = vec![];
schema::SignedAuthorityAddresses {
addresses: serialized_addresses.clone(),
signature,
}
.encode(&mut signed_addresses)
.map_err(Error::EncodingProto)
.unwrap();
let key = hash_authority_id(&remote_public.to_raw_vec());
let value = signed_addresses;
(key, value)
};
let dht_event = build_dht_event(
addresses,
remote_public.into(),
&remote_key_store,
);
let (_dht_event_tx, dht_event_rx) = channel(1);
@@ -676,6 +668,9 @@ fn limit_number_of_addresses_added_to_cache_per_authority() {
None,
);
worker.refill_pending_lookups_queue().unwrap();
worker.start_new_lookups();
worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
assert_eq!(
MAX_ADDRESSES_PER_AUTHORITY,
@@ -700,40 +695,14 @@ fn do_not_cache_addresses_without_peer_id() {
let multiaddr_without_peer_id: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
let dht_event = {
let addresses = vec![
multiaddr_with_peer_id.to_vec(),
multiaddr_without_peer_id.to_vec(),
];
let mut serialized_addresses = vec![];
schema::AuthorityAddresses { addresses }
.encode(&mut serialized_addresses)
.map_err(Error::EncodingProto)
.unwrap();
let signature = remote_key_store.read()
.sign_with(
key_types::AUTHORITY_DISCOVERY,
&remote_public.clone().into(),
serialized_addresses.as_slice(),
)
.map_err(|_| Error::Signing)
.unwrap();
let mut signed_addresses = vec![];
schema::SignedAuthorityAddresses {
addresses: serialized_addresses.clone(),
signature,
}
.encode(&mut signed_addresses)
.map_err(Error::EncodingProto)
.unwrap();
let key = hash_authority_id(&remote_public.to_raw_vec());
let value = signed_addresses;
(key, value)
};
let dht_event = build_dht_event(
vec![
multiaddr_with_peer_id.clone(),
multiaddr_without_peer_id,
],
remote_public.into(),
&remote_key_store,
);
let (_dht_event_tx, dht_event_rx) = channel(1);
let local_test_api = Arc::new(TestApi {
@@ -754,6 +723,9 @@ fn do_not_cache_addresses_without_peer_id() {
None,
);
local_worker.refill_pending_lookups_queue().unwrap();
local_worker.start_new_lookups();
local_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
assert_eq!(
@@ -826,3 +798,83 @@ fn addresses_to_publish_respects_existing_p2p_protocol() {
"Expected Multiaddr from `TestNetwork` to not be altered.",
);
}
#[test]
fn lookup_throttling() {
let remote_multiaddr = {
let peer_id = PeerId::random();
let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap();
address.with(multiaddr::Protocol::P2p(
peer_id.into(),
))
};
let remote_key_store = KeyStore::new();
let remote_public_keys: Vec<AuthorityId> = (0..20).map(|_| {
remote_key_store
.write()
.sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None)
.unwrap().into()
}).collect();
let remote_hash_to_key = remote_public_keys.iter()
.map(|k| (hash_authority_id(k.as_ref()), k.clone()))
.collect::<HashMap<_, _>>();
let (mut dht_event_tx, dht_event_rx) = channel(1);
let (_to_worker, from_service) = mpsc::channel(0);
let network = Arc::new(TestNetwork::default());
let mut worker = Worker::new(
from_service,
Arc::new(TestApi { authorities: remote_public_keys.clone() }),
network.clone(),
vec![],
dht_event_rx.boxed(),
Role::Sentry,
None,
);
futures::executor::block_on(futures::future::poll_fn(|cx| {
worker.refill_pending_lookups_queue().unwrap();
// Assert worker to trigger MAX_IN_FLIGHT_LOOKUPS lookups.
assert_eq!(Poll::Pending, worker.poll_unpin(cx));
assert_eq!(worker.pending_lookups.len(), remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS);
assert_eq!(worker.in_flight_lookups.len(), MAX_IN_FLIGHT_LOOKUPS);
assert_eq!(network.get_value_call.lock().unwrap().len(), MAX_IN_FLIGHT_LOOKUPS);
// Make first lookup succeed.
let remote_hash = network.get_value_call.lock().unwrap().pop().unwrap();
let remote_key: AuthorityId = remote_hash_to_key.get(&remote_hash).unwrap().clone();
let dht_event = {
let (key, value) = build_dht_event(vec![remote_multiaddr.clone()], remote_key, &remote_key_store);
sc_network::DhtEvent::ValueFound(vec![(key, value)])
};
dht_event_tx.try_send(dht_event).expect("Channel has capacity of 1.");
// Assert worker to trigger another lookup.
assert_eq!(Poll::Pending, worker.poll_unpin(cx));
assert_eq!(worker.pending_lookups.len(), remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS - 1);
assert_eq!(worker.in_flight_lookups.len(), MAX_IN_FLIGHT_LOOKUPS);
assert_eq!(network.get_value_call.lock().unwrap().len(), MAX_IN_FLIGHT_LOOKUPS);
// Make second one fail.
let remote_hash = network.get_value_call.lock().unwrap().pop().unwrap();
let dht_event = sc_network::DhtEvent::ValueNotFound(remote_hash);
dht_event_tx.try_send(dht_event).expect("Channel has capacity of 1.");
// Assert worker to trigger another lookup.
assert_eq!(Poll::Pending, worker.poll_unpin(cx));
assert_eq!(worker.pending_lookups.len(), remote_public_keys.len() - MAX_IN_FLIGHT_LOOKUPS - 2);
assert_eq!(worker.in_flight_lookups.len(), MAX_IN_FLIGHT_LOOKUPS);
assert_eq!(network.get_value_call.lock().unwrap().len(), MAX_IN_FLIGHT_LOOKUPS);
worker.refill_pending_lookups_queue().unwrap();
// Assert worker to restock pending lookups and forget about in-flight lookups.
assert_eq!(worker.pending_lookups.len(), remote_public_keys.len());
assert_eq!(worker.in_flight_lookups.len(), 0);
Poll::Ready(())
}));
}