diff --git a/substrate/core/finality-grandpa/src/communication/mod.rs b/substrate/core/finality-grandpa/src/communication/mod.rs index 395f820254..e4343352d9 100644 --- a/substrate/core/finality-grandpa/src/communication/mod.rs +++ b/substrate/core/finality-grandpa/src/communication/mod.rs @@ -99,6 +99,12 @@ pub trait Network: Clone + Send + 'static { /// Only should be used in case of consensus stall. fn gossip_message(&self, topic: Block::Hash, data: Vec, force: bool); + /// Register a message with the gossip service, it isn't broadcast right + /// away to any peers, but may be sent to new peers joining or when asked to + /// broadcast the topic. Useful to register previous messages on node + /// startup. + fn register_gossip_message(&self, topic: Block::Hash, data: Vec); + /// Send a message to a bunch of specific peers, even if they've seen it already. fn send_message(&self, who: Vec, data: Vec); @@ -145,11 +151,21 @@ impl Network for Arc> where engine_id: GRANDPA_ENGINE_ID, data, }; + self.with_gossip( move |gossip, ctx| gossip.multicast(ctx, topic, msg, force) ) } + fn register_gossip_message(&self, topic: B::Hash, data: Vec) { + let msg = ConsensusMessage { + engine_id: GRANDPA_ENGINE_ID, + data, + }; + + self.with_gossip(move |gossip, _| gossip.register_message(topic, msg)) + } + fn send_message(&self, who: Vec, data: Vec) { let msg = ConsensusMessage { engine_id: GRANDPA_ENGINE_ID, @@ -212,9 +228,12 @@ pub(crate) struct NetworkBridge> { impl> NetworkBridge { /// Create a new NetworkBridge to the given NetworkService. Returns the service /// handle and a future that must be polled to completion to finish startup. + /// If a voter set state is given it registers previous round votes with the + /// gossip service. pub(crate) fn new( service: N, config: crate::Config, + set_state: Option<(u64, &crate::environment::VoterSetState)>, on_exit: impl Future + Clone + Send + 'static, ) -> ( Self, @@ -225,6 +244,41 @@ impl> NetworkBridge { let validator = Arc::new(validator); service.register_validator(validator.clone()); + if let Some((set_id, set_state)) = set_state { + // register all previous votes with the gossip service so that they're + // available to peers potentially stuck on a previous round. + for round in set_state.completed_rounds().iter() { + let topic = round_topic::(round.number, set_id); + + // we need to note the round with the gossip validator otherwise + // messages will be ignored. + validator.note_round(Round(round.number), SetId(set_id), |_, _| {}); + + for signed in round.votes.iter() { + let message = gossip::GossipMessage::VoteOrPrecommit( + gossip::VoteOrPrecommitMessage:: { + message: signed.clone(), + round: Round(round.number), + set_id: SetId(set_id), + } + ); + + service.register_gossip_message( + topic, + message.encode(), + ); + } + + trace!(target: "afg", + "Registered {} messages for topic {:?} (round: {}, set_id: {})", + round.votes.len(), + topic, + round.number, + set_id, + ); + } + } + let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone()); let reporting_job = report_stream.consume(service.clone()); diff --git a/substrate/core/finality-grandpa/src/communication/tests.rs b/substrate/core/finality-grandpa/src/communication/tests.rs index 2ef6d28064..f2b50ab80c 100644 --- a/substrate/core/finality-grandpa/src/communication/tests.rs +++ b/substrate/core/finality-grandpa/src/communication/tests.rs @@ -72,6 +72,15 @@ impl super::Network for TestNetwork { let _ = self.sender.unbounded_send(Event::SendMessage(who, data)); } + /// Register a message with the gossip service, it isn't broadcast right + /// away to any peers, but may be sent to new peers joining or when asked to + /// broadcast the topic. Useful to register previous messages on node + /// startup. + fn register_gossip_message(&self, _topic: Hash, _data: Vec) { + // NOTE: only required to restore previous state on startup + // not required for tests currently + } + /// Report a peer's cost or benefit after some action. fn report(&self, who: network::PeerId, cost_benefit: i32) { let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit)); @@ -136,6 +145,7 @@ fn make_test_network() -> impl Future { let (bridge, startup_work) = super::NetworkBridge::new( net.clone(), config(), + None, Exit, ); diff --git a/substrate/core/finality-grandpa/src/environment.rs b/substrate/core/finality-grandpa/src/environment.rs index 564b7630c4..1f3bcbca96 100644 --- a/substrate/core/finality-grandpa/src/environment.rs +++ b/substrate/core/finality-grandpa/src/environment.rs @@ -101,6 +101,10 @@ impl CompletedRounds { CompletedRounds { inner } } + pub fn iter(&self) -> impl Iterator> { + self.inner.iter() + } + /// Returns the last (latest) completed round. pub fn last(&self) -> &CompletedRound { self.inner.back() diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index 75aa4d85ce..2d08bdc04e 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -490,16 +490,22 @@ pub fn run_grandpa_voter, N, RA, SC, X>( use futures::future::{self, Loop as FutureLoop}; - let (network, network_startup) = NetworkBridge::new(network, config.clone(), on_exit.clone()); - let LinkHalf { client, select_chain, persistent_data, voter_commands_rx, } = link; + let PersistentData { authority_set, set_state, consensus_changes } = persistent_data; + let (network, network_startup) = NetworkBridge::new( + network, + config.clone(), + Some((authority_set.set_id(), &set_state.read())), + on_exit.clone(), + ); + register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?; if let Some(telemetry_on_connect) = telemetry_on_connect { diff --git a/substrate/core/finality-grandpa/src/observer.rs b/substrate/core/finality-grandpa/src/observer.rs index f6d657e1ff..9f6f87f8c4 100644 --- a/substrate/core/finality-grandpa/src/observer.rs +++ b/substrate/core/finality-grandpa/src/observer.rs @@ -166,9 +166,15 @@ pub fn run_grandpa_observer, N, RA, SC>( } = link; let PersistentData { authority_set, consensus_changes, set_state } = persistent_data; - let initial_state = (authority_set, consensus_changes, set_state, voter_commands_rx.into_future()); - let (network, network_startup) = NetworkBridge::new(network, config.clone(), on_exit.clone()); + let (network, network_startup) = NetworkBridge::new( + network, + config.clone(), + None, + on_exit.clone(), + ); + + let initial_state = (authority_set, consensus_changes, set_state, voter_commands_rx.into_future()); let observer_work = future::loop_fn(initial_state, move |state| { let (authority_set, consensus_changes, set_state, voter_commands_rx) = state; diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 622eb7d470..a64060d01f 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -253,6 +253,11 @@ impl Network for MessageRouting { }) } + fn register_gossip_message(&self, _topic: Hash, _data: Vec) { + // NOTE: only required to restore previous state on startup + // not required for tests currently + } + fn report(&self, _who: network::PeerId, _cost_benefit: i32) { } @@ -1242,7 +1247,12 @@ fn voter_persists_its_votes() { name: Some(format!("peer#{}", 1)), }; let routing = MessageRouting::new(net.clone(), 1); - let (network, routing_work) = communication::NetworkBridge::new(routing, config.clone(), Exit); + let (network, routing_work) = communication::NetworkBridge::new( + routing, + config.clone(), + None, + Exit, + ); runtime.block_on(routing_work).unwrap(); let (round_rx, round_tx) = network.round_communication( @@ -1470,4 +1480,4 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ runner_net.lock().peer(3).client().info().unwrap().chain.finalized_number, if FORCE_CHANGE { 0 } else { 10 }, ); -} \ No newline at end of file +} diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index c7747a9101..c5f1a2d927 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -281,7 +281,7 @@ impl ConsensusGossip { } } - fn register_message( + fn register_message_hashed( &mut self, message_hash: B::Hash, topic: B::Hash, @@ -296,6 +296,20 @@ impl ConsensusGossip { } } + /// Registers a message without propagating it to any peers. The message + /// becomes available to new peers or when the service is asked to gossip + /// the message's topic. No validation is performed on the message, if the + /// message is already expired it should be dropped on the next garbage + /// collection. + pub fn register_message( + &mut self, + topic: B::Hash, + message: ConsensusMessage, + ) { + let message_hash = HashFor::::hash(&message.data[..]); + self.register_message_hashed(message_hash, topic, message); + } + /// Call when a peer has been disconnected to stop tracking gossip status. pub fn peer_disconnected(&mut self, protocol: &mut Context, who: PeerId) { for (engine_id, v) in self.validators.clone() { @@ -447,7 +461,7 @@ impl ConsensusGossip { } } if keep { - self.register_message(message_hash, topic, message); + self.register_message_hashed(message_hash, topic, message); } } else { trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); @@ -495,7 +509,7 @@ impl ConsensusGossip { force: bool, ) { let message_hash = HashFor::::hash(&message.data); - self.register_message(message_hash, topic, message.clone()); + self.register_message_hashed(message_hash, topic, message.clone()); let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators); } @@ -604,11 +618,9 @@ mod tests { consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; - - let message_hash = HashFor::::hash(&message.data); let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, message.clone()); + consensus.register_message(topic, message.clone()); let stream = consensus.messages_for([0, 0, 0, 0], topic); assert_eq!(stream.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None }))); @@ -622,8 +634,8 @@ mod tests { let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; - consensus.register_message(HashFor::::hash(&msg_a.data), topic,msg_a); - consensus.register_message(HashFor::::hash(&msg_b.data), topic,msg_b); + consensus.register_message(topic, msg_a); + consensus.register_message(topic, msg_b); assert_eq!(consensus.messages.len(), 2); } @@ -634,11 +646,9 @@ mod tests { consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; - - let message_hash = HashFor::::hash(&message.data); let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, message.clone()); + consensus.register_message(topic, message.clone()); let stream1 = consensus.messages_for([0, 0, 0, 0], topic); let stream2 = consensus.messages_for([0, 0, 0, 0], topic); @@ -656,8 +666,8 @@ mod tests { let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] }; - consensus.register_message(HashFor::::hash(&msg_a.data), topic, msg_a); - consensus.register_message(HashFor::::hash(&msg_b.data), topic, msg_b); + consensus.register_message(topic, msg_a); + consensus.register_message(topic, msg_b); let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait();