git.delta.rocks / jrsonnet / refs/commits / 741106e60111

difftreelog

feat automatic rollback

Yaroslav Bolyukin2023-10-15parent: #4340a04.patch.diff
in: trunk

10 files changed

modifiedCargo.lockdiffbeforeafterboth
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -610,6 +610,12 @@
 ]
 
 [[package]]
+name = "either"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
+
+[[package]]
 name = "encode_unicode"
 version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -684,6 +690,7 @@
  "futures",
  "hostname",
  "indicatif",
+ "itertools",
  "nixlike",
  "once_cell",
  "peg",
@@ -1127,6 +1134,15 @@
 ]
 
 [[package]]
+name = "itertools"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
+dependencies = [
+ "either",
+]
+
+[[package]]
 name = "itoa"
 version = "1.0.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,2 +1,3 @@
 [workspace]
 members = ["crates/*", "cmds/*"]
+resolver = "2"
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -34,3 +34,4 @@
 futures = "0.3.17"
 tracing-indicatif = "0.3.5"
 indicatif = "0.17.7"
+itertools = "0.11.0"
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -2,8 +2,9 @@
 
 use crate::command::MyCommand;
 use crate::host::Config;
-use anyhow::Result;
+use anyhow::{anyhow, Result};
 use clap::Parser;
+use itertools::Itertools;
 use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
@@ -12,6 +13,9 @@
 	/// Do not continue on error
 	#[clap(long)]
 	fail_fast: bool,
+	/// Disable automatic rollback
+	#[clap(long)]
+	disable_rollback: bool,
 	/// Run builds as sudo
 	#[clap(long)]
 	privileged_build: bool,
@@ -39,6 +43,9 @@
 	pub(crate) fn should_activate(&self) -> bool {
 		matches!(self, Self::Switch | Self::Test)
 	}
+	pub(crate) fn should_schedule_rollback_run(&self) -> bool {
+		matches!(self, Self::Switch | Self::Test)
+	}
 }
 
 enum PackageAction {
@@ -103,6 +110,62 @@
 	InstallationCd,
 }
 
