git.delta.rocks / jrsonnet / refs/commits / 3972fee37ee3

difftreelog

feat explicitly mark hosts as managed by fleet

Lach2025-04-05parent: #a1a72ce.patch.diff
in: trunk

7 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -924,6 +924,7 @@
  "hostname",
  "human-repr",
  "indicatif",
+ "indoc",
  "itertools 0.13.0",
  "nix-eval",
  "nixlike",
@@ -1537,6 +1538,12 @@
 ]
 
 [[package]]
+name = "indoc"
+version = "2.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd"
+
+[[package]]
 name = "inout"
 version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -47,6 +47,7 @@
 nix-eval.workspace = true
 nom = "7.1.3"
 fleet-base = { version = "0.1.0", path = "../../crates/fleet-base" }
+indoc = "2.0.6"
 
 [features]
 default = ["indicatif"]
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,6 +1,6 @@
-use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, time::Duration};
+use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, str::FromStr, time::Duration};
 
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, bail, Result};
 use clap::{Parser, ValueEnum};
 use fleet_base::{
 	host::{Config, ConfigHost},
@@ -132,6 +132,7 @@
 	disable_rollback: bool,
 ) -> Result<()> {
 	let mut failed = false;
+
 	// TODO: Lockfile, to prevent concurrent system switch?
 	// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback
 	// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
@@ -332,6 +333,24 @@
 	}
 }
 
