git.delta.rocks / jrsonnet / refs/commits / 97d9be65842c

difftreelog

feat libssh preparation

Yaroslav Bolyukin2023-11-17parent: #3f73827.patch.diff
in: trunk

7 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -651,6 +651,27 @@
 ]
 
 [[package]]
+name = "dirs"
+version = "5.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225"
+dependencies = [
+ "dirs-sys",
+]
+
+[[package]]
+name = "dirs-sys"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c"
+dependencies = [
+ "libc",
+ "option-ext",
+ "redox_users",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
 name = "displaydoc"
 version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -731,13 +752,16 @@
  "clap",
  "futures",
  "hostname",
+ "human-repr",
  "indicatif",
  "itertools",
  "nixlike",
  "once_cell",
+ "openssh",
  "owo-colors",
  "peg",
  "r2d2",
+ "regex",
  "serde",
  "serde_json",
  "shlex",
@@ -1019,6 +1043,12 @@
 ]
 
 [[package]]
+name = "human-repr"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f58b778a5761513caf593693f8951c97a5b610841e754788400f32102eefdff1"
+
+[[package]]
 name = "humantime"
 version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1257,6 +1287,17 @@
 ]
 
 [[package]]
+name = "libredox"
+version = "0.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8"
+dependencies = [
+ "bitflags 2.4.1",
+ "libc",
+ "redox_syscall 0.4.1",
+]
+
+[[package]]
 name = "linked-hash-map"
 version = "0.5.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1480,6 +1521,28 @@
 checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
 
 [[package]]
+name = "openssh"
+version = "0.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3dfe68c42d6ee6bd9de175b7a5d9bb86aa99d4e2fa7cf2f2a44e97f60b6d2759"
+dependencies = [
+ "dirs",
+ "libc",
+ "once_cell",
+ "shell-escape",
+ "tempfile",
+ "thiserror",
+ "tokio",
+ "tokio-pipe",
+]
+
+[[package]]
+name = "option-ext"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
+
+[[package]]
 name = "overload"
 version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1804,15 +1867,26 @@
 ]
 
 [[package]]
+name = "redox_users"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4"
+dependencies = [
+ "getrandom 0.2.10",
+ "libredox",
+ "thiserror",
+]
+
+[[package]]
 name = "regex"
-version = "1.9.5"
+version = "1.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47"
+checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-automata 0.3.8",
- "regex-syntax 0.7.5",
+ "regex-automata 0.4.3",
+ "regex-syntax 0.8.2",
 ]
 
 [[package]]
@@ -1826,13 +1900,13 @@
 
 [[package]]
 name = "regex-automata"
-version = "0.3.8"
+version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795"
+checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-syntax 0.7.5",
+ "regex-syntax 0.8.2",
 ]
 
 [[package]]
@@ -1843,9 +1917,9 @@
 
 [[package]]
 name = "regex-syntax"
-version = "0.7.5"
+version = "0.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da"
+checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
 [[package]]
 name = "rnix"
@@ -2099,6 +2173,12 @@
 ]
 
 [[package]]
+name = "shell-escape"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f"
+
+[[package]]
 name = "shlex"
 version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2379,6 +2459,16 @@
 ]
 
 [[package]]
+name = "tokio-pipe"
+version = "0.2.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784"
+dependencies = [
+ "libc",
+ "tokio",
+]
+
+[[package]]
 name = "tokio-util"
 version = "0.7.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -11,7 +11,7 @@
 serde_json = "1.0"
 time = { version = "0.3.30", features = ["serde"] }
 tempfile = "3.8"
-once_cell = "1.18"
+once_cell = "1.18.0"
 hostname = "0.3.1"
 age-core = "0.9.0"
 peg = "0.8.2"
@@ -41,3 +41,6 @@
 r2d2 = "0.8.10"
 abort-on-drop = "0.2.2"
 unindent = "0.2.3"
