git.delta.rocks / jrsonnet / refs/commits / 7c6930a6bff0

difftreelog

refactor remove shell-outs for ssh

Yaroslav Bolyukin2023-12-29parent: #904d121.patch.diff
in: trunk

15 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -319,6 +319,18 @@
 checksum = "d86b93f97252c47b41663388e6d155714a9d0c398b99f1005cbc5f978b29f445"
 
 [[package]]
+name = "better-command"
+version = "0.1.0"
+dependencies = [
+ "once_cell",
+ "regex",
+ "serde",
+ "serde_json",
+ "tracing",
+ "tracing-indicatif",
+]
+
+[[package]]
 name = "bitflags"
 version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -748,6 +760,7 @@
  "anyhow",
  "async-trait",
  "base64 0.21.5",
+ "better-command",
  "chrono",
  "clap",
  "futures",
@@ -1510,9 +1523,9 @@
 
 [[package]]
 name = "once_cell"
-version = "1.18.0"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
 
 [[package]]
 name = "opaque-debug"
@@ -1922,6 +1935,10 @@
 checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
 [[package]]
+name = "remowt-agent"
+version = "0.1.0"
+
+[[package]]
 name = "rnix"
 version = "0.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2105,9 +2122,9 @@
 
 [[package]]
 name = "serde"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "91d3c334ca1ee894a2c6f6ad698fe8c435b76d504b13d436f0685d648d6d96f7"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
 dependencies = [
  "serde_derive",
 ]
@@ -2123,9 +2140,9 @@
 
 [[package]]
 name = "serde_derive"
-version = "1.0.190"
+version = "1.0.193"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67c5609f394e5c2bd7fc51efda478004ea80ef42fee983d5c67a65e34f32c0e3"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2134,9 +2151,9 @@
 
 [[package]]
 name = "serde_json"
