git.delta.rocks / jrsonnet / refs/commits / 7e2e5c591e04

difftreelog

source

cmds/fleet/src/command.rs15.0 KiBsourcehistory
1use std::{2	collections::HashMap,3	ffi::OsStr,4	process::Stdio,5	sync::{Arc, Mutex},6	task::Poll,7};89use anyhow::{anyhow, Result};10use futures::StreamExt;11use itertools::Either;12use once_cell::sync::Lazy;13use openssh::{OverSsh, Session};14use regex::Regex;15use serde::{de::Visitor, Deserialize};16use tokio::{io::AsyncRead, process::Command, select};17use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};18use tracing::{info, info_span, warn, Span};19use tracing_indicatif::span_ext::IndicatifSpanExt;2021fn escape_bash(input: &str, out: &mut String) {22	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";23	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {24		out.push_str(input);25		return;26	}27	out.push('\'');28	for (i, v) in input.split('\'').enumerate() {29		if i != 0 {30			out.push_str("'\"'\"'");31		}32		out.push_str(v);33	}34	out.push('\'');35}36fn ostoutf8(os: impl AsRef<OsStr>) -> String {37	os.as_ref().to_str().expect("non-utf8 data").to_owned()38}39#[derive(Clone)]40pub struct MyCommand {41	command: String,42	args: Vec<String>,43	env: Vec<(String, String)>,44	ssh_session: Option<Arc<Session>>,45}46impl MyCommand {47	pub fn new(cmd: impl AsRef<OsStr>) -> Self {48		assert!(!cmd.as_ref().is_empty());49		Self {50			command: ostoutf8(cmd),51			args: vec![],52			env: vec![],53			ssh_session: None,54		}55	}56	fn into_args(self) -> Vec<String> {57		let mut out = Vec::new();58		if !self.env.is_empty() {59			out.push("env".to_owned());60			for (k, v) in self.env {61				assert!(!k.contains('='));62				out.push(format!("{k}={v}"));63			}64		}65		out.push(self.command);66		out.extend(self.args);67		out68	}69	fn into_string(self) -> String {70		let mut out = String::new();71		if !self.env.is_empty() {72			out.push_str("env");73			for (k, v) in self.env {74				out.push(' ');75				assert!(!k.contains('='));76				escape_bash(&k, &mut out);77				out.push('=');78				escape_bash(&v, &mut out);79			}80		}81		if !out.is_empty() {82			out.push(' ');83		}84		escape_bash(&self.command, &mut out);85		for arg in self.args {86			out.push(' ');87			escape_bash(&arg, &mut out);88		}89		out90	}91	fn into_command(self) -> Command {92		let mut out = Command::new(self.command);93		out.args(self.args);94		for (k, v) in self.env {95			out.env(k, v);96		}97		out98	}99	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {100		Ok(if let Some(session) = self.ssh_session.clone() {101			let cmd = self.into_command();102			Either::Right(103				cmd.over_ssh(session)104					.map_err(|e| anyhow!("ssh error: {e}"))?,105			)106		} else {107			let cmd = self.into_command();108			Either::Left(cmd)109		})110	}111	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {112		let arg = arg.as_ref();113		self.args.push(ostoutf8(arg));114		self115	}116	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {117		let arg = arg.as_ref();118		let value = value.as_ref();119		let arg = ostoutf8(arg);120		let value = ostoutf8(value);121		self.arg(format!("{arg}={value}"));122		self123	}124	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {125		self.arg(arg);126		self.arg(value);127		self128	}129	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {130		for arg in args.into_iter() {131			let arg = arg.as_ref();132			self.args.push(ostoutf8(arg));133		}134		self135	}136	pub fn sudo(self) -> Self {137		if std::env::var_os("NO_SUDO").is_some() {138			let mut out = Self::new("su");139			out.arg("-c").arg(self.into_string());140			out141		} else {142			let mut out = Self::new("sudo");143			out.args(self.into_args());144			out145		}146	}147	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {148		let mut out = Self::new("ssh");149		out.arg(on).arg("--");150		out.arg(self.into_string());151		out152	}153	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {154		self.ssh_session = Some(session);155		self156	}157158	pub async fn run(self) -> Result<()> {159		let str = self.clone().into_string();160		let cmd = self.into_command();161		run_nix_inner(str, cmd, &mut PlainHandler).await?;162		Ok(())163	}164	pub async fn run_string(self) -> Result<String> {165		let str = self.clone().into_string();166		let cmd = self.into_command();167		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;168		Ok(v)169	}170171	pub async fn run_nix_string(self) -> Result<String> {172		let str = self.clone().into_string();173		let mut cmd = self.into_command();174		cmd.arg("--log-format").arg("internal-json");175		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await176	}177	pub async fn run_nix(self) -> Result<()> {178		let str = self.clone().into_string();179		let mut cmd = self.into_command();180		cmd.arg("--log-format").arg("internal-json");181		cmd.stdout(Stdio::inherit());182		run_nix_inner(str, cmd, &mut NixHandler::default()).await183	}184}185186struct EmptyAsyncRead;187impl AsyncRead for EmptyAsyncRead {188	fn poll_read(189		self: std::pin::Pin<&mut Self>,190		_cx: &mut std::task::Context<'_>,191		_buf: &mut tokio::io::ReadBuf<'_>,192	) -> Poll<std::io::Result<()>> {193		Poll::Pending194	}195}196197async fn run_nix_inner_stdout(198	str: String,199	cmd: Command,200	handler: &mut dyn Handler,201) -> Result<String> {202	Ok(run_nix_inner_raw(str, cmd, true, handler, None)203		.await?204		.expect("has out"))205}206async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {207	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;208	assert!(v.is_none());209	Ok(())210}211212pub trait Handler: Send {213	fn handle_line(&mut self, e: &str);214}215216pub struct ClonableHandler<H>(Arc<Mutex<H>>);217impl<H> Clone for ClonableHandler<H> {218	fn clone(&self) -> Self {219		Self(self.0.clone())220	}221}222impl<H> ClonableHandler<H> {223	pub fn new(inner: H) -> Self {224		Self(Arc::new(Mutex::new(inner)))225	}226}227impl<H: Handler> Handler for ClonableHandler<H> {228	fn handle_line(&mut self, e: &str) {229		self.0.lock().unwrap().handle_line(e)230	}231}232233struct PlainHandler;234impl Handler for PlainHandler {235	fn handle_line(&mut self, e: &str) {236		info!(target: "log", "{e}");237	}238}239240pub struct NoopHandler;241impl Handler for NoopHandler {242	fn handle_line(&mut self, _e: &str) {}243}244245#[derive(Default)]246pub struct NixHandler {247	spans: HashMap<u64, Span>,248}249fn process_message(m: &str) -> String {250	static OSC_CLEANER: Lazy<Regex> =251		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());252	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());253	let m = OSC_CLEANER.replace_all(m, "");254	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,255	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.256	DETABBER.replace_all(m.as_ref(), "  ").to_string()257}258impl Handler for NixHandler {259	fn handle_line(&mut self, e: &str) {260		if let Some(e) = e.strip_prefix("@nix ") {261			let log: NixLog = match serde_json::from_str(e) {262				Ok(l) => l,263				Err(err) => {264					warn!("failed to parse nix log line {:?}: {}", e, err);265					return;266				}267			};268			match log {269				NixLog::Msg { msg, raw_msg, .. } => {270					#[allow(clippy::nonminimal_bool)]271					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))272					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")273					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {274						if let Some(raw_msg) = raw_msg {275							if !msg.is_empty() {276								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())277							} else {278								info!(target: "nix", "{}", raw_msg.trim_end())279							}280						} else {281							info!(target: "nix", "{}", msg.trim_end())282						}283					}284				}285				NixLog::Start {286					ref fields,287					typ,288					id,289					..290				} if typ == 105 && !fields.is_empty() => {291					if let [LogField::String(drv), ..] = &fields[..] {292						let mut drv = drv.as_str();293						if let Some(pkg) = drv.strip_prefix("/nix/store/") {294							let mut it = pkg.splitn(2, '-');295							it.next();296							if let Some(pkg) = it.next() {297								drv = pkg;298							}299						}300						info!(target: "nix","building {}", drv);301						let span = info_span!("build", drv);302						span.pb_start();303						self.spans.insert(id, span);304					} else {305						warn!("bad build log: {:?}", log)306					}307				}308				NixLog::Start {309					ref fields,310					typ,311					id,312					..313				} if typ == 100 && fields.len() >= 3 => {314					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =315						&fields[..]316					{317						let mut drv = drv.as_str();318319						if let Some(pkg) = drv.strip_prefix("/nix/store/") {320							let mut it = pkg.splitn(2, '-');321							it.next();322							if let Some(pkg) = it.next() {323								drv = pkg;324							}325						}326						// info!(target: "nix","copying {} {} -> {}", drv, from, to);327						let span = info_span!("copy", from, to, drv);328						span.pb_start();329						self.spans.insert(id, span);330					} else {331						warn!("bad copy log: {:?}", log)332					}333				}334				NixLog::Start { text, typ, id, .. }335					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>336				{337					if !text.is_empty()338						&& text != "querying info about missing paths"339						&& text != "copying 0 paths"340						// Too much spam on lazy-trees branch341						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))342					{343						let span = info_span!("job");344						span.pb_start();345						span.pb_set_message(&process_message(text.trim()));346						self.spans.insert(id, span);347						info!(target: "nix", "{}", text);348					}349				}350				NixLog::Start {351					text,352					level: 0,353					typ: 108,354					..355				} if text.is_empty() => {356					// Cache lookup? Coupled with copy log357				}358				NixLog::Start {359					text,360					level: 4,361					typ: 109,362					..363				} if text.starts_with("querying info about ") => {364					// Cache lookup365				}366				NixLog::Start {367					text,368					level: 4,369					typ: 101,370					..371				} if text.starts_with("downloading ") => {372					// NAR downloading, coupled with copy log373				}374				NixLog::Start {375					text,376					level: 1,377					typ: 111,378					..379				} if text.starts_with("waiting for a machine to build ") => {380					// Useless repeating notification about build381				}382				NixLog::Start {383					text,384					level: 3,385					typ: 111,386					..387				} if text.starts_with("resolved derivation: ") => {388					// CA resolved389				}390				NixLog::Start {391					text,392					level: 1,393					typ: 111,394					id,395					..396				} if text.starts_with("waiting for lock on ") => {397					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();398					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {399						drv = txt;400					}401					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {402						drv = txt;403					}404					if let Some(txt) = drv.split("', '").next() {405						drv = txt;406					}407					if let Some(pkg) = drv.strip_prefix("/nix/store/") {408						let mut it = pkg.splitn(2, '-');409						it.next();410						if let Some(pkg) = it.next() {411							drv = pkg;412						}413					}414					let span = info_span!("waiting on drv", drv);415					span.pb_start();416					self.spans.insert(id, span);417					// Concurrent build of the same message418				}419				NixLog::Stop { id, .. } => {420					self.spans.remove(&id);421				}422				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {423					if let Some(span) = self.spans.get(&id) {424						if let LogField::String(s) = &fields[0] {425							span.pb_set_message(&process_message(s.trim()));426						} else {427							warn!("bad fields: {fields:?}");428						}429					} else {430						warn!("unknown result id: {id} {typ} {fields:?}");431					}432					// dbg!(fields, id, typ);433				}434				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {435					if let Some(span) = self.spans.get(&id) {436						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =437							&fields[..4]438						{439							span.pb_set_length(*expected);440							span.pb_set_position(*done);441						} else {442							warn!("bad fields: {fields:?}");443						}444					} else {445						// warn!("unknown result id: {id} {typ} {fields:?}");446						// Unaccounted progress.447					}448					// dbg!(fields, id, typ);449				}450				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {451					// Set phase, expected452				}453				_ => warn!("unknown log: {:?}", log),454			};455		} else {456			let e = e.trim();457			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {458				return;459			}460			info!("{e}")461		}462	}463}464465async fn run_nix_inner_raw(466	str: String,467	mut cmd: Command,468	want_stdout: bool,469	err_handler: &mut dyn Handler,470	mut out_handler: Option<&mut dyn Handler>,471) -> Result<Option<String>> {472	cmd.stderr(Stdio::piped());473	cmd.stdout(Stdio::piped());474	let mut child = cmd.spawn()?;475	let mut stderr = child.stderr.take().unwrap();476	let stdout = child.stdout.take().unwrap();477	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());478	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));479	let mut ob = want_stdout480		.then(|| out.take().unwrap())481		.unwrap_or_else(|| Box::new(EmptyAsyncRead));482	let mut ol = (!want_stdout)483		.then(|| out.take().unwrap())484		.unwrap_or_else(|| Box::new(EmptyAsyncRead));485	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());486	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());487488	// while let Some(line) = read.next().await? {}489490	let mut out_buf = if want_stdout { Some(vec![]) } else { None };491	loop {492		select! {493			e = err.next() => {494				if let Some(e) = e {495					let e = e?;496					err_handler.handle_line(&e);497				}498			},499			o = ob.next() => {500				if let Some(o) = o {501					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);502				}503			},504			o = ol.next() => {505				if let Some(o) = o {506					let o = o?;507					if let Some(out) = out_handler.as_mut() {508						out.handle_line(&o)509					} else {510						err_handler.handle_line(&o)511					}512					// out_handler.handle_info(&o);513				}514			},515			code = child.wait() => {516				let code = code?;517				if !code.success() {518					anyhow::bail!("command '{str}' failed with status {}", code);519				}520				break;521			}522		}523	}524525	Ok(out_buf.map(String::from_utf8).transpose()?)526}527528pub trait ErrorRecorder: Send {529	/// Return true to discard message from logging530	fn push_message(&mut self, msg: &str) -> bool;531}532533#[derive(Debug)]534enum LogField {535	String(String),536	Num(u64),537}538539impl<'de> Deserialize<'de> for LogField {540	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>541	where542		D: serde::Deserializer<'de>,543	{544		struct StringOrNum;545		impl<'de> Visitor<'de> for StringOrNum {546			type Value = LogField;547548			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {549				write!(f, "string or unsigned")550			}551552			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>553			where554				E: serde::de::Error,555			{556				Ok(LogField::String(v.to_owned()))557			}558559			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>560			where561				E: serde::de::Error,562			{563				Ok(LogField::Num(v))564			}565		}566567		deserializer.deserialize_any(StringOrNum)568	}569}570571#[derive(Deserialize, Debug)]572#[serde(rename_all = "camelCase", tag = "action")]573#[allow(dead_code)]574enum NixLog {575	Msg {576		level: u32,577		msg: String,578		raw_msg: Option<String>,579	},580	Start {581		id: u64,582		level: u32,583		#[serde(default)]584		fields: Vec<LogField>,585		text: String,586		#[serde(rename = "type")]587		typ: u32,588	},589	Stop {590		id: u64,591	},592	Result {593		id: u64,594		#[serde(rename = "type")]595		typ: u32,596		#[serde(default)]597		fields: Vec<LogField>,598	},599}