Polkadot service (#82)

* Block import notifications

* Build fix

* Consensus messages supported in the networking

* Started consensus service

* BFT service

* Transaction propagation

* Polkadot service

* CLI integration

* Build fix

* Added signatures validation

* Removed executor argument

* Refactored steam loops; Queue size increased

* Limit queue size

* Fixed doc comment

* Fixed wasm build

* Fixed wasm build

* Check id properly
This commit is contained in:
Arkadiy Paronyan
2018-03-30 13:54:37 +02:00
committed by Robert Habermeier
parent 96fb93b09c
commit 471761f4b6
34 changed files with 1597 additions and 420 deletions
+88 -1
View File
@@ -182,6 +182,11 @@ name = "constant_time_eq"
version = "0.1.3" version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "crossbeam"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "crossbeam" name = "crossbeam"
version = "0.3.2" version = "0.3.2"
@@ -949,6 +954,18 @@ dependencies = [
"ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "multiqueue"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "net2" name = "net2"
version = "0.2.31" version = "0.2.31"
@@ -993,6 +1010,11 @@ dependencies = [
"winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "owning_ref"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "owning_ref" name = "owning_ref"
version = "0.3.3" version = "0.3.3"
@@ -1021,6 +1043,16 @@ dependencies = [
"rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "parking_lot"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"owning_ref 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot_core 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"thread-id 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.4.8" version = "0.4.8"
@@ -1120,9 +1152,9 @@ dependencies = [
"hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"polkadot-executor 0.1.0", "polkadot-executor 0.1.0",
"polkadot-keystore 0.1.0",
"polkadot-primitives 0.1.0", "polkadot-primitives 0.1.0",
"polkadot-runtime 0.1.0", "polkadot-runtime 0.1.0",
"polkadot-service 0.1.0",
"substrate-client 0.1.0", "substrate-client 0.1.0",
"substrate-codec 0.1.0", "substrate-codec 0.1.0",
"substrate-executor 0.1.0", "substrate-executor 0.1.0",
@@ -1157,8 +1189,12 @@ dependencies = [
"polkadot-statement-table 0.1.0", "polkadot-statement-table 0.1.0",
"polkadot-transaction-pool 0.1.0", "polkadot-transaction-pool 0.1.0",
"substrate-bft 0.1.0", "substrate-bft 0.1.0",
"substrate-client 0.1.0",
"substrate-codec 0.1.0", "substrate-codec 0.1.0",
"substrate-keyring 0.1.0",
"substrate-network 0.1.0",
"substrate-primitives 0.1.0", "substrate-primitives 0.1.0",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@@ -1226,6 +1262,32 @@ dependencies = [
"substrate-runtime-support 0.1.0", "substrate-runtime-support 0.1.0",
] ]
[[package]]
name = "polkadot-service"
version = "0.1.0"
dependencies = [
"ed25519 0.1.0",
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"polkadot-api 0.1.0",
"polkadot-consensus 0.1.0",
"polkadot-executor 0.1.0",
"polkadot-keystore 0.1.0",
"polkadot-primitives 0.1.0",
"polkadot-runtime 0.1.0",
"polkadot-transaction-pool 0.1.0",
"substrate-client 0.1.0",
"substrate-codec 0.1.0",
"substrate-executor 0.1.0",
"substrate-keyring 0.1.0",
"substrate-network 0.1.0",
"substrate-primitives 0.1.0",
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "polkadot-statement-table" name = "polkadot-statement-table"
version = "0.1.0" version = "0.1.0"
@@ -1553,6 +1615,11 @@ name = "smallvec"
version = "0.2.1" version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "smallvec"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "0.4.4" version = "0.4.4"
@@ -1615,6 +1682,7 @@ dependencies = [
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"multiqueue 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"substrate-bft 0.1.0", "substrate-bft 0.1.0",
"substrate-codec 0.1.0", "substrate-codec 0.1.0",
@@ -1682,11 +1750,14 @@ name = "substrate-network"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"ed25519 0.1.0",
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-io 1.9.0 (git+https://github.com/paritytech/parity.git)", "ethcore-io 1.9.0 (git+https://github.com/paritytech/parity.git)",
"ethcore-network 1.9.0 (git+https://github.com/paritytech/parity.git)", "ethcore-network 1.9.0 (git+https://github.com/paritytech/parity.git)",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"multiqueue 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1882,6 +1953,16 @@ dependencies = [
"unicode-width 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "unicode-width 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "thread-id"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
"redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "thread_local" name = "thread_local"
version = "0.3.5" version = "0.3.5"
@@ -2195,6 +2276,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum clap 2.29.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7b8f59bcebcfe4269b09f71dab0da15b355c75916a8f975d3876ce81561893ee" "checksum clap 2.29.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7b8f59bcebcfe4269b09f71dab0da15b355c75916a8f975d3876ce81561893ee"
"checksum coco 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c06169f5beb7e31c7c67ebf5540b8b472d23e3eade3b2ec7d1f5b504a85f91bd" "checksum coco 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c06169f5beb7e31c7c67ebf5540b8b472d23e3eade3b2ec7d1f5b504a85f91bd"
"checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e"
"checksum crossbeam 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "bd66663db5a988098a89599d4857919b3acf7f61402e61365acfd3919857b9be"
"checksum crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "24ce9782d4d5c53674646a6a4c1863a21a8fc0cb649b3c94dfc16e45071dea19" "checksum crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "24ce9782d4d5c53674646a6a4c1863a21a8fc0cb649b3c94dfc16e45071dea19"
"checksum crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "a2f4a431c5c9f662e1200b7c7f02c34e91361150e382089a8f2dec3ba680cbda" "checksum crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "a2f4a431c5c9f662e1200b7c7f02c34e91361150e382089a8f2dec3ba680cbda"
"checksum difference 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3304d19798a8e067e48d8e69b2c37f0b5e9b4e462504ad9e27e9f3fce02bba8" "checksum difference 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3304d19798a8e067e48d8e69b2c37f0b5e9b4e462504ad9e27e9f3fce02bba8"
@@ -2266,15 +2348,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e2e00e17be181010a91dbfefb01660b17311059dc8c7f48b9017677721e732bd" "checksum mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e2e00e17be181010a91dbfefb01660b17311059dc8c7f48b9017677721e732bd"
"checksum mio 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)" = "7da01a5e23070d92d99b1ecd1cd0af36447c6fd44b0fe283c2db199fa136724f" "checksum mio 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)" = "7da01a5e23070d92d99b1ecd1cd0af36447c6fd44b0fe283c2db199fa136724f"
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
"checksum multiqueue 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4059673f3516669cbf7ebb448cb37171559ed22e6d8bc79cf0cf9394cf9e73fd"
"checksum net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)" = "3a80f842784ef6c9a958b68b7516bc7e35883c614004dd94959a4dca1b716c09" "checksum net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)" = "3a80f842784ef6c9a958b68b7516bc7e35883c614004dd94959a4dca1b716c09"
"checksum nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a2228dca57108069a5262f2ed8bd2e82496d2e074a06d1ccc7ce1687b6ae0a2" "checksum nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a2228dca57108069a5262f2ed8bd2e82496d2e074a06d1ccc7ce1687b6ae0a2"
"checksum num-traits 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "9936036cc70fe4a8b2d338ab665900323290efb03983c86cbe235ae800ad8017" "checksum num-traits 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "9936036cc70fe4a8b2d338ab665900323290efb03983c86cbe235ae800ad8017"
"checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30" "checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30"
"checksum odds 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "4eae0151b9dacf24fcc170d9995e511669a082856a91f958a2fe380bfab3fb22" "checksum odds 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "4eae0151b9dacf24fcc170d9995e511669a082856a91f958a2fe380bfab3fb22"
"checksum ole32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2c49021782e5233cd243168edfa8037574afed4eba4bbaf538b3d8d1789d8c" "checksum ole32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2c49021782e5233cd243168edfa8037574afed4eba4bbaf538b3d8d1789d8c"
"checksum owning_ref 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "9d52571ddcb42e9c900c901a18d8d67e393df723fcd51dd59c5b1a85d0acb6cc"
"checksum owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37" "checksum owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37"
"checksum parity-wasm 0.15.4 (registry+https://github.com/rust-lang/crates.io-index)" = "235801e9531998c4bb307f4ea6833c9f40a4cf132895219ac8c2cd25a9b310f7" "checksum parity-wasm 0.15.4 (registry+https://github.com/rust-lang/crates.io-index)" = "235801e9531998c4bb307f4ea6833c9f40a4cf132895219ac8c2cd25a9b310f7"
"checksum parity-wordlist 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d0dec124478845b142f68b446cbee953d14d4b41f1bc0425024417720dce693" "checksum parity-wordlist 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d0dec124478845b142f68b446cbee953d14d4b41f1bc0425024417720dce693"
"checksum parking_lot 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fa12d706797d42551663426a45e2db2e0364bd1dbf6aeada87e89c5f981f43e9"
"checksum parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e" "checksum parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e"
"checksum parking_lot 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3e7f7c9857874e54afeb950eebeae662b1e51a2493666d2ea4c0a5d91dcf0412" "checksum parking_lot 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3e7f7c9857874e54afeb950eebeae662b1e51a2493666d2ea4c0a5d91dcf0412"
"checksum parking_lot_core 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "9f35048d735bb93dd115a0030498785971aab3234d311fbe273d020084d26bd8" "checksum parking_lot_core 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "9f35048d735bb93dd115a0030498785971aab3234d311fbe273d020084d26bd8"
@@ -2319,6 +2404,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
"checksum slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fdeff4cd9ecff59ec7e3744cbca73dfe5ac35c2aedb2cfba8a1c715a18912e9d" "checksum slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fdeff4cd9ecff59ec7e3744cbca73dfe5ac35c2aedb2cfba8a1c715a18912e9d"
"checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013"
"checksum smallvec 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8266519bc1d17d0b5b16f6c21295625d562841c708f6376f49028a43e9c11e"
"checksum smallvec 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ee4f357e8cd37bf8822e1b964e96fd39e2cb5a0424f8aaa284ccaccc2162411c" "checksum smallvec 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ee4f357e8cd37bf8822e1b964e96fd39e2cb5a0424f8aaa284ccaccc2162411c"
"checksum smallvec 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44db0ecb22921ef790d17ae13a3f6d15784183ff5f2a01aa32098c7498d2b4b9" "checksum smallvec 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44db0ecb22921ef790d17ae13a3f6d15784183ff5f2a01aa32098c7498d2b4b9"
"checksum snappy 0.1.0 (git+https://github.com/paritytech/rust-snappy)" = "<none>" "checksum snappy 0.1.0 (git+https://github.com/paritytech/rust-snappy)" = "<none>"
@@ -2333,6 +2419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum tempdir 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f73eebdb68c14bcb24aef74ea96079830e7fa7b31a6106e42ea7ee887c1e134e" "checksum tempdir 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f73eebdb68c14bcb24aef74ea96079830e7fa7b31a6106e42ea7ee887c1e134e"
"checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096"
"checksum textwrap 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c0b59b6b4b44d867f1370ef1bd91bfb262bf07bf0ae65c202ea2fbc16153b693" "checksum textwrap 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c0b59b6b4b44d867f1370ef1bd91bfb262bf07bf0ae65c202ea2fbc16153b693"
"checksum thread-id 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2af4d6289a69a35c4d3aea737add39685f2784122c28119a7713165a63d68c9d"
"checksum thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "279ef31c19ededf577bfd12dfae728040a21f635b06a24cd670ff510edd38963" "checksum thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "279ef31c19ededf577bfd12dfae728040a21f635b06a24cd670ff510edd38963"
"checksum time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)" = "a15375f1df02096fb3317256ce2cee6a1f42fc84ea5ad5fc8c421cfe40c73098" "checksum time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)" = "a15375f1df02096fb3317256ce2cee6a1f42fc84ea5ad5fc8c421cfe40c73098"
"checksum tiny-keccak 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3e9241752647ca572f12c9b520a5d360d9099360c527770647e694001646a1d0" "checksum tiny-keccak 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3e9241752647ca572f12c9b520a5d360d9099360c527770647e694001646a1d0"
+1
View File
@@ -24,6 +24,7 @@ members = [
"polkadot/statement-table", "polkadot/statement-table",
"polkadot/transaction-pool", "polkadot/transaction-pool",
"polkadot/validator", "polkadot/validator",
"polkadot/service",
"substrate/bft", "substrate/bft",
"substrate/client", "substrate/client",
"substrate/codec", "substrate/codec",
+2 -1
View File
@@ -41,6 +41,7 @@ extern crate log;
pub mod error; pub mod error;
use std::sync::Arc;
use codec::Slicable; use codec::Slicable;
use demo_runtime::genesismap::{additional_storage_with_genesis, GenesisConfig}; use demo_runtime::genesismap::{additional_storage_with_genesis, GenesisConfig};
use client::genesis; use client::genesis;
@@ -98,7 +99,7 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
storage.extend(additional_storage_with_genesis(&block)); storage.extend(additional_storage_with_genesis(&block));
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
}; };
let client = client::new_in_mem(executor, prepare_genesis)?; let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?);
let address = "127.0.0.1:9933".parse().unwrap(); let address = "127.0.0.1:9933".parse().unwrap();
let handler = rpc::rpc_handler(client); let handler = rpc::rpc_handler(client);
+1 -1
View File
@@ -23,4 +23,4 @@ substrate-rpc-servers = { path = "../../substrate/rpc-servers" }
polkadot-primitives = { path = "../primitives" } polkadot-primitives = { path = "../primitives" }
polkadot-executor = { path = "../executor" } polkadot-executor = { path = "../executor" }
polkadot-runtime = { path = "../runtime" } polkadot-runtime = { path = "../runtime" }
polkadot-keystore = { path = "../keystore" } polkadot-service = { path = "../service" }
+1 -5
View File
@@ -22,15 +22,11 @@ error_chain! {
foreign_links { foreign_links {
Io(::std::io::Error) #[doc="IO error"]; Io(::std::io::Error) #[doc="IO error"];
Cli(::clap::Error) #[doc="CLI error"]; Cli(::clap::Error) #[doc="CLI error"];
Service(::service::Error) #[doc="Polkadot service error"];
} }
links { links {
Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"]; Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"];
} }
errors { errors {
/// Key store errors
Keystore(e: ::keystore::Error) {
description("Keystore error"),
display("Keystore error: {:?}", e),
}
} }
} }
+19 -45
View File
@@ -30,10 +30,8 @@ extern crate substrate_rpc_servers as rpc;
extern crate polkadot_primitives; extern crate polkadot_primitives;
extern crate polkadot_executor; extern crate polkadot_executor;
extern crate polkadot_runtime; extern crate polkadot_runtime;
extern crate polkadot_keystore as keystore; extern crate polkadot_service as service;
#[macro_use]
extern crate hex_literal;
#[macro_use] #[macro_use]
extern crate clap; extern crate clap;
#[macro_use] #[macro_use]
@@ -45,11 +43,6 @@ pub mod error;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use codec::Slicable;
use polkadot_runtime::genesismap::{additional_storage_with_genesis, GenesisConfig};
use client::genesis;
use keystore::Store as Keystore;
/// Parse command line arguments and start the node. /// Parse command line arguments and start the node.
/// ///
/// IANA unassigned port ranges that we could use: /// IANA unassigned port ranges that we could use:
@@ -69,52 +62,33 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let log_pattern = matches.value_of("log").unwrap_or(""); let log_pattern = matches.value_of("log").unwrap_or("");
init_logger(log_pattern); init_logger(log_pattern);
// Create client let mut config = service::Configuration::default();
let executor = polkadot_executor::Executor::new();
let mut storage = Default::default();
let god_key = hex!["3d866ec8a9190c8343c2fc593d21d8a6d0c5c4763aaab2349de3a6111d64d124"];
let genesis_config = GenesisConfig { config.keystore_path = matches.value_of("keystore")
validators: vec![god_key.clone()],
authorities: vec![god_key.clone()],
balances: vec![(god_key.clone(), 1u64 << 63)].into_iter().collect(),
block_time: 5, // 5 second block time.
session_length: 720, // that's 1 hour per session.
sessions_per_era: 24, // 24 hours per era.
bonding_duration: 90, // 90 days per bond.
approval_ratio: 667, // 66.7% approvals required for legislation.
};
let prepare_genesis = || {
storage = genesis_config.genesis_map();
let block = genesis::construct_genesis_block(&storage);
storage.extend(additional_storage_with_genesis(&block));
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
};
let keystore_path = matches.value_of("keystore")
.map(|x| Path::new(x).to_owned()) .map(|x| Path::new(x).to_owned())
.unwrap_or_else(default_keystore_path); .unwrap_or_else(default_keystore_path)
.to_string_lossy()
let _keystore = Keystore::open(keystore_path).map_err(::error::ErrorKind::Keystore)?; .into();
let client = client::new_in_mem(executor, prepare_genesis)?;
let address = "127.0.0.1:9933".parse().unwrap();
let handler = rpc::rpc_handler(client);
let server = rpc::start_http(&address, handler)?;
let mut role = service::Role::FULL;
if let Some(_) = matches.subcommand_matches("collator") { if let Some(_) = matches.subcommand_matches("collator") {
info!("Starting collator."); info!("Starting collator.");
server.wait(); role = service::Role::COLLATOR;
return Ok(());
} }
else if let Some(_) = matches.subcommand_matches("validator") {
if let Some(_) = matches.subcommand_matches("validator") {
info!("Starting validator."); info!("Starting validator.");
server.wait(); role = service::Role::VALIDATOR;
return Ok(());
} }
config.roles = role;
let service = service::Service::new(config)?;
let address = "127.0.0.1:9933".parse().unwrap();
let handler = rpc::rpc_handler(service.client());
let server = rpc::start_http(&address, handler)?;
server.wait();
println!("No command given.\n"); println!("No command given.\n");
let _ = clap::App::from_yaml(yaml).print_long_help(); let _ = clap::App::from_yaml(yaml).print_long_help();
+5
View File
@@ -18,3 +18,8 @@ polkadot-transaction-pool = { path = "../transaction-pool" }
substrate-bft = { path = "../../substrate/bft" } substrate-bft = { path = "../../substrate/bft" }
substrate-codec = { path = "../../substrate/codec" } substrate-codec = { path = "../../substrate/codec" }
substrate-primitives = { path = "../../substrate/primitives" } substrate-primitives = { path = "../../substrate/primitives" }
substrate-network = { path = "../../substrate/network" }
tokio-core = "0.1.12"
substrate-keyring = { path = "../../substrate/keyring" }
substrate-client = { path = "../../substrate/client" }
@@ -48,6 +48,10 @@ error_chain! {
::MAX_TRANSACTIONS_SIZE, ::MAX_TRANSACTIONS_SIZE.saturating_sub(*size) ::MAX_TRANSACTIONS_SIZE, ::MAX_TRANSACTIONS_SIZE.saturating_sub(*size)
), ),
} }
Executor(e: ::futures::future::ExecuteErrorKind) {
description("Unable to dispatch agreement future"),
display("Unable to dispatch agreement future: {:?}", e),
}
} }
} }
+8 -2
View File
@@ -41,10 +41,14 @@ extern crate polkadot_transaction_pool as transaction_pool;
extern crate substrate_bft as bft; extern crate substrate_bft as bft;
extern crate substrate_codec as codec; extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives; extern crate substrate_primitives as primitives;
extern crate substrate_network;
extern crate tokio_core;
extern crate substrate_keyring;
extern crate substrate_client as client;
#[macro_use] #[macro_use]
extern crate error_chain; extern crate error_chain;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
@@ -67,8 +71,10 @@ use futures::future;
use parking_lot::Mutex; use parking_lot::Mutex;
pub use self::error::{ErrorKind, Error}; pub use self::error::{ErrorKind, Error};
pub use service::Service;
mod error; mod error;
mod service;
// block size limit. // block size limit.
const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024; const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
@@ -83,7 +89,7 @@ pub trait TableRouter {
type FetchExtrinsic: IntoFuture<Item=Extrinsic,Error=Self::Error>; type FetchExtrinsic: IntoFuture<Item=Extrinsic,Error=Self::Error>;
/// Note local candidate data. /// Note local candidate data.
fn local_candidate_data(&self, block_data: BlockData, extrinsic: Extrinsic); fn local_candidate_data(&self, hash: Hash, block_data: BlockData, extrinsic: Extrinsic);
/// Fetch block data for a specific candidate. /// Fetch block data for a specific candidate.
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate; fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate;
+233
View File
@@ -0,0 +1,233 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Consensus service.
/// Consensus service. A long runnung service that manages BFT agreement and parachain
/// candidate agreement over the network.
use std::thread;
use std::sync::Arc;
use futures::{future, Future, Stream, Sink, Async, Canceled};
use parking_lot::Mutex;
use substrate_network as net;
use tokio_core::reactor;
use client::BlockchainEvents;
use substrate_keyring::Keyring;
use primitives::{Hash, AuthorityId};
use primitives::block::{Id as BlockId, HeaderHash, Header};
use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt};
use polkadot_api::PolkadotApi;
use bft::{self, BftService};
use transaction_pool::TransactionPool;
use ed25519;
use super::{TableRouter, SharedTable, ProposerFactory};
use error::Error;
struct BftSink<E> {
network: Arc<net::ConsensusService>,
_e: ::std::marker::PhantomData<E>,
}
fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_hash: HeaderHash) -> Result<bft::Communication, bft::Error> {
Ok(match msg {
net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
let proposal = bft::generic::LocalizedProposal {
round_number: proposal.round_number as usize,
proposal: proposal.proposal,
digest: proposal.digest,
sender: proposal.sender,
digest_signature: ed25519::LocalizedSignature {
signature: proposal.digest_signature,
signer: ed25519::Public(proposal.sender),
},
full_signature: ed25519::LocalizedSignature {
signature: proposal.full_signature,
signer: ed25519::Public(proposal.sender),
}
};
bft::check_proposal(authorities, &parent_hash, &proposal)?;
proposal
}),
net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
let vote = bft::generic::LocalizedVote {
sender: vote.sender,
signature: ed25519::LocalizedSignature {
signature: vote.signature,
signer: ed25519::Public(vote.sender),
},
vote: match vote.vote {
net::ConsensusVote::Prepare(r, h) => bft::generic::Vote::Prepare(r as usize, h),
net::ConsensusVote::Commit(r, h) => bft::generic::Vote::Commit(r as usize, h),
net::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize),
}
};
bft::check_vote(authorities, &parent_hash, &vote)?;
vote
}),
}),
net::BftMessage::Auxiliary(a) => {
let justification = bft::UncheckedJustification::from(a);
// TODO: get proper error
let justification: Result<_, bft::Error> = bft::check_prepare_justification(authorities, parent_hash, justification)
.map_err(|_| bft::ErrorKind::InvalidJustification.into());
bft::generic::Communication::Auxiliary(justification?)
},
})
}
impl<E> Sink for BftSink<E> {
type SinkItem = bft::Communication;
// TODO: replace this with the ! type when that's stabilized
type SinkError = E;
fn start_send(&mut self, message: bft::Communication) -> ::futures::StartSend<bft::Communication, E> {
let network_message = match message {
bft::generic::Communication::Consensus(c) => net::BftMessage::Consensus(match c {
bft::generic::LocalizedMessage::Propose(proposal) => net::SignedConsensusMessage::Propose(net::SignedConsensusProposal {
round_number: proposal.round_number as u32,
proposal: proposal.proposal,
digest: proposal.digest,
sender: proposal.sender,
digest_signature: proposal.digest_signature.signature,
full_signature: proposal.full_signature.signature,
}),
bft::generic::LocalizedMessage::Vote(vote) => net::SignedConsensusMessage::Vote(net::SignedConsensusVote {
sender: vote.sender,
signature: vote.signature.signature,
vote: match vote.vote {
bft::generic::Vote::Prepare(r, h) => net::ConsensusVote::Prepare(r as u32, h),
bft::generic::Vote::Commit(r, h) => net::ConsensusVote::Commit(r as u32, h),
bft::generic::Vote::AdvanceRound(r) => net::ConsensusVote::AdvanceRound(r as u32),
}
}),
}),
bft::generic::Communication::Auxiliary(justification) => net::BftMessage::Auxiliary(justification.uncheck().into()),
};
self.network.send_bft_message(network_message);
Ok(::futures::AsyncSink::Ready)
}
fn poll_complete(&mut self) -> ::futures::Poll<(), E> {
Ok(Async::Ready(()))
}
}
/// Consensus service. Starts working when created.
pub struct Service {
thread: Option<thread::JoinHandle<()>>,
}
struct Network(Arc<net::ConsensusService>);
impl Service {
/// Create and start a new instance.
pub fn new<C>(client: Arc<C>, network: Arc<net::ConsensusService>, transaction_pool: Arc<Mutex<TransactionPool>>, best_header: &Header) -> Service
where C: BlockchainEvents + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
{
let best_header = best_header.clone();
let thread = thread::spawn(move || {
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
let key = Arc::new(Keyring::One.into());
let factory = ProposerFactory {
client: client.clone(),
transaction_pool: transaction_pool.clone(),
network: Network(network.clone()),
};
let bft_service = BftService::new(client.clone(), key, factory);
let build_bft = |header: &Header| -> Result<_, Error> {
let hash = header.hash();
let authorities = client.authorities(&BlockId::Hash(hash))?;
let input = network.bft_messages()
.filter_map(move |message| {
process_message(message, &authorities, hash.clone())
.map_err(|e| debug!("Message validation failed: {:?}", e))
.ok()
})
.map_err(|_| bft::InputStreamConcluded.into());
let output = BftSink { network: network.clone(), _e: Default::default() };
Ok(bft_service.build_upon(&header, input, output)?)
};
// Kickstart BFT agreement on start.
if let Err(e) = build_bft(&best_header)
.map_err(|e| debug!("Error creating initial BFT agreement: {:?}", e))
.and_then(|bft| core.run(bft))
{
debug!("Error starting initial BFT agreement: {:?}", e);
}
let bft = client.import_notification_stream().and_then(|notification| {
build_bft(&notification.header).map_err(|e| debug!("BFT agreement error: {:?}", e))
}).for_each(|f| f);
if let Err(e) = core.run(bft) {
debug!("BFT event loop error {:?}", e);
}
});
Service {
thread: Some(thread)
}
}
}
impl Drop for Service {
fn drop(&mut self) {
if let Some(thread) = self.thread.take() {
thread.join().expect("The service thread has panicked");
}
}
}
impl super::Network for Network {
type TableRouter = Router;
fn table_router(&self, _table: Arc<SharedTable>) -> Self::TableRouter {
Router {
network: self.0.clone()
}
}
}
type FetchCandidateAdapter = future::Map<net::FetchFuture, fn(Vec<u8>) -> BlockData>;
struct Router {
network: Arc<net::ConsensusService>,
}
impl Router {
fn fetch_candidate_adapter(data: Vec<u8>) -> BlockData {
BlockData(data)
}
}
impl TableRouter for Router {
type Error = Canceled;
type FetchCandidate = FetchCandidateAdapter;
type FetchExtrinsic = future::FutureResult<Extrinsic, Self::Error>;
fn local_candidate_data(&self, hash: Hash, block_data: BlockData, _extrinsic: Extrinsic) {
let data = block_data.0;
self.network.set_local_candidate(Some((hash, data)))
}
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate {
let hash = candidate.hash();
self.network.fetch_candidate(&hash).map(Self::fetch_candidate_adapter)
}
fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic {
future::ok(Extrinsic)
}
}
+26
View File
@@ -0,0 +1,26 @@
[package]
name = "polkadot-service"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1.17"
parking_lot = "0.4"
tokio-timer = "0.1.2"
ed25519 = { path = "../../substrate/ed25519" }
error-chain = "0.11"
log = "0.4"
tokio-core = "0.1.12"
polkadot-primitives = { path = "../primitives" }
polkadot-runtime = { path = "../runtime" }
polkadot-consensus = { path = "../consensus" }
polkadot-executor = { path = "../executor" }
polkadot-api = { path = "../api" }
polkadot-transaction-pool = { path = "../transaction-pool" }
polkadot-keystore = { path = "../keystore" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-network = { path = "../../substrate/network" }
substrate-client = { path = "../../substrate/client" }
substrate-keyring = { path = "../../substrate/keyring" }
substrate-codec = { path = "../../substrate/codec" }
substrate-executor = { path = "../../substrate/executor" }
+46
View File
@@ -0,0 +1,46 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
//! Service configuration.
use transaction_pool;
pub use network::Role;
pub use network::NetworkConfiguration;
/// Service configuration.
pub struct Configuration {
/// Node roles.
pub roles: Role,
/// Transaction pool configuration.
pub transaction_pool: transaction_pool::Options,
/// Network configuration.
pub network: NetworkConfiguration,
/// Path to key files.
pub keystore_path: String,
// TODO: add more network, client, tx pool configuration options
}
impl Default for Configuration {
fn default() -> Configuration {
Configuration {
roles: Role::FULL,
transaction_pool: Default::default(),
network: Default::default(),
keystore_path: Default::default(),
}
}
}
+35
View File
@@ -0,0 +1,35 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Errors that can occur during the service operation.
use client;
use network;
error_chain! {
links {
Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"];
Network(network::error::Error, network::error::ErrorKind) #[doc="Network error"];
}
errors {
/// Key store errors
Keystore(e: ::keystore::Error) {
description("Keystore error"),
display("Keystore error: {:?}", e),
}
}
}
+216
View File
@@ -0,0 +1,216 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Polkadot service. Starts a thread that spins the network, the client and the transaction pool.
//! Manages communication between them.
extern crate futures;
extern crate ed25519;
extern crate parking_lot;
extern crate tokio_timer;
extern crate polkadot_primitives;
extern crate polkadot_runtime;
extern crate polkadot_executor;
extern crate polkadot_api;
extern crate polkadot_consensus as consensus;
extern crate polkadot_transaction_pool as transaction_pool;
extern crate polkadot_keystore as keystore;
extern crate substrate_primitives as primitives;
extern crate substrate_network as network;
extern crate substrate_codec as codec;
extern crate substrate_executor;
extern crate tokio_core;
extern crate substrate_keyring;
extern crate substrate_client as client;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
mod error;
mod config;
use std::sync::Arc;
use std::thread;
use futures::prelude::*;
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use codec::Slicable;
use primitives::block::{Id as BlockId, TransactionHash};
use transaction_pool::TransactionPool;
use substrate_keyring::Keyring;
use substrate_executor::NativeExecutor;
use polkadot_executor::Executor as LocalDispatch;
use polkadot_primitives::AccountId;
use keystore::Store as Keystore;
use polkadot_api::PolkadotApi;
use polkadot_runtime::genesismap::{additional_storage_with_genesis, GenesisConfig};
use client::{genesis, BlockchainEvents};
use client::in_mem::Backend as InMemory;
use network::ManageNetwork;
pub use self::error::{ErrorKind, Error};
pub use config::{Configuration, Role};
type Client = client::Client<InMemory, NativeExecutor<LocalDispatch>>;
/// Polkadot service.
pub struct Service {
thread: Option<thread::JoinHandle<()>>,
client: Arc<Client>,
network: Arc<network::Service>,
_consensus: Option<consensus::Service>,
}
struct TransactionPoolAdapter {
pool: Arc<Mutex<TransactionPool>>,
client: Arc<Client>,
}
impl network::TransactionPool for TransactionPoolAdapter {
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
let best_block = match self.client.info() {
Ok(info) => info.chain.best_hash,
Err(e) => {
debug!("Error getting best block: {:?}", e);
return Vec::new();
}
};
let id = self.client.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed.");
let ready = transaction_pool::Ready::create(id, &*self.client);
self.pool.lock().pending(ready).map(|t| {
let hash = ::primitives::Hash::from(&t.hash()[..]);
let tx = codec::Slicable::encode(t.as_transaction());
(hash, tx)
}).collect()
}
fn import(&self, transaction: &[u8]) -> Option<TransactionHash> {
if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) {
match self.pool.lock().import(tx) {
Ok(t) => Some(t.hash()[..].into()),
Err(e) => match *e.kind() {
transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()),
_ => {
debug!("Error adding transaction to the pool: {:?}", e);
None
},
}
}
} else {
debug!("Error decoding transaction");
None
}
}
}
impl Service {
/// Creates and register protocol with the network service
pub fn new(config: Configuration) -> Result<Service, error::Error> {
// Create client
let executor = polkadot_executor::Executor::new();
let mut storage = Default::default();
let key: AccountId = Keyring::One.into();
let genesis_config = GenesisConfig {
validators: vec![key.clone()],
authorities: vec![key.clone()],
balances: vec![(Keyring::One.into(), 1u64 << 63), (Keyring::Two.into(), 1u64 << 63)].into_iter().collect(),
block_time: 5, // 5 second block time.
session_length: 720, // that's 1 hour per session.
sessions_per_era: 24, // 24 hours per era.
bonding_duration: 90, // 90 days per bond.
approval_ratio: 667, // 66.7% approvals required for legislation.
};
let prepare_genesis = || {
storage = genesis_config.genesis_map();
let block = genesis::construct_genesis_block(&storage);
storage.extend(additional_storage_with_genesis(&block));
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
};
let _keystore = Keystore::open(config.keystore_path.into()).map_err(::error::ErrorKind::Keystore)?;
let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?);
let best_header = client.header(&BlockId::Hash(client.info()?.chain.best_hash))?.expect("Best header always exists; qed");
info!("Starting Polkadot. Best block is #{}", best_header.number);
let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool)));
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
pool: transaction_pool.clone(),
client: client.clone(),
});
let network_params = network::Params {
config: network::ProtocolConfig {
roles: config.roles,
},
network_config: config.network,
chain: client.clone(),
transaction_pool: transaction_pool_adapter,
};
let network = network::Service::new(network_params)?;
// Spin consensus service if configured
let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR {
Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool, &best_header))
} else {
None
};
let thread_client = client.clone();
let thread_network = network.clone();
let thread = thread::spawn(move || {
thread_network.start_network();
let mut core = Core::new().expect("tokio::Core could not be created");
let events = thread_client.import_notification_stream().for_each(|notification| {
thread_network.on_block_imported(&notification.header);
Ok(())
});
if let Err(e) = core.run(events) {
debug!("Polkadot service event loop shutdown with {:?}", e);
}
debug!("Polkadot service shutdown");
});
Ok(Service {
thread: Some(thread),
client: client.clone(),
network: network.clone(),
_consensus: consensus_service,
})
}
/// Get shared client instance.
pub fn client(&self) -> Arc<Client> {
self.client.clone()
}
/// Get shared network instance.
pub fn network(&self) -> Arc<network::Service> {
self.network.clone()
}
}
impl Drop for Service {
fn drop(&mut self) {
self.client.stop_notifications();
self.network.stop_network();
if let Some(thread) = self.thread.take() {
thread.join().expect("The service thread has panicked");
}
}
}
+13 -1
View File
@@ -60,6 +60,11 @@ error_chain! {
description("Transaction had bad signature."), description("Transaction had bad signature."),
display("Transaction had bad signature."), display("Transaction had bad signature."),
} }
/// Attempted to queue a transaction that is already in the pool.
AlreadyImported(hash: TransactionHash) {
description("Transaction is already in the pool."),
display("Transaction {:?} is already in the pool.", hash),
}
/// Import error. /// Import error.
Import(err: Box<::std::error::Error + Send>) { Import(err: Box<::std::error::Error + Send>) {
description("Error importing transaction"), description("Error importing transaction"),
@@ -257,8 +262,15 @@ impl TransactionPool {
let verified = VerifiedTransaction::create(tx, insertion_index)?; let verified = VerifiedTransaction::create(tx, insertion_index)?;
// TODO: just use a foreign link when the error type is made public. // TODO: just use a foreign link when the error type is made public.
let hash = verified.hash.clone();
self.inner.import(verified) self.inner.import(verified)
.map_err(|e| ErrorKind::Import(Box::new(e))) .map_err(|e|
match e {
// TODO: make error types public in transaction_pool. For now just treat all errors as AlradyImported
_ => ErrorKind::AlreadyImported(hash),
// transaction_pool::error::AlreadyImported(h) => ErrorKind::AlreadyImported(h),
// e => ErrorKind::Import(Box::new(e)),
})
.map_err(Into::into) .map_err(Into::into)
} }
+16 -4
View File
@@ -42,10 +42,22 @@ error_chain! {
display("Unable to create block proposal."), display("Unable to create block proposal."),
} }
/// Error dispatching the agreement future onto the executor. /// Error checking signature
Executor(e: ::futures::future::ExecuteErrorKind) { InvalidSignature(s: ::ed25519::Signature, a: ::primitives::AuthorityId) {
description("Unable to dispatch agreement future"), description("Message signature is invalid"),
display("Unable to dispatch agreement future: {:?}", e), display("Message signature {:?} by {:?} is invalid.", s, a),
}
/// Account is not an authority.
InvalidAuthority(a: ::primitives::AuthorityId) {
description("Message sender is not a valid authority"),
display("Message sender {:?} is not a valid authority.", a),
}
/// Justification requirements not met.
InvalidJustification {
description("Invalid justification"),
display("Invalid justification."),
} }
/// Some other error. /// Some other error.
+198 -48
View File
@@ -41,8 +41,7 @@ use primitives::bft::{Message as PrimitiveMessage, Action as PrimitiveAction, Ju
use primitives::block::{Block, Id as BlockId, Header, HeaderHash}; use primitives::block::{Block, Id as BlockId, Header, HeaderHash};
use primitives::AuthorityId; use primitives::AuthorityId;
use futures::{stream, task, Async, Sink, Future, IntoFuture}; use futures::{task, Async, Stream, Sink, Future, IntoFuture};
use futures::future::Executor;
use futures::sync::oneshot; use futures::sync::oneshot;
use tokio_timer::Timer; use tokio_timer::Timer;
use parking_lot::Mutex; use parking_lot::Mutex;
@@ -219,34 +218,25 @@ impl<P: Proposer> generic::Context for BftInstance<P> {
} }
} }
type Input<E> = stream::Empty<Communication, E>;
// "black hole" output sink.
struct Output<E>(::std::marker::PhantomData<E>);
impl<E> Sink for Output<E> {
type SinkItem = Communication;
type SinkError = E;
fn start_send(&mut self, _item: Communication) -> ::futures::StartSend<Communication, E> {
Ok(::futures::AsyncSink::Ready)
}
fn poll_complete(&mut self) -> ::futures::Poll<(), E> {
Ok(Async::Ready(()))
}
}
/// A future that resolves either when canceled (witnessing a block from the network at same height) /// A future that resolves either when canceled (witnessing a block from the network at same height)
/// or when agreement completes. /// or when agreement completes.
pub struct BftFuture<P: Proposer, I> { pub struct BftFuture<P, I, InStream, OutSink> where
inner: generic::Agreement<BftInstance<P>, Input<P::Error>, Output<P::Error>>, P: Proposer,
InStream: Stream<Item=Communication, Error=P::Error>,
OutSink: Sink<SinkItem=Communication, SinkError=P::Error>,
{
inner: generic::Agreement<BftInstance<P>, InStream, OutSink>,
cancel: Arc<AtomicBool>, cancel: Arc<AtomicBool>,
send_task: Option<oneshot::Sender<task::Task>>, send_task: Option<oneshot::Sender<task::Task>>,
import: Arc<I>, import: Arc<I>,
} }
impl<P: Proposer, I: BlockImport> Future for BftFuture<P, I> { impl<P, I, InStream, OutSink> Future for BftFuture<P, I, InStream, OutSink> where
P: Proposer,
I: BlockImport,
InStream: Stream<Item=Communication, Error=P::Error>,
OutSink: Sink<SinkItem=Communication, SinkError=P::Error>,
{
type Item = (); type Item = ();
type Error = (); type Error = ();
@@ -274,7 +264,11 @@ impl<P: Proposer, I: BlockImport> Future for BftFuture<P, I> {
} }
} }
impl<P: Proposer, I> Drop for BftFuture<P, I> { impl<P, I, InStream, OutSink> Drop for BftFuture<P, I, InStream, OutSink> where
P: Proposer,
InStream: Stream<Item=Communication, Error=P::Error>,
OutSink: Sink<SinkItem=Communication, SinkError=P::Error>,
{
fn drop(&mut self) { fn drop(&mut self) {
// TODO: have a trait member to pass misbehavior reports into. // TODO: have a trait member to pass misbehavior reports into.
let misbehavior = self.inner.drain_misbehavior().collect::<Vec<_>>(); let misbehavior = self.inner.drain_misbehavior().collect::<Vec<_>>();
@@ -304,9 +298,8 @@ impl Drop for AgreementHandle {
/// The BftService kicks off the agreement process on top of any blocks it /// The BftService kicks off the agreement process on top of any blocks it
/// is notified of. /// is notified of.
pub struct BftService<P, E, I> { pub struct BftService<P, I> {
client: Arc<I>, client: Arc<I>,
executor: E,
live_agreements: Mutex<HashMap<HeaderHash, AgreementHandle>>, live_agreements: Mutex<HashMap<HeaderHash, AgreementHandle>>,
timer: Timer, timer: Timer,
round_timeout_multiplier: u64, round_timeout_multiplier: u64,
@@ -314,17 +307,33 @@ pub struct BftService<P, E, I> {
factory: P, factory: P,
} }
impl<P, E, I> BftService<P, E, I> impl<P, I> BftService<P, I>
where where
P: ProposerFactory, P: ProposerFactory,
E: Executor<BftFuture<P::Proposer, I>>,
I: BlockImport + Authorities, I: BlockImport + Authorities,
{ {
/// Create a new service instance.
pub fn new(client: Arc<I>, key: Arc<ed25519::Pair>, factory: P) -> BftService<P, I> {
BftService {
client: client,
live_agreements: Mutex::new(HashMap::new()),
timer: Timer::default(),
round_timeout_multiplier: 4,
key: key, // TODO: key changing over time.
factory: factory,
}
}
/// Signal that a valid block with the given header has been imported. /// Signal that a valid block with the given header has been imported.
/// ///
/// If the local signing key is an authority, this will begin the consensus process to build a /// If the local signing key is an authority, this will begin the consensus process to build a
/// block on top of it. If the executor fails to run the future, an error will be returned. /// block on top of it. If the executor fails to run the future, an error will be returned.
pub fn build_upon(&self, header: &Header) -> Result<(), P::Error> { pub fn build_upon<InStream, OutSink>(&self, header: &Header, input: InStream, output: OutSink)
-> Result<BftFuture<<P as ProposerFactory>::Proposer, I, InStream, OutSink>, P::Error> where
InStream: Stream<Item=Communication, Error=<<P as ProposerFactory>::Proposer as Proposer>::Error>,
OutSink: Sink<SinkItem=Communication, SinkError=<<P as ProposerFactory>::Proposer as Proposer>::Error>,
{
let hash = header.hash(); let hash = header.hash();
let mut _preempted_consensus = None; // defers drop of live to the end. let mut _preempted_consensus = None; // defers drop of live to the end.
@@ -337,7 +346,7 @@ impl<P, E, I> BftService<P, E, I>
if !authorities.contains(&local_id) { if !authorities.contains(&local_id) {
self.live_agreements.lock().remove(&header.parent_hash); self.live_agreements.lock().remove(&header.parent_hash);
return Ok(()) Err(From::from(ErrorKind::InvalidAuthority(local_id)))?;
} }
let proposer = self.factory.init(header, &authorities, self.key.clone())?; let proposer = self.factory.init(header, &authorities, self.key.clone())?;
@@ -355,25 +364,18 @@ impl<P, E, I> BftService<P, E, I>
bft_instance, bft_instance,
n, n,
max_faulty, max_faulty,
stream::empty(), input,
Output(Default::default()), output,
); );
let cancel = Arc::new(AtomicBool::new(false)); let cancel = Arc::new(AtomicBool::new(false));
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.executor.execute(BftFuture {
inner: agreement,
cancel: cancel.clone(),
send_task: Some(tx),
import: self.client.clone(),
}).map_err(|e| e.kind()).map_err(ErrorKind::Executor).map_err(Error::from)?;
{ {
let mut live = self.live_agreements.lock(); let mut live = self.live_agreements.lock();
live.insert(hash, AgreementHandle { live.insert(hash, AgreementHandle {
task: Some(rx), task: Some(rx),
cancel, cancel: cancel.clone(),
}); });
// cancel any agreements attempted to build upon this block's parent // cancel any agreements attempted to build upon this block's parent
@@ -381,7 +383,12 @@ impl<P, E, I> BftService<P, E, I>
_preempted_consensus = live.remove(&header.parent_hash); _preempted_consensus = live.remove(&header.parent_hash);
} }
Ok(()) Ok(BftFuture {
inner: agreement,
cancel: cancel,
send_task: Some(tx),
import: self.client.clone(),
})
} }
} }
@@ -391,9 +398,16 @@ pub fn max_faulty_of(n: usize) -> usize {
n.saturating_sub(1) / 3 n.saturating_sub(1) / 3
} }
/// Given a total number of authorities, yield the minimum required signatures.
/// This will always be over 2/3.
pub fn bft_threshold(n: usize) -> usize {
n - max_faulty_of(n)
}
fn check_justification_signed_message(authorities: &[AuthorityId], message: &[u8], just: UncheckedJustification) fn check_justification_signed_message(authorities: &[AuthorityId], message: &[u8], just: UncheckedJustification)
-> Result<Justification, UncheckedJustification> -> Result<Justification, UncheckedJustification>
{ {
// TODO: return additional error information.
just.check(authorities.len() - max_faulty_of(authorities.len()), |_, _, sig| { just.check(authorities.len() - max_faulty_of(authorities.len()), |_, _, sig| {
let auth_id = sig.signer.0; let auth_id = sig.signer.0;
if !authorities.contains(&auth_id) { return None } if !authorities.contains(&auth_id) { return None }
@@ -436,6 +450,58 @@ pub fn check_prepare_justification(authorities: &[AuthorityId], parent: HeaderHa
check_justification_signed_message(authorities, &message[..], just) check_justification_signed_message(authorities, &message[..], just)
} }
/// Check proposal message signatures and authority.
/// Provide all valid authorities.
pub fn check_proposal(
authorities: &[AuthorityId],
parent_hash: &HeaderHash,
propose: &::generic::LocalizedProposal<Block, HeaderHash, AuthorityId, LocalizedSignature>)
-> Result<(), Error>
{
if !authorities.contains(&propose.sender) {
return Err(ErrorKind::InvalidAuthority(propose.sender.into()).into());
}
let action_header = PrimitiveAction::ProposeHeader(propose.round_number as u32, propose.digest.clone());
let action_propose = PrimitiveAction::Propose(propose.round_number as u32, propose.proposal.clone());
check_action(action_header, parent_hash, &propose.digest_signature)?;
check_action(action_propose, parent_hash, &propose.full_signature)
}
/// Check vote message signatures and authority.
/// Provide all valid authorities.
pub fn check_vote(
authorities: &[AuthorityId],
parent_hash: &HeaderHash,
vote: &::generic::LocalizedVote<HeaderHash, AuthorityId, LocalizedSignature>)
-> Result<(), Error>
{
if !authorities.contains(&vote.sender) {
return Err(ErrorKind::InvalidAuthority(vote.sender.into()).into());
}
let action = match vote.vote {
::generic::Vote::Prepare(r, h) => PrimitiveAction::Prepare(r as u32, h),
::generic::Vote::Commit(r, h) => PrimitiveAction::Commit(r as u32, h),
::generic::Vote::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32),
};
check_action(action, parent_hash, &vote.signature)
}
fn check_action(action: PrimitiveAction, parent_hash: &HeaderHash, sig: &LocalizedSignature) -> Result<(), Error> {
let primitive = PrimitiveMessage {
parent: parent_hash.clone(),
action,
};
let message = Slicable::encode(&primitive);
if ed25519::verify_strong(&sig.signature, &message, &sig.signer) {
Ok(())
} else {
Err(ErrorKind::InvalidSignature(sig.signature.into(), sig.signer.clone().into()).into())
}
}
/// Sign a BFT message with the given key. /// Sign a BFT message with the given key.
pub fn sign_message(message: Message, key: &ed25519::Pair, parent_hash: HeaderHash) -> LocalizedMessage { pub fn sign_message(message: Message, key: &ed25519::Pair, parent_hash: HeaderHash) -> LocalizedMessage {
let signer = key.public(); let signer = key.public();
@@ -489,8 +555,10 @@ mod tests {
use super::*; use super::*;
use std::collections::HashSet; use std::collections::HashSet;
use primitives::block; use primitives::block;
use self::tokio_core::reactor::{Core, Handle}; use self::tokio_core::reactor::{Core};
use self::keyring::Keyring; use self::keyring::Keyring;
use futures::stream;
use futures::future::Executor;
extern crate substrate_keyring as keyring; extern crate substrate_keyring as keyring;
extern crate tokio_core; extern crate tokio_core;
@@ -512,6 +580,22 @@ mod tests {
} }
} }
// "black hole" output sink.
struct Output<E>(::std::marker::PhantomData<E>);
impl<E> Sink for Output<E> {
type SinkItem = Communication;
type SinkError = E;
fn start_send(&mut self, _item: Communication) -> ::futures::StartSend<Communication, E> {
Ok(::futures::AsyncSink::Ready)
}
fn poll_complete(&mut self) -> ::futures::Poll<(), E> {
Ok(Async::Ready(()))
}
}
struct DummyFactory; struct DummyFactory;
struct DummyProposer(block::Number); struct DummyProposer(block::Number);
@@ -543,12 +627,11 @@ mod tests {
fn import_misbehavior(&self, _misbehavior: Vec<(AuthorityId, Misbehavior)>) {} fn import_misbehavior(&self, _misbehavior: Vec<(AuthorityId, Misbehavior)>) {}
} }
fn make_service(client: FakeClient, handle: Handle) fn make_service(client: FakeClient)
-> BftService<DummyFactory, Handle, FakeClient> -> BftService<DummyFactory, FakeClient>
{ {
BftService { BftService {
client: Arc::new(client), client: Arc::new(client),
executor: handle,
live_agreements: Mutex::new(HashMap::new()), live_agreements: Mutex::new(HashMap::new()),
timer: Timer::default(), timer: Timer::default(),
round_timeout_multiplier: 4, round_timeout_multiplier: 4,
@@ -578,7 +661,7 @@ mod tests {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let service = make_service(client, core.handle()); let service = make_service(client);
let first = Header::from_block_number(2); let first = Header::from_block_number(2);
let first_hash = first.hash(); let first_hash = first.hash();
@@ -587,16 +670,18 @@ mod tests {
second.parent_hash = first_hash; second.parent_hash = first_hash;
let second_hash = second.hash(); let second_hash = second.hash();
service.build_upon(&first).unwrap(); let bft = service.build_upon(&first, stream::empty(), Output(Default::default())).unwrap();
assert!(service.live_agreements.lock().contains_key(&first_hash)); assert!(service.live_agreements.lock().contains_key(&first_hash));
// turn the core so the future gets polled and sends its task to the // turn the core so the future gets polled and sends its task to the
// service. otherwise it deadlocks. // service. otherwise it deadlocks.
core.handle().execute(bft).unwrap();
core.turn(Some(::std::time::Duration::from_millis(100))); core.turn(Some(::std::time::Duration::from_millis(100)));
service.build_upon(&second).unwrap(); let bft = service.build_upon(&second, stream::empty(), Output(Default::default())).unwrap();
assert!(!service.live_agreements.lock().contains_key(&first_hash)); assert!(!service.live_agreements.lock().contains_key(&first_hash));
assert!(service.live_agreements.lock().contains_key(&second_hash)); assert!(service.live_agreements.lock().contains_key(&second_hash));
core.handle().execute(bft).unwrap();
core.turn(Some(::std::time::Duration::from_millis(100))); core.turn(Some(::std::time::Duration::from_millis(100)));
} }
@@ -671,4 +756,69 @@ mod tests {
assert!(check_justification(&authorities, parent_hash, unchecked).is_err()); assert!(check_justification(&authorities, parent_hash, unchecked).is_err());
} }
#[test]
fn propose_check_works() {
let parent_hash = Default::default();
let authorities = vec![
Keyring::Alice.to_raw_public(),
Keyring::Eve.to_raw_public(),
];
let block = Block {
header: Header::from_block_number(1),
transactions: Default::default()
};
let proposal = sign_message(::generic::Message::Propose(1, block.clone()), &Keyring::Alice.pair(), parent_hash);;
if let ::generic::LocalizedMessage::Propose(proposal) = proposal {
assert!(check_proposal(&authorities, &parent_hash, &proposal).is_ok());
let mut invalid_round = proposal.clone();
invalid_round.round_number = 0;
assert!(check_proposal(&authorities, &parent_hash, &invalid_round).is_err());
let mut invalid_digest = proposal.clone();
invalid_digest.digest = [0xfe; 32].into();
assert!(check_proposal(&authorities, &parent_hash, &invalid_digest).is_err());
} else {
assert!(false);
}
// Not an authority
let proposal = sign_message(::generic::Message::Propose(1, block), &Keyring::Bob.pair(), parent_hash);;
if let ::generic::LocalizedMessage::Propose(proposal) = proposal {
assert!(check_proposal(&authorities, &parent_hash, &proposal).is_err());
} else {
assert!(false);
}
}
#[test]
fn vote_check_works() {
let parent_hash = Default::default();
let hash = [0xff; 32].into();
let authorities = vec![
Keyring::Alice.to_raw_public(),
Keyring::Eve.to_raw_public(),
];
let vote = sign_message(::generic::Message::Vote(::generic::Vote::Prepare(1, hash)), &Keyring::Alice.pair(), parent_hash);;
if let ::generic::LocalizedMessage::Vote(vote) = vote {
assert!(check_vote(&authorities, &parent_hash, &vote).is_ok());
let mut invalid_sender = vote.clone();
invalid_sender.signature.signer = Keyring::Eve.into();
assert!(check_vote(&authorities, &parent_hash, &invalid_sender).is_err());
} else {
assert!(false);
}
// Not an authority
let vote = sign_message(::generic::Message::Vote(::generic::Vote::Prepare(1, hash)), &Keyring::Bob.pair(), parent_hash);;
if let ::generic::LocalizedMessage::Vote(vote) = vote {
assert!(check_vote(&authorities, &parent_hash, &vote).is_err());
} else {
assert!(false);
}
}
} }
+1
View File
@@ -9,6 +9,7 @@ log = "0.3"
parking_lot = "0.4" parking_lot = "0.4"
triehash = "0.1" triehash = "0.1"
hex-literal = "0.1" hex-literal = "0.1"
multiqueue = "0.3"
ed25519 = { path = "../ed25519" } ed25519 = { path = "../ed25519" }
substrate-bft = { path = "../bft" } substrate-bft = { path = "../bft" }
substrate-codec = { path = "../codec" } substrate-codec = { path = "../codec" }
+83 -5
View File
@@ -16,6 +16,8 @@
//! Substrate Client //! Substrate Client
use multiqueue;
use parking_lot::Mutex;
use primitives::{self, block, AuthorityId}; use primitives::{self, block, AuthorityId};
use primitives::block::Id as BlockId; use primitives::block::Id as BlockId;
use primitives::storage::{StorageKey, StorageData}; use primitives::storage::{StorageKey, StorageData};
@@ -27,11 +29,24 @@ use backend::{self, BlockImportOperation};
use blockchain::{self, Info as ChainInfo, Backend as ChainBackend}; use blockchain::{self, Info as ChainInfo, Backend as ChainBackend};
use {error, in_mem, block_builder, runtime_io, bft}; use {error, in_mem, block_builder, runtime_io, bft};
/// Type that implements `futures::Stream` of block import events.
pub type BlockchainEventStream = multiqueue::BroadcastFutReceiver<BlockImportNotification>;
//TODO: The queue is preallocated in multiqueue. Make it unbounded
const NOTIFICATION_QUEUE_SIZE: u64 = 1 << 16;
/// Polkadot Client /// Polkadot Client
#[derive(Debug)]
pub struct Client<B, E> where B: backend::Backend { pub struct Client<B, E> where B: backend::Backend {
backend: B, backend: B,
executor: E, executor: E,
import_notification_sink: Mutex<multiqueue::BroadcastFutSender<BlockImportNotification>>,
import_notification_stream: Mutex<multiqueue::BroadcastFutReceiver<BlockImportNotification>>,
}
/// A source of blockchain evenets.
pub trait BlockchainEvents {
/// Get block import event stream.
fn import_notification_stream(&self) -> BlockchainEventStream;
} }
/// Client info /// Client info
@@ -82,6 +97,34 @@ pub enum BlockStatus {
Unknown, Unknown,
} }
/// Block data origin.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BlockOrigin {
/// Genesis block built into the client.
Genesis,
/// Block is part of the initial sync with the network.
NetworkInitialSync,
/// Block was broadcasted on the network.
NetworkBroadcast,
/// Block that was received from the network and validated in the consensus process.
ConsensusBroadcast,
/// Block that was collated by this node.
Own,
/// Block was imported from a file.
File,
}
/// Summary of an imported block
#[derive(Clone, Debug)]
pub struct BlockImportNotification {
/// Imported block origin.
pub origin: BlockOrigin,
/// Imported block header.
pub header: block::Header,
/// Is this the new best block.
pub is_new_best: bool,
}
/// A header paired with a justification which has already been checked. /// A header paired with a justification which has already been checked.
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]
pub struct JustifiedHeader { pub struct JustifiedHeader {
@@ -122,6 +165,7 @@ impl<B, E> Client<B, E> where
where where
F: FnOnce() -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>) F: FnOnce() -> (block::Header, Vec<(Vec<u8>, Vec<u8>)>)
{ {
let (sink, stream) = multiqueue::broadcast_fut_queue(NOTIFICATION_QUEUE_SIZE);
if backend.blockchain().header(BlockId::Number(0))?.is_none() { if backend.blockchain().header(BlockId::Number(0))?.is_none() {
trace!("Empty database, writing genesis block"); trace!("Empty database, writing genesis block");
let (genesis_header, genesis_store) = build_genesis(); let (genesis_header, genesis_store) = build_genesis();
@@ -133,6 +177,8 @@ impl<B, E> Client<B, E> where
Ok(Client { Ok(Client {
backend, backend,
executor, executor,
import_notification_sink: Mutex::new(sink),
import_notification_stream: Mutex::new(stream),
}) })
} }
@@ -164,6 +210,13 @@ impl<B, E> Client<B, E> where
self.executor.clone() self.executor.clone()
} }
/// Close notification streams.
pub fn stop_notifications(&self) {
let (sink, stream) = multiqueue::broadcast_fut_queue(NOTIFICATION_QUEUE_SIZE);
*self.import_notification_sink.lock() = sink;
*self.import_notification_stream.lock() = stream;
}
/// Get the current set of authorities from storage. /// Get the current set of authorities from storage.
pub fn authorities_at(&self, id: &BlockId) -> error::Result<Vec<AuthorityId>> { pub fn authorities_at(&self, id: &BlockId) -> error::Result<Vec<AuthorityId>> {
let state = self.state_at(id)?; let state = self.state_at(id)?;
@@ -236,6 +289,7 @@ impl<B, E> Client<B, E> where
/// Queue a block for import. /// Queue a block for import.
pub fn import_block( pub fn import_block(
&self, &self,
origin: BlockOrigin,
header: JustifiedHeader, header: JustifiedHeader,
body: Option<block::Body>, body: Option<block::Body>,
) -> error::Result<ImportResult> { ) -> error::Result<ImportResult> {
@@ -261,9 +315,21 @@ impl<B, E> Client<B, E> where
let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1; let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1;
trace!("Imported {}, (#{}), best={}", block::HeaderHash::from(header.blake2_256()), header.number, is_new_best); trace!("Imported {}, (#{}), best={}", block::HeaderHash::from(header.blake2_256()), header.number, is_new_best);
transaction.set_block_data(header, body, Some(justification.uncheck().into()), is_new_best)?; transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?;
transaction.set_storage(overlay.drain())?; transaction.set_storage(overlay.drain())?;
self.backend.commit_operation(transaction)?; self.backend.commit_operation(transaction)?;
if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast {
let notification = BlockImportNotification {
origin: origin,
header: header,
is_new_best: is_new_best,
};
if let Err(e) = self.import_notification_sink.lock().try_send(notification) {
warn!("Error queueing block import notification: {:?}", e);
}
}
Ok(ImportResult::Queued) Ok(ImportResult::Queued)
} }
@@ -335,7 +401,7 @@ impl<B, E> bft::BlockImport for Client<B, E>
justification, justification,
}; };
let _ = self.import_block(justified_header, Some(block.transactions)); let _ = self.import_block(BlockOrigin::Genesis, justified_header, Some(block.transactions));
} }
} }
@@ -350,6 +416,18 @@ impl<B, E> bft::Authorities for Client<B, E>
} }
} }
impl<B, E> BlockchainEvents for Client<B, E>
where
B: backend::Backend,
E: state_machine::CodeExecutor,
error::Error: From<<B::State as state_machine::backend::Backend>::Error>
{
/// Get block import event stream.
fn import_notification_stream(&self) -> BlockchainEventStream {
self.import_notification_stream.lock().add_stream()
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -455,7 +533,7 @@ mod tests {
let justification = justify(&block.header); let justification = justify(&block.header);
let justified = client.check_justification(block.header, justification).unwrap(); let justified = client.check_justification(block.header, justification).unwrap();
client.import_block(justified, Some(block.transactions)).unwrap(); client.import_block(BlockOrigin::Own, justified, Some(block.transactions)).unwrap();
assert_eq!(client.info().unwrap().chain.best_number, 1); assert_eq!(client.info().unwrap().chain.best_number, 1);
assert_eq!(client.using_environment(|| test_runtime::system::latest_block_hash()).unwrap(), client.block_hash(1).unwrap().unwrap()); assert_eq!(client.using_environment(|| test_runtime::system::latest_block_hash()).unwrap(), client.block_hash(1).unwrap().unwrap());
@@ -499,7 +577,7 @@ mod tests {
let justification = justify(&block.header); let justification = justify(&block.header);
let justified = client.check_justification(block.header, justification).unwrap(); let justified = client.check_justification(block.header, justification).unwrap();
client.import_block(justified, Some(block.transactions)).unwrap(); client.import_block(BlockOrigin::Own, justified, Some(block.transactions)).unwrap();
assert_eq!(client.info().unwrap().chain.best_number, 1); assert_eq!(client.info().unwrap().chain.best_number, 1);
assert!(client.state_at(&BlockId::Number(1)).unwrap() != client.state_at(&BlockId::Number(0)).unwrap()); assert!(client.state_at(&BlockId::Number(1)).unwrap() != client.state_at(&BlockId::Number(0)).unwrap());
+3 -1
View File
@@ -31,6 +31,7 @@ extern crate ed25519;
extern crate triehash; extern crate triehash;
extern crate parking_lot; extern crate parking_lot;
extern crate multiqueue;
#[cfg(test)] #[macro_use] extern crate hex_literal; #[cfg(test)] #[macro_use] extern crate hex_literal;
#[macro_use] extern crate error_chain; #[macro_use] extern crate error_chain;
#[macro_use] extern crate log; #[macro_use] extern crate log;
@@ -43,5 +44,6 @@ pub mod genesis;
pub mod block_builder; pub mod block_builder;
mod client; mod client;
pub use client::{Client, ClientInfo, CallResult, ImportResult, BlockStatus, new_in_mem}; pub use client::{Client, ClientInfo, CallResult, ImportResult,
BlockStatus, BlockOrigin, new_in_mem, BlockchainEventStream, BlockchainEvents};
pub use blockchain::Info as ChainInfo; pub use blockchain::Info as ChainInfo;
+4
View File
@@ -16,13 +16,17 @@ bitflags = "1.0"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
futures = "0.1.17"
multiqueue = "0.3"
ethcore-network = { git = "https://github.com/paritytech/parity.git" } ethcore-network = { git = "https://github.com/paritytech/parity.git" }
ethcore-io = { git = "https://github.com/paritytech/parity.git" } ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ed25519 = { path = "../../substrate/ed25519" }
substrate-primitives = { path = "../../substrate/primitives" } substrate-primitives = { path = "../../substrate/primitives" }
substrate-client = { path = "../../substrate/client" } substrate-client = { path = "../../substrate/client" }
substrate-state-machine = { path = "../../substrate/state-machine" } substrate-state-machine = { path = "../../substrate/state-machine" }
substrate-serializer = { path = "../../substrate/serializer" } substrate-serializer = { path = "../../substrate/serializer" }
substrate-runtime-support = { path = "../../substrate/runtime-support" } substrate-runtime-support = { path = "../../substrate/runtime-support" }
substrate-bft = { path = "../../substrate/bft" }
[dev-dependencies] [dev-dependencies]
substrate-test-runtime = { path = "../test-runtime" } substrate-test-runtime = { path = "../test-runtime" }
+6 -5
View File
@@ -16,15 +16,15 @@
//! Blockchain access trait //! Blockchain access trait
use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus}; use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin};
use client::error::Error; use client::error::Error;
use state_machine; use state_machine;
use primitives::block::{self, Id as BlockId}; use primitives::block::{self, Id as BlockId};
use primitives::bft::Justification; use primitives::bft::Justification;
pub trait Client: Send + Sync { pub trait Client: Send + Sync {
/// Given a hash return a header /// Import a new block. Parent is supposed to be existing in the blockchain.
fn import(&self, header: block::Header, justification: Justification, body: Option<block::Body>) -> Result<ImportResult, Error>; fn import(&self, is_best: bool, header: block::Header, justification: Justification, body: Option<block::Body>) -> Result<ImportResult, Error>;
/// Get blockchain info. /// Get blockchain info.
fn info(&self) -> Result<ClientInfo, Error>; fn info(&self) -> Result<ClientInfo, Error>;
@@ -50,10 +50,11 @@ impl<B, E> Client for PolkadotClient<B, E> where
E: state_machine::CodeExecutor + Send + Sync + 'static, E: state_machine::CodeExecutor + Send + Sync + 'static,
Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>, { Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>, {
fn import(&self, header: block::Header, justification: Justification, body: Option<block::Body>) -> Result<ImportResult, Error> { fn import(&self, is_best: bool, header: block::Header, justification: Justification, body: Option<block::Body>) -> Result<ImportResult, Error> {
// TODO: defer justification check. // TODO: defer justification check.
let justified_header = self.check_justification(header, justification.into())?; let justified_header = self.check_justification(header, justification.into())?;
(self as &PolkadotClient<B, E>).import_block(justified_header, body) let origin = if is_best { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync };
(self as &PolkadotClient<B, E>).import_block(origin, justified_header, body)
} }
fn info(&self) -> Result<ClientInfo, Error> { fn info(&self) -> Result<ClientInfo, Error> {
+2 -1
View File
@@ -14,11 +14,12 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.? // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
use service::Role; pub use service::Role;
/// Protocol configuration /// Protocol configuration
#[derive(Clone)] #[derive(Clone)]
pub struct ProtocolConfig { pub struct ProtocolConfig {
/// Assigned roles.
pub roles: Role, pub roles: Role,
} }
@@ -0,0 +1,203 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
//! Consensus related bits of the network service.
use std::collections::HashMap;
use multiqueue;
use futures::sync::oneshot;
use io::SyncIo;
use protocol::Protocol;
use network::PeerId;
use primitives::Hash;
use message::{self, Message};
use runtime_support::Hashable;
//TODO: The queue is preallocated in multiqueue. Make it unbounded
const QUEUE_SIZE: u64 = 1 << 16;
struct CandidateRequest {
id: message::RequestId,
completion: oneshot::Sender<Vec<u8>>,
}
struct PeerConsensus {
candidate_fetch: Option<CandidateRequest>,
candidate_available: Option<Hash>,
}
/// Consensus network protocol handler. Manages statements and candidate requests.
pub struct Consensus {
peers: HashMap<PeerId, PeerConsensus>,
our_candidate: Option<(Hash, Vec<u8>)>,
statement_sink: multiqueue::BroadcastFutSender<message::Statement>,
statement_stream: multiqueue::BroadcastFutReceiver<message::Statement>,
bft_message_sink: multiqueue::BroadcastFutSender<message::BftMessage>,
bft_message_stream: multiqueue::BroadcastFutReceiver<message::BftMessage>,
}
impl Consensus {
/// Create a new instance.
pub fn new() -> Consensus {
let (statement_sink, statement_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE);
let (bft_sink, bft_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE);
Consensus {
peers: HashMap::new(),
our_candidate: None,
statement_sink: statement_sink,
statement_stream: statement_stream,
bft_message_sink: bft_sink,
bft_message_stream: bft_stream,
}
}
/// Closes all notification streams.
pub fn restart(&mut self) {
let (statement_sink, statement_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE);
let (bft_sink, bft_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE);
self.statement_sink = statement_sink;
self.statement_stream = statement_stream;
self.bft_message_sink = bft_sink;
self.bft_message_stream = bft_stream;
}
/// Handle new connected peer.
pub fn new_peer(&mut self, _io: &mut SyncIo, _protocol: &Protocol, peer_id: PeerId, roles: &[message::Role]) {
if roles.iter().any(|r| *r == message::Role::Validator) {
trace!(target:"sync", "Registering validator {}", peer_id);
self.peers.insert(peer_id, PeerConsensus {
candidate_fetch: None,
candidate_available: None,
});
}
}
pub fn on_statement(&mut self, peer_id: PeerId, statement: message::Statement) {
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
// TODO: validate signature?
match &statement.statement {
&message::UnsignedStatement::Candidate(ref receipt) => peer.candidate_available = Some(Hash::from(receipt.blake2_256())),
&message::UnsignedStatement::Available(ref hash) => peer.candidate_available = Some(*hash),
&message::UnsignedStatement::Valid(_) | &message::UnsignedStatement::Invalid(_) => (),
}
if let Err(e) = self.statement_sink.try_send(statement) {
trace!(target:"sync", "Error broadcasting statement notification: {:?}", e);
}
} else {
trace!(target:"sync", "Ignored statement from unregistered peer {}", peer_id);
}
}
pub fn statements(&self) -> multiqueue::BroadcastFutReceiver<message::Statement>{
self.statement_stream.add_stream()
}
pub fn on_bft_message(&mut self, peer_id: PeerId, message: message::BftMessage) {
if self.peers.contains_key(&peer_id) {
// TODO: validate signature?
if let Err(e) = self.bft_message_sink.try_send(message) {
trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e);
}
} else {
trace!(target:"sync", "Ignored BFT statement from unregistered peer {}", peer_id);
}
}
pub fn bft_messages(&self) -> multiqueue::BroadcastFutReceiver<message::BftMessage>{
self.bft_message_stream.add_stream()
}
pub fn fetch_candidate(&mut self, io: &mut SyncIo, protocol: &Protocol, hash: &Hash) -> oneshot::Receiver<Vec<u8>> {
// Request from the first peer that has it available.
// TODO: random peer selection.
trace!(target:"sync", "Trying to fetch candidate {:?}", hash);
let (sender, receiver) = oneshot::channel();
if let Some((peer_id, ref mut peer)) = self.peers.iter_mut()
.find(|&(_, ref peer)| peer.candidate_fetch.is_none() && peer.candidate_available.as_ref().map_or(false, |h| h == hash)) {
trace!(target:"sync", "Fetching candidate from {}", peer_id);
let id = 0; //TODO: generate unique id
peer.candidate_fetch = Some(CandidateRequest {
id: id,
completion: sender,
});
let request = message::CandidateRequest {
id: id,
hash: *hash,
};
protocol.send_message(io, *peer_id, Message::CandidateRequest(request));
}
// If no peer found `sender` is dropped and `receiver` is canceled immediatelly.
return receiver;
}
pub fn send_statement(&mut self, io: &mut SyncIo, protocol: &Protocol, statement: message::Statement) {
// Broadcast statement to all validators.
trace!(target:"sync", "Broadcasting statement {:?}", statement);
for peer in self.peers.keys() {
protocol.send_message(io, *peer, Message::Statement(statement.clone()));
}
}
pub fn send_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, message: message::BftMessage) {
// Broadcast message to all validators.
trace!(target:"sync", "Broadcasting BFT message {:?}", message);
for peer in self.peers.keys() {
protocol.send_message(io, *peer, Message::BftMessage(message.clone()));
}
}
pub fn set_local_candidate(&mut self, candidate: Option<(Hash, Vec<u8>)>) {
trace!(target:"sync", "Set local candidate to {:?}", candidate.as_ref().map(|&(h, _)| h));
self.our_candidate = candidate;
}
pub fn on_candidate_request(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, request: message::CandidateRequest) {
let response = match self.our_candidate {
Some((ref hash, ref data)) if *hash == request.hash => Some(data.clone()),
_ => None,
};
let msg = message::CandidateResponse {
id: request.id,
data: response,
};
protocol.send_message(io, peer_id, Message::CandidateResponse(msg));
}
pub fn on_candidate_response(&mut self, io: &mut SyncIo, _protocol: &Protocol, peer_id: PeerId, response: message::CandidateResponse) {
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
if let Some(request) = peer.candidate_fetch.take() {
if response.id == request.id {
if let Some(data) = response.data {
if let Err(e) = request.completion.send(data) {
trace!(target:"sync", "Error sending candidate data notification: {:?}", e);
}
}
} else {
trace!(target:"sync", "Unexpected candidate response from {}", peer_id);
io.disable_peer(peer_id);
}
} else {
trace!(target:"sync", "Unexpected candidate response from {}", peer_id);
io.disable_peer(peer_id);
}
}
}
pub fn peer_disconnected(&mut self, _io: &mut SyncIo, _protocol: &Protocol, peer_id: PeerId) {
self.peers.remove(&peer_id);
}
}
+3 -1
View File
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.? // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
//! Polkadot service possible errors.
use network::Error as NetworkError; use network::Error as NetworkError;
use client; use client;
@@ -23,7 +25,7 @@ error_chain! {
} }
links { links {
Client(client::error::Error, client::error::ErrorKind); Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"];
} }
errors { errors {
+10 -2
View File
@@ -28,8 +28,12 @@ extern crate substrate_state_machine as state_machine;
extern crate substrate_serializer as ser; extern crate substrate_serializer as ser;
extern crate substrate_client as client; extern crate substrate_client as client;
extern crate substrate_runtime_support as runtime_support; extern crate substrate_runtime_support as runtime_support;
extern crate substrate_bft;
extern crate serde; extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate futures;
extern crate multiqueue;
extern crate ed25519;
#[macro_use] extern crate serde_derive; #[macro_use] extern crate serde_derive;
#[macro_use] extern crate log; #[macro_use] extern crate log;
#[macro_use] extern crate bitflags; #[macro_use] extern crate bitflags;
@@ -47,17 +51,21 @@ mod sync;
mod protocol; mod protocol;
mod io; mod io;
mod message; mod message;
mod error;
mod config; mod config;
mod chain; mod chain;
mod blocks; mod blocks;
mod consensus;
pub mod error;
#[cfg(test)] mod test; #[cfg(test)] mod test;
pub use service::Service; pub use service::{Service, FetchFuture, StatementStream, ConsensusService, BftMessageStream, TransactionPool, Params, ManageNetwork};
pub use protocol::{ProtocolStatus}; pub use protocol::{ProtocolStatus};
pub use sync::{Status as SyncStatus, SyncState}; pub use sync::{Status as SyncStatus, SyncState};
pub use network::{NonReservedPeerMode, ConnectionFilter, ConnectionDirection, NetworkConfiguration}; pub use network::{NonReservedPeerMode, ConnectionFilter, ConnectionDirection, NetworkConfiguration};
pub use message::{Statement, BftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal};
pub use error::Error;
pub use config::{Role, ProtocolConfig};
// TODO: move it elsewhere // TODO: move it elsewhere
fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash { fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash {
+124 -9
View File
@@ -16,17 +16,15 @@
//! Network packet message types. These get serialized and put into the lower level protocol payload. //! Network packet message types. These get serialized and put into the lower level protocol payload.
use std::borrow::Borrow; use primitives::{AuthorityId, Hash};
use primitives::AuthorityId; use primitives::block::{Number as BlockNumber, HeaderHash, Header, Body, Block};
use primitives::block::{Number as BlockNumber, HeaderHash, Header, Body};
use primitives::bft::Justification; use primitives::bft::Justification;
use service::Role as RoleFlags; use service::Role as RoleFlags;
use ed25519;
pub type RequestId = u64; pub type RequestId = u64;
type Bytes = Vec<u8>; type Bytes = Vec<u8>;
type Signature = ::primitives::hash::H256; //TODO:
/// Configured node role. /// Configured node role.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Role { pub enum Role {
@@ -40,10 +38,10 @@ pub enum Role {
Collator, Collator,
} }
impl<T> From<T> for RoleFlags where T: Borrow<[Role]> { impl Role {
fn from(roles: T) -> RoleFlags { /// Convert enum to service flags.
pub fn as_flags(roles: &[Role]) -> RoleFlags {
let mut flags = RoleFlags::NONE; let mut flags = RoleFlags::NONE;
let roles: &[Role] = roles.borrow();
for r in roles { for r in roles {
match *r { match *r {
Role::Full => flags = flags | RoleFlags::FULL, Role::Full => flags = flags | RoleFlags::FULL,
@@ -126,6 +124,95 @@ pub enum Direction {
Descending, Descending,
} }
/// A set of transactions.
pub type Transactions = Vec<Vec<u8>>;
/// Statements circulated among peers.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum UnsignedStatement {
/// Broadcast by a authority to indicate that this is his candidate for
/// inclusion.
///
/// Broadcasting two different candidate messages per round is not allowed.
Candidate(Vec<u8>),
/// Broadcast by a authority to attest that the candidate with given digest
/// is valid.
Valid(Hash),
/// Broadcast by a authority to attest that the auxiliary data for a candidate
/// with given digest is available.
Available(Hash),
/// Broadcast by a authority to attest that the candidate with given digest
/// is invalid.
Invalid(Hash),
}
/// A signed statement.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct Statement {
/// The statement.
pub statement: UnsignedStatement,
/// The signature.
pub signature: ed25519::Signature,
/// The sender.
pub sender: AuthorityId,
}
/// Communication that can occur between participants in consensus.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum BftMessage {
/// A consensus message (proposal or vote)
Consensus(SignedConsensusMessage),
/// Auxiliary communication (just proof-of-lock for now).
Auxiliary(Justification),
}
/// A localized proposal message. Contains two signed pieces of data.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct SignedConsensusProposal {
/// The round number.
pub round_number: u32,
/// The proposal sent.
pub proposal: Block,
/// The digest of the proposal.
pub digest: Hash,
/// The sender of the proposal
pub sender: AuthorityId,
/// The signature on the message (propose, round number, digest)
pub digest_signature: ed25519::Signature,
/// The signature on the message (propose, round number, proposal)
pub full_signature: ed25519::Signature,
}
/// A localized vote message, including the sender.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct SignedConsensusVote {
/// The message sent.
pub vote: ConsensusVote,
/// The sender of the message
pub sender: AuthorityId,
/// The signature of the message.
pub signature: ed25519::Signature,
}
/// Votes during a consensus round.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum ConsensusVote {
/// Prepare to vote for proposal with digest D.
Prepare(u32, Hash),
/// Commit to proposal with digest D..
Commit(u32, Hash),
/// Propose advancement to a new round.
AdvanceRound(u32),
}
/// A localized message.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum SignedConsensusMessage {
/// A proposal.
Propose(SignedConsensusProposal),
/// A vote.
Vote(SignedConsensusVote),
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
/// A network message. /// A network message.
pub enum Message { pub enum Message {
@@ -137,6 +224,16 @@ pub enum Message {
BlockResponse(BlockResponse), BlockResponse(BlockResponse),
/// Block announce. /// Block announce.
BlockAnnounce(BlockAnnounce), BlockAnnounce(BlockAnnounce),
/// Transactions.
Transactions(Transactions),
/// Consensus statement.
Statement(Statement),
/// Candidate data request.
CandidateRequest(CandidateRequest),
/// Candidate response.
CandidateResponse(CandidateResponse),
/// BFT Consensus statement.
BftMessage(BftMessage),
} }
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
@@ -152,7 +249,7 @@ pub struct Status {
/// Genesis block hash. /// Genesis block hash.
pub genesis_hash: HeaderHash, pub genesis_hash: HeaderHash,
/// Signatue of `best_hash` made with validator address. Required for the validator role. /// Signatue of `best_hash` made with validator address. Required for the validator role.
pub validator_signature: Option<Signature>, pub validator_signature: Option<ed25519::Signature>,
/// Validator address. Required for the validator role. /// Validator address. Required for the validator role.
pub validator_id: Option<AuthorityId>, pub validator_id: Option<AuthorityId>,
/// Parachain id. Required for the collator role. /// Parachain id. Required for the collator role.
@@ -176,6 +273,24 @@ pub struct BlockRequest {
pub max: Option<u32>, pub max: Option<u32>,
} }
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// Request candidate block data from a peer.
pub struct CandidateRequest {
/// Unique request id.
pub id: RequestId,
/// Candidate receipt hash.
pub hash: Hash,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// Candidate block data response.
pub struct CandidateResponse {
/// Unique request id.
pub id: RequestId,
/// Candidate data. Empty if the peer does not have the candidate anymore.
pub data: Option<Vec<u8>>,
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
/// Response to `BlockRequest` /// Response to `BlockRequest`
pub struct BlockResponse { pub struct BlockResponse {
+115 -16
View File
@@ -18,14 +18,18 @@ use std::collections::{HashMap, HashSet, BTreeMap};
use std::{mem, cmp}; use std::{mem, cmp};
use std::sync::Arc; use std::sync::Arc;
use std::time; use std::time;
use parking_lot::RwLock; use parking_lot::{RwLock, Mutex};
use multiqueue;
use futures::sync::oneshot;
use serde_json; use serde_json;
use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId}; use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId};
use primitives::Hash;
use network::{PeerId, NodeId}; use network::{PeerId, NodeId};
use message::{self, Message}; use message::{self, Message};
use sync::{ChainSync, Status as SyncStatus}; use sync::{ChainSync, Status as SyncStatus, SyncState};
use service::Role; use consensus::Consensus;
use service::{Role, TransactionPool};
use config::ProtocolConfig; use config::ProtocolConfig;
use chain::Client; use chain::Client;
use io::SyncIo; use io::SyncIo;
@@ -44,10 +48,12 @@ pub struct Protocol {
chain: Arc<Client>, chain: Arc<Client>,
genesis_hash: HeaderHash, genesis_hash: HeaderHash,
sync: RwLock<ChainSync>, sync: RwLock<ChainSync>,
/// All connected peers consensus: Mutex<Consensus>,
// All connected peers
peers: RwLock<HashMap<PeerId, Peer>>, peers: RwLock<HashMap<PeerId, Peer>>,
/// Connected peers pending Status message. // Connected peers pending Status message.
handshaking_peers: RwLock<HashMap<PeerId, time::Instant>>, handshaking_peers: RwLock<HashMap<PeerId, time::Instant>>,
transaction_pool: Arc<TransactionPool>,
} }
/// Syncing status and statistics /// Syncing status and statistics
@@ -75,8 +81,8 @@ struct Peer {
block_request: Option<message::BlockRequest>, block_request: Option<message::BlockRequest>,
/// Request timestamp /// Request timestamp
request_timestamp: Option<time::Instant>, request_timestamp: Option<time::Instant>,
/// Holds a set of transactions recently sent to this peer to avoid spamming. /// Holds a set of transactions known to this peer.
_last_sent_transactions: HashSet<TransactionHash>, known_transactions: HashSet<TransactionHash>,
/// Request counter, /// Request counter,
next_request_id: message::RequestId, next_request_id: message::RequestId,
} }
@@ -104,15 +110,17 @@ pub struct TransactionStats {
impl Protocol { impl Protocol {
/// Create a new instance. /// Create a new instance.
pub fn new(config: ProtocolConfig, chain: Arc<Client>) -> error::Result<Protocol> { pub fn new(config: ProtocolConfig, chain: Arc<Client>, transaction_pool: Arc<TransactionPool>) -> error::Result<Protocol> {
let info = chain.info()?; let info = chain.info()?;
let protocol = Protocol { let protocol = Protocol {
config: config, config: config,
chain: chain, chain: chain,
genesis_hash: info.chain.genesis_hash, genesis_hash: info.chain.genesis_hash,
sync: RwLock::new(ChainSync::new(&info)), sync: RwLock::new(ChainSync::new(&info)),
consensus: Mutex::new(Consensus::new()),
peers: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()),
handshaking_peers: RwLock::new(HashMap::new()), handshaking_peers: RwLock::new(HashMap::new()),
transaction_pool: transaction_pool,
}; };
Ok(protocol) Ok(protocol)
} }
@@ -168,7 +176,12 @@ impl Protocol {
}, },
Message::BlockAnnounce(announce) => { Message::BlockAnnounce(announce) => {
self.on_block_announce(io, peer_id, announce); self.on_block_announce(io, peer_id, announce);
} },
Message::Statement(s) => self.on_statement(io, peer_id, s),
Message::CandidateRequest(r) => self.on_candidate_request(io, peer_id, r),
Message::CandidateResponse(r) => self.on_candidate_response(io, peer_id, r),
Message::BftMessage(m) => self.on_bft_message(io, peer_id, m),
Message::Transactions(m) => self.on_transactions(io, peer_id, m),
} }
} }
@@ -209,11 +222,12 @@ impl Protocol {
peers.remove(&peer).is_some() peers.remove(&peer).is_some()
}; };
if removed { if removed {
self.consensus.lock().peer_disconnected(io, self, peer);
self.sync.write().peer_disconnected(io, self, peer); self.sync.write().peer_disconnected(io, self, peer);
} }
} }
pub fn on_block_request(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest) { fn on_block_request(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest) {
trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max); trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max);
let mut blocks = Vec::new(); let mut blocks = Vec::new();
let mut id = match request.from { let mut id = match request.from {
@@ -264,12 +278,63 @@ impl Protocol {
self.send_message(io, peer, Message::BlockResponse(response)) self.send_message(io, peer, Message::BlockResponse(response))
} }
pub fn on_block_response(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest, response: message::BlockResponse) { fn on_block_response(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest, response: message::BlockResponse) {
// TODO: validate response // TODO: validate response
trace!(target: "sync", "BlockResponse {} from {} with {} blocks", response.id, peer, response.blocks.len()); trace!(target: "sync", "BlockResponse {} from {} with {} blocks", response.id, peer, response.blocks.len());
self.sync.write().on_block_data(io, self, peer, request, response); self.sync.write().on_block_data(io, self, peer, request, response);
} }
fn on_candidate_request(&self, io: &mut SyncIo, peer: PeerId, request: message::CandidateRequest) {
trace!(target: "sync", "CandidateRequest {} from {} for {}", request.id, peer, request.hash);
self.consensus.lock().on_candidate_request(io, self, peer, request);
}
fn on_candidate_response(&self, io: &mut SyncIo, peer: PeerId, response: message::CandidateResponse) {
trace!(target: "sync", "CandidateResponse {} from {} with {:?} bytes", response.id, peer, response.data.as_ref().map(|d| d.len()));
self.consensus.lock().on_candidate_response(io, self, peer, response);
}
fn on_statement(&self, _io: &mut SyncIo, peer: PeerId, statement: message::Statement) {
trace!(target: "sync", "Statement from {}: {:?}", peer, statement);
self.consensus.lock().on_statement(peer, statement);
}
fn on_bft_message(&self, _io: &mut SyncIo, peer: PeerId, message: message::BftMessage) {
trace!(target: "sync", "BFT message from {}: {:?}", peer, message);
self.consensus.lock().on_bft_message(peer, message);
}
/// See `ConsensusService` trait.
pub fn send_bft_message(&self, io: &mut SyncIo, message: message::BftMessage) {
self.consensus.lock().send_bft_message(io, self, message)
}
/// See `ConsensusService` trait.
pub fn bft_messages(&self) -> multiqueue::BroadcastFutReceiver<message::BftMessage> {
self.consensus.lock().bft_messages()
}
/// See `ConsensusService` trait.
pub fn statements(&self) -> multiqueue::BroadcastFutReceiver<message::Statement> {
self.consensus.lock().statements()
}
/// See `ConsensusService` trait.
pub fn fetch_candidate(&self, io: &mut SyncIo, hash: &Hash) -> oneshot::Receiver<Vec<u8>> {
self.consensus.lock().fetch_candidate(io, self, hash)
}
/// See `ConsensusService` trait.
pub fn send_statement(&self, io: &mut SyncIo, statement: message::Statement) {
self.consensus.lock().send_statement(io, self, statement)
}
/// See `ConsensusService` trait.
pub fn set_local_candidate(&self, candidate: Option<(Hash, Vec<u8>)>) {
self.consensus.lock().set_local_candidate(candidate)
}
/// Perform time based maintenance.
pub fn tick(&self, io: &mut SyncIo) { pub fn tick(&self, io: &mut SyncIo) {
self.maintain_peers(io); self.maintain_peers(io);
} }
@@ -334,12 +399,12 @@ impl Protocol {
let peer = Peer { let peer = Peer {
protocol_version: status.version, protocol_version: status.version,
roles: status.roles.into(), roles: message::Role::as_flags(&status.roles),
best_hash: status.best_hash, best_hash: status.best_hash,
best_number: status.best_number, best_number: status.best_number,
block_request: None, block_request: None,
request_timestamp: None, request_timestamp: None,
_last_sent_transactions: HashSet::new(), known_transactions: HashSet::new(),
next_request_id: 0, next_request_id: 0,
}; };
peers.insert(peer_id.clone(), peer); peers.insert(peer_id.clone(), peer);
@@ -347,6 +412,42 @@ impl Protocol {
debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id)); debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id));
} }
self.sync.write().new_peer(io, self, peer_id); self.sync.write().new_peer(io, self, peer_id);
self.consensus.lock().new_peer(io, self, peer_id, &status.roles);
}
/// Called when peer sends us new transactions
fn on_transactions(&self, _io: &mut SyncIo, peer_id: PeerId, transactions: message::Transactions) {
// Accept transactions only when fully synced
if self.sync.read().status().state != SyncState::Idle {
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
return;
}
trace!(target: "sync", "Received {} transactions from {}", peer_id, transactions.len());
let mut peers = self.peers.write();
if let Some(ref mut peer) = peers.get_mut(&peer_id) {
for t in transactions {
if let Some(hash) = self.transaction_pool.import(&t) {
peer.known_transactions.insert(hash);
}
}
}
}
/// Called when peer sends us new transactions
pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(TransactionHash, Vec<u8>)]) {
// Accept transactions only when fully synced
if self.sync.read().status().state != SyncState::Idle {
return;
}
let mut peers = self.peers.write();
for (peer_id, ref mut peer) in peers.iter_mut() {
let to_send: Vec<_> = transactions.iter().filter_map(|&(hash, ref t)|
if peer.known_transactions.insert(hash.clone()) { Some(t.clone()) } else { None }).collect();
if !to_send.is_empty() {
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), peer_id);
self.send_message(io, *peer_id, Message::Transactions(to_send));
}
}
} }
/// Send Status message /// Send Status message
@@ -373,6 +474,7 @@ impl Protocol {
sync.clear(); sync.clear();
peers.clear(); peers.clear();
handshaking_peers.clear(); handshaking_peers.clear();
self.consensus.lock().restart();
} }
pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce) { pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce) {
@@ -384,9 +486,6 @@ impl Protocol {
self.sync.write().update_chain_info(&header); self.sync.write().update_chain_info(&header);
} }
pub fn on_new_transactions(&self) {
}
pub fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> { pub fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats> {
BTreeMap::new() BTreeMap::new()
} }
+98 -8
View File
@@ -17,25 +17,42 @@
use std::sync::Arc; use std::sync::Arc;
use std::collections::{BTreeMap}; use std::collections::{BTreeMap};
use std::io; use std::io;
use multiqueue;
use futures::sync::oneshot;
use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId, use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId,
NetworkConfiguration , NonReservedPeerMode, ErrorKind}; NetworkConfiguration , NonReservedPeerMode, ErrorKind};
use primitives::block::{TransactionHash, Header}; use primitives::block::{TransactionHash, Header};
use primitives::Hash;
use core_io::{TimerToken}; use core_io::{TimerToken};
use io::NetSyncIo; use io::NetSyncIo;
use protocol::{Protocol, ProtocolStatus, PeerInfo as ProtocolPeerInfo, TransactionStats}; use protocol::{Protocol, ProtocolStatus, PeerInfo as ProtocolPeerInfo, TransactionStats};
use config::{ProtocolConfig}; use config::{ProtocolConfig};
use error::Error; use error::Error;
use chain::Client; use chain::Client;
use message::{Statement, BftMessage};
/// Polkadot devp2p protocol id /// Polkadot devp2p protocol id
pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot"; pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot";
/// Type that represents fetch completion future.
pub type FetchFuture = oneshot::Receiver<Vec<u8>>;
/// Type that represents statement stream.
pub type StatementStream = multiqueue::BroadcastFutReceiver<Statement>;
/// Type that represents bft messages stream.
pub type BftMessageStream = multiqueue::BroadcastFutReceiver<BftMessage>;
bitflags! { bitflags! {
/// Node roles bitmask.
pub struct Role: u32 { pub struct Role: u32 {
/// No network.
const NONE = 0b00000000; const NONE = 0b00000000;
/// Full node, doe not participate in consensus.
const FULL = 0b00000001; const FULL = 0b00000001;
/// Light client node.
const LIGHT = 0b00000010; const LIGHT = 0b00000010;
/// Act as a validator.
const VALIDATOR = 0b00000100; const VALIDATOR = 0b00000100;
/// Act as a collator.
const COLLATOR = 0b00001000; const COLLATOR = 0b00001000;
} }
} }
@@ -52,6 +69,39 @@ pub trait SyncProvider: Send + Sync {
fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats>; fn transactions_stats(&self) -> BTreeMap<TransactionHash, TransactionStats>;
} }
/// Transaction pool interface
pub trait TransactionPool: Send + Sync {
/// Get transactions from the pool that are ready to be propagated.
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)>;
/// Import a transction into the pool.
fn import(&self, transaction: &[u8]) -> Option<TransactionHash>;
}
/// ConsensusService
pub trait ConsensusService: Send + Sync {
/// Get statement stream.
fn statements(&self) -> multiqueue::BroadcastFutReceiver<Statement>;
/// Send out a statement.
fn send_statement(&self, statement: Statement);
/// Maintain connectivity to given addresses.
fn connect_to_authorities(&self, addresses: &[String]);
/// Fetch candidate.
fn fetch_candidate(&self, hash: &Hash) -> oneshot::Receiver<Vec<u8>>;
/// Note local candidate. Accepts candidate receipt hash and candidate data.
/// Pass `None` to clear the candidate.
fn set_local_candidate(&self, candidate: Option<(Hash, Vec<u8>)>);
/// Get BFT message stream.
fn bft_messages(&self) -> multiqueue::BroadcastFutReceiver<BftMessage>;
/// Send out a BFT message.
fn send_bft_message(&self, message: BftMessage);
}
/// devp2p Protocol handler
struct ProtocolHandler {
protocol: Protocol,
}
/// Peer connection information /// Peer connection information
#[derive(Debug)] #[derive(Debug)]
pub struct PeerInfo { pub struct PeerInfo {
@@ -77,6 +127,8 @@ pub struct Params {
pub network_config: NetworkConfiguration, pub network_config: NetworkConfiguration,
/// Polkadot relay chain access point. /// Polkadot relay chain access point.
pub chain: Arc<Client>, pub chain: Arc<Client>,
/// Transaction pool.
pub transaction_pool: Arc<TransactionPool>,
} }
/// Polkadot network service. Handles network IO and manages connectivity. /// Polkadot network service. Handles network IO and manages connectivity.
@@ -90,13 +142,11 @@ pub struct Service {
impl Service { impl Service {
/// Creates and register protocol with the network service /// Creates and register protocol with the network service
pub fn new(params: Params) -> Result<Arc<Service>, Error> { pub fn new(params: Params) -> Result<Arc<Service>, Error> {
let service = NetworkService::new(params.network_config.clone(), None)?; let service = NetworkService::new(params.network_config.clone(), None)?;
let sync = Arc::new(Service { let sync = Arc::new(Service {
network: service, network: service,
handler: Arc::new(ProtocolHandler { handler: Arc::new(ProtocolHandler {
protocol: Protocol::new(params.config, params.chain.clone())?, protocol: Protocol::new(params.config, params.chain.clone(), params.transaction_pool)?,
}), }),
}); });
@@ -109,8 +159,10 @@ impl Service {
} }
/// Called when new transactons are imported by the client. /// Called when new transactons are imported by the client.
pub fn on_new_transactions(&self) { pub fn on_new_transactions(&self, transactions: &[(TransactionHash, Vec<u8>)]) {
self.handler.protocol.on_new_transactions() self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions);
});
} }
fn start(&self) { fn start(&self) {
@@ -130,6 +182,12 @@ impl Service {
} }
} }
impl Drop for Service {
fn drop(&mut self) {
self.stop();
}
}
impl SyncProvider for Service { impl SyncProvider for Service {
/// Get sync status /// Get sync status
fn status(&self) -> ProtocolStatus { fn status(&self) -> ProtocolStatus {
@@ -168,9 +226,41 @@ impl SyncProvider for Service {
} }
} }
struct ProtocolHandler { /// ConsensusService
/// Protocol handler impl ConsensusService for Service {
protocol: Protocol, fn statements(&self) -> multiqueue::BroadcastFutReceiver<Statement> {
self.handler.protocol.statements()
}
fn connect_to_authorities(&self, _addresses: &[String]) {
//TODO: implement me
}
fn fetch_candidate(&self, hash: &Hash) -> oneshot::Receiver<Vec<u8>> {
self.network.with_context_eval(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.fetch_candidate(&mut NetSyncIo::new(context), hash)
}).expect("DOT Service is registered")
}
fn send_statement(&self, statement: Statement) {
self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.send_statement(&mut NetSyncIo::new(context), statement);
});
}
fn set_local_candidate(&self, candidate: Option<(Hash, Vec<u8>)>) {
self.handler.protocol.set_local_candidate(candidate)
}
fn bft_messages(&self) -> BftMessageStream {
self.handler.protocol.bft_messages()
}
fn send_bft_message(&self, message: BftMessage) {
self.network.with_context(DOT_PROTOCOL_ID, |context| {
self.handler.protocol.send_bft_message(&mut NetSyncIo::new(context), message);
});
}
} }
impl NetworkProtocolHandler for ProtocolHandler { impl NetworkProtocolHandler for ProtocolHandler {
+9 -1
View File
@@ -84,9 +84,13 @@ impl ChainSync {
} }
} }
fn best_seen_block(&self) -> Option<BlockNumber> {
self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number)
}
/// Returns sync status /// Returns sync status
pub fn status(&self) -> Status { pub fn status(&self) -> Status {
let best_seen = self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number); let best_seen = self.best_seen_block();
let state = match &best_seen { let state = match &best_seen {
&Some(n) if n > self.best_queued_number && n - self.best_queued_number > 5 => SyncState::Downloading, &Some(n) if n > self.best_queued_number && n - self.best_queued_number > 5 => SyncState::Downloading,
_ => SyncState::Idle, _ => SyncState::Idle,
@@ -97,6 +101,7 @@ impl ChainSync {
} }
} }
/// Handle new connected peer.
pub fn new_peer(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { pub fn new_peer(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) {
if let Some(info) = protocol.peer_info(peer_id) { if let Some(info) = protocol.peer_info(peer_id) {
match (protocol.chain().block_status(&BlockId::Hash(info.best_hash)), info.best_number) { match (protocol.chain().block_status(&BlockId::Hash(info.best_hash)), info.best_number) {
@@ -211,6 +216,7 @@ impl ChainSync {
vec![] vec![]
}; };
let best_seen = self.best_seen_block();
// Blocks in the response/drain should be in ascending order. // Blocks in the response/drain should be in ascending order.
for block in new_blocks { for block in new_blocks {
let origin = block.origin; let origin = block.origin;
@@ -220,7 +226,9 @@ impl ChainSync {
let number = header.number; let number = header.number;
let hash = header_hash(&header); let hash = header_hash(&header);
let parent = header.parent_hash; let parent = header.parent_hash;
let is_best = best_seen.as_ref().map_or(false, |n| number >= *n);
let result = protocol.chain().import( let result = protocol.chain().import(
is_best,
header, header,
justification, justification,
block.body block.body
+18 -4
View File
@@ -19,14 +19,15 @@ mod sync;
use std::collections::{VecDeque, HashSet, HashMap}; use std::collections::{VecDeque, HashSet, HashMap};
use std::sync::Arc; use std::sync::Arc;
use parking_lot::RwLock; use parking_lot::RwLock;
use client::{self, genesis}; use client::{self, genesis, BlockOrigin};
use client::block_builder::BlockBuilder; use client::block_builder::BlockBuilder;
use primitives::block::Id as BlockId; use primitives::block::{Id as BlockId, TransactionHash};
use primitives; use primitives;
use executor; use executor;
use io::SyncIo; use io::SyncIo;
use protocol::Protocol; use protocol::Protocol;
use config::ProtocolConfig; use config::ProtocolConfig;
use service::TransactionPool;
use network::{PeerId, SessionInfo, Error as NetworkError}; use network::{PeerId, SessionInfo, Error as NetworkError};
use test_runtime::genesismap::{GenesisConfig, additional_storage_with_genesis}; use test_runtime::genesismap::{GenesisConfig, additional_storage_with_genesis};
use runtime_support::Hashable; use runtime_support::Hashable;
@@ -190,7 +191,7 @@ impl Peer {
trace!("Generating {}, (#{})", primitives::block::HeaderHash::from(block.header.blake2_256()), block.header.number); trace!("Generating {}, (#{})", primitives::block::HeaderHash::from(block.header.blake2_256()), block.header.number);
let justification = Self::justify(&block.header); let justification = Self::justify(&block.header);
let justified = self.client.check_justification(block.header, justification).unwrap(); let justified = self.client.check_justification(block.header, justification).unwrap();
self.client.import_block(justified, Some(block.transactions)).unwrap(); self.client.import_block(BlockOrigin::File, justified, Some(block.transactions)).unwrap();
} }
} }
@@ -215,6 +216,18 @@ impl Peer {
} }
} }
struct EmptyTransactionPool;
impl TransactionPool for EmptyTransactionPool {
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
Vec::new()
}
fn import(&self, _transaction: &[u8]) -> Option<TransactionHash> {
None
}
}
pub struct TestNet { pub struct TestNet {
pub peers: Vec<Arc<Peer>>, pub peers: Vec<Arc<Peer>>,
pub started: bool, pub started: bool,
@@ -248,7 +261,8 @@ impl TestNet {
for _ in 0..n { for _ in 0..n {
let client = Arc::new(client::new_in_mem(Executor::new(), Self::prepare_genesis).unwrap()); let client = Arc::new(client::new_in_mem(Executor::new(), Self::prepare_genesis).unwrap());
let sync = Protocol::new(config.clone(), client.clone()).unwrap(); let tx_pool = Arc::new(EmptyTransactionPool);
let sync = Protocol::new(config.clone(), client.clone(), tx_pool).unwrap();
net.peers.push(Arc::new(Peer { net.peers.push(Arc::new(Peer {
sync: sync, sync: sync,
client: client, client: client,
-254
View File
@@ -1,254 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
mod sync;
use std::collections::{VecDeque, HashSet, HashMap};
use std::sync::Arc;
use parking_lot::RwLock;
use client::{self, BlockId};
use substrate_executor as executor;
use io::SyncIo;
use protocol::Protocol;
use config::ProtocolConfig;
use network::{PeerId, SessionInfo, Error as NetworkError};
pub struct TestIo<'p> {
pub queue: &'p RwLock<VecDeque<TestPacket>>,
pub sender: Option<PeerId>,
pub to_disconnect: HashSet<PeerId>,
pub packets: Vec<TestPacket>,
pub peers_info: HashMap<PeerId, String>,
}
impl<'p> TestIo<'p> where {
pub fn new(queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<PeerId>) -> TestIo<'p> {
TestIo {
queue: queue,
sender: sender,
to_disconnect: HashSet::new(),
packets: Vec::new(),
peers_info: HashMap::new(),
}
}
}
impl<'p> Drop for TestIo<'p> {
fn drop(&mut self) {
self.queue.write().extend(self.packets.drain(..));
}
}
impl<'p> SyncIo for TestIo<'p> {
fn disable_peer(&mut self, peer_id: PeerId) {
self.disconnect_peer(peer_id);
}
fn disconnect_peer(&mut self, peer_id: PeerId) {
self.to_disconnect.insert(peer_id);
}
fn is_expired(&self) -> bool {
false
}
fn send(&mut self, peer_id: PeerId, data: Vec<u8>) -> Result<(), NetworkError> {
self.packets.push(TestPacket {
data: data,
recipient: peer_id,
});
Ok(())
}
fn peer_info(&self, peer_id: PeerId) -> String {
self.peers_info.get(&peer_id)
.cloned()
.unwrap_or_else(|| peer_id.to_string())
}
fn peer_session_info(&self, _peer_id: PeerId) -> Option<SessionInfo> {
None
}
}
/// Mocked subprotocol packet
pub struct TestPacket {
pub data: Vec<u8>,
pub recipient: PeerId,
}
pub struct Peer {
pub chain: Arc<client::Client<client::in_mem::Backend, executor::DefaultExecutor>>,
pub sync: Protocol,
pub queue: RwLock<VecDeque<TestPacket>>,
}
impl Peer {
/// Called after blockchain has been populated to updated current state.
fn start(&self) {
// Update the sync state to the lates chain state.
let info = self.chain.info().expect("In-mem chain does not fail");
let header = self.chain.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap();
self.sync.on_block_imported(&header);
}
/// Called on connection to other indicated peer.
fn on_connect(&self, other: PeerId) {
self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other);
}
/// Called on disconnect from other indicated peer.
fn on_disconnect(&self, other: PeerId) {
let mut io = TestIo::new(&self.queue, Some(other));
self.sync.on_peer_disconnected(&mut io, other);
}
/// Receive a message from another peer. Return a set of peers to disconnect.
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
let mut io = TestIo::new(&self.queue, Some(from));
self.sync.handle_packet(&mut io, from, &msg.data);
self.flush();
io.to_disconnect.clone()
}
/// Produce the next pending message to send to another peer.
fn pending_message(&self) -> Option<TestPacket> {
self.flush();
self.queue.write().pop_front()
}
/// Whether this peer is done syncing (has no messages to send).
fn is_done(&self) -> bool {
self.queue.read().is_empty()
}
/// Execute a "sync step". This is called for each peer after it sends a packet.
fn sync_step(&self) {
self.flush();
self.sync.tick(&mut TestIo::new(&self.queue, None));
}
/// Restart sync for a peer.
fn restart_sync(&self) {
self.sync.abort();
}
fn flush(&self) {
}
}
pub struct TestNet {
pub peers: Vec<Arc<Peer>>,
pub started: bool,
pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to)
}
impl TestNet {
pub fn new(n: usize) -> Self {
Self::new_with_config(n, ProtocolConfig::default())
}
pub fn new_with_config(n: usize, config: ProtocolConfig) -> Self {
let mut net = TestNet {
peers: Vec::new(),
started: false,
disconnect_events: Vec::new(),
};
for _ in 0..n {
let chain = Arc::new(client::new_in_mem(executor::executor()).unwrap());
let sync = Protocol::new(config.clone(), chain.clone()).unwrap();
net.peers.push(Arc::new(Peer {
sync: sync,
chain: chain,
queue: RwLock::new(VecDeque::new()),
}));
}
net
}
pub fn peer(&self, i: usize) -> &Peer {
&self.peers[i]
}
pub fn start(&mut self) {
if self.started {
return;
}
for peer in 0..self.peers.len() {
self.peers[peer].start();
for client in 0..self.peers.len() {
if peer != client {
self.peers[peer].on_connect(client as PeerId);
}
}
}
self.started = true;
}
pub fn sync_step(&mut self) {
for peer in 0..self.peers.len() {
let packet = self.peers[peer].pending_message();
if let Some(packet) = packet {
let disconnecting = {
let recipient = packet.recipient;
trace!("--- {} -> {} ---", peer, recipient);
let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet);
for d in &to_disconnect {
// notify this that disconnecting peers are disconnecting
self.peers[recipient].on_disconnect(*d as PeerId);
self.disconnect_events.push((peer, *d));
}
to_disconnect
};
for d in &disconnecting {
// notify other peers that this peer is disconnecting
self.peers[*d].on_disconnect(peer as PeerId);
}
}
self.sync_step_peer(peer);
}
}
pub fn sync_step_peer(&mut self, peer_num: usize) {
self.peers[peer_num].sync_step();
}
pub fn restart_peer(&mut self, i: usize) {
self.peers[i].restart_sync();
}
pub fn sync(&mut self) -> u32 {
self.start();
let mut total_steps = 0;
while !self.done() {
self.sync_step();
total_steps += 1;
}
total_steps
}
pub fn sync_steps(&mut self, count: usize) {
self.start();
for _ in 0..count {
self.sync_step();
}
}
pub fn done(&self) -> bool {
self.peers.iter().all(|p| p.is_done())
}
}
+4 -3
View File
@@ -21,6 +21,7 @@ mod error;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
use std::sync::Arc;
use client::{self, Client}; use client::{self, Client};
use primitives::block; use primitives::block;
use primitives::storage::{StorageKey, StorageData}; use primitives::storage::{StorageKey, StorageData};
@@ -41,16 +42,16 @@ build_rpc_trait! {
} }
} }
impl<B, E> StateApi for Client<B, E> where impl<B, E> StateApi for Arc<Client<B, E>> where
B: client::backend::Backend + Send + Sync + 'static, B: client::backend::Backend + Send + Sync + 'static,
E: state_machine::CodeExecutor + Send + Sync + 'static, E: state_machine::CodeExecutor + Send + Sync + 'static,
client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>, client::error::Error: From<<<B as client::backend::Backend>::State as state_machine::backend::Backend>::Error>,
{ {
fn storage(&self, key: StorageKey, block: block::HeaderHash) -> Result<StorageData> { fn storage(&self, key: StorageKey, block: block::HeaderHash) -> Result<StorageData> {
Ok(self.storage(&block::Id::Hash(block), &key)?) Ok(self.as_ref().storage(&block::Id::Hash(block), &key)?)
} }
fn call(&self, method: String, data: Vec<u8>, block: block::HeaderHash) -> Result<Vec<u8>> { fn call(&self, method: String, data: Vec<u8>, block: block::HeaderHash) -> Result<Vec<u8>> {
Ok(self.call(&block::Id::Hash(block), &method, &data)?.return_data) Ok(self.as_ref().call(&block::Id::Hash(block), &method, &data)?.return_data)
} }
} }
+2 -2
View File
@@ -30,7 +30,7 @@ fn should_return_storage() {
digest: Default::default(), digest: Default::default(),
}; };
let client = client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap(); let client = Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap());
let genesis_hash = test_genesis_block.blake2_256().into(); let genesis_hash = test_genesis_block.blake2_256().into();
assert_matches!( assert_matches!(
@@ -51,7 +51,7 @@ fn should_call_contract() {
digest: Default::default(), digest: Default::default(),
}; };
let client = client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap(); let client = Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap());
let genesis_hash = test_genesis_block.blake2_256().into(); let genesis_hash = test_genesis_block.blake2_256().into();
assert_matches!( assert_matches!(