git.delta.rocks / jrsonnet / refs/commits / 89d35672dcfd

difftreelog

refactor perform build using nix repl

Yaroslav Bolyukin2023-12-24parent: #e85b4da.patch.diff
in: trunk

8 files changed

modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,5 +1,7 @@
+use std::collections::HashMap;
 use std::ffi::{OsStr, OsString};
-use std::fmt::Display;
+use std::fmt::{self, Display};
+use std::path::PathBuf;
 use std::process::Stdio;
 use std::sync::{Arc, OnceLock};
 
@@ -8,7 +10,7 @@
 use itertools::Itertools;
 use r2d2::{Pool, PooledConnection};
 use serde::de::DeserializeOwned;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 use tokio::io::AsyncWriteExt;
 use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
 use tokio::select;
@@ -72,14 +74,20 @@
 		// 	s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)
 		// }
 		if !self.collected.is_empty() {
-			bail!("{}", self.collected.iter().map(|v| {
-				if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {
-					let v = unindent::unindent(f.trim_start());
-					v.trim().to_owned()
-				} else {
-					v.to_owned()
-				}
-			}).join("\n"));
+			bail!(
+				"{}",
+				self.collected
+					.iter()
+					.map(|v| {
+						if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {
+							let v = unindent::unindent(f.trim_start());
+							v.trim().to_owned()
+						} else {
+							v.to_owned()
+						}
+					})
+					.join("\n")
+			);
 		}
 		Ok(())
 	}
@@ -150,6 +158,13 @@
 	}
 }
 
+struct WarnHandler;
+impl Handler for WarnHandler {
+	fn handle_line(&mut self, e: &str) {
+		warn!(target: "nix", "{e}")
+	}
+}
+
 impl NixSessionInner {
 	async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {
 		let mut cmd = Command::new("nix");
@@ -174,12 +189,13 @@
 		stdin.flush().await?;
 		let nix_handler = NixHandler::default();
 		let mut full_delimiter = None;
+		let mut errors = vec![];
 		while let Some(line) = out.next().await {
 			let line = match line {
 				OutputLine::Out(o) => o,
 				OutputLine::Err(_e) => {
 					// Handle startup errors, but skip repl hello?
-					//nix_handler.handle_line(&e);
+					errors.push(_e);
 					continue;
 				}
 			};
@@ -190,6 +206,9 @@
 			}
 		}
 		let Some(full_delimiter) = full_delimiter else {
+			for e in errors {
+				error!("{e}");
+			}
 			bail!("failed to discover delimiter");
 		};
 		let mut res = Self {
@@ -342,21 +361,93 @@
 #[derive(Clone)]
 pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);
 
+#[macro_export]
+macro_rules! nix_path {
+	(@o($o:ident) $var:ident $($tt:tt)*) => {{
+		$o.push(Index::var(stringify!($var)));
+		nix_path!(@o($o) $($tt)*);
+	}};
+	(@o($o:ident) . $var:ident $($tt:tt)*) => {{
+		$o.push(Index::attr(stringify!($var)));
+		nix_path!(@o($o) $($tt)*);
+	}};
+	(@o($o:ident) . $var:literal $($tt:tt)*) => {{
+		$o.push(Index::attr($var));
+		nix_path!(@o($o) $($tt)*);
+	}};
+	(@o($o:ident) . { $var:expr } $($tt:tt)*) => {{
+		$o.push(Index::attr($var));
+		nix_path!(@o($o) $($tt)*);
+	}};
+	(@o($o:ident) [ $var:literal ] $($tt:tt)*) => {{
+		$o.push(Index::idx($var));
+		nix_path!(@o($o) $($tt)*);
+	}};
+	(@o($o:ident) ($e:expr) $($tt:tt)*) => {
+		$o.push(Index::apply($e));
+		nix_path!(@o($o) $($tt)*);
+	};
+	(@o($o:ident)) => {};
+	($($tt:tt)+) => {{
+		use $crate::{nix_path, better_nix_eval::Index};
+		let mut out = vec![];
+		nix_path!(@o(out) $($tt)*);
+		out
+	}}
+}
+
 #[derive(Clone)]
-enum Index {
+pub enum Index {
+	Var(String),
 	String(String),
-	// Idx(u32),
+	Apply(String),
+	Idx(u32),
 }
+impl Index {
+	pub fn var(v: impl AsRef<str>) -> Self {
+		let v = v.as_ref();
+		assert!(
+			!(v.contains('.') | v.contains(' ')),
+			"bad variable name: {v}"
+		);
+		Self::Var(v.to_owned())
+	}
+	pub fn attr(v: impl AsRef<str>) -> Self {
+		Self::String(v.as_ref().to_owned())
+	}
+	pub fn idx(v: u32) -> Self {
+		Self::Idx(v)
+	}
+	pub fn apply(v: impl Serialize) -> Self {
+		let serialized = nixlike::serialize(v).expect("invalid value for apply");
+		Self::Apply(serialized)
+	}
+}
 impl Display for Index {
 	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 		match self {
+			Index::Var(v) => {
+				write!(f, "{v}")
+			}
 			Index::String(k) => {
 				let v = nixlike::format_identifier(k.as_str());
 				write!(f, ".{v}")
 			}
+			Index::Apply(o) => {
+				let v = nixlike::serialize(o).map_err(|_| fmt::Error)?;
+				write!(f, "<apply>({v})")
+			}
+			Index::Idx(i) => {
+				write!(f, "[{i}]")
+			}
 		}
 	}
 }
+impl fmt::Debug for Index {
+	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+		write!(f, "{self}")
+	}
+}
 struct PathDisplay<'i>(&'i [Index]);
 impl Display for PathDisplay<'_> {
 	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -381,43 +472,49 @@
 		}
 	}
 	pub async fn field(session: NixSession, field: &str) -> Result<Self> {
-		Self::root(session).get_field_deep([field]).await
+		Self::root(session)
+			.select([Index::var(field)])
+			.await
 	}
 	pub async fn get_json_deep<'a, V: DeserializeOwned>(
 		&self,
