git.delta.rocks / jrsonnet / refs/commits / 741106e60111

difftreelog

feat automatic rollback

Yaroslav Bolyukin2023-10-15parent: #4340a04.patch.diff
in: trunk

10 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -610,6 +610,12 @@
 ]
 
 [[package]]
+name = "either"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
+
+[[package]]
 name = "encode_unicode"
 version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -684,6 +690,7 @@
  "futures",
  "hostname",
  "indicatif",
+ "itertools",
  "nixlike",
  "once_cell",
  "peg",
@@ -1127,6 +1134,15 @@
 ]
 
 [[package]]
+name = "itertools"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
+dependencies = [
+ "either",
+]
+
+[[package]]
 name = "itoa"
 version = "1.0.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,2 +1,3 @@
 [workspace]
 members = ["crates/*", "cmds/*"]
+resolver = "2"
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -34,3 +34,4 @@
 futures = "0.3.17"
 tracing-indicatif = "0.3.5"
 indicatif = "0.17.7"
+itertools = "0.11.0"
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -2,8 +2,9 @@
 
 use crate::command::MyCommand;
 use crate::host::Config;
-use anyhow::Result;
+use anyhow::{anyhow, Result};
 use clap::Parser;
+use itertools::Itertools;
 use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
@@ -12,6 +13,9 @@
 	/// Do not continue on error
 	#[clap(long)]
 	fail_fast: bool,
+	/// Disable automatic rollback
+	#[clap(long)]
+	disable_rollback: bool,
 	/// Run builds as sudo
 	#[clap(long)]
 	privileged_build: bool,
@@ -39,6 +43,9 @@
 	pub(crate) fn should_activate(&self) -> bool {
 		matches!(self, Self::Switch | Self::Test)
 	}
+	pub(crate) fn should_schedule_rollback_run(&self) -> bool {
+		matches!(self, Self::Switch | Self::Test)
+	}
 }
 
 enum PackageAction {
@@ -103,6 +110,62 @@
 	InstallationCd,
 }
 
