git.delta.rocks / jrsonnet / refs/commits / 7c6930a6bff0

difftreelog

refactor remove shell-outs for ssh

Yaroslav Bolyukin2023-12-29parent: #904d121.patch.diff
in: trunk

15 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
 checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
 
 [[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
 name = "bitflags"
 version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
  "anyhow",
  "async-trait",
  "base64 0.21.5",
+ "better-command",
  "chrono",
  "clap",
  "futures",
@@ -1510,9 +1523,9 @@
 
 [[package]]
 name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
 
 [[package]]
 name = "opaque-debug"
@@ -1922,6 +1935,10 @@
 checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
 [[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
 name = "rnix"
 version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
 
 [[package]]
 name = "serde"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
 dependencies = [
  "serde_derive",
 ]
@@ -2123,9 +2140,9 @@
 
 [[package]]
 name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2134,9 +2151,9 @@
 
 [[package]]
 name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
 dependencies = [
  "itoa",
  "ryu",
@@ -2527,11 +2544,10 @@
 
 [[package]]
 name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
 dependencies = [
- "cfg-if",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
@@ -2539,9 +2555,9 @@
 
 [[package]]
 name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2550,9 +2566,9 @@
 
 [[package]]
 name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
 dependencies = [
  "once_cell",
  "valuable",
@@ -2560,9 +2576,9 @@
 
 [[package]]
 name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
 dependencies = [
  "indicatif",
  "tracing",
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
 [workspace]
 members = ["crates/*", "cmds/*"]
 resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
 edition = "2021"
 
 [dependencies]
+nixlike.workspace = true
+better-command.workspace = true
 anyhow = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -15,7 +17,6 @@
 hostname = "0.3.1"
 age-core = "0.9.0"
 peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
 age = { version = "0.9.2", features = ["ssh", "armor"] }
 base64 = "0.21.5"
 chrono = { version = "0.4.31", features = ["serde"] }
modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
 use std::collections::HashMap;
 use std::ffi::{OsStr, OsString};
 use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
 use std::sync::{Arc, OnceLock};
 
 use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
 use futures::StreamExt;
 use itertools::Itertools;
 use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
 use tokio::io::AsyncWriteExt;
 use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
 use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
 use tokio_util::codec::{FramedRead, LinesCodec};
 use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
 
 const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
 
@@ -30,6 +32,8 @@
 	string_wrapping: (String, String),
 	number_wrapping: (String, String),
 
+	executing_command: Arc<Mutex<()>>,
+
 	next_id: u32,
 	free_list: Vec<u32>,
 }
@@ -219,6 +223,8 @@
 			string_wrapping: Default::default(),
 			number_wrapping: Default::default(),
 
+			executing_command: Arc::new(Mutex::new(())),
+
 			next_id: 0,
 			free_list: vec![],
 		};
@@ -331,6 +337,10 @@
 		expr: impl AsRef<[u8]>,
 		err_handler: &mut dyn Handler,
 	) -> Result<String> {
+		// Prevent two commands from being executed in parallel, messing with each other.
+		let _lock = self.executing_command.clone();
+		let _guard = _lock.lock().await;
+
 		self.send_command(expr).await?;
 		// It will be echoed
 		self.send_command(REPL_DELIMITER).await?;
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
 use std::{env::current_dir, time::Duration};
 
 use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
 use crate::nix_go;
 use anyhow::{anyhow, Result};
 use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
 use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
@@ -112,12 +112,12 @@
 	current: bool,
 	datetime: String,
 }
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
-	let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+	let mut cmd = host.cmd("nix-env").await?;
 	cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 		.arg("--list-generations");
 	// Sudo is required due to --list-generations acquiring lock on the profile.
-	let data = config.run_string_on(host, cmd, true).await?;
+	let data = cmd.sudo().run_string().await?;
 	let generations = data
 		.split('\n')
 		.map(|e| e.trim())
@@ -161,25 +161,12 @@
 		.map_err(|_e| anyhow!("bad list-generations output"))?
 		.ok_or_else(|| anyhow!("failed to find generation"))?;
 	Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("stop").arg(unit);
-	config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("start").arg(unit);
-	config.run_on(host, cmd, true).await
 }
 
 async fn execute_upload(
 	build: &BuildSystems,
-	config: &Config,
 	action: UploadAction,
-	host: &str,
+	host: &ConfigHost,
 	built: PathBuf,
 ) -> Result<()> {
 	let mut failed = false;
@@ -191,15 +178,15 @@
 	if !build.disable_rollback {
 		let _span = info_span!("preparing").entered();
 		info!("preparing for rollback");
-		let generation = get_current_generation(config, host).await?;
+		let generation = get_current_generation(&host).await?;
 		info!(
 			"rollback target would be {} {}",
 			generation.id, generation.datetime
 		);
 		{
-			let mut cmd = MyCommand::new("sh");
+			let mut cmd = host.cmd("sh").await?;
 			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to set rollback marker: {e}");
 				failed = true;
 			}
@@ -215,37 +202,41 @@
 		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
 		// Anyway, reboot will still help in this case.
 		if action.should_schedule_rollback_run() {
-			let mut cmd = MyCommand::new("systemd-run");
+			let mut cmd = host.cmd("systemd-run").await?;
 			cmd.comparg("--on-active", "3min")
 				.comparg("--unit", "rollback-watchdog-run")
 				.arg("systemctl")
 				.arg("start")
 				.arg("rollback-watchdog.service");
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to schedule rollback run: {e}");
 				failed = true;
 			}
 		}
 	}
+
 	if action.should_switch_profile() && !failed {
 		info!("switching generation");
-		let mut cmd = MyCommand::new("nix-env");
+		let mut cmd = host.cmd("nix-env").await?;
 		cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 			.comparg("--set", &built);
-		if let Err(e) = config.run_on(host, cmd, true).await {
+		if let Err(e) = cmd.sudo().run().await {
 			error!("failed to switch generation: {e}");
 			failed = true;
 		}
 	}
+
+	// FIXME: Connection might be disconnected after activation run
+
 	if action.should_activate() && !failed {
 		let _span = info_span!("activating").entered();
 		info!("executing activation script");
 		let mut switch_script = built.clone();
 		switch_script.push("bin");
 		switch_script.push("switch-to-configuration");
-		let mut cmd = MyCommand::new(switch_script);
+		let mut cmd = host.cmd(switch_script).await?;
 		cmd.arg(action.name());
-		if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+		if let Err(e) = cmd.sudo().run().in_current_span().await {
 			error!("failed to activate: {e}");
 			failed = true;
 		}
@@ -253,7 +244,8 @@
 	if !build.disable_rollback {
 		if failed {
 			info!("executing rollback");
-			if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+			if let Err(e) = host
+				.systemctl_start("rollback-watchdog.service")
 				.instrument(info_span!("rollback"))
 				.await
 			{
@@ -261,27 +253,29 @@
 			}
 		} else {
 			info!("trying to mark upgrade as successful");
-			let mut cmd = MyCommand::new("rm");
-			cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-			if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+			if let Err(e) = host
+				.rm_file("/etc/fleet_rollback_marker", true)
+				.in_current_span()
+				.await
+			{
 				error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
 			}
 		}
 		info!("disarming watchdog, just in case");
-		if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+		if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
 			// It is ok, if there was no reboot - then timer might not be running.
 		}
 		if action.should_schedule_rollback_run() {
-			if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+			if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
 				error!("failed to disarm rollback run: {e}");
 			}
 		}
-	} else {
-		let mut cmd = MyCommand::new("rm");
-		cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-		if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
-			// Marker might not exist, yet better try to remove it.
-		}
+	} else if let Err(_e) = host
+		.rm_file("/etc/fleet_rollback_marker", true)
+		.in_current_span()
+		.await
+	{
+		// Marker might not exist, yet better try to remove it.
 	}
 	Ok(())
 }
@@ -289,12 +283,13 @@
 impl BuildSystems {
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
+		let host = config.host(&host).await?;
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
 		let drv = nix_go!(
 			fleet_field.buildSystems(Obj {
 				localSystem: { config.local_system.clone() }
-			})[{ action.build_attr() }][{ host }]
+			})[{ action.build_attr() }][{ &host.name }]
 		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
 
 		match action {
 			Action::Upload { action } => {
-				if !config.is_local(&host) {
+				if !config.is_local(&host.name) {
 					info!("uploading system closure");
 					{
+						// TODO: Move to remote_derivation method.
 						// Alternatively, nix store make-content-addressed can be used,
 						// at least for the first deployment, to provide trusted store key.
 						//
@@ -329,13 +325,11 @@
 					}
 					let mut tries = 0;
 					loop {
-						let mut nix = MyCommand::new("nix");
-						nix.arg("copy")
-							.arg("--substitute-on-destination")
-							.comparg("--to", format!("ssh-ng://{host}"))
-							.arg(out_output);
-						match nix.run_nix().await {
-							Ok(()) => break,
+						match host.remote_derivation(out_output).await {
+							Ok(remote) => {
+								assert!(&remote == out_output, "CA derivations aren't implemented");
+								break;
+							}
 							Err(e) if tries < 3 => {
 								tries += 1;
 								warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
 					}
 				}
 				if let Some(action) = action {
-					execute_upload(&self, &config, action, &host, out_output.clone()).await?
+					execute_upload(&self, action, &host, out_output.clone()).await?
 				}
 			}
 			Action::Package(PackageAction::SdImage) => {
 				let mut out = current_dir()?;
-				out.push(format!("sd-image-{}", host));
+				out.push(format!("sd-image-{}", host.name));
 
 				info!("linking sd image to {:?}", out);
 				symlink(out_output, out)?;
 			}
 			Action::Package(PackageAction::InstallationCd) => {
 				let mut out = current_dir()?;
-				out.push(format!("installation-cd-{}", host));
+				out.push(format!("installation-cd-{}", host.name));
 
 				info!("linking iso image to {:?}", out);
 				symlink(out_output, out)?;
@@ -379,6 +373,17 @@
 			let this = this.clone();
 			let span = info_span!("deployment", host = field::display(&host.name));
 			let hostname = host.name;
+			// FIXME: Since the introduction of better-nix-eval,
+			// due to single repl used for builds, hosts are waiting for each other to build,
+			// instead of building concurrently.
+			//
+			// Open multiple repls?
+			//
+			// Create build batcher, which will behave similar to golangs
+			// WaitGroup, and start executing once all the build tasks are scheduled?
+			// This also allows to cleanup build output, as there will be no longer
+			// "waiting for remote machine" messages in the cases when one package is needed for
+			// multiple hosts.
 			set.spawn_local(
 				(async move {
 					match this.build_task(config, hostname).await {
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use chrono::{DateTime, Utc};
 use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use itertools::Itertools;
 use owo_colors::OwoColorize;
 use std::{
@@ -404,9 +404,8 @@
 					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 				if let Some(data) = secret.secret.secret {
-					let encrypted = config
-						.reencrypt_on_host(identity_holder, data, target_recipients)
-						.await?;
+					let host = config.host(&identity_holder).await?;
+					let encrypted = host.reencrypt(data, target_recipients).await?;
 					secret.secret.secret = Some(encrypted);
 				}
 
@@ -481,9 +480,8 @@
 							target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 						if let Some(secret) = data.secret.secret {
-							let encrypted = config
-								.reencrypt_on_host(identity_holder, secret, target_recipients)
-								.await?;
+							let host = config.host(identity_holder).await?;
+							let encrypted = host.reencrypt(secret, target_recipients).await?;
 
 							data.secret.secret = Some(encrypted);
 						}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
-	collections::HashMap,
-	ffi::OsStr,
-	pin,
-	process::Stdio,
-	sync::{Arc, Mutex},
-	task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
 
 use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
 use futures::StreamExt;
 use itertools::Either;
-use once_cell::sync::Lazy;
 use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
 
 fn escape_bash(input: &str, out: &mut String) {
 	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
 			return self;
 		}
 		let mut out = Self::new("env");
-		if let Some(session) = self.ssh_session {
-			out = out.ssh_session(session);
-		}
+		out.ssh_session = self.ssh_session;
 		for (k, v) in self.env {
 			assert!(!k.contains('='));
 			out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
 			out
 		} else {
 			let mut out = Self::new("sudo");
+			out.ssh_session = self.ssh_session.take();
 			out.args(self.into_args());
 			out
 		}
 	}
-	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
-		self.ssh_session = Some(on);
-		self
-	}
-	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
-		let mut out = Self::new("ssh");
-		out.ssh_session = self.ssh_session.take();
-		out.arg(on).arg("--");
-		out.arg(self.into_string());
-		out
-	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -276,259 +256,6 @@
 	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
 	assert!(v.is_none());
 	Ok(())
-}
-
-pub trait Handler: Send {
-	fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
-	fn clone(&self) -> Self {
-		Self(self.0.clone())
-	}
-}
-impl<H> ClonableHandler<H> {
-	pub fn new(inner: H) -> Self {
-		Self(Arc::new(Mutex::new(inner)))
-	}
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
-	fn handle_line(&mut self, e: &str) {
-		self.0.lock().unwrap().handle_line(e)
-	}
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
-	fn handle_line(&mut self, e: &str) {
-		info!(target: "log", "{e}");
-	}
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
-	fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
-	spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
-	static OSC_CLEANER: Lazy<Regex> =
-		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
-	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
-	let m = OSC_CLEANER.replace_all(m, "");
-	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
-	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
-	DETABBER.replace_all(m.as_ref(), "  ").to_string()
-}
-impl Handler for NixHandler {
-	fn handle_line(&mut self, e: &str) {
-		if let Some(e) = e.strip_prefix("@nix ") {
-			let log: NixLog = match serde_json::from_str(e) {
-				Ok(l) => l,
-				Err(err) => {
-					warn!("failed to parse nix log line {:?}: {}", e, err);
-					return;
-				}
-			};
-			match log {
-				NixLog::Msg { msg, raw_msg, .. } => {
-					#[allow(clippy::nonminimal_bool)]
-					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
-					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
-					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
-						if let Some(raw_msg) = raw_msg {
-							if !msg.is_empty() {
-								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
-							} else {
-								info!(target: "nix", "{}", raw_msg.trim_end())
-							}
-						} else {
-							info!(target: "nix", "{}", msg.trim_end())
-						}
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 105 && !fields.is_empty() => {
-					if let [LogField::String(drv), ..] = &fields[..] {
-						let mut drv = drv.as_str();
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						info!(target: "nix","building {}", drv);
-						let span = info_span!("build", drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad build log: {:?}", log)
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 100 && fields.len() >= 3 => {
-					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
-						&fields[..]
-					{
-						let mut drv = drv.as_str();
-
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
-						let span = info_span!("copy", from, to, drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad copy log: {:?}", log)
-					}
-				}
-				NixLog::Start { text, typ, id, .. }
-					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
-				{
-					if !text.is_empty()
-						&& text != "querying info about missing paths"
-						&& text != "copying 0 paths"
-						// Too much spam on lazy-trees branch
-						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
-					{
-						let span = info_span!("job");
-						span.pb_start();
-						span.pb_set_message(&process_message(text.trim()));
-						self.spans.insert(id, span);
-						info!(target: "nix", "{}", text);
-					}
-				}
-				NixLog::Start {
-					text,
-					level: 0,
-					typ: 108,
-					..
-				} if text.is_empty() => {
-					// Cache lookup? Coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 109,
-					..
-				} if text.starts_with("querying info about ") => {
-					// Cache lookup
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 101,
-					..
-				} if text.starts_with("downloading ") => {
-					// NAR downloading, coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					..
-				} if text.starts_with("waiting for a machine to build ") => {
-					// Useless repeating notification about build
-				}
-				NixLog::Start {
-					text,
-					level: 3,
-					typ: 111,
-					..
-				} if text.starts_with("resolved derivation: ") => {
-					// CA resolved
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					id,
-					..
-				} if text.starts_with("waiting for lock on ") => {
-					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
-					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.split("', '").next() {
-						drv = txt;
-					}
-					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-						let mut it = pkg.splitn(2, '-');
-						it.next();
-						if let Some(pkg) = it.next() {
-							drv = pkg;
-						}
-					}
-					let span = info_span!("waiting on drv", drv);
-					span.pb_start();
-					self.spans.insert(id, span);
-					// Concurrent build of the same message
-				}
-				NixLog::Stop { id, .. } => {
-					self.spans.remove(&id);
-				}
-				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
-					if let Some(span) = self.spans.get(&id) {
-						if let LogField::String(s) = &fields[0] {
-							span.pb_set_message(&process_message(s.trim()));
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						warn!("unknown result id: {id} {typ} {fields:?}");
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
-					if let Some(span) = self.spans.get(&id) {
-						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
-							&fields[..4]
-						{
-							span.pb_set_length(*expected);
-							span.pb_set_position(*done);
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						// warn!("unknown result id: {id} {typ} {fields:?}");
-						// Unaccounted progress.
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
-					// Set phase, expected
-				}
-				_ => warn!("unknown log: {:?}", log),
-			};
-		} else {
-			let e = e.trim();
-			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
-				return;
-			}
-			info!("{e}")
-		}
-	}
 }
 
 async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
 ) -> Result<Option<Vec<u8>>> {
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
+	debug!("running command {cmd:?} on local");
 	let mut child = cmd.spawn()?;
 	let mut stderr = child.stderr.take().unwrap();
 	let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
 	err_handler: &mut dyn Handler,
 	mut out_handler: Option<&mut dyn Handler>,
 ) -> Result<Option<Vec<u8>>> {
+	debug!("running command {cmd:?} over ssh");
 	cmd.stderr(openssh::Stdio::piped());
 	cmd.stdout(openssh::Stdio::piped());
 	let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
 	}
 
 	Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
-	/// Return true to discard message from logging
-	fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
-	String(String),
-	Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
-	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-	where
-		D: serde::Deserializer<'de>,
-	{
-		struct StringOrNum;
-		impl<'de> Visitor<'de> for StringOrNum {
-			type Value = LogField;
-
-			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-				write!(f, "string or unsigned")
-			}
-
-			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::String(v.to_owned()))
-			}
-
-			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::Num(v))
-			}
-		}
-
-		deserializer.deserialize_any(StringOrNum)
-	}
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
-	Msg {
-		level: u32,
-		msg: String,
-		raw_msg: Option<String>,
-	},
-	Start {
-		id: u64,
-		level: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-		text: String,
-		#[serde(rename = "type")]
-		typ: u32,
-	},
-	Stop {
-		id: u64,
-	},
-	Result {
-		id: u64,
-		#[serde(rename = "type")]
-		typ: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-	},
 }
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
 	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
-use age::Recipient;
 use anyhow::{anyhow, bail, Context, Result};
 use clap::{ArgGroup, Parser};
 use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub local: bool,
 	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+	async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		assert!(!self.local, "do not open ssh connection to local session");
 		// FIXME: TOCTOU
 		if let Some(session) = &self.session.get() {
 			return Ok((*session).clone());
@@ -96,8 +97,12 @@
 		D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
 	}
 	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
-		let session = self.open_session().await?;
-		Ok(MyCommand::new_on(cmd, session))
+		if self.local {
+			Ok(MyCommand::new(cmd))
+		} else {
+			let session = self.open_session().await?;
+			Ok(MyCommand::new_on(cmd, session))
+		}
 	}
 
 	pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
 			.context("failed to call remote host for decrypt")?;
 		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
 	}
+	pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+		for target in targets {
+			cmd.eqarg("--targets", target);
+		}
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 	/// Returns path for futureproofing, as path might change i.e on conversion to CA
 	pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+		if self.local {
+			// Path is located locally, thus already trusted.
+			return Ok(path.to_owned());
+		}
 		let mut nix = MyCommand::new("nix");
 		nix.arg("copy")
 			.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
 		nix.run_nix().await?;
 		Ok(path.to_owned())
 	}
+	pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("stop").arg(name);
+		cmd.sudo().run().await
+	}
+	pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("start").arg(name);
+		cmd.sudo().run().await
+	}
+
+	pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+		let mut cmd = self.cmd("rm").await?;
+		cmd.arg("-f").arg(path);
+		if sudo {
+			cmd = cmd.sudo()
+		}
+		cmd.run().await
+	}
 }
 
 impl Config {
@@ -134,35 +175,12 @@
 	}
 	pub fn is_local(&self, host: &str) -> bool {
 		self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
-	}
-
-	pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run().await
-	}
-	pub async fn run_string_on(
-		&self,
-		host: &str,
-		mut command: MyCommand,
-		sudo: bool,
-	) -> Result<String> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run_string().await
 	}
 
 	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
 		Ok(ConfigHost {
 			name: name.to_owned(),
+			local: self.is_local(name),
 			session: OnceLock::new(),
 		})
 	}
