mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 16:57:58 +00:00
Keep track of the pending response for each peer individually (#13941)
* Keep track of the pending response for each peer individually When peer disconnects or the syncing is restarted, remove the pending response so syncing won't start sending duplicate requests/receive stale responses from disconnected peers. Before this commit pending responses where stored in `FuturesUnordered` which made it hard to keep track of pending responses for each individual peer. * Update client/network/sync/src/lib.rs Co-authored-by: Bastian Köcher <git@kchr.de> * ".git/.scripts/commands/fmt/fmt.sh" * Apply suggestions from code review Co-authored-by: Dmitry Markin <dmitry@markin.tech> Co-authored-by: Sebastian Kunert <skunert49@gmail.com> * Update client/network/sync/src/lib.rs --------- Co-authored-by: Bastian Köcher <git@kchr.de> Co-authored-by: command-bot <> Co-authored-by: Dmitry Markin <dmitry@markin.tech> Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
This commit is contained in:
@@ -344,7 +344,7 @@ pub struct ChainSync<B: BlockT, Client> {
|
||||
/// Protocol name used to send out warp sync requests
|
||||
warp_sync_protocol_name: Option<ProtocolName>,
|
||||
/// Pending responses
|
||||
pending_responses: FuturesUnordered<PendingResponse<B>>,
|
||||
pending_responses: HashMap<PeerId, PendingResponse<B>>,
|
||||
/// Handle to import queue.
|
||||
import_queue: Box<dyn ImportQueueService<B>>,
|
||||
/// Metrics.
|
||||
@@ -1238,6 +1238,7 @@ where
|
||||
gap_sync.blocks.clear_peer_download(who)
|
||||
}
|
||||
self.peers.remove(who);
|
||||
self.pending_responses.remove(who);
|
||||
self.extra_justifications.peer_disconnected(who);
|
||||
self.allowed_requests.set_all();
|
||||
self.fork_targets.retain(|_, target| {
|
||||
@@ -1360,7 +1361,7 @@ where
|
||||
|
||||
if self.peers.contains_key(&who) {
|
||||
self.pending_responses
|
||||
.push(Box::pin(async move { (who, PeerRequest::Block(request), rx.await) }));
|
||||
.insert(who, Box::pin(async move { (who, PeerRequest::Block(request), rx.await) }));
|
||||
}
|
||||
|
||||
match self.encode_block_request(&opaque_req) {
|
||||
@@ -1457,7 +1458,7 @@ where
|
||||
.notifications_protocol
|
||||
.clone()
|
||||
.into(),
|
||||
pending_responses: Default::default(),
|
||||
pending_responses: HashMap::new(),
|
||||
import_queue,
|
||||
metrics: if let Some(r) = &metrics_registry {
|
||||
match SyncingMetrics::register(r) {
|
||||
@@ -1810,6 +1811,9 @@ where
|
||||
return None
|
||||
}
|
||||
|
||||
// since the request is not a justification, remove it from pending responses
|
||||
self.pending_responses.remove(&id);
|
||||
|
||||
// handle peers that were in other states.
|
||||
match self.new_peer(id, p.best_hash, p.best_number) {
|
||||
Ok(None) => None,
|
||||
@@ -2009,7 +2013,7 @@ where
|
||||
|
||||
if self.peers.contains_key(&who) {
|
||||
self.pending_responses
|
||||
.push(Box::pin(async move { (who, PeerRequest::State, rx.await) }));
|
||||
.insert(who, Box::pin(async move { (who, PeerRequest::State, rx.await) }));
|
||||
}
|
||||
|
||||
match self.encode_state_request(&request) {
|
||||
@@ -2037,7 +2041,7 @@ where
|
||||
|
||||
if self.peers.contains_key(&who) {
|
||||
self.pending_responses
|
||||
.push(Box::pin(async move { (who, PeerRequest::WarpProof, rx.await) }));
|
||||
.insert(who, Box::pin(async move { (who, PeerRequest::WarpProof, rx.await) }));
|
||||
}
|
||||
|
||||
match &self.warp_sync_protocol_name {
|
||||
@@ -2175,9 +2179,20 @@ where
|
||||
}
|
||||
|
||||
fn poll_pending_responses(&mut self, cx: &mut std::task::Context) -> Poll<ImportResult<B>> {
|
||||
while let Poll::Ready(Some((id, request, response))) =
|
||||
self.pending_responses.poll_next_unpin(cx)
|
||||
{
|
||||
let ready_responses = self
|
||||
.pending_responses
|
||||
.values_mut()
|
||||
.filter_map(|future| match future.poll_unpin(cx) {
|
||||
Poll::Pending => None,
|
||||
Poll::Ready(result) => Some(result),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (id, request, response) in ready_responses {
|
||||
self.pending_responses
|
||||
.remove(&id)
|
||||
.expect("Logic error: peer id from pending response is missing in the map.");
|
||||
|
||||
match response {
|
||||
Ok(Ok(resp)) => match request {
|
||||
PeerRequest::Block(req) => {
|
||||
@@ -4116,4 +4131,89 @@ mod test {
|
||||
let state = AncestorSearchState::<Block>::BinarySearch(1, 3);
|
||||
assert!(handle_ancestor_search_state(&state, 2, true).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_restart_removes_block_but_not_justification_requests() {
|
||||
let mut client = Arc::new(TestClientBuilder::new().build());
|
||||
let block_announce_validator = Box::new(DefaultBlockAnnounceValidator);
|
||||
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
|
||||
let (_chain_sync_network_provider, chain_sync_network_handle) =
|
||||
NetworkServiceProvider::new();
|
||||
let (mut sync, _) = ChainSync::new(
|
||||
SyncMode::Full,
|
||||
client.clone(),
|
||||
ProtocolId::from("test-protocol-name"),
|
||||
&Some(String::from("test-fork-id")),
|
||||
Roles::from(&Role::Full),
|
||||
block_announce_validator,
|
||||
1,
|
||||
64,
|
||||
None,
|
||||
None,
|
||||
chain_sync_network_handle,
|
||||
import_queue,
|
||||
ProtocolName::from("block-request"),
|
||||
ProtocolName::from("state-request"),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let peers = vec![PeerId::random(), PeerId::random()];
|
||||
|
||||
let mut new_blocks = |n| {
|
||||
for _ in 0..n {
|
||||
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
|
||||
block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
|
||||
}
|
||||
|
||||
let info = client.info();
|
||||
(info.best_hash, info.best_number)
|
||||
};
|
||||
|
||||
let (b1_hash, b1_number) = new_blocks(50);
|
||||
|
||||
// add new peer and request blocks from them
|
||||
sync.new_peer(peers[0], Hash::random(), 42).unwrap();
|
||||
|
||||
// we wil send block requests to these peers
|
||||
// for these blocks we don't know about
|
||||
for (peer, request) in sync.block_requests() {
|
||||
sync.send_block_request(peer, request);
|
||||
}
|
||||
|
||||
// add a new peer at a known block
|
||||
sync.new_peer(peers[1], b1_hash, b1_number).unwrap();
|
||||
|
||||
// we request a justification for a block we have locally
|
||||
sync.request_justification(&b1_hash, b1_number);
|
||||
|
||||
// the justification request should be scheduled to the
|
||||
// new peer which is at the given block
|
||||
let mut requests = sync.justification_requests().collect::<Vec<_>>();
|
||||
assert_eq!(requests.len(), 1);
|
||||
let (peer, request) = requests.remove(0);
|
||||
sync.send_block_request(peer, request);
|
||||
|
||||
assert!(!std::matches!(
|
||||
sync.peers.get(&peers[0]).unwrap().state,
|
||||
PeerSyncState::DownloadingJustification(_),
|
||||
));
|
||||
assert_eq!(
|
||||
sync.peers.get(&peers[1]).unwrap().state,
|
||||
PeerSyncState::DownloadingJustification(b1_hash),
|
||||
);
|
||||
assert_eq!(sync.pending_responses.len(), 2);
|
||||
|
||||
let requests = sync.restart().collect::<Vec<_>>();
|
||||
assert!(requests.iter().any(|res| res.as_ref().unwrap().0 == peers[0]));
|
||||
|
||||
assert_eq!(sync.pending_responses.len(), 1);
|
||||
assert!(sync.pending_responses.get(&peers[1]).is_some());
|
||||
assert_eq!(
|
||||
sync.peers.get(&peers[1]).unwrap().state,
|
||||
PeerSyncState::DownloadingJustification(b1_hash),
|
||||
);
|
||||
sync.peer_disconnected(&peers[1]);
|
||||
assert_eq!(sync.pending_responses.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user