git.delta.rocks / jrsonnet / refs/commits / 7c6930a6bff0

difftreelog

refactor remove shell-outs for ssh

Yaroslav Bolyukin2023-12-29parent: #904d121.patch.diff
in: trunk

15 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
 checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
 
 [[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
 name = "bitflags"
 version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
  "anyhow",
  "async-trait",
  "base64 0.21.5",
+ "better-command",
  "chrono",
  "clap",
  "futures",
@@ -1510,9 +1523,9 @@
 
 [[package]]
 name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
 
 [[package]]
 name = "opaque-debug"
@@ -1922,6 +1935,10 @@
 checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
 [[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
 name = "rnix"
 version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
 
 [[package]]
 name = "serde"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
 dependencies = [
  "serde_derive",
 ]
@@ -2123,9 +2140,9 @@
 
 [[package]]
 name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2134,9 +2151,9 @@
 
 [[package]]
 name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
 dependencies = [
  "itoa",
  "ryu",
@@ -2527,11 +2544,10 @@
 
 [[package]]
 name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
 dependencies = [
- "cfg-if",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
@@ -2539,9 +2555,9 @@
 
 [[package]]
 name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2550,9 +2566,9 @@
 
 [[package]]
 name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
 dependencies = [
  "once_cell",
  "valuable",
@@ -2560,9 +2576,9 @@
 
 [[package]]
 name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
 dependencies = [
  "indicatif",
  "tracing",
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
 [workspace]
 members = ["crates/*", "cmds/*"]
 resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
 edition = "2021"
 
 [dependencies]
+nixlike.workspace = true
+better-command.workspace = true
 anyhow = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -15,7 +17,6 @@
 hostname = "0.3.1"
 age-core = "0.9.0"
 peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
 age = { version = "0.9.2", features = ["ssh", "armor"] }
 base64 = "0.21.5"
 chrono = { version = "0.4.31", features = ["serde"] }
modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,3 +1,6 @@
+//! Wrapper around nix repl, which allows to work on nix code, without relying on
+//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.
+
 use std::collections::HashMap;
 use std::ffi::{OsStr, OsString};
 use std::fmt::{self, Display};
@@ -6,6 +9,7 @@
 use std::sync::{Arc, OnceLock};
 
 use anyhow::{anyhow, bail, ensure, Context, Result};
+use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};
 use futures::StreamExt;
 use itertools::Itertools;
 use r2d2::{Pool, PooledConnection};
@@ -14,11 +18,9 @@
 use tokio::io::AsyncWriteExt;
 use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
 use tokio::select;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, Mutex};
 use tokio_util::codec::{FramedRead, LinesCodec};
 use tracing::{debug, error, warn, Level};
-
-use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};
 
 const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";
 
@@ -30,6 +32,8 @@
 	string_wrapping: (String, String),
 	number_wrapping: (String, String),
 
+	executing_command: Arc<Mutex<()>>,
+
 	next_id: u32,
 	free_list: Vec<u32>,
 }
@@ -219,6 +223,8 @@
 			string_wrapping: Default::default(),
 			number_wrapping: Default::default(),
 
+			executing_command: Arc::new(Mutex::new(())),
+
 			next_id: 0,
 			free_list: vec![],
 		};
@@ -331,6 +337,10 @@
 		expr: impl AsRef<[u8]>,
 		err_handler: &mut dyn Handler,
 	) -> Result<String> {
+		// Prevent two commands from being executed in parallel, messing with each other.
+		let _lock = self.executing_command.clone();
+		let _guard = _lock.lock().await;
+
 		self.send_command(expr).await?;
 		// It will be echoed
 		self.send_command(REPL_DELIMITER).await?;
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
 use std::{env::current_dir, time::Duration};
 
 use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
 use crate::nix_go;
 use anyhow::{anyhow, Result};
 use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
 use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
@@ -112,12 +112,12 @@
 	current: bool,
 	datetime: String,
 }
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
-	let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+	let mut cmd = host.cmd("nix-env").await?;
 	cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 		.arg("--list-generations");
 	// Sudo is required due to --list-generations acquiring lock on the profile.
-	let data = config.run_string_on(host, cmd, true).await?;
+	let data = cmd.sudo().run_string().await?;
 	let generations = data
 		.split('\n')
 		.map(|e| e.trim())
@@ -161,25 +161,12 @@
 		.map_err(|_e| anyhow!("bad list-generations output"))?
 		.ok_or_else(|| anyhow!("failed to find generation"))?;
 	Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("stop").arg(unit);
-	config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("start").arg(unit);
-	config.run_on(host, cmd, true).await
 }
 
 async fn execute_upload(
 	build: &BuildSystems,
-	config: &Config,
 	action: UploadAction,
-	host: &str,
+	host: &ConfigHost,
 	built: PathBuf,
 ) -> Result<()> {
 	let mut failed = false;
@@ -191,15 +178,15 @@
 	if !build.disable_rollback {
 		let _span = info_span!("preparing").entered();
 		info!("preparing for rollback");
-		let generation = get_current_generation(config, host).await?;
+		let generation = get_current_generation(&host).await?;
 		info!(
 			"rollback target would be {} {}",
 			generation.id, generation.datetime
 		);
 		{
-			let mut cmd = MyCommand::new("sh");
+			let mut cmd = host.cmd("sh").await?;
 			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to set rollback marker: {e}");
 				failed = true;
 			}
@@ -215,37 +202,41 @@
 		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
 		// Anyway, reboot will still help in this case.
 		if action.should_schedule_rollback_run() {
-			let mut cmd = MyCommand::new("systemd-run");
+			let mut cmd = host.cmd("systemd-run").await?;
 			cmd.comparg("--on-active", "3min")
 				.comparg("--unit", "rollback-watchdog-run")
 				.arg("systemctl")
 				.arg("start")
 				.arg("rollback-watchdog.service");
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to schedule rollback run: {e}");
 				failed = true;
 			}
 		}
 	}
+
 	if action.should_switch_profile() && !failed {
 		info!("switching generation");
-		let mut cmd = MyCommand::new("nix-env");
+		let mut cmd = host.cmd("nix-env").await?;
 		cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 			.comparg("--set", &built);
-		if let Err(e) = config.run_on(host, cmd, true).await {
+		if let Err(e) = cmd.sudo().run().await {
 			error!("failed to switch generation: {e}");
 			failed = true;
 		}
 	}
+
+	// FIXME: Connection might be disconnected after activation run
+
 	if action.should_activate() && !failed {
 		let _span = info_span!("activating").entered();
 		info!("executing activation script");
 		let mut switch_script = built.clone();
 		switch_script.push("bin");
 		switch_script.push("switch-to-configuration");
-		let mut cmd = MyCommand::new(switch_script);
+		let mut cmd = host.cmd(switch_script).await?;
 		cmd.arg(action.name());
-		if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+		if let Err(e) = cmd.sudo().run().in_current_span().await {
 			error!("failed to activate: {e}");
 			failed = true;
 		}
@@ -253,7 +244,8 @@
 	if !build.disable_rollback {
 		if failed {
 			info!("executing rollback");
-			if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+			if let Err(e) = host
+				.systemctl_start("rollback-watchdog.service")
 				.instrument(info_span!("rollback"))
 				.await
 			{
@@ -261,27 +253,29 @@
 			}
 		} else {
 			info!("trying to mark upgrade as successful");
-			let mut cmd = MyCommand::new("rm");
-			cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-			if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+			if let Err(e) = host
+				.rm_file("/etc/fleet_rollback_marker", true)
+				.in_current_span()
+				.await
+			{
 				error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
 			}
 		}
 		info!("disarming watchdog, just in case");
-		if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+		if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
 			// It is ok, if there was no reboot - then timer might not be running.
 		}
 		if action.should_schedule_rollback_run() {
-			if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+			if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
 				error!("failed to disarm rollback run: {e}");
 			}
 		}
-	} else {
-		let mut cmd = MyCommand::new("rm");
-		cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-		if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
-			// Marker might not exist, yet better try to remove it.
-		}
+	} else if let Err(_e) = host
+		.rm_file("/etc/fleet_rollback_marker", true)
+		.in_current_span()
+		.await
+	{
+		// Marker might not exist, yet better try to remove it.
 	}
 	Ok(())
 }
