difftreelog
feat explicitly mark hosts as managed by fleet
in: trunk
7 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -924,6 +924,7 @@
"hostname",
"human-repr",
"indicatif",
+ "indoc",
"itertools 0.13.0",
"nix-eval",
"nixlike",
@@ -1537,6 +1538,12 @@
]
[[package]]
+name = "indoc"
+version = "2.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd"
+
+[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -47,6 +47,7 @@
nix-eval.workspace = true
nom = "7.1.3"
fleet-base = { version = "0.1.0", path = "../../crates/fleet-base" }
+indoc = "2.0.6"
[features]
default = ["indicatif"]
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,6 +1,6 @@
-use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, time::Duration};
+use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, str::FromStr, time::Duration};
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, bail, Result};
use clap::{Parser, ValueEnum};
use fleet_base::{
host::{Config, ConfigHost},
@@ -132,6 +132,7 @@
disable_rollback: bool,
) -> Result<()> {
let mut failed = false;
+
// TODO: Lockfile, to prevent concurrent system switch?
// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback
// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
@@ -332,6 +333,24 @@
}
}
+#[derive(Clone, PartialEq, Copy)]
+enum DeployKind {
+ // NixOS => NixOS managed by fleet
+ UpgradeToFleet,
+ // NixOS managed by fleet => NixOS managed by fleet
+ Fleet,
+}
+impl FromStr for DeployKind {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+ match s {
+ "upgrade-to-fleet" => Ok(Self::UpgradeToFleet),
+ "fleet" => Ok(Self::Fleet),
+ v => bail!("unknown deploy_kind: {v}; expected on of \"upgrade-to-fleet\", \"fleet\""),
+ }
+ }
+}
+
impl Deploy {
pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
@@ -348,6 +367,8 @@
let local_host = config.local_host();
let opts = opts.clone();
let batch = batch.clone();
+ let mut deploy_kind: Option<DeployKind> =
+ opts.action_attr(&host, "deploy_kind").await?;
set.spawn_local(
(async move {
@@ -356,10 +377,40 @@
{
Ok(path) => path,
Err(e) => {
- error!("failed to deploy host: {}", e);
+ error!("failed to build host system closure: {}", e);
return;
}
};
+ if deploy_kind == None {
+ let is_fleet_managed = match host.file_exists("/etc/FLEET_HOST").await {
+ Ok(v) => v,
+ Err(e) => {
+ error!("failed to query remote system kind: {}", e);
+ return;
+ },
+ };
+ if !is_fleet_managed {
+ error!(indoc::indoc!{"
+ host is not marked as managed by fleet
+ if you're not trying to lustrate/install system from scratch,
+ you should either
+ 1. manually create /etc/FLEET_HOST file on the target host,
+ 2. use ?deploy_kind=fleet host argument if you're upgrading from older version of fleet
+ 3. use ?deploy_kind=upgrade_to_fleet if you're upgrading from plain nixos to fleet-managed nixos
+ "});
+ return;
+ }
+ deploy_kind = Some(DeployKind::Fleet);
+ }
+ let deploy_kind = deploy_kind.expect("deploy_kind is set");
+
+ // TODO: Make disable_rollback a host attribute instead
+ let mut disable_rollback = self.disable_rollback;
+ if !disable_rollback && deploy_kind != DeployKind::Fleet {
+ warn!("disabling rollback, as not supported by non-fleet deployment kinds");
+ disable_rollback = true;
+ }
+
if !opts.is_local(&hostname) {
info!("uploading system closure");
{
@@ -411,7 +462,7 @@
error!("unreachable? failed to get specialization");
return;
},
- self.disable_rollback,
+ disable_rollback,
)
.await
{
cmds/fleet/src/main.rsdiffbeforeafterboth1#![recursion_limit = "512"]23pub(crate) mod cmds;4// pub(crate) mod command;5pub(crate) mod extra_args;67use std::{ffi::OsString, process::ExitCode};89use anyhow::{bail, Result};10use clap::{CommandFactory, Parser};11use cmds::{12 build_systems::{BuildSystems, Deploy},13 complete::Complete,14 info::Info,15 secrets::Secret,16 tf::Tf,17};18use fleet_base::{host::Config, opts::FleetOpts};19use futures::{future::LocalBoxFuture, stream::FuturesUnordered, TryStreamExt};20// use host::Config;21#[cfg(feature = "indicatif")]22use human_repr::HumanCount;23#[cfg(feature = "indicatif")]24use indicatif::{ProgressState, ProgressStyle};25use tracing::{error, info, info_span, Instrument};26#[cfg(feature = "indicatif")]27use tracing_indicatif::IndicatifLayer;28use tracing_subscriber::{prelude::*, EnvFilter};2930#[derive(Parser)]31struct Prefetch {}32impl Prefetch {33 async fn run(&self, config: &Config) -> Result<()> {34 let mut prefetch_dir = config.directory.to_path_buf();35 prefetch_dir.push("prefetch");36 if !prefetch_dir.is_dir() {37 info!("nothing to prefetch: no prefetch directory");38 return Ok(());39 }40 let tasks = <FuturesUnordered<LocalBoxFuture<Result<()>>>>::new();41 for entry in std::fs::read_dir(&prefetch_dir)? {42 tasks.push(Box::pin(async {43 let entry = entry?;44 if !entry.metadata()?.is_file() {45 bail!("only files should exist in prefetch directory");46 }47 let span = info_span!(48 "prefetching",49 name = entry.file_name().to_string_lossy().as_ref()50 );51 let mut path = OsString::new();52 path.push("file://");53 path.push(entry.path());5455 let mut status = config.local_host().cmd("nix").await?;56 status.args(&config.nix_args);57 status.arg("store").arg("prefetch-file").arg(path);58 status.run_nix_string().instrument(span).await?;59 Ok(())60 }));61 }62 tasks.try_collect::<Vec<()>>().await?;63 Ok(())64 }65}6667#[derive(Parser)]68enum Opts {69 /// Prepare systems for deployments70 BuildSystems(BuildSystems),7172 Deploy(Deploy),73 /// Secret management74 #[clap(subcommand)]75 Secret(Secret),76 /// Upload prefetch directory to the nix store77 Prefetch(Prefetch),78 /// Config parsing79 Info(Info),80 /// Command completions81 #[clap(hide(true))]82 Complete(Complete),83 /// Compile and evaluate terranix configuration84 Tf(Tf),85}8687#[derive(Parser)]88#[clap(version, author)]89struct RootOpts {90 #[clap(flatten)]91 fleet_opts: FleetOpts,92 #[clap(subcommand)]93 command: Opts,94}9596async fn run_command(config: &Config, opts: FleetOpts, command: Opts) -> Result<()> {97 match command {98 Opts::BuildSystems(c) => c.run(config, &opts).await?,99 Opts::Deploy(d) => d.run(config, &opts).await?,100 Opts::Secret(s) => s.run(config, &opts).await?,101 Opts::Info(i) => i.run(config).await?,102 Opts::Prefetch(p) => p.run(config).await?,103 Opts::Tf(t) => t.run(config).await?,104 // TODO: actually parse commands before starting the async runtime105 Opts::Complete(c) => {106 tokio::task::spawn_blocking(move || c.run(RootOpts::command())).await?107 }108 };109 Ok(())110}111112fn setup_logging() {113 #[cfg(feature = "indicatif")]114 let indicatif_layer = {115 use std::time::Duration;116117 IndicatifLayer::new().with_progress_style(118 ProgressStyle::with_template(119 "{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",120 )121 .unwrap()122 .with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {123 let Some(len) = state.len() else {124 return;125 };126 let pos = state.pos();127 if pos > len {128 let _ = write!(writer, "{}", pos.human_count_bare());129 } else {130 let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());131 }132 })133 .with_key(134 "color_start",135 |state: &ProgressState, writer: &mut dyn std::fmt::Write| {136 let elapsed = state.elapsed();137138 if elapsed > Duration::from_secs(60) {139 // Red140 let _ = write!(writer, "\x1b[{}m", 1 + 30);141 } else if elapsed > Duration::from_secs(30) {142 // Yellow143 let _ = write!(writer, "\x1b[{}m", 3 + 30);144 }145 },146 )147 .with_key(148 "color_end",149 |state: &ProgressState, writer: &mut dyn std::fmt::Write| {150 if state.elapsed() > Duration::from_secs(30) {151 let _ = write!(writer, "\x1b[0m");152 }153 },154 ),155 )156 };157158 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));159160 let reg = tracing_subscriber::registry().with({161 let sub = tracing_subscriber::fmt::layer()162 .without_time()163 .with_target(false);164 #[cfg(feature = "indicatif")]165 let sub = sub.with_writer(indicatif_layer.get_stdout_writer());166 sub.with_filter(filter) // .without,167 });168 // #[cfg(feature = "indicatif")]169 #[cfg(feature = "indicatif")]170 let reg = reg.with(indicatif_layer);171 reg.init();172}173174fn main() -> ExitCode {175 let opts = RootOpts::parse();176 if let Opts::Complete(c) = &opts.command {177 c.run(RootOpts::command());178 return ExitCode::SUCCESS;179 }180181 setup_logging();182 async_main(opts)183}184185#[tokio::main]186async fn async_main(opts: RootOpts) -> ExitCode {187 if let Err(e) = main_real(opts).await {188 error!("{e:#}");189 return ExitCode::FAILURE;190 }191 ExitCode::SUCCESS192}193194async fn main_real(opts: RootOpts) -> Result<()> {195 nix_eval::init_tokio();196197 let nix_args = std::env::var_os("NIX_ARGS")198 .map(|a| extra_args::parse_os(&a))199 .transpose()?200 .unwrap_or_default();201 let config = opts202 .fleet_opts203 .build(204 nix_args,205 matches!(opts.command, Opts::Deploy(_) | Opts::BuildSystems(_)),206 )207 .await?;208209 match run_command(&config, opts.fleet_opts, opts.command).await {210 Ok(()) => {211 config.save()?;212 Ok(())213 }214 Err(e) => {215 let _ = config.save();216 Err(e)217 }218 }219}220221#[cfg(test)]222mod tests {223 use super::*;224225 #[test]226 fn verify_command() {227 use clap::CommandFactory;228 RootOpts::command().debug_assert();229 }230}1#![recursion_limit = "512"]23pub(crate) mod cmds;4// pub(crate) mod command;5pub(crate) mod extra_args;67use std::{ffi::OsString, process::ExitCode};89use anyhow::{bail, Result};10use clap::{CommandFactory, Parser};11use cmds::{12 build_systems::{BuildSystems, Deploy},13 complete::Complete,14 info::Info,15 secrets::Secret,16 tf::Tf,17};18use fleet_base::{host::Config, opts::FleetOpts};19use futures::{future::LocalBoxFuture, stream::FuturesUnordered, TryStreamExt};20// use host::Config;21#[cfg(feature = "indicatif")]22use human_repr::HumanCount;23#[cfg(feature = "indicatif")]24use indicatif::{ProgressState, ProgressStyle};25use tracing::{error, info, info_span, Instrument};26#[cfg(feature = "indicatif")]27use tracing_indicatif::IndicatifLayer;28use tracing_subscriber::{prelude::*, EnvFilter};2930#[derive(Parser)]31struct Prefetch {}32impl Prefetch {33 async fn run(&self, config: &Config) -> Result<()> {34 let mut prefetch_dir = config.directory.to_path_buf();35 prefetch_dir.push("prefetch");36 if !prefetch_dir.is_dir() {37 info!("nothing to prefetch: no prefetch directory");38 return Ok(());39 }40 let tasks = <FuturesUnordered<LocalBoxFuture<Result<()>>>>::new();41 for entry in std::fs::read_dir(&prefetch_dir)? {42 tasks.push(Box::pin(async {43 let entry = entry?;44 if !entry.metadata()?.is_file() {45 bail!("only files should exist in prefetch directory");46 }47 let span = info_span!(48 "prefetching",49 name = entry.file_name().to_string_lossy().as_ref()50 );51 let mut path = OsString::new();52 path.push("file://");53 path.push(entry.path());5455 let mut status = config.local_host().cmd("nix").await?;56 status.args(&config.nix_args);57 status.arg("store").arg("prefetch-file").arg(path);58 status.run_nix_string().instrument(span).await?;59 Ok(())60 }));61 }62 tasks.try_collect::<Vec<()>>().await?;63 Ok(())64 }65}6667#[derive(Parser)]68enum Opts {69 /// Build system closures70 BuildSystems(BuildSystems),71 /// Upload and switch system closures72 Deploy(Deploy),73 /// Secret management74 #[clap(subcommand)]75 Secret(Secret),76 /// Upload prefetch directory to the nix store77 Prefetch(Prefetch),78 /// Config parsing79 Info(Info),80 /// Command completions81 #[clap(hide(true))]82 Complete(Complete),83 /// Compile and evaluate terranix configuration84 Tf(Tf),85}8687#[derive(Parser)]88#[clap(version, author)]89struct RootOpts {90 #[clap(flatten)]91 fleet_opts: FleetOpts,92 #[clap(subcommand)]93 command: Opts,94}9596async fn run_command(config: &Config, opts: FleetOpts, command: Opts) -> Result<()> {97 match command {98 Opts::BuildSystems(c) => c.run(config, &opts).await?,99 Opts::Deploy(d) => d.run(config, &opts).await?,100 Opts::Secret(s) => s.run(config, &opts).await?,101 Opts::Info(i) => i.run(config).await?,102 Opts::Prefetch(p) => p.run(config).await?,103 Opts::Tf(t) => t.run(config).await?,104 // TODO: actually parse commands before starting the async runtime105 Opts::Complete(c) => {106 tokio::task::spawn_blocking(move || c.run(RootOpts::command())).await?107 }108 };109 Ok(())110}111112fn setup_logging() {113 #[cfg(feature = "indicatif")]114 let indicatif_layer = {115 use std::time::Duration;116117 IndicatifLayer::new().with_progress_style(118 ProgressStyle::with_template(119 "{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{download_progress} {elapsed}{color_end}",120 )121 .unwrap()122 .with_key("download_progress", |state: &ProgressState, writer: &mut dyn std::fmt::Write| {123 let Some(len) = state.len() else {124 return;125 };126 let pos = state.pos();127 if pos > len {128 let _ = write!(writer, "{}", pos.human_count_bare());129 } else {130 let _ = write!(writer, "{} / {}", pos.human_count_bare(), len.human_count_bare());131 }132 })133 .with_key(134 "color_start",135 |state: &ProgressState, writer: &mut dyn std::fmt::Write| {136 let elapsed = state.elapsed();137138 if elapsed > Duration::from_secs(60) {139 // Red140 let _ = write!(writer, "\x1b[{}m", 1 + 30);141 } else if elapsed > Duration::from_secs(30) {142 // Yellow143 let _ = write!(writer, "\x1b[{}m", 3 + 30);144 }145 },146 )147 .with_key(148 "color_end",149 |state: &ProgressState, writer: &mut dyn std::fmt::Write| {150 if state.elapsed() > Duration::from_secs(30) {151 let _ = write!(writer, "\x1b[0m");152 }153 },154 ),155 )156 };157158 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));159160 let reg = tracing_subscriber::registry().with({161 let sub = tracing_subscriber::fmt::layer()162 .without_time()163 .with_target(false);164 #[cfg(feature = "indicatif")]165 let sub = sub.with_writer(indicatif_layer.get_stdout_writer());166 sub.with_filter(filter) // .without,167 });168 // #[cfg(feature = "indicatif")]169 #[cfg(feature = "indicatif")]170 let reg = reg.with(indicatif_layer);171 reg.init();172}173174fn main() -> ExitCode {175 let opts = RootOpts::parse();176 if let Opts::Complete(c) = &opts.command {177 c.run(RootOpts::command());178 return ExitCode::SUCCESS;179 }180181 setup_logging();182 async_main(opts)183}184185#[tokio::main]186async fn async_main(opts: RootOpts) -> ExitCode {187 if let Err(e) = main_real(opts).await {188 error!("{e:#}");189 return ExitCode::FAILURE;190 }191 ExitCode::SUCCESS192}193194async fn main_real(opts: RootOpts) -> Result<()> {195 nix_eval::init_tokio();196197 let nix_args = std::env::var_os("NIX_ARGS")198 .map(|a| extra_args::parse_os(&a))199 .transpose()?200 .unwrap_or_default();201 let config = opts202 .fleet_opts203 .build(204 nix_args,205 matches!(opts.command, Opts::Deploy(_) | Opts::BuildSystems(_)),206 )207 .await?;208209 match run_command(&config, opts.fleet_opts, opts.command).await {210 Ok(()) => {211 config.save()?;212 Ok(())213 }214 Err(e) => {215 let _ = config.save();216 Err(e)217 }218 }219}220221#[cfg(test)]222mod tests {223 use super::*;224225 #[test]226 fn verify_command() {227 use clap::CommandFactory;228 RootOpts::command().debug_assert();229 }230}crates/fleet-base/src/command.rsdiffbeforeafterboth--- a/crates/fleet-base/src/command.rs
+++ b/crates/fleet-base/src/command.rs
@@ -5,6 +5,7 @@
use futures::StreamExt;
use itertools::Either;
use openssh::{OverSsh, OwningCommand, Session};
+use serde::de::DeserializeOwned;
use tokio::{io::AsyncRead, process::Command, select};
use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
use tracing::debug;
@@ -230,6 +231,10 @@
let bytes = self.run_bytes().await?;
Ok(String::from_utf8(bytes)?)
}
+ pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {
+ let v = self.run_string().await?;
+ Ok(serde_json::from_str(&v)?)
+ }
pub async fn run_bytes(self) -> Result<Vec<u8>> {
let str = self.clone().into_string();
let cmd = self.wrap_sudo_if_needed().into_command()?;
crates/fleet-base/src/host.rsdiffbeforeafterboth--- a/crates/fleet-base/src/host.rs
+++ b/crates/fleet-base/src/host.rs
@@ -105,6 +105,14 @@
let path = cmd.run_string().await?;
Ok(path.trim_end().to_owned())
}
+ pub async fn file_exists(&self, path: impl AsRef<OsStr>) -> Result<bool> {
+ let mut cmd = self.cmd("sh").await?;
+ cmd.arg("-c")
+ .arg("test -e \"$1\" && echo true || echo false")
+ .arg("_")
+ .arg(path);
+ Ok(cmd.run_value().await?)
+ }
pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
let mut cmd = self.cmd("cat").await?;
cmd.arg(path);
modules/nixos/meta.nixdiffbeforeafterboth--- a/modules/nixos/meta.nix
+++ b/modules/nixos/meta.nix
@@ -1,8 +1,17 @@
-{lib, ...}: let
+{ lib, ... }:
+let
inherit (lib.modules) mkRemovedOptionModule;
-in {
+in
+{
imports = [
- (mkRemovedOptionModule ["tags"] "tags are now defined at the host level, not the nixos system level for fast filtering without evaluating unnecessary hosts.")
- (mkRemovedOptionModule ["network"] "network is now defined at the host level, not the nixos system level")
+ (mkRemovedOptionModule [ "tags" ]
+ "tags are now defined at the host level, not the nixos system level for fast filtering without evaluating unnecessary hosts."
+ )
+ (mkRemovedOptionModule [
+ "network"
+ ] "network is now defined at the host level, not the nixos system level")
];
+
+ # Version of environment (fleet scripts such as rollback) already installed on the host
+ config.environment.etc.FLEET_HOST.text = "1";
}