git.delta.rocks / jrsonnet / refs/commits / 7c6930a6bff0

difftreelog

refactor remove shell-outs for ssh

Yaroslav Bolyukin2023-12-29parent: #904d121.patch.diff
in: trunk

15 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
 checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
 
 [[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
 name = "bitflags"
 version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
  "anyhow",
  "async-trait",
  "base64 0.21.5",
+ "better-command",
  "chrono",
  "clap",
  "futures",
@@ -1510,9 +1523,9 @@
 
 [[package]]
 name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
 
 [[package]]
 name = "opaque-debug"
@@ -1922,6 +1935,10 @@
 checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
 [[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
 name = "rnix"
 version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
 
 [[package]]
 name = "serde"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
 dependencies = [
  "serde_derive",
 ]
@@ -2123,9 +2140,9 @@
 
 [[package]]
 name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2134,9 +2151,9 @@
 
 [[package]]
 name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
 dependencies = [
  "itoa",
  "ryu",
@@ -2527,11 +2544,10 @@
 
 [[package]]
 name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
 dependencies = [
- "cfg-if",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
@@ -2539,9 +2555,9 @@
 
 [[package]]
 name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2550,9 +2566,9 @@
 
 [[package]]
 name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
 dependencies = [
  "once_cell",
  "valuable",
@@ -2560,9 +2576,9 @@
 
 [[package]]
 name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
 dependencies = [
  "indicatif",
  "tracing",
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
 [workspace]
 members = ["crates/*", "cmds/*"]
 resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
 edition = "2021"
 
 [dependencies]
+nixlike.workspace = true
+better-command.workspace = true
 anyhow = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -15,7 +17,6 @@
 hostname = "0.3.1"
 age-core = "0.9.0"
 peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
 age = { version = "0.9.2", features = ["ssh", "armor"] }
 base64 = "0.21.5"
 chrono = { version = "0.4.31", features = ["serde"] }
modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
 use std::collections::HashMap;
 use std::ffi::{OsStr, OsString};
 use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
 use std::sync::{Arc, OnceLock};
 
 use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
 use futures::StreamExt;
 use itertools::Itertools;
 use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
 use tokio::io::AsyncWriteExt;
 use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
 use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
 use tokio_util::codec::{FramedRead, LinesCodec};
 use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
 
 const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
 
@@ -30,6 +32,8 @@
 	string_wrapping: (String, String),
 	number_wrapping: (String, String),
 
+	executing_command: Arc<Mutex<()>>,
+
 	next_id: u32,
 	free_list: Vec<u32>,
 }
@@ -219,6 +223,8 @@
 			string_wrapping: Default::default(),
 			number_wrapping: Default::default(),
 
+			executing_command: Arc::new(Mutex::new(())),
+
 			next_id: 0,
 			free_list: vec![],
 		};
@@ -331,6 +337,10 @@
 		expr: impl AsRef<[u8]>,
 		err_handler: &mut dyn Handler,
 	) -> Result<String> {
+		// Prevent two commands from being executed in parallel, messing with each other.
+		let _lock = self.executing_command.clone();
+		let _guard = _lock.lock().await;
+
 		self.send_command(expr).await?;
 		// It will be echoed
 		self.send_command(REPL_DELIMITER).await?;
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
 use std::{env::current_dir, time::Duration};
 
 use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
 use crate::nix_go;
 use anyhow::{anyhow, Result};
 use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
 use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
@@ -112,12 +112,12 @@
 	current: bool,
 	datetime: String,
 }
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
-	let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+	let mut cmd = host.cmd("nix-env").await?;
 	cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 		.arg("--list-generations");
 	// Sudo is required due to --list-generations acquiring lock on the profile.
-	let data = config.run_string_on(host, cmd, true).await?;
+	let data = cmd.sudo().run_string().await?;
 	let generations = data
 		.split('\n')
 		.map(|e| e.trim())
@@ -161,25 +161,12 @@
 		.map_err(|_e| anyhow!("bad list-generations output"))?
 		.ok_or_else(|| anyhow!("failed to find generation"))?;
 	Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("stop").arg(unit);
-	config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("start").arg(unit);
-	config.run_on(host, cmd, true).await
 }
 
 async fn execute_upload(
 	build: &BuildSystems,
-	config: &Config,
 	action: UploadAction,
-	host: &str,
+	host: &ConfigHost,
 	built: PathBuf,
 ) -> Result<()> {
 	let mut failed = false;
@@ -191,15 +178,15 @@
 	if !build.disable_rollback {
 		let _span = info_span!("preparing").entered();
 		info!("preparing for rollback");
-		let generation = get_current_generation(config, host).await?;
+		let generation = get_current_generation(&host).await?;
 		info!(
 			"rollback target would be {} {}",
 			generation.id, generation.datetime
 		);
 		{
-			let mut cmd = MyCommand::new("sh");
+			let mut cmd = host.cmd("sh").await?;
 			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to set rollback marker: {e}");
 				failed = true;
 			}
@@ -215,37 +202,41 @@
 		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
 		// Anyway, reboot will still help in this case.
 		if action.should_schedule_rollback_run() {
-			let mut cmd = MyCommand::new("systemd-run");
+			let mut cmd = host.cmd("systemd-run").await?;
 			cmd.comparg("--on-active", "3min")
 				.comparg("--unit", "rollback-watchdog-run")
 				.arg("systemctl")
 				.arg("start")
 				.arg("rollback-watchdog.service");
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to schedule rollback run: {e}");
 				failed = true;
 			}
 		}
 	}
+
 	if action.should_switch_profile() && !failed {
 		info!("switching generation");
-		let mut cmd = MyCommand::new("nix-env");
+		let mut cmd = host.cmd("nix-env").await?;
 		cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 			.comparg("--set", &built);
-		if let Err(e) = config.run_on(host, cmd, true).await {
+		if let Err(e) = cmd.sudo().run().await {
 			error!("failed to switch generation: {e}");
 			failed = true;
 		}
 	}
+
+	// FIXME: Connection might be disconnected after activation run
+
 	if action.should_activate() && !failed {
 		let _span = info_span!("activating").entered();
 		info!("executing activation script");
 		let mut switch_script = built.clone();
 		switch_script.push("bin");
 		switch_script.push("switch-to-configuration");
-		let mut cmd = MyCommand::new(switch_script);
+		let mut cmd = host.cmd(switch_script).await?;
 		cmd.arg(action.name());
-		if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+		if let Err(e) = cmd.sudo().run().in_current_span().await {
 			error!("failed to activate: {e}");
 			failed = true;
 		}
@@ -253,7 +244,8 @@
 	if !build.disable_rollback {
 		if failed {
 			info!("executing rollback");
-			if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+			if let Err(e) = host
+				.systemctl_start("rollback-watchdog.service")
 				.instrument(info_span!("rollback"))
 				.await
 			{
@@ -261,27 +253,29 @@
 			}
 		} else {
 			info!("trying to mark upgrade as successful");
-			let mut cmd = MyCommand::new("rm");
-			cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-			if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+			if let Err(e) = host
+				.rm_file("/etc/fleet_rollback_marker", true)
+				.in_current_span()
+				.await
+			{
 				error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
 			}
 		}
 		info!("disarming watchdog, just in case");
-		if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+		if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
 			// It is ok, if there was no reboot - then timer might not be running.
 		}
 		if action.should_schedule_rollback_run() {
-			if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+			if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
 				error!("failed to disarm rollback run: {e}");
 			}
 		}
-	} else {
-		let mut cmd = MyCommand::new("rm");
-		cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-		if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
-			// Marker might not exist, yet better try to remove it.
-		}
+	} else if let Err(_e) = host
+		.rm_file("/etc/fleet_rollback_marker", true)
+		.in_current_span()
+		.await
+	{
+		// Marker might not exist, yet better try to remove it.
 	}
 	Ok(())
 }
@@ -289,12 +283,13 @@
 impl BuildSystems {
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
+		let host = config.host(&host).await?;
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
 		let drv = nix_go!(
 			fleet_field.buildSystems(Obj {
 				localSystem: { config.local_system.clone() }
-			})[{ action.build_attr() }][{ host }]
+			})[{ action.build_attr() }][{ &host.name }]
 		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
 
 		match action {
 			Action::Upload { action } => {
-				if !config.is_local(&host) {
+				if !config.is_local(&host.name) {
 					info!("uploading system closure");
 					{
+						// TODO: Move to remote_derivation method.
 						// Alternatively, nix store make-content-addressed can be used,
 						// at least for the first deployment, to provide trusted store key.
 						//
@@ -329,13 +325,11 @@
 					}
 					let mut tries = 0;
 					loop {
-						let mut nix = MyCommand::new("nix");
-						nix.arg("copy")
-							.arg("--substitute-on-destination")
-							.comparg("--to", format!("ssh-ng://{host}"))
-							.arg(out_output);
-						match nix.run_nix().await {
-							Ok(()) => break,
+						match host.remote_derivation(out_output).await {
+							Ok(remote) => {
+								assert!(&remote == out_output, "CA derivations aren't implemented");
+								break;
+							}
 							Err(e) if tries < 3 => {
 								tries += 1;
 								warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
 					}
 				}
 				if let Some(action) = action {
-					execute_upload(&self, &config, action, &host, out_output.clone()).await?
+					execute_upload(&self, action, &host, out_output.clone()).await?
 				}
 			}
 			Action::Package(PackageAction::SdImage) => {
 				let mut out = current_dir()?;
-				out.push(format!("sd-image-{}", host));
+				out.push(format!("sd-image-{}", host.name));
 
 				info!("linking sd image to {:?}", out);
 				symlink(out_output, out)?;
 			}
 			Action::Package(PackageAction::InstallationCd) => {
 				let mut out = current_dir()?;
-				out.push(format!("installation-cd-{}", host));
+				out.push(format!("installation-cd-{}", host.name));
 
 				info!("linking iso image to {:?}", out);
 				symlink(out_output, out)?;
@@ -379,6 +373,17 @@
 			let this = this.clone();
 			let span = info_span!("deployment", host = field::display(&host.name));
 			let hostname = host.name;
+			// FIXME: Since the introduction of better-nix-eval,
+			// due to single repl used for builds, hosts are waiting for each other to build,
+			// instead of building concurrently.
+			//
+			// Open multiple repls?
+			//
+			// Create build batcher, which will behave similar to golangs
+			// WaitGroup, and start executing once all the build tasks are scheduled?
+			// This also allows to cleanup build output, as there will be no longer
+			// "waiting for remote machine" messages in the cases when one package is needed for
+			// multiple hosts.
 			set.spawn_local(
 				(async move {
 					match this.build_task(config, hostname).await {
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use chrono::{DateTime, Utc};
 use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use itertools::Itertools;
 use owo_colors::OwoColorize;
 use std::{
@@ -404,9 +404,8 @@
 					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 				if let Some(data) = secret.secret.secret {
-					let encrypted = config
-						.reencrypt_on_host(identity_holder, data, target_recipients)
-						.await?;
+					let host = config.host(&identity_holder).await?;
+					let encrypted = host.reencrypt(data, target_recipients).await?;
 					secret.secret.secret = Some(encrypted);
 				}
 
@@ -481,9 +480,8 @@
 							target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 						if let Some(secret) = data.secret.secret {
-							let encrypted = config
-								.reencrypt_on_host(identity_holder, secret, target_recipients)
-								.await?;
+							let host = config.host(identity_holder).await?;
+							let encrypted = host.reencrypt(secret, target_recipients).await?;
 
 							data.secret.secret = Some(encrypted);
 						}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
before · cmds/fleet/src/command.rs
1use std::{2	collections::HashMap,3	ffi::OsStr,4	pin,5	process::Stdio,6	sync::{Arc, Mutex},7	task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, OwningCommand, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25		out.push_str(input);26		return;27	}28	out.push('\'');29	for (i, v) in input.split('\'').enumerate() {30		if i != 0 {31			out.push_str("'\"'\"'");32		}33		out.push_str(v);34	}35	out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38	os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42	command: String,43	args: Vec<String>,44	env: Vec<(String, String)>,45	ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48	pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {49		assert!(!cmd.as_ref().is_empty());50		Self {51			command: ostoutf8(cmd),52			args: vec![],53			env: vec![],54			ssh_session: Some(session),55		}56	}57	pub fn new(cmd: impl AsRef<OsStr>) -> Self {58		assert!(!cmd.as_ref().is_empty());59		Self {60			command: ostoutf8(cmd),61			args: vec![],62			env: vec![],63			ssh_session: None,64		}65	}66	fn into_args(self) -> Vec<String> {67		let mut out = Vec::new();68		if !self.env.is_empty() {69			out.push("env".to_owned());70			for (k, v) in self.env {71				assert!(!k.contains('='));72				out.push(format!("{k}={v}"));73			}74		}75		out.push(self.command);76		out.extend(self.args);77		out78	}7980	/// Translates environment variables into env command execution.81	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).82	///83	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.84	/// Figure out some way to transfer environment using stdio?85	fn translate_env_into_env(self) -> Self {86		if self.env.is_empty() {87			return self;88		}89		let mut out = Self::new("env");90		if let Some(session) = self.ssh_session {91			out = out.ssh_session(session);92		}93		for (k, v) in self.env {94			assert!(!k.contains('='));95			out.arg(format!("{k}={v}"));96		}97		out.arg(self.command);98		out.args(self.args);99100		out101	}102	fn into_string(self) -> String {103		let mut out = String::new();104		if !self.env.is_empty() {105			out.push_str("env");106			for (k, v) in self.env {107				out.push(' ');108				assert!(!k.contains('='));109				escape_bash(&k, &mut out);110				out.push('=');111				escape_bash(&v, &mut out);112			}113		}114		if !out.is_empty() {115			out.push(' ');116		}117		escape_bash(&self.command, &mut out);118		for arg in self.args {119			out.push(' ');120			escape_bash(&arg, &mut out);121		}122		out123	}124	fn into_command(self) -> Command {125		let mut out = Command::new(self.command);126		out.args(self.args);127		for (k, v) in self.env {128			out.env(k, v);129		}130		out131	}132	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {133		Ok(if let Some(session) = self.ssh_session.clone() {134			let cmd = self.translate_env_into_env().into_command();135			Either::Right(136				cmd.over_ssh(session)137					.map_err(|e| anyhow!("ssh error: {e}"))?,138			)139		} else {140			let cmd = self.into_command();141			Either::Left(cmd)142		})143	}144	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {145		let arg = arg.as_ref();146		self.args.push(ostoutf8(arg));147		self148	}149	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {150		let arg = arg.as_ref();151		let value = value.as_ref();152		let arg = ostoutf8(arg);153		let value = ostoutf8(value);154		self.arg(format!("{arg}={value}"));155		self156	}157	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158		self.arg(arg);159		self.arg(value);160		self161	}162	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {163		self.env164			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));165		self166	}167	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {168		for arg in args.into_iter() {169			let arg = arg.as_ref();170			self.args.push(ostoutf8(arg));171		}172		self173	}174	pub fn sudo(mut self) -> Self {175		if std::env::var_os("NO_SUDO").is_some() {176			let mut out = Self::new("su");177			out.ssh_session = self.ssh_session.take();178			out.arg("-c").arg(self.into_string());179			out180		} else {181			let mut out = Self::new("sudo");182			out.args(self.into_args());183			out184		}185	}186	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {187		self.ssh_session = Some(on);188		self189	}190	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {191		let mut out = Self::new("ssh");192		out.ssh_session = self.ssh_session.take();193		out.arg(on).arg("--");194		out.arg(self.into_string());195		out196	}197198	pub async fn run(self) -> Result<()> {199		let str = self.clone().into_string();200		let cmd = self.into_command_new()?;201		match cmd {202			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,203			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,204		};205		Ok(())206	}207	pub async fn run_string(self) -> Result<String> {208		let bytes = self.run_bytes().await?;209		Ok(String::from_utf8(bytes)?)210	}211	pub async fn run_bytes(self) -> Result<Vec<u8>> {212		let str = self.clone().into_string();213		let cmd = self.into_command_new()?;214		let v = match cmd {215			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,216			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,217		};218		Ok(v)219	}220221	pub async fn run_nix_string(self) -> Result<String> {222		let str = self.clone().into_string();223		let mut cmd = self.into_command();224		cmd.arg("--log-format").arg("internal-json");225		let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;226		Ok(String::from_utf8(bytes)?)227	}228	pub async fn run_nix(self) -> Result<()> {229		let str = self.clone().into_string();230		let mut cmd = self.into_command();231		cmd.arg("--log-format").arg("internal-json");232		cmd.stdout(Stdio::inherit());233		run_nix_inner(str, cmd, &mut NixHandler::default()).await234	}235}236237struct EmptyAsyncRead;238impl AsyncRead for EmptyAsyncRead {239	fn poll_read(240		self: std::pin::Pin<&mut Self>,241		_cx: &mut std::task::Context<'_>,242		_buf: &mut tokio::io::ReadBuf<'_>,243	) -> Poll<std::io::Result<()>> {244		Poll::Pending245	}246}247248async fn run_nix_inner_stdout(249	str: String,250	cmd: Command,251	handler: &mut dyn Handler,252) -> Result<Vec<u8>> {253	Ok(run_nix_inner_raw(str, cmd, true, handler, None)254		.await?255		.expect("has out"))256}257async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {258	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;259	assert!(v.is_none());260	Ok(())261}262async fn run_nix_inner_stdout_ssh(263	str: String,264	cmd: OwningCommand<Arc<Session>>,265	handler: &mut dyn Handler,266) -> Result<Vec<u8>> {267	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)268		.await?269		.expect("has out"))270}271async fn run_nix_inner_ssh(272	str: String,273	cmd: OwningCommand<Arc<Session>>,274	handler: &mut dyn Handler,275) -> Result<()> {276	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;277	assert!(v.is_none());278	Ok(())279}280281pub trait Handler: Send {282	fn handle_line(&mut self, e: &str);283}284285pub struct ClonableHandler<H>(Arc<Mutex<H>>);286impl<H> Clone for ClonableHandler<H> {287	fn clone(&self) -> Self {288		Self(self.0.clone())289	}290}291impl<H> ClonableHandler<H> {292	pub fn new(inner: H) -> Self {293		Self(Arc::new(Mutex::new(inner)))294	}295}296impl<H: Handler> Handler for ClonableHandler<H> {297	fn handle_line(&mut self, e: &str) {298		self.0.lock().unwrap().handle_line(e)299	}300}301302struct PlainHandler;303impl Handler for PlainHandler {304	fn handle_line(&mut self, e: &str) {305		info!(target: "log", "{e}");306	}307}308309pub struct NoopHandler;310impl Handler for NoopHandler {311	fn handle_line(&mut self, _e: &str) {}312}313314#[derive(Default)]315pub struct NixHandler {316	spans: HashMap<u64, Span>,317}318fn process_message(m: &str) -> String {319	static OSC_CLEANER: Lazy<Regex> =320		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());321	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());322	let m = OSC_CLEANER.replace_all(m, "");323	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,324	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.325	DETABBER.replace_all(m.as_ref(), "  ").to_string()326}327impl Handler for NixHandler {328	fn handle_line(&mut self, e: &str) {329		if let Some(e) = e.strip_prefix("@nix ") {330			let log: NixLog = match serde_json::from_str(e) {331				Ok(l) => l,332				Err(err) => {333					warn!("failed to parse nix log line {:?}: {}", e, err);334					return;335				}336			};337			match log {338				NixLog::Msg { msg, raw_msg, .. } => {339					#[allow(clippy::nonminimal_bool)]340					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))341					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")342					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {343						if let Some(raw_msg) = raw_msg {344							if !msg.is_empty() {345								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())346							} else {347								info!(target: "nix", "{}", raw_msg.trim_end())348							}349						} else {350							info!(target: "nix", "{}", msg.trim_end())351						}352					}353				}354				NixLog::Start {355					ref fields,356					typ,357					id,358					..359				} if typ == 105 && !fields.is_empty() => {360					if let [LogField::String(drv), ..] = &fields[..] {361						let mut drv = drv.as_str();362						if let Some(pkg) = drv.strip_prefix("/nix/store/") {363							let mut it = pkg.splitn(2, '-');364							it.next();365							if let Some(pkg) = it.next() {366								drv = pkg;367							}368						}369						info!(target: "nix","building {}", drv);370						let span = info_span!("build", drv);371						span.pb_start();372						self.spans.insert(id, span);373					} else {374						warn!("bad build log: {:?}", log)375					}376				}377				NixLog::Start {378					ref fields,379					typ,380					id,381					..382				} if typ == 100 && fields.len() >= 3 => {383					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =384						&fields[..]385					{386						let mut drv = drv.as_str();387388						if let Some(pkg) = drv.strip_prefix("/nix/store/") {389							let mut it = pkg.splitn(2, '-');390							it.next();391							if let Some(pkg) = it.next() {392								drv = pkg;393							}394						}395						// info!(target: "nix","copying {} {} -> {}", drv, from, to);396						let span = info_span!("copy", from, to, drv);397						span.pb_start();398						self.spans.insert(id, span);399					} else {400						warn!("bad copy log: {:?}", log)401					}402				}403				NixLog::Start { text, typ, id, .. }404					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>405				{406					if !text.is_empty()407						&& text != "querying info about missing paths"408						&& text != "copying 0 paths"409						// Too much spam on lazy-trees branch410						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))411					{412						let span = info_span!("job");413						span.pb_start();414						span.pb_set_message(&process_message(text.trim()));415						self.spans.insert(id, span);416						info!(target: "nix", "{}", text);417					}418				}419				NixLog::Start {420					text,421					level: 0,422					typ: 108,423					..424				} if text.is_empty() => {425					// Cache lookup? Coupled with copy log426				}427				NixLog::Start {428					text,429					level: 4,430					typ: 109,431					..432				} if text.starts_with("querying info about ") => {433					// Cache lookup434				}435				NixLog::Start {436					text,437					level: 4,438					typ: 101,439					..440				} if text.starts_with("downloading ") => {441					// NAR downloading, coupled with copy log442				}443				NixLog::Start {444					text,445					level: 1,446					typ: 111,447					..448				} if text.starts_with("waiting for a machine to build ") => {449					// Useless repeating notification about build450				}451				NixLog::Start {452					text,453					level: 3,454					typ: 111,455					..456				} if text.starts_with("resolved derivation: ") => {457					// CA resolved458				}459				NixLog::Start {460					text,461					level: 1,462					typ: 111,463					id,464					..465				} if text.starts_with("waiting for lock on ") => {466					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();467					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {468						drv = txt;469					}470					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {471						drv = txt;472					}473					if let Some(txt) = drv.split("', '").next() {474						drv = txt;475					}476					if let Some(pkg) = drv.strip_prefix("/nix/store/") {477						let mut it = pkg.splitn(2, '-');478						it.next();479						if let Some(pkg) = it.next() {480							drv = pkg;481						}482					}483					let span = info_span!("waiting on drv", drv);484					span.pb_start();485					self.spans.insert(id, span);486					// Concurrent build of the same message487				}488				NixLog::Stop { id, .. } => {489					self.spans.remove(&id);490				}491				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {492					if let Some(span) = self.spans.get(&id) {493						if let LogField::String(s) = &fields[0] {494							span.pb_set_message(&process_message(s.trim()));495						} else {496							warn!("bad fields: {fields:?}");497						}498					} else {499						warn!("unknown result id: {id} {typ} {fields:?}");500					}501					// dbg!(fields, id, typ);502				}503				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {504					if let Some(span) = self.spans.get(&id) {505						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =506							&fields[..4]507						{508							span.pb_set_length(*expected);509							span.pb_set_position(*done);510						} else {511							warn!("bad fields: {fields:?}");512						}513					} else {514						// warn!("unknown result id: {id} {typ} {fields:?}");515						// Unaccounted progress.516					}517					// dbg!(fields, id, typ);518				}519				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {520					// Set phase, expected521				}522				_ => warn!("unknown log: {:?}", log),523			};524		} else {525			let e = e.trim();526			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {527				return;528			}529			info!("{e}")530		}531	}532}533534async fn run_nix_inner_raw(535	str: String,536	mut cmd: Command,537	want_stdout: bool,538	err_handler: &mut dyn Handler,539	mut out_handler: Option<&mut dyn Handler>,540) -> Result<Option<Vec<u8>>> {541	cmd.stderr(Stdio::piped());542	cmd.stdout(Stdio::piped());543	let mut child = cmd.spawn()?;544	let mut stderr = child.stderr.take().unwrap();545	let stdout = child.stdout.take().unwrap();546	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());547	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));548	let mut ob = want_stdout549		.then(|| out.take().unwrap())550		.unwrap_or_else(|| Box::new(EmptyAsyncRead));551	let mut ol = (!want_stdout)552		.then(|| out.take().unwrap())553		.unwrap_or_else(|| Box::new(EmptyAsyncRead));554	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());555	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());556557	// while let Some(line) = read.next().await? {}558559	let mut out_buf = if want_stdout { Some(vec![]) } else { None };560	loop {561		select! {562			e = err.next() => {563				if let Some(e) = e {564					let e = e?;565					err_handler.handle_line(&e);566				}567			},568			o = ob.next() => {569				if let Some(o) = o {570					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);571				}572			},573			o = ol.next() => {574				if let Some(o) = o {575					let o = o?;576					if let Some(out) = out_handler.as_mut() {577						out.handle_line(&o)578					} else {579						err_handler.handle_line(&o)580					}581					// out_handler.handle_info(&o);582				}583			},584			code = child.wait() => {585				let code = code?;586				if !code.success() {587					anyhow::bail!("command '{str}' failed with status {}", code);588				}589				break;590			}591		}592	}593594	Ok(out_buf)595}596async fn run_nix_inner_raw_ssh(597	str: String,598	mut cmd: OwningCommand<Arc<Session>>,599	want_stdout: bool,600	err_handler: &mut dyn Handler,601	mut out_handler: Option<&mut dyn Handler>,602) -> Result<Option<Vec<u8>>> {603	cmd.stderr(openssh::Stdio::piped());604	cmd.stdout(openssh::Stdio::piped());605	let mut child = cmd.spawn().await?;606	let mut stderr = child.stderr().take().unwrap();607	let stdout = child.stdout().take().unwrap();608	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());609	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));610	let mut ob = want_stdout611		.then(|| out.take().unwrap())612		.unwrap_or_else(|| Box::new(EmptyAsyncRead));613	let mut ol = (!want_stdout)614		.then(|| out.take().unwrap())615		.unwrap_or_else(|| Box::new(EmptyAsyncRead));616	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());617	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());618619	// while let Some(line) = read.next().await? {}620621	let mut out_buf = if want_stdout { Some(vec![]) } else { None };622623	let mut wait_future = pin::pin!(child.wait());624	loop {625		select! {626			e = err.next() => {627				if let Some(e) = e {628					let e = e?;629					err_handler.handle_line(&e);630				}631			},632			o = ob.next() => {633				if let Some(o) = o {634					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);635				}636			},637			o = ol.next() => {638				if let Some(o) = o {639					let o = o?;640					if let Some(out) = out_handler.as_mut() {641						out.handle_line(&o)642					} else {643						err_handler.handle_line(&o)644					}645					// out_handler.handle_info(&o);646				}647			},648			code = &mut wait_future => {649				let code = code?;650				if !code.success() {651					anyhow::bail!("command '{str}' failed with status {}", code);652				}653				break;654			}655		}656	}657658	Ok(out_buf)659}660661pub trait ErrorRecorder: Send {662	/// Return true to discard message from logging663	fn push_message(&mut self, msg: &str) -> bool;664}665666#[derive(Debug)]667enum LogField {668	String(String),669	Num(u64),670}671672impl<'de> Deserialize<'de> for LogField {673	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>674	where675		D: serde::Deserializer<'de>,676	{677		struct StringOrNum;678		impl<'de> Visitor<'de> for StringOrNum {679			type Value = LogField;680681			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {682				write!(f, "string or unsigned")683			}684685			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>686			where687				E: serde::de::Error,688			{689				Ok(LogField::String(v.to_owned()))690			}691692			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>693			where694				E: serde::de::Error,695			{696				Ok(LogField::Num(v))697			}698		}699700		deserializer.deserialize_any(StringOrNum)701	}702}703704#[derive(Deserialize, Debug)]705#[serde(rename_all = "camelCase", tag = "action")]706#[allow(dead_code)]707enum NixLog {708	Msg {709		level: u32,710		msg: String,711		raw_msg: Option<String>,712	},713	Start {714		id: u64,715		level: u32,716		#[serde(default)]717		fields: Vec<LogField>,718		text: String,719		#[serde(rename = "type")]720		typ: u32,721	},722	Stop {723		id: u64,724	},725	Result {726		id: u64,727		#[serde(rename = "type")]728		typ: u32,729		#[serde(default)]730		fields: Vec<LogField>,731	},732}
after · cmds/fleet/src/command.rs
1use std::thread::sleep;2use std::time::Duration;3use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};45use anyhow::{anyhow, Result};6use better_command::{Handler, NixHandler, PlainHandler};7use futures::StreamExt;8use itertools::Either;9use openssh::{OverSsh, OwningCommand, Session};10use tokio::{io::AsyncRead, process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, debug};1314fn escape_bash(input: &str, out: &mut String) {15	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17		out.push_str(input);18		return;19	}20	out.push('\'');21	for (i, v) in input.split('\'').enumerate() {22		if i != 0 {23			out.push_str("'\"'\"'");24		}25		out.push_str(v);26	}27	out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30	os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34	command: String,35	args: Vec<String>,36	env: Vec<(String, String)>,37	ssh_session: Option<Arc<Session>>,38}39impl MyCommand {40	pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {41		assert!(!cmd.as_ref().is_empty());42		Self {43			command: ostoutf8(cmd),44			args: vec![],45			env: vec![],46			ssh_session: Some(session),47		}48	}49	pub fn new(cmd: impl AsRef<OsStr>) -> Self {50		assert!(!cmd.as_ref().is_empty());51		Self {52			command: ostoutf8(cmd),53			args: vec![],54			env: vec![],55			ssh_session: None,56		}57	}58	fn into_args(self) -> Vec<String> {59		let mut out = Vec::new();60		if !self.env.is_empty() {61			out.push("env".to_owned());62			for (k, v) in self.env {63				assert!(!k.contains('='));64				out.push(format!("{k}={v}"));65			}66		}67		out.push(self.command);68		out.extend(self.args);69		out70	}7172	/// Translates environment variables into env command execution.73	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).74	///75	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.76	/// Figure out some way to transfer environment using stdio?77	fn translate_env_into_env(self) -> Self {78		if self.env.is_empty() {79			return self;80		}81		let mut out = Self::new("env");82		out.ssh_session = self.ssh_session;83		for (k, v) in self.env {84			assert!(!k.contains('='));85			out.arg(format!("{k}={v}"));86		}87		out.arg(self.command);88		out.args(self.args);8990		out91	}92	fn into_string(self) -> String {93		let mut out = String::new();94		if !self.env.is_empty() {95			out.push_str("env");96			for (k, v) in self.env {97				out.push(' ');98				assert!(!k.contains('='));99				escape_bash(&k, &mut out);100				out.push('=');101				escape_bash(&v, &mut out);102			}103		}104		if !out.is_empty() {105			out.push(' ');106		}107		escape_bash(&self.command, &mut out);108		for arg in self.args {109			out.push(' ');110			escape_bash(&arg, &mut out);111		}112		out113	}114	fn into_command(self) -> Command {115		let mut out = Command::new(self.command);116		out.args(self.args);117		for (k, v) in self.env {118			out.env(k, v);119		}120		out121	}122	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {123		Ok(if let Some(session) = self.ssh_session.clone() {124			let cmd = self.translate_env_into_env().into_command();125			Either::Right(126				cmd.over_ssh(session)127					.map_err(|e| anyhow!("ssh error: {e}"))?,128			)129		} else {130			let cmd = self.into_command();131			Either::Left(cmd)132		})133	}134	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {135		let arg = arg.as_ref();136		self.args.push(ostoutf8(arg));137		self138	}139	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {140		let arg = arg.as_ref();141		let value = value.as_ref();142		let arg = ostoutf8(arg);143		let value = ostoutf8(value);144		self.arg(format!("{arg}={value}"));145		self146	}147	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {148		self.arg(arg);149		self.arg(value);150		self151	}152	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {153		self.env154			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));155		self156	}157	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {158		for arg in args.into_iter() {159			let arg = arg.as_ref();160			self.args.push(ostoutf8(arg));161		}162		self163	}164	pub fn sudo(mut self) -> Self {165		if std::env::var_os("NO_SUDO").is_some() {166			let mut out = Self::new("su");167			out.ssh_session = self.ssh_session.take();168			out.arg("-c").arg(self.into_string());169			out170		} else {171			let mut out = Self::new("sudo");172			out.ssh_session = self.ssh_session.take();173			out.args(self.into_args());174			out175		}176	}177178	pub async fn run(self) -> Result<()> {179		let str = self.clone().into_string();180		let cmd = self.into_command_new()?;181		match cmd {182			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,183			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,184		};185		Ok(())186	}187	pub async fn run_string(self) -> Result<String> {188		let bytes = self.run_bytes().await?;189		Ok(String::from_utf8(bytes)?)190	}191	pub async fn run_bytes(self) -> Result<Vec<u8>> {192		let str = self.clone().into_string();193		let cmd = self.into_command_new()?;194		let v = match cmd {195			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,196			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,197		};198		Ok(v)199	}200201	pub async fn run_nix_string(self) -> Result<String> {202		let str = self.clone().into_string();203		let mut cmd = self.into_command();204		cmd.arg("--log-format").arg("internal-json");205		let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;206		Ok(String::from_utf8(bytes)?)207	}208	pub async fn run_nix(self) -> Result<()> {209		let str = self.clone().into_string();210		let mut cmd = self.into_command();211		cmd.arg("--log-format").arg("internal-json");212		cmd.stdout(Stdio::inherit());213		run_nix_inner(str, cmd, &mut NixHandler::default()).await214	}215}216217struct EmptyAsyncRead;218impl AsyncRead for EmptyAsyncRead {219	fn poll_read(220		self: std::pin::Pin<&mut Self>,221		_cx: &mut std::task::Context<'_>,222		_buf: &mut tokio::io::ReadBuf<'_>,223	) -> Poll<std::io::Result<()>> {224		Poll::Pending225	}226}227228async fn run_nix_inner_stdout(229	str: String,230	cmd: Command,231	handler: &mut dyn Handler,232) -> Result<Vec<u8>> {233	Ok(run_nix_inner_raw(str, cmd, true, handler, None)234		.await?235		.expect("has out"))236}237async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {238	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;239	assert!(v.is_none());240	Ok(())241}242async fn run_nix_inner_stdout_ssh(243	str: String,244	cmd: OwningCommand<Arc<Session>>,245	handler: &mut dyn Handler,246) -> Result<Vec<u8>> {247	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)248		.await?249		.expect("has out"))250}251async fn run_nix_inner_ssh(252	str: String,253	cmd: OwningCommand<Arc<Session>>,254	handler: &mut dyn Handler,255) -> Result<()> {256	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;257	assert!(v.is_none());258	Ok(())259}260261async fn run_nix_inner_raw(262	str: String,263	mut cmd: Command,264	want_stdout: bool,265	err_handler: &mut dyn Handler,266	mut out_handler: Option<&mut dyn Handler>,267) -> Result<Option<Vec<u8>>> {268	cmd.stderr(Stdio::piped());269	cmd.stdout(Stdio::piped());270	debug!("running command {cmd:?} on local");271	let mut child = cmd.spawn()?;272	let mut stderr = child.stderr.take().unwrap();273	let stdout = child.stdout.take().unwrap();274	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());275	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));276	let mut ob = want_stdout277		.then(|| out.take().unwrap())278		.unwrap_or_else(|| Box::new(EmptyAsyncRead));279	let mut ol = (!want_stdout)280		.then(|| out.take().unwrap())281		.unwrap_or_else(|| Box::new(EmptyAsyncRead));282	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());283	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());284285	// while let Some(line) = read.next().await? {}286287	let mut out_buf = if want_stdout { Some(vec![]) } else { None };288	loop {289		select! {290			e = err.next() => {291				if let Some(e) = e {292					let e = e?;293					err_handler.handle_line(&e);294				}295			},296			o = ob.next() => {297				if let Some(o) = o {298					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);299				}300			},301			o = ol.next() => {302				if let Some(o) = o {303					let o = o?;304					if let Some(out) = out_handler.as_mut() {305						out.handle_line(&o)306					} else {307						err_handler.handle_line(&o)308					}309					// out_handler.handle_info(&o);310				}311			},312			code = child.wait() => {313				let code = code?;314				if !code.success() {315					anyhow::bail!("command '{str}' failed with status {}", code);316				}317				break;318			}319		}320	}321322	Ok(out_buf)323}324async fn run_nix_inner_raw_ssh(325	str: String,326	mut cmd: OwningCommand<Arc<Session>>,327	want_stdout: bool,328	err_handler: &mut dyn Handler,329	mut out_handler: Option<&mut dyn Handler>,330) -> Result<Option<Vec<u8>>> {331	debug!("running command {cmd:?} over ssh");332	cmd.stderr(openssh::Stdio::piped());333	cmd.stdout(openssh::Stdio::piped());334	let mut child = cmd.spawn().await?;335	let mut stderr = child.stderr().take().unwrap();336	let stdout = child.stdout().take().unwrap();337	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());338	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));339	let mut ob = want_stdout340		.then(|| out.take().unwrap())341		.unwrap_or_else(|| Box::new(EmptyAsyncRead));342	let mut ol = (!want_stdout)343		.then(|| out.take().unwrap())344		.unwrap_or_else(|| Box::new(EmptyAsyncRead));345	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());346	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());347348	// while let Some(line) = read.next().await? {}349350	let mut out_buf = if want_stdout { Some(vec![]) } else { None };351352	let mut wait_future = pin::pin!(child.wait());353	loop {354		select! {355			e = err.next() => {356				if let Some(e) = e {357					let e = e?;358					err_handler.handle_line(&e);359				}360			},361			o = ob.next() => {362				if let Some(o) = o {363					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);364				}365			},366			o = ol.next() => {367				if let Some(o) = o {368					let o = o?;369					if let Some(out) = out_handler.as_mut() {370						out.handle_line(&o)371					} else {372						err_handler.handle_line(&o)373					}374					// out_handler.handle_info(&o);375				}376			},377			code = &mut wait_future => {378				let code = code?;379				if !code.success() {380					anyhow::bail!("command '{str}' failed with status {}", code);381				}382				break;383			}384		}385	}386387	Ok(out_buf)388}
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
 	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
-use age::Recipient;
 use anyhow::{anyhow, bail, Context, Result};
 use clap::{ArgGroup, Parser};
 use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub local: bool,
 	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+	async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		assert!(!self.local, "do not open ssh connection to local session");
 		// FIXME: TOCTOU
 		if let Some(session) = &self.session.get() {
 			return Ok((*session).clone());
@@ -96,8 +97,12 @@
 		D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
 	}
 	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
-		let session = self.open_session().await?;
-		Ok(MyCommand::new_on(cmd, session))
+		if self.local {
+			Ok(MyCommand::new(cmd))
+		} else {
+			let session = self.open_session().await?;
+			Ok(MyCommand::new_on(cmd, session))
+		}
 	}
 
 	pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
 			.context("failed to call remote host for decrypt")?;
 		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
 	}
+	pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+		for target in targets {
+			cmd.eqarg("--targets", target);
+		}
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 	/// Returns path for futureproofing, as path might change i.e on conversion to CA
 	pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+		if self.local {
+			// Path is located locally, thus already trusted.
+			return Ok(path.to_owned());
+		}
 		let mut nix = MyCommand::new("nix");
 		nix.arg("copy")
 			.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
 		nix.run_nix().await?;
 		Ok(path.to_owned())
 	}
+	pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("stop").arg(name);
+		cmd.sudo().run().await
+	}
+	pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("start").arg(name);
+		cmd.sudo().run().await
+	}
+
+	pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+		let mut cmd = self.cmd("rm").await?;
+		cmd.arg("-f").arg(path);
+		if sudo {
+			cmd = cmd.sudo()
+		}
+		cmd.run().await
+	}
 }
 
 impl Config {
@@ -134,35 +175,12 @@
 	}
 	pub fn is_local(&self, host: &str) -> bool {
 		self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
-	}
-
-	pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run().await
-	}
-	pub async fn run_string_on(
-		&self,
-		host: &str,
-		mut command: MyCommand,
-		sudo: bool,
-	) -> Result<String> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run_string().await
 	}
 
 	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
 		Ok(ConfigHost {
 			name: name.to_owned(),
+			local: self.is_local(name),
 			session: OnceLock::new(),
 		})
 	}
@@ -172,6 +190,7 @@
 		let mut out = vec![];
 		for name in names {
 			out.push(ConfigHost {
+				local: self.is_local(&name),
 				name,
 				session: OnceLock::new(),
 			})
@@ -225,27 +244,6 @@
 		let mut data = self.data_mut();
 		let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
 		host_secrets.insert(secret, value);
-	}
-
-	pub async fn reencrypt_on_host(
-		&self,
-		host: &str,
-		data: SecretData,
-		targets: Vec<String>,
-	) -> Result<SecretData> {
-		let mut recmd = MyCommand::new("fleet-install-secrets");
-		recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
-		for target in targets {
-			recmd.eqarg("--targets", target);
-		}
-		recmd = recmd.sudo().ssh(host);
-		let encoded = recmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		SecretData::decode_z85(&encoded)
 	}
 
 	pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
modifiedcmds/fleet/src/keys.rsdiffbeforeafterboth
--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
 use std::str::FromStr;
 
-use crate::command::MyCommand;
 use crate::host::Config;
 use age::Recipient;
 use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
 			Ok(key)
 		} else {
 			warn!("Loading key for {}", host);
-			let mut cmd = MyCommand::new("cat");
+			let host = self.host(host).await?;
+			let mut cmd = host.cmd("cat").await?;
 			cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
-			let key = self.run_string_on(host, cmd, false).await?;
-			self.update_key(host, key.clone());
+			let key = cmd.run_string().await?;
+			self.update_key(&host.name, key.clone());
 			Ok(key)
 		}
 	}
