mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 12:51:02 +00:00
[zombinet] initial implementation of zombienet backchannel (#4377)
This commit is contained in:
Generated
+209
-1
@@ -1619,6 +1619,15 @@ version = "1.6.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
|
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "encoding_rs"
|
||||||
|
version = "0.8.30"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7896dc8abb250ffdda33912550faa54c88ec8b998dec0b2c55ab224921ce11df"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if 1.0.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "enum-as-inner"
|
name = "enum-as-inner"
|
||||||
version = "0.3.3"
|
version = "0.3.3"
|
||||||
@@ -1876,6 +1885,21 @@ version = "1.0.7"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "foreign-types"
|
||||||
|
version = "0.3.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
|
||||||
|
dependencies = [
|
||||||
|
"foreign-types-shared",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "foreign-types-shared"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fork-tree"
|
name = "fork-tree"
|
||||||
version = "3.0.0"
|
version = "3.0.0"
|
||||||
@@ -2684,6 +2708,19 @@ dependencies = [
|
|||||||
"webpki",
|
"webpki",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hyper-tls"
|
||||||
|
version = "0.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
|
||||||
|
dependencies = [
|
||||||
|
"bytes 1.1.0",
|
||||||
|
"hyper",
|
||||||
|
"native-tls",
|
||||||
|
"tokio",
|
||||||
|
"tokio-native-tls",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "idna"
|
name = "idna"
|
||||||
version = "0.1.5"
|
version = "0.1.5"
|
||||||
@@ -2856,7 +2893,7 @@ dependencies = [
|
|||||||
"socket2 0.3.19",
|
"socket2 0.3.19",
|
||||||
"widestring",
|
"widestring",
|
||||||
"winapi 0.3.9",
|
"winapi 0.3.9",
|
||||||
"winreg",
|
"winreg 0.6.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -4130,6 +4167,12 @@ dependencies = [
|
|||||||
"thrift",
|
"thrift",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "mime"
|
||||||
|
version = "0.3.16"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "minimal-lexical"
|
name = "minimal-lexical"
|
||||||
version = "0.2.1"
|
version = "0.2.1"
|
||||||
@@ -4348,6 +4391,24 @@ dependencies = [
|
|||||||
"rand 0.8.4",
|
"rand 0.8.4",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "native-tls"
|
||||||
|
version = "0.2.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d"
|
||||||
|
dependencies = [
|
||||||
|
"lazy_static",
|
||||||
|
"libc",
|
||||||
|
"log",
|
||||||
|
"openssl",
|
||||||
|
"openssl-probe",
|
||||||
|
"openssl-sys",
|
||||||
|
"schannel",
|
||||||
|
"security-framework",
|
||||||
|
"security-framework-sys",
|
||||||
|
"tempfile",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "net2"
|
name = "net2"
|
||||||
version = "0.2.37"
|
version = "0.2.37"
|
||||||
@@ -4539,12 +4600,39 @@ dependencies = [
|
|||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "openssl"
|
||||||
|
version = "0.10.38"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
"cfg-if 1.0.0",
|
||||||
|
"foreign-types",
|
||||||
|
"libc",
|
||||||
|
"once_cell",
|
||||||
|
"openssl-sys",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "openssl-probe"
|
name = "openssl-probe"
|
||||||
version = "0.1.4"
|
version = "0.1.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a"
|
checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "openssl-sys"
|
||||||
|
version = "0.9.72"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb"
|
||||||
|
dependencies = [
|
||||||
|
"autocfg",
|
||||||
|
"cc",
|
||||||
|
"libc",
|
||||||
|
"pkg-config",
|
||||||
|
"vcpkg",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ordered-float"
|
name = "ordered-float"
|
||||||
version = "1.1.1"
|
version = "1.1.1"
|
||||||
@@ -7854,6 +7942,41 @@ dependencies = [
|
|||||||
"winapi 0.3.9",
|
"winapi 0.3.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "reqwest"
|
||||||
|
version = "0.11.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7c4e0a76dc12a116108933f6301b95e83634e0c47b0afbed6abbaa0601e99258"
|
||||||
|
dependencies = [
|
||||||
|
"base64",
|
||||||
|
"bytes 1.1.0",
|
||||||
|
"encoding_rs",
|
||||||
|
"futures-core",
|
||||||
|
"futures-util",
|
||||||
|
"http",
|
||||||
|
"http-body",
|
||||||
|
"hyper",
|
||||||
|
"hyper-tls",
|
||||||
|
"ipnet",
|
||||||
|
"js-sys",
|
||||||
|
"lazy_static",
|
||||||
|
"log",
|
||||||
|
"mime",
|
||||||
|
"native-tls",
|
||||||
|
"percent-encoding 2.1.0",
|
||||||
|
"pin-project-lite 0.2.7",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"serde_urlencoded",
|
||||||
|
"tokio",
|
||||||
|
"tokio-native-tls",
|
||||||
|
"url 2.2.2",
|
||||||
|
"wasm-bindgen",
|
||||||
|
"wasm-bindgen-futures",
|
||||||
|
"web-sys",
|
||||||
|
"winreg 0.7.0",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "resolv-conf"
|
name = "resolv-conf"
|
||||||
version = "0.7.0"
|
version = "0.7.0"
|
||||||
@@ -9253,6 +9376,18 @@ dependencies = [
|
|||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_urlencoded"
|
||||||
|
version = "0.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9"
|
||||||
|
dependencies = [
|
||||||
|
"form_urlencoded",
|
||||||
|
"itoa 0.4.8",
|
||||||
|
"ryu",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "sha-1"
|
name = "sha-1"
|
||||||
version = "0.8.2"
|
version = "0.8.2"
|
||||||
@@ -10811,6 +10946,16 @@ dependencies = [
|
|||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-native-tls"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
|
||||||
|
dependencies = [
|
||||||
|
"native-tls",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-rustls"
|
name = "tokio-rustls"
|
||||||
version = "0.22.0"
|
version = "0.22.0"
|
||||||
@@ -10833,6 +10978,18 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-tungstenite"
|
||||||
|
version = "0.16.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e80b39df6afcc12cdf752398ade96a6b9e99c903dfdc36e53ad10b9c366bca72"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"log",
|
||||||
|
"tokio",
|
||||||
|
"tungstenite",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-util"
|
name = "tokio-util"
|
||||||
version = "0.6.9"
|
version = "0.6.9"
|
||||||
@@ -11066,6 +11223,25 @@ version = "1.0.8"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5e66dcbec4290c69dd03c57e76c2469ea5c7ce109c6dd4351c13055cf71ea055"
|
checksum = "5e66dcbec4290c69dd03c57e76c2469ea5c7ce109c6dd4351c13055cf71ea055"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tungstenite"
|
||||||
|
version = "0.16.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1"
|
||||||
|
dependencies = [
|
||||||
|
"base64",
|
||||||
|
"byteorder",
|
||||||
|
"bytes 1.1.0",
|
||||||
|
"http",
|
||||||
|
"httparse",
|
||||||
|
"log",
|
||||||
|
"rand 0.8.4",
|
||||||
|
"sha-1 0.9.8",
|
||||||
|
"thiserror",
|
||||||
|
"url 2.2.2",
|
||||||
|
"utf-8",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "twox-hash"
|
name = "twox-hash"
|
||||||
version = "1.6.1"
|
version = "1.6.1"
|
||||||
@@ -11212,6 +11388,12 @@ dependencies = [
|
|||||||
"percent-encoding 2.1.0",
|
"percent-encoding 2.1.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "utf-8"
|
||||||
|
version = "0.7.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "value-bag"
|
name = "value-bag"
|
||||||
version = "1.0.0-alpha.8"
|
version = "1.0.0-alpha.8"
|
||||||
@@ -11782,6 +11964,15 @@ dependencies = [
|
|||||||
"winapi 0.3.9",
|
"winapi 0.3.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "winreg"
|
||||||
|
version = "0.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
|
||||||
|
dependencies = [
|
||||||
|
"winapi 0.3.9",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ws2_32-sys"
|
name = "ws2_32-sys"
|
||||||
version = "0.2.1"
|
version = "0.2.1"
|
||||||
@@ -11989,6 +12180,23 @@ dependencies = [
|
|||||||
"synstructure",
|
"synstructure",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "zombienet-backchannel"
|
||||||
|
version = "0.9.13"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"lazy_static",
|
||||||
|
"parity-scale-codec",
|
||||||
|
"reqwest",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"thiserror",
|
||||||
|
"tokio",
|
||||||
|
"tokio-tungstenite",
|
||||||
|
"tracing",
|
||||||
|
"url 2.2.2",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "zstd"
|
name = "zstd"
|
||||||
version = "0.9.1+zstd.1.5.1"
|
version = "0.9.1+zstd.1.5.1"
|
||||||
|
|||||||
@@ -91,6 +91,7 @@ members = [
|
|||||||
"node/test/polkadot-simnet/common",
|
"node/test/polkadot-simnet/common",
|
||||||
"node/test/polkadot-simnet/node",
|
"node/test/polkadot-simnet/node",
|
||||||
"node/test/polkadot-simnet/test",
|
"node/test/polkadot-simnet/test",
|
||||||
|
"node/zombienet-backchannel",
|
||||||
"parachain/test-parachains",
|
"parachain/test-parachains",
|
||||||
"parachain/test-parachains/adder",
|
"parachain/test-parachains/adder",
|
||||||
"parachain/test-parachains/adder/collator",
|
"parachain/test-parachains/adder/collator",
|
||||||
|
|||||||
@@ -0,0 +1,22 @@
|
|||||||
|
[package]
|
||||||
|
name = "zombienet-backchannel"
|
||||||
|
description = "Zombienet backchannel to notify test runner and coordinate with malus actors."
|
||||||
|
license = "GPL-3.0-only"
|
||||||
|
version = "0.9.13"
|
||||||
|
authors = ["Parity Technologies <admin@parity.io>"]
|
||||||
|
edition = "2021"
|
||||||
|
readme = "README.md"
|
||||||
|
publish = false
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
tokio = { version = "1.0.0", default-features = false, features = ["macros", "net", "rt-multi-thread", "sync"] }
|
||||||
|
url = "2.0.0"
|
||||||
|
tokio-tungstenite = "0.16"
|
||||||
|
futures-util = "0.3.18"
|
||||||
|
lazy_static = "1.4.0"
|
||||||
|
parity-scale-codec = { version = "2.3.1", features = ["derive"] }
|
||||||
|
reqwest = "0.11"
|
||||||
|
thiserror = "1.0.30"
|
||||||
|
tracing = "0.1.26"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1"
|
||||||
@@ -0,0 +1,39 @@
|
|||||||
|
// Copyright 2021 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Polkadot.
|
||||||
|
|
||||||
|
// Polkadot is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Polkadot is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Polkadot Zombienet Backchannel error definitions.
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
#[allow(missing_docs)]
|
||||||
|
pub enum BackchannelError {
|
||||||
|
#[error("Error connecting websocket server")]
|
||||||
|
CantConnectToWS,
|
||||||
|
|
||||||
|
#[error("Backchannel not initialized yet")]
|
||||||
|
Uninitialized,
|
||||||
|
|
||||||
|
#[error("Backchannel already initialized")]
|
||||||
|
AlreadyInitialized,
|
||||||
|
|
||||||
|
#[error("Error sending new value to backchannel")]
|
||||||
|
SendItemFail,
|
||||||
|
|
||||||
|
#[error("Invalid host for connection backchannel")]
|
||||||
|
InvalidHost,
|
||||||
|
|
||||||
|
#[error("Invalid port for connection backchannel")]
|
||||||
|
InvalidPort,
|
||||||
|
}
|
||||||
@@ -0,0 +1,159 @@
|
|||||||
|
// Copyright 2017-2021 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Polkadot.
|
||||||
|
|
||||||
|
// Polkadot is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Polkadot is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! Provides the possibility to coordination between malicious actors and
|
||||||
|
//! the zombienet test-runner, allowing to reference runtime's generated
|
||||||
|
//! values in the test specifications, through a bidirectional message passing
|
||||||
|
//! implemented as a `backchannel`.
|
||||||
|
|
||||||
|
use futures_util::{SinkExt, StreamExt};
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
use parity_scale_codec as codec;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::{env, sync::Mutex};
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
|
||||||
|
|
||||||
|
mod errors;
|
||||||
|
use errors::BackchannelError;
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
pub static ref ZOMBIENET_BACKCHANNEL: Mutex<Option<ZombienetBackchannel>> = Mutex::new(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ZombienetBackchannel {
|
||||||
|
broadcast_tx: broadcast::Sender<BackchannelItem>,
|
||||||
|
ws_tx: broadcast::Sender<BackchannelItem>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct BackchannelItem {
|
||||||
|
key: String,
|
||||||
|
value: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Broadcaster;
|
||||||
|
|
||||||
|
pub const ZOMBIENET: &str = "🧟ZOMBIENET🧟";
|
||||||
|
|
||||||
|
impl Broadcaster {
|
||||||
|
/// Return a subscriber that will receive all message broadcasted by the zombienet backchannel
|
||||||
|
/// websocket server.
|
||||||
|
pub fn subscribe(&self) -> Result<broadcast::Receiver<BackchannelItem>, BackchannelError> {
|
||||||
|
let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap();
|
||||||
|
let zombienet_bkc = zombienet_bkc.as_mut().ok_or(BackchannelError::Uninitialized)?;
|
||||||
|
let sender = zombienet_bkc.broadcast_tx.clone();
|
||||||
|
Ok(sender.subscribe())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Provides a simple API to send a key/value to the zombienet websocket server.
|
||||||
|
pub async fn send(
|
||||||
|
&mut self,
|
||||||
|
key: &'static str,
|
||||||
|
val: impl codec::Encode,
|
||||||
|
) -> Result<(), BackchannelError> {
|
||||||
|
let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap();
|
||||||
|
let zombienet_bkc = zombienet_bkc.as_mut().ok_or(BackchannelError::Uninitialized)?;
|
||||||
|
|
||||||
|
let encoded = val.encode();
|
||||||
|
let backchannel_item = BackchannelItem {
|
||||||
|
key: key.to_string(),
|
||||||
|
value: String::from_utf8_lossy(&encoded).to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let sender = zombienet_bkc.ws_tx.clone();
|
||||||
|
sender.send(backchannel_item).map_err(|e| {
|
||||||
|
tracing::error!(target = ZOMBIENET, "Error sending new item: {}", e);
|
||||||
|
BackchannelError::SendItemFail
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ZombienetBackchannel {
|
||||||
|
pub async fn init() -> Result<(), BackchannelError> {
|
||||||
|
let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap();
|
||||||
|
if zombienet_bkc.is_none() {
|
||||||
|
let backchannel_host =
|
||||||
|
env::var("BACKCHANNEL_HOST").unwrap_or_else(|_| "backchannel".to_string());
|
||||||
|
let backchannel_port =
|
||||||
|
env::var("BACKCHANNEL_PORT").unwrap_or_else(|_| "3000".to_string());
|
||||||
|
|
||||||
|
// validate port
|
||||||
|
backchannel_port.parse::<u16>().map_err(|_| BackchannelError::InvalidPort)?;
|
||||||
|
// validate non empty string for host
|
||||||
|
if backchannel_host.trim().is_empty() {
|
||||||
|
return Err(BackchannelError::InvalidHost)
|
||||||
|
};
|
||||||
|
|
||||||
|
let ws_url = format!("ws://{}:{}/ws", backchannel_host, backchannel_port);
|
||||||
|
tracing::debug!(target = ZOMBIENET, "Connecting to : {}", &ws_url);
|
||||||
|
let (ws_stream, _) =
|
||||||
|
connect_async(ws_url).await.map_err(|_| BackchannelError::CantConnectToWS)?;
|
||||||
|
let (mut write, mut read) = ws_stream.split();
|
||||||
|
|
||||||
|
let (tx, _rx) = broadcast::channel(256);
|
||||||
|
let (tx_relay, mut rx_relay) = broadcast::channel::<BackchannelItem>(256);
|
||||||
|
|
||||||
|
// receive from the ws and send to all subcribers
|
||||||
|
let tx1 = tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(Ok(Message::Text(text))) = read.next().await {
|
||||||
|
match serde_json::from_str::<BackchannelItem>(&text) {
|
||||||
|
Ok(backchannel_item) =>
|
||||||
|
if tx1.send(backchannel_item).is_err() {
|
||||||
|
tracing::error!("Error sending through the channel");
|
||||||
|
return
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
tracing::error!("Invalid payload received");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// receive from subscribers and relay to ws
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Ok(item) = rx_relay.recv().await {
|
||||||
|
if write
|
||||||
|
.send(Message::Text(serde_json::to_string(&item).unwrap()))
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
tracing::error!("Error sending through ws");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
*zombienet_bkc = Some(ZombienetBackchannel { broadcast_tx: tx, ws_tx: tx_relay });
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(BackchannelError::AlreadyInitialized)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Ensure that the backchannel is initialized and return a broadcaster instance
|
||||||
|
/// allowing to subscribe or send new items.
|
||||||
|
pub fn broadcaster() -> Result<Broadcaster, BackchannelError> {
|
||||||
|
if ZOMBIENET_BACKCHANNEL.lock().unwrap().is_some() {
|
||||||
|
Ok(Broadcaster {})
|
||||||
|
} else {
|
||||||
|
Err(BackchannelError::Uninitialized)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user