From 4bdfd02f933dce763a2562df674ba3181e81a20f Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Mon, 10 Aug 2020 15:02:30 +0200 Subject: [PATCH] impl availability distribution Closes #1237 --- polkadot/Cargo.lock | 436 ++++--- polkadot/Cargo.toml | 1 + .../availability-distribution/Cargo.toml | 34 + .../availability-distribution/src/lib.rs | 1092 +++++++++++++++++ .../availability-distribution/src/tests.rs | 975 +++++++++++++++ .../network/bitfield-distribution/Cargo.toml | 4 +- .../network/bitfield-distribution/src/lib.rs | 20 +- .../node/subsystem-test-helpers/src/lib.rs | 2 +- polkadot/node/subsystem/src/lib.rs | 2 +- polkadot/node/subsystem/src/messages.rs | 12 +- polkadot/parachain/src/primitives.rs | 8 +- polkadot/primitives/src/v0.rs | 6 +- polkadot/primitives/src/v1.rs | 10 +- .../availability/availability-distribution.md | 13 +- 14 files changed, 2414 insertions(+), 201 deletions(-) create mode 100644 polkadot/node/network/availability-distribution/Cargo.toml create mode 100644 polkadot/node/network/availability-distribution/src/lib.rs create mode 100644 polkadot/node/network/availability-distribution/src/tests.rs diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 17cdc18bb9..6ff134014b 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -225,10 +225,24 @@ dependencies = [ ] [[package]] -name = "async-io" -version = "0.1.4" +name = "async-executor" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a0fc2017a5cca12763bb5636092a7786b52789c23c5838a392db2eb99963fd3" +checksum = "90f47c78ea98277cb1f5e6f60ba4fc762f5eafe9f6511bc2f7dfd8b75c225650" +dependencies = [ + "async-io", + "futures-lite", + "multitask", + "parking", + "scoped-tls 1.0.0", + "waker-fn", +] + +[[package]] +name = "async-io" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8126ef9fb99355c6fd27575d691be4887b884137a5b6f48c2d961f13590c51" dependencies = [ "cfg-if", "concurrent-queue", @@ -255,7 +269,7 @@ dependencies = [ "futures-io", "futures-timer 3.0.2", "kv-log-macro", - "log 0.4.8", + "log 0.4.11", "memchr", "num_cpus", "once_cell", @@ -380,7 +394,7 @@ dependencies = [ "env_logger", "lazy_static", "lazycell", - "log 0.4.8", + "log 0.4.11", "peeking_take_while", "proc-macro2 1.0.18", "quote 1.0.7", @@ -510,6 +524,19 @@ dependencies = [ "waker-fn", ] +[[package]] +name = "blocking" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76e94bf99b692f54c9d05f97454d3faf11134523fe5b180564a3fb6ed63bcc0a" +dependencies = [ + "async-channel", + "atomic-waker", + "futures-lite", + "once_cell", + "waker-fn", +] + [[package]] name = "bs58" version = "0.3.1" @@ -676,6 +703,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "cloudabi" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4344512281c643ae7638bbabc3af17a11307803ec8f0fcad9fae512a8bf36467" +dependencies = [ + "bitflags", +] + [[package]] name = "concurrent-queue" version = "1.1.1" @@ -701,7 +737,7 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7871d2947441b0fdd8e2bd1ce2a2f75304f896582c0d572162d48290683c48" dependencies = [ - "log 0.4.8", + "log 0.4.11", "web-sys", ] @@ -774,7 +810,7 @@ dependencies = [ "cranelift-codegen-shared", "cranelift-entity", "gimli 0.21.0", - "log 0.4.8", + "log 0.4.11", "regalloc", "serde", "smallvec 1.4.1", @@ -814,7 +850,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "2ef419efb4f94ecc02e5d9fbcc910d2bb7f0040e2de570e63a454f883bc891d6" dependencies = [ "cranelift-codegen", - "log 0.4.8", + "log 0.4.11", "smallvec 1.4.1", "target-lexicon", ] @@ -839,7 +875,7 @@ dependencies = [ "cranelift-codegen", "cranelift-entity", "cranelift-frontend", - "log 0.4.8", + "log 0.4.11", "serde", "thiserror", "wasmparser 0.59.0", @@ -1090,6 +1126,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c53dc3a653e0f64081026e4bf048d48fec9fce90c66e8326ca7292df0ff2d82" +[[package]] +name = "easy-parallel" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dd4afd79212583ff429b913ad6605242ed7eec277e950b1438f300748f948f4" + [[package]] name = "ed25519" version = "1.0.1" @@ -1156,7 +1198,7 @@ checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" dependencies = [ "atty", "humantime", - "log 0.4.8", + "log 0.4.11", "regex", "termcolor", ] @@ -1204,7 +1246,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74cf96bec282dcdb07099f7e31d9fed323bca9435a09aba7b6d99b7617bca96d" dependencies = [ "lazy_static", - "log 0.4.8", + "log 0.4.11", "serde", "serde_json", ] @@ -1281,7 +1323,7 @@ checksum = "7b6b21baebbed15551f2170010ca4101b9ed3fdc05822791c8bd4631840eab81" dependencies = [ "cfg-if", "js-sys", - "log 0.4.8", + "log 0.4.11", "serde", "serde_derive", "wasm-bindgen", @@ -1295,7 +1337,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b3937f028664bd0e13df401ba49a4567ccda587420365823242977f06609ed1" dependencies = [ "env_logger", - "log 0.4.8", + "log 0.4.11", ] [[package]] @@ -1307,7 +1349,7 @@ dependencies = [ "either", "futures 0.3.5", "futures-timer 2.0.2", - "log 0.4.8", + "log 0.4.11", "num-traits 0.2.12", "parity-scale-codec", "parking_lot 0.9.0", @@ -1428,7 +1470,7 @@ dependencies = [ "frame-metadata", "frame-support-procedural", "impl-trait-for-tuples", - "log 0.4.8", + "log 0.4.11", "once_cell", "parity-scale-codec", "paste", @@ -1627,7 +1669,7 @@ dependencies = [ "futures 0.1.29", "futures 0.3.5", "lazy_static", - "log 0.4.8", + "log 0.4.11", "parking_lot 0.9.0", "pin-project", "serde", @@ -1654,15 +1696,17 @@ checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" [[package]] name = "futures-lite" -version = "0.1.6" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af0bbcb0ec905ef6ee23fab499119b5da2362b8697d66e08d1ef01a8c0d438e2" +checksum = "bbe71459749b2e8e66fb95df721b22fa08661ad384a0c5b519e11d3893b4692a" dependencies = [ "fastrand", "futures-core", "futures-io", "memchr", + "parking", "pin-project-lite", + "waker-fn", ] [[package]] @@ -1767,7 +1811,7 @@ checksum = "add72f17bb81521258fcc8a7a3245b1e184e916bfbe34f0ea89558f440df5c68" dependencies = [ "cc", "libc", - "log 0.4.8", + "log 0.4.11", "rustc_version", "winapi 0.3.9", ] @@ -1866,7 +1910,7 @@ dependencies = [ "aho-corasick", "bstr", "fnv", - "log 0.4.8", + "log 0.4.11", "regex", ] @@ -1895,7 +1939,7 @@ dependencies = [ "futures 0.1.29", "http 0.1.21", "indexmap", - "log 0.4.8", + "log 0.4.11", "slab", "string", "tokio-io", @@ -1914,7 +1958,7 @@ dependencies = [ "futures-util", "http 0.2.1", "indexmap", - "log 0.4.8", + "log 0.4.11", "slab", "tokio 0.2.21", "tokio-util", @@ -2093,7 +2137,7 @@ dependencies = [ "httparse", "iovec", "itoa", - "log 0.4.8", + "log 0.4.11", "net2", "rustc_version", "time", @@ -2123,7 
+2167,7 @@ dependencies = [ "http-body 0.3.1", "httparse", "itoa", - "log 0.4.8", + "log 0.4.11", "pin-project", "socket2", "time", @@ -2142,7 +2186,7 @@ dependencies = [ "ct-logs", "futures-util", "hyper 0.13.6", - "log 0.4.8", + "log 0.4.11", "rustls", "rustls-native-certs", "tokio 0.2.21", @@ -2220,6 +2264,12 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b141fdc7836c525d4d594027d318c84161ca17aaf8113ab1f81ab93ae897485" + [[package]] name = "integer-sqrt" version = "0.1.3" @@ -2330,7 +2380,7 @@ dependencies = [ "futures 0.1.29", "jsonrpc-core", "jsonrpc-pubsub", - "log 0.4.8", + "log 0.4.11", "serde", "serde_json", "url 1.7.2", @@ -2343,7 +2393,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0747307121ffb9703afd93afbd0fb4f854c38fb873f2c8b90e0e902f27c7b62" dependencies = [ "futures 0.1.29", - "log 0.4.8", + "log 0.4.11", "serde", "serde_derive", "serde_json", @@ -2379,7 +2429,7 @@ dependencies = [ "hyper 0.12.35", "jsonrpc-core", "jsonrpc-server-utils", - "log 0.4.8", + "log 0.4.11", "net2", "parking_lot 0.10.2", "unicase", @@ -2393,7 +2443,7 @@ checksum = "dedccd693325d833963b549e959137f30a7a0ea650cde92feda81dc0c1393cb5" dependencies = [ "jsonrpc-core", "jsonrpc-server-utils", - "log 0.4.8", + "log 0.4.11", "parity-tokio-ipc", "parking_lot 0.10.2", "tokio-service", @@ -2406,7 +2456,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d44f5602a11d657946aac09357956d2841299ed422035edf140c552cb057986" dependencies = [ "jsonrpc-core", - "log 0.4.8", + "log 0.4.11", "parking_lot 0.10.2", "rand 0.7.3", "serde", @@ -2422,7 +2472,7 @@ dependencies = [ "globset", "jsonrpc-core", "lazy_static", - "log 0.4.8", + "log 0.4.11", "tokio 0.1.22", "tokio-codec", "unicase", @@ -2436,7 +2486,7 @@ checksum = "903d3109fe7c4acb932b567e1e607e0f524ed04741b09fb0e61841bc40a022fc" dependencies = [ "jsonrpc-core", "jsonrpc-server-utils", - "log 0.4.8", + "log 0.4.11", "parking_lot 0.10.2", "slab", "ws", @@ -2539,7 +2589,7 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" dependencies = [ - "log 0.4.8", + "log 0.4.11", ] [[package]] @@ -2571,7 +2621,7 @@ checksum = "7c341ef15cfb1f923fa3b5138bfbd2d4813a2c1640b473727a53351c7f0b0fa2" dependencies = [ "fs-swap", "kvdb", - "log 0.4.8", + "log 0.4.11", "num_cpus", "owning_ref", "parity-util-mem", @@ -2591,7 +2641,7 @@ dependencies = [ "js-sys", "kvdb", "kvdb-memorydb", - "log 0.4.8", + "log 0.4.11", "parity-util-mem", "send_wrapper 0.3.0", "wasm-bindgen", @@ -2684,7 +2734,7 @@ dependencies = [ "futures-timer 3.0.2", "lazy_static", "libsecp256k1", - "log 0.4.8", + "log 0.4.11", "multihash", "multistream-select", "parity-multiaddr", @@ -2721,7 +2771,7 @@ checksum = "f751924b6b98e350005e0b87a822beb246792a3fb878c684e088f866158120ac" dependencies = [ "futures 0.3.5", "libp2p-core", - "log 0.4.8", + "log 0.4.11", ] [[package]] @@ -2733,7 +2783,7 @@ dependencies = [ "futures 0.3.5", "libp2p-core", "libp2p-swarm", - "log 0.4.8", + "log 0.4.11", "prost", "prost-build", "smallvec 1.4.1", @@ -2754,7 +2804,7 @@ dependencies = [ "futures_codec", "libp2p-core", "libp2p-swarm", - "log 0.4.8", + "log 0.4.11", "multihash", "prost", "prost-build", @@ -2781,7 +2831,7 @@ dependencies = [ "lazy_static", "libp2p-core", "libp2p-swarm", - "log 0.4.8", + "log 0.4.11", "net2", "rand 0.7.3", "smallvec 1.4.1", @@ 
-2800,7 +2850,7 @@ dependencies = [ "futures 0.3.5", "futures_codec", "libp2p-core", - "log 0.4.8", + "log 0.4.11", "parking_lot 0.10.2", "unsigned-varint 0.4.0", ] @@ -2816,7 +2866,7 @@ dependencies = [ "futures 0.3.5", "lazy_static", "libp2p-core", - "log 0.4.8", + "log 0.4.11", "prost", "prost-build", "rand 0.7.3", @@ -2836,7 +2886,7 @@ dependencies = [ "futures 0.3.5", "libp2p-core", "libp2p-swarm", - "log 0.4.8", + "log 0.4.11", "rand 0.7.3", "void", "wasm-timer", @@ -2850,7 +2900,7 @@ checksum = "f88d5e2a090a2aadf042cd33484e2f015c6dab212567406a59deece5dedbd133" dependencies = [ "futures 0.3.5", "libp2p-core", - "log 0.4.8", + "log 0.4.11", "rand 0.7.3", "smallvec 1.4.1", "void", @@ -2869,7 +2919,7 @@ dependencies = [ "get_if_addrs", "ipnet", "libp2p-core", - "log 0.4.8", + "log 0.4.11", "socket2", ] @@ -2897,7 +2947,7 @@ dependencies = [ "either", "futures 0.3.5", "libp2p-core", - "log 0.4.8", + "log 0.4.11", "quicksink", "rustls", "rw-stream-sink", @@ -2995,20 +3045,29 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lock_api" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28247cc5a5be2f05fbcd76dd0cf2c7d3b5400cb978a28042abcd4fa0b3f8261c" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" dependencies = [ - "log 0.4.8", + "log 0.4.11", ] [[package]] name = "log" -version = "0.4.8" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b" dependencies = [ "cfg-if", ] @@ -3153,7 +3212,7 @@ dependencies = [ "iovec", "kernel32-sys", "libc", - "log 0.4.8", + "log 0.4.11", "miow 0.2.1", "net2", "slab", @@ -3167,7 +3226,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" dependencies = [ "lazycell", - "log 0.4.8", + "log 0.4.11", "mio", "slab", ] @@ -3178,7 +3237,7 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" dependencies = [ - "log 0.4.8", + "log 0.4.11", "mio", "miow 0.3.5", "winapi 0.3.9", @@ -3252,7 +3311,7 @@ checksum = "c9157e87afbc2ef0d84cc0345423d715f445edde00141c93721c162de35a05e5" dependencies = [ "bytes 0.5.5", "futures 0.3.5", - "log 0.4.8", + "log 0.4.11", "pin-project", "smallvec 1.4.1", "unsigned-varint 0.4.0", @@ -4041,7 +4100,7 @@ dependencies = [ "blake2-rfc", "crc32fast", "libc", - "log 0.4.8", + "log 0.4.11", "memmap", "parking_lot 0.10.2", ] @@ -4104,7 +4163,7 @@ dependencies = [ "bytes 0.4.12", "futures 0.1.29", "libc", - "log 0.4.8", + "log 0.4.11", "mio-named-pipes", "miow 0.3.5", "rand 0.7.3", @@ -4160,7 +4219,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252" dependencies = [ - "lock_api", + "lock_api 0.3.4", "parking_lot_core 0.6.2", "rustc_version", ] @@ -4171,10 +4230,21 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" dependencies = [ - "lock_api", + "lock_api 0.3.4", "parking_lot_core 0.7.2", ] +[[package]] 
+name = "parking_lot" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4893845fa2ca272e647da5d0e46660a314ead9c2fdd9a883aabc32e481a8733" +dependencies = [ + "instant", + "lock_api 0.4.1", + "parking_lot_core 0.8.0", +] + [[package]] name = "parking_lot_core" version = "0.6.2" @@ -4182,7 +4252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b876b1b9e7ac6e1a74a6da34d25c42e17e8862aa409cbbbdcfc8d86c6f3bc62b" dependencies = [ "cfg-if", - "cloudabi", + "cloudabi 0.0.3", "libc", "redox_syscall", "rustc_version", @@ -4197,7 +4267,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" dependencies = [ "cfg-if", - "cloudabi", + "cloudabi 0.0.3", + "libc", + "redox_syscall", + "smallvec 1.4.1", + "winapi 0.3.9", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c361aa727dd08437f2f1447be8b59a33b0edd15e0fcee698f935613d9efbca9b" +dependencies = [ + "cfg-if", + "cloudabi 0.1.0", + "instant", "libc", "redox_syscall", "smallvec 1.4.1", @@ -4334,10 +4419,10 @@ dependencies = [ "env_logger", "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "maplit", "parity-scale-codec", - "parking_lot 0.10.2", + "parking_lot 0.11.0", "polkadot-network", "polkadot-network-bridge", "polkadot-node-primitives", @@ -4345,12 +4430,42 @@ dependencies = [ "polkadot-node-subsystem-test-helpers", "polkadot-primitives", "sc-network", - "smol 0.2.0", + "smol 0.3.3", "smol-timeout", "sp-core", "streamunordered", ] +[[package]] +name = "polkadot-availability-distribution" +version = "0.1.0" +dependencies = [ + "assert_matches", + "bitvec", + "derive_more 0.99.9", + "env_logger", + "futures 0.3.5", + "futures-timer 3.0.2", + "log 0.4.11", + "parity-scale-codec", + "parking_lot 0.11.0", + "polkadot-erasure-coding", + "polkadot-network", + "polkadot-network-bridge", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", + "polkadot-primitives", + "sc-keystore", + "sc-network", + "smallvec 1.4.1", + "smol-timeout", + "sp-core", + "sp-keyring", + "sp-staking", + "streamunordered", +] + [[package]] name = "polkadot-availability-store" version = "0.8.22" @@ -4361,7 +4476,7 @@ dependencies = [ "kvdb", "kvdb-memorydb", "kvdb-rocksdb", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.9.0", "polkadot-erasure-coding", @@ -4383,7 +4498,7 @@ version = "0.8.22" dependencies = [ "frame-benchmarking-cli", "futures 0.3.5", - "log 0.4.8", + "log 0.4.11", "polkadot-service", "polkadot-service-new", "sc-cli", @@ -4409,7 +4524,7 @@ version = "0.8.22" dependencies = [ "futures 0.3.5", "futures-timer 2.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "polkadot-cli", "polkadot-network", @@ -4463,7 +4578,7 @@ dependencies = [ "exit-future", "futures 0.3.5", "futures-timer 2.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.9.0", "polkadot-availability-store", @@ -4490,7 +4605,7 @@ dependencies = [ "assert_matches", "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "polkadot-node-primitives", @@ -4508,7 +4623,7 @@ name = "polkadot-network-test" version = "0.8.22" dependencies = [ "futures 0.3.5", - "log 0.4.8", + "log 0.4.11", "parking_lot 0.10.2", "polkadot-test-runtime-client", "rand 0.7.3", @@ -4534,7 +4649,7 @@ 
dependencies = [ "kvdb", "kvdb-memorydb", "kvdb-rocksdb", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "polkadot-erasure-coding", "polkadot-node-subsystem", @@ -4552,7 +4667,7 @@ dependencies = [ "bitvec", "derive_more 0.99.9", "futures 0.3.5", - "log 0.4.8", + "log 0.4.11", "polkadot-erasure-coding", "polkadot-node-primitives", "polkadot-node-subsystem", @@ -4575,7 +4690,7 @@ dependencies = [ "bitvec", "derive_more 0.99.9", "futures 0.3.5", - "log 0.4.8", + "log 0.4.11", "polkadot-node-subsystem", "polkadot-node-subsystem-util", "polkadot-primitives", @@ -4590,7 +4705,7 @@ dependencies = [ "assert_matches", "derive_more 0.99.9", "futures 0.3.5", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "polkadot-node-primitives", "polkadot-node-subsystem", @@ -4621,7 +4736,7 @@ version = "0.1.0" dependencies = [ "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "polkadot-node-subsystem", "polkadot-overseer", @@ -4649,7 +4764,7 @@ dependencies = [ "derive_more 0.99.9", "futures 0.3.5", "lazy_static", - "log 0.4.8", + "log 0.4.11", "polkadot-node-subsystem", "polkadot-node-subsystem-util", "polkadot-primitives", @@ -4691,7 +4806,7 @@ dependencies = [ "derive_more 0.99.9", "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "pin-project", @@ -4712,7 +4827,7 @@ dependencies = [ "derive_more 0.99.9", "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "pin-project", @@ -4734,7 +4849,7 @@ dependencies = [ "derive_more 0.99.9", "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "pin-project", @@ -4759,7 +4874,7 @@ dependencies = [ "futures 0.3.5", "futures-timer 3.0.2", "kv-log-macro", - "log 0.4.8", + "log 0.4.11", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-primitives", @@ -4774,7 +4889,7 @@ version = "0.8.22" dependencies = [ "derive_more 0.99.9", "futures 0.3.5", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "polkadot-core-primitives", @@ -4795,7 +4910,7 @@ dependencies = [ "assert_matches", "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "polkadot-node-primitives", @@ -5033,7 +5148,7 @@ dependencies = [ "hex-literal", "kusama-runtime", "lazy_static", - "log 0.4.8", + "log 0.4.11", "pallet-babe", "pallet-im-online", "pallet-staking", @@ -5094,7 +5209,7 @@ dependencies = [ "hex-literal", "kusama-runtime", "lazy_static", - "log 0.4.8", + "log 0.4.11", "pallet-babe", "pallet-im-online", "pallet-staking", @@ -5153,7 +5268,7 @@ dependencies = [ "futures 0.3.5", "futures-timer 3.0.2", "indexmap", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "polkadot-node-primitives", @@ -5267,7 +5382,7 @@ dependencies = [ "futures 0.1.29", "futures 0.3.5", "hex", - "log 0.4.8", + "log 0.4.11", "pallet-balances", "pallet-staking", "pallet-transaction-payment", @@ -5317,7 +5432,7 @@ dependencies = [ "exit-future", "futures 0.3.5", "futures-timer 2.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.9.0", "polkadot-availability-store", @@ -5515,7 +5630,7 @@ dependencies = [ "bytes 0.5.5", "heck", "itertools 0.8.2", - "log 0.4.8", + "log 0.4.11", "multimap", "petgraph", "prost", @@ -5617,7 +5732,7 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9" dependencies = [ - "cloudabi", + "cloudabi 0.0.3", "fuchsia-cprng", "libc", "rand_core 0.3.1", @@ -5745,7 +5860,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071" dependencies = [ - "cloudabi", + "cloudabi 0.0.3", "fuchsia-cprng", "libc", "rand_core 0.4.2", @@ -5885,7 +6000,7 @@ version = "0.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9ba8aaf5fe7cf307c6dbdaeed85478961d29e25e3bee5169e11b92fa9f027a8" dependencies = [ - "log 0.4.8", + "log 0.4.11", "rustc-hash", "smallvec 1.4.1", ] @@ -6165,7 +6280,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cac94b333ee2aac3284c5b8a1b7fb4dd11cba88c244e3fe33cdbd047af0eb693" dependencies = [ "base64 0.12.3", - "log 0.4.8", + "log 0.4.11", "ring", "sct", "webpki", @@ -6219,7 +6334,7 @@ dependencies = [ "futures 0.3.5", "futures-timer 3.0.2", "libp2p", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "prost", "prost-build", @@ -6243,7 +6358,7 @@ source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df dependencies = [ "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "sc-block-builder", "sc-client-api", @@ -6317,7 +6432,7 @@ dependencies = [ "fdlimit", "futures 0.3.5", "lazy_static", - "log 0.4.8", + "log 0.4.11", "names", "nix 0.17.0", "parity-util-mem", @@ -6357,7 +6472,7 @@ dependencies = [ "hex-literal", "kvdb", "lazy_static", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "sc-executor", @@ -6392,7 +6507,7 @@ dependencies = [ "kvdb-memorydb", "kvdb-rocksdb", "linked-hash-map", - "log 0.4.8", + "log 0.4.11", "parity-db", "parity-scale-codec", "parity-util-mem", @@ -6431,7 +6546,7 @@ dependencies = [ "fork-tree", "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "merlin", "num-bigint", "num-rational", @@ -6510,7 +6625,7 @@ source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df dependencies = [ "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "sc-client-api", @@ -6531,7 +6646,7 @@ name = "sc-consensus-uncles" version = "0.8.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ - "log 0.4.8", + "log 0.4.11", "sc-client-api", "sp-authorship", "sp-consensus", @@ -6548,7 +6663,7 @@ dependencies = [ "derive_more 0.99.9", "lazy_static", "libsecp256k1", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parity-wasm", "parking_lot 0.10.2", @@ -6574,7 +6689,7 @@ version = "0.8.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ "derive_more 0.99.9", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parity-wasm", "sp-allocator", @@ -6590,7 +6705,7 @@ name = "sc-executor-wasmi" version = "0.8.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "sc-executor-common", "sp-allocator", @@ -6607,7 +6722,7 @@ source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df dependencies = [ "cranelift-codegen", "cranelift-wasm", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parity-wasm", "sc-executor-common", @@ -6631,7 +6746,7 @@ 
dependencies = [ "fork-tree", "futures 0.3.5", "futures-timer 3.0.2", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "pin-project", @@ -6670,7 +6785,7 @@ dependencies = [ "jsonrpc-core-client", "jsonrpc-derive", "jsonrpc-pubsub", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "sc-finality-grandpa", "sc-rpc", @@ -6686,7 +6801,7 @@ source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df dependencies = [ "ansi_term 0.12.1", "futures 0.3.5", - "log 0.4.8", + "log 0.4.11", "parity-util-mem", "sc-client-api", "sc-network", @@ -6753,7 +6868,7 @@ dependencies = [ "libp2p", "linked-hash-map", "linked_hash_set", - "log 0.4.8", + "log 0.4.11", "lru", "nohash-hasher", "parity-scale-codec", @@ -6792,7 +6907,7 @@ dependencies = [ "futures 0.3.5", "futures-timer 3.0.2", "libp2p", - "log 0.4.8", + "log 0.4.11", "lru", "sc-network", "sp-runtime", @@ -6808,7 +6923,7 @@ dependencies = [ "futures 0.3.5", "futures-timer 3.0.2", "libp2p", - "log 0.4.8", + "log 0.4.11", "parking_lot 0.10.2", "rand 0.7.3", "sc-block-builder", @@ -6837,7 +6952,7 @@ dependencies = [ "futures-timer 3.0.2", "hyper 0.13.6", "hyper-rustls", - "log 0.4.8", + "log 0.4.11", "num_cpus", "parity-scale-codec", "parking_lot 0.10.2", @@ -6860,7 +6975,7 @@ source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df dependencies = [ "futures 0.3.5", "libp2p", - "log 0.4.8", + "log 0.4.11", "serde_json", "sp-utils", "wasm-timer", @@ -6871,7 +6986,7 @@ name = "sc-proposer-metrics" version = "0.8.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ - "log 0.4.8", + "log 0.4.11", "substrate-prometheus-endpoint", ] @@ -6884,7 +6999,7 @@ dependencies = [ "hash-db", "jsonrpc-core", "jsonrpc-pubsub", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "sc-block-builder", @@ -6918,7 +7033,7 @@ dependencies = [ "jsonrpc-core-client", "jsonrpc-derive", "jsonrpc-pubsub", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "serde", @@ -6941,7 +7056,7 @@ dependencies = [ "jsonrpc-ipc-server", "jsonrpc-pubsub", "jsonrpc-ws-server", - "log 0.4.8", + "log 0.4.11", "serde", "serde_json", "sp-runtime", @@ -6961,7 +7076,7 @@ dependencies = [ "hash-db", "jsonrpc-pubsub", "lazy_static", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parity-util-mem", "parking_lot 0.10.2", @@ -7012,7 +7127,7 @@ name = "sc-state-db" version = "0.8.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parity-util-mem", "parity-util-mem-derive", @@ -7029,7 +7144,7 @@ dependencies = [ "futures 0.3.5", "futures-timer 3.0.2", "libp2p", - "log 0.4.8", + "log 0.4.11", "parking_lot 0.10.2", "pin-project", "rand 0.7.3", @@ -7048,7 +7163,7 @@ version = "2.0.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ "erased-serde", - "log 0.4.8", + "log 0.4.11", "parking_lot 0.10.2", "rustc-hash", "sc-telemetry", @@ -7067,7 +7182,7 @@ dependencies = [ "derive_more 0.99.9", "futures 0.3.5", "linked-hash-map", - "log 0.4.8", + "log 0.4.11", "parity-util-mem", "parking_lot 0.10.2", "retain_mut", @@ -7089,7 +7204,7 @@ dependencies = [ "futures 0.3.5", "futures-diagnose", "intervalier", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parity-util-mem", "parking_lot 0.10.2", @@ -7337,7 +7452,7 @@ dependencies = [ "cfg-if", 
"enum_primitive", "libc", - "log 0.4.8", + "log 0.4.11", "memrange", "nix 0.10.0", "quick-error", @@ -7452,7 +7567,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "620cbb3c6e34da57d3a248cda0cd01cd5848164dc062e764e65d06fe3ea7aed5" dependencies = [ "async-task", - "blocking", + "blocking 0.4.7", "concurrent-queue", "fastrand", "futures-io", @@ -7468,15 +7583,18 @@ dependencies = [ [[package]] name = "smol" -version = "0.2.0" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "346a94824d48ed7c5fc7247f3cbbf0317bdfe15fc39d08f9262609cccce61254" +checksum = "67583f4ccc13bbb105a0752058d8ad66c47753d85445952809bcaca891954f83" dependencies = [ + "async-channel", + "async-executor", "async-io", - "blocking", - "multitask", + "blocking 0.5.0", + "cfg-if", + "easy-parallel", + "futures-lite", "num_cpus", - "once_cell", ] [[package]] @@ -7530,7 +7648,7 @@ dependencies = [ "flate2", "futures 0.3.5", "httparse", - "log 0.4.8", + "log 0.4.11", "rand 0.7.3", "sha-1", ] @@ -7541,7 +7659,7 @@ version = "2.0.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ "derive_more 0.99.9", - "log 0.4.8", + "log 0.4.11", "sp-core", "sp-std", "sp-wasm-interface", @@ -7640,7 +7758,7 @@ version = "2.0.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ "derive_more 0.99.9", - "log 0.4.8", + "log 0.4.11", "lru", "parity-scale-codec", "parking_lot 0.10.2", @@ -7669,7 +7787,7 @@ dependencies = [ "futures 0.3.5", "futures-timer 3.0.2", "libp2p", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "serde", @@ -7758,7 +7876,7 @@ dependencies = [ "impl-serde 0.3.1", "lazy_static", "libsecp256k1", - "log 0.4.8", + "log 0.4.11", "merlin", "num-traits 0.2.12", "parity-scale-codec", @@ -7820,7 +7938,7 @@ version = "2.0.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ "finality-grandpa", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "serde", "sp-api", @@ -7860,7 +7978,7 @@ dependencies = [ "futures 0.3.5", "hash-db", "libsecp256k1", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", "sp-core", @@ -7923,7 +8041,7 @@ version = "2.0.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ "backtrace", - "log 0.4.8", + "log 0.4.11", ] [[package]] @@ -7943,7 +8061,7 @@ dependencies = [ "either", "hash256-std-hasher", "impl-trait-for-tuples", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "parity-util-mem", "paste", @@ -8023,7 +8141,7 @@ source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df dependencies = [ "hash-db", "itertools 0.9.0", - "log 0.4.8", + "log 0.4.11", "num-traits 0.2.12", "parity-scale-codec", "parking_lot 0.10.2", @@ -8073,7 +8191,7 @@ name = "sp-tracing" version = "2.0.0-rc5" source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df8833c55bc359e0" dependencies = [ - "log 0.4.8", + "log 0.4.11", "rental", "tracing", ] @@ -8085,7 +8203,7 @@ source = "git+https://github.com/paritytech/substrate#eb0e05e126a027499d0993c0df dependencies = [ "derive_more 0.99.9", "futures 0.3.5", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "serde", "sp-api", @@ -8276,7 +8394,7 @@ dependencies = [ "js-sys", "kvdb-web", "libp2p-wasm-ext", - "log 0.4.8", + "log 0.4.11", "rand 0.6.5", "rand 
0.7.3", "sc-chain-spec", @@ -8306,7 +8424,7 @@ dependencies = [ "jsonrpc-core", "jsonrpc-core-client", "jsonrpc-derive", - "log 0.4.8", + "log 0.4.11", "parity-scale-codec", "sc-client-api", "sc-rpc-api", @@ -8328,7 +8446,7 @@ dependencies = [ "derive_more 0.99.9", "futures-util", "hyper 0.13.6", - "log 0.4.8", + "log 0.4.11", "prometheus", "tokio 0.2.21", ] @@ -8369,7 +8487,7 @@ dependencies = [ "frame-support", "frame-system", "frame-system-rpc-runtime-api", - "log 0.4.8", + "log 0.4.11", "memory-db", "pallet-babe", "pallet-timestamp", @@ -8442,7 +8560,7 @@ dependencies = [ "cfg-if", "lazy_static", "libc", - "log 0.4.8", + "log 0.4.11", "region", "rustc-demangle", "smallvec 1.4.1", @@ -8836,7 +8954,7 @@ checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" dependencies = [ "bytes 0.4.12", "futures 0.1.29", - "log 0.4.8", + "log 0.4.11", ] [[package]] @@ -8872,7 +8990,7 @@ dependencies = [ "crossbeam-utils", "futures 0.1.29", "lazy_static", - "log 0.4.8", + "log 0.4.11", "mio", "num_cpus", "parking_lot 0.9.0", @@ -8949,7 +9067,7 @@ dependencies = [ "crossbeam-utils", "futures 0.1.29", "lazy_static", - "log 0.4.8", + "log 0.4.11", "num_cpus", "slab", "tokio-executor 0.1.10", @@ -8975,7 +9093,7 @@ checksum = "e2a0b10e610b39c38b031a2fcab08e4b82f16ece36504988dcbd81dbba650d82" dependencies = [ "bytes 0.4.12", "futures 0.1.29", - "log 0.4.8", + "log 0.4.11", "mio", "tokio-codec", "tokio-io", @@ -8992,7 +9110,7 @@ dependencies = [ "futures 0.1.29", "iovec", "libc", - "log 0.4.8", + "log 0.4.11", "mio", "mio-uds", "tokio-codec", @@ -9009,7 +9127,7 @@ dependencies = [ "bytes 0.5.5", "futures-core", "futures-sink", - "log 0.4.8", + "log 0.4.11", "pin-project-lite", "tokio 0.2.21", ] @@ -9074,7 +9192,7 @@ checksum = "39f1a9a9252d38c5337cf0c5392988821a5cf1b2103245016968f2ab41de9e38" dependencies = [ "hash-db", "hashbrown 0.8.0", - "log 0.4.8", + "log 0.4.11", "rustc-hex", "smallvec 1.4.1", ] @@ -9280,7 +9398,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6395efa4784b027708f7451087e647ec73cc74f5d9bc2e418404248d679a230" dependencies = [ "futures 0.1.29", - "log 0.4.8", + "log 0.4.11", "try-lock", ] @@ -9290,7 +9408,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" dependencies = [ - "log 0.4.8", + "log 0.4.11", "try-lock", ] @@ -9320,7 +9438,7 @@ checksum = "3e53963b583d18a5aa3aaae4b4c1cb535218246131ba22a71f05b518098571df" dependencies = [ "bumpalo", "lazy_static", - "log 0.4.8", + "log 0.4.11", "proc-macro2 1.0.18", "quote 1.0.7", "syn 1.0.33", @@ -9454,7 +9572,7 @@ dependencies = [ "file-per-thread-logger", "indexmap", "libc", - "log 0.4.8", + "log 0.4.11", "more-asserts", "rayon", "serde", @@ -9480,7 +9598,7 @@ dependencies = [ "cranelift-native", "cranelift-wasm", "gimli 0.21.0", - "log 0.4.8", + "log 0.4.11", "more-asserts", "object 0.20.0", "region", @@ -9540,7 +9658,7 @@ dependencies = [ "indexmap", "lazy_static", "libc", - "log 0.4.8", + "log 0.4.11", "memoffset", "more-asserts", "region", @@ -9752,7 +9870,7 @@ dependencies = [ "byteorder", "bytes 0.4.12", "httparse", - "log 0.4.8", + "log 0.4.11", "mio", "mio-extras", "rand 0.7.3", @@ -9789,7 +9907,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd37e58a1256a0b328ce9c67d8b62ecdd02f4803ba443df478835cb1a41a637c" dependencies = [ "futures 0.3.5", - "log 0.4.8", + "log 0.4.11", "nohash-hasher", "parking_lot 0.10.2", "rand 0.7.3", diff --git 
a/polkadot/Cargo.toml b/polkadot/Cargo.toml index 60d2f5671d..be1bc74858 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -57,6 +57,7 @@ members = [ "node/network/pov-distribution", "node/network/statement-distribution", "node/network/bitfield-distribution", + "node/network/availability-distribution", "node/overseer", "node/primitives", "node/service", diff --git a/polkadot/node/network/availability-distribution/Cargo.toml b/polkadot/node/network/availability-distribution/Cargo.toml new file mode 100644 index 0000000000..c2d313c3cf --- /dev/null +++ b/polkadot/node/network/availability-distribution/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "polkadot-availability-distribution" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +futures = "0.3.5" +log = "0.4.11" +streamunordered = "0.5.1" +codec = { package="parity-scale-codec", version = "1.3.4", features = ["std"] } +node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } +polkadot-primitives = { path = "../../../primitives" } +polkadot-erasure-coding = { path = "../../../erasure-coding" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +polkadot-network-bridge = { path = "../../network/bridge" } +polkadot-network = { path = "../../../network" } +sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } +derive_more = "0.99.9" +sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } + +[dev-dependencies] +polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" } +bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } +sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } +parking_lot = "0.11.0" +futures-timer = "3.0.2" +smol-timeout = "0.1.0" +env_logger = "0.7.1" +assert_matches = "1.3.0" +smallvec = "1" \ No newline at end of file diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs new file mode 100644 index 0000000000..b3934ded6f --- /dev/null +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -0,0 +1,1092 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! The availability distribution +//! +//! Transforms `AvailableData` into erasure chunks, which are distributed to peers +//! who are interested in the relevant candidates. +//! 
Gossip messages received from other peers are verified and gossiped to interested +//! peers. Verified in this context means that the Merkle proof contained in the +//! erasure chunk is checked. + +use codec::{Decode, Encode}; +use futures::{channel::oneshot, FutureExt}; + +use keystore::KeyStorePtr; +use sp_core::{ + crypto::Public, + traits::BareCryptoStore, +}; +use sc_keystore as keystore; + +use node_primitives::{ProtocolId, View}; + +use log::{trace, warn}; +use polkadot_erasure_coding::branch_hash; +use polkadot_primitives::v1::{ + PARACHAIN_KEY_TYPE_ID, + BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, + Hash as Hash, HashT, Id as ParaId, + ValidatorId, ValidatorIndex, +}; +use polkadot_subsystem::messages::*; +use polkadot_subsystem::{ + errors::{ChainApiError, RuntimeApiError}, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, + SubsystemContext, SubsystemError, +}; +use sc_network::ReputationChange as Rep; +use sp_staking::SessionIndex; +use std::collections::{HashMap, HashSet}; +use std::io; +use std::iter; + +const TARGET: &'static str = "avad"; + +#[derive(Debug, derive_more::From)] +enum Error { + #[from] + Erasure(polkadot_erasure_coding::Error), + #[from] + Io(io::Error), + #[from] + Oneshot(oneshot::Canceled), + #[from] + Subsystem(SubsystemError), + #[from] + RuntimeApi(RuntimeApiError), + #[from] + ChainApi(ChainApiError), +} + +type Result<T> = std::result::Result<T, Error>; + +const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid"); +const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live"); +const COST_MESSAGE_NOT_DECODABLE: Rep = Rep::new(-100, "Message is not decodable"); +const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages"); +const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information"); +const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message"); + +/// An erasure chunk, together with the hash of the candidate it belongs to, +/// that is distributed to other peers. +#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq, Hash)] +pub struct AvailabilityGossipMessage { + /// Anchor hash of the candidate the `ErasureChunk` is associated to. + pub candidate_hash: Hash, + /// The erasure chunk, an encoded piece of the candidate's `AvailableData`. + pub erasure_chunk: ErasureChunk, +} + +/// Data used to track information of peers and relay parents the +/// overseer ordered us to work on. +#[derive(Default, Clone, Debug)] +struct ProtocolState { + /// Track all active peers and their views + /// to determine what is relevant to them. + peer_views: HashMap<PeerId, View>, + + /// Our own view. + view: View, + + /// Caches a mapping of relay parents or ancestors to live candidate receipts. + /// Allows fast intersection of live candidates with views and consecutive unioning. + /// Maps relay parent / ancestor -> live candidate receipts + its hash. + receipts: HashMap<Hash, HashSet<(Hash, CommittedCandidateReceipt)>>, + + /// Allow reverse caching of view checks. + /// Maps candidate hash -> relay parent for extracting meta information from `PerRelayParent`. + /// Note that the presence of this is not sufficient to determine if deletion is OK, i.e. + /// two histories could cover this. + reverse: HashMap<Hash, Hash>, + + /// Keeps track of which candidate receipts are required due to ancestors of relay parents + /// in our view. + /// Maps ancestor -> relay parents in view + ancestry: HashMap<Hash, HashSet<Hash>>, + + /// Track things needed to start and stop work on a particular relay parent.
+ per_relay_parent: HashMap<Hash, PerRelayParent>, + + /// Track data that is specific to a candidate. + per_candidate: HashMap<Hash, PerCandidate>, +} + +#[derive(Debug, Clone, Default)] +struct PerCandidate { + /// A candidate and a set of known erasure chunks in form of messages to be gossiped / distributed if the peer view wants that. + /// This is _across_ peers and not specific to a particular one. + /// erasure chunk index -> gossip message + message_vault: HashMap<u32, AvailabilityGossipMessage>, + + /// Track received candidate hashes and chunk indices from peers. + received_messages: HashMap<PeerId, HashSet<(Hash, ValidatorIndex)>>, + + /// Track already sent candidate hashes and the erasure chunk index to the peers. + sent_messages: HashMap<PeerId, HashSet<(Hash, ValidatorIndex)>>, + + /// The set of validators. + validators: Vec<ValidatorId>, + + /// If this node is a validator, note the index in the validator set. + validator_index: Option<ValidatorIndex>, +} + +#[derive(Debug, Clone, Default)] +struct PerRelayParent { + /// Set of `K` ancestors for this relay parent. + ancestors: Vec<Hash>, +} + +impl ProtocolState { + /// Collects the relay parents' ancestors, including the relay parents themselves. + fn extend_with_ancestors<'a>( + &'a self, + relay_parents: impl IntoIterator<Item = &'a Hash> + 'a, + ) -> HashSet<Hash> { + relay_parents + .into_iter() + .map(|relay_parent| { + self.per_relay_parent + .get(relay_parent) + .into_iter() + .map(|per_relay_parent| per_relay_parent.ancestors.iter().cloned()) + .flatten() + .chain(iter::once(*relay_parent)) + }) + .flatten() + .collect::<HashSet<_>>() + } + + /// Union all cached entries for the given relay parents and their ancestors. + /// Ignores all non-existent relay parents, so this can be used directly with a peer's view. + /// Returns a map from candidate hash -> receipt + fn cached_live_candidates_unioned<'a>( + &'a self, + relay_parents: impl IntoIterator<Item = &'a Hash> + 'a, + ) -> HashMap<Hash, CommittedCandidateReceipt> { + let relay_parents_and_ancestors = self.extend_with_ancestors(relay_parents); + relay_parents_and_ancestors + .into_iter() + .filter_map(|relay_parent_or_ancestor| self.receipts.get(&relay_parent_or_ancestor)) + .map(|receipt_set| receipt_set.into_iter()) + .flatten() + .map(|(receipt_hash, receipt)| (receipt_hash.clone(), receipt.clone())) + .collect::<HashMap<_, _>>() + } + + async fn add_relay_parent<Context>( + &mut self, + ctx: &mut Context, + relay_parent: Hash, + validators: Vec<ValidatorId>, + validator_index: Option<ValidatorIndex>, + ) -> Result<()> + where + Context: SubsystemContext<Message = AvailabilityDistributionMessage>, + { + let candidates = + query_live_candidates(ctx, self, std::iter::once(relay_parent)).await?; + + // register the relation of relay_parent to candidate.. + // ..and the reverse association.
+ for (relay_parent_or_ancestor, (receipt_hash, receipt)) in candidates.clone() { + self + .reverse + .insert(receipt_hash.clone(), relay_parent_or_ancestor.clone()); + let per_candidate = self.per_candidate.entry(receipt_hash.clone()) + .or_default(); + per_candidate.validator_index = validator_index.clone(); + per_candidate.validators = validators.clone(); + + self + .receipts + .entry(relay_parent_or_ancestor) + .or_default() + .insert((receipt_hash, receipt)); + } + + // collect the ancestors again from the hash map + let ancestors = candidates + .iter() + .filter_map(|(ancestor_or_relay_parent, _receipt)| { + if ancestor_or_relay_parent == &relay_parent { + None + } else { + Some(*ancestor_or_relay_parent) + } + }) + .collect::<Vec<Hash>>(); + + // mark all the ancestors as "needed" by this newly added relay parent + for ancestor in ancestors.iter() { + self.ancestry + .entry(ancestor.clone()) + .or_default() + .insert(relay_parent); + } + + self + .per_relay_parent + .entry(relay_parent) + .or_default() + .ancestors = ancestors; + + Ok(()) + } + + fn remove_relay_parent(&mut self, relay_parent: &Hash) -> Result<()> { + // we might be an ancestor of some other relay_parent + if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) { + // if we were the last user, and it is + // not explicitly set to be worked on by the overseer + if descendants.is_empty() { + // remove from the ancestry index + self.ancestry.remove(relay_parent); + // and also remove the actual receipt + self.receipts.remove(relay_parent); + self.per_candidate.remove(relay_parent); + } + } + if let Some(per_relay_parent) = self.per_relay_parent.remove(relay_parent) { + // remove all "references" from the hash maps and sets for all ancestors + for ancestor in per_relay_parent.ancestors { + // one of our descendants might be an ancestor of some other relay_parent + if let Some(ref mut descendants) = self.ancestry.get_mut(&ancestor) { + // we do not need this descendant anymore + descendants.remove(relay_parent); + // if we were the last user, and it is + // not explicitly set to be worked on by the overseer + if descendants.is_empty() && !self.per_relay_parent.contains_key(&ancestor) { + // remove from the ancestry index + self.ancestry.remove(&ancestor); + // and also remove the actual receipt + self.receipts.remove(&ancestor); + self.per_candidate.remove(&ancestor); + } + } + } + } + Ok(()) + } +} + +fn network_update_message(n: NetworkBridgeEvent) -> AllMessages { + AllMessages::AvailabilityDistribution(AvailabilityDistributionMessage::NetworkBridgeUpdate(n)) +} + +/// Deal with network bridge updates and track what needs to be tracked; +/// what to track depends on the message type received.
+async fn handle_network_msg<Context>( + ctx: &mut Context, + keystore: KeyStorePtr, + state: &mut ProtocolState, + bridge_message: NetworkBridgeEvent, +) -> Result<()> +where + Context: SubsystemContext<Message = AvailabilityDistributionMessage>, +{ + match bridge_message { + NetworkBridgeEvent::PeerConnected(peerid, _role) => { + // insert if none already present + state.peer_views.entry(peerid).or_default(); + } + NetworkBridgeEvent::PeerDisconnected(peerid) => { + // get rid of superfluous data + state.peer_views.remove(&peerid); + } + NetworkBridgeEvent::PeerViewChange(peerid, view) => { + handle_peer_view_change(ctx, state, peerid, view).await?; + } + NetworkBridgeEvent::OurViewChange(view) => { + handle_our_view_change(ctx, keystore, state, view).await?; + } + NetworkBridgeEvent::PeerMessage(remote, bytes) => { + if let Ok(gossiped_availability) = + AvailabilityGossipMessage::decode(&mut (bytes.as_slice())) + { + trace!( + target: TARGET, + "Received availability gossip from peer {:?}", + &remote + ); + process_incoming_peer_message(ctx, state, remote, gossiped_availability).await?; + } else { + modify_reputation(ctx, remote, COST_MESSAGE_NOT_DECODABLE).await?; + } + } + } + Ok(()) +} + + +/// Handle the changes necessary when our view changes. +async fn handle_our_view_change<Context>( + ctx: &mut Context, + keystore: KeyStorePtr, + state: &mut ProtocolState, + view: View, +) -> Result<()> +where + Context: SubsystemContext<Message = AvailabilityDistributionMessage>, +{ + let old_view = std::mem::replace(&mut (state.view), view); + + // needed due to borrow rules + let view = state.view.clone(); + let added = view.difference(&old_view).collect::<Vec<_>>(); + + // add all the relay parents and fill the cache + for added in added.iter() { + let added = **added; + let validators = query_validators(ctx, added).await?; + let validator_index = obtain_our_validator_index( + &validators, + keystore.clone(), + ); + state.add_relay_parent(ctx, added, validators, validator_index).await?; + } + + // handle all candidates + for (candidate_hash, _receipt) in state.cached_live_candidates_unioned(added) { + let per_candidate = state + .per_candidate + .entry(candidate_hash) + .or_default(); + + // ensure this node has a validator role + if per_candidate.validator_index.is_none() { + continue; + }; + + // check if the availability data is present in the store + if !query_data_availability(ctx, candidate_hash).await? { + continue; + } + + let validator_count = per_candidate.validators.len(); + + // obtain the peers interested in the candidate hash + let peers: Vec<PeerId> = state + .peer_views + .clone() + .into_iter() + .filter(|(_peer, view)| { + // collect all direct interests of a peer w/o ancestors + state + .cached_live_candidates_unioned(view.0.iter()) + .contains_key(&candidate_hash) + }) + .map(|(peer, _view)| peer.clone()) + .collect(); + + // distribute all erasure messages to interested peers + for chunk_index in 0u32..(validator_count as u32) { + + // only the peers which did not receive this particular erasure chunk + let per_candidate = state + .per_candidate + .entry(candidate_hash) + .or_default(); + + // obtain the chunk from the cache; if it is not there, fall back + // to querying the availability store + let message_id = (candidate_hash, chunk_index); + let erasure_chunk = if let Some(message) = per_candidate.message_vault.get(&chunk_index) { + message.erasure_chunk.clone() + } else if let Some(erasure_chunk) = query_chunk(ctx, candidate_hash, chunk_index as ValidatorIndex).await?
{ + erasure_chunk + } else { + continue; + }; + + debug_assert_eq!(erasure_chunk.index, chunk_index); + + let peers = peers + .iter() + .filter(|peer| { + // only pick those which were not sent before + !per_candidate + .sent_messages + .get(*peer) + .filter(|set| { + set.contains(&message_id) + }) + .is_some() + }) + .map(|peer| peer.clone()) + .collect::>(); + let message = AvailabilityGossipMessage { + candidate_hash, + erasure_chunk, + }; + + send_tracked_gossip_message_to_peers(ctx, per_candidate, peers, message).await?; + } + } + + // cleanup the removed relay parents and their states + let removed = old_view.difference(&view).collect::>(); + for removed in removed { + state.remove_relay_parent(&removed)?; + } + Ok(()) +} + +#[inline(always)] +async fn send_tracked_gossip_message_to_peers( + ctx: &mut Context, + per_candidate: &mut PerCandidate, + peers: Vec, + message: AvailabilityGossipMessage, +) -> Result<()> +where + Context: SubsystemContext, +{ + send_tracked_gossip_messages_to_peers(ctx, per_candidate, peers, iter::once(message)).await +} + +#[inline(always)] +async fn send_tracked_gossip_messages_to_peer( + ctx: &mut Context, + per_candidate: &mut PerCandidate, + peer: PeerId, + message_iter: impl IntoIterator, +) -> Result<()> +where + Context: SubsystemContext, +{ + send_tracked_gossip_messages_to_peers(ctx, per_candidate, vec![peer], message_iter).await +} + +async fn send_tracked_gossip_messages_to_peers( + ctx: &mut Context, + per_candidate: &mut PerCandidate, + peers: Vec, + message_iter: impl IntoIterator, +) -> Result<()> +where + Context: SubsystemContext, +{ + if peers.is_empty() { + return Ok(()) + } + for message in message_iter { + for peer in peers.iter() { + let message_id = (message.candidate_hash, message.erasure_chunk.index); + per_candidate + .sent_messages + .entry(peer.clone()) + .or_default() + .insert(message_id); + } + + let encoded = message.encode(); + per_candidate + .message_vault + .insert(message.erasure_chunk.index, message); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendMessage( + peers.clone(), + AvailabilityDistributionSubsystem::PROTOCOL_ID, + encoded, + ), + )) + .await + .map_err::(Into::into)?; + } + + Ok(()) +} + +// Send the difference between two views which were not sent +// to that particular peer. +async fn handle_peer_view_change( + ctx: &mut Context, + state: &mut ProtocolState, + origin: PeerId, + view: View, +) -> Result<()> +where + Context: SubsystemContext, +{ + let current = state.peer_views.entry(origin.clone()).or_default(); + + let added: Vec = view.difference(&*current).cloned().collect(); + + *current = view; + + // only contains the intersection of what we are interested and + // the union of all relay parent's candidates. + let added_candidates = state.cached_live_candidates_unioned(added.iter()); + + // Send all messages we've seen before and the peer is now interested + // in to that peer. 
+ + for (candidate_hash, _receipt) in added_candidates { + let per_candidate = state.per_candidate.entry(candidate_hash).or_default(); + + // obtain the relevant chunk indices not sent yet + let messages = ((0 as ValidatorIndex) + ..(per_candidate.validators.len() as ValidatorIndex)) + .into_iter() + .filter_map(|erasure_chunk_index: ValidatorIndex| { + let message_id = (candidate_hash, erasure_chunk_index); + + // try to pick up the message from the message vault + // so we send as much as we have + per_candidate + .message_vault + .get(&erasure_chunk_index) + .filter(|_| { + // check if that erasure chunk was already sent before + if let Some(sent_set) = per_candidate.sent_messages.get(&origin) { + if sent_set.contains(&message_id) { + return false; + } + } + true + }) + }) + .cloned() + .collect::>(); + + send_tracked_gossip_messages_to_peer(ctx, per_candidate, origin.clone(), messages).await?; + } + Ok(()) +} + +/// Obtain the first key which has a signing key. +/// Returns the index within the validator set as `ValidatorIndex`, if there exists one, +/// otherwise, `None` is returned. +fn obtain_our_validator_index( + validators: &[ValidatorId], + keystore: KeyStorePtr, +) -> Option { + let keystore = keystore.read(); + validators.iter().enumerate().find_map(|(idx, validator)| { + if keystore.has_keys(&[(validator.to_raw_vec(), PARACHAIN_KEY_TYPE_ID)]) { + Some(idx as ValidatorIndex) + } else { + None + } + }) +} + +/// Handle an incoming message from a peer. +async fn process_incoming_peer_message( + ctx: &mut Context, + state: &mut ProtocolState, + origin: PeerId, + message: AvailabilityGossipMessage, +) -> Result<()> +where + Context: SubsystemContext, +{ + // obtain the set of candidates we are interested in based on our current view + let live_candidates = state.cached_live_candidates_unioned(state.view.0.iter()); + + // check if the candidate is of interest + let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) + { + live_candidate + } else { + return modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await; + }; + + // check the merkle proof + let root = &live_candidate.commitments.erasure_root; + let anticipated_hash = if let Ok(hash) = branch_hash( + root, + &message.erasure_chunk.proof, + message.erasure_chunk.index as usize, + ) { + hash + } else { + return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; + }; + + let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk); + if anticipated_hash != erasure_chunk_hash { + return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await; + } + + // an internal unique identifier of this message + let message_id = (message.candidate_hash, message.erasure_chunk.index); + + { + let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default(); + + // check if this particular erasure chunk was already sent by that peer before + { + let received_set = per_candidate + .received_messages + .entry(origin.clone()) + .or_default(); + if received_set.contains(&message_id) { + return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await; + } else { + received_set.insert(message_id.clone()); + } + } + + // insert into known messages and change reputation + if per_candidate + .message_vault + .insert(message_id.1, message.clone()) + .is_some() + { + modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?; + } else { + modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await?; + + // save the chunk for our index + if 
let Some(validator_index) = per_candidate.validator_index { + if message.erasure_chunk.index == validator_index { + if let Err(_e) = store_chunk( + ctx, + message.candidate_hash.clone(), + message.erasure_chunk.index, + message.erasure_chunk.clone(), + ).await? { + warn!(target: TARGET, "Failed to store erasure chunk to availability store"); + } + } + } + }; + } + // condense the peers to the peers with interest on the candidate + let peers = state + .peer_views + .clone() + .into_iter() + .filter(|(_peer, view)| { + // peers view must contain the candidate hash too + state + .cached_live_candidates_unioned(view.0.iter()) + .contains_key(&message_id.0) + }) + .map(|(peer, _)| -> PeerId { peer.clone() }) + .collect::>(); + + let per_candidate = state.per_candidate.entry(message_id.0.clone()).or_default(); + + let peers = peers + .into_iter() + .filter(|peer| { + let peer: PeerId = peer.clone(); + // avoid sending duplicate messages + per_candidate + .sent_messages + .entry(peer) + .or_default() + .contains(&message_id) + }) + .collect::>(); + + // gossip that message to interested peers + send_tracked_gossip_message_to_peers(ctx, per_candidate, peers, message).await +} + +/// The bitfield distribution subsystem. +pub struct AvailabilityDistributionSubsystem { + /// Pointer to a keystore, which is required for determining this nodes validator index. + keystore: KeyStorePtr, +} + +impl AvailabilityDistributionSubsystem { + /// The protocol identifier for bitfield distribution. + const PROTOCOL_ID: ProtocolId = *b"avad"; + + /// Number of ancestors to keep around for the relay-chain heads. + const K: usize = 3; + + /// Create a new instance of the availability distribution. + pub fn new(keystore: KeyStorePtr) -> Self { + Self { keystore } + } + + /// Start processing work as passed on from the Overseer. + async fn run(self, mut ctx: Context) -> Result<()> + where + Context: SubsystemContext, + { + // startup: register the network protocol with the bridge. + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::RegisterEventProducer(Self::PROTOCOL_ID, network_update_message), + )) + .await + .map_err::(Into::into)?; + + // work: process incoming messages from the overseer. + let mut state = ProtocolState::default(); + loop { + let message = ctx.recv().await.map_err::(Into::into)?; + match message { + FromOverseer::Communication { + msg: AvailabilityDistributionMessage::NetworkBridgeUpdate(event), + } => { + if let Err(e) = handle_network_msg( + &mut ctx, + self.keystore.clone(), + &mut state, + event + ).await { + warn!( + target: TARGET, + "Failed to handle incomming network messages: {:?}", e + ); + } + } + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: _, + deactivated: _, + })) => { + // handled at view change + } + FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {} + FromOverseer::Signal(OverseerSignal::Conclude) => { + return Ok(()); + } + } + } + } +} + +impl Subsystem for AvailabilityDistributionSubsystem +where + Context: SubsystemContext + Sync + Send, +{ + fn start(self, ctx: Context) -> SpawnedSubsystem { + SpawnedSubsystem { + name: "availability-distribution-subsystem", + future: Box::pin(async move { self.run(ctx) }.map(|_| ())), + } + } +} + +/// Obtain all live candidates based on an iterator of relay heads. 
+async fn query_live_candidates_without_ancestors( + ctx: &mut Context, + relay_parents: impl IntoIterator, +) -> Result> +where + Context: SubsystemContext, +{ + let iter = relay_parents.into_iter(); + let hint = iter.size_hint(); + + let mut live_candidates = HashSet::with_capacity(hint.1.unwrap_or(hint.0)); + for relay_parent in iter { + let paras = query_para_ids(ctx, relay_parent).await?; + for para in paras { + if let Some(ccr) = query_pending_availability(ctx, relay_parent, para).await? { + live_candidates.insert(ccr); + } + } + } + Ok(live_candidates) +} + +/// Obtain all live candidates based on an iterator or relay heads including `k` ancestors. +/// +/// Relay parent. +async fn query_live_candidates( + ctx: &mut Context, + state: &mut ProtocolState, + relay_parents: impl IntoIterator, +) -> Result> +where + Context: SubsystemContext, +{ + let iter = relay_parents.into_iter(); + let hint = iter.size_hint(); + + let capacity = hint.1.unwrap_or(hint.0) * (1 + AvailabilityDistributionSubsystem::K); + let mut live_candidates = + HashMap::::with_capacity(capacity); + + for relay_parent in iter { + + // register one of relay parents (not the ancestors) + let mut ancestors = query_up_to_k_ancestors_in_same_session( + ctx, + relay_parent, + AvailabilityDistributionSubsystem::K, + ) + .await?; + + ancestors.push(relay_parent); + + + // ancestors might overlap, so check the cache too + let unknown = ancestors + .into_iter() + .filter(|relay_parent_or_ancestor| { + // use the ones which we pulled before + // but keep the unknown relay parents + state + .receipts + .get(relay_parent_or_ancestor) + .and_then(|receipts| { + // directly extend the live_candidates with the cached value + live_candidates.extend(receipts.into_iter().map( + |(receipt_hash, receipt)| { + ( + relay_parent, + (receipt_hash.clone(), receipt.clone()), + ) + }, + )); + Some(()) + }) + .is_none() + }) + .collect::>(); + + // query the ones that were not present in the receipts cache + let receipts = query_live_candidates_without_ancestors(ctx, unknown.clone()).await?; + live_candidates.extend( + unknown.into_iter().zip( + receipts + .into_iter() + .map(|receipt| (receipt.hash(), receipt)), + ), + ); + } + Ok(live_candidates) +} + +/// Query all para IDs. +async fn query_para_ids(ctx: &mut Context, relay_parent: Hash) -> Result> +where + Context: SubsystemContext, +{ + let (tx, rx) = oneshot::channel(); + ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx), + ))) + .await + .map_err::(Into::into)?; + + let all_para_ids: Vec<_> = rx + .await??; + + let occupied_para_ids = all_para_ids + .into_iter() + .filter_map(|core_state| { + if let CoreState::Occupied(occupied) = core_state { + Some(occupied.para_id) + } else { + None + } + }) + .collect(); + Ok(occupied_para_ids) +} + +/// Modify the reputation of a peer based on its behavior. +async fn modify_reputation(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()> +where + Context: SubsystemContext, +{ + trace!( + target: TARGET, + "Reputation change of {:?} for peer {:?}", + rep, + peer + ); + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer(peer, rep), + )) + .await + .map_err::(Into::into) +} + +/// Query the proof of validity for a particular candidate hash. 
+async fn query_data_availability( + ctx: &mut Context, + candidate_hash: Hash, +) -> Result +where + Context: SubsystemContext, +{ + let (tx, rx) = oneshot::channel(); + ctx.send_message(AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx), + )) + .await?; + rx.await.map_err::(Into::into) +} + + +async fn query_chunk( + ctx: &mut Context, + candidate_hash: Hash, + validator_index: ValidatorIndex, +) -> Result> +where + Context: SubsystemContext, +{ + let (tx, rx) = oneshot::channel(); + ctx.send_message(AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx), + )) + .await?; + rx.await.map_err::(Into::into) +} + + +async fn store_chunk( + ctx: &mut Context, + candidate_hash: Hash, + validator_index: ValidatorIndex, + erasure_chunk: ErasureChunk, +) -> Result> +where + Context: SubsystemContext, +{ + let (tx, rx) = oneshot::channel(); + ctx.send_message(AllMessages::AvailabilityStore( + AvailabilityStoreMessage::StoreChunk(candidate_hash, validator_index, erasure_chunk, tx), + )).await?; + rx.await.map_err::(Into::into) +} + +/// Request the head data for a particular para. +async fn query_pending_availability( + ctx: &mut Context, + relay_parent: Hash, + para: ParaId, +) -> Result> +where + Context: SubsystemContext, +{ + let (tx, rx) = oneshot::channel(); + ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::CandidatePendingAvailability(para, tx), + ))) + .await?; + rx.await? + .map_err::(Into::into) +} + +/// Query the validator set. +async fn query_validators( + ctx: &mut Context, + relay_parent: Hash, +) -> Result> +where + Context: SubsystemContext, +{ + let (tx, rx) = oneshot::channel(); + let query_validators = AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )); + + ctx.send_message(query_validators) + .await?; + rx.await? + .map_err::(Into::into) +} + +/// Query the hash of the `K` ancestors +async fn query_k_ancestors( + ctx: &mut Context, + relay_parent: Hash, + k: usize, +) -> Result> +where + Context: SubsystemContext, +{ + let (tx, rx) = oneshot::channel(); + let query_ancestors = AllMessages::ChainApi(ChainApiMessage::Ancestors { + hash: relay_parent, + k, + response_channel: tx, + }); + + ctx.send_message(query_ancestors) + .await?; + rx.await? + .map_err::(Into::into) +} + +/// Query the session index of a relay parent +async fn query_session_index_for_child( + ctx: &mut Context, + relay_parent: Hash, +) -> Result +where + Context: SubsystemContext, +{ + let (tx, rx) = oneshot::channel(); + let query_session_idx_for_child = AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )); + + ctx.send_message(query_session_idx_for_child) + .await?; + rx.await? + .map_err::(Into::into) +} + +/// Queries up to k ancestors with the constraints of equiv session +async fn query_up_to_k_ancestors_in_same_session( + ctx: &mut Context, + relay_parent: Hash, + k: usize, +) -> Result> +where + Context: SubsystemContext, +{ + // k + 1 since we always query the child's session index + // ordering is [parent, grandparent, greatgrandparent, greatgreatgrandparent, ...] 
+ let ancestors = query_k_ancestors(ctx, relay_parent, k + 1).await?; + let desired_session = query_session_index_for_child(ctx, relay_parent).await?; + // we would only need `ancestors.len() - 1`, but the one extra could avoid a re-alloc + // if the consumer wants to push the `relay_parent` onto it too and does not hurt otherwise + let mut acc = Vec::with_capacity(ancestors.len()); + + // iterate from youngest to oldest + let mut iter = ancestors.into_iter().peekable(); + + while let Some(ancestor) = iter.next() { + if let Some(ancestor_parent) = iter.peek() { + let session = query_session_index_for_child(ctx, *ancestor_parent).await?; + if session != desired_session { + break; + } + acc.push(ancestor); + } else { + // either ended up at genesis or the blocks were + // already pruned + break; + } + } + + debug_assert!(acc.len() <= k); + Ok(acc) +} + + +#[cfg(test)] +mod tests; diff --git a/polkadot/node/network/availability-distribution/src/tests.rs b/polkadot/node/network/availability-distribution/src/tests.rs new file mode 100644 index 0000000000..130c2700ca --- /dev/null +++ b/polkadot/node/network/availability-distribution/src/tests.rs @@ -0,0 +1,975 @@ +use super::*; +use assert_matches::assert_matches; +use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks}; +use polkadot_primitives::v1::{ + AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GlobalValidationData, + GroupIndex, GroupRotationInfo, HeadData, LocalValidationData, OccupiedCore, + OmittedValidationData, PoV, ScheduledCore, ValidatorPair, +}; +use polkadot_subsystem_testhelpers as test_helpers; + +use futures::{executor, future, Future}; +use futures_timer::Delay; +use smallvec::smallvec; +use smol_timeout::TimeoutExt; +use std::time::Duration; + +macro_rules! view { + ( $( $hash:expr ),* $(,)? ) => [ + View(vec![ $( $hash.clone() ),* ]) + ]; + } + +macro_rules! 
delay { + ($delay:expr) => { + Delay::new(Duration::from_millis($delay)).await; + }; +} + +struct TestHarness { + virtual_overseer: test_helpers::TestSubsystemContextHandle, +} + +fn test_harness>( + keystore: KeyStorePtr, + test: impl FnOnce(TestHarness) -> T, +) { + let _ = env_logger::builder() + .is_test(true) + .filter( + Some("polkadot_availability_distribution"), + log::LevelFilter::Trace, + ) + .try_init(); + + let pool = sp_core::testing::TaskExecutor::new(); + + let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); + + let subsystem = AvailabilityDistributionSubsystem::new(keystore); + let subsystem = subsystem.run(context); + + let test_fut = test(TestHarness { virtual_overseer }); + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + + executor::block_on(future::select(test_fut, subsystem)); +} + +const TIMEOUT: Duration = Duration::from_millis(100); + +async fn overseer_signal( + overseer: &mut test_helpers::TestSubsystemContextHandle, + signal: OverseerSignal, +) { + delay!(50); + overseer + .send(FromOverseer::Signal(signal)) + .timeout(TIMEOUT) + .await + .expect("10ms is more than enough for sending signals."); +} + +async fn overseer_send( + overseer: &mut test_helpers::TestSubsystemContextHandle, + msg: AvailabilityDistributionMessage, +) { + log::trace!("Sending message:\n{:?}", &msg); + overseer + .send(FromOverseer::Communication { msg }) + .timeout(TIMEOUT) + .await + .expect("10ms is more than enough for sending messages."); +} + +async fn overseer_recv( + overseer: &mut test_helpers::TestSubsystemContextHandle, +) -> AllMessages { + log::trace!("Waiting for message ..."); + let msg = overseer + .recv() + .timeout(TIMEOUT) + .await + .expect("TIMEOUT is enough to recv."); + log::trace!("Received message:\n{:?}", &msg); + msg +} + +fn dummy_occupied_core(para: ParaId) -> CoreState { + CoreState::Occupied(OccupiedCore { + para_id: para, + next_up_on_available: None, + occupied_since: 0, + time_out_at: 5, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }) +} + +use sp_keyring::Sr25519Keyring; + +#[derive(Clone)] +struct TestState { + chain_ids: Vec, + validators: Vec, + validator_public: Vec, + validator_index: Option, + validator_groups: (Vec>, GroupRotationInfo), + head_data: HashMap, + keystore: KeyStorePtr, + relay_parent: Hash, + ancestors: Vec, + availability_cores: Vec, + global_validation_data: GlobalValidationData, + local_validation_data: LocalValidationData, +} + +fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec { + val_ids.iter().map(|v| v.public().into()).collect() +} + +impl Default for TestState { + fn default() -> Self { + let chain_a = ParaId::from(1); + let chain_b = ParaId::from(2); + + let chain_ids = vec![chain_a, chain_b]; + + let validators = vec![ + Sr25519Keyring::Ferdie, // <- this node, role: validator + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + ]; + + let keystore = keystore::Store::new_in_memory(); + + keystore + .write() + .insert_ephemeral_from_seed::(&validators[0].to_seed()) + .expect("Insert key into keystore"); + + let validator_public = validator_pubkeys(&validators); + + let validator_groups = vec![vec![2, 0, 4], vec![1], vec![3]]; + let group_rotation_info = GroupRotationInfo { + session_start_block: 0, + group_rotation_frequency: 100, + now: 1, + }; + let validator_groups = (validator_groups, group_rotation_info); + + let availability_cores = vec![ + 
CoreState::Scheduled(ScheduledCore { + para_id: chain_ids[0], + collator: None, + }), + CoreState::Scheduled(ScheduledCore { + para_id: chain_ids[1], + collator: None, + }), + ]; + + let mut head_data = HashMap::new(); + head_data.insert(chain_a, HeadData(vec![4, 5, 6])); + head_data.insert(chain_b, HeadData(vec![7, 8, 9])); + + let ancestors = vec![ + Hash::repeat_byte(0x44), + Hash::repeat_byte(0x33), + Hash::repeat_byte(0x22), + ]; + let relay_parent = Hash::repeat_byte(0x05); + + let local_validation_data = LocalValidationData { + parent_head: HeadData(vec![7, 8, 9]), + balance: Default::default(), + code_upgrade_allowed: None, + validation_code_hash: Default::default(), + }; + + let global_validation_data = GlobalValidationData { + max_code_size: 1000, + max_head_data_size: 1000, + block_number: Default::default(), + }; + + let validator_index = Some((validators.len() - 1) as ValidatorIndex); + + Self { + chain_ids, + keystore, + validators, + validator_public, + validator_groups, + availability_cores, + head_data, + local_validation_data, + global_validation_data, + relay_parent, + ancestors, + validator_index, + } + } +} + +fn make_available_data(test: &TestState, pov: PoV) -> AvailableData { + let omitted_validation = OmittedValidationData { + global_validation: test.global_validation_data.clone(), + local_validation: test.local_validation_data.clone(), + }; + + AvailableData { + omitted_validation, + pov, + } +} + +fn make_erasure_root(test: &TestState, pov: PoV) -> Hash { + let available_data = make_available_data(test, pov); + + let chunks = obtain_chunks(test.validators.len(), &available_data).unwrap(); + branches(&chunks).root() +} + +fn make_valid_availability_gossip( + test: &TestState, + candidate_hash: Hash, + erasure_chunk_index: u32, + pov: PoV, +) -> AvailabilityGossipMessage { + let available_data = make_available_data(test, pov); + + let erasure_chunks = derive_erasure_chunks_with_proofs(test.validators.len(), &available_data); + + let erasure_chunk: ErasureChunk = erasure_chunks + .get(erasure_chunk_index as usize) + .expect("Must be valid or input is oob") + .clone(); + + AvailabilityGossipMessage { + candidate_hash, + erasure_chunk, + } +} + +#[derive(Default)] +struct TestCandidateBuilder { + para_id: ParaId, + head_data: HeadData, + pov_hash: Hash, + relay_parent: Hash, + erasure_root: Hash, +} + +impl TestCandidateBuilder { + fn build(self) -> CommittedCandidateReceipt { + CommittedCandidateReceipt { + descriptor: CandidateDescriptor { + para_id: self.para_id, + pov_hash: self.pov_hash, + relay_parent: self.relay_parent, + ..Default::default() + }, + commitments: CandidateCommitments { + head_data: self.head_data, + erasure_root: self.erasure_root, + ..Default::default() + }, + } + } +} + +#[test] +fn helper_integrity() { + let test_state = TestState::default(); + + let pov_block = PoV { + block_data: BlockData(vec![42, 43, 44]), + }; + + let pov_hash = pov_block.hash(); + + let candidate = TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: test_state.relay_parent, + pov_hash: pov_hash, + erasure_root: make_erasure_root(&test_state, pov_block.clone()), + ..Default::default() + } + .build(); + + let message = + make_valid_availability_gossip(&test_state, dbg!(candidate.hash()), 2, pov_block.clone()); + + let root = dbg!(&candidate.commitments.erasure_root); + + let anticipated_hash = branch_hash( + root, + &message.erasure_chunk.proof, + dbg!(message.erasure_chunk.index as usize), + ) + .expect("Must be able to derive branch hash"); + 
assert_eq!( + anticipated_hash, + BlakeTwo256::hash(&message.erasure_chunk.chunk) + ); +} + +fn derive_erasure_chunks_with_proofs( + n_validators: usize, + available_data: &AvailableData, +) -> Vec { + let chunks: Vec> = obtain_chunks(n_validators, available_data).unwrap(); + + // create proofs for each erasure chunk + let branches = branches(chunks.as_ref()); + + let erasure_chunks = branches + .enumerate() + .map(|(index, (proof, chunk))| ErasureChunk { + chunk: chunk.to_vec(), + index: index as _, + proof, + }) + .collect::>(); + + erasure_chunks +} + +#[test] +fn reputation_verification() { + let test_state = TestState::default(); + + test_harness(test_state.keystore.clone(), |test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap(); + + let pov_block_a = PoV { + block_data: BlockData(vec![42, 43, 44]), + }; + + let pov_block_b = PoV { + block_data: BlockData(vec![45, 46, 47]), + }; + + let pov_block_c = PoV { + block_data: BlockData(vec![48, 49, 50]), + }; + + let pov_hash_a = pov_block_a.hash(); + let pov_hash_b = pov_block_b.hash(); + let pov_hash_c = pov_block_c.hash(); + + let candidates = vec![ + TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: test_state.relay_parent, + pov_hash: pov_hash_a, + erasure_root: make_erasure_root(&test_state, pov_block_a.clone()), + ..Default::default() + } + .build(), + TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: test_state.relay_parent, + pov_hash: pov_hash_b, + erasure_root: make_erasure_root(&test_state, pov_block_b.clone()), + head_data: expected_head_data.clone(), + ..Default::default() + } + .build(), + TestCandidateBuilder { + para_id: test_state.chain_ids[1], + relay_parent: Hash::repeat_byte(0xFA), + pov_hash: pov_hash_c, + erasure_root: make_erasure_root(&test_state, pov_block_c.clone()), + head_data: test_state + .head_data + .get(&test_state.chain_ids[1]) + .unwrap() + .clone(), + ..Default::default() + } + .build(), + ]; + + let TestState { + chain_ids, + keystore: _, + validators: _, + validator_public, + validator_groups, + availability_cores, + head_data: _, + local_validation_data: _, + global_validation_data: _, + relay_parent: current, + ancestors, + validator_index: _, + } = test_state.clone(); + + let _ = validator_groups; + let _ = availability_cores; + + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); + assert_ne!(&peer_a, &peer_b); + + log::trace!("peer A: {:?}", peer_a); + log::trace!("peer B: {:?}", peer_b); + + log::trace!("candidate A: {:?}", candidates[0].hash()); + log::trace!("candidate B: {:?}", candidates[1].hash()); + + overseer_signal( + &mut virtual_overseer, + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated: smallvec![current.clone()], + deactivated: smallvec![], + }), + ) + .await; + + // ignore event producer registration + let _ = overseer_recv(&mut virtual_overseer).await; + + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::OurViewChange(view![current,]), + ), + ) + .await; + + // obtain the validators per relay parent + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + assert_eq!(relay_parent, current); + tx.send(Ok(validator_public.clone())).unwrap(); + } + ); + + let genesis = Hash::repeat_byte(0xAA); 
+ // query of k ancestors, we only provide one + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::Ancestors { + hash: relay_parent, + k, + response_channel: tx, + }) => { + assert_eq!(relay_parent, current); + assert_eq!(k, AvailabilityDistributionSubsystem::K + 1); + // 0xAA..AA will not be included, since there is no mean to determine + // its session index + tx.send(Ok(vec![ancestors[0].clone(), genesis])).unwrap(); + } + ); + + // state query for each of them + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx) + )) => { + assert_eq!(relay_parent, current); + tx.send(Ok(1 as SessionIndex)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx) + )) => { + assert_eq!(relay_parent, genesis); + tx.send(Ok(1 as SessionIndex)).unwrap(); + } + ); + + // subsystem peer id collection + // which will query the availability cores + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx) + )) => { + assert_eq!(relay_parent, ancestors[0]); + // respond with a set of availability core states + tx.send(Ok(vec![ + dummy_occupied_core(chain_ids[0]), + dummy_occupied_core(chain_ids[1]) + ])).unwrap(); + } + ); + + // now each of the relay parents in the view (1) will + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::CandidatePendingAvailability(para, tx) + )) => { + assert_eq!(relay_parent, ancestors[0]); + assert_eq!(para, chain_ids[0]); + tx.send(Ok(Some( + candidates[0].clone() + ))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::CandidatePendingAvailability(para, tx) + )) => { + assert_eq!(relay_parent, ancestors[0]); + assert_eq!(para, chain_ids[1]); + tx.send(Ok(Some( + candidates[1].clone() + ))).unwrap(); + } + ); + + for _ in 0usize..1 { + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _relay_parent, + RuntimeApiRequest::AvailabilityCores(tx), + )) => { + tx.send(Ok(vec![ + CoreState::Occupied(OccupiedCore { + para_id: chain_ids[0].clone(), + next_up_on_available: None, + occupied_since: 0, + time_out_at: 10, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }), + CoreState::Free, + CoreState::Free, + CoreState::Occupied(OccupiedCore { + para_id: chain_ids[1].clone(), + next_up_on_available: None, + occupied_since: 1, + time_out_at: 7, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex::from(0), + }), + CoreState::Free, + CoreState::Free, + ])).unwrap(); + } + ); + + // query the availability cores for each of the paras (2) + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + _relay_parent, + RuntimeApiRequest::CandidatePendingAvailability(para, tx), + ) + ) => { + assert_eq!(para, chain_ids[0]); + tx.send(Ok(Some( + candidates[0].clone() + ))).unwrap(); + } + ); + + 
assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _relay_parent, + RuntimeApiRequest::CandidatePendingAvailability(para, tx), + )) => { + assert_eq!(para, chain_ids[1]); + tx.send(Ok(Some( + candidates[1].clone() + ))).unwrap(); + } + ); + } + + let mut candidates2 = candidates.clone(); + // check if the availability store can provide the desired erasure chunks + for i in 0usize..2 { + log::trace!("0000"); + let avail_data = make_available_data(&test_state, pov_block_a.clone()); + let chunks = + derive_erasure_chunks_with_proofs(test_state.validators.len(), &avail_data); + + let expected; + // store the chunk to the av store + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryDataAvailability( + candidate_hash, + tx, + ) + ) => { + let index = candidates2.iter().enumerate().find(|x| { x.1.hash() == candidate_hash }).map(|x| x.0).unwrap(); + expected = dbg!(candidates2.swap_remove(index).hash()); + tx.send( + i == 0 + ).unwrap(); + } + ); + + assert_eq!(chunks.len(), test_state.validators.len()); + + log::trace!("xxxx"); + // retrieve a stored chunk + for (j, chunk) in chunks.into_iter().enumerate() { + log::trace!("yyyy i={}, j={}", i, j); + if i != 0 { + // not a validator, so this never happens + break; + } + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::AvailabilityStore( + AvailabilityStoreMessage::QueryChunk( + candidate_hash, + idx, + tx, + ) + ) => { + assert_eq!(candidate_hash, expected); + assert_eq!(j as u32, chunk.index); + assert_eq!(idx, j as u32); + tx.send( + Some(chunk.clone()) + ).unwrap(); + } + ); + } + } + // setup peer a with interest in current + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full), + ), + ) + .await; + + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current]), + ), + ) + .await; + + // setup peer b with interest in ancestor + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), + ), + ) + .await; + + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![ancestors[0]]), + ), + ) + .await; + + delay!(100); + + ///////////////////////////////////////////////////////// + // ready for action + + // check if garbage messages are detected and peer rep is changed as expected + let garbage = b"I am garbage"; + + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + // AvailabilityDistributionSubsystem::PROTOCOL_ID, + garbage.to_vec(), + )), + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + peer, + rep + ) + ) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, COST_MESSAGE_NOT_DECODABLE); + } + ); + + let valid: AvailabilityGossipMessage = make_valid_availability_gossip( + &test_state, + candidates[0].hash(), + 2, + pov_block_a.clone(), + ); + + { + // valid (first, from b) + overseer_send( + &mut virtual_overseer, + 
AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerMessage(peer_b.clone(), valid.encode()), + ), + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + peer, + rep + ) + ) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST); + } + ); + } + + { + // valid (duplicate, from b) + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerMessage(peer_b.clone(), valid.encode()), + ), + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + peer, + rep + ) + ) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, COST_PEER_DUPLICATE_MESSAGE); + } + ); + } + + { + // valid (second, from a) + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerMessage(peer_a.clone(), valid.encode()), + ), + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + peer, + rep + ) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, BENEFIT_VALID_MESSAGE); + } + ); + } + + // peer a is not interested in anything anymore + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![]), + ), + ) + .await; + + { + // send the a message again, so we should detect the duplicate + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerMessage(peer_a.clone(), valid.encode()), + ), + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + peer, + rep + ) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, COST_PEER_DUPLICATE_MESSAGE); + } + ); + } + + // peer b sends a message before we have the view + // setup peer a with interest in parent x + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerDisconnected(peer_b.clone()), + ), + ) + .await; + + delay!(10); + + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), + ), + ) + .await; + + { + // send another message + let valid2: AvailabilityGossipMessage = make_valid_availability_gossip( + &test_state, + candidates[2].hash(), + 1, + pov_block_c.clone(), + ); + + // send the a message before we send a view update + overseer_send( + &mut virtual_overseer, + AvailabilityDistributionMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerMessage(peer_a.clone(), valid2.encode()), + ), + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::ReportPeer( + peer, + rep + ) + ) => { + assert_eq!(peer, peer_a); + assert_eq!(rep, COST_NOT_A_LIVE_CANDIDATE); + } + ); + } + }); +} + +#[test] +fn k_ancestors_in_session() { + let pool = sp_core::testing::TaskExecutor::new(); + let (mut ctx, mut virtual_overseer) = + test_helpers::make_subsystem_context::(pool); + + const DATA: &[(Hash, SessionIndex)] = &[ + (Hash::repeat_byte(0x32), 3), // relay parent + (Hash::repeat_byte(0x31), 3), // grand parent + 
(Hash::repeat_byte(0x30), 3), // great ... + (Hash::repeat_byte(0x20), 2), + (Hash::repeat_byte(0x12), 1), + (Hash::repeat_byte(0x11), 1), + (Hash::repeat_byte(0x10), 1), + ]; + const K: usize = 5; + + const EXPECTED: &[Hash] = &[DATA[1].0, DATA[2].0]; + + let test_fut = async move { + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::Ancestors { + hash: relay_parent, + k, + response_channel: tx, + }) => { + assert_eq!(k, K+1); + assert_eq!(relay_parent, DATA[0].0); + tx.send(Ok(DATA[1..=k].into_iter().map(|x| x.0).collect::>())).unwrap(); + } + ); + + // query the desired session index of the relay parent + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + assert_eq!(relay_parent, DATA[0].0); + let session: SessionIndex = DATA[0].1; + tx.send(Ok(session)).unwrap(); + } + ); + + // query ancestors + for i in 2usize..=(EXPECTED.len() + 1 + 1) { + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => { + // query is for ancestor_parent + let x = &DATA[i]; + assert_eq!(relay_parent, x.0); + // but needs to yield ancestor_parent's child's session index + let x = &DATA[i-1]; + tx.send(Ok(x.1)).unwrap(); + } + ); + } + }; + + let sut = async move { + let ancestors = query_up_to_k_ancestors_in_same_session(&mut ctx, DATA[0].0, K) + .await + .unwrap(); + assert_eq!(ancestors, EXPECTED.to_vec()); + }; + + futures::pin_mut!(test_fut); + futures::pin_mut!(sut); + + executor::block_on(future::join(test_fut, sut).timeout(Duration::from_millis(1000))); +} diff --git a/polkadot/node/network/bitfield-distribution/Cargo.toml b/polkadot/node/network/bitfield-distribution/Cargo.toml index bf7d23a900..0ae5ad5a3d 100644 --- a/polkadot/node/network/bitfield-distribution/Cargo.toml +++ b/polkadot/node/network/bitfield-distribution/Cargo.toml @@ -21,9 +21,9 @@ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } -parking_lot = "0.10.0" +parking_lot = "0.11.0" maplit = "1.0.2" -smol = "0.2.0" +smol = "0.3.3" smol-timeout = "0.1.0" env_logger = "0.7.1" assert_matches = "1.3.0" diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index d67b5e353f..939b6616df 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -46,9 +46,9 @@ const COST_MESSAGE_NOT_DECODABLE: ReputationChange = ReputationChange::new(-100, "Not interested in that parent hash"); const COST_PEER_DUPLICATE_MESSAGE: ReputationChange = ReputationChange::new(-500, "Peer sent the same message multiple times"); -const GAIN_VALID_MESSAGE_FIRST: ReputationChange = +const BENEFIT_VALID_MESSAGE_FIRST: ReputationChange = ReputationChange::new(15, "Valid message with new information"); -const GAIN_VALID_MESSAGE: ReputationChange = +const BENEFIT_VALID_MESSAGE: ReputationChange = ReputationChange::new(10, "Valid message"); /// Checked signed availability bitfield that is distributed @@ -396,14 +396,14 @@ where "Already received a 
message for validator at index {}", validator_index ); - modify_reputation(ctx, origin, GAIN_VALID_MESSAGE).await?; + modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?; return Ok(()); } one_per_validator.insert(validator.clone(), message.clone()); relay_message(ctx, job_data, &mut state.peer_views, validator, message).await?; - modify_reputation(ctx, origin, GAIN_VALID_MESSAGE_FIRST).await + modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await } else { modify_reputation(ctx, origin, COST_SIGNATURE_INVALID).await } @@ -479,14 +479,14 @@ where { let current = state.peer_views.entry(origin.clone()).or_default(); - let delta_vec: Vec = (*current).difference(&view).cloned().collect(); + let added: Vec = view.difference(&*current).cloned().collect(); *current = view; // Send all messages we've seen before and the peer is now interested // in to that peer. - let delta_set: Vec<(ValidatorId, BitfieldGossipMessage)> = delta_vec + let delta_set: Vec<(ValidatorId, BitfieldGossipMessage)> = added .into_iter() .filter_map(|new_relay_parent_interest| { if let Some(job_data) = (&*state).per_relay_parent.get(&new_relay_parent_interest) { @@ -558,7 +558,7 @@ where { fn start(self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem { - name: "bitfield-distribution", + name: "bitfield-distribution-subsystem", future: Box::pin(async move { Self::run(ctx) }.map(|_| ())), } } @@ -870,7 +870,7 @@ mod test { NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_b); - assert_eq!(rep, GAIN_VALID_MESSAGE_FIRST) + assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST) } ); @@ -887,7 +887,7 @@ mod test { NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_a); - assert_eq!(rep, GAIN_VALID_MESSAGE) + assert_eq!(rep, BENEFIT_VALID_MESSAGE) } ); @@ -993,7 +993,7 @@ mod test { NetworkBridgeMessage::ReportPeer(peer, rep) ) => { assert_eq!(peer, peer_b); - assert_eq!(rep, GAIN_VALID_MESSAGE_FIRST) + assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST) } ); diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index a4070c028c..4d9b70d4f8 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -229,4 +229,4 @@ pub fn make_subsystem_context(spawn: S) rx: all_messages_rx }, ) -} +} \ No newline at end of file diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 3f72e2f269..91d485739b 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -220,4 +220,4 @@ impl Subsystem for DummySubsystem { future, } } -} +} \ No newline at end of file diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index acc41a2e7f..ae1d429d75 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -188,12 +188,6 @@ impl NetworkBridgeMessage { /// Availability Distribution Message. #[derive(Debug)] pub enum AvailabilityDistributionMessage { - /// Distribute an availability chunk to other validators. - DistributeChunk(Hash, ErasureChunk), - - /// Fetch an erasure chunk from networking by candidate hash and chunk index. - FetchChunk(Hash, u32), - /// Event from the network bridge. NetworkBridgeUpdate(NetworkBridgeEvent), } @@ -202,8 +196,6 @@ impl AvailabilityDistributionMessage { /// If the current variant contains the relay parent hash, return it. 
pub fn relay_parent(&self) -> Option { match self { - Self::DistributeChunk(hash, _) => Some(*hash), - Self::FetchChunk(hash, _) => Some(*hash), Self::NetworkBridgeUpdate(_) => None, } } @@ -255,7 +247,7 @@ pub enum AvailabilityStoreMessage { /// megabytes of data to get a single bit of information. QueryDataAvailability(Hash, oneshot::Sender), - /// Query an `ErasureChunk` from the AV store. + /// Query an `ErasureChunk` from the AV store by the candidate hash and validator index. QueryChunk(Hash, ValidatorIndex, oneshot::Sender>), /// Query whether an `ErasureChunk` exists within the AV Store. @@ -513,6 +505,6 @@ pub enum AllMessages { AvailabilityStore(AvailabilityStoreMessage), /// Message for the network bridge subsystem. NetworkBridge(NetworkBridgeMessage), - /// Message for the Chain API subsystem + /// Message for the Chain API subsystem. ChainApi(ChainApiMessage), } diff --git a/polkadot/parachain/src/primitives.rs b/polkadot/parachain/src/primitives.rs index 65d890e26d..955842e30f 100644 --- a/polkadot/parachain/src/primitives.rs +++ b/polkadot/parachain/src/primitives.rs @@ -33,7 +33,7 @@ pub use polkadot_core_primitives::BlockNumber as RelayChainBlockNumber; /// Parachain head data included in the chain. #[derive(PartialEq, Eq, Clone, PartialOrd, Ord, Encode, Decode, RuntimeDebug)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Default))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Default, Hash))] pub struct HeadData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); impl From> for HeadData { @@ -44,7 +44,7 @@ impl From> for HeadData { /// Parachain validation code. #[derive(Default, PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Hash))] pub struct ValidationCode(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); impl From> for ValidationCode { @@ -186,7 +186,7 @@ impl AccountIdConversion for Id { /// Which origin a parachain's message to the relay chain should be dispatched from. #[derive(Clone, PartialEq, Eq, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug))] +#[cfg_attr(feature = "std", derive(Debug, Hash))] #[repr(u8)] pub enum ParachainDispatchOrigin { /// As a simple `Origin::Signed`, using `ParaId::account_id` as its value. This is good when @@ -215,7 +215,7 @@ impl sp_std::convert::TryFrom for ParachainDispatchOrigin { /// A message from a parachain to its Relay Chain. #[derive(Clone, PartialEq, Eq, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug))] +#[cfg_attr(feature = "std", derive(Debug, Hash))] pub struct UpwardMessage { /// The origin for the message to be sent from. pub origin: ParachainDispatchOrigin, diff --git a/polkadot/primitives/src/v0.rs b/polkadot/primitives/src/v0.rs index 11d62a4c2e..9f014c80c2 100644 --- a/polkadot/primitives/src/v0.rs +++ b/polkadot/primitives/src/v0.rs @@ -612,7 +612,7 @@ pub struct AvailableData { /// A chunk of erasure-encoded block data. #[derive(PartialEq, Eq, Clone, Encode, Decode, Default)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash))] pub struct ErasureChunk { /// The erasure-encoded chunk of data belonging to the candidate block. pub chunk: Vec, @@ -624,8 +624,8 @@ pub struct ErasureChunk { /// Statements that can be made about parachain candidates. These are the /// actual values that are signed. 
-#[derive(Clone, PartialEq, Eq, Encode, Decode, Hash)] -#[cfg_attr(feature = "std", derive(Debug))] +#[derive(Clone, PartialEq, Eq, Encode, Decode)] +#[cfg_attr(feature = "std", derive(Debug, Hash))] pub enum CompactStatement { /// Proposal of a parachain candidate. #[codec(index = "1")] diff --git a/polkadot/primitives/src/v1.rs b/polkadot/primitives/src/v1.rs index 781f6f7711..f79a6cb4c0 100644 --- a/polkadot/primitives/src/v1.rs +++ b/polkadot/primitives/src/v1.rs @@ -25,7 +25,7 @@ use runtime_primitives::traits::AppVerify; use inherents::InherentIdentifier; use sp_arithmetic::traits::{BaseArithmetic, Saturating, Zero}; -use runtime_primitives::traits::{BlakeTwo256, Hash as HashT}; +pub use runtime_primitives::traits::{BlakeTwo256, Hash as HashT}; // Export some core primitives. pub use polkadot_core_primitives::v1::{ @@ -106,7 +106,7 @@ pub fn validation_data_hash( /// A unique descriptor of the candidate receipt. #[derive(PartialEq, Eq, Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, Default))] +#[cfg_attr(feature = "std", derive(Debug, Default, Hash))] pub struct CandidateDescriptor { /// The ID of the para this is a candidate for. pub para_id: Id, @@ -176,7 +176,7 @@ pub struct FullCandidateReceipt { /// A candidate-receipt with commitments directly included. #[derive(PartialEq, Eq, Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, Default))] +#[cfg_attr(feature = "std", derive(Debug, Default, Hash))] pub struct CommittedCandidateReceipt { /// The descriptor of the candidate. pub descriptor: CandidateDescriptor, @@ -266,7 +266,7 @@ pub struct GlobalValidationData { /// Commitments made in a `CandidateReceipt`. Many of these are outputs of validation. #[derive(PartialEq, Eq, Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(Debug, Default))] +#[cfg_attr(feature = "std", derive(Debug, Default, Hash))] pub struct CandidateCommitments { /// Fees paid from the chain to the relay chain validators. pub fees: Balance, @@ -484,7 +484,7 @@ impl CoreAssignment { /// Validation data omitted from most candidate descriptor structs, as it can be derived from the /// relay-parent. #[derive(Clone, Encode, Decode)] -#[cfg_attr(feature = "std", derive(PartialEq, Debug))] +#[cfg_attr(feature = "std", derive(PartialEq, Debug, Default))] pub struct OmittedValidationData { /// The global validation schedule. pub global_validation: GlobalValidationData, diff --git a/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md b/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md index e34250782a..de34f3b8ed 100644 --- a/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md +++ b/polkadot/roadmap/implementers-guide/src/node/availability/availability-distribution.md @@ -23,16 +23,17 @@ Output: For each relay-parent in our local view update, look at all backed candidates pending availability. Distribute via gossip all erasure chunks for all candidates that we have to peers. -We define an operation `live_candidates(relay_heads) -> Set` which returns a set of [`CommittedCandidateReceipt`s](../../types/candidate.md#committed-candidate-receipt) a given set of relay chain heads that implies a set of candidates whose availability chunks should be currently gossiped. This is defined as all candidates pending availability in any of those relay-chain heads or any of their last `K` ancestors. We assume that state is not pruned within `K` blocks of the chain-head. 
+We define an operation `live_candidates(relay_heads) -> Set` which returns a set of [`CommittedCandidateReceipt`s](../../types/candidate.md#committed-candidate-receipt).
+This is defined as all candidates pending availability in any of those relay-chain heads or any of their last `K` ancestors in the same session. We assume that state is not pruned within `K` blocks of the chain-head.
 
 `K` commonly is small and is currently fixed to `K=3`.
 
-We will send any erasure-chunks that correspond to candidates in `live_candidates(peer_most_recent_view_update)`. Likewise, we only accept and forward messages pertaining to a candidate in `live_candidates(current_heads)`. Each erasure chunk should be accompanied by a merkle proof that it is committed to by the erasure trie root in the candidate receipt, and this gossip system is responsible for checking such proof.
+We will send any erasure-chunks that correspond to candidates in `live_candidates(peer_most_recent_view_update)`.
+Likewise, we only accept and forward messages pertaining to a candidate in `live_candidates(current_heads)`.
+Each erasure chunk should be accompanied by a merkle proof that it is committed to by the erasure trie root in the candidate receipt, and this gossip system is responsible for checking such proof.
 
 We re-attempt to send anything live to a peer upon any view update from that peer.
 
-On our view change, for all live candidates, we will check if we have the PoV by issuing a `QueryPoV` message and waiting for the response. If the query returns `Some`, we will perform the erasure-coding and distribute all messages to peers that will accept them.
+On our view change, for all live candidates, we will check if we have the PoV by issuing a `QueryDataAvailability` message and waiting for the response. If the query returns `true`, we will perform the erasure-coding and distribute all messages to peers that will accept them.
 
 If we are operating as a validator, we note our index `i` in the validator set and keep the `i`th availability chunk for any live candidate, as we receive it. We keep the chunk and its merkle proof in the [Availability Store](../utility/availability-store.md) by sending a `StoreChunk` command. This includes chunks and proofs generated as the result of a successful `QueryPoV`.
 
-> TODO: back-and-forth is kind of ugly but drastically simplifies the pruning in the availability store, as it creates an invariant that chunks are only stored if the candidate was actually backed
->
-> K=3?
+The back-and-forth seems suboptimal at first glance, but drastically simplifies the pruning in the availability store, as it creates an invariant that chunks are only stored if the candidate was actually backed.
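
For readers of the guide text above, here is a minimal, self-contained sketch of the `live_candidates(relay_heads)` operation with its `K`-ancestor walk bounded by the session. All names here (`BlockHash`, `CandidateReceipt`, the `Chain` trait) are simplified placeholders rather than the real Polkadot primitives; the actual subsystem obtains the same information through `ChainApiMessage::Ancestors`, `RuntimeApiRequest::SessionIndexForChild`, and `RuntimeApiRequest::CandidatePendingAvailability` requests.

```rust
// Illustrative sketch only: placeholder types, not the real polkadot-primitives.
use std::collections::HashSet;

type BlockHash = u64;
type SessionIndex = u32;

#[derive(Clone, PartialEq, Eq, Hash)]
struct CandidateReceipt(BlockHash);

/// Chain access used by this sketch. In the subsystem these are ChainApi/RuntimeApi requests.
trait Chain {
    fn parent(&self, block: BlockHash) -> Option<BlockHash>;
    /// Session the block belongs to. The real subsystem derives this by sending a
    /// `SessionIndexForChild` request for the block's parent.
    fn session_of(&self, block: BlockHash) -> SessionIndex;
    /// Candidates pending availability at this block.
    fn pending_availability(&self, block: BlockHash) -> Vec<CandidateReceipt>;
}

/// Collect candidates pending availability at each head and at up to `k` of its
/// ancestors, stopping the walk as soon as an ancestor falls into another session.
fn live_candidates(
    chain: &impl Chain,
    heads: &[BlockHash],
    k: usize,
) -> HashSet<CandidateReceipt> {
    let mut out = HashSet::new();
    for &head in heads {
        let desired_session = chain.session_of(head);
        let mut relevant = vec![head];
        let mut current = head;
        for _ in 0..k {
            match chain.parent(current) {
                // Keep walking only while the ancestor is in the same session.
                Some(parent) if chain.session_of(parent) == desired_session => {
                    relevant.push(parent);
                    current = parent;
                }
                // Different session, genesis reached, or block pruned: stop here.
                _ => break,
            }
        }
        for block in relevant {
            out.extend(chain.pending_availability(block));
        }
    }
    out
}
```

In the patch itself (`query_live_candidates` above) the results are additionally cached per relay parent, so overlapping ancestries across heads are not re-queried.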
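
Likewise, the per-chunk proof check that `process_incoming_peer_message` performs before adjusting peer reputation can be summarised as the sketch below. It mirrors the calls visible in the patch (`branch_hash` from `polkadot-erasure-coding`, `BlakeTwo256::hash` over the chunk bytes); the exact import paths are assumptions and may differ from the crate layout.

```rust
// Sketch of the chunk validity check used before COST_MERKLE_PROOF_INVALID is applied;
// import paths are approximate.
use polkadot_erasure_coding::branch_hash;
use polkadot_primitives::v1::{BlakeTwo256, ErasureChunk, Hash, HashT};

/// Returns true if `chunk` is committed to by `erasure_root` at the index it claims.
fn erasure_chunk_is_valid(erasure_root: &Hash, chunk: &ErasureChunk) -> bool {
    match branch_hash(erasure_root, &chunk.proof, chunk.index as usize) {
        // The proof must verify against the root and commit to exactly this chunk.
        Ok(anticipated_hash) => anticipated_hash == BlakeTwo256::hash(&chunk.chunk),
        // A proof that fails to verify is treated as invalid (and penalised by the caller).
        Err(_) => false,
    }
}
```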