client/network-gossip: Merge GossipEngine and GossipEngineInner (#5042)

* client/network-gossip: Merge GossipEngine and GossipEngineInner

Given that GossipEngine and GossipEngineInner are not shared between
threads anyone (public interface + background tasks), neither depends on
being Send or Sync. Thus one can merge the two as done in this patch.
One only needs to wrap an `Arc<Mutex<>>` around the whole structure when
the owner (e.g. finality-grandpa) needs to share the gossip engine
between threads.

* client/finality-grandpa: Wrap GossipEngine in Arc Mutex & lock it on use

GossipEngine in itself has no need to be Send and Sync, given that it
does not rely on separately spawned background tasks anymore. Given that
finality-grandpa shares the `NetworkBridge` potentially between threads
its components need to be clonable, thus this patch wraps `GossipEngine`
in an `Arc<Mutex<>>`.
This commit is contained in:
Max Inden
2020-03-09 16:31:29 +01:00
committed by GitHub
parent 67f10b9195
commit e8f3c6a686
4 changed files with 43 additions and 84 deletions
-1
View File
@@ -6245,7 +6245,6 @@ dependencies = [
"libp2p", "libp2p",
"log 0.4.8", "log 0.4.8",
"lru", "lru",
"parking_lot 0.10.0",
"sc-network", "sc-network",
"sp-runtime", "sp-runtime",
"wasm-timer", "wasm-timer",
@@ -142,7 +142,7 @@ pub(crate) fn global_topic<B: BlockT>(set_id: SetIdNumber) -> B::Hash {
/// Bridge between the underlying network service, gossiping consensus messages and Grandpa /// Bridge between the underlying network service, gossiping consensus messages and Grandpa
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> { pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
service: N, service: N,
gossip_engine: GossipEngine<B>, gossip_engine: Arc<Mutex<GossipEngine<B>>>,
validator: Arc<GossipValidator<B>>, validator: Arc<GossipValidator<B>>,
/// Sender side of the neighbor packet channel. /// Sender side of the neighbor packet channel.
@@ -185,12 +185,12 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
); );
let validator = Arc::new(validator); let validator = Arc::new(validator);
let gossip_engine = GossipEngine::new( let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
service.clone(), service.clone(),
GRANDPA_ENGINE_ID, GRANDPA_ENGINE_ID,
GRANDPA_PROTOCOL_NAME, GRANDPA_PROTOCOL_NAME,
validator.clone() validator.clone()
); )));
{ {
// register all previous votes with the gossip service so that they're // register all previous votes with the gossip service so that they're
@@ -214,7 +214,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
} }
); );
gossip_engine.register_gossip_message( gossip_engine.lock().register_gossip_message(
topic, topic,
message.encode(), message.encode(),
); );
@@ -293,7 +293,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
}); });
let topic = round_topic::<B>(round.0, set_id.0); let topic = round_topic::<B>(round.0, set_id.0);
let incoming = self.gossip_engine.messages_for(topic) let incoming = self.gossip_engine.lock().messages_for(topic)
.filter_map(move |notification| { .filter_map(move |notification| {
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]); let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
@@ -422,11 +422,11 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> { impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
type Output = Result<(), Error>; type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop { loop {
match self.neighbor_packet_worker.lock().poll_next_unpin(cx) { match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
Poll::Ready(Some((to, packet))) => { Poll::Ready(Some((to, packet))) => {
self.gossip_engine.send_message(to, packet.encode()); self.gossip_engine.lock().send_message(to, packet.encode());
}, },
Poll::Ready(None) => return Poll::Ready( Poll::Ready(None) => return Poll::Ready(
Err(Error::Network("Neighbor packet worker stream closed.".into())) Err(Error::Network("Neighbor packet worker stream closed.".into()))
@@ -438,7 +438,7 @@ impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
loop { loop {
match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) { match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) {
Poll::Ready(Some(PeerReport { who, cost_benefit })) => { Poll::Ready(Some(PeerReport { who, cost_benefit })) => {
self.gossip_engine.report(who, cost_benefit); self.gossip_engine.lock().report(who, cost_benefit);
}, },
Poll::Ready(None) => return Poll::Ready( Poll::Ready(None) => return Poll::Ready(
Err(Error::Network("Gossip validator report stream closed.".into())) Err(Error::Network("Gossip validator report stream closed.".into()))
@@ -447,7 +447,7 @@ impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
} }
} }
match self.gossip_engine.poll_unpin(cx) { match self.gossip_engine.lock().poll_unpin(cx) {
// The gossip engine future finished. We should do the same. // The gossip engine future finished. We should do the same.
Poll::Ready(()) => return Poll::Ready(Ok(())), Poll::Ready(()) => return Poll::Ready(Ok(())),
Poll::Pending => {}, Poll::Pending => {},
@@ -458,7 +458,7 @@ impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
} }
fn incoming_global<B: BlockT>( fn incoming_global<B: BlockT>(
mut gossip_engine: GossipEngine<B>, gossip_engine: Arc<Mutex<GossipEngine<B>>>,
topic: B::Hash, topic: B::Hash,
voters: Arc<VoterSet<AuthorityId>>, voters: Arc<VoterSet<AuthorityId>>,
gossip_validator: Arc<GossipValidator<B>>, gossip_validator: Arc<GossipValidator<B>>,
@@ -467,7 +467,7 @@ fn incoming_global<B: BlockT>(
let process_commit = move | let process_commit = move |
msg: FullCommitMessage<B>, msg: FullCommitMessage<B>,
mut notification: sc_network_gossip::TopicNotification, mut notification: sc_network_gossip::TopicNotification,
gossip_engine: &mut GossipEngine<B>, gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
gossip_validator: &Arc<GossipValidator<B>>, gossip_validator: &Arc<GossipValidator<B>>,
voters: &VoterSet<AuthorityId>, voters: &VoterSet<AuthorityId>,
| { | {
@@ -491,7 +491,7 @@ fn incoming_global<B: BlockT>(
msg.set_id, msg.set_id,
) { ) {
if let Some(who) = notification.sender { if let Some(who) = notification.sender {
gossip_engine.report(who, cost); gossip_engine.lock().report(who, cost);
} }
return None; return None;
@@ -513,12 +513,12 @@ fn incoming_global<B: BlockT>(
|to, neighbor| neighbor_sender.send(to, neighbor), |to, neighbor| neighbor_sender.send(to, neighbor),
); );
gossip_engine.gossip_message(topic, notification.message.clone(), false); gossip_engine.lock().gossip_message(topic, notification.message.clone(), false);
} }
voter::CommitProcessingOutcome::Bad(_) => { voter::CommitProcessingOutcome::Bad(_) => {
// report peer and do not gossip. // report peer and do not gossip.
if let Some(who) = notification.sender.take() { if let Some(who) = notification.sender.take() {
gossip_engine.report(who, cost::INVALID_COMMIT); gossip_engine.lock().report(who, cost::INVALID_COMMIT);
} }
} }
}; };
@@ -531,7 +531,7 @@ fn incoming_global<B: BlockT>(
let process_catch_up = move | let process_catch_up = move |
msg: FullCatchUpMessage<B>, msg: FullCatchUpMessage<B>,
mut notification: sc_network_gossip::TopicNotification, mut notification: sc_network_gossip::TopicNotification,
gossip_engine: &mut GossipEngine<B>, gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
gossip_validator: &Arc<GossipValidator<B>>, gossip_validator: &Arc<GossipValidator<B>>,
voters: &VoterSet<AuthorityId>, voters: &VoterSet<AuthorityId>,
| { | {
@@ -544,7 +544,7 @@ fn incoming_global<B: BlockT>(
msg.set_id, msg.set_id,
) { ) {
if let Some(who) = notification.sender { if let Some(who) = notification.sender {
gossip_engine.report(who, cost); gossip_engine.lock().report(who, cost);
} }
return None; return None;
@@ -554,7 +554,7 @@ fn incoming_global<B: BlockT>(
if let voter::CatchUpProcessingOutcome::Bad(_) = outcome { if let voter::CatchUpProcessingOutcome::Bad(_) = outcome {
// report peer // report peer
if let Some(who) = notification.sender.take() { if let Some(who) = notification.sender.take() {
gossip_engine.report(who, cost::INVALID_CATCH_UP); gossip_engine.lock().report(who, cost::INVALID_CATCH_UP);
} }
} }
@@ -566,7 +566,7 @@ fn incoming_global<B: BlockT>(
Some(voter::CommunicationIn::CatchUp(msg.message, cb)) Some(voter::CommunicationIn::CatchUp(msg.message, cb))
}; };
gossip_engine.messages_for(topic) gossip_engine.clone().lock().messages_for(topic)
.filter_map(|notification| { .filter_map(|notification| {
// this could be optimized by decoding piecewise. // this could be optimized by decoding piecewise.
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]); let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
@@ -578,9 +578,9 @@ fn incoming_global<B: BlockT>(
.filter_map(move |(notification, msg)| { .filter_map(move |(notification, msg)| {
future::ready(match msg { future::ready(match msg {
GossipMessage::Commit(msg) => GossipMessage::Commit(msg) =>
process_commit(msg, notification, &mut gossip_engine, &gossip_validator, &*voters), process_commit(msg, notification, &gossip_engine, &gossip_validator, &*voters),
GossipMessage::CatchUp(msg) => GossipMessage::CatchUp(msg) =>
process_catch_up(msg, notification, &mut gossip_engine, &gossip_validator, &*voters), process_catch_up(msg, notification, &gossip_engine, &gossip_validator, &*voters),
_ => { _ => {
debug!(target: "afg", "Skipping unknown message type"); debug!(target: "afg", "Skipping unknown message type");
None None
@@ -688,7 +688,7 @@ pub(crate) struct OutgoingMessages<Block: BlockT> {
set_id: SetIdNumber, set_id: SetIdNumber,
locals: Option<(AuthorityPair, AuthorityId)>, locals: Option<(AuthorityPair, AuthorityId)>,
sender: mpsc::Sender<SignedMessage<Block>>, sender: mpsc::Sender<SignedMessage<Block>>,
network: GossipEngine<Block>, network: Arc<Mutex<GossipEngine<Block>>>,
has_voted: HasVoted<Block>, has_voted: HasVoted<Block>,
} }
@@ -754,11 +754,11 @@ impl<Block: BlockT> Sink<Message<Block>> for OutgoingMessages<Block>
); );
// announce the block we voted on to our peers. // announce the block we voted on to our peers.
self.network.announce(target_hash, Vec::new()); self.network.lock().announce(target_hash, Vec::new());
// propagate the message to peers // propagate the message to peers
let topic = round_topic::<Block>(self.round, self.set_id); let topic = round_topic::<Block>(self.round, self.set_id);
self.network.gossip_message(topic, message.encode(), false); self.network.lock().gossip_message(topic, message.encode(), false);
// forward the message to the inner sender. // forward the message to the inner sender.
return self.sender.start_send(signed).map_err(|e| { return self.sender.start_send(signed).map_err(|e| {
@@ -959,7 +959,7 @@ fn check_catch_up<Block: BlockT>(
/// An output sink for commit messages. /// An output sink for commit messages.
struct CommitsOut<Block: BlockT> { struct CommitsOut<Block: BlockT> {
network: GossipEngine<Block>, network: Arc<Mutex<GossipEngine<Block>>>,
set_id: SetId, set_id: SetId,
is_voter: bool, is_voter: bool,
gossip_validator: Arc<GossipValidator<Block>>, gossip_validator: Arc<GossipValidator<Block>>,
@@ -969,7 +969,7 @@ struct CommitsOut<Block: BlockT> {
impl<Block: BlockT> CommitsOut<Block> { impl<Block: BlockT> CommitsOut<Block> {
/// Create a new commit output stream. /// Create a new commit output stream.
pub(crate) fn new( pub(crate) fn new(
network: GossipEngine<Block>, network: Arc<Mutex<GossipEngine<Block>>>,
set_id: SetIdNumber, set_id: SetIdNumber,
is_voter: bool, is_voter: bool,
gossip_validator: Arc<GossipValidator<Block>>, gossip_validator: Arc<GossipValidator<Block>>,
@@ -1028,7 +1028,7 @@ impl<Block: BlockT> Sink<(RoundNumber, Commit<Block>)> for CommitsOut<Block> {
commit.target_number, commit.target_number,
|to, neighbor| self.neighbor_sender.send(to, neighbor), |to, neighbor| self.neighbor_sender.send(to, neighbor),
); );
self.network.gossip_message(topic, message.encode(), false); self.network.lock().gossip_message(topic, message.encode(), false);
Ok(()) Ok(())
} }
@@ -16,7 +16,6 @@ futures-timer = "3.0.1"
libp2p = { version = "0.16.2", default-features = false, features = ["libp2p-websocket"] } libp2p = { version = "0.16.2", default-features = false, features = ["libp2p-websocket"] }
log = "0.4.8" log = "0.4.8"
lru = "0.4.3" lru = "0.4.3"
parking_lot = "0.10.0"
sc-network = { version = "0.8.0-alpha.2", path = "../network" } sc-network = { version = "0.8.0-alpha.2", path = "../network" }
sp-runtime = { version = "2.0.0-alpha.2", path = "../../primitives/runtime" } sp-runtime = { version = "2.0.0-alpha.2", path = "../../primitives/runtime" }
wasm-timer = "0.2" wasm-timer = "0.2"
+17 -56
View File
@@ -22,18 +22,12 @@ use sc_network::{Event, ReputationChange};
use futures::{prelude::*, channel::mpsc}; use futures::{prelude::*, channel::mpsc};
use libp2p::PeerId; use libp2p::PeerId;
use parking_lot::Mutex;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::{borrow::Cow, pin::Pin, sync::Arc, task::{Context, Poll}}; use std::{borrow::Cow, pin::Pin, sync::Arc, task::{Context, Poll}};
/// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on /// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on
/// top of it. /// top of it.
pub struct GossipEngine<B: BlockT> { pub struct GossipEngine<B: BlockT> {
inner: Arc<Mutex<GossipEngineInner<B>>>,
engine_id: ConsensusEngineId,
}
struct GossipEngineInner<B: BlockT> {
state_machine: ConsensusGossip<B>, state_machine: ConsensusGossip<B>,
network: Box<dyn Network<B> + Send>, network: Box<dyn Network<B> + Send>,
periodic_maintenance_interval: futures_timer::Delay, periodic_maintenance_interval: futures_timer::Delay,
@@ -41,7 +35,7 @@ struct GossipEngineInner<B: BlockT> {
engine_id: ConsensusEngineId, engine_id: ConsensusEngineId,
} }
impl<B: BlockT> Unpin for GossipEngineInner<B> {} impl<B: BlockT> Unpin for GossipEngine<B> {}
impl<B: BlockT> GossipEngine<B> { impl<B: BlockT> GossipEngine<B> {
/// Create a new instance. /// Create a new instance.
@@ -60,24 +54,17 @@ impl<B: BlockT> GossipEngine<B> {
network.register_notifications_protocol(engine_id, protocol_name.into()); network.register_notifications_protocol(engine_id, protocol_name.into());
state_machine.register_validator(&mut network, engine_id, validator); state_machine.register_validator(&mut network, engine_id, validator);
let inner = Arc::new(Mutex::new(GossipEngineInner { GossipEngine {
state_machine, state_machine,
network: Box::new(network), network: Box::new(network),
periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL), periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
network_event_stream, network_event_stream,
engine_id, engine_id,
})); }
let gossip_engine = GossipEngine {
inner: inner.clone(),
engine_id,
};
gossip_engine
} }
pub fn report(&self, who: PeerId, reputation: ReputationChange) { pub fn report(&self, who: PeerId, reputation: ReputationChange) {
self.inner.lock().network.report_peer(who, reputation); self.network.report_peer(who, reputation);
} }
/// Registers a message without propagating it to any peers. The message /// Registers a message without propagating it to any peers. The message
@@ -86,7 +73,7 @@ impl<B: BlockT> GossipEngine<B> {
/// message is already expired it should be dropped on the next garbage /// message is already expired it should be dropped on the next garbage
/// collection. /// collection.
pub fn register_gossip_message( pub fn register_gossip_message(
&self, &mut self,
topic: B::Hash, topic: B::Hash,
message: Vec<u8>, message: Vec<u8>,
) { ) {
@@ -95,38 +82,34 @@ impl<B: BlockT> GossipEngine<B> {
data: message, data: message,
}; };
self.inner.lock().state_machine.register_message(topic, message); self.state_machine.register_message(topic, message);
} }
/// Broadcast all messages with given topic. /// Broadcast all messages with given topic.
pub fn broadcast_topic(&self, topic: B::Hash, force: bool) { pub fn broadcast_topic(&mut self, topic: B::Hash, force: bool) {
let mut inner = self.inner.lock(); self.state_machine.broadcast_topic(&mut *self.network, topic, force);
let inner = &mut *inner;
inner.state_machine.broadcast_topic(&mut *inner.network, topic, force);
} }
/// Get data of valid, incoming messages for a topic (but might have expired meanwhile). /// Get data of valid, incoming messages for a topic (but might have expired meanwhile).
pub fn messages_for(&self, topic: B::Hash) pub fn messages_for(&mut self, topic: B::Hash)
-> mpsc::UnboundedReceiver<TopicNotification> -> mpsc::UnboundedReceiver<TopicNotification>
{ {
self.inner.lock().state_machine.messages_for(self.engine_id, topic) self.state_machine.messages_for(self.engine_id, topic)
} }
/// Send all messages with given topic to a peer. /// Send all messages with given topic to a peer.
pub fn send_topic( pub fn send_topic(
&self, &mut self,
who: &PeerId, who: &PeerId,
topic: B::Hash, topic: B::Hash,
force: bool force: bool
) { ) {
let mut inner = self.inner.lock(); self.state_machine.send_topic(&mut *self.network, who, topic, self.engine_id, force)
let inner = &mut *inner;
inner.state_machine.send_topic(&mut *inner.network, who, topic, self.engine_id, force)
} }
/// Multicast a message to all peers. /// Multicast a message to all peers.
pub fn gossip_message( pub fn gossip_message(
&self, &mut self,
topic: B::Hash, topic: B::Hash,
message: Vec<u8>, message: Vec<u8>,
force: bool, force: bool,
@@ -136,19 +119,14 @@ impl<B: BlockT> GossipEngine<B> {
data: message, data: message,
}; };
let mut inner = self.inner.lock(); self.state_machine.multicast(&mut *self.network, topic, message, force)
let inner = &mut *inner;
inner.state_machine.multicast(&mut *inner.network, topic, message, force)
} }
/// Send addressed message to the given peers. The message is not kept or multicast /// Send addressed message to the given peers. The message is not kept or multicast
/// later on. /// later on.
pub fn send_message(&self, who: Vec<sc_network::PeerId>, data: Vec<u8>) { pub fn send_message(&mut self, who: Vec<sc_network::PeerId>, data: Vec<u8>) {
let mut inner = self.inner.lock();
let inner = &mut *inner;
for who in &who { for who in &who {
inner.state_machine.send_message(&mut *inner.network, who, ConsensusMessage { self.state_machine.send_message(&mut *self.network, who, ConsensusMessage {
engine_id: self.engine_id, engine_id: self.engine_id,
data: data.clone(), data: data.clone(),
}); });
@@ -160,21 +138,13 @@ impl<B: BlockT> GossipEngine<B> {
/// Note: this method isn't strictly related to gossiping and should eventually be moved /// Note: this method isn't strictly related to gossiping and should eventually be moved
/// somewhere else. /// somewhere else.
pub fn announce(&self, block: B::Hash, associated_data: Vec<u8>) { pub fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
self.inner.lock().network.announce(block, associated_data); self.network.announce(block, associated_data);
} }
} }
impl<B: BlockT> Future for GossipEngine<B> { impl<B: BlockT> Future for GossipEngine<B> {
type Output = (); type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.inner.lock().poll_unpin(cx)
}
}
impl<B: BlockT> Future for GossipEngineInner<B> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = &mut *self; let this = &mut *self;
@@ -216,12 +186,3 @@ impl<B: BlockT> Future for GossipEngineInner<B> {
Poll::Pending Poll::Pending
} }
} }
impl<B: BlockT> Clone for GossipEngine<B> {
fn clone(&self) -> Self {
GossipEngine {
inner: self.inner.clone(),
engine_id: self.engine_id.clone(),
}
}
}