git.delta.rocks / jrsonnet / refs/commits / 97d9be65842c

difftreelog

feat libssh preparation

Yaroslav Bolyukin2023-11-17parent: #3f73827.patch.diff
in: trunk

7 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -651,6 +651,27 @@
 ]
 
 [[package]]
+name = "dirs"
+version = "5.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225"
+dependencies = [
+ "dirs-sys",
+]
+
+[[package]]
+name = "dirs-sys"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c"
+dependencies = [
+ "libc",
+ "option-ext",
+ "redox_users",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
 name = "displaydoc"
 version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -731,13 +752,16 @@
  "clap",
  "futures",
  "hostname",
+ "human-repr",
  "indicatif",
  "itertools",
  "nixlike",
  "once_cell",
+ "openssh",
  "owo-colors",
  "peg",
  "r2d2",
+ "regex",
  "serde",
  "serde_json",
  "shlex",
@@ -1019,6 +1043,12 @@
 ]
 
 [[package]]
+name = "human-repr"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f58b778a5761513caf593693f8951c97a5b610841e754788400f32102eefdff1"
+
+[[package]]
 name = "humantime"
 version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1257,6 +1287,17 @@
 ]
 
 [[package]]
+name = "libredox"
+version = "0.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8"
+dependencies = [
+ "bitflags 2.4.1",
+ "libc",
+ "redox_syscall 0.4.1",
+]
+
+[[package]]
 name = "linked-hash-map"
 version = "0.5.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1480,6 +1521,28 @@
 checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
 
 [[package]]
+name = "openssh"
+version = "0.10.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3dfe68c42d6ee6bd9de175b7a5d9bb86aa99d4e2fa7cf2f2a44e97f60b6d2759"
+dependencies = [
+ "dirs",
+ "libc",
+ "once_cell",
+ "shell-escape",
+ "tempfile",
+ "thiserror",
+ "tokio",
+ "tokio-pipe",
+]
+
+[[package]]
+name = "option-ext"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
+
+[[package]]
 name = "overload"
 version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1804,15 +1867,26 @@
 ]
 
 [[package]]
+name = "redox_users"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a18479200779601e498ada4e8c1e1f50e3ee19deb0259c25825a98b5603b2cb4"
+dependencies = [
+ "getrandom 0.2.10",
+ "libredox",
+ "thiserror",
+]
+
+[[package]]
 name = "regex"
-version = "1.9.5"
+version = "1.10.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47"
+checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-automata 0.3.8",
- "regex-syntax 0.7.5",
+ "regex-automata 0.4.3",
+ "regex-syntax 0.8.2",
 ]
 
 [[package]]
@@ -1826,13 +1900,13 @@
 
 [[package]]
 name = "regex-automata"
-version = "0.3.8"
+version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795"
+checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
 dependencies = [
  "aho-corasick",
  "memchr",
- "regex-syntax 0.7.5",
+ "regex-syntax 0.8.2",
 ]
 
 [[package]]
@@ -1843,9 +1917,9 @@
 
 [[package]]
 name = "regex-syntax"
-version = "0.7.5"
+version = "0.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da"
+checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
 
 [[package]]
 name = "rnix"
@@ -2099,6 +2173,12 @@
 ]
 
 [[package]]
+name = "shell-escape"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f"
+
+[[package]]
 name = "shlex"
 version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2379,6 +2459,16 @@
 ]
 
 [[package]]
+name = "tokio-pipe"
+version = "0.2.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784"
+dependencies = [
+ "libc",
+ "tokio",
+]
+
+[[package]]
 name = "tokio-util"
 version = "0.7.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -11,7 +11,7 @@
 serde_json = "1.0"
 time = { version = "0.3.30", features = ["serde"] }
 tempfile = "3.8"
-once_cell = "1.18"
+once_cell = "1.18.0"
 hostname = "0.3.1"
 age-core = "0.9.0"
 peg = "0.8.2"
@@ -41,3 +41,6 @@
 r2d2 = "0.8.10"
 abort-on-drop = "0.2.2"
 unindent = "0.2.3"
