Network sync refactoring (part 4) (#11412)

* Remove direct dependency of `sc-network` on `sc-network-light`

* Move `WarpSyncProvider` trait and surrounding data structures into `sc-network-common`

* Move `WarpSyncProvider` trait and surrounding data structures into `sc-network-common`

* Create `sync` module in `sc-network-common`, create `ChainSync` trait there (not used yet), move a bunch of associated data structures from `sc-network-sync`

* Switch from concrete implementation to `ChainSync` trait from `sc-network-common`

* Introduce `OpaqueStateRequest`/`OpaqueStateResponse` to remove generics from `StateSync` trait

* Introduce `OpaqueBlockRequest`/`OpaqueBlockResponse`, make `schema` module of `sc-network-sync` private

* Surface `sc-network-sync` into `sc-service` and make `sc-network` not depend on it anymore

* Remove now unnecessary dependency from `sc-network`

* Replace crate links with just text since dependencies are gone now

* Remove `warp_sync` re-export from `sc-network-common`

* Update copyright in network-related files

* Address review comments about documentation

* Apply review suggestion

* Rename `extra_requests` module to `metrics`

Co-authored-by: Bastian Köcher <info@kchr.de>
This commit is contained in:
Nazar Mokrynskyi
2022-07-12 23:34:17 +03:00
committed by GitHub
parent 4b8d784210
commit 5896072b86
35 changed files with 1286 additions and 1041 deletions
+111 -199
View File
@@ -20,7 +20,6 @@ use crate::{
config, error,
request_responses::RequestFailure,
utils::{interval, LruHashSet},
warp_request_handler::{EncodedProof, WarpSyncProvider},
};
use bytes::Bytes;
@@ -42,21 +41,23 @@ use message::{
};
use notifications::{Notifications, NotificationsOut};
use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64};
use prost::Message as _;
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::{BlockImportError, BlockImportStatus, IncomingBlock, Origin};
use sc_network_common::config::ProtocolId;
use sc_network_sync::{
message::{
BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, BlockState,
FromBlock,
use sc_network_common::{
config::ProtocolId,
sync::{
message::{
BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, BlockState,
},
warp::{EncodedProof, WarpProofRequest},
BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest,
OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PollBlockAnnounceValidation,
SyncStatus,
},
schema::v1::StateResponse,
BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData,
PollBlockAnnounceValidation, Status as SyncStatus,
};
use sp_arithmetic::traits::SaturatedConversion;
use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
use sp_blockchain::HeaderMetadata;
use sp_consensus::BlockOrigin;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, CheckedSub, Header as HeaderT, NumberFor, Zero},
@@ -79,7 +80,6 @@ pub mod event;
pub mod message;
pub use notifications::{NotificationsSink, NotifsHandlerError, Ready};
use sp_blockchain::HeaderMetadata;
/// Interval at which we perform time based maintenance
const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
@@ -167,11 +167,12 @@ pub struct Protocol<B: BlockT, Client> {
tick_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
/// Pending list of messages to return from `poll` as a priority.
pending_messages: VecDeque<CustomMessageOutcome<B>>,
config: ProtocolConfig,
/// Assigned roles.
roles: Roles,
genesis_hash: B::Hash,
/// State machine that handles the list of in-progress requests. Only full node peers are
/// registered.
sync: ChainSync<B, Client>,
chain_sync: Box<dyn ChainSync<B>>,
// All connected peers. Contains both full and light node peers.
peers: HashMap<PeerId, Peer<B>>,
chain: Arc<Client>,
@@ -231,38 +232,6 @@ pub struct PeerInfo<B: BlockT> {
pub best_number: <B::Header as HeaderT>::Number,
}
/// Configuration for the Substrate-specific part of the networking layer.
#[derive(Clone)]
pub struct ProtocolConfig {
/// Assigned roles.
pub roles: Roles,
/// Maximum number of peers to ask the same blocks in parallel.
pub max_parallel_downloads: u32,
/// Enable state sync.
pub sync_mode: config::SyncMode,
}
impl ProtocolConfig {
fn sync_mode(&self) -> sc_network_sync::SyncMode {
if self.roles.is_light() {
sc_network_sync::SyncMode::Light
} else {
match self.sync_mode {
config::SyncMode::Full => sc_network_sync::SyncMode::Full,
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
sc_network_sync::SyncMode::LightState { skip_proofs, storage_chain_mode },
config::SyncMode::Warp => sc_network_sync::SyncMode::Warp,
}
}
}
}
impl Default for ProtocolConfig {
fn default() -> ProtocolConfig {
Self { roles: Roles::FULL, max_parallel_downloads: 5, sync_mode: config::SyncMode::Full }
}
}
/// Handshake sent when we open a block announces substream.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
struct BlockAnnouncesHandshake<B: BlockT> {
@@ -278,12 +247,12 @@ struct BlockAnnouncesHandshake<B: BlockT> {
impl<B: BlockT> BlockAnnouncesHandshake<B> {
fn build(
protocol_config: &ProtocolConfig,
roles: Roles,
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
) -> Self {
Self { genesis_hash, roles: protocol_config.roles, best_number, best_hash }
Self { genesis_hash, roles, best_number, best_hash }
}
}
@@ -300,24 +269,15 @@ where
{
/// Create a new instance.
pub fn new(
config: ProtocolConfig,
roles: Roles,
chain: Arc<Client>,
protocol_id: ProtocolId,
network_config: &config::NetworkConfiguration,
notifications_protocols_handshakes: Vec<Vec<u8>>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
metrics_registry: Option<&Registry>,
warp_sync_provider: Option<Arc<dyn WarpSyncProvider<B>>>,
) -> error::Result<(Protocol<B, Client>, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
chain_sync: Box<dyn ChainSync<B>>,
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let info = chain.info();
let sync = ChainSync::new(
config.sync_mode(),
chain.clone(),
block_announce_validator,
config.max_parallel_downloads,
warp_sync_provider,
)
.map_err(Box::new)?;
let boot_node_ids = {
let mut list = HashSet::new();
@@ -405,7 +365,7 @@ where
let genesis_hash = info.genesis_hash;
let block_announces_handshake =
BlockAnnouncesHandshake::<B>::build(&config, best_number, best_hash, genesis_hash)
BlockAnnouncesHandshake::<B>::build(roles, best_number, best_hash, genesis_hash)
.encode();
let sync_protocol_config = notifications::ProtocolConfig {
@@ -438,11 +398,11 @@ where
let protocol = Self {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
pending_messages: VecDeque::new(),
config,
roles,
peers: HashMap::new(),
chain,
genesis_hash: info.genesis_hash,
sync,
chain_sync,
important_peers,
default_peers_set_num_full: network_config.default_peers_set_num_full as usize,
default_peers_set_num_light: {
@@ -510,49 +470,49 @@ where
/// Current global sync state.
pub fn sync_state(&self) -> SyncStatus<B> {
self.sync.status()
self.chain_sync.status()
}
/// Target sync block number.
pub fn best_seen_block(&self) -> Option<NumberFor<B>> {
self.sync.status().best_seen_block
self.chain_sync.status().best_seen_block
}
/// Number of peers participating in syncing.
pub fn num_sync_peers(&self) -> u32 {
self.sync.status().num_peers
self.chain_sync.status().num_peers
}
/// Number of blocks in the import queue.
pub fn num_queued_blocks(&self) -> u32 {
self.sync.status().queued_blocks
self.chain_sync.status().queued_blocks
}
/// Number of downloaded blocks.
pub fn num_downloaded_blocks(&self) -> usize {
self.sync.num_downloaded_blocks()
self.chain_sync.num_downloaded_blocks()
}
/// Number of active sync requests.
pub fn num_sync_requests(&self) -> usize {
self.sync.num_sync_requests()
self.chain_sync.num_sync_requests()
}
/// Inform sync about new best imported block.
pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor<B>) {
debug!(target: "sync", "New best block imported {:?}/#{}", hash, number);
self.sync.update_chain_info(&hash, number);
self.chain_sync.update_chain_info(&hash, number);
self.behaviour.set_notif_protocol_handshake(
HARDCODED_PEERSETS_SYNC,
BlockAnnouncesHandshake::<B>::build(&self.config, number, hash, self.genesis_hash)
BlockAnnouncesHandshake::<B>::build(self.roles, number, hash, self.genesis_hash)
.encode(),
);
}
fn update_peer_info(&mut self, who: &PeerId) {
if let Some(info) = self.sync.peer_info(who) {
if let Some(info) = self.chain_sync.peer_info(who) {
if let Some(ref mut peer) = self.peers.get_mut(who) {
peer.info.best_hash = info.best_hash;
peer.info.best_number = info.best_number;
@@ -565,14 +525,6 @@ where
self.peers.iter().map(|(id, peer)| (id, &peer.info))
}
fn prepare_block_request(
&mut self,
who: PeerId,
request: BlockRequest<B>,
) -> CustomMessageOutcome<B> {
prepare_block_request::<B>(&mut self.peers, who, request)
}
/// Called by peer when it is disconnecting.
///
/// Returns a result if the handshake of this peer was indeed accepted.
@@ -584,7 +536,9 @@ where
}
if let Some(_peer_data) = self.peers.remove(&peer) {
if let Some(OnBlockData::Import(origin, blocks)) = self.sync.peer_disconnected(&peer) {
if let Some(OnBlockData::Import(origin, blocks)) =
self.chain_sync.peer_disconnected(&peer)
{
self.pending_messages
.push_back(CustomMessageOutcome::BlockImport(origin, blocks));
}
@@ -605,62 +559,9 @@ where
&mut self,
peer_id: PeerId,
request: BlockRequest<B>,
response: sc_network_sync::schema::v1::BlockResponse,
response: OpaqueBlockResponse,
) -> CustomMessageOutcome<B> {
let blocks = response
.blocks
.into_iter()
.map(|block_data| {
Ok(BlockData::<B> {
hash: Decode::decode(&mut block_data.hash.as_ref())?,
header: if !block_data.header.is_empty() {
Some(Decode::decode(&mut block_data.header.as_ref())?)
} else {
None
},
body: if request.fields.contains(BlockAttributes::BODY) {
Some(
block_data
.body
.iter()
.map(|body| Decode::decode(&mut body.as_ref()))
.collect::<Result<Vec<_>, _>>()?,
)
} else {
None
},
indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) {
Some(block_data.indexed_body)
} else {
None
},
receipt: if !block_data.receipt.is_empty() {
Some(block_data.receipt)
} else {
None
},
message_queue: if !block_data.message_queue.is_empty() {
Some(block_data.message_queue)
} else {
None
},
justification: if !block_data.justification.is_empty() {
Some(block_data.justification)
} else if block_data.is_empty_justification {
Some(Vec::new())
} else {
None
},
justifications: if !block_data.justifications.is_empty() {
Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?)
} else {
None
},
})
})
.collect::<Result<Vec<_>, codec::Error>>();
let blocks = match blocks {
let blocks = match self.chain_sync.block_response_into_blocks(&request, response) {
Ok(blocks) => blocks,
Err(err) => {
debug!(target: "sync", "Failed to decode block response from {}: {}", peer_id, err);
@@ -690,7 +591,7 @@ where
);
if request.fields == BlockAttributes::JUSTIFICATION {
match self.sync.on_block_justification(peer_id, block_response) {
match self.chain_sync.on_block_justification(peer_id, block_response) {
Ok(OnBlockJustification::Nothing) => CustomMessageOutcome::None,
Ok(OnBlockJustification::Import { peer, hash, number, justifications }) =>
CustomMessageOutcome::JustificationImport(peer, hash, number, justifications),
@@ -701,10 +602,11 @@ where
},
}
} else {
match self.sync.on_block_data(&peer_id, Some(request), block_response) {
match self.chain_sync.on_block_data(&peer_id, Some(request), block_response) {
Ok(OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req),
Ok(OnBlockData::Request(peer, req)) =>
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req),
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
@@ -719,9 +621,9 @@ where
pub fn on_state_response(
&mut self,
peer_id: PeerId,
response: StateResponse,
response: OpaqueStateResponse,
) -> CustomMessageOutcome<B> {
match self.sync.on_state_data(&peer_id, response) {
match self.chain_sync.on_state_data(&peer_id, response) {
Ok(OnStateData::Import(origin, block)) =>
CustomMessageOutcome::BlockImport(origin, vec![block]),
Ok(OnStateData::Continue) => CustomMessageOutcome::None,
@@ -738,9 +640,9 @@ where
pub fn on_warp_sync_response(
&mut self,
peer_id: PeerId,
response: crate::warp_request_handler::EncodedProof,
response: EncodedProof,
) -> CustomMessageOutcome<B> {
match self.sync.on_warp_sync_data(&peer_id, response) {
match self.chain_sync.on_warp_sync_data(&peer_id, response) {
Ok(()) => CustomMessageOutcome::None,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
@@ -798,7 +700,7 @@ where
return Err(())
}
if self.config.roles.is_light() {
if self.roles.is_light() {
// we're not interested in light peers
if status.roles.is_light() {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
@@ -821,14 +723,15 @@ where
}
}
if status.roles.is_full() && self.sync.num_peers() >= self.default_peers_set_num_full {
if status.roles.is_full() && self.chain_sync.num_peers() >= self.default_peers_set_num_full
{
debug!(target: "sync", "Too many full nodes, rejecting {}", who);
self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC);
return Err(())
}
if status.roles.is_light() &&
(self.peers.len() - self.sync.num_peers()) >= self.default_peers_set_num_light
(self.peers.len() - self.chain_sync.num_peers()) >= self.default_peers_set_num_light
{
// Make sure that not all slots are occupied by light clients.
debug!(target: "sync", "Too many light nodes, rejecting {}", who);
@@ -849,7 +752,7 @@ where
};
let req = if peer.info.roles.is_full() {
match self.sync.new_peer(who, peer.info.best_hash, peer.info.best_number) {
match self.chain_sync.new_peer(who, peer.info.best_hash, peer.info.best_number) {
Ok(req) => req,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
@@ -868,8 +771,12 @@ where
.push_back(CustomMessageOutcome::PeerNewBest(who, status.best_number));
if let Some(req) = req {
let event = self.prepare_block_request(who, req);
self.pending_messages.push_back(event);
self.pending_messages.push_back(prepare_block_request(
self.chain_sync.as_ref(),
&mut self.peers,
who,
req,
));
}
Ok(())
@@ -953,7 +860,7 @@ where
};
if peer.info.roles.is_full() {
self.sync.push_block_announce_validation(who, hash, announce, is_best);
self.chain_sync.push_block_announce_validation(who, hash, announce, is_best);
}
}
@@ -1010,7 +917,7 @@ where
// to import header from announced block let's construct response to request that normally
// would have been sent over network (but it is not in our case)
let blocks_to_import = self.sync.on_block_data(
let blocks_to_import = self.chain_sync.on_block_data(
&who,
None,
BlockResponse::<B> {
@@ -1035,7 +942,8 @@ where
match blocks_to_import {
Ok(OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req),
Ok(OnBlockData::Request(peer, req)) =>
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, peer, req),
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
@@ -1047,7 +955,7 @@ where
/// Call this when a block has been finalized. The sync layer may have some additional
/// requesting to perform.
pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) {
self.sync.on_block_finalized(&hash, *header.number())
self.chain_sync.on_block_finalized(&hash, *header.number())
}
/// Request a justification for the given block.
@@ -1055,12 +963,12 @@ where
/// Uses `protocol` to queue a new justification request and tries to dispatch all pending
/// requests.
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.sync.request_justification(hash, number)
self.chain_sync.request_justification(hash, number)
}
/// Clear all pending justification requests.
pub fn clear_justification_requests(&mut self) {
self.sync.clear_justification_requests();
self.chain_sync.clear_justification_requests();
}
/// Request syncing for the given block from given set of peers.
@@ -1072,7 +980,7 @@ where
hash: &B::Hash,
number: NumberFor<B>,
) {
self.sync.set_sync_fork_request(peers, hash, number)
self.chain_sync.set_sync_fork_request(peers, hash, number)
}
/// A batch of blocks have been processed, with or without errors.
@@ -1084,11 +992,12 @@ where
count: usize,
results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
) {
let results = self.sync.on_blocks_processed(imported, count, results);
let results = self.chain_sync.on_blocks_processed(imported, count, results);
for result in results {
match result {
Ok((id, req)) => {
self.pending_messages.push_back(prepare_block_request(
self.chain_sync.as_ref(),
&mut self.peers,
id,
req,
@@ -1111,7 +1020,7 @@ where
number: NumberFor<B>,
success: bool,
) {
self.sync.on_justification_import(hash, number, success);
self.chain_sync.on_justification_import(hash, number, success);
if !success {
info!("💔 Invalid justification provided by {} for #{}", who, hash);
self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC);
@@ -1228,12 +1137,22 @@ where
}
}
/// Encode implementation-specific block request.
pub fn encode_block_request(&self, request: &OpaqueBlockRequest) -> Result<Vec<u8>, String> {
self.chain_sync.encode_block_request(request)
}
/// Encode implementation-specific state request.
pub fn encode_state_request(&self, request: &OpaqueStateRequest) -> Result<Vec<u8>, String> {
self.chain_sync.encode_state_request(request)
}
fn report_metrics(&self) {
if let Some(metrics) = &self.metrics {
let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX);
metrics.peers.set(n);
let m = self.sync.metrics();
let m = self.chain_sync.metrics();
metrics.fork_targets.set(m.fork_targets.into());
metrics.queued_blocks.set(m.queued_blocks.into());
@@ -1259,6 +1178,7 @@ where
}
fn prepare_block_request<B: BlockT>(
chain_sync: &dyn ChainSync<B>,
peers: &mut HashMap<PeerId, Peer<B>>,
who: PeerId,
request: BlockRequest<B>,
@@ -1269,19 +1189,7 @@ fn prepare_block_request<B: BlockT>(
peer.request = Some((PeerRequest::Block(request.clone()), rx));
}
let request = sc_network_sync::schema::v1::BlockRequest {
fields: request.fields.to_be_u32(),
from_block: match request.from {
FromBlock::Hash(h) =>
Some(sc_network_sync::schema::v1::block_request::FromBlock::Hash(h.encode())),
FromBlock::Number(n) =>
Some(sc_network_sync::schema::v1::block_request::FromBlock::Number(n.encode())),
},
to_block: request.to.map(|h| h.encode()).unwrap_or_default(),
direction: request.direction as i32,
max_blocks: request.max.unwrap_or(0),
support_multiple_justifications: true,
};
let request = chain_sync.create_opaque_block_request(&request);
CustomMessageOutcome::BlockRequest { target: who, request, pending_response: tx }
}
@@ -1289,7 +1197,7 @@ fn prepare_block_request<B: BlockT>(
fn prepare_state_request<B: BlockT>(
peers: &mut HashMap<PeerId, Peer<B>>,
who: PeerId,
request: sc_network_sync::schema::v1::StateRequest,
request: OpaqueStateRequest,
) -> CustomMessageOutcome<B> {
let (tx, rx) = oneshot::channel();
@@ -1302,7 +1210,7 @@ fn prepare_state_request<B: BlockT>(
fn prepare_warp_sync_request<B: BlockT>(
peers: &mut HashMap<PeerId, Peer<B>>,
who: PeerId,
request: crate::warp_request_handler::Request<B>,
request: WarpProofRequest<B>,
) -> CustomMessageOutcome<B> {
let (tx, rx) = oneshot::channel();
@@ -1346,19 +1254,19 @@ pub enum CustomMessageOutcome<B: BlockT> {
/// A new block request must be emitted.
BlockRequest {
target: PeerId,
request: sc_network_sync::schema::v1::BlockRequest,
request: OpaqueBlockRequest,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// A new storage request must be emitted.
StateRequest {
target: PeerId,
request: sc_network_sync::schema::v1::StateRequest,
request: OpaqueStateRequest,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// A new warp sync request must be emitted.
WarpSyncRequest {
target: PeerId,
request: crate::warp_request_handler::Request<B>,
request: WarpProofRequest<B>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// Peer has a reported a new head of chain.
@@ -1455,10 +1363,8 @@ where
let (req, _) = peer.request.take().unwrap();
match req {
PeerRequest::Block(req) => {
let protobuf_response =
match sc_network_sync::schema::v1::BlockResponse::decode(
&resp[..],
) {
let response =
match self.chain_sync.decode_block_response(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(
@@ -1474,13 +1380,11 @@ where
},
};
finished_block_requests.push((*id, req, protobuf_response));
finished_block_requests.push((*id, req, response));
},
PeerRequest::State => {
let protobuf_response =
match sc_network_sync::schema::v1::StateResponse::decode(
&resp[..],
) {
let response =
match self.chain_sync.decode_state_response(&resp[..]) {
Ok(proto) => proto,
Err(e) => {
debug!(
@@ -1496,7 +1400,7 @@ where
},
};
finished_state_requests.push((*id, protobuf_response));
finished_state_requests.push((*id, response));
},
PeerRequest::WarpProof => {
finished_warp_sync_requests.push((*id, resp));
@@ -1555,12 +1459,12 @@ where
}
}
}
for (id, req, protobuf_response) in finished_block_requests {
let ev = self.on_block_response(id, req, protobuf_response);
for (id, req, response) in finished_block_requests {
let ev = self.on_block_response(id, req, response);
self.pending_messages.push_back(ev);
}
for (id, protobuf_response) in finished_state_requests {
let ev = self.on_state_response(id, protobuf_response);
for (id, response) in finished_state_requests {
let ev = self.on_state_response(id, response);
self.pending_messages.push_back(ev);
}
for (id, response) in finished_warp_sync_requests {
@@ -1572,25 +1476,32 @@ where
self.tick();
}
for (id, request) in self.sync.block_requests() {
let event = prepare_block_request(&mut self.peers, *id, request);
for (id, request) in self
.chain_sync
.block_requests()
.map(|(peer_id, request)| (*peer_id, request))
.collect::<Vec<_>>()
{
let event =
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, id, request);
self.pending_messages.push_back(event);
}
if let Some((id, request)) = self.sync.state_request() {
if let Some((id, request)) = self.chain_sync.state_request() {
let event = prepare_state_request(&mut self.peers, id, request);
self.pending_messages.push_back(event);
}
for (id, request) in self.sync.justification_requests() {
let event = prepare_block_request(&mut self.peers, id, request);
for (id, request) in self.chain_sync.justification_requests().collect::<Vec<_>>() {
let event =
prepare_block_request(self.chain_sync.as_ref(), &mut self.peers, id, request);
self.pending_messages.push_back(event);
}
if let Some((id, request)) = self.sync.warp_sync_request() {
if let Some((id, request)) = self.chain_sync.warp_sync_request() {
let event = prepare_warp_sync_request(&mut self.peers, id, request);
self.pending_messages.push_back(event);
}
// Check if there is any block announcement validation finished.
while let Poll::Ready(result) = self.sync.poll_block_announce_validation(cx) {
while let Poll::Ready(result) = self.chain_sync.poll_block_announce_validation(cx) {
match self.process_block_announce_validation_result(result) {
CustomMessageOutcome::None => {},
outcome => self.pending_messages.push_back(outcome),
@@ -1771,7 +1682,8 @@ where
// Make sure that the newly added block announce validation future was
// polled once to be registered in the task.
if let Poll::Ready(res) = self.sync.poll_block_announce_validation(cx) {
if let Poll::Ready(res) = self.chain_sync.poll_block_announce_validation(cx)
{
self.process_block_announce_validation_result(res)
} else {
CustomMessageOutcome::None