git.delta.rocks / jrsonnet / refs/commits / 7c6930a6bff0

difftreelog

refactor remove shell-outs for ssh

Yaroslav Bolyukin2023-12-29parent: #904d121.patch.diff
in: trunk

15 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
 checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
 
 [[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
 name = "bitflags"
 version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
  "anyhow",
  "async-trait",
  "base64 0.21.5",
+ "better-command",
  "chrono",
  "clap",
  "futures",
@@ -1510,9 +1523,9 @@
 
 [[package]]
 name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
 
 [[package]]
 name = "opaque-debug"
@@ -1922,6 +1935,10 @@
 checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
 [[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
 name = "rnix"
 version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
 
 [[package]]
 name = "serde"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
 dependencies = [
  "serde_derive",
 ]
@@ -2123,9 +2140,9 @@
 
 [[package]]
 name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2134,9 +2151,9 @@
 
 [[package]]
 name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
 dependencies = [
  "itoa",
  "ryu",
@@ -2527,11 +2544,10 @@
 
 [[package]]
 name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
 dependencies = [
- "cfg-if",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
@@ -2539,9 +2555,9 @@
 
 [[package]]
 name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2550,9 +2566,9 @@
 
 [[package]]
 name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
 dependencies = [
  "once_cell",
  "valuable",
@@ -2560,9 +2576,9 @@
 
 [[package]]
 name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
 dependencies = [
  "indicatif",
  "tracing",
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
 [workspace]
 members = ["crates/*", "cmds/*"]
 resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
 edition = "2021"
 
 [dependencies]
+nixlike.workspace = true
+better-command.workspace = true
 anyhow = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -15,7 +17,6 @@
 hostname = "0.3.1"
 age-core = "0.9.0"
 peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
 age = { version = "0.9.2", features = ["ssh", "armor"] }
 base64 = "0.21.5"
 chrono = { version = "0.4.31", features = ["serde"] }
modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
 use std::collections::HashMap;
 use std::ffi::{OsStr, OsString};
 use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
 use std::sync::{Arc, OnceLock};
 
 use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
 use futures::StreamExt;
 use itertools::Itertools;
 use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
 use tokio::io::AsyncWriteExt;
 use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
 use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
 use tokio_util::codec::{FramedRead, LinesCodec};
 use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
 
 const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
 
@@ -30,6 +32,8 @@
 	string_wrapping: (String, String),
 	number_wrapping: (String, String),
 
+	executing_command: Arc<Mutex<()>>,
+
 	next_id: u32,
 	free_list: Vec<u32>,
 }
@@ -219,6 +223,8 @@
 			string_wrapping: Default::default(),
 			number_wrapping: Default::default(),
 
+			executing_command: Arc::new(Mutex::new(())),
+
 			next_id: 0,
 			free_list: vec![],
 		};
@@ -331,6 +337,10 @@
 		expr: impl AsRef<[u8]>,
 		err_handler: &mut dyn Handler,
 	) -> Result<String> {
+		// Prevent two commands from being executed in parallel, messing with each other.
+		let _lock = self.executing_command.clone();
+		let _guard = _lock.lock().await;
+
 		self.send_command(expr).await?;
 		// It will be echoed
 		self.send_command(REPL_DELIMITER).await?;
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
 use std::{env::current_dir, time::Duration};
 
 use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
 use crate::nix_go;
 use anyhow::{anyhow, Result};
 use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
 use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
@@ -112,12 +112,12 @@
 	current: bool,
 	datetime: String,
 }
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
-	let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+	let mut cmd = host.cmd("nix-env").await?;
 	cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 		.arg("--list-generations");
 	// Sudo is required due to --list-generations acquiring lock on the profile.
-	let data = config.run_string_on(host, cmd, true).await?;
+	let data = cmd.sudo().run_string().await?;
 	let generations = data
 		.split('\n')
 		.map(|e| e.trim())
@@ -161,25 +161,12 @@
 		.map_err(|_e| anyhow!("bad list-generations output"))?
 		.ok_or_else(|| anyhow!("failed to find generation"))?;
 	Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("stop").arg(unit);
-	config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("start").arg(unit);
-	config.run_on(host, cmd, true).await
 }
 
 async fn execute_upload(
 	build: &BuildSystems,
-	config: &Config,
 	action: UploadAction,
-	host: &str,
+	host: &ConfigHost,
 	built: PathBuf,
 ) -> Result<()> {
 	let mut failed = false;
@@ -191,15 +178,15 @@
 	if !build.disable_rollback {
 		let _span = info_span!("preparing").entered();
 		info!("preparing for rollback");
-		let generation = get_current_generation(config, host).await?;
+		let generation = get_current_generation(&host).await?;
 		info!(
 			"rollback target would be {} {}",
 			generation.id, generation.datetime
 		);
 		{
-			let mut cmd = MyCommand::new("sh");
+			let mut cmd = host.cmd("sh").await?;
 			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to set rollback marker: {e}");
 				failed = true;
 			}
@@ -215,37 +202,41 @@
 		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
 		// Anyway, reboot will still help in this case.
 		if action.should_schedule_rollback_run() {
-			let mut cmd = MyCommand::new("systemd-run");
+			let mut cmd = host.cmd("systemd-run").await?;
 			cmd.comparg("--on-active", "3min")
 				.comparg("--unit", "rollback-watchdog-run")
 				.arg("systemctl")
 				.arg("start")
 				.arg("rollback-watchdog.service");
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to schedule rollback run: {e}");
 				failed = true;
 			}
 		}
 	}
+
 	if action.should_switch_profile() && !failed {
 		info!("switching generation");
-		let mut cmd = MyCommand::new("nix-env");
+		let mut cmd = host.cmd("nix-env").await?;
 		cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 			.comparg("--set", &built);
-		if let Err(e) = config.run_on(host, cmd, true).await {
+		if let Err(e) = cmd.sudo().run().await {
 			error!("failed to switch generation: {e}");
 			failed = true;
 		}
 	}
+
+	// FIXME: Connection might be disconnected after activation run
+
 	if action.should_activate() && !failed {
 		let _span = info_span!("activating").entered();
 		info!("executing activation script");
 		let mut switch_script = built.clone();
 		switch_script.push("bin");
 		switch_script.push("switch-to-configuration");
-		let mut cmd = MyCommand::new(switch_script);
+		let mut cmd = host.cmd(switch_script).await?;
 		cmd.arg(action.name());
-		if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+		if let Err(e) = cmd.sudo().run().in_current_span().await {
 			error!("failed to activate: {e}");
 			failed = true;
 		}
@@ -253,7 +244,8 @@
 	if !build.disable_rollback {
 		if failed {
 			info!("executing rollback");
-			if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+			if let Err(e) = host
+				.systemctl_start("rollback-watchdog.service")
 				.instrument(info_span!("rollback"))
 				.await
 			{
@@ -261,27 +253,29 @@
 			}
 		} else {
 			info!("trying to mark upgrade as successful");
-			let mut cmd = MyCommand::new("rm");
-			cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-			if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+			if let Err(e) = host
+				.rm_file("/etc/fleet_rollback_marker", true)
+				.in_current_span()
+				.await
+			{
 				error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
 			}
 		}
 		info!("disarming watchdog, just in case");
-		if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+		if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
 			// It is ok, if there was no reboot - then timer might not be running.
 		}
 		if action.should_schedule_rollback_run() {
-			if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+			if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
 				error!("failed to disarm rollback run: {e}");
 			}
 		}
-	} else {
-		let mut cmd = MyCommand::new("rm");
-		cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-		if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
-			// Marker might not exist, yet better try to remove it.
-		}
+	} else if let Err(_e) = host
+		.rm_file("/etc/fleet_rollback_marker", true)
+		.in_current_span()
+		.await
+	{
+		// Marker might not exist, yet better try to remove it.
 	}
 	Ok(())
 }
@@ -289,12 +283,13 @@
 impl BuildSystems {
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
+		let host = config.host(&host).await?;
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
 		let drv = nix_go!(
 			fleet_field.buildSystems(Obj {
 				localSystem: { config.local_system.clone() }
-			})[{ action.build_attr() }][{ host }]
+			})[{ action.build_attr() }][{ &host.name }]
 		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
 
 		match action {
 			Action::Upload { action } => {
-				if !config.is_local(&host) {
+				if !config.is_local(&host.name) {
 					info!("uploading system closure");
 					{
+						// TODO: Move to remote_derivation method.
 						// Alternatively, nix store make-content-addressed can be used,
 						// at least for the first deployment, to provide trusted store key.
 						//
@@ -329,13 +325,11 @@
 					}
 					let mut tries = 0;
 					loop {
-						let mut nix = MyCommand::new("nix");
-						nix.arg("copy")
-							.arg("--substitute-on-destination")
-							.comparg("--to", format!("ssh-ng://{host}"))
-							.arg(out_output);
-						match nix.run_nix().await {
-							Ok(()) => break,
+						match host.remote_derivation(out_output).await {
+							Ok(remote) => {
+								assert!(&remote == out_output, "CA derivations aren't implemented");
+								break;
+							}
 							Err(e) if tries < 3 => {
 								tries += 1;
 								warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
 					}
 				}
 				if let Some(action) = action {
-					execute_upload(&self, &config, action, &host, out_output.clone()).await?
+					execute_upload(&self, action, &host, out_output.clone()).await?
 				}
 			}
 			Action::Package(PackageAction::SdImage) => {
 				let mut out = current_dir()?;
-				out.push(format!("sd-image-{}", host));
+				out.push(format!("sd-image-{}", host.name));
 
 				info!("linking sd image to {:?}", out);
 				symlink(out_output, out)?;
 			}
 			Action::Package(PackageAction::InstallationCd) => {
 				let mut out = current_dir()?;
-				out.push(format!("installation-cd-{}", host));
+				out.push(format!("installation-cd-{}", host.name));
 
 				info!("linking iso image to {:?}", out);
 				symlink(out_output, out)?;
@@ -379,6 +373,17 @@
 			let this = this.clone();
 			let span = info_span!("deployment", host = field::display(&host.name));
 			let hostname = host.name;
+			// FIXME: Since the introduction of better-nix-eval,
+			// due to single repl used for builds, hosts are waiting for each other to build,
+			// instead of building concurrently.
+			//
+			// Open multiple repls?
+			//
+			// Create build batcher, which will behave similar to golangs
+			// WaitGroup, and start executing once all the build tasks are scheduled?
+			// This also allows to cleanup build output, as there will be no longer
+			// "waiting for remote machine" messages in the cases when one package is needed for
+			// multiple hosts.
 			set.spawn_local(
 				(async move {
 					match this.build_task(config, hostname).await {
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use chrono::{DateTime, Utc};
 use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use itertools::Itertools;
 use owo_colors::OwoColorize;
 use std::{
@@ -404,9 +404,8 @@
 					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 				if let Some(data) = secret.secret.secret {
-					let encrypted = config
-						.reencrypt_on_host(identity_holder, data, target_recipients)
-						.await?;
+					let host = config.host(&identity_holder).await?;
+					let encrypted = host.reencrypt(data, target_recipients).await?;
 					secret.secret.secret = Some(encrypted);
 				}
 
@@ -481,9 +480,8 @@
 							target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 						if let Some(secret) = data.secret.secret {
-							let encrypted = config
-								.reencrypt_on_host(identity_holder, secret, target_recipients)
-								.await?;
+							let host = config.host(identity_holder).await?;
+							let encrypted = host.reencrypt(secret, target_recipients).await?;
 
 							data.secret.secret = Some(encrypted);
 						}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
-	collections::HashMap,
-	ffi::OsStr,
-	pin,
-	process::Stdio,
-	sync::{Arc, Mutex},
-	task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
 
 use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
 use futures::StreamExt;
 use itertools::Either;
-use once_cell::sync::Lazy;
 use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
 
 fn escape_bash(input: &str, out: &mut String) {
 	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
 			return self;
 		}
 		let mut out = Self::new("env");
-		if let Some(session) = self.ssh_session {
-			out = out.ssh_session(session);
-		}
+		out.ssh_session = self.ssh_session;
 		for (k, v) in self.env {
 			assert!(!k.contains('='));
 			out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
 			out
 		} else {
 			let mut out = Self::new("sudo");
+			out.ssh_session = self.ssh_session.take();
 			out.args(self.into_args());
 			out
 		}
 	}
-	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
-		self.ssh_session = Some(on);
-		self
-	}
-	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
-		let mut out = Self::new("ssh");
-		out.ssh_session = self.ssh_session.take();
-		out.arg(on).arg("--");
-		out.arg(self.into_string());
-		out
-	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -276,259 +256,6 @@
 	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
 	assert!(v.is_none());
 	Ok(())
-}
-
-pub trait Handler: Send {
-	fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
-	fn clone(&self) -> Self {
-		Self(self.0.clone())
-	}
-}
-impl<H> ClonableHandler<H> {
-	pub fn new(inner: H) -> Self {
-		Self(Arc::new(Mutex::new(inner)))
-	}
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
-	fn handle_line(&mut self, e: &str) {
-		self.0.lock().unwrap().handle_line(e)
-	}
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
-	fn handle_line(&mut self, e: &str) {
-		info!(target: "log", "{e}");
-	}
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
-	fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
-	spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
-	static OSC_CLEANER: Lazy<Regex> =
-		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
-	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
-	let m = OSC_CLEANER.replace_all(m, "");
-	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
-	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
-	DETABBER.replace_all(m.as_ref(), "  ").to_string()
-}
-impl Handler for NixHandler {
-	fn handle_line(&mut self, e: &str) {
-		if let Some(e) = e.strip_prefix("@nix ") {
-			let log: NixLog = match serde_json::from_str(e) {
-				Ok(l) => l,
-				Err(err) => {
-					warn!("failed to parse nix log line {:?}: {}", e, err);
-					return;
-				}
-			};
-			match log {
-				NixLog::Msg { msg, raw_msg, .. } => {
-					#[allow(clippy::nonminimal_bool)]
-					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
-					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
-					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
-						if let Some(raw_msg) = raw_msg {
-							if !msg.is_empty() {
-								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
-							} else {
-								info!(target: "nix", "{}", raw_msg.trim_end())
-							}
-						} else {
-							info!(target: "nix", "{}", msg.trim_end())
-						}
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 105 && !fields.is_empty() => {
-					if let [LogField::String(drv), ..] = &fields[..] {
-						let mut drv = drv.as_str();
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						info!(target: "nix","building {}", drv);
-						let span = info_span!("build", drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad build log: {:?}", log)
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 100 && fields.len() >= 3 => {
-					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
-						&fields[..]
-					{
-						let mut drv = drv.as_str();
-
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
-						let span = info_span!("copy", from, to, drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad copy log: {:?}", log)
-					}
-				}
-				NixLog::Start { text, typ, id, .. }
-					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
-				{
-					if !text.is_empty()
-						&& text != "querying info about missing paths"
-						&& text != "copying 0 paths"
-						// Too much spam on lazy-trees branch
-						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
-					{
-						let span = info_span!("job");
-						span.pb_start();
-						span.pb_set_message(&process_message(text.trim()));
-						self.spans.insert(id, span);
-						info!(target: "nix", "{}", text);
-					}
-				}
-				NixLog::Start {
-					text,
-					level: 0,
-					typ: 108,
-					..
-				} if text.is_empty() => {
-					// Cache lookup? Coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 109,
-					..
-				} if text.starts_with("querying info about ") => {
-					// Cache lookup
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 101,
-					..
-				} if text.starts_with("downloading ") => {
-					// NAR downloading, coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					..
-				} if text.starts_with("waiting for a machine to build ") => {
-					// Useless repeating notification about build
-				}
-				NixLog::Start {
-					text,
-					level: 3,
-					typ: 111,
-					..
-				} if text.starts_with("resolved derivation: ") => {
-					// CA resolved
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					id,
-					..
-				} if text.starts_with("waiting for lock on ") => {
-					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
-					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.split("', '").next() {
-						drv = txt;
-					}
-					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-						let mut it = pkg.splitn(2, '-');
-						it.next();
-						if let Some(pkg) = it.next() {
-							drv = pkg;
-						}
-					}
-					let span = info_span!("waiting on drv", drv);
-					span.pb_start();
-					self.spans.insert(id, span);
-					// Concurrent build of the same message
-				}
-				NixLog::Stop { id, .. } => {
-					self.spans.remove(&id);
-				}
-				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
-					if let Some(span) = self.spans.get(&id) {
-						if let LogField::String(s) = &fields[0] {
-							span.pb_set_message(&process_message(s.trim()));
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						warn!("unknown result id: {id} {typ} {fields:?}");
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
-					if let Some(span) = self.spans.get(&id) {
-						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
-							&fields[..4]
-						{
-							span.pb_set_length(*expected);
-							span.pb_set_position(*done);
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						// warn!("unknown result id: {id} {typ} {fields:?}");
-						// Unaccounted progress.
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
-					// Set phase, expected
-				}
-				_ => warn!("unknown log: {:?}", log),
-			};
-		} else {
-			let e = e.trim();
-			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
-				return;
-			}
-			info!("{e}")
-		}
-	}
 }
 
 async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
 ) -> Result<Option<Vec<u8>>> {
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
+	debug!("running command {cmd:?} on local");
 	let mut child = cmd.spawn()?;
 	let mut stderr = child.stderr.take().unwrap();
 	let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
 	err_handler: &mut dyn Handler,
 	mut out_handler: Option<&mut dyn Handler>,
 ) -> Result<Option<Vec<u8>>> {
+	debug!("running command {cmd:?} over ssh");
 	cmd.stderr(openssh::Stdio::piped());
 	cmd.stdout(openssh::Stdio::piped());
 	let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
 	}
 
 	Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
-	/// Return true to discard message from logging
-	fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
-	String(String),
-	Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
-	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-	where
-		D: serde::Deserializer<'de>,
-	{
-		struct StringOrNum;
-		impl<'de> Visitor<'de> for StringOrNum {
-			type Value = LogField;
-
-			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-				write!(f, "string or unsigned")
-			}
-
-			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::String(v.to_owned()))
-			}
-
-			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::Num(v))
-			}
-		}
-
-		deserializer.deserialize_any(StringOrNum)
-	}
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
-	Msg {
-		level: u32,
-		msg: String,
-		raw_msg: Option<String>,
-	},
-	Start {
-		id: u64,
-		level: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-		text: String,
-		#[serde(rename = "type")]
-		typ: u32,
-	},
-	Stop {
-		id: u64,
-	},
-	Result {
-		id: u64,
-		#[serde(rename = "type")]
-		typ: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-	},
 }
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
 	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
-use age::Recipient;
 use anyhow::{anyhow, bail, Context, Result};
 use clap::{ArgGroup, Parser};
 use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub local: bool,
 	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+	async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		assert!(!self.local, "do not open ssh connection to local session");
 		// FIXME: TOCTOU
 		if let Some(session) = &self.session.get() {
 			return Ok((*session).clone());
@@ -96,8 +97,12 @@
 		D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
 	}
 	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
-		let session = self.open_session().await?;
-		Ok(MyCommand::new_on(cmd, session))
+		if self.local {
+			Ok(MyCommand::new(cmd))
+		} else {
+			let session = self.open_session().await?;
+			Ok(MyCommand::new_on(cmd, session))
+		}
 	}
 
 	pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
 			.context("failed to call remote host for decrypt")?;
 		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
 	}
+	pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+		for target in targets {
+			cmd.eqarg("--targets", target);
+		}
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 	/// Returns path for futureproofing, as path might change i.e on conversion to CA
 	pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+		if self.local {
+			// Path is located locally, thus already trusted.
+			return Ok(path.to_owned());
+		}
 		let mut nix = MyCommand::new("nix");
 		nix.arg("copy")
 			.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
 		nix.run_nix().await?;
 		Ok(path.to_owned())
 	}
+	pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("stop").arg(name);
+		cmd.sudo().run().await
+	}
+	pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("start").arg(name);
+		cmd.sudo().run().await
+	}
+
+	pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+		let mut cmd = self.cmd("rm").await?;
+		cmd.arg("-f").arg(path);
+		if sudo {
+			cmd = cmd.sudo()
+		}
+		cmd.run().await
+	}
 }
 
 impl Config {
@@ -134,35 +175,12 @@
 	}
 	pub fn is_local(&self, host: &str) -> bool {
 		self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
-	}
-
-	pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run().await
-	}
-	pub async fn run_string_on(
-		&self,
-		host: &str,
-		mut command: MyCommand,
-		sudo: bool,
-	) -> Result<String> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run_string().await
 	}
 
 	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
 		Ok(ConfigHost {
 			name: name.to_owned(),
+			local: self.is_local(name),
 			session: OnceLock::new(),
 		})
 	}
