grandpa: observer doesn't send catch up messages (#3460)

* grandpa: don't send catch up requests when running GRANDPA observer

* grandpa: fix tests

* grandpa: add tests for catch up requests
This commit is contained in:
André Silva
2019-08-23 15:13:01 +02:00
committed by Robert Habermeier
parent 94243e66b3
commit 725aa0aec8
6 changed files with 133 additions and 10 deletions
@@ -503,12 +503,13 @@ struct Inner<Block: BlockT> {
config: crate::Config,
next_rebroadcast: Instant,
pending_catch_up: PendingCatchUp,
catch_up_enabled: bool,
}
type MaybeMessage<Block> = Option<(Vec<PeerId>, NeighborPacket<NumberFor<Block>>)>;
impl<Block: BlockT> Inner<Block> {
fn new(config: crate::Config) -> Self {
fn new(config: crate::Config, catch_up_enabled: bool) -> Self {
Inner {
local_view: None,
peers: Peers::default(),
@@ -516,6 +517,7 @@ impl<Block: BlockT> Inner<Block> {
next_rebroadcast: Instant::now() + REBROADCAST_AFTER,
authorities: Vec::new(),
pending_catch_up: PendingCatchUp::None,
catch_up_enabled,
config,
}
}
@@ -804,6 +806,10 @@ impl<Block: BlockT> Inner<Block> {
}
fn try_catch_up(&mut self, who: &PeerId) -> (Option<GossipMessage<Block>>, Option<Report>) {
if !self.catch_up_enabled {
return (None, None);
}
let mut catch_up = None;
let mut report = None;
@@ -917,13 +923,17 @@ pub(super) struct GossipValidator<Block: BlockT> {
}
impl<Block: BlockT> GossipValidator<Block> {
/// Create a new gossip-validator. This initialized the current set to 0.
pub(super) fn new(config: crate::Config, set_state: environment::SharedVoterSetState<Block>)
-> (GossipValidator<Block>, ReportStream)
{
/// Create a new gossip-validator. The current set is initialized to 0. If
/// `catch_up_enabled` is set to false then the validator will not issue any
/// catch up requests (useful e.g. when running just the GRANDPA observer).
pub(super) fn new(
config: crate::Config,
set_state: environment::SharedVoterSetState<Block>,
catch_up_enabled: bool,
) -> (GossipValidator<Block>, ReportStream) {
let (tx, rx) = mpsc::unbounded();
let val = GossipValidator {
inner: parking_lot::RwLock::new(Inner::new(config)),
inner: parking_lot::RwLock::new(Inner::new(config, catch_up_enabled)),
set_state,
report_sender: tx,
};
@@ -1408,6 +1418,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new(
config(),
voter_set_state(),
true,
);
let set_id = 1;
@@ -1443,6 +1454,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new(
config(),
voter_set_state(),
true,
);
let set_id = 1;
let auth = AuthorityId::from_slice(&[1u8; 32]);
@@ -1487,6 +1499,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new(
config(),
voter_set_state(),
true,
);
let set_id = 1;
@@ -1555,6 +1568,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new(
config(),
set_state.clone(),
true,
);
let set_id = 1;
@@ -1609,6 +1623,7 @@ mod tests {
let (val, _) = GossipValidator::<Block>::new(
config(),
set_state.clone(),
true,
);
// the validator starts at set id 2
@@ -1682,4 +1697,101 @@ mod tests {
false,
);
}
#[test]
fn issues_catch_up_request_on_neighbor_packet_import() {
let (val, _) = GossipValidator::<Block>::new(
config(),
voter_set_state(),
true,
);
// the validator starts at set id 1.
val.note_set(SetId(1), Vec::new(), |_, _| {});
// add the peer making the request to the validator,
// otherwise it is discarded.
let peer = PeerId::random();
val.inner.write().peers.new_peer(peer.clone());
let import_neighbor_message = |set_id, round| {
let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message(
&peer,
NeighborPacket {
round: Round(round),
set_id: SetId(set_id),
commit_finalized_height: 42,
},
);
catch_up_request
};
// importing a neighbor message from a peer in the same set in a later
// round should lead to a catch up request for the previous round.
match import_neighbor_message(1, 42) {
Some(GossipMessage::CatchUpRequest(request)) => {
assert_eq!(request.set_id, SetId(1));
assert_eq!(request.round, Round(41));
},
_ => panic!("expected catch up message"),
}
// we note that we're at round 41.
val.note_round(Round(41), |_, _| {});
// if we import a neighbor message within CATCH_UP_THRESHOLD then we
// won't request a catch up.
match import_neighbor_message(1, 42) {
None => {},
_ => panic!("expected no catch up message"),
}
// or if the peer is on a lower round.
match import_neighbor_message(1, 40) {
None => {},
_ => panic!("expected no catch up message"),
}
// we also don't request a catch up if the peer is in a different set.
match import_neighbor_message(2, 42) {
None => {},
_ => panic!("expected no catch up message"),
}
}
#[test]
fn doesnt_send_catch_up_requests_when_disabled() {
// we create a gossip validator with catch up requests disabled.
let (val, _) = GossipValidator::<Block>::new(
config(),
voter_set_state(),
false,
);
// the validator starts at set id 1.
val.note_set(SetId(1), Vec::new(), |_, _| {});
// add the peer making the request to the validator,
// otherwise it is discarded.
let peer = PeerId::random();
val.inner.write().peers.new_peer(peer.clone());
// importing a neighbor message from a peer in the same set in a later
// round should lead to a catch up request but since they're disabled
// we should get `None`.
let (_, _, catch_up_request, _) = val.inner.write().import_neighbor_message(
&peer,
NeighborPacket {
round: Round(42),
set_id: SetId(1),
commit_finalized_height: 50,
},
);
match catch_up_request {
None => {},
_ => panic!("expected no catch up message"),
}
}
}
@@ -236,19 +236,25 @@ pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
/// Create a new NetworkBridge to the given NetworkService. Returns the service
/// handle and a future that must be polled to completion to finish startup.
/// If a voter set state is given it registers previous round votes with the
/// gossip service.
/// On creation it will register previous rounds' votes with the gossip
/// service taken from the VoterSetState.
pub(crate) fn new(
service: N,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
on_exit: impl Future<Item=(),Error=()> + Clone + Send + 'static,
catch_up_enabled: bool,
) -> (
Self,
impl futures::Future<Item = (), Error = ()> + Send + 'static,
) {
let (validator, report_stream) = GossipValidator::new(config, set_state.clone());
let (validator, report_stream) = GossipValidator::new(
config,
set_state.clone(),
catch_up_enabled,
);
let validator = Arc::new(validator);
service.register_validator(validator.clone());
@@ -182,6 +182,7 @@ fn make_test_network() -> (
config(),
voter_set_state(),
Exit,
true,
);
(
@@ -524,6 +524,7 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>(
config.clone(),
persistent_data.set_state.clone(),
on_exit.clone(),
true,
);
register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
@@ -175,8 +175,10 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC>(
network,
config.clone(),
persistent_data.set_state.clone(),
on_exit.clone()
on_exit.clone(),
false,
);
let observer_work = ObserverWork::new(
client,
network,
@@ -1215,6 +1215,7 @@ fn voter_persists_its_votes() {
config.clone(),
set_state,
Exit,
true,
);
runtime.block_on(routing_work).unwrap();