mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 21:21:11 +00:00
Fix peer discovery delays. (#2982)
* Fix DiscoveryBehaviour::poll. The previous implementation regularly returned `NotReady` from `poll` despite the inner Kademlia behaviour having events ready, thus letting the `poll`ing be largely driven by the task wakeups from the Delay for the next random Kademlia query, inducing major delays in consuming the ready Kademlia events and thus slowing progress. The discovery test now passes quickly, as expected. * Ensure the Delay is polled right after reset. For task wakeup.
This commit is contained in:
@@ -181,65 +181,6 @@ where
|
||||
Self::OutEvent,
|
||||
>,
|
||||
> {
|
||||
// Poll Kademlia.
|
||||
match self.kademlia.poll(params) {
|
||||
Async::NotReady => (),
|
||||
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => {
|
||||
match ev {
|
||||
KademliaOut::Discovered { .. } => {}
|
||||
KademliaOut::KBucketAdded { peer_id, .. } => {
|
||||
let ev = DiscoveryOut::Discovered(peer_id);
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaOut::FindNodeResult { key, closer_peers } => {
|
||||
trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results",
|
||||
key, closer_peers.len());
|
||||
if closer_peers.is_empty() {
|
||||
warn!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
|
||||
results");
|
||||
}
|
||||
}
|
||||
KademliaOut::GetValueResult(res) => {
|
||||
let ev = match res {
|
||||
GetValueResult::Found { results } => {
|
||||
let results = results
|
||||
.into_iter()
|
||||
.map(|r| (r.key, r.value))
|
||||
.collect();
|
||||
|
||||
DiscoveryOut::ValueFound(results)
|
||||
}
|
||||
GetValueResult::NotFound { key, .. } => {
|
||||
DiscoveryOut::ValueNotFound(key)
|
||||
}
|
||||
};
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaOut::PutValueResult(res) => {
|
||||
let ev = match res {
|
||||
PutValueResult::Ok{ key, .. } => {
|
||||
DiscoveryOut::ValuePut(key)
|
||||
}
|
||||
PutValueResult::Err { key, .. } => {
|
||||
DiscoveryOut::ValuePutFailed(key)
|
||||
}
|
||||
};
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
// We never start any other type of query.
|
||||
KademliaOut::GetProvidersResult { .. } => {}
|
||||
}
|
||||
},
|
||||
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
|
||||
return Async::Ready(NetworkBehaviourAction::DialAddress { address }),
|
||||
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
|
||||
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
|
||||
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
|
||||
return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }),
|
||||
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
|
||||
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
|
||||
}
|
||||
|
||||
// Poll the stream that fires when we need to start a random Kademlia query.
|
||||
loop {
|
||||
match self.next_kad_random_query.poll() {
|
||||
@@ -262,6 +203,67 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// Poll Kademlia.
|
||||
loop {
|
||||
match self.kademlia.poll(params) {
|
||||
Async::NotReady => break,
|
||||
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => {
|
||||
match ev {
|
||||
KademliaOut::Discovered { .. } => {}
|
||||
KademliaOut::KBucketAdded { peer_id, .. } => {
|
||||
let ev = DiscoveryOut::Discovered(peer_id);
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaOut::FindNodeResult { key, closer_peers } => {
|
||||
trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results",
|
||||
key, closer_peers.len());
|
||||
if closer_peers.is_empty() {
|
||||
warn!(target: "sub-libp2p", "Libp2p => Random Kademlia query has yielded empty \
|
||||
results");
|
||||
}
|
||||
}
|
||||
KademliaOut::GetValueResult(res) => {
|
||||
let ev = match res {
|
||||
GetValueResult::Found { results } => {
|
||||
let results = results
|
||||
.into_iter()
|
||||
.map(|r| (r.key, r.value))
|
||||
.collect();
|
||||
|
||||
DiscoveryOut::ValueFound(results)
|
||||
}
|
||||
GetValueResult::NotFound { key, .. } => {
|
||||
DiscoveryOut::ValueNotFound(key)
|
||||
}
|
||||
};
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaOut::PutValueResult(res) => {
|
||||
let ev = match res {
|
||||
PutValueResult::Ok{ key, .. } => {
|
||||
DiscoveryOut::ValuePut(key)
|
||||
}
|
||||
PutValueResult::Err { key, .. } => {
|
||||
DiscoveryOut::ValuePutFailed(key)
|
||||
}
|
||||
};
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
// We never start any other type of query.
|
||||
KademliaOut::GetProvidersResult { .. } => {}
|
||||
}
|
||||
},
|
||||
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
|
||||
return Async::Ready(NetworkBehaviourAction::DialAddress { address }),
|
||||
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
|
||||
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
|
||||
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
|
||||
return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }),
|
||||
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
|
||||
return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
|
||||
}
|
||||
}
|
||||
|
||||
Async::NotReady
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user