+regex = "1.10.2"
+openssh = "0.10.1"
+human-repr = "1.1.0"
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -336,15 +336,14 @@
 				if !config.is_local(&host) {
 					info!("uploading system closure");
 					{
-						let mut sign = MyCommand::new("sudo");
+						let mut sign = MyCommand::new("nix");
 						// Private key for host machine is registered in nix-sign.nix
-						sign.arg("nix")
-							.arg("store")
+						sign.arg("store")
 							.arg("sign")
 							.comparg("--key-file", "/etc/nix/private-key")
 							.arg("-r")
 							.arg(&built);
-						if let Err(e) = sign.run_nix().await {
+						if let Err(e) = sign.sudo().run_nix().await {
 							warn!("Failed to sign store paths: {e}");
 						};
 					}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
before · cmds/fleet/src/command.rs
1use std::{2	collections::HashMap,3	ffi::OsStr,4	process::Stdio,5	sync::{Arc, Mutex},6	task::Poll,7};89use anyhow::Result;10use futures::StreamExt;11use serde::{de::Visitor, Deserialize};12use tokio::{io::AsyncRead, process::Command, select};13use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};14use tracing::{info, info_span, warn, Span};15use tracing_indicatif::span_ext::IndicatifSpanExt;1617fn escape_bash(input: &str, out: &mut String) {18	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";19	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {20		out.push_str(input);21		return;22	}23	out.push('\'');24	for (i, v) in input.split('\'').enumerate() {25		if i != 0 {26			out.push_str("'\"'\"'");27		}28		out.push_str(v);29	}30	out.push('\'');31}32fn ostoutf8(os: impl AsRef<OsStr>) -> String {33	os.as_ref().to_str().expect("non-utf8 data").to_owned()34}35#[derive(Clone)]36pub struct MyCommand {37	command: String,38	args: Vec<String>,39	env: Vec<(String, String)>,40}41impl MyCommand {42	pub fn new(cmd: impl AsRef<OsStr>) -> Self {43		assert!(!cmd.as_ref().is_empty());44		Self {45			command: ostoutf8(cmd),46			args: vec![],47			env: vec![],48		}49	}50	fn into_args(self) -> Vec<String> {51		let mut out = Vec::new();52		if !self.env.is_empty() {53			out.push("env".to_owned());54			for (k, v) in self.env {55				assert!(!k.contains('='));56				out.push(format!("{k}={v}"));57			}58		}59		out.push(self.command);60		out.extend(self.args);61		out62	}63	fn into_string(self) -> String {64		let mut out = String::new();65		if !self.env.is_empty() {66			out.push_str("env");67			for (k, v) in self.env {68				out.push(' ');69				assert!(!k.contains('='));70				escape_bash(&k, &mut out);71				out.push('=');72				escape_bash(&v, &mut out);73			}74		}75		if !out.is_empty() {76			out.push(' ');77		}78		escape_bash(&self.command, &mut out);79		for arg in self.args {80			out.push(' ');81			escape_bash(&arg, &mut out);82		}83		out84	}85	fn into_command(self) -> Command {86		let mut out = Command::new(self.command);87		out.args(self.args);88		for (k, v) in self.env {89			out.env(k, v);90		}91		out92	}93	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {94		let arg = arg.as_ref();95		self.args.push(ostoutf8(arg));96		self97	}98	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {99		let arg = arg.as_ref();100		let value = value.as_ref();101		let arg = ostoutf8(arg);102		let value = ostoutf8(value);103		self.arg(format!("{arg}={value}"));104		self105	}106	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {107		self.arg(arg);108		self.arg(value);109		self110	}111	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {112		for arg in args.into_iter() {113			let arg = arg.as_ref();114			self.args.push(ostoutf8(arg));115		}116		self117	}118	pub fn sudo(self) -> Self {119		let mut out = Self::new("sudo");120		out.args(self.into_args());121		out122	}123	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {124		let mut out = Self::new("ssh");125		out.arg(on).arg("--");126		out.arg(self.into_string());127		out128	}129130	pub async fn run(self) -> Result<()> {131		let str = self.clone().into_string();132		let cmd = self.into_command();133		run_nix_inner(str, cmd, &mut PlainHandler).await?;134		Ok(())135	}136	pub async fn run_string(self) -> Result<String> {137		let str = self.clone().into_string();138		let cmd = self.into_command();139		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;140		Ok(v)141	}142143	pub async fn run_nix_string(self) -> Result<String> {144		let str = self.clone().into_string();145		let mut cmd = self.into_command();146		cmd.arg("--log-format").arg("internal-json");147		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await148	}149	pub async fn run_nix(self) -> Result<()> {150		let str = self.clone().into_string();151		let mut cmd = self.into_command();152		cmd.arg("--log-format").arg("internal-json");153		cmd.stdout(Stdio::inherit());154		run_nix_inner(str, cmd, &mut NixHandler::default()).await155	}156}157158struct EmptyAsyncRead;159impl AsyncRead for EmptyAsyncRead {160	fn poll_read(161		self: std::pin::Pin<&mut Self>,162		_cx: &mut std::task::Context<'_>,163		_buf: &mut tokio::io::ReadBuf<'_>,164	) -> Poll<std::io::Result<()>> {165		Poll::Pending166	}167}168169async fn run_nix_inner_stdout(170	str: String,171	cmd: Command,172	handler: &mut dyn Handler,173) -> Result<String> {174	Ok(run_nix_inner_raw(str, cmd, true, handler, None)175		.await?176		.expect("has out"))177}178async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {179	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;180	assert!(v.is_none());181	Ok(())182}183184pub trait Handler: Send {185	fn handle_line(&mut self, e: &str);186}187188pub struct ClonableHandler<H>(Arc<Mutex<H>>);189impl<H> Clone for ClonableHandler<H> {190	fn clone(&self) -> Self {191		Self(self.0.clone())192	}193}194impl<H> ClonableHandler<H> {195	pub fn new(inner: H) -> Self {196		Self(Arc::new(Mutex::new(inner)))197	}198}199impl<H: Handler> Handler for ClonableHandler<H> {200	fn handle_line(&mut self, e: &str) {201		self.0.lock().unwrap().handle_line(e)202	}203}204205struct PlainHandler;206impl Handler for PlainHandler {207	fn handle_line(&mut self, e: &str) {208		info!(target: "log", "{e}");209	}210}211212pub struct NoopHandler;213impl Handler for NoopHandler {214	fn handle_line(&mut self, _e: &str) {}215}216217#[derive(Default)]218pub struct NixHandler {219	spans: HashMap<u64, Span>,220}221impl Handler for NixHandler {222	fn handle_line(&mut self, e: &str) {223		if let Some(e) = e.strip_prefix("@nix ") {224			let log: NixLog = match serde_json::from_str(e) {225				Ok(l) => l,226				Err(err) => {227					warn!("failed to parse nix log line {:?}: {}", e, err);228					return;229				}230			};231			match log {232				NixLog::Msg { msg, raw_msg, .. } => {233					#[allow(clippy::nonminimal_bool)]234					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))235					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")236					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {237						if let Some(raw_msg) = raw_msg {238							if !msg.is_empty() {239								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())240							} else {241								info!(target: "nix", "{}", raw_msg.trim_end())242							}243						} else {244							info!(target: "nix", "{}", msg.trim_end())245						}246					}247				}248				NixLog::Start {249					ref fields,250					typ,251					id,252					..253				} if typ == 105 && !fields.is_empty() => {254					if let [LogField::String(drv), ..] = &fields[..] {255						let mut drv = drv.as_str();256						if let Some(pkg) = drv.strip_prefix("/nix/store/") {257							let mut it = pkg.splitn(2, '-');258							it.next();259							if let Some(pkg) = it.next() {260								drv = pkg;261							}262						}263						info!(target: "nix","building {}", drv);264						let span = info_span!("build", drv);265						span.pb_start();266						self.spans.insert(id, span);267					} else {268						warn!("bad build log: {:?}", log)269					}270				}271				NixLog::Start {272					ref fields,273					typ,274					id,275					..276				} if typ == 100 && fields.len() >= 3 => {277					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =278						&fields[..]279					{280						let mut drv = drv.as_str();281282						if let Some(pkg) = drv.strip_prefix("/nix/store/") {283							let mut it = pkg.splitn(2, '-');284							it.next();285							if let Some(pkg) = it.next() {286								drv = pkg;287							}288						}289						// info!(target: "nix","copying {} {} -> {}", drv, from, to);290						let span = info_span!("copy", from, to, drv);291						span.pb_start();292						self.spans.insert(id, span);293					} else {294						warn!("bad copy log: {:?}", log)295					}296				}297				NixLog::Start { text, typ, id, .. }298					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>299				{300					if !text.is_empty()301						&& text != "querying info about missing paths"302						&& text != "copying 0 paths"303					{304						let span = info_span!("job");305						span.pb_start();306						span.pb_set_message(text.trim());307						self.spans.insert(id, span);308						info!(target: "nix", "{}", text);309					}310				}311				NixLog::Start {312					text,313					level: 0,314					typ: 108,315					..316				} if text.is_empty() => {317					// Cache lookup? Coupled with copy log318				}319				NixLog::Start {320					text,321					level: 4,322					typ: 109,323					..324				} if text.starts_with("querying info about ") => {325					// Cache lookup326				}327				NixLog::Start {328					text,329					level: 4,330					typ: 101,331					..332				} if text.starts_with("downloading ") => {333					// NAR downloading, coupled with copy log334				}335				NixLog::Start {336					text,337					level: 1,338					typ: 111,339					..340				} if text.starts_with("waiting for a machine to build ") => {341					// Useless repeating notification about build342				}343				NixLog::Start {344					text,345					level: 3,346					typ: 111,347					..348				} if text.starts_with("resolved derivation: ") => {349					// CA resolved350				}351				NixLog::Start {352					text,353					level: 1,354					typ: 111,355					id,356					..357				} if text.starts_with("waiting for lock on ") => {358					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();359					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {360						drv = txt;361					}362					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {363						drv = txt;364					}365					if let Some(txt) = drv.split("', '").next() {366						drv = txt;367					}368					if let Some(pkg) = drv.strip_prefix("/nix/store/") {369						let mut it = pkg.splitn(2, '-');370						it.next();371						if let Some(pkg) = it.next() {372							drv = pkg;373						}374					}375					let span = info_span!("waiting on drv", drv);376					span.pb_start();377					self.spans.insert(id, span);378					// Concurrent build of the same message379				}380				NixLog::Stop { id, .. } => {381					self.spans.remove(&id);382				}383				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {384					if let Some(span) = self.spans.get(&id) {385						if let LogField::String(s) = &fields[0] {386							span.pb_set_message(s.trim());387						} else {388							warn!("bad fields: {fields:?}");389						}390					} else {391						warn!("unknown result id: {id} {typ} {fields:?}");392					}393					// dbg!(fields, id, typ);394				}395				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {396					if let Some(span) = self.spans.get(&id) {397						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =398							&fields[..4]399						{400							span.pb_set_length(*expected);401							span.pb_set_position(*done);402						} else {403							warn!("bad fields: {fields:?}");404						}405					} else {406						// warn!("unknown result id: {id} {typ} {fields:?}");407						// Unaccounted progress.408					}409					// dbg!(fields, id, typ);410				}411				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {412					// Set phase, expected413				}414				_ => warn!("unknown log: {:?}", log),415			};416		} else {417			let e = e.trim();418			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {419				return;420			}421			info!("{e}")422		}423	}424}425426async fn run_nix_inner_raw(427	str: String,428	mut cmd: Command,429	want_stdout: bool,430	err_handler: &mut dyn Handler,431	mut out_handler: Option<&mut dyn Handler>,432) -> Result<Option<String>> {433	cmd.stderr(Stdio::piped());434	cmd.stdout(Stdio::piped());435	let mut child = cmd.spawn()?;436	let mut stderr = child.stderr.take().unwrap();437	let stdout = child.stdout.take().unwrap();438	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());439	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));440	let mut ob = want_stdout441		.then(|| out.take().unwrap())442		.unwrap_or_else(|| Box::new(EmptyAsyncRead));443	let mut ol = (!want_stdout)444		.then(|| out.take().unwrap())445		.unwrap_or_else(|| Box::new(EmptyAsyncRead));446	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());447	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());448449	// while let Some(line) = read.next().await? {}450451	let mut out_buf = if want_stdout { Some(vec![]) } else { None };452	loop {453		select! {454			e = err.next() => {455				if let Some(e) = e {456					let e = e?;457					err_handler.handle_line(&e);458				}459			},460			o = ob.next() => {461				if let Some(o) = o {462					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);463				}464			},465			o = ol.next() => {466				if let Some(o) = o {467					let o = o?;468					if let Some(out) = out_handler.as_mut() {469						out.handle_line(&o)470					} else {471						err_handler.handle_line(&o)472					}473					// out_handler.handle_info(&o);474				}475			},476			code = child.wait() => {477				let code = code?;478				if !code.success() {479					anyhow::bail!("command '{str}' failed with status {}", code);480				}481				break;482			}483		}484	}485486	Ok(out_buf.map(String::from_utf8).transpose()?)487}488489pub trait ErrorRecorder: Send {490	/// Return true to discard message from logging491	fn push_message(&mut self, msg: &str) -> bool;492}493494#[derive(Debug)]495enum LogField {496	String(String),497	Num(u64),498}499500impl<'de> Deserialize<'de> for LogField {501	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>502	where503		D: serde::Deserializer<'de>,504	{505		struct StringOrNum;506		impl<'de> Visitor<'de> for StringOrNum {507			type Value = LogField;508509			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {510				write!(f, "string or unsigned")511			}512513			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>514			where515				E: serde::de::Error,516			{517				Ok(LogField::String(v.to_owned()))518			}519520			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>521			where522				E: serde::de::Error,523			{524				Ok(LogField::Num(v))525			}526		}527528		deserializer.deserialize_any(StringOrNum)529	}530}531532#[derive(Deserialize, Debug)]533#[serde(rename_all = "camelCase", tag = "action")]534#[allow(dead_code)]535enum NixLog {536	Msg {537		level: u32,538		msg: String,539		raw_msg: Option<String>,540	},541	Start {542		id: u64,543		level: u32,544		#[serde(default)]545		fields: Vec<LogField>,546		text: String,547		#[serde(rename = "type")]548		typ: u32,549	},550	Stop {551		id: u64,552	},553	Result {554		id: u64,555		#[serde(rename = "type")]556		typ: u32,557		#[serde(default)]558		fields: Vec<LogField>,559	},560}
after · cmds/fleet/src/command.rs
1use std::{2	borrow::Cow,3	collections::HashMap,4	ffi::OsStr,5	process::Stdio,6	sync::{Arc, Mutex},7	task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25		out.push_str(input);26		return;27	}28	out.push('\'');29	for (i, v) in input.split('\'').enumerate() {30		if i != 0 {31			out.push_str("'\"'\"'");32		}33		out.push_str(v);34	}35	out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38	os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42	command: String,43	args: Vec<String>,44	env: Vec<(String, String)>,45	ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48	pub fn new(cmd: impl AsRef<OsStr>) -> Self {49		assert!(!cmd.as_ref().is_empty());50		Self {51			command: ostoutf8(cmd),52			args: vec![],53			env: vec![],54			ssh_session: None,55		}56	}57	fn into_args(self) -> Vec<String> {58		let mut out = Vec::new();59		if !self.env.is_empty() {60			out.push("env".to_owned());61			for (k, v) in self.env {62				assert!(!k.contains('='));63				out.push(format!("{k}={v}"));64			}65		}66		out.push(self.command);67		out.extend(self.args);68		out69	}70	fn into_string(self) -> String {71		let mut out = String::new();72		if !self.env.is_empty() {73			out.push_str("env");74			for (k, v) in self.env {75				out.push(' ');76				assert!(!k.contains('='));77				escape_bash(&k, &mut out);78				out.push('=');79				escape_bash(&v, &mut out);80			}81		}82		if !out.is_empty() {83			out.push(' ');84		}85		escape_bash(&self.command, &mut out);86		for arg in self.args {87			out.push(' ');88			escape_bash(&arg, &mut out);89		}90		out91	}92	fn into_command(self) -> Command {93		let mut out = Command::new(self.command);94		out.args(self.args);95		for (k, v) in self.env {96			out.env(k, v);97		}98		out99	}100	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {101		Ok(if let Some(session) = self.ssh_session.clone() {102			let cmd = self.into_command();103			Either::Right(104				cmd.over_ssh(session)105					.map_err(|e| anyhow!("ssh error: {e}"))?,106			)107		} else {108			let cmd = self.into_command();109			Either::Left(cmd)110		})111	}112	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {113		let arg = arg.as_ref();114		self.args.push(ostoutf8(arg));115		self116	}117	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {118		let arg = arg.as_ref();119		let value = value.as_ref();120		let arg = ostoutf8(arg);121		let value = ostoutf8(value);122		self.arg(format!("{arg}={value}"));123		self124	}125	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {126		self.arg(arg);127		self.arg(value);128		self129	}130	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {131		for arg in args.into_iter() {132			let arg = arg.as_ref();133			self.args.push(ostoutf8(arg));134		}135		self136	}137	pub fn sudo(self) -> Self {138		if std::env::var_os("NO_SUDO").is_some() {139			let mut out = Self::new("su");140			out.arg("-c").arg(self.into_string());141			out142		} else {143			let mut out = Self::new("sudo");144			out.args(self.into_args());145			out146		}147	}148	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {149		let mut out = Self::new("ssh");150		out.arg(on).arg("--");151		out.arg(self.into_string());152		out153	}154	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {155		self.ssh_session = Some(session);156		self157	}158159	pub async fn run(self) -> Result<()> {160		let str = self.clone().into_string();161		let cmd = self.into_command();162		run_nix_inner(str, cmd, &mut PlainHandler).await?;163		Ok(())164	}165	pub async fn run_string(self) -> Result<String> {166		let str = self.clone().into_string();167		let cmd = self.into_command();168		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;169		Ok(v)170	}171172	pub async fn run_nix_string(self) -> Result<String> {173		let str = self.clone().into_string();174		let mut cmd = self.into_command();175		cmd.arg("--log-format").arg("internal-json");176		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await177	}178	pub async fn run_nix(self) -> Result<()> {179		let str = self.clone().into_string();180		let mut cmd = self.into_command();181		cmd.arg("--log-format").arg("internal-json");182		cmd.stdout(Stdio::inherit());183		run_nix_inner(str, cmd, &mut NixHandler::default()).await184	}185}186187struct EmptyAsyncRead;188impl AsyncRead for EmptyAsyncRead {189	fn poll_read(190		self: std::pin::Pin<&mut Self>,191		_cx: &mut std::task::Context<'_>,192		_buf: &mut tokio::io::ReadBuf<'_>,193	) -> Poll<std::io::Result<()>> {194		Poll::Pending195	}196}197198async fn run_nix_inner_stdout(199	str: String,200	cmd: Command,201	handler: &mut dyn Handler,202) -> Result<String> {203	Ok(run_nix_inner_raw(str, cmd, true, handler, None)204		.await?205		.expect("has out"))206}207async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {208	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;209	assert!(v.is_none());210	Ok(())211}212213pub trait Handler: Send {214	fn handle_line(&mut self, e: &str);215}216217pub struct ClonableHandler<H>(Arc<Mutex<H>>);218impl<H> Clone for ClonableHandler<H> {219	fn clone(&self) -> Self {220		Self(self.0.clone())221	}222}223impl<H> ClonableHandler<H> {224	pub fn new(inner: H) -> Self {225		Self(Arc::new(Mutex::new(inner)))226	}227}228impl<H: Handler> Handler for ClonableHandler<H> {229	fn handle_line(&mut self, e: &str) {230		self.0.lock().unwrap().handle_line(e)231	}232}233234struct PlainHandler;235impl Handler for PlainHandler {236	fn handle_line(&mut self, e: &str) {237		info!(target: "log", "{e}");238	}239}240241pub struct NoopHandler;242impl Handler for NoopHandler {243	fn handle_line(&mut self, _e: &str) {}244}245246#[derive(Default)]247pub struct NixHandler {248	spans: HashMap<u64, Span>,249}250fn process_message(m: &str) -> Cow<'_, str> {251	static OSC_CLEANER: Lazy<Regex> =252		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());253	OSC_CLEANER.replace_all(m, "")254}255impl Handler for NixHandler {256	fn handle_line(&mut self, e: &str) {257		if let Some(e) = e.strip_prefix("@nix ") {258			let log: NixLog = match serde_json::from_str(e) {259				Ok(l) => l,260				Err(err) => {261					warn!("failed to parse nix log line {:?}: {}", e, err);262					return;263				}264			};265			match log {266				NixLog::Msg { msg, raw_msg, .. } => {267					#[allow(clippy::nonminimal_bool)]268					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))269					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")270					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {271						if let Some(raw_msg) = raw_msg {272							if !msg.is_empty() {273								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())274							} else {275								info!(target: "nix", "{}", raw_msg.trim_end())276							}277						} else {278							info!(target: "nix", "{}", msg.trim_end())279						}280					}281				}282				NixLog::Start {283					ref fields,284					typ,285					id,286					..287				} if typ == 105 && !fields.is_empty() => {288					if let [LogField::String(drv), ..] = &fields[..] {289						let mut drv = drv.as_str();290						if let Some(pkg) = drv.strip_prefix("/nix/store/") {291							let mut it = pkg.splitn(2, '-');292							it.next();293							if let Some(pkg) = it.next() {294								drv = pkg;295							}296						}297						info!(target: "nix","building {}", drv);298						let span = info_span!("build", drv);299						span.pb_start();300						self.spans.insert(id, span);301					} else {302						warn!("bad build log: {:?}", log)303					}304				}305				NixLog::Start {306					ref fields,307					typ,308					id,309					..310				} if typ == 100 && fields.len() >= 3 => {311					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =312						&fields[..]313					{314						let mut drv = drv.as_str();315316						if let Some(pkg) = drv.strip_prefix("/nix/store/") {317							let mut it = pkg.splitn(2, '-');318							it.next();319							if let Some(pkg) = it.next() {320								drv = pkg;321							}322						}323						// info!(target: "nix","copying {} {} -> {}", drv, from, to);324						let span = info_span!("copy", from, to, drv);325						span.pb_start();326						self.spans.insert(id, span);327					} else {328						warn!("bad copy log: {:?}", log)329					}330				}331				NixLog::Start { text, typ, id, .. }332					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>333				{334					if !text.is_empty()335						&& text != "querying info about missing paths"336						&& text != "copying 0 paths"337					{338						let span = info_span!("job");339						span.pb_start();340						span.pb_set_message(&process_message(text.trim()));341						self.spans.insert(id, span);342						info!(target: "nix", "{}", text);343					}344				}345				NixLog::Start {346					text,347					level: 0,348					typ: 108,349					..350				} if text.is_empty() => {351					// Cache lookup? Coupled with copy log352				}353				NixLog::Start {354					text,355					level: 4,356					typ: 109,357					..358				} if text.starts_with("querying info about ") => {359					// Cache lookup360				}361				NixLog::Start {362					text,363					level: 4,364					typ: 101,365					..366				} if text.starts_with("downloading ") => {367					// NAR downloading, coupled with copy log368				}369				NixLog::Start {370					text,371					level: 1,372					typ: 111,373					..374				} if text.starts_with("waiting for a machine to build ") => {375					// Useless repeating notification about build376				}377				NixLog::Start {378					text,379					level: 3,380					typ: 111,381					..382				} if text.starts_with("resolved derivation: ") => {383					// CA resolved384				}385				NixLog::Start {386					text,387					level: 1,388					typ: 111,389					id,390					..391				} if text.starts_with("waiting for lock on ") => {392					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();393					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {394						drv = txt;395					}396					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {397						drv = txt;398					}399					if let Some(txt) = drv.split("', '").next() {400						drv = txt;401					}402					if let Some(pkg) = drv.strip_prefix("/nix/store/") {403						let mut it = pkg.splitn(2, '-');404						it.next();405						if let Some(pkg) = it.next() {406							drv = pkg;407						}408					}409					let span = info_span!("waiting on drv", drv);410					span.pb_start();411					self.spans.insert(id, span);412					// Concurrent build of the same message413				}414				NixLog::Stop { id, .. } => {415					self.spans.remove(&id);416				}417				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {418					if let Some(span) = self.spans.get(&id) {419						if let LogField::String(s) = &fields[0] {420							span.pb_set_message(&process_message(s.trim()));421						} else {422							warn!("bad fields: {fields:?}");423						}424					} else {425						warn!("unknown result id: {id} {typ} {fields:?}");426					}427					// dbg!(fields, id, typ);428				}429				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {430					if let Some(span) = self.spans.get(&id) {431						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =432							&fields[..4]433						{434							span.pb_set_length(*expected);435							span.pb_set_position(*done);436						} else {437							warn!("bad fields: {fields:?}");438						}439					} else {440						// warn!("unknown result id: {id} {typ} {fields:?}");441						// Unaccounted progress.442					}443					// dbg!(fields, id, typ);444				}445				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {446					// Set phase, expected447				}448				_ => warn!("unknown log: {:?}", log),449			};450		} else {451			let e = e.trim();452			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {453				return;454			}455			info!("{e}")456		}457	}458}459460async fn run_nix_inner_raw(461	str: String,462	mut cmd: Command,463	want_stdout: bool,464	err_handler: &mut dyn Handler,465	mut out_handler: Option<&mut dyn Handler>,466) -> Result<Option<String>> {467	cmd.stderr(Stdio::piped());468	cmd.stdout(Stdio::piped());469	let mut child = cmd.spawn()?;470	let mut stderr = child.stderr.take().unwrap();471	let stdout = child.stdout.take().unwrap();472	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());473	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));474	let mut ob = want_stdout475		.then(|| out.take().unwrap())476		.unwrap_or_else(|| Box::new(EmptyAsyncRead));477	let mut ol = (!want_stdout)478		.then(|| out.take().unwrap())479		.unwrap_or_else(|| Box::new(EmptyAsyncRead));480	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());481	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());482483	// while let Some(line) = read.next().await? {}484485	let mut out_buf = if want_stdout { Some(vec![]) } else { None };486	loop {487		select! {488			e = err.next() => {489				if let Some(e) = e {490					let e = e?;491					err_handler.handle_line(&e);492				}493			},494			o = ob.next() => {495				if let Some(o) = o {496					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);497				}498			},499			o = ol.next() => {500				if let Some(o) = o {501					let o = o?;502					if let Some(out) = out_handler.as_mut() {503						out.handle_line(&o)504					} else {505						err_handler.handle_line(&o)506					}507					// out_handler.handle_info(&o);508				}509			},510			code = child.wait() => {511				let code = code?;512				if !code.success() {513					anyhow::bail!("command '{str}' failed with status {}", code);514				}515				break;516			}517		}518	}519520	Ok(out_buf.map(String::from_utf8).transpose()?)521}522523pub trait ErrorRecorder: Send {524	/// Return true to discard message from logging525	fn push_message(&mut self, msg: &str) -> bool;526}527528#[derive(Debug)]529enum LogField {530	String(String),531	Num(u64),532}533534impl<'de> Deserialize<'de> for LogField {535	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>536	where537		D: serde::Deserializer<'de>,538	{539		struct StringOrNum;540		impl<'de> Visitor<'de> for StringOrNum {541			type Value = LogField;542543			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {544				write!(f, "string or unsigned")545			}546547			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>548			where549				E: serde::de::Error,550			{551				Ok(LogField::String(v.to_owned()))552			}553554			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>555			where556				E: serde::de::Error,557			{558				Ok(LogField::Num(v))559			}560		}561562		deserializer.deserialize_any(StringOrNum)563	}564}565566#[derive(Deserialize, Debug)]567#[serde(rename_all = "camelCase", tag = "action")]568#[allow(dead_code)]569enum NixLog {570	Msg {571		level: u32,572		msg: String,573		raw_msg: Option<String>,574	},575	Start {576		id: u64,577		level: u32,578		#[serde(default)]579		fields: Vec<LogField>,580		text: String,581		#[serde(rename = "type")]582		typ: u32,583	},584	Stop {585		id: u64,586	},587	Result {588		id: u64,589		#[serde(rename = "type")]590		typ: u32,591		#[serde(default)]592		fields: Vec<LogField>,593	},594}
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -7,8 +7,9 @@
 	sync::{Arc, Mutex, MutexGuard},
 };
 
