git.delta.rocks / jrsonnet / refs/commits / e7a5b5f940a6

difftreelog

feat do not use build batching for single-host jobs

Yaroslav Bolyukin2024-11-14parent: #45f5af0.patch.diff
in: trunk

4 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1820,6 +1820,7 @@
 name = "nix-native-eval"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "nixrs",
 ]
 
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
before · cmds/fleet/src/cmds/build_systems.rs
1use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, time::Duration};23use anyhow::{anyhow, Result};4use clap::{Parser, ValueEnum};5use fleet_base::{6	host::{Config, ConfigHost},7	opts::FleetOpts,8};9use itertools::Itertools as _;10use nix_eval::{nix_go, NixBuildBatch};11use tokio::{task::LocalSet, time::sleep};12use tracing::{error, field, info, info_span, warn, Instrument};1314#[derive(Parser)]15pub struct Deploy {16	/// Disable automatic rollback17	#[clap(long)]18	disable_rollback: bool,19	/// Action to execute after system is built20	action: DeployAction,21}2223#[derive(ValueEnum, Clone, Copy)]24enum DeployAction {25	/// Upload derivation, but do not execute the update.26	Upload,27	/// Upload and execute the activation script, old version will be used after reboot.28	Test,29	/// Upload and set as current system profile, but do not execute activation script.30	Boot,31	/// Upload, set current profile, and execute activation script.32	Switch,33}3435impl DeployAction {36	pub(crate) fn name(&self) -> Option<&'static str> {37		match self {38			Self::Upload => None,39			Self::Test => Some("test"),40			Self::Boot => Some("boot"),41			Self::Switch => Some("switch"),42		}43	}44	pub(crate) fn should_switch_profile(&self) -> bool {45		matches!(self, Self::Switch | Self::Boot)46	}47	pub(crate) fn should_activate(&self) -> bool {48		matches!(self, Self::Switch | Self::Test | Self::Boot)49	}50	pub(crate) fn should_create_rollback_marker(&self) -> bool {51		// Upload does nothing on the target machine, other than uploading the closure.52		// In boot case we want to have rollback marker prepared, so that the system may rollback itself on the next boot.53		!matches!(self, Self::Upload)54	}55	pub(crate) fn should_schedule_rollback_run(&self) -> bool {56		matches!(self, Self::Switch | Self::Test)57	}58}5960#[derive(Parser, Clone)]61pub struct BuildSystems {62	/// Attribute to build. Systems are deployed from "toplevel" attr, well-known used attributes63	/// are "sdImage"/"isoImage", and your configuration may include any other build attributes.64	#[clap(long, default_value = "toplevel")]65	build_attr: String,66}6768struct Generation {69	id: u32,70	current: bool,71	datetime: String,72}73async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {74	let mut cmd = host.cmd("nix-env").await?;75	cmd.comparg("--profile", "/nix/var/nix/profiles/system")76		.arg("--list-generations");77	// Sudo is required due to --list-generations acquiring lock on the profile.78	let data = cmd.sudo().run_string().await?;79	let generations = data80		.split('\n')81		.map(|e| e.trim())82		.filter(|&l| !l.is_empty())83		.filter_map(|g| {84			let gen: Option<Generation> = try {85				let mut parts = g.split_whitespace();86				let id = parts.next()?;87				let id: u32 = id.parse().ok()?;88				let date = parts.next()?;89				let time = parts.next()?;90				let current = if let Some(current) = parts.next() {91					if current == "(current)" {92						Some(true)93					} else {94						None95					}96				} else {97					Some(false)98				};99				let current = current?;100				if parts.next().is_some() {101					warn!("unexpected text after generation: {g}");102				}103				Generation {104					id,105					current,106					datetime: format!("{date} {time}"),107				}108			};109			if gen.is_none() {110				warn!("bad generation: {g}")111			}112			gen113		})114		.collect::<Vec<_>>();115	let current = generations116		.into_iter()117		.filter(|g| g.current)118		.at_most_one()119		.map_err(|_e| anyhow!("bad list-generations output"))?120		.ok_or_else(|| anyhow!("failed to find generation"))?;121	Ok(current)122}123124async fn deploy_task(125	action: DeployAction,126	host: &ConfigHost,127	built: PathBuf,128	specialisation: Option<String>,129	disable_rollback: bool,130) -> Result<()> {131	let mut failed = false;132	// TODO: Lockfile, to prevent concurrent system switch?133	// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback134	// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to135	// unit name conflict in systemd-run136	// This code is tied to rollback.nix137	if !disable_rollback && action.should_create_rollback_marker() {138		let _span = info_span!("preparing").entered();139		info!("preparing for rollback");140		let generation = get_current_generation(host).await?;141		info!(142			"rollback target would be {} {}",143			generation.id, generation.datetime144		);145		{146			let mut cmd = host.cmd("sh").await?;147			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));148			if let Err(e) = cmd.sudo().run().await {149				error!("failed to set rollback marker: {e}");150				failed = true;151			}152		}153		// Activation script also starts rollback-watchdog.timer, however, it is possible that it won't be started.154		// Kicking it on manually will work best.155		//156		// There wouldn't be conflict, because here we trigger start of the primary service, and systemd will157		// only allow one instance of it.158159		// TODO: We should also watch how this process is going.160		// After running this command, we have less than 3 minutes to deploy everything,161		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.162		// Anyway, reboot will still help in this case.163		if action.should_schedule_rollback_run() {164			let mut cmd = host.cmd("systemd-run").await?;165			cmd.comparg("--on-active", "3min")166				.comparg("--unit", "rollback-watchdog-run")167				.arg("systemctl")168				.arg("start")169				.arg("rollback-watchdog.service");170			if let Err(e) = cmd.sudo().run().await {171				error!("failed to schedule rollback run: {e}");172				failed = true;173			}174		}175	}176177	if action.should_switch_profile() && !failed {178		info!("switching system profile generation");179		// It would also be possible to update profile atomically during copy:180		// https://github.com/NixOS/nix/pull/11657181		let mut cmd = host.cmd("nix").await?;182		cmd.arg("build");183		cmd.comparg("--profile", "/nix/var/nix/profiles/system");184		cmd.arg(&built);185		if let Err(e) = cmd.sudo().run_nix().await {186			error!("failed to switch system profile generation: {e}");187			failed = true;188		}189	}190191	// FIXME: Connection might be disconnected after activation run192193	if action.should_activate() && !failed {194		let _span = info_span!("activating").entered();195		info!("executing activation script");196		let specialised = if let Some(specialisation) = specialisation {197			let mut specialised = built.join("specialisation");198			specialised.push(specialisation);199			specialised200		} else {201			built.clone()202		};203		let switch_script = specialised.join("bin/switch-to-configuration");204		let mut cmd = host.cmd(switch_script).in_current_span().await?;205		cmd.arg(action.name().expect("upload.should_activate == false"));206		if let Err(e) = cmd.sudo().run().in_current_span().await {207			error!("failed to activate: {e}");208			failed = true;209		}210	}211	if action.should_create_rollback_marker() {212		if !disable_rollback {213			if failed {214				if action.should_schedule_rollback_run() {215					info!("executing rollback");216					if let Err(e) = host217						.systemctl_start("rollback-watchdog.service")218						.instrument(info_span!("rollback"))219						.await220					{221						error!("failed to trigger rollback: {e}")222					}223				}224			} else {225				info!("trying to mark upgrade as successful");226				if let Err(e) = host227					.rm_file("/etc/fleet_rollback_marker", true)228					.in_current_span()229					.await230				{231					error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")232				}233			}234			info!("disarming watchdog, just in case");235			if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {236				// It is ok, if there was no reboot - then timer might not be running.237			}238			if action.should_schedule_rollback_run() {239				if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {240					error!("failed to disarm rollback run: {e}");241				}242			}243		} else if let Err(_e) = host244			.rm_file("/etc/fleet_rollback_marker", true)245			.in_current_span()246			.await247		{248			// Marker might not exist, yet better try to remove it.249		}250	}251	Ok(())252}253254async fn build_task(255	config: Config,256	host: String,257	build_attr: &str,258	batch: Option<NixBuildBatch>,259) -> Result<PathBuf> {260	info!("building");261	let host = config.host(&host).await?;262	// let action = Action::from(self.subcommand.clone());263	let nixos = host.nixos_config().await?;264	let drv = nix_go!(nixos.system.build[{ build_attr }]);265	let outputs = drv.build_maybe_batch(batch).await?;266	let out_output = outputs267		.get("out")268		.ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;269270	Ok(out_output.clone())271}272273impl BuildSystems {274	pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {275		let hosts = config.list_hosts().await?;276		let set = LocalSet::new();277		let build_attr = self.build_attr.clone();278		for host in hosts.into_iter() {279			if opts.should_skip(&host).await? {280				continue;281			}282			let config = config.clone();283			let span = info_span!("build", host = field::display(&host.name));284			let hostname = host.name;285			let build_attr = build_attr.clone();286			// FIXME: Since the introduction of better-nix-eval,287			// due to single repl used for builds, hosts are waiting for each other to build,288			// instead of building concurrently.289			//290			// Open multiple repls?291			//292			// Create build batcher, which will behave similar to golangs293			// WaitGroup, and start executing once all the build tasks are scheduled?294			// This also allows to cleanup build output, as there will be no longer295			// "waiting for remote machine" messages in the cases when one package is needed for296			// multiple hosts.297			set.spawn_local(298				(async move {299					let built = match build_task(config, hostname.clone(), &build_attr, None).await300					{301						Ok(path) => path,302						Err(e) => {303							error!("failed to deploy host: {}", e);304							return;305						}306					};307					// TODO: Handle error308					let mut out = current_dir().expect("cwd exists");309					out.push(format!("built-{}", hostname));310311					info!("linking iso image to {:?}", out);312					if let Err(e) = symlink(built, out) {313						error!("failed to symlink: {e}")314					}315				})316				.instrument(span),317			);318		}319		set.await;320		Ok(())321	}322}323324impl Deploy {325	pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {326		let hosts = config.list_hosts().await?;327		let set = LocalSet::new();328		let batch = Some(config.nix_session.new_build_batch(format!("deploy-hosts")));329		for host in hosts.into_iter() {330			if opts.should_skip(&host).await? {331				continue;332			}333			let config = config.clone();334			let span = info_span!("deploy", host = field::display(&host.name));335			let hostname = host.name.clone();336			let local_host = config.local_host();337			let opts = opts.clone();338			let batch = batch.clone();339			// FIXME: Fix repl concurrency (see build-systems)340			set.spawn_local(341				(async move {342					let built =343						match build_task(config.clone(), hostname.clone(), "toplevel", batch).await344						{345							Ok(path) => path,346							Err(e) => {347								error!("failed to deploy host: {}", e);348								return;349							}350						};351					if !opts.is_local(&hostname) {352						info!("uploading system closure");353						{354							// TODO: Move to remote_derivation method.355							// Alternatively, nix store make-content-addressed can be used,356							// at least for the first deployment, to provide trusted store key.357							//358							// It is much slower, yet doesn't require root on the deployer machine.359							let Ok(mut sign) = local_host.cmd("nix").await else {360								error!("failed to setup local");361								return;362							};363							// Private key for host machine is registered in nix-sign.nix364							sign.arg("store")365								.arg("sign")366								.comparg("--key-file", "/etc/nix/private-key")367								.arg("-r")368								.arg(&built);369							if let Err(e) = sign.sudo().run_nix().await {370								warn!("failed to sign store paths: {e}");371							};372						}373						let mut tries = 0;374						loop {375							match host.remote_derivation(&built).await {376								Ok(remote) => {377									assert!(remote == built, "CA derivations aren't implemented");378									break;379								}380								Err(e) if tries < 3 => {381									tries += 1;382									warn!("copy failure ({}/3): {}", tries, e);383									sleep(Duration::from_millis(5000)).await;384								}385								Err(e) => {386									error!("upload failed: {e}");387									return;388								}389							}390						}391					}392					if let Err(e) = deploy_task(393						self.action,394						&host,395						built,396						if let Ok(v) = opts.action_attr(&host, "specialisation").await {397							v398						} else {399							error!("unreachable? failed to get specialization");400							return;401						},402						self.disable_rollback,403					)404					.await405					{406						error!("activation failed: {e}");407					}408				})409				.instrument(span),410			);411		}412		drop(batch);413		set.await;414		Ok(())415	}416}
after · cmds/fleet/src/cmds/build_systems.rs
1use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, time::Duration};23use anyhow::{anyhow, Result};4use clap::{Parser, ValueEnum};5use fleet_base::{6	host::{Config, ConfigHost},7	opts::FleetOpts,8};9use itertools::Itertools as _;10use nix_eval::{nix_go, NixBuildBatch};11use tokio::{task::LocalSet, time::sleep};12use tracing::{error, field, info, info_span, warn, Instrument};1314#[derive(Parser)]15pub struct Deploy {16	/// Disable automatic rollback17	#[clap(long)]18	disable_rollback: bool,19	/// Action to execute after system is built20	action: DeployAction,21}2223#[derive(ValueEnum, Clone, Copy)]24enum DeployAction {25	/// Upload derivation, but do not execute the update.26	Upload,27	/// Upload and execute the activation script, old version will be used after reboot.28	Test,29	/// Upload and set as current system profile, but do not execute activation script.30	Boot,31	/// Upload, set current profile, and execute activation script.32	Switch,33}3435impl DeployAction {36	pub(crate) fn name(&self) -> Option<&'static str> {37		match self {38			Self::Upload => None,39			Self::Test => Some("test"),40			Self::Boot => Some("boot"),41			Self::Switch => Some("switch"),42		}43	}44	pub(crate) fn should_switch_profile(&self) -> bool {45		matches!(self, Self::Switch | Self::Boot)46	}47	pub(crate) fn should_activate(&self) -> bool {48		matches!(self, Self::Switch | Self::Test | Self::Boot)49	}50	pub(crate) fn should_create_rollback_marker(&self) -> bool {51		// Upload does nothing on the target machine, other than uploading the closure.52		// In boot case we want to have rollback marker prepared, so that the system may rollback itself on the next boot.53		!matches!(self, Self::Upload)54	}55	pub(crate) fn should_schedule_rollback_run(&self) -> bool {56		matches!(self, Self::Switch | Self::Test)57	}58}5960#[derive(Parser, Clone)]61pub struct BuildSystems {62	/// Attribute to build. Systems are deployed from "toplevel" attr, well-known used attributes63	/// are "sdImage"/"isoImage", and your configuration may include any other build attributes.64	#[clap(long, default_value = "toplevel")]65	build_attr: String,66}6768struct Generation {69	id: u32,70	current: bool,71	datetime: String,72}73async fn get_current_generation(host: &ConfigHost) -> Result<Generation> {74	let mut cmd = host.cmd("nix-env").await?;75	cmd.comparg("--profile", "/nix/var/nix/profiles/system")76		.arg("--list-generations");77	// Sudo is required due to --list-generations acquiring lock on the profile.78	let data = cmd.sudo().run_string().await?;79	let generations = data80		.split('\n')81		.map(|e| e.trim())82		.filter(|&l| !l.is_empty())83		.filter_map(|g| {84			let gen: Option<Generation> = try {85				let mut parts = g.split_whitespace();86				let id = parts.next()?;87				let id: u32 = id.parse().ok()?;88				let date = parts.next()?;89				let time = parts.next()?;90				let current = if let Some(current) = parts.next() {91					if current == "(current)" {92						Some(true)93					} else {94						None95					}96				} else {97					Some(false)98				};99				let current = current?;100				if parts.next().is_some() {101					warn!("unexpected text after generation: {g}");102				}103				Generation {104					id,105					current,106					datetime: format!("{date} {time}"),107				}108			};109			if gen.is_none() {110				warn!("bad generation: {g}")111			}112			gen113		})114		.collect::<Vec<_>>();115	let current = generations116		.into_iter()117		.filter(|g| g.current)118		.at_most_one()119		.map_err(|_e| anyhow!("bad list-generations output"))?120		.ok_or_else(|| anyhow!("failed to find generation"))?;121	Ok(current)122}123124async fn deploy_task(125	action: DeployAction,126	host: &ConfigHost,127	built: PathBuf,128	specialisation: Option<String>,129	disable_rollback: bool,130) -> Result<()> {131	let mut failed = false;132	// TODO: Lockfile, to prevent concurrent system switch?133	// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback134	// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to135	// unit name conflict in systemd-run136	// This code is tied to rollback.nix137	if !disable_rollback && action.should_create_rollback_marker() {138		let _span = info_span!("preparing").entered();139		info!("preparing for rollback");140		let generation = get_current_generation(host).await?;141		info!(142			"rollback target would be {} {}",143			generation.id, generation.datetime144		);145		{146			let mut cmd = host.cmd("sh").await?;147			cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));148			if let Err(e) = cmd.sudo().run().await {149				error!("failed to set rollback marker: {e}");150				failed = true;151			}152		}153		// Activation script also starts rollback-watchdog.timer, however, it is possible that it won't be started.154		// Kicking it on manually will work best.155		//156		// There wouldn't be conflict, because here we trigger start of the primary service, and systemd will157		// only allow one instance of it.158159		// TODO: We should also watch how this process is going.160		// After running this command, we have less than 3 minutes to deploy everything,161		// if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.162		// Anyway, reboot will still help in this case.163		if action.should_schedule_rollback_run() {164			let mut cmd = host.cmd("systemd-run").await?;165			cmd.comparg("--on-active", "3min")166				.comparg("--unit", "rollback-watchdog-run")167				.arg("systemctl")168				.arg("start")169				.arg("rollback-watchdog.service");170			if let Err(e) = cmd.sudo().run().await {171				error!("failed to schedule rollback run: {e}");172				failed = true;173			}174		}175	}176177	if action.should_switch_profile() && !failed {178		info!("switching system profile generation");179		// It would also be possible to update profile atomically during copy:180		// https://github.com/NixOS/nix/pull/11657181		let mut cmd = host.cmd("nix").await?;182		cmd.arg("build");183		cmd.comparg("--profile", "/nix/var/nix/profiles/system");184		cmd.arg(&built);185		if let Err(e) = cmd.sudo().run_nix().await {186			error!("failed to switch system profile generation: {e}");187			failed = true;188		}189	}190191	// FIXME: Connection might be disconnected after activation run192193	if action.should_activate() && !failed {194		let _span = info_span!("activating").entered();195		info!("executing activation script");196		let specialised = if let Some(specialisation) = specialisation {197			let mut specialised = built.join("specialisation");198			specialised.push(specialisation);199			specialised200		} else {201			built.clone()202		};203		let switch_script = specialised.join("bin/switch-to-configuration");204		let mut cmd = host.cmd(switch_script).in_current_span().await?;205		cmd.arg(action.name().expect("upload.should_activate == false"));206		if let Err(e) = cmd.sudo().run().in_current_span().await {207			error!("failed to activate: {e}");208			failed = true;209		}210	}211	if action.should_create_rollback_marker() {212		if !disable_rollback {213			if failed {214				if action.should_schedule_rollback_run() {215					info!("executing rollback");216					if let Err(e) = host217						.systemctl_start("rollback-watchdog.service")218						.instrument(info_span!("rollback"))219						.await220					{221						error!("failed to trigger rollback: {e}")222					}223				}224			} else {225				info!("trying to mark upgrade as successful");226				if let Err(e) = host227					.rm_file("/etc/fleet_rollback_marker", true)228					.in_current_span()229					.await230				{231					error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")232				}233			}234			info!("disarming watchdog, just in case");235			if let Err(_e) = host.systemctl_stop("rollback-watchdog.timer").await {236				// It is ok, if there was no reboot - then timer might not be running.237			}238			if action.should_schedule_rollback_run() {239				if let Err(e) = host.systemctl_stop("rollback-watchdog-run.timer").await {240					error!("failed to disarm rollback run: {e}");241				}242			}243		} else if let Err(_e) = host244			.rm_file("/etc/fleet_rollback_marker", true)245			.in_current_span()246			.await247		{248			// Marker might not exist, yet better try to remove it.249		}250	}251	Ok(())252}253254async fn build_task(255	config: Config,256	host: String,257	build_attr: &str,258	batch: Option<NixBuildBatch>,259) -> Result<PathBuf> {260	info!("building");261	let host = config.host(&host).await?;262	// let action = Action::from(self.subcommand.clone());263	let nixos = host.nixos_config().await?;264	let drv = nix_go!(nixos.system.build[{ build_attr }]);265	let outputs = drv.build_maybe_batch(batch).await?;266	let out_output = outputs267		.get("out")268		.ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;269270	Ok(out_output.clone())271}272273impl BuildSystems {274	pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {275		let hosts = opts.filter_skipped(config.list_hosts().await?).await?;276		let set = LocalSet::new();277		let build_attr = self.build_attr.clone();278		let batch = (hosts.len() > 1).then(|| {279			config280				.nix_session281				.new_build_batch("build-hosts".to_string())282		});283		for host in hosts {284			let config = config.clone();285			let span = info_span!("build", host = field::display(&host.name));286			let hostname = host.name;287			let build_attr = build_attr.clone();288			let batch = batch.clone();289			set.spawn_local(290				(async move {291					let built = match build_task(config, hostname.clone(), &build_attr, batch).await292					{293						Ok(path) => path,294						Err(e) => {295							error!("failed to deploy host: {}", e);296							return;297						}298					};299					// TODO: Handle error300					let mut out = current_dir().expect("cwd exists");301					out.push(format!("built-{}", hostname));302303					info!("linking iso image to {:?}", out);304					if let Err(e) = symlink(built, out) {305						error!("failed to symlink: {e}")306					}307				})308				.instrument(span),309			);310		}311		drop(batch);312		set.await;313		Ok(())314	}315}316317impl Deploy {318	pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {319		let hosts = opts.filter_skipped(config.list_hosts().await?).await?;320		let set = LocalSet::new();321		let batch = (hosts.len() > 1).then(|| {322			config323				.nix_session324				.new_build_batch("deploy-hosts".to_string())325		});326		for host in hosts.into_iter() {327			let config = config.clone();328			let span = info_span!("deploy", host = field::display(&host.name));329			let hostname = host.name.clone();330			let local_host = config.local_host();331			let opts = opts.clone();332			let batch = batch.clone();333334			set.spawn_local(335				(async move {336					let built =337						match build_task(config.clone(), hostname.clone(), "toplevel", batch).await338						{339							Ok(path) => path,340							Err(e) => {341								error!("failed to deploy host: {}", e);342								return;343							}344						};345					if !opts.is_local(&hostname) {346						info!("uploading system closure");347						{348							// TODO: Move to remote_derivation method.349							// Alternatively, nix store make-content-addressed can be used,350							// at least for the first deployment, to provide trusted store key.351							//352							// It is much slower, yet doesn't require root on the deployer machine.353							let Ok(mut sign) = local_host.cmd("nix").await else {354								error!("failed to setup local");355								return;356							};357							// Private key for host machine is registered in nix-sign.nix358							sign.arg("store")359								.arg("sign")360								.comparg("--key-file", "/etc/nix/private-key")361								.arg("-r")362								.arg(&built);363							if let Err(e) = sign.sudo().run_nix().await {364								warn!("failed to sign store paths: {e}");365							};366						}367						let mut tries = 0;368						loop {369							match host.remote_derivation(&built).await {370								Ok(remote) => {371									assert!(remote == built, "CA derivations aren't implemented");372									break;373								}374								Err(e) if tries < 3 => {375									tries += 1;376									warn!("copy failure ({}/3): {}", tries, e);377									sleep(Duration::from_millis(5000)).await;378								}379								Err(e) => {380									error!("upload failed: {e}");381									return;382								}383							}384						}385					}386					if let Err(e) = deploy_task(387						self.action,388						&host,389						built,390						if let Ok(v) = opts.action_attr(&host, "specialisation").await {391							v392						} else {393							error!("unreachable? failed to get specialization");394							return;395						},396						self.disable_rollback,397					)398					.await399					{400						error!("activation failed: {e}");401					}402				})403				.instrument(span),404			);405		}406		drop(batch);407		set.await;408		Ok(())409	}410}
modifiedcrates/fleet-base/src/opts.rsdiffbeforeafterboth
--- a/crates/fleet-base/src/opts.rs
+++ b/crates/fleet-base/src/opts.rs
@@ -91,6 +91,16 @@
 }
 
 impl FleetOpts {
+	pub async fn filter_skipped(&self, hosts: impl IntoIterator<Item = ConfigHost>) -> Result<Vec<ConfigHost>> {
+		let mut out = Vec::new();	
+		for host in hosts {
+			if self.should_skip(&host).await? {
+				continue;
+			}
+			out.push(host);
+		}
+		Ok(out)
+	}
 	pub async fn should_skip(&self, host: &ConfigHost) -> Result<bool> {
 		if self.skip.iter().any(|h| h as &str == host.name) {
 			return Ok(true);
modifiedcrates/nix-eval/src/lib.rsdiffbeforeafterboth
--- a/crates/nix-eval/src/lib.rs
+++ b/crates/nix-eval/src/lib.rs
@@ -9,11 +9,8 @@
 use pool::NixSessionPoolInner;
 use r2d2::PooledConnection;
 pub use session::{Error, Result};
-use tokio::{
-	sync::{mpsc, oneshot},
-	task::AbortHandle,
-};
-use tracing::{info, instrument, Instrument};
+use tokio::sync::{mpsc, oneshot};
+use tracing::instrument;
 pub use value::{Index, Value};
 
 mod pool;