@@ -172,6 +190,7 @@
 		let mut out = vec![];
 		for name in names {
 			out.push(ConfigHost {
+				local: self.is_local(&name),
 				name,
 				session: OnceLock::new(),
 			})
@@ -225,27 +244,6 @@
 		let mut data = self.data_mut();
 		let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
 		host_secrets.insert(secret, value);
-	}
-
-	pub async fn reencrypt_on_host(
-		&self,
-		host: &str,
-		data: SecretData,
-		targets: Vec<String>,
-	) -> Result<SecretData> {
-		let mut recmd = MyCommand::new("fleet-install-secrets");
-		recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
-		for target in targets {
-			recmd.eqarg("--targets", target);
-		}
-		recmd = recmd.sudo().ssh(host);
-		let encoded = recmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		SecretData::decode_z85(&encoded)
 	}
 
 	pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
modifiedcmds/fleet/src/keys.rsdiffbeforeafterboth
--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
 use std::str::FromStr;
 
-use crate::command::MyCommand;
 use crate::host::Config;
 use age::Recipient;
 use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
 			Ok(key)
 		} else {
 			warn!("Loading key for {}", host);
-			let mut cmd = MyCommand::new("cat");
+			let host = self.host(host).await?;
+			let mut cmd = host.cmd("cat").await?;
 			cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
-			let key = self.run_string_on(host, cmd, false).await?;
-			self.update_key(host, key.clone());
+			let key = cmd.run_string().await?;
+			self.update_key(&host.name, key.clone());
 			Ok(key)
 		}
 	}
