git.delta.rocks / jrsonnet / refs/commits / 7c6930a6bff0

difftreelog

refactor remove shell-outs for ssh

Yaroslav Bolyukin2023-12-29parent: #904d121.patch.diff
in: trunk

15 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
 checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
 
 [[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
 name = "bitflags"
 version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
  "anyhow",
  "async-trait",
  "base64 0.21.5",
+ "better-command",
  "chrono",
  "clap",
  "futures",
@@ -1510,9 +1523,9 @@
 
 [[package]]
 name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
 
 [[package]]
 name = "opaque-debug"
@@ -1922,6 +1935,10 @@
 checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
 [[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
 name = "rnix"
 version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
 
 [[package]]
 name = "serde"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
 dependencies = [
  "serde_derive",
 ]
@@ -2123,9 +2140,9 @@
 
 [[package]]
 name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2134,9 +2151,9 @@
 
 [[package]]
 name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
 dependencies = [
  "itoa",
  "ryu",
@@ -2527,11 +2544,10 @@
 
 [[package]]
 name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
 dependencies = [
- "cfg-if",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
@@ -2539,9 +2555,9 @@
 
 [[package]]
 name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2550,9 +2566,9 @@
 
 [[package]]
 name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
 dependencies = [
  "once_cell",
  "valuable",
@@ -2560,9 +2576,9 @@
 
 [[package]]
 name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
 dependencies = [
  "indicatif",
  "tracing",
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
 [workspace]
 members = ["crates/*", "cmds/*"]
 resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
 edition = "2021"
 
 [dependencies]
+nixlike.workspace = true
+better-command.workspace = true
 anyhow = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -15,7 +17,6 @@
 hostname = "0.3.1"
 age-core = "0.9.0"
 peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
 age = { version = "0.9.2", features = ["ssh", "armor"] }
 base64 = "0.21.5"
 chrono = { version = "0.4.31", features = ["serde"] }
modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
after · cmds/fleet/src/better_nix_eval.rs
1//! Wrapper around nix repl, which allows to work on nix code, without relying on2//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.34use std::collections::HashMap;5use std::ffi::{OsStr, OsString};6use std::fmt::{self, Display};7use std::path::PathBuf;8use std::process::Stdio;9use std::sync::{Arc, OnceLock};1011use anyhow::{anyhow, bail, ensure, Context, Result};12use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};13use futures::StreamExt;14use itertools::Itertools;15use r2d2::{Pool, PooledConnection};16use serde::de::DeserializeOwned;17use serde::{Deserialize, Serialize};18use tokio::io::AsyncWriteExt;19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};20use tokio::select;21use tokio::sync::{mpsc, oneshot, Mutex};22use tokio_util::codec::{FramedRead, LinesCodec};23use tracing::{debug, error, warn, Level};2425const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2627pub struct NixSessionInner {28	full_delimiter: String,29	nix_handler: ClonableHandler<NixHandler>,30	out: OutputHandler,31	stdin: ChildStdin,32	string_wrapping: (String, String),33	number_wrapping: (String, String),3435	executing_command: Arc<Mutex<()>>,3637	next_id: u32,38	free_list: Vec<u32>,39}40const TRAIN_STRING: &str = "\"TRAIN_STRING\"";41const TRAIN_NUMBER: &str = "13141516";4243#[must_use]44struct ErrorCollector<'i, H> {45	collected: Vec<String>,46	inner: &'i mut H,47}48impl<'i, H> ErrorCollector<'i, H> {49	fn new(inner: &'i mut H) -> Self {50		Self {51			collected: vec![],52			inner,53		}54	}55}56impl<H> ErrorCollector<'_, H> {57	fn handle_line_inner(&mut self, msg: &str) -> bool {58		let Some(msg) = msg.strip_prefix("@nix ") else {59			return false;60		};61		#[derive(Deserialize)]62		struct ErrorAction {63			action: String,64			level: u32,65			msg: String,66		}67		let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {68			return false;69		};70		if act.action != "msg" || act.level != 0 {71			return false;72		}73		self.collected.push(act.msg);74		true75	}76	fn finish(self) -> Result<()> {77		// fn dedent(s: String) -> String {78		// 	s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)79		// }80		if !self.collected.is_empty() {81			bail!(82				"{}",83				self.collected84					.iter()85					.map(|v| {86						if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {87							let v = unindent::unindent(f.trim_start());88							v.trim().to_owned()89						} else {90							v.to_owned()91						}92					})93					.join("\n")94			);95		}96		Ok(())97	}98	fn flush(self) {99		for line in self.collected {100			warn!("{line}");101		}102	}103}104impl<H: Handler> Handler for ErrorCollector<'_, H> {105	fn handle_line(&mut self, e: &str) {106		if self.handle_line_inner(e) {107			return;108		}109		self.inner.handle_line(e)110	}111}112113enum OutputLine {114	Out(String),115	Err(String),116}117struct OutputHandler {118	rx: mpsc::Receiver<OutputLine>,119	_cancel_handle: oneshot::Receiver<()>,120}121impl OutputHandler {122	fn new(out: ChildStdout, err: ChildStderr) -> Self {123		let mut out = FramedRead::new(out, LinesCodec::new());124		let mut err = FramedRead::new(err, LinesCodec::new());125		let (tx, rx) = mpsc::channel(20);126		let (mut cancelled, _cancel_handle) = oneshot::channel();127		tokio::spawn(async move {128			loop {129				select! {130					// We should receive errors earlier than synchronization131					biased;132					e = err.next() => {133						let Some(Ok(e)) = e else {134							if e.is_some() {135								error!("bad repl stderr: {e:?}");136							}137							continue;138						};139						let _ = tx.send(OutputLine::Err(e)).await;140					}141					o = out.next() => {142						let Some(Ok(o)) = o else {143							if o.is_some() {144								error!("bad repl stdout: {o:?}");145							}146							continue;147						};148						let _ = tx.send(OutputLine::Out(o)).await;149					}150					// Reader doesn't care about stdout, as this is cancelled.151					// Error still might be useful, to process leftover span closures?152					_ = cancelled.closed() => {153						break;154					}155				}156			}157		});158		Self { rx, _cancel_handle }159	}160	async fn next(&mut self) -> Option<OutputLine> {161		self.rx.recv().await162	}163}164165struct WarnHandler;166impl Handler for WarnHandler {167	fn handle_line(&mut self, e: &str) {168		warn!(target: "nix", "{e}")169	}170}171172impl NixSessionInner {173	async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {174		let mut cmd = Command::new("nix");175		cmd.arg("repl")176			.arg(flake)177			.arg("--log-format")178			.arg("internal-json");179		for arg in extra_args {180			cmd.arg(arg);181		}182		cmd.stdin(Stdio::piped());183		cmd.stdout(Stdio::piped());184		cmd.stderr(Stdio::piped());185		let cmd = cmd.spawn()?;186		let stdout = cmd.stdout.unwrap();187		let stderr = cmd.stderr.unwrap();188		let mut out = OutputHandler::new(stdout, stderr);189		let mut stdin = cmd.stdin.unwrap();190		// Standard repl hello doesn't work with internal-json logger191		stdin.write_all(REPL_DELIMITER.as_bytes()).await?;192		stdin.write_all(b"\n").await?;193		stdin.flush().await?;194		let nix_handler = NixHandler::default();195		let mut full_delimiter = None;196		let mut errors = vec![];197		while let Some(line) = out.next().await {198			let line = match line {199				OutputLine::Out(o) => o,200				OutputLine::Err(_e) => {201					// Handle startup errors, but skip repl hello?202					errors.push(_e);203					continue;204				}205			};206			if line.contains(REPL_DELIMITER) {207				debug!("discovered repl delimiter with added colors: {line}");208				full_delimiter = Some(line.to_owned());209				break;210			}211		}212		let Some(full_delimiter) = full_delimiter else {213			for e in errors {214				error!("{e}");215			}216			bail!("failed to discover delimiter");217		};218		let mut res = Self {219			full_delimiter,220			nix_handler: ClonableHandler::new(nix_handler),221			out,222			stdin,223			string_wrapping: Default::default(),224			number_wrapping: Default::default(),225226			executing_command: Arc::new(Mutex::new(())),227228			next_id: 0,229			free_list: vec![],230		};231		res.train().await?;232		Ok(res)233	}234	async fn train(&mut self) -> Result<()> {235		{236			let full_string = self237				.execute_expression_raw(TRAIN_STRING, &mut NoopHandler)238				.await?;239			let string_offset = full_string.find(TRAIN_STRING).expect("contained");240			let string_prefix = &full_string[..string_offset];241			let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];242			self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());243		}244		{245			let full_number = self246				.execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)247				.await?;248			let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");249			let number_prefix = &full_number[..number_offset];250			let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];251			self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());252		}253		Ok(())254	}255	async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {256		if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {257			let cmd_str = String::from_utf8_lossy(cmd.as_ref());258			tracing::debug!("{cmd_str}");259		};260		self.stdin.write_all(cmd.as_ref()).await?;261		self.stdin.write_all(b"\n").await?;262		Ok(())263	}264	async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {265		let mut out = String::new();266		while let Some(line) = self.out.next().await {267			let line = match line {268				OutputLine::Out(out) => out,269				OutputLine::Err(err) => {270					err_handler.handle_line(&err);271					continue;272				}273			};274			if line == self.full_delimiter {275				return Ok(out);276			}277			if !out.is_empty() {278				out.push('\n');279			}280			out.push_str(&line);281		}282		bail!("didn't reached delimiter");283	}284	async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {285		let num = self.number_wrapping.clone();286		let n = self.execute_expression_wrapping(expr, &num).await?;287		Ok(n.parse::<u64>()?)288	}289	async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {290		let num = self.string_wrapping.clone();291		let n = self.execute_expression_wrapping(expr, &num).await?;292		let str: String = serde_json::from_str(&n)?;293		Ok(str)294	}295	async fn execute_expression_to_json<V: DeserializeOwned>(296		&mut self,297		expr: impl AsRef<[u8]>,298	) -> Result<V> {299		let mut fexpr = b"builtins.toJSON (".to_vec();300		fexpr.extend_from_slice(expr.as_ref());301		fexpr.push(b')');302		let v = self.execute_expression_string(fexpr).await?;303		Ok(serde_json::from_str(&v)?)304	}305	async fn execute_expression_wrapping(306		&mut self,307		expr: impl AsRef<[u8]>,308		wrapping: &(String, String),309	) -> Result<String> {310		let mut nix_handler = self.nix_handler.clone();311		let mut collected = ErrorCollector::new(&mut nix_handler);312		let res = self.execute_expression_raw(expr, &mut collected).await?;313		if res.is_empty() {314			collected.finish()?;315			bail!("expected expression, got nothing")316		} else {317			collected.flush()318		};319		let Some(res) = res.strip_prefix(&wrapping.0) else {320			bail!("invalid type")321		};322		let Some(res) = res.strip_suffix(&wrapping.1) else {323			bail!("invalid type")324		};325		Ok(res.to_owned())326	}327	async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {328		let mut nix_handler = self.nix_handler.clone();329		let mut collected = ErrorCollector::new(&mut nix_handler);330		let v = self.execute_expression_raw(expr, &mut collected).await?;331		collected.finish()?;332		ensure!(v.is_empty(), "unexpected expression result");333		Ok(())334	}335	async fn execute_expression_raw(336		&mut self,337		expr: impl AsRef<[u8]>,338		err_handler: &mut dyn Handler,339	) -> Result<String> {340		// Prevent two commands from being executed in parallel, messing with each other.341		let _lock = self.executing_command.clone();342		let _guard = _lock.lock().await;343344		self.send_command(expr).await?;345		// It will be echoed346		self.send_command(REPL_DELIMITER).await?;347		self.read_until_delimiter(err_handler).await348	}349	async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {350		let id = self.allocate_id();351		self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))352			.await?;353		Ok(id)354	}355356	/// Id should be immediately used357	fn allocate_id(&mut self) -> u32 {358		if let Some(free) = self.free_list.pop() {359			free360		} else {361			let v = self.next_id;362			self.next_id += 1;363			v364		}365	}366	// Nix has no way to deallocate variable, yet GC will correct everything not reachable.367	// async fn free_id(&mut self, id: u32) -> Result<()> {368	// 	self.execute_expression_empty(format!("sess_field_{id} = null"))369	// 		.await?;370	// 	self.free_list.push(id);371	// 	Ok(())372	// }373}374375#[derive(Clone)]376pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);377378#[derive(Clone)]379pub struct NixExprBuilder {380	out: String,381	used_fields: Vec<Field>,382}383impl NixExprBuilder {384	pub fn object() -> Self {385		NixExprBuilder {386			out: "{ ".to_owned(),387			used_fields: Vec::new(),388		}389	}390	pub fn string(s: &str) -> Self {391		NixExprBuilder {392			out: nixlike::serialize(s)393				.expect("no problems with serializing_string")394				.trim_end()395				.to_owned(),396			used_fields: Vec::new(),397		}398	}399	pub fn serialized(v: impl Serialize) -> Self {400		let serialized = nixlike::serialize(v).expect("invalid value for apply");401		Self {402			out: serialized.trim_end().to_owned(),403			used_fields: Vec::new(),404		}405	}406	pub fn field(f: Field) -> Self {407		Self {408			out: format!("sess_field_{}", f.0.value.expect("no value")),409			used_fields: vec![f],410		}411	}412	pub fn end_obj(&mut self) {413		self.out.push('}');414	}415	pub fn obj_key(&mut self, name: Self, value: Self) {416		self.out.push_str(r#""${"#);417		self.extend(name);418		self.out.push_str(r#"}" = "#);419		self.extend(value);420		self.out.push_str("; ");421	}422423	pub fn extend(&mut self, e: Self) {424		self.out.push_str(&e.out);425		self.used_fields.extend(e.used_fields);426	}427428	pub fn session(&self) -> NixSession {429		let mut session = None;430		for ele in &self.used_fields {431			if session.is_none() {432				session = Some(ele.0.session.clone());433				continue;434			}435			let session = &session.as_ref().expect("checked").0;436			let ele_sess = &ele.0.session.0;437			assert!(438				Arc::ptr_eq(session, ele_sess),439				"can't mix fields from different session"440			);441		}442		session.expect("expr without fields used")443	}444	pub fn index_attr(&mut self, s: &str) {445		let escaped = nixlike::serialize(s).expect("string");446		self.out.push('.');447		self.out.push_str(escaped.trim_end());448	}449}450451#[macro_export]452macro_rules! nix_expr_inner {453	(Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{454		use $crate::better_nix_eval::NixExprBuilder;455		let mut out = NixExprBuilder::object();456		$(457			out.obj_key(458				NixExprBuilder::string(stringify!($ident)),459				$crate::nix_expr_inner!($($val)+),460			);461		)*462		out.end_obj();463		out464	}};465	(@field($o:ident) . $var:ident $($tt:tt)*) => {{466		$o.index_attr(stringify!($var));467		nix_expr_inner!(@field($o) $($tt)*);468	}};469	(@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{470		$o.push(Index::attr(&$v));471		nix_expr_inner!(@o($o) $($tt)*);472	}};473	(@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{474		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));475		nix_expr_inner!(@o($o) $($tt)*);476	}};477	(@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {478		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));479		nix_expr_inner!(@o($o) $($tt)*);480	};481	(@field($o:ident)) => {};482	($field:ident $($tt:tt)*) => {{483		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};484		#[allow(unused_mut, reason = "might be used if indexed")]485		let mut out = NixExprBuilder::field($field.clone());486		nix_expr_inner!(@field(out) $($tt)*);487		out488	}};489	($v:literal) => {{490		use $crate::better_nix_eval::NixExprBuilder;491		NixExprBuilder::string($v)492	}};493	({$v:expr}) => {{494		use $crate::better_nix_eval::NixExprBuilder;495		NixExprBuilder::serialized(&$v)496	}}497}498#[macro_export]499macro_rules! nix_expr {500	($($tt:tt)+) => {{501		use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};502		let expr = nix_expr_inner!($($tt)+);503		Field::new(expr.session(), expr.out)504	}};505}506507#[macro_export]508macro_rules! nix_go {509	(@o($o:ident) . $var:ident $($tt:tt)*) => {{510		$o.push(Index::attr(stringify!($var)));511		nix_go!(@o($o) $($tt)*);512	}};513	(@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{514		$o.push(Index::attr(&$v));515		nix_go!(@o($o) $($tt)*);516	}};517	(@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{518		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));519		nix_go!(@o($o) $($tt)*);520	}};521	(@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {522		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));523		nix_go!(@o($o) $($tt)*);524	};525	(@o($o:ident)) => {};526	($field:ident $($tt:tt)+) => {{527		use $crate::{nix_go, better_nix_eval::Index};528		let field = $field.clone();529		let mut out = vec![];530		nix_go!(@o(out) $($tt)*);531		field.select(out).await?532	}}533}534#[macro_export]535macro_rules! nix_go_json {536	($($tt:tt)*) => {{537		$crate::nix_go!($($tt)*).as_json().await?538	}};539}540541#[derive(Clone)]542pub enum Index {543	Var(String),544	String(String),545	Apply(String),546	Expr(NixExprBuilder),547	ExprApply(NixExprBuilder),548}549impl Index {550	pub fn var(v: impl AsRef<str>) -> Self {551		let v = v.as_ref();552		assert!(553			!(v.contains('.') | v.contains(' ')),554			"bad variable name: {v}"555		);556		Self::Var(v.to_owned())557	}558	pub fn attr(v: impl AsRef<str>) -> Self {559		Self::String(v.as_ref().to_owned())560	}561	pub fn apply(v: impl Serialize) -> Self {562		let serialized = nixlike::serialize(v).expect("invalid value for apply");563		Self::Apply(serialized.trim_end().to_owned())564	}565}566impl Display for Index {567	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {568		match self {569			Index::Var(v) => {570				write!(f, "{v}")571			}572			Index::String(k) => {573				let v = nixlike::format_identifier(k.as_str());574				write!(f, ".{v}")575			}576			Index::Apply(o) => {577				write!(f, "<apply>({o})")578			}579			Index::Expr(e) => {580				write!(f, "[{}]", e.out)581			}582			Index::ExprApply(e) => {583				write!(f, "<apply>({})", e.out)584			}585		}586	}587}588impl fmt::Debug for Index {589	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {590		write!(f, "{self}")591	}592}593struct PathDisplay<'i>(&'i [Index]);594impl Display for PathDisplay<'_> {595	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {596		for i in self.0 {597			write!(f, "{i}")?;598		}599		Ok(())600	}601}602struct FieldInner {603	full_path: Option<Vec<Index>>,604	session: NixSession,605	value: Option<u32>,606}607fn context(full_path: Option<&[Index]>, query: &str) -> String {608	if let Some(full_path) = &full_path {609		format!("full path: {}", PathDisplay(full_path))610	} else {611		format!("query: {query:?}")612	}613}614#[derive(Clone)]615pub struct Field(Arc<FieldInner>);616impl Field {617	fn root(session: NixSession) -> Self {618		Self(Arc::new(FieldInner {619			full_path: Some(vec![]),620			session,621			value: None,622		}))623	}624	async fn new(session: NixSession, query: &str) -> Result<Self> {625		let vid = session626			.0627			.lock()628			.await629			.execute_assign(query)630			.await631			.with_context(|| context(None, query))?;632		Ok(Self(Arc::new(FieldInner {633			full_path: None,634			session,635			value: Some(vid),636		})))637	}638	pub async fn field(session: NixSession, field: &str) -> Result<Self> {639		Self::root(session).select([Index::var(field)]).await640	}641	pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {642		let mut used_fields = Vec::new();643		let mut name = name.into_iter();644645		let mut full_path = self.0.full_path.clone();646		let mut query = if let Some(id) = self.0.value {647			format!("sess_field_{id}")648		} else {649			let first = name.next();650			if let Some(Index::Var(i)) = first {651				if let Some(full_path) = &mut full_path {652					full_path.push(Index::Var(i.clone()));653				}654				i.clone()655			} else {656				panic!("first path item should be variable, got {first:?}")657			}658		};659		for v in name {660			if let Some(full_path) = &mut full_path {661				full_path.push(v.clone());662			}663			match v {664				Index::Var(_) => panic!("var item may only be first"),665				Index::String(s) => {666					let escaped = nixlike::serialize(s)?;667					query.push('.');668					query.push_str(escaped.trim());669				}670				Index::Apply(a) => {671					// In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`672					query = format!("({query} {a})");673				}674				Index::Expr(e) => {675					let index = Field::new(self.0.session.clone(), &e.out).await?;676					used_fields.push(index.clone());677					query.push('.');678					let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));679					query.push_str(&index);680				}681				Index::ExprApply(e) => {682					let index = Field::new(self.0.session.clone(), &e.out).await?;683					used_fields.push(index.clone());684					query.push(' ');685					let index = format!("sess_field_{}", index.0.value.expect("value"));686					query.push_str(&index);687					query = format!("({query})");688				}689			}690		}691692		let vid = self693			.0694			.session695			.0696			.lock()697			.await698			.execute_assign(&query)699			.await700			.with_context(|| {701				if let Some(full_path) = &full_path {702					format!("full path: {}", PathDisplay(full_path))703				} else {704					format!("query: {query:?}")705				}706			})?;707		Ok(Self(Arc::new(FieldInner {708			full_path,709			session: self.0.session.clone(),710			value: Some(vid),711		})))712	}713	pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {714		let id = self.0.value.expect("can't serialize root field");715		let query = format!("sess_field_{id}");716		self.0717			.session718			.0719			.lock()720			.await721			.execute_expression_to_json(&query)722			.await723			.with_context(|| context(self.0.full_path.as_deref(), &query))724	}725	pub async fn has_field(&self, name: &str) -> Result<bool> {726		let id = self.0.value.expect("can't list root fields");727		let key = nixlike::escape_string(name);728		let query = format!("sess_field_{id} ? {key}");729		self.0730			.session731			.0732			.lock()733			.await734			.execute_expression_to_json(&query)735			.await736			.with_context(|| context(self.0.full_path.as_deref(), &query))737	}738	pub async fn list_fields(&self) -> Result<Vec<String>> {739		let id = self.0.value.expect("can't list root fields");740		let query = format!("builtins.attrNames sess_field_{id}");741		self.0742			.session743			.0744			.lock()745			.await746			.execute_expression_to_json(&query)747			.await748			.with_context(|| context(self.0.full_path.as_deref(), &query))749	}750	pub async fn type_of(&self) -> Result<String> {751		let id = self.0.value.expect("can't list root fields");752		let query = format!("builtins.typeOf sess_field_{id}");753		self.0754			.session755			.0756			.lock()757			.await758			.execute_expression_to_json(&query)759			.await760			.with_context(|| context(self.0.full_path.as_deref(), &query))761	}762	pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {763		let id = self.0.value.expect("can't use build on not-value");764		let query = format!(":b sess_field_{id}");765		let vid = self766			.0767			.session768			.0769			.lock()770			.await771			.execute_expression_raw(&query, &mut NixHandler::default())772			.await?;773		ensure!(774			!vid.is_empty(),775			"build failed: {}",776			context(self.0.full_path.as_deref(), &query),777		);778		let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")779		else {780			panic!("unexpected build output: {vid:?}");781		};782		let outputs = vid783			.split('\n')784			.filter(|v| !v.is_empty())785			.map(|v| v.split_once(" -> ").expect("unexpected build output"))786			.map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))787			.collect();788		Ok(outputs)789	}790}791impl Drop for FieldInner {792	fn drop(&mut self) {793		if let Some(id) = self.value {794			if let Ok(mut lock) = self.session.0.try_lock() {795				lock.free_list.push(id)796			}797			// Leaked798		}799	}800}801struct NixSessionPoolInner {802	flake: OsString,803	nix_args: Vec<OsString>,804}805806#[derive(Debug)]807pub struct NixPoolError(anyhow::Error);808impl From<anyhow::Error> for NixPoolError {809	fn from(value: anyhow::Error) -> Self {810		Self(value)811	}812}813impl std::error::Error for NixPoolError {}814impl std::fmt::Display for NixPoolError {815	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {816		self.0.fmt(f)817	}818}819impl r2d2::ManageConnection for NixSessionPoolInner {820	type Connection = NixSessionInner;821	type Error = NixPoolError;822	fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {823		let _v = TOKIO_RUNTIME824			.get()825			.expect("missed tokio runtime init!")826			.enter();827		Ok(futures::executor::block_on(NixSessionInner::new(828			self.flake.as_os_str(),829			self.nix_args.iter().map(OsString::as_os_str),830		))?)831	}832833	fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {834		let _v = TOKIO_RUNTIME835			.get()836			.expect("missed tokio runtime init!")837			.enter();838		let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;839		if res != 4 {840			return Err(anyhow!("sanity check failed").into());841		};842		Ok(())843	}844845	fn has_broken(&self, _conn: &mut Self::Connection) -> bool {846		false847	}848}849pub struct NixSessionPool(Pool<NixSessionPoolInner>);850impl NixSessionPool {851	pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {852		let inner = tokio::task::block_in_place(|| {853			r2d2::Builder::<NixSessionPoolInner>::new()854				.min_idle(Some(0))855				.build(NixSessionPoolInner { flake, nix_args })856		})?;857		Ok(Self(inner))858	}859	pub async fn get(&self) -> Result<NixSession> {860		let v = tokio::task::block_in_place(|| self.0.get())?;861		Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))862	}863}864865pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
 use std::{env::current_dir, time::Duration};
 
 use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
 use crate::nix_go;
 use anyhow::{anyhow, Result};
 use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
 use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
@@ -112,12 +112,12 @@
 	current: bool,
 	datetime: String,
 }
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
-	let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+	let mut cmd = host.cmd("nix-env").await?;
 	cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 		.arg("--list-generations");
 	// Sudo is required due to --list-generations acquiring lock on the profile.
-	let data = config.run_string_on(host, cmd, true).await?;
+	let data = cmd.sudo().run_string().await?;
 	let generations = data
 		.split('\n')
 		.map(|e| e.trim())
@@ -161,25 +161,12 @@
 		.map_err(|_e| anyhow!("bad list-generations output"))?
 		.ok_or_else(|| anyhow!("failed to find generation"))?;
 	Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("stop").arg(unit);
-	config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("start").arg(unit);
-	config.run_on(host, cmd, true).await
 }
 
 async fn execute_upload(
 	build: &BuildSystems,
-	config: &Config,
 	action: UploadAction,
-	host: &str,
+	host: &ConfigHost,
 	built: PathBuf,
 ) -> Result<()> {
 	let mut failed = false;
@@ -191,15 +178,15 @@
 	if !build.disable_rollback {
 		let _span = info_span!("preparing").entered();
 		info!("preparing for rollback");
-		let generation = get_current_generation(config, host).await?;
+		let generation = get_current_generation(&host).await?;
 		info!(
 			"rollback target would be {} {}",
 			generation.id, generation.datetime
 		);
 		{
-			let mut cmd = MyCommand::new("sh");
+			let mut cmd = host.cmd("sh").await?;
 			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to set rollback marker: {e}");
 				failed = true;
 			}
@@ -215,37 +202,41 @@
 		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
 		// Anyway, reboot will still help in this case.
 		if action.should_schedule_rollback_run() {
-			let mut cmd = MyCommand::new("systemd-run");
+			let mut cmd = host.cmd("systemd-run").await?;
 			cmd.comparg("--on-active", "3min")
 				.comparg("--unit", "rollback-watchdog-run")
 				.arg("systemctl")
 				.arg("start")
 				.arg("rollback-watchdog.service");
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to schedule rollback run: {e}");
 				failed = true;
 			}
 		}
 	}
+
 	if action.should_switch_profile() && !failed {
 		info!("switching generation");
-		let mut cmd = MyCommand::new("nix-env");
+		let mut cmd = host.cmd("nix-env").await?;
 		cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 			.comparg("--set", &built);
-		if let Err(e) = config.run_on(host, cmd, true).await {
+		if let Err(e) = cmd.sudo().run().await {
 			error!("failed to switch generation: {e}");
 			failed = true;
 		}
 	}
+
+	// FIXME: Connection might be disconnected after activation run
+
 	if action.should_activate() && !failed {
 		let _span = info_span!("activating").entered();
 		info!("executing activation script");
 		let mut switch_script = built.clone();
 		switch_script.push("bin");
 		switch_script.push("switch-to-configuration");
-		let mut cmd = MyCommand::new(switch_script);
+		let mut cmd = host.cmd(switch_script).await?;
 		cmd.arg(action.name());
-		if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+		if let Err(e) = cmd.sudo().run().in_current_span().await {
 			error!("failed to activate: {e}");
 			failed = true;
 		}
@@ -253,7 +244,8 @@
 	if !build.disable_rollback {
 		if failed {
 			info!("executing rollback");
-			if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+			if let Err(e) = host
+				.systemctl_start("rollback-watchdog.service")
 				.instrument(info_span!("rollback"))
 				.await
 			{
@@ -261,27 +253,29 @@
 			}
 		} else {
 			info!("trying to mark upgrade as successful");
-			let mut cmd = MyCommand::new("rm");
-			cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-			if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+			if let Err(e) = host
+				.rm_file("/etc/fleet_rollback_marker", true)
+				.in_current_span()
+				.await
+			{
 				error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
 			}
 		}
 		info!("disarming watchdog, just in case");
-		if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+		if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
 			// It is ok, if there was no reboot - then timer might not be running.
 		}
 		if action.should_schedule_rollback_run() {
-			if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+			if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
 				error!("failed to disarm rollback run: {e}");
 			}
 		}
-	} else {
-		let mut cmd = MyCommand::new("rm");
-		cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-		if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
-			// Marker might not exist, yet better try to remove it.
-		}
+	} else if let Err(_e) = host
+		.rm_file("/etc/fleet_rollback_marker", true)
+		.in_current_span()
+		.await
+	{
+		// Marker might not exist, yet better try to remove it.
 	}
 	Ok(())
 }
@@ -289,12 +283,13 @@
 impl BuildSystems {
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
+		let host = config.host(&host).await?;
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
 		let drv = nix_go!(
 			fleet_field.buildSystems(Obj {
 				localSystem: { config.local_system.clone() }
-			})[{ action.build_attr() }][{ host }]
+			})[{ action.build_attr() }][{ &host.name }]
 		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
 
 		match action {
 			Action::Upload { action } => {
-				if !config.is_local(&host) {
+				if !config.is_local(&host.name) {
 					info!("uploading system closure");
 					{
+						// TODO: Move to remote_derivation method.
 						// Alternatively, nix store make-content-addressed can be used,
 						// at least for the first deployment, to provide trusted store key.
 						//
@@ -329,13 +325,11 @@
 					}
 					let mut tries = 0;
 					loop {
-						let mut nix = MyCommand::new("nix");
-						nix.arg("copy")
-							.arg("--substitute-on-destination")
-							.comparg("--to", format!("ssh-ng://{host}"))
-							.arg(out_output);
-						match nix.run_nix().await {
-							Ok(()) => break,
+						match host.remote_derivation(out_output).await {
+							Ok(remote) => {
+								assert!(&remote == out_output, "CA derivations aren't implemented");
+								break;
+							}
 							Err(e) if tries < 3 => {
 								tries += 1;
 								warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
 					}
 				}
 				if let Some(action) = action {
-					execute_upload(&self, &config, action, &host, out_output.clone()).await?
+					execute_upload(&self, action, &host, out_output.clone()).await?
 				}
 			}
 			Action::Package(PackageAction::SdImage) => {
 				let mut out = current_dir()?;
-				out.push(format!("sd-image-{}", host));
+				out.push(format!("sd-image-{}", host.name));
 
 				info!("linking sd image to {:?}", out);
 				symlink(out_output, out)?;
 			}
 			Action::Package(PackageAction::InstallationCd) => {
 				let mut out = current_dir()?;
-				out.push(format!("installation-cd-{}", host));
+				out.push(format!("installation-cd-{}", host.name));
 
 				info!("linking iso image to {:?}", out);
 				symlink(out_output, out)?;
@@ -379,6 +373,17 @@
 			let this = this.clone();
 			let span = info_span!("deployment", host = field::display(&host.name));
 			let hostname = host.name;
+			// FIXME: Since the introduction of better-nix-eval,
+			// due to single repl used for builds, hosts are waiting for each other to build,
+			// instead of building concurrently.
+			//
+			// Open multiple repls?
+			//
+			// Create build batcher, which will behave similar to golangs
+			// WaitGroup, and start executing once all the build tasks are scheduled?
+			// This also allows to cleanup build output, as there will be no longer
+			// "waiting for remote machine" messages in the cases when one package is needed for
+			// multiple hosts.
 			set.spawn_local(
 				(async move {
 					match this.build_task(config, hostname).await {
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use chrono::{DateTime, Utc};
 use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use itertools::Itertools;
 use owo_colors::OwoColorize;
 use std::{
@@ -404,9 +404,8 @@
 					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 				if let Some(data) = secret.secret.secret {
-					let encrypted = config
-						.reencrypt_on_host(identity_holder, data, target_recipients)
-						.await?;
+					let host = config.host(&identity_holder).await?;
+					let encrypted = host.reencrypt(data, target_recipients).await?;
 					secret.secret.secret = Some(encrypted);
 				}
 
@@ -481,9 +480,8 @@
 							target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 						if let Some(secret) = data.secret.secret {
-							let encrypted = config
-								.reencrypt_on_host(identity_holder, secret, target_recipients)
-								.await?;
+							let host = config.host(identity_holder).await?;
+							let encrypted = host.reencrypt(secret, target_recipients).await?;
 
 							data.secret.secret = Some(encrypted);
 						}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
-	collections::HashMap,
-	ffi::OsStr,
-	pin,
-	process::Stdio,
-	sync::{Arc, Mutex},
-	task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
 
 use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
 use futures::StreamExt;
 use itertools::Either;
-use once_cell::sync::Lazy;
 use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
 
 fn escape_bash(input: &str, out: &mut String) {
 	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
 			return self;
 		}
 		let mut out = Self::new("env");
-		if let Some(session) = self.ssh_session {
-			out = out.ssh_session(session);
-		}
+		out.ssh_session = self.ssh_session;
 		for (k, v) in self.env {
 			assert!(!k.contains('='));
 			out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
 			out
 		} else {
 			let mut out = Self::new("sudo");
+			out.ssh_session = self.ssh_session.take();
 			out.args(self.into_args());
 			out
 		}
 	}
-	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
-		self.ssh_session = Some(on);
-		self
-	}
-	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
-		let mut out = Self::new("ssh");
-		out.ssh_session = self.ssh_session.take();
-		out.arg(on).arg("--");
-		out.arg(self.into_string());
-		out
-	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -276,259 +256,6 @@
 	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
 	assert!(v.is_none());
 	Ok(())
-}
-
-pub trait Handler: Send {
-	fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
-	fn clone(&self) -> Self {
-		Self(self.0.clone())
-	}
-}
-impl<H> ClonableHandler<H> {
-	pub fn new(inner: H) -> Self {
-		Self(Arc::new(Mutex::new(inner)))
-	}
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
-	fn handle_line(&mut self, e: &str) {
-		self.0.lock().unwrap().handle_line(e)
-	}
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
-	fn handle_line(&mut self, e: &str) {
-		info!(target: "log", "{e}");
-	}
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
-	fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
-	spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
-	static OSC_CLEANER: Lazy<Regex> =
-		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
-	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
-	let m = OSC_CLEANER.replace_all(m, "");
-	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
-	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
-	DETABBER.replace_all(m.as_ref(), "  ").to_string()
-}
-impl Handler for NixHandler {
-	fn handle_line(&mut self, e: &str) {
-		if let Some(e) = e.strip_prefix("@nix ") {
-			let log: NixLog = match serde_json::from_str(e) {
-				Ok(l) => l,
-				Err(err) => {
-					warn!("failed to parse nix log line {:?}: {}", e, err);
-					return;
-				}
-			};
-			match log {
-				NixLog::Msg { msg, raw_msg, .. } => {
-					#[allow(clippy::nonminimal_bool)]
-					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
-					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
-					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
-						if let Some(raw_msg) = raw_msg {
-							if !msg.is_empty() {
-								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
-							} else {
-								info!(target: "nix", "{}", raw_msg.trim_end())
-							}
-						} else {
-							info!(target: "nix", "{}", msg.trim_end())
-						}
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 105 && !fields.is_empty() => {
-					if let [LogField::String(drv), ..] = &fields[..] {
-						let mut drv = drv.as_str();
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						info!(target: "nix","building {}", drv);
-						let span = info_span!("build", drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad build log: {:?}", log)
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 100 && fields.len() >= 3 => {
-					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
-						&fields[..]
-					{
-						let mut drv = drv.as_str();
-
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
-						let span = info_span!("copy", from, to, drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad copy log: {:?}", log)
-					}
-				}
-				NixLog::Start { text, typ, id, .. }
-					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
-				{
-					if !text.is_empty()
-						&& text != "querying info about missing paths"
-						&& text != "copying 0 paths"
-						// Too much spam on lazy-trees branch
-						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
-					{
-						let span = info_span!("job");
-						span.pb_start();
-						span.pb_set_message(&process_message(text.trim()));
-						self.spans.insert(id, span);
-						info!(target: "nix", "{}", text);
-					}
-				}
-				NixLog::Start {
-					text,
-					level: 0,
-					typ: 108,
-					..
-				} if text.is_empty() => {
-					// Cache lookup? Coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 109,
-					..
-				} if text.starts_with("querying info about ") => {
-					// Cache lookup
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 101,
-					..
-				} if text.starts_with("downloading ") => {
-					// NAR downloading, coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					..
-				} if text.starts_with("waiting for a machine to build ") => {
-					// Useless repeating notification about build
-				}
-				NixLog::Start {
-					text,
-					level: 3,
-					typ: 111,
-					..
-				} if text.starts_with("resolved derivation: ") => {
-					// CA resolved
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					id,
-					..
-				} if text.starts_with("waiting for lock on ") => {
-					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
-					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.split("', '").next() {
-						drv = txt;
-					}
-					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-						let mut it = pkg.splitn(2, '-');
-						it.next();
-						if let Some(pkg) = it.next() {
-							drv = pkg;
-						}
-					}
-					let span = info_span!("waiting on drv", drv);
-					span.pb_start();
-					self.spans.insert(id, span);
-					// Concurrent build of the same message
-				}
-				NixLog::Stop { id, .. } => {
-					self.spans.remove(&id);
-				}
-				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
-					if let Some(span) = self.spans.get(&id) {
-						if let LogField::String(s) = &fields[0] {
-							span.pb_set_message(&process_message(s.trim()));
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						warn!("unknown result id: {id} {typ} {fields:?}");
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
-					if let Some(span) = self.spans.get(&id) {
-						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
-							&fields[..4]
-						{
-							span.pb_set_length(*expected);
-							span.pb_set_position(*done);
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						// warn!("unknown result id: {id} {typ} {fields:?}");
-						// Unaccounted progress.
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
-					// Set phase, expected
-				}
-				_ => warn!("unknown log: {:?}", log),
-			};
-		} else {
-			let e = e.trim();
-			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
-				return;
-			}
-			info!("{e}")
-		}
-	}
 }
 
 async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
 ) -> Result<Option<Vec<u8>>> {
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
+	debug!("running command {cmd:?} on local");
 	let mut child = cmd.spawn()?;
 	let mut stderr = child.stderr.take().unwrap();
 	let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
 	err_handler: &mut dyn Handler,
 	mut out_handler: Option<&mut dyn Handler>,
 ) -> Result<Option<Vec<u8>>> {
+	debug!("running command {cmd:?} over ssh");
 	cmd.stderr(openssh::Stdio::piped());
 	cmd.stdout(openssh::Stdio::piped());
 	let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
 	}
 
 	Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
-	/// Return true to discard message from logging
-	fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
-	String(String),
-	Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
-	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-	where
-		D: serde::Deserializer<'de>,
-	{
-		struct StringOrNum;
-		impl<'de> Visitor<'de> for StringOrNum {
-			type Value = LogField;
-
-			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-				write!(f, "string or unsigned")
-			}
-
-			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::String(v.to_owned()))
-			}
-
-			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::Num(v))
-			}
-		}
-
-		deserializer.deserialize_any(StringOrNum)
-	}
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
-	Msg {
-		level: u32,
-		msg: String,
-		raw_msg: Option<String>,
-	},
-	Start {
-		id: u64,
-		level: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-		text: String,
-		#[serde(rename = "type")]
-		typ: u32,
-	},
-	Stop {
-		id: u64,
-	},
-	Result {
-		id: u64,
-		#[serde(rename = "type")]
-		typ: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-	},
 }
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
 	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
-use age::Recipient;
 use anyhow::{anyhow, bail, Context, Result};
 use clap::{ArgGroup, Parser};
 use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub local: bool,
 	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+	async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		assert!(!self.local, "do not open ssh connection to local session");
 		// FIXME: TOCTOU
 		if let Some(session) = &self.session.get() {
 			return Ok((*session).clone());
@@ -96,8 +97,12 @@
 		D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
 	}
 	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
-		let session = self.open_session().await?;
-		Ok(MyCommand::new_on(cmd, session))
+		if self.local {
+			Ok(MyCommand::new(cmd))
+		} else {
+			let session = self.open_session().await?;
+			Ok(MyCommand::new_on(cmd, session))
+		}
 	}
 
 	pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
 			.context("failed to call remote host for decrypt")?;
 		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
 	}
+	pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+		for target in targets {
+			cmd.eqarg("--targets", target);
+		}
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 	/// Returns path for futureproofing, as path might change i.e on conversion to CA
 	pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+		if self.local {
+			// Path is located locally, thus already trusted.
+			return Ok(path.to_owned());
+		}
 		let mut nix = MyCommand::new("nix");
 		nix.arg("copy")
 			.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
 		nix.run_nix().await?;
 		Ok(path.to_owned())
 	}
+	pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("stop").arg(name);
+		cmd.sudo().run().await
+	}
+	pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("start").arg(name);
+		cmd.sudo().run().await
+	}
+
+	pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+		let mut cmd = self.cmd("rm").await?;
+		cmd.arg("-f").arg(path);
+		if sudo {
+			cmd = cmd.sudo()
+		}
+		cmd.run().await
+	}
 }
 
 impl Config {
@@ -134,35 +175,12 @@
 	}
 	pub fn is_local(&self, host: &str) -> bool {
 		self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
-	}
-
-	pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run().await
-	}
-	pub async fn run_string_on(
-		&self,
-		host: &str,
-		mut command: MyCommand,
-		sudo: bool,
-	) -> Result<String> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run_string().await
 	}
 
 	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
 		Ok(ConfigHost {
 			name: name.to_owned(),
+			local: self.is_local(name),
 			session: OnceLock::new(),
 		})
 	}
@@ -172,6 +190,7 @@
 		let mut out = vec![];
 		for name in names {
 			out.push(ConfigHost {
+				local: self.is_local(&name),
 				name,
 				session: OnceLock::new(),
 			})
@@ -225,27 +244,6 @@
 		let mut data = self.data_mut();
 		let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
 		host_secrets.insert(secret, value);
-	}
-
-	pub async fn reencrypt_on_host(
-		&self,
-		host: &str,
-		data: SecretData,
-		targets: Vec<String>,
-	) -> Result<SecretData> {
-		let mut recmd = MyCommand::new("fleet-install-secrets");
-		recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
-		for target in targets {
-			recmd.eqarg("--targets", target);
-		}
-		recmd = recmd.sudo().ssh(host);
-		let encoded = recmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		SecretData::decode_z85(&encoded)
 	}
 
 	pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
modifiedcmds/fleet/src/keys.rsdiffbeforeafterboth
--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
 use std::str::FromStr;
 
-use crate::command::MyCommand;
 use crate::host::Config;
 use age::Recipient;
 use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
 			Ok(key)
 		} else {
 			warn!("Loading key for {}", host);
-			let mut cmd = MyCommand::new("cat");
+			let host = self.host(host).await?;
+			let mut cmd = host.cmd("cat").await?;
 			cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
-			let key = self.run_string_on(host, cmd, false).await?;
-			self.update_key(host, key.clone());
+			let key = cmd.run_string().await?;
+			self.update_key(&host.name, key.clone());
 			Ok(key)
 		}
 	}