@@ -289,12 +283,13 @@
 impl BuildSystems {
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
+		let host = config.host(&host).await?;
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
 		let drv = nix_go!(
 			fleet_field.buildSystems(Obj {
 				localSystem: { config.local_system.clone() }
-			})[{ action.build_attr() }][{ host }]
+			})[{ action.build_attr() }][{ &host.name }]
 		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
 
 		match action {
 			Action::Upload { action } => {
-				if !config.is_local(&host) {
+				if !config.is_local(&host.name) {
 					info!("uploading system closure");
 					{
+						// TODO: Move to remote_derivation method.
 						// Alternatively, nix store make-content-addressed can be used,
 						// at least for the first deployment, to provide trusted store key.
 						//
@@ -329,13 +325,11 @@
 					}
 					let mut tries = 0;
 					loop {
-						let mut nix = MyCommand::new("nix");
-						nix.arg("copy")
-							.arg("--substitute-on-destination")
-							.comparg("--to", format!("ssh-ng://{host}"))
-							.arg(out_output);
-						match nix.run_nix().await {
-							Ok(()) => break,
+						match host.remote_derivation(out_output).await {
+							Ok(remote) => {
+								assert!(&remote == out_output, "CA derivations aren't implemented");
+								break;
+							}
 							Err(e) if tries < 3 => {
 								tries += 1;
 								warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
 					}
 				}
 				if let Some(action) = action {
-					execute_upload(&self, &config, action, &host, out_output.clone()).await?
+					execute_upload(&self, action, &host, out_output.clone()).await?
 				}
 			}
 			Action::Package(PackageAction::SdImage) => {
 				let mut out = current_dir()?;
-				out.push(format!("sd-image-{}", host));
+				out.push(format!("sd-image-{}", host.name));
 
 				info!("linking sd image to {:?}", out);
 				symlink(out_output, out)?;
 			}
 			Action::Package(PackageAction::InstallationCd) => {
 				let mut out = current_dir()?;
-				out.push(format!("installation-cd-{}", host));
+				out.push(format!("installation-cd-{}", host.name));
 
 				info!("linking iso image to {:?}", out);
 				symlink(out_output, out)?;
@@ -379,6 +373,17 @@
 			let this = this.clone();
 			let span = info_span!("deployment", host = field::display(&host.name));
 			let hostname = host.name;
+			// FIXME: Since the introduction of better-nix-eval,
+			// due to single repl used for builds, hosts are waiting for each other to build,
+			// instead of building concurrently.
+			//
+			// Open multiple repls?
+			//
+			// Create build batcher, which will behave similar to golangs
+			// WaitGroup, and start executing once all the build tasks are scheduled?
+			// This also allows to cleanup build output, as there will be no longer
+			// "waiting for remote machine" messages in the cases when one package is needed for
+			// multiple hosts.
 			set.spawn_local(
 				(async move {
 					match this.build_task(config, hostname).await {
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use chrono::{DateTime, Utc};
 use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use itertools::Itertools;
 use owo_colors::OwoColorize;
 use std::{
@@ -404,9 +404,8 @@
 					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 				if let Some(data) = secret.secret.secret {
-					let encrypted = config
-						.reencrypt_on_host(identity_holder, data, target_recipients)
-						.await?;
+					let host = config.host(&identity_holder).await?;
+					let encrypted = host.reencrypt(data, target_recipients).await?;
 					secret.secret.secret = Some(encrypted);
 				}
 
@@ -481,9 +480,8 @@
 							target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 						if let Some(secret) = data.secret.secret {
-							let encrypted = config
-								.reencrypt_on_host(identity_holder, secret, target_recipients)
-								.await?;
+							let host = config.host(identity_holder).await?;
+							let encrypted = host.reencrypt(secret, target_recipients).await?;
 
 							data.secret.secret = Some(encrypted);
 						}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
-	collections::HashMap,
-	ffi::OsStr,
-	pin,
-	process::Stdio,
-	sync::{Arc, Mutex},
-	task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
 
 use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
 use futures::StreamExt;
 use itertools::Either;
-use once_cell::sync::Lazy;
 use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
 
 fn escape_bash(input: &str, out: &mut String) {
 	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
 			return self;
 		}
 		let mut out = Self::new("env");
-		if let Some(session) = self.ssh_session {
-			out = out.ssh_session(session);
-		}
+		out.ssh_session = self.ssh_session;
 		for (k, v) in self.env {
 			assert!(!k.contains('='));
 			out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
 			out
 		} else {
 			let mut out = Self::new("sudo");
+			out.ssh_session = self.ssh_session.take();
 			out.args(self.into_args());
 			out
 		}
 	}
-	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
-		self.ssh_session = Some(on);
-		self
-	}
-	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
-		let mut out = Self::new("ssh");
-		out.ssh_session = self.ssh_session.take();
-		out.arg(on).arg("--");
-		out.arg(self.into_string());
-		out
-	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -276,259 +256,6 @@
 	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
 	assert!(v.is_none());
 	Ok(())
-}
-
-pub trait Handler: Send {
-	fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
-	fn clone(&self) -> Self {
-		Self(self.0.clone())
-	}
-}
-impl<H> ClonableHandler<H> {
-	pub fn new(inner: H) -> Self {
-		Self(Arc::new(Mutex::new(inner)))
-	}
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
-	fn handle_line(&mut self, e: &str) {
-		self.0.lock().unwrap().handle_line(e)
-	}
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
-	fn handle_line(&mut self, e: &str) {
-		info!(target: "log", "{e}");
-	}
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
-	fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
-	spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
-	static OSC_CLEANER: Lazy<Regex> =
-		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
-	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
-	let m = OSC_CLEANER.replace_all(m, "");
-	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
-	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
-	DETABBER.replace_all(m.as_ref(), "  ").to_string()
-}
-impl Handler for NixHandler {
-	fn handle_line(&mut self, e: &str) {
-		if let Some(e) = e.strip_prefix("@nix ") {
-			let log: NixLog = match serde_json::from_str(e) {
-				Ok(l) => l,
-				Err(err) => {
-					warn!("failed to parse nix log line {:?}: {}", e, err);
-					return;
-				}
-			};
-			match log {
-				NixLog::Msg { msg, raw_msg, .. } => {
-					#[allow(clippy::nonminimal_bool)]
-					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
-					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
-					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
-						if let Some(raw_msg) = raw_msg {
-							if !msg.is_empty() {
-								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
-							} else {
-								info!(target: "nix", "{}", raw_msg.trim_end())
-							}
-						} else {
-							info!(target: "nix", "{}", msg.trim_end())
-						}
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 105 && !fields.is_empty() => {
-					if let [LogField::String(drv), ..] = &fields[..] {
-						let mut drv = drv.as_str();
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						info!(target: "nix","building {}", drv);
-						let span = info_span!("build", drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad build log: {:?}", log)
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 100 && fields.len() >= 3 => {
-					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
-						&fields[..]
-					{
-						let mut drv = drv.as_str();
-
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
-						let span = info_span!("copy", from, to, drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad copy log: {:?}", log)
-					}
-				}
-				NixLog::Start { text, typ, id, .. }
-					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
-				{
-					if !text.is_empty()
-						&& text != "querying info about missing paths"
-						&& text != "copying 0 paths"
-						// Too much spam on lazy-trees branch
-						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
-					{
-						let span = info_span!("job");
-						span.pb_start();
-						span.pb_set_message(&process_message(text.trim()));
-						self.spans.insert(id, span);
-						info!(target: "nix", "{}", text);
-					}
-				}
-				NixLog::Start {
-					text,
-					level: 0,
-					typ: 108,
-					..
-				} if text.is_empty() => {
-					// Cache lookup? Coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 109,
-					..
-				} if text.starts_with("querying info about ") => {
-					// Cache lookup
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 101,
-					..
-				} if text.starts_with("downloading ") => {
-					// NAR downloading, coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					..
-				} if text.starts_with("waiting for a machine to build ") => {
-					// Useless repeating notification about build
-				}
-				NixLog::Start {
-					text,
-					level: 3,
-					typ: 111,
-					..
-				} if text.starts_with("resolved derivation: ") => {
-					// CA resolved
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					id,
-					..
-				} if text.starts_with("waiting for lock on ") => {
-					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
-					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.split("', '").next() {
-						drv = txt;
-					}
-					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-						let mut it = pkg.splitn(2, '-');
-						it.next();
-						if let Some(pkg) = it.next() {
-							drv = pkg;
-						}
-					}
-					let span = info_span!("waiting on drv", drv);
-					span.pb_start();
-					self.spans.insert(id, span);
-					// Concurrent build of the same message
-				}
-				NixLog::Stop { id, .. } => {
-					self.spans.remove(&id);
-				}
-				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
-					if let Some(span) = self.spans.get(&id) {
-						if let LogField::String(s) = &fields[0] {
-							span.pb_set_message(&process_message(s.trim()));
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						warn!("unknown result id: {id} {typ} {fields:?}");
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
-					if let Some(span) = self.spans.get(&id) {
-						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
-							&fields[..4]
-						{
-							span.pb_set_length(*expected);
-							span.pb_set_position(*done);
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						// warn!("unknown result id: {id} {typ} {fields:?}");
-						// Unaccounted progress.
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
-					// Set phase, expected
-				}
-				_ => warn!("unknown log: {:?}", log),
-			};
-		} else {
-			let e = e.trim();
-			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
-				return;
-			}
-			info!("{e}")
-		}
-	}
 }
 
 async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
 ) -> Result<Option<Vec<u8>>> {
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
+	debug!("running command {cmd:?} on local");
 	let mut child = cmd.spawn()?;
 	let mut stderr = child.stderr.take().unwrap();
 	let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
 	err_handler: &mut dyn Handler,
 	mut out_handler: Option<&mut dyn Handler>,
 ) -> Result<Option<Vec<u8>>> {
+	debug!("running command {cmd:?} over ssh");
 	cmd.stderr(openssh::Stdio::piped());
 	cmd.stdout(openssh::Stdio::piped());
 	let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
 	}
 
 	Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
-	/// Return true to discard message from logging
-	fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
-	String(String),
-	Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
-	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-	where
-		D: serde::Deserializer<'de>,
-	{
-		struct StringOrNum;
-		impl<'de> Visitor<'de> for StringOrNum {
-			type Value = LogField;
-
-			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-				write!(f, "string or unsigned")
-			}
-
-			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::String(v.to_owned()))
-			}
-
-			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::Num(v))
-			}
-		}
-
-		deserializer.deserialize_any(StringOrNum)
-	}
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
-	Msg {
-		level: u32,
-		msg: String,
-		raw_msg: Option<String>,
-	},
-	Start {
-		id: u64,
-		level: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-		text: String,
-		#[serde(rename = "type")]
-		typ: u32,
-	},
-	Stop {
-		id: u64,
-	},
-	Result {
-		id: u64,
-		#[serde(rename = "type")]
-		typ: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-	},
 }
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
before · cmds/fleet/src/host.rs
1use std::{2	env::current_dir,3	ffi::{OsStr, OsString},4	fmt::Display,5	io::Write,6	ops::Deref,7	path::PathBuf,8	str::FromStr,9	sync::{Arc, Mutex, MutexGuard, OnceLock},10};1112use age::Recipient;13use anyhow::{anyhow, bail, Context, Result};14use clap::{ArgGroup, Parser};15use openssh::SessionBuilder;16use serde::de::DeserializeOwned;17use tempfile::NamedTempFile;1819use crate::{20	better_nix_eval::{Field, NixSessionPool},21	command::MyCommand,22	fleetdata::{FleetData, FleetSecret, FleetSharedSecret, SecretData},23	nix_go, nix_go_json,24};2526pub struct FleetConfigInternals {27	pub local_system: String,28	pub directory: PathBuf,29	pub opts: FleetOpts,30	pub data: Mutex<FleetData>,31	pub nix_args: Vec<OsString>,32	/// fleetConfigurations.<name>.<localSystem>33	pub fleet_field: Field,34	/// fleet_config.configUnchecked35	pub config_field: Field,36	/// fleet_config.unchecked37	pub config_unchecked_field: Field,38}3940#[derive(Clone)]41pub struct Config(Arc<FleetConfigInternals>);4243impl Deref for Config {44	type Target = FleetConfigInternals;4546	fn deref(&self) -> &Self::Target {47		&self.048	}49}5051pub struct ConfigHost {52	pub name: String,53	pub session: OnceLock<Arc<openssh::Session>>,54}55impl ConfigHost {56	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {57		// FIXME: TOCTOU58		if let Some(session) = &self.session.get() {59			return Ok((*session).clone());60		};61		let session = SessionBuilder::default();6263		let session = session64			.connect(&self.name)65			.await66			.map_err(|e| anyhow!("ssh error: {e}"))?;67		let session = Arc::new(session);68		self.session.set(session.clone()).expect("TOCTOU happened");69		Ok(session)70	}71	pub async fn mktemp_dir(&self) -> Result<String> {72		let mut cmd = self.cmd("mktemp").await?;73		cmd.arg("-d");74		let path = cmd.run_string().await?;75		Ok(path.trim_end().to_owned())76	}77	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {78		let mut cmd = self.cmd("cat").await?;79		cmd.arg(path);80		cmd.run_bytes().await81	}82	pub async fn read_file_text(&self, path: impl AsRef<OsStr>) -> Result<String> {83		let mut cmd = self.cmd("cat").await?;84		cmd.arg(path);85		cmd.run_string().await86	}87	pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {88		let text = self.read_file_text(path).await?;89		Ok(serde_json::from_str(&text)?)90	}91	pub async fn read_file_value<D: FromStr>(&self, path: impl AsRef<OsStr>) -> Result<D>92	where93		<D as FromStr>::Err: Display,94	{95		let text = self.read_file_text(path).await?;96		D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))97	}98	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {99		let session = self.open_session().await?;100		Ok(MyCommand::new_on(cmd, session))101	}102103	pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {104		let mut cmd = self.cmd("fleet-install-secrets").await?;105		cmd.arg("decrypt").eqarg("--secret", data.encode_z85());106		let encoded = cmd107			.sudo()108			.run_string()109			.await110			.context("failed to call remote host for decrypt")?;111		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")112	}113	/// Returns path for futureproofing, as path might change i.e on conversion to CA114	pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {115		let mut nix = MyCommand::new("nix");116		nix.arg("copy")117			.arg("--substitute-on-destination")118			.comparg("--to", format!("ssh-ng://{}", self.name))119			.arg(path);120		nix.run_nix().await?;121		Ok(path.to_owned())122	}123}124125impl Config {126	pub fn should_skip(&self, host: &str) -> bool {127		if !self.opts.skip.is_empty() {128			self.opts.skip.iter().any(|h| h as &str == host)129		} else if !self.opts.only.is_empty() {130			!self.opts.only.iter().any(|h| h as &str == host)131		} else {132			false133		}134	}135	pub fn is_local(&self, host: &str) -> bool {136		self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)137	}138139	pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {140		if sudo {141			command = command.sudo();142		}143		if !self.is_local(host) {144			command = command.ssh(host);145		}146		command.run().await147	}148	pub async fn run_string_on(149		&self,150		host: &str,151		mut command: MyCommand,152		sudo: bool,153	) -> Result<String> {154		if sudo {155			command = command.sudo();156		}157		if !self.is_local(host) {158			command = command.ssh(host);159		}160		command.run_string().await161	}162163	pub async fn host(&self, name: &str) -> Result<ConfigHost> {164		Ok(ConfigHost {165			name: name.to_owned(),166			session: OnceLock::new(),167		})168	}169	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {170		let fleet_field = &self.fleet_field;171		let names = nix_go!(fleet_field.configuredHosts).list_fields().await?;172		let mut out = vec![];173		for name in names {174			out.push(ConfigHost {175				name,176				session: OnceLock::new(),177			})178		}179		Ok(out)180	}181	pub async fn system_config(&self, host: &str) -> Result<Field> {182		let fleet_field = &self.fleet_field;183		Ok(nix_go!(fleet_field.configuredSystems[{ host }].config))184	}185186	pub(super) fn data(&self) -> MutexGuard<FleetData> {187		self.data.lock().unwrap()188	}189	pub(super) fn data_mut(&self) -> MutexGuard<FleetData> {190		self.data.lock().unwrap()191	}192	/// Shared secrets configured in fleet.nix or in flake193	pub async fn list_configured_shared(&self) -> Result<Vec<String>> {194		let config_field = &self.config_unchecked_field;195		nix_go!(config_field.configUnchecked.sharedSecrets)196			.list_fields()197			.await198	}199	/// Shared secrets configured in fleet.nix200	pub fn list_shared(&self) -> Vec<String> {201		let data = self.data();202		data.shared_secrets.keys().cloned().collect()203	}204	pub fn has_shared(&self, name: &str) -> bool {205		let data = self.data();206		data.shared_secrets.contains_key(name)207	}208	pub fn replace_shared(&self, name: String, shared: FleetSharedSecret) {209		let mut data = self.data_mut();210		data.shared_secrets.insert(name.to_owned(), shared);211	}212	pub fn remove_shared(&self, secret: &str) {213		let mut data = self.data_mut();214		data.shared_secrets.remove(secret);215	}216217	pub fn has_secret(&self, host: &str, secret: &str) -> bool {218		let data = self.data();219		let Some(host_secrets) = data.host_secrets.get(host) else {220			return false;221		};222		host_secrets.contains_key(secret)223	}224	pub fn insert_secret(&self, host: &str, secret: String, value: FleetSecret) {225		let mut data = self.data_mut();226		let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();227		host_secrets.insert(secret, value);228	}229230	pub async fn reencrypt_on_host(231		&self,232		host: &str,233		data: SecretData,234		targets: Vec<String>,235	) -> Result<SecretData> {236		let mut recmd = MyCommand::new("fleet-install-secrets");237		recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());238		for target in targets {239			recmd.eqarg("--targets", target);240		}241		recmd = recmd.sudo().ssh(host);242		let encoded = recmd243			.run_string()244			.await245			.context("failed to call remote host for decrypt")?246			.trim()247			.to_owned();248		SecretData::decode_z85(&encoded)249	}250251	pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {252		let data = self.data();253		let Some(host_secrets) = data.host_secrets.get(host) else {254			bail!("no secrets for machine {host}");255		};256		let Some(secret) = host_secrets.get(secret) else {257			bail!("machine {host} has no secret {secret}");258		};259		Ok(secret.clone())260	}261	pub fn shared_secret(&self, secret: &str) -> Result<FleetSharedSecret> {262		let data = self.data();263		let Some(secret) = data.shared_secrets.get(secret) else {264			bail!("no shared secret {secret}");265		};266		Ok(secret.clone())267	}268	pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {269		let config_field = &self.config_unchecked_field;270		Ok(nix_go_json!(271			config_field.configUnchecked.sharedSecrets[{ secret }].expectedOwners272		))273	}274275	pub fn save(&self) -> Result<()> {276		let mut tempfile = NamedTempFile::new_in(self.directory.clone())?;277		let data = nixlike::serialize(&self.data() as &FleetData)?;278		tempfile.write_all(279			format!(280				"# This file contains fleet state and shouldn't be edited by hand\n\n{}\n\n# vim: ts=2 et nowrap\n",281				data282			)283			.as_bytes(),284		)?;285		let mut fleet_data_path = self.directory.clone();286		fleet_data_path.push("fleet.nix");287		tempfile.persist(fleet_data_path)?;288		Ok(())289	}290}291292#[derive(Parser, Clone)]293#[clap(group = ArgGroup::new("target_hosts"))]294pub struct FleetOpts {295	/// All hosts except those would be skipped296	#[clap(long, number_of_values = 1, group = "target_hosts")]297	only: Vec<String>,298299	/// Hosts to skip300	#[clap(long, number_of_values = 1, group = "target_hosts")]301	skip: Vec<String>,302303	/// Host, which should be threaten as current machine304	#[clap(long)]305	pub localhost: Option<String>,306307	/// Override detected system for host, to perform builds via308	/// binfmt-declared qemu instead of trying to crosscompile309	#[clap(long, default_value = "detect")]310	pub local_system: String,311}312313impl FleetOpts {314	pub async fn build(mut self, nix_args: Vec<OsString>) -> Result<Config> {315		if self.localhost.is_none() {316			self.localhost317				.replace(hostname::get().unwrap().to_str().unwrap().to_owned());318		}319		let directory = current_dir()?;320321		let pool = NixSessionPool::new(directory.as_os_str().to_owned(), nix_args.clone()).await?;322		let root_field = pool.get().await?;323324		if self.local_system == "detect" {325			let builtins_field = Field::field(root_field.clone(), "builtins").await?;326			self.local_system = nix_go_json!(builtins_field.currentSystem);327		}328		let local_system = self.local_system.clone();329330		let fleet_root = Field::field(root_field, "fleetConfigurations").await?;331332		let fleet_field = nix_go!(fleet_root.default);333		let config_field = nix_go!(fleet_field.configUnchecked);334		let config_unchecked_field = nix_go!(fleet_field.unchecked);335336		let mut fleet_data_path = directory.clone();337		fleet_data_path.push("fleet.nix");338		let bytes = std::fs::read_to_string(fleet_data_path)?;339		let data = nixlike::parse_str(&bytes)?;340341		Ok(Config(Arc::new(FleetConfigInternals {342			opts: self,343			directory,344			data,345			local_system,346			nix_args,347			fleet_field,348			config_field,349			config_unchecked_field,350		})))351	}352}
modifiedcmds/fleet/src/keys.rsdiffbeforeafterboth
--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
 use std::str::FromStr;
 
-use crate::command::MyCommand;
 use crate::host::Config;
 use age::Recipient;
 use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
 			Ok(key)
 		} else {
 			warn!("Loading key for {}", host);
-			let mut cmd = MyCommand::new("cat");
+			let host = self.host(host).await?;
+			let mut cmd = host.cmd("cat").await?;
 			cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
-			let key = self.run_string_on(host, cmd, false).await?;
-			self.update_key(host, key.clone());
+			let key = cmd.run_string().await?;
+			self.update_key(&host.name, key.clone());
 			Ok(key)
 		}
 	}
addedcmds/remowt-agent/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
addedcmds/remowt-agent/README.adocdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
addedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+	println!("Hello, world!");
+}
addedcrates/better-command/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
addedcrates/better-command/src/handler.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+	fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+	fn clone(&self) -> Self {
+		Self(self.0.clone())
+	}
+}
+impl<H> ClonableHandler<H> {
+	pub fn new(inner: H) -> Self {
+		Self(Arc::new(Mutex::new(inner)))
+	}
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+	fn handle_line(&mut self, e: &str) {
+		self.0.lock().unwrap().handle_line(e)
+	}
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+	fn handle_line(&mut self, e: &str) {
+		info!(target: "log", "{e}");
+	}
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+	fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+	spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+	String(String),
+	Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+	Msg {
+		level: u32,
+		msg: String,
+		raw_msg: Option<String>,
+	},
+	Start {
+		id: u64,
+		level: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+		text: String,
+		#[serde(rename = "type")]
+		typ: u32,
+	},
+	Stop {
+		id: u64,
+	},
+	Result {
+		id: u64,
+		#[serde(rename = "type")]
+		typ: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+	},
+}
+fn process_message(m: &str) -> String {
+	// Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+	static OSC_CLEANER: Lazy<Regex> =
+		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+	let m = OSC_CLEANER.replace_all(m, "");
+	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+	DETABBER.replace_all(m.as_ref(), "  ").to_string()
+}
+impl Handler for NixHandler {
+	fn handle_line(&mut self, e: &str) {
+		if let Some(e) = e.strip_prefix("@nix ") {
+			let log: NixLog = match serde_json::from_str(e) {
+				Ok(l) => l,
+				Err(err) => {
+					warn!("failed to parse nix log line {:?}: {}", e, err);
+					return;
+				}
+			};
+			match log {
+				NixLog::Msg { msg, raw_msg, .. } => {
+					#[allow(clippy::nonminimal_bool)]
+					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+						if let Some(raw_msg) = raw_msg {
+							if !msg.is_empty() {
+								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+							} else {
+								info!(target: "nix", "{}", raw_msg.trim_end())
+							}
+						} else {
+							info!(target: "nix", "{}", msg.trim_end())
+						}
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 105 && !fields.is_empty() => {
+					if let [LogField::String(drv), ..] = &fields[..] {
+						let mut drv = drv.as_str();
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						info!(target: "nix","building {}", drv);
+						let span = info_span!("build", drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad build log: {:?}", log)
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 100 && fields.len() >= 3 => {
+					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+						&fields[..]
+					{
+						let mut drv = drv.as_str();
+
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
+						let span = info_span!("copy", from, to, drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad copy log: {:?}", log)
+					}
+				}
+				NixLog::Start { text, typ, id, .. }
+					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+				{
+					if !text.is_empty()
+						&& text != "querying info about missing paths"
+						&& text != "copying 0 paths"
+						// Too much spam on lazy-trees branch
+						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
+					{
+						let span = info_span!("job");
+						span.pb_start();
+						span.pb_set_message(&process_message(text.trim()));
+						self.spans.insert(id, span);
+						info!(target: "nix", "{}", text);
+					}
+				}
+				NixLog::Start {
+					text,
+					level: 0,
+					typ: 108,
+					..
+				} if text.is_empty() => {
+					// Cache lookup? Coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 109,
+					..
+				} if text.starts_with("querying info about ") => {
+					// Cache lookup
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 101,
+					..
+				} if text.starts_with("downloading ") => {
+					// NAR downloading, coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					..
+				} if text.starts_with("waiting for a machine to build ") => {
+					// Useless repeating notification about build
+				}
+				NixLog::Start {
+					text,
+					level: 3,
+					typ: 111,
+					..
+				} if text.starts_with("resolved derivation: ") => {
+					// CA resolved
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					id,
+					..
+				} if text.starts_with("waiting for lock on ") => {
+					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.split("', '").next() {
+						drv = txt;
+					}
+					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+						let mut it = pkg.splitn(2, '-');
+						it.next();
+						if let Some(pkg) = it.next() {
+							drv = pkg;
+						}
+					}
+					let span = info_span!("waiting on drv", drv);
+					span.pb_start();
+					self.spans.insert(id, span);
+					// Concurrent build of the same message
+				}
+				NixLog::Stop { id, .. } => {
+					self.spans.remove(&id);
+				}
+				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+					if let Some(span) = self.spans.get(&id) {
+						if let LogField::String(s) = &fields[0] {
+							span.pb_set_message(&process_message(s.trim()));
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						warn!("unknown result id: {id} {typ} {fields:?}");
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+					if let Some(span) = self.spans.get(&id) {
+						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+							&fields[..4]
+						{
+							span.pb_set_length(*expected);
+							span.pb_set_position(*done);
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						// warn!("unknown result id: {id} {typ} {fields:?}");
+						// Unaccounted progress.
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+					// Set phase, expected
+				}
+				_ => warn!("unknown log: {:?}", log),
+			};
+		} else {
+			let e = e.trim();
+			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+				return;
+			}
+			info!("{e}")
+		}
+	}
+}
addedcrates/better-command/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+	left + right
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn it_works() {
+		let result = add(2, 2);
+		assert_eq!(result, 4);
+	}
+}