fix: migrate vendor rustfmt.toml to stable-only features

- Update vendor/pezkuwi-zombienet-sdk/rustfmt.toml to stable-only
- Reformat 74 vendor files with stable rustfmt
- Remove nightly-only features causing CI failures
This commit is contained in:
2025-12-23 10:00:48 +03:00
parent 9bfa143337
commit ebd8fafdee
74 changed files with 19895 additions and 21681 deletions
@@ -16,581 +16,540 @@ pub type Result<T> = core::result::Result<T, Error>;
#[derive(Clone)]
pub struct DockerClient {
using_podman: bool,
using_podman: bool,
}
#[derive(Debug)]
pub struct ContainerRunOptions {
image: String,
command: Vec<String>,
env: Option<Vec<(String, String)>>,
volume_mounts: Option<HashMap<String, String>>,
name: Option<String>,
entrypoint: Option<String>,
port_mapping: HashMap<Port, Port>,
rm: bool,
detach: bool,
image: String,
command: Vec<String>,
env: Option<Vec<(String, String)>>,
volume_mounts: Option<HashMap<String, String>>,
name: Option<String>,
entrypoint: Option<String>,
port_mapping: HashMap<Port, Port>,
rm: bool,
detach: bool,
}
enum Container {
Docker(DockerContainer),
Podman(PodmanContainer),
Docker(DockerContainer),
Podman(PodmanContainer),
}
// TODO: we may don't need this
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct DockerContainer {
#[serde(alias = "Names", deserialize_with = "deserialize_list")]
names: Vec<String>,
#[serde(alias = "Ports", deserialize_with = "deserialize_list")]
ports: Vec<String>,
#[serde(alias = "State")]
state: String,
#[serde(alias = "Names", deserialize_with = "deserialize_list")]
names: Vec<String>,
#[serde(alias = "Ports", deserialize_with = "deserialize_list")]
ports: Vec<String>,
#[serde(alias = "State")]
state: String,
}
// TODO: we may don't need this
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct PodmanPort {
host_ip: String,
container_port: u16,
host_port: u16,
range: u16,
protocol: String,
host_ip: String,
container_port: u16,
host_port: u16,
range: u16,
protocol: String,
}
// TODO: we may don't need this
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct PodmanContainer {
#[serde(alias = "Id")]
id: String,
#[serde(alias = "Image")]
image: String,
#[serde(alias = "Mounts")]
mounts: Vec<String>,
#[serde(alias = "Names")]
names: Vec<String>,
#[serde(alias = "Ports", deserialize_with = "deserialize_null_as_default")]
ports: Vec<PodmanPort>,
#[serde(alias = "State")]
state: String,
#[serde(alias = "Id")]
id: String,
#[serde(alias = "Image")]
image: String,
#[serde(alias = "Mounts")]
mounts: Vec<String>,
#[serde(alias = "Names")]
names: Vec<String>,
#[serde(alias = "Ports", deserialize_with = "deserialize_null_as_default")]
ports: Vec<PodmanPort>,
#[serde(alias = "State")]
state: String,
}
fn deserialize_list<'de, D>(deserializer: D) -> std::result::Result<Vec<String>, D::Error>
where
D: Deserializer<'de>,
D: Deserializer<'de>,
{
let str_sequence = String::deserialize(deserializer)?;
Ok(str_sequence
.split(',')
.filter(|item| !item.is_empty())
.map(|item| item.to_owned())
.collect())
let str_sequence = String::deserialize(deserializer)?;
Ok(str_sequence
.split(',')
.filter(|item| !item.is_empty())
.map(|item| item.to_owned())
.collect())
}
fn deserialize_null_as_default<'de, D, T>(deserializer: D) -> std::result::Result<T, D::Error>
where
T: Default + Deserialize<'de>,
D: Deserializer<'de>,
T: Default + Deserialize<'de>,
D: Deserializer<'de>,
{
let opt = Option::deserialize(deserializer)?;
Ok(opt.unwrap_or_default())
let opt = Option::deserialize(deserializer)?;
Ok(opt.unwrap_or_default())
}
impl ContainerRunOptions {
pub fn new<S>(image: &str, command: Vec<S>) -> Self
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
ContainerRunOptions {
image: image.to_string(),
command: command
.clone()
.into_iter()
.map(|s| s.into())
.collect::<Vec<_>>(),
env: None,
volume_mounts: None,
name: None,
entrypoint: None,
port_mapping: HashMap::default(),
rm: false,
detach: true, // add -d flag by default
}
}
pub fn new<S>(image: &str, command: Vec<S>) -> Self
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
ContainerRunOptions {
image: image.to_string(),
command: command.clone().into_iter().map(|s| s.into()).collect::<Vec<_>>(),
env: None,
volume_mounts: None,
name: None,
entrypoint: None,
port_mapping: HashMap::default(),
rm: false,
detach: true, // add -d flag by default
}
}
pub fn env<S>(mut self, env: Vec<(S, S)>) -> Self
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
self.env = Some(
env.into_iter()
.map(|(name, value)| (name.into(), value.into()))
.collect(),
);
self
}
pub fn env<S>(mut self, env: Vec<(S, S)>) -> Self
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
self.env = Some(env.into_iter().map(|(name, value)| (name.into(), value.into())).collect());
self
}
pub fn volume_mounts<S>(mut self, volume_mounts: HashMap<S, S>) -> Self
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
self.volume_mounts = Some(
volume_mounts
.into_iter()
.map(|(source, target)| (source.into(), target.into()))
.collect(),
);
self
}
pub fn volume_mounts<S>(mut self, volume_mounts: HashMap<S, S>) -> Self
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
self.volume_mounts = Some(
volume_mounts
.into_iter()
.map(|(source, target)| (source.into(), target.into()))
.collect(),
);
self
}
pub fn name<S>(mut self, name: S) -> Self
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
self.name = Some(name.into());
self
}
pub fn name<S>(mut self, name: S) -> Self
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
self.name = Some(name.into());
self
}
pub fn entrypoint<S>(mut self, entrypoint: S) -> Self
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
self.entrypoint = Some(entrypoint.into());
self
}
pub fn entrypoint<S>(mut self, entrypoint: S) -> Self
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
self.entrypoint = Some(entrypoint.into());
self
}
pub fn port_mapping(mut self, port_mapping: &HashMap<Port, Port>) -> Self {
self.port_mapping.clone_from(port_mapping);
self
}
pub fn port_mapping(mut self, port_mapping: &HashMap<Port, Port>) -> Self {
self.port_mapping.clone_from(port_mapping);
self
}
pub fn rm(mut self) -> Self {
self.rm = true;
self
}
pub fn rm(mut self) -> Self {
self.rm = true;
self
}
pub fn detach(mut self, choice: bool) -> Self {
self.detach = choice;
self
}
pub fn detach(mut self, choice: bool) -> Self {
self.detach = choice;
self
}
}
impl DockerClient {
pub async fn new() -> Result<Self> {
let using_podman = Self::is_using_podman().await?;
pub async fn new() -> Result<Self> {
let using_podman = Self::is_using_podman().await?;
Ok(DockerClient { using_podman })
}
Ok(DockerClient { using_podman })
}
pub fn client_binary(&self) -> String {
String::from(if self.using_podman {
"podman"
} else {
"docker"
})
}
pub fn client_binary(&self) -> String {
String::from(if self.using_podman { "podman" } else { "docker" })
}
async fn is_using_podman() -> Result<bool> {
if let Ok(output) = tokio::process::Command::new("docker")
.arg("version")
.output()
.await
{
// detect whether we're actually running podman with docker emulation
return Ok(String::from_utf8_lossy(&output.stdout)
.to_lowercase()
.contains("podman"));
}
async fn is_using_podman() -> Result<bool> {
if let Ok(output) = tokio::process::Command::new("docker").arg("version").output().await {
// detect whether we're actually running podman with docker emulation
return Ok(String::from_utf8_lossy(&output.stdout).to_lowercase().contains("podman"));
}
tokio::process::Command::new("podman")
.arg("--version")
.output()
.await
.map_err(|err| anyhow!("Failed to detect container engine: {err}"))?;
tokio::process::Command::new("podman")
.arg("--version")
.output()
.await
.map_err(|err| anyhow!("Failed to detect container engine: {err}"))?;
Ok(true)
}
Ok(true)
}
}
impl DockerClient {
fn client_command(&self) -> tokio::process::Command {
tokio::process::Command::new(self.client_binary())
}
fn client_command(&self) -> tokio::process::Command {
tokio::process::Command::new(self.client_binary())
}
pub async fn create_volume(&self, name: &str) -> Result<()> {
let result = self
.client_command()
.args(["volume", "create", name])
.output()
.await
.map_err(|err| anyhow!("Failed to create volume '{name}': {err}"))?;
pub async fn create_volume(&self, name: &str) -> Result<()> {
let result = self
.client_command()
.args(["volume", "create", name])
.output()
.await
.map_err(|err| anyhow!("Failed to create volume '{name}': {err}"))?;
if !result.status.success() {
return Err(anyhow!(
"Failed to create volume '{name}': {}",
String::from_utf8_lossy(&result.stderr)
)
.into());
}
if !result.status.success() {
return Err(anyhow!(
"Failed to create volume '{name}': {}",
String::from_utf8_lossy(&result.stderr)
)
.into());
}
Ok(())
}
Ok(())
}
pub async fn container_run(&self, options: ContainerRunOptions) -> Result<String> {
let mut cmd = self.client_command();
cmd.args(["run", "--platform", "linux/amd64"]);
pub async fn container_run(&self, options: ContainerRunOptions) -> Result<String> {
let mut cmd = self.client_command();
cmd.args(["run", "--platform", "linux/amd64"]);
if options.detach {
cmd.arg("-d");
}
if options.detach {
cmd.arg("-d");
}
Self::apply_cmd_options(&mut cmd, &options);
Self::apply_cmd_options(&mut cmd, &options);
trace!("cmd: {:?}", cmd);
trace!("cmd: {:?}", cmd);
let result = cmd.output().await.map_err(|err| {
anyhow!(
"Failed to run container with image '{image}' and command '{command}': {err}",
image = options.image,
command = options.command.join(" "),
)
})?;
let result = cmd.output().await.map_err(|err| {
anyhow!(
"Failed to run container with image '{image}' and command '{command}': {err}",
image = options.image,
command = options.command.join(" "),
)
})?;
if !result.status.success() {
return Err(anyhow!(
"Failed to run container with image '{image}' and command '{command}': {err}",
image = options.image,
command = options.command.join(" "),
err = String::from_utf8_lossy(&result.stderr)
)
.into());
}
if !result.status.success() {
return Err(anyhow!(
"Failed to run container with image '{image}' and command '{command}': {err}",
image = options.image,
command = options.command.join(" "),
err = String::from_utf8_lossy(&result.stderr)
)
.into());
}
Ok(String::from_utf8_lossy(&result.stdout).to_string())
}
Ok(String::from_utf8_lossy(&result.stdout).to_string())
}
pub async fn container_create(&self, options: ContainerRunOptions) -> Result<String> {
let mut cmd = self.client_command();
cmd.args(["container", "create"]);
pub async fn container_create(&self, options: ContainerRunOptions) -> Result<String> {
let mut cmd = self.client_command();
cmd.args(["container", "create"]);
Self::apply_cmd_options(&mut cmd, &options);
Self::apply_cmd_options(&mut cmd, &options);
trace!("cmd: {:?}", cmd);
trace!("cmd: {:?}", cmd);
let result = cmd.output().await.map_err(|err| {
anyhow!(
"Failed to run container with image '{image}' and command '{command}': {err}",
image = options.image,
command = options.command.join(" "),
)
})?;
let result = cmd.output().await.map_err(|err| {
anyhow!(
"Failed to run container with image '{image}' and command '{command}': {err}",
image = options.image,
command = options.command.join(" "),
)
})?;
if !result.status.success() {
return Err(anyhow!(
"Failed to run container with image '{image}' and command '{command}': {err}",
image = options.image,
command = options.command.join(" "),
err = String::from_utf8_lossy(&result.stderr)
)
.into());
}
if !result.status.success() {
return Err(anyhow!(
"Failed to run container with image '{image}' and command '{command}': {err}",
image = options.image,
command = options.command.join(" "),
err = String::from_utf8_lossy(&result.stderr)
)
.into());
}
Ok(String::from_utf8_lossy(&result.stdout).to_string())
}
Ok(String::from_utf8_lossy(&result.stdout).to_string())
}
pub async fn container_exec<S>(
&self,
name: &str,
command: Vec<S>,
env: Option<Vec<(S, S)>>,
as_user: Option<S>,
) -> Result<ExecutionResult>
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
let mut cmd = self.client_command();
cmd.arg("exec");
pub async fn container_exec<S>(
&self,
name: &str,
command: Vec<S>,
env: Option<Vec<(S, S)>>,
as_user: Option<S>,
) -> Result<ExecutionResult>
where
S: Into<String> + std::fmt::Debug + Send + Clone,
{
let mut cmd = self.client_command();
cmd.arg("exec");
if let Some(env) = env {
for env_var in env {
cmd.args(["-e", &format!("{}={}", env_var.0.into(), env_var.1.into())]);
}
}
if let Some(env) = env {
for env_var in env {
cmd.args(["-e", &format!("{}={}", env_var.0.into(), env_var.1.into())]);
}
}
if let Some(user) = as_user {
cmd.args(["-u", user.into().as_ref()]);
}
if let Some(user) = as_user {
cmd.args(["-u", user.into().as_ref()]);
}
cmd.arg(name);
cmd.arg(name);
cmd.args(
command
.clone()
.into_iter()
.map(|s| <S as Into<String>>::into(s)),
);
cmd.args(command.clone().into_iter().map(|s| <S as Into<String>>::into(s)));
trace!("cmd is : {:?}", cmd);
trace!("cmd is : {:?}", cmd);
let result = cmd.output().await.map_err(|err| {
anyhow!(
"Failed to exec '{}' on '{}': {err}",
command
.into_iter()
.map(|s| <S as Into<String>>::into(s))
.collect::<Vec<_>>()
.join(" "),
name,
)
})?;
let result = cmd.output().await.map_err(|err| {
anyhow!(
"Failed to exec '{}' on '{}': {err}",
command
.into_iter()
.map(|s| <S as Into<String>>::into(s))
.collect::<Vec<_>>()
.join(" "),
name,
)
})?;
if !result.status.success() {
return Ok(Err((
result.status,
String::from_utf8_lossy(&result.stderr).to_string(),
)));
}
if !result.status.success() {
return Ok(Err((result.status, String::from_utf8_lossy(&result.stderr).to_string())));
}
Ok(Ok(String::from_utf8_lossy(&result.stdout).to_string()))
}
Ok(Ok(String::from_utf8_lossy(&result.stdout).to_string()))
}
pub async fn container_cp(
&self,
name: &str,
local_path: &Path,
remote_path: &Path,
) -> Result<()> {
let result = self
.client_command()
.args([
"cp",
local_path.to_string_lossy().as_ref(),
&format!("{name}:{}", remote_path.to_string_lossy().as_ref()),
])
.output()
.await
.map_err(|err| {
anyhow!(
"Failed copy file '{file}' to container '{name}': {err}",
file = local_path.to_string_lossy(),
)
})?;
pub async fn container_cp(
&self,
name: &str,
local_path: &Path,
remote_path: &Path,
) -> Result<()> {
let result = self
.client_command()
.args([
"cp",
local_path.to_string_lossy().as_ref(),
&format!("{name}:{}", remote_path.to_string_lossy().as_ref()),
])
.output()
.await
.map_err(|err| {
anyhow!(
"Failed copy file '{file}' to container '{name}': {err}",
file = local_path.to_string_lossy(),
)
})?;
if !result.status.success() {
return Err(anyhow!(
"Failed to copy file '{file}' to container '{name}': {err}",
file = local_path.to_string_lossy(),
err = String::from_utf8_lossy(&result.stderr)
)
.into());
}
if !result.status.success() {
return Err(anyhow!(
"Failed to copy file '{file}' to container '{name}': {err}",
file = local_path.to_string_lossy(),
err = String::from_utf8_lossy(&result.stderr)
)
.into());
}
Ok(())
}
Ok(())
}
pub async fn container_rm(&self, name: &str) -> Result<()> {
let result = self
.client_command()
.args(["rm", "--force", "--volumes", name])
.output()
.await
.map_err(|err| anyhow!("Failed do remove container '{name}: {err}"))?;
pub async fn container_rm(&self, name: &str) -> Result<()> {
let result = self
.client_command()
.args(["rm", "--force", "--volumes", name])
.output()
.await
.map_err(|err| anyhow!("Failed do remove container '{name}: {err}"))?;
if !result.status.success() {
return Err(anyhow!(
"Failed to remove container '{name}': {err}",
err = String::from_utf8_lossy(&result.stderr)
)
.into());
}
if !result.status.success() {
return Err(anyhow!(
"Failed to remove container '{name}': {err}",
err = String::from_utf8_lossy(&result.stderr)
)
.into());
}
Ok(())
}
Ok(())
}
pub async fn namespaced_containers_rm(&self, namespace: &str) -> Result<()> {
let container_names: Vec<String> = self
.get_containers()
.await?
.into_iter()
.filter_map(|container| match container {
Container::Docker(container) => {
if let Some(name) = container.names.first() {
if name.starts_with(namespace) {
return Some(name.to_string());
}
}
pub async fn namespaced_containers_rm(&self, namespace: &str) -> Result<()> {
let container_names: Vec<String> = self
.get_containers()
.await?
.into_iter()
.filter_map(|container| match container {
Container::Docker(container) => {
if let Some(name) = container.names.first() {
if name.starts_with(namespace) {
return Some(name.to_string());
}
}
None
},
Container::Podman(container) => {
if let Some(name) = container.names.first() {
if name.starts_with(namespace) {
return Some(name.to_string());
}
}
None
},
Container::Podman(container) => {
if let Some(name) = container.names.first() {
if name.starts_with(namespace) {
return Some(name.to_string());
}
}
None
},
})
.collect();
None
},
})
.collect();
info!("{:?}", container_names);
let futures = container_names
.iter()
.map(|name| self.container_rm(name))
.collect::<Vec<_>>();
try_join_all(futures).await?;
info!("{:?}", container_names);
let futures =
container_names.iter().map(|name| self.container_rm(name)).collect::<Vec<_>>();
try_join_all(futures).await?;
Ok(())
}
Ok(())
}
pub async fn container_ip(&self, container_name: &str) -> Result<String> {
let ip = if self.using_podman {
"127.0.0.1".into()
} else {
let mut cmd = tokio::process::Command::new("docker");
cmd.args(vec![
"inspect",
"-f",
"{{ .NetworkSettings.IPAddress }}",
container_name,
]);
pub async fn container_ip(&self, container_name: &str) -> Result<String> {
let ip = if self.using_podman {
"127.0.0.1".into()
} else {
let mut cmd = tokio::process::Command::new("docker");
cmd.args(vec!["inspect", "-f", "{{ .NetworkSettings.IPAddress }}", container_name]);
trace!("CMD: {cmd:?}");
trace!("CMD: {cmd:?}");
let res = cmd
.output()
.await
.map_err(|err| anyhow!("Failed to get docker container ip, output: {err}"))?;
let res = cmd
.output()
.await
.map_err(|err| anyhow!("Failed to get docker container ip, output: {err}"))?;
String::from_utf8(res.stdout)
.map_err(|err| anyhow!("Failed to get docker container ip, output: {err}"))?
.trim()
.into()
};
String::from_utf8(res.stdout)
.map_err(|err| anyhow!("Failed to get docker container ip, output: {err}"))?
.trim()
.into()
};
trace!("IP: {ip}");
Ok(ip)
}
trace!("IP: {ip}");
Ok(ip)
}
async fn get_containers(&self) -> Result<Vec<Container>> {
let containers = if self.using_podman {
self.get_podman_containers()
.await?
.into_iter()
.map(Container::Podman)
.collect()
} else {
self.get_docker_containers()
.await?
.into_iter()
.map(Container::Docker)
.collect()
};
async fn get_containers(&self) -> Result<Vec<Container>> {
let containers = if self.using_podman {
self.get_podman_containers().await?.into_iter().map(Container::Podman).collect()
} else {
self.get_docker_containers().await?.into_iter().map(Container::Docker).collect()
};
Ok(containers)
}
Ok(containers)
}
async fn get_podman_containers(&self) -> Result<Vec<PodmanContainer>> {
let res = tokio::process::Command::new("podman")
.args(vec!["ps", "--all", "--no-trunc", "--format", "json"])
.output()
.await
.map_err(|err| anyhow!("Failed to get podman containers output: {err}"))?;
async fn get_podman_containers(&self) -> Result<Vec<PodmanContainer>> {
let res = tokio::process::Command::new("podman")
.args(vec!["ps", "--all", "--no-trunc", "--format", "json"])
.output()
.await
.map_err(|err| anyhow!("Failed to get podman containers output: {err}"))?;
let stdout = String::from_utf8_lossy(&res.stdout);
let stdout = String::from_utf8_lossy(&res.stdout);
let containers = serde_json::from_str(&stdout)
.map_err(|err| anyhow!("Failed to parse podman containers output: {err}"))?;
let containers = serde_json::from_str(&stdout)
.map_err(|err| anyhow!("Failed to parse podman containers output: {err}"))?;
Ok(containers)
}
Ok(containers)
}
async fn get_docker_containers(&self) -> Result<Vec<DockerContainer>> {
let res = tokio::process::Command::new("docker")
.args(vec!["ps", "--all", "--no-trunc", "--format", "json"])
.output()
.await
.unwrap();
async fn get_docker_containers(&self) -> Result<Vec<DockerContainer>> {
let res = tokio::process::Command::new("docker")
.args(vec!["ps", "--all", "--no-trunc", "--format", "json"])
.output()
.await
.unwrap();
let stdout = String::from_utf8_lossy(&res.stdout);
let stdout = String::from_utf8_lossy(&res.stdout);
let mut containers = vec![];
for line in stdout.lines() {
containers.push(
serde_json::from_str::<DockerContainer>(line)
.map_err(|err| anyhow!("Failed to parse docker container output: {err}"))?,
);
}
let mut containers = vec![];
for line in stdout.lines() {
containers.push(
serde_json::from_str::<DockerContainer>(line)
.map_err(|err| anyhow!("Failed to parse docker container output: {err}"))?,
);
}
Ok(containers)
}
Ok(containers)
}
pub(crate) async fn container_logs(&self, container_name: &str) -> Result<String> {
let output = Command::new("sh")
.arg("-c")
.arg(format!("docker logs -t '{container_name}' 2>&1"))
.stdout(Stdio::piped())
.output()
.await
.map_err(|err| {
anyhow!(
"Failed to spawn docker logs command for container '{container_name}': {err}"
)
})?;
pub(crate) async fn container_logs(&self, container_name: &str) -> Result<String> {
let output = Command::new("sh")
.arg("-c")
.arg(format!("docker logs -t '{container_name}' 2>&1"))
.stdout(Stdio::piped())
.output()
.await
.map_err(|err| {
anyhow!(
"Failed to spawn docker logs command for container '{container_name}': {err}"
)
})?;
let logs = String::from_utf8_lossy(&output.stdout).to_string();
let logs = String::from_utf8_lossy(&output.stdout).to_string();
if !output.status.success() {
// stderr was redirected to stdout, so logs should contain the error message if any
return Err(anyhow!(
"Failed to get logs for container '{name}': {logs}",
name = container_name,
logs = &logs
)
.into());
}
if !output.status.success() {
// stderr was redirected to stdout, so logs should contain the error message if any
return Err(anyhow!(
"Failed to get logs for container '{name}': {logs}",
name = container_name,
logs = &logs
)
.into());
}
Ok(logs)
}
Ok(logs)
}
fn apply_cmd_options(cmd: &mut Command, options: &ContainerRunOptions) {
if options.rm {
cmd.arg("--rm");
}
fn apply_cmd_options(cmd: &mut Command, options: &ContainerRunOptions) {
if options.rm {
cmd.arg("--rm");
}
if let Some(entrypoint) = options.entrypoint.as_ref() {
cmd.args(["--entrypoint", entrypoint]);
}
if let Some(entrypoint) = options.entrypoint.as_ref() {
cmd.args(["--entrypoint", entrypoint]);
}
if let Some(volume_mounts) = options.volume_mounts.as_ref() {
for (source, target) in volume_mounts {
cmd.args(["-v", &format!("{source}:{target}")]);
}
}
if let Some(volume_mounts) = options.volume_mounts.as_ref() {
for (source, target) in volume_mounts {
cmd.args(["-v", &format!("{source}:{target}")]);
}
}
if let Some(env) = options.env.as_ref() {
for env_var in env {
cmd.args(["-e", &format!("{}={}", env_var.0, env_var.1)]);
}
}
if let Some(env) = options.env.as_ref() {
for env_var in env {
cmd.args(["-e", &format!("{}={}", env_var.0, env_var.1)]);
}
}
// add published ports
for (container_port, host_port) in options.port_mapping.iter() {
cmd.args(["-p", &format!("{host_port}:{container_port}")]);
}
// add published ports
for (container_port, host_port) in options.port_mapping.iter() {
cmd.args(["-p", &format!("{host_port}:{container_port}")]);
}
if let Some(name) = options.name.as_ref() {
cmd.args(["--name", name]);
}
if let Some(name) = options.name.as_ref() {
cmd.args(["--name", name]);
}
cmd.arg(&options.image);
cmd.arg(&options.image);
for arg in &options.command {
cmd.arg(arg);
}
}
for arg in &options.command {
cmd.arg(arg);
}
}
}
@@ -1,8 +1,8 @@
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Weak},
thread,
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Weak},
thread,
};
use async_trait::async_trait;
@@ -12,212 +12,207 @@ use tracing::{debug, trace, warn};
use uuid::Uuid;
use super::{
client::{ContainerRunOptions, DockerClient},
node::DockerNode,
DockerProvider,
client::{ContainerRunOptions, DockerClient},
node::DockerNode,
DockerProvider,
};
use crate::{
constants::NAMESPACE_PREFIX,
docker::{
node::{DeserializableDockerNodeOptions, DockerNodeOptions},
provider,
},
shared::helpers::extract_execution_result,
types::{
GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
SpawnNodeOptions,
},
DynNode, ProviderError, ProviderNamespace, ProviderNode,
constants::NAMESPACE_PREFIX,
docker::{
node::{DeserializableDockerNodeOptions, DockerNodeOptions},
provider,
},
shared::helpers::extract_execution_result,
types::{
GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
SpawnNodeOptions,
},
DynNode, ProviderError, ProviderNamespace, ProviderNode,
};
pub struct DockerNamespace<FS>
where
FS: FileSystem + Send + Sync + Clone,
FS: FileSystem + Send + Sync + Clone,
{
weak: Weak<DockerNamespace<FS>>,
#[allow(dead_code)]
provider: Weak<DockerProvider<FS>>,
name: String,
base_dir: PathBuf,
capabilities: ProviderCapabilities,
docker_client: DockerClient,
filesystem: FS,
delete_on_drop: Arc<Mutex<bool>>,
pub(super) nodes: RwLock<HashMap<String, Arc<DockerNode<FS>>>>,
weak: Weak<DockerNamespace<FS>>,
#[allow(dead_code)]
provider: Weak<DockerProvider<FS>>,
name: String,
base_dir: PathBuf,
capabilities: ProviderCapabilities,
docker_client: DockerClient,
filesystem: FS,
delete_on_drop: Arc<Mutex<bool>>,
pub(super) nodes: RwLock<HashMap<String, Arc<DockerNode<FS>>>>,
}
impl<FS> DockerNamespace<FS>
where
FS: FileSystem + Send + Sync + Clone + 'static,
FS: FileSystem + Send + Sync + Clone + 'static,
{
pub(super) async fn new(
provider: &Weak<DockerProvider<FS>>,
tmp_dir: &PathBuf,
capabilities: &ProviderCapabilities,
docker_client: &DockerClient,
filesystem: &FS,
custom_base_dir: Option<&Path>,
) -> Result<Arc<Self>, ProviderError> {
let name = format!("{}{}", NAMESPACE_PREFIX, Uuid::new_v4());
let base_dir = if let Some(custom_base_dir) = custom_base_dir {
if !filesystem.exists(custom_base_dir).await {
filesystem.create_dir(custom_base_dir).await?;
} else {
warn!(
"⚠️ Using and existing directory {} as base dir",
custom_base_dir.to_string_lossy()
);
}
PathBuf::from(custom_base_dir)
} else {
let base_dir = PathBuf::from_iter([tmp_dir, &PathBuf::from(&name)]);
filesystem.create_dir(&base_dir).await?;
base_dir
};
pub(super) async fn new(
provider: &Weak<DockerProvider<FS>>,
tmp_dir: &PathBuf,
capabilities: &ProviderCapabilities,
docker_client: &DockerClient,
filesystem: &FS,
custom_base_dir: Option<&Path>,
) -> Result<Arc<Self>, ProviderError> {
let name = format!("{}{}", NAMESPACE_PREFIX, Uuid::new_v4());
let base_dir = if let Some(custom_base_dir) = custom_base_dir {
if !filesystem.exists(custom_base_dir).await {
filesystem.create_dir(custom_base_dir).await?;
} else {
warn!(
"⚠️ Using and existing directory {} as base dir",
custom_base_dir.to_string_lossy()
);
}
PathBuf::from(custom_base_dir)
} else {
let base_dir = PathBuf::from_iter([tmp_dir, &PathBuf::from(&name)]);
filesystem.create_dir(&base_dir).await?;
base_dir
};
let namespace = Arc::new_cyclic(|weak| DockerNamespace {
weak: weak.clone(),
provider: provider.clone(),
name,
base_dir,
capabilities: capabilities.clone(),
filesystem: filesystem.clone(),
docker_client: docker_client.clone(),
nodes: RwLock::new(HashMap::new()),
delete_on_drop: Arc::new(Mutex::new(true)),
});
let namespace = Arc::new_cyclic(|weak| DockerNamespace {
weak: weak.clone(),
provider: provider.clone(),
name,
base_dir,
capabilities: capabilities.clone(),
filesystem: filesystem.clone(),
docker_client: docker_client.clone(),
nodes: RwLock::new(HashMap::new()),
delete_on_drop: Arc::new(Mutex::new(true)),
});
namespace.initialize().await?;
namespace.initialize().await?;
Ok(namespace)
}
Ok(namespace)
}
pub(super) async fn attach_to_live(
provider: &Weak<DockerProvider<FS>>,
capabilities: &ProviderCapabilities,
docker_client: &DockerClient,
filesystem: &FS,
custom_base_dir: &Path,
name: &str,
) -> Result<Arc<Self>, ProviderError> {
let base_dir = custom_base_dir.to_path_buf();
pub(super) async fn attach_to_live(
provider: &Weak<DockerProvider<FS>>,
capabilities: &ProviderCapabilities,
docker_client: &DockerClient,
filesystem: &FS,
custom_base_dir: &Path,
name: &str,
) -> Result<Arc<Self>, ProviderError> {
let base_dir = custom_base_dir.to_path_buf();
let namespace = Arc::new_cyclic(|weak| DockerNamespace {
weak: weak.clone(),
provider: provider.clone(),
name: name.to_owned(),
base_dir,
capabilities: capabilities.clone(),
filesystem: filesystem.clone(),
docker_client: docker_client.clone(),
nodes: RwLock::new(HashMap::new()),
delete_on_drop: Arc::new(Mutex::new(false)),
});
let namespace = Arc::new_cyclic(|weak| DockerNamespace {
weak: weak.clone(),
provider: provider.clone(),
name: name.to_owned(),
base_dir,
capabilities: capabilities.clone(),
filesystem: filesystem.clone(),
docker_client: docker_client.clone(),
nodes: RwLock::new(HashMap::new()),
delete_on_drop: Arc::new(Mutex::new(false)),
});
Ok(namespace)
}
Ok(namespace)
}
async fn initialize(&self) -> Result<(), ProviderError> {
// let ns_scripts_shared = PathBuf::from_iter([&self.base_dir, &PathBuf::from("shared-scripts")]);
// self.filesystem.create_dir(&ns_scripts_shared).await?;
self.initialize_zombie_scripts_volume().await?;
self.initialize_helper_binaries_volume().await?;
async fn initialize(&self) -> Result<(), ProviderError> {
// let ns_scripts_shared = PathBuf::from_iter([&self.base_dir, &PathBuf::from("shared-scripts")]);
// self.filesystem.create_dir(&ns_scripts_shared).await?;
self.initialize_zombie_scripts_volume().await?;
self.initialize_helper_binaries_volume().await?;
Ok(())
}
Ok(())
}
async fn initialize_zombie_scripts_volume(&self) -> Result<(), ProviderError> {
let local_zombie_wrapper_path =
PathBuf::from_iter([&self.base_dir, &PathBuf::from("zombie-wrapper.sh")]);
async fn initialize_zombie_scripts_volume(&self) -> Result<(), ProviderError> {
let local_zombie_wrapper_path =
PathBuf::from_iter([&self.base_dir, &PathBuf::from("zombie-wrapper.sh")]);
self.filesystem
.write(
&local_zombie_wrapper_path,
include_str!("../shared/scripts/zombie-wrapper.sh"),
)
.await?;
self.filesystem
.write(&local_zombie_wrapper_path, include_str!("../shared/scripts/zombie-wrapper.sh"))
.await?;
let local_helper_binaries_downloader_path = PathBuf::from_iter([
&self.base_dir,
&PathBuf::from("helper-binaries-downloader.sh"),
]);
let local_helper_binaries_downloader_path =
PathBuf::from_iter([&self.base_dir, &PathBuf::from("helper-binaries-downloader.sh")]);
self.filesystem
.write(
&local_helper_binaries_downloader_path,
include_str!("../shared/scripts/helper-binaries-downloader.sh"),
)
.await?;
self.filesystem
.write(
&local_helper_binaries_downloader_path,
include_str!("../shared/scripts/helper-binaries-downloader.sh"),
)
.await?;
let zombie_wrapper_volume_name = format!("{}-zombie-wrapper", self.name);
let zombie_wrapper_container_name = format!("{}-scripts", self.name);
let zombie_wrapper_volume_name = format!("{}-zombie-wrapper", self.name);
let zombie_wrapper_container_name = format!("{}-scripts", self.name);
self.docker_client
.create_volume(&zombie_wrapper_volume_name)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
self.docker_client
.create_volume(&zombie_wrapper_volume_name)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
self.docker_client
.container_create(
ContainerRunOptions::new("alpine:latest", vec!["tail", "-f", "/dev/null"])
.volume_mounts(HashMap::from([(
zombie_wrapper_volume_name.as_str(),
"/scripts",
)]))
.name(&zombie_wrapper_container_name)
.rm(),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
self.docker_client
.container_create(
ContainerRunOptions::new("alpine:latest", vec!["tail", "-f", "/dev/null"])
.volume_mounts(HashMap::from([(
zombie_wrapper_volume_name.as_str(),
"/scripts",
)]))
.name(&zombie_wrapper_container_name)
.rm(),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
// copy the scripts
self.docker_client
.container_cp(
&zombie_wrapper_container_name,
&local_zombie_wrapper_path,
&PathBuf::from("/scripts/zombie-wrapper.sh"),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
// copy the scripts
self.docker_client
.container_cp(
&zombie_wrapper_container_name,
&local_zombie_wrapper_path,
&PathBuf::from("/scripts/zombie-wrapper.sh"),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
self.docker_client
.container_cp(
&zombie_wrapper_container_name,
&local_helper_binaries_downloader_path,
&PathBuf::from("/scripts/helper-binaries-downloader.sh"),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
self.docker_client
.container_cp(
&zombie_wrapper_container_name,
&local_helper_binaries_downloader_path,
&PathBuf::from("/scripts/helper-binaries-downloader.sh"),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
// set permissions for rwx on whole volume recursively
self.docker_client
.container_run(
ContainerRunOptions::new("alpine:latest", vec!["chmod", "-R", "777", "/scripts"])
.volume_mounts(HashMap::from([(
zombie_wrapper_volume_name.as_ref(),
"/scripts",
)]))
.rm(),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
// set permissions for rwx on whole volume recursively
self.docker_client
.container_run(
ContainerRunOptions::new("alpine:latest", vec!["chmod", "-R", "777", "/scripts"])
.volume_mounts(HashMap::from([(
zombie_wrapper_volume_name.as_ref(),
"/scripts",
)]))
.rm(),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
Ok(())
}
Ok(())
}
async fn initialize_helper_binaries_volume(&self) -> Result<(), ProviderError> {
let helper_binaries_volume_name = format!("{}-helper-binaries", self.name);
let zombie_wrapper_volume_name = format!("{}-zombie-wrapper", self.name);
async fn initialize_helper_binaries_volume(&self) -> Result<(), ProviderError> {
let helper_binaries_volume_name = format!("{}-helper-binaries", self.name);
let zombie_wrapper_volume_name = format!("{}-zombie-wrapper", self.name);
self.docker_client
.create_volume(&helper_binaries_volume_name)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
self.docker_client
.create_volume(&helper_binaries_volume_name)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
// download binaries to volume
self.docker_client
.container_run(
ContainerRunOptions::new(
// download binaries to volume
self.docker_client
.container_run(
ContainerRunOptions::new(
"alpine:latest",
vec!["ash", "/scripts/helper-binaries-downloader.sh"],
)
@@ -234,261 +229,242 @@ where
// wait until complete
.detach(false)
.rm(),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
// set permissions for rwx on whole volume recursively
self.docker_client
.container_run(
ContainerRunOptions::new("alpine:latest", vec!["chmod", "-R", "777", "/helpers"])
.volume_mounts(HashMap::from([(
helper_binaries_volume_name.as_ref(),
"/helpers",
)]))
.rm(),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
// set permissions for rwx on whole volume recursively
self.docker_client
.container_run(
ContainerRunOptions::new("alpine:latest", vec!["chmod", "-R", "777", "/helpers"])
.volume_mounts(HashMap::from([(
helper_binaries_volume_name.as_ref(),
"/helpers",
)]))
.rm(),
)
.await
.map_err(|err| ProviderError::CreateNamespaceFailed(self.name.clone(), err.into()))?;
Ok(())
}
Ok(())
}
pub async fn set_delete_on_drop(&self, delete_on_drop: bool) {
*self.delete_on_drop.lock().await = delete_on_drop;
}
pub async fn set_delete_on_drop(&self, delete_on_drop: bool) {
*self.delete_on_drop.lock().await = delete_on_drop;
}
pub async fn delete_on_drop(&self) -> bool {
if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
*delete_on_drop
} else {
// if we can't lock just remove the ns
true
}
}
pub async fn delete_on_drop(&self) -> bool {
if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
*delete_on_drop
} else {
// if we can't lock just remove the ns
true
}
}
}
#[async_trait]
impl<FS> ProviderNamespace for DockerNamespace<FS>
where
FS: FileSystem + Send + Sync + Clone + 'static,
FS: FileSystem + Send + Sync + Clone + 'static,
{
fn name(&self) -> &str {
&self.name
}
fn name(&self) -> &str {
&self.name
}
fn base_dir(&self) -> &PathBuf {
&self.base_dir
}
fn base_dir(&self) -> &PathBuf {
&self.base_dir
}
fn capabilities(&self) -> &ProviderCapabilities {
&self.capabilities
}
fn capabilities(&self) -> &ProviderCapabilities {
&self.capabilities
}
fn provider_name(&self) -> &str {
provider::PROVIDER_NAME
}
fn provider_name(&self) -> &str {
provider::PROVIDER_NAME
}
async fn detach(&self) {
self.set_delete_on_drop(false).await;
}
async fn detach(&self) {
self.set_delete_on_drop(false).await;
}
async fn is_detached(&self) -> bool {
self.delete_on_drop().await
}
async fn is_detached(&self) -> bool {
self.delete_on_drop().await
}
async fn nodes(&self) -> HashMap<String, DynNode> {
self.nodes
.read()
.await
.iter()
.map(|(name, node)| (name.clone(), node.clone() as DynNode))
.collect()
}
async fn nodes(&self) -> HashMap<String, DynNode> {
self.nodes
.read()
.await
.iter()
.map(|(name, node)| (name.clone(), node.clone() as DynNode))
.collect()
}
async fn get_node_available_args(
&self,
(command, image): (String, Option<String>),
) -> Result<String, ProviderError> {
let node_image = image.expect(&format!("image should be present when getting node available args with docker provider {THIS_IS_A_BUG}"));
async fn get_node_available_args(
&self,
(command, image): (String, Option<String>),
) -> Result<String, ProviderError> {
let node_image = image.expect(&format!("image should be present when getting node available args with docker provider {THIS_IS_A_BUG}"));
let temp_node = self
.spawn_node(
&SpawnNodeOptions::new(format!("temp-{}", Uuid::new_v4()), "cat".to_string())
.image(node_image.clone()),
)
.await?;
let temp_node = self
.spawn_node(
&SpawnNodeOptions::new(format!("temp-{}", Uuid::new_v4()), "cat".to_string())
.image(node_image.clone()),
)
.await?;
let available_args_output = temp_node
.run_command(RunCommandOptions::new(command.clone()).args(vec!["--help"]))
.await?
.map_err(|(_exit, status)| {
ProviderError::NodeAvailableArgsError(node_image, command, status)
})?;
let available_args_output = temp_node
.run_command(RunCommandOptions::new(command.clone()).args(vec!["--help"]))
.await?
.map_err(|(_exit, status)| {
ProviderError::NodeAvailableArgsError(node_image, command, status)
})?;
temp_node.destroy().await?;
temp_node.destroy().await?;
Ok(available_args_output)
}
Ok(available_args_output)
}
async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> {
debug!("spawn option {:?}", options);
async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> {
debug!("spawn option {:?}", options);
let node = DockerNode::new(DockerNodeOptions {
namespace: &self.weak,
namespace_base_dir: &self.base_dir,
name: &options.name,
image: options.image.as_ref(),
program: &options.program,
args: &options.args,
env: &options.env,
startup_files: &options.injected_files,
db_snapshot: options.db_snapshot.as_ref(),
docker_client: &self.docker_client,
container_name: format!("{}-{}", self.name, options.name),
filesystem: &self.filesystem,
port_mapping: options.port_mapping.as_ref().unwrap_or(&HashMap::default()),
})
.await?;
let node = DockerNode::new(DockerNodeOptions {
namespace: &self.weak,
namespace_base_dir: &self.base_dir,
name: &options.name,
image: options.image.as_ref(),
program: &options.program,
args: &options.args,
env: &options.env,
startup_files: &options.injected_files,
db_snapshot: options.db_snapshot.as_ref(),
docker_client: &self.docker_client,
container_name: format!("{}-{}", self.name, options.name),
filesystem: &self.filesystem,
port_mapping: options.port_mapping.as_ref().unwrap_or(&HashMap::default()),
})
.await?;
self.nodes
.write()
.await
.insert(node.name().to_string(), node.clone());
self.nodes.write().await.insert(node.name().to_string(), node.clone());
Ok(node)
}
Ok(node)
}
async fn spawn_node_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNode, ProviderError> {
let deserializable: DeserializableDockerNodeOptions =
serde_json::from_value(json_value.clone())?;
let options = DockerNodeOptions::from_deserializable(
&deserializable,
&self.weak,
&self.base_dir,
&self.docker_client,
&self.filesystem,
);
async fn spawn_node_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNode, ProviderError> {
let deserializable: DeserializableDockerNodeOptions =
serde_json::from_value(json_value.clone())?;
let options = DockerNodeOptions::from_deserializable(
&deserializable,
&self.weak,
&self.base_dir,
&self.docker_client,
&self.filesystem,
);
let node = DockerNode::attach_to_live(options).await?;
let node = DockerNode::attach_to_live(options).await?;
self.nodes
.write()
.await
.insert(node.name().to_string(), node.clone());
self.nodes.write().await.insert(node.name().to_string(), node.clone());
Ok(node)
}
Ok(node)
}
async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
debug!("generate files options {options:#?}");
async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
debug!("generate files options {options:#?}");
let node_name = options
.temp_name
.unwrap_or_else(|| format!("temp-{}", Uuid::new_v4()));
let node_image = options.image.expect(&format!(
"image should be present when generating files with docker provider {THIS_IS_A_BUG}"
));
let node_name = options.temp_name.unwrap_or_else(|| format!("temp-{}", Uuid::new_v4()));
let node_image = options.image.expect(&format!(
"image should be present when generating files with docker provider {THIS_IS_A_BUG}"
));
// run dummy command in a new container
let temp_node = self
.spawn_node(
&SpawnNodeOptions::new(node_name, "cat".to_string())
.injected_files(options.injected_files)
.image(node_image),
)
.await?;
// run dummy command in a new container
let temp_node = self
.spawn_node(
&SpawnNodeOptions::new(node_name, "cat".to_string())
.injected_files(options.injected_files)
.image(node_image),
)
.await?;
for GenerateFileCommand {
program,
args,
env,
local_output_path,
} in options.commands
{
let local_output_full_path = format!(
"{}{}{}",
self.base_dir.to_string_lossy(),
if local_output_path.starts_with("/") {
""
} else {
"/"
},
local_output_path.to_string_lossy()
);
for GenerateFileCommand { program, args, env, local_output_path } in options.commands {
let local_output_full_path = format!(
"{}{}{}",
self.base_dir.to_string_lossy(),
if local_output_path.starts_with("/") { "" } else { "/" },
local_output_path.to_string_lossy()
);
let contents = extract_execution_result(
&temp_node,
RunCommandOptions { program, args, env },
options.expected_path.as_ref(),
)
.await?;
self.filesystem
.write(local_output_full_path, contents)
.await
.map_err(|err| ProviderError::FileGenerationFailed(err.into()))?;
}
let contents = extract_execution_result(
&temp_node,
RunCommandOptions { program, args, env },
options.expected_path.as_ref(),
)
.await?;
self.filesystem
.write(local_output_full_path, contents)
.await
.map_err(|err| ProviderError::FileGenerationFailed(err.into()))?;
}
temp_node.destroy().await
}
temp_node.destroy().await
}
async fn static_setup(&self) -> Result<(), ProviderError> {
todo!()
}
async fn static_setup(&self) -> Result<(), ProviderError> {
todo!()
}
async fn destroy(&self) -> Result<(), ProviderError> {
let _ = self
.docker_client
.namespaced_containers_rm(&self.name)
.await
.map_err(|err| ProviderError::DeleteNamespaceFailed(self.name.clone(), err.into()))?;
async fn destroy(&self) -> Result<(), ProviderError> {
let _ =
self.docker_client.namespaced_containers_rm(&self.name).await.map_err(|err| {
ProviderError::DeleteNamespaceFailed(self.name.clone(), err.into())
})?;
if let Some(provider) = self.provider.upgrade() {
provider.namespaces.write().await.remove(&self.name);
}
if let Some(provider) = self.provider.upgrade() {
provider.namespaces.write().await.remove(&self.name);
}
Ok(())
}
Ok(())
}
}
impl<FS> Drop for DockerNamespace<FS>
where
FS: FileSystem + Send + Sync + Clone,
FS: FileSystem + Send + Sync + Clone,
{
fn drop(&mut self) {
let ns_name = self.name.clone();
if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
if *delete_on_drop {
let client = self.docker_client.clone();
let provider = self.provider.upgrade();
fn drop(&mut self) {
let ns_name = self.name.clone();
if let Ok(delete_on_drop) = self.delete_on_drop.try_lock() {
if *delete_on_drop {
let client = self.docker_client.clone();
let provider = self.provider.upgrade();
let handler = thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
trace!("🧟 deleting ns {ns_name} from cluster");
let _ = client.namespaced_containers_rm(&ns_name).await;
trace!("✅ deleted");
});
});
let handler = thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
trace!("🧟 deleting ns {ns_name} from cluster");
let _ = client.namespaced_containers_rm(&ns_name).await;
trace!("✅ deleted");
});
});
if handler.join().is_ok() {
if let Some(provider) = provider {
if let Ok(mut p) = provider.namespaces.try_write() {
p.remove(&self.name);
} else {
warn!(
"⚠️ Can not acquire write lock to the provider, ns {} not removed",
self.name
);
}
}
}
} else {
trace!("⚠️ leaking ns {ns_name} in cluster");
}
};
}
if handler.join().is_ok() {
if let Some(provider) = provider {
if let Ok(mut p) = provider.namespaces.try_write() {
p.remove(&self.name);
} else {
warn!(
"⚠️ Can not acquire write lock to the provider, ns {} not removed",
self.name
);
}
}
}
} else {
trace!("⚠️ leaking ns {ns_name} in cluster");
}
};
}
}
File diff suppressed because it is too large Load Diff
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Weak},
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Weak},
};
use async_trait::async_trait;
@@ -10,152 +10,143 @@ use tokio::sync::RwLock;
use super::{client::DockerClient, namespace::DockerNamespace};
use crate::{
shared::helpers::extract_namespace_info, types::ProviderCapabilities, DynNamespace, Provider,
ProviderError, ProviderNamespace,
shared::helpers::extract_namespace_info, types::ProviderCapabilities, DynNamespace, Provider,
ProviderError, ProviderNamespace,
};
pub const PROVIDER_NAME: &str = "docker";
pub struct DockerProvider<FS>
where
FS: FileSystem + Send + Sync + Clone,
FS: FileSystem + Send + Sync + Clone,
{
weak: Weak<DockerProvider<FS>>,
capabilities: ProviderCapabilities,
tmp_dir: PathBuf,
docker_client: DockerClient,
filesystem: FS,
pub(super) namespaces: RwLock<HashMap<String, Arc<DockerNamespace<FS>>>>,
weak: Weak<DockerProvider<FS>>,
capabilities: ProviderCapabilities,
tmp_dir: PathBuf,
docker_client: DockerClient,
filesystem: FS,
pub(super) namespaces: RwLock<HashMap<String, Arc<DockerNamespace<FS>>>>,
}
impl<FS> DockerProvider<FS>
where
FS: FileSystem + Send + Sync + Clone + 'static,
FS: FileSystem + Send + Sync + Clone + 'static,
{
pub async fn new(filesystem: FS) -> Arc<Self> {
let docker_client = DockerClient::new().await.unwrap();
pub async fn new(filesystem: FS) -> Arc<Self> {
let docker_client = DockerClient::new().await.unwrap();
let provider = Arc::new_cyclic(|weak| DockerProvider {
weak: weak.clone(),
capabilities: ProviderCapabilities {
requires_image: true,
has_resources: false,
prefix_with_full_path: false,
use_default_ports_in_cmd: true,
},
tmp_dir: std::env::temp_dir(),
docker_client,
filesystem,
namespaces: RwLock::new(HashMap::new()),
});
let provider = Arc::new_cyclic(|weak| DockerProvider {
weak: weak.clone(),
capabilities: ProviderCapabilities {
requires_image: true,
has_resources: false,
prefix_with_full_path: false,
use_default_ports_in_cmd: true,
},
tmp_dir: std::env::temp_dir(),
docker_client,
filesystem,
namespaces: RwLock::new(HashMap::new()),
});
let cloned_provider = provider.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
for (_, ns) in cloned_provider.namespaces().await {
if ns.is_detached().await {
// best effort
let _ = ns.destroy().await;
}
}
let cloned_provider = provider.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
for (_, ns) in cloned_provider.namespaces().await {
if ns.is_detached().await {
// best effort
let _ = ns.destroy().await;
}
}
// exit the process (130, SIGINT)
std::process::exit(130)
});
// exit the process (130, SIGINT)
std::process::exit(130)
});
provider
}
provider
}
pub fn tmp_dir(mut self, tmp_dir: impl Into<PathBuf>) -> Self {
self.tmp_dir = tmp_dir.into();
self
}
pub fn tmp_dir(mut self, tmp_dir: impl Into<PathBuf>) -> Self {
self.tmp_dir = tmp_dir.into();
self
}
}
#[async_trait]
impl<FS> Provider for DockerProvider<FS>
where
FS: FileSystem + Send + Sync + Clone + 'static,
FS: FileSystem + Send + Sync + Clone + 'static,
{
fn name(&self) -> &str {
PROVIDER_NAME
}
fn name(&self) -> &str {
PROVIDER_NAME
}
fn capabilities(&self) -> &ProviderCapabilities {
&self.capabilities
}
fn capabilities(&self) -> &ProviderCapabilities {
&self.capabilities
}
async fn namespaces(&self) -> HashMap<String, DynNamespace> {
self.namespaces
.read()
.await
.iter()
.map(|(name, namespace)| (name.clone(), namespace.clone() as DynNamespace))
.collect()
}
async fn namespaces(&self) -> HashMap<String, DynNamespace> {
self.namespaces
.read()
.await
.iter()
.map(|(name, namespace)| (name.clone(), namespace.clone() as DynNamespace))
.collect()
}
async fn create_namespace(&self) -> Result<DynNamespace, ProviderError> {
let namespace = DockerNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.docker_client,
&self.filesystem,
None,
)
.await?;
async fn create_namespace(&self) -> Result<DynNamespace, ProviderError> {
let namespace = DockerNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.docker_client,
&self.filesystem,
None,
)
.await?;
self.namespaces
.write()
.await
.insert(namespace.name().to_string(), namespace.clone());
self.namespaces.write().await.insert(namespace.name().to_string(), namespace.clone());
Ok(namespace)
}
Ok(namespace)
}
async fn create_namespace_with_base_dir(
&self,
base_dir: &Path,
) -> Result<DynNamespace, ProviderError> {
let namespace = DockerNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.docker_client,
&self.filesystem,
Some(base_dir),
)
.await?;
async fn create_namespace_with_base_dir(
&self,
base_dir: &Path,
) -> Result<DynNamespace, ProviderError> {
let namespace = DockerNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.docker_client,
&self.filesystem,
Some(base_dir),
)
.await?;
self.namespaces
.write()
.await
.insert(namespace.name().to_string(), namespace.clone());
self.namespaces.write().await.insert(namespace.name().to_string(), namespace.clone());
Ok(namespace)
}
Ok(namespace)
}
async fn create_namespace_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNamespace, ProviderError> {
let (base_dir, name) = extract_namespace_info(json_value)?;
async fn create_namespace_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNamespace, ProviderError> {
let (base_dir, name) = extract_namespace_info(json_value)?;
let namespace = DockerNamespace::attach_to_live(
&self.weak,
&self.capabilities,
&self.docker_client,
&self.filesystem,
&base_dir,
&name,
)
.await?;
let namespace = DockerNamespace::attach_to_live(
&self.weak,
&self.capabilities,
&self.docker_client,
&self.filesystem,
&base_dir,
&name,
)
.await?;
self.namespaces
.write()
.await
.insert(namespace.name().to_string(), namespace.clone());
self.namespaces.write().await.insert(namespace.name().to_string(), namespace.clone());
Ok(namespace)
}
Ok(namespace)
}
}
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -2,76 +2,73 @@ use std::collections::BTreeMap;
use configuration::shared::resources::{ResourceQuantity, Resources};
use k8s_openapi::{
api::core::v1::{
ConfigMapVolumeSource, Container, EnvVar, PodSpec, ResourceRequirements, Volume,
VolumeMount,
},
apimachinery::pkg::api::resource::Quantity,
api::core::v1::{
ConfigMapVolumeSource, Container, EnvVar, PodSpec, ResourceRequirements, Volume,
VolumeMount,
},
apimachinery::pkg::api::resource::Quantity,
};
pub(super) struct PodSpecBuilder;
impl PodSpecBuilder {
pub(super) fn build(
name: &str,
image: &str,
resources: Option<&Resources>,
program: &str,
args: &[String],
env: &[(String, String)],
) -> PodSpec {
PodSpec {
hostname: Some(name.to_string()),
init_containers: Some(vec![Self::build_helper_binaries_setup_container()]),
containers: vec![Self::build_main_container(
name, image, resources, program, args, env,
)],
volumes: Some(Self::build_volumes()),
..Default::default()
}
}
pub(super) fn build(
name: &str,
image: &str,
resources: Option<&Resources>,
program: &str,
args: &[String],
env: &[(String, String)],
) -> PodSpec {
PodSpec {
hostname: Some(name.to_string()),
init_containers: Some(vec![Self::build_helper_binaries_setup_container()]),
containers: vec![Self::build_main_container(
name, image, resources, program, args, env,
)],
volumes: Some(Self::build_volumes()),
..Default::default()
}
}
fn build_main_container(
name: &str,
image: &str,
resources: Option<&Resources>,
program: &str,
args: &[String],
env: &[(String, String)],
) -> Container {
Container {
name: name.to_string(),
image: Some(image.to_string()),
image_pull_policy: Some("Always".to_string()),
command: Some(
[
vec!["/zombie-wrapper.sh".to_string(), program.to_string()],
args.to_vec(),
]
.concat(),
),
env: Some(
env.iter()
.map(|(name, value)| EnvVar {
name: name.clone(),
value: Some(value.clone()),
value_from: None,
})
.collect(),
),
volume_mounts: Some(Self::build_volume_mounts(vec![VolumeMount {
name: "zombie-wrapper-volume".to_string(),
mount_path: "/zombie-wrapper.sh".to_string(),
sub_path: Some("zombie-wrapper.sh".to_string()),
..Default::default()
}])),
resources: Self::build_resources_requirements(resources),
..Default::default()
}
}
fn build_main_container(
name: &str,
image: &str,
resources: Option<&Resources>,
program: &str,
args: &[String],
env: &[(String, String)],
) -> Container {
Container {
name: name.to_string(),
image: Some(image.to_string()),
image_pull_policy: Some("Always".to_string()),
command: Some(
[vec!["/zombie-wrapper.sh".to_string(), program.to_string()], args.to_vec()]
.concat(),
),
env: Some(
env.iter()
.map(|(name, value)| EnvVar {
name: name.clone(),
value: Some(value.clone()),
value_from: None,
})
.collect(),
),
volume_mounts: Some(Self::build_volume_mounts(vec![VolumeMount {
name: "zombie-wrapper-volume".to_string(),
mount_path: "/zombie-wrapper.sh".to_string(),
sub_path: Some("zombie-wrapper.sh".to_string()),
..Default::default()
}])),
resources: Self::build_resources_requirements(resources),
..Default::default()
}
}
fn build_helper_binaries_setup_container() -> Container {
Container {
fn build_helper_binaries_setup_container() -> Container {
Container {
name: "helper-binaries-setup".to_string(),
image: Some("europe-west3-docker.pkg.dev/parity-zombienet/zombienet-public-images/alpine:latest".to_string()),
image_pull_policy: Some("IfNotPresent".to_string()),
@@ -87,102 +84,93 @@ impl PodSpecBuilder {
]),
..Default::default()
}
}
}
fn build_volumes() -> Vec<Volume> {
vec![
Volume {
name: "cfg".to_string(),
..Default::default()
},
Volume {
name: "data".to_string(),
..Default::default()
},
Volume {
name: "relay-data".to_string(),
..Default::default()
},
Volume {
name: "zombie-wrapper-volume".to_string(),
config_map: Some(ConfigMapVolumeSource {
name: Some("zombie-wrapper".to_string()),
default_mode: Some(0o755),
..Default::default()
}),
..Default::default()
},
Volume {
name: "helper-binaries-downloader-volume".to_string(),
config_map: Some(ConfigMapVolumeSource {
name: Some("helper-binaries-downloader".to_string()),
default_mode: Some(0o755),
..Default::default()
}),
..Default::default()
},
]
}
fn build_volumes() -> Vec<Volume> {
vec![
Volume { name: "cfg".to_string(), ..Default::default() },
Volume { name: "data".to_string(), ..Default::default() },
Volume { name: "relay-data".to_string(), ..Default::default() },
Volume {
name: "zombie-wrapper-volume".to_string(),
config_map: Some(ConfigMapVolumeSource {
name: Some("zombie-wrapper".to_string()),
default_mode: Some(0o755),
..Default::default()
}),
..Default::default()
},
Volume {
name: "helper-binaries-downloader-volume".to_string(),
config_map: Some(ConfigMapVolumeSource {
name: Some("helper-binaries-downloader".to_string()),
default_mode: Some(0o755),
..Default::default()
}),
..Default::default()
},
]
}
fn build_volume_mounts(non_default_mounts: Vec<VolumeMount>) -> Vec<VolumeMount> {
[
vec![
VolumeMount {
name: "cfg".to_string(),
mount_path: "/cfg".to_string(),
read_only: Some(false),
..Default::default()
},
VolumeMount {
name: "data".to_string(),
mount_path: "/data".to_string(),
read_only: Some(false),
..Default::default()
},
VolumeMount {
name: "relay-data".to_string(),
mount_path: "/relay-data".to_string(),
read_only: Some(false),
..Default::default()
},
],
non_default_mounts,
]
.concat()
}
fn build_volume_mounts(non_default_mounts: Vec<VolumeMount>) -> Vec<VolumeMount> {
[
vec![
VolumeMount {
name: "cfg".to_string(),
mount_path: "/cfg".to_string(),
read_only: Some(false),
..Default::default()
},
VolumeMount {
name: "data".to_string(),
mount_path: "/data".to_string(),
read_only: Some(false),
..Default::default()
},
VolumeMount {
name: "relay-data".to_string(),
mount_path: "/relay-data".to_string(),
read_only: Some(false),
..Default::default()
},
],
non_default_mounts,
]
.concat()
}
fn build_resources_requirements(resources: Option<&Resources>) -> Option<ResourceRequirements> {
resources.map(|resources| ResourceRequirements {
limits: Self::build_resources_requirements_quantities(
resources.limit_cpu(),
resources.limit_memory(),
),
requests: Self::build_resources_requirements_quantities(
resources.request_cpu(),
resources.request_memory(),
),
..Default::default()
})
}
fn build_resources_requirements(resources: Option<&Resources>) -> Option<ResourceRequirements> {
resources.map(|resources| ResourceRequirements {
limits: Self::build_resources_requirements_quantities(
resources.limit_cpu(),
resources.limit_memory(),
),
requests: Self::build_resources_requirements_quantities(
resources.request_cpu(),
resources.request_memory(),
),
..Default::default()
})
}
fn build_resources_requirements_quantities(
cpu: Option<&ResourceQuantity>,
memory: Option<&ResourceQuantity>,
) -> Option<BTreeMap<String, Quantity>> {
let mut quantities = BTreeMap::new();
fn build_resources_requirements_quantities(
cpu: Option<&ResourceQuantity>,
memory: Option<&ResourceQuantity>,
) -> Option<BTreeMap<String, Quantity>> {
let mut quantities = BTreeMap::new();
if let Some(cpu) = cpu {
quantities.insert("cpu".to_string(), Quantity(cpu.as_str().to_string()));
}
if let Some(cpu) = cpu {
quantities.insert("cpu".to_string(), Quantity(cpu.as_str().to_string()));
}
if let Some(memory) = memory {
quantities.insert("memory".to_string(), Quantity(memory.as_str().to_string()));
}
if let Some(memory) = memory {
quantities.insert("memory".to_string(), Quantity(memory.as_str().to_string()));
}
if !quantities.is_empty() {
Some(quantities)
} else {
None
}
}
if !quantities.is_empty() {
Some(quantities)
} else {
None
}
}
}
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Weak},
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Weak},
};
use async_trait::async_trait;
@@ -10,136 +10,127 @@ use tokio::sync::RwLock;
use super::{client::KubernetesClient, namespace::KubernetesNamespace};
use crate::{
shared::helpers::extract_namespace_info, types::ProviderCapabilities, DynNamespace, Provider,
ProviderError, ProviderNamespace,
shared::helpers::extract_namespace_info, types::ProviderCapabilities, DynNamespace, Provider,
ProviderError, ProviderNamespace,
};
pub const PROVIDER_NAME: &str = "k8s";
pub struct KubernetesProvider<FS>
where
FS: FileSystem + Send + Sync + Clone,
FS: FileSystem + Send + Sync + Clone,
{
weak: Weak<KubernetesProvider<FS>>,
capabilities: ProviderCapabilities,
tmp_dir: PathBuf,
k8s_client: KubernetesClient,
filesystem: FS,
pub(super) namespaces: RwLock<HashMap<String, Arc<KubernetesNamespace<FS>>>>,
weak: Weak<KubernetesProvider<FS>>,
capabilities: ProviderCapabilities,
tmp_dir: PathBuf,
k8s_client: KubernetesClient,
filesystem: FS,
pub(super) namespaces: RwLock<HashMap<String, Arc<KubernetesNamespace<FS>>>>,
}
impl<FS> KubernetesProvider<FS>
where
FS: FileSystem + Send + Sync + Clone,
FS: FileSystem + Send + Sync + Clone,
{
pub async fn new(filesystem: FS) -> Arc<Self> {
let k8s_client = KubernetesClient::new().await.unwrap();
pub async fn new(filesystem: FS) -> Arc<Self> {
let k8s_client = KubernetesClient::new().await.unwrap();
Arc::new_cyclic(|weak| KubernetesProvider {
weak: weak.clone(),
capabilities: ProviderCapabilities {
requires_image: true,
has_resources: true,
prefix_with_full_path: false,
use_default_ports_in_cmd: true,
},
tmp_dir: std::env::temp_dir(),
k8s_client,
filesystem,
namespaces: RwLock::new(HashMap::new()),
})
}
Arc::new_cyclic(|weak| KubernetesProvider {
weak: weak.clone(),
capabilities: ProviderCapabilities {
requires_image: true,
has_resources: true,
prefix_with_full_path: false,
use_default_ports_in_cmd: true,
},
tmp_dir: std::env::temp_dir(),
k8s_client,
filesystem,
namespaces: RwLock::new(HashMap::new()),
})
}
pub fn tmp_dir(mut self, tmp_dir: impl Into<PathBuf>) -> Self {
self.tmp_dir = tmp_dir.into();
self
}
pub fn tmp_dir(mut self, tmp_dir: impl Into<PathBuf>) -> Self {
self.tmp_dir = tmp_dir.into();
self
}
}
#[async_trait]
impl<FS> Provider for KubernetesProvider<FS>
where
FS: FileSystem + Send + Sync + Clone + 'static,
FS: FileSystem + Send + Sync + Clone + 'static,
{
fn name(&self) -> &str {
PROVIDER_NAME
}
fn name(&self) -> &str {
PROVIDER_NAME
}
fn capabilities(&self) -> &ProviderCapabilities {
&self.capabilities
}
fn capabilities(&self) -> &ProviderCapabilities {
&self.capabilities
}
async fn namespaces(&self) -> HashMap<String, DynNamespace> {
self.namespaces
.read()
.await
.iter()
.map(|(name, namespace)| (name.clone(), namespace.clone() as DynNamespace))
.collect()
}
async fn namespaces(&self) -> HashMap<String, DynNamespace> {
self.namespaces
.read()
.await
.iter()
.map(|(name, namespace)| (name.clone(), namespace.clone() as DynNamespace))
.collect()
}
async fn create_namespace(&self) -> Result<DynNamespace, ProviderError> {
let namespace = KubernetesNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.k8s_client,
&self.filesystem,
None,
)
.await?;
async fn create_namespace(&self) -> Result<DynNamespace, ProviderError> {
let namespace = KubernetesNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.k8s_client,
&self.filesystem,
None,
)
.await?;
self.namespaces
.write()
.await
.insert(namespace.name().to_string(), namespace.clone());
self.namespaces.write().await.insert(namespace.name().to_string(), namespace.clone());
Ok(namespace)
}
Ok(namespace)
}
async fn create_namespace_with_base_dir(
&self,
base_dir: &Path,
) -> Result<DynNamespace, ProviderError> {
let namespace = KubernetesNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.k8s_client,
&self.filesystem,
Some(base_dir),
)
.await?;
async fn create_namespace_with_base_dir(
&self,
base_dir: &Path,
) -> Result<DynNamespace, ProviderError> {
let namespace = KubernetesNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.k8s_client,
&self.filesystem,
Some(base_dir),
)
.await?;
self.namespaces
.write()
.await
.insert(namespace.name().to_string(), namespace.clone());
self.namespaces.write().await.insert(namespace.name().to_string(), namespace.clone());
Ok(namespace)
}
Ok(namespace)
}
async fn create_namespace_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNamespace, ProviderError> {
let (base_dir, name) = extract_namespace_info(json_value)?;
async fn create_namespace_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNamespace, ProviderError> {
let (base_dir, name) = extract_namespace_info(json_value)?;
let namespace = KubernetesNamespace::attach_to_live(
&self.weak,
&self.capabilities,
&self.k8s_client,
&self.filesystem,
&base_dir,
&name,
)
.await?;
let namespace = KubernetesNamespace::attach_to_live(
&self.weak,
&self.capabilities,
&self.k8s_client,
&self.filesystem,
&base_dir,
&name,
)
.await?;
self.namespaces
.write()
.await
.insert(namespace.name().to_string(), namespace.clone());
self.namespaces.write().await.insert(namespace.name().to_string(), namespace.clone());
Ok(namespace)
}
Ok(namespace)
}
}
+154 -154
View File
@@ -5,253 +5,253 @@ mod native;
pub mod shared;
use std::{
collections::HashMap,
net::IpAddr,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
collections::HashMap,
net::IpAddr,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use async_trait::async_trait;
use shared::{
constants::LOCALHOST,
types::{
ExecutionResult, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
RunScriptOptions, SpawnNodeOptions,
},
constants::LOCALHOST,
types::{
ExecutionResult, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
RunScriptOptions, SpawnNodeOptions,
},
};
use support::fs::FileSystemError;
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum ProviderError {
#[error("Failed to create client '{0}': {1}")]
CreateClientFailed(String, anyhow::Error),
#[error("Failed to create client '{0}': {1}")]
CreateClientFailed(String, anyhow::Error),
#[error("Failed to create namespace '{0}': {1}")]
CreateNamespaceFailed(String, anyhow::Error),
#[error("Failed to create namespace '{0}': {1}")]
CreateNamespaceFailed(String, anyhow::Error),
#[error("Failed to spawn node '{0}': {1}")]
NodeSpawningFailed(String, anyhow::Error),
#[error("Failed to spawn node '{0}': {1}")]
NodeSpawningFailed(String, anyhow::Error),
#[error("Error running command '{0}' {1}: {2}")]
RunCommandError(String, String, anyhow::Error),
#[error("Error running command '{0}' {1}: {2}")]
RunCommandError(String, String, anyhow::Error),
#[error("Error running script'{0}': {1}")]
RunScriptError(String, anyhow::Error),
#[error("Error running script'{0}': {1}")]
RunScriptError(String, anyhow::Error),
#[error("Invalid network configuration field {0}")]
InvalidConfig(String),
#[error("Invalid network configuration field {0}")]
InvalidConfig(String),
#[error("Failed to retrieve node available args using image {0} and command {1}: {2}")]
NodeAvailableArgsError(String, String, String),
#[error("Failed to retrieve node available args using image {0} and command {1}: {2}")]
NodeAvailableArgsError(String, String, String),
#[error("Can not recover node: {0}")]
MissingNode(String),
#[error("Can not recover node: {0}")]
MissingNode(String),
#[error("Can not recover node: {0} info, field: {1}")]
MissingNodeInfo(String, String),
#[error("Can not recover node: {0} info, field: {1}")]
MissingNodeInfo(String, String),
#[error("File generation failed: {0}")]
FileGenerationFailed(anyhow::Error),
#[error("File generation failed: {0}")]
FileGenerationFailed(anyhow::Error),
#[error(transparent)]
FileSystemError(#[from] FileSystemError),
#[error(transparent)]
FileSystemError(#[from] FileSystemError),
#[error("Invalid script path for {0}")]
InvalidScriptPath(anyhow::Error),
#[error("Invalid script path for {0}")]
InvalidScriptPath(anyhow::Error),
#[error("Script with path {0} not found")]
ScriptNotFound(PathBuf),
#[error("Script with path {0} not found")]
ScriptNotFound(PathBuf),
#[error("Failed to retrieve process ID for node '{0}'")]
ProcessIdRetrievalFailed(String),
#[error("Failed to retrieve process ID for node '{0}'")]
ProcessIdRetrievalFailed(String),
#[error("Failed to pause node '{0}': {1}")]
PauseNodeFailed(String, anyhow::Error),
#[error("Failed to pause node '{0}': {1}")]
PauseNodeFailed(String, anyhow::Error),
#[error("Failed to resume node '{0}': {1}")]
ResumeNodeFailed(String, anyhow::Error),
#[error("Failed to resume node '{0}': {1}")]
ResumeNodeFailed(String, anyhow::Error),
#[error("Failed to kill node '{0}': {1}")]
KillNodeFailed(String, anyhow::Error),
#[error("Failed to kill node '{0}': {1}")]
KillNodeFailed(String, anyhow::Error),
#[error("Failed to restart node '{0}': {1}")]
RestartNodeFailed(String, anyhow::Error),
#[error("Failed to restart node '{0}': {1}")]
RestartNodeFailed(String, anyhow::Error),
#[error("Failed to destroy node '{0}': {1}")]
DestroyNodeFailed(String, anyhow::Error),
#[error("Failed to destroy node '{0}': {1}")]
DestroyNodeFailed(String, anyhow::Error),
#[error("Failed to get logs for node '{0}': {1}")]
GetLogsFailed(String, anyhow::Error),
#[error("Failed to get logs for node '{0}': {1}")]
GetLogsFailed(String, anyhow::Error),
#[error("Failed to dump logs for node '{0}': {1}")]
DumpLogsFailed(String, anyhow::Error),
#[error("Failed to dump logs for node '{0}': {1}")]
DumpLogsFailed(String, anyhow::Error),
#[error("Failed to copy file from node '{0}': {1}")]
CopyFileFromNodeError(String, anyhow::Error),
#[error("Failed to copy file from node '{0}': {1}")]
CopyFileFromNodeError(String, anyhow::Error),
#[error("Failed to setup fileserver: {0}")]
FileServerSetupError(anyhow::Error),
#[error("Failed to setup fileserver: {0}")]
FileServerSetupError(anyhow::Error),
#[error("Error uploading file: '{0}': {1}")]
UploadFile(String, anyhow::Error),
#[error("Error uploading file: '{0}': {1}")]
UploadFile(String, anyhow::Error),
#[error("Error downloading file: '{0}': {1}")]
DownloadFile(String, anyhow::Error),
#[error("Error downloading file: '{0}': {1}")]
DownloadFile(String, anyhow::Error),
#[error("Error sending file '{0}' to {1}: {2}")]
SendFile(String, String, anyhow::Error),
#[error("Error sending file '{0}' to {1}: {2}")]
SendFile(String, String, anyhow::Error),
#[error("Error creating port-forward '{0}:{1}': {2}")]
PortForwardError(u16, u16, anyhow::Error),
#[error("Error creating port-forward '{0}:{1}': {2}")]
PortForwardError(u16, u16, anyhow::Error),
#[error("Failed to delete namespace '{0}': {1}")]
DeleteNamespaceFailed(String, anyhow::Error),
#[error("Failed to delete namespace '{0}': {1}")]
DeleteNamespaceFailed(String, anyhow::Error),
#[error("Serialization error")]
SerializationError(#[from] serde_json::Error),
#[error("Serialization error")]
SerializationError(#[from] serde_json::Error),
#[error("Failed to acquire lock: {0}")]
FailedToAcquireLock(String),
#[error("Failed to acquire lock: {0}")]
FailedToAcquireLock(String),
}
#[async_trait]
pub trait Provider {
fn name(&self) -> &str;
fn name(&self) -> &str;
fn capabilities(&self) -> &ProviderCapabilities;
fn capabilities(&self) -> &ProviderCapabilities;
async fn namespaces(&self) -> HashMap<String, DynNamespace>;
async fn namespaces(&self) -> HashMap<String, DynNamespace>;
async fn create_namespace(&self) -> Result<DynNamespace, ProviderError>;
async fn create_namespace(&self) -> Result<DynNamespace, ProviderError>;
async fn create_namespace_with_base_dir(
&self,
base_dir: &Path,
) -> Result<DynNamespace, ProviderError>;
async fn create_namespace_with_base_dir(
&self,
base_dir: &Path,
) -> Result<DynNamespace, ProviderError>;
async fn create_namespace_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNamespace, ProviderError>;
async fn create_namespace_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNamespace, ProviderError>;
}
pub type DynProvider = Arc<dyn Provider + Send + Sync>;
#[async_trait]
pub trait ProviderNamespace {
fn name(&self) -> &str;
fn name(&self) -> &str;
fn base_dir(&self) -> &PathBuf;
fn base_dir(&self) -> &PathBuf;
fn capabilities(&self) -> &ProviderCapabilities;
fn capabilities(&self) -> &ProviderCapabilities;
fn provider_name(&self) -> &str;
fn provider_name(&self) -> &str;
async fn detach(&self) {
// noop by default
warn!("Detach is not implemented for {}", self.name());
}
async fn detach(&self) {
// noop by default
warn!("Detach is not implemented for {}", self.name());
}
async fn is_detached(&self) -> bool {
// false by default
false
}
async fn is_detached(&self) -> bool {
// false by default
false
}
async fn nodes(&self) -> HashMap<String, DynNode>;
async fn nodes(&self) -> HashMap<String, DynNode>;
async fn get_node_available_args(
&self,
options: (String, Option<String>),
) -> Result<String, ProviderError>;
async fn get_node_available_args(
&self,
options: (String, Option<String>),
) -> Result<String, ProviderError>;
async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError>;
async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError>;
async fn spawn_node_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNode, ProviderError>;
async fn spawn_node_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNode, ProviderError>;
async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError>;
async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError>;
async fn destroy(&self) -> Result<(), ProviderError>;
async fn destroy(&self) -> Result<(), ProviderError>;
async fn static_setup(&self) -> Result<(), ProviderError>;
async fn static_setup(&self) -> Result<(), ProviderError>;
}
pub type DynNamespace = Arc<dyn ProviderNamespace + Send + Sync>;
#[async_trait]
pub trait ProviderNode: erased_serde::Serialize {
fn name(&self) -> &str;
fn name(&self) -> &str;
fn args(&self) -> Vec<&str>;
fn args(&self) -> Vec<&str>;
fn base_dir(&self) -> &PathBuf;
fn base_dir(&self) -> &PathBuf;
fn config_dir(&self) -> &PathBuf;
fn config_dir(&self) -> &PathBuf;
fn data_dir(&self) -> &PathBuf;
fn data_dir(&self) -> &PathBuf;
fn relay_data_dir(&self) -> &PathBuf;
fn relay_data_dir(&self) -> &PathBuf;
fn scripts_dir(&self) -> &PathBuf;
fn scripts_dir(&self) -> &PathBuf;
fn log_path(&self) -> &PathBuf;
fn log_path(&self) -> &PathBuf;
fn log_cmd(&self) -> String;
fn log_cmd(&self) -> String;
// Return the absolute path to the file in the `node` perspective
// TODO: purpose?
fn path_in_node(&self, file: &Path) -> PathBuf;
// Return the absolute path to the file in the `node` perspective
// TODO: purpose?
fn path_in_node(&self, file: &Path) -> PathBuf;
async fn logs(&self) -> Result<String, ProviderError>;
async fn logs(&self) -> Result<String, ProviderError>;
async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError>;
async fn dump_logs(&self, local_dest: PathBuf) -> Result<(), ProviderError>;
// By default return localhost, should be overrided for k8s
async fn ip(&self) -> Result<IpAddr, ProviderError> {
Ok(LOCALHOST)
}
// By default return localhost, should be overrided for k8s
async fn ip(&self) -> Result<IpAddr, ProviderError> {
Ok(LOCALHOST)
}
// Noop by default (native/docker provider)
async fn create_port_forward(
&self,
_local_port: u16,
_remote_port: u16,
) -> Result<Option<u16>, ProviderError> {
Ok(None)
}
// Noop by default (native/docker provider)
async fn create_port_forward(
&self,
_local_port: u16,
_remote_port: u16,
) -> Result<Option<u16>, ProviderError> {
Ok(None)
}
async fn run_command(
&self,
options: RunCommandOptions,
) -> Result<ExecutionResult, ProviderError>;
async fn run_command(
&self,
options: RunCommandOptions,
) -> Result<ExecutionResult, ProviderError>;
async fn run_script(&self, options: RunScriptOptions)
-> Result<ExecutionResult, ProviderError>;
async fn run_script(&self, options: RunScriptOptions)
-> Result<ExecutionResult, ProviderError>;
async fn send_file(
&self,
local_file_path: &Path,
remote_file_path: &Path,
mode: &str,
) -> Result<(), ProviderError>;
async fn send_file(
&self,
local_file_path: &Path,
remote_file_path: &Path,
mode: &str,
) -> Result<(), ProviderError>;
async fn receive_file(
&self,
remote_file_path: &Path,
local_file_path: &Path,
) -> Result<(), ProviderError>;
async fn receive_file(
&self,
remote_file_path: &Path,
local_file_path: &Path,
) -> Result<(), ProviderError>;
async fn pause(&self) -> Result<(), ProviderError>;
async fn pause(&self) -> Result<(), ProviderError>;
async fn resume(&self) -> Result<(), ProviderError>;
async fn resume(&self) -> Result<(), ProviderError>;
async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError>;
async fn restart(&self, after: Option<Duration>) -> Result<(), ProviderError>;
async fn destroy(&self) -> Result<(), ProviderError>;
async fn destroy(&self) -> Result<(), ProviderError>;
}
pub type DynNode = Arc<dyn ProviderNode + Send + Sync>;
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Weak},
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Weak},
};
use async_trait::async_trait;
@@ -12,363 +12,337 @@ use uuid::Uuid;
use super::node::{NativeNode, NativeNodeOptions};
use crate::{
constants::NAMESPACE_PREFIX,
native::{node::DeserializableNativeNodeOptions, provider},
shared::helpers::extract_execution_result,
types::{
GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
SpawnNodeOptions,
},
DynNode, NativeProvider, ProviderError, ProviderNamespace, ProviderNode,
constants::NAMESPACE_PREFIX,
native::{node::DeserializableNativeNodeOptions, provider},
shared::helpers::extract_execution_result,
types::{
GenerateFileCommand, GenerateFilesOptions, ProviderCapabilities, RunCommandOptions,
SpawnNodeOptions,
},
DynNode, NativeProvider, ProviderError, ProviderNamespace, ProviderNode,
};
pub(super) struct NativeNamespace<FS>
where
FS: FileSystem + Send + Sync + Clone,
FS: FileSystem + Send + Sync + Clone,
{
weak: Weak<NativeNamespace<FS>>,
name: String,
provider: Weak<NativeProvider<FS>>,
base_dir: PathBuf,
capabilities: ProviderCapabilities,
filesystem: FS,
pub(super) nodes: RwLock<HashMap<String, Arc<NativeNode<FS>>>>,
weak: Weak<NativeNamespace<FS>>,
name: String,
provider: Weak<NativeProvider<FS>>,
base_dir: PathBuf,
capabilities: ProviderCapabilities,
filesystem: FS,
pub(super) nodes: RwLock<HashMap<String, Arc<NativeNode<FS>>>>,
}
impl<FS> NativeNamespace<FS>
where
FS: FileSystem + Send + Sync + Clone + 'static,
FS: FileSystem + Send + Sync + Clone + 'static,
{
pub(super) async fn new(
provider: &Weak<NativeProvider<FS>>,
tmp_dir: &PathBuf,
capabilities: &ProviderCapabilities,
filesystem: &FS,
custom_base_dir: Option<&Path>,
) -> Result<Arc<Self>, ProviderError> {
let name = format!("{}{}", NAMESPACE_PREFIX, Uuid::new_v4());
let base_dir = if let Some(custom_base_dir) = custom_base_dir {
if !filesystem.exists(custom_base_dir).await {
filesystem.create_dir_all(custom_base_dir).await?;
} else {
warn!(
"⚠️ Using and existing directory {} as base dir",
custom_base_dir.to_string_lossy()
);
}
PathBuf::from(custom_base_dir)
} else {
let base_dir = PathBuf::from_iter([tmp_dir, &PathBuf::from(&name)]);
filesystem.create_dir(&base_dir).await?;
base_dir
};
pub(super) async fn new(
provider: &Weak<NativeProvider<FS>>,
tmp_dir: &PathBuf,
capabilities: &ProviderCapabilities,
filesystem: &FS,
custom_base_dir: Option<&Path>,
) -> Result<Arc<Self>, ProviderError> {
let name = format!("{}{}", NAMESPACE_PREFIX, Uuid::new_v4());
let base_dir = if let Some(custom_base_dir) = custom_base_dir {
if !filesystem.exists(custom_base_dir).await {
filesystem.create_dir_all(custom_base_dir).await?;
} else {
warn!(
"⚠️ Using and existing directory {} as base dir",
custom_base_dir.to_string_lossy()
);
}
PathBuf::from(custom_base_dir)
} else {
let base_dir = PathBuf::from_iter([tmp_dir, &PathBuf::from(&name)]);
filesystem.create_dir(&base_dir).await?;
base_dir
};
Ok(Arc::new_cyclic(|weak| NativeNamespace {
weak: weak.clone(),
provider: provider.clone(),
name,
base_dir,
capabilities: capabilities.clone(),
filesystem: filesystem.clone(),
nodes: RwLock::new(HashMap::new()),
}))
}
Ok(Arc::new_cyclic(|weak| NativeNamespace {
weak: weak.clone(),
provider: provider.clone(),
name,
base_dir,
capabilities: capabilities.clone(),
filesystem: filesystem.clone(),
nodes: RwLock::new(HashMap::new()),
}))
}
pub(super) async fn attach_to_live(
provider: &Weak<NativeProvider<FS>>,
capabilities: &ProviderCapabilities,
filesystem: &FS,
custom_base_dir: &Path,
name: &str,
) -> Result<Arc<Self>, ProviderError> {
let base_dir = custom_base_dir.to_path_buf();
pub(super) async fn attach_to_live(
provider: &Weak<NativeProvider<FS>>,
capabilities: &ProviderCapabilities,
filesystem: &FS,
custom_base_dir: &Path,
name: &str,
) -> Result<Arc<Self>, ProviderError> {
let base_dir = custom_base_dir.to_path_buf();
Ok(Arc::new_cyclic(|weak| NativeNamespace {
weak: weak.clone(),
provider: provider.clone(),
name: name.to_string(),
base_dir,
capabilities: capabilities.clone(),
filesystem: filesystem.clone(),
nodes: RwLock::new(HashMap::new()),
}))
}
Ok(Arc::new_cyclic(|weak| NativeNamespace {
weak: weak.clone(),
provider: provider.clone(),
name: name.to_string(),
base_dir,
capabilities: capabilities.clone(),
filesystem: filesystem.clone(),
nodes: RwLock::new(HashMap::new()),
}))
}
}
#[async_trait]
impl<FS> ProviderNamespace for NativeNamespace<FS>
where
FS: FileSystem + Send + Sync + Clone + 'static,
FS: FileSystem + Send + Sync + Clone + 'static,
{
fn name(&self) -> &str {
&self.name
}
fn name(&self) -> &str {
&self.name
}
fn base_dir(&self) -> &PathBuf {
&self.base_dir
}
fn base_dir(&self) -> &PathBuf {
&self.base_dir
}
fn capabilities(&self) -> &ProviderCapabilities {
&self.capabilities
}
fn capabilities(&self) -> &ProviderCapabilities {
&self.capabilities
}
fn provider_name(&self) -> &str {
provider::PROVIDER_NAME
}
fn provider_name(&self) -> &str {
provider::PROVIDER_NAME
}
async fn nodes(&self) -> HashMap<String, DynNode> {
self.nodes
.read()
.await
.iter()
.map(|(name, node)| (name.clone(), node.clone() as DynNode))
.collect()
}
async fn nodes(&self) -> HashMap<String, DynNode> {
self.nodes
.read()
.await
.iter()
.map(|(name, node)| (name.clone(), node.clone() as DynNode))
.collect()
}
async fn get_node_available_args(
&self,
(command, _image): (String, Option<String>),
) -> Result<String, ProviderError> {
let temp_node = self
.spawn_node(
&SpawnNodeOptions::new(format!("temp-{}", Uuid::new_v4()), "bash".to_string())
.args(vec!["-c", "while :; do sleep 1; done"]),
)
.await?;
async fn get_node_available_args(
&self,
(command, _image): (String, Option<String>),
) -> Result<String, ProviderError> {
let temp_node = self
.spawn_node(
&SpawnNodeOptions::new(format!("temp-{}", Uuid::new_v4()), "bash".to_string())
.args(vec!["-c", "while :; do sleep 1; done"]),
)
.await?;
let available_args_output = temp_node
.run_command(RunCommandOptions::new(command.clone()).args(vec!["--help"]))
.await?
.map_err(|(_exit, status)| {
ProviderError::NodeAvailableArgsError("".to_string(), command, status)
})?;
let available_args_output = temp_node
.run_command(RunCommandOptions::new(command.clone()).args(vec!["--help"]))
.await?
.map_err(|(_exit, status)| {
ProviderError::NodeAvailableArgsError("".to_string(), command, status)
})?;
temp_node.destroy().await?;
temp_node.destroy().await?;
Ok(available_args_output)
}
Ok(available_args_output)
}
async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> {
trace!("spawn node options {options:?}");
async fn spawn_node(&self, options: &SpawnNodeOptions) -> Result<DynNode, ProviderError> {
trace!("spawn node options {options:?}");
let node = NativeNode::new(NativeNodeOptions {
namespace: &self.weak,
namespace_base_dir: &self.base_dir,
name: &options.name,
program: &options.program,
args: &options.args,
env: &options.env,
startup_files: &options.injected_files,
created_paths: &options.created_paths,
db_snapshot: options.db_snapshot.as_ref(),
filesystem: &self.filesystem,
node_log_path: options.node_log_path.as_ref(),
})
.await?;
let node = NativeNode::new(NativeNodeOptions {
namespace: &self.weak,
namespace_base_dir: &self.base_dir,
name: &options.name,
program: &options.program,
args: &options.args,
env: &options.env,
startup_files: &options.injected_files,
created_paths: &options.created_paths,
db_snapshot: options.db_snapshot.as_ref(),
filesystem: &self.filesystem,
node_log_path: options.node_log_path.as_ref(),
})
.await?;
self.nodes
.write()
.await
.insert(options.name.clone(), node.clone());
self.nodes.write().await.insert(options.name.clone(), node.clone());
Ok(node)
}
Ok(node)
}
async fn spawn_node_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNode, ProviderError> {
let deserializable: DeserializableNativeNodeOptions =
serde_json::from_value(json_value.clone())?;
let options = NativeNodeOptions::from_deserializable(
&deserializable,
&self.weak,
&self.base_dir,
&self.filesystem,
);
async fn spawn_node_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNode, ProviderError> {
let deserializable: DeserializableNativeNodeOptions =
serde_json::from_value(json_value.clone())?;
let options = NativeNodeOptions::from_deserializable(
&deserializable,
&self.weak,
&self.base_dir,
&self.filesystem,
);
let pid = json_value
.get("process_handle")
.and_then(|v| v.as_i64())
.ok_or_else(|| ProviderError::InvalidConfig("Missing pid field".to_string()))?
as i32;
let node = NativeNode::attach_to_live(options, pid).await?;
let pid = json_value
.get("process_handle")
.and_then(|v| v.as_i64())
.ok_or_else(|| ProviderError::InvalidConfig("Missing pid field".to_string()))?
as i32;
let node = NativeNode::attach_to_live(options, pid).await?;
self.nodes
.write()
.await
.insert(node.name().to_string(), node.clone());
self.nodes.write().await.insert(node.name().to_string(), node.clone());
Ok(node)
}
Ok(node)
}
async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
let node_name = if let Some(name) = options.temp_name {
name
} else {
format!("temp-{}", Uuid::new_v4())
};
async fn generate_files(&self, options: GenerateFilesOptions) -> Result<(), ProviderError> {
let node_name = if let Some(name) = options.temp_name {
name
} else {
format!("temp-{}", Uuid::new_v4())
};
// we spawn a node doing nothing but looping so we can execute our commands
let temp_node = self
.spawn_node(
&SpawnNodeOptions::new(node_name, "bash".to_string())
.args(vec!["-c", "while :; do sleep 1; done"])
.injected_files(options.injected_files),
)
.await?;
// we spawn a node doing nothing but looping so we can execute our commands
let temp_node = self
.spawn_node(
&SpawnNodeOptions::new(node_name, "bash".to_string())
.args(vec!["-c", "while :; do sleep 1; done"])
.injected_files(options.injected_files),
)
.await?;
for GenerateFileCommand {
program,
args,
env,
local_output_path,
} in options.commands
{
trace!(
"🏗 building file {:?} in path {} with command {} {}",
local_output_path.as_os_str(),
self.base_dir.to_string_lossy(),
program,
args.join(" ")
);
let local_output_full_path = format!(
"{}{}{}",
self.base_dir.to_string_lossy(),
if local_output_path.starts_with("/") {
""
} else {
"/"
},
local_output_path.to_string_lossy()
);
for GenerateFileCommand { program, args, env, local_output_path } in options.commands {
trace!(
"🏗 building file {:?} in path {} with command {} {}",
local_output_path.as_os_str(),
self.base_dir.to_string_lossy(),
program,
args.join(" ")
);
let local_output_full_path = format!(
"{}{}{}",
self.base_dir.to_string_lossy(),
if local_output_path.starts_with("/") { "" } else { "/" },
local_output_path.to_string_lossy()
);
let contents = extract_execution_result(
&temp_node,
RunCommandOptions { program, args, env },
options.expected_path.as_ref(),
)
.await?;
self.filesystem
.write(local_output_full_path, contents)
.await
.map_err(|err| ProviderError::FileGenerationFailed(err.into()))?;
}
let contents = extract_execution_result(
&temp_node,
RunCommandOptions { program, args, env },
options.expected_path.as_ref(),
)
.await?;
self.filesystem
.write(local_output_full_path, contents)
.await
.map_err(|err| ProviderError::FileGenerationFailed(err.into()))?;
}
temp_node.destroy().await
}
temp_node.destroy().await
}
async fn static_setup(&self) -> Result<(), ProviderError> {
// no static setup exists for native provider
todo!()
}
async fn static_setup(&self) -> Result<(), ProviderError> {
// no static setup exists for native provider
todo!()
}
async fn destroy(&self) -> Result<(), ProviderError> {
let mut names = vec![];
async fn destroy(&self) -> Result<(), ProviderError> {
let mut names = vec![];
for node in self.nodes.read().await.values() {
node.abort()
.await
.map_err(|err| ProviderError::DestroyNodeFailed(node.name().to_string(), err))?;
names.push(node.name().to_string());
}
for node in self.nodes.read().await.values() {
node.abort()
.await
.map_err(|err| ProviderError::DestroyNodeFailed(node.name().to_string(), err))?;
names.push(node.name().to_string());
}
let mut nodes = self.nodes.write().await;
for name in names {
nodes.remove(&name);
}
let mut nodes = self.nodes.write().await;
for name in names {
nodes.remove(&name);
}
if let Some(provider) = self.provider.upgrade() {
provider.namespaces.write().await.remove(&self.name);
}
if let Some(provider) = self.provider.upgrade() {
provider.namespaces.write().await.remove(&self.name);
}
Ok(())
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use support::fs::local::LocalFileSystem;
use support::fs::local::LocalFileSystem;
use super::*;
use crate::{
types::{GenerateFileCommand, GenerateFilesOptions},
NativeProvider, Provider,
};
use super::*;
use crate::{
types::{GenerateFileCommand, GenerateFilesOptions},
NativeProvider, Provider,
};
fn unique_temp_dir() -> PathBuf {
let mut base = std::env::temp_dir();
base.push(format!("znet_native_ns_test_{}", uuid::Uuid::new_v4()));
base
}
fn unique_temp_dir() -> PathBuf {
let mut base = std::env::temp_dir();
base.push(format!("znet_native_ns_test_{}", uuid::Uuid::new_v4()));
base
}
#[tokio::test]
async fn generate_files_uses_expected_path_when_provided() {
let fs = LocalFileSystem;
let provider = NativeProvider::new(fs.clone());
let base_dir = unique_temp_dir();
// Namespace builder will create directory if needed
let ns = provider
.create_namespace_with_base_dir(&base_dir)
.await
.expect("namespace should be created");
#[tokio::test]
async fn generate_files_uses_expected_path_when_provided() {
let fs = LocalFileSystem;
let provider = NativeProvider::new(fs.clone());
let base_dir = unique_temp_dir();
// Namespace builder will create directory if needed
let ns = provider
.create_namespace_with_base_dir(&base_dir)
.await
.expect("namespace should be created");
// Create a unique on-host path that the native node will write to
let expected_path =
std::env::temp_dir().join(format!("znet_expected_{}.json", uuid::Uuid::new_v4()));
// Create a unique on-host path that the native node will write to
let expected_path =
std::env::temp_dir().join(format!("znet_expected_{}.json", uuid::Uuid::new_v4()));
// Command will write JSON into expected_path; stdout will be something else to ensure we don't read it
let program = "bash".to_string();
let script = format!(
"echo -n '{{\"hello\":\"world\"}}' > {} && echo should_not_be_used",
expected_path.to_string_lossy()
);
let args: Vec<String> = vec!["-lc".into(), script];
// Command will write JSON into expected_path; stdout will be something else to ensure we don't read it
let program = "bash".to_string();
let script = format!(
"echo -n '{{\"hello\":\"world\"}}' > {} && echo should_not_be_used",
expected_path.to_string_lossy()
);
let args: Vec<String> = vec!["-lc".into(), script];
let out_name = PathBuf::from("result_expected.json");
let cmd = GenerateFileCommand::new(program, out_name.clone()).args(args);
let options = GenerateFilesOptions::new(vec![cmd], None, Some(expected_path.clone()));
let out_name = PathBuf::from("result_expected.json");
let cmd = GenerateFileCommand::new(program, out_name.clone()).args(args);
let options = GenerateFilesOptions::new(vec![cmd], None, Some(expected_path.clone()));
ns.generate_files(options)
.await
.expect("generation should succeed");
ns.generate_files(options).await.expect("generation should succeed");
// Read produced file from namespace base_dir
let produced_path = base_dir.join(out_name);
let produced = fs
.read_to_string(&produced_path)
.await
.expect("should read produced file");
assert_eq!(produced, "{\"hello\":\"world\"}");
}
// Read produced file from namespace base_dir
let produced_path = base_dir.join(out_name);
let produced = fs.read_to_string(&produced_path).await.expect("should read produced file");
assert_eq!(produced, "{\"hello\":\"world\"}");
}
#[tokio::test]
async fn generate_files_uses_stdout_when_expected_path_absent() {
let fs = LocalFileSystem;
let provider = NativeProvider::new(fs.clone());
let base_dir = unique_temp_dir();
let ns = provider
.create_namespace_with_base_dir(&base_dir)
.await
.expect("namespace should be created");
#[tokio::test]
async fn generate_files_uses_stdout_when_expected_path_absent() {
let fs = LocalFileSystem;
let provider = NativeProvider::new(fs.clone());
let base_dir = unique_temp_dir();
let ns = provider
.create_namespace_with_base_dir(&base_dir)
.await
.expect("namespace should be created");
// Command prints to stdout only
let program = "bash".to_string();
let args: Vec<String> = vec!["-lc".into(), "echo -n 42".into()];
// Command prints to stdout only
let program = "bash".to_string();
let args: Vec<String> = vec!["-lc".into(), "echo -n 42".into()];
let out_name = PathBuf::from("result_stdout.txt");
let cmd = GenerateFileCommand::new(program, out_name.clone()).args(args);
let options = GenerateFilesOptions::new(vec![cmd], None, None);
let out_name = PathBuf::from("result_stdout.txt");
let cmd = GenerateFileCommand::new(program, out_name.clone()).args(args);
let options = GenerateFilesOptions::new(vec![cmd], None, None);
ns.generate_files(options)
.await
.expect("generation should succeed");
ns.generate_files(options).await.expect("generation should succeed");
let produced_path = base_dir.join(out_name);
let produced = fs
.read_to_string(&produced_path)
.await
.expect("should read produced file");
assert_eq!(produced, "42");
}
let produced_path = base_dir.join(out_name);
let produced = fs.read_to_string(&produced_path).await.expect("should read produced file");
assert_eq!(produced, "42");
}
}
File diff suppressed because it is too large Load Diff
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Weak},
collections::HashMap,
path::{Path, PathBuf},
sync::{Arc, Weak},
};
use async_trait::async_trait;
@@ -10,133 +10,124 @@ use tokio::sync::RwLock;
use super::namespace::NativeNamespace;
use crate::{
shared::helpers::extract_namespace_info, types::ProviderCapabilities, DynNamespace, Provider,
ProviderError, ProviderNamespace,
shared::helpers::extract_namespace_info, types::ProviderCapabilities, DynNamespace, Provider,
ProviderError, ProviderNamespace,
};
pub const PROVIDER_NAME: &str = "native";
pub struct NativeProvider<FS>
where
FS: FileSystem + Send + Sync + Clone,
FS: FileSystem + Send + Sync + Clone,
{
weak: Weak<NativeProvider<FS>>,
capabilities: ProviderCapabilities,
tmp_dir: PathBuf,
filesystem: FS,
pub(super) namespaces: RwLock<HashMap<String, Arc<NativeNamespace<FS>>>>,
weak: Weak<NativeProvider<FS>>,
capabilities: ProviderCapabilities,
tmp_dir: PathBuf,
filesystem: FS,
pub(super) namespaces: RwLock<HashMap<String, Arc<NativeNamespace<FS>>>>,
}
impl<FS> NativeProvider<FS>
where
FS: FileSystem + Send + Sync + Clone,
FS: FileSystem + Send + Sync + Clone,
{
pub fn new(filesystem: FS) -> Arc<Self> {
Arc::new_cyclic(|weak| NativeProvider {
weak: weak.clone(),
capabilities: ProviderCapabilities {
has_resources: false,
requires_image: false,
prefix_with_full_path: true,
use_default_ports_in_cmd: false,
},
// NOTE: temp_dir in linux return `/tmp` but on mac something like
// `/var/folders/rz/1cyx7hfj31qgb98d8_cg7jwh0000gn/T/`, having
// one `trailing slash` and the other no can cause issues if
// you try to build a fullpath by concatenate. Use Pathbuf to prevent the issue.
tmp_dir: std::env::temp_dir(),
filesystem,
namespaces: RwLock::new(HashMap::new()),
})
}
pub fn new(filesystem: FS) -> Arc<Self> {
Arc::new_cyclic(|weak| NativeProvider {
weak: weak.clone(),
capabilities: ProviderCapabilities {
has_resources: false,
requires_image: false,
prefix_with_full_path: true,
use_default_ports_in_cmd: false,
},
// NOTE: temp_dir in linux return `/tmp` but on mac something like
// `/var/folders/rz/1cyx7hfj31qgb98d8_cg7jwh0000gn/T/`, having
// one `trailing slash` and the other no can cause issues if
// you try to build a fullpath by concatenate. Use Pathbuf to prevent the issue.
tmp_dir: std::env::temp_dir(),
filesystem,
namespaces: RwLock::new(HashMap::new()),
})
}
pub fn tmp_dir(mut self, tmp_dir: impl Into<PathBuf>) -> Self {
self.tmp_dir = tmp_dir.into();
self
}
pub fn tmp_dir(mut self, tmp_dir: impl Into<PathBuf>) -> Self {
self.tmp_dir = tmp_dir.into();
self
}
}
#[async_trait]
impl<FS> Provider for NativeProvider<FS>
where
FS: FileSystem + Send + Sync + Clone + 'static,
FS: FileSystem + Send + Sync + Clone + 'static,
{
fn name(&self) -> &str {
PROVIDER_NAME
}
fn name(&self) -> &str {
PROVIDER_NAME
}
fn capabilities(&self) -> &ProviderCapabilities {
&self.capabilities
}
fn capabilities(&self) -> &ProviderCapabilities {
&self.capabilities
}
async fn namespaces(&self) -> HashMap<String, DynNamespace> {
self.namespaces
.read()
.await
.iter()
.map(|(name, namespace)| (name.clone(), namespace.clone() as DynNamespace))
.collect()
}
async fn namespaces(&self) -> HashMap<String, DynNamespace> {
self.namespaces
.read()
.await
.iter()
.map(|(name, namespace)| (name.clone(), namespace.clone() as DynNamespace))
.collect()
}
async fn create_namespace(&self) -> Result<DynNamespace, ProviderError> {
let namespace = NativeNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.filesystem,
None,
)
.await?;
async fn create_namespace(&self) -> Result<DynNamespace, ProviderError> {
let namespace = NativeNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.filesystem,
None,
)
.await?;
self.namespaces
.write()
.await
.insert(namespace.name().to_string(), namespace.clone());
self.namespaces.write().await.insert(namespace.name().to_string(), namespace.clone());
Ok(namespace)
}
Ok(namespace)
}
async fn create_namespace_with_base_dir(
&self,
base_dir: &Path,
) -> Result<DynNamespace, ProviderError> {
let namespace = NativeNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.filesystem,
Some(base_dir),
)
.await?;
async fn create_namespace_with_base_dir(
&self,
base_dir: &Path,
) -> Result<DynNamespace, ProviderError> {
let namespace = NativeNamespace::new(
&self.weak,
&self.tmp_dir,
&self.capabilities,
&self.filesystem,
Some(base_dir),
)
.await?;
self.namespaces
.write()
.await
.insert(namespace.name().to_string(), namespace.clone());
self.namespaces.write().await.insert(namespace.name().to_string(), namespace.clone());
Ok(namespace)
}
Ok(namespace)
}
async fn create_namespace_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNamespace, ProviderError> {
let (base_dir, name) = extract_namespace_info(json_value)?;
async fn create_namespace_from_json(
&self,
json_value: &serde_json::Value,
) -> Result<DynNamespace, ProviderError> {
let (base_dir, name) = extract_namespace_info(json_value)?;
let namespace = NativeNamespace::attach_to_live(
&self.weak,
&self.capabilities,
&self.filesystem,
&base_dir,
&name,
)
.await?;
let namespace = NativeNamespace::attach_to_live(
&self.weak,
&self.capabilities,
&self.filesystem,
&base_dir,
&name,
)
.await?;
self.namespaces
.write()
.await
.insert(namespace.name().to_string(), namespace.clone());
self.namespaces.write().await.insert(namespace.name().to_string(), namespace.clone());
Ok(namespace)
}
Ok(namespace)
}
}
@@ -6,74 +6,69 @@ use crate::{types::RunCommandOptions, DynNode, ProviderError};
/// Check if we are running in `CI` by checking the 'RUN_IN_CI' env var
pub fn running_in_ci() -> bool {
env::var("RUN_IN_CI").unwrap_or_default() == "1"
env::var("RUN_IN_CI").unwrap_or_default() == "1"
}
/// Executes a command on a temporary node and extracts the execution result either from the
/// standard output or a file.
pub async fn extract_execution_result(
temp_node: &DynNode,
options: RunCommandOptions,
expected_path: Option<&PathBuf>,
temp_node: &DynNode,
options: RunCommandOptions,
expected_path: Option<&PathBuf>,
) -> Result<String, ProviderError> {
let output_contents = temp_node
.run_command(options)
.await?
.map_err(|(_, msg)| ProviderError::FileGenerationFailed(anyhow!("{msg}")))?;
let output_contents = temp_node
.run_command(options)
.await?
.map_err(|(_, msg)| ProviderError::FileGenerationFailed(anyhow!("{msg}")))?;
// If an expected_path is provided, read the file contents from inside the container
if let Some(expected_path) = expected_path.as_ref() {
Ok(temp_node
.run_command(
RunCommandOptions::new("cat")
.args(vec![expected_path.to_string_lossy().to_string()]),
)
.await?
.map_err(|(_, msg)| {
ProviderError::FileGenerationFailed(anyhow!(format!(
"failed reading expected_path {}: {}",
expected_path.display(),
msg
)))
})?)
} else {
Ok(output_contents)
}
// If an expected_path is provided, read the file contents from inside the container
if let Some(expected_path) = expected_path.as_ref() {
Ok(temp_node
.run_command(
RunCommandOptions::new("cat")
.args(vec![expected_path.to_string_lossy().to_string()]),
)
.await?
.map_err(|(_, msg)| {
ProviderError::FileGenerationFailed(anyhow!(format!(
"failed reading expected_path {}: {}",
expected_path.display(),
msg
)))
})?)
} else {
Ok(output_contents)
}
}
pub fn extract_namespace_info(
json_value: &serde_json::Value,
json_value: &serde_json::Value,
) -> Result<(PathBuf, String), ProviderError> {
let base_dir = json_value
.get("local_base_dir")
.and_then(|v| v.as_str())
.map(PathBuf::from)
.ok_or(ProviderError::InvalidConfig(
"`field local_base_dir` is missing from zombie.json".to_string(),
))?;
let base_dir =
json_value.get("local_base_dir").and_then(|v| v.as_str()).map(PathBuf::from).ok_or(
ProviderError::InvalidConfig(
"`field local_base_dir` is missing from zombie.json".to_string(),
),
)?;
let name =
json_value
.get("ns")
.and_then(|v| v.as_str())
.ok_or(ProviderError::InvalidConfig(
"field `ns` is missing from zombie.json".to_string(),
))?;
let name = json_value.get("ns").and_then(|v| v.as_str()).ok_or(
ProviderError::InvalidConfig("field `ns` is missing from zombie.json".to_string()),
)?;
Ok((base_dir, name.to_string()))
Ok((base_dir, name.to_string()))
}
#[cfg(test)]
mod tests {
use super::*;
use super::*;
#[test]
fn check_runing_in_ci_env_var() {
assert!(!running_in_ci());
// now set the env var
env::set_var("RUN_IN_CI", "1");
assert!(running_in_ci());
// reset
env::set_var("RUN_IN_CI", "");
}
#[test]
fn check_runing_in_ci_env_var() {
assert!(!running_in_ci());
// now set the env var
env::set_var("RUN_IN_CI", "1");
assert!(running_in_ci());
// reset
env::set_var("RUN_IN_CI", "");
}
}
+286 -297
View File
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
path::{Path, PathBuf},
process::ExitStatus,
collections::HashMap,
path::{Path, PathBuf},
process::ExitStatus,
};
use configuration::{shared::resources::Resources, types::AssetLocation};
@@ -13,363 +13,352 @@ pub type ExecutionResult = Result<String, (ExitStatus, String)>;
#[derive(Debug, Clone, PartialEq)]
pub struct ProviderCapabilities {
// default ports internal
/// Ensure that we have an image for each node (k8s/podman/docker)
pub requires_image: bool,
/// Allow to customize the resources through manifest (k8s).
pub has_resources: bool,
/// Used in native to prefix filepath with fullpath
pub prefix_with_full_path: bool,
/// Use default ports in node cmd/args.
/// NOTE: generally used in k8s/dockers since the images expose those ports.
pub use_default_ports_in_cmd: bool,
// default ports internal
/// Ensure that we have an image for each node (k8s/podman/docker)
pub requires_image: bool,
/// Allow to customize the resources through manifest (k8s).
pub has_resources: bool,
/// Used in native to prefix filepath with fullpath
pub prefix_with_full_path: bool,
/// Use default ports in node cmd/args.
/// NOTE: generally used in k8s/dockers since the images expose those ports.
pub use_default_ports_in_cmd: bool,
}
#[derive(Debug, Clone)]
pub struct SpawnNodeOptions {
/// Name of the node
pub name: String,
/// Image of the node (IFF is supported by the provider)
pub image: Option<String>,
/// Resources to apply to the node (IFF is supported by the provider)
pub resources: Option<Resources>,
/// Main command to execute
pub program: String,
/// Arguments to pass to the main command
pub args: Vec<String>,
/// Environment to set when running the `program`
pub env: Vec<(String, String)>,
// TODO: rename startup_files
/// Files to inject at startup
pub injected_files: Vec<TransferedFile>,
/// Paths to create before start the node (e.g keystore)
/// should be created with `create_dir_all` in order
/// to create the full path even when we have missing parts
pub created_paths: Vec<PathBuf>,
/// Database snapshot to be injected (should be a tgz file)
/// Could be a local or remote asset
pub db_snapshot: Option<AssetLocation>,
pub port_mapping: Option<HashMap<Port, Port>>,
/// Optionally specify a log path for the node
pub node_log_path: Option<PathBuf>,
/// Name of the node
pub name: String,
/// Image of the node (IFF is supported by the provider)
pub image: Option<String>,
/// Resources to apply to the node (IFF is supported by the provider)
pub resources: Option<Resources>,
/// Main command to execute
pub program: String,
/// Arguments to pass to the main command
pub args: Vec<String>,
/// Environment to set when running the `program`
pub env: Vec<(String, String)>,
// TODO: rename startup_files
/// Files to inject at startup
pub injected_files: Vec<TransferedFile>,
/// Paths to create before start the node (e.g keystore)
/// should be created with `create_dir_all` in order
/// to create the full path even when we have missing parts
pub created_paths: Vec<PathBuf>,
/// Database snapshot to be injected (should be a tgz file)
/// Could be a local or remote asset
pub db_snapshot: Option<AssetLocation>,
pub port_mapping: Option<HashMap<Port, Port>>,
/// Optionally specify a log path for the node
pub node_log_path: Option<PathBuf>,
}
impl SpawnNodeOptions {
pub fn new<S>(name: S, program: S) -> Self
where
S: AsRef<str>,
{
Self {
name: name.as_ref().to_string(),
image: None,
resources: None,
program: program.as_ref().to_string(),
args: vec![],
env: vec![],
injected_files: vec![],
created_paths: vec![],
db_snapshot: None,
port_mapping: None,
node_log_path: None,
}
}
pub fn new<S>(name: S, program: S) -> Self
where
S: AsRef<str>,
{
Self {
name: name.as_ref().to_string(),
image: None,
resources: None,
program: program.as_ref().to_string(),
args: vec![],
env: vec![],
injected_files: vec![],
created_paths: vec![],
db_snapshot: None,
port_mapping: None,
node_log_path: None,
}
}
pub fn image<S>(mut self, image: S) -> Self
where
S: AsRef<str>,
{
self.image = Some(image.as_ref().to_string());
self
}
pub fn image<S>(mut self, image: S) -> Self
where
S: AsRef<str>,
{
self.image = Some(image.as_ref().to_string());
self
}
pub fn resources(mut self, resources: Resources) -> Self {
self.resources = Some(resources);
self
}
pub fn resources(mut self, resources: Resources) -> Self {
self.resources = Some(resources);
self
}
pub fn db_snapshot(mut self, db_snap: Option<AssetLocation>) -> Self {
self.db_snapshot = db_snap;
self
}
pub fn db_snapshot(mut self, db_snap: Option<AssetLocation>) -> Self {
self.db_snapshot = db_snap;
self
}
pub fn args<S, I>(mut self, args: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = S>,
{
self.args = args.into_iter().map(|s| s.as_ref().to_string()).collect();
self
}
pub fn args<S, I>(mut self, args: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = S>,
{
self.args = args.into_iter().map(|s| s.as_ref().to_string()).collect();
self
}
pub fn env<S, I>(mut self, env: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = (S, S)>,
{
self.env = env
.into_iter()
.map(|(name, value)| (name.as_ref().to_string(), value.as_ref().to_string()))
.collect();
self
}
pub fn env<S, I>(mut self, env: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = (S, S)>,
{
self.env = env
.into_iter()
.map(|(name, value)| (name.as_ref().to_string(), value.as_ref().to_string()))
.collect();
self
}
pub fn injected_files<I>(mut self, injected_files: I) -> Self
where
I: IntoIterator<Item = TransferedFile>,
{
self.injected_files = injected_files.into_iter().collect();
self
}
pub fn injected_files<I>(mut self, injected_files: I) -> Self
where
I: IntoIterator<Item = TransferedFile>,
{
self.injected_files = injected_files.into_iter().collect();
self
}
pub fn created_paths<P, I>(mut self, created_paths: I) -> Self
where
P: AsRef<Path>,
I: IntoIterator<Item = P>,
{
self.created_paths = created_paths
.into_iter()
.map(|path| path.as_ref().into())
.collect();
self
}
pub fn created_paths<P, I>(mut self, created_paths: I) -> Self
where
P: AsRef<Path>,
I: IntoIterator<Item = P>,
{
self.created_paths = created_paths.into_iter().map(|path| path.as_ref().into()).collect();
self
}
pub fn port_mapping(mut self, ports: HashMap<Port, Port>) -> Self {
self.port_mapping = Some(ports);
self
}
pub fn port_mapping(mut self, ports: HashMap<Port, Port>) -> Self {
self.port_mapping = Some(ports);
self
}
pub fn node_log_path(mut self, path: Option<PathBuf>) -> Self {
self.node_log_path = path;
self
}
pub fn node_log_path(mut self, path: Option<PathBuf>) -> Self {
self.node_log_path = path;
self
}
}
#[derive(Debug)]
pub struct GenerateFileCommand {
pub program: String,
pub args: Vec<String>,
pub env: Vec<(String, String)>,
pub local_output_path: PathBuf,
pub program: String,
pub args: Vec<String>,
pub env: Vec<(String, String)>,
pub local_output_path: PathBuf,
}
impl GenerateFileCommand {
pub fn new<S, P>(program: S, local_output_path: P) -> Self
where
S: AsRef<str>,
P: AsRef<Path>,
{
Self {
program: program.as_ref().to_string(),
args: vec![],
env: vec![],
local_output_path: local_output_path.as_ref().into(),
}
}
pub fn new<S, P>(program: S, local_output_path: P) -> Self
where
S: AsRef<str>,
P: AsRef<Path>,
{
Self {
program: program.as_ref().to_string(),
args: vec![],
env: vec![],
local_output_path: local_output_path.as_ref().into(),
}
}
pub fn args<S, I>(mut self, args: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = S>,
{
self.args = args.into_iter().map(|s| s.as_ref().to_string()).collect();
self
}
pub fn args<S, I>(mut self, args: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = S>,
{
self.args = args.into_iter().map(|s| s.as_ref().to_string()).collect();
self
}
pub fn env<S, I>(mut self, env: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = (S, S)>,
{
self.env = env
.into_iter()
.map(|(name, value)| (name.as_ref().to_string(), value.as_ref().to_string()))
.collect();
self
}
pub fn env<S, I>(mut self, env: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = (S, S)>,
{
self.env = env
.into_iter()
.map(|(name, value)| (name.as_ref().to_string(), value.as_ref().to_string()))
.collect();
self
}
}
#[derive(Debug)]
pub struct GenerateFilesOptions {
pub commands: Vec<GenerateFileCommand>,
pub image: Option<String>,
pub injected_files: Vec<TransferedFile>,
// Allow to control the name of the node used to create the files.
pub temp_name: Option<String>,
pub expected_path: Option<PathBuf>,
pub commands: Vec<GenerateFileCommand>,
pub image: Option<String>,
pub injected_files: Vec<TransferedFile>,
// Allow to control the name of the node used to create the files.
pub temp_name: Option<String>,
pub expected_path: Option<PathBuf>,
}
impl GenerateFilesOptions {
pub fn new<I>(commands: I, image: Option<String>, expected_path: Option<PathBuf>) -> Self
where
I: IntoIterator<Item = GenerateFileCommand>,
{
Self {
commands: commands.into_iter().collect(),
injected_files: vec![],
image,
temp_name: None,
expected_path,
}
}
pub fn new<I>(commands: I, image: Option<String>, expected_path: Option<PathBuf>) -> Self
where
I: IntoIterator<Item = GenerateFileCommand>,
{
Self {
commands: commands.into_iter().collect(),
injected_files: vec![],
image,
temp_name: None,
expected_path,
}
}
pub fn with_files<I>(
commands: I,
image: Option<String>,
injected_files: &[TransferedFile],
expected_path: Option<PathBuf>,
) -> Self
where
I: IntoIterator<Item = GenerateFileCommand>,
{
Self {
commands: commands.into_iter().collect(),
injected_files: injected_files.into(),
image,
temp_name: None,
expected_path,
}
}
pub fn with_files<I>(
commands: I,
image: Option<String>,
injected_files: &[TransferedFile],
expected_path: Option<PathBuf>,
) -> Self
where
I: IntoIterator<Item = GenerateFileCommand>,
{
Self {
commands: commands.into_iter().collect(),
injected_files: injected_files.into(),
image,
temp_name: None,
expected_path,
}
}
pub fn image<S>(mut self, image: S) -> Self
where
S: AsRef<str>,
{
self.image = Some(image.as_ref().to_string());
self
}
pub fn image<S>(mut self, image: S) -> Self
where
S: AsRef<str>,
{
self.image = Some(image.as_ref().to_string());
self
}
pub fn injected_files<I>(mut self, injected_files: I) -> Self
where
I: IntoIterator<Item = TransferedFile>,
{
self.injected_files = injected_files.into_iter().collect();
self
}
pub fn injected_files<I>(mut self, injected_files: I) -> Self
where
I: IntoIterator<Item = TransferedFile>,
{
self.injected_files = injected_files.into_iter().collect();
self
}
pub fn temp_name(mut self, name: impl Into<String>) -> Self {
self.temp_name = Some(name.into());
self
}
pub fn temp_name(mut self, name: impl Into<String>) -> Self {
self.temp_name = Some(name.into());
self
}
}
#[derive(Debug)]
pub struct RunCommandOptions {
pub program: String,
pub args: Vec<String>,
pub env: Vec<(String, String)>,
pub program: String,
pub args: Vec<String>,
pub env: Vec<(String, String)>,
}
impl RunCommandOptions {
pub fn new<S>(program: S) -> Self
where
S: AsRef<str>,
{
Self {
program: program.as_ref().to_string(),
args: vec![],
env: vec![],
}
}
pub fn new<S>(program: S) -> Self
where
S: AsRef<str>,
{
Self { program: program.as_ref().to_string(), args: vec![], env: vec![] }
}
pub fn args<S, I>(mut self, args: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = S>,
{
self.args = args.into_iter().map(|s| s.as_ref().to_string()).collect();
self
}
pub fn args<S, I>(mut self, args: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = S>,
{
self.args = args.into_iter().map(|s| s.as_ref().to_string()).collect();
self
}
pub fn env<S, I>(mut self, env: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = (S, S)>,
{
self.env = env
.into_iter()
.map(|(name, value)| (name.as_ref().to_string(), value.as_ref().to_string()))
.collect();
self
}
pub fn env<S, I>(mut self, env: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = (S, S)>,
{
self.env = env
.into_iter()
.map(|(name, value)| (name.as_ref().to_string(), value.as_ref().to_string()))
.collect();
self
}
}
pub struct RunScriptOptions {
pub local_script_path: PathBuf,
pub args: Vec<String>,
pub env: Vec<(String, String)>,
pub local_script_path: PathBuf,
pub args: Vec<String>,
pub env: Vec<(String, String)>,
}
impl RunScriptOptions {
pub fn new<P>(local_script_path: P) -> Self
where
P: AsRef<Path>,
{
Self {
local_script_path: local_script_path.as_ref().into(),
args: vec![],
env: vec![],
}
}
pub fn new<P>(local_script_path: P) -> Self
where
P: AsRef<Path>,
{
Self { local_script_path: local_script_path.as_ref().into(), args: vec![], env: vec![] }
}
pub fn args<S, I>(mut self, args: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = S>,
{
self.args = args.into_iter().map(|s| s.as_ref().to_string()).collect();
self
}
pub fn args<S, I>(mut self, args: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = S>,
{
self.args = args.into_iter().map(|s| s.as_ref().to_string()).collect();
self
}
pub fn env<S, I>(mut self, env: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = (S, S)>,
{
self.env = env
.into_iter()
.map(|(name, value)| (name.as_ref().to_string(), value.as_ref().to_string()))
.collect();
self
}
pub fn env<S, I>(mut self, env: I) -> Self
where
S: AsRef<str>,
I: IntoIterator<Item = (S, S)>,
{
self.env = env
.into_iter()
.map(|(name, value)| (name.as_ref().to_string(), value.as_ref().to_string()))
.collect();
self
}
}
// TODO(team): I think we can rename it to FileMap?
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransferedFile {
pub local_path: PathBuf,
pub remote_path: PathBuf,
// TODO: Can be narrowed to have strict typing on this?
pub mode: String,
pub local_path: PathBuf,
pub remote_path: PathBuf,
// TODO: Can be narrowed to have strict typing on this?
pub mode: String,
}
impl TransferedFile {
pub fn new<P>(local_path: P, remote_path: P) -> Self
where
P: AsRef<Path>,
{
Self {
local_path: local_path.as_ref().into(),
remote_path: remote_path.as_ref().into(),
mode: "0644".to_string(), // default to rw-r--r--
}
}
pub fn new<P>(local_path: P, remote_path: P) -> Self
where
P: AsRef<Path>,
{
Self {
local_path: local_path.as_ref().into(),
remote_path: remote_path.as_ref().into(),
mode: "0644".to_string(), // default to rw-r--r--
}
}
pub fn mode<S>(mut self, mode: S) -> Self
where
S: AsRef<str>,
{
self.mode = mode.as_ref().to_string();
self
}
pub fn mode<S>(mut self, mode: S) -> Self
where
S: AsRef<str>,
{
self.mode = mode.as_ref().to_string();
self
}
}
impl std::fmt::Display for TransferedFile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"File to transfer (local: {}, remote: {})",
self.local_path.display(),
self.remote_path.display()
)
}
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"File to transfer (local: {}, remote: {})",
self.local_path.display(),
self.remote_path.display()
)
}
}