-use anyhow::{bail, Context, Result};
+use anyhow::{anyhow, bail, Context, Result};
 use clap::{ArgGroup, Parser};
+use openssh::SessionBuilder;
 use tempfile::NamedTempFile;
 
 use crate::{
@@ -43,6 +44,16 @@
 pub struct ConfigHost {
 	pub name: String,
 }
+impl ConfigHost {
+	async fn open_session(&self) -> Result<openssh::Session> {
+		let mut session = SessionBuilder::default();
+
+		session
+			.connect(&self.name)
+			.await
+			.map_err(|e| anyhow!("ssh error: {e}"))
+	}
+}
 
 impl Config {
 	pub fn should_skip(&self, host: &str) -> bool {
@@ -93,21 +104,22 @@
 	}
 
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
-		let names = self.fleet_field
+		let names = self
+			.fleet_field
 			.get_field_deep(["configuredHosts"])
 			.await?
 			.list_fields()
 			.await?;
-		 let mut out = vec![];
-		 for name in names {
-			out.push(ConfigHost {
-				name,
-			})
-		 }
-		 Ok(out)
+		let mut out = vec![];
+		for name in names {
+			out.push(ConfigHost { name })
+		}
+		Ok(out)
 	}
 	pub async fn system_config(&self, host: &str) -> Result<Field> {
-		self.fleet_field.get_field_deep(["configuredSystems", host, "config"]).await
+		self.fleet_field
+			.get_field_deep(["configuredSystems", host, "config"])
+			.await
 	}
 
 	pub(super) fn data(&self) -> MutexGuard<FleetData> {
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -5,8 +5,8 @@
 pub(crate) mod host;
 pub(crate) mod keys;
 
+pub(crate) mod better_nix_eval;
 pub(crate) mod extra_args;
-pub(crate) mod better_nix_eval;
 
 mod fleetdata;
 
@@ -21,6 +21,7 @@
 use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
 use host::{Config, FleetOpts};
+use human_repr::HumanCount;
 use indicatif::{ProgressState, ProgressStyle};
 use tracing::{info, metadata::LevelFilter};
 use tracing::{info_span, Instrument};
@@ -121,9 +122,16 @@
 fn setup_logging() {
 	let indicatif_layer = IndicatifLayer::new().with_progress_style(
 		ProgressStyle::with_template(
-			"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{pos:>7}/{len:7}{elapsed}{color_end}",
+			"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",
 		)
 		.unwrap()
+		.with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {
+			let Some(len) = state.len() else {
+				return;
+			};
+			let pos = state.pos();
+			let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());
+		})
 		.with_key(
 			"color_start",
 			|state: &ProgressState, writer: &mut dyn std::fmt::Write| {
modifiedlib/default.nixdiffbeforeafterboth
--- a/lib/default.nix
+++ b/lib/default.nix
@@ -1,44 +1,56 @@
-{ flake-utils }: {
-  fleetConfiguration = { data, nixpkgs, hosts, ... }@allConfig:
-    let
-      hostNames = nixpkgs.lib.attrNames hosts;
-      config = builtins.removeAttrs allConfig [ "nixpkgs" "data" ];
-      fleetLib = import ./fleetLib.nix {
-        inherit nixpkgs hostNames;
-      };
-    in
-    nixpkgs.lib.genAttrs flake-utils.lib.defaultSystems (system:
-      let
+{flake-utils}: {
+  fleetConfiguration = {
+    data,
+    nixpkgs,
+    hosts,
+    ...
+  } @ allConfig: let
+    hostNames = nixpkgs.lib.attrNames hosts;
+    config = builtins.removeAttrs allConfig ["nixpkgs" "data"];
+    fleetLib = import ./fleetLib.nix {
+      inherit nixpkgs hostNames;
+    };
+  in
+    # Top-level arg is the builder system (not the target system!)
+    nixpkgs.lib.genAttrs flake-utils.lib.defaultSystems (system: let
+      withData = data: rec {
         root = nixpkgs.lib.evalModules {
-          modules = (import ../modules/fleet/_modules.nix) ++ [ config data ];
+          modules = (import ../modules/fleet/_modules.nix) ++ [config data];
           specialArgs = {
             inherit nixpkgs fleetLib;
           };
         };
         failedAssertions = map (x: x.message) (nixpkgs.lib.filter (x: !x.assertion) root.config.assertions);
         rootAssertWarn =
-          if failedAssertions != [ ]
+          if failedAssertions != []
           then throw "Failed assertions:\n${nixpkgs.lib.concatStringsSep "\n" (map (x: "- ${x}") failedAssertions)}"
           else nixpkgs.lib.showWarnings root.config.warnings root;
         configuredHosts = rootAssertWarn.config.hosts;
         configuredSecrets = rootAssertWarn.config.secrets;
-        configuredSystems = configuredSystemsWithExtraModules [ ];
-        configuredSystemsWithExtraModules = extraModules: nixpkgs.lib.listToAttrs (
-          map
+        configuredSystems = configuredSystemsWithExtraModules [];
+        configuredSystemsWithExtraModules = extraModules:
+          nixpkgs.lib.listToAttrs (
+            map
             (
               name: {
                 inherit name;
                 value = nixpkgs.lib.nixosSystem {
                   system = configuredHosts.${name}.system;
-                  modules = configuredHosts.${name}.modules ++ extraModules ++ [
-                    ({ ... }: {
-                      nixpkgs.system = system;
-                      nixpkgs.localSystem.system = system;
-                      nixpkgs.crossSystem = if system == configuredHosts.${name}.system then null else {
-                        system = configuredHosts.${name}.system;
-                      };
-                    })
-                  ];
+                  modules =
+                    configuredHosts.${name}.modules
+                    ++ extraModules
+                    ++ [
+                      ({...}: {
+                        nixpkgs.system = system;
+                        nixpkgs.localSystem.system = system;
+                        nixpkgs.crossSystem =
+                          if system == configuredHosts.${name}.system
+                          then null
+                          else {
+                            system = configuredHosts.${name}.system;
+                          };
+                      })
+                    ];
                   specialArgs = {
                     inherit fleetLib;
                     fleet = fleetLib.hostsToAttrs (host: configuredSystems.${host}.config);
@@ -47,11 +59,7 @@
               }
             )
             (builtins.attrNames rootAssertWarn.config.hosts)
-        );
-      in
-      rec {
-        inherit configuredHosts configuredSecrets configuredSystems;
-        configUnchecked = root.config;
+          );
         buildSystems = {
           toplevel = builtins.mapAttrs (_name: value: value.config.system.build.toplevel) (configuredSystemsWithExtraModules [
             ({...}: {
@@ -66,12 +74,22 @@
           ]);
           installationCd = builtins.mapAttrs (_name: value: value.config.system.build.isoImage) (configuredSystemsWithExtraModules [
             (nixpkgs + "/nixos/modules/installer/cd-dvd/installation-cd-minimal.nix")
-            ({ lib, ... }: {
+            ({lib, ...}: {
               buildTarget = "installation-cd";
               # Needed for https://github.com/NixOS/nixpkgs/issues/58959
-              boot.supportedFilesystems = lib.mkForce [ "btrfs" "reiserfs" "vfat" "f2fs" "xfs" "ntfs" "cifs" ];
+              boot.supportedFilesystems = lib.mkForce ["btrfs" "reiserfs" "vfat" "f2fs" "xfs" "ntfs" "cifs"];
             })
           ]);
         };
-      });
+        configUnchecked = root.config;
+      };
+      defaultData = withData data;
+    in rec {
+      inherit (defaultData) configuredHosts configuredSecrets configuredSystems buildSystems configUnchecked;
+      injectData = data: let
+        injectedData = withData data;
+      in {
+        inherit (injectedData) configuredHosts configuredSecrets configuredSystems buildSystems configUnchecked;
+      };
+    });
 }