git.delta.rocks / jrsonnet / refs/commits / c0c9b96f77be

difftreelog

refactor split build-systems and deploy commands

Yaroslav Bolyukin2024-01-05parent: #718d88b.patch.diff
in: trunk

10 files changed

modifiedCargo.tomldiffbeforeafterboth
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,3 +5,5 @@
 [workspace.dependencies]
 nixlike = { path = "./crates/nixlike" }
 better-command = { path = "./crates/better-command" }
+uuid = { version = "1.3.3", features = ["v4"] }
+tokio = { version = "1.33.0", features = ["fs", "rt", "macros", "sync", "time", "rt-multi-thread"] }
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -8,6 +8,7 @@
 [dependencies]
 nixlike.workspace = true
 better-command.workspace = true
+tokio.workspace = true
 anyhow = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
@@ -27,7 +28,6 @@
 	"wrap_help",
 	"unicode",
 ] }
-tokio = { version = "1.33.0", features = ["full"] }
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }
 tokio-util = { version = "0.7.10", features = ["codec"] }
modifiedcmds/fleet/src/better_nix_eval.rsdiffbeforeafterboth
--- a/cmds/fleet/src/better_nix_eval.rs
+++ b/cmds/fleet/src/better_nix_eval.rs
@@ -428,6 +428,7 @@
 		self.used_fields.extend(e.used_fields);
 	}
 
+	#[allow(dead_code)]
 	pub fn session(&self) -> NixSession {
 		let mut session = None;
 		for ele in &self.used_fields {
@@ -444,6 +445,7 @@
 		}
 		session.expect("expr without fields used")
 	}
