Fix timer panics in the wasm light client (#4561)

* Make WASM browser thing compile

* Fix

* updated exit-future (github repo)

* Switch to broadcast crate

* Migrate client/cli

* Switch exit-future to modernize branch

* Small changes

* Switch to cargo version and fix fg tests

* fix basic-authorship

* Fix crash on grafana macro

* Fix grafana macro

* Switch node python version

* Disable record_metrics_slice in grafana macro on wasm

* Update client/grafana-data-source/src/lib.rs

* Revert "Update client/grafana-data-source/src/lib.rs"

This reverts commit 888009a8e0b7051bd4bfbbfdb0448bcf2e2aae93.

* Add wasm support for state machine

* Switch to my own libp2p version

* Revert "Switch to my own libp2p version"

This reverts commit ce613871b59264b3165b45c37943e6560240daa7.

* Revert "Add wasm support for state machine"

This reverts commit de7eaa0694d9534fc3b164621737968e9a6a7c5f.

* Add sc-browser

* Squash

* remove sc-browser

* Fix keystore on wasm

* stubs for removed functions to make env compatible with old runtimes

* Add test (that doesn't work)

* Fix build scripts

* Revert basic-authorship due to no panics

* Revert cli/informant

* Revert consensus

* revert offchain

* Update utils/browser/Cargo.toml

Co-Authored-By: Benjamin Kampmann <ben@gnunicorn.org>

* export console functions

* Add new chainspec

* Fix ws in chain spec

* revert chainspec

* Fix chainspec

* Use an Option<PathBuf> in keystore instead of cfg flags

* Remove crud

* Only use wasm-timer for instant and systemtime

* Remove telemetry changes

* Assuming this is ok

* Add a KeystoreConfig

* Add stubs back in

* Update libp2p

* Revert "Add stubs back in"

This reverts commit 4690cf1882aa0f99f7f00a58c4080c8aa9b77c36.

* Remove commented js again

* Bump kvdb-web version

* Fix cli

* Switch branch on futures-timer

* Fix tests

* Remove sc-client test build in check-web-wasm because there isn't a good way to build futures-timer with wasm-bindgen support in the build

* Remove more things ^^

* Switch branch on futures-timer back

* Put DB io stats behind a cfg flag

* Fix things

* Don't timeout transports on wasm

* Update branch of futures-timer and fix bad merge

* Spawn informant

* Fix network test

* Fix delay resets

* Changes

* Fix tests

* use wasm_timer for transaction pool

* Fixes

* Switch futures-timer to crates

* Only diagnose futures on native

* Fix sc-network-test tests

* Select log level in js

* Fix syncing ;^)

* Allow disabling colours in the informant

* Use OutputFormat enum for informant

* MallocSizeOf impl on transaction pool broke stuff because wasm_timer::Instant doesnt impl it so just revert the transaction pool to master

* Update futures-diagnose

* Revert "MallocSizeOf impl on transaction pool broke stuff because wasm_timer::Instant doesnt impl it so just revert the transaction pool to master"

This reverts commit baa4ffc94fd968b6660a2c17ba8113e06af15548.

* Pass whole chain spec in start_client

* Get Instant::now to work in transaction pool again

* Informant dep reordering

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Svyatoslav Nikolsky <svyatonik@gmail.com>
Co-authored-by: Benjamin Kampmann <ben.kampmann@googlemail.com>
Co-authored-by: Demi Obenour <48690212+DemiMarie-parity@users.noreply.github.com>
This commit is contained in:
Ashley
2020-02-10 12:23:55 +01:00
committed by GitHub
parent 34bf0caa05
commit ead6815ae4
54 changed files with 299 additions and 155 deletions
@@ -14,7 +14,7 @@ bytes = "0.4.12"
codec = { package = "parity-scale-codec", default-features = false, version = "1.0.3" }
derive_more = "0.99.2"
futures = "0.3.1"
futures-timer = "2.0"
futures-timer = "3.0.1"
libp2p = { version = "0.15.0", default-features = false, features = ["secp256k1", "libp2p-websocket"] }
log = "0.4.8"
prost = "0.6.1"
+1
View File
@@ -21,6 +21,7 @@ tokio = { version = "0.2.9", features = [ "signal", "rt-core", "rt-threaded" ] }
futures = "0.3.1"
fdlimit = "0.1.1"
serde_json = "1.0.41"
sc-informant = { version = "0.8", path = "../informant" }
sp-panic-handler = { version = "2.0.0", path = "../../primitives/panic-handler" }
sc-client-api = { version = "2.0.0", path = "../api" }
sp-blockchain = { version = "2.0.0", path = "../../primitives/blockchain" }
-1
View File
@@ -24,7 +24,6 @@ mod traits;
mod params;
mod execution_strategy;
pub mod error;
pub mod informant;
mod runtime;
mod node_key;
+1 -2
View File
@@ -21,7 +21,6 @@ use futures::select;
use futures::pin_mut;
use sc_service::{AbstractService, Configuration};
use crate::error;
use crate::informant;
#[cfg(target_family = "unix")]
async fn main<F, E>(func: F) -> Result<(), Box<dyn std::error::Error>>
@@ -124,7 +123,7 @@ where
let service = service_builder(config)?;
let informant_future = informant::build(&service);
let informant_future = sc_informant::build(&service, sc_informant::OutputFormat::Coloured);
let _informant_handle = runtime.spawn(informant_future);
// we eagerly drop the service so that the internal exit future is fired,
+1 -1
View File
@@ -16,7 +16,7 @@ codec = { package = "parity-scale-codec", version = "1.0.0" }
sp-consensus = { version = "0.8", path = "../../../primitives/consensus/common" }
derive_more = "0.99.2"
futures = "0.3.1"
futures-timer = "0.4.0"
futures-timer = "3.0.1"
sp-inherents = { version = "2.0.0", path = "../../../primitives/inherents" }
sc-keystore = { version = "2.0.0", path = "../../keystore" }
log = "0.4.8"
+1 -1
View File
@@ -32,7 +32,7 @@ sc-consensus-slots = { version = "0.8", path = "../slots" }
sp-runtime = { version = "2.0.0", path = "../../../primitives/runtime" }
fork-tree = { version = "2.0.0", path = "../../../utils/fork-tree" }
futures = "0.3.1"
futures-timer = "0.4.0"
futures-timer = "3.0.1"
parking_lot = "0.10.0"
log = "0.4.8"
schnorrkel = { version = "0.8.5", features = ["preaudit_deprecated"] }
+1 -1
View File
@@ -19,7 +19,7 @@ sc-telemetry = { version = "2.0.0", path = "../../telemetry" }
sp-consensus = { version = "0.8", path = "../../../primitives/consensus/common" }
sp-inherents = { version = "2.0.0", path = "../../../primitives/inherents" }
futures = "0.3.1"
futures-timer = "2.0"
futures-timer = "3.0.1"
parking_lot = "0.10.0"
log = "0.4.8"
+1 -1
View File
@@ -8,7 +8,7 @@ license = "GPL-3.0"
[dependencies]
fork-tree = { version = "2.0.0", path = "../../utils/fork-tree" }
futures = "0.3.1"
futures-timer = "2.0.2"
futures-timer = "3.0.1"
log = "0.4.8"
parking_lot = "0.10.0"
rand = "0.7.2"
@@ -19,7 +19,7 @@
use futures_timer::Delay;
use futures::{channel::mpsc, future::{FutureExt as _}, prelude::*, ready, stream::Stream};
use log::debug;
use std::{pin::Pin, task::{Context, Poll}, time::{Instant, Duration}};
use std::{pin::Pin, task::{Context, Poll}, time::Duration};
use sc_network::PeerId;
use sp_runtime::traits::{NumberFor, Block as BlockT};
@@ -28,10 +28,6 @@ use super::gossip::{NeighborPacket, GossipMessage};
// How often to rebroadcast, in cases where no new packets are created.
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
fn rebroadcast_instant() -> Instant {
Instant::now() + REBROADCAST_AFTER
}
/// A sender used to send neighbor packets to a background job.
#[derive(Clone)]
pub(super) struct NeighborPacketSender<B: BlockT>(
@@ -85,7 +81,7 @@ impl <B: BlockT> Stream for NeighborPacketWorker<B> {
match this.rx.poll_next_unpin(cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some((to, packet))) => {
this.delay.reset(rebroadcast_instant());
this.delay.reset(REBROADCAST_AFTER);
this.last = Some((to.clone(), packet.clone()));
return Poll::Ready(Some((to, GossipMessage::<B>::from(packet.clone()))));
@@ -98,7 +94,7 @@ impl <B: BlockT> Stream for NeighborPacketWorker<B> {
// Getting this far here implies that the timer fired.
this.delay.reset(rebroadcast_instant());
this.delay.reset(REBROADCAST_AFTER);
// Make sure the underlying task is scheduled for wake-up.
//
+19
View File
@@ -0,0 +1,19 @@
[package]
name = "sc-informant"
version = "0.8.0"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Substrate informant."
edition = "2018"
license = "GPL-3.0"
[dependencies]
ansi_term = "0.12.1"
futures = "0.3.1"
log = "0.4.8"
parity-util-mem = { version = "0.5.1", default-features = false, features = ["primitive-types"] }
wasm-timer = "0.2"
sc-client-api = { version = "2.0.0", path = "../api" }
sc-network = { version = "0.8", path = "../network" }
sc-service = { version = "0.8", default-features = false, path = "../service" }
sp-blockchain = { version = "2.0.0", path = "../../primitives/blockchain" }
sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" }
@@ -20,7 +20,9 @@ use log::info;
use sc_network::SyncState;
use sp_runtime::traits::{Block as BlockT, CheckedDiv, NumberFor, Zero, Saturating};
use sc_service::NetworkStatus;
use std::{convert::{TryFrom, TryInto}, fmt, time};
use std::{convert::{TryFrom, TryInto}, fmt};
use wasm_timer::Instant;
use crate::OutputFormat;
/// State of the informant display system.
///
@@ -40,15 +42,18 @@ pub struct InformantDisplay<B: BlockT> {
/// `None` if `display` has never been called.
last_number: Option<NumberFor<B>>,
/// The last time `display` or `new` has been called.
last_update: time::Instant,
last_update: Instant,
/// The format to print output in.
format: OutputFormat,
}
impl<B: BlockT> InformantDisplay<B> {
/// Builds a new informant display system.
pub fn new() -> InformantDisplay<B> {
pub fn new(format: OutputFormat) -> InformantDisplay<B> {
InformantDisplay {
last_number: None,
last_update: time::Instant::now(),
last_update: Instant::now(),
format,
}
}
@@ -56,8 +61,10 @@ impl<B: BlockT> InformantDisplay<B> {
pub fn display(&mut self, info: &ClientInfo<B>, net_status: NetworkStatus<B>) {
let best_number = info.chain.best_number;
let best_hash = info.chain.best_hash;
let finalized_number = info.chain.finalized_number;
let num_connected_peers = net_status.num_connected_peers;
let speed = speed::<B>(best_number, self.last_number, self.last_update);
self.last_update = time::Instant::now();
self.last_update = Instant::now();
self.last_number = Some(best_number);
let (status, target) = match (net_status.sync_state, net_status.best_seen_block) {
@@ -66,19 +73,35 @@ impl<B: BlockT> InformantDisplay<B> {
(SyncState::Downloading, Some(n)) => (format!("Syncing{}", speed), format!(", target=#{}", n)),
};
info!(
target: "substrate",
"{}{} ({} peers), best: #{} ({}), finalized #{} ({}), ⬇ {} ⬆ {}",
Colour::White.bold().paint(&status),
target,
Colour::White.bold().paint(format!("{}", net_status.num_connected_peers)),
Colour::White.paint(format!("{}", best_number)),
best_hash,
Colour::White.paint(format!("{}", info.chain.finalized_number)),
info.chain.finalized_hash,
TransferRateFormat(net_status.average_download_per_sec),
TransferRateFormat(net_status.average_upload_per_sec),
);
if self.format == OutputFormat::Coloured {
info!(
target: "substrate",
"{}{} ({} peers), best: #{} ({}), finalized #{} ({}), ⬇ {} ⬆ {}",
Colour::White.bold().paint(&status),
target,
Colour::White.bold().paint(format!("{}", num_connected_peers)),
Colour::White.paint(format!("{}", best_number)),
best_hash,
Colour::White.paint(format!("{}", finalized_number)),
info.chain.finalized_hash,
TransferRateFormat(net_status.average_download_per_sec),
TransferRateFormat(net_status.average_upload_per_sec),
);
} else {
info!(
target: "substrate",
"{}{} ({} peers), best: #{} ({}), finalized #{} ({}), ⬇ {} ⬆ {}",
status,
target,
num_connected_peers,
best_number,
best_hash,
finalized_number,
info.chain.finalized_hash,
TransferRateFormat(net_status.average_download_per_sec),
TransferRateFormat(net_status.average_upload_per_sec),
);
}
}
}
@@ -87,7 +110,7 @@ impl<B: BlockT> InformantDisplay<B> {
fn speed<B: BlockT>(
best_number: NumberFor<B>,
last_number: Option<NumberFor<B>>,
last_update: time::Instant
last_update: Instant
) -> String {
// Number of milliseconds elapsed since last time.
let elapsed_ms = {
@@ -25,12 +25,19 @@ use std::time::Duration;
mod display;
/// The format to print telemetry output in.
#[derive(PartialEq)]
pub enum OutputFormat {
Coloured,
Plain,
}
/// Creates an informant in the form of a `Future` that must be polled regularly.
pub fn build(service: &impl AbstractService) -> impl futures::Future<Output = ()> {
pub fn build(service: &impl AbstractService, format: OutputFormat) -> impl futures::Future<Output = ()> {
let client = service.client();
let pool = service.transaction_pool();
let mut display = display::InformantDisplay::new();
let mut display = display::InformantDisplay::new(format);
let display_notifications = service
.network_status(Duration::from_millis(5000))
@@ -41,6 +48,7 @@ pub fn build(service: &impl AbstractService) -> impl futures::Future<Output = ()
} else {
trace!(target: "usage", "Usage statistics not displayed as backend does not provide it")
}
#[cfg(not(target_os = "unknown"))]
trace!(
target: "usage",
"Subsystems memory [txpool: {} kB]",
+2 -1
View File
@@ -9,8 +9,9 @@ edition = "2018"
[dependencies]
log = "0.4.8"
futures = { version = "0.3.1", features = ["compat"] }
wasm-timer = "0.2"
futures-timer = "3.0.1"
futures01 = { package = "futures", version = "0.1.29" }
futures-timer = "0.4.0"
libp2p = { version = "0.15.0", default-features = false, features = ["libp2p-websocket"] }
lru = "0.1.2"
parking_lot = "0.10.0"
@@ -28,6 +28,7 @@ use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
use sp_runtime::ConsensusEngineId;
pub use sc_network::message::generic::{Message, ConsensusMessage};
use sc_network::config::Roles;
use wasm_timer::Instant;
// FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115
const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096;
@@ -165,7 +166,7 @@ pub struct ConsensusGossip<B: BlockT> {
messages: Vec<MessageEntry<B>>,
known_messages: LruCache<B::Hash, ()>,
validators: HashMap<ConsensusEngineId, Arc<dyn Validator<B>>>,
next_broadcast: time::Instant,
next_broadcast: Instant,
}
impl<B: BlockT> ConsensusGossip<B> {
@@ -177,7 +178,7 @@ impl<B: BlockT> ConsensusGossip<B> {
messages: Default::default(),
known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE),
validators: Default::default(),
next_broadcast: time::Instant::now() + REBROADCAST_INTERVAL,
next_broadcast: Instant::now() + REBROADCAST_INTERVAL,
}
}
@@ -260,9 +261,9 @@ impl<B: BlockT> ConsensusGossip<B> {
/// Perform periodic maintenance
pub fn tick(&mut self, network: &mut dyn Network<B>) {
self.collect_garbage();
if time::Instant::now() >= self.next_broadcast {
if Instant::now() >= self.next_broadcast {
self.rebroadcast(network);
self.next_broadcast = time::Instant::now() + REBROADCAST_INTERVAL;
self.next_broadcast = Instant::now() + REBROADCAST_INTERVAL;
}
}
+2 -1
View File
@@ -17,7 +17,8 @@ fnv = "1.0.6"
fork-tree = { version = "2.0.0", path = "../../utils/fork-tree" }
futures = "0.3.1"
futures_codec = "0.3.3"
futures-timer = "0.4.0"
futures-timer = "3.0.1"
wasm-timer = "0.2"
libp2p = { version = "0.15.0", default-features = false, features = ["libp2p-websocket"] }
linked-hash-map = "0.5.2"
linked_hash_set = "0.1.3"
+2 -1
View File
@@ -28,7 +28,8 @@ use std::error;
use std::collections::hash_map::Entry;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::time::Duration;
use wasm_timer::Instant;
use crate::utils::interval;
/// Time after we disconnect from a node before we purge its information from the cache.
+7 -6
View File
@@ -52,6 +52,7 @@ use crate::chain::{Client, FinalityProofProvider};
use sc_client_api::{FetchChecker, ChangesProof, StorageProof};
use crate::error;
use util::LruHashSet;
use wasm_timer::Instant;
mod legacy_proto;
mod util;
@@ -158,7 +159,7 @@ struct PacketStats {
/// A peer that we are connected to
/// and from whom we have not yet received a Status message.
struct HandshakingPeer {
timestamp: time::Instant,
timestamp: Instant,
}
/// Peer information
@@ -166,9 +167,9 @@ struct HandshakingPeer {
struct Peer<B: BlockT, H: ExHashT> {
info: PeerInfo<B>,
/// Current block request, if any.
block_request: Option<(time::Instant, message::BlockRequest<B>)>,
block_request: Option<(Instant, message::BlockRequest<B>)>,
/// Requests we are no longer insterested in.
obsolete_requests: HashMap<message::RequestId, time::Instant>,
obsolete_requests: HashMap<message::RequestId, Instant>,
/// Holds a set of transactions known to this peer.
known_extrinsics: LruHashSet<H>,
/// Holds a set of blocks known to this peer.
@@ -701,7 +702,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
/// Called when a new peer is connected
pub fn on_peer_connected(&mut self, who: PeerId) {
trace!(target: "sync", "Connecting {}", who);
self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: time::Instant::now() });
self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: Instant::now() });
self.send_status(who);
}
@@ -890,7 +891,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
}
fn maintain_peers(&mut self) {
let tick = time::Instant::now();
let tick = Instant::now();
let mut aborting = Vec::new();
{
for (who, peer) in self.context_data.peers.iter() {
@@ -1833,7 +1834,7 @@ fn send_request<B: BlockT, H: ExHashT>(
trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who);
peer.obsolete_requests.insert(request.id, timestamp);
}
peer.block_request = Some((time::Instant::now(), r.clone()));
peer.block_request = Some((Instant::now(), r.clone()));
}
}
send_message::<B>(behaviour, stats, who, message)
@@ -26,7 +26,8 @@ use log::{debug, error, trace, warn};
use rand::distributions::{Distribution as _, Uniform};
use smallvec::SmallVec;
use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin};
use std::time::{Duration, Instant};
use std::time::Duration;
use wasm_timer::Instant;
use std::task::{Context, Poll};
/// Network behaviour that handles opening substreams for custom protocols with other nodes.
@@ -387,7 +388,7 @@ impl<TSubstream> LegacyProto<TSubstream> {
debug!(target: "sub-libp2p", "PSM => Connect({:?}): Will start to connect at \
until {:?}", occ_entry.key(), until);
*occ_entry.into_mut() = PeerState::PendingRequest {
timer: futures_timer::Delay::new_at(until.clone()),
timer: futures_timer::Delay::new(until.clone() - Instant::now()),
timer_deadline: until.clone(),
};
},
@@ -406,7 +407,7 @@ impl<TSubstream> LegacyProto<TSubstream> {
*occ_entry.into_mut() = PeerState::DisabledPendingEnable {
connected_point: connected_point.clone(),
open,
timer: futures_timer::Delay::new_at(banned.clone()),
timer: futures_timer::Delay::new(banned.clone() - Instant::now()),
timer_deadline: banned.clone(),
};
},
@@ -348,13 +348,12 @@ where
ProtocolState::Init { substreams, mut init_deadline } => {
match Pin::new(&mut init_deadline).poll(cx) {
Poll::Ready(Ok(())) => {
Poll::Ready(()) => {
init_deadline = Delay::new(Duration::from_secs(60));
error!(target: "sub-libp2p", "Handler initialization process is too long \
with {:?}", self.remote_peer_id)
},
Poll::Pending => {}
Poll::Ready(Err(_)) => error!(target: "sub-libp2p", "Tokio timer has errored")
}
self.state = ProtocolState::Init { substreams, init_deadline };
@@ -363,7 +362,7 @@ where
ProtocolState::Opening { mut deadline } => {
match Pin::new(&mut deadline).poll(cx) {
Poll::Ready(Ok(())) => {
Poll::Ready(()) => {
deadline = Delay::new(Duration::from_secs(60));
let event = CustomProtoHandlerOut::ProtocolError {
is_severe: true,
@@ -376,12 +375,6 @@ where
self.state = ProtocolState::Opening { deadline };
None
},
Poll::Ready(Err(_)) => {
error!(target: "sub-libp2p", "Tokio timer has errored");
deadline = Delay::new(Duration::from_secs(60));
self.state = ProtocolState::Opening { deadline };
None
},
}
}
@@ -409,7 +409,7 @@ fn reconnect_after_disconnect() {
_ => panic!()
}
if let Poll::Ready(Ok(_)) = delay.poll_unpin(cx) {
if let Poll::Ready(()) = delay.poll_unpin(cx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
@@ -21,7 +21,8 @@
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Instant, Duration};
use std::time::Duration;
use wasm_timer::Instant;
use log::{trace, info};
use futures::channel::oneshot::{Sender as OneShotSender};
use linked_hash_map::{Entry, LinkedHashMap};
@@ -21,7 +21,8 @@ use libp2p::PeerId;
use log::{debug, trace, warn};
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};
use std::time::Duration;
use wasm_timer::Instant;
// Time to wait before trying to get the same extra data from the same peer.
const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10);
+11 -4
View File
@@ -118,11 +118,18 @@ pub fn build_transport(
core::upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1)
.map_ok(|(id, muxer)| (id, core::muxing::StreamMuxerBox::new(muxer)))
})
});
.timeout(Duration::from_secs(20))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();
let transport = if cfg!(not(target_os = "unknown")) {
transport
.timeout(Duration::from_secs(20))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed()
} else {
transport
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed()
};
(transport, sinks)
}
+1 -1
View File
@@ -12,7 +12,7 @@ log = "0.4.8"
parking_lot = "0.10.0"
futures = "0.1.29"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
futures-timer = "0.4.0"
futures-timer = "3.0.1"
rand = "0.7.2"
libp2p = { version = "0.15.0", default-features = false, features = ["libp2p-websocket"] }
sp-consensus = { version = "0.8", path = "../../../primitives/consensus/common" }
+2 -2
View File
@@ -405,7 +405,7 @@ fn blocks_are_not_announced_by_light_nodes() {
net.peers.remove(0);
// Poll for a few seconds and make sure 1 and 2 (now 0 and 1) don't sync together.
let mut delay = futures_timer::Delay::new(Duration::from_secs(5)).compat();
let mut delay = futures_timer::Delay::new(Duration::from_secs(5)).unit_error().compat();
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
delay.poll().map_err(|_| ())
@@ -504,7 +504,7 @@ fn can_not_sync_from_light_peer() {
net.peers.remove(0);
// ensure that the #2 (now #1) fails to sync block #1 even after 5 seconds
let mut test_finished = futures_timer::Delay::new(Duration::from_secs(5)).compat();
let mut test_finished = futures_timer::Delay::new(Duration::from_secs(5)).unit_error().compat();
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
test_finished.poll().map_err(|_| ())
+1 -1
View File
@@ -13,7 +13,7 @@ sp-api = { version = "2.0.0", path = "../../primitives/api" }
fnv = "1.0.6"
futures01 = { package = "futures", version = "0.1" }
futures = "0.3.1"
futures-timer = "2.0"
futures-timer = "3.0.1"
log = "0.4.8"
threadpool = "1.7"
num_cpus = "1.10"
+1
View File
@@ -12,6 +12,7 @@ futures = "0.3.1"
libp2p = { version = "0.15.0", default-features = false }
log = "0.4.8"
serde_json = "1.0.41"
wasm-timer = "0.2"
[dev-dependencies]
rand = "0.7.2"
+2 -1
View File
@@ -19,12 +19,13 @@
mod peersstate;
use std::{collections::{HashSet, HashMap}, collections::VecDeque, time::Instant};
use std::{collections::{HashSet, HashMap}, collections::VecDeque};
use futures::{prelude::*, channel::mpsc};
use libp2p::PeerId;
use log::{debug, error, trace};
use serde_json::json;
use std::{pin::Pin, task::Context, task::Poll};
use wasm_timer::Instant;
/// We don't accept nodes whose reputation is under this value.
const BANNED_THRESHOLD: i32 = 82 * (i32::min_value() / 100);
+2 -1
View File
@@ -24,7 +24,8 @@ lazy_static = "1.4.0"
log = "0.4.8"
slog = { version = "2.5.2", features = ["nested-values"] }
tokio-executor = "0.1.8"
futures-timer = "2"
futures-timer = "3.0.1"
wasm-timer = "0.2"
exit-future = "0.2.0"
serde = "1.0.101"
serde_json = "1.0.41"
+9 -4
View File
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID};
use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm};
use crate::{SpawnTaskHandle, start_rpc_servers, build_network_future, TransactionPoolAdapter};
use crate::status_sinks;
use crate::config::{Configuration, DatabaseConfig, KeystoreConfig};
@@ -46,8 +46,9 @@ use sc_executor::{NativeExecutor, NativeExecutionDispatch};
use std::{
borrow::Cow,
io::{Read, Write, Seek},
marker::PhantomData, sync::Arc, time::SystemTime, pin::Pin
marker::PhantomData, sync::Arc, pin::Pin
};
use wasm_timer::SystemTime;
use sysinfo::{get_current_pid, ProcessExt, System, SystemExt};
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::MaintainedTransactionPool;
@@ -738,7 +739,7 @@ ServiceBuilder<
TSc: Clone,
TImpQu: 'static + ImportQueue<TBl>,
TNetP: NetworkSpecialization<TBl>,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + 'static,
TExPool: MaintainedTransactionPool<Block=TBl, Hash = <TBl as BlockT>::Hash> + MallocSizeOfWasm + 'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata> + Clone,
{
@@ -984,6 +985,10 @@ ServiceBuilder<
"disk_read_per_sec" => info.usage.as_ref().map(|usage| usage.io.bytes_read).unwrap_or(0),
"disk_write_per_sec" => info.usage.as_ref().map(|usage| usage.io.bytes_written).unwrap_or(0),
);
#[cfg(not(target_os = "unknown"))]
let memory_transaction_pool = parity_util_mem::malloc_size(&*transaction_pool_);
#[cfg(target_os = "unknown")]
let memory_transaction_pool = 0;
let _ = record_metrics!(
"peers" => num_peers,
"height" => best_number,
@@ -997,7 +1002,7 @@ ServiceBuilder<
"used_db_cache_size" => info.usage.as_ref().map(|usage| usage.memory.database_cache).unwrap_or(0),
"disk_read_per_sec" => info.usage.as_ref().map(|usage| usage.io.bytes_read).unwrap_or(0),
"disk_write_per_sec" => info.usage.as_ref().map(|usage| usage.io.bytes_written).unwrap_or(0),
"memory_transaction_pool" => parity_util_mem::malloc_size(&*transaction_pool_),
"memory_transaction_pool" => memory_transaction_pool,
);
ready(())
+15 -3
View File
@@ -31,7 +31,8 @@ use std::{borrow::Cow, io, pin::Pin};
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use std::time::Duration;
use wasm_timer::Instant;
use std::task::{Poll, Context};
use parking_lot::Mutex;
@@ -52,6 +53,7 @@ use log::{log, warn, debug, error, Level};
use codec::{Encode, Decode};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{NumberFor, Block as BlockT};
use parity_util_mem::MallocSizeOf;
pub use self::error::Error;
pub use self::builder::{
@@ -73,6 +75,16 @@ pub use sc_network::{FinalityProofProvider, OnDemand, config::BoxFinalityProofRe
const DEFAULT_PROTOCOL_ID: &str = "sup";
/// A type that implements `MallocSizeOf` on native but not wasm.
#[cfg(not(target_os = "unknown"))]
pub trait MallocSizeOfWasm: MallocSizeOf {}
#[cfg(target_os = "unknown")]
pub trait MallocSizeOfWasm {}
#[cfg(not(target_os = "unknown"))]
impl<T: MallocSizeOf> MallocSizeOfWasm for T {}
#[cfg(target_os = "unknown")]
impl<T> MallocSizeOfWasm for T {}
/// Substrate service.
pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
client: Arc<TCl>,
@@ -163,7 +175,7 @@ pub trait AbstractService: 'static + Future<Output = Result<(), Error>> +
/// Chain selection algorithm.
type SelectChain: sp_consensus::SelectChain<Self::Block>;
/// Transaction pool.
type TransactionPool: TransactionPool<Block = Self::Block>;
type TransactionPool: TransactionPool<Block = Self::Block> + MallocSizeOfWasm;
/// Network specialization.
type NetworkSpecialization: NetworkSpecialization<Self::Block>;
@@ -227,7 +239,7 @@ where
TExec: 'static + sc_client::CallExecutor<TBl> + Send + Sync + Clone,
TRtApi: 'static + Send + Sync,
TSc: sp_consensus::SelectChain<TBl> + 'static + Clone + Send + Unpin,
TExPool: 'static + TransactionPool<Block = TBl>,
TExPool: 'static + TransactionPool<Block = TBl> + MallocSizeOfWasm,
TOc: 'static + Send + Sync,
TNetSpec: NetworkSpecialization<TBl>,
{
+2 -1
View File
@@ -10,7 +10,8 @@ license = "GPL-3.0"
bytes = "0.5"
parking_lot = "0.10.0"
futures = "0.3.1"
futures-timer = "2.0.0"
futures-timer = "3.0.1"
wasm-timer = "0.2.0"
libp2p = { version = "0.15.0", default-features = false, features = ["libp2p-websocket"] }
log = "0.4.8"
pin-project = "0.4.6"
+2 -1
View File
@@ -63,7 +63,8 @@ use libp2p::{Multiaddr, wasm_ext};
use log::{error, warn};
use parking_lot::Mutex;
use serde::{Serialize, Deserialize};
use std::{pin::Pin, sync::Arc, task::{Context, Poll}, time::{Duration, Instant}};
use std::{pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;
pub use libp2p::wasm_ext::ExtTransport;
pub use slog_scope::with_logger;
+2 -2
View File
@@ -98,10 +98,10 @@ impl TelemetryWorker {
.map_ok(|data| BytesMut::from(data.as_ref()));
future::ready(Ok::<_, io::Error>(connec))
})
});
})
.timeout(CONNECT_TIMEOUT);
let transport = transport
.timeout(CONNECT_TIMEOUT)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.map(|out, _| {
let out = out
@@ -12,6 +12,7 @@ futures = { version = "0.3.1", features = ["compat"] }
futures-diagnose = "1.0"
log = "0.4.8"
parking_lot = "0.10.0"
wasm-timer = "0.2"
sp-core = { version = "2.0.0", path = "../../primitives/core" }
sp-api = { version = "2.0.0", path = "../../primitives/api" }
sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" }
@@ -11,6 +11,7 @@ futures = "0.3.1"
log = "0.4.8"
parking_lot = "0.10.0"
serde = { version = "1.0.101", features = ["derive"] }
wasm-timer = "0.2"
sp-core = { version = "2.0.0", path = "../../../primitives/core" }
sp-runtime = { version = "2.0.0", path = "../../../primitives/runtime" }
sp-transaction-pool = { version = "2.0.0", path = "../../../primitives/transaction-pool" }
@@ -209,7 +209,8 @@ const RECENTLY_PRUNED_TAGS: usize = 2;
/// as-is for the second time will fail or produce unwanted results.
/// Most likely it is required to revalidate them and recompute set of
/// required tags.
#[derive(Debug, parity_util_mem::MallocSizeOf)]
#[derive(Debug)]
#[cfg_attr(not(target_os = "unknown"), derive(parity_util_mem::MallocSizeOf))]
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
reject_future_transactions: bool,
future: FutureTransactions<Hash, Ex>,
@@ -19,17 +19,17 @@ use std::{
fmt,
hash,
sync::Arc,
time,
};
use sp_core::hexdisplay::HexDisplay;
use sp_runtime::transaction_validity::{
TransactionTag as Tag,
};
use wasm_timer::Instant;
use crate::base_pool::Transaction;
#[derive(parity_util_mem::MallocSizeOf)]
#[cfg_attr(not(target_os = "unknown"), derive(parity_util_mem::MallocSizeOf))]
/// Transaction with partially satisfied dependencies.
pub struct WaitingTransaction<Hash, Ex> {
/// Transaction details.
@@ -37,7 +37,7 @@ pub struct WaitingTransaction<Hash, Ex> {
/// Tags that are required and have not been satisfied yet by other transactions in the pool.
pub missing_tags: HashSet<Tag>,
/// Time of import to the Future Queue.
pub imported_at: time::Instant,
pub imported_at: Instant,
}
impl<Hash: fmt::Debug, Ex: fmt::Debug> fmt::Debug for WaitingTransaction<Hash, Ex> {
@@ -91,7 +91,7 @@ impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
WaitingTransaction {
transaction: Arc::new(transaction),
missing_tags,
imported_at: time::Instant::now(),
imported_at: Instant::now(),
}
}
@@ -110,7 +110,8 @@ impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
///
/// Contains transactions that are still awaiting for some other transactions that
/// could provide a tag that they require.
#[derive(Debug, parity_util_mem::MallocSizeOf)]
#[derive(Debug)]
#[cfg_attr(not(target_os = "unknown"), derive(parity_util_mem::MallocSizeOf))]
pub struct FutureTransactions<Hash: hash::Hash + Eq, Ex> {
/// tags that are not yet provided by any transaction and we await for them
wanted_tags: HashMap<Tag, HashSet<Hash>>,
@@ -34,6 +34,7 @@ use sp_runtime::{
transaction_validity::{TransactionValidity, TransactionTag as Tag, TransactionValidityError},
};
use sp_transaction_pool::{error, PoolStatus};
use wasm_timer::Instant;
use crate::validated_pool::{ValidatedPool, ValidatedTransaction};
@@ -122,6 +123,7 @@ pub struct Pool<B: ChainApi> {
validated_pool: Arc<ValidatedPool<B>>,
}
#[cfg(not(target_os = "unknown"))]
impl<B: ChainApi> parity_util_mem::MallocSizeOf for Pool<B>
where
B::Hash: parity_util_mem::MallocSizeOf,
@@ -189,7 +191,6 @@ impl<B: ChainApi> Pool<B> {
at: &BlockId<B::Block>,
max: Option<usize>,
) -> Result<(), B::Error> {
use std::time::Instant;
log::debug!(target: "txpool",
"Fetching ready transactions (up to: {})",
max.map(|x| format!("{}", x)).unwrap_or_else(|| "all".into())
@@ -317,7 +318,7 @@ impl<B: ChainApi> Pool<B> {
// Make sure that we don't revalidate extrinsics that were part of the recently
// imported block. This is especially important for UTXO-like chains cause the
// inputs are pruned so such transaction would go to future again.
self.validated_pool.ban(&std::time::Instant::now(), known_imported_hashes.clone().into_iter());
self.validated_pool.ban(&Instant::now(), known_imported_hashes.clone().into_iter());
// Try to re-validate pruned transactions since some of them might be still valid.
// note that `known_imported_hashes` will be rejected here due to temporary ban.
@@ -469,10 +470,7 @@ impl<B: ChainApi> Clone for Pool<B> {
#[cfg(test)]
mod tests {
use std::{
collections::{HashMap, HashSet},
time::Instant,
};
use std::collections::{HashMap, HashSet};
use parking_lot::Mutex;
use futures::executor::block_on;
use super::*;
@@ -481,6 +479,7 @@ mod tests {
use codec::Encode;
use substrate_test_runtime::{Block, Extrinsic, Transfer, H256, AccountId};
use assert_matches::assert_matches;
use wasm_timer::Instant;
use crate::base_pool::Limit;
const INVALID_NONCE: u64 = 254;
@@ -23,9 +23,10 @@ use std::{
collections::HashMap,
hash,
iter,
time::{Duration, Instant},
time::Duration,
};
use parking_lot::RwLock;
use wasm_timer::Instant;
use crate::base_pool::Transaction;
@@ -19,7 +19,6 @@ use std::{
fmt,
hash,
sync::Arc,
time,
};
use crate::base_pool as base;
@@ -37,6 +36,7 @@ use sp_runtime::{
transaction_validity::TransactionTag as Tag,
};
use sp_transaction_pool::{error, PoolStatus};
use wasm_timer::Instant;
use crate::base_pool::PruneStatus;
use crate::pool::{EventStream, Options, ChainApi, BlockHash, ExHash, ExtrinsicFor, TransactionFor};
@@ -74,6 +74,7 @@ pub(crate) struct ValidatedPool<B: ChainApi> {
rotator: PoolRotator<ExHash<B>>,
}
#[cfg(not(target_os = "unknown"))]
impl<B: ChainApi> parity_util_mem::MallocSizeOf for ValidatedPool<B>
where
B::Hash: parity_util_mem::MallocSizeOf,
@@ -100,7 +101,7 @@ impl<B: ChainApi> ValidatedPool<B> {
}
/// Bans given set of hashes.
pub fn ban(&self, now: &std::time::Instant, hashes: impl IntoIterator<Item=ExHash<B>>) {
pub fn ban(&self, now: &Instant, hashes: impl IntoIterator<Item=ExHash<B>>) {
self.rotator.ban(now, hashes)
}
@@ -145,7 +146,7 @@ impl<B: ChainApi> ValidatedPool<B> {
Ok(imported.hash().clone())
}
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&std::time::Instant::now(), std::iter::once(hash));
self.rotator.ban(&Instant::now(), std::iter::once(hash));
Err(err.into())
},
ValidatedTransaction::Unknown(hash, err) => {
@@ -177,7 +178,7 @@ impl<B: ChainApi> ValidatedPool<B> {
let removed = pool.enforce_limits(ready_limit, future_limit)
.into_iter().map(|x| x.hash.clone()).collect::<HashSet<_>>();
// ban all removed transactions
self.rotator.ban(&std::time::Instant::now(), removed.iter().map(|x| x.clone()));
self.rotator.ban(&Instant::now(), removed.iter().map(|x| x.clone()));
removed
};
// run notifications
@@ -208,7 +209,7 @@ impl<B: ChainApi> ValidatedPool<B> {
.map(|_| watcher)
},
ValidatedTransaction::Invalid(hash, err) => {
self.rotator.ban(&std::time::Instant::now(), std::iter::once(hash));
self.rotator.ban(&Instant::now(), std::iter::once(hash));
Err(err.into())
},
ValidatedTransaction::Unknown(_, err) => Err(err.into()),
@@ -430,7 +431,7 @@ impl<B: ChainApi> ValidatedPool<B> {
let block_number = self.api.block_id_to_number(at)?
.ok_or_else(|| error::Error::InvalidBlockId(format!("{:?}", at)).into())?
.saturated_into::<u64>();
let now = time::Instant::now();
let now = Instant::now();
let to_remove = {
self.ready()
.filter(|tx| self.rotator.ban_if_stale(&now, block_number, &tx))
@@ -497,7 +498,7 @@ impl<B: ChainApi> ValidatedPool<B> {
debug!(target: "txpool", "Removing invalid transactions: {:?}", hashes);
// temporarily ban invalid transactions
self.rotator.ban(&time::Instant::now(), hashes.iter().cloned());
self.rotator.ban(&Instant::now(), hashes.iter().cloned());
let invalid = self.pool.write().remove_subtree(hashes);
+4 -2
View File
@@ -28,7 +28,7 @@ pub mod testing;
pub use sc_transaction_graph as txpool;
pub use crate::api::{FullChainApi, LightChainApi};
use std::{collections::HashMap, sync::Arc, pin::Pin, time::Instant};
use std::{collections::HashMap, sync::Arc, pin::Pin};
use futures::{Future, FutureExt, future::ready};
use parking_lot::Mutex;
@@ -41,6 +41,7 @@ use sp_transaction_pool::{
TxHash, TransactionFor, TransactionStatusStreamFor, BlockHash,
MaintainedTransactionPool, PoolFuture,
};
use wasm_timer::Instant;
/// Basic implementation of transaction pool that can be customized by providing PoolApi.
pub struct BasicPool<PoolApi, Block>
@@ -53,6 +54,7 @@ pub struct BasicPool<PoolApi, Block>
revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
}
#[cfg(not(target_os = "unknown"))]
impl<PoolApi, Block> parity_util_mem::MallocSizeOf for BasicPool<PoolApi, Block>
where
PoolApi: sc_transaction_graph::ChainApi<Block=Block, Hash=Block::Hash>,
@@ -205,7 +207,7 @@ enum RevalidationStatus<N> {
/// The revalidation has never been completed.
NotScheduled,
/// The revalidation is scheduled.
Scheduled(Option<std::time::Instant>, Option<N>),
Scheduled(Option<Instant>, Option<N>),
/// The revalidation is in progress.
InProgress,
}