Network sync refactoring (part 2) (#11322)

* Move `api.v1.proto` schema into new crate `sc-network-sync`

* Move `sc_network::protocol::sync::state` module into `sc_network_sync::state`

* Move `sc_network::protocol::sync::blocks` module into `sc_network_sync::blocks` and some data structures from `sc_network::protocol::message` module into `sc_network_sync::message`

* Move some data structures from `sc_network::config` and `sc_network::request_responses` into new `sc-network-common` crate

* Move `sc_network::protocol::sync::warm` and `sc_network::warp_request_handler` modules into `sc_network_sync`

* Move `client/network/sync/src/lib.rs` to `client/network/sync/src/lib_old.rs` to preserve history of changes of the file in the next commit

* Move `client/network/src/protocol/sync.rs` on top of `client/network/sync/src/lib.rs` to preserve history of changes

* Move `sc_network::protocol::sync` to `sc_network_sync` with submodules, move message data structures around accordingly

* Move `sc_network::block_request_handler` to `sc_network_sync::block_request_handler`

* Move `sc_network::state_request_handler` to `sc_network_sync::state_request_handler`

* Add re-exports for compatibility reasons

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Nazar Mokrynskyi
2022-05-03 16:55:26 +03:00
committed by GitHub
parent 23c30b2b2a
commit e397e0b634
39 changed files with 806 additions and 495 deletions
+71 -68
View File
@@ -17,12 +17,10 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
config::{self, ProtocolId, WarpSyncProvider},
error,
config, error,
request_responses::RequestFailure,
schema::v1::StateResponse,
utils::{interval, LruHashSet},
warp_request_handler::EncodedProof,
warp_request_handler::{EncodedProof, WarpSyncProvider},
};
use bytes::Bytes;
@@ -43,13 +41,23 @@ use libp2p::{
use log::{debug, error, info, log, trace, warn, Level};
use message::{
generic::{Message as GenericMessage, Roles},
BlockAnnounce, Message,
Message,
};
use notifications::{Notifications, NotificationsOut};
use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64};
use prost::Message as _;
use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::import_queue::{BlockImportError, BlockImportStatus, IncomingBlock, Origin};
use sc_network_common::config::ProtocolId;
use sc_network_sync::{
message::{
BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, BlockState,
FromBlock,
},
schema::v1::StateResponse,
BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData,
PollBlockAnnounceValidation, Status as SyncStatus,
};
use sp_arithmetic::traits::SaturatedConversion;
use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin};
use sp_runtime::{
@@ -67,13 +75,11 @@ use std::{
task::Poll,
time,
};
use sync::{ChainSync, Status as SyncStatus};
mod notifications;
pub mod event;
pub mod message;
pub mod sync;
pub use notifications::{NotificationsSink, NotifsHandlerError, Ready};
use sp_blockchain::HeaderMetadata;
@@ -202,7 +208,7 @@ pub struct Protocol<B: BlockT, Client> {
#[derive(Debug)]
enum PeerRequest<B: BlockT> {
Block(message::BlockRequest<B>),
Block(BlockRequest<B>),
State,
WarpProof,
}
@@ -240,15 +246,15 @@ pub struct ProtocolConfig {
}
impl ProtocolConfig {
fn sync_mode(&self) -> sync::SyncMode {
fn sync_mode(&self) -> sc_network_sync::SyncMode {
if self.roles.is_light() {
sync::SyncMode::Light
sc_network_sync::SyncMode::Light
} else {
match self.sync_mode {
config::SyncMode::Full => sync::SyncMode::Full,
config::SyncMode::Full => sc_network_sync::SyncMode::Full,
config::SyncMode::Fast { skip_proofs, storage_chain_mode } =>
sync::SyncMode::LightState { skip_proofs, storage_chain_mode },
config::SyncMode::Warp => sync::SyncMode::Warp,
sc_network_sync::SyncMode::LightState { skip_proofs, storage_chain_mode },
config::SyncMode::Warp => sc_network_sync::SyncMode::Warp,
}
}
}
@@ -563,7 +569,7 @@ where
fn prepare_block_request(
&mut self,
who: PeerId,
request: message::BlockRequest<B>,
request: BlockRequest<B>,
) -> CustomMessageOutcome<B> {
prepare_block_request::<B>(&mut self.peers, who, request)
}
@@ -579,9 +585,7 @@ where
}
if let Some(_peer_data) = self.peers.remove(&peer) {
if let Some(sync::OnBlockData::Import(origin, blocks)) =
self.sync.peer_disconnected(&peer)
{
if let Some(OnBlockData::Import(origin, blocks)) = self.sync.peer_disconnected(&peer) {
self.pending_messages
.push_back(CustomMessageOutcome::BlockImport(origin, blocks));
}
@@ -601,21 +605,21 @@ where
pub fn on_block_response(
&mut self,
peer_id: PeerId,
request: message::BlockRequest<B>,
response: crate::schema::v1::BlockResponse,
request: BlockRequest<B>,
response: sc_network_sync::schema::v1::BlockResponse,
) -> CustomMessageOutcome<B> {
let blocks = response
.blocks
.into_iter()
.map(|block_data| {
Ok(message::BlockData::<B> {
Ok(BlockData::<B> {
hash: Decode::decode(&mut block_data.hash.as_ref())?,
header: if !block_data.header.is_empty() {
Some(Decode::decode(&mut block_data.header.as_ref())?)
} else {
None
},
body: if request.fields.contains(message::BlockAttributes::BODY) {
body: if request.fields.contains(BlockAttributes::BODY) {
Some(
block_data
.body
@@ -626,8 +630,7 @@ where
} else {
None
},
indexed_body: if request.fields.contains(message::BlockAttributes::INDEXED_BODY)
{
indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) {
Some(block_data.indexed_body)
} else {
None
@@ -667,7 +670,7 @@ where
},
};
let block_response = message::BlockResponse::<B> { id: request.id, blocks };
let block_response = BlockResponse::<B> { id: request.id, blocks };
let blocks_range = || match (
block_response
@@ -687,12 +690,12 @@ where
blocks_range(),
);
if request.fields == message::BlockAttributes::JUSTIFICATION {
if request.fields == BlockAttributes::JUSTIFICATION {
match self.sync.on_block_justification(peer_id, block_response) {
Ok(sync::OnBlockJustification::Nothing) => CustomMessageOutcome::None,
Ok(sync::OnBlockJustification::Import { peer, hash, number, justifications }) =>
Ok(OnBlockJustification::Nothing) => CustomMessageOutcome::None,
Ok(OnBlockJustification::Import { peer, hash, number, justifications }) =>
CustomMessageOutcome::JustificationImport(peer, hash, number, justifications),
Err(sync::BadPeer(id, repu)) => {
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
@@ -700,10 +703,10 @@ where
}
} else {
match self.sync.on_block_data(&peer_id, Some(request), block_response) {
Ok(sync::OnBlockData::Import(origin, blocks)) =>
Ok(OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(sync::OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req),
Err(sync::BadPeer(id, repu)) => {
Ok(OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req),
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
@@ -720,10 +723,10 @@ where
response: StateResponse,
) -> CustomMessageOutcome<B> {
match self.sync.on_state_data(&peer_id, response) {
Ok(sync::OnStateData::Import(origin, block)) =>
Ok(OnStateData::Import(origin, block)) =>
CustomMessageOutcome::BlockImport(origin, vec![block]),
Ok(sync::OnStateData::Continue) => CustomMessageOutcome::None,
Err(sync::BadPeer(id, repu)) => {
Ok(OnStateData::Continue) => CustomMessageOutcome::None,
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
@@ -740,7 +743,7 @@ where
) -> CustomMessageOutcome<B> {
match self.sync.on_warp_sync_data(&peer_id, response) {
Ok(()) => CustomMessageOutcome::None,
Err(sync::BadPeer(id, repu)) => {
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
@@ -849,7 +852,7 @@ where
let req = if peer.info.roles.is_full() {
match self.sync.new_peer(who, peer.info.best_hash, peer.info.best_number) {
Ok(req) => req,
Err(sync::BadPeer(id, repu)) => {
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
return Err(())
@@ -906,13 +909,9 @@ where
let inserted = peer.known_blocks.insert(hash);
if inserted {
trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
let message = message::BlockAnnounce {
let message = BlockAnnounce {
header: header.clone(),
state: if is_best {
Some(message::BlockState::Best)
} else {
Some(message::BlockState::Normal)
},
state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
data: Some(data.clone()),
};
@@ -949,9 +948,9 @@ where
peer.known_blocks.insert(hash);
let is_best = match announce.state.unwrap_or(message::BlockState::Best) {
message::BlockState::Best => true,
message::BlockState::Normal => false,
let is_best = match announce.state.unwrap_or(BlockState::Best) {
BlockState::Best => true,
BlockState::Normal => false,
};
if peer.info.roles.is_full() {
@@ -962,11 +961,11 @@ where
/// Process the result of the block announce validation.
fn process_block_announce_validation_result(
&mut self,
validation_result: sync::PollBlockAnnounceValidation<B::Header>,
validation_result: PollBlockAnnounceValidation<B::Header>,
) -> CustomMessageOutcome<B> {
let (header, is_best, who) = match validation_result {
sync::PollBlockAnnounceValidation::Skip => return CustomMessageOutcome::None,
sync::PollBlockAnnounceValidation::Nothing { is_best, who, announce } => {
PollBlockAnnounceValidation::Skip => return CustomMessageOutcome::None,
PollBlockAnnounceValidation::Nothing { is_best, who, announce } => {
self.update_peer_info(&who);
if let Some(data) = announce.data {
@@ -987,7 +986,7 @@ where
return CustomMessageOutcome::None
}
},
sync::PollBlockAnnounceValidation::ImportHeader { announce, is_best, who } => {
PollBlockAnnounceValidation::ImportHeader { announce, is_best, who } => {
self.update_peer_info(&who);
if let Some(data) = announce.data {
@@ -998,7 +997,7 @@ where
(announce.header, is_best, who)
},
sync::PollBlockAnnounceValidation::Failure { who, disconnect } => {
PollBlockAnnounceValidation::Failure { who, disconnect } => {
if disconnect {
self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC);
}
@@ -1015,9 +1014,9 @@ where
let blocks_to_import = self.sync.on_block_data(
&who,
None,
message::generic::BlockResponse {
BlockResponse::<B> {
id: 0,
blocks: vec![message::generic::BlockData {
blocks: vec![BlockData::<B> {
hash: header.hash(),
header: Some(header),
body: None,
@@ -1035,10 +1034,10 @@ where
}
match blocks_to_import {
Ok(sync::OnBlockData::Import(origin, blocks)) =>
Ok(OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
Ok(sync::OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req),
Err(sync::BadPeer(id, repu)) => {
Ok(OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req),
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
@@ -1096,7 +1095,7 @@ where
req,
));
},
Err(sync::BadPeer(id, repu)) => {
Err(BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC);
self.peerset_handle.report_peer(id, repu)
},
@@ -1263,7 +1262,7 @@ where
fn prepare_block_request<B: BlockT>(
peers: &mut HashMap<PeerId, Peer<B>>,
who: PeerId,
request: message::BlockRequest<B>,
request: BlockRequest<B>,
) -> CustomMessageOutcome<B> {
let (tx, rx) = oneshot::channel();
@@ -1271,13 +1270,13 @@ fn prepare_block_request<B: BlockT>(
peer.request = Some((PeerRequest::Block(request.clone()), rx));
}
let request = crate::schema::v1::BlockRequest {
let request = sc_network_sync::schema::v1::BlockRequest {
fields: request.fields.to_be_u32(),
from_block: match request.from {
message::FromBlock::Hash(h) =>
Some(crate::schema::v1::block_request::FromBlock::Hash(h.encode())),
message::FromBlock::Number(n) =>
Some(crate::schema::v1::block_request::FromBlock::Number(n.encode())),
FromBlock::Hash(h) =>
Some(sc_network_sync::schema::v1::block_request::FromBlock::Hash(h.encode())),
FromBlock::Number(n) =>
Some(sc_network_sync::schema::v1::block_request::FromBlock::Number(n.encode())),
},
to_block: request.to.map(|h| h.encode()).unwrap_or_default(),
direction: request.direction as i32,
@@ -1291,7 +1290,7 @@ fn prepare_block_request<B: BlockT>(
fn prepare_state_request<B: BlockT>(
peers: &mut HashMap<PeerId, Peer<B>>,
who: PeerId,
request: crate::schema::v1::StateRequest,
request: sc_network_sync::schema::v1::StateRequest,
) -> CustomMessageOutcome<B> {
let (tx, rx) = oneshot::channel();
@@ -1348,13 +1347,13 @@ pub enum CustomMessageOutcome<B: BlockT> {
/// A new block request must be emitted.
BlockRequest {
target: PeerId,
request: crate::schema::v1::BlockRequest,
request: sc_network_sync::schema::v1::BlockRequest,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// A new storage request must be emitted.
StateRequest {
target: PeerId,
request: crate::schema::v1::StateRequest,
request: sc_network_sync::schema::v1::StateRequest,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
/// A new warp sync request must be emitted.
@@ -1458,7 +1457,9 @@ where
match req {
PeerRequest::Block(req) => {
let protobuf_response =
match crate::schema::v1::BlockResponse::decode(&resp[..]) {
match sc_network_sync::schema::v1::BlockResponse::decode(
&resp[..],
) {
Ok(proto) => proto,
Err(e) => {
debug!(
@@ -1478,7 +1479,9 @@ where
},
PeerRequest::State => {
let protobuf_response =
match crate::schema::v1::StateResponse::decode(&resp[..]) {
match sc_network_sync::schema::v1::StateResponse::decode(
&resp[..],
) {
Ok(proto) => proto,
Err(e) => {
debug!(
@@ -1764,7 +1767,7 @@ where
},
NotificationsOut::Notification { peer_id, set_id, message } => match set_id {
HARDCODED_PEERSETS_SYNC if self.peers.contains_key(&peer_id) => {
if let Ok(announce) = message::BlockAnnounce::decode(&mut message.as_ref()) {
if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) {
self.push_block_announce_validation(peer_id, announce);
// Make sure that the newly added block announce validation future was