diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index b14022ea3a..1c3bd04108 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -3033,6 +3033,7 @@ dependencies = [ name = "substrate-finality-grandpa" version = "0.1.0" dependencies = [ + "env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)", "finality-grandpa 0.3.0", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/substrate/core/finality-grandpa/Cargo.toml b/substrate/core/finality-grandpa/Cargo.toml index fa30e427f8..f570f4c4d8 100644 --- a/substrate/core/finality-grandpa/Cargo.toml +++ b/substrate/core/finality-grandpa/Cargo.toml @@ -25,3 +25,4 @@ features = ["derive-codec"] substrate-network = { path = "../network", features = ["test-helpers"] } substrate-keyring = { path = "../keyring" } substrate-test-client = { path = "../test-client"} +env_logger = "0.5" diff --git a/substrate/core/finality-grandpa/src/authorities.rs b/substrate/core/finality-grandpa/src/authorities.rs index 8541a64123..79674acefa 100644 --- a/substrate/core/finality-grandpa/src/authorities.rs +++ b/substrate/core/finality-grandpa/src/authorities.rs @@ -120,6 +120,7 @@ impl AuthoritySet } /// Inspect pending changes. + #[cfg(test)] pub(crate) fn pending_changes(&self) -> &[PendingChange] { &self.pending_changes } diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index d12b35ea1f..ee493d67e5 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -41,6 +41,9 @@ extern crate substrate_keyring as keyring; #[cfg(test)] extern crate substrate_test_client as test_client; +#[cfg(test)] +extern crate env_logger; + #[macro_use] extern crate parity_codec_derive; @@ -100,6 +103,14 @@ pub struct Config { pub gossip_duration: Duration, /// The local signing key. pub local_key: Option>, + /// Some local identifier of the voter. + pub name: Option, +} + +impl Config { + fn name(&self) -> &str { + self.name.as_ref().map(|s| s.as_str()).unwrap_or("") + } } /// Errors that can occur while voting in GRANDPA. @@ -133,13 +144,13 @@ pub trait Network: Clone { /// Get a stream of messages for a specific round. This stream should /// never logically conclude. - fn messages_for(&self, round: u64) -> Self::In; + fn messages_for(&self, round: u64, set_id: u64) -> Self::In; /// Send a message at a specific round out. - fn send_message(&self, round: u64, message: Vec); + fn send_message(&self, round: u64, set_id: u64, message: Vec); /// Clean up messages for a round. - fn drop_messages(&self, round: u64); + fn drop_messages(&self, round: u64, set_id: u64); } /// Something which can determine if a block is known. @@ -297,6 +308,7 @@ impl, I> Stream for UntilImported { round: u64, + set_id: u64, inner: I, network: N, } @@ -320,7 +332,7 @@ impl Sink for ClearOnDrop { impl Drop for ClearOnDrop { fn drop(&mut self) { - self.network.drop_messages(self.round); + self.network.drop_messages(self.round, self.set_id); } } @@ -406,7 +418,7 @@ fn outgoing_messages( }; // forward to network. - network.send_message(round, signed.encode()); + network.send_message(round, set_id, signed.encode()); Some(signed) } else { None @@ -473,6 +485,8 @@ impl grandpa::Chain> for E // once blocks are finalized that make that transition irrelevant or activate it, // we will proceed onwards. most of the time there will be no pending transition. let limit = self.authority_set.current_limit(); + trace!(target: "afg", "Finding best chain containing block {:?} with number limit {:?}", block, limit); + match self.inner.best_containing(block, limit) { Ok(Some(hash)) => { let header = self.inner.header(&BlockId::Hash(hash)).ok()? @@ -564,7 +578,7 @@ impl voter::Environment> f let incoming = checked_message_stream::( round, self.set_id, - self.network.messages_for(round), + self.network.messages_for(round, self.set_id), self.voters.clone(), ); @@ -585,11 +599,12 @@ impl voter::Environment> f ); // join incoming network messages with locally originating ones. - let incoming = Box::new(incoming.select(out_rx).map_err(Into::into)); + let incoming = Box::new(out_rx.select(incoming).map_err(Into::into)); // schedule network message cleanup when sink drops. let outgoing = Box::new(ClearOnDrop { round, + set_id: self.set_id, network: self.network.clone(), inner: outgoing.sink_map_err(Into::into), }); @@ -604,6 +619,15 @@ impl voter::Environment> f } fn completed(&self, round: u64, state: RoundState>) -> Result<(), Self::Error> { + debug!( + target: "afg", "Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}", + self.config.name(), + round, + self.set_id, + state.estimate.as_ref().map(|e| e.1), + state.finalized.as_ref().map(|e| e.1), + ); + let encoded_state = (round, state).encode(); if let Err(e) = self.inner.backend() .insert_aux(&[(LAST_COMPLETED_KEY, &encoded_state[..])], &[]) @@ -669,6 +693,12 @@ impl voter::Environment> f if let Some((canon_hash, canon_number)) = status.new_set_block { // the authority set has changed. let (new_id, set_ref) = authority_set.current(); + + if set_ref.len() > 16 { + info!("Applying GRANDPA set change to new set with {} authorities", set_ref.len()); + } else { + info!("Applying GRANDPA set change to new set {:?}", set_ref); + } Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet { canon_hash, canon_number, @@ -772,7 +802,7 @@ impl BlockImport for GrandpaBlockImport( let work = future::loop_fn((initial_environment, last_round_number, last_state), move |params| { let (env, last_round_number, last_state) = params; + debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id); + let chain_info = match client.info() { Ok(i) => i, Err(e) => return future::Either::B(future::err(Error::Client(e))), diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 252fbcf830..1a47a8fa8b 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -27,6 +27,7 @@ use client::BlockchainEvents; use test_client::{self, runtime::BlockNumber}; use codec::Decode; use consensus_common::BlockOrigin; +use std::collections::HashSet; use authorities::AuthoritySet; @@ -69,6 +70,13 @@ impl TestNetFactory for GrandpaTestNet { } } + fn default_config() -> ProtocolConfig { + // the authority role ensures gossip hits all nodes here. + ProtocolConfig { + roles: ::network::Roles::AUTHORITY, + } + } + fn make_verifier(&self, _client: Arc, _cfg: &ProtocolConfig) -> Arc { @@ -118,21 +126,25 @@ impl MessageRouting { } } -fn round_to_topic(round: u64) -> Hash { +fn make_topic(round: u64, set_id: u64) -> Hash { let mut hash = Hash::default(); round.using_encoded(|s| { let raw = hash.as_mut(); raw[..8].copy_from_slice(s); }); + set_id.using_encoded(|s| { + let raw = hash.as_mut(); + raw[8..16].copy_from_slice(s); + }); hash } impl Network for MessageRouting { type In = Box,Error=()>>; - fn messages_for(&self, round: u64) -> Self::In { + fn messages_for(&self, round: u64, set_id: u64) -> Self::In { let messages = self.inner.lock().peer(self.peer_id) - .with_spec(|spec, _| spec.gossip.messages_for(round_to_topic(round))); + .with_spec(|spec, _| spec.gossip.messages_for(make_topic(round, set_id))); let messages = messages.map_err( move |_| panic!("Messages for round {} dropped too early", round) @@ -141,14 +153,14 @@ impl Network for MessageRouting { Box::new(messages) } - fn send_message(&self, round: u64, message: Vec) { + fn send_message(&self, round: u64, set_id: u64, message: Vec) { let mut inner = self.inner.lock(); - inner.peer(self.peer_id).gossip_message(round_to_topic(round), message); - inner.route(); + inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message); + inner.route_until_complete(); } - fn drop_messages(&self, round: u64) { - let topic = round_to_topic(round); + fn drop_messages(&self, round: u64, set_id: u64) { + let topic = make_topic(round, set_id); self.inner.lock().peer(self.peer_id) .with_spec(|spec, _| spec.gossip.collect_garbage(|t| t == &topic)); } @@ -179,12 +191,7 @@ impl ApiClient for TestApi { { // we take only scheduled changes at given block number where there are no // extrinsics. - let change = self.scheduled_changes.lock().get(&header.hash()).map(|c| c.clone()); - if change.is_some() { - println!("Found transition for {:?}", header.hash()); - } - - Ok(change) + Ok(self.scheduled_changes.lock().get(&header.hash()).map(|c| c.clone())) } } @@ -236,6 +243,7 @@ fn finalize_3_voters_no_observers() { Config { gossip_duration: TEST_GOSSIP_DURATION, local_key: Some(Arc::new(key.clone().into())), + name: Some(format!("peer#{}", peer_id)), }, link, MessageRouting::new(net.clone(), peer_id), @@ -293,6 +301,7 @@ fn finalize_3_voters_1_observer() { Config { gossip_duration: TEST_GOSSIP_DURATION, local_key, + name: Some(format!("peer#{}", peer_id)), }, link, MessageRouting::new(net.clone(), peer_id), @@ -344,14 +353,13 @@ fn transition_3_voters_twice_1_observer() { transitions.lock().insert(hash, change); }; - let mut net = GrandpaTestNet::new(api, 8); + let mut net = GrandpaTestNet::new(api, 9); // first 20 blocks: transition at 15, applied at 20. { net.peer(0).push_blocks(14, false); net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| { let block = builder.bake().unwrap(); - println!("Adding transition for {:?}", block.header.hash()); add_transition(block.header.hash(), ScheduledChange { next_authorities: make_ids(peers_b), delay: 4, @@ -367,7 +375,6 @@ fn transition_3_voters_twice_1_observer() { { net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| { let block = builder.bake().unwrap(); - println!("Adding transition for {:?}", block.header.hash()); add_transition(block.header.hash(), ScheduledChange { next_authorities: make_ids(peers_c), delay: 0, @@ -391,50 +398,63 @@ fn transition_3_voters_twice_1_observer() { assert_eq!(set.pending_changes().len(), 2); } - // let net = Arc::new(Mutex::new(net)); - // let mut finality_notifications = Vec::new(); + let net = Arc::new(Mutex::new(net)); + let mut finality_notifications = Vec::new(); - // let mut runtime = current_thread::Runtime::new().unwrap(); - // let all_peers = peers.iter() - // .cloned() - // .map(|key| Some(Arc::new(key.into()))) - // .chain(::std::iter::once(None)); + let mut runtime = current_thread::Runtime::new().unwrap(); + let all_peers = peers_a.iter() + .chain(peers_b) + .chain(peers_c) + .chain(observer) + .cloned() + .collect::>() // deduplicate + .into_iter() + .map(|key| Some(Arc::new(key.into()))) + .enumerate(); - // for (peer_id, local_key) in all_peers.enumerate() { - // let (client, link) = { - // let mut net = net.lock(); - // let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); - // ( - // net.peers[peer_id].client().clone(), - // link, - // ) - // }; - // finality_notifications.push( - // client.finality_notification_stream() - // .take_while(|n| Ok(n.header.number() < &20)) - // .for_each(move |_| Ok(())) - // ); - // let voter = run_grandpa( - // Config { - // gossip_duration: TEST_GOSSIP_DURATION, - // local_key, - // }, - // link, - // MessageRouting::new(net.clone(), peer_id), - // ).expect("all in order with client and network"); + for (peer_id, local_key) in all_peers { + let (client, link) = { + let mut net = net.lock(); + let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); + ( + net.peers[peer_id].client().clone(), + link, + ) + }; + finality_notifications.push( + client.finality_notification_stream() + .take_while(|n| Ok(n.header.number() < &30)) + .for_each(move |_| Ok(())) + .map(move |()| { + let set_raw = client.backend().get_aux(::AUTHORITY_SET_KEY).unwrap().unwrap(); + let set = AuthoritySet::::decode(&mut &set_raw[..]).unwrap(); - // runtime.spawn(voter); - // } + assert_eq!(set.current(), (2, make_ids(peers_c).as_slice())); + assert!(set.pending_changes().is_empty()); + }) + ); + let voter = run_grandpa( + Config { + gossip_duration: TEST_GOSSIP_DURATION, + local_key, + name: Some(format!("peer#{}", peer_id)), + }, + link, + MessageRouting::new(net.clone(), peer_id), + ).expect("all in order with client and network"); - // // wait for all finalized on each. - // let wait_for = ::futures::future::join_all(finality_notifications) - // .map(|_| ()) - // .map_err(|_| ()); + runtime.spawn(voter); + } - // let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - // .for_each(move |_| { net.lock().route_until_complete(); Ok(()) }) - // .map(|_| ()) - // .map_err(|_| ()); + // wait for all finalized on each. + let wait_for = ::futures::future::join_all(finality_notifications) + .map(|_| ()) + .map_err(|_| ()); - // runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); + let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) + .for_each(move |_| { net.lock().route_until_complete(); Ok(()) }) + .map(|_| ()) + .map_err(|_| ()); + + runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); }