mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 08:51:09 +00:00
Kademlia: Speed-up the record fetching (#13081)
Before libp2p 0.50.0 we used a quorum of one to fetch records from the DHT. In the pr that upgraded to libp2p 0.50.0 we accidentally changed this behavior. This pr brings back the old behavior of using a qorum of one and thus, a faster discovery. After finding the first value, we directly finish the query. There was also another behavior change in libp2p, they stopped automatic caching on remote nodes. This pr also brings back the remote caching on nodes that are nearest to the key from our point of view of the network. The pr that changed the behavior in libp2p: https://github.com/libp2p/rust-libp2p/pull/2712
This commit is contained in:
@@ -249,7 +249,7 @@ impl DiscoveryConfig {
|
||||
NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES)
|
||||
.expect("value is a constant; constant is non-zero; qed."),
|
||||
),
|
||||
outbound_query_records: Vec::new(),
|
||||
records_to_publish: Default::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -287,8 +287,12 @@ pub struct DiscoveryBehaviour {
|
||||
allow_non_globals_in_dht: bool,
|
||||
/// A cache of discovered external addresses. Only used for logging purposes.
|
||||
known_external_addresses: LruHashSet<Multiaddr>,
|
||||
/// A cache of outbound query records.
|
||||
outbound_query_records: Vec<(record::Key, Vec<u8>)>,
|
||||
/// Records to publish per QueryId.
|
||||
///
|
||||
/// After finishing a Kademlia query, libp2p will return us a list of the closest peers that
|
||||
/// did not return the record(in `FinishedWithNoAdditionalRecord`). We will then put the record
|
||||
/// to these peers.
|
||||
records_to_publish: HashMap<QueryId, Record>,
|
||||
}
|
||||
|
||||
impl DiscoveryBehaviour {
|
||||
@@ -692,33 +696,54 @@ impl NetworkBehaviour for DiscoveryBehaviour {
|
||||
KademliaEvent::OutboundQueryProgressed {
|
||||
result: QueryResult::GetRecord(res),
|
||||
stats,
|
||||
step,
|
||||
id,
|
||||
..
|
||||
} => {
|
||||
let ev = match res {
|
||||
Ok(ok) =>
|
||||
if let GetRecordOk::FoundRecord(r) = ok {
|
||||
self.outbound_query_records
|
||||
.push((r.record.key, r.record.value));
|
||||
continue
|
||||
} else {
|
||||
debug!(
|
||||
target: "sub-libp2p",
|
||||
"Libp2p => Query progressed to {:?} step (last: {:?})",
|
||||
step.count,
|
||||
step.last,
|
||||
);
|
||||
if step.last {
|
||||
let records =
|
||||
self.outbound_query_records.drain(..).collect();
|
||||
DiscoveryOut::ValueFound(
|
||||
records,
|
||||
stats.duration().unwrap_or_default(),
|
||||
)
|
||||
} else {
|
||||
continue
|
||||
Ok(GetRecordOk::FoundRecord(r)) => {
|
||||
debug!(
|
||||
target: "sub-libp2p",
|
||||
"Libp2p => Found record ({:?}) with value: {:?}",
|
||||
r.record.key,
|
||||
r.record.value,
|
||||
);
|
||||
|
||||
// Let's directly finish the query, as we are only interested in a
|
||||
// quorum of 1.
|
||||
if let Some(kad) = self.kademlia.as_mut() {
|
||||
if let Some(mut query) = kad.query_mut(&id) {
|
||||
query.finish();
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
self.records_to_publish.insert(id, r.record.clone());
|
||||
|
||||
DiscoveryOut::ValueFound(
|
||||
vec![(r.record.key, r.record.value)],
|
||||
stats.duration().unwrap_or_default(),
|
||||
)
|
||||
},
|
||||
Ok(GetRecordOk::FinishedWithNoAdditionalRecord {
|
||||
cache_candidates,
|
||||
}) => {
|
||||
if cache_candidates.is_empty() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Put the record to the `cache_candidates` that are nearest to the
|
||||
// record key from our point of view of the network.
|
||||
if let Some(record) = self.records_to_publish.remove(&id) {
|
||||
if let Some(kad) = self.kademlia.as_mut() {
|
||||
kad.put_record_to(
|
||||
record,
|
||||
cache_candidates.into_iter().map(|v| v.1),
|
||||
Quorum::One,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
continue
|
||||
},
|
||||
Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => {
|
||||
trace!(
|
||||
target: "sub-libp2p",
|
||||
|
||||
Reference in New Issue
Block a user