git.delta.rocks / jrsonnet / refs/commits / 3972fee37ee3

difftreelog

feat explicitly mark hosts as managed by fleet

Lach2025-04-05parent: #a1a72ce.patch.diff
in: trunk

7 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -924,6 +924,7 @@
  "hostname",
  "human-repr",
  "indicatif",
+ "indoc",
  "itertools 0.13.0",
  "nix-eval",
  "nixlike",
@@ -1537,6 +1538,12 @@
 ]
 
 [[package]]
+name = "indoc"
+version = "2.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd"
+
+[[package]]
 name = "inout"
 version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -47,6 +47,7 @@
 nix-eval.workspace = true
 nom = "7.1.3"
 fleet-base = { version = "0.1.0", path = "../../crates/fleet-base" }
+indoc = "2.0.6"
 
 [features]
 default = ["indicatif"]
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,6 +1,6 @@
-use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, time::Duration};
+use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, str::FromStr, time::Duration};
 
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, bail, Result};
 use clap::{Parser, ValueEnum};
 use fleet_base::{
 	host::{Config, ConfigHost},
@@ -132,6 +132,7 @@
 	disable_rollback: bool,
 ) -> Result<()> {
 	let mut failed = false;
+
 	// TODO: Lockfile, to prevent concurrent system switch?
 	// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback
 	// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
@@ -332,6 +333,24 @@
 	}
 }
 
