difftreelog
refactor perform build using nix repl
in: trunk
8 files changed
cmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -1,5 +1,7 @@
+use std::collections::HashMap;
use std::ffi::{OsStr, OsString};
-use std::fmt::Display;
+use std::fmt::{self, Display};
+use std::path::PathBuf;
use std::process::Stdio;
use std::sync::{Arc, OnceLock};
@@ -8,7 +10,7 @@
use itertools::Itertools;
use r2d2::{Pool, PooledConnection};
use serde::de::DeserializeOwned;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::select;
@@ -72,14 +74,20 @@
// s.split('\n').filter(|s| !s.trim().is_empty()).map(|v| v.)
// }
if !self.collected.is_empty() {
- bail!("{}", self.collected.iter().map(|v| {
- if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {
- let v = unindent::unindent(f.trim_start());
- v.trim().to_owned()
- } else {
- v.to_owned()
- }
- }).join("\n"));
+ bail!(
+ "{}",
+ self.collected
+ .iter()
+ .map(|v| {
+ if let Some(f) = v.strip_prefix("\u{1b}[31;1merror:\u{1b}[0m ") {
+ let v = unindent::unindent(f.trim_start());
+ v.trim().to_owned()
+ } else {
+ v.to_owned()
+ }
+ })
+ .join("\n")
+ );
}
Ok(())
}
@@ -150,6 +158,13 @@
}
}
+struct WarnHandler;
+impl Handler for WarnHandler {
+ fn handle_line(&mut self, e: &str) {
+ warn!(target: "nix", "{e}")
+ }
+}
+
impl NixSessionInner {
async fn new(flake: &OsStr, extra_args: impl IntoIterator<Item = &OsStr>) -> Result<Self> {
let mut cmd = Command::new("nix");
@@ -174,12 +189,13 @@
stdin.flush().await?;
let nix_handler = NixHandler::default();
let mut full_delimiter = None;
+ let mut errors = vec![];
while let Some(line) = out.next().await {
let line = match line {
OutputLine::Out(o) => o,
OutputLine::Err(_e) => {
// Handle startup errors, but skip repl hello?
- //nix_handler.handle_line(&e);
+ errors.push(_e);
continue;
}
};
@@ -190,6 +206,9 @@
}
}
let Some(full_delimiter) = full_delimiter else {
+ for e in errors {
+ error!("{e}");
+ }
bail!("failed to discover delimiter");
};
let mut res = Self {
@@ -342,21 +361,93 @@
#[derive(Clone)]
pub struct NixSession(Arc<tokio::sync::Mutex<PooledConnection<NixSessionPoolInner>>>);
+#[macro_export]
+macro_rules! nix_path {
+ (@o($o:ident) $var:ident $($tt:tt)*) => {{
+ $o.push(Index::var(stringify!($var)));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) . $var:ident $($tt:tt)*) => {{
+ $o.push(Index::attr(stringify!($var)));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) . $var:literal $($tt:tt)*) => {{
+ $o.push(Index::attr($var));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) . { $var:expr } $($tt:tt)*) => {{
+ $o.push(Index::attr($var));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) [ $var:literal ] $($tt:tt)*) => {{
+ $o.push(Index::idx($var));
+ nix_path!(@o($o) $($tt)*);
+ }};
+ (@o($o:ident) ($e:expr) $($tt:tt)*) => {
+ $o.push(Index::apply($e));
+ nix_path!(@o($o) $($tt)*);
+ };
+ (@o($o:ident)) => {};
+ ($($tt:tt)+) => {{
+ use $crate::{nix_path, better_nix_eval::Index};
+ let mut out = vec![];
+ nix_path!(@o(out) $($tt)*);
+ out
+ }}
+}
+
#[derive(Clone)]
-enum Index {
+pub enum Index {
+ Var(String),
String(String),
- // Idx(u32),
+ Apply(String),
+ Idx(u32),
}
+impl Index {
+ pub fn var(v: impl AsRef<str>) -> Self {
+ let v = v.as_ref();
+ assert!(
+ !(v.contains('.') | v.contains(' ')),
+ "bad variable name: {v}"
+ );
+ Self::Var(v.to_owned())
+ }
+ pub fn attr(v: impl AsRef<str>) -> Self {
+ Self::String(v.as_ref().to_owned())
+ }
+ pub fn idx(v: u32) -> Self {
+ Self::Idx(v)
+ }
+ pub fn apply(v: impl Serialize) -> Self {
+ let serialized = nixlike::serialize(v).expect("invalid value for apply");
+ Self::Apply(serialized)
+ }
+}
impl Display for Index {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
+ Index::Var(v) => {
+ write!(f, "{v}")
+ }
Index::String(k) => {
let v = nixlike::format_identifier(k.as_str());
write!(f, ".{v}")
}
+ Index::Apply(o) => {
+ let v = nixlike::serialize(o).map_err(|_| fmt::Error)?;
+ write!(f, "<apply>({v})")
+ }
+ Index::Idx(i) => {
+ write!(f, "[{i}]")
+ }
}
}
}
+impl fmt::Debug for Index {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{self}")
+ }
+}
struct PathDisplay<'i>(&'i [Index]);
impl Display for PathDisplay<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -381,43 +472,49 @@
}
}
pub async fn field(session: NixSession, field: &str) -> Result<Self> {
- Self::root(session).get_field_deep([field]).await
+ Self::root(session)
+ .select([Index::var(field)])
+ .await
}
pub async fn get_json_deep<'a, V: DeserializeOwned>(
&self,
- name: impl IntoIterator<Item = &'a str>,
+ name: impl IntoIterator<Item = Index>,
) -> Result<V> {
- let field = self.get_field_deep(name).await?;
+ let field = self.select(name).await?;
field.as_json().await
}
- pub async fn get_field(&self, name: &str) -> Result<Self> {
- self.get_field_deep([name]).await
- }
- pub async fn get_field_deep<'a>(
- &self,
- name: impl IntoIterator<Item = &'a str>,
- ) -> Result<Self> {
- let mut iter = name.into_iter();
+ pub async fn select<'a>(&self, name: impl IntoIterator<Item = Index>) -> Result<Self> {
+ let mut name = name.into_iter();
let mut full_path = self.full_path.clone();
let mut query = if let Some(id) = self.value {
format!("sess_field_{id}")
} else {
- let first = iter.next().expect("name not empty");
- ensure!(
- !(first.contains('.') | first.contains(' ')),
- "bad name for root query: {first}"
- );
- full_path.push(Index::String(first.to_string()));
- first.to_string()
+ let first = name.next();
+ if let Some(Index::Var(i)) = first {
+ full_path.push(Index::Var(i.clone()));
+ i.clone()
+ } else {
+ panic!("first path item should be variable, got {first:?}")
+ }
};
- for v in iter {
- full_path.push(Index::String(v.to_string()));
- // Escape
- let escaped = nixlike::serialize(v)?;
- let escaped = escaped.trim();
- query.push('.');
- query.push_str(escaped);
+ for v in name {
+ full_path.push(v.clone());
+ match v {
+ Index::Var(_) => panic!("var item may only be first"),
+ Index::String(s) => {
+ let escaped = nixlike::serialize(s)?;
+ query.push('.');
+ query.push_str(escaped.trim());
+ }
+ Index::Apply(a) => {
+ query.push(' ');
+ query.push_str(&a);
+ }
+ Index::Idx(idx) => {
+ query = format!("builtins.elemAt ({query}) {idx}");
+ }
+ }
}
let vid = self
@@ -454,6 +551,28 @@
.await
.with_context(|| format!("full path: {}", PathDisplay(&self.full_path)))
}
+ pub async fn build(&self) -> Result<HashMap<String, PathBuf>> {
+ let id = self.value.expect("can't use build on not-value");
+ let vid = self
+ .session
+ .0
+ .lock()
+ .await
+ .execute_expression_raw(&format!(":b sess_field_{id}"), &mut NixHandler::default())
+ .await?;
+ ensure!(!vid.is_empty(), "build failed");
+ let Some(vid) = vid.strip_prefix("This derivation produced the following outputs:\n")
+ else {
+ panic!("unexpected build output: {vid:?}");
+ };
+ let outputs = vid
+ .split('\n')
+ .filter(|v| !v.is_empty())
+ .map(|v| v.split_once(" -> ").expect("unexpected build output"))
+ .map(|(a, b)| (a.trim_start().to_owned(), PathBuf::from(b)))
+ .collect();
+ Ok(outputs)
+ }
}
impl Drop for Field {
fn drop(&mut self) {
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth1use std::path::PathBuf;2use std::{env::current_dir, time::Duration};34use crate::command::MyCommand;5use crate::host::Config;6use anyhow::{anyhow, Result};7use clap::Parser;8use itertools::Itertools;9use tokio::{task::LocalSet, time::sleep};10use tracing::{error, field, info, info_span, warn, Instrument};1112#[derive(Parser, Clone)]13pub struct BuildSystems {14 /// Do not continue on error15 #[clap(long)]16 fail_fast: bool,17 /// Disable automatic rollback18 #[clap(long)]19 disable_rollback: bool,20 /// Run builds as sudo21 #[clap(long)]22 privileged_build: bool,23 #[clap(subcommand)]24 subcommand: Subcommand,25}2627enum UploadAction {28 Test,29 Boot,30 Switch,31}32impl UploadAction {33 fn name(&self) -> &'static str {34 match self {35 UploadAction::Test => "test",36 UploadAction::Boot => "boot",37 UploadAction::Switch => "switch",38 }39 }4041 pub(crate) fn should_switch_profile(&self) -> bool {42 matches!(self, Self::Switch | Self::Boot)43 }44 pub(crate) fn should_activate(&self) -> bool {45 matches!(self, Self::Switch | Self::Test)46 }47 pub(crate) fn should_schedule_rollback_run(&self) -> bool {48 matches!(self, Self::Switch | Self::Test)49 }50}5152enum PackageAction {53 SdImage,54 InstallationCd,55}56impl PackageAction {57 fn build_attr(&self) -> String {58 match self {59 PackageAction::SdImage => "sdImage".to_owned(),60 PackageAction::InstallationCd => "installationCd".to_owned(),61 }62 }63}6465enum Action {66 Upload { action: Option<UploadAction> },67 Package(PackageAction),68}69impl Action {70 fn build_attr(&self) -> String {71 match self {72 Action::Upload { .. } => "toplevel".to_owned(),73 Action::Package(p) => p.build_attr(),74 }75 }76}7778impl From<Subcommand> for Action {79 fn from(s: Subcommand) -> Self {80 match s {81 Subcommand::Upload => Self::Upload { action: None },82 Subcommand::Test => Self::Upload {83 action: Some(UploadAction::Test),84 },85 Subcommand::Boot => Self::Upload {86 action: Some(UploadAction::Boot),87 },88 Subcommand::Switch => Self::Upload {89 action: Some(UploadAction::Switch),90 },91 Subcommand::SdImage => Self::Package(PackageAction::SdImage),92 Subcommand::InstallationCd => Self::Package(PackageAction::InstallationCd),93 }94 }95}9697#[derive(Parser, Clone)]98enum Subcommand {99 /// Upload, but do not switch100 Upload,101 /// Upload + switch to built system until reboot102 Test,103 /// Upload + switch to built system after reboot104 Boot,105 /// Upload + test + boot106 Switch,107108 /// Build SD .img image109 SdImage,110 /// Build an installation cd ISO image111 InstallationCd,112}113114struct Generation {115 id: u32,116 current: bool,117 datetime: String,118}119async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {120 let mut cmd = MyCommand::new("nix-env");121 cmd.comparg("--profile", "/nix/var/nix/profiles/system")122 .arg("--list-generations");123 // Sudo is required due to --list-generations acquiring lock on the profile.124 let data = config.run_string_on(host, cmd, true).await?;125 let generations = data126 .split('\n')127 .map(|e| e.trim())128 .filter(|&l| !l.is_empty())129 .filter_map(|g| {130 let gen: Option<Generation> = try {131 let mut parts = g.split_whitespace();132 let id = parts.next()?;133 let id: u32 = id.parse().ok()?;134 let date = parts.next()?;135 let time = parts.next()?;136 let current = if let Some(current) = parts.next() {137 if current == "(current)" {138 Some(true)139 } else {140 None141 }142 } else {143 Some(false)144 };145 let current = current?;146 if parts.next().is_some() {147 warn!("unexpected text after generation: {g}");148 }149 Generation {150 id,151 current,152 datetime: format!("{date} {time}"),153 }154 };155 if gen.is_none() {156 warn!("bad generation: {g}")157 }158 gen159 })160 .collect::<Vec<_>>();161 let current = generations162 .into_iter()163 .filter(|g| g.current)164 .at_most_one()165 .map_err(|_e| anyhow!("bad list-generations output"))?166 .ok_or_else(|| anyhow!("failed to find generation"))?;167 Ok(current)168}169170async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {171 let mut cmd = MyCommand::new("systemctl");172 cmd.arg("stop").arg(unit);173 config.run_on(host, cmd, true).await174}175176async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {177 let mut cmd = MyCommand::new("systemctl");178 cmd.arg("start").arg(unit);179 config.run_on(host, cmd, true).await180}181182async fn execute_upload(183 build: &BuildSystems,184 config: &Config,185 action: UploadAction,186 host: &str,187 built: PathBuf,188) -> Result<()> {189 let mut failed = false;190 // TODO: Lockfile, to prevent concurrent system switch?191 // TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback192 // is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to193 // unit name conflict in systemd-run194 // This code is tied to rollback.nix195 if !build.disable_rollback {196 let _span = info_span!("preparing").entered();197 info!("preparing for rollback");198 let generation = get_current_generation(config, host).await?;199 info!(200 "rollback target would be {} {}",201 generation.id, generation.datetime202 );203 {204 let mut cmd = MyCommand::new("sh");205 cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));206 if let Err(e) = config.run_on(host, cmd, true).await {207 error!("failed to set rollback marker: {e}");208 failed = true;209 }210 }211 // Activation script also starts rollback-watchdog.timer, however, it is possible that it won't be started.212 // Kicking it on manually will work best.213 //214 // There wouldn't be conflict, because here we trigger start of the primary service, and systemd will215 // only allow one instance of it.216217 // TODO: We should also watch how this process is going.218 // After running this command, we have less than 3 minutes to deploy everything,219 // if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.220 // Anyway, reboot will still help in this case.221 if action.should_schedule_rollback_run() {222 let mut cmd = MyCommand::new("systemd-run");223 cmd.comparg("--on-active", "3min")224 .comparg("--unit", "rollback-watchdog-run")225 .arg("systemctl")226 .arg("start")227 .arg("rollback-watchdog.service");228 if let Err(e) = config.run_on(host, cmd, true).await {229 error!("failed to schedule rollback run: {e}");230 failed = true;231 }232 }233 }234 if action.should_switch_profile() && !failed {235 info!("switching generation");236 let mut cmd = MyCommand::new("nix-env");237 cmd.comparg("--profile", "/nix/var/nix/profiles/system")238 .comparg("--set", &built);239 if let Err(e) = config.run_on(host, cmd, true).await {240 error!("failed to switch generation: {e}");241 failed = true;242 }243 }244 if action.should_activate() && !failed {245 let _span = info_span!("activating").entered();246 info!("executing activation script");247 let mut switch_script = built.clone();248 switch_script.push("bin");249 switch_script.push("switch-to-configuration");250 let mut cmd = MyCommand::new(switch_script);251 cmd.arg(action.name());252 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {253 error!("failed to activate: {e}");254 failed = true;255 }256 }257 if !build.disable_rollback {258 if failed {259 info!("executing rollback");260 if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")261 .instrument(info_span!("rollback"))262 .await263 {264 error!("failed to trigger rollback: {e}")265 }266 } else {267 info!("trying to mark upgrade as successful");268 let mut cmd = MyCommand::new("rm");269 cmd.arg("-f").arg("/etc/fleet_rollback_marker");270 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {271 error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")272 }273 }274 info!("disarming watchdog, just in case");275 if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {276 // It is ok, if there was no reboot - then timer might not be running.277 }278 if action.should_schedule_rollback_run() {279 if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {280 error!("failed to disarm rollback run: {e}");281 }282 }283 } else {284 let mut cmd = MyCommand::new("rm");285 cmd.arg("-f").arg("/etc/fleet_rollback_marker");286 if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {287 // Marker might not exist, yet better try to remove it.288 }289 }290 Ok(())291}292293impl BuildSystems {294 async fn build_task(self, config: Config, host: String) -> Result<()> {295 info!("building");296 let action = Action::from(self.subcommand.clone());297 let built = {298 let dir = tempfile::tempdir()?;299 dir.path().to_owned()300 };301302 let mut nix_build = MyCommand::new("nix");303 nix_build304 .args([305 "build",306 "--impure",307 "--json",308 // "--show-trace",309 "--no-link",310 ])311 .comparg("--out-link", &built)312 .arg(313 config.configuration_attr_name(&format!(314 "buildSystems.{}.{host}",315 action.build_attr()316 )),317 )318 .args(&config.nix_args);319320 if self.privileged_build {321 nix_build = nix_build.sudo();322 }323324 nix_build.run_nix().await.map_err(|e| {325 if action.build_attr() == "sdImage" {326 info!("sd-image build failed");327 info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");328 info!("This module was automatically imported before, but was removed for better customization")329 }330 e331 })?;332 let built = std::fs::canonicalize(built)?;333334 match action {335 Action::Upload { action } => {336 if !config.is_local(&host) {337 info!("uploading system closure");338 {339 let mut sign = MyCommand::new("nix");340 // Private key for host machine is registered in nix-sign.nix341 sign.arg("store")342 .arg("sign")343 .comparg("--key-file", "/etc/nix/private-key")344 .arg("-r")345 .arg(&built);346 if let Err(e) = sign.sudo().run_nix().await {347 warn!("Failed to sign store paths: {e}");348 };349 }350 let mut tries = 0;351 loop {352 let mut nix = MyCommand::new("nix");353 nix.arg("copy")354 .arg("--substitute-on-destination")355 .comparg("--to", format!("ssh-ng://{host}"))356 .arg(&built);357 match nix.run_nix().await {358 Ok(()) => break,359 Err(e) if tries < 3 => {360 tries += 1;361 warn!("Copy failure ({}/3): {}", tries, e);362 sleep(Duration::from_millis(5000)).await;363 }364 Err(e) => return Err(e),365 }366 }367 }368 if let Some(action) = action {369 execute_upload(&self, &config, action, &host, built).await?370 }371 }372 Action::Package(PackageAction::SdImage) => {373 let mut out = current_dir()?;374 out.push(format!("sd-image-{}", host));375376 info!("building sd image to {:?}", out);377 let mut nix_build = MyCommand::new("nix");378 nix_build379 .args(["build", "--impure", "--no-link"])380 .comparg("--out-link", &out)381 .arg(config.configuration_attr_name(&format!("buildSystems.sdImage.{}", host,)))382 .args(&config.nix_args);383 if !self.fail_fast {384 nix_build.arg("--keep-going");385 }386 if self.privileged_build {387 nix_build = nix_build.sudo();388 }389390 nix_build.run_nix().await?;391 }392 Action::Package(PackageAction::InstallationCd) => {393 let mut out = current_dir()?;394 out.push(format!("installation-cd-{}", host));395396 info!("building sd image to {:?}", out);397 let mut nix_build = MyCommand::new("nix");398 nix_build399 .args(["build", "--impure", "--no-link"])400 .comparg("--out-link", &out)401 .arg(402 config.configuration_attr_name(&format!(403 "buildSystems.installationCd.{}",404 host,405 )),406 )407 .args(&config.nix_args);408 if !self.fail_fast {409 nix_build.arg("--keep-going");410 }411 if self.privileged_build {412 nix_build = nix_build.sudo();413 }414415 nix_build.run_nix().await?;416 }417 };418 Ok(())419 }420421 pub async fn run(self, config: &Config) -> Result<()> {422 let hosts = config.list_hosts().await?;423 let set = LocalSet::new();424 let this = &self;425 for host in hosts.into_iter() {426 if config.should_skip(&host.name) {427 continue;428 }429 let config = config.clone();430 let this = this.clone();431 let span = info_span!("deployment", host = field::display(&host.name));432 let hostname = host.name;433 set.spawn_local(434 (async move {435 match this.build_task(config, hostname).await {436 Ok(_) => {}437 Err(e) => {438 error!("failed to deploy host: {}", e)439 }440 }441 })442 .instrument(span),443 );444 }445 set.await;446 Ok(())447 }448}1use std::os::unix::fs::symlink;2use std::path::PathBuf;3use std::{env::current_dir, time::Duration};45use crate::command::MyCommand;6use crate::host::Config;7use crate::nix_path;8use anyhow::{anyhow, Result};9use clap::Parser;10use itertools::Itertools;11use tokio::{task::LocalSet, time::sleep};12use tracing::{error, field, info, info_span, warn, Instrument};1314#[derive(Parser, Clone)]15pub struct BuildSystems {16 /// Disable automatic rollback17 #[clap(long)]18 disable_rollback: bool,19 #[clap(subcommand)]20 subcommand: Subcommand,21}2223enum UploadAction {24 Test,25 Boot,26 Switch,27}28impl UploadAction {29 fn name(&self) -> &'static str {30 match self {31 UploadAction::Test => "test",32 UploadAction::Boot => "boot",33 UploadAction::Switch => "switch",34 }35 }3637 pub(crate) fn should_switch_profile(&self) -> bool {38 matches!(self, Self::Switch | Self::Boot)39 }40 pub(crate) fn should_activate(&self) -> bool {41 matches!(self, Self::Switch | Self::Test)42 }43 pub(crate) fn should_schedule_rollback_run(&self) -> bool {44 matches!(self, Self::Switch | Self::Test)45 }46}4748enum PackageAction {49 SdImage,50 InstallationCd,51}52impl PackageAction {53 fn build_attr(&self) -> String {54 match self {55 PackageAction::SdImage => "sdImage".to_owned(),56 PackageAction::InstallationCd => "installationCd".to_owned(),57 }58 }59}6061enum Action {62 Upload { action: Option<UploadAction> },63 Package(PackageAction),64}65impl Action {66 fn build_attr(&self) -> String {67 match self {68 Action::Upload { .. } => "toplevel".to_owned(),69 Action::Package(p) => p.build_attr(),70 }71 }72}7374impl From<Subcommand> for Action {75 fn from(s: Subcommand) -> Self {76 match s {77 Subcommand::Upload => Self::Upload { action: None },78 Subcommand::Test => Self::Upload {79 action: Some(UploadAction::Test),80 },81 Subcommand::Boot => Self::Upload {82 action: Some(UploadAction::Boot),83 },84 Subcommand::Switch => Self::Upload {85 action: Some(UploadAction::Switch),86 },87 Subcommand::SdImage => Self::Package(PackageAction::SdImage),88 Subcommand::InstallationCd => Self::Package(PackageAction::InstallationCd),89 }90 }91}9293#[derive(Parser, Clone)]94enum Subcommand {95 /// Upload, but do not switch96 Upload,97 /// Upload + switch to built system until reboot98 Test,99 /// Upload + switch to built system after reboot100 Boot,101 /// Upload + test + boot102 Switch,103104 /// Build SD .img image105 SdImage,106 /// Build an installation cd ISO image107 InstallationCd,108}109110struct Generation {111 id: u32,112 current: bool,113 datetime: String,114}115async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {116 let mut cmd = MyCommand::new("nix-env");117 cmd.comparg("--profile", "/nix/var/nix/profiles/system")118 .arg("--list-generations");119 // Sudo is required due to --list-generations acquiring lock on the profile.120 let data = config.run_string_on(host, cmd, true).await?;121 let generations = data122 .split('\n')123 .map(|e| e.trim())124 .filter(|&l| !l.is_empty())125 .filter_map(|g| {126 let gen: Option<Generation> = try {127 let mut parts = g.split_whitespace();128 let id = parts.next()?;129 let id: u32 = id.parse().ok()?;130 let date = parts.next()?;131 let time = parts.next()?;132 let current = if let Some(current) = parts.next() {133 if current == "(current)" {134 Some(true)135 } else {136 None137 }138 } else {139 Some(false)140 };141 let current = current?;142 if parts.next().is_some() {143 warn!("unexpected text after generation: {g}");144 }145 Generation {146 id,147 current,148 datetime: format!("{date} {time}"),149 }150 };151 if gen.is_none() {152 warn!("bad generation: {g}")153 }154 gen155 })156 .collect::<Vec<_>>();157 let current = generations158 .into_iter()159 .filter(|g| g.current)160 .at_most_one()161 .map_err(|_e| anyhow!("bad list-generations output"))?162 .ok_or_else(|| anyhow!("failed to find generation"))?;163 Ok(current)164}165166async fn systemctl_stop(config: &Config, host: &str, unit: &str) -> Result<()> {167 let mut cmd = MyCommand::new("systemctl");168 cmd.arg("stop").arg(unit);169 config.run_on(host, cmd, true).await170}171172async fn systemctl_start(config: &Config, host: &str, unit: &str) -> Result<()> {173 let mut cmd = MyCommand::new("systemctl");174 cmd.arg("start").arg(unit);175 config.run_on(host, cmd, true).await176}177178async fn execute_upload(179 build: &BuildSystems,180 config: &Config,181 action: UploadAction,182 host: &str,183 built: PathBuf,184) -> Result<()> {185 let mut failed = false;186 // TODO: Lockfile, to prevent concurrent system switch?187 // TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback188 // is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to189 // unit name conflict in systemd-run190 // This code is tied to rollback.nix191 if !build.disable_rollback {192 let _span = info_span!("preparing").entered();193 info!("preparing for rollback");194 let generation = get_current_generation(config, host).await?;195 info!(196 "rollback target would be {} {}",197 generation.id, generation.datetime198 );199 {200 let mut cmd = MyCommand::new("sh");201 cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));202 if let Err(e) = config.run_on(host, cmd, true).await {203 error!("failed to set rollback marker: {e}");204 failed = true;205 }206 }207 // Activation script also starts rollback-watchdog.timer, however, it is possible that it won't be started.208 // Kicking it on manually will work best.209 //210 // There wouldn't be conflict, because here we trigger start of the primary service, and systemd will211 // only allow one instance of it.212213 // TODO: We should also watch how this process is going.214 // After running this command, we have less than 3 minutes to deploy everything,215 // if we fail to perform generation switch in time, then we will still call the activation script, and this may break something.216 // Anyway, reboot will still help in this case.217 if action.should_schedule_rollback_run() {218 let mut cmd = MyCommand::new("systemd-run");219 cmd.comparg("--on-active", "3min")220 .comparg("--unit", "rollback-watchdog-run")221 .arg("systemctl")222 .arg("start")223 .arg("rollback-watchdog.service");224 if let Err(e) = config.run_on(host, cmd, true).await {225 error!("failed to schedule rollback run: {e}");226 failed = true;227 }228 }229 }230 if action.should_switch_profile() && !failed {231 info!("switching generation");232 let mut cmd = MyCommand::new("nix-env");233 cmd.comparg("--profile", "/nix/var/nix/profiles/system")234 .comparg("--set", &built);235 if let Err(e) = config.run_on(host, cmd, true).await {236 error!("failed to switch generation: {e}");237 failed = true;238 }239 }240 if action.should_activate() && !failed {241 let _span = info_span!("activating").entered();242 info!("executing activation script");243 let mut switch_script = built.clone();244 switch_script.push("bin");245 switch_script.push("switch-to-configuration");246 let mut cmd = MyCommand::new(switch_script);247 cmd.arg(action.name());248 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {249 error!("failed to activate: {e}");250 failed = true;251 }252 }253 if !build.disable_rollback {254 if failed {255 info!("executing rollback");256 if let Err(e) = systemctl_start(config, host, "rollback-watchdog.service")257 .instrument(info_span!("rollback"))258 .await259 {260 error!("failed to trigger rollback: {e}")261 }262 } else {263 info!("trying to mark upgrade as successful");264 let mut cmd = MyCommand::new("rm");265 cmd.arg("-f").arg("/etc/fleet_rollback_marker");266 if let Err(e) = config.run_on(host, cmd, true).in_current_span().await {267 error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")268 }269 }270 info!("disarming watchdog, just in case");271 if let Err(_e) = systemctl_stop(config, host, "rollback-watchdog.timer").await {272 // It is ok, if there was no reboot - then timer might not be running.273 }274 if action.should_schedule_rollback_run() {275 if let Err(e) = systemctl_stop(config, host, "rollback-watchdog-run.timer").await {276 error!("failed to disarm rollback run: {e}");277 }278 }279 } else {280 let mut cmd = MyCommand::new("rm");281 cmd.arg("-f").arg("/etc/fleet_rollback_marker");282 if let Err(_e) = config.run_on(host, cmd, true).in_current_span().await {283 // Marker might not exist, yet better try to remove it.284 }285 }286 Ok(())287}288289impl BuildSystems {290 async fn build_task(self, config: Config, host: String) -> Result<()> {291 info!("building");292 let action = Action::from(self.subcommand.clone());293 let drv = config294 .fleet_field295 .select(nix_path!(.buildSystems.{action.build_attr()}.{&host}))296 .await?;297 let outputs = drv.build().await.map_err(|e| {298 if action.build_attr() == "sdImage" {299 info!("sd-image build failed");300 info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");301 info!("This module was automatically imported before, but was removed for better customization")302 }303 e304 })?;305 let out_output = outputs306 .get("out")307 .ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;308309 match action {310 Action::Upload { action } => {311 if !config.is_local(&host) {312 info!("uploading system closure");313 {314 let mut sign = MyCommand::new("nix");315 // Private key for host machine is registered in nix-sign.nix316 sign.arg("store")317 .arg("sign")318 .comparg("--key-file", "/etc/nix/private-key")319 .arg("-r")320 .arg(out_output);321 if let Err(e) = sign.sudo().run_nix().await {322 warn!("Failed to sign store paths: {e}");323 };324 }325 let mut tries = 0;326 loop {327 let mut nix = MyCommand::new("nix");328 nix.arg("copy")329 .arg("--substitute-on-destination")330 .comparg("--to", format!("ssh-ng://{host}"))331 .arg(out_output);332 match nix.run_nix().await {333 Ok(()) => break,334 Err(e) if tries < 3 => {335 tries += 1;336 warn!("Copy failure ({}/3): {}", tries, e);337 sleep(Duration::from_millis(5000)).await;338 }339 Err(e) => return Err(e),340 }341 }342 }343 if let Some(action) = action {344 execute_upload(&self, &config, action, &host, out_output.clone()).await?345 }346 }347 Action::Package(PackageAction::SdImage) => {348 let mut out = current_dir()?;349 out.push(format!("sd-image-{}", host));350351 info!("linking sd image to {:?}", out);352 symlink(out_output, out)?;353 }354 Action::Package(PackageAction::InstallationCd) => {355 let mut out = current_dir()?;356 out.push(format!("installation-cd-{}", host));357358 info!("linking iso image to {:?}", out);359 symlink(out_output, out)?;360 }361 };362 Ok(())363 }364365 pub async fn run(self, config: &Config) -> Result<()> {366 let hosts = config.list_hosts().await?;367 let set = LocalSet::new();368 let this = &self;369 for host in hosts.into_iter() {370 if config.should_skip(&host.name) {371 continue;372 }373 let config = config.clone();374 let this = this.clone();375 let span = info_span!("deployment", host = field::display(&host.name));376 let hostname = host.name;377 set.spawn_local(378 (async move {379 match this.build_task(config, hostname).await {380 Ok(_) => {}381 Err(e) => {382 error!("failed to deploy host: {}", e)383 }384 }385 })386 .instrument(span),387 );388 }389 set.await;390 Ok(())391 }392}cmds/fleet/src/cmds/info.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/info.rs
+++ b/cmds/fleet/src/cmds/info.rs
@@ -1,6 +1,7 @@
use std::collections::BTreeSet;
use crate::host::Config;
+use crate::nix_path;
use anyhow::{ensure, Result};
use clap::Parser;
@@ -38,7 +39,7 @@
if !tagged.is_empty() {
let tags: Vec<String> = config
.fleet_field
- .get_field_deep(["configuredSystems", &host.name, "config", "tags"])
+ .select(nix_path!(.configuredSystems.{&host.name}.config.tags))
.await?
.as_json()
.await?;
@@ -64,7 +65,7 @@
let host = config.system_config(&host).await?;
if external {
out.extend(
- host.get_field_deep(["network", "externalIps"])
+ host.select(nix_path!(.network.externalIps))
.await?
.as_json::<Vec<String>>()
.await?,
@@ -72,7 +73,7 @@
}
if internal {
out.extend(
- host.get_field_deep(["network", "internalIps"])
+ host.select(nix_path!(.network.internalIps))
.await?
.as_json::<Vec<String>>()
.await?,
cmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -1,6 +1,6 @@
use crate::{
fleetdata::{FleetSecret, FleetSharedSecret},
- host::Config,
+ host::Config, nix_path,
};
use anyhow::{bail, ensure, Context, Result};
use chrono::Utc;
@@ -339,7 +339,7 @@
let mut data = config.shared_secret(name)?;
let expected_owners: Vec<String> = config
.config_field
- .get_json_deep(["sharedSecrets", name, "expectedOwners"])
+ .get_json_deep(nix_path!(sharedSecrets.{name}.expectedOwners))
.await?;
if expected_owners.is_empty() {
warn!("secret was removed from fleet config: {name}, removing from data");
@@ -352,7 +352,7 @@
if set != expected_set {
let owner_dependent: bool = config
.config_field
- .get_json_deep(["sharedSecrets", name, "ownerDependent"])
+ .get_json_deep(nix_path!(.sharedSecrets.{name}.ownerDependent))
.await?;
if !owner_dependent {
warn!("reencrypting secret '{name}' for new owner set");
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -1,5 +1,4 @@
use std::{
- borrow::Cow,
collections::HashMap,
ffi::OsStr,
process::Stdio,
@@ -247,10 +246,14 @@
pub struct NixHandler {
spans: HashMap<u64, Span>,
}
-fn process_message(m: &str) -> Cow<'_, str> {
+fn process_message(m: &str) -> String {
static OSC_CLEANER: Lazy<Regex> =
Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());
- OSC_CLEANER.replace_all(m, "")
+ static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());
+ let m = OSC_CLEANER.replace_all(m, "");
+ // Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,
+ // and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.
+ DETABBER.replace_all(m.as_ref(), " ").to_string()
}
impl Handler for NixHandler {
fn handle_line(&mut self, e: &str) {
cmds/fleet/src/host.rsdiffbeforeafterboth--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -13,9 +13,10 @@
use tempfile::NamedTempFile;
use crate::{
- better_nix_eval::{Field, NixSessionPool},
+ better_nix_eval::{Field, Index, NixSessionPool},
command::MyCommand,
fleetdata::{FleetData, FleetSecret, FleetSharedSecret},
+ nix_path,
};
pub struct FleetConfigInternals {
@@ -24,9 +25,9 @@
pub opts: FleetOpts,
pub data: Mutex<FleetData>,
pub nix_args: Vec<OsString>,
- // fleetConfigurations.<name>
+ /// fleetConfigurations.<name>.<localSystem>
pub fleet_field: Field,
- // fleet_config.configUnchecked
+ /// fleet_config.configUnchecked
pub config_field: Field,
}
@@ -91,22 +92,12 @@
command = command.ssh(host);
}
command.run_string().await
- }
-
- pub fn configuration_attr_name(&self, name: &str) -> OsString {
- let mut str = self.directory.as_os_str().to_owned();
- str.push("#");
- str.push(&format!(
- "fleetConfigurations.default.{}.{}",
- self.local_system, name
- ));
- str
}
pub async fn list_hosts(&self) -> Result<Vec<ConfigHost>> {
let names = self
.fleet_field
- .get_field_deep(["configuredHosts"])
+ .select(nix_path!(.configuredHosts))
.await?
.list_fields()
.await?;
@@ -118,7 +109,7 @@
}
pub async fn system_config(&self, host: &str) -> Result<Field> {
self.fleet_field
- .get_field_deep(["configuredSystems", host, "config"])
+ .select(nix_path!(.configuredSystems.{host}.config))
.await
}
@@ -131,7 +122,7 @@
/// Shared secrets configured in fleet.nix or in flake
pub async fn list_configured_shared(&self) -> Result<Vec<String>> {
self.config_field
- .get_field("sharedSecrets")
+ .select(nix_path!(.sharedSecrets))
.await?
.list_fields()
.await
@@ -221,7 +212,7 @@
}
pub async fn shared_secret_expected_owners(&self, secret: &str) -> Result<Vec<String>> {
self.config_field
- .get_field_deep(["sharedSecrets", secret, "expectedOwners"])
+ .select(nix_path!(.sharedSecrets.{secret}.expectedOwners))
.await?
.as_json()
.await
@@ -279,7 +270,9 @@
if self.local_system == "detect" {
let builtins_field = Field::field(root_field.clone(), "builtins").await?;
- let system = builtins_field.get_field("currentSystem").await?;
+ let system = builtins_field
+ .select(nix_path!(.currentSystem))
+ .await?;
self.local_system = system.as_json().await?;
}
let local_system = self.local_system.clone();
@@ -287,9 +280,11 @@
let fleet_root = Field::field(root_field, "fleetConfigurations").await?;
let fleet_field = fleet_root
- .get_field_deep(["default", &local_system])
+ .select(nix_path!(.default.{&local_system}))
+ .await?;
+ let config_field = fleet_field
+ .select(nix_path!(.configUnchecked))
.await?;
- let config_field = fleet_field.get_field("configUnchecked").await?;
let mut fleet_data_path = directory.clone();
fleet_data_path.push("fleet.nix");
cmds/fleet/src/main.rsdiffbeforeafterboth--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -1,3 +1,4 @@
+#![recursion_limit = "512"]
#![feature(try_blocks)]
pub(crate) mod cmds;
flake.nixdiffbeforeafterboth--- a/flake.nix
+++ b/flake.nix
@@ -19,6 +19,7 @@
rustPlatform = pkgs.makeRustPlatform { cargo = rust; rustc = rust; };
in
{
+ packages = (import ./pkgs) pkgs pkgs;
devShell = (pkgs.mkShell.override { stdenv = llvmPkgs.stdenv; }) {
nativeBuildInputs = with pkgs; [
rust