mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 16:31:07 +00:00
Add DHT key-value behaviour (#2937)
* Add DHT key-value behaviour * Apply suggestions from code review Co-Authored-By: Bastian Köcher <bkchr@users.noreply.github.com> * Apply suggestions from code review Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com> * Return which key failed to be inserted
This commit is contained in:
committed by
Pierre Krieger
parent
0ddf4a2a00
commit
e735853ca3
@@ -14,12 +14,15 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::{debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour};
|
||||
use crate::{
|
||||
debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour, event::DhtEvent
|
||||
};
|
||||
use futures::prelude::*;
|
||||
use libp2p::NetworkBehaviour;
|
||||
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey};
|
||||
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
|
||||
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
|
||||
use libp2p::multihash::Multihash;
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
use libp2p::core::swarm::toggle::Toggle;
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
@@ -30,7 +33,7 @@ use void;
|
||||
|
||||
/// General behaviour of the network.
|
||||
#[derive(NetworkBehaviour)]
|
||||
#[behaviour(out_event = "TBehaviourEv", poll_method = "poll")]
|
||||
#[behaviour(out_event = "BehaviourOut<TBehaviourEv>", poll_method = "poll")]
|
||||
pub struct Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
/// Main protocol that handles everything except the discovery and the technicalities.
|
||||
user_protocol: UserBehaviourWrap<TBehaviour>,
|
||||
@@ -45,7 +48,13 @@ pub struct Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
|
||||
/// Queue of events to produce for the outside.
|
||||
#[behaviour(ignore)]
|
||||
events: Vec<TBehaviourEv>,
|
||||
events: Vec<BehaviourOut<TBehaviourEv>>,
|
||||
}
|
||||
|
||||
/// A wrapper for the behavbour event that adds DHT-related event variant.
|
||||
pub enum BehaviourOut<TBehaviourEv> {
|
||||
Behaviour(TBehaviourEv),
|
||||
Dht(DhtEvent),
|
||||
}
|
||||
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
@@ -112,6 +121,16 @@ impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, T
|
||||
pub fn user_protocol_mut(&mut self) -> &mut TBehaviour {
|
||||
&mut self.user_protocol.0
|
||||
}
|
||||
|
||||
/// Start querying a record from the DHT. Will later produce either a `ValueFound` or a `ValueNotFound` event.
|
||||
pub fn get_value(&mut self, key: &Multihash) {
|
||||
self.discovery.get_value(key);
|
||||
}
|
||||
|
||||
/// Starts putting a record into DHT. Will later produce either a `ValuePut` or a `ValuePutFailed` event.
|
||||
pub fn put_value(&mut self, key: Multihash, value: Vec<u8>) {
|
||||
self.discovery.put_value(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<void::Void> for
|
||||
@@ -124,7 +143,7 @@ Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<UserEventWrap<TBehaviourEv>> for
|
||||
Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
fn inject_event(&mut self, event: UserEventWrap<TBehaviourEv>) {
|
||||
self.events.push(event.0);
|
||||
self.events.push(BehaviourOut::Behaviour(event.0));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,6 +177,18 @@ impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<Discover
|
||||
DiscoveryOut::Discovered(peer_id) => {
|
||||
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id));
|
||||
}
|
||||
DiscoveryOut::ValueFound(results) => {
|
||||
self.events.push(BehaviourOut::Dht(DhtEvent::ValueFound(results)));
|
||||
}
|
||||
DiscoveryOut::ValueNotFound(key) => {
|
||||
self.events.push(BehaviourOut::Dht(DhtEvent::ValueNotFound(key)));
|
||||
}
|
||||
DiscoveryOut::ValuePut(key) => {
|
||||
self.events.push(BehaviourOut::Dht(DhtEvent::ValuePut(key)));
|
||||
}
|
||||
DiscoveryOut::ValuePutFailed(key) => {
|
||||
self.events.push(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -177,7 +208,7 @@ impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<MdnsEven
|
||||
}
|
||||
|
||||
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
|
||||
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, TBehaviourEv>> {
|
||||
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<TBehaviourEv>>> {
|
||||
if !self.events.is_empty() {
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
|
||||
}
|
||||
|
||||
@@ -18,10 +18,11 @@ use futures::prelude::*;
|
||||
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
|
||||
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
|
||||
use libp2p::core::swarm::PollParameters;
|
||||
use libp2p::kad::{Kademlia, KademliaOut};
|
||||
use libp2p::kad::{GetValueResult, Kademlia, KademliaOut, PutValueResult};
|
||||
use libp2p::multihash::Multihash;
|
||||
use libp2p::multiaddr::Protocol;
|
||||
use log::{debug, info, trace, warn};
|
||||
use std::{cmp, time::Duration};
|
||||
use std::{cmp, num::NonZeroU8, time::Duration};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_timer::{Delay, clock::Clock};
|
||||
|
||||
@@ -81,12 +82,35 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> {
|
||||
pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
|
||||
self.kademlia.add_address(peer_id, addr);
|
||||
}
|
||||
|
||||
/// Get a record from the DHT.
|
||||
pub fn get_value(&mut self, key: &Multihash) {
|
||||
self.kademlia.get_value(key, NonZeroU8::new(10)
|
||||
.expect("Casting 10 to NonZeroU8 should succeed; qed"));
|
||||
}
|
||||
|
||||
/// Put a record into the DHT.
|
||||
pub fn put_value(&mut self, key: Multihash, value: Vec<u8>) {
|
||||
self.kademlia.put_value(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
/// Event generated by the `DiscoveryBehaviour`.
|
||||
pub enum DiscoveryOut {
|
||||
/// We have discovered a node. Can be called multiple times with the same identity.
|
||||
Discovered(PeerId),
|
||||
|
||||
/// The DHT yeided results for the record request, grouped in (key, value) pairs.
|
||||
ValueFound(Vec<(Multihash, Vec<u8>)>),
|
||||
|
||||
/// The record requested was not found in the DHT.
|
||||
ValueNotFound(Multihash),
|
||||
|
||||
/// The record with a given key was successfully inserted into the DHT.
|
||||
ValuePut(Multihash),
|
||||
|
||||
/// Inserting a value into the DHT failed.
|
||||
ValuePutFailed(Multihash),
|
||||
}
|
||||
|
||||
impl<TSubstream> NetworkBehaviour for DiscoveryBehaviour<TSubstream>
|
||||
@@ -175,10 +199,35 @@ where
|
||||
results");
|
||||
}
|
||||
}
|
||||
KademliaOut::GetValueResult(res) => {
|
||||
let ev = match res {
|
||||
GetValueResult::Found { results } => {
|
||||
let results = results
|
||||
.into_iter()
|
||||
.map(|r| (r.key, r.value))
|
||||
.collect();
|
||||
|
||||
DiscoveryOut::ValueFound(results)
|
||||
}
|
||||
GetValueResult::NotFound { key, .. } => {
|
||||
DiscoveryOut::ValueNotFound(key)
|
||||
}
|
||||
};
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
KademliaOut::PutValueResult(res) => {
|
||||
let ev = match res {
|
||||
PutValueResult::Ok{ key, .. } => {
|
||||
DiscoveryOut::ValuePut(key)
|
||||
}
|
||||
PutValueResult::Err { key, .. } => {
|
||||
DiscoveryOut::ValuePutFailed(key)
|
||||
}
|
||||
};
|
||||
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
// We never start any other type of query.
|
||||
KademliaOut::GetProvidersResult { .. } => {}
|
||||
KademliaOut::GetValueResult(_) => {}
|
||||
KademliaOut::PutValueResult(_) => {}
|
||||
}
|
||||
},
|
||||
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
|
||||
|
||||
@@ -185,12 +185,13 @@ pub use service::{
|
||||
NetworkMsg, ExHashT, ReportHandle,
|
||||
};
|
||||
pub use config::{NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret};
|
||||
pub use protocol::{PeerInfo, Context, consensus_gossip, message, specialization};
|
||||
pub use protocol::{PeerInfo, Context, consensus_gossip, event, message, specialization};
|
||||
pub use protocol::sync::SyncState;
|
||||
pub use libp2p::{Multiaddr, multiaddr, build_multiaddr};
|
||||
pub use libp2p::{identity, PeerId, core::PublicKey, wasm_ext::ExtTransport};
|
||||
|
||||
pub use message::{generic as generic_message, RequestId, Status as StatusMessage};
|
||||
pub use event::Event;
|
||||
pub use error::Error;
|
||||
pub use protocol::on_demand::AlwaysBadChecker;
|
||||
pub use on_demand_layer::{OnDemand, RemoteResponse};
|
||||
|
||||
@@ -30,6 +30,7 @@ use message::{
|
||||
};
|
||||
use message::{BlockAttributes, Direction, FromBlock, RequestId};
|
||||
use message::generic::{Message as GenericMessage, ConsensusMessage};
|
||||
use event::Event;
|
||||
use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
|
||||
use on_demand::{OnDemandCore, OnDemandNetwork, RequestData};
|
||||
use specialization::NetworkSpecialization;
|
||||
@@ -49,6 +50,7 @@ use util::LruHashSet;
|
||||
mod util;
|
||||
pub mod consensus_gossip;
|
||||
pub mod message;
|
||||
pub mod event;
|
||||
pub mod on_demand;
|
||||
pub mod specialization;
|
||||
pub mod sync;
|
||||
@@ -497,6 +499,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
|
||||
self.context_data.peers.iter().map(|(id, peer)| (id, &peer.info))
|
||||
}
|
||||
|
||||
pub fn on_event(&mut self, event: Event) {
|
||||
self.specialization.on_event(event);
|
||||
}
|
||||
|
||||
pub fn on_custom_message(
|
||||
&mut self,
|
||||
network_out: &mut dyn NetworkOut<B>,
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
// This file is part of Substrate.
|
||||
|
||||
// Substrate is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Substrate is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Network event types. These are are not the part of the protocol, but rather
|
||||
//! events that happen on the network like DHT get/put results received.
|
||||
|
||||
use libp2p::multihash::Multihash;
|
||||
|
||||
/// Events generated by DHT as a response to get_value and put_value requests.
|
||||
pub enum DhtEvent {
|
||||
/// The value was found.
|
||||
ValueFound(Vec<(Multihash, Vec<u8>)>),
|
||||
|
||||
/// The requested record has not been found in the DHT.
|
||||
ValueNotFound(Multihash),
|
||||
|
||||
/// The record has been successfully inserted into the DHT.
|
||||
ValuePut(Multihash),
|
||||
|
||||
/// An error has occured while putting a record into the DHT.
|
||||
ValuePutFailed(Multihash),
|
||||
}
|
||||
|
||||
/// Type for events generated by networking layer.
|
||||
pub enum Event {
|
||||
/// Event generated by a DHT.
|
||||
Dht(DhtEvent),
|
||||
}
|
||||
@@ -39,6 +39,12 @@ pub trait NetworkSpecialization<B: BlockT>: Send + Sync + 'static {
|
||||
message: &mut Option<crate::message::Message<B>>
|
||||
);
|
||||
|
||||
/// Called when a network-specific event arrives.
|
||||
fn on_event(
|
||||
&mut self,
|
||||
event: crate::protocol::event::Event
|
||||
);
|
||||
|
||||
/// Called on abort.
|
||||
#[deprecated(note = "This method is never called; aborting corresponds to dropping the object")]
|
||||
fn on_abort(&mut self) { }
|
||||
@@ -130,6 +136,13 @@ macro_rules! construct_simple_protocol {
|
||||
$( self.$sub_protocol_name.on_message(_ctx, _who, _message); )*
|
||||
}
|
||||
|
||||
fn on_event(
|
||||
&mut self,
|
||||
_event: $crate::event::Event
|
||||
) {
|
||||
$( self.$sub_protocol_name.on_event(_event); )*
|
||||
}
|
||||
|
||||
fn on_abort(&mut self) {
|
||||
$( self.$sub_protocol_name.on_abort(); )*
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@
|
||||
use crate::{ExHashT, DiscoveryNetBehaviour, ProtocolId};
|
||||
use crate::custom_proto::{CustomProto, CustomProtoOut};
|
||||
use crate::chain::{Client, FinalityProofProvider};
|
||||
use crate::protocol::{self, CustomMessageOutcome, Protocol, ProtocolConfig, sync::SyncState};
|
||||
use crate::protocol::{self, event::Event, CustomMessageOutcome, Protocol, ProtocolConfig, sync::SyncState};
|
||||
use crate::protocol::{PeerInfo, NetworkOut, message::Message, on_demand::RequestData};
|
||||
use crate::protocol::consensus_gossip::MessageRecipient as GossipMessageRecipient;
|
||||
use crate::protocol::specialization::NetworkSpecialization;
|
||||
@@ -276,6 +276,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> ProtocolBehaviour<B, S,
|
||||
self.protocol.justification_import_result(hash, number, success)
|
||||
}
|
||||
|
||||
/// The networking-level event has happened.
|
||||
pub fn on_event(&mut self, event: Event) {
|
||||
self.protocol.on_event(event);
|
||||
}
|
||||
|
||||
/// Request a finality proof for the given block.
|
||||
///
|
||||
/// Queues a new finality proof request and tries to dispatch all pending requests.
|
||||
|
||||
@@ -23,10 +23,11 @@ use std::time::Duration;
|
||||
use log::{warn, error, info};
|
||||
use libp2p::core::swarm::NetworkBehaviour;
|
||||
use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
|
||||
use libp2p::multihash::Multihash;
|
||||
use futures::{prelude::*, sync::oneshot, sync::mpsc};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use crate::protocol_behaviour::ProtocolBehaviour;
|
||||
use crate::{behaviour::Behaviour, parse_str_addr};
|
||||
use crate::{behaviour::{Behaviour, BehaviourOut}, parse_str_addr};
|
||||
use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer};
|
||||
use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode};
|
||||
use peerset::PeersetHandle;
|
||||
@@ -35,7 +36,7 @@ use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId
|
||||
|
||||
use crate::AlwaysBadChecker;
|
||||
use crate::protocol::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient};
|
||||
use crate::protocol::message::Message;
|
||||
use crate::protocol::{event::Event, message::Message};
|
||||
use crate::protocol::on_demand::RequestData;
|
||||
use crate::protocol::{self, Context, CustomMessageOutcome, ConnectedPeer, PeerInfo};
|
||||
use crate::protocol::sync::SyncState;
|
||||
@@ -370,6 +371,16 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
|
||||
pub fn is_major_syncing(&self) -> bool {
|
||||
self.is_major_syncing.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Get a value.
|
||||
pub fn get_value(&mut self, key: &Multihash) {
|
||||
self.network.lock().get_value(key);
|
||||
}
|
||||
|
||||
/// Put a value.
|
||||
pub fn put_value(&mut self, key: Multihash, value: Vec<u8>) {
|
||||
self.network.lock().put_value(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkService<B, S, H> {
|
||||
@@ -744,7 +755,12 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
|
||||
|
||||
let outcome = match poll_value {
|
||||
Ok(Async::NotReady) => break,
|
||||
Ok(Async::Ready(Some(outcome))) => outcome,
|
||||
Ok(Async::Ready(Some(BehaviourOut::Behaviour(outcome)))) => outcome,
|
||||
Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => {
|
||||
network_service.user_protocol_mut()
|
||||
.on_event(Event::Dht(ev));
|
||||
CustomMessageOutcome::None
|
||||
},
|
||||
Ok(Async::Ready(None)) => CustomMessageOutcome::None,
|
||||
Err(err) => {
|
||||
error!(target: "sync", "Error in the network: {:?}", err);
|
||||
|
||||
@@ -196,6 +196,11 @@ impl NetworkSpecialization<Block> for DummySpecialization {
|
||||
_peer_id: PeerId,
|
||||
_message: &mut Option<crate::message::Message<Block>>,
|
||||
) {}
|
||||
|
||||
fn on_event(
|
||||
&mut self,
|
||||
_event: crate::event::Event
|
||||
) {}
|
||||
}
|
||||
|
||||
pub type PeersFullClient =
|
||||
|
||||
Reference in New Issue
Block a user