+#[derive(Clone, PartialEq, Copy)]
+enum DeployKind {
+	// NixOS => NixOS managed by fleet
+	UpgradeToFleet,
+	// NixOS managed by fleet => NixOS managed by fleet
+	Fleet,
+}
+impl FromStr for DeployKind {
+	type Err = anyhow::Error;
+	fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+		match s {
+			"upgrade-to-fleet" => Ok(Self::UpgradeToFleet),
+			"fleet" => Ok(Self::Fleet),
+			v => bail!("unknown deploy_kind: {v}; expected on of \"upgrade-to-fleet\", \"fleet\""),
+		}
+	}
+}
+
 impl Deploy {
 	pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
 		let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
@@ -348,6 +367,8 @@
 			let local_host = config.local_host();
 			let opts = opts.clone();
 			let batch = batch.clone();
+			let mut deploy_kind: Option<DeployKind> =
+				opts.action_attr(&host, "deploy_kind").await?;
 
 			set.spawn_local(
 				(async move {
@@ -356,10 +377,40 @@
 						{
 							Ok(path) => path,
 							Err(e) => {
-								error!("failed to deploy host: {}", e);
+								error!("failed to build host system closure: {}", e);
 								return;
 							}
 						};
+					if deploy_kind == None {
+						let is_fleet_managed = match host.file_exists("/etc/FLEET_HOST").await {
+							Ok(v) => v,
+							Err(e) => {
+								error!("failed to query remote system kind: {}", e);
+								return;
+							},
+						};
+						if !is_fleet_managed {
+							error!(indoc::indoc!{"
+								host is not marked as managed by fleet
+								if you're not trying to lustrate/install system from scratch,
+								you should either
+									1. manually create /etc/FLEET_HOST file on the target host,
+									2. use ?deploy_kind=fleet host argument if you're upgrading from older version of fleet
+									3. use ?deploy_kind=upgrade_to_fleet if you're upgrading from plain nixos to fleet-managed nixos
+							"});
+							return;
+						}
+						deploy_kind = Some(DeployKind::Fleet);
+					}
+					let deploy_kind = deploy_kind.expect("deploy_kind is set");
+
+					// TODO: Make disable_rollback a host attribute instead
+					let mut disable_rollback = self.disable_rollback;
+					if !disable_rollback && deploy_kind != DeployKind::Fleet {
+						warn!("disabling rollback, as not supported by non-fleet deployment kinds");
+						disable_rollback = true;
+					}
+
 					if !opts.is_local(&hostname) {
 						info!("uploading system closure");
 						{
@@ -411,7 +462,7 @@
 							error!("unreachable? failed to get specialization");
 							return;
 						},
-						self.disable_rollback,
+						disable_rollback,
 					)
 					.await
 					{
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -66,9 +66,9 @@
 
 #[derive(Parser)]
 enum Opts {
-	/// Prepare systems for deployments
+	/// Build system closures
 	BuildSystems(BuildSystems),
-
+	/// Upload and switch system closures
 	Deploy(Deploy),
 	/// Secret management
 	#[clap(subcommand)]
modifiedcrates/fleet-base/src/command.rsdiffbeforeafterboth
before · crates/fleet-base/src/command.rs
1use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};23use anyhow::{anyhow, Result};4use better_command::{Handler, NixHandler, PlainHandler};5use futures::StreamExt;6use itertools::Either;7use openssh::{OverSsh, OwningCommand, Session};8use tokio::{io::AsyncRead, process::Command, select};9use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};10use tracing::debug;1112use crate::host::EscalationStrategy;1314fn escape_bash(input: &str, out: &mut String) {15	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17		out.push_str(input);18		return;19	}20	out.push('\'');21	for (i, v) in input.split('\'').enumerate() {22		if i != 0 {23			out.push_str("'\"'\"'");24		}25		out.push_str(v);26	}27	out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30	os.as_ref().to_str().expect("non-utf8 data").to_owned()31}3233#[derive(Clone, Debug)]34pub struct MyCommand {35	command: String,36	args: Vec<String>,37	env: Vec<(String, String)>,38	ssh_session: Option<Arc<Session>>,39	escalation: EscalationStrategy,40	escalate: bool,41}42impl MyCommand {43	pub fn new_on(44		escalation: EscalationStrategy,45		cmd: impl AsRef<OsStr>,46		session: Arc<Session>,47	) -> Self {48		assert!(!cmd.as_ref().is_empty());49		Self {50			command: ostoutf8(cmd),51			args: vec![],52			env: vec![],53			ssh_session: Some(session),54			escalation,55			escalate: false,56		}57	}58	pub fn new(escalation: EscalationStrategy, cmd: impl AsRef<OsStr>) -> Self {59		assert!(!cmd.as_ref().is_empty());60		Self {61			command: ostoutf8(cmd),62			args: vec![],63			env: vec![],64			ssh_session: None,65			escalation,66			escalate: false,67		}68	}69	fn new_here(&self, cmd: impl AsRef<OsStr>) -> Self {70		if let Some(ssh_session) = self.ssh_session.clone() {71			Self::new_on(self.escalation, cmd, ssh_session)72		} else {73			Self::new(self.escalation, cmd)74		}75	}7677	fn into_args(self) -> Vec<String> {78		let mut out = Vec::new();79		if !self.env.is_empty() {80			out.push("env".to_owned());81			for (k, v) in self.env {82				assert!(!k.contains('='));83				out.push(format!("{k}={v}"));84			}85		}86		out.push(self.command);87		out.extend(self.args);88		out89	}9091	/// Translates environment variables into env command execution.92	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).93	///94	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.95	/// Figure out some way to transfer environment using stdio?96	fn translate_env_into_env(self) -> Self {97		if self.env.is_empty() {98			return self;99		}100		let mut out = self.new_here("env");101		for (k, v) in self.env {102			assert!(!k.contains('='));103			out.arg(format!("{k}={v}"));104		}105		out.arg(self.command);106		out.args(self.args);107108		out109	}110	fn into_string(self) -> String {111		let mut out = String::new();112		if !self.env.is_empty() {113			out.push_str("env");114			for (k, v) in self.env {115				out.push(' ');116				assert!(!k.contains('='));117				escape_bash(&k, &mut out);118				out.push('=');119				escape_bash(&v, &mut out);120			}121		}122		if !out.is_empty() {123			out.push(' ');124		}125		escape_bash(&self.command, &mut out);126		for arg in self.args {127			out.push(' ');128			escape_bash(&arg, &mut out);129		}130		out131	}132	fn into_command_unchecked_local(self) -> Command {133		let mut out = Command::new(self.command);134		out.args(self.args);135		for (k, v) in self.env {136			out.env(k, v);137		}138		out139	}140	fn into_command(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {141		Ok(if let Some(session) = self.ssh_session.clone() {142			let cmd = self.translate_env_into_env().into_command_unchecked_local();143			Either::Right(144				cmd.over_ssh(session)145					.map_err(|e| anyhow!("ssh error: {e}"))?,146			)147		} else {148			let cmd = self.into_command_unchecked_local();149			Either::Left(cmd)150		})151	}152	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {153		let arg = arg.as_ref();154		self.args.push(ostoutf8(arg));155		self156	}157	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158		let arg = arg.as_ref();159		let value = value.as_ref();160		let arg = ostoutf8(arg);161		let value = ostoutf8(value);162		self.arg(format!("{arg}={value}"));163		self164	}165	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {166		self.arg(arg);167		self.arg(value);168		self169	}170	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {171		self.env172			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));173		self174	}175	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {176		for arg in args.into_iter() {177			let arg = arg.as_ref();178			self.args.push(ostoutf8(arg));179		}180		self181	}182	pub fn sudo(mut self) -> Self {183		self.escalate = true;184		self185	}186	fn wrap_sudo_if_needed(self) -> Self {187		if !self.escalate {188			return self;189		}190		match self.escalation {191			EscalationStrategy::Su => {192				let mut out = self.new_here("su");193				out.arg("-c").arg(self.into_string());194				out195			}196			EscalationStrategy::Sudo => {197				let mut out = self.new_here("sudo");198				out.args(self.into_args());199				out200			}201			EscalationStrategy::Run0 => {202				// run0 wants interactive authentication by default.203				let mut run0 = self.new_here("run0");204				let mut out = self.new_here("script");205206				// Red backgrounds messes with fleet formatting207				run0.arg("--background=");208				run0.args(self.into_args());209210				out.arg("-q");211				out.arg("/dev/null");212				out.arg("-c");213				out.arg(run0.into_string());214				dbg!(&out);215				out216			}217		}218	}219220	pub async fn run(self) -> Result<()> {221		let str = self.clone().into_string();222		let cmd = self.wrap_sudo_if_needed().into_command()?;223		match cmd {224			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,225			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,226		};227		Ok(())228	}229	pub async fn run_string(self) -> Result<String> {230		let bytes = self.run_bytes().await?;231		Ok(String::from_utf8(bytes)?)232	}233	pub async fn run_bytes(self) -> Result<Vec<u8>> {234		let str = self.clone().into_string();235		let cmd = self.wrap_sudo_if_needed().into_command()?;236		let v = match cmd {237			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,238			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,239		};240		Ok(v)241	}242243	pub async fn run_nix_string(mut self) -> Result<String> {244		let str = self.clone().into_string();245		self.arg("--log-format").arg("internal-json");246		let cmd = self.wrap_sudo_if_needed().into_command()?;247		let bytes = match cmd {248			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?,249			Either::Right(cmd) => {250				run_nix_inner_stdout_ssh(str, cmd, &mut NixHandler::default()).await?251			}252		};253		Ok(String::from_utf8(bytes)?)254	}255	pub async fn run_nix(mut self) -> Result<()> {256		let str = self.clone().into_string();257		self.arg("--log-format").arg("internal-json");258		let cmd = self.wrap_sudo_if_needed().into_command()?;259		match cmd {260			Either::Left(mut cmd) => {261				cmd.stdout(Stdio::inherit());262				run_nix_inner(str, cmd, &mut NixHandler::default()).await263			}264			Either::Right(mut cmd) => {265				cmd.stdout(openssh::Stdio::inherit());266				run_nix_inner_ssh(str, cmd, &mut NixHandler::default()).await267			}268		}269	}270}271272struct EmptyAsyncRead;273impl AsyncRead for EmptyAsyncRead {274	fn poll_read(275		self: std::pin::Pin<&mut Self>,276		_cx: &mut std::task::Context<'_>,277		_buf: &mut tokio::io::ReadBuf<'_>,278	) -> Poll<std::io::Result<()>> {279		Poll::Pending280	}281}282283async fn run_nix_inner_stdout(284	str: String,285	cmd: Command,286	handler: &mut dyn Handler,287) -> Result<Vec<u8>> {288	Ok(run_nix_inner_raw(str, cmd, true, handler, None)289		.await?290		.expect("has out"))291}292async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {293	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;294	assert!(v.is_none());295	Ok(())296}297async fn run_nix_inner_stdout_ssh(298	str: String,299	cmd: OwningCommand<Arc<Session>>,300	handler: &mut dyn Handler,301) -> Result<Vec<u8>> {302	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)303		.await?304		.expect("has out"))305}306async fn run_nix_inner_ssh(307	str: String,308	cmd: OwningCommand<Arc<Session>>,309	handler: &mut dyn Handler,310) -> Result<()> {311	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;312	assert!(v.is_none());313	Ok(())314}315316async fn run_nix_inner_raw(317	str: String,318	mut cmd: Command,319	want_stdout: bool,320	err_handler: &mut dyn Handler,321	mut out_handler: Option<&mut dyn Handler>,322) -> Result<Option<Vec<u8>>> {323	cmd.stderr(Stdio::piped());324	cmd.stdout(Stdio::piped());325	debug!("running command {str:?} on local");326	let mut child = cmd.spawn()?;327	let mut stderr = child.stderr.take().unwrap();328	let stdout = child.stdout.take().unwrap();329	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());330	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));331	let mut ob = want_stdout332		.then(|| out.take().unwrap())333		.unwrap_or_else(|| Box::new(EmptyAsyncRead));334	let mut ol = (!want_stdout)335		.then(|| out.take().unwrap())336		.unwrap_or_else(|| Box::new(EmptyAsyncRead));337	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());338	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());339340	// while let Some(line) = read.next().await? {}341342	let mut out_buf = if want_stdout { Some(vec![]) } else { None };343	loop {344		select! {345			e = err.next() => {346				if let Some(e) = e {347					let e = e?;348					err_handler.handle_line(&e);349				}350			},351			o = ob.next() => {352				if let Some(o) = o {353					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);354				}355			},356			o = ol.next() => {357				if let Some(o) = o {358					let o = o?;359					if let Some(out) = out_handler.as_mut() {360						out.handle_line(&o)361					} else {362						err_handler.handle_line(&o)363					}364					// out_handler.handle_info(&o);365				}366			},367			code = child.wait() => {368				let code = code?;369				if !code.success() {370					anyhow::bail!("command '{str}' failed with status {}", code);371				}372				break;373			}374		}375	}376377	Ok(out_buf)378}379async fn run_nix_inner_raw_ssh(380	str: String,381	mut cmd: OwningCommand<Arc<Session>>,382	want_stdout: bool,383	err_handler: &mut dyn Handler,384	mut out_handler: Option<&mut dyn Handler>,385) -> Result<Option<Vec<u8>>> {386	debug!("running command {str:?} over ssh");387	cmd.stderr(openssh::Stdio::piped());388	cmd.stdout(openssh::Stdio::piped());389	let mut child = cmd.spawn().await?;390	let mut stderr = child.stderr().take().unwrap();391	let stdout = child.stdout().take().unwrap();392	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());393	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));394	let mut ob = want_stdout395		.then(|| out.take().unwrap())396		.unwrap_or_else(|| Box::new(EmptyAsyncRead));397	let mut ol = (!want_stdout)398		.then(|| out.take().unwrap())399		.unwrap_or_else(|| Box::new(EmptyAsyncRead));400	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());401	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());402403	// while let Some(line) = read.next().await? {}404405	let mut out_buf = if want_stdout { Some(vec![]) } else { None };406407	let mut wait_future = pin::pin!(child.wait());408	loop {409		select! {410			e = err.next() => {411				if let Some(e) = e {412					let e = e?;413					err_handler.handle_line(&e);414				}415			},416			o = ob.next() => {417				if let Some(o) = o {418					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);419				}420			},421			o = ol.next() => {422				if let Some(o) = o {423					let o = o?;424					if let Some(out) = out_handler.as_mut() {425						out.handle_line(&o)426					} else {427						err_handler.handle_line(&o)428					}429					// out_handler.handle_info(&o);430				}431			},432			code = &mut wait_future => {433				let code = code?;434				if !code.success() {435					anyhow::bail!("command '{str}' failed with status {}", code);436				}437				break;438			}439		}440	}441442	Ok(out_buf)443}
after · crates/fleet-base/src/command.rs
1use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};23use anyhow::{anyhow, Result};4use better_command::{Handler, NixHandler, PlainHandler};5use futures::StreamExt;6use itertools::Either;7use openssh::{OverSsh, OwningCommand, Session};8use serde::de::DeserializeOwned;9use tokio::{io::AsyncRead, process::Command, select};10use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tracing::debug;1213use crate::host::EscalationStrategy;1415fn escape_bash(input: &str, out: &mut String) {16	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";17	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {18		out.push_str(input);19		return;20	}21	out.push('\'');22	for (i, v) in input.split('\'').enumerate() {23		if i != 0 {24			out.push_str("'\"'\"'");25		}26		out.push_str(v);27	}28	out.push('\'');29}30fn ostoutf8(os: impl AsRef<OsStr>) -> String {31	os.as_ref().to_str().expect("non-utf8 data").to_owned()32}3334#[derive(Clone, Debug)]35pub struct MyCommand {36	command: String,37	args: Vec<String>,38	env: Vec<(String, String)>,39	ssh_session: Option<Arc<Session>>,40	escalation: EscalationStrategy,41	escalate: bool,42}43impl MyCommand {44	pub fn new_on(45		escalation: EscalationStrategy,46		cmd: impl AsRef<OsStr>,47		session: Arc<Session>,48	) -> Self {49		assert!(!cmd.as_ref().is_empty());50		Self {51			command: ostoutf8(cmd),52			args: vec![],53			env: vec![],54			ssh_session: Some(session),55			escalation,56			escalate: false,57		}58	}59	pub fn new(escalation: EscalationStrategy, cmd: impl AsRef<OsStr>) -> Self {60		assert!(!cmd.as_ref().is_empty());61		Self {62			command: ostoutf8(cmd),63			args: vec![],64			env: vec![],65			ssh_session: None,66			escalation,67			escalate: false,68		}69	}70	fn new_here(&self, cmd: impl AsRef<OsStr>) -> Self {71		if let Some(ssh_session) = self.ssh_session.clone() {72			Self::new_on(self.escalation, cmd, ssh_session)73		} else {74			Self::new(self.escalation, cmd)75		}76	}7778	fn into_args(self) -> Vec<String> {79		let mut out = Vec::new();80		if !self.env.is_empty() {81			out.push("env".to_owned());82			for (k, v) in self.env {83				assert!(!k.contains('='));84				out.push(format!("{k}={v}"));85			}86		}87		out.push(self.command);88		out.extend(self.args);89		out90	}9192	/// Translates environment variables into env command execution.93	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).94	///95	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.96	/// Figure out some way to transfer environment using stdio?97	fn translate_env_into_env(self) -> Self {98		if self.env.is_empty() {99			return self;100		}101		let mut out = self.new_here("env");102		for (k, v) in self.env {103			assert!(!k.contains('='));104			out.arg(format!("{k}={v}"));105		}106		out.arg(self.command);107		out.args(self.args);108109		out110	}111	fn into_string(self) -> String {112		let mut out = String::new();113		if !self.env.is_empty() {114			out.push_str("env");115			for (k, v) in self.env {116				out.push(' ');117				assert!(!k.contains('='));118				escape_bash(&k, &mut out);119				out.push('=');120				escape_bash(&v, &mut out);121			}122		}123		if !out.is_empty() {124			out.push(' ');125		}126		escape_bash(&self.command, &mut out);127		for arg in self.args {128			out.push(' ');129			escape_bash(&arg, &mut out);130		}131		out132	}133	fn into_command_unchecked_local(self) -> Command {134		let mut out = Command::new(self.command);135		out.args(self.args);136		for (k, v) in self.env {137			out.env(k, v);138		}139		out140	}141	fn into_command(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {142		Ok(if let Some(session) = self.ssh_session.clone() {143			let cmd = self.translate_env_into_env().into_command_unchecked_local();144			Either::Right(145				cmd.over_ssh(session)146					.map_err(|e| anyhow!("ssh error: {e}"))?,147			)148		} else {149			let cmd = self.into_command_unchecked_local();150			Either::Left(cmd)151		})152	}153	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {154		let arg = arg.as_ref();155		self.args.push(ostoutf8(arg));156		self157	}158	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {159		let arg = arg.as_ref();160		let value = value.as_ref();161		let arg = ostoutf8(arg);162		let value = ostoutf8(value);163		self.arg(format!("{arg}={value}"));164		self165	}166	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {167		self.arg(arg);168		self.arg(value);169		self170	}171	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {172		self.env173			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));174		self175	}176	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {177		for arg in args.into_iter() {178			let arg = arg.as_ref();179			self.args.push(ostoutf8(arg));180		}181		self182	}183	pub fn sudo(mut self) -> Self {184		self.escalate = true;185		self186	}187	fn wrap_sudo_if_needed(self) -> Self {188		if !self.escalate {189			return self;190		}191		match self.escalation {192			EscalationStrategy::Su => {193				let mut out = self.new_here("su");194				out.arg("-c").arg(self.into_string());195				out196			}197			EscalationStrategy::Sudo => {198				let mut out = self.new_here("sudo");199				out.args(self.into_args());200				out201			}202			EscalationStrategy::Run0 => {203				// run0 wants interactive authentication by default.204				let mut run0 = self.new_here("run0");205				let mut out = self.new_here("script");206207				// Red backgrounds messes with fleet formatting208				run0.arg("--background=");209				run0.args(self.into_args());210211				out.arg("-q");212				out.arg("/dev/null");213				out.arg("-c");214				out.arg(run0.into_string());215				dbg!(&out);216				out217			}218		}219	}220221	pub async fn run(self) -> Result<()> {222		let str = self.clone().into_string();223		let cmd = self.wrap_sudo_if_needed().into_command()?;224		match cmd {225			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,226			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,227		};228		Ok(())229	}230	pub async fn run_string(self) -> Result<String> {231		let bytes = self.run_bytes().await?;232		Ok(String::from_utf8(bytes)?)233	}234	pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {235		let v = self.run_string().await?;236		Ok(serde_json::from_str(&v)?)237	}238	pub async fn run_bytes(self) -> Result<Vec<u8>> {239		let str = self.clone().into_string();240		let cmd = self.wrap_sudo_if_needed().into_command()?;241		let v = match cmd {242			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,243			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,244		};245		Ok(v)246	}247248	pub async fn run_nix_string(mut self) -> Result<String> {249		let str = self.clone().into_string();250		self.arg("--log-format").arg("internal-json");251		let cmd = self.wrap_sudo_if_needed().into_command()?;252		let bytes = match cmd {253			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?,254			Either::Right(cmd) => {255				run_nix_inner_stdout_ssh(str, cmd, &mut NixHandler::default()).await?256			}257		};258		Ok(String::from_utf8(bytes)?)259	}260	pub async fn run_nix(mut self) -> Result<()> {261		let str = self.clone().into_string();262		self.arg("--log-format").arg("internal-json");263		let cmd = self.wrap_sudo_if_needed().into_command()?;264		match cmd {265			Either::Left(mut cmd) => {266				cmd.stdout(Stdio::inherit());267				run_nix_inner(str, cmd, &mut NixHandler::default()).await268			}269			Either::Right(mut cmd) => {270				cmd.stdout(openssh::Stdio::inherit());271				run_nix_inner_ssh(str, cmd, &mut NixHandler::default()).await272			}273		}274	}275}276277struct EmptyAsyncRead;278impl AsyncRead for EmptyAsyncRead {279	fn poll_read(280		self: std::pin::Pin<&mut Self>,281		_cx: &mut std::task::Context<'_>,282		_buf: &mut tokio::io::ReadBuf<'_>,283	) -> Poll<std::io::Result<()>> {284		Poll::Pending285	}286}287288async fn run_nix_inner_stdout(289	str: String,290	cmd: Command,291	handler: &mut dyn Handler,292) -> Result<Vec<u8>> {293	Ok(run_nix_inner_raw(str, cmd, true, handler, None)294		.await?295		.expect("has out"))296}297async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {298	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;299	assert!(v.is_none());300	Ok(())301}302async fn run_nix_inner_stdout_ssh(303	str: String,304	cmd: OwningCommand<Arc<Session>>,305	handler: &mut dyn Handler,306) -> Result<Vec<u8>> {307	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)308		.await?309		.expect("has out"))310}311async fn run_nix_inner_ssh(312	str: String,313	cmd: OwningCommand<Arc<Session>>,314	handler: &mut dyn Handler,315) -> Result<()> {316	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;317	assert!(v.is_none());318	Ok(())319}320321async fn run_nix_inner_raw(322	str: String,323	mut cmd: Command,324	want_stdout: bool,325	err_handler: &mut dyn Handler,326	mut out_handler: Option<&mut dyn Handler>,327) -> Result<Option<Vec<u8>>> {328	cmd.stderr(Stdio::piped());329	cmd.stdout(Stdio::piped());330	debug!("running command {str:?} on local");331	let mut child = cmd.spawn()?;332	let mut stderr = child.stderr.take().unwrap();333	let stdout = child.stdout.take().unwrap();334	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());335	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));336	let mut ob = want_stdout337		.then(|| out.take().unwrap())338		.unwrap_or_else(|| Box::new(EmptyAsyncRead));339	let mut ol = (!want_stdout)340		.then(|| out.take().unwrap())341		.unwrap_or_else(|| Box::new(EmptyAsyncRead));342	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());343	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());344345	// while let Some(line) = read.next().await? {}346347	let mut out_buf = if want_stdout { Some(vec![]) } else { None };348	loop {349		select! {350			e = err.next() => {351				if let Some(e) = e {352					let e = e?;353					err_handler.handle_line(&e);354				}355			},356			o = ob.next() => {357				if let Some(o) = o {358					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);359				}360			},361			o = ol.next() => {362				if let Some(o) = o {363					let o = o?;364					if let Some(out) = out_handler.as_mut() {365						out.handle_line(&o)366					} else {367						err_handler.handle_line(&o)368					}369					// out_handler.handle_info(&o);370				}371			},372			code = child.wait() => {373				let code = code?;374				if !code.success() {375					anyhow::bail!("command '{str}' failed with status {}", code);376				}377				break;378			}379		}380	}381382	Ok(out_buf)383}384async fn run_nix_inner_raw_ssh(385	str: String,386	mut cmd: OwningCommand<Arc<Session>>,387	want_stdout: bool,388	err_handler: &mut dyn Handler,389	mut out_handler: Option<&mut dyn Handler>,390) -> Result<Option<Vec<u8>>> {391	debug!("running command {str:?} over ssh");392	cmd.stderr(openssh::Stdio::piped());393	cmd.stdout(openssh::Stdio::piped());394	let mut child = cmd.spawn().await?;395	let mut stderr = child.stderr().take().unwrap();396	let stdout = child.stdout().take().unwrap();397	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());398	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));399	let mut ob = want_stdout400		.then(|| out.take().unwrap())401		.unwrap_or_else(|| Box::new(EmptyAsyncRead));402	let mut ol = (!want_stdout)403		.then(|| out.take().unwrap())404		.unwrap_or_else(|| Box::new(EmptyAsyncRead));405	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());406	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());407408	// while let Some(line) = read.next().await? {}409410	let mut out_buf = if want_stdout { Some(vec![]) } else { None };411412	let mut wait_future = pin::pin!(child.wait());413	loop {414		select! {415			e = err.next() => {416				if let Some(e) = e {417					let e = e?;418					err_handler.handle_line(&e);419				}420			},421			o = ob.next() => {422				if let Some(o) = o {423					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);424				}425			},426			o = ol.next() => {427				if let Some(o) = o {428					let o = o?;429					if let Some(out) = out_handler.as_mut() {430						out.handle_line(&o)431					} else {432						err_handler.handle_line(&o)433					}434					// out_handler.handle_info(&o);435				}436			},437			code = &mut wait_future => {438				let code = code?;439				if !code.success() {440					anyhow::bail!("command '{str}' failed with status {}", code);441				}442				break;443			}444		}445	}446447	Ok(out_buf)448}
modifiedcrates/fleet-base/src/host.rsdiffbeforeafterboth
--- a/crates/fleet-base/src/host.rs
+++ b/crates/fleet-base/src/host.rs
@@ -105,6 +105,14 @@
 		let path = cmd.run_string().await?;
 		Ok(path.trim_end().to_owned())
 	}