+struct Generation {
+	id: u32,
+	current: bool,
+	datetime: String,
+}
+async fn get_current_generation(config: &Config, host: &str) -> Result<Generation> {
+	let mut cmd = MyCommand::new("nix-env");
+	cmd.comparg("--profile", "/nix/var/nix/profiles/system")
+		.arg("--list-generations");
+	// Sudo is required due to --list-generations acquiring lock on the profile.
+	let data = config.run_string_on(&host, cmd, true).await?;
+	let generations = data
+		.split('\n')
+		.map(|e| e.trim())
+		.filter(|&l| l != "")
+		.filter_map(|g| {
+			let gen: Option<Generation> = try {
+				let mut parts = g.split_whitespace();
+				let id = parts.next()?;
+				let id: u32 = id.parse().ok()?;
+				let date = parts.next()?;
+				let time = parts.next()?;
+				let current = if let Some(current) = parts.next() {
+					if current == "(current)" {
+						Some(true)
+					} else {
+						None
+					}
+				} else {
+					Some(false)
+				};
+				let current = current?;
+				if parts.next().is_some() {
+					warn!("unexpected text after generation: {g}");
+				}
+				Generation {
+					id,
+					current,
+					datetime: format!("{date} {time}"),
+				}
+			};
+			if gen.is_none() {
+				warn!("bad generation: {g}")
+			}
+			gen
+		})
+		.collect::<Vec<_>>();
+	let current = generations
+		.into_iter()
+		.filter(|g| g.current)
+		.at_most_one()
+		.map_err(|_e| anyhow!("bad list-generations output"))?
+		.ok_or_else(|| anyhow!("failed to find generation"))?;
+	Ok(current)
+}
+
 impl BuildSystems {
 	async fn build_task(self, config: Config, host: String) -> Result<()> {
 		info!("building");
@@ -155,6 +218,7 @@
 					loop {
 						let mut nix = MyCommand::new("nix");
 						nix.arg("copy")
+							.arg("--substitute-on-destination")
 							.comparg("--to", format!("ssh://root@{host}"))
 							.arg(&built);
 						match nix.run_nix().await {
@@ -169,21 +233,107 @@
 					}
 				}
 				if let Some(action) = action {
-					if action.should_switch_profile() {
+					let mut failed = false;
+					// TODO: Lockfile, to prevent concurrent system switch?
+					// TODO: If rollback target exists - bail, it should be removed. Lockfile will not work in case if rollback
+					// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
+					// unit name conflict in systemd-run
+					if !self.disable_rollback {
+						let _span = info_span!("preparing").entered();
+						info!("preparing for rollback");
+						let generation = get_current_generation(&config, &host).await?;
+						info!(
+							"rollback target would be {} {}",
+							generation.id, generation.datetime
+						);
+						{
+							let mut cmd = MyCommand::new("sh");
+							cmd.arg("-c").arg(format!("mark=$(mktemp -p /etc -t fleet_rollback_marker.XXXXX) && echo -n {} > $mark && mv --no-clobber $mark /etc/fleet_rollback_marker", generation.id));
+							if let Err(e) = config.run_on(&host, cmd, true).await {
+								error!("failed to set rollback marker: {e}");
+								failed = true;
+							}
+						}
+						// Activation script also starts rollback-watchdog.timer, however, it is possible that it won't be started.
+						// Kicking it on manually will work best.
+						//
+						// There wouldn't be conflict, because here we trigger start of the primary service, and systemd will
+						// only allow one instance of it.
+						if action.should_schedule_rollback_run() {
+							let mut cmd = MyCommand::new("systemd-run");
+							cmd.comparg("--on-active", "3min")
+								.comparg("--unit", "rollback-watchdog-run")
+								.arg("systemctl")
+								.arg("start")
+								.arg("rollback-watchdog.service");
+							if let Err(e) = config.run_on(&host, cmd, true).await {
+								error!("failed to schedule rollback run: {e}");
+								failed = true;
+							}
+						}
+					}
+					if action.should_switch_profile() && !failed {
 						info!("switching generation");
 						let mut cmd = MyCommand::new("nix-env");
 						cmd.comparg("--profile", "/nix/var/nix/profiles/system")
 							.comparg("--set", &built);
-						config.run_on(&host, cmd, true).await?;
+						if let Err(e) = config.run_on(&host, cmd, true).await {
+							error!("failed to switch generation: {e}");
+							failed = true;
+						}
 					}
-					if action.should_activate() {
+					if action.should_activate() && !failed {
+						let _span = info_span!("activating").entered();
 						info!("executing activation script");
 						let mut switch_script = built.clone();
 						switch_script.push("bin");
 						switch_script.push("switch-to-configuration");
 						let mut cmd = MyCommand::new(switch_script);
 						cmd.arg(action.name());
-						config.run_on(&host, cmd, true).await?;
+						if let Err(e) = config.run_on(&host, cmd, true).in_current_span().await {
+							error!("failed to activate: {e}");
+							failed = true;
+						}
+					}
+					if !self.disable_rollback {
+						{
+							let _span = info_span!("rollback").entered();
+							if failed {
+								info!("executing rollback");
+								let mut cmd = MyCommand::new("systemctl");
+								cmd.arg("start").arg("rollback-watchdog.service");
+								if let Err(e) = config.run_on(&host, cmd, true).await {
+									error!("failed to rollback: {e}");
+								}
+							} else {
+								info!("marking upgrade as successful");
+								let mut cmd = MyCommand::new("rm");
+								cmd.arg("-f").arg("/etc/fleet_rollback_marker");
+								if let Err(e) =
+									config.run_on(&host, cmd, true).in_current_span().await
+								{
+									error!("failed to remove rollback marker. This is bad, as the system will be rolled back by watchdog: {e}")
+								}
+							}
+						}
+						{
+							let _span = info_span!("disarm").entered();
+							info!("disarming watchdog, just in case");
+							{
+								let mut cmd = MyCommand::new("systemctl");
+								cmd.arg("stop").arg("rollback-watchdog.timer");
+								if let Err(_e) = config.run_on(&host, cmd, true).await {
+									// It is ok, if there was no reboot.
+								}
+							}
+							if action.should_schedule_rollback_run() {
+								let mut cmd = MyCommand::new("systemctl");
+								cmd.arg("stop").arg("rollback-watchdog-run.timer");
+								if let Err(e) = config.run_on(&host, cmd, true).await {
+									error!("failed to disarm rollback run: {e}");
+								}
+							}
+						}
 					}
 				}
 			}
modifiedcmds/fleet/src/command.rsdiffbeforeafterboth
after · cmds/fleet/src/command.rs
1use std::{collections::HashMap, ffi::OsStr, process::Stdio, task::Poll};23use anyhow::{Context, Result};4use futures::StreamExt;5use serde::{6	de::{DeserializeOwned, Visitor},7	Deserialize,8};9use tokio::{io::AsyncRead, process::Command, select};10use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tracing::{info, info_span, warn, Span};12use tracing_indicatif::span_ext::IndicatifSpanExt;1314fn escape_bash(input: &str, out: &mut String) {15	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17		out.push_str(input);18		return;19	}20	out.push('\'');21	for (i, v) in input.split('\'').enumerate() {22		if i != 0 {23			out.push_str("'\"'\"'");24		}25		out.push_str(v);26	}27	out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30	os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34	command: String,35	args: Vec<String>,36	env: Vec<(String, String)>,37}38impl MyCommand {39	pub fn new(cmd: impl AsRef<OsStr>) -> Self {40		assert!(!cmd.as_ref().is_empty());41		Self {42			command: ostoutf8(cmd),43			args: vec![],44			env: vec![],45		}46	}47	fn into_args(self) -> Vec<String> {48		let mut out = Vec::new();49		if !self.env.is_empty() {50			out.push("env".to_owned());51			for (k, v) in self.env {52				assert!(!k.contains("="));53				out.push(format!("{k}={v}"));54			}55		}56		out.push(self.command);57		out.extend(self.args.into_iter());58		out59	}60	fn into_string(self) -> String {61		let mut out = String::new();62		if !self.env.is_empty() {63			out.push_str("env");64			for (k, v) in self.env {65				out.push(' ');66				assert!(!k.contains("="));67				escape_bash(&k, &mut out);68				out.push('=');69				escape_bash(&v, &mut out);70			}71		}72		if !out.is_empty() {73			out.push(' ');74		}75		escape_bash(&self.command, &mut out);76		for arg in self.args {77			out.push(' ');78			escape_bash(&arg, &mut out);79		}80		out81	}82	fn into_command(self) -> Command {83		let mut out = Command::new(self.command);84		out.args(self.args);85		for (k, v) in self.env {86			out.env(k, v);87		}88		out89	}90	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {91		let arg = arg.as_ref();92		self.args.push(ostoutf8(arg));93		self94	}95	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {96		let arg = arg.as_ref();97		let value = value.as_ref();98		let arg = ostoutf8(arg);99		let value = ostoutf8(value);100		self.arg(format!("{arg}={value}"));101		self102	}103	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {104		self.arg(arg);105		self.arg(value);106		self107	}108	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {109		for arg in args.into_iter() {110			let arg = arg.as_ref();111			self.args.push(ostoutf8(arg));112		}113		self114	}115	pub fn sudo(self) -> Self {116		let mut out = Self::new("sudo");117		out.args(self.into_args());118		out119	}120	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {121		let mut out = Self::new("ssh");122		out.arg(on).arg("--");123		out.arg(self.into_string());124		out125	}126127	pub async fn run(self) -> Result<()> {128		let str = self.clone().into_string();129		let cmd = self.into_command();130		run_nix_inner(str, cmd, &mut PlainHandler).await?;131		Ok(())132	}133	pub async fn run_string(self) -> Result<String> {134		let str = self.clone().into_string();135		let cmd = self.into_command();136		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;137		Ok(v)138	}139	pub async fn run_nix_json<T: DeserializeOwned>(self) -> Result<T> {140		let str = self.run_nix_string().await?;141		serde_json::from_str(&str).with_context(|| format!("{:?}", str))142	}143144	pub async fn run_nix_string(self) -> Result<String> {145		let str = self.clone().into_string();146		let mut cmd = self.into_command();147		cmd.arg("--log-format").arg("internal-json");148		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await149	}150	pub async fn run_nix(self) -> Result<()> {151		let str = self.clone().into_string();152		let mut cmd = self.into_command();153		cmd.arg("--log-format").arg("internal-json");154		cmd.stdout(Stdio::inherit());155		run_nix_inner(str, cmd, &mut NixHandler::default()).await156	}157}158159struct EmptyAsyncRead;160impl AsyncRead for EmptyAsyncRead {161	fn poll_read(162		self: std::pin::Pin<&mut Self>,163		_cx: &mut std::task::Context<'_>,164		_buf: &mut tokio::io::ReadBuf<'_>,165	) -> Poll<std::io::Result<()>> {166		Poll::Pending167	}168}169170async fn run_nix_inner_stdout(171	str: String,172	cmd: Command,173	handler: &mut dyn Handler,174) -> Result<String> {175	Ok(run_nix_inner_raw(str, cmd, true, handler)176		.await?177		.expect("has out"))178}179async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {180	let v = run_nix_inner_raw(str, cmd, false, handler).await?;181	assert!(v.is_none());182	Ok(())183}184185trait Handler {186	fn handle_err(&mut self, e: &str);187	fn handle_info(&mut self, e: &str);188}189190struct PlainHandler;191impl Handler for PlainHandler {192	fn handle_err(&mut self, e: &str) {193		info!(target: "log", "{e}");194	}195196	fn handle_info(&mut self, e: &str) {197		info!(target: "log", "{e}");198	}199}200201#[derive(Default)]202struct NixHandler {203	spans: HashMap<u64, Span>,204}205impl Handler for NixHandler {206	fn handle_err(&mut self, e: &str) {207		if let Some(e) = e.strip_prefix("@nix ") {208			let log: NixLog = match serde_json::from_str(e) {209				Ok(l) => l,210				Err(err) => {211					warn!("failed to parse nix log line {:?}: {}", e, err);212					return;213				}214			};215			match log {216				NixLog::Msg { msg, raw_msg, .. } => {217					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))218					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")219					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {220						if let Some(raw_msg) = raw_msg {221							if !msg.is_empty() {222								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())223							} else {224								info!(target: "nix", "{}", raw_msg.trim_end())225							}226						} else {227							info!(target: "nix", "{}", msg.trim_end())228						}229					}230				}231				NixLog::Start {232					ref fields,233					typ,234					id,235					..236				} if typ == 105 && !fields.is_empty() => {237					if let [LogField::String(drv), ..] = &fields[..] {238						let mut drv = drv.as_str();239						if let Some(pkg) = drv.strip_prefix("/nix/store/") {240							let mut it = pkg.splitn(2, '-');241							it.next();242							if let Some(pkg) = it.next() {243								drv = pkg;244							}245						}246						info!(target: "nix","building {}", drv);247						let span = info_span!("build", drv);248						span.pb_start();249						self.spans.insert(id, span);250					} else {251						warn!("bad build log: {:?}", log)252					}253				}254				NixLog::Start {255					ref fields,256					typ,257					id,258					..259				} if typ == 100 && fields.len() >= 3 => {260					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =261						&fields[..]262					{263						let mut drv = drv.as_str();264265						if let Some(pkg) = drv.strip_prefix("/nix/store/") {266							let mut it = pkg.splitn(2, '-');267							it.next();268							if let Some(pkg) = it.next() {269								drv = pkg;270							}271						}272						info!(target: "nix","copying {} {} -> {}", drv, from, to);273						let span = info_span!("copy", from, to, drv);274						span.pb_start();275						self.spans.insert(id, span);276					} else {277						warn!("bad copy log: {:?}", log)278					}279				}280				NixLog::Start { text, typ, id, .. }281					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>282				{283					if !text.is_empty()284						&& text != "querying info about missing paths"285						&& text != "copying 0 paths"286					{287						let span = info_span!("job");288						span.pb_start();289						span.pb_set_message(text.trim());290						self.spans.insert(id, span);291						info!(target: "nix", "{}", text);292					}293				}294				NixLog::Start {295					text,296					level: 0,297					typ: 108,298					..299				} if text.is_empty() => {300					// Cache lookup? Coupled with copy log301				}302				NixLog::Start {303					text,304					level: 4,305					typ: 109,306					..307				} if text.starts_with("querying info about ") => {308					// Cache lookup309				}310				NixLog::Start {311					text,312					level: 4,313					typ: 101,314					..315				} if text.starts_with("downloading ") => {316					// NAR downloading, coupled with copy log317				}318				NixLog::Start {319					text,320					level: 1,321					typ: 111,322					..323				} if text.starts_with("waiting for a machine to build ") => {324					// Useless repeating notification about build325				}326				NixLog::Start {327					text,328					level: 3,329					typ: 111,330					..331				} if text.starts_with("resolved derivation: ") => {332					// CA resolved333				}334				NixLog::Start {335					text,336					level: 1,337					typ: 111,338					id,339					..340				} if text.starts_with("waiting for lock on ") => {341					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();342					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {343						drv = txt;344					}345					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {346						drv = txt;347					}348					if let Some(txt) = drv.split("', '").next() {349						drv = txt;350					}351					if let Some(pkg) = drv.strip_prefix("/nix/store/") {352						let mut it = pkg.splitn(2, '-');353						it.next();354						if let Some(pkg) = it.next() {355							drv = pkg;356						}357					}358					let span = info_span!("waiting on drv", drv);359					span.pb_start();360					self.spans.insert(id, span);361					// Concurrent build of the same message362				}363				NixLog::Stop { id, .. } => {364					self.spans.remove(&id);365				}366				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {367					if let Some(span) = self.spans.get(&id) {368						if let LogField::String(s) = &fields[0] {369							span.pb_set_message(s.trim());370						} else {371							warn!("bad fields: {fields:?}");372						}373					} else {374						warn!("unknown result id: {id} {typ} {fields:?}");375					}376					// dbg!(fields, id, typ);377				}378				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {379					if let Some(span) = self.spans.get(&id) {380						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =381							&fields[..4]382						{383							span.pb_set_length(*expected);384							span.pb_set_position(*done);385						} else {386							warn!("bad fields: {fields:?}");387						}388					} else {389						// warn!("unknown result id: {id} {typ} {fields:?}");390						// Unaccounted progress.391					}392					// dbg!(fields, id, typ);393				}394				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {395					// Set phase, expected396				}397				_ => warn!("unknown log: {:?}", log),398			};399		} else {400			warn!(target = "nix", "unknown: {}", e.trim())401		}402	}403	fn handle_info(&mut self, o: &str) {404		self.handle_err(o)405	}406}407408async fn run_nix_inner_raw(409	str: String,410	mut cmd: Command,411	want_stdout: bool,412	handler: &mut dyn Handler,413) -> Result<Option<String>> {414	info!("running {str}");415	cmd.stderr(Stdio::piped());416	cmd.stdout(Stdio::piped());417	let mut child = cmd.spawn()?;418	let mut stderr = child.stderr.take().unwrap();419	let stdout = child.stdout.take().unwrap();420	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());421	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));422	let mut ob = want_stdout423		.then(|| out.take().unwrap())424		.unwrap_or_else(|| Box::new(EmptyAsyncRead));425	let mut ol = (!want_stdout)426		.then(|| out.take().unwrap())427		.unwrap_or_else(|| Box::new(EmptyAsyncRead));428	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());429	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());430431	// while let Some(line) = read.next().await? {}432433	let mut out_buf = if want_stdout { Some(vec![]) } else { None };434	loop {435		select! {436			e = err.next() => {437				if let Some(e) = e {438					let e = e?;439					handler.handle_err(&e);440				}441			},442			o = ob.next() => {443				if let Some(o) = o {444					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);445				}446			},447			o = ol.next() => {448				if let Some(o) = o {449					let o = o?;450					handler.handle_info(&o);451				}452			},453			code = child.wait() => {454				let code = code?;455				if !code.success() {456					anyhow::bail!("command '{str}' failed with status {}", code);457				}458				break;459			}460		}461	}462463	Ok(out_buf.map(String::from_utf8).transpose()?)464}465466#[derive(Debug)]467enum LogField {468	String(String),469	Num(u64),470}471472impl<'de> Deserialize<'de> for LogField {473	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>474	where475		D: serde::Deserializer<'de>,476	{477		struct StringOrNum;478		impl<'de> Visitor<'de> for StringOrNum {479			type Value = LogField;480481			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {482				write!(f, "string or unsigned")483			}484485			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>486			where487				E: serde::de::Error,488			{489				Ok(LogField::String(v.to_owned()))490			}491492			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>493			where494				E: serde::de::Error,495			{496				Ok(LogField::Num(v))497			}498		}499500		deserializer.deserialize_any(StringOrNum)501	}502}503504#[derive(Deserialize, Debug)]505#[serde(rename_all = "camelCase", tag = "action")]506#[allow(dead_code)]507enum NixLog {508	Msg {509		level: u32,510		msg: String,511		raw_msg: Option<String>,512	},513	Start {514		id: u64,515		level: u32,516		#[serde(default)]517		fields: Vec<LogField>,518		text: String,519		#[serde(rename = "type")]520		typ: u32,521	},522	Stop {523		id: u64,524	},525	Result {526		id: u64,527		#[serde(rename = "type")]528		typ: u32,529		#[serde(default)]530		fields: Vec<LogField>,531	},532}
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -1,3 +1,5 @@
+#![feature(try_blocks)]
+
 pub mod cmds;
 pub mod command;
 pub mod host;
@@ -6,16 +8,14 @@
 mod fleetdata;
 
 use std::ffi::OsString;
-use std::io;
 use std::time::Duration;
 
-use anyhow::{anyhow, bail, Result};
+use anyhow::{bail, Result};
 use clap::Parser;
 
 use cmds::{build_systems::BuildSystems, info::Info, secrets::Secrets};
 use host::{Config, FleetOpts};
 use indicatif::{ProgressState, ProgressStyle};
-use tokio::fs;
 use tokio::process::Command;
 use tracing::{info, metadata::LevelFilter};
 use tracing_indicatif::IndicatifLayer;
@@ -79,9 +79,6 @@
 		Opts::Prefetch(p) => p.run(config).await?,
 	};
 	Ok(())
-}
-fn elapsed_subsec(state: &ProgressState, writer: &mut dyn std::fmt::Write) {
-	let _ = writer.write_str(&format!("{:?}", state.elapsed()));
 }
 
 #[tokio::main]
