Remove sync::Context trait. (#3105)

Instead of passing a context around to each method, thereby introducing
side-effecting I/O actions everywhere, with this PR `sync::ChainSync`
only contains state which is updated by invoking various callback
methods (`on_*`) and actionable items are returned as regular results
from method calls, often iterators yielding requests that should be
issued to peers. It is up to the caller to handle these in an
appropriate way, currently `protocol` will send those as messages.
This commit is contained in:
Toralf Wittner
2019-07-12 20:37:38 +02:00
committed by Gavin Wood
parent e4d4548121
commit 9ee79d5c5e
4 changed files with 902 additions and 914 deletions
+1
View File
@@ -4542,6 +4542,7 @@ dependencies = [
"bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
"either 1.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
"erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
+1
View File
@@ -9,6 +9,7 @@ edition = "2018"
[dependencies] [dependencies]
bytes = "0.4" bytes = "0.4"
derive_more = "0.14.0" derive_more = "0.14.0"
either = "1.5.2"
log = "0.4" log = "0.4"
parking_lot = "0.8.0" parking_lot = "0.8.0"
bitflags = "1.0" bitflags = "1.0"
+99 -118
View File
@@ -28,17 +28,13 @@ use runtime_primitives::traits::{
Block as BlockT, Header as HeaderT, NumberFor, One, Zero, Block as BlockT, Header as HeaderT, NumberFor, One, Zero,
CheckedSub, SaturatedConversion CheckedSub, SaturatedConversion
}; };
use message::{ use message::{BlockAttributes, Direction, FromBlock, Message, RequestId};
BlockRequest as BlockRequestMessage,
FinalityProofRequest as FinalityProofRequestMessage, Message,
};
use message::{BlockAttributes, Direction, FromBlock, RequestId};
use message::generic::{Message as GenericMessage, ConsensusMessage}; use message::generic::{Message as GenericMessage, ConsensusMessage};
use event::Event; use event::Event;
use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use on_demand::{OnDemandCore, OnDemandNetwork, RequestData}; use on_demand::{OnDemandCore, OnDemandNetwork, RequestData};
use specialization::NetworkSpecialization; use specialization::NetworkSpecialization;
use sync::{ChainSync, Context as SyncContext, SyncState}; use sync::{ChainSync, SyncState};
use crate::service::{TransactionPool, ExHashT}; use crate::service::{TransactionPool, ExHashT};
use crate::config::{BoxFinalityProofRequestBuilder, Roles}; use crate::config::{BoxFinalityProofRequestBuilder, Roles};
use rustc_hex::ToHex; use rustc_hex::ToHex;
@@ -326,38 +322,6 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
} }
} }
impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext<B> for ProtocolContext<'a, B, H> {
fn report_peer(&mut self, who: PeerId, reputation: i32) {
self.peerset_handle.report_peer(who, reputation)
}
fn disconnect_peer(&mut self, who: PeerId) {
self.behaviour.disconnect_peer(&who)
}
fn client(&self) -> &dyn Client<B> {
&*self.context_data.chain
}
fn send_finality_proof_request(&mut self, who: PeerId, request: FinalityProofRequestMessage<B::Hash>) {
send_message(
self.behaviour,
&mut self.context_data.peers,
who,
GenericMessage::FinalityProofRequest(request)
)
}
fn send_block_request(&mut self, who: PeerId, request: BlockRequestMessage<B>) {
send_message(
self.behaviour,
&mut self.context_data.peers,
who,
GenericMessage::BlockRequest(request)
)
}
}
/// Data necessary to create a context. /// Data necessary to create a context.
struct ContextData<B: BlockT, H: ExHashT> { struct ContextData<B: BlockT, H: ExHashT> {
// All connected peers // All connected peers
@@ -394,7 +358,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
peerset_config: peerset::PeersetConfig, peerset_config: peerset::PeersetConfig,
) -> error::Result<(Protocol<B, S, H>, peerset::PeersetHandle)> { ) -> error::Result<(Protocol<B, S, H>, peerset::PeersetHandle)> {
let info = chain.info(); let info = chain.info();
let sync = ChainSync::new(config.roles, &info, finality_proof_request_builder); let sync = ChainSync::new(config.roles, chain.clone(), &info, finality_proof_request_builder);
let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config);
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>(); let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let behaviour = CustomProto::new(protocol_id, versions, peerset); let behaviour = CustomProto::new(protocol_id, versions, peerset);
@@ -661,7 +625,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
if peer_data.info.protocol_version > 2 { if peer_data.info.protocol_version > 2 {
self.consensus_gossip.peer_disconnected(&mut context, peer.clone()); self.consensus_gossip.peer_disconnected(&mut context, peer.clone());
} }
self.sync.peer_disconnected(&mut context, peer.clone()); self.sync.peer_disconnected(peer.clone());
self.specialization.on_disconnect(&mut context, peer.clone()); self.specialization.on_disconnect(&mut context, peer.clone());
self.on_demand_core.on_disconnect(OnDemandIn { self.on_demand_core.on_disconnect(OnDemandIn {
behaviour: &mut self.behaviour, behaviour: &mut self.behaviour,
@@ -792,29 +756,29 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
// TODO [andre]: move this logic to the import queue so that // TODO [andre]: move this logic to the import queue so that
// justifications are imported asynchronously (#1482) // justifications are imported asynchronously (#1482)
if request.fields == message::BlockAttributes::JUSTIFICATION { if request.fields == message::BlockAttributes::JUSTIFICATION {
let outcome = self.sync.on_block_justification_data( match self.sync.on_block_justification(peer, response) {
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), Ok(sync::OnBlockJustification::Nothing) => CustomMessageOutcome::None,
peer, Ok(sync::OnBlockJustification::Import { peer, hash, number, justification }) =>
response CustomMessageOutcome::JustificationImport(peer, hash, number, justification),
); Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
if let Some((origin, hash, nb, just)) = outcome { self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::JustificationImport(origin, hash, nb, just) CustomMessageOutcome::None
} else { }
CustomMessageOutcome::None
} }
} else { } else {
let outcome = self.sync.on_block_data( match self.sync.on_block_data(peer, request, response) {
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), Ok(sync::OnBlockData::Import(origin, blocks)) =>
peer, CustomMessageOutcome::BlockImport(origin, blocks),
request, Ok(sync::OnBlockData::Request(peer, req)) => {
response self.send_message(peer, GenericMessage::BlockRequest(req));
); CustomMessageOutcome::None
if let Some((origin, blocks)) = outcome { }
CustomMessageOutcome::BlockImport(origin, blocks) Err(sync::BadPeer(id, repu)) => {
} else { self.behaviour.disconnect_peer(&id);
CustomMessageOutcome::None self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
}
} }
} }
} }
@@ -827,7 +791,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle) &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)
); );
self.maintain_peers(); self.maintain_peers();
self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle));
self.on_demand_core.maintain_peers(OnDemandIn { self.on_demand_core.maintain_peers(OnDemandIn {
behaviour: &mut self.behaviour, behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(), peerset: self.peerset_handle.clone(),
@@ -953,8 +916,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
behaviour: &mut self.behaviour, behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(), peerset: self.peerset_handle.clone(),
}, who.clone(), status.roles, status.best_number); }, who.clone(), status.roles, status.best_number);
match self.sync.new_peer(who.clone(), info) {
Ok(None) => (),
Ok(Some(req)) => self.send_message(who.clone(), GenericMessage::BlockRequest(req)),
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu)
}
}
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
self.sync.new_peer(&mut context, who.clone(), info);
if protocol_version > 2 { if protocol_version > 2 {
self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles); self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles);
} }
@@ -1085,28 +1055,28 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
behaviour: &mut self.behaviour, behaviour: &mut self.behaviour,
peerset: self.peerset_handle.clone(), peerset: self.peerset_handle.clone(),
}, who.clone(), *header.number()); }, who.clone(), *header.number());
let try_import = self.sync.on_block_announce(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who.clone(),
hash,
&header,
);
// try_import is only true when we have all data required to import block match self.sync.on_block_announce(who.clone(), hash, &header) {
// in the BlockAnnounce message. This is only when: sync::OnBlockAnnounce::Request(peer, req) => {
// 1) we're on light client; self.send_message(peer, GenericMessage::BlockRequest(req));
// AND return CustomMessageOutcome::None
// - EITHER 2.1) announced block is stale; }
// - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e. sync::OnBlockAnnounce::Nothing => {
// there are no ascendants of this block scheduled for retrieval) // try_import is only true when we have all data required to import block
if !try_import { // in the BlockAnnounce message. This is only when:
return CustomMessageOutcome::None; // 1) we're on light client;
// AND
// - EITHER 2.1) announced block is stale;
// - OR 2.2) announced block is NEW and we normally only want to download this single block (i.e.
// there are no ascendants of this block scheduled for retrieval)
return CustomMessageOutcome::None
}
sync::OnBlockAnnounce::ImportHeader => () // We proceed with the import.
} }
// to import header from announced block let's construct response to request that normally would have // to import header from announced block let's construct response to request that normally would have
// been sent over network (but it is not in our case) // been sent over network (but it is not in our case)
let blocks_to_import = self.sync.on_block_data( let blocks_to_import = self.sync.on_block_data(
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
who.clone(), who.clone(),
message::generic::BlockRequest { message::generic::BlockRequest {
id: 0, id: 0,
@@ -1131,8 +1101,16 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}, },
); );
match blocks_to_import { match blocks_to_import {
Some((origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks),
None => CustomMessageOutcome::None, Ok(sync::OnBlockData::Request(peer, req)) => {
self.send_message(peer, GenericMessage::BlockRequest(req));
CustomMessageOutcome::None
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::None
}
} }
} }
@@ -1166,11 +1144,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Call this when a block has been finalized. The sync layer may have some additional /// Call this when a block has been finalized. The sync layer may have some additional
/// requesting to perform. /// requesting to perform.
pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) {
self.sync.on_block_finalized( self.sync.on_block_finalized(&hash, *header.number())
&hash,
*header.number(),
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle),
);
} }
fn on_remote_call_request( fn on_remote_call_request(
@@ -1217,9 +1191,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Uses `protocol` to queue a new justification request and tries to dispatch all pending /// Uses `protocol` to queue a new justification request and tries to dispatch all pending
/// requests. /// requests.
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) { pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let mut context = self.sync.request_justification(&hash, number)
ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
self.sync.request_justification(&hash, number, &mut context);
} }
/// Clears all pending justification requests. /// Clears all pending justification requests.
@@ -1230,43 +1202,43 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// A batch of blocks have been processed, with or without errors. /// A batch of blocks have been processed, with or without errors.
/// Call this when a batch of blocks have been processed by the import queue, with or without /// Call this when a batch of blocks have been processed by the import queue, with or without
/// errors. /// errors.
pub fn blocks_processed( pub fn blocks_processed(&mut self, processed_blocks: Vec<B::Hash>, has_error: bool) {
&mut self, self.sync.on_blocks_processed(processed_blocks, has_error);
processed_blocks: Vec<B::Hash>,
has_error: bool
) {
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
self.sync.blocks_processed(&mut context, processed_blocks, has_error);
} }
/// Restart the sync process. /// Restart the sync process.
pub fn restart(&mut self) { pub fn restart(&mut self) {
let peers = self.context_data.peers.clone(); let peers = self.context_data.peers.clone();
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); for result in self.sync.restart(|peer_id| peers.get(peer_id).map(|i| i.info.clone())) {
self.sync.restart(&mut context, |peer_id| peers.get(peer_id).map(|i| i.info.clone())); match result {
Ok((id, req)) => {
let msg = GenericMessage::BlockRequest(req);
send_message(&mut self.behaviour, &mut self.context_data.peers, id, msg)
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
self.peerset_handle.report_peer(id, repu)
}
}
}
} }
/// Notify about successful import of the given block. /// Notify about successful import of the given block.
pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) { pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.sync.block_imported(hash, number) trace!(target: "sync", "Block imported successfully {} ({})", number, hash)
} }
/// Call this when a justification has been processed by the import queue, with or without /// Call this when a justification has been processed by the import queue, with or without
/// errors. /// errors.
pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) { pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
self.sync.justification_import_result(hash, number, success) self.sync.on_justification_import(hash, number, success)
} }
/// Request a finality proof for the given block. /// Request a finality proof for the given block.
/// ///
/// Queues a new finality proof request and tries to dispatch all pending requests. /// Queues a new finality proof request and tries to dispatch all pending requests.
pub fn request_finality_proof( pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
&mut self, self.sync.request_finality_proof(&hash, number)
hash: &B::Hash,
number: NumberFor<B>
) {
let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle);
self.sync.request_finality_proof(&hash, number, &mut context);
} }
pub fn finality_proof_import_result( pub fn finality_proof_import_result(
@@ -1274,7 +1246,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
request_block: (B::Hash, NumberFor<B>), request_block: (B::Hash, NumberFor<B>),
finalization_result: Result<(B::Hash, NumberFor<B>), ()>, finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
) { ) {
self.sync.finality_proof_import_result(request_block, finalization_result) self.sync.on_finality_proof_import(request_block, finalization_result)
} }
fn on_remote_call_response( fn on_remote_call_response(
@@ -1475,16 +1447,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
response: message::FinalityProofResponse<B::Hash>, response: message::FinalityProofResponse<B::Hash>,
) -> CustomMessageOutcome<B> { ) -> CustomMessageOutcome<B> {
trace!(target: "sync", "Finality proof response from {} for {}", who, response.block); trace!(target: "sync", "Finality proof response from {} for {}", who, response.block);
let outcome = self.sync.on_block_finality_proof_data( match self.sync.on_block_finality_proof(who, response) {
&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), Ok(sync::OnBlockFinalityProof::Nothing) => CustomMessageOutcome::None,
who, Ok(sync::OnBlockFinalityProof::Import { peer, hash, number, proof }) =>
response, CustomMessageOutcome::FinalityProofImport(peer, hash, number, proof),
); Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
if let Some((origin, hash, nb, proof)) = outcome { self.peerset_handle.report_peer(id, repu);
CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) CustomMessageOutcome::None
} else { }
CustomMessageOutcome::None
} }
} }
@@ -1575,6 +1546,16 @@ Protocol<B, S, H> {
self.propagate_extrinsics(); self.propagate_extrinsics();
} }
for (id, r) in self.sync.block_requests() {
send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r))
}
for (id, r) in self.sync.justification_requests() {
send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::BlockRequest(r))
}
for (id, r) in self.sync.finality_proof_requests() {
send_message(&mut self.behaviour, &mut self.context_data.peers, id, GenericMessage::FinalityProofRequest(r))
}
let event = match self.behaviour.poll(params) { let event = match self.behaviour.poll(params) {
Async::NotReady => return Async::NotReady, Async::NotReady => return Async::NotReady,
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev, Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev,
File diff suppressed because it is too large Load Diff