+regex = "1.10.2"
+openssh = "0.10.1"
+human-repr = "1.1.0"
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
before · cmds/fleet/src/cmds/build_systems.rs
1use std::path::PathBuf;2use std::{env::current_dir, time::Duration};34use crate::command::MyCommand;5use crate::host::Config;6use anyhow::{anyhow, Result};7use clap::Parser;8use itertools::Itertools;9use tokio::{task::LocalSet, time::sleep};10use tracing::{error, field, info, info_span, warn, Instrument};1112#[derive(Parser, Clone)]13pub struct BuildSystems {14	/// Do not continue on error15	#[clap(long)]16	fail_fast: bool,17	/// Disable automatic rollback18	#[clap(long)]19	disable_rollback: bool,20	/// Run builds as sudo21	#[clap(long)]22	privileged_build: bool,23	#[clap(subcommand)]24	subcommand: Subcommand,25}2627enum UploadAction {28	Test,29	Boot,30	Switch,31}32impl UploadAction {33	fn name(&self) -> &'static str {34		match self {35			UploadAction::Test => "test",36			UploadAction::Boot => "boot",37			UploadAction::Switch => "switch",38		}39	}4041	pub(crate) fn should_switch_profile(&self) -> bool {42		matches!(self, Self::Switch | Self::Boot)43	}44	pub(crate) fn should_activate(&self) -> bool {45		matches!(self, Self::Switch | Self::Test)46	}47	pub(crate) fn should_schedule_rollback_run(&self) -> bool {48		matches!(self, Self::Switch | Self::Test)49	}50}5152enum PackageAction {53	SdImage,54	InstallationCd,55}56impl PackageAction {57	fn build_attr(&self) -> String {58		match self {59			PackageAction::SdImage => "sdImage".to_owned(),60			PackageAction::InstallationCd => "installationCd".to_owned(),61		}62	}63}6465enum Action {66	Upload { action: Option<UploadAction> },67	Package(PackageAction),68}69impl Action {70	fn build_attr(&self) -> String {71		match self {72			Action::Upload { .. } => "toplevel".to_owned(),73			Action::Package(p) => p.build_attr(),74		}75	}76}7778impl From<Subcommand> for Action {79	fn from(s: Subcommand) -> Self {80		match s {81			Subcommand::Upload => Self::Upload { action: None },82			Subcommand::Test => Self::Upload {83				action: Some(UploadAction::Test),84			},85			Subcommand::Boot => Self::Upload {86				action: Some(UploadAction::Boot),87			},88			Subcommand::Switch => Self::Upload {89				action: Some(UploadAction::Switch),90			},91			Subcommand::SdImage => Self::Package(PackageAction::SdImage),92			Subcommand::InstallationCd => Self::Package(PackageAction::InstallationCd),93		}94	}95}9697#[derive(Parser, Clone)]98enum Subcommand {99	/// Upload, but do not switch100	Upload,101	/// Upload + switch to built system until reboot102	Test,103	/// Upload + switch to built system after reboot104	Boot,105	/// Upload + test + boot106	Switch,107108	/// Build SD .img image109	SdImage,110	/// Build an installation cd ISO image111	InstallationCd,112}113114struct Generation {115	id: u32,116	current: bool,117	datetime: String,118}119async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {120	let mut cmd = MyCommand::new("nix-env");121	cmd.comparg("--profile", "/nix/var/nix/profiles/system")122		.arg("--list-generations");123	// Sudo is required due to --list-generations acquiring lock on the profile.124	let data = config.run_string_on(host, cmd, true).await?;125	let generations = data126		.split('\n')127		.map(|e| e.trim())128		.filter(|&l| !l.is_empty())129		.filter_map(|g| {130			let gen: Option<Generation> = try {131				let mut parts = g.split_whitespace();132				let id = parts.next()?;133				let id: u32 = id.parse().ok()?;134				let date = parts.next()?;135				let time = parts.next()?;136				let current = if let Some(current) = parts.next() {137					if current == "(current)" {138						Some(true)139					} else {140						None141					}142				} else {143					Some(false)144				};145				let current = current?;146				if parts.next().is_some() {147					warn!("unexpected text after generation: {g}");148				}149				Generation {150					id,151					current,152					datetime: format!("{date} {time}"),153				}154			};155			if gen.is_none() {156				warn!("bad generation: {g}")157			}158			gen159		})160		.collect::<Vec<_>>();161	let current = generations162		.into_iter()163		.filter(|g| g.current)164		.at_most_one()165		.map_err(|_e| anyhow!("bad list-generations output"))?166		.ok_or_else(|| anyhow!("failed to find generation"))?;167	Ok(current)168}169170async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {171	let mut cmd = MyCommand::new("systemctl");172	cmd.arg("stop").arg(unit);173	config.run_on(host, cmd, true).await174}175176async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {177	let mut cmd = MyCommand::new("systemctl");178	cmd.arg("start").arg(unit);179	config.run_on(host, cmd, true).await180}181182async fn execute_upload(183	build: &BuildSystems,184	config: &Config,185	action: UploadAction,186	host: &str,187	built: PathBuf,188) -> Result<()> {189	let mut failed = false;190	// TODO: Lockfile, to prevent concurrent system switch?191	// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback192	// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to193	// unit name conflict in systemd-run194	// This code is tied to rollback.nix195	if !build.disable_rollback {196		let _span = info_span!("preparing").entered();197		info!("preparing for rollback");198		let generation = get_current_generation(config, host).await?;199		info!(200			"rollback target would be {} {}",201			generation.id, generation.datetime202		);203		{204			let mut cmd = MyCommand::new("sh");205			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));206			if let Err(e) = config.run_on(host, cmd, true).await {207				error!("failed to set rollback marker: {e}");208				failed = true;209			}210		}211		// Activation script also starts rollback-watchdog.timer, however, it is possible that it won't be started.212		// Kicking it on manually will work best.213		//214		// There wouldn't be conflict, because here we trigger start of the primary service, and systemd will215		// only allow one instance of it.216217		// TODO: We should also watch how this process is going.218		// After running this command, we have less than 3 minutes to deploy everything,219		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.220		// Anyway, reboot will still help in this case.221		if action.should_schedule_rollback_run() {222			let mut cmd = MyCommand::new("systemd-run");223			cmd.comparg("--on-active", "3min")224				.comparg("--unit", "rollback-watchdog-run")225				.arg("systemctl")226				.arg("start")227				.arg("rollback-watchdog.service");228			if let Err(e) = config.run_on(host, cmd, true).await {229				error!("failed to schedule rollback run: {e}");230				failed = true;231			}232		}233	}234	if action.should_switch_profile() && !failed {235		info!("switching generation");236		let mut cmd = MyCommand::new("nix-env");237		cmd.comparg("--profile", "/nix/var/nix/profiles/system")238			.comparg("--set", &built);239		if let Err(e) = config.run_on(host, cmd, true).await {240			error!("failed to switch generation: {e}");241			failed = true;242		}243	}244	if action.should_activate() && !failed {245		let _span = info_span!("activating").entered();246		info!("executing activation script");247		let mut switch_script = built.clone();248		switch_script.push("bin");249		switch_script.push("switch-to-configuration");250		let mut cmd = MyCommand::new(switch_script);251		cmd.arg(action.name());252		if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {253			error!("failed to activate: {e}");254			failed = true;255		}256	}257	if !build.disable_rollback {258		if failed {259			info!("executing rollback");260			if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")261				.instrument(info_span!("rollback"))262				.await263			{264				error!("failed to trigger rollback: {e}")265			}266		} else {267			info!("trying to mark upgrade as successful");268			let mut cmd = MyCommand::new("rm");269			cmd.arg("-f").arg("/etc/fleet_rollback_marker");270			if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {271				error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")272			}273		}274		info!("disarming watchdog, just in case");275		if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {276			// It is ok, if there was no reboot - then timer might not be running.277		}278		if action.should_schedule_rollback_run() {279			if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {280				error!("failed to disarm rollback run: {e}");281			}282		}283	} else {284		let mut cmd = MyCommand::new("rm");285		cmd.arg("-f").arg("/etc/fleet_rollback_marker");286		if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {287			// Marker might not exist, yet better try to remove it.288		}289	}290	Ok(())291}292293impl BuildSystems {294	async fn build_task(self, config: Config, host: String) -> Result<()> {295		info!("building");296		let action = Action::from(self.subcommand.clone());297		let built = {298			let dir = tempfile::tempdir()?;299			dir.path().to_owned()300		};301302		let mut nix_build = MyCommand::new("nix");303		nix_build304			.args([305				"build",306				"--impure",307				"--json",308				// "--show-trace",309				"--no-link",310			])311			.comparg("--out-link", &built)312			.arg(313				config.configuration_attr_name(&format!(314					"buildSystems.{}.{host}",315					action.build_attr()316				)),317			)318			.args(&config.nix_args);319320		if self.privileged_build {321			nix_build = nix_build.sudo();322		}323324		nix_build.run_nix().await.map_err(|e| {325			if action.build_attr() == "sdImage" {326				info!("sd-image build failed");327				info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");328				info!("This module was automatically imported before, but was removed for better customization")329			}330			e331		})?;332		let built = std::fs::canonicalize(built)?;333334		match action {335			Action::Upload { action } => {336				if !config.is_local(&host) {337					info!("uploading system closure");338					{339						let mut sign = MyCommand::new("sudo");340						// Private key for host machine is registered in nix-sign.nix341						sign.arg("nix")342							.arg("store")343							.arg("sign")344							.comparg("--key-file", "/etc/nix/private-key")345							.arg("-r")346							.arg(&built);347						if let Err(e) = sign.run_nix().await {348							warn!("Failed to sign store paths: {e}");349						};350					}351					let mut tries = 0;352					loop {353						let mut nix = MyCommand::new("nix");354						nix.arg("copy")355							.arg("--substitute-on-destination")356							.comparg("--to", format!("ssh-ng://{host}"))357							.arg(&built);358						match nix.run_nix().await {359							Ok(()) => break,360							Err(e) if tries < 3 => {361								tries += 1;362								warn!("Copy failure ({}/3): {}", tries, e);363								sleep(Duration::from_millis(5000)).await;364							}365							Err(e) => return Err(e),366						}367					}368				}369				if let Some(action) = action {370					execute_upload(&self, &config, action, &host, built).await?371				}372			}373			Action::Package(PackageAction::SdImage) => {374				let mut out = current_dir()?;375				out.push(format!("sd-image-{}", host));376377				info!("building sd image to {:?}", out);378				let mut nix_build = MyCommand::new("nix");379				nix_build380					.args(["build", "--impure", "--no-link"])381					.comparg("--out-link", &out)382					.arg(config.configuration_attr_name(&format!("buildSystems.sdImage.{}", host,)))383					.args(&config.nix_args);384				if !self.fail_fast {385					nix_build.arg("--keep-going");386				}387				if self.privileged_build {388					nix_build = nix_build.sudo();389				}390391				nix_build.run_nix().await?;392			}393			Action::Package(PackageAction::InstallationCd) => {394				let mut out = current_dir()?;395				out.push(format!("installation-cd-{}", host));396397				info!("building sd image to {:?}", out);398				let mut nix_build = MyCommand::new("nix");399				nix_build400					.args(["build", "--impure", "--no-link"])401					.comparg("--out-link", &out)402					.arg(403						config.configuration_attr_name(&format!(404							"buildSystems.installationCd.{}",405							host,406						)),407					)408					.args(&config.nix_args);409				if !self.fail_fast {410					nix_build.arg("--keep-going");411				}412				if self.privileged_build {413					nix_build = nix_build.sudo();414				}415416				nix_build.run_nix().await?;417			}418		};419		Ok(())420	}421422	pub async fn run(self, config: &Config) -> Result<()> {423		let hosts = config.list_hosts().await?;424		let set = LocalSet::new();425		let this = &self;426		for host in hosts.into_iter() {427			if config.should_skip(&host.name) {428				continue;429			}430			let config = config.clone();431			let this = this.clone();432			let span = info_span!("deployment", host = field::display(&host.name));433			let hostname = host.name;434			set.spawn_local(435				(async move {436					match this.build_task(config, hostname).await {437						Ok(_) => {}438						Err(e) => {439							error!("failed to deploy host: {}", e)440						}441					}442				})443				.instrument(span),444			);445		}446		set.await;447		Ok(())448	}449}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,4 +1,5 @@
 use std::{
+	borrow::Cow,
 	collections::HashMap,
 	ffi::OsStr,
 	process::Stdio,
@@ -6,8 +7,12 @@
 	task::Poll,
 };
 