addedcmds/remowt-agent/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
addedcmds/remowt-agent/README.adocdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
addedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+	println!("Hello, world!");
+}
addedcrates/better-command/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
addedcrates/better-command/src/handler.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+	fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+	fn clone(&self) -> Self {
+		Self(self.0.clone())
+	}
+}
+impl<H> ClonableHandler<H> {
+	pub fn new(inner: H) -> Self {
+		Self(Arc::new(Mutex::new(inner)))
+	}
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+	fn handle_line(&mut self, e: &str) {
+		self.0.lock().unwrap().handle_line(e)
+	}
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+	fn handle_line(&mut self, e: &str) {
+		info!(target: "log", "{e}");
+	}
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+	fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+	spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+	String(String),
+	Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+	Msg {
+		level: u32,
+		msg: String,
+		raw_msg: Option<String>,
+	},
+	Start {
+		id: u64,
+		level: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+		text: String,
+		#[serde(rename = "type")]
+		typ: u32,
+	},
+	Stop {
+		id: u64,
+	},
+	Result {
+		id: u64,
+		#[serde(rename = "type")]
+		typ: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+	},
+}
+fn process_message(m: &str) -> String {
+	// Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+	static OSC_CLEANER: Lazy<Regex> =
+		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+	let m = OSC_CLEANER.replace_all(m, "");
+	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+	DETABBER.replace_all(m.as_ref(), "  ").to_string()
+}
+impl Handler for NixHandler {
+	fn handle_line(&mut self, e: &str) {
+		if let Some(e) = e.strip_prefix("@nix ") {
+			let log: NixLog = match serde_json::from_str(e) {
+				Ok(l) => l,
+				Err(err) => {
+					warn!("failed to parse nix log line {:?}: {}", e, err);
+					return;
+				}
+			};
+			match log {
+				NixLog::Msg { msg, raw_msg, .. } => {
+					#[allow(clippy::nonminimal_bool)]
+					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+						if let Some(raw_msg) = raw_msg {
+							if !msg.is_empty() {
+								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+							} else {
+								info!(target: "nix", "{}", raw_msg.trim_end())
+							}
+						} else {
+							info!(target: "nix", "{}", msg.trim_end())
+						}
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 105 && !fields.is_empty() => {
+					if let [LogField::String(drv), ..] = &fields[..] {
+						let mut drv = drv.as_str();
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						info!(target: "nix","building {}", drv);
+						let span = info_span!("build", drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad build log: {:?}", log)
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 100 && fields.len() >= 3 => {
+					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+						&fields[..]
+					{
+						let mut drv = drv.as_str();
+
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
+						let span = info_span!("copy", from, to, drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad copy log: {:?}", log)
+					}
+				}
+				NixLog::Start { text, typ, id, .. }
+					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+				{
+					if !text.is_empty()
+						&& text != "querying info about missing paths"
+						&& text != "copying 0 paths"
+						// Too much spam on lazy-trees branch
+						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
+					{
+						let span = info_span!("job");
+						span.pb_start();
+						span.pb_set_message(&process_message(text.trim()));
+						self.spans.insert(id, span);
+						info!(target: "nix", "{}", text);
+					}
+				}
+				NixLog::Start {
+					text,
+					level: 0,
+					typ: 108,
+					..
+				} if text.is_empty() => {
+					// Cache lookup? Coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 109,
+					..
+				} if text.starts_with("querying info about ") => {
+					// Cache lookup
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 101,
+					..
+				} if text.starts_with("downloading ") => {
+					// NAR downloading, coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					..
+				} if text.starts_with("waiting for a machine to build ") => {
+					// Useless repeating notification about build
+				}
+				NixLog::Start {
+					text,
+					level: 3,
+					typ: 111,
+					..
+				} if text.starts_with("resolved derivation: ") => {
+					// CA resolved
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					id,
+					..
+				} if text.starts_with("waiting for lock on ") => {
+					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.split("', '").next() {
+						drv = txt;
+					}
+					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+						let mut it = pkg.splitn(2, '-');
+						it.next();
+						if let Some(pkg) = it.next() {
+							drv = pkg;
+						}
+					}
+					let span = info_span!("waiting on drv", drv);
+					span.pb_start();
+					self.spans.insert(id, span);
+					// Concurrent build of the same message
+				}
+				NixLog::Stop { id, .. } => {
+					self.spans.remove(&id);
+				}
+				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+					if let Some(span) = self.spans.get(&id) {
+						if let LogField::String(s) = &fields[0] {
+							span.pb_set_message(&process_message(s.trim()));
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						warn!("unknown result id: {id} {typ} {fields:?}");
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+					if let Some(span) = self.spans.get(&id) {
+						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+							&fields[..4]
+						{
+							span.pb_set_length(*expected);
+							span.pb_set_position(*done);
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						// warn!("unknown result id: {id} {typ} {fields:?}");
+						// Unaccounted progress.
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+					// Set phase, expected
+				}
+				_ => warn!("unknown log: {:?}", log),
+			};
+		} else {
+			let e = e.trim();
+			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+				return;
+			}
+			info!("{e}")
+		}
+	}
+}
addedcrates/better-command/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+	left + right
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn it_works() {
+		let result = add(2, 2);
+		assert_eq!(result, 4);
+	}
+}