difftreelog
refactor remove shell-outs for ssh
in: trunk
15 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
[[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
"anyhow",
"async-trait",
"base64 0.21.5",
+ "better-command",
"chrono",
"clap",
"futures",
@@ -1510,9 +1523,9 @@
[[package]]
name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
@@ -1922,6 +1935,10 @@
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
name = "rnix"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
[[package]]
name = "serde"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
@@ -2123,9 +2140,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -2134,9 +2151,9 @@
[[package]]
name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
dependencies = [
"itoa",
"ryu",
@@ -2527,11 +2544,10 @@
[[package]]
name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
- "cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -2539,9 +2555,9 @@
[[package]]
name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
@@ -2550,9 +2566,9 @@
[[package]]
name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [
"once_cell",
"valuable",
@@ -2560,9 +2576,9 @@
[[package]]
name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
dependencies = [
"indicatif",
"tracing",
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
[workspace]
members = ["crates/*", "cmds/*"]
resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
edition = "2021"
[dependencies]
+nixlike.workspace = true
+better-command.workspace = true
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -15,7 +17,6 @@
hostname = "0.3.1"
age-core = "0.9.0"
peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
age = { version = "0.9.2", features = ["ssh", "armor"] }
base64 = "0.21.5"
chrono = { version = "0.4.31", features = ["serde"] }
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
use std::sync::{Arc, OnceLock};
use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
use futures::StreamExt;
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::codec::{FramedRead, LinesCodec};
use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
@@ -30,6 +32,8 @@
string_wrapping: (String, String),
number_wrapping: (String, String),
+ executing_command: Arc<Mutex<()>>,
+
next_id: u32,
free_list: Vec<u32>,
}
@@ -219,6 +223,8 @@
string_wrapping: Default::default(),
number_wrapping: Default::default(),
+ executing_command: Arc::new(Mutex::new(())),
+
next_id: 0,
free_list: vec![],
};
@@ -331,6 +337,10 @@
expr: impl AsRef<[u8]>,
err_handler: &mut dyn Handler,
) -> Result<String> {
+ // Prevent two commands from being executed in parallel, messing with each other.
+ let _lock = self.executing_command.clone();
+ let _guard = _lock.lock().await;
+
self.send_command(expr).await?;
// It will be echoed
self.send_command(REPL_DELIMITER).await?;
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
use std::{env::current_dir, time::Duration};
use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
use crate::nix_go;
use anyhow::{anyhow, Result};
use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -112,12 +112,12 @@
current: bool,
datetime: String,
}
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
- let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.arg("--list-generations");
// Sudo is required due to --list-generations acquiring lock on the profile.
- let data = config.run_string_on(host, cmd, true).await?;
+ let data = cmd.sudo().run_string().await?;
let generations = data
.split('\n')
.map(|e| e.trim())
@@ -161,25 +161,12 @@
.map_err(|_e| anyhow!("bad list-generations output"))?
.ok_or_else(|| anyhow!("failed to find generation"))?;
Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("stop").arg(unit);
- config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
- let mut cmd = MyCommand::new("systemctl");
- cmd.arg("start").arg(unit);
- config.run_on(host, cmd, true).await
}
async fn execute_upload(
build: &BuildSystems,
- config: &Config,
action: UploadAction,
- host: &str,
+ host: &ConfigHost,
built: PathBuf,
) -> Result<()> {
let mut failed = false;
@@ -191,15 +178,15 @@
if !build.disable_rollback {
let _span = info_span!("preparing").entered();
info!("preparing for rollback");
- let generation = get_current_generation(config, host).await?;
+ let generation = get_current_generation(&host).await?;
info!(
"rollback target would be {} {}",
generation.id, generation.datetime
);
{
- let mut cmd = MyCommand::new("sh");
+ let mut cmd = host.cmd("sh").await?;
cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to set rollback marker: {e}");
failed = true;
}
@@ -215,37 +202,41 @@
// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
// Anyway, reboot will still help in this case.
if action.should_schedule_rollback_run() {
- let mut cmd = MyCommand::new("systemd-run");
+ let mut cmd = host.cmd("systemd-run").await?;
cmd.comparg("--on-active", "3min")
.comparg("--unit", "rollback-watchdog-run")
.arg("systemctl")
.arg("start")
.arg("rollback-watchdog.service");
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to schedule rollback run: {e}");
failed = true;
}
}
}
+
if action.should_switch_profile() && !failed {
info!("switching generation");
- let mut cmd = MyCommand::new("nix-env");
+ let mut cmd = host.cmd("nix-env").await?;
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- if let Err(e) = config.run_on(host, cmd, true).await {
+ if let Err(e) = cmd.sudo().run().await {
error!("failed to switch generation: {e}");
failed = true;
}
}
+
+ // FIXME: Connection might be disconnected after activation run
+
if action.should_activate() && !failed {
let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
- let mut cmd = MyCommand::new(switch_script);
+ let mut cmd = host.cmd(switch_script).await?;
cmd.arg(action.name());
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = cmd.sudo().run().in_current_span().await {
error!("failed to activate: {e}");
failed = true;
}
@@ -253,7 +244,8 @@
if !build.disable_rollback {
if failed {
info!("executing rollback");
- if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+ if let Err(e) = host
+ .systemctl_start("rollback-watchdog.service")
.instrument(info_span!("rollback"))
.await
{
@@ -261,27 +253,29 @@
}
} else {
info!("trying to mark upgrade as successful");
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+ if let Err(e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
}
}
info!("disarming watchdog, just in case");
- if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+ if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
// It is ok, if there was no reboot - then timer might not be running.
}
if action.should_schedule_rollback_run() {
- if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+ if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
error!("failed to disarm rollback run: {e}");
}
}
- } else {
- let mut cmd = MyCommand::new("rm");
- cmd.arg("-f").arg("/etc/fleet_rollback_marker");
- if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
- // Marker might not exist, yet better try to remove it.
- }
+ } else if let Err(_e) = host
+ .rm_file("/etc/fleet_rollback_marker", true)
+ .in_current_span()
+ .await
+ {
+ // Marker might not exist, yet better try to remove it.
}
Ok(())
}
@@ -289,12 +283,13 @@
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
+ let host = config.host(&host).await?;
let action = Action::from(self.subcommand.clone());
let fleet_field = &config.fleet_field;
let drv = nix_go!(
fleet_field.buildSystems(Obj {
localSystem: { config.local_system.clone() }
- })[{ action.build_attr() }][{ host }]
+ })[{ action.build_attr() }][{ &host.name }]
);
let outputs = drv.build().await.map_err(|e| {
if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
match action {
Action::Upload { action } => {
- if !config.is_local(&host) {
+ if !config.is_local(&host.name) {
info!("uploading system closure");
{
+ // TODO: Move to remote_derivation method.
// Alternatively, nix store make-content-addressed can be used,
// at least for the first deployment, to provide trusted store key.
//
@@ -329,13 +325,11 @@
}
let mut tries = 0;
loop {
- let mut nix = MyCommand::new("nix");
- nix.arg("copy")
- .arg("--substitute-on-destination")
- .comparg("--to", format!("ssh-ng://{host}"))
- .arg(out_output);
- match nix.run_nix().await {
- Ok(()) => break,
+ match host.remote_derivation(out_output).await {
+ Ok(remote) => {
+ assert!(&remote == out_output, "CA derivations aren't implemented");
+ break;
+ }
Err(e) if tries < 3 => {
tries += 1;
warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
}
}
if let Some(action) = action {
- execute_upload(&self, &config, action, &host, out_output.clone()).await?
+ execute_upload(&self, action, &host, out_output.clone()).await?
}
}
Action::Package(PackageAction::SdImage) => {
let mut out = current_dir()?;
- out.push(format!("sd-image-{}", host));
+ out.push(format!("sd-image-{}", host.name));
info!("linking sd image to {:?}", out);
symlink(out_output, out)?;
}
Action::Package(PackageAction::InstallationCd) => {
let mut out = current_dir()?;
- out.push(format!("installation-cd-{}", host));
+ out.push(format!("installation-cd-{}", host.name));
info!("linking iso image to {:?}", out);
symlink(out_output, out)?;
@@ -379,6 +373,17 @@
let this = this.clone();
let span = info_span!("deployment", host = field::display(&host.name));
let hostname = host.name;
+ // FIXME: Since the introduction of better-nix-eval,
+ // due to single repl used for builds, hosts are waiting for each other to build,
+ // instead of building concurrently.
+ //
+ // Open multiple repls?
+ //
+ // Create build batcher, which will behave similar to golangs
+ // WaitGroup, and start executing once all the build tasks are scheduled?
+ // This also allows to cleanup build output, as there will be no longer
+ // "waiting for remote machine" messages in the cases when one package is needed for
+ // multiple hosts.
set.spawn_local(
(async move {
match this.build_task(config, hostname).await {
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use chrono::{DateTime, Utc};
use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use std::{
@@ -404,9 +404,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(data) = secret.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, data, target_recipients)
- .await?;
+ let host = config.host(&identity_holder).await?;
+ let encrypted = host.reencrypt(data, target_recipients).await?;
secret.secret.secret = Some(encrypted);
}
@@ -481,9 +480,8 @@
target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
if let Some(secret) = data.secret.secret {
- let encrypted = config
- .reencrypt_on_host(identity_holder, secret, target_recipients)
- .await?;
+ let host = config.host(identity_holder).await?;
+ let encrypted = host.reencrypt(secret, target_recipients).await?;
data.secret.secret = Some(encrypted);
}
cmds/fleet/src/command.rsdiffbeforeafterboth1use std::{2 collections::HashMap,3 ffi::OsStr,4 pin,5 process::Stdio,6 sync::{Arc, Mutex},7 task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, OwningCommand, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25 out.push_str(input);26 return;27 }28 out.push('\'');29 for (i, v) in input.split('\'').enumerate() {30 if i != 0 {31 out.push_str("'\"'\"'");32 }33 out.push_str(v);34 }35 out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38 os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42 command: String,43 args: Vec<String>,44 env: Vec<(String, String)>,45 ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48 pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {49 assert!(!cmd.as_ref().is_empty());50 Self {51 command: ostoutf8(cmd),52 args: vec![],53 env: vec![],54 ssh_session: Some(session),55 }56 }57 pub fn new(cmd: impl AsRef<OsStr>) -> Self {58 assert!(!cmd.as_ref().is_empty());59 Self {60 command: ostoutf8(cmd),61 args: vec![],62 env: vec![],63 ssh_session: None,64 }65 }66 fn into_args(self) -> Vec<String> {67 let mut out = Vec::new();68 if !self.env.is_empty() {69 out.push("env".to_owned());70 for (k, v) in self.env {71 assert!(!k.contains('='));72 out.push(format!("{k}={v}"));73 }74 }75 out.push(self.command);76 out.extend(self.args);77 out78 }7980 /// Translates environment variables into env command execution.81 /// Required for ssh, as ssh don't allow to send environment variables (at least by default).82 ///83 /// FIXME: Insecure, as arguments might be seen by other users on the same machine.84 /// Figure out some way to transfer environment using stdio?85 fn translate_env_into_env(self) -> Self {86 if self.env.is_empty() {87 return self;88 }89 let mut out = Self::new("env");90 if let Some(session) = self.ssh_session {91 out = out.ssh_session(session);92 }93 for (k, v) in self.env {94 assert!(!k.contains('='));95 out.arg(format!("{k}={v}"));96 }97 out.arg(self.command);98 out.args(self.args);99100 out101 }102 fn into_string(self) -> String {103 let mut out = String::new();104 if !self.env.is_empty() {105 out.push_str("env");106 for (k, v) in self.env {107 out.push(' ');108 assert!(!k.contains('='));109 escape_bash(&k, &mut out);110 out.push('=');111 escape_bash(&v, &mut out);112 }113 }114 if !out.is_empty() {115 out.push(' ');116 }117 escape_bash(&self.command, &mut out);118 for arg in self.args {119 out.push(' ');120 escape_bash(&arg, &mut out);121 }122 out123 }124 fn into_command(self) -> Command {125 let mut out = Command::new(self.command);126 out.args(self.args);127 for (k, v) in self.env {128 out.env(k, v);129 }130 out131 }132 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {133 Ok(if let Some(session) = self.ssh_session.clone() {134 let cmd = self.translate_env_into_env().into_command();135 Either::Right(136 cmd.over_ssh(session)137 .map_err(|e| anyhow!("ssh error: {e}"))?,138 )139 } else {140 let cmd = self.into_command();141 Either::Left(cmd)142 })143 }144 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {145 let arg = arg.as_ref();146 self.args.push(ostoutf8(arg));147 self148 }149 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {150 let arg = arg.as_ref();151 let value = value.as_ref();152 let arg = ostoutf8(arg);153 let value = ostoutf8(value);154 self.arg(format!("{arg}={value}"));155 self156 }157 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158 self.arg(arg);159 self.arg(value);160 self161 }162 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {163 self.env164 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));165 self166 }167 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {168 for arg in args.into_iter() {169 let arg = arg.as_ref();170 self.args.push(ostoutf8(arg));171 }172 self173 }174 pub fn sudo(mut self) -> Self {175 if std::env::var_os("NO_SUDO").is_some() {176 let mut out = Self::new("su");177 out.ssh_session = self.ssh_session.take();178 out.arg("-c").arg(self.into_string());179 out180 } else {181 let mut out = Self::new("sudo");182 out.args(self.into_args());183 out184 }185 }186 pub fn ssh_session(mut self, on: Arc<Session>) -> Self {187 self.ssh_session = Some(on);188 self189 }190 pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {191 let mut out = Self::new("ssh");192 out.ssh_session = self.ssh_session.take();193 out.arg(on).arg("--");194 out.arg(self.into_string());195 out196 }197198 pub async fn run(self) -> Result<()> {199 let str = self.clone().into_string();200 let cmd = self.into_command_new()?;201 match cmd {202 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,203 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,204 };205 Ok(())206 }207 pub async fn run_string(self) -> Result<String> {208 let bytes = self.run_bytes().await?;209 Ok(String::from_utf8(bytes)?)210 }211 pub async fn run_bytes(self) -> Result<Vec<u8>> {212 let str = self.clone().into_string();213 let cmd = self.into_command_new()?;214 let v = match cmd {215 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,216 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,217 };218 Ok(v)219 }220221 pub async fn run_nix_string(self) -> Result<String> {222 let str = self.clone().into_string();223 let mut cmd = self.into_command();224 cmd.arg("--log-format").arg("internal-json");225 let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;226 Ok(String::from_utf8(bytes)?)227 }228 pub async fn run_nix(self) -> Result<()> {229 let str = self.clone().into_string();230 let mut cmd = self.into_command();231 cmd.arg("--log-format").arg("internal-json");232 cmd.stdout(Stdio::inherit());233 run_nix_inner(str, cmd, &mut NixHandler::default()).await234 }235}236237struct EmptyAsyncRead;238impl AsyncRead for EmptyAsyncRead {239 fn poll_read(240 self: std::pin::Pin<&mut Self>,241 _cx: &mut std::task::Context<'_>,242 _buf: &mut tokio::io::ReadBuf<'_>,243 ) -> Poll<std::io::Result<()>> {244 Poll::Pending245 }246}247248async fn run_nix_inner_stdout(249 str: String,250 cmd: Command,251 handler: &mut dyn Handler,252) -> Result<Vec<u8>> {253 Ok(run_nix_inner_raw(str, cmd, true, handler, None)254 .await?255 .expect("has out"))256}257async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {258 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;259 assert!(v.is_none());260 Ok(())261}262async fn run_nix_inner_stdout_ssh(263 str: String,264 cmd: OwningCommand<Arc<Session>>,265 handler: &mut dyn Handler,266) -> Result<Vec<u8>> {267 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)268 .await?269 .expect("has out"))270}271async fn run_nix_inner_ssh(272 str: String,273 cmd: OwningCommand<Arc<Session>>,274 handler: &mut dyn Handler,275) -> Result<()> {276 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;277 assert!(v.is_none());278 Ok(())279}280281pub trait Handler: Send {282 fn handle_line(&mut self, e: &str);283}284285pub struct ClonableHandler<H>(Arc<Mutex<H>>);286impl<H> Clone for ClonableHandler<H> {287 fn clone(&self) -> Self {288 Self(self.0.clone())289 }290}291impl<H> ClonableHandler<H> {292 pub fn new(inner: H) -> Self {293 Self(Arc::new(Mutex::new(inner)))294 }295}296impl<H: Handler> Handler for ClonableHandler<H> {297 fn handle_line(&mut self, e: &str) {298 self.0.lock().unwrap().handle_line(e)299 }300}301302struct PlainHandler;303impl Handler for PlainHandler {304 fn handle_line(&mut self, e: &str) {305 info!(target: "log", "{e}");306 }307}308309pub struct NoopHandler;310impl Handler for NoopHandler {311 fn handle_line(&mut self, _e: &str) {}312}313314#[derive(Default)]315pub struct NixHandler {316 spans: HashMap<u64, Span>,317}318fn process_message(m: &str) -> String {319 static OSC_CLEANER: Lazy<Regex> =320 Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());321 static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());322 let m = OSC_CLEANER.replace_all(m, "");323 // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,324 // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.325 DETABBER.replace_all(m.as_ref(), " ").to_string()326}327impl Handler for NixHandler {328 fn handle_line(&mut self, e: &str) {329 if let Some(e) = e.strip_prefix("@nix ") {330 let log: NixLog = match serde_json::from_str(e) {331 Ok(l) => l,332 Err(err) => {333 warn!("failed to parse nix log line {:?}: {}", e, err);334 return;335 }336 };337 match log {338 NixLog::Msg { msg, raw_msg, .. } => {339 #[allow(clippy::nonminimal_bool)]340 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))341 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")342 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {343 if let Some(raw_msg) = raw_msg {344 if !msg.is_empty() {345 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())346 } else {347 info!(target: "nix", "{}", raw_msg.trim_end())348 }349 } else {350 info!(target: "nix", "{}", msg.trim_end())351 }352 }353 }354 NixLog::Start {355 ref fields,356 typ,357 id,358 ..359 } if typ == 105 && !fields.is_empty() => {360 if let [LogField::String(drv), ..] = &fields[..] {361 let mut drv = drv.as_str();362 if let Some(pkg) = drv.strip_prefix("/nix/store/") {363 let mut it = pkg.splitn(2, '-');364 it.next();365 if let Some(pkg) = it.next() {366 drv = pkg;367 }368 }369 info!(target: "nix","building {}", drv);370 let span = info_span!("build", drv);371 span.pb_start();372 self.spans.insert(id, span);373 } else {374 warn!("bad build log: {:?}", log)375 }376 }377 NixLog::Start {378 ref fields,379 typ,380 id,381 ..382 } if typ == 100 && fields.len() >= 3 => {383 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =384 &fields[..]385 {386 let mut drv = drv.as_str();387388 if let Some(pkg) = drv.strip_prefix("/nix/store/") {389 let mut it = pkg.splitn(2, '-');390 it.next();391 if let Some(pkg) = it.next() {392 drv = pkg;393 }394 }395 // info!(target: "nix","copying {} {} -> {}", drv, from, to);396 let span = info_span!("copy", from, to, drv);397 span.pb_start();398 self.spans.insert(id, span);399 } else {400 warn!("bad copy log: {:?}", log)401 }402 }403 NixLog::Start { text, typ, id, .. }404 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>405 {406 if !text.is_empty()407 && text != "querying info about missing paths"408 && text != "copying 0 paths"409 // Too much spam on lazy-trees branch410 && !(text.starts_with("copying '") && text.ends_with("' to the store"))411 {412 let span = info_span!("job");413 span.pb_start();414 span.pb_set_message(&process_message(text.trim()));415 self.spans.insert(id, span);416 info!(target: "nix", "{}", text);417 }418 }419 NixLog::Start {420 text,421 level: 0,422 typ: 108,423 ..424 } if text.is_empty() => {425 // Cache lookup? Coupled with copy log426 }427 NixLog::Start {428 text,429 level: 4,430 typ: 109,431 ..432 } if text.starts_with("querying info about ") => {433 // Cache lookup434 }435 NixLog::Start {436 text,437 level: 4,438 typ: 101,439 ..440 } if text.starts_with("downloading ") => {441 // NAR downloading, coupled with copy log442 }443 NixLog::Start {444 text,445 level: 1,446 typ: 111,447 ..448 } if text.starts_with("waiting for a machine to build ") => {449 // Useless repeating notification about build450 }451 NixLog::Start {452 text,453 level: 3,454 typ: 111,455 ..456 } if text.starts_with("resolved derivation: ") => {457 // CA resolved458 }459 NixLog::Start {460 text,461 level: 1,462 typ: 111,463 id,464 ..465 } if text.starts_with("waiting for lock on ") => {466 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();467 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {468 drv = txt;469 }470 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {471 drv = txt;472 }473 if let Some(txt) = drv.split("', '").next() {474 drv = txt;475 }476 if let Some(pkg) = drv.strip_prefix("/nix/store/") {477 let mut it = pkg.splitn(2, '-');478 it.next();479 if let Some(pkg) = it.next() {480 drv = pkg;481 }482 }483 let span = info_span!("waiting on drv", drv);484 span.pb_start();485 self.spans.insert(id, span);486 // Concurrent build of the same message487 }488 NixLog::Stop { id, .. } => {489 self.spans.remove(&id);490 }491 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {492 if let Some(span) = self.spans.get(&id) {493 if let LogField::String(s) = &fields[0] {494 span.pb_set_message(&process_message(s.trim()));495 } else {496 warn!("bad fields: {fields:?}");497 }498 } else {499 warn!("unknown result id: {id} {typ} {fields:?}");500 }501 // dbg!(fields, id, typ);502 }503 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {504 if let Some(span) = self.spans.get(&id) {505 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =506 &fields[..4]507 {508 span.pb_set_length(*expected);509 span.pb_set_position(*done);510 } else {511 warn!("bad fields: {fields:?}");512 }513 } else {514 // warn!("unknown result id: {id} {typ} {fields:?}");515 // Unaccounted progress.516 }517 // dbg!(fields, id, typ);518 }519 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {520 // Set phase, expected521 }522 _ => warn!("unknown log: {:?}", log),523 };524 } else {525 let e = e.trim();526 if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {527 return;528 }529 info!("{e}")530 }531 }532}533534async fn run_nix_inner_raw(535 str: String,536 mut cmd: Command,537 want_stdout: bool,538 err_handler: &mut dyn Handler,539 mut out_handler: Option<&mut dyn Handler>,540) -> Result<Option<Vec<u8>>> {541 cmd.stderr(Stdio::piped());542 cmd.stdout(Stdio::piped());543 let mut child = cmd.spawn()?;544 let mut stderr = child.stderr.take().unwrap();545 let stdout = child.stdout.take().unwrap();546 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());547 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));548 let mut ob = want_stdout549 .then(|| out.take().unwrap())550 .unwrap_or_else(|| Box::new(EmptyAsyncRead));551 let mut ol = (!want_stdout)552 .then(|| out.take().unwrap())553 .unwrap_or_else(|| Box::new(EmptyAsyncRead));554 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());555 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());556557 // while let Some(line) = read.next().await? {}558559 let mut out_buf = if want_stdout { Some(vec![]) } else { None };560 loop {561 select! {562 e = err.next() => {563 if let Some(e) = e {564 let e = e?;565 err_handler.handle_line(&e);566 }567 },568 o = ob.next() => {569 if let Some(o) = o {570 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);571 }572 },573 o = ol.next() => {574 if let Some(o) = o {575 let o = o?;576 if let Some(out) = out_handler.as_mut() {577 out.handle_line(&o)578 } else {579 err_handler.handle_line(&o)580 }581 // out_handler.handle_info(&o);582 }583 },584 code = child.wait() => {585 let code = code?;586 if !code.success() {587 anyhow::bail!("command '{str}' failed with status {}", code);588 }589 break;590 }591 }592 }593594 Ok(out_buf)595}596async fn run_nix_inner_raw_ssh(597 str: String,598 mut cmd: OwningCommand<Arc<Session>>,599 want_stdout: bool,600 err_handler: &mut dyn Handler,601 mut out_handler: Option<&mut dyn Handler>,602) -> Result<Option<Vec<u8>>> {603 cmd.stderr(openssh::Stdio::piped());604 cmd.stdout(openssh::Stdio::piped());605 let mut child = cmd.spawn().await?;606 let mut stderr = child.stderr().take().unwrap();607 let stdout = child.stdout().take().unwrap();608 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());609 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));610 let mut ob = want_stdout611 .then(|| out.take().unwrap())612 .unwrap_or_else(|| Box::new(EmptyAsyncRead));613 let mut ol = (!want_stdout)614 .then(|| out.take().unwrap())615 .unwrap_or_else(|| Box::new(EmptyAsyncRead));616 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());617 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());618619 // while let Some(line) = read.next().await? {}620621 let mut out_buf = if want_stdout { Some(vec![]) } else { None };622623 let mut wait_future = pin::pin!(child.wait());624 loop {625 select! {626 e = err.next() => {627 if let Some(e) = e {628 let e = e?;629 err_handler.handle_line(&e);630 }631 },632 o = ob.next() => {633 if let Some(o) = o {634 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);635 }636 },637 o = ol.next() => {638 if let Some(o) = o {639 let o = o?;640 if let Some(out) = out_handler.as_mut() {641 out.handle_line(&o)642 } else {643 err_handler.handle_line(&o)644 }645 // out_handler.handle_info(&o);646 }647 },648 code = &mut wait_future => {649 let code = code?;650 if !code.success() {651 anyhow::bail!("command '{str}' failed with status {}", code);652 }653 break;654 }655 }656 }657658 Ok(out_buf)659}660661pub trait ErrorRecorder: Send {662 /// Return true to discard message from logging663 fn push_message(&mut self, msg: &str) -> bool;664}665666#[derive(Debug)]667enum LogField {668 String(String),669 Num(u64),670}671672impl<'de> Deserialize<'de> for LogField {673 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>674 where675 D: serde::Deserializer<'de>,676 {677 struct StringOrNum;678 impl<'de> Visitor<'de> for StringOrNum {679 type Value = LogField;680681 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {682 write!(f, "string or unsigned")683 }684685 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>686 where687 E: serde::de::Error,688 {689 Ok(LogField::String(v.to_owned()))690 }691692 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>693 where694 E: serde::de::Error,695 {696 Ok(LogField::Num(v))697 }698 }699700 deserializer.deserialize_any(StringOrNum)701 }702}703704#[derive(Deserialize, Debug)]705#[serde(rename_all = "camelCase", tag = "action")]706#[allow(dead_code)]707enum NixLog {708 Msg {709 level: u32,710 msg: String,711 raw_msg: Option<String>,712 },713 Start {714 id: u64,715 level: u32,716 #[serde(default)]717 fields: Vec<LogField>,718 text: String,719 #[serde(rename = "type")]720 typ: u32,721 },722 Stop {723 id: u64,724 },725 Result {726 id: u64,727 #[serde(rename = "type")]728 typ: u32,729 #[serde(default)]730 fields: Vec<LogField>,731 },732}1use std::thread::sleep;2use std::time::Duration;3use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};45use anyhow::{anyhow, Result};6use better_command::{Handler, NixHandler, PlainHandler};7use futures::StreamExt;8use itertools::Either;9use openssh::{OverSsh, OwningCommand, Session};10use tokio::{io::AsyncRead, process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, debug};1314fn escape_bash(input: &str, out: &mut String) {15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17 out.push_str(input);18 return;19 }20 out.push('\'');21 for (i, v) in input.split('\'').enumerate() {22 if i != 0 {23 out.push_str("'\"'\"'");24 }25 out.push_str(v);26 }27 out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30 os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34 command: String,35 args: Vec<String>,36 env: Vec<(String, String)>,37 ssh_session: Option<Arc<Session>>,38}39impl MyCommand {40 pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {41 assert!(!cmd.as_ref().is_empty());42 Self {43 command: ostoutf8(cmd),44 args: vec![],45 env: vec![],46 ssh_session: Some(session),47 }48 }49 pub fn new(cmd: impl AsRef<OsStr>) -> Self {50 assert!(!cmd.as_ref().is_empty());51 Self {52 command: ostoutf8(cmd),53 args: vec![],54 env: vec![],55 ssh_session: None,56 }57 }58 fn into_args(self) -> Vec<String> {59 let mut out = Vec::new();60 if !self.env.is_empty() {61 out.push("env".to_owned());62 for (k, v) in self.env {63 assert!(!k.contains('='));64 out.push(format!("{k}={v}"));65 }66 }67 out.push(self.command);68 out.extend(self.args);69 out70 }7172 /// Translates environment variables into env command execution.73 /// Required for ssh, as ssh don't allow to send environment variables (at least by default).74 ///75 /// FIXME: Insecure, as arguments might be seen by other users on the same machine.76 /// Figure out some way to transfer environment using stdio?77 fn translate_env_into_env(self) -> Self {78 if self.env.is_empty() {79 return self;80 }81 let mut out = Self::new("env");82 out.ssh_session = self.ssh_session;83 for (k, v) in self.env {84 assert!(!k.contains('='));85 out.arg(format!("{k}={v}"));86 }87 out.arg(self.command);88 out.args(self.args);8990 out91 }92 fn into_string(self) -> String {93 let mut out = String::new();94 if !self.env.is_empty() {95 out.push_str("env");96 for (k, v) in self.env {97 out.push(' ');98 assert!(!k.contains('='));99 escape_bash(&k, &mut out);100 out.push('=');101 escape_bash(&v, &mut out);102 }103 }104 if !out.is_empty() {105 out.push(' ');106 }107 escape_bash(&self.command, &mut out);108 for arg in self.args {109 out.push(' ');110 escape_bash(&arg, &mut out);111 }112 out113 }114 fn into_command(self) -> Command {115 let mut out = Command::new(self.command);116 out.args(self.args);117 for (k, v) in self.env {118 out.env(k, v);119 }120 out121 }122 fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {123 Ok(if let Some(session) = self.ssh_session.clone() {124 let cmd = self.translate_env_into_env().into_command();125 Either::Right(126 cmd.over_ssh(session)127 .map_err(|e| anyhow!("ssh error: {e}"))?,128 )129 } else {130 let cmd = self.into_command();131 Either::Left(cmd)132 })133 }134 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {135 let arg = arg.as_ref();136 self.args.push(ostoutf8(arg));137 self138 }139 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {140 let arg = arg.as_ref();141 let value = value.as_ref();142 let arg = ostoutf8(arg);143 let value = ostoutf8(value);144 self.arg(format!("{arg}={value}"));145 self146 }147 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {148 self.arg(arg);149 self.arg(value);150 self151 }152 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {153 self.env154 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));155 self156 }157 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {158 for arg in args.into_iter() {159 let arg = arg.as_ref();160 self.args.push(ostoutf8(arg));161 }162 self163 }164 pub fn sudo(mut self) -> Self {165 if std::env::var_os("NO_SUDO").is_some() {166 let mut out = Self::new("su");167 out.ssh_session = self.ssh_session.take();168 out.arg("-c").arg(self.into_string());169 out170 } else {171 let mut out = Self::new("sudo");172 out.ssh_session = self.ssh_session.take();173 out.args(self.into_args());174 out175 }176 }177178 pub async fn run(self) -> Result<()> {179 let str = self.clone().into_string();180 let cmd = self.into_command_new()?;181 match cmd {182 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,183 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,184 };185 Ok(())186 }187 pub async fn run_string(self) -> Result<String> {188 let bytes = self.run_bytes().await?;189 Ok(String::from_utf8(bytes)?)190 }191 pub async fn run_bytes(self) -> Result<Vec<u8>> {192 let str = self.clone().into_string();193 let cmd = self.into_command_new()?;194 let v = match cmd {195 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,196 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,197 };198 Ok(v)199 }200201 pub async fn run_nix_string(self) -> Result<String> {202 let str = self.clone().into_string();203 let mut cmd = self.into_command();204 cmd.arg("--log-format").arg("internal-json");205 let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;206 Ok(String::from_utf8(bytes)?)207 }208 pub async fn run_nix(self) -> Result<()> {209 let str = self.clone().into_string();210 let mut cmd = self.into_command();211 cmd.arg("--log-format").arg("internal-json");212 cmd.stdout(Stdio::inherit());213 run_nix_inner(str, cmd, &mut NixHandler::default()).await214 }215}216217struct EmptyAsyncRead;218impl AsyncRead for EmptyAsyncRead {219 fn poll_read(220 self: std::pin::Pin<&mut Self>,221 _cx: &mut std::task::Context<'_>,222 _buf: &mut tokio::io::ReadBuf<'_>,223 ) -> Poll<std::io::Result<()>> {224 Poll::Pending225 }226}227228async fn run_nix_inner_stdout(229 str: String,230 cmd: Command,231 handler: &mut dyn Handler,232) -> Result<Vec<u8>> {233 Ok(run_nix_inner_raw(str, cmd, true, handler, None)234 .await?235 .expect("has out"))236}237async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {238 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;239 assert!(v.is_none());240 Ok(())241}242async fn run_nix_inner_stdout_ssh(243 str: String,244 cmd: OwningCommand<Arc<Session>>,245 handler: &mut dyn Handler,246) -> Result<Vec<u8>> {247 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)248 .await?249 .expect("has out"))250}251async fn run_nix_inner_ssh(252 str: String,253 cmd: OwningCommand<Arc<Session>>,254 handler: &mut dyn Handler,255) -> Result<()> {256 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;257 assert!(v.is_none());258 Ok(())259}260261async fn run_nix_inner_raw(262 str: String,263 mut cmd: Command,264 want_stdout: bool,265 err_handler: &mut dyn Handler,266 mut out_handler: Option<&mut dyn Handler>,267) -> Result<Option<Vec<u8>>> {268 cmd.stderr(Stdio::piped());269 cmd.stdout(Stdio::piped());270 debug!("running command {cmd:?} on local");271 let mut child = cmd.spawn()?;272 let mut stderr = child.stderr.take().unwrap();273 let stdout = child.stdout.take().unwrap();274 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());275 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));276 let mut ob = want_stdout277 .then(|| out.take().unwrap())278 .unwrap_or_else(|| Box::new(EmptyAsyncRead));279 let mut ol = (!want_stdout)280 .then(|| out.take().unwrap())281 .unwrap_or_else(|| Box::new(EmptyAsyncRead));282 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());283 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());284285 // while let Some(line) = read.next().await? {}286287 let mut out_buf = if want_stdout { Some(vec![]) } else { None };288 loop {289 select! {290 e = err.next() => {291 if let Some(e) = e {292 let e = e?;293 err_handler.handle_line(&e);294 }295 },296 o = ob.next() => {297 if let Some(o) = o {298 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);299 }300 },301 o = ol.next() => {302 if let Some(o) = o {303 let o = o?;304 if let Some(out) = out_handler.as_mut() {305 out.handle_line(&o)306 } else {307 err_handler.handle_line(&o)308 }309 // out_handler.handle_info(&o);310 }311 },312 code = child.wait() => {313 let code = code?;314 if !code.success() {315 anyhow::bail!("command '{str}' failed with status {}", code);316 }317 break;318 }319 }320 }321322 Ok(out_buf)323}324async fn run_nix_inner_raw_ssh(325 str: String,326 mut cmd: OwningCommand<Arc<Session>>,327 want_stdout: bool,328 err_handler: &mut dyn Handler,329 mut out_handler: Option<&mut dyn Handler>,330) -> Result<Option<Vec<u8>>> {331 debug!("running command {cmd:?} over ssh");332 cmd.stderr(openssh::Stdio::piped());333 cmd.stdout(openssh::Stdio::piped());334 let mut child = cmd.spawn().await?;335 let mut stderr = child.stderr().take().unwrap();336 let stdout = child.stdout().take().unwrap();337 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());338 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));339 let mut ob = want_stdout340 .then(|| out.take().unwrap())341 .unwrap_or_else(|| Box::new(EmptyAsyncRead));342 let mut ol = (!want_stdout)343 .then(|| out.take().unwrap())344 .unwrap_or_else(|| Box::new(EmptyAsyncRead));345 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());346 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());347348 // while let Some(line) = read.next().await? {}349350 let mut out_buf = if want_stdout { Some(vec![]) } else { None };351352 let mut wait_future = pin::pin!(child.wait());353 loop {354 select! {355 e = err.next() => {356 if let Some(e) = e {357 let e = e?;358 err_handler.handle_line(&e);359 }360 },361 o = ob.next() => {362 if let Some(o) = o {363 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);364 }365 },366 o = ol.next() => {367 if let Some(o) = o {368 let o = o?;369 if let Some(out) = out_handler.as_mut() {370 out.handle_line(&o)371 } else {372 err_handler.handle_line(&o)373 }374 // out_handler.handle_info(&o);375 }376 },377 code = &mut wait_future => {378 let code = code?;379 if !code.success() {380 anyhow::bail!("command '{str}' failed with status {}", code);381 }382 break;383 }384 }385 }386387 Ok(out_buf)388}cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
sync::{Arc, Mutex, MutexGuard, OnceLock},
};
-use age::Recipient;
use anyhow::{anyhow, bail, Context, Result};
use clap::{ArgGroup, Parser};
use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
pub struct ConfigHost {
pub name: String,
+ pub local: bool,
pub session: OnceLock<Arc<openssh::Session>>,
}
impl ConfigHost {
- pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+ assert!(!self.local, "do not open ssh connection to local session");
// FIXME: TOCTOU
if let Some(session) = &self.session.get() {
return Ok((*session).clone());
@@ -96,8 +97,12 @@
D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
}
pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
- let session = self.open_session().await?;
- Ok(MyCommand::new_on(cmd, session))
+ if self.local {
+ Ok(MyCommand::new(cmd))
+ } else {
+ let session = self.open_session().await?;
+ Ok(MyCommand::new_on(cmd, session))
+ }
}
pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
.context("failed to call remote host for decrypt")?;
z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
}
+ pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+ let mut cmd = self.cmd("fleet-install-secrets").await?;
+ cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+ for target in targets {
+ cmd.eqarg("--targets", target);
+ }
+ let encoded = cmd
+ .sudo()
+ .run_string()
+ .await
+ .context("failed to call remote host for decrypt")?;
+ SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+ }
/// Returns path for futureproofing, as path might change i.e on conversion to CA
pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+ if self.local {
+ // Path is located locally, thus already trusted.
+ return Ok(path.to_owned());
+ }
let mut nix = MyCommand::new("nix");
nix.arg("copy")
.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
nix.run_nix().await?;
Ok(path.to_owned())
}
+ pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("stop").arg(name);
+ cmd.sudo().run().await
+ }
+ pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+ let mut cmd = self.cmd("systemctl").await?;
+ cmd.arg("start").arg(name);
+ cmd.sudo().run().await
+ }
+
+ pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+ let mut cmd = self.cmd("rm").await?;
+ cmd.arg("-f").arg(path);
+ if sudo {
+ cmd = cmd.sudo()
+ }
+ cmd.run().await
+ }
}
impl Config {
@@ -134,35 +175,12 @@
}
pub fn is_local(&self, host: &str) -> bool {
self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
- }
-
- pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run().await
- }
- pub async fn run_string_on(
- &self,
- host: &str,
- mut command: MyCommand,
- sudo: bool,
- ) -> Result<String> {
- if sudo {
- command = command.sudo();
- }
- if !self.is_local(host) {
- command = command.ssh(host);
- }
- command.run_string().await
}
pub async fn host(&self, name: &str) -> Result<ConfigHost> {
Ok(ConfigHost {
name: name.to_owned(),
+ local: self.is_local(name),
session: OnceLock::new(),
})
}
@@ -172,6 +190,7 @@
let mut out = vec![];
for name in names {
out.push(ConfigHost {
+ local: self.is_local(&name),
name,
session: OnceLock::new(),
})
@@ -225,27 +244,6 @@
let mut data = self.data_mut();
let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
host_secrets.insert(secret, value);
- }
-
- pub async fn reencrypt_on_host(
- &self,
- host: &str,
- data: SecretData,
- targets: Vec<String>,
- ) -> Result<SecretData> {
- let mut recmd = MyCommand::new("fleet-install-secrets");
- recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
- for target in targets {
- recmd.eqarg("--targets", target);
- }
- recmd = recmd.sudo().ssh(host);
- let encoded = recmd
- .run_string()
- .await
- .context("failed to call remote host for decrypt")?
- .trim()
- .to_owned();
- SecretData::decode_z85(&encoded)
}
pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
cmds/fleet/src/keys.rsdiffbeforeafterboth--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
use std::str::FromStr;
-use crate::command::MyCommand;
use crate::host::Config;
use age::Recipient;
use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
Ok(key)
} else {
warn!("Loading key for {}", host);
- let mut cmd = MyCommand::new("cat");
+ let host = self.host(host).await?;
+ let mut cmd = host.cmd("cat").await?;
cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
- let key = self.run_string_on(host, cmd, false).await?;
- self.update_key(host, key.clone());
+ let key = cmd.run_string().await?;
+ self.update_key(&host.name, key.clone());
Ok(key)
}
}
cmds/remowt-agent/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
cmds/remowt-agent/README.adocdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
cmds/remowt-agent/src/main.rsdiffbeforeafterboth--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+ println!("Hello, world!");
+}
crates/better-command/Cargo.tomldiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
crates/better-command/src/handler.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+ fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+ fn clone(&self) -> Self {
+ Self(self.0.clone())
+ }
+}
+impl<H> ClonableHandler<H> {
+ pub fn new(inner: H) -> Self {
+ Self(Arc::new(Mutex::new(inner)))
+ }
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+ fn handle_line(&mut self, e: &str) {
+ self.0.lock().unwrap().handle_line(e)
+ }
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+ fn handle_line(&mut self, e: &str) {
+ info!(target: "log", "{e}");
+ }
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+ fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+ spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+ String(String),
+ Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+ Msg {
+ level: u32,
+ msg: String,
+ raw_msg: Option<String>,
+ },
+ Start {
+ id: u64,
+ level: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ text: String,
+ #[serde(rename = "type")]
+ typ: u32,
+ },
+ Stop {
+ id: u64,
+ },
+ Result {
+ id: u64,
+ #[serde(rename = "type")]
+ typ: u32,
+ #[serde(default)]
+ fields: Vec<LogField>,
+ },
+}
+fn process_message(m: &str) -> String {
+ // Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+ static OSC_CLEANER: Lazy<Regex> =
+ Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
+}
+impl Handler for NixHandler {
+ fn handle_line(&mut self, e: &str) {
+ if let Some(e) = e.strip_prefix("@nix ") {
+ let log: NixLog = match serde_json::from_str(e) {
+ Ok(l) => l,
+ Err(err) => {
+ warn!("failed to parse nix log line {:?}: {}", e, err);
+ return;
+ }
+ };
+ match log {
+ NixLog::Msg { msg, raw_msg, .. } => {
+ #[allow(clippy::nonminimal_bool)]
+ if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+ && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+ && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+ if let Some(raw_msg) = raw_msg {
+ if !msg.is_empty() {
+ info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+ } else {
+ info!(target: "nix", "{}", raw_msg.trim_end())
+ }
+ } else {
+ info!(target: "nix", "{}", msg.trim_end())
+ }
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 105 && !fields.is_empty() => {
+ if let [LogField::String(drv), ..] = &fields[..] {
+ let mut drv = drv.as_str();
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ info!(target: "nix","building {}", drv);
+ let span = info_span!("build", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad build log: {:?}", log)
+ }
+ }
+ NixLog::Start {
+ ref fields,
+ typ,
+ id,
+ ..
+ } if typ == 100 && fields.len() >= 3 => {
+ if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+ &fields[..]
+ {
+ let mut drv = drv.as_str();
+
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ // info!(target: "nix","copying {} {} -> {}", drv, from, to);
+ let span = info_span!("copy", from, to, drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ } else {
+ warn!("bad copy log: {:?}", log)
+ }
+ }
+ NixLog::Start { text, typ, id, .. }
+ if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+ {
+ if !text.is_empty()
+ && text != "querying info about missing paths"
+ && text != "copying 0 paths"
+ // Too much spam on lazy-trees branch
+ && !(text.starts_with("copying '") && text.ends_with("' to the store"))
+ {
+ let span = info_span!("job");
+ span.pb_start();
+ span.pb_set_message(&process_message(text.trim()));
+ self.spans.insert(id, span);
+ info!(target: "nix", "{}", text);
+ }
+ }
+ NixLog::Start {
+ text,
+ level: 0,
+ typ: 108,
+ ..
+ } if text.is_empty() => {
+ // Cache lookup? Coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 109,
+ ..
+ } if text.starts_with("querying info about ") => {
+ // Cache lookup
+ }
+ NixLog::Start {
+ text,
+ level: 4,
+ typ: 101,
+ ..
+ } if text.starts_with("downloading ") => {
+ // NAR downloading, coupled with copy log
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ ..
+ } if text.starts_with("waiting for a machine to build ") => {
+ // Useless repeating notification about build
+ }
+ NixLog::Start {
+ text,
+ level: 3,
+ typ: 111,
+ ..
+ } if text.starts_with("resolved derivation: ") => {
+ // CA resolved
+ }
+ NixLog::Start {
+ text,
+ level: 1,
+ typ: 111,
+ id,
+ ..
+ } if text.starts_with("waiting for lock on ") => {
+ let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+ if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+ drv = txt;
+ }
+ if let Some(txt) = drv.split("', '").next() {
+ drv = txt;
+ }
+ if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+ let mut it = pkg.splitn(2, '-');
+ it.next();
+ if let Some(pkg) = it.next() {
+ drv = pkg;
+ }
+ }
+ let span = info_span!("waiting on drv", drv);
+ span.pb_start();
+ self.spans.insert(id, span);
+ // Concurrent build of the same message
+ }
+ NixLog::Stop { id, .. } => {
+ self.spans.remove(&id);
+ }
+ NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+ if let Some(span) = self.spans.get(&id) {
+ if let LogField::String(s) = &fields[0] {
+ span.pb_set_message(&process_message(s.trim()));
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ warn!("unknown result id: {id} {typ} {fields:?}");
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+ if let Some(span) = self.spans.get(&id) {
+ if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+ &fields[..4]
+ {
+ span.pb_set_length(*expected);
+ span.pb_set_position(*done);
+ } else {
+ warn!("bad fields: {fields:?}");
+ }
+ } else {
+ // warn!("unknown result id: {id} {typ} {fields:?}");
+ // Unaccounted progress.
+ }
+ // dbg!(fields, id, typ);
+ }
+ NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+ // Set phase, expected
+ }
+ _ => warn!("unknown log: {:?}", log),
+ };
+ } else {
+ let e = e.trim();
+ if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+ return;
+ }
+ info!("{e}")
+ }
+ }
+}
crates/better-command/src/lib.rsdiffbeforeafterboth--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+ left + right
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn it_works() {
+ let result = add(2, 2);
+ assert_eq!(result, 4);
+ }
+}