-version = "1.0.107"
+version = "1.0.108"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65"
+checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
 dependencies = [
  "itoa",
  "ryu",
@@ -2527,11 +2544,10 @@
 
 [[package]]
 name = "tracing"
-version = "0.1.37"
+version = "0.1.40"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
 dependencies = [
- "cfg-if",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
@@ -2539,9 +2555,9 @@
 
 [[package]]
 name = "tracing-attributes"
-version = "0.1.26"
+version = "0.1.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
+checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2550,9 +2566,9 @@
 
 [[package]]
 name = "tracing-core"
-version = "0.1.31"
+version = "0.1.32"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
+checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
 dependencies = [
  "once_cell",
  "valuable",
@@ -2560,9 +2576,9 @@
 
 [[package]]
 name = "tracing-indicatif"
-version = "0.3.5"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57e05fe4a1c906d94b275d8aeb8ff8b9deaca502aeb59ae8ab500a92b8032ac8"
+checksum = "069580424efe11d97c3fef4197fa98c004fa26672cc71ad8770d224e23b1951d"
 dependencies = [
  "indicatif",
  "tracing",
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,3 +1,7 @@
 [workspace]
 members = ["crates/*", "cmds/*"]
 resolver = "2"
+
+[workspace.dependencies]
+nixlike = { path = "./crates/nixlike" }
+better-command = { path = "./crates/better-command" }
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -6,6 +6,8 @@
 edition = "2021"
 
 [dependencies]
+nixlike.workspace = true
+better-command.workspace = true
 anyhow = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -15,7 +17,6 @@
 hostname = "0.3.1"
 age-core = "0.9.0"
 peg = "0.8.2"
-nixlike = { path = "../../crates/nixlike" }
 age = { version = "0.9.2", features = ["ssh", "armor"] }
 base64 = "0.21.5"
 chrono = { version = "0.4.31", features = ["serde"] }
modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
before · cmds/fleet/src/better_nix_eval.rs
1use std::collections::HashMap;2use std::ffi::{OsStr, OsString};3use std::fmt::{self, Display};4use std::path::PathBuf;5use std::process::Stdio;6use std::sync::{Arc, OnceLock};78use anyhow::{anyhow, bail, ensure, Context, Result};9use futures::StreamExt;10use itertools::Itertools;11use r2d2::{Pool, PooledConnection};12use serde::de::DeserializeOwned;13use serde::{Deserialize, Serialize};14use tokio::io::AsyncWriteExt;15use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};16use tokio::select;17use tokio::sync::{mpsc, oneshot};18use tokio_util::codec::{FramedRead, LinesCodec};19use tracing::{debug, error, warn, Level};2021use crate::command::{ClonableHandler, Handler, NixHandler, NoopHandler};2223const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2425pub struct NixSessionInner {26	full_delimiter: String,27	nix_handler: ClonableHandler<NixHandler>,28	out: OutputHandler,29	stdin: ChildStdin,30	string_wrapping: (String, String),31	number_wrapping: (String, String),3233	next_id: u32,34	free_list: Vec<u32>,35}36const TRAIN_STRING: &str = "\"TRAIN_STRING\"";37const TRAIN_NUMBER: &str = "13141516";3839#[must_use]40struct ErrorCollector<'i, H> {41	collected: Vec<String>,42	inner: &'i mut H,43}44impl<'i, H> ErrorCollector<'i, H> {45	fn new(inner: &'i mut H) -> Self {46		Self {47			collected: vec![],48			inner,49		}50	}51}52impl<H> ErrorCollector<'_, H> {53	fn handle_line_inner(&mut self, msg: &str) -> bool {54		let Some(msg) = msg.strip_prefix("@nix ") else {55			return false;56		};57		#[derive(Deserialize)]58		struct ErrorAction {59			action: String,60			level: u32,61			msg: String,62		}63		let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {64			return false;65		};66		if act.action != "msg" || act.level != 0 {67			return false;68		}69		self.collected.push(act.msg);70		true71	}72	fn finish(self) -> Result<()> {73		// fn dedent(s: String) -> String {74		// 	s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)75		// }76		if !self.collected.is_empty() {77			bail!(78				"{}",79				self.collected80					.iter()81					.map(|v| {82						if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {83							let v = unindent::unindent(f.trim_start());84							v.trim().to_owned()85						} else {86							v.to_owned()87						}88					})89					.join("\n")90			);91		}92		Ok(())93	}94	fn flush(self) {95		for line in self.collected {96			warn!("{line}");97		}98	}99}100impl<H: Handler> Handler for ErrorCollector<'_, H> {101	fn handle_line(&mut self, e: &str) {102		if self.handle_line_inner(e) {103			return;104		}105		self.inner.handle_line(e)106	}107}108109enum OutputLine {110	Out(String),111	Err(String),112}113struct OutputHandler {114	rx: mpsc::Receiver<OutputLine>,115	_cancel_handle: oneshot::Receiver<()>,116}117impl OutputHandler {118	fn new(out: ChildStdout, err: ChildStderr) -> Self {119		let mut out = FramedRead::new(out, LinesCodec::new());120		let mut err = FramedRead::new(err, LinesCodec::new());121		let (tx, rx) = mpsc::channel(20);122		let (mut cancelled, _cancel_handle) = oneshot::channel();123		tokio::spawn(async move {124			loop {125				select! {126					// We should receive errors earlier than synchronization127					biased;128					e = err.next() => {129						let Some(Ok(e)) = e else {130							if e.is_some() {131								error!("bad repl stderr: {e:?}");132							}133							continue;134						};135						let _ = tx.send(OutputLine::Err(e)).await;136					}137					o = out.next() => {138						let Some(Ok(o)) = o else {139							if o.is_some() {140								error!("bad repl stdout: {o:?}");141							}142							continue;143						};144						let _ = tx.send(OutputLine::Out(o)).await;145					}146					// Reader doesn't care about stdout, as this is cancelled.147					// Error still might be useful, to process leftover span closures?148					_ = cancelled.closed() => {149						break;150					}151				}152			}153		});154		Self { rx, _cancel_handle }155	}156	async fn next(&mut self) -> Option<OutputLine> {157		self.rx.recv().await158	}159}160161struct WarnHandler;162impl Handler for WarnHandler {163	fn handle_line(&mut self, e: &str) {164		warn!(target: "nix", "{e}")165	}166}167168impl NixSessionInner {169	async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {170		let mut cmd = Command::new("nix");171		cmd.arg("repl")172			.arg(flake)173			.arg("--log-format")174			.arg("internal-json");175		for arg in extra_args {176			cmd.arg(arg);177		}178		cmd.stdin(Stdio::piped());179		cmd.stdout(Stdio::piped());180		cmd.stderr(Stdio::piped());181		let cmd = cmd.spawn()?;182		let stdout = cmd.stdout.unwrap();183		let stderr = cmd.stderr.unwrap();184		let mut out = OutputHandler::new(stdout, stderr);185		let mut stdin = cmd.stdin.unwrap();186		// Standard repl hello doesn't work with internal-json logger187		stdin.write_all(REPL_DELIMITER.as_bytes()).await?;188		stdin.write_all(b"\n").await?;189		stdin.flush().await?;190		let nix_handler = NixHandler::default();191		let mut full_delimiter = None;192		let mut errors = vec![];193		while let Some(line) = out.next().await {194			let line = match line {195				OutputLine::Out(o) => o,196				OutputLine::Err(_e) => {197					// Handle startup errors, but skip repl hello?198					errors.push(_e);199					continue;200				}201			};202			if line.contains(REPL_DELIMITER) {203				debug!("discovered repl delimiter with added colors: {line}");204				full_delimiter = Some(line.to_owned());205				break;206			}207		}208		let Some(full_delimiter) = full_delimiter else {209			for e in errors {210				error!("{e}");211			}212			bail!("failed to discover delimiter");213		};214		let mut res = Self {215			full_delimiter,216			nix_handler: ClonableHandler::new(nix_handler),217			out,218			stdin,219			string_wrapping: Default::default(),220			number_wrapping: Default::default(),221222			next_id: 0,223			free_list: vec![],224		};225		res.train().await?;226		Ok(res)227	}228	async fn train(&mut self) -> Result<()> {229		{230			let full_string = self231				.execute_expression_raw(TRAIN_STRING, &mut NoopHandler)232				.await?;233			let string_offset = full_string.find(TRAIN_STRING).expect("contained");234			let string_prefix = &full_string[..string_offset];235			let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];236			self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());237		}238		{239			let full_number = self240				.execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)241				.await?;242			let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");243			let number_prefix = &full_number[..number_offset];244			let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];245			self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());246		}247		Ok(())248	}249	async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {250		if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {251			let cmd_str = String::from_utf8_lossy(cmd.as_ref());252			tracing::debug!("{cmd_str}");253		};254		self.stdin.write_all(cmd.as_ref()).await?;255		self.stdin.write_all(b"\n").await?;256		Ok(())257	}258	async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {259		let mut out = String::new();260		while let Some(line) = self.out.next().await {261			let line = match line {262				OutputLine::Out(out) => out,263				OutputLine::Err(err) => {264					err_handler.handle_line(&err);265					continue;266				}267			};268			if line == self.full_delimiter {269				return Ok(out);270			}271			if !out.is_empty() {272				out.push('\n');273			}274			out.push_str(&line);275		}276		bail!("didn't reached delimiter");277	}278	async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {279		let num = self.number_wrapping.clone();280		let n = self.execute_expression_wrapping(expr, &num).await?;281		Ok(n.parse::<u64>()?)282	}283	async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {284		let num = self.string_wrapping.clone();285		let n = self.execute_expression_wrapping(expr, &num).await?;286		let str: String = serde_json::from_str(&n)?;287		Ok(str)288	}289	async fn execute_expression_to_json<V: DeserializeOwned>(290		&mut self,291		expr: impl AsRef<[u8]>,292	) -> Result<V> {293		let mut fexpr = b"builtins.toJSON (".to_vec();294		fexpr.extend_from_slice(expr.as_ref());295		fexpr.push(b')');296		let v = self.execute_expression_string(fexpr).await?;297		Ok(serde_json::from_str(&v)?)298	}299	async fn execute_expression_wrapping(300		&mut self,301		expr: impl AsRef<[u8]>,302		wrapping: &(String, String),303	) -> Result<String> {304		let mut nix_handler = self.nix_handler.clone();305		let mut collected = ErrorCollector::new(&mut nix_handler);306		let res = self.execute_expression_raw(expr, &mut collected).await?;307		if res.is_empty() {308			collected.finish()?;309			bail!("expected expression, got nothing")310		} else {311			collected.flush()312		};313		let Some(res) = res.strip_prefix(&wrapping.0) else {314			bail!("invalid type")315		};316		let Some(res) = res.strip_suffix(&wrapping.1) else {317			bail!("invalid type")318		};319		Ok(res.to_owned())320	}321	async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {322		let mut nix_handler = self.nix_handler.clone();323		let mut collected = ErrorCollector::new(&mut nix_handler);324		let v = self.execute_expression_raw(expr, &mut collected).await?;325		collected.finish()?;326		ensure!(v.is_empty(), "unexpected expression result");327		Ok(())328	}329	async fn execute_expression_raw(330		&mut self,331		expr: impl AsRef<[u8]>,332		err_handler: &mut dyn Handler,333	) -> Result<String> {334		self.send_command(expr).await?;335		// It will be echoed336		self.send_command(REPL_DELIMITER).await?;337		self.read_until_delimiter(err_handler).await338	}339	async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {340		let id = self.allocate_id();341		self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))342			.await?;343		Ok(id)344	}345346	/// Id should be immediately used347	fn allocate_id(&mut self) -> u32 {348		if let Some(free) = self.free_list.pop() {349			free350		} else {351			let v = self.next_id;352			self.next_id += 1;353			v354		}355	}356	// Nix has no way to deallocate variable, yet GC will correct everything not reachable.357	// async fn free_id(&mut self, id: u32) -> Result<()> {358	// 	self.execute_expression_empty(format!("sess_field_{id} = null"))359	// 		.await?;360	// 	self.free_list.push(id);361	// 	Ok(())362	// }363}364365#[derive(Clone)]366pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);367368#[derive(Clone)]369pub struct NixExprBuilder {370	out: String,371	used_fields: Vec<Field>,372}373impl NixExprBuilder {374	pub fn object() -> Self {375		NixExprBuilder {376			out: "{ ".to_owned(),377			used_fields: Vec::new(),378		}379	}380	pub fn string(s: &str) -> Self {381		NixExprBuilder {382			out: nixlike::serialize(s)383				.expect("no problems with serializing_string")384				.trim_end()385				.to_owned(),386			used_fields: Vec::new(),387		}388	}389	pub fn serialized(v: impl Serialize) -> Self {390		let serialized = nixlike::serialize(v).expect("invalid value for apply");391		Self {392			out: serialized.trim_end().to_owned(),393			used_fields: Vec::new(),394		}395	}396	pub fn field(f: Field) -> Self {397		Self {398			out: format!("sess_field_{}", f.0.value.expect("no value")),399			used_fields: vec![f],400		}401	}402	pub fn end_obj(&mut self) {403		self.out.push('}');404	}405	pub fn obj_key(&mut self, name: Self, value: Self) {406		self.out.push_str(r#""${"#);407		self.extend(name);408		self.out.push_str(r#"}" = "#);409		self.extend(value);410		self.out.push_str("; ");411	}412413	pub fn extend(&mut self, e: Self) {414		self.out.push_str(&e.out);415		self.used_fields.extend(e.used_fields);416	}417418	pub fn session(&self) -> NixSession {419		let mut session = None;420		for ele in &self.used_fields {421			if session.is_none() {422				session = Some(ele.0.session.clone());423				continue;424			}425			let session = &session.as_ref().expect("checked").0;426			let ele_sess = &ele.0.session.0;427			assert!(428				Arc::ptr_eq(session, ele_sess),429				"can't mix fields from different session"430			);431		}432		session.expect("expr without fields used")433	}434	pub fn index_attr(&mut self, s: &str) {435		let escaped = nixlike::serialize(s).expect("string");436		self.out.push('.');437		self.out.push_str(escaped.trim_end());438	}439}440441#[macro_export]442macro_rules! nix_expr_inner {443	(Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{444		use $crate::better_nix_eval::NixExprBuilder;445		let mut out = NixExprBuilder::object();446		$(447			out.obj_key(448				NixExprBuilder::string(stringify!($ident)),449				$crate::nix_expr_inner!($($val)+),450			);451		)*452		out.end_obj();453		out454	}};455	(@field($o:ident) . $var:ident $($tt:tt)*) => {{456		$o.index_attr(stringify!($var));457		nix_expr_inner!(@field($o) $($tt)*);458	}};459	(@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{460		$o.push(Index::attr(&$v));461		nix_expr_inner!(@o($o) $($tt)*);462	}};463	(@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{464		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));465		nix_expr_inner!(@o($o) $($tt)*);466	}};467	(@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {468		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));469		nix_expr_inner!(@o($o) $($tt)*);470	};471	(@field($o:ident)) => {};472	($field:ident $($tt:tt)*) => {{473		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};474		#[allow(unused_mut, reason = "might be used if indexed")]475		let mut out = NixExprBuilder::field($field.clone());476		nix_expr_inner!(@field(out) $($tt)*);477		out478	}};479	($v:literal) => {{480		use $crate::better_nix_eval::NixExprBuilder;481		NixExprBuilder::string($v)482	}};483	({$v:expr}) => {{484		use $crate::better_nix_eval::NixExprBuilder;485		NixExprBuilder::serialized(&$v)486	}}487}488#[macro_export]489macro_rules! nix_expr {490	($($tt:tt)+) => {{491		use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};492		let expr = nix_expr_inner!($($tt)+);493		Field::new(expr.session(), expr.out)494	}};495}496497#[macro_export]498macro_rules! nix_go {499	(@o($o:ident) . $var:ident $($tt:tt)*) => {{500		$o.push(Index::attr(stringify!($var)));501		nix_go!(@o($o) $($tt)*);502	}};503	(@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{504		$o.push(Index::attr(&$v));505		nix_go!(@o($o) $($tt)*);506	}};507	(@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{508		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));509		nix_go!(@o($o) $($tt)*);510	}};511	(@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {512		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));513		nix_go!(@o($o) $($tt)*);514	};515	(@o($o:ident)) => {};516	($field:ident $($tt:tt)+) => {{517		use $crate::{nix_go, better_nix_eval::Index};518		let field = $field.clone();519		let mut out = vec![];520		nix_go!(@o(out) $($tt)*);521		field.select(out).await?522	}}523}524#[macro_export]525macro_rules! nix_go_json {526	($($tt:tt)*) => {{527		$crate::nix_go!($($tt)*).as_json().await?528	}};529}530531#[derive(Clone)]532pub enum Index {533	Var(String),534	String(String),535	Apply(String),536	Expr(NixExprBuilder),537	ExprApply(NixExprBuilder),538}539impl Index {540	pub fn var(v: impl AsRef<str>) -> Self {541		let v = v.as_ref();542		assert!(543			!(v.contains('.') | v.contains(' ')),544			"bad variable name: {v}"545		);546		Self::Var(v.to_owned())547	}548	pub fn attr(v: impl AsRef<str>) -> Self {549		Self::String(v.as_ref().to_owned())550	}551	pub fn apply(v: impl Serialize) -> Self {552		let serialized = nixlike::serialize(v).expect("invalid value for apply");553		Self::Apply(serialized.trim_end().to_owned())554	}555}556impl Display for Index {557	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {558		match self {559			Index::Var(v) => {560				write!(f, "{v}")561			}562			Index::String(k) => {563				let v = nixlike::format_identifier(k.as_str());564				write!(f, ".{v}")565			}566			Index::Apply(o) => {567				write!(f, "<apply>({o})")568			}569			Index::Expr(e) => {570				write!(f, "[{}]", e.out)571			}572			Index::ExprApply(e) => {573				write!(f, "<apply>({})", e.out)574			}575		}576	}577}578impl fmt::Debug for Index {579	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {580		write!(f, "{self}")581	}582}583struct PathDisplay<'i>(&'i [Index]);584impl Display for PathDisplay<'_> {585	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {586		for i in self.0 {587			write!(f, "{i}")?;588		}589		Ok(())590	}591}592struct FieldInner {593	full_path: Option<Vec<Index>>,594	session: NixSession,595	value: Option<u32>,596}597fn context(full_path: Option<&[Index]>, query: &str) -> String {598	if let Some(full_path) = &full_path {599		format!("full path: {}", PathDisplay(full_path))600	} else {601		format!("query: {query:?}")602	}603}604#[derive(Clone)]605pub struct Field(Arc<FieldInner>);606impl Field {607	fn root(session: NixSession) -> Self {608		Self(Arc::new(FieldInner {609			full_path: Some(vec![]),610			session,611			value: None,612		}))613	}614	async fn new(session: NixSession, query: &str) -> Result<Self> {615		let vid = session616			.0617			.lock()618			.await619			.execute_assign(query)620			.await621			.with_context(|| context(None, query))?;622		Ok(Self(Arc::new(FieldInner {623			full_path: None,624			session,625			value: Some(vid),626		})))627	}628	pub async fn field(session: NixSession, field: &str) -> Result<Self> {629		Self::root(session).select([Index::var(field)]).await630	}631	pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {632		let mut used_fields = Vec::new();633		let mut name = name.into_iter();634635		let mut full_path = self.0.full_path.clone();636		let mut query = if let Some(id) = self.0.value {637			format!("sess_field_{id}")638		} else {639			let first = name.next();640			if let Some(Index::Var(i)) = first {641				if let Some(full_path) = &mut full_path {642					full_path.push(Index::Var(i.clone()));643				}644				i.clone()645			} else {646				panic!("first path item should be variable, got {first:?}")647			}648		};649		for v in name {650			if let Some(full_path) = &mut full_path {651				full_path.push(v.clone());652			}653			match v {654				Index::Var(_) => panic!("var item may only be first"),655				Index::String(s) => {656					let escaped = nixlike::serialize(s)?;657					query.push('.');658					query.push_str(escaped.trim());659				}660				Index::Apply(a) => {661					// In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`662					query = format!("({query} {a})");663				}664				Index::Expr(e) => {665					let index = Field::new(self.0.session.clone(), &e.out).await?;666					used_fields.push(index.clone());667					query.push('.');668					let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));669					query.push_str(&index);670				}671				Index::ExprApply(e) => {672					let index = Field::new(self.0.session.clone(), &e.out).await?;673					used_fields.push(index.clone());674					query.push(' ');675					let index = format!("sess_field_{}", index.0.value.expect("value"));676					query.push_str(&index);677					query = format!("({query})");678				}679			}680		}681682		let vid = self683			.0684			.session685			.0686			.lock()687			.await688			.execute_assign(&query)689			.await690			.with_context(|| {691				if let Some(full_path) = &full_path {692					format!("full path: {}", PathDisplay(full_path))693				} else {694					format!("query: {query:?}")695				}696			})?;697		Ok(Self(Arc::new(FieldInner {698			full_path,699			session: self.0.session.clone(),700			value: Some(vid),701		})))702	}703	pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {704		let id = self.0.value.expect("can't serialize root field");705		let query = format!("sess_field_{id}");706		self.0707			.session708			.0709			.lock()710			.await711			.execute_expression_to_json(&query)712			.await713			.with_context(|| context(self.0.full_path.as_deref(), &query))714	}715	pub async fn has_field(&self, name: &str) -> Result<bool> {716		let id = self.0.value.expect("can't list root fields");717		let key = nixlike::escape_string(name);718		let query = format!("sess_field_{id} ? {key}");719		self.0720			.session721			.0722			.lock()723			.await724			.execute_expression_to_json(&query)725			.await726			.with_context(|| context(self.0.full_path.as_deref(), &query))727	}728	pub async fn list_fields(&self) -> Result<Vec<String>> {729		let id = self.0.value.expect("can't list root fields");730		let query = format!("builtins.attrNames sess_field_{id}");731		self.0732			.session733			.0734			.lock()735			.await736			.execute_expression_to_json(&query)737			.await738			.with_context(|| context(self.0.full_path.as_deref(), &query))739	}740	pub async fn type_of(&self) -> Result<String> {741		let id = self.0.value.expect("can't list root fields");742		let query = format!("builtins.typeOf sess_field_{id}");743		self.0744			.session745			.0746			.lock()747			.await748			.execute_expression_to_json(&query)749			.await750			.with_context(|| context(self.0.full_path.as_deref(), &query))751	}752	pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {753		let id = self.0.value.expect("can't use build on not-value");754		let query = format!(":b sess_field_{id}");755		let vid = self756			.0757			.session758			.0759			.lock()760			.await761			.execute_expression_raw(&query, &mut NixHandler::default())762			.await?;763		ensure!(764			!vid.is_empty(),765			"build failed: {}",766			context(self.0.full_path.as_deref(), &query),767		);768		let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")769		else {770			panic!("unexpected build output: {vid:?}");771		};772		let outputs = vid773			.split('\n')774			.filter(|v| !v.is_empty())775			.map(|v| v.split_once(" -> ").expect("unexpected build output"))776			.map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))777			.collect();778		Ok(outputs)779	}780}781impl Drop for FieldInner {782	fn drop(&mut self) {783		if let Some(id) = self.value {784			if let Ok(mut lock) = self.session.0.try_lock() {785				lock.free_list.push(id)786			}787			// Leaked788		}789	}790}791struct NixSessionPoolInner {792	flake: OsString,793	nix_args: Vec<OsString>,794}795796#[derive(Debug)]797pub struct NixPoolError(anyhow::Error);798impl From<anyhow::Error> for NixPoolError {799	fn from(value: anyhow::Error) -> Self {800		Self(value)801	}802}803impl std::error::Error for NixPoolError {}804impl std::fmt::Display for NixPoolError {805	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {806		self.0.fmt(f)807	}808}809impl r2d2::ManageConnection for NixSessionPoolInner {810	type Connection = NixSessionInner;811	type Error = NixPoolError;812	fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {813		let _v = TOKIO_RUNTIME814			.get()815			.expect("missed tokio runtime init!")816			.enter();817		Ok(futures::executor::block_on(NixSessionInner::new(818			self.flake.as_os_str(),819			self.nix_args.iter().map(OsString::as_os_str),820		))?)821	}822823	fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {824		let _v = TOKIO_RUNTIME825			.get()826			.expect("missed tokio runtime init!")827			.enter();828		let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;829		if res != 4 {830			return Err(anyhow!("sanity check failed").into());831		};832		Ok(())833	}834835	fn has_broken(&self, _conn: &mut Self::Connection) -> bool {836		false837	}838}839pub struct NixSessionPool(Pool<NixSessionPoolInner>);840impl NixSessionPool {841	pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {842		let inner = tokio::task::block_in_place(|| {843			r2d2::Builder::<NixSessionPoolInner>::new()844				.min_idle(Some(0))845				.build(NixSessionPoolInner { flake, nix_args })846		})?;847		Ok(Self(inner))848	}849	pub async fn get(&self) -> Result<NixSession> {850		let v = tokio::task::block_in_place(|| self.0.get())?;851		Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))852	}853}854855pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();
after · cmds/fleet/src/better_nix_eval.rs
1//! Wrapper around nix repl, which allows to work on nix code, without relying on2//! nix libexpr. I mean, nix libexpr is good, but until it has no C bindings, this is the royal PITA.34use std::collections::HashMap;5use std::ffi::{OsStr, OsString};6use std::fmt::{self, Display};7use std::path::PathBuf;8use std::process::Stdio;9use std::sync::{Arc, OnceLock};1011use anyhow::{anyhow, bail, ensure, Context, Result};12use better_command::{ClonableHandler, NixHandler, Handler, NoopHandler};13use futures::StreamExt;14use itertools::Itertools;15use r2d2::{Pool, PooledConnection};16use serde::de::DeserializeOwned;17use serde::{Deserialize, Serialize};18use tokio::io::AsyncWriteExt;19use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};20use tokio::select;21use tokio::sync::{mpsc, oneshot, Mutex};22use tokio_util::codec::{FramedRead, LinesCodec};23use tracing::{debug, error, warn, Level};2425const REPL_DELIMITER: &str = "\"FLEET_MAGIC_REPL_DELIMITER\"";2627pub struct NixSessionInner {28	full_delimiter: String,29	nix_handler: ClonableHandler<NixHandler>,30	out: OutputHandler,31	stdin: ChildStdin,32	string_wrapping: (String, String),33	number_wrapping: (String, String),3435	executing_command: Arc<Mutex<()>>,3637	next_id: u32,38	free_list: Vec<u32>,39}40const TRAIN_STRING: &str = "\"TRAIN_STRING\"";41const TRAIN_NUMBER: &str = "13141516";4243#[must_use]44struct ErrorCollector<'i, H> {45	collected: Vec<String>,46	inner: &'i mut H,47}48impl<'i, H> ErrorCollector<'i, H> {49	fn new(inner: &'i mut H) -> Self {50		Self {51			collected: vec![],52			inner,53		}54	}55}56impl<H> ErrorCollector<'_, H> {57	fn handle_line_inner(&mut self, msg: &str) -> bool {58		let Some(msg) = msg.strip_prefix("@nix ") else {59			return false;60		};61		#[derive(Deserialize)]62		struct ErrorAction {63			action: String,64			level: u32,65			msg: String,66		}67		let Ok(act) = serde_json::from_str::<ErrorAction>(msg) else {68			return false;69		};70		if act.action != "msg" || act.level != 0 {71			return false;72		}73		self.collected.push(act.msg);74		true75	}76	fn finish(self) -> Result<()> {77		// fn dedent(s: String) -> String {78		// 	s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)79		// }80		if !self.collected.is_empty() {81			bail!(82				"{}",83				self.collected84					.iter()85					.map(|v| {86						if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {87							let v = unindent::unindent(f.trim_start());88							v.trim().to_owned()89						} else {90							v.to_owned()91						}92					})93					.join("\n")94			);95		}96		Ok(())97	}98	fn flush(self) {99		for line in self.collected {100			warn!("{line}");101		}102	}103}104impl<H: Handler> Handler for ErrorCollector<'_, H> {105	fn handle_line(&mut self, e: &str) {106		if self.handle_line_inner(e) {107			return;108		}109		self.inner.handle_line(e)110	}111}112113enum OutputLine {114	Out(String),115	Err(String),116}117struct OutputHandler {118	rx: mpsc::Receiver<OutputLine>,119	_cancel_handle: oneshot::Receiver<()>,120}121impl OutputHandler {122	fn new(out: ChildStdout, err: ChildStderr) -> Self {123		let mut out = FramedRead::new(out, LinesCodec::new());124		let mut err = FramedRead::new(err, LinesCodec::new());125		let (tx, rx) = mpsc::channel(20);126		let (mut cancelled, _cancel_handle) = oneshot::channel();127		tokio::spawn(async move {128			loop {129				select! {130					// We should receive errors earlier than synchronization131					biased;132					e = err.next() => {133						let Some(Ok(e)) = e else {134							if e.is_some() {135								error!("bad repl stderr: {e:?}");136							}137							continue;138						};139						let _ = tx.send(OutputLine::Err(e)).await;140					}141					o = out.next() => {142						let Some(Ok(o)) = o else {143							if o.is_some() {144								error!("bad repl stdout: {o:?}");145							}146							continue;147						};148						let _ = tx.send(OutputLine::Out(o)).await;149					}150					// Reader doesn't care about stdout, as this is cancelled.151					// Error still might be useful, to process leftover span closures?152					_ = cancelled.closed() => {153						break;154					}155				}156			}157		});158		Self { rx, _cancel_handle }159	}160	async fn next(&mut self) -> Option<OutputLine> {161		self.rx.recv().await162	}163}164165struct WarnHandler;166impl Handler for WarnHandler {167	fn handle_line(&mut self, e: &str) {168		warn!(target: "nix", "{e}")169	}170}171172impl NixSessionInner {173	async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {174		let mut cmd = Command::new("nix");175		cmd.arg("repl")176			.arg(flake)177			.arg("--log-format")178			.arg("internal-json");179		for arg in extra_args {180			cmd.arg(arg);181		}182		cmd.stdin(Stdio::piped());183		cmd.stdout(Stdio::piped());184		cmd.stderr(Stdio::piped());185		let cmd = cmd.spawn()?;186		let stdout = cmd.stdout.unwrap();187		let stderr = cmd.stderr.unwrap();188		let mut out = OutputHandler::new(stdout, stderr);189		let mut stdin = cmd.stdin.unwrap();190		// Standard repl hello doesn't work with internal-json logger191		stdin.write_all(REPL_DELIMITER.as_bytes()).await?;192		stdin.write_all(b"\n").await?;193		stdin.flush().await?;194		let nix_handler = NixHandler::default();195		let mut full_delimiter = None;196		let mut errors = vec![];197		while let Some(line) = out.next().await {198			let line = match line {199				OutputLine::Out(o) => o,200				OutputLine::Err(_e) => {201					// Handle startup errors, but skip repl hello?202					errors.push(_e);203					continue;204				}205			};206			if line.contains(REPL_DELIMITER) {207				debug!("discovered repl delimiter with added colors: {line}");208				full_delimiter = Some(line.to_owned());209				break;210			}211		}212		let Some(full_delimiter) = full_delimiter else {213			for e in errors {214				error!("{e}");215			}216			bail!("failed to discover delimiter");217		};218		let mut res = Self {219			full_delimiter,220			nix_handler: ClonableHandler::new(nix_handler),221			out,222			stdin,223			string_wrapping: Default::default(),224			number_wrapping: Default::default(),225226			executing_command: Arc::new(Mutex::new(())),227228			next_id: 0,229			free_list: vec![],230		};231		res.train().await?;232		Ok(res)233	}234	async fn train(&mut self) -> Result<()> {235		{236			let full_string = self237				.execute_expression_raw(TRAIN_STRING, &mut NoopHandler)238				.await?;239			let string_offset = full_string.find(TRAIN_STRING).expect("contained");240			let string_prefix = &full_string[..string_offset];241			let string_suffix = &full_string[string_offset + TRAIN_STRING.len()..];242			self.string_wrapping = (string_prefix.to_owned(), string_suffix.to_owned());243		}244		{245			let full_number = self246				.execute_expression_raw(TRAIN_NUMBER, &mut NoopHandler)247				.await?;248			let number_offset = full_number.find(TRAIN_NUMBER).expect("contained");249			let number_prefix = &full_number[..number_offset];250			let number_suffix = &full_number[number_offset + TRAIN_NUMBER.len()..];251			self.number_wrapping = (number_prefix.to_owned(), number_suffix.to_owned());252		}253		Ok(())254	}255	async fn send_command(&mut self, cmd: impl AsRef<[u8]>) -> Result<()> {256		if tracing::enabled!(Level::DEBUG) && cmd.as_ref() != REPL_DELIMITER.as_bytes() {257			let cmd_str = String::from_utf8_lossy(cmd.as_ref());258			tracing::debug!("{cmd_str}");259		};260		self.stdin.write_all(cmd.as_ref()).await?;261		self.stdin.write_all(b"\n").await?;262		Ok(())263	}264	async fn read_until_delimiter(&mut self, err_handler: &mut dyn Handler) -> Result<String> {265		let mut out = String::new();266		while let Some(line) = self.out.next().await {267			let line = match line {268				OutputLine::Out(out) => out,269				OutputLine::Err(err) => {270					err_handler.handle_line(&err);271					continue;272				}273			};274			if line == self.full_delimiter {275				return Ok(out);276			}277			if !out.is_empty() {278				out.push('\n');279			}280			out.push_str(&line);281		}282		bail!("didn't reached delimiter");283	}284	async fn execute_expression_number(&mut self, expr: impl AsRef<[u8]>) -> Result<u64> {285		let num = self.number_wrapping.clone();286		let n = self.execute_expression_wrapping(expr, &num).await?;287		Ok(n.parse::<u64>()?)288	}289	async fn execute_expression_string(&mut self, expr: impl AsRef<[u8]>) -> Result<String> {290		let num = self.string_wrapping.clone();291		let n = self.execute_expression_wrapping(expr, &num).await?;292		let str: String = serde_json::from_str(&n)?;293		Ok(str)294	}295	async fn execute_expression_to_json<V: DeserializeOwned>(296		&mut self,297		expr: impl AsRef<[u8]>,298	) -> Result<V> {299		let mut fexpr = b"builtins.toJSON (".to_vec();300		fexpr.extend_from_slice(expr.as_ref());301		fexpr.push(b')');302		let v = self.execute_expression_string(fexpr).await?;303		Ok(serde_json::from_str(&v)?)304	}305	async fn execute_expression_wrapping(306		&mut self,307		expr: impl AsRef<[u8]>,308		wrapping: &(String, String),309	) -> Result<String> {310		let mut nix_handler = self.nix_handler.clone();311		let mut collected = ErrorCollector::new(&mut nix_handler);312		let res = self.execute_expression_raw(expr, &mut collected).await?;313		if res.is_empty() {314			collected.finish()?;315			bail!("expected expression, got nothing")316		} else {317			collected.flush()318		};319		let Some(res) = res.strip_prefix(&wrapping.0) else {320			bail!("invalid type")321		};322		let Some(res) = res.strip_suffix(&wrapping.1) else {323			bail!("invalid type")324		};325		Ok(res.to_owned())326	}327	async fn execute_expression_empty(&mut self, expr: impl AsRef<[u8]>) -> Result<()> {328		let mut nix_handler = self.nix_handler.clone();329		let mut collected = ErrorCollector::new(&mut nix_handler);330		let v = self.execute_expression_raw(expr, &mut collected).await?;331		collected.finish()?;332		ensure!(v.is_empty(), "unexpected expression result");333		Ok(())334	}335	async fn execute_expression_raw(336		&mut self,337		expr: impl AsRef<[u8]>,338		err_handler: &mut dyn Handler,339	) -> Result<String> {340		// Prevent two commands from being executed in parallel, messing with each other.341		let _lock = self.executing_command.clone();342		let _guard = _lock.lock().await;343344		self.send_command(expr).await?;345		// It will be echoed346		self.send_command(REPL_DELIMITER).await?;347		self.read_until_delimiter(err_handler).await348	}349	async fn execute_assign(&mut self, expr: impl AsRef<str>) -> Result<u32> {350		let id = self.allocate_id();351		self.execute_expression_empty(format!("sess_field_{id} = {}", expr.as_ref()))352			.await?;353		Ok(id)354	}355356	/// Id should be immediately used357	fn allocate_id(&mut self) -> u32 {358		if let Some(free) = self.free_list.pop() {359			free360		} else {361			let v = self.next_id;362			self.next_id += 1;363			v364		}365	}366	// Nix has no way to deallocate variable, yet GC will correct everything not reachable.367	// async fn free_id(&mut self, id: u32) -> Result<()> {368	// 	self.execute_expression_empty(format!("sess_field_{id} = null"))369	// 		.await?;370	// 	self.free_list.push(id);371	// 	Ok(())372	// }373}374375#[derive(Clone)]376pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);377378#[derive(Clone)]379pub struct NixExprBuilder {380	out: String,381	used_fields: Vec<Field>,382}383impl NixExprBuilder {384	pub fn object() -> Self {385		NixExprBuilder {386			out: "{ ".to_owned(),387			used_fields: Vec::new(),388		}389	}390	pub fn string(s: &str) -> Self {391		NixExprBuilder {392			out: nixlike::serialize(s)393				.expect("no problems with serializing_string")394				.trim_end()395				.to_owned(),396			used_fields: Vec::new(),397		}398	}399	pub fn serialized(v: impl Serialize) -> Self {400		let serialized = nixlike::serialize(v).expect("invalid value for apply");401		Self {402			out: serialized.trim_end().to_owned(),403			used_fields: Vec::new(),404		}405	}406	pub fn field(f: Field) -> Self {407		Self {408			out: format!("sess_field_{}", f.0.value.expect("no value")),409			used_fields: vec![f],410		}411	}412	pub fn end_obj(&mut self) {413		self.out.push('}');414	}415	pub fn obj_key(&mut self, name: Self, value: Self) {416		self.out.push_str(r#""${"#);417		self.extend(name);418		self.out.push_str(r#"}" = "#);419		self.extend(value);420		self.out.push_str("; ");421	}422423	pub fn extend(&mut self, e: Self) {424		self.out.push_str(&e.out);425		self.used_fields.extend(e.used_fields);426	}427428	pub fn session(&self) -> NixSession {429		let mut session = None;430		for ele in &self.used_fields {431			if session.is_none() {432				session = Some(ele.0.session.clone());433				continue;434			}435			let session = &session.as_ref().expect("checked").0;436			let ele_sess = &ele.0.session.0;437			assert!(438				Arc::ptr_eq(session, ele_sess),439				"can't mix fields from different session"440			);441		}442		session.expect("expr without fields used")443	}444	pub fn index_attr(&mut self, s: &str) {445		let escaped = nixlike::serialize(s).expect("string");446		self.out.push('.');447		self.out.push_str(escaped.trim_end());448	}449}450451#[macro_export]452macro_rules! nix_expr_inner {453	(Obj { $($ident:ident: $($val:tt)+),* $(,)? }) => {{454		use $crate::better_nix_eval::NixExprBuilder;455		let mut out = NixExprBuilder::object();456		$(457			out.obj_key(458				NixExprBuilder::string(stringify!($ident)),459				$crate::nix_expr_inner!($($val)+),460			);461		)*462		out.end_obj();463		out464	}};465	(@field($o:ident) . $var:ident $($tt:tt)*) => {{466		$o.index_attr(stringify!($var));467		nix_expr_inner!(@field($o) $($tt)*);468	}};469	(@field($o:ident) [{ $v:expr }] $($tt:tt)*) => {{470		$o.push(Index::attr(&$v));471		nix_expr_inner!(@o($o) $($tt)*);472	}};473	(@field($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{474		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));475		nix_expr_inner!(@o($o) $($tt)*);476	}};477	(@field($o:ident) ($($var:tt)*) $($tt:tt)*) => {478		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));479		nix_expr_inner!(@o($o) $($tt)*);480	};481	(@field($o:ident)) => {};482	($field:ident $($tt:tt)*) => {{483		use $crate::{better_nix_eval::NixExprBuilder, nix_expr_inner};484		#[allow(unused_mut, reason = "might be used if indexed")]485		let mut out = NixExprBuilder::field($field.clone());486		nix_expr_inner!(@field(out) $($tt)*);487		out488	}};489	($v:literal) => {{490		use $crate::better_nix_eval::NixExprBuilder;491		NixExprBuilder::string($v)492	}};493	({$v:expr}) => {{494		use $crate::better_nix_eval::NixExprBuilder;495		NixExprBuilder::serialized(&$v)496	}}497}498#[macro_export]499macro_rules! nix_expr {500	($($tt:tt)+) => {{501		use $crate::{better_nix_eval::{NixExprBuilder, Field}, nix_expr_inner};502		let expr = nix_expr_inner!($($tt)+);503		Field::new(expr.session(), expr.out)504	}};505}506507#[macro_export]508macro_rules! nix_go {509	(@o($o:ident) . $var:ident $($tt:tt)*) => {{510		$o.push(Index::attr(stringify!($var)));511		nix_go!(@o($o) $($tt)*);512	}};513	(@o($o:ident) [{ $v:expr }] $($tt:tt)*) => {{514		$o.push(Index::attr(&$v));515		nix_go!(@o($o) $($tt)*);516	}};517	(@o($o:ident) [ $($var:tt)+ ] $($tt:tt)*) => {{518		$o.push(Index::Expr($crate::nix_expr_inner!($($var)+)));519		nix_go!(@o($o) $($tt)*);520	}};521	(@o($o:ident) ($($var:tt)*) $($tt:tt)*) => {522		$o.push(Index::ExprApply($crate::nix_expr_inner!($($var)+)));523		nix_go!(@o($o) $($tt)*);524	};525	(@o($o:ident)) => {};526	($field:ident $($tt:tt)+) => {{527		use $crate::{nix_go, better_nix_eval::Index};528		let field = $field.clone();529		let mut out = vec![];530		nix_go!(@o(out) $($tt)*);531		field.select(out).await?532	}}533}534#[macro_export]535macro_rules! nix_go_json {536	($($tt:tt)*) => {{537		$crate::nix_go!($($tt)*).as_json().await?538	}};539}540541#[derive(Clone)]542pub enum Index {543	Var(String),544	String(String),545	Apply(String),546	Expr(NixExprBuilder),547	ExprApply(NixExprBuilder),548}549impl Index {550	pub fn var(v: impl AsRef<str>) -> Self {551		let v = v.as_ref();552		assert!(553			!(v.contains('.') | v.contains(' ')),554			"bad variable name: {v}"555		);556		Self::Var(v.to_owned())557	}558	pub fn attr(v: impl AsRef<str>) -> Self {559		Self::String(v.as_ref().to_owned())560	}561	pub fn apply(v: impl Serialize) -> Self {562		let serialized = nixlike::serialize(v).expect("invalid value for apply");563		Self::Apply(serialized.trim_end().to_owned())564	}565}566impl Display for Index {567	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {568		match self {569			Index::Var(v) => {570				write!(f, "{v}")571			}572			Index::String(k) => {573				let v = nixlike::format_identifier(k.as_str());574				write!(f, ".{v}")575			}576			Index::Apply(o) => {577				write!(f, "<apply>({o})")578			}579			Index::Expr(e) => {580				write!(f, "[{}]", e.out)581			}582			Index::ExprApply(e) => {583				write!(f, "<apply>({})", e.out)584			}585		}586	}587}588impl fmt::Debug for Index {589	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {590		write!(f, "{self}")591	}592}593struct PathDisplay<'i>(&'i [Index]);594impl Display for PathDisplay<'_> {595	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {596		for i in self.0 {597			write!(f, "{i}")?;598		}599		Ok(())600	}601}602struct FieldInner {603	full_path: Option<Vec<Index>>,604	session: NixSession,605	value: Option<u32>,606}607fn context(full_path: Option<&[Index]>, query: &str) -> String {608	if let Some(full_path) = &full_path {609		format!("full path: {}", PathDisplay(full_path))610	} else {611		format!("query: {query:?}")612	}613}614#[derive(Clone)]615pub struct Field(Arc<FieldInner>);616impl Field {617	fn root(session: NixSession) -> Self {618		Self(Arc::new(FieldInner {619			full_path: Some(vec![]),620			session,621			value: None,622		}))623	}624	async fn new(session: NixSession, query: &str) -> Result<Self> {625		let vid = session626			.0627			.lock()628			.await629			.execute_assign(query)630			.await631			.with_context(|| context(None, query))?;632		Ok(Self(Arc::new(FieldInner {633			full_path: None,634			session,635			value: Some(vid),636		})))637	}638	pub async fn field(session: NixSession, field: &str) -> Result<Self> {639		Self::root(session).select([Index::var(field)]).await640	}641	pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {642		let mut used_fields = Vec::new();643		let mut name = name.into_iter();644645		let mut full_path = self.0.full_path.clone();646		let mut query = if let Some(id) = self.0.value {647			format!("sess_field_{id}")648		} else {649			let first = name.next();650			if let Some(Index::Var(i)) = first {651				if let Some(full_path) = &mut full_path {652					full_path.push(Index::Var(i.clone()));653				}654				i.clone()655			} else {656				panic!("first path item should be variable, got {first:?}")657			}658		};659		for v in name {660			if let Some(full_path) = &mut full_path {661				full_path.push(v.clone());662			}663			match v {664				Index::Var(_) => panic!("var item may only be first"),665				Index::String(s) => {666					let escaped = nixlike::serialize(s)?;667					query.push('.');668					query.push_str(escaped.trim());669				}670				Index::Apply(a) => {671					// In cases like `a {}.b` first `{}.b` will be evaluated, so `a {}` should be encased in `()`672					query = format!("({query} {a})");673				}674				Index::Expr(e) => {675					let index = Field::new(self.0.session.clone(), &e.out).await?;676					used_fields.push(index.clone());677					query.push('.');678					let index = format!("${{sess_field_{}}}", index.0.value.expect("value"));679					query.push_str(&index);680				}681				Index::ExprApply(e) => {682					let index = Field::new(self.0.session.clone(), &e.out).await?;683					used_fields.push(index.clone());684					query.push(' ');685					let index = format!("sess_field_{}", index.0.value.expect("value"));686					query.push_str(&index);687					query = format!("({query})");688				}689			}690		}691692		let vid = self693			.0694			.session695			.0696			.lock()697			.await698			.execute_assign(&query)699			.await700			.with_context(|| {701				if let Some(full_path) = &full_path {702					format!("full path: {}", PathDisplay(full_path))703				} else {704					format!("query: {query:?}")705				}706			})?;707		Ok(Self(Arc::new(FieldInner {708			full_path,709			session: self.0.session.clone(),710			value: Some(vid),711		})))712	}713	pub async fn as_json<V: DeserializeOwned>(&self) -> Result<V> {714		let id = self.0.value.expect("can't serialize root field");715		let query = format!("sess_field_{id}");716		self.0717			.session718			.0719			.lock()720			.await721			.execute_expression_to_json(&query)722			.await723			.with_context(|| context(self.0.full_path.as_deref(), &query))724	}725	pub async fn has_field(&self, name: &str) -> Result<bool> {726		let id = self.0.value.expect("can't list root fields");727		let key = nixlike::escape_string(name);728		let query = format!("sess_field_{id} ? {key}");729		self.0730			.session731			.0732			.lock()733			.await734			.execute_expression_to_json(&query)735			.await736			.with_context(|| context(self.0.full_path.as_deref(), &query))737	}738	pub async fn list_fields(&self) -> Result<Vec<String>> {739		let id = self.0.value.expect("can't list root fields");740		let query = format!("builtins.attrNames sess_field_{id}");741		self.0742			.session743			.0744			.lock()745			.await746			.execute_expression_to_json(&query)747			.await748			.with_context(|| context(self.0.full_path.as_deref(), &query))749	}750	pub async fn type_of(&self) -> Result<String> {751		let id = self.0.value.expect("can't list root fields");752		let query = format!("builtins.typeOf sess_field_{id}");753		self.0754			.session755			.0756			.lock()757			.await758			.execute_expression_to_json(&query)759			.await760			.with_context(|| context(self.0.full_path.as_deref(), &query))761	}762	pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {763		let id = self.0.value.expect("can't use build on not-value");764		let query = format!(":b sess_field_{id}");765		let vid = self766			.0767			.session768			.0769			.lock()770			.await771			.execute_expression_raw(&query, &mut NixHandler::default())772			.await?;773		ensure!(774			!vid.is_empty(),775			"build failed: {}",776			context(self.0.full_path.as_deref(), &query),777		);778		let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")779		else {780			panic!("unexpected build output: {vid:?}");781		};782		let outputs = vid783			.split('\n')784			.filter(|v| !v.is_empty())785			.map(|v| v.split_once(" -> ").expect("unexpected build output"))786			.map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))787			.collect();788		Ok(outputs)789	}790}791impl Drop for FieldInner {792	fn drop(&mut self) {793		if let Some(id) = self.value {794			if let Ok(mut lock) = self.session.0.try_lock() {795				lock.free_list.push(id)796			}797			// Leaked798		}799	}800}801struct NixSessionPoolInner {802	flake: OsString,803	nix_args: Vec<OsString>,804}805806#[derive(Debug)]807pub struct NixPoolError(anyhow::Error);808impl From<anyhow::Error> for NixPoolError {809	fn from(value: anyhow::Error) -> Self {810		Self(value)811	}812}813impl std::error::Error for NixPoolError {}814impl std::fmt::Display for NixPoolError {815	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {816		self.0.fmt(f)817	}818}819impl r2d2::ManageConnection for NixSessionPoolInner {820	type Connection = NixSessionInner;821	type Error = NixPoolError;822	fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {823		let _v = TOKIO_RUNTIME824			.get()825			.expect("missed tokio runtime init!")826			.enter();827		Ok(futures::executor::block_on(NixSessionInner::new(828			self.flake.as_os_str(),829			self.nix_args.iter().map(OsString::as_os_str),830		))?)831	}832833	fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {834		let _v = TOKIO_RUNTIME835			.get()836			.expect("missed tokio runtime init!")837			.enter();838		let res = futures::executor::block_on(conn.execute_expression_number("2 + 2"))?;839		if res != 4 {840			return Err(anyhow!("sanity check failed").into());841		};842		Ok(())843	}844845	fn has_broken(&self, _conn: &mut Self::Connection) -> bool {846		false847	}848}849pub struct NixSessionPool(Pool<NixSessionPoolInner>);850impl NixSessionPool {851	pub async fn new(flake: OsString, nix_args: Vec<OsString>) -> Result<Self> {852		let inner = tokio::task::block_in_place(|| {853			r2d2::Builder::<NixSessionPoolInner>::new()854				.min_idle(Some(0))855				.build(NixSessionPoolInner { flake, nix_args })856		})?;857		Ok(Self(inner))858	}859	pub async fn get(&self) -> Result<NixSession> {860		let v = tokio::task::block_in_place(|| self.0.get())?;861		Ok(NixSession(Arc::new(tokio::sync::Mutex::new(v))))862	}863}864865pub static TOKIO_RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -3,11 +3,11 @@
 use std::{env::current_dir, time::Duration};
 
 use crate::command::MyCommand;
-use crate::host::Config;
+use crate::host::{Config, ConfigHost};
 use crate::nix_go;
 use anyhow::{anyhow, Result};
 use clap::Parser;
-use itertools::Itertools;
+use itertools::Itertools as _;
 use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
@@ -112,12 +112,12 @@
 	current: bool,
 	datetime: String,
 }
-async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
-	let mut cmd = MyCommand::new("nix-env");
+async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {
+	let mut cmd = host.cmd("nix-env").await?;
 	cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 		.arg("--list-generations");
 	// Sudo is required due to --list-generations acquiring lock on the profile.
-	let data = config.run_string_on(host, cmd, true).await?;
+	let data = cmd.sudo().run_string().await?;
 	let generations = data
 		.split('\n')
 		.map(|e| e.trim())
@@ -161,25 +161,12 @@
 		.map_err(|_e| anyhow!("bad list-generations output"))?
 		.ok_or_else(|| anyhow!("failed to find generation"))?;
 	Ok(current)
-}
-
-async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("stop").arg(unit);
-	config.run_on(host, cmd, true).await
-}
-
-async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {
-	let mut cmd = MyCommand::new("systemctl");
-	cmd.arg("start").arg(unit);
-	config.run_on(host, cmd, true).await
 }
 
 async fn execute_upload(
 	build: &BuildSystems,
-	config: &Config,
 	action: UploadAction,
-	host: &str,
+	host: &ConfigHost,
 	built: PathBuf,
 ) -> Result<()> {
 	let mut failed = false;
@@ -191,15 +178,15 @@
 	if !build.disable_rollback {
 		let _span = info_span!("preparing").entered();
 		info!("preparing for rollback");
-		let generation = get_current_generation(config, host).await?;
+		let generation = get_current_generation(&host).await?;
 		info!(
 			"rollback target would be {} {}",
 			generation.id, generation.datetime
 		);
 		{
-			let mut cmd = MyCommand::new("sh");
+			let mut cmd = host.cmd("sh").await?;
 			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to set rollback marker: {e}");
 				failed = true;
 			}
@@ -215,37 +202,41 @@
 		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.
 		// Anyway, reboot will still help in this case.
 		if action.should_schedule_rollback_run() {
-			let mut cmd = MyCommand::new("systemd-run");
+			let mut cmd = host.cmd("systemd-run").await?;
 			cmd.comparg("--on-active", "3min")
 				.comparg("--unit", "rollback-watchdog-run")
 				.arg("systemctl")
 				.arg("start")
 				.arg("rollback-watchdog.service");
-			if let Err(e) = config.run_on(host, cmd, true).await {
+			if let Err(e) = cmd.sudo().run().await {
 				error!("failed to schedule rollback run: {e}");
 				failed = true;
 			}
 		}
 	}
+
 	if action.should_switch_profile() && !failed {
 		info!("switching generation");
-		let mut cmd = MyCommand::new("nix-env");
+		let mut cmd = host.cmd("nix-env").await?;
 		cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 			.comparg("--set", &built);
-		if let Err(e) = config.run_on(host, cmd, true).await {
+		if let Err(e) = cmd.sudo().run().await {
 			error!("failed to switch generation: {e}");
 			failed = true;
 		}
 	}
+
+	// FIXME: Connection might be disconnected after activation run
+
 	if action.should_activate() && !failed {
 		let _span = info_span!("activating").entered();
 		info!("executing activation script");
 		let mut switch_script = built.clone();
 		switch_script.push("bin");
 		switch_script.push("switch-to-configuration");
-		let mut cmd = MyCommand::new(switch_script);
+		let mut cmd = host.cmd(switch_script).await?;
 		cmd.arg(action.name());
-		if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+		if let Err(e) = cmd.sudo().run().in_current_span().await {
 			error!("failed to activate: {e}");
 			failed = true;
 		}
@@ -253,7 +244,8 @@
 	if !build.disable_rollback {
 		if failed {
 			info!("executing rollback");
-			if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")
+			if let Err(e) = host
+				.systemctl_start("rollback-watchdog.service")
 				.instrument(info_span!("rollback"))
 				.await
 			{
@@ -261,27 +253,29 @@
 			}
 		} else {
 			info!("trying to mark upgrade as successful");
-			let mut cmd = MyCommand::new("rm");
-			cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-			if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {
+			if let Err(e) = host
+				.rm_file("/etc/fleet_rollback_marker", true)
+				.in_current_span()
+				.await
+			{
 				error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
 			}
 		}
 		info!("disarming watchdog, just in case");
-		if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {
+		if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {
 			// It is ok, if there was no reboot - then timer might not be running.
 		}
 		if action.should_schedule_rollback_run() {
-			if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {
+			if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {
 				error!("failed to disarm rollback run: {e}");
 			}
 		}
-	} else {
-		let mut cmd = MyCommand::new("rm");
-		cmd.arg("-f").arg("/etc/fleet_rollback_marker");
-		if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {
-			// Marker might not exist, yet better try to remove it.
-		}
+	} else if let Err(_e) = host
+		.rm_file("/etc/fleet_rollback_marker", true)
+		.in_current_span()
+		.await
+	{
+		// Marker might not exist, yet better try to remove it.
 	}
 	Ok(())
 }
@@ -289,12 +283,13 @@
 impl BuildSystems {
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
+		let host = config.host(&host).await?;
 		let action = Action::from(self.subcommand.clone());
 		let fleet_field = &config.fleet_field;
 		let drv = nix_go!(
 			fleet_field.buildSystems(Obj {
 				localSystem: { config.local_system.clone() }
-			})[{ action.build_attr() }][{ host }]
+			})[{ action.build_attr() }][{ &host.name }]
 		);
 		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
@@ -309,9 +304,10 @@
 
 		match action {
 			Action::Upload { action } => {
-				if !config.is_local(&host) {
+				if !config.is_local(&host.name) {
 					info!("uploading system closure");
 					{
+						// TODO: Move to remote_derivation method.
 						// Alternatively, nix store make-content-addressed can be used,
 						// at least for the first deployment, to provide trusted store key.
 						//
@@ -329,13 +325,11 @@
 					}
 					let mut tries = 0;
 					loop {
-						let mut nix = MyCommand::new("nix");
-						nix.arg("copy")
-							.arg("--substitute-on-destination")
-							.comparg("--to", format!("ssh-ng://{host}"))
-							.arg(out_output);
-						match nix.run_nix().await {
-							Ok(()) => break,
+						match host.remote_derivation(out_output).await {
+							Ok(remote) => {
+								assert!(&remote == out_output, "CA derivations aren't implemented");
+								break;
+							}
 							Err(e) if tries < 3 => {
 								tries += 1;
 								warn!("Copy failure ({}/3): {}", tries, e);
@@ -346,19 +340,19 @@
 					}
 				}
 				if let Some(action) = action {
-					execute_upload(&self, &config, action, &host, out_output.clone()).await?
+					execute_upload(&self, action, &host, out_output.clone()).await?
 				}
 			}
 			Action::Package(PackageAction::SdImage) => {
 				let mut out = current_dir()?;
-				out.push(format!("sd-image-{}", host));
+				out.push(format!("sd-image-{}", host.name));
 
 				info!("linking sd image to {:?}", out);
 				symlink(out_output, out)?;
 			}
 			Action::Package(PackageAction::InstallationCd) => {
 				let mut out = current_dir()?;
-				out.push(format!("installation-cd-{}", host));
+				out.push(format!("installation-cd-{}", host.name));
 
 				info!("linking iso image to {:?}", out);
 				symlink(out_output, out)?;
@@ -379,6 +373,17 @@
 			let this = this.clone();
 			let span = info_span!("deployment", host = field::display(&host.name));
 			let hostname = host.name;
+			// FIXME: Since the introduction of better-nix-eval,
+			// due to single repl used for builds, hosts are waiting for each other to build,
+			// instead of building concurrently.
+			//
+			// Open multiple repls?
+			//
+			// Create build batcher, which will behave similar to golangs
+			// WaitGroup, and start executing once all the build tasks are scheduled?
+			// This also allows to cleanup build output, as there will be no longer
+			// "waiting for remote machine" messages in the cases when one package is needed for
+			// multiple hosts.
 			set.spawn_local(
 				(async move {
 					match this.build_task(config, hostname).await {
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,7 +7,7 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use chrono::{DateTime, Utc};
 use clap::Parser;
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use itertools::Itertools;
 use owo_colors::OwoColorize;
 use std::{
@@ -404,9 +404,8 @@
 					target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 				if let Some(data) = secret.secret.secret {
-					let encrypted = config
-						.reencrypt_on_host(identity_holder, data, target_recipients)
-						.await?;
+					let host = config.host(&identity_holder).await?;
+					let encrypted = host.reencrypt(data, target_recipients).await?;
 					secret.secret.secret = Some(encrypted);
 				}
 
@@ -481,9 +480,8 @@
 							target_recipients.into_iter().collect::<Result<Vec<_>>>()?;
 
 						if let Some(secret) = data.secret.secret {
-							let encrypted = config
-								.reencrypt_on_host(identity_holder, secret, target_recipients)
-								.await?;
+							let host = config.host(identity_holder).await?;
+							let encrypted = host.reencrypt(secret, target_recipients).await?;
 
 							data.secret.secret = Some(encrypted);
 						}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,23 +1,15 @@
-use std::{
-	collections::HashMap,
-	ffi::OsStr,
-	pin,
-	process::Stdio,
-	sync::{Arc, Mutex},
-	task::Poll,
-};
+use std::thread::sleep;
+use std::time::Duration;
+use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};
 
 use anyhow::{anyhow, Result};
+use better_command::{Handler, NixHandler, PlainHandler};
 use futures::StreamExt;
 use itertools::Either;
-use once_cell::sync::Lazy;
 use openssh::{OverSsh, OwningCommand, Session};
-use regex::Regex;
-use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
-use tracing::{info, info_span, warn, Span};
-use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tracing::{info, debug};
 
 fn escape_bash(input: &str, out: &mut String) {
 	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
@@ -87,9 +79,7 @@
 			return self;
 		}
 		let mut out = Self::new("env");
-		if let Some(session) = self.ssh_session {
-			out = out.ssh_session(session);
-		}
+		out.ssh_session = self.ssh_session;
 		for (k, v) in self.env {
 			assert!(!k.contains('='));
 			out.arg(format!("{k}={v}"));
@@ -179,21 +169,11 @@
 			out
 		} else {
 			let mut out = Self::new("sudo");
+			out.ssh_session = self.ssh_session.take();
 			out.args(self.into_args());
 			out
 		}
 	}
-	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {
-		self.ssh_session = Some(on);
-		self
-	}
-	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {
-		let mut out = Self::new("ssh");
-		out.ssh_session = self.ssh_session.take();
-		out.arg(on).arg("--");
-		out.arg(self.into_string());
-		out
-	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -276,259 +256,6 @@
 	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;
 	assert!(v.is_none());
 	Ok(())
-}
-
-pub trait Handler: Send {
-	fn handle_line(&mut self, e: &str);
-}
-
-pub struct ClonableHandler<H>(Arc<Mutex<H>>);
-impl<H> Clone for ClonableHandler<H> {
-	fn clone(&self) -> Self {
-		Self(self.0.clone())
-	}
-}
-impl<H> ClonableHandler<H> {
-	pub fn new(inner: H) -> Self {
-		Self(Arc::new(Mutex::new(inner)))
-	}
-}
-impl<H: Handler> Handler for ClonableHandler<H> {
-	fn handle_line(&mut self, e: &str) {
-		self.0.lock().unwrap().handle_line(e)
-	}
-}
-
-struct PlainHandler;
-impl Handler for PlainHandler {
-	fn handle_line(&mut self, e: &str) {
-		info!(target: "log", "{e}");
-	}
-}
-
-pub struct NoopHandler;
-impl Handler for NoopHandler {
-	fn handle_line(&mut self, _e: &str) {}
-}
-
-#[derive(Default)]
-pub struct NixHandler {
-	spans: HashMap<u64, Span>,
-}
-fn process_message(m: &str) -> String {
-	static OSC_CLEANER: Lazy<Regex> =
-		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
-	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
-	let m = OSC_CLEANER.replace_all(m, "");
-	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
-	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
-	DETABBER.replace_all(m.as_ref(), "  ").to_string()
-}
-impl Handler for NixHandler {
-	fn handle_line(&mut self, e: &str) {
-		if let Some(e) = e.strip_prefix("@nix ") {
-			let log: NixLog = match serde_json::from_str(e) {
-				Ok(l) => l,
-				Err(err) => {
-					warn!("failed to parse nix log line {:?}: {}", e, err);
-					return;
-				}
-			};
-			match log {
-				NixLog::Msg { msg, raw_msg, .. } => {
-					#[allow(clippy::nonminimal_bool)]
-					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
-					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
-					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
-						if let Some(raw_msg) = raw_msg {
-							if !msg.is_empty() {
-								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
-							} else {
-								info!(target: "nix", "{}", raw_msg.trim_end())
-							}
-						} else {
-							info!(target: "nix", "{}", msg.trim_end())
-						}
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 105 && !fields.is_empty() => {
-					if let [LogField::String(drv), ..] = &fields[..] {
-						let mut drv = drv.as_str();
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						info!(target: "nix","building {}", drv);
-						let span = info_span!("build", drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad build log: {:?}", log)
-					}
-				}
-				NixLog::Start {
-					ref fields,
-					typ,
-					id,
-					..
-				} if typ == 100 && fields.len() >= 3 => {
-					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
-						&fields[..]
-					{
-						let mut drv = drv.as_str();
-
-						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-							let mut it = pkg.splitn(2, '-');
-							it.next();
-							if let Some(pkg) = it.next() {
-								drv = pkg;
-							}
-						}
-						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
-						let span = info_span!("copy", from, to, drv);
-						span.pb_start();
-						self.spans.insert(id, span);
-					} else {
-						warn!("bad copy log: {:?}", log)
-					}
-				}
-				NixLog::Start { text, typ, id, .. }
-					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
-				{
-					if !text.is_empty()
-						&& text != "querying info about missing paths"
-						&& text != "copying 0 paths"
-						// Too much spam on lazy-trees branch
-						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
-					{
-						let span = info_span!("job");
-						span.pb_start();
-						span.pb_set_message(&process_message(text.trim()));
-						self.spans.insert(id, span);
-						info!(target: "nix", "{}", text);
-					}
-				}
-				NixLog::Start {
-					text,
-					level: 0,
-					typ: 108,
-					..
-				} if text.is_empty() => {
-					// Cache lookup? Coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 109,
-					..
-				} if text.starts_with("querying info about ") => {
-					// Cache lookup
-				}
-				NixLog::Start {
-					text,
-					level: 4,
-					typ: 101,
-					..
-				} if text.starts_with("downloading ") => {
-					// NAR downloading, coupled with copy log
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					..
-				} if text.starts_with("waiting for a machine to build ") => {
-					// Useless repeating notification about build
-				}
-				NixLog::Start {
-					text,
-					level: 3,
-					typ: 111,
-					..
-				} if text.starts_with("resolved derivation: ") => {
-					// CA resolved
-				}
-				NixLog::Start {
-					text,
-					level: 1,
-					typ: 111,
-					id,
-					..
-				} if text.starts_with("waiting for lock on ") => {
-					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
-					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
-						drv = txt;
-					}
-					if let Some(txt) = drv.split("', '").next() {
-						drv = txt;
-					}
-					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
-						let mut it = pkg.splitn(2, '-');
-						it.next();
-						if let Some(pkg) = it.next() {
-							drv = pkg;
-						}
-					}
-					let span = info_span!("waiting on drv", drv);
-					span.pb_start();
-					self.spans.insert(id, span);
-					// Concurrent build of the same message
-				}
-				NixLog::Stop { id, .. } => {
-					self.spans.remove(&id);
-				}
-				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
-					if let Some(span) = self.spans.get(&id) {
-						if let LogField::String(s) = &fields[0] {
-							span.pb_set_message(&process_message(s.trim()));
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						warn!("unknown result id: {id} {typ} {fields:?}");
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
-					if let Some(span) = self.spans.get(&id) {
-						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
-							&fields[..4]
-						{
-							span.pb_set_length(*expected);
-							span.pb_set_position(*done);
-						} else {
-							warn!("bad fields: {fields:?}");
-						}
-					} else {
-						// warn!("unknown result id: {id} {typ} {fields:?}");
-						// Unaccounted progress.
-					}
-					// dbg!(fields, id, typ);
-				}
-				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
-					// Set phase, expected
-				}
-				_ => warn!("unknown log: {:?}", log),
-			};
-		} else {
-			let e = e.trim();
-			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
-				return;
-			}
-			info!("{e}")
-		}
-	}
 }
 
 async fn run_nix_inner_raw(
@@ -540,6 +267,7 @@
 ) -> Result<Option<Vec<u8>>> {
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
+	debug!("running command {cmd:?} on local");
 	let mut child = cmd.spawn()?;
 	let mut stderr = child.stderr.take().unwrap();
 	let stdout = child.stdout.take().unwrap();
@@ -600,6 +328,7 @@
 	err_handler: &mut dyn Handler,
 	mut out_handler: Option<&mut dyn Handler>,
 ) -> Result<Option<Vec<u8>>> {
+	debug!("running command {cmd:?} over ssh");
 	cmd.stderr(openssh::Stdio::piped());
 	cmd.stdout(openssh::Stdio::piped());
 	let mut child = cmd.spawn().await?;
@@ -656,77 +385,4 @@
 	}
 
 	Ok(out_buf)
-}
-
-pub trait ErrorRecorder: Send {
-	/// Return true to discard message from logging
-	fn push_message(&mut self, msg: &str) -> bool;
-}
-
-#[derive(Debug)]
-enum LogField {
-	String(String),
-	Num(u64),
-}
-
-impl<'de> Deserialize<'de> for LogField {
-	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
-	where
-		D: serde::Deserializer<'de>,
-	{
-		struct StringOrNum;
-		impl<'de> Visitor<'de> for StringOrNum {
-			type Value = LogField;
-
-			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-				write!(f, "string or unsigned")
-			}
-
-			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::String(v.to_owned()))
-			}
-
-			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
-			where
-				E: serde::de::Error,
-			{
-				Ok(LogField::Num(v))
-			}
-		}
-
-		deserializer.deserialize_any(StringOrNum)
-	}
-}
-
-#[derive(Deserialize, Debug)]
-#[serde(rename_all = "camelCase", tag = "action")]
-#[allow(dead_code)]
-enum NixLog {
-	Msg {
-		level: u32,
-		msg: String,
-		raw_msg: Option<String>,
-	},
-	Start {
-		id: u64,
-		level: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-		text: String,
-		#[serde(rename = "type")]
-		typ: u32,
-	},
-	Stop {
-		id: u64,
-	},
-	Result {
-		id: u64,
-		#[serde(rename = "type")]
-		typ: u32,
-		#[serde(default)]
-		fields: Vec<LogField>,
-	},
 }
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -9,7 +9,6 @@
 	sync::{Arc, Mutex, MutexGuard, OnceLock},
 };
 
-use age::Recipient;
 use anyhow::{anyhow, bail, Context, Result};
 use clap::{ArgGroup, Parser};
 use openssh::SessionBuilder;
@@ -50,10 +49,12 @@
 
 pub struct ConfigHost {
 	pub name: String,
+	pub local: bool,
 	pub session: OnceLock<Arc<openssh::Session>>,
 }
 impl ConfigHost {
-	pub async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+	async fn open_session(&self) -> Result<Arc<openssh::Session>> {
+		assert!(!self.local, "do not open ssh connection to local session");
 		// FIXME: TOCTOU
 		if let Some(session) = &self.session.get() {
 			return Ok((*session).clone());
@@ -96,8 +97,12 @@
 		D::from_str(&text).map_err(|e| anyhow!("failed to parse value: {e}"))
 	}
 	pub async fn cmd(&self, cmd: impl AsRef<OsStr>) -> Result<MyCommand> {
-		let session = self.open_session().await?;
-		Ok(MyCommand::new_on(cmd, session))
+		if self.local {
+			Ok(MyCommand::new(cmd))
+		} else {
+			let session = self.open_session().await?;
+			Ok(MyCommand::new_on(cmd, session))
+		}
 	}
 
 	pub async fn decrypt(&self, data: SecretData) -> Result<Vec<u8>> {
@@ -110,8 +115,25 @@
 			.context("failed to call remote host for decrypt")?;
 		z85::decode(encoded.trim_end()).context("bad encoded data? outdated host?")
 	}
+	pub async fn reencrypt(&self, data: SecretData, targets: Vec<String>) -> Result<SecretData> {
+		let mut cmd = self.cmd("fleet-install-secrets").await?;
+		cmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
+		for target in targets {
+			cmd.eqarg("--targets", target);
+		}
+		let encoded = cmd
+			.sudo()
+			.run_string()
+			.await
+			.context("failed to call remote host for decrypt")?;
+		SecretData::decode_z85(encoded.trim_end()).context("bad encoded data? outdated host?")
+	}
 	/// Returns path for futureproofing, as path might change i.e on conversion to CA
 	pub async fn remote_derivation(&self, path: &PathBuf) -> Result<PathBuf> {
+		if self.local {
+			// Path is located locally, thus already trusted.
+			return Ok(path.to_owned());
+		}
 		let mut nix = MyCommand::new("nix");
 		nix.arg("copy")
 			.arg("--substitute-on-destination")
@@ -120,6 +142,25 @@
 		nix.run_nix().await?;
 		Ok(path.to_owned())
 	}
+	pub async fn systemctl_stop(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("stop").arg(name);
+		cmd.sudo().run().await
+	}
+	pub async fn systemctl_start(&self, name: &str) -> Result<()> {
+		let mut cmd = self.cmd("systemctl").await?;
+		cmd.arg("start").arg(name);
+		cmd.sudo().run().await
+	}
+
+	pub async fn rm_file(&self, path: impl AsRef<OsStr>, sudo: bool) -> Result<()> {
+		let mut cmd = self.cmd("rm").await?;
+		cmd.arg("-f").arg(path);
+		if sudo {
+			cmd = cmd.sudo()
+		}
+		cmd.run().await
+	}
 }
 
 impl Config {
@@ -134,35 +175,12 @@
 	}
 	pub fn is_local(&self, host: &str) -> bool {
 		self.opts.localhost.as_ref().map(|s| s as &str) == Some(host)
-	}
-
-	pub async fn run_on(&self, host: &str, mut command: MyCommand, sudo: bool) -> Result<()> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run().await
-	}
-	pub async fn run_string_on(
-		&self,
-		host: &str,
-		mut command: MyCommand,
-		sudo: bool,
-	) -> Result<String> {
-		if sudo {
-			command = command.sudo();
-		}
-		if !self.is_local(host) {
-			command = command.ssh(host);
-		}
-		command.run_string().await
 	}
 
 	pub async fn host(&self, name: &str) -> Result<ConfigHost> {
 		Ok(ConfigHost {
 			name: name.to_owned(),
+			local: self.is_local(name),
 			session: OnceLock::new(),
 		})
 	}
@@ -172,6 +190,7 @@
 		let mut out = vec![];
 		for name in names {
 			out.push(ConfigHost {
+				local: self.is_local(&name),
 				name,
 				session: OnceLock::new(),
 			})
@@ -225,27 +244,6 @@
 		let mut data = self.data_mut();
 		let host_secrets = data.host_secrets.entry(host.to_owned()).or_default();
 		host_secrets.insert(secret, value);
-	}
-
-	pub async fn reencrypt_on_host(
-		&self,
-		host: &str,
-		data: SecretData,
-		targets: Vec<String>,
-	) -> Result<SecretData> {
-		let mut recmd = MyCommand::new("fleet-install-secrets");
-		recmd.arg("reencrypt").eqarg("--secret", data.encode_z85());
-		for target in targets {
-			recmd.eqarg("--targets", target);
-		}
-		recmd = recmd.sudo().ssh(host);
-		let encoded = recmd
-			.run_string()
-			.await
-			.context("failed to call remote host for decrypt")?
-			.trim()
-			.to_owned();
-		SecretData::decode_z85(&encoded)
 	}
 
 	pub fn host_secret(&self, host: &str, secret: &str) -> Result<FleetSecret> {
modifiedcmds/fleet/src/keys.rsdiffbeforeafterboth
--- a/cmds/fleet/src/keys.rs
+++ b/cmds/fleet/src/keys.rs
@@ -1,6 +1,5 @@
 use std::str::FromStr;
 
-use crate::command::MyCommand;
 use crate::host::Config;
 use age::Recipient;
 use anyhow::{anyhow, Result};
@@ -30,10 +29,11 @@
 			Ok(key)
 		} else {
 			warn!("Loading key for {}", host);
-			let mut cmd = MyCommand::new("cat");
+			let host = self.host(host).await?;
+			let mut cmd = host.cmd("cat").await?;
 			cmd.arg("/etc/ssh/ssh_host_ed25519_key.pub");
-			let key = self.run_string_on(host, cmd, false).await?;
-			self.update_key(host, key.clone());
+			let key = cmd.run_string().await?;
+			self.update_key(&host.name, key.clone());
 			Ok(key)
 		}
 	}
addedcmds/remowt-agent/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "remowt-agent"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
addedcmds/remowt-agent/README.adocdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/README.adoc
@@ -0,0 +1,16 @@
+= Remowt agent
+
+Working with remote machine programmatically is not always easy.
+
+Sure, you have ssh, sftp, and that kind of fancy stuff, but what about minimal distributions, routers?
+
+Well, sftp can be replaced with FISH... But what if remote machine isn't accessible over ssh at all? What if the only communication channel you have is uart?
+
+What if remote host has not enough tools to implement the functionality you need?
+
+Remowt is intended to solve this in a way similar to how some RAT toolkits (I.e metasploit) do - you inject minimal agent, setup some communication channel to it (stdio perhaps?), and then you deploy payloads on it, and the payloads perform the actual work.
+
+== Non-targets
+
+Minimal executable size:: As long as it transferred only once, it shouldn't be a problem to keep it a reasonable size.
+Be stealthy:: As it solves the problem almost the same way as metasploit, it is possible to use it as something bad, but this is not the remowt intended purpose, and never will be.
addedcmds/remowt-agent/src/main.rsdiffbeforeafterboth
--- /dev/null
+++ b/cmds/remowt-agent/src/main.rs
@@ -0,0 +1,3 @@
+fn main() {
+	println!("Hello, world!");
+}
addedcrates/better-command/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "better-command"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+once_cell = "1.19.0"
+regex = "1.10.2"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_json = "1.0.108"
+tracing = "0.1.40"
+tracing-indicatif = "0.3.6"
addedcrates/better-command/src/handler.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/handler.rs
@@ -0,0 +1,305 @@
+//! Collection of handlers, which transform program-specific stdout format to tracing
+
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use tracing::{Span, info, warn, info_span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
+
+pub trait Handler: Send {
+	fn handle_line(&mut self, e: &str);
+}
+
+/// Handler wrapper, which can be cloned.
+pub struct ClonableHandler<H>(Arc<Mutex<H>>);
+impl<H> Clone for ClonableHandler<H> {
+	fn clone(&self) -> Self {
+		Self(self.0.clone())
+	}
+}
+impl<H> ClonableHandler<H> {
+	pub fn new(inner: H) -> Self {
+		Self(Arc::new(Mutex::new(inner)))
+	}
+}
+impl<H: Handler> Handler for ClonableHandler<H> {
+	fn handle_line(&mut self, e: &str) {
+		self.0.lock().unwrap().handle_line(e)
+	}
+}
+
+/// Converts command output to tracing lines
+pub struct PlainHandler;
+impl Handler for PlainHandler {
+	fn handle_line(&mut self, e: &str) {
+		info!(target: "log", "{e}");
+	}
+}
+
+/// Ignores output
+pub struct NoopHandler;
+impl Handler for NoopHandler {
+	fn handle_line(&mut self, _e: &str) {}
+}
+
+/// Transform nix internal-json logs to tracing spans.
+#[derive(Default)]
+pub struct NixHandler {
+	spans: HashMap<u64, Span>,
+}
+#[derive(Deserialize, Debug)]
+#[serde(untagged)]
+enum LogField {
+	String(String),
+	Num(u64),
+}
+
+/// Nix internal-json log line type
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase", tag = "action")]
+#[allow(dead_code)]
+enum NixLog {
+	Msg {
+		level: u32,
+		msg: String,
+		raw_msg: Option<String>,
+	},
+	Start {
+		id: u64,
+		level: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+		text: String,
+		#[serde(rename = "type")]
+		typ: u32,
+	},
+	Stop {
+		id: u64,
+	},
+	Result {
+		id: u64,
+		#[serde(rename = "type")]
+		typ: u32,
+		#[serde(default)]
+		fields: Vec<LogField>,
+	},
+}
+fn process_message(m: &str) -> String {
+	// Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.
+	static OSC_CLEANER: Lazy<Regex> =
+		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+	let m = OSC_CLEANER.replace_all(m, "");
+	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+	DETABBER.replace_all(m.as_ref(), "  ").to_string()
+}
+impl Handler for NixHandler {
+	fn handle_line(&mut self, e: &str) {
+		if let Some(e) = e.strip_prefix("@nix ") {
+			let log: NixLog = match serde_json::from_str(e) {
+				Ok(l) => l,
+				Err(err) => {
+					warn!("failed to parse nix log line {:?}: {}", e, err);
+					return;
+				}
+			};
+			match log {
+				NixLog::Msg { msg, raw_msg, .. } => {
+					#[allow(clippy::nonminimal_bool)]
+					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))
+					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")
+					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {
+						if let Some(raw_msg) = raw_msg {
+							if !msg.is_empty() {
+								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())
+							} else {
+								info!(target: "nix", "{}", raw_msg.trim_end())
+							}
+						} else {
+							info!(target: "nix", "{}", msg.trim_end())
+						}
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 105 && !fields.is_empty() => {
+					if let [LogField::String(drv), ..] = &fields[..] {
+						let mut drv = drv.as_str();
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						info!(target: "nix","building {}", drv);
+						let span = info_span!("build", drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad build log: {:?}", log)
+					}
+				}
+				NixLog::Start {
+					ref fields,
+					typ,
+					id,
+					..
+				} if typ == 100 && fields.len() >= 3 => {
+					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =
+						&fields[..]
+					{
+						let mut drv = drv.as_str();
+
+						if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+							let mut it = pkg.splitn(2, '-');
+							it.next();
+							if let Some(pkg) = it.next() {
+								drv = pkg;
+							}
+						}
+						// info!(target: "nix","copying {} {} -> {}", drv, from, to);
+						let span = info_span!("copy", from, to, drv);
+						span.pb_start();
+						self.spans.insert(id, span);
+					} else {
+						warn!("bad copy log: {:?}", log)
+					}
+				}
+				NixLog::Start { text, typ, id, .. }
+					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>
+				{
+					if !text.is_empty()
+						&& text != "querying info about missing paths"
+						&& text != "copying 0 paths"
+						// Too much spam on lazy-trees branch
+						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))
+					{
+						let span = info_span!("job");
+						span.pb_start();
+						span.pb_set_message(&process_message(text.trim()));
+						self.spans.insert(id, span);
+						info!(target: "nix", "{}", text);
+					}
+				}
+				NixLog::Start {
+					text,
+					level: 0,
+					typ: 108,
+					..
+				} if text.is_empty() => {
+					// Cache lookup? Coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 109,
+					..
+				} if text.starts_with("querying info about ") => {
+					// Cache lookup
+				}
+				NixLog::Start {
+					text,
+					level: 4,
+					typ: 101,
+					..
+				} if text.starts_with("downloading ") => {
+					// NAR downloading, coupled with copy log
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					..
+				} if text.starts_with("waiting for a machine to build ") => {
+					// Useless repeating notification about build
+				}
+				NixLog::Start {
+					text,
+					level: 3,
+					typ: 111,
+					..
+				} if text.starts_with("resolved derivation: ") => {
+					// CA resolved
+				}
+				NixLog::Start {
+					text,
+					level: 1,
+					typ: 111,
+					id,
+					..
+				} if text.starts_with("waiting for lock on ") => {
+					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();
+					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {
+						drv = txt;
+					}
+					if let Some(txt) = drv.split("', '").next() {
+						drv = txt;
+					}
+					if let Some(pkg) = drv.strip_prefix("/nix/store/") {
+						let mut it = pkg.splitn(2, '-');
+						it.next();
+						if let Some(pkg) = it.next() {
+							drv = pkg;
+						}
+					}
+					let span = info_span!("waiting on drv", drv);
+					span.pb_start();
+					self.spans.insert(id, span);
+					// Concurrent build of the same message
+				}
+				NixLog::Stop { id, .. } => {
+					self.spans.remove(&id);
+				}
+				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
+					if let Some(span) = self.spans.get(&id) {
+						if let LogField::String(s) = &fields[0] {
+							span.pb_set_message(&process_message(s.trim()));
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						warn!("unknown result id: {id} {typ} {fields:?}");
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {
+					if let Some(span) = self.spans.get(&id) {
+						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =
+							&fields[..4]
+						{
+							span.pb_set_length(*expected);
+							span.pb_set_position(*done);
+						} else {
+							warn!("bad fields: {fields:?}");
+						}
+					} else {
+						// warn!("unknown result id: {id} {typ} {fields:?}");
+						// Unaccounted progress.
+					}
+					// dbg!(fields, id, typ);
+				}
+				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {
+					// Set phase, expected
+				}
+				_ => warn!("unknown log: {:?}", log),
+			};
+		} else {
+			let e = e.trim();
+			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {
+				return;
+			}
+			info!("{e}")
+		}
+	}
+}
addedcrates/better-command/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/better-command/src/lib.rs
@@ -0,0 +1,17 @@
+mod handler;
+pub use handler::{Handler, PlainHandler, NoopHandler, NixHandler, ClonableHandler};
+
+pub fn add(left: usize, right: usize) -> usize {
+	left + right
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn it_works() {
+		let result = add(2, 2);
+		assert_eq!(result, 4);
+	}
+}