addedcmds/remowt-agent/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
addedcmds/remowt-agent/README.adocdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
addedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+	println!("Hello, world!");
+}
addedcrates/better-command/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
addedcrates/better-command/src/handler.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+	fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+	fn clone(&self) -> Self {
+		Self(self.0.clone())
+	}
+}
+impl<H> ClonableHandler<H> {
+	pub fn new(inner: H) -> Self {
+		Self(Arc::new(Mutex::new(inner)))
+	}
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+	fn handle_line(&mut self, e: &str) {
+		self.0.lock().unwrap().handle_line(e)
+	}
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+	fn handle_line(&mut self, e: &str) {
+		info!(target: "log", "{e}");
+	}
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+	fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+	spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+	String(String),
+	Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+	Msg {
+		level: u32,
+		msg: String,
+		raw_msg: Option<String>,
+	},
+	Start {
+		id: u64,
+		level: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+		text: String,
+		#[serde(rename = "type")]
+		typ: u32,
+	},
+	Stop {
+		id: u64,
+	},
+	Result {
+		id: u64,
+		#[serde(rename = "type")]
+		typ: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+	},
+}
+fn process_message(m: &str) -> String {
+	// Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+	static OSC_CLEANER: Lazy<Regex> =
+		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+	let m = OSC_CLEANER.replace_all(m, "");
+	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+	DETABBER.replace_all(m.as_ref(), "  ").to_string()
+}
+impl Handler for NixHandler {
+	fn handle_line(&mut self, e: &str) {
+		if let Some(e) = e.strip_prefix("@nix ") {
+			let log: NixLog = match serde_json::from_str(e) {
+				Ok(l) => l,
+				Err(err) => {
+					warn!("failed to parse nix log line {:?}: {}", e, err);
+					return;
+				}
+			};
+			match log {
+				NixLog::Msg { msg, raw_msg, .. } => {
+					#[allow(clippy::nonminimal_bool)]
+					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+						if let Some(raw_msg) = raw_msg {
+							if !msg.is_empty() {
+								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+							} else {
+								info!(target: "nix", "{}", raw_msg.trim_end())
+							}
+						} else {
+							info!(target: "nix", "{}", msg.trim_end())
+						}
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 105 && !fields.is_empty() => {
+					if let [LogField::String(drv), ..] = &fields[..] {
+						let mut drv = drv.as_str();
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						info!(target: "nix","building {}", drv);
+						let span = info_span!("build", drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad build log: {:?}", log)
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 100 && fields.len() >= 3 => {
+					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+						&fields[..]
+					{
+						let mut drv = drv.as_str();
+
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
+						let span = info_span!("copy", from, to, drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad copy log: {:?}", log)
+					}
+				}
+				NixLog::Start { text, typ, id, .. }
+					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+				{
+					if !text.is_empty()
+						&& text != "querying info about missing paths"
+						&& text != "copying 0 paths"
+						// Too much spam on lazy-trees branch
+						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
+					{
+						let span = info_span!("job");
+						span.pb_start();
+						span.pb_set_message(&process_message(text.trim()));
+						self.spans.insert(id, span);
+						info!(target: "nix", "{}", text);
+					}
+				}
+				NixLog::Start {
+					text,
+					level: 0,
+					typ: 108,
+					..
+				} if text.is_empty() => {
+					// Cache lookup? Coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 109,
+					..
+				} if text.starts_with("querying info about ") => {
+					// Cache lookup
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 101,
+					..
+				} if text.starts_with("downloading ") => {
+					// NAR downloading, coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					..
+				} if text.starts_with("waiting for a machine to build ") => {
+					// Useless repeating notification about build
+				}
+				NixLog::Start {
+					text,
+					level: 3,
+					typ: 111,
+					..
+				} if text.starts_with("resolved derivation: ") => {
+					// CA resolved
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					id,
+					..
+				} if text.starts_with("waiting for lock on ") => {
+					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.split("', '").next() {
+						drv = txt;
+					}
+					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+						let mut it = pkg.splitn(2, '-');
+						it.next();
+						if let Some(pkg) = it.next() {
+							drv = pkg;
+						}
+					}
+					let span = info_span!("waiting on drv", drv);
+					span.pb_start();
+					self.spans.insert(id, span);
+					// Concurrent build of the same message
+				}
+				NixLog::Stop { id, .. } => {
+					self.spans.remove(&id);
+				}
+				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+					if let Some(span) = self.spans.get(&id) {
+						if let LogField::String(s) = &fields[0] {
+							span.pb_set_message(&process_message(s.trim()));
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						warn!("unknown result id: {id} {typ} {fields:?}");
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+					if let Some(span) = self.spans.get(&id) {
+						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+							&fields[..4]
+						{
+							span.pb_set_length(*expected);
+							span.pb_set_position(*done);
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						// warn!("unknown result id: {id} {typ} {fields:?}");
+						// Unaccounted progress.
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+					// Set phase, expected
+				}
+				_ => warn!("unknown log: {:?}", log),
+			};
+		} else {
+			let e = e.trim();
+			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+				return;
+			}
+			info!("{e}")
+		}
+	}
+}
addedcrates/better-command/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+	left + right
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn it_works() {
+		let result = add(2, 2);
+		assert_eq!(result, 4);
+	}
+}