difftreelog
feat automatic rollback
in: trunk
10 files changed
Cargo.lockdiffbeforeafterboth--- a/Cargo.lock
+++ b/Cargo.lock
@@ -610,6 +610,12 @@
]
[[package]]
+name = "either"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
+
+[[package]]
name = "encode_unicode"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -684,6 +690,7 @@
"futures",
"hostname",
"indicatif",
+ "itertools",
"nixlike",
"once_cell",
"peg",
@@ -1127,6 +1134,15 @@
]
[[package]]
+name = "itertools"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
+dependencies = [
+ "either",
+]
+
+[[package]]
name = "itoa"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
Cargo.tomldiffbeforeafterboth--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,2 +1,3 @@
[workspace]
members = ["crates/*", "cmds/*"]
+resolver = "2"
cmds/fleet/Cargo.tomldiffbeforeafterboth--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -34,3 +34,4 @@
futures = "0.3.17"
tracing-indicatif = "0.3.5"
indicatif = "0.17.7"
+itertools = "0.11.0"
cmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -2,8 +2,9 @@
use crate::command::MyCommand;
use crate::host::Config;
-use anyhow::Result;
+use anyhow::{anyhow, Result};
use clap::Parser;
+use itertools::Itertools;
use tokio::{task::LocalSet, time::sleep};
use tracing::{error, field, info, info_span, warn, Instrument};
@@ -12,6 +13,9 @@
/// Do not continue on error
#[clap(long)]
fail_fast: bool,
+ /// Disable automatic rollback
+ #[clap(long)]
+ disable_rollback: bool,
/// Run builds as sudo
#[clap(long)]
privileged_build: bool,
@@ -39,6 +43,9 @@
pub(crate) fn should_activate(&self) -> bool {
matches!(self, Self::Switch | Self::Test)
}
+ pub(crate) fn should_schedule_rollback_run(&self) -> bool {
+ matches!(self, Self::Switch | Self::Test)
+ }
}
enum PackageAction {
@@ -103,6 +110,62 @@
InstallationCd,
}
+struct Generation {
+ id: u32,
+ current: bool,
+ datetime: String,
+}
+async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
+ let mut cmd = MyCommand::new("nix-env");
+ cmd.comparg("--profile", "/nix/var/nix/profiles/system")
+ .arg("--list-generations");
+ // Sudo is required due to --list-generations acquiring lock on the profile.
+ let data = config.run_string_on(&host, cmd, true).await?;
+ let generations = data
+ .split('\n')
+ .map(|e| e.trim())
+ .filter(|&l| l != "")
+ .filter_map(|g| {
+ let gen: Option<Generation> = try {
+ let mut parts = g.split_whitespace();
+ let id = parts.next()?;
+ let id: u32 = id.parse().ok()?;
+ let date = parts.next()?;
+ let time = parts.next()?;
+ let current = if let Some(current) = parts.next() {
+ if current == "(current)" {
+ Some(true)
+ } else {
+ None
+ }
+ } else {
+ Some(false)
+ };
+ let current = current?;
+ if parts.next().is_some() {
+ warn!("unexpected text after generation: {g}");
+ }
+ Generation {
+ id,
+ current,
+ datetime: format!("{date} {time}"),
+ }
+ };
+ if gen.is_none() {
+ warn!("bad generation: {g}")
+ }
+ gen
+ })
+ .collect::<Vec<_>>();
+ let current = generations
+ .into_iter()
+ .filter(|g| g.current)
+ .at_most_one()
+ .map_err(|_e| anyhow!("bad list-generations output"))?
+ .ok_or_else(|| anyhow!("failed to find generation"))?;
+ Ok(current)
+}
+
impl BuildSystems {
async fn build_task(self, config: Config, host: String) -> Result<()> {
info!("building");
@@ -155,6 +218,7 @@
loop {
let mut nix = MyCommand::new("nix");
nix.arg("copy")
+ .arg("--substitute-on-destination")
.comparg("--to", format!("ssh://root@{host}"))
.arg(&built);
match nix.run_nix().await {
@@ -169,21 +233,107 @@
}
}
if let Some(action) = action {
- if action.should_switch_profile() {
+ let mut failed = false;
+ // TODO: Lockfile, to prevent concurrent system switch?
+ // TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback
+ // is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
+ // unit name conflict in systemd-run
+ if !self.disable_rollback {
+ let _span = info_span!("preparing").entered();
+ info!("preparing for rollback");
+ let generation = get_current_generation(&config, &host).await?;
+ info!(
+ "rollback target would be {} {}",
+ generation.id, generation.datetime
+ );
+ {
+ let mut cmd = MyCommand::new("sh");
+ cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
+ if let Err(e) = config.run_on(&host, cmd, true).await {
+ error!("failed to set rollback marker: {e}");
+ failed = true;
+ }
+ }
+ // Activation script also starts rollback-watchdog.timer, however, it is possible that it won't be started.
+ // Kicking it on manually will work best.
+ //
+ // There wouldn't be conflict, because here we trigger start of the primary service, and systemd will
+ // only allow one instance of it.
+ if action.should_schedule_rollback_run() {
+ let mut cmd = MyCommand::new("systemd-run");
+ cmd.comparg("--on-active", "3min")
+ .comparg("--unit", "rollback-watchdog-run")
+ .arg("systemctl")
+ .arg("start")
+ .arg("rollback-watchdog.service");
+ if let Err(e) = config.run_on(&host, cmd, true).await {
+ error!("failed to schedule rollback run: {e}");
+ failed = true;
+ }
+ }
+ }
+ if action.should_switch_profile() && !failed {
info!("switching generation");
let mut cmd = MyCommand::new("nix-env");
cmd.comparg("--profile", "/nix/var/nix/profiles/system")
.comparg("--set", &built);
- config.run_on(&host, cmd, true).await?;
+ if let Err(e) = config.run_on(&host, cmd, true).await {
+ error!("failed to switch generation: {e}");
+ failed = true;
+ }
}
- if action.should_activate() {
+ if action.should_activate() && !failed {
+ let _span = info_span!("activating").entered();
info!("executing activation script");
let mut switch_script = built.clone();
switch_script.push("bin");
switch_script.push("switch-to-configuration");
let mut cmd = MyCommand::new(switch_script);
cmd.arg(action.name());
- config.run_on(&host, cmd, true).await?;
+ if let Err(e) = config.run_on(&host, cmd, true).in_current_span().await {
+ error!("failed to activate: {e}");
+ failed = true;
+ }
+ }
+ if !self.disable_rollback {
+ {
+ let _span = info_span!("rollback").entered();
+ if failed {
+ info!("executing rollback");
+ let mut cmd = MyCommand::new("systemctl");
+ cmd.arg("start").arg("rollback-watchdog.service");
+ if let Err(e) = config.run_on(&host, cmd, true).await {
+ error!("failed to rollback: {e}");
+ }
+ } else {
+ info!("marking upgrade as successful");
+ let mut cmd = MyCommand::new("rm");
+ cmd.arg("-f").arg("/etc/fleet_rollback_marker");
+ if let Err(e) =
+ config.run_on(&host, cmd, true).in_current_span().await
+ {
+ error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
+ }
+ }
+ }
+ {
+ let _span = info_span!("disarm").entered();
+ info!("disarming watchdog, just in case");
+ {
+ let mut cmd = MyCommand::new("systemctl");
+ cmd.arg("stop").arg("rollback-watchdog.timer");
+ if let Err(_e) = config.run_on(&host, cmd, true).await {
+ // It is ok, if there was no reboot.
+ }
+ }
+ if action.should_schedule_rollback_run() {
+ let mut cmd = MyCommand::new("systemctl");
+ cmd.arg("stop").arg("rollback-watchdog-run.timer");
+ if let Err(e) = config.run_on(&host, cmd, true).await {
+ error!("failed to disarm rollback run: {e}");
+ }
+ }
+ }
}
}
}
cmds/fleet/src/command.rsdiffbeforeafterboth--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -143,12 +143,14 @@
pub async fn run_nix_string(self) -> Result<String> {
let str = self.clone().into_string();
- let cmd = self.into_command();
+ let mut cmd = self.into_command();
+ cmd.arg("--log-format").arg("internal-json");
run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await
}
pub async fn run_nix(self) -> Result<()> {
let str = self.clone().into_string();
let mut cmd = self.into_command();
+ cmd.arg("--log-format").arg("internal-json");
cmd.stdout(Stdio::inherit());
run_nix_inner(str, cmd, &mut NixHandler::default()).await
}
@@ -410,7 +412,6 @@
handler: &mut dyn Handler,
) -> Result<Option<String>> {
info!("running {str}");
- cmd.arg("--log-format").arg("internal-json");
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());
let mut child = cmd.spawn()?;
cmds/fleet/src/main.rsdiffbeforeafterboth1pub mod cmds;2pub mod command;3pub mod host;4pub mod keys;56mod fleetdata;78use std::ffi::OsString;9use std::io;10use std::time::Duration;1112use anyhow::{anyhow, bail, Result};13use clap::Parser;1415use cmds::{build_systems::BuildSystems, info::Info, secrets::Secrets};16use host::{Config, FleetOpts};17use indicatif::{ProgressState, ProgressStyle};18use tokio::fs;19use tokio::process::Command;20use tracing::{info, metadata::LevelFilter};21use tracing_indicatif::IndicatifLayer;22use tracing_subscriber::{prelude::*, EnvFilter};2324#[derive(Parser)]25struct Prefetch {}26impl Prefetch {27 async fn run(&self, config: &Config) -> Result<()> {28 let mut prefetch_dir = config.directory.to_path_buf();29 prefetch_dir.push("prefetch");30 if !prefetch_dir.is_dir() {31 info!("nothing to prefetch: no prefetch directory");32 return Ok(());33 }34 for entry in std::fs::read_dir(&prefetch_dir)? {35 let entry = entry?;36 if !entry.metadata()?.is_file() {37 bail!("only files should exist in prefetch directory");38 }39 info!("prefetching {:?}", entry.file_name());40 let mut path = OsString::new();41 path.push("file://");42 path.push(entry.path());43 let status = Command::new("nix-prefetch-url").arg(path).status().await?;44 if !status.success() {45 bail!("failed with {status}");46 }47 }48 Ok(())49 }50}5152#[derive(Parser)]53enum Opts {54 /// Prepare systems for deployments55 BuildSystems(BuildSystems),56 /// Secret management57 #[clap(subcommand)]58 Secrets(Secrets),59 /// Upload prefetch directory to the nix store60 Prefetch(Prefetch),61 /// Config parsing62 Info(Info),63}6465#[derive(Parser)]66#[clap(version = "1.0", author)]67struct RootOpts {68 #[clap(flatten)]69 fleet_opts: FleetOpts,70 #[clap(subcommand)]71 command: Opts,72}7374async fn run_command(config: &Config, command: Opts) -> Result<()> {75 match command {76 Opts::BuildSystems(c) => c.run(config).await?,77 Opts::Secrets(s) => s.run(config).await?,78 Opts::Info(i) => i.run(config).await?,79 Opts::Prefetch(p) => p.run(config).await?,80 };81 Ok(())82}83fn elapsed_subsec(state: &ProgressState, writer: &mut dyn std::fmt::Write) {84 let _ = writer.write_str(&format!("{:?}", state.elapsed()));85}8687#[tokio::main]88async fn main() -> Result<()> {89 let indicatif_layer = IndicatifLayer::new().with_progress_style(90 ProgressStyle::with_template(91 "{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{pos:>7}/{len:7}{elapsed}{color_end}",92 )93 .unwrap()94 .with_key(95 "color_start",96 |state: &ProgressState, writer: &mut dyn std::fmt::Write| {97 let elapsed = state.elapsed();9899 if elapsed > Duration::from_secs(60) {100 // Red101 let _ = write!(writer, "\x1b[{}m", 1 + 30);102 } else if elapsed > Duration::from_secs(30) {103 // Yellow104 let _ = write!(writer, "\x1b[{}m", 3 + 30);105 }106 },107 )108 .with_key(109 "color_end",110 |state: &ProgressState, writer: &mut dyn std::fmt::Write| {111 if state.elapsed() > Duration::from_secs(30) {112 let _ = write!(writer, "\x1b[0m");113 }114 },115 ),116 );117118 let filter = EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into());119120 tracing_subscriber::registry()121 .with(122 tracing_subscriber::fmt::layer()123 .without_time()124 .with_target(false)125 .with_writer(indicatif_layer.get_stderr_writer())126 .with_filter(filter), // .withou,127 )128 .with(indicatif_layer)129 .init();130 info!("Starting");131 let mut os_args = std::env::args_os();132 let opts = RootOpts::parse_from((&mut os_args).take_while(|v| v != "--"));133 let config = opts.fleet_opts.build(os_args.collect()).await?;134135 match run_command(&config, opts.command).await {136 Ok(()) => {137 config.save()?;138 Ok(())139 }140 Err(e) => {141 let _ = config.save();142 Err(e)143 }144 }145}1#![feature(try_blocks)]23pub mod cmds;4pub mod command;5pub mod host;6pub mod keys;78mod fleetdata;910use std::ffi::OsString;11use std::time::Duration;1213use anyhow::{bail, Result};14use clap::Parser;1516use cmds::{build_systems::BuildSystems, info::Info, secrets::Secrets};17use host::{Config, FleetOpts};18use indicatif::{ProgressState, ProgressStyle};19use tokio::process::Command;20use tracing::{info, metadata::LevelFilter};21use tracing_indicatif::IndicatifLayer;22use tracing_subscriber::{prelude::*, EnvFilter};2324#[derive(Parser)]25struct Prefetch {}26impl Prefetch {27 async fn run(&self, config: &Config) -> Result<()> {28 let mut prefetch_dir = config.directory.to_path_buf();29 prefetch_dir.push("prefetch");30 if !prefetch_dir.is_dir() {31 info!("nothing to prefetch: no prefetch directory");32 return Ok(());33 }34 for entry in std::fs::read_dir(&prefetch_dir)? {35 let entry = entry?;36 if !entry.metadata()?.is_file() {37 bail!("only files should exist in prefetch directory");38 }39 info!("prefetching {:?}", entry.file_name());40 let mut path = OsString::new();41 path.push("file://");42 path.push(entry.path());43 let status = Command::new("nix-prefetch-url").arg(path).status().await?;44 if !status.success() {45 bail!("failed with {status}");46 }47 }48 Ok(())49 }50}5152#[derive(Parser)]53enum Opts {54 /// Prepare systems for deployments55 BuildSystems(BuildSystems),56 /// Secret management57 #[clap(subcommand)]58 Secrets(Secrets),59 /// Upload prefetch directory to the nix store60 Prefetch(Prefetch),61 /// Config parsing62 Info(Info),63}6465#[derive(Parser)]66#[clap(version = "1.0", author)]67struct RootOpts {68 #[clap(flatten)]69 fleet_opts: FleetOpts,70 #[clap(subcommand)]71 command: Opts,72}7374async fn run_command(config: &Config, command: Opts) -> Result<()> {75 match command {76 Opts::BuildSystems(c) => c.run(config).await?,77 Opts::Secrets(s) => s.run(config).await?,78 Opts::Info(i) => i.run(config).await?,79 Opts::Prefetch(p) => p.run(config).await?,80 };81 Ok(())82}8384#[tokio::main]85async fn main() -> Result<()> {86 let indicatif_layer = IndicatifLayer::new().with_progress_style(87 ProgressStyle::with_template(88 "{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{pos:>7}/{len:7}{elapsed}{color_end}",89 )90 .unwrap()91 .with_key(92 "color_start",93 |state: &ProgressState, writer: &mut dyn std::fmt::Write| {94 let elapsed = state.elapsed();9596 if elapsed > Duration::from_secs(60) {97 // Red98 let _ = write!(writer, "\x1b[{}m", 1 + 30);99 } else if elapsed > Duration::from_secs(30) {100 // Yellow101 let _ = write!(writer, "\x1b[{}m", 3 + 30);102 }103 },104 )105 .with_key(106 "color_end",107 |state: &ProgressState, writer: &mut dyn std::fmt::Write| {108 if state.elapsed() > Duration::from_secs(30) {109 let _ = write!(writer, "\x1b[0m");110 }111 },112 ),113 );114115 let filter = EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into());116117 tracing_subscriber::registry()118 .with(119 tracing_subscriber::fmt::layer()120 .without_time()121 .with_target(false)122 .with_writer(indicatif_layer.get_stderr_writer())123 .with_filter(filter), // .withou,124 )125 .with(indicatif_layer)126 .init();127 info!("Starting");128 let mut os_args = std::env::args_os();129 let opts = RootOpts::parse_from((&mut os_args).take_while(|v| v != "--"));130 let config = opts.fleet_opts.build(os_args.collect()).await?;131132 match run_command(&config, opts.command).await {133 Ok(()) => {134 config.save()?;135 Ok(())136 }137 Err(e) => {138 let _ = config.save();139 Err(e)140 }141 }142}cmds/install-secrets/Cargo.tomldiffbeforeafterboth--- a/cmds/install-secrets/Cargo.toml
+++ b/cmds/install-secrets/Cargo.toml
@@ -9,7 +9,7 @@
env_logger = "0.10.0"
log = "0.4.14"
nix = "0.26.1"
-serde = "1.0.130"
+serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.89"
clap = { version = "4.0.29", features = [
"derive",
nixos/modules/module-list.nixdiffbeforeafterboth--- a/nixos/modules/module-list.nix
+++ b/nixos/modules/module-list.nix
@@ -2,4 +2,5 @@
../fleetPkgs.nix
../meta.nix
../secrets.nix
+ ../rollback.nix
]
nixos/rollback.nixdiffbeforeafterboth--- /dev/null
+++ b/nixos/rollback.nix
@@ -0,0 +1,45 @@
+{config, ...}: {
+ # TODO: Make it work with systemd-initrd approach.
+ # In this case we can't just switch generation and re-run activation script, since the root filesystem might not be
+ # mounted yet. We need to explicitly remove the last generation, and this needs deeper integration with systemd/grub/
+ # whatever user uses. boot.json also might help here.
+
+ systemd.services.rollback-watchdog = {
+ description = "Rollback watchdog";
+ script = ''
+ set -eu
+ if [ -f /etc/fleet_rollback_marker ]; then
+ echo "found the rollback marker, switching to older generation"
+ target=$(cat /etc/fleet_rollback_marker)
+ echo "rolling back profile"
+ nix profile rollback --profile /nix/var/nix/profiles/system --to "$target"
+ echo "executing activation script"
+ "/nix/var/nix/profiles/system-$target-link/bin/switch-to-configuration" switch
+ echo "removing rollback marker"
+ rm -f /etc/fleet_rollback_marker
+ else
+ echo "rollback marker was removed, upgrade is succeeded"
+ fi
+ '';
+ path = [
+ # Should have nix-command support
+ config.nix.package
+ ];
+ serviceConfig.Type = "exec";
+ unitConfig = {
+ X-StopOnRemoval = false;
+ };
+ };
+
+ systemd.timers.rollback-watchdog = {
+ description = "Timer for rollback watchdog";
+ wantedBy = ["timers.target"];
+ timerConfig = {
+ OnUnitActiveSec = "3min";
+ RemainAfterElapse = false;
+ };
+ unitConfig = {
+ ConditionPathExists = "/etc/fleet_rollback_marker";
+ };
+ };
+}
pkgs/fleet-install-secrets.nixdiffbeforeafterboth--- a/pkgs/fleet-install-secrets.nix
+++ b/pkgs/fleet-install-secrets.nix
@@ -6,7 +6,7 @@
name = "${pname}-${version}";
src = ../.;
- cargoBuildFlags = "-p ${pname}";
+ buildAndTestSubdir = "cmds/install-secrets";
cargoLock = {
lockFile = ../Cargo.lock;
outputHashes = {