// Copyright 2019-2020 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! Periodic rebroadcast of neighbor packets. use futures_timer::Delay; use futures::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream}; use log::debug; use std::{pin::Pin, task::{Context, Poll}, time::Duration}; use sc_network::PeerId; use sp_runtime::traits::{NumberFor, Block as BlockT}; use super::gossip::{NeighborPacket, GossipMessage}; // How often to rebroadcast, in cases where no new packets are created. const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60); /// A sender used to send neighbor packets to a background job. #[derive(Clone)] pub(super) struct NeighborPacketSender( mpsc::UnboundedSender<(Vec, NeighborPacket>)> ); impl NeighborPacketSender { /// Send a neighbor packet for the background worker to gossip to peers. pub fn send( &self, who: Vec, neighbor_packet: NeighborPacket>, ) { if let Err(err) = self.0.unbounded_send((who, neighbor_packet)) { debug!(target: "afg", "Failed to send neighbor packet: {:?}", err); } } } /// NeighborPacketWorker is listening on a channel for new neighbor packets being produced by /// components within `finality-grandpa` and forwards those packets to the underlying /// `NetworkEngine` through the `NetworkBridge` that it is being polled by (see `Stream` /// implementation). Periodically it sends out the last packet in cases where no new ones arrive. pub(super) struct NeighborPacketWorker { last: Option<(Vec, NeighborPacket>)>, delay: Delay, rx: mpsc::UnboundedReceiver<(Vec, NeighborPacket>)>, } impl Unpin for NeighborPacketWorker {} impl NeighborPacketWorker { pub(super) fn new() -> (Self, NeighborPacketSender){ let (tx, rx) = mpsc::unbounded::<(Vec, NeighborPacket>)>(); let delay = Delay::new(REBROADCAST_AFTER); (NeighborPacketWorker { last: None, delay, rx, }, NeighborPacketSender(tx)) } } impl Stream for NeighborPacketWorker { type Item = (Vec, GossipMessage); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = &mut *self; match this.rx.poll_next_unpin(cx) { Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(Some((to, packet))) => { this.delay.reset(REBROADCAST_AFTER); this.last = Some((to.clone(), packet.clone())); return Poll::Ready(Some((to, GossipMessage::::from(packet.clone())))); } // Don't return yet, maybe the timer fired. Poll::Pending => {}, }; ready!(this.delay.poll_unpin(cx)); // Getting this far here implies that the timer fired. this.delay.reset(REBROADCAST_AFTER); // Make sure the underlying task is scheduled for wake-up. // // Note: In case poll_unpin is called after the resetted delay fires again, this // will drop one tick. Deemed as very unlikely and also not critical. while let Poll::Ready(()) = this.delay.poll_unpin(cx) {}; if let Some((ref to, ref packet)) = this.last { return Poll::Ready(Some((to.clone(), GossipMessage::::from(packet.clone())))); } return Poll::Pending; } }