// Copyright 2019-2020 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . use crate::{ config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests, debug_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, protocol::{message::{self, Roles}, CustomMessageOutcome, Protocol}, Event, ObservedRole, DhtEvent, ExHashT, }; use codec::Encode as _; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; use libp2p::kad::record; use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; use log::debug; use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}}; use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification}; use std::{borrow::Cow, iter, task::{Context, Poll}, time::Duration}; /// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourOut", poll_method = "poll")] pub struct Behaviour { /// All the substrate-specific protocols. substrate: Protocol, /// Periodically pings and identifies the nodes we are connected to, and store information in a /// cache. debug_info: debug_info::DebugInfoBehaviour, /// Discovers nodes of the network. discovery: DiscoveryBehaviour, /// Block request handling. block_requests: block_requests::BlockRequests, /// Finality proof request handling. finality_proof_requests: finality_requests::FinalityProofRequests, /// Light client request handling. light_client_handler: light_client_handler::LightClientHandler, /// Queue of events to produce for the outside. #[behaviour(ignore)] events: Vec>, /// Role of our local node, as originally passed from the configuration. #[behaviour(ignore)] role: Role, } /// Event generated by `Behaviour`. pub enum BehaviourOut { BlockImport(BlockOrigin, Vec>), JustificationImport(Origin, B::Hash, NumberFor, Justification), FinalityProofImport(Origin, B::Hash, NumberFor, Vec), /// Started a random iterative Kademlia discovery query. RandomKademliaStarted(ProtocolId), /// We have received a request from a peer and answered it. AnsweredRequest { /// Peer which sent us a request. peer: PeerId, /// Protocol name of the request. protocol: Vec, /// Time it took to build the response. build_time: Duration, }, /// Started a new request with the given node. RequestStarted { peer: PeerId, /// Protocol name of the request. protocol: Vec, }, /// Finished, successfully or not, a previously-started request. RequestFinished { /// Who we were requesting. peer: PeerId, /// Protocol name of the request. protocol: Vec, /// How long before the response came or the request got cancelled. request_duration: Duration, }, /// Any event represented by the [`Event`] enum. /// /// > **Note**: The [`Event`] enum contains the events that are available through the public /// > API of the library. Event(Event), } impl Behaviour { /// Builds a new `Behaviour`. pub fn new( substrate: Protocol, role: Role, user_agent: String, local_public_key: PublicKey, block_requests: block_requests::BlockRequests, finality_proof_requests: finality_requests::FinalityProofRequests, light_client_handler: light_client_handler::LightClientHandler, disco_config: DiscoveryConfig, ) -> Self { Behaviour { substrate, debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()), discovery: disco_config.finish(), block_requests, finality_proof_requests, light_client_handler, events: Vec::new(), role, } } /// Returns the list of nodes that we know exist in the network. pub fn known_peers(&mut self) -> impl Iterator { self.discovery.known_peers() } /// Adds a hard-coded address for the given peer, that never expires. pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { self.discovery.add_known_address(peer_id, addr) } /// Returns the number of nodes that are in the Kademlia k-buckets. pub fn num_kbuckets_entries(&mut self) -> impl ExactSizeIterator { self.discovery.num_kbuckets_entries() } /// Returns the number of records in the Kademlia record stores. pub fn num_kademlia_records(&mut self) -> impl ExactSizeIterator { self.discovery.num_kademlia_records() } /// Returns the total size in bytes of all the records in the Kademlia record stores. pub fn kademlia_records_total_size(&mut self) -> impl ExactSizeIterator { self.discovery.kademlia_records_total_size() } /// Borrows `self` and returns a struct giving access to the information about a node. /// /// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes /// we're connected to, meaning that if `None` is returned then we're not connected to that /// node. pub fn node(&self, peer_id: &PeerId) -> Option { self.debug_info.node(peer_id) } /// Registers a new notifications protocol. /// /// After that, you can call `write_notifications`. /// /// Please call `event_stream` before registering a protocol, otherwise you may miss events /// about the protocol that you have registered. /// /// You are very strongly encouraged to call this method very early on. Any connection open /// will retain the protocols that were registered then, and not any new one. pub fn register_notifications_protocol( &mut self, engine_id: ConsensusEngineId, protocol_name: impl Into>, ) { // This is the message that we will send to the remote as part of the initial handshake. // At the moment, we force this to be an encoded `Roles`. let handshake_message = Roles::from(&self.role).encode(); let list = self.substrate.register_notifications_protocol(engine_id, protocol_name, handshake_message); for (remote, roles) in list { let role = reported_roles_to_observed_role(&self.role, remote, roles); let ev = Event::NotificationStreamOpened { remote: remote.clone(), engine_id, role, }; self.events.push(BehaviourOut::Event(ev)); } } /// Returns a shared reference to the user protocol. pub fn user_protocol(&self) -> &Protocol { &self.substrate } /// Returns a mutable reference to the user protocol. pub fn user_protocol_mut(&mut self) -> &mut Protocol { &mut self.substrate } /// Start querying a record from the DHT. Will later produce either a `ValueFound` or a `ValueNotFound` event. pub fn get_value(&mut self, key: &record::Key) { self.discovery.get_value(key); } /// Starts putting a record into DHT. Will later produce either a `ValuePut` or a `ValuePutFailed` event. pub fn put_value(&mut self, key: record::Key, value: Vec) { self.discovery.put_value(key, value); } /// Issue a light client request. pub fn light_client_request(&mut self, r: light_client_handler::Request) -> Result<(), light_client_handler::Error> { self.light_client_handler.request(r) } } fn reported_roles_to_observed_role(local_role: &Role, remote: &PeerId, roles: Roles) -> ObservedRole { if roles.is_authority() { match local_role { Role::Authority { sentry_nodes } if sentry_nodes.iter().any(|s| s.peer_id == *remote) => ObservedRole::OurSentry, Role::Sentry { validators } if validators.iter().any(|s| s.peer_id == *remote) => ObservedRole::OurGuardedAuthority, _ => ObservedRole::Authority } } else if roles.is_full() { ObservedRole::Full } else { ObservedRole::Light } } impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: void::Void) { void::unreachable(event) } } impl NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: CustomMessageOutcome) { match event { CustomMessageOutcome::BlockImport(origin, blocks) => self.events.push(BehaviourOut::BlockImport(origin, blocks)), CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) => self.events.push(BehaviourOut::JustificationImport(origin, hash, nb, justification)), CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) => self.events.push(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)), CustomMessageOutcome::BlockRequest { target, request } => { match self.block_requests.send_request(&target, request) { block_requests::SendRequestOutcome::Ok => { self.events.push(BehaviourOut::RequestStarted { peer: target, protocol: self.block_requests.protocol_name().to_vec(), }); }, block_requests::SendRequestOutcome::Replaced { request_duration, .. } => { self.events.push(BehaviourOut::RequestFinished { peer: target.clone(), protocol: self.block_requests.protocol_name().to_vec(), request_duration, }); self.events.push(BehaviourOut::RequestStarted { peer: target, protocol: self.block_requests.protocol_name().to_vec(), }); } block_requests::SendRequestOutcome::NotConnected | block_requests::SendRequestOutcome::EncodeError(_) => {}, } }, CustomMessageOutcome::FinalityProofRequest { target, block_hash, request } => { self.finality_proof_requests.send_request(&target, block_hash, request); }, CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } => { let role = reported_roles_to_observed_role(&self.role, &remote, roles); for engine_id in protocols { self.events.push(BehaviourOut::Event(Event::NotificationStreamOpened { remote: remote.clone(), engine_id, role: role.clone(), })); } }, CustomMessageOutcome::NotificationStreamClosed { remote, protocols } => for engine_id in protocols { self.events.push(BehaviourOut::Event(Event::NotificationStreamClosed { remote: remote.clone(), engine_id, })); }, CustomMessageOutcome::NotificationsReceived { remote, messages } => { let ev = Event::NotificationsReceived { remote, messages }; self.events.push(BehaviourOut::Event(ev)); }, CustomMessageOutcome::PeerNewBest(peer_id, number) => { self.light_client_handler.update_best_block(&peer_id, number); } CustomMessageOutcome::None => {} } } } impl NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: block_requests::Event) { match event { block_requests::Event::AnsweredRequest { peer, total_handling_time } => { self.events.push(BehaviourOut::AnsweredRequest { peer, protocol: self.block_requests.protocol_name().to_vec(), build_time: total_handling_time, }); }, block_requests::Event::Response { peer, original_request, response, request_duration } => { self.events.push(BehaviourOut::RequestFinished { peer: peer.clone(), protocol: self.block_requests.protocol_name().to_vec(), request_duration, }); let ev = self.substrate.on_block_response(peer, original_request, response); self.inject_event(ev); } block_requests::Event::RequestCancelled { peer, request_duration, .. } => { // There doesn't exist any mechanism to report cancellations yet. // We would normally disconnect the node, but this event happens as the result of // a disconnect, so there's nothing more to do. self.events.push(BehaviourOut::RequestFinished { peer, protocol: self.block_requests.protocol_name().to_vec(), request_duration, }); } block_requests::Event::RequestTimeout { peer, request_duration, .. } => { // There doesn't exist any mechanism to report timeouts yet, so we process them by // disconnecting the node. self.events.push(BehaviourOut::RequestFinished { peer: peer.clone(), protocol: self.block_requests.protocol_name().to_vec(), request_duration, }); self.substrate.disconnect_peer(&peer); } } } } impl NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: finality_requests::Event) { match event { finality_requests::Event::Response { peer, block_hash, proof } => { let response = message::FinalityProofResponse { id: 0, block: block_hash, proof: if !proof.is_empty() { Some(proof) } else { None }, }; let ev = self.substrate.on_finality_proof_response(peer, response); self.inject_event(ev); } } } } impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: debug_info::DebugInfoEvent) { let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event; if info.listen_addrs.len() > 30 { debug!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \ it is identified by {:?} and {:?}", peer_id, info.protocol_version, info.agent_version ); info.listen_addrs.truncate(30); } for addr in &info.listen_addrs { self.discovery.add_self_reported_address(&peer_id, addr.clone()); } self.substrate.add_discovered_nodes(iter::once(peer_id.clone())); } } impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, out: DiscoveryOut) { match out { DiscoveryOut::UnroutablePeer(_peer_id) => { // Obtaining and reporting listen addresses for unroutable peers back // to Kademlia is handled by the `Identify` protocol, part of the // `DebugInfoBehaviour`. See the `NetworkBehaviourEventProcess` // implementation for `DebugInfoEvent`. } DiscoveryOut::Discovered(peer_id) => { self.substrate.add_discovered_nodes(iter::once(peer_id)); } DiscoveryOut::ValueFound(results) => { self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValueFound(results)))); } DiscoveryOut::ValueNotFound(key) => { self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValueNotFound(key)))); } DiscoveryOut::ValuePut(key) => { self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePut(key)))); } DiscoveryOut::ValuePutFailed(key) => { self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key)))); } DiscoveryOut::RandomKademliaStarted(protocols) => { for protocol in protocols { self.events.push(BehaviourOut::RandomKademliaStarted(protocol)); } } } } } impl Behaviour { fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll>> { if !self.events.is_empty() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))) } Poll::Pending } }