-		name: impl IntoIterator<Item = &'a str>,
+		name: impl IntoIterator<Item = Index>,
 	) -> Result<V> {
-		let field = self.get_field_deep(name).await?;
+		let field = self.select(name).await?;
 		field.as_json().await
 	}
-	pub async fn get_field(&self, name: &str) -> Result<Self> {
-		self.get_field_deep([name]).await
-	}
-	pub async fn get_field_deep<'a>(
-		&self,
-		name: impl IntoIterator<Item = &'a str>,
-	) -> Result<Self> {
-		let mut iter = name.into_iter();
+	pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {
+		let mut name = name.into_iter();
 
 		let mut full_path = self.full_path.clone();
 		let mut query = if let Some(id) = self.value {
 			format!("sess_field_{id}")
 		} else {
-			let first = iter.next().expect("name not empty");
-			ensure!(
-				!(first.contains('.') | first.contains(' ')),
-				"bad name for root query: {first}"
-			);
-			full_path.push(Index::String(first.to_string()));
-			first.to_string()
+			let first = name.next();
+			if let Some(Index::Var(i)) = first {
+				full_path.push(Index::Var(i.clone()));
+				i.clone()
+			} else {
+				panic!("first path item should be variable, got {first:?}")
+			}
 		};
-		for v in iter {
-			full_path.push(Index::String(v.to_string()));
-			// Escape
-			let escaped = nixlike::serialize(v)?;
-			let escaped = escaped.trim();
-			query.push('.');
-			query.push_str(escaped);
+		for v in name {
+			full_path.push(v.clone());
+			match v {
+				Index::Var(_) => panic!("var item may only be first"),
+				Index::String(s) => {
+					let escaped = nixlike::serialize(s)?;
+					query.push('.');
+					query.push_str(escaped.trim());
+				}
+				Index::Apply(a) => {
+					query.push(' ');
+					query.push_str(&a);
+				}
+				Index::Idx(idx) => {
+					query = format!("builtins.elemAt ({query}) {idx}");
+				}
+			}
 		}
 
 		let vid = self
@@ -454,6 +551,28 @@
 			.await
 			.with_context(|| format!("full path: {}", PathDisplay(&self.full_path)))
 	}
+	pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {
+		let id = self.value.expect("can't use build on not-value");
+		let vid = self
+			.session
+			.0
+			.lock()
+			.await
+			.execute_expression_raw(&format!(":b sess_field_{id}"), &mut NixHandler::default())
+			.await?;
+		ensure!(!vid.is_empty(), "build failed");
+		let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")
+		else {
+			panic!("unexpected build output: {vid:?}");
+		};
+		let outputs = vid
+			.split('\n')
+			.filter(|v| !v.is_empty())
+			.map(|v| v.split_once(" -> ").expect("unexpected build output"))
+			.map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))
+			.collect();
+		Ok(outputs)
+	}
 }
 impl Drop for Field {
 	fn drop(&mut self) {
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,8 +1,10 @@
+use std::os::unix::fs::symlink;
 use std::path::PathBuf;
 use std::{env::current_dir, time::Duration};
 
 use crate::command::MyCommand;
 use crate::host::Config;
+use crate::nix_path;
 use anyhow::{anyhow, Result};
 use clap::Parser;
 use itertools::Itertools;
@@ -11,15 +13,9 @@
 
 #[derive(Parser, Clone)]
 pub struct BuildSystems {
-	/// Do not continue on error
-	#[clap(long)]
-	fail_fast: bool,
 	/// Disable automatic rollback
 	#[clap(long)]
 	disable_rollback: bool,
-	/// Run builds as sudo
-	#[clap(long)]
-	privileged_build: bool,
 	#[clap(subcommand)]
 	subcommand: Subcommand,
 }
@@ -294,34 +290,11 @@
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
 		let action = Action::from(self.subcommand.clone());
-		let built = {
-			let dir = tempfile::tempdir()?;
-			dir.path().to_owned()
-		};
-
-		let mut nix_build = MyCommand::new("nix");
-		nix_build
-			.args([
-				"build",
-				"--impure",
-				"--json",
-				// "--show-trace",
-				"--no-link",
-			])
-			.comparg("--out-link", &built)
-			.arg(
-				config.configuration_attr_name(&format!(
-					"buildSystems.{}.{host}",
-					action.build_attr()
-				)),
-			)
-			.args(&config.nix_args);
-
-		if self.privileged_build {
-			nix_build = nix_build.sudo();
-		}
-
-		nix_build.run_nix().await.map_err(|e| {
+		let drv = config
+			.fleet_field
+			.select(nix_path!(.buildSystems.{action.build_attr()}.{&host}))
+			.await?;
+		let outputs = drv.build().await.map_err(|e| {
 			if action.build_attr() == "sdImage" {
 				info!("sd-image build failed");
 				info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");
@@ -329,7 +302,9 @@
 			}
 			e
 		})?;
-		let built = std::fs::canonicalize(built)?;
+		let out_output = outputs
+			.get("out")
+			.ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;
 
 		match action {
 			Action::Upload { action } => {
@@ -342,7 +317,7 @@
 							.arg("sign")
 							.comparg("--key-file", "/etc/nix/private-key")
 							.arg("-r")
-							.arg(&built);
+							.arg(out_output);
 						if let Err(e) = sign.sudo().run_nix().await {
 							warn!("Failed to sign store paths: {e}");
 						};
@@ -353,7 +328,7 @@
 						nix.arg("copy")
 							.arg("--substitute-on-destination")
 							.comparg("--to", format!("ssh-ng://{host}"))
-							.arg(&built);
+							.arg(out_output);
 						match nix.run_nix().await {
 							Ok(()) => break,
 							Err(e) if tries < 3 => {
@@ -366,53 +341,22 @@
 					}
 				}
 				if let Some(action) = action {
-					execute_upload(&self, &config, action, &host, built).await?
+					execute_upload(&self, &config, action, &host, out_output.clone()).await?
 				}
 			}
 			Action::Package(PackageAction::SdImage) => {
 				let mut out = current_dir()?;
 				out.push(format!("sd-image-{}", host));
 
-				info!("building sd image to {:?}", out);
-				let mut nix_build = MyCommand::new("nix");
-				nix_build
-					.args(["build", "--impure", "--no-link"])
-					.comparg("--out-link", &out)
-					.arg(config.configuration_attr_name(&format!("buildSystems.sdImage.{}", host,)))
-					.args(&config.nix_args);
-				if !self.fail_fast {
-					nix_build.arg("--keep-going");
-				}
-				if self.privileged_build {
-					nix_build = nix_build.sudo();
-				}
-
-				nix_build.run_nix().await?;
+				info!("linking sd image to {:?}", out);
+				symlink(out_output, out)?;
 			}
 			Action::Package(PackageAction::InstallationCd) => {
 				let mut out = current_dir()?;
 				out.push(format!("installation-cd-{}", host));
 
-				info!("building sd image to {:?}", out);
-				let mut nix_build = MyCommand::new("nix");
-				nix_build
-					.args(["build", "--impure", "--no-link"])
-					.comparg("--out-link", &out)
-					.arg(
-						config.configuration_attr_name(&format!(
-							"buildSystems.installationCd.{}",
-							host,
-						)),
-					)
-					.args(&config.nix_args);
-				if !self.fail_fast {
-					nix_build.arg("--keep-going");
-				}
-				if self.privileged_build {
-					nix_build = nix_build.sudo();
-				}
-
-				nix_build.run_nix().await?;
+				info!("linking iso image to {:?}", out);
+				symlink(out_output, out)?;
 			}
 		};
 		Ok(())
modifiedcmds/fleet/src/cmds/info.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/info.rs
+++ b/cmds/fleet/src/cmds/info.rs
@@ -1,6 +1,7 @@
 use std::collections::BTreeSet;
 
 use crate::host::Config;
+use crate::nix_path;
 use anyhow::{ensure, Result};
 use clap::Parser;
 
@@ -38,7 +39,7 @@
 					if !tagged.is_empty() {
 						let tags: Vec<String> = config
 							.fleet_field
-							.get_field_deep(["configuredSystems", &host.name, "config", "tags"])
+							.select(nix_path!(.configuredSystems.{&host.name}.config.tags))
 							.await?
 							.as_json()
 							.await?;
@@ -64,7 +65,7 @@
 				let host = config.system_config(&host).await?;
 				if external {
 					out.extend(
-						host.get_field_deep(["network", "externalIps"])
+						host.select(nix_path!(.network.externalIps))
 							.await?
 							.as_json::<Vec<String>>()
 							.await?,
@@ -72,7 +73,7 @@
 				}
 				if internal {
 					out.extend(
-						host.get_field_deep(["network", "internalIps"])
+						host.select(nix_path!(.network.internalIps))
 							.await?
 							.as_json::<Vec<String>>()
 							.await?,
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,6 +1,6 @@
 use crate::{
 	fleetdata::{FleetSecret, FleetSharedSecret},
-	host::Config,
+	host::Config, nix_path,
 };
 use anyhow::{bail, ensure, Context, Result};
 use chrono::Utc;
@@ -339,7 +339,7 @@
 					let mut data = config.shared_secret(name)?;
 					let expected_owners: Vec<String> = config
 						.config_field
-						.get_json_deep(["sharedSecrets", name, "expectedOwners"])
+						.get_json_deep(nix_path!(sharedSecrets.{name}.expectedOwners))
 						.await?;
 					if expected_owners.is_empty() {
 						warn!("secret was removed from fleet config: {name}, removing from data");
@@ -352,7 +352,7 @@
 					if set != expected_set {
 						let owner_dependent: bool = config
 							.config_field
-							.get_json_deep(["sharedSecrets", name, "ownerDependent"])
+							.get_json_deep(nix_path!(.sharedSecrets.{name}.ownerDependent))
 							.await?;
 						if !owner_dependent {
 							warn!("reencrypting secret '{name}' for new owner set");
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
before · cmds/fleet/src/command.rs
1use std::{2	borrow::Cow,3	collections::HashMap,4	ffi::OsStr,5	process::Stdio,6	sync::{Arc, Mutex},7	task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25		out.push_str(input);26		return;27	}28	out.push('\'');29	for (i, v) in input.split('\'').enumerate() {30		if i != 0 {31			out.push_str("'\"'\"'");32		}33		out.push_str(v);34	}35	out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38	os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42	command: String,43	args: Vec<String>,44	env: Vec<(String, String)>,45	ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48	pub fn new(cmd: impl AsRef<OsStr>) -> Self {49		assert!(!cmd.as_ref().is_empty());50		Self {51			command: ostoutf8(cmd),52			args: vec![],53			env: vec![],54			ssh_session: None,55		}56	}57	fn into_args(self) -> Vec<String> {58		let mut out = Vec::new();59		if !self.env.is_empty() {60			out.push("env".to_owned());61			for (k, v) in self.env {62				assert!(!k.contains('='));63				out.push(format!("{k}={v}"));64			}65		}66		out.push(self.command);67		out.extend(self.args);68		out69	}70	fn into_string(self) -> String {71		let mut out = String::new();72		if !self.env.is_empty() {73			out.push_str("env");74			for (k, v) in self.env {75				out.push(' ');76				assert!(!k.contains('='));77				escape_bash(&k, &mut out);78				out.push('=');79				escape_bash(&v, &mut out);80			}81		}82		if !out.is_empty() {83			out.push(' ');84		}85		escape_bash(&self.command, &mut out);86		for arg in self.args {87			out.push(' ');88			escape_bash(&arg, &mut out);89		}90		out91	}92	fn into_command(self) -> Command {93		let mut out = Command::new(self.command);94		out.args(self.args);95		for (k, v) in self.env {96			out.env(k, v);97		}98		out99	}100	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {101		Ok(if let Some(session) = self.ssh_session.clone() {102			let cmd = self.into_command();103			Either::Right(104				cmd.over_ssh(session)105					.map_err(|e| anyhow!("ssh error: {e}"))?,106			)107		} else {108			let cmd = self.into_command();109			Either::Left(cmd)110		})111	}112	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {113		let arg = arg.as_ref();114		self.args.push(ostoutf8(arg));115		self116	}117	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {118		let arg = arg.as_ref();119		let value = value.as_ref();120		let arg = ostoutf8(arg);121		let value = ostoutf8(value);122		self.arg(format!("{arg}={value}"));123		self124	}125	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {126		self.arg(arg);127		self.arg(value);128		self129	}130	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {131		for arg in args.into_iter() {132			let arg = arg.as_ref();133			self.args.push(ostoutf8(arg));134		}135		self136	}137	pub fn sudo(self) -> Self {138		if std::env::var_os("NO_SUDO").is_some() {139			let mut out = Self::new("su");140			out.arg("-c").arg(self.into_string());141			out142		} else {143			let mut out = Self::new("sudo");144			out.args(self.into_args());145			out146		}147	}148	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {149		let mut out = Self::new("ssh");150		out.arg(on).arg("--");151		out.arg(self.into_string());152		out153	}154	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {155		self.ssh_session = Some(session);156		self157	}158159	pub async fn run(self) -> Result<()> {160		let str = self.clone().into_string();161		let cmd = self.into_command();162		run_nix_inner(str, cmd, &mut PlainHandler).await?;163		Ok(())164	}165	pub async fn run_string(self) -> Result<String> {166		let str = self.clone().into_string();167		let cmd = self.into_command();168		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;169		Ok(v)170	}171172	pub async fn run_nix_string(self) -> Result<String> {173		let str = self.clone().into_string();174		let mut cmd = self.into_command();175		cmd.arg("--log-format").arg("internal-json");176		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await177	}178	pub async fn run_nix(self) -> Result<()> {179		let str = self.clone().into_string();180		let mut cmd = self.into_command();181		cmd.arg("--log-format").arg("internal-json");182		cmd.stdout(Stdio::inherit());183		run_nix_inner(str, cmd, &mut NixHandler::default()).await184	}185}186187struct EmptyAsyncRead;188impl AsyncRead for EmptyAsyncRead {189	fn poll_read(190		self: std::pin::Pin<&mut Self>,191		_cx: &mut std::task::Context<'_>,192		_buf: &mut tokio::io::ReadBuf<'_>,193	) -> Poll<std::io::Result<()>> {194		Poll::Pending195	}196}197198async fn run_nix_inner_stdout(199	str: String,200	cmd: Command,201	handler: &mut dyn Handler,202) -> Result<String> {203	Ok(run_nix_inner_raw(str, cmd, true, handler, None)204		.await?205		.expect("has out"))206}207async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {208	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;209	assert!(v.is_none());210	Ok(())211}212213pub trait Handler: Send {214	fn handle_line(&mut self, e: &str);215}216217pub struct ClonableHandler<H>(Arc<Mutex<H>>);218impl<H> Clone for ClonableHandler<H> {219	fn clone(&self) -> Self {220		Self(self.0.clone())221	}222}223impl<H> ClonableHandler<H> {224	pub fn new(inner: H) -> Self {225		Self(Arc::new(Mutex::new(inner)))226	}227}228impl<H: Handler> Handler for ClonableHandler<H> {229	fn handle_line(&mut self, e: &str) {230		self.0.lock().unwrap().handle_line(e)231	}232}233234struct PlainHandler;235impl Handler for PlainHandler {236	fn handle_line(&mut self, e: &str) {237		info!(target: "log", "{e}");238	}239}240241pub struct NoopHandler;242impl Handler for NoopHandler {243	fn handle_line(&mut self, _e: &str) {}244}245246#[derive(Default)]247pub struct NixHandler {248	spans: HashMap<u64, Span>,249}250fn process_message(m: &str) -> Cow<'_, str> {251	static OSC_CLEANER: Lazy<Regex> =252		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());253	OSC_CLEANER.replace_all(m, "")254}255impl Handler for NixHandler {256	fn handle_line(&mut self, e: &str) {257		if let Some(e) = e.strip_prefix("@nix ") {258			let log: NixLog = match serde_json::from_str(e) {259				Ok(l) => l,260				Err(err) => {261					warn!("failed to parse nix log line {:?}: {}", e, err);262					return;263				}264			};265			match log {266				NixLog::Msg { msg, raw_msg, .. } => {267					#[allow(clippy::nonminimal_bool)]268					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))269					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")270					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {271						if let Some(raw_msg) = raw_msg {272							if !msg.is_empty() {273								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())274							} else {275								info!(target: "nix", "{}", raw_msg.trim_end())276							}277						} else {278							info!(target: "nix", "{}", msg.trim_end())279						}280					}281				}282				NixLog::Start {283					ref fields,284					typ,285					id,286					..287				} if typ == 105 && !fields.is_empty() => {288					if let [LogField::String(drv), ..] = &fields[..] {289						let mut drv = drv.as_str();290						if let Some(pkg) = drv.strip_prefix("/nix/store/") {291							let mut it = pkg.splitn(2, '-');292							it.next();293							if let Some(pkg) = it.next() {294								drv = pkg;295							}296						}297						info!(target: "nix","building {}", drv);298						let span = info_span!("build", drv);299						span.pb_start();300						self.spans.insert(id, span);301					} else {302						warn!("bad build log: {:?}", log)303					}304				}305				NixLog::Start {306					ref fields,307					typ,308					id,309					..310				} if typ == 100 && fields.len() >= 3 => {311					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =312						&fields[..]313					{314						let mut drv = drv.as_str();315316						if let Some(pkg) = drv.strip_prefix("/nix/store/") {317							let mut it = pkg.splitn(2, '-');318							it.next();319							if let Some(pkg) = it.next() {320								drv = pkg;321							}322						}323						// info!(target: "nix","copying {} {} -> {}", drv, from, to);324						let span = info_span!("copy", from, to, drv);325						span.pb_start();326						self.spans.insert(id, span);327					} else {328						warn!("bad copy log: {:?}", log)329					}330				}331				NixLog::Start { text, typ, id, .. }332					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>333				{334					if !text.is_empty()335						&& text != "querying info about missing paths"336						&& text != "copying 0 paths"337					{338						let span = info_span!("job");339						span.pb_start();340						span.pb_set_message(&process_message(text.trim()));341						self.spans.insert(id, span);342						info!(target: "nix", "{}", text);343					}344				}345				NixLog::Start {346					text,347					level: 0,348					typ: 108,349					..350				} if text.is_empty() => {351					// Cache lookup? Coupled with copy log352				}353				NixLog::Start {354					text,355					level: 4,356					typ: 109,357					..358				} if text.starts_with("querying info about ") => {359					// Cache lookup360				}361				NixLog::Start {362					text,363					level: 4,364					typ: 101,365					..366				} if text.starts_with("downloading ") => {367					// NAR downloading, coupled with copy log368				}369				NixLog::Start {370					text,371					level: 1,372					typ: 111,373					..374				} if text.starts_with("waiting for a machine to build ") => {375					// Useless repeating notification about build376				}377				NixLog::Start {378					text,379					level: 3,380					typ: 111,381					..382				} if text.starts_with("resolved derivation: ") => {383					// CA resolved384				}385				NixLog::Start {386					text,387					level: 1,388					typ: 111,389					id,390					..391				} if text.starts_with("waiting for lock on ") => {392					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();393					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {394						drv = txt;395					}396					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {397						drv = txt;398					}399					if let Some(txt) = drv.split("', '").next() {400						drv = txt;401					}402					if let Some(pkg) = drv.strip_prefix("/nix/store/") {403						let mut it = pkg.splitn(2, '-');404						it.next();405						if let Some(pkg) = it.next() {406							drv = pkg;407						}408					}409					let span = info_span!("waiting on drv", drv);410					span.pb_start();411					self.spans.insert(id, span);412					// Concurrent build of the same message413				}414				NixLog::Stop { id, .. } => {415					self.spans.remove(&id);416				}417				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {418					if let Some(span) = self.spans.get(&id) {419						if let LogField::String(s) = &fields[0] {420							span.pb_set_message(&process_message(s.trim()));421						} else {422							warn!("bad fields: {fields:?}");423						}424					} else {425						warn!("unknown result id: {id} {typ} {fields:?}");426					}427					// dbg!(fields, id, typ);428				}429				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {430					if let Some(span) = self.spans.get(&id) {431						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =432							&fields[..4]433						{434							span.pb_set_length(*expected);435							span.pb_set_position(*done);436						} else {437							warn!("bad fields: {fields:?}");438						}439					} else {440						// warn!("unknown result id: {id} {typ} {fields:?}");441						// Unaccounted progress.442					}443					// dbg!(fields, id, typ);444				}445				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {446					// Set phase, expected447				}448				_ => warn!("unknown log: {:?}", log),449			};450		} else {451			let e = e.trim();452			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {453				return;454			}455			info!("{e}")456		}457	}458}459460async fn run_nix_inner_raw(461	str: String,462	mut cmd: Command,463	want_stdout: bool,464	err_handler: &mut dyn Handler,465	mut out_handler: Option<&mut dyn Handler>,466) -> Result<Option<String>> {467	cmd.stderr(Stdio::piped());468	cmd.stdout(Stdio::piped());469	let mut child = cmd.spawn()?;470	let mut stderr = child.stderr.take().unwrap();471	let stdout = child.stdout.take().unwrap();472	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());473	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));474	let mut ob = want_stdout475		.then(|| out.take().unwrap())476		.unwrap_or_else(|| Box::new(EmptyAsyncRead));477	let mut ol = (!want_stdout)478		.then(|| out.take().unwrap())479		.unwrap_or_else(|| Box::new(EmptyAsyncRead));480	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());481	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());482483	// while let Some(line) = read.next().await? {}484485	let mut out_buf = if want_stdout { Some(vec![]) } else { None };486	loop {487		select! {488			e = err.next() => {489				if let Some(e) = e {490					let e = e?;491					err_handler.handle_line(&e);492				}493			},494			o = ob.next() => {495				if let Some(o) = o {496					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);497				}498			},499			o = ol.next() => {500				if let Some(o) = o {501					let o = o?;502					if let Some(out) = out_handler.as_mut() {503						out.handle_line(&o)504					} else {505						err_handler.handle_line(&o)506					}507					// out_handler.handle_info(&o);508				}509			},510			code = child.wait() => {511				let code = code?;512				if !code.success() {513					anyhow::bail!("command '{str}' failed with status {}", code);514				}515				break;516			}517		}518	}519520	Ok(out_buf.map(String::from_utf8).transpose()?)521}522523pub trait ErrorRecorder: Send {524	/// Return true to discard message from logging525	fn push_message(&mut self, msg: &str) -> bool;526}527528#[derive(Debug)]529enum LogField {530	String(String),531	Num(u64),532}533534impl<'de> Deserialize<'de> for LogField {535	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>536	where537		D: serde::Deserializer<'de>,538	{539		struct StringOrNum;540		impl<'de> Visitor<'de> for StringOrNum {541			type Value = LogField;542543			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {544				write!(f, "string or unsigned")545			}546547			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>548			where549				E: serde::de::Error,550			{551				Ok(LogField::String(v.to_owned()))552			}553554			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>555			where556				E: serde::de::Error,557			{558				Ok(LogField::Num(v))559			}560		}561562		deserializer.deserialize_any(StringOrNum)563	}564}565566#[derive(Deserialize, Debug)]567#[serde(rename_all = "camelCase", tag = "action")]568#[allow(dead_code)]569enum NixLog {570	Msg {571		level: u32,572		msg: String,573		raw_msg: Option<String>,574	},575	Start {576		id: u64,577		level: u32,578		#[serde(default)]579		fields: Vec<LogField>,580		text: String,581		#[serde(rename = "type")]582		typ: u32,583	},584	Stop {585		id: u64,586	},587	Result {588		id: u64,589		#[serde(rename = "type")]590		typ: u32,591		#[serde(default)]592		fields: Vec<LogField>,593	},594}
after · cmds/fleet/src/command.rs
1use std::{2	collections::HashMap,3	ffi::OsStr,4	process::Stdio,5	sync::{Arc, Mutex},6	task::Poll,7};89use anyhow::{anyhow, Result};10use futures::StreamExt;11use itertools::Either;12use once_cell::sync::Lazy;13use openssh::{OverSsh, Session};14use regex::Regex;15use serde::{de::Visitor, Deserialize};16use tokio::{io::AsyncRead, process::Command, select};17use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};18use tracing::{info, info_span, warn, Span};19use tracing_indicatif::span_ext::IndicatifSpanExt;2021fn escape_bash(input: &str, out: &mut String) {22	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";23	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {24		out.push_str(input);25		return;26	}27	out.push('\'');28	for (i, v) in input.split('\'').enumerate() {29		if i != 0 {30			out.push_str("'\"'\"'");31		}32		out.push_str(v);33	}34	out.push('\'');35}36fn ostoutf8(os: impl AsRef<OsStr>) -> String {37	os.as_ref().to_str().expect("non-utf8 data").to_owned()38}39#[derive(Clone)]40pub struct MyCommand {41	command: String,42	args: Vec<String>,43	env: Vec<(String, String)>,44	ssh_session: Option<Arc<Session>>,45}46impl MyCommand {47	pub fn new(cmd: impl AsRef<OsStr>) -> Self {48		assert!(!cmd.as_ref().is_empty());49		Self {50			command: ostoutf8(cmd),51			args: vec![],52			env: vec![],53			ssh_session: None,54		}55	}56	fn into_args(self) -> Vec<String> {57		let mut out = Vec::new();58		if !self.env.is_empty() {59			out.push("env".to_owned());60			for (k, v) in self.env {61				assert!(!k.contains('='));62				out.push(format!("{k}={v}"));63			}64		}65		out.push(self.command);66		out.extend(self.args);67		out68	}69	fn into_string(self) -> String {70		let mut out = String::new();71		if !self.env.is_empty() {72			out.push_str("env");73			for (k, v) in self.env {74				out.push(' ');75				assert!(!k.contains('='));76				escape_bash(&k, &mut out);77				out.push('=');78				escape_bash(&v, &mut out);79			}80		}81		if !out.is_empty() {82			out.push(' ');83		}84		escape_bash(&self.command, &mut out);85		for arg in self.args {86			out.push(' ');87			escape_bash(&arg, &mut out);88		}89		out90	}91	fn into_command(self) -> Command {92		let mut out = Command::new(self.command);93		out.args(self.args);94		for (k, v) in self.env {95			out.env(k, v);96		}97		out98	}99	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {100		Ok(if let Some(session) = self.ssh_session.clone() {101			let cmd = self.into_command();102			Either::Right(103				cmd.over_ssh(session)104					.map_err(|e| anyhow!("ssh error: {e}"))?,105			)106		} else {107			let cmd = self.into_command();108			Either::Left(cmd)109		})110	}111	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {112		let arg = arg.as_ref();113		self.args.push(ostoutf8(arg));114		self115	}116	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {117		let arg = arg.as_ref();118		let value = value.as_ref();119		let arg = ostoutf8(arg);120		let value = ostoutf8(value);121		self.arg(format!("{arg}={value}"));122		self123	}124	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {125		self.arg(arg);126		self.arg(value);127		self128	}129	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {130		for arg in args.into_iter() {131			let arg = arg.as_ref();132			self.args.push(ostoutf8(arg));133		}134		self135	}136	pub fn sudo(self) -> Self {137		if std::env::var_os("NO_SUDO").is_some() {138			let mut out = Self::new("su");139			out.arg("-c").arg(self.into_string());140			out141		} else {142			let mut out = Self::new("sudo");143			out.args(self.into_args());144			out145		}146	}147	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {148		let mut out = Self::new("ssh");149		out.arg(on).arg("--");150		out.arg(self.into_string());151		out152	}153	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {154		self.ssh_session = Some(session);155		self156	}157158	pub async fn run(self) -> Result<()> {159		let str = self.clone().into_string();160		let cmd = self.into_command();161		run_nix_inner(str, cmd, &mut PlainHandler).await?;162		Ok(())163	}164	pub async fn run_string(self) -> Result<String> {165		let str = self.clone().into_string();166		let cmd = self.into_command();167		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;168		Ok(v)169	}170171	pub async fn run_nix_string(self) -> Result<String> {172		let str = self.clone().into_string();173		let mut cmd = self.into_command();174		cmd.arg("--log-format").arg("internal-json");175		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await176	}177	pub async fn run_nix(self) -> Result<()> {178		let str = self.clone().into_string();179		let mut cmd = self.into_command();180		cmd.arg("--log-format").arg("internal-json");181		cmd.stdout(Stdio::inherit());182		run_nix_inner(str, cmd, &mut NixHandler::default()).await183	}184}185186struct EmptyAsyncRead;187impl AsyncRead for EmptyAsyncRead {188	fn poll_read(189		self: std::pin::Pin<&mut Self>,190		_cx: &mut std::task::Context<'_>,191		_buf: &mut tokio::io::ReadBuf<'_>,192	) -> Poll<std::io::Result<()>> {193		Poll::Pending194	}195}196197async fn run_nix_inner_stdout(198	str: String,199	cmd: Command,200	handler: &mut dyn Handler,201) -> Result<String> {202	Ok(run_nix_inner_raw(str, cmd, true, handler, None)203		.await?204		.expect("has out"))205}206async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {207	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;208	assert!(v.is_none());209	Ok(())210}211212pub trait Handler: Send {213	fn handle_line(&mut self, e: &str);214}215216pub struct ClonableHandler<H>(Arc<Mutex<H>>);217impl<H> Clone for ClonableHandler<H> {218	fn clone(&self) -> Self {219		Self(self.0.clone())220	}221}222impl<H> ClonableHandler<H> {223	pub fn new(inner: H) -> Self {224		Self(Arc::new(Mutex::new(inner)))225	}226}227impl<H: Handler> Handler for ClonableHandler<H> {228	fn handle_line(&mut self, e: &str) {229		self.0.lock().unwrap().handle_line(e)230	}231}232233struct PlainHandler;234impl Handler for PlainHandler {235	fn handle_line(&mut self, e: &str) {236		info!(target: "log", "{e}");237	}238}239240pub struct NoopHandler;241impl Handler for NoopHandler {242	fn handle_line(&mut self, _e: &str) {}243}244245#[derive(Default)]246pub struct NixHandler {247	spans: HashMap<u64, Span>,248}249fn process_message(m: &str) -> String {250	static OSC_CLEANER: Lazy<Regex> =251		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());252	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());253	let m = OSC_CLEANER.replace_all(m, "");254	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,255	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.256	DETABBER.replace_all(m.as_ref(), "  ").to_string()257}258impl Handler for NixHandler {259	fn handle_line(&mut self, e: &str) {260		if let Some(e) = e.strip_prefix("@nix ") {261			let log: NixLog = match serde_json::from_str(e) {262				Ok(l) => l,263				Err(err) => {264					warn!("failed to parse nix log line {:?}: {}", e, err);265					return;266				}267			};268			match log {269				NixLog::Msg { msg, raw_msg, .. } => {270					#[allow(clippy::nonminimal_bool)]271					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))272					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")273					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {274						if let Some(raw_msg) = raw_msg {275							if !msg.is_empty() {276								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())277							} else {278								info!(target: "nix", "{}", raw_msg.trim_end())279							}280						} else {281							info!(target: "nix", "{}", msg.trim_end())282						}283					}284				}285				NixLog::Start {286					ref fields,287					typ,288					id,289					..290				} if typ == 105 && !fields.is_empty() => {291					if let [LogField::String(drv), ..] = &fields[..] {292						let mut drv = drv.as_str();293						if let Some(pkg) = drv.strip_prefix("/nix/store/") {294							let mut it = pkg.splitn(2, '-');295							it.next();296							if let Some(pkg) = it.next() {297								drv = pkg;298							}299						}300						info!(target: "nix","building {}", drv);301						let span = info_span!("build", drv);302						span.pb_start();303						self.spans.insert(id, span);304					} else {305						warn!("bad build log: {:?}", log)306					}307				}308				NixLog::Start {309					ref fields,310					typ,311					id,312					..313				} if typ == 100 && fields.len() >= 3 => {314					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =315						&fields[..]316					{317						let mut drv = drv.as_str();318319						if let Some(pkg) = drv.strip_prefix("/nix/store/") {320							let mut it = pkg.splitn(2, '-');321							it.next();322							if let Some(pkg) = it.next() {323								drv = pkg;324							}325						}326						// info!(target: "nix","copying {} {} -> {}", drv, from, to);327						let span = info_span!("copy", from, to, drv);328						span.pb_start();329						self.spans.insert(id, span);330					} else {331						warn!("bad copy log: {:?}", log)332					}333				}334				NixLog::Start { text, typ, id, .. }335					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>336				{337					if !text.is_empty()338						&& text != "querying info about missing paths"339						&& text != "copying 0 paths"340					{341						let span = info_span!("job");342						span.pb_start();343						span.pb_set_message(&process_message(text.trim()));344						self.spans.insert(id, span);345						info!(target: "nix", "{}", text);346					}347				}348				NixLog::Start {349					text,350					level: 0,351					typ: 108,352					..353				} if text.is_empty() => {354					// Cache lookup? Coupled with copy log355				}356				NixLog::Start {357					text,358					level: 4,359					typ: 109,360					..361				} if text.starts_with("querying info about ") => {362					// Cache lookup363				}364				NixLog::Start {365					text,366					level: 4,367					typ: 101,368					..369				} if text.starts_with("downloading ") => {370					// NAR downloading, coupled with copy log371				}372				NixLog::Start {373					text,374					level: 1,375					typ: 111,376					..377				} if text.starts_with("waiting for a machine to build ") => {378					// Useless repeating notification about build379				}380				NixLog::Start {381					text,382					level: 3,383					typ: 111,384					..385				} if text.starts_with("resolved derivation: ") => {386					// CA resolved387				}388				NixLog::Start {389					text,390					level: 1,391					typ: 111,392					id,393					..394				} if text.starts_with("waiting for lock on ") => {395					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();396					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {397						drv = txt;398					}399					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {400						drv = txt;401					}402					if let Some(txt) = drv.split("', '").next() {403						drv = txt;404					}405					if let Some(pkg) = drv.strip_prefix("/nix/store/") {406						let mut it = pkg.splitn(2, '-');407						it.next();408						if let Some(pkg) = it.next() {409							drv = pkg;410						}411					}412					let span = info_span!("waiting on drv", drv);413					span.pb_start();414					self.spans.insert(id, span);415					// Concurrent build of the same message416				}417				NixLog::Stop { id, .. } => {418					self.spans.remove(&id);419				}420				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {421					if let Some(span) = self.spans.get(&id) {422						if let LogField::String(s) = &fields[0] {423							span.pb_set_message(&process_message(s.trim()));424						} else {425							warn!("bad fields: {fields:?}");426						}427					} else {428						warn!("unknown result id: {id} {typ} {fields:?}");429					}430					// dbg!(fields, id, typ);431				}432				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {433					if let Some(span) = self.spans.get(&id) {434						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =435							&fields[..4]436						{437							span.pb_set_length(*expected);438							span.pb_set_position(*done);439						} else {440							warn!("bad fields: {fields:?}");441						}442					} else {443						// warn!("unknown result id: {id} {typ} {fields:?}");444						// Unaccounted progress.445					}446					// dbg!(fields, id, typ);447				}448				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {449					// Set phase, expected450				}451				_ => warn!("unknown log: {:?}", log),452			};453		} else {454			let e = e.trim();455			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {456				return;457			}458			info!("{e}")459		}460	}461}462463async fn run_nix_inner_raw(464	str: String,465	mut cmd: Command,466	want_stdout: bool,467	err_handler: &mut dyn Handler,468	mut out_handler: Option<&mut dyn Handler>,469) -> Result<Option<String>> {470	cmd.stderr(Stdio::piped());471	cmd.stdout(Stdio::piped());472	let mut child = cmd.spawn()?;473	let mut stderr = child.stderr.take().unwrap();474	let stdout = child.stdout.take().unwrap();475	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());476	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));477	let mut ob = want_stdout478		.then(|| out.take().unwrap())479		.unwrap_or_else(|| Box::new(EmptyAsyncRead));480	let mut ol = (!want_stdout)481		.then(|| out.take().unwrap())482		.unwrap_or_else(|| Box::new(EmptyAsyncRead));483	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());484	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());485486	// while let Some(line) = read.next().await? {}487488	let mut out_buf = if want_stdout { Some(vec![]) } else { None };489	loop {490		select! {491			e = err.next() => {492				if let Some(e) = e {493					let e = e?;494					err_handler.handle_line(&e);495				}496			},497			o = ob.next() => {498				if let Some(o) = o {499					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);500				}501			},502			o = ol.next() => {503				if let Some(o) = o {504					let o = o?;505					if let Some(out) = out_handler.as_mut() {506						out.handle_line(&o)507					} else {508						err_handler.handle_line(&o)509					}510					// out_handler.handle_info(&o);511				}512			},513			code = child.wait() => {514				let code = code?;515				if !code.success() {516					anyhow::bail!("command '{str}' failed with status {}", code);517				}518				break;519			}520		}521	}522523	Ok(out_buf.map(String::from_utf8).transpose()?)524}525526pub trait ErrorRecorder: Send {527	/// Return true to discard message from logging528	fn push_message(&mut self, msg: &str) -> bool;529}530531#[derive(Debug)]532enum LogField {533	String(String),534	Num(u64),535}536537impl<'de> Deserialize<'de> for LogField {538	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>539	where540		D: serde::Deserializer<'de>,541	{542		struct StringOrNum;543		impl<'de> Visitor<'de> for StringOrNum {544			type Value = LogField;545546			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {547				write!(f, "string or unsigned")548			}549550			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>551			where552				E: serde::de::Error,553			{554				Ok(LogField::String(v.to_owned()))555			}556557			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>558			where559				E: serde::de::Error,560			{561				Ok(LogField::Num(v))562			}563		}564565		deserializer.deserialize_any(StringOrNum)566	}567}568569#[derive(Deserialize, Debug)]570#[serde(rename_all = "camelCase", tag = "action")]571#[allow(dead_code)]572enum NixLog {573	Msg {574		level: u32,575		msg: String,576		raw_msg: Option<String>,577	},578	Start {579		id: u64,580		level: u32,581		#[serde(default)]582		fields: Vec<LogField>,583		text: String,584		#[serde(rename = "type")]585		typ: u32,586	},587	Stop {588		id: u64,589	},590	Result {591		id: u64,592		#[serde(rename = "type")]593		typ: u32,594		#[serde(default)]595		fields: Vec<LogField>,596	},597}
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -13,9 +13,10 @@
 use tempfile::NamedTempFile;
 
 use crate::{
-	better_nix_eval::{Field, NixSessionPool},
+	better_nix_eval::{Field, Index, NixSessionPool},
 	command::MyCommand,
 	fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
+	nix_path,
 };
 
 pub struct FleetConfigInternals {
@@ -24,9 +25,9 @@
 	pub opts: FleetOpts,
 	pub data: Mutex<FleetData>,
 	pub nix_args: Vec<OsString>,
-	// fleetConfigurations.<name>
+	/// fleetConfigurations.<name>.<localSystem>
 	pub fleet_field: Field,
-	// fleet_config.configUnchecked
+	/// fleet_config.configUnchecked
 	pub config_field: Field,
 }
 
@@ -91,22 +92,12 @@
 			command = command.ssh(host);
 		}
 		command.run_string().await
-	}
-
-	pub fn configuration_attr_name(&self, name: &str) -> OsString {
-		let mut str = self.directory.as_os_str().to_owned();
-		str.push("#");
-		str.push(&format!(
-			"fleetConfigurations.default.{}.{}",
-			self.local_system, name
-		));
-		str
 	}
 
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
 		let names = self
 			.fleet_field
-			.get_field_deep(["configuredHosts"])
+			.select(nix_path!(.configuredHosts))
 			.await?
 			.list_fields()
 			.await?;
@@ -118,7 +109,7 @@
 	}
 	pub async fn system_config(&self, host: &str) -> Result<Field> {
 		self.fleet_field
-			.get_field_deep(["configuredSystems", host, "config"])
+			.select(nix_path!(.configuredSystems.{host}.config))
 			.await
 	}
 
@@ -131,7 +122,7 @@
 	/// Shared secrets configured in fleet.nix or in flake
 	pub async fn list_configured_shared(&self) -> Result<Vec<String>> {
 		self.config_field
-			.get_field("sharedSecrets")
+			.select(nix_path!(.sharedSecrets))
 			.await?
 			.list_fields()
 			.await
@@ -221,7 +212,7 @@
 	}
 	pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {
 		self.config_field
-			.get_field_deep(["sharedSecrets", secret, "expectedOwners"])
+			.select(nix_path!(.sharedSecrets.{secret}.expectedOwners))
 			.await?
 			.as_json()
 			.await
@@ -279,7 +270,9 @@
 
 		if self.local_system == "detect" {
 			let builtins_field = Field::field(root_field.clone(), "builtins").await?;
-			let system = builtins_field.get_field("currentSystem").await?;
+			let system = builtins_field
+				.select(nix_path!(.currentSystem))
+				.await?;
 			self.local_system = system.as_json().await?;
 		}
 		let local_system = self.local_system.clone();
@@ -287,9 +280,11 @@
 		let fleet_root = Field::field(root_field, "fleetConfigurations").await?;
 
 		let fleet_field = fleet_root
-			.get_field_deep(["default", &local_system])
+			.select(nix_path!(.default.{&local_system}))
+			.await?;
+		let config_field = fleet_field
+			.select(nix_path!(.configUnchecked))
 			.await?;
-		let config_field = fleet_field.get_field("configUnchecked").await?;
 
 		let mut fleet_data_path = directory.clone();
 		fleet_data_path.push("fleet.nix");
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -1,3 +1,4 @@
+#![recursion_limit = "512"]
 #![feature(try_blocks)]
 
 pub(crate) mod cmds;
modifiedflake.nixdiffbeforeafterboth
--- a/flake.nix
+++ b/flake.nix
@@ -19,6 +19,7 @@
       rustPlatform = pkgs.makeRustPlatform { cargo = rust; rustc = rust; };
     in
     {
+		packages = (import ./pkgs) pkgs pkgs;
       devShell = (pkgs.mkShell.override { stdenv = llvmPkgs.stdenv; }) {
         nativeBuildInputs = with pkgs; [
           rust