difftreelog
feat automatic rollback
in: trunk
10 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -610,6 +610,12 @@
]
[[package]]
+name = "either"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
+
+[[package]]
name = "encode_unicode"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -684,6 +690,7 @@
"futures",
"hostname",
"indicatif",
+ "itertools",
"nixlike",
"once_cell",
"peg",
@@ -1127,6 +1134,15 @@
]
[[package]]
+name = "itertools"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
+dependencies = [
+ "either",
+]
+
+[[package]]
name = "itoa"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,2 +1,3 @@
[workspace]
members = ["crates/*", "cmds/*"]
+resolver = "2"
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -34,3 +34,4 @@
futures = "0.3.17"
tracing-indicatif = "0.3.5"
indicatif = "0.17.7"
+itertools = "0.11.0"
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -2,8 +2,9 @@
use crate::command::MyCommand;
use crate::host::Config;
-use anyhow::Result;
+use anyhow::{anyhow, Result};
use clap::Parser;
+use itertools::Itertools;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -12,6 +13,9 @@
/// Do not continue on error
#[clap(long)]
fail_fast: bool,
+ /// Disable automatic rollback
+ #[clap(long)]
+ disable_rollback: bool,
/// Run builds as sudo
#[clap(long)]
privileged_build: bool,
@@ -39,6 +43,9 @@
pub(crate) fn should_activate(&self) -> bool {
matches!(self, Self::Switch | Self::Test)
}
+    /// Returns `true` for the `Switch` and `Test` actions, and `false` for every other action.
+    ///
+    /// The deployment code consults this to decide whether the transient
+    /// `rollback-watchdog-run` unit has to be scheduled (and later disarmed).
+    pub(crate) fn should_schedule_rollback_run(&self) -> bool {
+        matches!(self, Self::Test | Self::Switch)
+    }
}
enum PackageAction {
@@ -103,6 +110,62 @@
InstallationCd,
}
+/// One entry of the `nix-env --list-generations` output for the system profile.
+struct Generation {
+    /// Generation number, taken from the first column.
+    id: u32,
+    /// Whether the entry carried the `(current)` marker.
+    current: bool,
+    /// Date and time columns joined with a single space, kept verbatim for display.
+    datetime: String,
+}
+/// Parses a single trimmed, non-empty line of `nix-env --list-generations` output.
+///
+/// The expected shape is `<id> <date> <time> [(current)]`. Returns `None` when a column is
+/// missing, the id is not a valid `u32`, or the fourth column is anything but `(current)`.
+/// Text after the fourth column is tolerated, but reported with a warning.
+fn parse_generation_line(line: &str) -> Option<Generation> {
+    let mut parts = line.split_whitespace();
+    let id: u32 = parts.next()?.parse().ok()?;
+    let date = parts.next()?;
+    let time = parts.next()?;
+    let current = match parts.next() {
+        None => false,
+        Some("(current)") => true,
+        // Unknown marker: reject the line instead of guessing its meaning.
+        Some(_) => return None,
+    };
+    if parts.next().is_some() {
+        warn!("unexpected text after generation: {line}");
+    }
+    Some(Generation {
+        id,
+        current,
+        datetime: format!("{date} {time}"),
+    })
+}
+/// Queries `host` for the generation of the system profile that is currently active.
+///
+/// Lines that cannot be parsed are skipped with a warning.
+///
+/// # Errors
+///
+/// Fails when the remote command fails, when more than one generation is marked as
+/// current, or when no generation is marked as current.
+async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
+    let mut cmd = MyCommand::new("nix-env");
+    cmd.comparg("--profile", "/nix/var/nix/profiles/system")
+        .arg("--list-generations");
+    // Sudo is required due to --list-generations acquiring lock on the profile.
+    let data = config.run_string_on(&host, cmd, true).await?;
+    let current = data
+        .lines()
+        .map(str::trim)
+        .filter(|line| !line.is_empty())
+        .filter_map(|line| {
+            let generation = parse_generation_line(line);
+            if generation.is_none() {
+                warn!("bad generation: {line}");
+            }
+            generation
+        })
+        .filter(|generation| generation.current)
+        // Exactly one generation may be current; anything else means the output is malformed.
+        .at_most_one()
+        .map_err(|_e| anyhow!("bad list-generations output"))?
+        .ok_or_else(|| anyhow!("failed to find generation"))?;
+    Ok(current)
+}
+
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
@@ -155,6 +218,7 @@
loop {
let mut nix = MyCommand::new("nix");
nix.arg("copy")
+ .arg("--substitute-on-destination")
.comparg("--to", format!("ssh://root@{host}"))
.arg(&built);
match nix.run_nix().await {
@@ -169,21 +233,107 @@
}
}
if let Some(action) = action {
- if action.should_switch_profile() {
+ let mut failed = false;
+ // TODO: Lockfile, to prevent concurrent system switch?
+ // TODO: If the rollback target already exists - bail, it should be removed first. A lockfile will not work if the rollback
+ // is scheduled on the next boot (default behavior). On the current boot, the rollback activator will fail due to
+ // a unit name conflict in systemd-run
+ if !self.disable_rollback {
+ let _span = info_span!("preparing").entered();
+ info!("preparing for rollback");
+ let generation = get_current_generation(&config, &host).await?;
+ info!(
+ "rollback target would be {} {}",
+ generation.id, generation.datetime
+ );
+ {
+ let mut cmd = MyCommand::new("sh");
+ cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
+ if let Err(e) = config.run_on(&host, cmd, true).await {
+ error!("failed to set rollback marker: {e}");
+ failed = true;
+ }
+ }
+ // The activation script also starts rollback-watchdog.timer; however, it is possible that it won't be started.
+ // Kicking it off manually is the most reliable option.
+ //
+ // There will be no conflict, because here we trigger the start of the primary service, and systemd will
+ // only allow one instance of it.
+ if action.should_schedule_rollback_run() {
+ let mut cmd = MyCommand::new("systemd-run");
+ cmd.comparg("--on-active", "3min")
+ .comparg("--unit", "rollback-watchdog-run")
+ .arg("systemctl")
+ .arg("start")
+ .arg("rollback-watchdog.service");
+ if let Err(e) = config.run_on(&host, cmd, true).await {
+ error!("failed to schedule rollback run: {e}");
+ failed = true;
+ }
+ }
+ }
+ if action.should_switch_profile() && !failed {
info!("switching generation");
let mut cmd = MyCommand::new("nix-env");
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- config.run_on(&host, cmd, true).await?;
+ if let Err(e) = config.run_on(&host, cmd, true).await {
+ error!("failed to switch generation: {e}");
+ failed = true;
+ }
}
- if action.should_activate() {
+ if action.should_activate() && !failed {
+ let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
let mut cmd = MyCommand::new(switch_script);
cmd.arg(action.name());
- config.run_on(&host, cmd, true).await?;
+ if let Err(e) = config.run_on(&host, cmd, true).in_current_span().await {
+ error!("failed to activate: {e}");
+ failed = true;
+ }
+ }
+ if !self.disable_rollback {
+ {
+ let _span = info_span!("rollback").entered();
+ if failed {
+ info!("executing rollback");
+ let mut cmd = MyCommand::new("systemctl");
+ cmd.arg("start").arg("rollback-watchdog.service");
+ if let Err(e) = config.run_on(&host, cmd, true).await {
+ error!("failed to rollback: {e}");
+ }
+ } else {
+ info!("marking upgrade as successful");
+ let mut cmd = MyCommand::new("rm");
+ cmd.arg("-f").arg("/etc/fleet_rollback_marker");
+ if let Err(e) =
+ config.run_on(&host, cmd, true).in_current_span().await
+ {
+ error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
+ }
+ }
+ }
+ {
+ let _span = info_span!("disarm").entered();
+ info!("disarming watchdog, just in case");
+ {
+ let mut cmd = MyCommand::new("systemctl");
+ cmd.arg("stop").arg("rollback-watchdog.timer");
+ if let Err(_e) = config.run_on(&host, cmd, true).await {
+ // It is ok, if there was no reboot.
+ }
+ }
+ if action.should_schedule_rollback_run() {
+ let mut cmd = MyCommand::new("systemctl");
+ cmd.arg("stop").arg("rollback-watchdog-run.timer");
+ if let Err(e) = config.run_on(&host, cmd, true).await {
+ error!("failed to disarm rollback run: {e}");
+ }
+ }
+ }
}
}
}
cmds/fleet/src/command.rsdiffbeforeafterboth1use std::{collections::HashMap, ffi::OsStr, process::Stdio, task::Poll};23use anyhow::{Context, Result};4use futures::StreamExt;5use serde::{6 de::{DeserializeOwned, Visitor},7 Deserialize,8};9use tokio::{io::AsyncRead, process::Command, select};10use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tracing::{info, info_span, warn, Span};12use tracing_indicatif::span_ext::IndicatifSpanExt;1314fn escape_bash(input: &str, out: &mut String) {15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17 out.push_str(input);18 return;19 }20 out.push('\'');21 for (i, v) in input.split('\'').enumerate() {22 if i != 0 {23 out.push_str("'\"'\"'");24 }25 out.push_str(v);26 }27 out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30 os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34 command: String,35 args: Vec<String>,36 env: Vec<(String, String)>,37}38impl MyCommand {39 pub fn new(cmd: impl AsRef<OsStr>) -> Self {40 assert!(!cmd.as_ref().is_empty());41 Self {42 command: ostoutf8(cmd),43 args: vec![],44 env: vec![],45 }46 }47 fn into_args(self) -> Vec<String> {48 let mut out = Vec::new();49 if !self.env.is_empty() {50 out.push("env".to_owned());51 for (k, v) in self.env {52 assert!(!k.contains("="));53 out.push(format!("{k}={v}"));54 }55 }56 out.push(self.command);57 out.extend(self.args.into_iter());58 out59 }60 fn into_string(self) -> String {61 let mut out = String::new();62 if !self.env.is_empty() {63 out.push_str("env");64 for (k, v) in self.env {65 out.push(' ');66 assert!(!k.contains("="));67 escape_bash(&k, &mut out);68 out.push('=');69 escape_bash(&v, &mut out);70 }71 }72 if !out.is_empty() {73 out.push(' ');74 }75 escape_bash(&self.command, &mut out);76 for arg in self.args {77 out.push(' ');78 escape_bash(&arg, &mut out);79 }80 out81 }82 fn into_command(self) -> Command {83 let mut out = 
Command::new(self.command);84 out.args(self.args);85 for (k, v) in self.env {86 out.env(k, v);87 }88 out89 }90 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {91 let arg = arg.as_ref();92 self.args.push(ostoutf8(arg));93 self94 }95 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {96 let arg = arg.as_ref();97 let value = value.as_ref();98 let arg = ostoutf8(arg);99 let value = ostoutf8(value);100 self.arg(format!("{arg}={value}"));101 self102 }103 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {104 self.arg(arg);105 self.arg(value);106 self107 }108 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {109 for arg in args.into_iter() {110 let arg = arg.as_ref();111 self.args.push(ostoutf8(arg));112 }113 self114 }115 pub fn sudo(self) -> Self {116 let mut out = Self::new("sudo");117 out.args(self.into_args());118 out119 }120 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {121 let mut out = Self::new("ssh");122 out.arg(on).arg("--");123 out.arg(self.into_string());124 out125 }126127 pub async fn run(self) -> Result<()> {128 let str = self.clone().into_string();129 let cmd = self.into_command();130 run_nix_inner(str, cmd, &mut PlainHandler).await?;131 Ok(())132 }133 pub async fn run_string(self) -> Result<String> {134 let str = self.clone().into_string();135 let cmd = self.into_command();136 let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;137 Ok(v)138 }139 pub async fn run_nix_json<T: DeserializeOwned>(self) -> Result<T> {140 let str = self.run_nix_string().await?;141 serde_json::from_str(&str).with_context(|| format!("{:?}", str))142 }143144 pub async fn run_nix_string(self) -> Result<String> {145 let str = self.clone().into_string();146 let cmd = self.into_command();147 run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await148 }149 pub async fn run_nix(self) -> Result<()> {150 let str = 
self.clone().into_string();151 let mut cmd = self.into_command();152 cmd.stdout(Stdio::inherit());153 run_nix_inner(str, cmd, &mut NixHandler::default()).await154 }155}156157struct EmptyAsyncRead;158impl AsyncRead for EmptyAsyncRead {159 fn poll_read(160 self: std::pin::Pin<&mut Self>,161 _cx: &mut std::task::Context<'_>,162 _buf: &mut tokio::io::ReadBuf<'_>,163 ) -> Poll<std::io::Result<()>> {164 Poll::Pending165 }166}167168async fn run_nix_inner_stdout(169 str: String,170 cmd: Command,171 handler: &mut dyn Handler,172) -> Result<String> {173 Ok(run_nix_inner_raw(str, cmd, true, handler)174 .await?175 .expect("has out"))176}177async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {178 let v = run_nix_inner_raw(str, cmd, false, handler).await?;179 assert!(v.is_none());180 Ok(())181}182183trait Handler {184 fn handle_err(&mut self, e: &str);185 fn handle_info(&mut self, e: &str);186}187188struct PlainHandler;189impl Handler for PlainHandler {190 fn handle_err(&mut self, e: &str) {191 info!(target: "log", "{e}");192 }193194 fn handle_info(&mut self, e: &str) {195 info!(target: "log", "{e}");196 }197}198199#[derive(Default)]200struct NixHandler {201 spans: HashMap<u64, Span>,202}203impl Handler for NixHandler {204 fn handle_err(&mut self, e: &str) {205 if let Some(e) = e.strip_prefix("@nix ") {206 let log: NixLog = match serde_json::from_str(e) {207 Ok(l) => l,208 Err(err) => {209 warn!("failed to parse nix log line {:?}: {}", e, err);210 return;211 }212 };213 match log {214 NixLog::Msg { msg, raw_msg, .. 
} => {215 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))216 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")217 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {218 if let Some(raw_msg) = raw_msg {219 if !msg.is_empty() {220 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())221 } else {222 info!(target: "nix", "{}", raw_msg.trim_end())223 }224 } else {225 info!(target: "nix", "{}", msg.trim_end())226 }227 }228 }229 NixLog::Start {230 ref fields,231 typ,232 id,233 ..234 } if typ == 105 && !fields.is_empty() => {235 if let [LogField::String(drv), ..] = &fields[..] {236 let mut drv = drv.as_str();237 if let Some(pkg) = drv.strip_prefix("/nix/store/") {238 let mut it = pkg.splitn(2, '-');239 it.next();240 if let Some(pkg) = it.next() {241 drv = pkg;242 }243 }244 info!(target: "nix","building {}", drv);245 let span = info_span!("build", drv);246 span.pb_start();247 self.spans.insert(id, span);248 } else {249 warn!("bad build log: {:?}", log)250 }251 }252 NixLog::Start {253 ref fields,254 typ,255 id,256 ..257 } if typ == 100 && fields.len() >= 3 => {258 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =259 &fields[..]260 {261 let mut drv = drv.as_str();262263 if let Some(pkg) = drv.strip_prefix("/nix/store/") {264 let mut it = pkg.splitn(2, '-');265 it.next();266 if let Some(pkg) = it.next() {267 drv = pkg;268 }269 }270 info!(target: "nix","copying {} {} -> {}", drv, from, to);271 let span = info_span!("copy", from, to, drv);272 span.pb_start();273 self.spans.insert(id, span);274 } else {275 warn!("bad copy log: {:?}", log)276 }277 }278 NixLog::Start { text, typ, id, .. 
}279 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>280 {281 if !text.is_empty()282 && text != "querying info about missing paths"283 && text != "copying 0 paths"284 {285 let span = info_span!("job");286 span.pb_start();287 span.pb_set_message(text.trim());288 self.spans.insert(id, span);289 info!(target: "nix", "{}", text);290 }291 }292 NixLog::Start {293 text,294 level: 0,295 typ: 108,296 ..297 } if text.is_empty() => {298 // Cache lookup? Coupled with copy log299 }300 NixLog::Start {301 text,302 level: 4,303 typ: 109,304 ..305 } if text.starts_with("querying info about ") => {306 // Cache lookup307 }308 NixLog::Start {309 text,310 level: 4,311 typ: 101,312 ..313 } if text.starts_with("downloading ") => {314 // NAR downloading, coupled with copy log315 }316 NixLog::Start {317 text,318 level: 1,319 typ: 111,320 ..321 } if text.starts_with("waiting for a machine to build ") => {322 // Useless repeating notification about build323 }324 NixLog::Start {325 text,326 level: 3,327 typ: 111,328 ..329 } if text.starts_with("resolved derivation: ") => {330 // CA resolved331 }332 NixLog::Start {333 text,334 level: 1,335 typ: 111,336 id,337 ..338 } if text.starts_with("waiting for lock on ") => {339 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();340 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {341 drv = txt;342 }343 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {344 drv = txt;345 }346 if let Some(txt) = drv.split("', '").next() {347 drv = txt;348 }349 if let Some(pkg) = drv.strip_prefix("/nix/store/") {350 let mut it = pkg.splitn(2, '-');351 it.next();352 if let Some(pkg) = it.next() {353 drv = pkg;354 }355 }356 let span = info_span!("waiting on drv", drv);357 span.pb_start();358 self.spans.insert(id, span);359 // Concurrent build of the same message360 }361 NixLog::Stop { id, .. 
} => {362 self.spans.remove(&id);363 }364 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {365 if let Some(span) = self.spans.get(&id) {366 if let LogField::String(s) = &fields[0] {367 span.pb_set_message(s.trim());368 } else {369 warn!("bad fields: {fields:?}");370 }371 } else {372 warn!("unknown result id: {id} {typ} {fields:?}");373 }374 // dbg!(fields, id, typ);375 }376 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {377 if let Some(span) = self.spans.get(&id) {378 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =379 &fields[..4]380 {381 span.pb_set_length(*expected);382 span.pb_set_position(*done);383 } else {384 warn!("bad fields: {fields:?}");385 }386 } else {387 // warn!("unknown result id: {id} {typ} {fields:?}");388 // Unaccounted progress.389 }390 // dbg!(fields, id, typ);391 }392 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {393 // Set phase, expected394 }395 _ => warn!("unknown log: {:?}", log),396 };397 } else {398 warn!(target = "nix", "unknown: {}", e.trim())399 }400 }401 fn handle_info(&mut self, o: &str) {402 self.handle_err(o)403 }404}405406async fn run_nix_inner_raw(407 str: String,408 mut cmd: Command,409 want_stdout: bool,410 handler: &mut dyn Handler,411) -> Result<Option<String>> {412 info!("running {str}");413 cmd.arg("--log-format").arg("internal-json");414 cmd.stderr(Stdio::piped());415 cmd.stdout(Stdio::piped());416 let mut child = cmd.spawn()?;417 let mut stderr = child.stderr.take().unwrap();418 let stdout = child.stdout.take().unwrap();419 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());420 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));421 let mut ob = want_stdout422 .then(|| out.take().unwrap())423 .unwrap_or_else(|| Box::new(EmptyAsyncRead));424 let mut ol = (!want_stdout)425 .then(|| out.take().unwrap())426 .unwrap_or_else(|| Box::new(EmptyAsyncRead));427 let mut ob = 
FramedRead::new(&mut ob, BytesCodec::new());428 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());429430 // while let Some(line) = read.next().await? {}431432 let mut out_buf = if want_stdout { Some(vec![]) } else { None };433 loop {434 select! {435 e = err.next() => {436 if let Some(e) = e {437 let e = e?;438 handler.handle_err(&e);439 }440 },441 o = ob.next() => {442 if let Some(o) = o {443 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);444 }445 },446 o = ol.next() => {447 if let Some(o) = o {448 let o = o?;449 handler.handle_info(&o);450 }451 },452 code = child.wait() => {453 let code = code?;454 if !code.success() {455 anyhow::bail!("command '{str}' failed with status {}", code);456 }457 break;458 }459 }460 }461462 Ok(out_buf.map(String::from_utf8).transpose()?)463}464465#[derive(Debug)]466enum LogField {467 String(String),468 Num(u64),469}470471impl<'de> Deserialize<'de> for LogField {472 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>473 where474 D: serde::Deserializer<'de>,475 {476 struct StringOrNum;477 impl<'de> Visitor<'de> for StringOrNum {478 type Value = LogField;479480 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {481 write!(f, "string or unsigned")482 }483484 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>485 where486 E: serde::de::Error,487 {488 Ok(LogField::String(v.to_owned()))489 }490491 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>492 where493 E: serde::de::Error,494 {495 Ok(LogField::Num(v))496 }497 }498499 deserializer.deserialize_any(StringOrNum)500 }501}502503#[derive(Deserialize, Debug)]504#[serde(rename_all = "camelCase", tag = "action")]505#[allow(dead_code)]506enum NixLog {507 Msg {508 level: u32,509 msg: String,510 raw_msg: Option<String>,511 },512 Start {513 id: u64,514 level: u32,515 #[serde(default)]516 fields: Vec<LogField>,517 text: String,518 #[serde(rename = "type")]519 typ: u32,520 },521 Stop {522 id: u64,523 },524 Result {525 id: u64,526 
#[serde(rename = "type")]527 typ: u32,528 #[serde(default)]529 fields: Vec<LogField>,530 },531}cmds/fleet/src/main.rsdiffbeforeafterboth--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -1,3 +1,5 @@
+#![feature(try_blocks)]
+
pub mod cmds;
pub mod command;
pub mod host;
@@ -6,16 +8,14 @@
mod fleetdata;
use std::ffi::OsString;
-use std::io;
use std::time::Duration;
-use anyhow::{anyhow, bail, Result};
+use anyhow::{bail, Result};
use clap::Parser;
use cmds::{build_systems::BuildSystems, info::Info, secrets::Secrets};
use host::{Config, FleetOpts};
use indicatif::{ProgressState, ProgressStyle};
-use tokio::fs;
use tokio::process::Command;
use tracing::{info, metadata::LevelFilter};
use tracing_indicatif::IndicatifLayer;
@@ -79,9 +79,6 @@
Opts::Prefetch(p) => p.run(config).await?,
};
Ok(())
-}
-fn elapsed_subsec(state: &ProgressState, writer: &mut dyn std::fmt::Write) {
- let _ = writer.write_str(&format!("{:?}", state.elapsed()));
}
#[tokio::main]
cmds/install-secrets/Cargo.tomldiffbeforeafterboth--- a/cmds/install-secrets/Cargo.toml
+++ b/cmds/install-secrets/Cargo.toml
@@ -9,7 +9,7 @@
env_logger = "0.10.0"
log = "0.4.14"
nix = "0.26.1"
-serde = "1.0.130"
+serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.89"
clap = { version = "4.0.29", features = [
"derive",
nixos/modules/module-list.nixdiffbeforeafterboth--- a/nixos/modules/module-list.nix
+++ b/nixos/modules/module-list.nix
@@ -2,4 +2,5 @@
../fleetPkgs.nix
../meta.nix
../secrets.nix
+ ../rollback.nix
]
nixos/rollback.nixdiffbeforeafterboth--- /dev/null
+++ b/nixos/rollback.nix
@@ -0,0 +1,45 @@
+{config, ...}: {
+ # TODO: Make it work with systemd-initrd approach.
+ # In this case we can't just switch generation and re-run activation script, since the root filesystem might not be
+ # mounted yet. We need to explicitly remove the last generation, and this needs deeper integration with systemd/grub/
+ # whatever user uses. boot.json also might help here.
+
+  # Service performing the rollback itself: when the marker file exists, it switches the system
+  # profile back to the generation recorded in the marker, re-runs that generation's activation
+  # script, and finally removes the marker. Without the marker it only logs and exits.
+  systemd.services.rollback-watchdog = {
+    description = "Rollback watchdog";
+    script = ''
+      set -eu
+      if [ -f /etc/fleet_rollback_marker ]; then
+        echo "found the rollback marker, switching to older generation"
+        target=$(cat /etc/fleet_rollback_marker)
+        echo "rolling back profile"
+        # nix-env is used instead of `nix profile rollback`, so that the rollback does not depend
+        # on the experimental nix-command feature being enabled on the host.
+        nix-env --profile /nix/var/nix/profiles/system --switch-generation "$target"
+        echo "executing activation script"
+        "/nix/var/nix/profiles/system-$target-link/bin/switch-to-configuration" switch
+        echo "removing rollback marker"
+        rm -f /etc/fleet_rollback_marker
+      else
+        echo "rollback marker was removed, upgrade is succeeded"
+      fi
+    '';
+    path = [
+      # Provides nix-env
+      config.nix.package
+    ];
+    serviceConfig.Type = "exec";
+    unitConfig = {
+      X-StopOnRemoval = false;
+    };
+  };
+
+  # Timer that fires the rollback watchdog service; it is only started while the rollback marker exists.
+  systemd.timers.rollback-watchdog = {
+    description = "Timer for rollback watchdog";
+    wantedBy = ["timers.target"];
+    timerConfig = {
+      # Counted from the moment the timer itself is started. OnUnitActiveSec is relative to the
+      # last activation of the service, so on its own it never elapses if the service has not
+      # run yet (e.g. right after boot).
+      OnActiveSec = "3min";
+      RemainAfterElapse = false;
+    };
+    unitConfig = {
+      ConditionPathExists = "/etc/fleet_rollback_marker";
+    };
+  };
+}
pkgs/fleet-install-secrets.nixdiffbeforeafterboth--- a/pkgs/fleet-install-secrets.nix
+++ b/pkgs/fleet-install-secrets.nix
@@ -6,7 +6,7 @@
name = "${pname}-${version}";
src = ../.;
- cargoBuildFlags = "-p ${pname}";
+ buildAndTestSubdir = "cmds/install-secrets";
cargoLock = {
lockFile = ../Cargo.lock;
outputHashes = {