diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index acaaab9ad4..dfbea1fe81 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -96,14 +96,13 @@ use runtime_primitives::traits::{ use fg_primitives::GrandpaApi; use runtime_primitives::generic::BlockId; use substrate_primitives::{ed25519, H256, AuthorityId, Blake2Hasher}; -use tokio::timer::{Delay, Interval}; +use tokio::timer::Delay; use grandpa::Error as GrandpaError; use grandpa::{voter, round::State as RoundState, Equivocation, BlockNumberOps}; use network::{Service as NetworkService, ExHashT}; use network::consensus_gossip::{ConsensusMessage}; -use parking_lot::Mutex; use std::collections::{HashMap, HashSet}; use std::fmt; use std::sync::Arc; @@ -266,7 +265,7 @@ impl, H: ExHashT fn send_message(&self, round: u64, set_id: u64, message: Vec) { let topic = message_topic::(round, set_id); - self.service.gossip_consensus_message(topic, message); + self.service.gossip_consensus_message(topic, message, false); } fn drop_messages(&self, round: u64, set_id: u64) { @@ -280,7 +279,7 @@ impl, H: ExHashT fn send_commit(&self, set_id: u64, message: Vec) { let topic = commit_topic::(set_id); - self.service.gossip_consensus_message(topic, message); + self.service.gossip_consensus_message(topic, message, true); } } @@ -809,63 +808,6 @@ fn finalize_block, E, RA>( } } -/// An oracle for liveness checking of a GRANDPA authority set. This is used -/// when importing blocks, if the block enacts an authority set change then -/// either it must provide a justification or if the GRANDPA authority set is -/// still live then the block can be imported unjustified since the block will -/// still be finalized by GRANDPA in a future round. The current heuristic for -/// deciding whether an authority set is live is to check if there were any -/// recent commit messages on an unfiltered stream). -struct GrandpaOracle { - unfiltered_commits_stream: Box), Error=Error> + Send>, - last_commit_target: Option<(Instant, Block::Hash, NumberFor)>, -} - -impl GrandpaOracle { - fn new(stream: Box), Error=Error> + Send>) -> GrandpaOracle { - GrandpaOracle { - unfiltered_commits_stream: stream, - last_commit_target: None, - } - } - - fn poll(&mut self) { - while let Ok(Async::Ready(Some((_, commit)))) = self.unfiltered_commits_stream.poll() { - self.last_commit_target = Some((Instant::now(), commit.target_hash, commit.target_number)); - } - } - - fn is_live(&self) -> bool { - self.last_commit_target.map(|(instant, _, _)| { - instant.elapsed() < Duration::from_secs(30) - }).unwrap_or(false) - } -} - -#[derive(Clone)] -struct SharedGrandpaOracle { - inner: Arc>>>, -} - -impl SharedGrandpaOracle { - fn empty() -> SharedGrandpaOracle { - SharedGrandpaOracle { inner: Arc::new(Mutex::new(None)) } - } - - fn poll(&self) { - if let Some(inner) = self.inner.lock().as_mut() { - inner.poll(); - } - } - - fn is_live(&self) -> bool { - self.inner.lock() - .as_ref() - .map(|inner| inner.is_live()) - .unwrap_or(false) - } -} - /// A block-import handler for GRANDPA. /// /// This scans each imported block for signals of changing authority set. @@ -879,7 +821,6 @@ pub struct GrandpaBlockImport, RA, PRA> { inner: Arc>, authority_set: SharedAuthoritySet>, authority_set_change: mpsc::UnboundedSender>>, - authority_set_oracle: SharedGrandpaOracle, api: Arc, } @@ -900,49 +841,46 @@ impl, RA, PRA> BlockImport { use authorities::PendingChange; - // we don't want to finalize on `inner.import_block` - let justification = block.justification.take(); - let number = block.header.number().clone(); let hash = block.post_header().hash(); - let parent_hash = *block.header.parent_hash(); - let digest = block.header.digest().clone(); - let is_live = self.authority_set_oracle.is_live(); - - let import_result = self.inner.import_block(block, new_authorities)?; - if import_result != ImportResult::Queued { - return Ok(import_result); - } + let number = block.header.number().clone(); let maybe_change = self.api.runtime_api().grandpa_pending_change( - &BlockId::hash(parent_hash), - &digest, + &BlockId::hash(*block.header.parent_hash()), + &block.header.digest().clone(), )?; - let is_equal_or_descendent_of = |base: &Block::Hash| -> Result<(), ClientError> { - let error = || { - Err(ClientErrorKind::Backend( - "invalid authority set change: multiple pending changes on the same chain".to_string() - ).into()) + // when we update the authorities, we need to hold the lock + // until the block is written to prevent a race if we need to restore + // the old authority set on error. + let just_in_case = if let Some(change) = maybe_change { + let parent_hash = *block.header.parent_hash(); + + let mut authorities = self.authority_set.inner().write(); + let old_set = authorities.clone(); + + let is_equal_or_descendent_of = |base: &Block::Hash| -> Result<(), ClientError> { + let error = || { + Err(ClientErrorKind::Backend( + "invalid authority set change: multiple pending changes on the same chain".to_string() + ).into()) + }; + + if *base == hash { return error(); } + if *base == parent_hash { return error(); } + + let tree_route = ::client::blockchain::tree_route( + self.inner.backend().blockchain(), + BlockId::Hash(parent_hash), + BlockId::Hash(*base), + )?; + + if tree_route.common_block().hash == *base { + return error(); + } + + Ok(()) }; - if *base == hash { return error(); } - if *base == parent_hash { return error(); } - - let tree_route = ::client::blockchain::tree_route( - self.inner.backend().blockchain(), - BlockId::Hash(parent_hash), - BlockId::Hash(*base), - )?; - - if tree_route.common_block().hash == *base { - return error(); - } - - Ok(()) - }; - - if let Some(change) = maybe_change { - let mut authorities = self.authority_set.inner().write(); authorities.add_pending_change( PendingChange { next_authorities: change.next_authorities, @@ -953,62 +891,74 @@ impl, RA, PRA> BlockImport is_equal_or_descendent_of, )?; - let encoded = authorities.encode(); - Backend::insert_aux(&**self.inner.backend(), &[(AUTHORITY_SET_KEY, &encoded[..])], &[])?; + block.auxiliary.push((AUTHORITY_SET_KEY.to_vec(), Some(authorities.encode()))); + Some((old_set, authorities)) + } else { + None }; + // we don't want to finalize on `inner.import_block` + let justification = block.justification.take(); + let import_result = self.inner.import_block(block, new_authorities).map_err(|e| { + if let Some((old_set, mut authorities)) = just_in_case { + debug!(target: "afg", "Restoring old set after block import error: {:?}", e); + *authorities = old_set; + } + e + })?; + + if import_result != ImportResult::Queued { + return Ok(import_result); + } + let enacts_change = self.authority_set.inner().read().enacts_change(number, |canon_number| { canonical_at_height(&self.inner, (hash, number), canon_number) })?; - // a pending change is enacted by the given block, if the current - // grandpa authority set isn't live anymore the provided `ImportBlock` - // should include a justification for finalizing the block. + if !enacts_change { + return Ok(import_result); + } + match justification { Some(justification) => { - if enacts_change && !is_live { - let justification = GrandpaJustification::decode_and_verify( - justification, - self.authority_set.set_id(), - &self.authority_set.current_authorities(), - )?; + let justification = GrandpaJustification::decode_and_verify( + justification, + self.authority_set.set_id(), + &self.authority_set.current_authorities(), + )?; - let result = finalize_block( - &*self.inner, - &self.authority_set, - hash, - number, - justification.into(), - ); + let result = finalize_block( + &*self.inner, + &self.authority_set, + hash, + number, + justification.into(), + ); - match result { - Ok(_) => { - unreachable!("returns Ok when no authority set change should be enacted; \ - verified previously that finalizing the current block enacts a change; \ - qed;"); - }, - Err(ExitOrError::AuthoritiesChanged(new)) => { - debug!(target: "finality", "Imported justified block #{} that enacts authority set change, signalling voter.", number); - if let Err(_) = self.authority_set_change.unbounded_send(new) { - return Err(ClientErrorKind::Backend( - "imported and finalized change block but grandpa voter is no longer running".to_string() - ).into()); - } - }, - Err(ExitOrError::Error(_)) => { + match result { + Ok(_) => { + unreachable!("returns Ok when no authority set change should be enacted; \ + verified previously that finalizing the current block enacts a change; \ + qed;"); + }, + Err(ExitOrError::AuthoritiesChanged(new)) => { + debug!(target: "finality", "Imported justified block #{} that enacts authority set change, signalling voter.", number); + if let Err(_) = self.authority_set_change.unbounded_send(new) { return Err(ClientErrorKind::Backend( - "imported change block but failed to finalize it, node may be in an inconsistent state".to_string() + "imported and finalized change block but grandpa voter is no longer running".to_string() ).into()); - }, - } + } + }, + Err(ExitOrError::Error(_)) => { + return Err(ClientErrorKind::Backend( + "imported change block but failed to finalize it, node may be in an inconsistent state".to_string() + ).into()); + }, } }, - None if enacts_change && !is_live => { - return Err(ClientErrorKind::BadJustification( - "missing justification for block that enacts authority set change".to_string() - ).into()); - }, - _ => {} + None => { + trace!(target: "finality", "Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.", number); + } } Ok(import_result) @@ -1085,7 +1035,6 @@ pub struct LinkHalf, RA> { client: Arc>, authority_set: SharedAuthoritySet>, authority_set_change: mpsc::UnboundedReceiver>>, - authority_set_oracle: SharedGrandpaOracle, } struct AncestryChain { @@ -1170,21 +1119,17 @@ pub fn block_import, RA, PRA>( let (authority_set_change_tx, authority_set_change_rx) = mpsc::unbounded(); - let authority_set_oracle = SharedGrandpaOracle::empty(); - Ok(( GrandpaBlockImport { inner: client.clone(), authority_set: authority_set.clone(), authority_set_change: authority_set_change_tx, - authority_set_oracle: authority_set_oracle.clone(), api }, LinkHalf { client, authority_set, authority_set_change: authority_set_change_rx, - authority_set_oracle, }, )) } @@ -1241,10 +1186,7 @@ pub fn run_grandpa, N, RA>( config: Config, link: LinkHalf, network: N, -) -> ::client::error::Result<( - impl Future + Send + 'static, - impl Future + Send + 'static, -)> where +) -> ::client::error::Result + Send + 'static> where Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, @@ -1261,16 +1203,8 @@ pub fn run_grandpa, N, RA>( client, authority_set, authority_set_change, - authority_set_oracle } = link; - let oracle_work = { - let authority_set_oracle = authority_set_oracle.clone(); - Interval::new(Instant::now(), Duration::from_secs(1)) - .for_each(move |_| Ok(authority_set_oracle.poll())) - .map_err(|_| ()) - }; - let chain_info = client.info()?; let genesis_hash = chain_info.chain.genesis_hash; @@ -1315,14 +1249,6 @@ pub fn run_grandpa, N, RA>( &network, ); - let unfiltered_commits_stream = Box::new(::communication::checked_commit_stream::( - env.set_id, - network.commit_messages(env.set_id), - env.voters.clone(), - )); - - *authority_set_oracle.inner.lock() = Some(GrandpaOracle::new(unfiltered_commits_stream)); - let voters = (*env.voters).clone(); let voter = voter::Voter::new( @@ -1386,5 +1312,5 @@ pub fn run_grandpa, N, RA>( })) }).map_err(|e| warn!("GRANDPA Voter failed: {:?}", e)); - Ok((voter_work, oracle_work)) + Ok(voter_work) } diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 424e771b81..d01a48d62f 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -192,7 +192,7 @@ impl Network for MessageRouting { fn send_message(&self, round: u64, set_id: u64, message: Vec) { let mut inner = self.inner.lock(); - inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message); + inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message, false); inner.route_until_complete(); } @@ -223,7 +223,7 @@ impl Network for MessageRouting { fn send_commit(&self, set_id: u64, message: Vec) { let mut inner = self.inner.lock(); - inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message); + inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message, true); inner.route_until_complete(); } } @@ -368,7 +368,7 @@ fn finalize_3_voters_no_observers() { ); fn assert_send(_: &T) { } - let (voter, oracle) = run_grandpa( + let voter = run_grandpa( Config { gossip_duration: TEST_GOSSIP_DURATION, local_key: Some(Arc::new(key.clone().into())), @@ -380,7 +380,6 @@ fn finalize_3_voters_no_observers() { assert_send(&voter); - runtime.spawn(oracle); runtime.spawn(voter); } @@ -429,7 +428,7 @@ fn finalize_3_voters_1_observer() { .take_while(|n| Ok(n.header.number() < &20)) .for_each(move |_| Ok(())) ); - let (voter, oracle) = run_grandpa( + let voter = run_grandpa( Config { gossip_duration: TEST_GOSSIP_DURATION, local_key, @@ -439,7 +438,6 @@ fn finalize_3_voters_1_observer() { MessageRouting::new(net.clone(), peer_id), ).expect("all in order with client and network"); - runtime.spawn(oracle); runtime.spawn(voter); } @@ -508,7 +506,6 @@ fn transition_3_voters_twice_1_observer() { transitions.lock().insert(parent_hash, change); }; let peers_c = peers_c.clone(); - let executor = runtime.executor().clone(); // wait for blocks to be finalized before generating new ones let block_production = client.finality_notification_stream() @@ -533,34 +530,18 @@ fn transition_3_voters_twice_1_observer() { net.lock().peer(0).push_blocks(5, false); }, 20 => { - let net = net.clone(); - let add_transition = add_transition.clone(); - // at block 21 we do another transition, but this time instant. // add more until we have 30. - let generate_blocks = move || { - net.lock().peer(0).generate_blocks(1, BlockOrigin::File, |builder| { - let block = builder.bake().unwrap(); - add_transition(*block.header.parent_hash(), ScheduledChange { - next_authorities: make_ids(&peers_c), - delay: 0, - }); - - block + net.lock().peer(0).generate_blocks(1, BlockOrigin::File, |builder| { + let block = builder.bake().unwrap(); + add_transition(*block.header.parent_hash(), ScheduledChange { + next_authorities: make_ids(&peers_c), + delay: 0, }); - net.lock().peer(0).push_blocks(9, false); - }; - // delay block generation for a bit for the liveness tracker to be - // able to update due to the authority set change - let delay_generate = Delay::new(Instant::now() + Duration::from_millis(5000)) - .and_then(move |_| { - generate_blocks(); - Ok(()) - }) - .map_err(|_| ()); - - executor.spawn(delay_generate); + block + }); + net.lock().peer(0).push_blocks(9, false); }, _ => {}, } @@ -603,7 +584,7 @@ fn transition_3_voters_twice_1_observer() { assert!(set.pending_changes().is_empty()); }) ); - let (voter, oracle) = run_grandpa( + let voter = run_grandpa( Config { gossip_duration: TEST_GOSSIP_DURATION, local_key, @@ -613,7 +594,6 @@ fn transition_3_voters_twice_1_observer() { MessageRouting::new(net.clone(), peer_id), ).expect("all in order with client and network"); - runtime.spawn(oracle); runtime.spawn(voter); } diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index cf78fa43b2..b9eec32b30 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -40,6 +40,7 @@ struct MessageEntry { topic: B::Hash, message_hash: B::Hash, message: ConsensusMessage, + broadcast: bool, instant: Instant, } @@ -78,7 +79,7 @@ impl ConsensusGossip { let mut known_messages = HashSet::new(); for entry in self.messages.iter() { known_messages.insert((entry.topic, entry.message_hash)); - protocol.send_message(who, Message::Consensus(entry.topic.clone(), entry.message.clone())); + protocol.send_message(who, Message::Consensus(entry.topic.clone(), entry.message.clone(), entry.broadcast)); } self.peers.insert(who, PeerConsensus { known_messages, @@ -98,10 +99,27 @@ impl ConsensusGossip { protocol: &mut Context, message_hash: B::Hash, topic: B::Hash, + broadcast: bool, get_message: F, ) where F: Fn() -> ConsensusMessage, { + if broadcast { + for (id, ref mut peer) in self.peers.iter_mut() { + if peer.known_messages.insert((topic.clone(), message_hash.clone())) { + let message = get_message(); + if peer.is_authority { + trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message); + } else { + trace!(target:"gossip", "Propagating to {}: {:?}", id, message); + } + protocol.send_message(*id, Message::Consensus(topic, message, broadcast)); + } + } + + return; + } + let mut non_authorities: Vec<_> = self.peers.iter() .filter_map(|(id, ref peer)| if !peer.is_authority && !peer.known_messages.contains(&(topic, message_hash)) { Some(*id) } else { None }) .collect(); @@ -118,24 +136,25 @@ impl ConsensusGossip { if peer.known_messages.insert((topic.clone(), message_hash.clone())) { let message = get_message(); trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message); - protocol.send_message(*id, Message::Consensus(topic, message)); + protocol.send_message(*id, Message::Consensus(topic, message, broadcast)); } } else if non_authorities.contains(&id) { let message = get_message(); trace!(target:"gossip", "Propagating to {}: {:?}", id, message); peer.known_messages.insert((topic.clone(), message_hash.clone())); - protocol.send_message(*id, Message::Consensus(topic, message)); + protocol.send_message(*id, Message::Consensus(topic, message, broadcast)); } } } - fn register_message(&mut self, message_hash: B::Hash, topic: B::Hash, get_message: F) + fn register_message(&mut self, message_hash: B::Hash, topic: B::Hash, broadcast: bool, get_message: F) where F: Fn() -> ConsensusMessage { if self.known_messages.insert((topic, message_hash)) { self.messages.push(MessageEntry { topic, message_hash, + broadcast, instant: Instant::now(), message: get_message(), }); @@ -193,6 +212,7 @@ impl ConsensusGossip { who: NodeIndex, topic: B::Hash, message: ConsensusMessage, + broadcast: bool, ) -> Option<(B::Hash, ConsensusMessage)> { let message_hash = HashFor::::hash(&message[..]); @@ -236,21 +256,34 @@ impl ConsensusGossip { return None; } - self.multicast_inner(protocol, message_hash, topic, || message.clone()); + self.multicast_inner(protocol, message_hash, topic, broadcast, || message.clone()); Some((topic, message)) } /// Multicast a message to all peers. - pub fn multicast(&mut self, protocol: &mut Context, topic: B::Hash, message: ConsensusMessage) { + pub fn multicast( + &mut self, + protocol: &mut Context, + topic: B::Hash, + message: ConsensusMessage, + broadcast: bool, + ) { let message_hash = HashFor::::hash(&message); - self.multicast_inner(protocol, message_hash, topic, || message.clone()); + self.multicast_inner(protocol, message_hash, topic, broadcast, || message.clone()); } - fn multicast_inner(&mut self, protocol: &mut Context, message_hash: B::Hash, topic: B::Hash, get_message: F) + fn multicast_inner( + &mut self, + protocol: &mut Context, + message_hash: B::Hash, + topic: B::Hash, + broadcast: bool, + get_message: F, + ) where F: Fn() -> ConsensusMessage { - self.register_message(message_hash, topic, &get_message); - self.propagate(protocol, message_hash, topic, get_message); + self.register_message(message_hash, topic, broadcast, &get_message); + self.propagate(protocol, message_hash, topic, broadcast, get_message); } /// Note new consensus session. @@ -287,6 +320,7 @@ mod tests { message_hash: $hash, instant: $now, message: $m, + broadcast: false, }) } } @@ -331,7 +365,7 @@ mod tests { let message_hash = HashFor::::hash(&message); let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, || message.clone()); + consensus.register_message(message_hash, topic, false, || message.clone()); let stream = consensus.messages_for(topic); assert_eq!(stream.wait().next(), Some(Ok(message))); @@ -345,8 +379,8 @@ mod tests { let msg_a = vec![1, 2, 3]; let msg_b = vec![4, 5, 6]; - consensus.register_message(HashFor::::hash(&msg_a), topic, || msg_a.clone()); - consensus.register_message(HashFor::::hash(&msg_b), topic, || msg_b.clone()); + consensus.register_message(HashFor::::hash(&msg_a), topic, false, || msg_a.clone()); + consensus.register_message(HashFor::::hash(&msg_b), topic, false, || msg_b.clone()); assert_eq!(consensus.messages.len(), 2); } @@ -362,7 +396,7 @@ mod tests { let message_hash = HashFor::::hash(&message); let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, || message.clone()); + consensus.register_message(message_hash, topic, false, || message.clone()); let stream1 = consensus.messages_for(topic); let stream2 = consensus.messages_for(topic); diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/message.rs index 519d7c78eb..e422c77418 100644 --- a/substrate/core/network/src/message.rs +++ b/substrate/core/network/src/message.rs @@ -173,7 +173,7 @@ pub mod generic { /// Transactions. Transactions(Transactions), /// Consensus protocol message. - Consensus(Hash, ConsensusMessage), // topic, opaque Vec + Consensus(Hash, ConsensusMessage, bool), // topic, opaque Vec, broadcast /// Remote method call request. RemoteCallRequest(RemoteCallRequest), /// Remote method call response. diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index cebda6c69c..60133b25c1 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -285,8 +285,8 @@ impl, H: ExHashT> Protocol { GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response), GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(io, who, request), GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(io, who, response), - GenericMessage::Consensus(topic, msg) => { - self.consensus_gossip.write().on_incoming(&mut ProtocolContext::new(&self.context_data, io), who, topic, msg); + GenericMessage::Consensus(topic, msg, broadcast) => { + self.consensus_gossip.write().on_incoming(&mut ProtocolContext::new(&self.context_data, io), who, topic, msg, broadcast); }, other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, &mut Some(other)), } @@ -296,10 +296,10 @@ impl, H: ExHashT> Protocol { send_message::(&self.context_data.peers, io, who, message) } - pub fn gossip_consensus_message(&self, io: &mut SyncIo, topic: B::Hash, message: Vec) { + pub fn gossip_consensus_message(&self, io: &mut SyncIo, topic: B::Hash, message: Vec, broadcast: bool) { let gossip = self.consensus_gossip(); self.with_spec(io, move |_s, context|{ - gossip.write().multicast(context, topic, message); + gossip.write().multicast(context, topic, message, broadcast); }); } diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index d7d72dca8d..d633e0397d 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -127,11 +127,13 @@ impl, H: ExHashT> Service) { + pub fn gossip_consensus_message(&self, topic: B::Hash, message: Vec, broadcast: bool) { self.handler.gossip_consensus_message( &mut NetSyncIo::new(&self.network, self.protocol_id), topic, - message) + message, + broadcast, + ) } /// Execute a closure with the chain-specific network specialization. pub fn with_spec(&self, f: F) -> U diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 6b0d36a1fc..f8b6705164 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -229,8 +229,8 @@ impl, D> Peer { /// Push a message into the gossip network and relay to peers. /// `TestNet::sync_step` needs to be called to ensure it's propagated. - pub fn gossip_message(&self, topic: Hash, data: Vec) { - self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data); + pub fn gossip_message(&self, topic: Hash, data: Vec, broadcast: bool) { + self.sync.gossip_consensus_message(&mut TestIo::new(&self.queue, None), topic, data, broadcast); } /// Add blocks to the peer -- edit the block before adding diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 603f9df82a..12d8f89208 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -578,11 +578,8 @@ macro_rules! construct_service_factory { ) -> Result { ( $( $full_service_init )* ) (config, executor.clone()).and_then(|service| { - if let Some(key) = (&service).authority_key() { - ($( $authority_setup )*)(service, executor, Arc::new(key)) - } else { - Ok(service) - } + let key = (&service).authority_key().map(Arc::new); + ($( $authority_setup )*)(service, executor, key) }) } } diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index 182851485f..d5aafbe675 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -204,7 +204,7 @@ pub fn connectivity(spec: FactoryChainSpec) wher { let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir"); { - let mut network = TestNet::::new(&temp, spec, NUM_NODES as u32, 0, vec![], 30400); + let mut network = TestNet::::new(&temp, spec, NUM_NODES, 0, vec![], 30500); info!("Checking linked topology"); let mut address = network.full_nodes[0].1.network().node_id().expect("No node address"); for (_, service) in network.full_nodes.iter().skip(1) { diff --git a/substrate/node/cli/src/chain_spec.rs b/substrate/node/cli/src/chain_spec.rs index 92653e057c..373a3ed7e1 100644 --- a/substrate/node/cli/src/chain_spec.rs +++ b/substrate/node/cli/src/chain_spec.rs @@ -311,7 +311,7 @@ mod tests { } #[test] - fn test_connectiviy() { + fn test_connectivity() { service_test::connectivity::(integration_test_config()); } } diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs index 0645e1ca5d..393b2a480c 100644 --- a/substrate/node/cli/src/service.rs +++ b/substrate/node/cli/src/service.rs @@ -77,42 +77,51 @@ construct_service_factory! { { |config: FactoryFullConfiguration, executor: TaskExecutor| FullComponents::::new(config, executor) }, AuthoritySetup = { - |mut service: Self::FullService, executor: TaskExecutor, key: Arc| { + |mut service: Self::FullService, executor: TaskExecutor, key: Option>| { let (block_import, link_half) = service.config.custom.grandpa_import_setup.take() .expect("Link Half and Block Import are present for Full Services or setup failed before. qed"); - if service.config.custom.grandpa_authority { - info!("Running Grandpa session as Authority {}", key.public()); - let (voter, oracle) = grandpa::run_grandpa( - grandpa::Config { - gossip_duration: Duration::new(4, 0), // FIXME: make this available through chainspec? - local_key: Some(key.clone()), - name: Some(service.config.name.clone()) - }, - link_half, - grandpa::NetworkBridge::new(service.network()), - )?; + let local_key = if let Some(key) = key { + if !service.config.custom.grandpa_authority_only { + info!("Using authority key {}", key.public()); + let proposer = Arc::new(substrate_service::ProposerFactory { + client: service.client(), + transaction_pool: service.transaction_pool(), + }); - executor.spawn(oracle); - executor.spawn(voter); - } - if !service.config.custom.grandpa_authority_only { - info!("Using authority key {}", key.public()); - let proposer = Arc::new(substrate_service::ProposerFactory { - client: service.client(), - transaction_pool: service.transaction_pool(), - }); + let client = service.client(); + executor.spawn(start_aura( + SlotDuration::get_or_compute(&*client)?, + key.clone(), + client, + block_import.clone(), + proposer, + service.network(), + )); + } + + if service.config.custom.grandpa_authority { + info!("Running Grandpa session as Authority {}", key.public()); + Some(key) + } else { + None + } + } else { + None + }; + + let voter = grandpa::run_grandpa( + grandpa::Config { + local_key, + gossip_duration: Duration::new(4, 0), // FIXME: make this available through chainspec? + name: Some(service.config.name.clone()) + }, + link_half, + grandpa::NetworkBridge::new(service.network()), + )?; + + executor.spawn(voter); - let client = service.client(); - executor.spawn(start_aura( - SlotDuration::get_or_compute(&*client)?, - key, - client, - block_import.clone(), - proposer, - service.network(), - )); - } Ok(service) } },