Introduce ChainSyncInterface (#12489)

* Introduce `ChainSyncInterface`

`ChainSyncInterface` provides an asynchronous interface for other
subsystems to submit calls to `ChainSync`. This allows `NetworkService`
to delegate calls to `ChainSync` while still providing the same API
for other subsystems (for now). This makes it possible to move the
syncing code in piecemeal fashion out of `protocol.rs` as the calls
are just forwarded to `ChainSync`.

* Apply review comments

* Fix tests
This commit is contained in:
Aaro Altonen
2022-10-17 10:25:25 +03:00
committed by GitHub
parent 68e2513265
commit ce4cad8b8b
17 changed files with 301 additions and 45 deletions
+35 -10
View File
@@ -32,14 +32,18 @@ pub mod block_request_handler;
pub mod blocks;
pub mod mock;
mod schema;
pub mod service;
pub mod state;
pub mod state_request_handler;
#[cfg(test)]
mod tests;
pub mod warp;
pub mod warp_request_handler;
use crate::{
blocks::BlockCollection,
schema::v1::{StateRequest, StateResponse},
service::chain_sync::{ChainSyncInterfaceHandle, ToServiceCommand},
state::StateSync,
warp::{WarpProofImportResult, WarpSync},
};
@@ -67,6 +71,7 @@ use sc_network_common::{
PollBlockAnnounceValidation, SyncMode, SyncState, SyncStatus,
},
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use sp_arithmetic::traits::Saturating;
use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
use sp_consensus::{
@@ -264,6 +269,8 @@ pub struct ChainSync<B: BlockT, Client> {
import_existing: bool,
/// Gap download process.
gap_sync: Option<GapSync<B>>,
/// Channel for receiving service commands
service_rx: TracingUnboundedReceiver<ToServiceCommand<B>>,
}
/// All the data we have about a Peer that we are trying to sync with
@@ -1725,6 +1732,21 @@ where
Ok(OpaqueStateResponse(Box::new(response)))
}
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<B::Header>> {
while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) {
match event {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
self.set_sync_fork_request(peers, &hash, number);
},
}
}
self.poll_block_announce_validation(cx)
}
}
impl<B, Client> ChainSync<B, Client>
@@ -1746,7 +1768,9 @@ where
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
) -> Result<Self, ClientError> {
) -> Result<(Self, Box<ChainSyncInterfaceHandle<B>>), ClientError> {
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync");
let mut sync = Self {
client,
peers: HashMap::new(),
@@ -1768,9 +1792,10 @@ where
warp_sync_provider,
import_existing: false,
gap_sync: None,
service_rx,
};
sync.reset_sync_start_point()?;
Ok(sync)
Ok((sync, Box::new(ChainSyncInterfaceHandle::new(tx))))
}
/// Returns the best seen block number if we don't have that block yet, `None` otherwise.
@@ -2664,7 +2689,7 @@ mod test {
let block_announce_validator = Box::new(DefaultBlockAnnounceValidator);
let peer_id = PeerId::random();
let mut sync =
let (mut sync, _) =
ChainSync::new(SyncMode::Full, client.clone(), block_announce_validator, 1, None)
.unwrap();
@@ -2712,7 +2737,7 @@ mod test {
#[test]
fn restart_doesnt_affect_peers_downloading_finality_data() {
let mut client = Arc::new(TestClientBuilder::new().build());
let mut sync = ChainSync::new(
let (mut sync, _) = ChainSync::new(
SyncMode::Full,
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
@@ -2879,7 +2904,7 @@ mod test {
let mut client = Arc::new(TestClientBuilder::new().build());
let mut sync = ChainSync::new(
let (mut sync, _) = ChainSync::new(
SyncMode::Full,
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
@@ -2994,7 +3019,7 @@ mod test {
let mut client = Arc::new(TestClientBuilder::new().build());
let info = client.info();
let mut sync = ChainSync::new(
let (mut sync, _) = ChainSync::new(
SyncMode::Full,
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
@@ -3137,7 +3162,7 @@ mod test {
let info = client.info();
let mut sync = ChainSync::new(
let (mut sync, _) = ChainSync::new(
SyncMode::Full,
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
@@ -3268,7 +3293,7 @@ mod test {
let info = client.info();
let mut sync = ChainSync::new(
let (mut sync, _) = ChainSync::new(
SyncMode::Full,
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
@@ -3399,7 +3424,7 @@ mod test {
let mut client = Arc::new(TestClientBuilder::new().build());
let blocks = (0..3).map(|_| build_block(&mut client, None, false)).collect::<Vec<_>>();
let mut sync = ChainSync::new(
let (mut sync, _) = ChainSync::new(
SyncMode::Full,
client.clone(),
Box::new(DefaultBlockAnnounceValidator),
@@ -3432,7 +3457,7 @@ mod test {
let empty_client = Arc::new(TestClientBuilder::new().build());
let mut sync = ChainSync::new(
let (mut sync, _) = ChainSync::new(
SyncMode::Full,
empty_client.clone(),
Box::new(DefaultBlockAnnounceValidator),
@@ -114,5 +114,9 @@ mockall::mock! {
) -> Result<Vec<BlockData<Block>>, String>;
fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String>;
fn decode_state_response(&self, response: &[u8]) -> Result<OpaqueStateResponse, String>;
fn poll<'a>(
&mut self,
cx: &mut std::task::Context<'a>,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
}
}
@@ -0,0 +1,58 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use libp2p::PeerId;
use sc_network_common::service::NetworkSyncForkRequest;
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::{Block as BlockT, NumberFor};
/// Commands send to `ChainSync`
#[derive(Debug)]
pub enum ToServiceCommand<B: BlockT> {
SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
}
/// Handle for communicating with `ChainSync` asynchronously
pub struct ChainSyncInterfaceHandle<B: BlockT> {
tx: TracingUnboundedSender<ToServiceCommand<B>>,
}
impl<B: BlockT> ChainSyncInterfaceHandle<B> {
/// Create new handle
pub fn new(tx: TracingUnboundedSender<ToServiceCommand<B>>) -> Self {
Self { tx }
}
}
impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>>
for ChainSyncInterfaceHandle<B>
{
/// Configure an explicit fork sync request.
///
/// Note that this function should not be used for recent blocks.
/// Sync should be able to download all the recent forks normally.
/// `set_sync_fork_request` should only be used if external code detects that there's
/// a stale fork missing.
///
/// Passing empty `peers` set effectively removes the sync request.
fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
let _ = self
.tx
.unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number));
}
}
@@ -0,0 +1,31 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use libp2p::PeerId;
use sc_network_common::service::NetworkSyncForkRequest;
use sp_runtime::traits::{Block as BlockT, NumberFor};
mockall::mock! {
pub ChainSyncInterface<B: BlockT> {}
impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>>
for ChainSyncInterface<B>
{
fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>);
}
}
@@ -0,0 +1,22 @@
// This file is part of Substrate.
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! `ChainSync`-related service code
pub mod chain_sync;
pub mod mock;
@@ -0,0 +1,59 @@
// This file is part of Substrate.
// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{ChainSync, ForkTarget};
use libp2p::PeerId;
use sc_network_common::{service::NetworkSyncForkRequest, sync::ChainSync as ChainSyncT};
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use sp_core::H256;
use std::{sync::Arc, task::Poll};
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _};
// verify that the fork target map is empty, then submit a new sync fork request,
// poll `ChainSync` and verify that a new sync fork request has been registered
#[async_std::test]
async fn delegate_to_chainsync() {
let (mut chain_sync, chain_sync_service) = ChainSync::new(
sc_network_common::sync::SyncMode::Full,
Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0),
Box::new(DefaultBlockAnnounceValidator),
1u32,
None,
)
.unwrap();
let hash = H256::random();
let in_number = 1337u64;
let peers = (0..3).map(|_| PeerId::random()).collect::<Vec<_>>();
assert!(chain_sync.fork_targets.is_empty());
chain_sync_service.set_sync_fork_request(peers, hash, in_number);
futures::future::poll_fn(|cx| {
let _ = chain_sync.poll(cx);
Poll::Ready(())
})
.await;
if let Some(ForkTarget { number, .. }) = chain_sync.fork_targets.get(&hash) {
assert_eq!(number, &in_number);
} else {
panic!("expected to contain `ForkTarget`");
}
}