Import queue API revamp (#2856)

* ImportQueue methods are now mut

* Link methods are now mut

* Remove Arc from BasicSyncQueue

* Fix tests

* Remove BasicSyncQueue

* Change the import queue API

* Add buffered_link

* Remove obsolete tests

* Comments and style improvement pass

* Fix grandpa and comment cleanup

* Update core/consensus/common/src/import_queue.rs

Co-Authored-By: André Silva <andre.beat@gmail.com>
This commit is contained in:
Pierre Krieger
2019-06-19 03:50:48 +02:00
committed by DemiMarie-parity
parent de6d541c74
commit 7efc504d59
9 changed files with 566 additions and 858 deletions
+72 -85
View File
@@ -74,79 +74,6 @@ pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync {
fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>);
}
/// A link implementation that connects to the network.
#[derive(Clone)]
pub struct NetworkLink<B: BlockT, S: NetworkSpecialization<B>> {
/// The protocol sender
pub(crate) protocol_sender: mpsc::UnboundedSender<ProtocolMsg<B, S>>,
/// The network sender
pub(crate) network_sender: mpsc::UnboundedSender<NetworkMsg<B>>,
}
impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> {
fn block_imported(&self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlockImportedSync(hash.clone(), number));
}
fn blocks_processed(&self, processed_blocks: Vec<B::Hash>, has_error: bool) {
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::BlocksProcessed(processed_blocks, has_error));
}
fn justification_imported(&self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success));
if !success {
info!("Invalid justification provided by {} for #{}", who, hash);
let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value()));
let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone()));
}
}
fn clear_justification_requests(&self) {
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::ClearJustificationRequests);
}
fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RequestJustification(hash.clone(), number));
}
fn request_finality_proof(&self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RequestFinalityProof(
hash.clone(),
number,
));
}
fn finality_proof_imported(
&self,
who: PeerId,
request_block: (B::Hash, NumberFor<B>),
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
) {
let success = finalization_result.is_ok();
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::FinalityProofImportResult(
request_block,
finalization_result,
));
if !success {
info!("Invalid finality proof provided by {} for #{}", who, request_block.0);
let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who.clone(), i32::min_value()));
let _ = self.network_sender.unbounded_send(NetworkMsg::DisconnectPeer(who.clone()));
}
}
fn report_peer(&self, who: PeerId, reputation_change: i32) {
let _ = self.network_sender.unbounded_send(NetworkMsg::ReportPeer(who, reputation_change));
}
fn restart(&self) {
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::RestartSync);
}
fn set_finality_proof_request_builder(&self, request_builder: SharedFinalityProofRequestBuilder<B>) {
let _ = self.protocol_sender.unbounded_send(ProtocolMsg::SetFinalityProofRequestBuilder(request_builder));
}
}
/// A cloneable handle for reporting cost/benefits of peers.
#[derive(Clone)]
pub struct ReportHandle {
@@ -197,13 +124,6 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
let (protocol_sender, protocol_rx) = mpsc::unbounded();
let status_sinks = Arc::new(Mutex::new(Vec::new()));
// connect the import-queue to the network service.
let link = NetworkLink {
protocol_sender: protocol_sender.clone(),
network_sender: network_chan.clone(),
};
params.import_queue.start(Box::new(link))?;
// Start in off-line mode, since we're not connected to any nodes yet.
let is_offline = Arc::new(AtomicBool::new(true));
let is_major_syncing = Arc::new(AtomicBool::new(false));
@@ -592,7 +512,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// Implementation of `protocol::NetworkOut` using the available local variables.
// Implementation of `protocol::NetworkOut` trait using the available local variables.
struct Context<'a, B: BlockT>(&'a mut Swarm<B>, &'a PeersetHandle);
impl<'a, B: BlockT> NetworkOut<B> for Context<'a, B> {
fn report_peer(&mut self, who: PeerId, reputation: i32) {
@@ -606,11 +526,74 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
}
}
// Implementation of `import_queue::Link` trait using the available local variables.
struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
protocol: &'a mut Protocol<B, S, H>,
context: Context<'a, B>,
}
impl<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Link<B> for NetworkLink<'a, B, S, H> {
fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.protocol.block_imported(&hash, number)
}
fn blocks_processed(&mut self, hashes: Vec<B::Hash>, has_error: bool) {
self.protocol.blocks_processed(&mut self.context, hashes, has_error)
}
fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
self.protocol.justification_import_result(hash.clone(), number, success);
if !success {
info!("Invalid justification provided by {} for #{}", who, hash);
self.context.0.user_protocol_mut().disconnect_peer(&who);
self.context.1.report_peer(who, i32::min_value());
}
}
fn clear_justification_requests(&mut self) {
self.protocol.clear_justification_requests()
}
fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.protocol.request_justification(&mut self.context, hash, number)
}
fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
self.protocol.request_finality_proof(&mut self.context, hash, number)
}
fn finality_proof_imported(
&mut self,
who: PeerId,
request_block: (B::Hash, NumberFor<B>),
finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
) {
let success = finalization_result.is_ok();
self.protocol.finality_proof_import_result(request_block, finalization_result);
if !success {
info!("Invalid finality proof provided by {} for #{}", who, request_block.0);
self.context.0.user_protocol_mut().disconnect_peer(&who);
self.context.1.report_peer(who, i32::min_value());
}
}
fn report_peer(&mut self, who: PeerId, reputation_change: i32) {
self.context.1.report_peer(who, reputation_change)
}
fn restart(&mut self) {
self.protocol.restart(&mut self.context)
}
fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder<B>) {
self.protocol.set_finality_proof_request_builder(builder)
}
}
while let Ok(Async::Ready(_)) = self.status_interval.poll() {
let status = self.protocol.status();
self.status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok());
}
{
let mut network_service = self.network_service.lock();
let mut link = NetworkLink {
protocol: &mut self.protocol,
context: Context(&mut network_service, &self.peerset),
};
self.import_queue.poll_actions(&mut link);
}
while let Ok(Async::Ready(_)) = self.connected_peers_interval.poll() {
let infos = self.protocol.peers_info().map(|(id, info)| {
(id.clone(), ConnectedPeer { peer_info: info.clone() })
@@ -618,10 +601,14 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
*self.peers.write() = infos;
}
match self.protocol.poll(&mut Context(&mut self.network_service.lock(), &self.peerset), &*self.transaction_pool) {
Ok(Async::Ready(v)) => void::unreachable(v),
Ok(Async::NotReady) => {}
Err(err) => void::unreachable(err),
{
let mut network_service = self.network_service.lock();
let mut ctxt = Context(&mut *network_service, &self.peerset);
match self.protocol.poll(&mut ctxt, &*self.transaction_pool) {
Ok(Async::Ready(v)) => void::unreachable(v),
Ok(Async::NotReady) => {}
Err(err) => void::unreachable(err),
}
}
// Check for new incoming on-demand requests.