addedcmds/remowt-agent/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
addedcmds/remowt-agent/README.adocdiffbeforeafterboth

no changes

addedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+	println!("Hello, world!");
+}
addedcrates/better-command/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
addedcrates/better-command/src/handler.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+	fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+	fn clone(&self) -> Self {
+		Self(self.0.clone())
+	}
+}
+impl<H> ClonableHandler<H> {
+	pub fn new(inner: H) -> Self {
+		Self(Arc::new(Mutex::new(inner)))
+	}
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+	fn handle_line(&mut self, e: &str) {
+		self.0.lock().unwrap().handle_line(e)
+	}
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+	fn handle_line(&mut self, e: &str) {
+		info!(target: "log", "{e}");
+	}
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+	fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+	spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+	String(String),
+	Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+	Msg {
+		level: u32,
+		msg: String,
+		raw_msg: Option<String>,
+	},
+	Start {
+		id: u64,
+		level: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+		text: String,
+		#[serde(rename = "type")]
+		typ: u32,
+	},
+	Stop {
+		id: u64,
+	},
+	Result {
+		id: u64,
+		#[serde(rename = "type")]
+		typ: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+	},
+}
+fn process_message(m: &str) -> String {
+	// Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+	static OSC_CLEANER: Lazy<Regex> =
+		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+	let m = OSC_CLEANER.replace_all(m, "");
+	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+	DETABBER.replace_all(m.as_ref(), "  ").to_string()
+}
+impl Handler for NixHandler {
+	fn handle_line(&mut self, e: &str) {
+		if let Some(e) = e.strip_prefix("@nix ") {
+			let log: NixLog = match serde_json::from_str(e) {
+				Ok(l) => l,
+				Err(err) => {
+					warn!("failed to parse nix log line {:?}: {}", e, err);
+					return;
+				}
+			};
+			match log {
+				NixLog::Msg { msg, raw_msg, .. } => {
+					#[allow(clippy::nonminimal_bool)]
+					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+						if let Some(raw_msg) = raw_msg {
+							if !msg.is_empty() {
+								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+							} else {
+								info!(target: "nix", "{}", raw_msg.trim_end())
+							}
+						} else {
+							info!(target: "nix", "{}", msg.trim_end())
+						}
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 105 && !fields.is_empty() => {
+					if let [LogField::String(drv), ..] = &fields[..] {
+						let mut drv = drv.as_str();
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						info!(target: "nix","building {}", drv);
+						let span = info_span!("build", drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad build log: {:?}", log)
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 100 && fields.len() >= 3 => {
+					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+						&fields[..]
+					{
+						let mut drv = drv.as_str();
+
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
+						let span = info_span!("copy", from, to, drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad copy log: {:?}", log)
+					}
+				}
+				NixLog::Start { text, typ, id, .. }
+					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+				{
+					if !text.is_empty()
+						&& text != "querying info about missing paths"
+						&& text != "copying 0 paths"
+						// Too much spam on lazy-trees branch
+						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
+					{
+						let span = info_span!("job");
+						span.pb_start();
+						span.pb_set_message(&process_message(text.trim()));
+						self.spans.insert(id, span);
+						info!(target: "nix", "{}", text);
+					}
+				}
+				NixLog::Start {
+					text,
+					level: 0,
+					typ: 108,
+					..
+				} if text.is_empty() => {
+					// Cache lookup? Coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 109,
+					..
+				} if text.starts_with("querying info about ") => {
+					// Cache lookup
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 101,
+					..
+				} if text.starts_with("downloading ") => {
+					// NAR downloading, coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					..
+				} if text.starts_with("waiting for a machine to build ") => {
+					// Useless repeating notification about build
+				}
+				NixLog::Start {
+					text,
+					level: 3,
+					typ: 111,
+					..
+				} if text.starts_with("resolved derivation: ") => {
+					// CA resolved
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					id,
+					..
+				} if text.starts_with("waiting for lock on ") => {
+					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.split("', '").next() {
+						drv = txt;
+					}
+					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+						let mut it = pkg.splitn(2, '-');
+						it.next();
+						if let Some(pkg) = it.next() {
+							drv = pkg;
+						}
+					}
+					let span = info_span!("waiting on drv", drv);
+					span.pb_start();
+					self.spans.insert(id, span);
+					// Concurrent build of the same message
+				}
+				NixLog::Stop { id, .. } => {
+					self.spans.remove(&id);
+				}
+				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+					if let Some(span) = self.spans.get(&id) {
+						if let LogField::String(s) = &fields[0] {
+							span.pb_set_message(&process_message(s.trim()));
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						warn!("unknown result id: {id} {typ} {fields:?}");
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+					if let Some(span) = self.spans.get(&id) {
+						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+							&fields[..4]
+						{
+							span.pb_set_length(*expected);
+							span.pb_set_position(*done);
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						// warn!("unknown result id: {id} {typ} {fields:?}");
+						// Unaccounted progress.
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+					// Set phase, expected
+				}
+				_ => warn!("unknown log: {:?}", log),
+			};
+		} else {
+			let e = e.trim();
+			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+				return;
+			}
+			info!("{e}")
+		}
+	}
+}
addedcrates/better-command/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+	left + right
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn it_works() {
+		let result = add(2, 2);
+		assert_eq!(result, 4);
+	}
+}