+struct Generation {
+	id: u32,
+	current: bool,
+	datetime: String,
+}
+async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
+	let mut cmd = MyCommand::new("nix-env");
+	cmd.comparg("--profile", "/nix/var/nix/profiles/system")
+		.arg("--list-generations");
+	// Sudo is required due to --list-generations acquiring lock on the profile.
+	let data = config.run_string_on(&host, cmd, true).await?;
+	let generations = data
+		.split('\n')
+		.map(|e| e.trim())
+		.filter(|&l| l != "")
+		.filter_map(|g| {
+			let gen: Option<Generation> = try {
+				let mut parts = g.split_whitespace();
+				let id = parts.next()?;
+				let id: u32 = id.parse().ok()?;
+				let date = parts.next()?;
+				let time = parts.next()?;
+				let current = if let Some(current) = parts.next() {
+					if current == "(current)" {
+						Some(true)
+					} else {
+						None
+					}
+				} else {
+					Some(false)
+				};
+				let current = current?;
+				if parts.next().is_some() {
+					warn!("unexpected text after generation: {g}");
+				}
+				Generation {
+					id,
+					current,
+					datetime: format!("{date} {time}"),
+				}
+			};
+			if gen.is_none() {
+				warn!("bad generation: {g}")
+			}
+			gen
+		})
+		.collect::<Vec<_>>();
+	let current = generations
+		.into_iter()
+		.filter(|g| g.current)
+		.at_most_one()
+		.map_err(|_e| anyhow!("bad list-generations output"))?
+		.ok_or_else(|| anyhow!("failed to find generation"))?;
+	Ok(current)
+}
+
 impl BuildSystems {
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
@@ -155,6 +218,7 @@
 					loop {
 						let mut nix = MyCommand::new("nix");
 						nix.arg("copy")
+							.arg("--substitute-on-destination")
 							.comparg("--to", format!("ssh://root@{host}"))
 							.arg(&built);
 						match nix.run_nix().await {
@@ -169,21 +233,107 @@
 					}
 				}
 				if let Some(action) = action {
-					if action.should_switch_profile() {
+					let mut failed = false;
+					// TODO: Lockfile, to prevent concurrent system switch?
+					// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback
+					// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
+					// unit name conflict in systemd-run
+					if !self.disable_rollback {
+						let _span = info_span!("preparing").entered();
+						info!("preparing for rollback");
+						let generation = get_current_generation(&config, &host).await?;
+						info!(
+							"rollback target would be {} {}",
+							generation.id, generation.datetime
+						);
+						{
+							let mut cmd = MyCommand::new("sh");
+							cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
+							if let Err(e) = config.run_on(&host, cmd, true).await {
+								error!("failed to set rollback marker: {e}");
+								failed = true;
+							}
+						}
+						// Activation script also starts rollback-watchdog.timer, however, it is possible that it won't be started.
+						// Kicking it on manually will work best.
+						//
+						// There wouldn't be conflict, because here we trigger start of the primary service, and systemd will
+						// only allow one instance of it.
+						if action.should_schedule_rollback_run() {
+							let mut cmd = MyCommand::new("systemd-run");
+							cmd.comparg("--on-active", "3min")
+								.comparg("--unit", "rollback-watchdog-run")
+								.arg("systemctl")
+								.arg("start")
+								.arg("rollback-watchdog.service");
+							if let Err(e) = config.run_on(&host, cmd, true).await {
+								error!("failed to schedule rollback run: {e}");
+								failed = true;
+							}
+						}
+					}
+					if action.should_switch_profile() && !failed {
 						info!("switching generation");
 						let mut cmd = MyCommand::new("nix-env");
 						cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 							.comparg("--set", &built);
-						config.run_on(&host, cmd, true).await?;
+						if let Err(e) = config.run_on(&host, cmd, true).await {
+							error!("failed to switch generation: {e}");
+							failed = true;
+						}
 					}
-					if action.should_activate() {
+					if action.should_activate() && !failed {
+						let _span = info_span!("activating").entered();
 						info!("executing activation script");
 						let mut switch_script = built.clone();
 						switch_script.push("bin");
 						switch_script.push("switch-to-configuration");
 						let mut cmd = MyCommand::new(switch_script);
 						cmd.arg(action.name());
-						config.run_on(&host, cmd, true).await?;
+						if let Err(e) = config.run_on(&host, cmd, true).in_current_span().await {
+							error!("failed to activate: {e}");
+							failed = true;
+						}
+					}
+					if !self.disable_rollback {
+						{
+							let _span = info_span!("rollback").entered();
+							if failed {
+								info!("executing rollback");
+								let mut cmd = MyCommand::new("systemctl");
+								cmd.arg("start").arg("rollback-watchdog.service");
+								if let Err(e) = config.run_on(&host, cmd, true).await {
+									error!("failed to rollback: {e}");
+								}
+							} else {
+								info!("marking upgrade as successful");
+								let mut cmd = MyCommand::new("rm");
+								cmd.arg("-f").arg("/etc/fleet_rollback_marker");
+								if let Err(e) =
+									config.run_on(&host, cmd, true).in_current_span().await
+								{
+									error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
+								}
+							}
+						}
+						{
+							let _span = info_span!("disarm").entered();
+							info!("disarming watchdog, just in case");
+							{
+								let mut cmd = MyCommand::new("systemctl");
+								cmd.arg("stop").arg("rollback-watchdog.timer");
+								if let Err(_e) = config.run_on(&host, cmd, true).await {
+									// It is ok, if there was no reboot.
+								}
+							}
+							if action.should_schedule_rollback_run() {
+								let mut cmd = MyCommand::new("systemctl");
+								cmd.arg("stop").arg("rollback-watchdog-run.timer");
+								if let Err(e) = config.run_on(&host, cmd, true).await {
+									error!("failed to disarm rollback run: {e}");
+								}
+							}
+						}
 					}
 				}
 			}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
--- a/cmds/fleet/src/command.rs
+++ b/cmds/fleet/src/command.rs
@@ -143,12 +143,14 @@
 
 	pub async fn run_nix_string(self) -> Result<String> {
 		let str = self.clone().into_string();
-		let cmd = self.into_command();
+		let mut cmd = self.into_command();
+		cmd.arg("--log-format").arg("internal-json");
 		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await
 	}
 	pub async fn run_nix(self) -> Result<()> {
 		let str = self.clone().into_string();
 		let mut cmd = self.into_command();
+		cmd.arg("--log-format").arg("internal-json");
 		cmd.stdout(Stdio::inherit());
 		run_nix_inner(str, cmd, &mut NixHandler::default()).await
 	}
@@ -410,7 +412,6 @@
 	handler: &mut dyn Handler,
 ) -> Result<Option<String>> {
 	info!("running {str}");
-	cmd.arg("--log-format").arg("internal-json");
 	cmd.stderr(Stdio::piped());
 	cmd.stdout(Stdio::piped());
 	let mut child = cmd.spawn()?;
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
before · cmds/fleet/src/main.rs
1pub mod cmds;2pub mod command;3pub mod host;4pub mod keys;56mod fleetdata;78use std::ffi::OsString;9use std::io;10use std::time::Duration;1112use anyhow::{anyhow, bail, Result};13use clap::Parser;1415use cmds::{build_systems::BuildSystems, info::Info, secrets::Secrets};16use host::{Config, FleetOpts};17use indicatif::{ProgressState, ProgressStyle};18use tokio::fs;19use tokio::process::Command;20use tracing::{info, metadata::LevelFilter};21use tracing_indicatif::IndicatifLayer;22use tracing_subscriber::{prelude::*, EnvFilter};2324#[derive(Parser)]25struct Prefetch {}26impl Prefetch {27	async fn run(&self, config: &Config) -> Result<()> {28		let mut prefetch_dir = config.directory.to_path_buf();29		prefetch_dir.push("prefetch");30		if !prefetch_dir.is_dir() {31			info!("nothing to prefetch: no prefetch directory");32			return Ok(());33		}34		for entry in std::fs::read_dir(&prefetch_dir)? {35			let entry = entry?;36			if !entry.metadata()?.is_file() {37				bail!("only files should exist in prefetch directory");38			}39			info!("prefetching {:?}", entry.file_name());40			let mut path = OsString::new();41			path.push("file://");42			path.push(entry.path());43			let status = Command::new("nix-prefetch-url").arg(path).status().await?;44			if !status.success() {45				bail!("failed with {status}");46			}47		}48		Ok(())49	}50}5152#[derive(Parser)]53enum Opts {54	/// Prepare systems for deployments55	BuildSystems(BuildSystems),56	/// Secret management57	#[clap(subcommand)]58	Secrets(Secrets),59	/// Upload prefetch directory to the nix store60	Prefetch(Prefetch),61	/// Config parsing62	Info(Info),63}6465#[derive(Parser)]66#[clap(version = "1.0", author)]67struct RootOpts {68	#[clap(flatten)]69	fleet_opts: FleetOpts,70	#[clap(subcommand)]71	command: Opts,72}7374async fn run_command(config: &Config, command: Opts) -> Result<()> {75	match command {76		Opts::BuildSystems(c) => c.run(config).await?,77		Opts::Secrets(s) => s.run(config).await?,78		Opts::Info(i) => i.run(config).await?,79		Opts::Prefetch(p) => p.run(config).await?,80	};81	Ok(())82}83fn elapsed_subsec(state: &ProgressState, writer: &mut dyn std::fmt::Write) {84	let _ = writer.write_str(&format!("{:?}", state.elapsed()));85}8687#[tokio::main]88async fn main() -> Result<()> {89	let indicatif_layer = IndicatifLayer::new().with_progress_style(90		ProgressStyle::with_template(91			"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{pos:>7}/{len:7}{elapsed}{color_end}",92		)93		.unwrap()94		.with_key(95			"color_start",96			|state: &ProgressState, writer: &mut dyn std::fmt::Write| {97				let elapsed = state.elapsed();9899				if elapsed > Duration::from_secs(60) {100					// Red101					let _ = write!(writer, "\x1b[{}m", 1 + 30);102				} else if elapsed > Duration::from_secs(30) {103					// Yellow104					let _ = write!(writer, "\x1b[{}m", 3 + 30);105				}106			},107		)108		.with_key(109			"color_end",110			|state: &ProgressState, writer: &mut dyn std::fmt::Write| {111				if state.elapsed() > Duration::from_secs(30) {112					let _ = write!(writer, "\x1b[0m");113				}114			},115		),116	);117118	let filter = EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into());119120	tracing_subscriber::registry()121		.with(122			tracing_subscriber::fmt::layer()123				.without_time()124				.with_target(false)125				.with_writer(indicatif_layer.get_stderr_writer())126				.with_filter(filter), // .withou,127		)128		.with(indicatif_layer)129		.init();130	info!("Starting");131	let mut os_args = std::env::args_os();132	let opts = RootOpts::parse_from((&mut os_args).take_while(|v| v != "--"));133	let config = opts.fleet_opts.build(os_args.collect()).await?;134135	match run_command(&config, opts.command).await {136		Ok(()) => {137			config.save()?;138			Ok(())139		}140		Err(e) => {141			let _ = config.save();142			Err(e)143		}144	}145}
after · cmds/fleet/src/main.rs
1#![feature(try_blocks)]23pub mod cmds;4pub mod command;5pub mod host;6pub mod keys;78mod fleetdata;910use std::ffi::OsString;11use std::time::Duration;1213use anyhow::{bail, Result};14use clap::Parser;1516use cmds::{build_systems::BuildSystems, info::Info, secrets::Secrets};17use host::{Config, FleetOpts};18use indicatif::{ProgressState, ProgressStyle};19use tokio::process::Command;20use tracing::{info, metadata::LevelFilter};21use tracing_indicatif::IndicatifLayer;22use tracing_subscriber::{prelude::*, EnvFilter};2324#[derive(Parser)]25struct Prefetch {}26impl Prefetch {27	async fn run(&self, config: &Config) -> Result<()> {28		let mut prefetch_dir = config.directory.to_path_buf();29		prefetch_dir.push("prefetch");30		if !prefetch_dir.is_dir() {31			info!("nothing to prefetch: no prefetch directory");32			return Ok(());33		}34		for entry in std::fs::read_dir(&prefetch_dir)? {35			let entry = entry?;36			if !entry.metadata()?.is_file() {37				bail!("only files should exist in prefetch directory");38			}39			info!("prefetching {:?}", entry.file_name());40			let mut path = OsString::new();41			path.push("file://");42			path.push(entry.path());43			let status = Command::new("nix-prefetch-url").arg(path).status().await?;44			if !status.success() {45				bail!("failed with {status}");46			}47		}48		Ok(())49	}50}5152#[derive(Parser)]53enum Opts {54	/// Prepare systems for deployments55	BuildSystems(BuildSystems),56	/// Secret management57	#[clap(subcommand)]58	Secrets(Secrets),59	/// Upload prefetch directory to the nix store60	Prefetch(Prefetch),61	/// Config parsing62	Info(Info),63}6465#[derive(Parser)]66#[clap(version = "1.0", author)]67struct RootOpts {68	#[clap(flatten)]69	fleet_opts: FleetOpts,70	#[clap(subcommand)]71	command: Opts,72}7374async fn run_command(config: &Config, command: Opts) -> Result<()> {75	match command {76		Opts::BuildSystems(c) => c.run(config).await?,77		Opts::Secrets(s) => s.run(config).await?,78		Opts::Info(i) => i.run(config).await?,79		Opts::Prefetch(p) => p.run(config).await?,80	};81	Ok(())82}8384#[tokio::main]85async fn main() -> Result<()> {86	let indicatif_layer = IndicatifLayer::new().with_progress_style(87		ProgressStyle::with_template(88			"{color_start}{span_child_prefix} {span_name}{{{span_fields}}}{color_end} {wide_msg} {color_start}{pos:>7}/{len:7}{elapsed}{color_end}",89		)90		.unwrap()91		.with_key(92			"color_start",93			|state: &ProgressState, writer: &mut dyn std::fmt::Write| {94				let elapsed = state.elapsed();9596				if elapsed > Duration::from_secs(60) {97					// Red98					let _ = write!(writer, "\x1b[{}m", 1 + 30);99				} else if elapsed > Duration::from_secs(30) {100					// Yellow101					let _ = write!(writer, "\x1b[{}m", 3 + 30);102				}103			},104		)105		.with_key(106			"color_end",107			|state: &ProgressState, writer: &mut dyn std::fmt::Write| {108				if state.elapsed() > Duration::from_secs(30) {109					let _ = write!(writer, "\x1b[0m");110				}111			},112		),113	);114115	let filter = EnvFilter::from_default_env().add_directive(LevelFilter::INFO.into());116117	tracing_subscriber::registry()118		.with(119			tracing_subscriber::fmt::layer()120				.without_time()121				.with_target(false)122				.with_writer(indicatif_layer.get_stderr_writer())123				.with_filter(filter), // .withou,124		)125		.with(indicatif_layer)126		.init();127	info!("Starting");128	let mut os_args = std::env::args_os();129	let opts = RootOpts::parse_from((&mut os_args).take_while(|v| v != "--"));130	let config = opts.fleet_opts.build(os_args.collect()).await?;131132	match run_command(&config, opts.command).await {133		Ok(()) => {134			config.save()?;135			Ok(())136		}137		Err(e) => {138			let _ = config.save();139			Err(e)140		}141	}142}
modifiedcmds/install-secrets/Cargo.tomldiffbeforeafterboth
--- a/cmds/install-secrets/Cargo.toml
+++ b/cmds/install-secrets/Cargo.toml
@@ -9,7 +9,7 @@
 env_logger = "0.10.0"
 log = "0.4.14"
 nix = "0.26.1"
-serde = "1.0.130"
+serde = { version = "1.0.130", features = ["derive"] }
 serde_json = "1.0.89"
 clap = { version = "4.0.29", features = [
 	"derive",
modifiednixos/modules/module-list.nixdiffbeforeafterboth
--- a/nixos/modules/module-list.nix
+++ b/nixos/modules/module-list.nix
@@ -2,4 +2,5 @@
   ../fleetPkgs.nix
   ../meta.nix
   ../secrets.nix
+  ../rollback.nix
 ]
addednixos/rollback.nixdiffbeforeafterboth
--- /dev/null
+++ b/nixos/rollback.nix
@@ -0,0 +1,45 @@
+{config, ...}: {
+  # TODO: Make it work with systemd-initrd approach.
+  # In this case we can't just switch generation and re-run activation script, since the root filesystem might not be
+  # mounted yet. We need to explicitly remove the last generation, and this needs deeper integration with systemd/grub/
+  # whatever user uses. boot.json also might help here.
+
+  systemd.services.rollback-watchdog = {
+    description = "Rollback watchdog";
+    script = ''
+      set -eu
+      if [ -f /etc/fleet_rollback_marker ]; then
+        echo "found the rollback marker, switching to older generation"
+        target=$(cat /etc/fleet_rollback_marker)
+        echo "rolling back profile"
+        nix profile rollback --profile /nix/var/nix/profiles/system --to "$target"
+        echo "executing activation script"
+        "/nix/var/nix/profiles/system-$target-link/bin/switch-to-configuration" switch
+        echo "removing rollback marker"
+        rm -f /etc/fleet_rollback_marker
+      else
+        echo "rollback marker was removed, upgrade is succeeded"
+      fi
+    '';
+    path = [
+      # Should have nix-command support
+      config.nix.package
+    ];
+    serviceConfig.Type = "exec";
+    unitConfig = {
+      X-StopOnRemoval = false;
+    };
+  };
+
+  systemd.timers.rollback-watchdog = {
+    description = "Timer for rollback watchdog";
+    wantedBy = ["timers.target"];
+    timerConfig = {
+      OnUnitActiveSec = "3min";
+      RemainAfterElapse = false;
+    };
+    unitConfig = {
+      ConditionPathExists = "/etc/fleet_rollback_marker";
+    };
+  };
+}
modifiedpkgs/fleet-install-secrets.nixdiffbeforeafterboth
--- a/pkgs/fleet-install-secrets.nix
+++ b/pkgs/fleet-install-secrets.nix
@@ -6,7 +6,7 @@
   name = "${pname}-${version}";
 
   src = ../.;
-  cargoBuildFlags = "-p ${pname}";
+  buildAndTestSubdir = "cmds/install-secrets";
   cargoLock = {
     lockFile = ../Cargo.lock;
     outputHashes = {