difftreelog
feat explicitly mark hosts as managed by fleet
in: trunk
7 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -924,6 +924,7 @@
"hostname",
"human-repr",
"indicatif",
+ "indoc",
"itertools 0.13.0",
"nix-eval",
"nixlike",
@@ -1537,6 +1538,12 @@
]
[[package]]
+name = "indoc"
+version = "2.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd"
+
+[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -47,6 +47,7 @@
nix-eval.workspace = true
nom = "7.1.3"
fleet-base = { version = "0.1.0", path = "../../crates/fleet-base" }
+indoc = "2.0.6"
[features]
default = ["indicatif"]
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,6 +1,6 @@
-use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, time::Duration};
+use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, str::FromStr, time::Duration};
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, bail, Result};
use clap::{Parser, ValueEnum};
use fleet_base::{
host::{Config, ConfigHost},
@@ -132,6 +132,7 @@
disable_rollback: bool,
) -> Result<()> {
let mut failed = false;
+
// TODO: Lockfile, to prevent concurrent system switch?
// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback
// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
@@ -332,6 +333,24 @@
}
}
+#[derive(Clone, PartialEq, Copy)]
+enum DeployKind {
+ // NixOS => NixOS managed by fleet
+ UpgradeToFleet,
+ // NixOS managed by fleet => NixOS managed by fleet
+ Fleet,
+}
+impl FromStr for DeployKind {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+ match s {
+ "upgrade-to-fleet" => Ok(Self::UpgradeToFleet),
+ "fleet" => Ok(Self::Fleet),
+ v => bail!("unknown deploy_kind: {v}; expected on of \"upgrade-to-fleet\", \"fleet\""),
+ }
+ }
+}
+
impl Deploy {
pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
@@ -348,6 +367,8 @@
let local_host = config.local_host();
let opts = opts.clone();
let batch = batch.clone();
+ let mut deploy_kind: Option<DeployKind> =
+ opts.action_attr(&host, "deploy_kind").await?;
set.spawn_local(
(async move {
@@ -356,10 +377,40 @@
{
Ok(path) => path,
Err(e) => {
- error!("failed to deploy host: {}", e);
+ error!("failed to build host system closure: {}", e);
return;
}
};
+ if deploy_kind == None {
+ let is_fleet_managed = match host.file_exists("/etc/FLEET_HOST").await {
+ Ok(v) => v,
+ Err(e) => {
+ error!("failed to query remote system kind: {}", e);
+ return;
+ },
+ };
+ if !is_fleet_managed {
+ error!(indoc::indoc!{"
+ host is not marked as managed by fleet
+ if you're not trying to lustrate/install system from scratch,
+ you should either
+ 1. manually create /etc/FLEET_HOST file on the target host,
+ 2. use ?deploy_kind=fleet host argument if you're upgrading from older version of fleet
+ 3. use ?deploy_kind=upgrade_to_fleet if you're upgrading from plain nixos to fleet-managed nixos
+ "});
+ return;
+ }
+ deploy_kind = Some(DeployKind::Fleet);
+ }
+ let deploy_kind = deploy_kind.expect("deploy_kind is set");
+
+ // TODO: Make disable_rollback a host attribute instead
+ let mut disable_rollback = self.disable_rollback;
+ if !disable_rollback && deploy_kind != DeployKind::Fleet {
+ warn!("disabling rollback, as not supported by non-fleet deployment kinds");
+ disable_rollback = true;
+ }
+
if !opts.is_local(&hostname) {
info!("uploading system closure");
{
@@ -411,7 +462,7 @@
error!("unreachable? failed to get specialization");
return;
},
- self.disable_rollback,
+ disable_rollback,
)
.await
{
cmds/fleet/src/main.rsdiffbeforeafterboth--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -66,9 +66,9 @@
#[derive(Parser)]
enum Opts {
- /// Prepare systems for deployments
+ /// Build system closures
BuildSystems(BuildSystems),
-
+ /// Upload and switch system closures
Deploy(Deploy),
/// Secret management
#[clap(subcommand)]
crates/fleet-base/src/command.rsdiffbeforeafterboth1use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};23use anyhow::{anyhow, Result};4use better_command::{Handler, NixHandler, PlainHandler};5use futures::StreamExt;6use itertools::Either;7use openssh::{OverSsh, OwningCommand, Session};8use tokio::{io::AsyncRead, process::Command, select};9use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};10use tracing::debug;1112use crate::host::EscalationStrategy;1314fn escape_bash(input: &str, out: &mut String) {15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17 out.push_str(input);18 return;19 }20 out.push('\'');21 for (i, v) in input.split('\'').enumerate() {22 if i != 0 {23 out.push_str("'\"'\"'");24 }25 out.push_str(v);26 }27 out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30 os.as_ref().to_str().expect("non-utf8 data").to_owned()31}3233#[derive(Clone, Debug)]34pub struct MyCommand {35 command: String,36 args: Vec<String>,37 env: Vec<(String, String)>,38 ssh_session: Option<Arc<Session>>,39 escalation: EscalationStrategy,40 escalate: bool,41}42impl MyCommand {43 pub fn new_on(44 escalation: EscalationStrategy,45 cmd: impl AsRef<OsStr>,46 session: Arc<Session>,47 ) -> Self {48 assert!(!cmd.as_ref().is_empty());49 Self {50 command: ostoutf8(cmd),51 args: vec![],52 env: vec![],53 ssh_session: Some(session),54 escalation,55 escalate: false,56 }57 }58 pub fn new(escalation: EscalationStrategy, cmd: impl AsRef<OsStr>) -> Self {59 assert!(!cmd.as_ref().is_empty());60 Self {61 command: ostoutf8(cmd),62 args: vec![],63 env: vec![],64 ssh_session: None,65 escalation,66 escalate: false,67 }68 }69 fn new_here(&self, cmd: impl AsRef<OsStr>) -> Self {70 if let Some(ssh_session) = self.ssh_session.clone() {71 Self::new_on(self.escalation, cmd, ssh_session)72 } else {73 Self::new(self.escalation, cmd)74 }75 }7677 fn into_args(self) -> Vec<String> {78 let mut out = Vec::new();79 if !self.env.is_empty() {80 out.push("env".to_owned());81 for (k, v) in self.env {82 assert!(!k.contains('='));83 out.push(format!("{k}={v}"));84 }85 }86 out.push(self.command);87 out.extend(self.args);88 out89 }9091 /// Translates environment variables into env command execution.92 /// Required for ssh, as ssh don't allow to send environment variables (at least by default).93 ///94 /// FIXME: Insecure, as arguments might be seen by other users on the same machine.95 /// Figure out some way to transfer environment using stdio?96 fn translate_env_into_env(self) -> Self {97 if self.env.is_empty() {98 return self;99 }100 let mut out = self.new_here("env");101 for (k, v) in self.env {102 assert!(!k.contains('='));103 out.arg(format!("{k}={v}"));104 }105 out.arg(self.command);106 out.args(self.args);107108 out109 }110 fn into_string(self) -> String {111 let mut out = String::new();112 if !self.env.is_empty() {113 out.push_str("env");114 for (k, v) in self.env {115 out.push(' ');116 assert!(!k.contains('='));117 escape_bash(&k, &mut out);118 out.push('=');119 escape_bash(&v, &mut out);120 }121 }122 if !out.is_empty() {123 out.push(' ');124 }125 escape_bash(&self.command, &mut out);126 for arg in self.args {127 out.push(' ');128 escape_bash(&arg, &mut out);129 }130 out131 }132 fn into_command_unchecked_local(self) -> Command {133 let mut out = Command::new(self.command);134 out.args(self.args);135 for (k, v) in self.env {136 out.env(k, v);137 }138 out139 }140 fn into_command(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {141 Ok(if let Some(session) = self.ssh_session.clone() {142 let cmd = self.translate_env_into_env().into_command_unchecked_local();143 Either::Right(144 cmd.over_ssh(session)145 .map_err(|e| anyhow!("ssh error: {e}"))?,146 )147 } else {148 let cmd = self.into_command_unchecked_local();149 Either::Left(cmd)150 })151 }152 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {153 let arg = arg.as_ref();154 self.args.push(ostoutf8(arg));155 self156 }157 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158 let arg = arg.as_ref();159 let value = value.as_ref();160 let arg = ostoutf8(arg);161 let value = ostoutf8(value);162 self.arg(format!("{arg}={value}"));163 self164 }165 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {166 self.arg(arg);167 self.arg(value);168 self169 }170 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {171 self.env172 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));173 self174 }175 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {176 for arg in args.into_iter() {177 let arg = arg.as_ref();178 self.args.push(ostoutf8(arg));179 }180 self181 }182 pub fn sudo(mut self) -> Self {183 self.escalate = true;184 self185 }186 fn wrap_sudo_if_needed(self) -> Self {187 if !self.escalate {188 return self;189 }190 match self.escalation {191 EscalationStrategy::Su => {192 let mut out = self.new_here("su");193 out.arg("-c").arg(self.into_string());194 out195 }196 EscalationStrategy::Sudo => {197 let mut out = self.new_here("sudo");198 out.args(self.into_args());199 out200 }201 EscalationStrategy::Run0 => {202 // run0 wants interactive authentication by default.203 let mut run0 = self.new_here("run0");204 let mut out = self.new_here("script");205206 // Red backgrounds messes with fleet formatting207 run0.arg("--background=");208 run0.args(self.into_args());209210 out.arg("-q");211 out.arg("/dev/null");212 out.arg("-c");213 out.arg(run0.into_string());214 dbg!(&out);215 out216 }217 }218 }219220 pub async fn run(self) -> Result<()> {221 let str = self.clone().into_string();222 let cmd = self.wrap_sudo_if_needed().into_command()?;223 match cmd {224 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,225 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,226 };227 Ok(())228 }229 pub async fn run_string(self) -> Result<String> {230 let bytes = self.run_bytes().await?;231 Ok(String::from_utf8(bytes)?)232 }233 pub async fn run_bytes(self) -> Result<Vec<u8>> {234 let str = self.clone().into_string();235 let cmd = self.wrap_sudo_if_needed().into_command()?;236 let v = match cmd {237 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,238 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,239 };240 Ok(v)241 }242243 pub async fn run_nix_string(mut self) -> Result<String> {244 let str = self.clone().into_string();245 self.arg("--log-format").arg("internal-json");246 let cmd = self.wrap_sudo_if_needed().into_command()?;247 let bytes = match cmd {248 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?,249 Either::Right(cmd) => {250 run_nix_inner_stdout_ssh(str, cmd, &mut NixHandler::default()).await?251 }252 };253 Ok(String::from_utf8(bytes)?)254 }255 pub async fn run_nix(mut self) -> Result<()> {256 let str = self.clone().into_string();257 self.arg("--log-format").arg("internal-json");258 let cmd = self.wrap_sudo_if_needed().into_command()?;259 match cmd {260 Either::Left(mut cmd) => {261 cmd.stdout(Stdio::inherit());262 run_nix_inner(str, cmd, &mut NixHandler::default()).await263 }264 Either::Right(mut cmd) => {265 cmd.stdout(openssh::Stdio::inherit());266 run_nix_inner_ssh(str, cmd, &mut NixHandler::default()).await267 }268 }269 }270}271272struct EmptyAsyncRead;273impl AsyncRead for EmptyAsyncRead {274 fn poll_read(275 self: std::pin::Pin<&mut Self>,276 _cx: &mut std::task::Context<'_>,277 _buf: &mut tokio::io::ReadBuf<'_>,278 ) -> Poll<std::io::Result<()>> {279 Poll::Pending280 }281}282283async fn run_nix_inner_stdout(284 str: String,285 cmd: Command,286 handler: &mut dyn Handler,287) -> Result<Vec<u8>> {288 Ok(run_nix_inner_raw(str, cmd, true, handler, None)289 .await?290 .expect("has out"))291}292async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {293 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;294 assert!(v.is_none());295 Ok(())296}297async fn run_nix_inner_stdout_ssh(298 str: String,299 cmd: OwningCommand<Arc<Session>>,300 handler: &mut dyn Handler,301) -> Result<Vec<u8>> {302 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)303 .await?304 .expect("has out"))305}306async fn run_nix_inner_ssh(307 str: String,308 cmd: OwningCommand<Arc<Session>>,309 handler: &mut dyn Handler,310) -> Result<()> {311 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;312 assert!(v.is_none());313 Ok(())314}315316async fn run_nix_inner_raw(317 str: String,318 mut cmd: Command,319 want_stdout: bool,320 err_handler: &mut dyn Handler,321 mut out_handler: Option<&mut dyn Handler>,322) -> Result<Option<Vec<u8>>> {323 cmd.stderr(Stdio::piped());324 cmd.stdout(Stdio::piped());325 debug!("running command {str:?} on local");326 let mut child = cmd.spawn()?;327 let mut stderr = child.stderr.take().unwrap();328 let stdout = child.stdout.take().unwrap();329 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());330 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));331 let mut ob = want_stdout332 .then(|| out.take().unwrap())333 .unwrap_or_else(|| Box::new(EmptyAsyncRead));334 let mut ol = (!want_stdout)335 .then(|| out.take().unwrap())336 .unwrap_or_else(|| Box::new(EmptyAsyncRead));337 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());338 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());339340 // while let Some(line) = read.next().await? {}341342 let mut out_buf = if want_stdout { Some(vec![]) } else { None };343 loop {344 select! {345 e = err.next() => {346 if let Some(e) = e {347 let e = e?;348 err_handler.handle_line(&e);349 }350 },351 o = ob.next() => {352 if let Some(o) = o {353 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);354 }355 },356 o = ol.next() => {357 if let Some(o) = o {358 let o = o?;359 if let Some(out) = out_handler.as_mut() {360 out.handle_line(&o)361 } else {362 err_handler.handle_line(&o)363 }364 // out_handler.handle_info(&o);365 }366 },367 code = child.wait() => {368 let code = code?;369 if !code.success() {370 anyhow::bail!("command '{str}' failed with status {}", code);371 }372 break;373 }374 }375 }376377 Ok(out_buf)378}379async fn run_nix_inner_raw_ssh(380 str: String,381 mut cmd: OwningCommand<Arc<Session>>,382 want_stdout: bool,383 err_handler: &mut dyn Handler,384 mut out_handler: Option<&mut dyn Handler>,385) -> Result<Option<Vec<u8>>> {386 debug!("running command {str:?} over ssh");387 cmd.stderr(openssh::Stdio::piped());388 cmd.stdout(openssh::Stdio::piped());389 let mut child = cmd.spawn().await?;390 let mut stderr = child.stderr().take().unwrap();391 let stdout = child.stdout().take().unwrap();392 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());393 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));394 let mut ob = want_stdout395 .then(|| out.take().unwrap())396 .unwrap_or_else(|| Box::new(EmptyAsyncRead));397 let mut ol = (!want_stdout)398 .then(|| out.take().unwrap())399 .unwrap_or_else(|| Box::new(EmptyAsyncRead));400 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());401 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());402403 // while let Some(line) = read.next().await? {}404405 let mut out_buf = if want_stdout { Some(vec![]) } else { None };406407 let mut wait_future = pin::pin!(child.wait());408 loop {409 select! {410 e = err.next() => {411 if let Some(e) = e {412 let e = e?;413 err_handler.handle_line(&e);414 }415 },416 o = ob.next() => {417 if let Some(o) = o {418 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);419 }420 },421 o = ol.next() => {422 if let Some(o) = o {423 let o = o?;424 if let Some(out) = out_handler.as_mut() {425 out.handle_line(&o)426 } else {427 err_handler.handle_line(&o)428 }429 // out_handler.handle_info(&o);430 }431 },432 code = &mut wait_future => {433 let code = code?;434 if !code.success() {435 anyhow::bail!("command '{str}' failed with status {}", code);436 }437 break;438 }439 }440 }441442 Ok(out_buf)443}crates/fleet-base/src/host.rsdiffbeforeafterboth--- a/crates/fleet-base/src/host.rs
+++ b/crates/fleet-base/src/host.rs
@@ -105,6 +105,14 @@
let path = cmd.run_string().await?;
Ok(path.trim_end().to_owned())
}
+ pub async fn file_exists(&self, path: impl AsRef<OsStr>) -> Result<bool> {
+ let mut cmd = self.cmd("sh").await?;
+ cmd.arg("-c")
+ .arg("test -e \"$1\" && echo true || echo false")
+ .arg("_")
+ .arg(path);
+ Ok(cmd.run_value().await?)
+ }
pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
let mut cmd = self.cmd("cat").await?;
cmd.arg(path);
modules/nixos/meta.nixdiffbeforeafterboth--- a/modules/nixos/meta.nix
+++ b/modules/nixos/meta.nix
@@ -1,8 +1,17 @@
-{lib, ...}: let
+{ lib, ... }:
+let
inherit (lib.modules) mkRemovedOptionModule;
-in {
+in
+{
imports = [
- (mkRemovedOptionModule ["tags"] "tags are now defined at the host level, not the nixos system level for fast filtering without evaluating unnecessary hosts.")
- (mkRemovedOptionModule ["network"] "network is now defined at the host level, not the nixos system level")
+ (mkRemovedOptionModule [ "tags" ]
+ "tags are now defined at the host level, not the nixos system level for fast filtering without evaluating unnecessary hosts."
+ )
+ (mkRemovedOptionModule [
+ "network"
+ ] "network is now defined at the host level, not the nixos system level")
];
+
+ # Version of environment (fleet scripts such as rollback) already installed on the host
+ config.environment.etc.FLEET_HOST.text = "1";
}