diff --git a/substrate/core/network/src/behaviour.rs b/substrate/core/network/src/behaviour.rs index 74d6b56fa8..2899234938 100644 --- a/substrate/core/network/src/behaviour.rs +++ b/substrate/core/network/src/behaviour.rs @@ -14,12 +14,15 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::{debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour}; +use crate::{ + debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour, event::DhtEvent +}; use futures::prelude::*; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters}; +use libp2p::multihash::Multihash; #[cfg(not(target_os = "unknown"))] use libp2p::core::swarm::toggle::Toggle; #[cfg(not(target_os = "unknown"))] @@ -30,7 +33,7 @@ use void; /// General behaviour of the network. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "TBehaviourEv", poll_method = "poll")] +#[behaviour(out_event = "BehaviourOut", poll_method = "poll")] pub struct Behaviour { /// Main protocol that handles everything except the discovery and the technicalities. user_protocol: UserBehaviourWrap, @@ -45,7 +48,13 @@ pub struct Behaviour { /// Queue of events to produce for the outside. #[behaviour(ignore)] - events: Vec, + events: Vec>, +} + +/// A wrapper for the behavbour event that adds DHT-related event variant. +pub enum BehaviourOut { + Behaviour(TBehaviourEv), + Dht(DhtEvent), } impl Behaviour { @@ -112,6 +121,16 @@ impl Behaviour &mut TBehaviour { &mut self.user_protocol.0 } + + /// Start querying a record from the DHT. Will later produce either a `ValueFound` or a `ValueNotFound` event. + pub fn get_value(&mut self, key: &Multihash) { + self.discovery.get_value(key); + } + + /// Starts putting a record into DHT. Will later produce either a `ValuePut` or a `ValuePutFailed` event. + pub fn put_value(&mut self, key: Multihash, value: Vec) { + self.discovery.put_value(key, value); + } } impl NetworkBehaviourEventProcess for @@ -124,7 +143,7 @@ Behaviour { impl NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: UserEventWrap) { - self.events.push(event.0); + self.events.push(BehaviourOut::Behaviour(event.0)); } } @@ -158,6 +177,18 @@ impl NetworkBehaviourEventProcess { self.user_protocol.0.add_discovered_nodes(iter::once(peer_id)); } + DiscoveryOut::ValueFound(results) => { + self.events.push(BehaviourOut::Dht(DhtEvent::ValueFound(results))); + } + DiscoveryOut::ValueNotFound(key) => { + self.events.push(BehaviourOut::Dht(DhtEvent::ValueNotFound(key))); + } + DiscoveryOut::ValuePut(key) => { + self.events.push(BehaviourOut::Dht(DhtEvent::ValuePut(key))); + } + DiscoveryOut::ValuePutFailed(key) => { + self.events.push(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key))); + } } } } @@ -177,7 +208,7 @@ impl NetworkBehaviourEventProcess Behaviour { - fn poll(&mut self) -> Async> { + fn poll(&mut self) -> Async>> { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))) } diff --git a/substrate/core/network/src/discovery.rs b/substrate/core/network/src/discovery.rs index 75cf8b1e2f..c9c06e4251 100644 --- a/substrate/core/network/src/discovery.rs +++ b/substrate/core/network/src/discovery.rs @@ -18,10 +18,11 @@ use futures::prelude::*; use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey}; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; use libp2p::core::swarm::PollParameters; -use libp2p::kad::{Kademlia, KademliaOut}; +use libp2p::kad::{GetValueResult, Kademlia, KademliaOut, PutValueResult}; +use libp2p::multihash::Multihash; use libp2p::multiaddr::Protocol; use log::{debug, info, trace, warn}; -use std::{cmp, time::Duration}; +use std::{cmp, num::NonZeroU8, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::{Delay, clock::Clock}; @@ -81,12 +82,35 @@ impl DiscoveryBehaviour { pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { self.kademlia.add_address(peer_id, addr); } + + /// Get a record from the DHT. + pub fn get_value(&mut self, key: &Multihash) { + self.kademlia.get_value(key, NonZeroU8::new(10) + .expect("Casting 10 to NonZeroU8 should succeed; qed")); + } + + /// Put a record into the DHT. + pub fn put_value(&mut self, key: Multihash, value: Vec) { + self.kademlia.put_value(key, value); + } } /// Event generated by the `DiscoveryBehaviour`. pub enum DiscoveryOut { /// We have discovered a node. Can be called multiple times with the same identity. Discovered(PeerId), + + /// The DHT yeided results for the record request, grouped in (key, value) pairs. + ValueFound(Vec<(Multihash, Vec)>), + + /// The record requested was not found in the DHT. + ValueNotFound(Multihash), + + /// The record with a given key was successfully inserted into the DHT. + ValuePut(Multihash), + + /// Inserting a value into the DHT failed. + ValuePutFailed(Multihash), } impl NetworkBehaviour for DiscoveryBehaviour @@ -175,10 +199,35 @@ where results"); } } + KademliaOut::GetValueResult(res) => { + let ev = match res { + GetValueResult::Found { results } => { + let results = results + .into_iter() + .map(|r| (r.key, r.value)) + .collect(); + + DiscoveryOut::ValueFound(results) + } + GetValueResult::NotFound { key, .. } => { + DiscoveryOut::ValueNotFound(key) + } + }; + return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } + KademliaOut::PutValueResult(res) => { + let ev = match res { + PutValueResult::Ok{ key, .. } => { + DiscoveryOut::ValuePut(key) + } + PutValueResult::Err { key, .. } => { + DiscoveryOut::ValuePutFailed(key) + } + }; + return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } // We never start any other type of query. KademliaOut::GetProvidersResult { .. } => {} - KademliaOut::GetValueResult(_) => {} - KademliaOut::PutValueResult(_) => {} } }, Async::Ready(NetworkBehaviourAction::DialAddress { address }) => diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index caf4ca20ee..9b3d86322b 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -185,12 +185,13 @@ pub use service::{ NetworkMsg, ExHashT, ReportHandle, }; pub use config::{NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret}; -pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization}; +pub use protocol::{PeerInfo, Context, consensus_gossip, event, message, specialization}; pub use protocol::sync::SyncState; pub use libp2p::{Multiaddr, multiaddr, build_multiaddr}; pub use libp2p::{identity, PeerId, core::PublicKey, wasm_ext::ExtTransport}; pub use message::{generic as generic_message, RequestId, Status as StatusMessage}; +pub use event::Event; pub use error::Error; pub use protocol::on_demand::AlwaysBadChecker; pub use on_demand_layer::{OnDemand, RemoteResponse}; diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 3038898b09..61f79225cd 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -30,6 +30,7 @@ use message::{ }; use message::{BlockAttributes, Direction, FromBlock, RequestId}; use message::generic::{Message as GenericMessage, ConsensusMessage}; +use event::Event; use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use on_demand::{OnDemandCore, OnDemandNetwork, RequestData}; use specialization::NetworkSpecialization; @@ -49,6 +50,7 @@ use util::LruHashSet; mod util; pub mod consensus_gossip; pub mod message; +pub mod event; pub mod on_demand; pub mod specialization; pub mod sync; @@ -497,6 +499,10 @@ impl, H: ExHashT> Protocol { self.context_data.peers.iter().map(|(id, peer)| (id, &peer.info)) } + pub fn on_event(&mut self, event: Event) { + self.specialization.on_event(event); + } + pub fn on_custom_message( &mut self, network_out: &mut dyn NetworkOut, diff --git a/substrate/core/network/src/protocol/event.rs b/substrate/core/network/src/protocol/event.rs new file mode 100644 index 0000000000..2edbb0fbf7 --- /dev/null +++ b/substrate/core/network/src/protocol/event.rs @@ -0,0 +1,41 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Network event types. These are are not the part of the protocol, but rather +//! events that happen on the network like DHT get/put results received. + +use libp2p::multihash::Multihash; + +/// Events generated by DHT as a response to get_value and put_value requests. +pub enum DhtEvent { + /// The value was found. + ValueFound(Vec<(Multihash, Vec)>), + + /// The requested record has not been found in the DHT. + ValueNotFound(Multihash), + + /// The record has been successfully inserted into the DHT. + ValuePut(Multihash), + + /// An error has occured while putting a record into the DHT. + ValuePutFailed(Multihash), +} + +/// Type for events generated by networking layer. +pub enum Event { + /// Event generated by a DHT. + Dht(DhtEvent), +} diff --git a/substrate/core/network/src/protocol/specialization.rs b/substrate/core/network/src/protocol/specialization.rs index 41b10bf707..0078e66522 100644 --- a/substrate/core/network/src/protocol/specialization.rs +++ b/substrate/core/network/src/protocol/specialization.rs @@ -39,6 +39,12 @@ pub trait NetworkSpecialization: Send + Sync + 'static { message: &mut Option> ); + /// Called when a network-specific event arrives. + fn on_event( + &mut self, + event: crate::protocol::event::Event + ); + /// Called on abort. #[deprecated(note = "This method is never called; aborting corresponds to dropping the object")] fn on_abort(&mut self) { } @@ -130,6 +136,13 @@ macro_rules! construct_simple_protocol { $( self.$sub_protocol_name.on_message(_ctx, _who, _message); )* } + fn on_event( + &mut self, + _event: $crate::event::Event + ) { + $( self.$sub_protocol_name.on_event(_event); )* + } + fn on_abort(&mut self) { $( self.$sub_protocol_name.on_abort(); )* } diff --git a/substrate/core/network/src/protocol_behaviour.rs b/substrate/core/network/src/protocol_behaviour.rs index cef009feaa..70fb487055 100644 --- a/substrate/core/network/src/protocol_behaviour.rs +++ b/substrate/core/network/src/protocol_behaviour.rs @@ -19,7 +19,7 @@ use crate::{ExHashT, DiscoveryNetBehaviour, ProtocolId}; use crate::custom_proto::{CustomProto, CustomProtoOut}; use crate::chain::{Client, FinalityProofProvider}; -use crate::protocol::{self, CustomMessageOutcome, Protocol, ProtocolConfig, sync::SyncState}; +use crate::protocol::{self, event::Event, CustomMessageOutcome, Protocol, ProtocolConfig, sync::SyncState}; use crate::protocol::{PeerInfo, NetworkOut, message::Message, on_demand::RequestData}; use crate::protocol::consensus_gossip::MessageRecipient as GossipMessageRecipient; use crate::protocol::specialization::NetworkSpecialization; @@ -276,6 +276,11 @@ impl, H: ExHashT> ProtocolBehaviour, H: ExHashT> NetworkServic pub fn is_major_syncing(&self) -> bool { self.is_major_syncing.load(Ordering::Relaxed) } + + /// Get a value. + pub fn get_value(&mut self, key: &Multihash) { + self.network.lock().get_value(key); + } + + /// Put a value. + pub fn put_value(&mut self, key: Multihash, value: Vec) { + self.network.lock().put_value(key, value); + } } impl, H: ExHashT> NetworkService { @@ -744,7 +755,12 @@ impl, H: ExHashT> Future for Ne let outcome = match poll_value { Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(outcome))) => outcome, + Ok(Async::Ready(Some(BehaviourOut::Behaviour(outcome)))) => outcome, + Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => { + network_service.user_protocol_mut() + .on_event(Event::Dht(ev)); + CustomMessageOutcome::None + }, Ok(Async::Ready(None)) => CustomMessageOutcome::None, Err(err) => { error!(target: "sync", "Error in the network: {:?}", err); diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 58d8a91c2e..ce6d521ac5 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -196,6 +196,11 @@ impl NetworkSpecialization for DummySpecialization { _peer_id: PeerId, _message: &mut Option>, ) {} + + fn on_event( + &mut self, + _event: crate::event::Event + ) {} } pub type PeersFullClient =