+	pub async fn file_exists(&self, path: impl AsRef<OsStr>) -> Result<bool> {
+		let mut cmd = self.cmd("sh").await?;
+		cmd.arg("-c")
+			.arg("test -e \"$1\" && echo true || echo false")
+			.arg("_")
+			.arg(path);
+		Ok(cmd.run_value().await?)
+	}
 	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
 		let mut cmd = self.cmd("cat").await?;
 		cmd.arg(path);
modifiedmodules/nixos/meta.nixdiffbeforeafterboth
--- a/modules/nixos/meta.nix
+++ b/modules/nixos/meta.nix
@@ -1,8 +1,17 @@
-{lib, ...}: let
+{ lib, ... }:
+let
   inherit (lib.modules) mkRemovedOptionModule;
-in {
+in
+{
   imports = [
-    (mkRemovedOptionModule ["tags"] "tags are now defined at the host level, not the nixos system level for fast filtering without evaluating unnecessary hosts.")
-    (mkRemovedOptionModule ["network"] "network is now defined at the host level, not the nixos system level")
+    (mkRemovedOptionModule [ "tags" ]
+      "tags are now defined at the host level, not the nixos system level for fast filtering without evaluating unnecessary hosts."
+    )
+    (mkRemovedOptionModule [
+      "network"
+    ] "network is now defined at the host level, not the nixos system level")
   ];
+
+  # Version of environment (fleet scripts such as rollback) already installed on the host
+  config.environment.etc.FLEET_HOST.text = "1";
 }