+	#[allow(dead_code)]
 	pub fn index_attr(&mut self, s: &str) {
 		let escaped = nixlike::serialize(s).expect("string");
 		self.out.push('.');
@@ -559,7 +561,9 @@
 pub enum Index {
 	Var(String),
 	String(String),
+	#[allow(dead_code)]
 	Apply(String),
+	#[allow(dead_code)]
 	Expr(NixExprBuilder),
 	ExprApply(NixExprBuilder),
 	Pipe(NixExprBuilder),
@@ -576,6 +580,7 @@
 	pub fn attr(v: impl AsRef<str>) -> Self {
 		Self::String(v.as_ref().to_owned())
 	}
+	#[allow(dead_code)]
 	pub fn apply(v: impl Serialize) -> Self {
 		let serialized = nixlike::serialize(v).expect("invalid value for apply");
 		Self::Apply(serialized.trim_end().to_owned())
@@ -749,6 +754,7 @@
 			.await
 			.with_context(|| context("as_json", self.0.full_path.as_deref(), &query))
 	}
+	#[allow(dead_code)]
 	pub async fn has_field(&self, name: &str) -> Result<bool> {
 		let id = self.0.value.expect("can't list root fields");
 		let key = nixlike::escape_string(name);
@@ -786,6 +792,7 @@
 			.await
 			.with_context(|| context("type_of", self.0.full_path.as_deref(), &query))
 	}
+	#[allow(dead_code)]
 	pub async fn import(&self) -> Result<Self> {
 		let import = Self::new(self.0.session.clone(), "import").await?;
 		Ok(nix_go!(self | import))
modifiedcmds/fleet/src/cmds/build_systems.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/build_systems.rs
+++ b/cmds/fleet/src/cmds/build_systems.rs
@@ -6,34 +6,40 @@
 use crate::host::{Config, ConfigHost};
 use crate::nix_go;
 use anyhow::{anyhow, Result};
-use clap::Parser;
+use clap::{Parser, ValueEnum};
 use itertools::Itertools as _;
 use tokio::{task::LocalSet, time::sleep};
 use tracing::{error, field, info, info_span, warn, Instrument};
 
-#[derive(Parser, Clone)]
-pub struct BuildSystems {
+#[derive(Parser)]
+pub struct Deploy {
 	/// Disable automatic rollback
 	#[clap(long)]
 	disable_rollback: bool,
-	#[clap(subcommand)]
-	subcommand: Subcommand,
+	action: DeployAction,
 }
 
-enum UploadAction {
+#[derive(ValueEnum, Clone, Copy)]
+enum DeployAction {
+	/// Upload derivation, but do not execute the update.
+	Upload,
+	/// Upload and execute the activation script, old version will be used after reboot.
 	Test,
+	/// Upload and set as current system profile, but do not execute activation script.
 	Boot,
+	/// Upload, set current profile, and execute activation script.
 	Switch,
 }
-impl UploadAction {
-	fn name(&self) -> &'static str {
+
+impl DeployAction {
+	pub(crate) fn name(&self) -> Option<&'static str> {
 		match self {
-			UploadAction::Test => "test",
-			UploadAction::Boot => "boot",
-			UploadAction::Switch => "switch",
+			DeployAction::Upload => None,
+			DeployAction::Test => Some("test"),
+			DeployAction::Boot => Some("boot"),
+			DeployAction::Switch => Some("switch"),
 		}
 	}
-
 	pub(crate) fn should_switch_profile(&self) -> bool {
 		matches!(self, Self::Switch | Self::Boot)
 	}
@@ -42,69 +48,15 @@
 	}
 	pub(crate) fn should_schedule_rollback_run(&self) -> bool {
 		matches!(self, Self::Switch | Self::Test)
-	}
-}
-
-enum PackageAction {
-	SdImage,
-	InstallationCd,
-}
-impl PackageAction {
-	fn build_attr(&self) -> String {
-		match self {
-			PackageAction::SdImage => "sdImage".to_owned(),
-			PackageAction::InstallationCd => "isoImage".to_owned(),
-		}
-	}
-}
-
-enum Action {
-	Upload { action: Option<UploadAction> },
-	Package(PackageAction),
-}
-impl Action {
-	fn build_attr(&self) -> String {
-		match self {
-			Action::Upload { .. } => "toplevel".to_owned(),
-			Action::Package(p) => p.build_attr(),
-		}
 	}
 }
 
-impl From<Subcommand> for Action {
-	fn from(s: Subcommand) -> Self {
-		match s {
-			Subcommand::Upload => Self::Upload { action: None },
-			Subcommand::Test => Self::Upload {
-				action: Some(UploadAction::Test),
-			},
-			Subcommand::Boot => Self::Upload {
-				action: Some(UploadAction::Boot),
-			},
-			Subcommand::Switch => Self::Upload {
-				action: Some(UploadAction::Switch),
-			},
-			Subcommand::SdImage => Self::Package(PackageAction::SdImage),
-			Subcommand::InstallationCd => Self::Package(PackageAction::InstallationCd),
-		}
-	}
-}
-
 #[derive(Parser, Clone)]
-enum Subcommand {
-	/// Upload, but do not switch
-	Upload,
-	/// Upload + switch to built system until reboot
-	Test,
-	/// Upload + switch to built system after reboot
-	Boot,
-	/// Upload + test + boot
-	Switch,
-
-	/// Build SD .img image
-	SdImage,
-	/// Build an installation cd ISO image
-	InstallationCd,
+pub struct BuildSystems {
+	/// Attribute to build. Systems are deployed from "toplevel" attr, well-known used attributes
+	/// are "sdImage"/"isoImage", and your configuration may include any other build attributes.
+	#[clap(long, default_value = "toplevel")]
+	build_attr: String,
 }
 
 struct Generation {
@@ -163,11 +115,11 @@
 	Ok(current)
 }
 
-async fn execute_upload(
-	build: &BuildSystems,
-	action: UploadAction,
+async fn deploy_task(
+	action: DeployAction,
 	host: &ConfigHost,
 	built: PathBuf,
+	disable_rollback: bool,
 ) -> Result<()> {
 	let mut failed = false;
 	// TODO: Lockfile, to prevent concurrent system switch?
@@ -175,7 +127,7 @@
 	// is scheduler on next boot (default behavior). On current boot - rollback activator will fail due to
 	// unit name conflict in systemd-run
 	// This code is tied to rollback.nix
-	if !build.disable_rollback {
+	if !disable_rollback {
 		let _span = info_span!("preparing").entered();
 		info!("preparing for rollback");
 		let generation = get_current_generation(host).await?;
@@ -235,13 +187,13 @@
 		switch_script.push("bin");
 		switch_script.push("switch-to-configuration");
 		let mut cmd = host.cmd(switch_script).in_current_span().await?;
-		cmd.arg(action.name());
+		cmd.arg(action.name().expect("upload.should_activate == false"));
 		if let Err(e) = cmd.sudo().run().in_current_span().await {
 			error!("failed to activate: {e}");
 			failed = true;
 		}
 	}
-	if !build.disable_rollback {
+	if !disable_rollback {
 		if failed {
 			info!("executing rollback");
 			if let Err(e) = host
@@ -280,97 +232,45 @@
 	Ok(())
 }
 
-impl BuildSystems {
-	async fn build_task(self, config: Config, host: String) -> Result<()> {
-		info!("building");
-		let host = config.host(&host).await?;
-		let action = Action::from(self.subcommand.clone());
-		let fleet_config = &config.config_field;
-		let drv = nix_go!(
-			fleet_config.hosts[{ &host.name }].nixosSystem.config.system.build[{ action.build_attr() }]
-		);
-		let outputs = drv.build().await.map_err(|e| {
-			if action.build_attr() == "sdImage" {
+async fn build_task(config: Config, host: String, build_attr: &str) -> Result<PathBuf> {
+	info!("building");
+	let host = config.host(&host).await?;
+	// let action = Action::from(self.subcommand.clone());
+	let fleet_config = &config.config_field;
+	let drv = nix_go!(
+		fleet_config.hosts[{ &host.name }]
+			.nixosSystem
+			.config
+			.system
+			.build[{ build_attr }]
+	);
+	let outputs = drv.build().await.map_err(|e| {
+			if build_attr == "sdImage" {
 				info!("sd-image build failed");
 				info!("Make sure you have imported modulesPath/installer/sd-card/sd-image-<arch>[-installer].nix (For installer, you may want to check config)");
 			}
 			e
 		})?;
-		let out_output = outputs
-			.get("out")
-			.ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;
-
-		match action {
-			Action::Upload { action } => {
-				if !config.is_local(&host.name) {
-					info!("uploading system closure");
-					{
-						// TODO: Move to remote_derivation method.
-						// Alternatively, nix store make-content-addressed can be used,
-						// at least for the first deployment, to provide trusted store key.
-						//
-						// It is much slower, yet doesn't require root on the deployer machine.
-						let mut sign = MyCommand::new("nix");
-						// Private key for host machine is registered in nix-sign.nix
-						sign.arg("store")
-							.arg("sign")
-							.comparg("--key-file", "/etc/nix/private-key")
-							.arg("-r")
-							.arg(out_output);
-						if let Err(e) = sign.sudo().run_nix().await {
-							warn!("Failed to sign store paths: {e}");
-						};
-					}
-					let mut tries = 0;
-					loop {
-						match host.remote_derivation(out_output).await {
-							Ok(remote) => {
-								assert!(&remote == out_output, "CA derivations aren't implemented");
-								break;
-							}
-							Err(e) if tries < 3 => {
-								tries += 1;
-								warn!("Copy failure ({}/3): {}", tries, e);
-								sleep(Duration::from_millis(5000)).await;
-							}
-							Err(e) => return Err(e),
-						}
-					}
-				}
-				if let Some(action) = action {
-					execute_upload(&self, action, &host, out_output.clone()).await?
-				}
-			}
-			Action::Package(PackageAction::SdImage) => {
-				let mut out = current_dir()?;
-				out.push(format!("sd-image-{}", host.name));
-
-				info!("linking sd image to {:?}", out);
-				symlink(out_output, out)?;
-			}
-			Action::Package(PackageAction::InstallationCd) => {
-				let mut out = current_dir()?;
-				out.push(format!("installation-cd-{}", host.name));
+	let out_output = outputs
+		.get("out")
+		.ok_or_else(|| anyhow!("system build should produce \"out\" output"))?;
 
-				info!("linking iso image to {:?}", out);
-				symlink(out_output, out)?;
-			}
-		};
-		Ok(())
-	}
+	Ok(out_output.clone())
+}
 
+impl BuildSystems {
 	pub async fn run(self, config: &Config) -> Result<()> {
 		let hosts = config.list_hosts().await?;
 		let set = LocalSet::new();
-		let this = &self;
+		let build_attr = self.build_attr.clone();
 		for host in hosts.into_iter() {
 			if config.should_skip(&host.name) {
 				continue;
 			}
 			let config = config.clone();
-			let this = this.clone();
-			let span = info_span!("deployment", host = field::display(&host.name));
+			let span = info_span!("build", host = field::display(&host.name));
 			let hostname = host.name;
+			let build_attr = build_attr.clone();
 			// FIXME: Since the introduction of better-nix-eval,
 			// due to single repl used for builds, hosts are waiting for each other to build,
 			// instead of building concurrently.
@@ -384,11 +284,94 @@
 			// multiple hosts.
 			set.spawn_local(
 				(async move {
-					match this.build_task(config, hostname).await {
-						Ok(_) => {}
+					let built = match build_task(config, hostname.clone(), &build_attr).await {
+						Ok(path) => path,
+						Err(e) => {
+							error!("failed to deploy host: {}", e);
+							return;
+						}
+					};
+					// TODO: Handle error
+					let mut out = current_dir().expect("cwd exists");
+					out.push(format!("built-{}", hostname));
+
+					info!("linking iso image to {:?}", out);
+					if let Err(e) = symlink(built, out) {
+						error!("failed to symlink: {e}")
+					}
+				})
+				.instrument(span),
+			);
+		}
+		set.await;
+		Ok(())
+	}
+}
+
+impl Deploy {
+	pub async fn run(self, config: &Config) -> Result<()> {
+		let hosts = config.list_hosts().await?;
+		let set = LocalSet::new();
+		for host in hosts.into_iter() {
+			if config.should_skip(&host.name) {
+				continue;
+			}
+			let config = config.clone();
+			let span = info_span!("deploy", host = field::display(&host.name));
+			let hostname = host.name.clone();
+			// FIXME: Fix repl concurrency (see build-systems)
+			set.spawn_local(
+				(async move {
+					let built = match build_task(config.clone(), hostname.clone(), "toplevel").await
+					{
+						Ok(path) => path,
 						Err(e) => {
-							error!("failed to deploy host: {}", e)
+							error!("failed to deploy host: {}", e);
+							return;
 						}
+					};
+					if !config.is_local(&hostname) {
+						info!("uploading system closure");
+						{
+							// TODO: Move to remote_derivation method.
+							// Alternatively, nix store make-content-addressed can be used,
+							// at least for the first deployment, to provide trusted store key.
+							//
+							// It is much slower, yet doesn't require root on the deployer machine.
+							let mut sign = MyCommand::new("nix");
+							// Private key for host machine is registered in nix-sign.nix
+							sign.arg("store")
+								.arg("sign")
+								.comparg("--key-file", "/etc/nix/private-key")
+								.arg("-r")
+								.arg(&built);
+							if let Err(e) = sign.sudo().run_nix().await {
+								warn!("Failed to sign store paths: {e}");
+							};
+						}
+						let mut tries = 0;
+						loop {
+							match host.remote_derivation(&built).await {
+								Ok(remote) => {
+									assert!(remote == built, "CA derivations aren't implemented");
+									break;
+								}
+								Err(e) if tries < 3 => {
+									tries += 1;
+									warn!("copy failure ({}/3): {}", tries, e);
+									sleep(Duration::from_millis(5000)).await;
+								}
+								Err(e) => {
+									error!("upload failed: {e}");
+									return;
+								}
+							}
+						}
+					}
+					if let Err(e) =
+						deploy_task(self.action, &host, built, self.disable_rollback).await
+					{
+						error!("activation failed: {e}");
 					}
 				})
 				.instrument(span),
modifiedcmds/fleet/src/cmds/secrets/mod.rsdiffbeforeafterboth
--- a/cmds/fleet/src/cmds/secrets/mod.rs
+++ b/cmds/fleet/src/cmds/secrets/mod.rs
@@ -7,8 +7,6 @@
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use chrono::{DateTime, Utc};
 use clap::Parser;
-use futures::StreamExt;
-use itertools::Itertools;
 use owo_colors::OwoColorize;
 use serde::Deserialize;
 use std::{
@@ -570,7 +568,7 @@
 					config.replace_shared(
 						name.to_owned(),
 						update_owner_set(
-							&name,
+							name,
 							config,
 							data,
 							secret,
modifiedcmds/fleet/src/host.rsdiffbeforeafterboth
--- a/cmds/fleet/src/host.rs
+++ b/cmds/fleet/src/host.rs
@@ -14,7 +14,6 @@
 use openssh::SessionBuilder;
 use serde::de::DeserializeOwned;
 use tempfile::NamedTempFile;
-use tracing::instrument;
 
 use crate::{
 	better_nix_eval::{Field, NixSessionPool},
@@ -90,6 +89,7 @@
 		cmd.arg(path);
 		cmd.run_string().await
 	}
+	#[allow(dead_code)]
 	pub async fn read_file_json<D: DeserializeOwned>(&self, path: impl AsRef<OsStr>) -> Result<D> {
 		let text = self.read_file_text(path).await?;
 		Ok(serde_json::from_str(&text)?)
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -12,14 +12,17 @@
 mod fleetdata;
 
 use std::ffi::OsString;
-use std::io::{stderr, stdout, Write};
 use std::process::exit;
 use std::time::Duration;
 
 use anyhow::{bail, Result};
 use clap::Parser;
 
-use cmds::{build_systems::BuildSystems, info::Info, secrets::Secret};
+use cmds::{
+	build_systems::{BuildSystems, Deploy},
+	info::Info,
+	secrets::Secret,
+};
 use futures::future::LocalBoxFuture;
 use futures::stream::FuturesUnordered;
 use futures::TryStreamExt;
@@ -73,6 +76,8 @@
 enum Opts {
 	/// Prepare systems for deployments
 	BuildSystems(BuildSystems),
+
+	Deploy(Deploy),
 	/// Secret management
 	#[clap(subcommand)]
 	Secret(Secret),
@@ -94,6 +99,7 @@
 async fn run_command(config: &Config, command: Opts) -> Result<()> {
 	match command {
 		Opts::BuildSystems(c) => c.run(config).await?,
+		Opts::Deploy(d) => d.run(config).await?,
 		Opts::Secret(s) => s.run(config).await?,
 		Opts::Info(i) => i.run(config).await?,
 		Opts::Prefetch(p) => p.run(config).await?,
modifiedcrates/better-command/src/handler.rsdiffbeforeafterboth
before · crates/better-command/src/handler.rs
1//! Collection of handlers, which transform program-specific stdout format to tracing23use std::collections::HashMap;4use std::sync::{Arc, Mutex};56use once_cell::sync::Lazy;7use regex::Regex;8use serde::Deserialize;9use tracing::{Span, info, warn, info_span};10use tracing_indicatif::span_ext::IndicatifSpanExt as _;1112pub trait Handler: Send {13	fn handle_line(&mut self, e: &str);14}1516/// Handler wrapper, which can be cloned.17pub struct ClonableHandler<H>(Arc<Mutex<H>>);18impl<H> Clone for ClonableHandler<H> {19	fn clone(&self) -> Self {20		Self(self.0.clone())21	}22}23impl<H> ClonableHandler<H> {24	pub fn new(inner: H) -> Self {25		Self(Arc::new(Mutex::new(inner)))26	}27}28impl<H: Handler> Handler for ClonableHandler<H> {29	fn handle_line(&mut self, e: &str) {30		self.0.lock().unwrap().handle_line(e)31	}32}3334/// Converts command output to tracing lines35pub struct PlainHandler;36impl Handler for PlainHandler {37	fn handle_line(&mut self, e: &str) {38		info!(target: "log", "{e}");39	}40}4142/// Ignores output43pub struct NoopHandler;44impl Handler for NoopHandler {45	fn handle_line(&mut self, _e: &str) {}46}4748/// Transform nix internal-json logs to tracing spans.49#[derive(Default)]50pub struct NixHandler {51	spans: HashMap<u64, Span>,52}53#[derive(Deserialize, Debug)]54#[serde(untagged)]55enum LogField {56	String(String),57	Num(u64),58}5960/// Nix internal-json log line type61#[derive(Deserialize, Debug)]62#[serde(rename_all = "camelCase", tag = "action")]63#[allow(dead_code)]64enum NixLog {65	Msg {66		level: u32,67		msg: String,68		raw_msg: Option<String>,69	},70	Start {71		id: u64,72		level: u32,73		#[serde(default)]74		fields: Vec<LogField>,75		text: String,76		#[serde(rename = "type")]77		typ: u32,78	},79	Stop {80		id: u64,81	},82	Result {83		id: u64,84		#[serde(rename = "type")]85		typ: u32,86		#[serde(default)]87		fields: Vec<LogField>,88	},89}90fn process_message(m: &str) -> String {91	// Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.92	static OSC_CLEANER: Lazy<Regex> =93		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());94	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());95	let m = OSC_CLEANER.replace_all(m, "");96	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,97	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.98	DETABBER.replace_all(m.as_ref(), "  ").to_string()99}100impl Handler for NixHandler {101	fn handle_line(&mut self, e: &str) {102		if let Some(e) = e.strip_prefix("@nix ") {103			let log: NixLog = match serde_json::from_str(e) {104				Ok(l) => l,105				Err(err) => {106					warn!("failed to parse nix log line {:?}: {}", e, err);107					return;108				}109			};110			match log {111				NixLog::Msg { msg, raw_msg, .. } => {112					#[allow(clippy::nonminimal_bool)]113					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))114					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")115					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {116						if let Some(raw_msg) = raw_msg {117							if !msg.is_empty() {118								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())119							} else {120								info!(target: "nix", "{}", raw_msg.trim_end())121							}122						} else {123							info!(target: "nix", "{}", msg.trim_end())124						}125					}126				}127				NixLog::Start {128					ref fields,129					typ,130					id,131					..132				} if typ == 105 && !fields.is_empty() => {133					if let [LogField::String(drv), ..] = &fields[..] {134						let mut drv = drv.as_str();135						if let Some(pkg) = drv.strip_prefix("/nix/store/") {136							let mut it = pkg.splitn(2, '-');137							it.next();138							if let Some(pkg) = it.next() {139								drv = pkg;140							}141						}142						info!(target: "nix","building {}", drv);143						let span = info_span!("build", drv);144						span.pb_start();145						self.spans.insert(id, span);146					} else {147						warn!("bad build log: {:?}", log)148					}149				}150				NixLog::Start {151					ref fields,152					typ,153					id,154					..155				} if typ == 100 && fields.len() >= 3 => {156					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =157						&fields[..]158					{159						let mut drv = drv.as_str();160161						if let Some(pkg) = drv.strip_prefix("/nix/store/") {162							let mut it = pkg.splitn(2, '-');163							it.next();164							if let Some(pkg) = it.next() {165								drv = pkg;166							}167						}168						// info!(target: "nix","copying {} {} -> {}", drv, from, to);169						let span = info_span!("copy", from, to, drv);170						span.pb_start();171						self.spans.insert(id, span);172					} else {173						warn!("bad copy log: {:?}", log)174					}175				}176				NixLog::Start { text, typ, id, .. }177					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>178				{179					if !text.is_empty()180						&& text != "querying info about missing paths"181						&& text != "copying 0 paths"182						// Too much spam on lazy-trees branch183						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))184					{185						let span = info_span!("job");186						span.pb_start();187						span.pb_set_message(&process_message(text.trim()));188						self.spans.insert(id, span);189						info!(target: "nix", "{}", text);190					}191				}192				NixLog::Start {193					text,194					level: 0,195					typ: 108,196					..197				} if text.is_empty() => {198					// Cache lookup? Coupled with copy log199				}200				NixLog::Start {201					text,202					level: 4,203					typ: 109,204					..205				} if text.starts_with("querying info about ") => {206					// Cache lookup207				}208				NixLog::Start {209					text,210					level: 4,211					typ: 101,212					..213				} if text.starts_with("downloading ") => {214					// NAR downloading, coupled with copy log215				}216				NixLog::Start {217					text,218					level: 1,219					typ: 111,220					..221				} if text.starts_with("waiting for a machine to build ") => {222					// Useless repeating notification about build223				}224				NixLog::Start {225					text,226					level: 3,227					typ: 111,228					..229				} if text.starts_with("resolved derivation: ") => {230					// CA resolved231				}232				NixLog::Start {233					text,234					level: 1,235					typ: 111,236					id,237					..238				} if text.starts_with("waiting for lock on ") => {239					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();240					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {241						drv = txt;242					}243					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {244						drv = txt;245					}246					if let Some(txt) = drv.split("', '").next() {247						drv = txt;248					}249					if let Some(pkg) = drv.strip_prefix("/nix/store/") {250						let mut it = pkg.splitn(2, '-');251						it.next();252						if let Some(pkg) = it.next() {253							drv = pkg;254						}255					}256					let span = info_span!("waiting on drv", drv);257					span.pb_start();258					self.spans.insert(id, span);259					// Concurrent build of the same message260				}261				NixLog::Stop { id, .. } => {262					self.spans.remove(&id);263				}264				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {265					if let Some(span) = self.spans.get(&id) {266						if let LogField::String(s) = &fields[0] {267							span.pb_set_message(&process_message(s.trim()));268						} else {269							warn!("bad fields: {fields:?}");270						}271					} else {272						warn!("unknown result id: {id} {typ} {fields:?}");273					}274					// dbg!(fields, id, typ);275				}276				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {277					if let Some(span) = self.spans.get(&id) {278						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =279							&fields[..4]280						{281							span.pb_set_length(*expected);282							span.pb_set_position(*done);283						} else {284							warn!("bad fields: {fields:?}");285						}286					} else {287						// warn!("unknown result id: {id} {typ} {fields:?}");288						// Unaccounted progress.289					}290					// dbg!(fields, id, typ);291				}292				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {293					// Set phase, expected294				}295				_ => warn!("unknown log: {:?}", log),296			};297		} else {298			let e = e.trim();299			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {300				return;301			}302			info!("{e}")303		}304	}305}
after · crates/better-command/src/handler.rs
1//! Collection of handlers, which transform program-specific stdout format to tracing23use std::collections::HashMap;4use std::sync::{Arc, Mutex};56use once_cell::sync::Lazy;7use regex::Regex;8use serde::Deserialize;9use tracing::{Span, info, warn, info_span};10use tracing_indicatif::span_ext::IndicatifSpanExt as _;1112pub trait Handler: Send {13	fn handle_line(&mut self, e: &str);14}1516/// Handler wrapper, which can be cloned.17pub struct ClonableHandler<H>(Arc<Mutex<H>>);18impl<H> Clone for ClonableHandler<H> {19	fn clone(&self) -> Self {20		Self(self.0.clone())21	}22}23impl<H> ClonableHandler<H> {24	pub fn new(inner: H) -> Self {25		Self(Arc::new(Mutex::new(inner)))26	}27}28impl<H: Handler> Handler for ClonableHandler<H> {29	fn handle_line(&mut self, e: &str) {30		self.0.lock().unwrap().handle_line(e)31	}32}3334/// Converts command output to tracing lines35pub struct PlainHandler;36impl Handler for PlainHandler {37	fn handle_line(&mut self, e: &str) {38		info!(target: "log", "{e}");39	}40}4142/// Ignores output43pub struct NoopHandler;44impl Handler for NoopHandler {45	fn handle_line(&mut self, _e: &str) {}46}4748/// Transform nix internal-json logs to tracing spans.49#[derive(Default)]50pub struct NixHandler {51	spans: HashMap<u64, Span>,52}53#[derive(Deserialize, Debug)]54#[serde(untagged)]55enum LogField {56	String(String),57	Num(u64),58}5960/// Nix internal-json log line type61#[derive(Deserialize, Debug)]62#[serde(rename_all = "camelCase", tag = "action")]63#[allow(dead_code)]64enum NixLog {65	Msg {66		level: u32,67		msg: String,68		raw_msg: Option<String>,69	},70	Start {71		id: u64,72		level: u32,73		#[serde(default)]74		fields: Vec<LogField>,75		text: String,76		#[serde(rename = "type")]77		typ: u32,78	},79	Stop {80		id: u64,81	},82	Result {83		id: u64,84		#[serde(rename = "type")]85		typ: u32,86		#[serde(default)]87		fields: Vec<LogField>,88	},89}90fn process_message(m: &str) -> String {91	// Supposed to remove formatting characters except colors, as some programs try to reset cursor position etc.92	static OSC_CLEANER: Lazy<Regex> =93		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());94	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());95	let m = OSC_CLEANER.replace_all(m, "");96	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,97	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.98	DETABBER.replace_all(m.as_ref(), "  ").to_string()99}100impl Handler for NixHandler {101	fn handle_line(&mut self, e: &str) {102		if let Some(e) = e.strip_prefix("@nix ") {103			let log: NixLog = match serde_json::from_str(e) {104				Ok(l) => l,105				Err(err) => {106					warn!("failed to parse nix log line {:?}: {}", e, err);107					return;108				}109			};110			match log {111				NixLog::Msg { msg, raw_msg, .. } => {112					#[allow(clippy::nonminimal_bool)]113					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))114					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")115					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {116						if let Some(raw_msg) = raw_msg {117							if !msg.is_empty() {118								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())119							} else {120								info!(target: "nix", "{}", raw_msg.trim_end())121							}122						} else {123							info!(target: "nix", "{}", msg.trim_end())124						}125					}126				}127				NixLog::Start {128					ref fields,129					typ,130					id,131					..132				} if typ == 105 && !fields.is_empty() => {133					if let [LogField::String(drv), ..] = &fields[..] {134						let mut drv = drv.as_str();135						if let Some(pkg) = drv.strip_prefix("/nix/store/") {136							let mut it = pkg.splitn(2, '-');137							it.next();138							if let Some(pkg) = it.next() {139								drv = pkg;140							}141						}142						info!(target: "nix","building {}", drv);143						let span = info_span!("build", drv);144						span.pb_start();145						self.spans.insert(id, span);146					} else {147						warn!("bad build log: {:?}", log)148					}149				}150				NixLog::Start {151					ref fields,152					typ,153					id,154					..155				} if typ == 100 && fields.len() >= 3 => {156					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =157						&fields[..]158					{159						let mut drv = drv.as_str();160161						if let Some(pkg) = drv.strip_prefix("/nix/store/") {162							let mut it = pkg.splitn(2, '-');163							it.next();164							if let Some(pkg) = it.next() {165								drv = pkg;166							}167						}168						info!(target: "nix","copying {} {} -> {}", drv, from, to);169						let span = info_span!("copy", from, to, drv);170						span.pb_start();171						self.spans.insert(id, span);172					} else {173						warn!("bad copy log: {:?}", log)174					}175				}176				NixLog::Start { text, typ, id, .. }177					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>178				{179					if !text.is_empty()180						&& text != "querying info about missing paths"181						&& text != "copying 0 paths"182						// Too much spam on lazy-trees branch183						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))184					{185						let span = info_span!("job");186						span.pb_start();187						span.pb_set_message(&process_message(text.trim()));188						self.spans.insert(id, span);189						info!(target: "nix", "{}", text);190					}191				}192				NixLog::Start {193					text,194					level: 0,195					typ: 108,196					..197				} if text.is_empty() => {198					// Cache lookup? Coupled with copy log199				}200				NixLog::Start {201					text,202					level: 4,203					typ: 109,204					..205				} if text.starts_with("querying info about ") => {206					// Cache lookup207				}208				NixLog::Start {209					text,210					level: 4,211					typ: 101,212					..213				} if text.starts_with("downloading ") => {214					// NAR downloading, coupled with copy log215				}216				NixLog::Start {217					text,218					level: 1,219					typ: 111,220					..221				} if text.starts_with("waiting for a machine to build ") => {222					// Useless repeating notification about build223				}224				NixLog::Start {225					text,226					level: 3,227					typ: 111,228					..229				} if text.starts_with("resolved derivation: ") => {230					// CA resolved231				}232				NixLog::Start {233					text,234					level: 1,235					typ: 111,236					id,237					..238				} if text.starts_with("waiting for lock on ") => {239					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();240					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {241						drv = txt;242					}243					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {244						drv = txt;245					}246					if let Some(txt) = drv.split("', '").next() {247						drv = txt;248					}249					if let Some(pkg) = drv.strip_prefix("/nix/store/") {250						let mut it = pkg.splitn(2, '-');251						it.next();252						if let Some(pkg) = it.next() {253							drv = pkg;254						}255					}256					let span = info_span!("waiting on drv", drv);257					span.pb_start();258					self.spans.insert(id, span);259					// Concurrent build of the same message260				}261				NixLog::Stop { id, .. } => {262					self.spans.remove(&id);263				}264				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {265					if let Some(span) = self.spans.get(&id) {266						if let LogField::String(s) = &fields[0] {267							span.pb_set_message(&process_message(s.trim()));268						} else {269							warn!("bad fields: {fields:?}");270						}271					} else {272						warn!("unknown result id: {id} {typ} {fields:?}");273					}274					// dbg!(fields, id, typ);275				}276				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {277					if let Some(span) = self.spans.get(&id) {278						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =279							&fields[..4]280						{281							span.pb_set_length(*expected);282							span.pb_set_position(*done);283						} else {284							warn!("bad fields: {fields:?}");285						}286					} else {287						// warn!("unknown result id: {id} {typ} {fields:?}");288						// Unaccounted progress.289					}290					// dbg!(fields, id, typ);291				}292				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {293					// Set phase, expected294				}295				_ => warn!("unknown log: {:?}", log),296			};297		} else {298			let e = e.trim();299			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {300				return;301			}302			info!("{e}")303		}304	}305}
modifiedflake.lockdiffbeforeafterboth
--- a/flake.lock
+++ b/flake.lock
@@ -38,11 +38,11 @@
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1703974965,
-        "narHash": "sha256-dvZjLuAcLnv25bqStTL2ZICC5YSs8aynF5amRM+I6UM=",
+        "lastModified": 1704409229,
+        "narHash": "sha256-Vc41cRJ3trOnocovLe0zZE35pK5Lfuo/zHk0xx3CNDY=",
         "owner": "nixos",
         "repo": "nixpkgs",
-        "rev": "9f434bd436e2bb5615827469ed651e30c26daada",
+        "rev": "786f788914f2a6e94cedf361541894e972b8fd23",
         "type": "github"
       },
       "original": {
@@ -67,11 +67,11 @@
         ]
       },
       "locked": {
-        "lastModified": 1703902408,
-        "narHash": "sha256-qXdWvu+tlgNjeoz8yQMRKSom6QyRROfgpmeOhwbujqw=",
+        "lastModified": 1704075545,
+        "narHash": "sha256-L3zgOuVKhPjKsVLc3yTm2YJ6+BATyZBury7wnhyc8QU=",
         "owner": "oxalica",
         "repo": "rust-overlay",
-        "rev": "319f57cd2c34348c55970a4bf2b35afe82088681",
+        "rev": "a0df72e106322b67e9c6e591fe870380bd0da0d5",
         "type": "github"
       },
       "original": {
modifiedflake.nixdiffbeforeafterboth
--- a/flake.nix
+++ b/flake.nix
@@ -29,7 +29,7 @@
         llvmPkgs = pkgs.buildPackages.llvmPackages_11;
         rust =
           (pkgs.rustChannelOf {
-            date = "2023-12-29";
+            date = "2024-01-01";
             channel = "nightly";
           })
           .default