difftreelog
feat explicitly mark hosts as managed by fleet
in: trunk
7 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -924,6 +924,7 @@
"hostname",
"human-repr",
"indicatif",
+ "indoc",
"itertools 0.13.0",
"nix-eval",
"nixlike",
@@ -1537,6 +1538,12 @@
]
[[package]]
+name = "indoc"
+version = "2.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd"
+
+[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -47,6 +47,7 @@
nix-eval.workspace = true
nom = "7.1.3"
fleet-base = { version = "0.1.0", path = "../../crates/fleet-base" }
+indoc = "2.0.6"
[features]
default = ["indicatif"]
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -1,6 +1,6 @@
-use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, time::Duration};
+use std::{env::current_dir, os::unix::fs::symlink, path::PathBuf, str::FromStr, time::Duration};
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, bail, Result};
use clap::{Parser, ValueEnum};
use fleet_base::{
host::{Config, ConfigHost},
@@ -132,6 +132,7 @@
disable_rollback: bool,
) -> Result<()> {
let mut failed = false;
+
// TODO: Lockfile, to prevent concurrent system switch?
// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback
// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
@@ -332,6 +333,24 @@
}
}
+#[derive(Clone, PartialEq, Copy)]
+enum DeployKind {
+ // NixOS => NixOS managed by fleet
+ UpgradeToFleet,
+ // NixOS managed by fleet => NixOS managed by fleet
+ Fleet,
+}
+impl FromStr for DeployKind {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+ match s {
+ "upgrade-to-fleet" => Ok(Self::UpgradeToFleet),
+ "fleet" => Ok(Self::Fleet),
+ v => bail!("unknown deploy_kind: {v}; expected on of \"upgrade-to-fleet\", \"fleet\""),
+ }
+ }
+}
+
impl Deploy {
pub async fn run(self, config: &Config, opts: &FleetOpts) -> Result<()> {
let hosts = opts.filter_skipped(config.list_hosts().await?).await?;
@@ -348,6 +367,8 @@
let local_host = config.local_host();
let opts = opts.clone();
let batch = batch.clone();
+ let mut deploy_kind: Option<DeployKind> =
+ opts.action_attr(&host, "deploy_kind").await?;
set.spawn_local(
(async move {
@@ -356,10 +377,40 @@
{
Ok(path) => path,
Err(e) => {
- error!("failed to deploy host: {}", e);
+ error!("failed to build host system closure: {}", e);
return;
}
};
+ if deploy_kind == None {
+ let is_fleet_managed = match host.file_exists("/etc/FLEET_HOST").await {
+ Ok(v) => v,
+ Err(e) => {
+ error!("failed to query remote system kind: {}", e);
+ return;
+ },
+ };
+ if !is_fleet_managed {
+ error!(indoc::indoc!{"
+ host is not marked as managed by fleet
+ if you're not trying to lustrate/install system from scratch,
+ you should either
+ 1. manually create /etc/FLEET_HOST file on the target host,
+ 2. use ?deploy_kind=fleet host argument if you're upgrading from older version of fleet
+ 3. use ?deploy_kind=upgrade_to_fleet if you're upgrading from plain nixos to fleet-managed nixos
+ "});
+ return;
+ }
+ deploy_kind = Some(DeployKind::Fleet);
+ }
+ let deploy_kind = deploy_kind.expect("deploy_kind is set");
+
+ // TODO: Make disable_rollback a host attribute instead
+ let mut disable_rollback = self.disable_rollback;
+ if !disable_rollback && deploy_kind != DeployKind::Fleet {
+ warn!("disabling rollback, as not supported by non-fleet deployment kinds");
+ disable_rollback = true;
+ }
+
if !opts.is_local(&hostname) {
info!("uploading system closure");
{
@@ -411,7 +462,7 @@
error!("unreachable? failed to get specialization");
return;
},
- self.disable_rollback,
+ disable_rollback,
)
.await
{
cmds/fleet/src/main.rsdiffbeforeafterboth--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -66,9 +66,9 @@
#[derive(Parser)]
enum Opts {
- /// Prepare systems for deployments
+ /// Build system closures
BuildSystems(BuildSystems),
-
+ /// Upload and switch system closures
Deploy(Deploy),
/// Secret management
#[clap(subcommand)]
crates/fleet-base/src/command.rsdiffbeforeafterboth1use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};23use anyhow::{anyhow, Result};4use better_command::{Handler, NixHandler, PlainHandler};5use futures::StreamExt;6use itertools::Either;7use openssh::{OverSsh, OwningCommand, Session};8use tokio::{io::AsyncRead, process::Command, select};9use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};10use tracing::debug;1112use crate::host::EscalationStrategy;1314fn escape_bash(input: &str, out: &mut String) {15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17 out.push_str(input);18 return;19 }20 out.push('\'');21 for (i, v) in input.split('\'').enumerate() {22 if i != 0 {23 out.push_str("'\"'\"'");24 }25 out.push_str(v);26 }27 out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30 os.as_ref().to_str().expect("non-utf8 data").to_owned()31}3233#[derive(Clone, Debug)]34pub struct MyCommand {35 command: String,36 args: Vec<String>,37 env: Vec<(String, String)>,38 ssh_session: Option<Arc<Session>>,39 escalation: EscalationStrategy,40 escalate: bool,41}42impl MyCommand {43 pub fn new_on(44 escalation: EscalationStrategy,45 cmd: impl AsRef<OsStr>,46 session: Arc<Session>,47 ) -> Self {48 assert!(!cmd.as_ref().is_empty());49 Self {50 command: ostoutf8(cmd),51 args: vec![],52 env: vec![],53 ssh_session: Some(session),54 escalation,55 escalate: false,56 }57 }58 pub fn new(escalation: EscalationStrategy, cmd: impl AsRef<OsStr>) -> Self {59 assert!(!cmd.as_ref().is_empty());60 Self {61 command: ostoutf8(cmd),62 args: vec![],63 env: vec![],64 ssh_session: None,65 escalation,66 escalate: false,67 }68 }69 fn new_here(&self, cmd: impl AsRef<OsStr>) -> Self {70 if let Some(ssh_session) = self.ssh_session.clone() {71 Self::new_on(self.escalation, cmd, ssh_session)72 } else {73 Self::new(self.escalation, cmd)74 }75 }7677 fn into_args(self) -> Vec<String> {78 let mut out = Vec::new();79 if !self.env.is_empty() {80 out.push("env".to_owned());81 for (k, v) in self.env {82 assert!(!k.contains('='));83 out.push(format!("{k}={v}"));84 }85 }86 out.push(self.command);87 out.extend(self.args);88 out89 }9091 /// Translates environment variables into env command execution.92 /// Required for ssh, as ssh don't allow to send environment variables (at least by default).93 ///94 /// FIXME: Insecure, as arguments might be seen by other users on the same machine.95 /// Figure out some way to transfer environment using stdio?96 fn translate_env_into_env(self) -> Self {97 if self.env.is_empty() {98 return self;99 }100 let mut out = self.new_here("env");101 for (k, v) in self.env {102 assert!(!k.contains('='));103 out.arg(format!("{k}={v}"));104 }105 out.arg(self.command);106 out.args(self.args);107108 out109 }110 fn into_string(self) -> String {111 let mut out = String::new();112 if !self.env.is_empty() {113 out.push_str("env");114 for (k, v) in self.env {115 out.push(' ');116 assert!(!k.contains('='));117 escape_bash(&k, &mut out);118 out.push('=');119 escape_bash(&v, &mut out);120 }121 }122 if !out.is_empty() {123 out.push(' ');124 }125 escape_bash(&self.command, &mut out);126 for arg in self.args {127 out.push(' ');128 escape_bash(&arg, &mut out);129 }130 out131 }132 fn into_command_unchecked_local(self) -> Command {133 let mut out = Command::new(self.command);134 out.args(self.args);135 for (k, v) in self.env {136 out.env(k, v);137 }138 out139 }140 fn into_command(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {141 Ok(if let Some(session) = self.ssh_session.clone() {142 let cmd = self.translate_env_into_env().into_command_unchecked_local();143 Either::Right(144 cmd.over_ssh(session)145 .map_err(|e| anyhow!("ssh error: {e}"))?,146 )147 } else {148 let cmd = self.into_command_unchecked_local();149 Either::Left(cmd)150 })151 }152 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {153 let arg = arg.as_ref();154 self.args.push(ostoutf8(arg));155 self156 }157 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158 let arg = arg.as_ref();159 let value = value.as_ref();160 let arg = ostoutf8(arg);161 let value = ostoutf8(value);162 self.arg(format!("{arg}={value}"));163 self164 }165 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {166 self.arg(arg);167 self.arg(value);168 self169 }170 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {171 self.env172 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));173 self174 }175 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {176 for arg in args.into_iter() {177 let arg = arg.as_ref();178 self.args.push(ostoutf8(arg));179 }180 self181 }182 pub fn sudo(mut self) -> Self {183 self.escalate = true;184 self185 }186 fn wrap_sudo_if_needed(self) -> Self {187 if !self.escalate {188 return self;189 }190 match self.escalation {191 EscalationStrategy::Su => {192 let mut out = self.new_here("su");193 out.arg("-c").arg(self.into_string());194 out195 }196 EscalationStrategy::Sudo => {197 let mut out = self.new_here("sudo");198 out.args(self.into_args());199 out200 }201 EscalationStrategy::Run0 => {202 // run0 wants interactive authentication by default.203 let mut run0 = self.new_here("run0");204 let mut out = self.new_here("script");205206 // Red backgrounds messes with fleet formatting207 run0.arg("--background=");208 run0.args(self.into_args());209210 out.arg("-q");211 out.arg("/dev/null");212 out.arg("-c");213 out.arg(run0.into_string());214 dbg!(&out);215 out216 }217 }218 }219220 pub async fn run(self) -> Result<()> {221 let str = self.clone().into_string();222 let cmd = self.wrap_sudo_if_needed().into_command()?;223 match cmd {224 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,225 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,226 };227 Ok(())228 }229 pub async fn run_string(self) -> Result<String> {230 let bytes = self.run_bytes().await?;231 Ok(String::from_utf8(bytes)?)232 }233 pub async fn run_bytes(self) -> Result<Vec<u8>> {234 let str = self.clone().into_string();235 let cmd = self.wrap_sudo_if_needed().into_command()?;236 let v = match cmd {237 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,238 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,239 };240 Ok(v)241 }242243 pub async fn run_nix_string(mut self) -> Result<String> {244 let str = self.clone().into_string();245 self.arg("--log-format").arg("internal-json");246 let cmd = self.wrap_sudo_if_needed().into_command()?;247 let bytes = match cmd {248 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?,249 Either::Right(cmd) => {250 run_nix_inner_stdout_ssh(str, cmd, &mut NixHandler::default()).await?251 }252 };253 Ok(String::from_utf8(bytes)?)254 }255 pub async fn run_nix(mut self) -> Result<()> {256 let str = self.clone().into_string();257 self.arg("--log-format").arg("internal-json");258 let cmd = self.wrap_sudo_if_needed().into_command()?;259 match cmd {260 Either::Left(mut cmd) => {261 cmd.stdout(Stdio::inherit());262 run_nix_inner(str, cmd, &mut NixHandler::default()).await263 }264 Either::Right(mut cmd) => {265 cmd.stdout(openssh::Stdio::inherit());266 run_nix_inner_ssh(str, cmd, &mut NixHandler::default()).await267 }268 }269 }270}271272struct EmptyAsyncRead;273impl AsyncRead for EmptyAsyncRead {274 fn poll_read(275 self: std::pin::Pin<&mut Self>,276 _cx: &mut std::task::Context<'_>,277 _buf: &mut tokio::io::ReadBuf<'_>,278 ) -> Poll<std::io::Result<()>> {279 Poll::Pending280 }281}282283async fn run_nix_inner_stdout(284 str: String,285 cmd: Command,286 handler: &mut dyn Handler,287) -> Result<Vec<u8>> {288 Ok(run_nix_inner_raw(str, cmd, true, handler, None)289 .await?290 .expect("has out"))291}292async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {293 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;294 assert!(v.is_none());295 Ok(())296}297async fn run_nix_inner_stdout_ssh(298 str: String,299 cmd: OwningCommand<Arc<Session>>,300 handler: &mut dyn Handler,301) -> Result<Vec<u8>> {302 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)303 .await?304 .expect("has out"))305}306async fn run_nix_inner_ssh(307 str: String,308 cmd: OwningCommand<Arc<Session>>,309 handler: &mut dyn Handler,310) -> Result<()> {311 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;312 assert!(v.is_none());313 Ok(())314}315316async fn run_nix_inner_raw(317 str: String,318 mut cmd: Command,319 want_stdout: bool,320 err_handler: &mut dyn Handler,321 mut out_handler: Option<&mut dyn Handler>,322) -> Result<Option<Vec<u8>>> {323 cmd.stderr(Stdio::piped());324 cmd.stdout(Stdio::piped());325 debug!("running command {str:?} on local");326 let mut child = cmd.spawn()?;327 let mut stderr = child.stderr.take().unwrap();328 let stdout = child.stdout.take().unwrap();329 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());330 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));331 let mut ob = want_stdout332 .then(|| out.take().unwrap())333 .unwrap_or_else(|| Box::new(EmptyAsyncRead));334 let mut ol = (!want_stdout)335 .then(|| out.take().unwrap())336 .unwrap_or_else(|| Box::new(EmptyAsyncRead));337 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());338 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());339340 // while let Some(line) = read.next().await? {}341342 let mut out_buf = if want_stdout { Some(vec![]) } else { None };343 loop {344 select! {345 e = err.next() => {346 if let Some(e) = e {347 let e = e?;348 err_handler.handle_line(&e);349 }350 },351 o = ob.next() => {352 if let Some(o) = o {353 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);354 }355 },356 o = ol.next() => {357 if let Some(o) = o {358 let o = o?;359 if let Some(out) = out_handler.as_mut() {360 out.handle_line(&o)361 } else {362 err_handler.handle_line(&o)363 }364 // out_handler.handle_info(&o);365 }366 },367 code = child.wait() => {368 let code = code?;369 if !code.success() {370 anyhow::bail!("command '{str}' failed with status {}", code);371 }372 break;373 }374 }375 }376377 Ok(out_buf)378}379async fn run_nix_inner_raw_ssh(380 str: String,381 mut cmd: OwningCommand<Arc<Session>>,382 want_stdout: bool,383 err_handler: &mut dyn Handler,384 mut out_handler: Option<&mut dyn Handler>,385) -> Result<Option<Vec<u8>>> {386 debug!("running command {str:?} over ssh");387 cmd.stderr(openssh::Stdio::piped());388 cmd.stdout(openssh::Stdio::piped());389 let mut child = cmd.spawn().await?;390 let mut stderr = child.stderr().take().unwrap();391 let stdout = child.stdout().take().unwrap();392 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());393 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));394 let mut ob = want_stdout395 .then(|| out.take().unwrap())396 .unwrap_or_else(|| Box::new(EmptyAsyncRead));397 let mut ol = (!want_stdout)398 .then(|| out.take().unwrap())399 .unwrap_or_else(|| Box::new(EmptyAsyncRead));400 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());401 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());402403 // while let Some(line) = read.next().await? {}404405 let mut out_buf = if want_stdout { Some(vec![]) } else { None };406407 let mut wait_future = pin::pin!(child.wait());408 loop {409 select! {410 e = err.next() => {411 if let Some(e) = e {412 let e = e?;413 err_handler.handle_line(&e);414 }415 },416 o = ob.next() => {417 if let Some(o) = o {418 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);419 }420 },421 o = ol.next() => {422 if let Some(o) = o {423 let o = o?;424 if let Some(out) = out_handler.as_mut() {425 out.handle_line(&o)426 } else {427 err_handler.handle_line(&o)428 }429 // out_handler.handle_info(&o);430 }431 },432 code = &mut wait_future => {433 let code = code?;434 if !code.success() {435 anyhow::bail!("command '{str}' failed with status {}", code);436 }437 break;438 }439 }440 }441442 Ok(out_buf)443}1use std::{ffi::OsStr, pin, process::Stdio, sync::Arc, task::Poll};23use anyhow::{anyhow, Result};4use better_command::{Handler, NixHandler, PlainHandler};5use futures::StreamExt;6use itertools::Either;7use openssh::{OverSsh, OwningCommand, Session};8use serde::de::DeserializeOwned;9use tokio::{io::AsyncRead, process::Command, select};10use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tracing::debug;1213use crate::host::EscalationStrategy;1415fn escape_bash(input: &str, out: &mut String) {16 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";17 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {18 out.push_str(input);19 return;20 }21 out.push('\'');22 for (i, v) in input.split('\'').enumerate() {23 if i != 0 {24 out.push_str("'\"'\"'");25 }26 out.push_str(v);27 }28 out.push('\'');29}30fn ostoutf8(os: impl AsRef<OsStr>) -> String {31 os.as_ref().to_str().expect("non-utf8 data").to_owned()32}3334#[derive(Clone, Debug)]35pub struct MyCommand {36 command: String,37 args: Vec<String>,38 env: Vec<(String, String)>,39 ssh_session: Option<Arc<Session>>,40 escalation: EscalationStrategy,41 escalate: bool,42}43impl MyCommand {44 pub fn new_on(45 escalation: EscalationStrategy,46 cmd: impl AsRef<OsStr>,47 session: Arc<Session>,48 ) -> Self {49 assert!(!cmd.as_ref().is_empty());50 Self {51 command: ostoutf8(cmd),52 args: vec![],53 env: vec![],54 ssh_session: Some(session),55 escalation,56 escalate: false,57 }58 }59 pub fn new(escalation: EscalationStrategy, cmd: impl AsRef<OsStr>) -> Self {60 assert!(!cmd.as_ref().is_empty());61 Self {62 command: ostoutf8(cmd),63 args: vec![],64 env: vec![],65 ssh_session: None,66 escalation,67 escalate: false,68 }69 }70 fn new_here(&self, cmd: impl AsRef<OsStr>) -> Self {71 if let Some(ssh_session) = self.ssh_session.clone() {72 Self::new_on(self.escalation, cmd, ssh_session)73 } else {74 Self::new(self.escalation, cmd)75 }76 }7778 fn into_args(self) -> Vec<String> {79 let mut out = Vec::new();80 if !self.env.is_empty() {81 out.push("env".to_owned());82 for (k, v) in self.env {83 assert!(!k.contains('='));84 out.push(format!("{k}={v}"));85 }86 }87 out.push(self.command);88 out.extend(self.args);89 out90 }9192 /// Translates environment variables into env command execution.93 /// Required for ssh, as ssh don't allow to send environment variables (at least by default).94 ///95 /// FIXME: Insecure, as arguments might be seen by other users on the same machine.96 /// Figure out some way to transfer environment using stdio?97 fn translate_env_into_env(self) -> Self {98 if self.env.is_empty() {99 return self;100 }101 let mut out = self.new_here("env");102 for (k, v) in self.env {103 assert!(!k.contains('='));104 out.arg(format!("{k}={v}"));105 }106 out.arg(self.command);107 out.args(self.args);108109 out110 }111 fn into_string(self) -> String {112 let mut out = String::new();113 if !self.env.is_empty() {114 out.push_str("env");115 for (k, v) in self.env {116 out.push(' ');117 assert!(!k.contains('='));118 escape_bash(&k, &mut out);119 out.push('=');120 escape_bash(&v, &mut out);121 }122 }123 if !out.is_empty() {124 out.push(' ');125 }126 escape_bash(&self.command, &mut out);127 for arg in self.args {128 out.push(' ');129 escape_bash(&arg, &mut out);130 }131 out132 }133 fn into_command_unchecked_local(self) -> Command {134 let mut out = Command::new(self.command);135 out.args(self.args);136 for (k, v) in self.env {137 out.env(k, v);138 }139 out140 }141 fn into_command(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {142 Ok(if let Some(session) = self.ssh_session.clone() {143 let cmd = self.translate_env_into_env().into_command_unchecked_local();144 Either::Right(145 cmd.over_ssh(session)146 .map_err(|e| anyhow!("ssh error: {e}"))?,147 )148 } else {149 let cmd = self.into_command_unchecked_local();150 Either::Left(cmd)151 })152 }153 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {154 let arg = arg.as_ref();155 self.args.push(ostoutf8(arg));156 self157 }158 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {159 let arg = arg.as_ref();160 let value = value.as_ref();161 let arg = ostoutf8(arg);162 let value = ostoutf8(value);163 self.arg(format!("{arg}={value}"));164 self165 }166 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {167 self.arg(arg);168 self.arg(value);169 self170 }171 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {172 self.env173 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));174 self175 }176 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {177 for arg in args.into_iter() {178 let arg = arg.as_ref();179 self.args.push(ostoutf8(arg));180 }181 self182 }183 pub fn sudo(mut self) -> Self {184 self.escalate = true;185 self186 }187 fn wrap_sudo_if_needed(self) -> Self {188 if !self.escalate {189 return self;190 }191 match self.escalation {192 EscalationStrategy::Su => {193 let mut out = self.new_here("su");194 out.arg("-c").arg(self.into_string());195 out196 }197 EscalationStrategy::Sudo => {198 let mut out = self.new_here("sudo");199 out.args(self.into_args());200 out201 }202 EscalationStrategy::Run0 => {203 // run0 wants interactive authentication by default.204 let mut run0 = self.new_here("run0");205 let mut out = self.new_here("script");206207 // Red backgrounds messes with fleet formatting208 run0.arg("--background=");209 run0.args(self.into_args());210211 out.arg("-q");212 out.arg("/dev/null");213 out.arg("-c");214 out.arg(run0.into_string());215 dbg!(&out);216 out217 }218 }219 }220221 pub async fn run(self) -> Result<()> {222 let str = self.clone().into_string();223 let cmd = self.wrap_sudo_if_needed().into_command()?;224 match cmd {225 Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,226 Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,227 };228 Ok(())229 }230 pub async fn run_string(self) -> Result<String> {231 let bytes = self.run_bytes().await?;232 Ok(String::from_utf8(bytes)?)233 }234 pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {235 let v = self.run_string().await?;236 Ok(serde_json::from_str(&v)?)237 }238 pub async fn run_bytes(self) -> Result<Vec<u8>> {239 let str = self.clone().into_string();240 let cmd = self.wrap_sudo_if_needed().into_command()?;241 let v = match cmd {242 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,243 Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,244 };245 Ok(v)246 }247248 pub async fn run_nix_string(mut self) -> Result<String> {249 let str = self.clone().into_string();250 self.arg("--log-format").arg("internal-json");251 let cmd = self.wrap_sudo_if_needed().into_command()?;252 let bytes = match cmd {253 Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?,254 Either::Right(cmd) => {255 run_nix_inner_stdout_ssh(str, cmd, &mut NixHandler::default()).await?256 }257 };258 Ok(String::from_utf8(bytes)?)259 }260 pub async fn run_nix(mut self) -> Result<()> {261 let str = self.clone().into_string();262 self.arg("--log-format").arg("internal-json");263 let cmd = self.wrap_sudo_if_needed().into_command()?;264 match cmd {265 Either::Left(mut cmd) => {266 cmd.stdout(Stdio::inherit());267 run_nix_inner(str, cmd, &mut NixHandler::default()).await268 }269 Either::Right(mut cmd) => {270 cmd.stdout(openssh::Stdio::inherit());271 run_nix_inner_ssh(str, cmd, &mut NixHandler::default()).await272 }273 }274 }275}276277struct EmptyAsyncRead;278impl AsyncRead for EmptyAsyncRead {279 fn poll_read(280 self: std::pin::Pin<&mut Self>,281 _cx: &mut std::task::Context<'_>,282 _buf: &mut tokio::io::ReadBuf<'_>,283 ) -> Poll<std::io::Result<()>> {284 Poll::Pending285 }286}287288async fn run_nix_inner_stdout(289 str: String,290 cmd: Command,291 handler: &mut dyn Handler,292) -> Result<Vec<u8>> {293 Ok(run_nix_inner_raw(str, cmd, true, handler, None)294 .await?295 .expect("has out"))296}297async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {298 let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;299 assert!(v.is_none());300 Ok(())301}302async fn run_nix_inner_stdout_ssh(303 str: String,304 cmd: OwningCommand<Arc<Session>>,305 handler: &mut dyn Handler,306) -> Result<Vec<u8>> {307 Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)308 .await?309 .expect("has out"))310}311async fn run_nix_inner_ssh(312 str: String,313 cmd: OwningCommand<Arc<Session>>,314 handler: &mut dyn Handler,315) -> Result<()> {316 let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;317 assert!(v.is_none());318 Ok(())319}320321async fn run_nix_inner_raw(322 str: String,323 mut cmd: Command,324 want_stdout: bool,325 err_handler: &mut dyn Handler,326 mut out_handler: Option<&mut dyn Handler>,327) -> Result<Option<Vec<u8>>> {328 cmd.stderr(Stdio::piped());329 cmd.stdout(Stdio::piped());330 debug!("running command {str:?} on local");331 let mut child = cmd.spawn()?;332 let mut stderr = child.stderr.take().unwrap();333 let stdout = child.stdout.take().unwrap();334 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());335 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));336 let mut ob = want_stdout337 .then(|| out.take().unwrap())338 .unwrap_or_else(|| Box::new(EmptyAsyncRead));339 let mut ol = (!want_stdout)340 .then(|| out.take().unwrap())341 .unwrap_or_else(|| Box::new(EmptyAsyncRead));342 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());343 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());344345 // while let Some(line) = read.next().await? {}346347 let mut out_buf = if want_stdout { Some(vec![]) } else { None };348 loop {349 select! {350 e = err.next() => {351 if let Some(e) = e {352 let e = e?;353 err_handler.handle_line(&e);354 }355 },356 o = ob.next() => {357 if let Some(o) = o {358 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);359 }360 },361 o = ol.next() => {362 if let Some(o) = o {363 let o = o?;364 if let Some(out) = out_handler.as_mut() {365 out.handle_line(&o)366 } else {367 err_handler.handle_line(&o)368 }369 // out_handler.handle_info(&o);370 }371 },372 code = child.wait() => {373 let code = code?;374 if !code.success() {375 anyhow::bail!("command '{str}' failed with status {}", code);376 }377 break;378 }379 }380 }381382 Ok(out_buf)383}384async fn run_nix_inner_raw_ssh(385 str: String,386 mut cmd: OwningCommand<Arc<Session>>,387 want_stdout: bool,388 err_handler: &mut dyn Handler,389 mut out_handler: Option<&mut dyn Handler>,390) -> Result<Option<Vec<u8>>> {391 debug!("running command {str:?} over ssh");392 cmd.stderr(openssh::Stdio::piped());393 cmd.stdout(openssh::Stdio::piped());394 let mut child = cmd.spawn().await?;395 let mut stderr = child.stderr().take().unwrap();396 let stdout = child.stdout().take().unwrap();397 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());398 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));399 let mut ob = want_stdout400 .then(|| out.take().unwrap())401 .unwrap_or_else(|| Box::new(EmptyAsyncRead));402 let mut ol = (!want_stdout)403 .then(|| out.take().unwrap())404 .unwrap_or_else(|| Box::new(EmptyAsyncRead));405 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());406 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());407408 // while let Some(line) = read.next().await? {}409410 let mut out_buf = if want_stdout { Some(vec![]) } else { None };411412 let mut wait_future = pin::pin!(child.wait());413 loop {414 select! {415 e = err.next() => {416 if let Some(e) = e {417 let e = e?;418 err_handler.handle_line(&e);419 }420 },421 o = ob.next() => {422 if let Some(o) = o {423 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);424 }425 },426 o = ol.next() => {427 if let Some(o) = o {428 let o = o?;429 if let Some(out) = out_handler.as_mut() {430 out.handle_line(&o)431 } else {432 err_handler.handle_line(&o)433 }434 // out_handler.handle_info(&o);435 }436 },437 code = &mut wait_future => {438 let code = code?;439 if !code.success() {440 anyhow::bail!("command '{str}' failed with status {}", code);441 }442 break;443 }444 }445 }446447 Ok(out_buf)448}crates/fleet-base/src/host.rsdiffbeforeafterboth--- a/crates/fleet-base/src/host.rs
+++ b/crates/fleet-base/src/host.rs
@@ -105,6 +105,14 @@
let path = cmd.run_string().await?;
Ok(path.trim_end().to_owned())
}
+ pub async fn file_exists(&self, path: impl AsRef<OsStr>) -> Result<bool> {
+ let mut cmd = self.cmd("sh").await?;
+ cmd.arg("-c")
+ .arg("test -e \"$1\" && echo true || echo false")
+ .arg("_")
+ .arg(path);
+ Ok(cmd.run_value().await?)
+ }
pub async fn read_file_bin(&self, path: impl AsRef<OsStr>) -> Result<Vec<u8>> {
let mut cmd = self.cmd("cat").await?;
cmd.arg(path);
modules/nixos/meta.nixdiffbeforeafterboth--- a/modules/nixos/meta.nix
+++ b/modules/nixos/meta.nix
@@ -1,8 +1,17 @@
-{lib, ...}: let
+{ lib, ... }:
+let
inherit (lib.modules) mkRemovedOptionModule;
-in {
+in
+{
imports = [
- (mkRemovedOptionModule ["tags"] "tags are now defined at the host level, not the nixos system level for fast filtering without evaluating unnecessary hosts.")
- (mkRemovedOptionModule ["network"] "network is now defined at the host level, not the nixos system level")
+ (mkRemovedOptionModule [ "tags" ]
+ "tags are now defined at the host level, not the nixos system level for fast filtering without evaluating unnecessary hosts."
+ )
+ (mkRemovedOptionModule [
+ "network"
+ ] "network is now defined at the host level, not the nixos system level")
];
+
+ # Version of environment (fleet scripts such as rollback) already installed on the host
+ config.environment.etc.FLEET_HOST.text = "1";
}