Register previous rounds' votes with gossip service on startup (#2676)

* grandpa: register previous round votes with gossip service on startup

* gossip: fix tests

* grandpa: optionally register previous round votes on startup

* grandpa: fix tests
This commit is contained in:
André Silva
2019-05-26 22:05:02 +01:00
committed by Arkadiy Paronyan
parent e2d1d0c951
commit 10c1dfcffb
7 changed files with 119 additions and 19 deletions
@@ -99,6 +99,12 @@ pub trait Network<Block: BlockT>: Clone + Send + 'static {
/// Only should be used in case of consensus stall. /// Only should be used in case of consensus stall.
fn gossip_message(&self, topic: Block::Hash, data: Vec<u8>, force: bool); fn gossip_message(&self, topic: Block::Hash, data: Vec<u8>, force: bool);
/// Register a message with the gossip service, it isn't broadcast right
/// away to any peers, but may be sent to new peers joining or when asked to
/// broadcast the topic. Useful to register previous messages on node
/// startup.
fn register_gossip_message(&self, topic: Block::Hash, data: Vec<u8>);
/// Send a message to a bunch of specific peers, even if they've seen it already. /// Send a message to a bunch of specific peers, even if they've seen it already.
fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>); fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>);
@@ -145,11 +151,21 @@ impl<B, S> Network<B> for Arc<NetworkService<B, S>> where
engine_id: GRANDPA_ENGINE_ID, engine_id: GRANDPA_ENGINE_ID,
data, data,
}; };
self.with_gossip( self.with_gossip(
move |gossip, ctx| gossip.multicast(ctx, topic, msg, force) move |gossip, ctx| gossip.multicast(ctx, topic, msg, force)
) )
} }
fn register_gossip_message(&self, topic: B::Hash, data: Vec<u8>) {
let msg = ConsensusMessage {
engine_id: GRANDPA_ENGINE_ID,
data,
};
self.with_gossip(move |gossip, _| gossip.register_message(topic, msg))
}
fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>) { fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>) {
let msg = ConsensusMessage { let msg = ConsensusMessage {
engine_id: GRANDPA_ENGINE_ID, engine_id: GRANDPA_ENGINE_ID,
@@ -212,9 +228,12 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> { impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
/// Create a new NetworkBridge to the given NetworkService. Returns the service /// Create a new NetworkBridge to the given NetworkService. Returns the service
/// handle and a future that must be polled to completion to finish startup. /// handle and a future that must be polled to completion to finish startup.
/// If a voter set state is given it registers previous round votes with the
/// gossip service.
pub(crate) fn new( pub(crate) fn new(
service: N, service: N,
config: crate::Config, config: crate::Config,
set_state: Option<(u64, &crate::environment::VoterSetState<B>)>,
on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static, on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
) -> ( ) -> (
Self, Self,
@@ -225,6 +244,41 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
let validator = Arc::new(validator); let validator = Arc::new(validator);
service.register_validator(validator.clone()); service.register_validator(validator.clone());
if let Some((set_id, set_state)) = set_state {
// register all previous votes with the gossip service so that they're
// available to peers potentially stuck on a previous round.
for round in set_state.completed_rounds().iter() {
let topic = round_topic::<B>(round.number, set_id);
// we need to note the round with the gossip validator otherwise
// messages will be ignored.
validator.note_round(Round(round.number), SetId(set_id), |_, _| {});
for signed in round.votes.iter() {
let message = gossip::GossipMessage::VoteOrPrecommit(
gossip::VoteOrPrecommitMessage::<B> {
message: signed.clone(),
round: Round(round.number),
set_id: SetId(set_id),
}
);
service.register_gossip_message(
topic,
message.encode(),
);
}
trace!(target: "afg",
"Registered {} messages for topic {:?} (round: {}, set_id: {})",
round.votes.len(),
topic,
round.number,
set_id,
);
}
}
let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone()); let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(service.clone());
let reporting_job = report_stream.consume(service.clone()); let reporting_job = report_stream.consume(service.clone());
@@ -72,6 +72,15 @@ impl super::Network<Block> for TestNetwork {
let _ = self.sender.unbounded_send(Event::SendMessage(who, data)); let _ = self.sender.unbounded_send(Event::SendMessage(who, data));
} }
/// Register a message with the gossip service, it isn't broadcast right
/// away to any peers, but may be sent to new peers joining or when asked to
/// broadcast the topic. Useful to register previous messages on node
/// startup.
fn register_gossip_message(&self, _topic: Hash, _data: Vec<u8>) {
// NOTE: only required to restore previous state on startup
// not required for tests currently
}
/// Report a peer's cost or benefit after some action. /// Report a peer's cost or benefit after some action.
fn report(&self, who: network::PeerId, cost_benefit: i32) { fn report(&self, who: network::PeerId, cost_benefit: i32) {
let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit)); let _ = self.sender.unbounded_send(Event::Report(who, cost_benefit));
@@ -136,6 +145,7 @@ fn make_test_network() -> impl Future<Item=Tester,Error=()> {
let (bridge, startup_work) = super::NetworkBridge::new( let (bridge, startup_work) = super::NetworkBridge::new(
net.clone(), net.clone(),
config(), config(),
None,
Exit, Exit,
); );
@@ -101,6 +101,10 @@ impl<Block: BlockT> CompletedRounds<Block> {
CompletedRounds { inner } CompletedRounds { inner }
} }
pub fn iter(&self) -> impl Iterator<Item=&CompletedRound<Block>> {
self.inner.iter()
}
/// Returns the last (latest) completed round. /// Returns the last (latest) completed round.
pub fn last(&self) -> &CompletedRound<Block> { pub fn last(&self) -> &CompletedRound<Block> {
self.inner.back() self.inner.back()
+8 -2
View File
@@ -490,16 +490,22 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>(
use futures::future::{self, Loop as FutureLoop}; use futures::future::{self, Loop as FutureLoop};
let (network, network_startup) = NetworkBridge::new(network, config.clone(), on_exit.clone());
let LinkHalf { let LinkHalf {
client, client,
select_chain, select_chain,
persistent_data, persistent_data,
voter_commands_rx, voter_commands_rx,
} = link; } = link;
let PersistentData { authority_set, set_state, consensus_changes } = persistent_data; let PersistentData { authority_set, set_state, consensus_changes } = persistent_data;
let (network, network_startup) = NetworkBridge::new(
network,
config.clone(),
Some((authority_set.set_id(), &set_state.read())),
on_exit.clone(),
);
register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?; register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
if let Some(telemetry_on_connect) = telemetry_on_connect { if let Some(telemetry_on_connect) = telemetry_on_connect {
@@ -166,9 +166,15 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
} = link; } = link;
let PersistentData { authority_set, consensus_changes, set_state } = persistent_data; let PersistentData { authority_set, consensus_changes, set_state } = persistent_data;
let initial_state = (authority_set, consensus_changes, set_state, voter_commands_rx.into_future());
let (network, network_startup) = NetworkBridge::new(network, config.clone(), on_exit.clone()); let (network, network_startup) = NetworkBridge::new(
network,
config.clone(),
None,
on_exit.clone(),
);
let initial_state = (authority_set, consensus_changes, set_state, voter_commands_rx.into_future());
let observer_work = future::loop_fn(initial_state, move |state| { let observer_work = future::loop_fn(initial_state, move |state| {
let (authority_set, consensus_changes, set_state, voter_commands_rx) = state; let (authority_set, consensus_changes, set_state, voter_commands_rx) = state;
+11 -1
View File
@@ -253,6 +253,11 @@ impl Network<Block> for MessageRouting {
}) })
} }
fn register_gossip_message(&self, _topic: Hash, _data: Vec<u8>) {
// NOTE: only required to restore previous state on startup
// not required for tests currently
}
fn report(&self, _who: network::PeerId, _cost_benefit: i32) { fn report(&self, _who: network::PeerId, _cost_benefit: i32) {
} }
@@ -1242,7 +1247,12 @@ fn voter_persists_its_votes() {
name: Some(format!("peer#{}", 1)), name: Some(format!("peer#{}", 1)),
}; };
let routing = MessageRouting::new(net.clone(), 1); let routing = MessageRouting::new(net.clone(), 1);
let (network, routing_work) = communication::NetworkBridge::new(routing, config.clone(), Exit); let (network, routing_work) = communication::NetworkBridge::new(
routing,
config.clone(),
None,
Exit,
);
runtime.block_on(routing_work).unwrap(); runtime.block_on(routing_work).unwrap();
let (round_rx, round_tx) = network.round_communication( let (round_rx, round_tx) = network.round_communication(
+23 -13
View File
@@ -281,7 +281,7 @@ impl<B: BlockT> ConsensusGossip<B> {
} }
} }
fn register_message( fn register_message_hashed(
&mut self, &mut self,
message_hash: B::Hash, message_hash: B::Hash,
topic: B::Hash, topic: B::Hash,
@@ -296,6 +296,20 @@ impl<B: BlockT> ConsensusGossip<B> {
} }
} }
/// Registers a message without propagating it to any peers. The message
/// becomes available to new peers or when the service is asked to gossip
/// the message's topic. No validation is performed on the message, if the
/// message is already expired it should be dropped on the next garbage
/// collection.
pub fn register_message(
&mut self,
topic: B::Hash,
message: ConsensusMessage,
) {
let message_hash = HashFor::<B>::hash(&message.data[..]);
self.register_message_hashed(message_hash, topic, message);
}
/// Call when a peer has been disconnected to stop tracking gossip status. /// Call when a peer has been disconnected to stop tracking gossip status.
pub fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: PeerId) { pub fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: PeerId) {
for (engine_id, v) in self.validators.clone() { for (engine_id, v) in self.validators.clone() {
@@ -447,7 +461,7 @@ impl<B: BlockT> ConsensusGossip<B> {
} }
} }
if keep { if keep {
self.register_message(message_hash, topic, message); self.register_message_hashed(message_hash, topic, message);
} }
} else { } else {
trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
@@ -495,7 +509,7 @@ impl<B: BlockT> ConsensusGossip<B> {
force: bool, force: bool,
) { ) {
let message_hash = HashFor::<B>::hash(&message.data); let message_hash = HashFor::<B>::hash(&message.data);
self.register_message(message_hash, topic, message.clone()); self.register_message_hashed(message_hash, topic, message.clone());
let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast };
propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators); propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators);
} }
@@ -604,11 +618,9 @@ mod tests {
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
let message_hash = HashFor::<Block>::hash(&message.data);
let topic = HashFor::<Block>::hash(&[1,2,3]); let topic = HashFor::<Block>::hash(&[1,2,3]);
consensus.register_message(message_hash, topic, message.clone()); consensus.register_message(topic, message.clone());
let stream = consensus.messages_for([0, 0, 0, 0], topic); let stream = consensus.messages_for([0, 0, 0, 0], topic);
assert_eq!(stream.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None }))); assert_eq!(stream.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None })));
@@ -622,8 +634,8 @@ mod tests {
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic,msg_a); consensus.register_message(topic, msg_a);
consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic,msg_b); consensus.register_message(topic, msg_b);
assert_eq!(consensus.messages.len(), 2); assert_eq!(consensus.messages.len(), 2);
} }
@@ -634,11 +646,9 @@ mod tests {
consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll));
let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] };
let message_hash = HashFor::<Block>::hash(&message.data);
let topic = HashFor::<Block>::hash(&[1,2,3]); let topic = HashFor::<Block>::hash(&[1,2,3]);
consensus.register_message(message_hash, topic, message.clone()); consensus.register_message(topic, message.clone());
let stream1 = consensus.messages_for([0, 0, 0, 0], topic); let stream1 = consensus.messages_for([0, 0, 0, 0], topic);
let stream2 = consensus.messages_for([0, 0, 0, 0], topic); let stream2 = consensus.messages_for([0, 0, 0, 0], topic);
@@ -656,8 +666,8 @@ mod tests {
let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] };
let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] }; let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] };
consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, msg_a); consensus.register_message(topic, msg_a);
consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, msg_b); consensus.register_message(topic, msg_b);
let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait(); let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait();