Move import queue out of sc-network (#12764)

* Move import queue out of `sc-network`

Add supplementary asynchronous API for the import queue which means
it can be run as an independent task and communicated with through
the `ImportQueueService`.

This commit removes removes block and justification imports from
`sc-network` and provides `ChainSync` with a handle to import queue so
it can import blocks and justifications. Polling of the import queue is
moved complete out of `sc-network` and `sc_consensus::Link` is
implemented for `ChainSyncInterfaceHandled` so the import queue
can still influence the syncing process.

* Fix tests

* Apply review comments

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <git@kchr.de>

* Update client/network/sync/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

Co-authored-by: Bastian Köcher <git@kchr.de>
This commit is contained in:
Aaro Altonen
2022-12-09 21:50:57 +02:00
committed by GitHub
parent 225c260e07
commit d6827185c3
24 changed files with 716 additions and 490 deletions
+318 -165
View File
@@ -54,9 +54,12 @@ use futures::{
};
use libp2p::{request_response::OutboundFailure, PeerId};
use log::{debug, error, info, trace, warn};
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use prost::Message;
use sc_client_api::{BlockBackend, ProofProvider};
use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock};
use sc_consensus::{
import_queue::ImportQueueService, BlockImportError, BlockImportStatus, IncomingBlock,
};
use sc_network_common::{
config::{
NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, ProtocolId, SetConfig,
@@ -71,8 +74,8 @@ use sc_network_common::{
warp::{EncodedProof, WarpProofRequest, WarpSyncPhase, WarpSyncProgress, WarpSyncProvider},
BadPeer, ChainSync as ChainSyncT, ImportResult, Metrics, OnBlockData, OnBlockJustification,
OnStateData, OpaqueBlockRequest, OpaqueBlockResponse, OpaqueStateRequest,
OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, PollResult,
SyncMode, SyncState, SyncStatus,
OpaqueStateResponse, PeerInfo, PeerRequest, PollBlockAnnounceValidation, SyncMode,
SyncState, SyncStatus,
},
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
@@ -233,6 +236,32 @@ impl Default for AllowedRequests {
}
}
struct SyncingMetrics {
pub import_queue_blocks_submitted: Counter<U64>,
pub import_queue_justifications_submitted: Counter<U64>,
}
impl SyncingMetrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
import_queue_blocks_submitted: register(
Counter::new(
"substrate_sync_import_queue_blocks_submitted",
"Number of blocks submitted to the import queue.",
)?,
registry,
)?,
import_queue_justifications_submitted: register(
Counter::new(
"substrate_sync_import_queue_justifications_submitted",
"Number of justifications submitted to the import queue.",
)?,
registry,
)?,
})
}
}
struct GapSync<B: BlockT> {
blocks: BlockCollection<B>,
best_queued_number: NumberFor<B>,
@@ -311,6 +340,10 @@ pub struct ChainSync<B: BlockT, Client> {
warp_sync_protocol_name: Option<ProtocolName>,
/// Pending responses
pending_responses: FuturesUnordered<PendingResponse<B>>,
/// Handle to import queue.
import_queue: Box<dyn ImportQueueService<B>>,
/// Metrics.
metrics: Option<SyncingMetrics>,
}
/// All the data we have about a Peer that we are trying to sync with
@@ -961,6 +994,19 @@ where
Ok(self.validate_and_queue_blocks(new_blocks, gap))
}
fn process_block_response_data(&mut self, blocks_to_import: Result<OnBlockData<B>, BadPeer>) {
match blocks_to_import {
Ok(OnBlockData::Import(origin, blocks)) => self.import_blocks(origin, blocks),
Ok(OnBlockData::Request(peer, req)) => self.send_block_request(peer, req),
Ok(OnBlockData::Continue) => {},
Err(BadPeer(id, repu)) => {
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(id, repu);
},
}
}
fn on_block_justification(
&mut self,
who: PeerId,
@@ -1016,156 +1062,6 @@ where
Ok(OnBlockJustification::Nothing)
}
fn on_blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
) -> Box<dyn Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>>> {
trace!(target: "sync", "Imported {} of {}", imported, count);
let mut output = Vec::new();
let mut has_error = false;
for (_, hash) in &results {
self.queue_blocks.remove(hash);
self.blocks.clear_queued(hash);
if let Some(gap_sync) = &mut self.gap_sync {
gap_sync.blocks.clear_queued(hash);
}
}
for (result, hash) in results {
if has_error {
break
}
if result.is_err() {
has_error = true;
}
match result {
Ok(BlockImportStatus::ImportedKnown(number, who)) =>
if let Some(peer) = who {
self.update_peer_common_number(&peer, number);
},
Ok(BlockImportStatus::ImportedUnknown(number, aux, who)) => {
if aux.clear_justification_requests {
trace!(
target: "sync",
"Block imported clears all pending justification requests {}: {:?}",
number,
hash,
);
self.clear_justification_requests();
}
if aux.needs_justification {
trace!(
target: "sync",
"Block imported but requires justification {}: {:?}",
number,
hash,
);
self.request_justification(&hash, number);
}
if aux.bad_justification {
if let Some(ref peer) = who {
warn!("💔 Sent block with bad justification to import");
output.push(Err(BadPeer(*peer, rep::BAD_JUSTIFICATION)));
}
}
if let Some(peer) = who {
self.update_peer_common_number(&peer, number);
}
let state_sync_complete =
self.state_sync.as_ref().map_or(false, |s| s.target() == hash);
if state_sync_complete {
info!(
target: "sync",
"State sync is complete ({} MiB), restarting block sync.",
self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
);
self.state_sync = None;
self.mode = SyncMode::Full;
output.extend(self.restart());
}
let warp_sync_complete = self
.warp_sync
.as_ref()
.map_or(false, |s| s.target_block_hash() == Some(hash));
if warp_sync_complete {
info!(
target: "sync",
"Warp sync is complete ({} MiB), restarting block sync.",
self.warp_sync.as_ref().map_or(0, |s| s.progress().total_bytes / (1024 * 1024)),
);
self.warp_sync = None;
self.mode = SyncMode::Full;
output.extend(self.restart());
}
let gap_sync_complete =
self.gap_sync.as_ref().map_or(false, |s| s.target == number);
if gap_sync_complete {
info!(
target: "sync",
"Block history download is complete."
);
self.gap_sync = None;
}
},
Err(BlockImportError::IncompleteHeader(who)) =>
if let Some(peer) = who {
warn!(
target: "sync",
"💔 Peer sent block with incomplete header to import",
);
output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER)));
output.extend(self.restart());
},
Err(BlockImportError::VerificationFailed(who, e)) =>
if let Some(peer) = who {
warn!(
target: "sync",
"💔 Verification failed for block {:?} received from peer: {}, {:?}",
hash,
peer,
e,
);
output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL)));
output.extend(self.restart());
},
Err(BlockImportError::BadBlock(who)) =>
if let Some(peer) = who {
warn!(
target: "sync",
"💔 Block {:?} received from peer {} has been blacklisted",
hash,
peer,
);
output.push(Err(BadPeer(peer, rep::BAD_BLOCK)));
},
Err(BlockImportError::MissingState) => {
// This may happen if the chain we were requesting upon has been discarded
// in the meantime because other chain has been finalized.
// Don't mark it as bad as it still may be synced if explicitly requested.
trace!(target: "sync", "Obsolete block {:?}", hash);
},
e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
warn!(target: "sync", "💔 Error importing block {:?}: {}", hash, e.unwrap_err());
self.state_sync = None;
self.warp_sync = None;
output.extend(self.restart());
},
Err(BlockImportError::Cancelled) => {},
};
}
self.allowed_requests.set_all();
Box::new(output.into_iter())
}
fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
let finalization_result = if success { Ok((hash, number)) } else { Err(()) };
self.extra_justifications
@@ -1331,7 +1227,7 @@ where
}
}
fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<B>> {
fn peer_disconnected(&mut self, who: &PeerId) {
self.blocks.clear_peer_download(who);
if let Some(gap_sync) = &mut self.gap_sync {
gap_sync.blocks.clear_peer_download(who)
@@ -1343,8 +1239,13 @@ where
target.peers.remove(who);
!target.peers.is_empty()
});
let blocks = self.ready_blocks();
(!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false))
if let Some(OnBlockData::Import(origin, blocks)) =
(!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false))
{
self.import_blocks(origin, blocks);
}
}
fn metrics(&self) -> Metrics {
@@ -1421,22 +1322,56 @@ where
.map_err(|error: codec::Error| error.to_string())
}
fn poll(&mut self, cx: &mut std::task::Context) -> Poll<PollResult<B>> {
fn poll(
&mut self,
cx: &mut std::task::Context,
) -> Poll<PollBlockAnnounceValidation<B::Header>> {
while let Poll::Ready(Some(event)) = self.service_rx.poll_next_unpin(cx) {
match event {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
self.set_sync_fork_request(peers, &hash, number);
},
ToServiceCommand::RequestJustification(hash, number) =>
self.request_justification(&hash, number),
ToServiceCommand::ClearJustificationRequests => self.clear_justification_requests(),
ToServiceCommand::BlocksProcessed(imported, count, results) => {
for result in self.on_blocks_processed(imported, count, results) {
match result {
Ok((id, req)) => self.send_block_request(id, req),
Err(BadPeer(id, repu)) => {
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(id, repu)
},
}
}
},
ToServiceCommand::JustificationImported(peer, hash, number, success) => {
self.on_justification_import(hash, number, success);
if !success {
info!(target: "sync", "💔 Invalid justification provided by {} for #{}", peer, hash);
self.network_service
.disconnect_peer(peer, self.block_announce_protocol_name.clone());
self.network_service.report_peer(
peer,
sc_peerset::ReputationChange::new_fatal("Invalid justification"),
);
}
},
}
}
self.process_outbound_requests();
if let Poll::Ready(result) = self.poll_pending_responses(cx) {
return Poll::Ready(PollResult::Import(result))
while let Poll::Ready(result) = self.poll_pending_responses(cx) {
match result {
ImportResult::BlockImport(origin, blocks) => self.import_blocks(origin, blocks),
ImportResult::JustificationImport(who, hash, number, justifications) =>
self.import_justifications(who, hash, number, justifications),
}
}
if let Poll::Ready(announce) = self.poll_block_announce_validation(cx) {
return Poll::Ready(PollResult::Announce(announce))
return Poll::Ready(announce)
}
Poll::Pending
@@ -1494,11 +1429,13 @@ where
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
max_parallel_downloads: u32,
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
metrics_registry: Option<&Registry>,
network_service: service::network::NetworkServiceHandle,
import_queue: Box<dyn ImportQueueService<B>>,
block_request_protocol_name: ProtocolName,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
) -> Result<(Self, Box<ChainSyncInterfaceHandle<B>>, NonDefaultSetConfig), ClientError> {
) -> Result<(Self, ChainSyncInterfaceHandle<B>, NonDefaultSetConfig), ClientError> {
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync");
let block_announce_config = Self::get_block_announce_proto_config(
protocol_id,
@@ -1544,10 +1481,22 @@ where
.clone()
.into(),
pending_responses: Default::default(),
import_queue,
metrics: if let Some(r) = &metrics_registry {
match SyncingMetrics::register(r) {
Ok(metrics) => Some(metrics),
Err(err) => {
error!(target: "sync", "Failed to register metrics for ChainSync: {err:?}");
None
},
}
} else {
None
},
};
sync.reset_sync_start_point()?;
Ok((sync, Box::new(ChainSyncInterfaceHandle::new(tx)), block_announce_config))
Ok((sync, ChainSyncInterfaceHandle::new(tx), block_announce_config))
}
/// Returns the median seen block number.
@@ -2173,8 +2122,10 @@ where
if request.fields == BlockAttributes::JUSTIFICATION {
match self.on_block_justification(peer_id, block_response) {
Ok(OnBlockJustification::Nothing) => None,
Ok(OnBlockJustification::Import { peer, hash, number, justifications }) =>
Some(ImportResult::JustificationImport(peer, hash, number, justifications)),
Ok(OnBlockJustification::Import { peer, hash, number, justifications }) => {
self.import_justifications(peer, hash, number, justifications);
None
},
Err(BadPeer(id, repu)) => {
self.network_service
.disconnect_peer(id, self.block_announce_protocol_name.clone());
@@ -2184,8 +2135,10 @@ where
}
} else {
match self.on_block_data(&peer_id, Some(request), block_response) {
Ok(OnBlockData::Import(origin, blocks)) =>
Some(ImportResult::BlockImport(origin, blocks)),
Ok(OnBlockData::Import(origin, blocks)) => {
self.import_blocks(origin, blocks);
None
},
Ok(OnBlockData::Request(peer, req)) => {
self.send_block_request(peer, req);
None
@@ -2712,6 +2665,182 @@ where
},
}
}
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
if let Some(metrics) = &self.metrics {
metrics.import_queue_blocks_submitted.inc();
}
self.import_queue.import_blocks(origin, blocks);
}
fn import_justifications(
&mut self,
peer: PeerId,
hash: B::Hash,
number: NumberFor<B>,
justifications: Justifications,
) {
if let Some(metrics) = &self.metrics {
metrics.import_queue_justifications_submitted.inc();
}
self.import_queue.import_justifications(peer, hash, number, justifications);
}
/// A batch of blocks have been processed, with or without errors.
///
/// Call this when a batch of blocks have been processed by the import
/// queue, with or without errors.
fn on_blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
) -> Box<dyn Iterator<Item = Result<(PeerId, BlockRequest<B>), BadPeer>>> {
trace!(target: "sync", "Imported {} of {}", imported, count);
let mut output = Vec::new();
let mut has_error = false;
for (_, hash) in &results {
self.queue_blocks.remove(hash);
self.blocks.clear_queued(hash);
if let Some(gap_sync) = &mut self.gap_sync {
gap_sync.blocks.clear_queued(hash);
}
}
for (result, hash) in results {
if has_error {
break
}
if result.is_err() {
has_error = true;
}
match result {
Ok(BlockImportStatus::ImportedKnown(number, who)) =>
if let Some(peer) = who {
self.update_peer_common_number(&peer, number);
},
Ok(BlockImportStatus::ImportedUnknown(number, aux, who)) => {
if aux.clear_justification_requests {
trace!(
target: "sync",
"Block imported clears all pending justification requests {}: {:?}",
number,
hash,
);
self.clear_justification_requests();
}
if aux.needs_justification {
trace!(
target: "sync",
"Block imported but requires justification {}: {:?}",
number,
hash,
);
self.request_justification(&hash, number);
}
if aux.bad_justification {
if let Some(ref peer) = who {
warn!("💔 Sent block with bad justification to import");
output.push(Err(BadPeer(*peer, rep::BAD_JUSTIFICATION)));
}
}
if let Some(peer) = who {
self.update_peer_common_number(&peer, number);
}
let state_sync_complete =
self.state_sync.as_ref().map_or(false, |s| s.target() == hash);
if state_sync_complete {
info!(
target: "sync",
"State sync is complete ({} MiB), restarting block sync.",
self.state_sync.as_ref().map_or(0, |s| s.progress().size / (1024 * 1024)),
);
self.state_sync = None;
self.mode = SyncMode::Full;
output.extend(self.restart());
}
let warp_sync_complete = self
.warp_sync
.as_ref()
.map_or(false, |s| s.target_block_hash() == Some(hash));
if warp_sync_complete {
info!(
target: "sync",
"Warp sync is complete ({} MiB), restarting block sync.",
self.warp_sync.as_ref().map_or(0, |s| s.progress().total_bytes / (1024 * 1024)),
);
self.warp_sync = None;
self.mode = SyncMode::Full;
output.extend(self.restart());
}
let gap_sync_complete =
self.gap_sync.as_ref().map_or(false, |s| s.target == number);
if gap_sync_complete {
info!(
target: "sync",
"Block history download is complete."
);
self.gap_sync = None;
}
},
Err(BlockImportError::IncompleteHeader(who)) =>
if let Some(peer) = who {
warn!(
target: "sync",
"💔 Peer sent block with incomplete header to import",
);
output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER)));
output.extend(self.restart());
},
Err(BlockImportError::VerificationFailed(who, e)) =>
if let Some(peer) = who {
warn!(
target: "sync",
"💔 Verification failed for block {:?} received from peer: {}, {:?}",
hash,
peer,
e,
);
output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL)));
output.extend(self.restart());
},
Err(BlockImportError::BadBlock(who)) =>
if let Some(peer) = who {
warn!(
target: "sync",
"💔 Block {:?} received from peer {} has been blacklisted",
hash,
peer,
);
output.push(Err(BadPeer(peer, rep::BAD_BLOCK)));
},
Err(BlockImportError::MissingState) => {
// This may happen if the chain we were requesting upon has been discarded
// in the meantime because other chain has been finalized.
// Don't mark it as bad as it still may be synced if explicitly requested.
trace!(target: "sync", "Obsolete block {:?}", hash);
},
e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => {
warn!(target: "sync", "💔 Error importing block {:?}: {}", hash, e.unwrap_err());
self.state_sync = None;
self.warp_sync = None;
output.extend(self.restart());
},
Err(BlockImportError::Cancelled) => {},
};
}
self.allowed_requests.set_all();
Box::new(output.into_iter())
}
}
// This is purely during a backwards compatible transitionary period and should be removed
@@ -3089,6 +3218,7 @@ mod test {
let block_announce_validator = Box::new(DefaultBlockAnnounceValidator);
let peer_id = PeerId::random();
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let (mut sync, _, _) = ChainSync::new(
@@ -3100,7 +3230,9 @@ mod test {
block_announce_validator,
1,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,
@@ -3151,6 +3283,7 @@ mod test {
#[test]
fn restart_doesnt_affect_peers_downloading_finality_data() {
let mut client = Arc::new(TestClientBuilder::new().build());
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
@@ -3163,7 +3296,9 @@ mod test {
Box::new(DefaultBlockAnnounceValidator),
1,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,
@@ -3330,6 +3465,7 @@ mod test {
sp_tracing::try_init_simple();
let mut client = Arc::new(TestClientBuilder::new().build());
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
@@ -3342,7 +3478,9 @@ mod test {
Box::new(DefaultBlockAnnounceValidator),
5,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,
@@ -3453,6 +3591,7 @@ mod test {
};
let mut client = Arc::new(TestClientBuilder::new().build());
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let info = client.info();
@@ -3466,7 +3605,9 @@ mod test {
Box::new(DefaultBlockAnnounceValidator),
5,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,
@@ -3584,6 +3725,7 @@ mod test {
fn can_sync_huge_fork() {
sp_tracing::try_init_simple();
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let mut client = Arc::new(TestClientBuilder::new().build());
@@ -3619,7 +3761,9 @@ mod test {
Box::new(DefaultBlockAnnounceValidator),
5,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,
@@ -3722,6 +3866,7 @@ mod test {
fn syncs_fork_without_duplicate_requests() {
sp_tracing::try_init_simple();
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let mut client = Arc::new(TestClientBuilder::new().build());
@@ -3757,7 +3902,9 @@ mod test {
Box::new(DefaultBlockAnnounceValidator),
5,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,
@@ -3881,6 +4028,7 @@ mod test {
#[test]
fn removes_target_fork_on_disconnect() {
sp_tracing::try_init_simple();
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let mut client = Arc::new(TestClientBuilder::new().build());
@@ -3895,7 +4043,9 @@ mod test {
Box::new(DefaultBlockAnnounceValidator),
1,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,
@@ -3921,6 +4071,7 @@ mod test {
#[test]
fn can_import_response_with_missing_blocks() {
sp_tracing::try_init_simple();
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let mut client2 = Arc::new(TestClientBuilder::new().build());
@@ -3937,7 +4088,9 @@ mod test {
Box::new(DefaultBlockAnnounceValidator),
1,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,
+4 -10
View File
@@ -21,11 +21,10 @@
use futures::task::Poll;
use libp2p::PeerId;
use sc_consensus::{BlockImportError, BlockImportStatus};
use sc_network_common::sync::{
message::{BlockAnnounce, BlockData, BlockRequest, BlockResponse},
BadPeer, ChainSync as ChainSyncT, Metrics, OnBlockData, OnBlockJustification,
OpaqueBlockResponse, PeerInfo, PollBlockAnnounceValidation, PollResult, SyncStatus,
OpaqueBlockResponse, PeerInfo, PollBlockAnnounceValidation, SyncStatus,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};
@@ -60,17 +59,12 @@ mockall::mock! {
request: Option<BlockRequest<Block>>,
response: BlockResponse<Block>,
) -> Result<OnBlockData<Block>, BadPeer>;
fn process_block_response_data(&mut self, blocks_to_import: Result<OnBlockData<Block>, BadPeer>);
fn on_block_justification(
&mut self,
who: PeerId,
response: BlockResponse<Block>,
) -> Result<OnBlockJustification<Block>, BadPeer>;
fn on_blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>, Block::Hash)>,
) -> Box<dyn Iterator<Item = Result<(PeerId, BlockRequest<Block>), BadPeer>>>;
fn on_justification_import(
&mut self,
hash: Block::Hash,
@@ -89,7 +83,7 @@ mockall::mock! {
&mut self,
cx: &mut std::task::Context<'a>,
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
fn peer_disconnected(&mut self, who: &PeerId) -> Option<OnBlockData<Block>>;
fn peer_disconnected(&mut self, who: &PeerId);
fn metrics(&self) -> Metrics;
fn block_response_into_blocks(
&self,
@@ -99,7 +93,7 @@ mockall::mock! {
fn poll<'a>(
&mut self,
cx: &mut std::task::Context<'a>,
) -> Poll<PollResult<Block>>;
) -> Poll<PollBlockAnnounceValidation<Block::Header>>;
fn send_block_request(
&mut self,
who: PeerId,
@@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use libp2p::PeerId;
use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link};
use sc_network_common::service::NetworkSyncForkRequest;
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::{Block as BlockT, NumberFor};
@@ -25,9 +26,18 @@ use sp_runtime::traits::{Block as BlockT, NumberFor};
#[derive(Debug)]
pub enum ToServiceCommand<B: BlockT> {
SetSyncForkRequest(Vec<PeerId>, B::Hash, NumberFor<B>),
RequestJustification(B::Hash, NumberFor<B>),
ClearJustificationRequests,
BlocksProcessed(
usize,
usize,
Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
),
JustificationImported(PeerId, B::Hash, NumberFor<B>, bool),
}
/// Handle for communicating with `ChainSync` asynchronously
#[derive(Clone)]
pub struct ChainSyncInterfaceHandle<B: BlockT> {
tx: TracingUnboundedSender<ToServiceCommand<B>>,
}
@@ -56,3 +66,46 @@ impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>>
.unbounded_send(ToServiceCommand::SetSyncForkRequest(peers, hash, number));
}
}
impl<B: BlockT> JustificationSyncLink<B> for ChainSyncInterfaceHandle<B> {
/// Request a justification for the given block from the network.
///
/// On success, the justification will be passed to the import queue that was part at
/// initialization as part of the configuration.
fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
}
fn clear_justification_requests(&self) {
let _ = self.tx.unbounded_send(ToServiceCommand::ClearJustificationRequests);
}
}
impl<B: BlockT> Link<B> for ChainSyncInterfaceHandle<B> {
fn blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
) {
let _ = self
.tx
.unbounded_send(ToServiceCommand::BlocksProcessed(imported, count, results));
}
fn justification_imported(
&mut self,
who: PeerId,
hash: &B::Hash,
number: NumberFor<B>,
success: bool,
) {
let _ = self
.tx
.unbounded_send(ToServiceCommand::JustificationImported(who, *hash, number, success));
}
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self.tx.unbounded_send(ToServiceCommand::RequestJustification(*hash, number));
}
}
@@ -18,6 +18,7 @@
use futures::channel::oneshot;
use libp2p::{Multiaddr, PeerId};
use sc_consensus::{BlockImportError, BlockImportStatus};
use sc_network_common::{
config::MultiaddrWithPeerId,
protocol::ProtocolName,
@@ -29,13 +30,43 @@ use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::collections::HashSet;
mockall::mock! {
pub ChainSyncInterface<B: BlockT> {}
pub ChainSyncInterface<B: BlockT> {
pub fn justification_sync_link_request_justification(&self, hash: &B::Hash, number: NumberFor<B>);
pub fn justification_sync_link_clear_justification_requests(&self);
}
impl<B: BlockT + 'static> NetworkSyncForkRequest<B::Hash, NumberFor<B>>
for ChainSyncInterface<B>
{
fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>);
}
impl<B: BlockT> sc_consensus::Link<B> for ChainSyncInterface<B> {
fn blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
);
fn justification_imported(
&mut self,
who: PeerId,
hash: &B::Hash,
number: NumberFor<B>,
success: bool,
);
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>);
}
}
impl<B: BlockT> sc_consensus::JustificationSyncLink<B> for MockChainSyncInterface<B> {
fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
self.justification_sync_link_request_justification(hash, number);
}
fn clear_justification_requests(&self) {
self.justification_sync_link_clear_justification_requests();
}
}
mockall::mock! {
@@ -37,6 +37,7 @@ use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _
// poll `ChainSync` and verify that a new sync fork request has been registered
#[tokio::test]
async fn delegate_to_chainsync() {
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new();
let (mut chain_sync, chain_sync_service, _) = ChainSync::new(
sc_network_common::sync::SyncMode::Full,
@@ -47,7 +48,9 @@ async fn delegate_to_chainsync() {
Box::new(DefaultBlockAnnounceValidator),
1u32,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,