@@ -172,6 +190,7 @@
 		let mut out = vec![];
 		for name in names {
 			out.push(ConfigHost {
+				local: self.is_local(&name),
 				name,
 				session: OnceLock::new(),
 			})
@@ -225,27 +244,6 @@
 		let mut data = self.data_mut();
 		let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
 		host_secrets.insert(secret, value);
-	}
-
-	pub async fn reencrypt_on_host(
-		&self,
-		host: &str,
-		data: SecretData,
-		targets: Vec<String>,
-	) -> Result<SecretData> {
-		let mut recmd = MyCommand::new("fleet-install-secrets");
-		recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
-		for target in targets {
-			recmd.eqarg("--targets", target);
-		}
-		recmd = recmd.sudo().ssh(host);
-		let encoded = recmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		SecretData::decode_z85(&encoded)
 	}
 
 	pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
modifiedcmds/fleet/src/keys.rsdiffbeforeafterboth
--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
 use std::str::FromStr;
 
-use crate::command::MyCommand;
 use crate::host::Config;
 use age::Recipient;
 use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
 			Ok(key)
 		} else {
 			warn!("Loading key for {}", host);
-			let mut cmd = MyCommand::new("cat");
+			let host = self.host(host).await?;
+			let mut cmd = host.cmd("cat").await?;
 			cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
-			let key = self.run_string_on(host, cmd, false).await?;
-			self.update_key(host, key.clone());
+			let key = cmd.run_string().await?;
+			self.update_key(&host.name, key.clone());
 			Ok(key)
 		}
 	}
