diff --git a/substrate/bin/node-template/node/src/service.rs b/substrate/bin/node-template/node/src/service.rs index e32ba74050..1fa1a372a0 100644 --- a/substrate/bin/node-template/node/src/service.rs +++ b/substrate/bin/node-template/node/src/service.rs @@ -79,13 +79,15 @@ pub fn new_partial(config: &Configuration) -> Result Result { +pub fn new_full(mut config: Configuration) -> Result { let sc_service::PartialComponents { client, backend, mut task_manager, import_queue, keystore_container, select_chain, transaction_pool, inherent_data_providers, other: (block_import, grandpa_link), } = new_partial(&config)?; + config.network.notifications_protocols.push(sc_finality_grandpa::GRANDPA_PROTOCOL_NAME.into()); + let (network, network_status_sinks, system_rpc_tx, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -210,8 +212,6 @@ pub fn new_full(config: Configuration) -> Result { "grandpa-voter", sc_finality_grandpa::run_grandpa_voter(grandpa_config)? ); - } else { - sc_finality_grandpa::setup_disabled_grandpa(network)?; } network_starter.start_network(); @@ -219,10 +219,12 @@ pub fn new_full(config: Configuration) -> Result { } /// Builds a new service for a light client. -pub fn new_light(config: Configuration) -> Result { +pub fn new_light(mut config: Configuration) -> Result { let (client, backend, keystore_container, mut task_manager, on_demand) = sc_service::new_light_parts::(&config)?; + config.network.notifications_protocols.push(sc_finality_grandpa::GRANDPA_PROTOCOL_NAME.into()); + let select_chain = sc_consensus::LongestChain::new(backend.clone()); let transaction_pool = Arc::new(sc_transaction_pool::BasicPool::new_light( diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 9d7c9bb1b7..5eb8e35e69 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -164,7 +164,7 @@ pub struct NewFullBase { /// Creates a full service from the configuration. pub fn new_full_base( - config: Configuration, + mut config: Configuration, with_startup_data: impl FnOnce( &sc_consensus_babe::BabeBlockImport, &sc_consensus_babe::BabeLink, @@ -178,6 +178,8 @@ pub fn new_full_base( let shared_voter_state = rpc_setup; + config.network.notifications_protocols.push(grandpa::GRANDPA_PROTOCOL_NAME.into()); + let (network, network_status_sinks, system_rpc_tx, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -315,8 +317,6 @@ pub fn new_full_base( "grandpa-voter", grandpa::run_grandpa_voter(grandpa_config)? ); - } else { - grandpa::setup_disabled_grandpa(network.clone())?; } network_starter.start_network(); @@ -338,7 +338,7 @@ pub fn new_full(config: Configuration) }) } -pub fn new_light_base(config: Configuration) -> Result<( +pub fn new_light_base(mut config: Configuration) -> Result<( TaskManager, RpcHandlers, Arc, Arc::Hash>>, Arc>> @@ -346,6 +346,8 @@ pub fn new_light_base(config: Configuration) -> Result<( let (client, backend, keystore_container, mut task_manager, on_demand) = sc_service::new_light_parts::(&config)?; + config.network.notifications_protocols.push(grandpa::GRANDPA_PROTOCOL_NAME.into()); + let select_chain = sc_consensus::LongestChain::new(backend.clone()); let transaction_pool = Arc::new(sc_transaction_pool::BasicPool::new_light( diff --git a/substrate/client/finality-grandpa/src/communication/mod.rs b/substrate/client/finality-grandpa/src/communication/mod.rs index 038d82a8cd..29fe8bc747 100644 --- a/substrate/client/finality-grandpa/src/communication/mod.rs +++ b/substrate/client/finality-grandpa/src/communication/mod.rs @@ -68,6 +68,8 @@ mod periodic; #[cfg(test)] pub(crate) mod tests; +/// Name of the notifications protocol used by Grandpa. Must be registered towards the networking +/// in order for Grandpa to properly function. pub const GRANDPA_PROTOCOL_NAME: &'static str = "/paritytech/grandpa/1"; // cost scalars for reporting peers. diff --git a/substrate/client/finality-grandpa/src/communication/tests.rs b/substrate/client/finality-grandpa/src/communication/tests.rs index e1685256f7..27a394a062 100644 --- a/substrate/client/finality-grandpa/src/communication/tests.rs +++ b/substrate/client/finality-grandpa/src/communication/tests.rs @@ -62,8 +62,6 @@ impl sc_network_gossip::Network for TestNetwork { let _ = self.sender.unbounded_send(Event::WriteNotification(who, message)); } - fn register_notifications_protocol(&self, _: Cow<'static, str>) {} - fn announce(&self, block: Hash, _associated_data: Vec) { let _ = self.sender.unbounded_send(Event::Announce(block)); } diff --git a/substrate/client/finality-grandpa/src/lib.rs b/substrate/client/finality-grandpa/src/lib.rs index c5f89717a6..ced101b8c8 100644 --- a/substrate/client/finality-grandpa/src/lib.rs +++ b/substrate/client/finality-grandpa/src/lib.rs @@ -122,6 +122,7 @@ mod until_imported; mod voting_rule; pub use authorities::{SharedAuthoritySet, AuthoritySet}; +pub use communication::GRANDPA_PROTOCOL_NAME; pub use finality_proof::{FinalityProofFragment, FinalityProofProvider, StorageAndProofProvider}; pub use notification::{GrandpaJustificationSender, GrandpaJustificationStream}; pub use import::GrandpaBlockImport; @@ -652,6 +653,10 @@ pub struct GrandpaParams { /// A link to the block import worker. pub link: LinkHalf, /// The Network instance. + /// + /// It is assumed that this network will feed us Grandpa notifications. When using the + /// `sc_network` crate, it is assumed that the Grandpa notifications protocol has been passed + /// to the configuration of the networking. pub network: N, /// If supplied, can be used to hook on telemetry connection established events. pub telemetry_on_connect: Option>, @@ -1065,26 +1070,6 @@ where } } -/// When GRANDPA is not initialized we still need to register the finality -/// tracker inherent provider which might be expected by the runtime for block -/// authoring. Additionally, we register a gossip message validator that -/// discards all GRANDPA messages (otherwise, we end up banning nodes that send -/// us a `Neighbor` message, since there is no registered gossip validator for -/// the engine id defined in the message.) -pub fn setup_disabled_grandpa(network: N) -> Result<(), sp_consensus::Error> -where - N: NetworkT + Send + Clone + 'static, -{ - // We register the GRANDPA protocol so that we don't consider it an anomaly - // to receive GRANDPA messages on the network. We don't process the - // messages. - network.register_notifications_protocol( - From::from(communication::GRANDPA_PROTOCOL_NAME), - ); - - Ok(()) -} - /// Checks if this node has any available keys in the keystore for any authority id in the given /// voter set. Returns the authority id for which keys are available, or `None` if no keys are /// available. diff --git a/substrate/client/finality-grandpa/src/tests.rs b/substrate/client/finality-grandpa/src/tests.rs index ef8168e84f..452b30941d 100644 --- a/substrate/client/finality-grandpa/src/tests.rs +++ b/substrate/client/finality-grandpa/src/tests.rs @@ -269,6 +269,52 @@ fn block_until_complete(future: impl Future + Unpin, net: &Arc impl Future { + let voters = stream::FuturesUnordered::new(); + + for (peer_id, key) in peers.iter().enumerate() { + let (keystore, _) = create_keystore(*key); + + let (net_service, link) = { + // temporary needed for some reason + let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); + ( + net.peers[peer_id].network_service().clone(), + link, + ) + }; + + let grandpa_params = GrandpaParams { + config: Config { + gossip_duration: TEST_GOSSIP_DURATION, + justification_period: 32, + keystore: Some(keystore), + name: Some(format!("peer#{}", peer_id)), + is_authority: true, + observer_enabled: true, + }, + link, + network: net_service, + telemetry_on_connect: None, + voting_rule: (), + prometheus_registry: None, + shared_voter_state: SharedVoterState::empty(), + }; + let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); + + fn assert_send(_: &T) { } + assert_send(&voter); + + voters.push(voter); + } + + voters.for_each(|_| async move {}) +} + // run the voters to completion. provide a closure to be invoked after // the voters are spawned but before blocking on them. fn run_to_completion_with( @@ -288,22 +334,9 @@ fn run_to_completion_with( wait_for.push(f); }; - let mut keystore_paths = Vec::new(); - for (peer_id, key) in peers.iter().enumerate() { - let (keystore, keystore_path) = create_keystore(*key); - keystore_paths.push(keystore_path); - + for (peer_id, _) in peers.iter().enumerate() { let highest_finalized = highest_finalized.clone(); - let (client, net_service, link) = { - let net = net.lock(); - // temporary needed for some reason - let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); - ( - net.peers[peer_id].client().clone(), - net.peers[peer_id].network_service().clone(), - link, - ) - }; + let client = net.lock().peers[peer_id].client().clone(); wait_for.push( Box::pin( @@ -319,30 +352,6 @@ fn run_to_completion_with( .map(|_| ()) ) ); - - fn assert_send(_: &T) { } - - let grandpa_params = GrandpaParams { - config: Config { - gossip_duration: TEST_GOSSIP_DURATION, - justification_period: 32, - keystore: Some(keystore), - name: Some(format!("peer#{}", peer_id)), - is_authority: true, - observer_enabled: true, - }, - link: link, - network: net_service, - telemetry_on_connect: None, - voting_rule: (), - prometheus_registry: None, - shared_voter_state: SharedVoterState::empty(), - }; - let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); - - assert_send(&voter); - - runtime.spawn(voter); } // wait for all finalized on each. @@ -388,6 +397,7 @@ fn finalize_3_voters_no_observers() { let voters = make_ids(peers); let mut net = GrandpaTestNet::new(TestApi::new(voters), 3); + runtime.spawn(initialize_grandpa(&mut net, peers)); net.peer(0).push_blocks(20, false); net.block_until_sync(); @@ -414,50 +424,18 @@ fn finalize_3_voters_1_full_observer() { let voters = make_ids(peers); let mut net = GrandpaTestNet::new(TestApi::new(voters), 4); - net.peer(0).push_blocks(20, false); - net.block_until_sync(); + runtime.spawn(initialize_grandpa(&mut net, peers)); - let net = Arc::new(Mutex::new(net)); - let mut finality_notifications = Vec::new(); - - let all_peers = peers.iter() - .cloned() - .map(Some) - .chain(std::iter::once(None)); - - let mut keystore_paths = Vec::new(); - - let mut voters = Vec::new(); - - for (peer_id, local_key) in all_peers.enumerate() { - let (client, net_service, link) = { - let net = net.lock(); - let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); - ( - net.peers[peer_id].client().clone(), - net.peers[peer_id].network_service().clone(), - link, - ) - }; - finality_notifications.push( - client.finality_notification_stream() - .take_while(|n| future::ready(n.header.number() < &20)) - .for_each(move |_| future::ready(())) - ); - - let keystore = if let Some(local_key) = local_key { - let (keystore, keystore_path) = create_keystore(local_key); - keystore_paths.push(keystore_path); - Some(keystore) - } else { - None - }; + runtime.spawn({ + let peer_id = 3; + let net_service = net.peers[peer_id].network_service().clone(); + let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); let grandpa_params = GrandpaParams { config: Config { gossip_duration: TEST_GOSSIP_DURATION, justification_period: 32, - keystore, + keystore: None, name: Some(format!("peer#{}", peer_id)), is_authority: true, observer_enabled: true, @@ -470,11 +448,21 @@ fn finalize_3_voters_1_full_observer() { shared_voter_state: SharedVoterState::empty(), }; - voters.push(run_grandpa_voter(grandpa_params).expect("all in order with client and network")); - } + run_grandpa_voter(grandpa_params).expect("all in order with client and network") + }); - for voter in voters { - runtime.spawn(voter); + net.peer(0).push_blocks(20, false); + + let net = Arc::new(Mutex::new(net)); + let mut finality_notifications = Vec::new(); + + for peer_id in 0..4 { + let client = net.lock().peers[peer_id].client().clone(); + finality_notifications.push( + client.finality_notification_stream() + .take_while(|n| future::ready(n.header.number() < &20)) + .for_each(move |_| future::ready(())) + ); } // wait for all finalized on each. @@ -507,6 +495,13 @@ fn transition_3_voters_twice_1_full_observer() { let observer = &[Ed25519Keyring::One]; + let all_peers = peers_a.iter() + .chain(peers_b) + .chain(peers_c) + .chain(observer) + .cloned() + .collect::>(); // deduplicate + let genesis_voters = make_ids(peers_a); let api = TestApi::new(genesis_voters); @@ -514,6 +509,41 @@ fn transition_3_voters_twice_1_full_observer() { let mut runtime = Runtime::new().unwrap(); + let mut keystore_paths = Vec::new(); + let mut voters = Vec::new(); + for (peer_id, local_key) in all_peers.clone().into_iter().enumerate() { + let (keystore, keystore_path) = create_keystore(local_key); + keystore_paths.push(keystore_path); + + let (net_service, link) = { + let net = net.lock(); + let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); + ( + net.peers[peer_id].network_service().clone(), + link, + ) + }; + + let grandpa_params = GrandpaParams { + config: Config { + gossip_duration: TEST_GOSSIP_DURATION, + justification_period: 32, + keystore: Some(keystore), + name: Some(format!("peer#{}", peer_id)), + is_authority: true, + observer_enabled: true, + }, + link, + network: net_service, + telemetry_on_connect: None, + voting_rule: (), + prometheus_registry: None, + shared_voter_state: SharedVoterState::empty(), + }; + + voters.push(run_grandpa_voter(grandpa_params).expect("all in order with client and network")); + } + net.lock().peer(0).push_blocks(1, false); net.lock().block_until_sync(); @@ -579,30 +609,13 @@ fn transition_3_voters_twice_1_full_observer() { } let mut finality_notifications = Vec::new(); - let all_peers = peers_a.iter() - .chain(peers_b) - .chain(peers_c) - .chain(observer) - .cloned() - .collect::>() // deduplicate - .into_iter() - .enumerate(); - let mut keystore_paths = Vec::new(); - for (peer_id, local_key) in all_peers { - let (keystore, keystore_path) = create_keystore(local_key); - keystore_paths.push(keystore_path); - - let (client, net_service, link) = { - let net = net.lock(); - let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); - ( - net.peers[peer_id].client().clone(), - net.peers[peer_id].network_service().clone(), - link, - ) - }; + for voter in voters { + runtime.spawn(voter); + } + for (peer_id, _) in all_peers.into_iter().enumerate() { + let client = net.lock().peers[peer_id].client().clone(); finality_notifications.push( client.finality_notification_stream() .take_while(|n| future::ready(n.header.number() < &30)) @@ -615,26 +628,6 @@ fn transition_3_voters_twice_1_full_observer() { assert_eq!(set.pending_changes().count(), 0); }) ); - - let grandpa_params = GrandpaParams { - config: Config { - gossip_duration: TEST_GOSSIP_DURATION, - justification_period: 32, - keystore: Some(keystore), - name: Some(format!("peer#{}", peer_id)), - is_authority: true, - observer_enabled: true, - }, - link: link, - network: net_service, - telemetry_on_connect: None, - voting_rule: (), - prometheus_registry: None, - shared_voter_state: SharedVoterState::empty(), - }; - let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); - - runtime.spawn(voter); } // wait for all finalized on each. @@ -650,6 +643,7 @@ fn justification_is_generated_periodically() { let voters = make_ids(peers); let mut net = GrandpaTestNet::new(TestApi::new(voters), 3); + runtime.spawn(initialize_grandpa(&mut net, peers)); net.peer(0).push_blocks(32, false); net.block_until_sync(); @@ -673,6 +667,7 @@ fn sync_justifications_on_change_blocks() { // 4 peers, 3 of them are authorities and participate in grandpa let api = TestApi::new(voters); let mut net = GrandpaTestNet::new(api, 4); + let voters = initialize_grandpa(&mut net, peers_a); // add 20 blocks net.peer(0).push_blocks(20, false); @@ -697,6 +692,7 @@ fn sync_justifications_on_change_blocks() { } let net = Arc::new(Mutex::new(net)); + runtime.spawn(voters); run_to_completion(&mut runtime, 25, net.clone(), peers_a); // the first 3 peers are grandpa voters and therefore have already finalized @@ -734,6 +730,7 @@ fn finalizes_multiple_pending_changes_in_order() { // 6 peers, 3 of them are authorities and participate in grandpa from genesis let api = TestApi::new(genesis_voters); let mut net = GrandpaTestNet::new(api, 6); + runtime.spawn(initialize_grandpa(&mut net, all_peers)); // add 20 blocks net.peer(0).push_blocks(20, false); @@ -792,7 +789,8 @@ fn force_change_to_new_set() { let api = TestApi::new(make_ids(genesis_authorities)); let voters = make_ids(peers_a); - let net = GrandpaTestNet::new(api, 3); + let mut net = GrandpaTestNet::new(api, 3); + let voters_future = initialize_grandpa(&mut net, peers_a); let net = Arc::new(Mutex::new(net)); net.lock().peer(0).generate_blocks(1, BlockOrigin::File, |builder| { @@ -830,6 +828,7 @@ fn force_change_to_new_set() { // it will only finalize if the forced transition happens. // we add_blocks after the voters are spawned because otherwise // the link-halves have the wrong AuthoritySet + runtime.spawn(voters_future); run_to_completion(&mut runtime, 25, net, peers_a); } @@ -937,10 +936,10 @@ fn test_bad_justification() { fn voter_persists_its_votes() { use std::sync::atomic::{AtomicUsize, Ordering}; use futures::future; - use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; sp_tracing::try_init_simple(); let mut runtime = Runtime::new().unwrap(); + let mut keystore_paths = Vec::new(); // we have two authorities but we'll only be running the voter for alice // we are going to be listening for the prevotes it casts @@ -949,152 +948,150 @@ fn voter_persists_its_votes() { // alice has a chain with 20 blocks let mut net = GrandpaTestNet::new(TestApi::new(voters.clone()), 2); - net.peer(0).push_blocks(20, false); - net.block_until_sync(); - - assert_eq!(net.peer(0).client().info().best_number, 20, - "Peer #{} failed to sync", 0); - - - let peer = net.peer(0); - let client = peer.client().clone(); - let net = Arc::new(Mutex::new(net)); - - // channel between the voter and the main controller. - // sending a message on the `voter_tx` restarts the voter. - let (voter_tx, voter_rx) = tracing_unbounded::<()>(""); - - let mut keystore_paths = Vec::new(); - - // startup a grandpa voter for alice but also listen for messages on a - // channel. whenever a message is received the voter is restarted. when the - // sender is dropped the voter is stopped. - { - let (keystore, keystore_path) = create_keystore(peers[0]); - keystore_paths.push(keystore_path); - - struct ResettableVoter { - voter: Pin + Send + Unpin>>, - voter_rx: TracingUnboundedReceiver<()>, - net: Arc>, - client: PeersClient, - keystore: SyncCryptoStorePtr, - } - - impl Future for ResettableVoter { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - - if let Poll::Ready(()) = Pin::new(&mut this.voter).poll(cx) { - panic!("error in the voter"); - } - - match Pin::new(&mut this.voter_rx).poll_next(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(())) => { - let (_block_import, _, link) = - this.net.lock() - .make_block_import::< - TransactionFor - >(this.client.clone()); - let link = link.lock().take().unwrap(); - - let grandpa_params = GrandpaParams { - config: Config { - gossip_duration: TEST_GOSSIP_DURATION, - justification_period: 32, - keystore: Some(this.keystore.clone()), - name: Some(format!("peer#{}", 0)), - is_authority: true, - observer_enabled: true, - }, - link, - network: this.net.lock().peers[0].network_service().clone(), - telemetry_on_connect: None, - voting_rule: VotingRulesBuilder::default().build(), - prometheus_registry: None, - shared_voter_state: SharedVoterState::empty(), - }; - - let voter = run_grandpa_voter(grandpa_params) - .expect("all in order with client and network") - .map(move |r| { - // we need to keep the block_import alive since it owns the - // sender for the voter commands channel, if that gets dropped - // then the voter will stop - drop(_block_import); - r - }); - - this.voter = Box::pin(voter); - // notify current task in order to poll the voter - cx.waker().wake_by_ref(); - } - }; - - Poll::Pending - } - } - - // we create a "dummy" voter by setting it to `pending` and triggering the `tx`. - // this way, the `ResettableVoter` will reset its `voter` field to a value ASAP. - voter_tx.unbounded_send(()).unwrap(); - runtime.spawn(ResettableVoter { - voter: Box::pin(futures::future::pending()), - voter_rx, - net: net.clone(), - client: client.clone(), - keystore, - }); - } - - let (exit_tx, exit_rx) = futures::channel::oneshot::channel::<()>(); // create the communication layer for bob, but don't start any // voter. instead we'll listen for the prevote that alice casts // and cast our own manually - { + let bob_keystore = { let (keystore, keystore_path) = create_keystore(peers[1]); keystore_paths.push(keystore_path); - + keystore + }; + let bob_network = { let config = Config { gossip_duration: TEST_GOSSIP_DURATION, justification_period: 32, - keystore: Some(keystore.clone()), + keystore: Some(bob_keystore.clone()), name: Some(format!("peer#{}", 1)), is_authority: true, observer_enabled: true, }; let set_state = { - let (_, _, link) = net.lock() + let bob_client = net.peer(1).client().clone(); + let (_, _, link) = net .make_block_import::< TransactionFor - >(client); + >(bob_client); let LinkHalf { persistent_data, .. } = link.lock().take().unwrap(); let PersistentData { set_state, .. } = persistent_data; set_state }; - let network = communication::NetworkBridge::new( - net.lock().peers[1].network_service().clone(), + communication::NetworkBridge::new( + net.peers[1].network_service().clone(), config.clone(), set_state, None, - ); + ) + }; - let (round_rx, round_tx) = network.round_communication( - Some((peers[1].public().into(), keystore).into()), + // spawn two voters for alice. + // half-way through the test, we stop one and start the other. + let (alice_voter1, abort) = future::abortable({ + let (keystore, _) = create_keystore(peers[0]); + + let (net_service, link) = { + // temporary needed for some reason + let link = net.peers[0].data.lock().take().expect("link initialized at startup; qed"); + ( + net.peers[0].network_service().clone(), + link, + ) + }; + + let grandpa_params = GrandpaParams { + config: Config { + gossip_duration: TEST_GOSSIP_DURATION, + justification_period: 32, + keystore: Some(keystore), + name: Some(format!("peer#{}", 0)), + is_authority: true, + observer_enabled: true, + }, + link, + network: net_service, + telemetry_on_connect: None, + voting_rule: VotingRulesBuilder::default().build(), + prometheus_registry: None, + shared_voter_state: SharedVoterState::empty(), + }; + + run_grandpa_voter(grandpa_params).expect("all in order with client and network") + }); + + fn alice_voter2( + peers: &[Ed25519Keyring], + net: Arc>, + ) -> impl Future + Unpin + Send + 'static { + let (keystore, _) = create_keystore(peers[0]); + let mut net = net.lock(); + + // we add a new peer to the test network and we'll use + // the network service of this new peer + net.add_full_peer(); + let net_service = net.peers[2].network_service().clone(); + // but we'll reuse the client from the first peer (alice_voter1) + // since we want to share the same database, so that we can + // read the persisted state after aborting alice_voter1. + let alice_client = net.peer(0).client().clone(); + + let (_block_import, _, link) = net + .make_block_import::< + TransactionFor + >(alice_client); + let link = link.lock().take().unwrap(); + + let grandpa_params = GrandpaParams { + config: Config { + gossip_duration: TEST_GOSSIP_DURATION, + justification_period: 32, + keystore: Some(keystore), + name: Some(format!("peer#{}", 0)), + is_authority: true, + observer_enabled: true, + }, + link, + network: net_service, + telemetry_on_connect: None, + voting_rule: VotingRulesBuilder::default().build(), + prometheus_registry: None, + shared_voter_state: SharedVoterState::empty(), + }; + + run_grandpa_voter(grandpa_params) + .expect("all in order with client and network") + .map(move |r| { + // we need to keep the block_import alive since it owns the + // sender for the voter commands channel, if that gets dropped + // then the voter will stop + drop(_block_import); + r + }) + }; + + runtime.spawn(alice_voter1); + + net.peer(0).push_blocks(20, false); + net.block_until_sync(); + + assert_eq!(net.peer(0).client().info().best_number, 20, + "Peer #{} failed to sync", 0); + + let net = Arc::new(Mutex::new(net)); + + let (exit_tx, exit_rx) = futures::channel::oneshot::channel::<()>(); + + { + let (round_rx, round_tx) = bob_network.round_communication( + Some((peers[1].public().into(), bob_keystore).into()), communication::Round(1), communication::SetId(0), Arc::new(VoterSet::new(voters).unwrap()), HasVoted::No, ); - runtime.spawn(network); + runtime.spawn(bob_network); let round_tx = Arc::new(Mutex::new(round_tx)); let exit_tx = Arc::new(Mutex::new(Some(exit_tx))); @@ -1102,13 +1099,15 @@ fn voter_persists_its_votes() { let net = net.clone(); let state = Arc::new(AtomicUsize::new(0)); + let runtime_handle = runtime.handle().clone(); runtime.spawn(round_rx.for_each(move |signed| { let net2 = net.clone(); let net = net.clone(); - let voter_tx = voter_tx.clone(); + let abort = abort.clone(); let round_tx = round_tx.clone(); let state = state.clone(); let exit_tx = exit_tx.clone(); + let runtime_handle = runtime_handle.clone(); async move { if state.compare_and_swap(0, 1, Ordering::SeqCst) == 0 { @@ -1120,7 +1119,7 @@ fn voter_persists_its_votes() { // its chain has 20 blocks and the voter targets 3/4 of the // unfinalized chain, so the vote should be for block 15 - assert!(prevote.target_number == 15); + assert_eq!(prevote.target_number, 15); // we push 20 more blocks to alice's chain net.lock().peer(0).push_blocks(20, false); @@ -1143,7 +1142,8 @@ fn voter_persists_its_votes() { net.lock().peer(0).client().as_full().unwrap().hash(30).unwrap().unwrap(); // we restart alice's voter - voter_tx.unbounded_send(()).unwrap(); + abort.abort(); + runtime_handle.spawn(alice_voter2(peers, net.clone())); // and we push our own prevote for block 30 let prevote = finality_grandpa::Prevote { @@ -1200,6 +1200,19 @@ fn finalize_3_voters_1_light_observer() { let voters = make_ids(authorities); let mut net = GrandpaTestNet::new(TestApi::new(voters), 4); + let voters = initialize_grandpa(&mut net, authorities); + let observer = observer::run_grandpa_observer( + Config { + gossip_duration: TEST_GOSSIP_DURATION, + justification_period: 32, + keystore: None, + name: Some("observer".to_string()), + is_authority: false, + observer_enabled: true, + }, + net.peers[3].data.lock().take().expect("link initialized at startup; qed"), + net.peers[3].network_service().clone(), + ).unwrap(); net.peer(0).push_blocks(20, false); net.block_until_sync(); @@ -1209,32 +1222,10 @@ fn finalize_3_voters_1_light_observer() { } let net = Arc::new(Mutex::new(net)); - let link = net.lock().peer(3).data.lock().take().expect("link initialized on startup; qed"); - let finality_notifications = net.lock().peer(3).client().finality_notification_stream() - .take_while(|n| { - future::ready(n.header.number() < &20) - }) - .collect::>(); - - run_to_completion_with(&mut runtime, 20, net.clone(), authorities, |executor| { - executor.spawn( - observer::run_grandpa_observer( - Config { - gossip_duration: TEST_GOSSIP_DURATION, - justification_period: 32, - keystore: None, - name: Some("observer".to_string()), - is_authority: false, - observer_enabled: true, - }, - link, - net.lock().peers[3].network_service().clone(), - ).unwrap() - ); - - Some(Box::pin(finality_notifications.map(|_| ()))) - }); + runtime.spawn(voters); + runtime.spawn(observer); + run_to_completion(&mut runtime, 20, net.clone(), authorities); } #[test] @@ -1245,9 +1236,7 @@ fn voter_catches_up_to_latest_round_when_behind() { let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob]; let voters = make_ids(peers); - let mut net = GrandpaTestNet::new(TestApi::new(voters), 3); - net.peer(0).push_blocks(50, false); - net.block_until_sync(); + let net = GrandpaTestNet::new(TestApi::new(voters), 2); let net = Arc::new(Mutex::new(net)); let mut finality_notifications = Vec::new(); @@ -1300,6 +1289,9 @@ fn voter_catches_up_to_latest_round_when_behind() { runtime.spawn(voter); } + net.lock().peer(0).push_blocks(50, false); + net.lock().block_until_sync(); + // wait for them to finalize block 50. since they'll vote on 3/4 of the // unfinalized chain it will take at least 4 rounds to do it. let wait_for_finality = ::futures::future::join_all(finality_notifications); @@ -1311,18 +1303,15 @@ fn voter_catches_up_to_latest_round_when_behind() { let runtime = runtime.handle().clone(); wait_for_finality.then(move |_| { - let peer_id = 2; + net.lock().add_full_peer(); + let link = { let net = net.lock(); - let mut link = net.peers[peer_id].data.lock(); + let mut link = net.peers[2].data.lock(); link.take().expect("link initialized at startup; qed") }; - let set_state = link.persistent_data.set_state.clone(); - - let voter = voter(None, peer_id, link, net); - - runtime.spawn(voter); + runtime.spawn(voter(None, 2, link, net.clone())); let start_time = std::time::Instant::now(); let timeout = Duration::from_secs(5 * 60); diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index 98ada69590..4deaad6d74 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -72,11 +72,7 @@ impl GossipEngine { validator: Arc>, ) -> Self where B: 'static { let protocol = protocol.into(); - - // We grab the event stream before registering the notifications protocol, otherwise we - // might miss events. let network_event_stream = network.event_stream(); - network.register_notifications_protocol(protocol.clone()); GossipEngine { state_machine: ConsensusGossip::new(validator, protocol.clone()), @@ -335,8 +331,6 @@ mod tests { unimplemented!(); } - fn register_notifications_protocol(&self, _: Cow<'static, str>) {} - fn announce(&self, _: B::Hash, _: Vec) { unimplemented!(); } diff --git a/substrate/client/network-gossip/src/lib.rs b/substrate/client/network-gossip/src/lib.rs index 09e946d1a1..2b33361022 100644 --- a/substrate/client/network-gossip/src/lib.rs +++ b/substrate/client/network-gossip/src/lib.rs @@ -81,14 +81,6 @@ pub trait Network { /// Send a notification to a peer. fn write_notification(&self, who: PeerId, protocol: Cow<'static, str>, message: Vec); - /// Registers a notifications protocol. - /// - /// See the documentation of [`NetworkService:register_notifications_protocol`] for more information. - fn register_notifications_protocol( - &self, - protocol: Cow<'static, str>, - ); - /// Notify everyone we're connected to that we have the given block. /// /// Note: this method isn't strictly related to gossiping and should eventually be moved @@ -113,13 +105,6 @@ impl Network for Arc> { NetworkService::write_notification(self, who, protocol, message) } - fn register_notifications_protocol( - &self, - protocol: Cow<'static, str>, - ) { - NetworkService::register_notifications_protocol(self, protocol) - } - fn announce(&self, block: B::Hash, associated_data: Vec) { NetworkService::announce_block(self, block, associated_data) } diff --git a/substrate/client/network-gossip/src/state_machine.rs b/substrate/client/network-gossip/src/state_machine.rs index 8bd6d9df01..88f9d48375 100644 --- a/substrate/client/network-gossip/src/state_machine.rs +++ b/substrate/client/network-gossip/src/state_machine.rs @@ -503,8 +503,6 @@ mod tests { unimplemented!(); } - fn register_notifications_protocol(&self, _: Cow<'static, str>) {} - fn announce(&self, _: B::Hash, _: Vec) { unimplemented!(); } diff --git a/substrate/client/network/README.md b/substrate/client/network/README.md index e0bd691043..914720f53e 100644 --- a/substrate/client/network/README.md +++ b/substrate/client/network/README.md @@ -120,8 +120,8 @@ bytes. block announces are pushed to other nodes. The handshake is empty on both sides. The message format is a SCALE-encoded tuple containing a block header followed with an opaque list of bytes containing some data associated with this block announcement, e.g. a candidate message. -- Notifications protocols that are registered using the `register_notifications_protocol` -method. For example: `/paritytech/grandpa/1`. See below for more information. +- Notifications protocols that are registered using `NetworkConfiguration::notifications_protocols`. +For example: `/paritytech/grandpa/1`. See below for more information. ## The legacy Substrate substream @@ -223,4 +223,4 @@ dispatching a background task with the [`NetworkWorker`]. More precise usage details are still being worked on and will likely change in the future. -License: GPL-3.0-or-later WITH Classpath-exception-2.0 \ No newline at end of file +License: GPL-3.0-or-later WITH Classpath-exception-2.0 diff --git a/substrate/client/network/src/lib.rs b/substrate/client/network/src/lib.rs index 91ea49bce7..fb65c754d7 100644 --- a/substrate/client/network/src/lib.rs +++ b/substrate/client/network/src/lib.rs @@ -141,8 +141,9 @@ //! block announces are pushed to other nodes. The handshake is empty on both sides. The message //! format is a SCALE-encoded tuple containing a block header followed with an opaque list of //! bytes containing some data associated with this block announcement, e.g. a candidate message. -//! - Notifications protocols that are registered using the `register_notifications_protocol` -//! method. For example: `/paritytech/grandpa/1`. See below for more information. +//! - Notifications protocols that are registered using +//! `NetworkConfiguration::notifications_protocols`. For example: `/paritytech/grandpa/1`. See +//! below for more information. //! //! ## The legacy Substrate substream //! diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 8c0dbd7eec..b6f162affd 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -656,7 +656,7 @@ impl NetworkService { /// > between the remote voluntarily closing a substream or a network error /// > preventing the message from being delivered. /// - /// The protocol must have been registered with `register_notifications_protocol` or + /// The protocol must have been registered with /// [`NetworkConfiguration::notifications_protocols`](crate::config::NetworkConfiguration::notifications_protocols). /// pub fn write_notification(&self, target: PeerId, protocol: Cow<'static, str>, message: Vec) { @@ -717,7 +717,7 @@ impl NetworkService { /// return an error. It is however possible for the entire connection to be abruptly closed, /// in which case enqueued notifications will be lost. /// - /// The protocol must have been registered with `register_notifications_protocol` or + /// The protocol must have been registered with /// [`NetworkConfiguration::notifications_protocols`](crate::config::NetworkConfiguration::notifications_protocols). /// /// # Usage @@ -844,28 +844,6 @@ impl NetworkService { } } - /// Registers a new notifications protocol. - /// - /// After a protocol has been registered, you can call `write_notifications`. - /// - /// **Important**: This method is a work-around, and you are instead strongly encouraged to - /// pass the protocol in the `NetworkConfiguration::notifications_protocols` list instead. - /// If you have no other choice but to use this method, you are very strongly encouraged to - /// call it very early on. Any connection open will retain the protocols that were registered - /// then, and not any new one. - /// - /// Please call `event_stream` before registering a protocol, otherwise you may miss events - /// about the protocol that you have registered. - // TODO: remove this method after https://github.com/paritytech/substrate/issues/6827 - pub fn register_notifications_protocol( - &self, - protocol_name: impl Into>, - ) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RegisterNotifProtocol { - protocol_name: protocol_name.into(), - }); - } - /// You may call this when new transactons are imported by the transaction pool. /// /// All transactions will be fetched from the `TransactionPool` that was passed at @@ -1222,9 +1200,6 @@ enum ServiceToWorkerMsg { request: Vec, pending_response: oneshot::Sender, RequestFailure>>, }, - RegisterNotifProtocol { - protocol_name: Cow<'static, str>, - }, DisconnectPeer(PeerId), NewBestBlockImported(B::Hash, NumberFor), } @@ -1359,8 +1334,6 @@ impl Future for NetworkWorker { }, } }, - ServiceToWorkerMsg::RegisterNotifProtocol { protocol_name } => - this.network_service.register_notifications_protocol(protocol_name), ServiceToWorkerMsg::DisconnectPeer(who) => this.network_service.user_protocol_mut().disconnect_peer(&who), ServiceToWorkerMsg::NewBestBlockImported(hash, number) =>