Update futures and tokio for browser light client (#673)

* Make availability-store compile for WASM

* Use --manifest-path instead

* Make validation work on wasm!

* Switch to Spawn trait

* Migrate validation to std futures

* Migrate network to std futures

* Final changes to validation

* Tidy up network

* Tidy up validation

* Switch branch

* Migrate service

* Get polkadot to compile via wasm!

* Add browser-demo

* Add initial browser file

* Add browser-demo

* Tidy

* Temp switch back to substrate/master

* tidy

* Fix wasm build

* Re-add release flag

* Switch to polkadot-master

* Revert cli tokio version to avoid libp2p panic

* Update tokio version

* Fix availability store tests

* Fix validation tests

* Remove futures01 from availability-store

* Fix network tests

* Small changes

* Fix collator

* Fix typo

* Revert removal of tokio_executor that causes tokio version mismatch panic

* Fix adder test parachain

* Revert "Revert removal of tokio_executor that causes tokio version mismatch panic"

This reverts commit cfeb50c01d8df5e209483406a711e64761b44ae9.

* Update availability-store/src/worker.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Update network/src/lib.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Update network/src/lib.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Box pin changes

* Asyncify network functions

* Clean up browser validation worker error

* Fix av store test

* Nits

* Fix validation test

* Switch favicon

* Fix validation test again

* Revert "Asyncify network functions"

This reverts commit f20ae6548dc482cb1e75bc80641cfe55c6131a53.

* Add async blocks back in
This commit is contained in:
Ashley
2019-12-10 11:58:22 +01:00
committed by GitHub
parent df3ea965e7
commit 25aa988f9b
34 changed files with 826 additions and 444 deletions
+35 -34
View File
@@ -519,6 +519,24 @@ dependencies = [
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "console_error_panic_hook"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"wasm-bindgen 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "console_log"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"web-sys 0.3.32 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "const-random"
version = "0.1.6"
@@ -960,15 +978,6 @@ dependencies = [
"libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "exit-future"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "exit-future"
version = "0.2.0"
@@ -1386,14 +1395,6 @@ dependencies = [
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "futures01"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "fxhash"
version = "0.2.1"
@@ -3509,7 +3510,6 @@ dependencies = [
"derive_more 0.99.2 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures01 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"kvdb 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"kvdb-memorydb 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"kvdb-rocksdb 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3528,27 +3528,35 @@ dependencies = [
"sp-consensus 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"sp-core 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"sp-runtime 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "polkadot-cli"
version = "0.7.9"
dependencies = [
"console_error_panic_hook 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"console_log 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"js-sys 0.3.32 (registry+https://github.com/rust-lang/crates.io-index)",
"kvdb-memorydb 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libp2p 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"polkadot-service 0.7.9",
"sc-cli 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"sc-network 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"sc-service 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"structopt 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"wasm-bindgen 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)",
"wasm-bindgen-futures 0.3.27 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "polkadot-collator"
version = "0.7.9"
dependencies = [
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-timer 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3564,7 +3572,7 @@ dependencies = [
"sp-consensus 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"sp-core 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"sp-keyring 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -3593,7 +3601,6 @@ version = "0.7.9"
dependencies = [
"arrayvec 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-scale-codec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3734,7 +3741,6 @@ dependencies = [
name = "polkadot-service"
version = "0.7.9"
dependencies = [
"exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hex-literal 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3789,11 +3795,9 @@ dependencies = [
name = "polkadot-validation"
version = "0.7.9"
dependencies = [
"async-std 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bitvec 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -3821,8 +3825,7 @@ dependencies = [
"sp-timestamp 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"sp-transaction-pool-api 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"sp-trie 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -4434,7 +4437,7 @@ dependencies = [
"sp-state-machine 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"structopt 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -5999,7 +6002,7 @@ dependencies = [
[[package]]
name = "tokio"
version = "0.2.2"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -6073,8 +6076,6 @@ version = "0.2.0-alpha.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-sync 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -6952,6 +6953,8 @@ dependencies = [
"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"
"checksum clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "97276801e127ffb46b66ce23f35cc96bd454fa311294bced4bbace7baa8b1d17"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum console_error_panic_hook 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d976903543e0c48546a91908f21588a680a8c8f984df9a5d69feccb2b2a211"
"checksum console_log 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1e7871d2947441b0fdd8e2bd1ce2a2f75304f896582c0d572162d48290683c48"
"checksum const-random 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7b641a8c9867e341f3295564203b1c250eb8ce6cb6126e007941f78c4d2ed7fe"
"checksum const-random-macro 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c750ec12b83377637110d5a57f5ae08e895b06c4b16e2bdbf1a94ef717428c59"
"checksum constant_time_eq 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "995a44c877f9212528ccc74b21a232f66ad69001e40ede5bcee2ac9ef2657120"
@@ -7001,7 +7004,6 @@ dependencies = [
"checksum erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3beee4bc16478a1b26f2e80ad819a52d24745e292f521a63c16eea5f74b7eb60"
"checksum errno 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2a071601ed01b988f896ab14b95e67335d1eeb50190932a1320f7fe3cadc84e"
"checksum errno-dragonfly 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "14ca354e36190500e1e1fb267c647932382b54053c50b14970856c0b00a35067"
"checksum exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d8013f441e38e31c670e7f34ec8f1d5d3a2bd9d303c1ff83976ca886005e8f48"
"checksum exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e43f2f1833d64e33f15592464d6fdd70f349dda7b1a53088eb83cd94014008c5"
"checksum faerie 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f902f2af041f6c7177a2a04f805687cdc71e69c7cbef059a2755d8923f4cd7a8"
"checksum failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f8273f13c977665c5db7eb2b99ae520952fe5ac831ae4cd09d80c4c7042b5ed9"
@@ -7046,7 +7048,6 @@ dependencies = [
"checksum futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6"
"checksum futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d66274fb76985d3c62c886d1da7ac4c0903a8c9f754e8fe0f35a6a6cc39e76"
"checksum futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)" = "5ce968633c17e5f97936bd2797b6e38fb56cf16a7422319f7ec2e30d3c470e8d"
"checksum futures01 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "7ef8cbbf52909170053540c6c05a62433ddb60662dabee714e2a882caa864f22"
"checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
"checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
"checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
@@ -7418,7 +7419,7 @@ dependencies = [
"checksum tiny-keccak 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d8a021c69bb74a44ccedb824a046447e2c84a01df9e5c20779750acb38e11b2"
"checksum tiny-keccak 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2953ca5148619bc99695c1274cb54c5275bbb913c6adad87e72eaf8db9787f69"
"checksum tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6"
"checksum tokio 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2e765bf9f550bd9b8a970633ca3b56b8120c4b6c5dcbe26a93744cb02fee4b17"
"checksum tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bcced6bb623d4bff3739c176c415f13c418f426395c169c9c3cd9a492c715b16"
"checksum tokio-buf 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fb220f46c53859a4b7ec083e41dec9778ff0b1851c0942b211edb89e0ccdc46"
"checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f"
"checksum tokio-codec 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)" = "9f5d22fd1e84bd4045d28813491cb7d7caae34d45c80517c2213f09a85e8787a"
+2 -3
View File
@@ -12,9 +12,8 @@ polkadot-runtime = { path = "../runtime" }
parking_lot = "0.9.0"
derive_more = "0.99"
log = "0.4.8"
futures01 = "0.1.17"
futures = { package = "futures", version = "0.3.1", features = ["compat"] }
tokio = "0.1.7"
futures = "0.3.1"
tokio = { version = "0.2.4", features = ["rt-core"] }
exit-future = "0.2.0"
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
+5 -7
View File
@@ -23,7 +23,7 @@
#![warn(missing_docs)]
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
use futures::{channel::{mpsc, oneshot}, task::Spawn};
use keystore::KeyStorePtr;
use polkadot_primitives::{
Hash, Block,
@@ -38,7 +38,7 @@ use client::{
BlockchainEvents, BlockBody,
};
use sp_api::ApiExt;
use std::pin::Pin;
use log::warn;
use std::sync::Arc;
@@ -58,10 +58,7 @@ use worker::{
use store::{Store as InnerStore};
/// Abstraction over an executor that lets you spawn tasks in the background.
pub(crate) type TaskExecutor =
Arc<dyn futures01::future::Executor<
Box<dyn futures01::Future<Item = (), Error = ()> + Send>
> + Send + Sync>;
pub(crate) type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
const LOG_TARGET: &str = "availability";
@@ -110,7 +107,7 @@ pub trait ProvideGossipMessages {
fn gossip_messages_for(
&self,
topic: Hash,
) -> Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send + Unpin>;
) -> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>;
/// Gossip an erasure chunk message.
fn gossip_erasure_chunk(
@@ -155,6 +152,7 @@ impl Store {
///
/// Creating a store among other things starts a background worker thread which
/// handles most of the write operations to the storage.
#[cfg(not(target_os = "unknown"))]
pub fn new<PGM>(config: Config, gossip: PGM) -> io::Result<Self>
where PGM: ProvideGossipMessages + Send + Sync + Clone + 'static
{
+2
View File
@@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
#[cfg(not(target_os = "unknown"))]
use kvdb_rocksdb::{Database, DatabaseConfig};
use kvdb::{KeyValueDB, DBTransaction};
use codec::{Encode, Decode};
@@ -82,6 +83,7 @@ fn erasure_roots_in_relay_chain_block_key(relay_block: &Hash) -> Vec<u8> {
impl Store {
/// Create a new `Store` with given condig on disk.
#[cfg(not(target_os = "unknown"))]
pub(super) fn new(config: Config) -> io::Result<Self> {
let mut db_config = DatabaseConfig::with_columns(Some(columns::NUM_COLUMNS));
+22 -32
View File
@@ -37,11 +37,10 @@ use polkadot_primitives::parachain::{
CandidateReceipt, ParachainHost, ValidatorId,
ValidatorPair, AvailableMessages, BlockData, ErasureChunk,
};
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, Sink, SinkExt, TryFutureExt, StreamExt, future::select};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::SpawnExt};
use keystore::KeyStorePtr;
use tokio::runtime::current_thread::{Handle, Runtime as LocalRuntime};
use tokio::runtime::{Handle, Runtime as LocalRuntime};
use crate::{LOG_TARGET, Data, TaskExecutor, ProvideGossipMessages, erasure_coding_topic};
use crate::store::Store;
@@ -308,7 +307,7 @@ where
// Called on startup of the worker to register listeners for all awaited chunks.
fn register_listeners(
&mut self,
runtime_handle: &mut Handle,
runtime_handle: &Handle,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
) {
if let Some(awaited_chunks) = self.availability_store.awaited_chunks() {
@@ -327,7 +326,7 @@ where
fn register_chunks_listener(
&mut self,
runtime_handle: &mut Handle,
runtime_handle: &Handle,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
relay_parent: Hash,
erasure_root: Hash,
@@ -354,18 +353,14 @@ where
self.registered_gossip_streams.insert(topic, signal);
let _ = runtime_handle.spawn(
select(fut.boxed(), exit)
.map(|_| Ok(()))
.compat()
);
let _ = runtime_handle.spawn(select(fut.boxed(), exit).map(drop));
Ok(())
}
fn on_parachain_blocks_received(
&mut self,
runtime_handle: &mut Handle,
runtime_handle: &Handle,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
relay_parent: Hash,
blocks: Vec<(CandidateReceipt, Option<(BlockData, AvailableMessages)>)>,
@@ -450,7 +445,7 @@ where
// we don't have that piece, and then it registers a listener.
fn on_listen_for_chunks_received(
&mut self,
runtime_handle: &mut Handle,
runtime_handle: &Handle,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
relay_parent: Hash,
candidate_hash: Hash,
@@ -496,11 +491,11 @@ where
let mut runtime = LocalRuntime::new()?;
let mut sender = worker.sender.clone();
let mut runtime_handle = runtime.handle();
let runtime_handle = runtime.handle().clone();
// On startup, registers listeners (gossip streams) for all
// (relay_parent, erasure-root, i) in the awaited frontier.
worker.register_listeners(&mut runtime_handle, &mut sender);
worker.register_listeners(runtime.handle(), &mut sender);
let process_notification = async move {
while let Some(msg) = receiver.next().await {
@@ -525,7 +520,7 @@ where
} = msg;
let res = worker.on_listen_for_chunks_received(
&mut runtime_handle,
&runtime_handle,
&mut sender,
relay_parent,
candidate_hash,
@@ -545,7 +540,7 @@ where
} = msg;
let res = worker.on_parachain_blocks_received(
&mut runtime_handle,
&runtime_handle,
&mut sender,
relay_parent,
blocks,
@@ -589,15 +584,9 @@ where
};
runtime.spawn(
futures::future::select(process_notification.boxed(), exit.clone())
.map(|_| Ok(()))
.compat()
);
runtime.spawn(select(process_notification.boxed(), exit.clone()).map(drop));
if let Err(e) = runtime.block_on(exit.unit_error().compat()) {
warn!(target: LOG_TARGET, "Availability worker error {:?}", e);
}
runtime.block_on(exit);
info!(target: LOG_TARGET, "Availability worker exiting");
@@ -771,9 +760,9 @@ impl<I, P> AvailabilityBlockImport<I, P> {
let prune_available = select(
prune_unneeded_availability(client.clone(), to_worker.clone()).boxed(),
exit.clone()
).map(|_| Ok(())).compat();
).map(drop);
if let Err(_) = thread_pool.execute(Box::new(prune_available)) {
if let Err(_) = thread_pool.spawn(Box::new(prune_available)) {
error!(target: LOG_TARGET, "Failed to spawn availability pruning task");
exit_signal = None;
}
@@ -806,6 +795,7 @@ mod tests {
use std::time::Duration;
use futures::{stream, channel::mpsc, Stream};
use std::sync::{Arc, Mutex};
use std::pin::Pin;
use tokio::runtime::Runtime;
// Just contains topic->channel mapping to give to outer code on `gossip_messages_for` calls.
@@ -815,11 +805,11 @@ mod tests {
impl ProvideGossipMessages for TestGossipMessages {
fn gossip_messages_for(&self, topic: Hash)
-> Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send + Unpin>
-> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>
{
match self.messages.lock().unwrap().remove(&topic) {
Some(receiver) => Box::new(receiver),
None => Box::new(stream::iter(vec![])),
Some(receiver) => receiver.boxed(),
None => stream::iter(vec![]).boxed(),
}
}
@@ -890,7 +880,7 @@ mod tests {
// chunk topics.
handle.sender.unbounded_send(msg).unwrap();
runtime.block_on(r.unit_error().boxed().compat()).unwrap().unwrap().unwrap();
runtime.block_on(r).unwrap().unwrap();
// Make sure that at this point we are waiting for the appropriate chunk.
assert_eq!(
@@ -992,7 +982,7 @@ mod tests {
handle.sender.unbounded_send(listen_msg_2).unwrap();
runtime.block_on(r2.unit_error().boxed().compat()).unwrap().unwrap().unwrap();
runtime.block_on(r2).unwrap().unwrap();
// The gossip sender for this topic left intact => listener not registered.
assert!(messages.messages.lock().unwrap().contains_key(&topic_2));
@@ -1008,7 +998,7 @@ mod tests {
});
handle.sender.unbounded_send(listen_msg_1).unwrap();
runtime.block_on(r1.unit_error().boxed().compat()).unwrap().unwrap().unwrap();
runtime.block_on(r1).unwrap().unwrap();
// The gossip sender taken => listener registered.
assert!(!messages.messages.lock().unwrap().contains_key(&topic_1));
+26 -1
View File
@@ -5,6 +5,9 @@ authors = ["Parity Technologies <admin@parity.io>"]
description = "Polkadot node implementation in Rust."
edition = "2018"
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies]
log = "0.4.8"
tokio = "0.1.22"
@@ -14,6 +17,28 @@ structopt = "0.3.4"
cli = { package = "sc-cli", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
service = { package = "polkadot-service", path = "../service" }
libp2p = { version = "0.13.0", default-features = false, optional = true }
wasm-bindgen = { version = "0.2.45", optional = true }
wasm-bindgen-futures = { version = "0.3.22", optional = true }
console_log = { version = "0.1.2", optional = true }
console_error_panic_hook = { version = "0.1.1", optional = true }
js-sys = { version = "0.3.22", optional = true }
kvdb-memorydb = { version = "0.1.1", optional = true }
substrate-service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true, default-features = false }
substrate-network = { package = "sc-network", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true }
[features]
default = [ "wasmtime" ]
default = [ "wasmtime", "rocksdb" ]
wasmtime = [ "cli/wasmtime" ]
rocksdb = [ "service/rocksdb" ]
browser = [
"libp2p",
"wasm-bindgen",
"console_error_panic_hook",
"wasm-bindgen-futures",
"console_log",
"js-sys",
"kvdb-memorydb",
"substrate-service",
"substrate-network"
]
+1
View File
@@ -0,0 +1 @@
pkg
+9
View File
@@ -0,0 +1,9 @@
# How to run this demo
```sh
cargo install wasm-pack # If necessary
wasm-pack build --target web --out-dir ./browser-demo/pkg --no-typescript --release ./.. -- --no-default-features --features "browser"
xdg-open index.html
```
+3
View File
@@ -0,0 +1,3 @@
#!/usr/bin/env sh
wasm-pack build --target web --out-dir ./browser-demo/pkg --no-typescript --release ./.. -- --no-default-features --features "browser"
python -m http.server 8000
Binary file not shown.

After

Width:  |  Height:  |  Size: 4.7 KiB

+39
View File
@@ -0,0 +1,39 @@
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8"/>
<title>Polkadot node</title>
<link rel="shortcut icon" href="/favicon.png" />
<script type="module">
import { start_client, default as init } from './pkg/polkadot_cli.js';
import ws from './ws.js';
function log(msg) {
document.getElementsByTagName('body')[0].innerHTML += msg + '\n';
}
async function start() {
log('Loading WASM');
await init('./pkg/polkadot_cli_bg.wasm');
log('Successfully loaded WASM');
// Build our client.
log('Starting client');
let client = start_client(ws());
log('Client started');
client.rpcSubscribe('{"method":"chain_subscribeNewHead","params":[],"id":1,"jsonrpc":"2.0"}',
(r) => log("New chain head: " + r));
setInterval(() => {
client
.rpcSend('{"method":"system_networkState","params":[],"id":1,"jsonrpc":"2.0"}')
.then((r) => log("Network state: " + r));
}, 1000);
}
start();
</script>
</head>
<body style="white-space: pre"></body>
</html>
+148
View File
@@ -0,0 +1,148 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
export default () => {
return {
dial: dial,
listen_on: (addr) => {
let err = new Error("Listening on WebSockets is not possible from within a browser");
err.name = "NotSupportedError";
throw err;
},
};
}
/// Turns a string multiaddress into a WebSockets string URL.
// TODO: support dns addresses as well
const multiaddr_to_ws = (addr) => {
let parsed = addr.match(/^\/(ip4|ip6|dns4|dns6)\/(.*?)\/tcp\/(.*?)\/(ws|wss|x-parity-ws\/(.*)|x-parity-wss\/(.*))$/);
let proto = 'wss';
if (parsed[4] == 'ws' || parsed[4] == 'x-parity-ws') {
proto = 'ws';
}
let url = decodeURIComponent(parsed[5] || parsed[6] || '');
if (parsed != null) {
if (parsed[1] == 'ip6') {
return proto + "://[" + parsed[2] + "]:" + parsed[3] + url;
} else {
return proto + "://" + parsed[2] + ":" + parsed[3] + url;
}
}
let err = new Error("Address not supported: " + addr);
err.name = "NotSupportedError";
throw err;
}
// Attempt to dial a multiaddress.
const dial = (addr) => {
let ws = new WebSocket(multiaddr_to_ws(addr));
let reader = read_queue();
return new Promise((resolve, reject) => {
// TODO: handle ws.onerror properly after dialing has happened
ws.onerror = (ev) => reject(ev);
ws.onmessage = (ev) => reader.inject_blob(ev.data);
ws.onclose = () => reader.inject_eof();
ws.onopen = () => resolve({
read: (function*() { while(ws.readyState == 1) { yield reader.next(); } })(),
write: (data) => {
if (ws.readyState == 1) {
ws.send(data);
return promise_when_ws_finished(ws);
} else {
return Promise.reject("WebSocket is closed");
}
},
shutdown: () => {},
close: () => ws.close()
});
});
}
// Takes a WebSocket object and returns a Promise that resolves when bufferedAmount is 0.
const promise_when_ws_finished = (ws) => {
if (ws.bufferedAmount == 0) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
setTimeout(function check() {
if (ws.bufferedAmount == 0) {
resolve();
} else {
setTimeout(check, 100);
}
}, 2);
})
}
// Creates a queue reading system.
const read_queue = () => {
// State of the queue.
let state = {
// Array of promises resolving to `ArrayBuffer`s, that haven't been transmitted back with
// `next` yet.
queue: new Array(),
// If `resolve` isn't null, it is a "resolve" function of a promise that has already been
// returned by `next`. It should be called with some data.
resolve: null,
};
return {
// Inserts a new Blob in the queue.
inject_blob: (blob) => {
if (state.resolve != null) {
var resolve = state.resolve;
state.resolve = null;
var reader = new FileReader();
reader.addEventListener("loadend", () => resolve(reader.result));
reader.readAsArrayBuffer(blob);
} else {
state.queue.push(new Promise((resolve, reject) => {
var reader = new FileReader();
reader.addEventListener("loadend", () => resolve(reader.result));
reader.readAsArrayBuffer(blob);
}));
}
},
// Inserts an EOF message in the queue.
inject_eof: () => {
if (state.resolve != null) {
var resolve = state.resolve;
state.resolve = null;
resolve(null);
} else {
state.queue.push(Promise.resolve(null));
}
},
// Returns a Promise that yields the next entry as an ArrayBuffer.
next: () => {
if (state.queue.length != 0) {
return state.queue.shift(0);
} else {
if (state.resolve !== null)
throw "Internal error: already have a pending promise";
return new Promise((resolve, reject) => {
state.resolve = resolve;
});
}
}
};
};
+169
View File
@@ -0,0 +1,169 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::ChainSpec;
use futures01::{prelude::*, sync::oneshot, sync::mpsc};
use libp2p::wasm_ext;
use log::{debug, info};
use std::sync::Arc;
use service::{AbstractService, Roles as ServiceRoles};
use substrate_service::{RpcSession, Configuration, config::DatabaseConfig};
use wasm_bindgen::prelude::*;
/// Starts the client.
///
/// You must pass a libp2p transport that supports .
#[wasm_bindgen]
pub fn start_client(wasm_ext: wasm_ext::ffi::Transport) -> Result<Client, JsValue> {
start_inner(wasm_ext)
.map_err(|err| JsValue::from_str(&err.to_string()))
}
fn start_inner(wasm_ext: wasm_ext::ffi::Transport) -> Result<Client, Box<dyn std::error::Error>> {
console_error_panic_hook::set_once();
console_log::init_with_level(log::Level::Info);
// Build the configuration to pass to the service.
let config = {
let wasm_ext = wasm_ext::ExtTransport::new(wasm_ext);
let chain_spec = ChainSpec::Kusama.load().map_err(|e| format!("{:?}", e))?;
let mut config = Configuration::<service::CustomConfiguration, _, _>::default_with_spec_and_base_path(chain_spec, None);
config.network.transport = substrate_network::config::TransportConfig::Normal {
wasm_external_transport: Some(wasm_ext.clone()),
allow_private_ipv4: true,
enable_mdns: false,
};
config.telemetry_external_transport = Some(wasm_ext);
config.roles = ServiceRoles::LIGHT;
config.name = "Browser node".to_string();
config.database = {
let db = Arc::new(kvdb_memorydb::create(10));
DatabaseConfig::Custom(db)
};
config.keystore_path = Some(std::path::PathBuf::from("/"));
config
};
info!("Polkadot browser node");
info!(" version {}", config.full_version());
info!(" by Parity Technologies, 2017-2019");
info!("Chain specification: {}", config.chain_spec.name());
if config.chain_spec.name().starts_with("Kusama") {
info!("----------------------------");
info!("This chain is not in any way");
info!(" endorsed by the ");
info!(" KUSAMA FOUNDATION ");
info!("----------------------------");
}
info!("Node name: {}", config.name);
info!("Roles: {:?}", config.roles);
// Create the service. This is the most heavy initialization step.
let mut service = service::new_light(config).map_err(|e| format!("{:?}", e))?;
// We now dispatch a background task responsible for processing the service.
//
// The main action performed by the code below consists in polling the service with
// `service.poll()`.
// The rest consists in handling RPC requests.
let (rpc_send_tx, mut rpc_send_rx) = mpsc::unbounded::<RpcMessage>();
wasm_bindgen_futures::spawn_local(futures01::future::poll_fn(move || {
loop {
match rpc_send_rx.poll() {
Ok(Async::Ready(Some(message))) => {
let fut = service.rpc_query(&message.session, &message.rpc_json);
let _ = message.send_back.send(Box::new(fut));
},
Ok(Async::NotReady) => break,
Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
}
}
loop {
match service.poll().map_err(|_| ())? {
Async::Ready(()) => return Ok(Async::Ready(())),
Async::NotReady => break
}
}
Ok(Async::NotReady)
}));
Ok(Client {
rpc_send_tx,
})
}
/// A running client.
#[wasm_bindgen]
pub struct Client {
rpc_send_tx: mpsc::UnboundedSender<RpcMessage>,
}
struct RpcMessage {
rpc_json: String,
session: RpcSession,
send_back: oneshot::Sender<Box<dyn Future<Item = Option<String>, Error = ()>>>,
}
#[wasm_bindgen]
impl Client {
/// Allows starting an RPC request. Returns a `Promise` containing the result of that request.
#[wasm_bindgen(js_name = "rpcSend")]
pub fn rpc_send(&mut self, rpc: &str) -> js_sys::Promise {
let rpc_session = RpcSession::new(mpsc::channel(1).0);
let (tx, rx) = oneshot::channel();
let _ = self.rpc_send_tx.unbounded_send(RpcMessage {
rpc_json: rpc.to_owned(),
session: rpc_session,
send_back: tx,
});
let fut = rx
.map_err(|_| ())
.and_then(|fut| fut)
.map(|s| JsValue::from_str(&s.unwrap_or(String::new())))
.map_err(|_| JsValue::NULL);
wasm_bindgen_futures::future_to_promise(fut)
}
/// Subscribes to an RPC pubsub endpoint.
#[wasm_bindgen(js_name = "rpcSubscribe")]
pub fn rpc_subscribe(&mut self, rpc: &str, callback: js_sys::Function) {
let (tx, rx) = mpsc::channel(4);
let rpc_session = RpcSession::new(tx);
let (fut_tx, fut_rx) = oneshot::channel();
let _ = self.rpc_send_tx.unbounded_send(RpcMessage {
rpc_json: rpc.to_owned(),
session: rpc_session.clone(),
send_back: fut_tx,
});
let fut_rx = fut_rx
.map_err(|_| ())
.and_then(|fut| fut);
wasm_bindgen_futures::spawn_local(fut_rx.then(|_| Ok(())));
wasm_bindgen_futures::spawn_local(rx.for_each(move |s| {
match callback.call1(&callback, &JsValue::from_str(&s)) {
Ok(_) => Ok(()),
Err(_) => Err(()),
}
}).then(move |v| {
// We need to keep `rpc_session` alive.
debug!("RPC subscription has ended");
drop(rpc_session);
v
}));
}
}
+18 -11
View File
@@ -20,26 +20,27 @@
#![warn(unused_extern_crates)]
mod chain_spec;
#[cfg(feature = "browser")]
mod browser;
use chain_spec::ChainSpec;
use futures::{Future, FutureExt, TryFutureExt, future::select, channel::oneshot, compat::Future01CompatExt};
use futures::{
Future, FutureExt, TryFutureExt, future::select, channel::oneshot, compat::Future01CompatExt,
task::Spawn
};
use tokio::runtime::Runtime;
use std::sync::Arc;
use log::{info, error};
use structopt::StructOpt;
pub use service::{
AbstractService, CustomConfiguration,
ProvideRuntimeApi, CoreApi, ParachainHost,
WrappedExecutor
};
pub use cli::{VersionInfo, IntoExit, NoCustom};
pub use cli::{display_role, error};
type BoxedFuture = Box<dyn futures01::Future<Item = (), Error = ()> + Send>;
/// Abstraction over an executor that lets you spawn tasks in the background.
pub type TaskExecutor = Arc<dyn futures01::future::Executor<BoxedFuture> + Send + Sync>;
fn load_spec(id: &str) -> Result<Option<service::ChainSpec>, String> {
Ok(match ChainSpec::from(id) {
Some(spec) => Some(spec.load()?),
@@ -62,13 +63,14 @@ pub trait Worker: IntoExit {
fn configuration(&self) -> service::CustomConfiguration { Default::default() }
/// Do work and schedule exit.
fn work<S, SC, B, CE>(self, service: &S, executor: TaskExecutor) -> Self::Work
fn work<S, SC, B, CE, SP>(self, service: &S, spawner: SP) -> Self::Work
where S: AbstractService<Block = service::Block, RuntimeApi = service::RuntimeApi,
Backend = B, SelectChain = SC,
NetworkSpecialization = service::PolkadotProtocol, CallExecutor = CE>,
SC: service::SelectChain<service::Block> + 'static,
B: service::Backend<service::Block, service::Blake2Hasher> + 'static,
CE: service::CallExecutor<service::Block, service::Blake2Hasher> + Clone + Send + Sync + 'static;
CE: service::CallExecutor<service::Block, service::Blake2Hasher> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static;
}
#[derive(Debug, StructOpt, Clone)]
@@ -147,8 +149,13 @@ pub fn run<W>(worker: W, version: cli::VersionInfo) -> error::Result<()> where
cli::ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<(), _, _, _, _, _>(|config|
Ok(service::new_chain_ops(config)?), load_spec),
cli::ParseAndPrepare::CustomCommand(PolkadotSubCommands::ValidationWorker(args)) => {
service::run_validation_worker(&args.mem_id)?;
Ok(())
if cfg!(feature = "browser") {
Err(error::Error::Input("Cannot run validation worker in browser".into()))
} else {
#[cfg(not(feature = "browser"))]
service::run_validation_worker(&args.mem_id)?;
Ok(())
}
}
}
}
@@ -180,7 +187,7 @@ fn run_until_exit<T, SC, B, CE, W>(
// but we need to keep holding a reference to the global telemetry guard
let _telemetry = service.telemetry();
let work = worker.work(&service, Arc::new(executor));
let work = worker.work(&service, WrappedExecutor(executor));
let service = service
.map_err(|err| error!("Error while running Service: {}", err))
.compat();
+2 -3
View File
@@ -6,8 +6,7 @@ description = "Collator node implementation"
edition = "2018"
[dependencies]
futures01 = { package = "futures", version = "0.1.17" }
futures = { version = "0.3.1", features = ["compat"] }
futures = "0.3.1"
client = { package = "sc-client", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
client-api = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
@@ -20,7 +19,7 @@ polkadot-network = { path = "../network" }
polkadot-validation = { path = "../validation" }
polkadot-service = { path = "../service" }
log = "0.4.8"
tokio = "0.1.22"
tokio = "0.2.1"
futures-timer = "1.0"
[dev-dependencies]
+25 -34
View File
@@ -49,11 +49,7 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use futures::{
future, Future, Stream, FutureExt, TryFutureExt, StreamExt,
compat::{Future01CompatExt, Stream01CompatExt}
};
use futures01::{Future as _};
use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
use log::{warn, error};
use client::BlockchainEvents;
use primitives::{Pair, Blake2Hasher};
@@ -71,7 +67,7 @@ use polkadot_network::validation::{LeafWorkParams, ValidationNetwork};
use polkadot_network::{PolkadotNetworkService, PolkadotProtocol};
use polkadot_runtime::RuntimeApi;
pub use polkadot_cli::{VersionInfo, TaskExecutor};
pub use polkadot_cli::VersionInfo;
pub use polkadot_network::validation::Incoming;
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
@@ -83,7 +79,7 @@ const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
pub trait Network: Send + Sync {
/// Convert the given `CollatorId` to a `PeerId`.
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Output=Option<PeerId>> + Unpin + Send>;
Box<dyn Future<Output=Option<PeerId>> + Send>;
/// Create a `Stream` of checked statements for the given `relay_parent`.
///
@@ -93,26 +89,19 @@ pub trait Network: Send + Sync {
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>>;
}
impl<P, E> Network for ValidationNetwork<P, E, PolkadotNetworkService, TaskExecutor> where
impl<P, E, SP> Network for ValidationNetwork<P, E, PolkadotNetworkService, SP> where
P: 'static + Send + Sync,
E: 'static + Send + Sync,
SP: 'static + Spawn + Clone + Send + Sync,
{
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Output=Option<PeerId>> + Unpin + Send>
Box<dyn Future<Output=Option<PeerId>> + Send>
{
Box::new(
Self::collator_id_to_peer_id(self, collator_id)
.compat()
.map(|res| res.ok().and_then(|id| id))
)
Box::new(Self::collator_id_to_peer_id(self, collator_id))
}
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>> {
Box::new(
Self::checked_statements(self, relay_parent)
.compat()
.filter_map(|item| future::ready(item.ok()))
)
Box::new(Self::checked_statements(self, relay_parent))
}
}
@@ -147,15 +136,16 @@ pub trait BuildParachainContext {
type ParachainContext: self::ParachainContext;
/// Build the `ParachainContext`.
fn build<B, E>(
fn build<B, E, SP>(
self,
client: Arc<PolkadotClient<B, E>>,
task_executor: TaskExecutor,
spawner: SP,
network: Arc<dyn Network>,
) -> Result<Self::ParachainContext, ()>
where
B: client_api::backend::Backend<Block, Blake2Hasher> + 'static,
E: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static;
E: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static;
}
/// Parachain context needed for collation.
@@ -239,16 +229,17 @@ pub async fn collate<R, P>(
}
/// Polkadot-api context.
struct ApiContext<P, E> {
network: Arc<ValidationNetwork<P, E, PolkadotNetworkService, TaskExecutor>>,
struct ApiContext<P, E, SP> {
network: Arc<ValidationNetwork<P, E, PolkadotNetworkService, SP>>,
parent_hash: Hash,
validators: Vec<ValidatorId>,
}
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E, SP> where
P: ProvideRuntimeApi + Send + Sync,
P::Api: ParachainHost<Block>,
E: futures::Future<Output=()> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync
{
type Error = String;
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>;
@@ -262,7 +253,6 @@ impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
parent_hash: self.parent_hash,
authorities: self.validators.clone(),
})
.compat()
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));
Box::new(future::ok(ConsolidatedIngress(Vec::new())))
@@ -302,7 +292,7 @@ impl<P, E> Worker for CollationNode<P, E> where
config
}
fn work<S, SC, B, CE>(self, service: &S, task_executor: TaskExecutor) -> Self::Work
fn work<S, SC, B, CE, SP>(self, service: &S, spawner: SP) -> Self::Work
where
S: AbstractService<
Block = Block,
@@ -314,7 +304,8 @@ impl<P, E> Worker for CollationNode<P, E> where
>,
SC: polkadot_service::SelectChain<Block> + 'static,
B: client_api::backend::Backend<Block, Blake2Hasher> + 'static,
CE: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static
CE: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static,
{
let CollationNode { build_parachain_context, exit, para_id, key } = self;
let client = service.client();
@@ -356,12 +347,12 @@ impl<P, E> Worker for CollationNode<P, E> where
exit.clone(),
message_validator,
client.clone(),
task_executor.clone(),
spawner.clone(),
));
let parachain_context = match build_parachain_context.build(
client.clone(),
task_executor,
spawner,
validation_network.clone(),
) {
Ok(ctx) => ctx,
@@ -433,13 +424,13 @@ impl<P, E> Worker for CollationNode<P, E> where
outgoing,
);
let exit = inner_exit_2.clone().unit_error().compat();
tokio::spawn(res.select(exit).then(|_| Ok(())));
let exit = inner_exit_2.clone();
tokio::spawn(future::select(res, exit).map(drop));
})
});
future::Either::Right(collation_work)
}).map(|_| Ok::<_, ()>(()));
});
let deadlined = future::select(
work,
@@ -456,7 +447,7 @@ impl<P, E> Worker for CollationNode<P, E> where
let future = future::select(
silenced,
inner_exit.clone()
).map(|_| Ok::<_, ()>(())).compat();
).map(drop);
tokio::spawn(future);
future::ready(())
+1 -2
View File
@@ -16,8 +16,7 @@ codec = { package = "parity-scale-codec", version = "1.1.0", default-features =
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
futures = "0.1"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
futures = "0.3.1"
log = "0.4.8"
exit-future = "0.2.0"
sc-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
+5 -5
View File
@@ -20,7 +20,7 @@ use codec::{Encode, Decode};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
use sc_network::PeerId;
use futures::sync::oneshot;
use futures::channel::oneshot;
use std::collections::hash_map::{HashMap, Entry};
use std::time::{Duration, Instant};
@@ -230,7 +230,7 @@ mod tests {
use polkadot_primitives::parachain::{
CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
};
use futures::Future;
use futures::executor::block_on;
fn make_pov(block_data: Vec<u8>) -> PoVBlock {
PoVBlock {
@@ -293,8 +293,8 @@ mod tests {
pov: make_pov(vec![4, 5, 6]),
});
rx1.wait().unwrap();
rx2.wait().unwrap();
block_on(rx1).unwrap();
block_on(rx2).unwrap();
assert_eq!(pool.collators.get(&primary).map(|ids| &ids.1).unwrap(), &peer_id);
}
@@ -324,7 +324,7 @@ mod tests {
let (tx, rx) = oneshot::channel();
pool.await_collation(relay_parent, para_id, tx);
rx.wait().unwrap();
block_on(rx).unwrap();
}
#[test]
+24 -29
View File
@@ -26,10 +26,9 @@ pub mod validation;
pub mod gossip;
use codec::{Decode, Encode};
use futures::sync::oneshot;
use futures::future::Either;
use futures::channel::{oneshot, mpsc};
use futures::prelude::*;
use futures03::{channel::mpsc, compat::{Compat, Stream01CompatExt}, FutureExt, StreamExt, TryFutureExt};
use futures::future::Either;
use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{
Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock,
@@ -48,6 +47,8 @@ use self::local_collations::LocalCollations;
use log::{trace, debug, warn};
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::task::{Context as PollContext, Poll};
use crate::gossip::{POLKADOT_ENGINE_ID, GossipMessage, ErasureChunkMessage};
@@ -112,23 +113,18 @@ impl<T> av_store::ProvideGossipMessages for AvailabilityNetworkShim<T>
where T: NetworkService
{
fn gossip_messages_for(&self, topic: Hash)
-> Box<dyn futures03::Stream<Item = (Hash, Hash, ErasureChunk)> + Unpin + Send>
-> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>
{
Box::new(self.0.gossip_messages_for(topic)
.compat()
.filter_map(|msg| async move {
self.0.gossip_messages_for(topic)
.filter_map(|(msg, _)| async move {
match msg {
Ok(msg) => match msg.0 {
GossipMessage::ErasureChunk(chunk) => {
Some((chunk.relay_parent, chunk.candidate_hash, chunk.chunk))
},
_ => None,
}
GossipMessage::ErasureChunk(chunk) => {
Some((chunk.relay_parent, chunk.candidate_hash, chunk.chunk))
},
_ => None,
}
})
.boxed()
)
}
fn gossip_erasure_chunk(
@@ -170,7 +166,7 @@ impl NetworkService for PolkadotNetworkService {
Err(_) => mpsc::unbounded().1, // return empty channel.
};
GossipMessageStream::new(Box::new(Compat::new(topic_stream.map(Ok))))
GossipMessageStream::new(topic_stream.boxed())
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
@@ -213,12 +209,12 @@ impl GossipService for consensus_gossip::ConsensusGossip<Block> {
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: Box<dyn Stream<Item = TopicNotification, Error = ()> + Send>,
topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>,
}
impl GossipMessageStream {
/// Create a new instance with the given topic stream.
pub fn new(topic_stream: Box<dyn Stream<Item = TopicNotification, Error = ()> + Send>) -> Self {
pub fn new(topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>) -> Self {
Self {
topic_stream,
}
@@ -227,18 +223,20 @@ impl GossipMessageStream {
impl Stream for GossipMessageStream {
type Item = (GossipMessage, Option<PeerId>);
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(self: Pin<&mut Self>, cx: &mut PollContext) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);
loop {
let msg = match futures::try_ready!(self.topic_stream.poll()) {
Some(msg) => msg,
None => return Ok(Async::Ready(None)),
let msg = match Pin::new(&mut this.topic_stream).poll_next(cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
};
debug!(target: "validation", "Processing statement for live validation leaf-work");
if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) {
return Ok(Async::Ready(Some((gmsg, msg.sender))))
return Poll::Ready(Some((gmsg, msg.sender)))
}
}
}
@@ -835,7 +833,7 @@ impl PolkadotProtocol {
targets: HashSet<ValidatorId>,
collation: Collation,
outgoing_targeted: OutgoingMessages,
) -> impl futures::future::Future<Item = (), Error=()> {
) -> impl Future<Output = ()> {
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
relay_parent, collation.info.parachain_index);
@@ -843,7 +841,7 @@ impl PolkadotProtocol {
Some(ref availability_store) => {
let availability_store_cloned = availability_store.clone();
let collation_cloned = collation.clone();
Either::A((async move {
Either::Left((async move {
let _ = availability_store_cloned.make_available(av_store::Data {
relay_parent,
parachain_id: collation_cloned.info.parachain_index,
@@ -852,13 +850,10 @@ impl PolkadotProtocol {
}).await;
}
)
.unit_error()
.boxed()
.compat()
.then(|_| Ok(()))
)
}
None => Either::B(futures::future::ok::<(), ()>(())),
None => Either::Right(futures::future::ready(())),
};
for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
+33 -29
View File
@@ -34,7 +34,7 @@ use polkadot_primitives::parachain::{
use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement, ErasureChunkMessage};
use futures::prelude::*;
use futures03::{future::FutureExt, TryFutureExt};
use futures::{task::SpawnExt, future::{ready, select}};
use parking_lot::Mutex;
use log::{debug, trace};
@@ -59,14 +59,14 @@ pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
impl Stream<Item=SignedStatement, Error=()> {
impl Stream<Item=SignedStatement> {
// spin up a task in the background that processes all incoming statements
// validation has been done already by the gossip validator.
// this will block internally until the gossip messages stream is obtained.
network.gossip_messages_for(topic)
.filter_map(|msg| match msg.0 {
GossipMessage::Statement(s) => Some(s.signed_statement),
_ => None
GossipMessage::Statement(s) => ready(Some(s.signed_statement)),
_ => ready(None)
})
}
@@ -101,7 +101,7 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement, Error=()> {
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement> {
checked_statements(&**self.network(), self.attestation_topic)
}
@@ -130,7 +130,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
E: Future<Output=()> + Clone + Send + Unpin + 'static,
{
/// Import a statement whose signature has been checked already.
pub(crate) fn import_statement(&self, statement: SignedStatement) {
@@ -174,50 +174,54 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
trace!(target: "validation", "driving statement work to completion");
let exit = self.fetcher.exit().clone().unit_error().compat();
let work = work.select2(exit).then(|_| Ok(()));
self.fetcher.executor().spawn(work);
let work = select(work.boxed(), self.fetcher.exit().clone())
.map(drop);
let _ = self.fetcher.executor().spawn(work);
}
}
}
}
fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>)
-> impl Future<Item=(),Error=()> + Send + 'static
-> impl Future<Output=()> + Send + 'static
where
D: Future<Item=PoVBlock,Error=io::Error> + Send + 'static,
D: Future<Output=Result<PoVBlock,io::Error>> + Send + Unpin + 'static,
{
let table = self.table.clone();
let network = self.network().clone();
let knowledge = self.fetcher.knowledge().clone();
let attestation_topic = self.attestation_topic;
let parent_hash = self.parent_hash();
let api = self.fetcher.api().clone();
producer.prime(self.fetcher.api().clone())
.validate()
.boxed()
.compat()
.map(move |validated| {
// store the data before broadcasting statements, so other peers can fetch.
knowledge.lock().note_candidate(
async move {
match producer.prime(api).validate().await {
Ok(validated) => {
// store the data before broadcasting statements, so other peers can fetch.
knowledge.lock().note_candidate(
candidate_hash,
Some(validated.0.pov_block().clone()),
validated.0.outgoing_messages().cloned(),
);
);
// propagate the statement.
// consider something more targeted than gossip in the future.
let statement = GossipStatement::new(
// propagate the statement.
// consider something more targeted than gossip in the future.
let statement = GossipStatement::new(
parent_hash,
match table.import_validated(validated.0) {
None => return,
Some(s) => s,
None => return,
Some(s) => s,
}
);
);
network.gossip_message(attestation_topic, statement.into());
})
.map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e))
network.gossip_message(attestation_topic, statement.into());
},
Err(err) => {
debug!(target: "p_net", "Failed to produce statements: {:?}", err);
}
}
}
}
}
@@ -225,7 +229,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
E: Future<Output=()> + Clone + Send + 'static,
{
type Error = io::Error;
type FetchValidationProof = validation::PoVReceiver;
+2 -2
View File
@@ -33,7 +33,7 @@ use sc_network::{
specialization::NetworkSpecialization,
};
use futures::Future;
use futures::executor::block_on;
mod validation;
@@ -245,7 +245,7 @@ fn fetches_from_those_with_knowledge() {
let pov_block = make_pov(block_data.0);
on_message(&mut protocol, &mut ctx, peer_b, Message::PovBlock(2, Some(pov_block.clone())));
drop(protocol);
assert_eq!(recv.wait().unwrap(), pov_block);
assert_eq!(block_on(recv).unwrap(), pov_block);
}
}
+23 -21
View File
@@ -39,22 +39,23 @@ use sp_runtime::traits::{ApiRef, {Block as BlockT}, ProvideRuntimeApi};
use std::collections::HashMap;
use std::sync::Arc;
use futures::{prelude::*, sync::mpsc};
use std::pin::Pin;
use std::task::{Poll, Context};
use futures::{prelude::*, channel::mpsc};
use codec::Encode;
use super::{TestContext, TestChainContext};
type TaskExecutor = Arc<dyn futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;
type TaskExecutor = Arc<dyn futures::task::Spawn + Send + Sync>;
#[derive(Clone, Copy)]
struct NeverExit;
impl Future for NeverExit {
type Item = ();
type Error = ();
type Output = ();
fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::NotReady)
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
Poll::Pending
}
}
@@ -93,27 +94,28 @@ impl GossipRouter {
}
impl Future for GossipRouter {
type Item = ();
type Error = ();
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
fn poll(&mut self) -> Poll<(), ()> {
loop {
match self.incoming_messages.poll().unwrap() {
Async::Ready(Some((topic, message))) => self.add_message(topic, message),
Async::Ready(None) => panic!("ended early."),
Async::NotReady => break,
match Pin::new(&mut this.incoming_messages).poll_next(cx) {
Poll::Ready(Some((topic, message))) => this.add_message(topic, message),
Poll::Ready(None) => panic!("ended early."),
Poll::Pending => break,
}
}
loop {
match self.incoming_streams.poll().unwrap() {
Async::Ready(Some((topic, sender))) => self.add_outgoing(topic, sender),
Async::Ready(None) => panic!("ended early."),
Async::NotReady => break,
match Pin::new(&mut this.incoming_streams).poll_next(cx) {
Poll::Ready(Some((topic, sender))) => this.add_outgoing(topic, sender),
Poll::Ready(None) => panic!("ended early."),
Poll::Pending => break,
}
}
Ok(Async::NotReady)
Poll::Pending
}
}
@@ -148,7 +150,7 @@ impl NetworkService for TestNetwork {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = mpsc::unbounded();
let _ = self.gossip.send_listener.unbounded_send((topic, tx));
GossipMessageStream::new(Box::new(rx))
GossipMessageStream::new(rx.boxed())
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
@@ -417,8 +419,8 @@ impl av_store::ProvideGossipMessages for DummyGossipMessages {
fn gossip_messages_for(
&self,
_topic: Hash
) -> Box<dyn futures03::Stream<Item = (Hash, Hash, ErasureChunk)> + Send + Unpin> {
Box::new(futures03::stream::empty())
) -> Pin<Box<dyn futures::Stream<Item = (Hash, Hash, ErasureChunk)> + Send>> {
stream::empty().boxed()
}
fn gossip_erasure_chunk(
+63 -83
View File
@@ -31,17 +31,19 @@ use polkadot_primitives::parachain::{
};
use futures::prelude::*;
use futures::future::{self, Executor as FutureExecutor};
use futures::sync::oneshot::{self, Receiver};
use futures03::{FutureExt as _, TryFutureExt as _};
use futures::task::SpawnExt;
pub use futures::task::Spawn as Executor;
use futures::channel::oneshot::{self, Receiver};
use futures::future::{ready, select};
use std::collections::hash_map::{HashMap, Entry};
use std::io;
use std::sync::Arc;
use std::pin::Pin;
use std::task::{Poll, Context};
use arrayvec::ArrayVec;
use parking_lot::Mutex;
use log::warn;
use crate::router::Router;
use crate::gossip::{RegisteredMessageValidator, MessageValidationData};
@@ -50,33 +52,6 @@ use super::NetworkService;
pub use polkadot_validation::Incoming;
/// An executor suitable for dispatching async consensus tasks.
pub trait Executor {
fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F);
}
/// A wrapped futures::future::Executor.
#[derive(Clone)]
pub struct WrappedExecutor<T>(pub T);
impl<T> Executor for WrappedExecutor<T>
where T: FutureExecutor<Box<dyn Future<Item=(),Error=()> + Send + 'static>>
{
fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F) {
if let Err(e) = self.0.execute(Box::new(f)) {
warn!(target: "validation", "could not spawn consensus task: {:?}", e);
}
}
}
impl Executor for Arc<
dyn futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync
> {
fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F) {
let _ = FutureExecutor::execute(&**self, Box::new(f));
}
}
/// Params to instantiate validation work on a block-DAG leaf.
pub struct LeafWorkParams {
/// The local session key.
@@ -124,7 +99,7 @@ impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> {
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block>,
E: Clone + futures03::Future<Output=()> + Send + Sync + 'static,
E: Clone + Future<Output=()> + Send + Sync + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
@@ -184,13 +159,17 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
/// Convert the given `CollatorId` to a `PeerId`.
pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
impl Future<Item=Option<PeerId>, Error=()> + Send
impl Future<Output=Option<PeerId>> + Send
{
let (send, recv) = oneshot::channel();
self.network.with_spec(move |spec, _| {
let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned());
});
recv.map_err(|_| ())
let network = self.network.clone();
async move {
let (send, recv) = oneshot::channel();
network.with_spec(move |spec, _| {
let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned());
});
recv.await.ok().and_then(|opt| opt)
}
}
/// Create a `Stream` of checked statements for the given `relay_parent`.
@@ -198,7 +177,7 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream<Item=SignedStatement, Error=()> {
pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream<Item=SignedStatement> {
crate::router::checked_statements(&*self.network, crate::router::attestation_topic(relay_parent))
}
}
@@ -207,13 +186,13 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
E: Clone + futures03::Future<Output=()> + Send + Sync + Unpin + 'static,
E: Clone + Future<Output=()> + Send + Sync + Unpin + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
type Error = String;
type TableRouter = Router<P, E, N, T>;
type BuildTableRouter = Box<dyn Future<Item=Self::TableRouter, Error=String> + Send>;
type BuildTableRouter = Box<dyn Future<Output=Result<Self::TableRouter, String>> + Send + Unpin>;
fn communication_for(
&self,
@@ -234,7 +213,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
let executor = self.executor.clone();
let work = build_fetcher
.map_err(|e| format!("{:?}", e))
.map(move |fetcher| {
.map_ok(move |fetcher| {
let table_router = Router::new(
table,
fetcher,
@@ -243,12 +222,14 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
let table_router_clone = table_router.clone();
let work = table_router.checked_statements()
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) })
.select(exit.clone().unit_error().compat())
.map(|_| ())
.map_err(|_| ());
.for_each(move |msg| {
table_router_clone.import_statement(msg);
ready(())
});
executor.spawn(work);
let work = select(work, exit).map(drop);
let _ = executor.spawn(work);
table_router
});
@@ -263,27 +244,26 @@ pub struct NetworkDown;
/// A future that resolves when a collation is received.
pub struct AwaitingCollation {
outer: futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>,
inner: Option<::futures::sync::oneshot::Receiver<Collation>>
outer: oneshot::Receiver<oneshot::Receiver<Collation>>,
inner: Option<oneshot::Receiver<Collation>>
}
impl Future for AwaitingCollation {
type Item = Collation;
type Error = NetworkDown;
type Output = Result<Collation, NetworkDown>;
fn poll(&mut self) -> Poll<Collation, NetworkDown> {
if let Some(ref mut inner) = self.inner {
return inner
.poll()
.map_err(|_| NetworkDown)
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
if let Some(ref mut inner) = this.inner {
return Pin::new(inner).poll(cx).map_err(|_| NetworkDown)
}
match self.outer.poll() {
Ok(futures::Async::Ready(inner)) => {
self.inner = Some(inner);
self.poll()
match Pin::new(&mut this.outer).poll(cx) {
Poll::Ready(Ok(inner)) => {
this.inner = Some(inner);
Pin::new(this).poll(cx)
},
Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady),
Err(_) => Err(NetworkDown)
Poll::Ready(Err(_)) => Poll::Ready(Err(NetworkDown)),
Poll::Pending => Poll::Pending,
}
}
}
@@ -297,7 +277,7 @@ impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where
type Collation = AwaitingCollation;
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation {
let (tx, rx) = ::futures::sync::oneshot::channel();
let (tx, rx) = oneshot::channel();
self.network.with_spec(move |spec, _| {
let collation = spec.await_collation(relay_parent, parachain);
let _ = tx.send(collation);
@@ -375,17 +355,16 @@ pub struct IncomingReceiver {
}
impl Future for IncomingReceiver {
type Item = Incoming;
type Error = io::Error;
type Output = Result<Incoming, io::Error>;
fn poll(&mut self) -> Poll<Incoming, io::Error> {
match self.inner.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(i)) => Ok(Async::Ready(Incoming::clone(&*i))),
Err(_) => Err(io::Error::new(
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Pin::new(&mut Pin::into_inner(self).inner).poll(cx) {
Poll::Ready(Ok(i)) => Poll::Ready(Ok(Incoming::clone(&i))),
Poll::Ready(Err(_)) => Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Sending end of channel hung up",
)),
))),
Poll::Pending => Poll::Pending,
}
}
}
@@ -592,24 +571,25 @@ pub struct PoVReceiver {
}
impl Future for PoVReceiver {
type Item = PoVBlock;
type Error = io::Error;
type Output = Result<PoVBlock, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
fn poll(&mut self) -> Poll<PoVBlock, io::Error> {
let map_err = |_| io::Error::new(
io::ErrorKind::Other,
"Sending end of channel hung up",
);
if let Some(ref mut inner) = self.inner {
return inner.poll().map_err(map_err);
if let Some(ref mut inner) = this.inner {
return Pin::new(inner).poll(cx).map_err(map_err);
}
match self.outer.poll().map_err(map_err)? {
Async::Ready(inner) => {
self.inner = Some(inner);
self.poll()
match Pin::new(&mut this.outer).poll(cx).map_err(map_err)? {
Poll::Ready(inner) => {
this.inner = Some(inner);
Pin::new(this).poll(cx)
}
Async::NotReady => Ok(Async::NotReady),
Poll::Pending => Poll::Pending,
}
}
}
@@ -675,7 +655,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + 'static,
E: Future<Output=()> + Clone + Send + 'static,
{
/// Fetch PoV block for the given candidate receipt.
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
@@ -697,7 +677,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
);
let candidate = candidate.clone();
let (tx, rx) = ::futures::sync::oneshot::channel();
let (tx, rx) = oneshot::channel();
self.network.with_spec(move |spec, ctx| {
if let Ok(Some(canon_roots)) = canon_roots {
let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots);
+6 -3
View File
@@ -8,9 +8,8 @@ edition = "2018"
parking_lot = "0.9.0"
lazy_static = "1.4.0"
log = "0.4.8"
futures = "0.1.29"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
exit-future = "0.1.4"
futures = "0.3.1"
futures01 = { package = "futures", version = "0.1.29" }
slog = "2.5.2"
hex-literal = "0.2.1"
av_store = { package = "polkadot-availability-store", path = "../availability-store" }
@@ -45,3 +44,7 @@ authority-discovery = { package = "sc-authority-discovery", git = "https://githu
authority-discovery-primitives = { package = "sp-authority-discovery", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
babe = { package = "sc-consensus-babe", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
babe-primitives = { package = "sp-consensus-babe", git = "https://github.com/paritytech/substrate", default-features = false, branch = "polkadot-master" }
[features]
default = ["rocksdb"]
rocksdb = ["service/rocksdb"]
+31 -15
View File
@@ -18,7 +18,8 @@
pub mod chain_spec;
use futures::sync::mpsc;
use futures01::sync::mpsc;
use futures::{FutureExt, TryFutureExt, task::{Spawn, SpawnError, FutureObj}};
use client::LongestChain;
use std::sync::Arc;
use std::time::Duration;
@@ -44,8 +45,21 @@ pub use primitives::Blake2Hasher;
pub use sp_runtime::traits::ProvideRuntimeApi;
pub use sc_network::specialization::NetworkSpecialization;
pub use chain_spec::ChainSpec;
#[cfg(not(target_os = "unknown"))]
pub use consensus::run_validation_worker;
/// Wrap a futures01 executor as a futures03 spawn.
#[derive(Clone)]
pub struct WrappedExecutor<T>(pub T);
impl<T> Spawn for WrappedExecutor<T>
where T: futures01::future::Executor<Box<dyn futures01::Future<Item=(),Error=()> + Send + 'static>>
{
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.0.execute(Box::new(future.map(Ok).compat()))
.map_err(|_| SpawnError::shutdown())
}
}
/// Polkadot-specific configuration.
pub struct CustomConfiguration {
/// Set to `Some` with a collator `CollatorId` and desired parachain
@@ -151,11 +165,7 @@ pub fn new_full(config: Configuration<CustomConfiguration, GenesisConfig>)
>, ServiceError>
{
use sc_network::DhtEvent;
use futures03::{
compat::Stream01CompatExt,
stream::StreamExt,
future::{FutureExt, TryFutureExt},
};
use futures::{compat::Stream01CompatExt, stream::StreamExt};
let is_collator = config.custom.collating_for.is_some();
let is_authority = config.roles.is_authority() && !is_collator;
@@ -237,12 +247,18 @@ pub fn new_full(config: Configuration<CustomConfiguration, GenesisConfig>)
let mut path = PathBuf::from(db_path);
path.push("availability");
av_store::Store::new(::av_store::Config {
cache_size: None,
path,
},
polkadot_network::AvailabilityNetworkShim(service.network()),
)?
let gossip = polkadot_network::AvailabilityNetworkShim(service.network());
#[cfg(not(target_os = "unknown"))]
{
av_store::Store::new(::av_store::Config {
cache_size: None,
path,
}, gossip)?
}
#[cfg(target_os = "unknown")]
av_store::Store::new_in_memory(gossip)
};
{
@@ -263,7 +279,7 @@ pub fn new_full(config: Configuration<CustomConfiguration, GenesisConfig>)
service.on_exit(),
gossip_validator,
service.client(),
polkadot_network::validation::WrappedExecutor(service.spawn_task_handle()),
WrappedExecutor(service.spawn_task_handle()),
);
let proposer = consensus::ProposerFactory::new(
client.clone(),
@@ -271,7 +287,7 @@ pub fn new_full(config: Configuration<CustomConfiguration, GenesisConfig>)
validation_network.clone(),
validation_network,
service.transaction_pool(),
Arc::new(service.spawn_task_handle()),
Arc::new(WrappedExecutor(service.spawn_task_handle())),
service.keystore(),
availability_store.clone(),
polkadot_runtime::constants::time::SLOT_DURATION,
@@ -287,7 +303,7 @@ pub fn new_full(config: Configuration<CustomConfiguration, GenesisConfig>)
let block_import = availability_store.block_import(
block_import,
client.clone(),
Arc::new(service.spawn_task_handle()),
Arc::new(WrappedExecutor(service.spawn_task_handle())),
service.keystore(),
)?;
+6 -5
View File
@@ -18,9 +18,8 @@
#![warn(missing_docs)]
use cli::{AbstractService, VersionInfo, TaskExecutor};
use futures::channel::oneshot;
use futures::{future, FutureExt};
use cli::{AbstractService, VersionInfo};
use futures::{channel::oneshot, future, FutureExt, task::Spawn};
use std::cell::RefCell;
@@ -33,6 +32,7 @@ impl cli::IntoExit for Worker {
let (exit_send, exit) = oneshot::channel();
let exit_send_cell = RefCell::new(Some(exit_send));
#[cfg(not(target_os = "unknown"))]
ctrlc::set_handler(move || {
if let Some(exit_send) = exit_send_cell.try_borrow_mut().expect("signal handler not reentrant; qed").take() {
exit_send.send(()).expect("Error sending exit notification");
@@ -45,13 +45,14 @@ impl cli::IntoExit for Worker {
impl cli::Worker for Worker {
type Work = <Self as cli::IntoExit>::Exit;
fn work<S, SC, B, CE>(self, _: &S, _: TaskExecutor) -> Self::Work
fn work<S, SC, B, CE, SP>(self, _: &S, _: SP) -> Self::Work
where S: AbstractService<Block = service::Block, RuntimeApi = service::RuntimeApi,
Backend = B, SelectChain = SC,
NetworkSpecialization = service::PolkadotProtocol, CallExecutor = CE>,
SC: service::SelectChain<service::Block> + 'static,
B: service::Backend<service::Block, service::Blake2Hasher> + 'static,
CE: service::CallExecutor<service::Block, service::Blake2Hasher> + Clone + Send + Sync + 'static {
CE: service::CallExecutor<service::Block, service::Blake2Hasher> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static {
use cli::IntoExit;
self.into_exit()
}
@@ -29,11 +29,9 @@ use primitives::{
HeadData, BlockData, Id as ParaId, Message, OutgoingMessages, Status as ParachainStatus,
},
};
use collator::{
InvalidHead, ParachainContext, VersionInfo, Network, BuildParachainContext, TaskExecutor,
};
use collator::{InvalidHead, ParachainContext, VersionInfo, Network, BuildParachainContext};
use parking_lot::Mutex;
use futures::future::{Ready, ok, err};
use futures::{future::{Ready, ok, err}, task::Spawn};
const GENESIS: AdderHead = AdderHead {
number: 0,
@@ -108,15 +106,16 @@ impl ParachainContext for AdderContext {
impl BuildParachainContext for AdderContext {
type ParachainContext = Self;
fn build<B, E>(
fn build<B, E, SP>(
self,
_: Arc<collator::PolkadotClient<B, E>>,
_: TaskExecutor,
_: SP,
network: Arc<dyn Network>,
) -> Result<Self::ParachainContext, ()>
where
B: client_api::backend::Backend<Block, Blake2Hasher> + 'static,
E: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static
E: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static,
{
Ok(Self { _network: Some(network), ..self })
}
+2 -5
View File
@@ -5,16 +5,13 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.1.17"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
futures = "0.3.1"
futures-timer = "2.0"
async-std = { version = "1.0.1", features = ["unstable"] }
parking_lot = "0.9.0"
tokio = "0.1.22"
tokio = { version = "0.2.4", features = ["rt-core", "blocking"] }
derive_more = "0.14.1"
log = "0.4.8"
exit-future = "0.2.0"
tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] }
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
availability_store = { package = "polkadot-availability-store", path = "../availability-store" }
parachain = { package = "polkadot-parachain", path = "../parachain" }
+21 -32
View File
@@ -23,15 +23,14 @@
/// such as candidate verification while performing event-driven work
/// on a local event loop.
use std::{thread, time::{Duration, Instant}, sync::Arc};
use std::{thread, time::Duration, sync::Arc};
use client::{BlockchainEvents, BlockBody};
use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi;
use consensus::SelectChain;
use futures::prelude::*;
use futures03::{TryStreamExt as _, StreamExt as _, FutureExt as _, TryFutureExt as _};
use log::error;
use futures::{future::{ready, select}, task::{Spawn, SpawnExt}};
use polkadot_primitives::Block;
use polkadot_primitives::parachain::ParachainHost;
use runtime_primitives::traits::{ProvideRuntimeApi};
@@ -39,12 +38,12 @@ use babe_primitives::BabeApi;
use keystore::KeyStorePtr;
use sp_api::ApiExt;
use tokio::{timer::Interval, runtime::current_thread::Runtime as LocalRuntime};
use log::{warn, debug};
use tokio::{runtime::Runtime as LocalRuntime};
use log::{warn, error};
use super::{Network, Collators};
type TaskExecutor = Arc<dyn futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;
type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
/// Parachain candidate attestation service handle.
pub(crate) struct ServiceHandle {
@@ -62,8 +61,8 @@ pub(crate) fn start<C, N, P, SC>(
max_block_data_size: Option<u64>,
) -> ServiceHandle
where
C: Collators + Send + Sync + 'static,
<C::Collation as IntoFuture>::Future: Send + 'static,
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
P: BlockchainEvents<Block> + BlockBody<Block>,
P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> +
@@ -72,10 +71,9 @@ pub(crate) fn start<C, N, P, SC>(
ApiExt<Block, Error = sp_blockchain::Error>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
<N::BuildTableRouter as IntoFuture>::Future: Send + 'static,
N::BuildTableRouter: Send + Unpin + 'static,
SC: SelectChain<Block> + 'static,
{
const TIMER_DELAY: Duration = Duration::from_secs(5);
const TIMER_INTERVAL: Duration = Duration::from_secs(30);
let (signal, exit) = ::exit_future::signal();
@@ -87,8 +85,7 @@ pub(crate) fn start<C, N, P, SC>(
let keystore = keystore.clone();
client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
let notifications = client.import_notification_stream()
.for_each(move |notification| {
let parent_hash = notification.hash;
if notification.is_new_best {
@@ -105,43 +102,35 @@ pub(crate) fn start<C, N, P, SC>(
);
}
}
Ok(())
})
.select(exit.clone().unit_error().compat())
.then(|_| Ok(()))
ready(())
});
select(notifications, exit.clone())
};
let prune_old_sessions = {
let select_chain = select_chain.clone();
let interval = Interval::new(
Instant::now() + TIMER_DELAY,
TIMER_INTERVAL,
);
interval
let interval = crate::interval(TIMER_INTERVAL)
.for_each(move |_| match select_chain.leaves() {
Ok(leaves) => {
parachain_validation.retain(|h| leaves.contains(h));
Ok(())
ready(())
}
Err(e) => {
warn!("Error fetching leaves from client: {:?}", e);
Ok(())
ready(())
}
})
.map_err(|e| warn!("Timer error {:?}", e))
.select(exit.clone().unit_error().compat())
.then(|_| Ok(()))
});
select(interval, exit.clone()).map(|_| ())
};
runtime.spawn(notifications);
if let Err(_) = thread_pool.execute(Box::new(prune_old_sessions)) {
if let Err(_) = thread_pool.spawn(prune_old_sessions) {
error!("Failed to spawn old sessions pruning task");
}
if let Err(e) = runtime.block_on(exit.unit_error().compat()) {
debug!("BFT event loop error {:?}", e);
}
runtime.block_on(exit);
});
ServiceHandle {
+30 -19
View File
@@ -32,6 +32,8 @@ use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, Messag
use trie::TrieConfiguration;
use futures::prelude::*;
use log::debug;
use std::task::{Poll, Context};
use std::pin::Pin;
/// Encapsulates connections to collators and allows collation on any parachain.
///
@@ -40,7 +42,7 @@ pub trait Collators: Clone {
/// Errors when producing collations.
type Error: std::fmt::Debug;
/// A full collation.
type Collation: IntoFuture<Item=Collation,Error=Self::Error>;
type Collation: Future<Output=Result<Collation, Self::Error>>;
/// Collate on a specific parachain, building on a given relay chain parent hash.
///
@@ -63,7 +65,7 @@ pub struct CollationFetch<C: Collators, P> {
relay_parent_hash: Hash,
relay_parent: BlockId,
collators: C,
live_fetch: Option<<C::Collation as IntoFuture>::Future>,
live_fetch: Option<C::Collation>,
client: Arc<P>,
max_block_data_size: Option<u64>,
}
@@ -99,41 +101,50 @@ impl<C: Collators, P> CollationFetch<C, P> {
}
}
impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P>
where P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
impl<C, P> Future for CollationFetch<C, P>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
C: Collators + Unpin,
P: ProvideRuntimeApi,
<C as Collators>::Collation: Unpin,
{
type Item = (Collation, OutgoingMessages, Balance);
type Error = C::Error;
type Output = Result<(Collation, OutgoingMessages, Balance),C::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
fn poll(&mut self) -> Poll<(Collation, OutgoingMessages, Balance), C::Error> {
loop {
let collation = {
let parachain = self.parachain.clone();
let (r, c) = (self.relay_parent_hash, &self.collators);
let poll = self.live_fetch
.get_or_insert_with(move || c.collate(parachain, r).into_future())
.poll();
let parachain = this.parachain.clone();
let (r, c) = (this.relay_parent_hash, &this.collators);
futures::try_ready!(poll)
let future = this.live_fetch
.get_or_insert_with(move || c.collate(parachain, r));
match Pin::new(future).poll(cx) {
Poll::Ready(Ok(c)) => c,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending
}
};
let res = validate_collation(
&*self.client,
&self.relay_parent,
&*this.client,
&this.relay_parent,
&collation,
self.max_block_data_size,
this.max_block_data_size,
);
match res {
Ok((messages, fees)) => {
return Ok(Async::Ready((collation, messages, fees)))
return Poll::Ready(Ok((collation, messages, fees)))
}
Err(e) => {
debug!("Failed to validate parachain due to API error: {}", e);
// just continue if we got a bad collation or failed to validate
self.live_fetch = None;
self.collators.note_bad_collator(collation.info.collator)
this.live_fetch = None;
this.collators.note_bad_collator(collation.info.collator)
}
}
}
+1 -3
View File
@@ -46,9 +46,7 @@ pub enum Error {
Timer(std::io::Error),
#[display(fmt = "Failed to compute deadline of now + {:?}", _0)]
DeadlineComputeFailure(std::time::Duration),
/// Unable to dispatch agreement future
#[display(fmt = "Unable to dispatch agreement future: {:?}", _0)]
Executor(futures::future::ExecuteErrorKind),
Join(tokio::task::JoinError)
}
impl std::error::Error for Error {
+40 -35
View File
@@ -56,12 +56,11 @@ use polkadot_primitives::parachain::{
use primitives::Pair;
use runtime_primitives::traits::{ProvideRuntimeApi, DigestFor};
use futures_timer::Delay;
use async_std::stream::{interval, Interval};
use txpool_api::{TransactionPool, InPoolTransaction};
use attestation_service::ServiceHandle;
use futures::prelude::*;
use futures03::{future::{self, Either}, FutureExt, StreamExt, TryFutureExt};
use futures::{future::{self, Either, select, ready}, stream::unfold, task::{Spawn, SpawnExt}};
use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion;
use inherents::InherentData;
@@ -70,10 +69,13 @@ use log::{info, debug, warn, trace, error};
use keystore::KeyStorePtr;
use sp_api::ApiExt;
type TaskExecutor =
Arc<
dyn futures::future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>>
+ Send + Sync>;
type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
fn interval(duration: Duration) -> impl Stream<Item=()> + Send + Unpin {
unfold((), move |_| {
futures_timer::Delay::new(duration).map(|_| Some(((), ())))
}).map(drop)
}
pub use self::collation::{
validate_collation, validate_incoming, message_queue_root, egress_roots, Collators,
@@ -84,6 +86,8 @@ pub use self::shared_table::{
SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement,
GenericStatement,
};
#[cfg(not(target_os = "unknown"))]
pub use parachain::wasm_executor::{run_worker as run_validation_worker};
mod attestation_service;
@@ -107,7 +111,7 @@ pub trait TableRouter: Clone {
/// Errors when fetching data from the network.
type Error: std::fmt::Debug;
/// Future that resolves when candidate data is fetched.
type FetchValidationProof: IntoFuture<Item=PoVBlock,Error=Self::Error>;
type FetchValidationProof: Future<Output=Result<PoVBlock, Self::Error>>;
/// Call with local candidate data. This will make the data available on the network,
/// and sign, import, and broadcast a statement about the candidate.
@@ -134,7 +138,7 @@ pub trait Network {
/// The future used for asynchronously building the table router.
/// This should not fail.
type BuildTableRouter: IntoFuture<Item=Self::TableRouter,Error=Self::Error>;
type BuildTableRouter: Future<Output=Result<Self::TableRouter,Self::Error>>;
/// Instantiate a table router using the given shared table.
/// Also pass through any outgoing messages to be broadcast to peers.
@@ -273,13 +277,13 @@ struct ParachainValidation<C, N, P> {
}
impl<C, N, P> ParachainValidation<C, N, P> where
C: Collators + Send + 'static,
C: Collators + Send + Unpin + 'static,
N: Network,
P: ProvideRuntimeApi + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
<C::Collation as IntoFuture>::Future: Send + 'static,
C::Collation: Send + Unpin + 'static,
N::TableRouter: Send + 'static,
<N::BuildTableRouter as IntoFuture>::Future: Send + 'static,
N::BuildTableRouter: Unpin + Send + 'static,
{
/// Get an attestation table for given parent hash.
///
@@ -400,7 +404,7 @@ impl<C, N, P> ParachainValidation<C, N, P> where
max_block_data_size,
);
collation_work.then(move |result| match result {
collation_work.map(move |result| match result {
Ok((collation, outgoing_targeted, fees_charged)) => {
match produce_receipt_and_chunks(
authorities_num,
@@ -427,10 +431,9 @@ impl<C, N, P> ParachainValidation<C, N, P> where
}
.unit_error()
.boxed()
.compat()
.then(move |_| {
router.local_collation(collation, receipt, outgoing_targeted, (local_id, &chunks));
Ok(())
ready(())
});
@@ -449,18 +452,16 @@ impl<C, N, P> ParachainValidation<C, N, P> where
})
};
let cancellable_work = build_router
.into_future()
let router = build_router
.map_ok(with_router)
.map_err(|e| {
warn!(target: "validation" , "Failed to build table router: {:?}", e);
})
.and_then(with_router)
.then(|_| Ok(()))
.select(exit.unit_error().compat())
.then(|_| Ok(()));
});
let cancellable_work = select(exit, router).map(drop);
// spawn onto thread pool.
if self.handle.execute(Box::new(cancellable_work)).is_err() {
if self.handle.spawn(cancellable_work).is_err() {
error!("Failed to spawn cancellable work task");
}
}
@@ -485,8 +486,8 @@ pub struct ProposerFactory<C, N, P, SC, TxPool: TransactionPool> {
}
impl<C, N, P, SC, TxPool> ProposerFactory<C, N, P, SC, TxPool> where
C: Collators + Send + Sync + 'static,
<C::Collation as IntoFuture>::Future: Send + 'static,
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
P: BlockchainEvents<Block> + BlockBody<Block>,
P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> +
@@ -495,7 +496,7 @@ impl<C, N, P, SC, TxPool> ProposerFactory<C, N, P, SC, TxPool> where
ApiExt<Block, Error = sp_blockchain::Error>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
<N::BuildTableRouter as IntoFuture>::Future: Send + 'static,
N::BuildTableRouter: Send + Unpin + 'static,
TxPool: TransactionPool,
SC: SelectChain<Block> + 'static,
{
@@ -543,7 +544,7 @@ impl<C, N, P, SC, TxPool> ProposerFactory<C, N, P, SC, TxPool> where
}
impl<C, N, P, SC, TxPool> consensus::Environment<Block> for ProposerFactory<C, N, P, SC, TxPool> where
C: Collators + Send + 'static,
C: Collators + Send + Unpin + 'static,
N: Network,
TxPool: TransactionPool<Block=Block> + 'static,
P: ProvideRuntimeApi + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
@@ -551,9 +552,9 @@ impl<C, N, P, SC, TxPool> consensus::Environment<Block> for ProposerFactory<C, N
BlockBuilderApi<Block> +
BabeApi<Block> +
ApiExt<Block, Error = sp_blockchain::Error>,
<C::Collation as IntoFuture>::Future: Send + 'static,
C::Collation: Send + Unpin + 'static,
N::TableRouter: Send + 'static,
<N::BuildTableRouter as IntoFuture>::Future: Send + 'static,
N::BuildTableRouter: Send + Unpin + 'static,
SC: SelectChain<Block>,
{
type Proposer = Proposer<P, TxPool>;
@@ -649,7 +650,7 @@ impl<C, TxPool> consensus::Proposer<Block> for Proposer<C, TxPool> where
let timing = ProposalTiming {
minimum: delay_future,
attempt_propose: interval(ATTEMPT_PROPOSE_EVERY),
attempt_propose: Box::new(interval(ATTEMPT_PROPOSE_EVERY)),
enough_candidates: Delay::new(enough_candidates),
dynamic_inclusion,
last_included: initial_included,
@@ -690,7 +691,7 @@ fn current_timestamp() -> u64 {
struct ProposalTiming {
minimum: Option<Delay>,
attempt_propose: Interval,
attempt_propose: Box<dyn Stream<Item=()> + Send + Unpin>,
dynamic_inclusion: DynamicInclusion,
enough_candidates: Delay,
last_included: usize,
@@ -746,7 +747,7 @@ enum CreateProposalState<C: Send + Sync, TxPool> {
/// Represents the state when we switch from pending to fired.
Switching,
/// Block proposing has fired.
Fired(tokio_executor::blocking::Blocking<Result<Block, Error>>),
Fired(tokio::task::JoinHandle<Result<Block, Error>>),
}
/// Inner data of the create proposal.
@@ -858,7 +859,7 @@ impl<C, TxPool> CreateProposalData<C, TxPool> where
}
}
impl<C, TxPool> futures03::Future for CreateProposal<C, TxPool> where
impl<C, TxPool> Future for CreateProposal<C, TxPool> where
TxPool: TransactionPool<Block=Block> + 'static,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
@@ -892,18 +893,22 @@ impl<C, TxPool> futures03::Future for CreateProposal<C, TxPool> where
thus Switching will never be reachable here; qed"
),
CreateProposalState::Fired(mut future) => {
let ret = Pin::new(&mut future).poll(cx);
let ret = Pin::new(&mut future)
.poll(cx)
.map(|res| res.map_err(Error::Join).and_then(|res| res));
self.state = CreateProposalState::Fired(future);
return ret
},
};
// 2. propose
let mut future = tokio_executor::blocking::run(move || {
let mut future = tokio::task::spawn_blocking(move || {
let proposed_candidates = data.table.proposed_set();
data.propose_with(proposed_candidates)
});
let polled = Pin::new(&mut future).poll(cx);
let polled = Pin::new(&mut future)
.poll(cx)
.map(|res| res.map_err(Error::Join).and_then(|res| res));
self.state = CreateProposalState::Fired(future);
polled
@@ -19,7 +19,9 @@
use std::collections::HashMap;
use futures::prelude::*;
use futures::sync::oneshot;
use futures::channel::oneshot;
use std::pin::Pin;
use std::task::{Poll, Context};
use polkadot_primitives::Hash;
@@ -95,17 +97,17 @@ impl IncludabilitySender {
pub struct Includable(oneshot::Receiver<()>);
impl Future for Includable {
type Item = ();
type Error = oneshot::Canceled;
type Output = Result<(), oneshot::Canceled>;
fn poll(&mut self) -> Poll<(), oneshot::Canceled> {
self.0.poll()
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut Pin::into_inner(self).0).poll(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::executor::block_on;
#[test]
fn it_works() {
@@ -132,6 +134,6 @@ mod tests {
sender.update_candidate(hash1, true);
assert!(sender.is_complete());
recv.wait().unwrap();
block_on(recv).unwrap();
}
}
+18 -18
View File
@@ -139,7 +139,7 @@ impl SharedTableInner {
statement: table::SignedStatement,
max_block_data_size: Option<u64>,
) -> Option<ParachainWork<
<R::FetchValidationProof as IntoFuture>::Future,
R::FetchValidationProof
>> {
let summary = self.table.import_statement(context, statement)?;
self.update_trackers(&summary.candidate, context);
@@ -172,7 +172,7 @@ impl SharedTableInner {
None
}
Some(candidate) => {
let fetch = router.fetch_pov_block(candidate).into_future();
let fetch = router.fetch_pov_block(candidate);
Some(Work {
candidate_receipt: candidate.clone(),
@@ -267,13 +267,13 @@ pub struct ParachainWork<Fetch> {
max_block_data_size: Option<u64>,
}
impl<Fetch: Future> ParachainWork<Fetch> {
impl<Fetch: Future + Unpin> ParachainWork<Fetch> {
/// Prime the parachain work with an API reference for extracting
/// chain information.
pub fn prime<P: ProvideRuntimeApi>(self, api: Arc<P>)
-> PrimedParachainWork<
Fetch,
impl Send + FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()>,
impl Send + FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()> + Unpin,
>
where
P: Send + Sync + 'static,
@@ -326,14 +326,13 @@ pub struct PrimedParachainWork<Fetch, F> {
impl<Fetch, F, Err> PrimedParachainWork<Fetch, F>
where
Fetch: Future<Item=PoVBlock,Error=Err>,
F: FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()>,
Fetch: Future<Output=Result<PoVBlock,Err>> + Unpin,
F: FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()> + Unpin,
Err: From<::std::io::Error>,
{
pub async fn validate(mut self) -> Result<(Validated, Option<ErasureChunk>), Err> {
use futures03::compat::Future01CompatExt;
let candidate = &self.inner.work.candidate_receipt;
let pov_block = self.inner.work.fetch.compat().await?;
let pov_block = self.inner.work.fetch.await?;
let validation_res = (self.validate)(
&BlockId::hash(self.inner.relay_parent),
@@ -439,7 +438,7 @@ impl SharedTable {
router: &R,
statement: table::SignedStatement,
) -> Option<ParachainWork<
<R::FetchValidationProof as IntoFuture>::Future,
R::FetchValidationProof,
>> {
self.inner.lock().import_remote_statement(&*self.context, router, statement, self.max_block_data_size)
}
@@ -455,7 +454,7 @@ impl SharedTable {
R: TableRouter,
I: IntoIterator<Item=table::SignedStatement>,
U: ::std::iter::FromIterator<Option<ParachainWork<
<R::FetchValidationProof as IntoFuture>::Future,
R::FetchValidationProof,
>>>,
{
let mut inner = self.inner.lock();
@@ -575,8 +574,9 @@ mod tests {
use polkadot_primitives::parachain::{AvailableMessages, BlockData, ConsolidatedIngress, Collation};
use polkadot_erasure_coding::{self as erasure};
use availability_store::ProvideGossipMessages;
use futures::{future};
use futures::future;
use futures::executor::block_on;
use std::pin::Pin;
fn pov_block_with_data(data: Vec<u8>) -> PoVBlock {
PoVBlock {
@@ -592,8 +592,8 @@ mod tests {
fn gossip_messages_for(
&self,
_topic: Hash
) -> Box<dyn futures03::Stream<Item = (Hash, Hash, ErasureChunk)> + Unpin + Send> {
Box::new(futures03::stream::empty())
) -> Pin<Box<dyn futures::Stream<Item = (Hash, Hash, ErasureChunk)> + Send>> {
futures::stream::empty().boxed()
}
fn gossip_erasure_chunk(
@@ -609,7 +609,7 @@ mod tests {
struct DummyRouter;
impl TableRouter for DummyRouter {
type Error = ::std::io::Error;
type FetchValidationProof = future::FutureResult<PoVBlock,Self::Error>;
type FetchValidationProof = future::Ready<Result<PoVBlock,Self::Error>>;
fn local_collation(
&self,
@@ -766,7 +766,7 @@ mod tests {
n_validators as u32,
).unwrap();
let producer: ParachainWork<future::FutureResult<_, ::std::io::Error>> = ParachainWork {
let producer: ParachainWork<future::Ready<Result<_, ::std::io::Error>>> = ParachainWork {
work: Work {
candidate_receipt: candidate,
fetch: future::ok(pov_block.clone()),
@@ -777,7 +777,7 @@ mod tests {
max_block_data_size: None,
};
let validated = futures03::executor::block_on(producer.prime_with(|_, _, _| Ok((
let validated = block_on(producer.prime_with(|_, _, _| Ok((
OutgoingMessages { outgoing_messages: Vec::new() },
ErasureChunk {
chunk: vec![1, 2, 3],
@@ -841,7 +841,7 @@ mod tests {
max_block_data_size: None,
};
let validated = futures03::executor::block_on(producer.prime_with(|_, _, _| Ok((
let validated = block_on(producer.prime_with(|_, _, _| Ok((
OutgoingMessages { outgoing_messages: Vec::new() },
ErasureChunk {
chunk: chunks[local_index].clone(),