modifiedcmds/install-secrets/Cargo.tomldiffbeforeafterboth
--- a/cmds/install-secrets/Cargo.toml
+++ b/cmds/install-secrets/Cargo.toml
@@ -9,7 +9,7 @@
 env_logger = "0.10.0"
 log = "0.4.14"
 nix = "0.26.1"
-serde = "1.0.130"
+serde = { version = "1.0.130", features = ["derive"] }
 serde_json = "1.0.89"
 clap = { version = "4.0.29", features = [
 	"derive",
modifiednixos/modules/module-list.nixdiffbeforeafterboth
--- a/nixos/modules/module-list.nix
+++ b/nixos/modules/module-list.nix
@@ -2,4 +2,5 @@
   ../fleetPkgs.nix
   ../meta.nix
   ../secrets.nix
+  ../rollback.nix
 ]
addednixos/rollback.nixdiffbeforeafterboth
--- /dev/null
+++ b/nixos/rollback.nix
@@ -0,0 +1,45 @@
+{config, ...}: {
+  # TODO: Make it work with systemd-initrd approach.
+  # In this case we can't just switch generation and re-run activation script, since the root filesystem might not be
+  # mounted yet. We need to explicitly remove the last generation, and this needs deeper integration with systemd/grub/
+  # whatever user uses. boot.json also might help here.
+
+  systemd.services.rollback-watchdog = {
+    description = "Rollback watchdog";
+    script = ''
+      set -eu
+      if [ -f /etc/fleet_rollback_marker ]; then
+        echo "found the rollback marker, switching to older generation"
+        target=$(cat /etc/fleet_rollback_marker)
+        echo "rolling back profile"
+        nix profile rollback --profile /nix/var/nix/profiles/system --to "$target"
+        echo "executing activation script"
+        "/nix/var/nix/profiles/system-$target-link/bin/switch-to-configuration" switch
+        echo "removing rollback marker"
+        rm -f /etc/fleet_rollback_marker
+      else
+        echo "rollback marker was removed, upgrade is succeeded"
+      fi
+    '';
+    path = [
+      # Should have nix-command support
+      config.nix.package
+    ];
+    serviceConfig.Type = "exec";
+    unitConfig = {
+      X-StopOnRemoval = false;
+    };
+  };
+
+  systemd.timers.rollback-watchdog = {
+    description = "Timer for rollback watchdog";
+    wantedBy = ["timers.target"];
+    timerConfig = {
+      OnUnitActiveSec = "3min";
+      RemainAfterElapse = false;
+    };
+    unitConfig = {
+      ConditionPathExists = "/etc/fleet_rollback_marker";
+    };
+  };
+}
modifiedpkgs/fleet-install-secrets.nixdiffbeforeafterboth
--- a/pkgs/fleet-install-secrets.nix
+++ b/pkgs/fleet-install-secrets.nix
@@ -6,7 +6,7 @@
   name = "${pname}-${version}";
 
   src = ../.;
-  cargoBuildFlags = "-p ${pname}";
+  buildAndTestSubdir = "cmds/install-secrets";
   cargoLock = {
     lockFile = ../Cargo.lock;
     outputHashes = {