-use anyhow::Result;
+use anyhow::{anyhow, Result};
 use futures::StreamExt;
+use itertools::Either;
+use once_cell::sync::Lazy;
+use openssh::{OverSsh, Session};
+use regex::Regex;
 use serde::{de::Visitor, Deserialize};
 use tokio::{io::AsyncRead, process::Command, select};
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
@@ -37,6 +42,7 @@
 	command: String,
 	args: Vec<String>,
 	env: Vec<(String, String)>,
+	ssh_session: Option<Arc<Session>>,
 }
 impl MyCommand {
 	pub fn new(cmd: impl AsRef<OsStr>) -> Self {
@@ -45,6 +51,7 @@
 			command: ostoutf8(cmd),
 			args: vec![],
 			env: vec![],
+			ssh_session: None,
 		}
 	}
 	fn into_args(self) -> Vec<String> {
@@ -90,6 +97,18 @@
 		}
 		out
 	}
+	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {
+		Ok(if let Some(session) = self.ssh_session.clone() {
+			let cmd = self.into_command();
+			Either::Right(
+				cmd.over_ssh(session)
+					.map_err(|e| anyhow!("ssh error: {e}"))?,
+			)
+		} else {
+			let cmd = self.into_command();
+			Either::Left(cmd)
+		})
+	}
 	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {
 		let arg = arg.as_ref();
 		self.args.push(ostoutf8(arg));
@@ -116,9 +135,15 @@
 		self
 	}
 	pub fn sudo(self) -> Self {
-		let mut out = Self::new("sudo");
-		out.args(self.into_args());
-		out
+		if std::env::var_os("NO_SUDO").is_some() {
+			let mut out = Self::new("su");
+			out.arg("-c").arg(self.into_string());
+			out
+		} else {
+			let mut out = Self::new("sudo");
+			out.args(self.into_args());
+			out
+		}
 	}
 	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {
 		let mut out = Self::new("ssh");
@@ -126,6 +151,10 @@
 		out.arg(self.into_string());
 		out
 	}
