Run cargo fmt on the whole code base (#9394)

* Run cargo fmt on the whole code base

* Second run

* Add CI check

* Fix compilation

* More unnecessary braces

* Handle weights

* Use --all

* Use correct attributes...

* Fix UI tests

* AHHHHHHHHH

* 🤦

* Docs

* Fix compilation

* 🤷

* Please stop

* 🤦 x 2

* More

* make rustfmt.toml consistent with polkadot

Co-authored-by: André Silva <andrerfosilva@gmail.com>
This commit is contained in:
Bastian Köcher
2021-07-21 16:32:32 +02:00
committed by GitHub
parent d451c38c1c
commit 7b56ab15b4
1010 changed files with 53339 additions and 51208 deletions
@@ -20,8 +20,7 @@
//! events that happen on the network like DHT get/put results received.
use bytes::Bytes;
use libp2p::core::PeerId;
use libp2p::kad::record::Key;
use libp2p::{core::PeerId, kad::record::Key};
use std::borrow::Cow;
/// Events generated by DHT as a response to get_value and put_value requests.
@@ -18,16 +18,17 @@
//! Network packet message types. These get serialized and put into the lower level protocol payload.
use bitflags::bitflags;
use sp_runtime::{ConsensusEngineId, traits::{Block as BlockT, Header as HeaderT}};
use codec::{Encode, Decode, Input, Output, Error};
pub use self::generic::{
BlockAnnounce, RemoteCallRequest, RemoteReadRequest,
RemoteHeaderRequest, RemoteHeaderResponse,
RemoteChangesRequest, RemoteChangesResponse,
FromBlock, RemoteReadChildRequest, Roles,
BlockAnnounce, FromBlock, RemoteCallRequest, RemoteChangesRequest, RemoteChangesResponse,
RemoteHeaderRequest, RemoteHeaderResponse, RemoteReadChildRequest, RemoteReadRequest, Roles,
};
use bitflags::bitflags;
use codec::{Decode, Encode, Error, Input, Output};
use sc_client_api::StorageProof;
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT},
ConsensusEngineId,
};
/// A unique ID of a request.
pub type RequestId = u64;
@@ -41,24 +42,16 @@ pub type Message<B> = generic::Message<
>;
/// Type alias for using the block request type using block type parameters.
pub type BlockRequest<B> = generic::BlockRequest<
<B as BlockT>::Hash,
<<B as BlockT>::Header as HeaderT>::Number,
>;
pub type BlockRequest<B> =
generic::BlockRequest<<B as BlockT>::Hash, <<B as BlockT>::Header as HeaderT>::Number>;
/// Type alias for using the BlockData type using block type parameters.
pub type BlockData<B> = generic::BlockData<
<B as BlockT>::Header,
<B as BlockT>::Hash,
<B as BlockT>::Extrinsic,
>;
pub type BlockData<B> =
generic::BlockData<<B as BlockT>::Header, <B as BlockT>::Hash, <B as BlockT>::Extrinsic>;
/// Type alias for using the BlockResponse type using block type parameters.
pub type BlockResponse<B> = generic::BlockResponse<
<B as BlockT>::Header,
<B as BlockT>::Hash,
<B as BlockT>::Extrinsic,
>;
pub type BlockResponse<B> =
generic::BlockResponse<<B as BlockT>::Header, <B as BlockT>::Hash, <B as BlockT>::Extrinsic>;
/// A set of transactions.
pub type Transactions<E> = Vec<E>;
@@ -168,14 +161,13 @@ impl<H: HeaderT> generic::BlockAnnounce<H> {
/// Generic types.
pub mod generic {
use bitflags::bitflags;
use codec::{Encode, Decode, Input, Output};
use sp_runtime::{EncodedJustification, Justifications};
use super::{
RemoteReadResponse, Transactions, Direction,
RequestId, BlockAttributes, RemoteCallResponse, ConsensusEngineId,
BlockState, StorageProof,
BlockAttributes, BlockState, ConsensusEngineId, Direction, RemoteCallResponse,
RemoteReadResponse, RequestId, StorageProof, Transactions,
};
use bitflags::bitflags;
use codec::{Decode, Encode, Input, Output};
use sp_runtime::{EncodedJustification, Justifications};
bitflags! {
/// Bitmask of the roles that a node fulfills.
@@ -358,11 +350,12 @@ pub mod generic {
let compact = CompactStatus::decode(value)?;
let chain_status = match <Vec<u8>>::decode(value) {
Ok(v) => v,
Err(e) => if compact.version <= LAST_CHAIN_STATUS_VERSION {
return Err(e)
} else {
Vec::new()
}
Err(e) =>
if compact.version <= LAST_CHAIN_STATUS_VERSION {
return Err(e)
} else {
Vec::new()
},
};
let CompactStatus {
@@ -443,11 +436,7 @@ pub mod generic {
let header = H::decode(input)?;
let state = BlockState::decode(input).ok();
let data = Vec::decode(input).ok();
Ok(BlockAnnounce {
header,
state,
data,
})
Ok(BlockAnnounce { header, state, data })
}
}
@@ -19,10 +19,12 @@
//! Implementation of libp2p's `NetworkBehaviour` trait that establishes communications and opens
//! notifications substreams.
pub use self::behaviour::{Notifications, NotificationsOut, ProtocolConfig};
pub use self::handler::{NotifsHandlerError, NotificationsSink, Ready};
pub use self::{
behaviour::{Notifications, NotificationsOut, ProtocolConfig},
handler::{NotificationsSink, NotifsHandlerError, Ready},
};
mod behaviour;
mod handler;
mod upgrade;
mod tests;
mod upgrade;
File diff suppressed because it is too large Load Diff
@@ -57,31 +57,39 @@
//! It is illegal to send a [`NotifsHandlerIn::Open`] before a previously-emitted
//! [`NotifsHandlerIn::Open`] has gotten an answer.
use crate::protocol::notifications::{
upgrade::{
NotificationsIn, NotificationsOut, NotificationsInSubstream, NotificationsOutSubstream,
NotificationsHandshakeError, UpgradeCollec
},
use crate::protocol::notifications::upgrade::{
NotificationsHandshakeError, NotificationsIn, NotificationsInSubstream, NotificationsOut,
NotificationsOutSubstream, UpgradeCollec,
};
use bytes::BytesMut;
use libp2p::core::{ConnectedPoint, PeerId, upgrade::{InboundUpgrade, OutboundUpgrade}};
use libp2p::swarm::{
ProtocolsHandler, ProtocolsHandlerEvent,
IntoProtocolsHandler,
KeepAlive,
ProtocolsHandlerUpgrErr,
SubstreamProtocol,
NegotiatedSubstream,
};
use futures::{
channel::mpsc,
lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
prelude::*
prelude::*,
};
use libp2p::{
core::{
upgrade::{InboundUpgrade, OutboundUpgrade},
ConnectedPoint, PeerId,
},
swarm::{
IntoProtocolsHandler, KeepAlive, NegotiatedSubstream, ProtocolsHandler,
ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
},
};
use log::error;
use parking_lot::{Mutex, RwLock};
use std::{borrow::Cow, collections::VecDeque, mem, pin::Pin, str, sync::Arc, task::{Context, Poll}, time::Duration};
use std::{
borrow::Cow,
collections::VecDeque,
mem,
pin::Pin,
str,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use wasm_timer::Instant;
/// Number of pending notifications in asynchronous contexts.
@@ -131,7 +139,7 @@ pub struct NotifsHandler {
/// Events to return in priority from `poll`.
events_queue: VecDeque<
ProtocolsHandlerEvent<NotificationsOut, usize, NotifsHandlerOut, NotifsHandlerError>
ProtocolsHandlerEvent<NotificationsOut, usize, NotifsHandlerOut, NotifsHandlerError>,
>,
}
@@ -195,10 +203,12 @@ enum State {
/// We use two different channels in order to have two different channel sizes, but from
/// the receiving point of view, the two channels are the same.
/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
notifications_sink_rx: stream::Peekable<stream::Select<
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>
>>,
notifications_sink_rx: stream::Peekable<
stream::Select<
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
>,
>,
/// Outbound substream that has been accepted by the remote.
///
@@ -220,28 +230,33 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
type Handler = NotifsHandler;
fn inbound_protocol(&self) -> UpgradeCollec<NotificationsIn> {
self.protocols.iter()
.map(|cfg| NotificationsIn::new(cfg.name.clone(), cfg.fallback_names.clone(), cfg.max_notification_size))
self.protocols
.iter()
.map(|cfg| {
NotificationsIn::new(
cfg.name.clone(),
cfg.fallback_names.clone(),
cfg.max_notification_size,
)
})
.collect::<UpgradeCollec<_>>()
}
fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
NotifsHandler {
protocols: self.protocols.into_iter().map(|config| {
let in_upgrade = NotificationsIn::new(
config.name.clone(),
config.fallback_names.clone(),
config.max_notification_size
);
protocols: self
.protocols
.into_iter()
.map(|config| {
let in_upgrade = NotificationsIn::new(
config.name.clone(),
config.fallback_names.clone(),
config.max_notification_size,
);
Protocol {
config,
in_upgrade,
state: State::Closed {
pending_opening: false,
},
}
}).collect(),
Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } }
})
.collect(),
peer_id: peer_id.clone(),
endpoint: connected_point.clone(),
when_connection_open: Instant::now(),
@@ -363,9 +378,7 @@ struct NotificationsSinkInner {
enum NotificationsSinkMessage {
/// Message emitted by [`NotificationsSink::reserve_notification`] and
/// [`NotificationsSink::write_notification_now`].
Notification {
message: Vec<u8>,
},
Notification { message: Vec<u8> },
/// Must close the connection.
ForceClose,
@@ -386,14 +399,10 @@ impl NotificationsSink {
/// error to send a notification using an unknown protocol.
///
/// This method will be removed in a future version.
pub fn send_sync_notification<'a>(
&'a self,
message: impl Into<Vec<u8>>
) {
pub fn send_sync_notification<'a>(&'a self, message: impl Into<Vec<u8>>) {
let mut lock = self.inner.sync_channel.lock();
let result = lock.try_send(NotificationsSinkMessage::Notification {
message: message.into()
});
let result =
lock.try_send(NotificationsSinkMessage::Notification { message: message.into() });
if result.is_err() {
// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
@@ -433,13 +442,10 @@ impl<'a> Ready<'a> {
/// Consumes this slots reservation and actually queues the notification.
///
/// Returns an error if the substream has been closed.
pub fn send(
mut self,
notification: impl Into<Vec<u8>>
) -> Result<(), ()> {
self.lock.start_send(NotificationsSinkMessage::Notification {
message: notification.into(),
}).map_err(|_| ())
pub fn send(mut self, notification: impl Into<Vec<u8>>) -> Result<(), ()> {
self.lock
.start_send(NotificationsSinkMessage::Notification { message: notification.into() })
.map_err(|_| ())
}
}
@@ -457,12 +463,8 @@ impl NotifsHandlerProto {
/// handshake, and the maximum allowed size of a notification. At the moment, the message
/// is always the same whether we open a substream ourselves or respond to handshake from
/// the remote.
pub fn new(
list: impl Into<Vec<ProtocolConfig>>,
) -> Self {
NotifsHandlerProto {
protocols: list.into(),
}
pub fn new(list: impl Into<Vec<ProtocolConfig>>) -> Self {
NotifsHandlerProto { protocols: list.into() }
}
}
@@ -477,7 +479,9 @@ impl ProtocolsHandler for NotifsHandler {
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
let protocols = self.protocols.iter()
let protocols = self
.protocols
.iter()
.map(|p| p.in_upgrade.clone())
.collect::<UpgradeCollec<_>>();
@@ -486,17 +490,16 @@ impl ProtocolsHandler for NotifsHandler {
fn inject_fully_negotiated_inbound(
&mut self,
(mut in_substream_open, protocol_index):
<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
(mut in_substream_open, protocol_index): <Self::InboundProtocol as InboundUpgrade<
NegotiatedSubstream,
>>::Output,
(): (),
) {
let mut protocol_info = &mut self.protocols[protocol_index];
match protocol_info.state {
State::Closed { pending_opening } => {
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index,
}
NotifsHandlerOut::OpenDesiredByRemote { protocol_index },
));
protocol_info.state = State::OpenDesiredByRemote {
@@ -512,13 +515,13 @@ impl ProtocolsHandler for NotifsHandler {
// in mind that it is invalid for the remote to open multiple such
// substreams, and therefore sending a "RST" is the most correct thing
// to do.
return;
return
},
State::Opening { ref mut in_substream, .. } |
State::Open { ref mut in_substream, .. } => {
if in_substream.is_some() {
// Same remark as above.
return;
return
}
// Create `handshake_message` on a separate line to be sure that the
@@ -533,18 +536,18 @@ impl ProtocolsHandler for NotifsHandler {
fn inject_fully_negotiated_outbound(
&mut self,
new_open: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
protocol_index: Self::OutboundOpenInfo
protocol_index: Self::OutboundOpenInfo,
) {
match self.protocols[protocol_index].state {
State::Closed { ref mut pending_opening } |
State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
debug_assert!(*pending_opening);
*pending_opening = false;
}
},
State::Open { .. } => {
error!(target: "sub-libp2p", "☎️ State mismatch in notifications handler");
debug_assert!(false);
}
},
State::Opening { ref mut in_substream } => {
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
@@ -557,7 +560,8 @@ impl ProtocolsHandler for NotifsHandler {
};
self.protocols[protocol_index].state = State::Open {
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse())
.peekable(),
out_substream: Some(new_open.substream),
in_substream: in_substream.take(),
};
@@ -568,10 +572,10 @@ impl ProtocolsHandler for NotifsHandler {
negotiated_fallback: new_open.negotiated_fallback,
endpoint: self.endpoint.clone(),
received_handshake: new_open.handshake,
notifications_sink
}
notifications_sink,
},
));
}
},
}
}
@@ -586,18 +590,18 @@ impl ProtocolsHandler for NotifsHandler {
protocol_info.config.name.clone(),
protocol_info.config.fallback_names.clone(),
protocol_info.config.handshake.read().clone(),
protocol_info.config.max_notification_size
protocol_info.config.max_notification_size,
);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, protocol_index)
.with_timeout(OPEN_TIMEOUT),
});
self.events_queue.push_back(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, protocol_index)
.with_timeout(OPEN_TIMEOUT),
},
);
}
protocol_info.state = State::Opening {
in_substream: None,
};
protocol_info.state = State::Opening { in_substream: None };
},
State::OpenDesiredByRemote { pending_opening, in_substream } => {
let handshake_message = protocol_info.config.handshake.read().clone();
@@ -610,27 +614,27 @@ impl ProtocolsHandler for NotifsHandler {
protocol_info.config.max_notification_size,
);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, protocol_index)
.with_timeout(OPEN_TIMEOUT),
});
self.events_queue.push_back(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto, protocol_index)
.with_timeout(OPEN_TIMEOUT),
},
);
}
in_substream.send_handshake(handshake_message);
// The state change is done in two steps because of borrowing issues.
let in_substream = match
mem::replace(&mut protocol_info.state, State::Opening { in_substream: None })
{
let in_substream = match mem::replace(
&mut protocol_info.state,
State::Opening { in_substream: None },
) {
State::OpenDesiredByRemote { in_substream, .. } => in_substream,
_ => unreachable!()
};
protocol_info.state = State::Opening {
in_substream: Some(in_substream),
_ => unreachable!(),
};
protocol_info.state = State::Opening { in_substream: Some(in_substream) };
},
State::Opening { .. } |
State::Open { .. } => {
State::Opening { .. } | State::Open { .. } => {
// As documented, it is forbidden to send an `Open` while there is already
// one in the fly.
error!(target: "sub-libp2p", "opening already-opened handler");
@@ -642,34 +646,26 @@ impl ProtocolsHandler for NotifsHandler {
NotifsHandlerIn::Close { protocol_index } => {
match self.protocols[protocol_index].state {
State::Open { .. } => {
self.protocols[protocol_index].state = State::Closed {
pending_opening: false,
};
self.protocols[protocol_index].state =
State::Closed { pending_opening: false };
},
State::Opening { .. } => {
self.protocols[protocol_index].state = State::Closed {
pending_opening: true,
};
self.protocols[protocol_index].state =
State::Closed { pending_opening: true };
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenResultErr {
protocol_index,
}
NotifsHandlerOut::OpenResultErr { protocol_index },
));
},
State::OpenDesiredByRemote { pending_opening, .. } => {
self.protocols[protocol_index].state = State::Closed {
pending_opening,
};
}
self.protocols[protocol_index].state = State::Closed { pending_opening };
},
State::Closed { .. } => {},
}
self.events_queue.push_back(
ProtocolsHandlerEvent::Custom(NotifsHandlerOut::CloseResult {
protocol_index,
})
);
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CloseResult { protocol_index },
));
},
}
}
@@ -677,26 +673,22 @@ impl ProtocolsHandler for NotifsHandler {
fn inject_dial_upgrade_error(
&mut self,
num: usize,
_: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>
_: ProtocolsHandlerUpgrErr<NotificationsHandshakeError>,
) {
match self.protocols[num].state {
State::Closed { ref mut pending_opening } |
State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
debug_assert!(*pending_opening);
*pending_opening = false;
}
},
State::Opening { .. } => {
self.protocols[num].state = State::Closed {
pending_opening: false,
};
self.protocols[num].state = State::Closed { pending_opening: false };
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenResultErr {
protocol_index: num,
}
NotifsHandlerOut::OpenResultErr { protocol_index: num },
));
}
},
// No substream is being open when already `Open`.
State::Open { .. } => debug_assert!(false),
@@ -706,7 +698,7 @@ impl ProtocolsHandler for NotifsHandler {
fn connection_keep_alive(&self) -> KeepAlive {
// `Yes` if any protocol has some activity.
if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
return KeepAlive::Yes;
return KeepAlive::Yes
}
// A grace period of `INITIAL_KEEPALIVE_TIME` must be given to leave time for the remote
@@ -718,28 +710,33 @@ impl ProtocolsHandler for NotifsHandler {
&mut self,
cx: &mut Context,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
if let Some(ev) = self.events_queue.pop_front() {
return Poll::Ready(ev);
return Poll::Ready(ev)
}
// For each open substream, try send messages from `notifications_sink_rx` to the
// substream.
for protocol_index in 0..self.protocols.len() {
if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
= &mut self.protocols[protocol_index].state
if let State::Open {
notifications_sink_rx, out_substream: Some(out_substream), ..
} = &mut self.protocols[protocol_index].state
{
loop {
// Only proceed with `out_substream.poll_ready_unpin` if there is an element
// available in `notifications_sink_rx`. This avoids waking up the task when
// a substream is ready to send if there isn't actually something to send.
match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => {
return Poll::Ready(
ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)
);
},
Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) =>
return Poll::Ready(ProtocolsHandlerEvent::Close(
NotifsHandlerError::SyncNotificationsClogged,
)),
Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
Poll::Ready(None) | Poll::Pending => break,
}
@@ -748,19 +745,20 @@ impl ProtocolsHandler for NotifsHandler {
// substream is ready to accept a message.
match out_substream.poll_ready_unpin(cx) {
Poll::Ready(_) => {},
Poll::Pending => break
Poll::Pending => break,
}
// Now that the substream is ready for a message, grab what to send.
let message = match notifications_sink_rx.poll_next_unpin(cx) {
Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => message,
Poll::Ready(Some(NotificationsSinkMessage::ForceClose))
| Poll::Ready(None)
| Poll::Pending => {
Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) =>
message,
Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) |
Poll::Ready(None) |
Poll::Pending => {
// Should never be reached, as per `poll_peek` above.
debug_assert!(false);
break;
}
break
},
};
let _ = out_substream.start_send_unpin(message);
@@ -784,15 +782,15 @@ impl ProtocolsHandler for NotifsHandler {
Poll::Ready(Err(_)) => {
*out_substream = None;
let event = NotifsHandlerOut::CloseDesired { protocol_index };
return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
}
return Poll::Ready(ProtocolsHandlerEvent::Custom(event))
},
};
}
},
State::Closed { .. } |
State::Opening { .. } |
State::Open { out_substream: None, .. } |
State::OpenDesiredByRemote { .. } => {}
State::OpenDesiredByRemote { .. } => {},
}
}
@@ -803,45 +801,40 @@ impl ProtocolsHandler for NotifsHandler {
match &mut self.protocols[protocol_index].state {
State::Closed { .. } |
State::Open { in_substream: None, .. } |
State::Opening { in_substream: None } => {}
State::Opening { in_substream: None } => {},
State::Open { in_substream: in_substream @ Some(_), .. } => {
State::Open { in_substream: in_substream @ Some(_), .. } =>
match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) {
Poll::Pending => {},
Poll::Ready(Some(Ok(message))) => {
let event = NotifsHandlerOut::Notification {
protocol_index,
message,
};
let event = NotifsHandlerOut::Notification { protocol_index, message };
return Poll::Ready(ProtocolsHandlerEvent::Custom(event))
},
Poll::Ready(None) | Poll::Ready(Some(Err(_))) =>
*in_substream = None,
}
}
Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None,
},
State::OpenDesiredByRemote { in_substream, pending_opening } => {
State::OpenDesiredByRemote { in_substream, pending_opening } =>
match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
Poll::Pending => {},
Poll::Ready(Ok(void)) => match void {},
Poll::Ready(Err(_)) => {
self.protocols[protocol_index].state = State::Closed {
pending_opening: *pending_opening,
};
self.protocols[protocol_index].state =
State::Closed { pending_opening: *pending_opening };
return Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::CloseDesired { protocol_index }
NotifsHandlerOut::CloseDesired { protocol_index },
))
},
}
}
},
State::Opening { in_substream: in_substream @ Some(_), .. } => {
match NotificationsInSubstream::poll_process(Pin::new(in_substream.as_mut().unwrap()), cx) {
State::Opening { in_substream: in_substream @ Some(_), .. } =>
match NotificationsInSubstream::poll_process(
Pin::new(in_substream.as_mut().unwrap()),
cx,
) {
Poll::Pending => {},
Poll::Ready(Ok(void)) => match void {},
Poll::Ready(Err(_)) => *in_substream = None,
}
}
},
}
}
@@ -21,19 +21,24 @@
use crate::protocol::notifications::{Notifications, NotificationsOut, ProtocolConfig};
use futures::prelude::*;
use libp2p::{PeerId, Multiaddr, Transport};
use libp2p::core::{
connection::{ConnectionId, ListenerId},
ConnectedPoint,
transport::MemoryTransport,
upgrade
use libp2p::{
core::{
connection::{ConnectionId, ListenerId},
transport::MemoryTransport,
upgrade, ConnectedPoint,
},
identity, noise,
swarm::{
IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
ProtocolsHandler, Swarm,
},
yamux, Multiaddr, PeerId, Transport,
};
use libp2p::{identity, noise, yamux};
use libp2p::swarm::{
Swarm, ProtocolsHandler, IntoProtocolsHandler, PollParameters,
NetworkBehaviour, NetworkBehaviourAction
use std::{
error, io, iter,
task::{Context, Poll},
time::Duration,
};
use std::{error, io, iter, task::{Context, Poll}, time::Duration};
/// Builds two nodes that have each other as bootstrap nodes.
/// This is to be used only for testing, and a panic will happen if something goes wrong.
@@ -45,12 +50,11 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
.map(|_| format!("/memory/{}", rand::random::<u64>()).parse().unwrap())
.collect();
for index in 0 .. 2 {
for index in 0..2 {
let keypair = keypairs[index].clone();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&keypair)
.unwrap();
let noise_keys =
noise::Keypair::<noise::X25519Spec>::new().into_authentic(&keypair).unwrap();
let transport = MemoryTransport
.upgrade(upgrade::Version::V1)
@@ -60,48 +64,43 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
.boxed();
let (peerset, _) = sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig {
sets: vec![
sc_peerset::SetConfig {
in_peers: 25,
out_peers: 25,
bootnodes: if index == 0 {
keypairs
.iter()
.skip(1)
.map(|keypair| keypair.public().into_peer_id())
.collect()
} else {
vec![]
},
reserved_nodes: Default::default(),
reserved_only: false,
}
],
sets: vec![sc_peerset::SetConfig {
in_peers: 25,
out_peers: 25,
bootnodes: if index == 0 {
keypairs.iter().skip(1).map(|keypair| keypair.public().into_peer_id()).collect()
} else {
vec![]
},
reserved_nodes: Default::default(),
reserved_only: false,
}],
});
let behaviour = CustomProtoWithAddr {
inner: Notifications::new(peerset, iter::once(ProtocolConfig {
name: "/foo".into(),
fallback_names: Vec::new(),
handshake: Vec::new(),
max_notification_size: 1024 * 1024
})),
inner: Notifications::new(
peerset,
iter::once(ProtocolConfig {
name: "/foo".into(),
fallback_names: Vec::new(),
handshake: Vec::new(),
max_notification_size: 1024 * 1024,
}),
),
addrs: addrs
.iter()
.enumerate()
.filter_map(|(n, a)| if n != index {
Some((keypairs[n].public().into_peer_id(), a.clone()))
} else {
None
.filter_map(|(n, a)| {
if n != index {
Some((keypairs[n].public().into_peer_id(), a.clone()))
} else {
None
}
})
.collect(),
};
let mut swarm = Swarm::new(
transport,
behaviour,
keypairs[index].public().into_peer_id()
);
let mut swarm = Swarm::new(transport, behaviour, keypairs[index].public().into_peer_id());
swarm.listen_on(addrs[index].clone()).unwrap();
out.push(swarm);
}
@@ -159,11 +158,21 @@ impl NetworkBehaviour for CustomProtoWithAddr {
self.inner.inject_disconnected(peer_id)
}
fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
) {
self.inner.inject_connection_established(peer_id, conn, endpoint)
}
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
conn: &ConnectionId,
endpoint: &ConnectedPoint,
) {
self.inner.inject_connection_closed(peer_id, conn, endpoint)
}
@@ -171,7 +180,7 @@ impl NetworkBehaviour for CustomProtoWithAddr {
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
) {
self.inner.inject_event(peer_id, connection, event)
}
@@ -185,11 +194,16 @@ impl NetworkBehaviour for CustomProtoWithAddr {
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent
>
> {
>{
self.inner.poll(cx, params)
}
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) {
fn inject_addr_reach_failure(
&mut self,
peer_id: Option<&PeerId>,
addr: &Multiaddr,
error: &dyn std::error::Error,
) {
self.inner.inject_addr_reach_failure(peer_id, addr, error)
}
@@ -235,7 +249,12 @@ fn reconnect_after_disconnect() {
// For this test, the services can be in the following states.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ServiceState { NotConnected, FirstConnec, Disconnected, ConnectedAgain }
enum ServiceState {
NotConnected,
FirstConnec,
Disconnected,
ConnectedAgain,
}
let mut service1_state = ServiceState::NotConnected;
let mut service2_state = ServiceState::NotConnected;
@@ -253,55 +272,55 @@ fn reconnect_after_disconnect() {
};
match event {
future::Either::Left(NotificationsOut::CustomProtocolOpen { .. }) => {
future::Either::Left(NotificationsOut::CustomProtocolOpen { .. }) =>
match service1_state {
ServiceState::NotConnected => {
service1_state = ServiceState::FirstConnec;
if service2_state == ServiceState::FirstConnec {
service1.behaviour_mut().disconnect_peer(
Swarm::local_peer_id(&service2),
sc_peerset::SetId::from(0)
sc_peerset::SetId::from(0),
);
}
},
ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain,
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
}
},
future::Either::Left(NotificationsOut::CustomProtocolClosed { .. }) => {
},
future::Either::Left(NotificationsOut::CustomProtocolClosed { .. }) =>
match service1_state {
ServiceState::FirstConnec => service1_state = ServiceState::Disconnected,
ServiceState::ConnectedAgain| ServiceState::NotConnected |
ServiceState::ConnectedAgain |
ServiceState::NotConnected |
ServiceState::Disconnected => panic!(),
}
},
future::Either::Right(NotificationsOut::CustomProtocolOpen { .. }) => {
},
future::Either::Right(NotificationsOut::CustomProtocolOpen { .. }) =>
match service2_state {
ServiceState::NotConnected => {
service2_state = ServiceState::FirstConnec;
if service1_state == ServiceState::FirstConnec {
service1.behaviour_mut().disconnect_peer(
Swarm::local_peer_id(&service2),
sc_peerset::SetId::from(0)
sc_peerset::SetId::from(0),
);
}
},
ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain,
ServiceState::FirstConnec | ServiceState::ConnectedAgain => panic!(),
}
},
future::Either::Right(NotificationsOut::CustomProtocolClosed { .. }) => {
},
future::Either::Right(NotificationsOut::CustomProtocolClosed { .. }) =>
match service2_state {
ServiceState::FirstConnec => service2_state = ServiceState::Disconnected,
ServiceState::ConnectedAgain| ServiceState::NotConnected |
ServiceState::ConnectedAgain |
ServiceState::NotConnected |
ServiceState::Disconnected => panic!(),
}
},
_ => {}
},
_ => {},
}
if service1_state == ServiceState::ConnectedAgain && service2_state == ServiceState::ConnectedAgain {
break;
if service1_state == ServiceState::ConnectedAgain &&
service2_state == ServiceState::ConnectedAgain
{
break
}
}
@@ -316,7 +335,7 @@ fn reconnect_after_disconnect() {
let s2 = service2.next();
futures::pin_mut!(s1, s2);
match future::select(future::select(s1, s2), &mut delay).await {
future::Either::Right(_) => break, // success
future::Either::Right(_) => break, // success
future::Either::Left((future::Either::Left((ev, _)), _)) => ev,
future::Either::Left((future::Either::Right((ev, _)), _)) => ev,
}
@@ -325,7 +344,7 @@ fn reconnect_after_disconnect() {
match event {
NotificationsOut::CustomProtocolOpen { .. } |
NotificationsOut::CustomProtocolClosed { .. } => panic!(),
_ => {}
_ => {},
}
}
});
@@ -16,16 +16,13 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
pub use self::collec::UpgradeCollec;
pub use self::notifications::{
NotificationsIn,
NotificationsInOpen,
NotificationsInSubstream,
NotificationsOut,
NotificationsOutOpen,
NotificationsOutSubstream,
NotificationsHandshakeError,
NotificationsOutError,
pub use self::{
collec::UpgradeCollec,
notifications::{
NotificationsHandshakeError, NotificationsIn, NotificationsInOpen,
NotificationsInSubstream, NotificationsOut, NotificationsOutError, NotificationsOutOpen,
NotificationsOutSubstream,
},
};
mod collec;
@@ -18,7 +18,12 @@
use futures::prelude::*;
use libp2p::core::upgrade::{InboundUpgrade, ProtocolName, UpgradeInfo};
use std::{iter::FromIterator, pin::Pin, task::{Context, Poll}, vec};
use std::{
iter::FromIterator,
pin::Pin,
task::{Context, Poll},
vec,
};
// TODO: move this to libp2p => https://github.com/libp2p/rust-libp2p/issues/1445
@@ -44,9 +49,10 @@ impl<T: UpgradeInfo> UpgradeInfo for UpgradeCollec<T> {
type InfoIter = vec::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
self.0.iter().enumerate()
.flat_map(|(n, p)|
p.protocol_info().into_iter().map(move |i| ProtoNameWithUsize(i, n)))
self.0
.iter()
.enumerate()
.flat_map(|(n, p)| p.protocol_info().into_iter().map(move |i| ProtoNameWithUsize(i, n)))
.collect::<Vec<_>>()
.into_iter()
}
@@ -16,6 +16,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use asynchronous_codec::Framed;
/// Notifications protocol.
///
/// The Substrate notifications protocol consists in the following:
@@ -34,14 +35,18 @@
///
/// Notification substreams are unidirectional. If A opens a substream with B, then B is
/// encouraged but not required to open a substream to A as well.
///
use bytes::BytesMut;
use futures::prelude::*;
use asynchronous_codec::Framed;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use log::error;
use std::{borrow::Cow, convert::{Infallible, TryFrom as _}, io, mem, pin::Pin, task::{Context, Poll}, vec};
use std::{
borrow::Cow,
convert::{Infallible, TryFrom as _},
io, mem,
pin::Pin,
task::{Context, Poll},
vec,
};
use unsigned_varint::codec::UviBytes;
/// Maximum allowed size of the two handshake messages, in bytes.
@@ -111,15 +116,12 @@ impl NotificationsIn {
pub fn new(
main_protocol_name: impl Into<Cow<'static, str>>,
fallback_names: Vec<Cow<'static, str>>,
max_notification_size: u64
max_notification_size: u64,
) -> Self {
let mut protocol_names = fallback_names;
protocol_names.insert(0, main_protocol_name.into());
NotificationsIn {
protocol_names,
max_notification_size,
}
NotificationsIn { protocol_names, max_notification_size }
}
}
@@ -128,29 +130,31 @@ impl UpgradeInfo for NotificationsIn {
type InfoIter = vec::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
self.protocol_names.iter().cloned().map(StringProtocolName).collect::<Vec<_>>().into_iter()
self.protocol_names
.iter()
.cloned()
.map(StringProtocolName)
.collect::<Vec<_>>()
.into_iter()
}
}
impl<TSubstream> InboundUpgrade<TSubstream> for NotificationsIn
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = NotificationsInOpen<TSubstream>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Error = NotificationsHandshakeError;
fn upgrade_inbound(
self,
mut socket: TSubstream,
negotiated_name: Self::Info,
) -> Self::Future {
fn upgrade_inbound(self, mut socket: TSubstream, negotiated_name: Self::Info) -> Self::Future {
Box::pin(async move {
let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?;
if handshake_len > MAX_HANDSHAKE_SIZE {
return Err(NotificationsHandshakeError::TooLarge {
requested: handshake_len,
max: MAX_HANDSHAKE_SIZE,
});
})
}
let mut handshake = vec![0u8; handshake_len];
@@ -191,13 +195,14 @@ pub struct NotificationsInOpen<TSubstream> {
}
impl<TSubstream> NotificationsInSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin,
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
/// Sends the handshake in order to inform the remote that we accept the substream.
pub fn send_handshake(&mut self, message: impl Into<Vec<u8>>) {
if !matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) {
error!(target: "sub-libp2p", "Tried to send handshake twice");
return;
return
}
self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into());
@@ -205,7 +210,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
/// Equivalent to `Stream::poll_next`, except that it only drives the handshake and is
/// guaranteed to not generate any notification.
pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<Infallible, io::Error>> {
pub fn poll_process(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<Infallible, io::Error>> {
let mut this = self.project();
loop {
@@ -222,7 +230,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg);
return Poll::Pending
}
},
},
NotificationsInSubstreamHandshake::Flush =>
match Sink::poll_flush(this.socket.as_mut(), cx)? {
@@ -231,7 +239,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Flush;
return Poll::Pending
}
},
},
st @ NotificationsInSubstreamHandshake::NotSent |
@@ -239,15 +247,16 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote |
st @ NotificationsInSubstreamHandshake::BothSidesClosed => {
*this.handshake = st;
return Poll::Pending;
}
return Poll::Pending
},
}
}
}
}
impl<TSubstream> Stream for NotificationsInSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin,
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
type Item = Result<BytesMut, io::Error>;
@@ -273,7 +282,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg);
return Poll::Pending
}
},
},
NotificationsInSubstreamHandshake::Flush =>
match Sink::poll_flush(this.socket.as_mut(), cx)? {
@@ -282,13 +291,14 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::Flush;
return Poll::Pending
}
},
},
NotificationsInSubstreamHandshake::Sent => {
match Stream::poll_next(this.socket.as_mut(), cx) {
Poll::Ready(None) => *this.handshake =
NotificationsInSubstreamHandshake::ClosingInResponseToRemote,
Poll::Ready(None) =>
*this.handshake =
NotificationsInSubstreamHandshake::ClosingInResponseToRemote,
Poll::Ready(Some(msg)) => {
*this.handshake = NotificationsInSubstreamHandshake::Sent;
return Poll::Ready(Some(msg))
@@ -305,13 +315,13 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
Poll::Ready(()) =>
*this.handshake = NotificationsInSubstreamHandshake::BothSidesClosed,
Poll::Pending => {
*this.handshake = NotificationsInSubstreamHandshake::ClosingInResponseToRemote;
*this.handshake =
NotificationsInSubstreamHandshake::ClosingInResponseToRemote;
return Poll::Pending
}
},
},
NotificationsInSubstreamHandshake::BothSidesClosed =>
return Poll::Ready(None),
NotificationsInSubstreamHandshake::BothSidesClosed => return Poll::Ready(None),
}
}
}
@@ -333,11 +343,7 @@ impl NotificationsOut {
let mut protocol_names = fallback_names;
protocol_names.insert(0, main_protocol_name.into());
NotificationsOut {
protocol_names,
initial_message,
max_notification_size,
}
NotificationsOut { protocol_names, initial_message, max_notification_size }
}
}
@@ -356,22 +362,24 @@ impl UpgradeInfo for NotificationsOut {
type InfoIter = vec::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
self.protocol_names.iter().cloned().map(StringProtocolName).collect::<Vec<_>>().into_iter()
self.protocol_names
.iter()
.cloned()
.map(StringProtocolName)
.collect::<Vec<_>>()
.into_iter()
}
}
impl<TSubstream> OutboundUpgrade<TSubstream> for NotificationsOut
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = NotificationsOutOpen<TSubstream>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Error = NotificationsHandshakeError;
fn upgrade_outbound(
self,
mut socket: TSubstream,
negotiated_name: Self::Info,
) -> Self::Future {
fn upgrade_outbound(self, mut socket: TSubstream, negotiated_name: Self::Info) -> Self::Future {
Box::pin(async move {
upgrade::write_with_len_prefix(&mut socket, &self.initial_message).await?;
@@ -381,7 +389,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
return Err(NotificationsHandshakeError::TooLarge {
requested: handshake_len,
max: MAX_HANDSHAKE_SIZE,
});
})
}
let mut handshake = vec![0u8; handshake_len];
@@ -399,9 +407,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
} else {
Some(negotiated_name.0)
},
substream: NotificationsOutSubstream {
socket: Framed::new(socket, codec),
}
substream: NotificationsOutSubstream { socket: Framed::new(socket, codec) },
})
})
}
@@ -419,14 +425,14 @@ pub struct NotificationsOutOpen<TSubstream> {
}
impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin,
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
type Error = NotificationsOutError;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Sink::poll_ready(this.socket.as_mut(), cx)
.map_err(NotificationsOutError::Io)
Sink::poll_ready(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
}
fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
@@ -437,14 +443,12 @@ impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream>
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Sink::poll_flush(this.socket.as_mut(), cx)
.map_err(NotificationsOutError::Io)
Sink::poll_flush(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Sink::poll_close(this.socket.as_mut(), cx)
.map_err(NotificationsOutError::Io)
Sink::poll_close(this.socket.as_mut(), cx).map_err(NotificationsOutError::Io)
}
}
@@ -471,11 +475,12 @@ impl From<unsigned_varint::io::ReadError> for NotificationsHandshakeError {
fn from(err: unsigned_varint::io::ReadError) -> Self {
match err {
unsigned_varint::io::ReadError::Io(err) => NotificationsHandshakeError::Io(err),
unsigned_varint::io::ReadError::Decode(err) => NotificationsHandshakeError::VarintDecode(err),
unsigned_varint::io::ReadError::Decode(err) =>
NotificationsHandshakeError::VarintDecode(err),
_ => {
log::warn!("Unrecognized varint decoding error");
NotificationsHandshakeError::Io(From::from(io::ErrorKind::InvalidData))
}
},
}
}
}
@@ -492,7 +497,7 @@ mod tests {
use super::{NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutOpen};
use async_std::net::{TcpListener, TcpStream};
use futures::{prelude::*, channel::oneshot};
use futures::{channel::oneshot, prelude::*};
use libp2p::core::upgrade;
use std::borrow::Cow;
@@ -506,8 +511,10 @@ mod tests {
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1
).await.unwrap();
upgrade::Version::V1,
)
.await
.unwrap();
assert_eq!(handshake, b"hello world");
substream.send(b"test message".to_vec()).await.unwrap();
@@ -520,8 +527,10 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024)
).await.unwrap();
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
assert_eq!(handshake, b"initial message");
substream.send_handshake(&b"hello world"[..]);
@@ -545,8 +554,10 @@ mod tests {
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, Vec::new(), vec![], 1024 * 1024),
upgrade::Version::V1
).await.unwrap();
upgrade::Version::V1,
)
.await
.unwrap();
assert!(handshake.is_empty());
substream.send(Default::default()).await.unwrap();
@@ -559,8 +570,10 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024)
).await.unwrap();
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
assert!(handshake.is_empty());
substream.send_handshake(vec![]);
@@ -582,8 +595,9 @@ mod tests {
let outcome = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"hello"[..], 1024 * 1024),
upgrade::Version::V1
).await;
upgrade::Version::V1,
)
.await;
// Despite the protocol negotiation being successfully conducted on the listener
// side, we have to receive an error here because the listener didn't send the
@@ -598,8 +612,10 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, substream, .. } = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024)
).await.unwrap();
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
assert_eq!(handshake, b"hello");
@@ -620,9 +636,15 @@ mod tests {
let ret = upgrade::apply_outbound(
socket,
// We check that an initial message that is too large gets refused.
NotificationsOut::new(PROTO_NAME, Vec::new(), (0..32768).map(|_| 0).collect::<Vec<_>>(), 1024 * 1024),
upgrade::Version::V1
).await;
NotificationsOut::new(
PROTO_NAME,
Vec::new(),
(0..32768).map(|_| 0).collect::<Vec<_>>(),
1024 * 1024,
),
upgrade::Version::V1,
)
.await;
assert!(ret.is_err());
});
@@ -633,8 +655,9 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let ret = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024)
).await;
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await;
assert!(ret.is_err());
});
@@ -651,8 +674,9 @@ mod tests {
let ret = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1
).await;
upgrade::Version::V1,
)
.await;
assert!(ret.is_err());
});
@@ -663,8 +687,10 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024)
).await.unwrap();
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024),
)
.await
.unwrap();
assert_eq!(handshake, b"initial message");
// We check that a handshake that is too large gets refused.
File diff suppressed because it is too large Load Diff
@@ -16,13 +16,15 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::cmp;
use std::ops::Range;
use std::collections::{HashMap, BTreeMap};
use log::trace;
use libp2p::PeerId;
use sp_runtime::traits::{Block as BlockT, NumberFor, One};
use crate::protocol::message;
use libp2p::PeerId;
use log::trace;
use sp_runtime::traits::{Block as BlockT, NumberFor, One};
use std::{
cmp,
collections::{BTreeMap, HashMap},
ops::Range,
};
/// Block data with origin.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -35,10 +37,7 @@ pub struct BlockData<B: BlockT> {
#[derive(Debug)]
enum BlockRangeState<B: BlockT> {
Downloading {
len: NumberFor<B>,
downloading: u32,
},
Downloading { len: NumberFor<B>, downloading: u32 },
Complete(Vec<BlockData<B>>),
}
@@ -62,10 +61,7 @@ pub struct BlockCollection<B: BlockT> {
impl<B: BlockT> BlockCollection<B> {
/// Create a new instance.
pub fn new() -> Self {
BlockCollection {
blocks: BTreeMap::new(),
peer_requests: HashMap::new(),
}
BlockCollection { blocks: BTreeMap::new(), peer_requests: HashMap::new() }
}
/// Clear everything.
@@ -77,7 +73,7 @@ impl<B: BlockT> BlockCollection<B> {
/// Insert a set of blocks into collection.
pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: PeerId) {
if blocks.is_empty() {
return;
return
}
match self.blocks.get(&start) {
@@ -86,13 +82,20 @@ impl<B: BlockT> BlockCollection<B> {
},
Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => {
trace!(target: "sync", "Ignored block data already downloaded: {}", start);
return;
return
},
_ => (),
}
self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter()
.map(|b| BlockData { origin: Some(who.clone()), block: b }).collect()));
self.blocks.insert(
start,
BlockRangeState::Complete(
blocks
.into_iter()
.map(|b| BlockData { origin: Some(who.clone()), block: b })
.collect(),
),
);
}
/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
@@ -107,7 +110,7 @@ impl<B: BlockT> BlockCollection<B> {
) -> Option<Range<NumberFor<B>>> {
if peer_best <= common {
// Bail out early
return None;
return None
}
// First block number that we need to download
let first_different = common + <NumberFor<B>>::one();
@@ -120,15 +123,13 @@ impl<B: BlockT> BlockCollection<B> {
break match (prev, next) {
(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
if downloading < max_parallel =>
(*start .. *start + *len, downloading),
(*start..*start + *len, downloading),
(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start =>
(*start + r.len() .. cmp::min(*next_start, *start + r.len() + count), 0), // gap
(Some((start, r)), None) =>
(*start + r.len() .. *start + r.len() + count, 0), // last range
(None, None) =>
(first_different .. first_different + count, 0), // empty
(*start + r.len()..cmp::min(*next_start, *start + r.len() + count), 0), // gap
(Some((start, r)), None) => (*start + r.len()..*start + r.len() + count, 0), /* last range */
(None, None) => (first_different..first_different + count, 0), /* empty */
(None, Some((start, _))) if *start > first_different =>
(first_different .. cmp::min(first_different + count, *start), 0), // gap at the start
(first_different..cmp::min(first_different + count, *start), 0), /* gap at the start */
_ => {
prev = next;
continue
@@ -139,23 +140,33 @@ impl<B: BlockT> BlockCollection<B> {
// crop to peers best
if range.start > peer_best {
trace!(target: "sync", "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
return None;
return None
}
range.end = cmp::min(peer_best + One::one(), range.end);
if self.blocks.iter().next().map_or(false, |(n, _)| range.start > *n + max_ahead.into()) {
if self
.blocks
.iter()
.next()
.map_or(false, |(n, _)| range.start > *n + max_ahead.into())
{
trace!(target: "sync", "Too far ahead for peer {} ({})", who, range.start);
return None;
return None
}
self.peer_requests.insert(who, range.start);
self.blocks.insert(range.start, BlockRangeState::Downloading {
len: range.end - range.start,
downloading: downloading + 1
});
self.blocks.insert(
range.start,
BlockRangeState::Downloading {
len: range.end - range.start,
downloading: downloading + 1,
},
);
if range.end <= range.start {
panic!("Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
range, count, peer_best, common, self.blocks);
panic!(
"Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}",
range, count, peer_best, common, self.blocks
);
}
Some(range)
}
@@ -188,16 +199,14 @@ impl<B: BlockT> BlockCollection<B> {
pub fn clear_peer_download(&mut self, who: &PeerId) {
if let Some(start) = self.peer_requests.remove(who) {
let remove = match self.blocks.get_mut(&start) {
Some(&mut BlockRangeState::Downloading { ref mut downloading, .. }) if *downloading > 1 => {
Some(&mut BlockRangeState::Downloading { ref mut downloading, .. })
if *downloading > 1 =>
{
*downloading -= 1;
false
},
Some(&mut BlockRangeState::Downloading { .. }) => {
true
},
_ => {
false
}
Some(&mut BlockRangeState::Downloading { .. }) => true,
_ => false,
};
if remove {
self.blocks.remove(&start);
@@ -210,27 +219,28 @@ impl<B: BlockT> BlockCollection<B> {
mod test {
use super::{BlockCollection, BlockData, BlockRangeState};
use crate::{protocol::message, PeerId};
use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper};
use sp_core::H256;
use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper};
type Block = RawBlock<ExtrinsicWrapper<u64>>;
fn is_empty(bc: &BlockCollection<Block>) -> bool {
bc.blocks.is_empty() &&
bc.peer_requests.is_empty()
bc.blocks.is_empty() && bc.peer_requests.is_empty()
}
fn generate_blocks(n: usize) -> Vec<message::BlockData<Block>> {
(0 .. n).map(|_| message::generic::BlockData {
hash: H256::random(),
header: None,
body: None,
indexed_body: None,
message_queue: None,
receipt: None,
justification: None,
justifications: None,
}).collect()
(0..n)
.map(|_| message::generic::BlockData {
hash: H256::random(),
header: None,
body: None,
indexed_body: None,
message_queue: None,
receipt: None,
justification: None,
justifications: None,
})
.collect()
}
#[test]
@@ -252,32 +262,47 @@ mod test {
let peer2 = PeerId::random();
let blocks = generate_blocks(150);
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1, 200), Some(1 .. 41));
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1, 200), Some(41 .. 81));
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 0, 1, 200), Some(81 .. 121));
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1, 200), Some(1..41));
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1, 200), Some(41..81));
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 0, 1, 200), Some(81..121));
bc.clear_peer_download(&peer1);
bc.insert(41, blocks[41..81].to_vec(), peer1.clone());
assert_eq!(bc.drain(1), vec![]);
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1, 200), Some(121 .. 151));
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1, 200), Some(121..151));
bc.clear_peer_download(&peer0);
bc.insert(1, blocks[1..11].to_vec(), peer0.clone());
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1, 200), Some(11 .. 41));
assert_eq!(bc.drain(1), blocks[1..11].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::<Vec<_>>());
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1, 200), Some(11..41));
assert_eq!(
bc.drain(1),
blocks[1..11]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) })
.collect::<Vec<_>>()
);
bc.clear_peer_download(&peer0);
bc.insert(11, blocks[11..41].to_vec(), peer0.clone());
let drained = bc.drain(12);
assert_eq!(drained[..30], blocks[11..41].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::<Vec<_>>()[..]);
assert_eq!(drained[30..], blocks[41..81].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::<Vec<_>>()[..]);
assert_eq!(
drained[..30],
blocks[11..41]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) })
.collect::<Vec<_>>()[..]
);
assert_eq!(
drained[30..],
blocks[41..81]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) })
.collect::<Vec<_>>()[..]
);
bc.clear_peer_download(&peer2);
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80, 1, 200), Some(81 .. 121));
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80, 1, 200), Some(81..121));
bc.clear_peer_download(&peer2);
bc.insert(81, blocks[81..121].to_vec(), peer2.clone());
bc.clear_peer_download(&peer1);
@@ -285,25 +310,38 @@ mod test {
assert_eq!(bc.drain(80), vec![]);
let drained = bc.drain(81);
assert_eq!(drained[..40], blocks[81..121].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer2.clone()) }).collect::<Vec<_>>()[..]);
assert_eq!(drained[40..], blocks[121..150].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::<Vec<_>>()[..]);
assert_eq!(
drained[..40],
blocks[81..121]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer2.clone()) })
.collect::<Vec<_>>()[..]
);
assert_eq!(
drained[40..],
blocks[121..150]
.iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) })
.collect::<Vec<_>>()[..]
);
}
#[test]
fn large_gap() {
let mut bc: BlockCollection<Block> = BlockCollection::new();
bc.blocks.insert(100, BlockRangeState::Downloading {
len: 128,
downloading: 1,
});
let blocks = generate_blocks(10).into_iter().map(|b| BlockData { block: b, origin: None }).collect();
bc.blocks.insert(100, BlockRangeState::Downloading { len: 128, downloading: 1 });
let blocks = generate_blocks(10)
.into_iter()
.map(|b| BlockData { block: b, origin: None })
.collect();
bc.blocks.insert(114305, BlockRangeState::Complete(blocks));
let peer0 = PeerId::random();
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 000, 1, 200), Some(1 .. 100));
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 000, 1, 200), Some(1..100));
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 600, 1, 200), None); // too far ahead
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 600, 1, 200000), Some(100 + 128 .. 100 + 128 + 128));
assert_eq!(
bc.needed_blocks(peer0.clone(), 128, 10000, 600, 1, 200000),
Some(100 + 128..100 + 128 + 128)
);
}
}
@@ -16,14 +16,16 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use sp_blockchain::Error as ClientError;
use crate::protocol::sync::{PeerSync, PeerSyncState};
use fork_tree::ForkTree;
use libp2p::PeerId;
use log::{debug, trace, warn};
use sp_blockchain::Error as ClientError;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::Duration;
use std::{
collections::{HashMap, HashSet, VecDeque},
time::Duration,
};
use wasm_timer::Instant;
// Time to wait before trying to get the same extra data from the same peer.
@@ -61,7 +63,7 @@ pub(crate) struct Metrics {
pub(crate) active_requests: u32,
pub(crate) importing_requests: u32,
pub(crate) failed_requests: u32,
_priv: ()
_priv: (),
}
impl<B: BlockT> ExtraRequests<B> {
@@ -93,13 +95,14 @@ impl<B: BlockT> ExtraRequests<B> {
/// Queue an extra data request to be considered by the `Matcher`.
pub(crate) fn schedule<F>(&mut self, request: ExtraRequest<B>, is_descendent_of: F)
where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
where
F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>,
{
match self.tree.import(request.0, request.1, (), &is_descendent_of) {
Ok(true) => {
// this is a new root so we add it to the current `pending_requests`
self.pending_requests.push_back((request.0, request.1));
}
},
Err(fork_tree::Error::Revert) => {
// we have finalized further than the given request, presumably
// by some other part of the system (not sync). we can safely
@@ -107,8 +110,8 @@ impl<B: BlockT> ExtraRequests<B> {
},
Err(err) => {
debug!(target: "sync", "Failed to insert request {:?} into tree: {:?}", request, err);
}
_ => ()
},
_ => (),
}
}
@@ -120,7 +123,11 @@ impl<B: BlockT> ExtraRequests<B> {
}
/// Processes the response for the request previously sent to the given peer.
pub(crate) fn on_response<R>(&mut self, who: PeerId, resp: Option<R>) -> Option<(PeerId, B::Hash, NumberFor<B>, R)> {
pub(crate) fn on_response<R>(
&mut self,
who: PeerId,
resp: Option<R>,
) -> Option<(PeerId, B::Hash, NumberFor<B>, R)> {
// we assume that the request maps to the given response, this is
// currently enforced by the outer network protocol before passing on
// messages to chain sync.
@@ -157,9 +164,10 @@ impl<B: BlockT> ExtraRequests<B> {
&mut self,
best_finalized_hash: &B::Hash,
best_finalized_number: NumberFor<B>,
is_descendent_of: F
is_descendent_of: F,
) -> Result<(), fork_tree::Error<ClientError>>
where F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>
where
F: Fn(&B::Hash, &B::Hash) -> Result<bool, ClientError>,
{
let request = (*best_finalized_hash, best_finalized_number);
@@ -203,9 +211,8 @@ impl<B: BlockT> ExtraRequests<B> {
&mut self,
request: ExtraRequest<B>,
result: Result<ExtraRequest<B>, E>,
reschedule_on_failure: bool
) -> bool
{
reschedule_on_failure: bool,
) -> bool {
if !self.importing_requests.remove(&request) {
return false
}
@@ -217,7 +224,7 @@ impl<B: BlockT> ExtraRequests<B> {
self.pending_requests.push_front(request);
}
return true
}
},
};
if self.tree.finalize_root(&finalized_hash).is_none() {
@@ -258,7 +265,7 @@ impl<B: BlockT> ExtraRequests<B> {
active_requests: self.active_requests.len().try_into().unwrap_or(std::u32::MAX),
failed_requests: self.failed_requests.len().try_into().unwrap_or(std::u32::MAX),
importing_requests: self.importing_requests.len().try_into().unwrap_or(std::u32::MAX),
_priv: ()
_priv: (),
}
}
}
@@ -269,15 +276,12 @@ pub(crate) struct Matcher<'a, B: BlockT> {
/// Length of pending requests collection.
/// Used to ensure we do not loop more than once over all pending requests.
remaining: usize,
extras: &'a mut ExtraRequests<B>
extras: &'a mut ExtraRequests<B>,
}
impl<'a, B: BlockT> Matcher<'a, B> {
fn new(extras: &'a mut ExtraRequests<B>) -> Self {
Matcher {
remaining: extras.pending_requests.len(),
extras
}
Matcher { remaining: extras.pending_requests.len(), extras }
}
/// Finds a peer to which a pending request can be sent.
@@ -294,7 +298,10 @@ impl<'a, B: BlockT> Matcher<'a, B> {
///
/// The returned `PeerId` (if any) is guaranteed to come from the given `peers`
/// argument.
pub(crate) fn next(&mut self, peers: &HashMap<PeerId, PeerSync<B>>) -> Option<(PeerId, ExtraRequest<B>)> {
pub(crate) fn next(
&mut self,
peers: &HashMap<PeerId, PeerSync<B>>,
) -> Option<(PeerId, ExtraRequest<B>)> {
if self.remaining == 0 {
return None
}
@@ -305,7 +312,9 @@ impl<'a, B: BlockT> Matcher<'a, B> {
}
while let Some(request) = self.extras.pending_requests.pop_front() {
for (peer, sync) in peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available) {
for (peer, sync) in
peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available)
{
// only ask peers that have synced at least up to the block number that we're asking the extra for
if sync.best_number < request.1 {
continue
@@ -315,7 +324,13 @@ impl<'a, B: BlockT> Matcher<'a, B> {
continue
}
// only ask if the same request has not failed for this peer before
if self.extras.failed_requests.get(&request).map(|rr| rr.iter().any(|i| &i.0 == peer)).unwrap_or(false) {
if self
.extras
.failed_requests
.get(&request)
.map(|rr| rr.iter().any(|i| &i.0 == peer))
.unwrap_or(false)
{
continue
}
self.extras.active_requests.insert(peer.clone(), request);
@@ -343,22 +358,22 @@ impl<'a, B: BlockT> Matcher<'a, B> {
#[cfg(test)]
mod tests {
use crate::protocol::sync::PeerSync;
use sp_blockchain::Error as ClientError;
use quickcheck::{Arbitrary, Gen, QuickCheck};
use std::collections::{HashMap, HashSet};
use super::*;
use crate::protocol::sync::PeerSync;
use quickcheck::{Arbitrary, Gen, QuickCheck};
use sp_blockchain::Error as ClientError;
use sp_test_primitives::{Block, BlockNumber, Hash};
use std::collections::{HashMap, HashSet};
#[test]
fn requests_are_processed_in_order() {
fn property(mut peers: ArbitraryPeers) {
let mut requests = ExtraRequests::<Block>::new("test");
let num_peers_available = peers.0.values()
.filter(|s| s.state == PeerSyncState::Available).count();
let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
for i in 0 .. num_peers_available {
for i in 0..num_peers_available {
requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
}
@@ -368,12 +383,12 @@ mod tests {
for p in &pending {
let (peer, r) = m.next(&peers.0).unwrap();
assert_eq!(p, &r);
peers.0.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0);
peers.0.get_mut(&peer).unwrap().state =
PeerSyncState::DownloadingJustification(r.0);
}
}
QuickCheck::new()
.quickcheck(property as fn(ArbitraryPeers))
QuickCheck::new().quickcheck(property as fn(ArbitraryPeers))
}
#[test]
@@ -398,22 +413,24 @@ mod tests {
fn property(mut peers: ArbitraryPeers) -> bool {
let mut requests = ExtraRequests::<Block>::new("test");
let num_peers_available = peers.0.values()
.filter(|s| s.state == PeerSyncState::Available).count();
let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
for i in 0 .. num_peers_available {
for i in 0..num_peers_available {
requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
}
let mut m = requests.matcher();
while let Some((peer, r)) = m.next(&peers.0) {
peers.0.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0);
peers.0.get_mut(&peer).unwrap().state =
PeerSyncState::DownloadingJustification(r.0);
}
assert!(requests.pending_requests.is_empty());
let active_peers = requests.active_requests.keys().cloned().collect::<Vec<_>>();
let previously_active = requests.active_requests.values().cloned().collect::<HashSet<_>>();
let previously_active =
requests.active_requests.values().cloned().collect::<HashSet<_>>();
for peer in &active_peers {
requests.peer_disconnected(peer)
@@ -424,8 +441,7 @@ mod tests {
previously_active == requests.pending_requests.iter().cloned().collect::<HashSet<_>>()
}
QuickCheck::new()
.quickcheck(property as fn(ArbitraryPeers) -> bool)
QuickCheck::new().quickcheck(property as fn(ArbitraryPeers) -> bool)
}
#[test]
@@ -433,31 +449,44 @@ mod tests {
fn property(mut peers: ArbitraryPeers) {
let mut requests = ExtraRequests::<Block>::new("test");
let num_peers_available = peers.0.values()
.filter(|s| s.state == PeerSyncState::Available).count();
let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
for i in 0 .. num_peers_available {
for i in 0..num_peers_available {
requests.schedule((Hash::random(), i as u64), |a, b| Ok(a[0] >= b[0]))
}
let mut m = requests.matcher();
while let Some((peer, r)) = m.next(&peers.0) {
peers.0.get_mut(&peer).unwrap().state = PeerSyncState::DownloadingJustification(r.0);
peers.0.get_mut(&peer).unwrap().state =
PeerSyncState::DownloadingJustification(r.0);
}
let active = requests.active_requests.iter().map(|(p, &r)| (p.clone(), r)).collect::<Vec<_>>();
let active = requests
.active_requests
.iter()
.map(|(p, &r)| (p.clone(), r))
.collect::<Vec<_>>();
for (peer, req) in &active {
assert!(requests.failed_requests.get(req).is_none());
assert!(!requests.pending_requests.contains(req));
assert!(requests.on_response::<()>(peer.clone(), None).is_none());
assert!(requests.pending_requests.contains(req));
assert_eq!(1, requests.failed_requests.get(req).unwrap().iter().filter(|(p, _)| p == peer).count())
assert_eq!(
1,
requests
.failed_requests
.get(req)
.unwrap()
.iter()
.filter(|(p, _)| p == peer)
.count()
)
}
}
QuickCheck::new()
.quickcheck(property as fn(ArbitraryPeers))
QuickCheck::new().quickcheck(property as fn(ArbitraryPeers))
}
#[test]
@@ -497,7 +526,10 @@ mod tests {
finality_proofs.try_finalize_root::<()>((hash6, 6), Ok((hash7, 7)), true);
// ensure that there's no request for #6
assert_eq!(finality_proofs.pending_requests.iter().collect::<Vec<_>>(), Vec::<&(Hash, u64)>::new());
assert_eq!(
finality_proofs.pending_requests.iter().collect::<Vec<_>>(),
Vec::<&(Hash, u64)>::new()
);
}
#[test]
@@ -560,7 +592,7 @@ mod tests {
impl Arbitrary for ArbitraryPeers {
fn arbitrary(g: &mut Gen) -> Self {
let mut peers = HashMap::with_capacity(g.size());
for _ in 0 .. g.size() {
for _ in 0..g.size() {
let ps = ArbitraryPeerSync::arbitrary(g).0;
peers.insert(ps.peer_id.clone(), ps);
}
@@ -16,13 +16,15 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::sync::Arc;
use codec::{Encode, Decode};
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use sc_client_api::StorageProof;
use crate::schema::v1::{StateRequest, StateResponse, StateEntry};
use crate::chain::{Client, ImportedState};
use super::StateDownloadProgress;
use crate::{
chain::{Client, ImportedState},
schema::v1::{StateEntry, StateRequest, StateResponse},
};
use codec::{Decode, Encode};
use sc_client_api::StorageProof;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use std::sync::Arc;
/// State sync support.
@@ -73,14 +75,14 @@ impl<B: BlockT> StateSync<B> {
target: "sync",
"Bad state response",
);
return ImportResult::BadResponse;
return ImportResult::BadResponse
}
if !self.skip_proof && response.proof.is_empty() {
log::debug!(
target: "sync",
"Missing proof",
);
return ImportResult::BadResponse;
return ImportResult::BadResponse
}
let complete = if !self.skip_proof {
log::debug!(
@@ -93,24 +95,21 @@ impl<B: BlockT> StateSync<B> {
Ok(proof) => proof,
Err(e) => {
log::debug!(target: "sync", "Error decoding proof: {:?}", e);
return ImportResult::BadResponse;
}
};
let (values, complete) = match self.client.verify_range_proof(
self.target_root,
proof,
&self.last_key
) {
Err(e) => {
log::debug!(
target: "sync",
"StateResponse failed proof verification: {:?}",
e,
);
return ImportResult::BadResponse;
return ImportResult::BadResponse
},
Ok(values) => values,
};
let (values, complete) =
match self.client.verify_range_proof(self.target_root, proof, &self.last_key) {
Err(e) => {
log::debug!(
target: "sync",
"StateResponse failed proof verification: {:?}",
e,
);
return ImportResult::BadResponse
},
Ok(values) => values,
};
log::debug!(target: "sync", "Imported with {} keys", values.len());
if let Some(last) = values.last().map(|(k, _)| k) {
@@ -120,7 +119,7 @@ impl<B: BlockT> StateSync<B> {
for (key, value) in values {
self.imported_bytes += key.len() as u64;
self.state.push((key, value))
};
}
self.imported_bytes += proof_size;
complete
} else {
@@ -142,10 +141,14 @@ impl<B: BlockT> StateSync<B> {
};
if complete {
self.complete = true;
ImportResult::Import(self.target_block.clone(), self.target_header.clone(), ImportedState {
block: self.target_block.clone(),
state: std::mem::take(&mut self.state)
})
ImportResult::Import(
self.target_block.clone(),
self.target_header.clone(),
ImportedState {
block: self.target_block.clone(),
state: std::mem::take(&mut self.state),
},
)
} else {
ImportResult::Continue(self.next_request())
}
@@ -178,10 +181,6 @@ impl<B: BlockT> StateSync<B> {
/// Returns state sync estimated progress.
pub fn progress(&self) -> StateDownloadProgress {
let percent_done = (*self.last_key.get(0).unwrap_or(&0u8) as u32) * 100 / 256;
StateDownloadProgress {
percentage: percent_done,
size: self.imported_bytes,
}
StateDownloadProgress { percentage: percent_done, size: self.imported_bytes }
}
}