addedcmds/remowt-agent/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
addedcmds/remowt-agent/README.adocdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
addedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+	println!("Hello, world!");
+}
addedcrates/better-command/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
addedcrates/better-command/src/handler.rsdiffbeforeafterboth
after · crates/better-command/src/handler.rs
1//! Collection of handlers, which transform program-specific stdout format to tracing23use std::collections::HashMap;4use std::sync::{Arc, Mutex};56use once_cell::sync::Lazy;7use regex::Regex;8use serde::Deserialize;9use tracing::{Span, info, warn, info_span};10use tracing_indicatif::span_ext::IndicatifSpanExt as _;1112pub trait Handler: Send {13	fn handle_line(&mut self, e: &str);14}1516/// Handler wrapper, which can be cloned.17pub struct ClonableHandler<H>(Arc<Mutex<H>>);18impl<H> Clone for ClonableHandler<H> {19	fn clone(&self) -> Self {20		Self(self.0.clone())21	}22}23impl<H> ClonableHandler<H> {24	pub fn new(inner: H) -> Self {25		Self(Arc::new(Mutex::new(inner)))26	}27}28impl<H: Handler> Handler for ClonableHandler<H> {29	fn handle_line(&mut self, e: &str) {30		self.0.lock().unwrap().handle_line(e)31	}32}3334/// Converts command output to tracing lines35pub struct PlainHandler;36impl Handler for PlainHandler {37	fn handle_line(&mut self, e: &str) {38		info!(target: "log", "{e}");39	}40}4142/// Ignores output43pub struct NoopHandler;44impl Handler for NoopHandler {45	fn handle_line(&mut self, _e: &str) {}46}4748/// Transform nix internal-json logs to tracing spans.49#[derive(Default)]50pub struct NixHandler {51	spans: HashMap<u64, Span>,52}53#[derive(Deserialize, Debug)]54#[serde(untagged)]55enum LogField {56	String(String),57	Num(u64),58}5960/// Nix internal-json log line type61#[derive(Deserialize, Debug)]62#[serde(rename_all = "camelCase", tag = "action")]63#[allow(dead_code)]64enum NixLog {65	Msg {66		level: u32,67		msg: String,68		raw_msg: Option<String>,69	},70	Start {71		id: u64,72		level: u32,73		#[serde(default)]74		fields: Vec<LogField>,75		text: String,76		#[serde(rename = "type")]77		typ: u32,78	},79	Stop {80		id: u64,81	},82	Result {83		id: u64,84		#[serde(rename = "type")]85		typ: u32,86		#[serde(default)]87		fields: Vec<LogField>,88	},89}90fn process_message(m: &str) -> String {91	// Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.92	static OSC_CLEANER: Lazy<Regex> =93		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());94	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());95	let m = OSC_CLEANER.replace_all(m, "");96	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,97	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.98	DETABBER.replace_all(m.as_ref(), "  ").to_string()99}100impl Handler for NixHandler {101	fn handle_line(&mut self, e: &str) {102		if let Some(e) = e.strip_prefix("@nix ") {103			let log: NixLog = match serde_json::from_str(e) {104				Ok(l) => l,105				Err(err) => {106					warn!("failed to parse nix log line {:?}: {}", e, err);107					return;108				}109			};110			match log {111				NixLog::Msg { msg, raw_msg, .. } => {112					#[allow(clippy::nonminimal_bool)]113					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))114					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")115					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {116						if let Some(raw_msg) = raw_msg {117							if !msg.is_empty() {118								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())119							} else {120								info!(target: "nix", "{}", raw_msg.trim_end())121							}122						} else {123							info!(target: "nix", "{}", msg.trim_end())124						}125					}126				}127				NixLog::Start {128					ref fields,129					typ,130					id,131					..132				} if typ == 105 && !fields.is_empty() => {133					if let [LogField::String(drv), ..] = &fields[..] {134						let mut drv = drv.as_str();135						if let Some(pkg) = drv.strip_prefix("/nix/store/") {136							let mut it = pkg.splitn(2, '-');137							it.next();138							if let Some(pkg) = it.next() {139								drv = pkg;140							}141						}142						info!(target: "nix","building {}", drv);143						let span = info_span!("build", drv);144						span.pb_start();145						self.spans.insert(id, span);146					} else {147						warn!("bad build log: {:?}", log)148					}149				}150				NixLog::Start {151					ref fields,152					typ,153					id,154					..155				} if typ == 100 && fields.len() >= 3 => {156					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =157						&fields[..]158					{159						let mut drv = drv.as_str();160161						if let Some(pkg) = drv.strip_prefix("/nix/store/") {162							let mut it = pkg.splitn(2, '-');163							it.next();164							if let Some(pkg) = it.next() {165								drv = pkg;166							}167						}168						// info!(target: "nix","copying {} {} -> {}", drv, from, to);169						let span = info_span!("copy", from, to, drv);170						span.pb_start();171						self.spans.insert(id, span);172					} else {173						warn!("bad copy log: {:?}", log)174					}175				}176				NixLog::Start { text, typ, id, .. }177					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>178				{179					if !text.is_empty()180						&& text != "querying info about missing paths"181						&& text != "copying 0 paths"182						// Too much spam on lazy-trees branch183						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))184					{185						let span = info_span!("job");186						span.pb_start();187						span.pb_set_message(&process_message(text.trim()));188						self.spans.insert(id, span);189						info!(target: "nix", "{}", text);190					}191				}192				NixLog::Start {193					text,194					level: 0,195					typ: 108,196					..197				} if text.is_empty() => {198					// Cache lookup? Coupled with copy log199				}200				NixLog::Start {201					text,202					level: 4,203					typ: 109,204					..205				} if text.starts_with("querying info about ") => {206					// Cache lookup207				}208				NixLog::Start {209					text,210					level: 4,211					typ: 101,212					..213				} if text.starts_with("downloading ") => {214					// NAR downloading, coupled with copy log215				}216				NixLog::Start {217					text,218					level: 1,219					typ: 111,220					..221				} if text.starts_with("waiting for a machine to build ") => {222					// Useless repeating notification about build223				}224				NixLog::Start {225					text,226					level: 3,227					typ: 111,228					..229				} if text.starts_with("resolved derivation: ") => {230					// CA resolved231				}232				NixLog::Start {233					text,234					level: 1,235					typ: 111,236					id,237					..238				} if text.starts_with("waiting for lock on ") => {239					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();240					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {241						drv = txt;242					}243					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {244						drv = txt;245					}246					if let Some(txt) = drv.split("', '").next() {247						drv = txt;248					}249					if let Some(pkg) = drv.strip_prefix("/nix/store/") {250						let mut it = pkg.splitn(2, '-');251						it.next();252						if let Some(pkg) = it.next() {253							drv = pkg;254						}255					}256					let span = info_span!("waiting on drv", drv);257					span.pb_start();258					self.spans.insert(id, span);259					// Concurrent build of the same message260				}261				NixLog::Stop { id, .. } => {262					self.spans.remove(&id);263				}264				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {265					if let Some(span) = self.spans.get(&id) {266						if let LogField::String(s) = &fields[0] {267							span.pb_set_message(&process_message(s.trim()));268						} else {269							warn!("bad fields: {fields:?}");270						}271					} else {272						warn!("unknown result id: {id} {typ} {fields:?}");273					}274					// dbg!(fields, id, typ);275				}276				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {277					if let Some(span) = self.spans.get(&id) {278						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =279							&fields[..4]280						{281							span.pb_set_length(*expected);282							span.pb_set_position(*done);283						} else {284							warn!("bad fields: {fields:?}");285						}286					} else {287						// warn!("unknown result id: {id} {typ} {fields:?}");288						// Unaccounted progress.289					}290					// dbg!(fields, id, typ);291				}292				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {293					// Set phase, expected294				}295				_ => warn!("unknown log: {:?}", log),296			};297		} else {298			let e = e.trim();299			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {300				return;301			}302			info!("{e}")303		}304	}305}
addedcrates/better-command/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+	left + right
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn it_works() {
+		let result = add(2, 2);
+		assert_eq!(result, 4);
+	}
+}