Rework the OnDemand service (#2670)

* Rework the OnDemand service

* Try fix line widths
This commit is contained in:
Pierre Krieger
2019-05-23 21:13:23 +02:00
committed by Gavin Wood
parent ffce18b994
commit ff479c4e23
8 changed files with 665 additions and 583 deletions
+2 -2
View File
@@ -21,7 +21,7 @@ pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, NodeKeyConfi
use bitflags::bitflags; use bitflags::bitflags;
use crate::chain::{Client, FinalityProofProvider}; use crate::chain::{Client, FinalityProofProvider};
use parity_codec; use parity_codec;
use crate::on_demand::OnDemandService; use crate::on_demand_layer::OnDemand;
use runtime_primitives::traits::{Block as BlockT}; use runtime_primitives::traits::{Block as BlockT};
use crate::service::{ExHashT, TransactionPool}; use crate::service::{ExHashT, TransactionPool};
use std::sync::Arc; use std::sync::Arc;
@@ -37,7 +37,7 @@ pub struct Params<B: BlockT, S, H: ExHashT> {
/// Finality proof provider. /// Finality proof provider.
pub finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>, pub finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>,
/// On-demand service reference. /// On-demand service reference.
pub on_demand: Option<Arc<OnDemandService<B>>>, pub on_demand: Option<Arc<OnDemand<B>>>,
/// Transaction pool. /// Transaction pool.
pub transaction_pool: Arc<dyn TransactionPool<H, B>>, pub transaction_pool: Arc<dyn TransactionPool<H, B>>,
/// Protocol specialization. /// Protocol specialization.
+3 -1
View File
@@ -30,6 +30,7 @@ mod protocol;
mod chain; mod chain;
mod blocks; mod blocks;
mod on_demand; mod on_demand;
mod on_demand_layer;
mod util; mod util;
pub mod config; pub mod config;
pub mod consensus_gossip; pub mod consensus_gossip;
@@ -56,6 +57,7 @@ pub use network_libp2p::{
}; };
pub use message::{generic as generic_message, RequestId, Status as StatusMessage}; pub use message::{generic as generic_message, RequestId, Status as StatusMessage};
pub use error::Error; pub use error::Error;
pub use on_demand::{OnDemand, OnDemandService, RemoteResponse}; pub use on_demand::AlwaysBadChecker;
pub use on_demand_layer::{OnDemand, RemoteResponse};
#[doc(hidden)] #[doc(hidden)]
pub use runtime_primitives::traits::Block as BlockT; pub use runtime_primitives::traits::Block as BlockT;
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,149 @@
// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! On-demand requests service.
use crate::on_demand::RequestData;
use std::sync::Arc;
use futures::{prelude::*, sync::mpsc, sync::oneshot};
use parking_lot::Mutex;
use client::error::Error as ClientError;
use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest,
RemoteReadChildRequest, RemoteBodyRequest};
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor};
/// Implements the `Fetcher` trait of the client. Makes it possible for the light client to perform
/// network requests for some state.
///
/// This implementation stores all the requests in a queue. The network, in parallel, is then
/// responsible for pulling elements out of that queue and fulfilling them.
pub struct OnDemand<B: BlockT> {
/// Objects that checks whether what has been retrieved is correct.
checker: Arc<dyn FetchChecker<B>>,
/// Queue of requests. Set to `Some` at initialization, then extracted by the network.
///
/// Note that a better alternative would be to use a MPMC queue here, and add a `poll` method
/// from the `OnDemand`. However there exists no popular implementation of MPMC channels in
/// asynchronous Rust at the moment
requests_queue: Mutex<Option<mpsc::UnboundedReceiver<RequestData<B>>>>,
/// Sending side of `requests_queue`.
requests_send: mpsc::UnboundedSender<RequestData<B>>,
}
impl<B: BlockT> OnDemand<B> where
B::Header: HeaderT,
{
/// Creates new on-demand service.
pub fn new(checker: Arc<FetchChecker<B>>) -> Self {
let (requests_send, requests_queue) = mpsc::unbounded();
let requests_queue = Mutex::new(Some(requests_queue));
OnDemand {
checker,
requests_queue,
requests_send,
}
}
/// Get checker reference.
pub fn checker(&self) -> &Arc<FetchChecker<B>> {
&self.checker
}
/// Extracts the queue of requests.
///
/// Whenever one of the methods of the `Fetcher` trait is called, an element is pushed on this
/// channel.
///
/// If this function returns `None`, that means that the receiver has already been extracted in
/// the past, and therefore that something already handles the requests.
pub(crate) fn extract_receiver(&self) -> Option<mpsc::UnboundedReceiver<RequestData<B>>> {
self.requests_queue.lock().take()
}
}
impl<B> Fetcher<B> for OnDemand<B> where
B: BlockT,
B::Header: HeaderT,
{
type RemoteHeaderResult = RemoteResponse<B::Header>;
type RemoteReadResult = RemoteResponse<Option<Vec<u8>>>;
type RemoteCallResult = RemoteResponse<Vec<u8>>;
type RemoteChangesResult = RemoteResponse<Vec<(NumberFor<B>, u32)>>;
type RemoteBodyResult = RemoteResponse<Vec<B::Extrinsic>>;
fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
let (sender, receiver) = oneshot::channel();
let _ = self.requests_send.unbounded_send(RequestData::RemoteHeader(request, sender));
RemoteResponse { receiver }
}
fn remote_read(&self, request: RemoteReadRequest<B::Header>) -> Self::RemoteReadResult {
let (sender, receiver) = oneshot::channel();
let _ = self.requests_send.unbounded_send(RequestData::RemoteRead(request, sender));
RemoteResponse { receiver }
}
fn remote_read_child(
&self,
request: RemoteReadChildRequest<B::Header>
) -> Self::RemoteReadResult {
let (sender, receiver) = oneshot::channel();
let _ = self.requests_send.unbounded_send(RequestData::RemoteReadChild(request, sender));
RemoteResponse { receiver }
}
fn remote_call(&self, request: RemoteCallRequest<B::Header>) -> Self::RemoteCallResult {
let (sender, receiver) = oneshot::channel();
let _ = self.requests_send.unbounded_send(RequestData::RemoteCall(request, sender));
RemoteResponse { receiver }
}
fn remote_changes(&self, request: RemoteChangesRequest<B::Header>) -> Self::RemoteChangesResult {
let (sender, receiver) = oneshot::channel();
let _ = self.requests_send.unbounded_send(RequestData::RemoteChanges(request, sender));
RemoteResponse { receiver }
}
fn remote_body(&self, request: RemoteBodyRequest<B::Header>) -> Self::RemoteBodyResult {
let (sender, receiver) = oneshot::channel();
let _ = self.requests_send.unbounded_send(RequestData::RemoteBody(request, sender));
RemoteResponse { receiver }
}
}
/// Future for an on-demand remote call response.
pub struct RemoteResponse<T> {
receiver: oneshot::Receiver<Result<T, ClientError>>,
}
impl<T> Future for RemoteResponse<T> {
type Item = T;
type Error = ClientError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.receiver.poll()
.map_err(|_| ClientError::RemoteFetchCancelled.into())
.and_then(|r| match r {
Async::Ready(Ok(ready)) => Ok(Async::Ready(ready)),
Async::Ready(Err(error)) => Err(error),
Async::NotReady => Ok(Async::NotReady),
})
}
}
+89 -48
View File
@@ -30,7 +30,7 @@ use crate::message::{
}; };
use crate::message::generic::{Message as GenericMessage, ConsensusMessage}; use crate::message::generic::{Message as GenericMessage, ConsensusMessage};
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::on_demand::OnDemandService; use crate::on_demand::{OnDemandCore, OnDemandNetwork, RequestData};
use crate::specialization::NetworkSpecialization; use crate::specialization::NetworkSpecialization;
use crate::sync::{ChainSync, Context as SyncContext, Status as SyncStatus, SyncState}; use crate::sync::{ChainSync, Context as SyncContext, Status as SyncStatus, SyncState};
use crate::service::{TransactionPool, ExHashT}; use crate::service::{TransactionPool, ExHashT};
@@ -41,7 +41,7 @@ use std::sync::Arc;
use std::{cmp, num::NonZeroUsize, time}; use std::{cmp, num::NonZeroUsize, time};
use log::{trace, debug, warn, error}; use log::{trace, debug, warn, error};
use crate::chain::{Client, FinalityProofProvider}; use crate::chain::{Client, FinalityProofProvider};
use client::light::fetcher::ChangesProof; use client::light::fetcher::{FetchChecker, ChangesProof};
use crate::{error, util::LruHashSet}; use crate::{error, util::LruHashSet};
const REQUEST_TIMEOUT_SEC: u64 = 40; const REQUEST_TIMEOUT_SEC: u64 = 40;
@@ -83,7 +83,8 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
/// Interval at which we call `propagate_extrinsics`. /// Interval at which we call `propagate_extrinsics`.
propagate_timeout: tokio::timer::Interval, propagate_timeout: tokio::timer::Interval,
config: ProtocolConfig, config: ProtocolConfig,
on_demand: Option<Arc<OnDemandService<B>>>, /// Handler for on-demand requests.
on_demand_core: OnDemandCore<B>,
genesis_hash: B::Hash, genesis_hash: B::Hash,
sync: ChainSync<B>, sync: ChainSync<B>,
specialization: S, specialization: S,
@@ -159,6 +160,20 @@ pub trait NetworkOut<B: BlockT> {
fn send_message(&mut self, who: PeerId, message: Message<B>); fn send_message(&mut self, who: PeerId, message: Message<B>);
} }
impl<'a, 'b, B: BlockT> OnDemandNetwork<B> for &'a mut &'b mut dyn NetworkOut<B> {
fn report_peer(&mut self, who: &PeerId, reputation: i32) {
NetworkOut::report_peer(**self, who.clone(), reputation)
}
fn disconnect_peer(&mut self, who: &PeerId) {
NetworkOut::disconnect_peer(**self, who.clone())
}
fn send_request(&mut self, who: &PeerId, message: Message<B>) {
NetworkOut::send_message(**self, who.clone(), message)
}
}
/// Context for a network-specific handler. /// Context for a network-specific handler.
pub trait Context<B: BlockT> { pub trait Context<B: BlockT> {
/// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or /// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or
@@ -263,7 +278,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
pub fn new( pub fn new(
config: ProtocolConfig, config: ProtocolConfig,
chain: Arc<Client<B>>, chain: Arc<Client<B>>,
on_demand: Option<Arc<OnDemandService<B>>>, checker: Arc<dyn FetchChecker<B>>,
specialization: S, specialization: S,
) -> error::Result<Protocol<B, S, H>> { ) -> error::Result<Protocol<B, S, H>> {
let info = chain.info()?; let info = chain.info()?;
@@ -276,7 +291,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
peers: HashMap::new(), peers: HashMap::new(),
chain, chain,
}, },
on_demand, on_demand_core: OnDemandCore::new(checker),
genesis_hash: info.chain.genesis_hash, genesis_hash: info.chain.genesis_hash,
sync, sync,
specialization: specialization, specialization: specialization,
@@ -307,6 +322,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
self.sync.status().is_offline() self.sync.status().is_offline()
} }
/// Starts a new data demand request.
///
/// The parameter contains a `Sender` where the result, once received, must be sent.
pub(crate) fn add_on_demand_request(&mut self, mut network_out: &mut dyn NetworkOut<B>, rq: RequestData<B>) {
self.on_demand_core.add_request(&mut network_out, rq);
}
pub fn poll( pub fn poll(
&mut self, &mut self,
network_out: &mut dyn NetworkOut<B>, network_out: &mut dyn NetworkOut<B>,
@@ -324,7 +346,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
} }
fn is_on_demand_response(&self, who: &PeerId, response_id: message::RequestId) -> bool { fn is_on_demand_response(&self, who: &PeerId, response_id: message::RequestId) -> bool {
self.on_demand.as_ref().map_or(false, |od| od.is_on_demand_response(&who, response_id)) self.on_demand_core.is_on_demand_response(&who, response_id)
} }
fn handle_response( fn handle_response(
@@ -378,7 +400,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::BlockResponse(r) => { GenericMessage::BlockResponse(r) => {
// Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter. // Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter.
if self.is_on_demand_response(&who, r.id) { if self.is_on_demand_response(&who, r.id) {
self.on_remote_body_response(who, r); self.on_remote_body_response(network_out, who, r);
} else { } else {
if let Some(request) = self.handle_response(network_out, who.clone(), &r) { if let Some(request) = self.handle_response(network_out, who.clone(), &r) {
let outcome = self.on_block_response(network_out, who.clone(), request, r); let outcome = self.on_block_response(network_out, who.clone(), request, r);
@@ -394,13 +416,20 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::Transactions(m) => GenericMessage::Transactions(m) =>
self.on_extrinsics(network_out, transaction_pool, who, m), self.on_extrinsics(network_out, transaction_pool, who, m),
GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(network_out, who, request), GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(network_out, who, request),
GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(who, response), GenericMessage::RemoteCallResponse(response) =>
GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(network_out, who, request), self.on_remote_call_response(network_out, who, response),
GenericMessage::RemoteReadResponse(response) => self.on_remote_read_response(who, response), GenericMessage::RemoteReadRequest(request) =>
GenericMessage::RemoteHeaderRequest(request) => self.on_remote_header_request(network_out, who, request), self.on_remote_read_request(network_out, who, request),
GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(who, response), GenericMessage::RemoteReadResponse(response) =>
GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(network_out, who, request), self.on_remote_read_response(network_out, who, response),
GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(who, response), GenericMessage::RemoteHeaderRequest(request) =>
self.on_remote_header_request(network_out, who, request),
GenericMessage::RemoteHeaderResponse(response) =>
self.on_remote_header_response(network_out, who, response),
GenericMessage::RemoteChangesRequest(request) =>
self.on_remote_changes_request(network_out, who, request),
GenericMessage::RemoteChangesResponse(response) =>
self.on_remote_changes_response(network_out, who, response),
GenericMessage::FinalityProofRequest(request) => GenericMessage::FinalityProofRequest(request) =>
self.on_finality_proof_request(network_out, who, request, finality_proof_provider), self.on_finality_proof_request(network_out, who, request, finality_proof_provider),
GenericMessage::FinalityProofResponse(response) => GenericMessage::FinalityProofResponse(response) =>
@@ -480,7 +509,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
} }
/// Called by peer when it is disconnecting /// Called by peer when it is disconnecting
pub fn on_peer_disconnected(&mut self, network_out: &mut dyn NetworkOut<B>, peer: PeerId, debug_info: String) { pub fn on_peer_disconnected(&mut self, mut network_out: &mut dyn NetworkOut<B>, peer: PeerId, debug_info: String) {
trace!(target: "sync", "Disconnecting {}: {}", peer, debug_info); trace!(target: "sync", "Disconnecting {}: {}", peer, debug_info);
// lock all the the peer lists so that add/remove peer events are in order // lock all the the peer lists so that add/remove peer events are in order
let removed = { let removed = {
@@ -494,7 +523,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
} }
self.sync.peer_disconnected(&mut context, peer.clone()); self.sync.peer_disconnected(&mut context, peer.clone());
self.specialization.on_disconnect(&mut context, peer.clone()); self.specialization.on_disconnect(&mut context, peer.clone());
self.on_demand.as_ref().map(|s| s.on_disconnect(peer)); self.on_demand_core.on_disconnect(&mut network_out, peer);
} }
} }
@@ -514,7 +543,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
} }
} }
fn on_block_request(&mut self, network_out: &mut dyn NetworkOut<B>, peer: PeerId, request: message::BlockRequest<B>) { fn on_block_request(
&mut self,
network_out: &mut dyn NetworkOut<B>,
peer: PeerId,
request: message::BlockRequest<B>
) {
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}",
request.id, request.id,
peer, peer,
@@ -643,13 +677,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Perform time based maintenance. /// Perform time based maintenance.
/// ///
/// > **Note**: This method normally doesn't have to be called except for testing purposes. /// > **Note**: This method normally doesn't have to be called except for testing purposes.
pub fn tick(&mut self, network_out: &mut dyn NetworkOut<B>) { pub fn tick(&mut self, mut network_out: &mut dyn NetworkOut<B>) {
self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, network_out)); self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, network_out));
self.maintain_peers(network_out); self.maintain_peers(network_out);
self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, network_out)); self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, network_out));
self.on_demand self.on_demand_core.maintain_peers(&mut network_out);
.as_ref()
.map(|s| s.maintain_peers());
} }
fn maintain_peers(&mut self, network_out: &mut dyn NetworkOut<B>) { fn maintain_peers(&mut self, network_out: &mut dyn NetworkOut<B>) {
@@ -681,7 +713,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
} }
/// Called by peer to report status /// Called by peer to report status
fn on_status_message(&mut self, network_out: &mut dyn NetworkOut<B>, who: PeerId, status: message::Status<B>) { fn on_status_message(&mut self, mut network_out: &mut dyn NetworkOut<B>, who: PeerId, status: message::Status<B>) {
trace!(target: "sync", "New peer {} {:?}", who, status); trace!(target: "sync", "New peer {} {:?}", who, status);
let protocol_version = { let protocol_version = {
if self.context_data.peers.contains_key(&who) { if self.context_data.peers.contains_key(&who) {
@@ -756,10 +788,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
status.version status.version
}; };
self.on_demand_core.on_connect(&mut network_out, who.clone(), status.roles, status.best_number);
let mut context = ProtocolContext::new(&mut self.context_data, network_out); let mut context = ProtocolContext::new(&mut self.context_data, network_out);
self.on_demand
.as_ref()
.map(|s| s.on_connect(who.clone(), status.roles, status.best_number));
self.sync.new_peer(&mut context, who.clone()); self.sync.new_peer(&mut context, who.clone());
if protocol_version > 2 { if protocol_version > 2 {
self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles); self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles);
@@ -875,7 +905,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
fn on_block_announce( fn on_block_announce(
&mut self, &mut self,
network_out: &mut dyn NetworkOut<B>, mut network_out: &mut dyn NetworkOut<B>,
who: PeerId, who: PeerId,
announce: message::BlockAnnounce<B::Header> announce: message::BlockAnnounce<B::Header>
) { ) {
@@ -886,9 +916,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
peer.known_blocks.insert(hash.clone()); peer.known_blocks.insert(hash.clone());
} }
} }
self.on_demand self.on_demand_core.on_block_announce(&mut network_out, who.clone(), *header.number());
.as_ref()
.map(|s| s.on_block_announce(who.clone(), *header.number()));
self.sync.on_block_announce( self.sync.on_block_announce(
&mut ProtocolContext::new(&mut self.context_data, network_out), &mut ProtocolContext::new(&mut self.context_data, network_out),
who.clone(), who.clone(),
@@ -1029,7 +1057,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Request a finality proof for the given block. /// Request a finality proof for the given block.
/// ///
/// Queues a new finality proof request and tries to dispatch all pending requests. /// Queues a new finality proof request and tries to dispatch all pending requests.
pub fn request_finality_proof(&mut self, network_out: &mut dyn NetworkOut<B>, hash: &B::Hash, number: NumberFor<B>) { pub fn request_finality_proof(
&mut self,
network_out: &mut dyn NetworkOut<B>,
hash: &B::Hash,
number: NumberFor<B>
) {
let mut context = ProtocolContext::new(&mut self.context_data, network_out); let mut context = ProtocolContext::new(&mut self.context_data, network_out);
self.sync.request_finality_proof(&hash, number, &mut context); self.sync.request_finality_proof(&hash, number, &mut context);
} }
@@ -1042,11 +1075,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
self.sync.finality_proof_import_result(request_block, finalization_result) self.sync.finality_proof_import_result(request_block, finalization_result)
} }
fn on_remote_call_response(&mut self, who: PeerId, response: message::RemoteCallResponse) { fn on_remote_call_response(
&mut self,
mut network_out: &mut dyn NetworkOut<B>,
who: PeerId,
response: message::RemoteCallResponse
) {
trace!(target: "sync", "Remote call response {} from {}", response.id, who); trace!(target: "sync", "Remote call response {} from {}", response.id, who);
self.on_demand self.on_demand_core.on_remote_call_response(&mut network_out, who, response);
.as_ref()
.map(|s| s.on_remote_call_response(who, response));
} }
fn on_remote_read_request( fn on_remote_read_request(
@@ -1079,11 +1115,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}), }),
); );
} }
fn on_remote_read_response(&mut self, who: PeerId, response: message::RemoteReadResponse) {
fn on_remote_read_response(
&mut self,
mut network_out: &mut dyn NetworkOut<B>,
who: PeerId,
response: message::RemoteReadResponse
) {
trace!(target: "sync", "Remote read response {} from {}", response.id, who); trace!(target: "sync", "Remote read response {} from {}", response.id, who);
self.on_demand self.on_demand_core.on_remote_read_response(&mut network_out, who, response);
.as_ref()
.map(|s| s.on_remote_read_response(who, response));
} }
fn on_remote_header_request( fn on_remote_header_request(
@@ -1119,13 +1159,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
fn on_remote_header_response( fn on_remote_header_response(
&mut self, &mut self,
mut network_out: &mut dyn NetworkOut<B>,
who: PeerId, who: PeerId,
response: message::RemoteHeaderResponse<B::Header>, response: message::RemoteHeaderResponse<B::Header>,
) { ) {
trace!(target: "sync", "Remote header proof response {} from {}", response.id, who); trace!(target: "sync", "Remote header proof response {} from {}", response.id, who);
self.on_demand self.on_demand_core.on_remote_header_response(&mut network_out, who, response);
.as_ref()
.map(|s| s.on_remote_header_response(who, response));
} }
fn on_remote_changes_request( fn on_remote_changes_request(
@@ -1182,6 +1221,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
fn on_remote_changes_response( fn on_remote_changes_response(
&mut self, &mut self,
mut network_out: &mut dyn NetworkOut<B>,
who: PeerId, who: PeerId,
response: message::RemoteChangesResponse<NumberFor<B>, B::Hash>, response: message::RemoteChangesResponse<NumberFor<B>, B::Hash>,
) { ) {
@@ -1190,9 +1230,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
who, who,
response.max response.max
); );
self.on_demand self.on_demand_core.on_remote_changes_response(&mut network_out, who, response);
.as_ref()
.map(|s| s.on_remote_changes_response(who, response));
} }
fn on_finality_proof_request( fn on_finality_proof_request(
@@ -1250,10 +1288,13 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
} }
} }
fn on_remote_body_response(&self, peer: PeerId, response: message::BlockResponse<B>) { fn on_remote_body_response(
self.on_demand &mut self,
.as_ref() mut network_out: &mut dyn NetworkOut<B>,
.map(|od| od.on_remote_body_response(peer, response)); peer: PeerId,
response: message::BlockResponse<B>
) {
self.on_demand_core.on_remote_body_response(&mut network_out, peer, response);
} }
} }
+16 -2
View File
@@ -29,9 +29,11 @@ use peerset::PeersetHandle;
use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder}; use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder};
use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
use crate::AlwaysBadChecker;
use crate::chain::FinalityProofProvider; use crate::chain::FinalityProofProvider;
use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
use crate::message::Message; use crate::message::Message;
use crate::on_demand::RequestData;
use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer}; use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer};
use crate::protocol::{ProtocolStatus, PeerInfo, NetworkOut}; use crate::protocol::{ProtocolStatus, PeerInfo, NetworkOut};
use crate::config::Params; use crate::config::Params;
@@ -216,7 +218,8 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
let protocol = Protocol::new( let protocol = Protocol::new(
params.config, params.config,
params.chain, params.chain,
params.on_demand, params.on_demand.as_ref().map(|od| od.checker().clone())
.unwrap_or(Arc::new(AlwaysBadChecker)),
params.specialization, params.specialization,
)?; )?;
let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect(); let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect();
@@ -234,6 +237,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
status_sinks.clone(), status_sinks.clone(),
params.network_config, params.network_config,
registered, registered,
params.on_demand.and_then(|od| od.extract_receiver()),
)?; )?;
let service = Arc::new(Service { let service = Arc::new(Service {
@@ -531,6 +535,7 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>, status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
config: NetworkConfiguration, config: NetworkConfiguration,
registered: RegisteredProtocol<Message<B>>, registered: RegisteredProtocol<Message<B>>,
on_demand_in: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService<Message<B>>>>, PeersetHandle), Error> { ) -> Result<((oneshot::Sender<()>, thread::JoinHandle<()>), Arc<Mutex<NetworkService<Message<B>>>>, PeersetHandle), Error> {
// Start the main service. // Start the main service.
let (service, peerset) = match start_service(config, registered) { let (service, peerset) = match start_service(config, registered) {
@@ -558,7 +563,8 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
network_port, network_port,
protocol_rx, protocol_rx,
status_sinks, status_sinks,
peerset_clone peerset_clone,
on_demand_in
) )
.select(close_rx.then(|_| Ok(()))) .select(close_rx.then(|_| Ok(())))
.map(|(val, _)| val) .map(|(val, _)| val)
@@ -589,6 +595,7 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
mut protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>, mut protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>,
status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>, status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>,
peerset: PeersetHandle, peerset: PeersetHandle,
mut on_demand_in: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
) -> impl Future<Item = (), Error = io::Error> { ) -> impl Future<Item = (), Error = io::Error> {
// Implementation of `protocol::NetworkOut` using the available local variables. // Implementation of `protocol::NetworkOut` using the available local variables.
struct Ctxt<'a, B: BlockT>(&'a mut NetworkService<Message<B>>, &'a PeersetHandle); struct Ctxt<'a, B: BlockT>(&'a mut NetworkService<Message<B>>, &'a PeersetHandle);
@@ -628,6 +635,13 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>(
Err(err) => void::unreachable(err), Err(err) => void::unreachable(err),
} }
// Check for new incoming on-demand requests.
if let Some(on_demand_in) = on_demand_in.as_mut() {
while let Ok(Async::Ready(Some(rq))) = on_demand_in.poll() {
protocol.add_on_demand_request(&mut Ctxt(&mut network_service.lock(), &peerset), rq);
}
}
loop { loop {
match network_port.poll() { match network_port.poll() {
Ok(Async::NotReady) => break, Ok(Async::NotReady) => break,
+3 -2
View File
@@ -24,6 +24,7 @@ mod sync;
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc; use std::sync::Arc;
use crate::AlwaysBadChecker;
use log::trace; use log::trace;
use crate::chain::FinalityProofProvider; use crate::chain::FinalityProofProvider;
use client::{self, ClientInfo, BlockchainEvents, FinalityNotifications}; use client::{self, ClientInfo, BlockchainEvents, FinalityNotifications};
@@ -956,7 +957,7 @@ pub trait TestNetFactory: Sized {
let protocol = Protocol::new( let protocol = Protocol::new(
config.clone(), config.clone(),
client.clone(), client.clone(),
None, Arc::new(AlwaysBadChecker),
specialization, specialization,
).unwrap(); ).unwrap();
@@ -1010,7 +1011,7 @@ pub trait TestNetFactory: Sized {
let protocol = Protocol::new( let protocol = Protocol::new(
config, config,
client.clone(), client.clone(),
None, Arc::new(AlwaysBadChecker),
specialization, specialization,
).unwrap(); ).unwrap();
+1 -4
View File
@@ -182,7 +182,7 @@ impl<Components: components::Components> Service<Components> {
network_config: config.network.clone(), network_config: config.network.clone(),
chain: client.clone(), chain: client.clone(),
finality_proof_provider, finality_proof_provider,
on_demand: on_demand.as_ref().map(|d| d.clone() as _), on_demand,
transaction_pool: transaction_pool_adapter.clone() as _, transaction_pool: transaction_pool_adapter.clone() as _,
specialization: network_protocol, specialization: network_protocol,
}; };
@@ -202,9 +202,6 @@ impl<Components: components::Components> Service<Components> {
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
let network = network::Service::new(network_params, protocol_id, import_queue)?; let network = network::Service::new(network_params, protocol_id, import_queue)?;
if let Some(on_demand) = on_demand.as_ref() {
on_demand.set_network_interface(Box::new(Arc::downgrade(&network)));
}
let inherents_pool = Arc::new(InherentsPool::default()); let inherents_pool = Arc::new(InherentsPool::default());
let offchain_workers = if config.offchain_worker { let offchain_workers = if config.offchain_worker {