+#[derive(Clone, PartialEq, Copy)]
+enum DeployKind {
+	// NixOS => NixOS managed by fleet
+	UpgradeToFleet,
+	// NixOS managed by fleet => NixOS managed by fleet
+	Fleet,
+}
+impl FromStr for DeployKind {
+	type Err = anyhow::Error;
+	fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+		match s {
+			"upgrade-to-fleet" => Ok(Self::UpgradeToFleet),
+			"fleet" => Ok(Self::Fleet),
+			v => bail!("unknown deploy_kind: {v}; expected on of \"upgrade-to-fleet\", \"fleet\""),
+		}
+	}
+}
+
 impl Deploy {
 	pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
 		let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
@@ -348,6 +367,8 @@
 			let local_host = config.local_host();
 			let opts = opts.clone();
 			let batch = batch.clone();
+			let mut deploy_kind: Option<DeployKind> =
+				opts.action_attr(&host, "deploy_kind").await?;
 
 			set.spawn_local(
 				(async move {
@@ -356,10 +377,40 @@
 						{
 							Ok(path) => path,
 							Err(e) => {
-								error!("failed to deploy host: {}", e);
+								error!("failed to build host system closure: {}", e);
 								return;
 							}
 						};
+					if deploy_kind == None {
+						let is_fleet_managed = match host.file_exists("/etc/FLEET_HOST").await {
+							Ok(v) => v,
+							Err(e) => {
+								error!("failed to query remote system kind: {}", e);
+								return;
+							},
+						};
+						if !is_fleet_managed {
+							error!(indoc::indoc!{"
+								host is not marked as managed by fleet
+								if you're not trying to lustrate/install system from scratch,
+								you should either
+									1. manually create /etc/FLEET_HOST file on the target host,
+									2. use ?deploy_kind=fleet host argument if you're upgrading from older version of fleet
+									3. use ?deploy_kind=upgrade_to_fleet if you're upgrading from plain nixos to fleet-managed nixos
+							"});
+							return;
+						}
+						deploy_kind = Some(DeployKind::Fleet);
+					}
+					let deploy_kind = deploy_kind.expect("deploy_kind is set");
+
+					// TODO: Make disable_rollback a host attribute instead
+					let mut disable_rollback = self.disable_rollback;
+					if !disable_rollback && deploy_kind != DeployKind::Fleet {
+						warn!("disabling rollback, as not supported by non-fleet deployment kinds");
+						disable_rollback = true;
+					}
+
 					if !opts.is_local(&hostname) {
 						info!("uploading system closure");
 						{
@@ -411,7 +462,7 @@
 							error!("unreachable? failed to get specialization");
 							return;
 						},
-						self.disable_rollback,
+						disable_rollback,
 					)
 					.await
 					{
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
before · cmds/fleet/src/main.rs
1#![recursion_limit = "512"]23pub(crate) mod cmds;4// pub(crate) mod command;5pub(crate) mod extra_args;67use std::{ffi::OsString, process::ExitCode};89use anyhow::{bail, Result};10use clap::{CommandFactory, Parser};11use cmds::{12	build_systems::{BuildSystems, Deploy},13	complete::Complete,14	info::Info,15	secrets::Secret,16	tf::Tf,17};18use fleet_base::{host::Config, opts::FleetOpts};19use futures::{future::LocalBoxFuture, stream::FuturesUnordered, TryStreamExt};20// use host::Config;21#[cfg(feature = "indicatif")]22use human_repr::HumanCount;23#[cfg(feature = "indicatif")]24use indicatif::{ProgressState, ProgressStyle};25use tracing::{error, info, info_span, Instrument};26#[cfg(feature = "indicatif")]27use tracing_indicatif::IndicatifLayer;28use tracing_subscriber::{prelude::*, EnvFilter};2930#[derive(Parser)]31struct Prefetch {}32impl Prefetch {33	async fn run(&self, config: &Config) -> Result<()> {34		let mut prefetch_dir = config.directory.to_path_buf();35		prefetch_dir.push("prefetch");36		if !prefetch_dir.is_dir() {37			info!("nothing to prefetch: no prefetch directory");38			return Ok(());39		}40		let tasks = <FuturesUnordered<LocalBoxFuture<Result<()>>>>::new();41		for entry in std::fs::read_dir(&prefetch_dir)? {42			tasks.push(Box::pin(async {43				let entry = entry?;44				if !entry.metadata()?.is_file() {45					bail!("only files should exist in prefetch directory");46				}47				let span = info_span!(48					"prefetching",49					name = entry.file_name().to_string_lossy().as_ref()50				);51				let mut path = OsString::new();52				path.push("file://");53				path.push(entry.path());5455				let mut status = config.local_host().cmd("nix").await?;56				status.args(&config.nix_args);57				status.arg("store").arg("prefetch-file").arg(path);58				status.run_nix_string().instrument(span).await?;59				Ok(())60			}));61		}62		tasks.try_collect::<Vec<()>>().await?;63		Ok(())64	}65}6667#[derive(Parser)]68enum Opts {69	/// Prepare systems for deployments70	BuildSystems(BuildSystems),7172	Deploy(Deploy),73	/// Secret management74	#[clap(subcommand)]75	Secret(Secret),76	/// Upload prefetch directory to the nix store77	Prefetch(Prefetch),78	/// Config parsing79	Info(Info),80	/// Command completions81	#[clap(hide(true))]82	Complete(Complete),83	/// Compile and evaluate terranix configuration84	Tf(Tf),85}8687#[derive(Parser)]88#[clap(version, author)]89struct RootOpts {90	#[clap(flatten)]91	fleet_opts: FleetOpts,92	#[clap(subcommand)]93	command: Opts,94}9596async fn run_command(config: &Config, opts: FleetOpts, command: Opts) -> Result<()> {97	match command {98		Opts::BuildSystems(c) => c.run(config, &opts).await?,99		Opts::Deploy(d) => d.run(config, &opts).await?,100		Opts::Secret(s) => s.run(config, &opts).await?,101		Opts::Info(i) => i.run(config).await?,102		Opts::Prefetch(p) => p.run(config).await?,103		Opts::Tf(t) => t.run(config).await?,104		// TODO: actually parse commands before starting the async runtime105		Opts::Complete(c) => {106			tokio::task::spawn_blocking(move || c.run(RootOpts::command())).await?107		}108	};109	Ok(())110}111112fn setup_logging() {113	#[cfg(feature = "indicatif")]114	let indicatif_layer = {115		use std::time::Duration;116117		IndicatifLayer::new().with_progress_style(118			ProgressStyle::with_template(119				"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",120			)121				.unwrap()122				.with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {123					let Some(len) = state.len() else {124						return;125					};126					let pos = state.pos();127					if pos > len {128						let _ = write!(writer, "{}", pos.human_count_bare());129					} else {130						let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());131					}132				})133				.with_key(134					"color_start",135					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {136						let elapsed = state.elapsed();137138						if elapsed > Duration::from_secs(60) {139							// Red140							let _ = write!(writer, "\x1b[{}m", 1 + 30);141						} else if elapsed > Duration::from_secs(30) {142							// Yellow143							let _ = write!(writer, "\x1b[{}m", 3 + 30);144						}145					},146				)147				.with_key(148					"color_end",149					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {150						if state.elapsed() > Duration::from_secs(30) {151							let _ = write!(writer, "\x1b[0m");152						}153					},154				),155		)156	};157158	let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));159160	let reg = tracing_subscriber::registry().with({161		let sub = tracing_subscriber::fmt::layer()162			.without_time()163			.with_target(false);164		#[cfg(feature = "indicatif")]165		let sub = sub.with_writer(indicatif_layer.get_stdout_writer());166		sub.with_filter(filter) // .without,167	});168	// #[cfg(feature = "indicatif")]169	#[cfg(feature = "indicatif")]170	let reg = reg.with(indicatif_layer);171	reg.init();172}173174fn main() -> ExitCode {175	let opts = RootOpts::parse();176	if let Opts::Complete(c) = &opts.command {177		c.run(RootOpts::command());178		return ExitCode::SUCCESS;179	}180181	setup_logging();182	async_main(opts)183}184185#[tokio::main]186async fn async_main(opts: RootOpts) -> ExitCode {187	if let Err(e) = main_real(opts).await {188		error!("{e:#}");189		return ExitCode::FAILURE;190	}191	ExitCode::SUCCESS192}193194async fn main_real(opts: RootOpts) -> Result<()> {195	nix_eval::init_tokio();196197	let nix_args = std::env::var_os("NIX_ARGS")198		.map(|a| extra_args::parse_os(&a))199		.transpose()?200		.unwrap_or_default();201	let config = opts202		.fleet_opts203		.build(204			nix_args,205			matches!(opts.command, Opts::Deploy(_) | Opts::BuildSystems(_)),206		)207		.await?;208209	match run_command(&config, opts.fleet_opts, opts.command).await {210		Ok(()) => {211			config.save()?;212			Ok(())213		}214		Err(e) => {215			let _ = config.save();216			Err(e)217		}218	}219}220221#[cfg(test)]222mod tests {223	use super::*;224225	#[test]226	fn verify_command() {227		use clap::CommandFactory;228		RootOpts::command().debug_assert();229	}230}
after · cmds/fleet/src/main.rs
1#![recursion_limit = "512"]23pub(crate) mod cmds;4// pub(crate) mod command;5pub(crate) mod extra_args;67use std::{ffi::OsString, process::ExitCode};89use anyhow::{bail, Result};10use clap::{CommandFactory, Parser};11use cmds::{12	build_systems::{BuildSystems, Deploy},13	complete::Complete,14	info::Info,15	secrets::Secret,16	tf::Tf,17};18use fleet_base::{host::Config, opts::FleetOpts};19use futures::{future::LocalBoxFuture, stream::FuturesUnordered, TryStreamExt};20// use host::Config;21#[cfg(feature = "indicatif")]22use human_repr::HumanCount;23#[cfg(feature = "indicatif")]24use indicatif::{ProgressState, ProgressStyle};25use tracing::{error, info, info_span, Instrument};26#[cfg(feature = "indicatif")]27use tracing_indicatif::IndicatifLayer;28use tracing_subscriber::{prelude::*, EnvFilter};2930#[derive(Parser)]31struct Prefetch {}32impl Prefetch {33	async fn run(&self, config: &Config) -> Result<()> {34		let mut prefetch_dir = config.directory.to_path_buf();35		prefetch_dir.push("prefetch");36		if !prefetch_dir.is_dir() {37			info!("nothing to prefetch: no prefetch directory");38			return Ok(());39		}40		let tasks = <FuturesUnordered<LocalBoxFuture<Result<()>>>>::new();41		for entry in std::fs::read_dir(&prefetch_dir)? {42			tasks.push(Box::pin(async {43				let entry = entry?;44				if !entry.metadata()?.is_file() {45					bail!("only files should exist in prefetch directory");46				}47				let span = info_span!(48					"prefetching",49					name = entry.file_name().to_string_lossy().as_ref()50				);51				let mut path = OsString::new();52				path.push("file://");53				path.push(entry.path());5455				let mut status = config.local_host().cmd("nix").await?;56				status.args(&config.nix_args);57				status.arg("store").arg("prefetch-file").arg(path);58				status.run_nix_string().instrument(span).await?;59				Ok(())60			}));61		}62		tasks.try_collect::<Vec<()>>().await?;63		Ok(())64	}65}6667#[derive(Parser)]68enum Opts {69	/// Build system closures70	BuildSystems(BuildSystems),71	/// Upload and switch system closures72	Deploy(Deploy),73	/// Secret management74	#[clap(subcommand)]75	Secret(Secret),76	/// Upload prefetch directory to the nix store77	Prefetch(Prefetch),78	/// Config parsing79	Info(Info),80	/// Command completions81	#[clap(hide(true))]82	Complete(Complete),83	/// Compile and evaluate terranix configuration84	Tf(Tf),85}8687#[derive(Parser)]88#[clap(version, author)]89struct RootOpts {90	#[clap(flatten)]91	fleet_opts: FleetOpts,92	#[clap(subcommand)]93	command: Opts,94}9596async fn run_command(config: &Config, opts: FleetOpts, command: Opts) -> Result<()> {97	match command {98		Opts::BuildSystems(c) => c.run(config, &opts).await?,99		Opts::Deploy(d) => d.run(config, &opts).await?,100		Opts::Secret(s) => s.run(config, &opts).await?,101		Opts::Info(i) => i.run(config).await?,102		Opts::Prefetch(p) => p.run(config).await?,103		Opts::Tf(t) => t.run(config).await?,104		// TODO: actually parse commands before starting the async runtime105		Opts::Complete(c) => {106			tokio::task::spawn_blocking(move || c.run(RootOpts::command())).await?107		}108	};109	Ok(())110}111112fn setup_logging() {113	#[cfg(feature = "indicatif")]114	let indicatif_layer = {115		use std::time::Duration;116117		IndicatifLayer::new().with_progress_style(118			ProgressStyle::with_template(119				"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",120			)121				.unwrap()122				.with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {123					let Some(len) = state.len() else {124						return;125					};126					let pos = state.pos();127					if pos > len {128						let _ = write!(writer, "{}", pos.human_count_bare());129					} else {130						let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());131					}132				})133				.with_key(134					"color_start",135					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {136						let elapsed = state.elapsed();137138						if elapsed > Duration::from_secs(60) {139							// Red140							let _ = write!(writer, "\x1b[{}m", 1 + 30);141						} else if elapsed > Duration::from_secs(30) {142							// Yellow143							let _ = write!(writer, "\x1b[{}m", 3 + 30);144						}145					},146				)147				.with_key(148					"color_end",149					|state: &ProgressState, writer: &mut dyn std::fmt::Write| {150						if state.elapsed() > Duration::from_secs(30) {151							let _ = write!(writer, "\x1b[0m");152						}153					},154				),155		)156	};157158	let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));159160	let reg = tracing_subscriber::registry().with({161		let sub = tracing_subscriber::fmt::layer()162			.without_time()163			.with_target(false);164		#[cfg(feature = "indicatif")]165		let sub = sub.with_writer(indicatif_layer.get_stdout_writer());166		sub.with_filter(filter) // .without,167	});168	// #[cfg(feature = "indicatif")]169	#[cfg(feature = "indicatif")]170	let reg = reg.with(indicatif_layer);171	reg.init();172}173174fn main() -> ExitCode {175	let opts = RootOpts::parse();176	if let Opts::Complete(c) = &opts.command {177		c.run(RootOpts::command());178		return ExitCode::SUCCESS;179	}180181	setup_logging();182	async_main(opts)183}184185#[tokio::main]186async fn async_main(opts: RootOpts) -> ExitCode {187	if let Err(e) = main_real(opts).await {188		error!("{e:#}");189		return ExitCode::FAILURE;190	}191	ExitCode::SUCCESS192}193194async fn main_real(opts: RootOpts) -> Result<()> {195	nix_eval::init_tokio();196197	let nix_args = std::env::var_os("NIX_ARGS")198		.map(|a| extra_args::parse_os(&a))199		.transpose()?200		.unwrap_or_default();201	let config = opts202		.fleet_opts203		.build(204			nix_args,205			matches!(opts.command, Opts::Deploy(_) | Opts::BuildSystems(_)),206		)207		.await?;208209	match run_command(&config, opts.fleet_opts, opts.command).await {210		Ok(()) => {211			config.save()?;212			Ok(())213		}214		Err(e) => {215			let _ = config.save();216			Err(e)217		}218	}219}220221#[cfg(test)]222mod tests {223	use super::*;224225	#[test]226	fn verify_command() {227		use clap::CommandFactory;228		RootOpts::command().debug_assert();229	}230}
modifiedcrates/fleet-base/src/command.rsdiffbeforeafterboth
--- a/crates/fleet-base/src/command.rs
+++ b/crates/fleet-base/src/command.rs
@@ -5,6 +5,7 @@
 use futures::StreamExt;
 use itertools::Either;
 use openssh::{OverSsh, OwningCommand, Session};
+use serde::de::DeserializeOwned;
 use tokio::{io::AsyncRead, process::Command, select};
 use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
 use tracing::debug;
@@ -230,6 +231,10 @@
 		let bytes = self.run_bytes().await?;
 		Ok(String::from_utf8(bytes)?)
 	}
+	pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {
+		let v = self.run_string().await?;
+		Ok(serde_json::from_str(&v)?)
+	}
 	pub async fn run_bytes(self) -> Result<Vec<u8>> {
 		let str = self.clone().into_string();
 		let cmd = self.wrap_sudo_if_needed().into_command()?;
modifiedcrates/fleet-base/src/host.rsdiffbeforeafterboth
--- a/crates/fleet-base/src/host.rs
+++ b/crates/fleet-base/src/host.rs
@@ -105,6 +105,14 @@
 		let path = cmd.run_string().await?;
 		Ok(path.trim_end().to_owned())
 	}
+	pub async fn file_exists(&self, path: impl AsRef<OsStr>) -> Result<bool> {
+		let mut cmd = self.cmd("sh").await?;
+		cmd.arg("-c")
+			.arg("test -e \"$1\" && echo true || echo false")
+			.arg("_")
+			.arg(path);
+		Ok(cmd.run_value().await?)
+	}
 	pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
 		let mut cmd = self.cmd("cat").await?;
 		cmd.arg(path);
modifiedmodules/nixos/meta.nixdiffbeforeafterboth
--- a/modules/nixos/meta.nix
+++ b/modules/nixos/meta.nix
@@ -1,8 +1,17 @@
-{lib, ...}: let
+{ lib, ... }:
+let
   inherit (lib.modules) mkRemovedOptionModule;
-in {
+in
+{
   imports = [
-    (mkRemovedOptionModule ["tags"] "tags are now defined at the host level, not the nixos system level for fast filtering without evaluating unnecessary hosts.")
-    (mkRemovedOptionModule ["network"] "network is now defined at the host level, not the nixos system level")
+    (mkRemovedOptionModule [ "tags" ]
+      "tags are now defined at the host level, not the nixos system level for fast filtering without evaluating unnecessary hosts."
+    )
+    (mkRemovedOptionModule [
+      "network"
+    ] "network is now defined at the host level, not the nixos system level")
   ];
+
+  # Version of environment (fleet scripts such as rollback) already installed on the host
+  config.environment.etc.FLEET_HOST.text = "1";
 }