Move ProtocolMsg to service.rs (#2560)

* Move ProtocolMsg to service.rs

* Fix line width

* Fix line width again

* Correct whitespace

* Another whitespace correction
This commit is contained in:
Pierre Krieger
2019-05-14 15:26:52 +02:00
committed by André Silva
parent a29fd10859
commit 18f7acce98
3 changed files with 388 additions and 188 deletions
+192 -177
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use futures::{prelude::*, sync::mpsc};
use futures::prelude::*;
use network_libp2p::PeerId;
use primitives::storage::StorageKey;
use consensus::{import_queue::IncomingBlock, import_queue::Origin, BlockOrigin};
@@ -77,7 +77,6 @@ const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12);
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
network_chan: NetworkChan<B>,
port: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
/// Interval at which we call `tick`.
tick_timeout: tokio::timer::Interval,
/// Interval at which we call `propagate_extrinsics`.
@@ -187,13 +186,19 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
}
fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) {
send_message(&mut self.context_data.peers, &self.network_chan, who,
send_message(
&mut self.context_data.peers,
&self.network_chan,
who,
GenericMessage::Consensus(consensus)
)
}
fn send_chain_specific(&mut self, who: PeerId, message: Vec<u8>) {
send_message(&mut self.context_data.peers, &self.network_chan, who,
send_message(
&mut self.context_data.peers,
&self.network_chan,
who,
GenericMessage::ChainSpecific(message)
)
}
@@ -217,13 +222,19 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext<B> for ProtocolContext<'a,
}
fn send_finality_proof_request(&mut self, who: PeerId, request: FinalityProofRequestMessage<B::Hash>) {
send_message(&mut self.context_data.peers, &self.network_chan, who,
send_message(
&mut self.context_data.peers,
&self.network_chan,
who,
GenericMessage::FinalityProofRequest(request)
)
}
fn send_block_request(&mut self, who: PeerId, request: BlockRequestMessage<B>) {
send_message(&mut self.context_data.peers, &self.network_chan, who,
send_message(
&mut self.context_data.peers,
&self.network_chan,
who,
GenericMessage::BlockRequest(request)
)
}
@@ -237,70 +248,6 @@ struct ContextData<B: BlockT, H: ExHashT> {
pub finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>,
}
/// A task, consisting of a user-provided closure, to be executed on the Protocol thread.
pub trait SpecTask<B: BlockT, S: NetworkSpecialization<B>> {
fn call_box(self: Box<Self>, spec: &mut S, context: &mut Context<B>);
}
impl<B: BlockT, S: NetworkSpecialization<B>, F: FnOnce(&mut S, &mut Context<B>)> SpecTask<B, S> for F {
fn call_box(self: Box<F>, spec: &mut S, context: &mut Context<B>) {
(*self)(spec, context)
}
}
/// A task, consisting of a user-provided closure, to be executed on the Protocol thread.
pub trait GossipTask<B: BlockT> {
fn call_box(self: Box<Self>, gossip: &mut ConsensusGossip<B>, context: &mut Context<B>);
}
impl<B: BlockT, F: FnOnce(&mut ConsensusGossip<B>, &mut Context<B>)> GossipTask<B> for F {
fn call_box(self: Box<F>, gossip: &mut ConsensusGossip<B>, context: &mut Context<B>) {
(*self)(gossip, context)
}
}
/// Messages sent to Protocol from elsewhere inside the system.
pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
/// A batch of blocks has been processed, with or without errors.
BlocksProcessed(Vec<B::Hash>, bool),
/// Tell protocol to restart sync.
RestartSync,
/// Tell protocol to propagate extrinsics.
PropagateExtrinsics,
/// Tell protocol that a block was imported (sent by the import-queue).
BlockImportedSync(B::Hash, NumberFor<B>),
/// Tell protocol to clear all pending justification requests.
ClearJustificationRequests,
/// Tell protocol to request justification for a block.
RequestJustification(B::Hash, NumberFor<B>),
/// Inform protocol whether a justification was successfully imported.
JustificationImportResult(B::Hash, NumberFor<B>, bool),
/// Set finality proof request builder.
SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder<B>),
/// Tell protocol to request finality proof for a block.
RequestFinalityProof(B::Hash, NumberFor<B>),
/// Inform protocol whether a finality proof was successfully imported.
FinalityProofImportResult((B::Hash, NumberFor<B>), Result<(B::Hash, NumberFor<B>), ()>),
/// Propagate a block to peers.
AnnounceBlock(B::Hash),
/// A block has been imported (sent by the client).
BlockImported(B::Hash, B::Header),
/// A block has been finalized (sent by the client).
BlockFinalized(B::Hash, B::Header),
/// Execute a closure with the chain-specific network specialization.
ExecuteWithSpec(Box<SpecTask<B, S> + Send + 'static>),
/// Execute a closure with the consensus gossip.
ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
/// Incoming gossip consensus message.
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, GossipMessageRecipient),
/// Tell protocol to perform regular maintenance.
#[cfg(any(test, feature = "test-helpers"))]
Tick,
/// Synchronization request.
#[cfg(any(test, feature = "test-helpers"))]
Synchronize,
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Create a new instance.
pub fn new(
@@ -312,13 +259,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
on_demand: Option<Arc<OnDemandService<B>>>,
transaction_pool: Arc<TransactionPool<H, B>>,
specialization: S,
) -> error::Result<(Protocol<B, S, H>, mpsc::UnboundedSender<ProtocolMsg<B, S>>)> {
let (protocol_sender, port) = mpsc::unbounded();
) -> error::Result<Protocol<B, S, H>> {
let info = chain.info()?;
let sync = ChainSync::new(config.roles, &info);
let protocol = Protocol {
Ok(Protocol {
network_chan,
port,
tick_timeout: tokio::timer::Interval::new_interval(TICK_TIMEOUT),
propagate_timeout: tokio::timer::Interval::new_interval(PROPAGATE_TIMEOUT),
config: config,
@@ -335,9 +280,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
handshaking_peers: HashMap::new(),
connected_peers,
transaction_pool: transaction_pool,
};
Ok((protocol, protocol_sender))
})
}
/// Returns an object representing the status of the protocol.
@@ -376,84 +319,19 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Future for Protocol<B,
self.propagate_extrinsics();
}
loop {
match self.port.poll() {
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(msg))) => if !self.handle_client_msg(msg) {
return Ok(Async::Ready(()))
}
Ok(Async::NotReady) => break,
}
}
Ok(Async::NotReady)
}
}
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
fn handle_client_msg(&mut self, msg: ProtocolMsg<B, S>) -> bool {
match msg {
ProtocolMsg::BlockImported(hash, header) => self.on_block_imported(hash, &header),
ProtocolMsg::BlockFinalized(hash, header) => self.on_block_finalized(hash, &header),
ProtocolMsg::ExecuteWithSpec(task) => {
let mut context =
ProtocolContext::new(&mut self.context_data, &self.network_chan);
task.call_box(&mut self.specialization, &mut context);
},
ProtocolMsg::ExecuteWithGossip(task) => {
let mut context =
ProtocolContext::new(&mut self.context_data, &self.network_chan);
task.call_box(&mut self.consensus_gossip, &mut context);
}
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => {
self.gossip_consensus_message(topic, engine_id, message, recipient)
}
ProtocolMsg::BlocksProcessed(hashes, has_error) => {
self.sync.blocks_processed(hashes, has_error);
let mut context =
ProtocolContext::new(&mut self.context_data, &self.network_chan);
self.sync.maintain_sync(&mut context);
},
ProtocolMsg::RestartSync => {
let mut context =
ProtocolContext::new(&mut self.context_data, &self.network_chan);
self.sync.restart(&mut context);
}
ProtocolMsg::AnnounceBlock(hash) => self.announce_block(hash),
ProtocolMsg::BlockImportedSync(hash, number) => self.sync.block_imported(&hash, number),
ProtocolMsg::ClearJustificationRequests => self.sync.clear_justification_requests(),
ProtocolMsg::RequestJustification(hash, number) => {
let mut context =
ProtocolContext::new(&mut self.context_data, &self.network_chan);
self.sync.request_justification(&hash, number, &mut context);
},
ProtocolMsg::JustificationImportResult(hash, number, success) => self.sync.justification_import_result(hash, number, success),
ProtocolMsg::SetFinalityProofRequestBuilder(builder) => self.sync.set_finality_proof_request_builder(builder),
ProtocolMsg::RequestFinalityProof(hash, number) => {
let mut context =
ProtocolContext::new(&mut self.context_data, &self.network_chan);
self.sync.request_finality_proof(&hash, number, &mut context);
},
ProtocolMsg::FinalityProofImportResult(
requested_block,
finalziation_result,
) => self.sync.finality_proof_import_result(requested_block, finalziation_result),
ProtocolMsg::PropagateExtrinsics => self.propagate_extrinsics(),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Tick => self.tick(),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Synchronize => {
trace!(target: "sync", "handle_client_msg: received Synchronize msg");
self.network_chan.send(NetworkMsg::Synchronized)
}
}
true
}
fn handle_response(&mut self, who: PeerId, response: &message::BlockResponse<B>) -> Option<message::BlockRequest<B>> {
fn handle_response(
&mut self,
who: PeerId,
response: &message::BlockResponse<B>
) -> Option<message::BlockRequest<B>> {
if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
if let Some(_) = peer.obsolete_requests.remove(&response.id) {
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", who, response.id,);
trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", who, response.id);
return None;
}
// Clear the request. If the response is invalid peer will be disconnected anyway.
@@ -536,7 +414,20 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
);
}
fn gossip_consensus_message(
/// Locks `self` and returns a context plus the `ConsensusGossip` struct.
pub fn consensus_gossip_lock<'a>(&'a mut self) -> (impl Context<B> + 'a, &'a mut ConsensusGossip<B>) {
let context = ProtocolContext::new(&mut self.context_data, &self.network_chan);
(context, &mut self.consensus_gossip)
}
/// Locks `self` and returns a context plus the network specialization.
pub fn specialization_lock<'a>(&'a mut self) -> (impl Context<B> + 'a, &'a mut S) {
let context = ProtocolContext::new(&mut self.context_data, &self.network_chan);
(context, &mut self.specialization)
}
/// Gossip a consensus message to the network.
pub fn gossip_consensus_message(
&mut self,
topic: B::Hash,
engine_id: ConsensusEngineId,
@@ -684,15 +575,19 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
response: message::BlockResponse<B>,
) -> CustomMessageOutcome<B> {
let blocks_range = match (
response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target: "sync", "BlockResponse {} from {} with {} blocks {}",
response.id, peer, response.blocks.len(), blocks_range);
response.id,
peer,
response.blocks.len(),
blocks_range
);
// TODO [andre]: move this logic to the import queue so that
// justifications are imported asynchronously (#1482)
@@ -711,7 +606,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
} else {
let outcome = self.sync.on_block_data(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan), peer, request, response);
let outcome = self.sync.on_block_data(
&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
peer,
request,
response
);
if let Some((origin, blocks)) = outcome {
CustomMessageOutcome::BlockImport(origin, blocks)
} else {
@@ -721,7 +621,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
/// Perform time based maintenance.
fn tick(&mut self) {
///
/// > **Note**: This method normally doesn't have to be called except for testing purposes.
pub fn tick(&mut self) {
self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan));
self.maintain_peers();
self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan));
@@ -743,7 +645,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
aborting.push(who.clone());
}
}
for (who, _) in self.handshaking_peers.iter().filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC) {
for (who, _) in self.handshaking_peers.iter()
.filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC)
{
trace!(target: "sync", "Handshake timeout {}", who);
aborting.push(who.clone());
}
@@ -795,7 +699,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
.unwrap_or(0);
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE));
self.network_chan.send(
NetworkMsg::ReportPeer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE)
);
self.network_chan.send(NetworkMsg::DisconnectPeer(who));
return;
}
@@ -867,8 +773,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
/// Called when we propagate ready extrinsics to peers.
fn propagate_extrinsics(&mut self) {
/// Call when we must propagate ready extrinsics to peers.
pub fn propagate_extrinsics(&mut self) {
debug!(target: "sync", "Propagating extrinsics");
// Accept transactions only when fully synced
@@ -903,7 +809,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
///
/// In chain-based consensus, we often need to make sure non-best forks are
/// at least temporarily synced.
fn announce_block(&mut self, hash: B::Hash) {
pub fn announce_block(&mut self, hash: B::Hash) {
let header = match self.context_data.chain.header(&BlockId::Hash(hash)) {
Ok(Some(header)) => header,
Ok(None) => {
@@ -961,7 +867,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
);
}
fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) {
/// Call this when a block has been imported in the import queue and we should announce it on
/// the network.
pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) {
self.sync.update_chain_info(header);
self.specialization.on_block_imported(
&mut ProtocolContext::new(&mut self.context_data, &self.network_chan),
@@ -986,7 +894,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
}
fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) {
/// Call this when a block has been finalized. The sync layer may have some additional
/// requesting to perform.
pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) {
self.sync.on_block_finalized(
&hash,
*header.number(),
@@ -999,7 +909,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
who: PeerId,
request: message::RemoteCallRequest<B::Hash>,
) {
trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, who, request.method, request.block);
trace!(target: "sync", "Remote call request {} from {} ({} at {})",
request.id,
who,
request.method,
request.block
);
let proof = match self.context_data.chain.execution_proof(
&request.block,
&request.method,
@@ -1008,7 +923,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
Ok((_, proof)) => proof,
Err(error) => {
trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}",
request.id, who, request.method, request.block, error);
request.id,
who,
request.method,
request.block,
error
);
self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), RPC_FAILED_REPUTATION_CHANGE));
Default::default()
}
@@ -1023,6 +943,68 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
);
}
/// Request a justification for the given block.
///
/// Uses `protocol` to queue a new justification request and tries to dispatch all pending
/// requests.
pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let mut context =
ProtocolContext::new(&mut self.context_data, &self.network_chan);
self.sync.request_justification(&hash, number, &mut context);
}
/// Clears all pending justification requests.
pub fn clear_justification_requests(&mut self) {
self.sync.clear_justification_requests()
}
/// A batch of blocks have been processed, with or without errors.
/// Call this when a batch of blocks have been processed by the import queue, with or without
/// errors.
pub fn blocks_processed(&mut self, processed_blocks: Vec<B::Hash>, has_error: bool) {
self.sync.blocks_processed(processed_blocks, has_error);
let mut context =
ProtocolContext::new(&mut self.context_data, &self.network_chan);
self.sync.maintain_sync(&mut context);
}
/// Restart the sync process.
pub fn restart(&mut self) {
let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan);
self.sync.restart(&mut context);
}
/// Notify about successful import of the given block.
pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.sync.block_imported(hash, number)
}
pub fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder<B>) {
self.sync.set_finality_proof_request_builder(request_builder)
}
/// Call this when a justification has been processed by the import queue, with or without
/// errors.
pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
self.sync.justification_import_result(hash, number, success)
}
/// Request a finality proof for the given block.
///
/// Queues a new finality proof request and tries to dispatch all pending requests.
pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan);
self.sync.request_finality_proof(&hash, number, &mut context);
}
pub fn finality_proof_import_result(
&mut self,
request_block: (B::Hash, NumberFor<B>),
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
) {
self.sync.finality_proof_import_result(request_block, finalization_result)
}
fn on_remote_call_response(&mut self, who: PeerId, response: message::RemoteCallResponse) {
trace!(target: "sync", "Remote call response {} from {}", response.id, who);
self.on_demand
@@ -1041,7 +1023,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
Ok(proof) => proof,
Err(error) => {
trace!(target: "sync", "Remote read request {} from {} ({} at {}) failed with: {}",
request.id, who, request.key.to_hex::<String>(), request.block, error);
request.id,
who,
request.key.to_hex::<String>(),
request.block,
error
);
Default::default()
}
};
@@ -1071,7 +1058,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
Ok((header, proof)) => (Some(header), proof),
Err(error) => {
trace!(target: "sync", "Remote header proof request {} from {} ({}) failed with: {}",
request.id, who, request.block, error);
request.id,
who,
request.block,
error
);
(Default::default(), Default::default())
}
};
@@ -1102,13 +1093,30 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
request: message::RemoteChangesRequest<B::Hash>,
) {
trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})",
request.id, who, request.key.to_hex::<String>(), request.first, request.last);
request.id,
who,
request.key.to_hex::<String>(),
request.first,
request.last
);
let key = StorageKey(request.key);
let proof = match self.context_data.chain.key_changes_proof(request.first, request.last, request.min, request.max, &key) {
let proof = match self.context_data.chain.key_changes_proof(
request.first,
request.last,
request.min,
request.max,
&key
) {
Ok(proof) => proof,
Err(error) => {
trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{}) failed with: {}",
request.id, who, key.0.to_hex::<String>(), request.first, request.last, error);
request.id,
who,
key.0.to_hex::<String>(),
request.first,
request.last,
error
);
ChangesProof::<B::Header> {
max_block: Zero::zero(),
proof: vec![],
@@ -1135,7 +1143,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
response: message::RemoteChangesResponse<NumberFor<B>, B::Hash>,
) {
trace!(target: "sync", "Remote changes proof response {} from {} (max={})",
response.id, who, response.max);
response.id,
who,
response.max
);
self.on_demand
.as_ref()
.map(|s| s.on_remote_changes_response(who, response));
@@ -1149,13 +1160,17 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
trace!(target: "sync", "Finality proof request from {} for {}", who, request.block);
let finality_proof = self.context_data.finality_proof_provider.as_ref()
.ok_or_else(|| String::from("Finality provider is not configured"))
.and_then(|provider| provider.prove_finality(request.block, &request.request)
.map_err(|e| e.to_string()));
.and_then(|provider|
provider.prove_finality(request.block, &request.request).map_err(|e| e.to_string())
);
let finality_proof = match finality_proof {
Ok(finality_proof) => finality_proof,
Err(error) => {
trace!(target: "sync", "Finality proof request from {} for {} failed with: {}",
who, request.block, error);
who,
request.block,
error
);
None
},
};
+131 -3
View File
@@ -31,7 +31,7 @@ use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::message::Message;
use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo};
use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer, ProtocolStatus, PeerInfo};
use crate::config::Params;
use crate::error::Error;
use crate::specialization::NetworkSpecialization;
@@ -200,12 +200,13 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
import_queue: Box<ImportQueue<B>>,
) -> Result<Arc<Service<B, S>>, Error> {
let (network_chan, network_port) = network_channel();
let (protocol_sender, protocol_rx) = mpsc::unbounded();
let status_sinks = Arc::new(Mutex::new(Vec::new()));
// Start in off-line mode, since we're not connected to any nodes yet.
let is_offline = Arc::new(AtomicBool::new(true));
let is_major_syncing = Arc::new(AtomicBool::new(false));
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>> = Arc::new(Default::default());
let (protocol, protocol_sender) = Protocol::new(
let protocol = Protocol::new(
peers.clone(),
network_chan.clone(),
params.config,
@@ -223,6 +224,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
protocol,
import_queue.clone(),
network_port,
protocol_rx,
status_sinks.clone(),
params.network_config,
registered,
@@ -520,6 +522,70 @@ pub enum NetworkMsg<B: BlockT + 'static> {
Synchronized,
}
/// Messages sent to Protocol from elsewhere inside the system.
pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> {
/// A batch of blocks has been processed, with or without errors.
BlocksProcessed(Vec<B::Hash>, bool),
/// Tell protocol to restart sync.
RestartSync,
/// Tell protocol to propagate extrinsics.
PropagateExtrinsics,
/// Tell protocol that a block was imported (sent by the import-queue).
BlockImportedSync(B::Hash, NumberFor<B>),
/// Tell protocol to clear all pending justification requests.
ClearJustificationRequests,
/// Tell protocol to request justification for a block.
RequestJustification(B::Hash, NumberFor<B>),
/// Inform protocol whether a justification was successfully imported.
JustificationImportResult(B::Hash, NumberFor<B>, bool),
/// Set finality proof request builder.
SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder<B>),
/// Tell protocol to request finality proof for a block.
RequestFinalityProof(B::Hash, NumberFor<B>),
/// Inform protocol whether a finality proof was successfully imported.
FinalityProofImportResult((B::Hash, NumberFor<B>), Result<(B::Hash, NumberFor<B>), ()>),
/// Propagate a block to peers.
AnnounceBlock(B::Hash),
/// A block has been imported (sent by the client).
BlockImported(B::Hash, B::Header),
/// A block has been finalized (sent by the client).
BlockFinalized(B::Hash, B::Header),
/// Execute a closure with the chain-specific network specialization.
ExecuteWithSpec(Box<SpecTask<B, S> + Send + 'static>),
/// Execute a closure with the consensus gossip.
ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>),
/// Incoming gossip consensus message.
GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, GossipMessageRecipient),
/// Tell protocol to perform regular maintenance.
#[cfg(any(test, feature = "test-helpers"))]
Tick,
/// Synchronization request.
#[cfg(any(test, feature = "test-helpers"))]
Synchronize,
}
/// A task, consisting of a user-provided closure, to be executed on the Protocol thread.
pub trait SpecTask<B: BlockT, S: NetworkSpecialization<B>> {
fn call_box(self: Box<Self>, spec: &mut S, context: &mut Context<B>);
}
impl<B: BlockT, S: NetworkSpecialization<B>, F: FnOnce(&mut S, &mut Context<B>)> SpecTask<B, S> for F {
fn call_box(self: Box<F>, spec: &mut S, context: &mut Context<B>) {
(*self)(spec, context)
}
}
/// A task, consisting of a user-provided closure, to be executed on the Protocol thread.
pub trait GossipTask<B: BlockT> {
fn call_box(self: Box<Self>, gossip: &mut ConsensusGossip<B>, context: &mut Context<B>);
}
impl<B: BlockT, F: FnOnce(&mut ConsensusGossip<B>, &mut Context<B>)> GossipTask<B> for F {
fn call_box(self: Box<F>, gossip: &mut ConsensusGossip<B>, context: &mut Context<B>) {
(*self)(gossip, context)
}
}
/// Starts the background thread that handles the networking.
fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
is_offline: Arc<AtomicBool>,
@@ -527,6 +593,7 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
protocol: Protocol<B, S, H>,
import_queue: Box<ImportQueue<B>>,
network_port: NetworkPort<B>,
protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
config: NetworkConfiguration,
registered: RegisteredProtocol<Message<B>>,
@@ -545,7 +612,17 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
let mut runtime = RuntimeBuilder::new().name_prefix("libp2p-").build()?;
let peerset_clone = peerset.clone();
let thread = thread::Builder::new().name("network".to_string()).spawn(move || {
let fut = run_thread(is_offline, is_major_syncing, protocol, service_clone, import_queue, network_port, status_sinks, peerset_clone)
let fut = run_thread(
is_offline,
is_major_syncing,
protocol,
service_clone,
import_queue,
network_port,
protocol_rx,
status_sinks,
peerset_clone
)
.select(close_rx.then(|_| Ok(())))
.map(|(val, _)| val)
.map_err(|(err,_ )| err);
@@ -569,6 +646,7 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
network_service: Arc<Mutex<NetworkService<Message<B>>>>,
import_queue: Box<ImportQueue<B>>,
network_port: NetworkPort<B>,
mut protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
peerset: PeersetHandle,
) -> impl Future<Item = (), Error = io::Error> {
@@ -603,6 +681,56 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
}
}
loop {
let msg = match protocol_rx.poll() {
Ok(Async::Ready(Some(msg))) => msg,
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => break,
};
match msg {
ProtocolMsg::BlockImported(hash, header) =>
protocol.on_block_imported(hash, &header),
ProtocolMsg::BlockFinalized(hash, header) =>
protocol.on_block_finalized(hash, &header),
ProtocolMsg::ExecuteWithSpec(task) => {
let (mut context, spec) = protocol.specialization_lock();
task.call_box(spec, &mut context);
},
ProtocolMsg::ExecuteWithGossip(task) => {
let (mut context, gossip) = protocol.consensus_gossip_lock();
task.call_box(gossip, &mut context);
}
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) =>
protocol.gossip_consensus_message(topic, engine_id, message, recipient),
ProtocolMsg::BlocksProcessed(hashes, has_error) =>
protocol.blocks_processed(hashes, has_error),
ProtocolMsg::RestartSync =>
protocol.restart(),
ProtocolMsg::AnnounceBlock(hash) =>
protocol.announce_block(hash),
ProtocolMsg::BlockImportedSync(hash, number) =>
protocol.block_imported(&hash, number),
ProtocolMsg::ClearJustificationRequests =>
protocol.clear_justification_requests(),
ProtocolMsg::RequestJustification(hash, number) =>
protocol.request_justification(&hash, number),
ProtocolMsg::JustificationImportResult(hash, number, success) =>
protocol.justification_import_result(hash, number, success),
ProtocolMsg::SetFinalityProofRequestBuilder(builder) =>
protocol.set_finality_proof_request_builder(builder),
ProtocolMsg::RequestFinalityProof(hash, number) =>
protocol.request_finality_proof(&hash, number),
ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) =>
protocol.finality_proof_import_result(requested_block, finalziation_result),
ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Tick => protocol.tick(),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Synchronize => protocol.synchronize(),
}
}
loop {
let outcome = match network_service.lock().poll() {
Ok(Async::NotReady) => break,
+65 -8
View File
@@ -44,13 +44,11 @@ use crate::message::Message;
use network_libp2p::PeerId;
use parking_lot::{Mutex, RwLock};
use primitives::{H256, sr25519::Public as AuthorityId, Blake2Hasher};
use crate::protocol::{
ConnectedPeer, Context, Protocol, ProtocolStatus, ProtocolMsg, CustomMessageOutcome,
};
use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolStatus, CustomMessageOutcome};
use runtime_primitives::generic::BlockId;
use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor};
use runtime_primitives::{Justification, ConsensusEngineId};
use crate::service::{network_channel, NetworkChan, NetworkLink, NetworkMsg, NetworkPort, TransactionPool};
use crate::service::{network_channel, NetworkChan, NetworkLink, NetworkMsg, NetworkPort, ProtocolMsg, TransactionPool};
use crate::specialization::NetworkSpecialization;
use test_client::{self, AccountKeyring};
@@ -520,7 +518,9 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> {
}
let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap();
self.net_proto_channel.send_from_client(ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone()));
self.net_proto_channel.send_from_client(
ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone())
);
*finalized_hash = Some(info.chain.finalized_hash);
}
@@ -748,6 +748,7 @@ pub trait TestNetFactory: Sized {
import_queue: Box<BasicQueue<Block>>,
mut protocol: Protocol<Block, Self::Specialization, Hash>,
mut network_to_protocol_rx: mpsc::UnboundedReceiver<FromNetworkMsg<Block>>,
mut protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<Block, Self::Specialization>>,
peer: Arc<Peer<Self::PeerData, Self::Specialization>>,
) {
std::thread::spawn(move || {
@@ -782,12 +783,64 @@ pub trait TestNetFactory: Sized {
}
}
loop {
let msg = match protocol_rx.poll() {
Ok(Async::Ready(Some(msg))) => msg,
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => break,
};
match msg {
ProtocolMsg::BlockImported(hash, header) =>
protocol.on_block_imported(hash, &header),
ProtocolMsg::BlockFinalized(hash, header) =>
protocol.on_block_finalized(hash, &header),
ProtocolMsg::ExecuteWithSpec(task) => {
let (mut context, spec) = protocol.specialization_lock();
task.call_box(spec, &mut context);
},
ProtocolMsg::ExecuteWithGossip(task) => {
let (mut context, gossip) = protocol.consensus_gossip_lock();
task.call_box(gossip, &mut context);
}
ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) =>
protocol.gossip_consensus_message(topic, engine_id, message, recipient),
ProtocolMsg::BlocksProcessed(hashes, has_error) =>
protocol.blocks_processed(hashes, has_error),
ProtocolMsg::RestartSync =>
protocol.restart(),
ProtocolMsg::AnnounceBlock(hash) =>
protocol.announce_block(hash),
ProtocolMsg::BlockImportedSync(hash, number) =>
protocol.block_imported(&hash, number),
ProtocolMsg::ClearJustificationRequests =>
protocol.clear_justification_requests(),
ProtocolMsg::RequestJustification(hash, number) =>
protocol.request_justification(&hash, number),
ProtocolMsg::JustificationImportResult(hash, number, success) =>
protocol.justification_import_result(hash, number, success),
ProtocolMsg::SetFinalityProofRequestBuilder(builder) =>
protocol.set_finality_proof_request_builder(builder),
ProtocolMsg::RequestFinalityProof(hash, number) =>
protocol.request_finality_proof(&hash, number),
ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) =>
protocol.finality_proof_import_result(requested_block, finalziation_result),
ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Tick => protocol.tick(),
#[cfg(any(test, feature = "test-helpers"))]
ProtocolMsg::Synchronize => {
trace!(target: "sync", "handle_client_msg: received Synchronize msg");
protocol.synchronize();
}
}
}
if let Async::Ready(_) = protocol.poll().unwrap() {
return Ok(Async::Ready(()))
}
*protocol_status.write() = protocol.status();
Ok(Async::NotReady)
}));
});
@@ -825,8 +878,9 @@ pub trait TestNetFactory: Sized {
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>> = Arc::new(Default::default());
let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded();
let (protocol_sender, protocol_rx) = mpsc::unbounded();
let (protocol, protocol_sender) = Protocol::new(
let protocol = Protocol::new(
peers.clone(),
network_sender.clone(),
config.clone(),
@@ -843,6 +897,7 @@ pub trait TestNetFactory: Sized {
import_queue.clone(),
protocol,
network_to_protocol_rx,
protocol_rx,
Arc::new(Peer::new(
protocol_status,
peers,
@@ -880,8 +935,9 @@ pub trait TestNetFactory: Sized {
let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>> = Arc::new(Default::default());
let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded();
let (protocol_sender, protocol_rx) = mpsc::unbounded();
let (protocol, protocol_sender) = Protocol::new(
let protocol = Protocol::new(
peers.clone(),
network_sender.clone(),
config,
@@ -898,6 +954,7 @@ pub trait TestNetFactory: Sized {
import_queue.clone(),
protocol,
network_to_protocol_rx,
protocol_rx,
Arc::new(Peer::new(
protocol_status,
peers,