+	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {
+		self.ssh_session = Some(session);
+		self
+	}
 
 	pub async fn run(self) -> Result<()> {
 		let str = self.clone().into_string();
@@ -218,6 +247,11 @@
 pub struct NixHandler {
 	spans: HashMap<u64, Span>,
 }
+fn process_message(m: &str) -> Cow<'_, str> {
+	static OSC_CLEANER: Lazy<Regex> =
+		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
+	OSC_CLEANER.replace_all(m, "")
+}
 impl Handler for NixHandler {
 	fn handle_line(&mut self, e: &str) {
 		if let Some(e) = e.strip_prefix("@nix ") {
@@ -303,7 +337,7 @@
 					{
 						let span = info_span!("job");
 						span.pb_start();
-						span.pb_set_message(text.trim());
+						span.pb_set_message(&process_message(text.trim()));
 						self.spans.insert(id, span);
 						info!(target: "nix", "{}", text);
 					}
@@ -383,7 +417,7 @@
 				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {
 					if let Some(span) = self.spans.get(&id) {
 						if let LogField::String(s) = &fields[0] {
-							span.pb_set_message(s.trim());
+							span.pb_set_message(&process_message(s.trim()));
 						} else {
 							warn!("bad fields: {fields:?}");
 						}
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -7,8 +7,9 @@
 	sync::{Arc, Mutex, MutexGuard},
 };
 
-use anyhow::{bail, Context, Result};
+use anyhow::{anyhow, bail, Context, Result};
 use clap::{ArgGroup, Parser};
+use openssh::SessionBuilder;
 use tempfile::NamedTempFile;
 
 use crate::{
@@ -43,6 +44,16 @@
 pub struct ConfigHost {
 	pub name: String,
 }
+impl ConfigHost {
+	async fn open_session(&self) -> Result<openssh::Session> {
+		let mut session = SessionBuilder::default();
+
+		session
+			.connect(&self.name)
+			.await
+			.map_err(|e| anyhow!("ssh error: {e}"))
+	}
+}
 
 impl Config {
 	pub fn should_skip(&self, host: &str) -> bool {
@@ -93,21 +104,22 @@
 	}
 
 	pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
-		let names = self.fleet_field
+		let names = self
+			.fleet_field
 			.get_field_deep(["configuredHosts"])
 			.await?
 			.list_fields()
 			.await?;
-		 let mut out = vec![];
-		 for name in names {
-			out.push(ConfigHost {
-				name,
-			})
-		 }
-		 Ok(out)
+		let mut out = vec![];
+		for name in names {
+			out.push(ConfigHost { name })
+		}
+		Ok(out)
 	}
 	pub async fn system_config(&self, host: &str) -> Result<Field> {
-		self.fleet_field.get_field_deep(["configuredSystems", host, "config"]).await
+		self.fleet_field
+			.get_field_deep(["configuredSystems", host, "config"])
+			.await
 	}
 
 	pub(super) fn data(&self) -> MutexGuard<FleetData> {
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -5,8 +5,8 @@
 pub(crate) mod host;
 pub(crate) mod keys;
 
+pub(crate) mod better_nix_eval;
 pub(crate) mod extra_args;
-pub(crate) mod better_nix_eval;
 
 mod fleetdata;
 
@@ -21,6 +21,7 @@
 use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
 use host::{Config, FleetOpts};
+use human_repr::HumanCount;
 use indicatif::{ProgressState, ProgressStyle};
 use tracing::{info, metadata::LevelFilter};
 use tracing::{info_span, Instrument};
@@ -121,9 +122,16 @@
 fn setup_logging() {
 	let indicatif_layer = IndicatifLayer::new().with_progress_style(
 		ProgressStyle::with_template(
-			"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{pos:>7}/{len:7}{elapsed}{color_end}",
+			"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",
 		)
 		.unwrap()
+		.with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {
+			let Some(len) = state.len() else {
+				return;
+			};
+			let pos = state.pos();
+			let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());
+		})
 		.with_key(
 			"color_start",
 			|state: &ProgressState, writer: &mut dyn std::fmt::Write| {
modifiedlib/default.nixdiffbeforeafterboth
--- a/lib/default.nix
+++ b/lib/default.nix
@@ -1,44 +1,56 @@
-{ flake-utils }: {
-  fleetConfiguration = { data, nixpkgs, hosts, ... }@allConfig:
-    let
-      hostNames = nixpkgs.lib.attrNames hosts;
-      config = builtins.removeAttrs allConfig [ "nixpkgs" "data" ];
-      fleetLib = import ./fleetLib.nix {
-        inherit nixpkgs hostNames;
-      };
-    in
-    nixpkgs.lib.genAttrs flake-utils.lib.defaultSystems (system:
-      let
+{flake-utils}: {
+  fleetConfiguration = {
+    data,
+    nixpkgs,
+    hosts,
+    ...
+  } @ allConfig: let
+    hostNames = nixpkgs.lib.attrNames hosts;
+    config = builtins.removeAttrs allConfig ["nixpkgs" "data"];
+    fleetLib = import ./fleetLib.nix {
+      inherit nixpkgs hostNames;
+    };
+  in
+    # Top-level arg is the builder system (not the target system!)
+    nixpkgs.lib.genAttrs flake-utils.lib.defaultSystems (system: let
+      withData = data: rec {
         root = nixpkgs.lib.evalModules {
-          modules = (import ../modules/fleet/_modules.nix) ++ [ config data ];
+          modules = (import ../modules/fleet/_modules.nix) ++ [config data];
           specialArgs = {
             inherit nixpkgs fleetLib;
           };
         };
         failedAssertions = map (x: x.message) (nixpkgs.lib.filter (x: !x.assertion) root.config.assertions);
         rootAssertWarn =
-          if failedAssertions != [ ]
+          if failedAssertions != []
           then throw "Failed assertions:\n${nixpkgs.lib.concatStringsSep "\n" (map (x: "- ${x}") failedAssertions)}"
           else nixpkgs.lib.showWarnings root.config.warnings root;
         configuredHosts = rootAssertWarn.config.hosts;
         configuredSecrets = rootAssertWarn.config.secrets;
-        configuredSystems = configuredSystemsWithExtraModules [ ];
-        configuredSystemsWithExtraModules = extraModules: nixpkgs.lib.listToAttrs (
-          map
+        configuredSystems = configuredSystemsWithExtraModules [];
+        configuredSystemsWithExtraModules = extraModules:
+          nixpkgs.lib.listToAttrs (
+            map
             (
               name: {
                 inherit name;
                 value = nixpkgs.lib.nixosSystem {
                   system = configuredHosts.${name}.system;
-                  modules = configuredHosts.${name}.modules ++ extraModules ++ [
-                    ({ ... }: {
-                      nixpkgs.system = system;
-                      nixpkgs.localSystem.system = system;
-                      nixpkgs.crossSystem = if system == configuredHosts.${name}.system then null else {
-                        system = configuredHosts.${name}.system;
-                      };
-                    })
-                  ];
+                  modules =
+                    configuredHosts.${name}.modules
+                    ++ extraModules
+                    ++ [
+                      ({...}: {
+                        nixpkgs.system = system;
+                        nixpkgs.localSystem.system = system;
+                        nixpkgs.crossSystem =
+                          if system == configuredHosts.${name}.system
+                          then null
+                          else {
+                            system = configuredHosts.${name}.system;
+                          };
+                      })
+                    ];
                   specialArgs = {
                     inherit fleetLib;
                     fleet = fleetLib.hostsToAttrs (host: configuredSystems.${host}.config);
@@ -47,11 +59,7 @@
               }
             )
             (builtins.attrNames rootAssertWarn.config.hosts)
-        );
-      in
-      rec {
-        inherit configuredHosts configuredSecrets configuredSystems;
-        configUnchecked = root.config;
+          );
         buildSystems = {
           toplevel = builtins.mapAttrs (_name: value: value.config.system.build.toplevel) (configuredSystemsWithExtraModules [
             ({...}: {
@@ -66,12 +74,22 @@
           ]);
           installationCd = builtins.mapAttrs (_name: value: value.config.system.build.isoImage) (configuredSystemsWithExtraModules [
             (nixpkgs + "/nixos/modules/installer/cd-dvd/installation-cd-minimal.nix")
-            ({ lib, ... }: {
+            ({lib, ...}: {
               buildTarget = "installation-cd";
               # Needed for https://github.com/NixOS/nixpkgs/issues/58959
-              boot.supportedFilesystems = lib.mkForce [ "btrfs" "reiserfs" "vfat" "f2fs" "xfs" "ntfs" "cifs" ];
+              boot.supportedFilesystems = lib.mkForce ["btrfs" "reiserfs" "vfat" "f2fs" "xfs" "ntfs" "cifs"];
             })
           ]);
         };
-      });
+        configUnchecked = root.config;
+      };
+      defaultData = withData data;
+    in rec {
+      inherit (defaultData) configuredHosts configuredSecrets configuredSystems buildSystems configUnchecked;
+      injectData = data: let
+        injectedData = withData data;
+      in {
+        inherit (injectedData) configuredHosts configuredSecrets configuredSystems buildSystems configUnchecked;
+      };
+    });
 }