git.delta.rocks / jrsonnet / refs/commits / 89d35672dcfd

difftreelog

source

cmds/fleet/src/command.rs14.9 KiBsourcehistory
1use std::{2	collections::HashMap,3	ffi::OsStr,4	process::Stdio,5	sync::{Arc, Mutex},6	task::Poll,7};89use anyhow::{anyhow, Result};10use futures::StreamExt;11use itertools::Either;12use once_cell::sync::Lazy;13use openssh::{OverSsh, Session};14use regex::Regex;15use serde::{de::Visitor, Deserialize};16use tokio::{io::AsyncRead, process::Command, select};17use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};18use tracing::{info, info_span, warn, Span};19use tracing_indicatif::span_ext::IndicatifSpanExt;2021fn escape_bash(input: &str, out: &mut String) {22	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";23	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {24		out.push_str(input);25		return;26	}27	out.push('\'');28	for (i, v) in input.split('\'').enumerate() {29		if i != 0 {30			out.push_str("'\"'\"'");31		}32		out.push_str(v);33	}34	out.push('\'');35}36fn ostoutf8(os: impl AsRef<OsStr>) -> String {37	os.as_ref().to_str().expect("non-utf8 data").to_owned()38}39#[derive(Clone)]40pub struct MyCommand {41	command: String,42	args: Vec<String>,43	env: Vec<(String, String)>,44	ssh_session: Option<Arc<Session>>,45}46impl MyCommand {47	pub fn new(cmd: impl AsRef<OsStr>) -> Self {48		assert!(!cmd.as_ref().is_empty());49		Self {50			command: ostoutf8(cmd),51			args: vec![],52			env: vec![],53			ssh_session: None,54		}55	}56	fn into_args(self) -> Vec<String> {57		let mut out = Vec::new();58		if !self.env.is_empty() {59			out.push("env".to_owned());60			for (k, v) in self.env {61				assert!(!k.contains('='));62				out.push(format!("{k}={v}"));63			}64		}65		out.push(self.command);66		out.extend(self.args);67		out68	}69	fn into_string(self) -> String {70		let mut out = String::new();71		if !self.env.is_empty() {72			out.push_str("env");73			for (k, v) in self.env {74				out.push(' ');75				assert!(!k.contains('='));76				escape_bash(&k, &mut out);77				out.push('=');78				escape_bash(&v, &mut out);79			}80		}81		if !out.is_empty() {82			out.push(' ');83		}84		escape_bash(&self.command, &mut out);85		for arg in self.args {86			out.push(' ');87			escape_bash(&arg, &mut out);88		}89		out90	}91	fn into_command(self) -> Command {92		let mut out = Command::new(self.command);93		out.args(self.args);94		for (k, v) in self.env {95			out.env(k, v);96		}97		out98	}99	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {100		Ok(if let Some(session) = self.ssh_session.clone() {101			let cmd = self.into_command();102			Either::Right(103				cmd.over_ssh(session)104					.map_err(|e| anyhow!("ssh error: {e}"))?,105			)106		} else {107			let cmd = self.into_command();108			Either::Left(cmd)109		})110	}111	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {112		let arg = arg.as_ref();113		self.args.push(ostoutf8(arg));114		self115	}116	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {117		let arg = arg.as_ref();118		let value = value.as_ref();119		let arg = ostoutf8(arg);120		let value = ostoutf8(value);121		self.arg(format!("{arg}={value}"));122		self123	}124	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {125		self.arg(arg);126		self.arg(value);127		self128	}129	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {130		for arg in args.into_iter() {131			let arg = arg.as_ref();132			self.args.push(ostoutf8(arg));133		}134		self135	}136	pub fn sudo(self) -> Self {137		if std::env::var_os("NO_SUDO").is_some() {138			let mut out = Self::new("su");139			out.arg("-c").arg(self.into_string());140			out141		} else {142			let mut out = Self::new("sudo");143			out.args(self.into_args());144			out145		}146	}147	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {148		let mut out = Self::new("ssh");149		out.arg(on).arg("--");150		out.arg(self.into_string());151		out152	}153	pub fn over_ssh(mut self, session: Arc<Session>) -> Self {154		self.ssh_session = Some(session);155		self156	}157158	pub async fn run(self) -> Result<()> {159		let str = self.clone().into_string();160		let cmd = self.into_command();161		run_nix_inner(str, cmd, &mut PlainHandler).await?;162		Ok(())163	}164	pub async fn run_string(self) -> Result<String> {165		let str = self.clone().into_string();166		let cmd = self.into_command();167		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;168		Ok(v)169	}170171	pub async fn run_nix_string(self) -> Result<String> {172		let str = self.clone().into_string();173		let mut cmd = self.into_command();174		cmd.arg("--log-format").arg("internal-json");175		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await176	}177	pub async fn run_nix(self) -> Result<()> {178		let str = self.clone().into_string();179		let mut cmd = self.into_command();180		cmd.arg("--log-format").arg("internal-json");181		cmd.stdout(Stdio::inherit());182		run_nix_inner(str, cmd, &mut NixHandler::default()).await183	}184}185186struct EmptyAsyncRead;187impl AsyncRead for EmptyAsyncRead {188	fn poll_read(189		self: std::pin::Pin<&mut Self>,190		_cx: &mut std::task::Context<'_>,191		_buf: &mut tokio::io::ReadBuf<'_>,192	) -> Poll<std::io::Result<()>> {193		Poll::Pending194	}195}196197async fn run_nix_inner_stdout(198	str: String,199	cmd: Command,200	handler: &mut dyn Handler,201) -> Result<String> {202	Ok(run_nix_inner_raw(str, cmd, true, handler, None)203		.await?204		.expect("has out"))205}206async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {207	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;208	assert!(v.is_none());209	Ok(())210}211212pub trait Handler: Send {213	fn handle_line(&mut self, e: &str);214}215216pub struct ClonableHandler<H>(Arc<Mutex<H>>);217impl<H> Clone for ClonableHandler<H> {218	fn clone(&self) -> Self {219		Self(self.0.clone())220	}221}222impl<H> ClonableHandler<H> {223	pub fn new(inner: H) -> Self {224		Self(Arc::new(Mutex::new(inner)))225	}226}227impl<H: Handler> Handler for ClonableHandler<H> {228	fn handle_line(&mut self, e: &str) {229		self.0.lock().unwrap().handle_line(e)230	}231}232233struct PlainHandler;234impl Handler for PlainHandler {235	fn handle_line(&mut self, e: &str) {236		info!(target: "log", "{e}");237	}238}239240pub struct NoopHandler;241impl Handler for NoopHandler {242	fn handle_line(&mut self, _e: &str) {}243}244245#[derive(Default)]246pub struct NixHandler {247	spans: HashMap<u64, Span>,248}249fn process_message(m: &str) -> String {250	static OSC_CLEANER: Lazy<Regex> =251		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());252	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());253	let m = OSC_CLEANER.replace_all(m, "");254	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,255	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.256	DETABBER.replace_all(m.as_ref(), "  ").to_string()257}258impl Handler for NixHandler {259	fn handle_line(&mut self, e: &str) {260		if let Some(e) = e.strip_prefix("@nix ") {261			let log: NixLog = match serde_json::from_str(e) {262				Ok(l) => l,263				Err(err) => {264					warn!("failed to parse nix log line {:?}: {}", e, err);265					return;266				}267			};268			match log {269				NixLog::Msg { msg, raw_msg, .. } => {270					#[allow(clippy::nonminimal_bool)]271					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))272					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")273					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {274						if let Some(raw_msg) = raw_msg {275							if !msg.is_empty() {276								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())277							} else {278								info!(target: "nix", "{}", raw_msg.trim_end())279							}280						} else {281							info!(target: "nix", "{}", msg.trim_end())282						}283					}284				}285				NixLog::Start {286					ref fields,287					typ,288					id,289					..290				} if typ == 105 && !fields.is_empty() => {291					if let [LogField::String(drv), ..] = &fields[..] {292						let mut drv = drv.as_str();293						if let Some(pkg) = drv.strip_prefix("/nix/store/") {294							let mut it = pkg.splitn(2, '-');295							it.next();296							if let Some(pkg) = it.next() {297								drv = pkg;298							}299						}300						info!(target: "nix","building {}", drv);301						let span = info_span!("build", drv);302						span.pb_start();303						self.spans.insert(id, span);304					} else {305						warn!("bad build log: {:?}", log)306					}307				}308				NixLog::Start {309					ref fields,310					typ,311					id,312					..313				} if typ == 100 && fields.len() >= 3 => {314					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =315						&fields[..]316					{317						let mut drv = drv.as_str();318319						if let Some(pkg) = drv.strip_prefix("/nix/store/") {320							let mut it = pkg.splitn(2, '-');321							it.next();322							if let Some(pkg) = it.next() {323								drv = pkg;324							}325						}326						// info!(target: "nix","copying {} {} -> {}", drv, from, to);327						let span = info_span!("copy", from, to, drv);328						span.pb_start();329						self.spans.insert(id, span);330					} else {331						warn!("bad copy log: {:?}", log)332					}333				}334				NixLog::Start { text, typ, id, .. }335					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>336				{337					if !text.is_empty()338						&& text != "querying info about missing paths"339						&& text != "copying 0 paths"340					{341						let span = info_span!("job");342						span.pb_start();343						span.pb_set_message(&process_message(text.trim()));344						self.spans.insert(id, span);345						info!(target: "nix", "{}", text);346					}347				}348				NixLog::Start {349					text,350					level: 0,351					typ: 108,352					..353				} if text.is_empty() => {354					// Cache lookup? Coupled with copy log355				}356				NixLog::Start {357					text,358					level: 4,359					typ: 109,360					..361				} if text.starts_with("querying info about ") => {362					// Cache lookup363				}364				NixLog::Start {365					text,366					level: 4,367					typ: 101,368					..369				} if text.starts_with("downloading ") => {370					// NAR downloading, coupled with copy log371				}372				NixLog::Start {373					text,374					level: 1,375					typ: 111,376					..377				} if text.starts_with("waiting for a machine to build ") => {378					// Useless repeating notification about build379				}380				NixLog::Start {381					text,382					level: 3,383					typ: 111,384					..385				} if text.starts_with("resolved derivation: ") => {386					// CA resolved387				}388				NixLog::Start {389					text,390					level: 1,391					typ: 111,392					id,393					..394				} if text.starts_with("waiting for lock on ") => {395					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();396					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {397						drv = txt;398					}399					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {400						drv = txt;401					}402					if let Some(txt) = drv.split("', '").next() {403						drv = txt;404					}405					if let Some(pkg) = drv.strip_prefix("/nix/store/") {406						let mut it = pkg.splitn(2, '-');407						it.next();408						if let Some(pkg) = it.next() {409							drv = pkg;410						}411					}412					let span = info_span!("waiting on drv", drv);413					span.pb_start();414					self.spans.insert(id, span);415					// Concurrent build of the same message416				}417				NixLog::Stop { id, .. } => {418					self.spans.remove(&id);419				}420				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {421					if let Some(span) = self.spans.get(&id) {422						if let LogField::String(s) = &fields[0] {423							span.pb_set_message(&process_message(s.trim()));424						} else {425							warn!("bad fields: {fields:?}");426						}427					} else {428						warn!("unknown result id: {id} {typ} {fields:?}");429					}430					// dbg!(fields, id, typ);431				}432				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {433					if let Some(span) = self.spans.get(&id) {434						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =435							&fields[..4]436						{437							span.pb_set_length(*expected);438							span.pb_set_position(*done);439						} else {440							warn!("bad fields: {fields:?}");441						}442					} else {443						// warn!("unknown result id: {id} {typ} {fields:?}");444						// Unaccounted progress.445					}446					// dbg!(fields, id, typ);447				}448				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {449					// Set phase, expected450				}451				_ => warn!("unknown log: {:?}", log),452			};453		} else {454			let e = e.trim();455			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {456				return;457			}458			info!("{e}")459		}460	}461}462463async fn run_nix_inner_raw(464	str: String,465	mut cmd: Command,466	want_stdout: bool,467	err_handler: &mut dyn Handler,468	mut out_handler: Option<&mut dyn Handler>,469) -> Result<Option<String>> {470	cmd.stderr(Stdio::piped());471	cmd.stdout(Stdio::piped());472	let mut child = cmd.spawn()?;473	let mut stderr = child.stderr.take().unwrap();474	let stdout = child.stdout.take().unwrap();475	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());476	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));477	let mut ob = want_stdout478		.then(|| out.take().unwrap())479		.unwrap_or_else(|| Box::new(EmptyAsyncRead));480	let mut ol = (!want_stdout)481		.then(|| out.take().unwrap())482		.unwrap_or_else(|| Box::new(EmptyAsyncRead));483	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());484	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());485486	// while let Some(line) = read.next().await? {}487488	let mut out_buf = if want_stdout { Some(vec![]) } else { None };489	loop {490		select! {491			e = err.next() => {492				if let Some(e) = e {493					let e = e?;494					err_handler.handle_line(&e);495				}496			},497			o = ob.next() => {498				if let Some(o) = o {499					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);500				}501			},502			o = ol.next() => {503				if let Some(o) = o {504					let o = o?;505					if let Some(out) = out_handler.as_mut() {506						out.handle_line(&o)507					} else {508						err_handler.handle_line(&o)509					}510					// out_handler.handle_info(&o);511				}512			},513			code = child.wait() => {514				let code = code?;515				if !code.success() {516					anyhow::bail!("command '{str}' failed with status {}", code);517				}518				break;519			}520		}521	}522523	Ok(out_buf.map(String::from_utf8).transpose()?)524}525526pub trait ErrorRecorder: Send {527	/// Return true to discard message from logging528	fn push_message(&mut self, msg: &str) -> bool;529}530531#[derive(Debug)]532enum LogField {533	String(String),534	Num(u64),535}536537impl<'de> Deserialize<'de> for LogField {538	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>539	where540		D: serde::Deserializer<'de>,541	{542		struct StringOrNum;543		impl<'de> Visitor<'de> for StringOrNum {544			type Value = LogField;545546			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {547				write!(f, "string or unsigned")548			}549550			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>551			where552				E: serde::de::Error,553			{554				Ok(LogField::String(v.to_owned()))555			}556557			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>558			where559				E: serde::de::Error,560			{561				Ok(LogField::Num(v))562			}563		}564565		deserializer.deserialize_any(StringOrNum)566	}567}568569#[derive(Deserialize, Debug)]570#[serde(rename_all = "camelCase", tag = "action")]571#[allow(dead_code)]572enum NixLog {573	Msg {574		level: u32,575		msg: String,576		raw_msg: Option<String>,577	},578	Start {579		id: u64,580		level: u32,581		#[serde(default)]582		fields: Vec<LogField>,583		text: String,584		#[serde(rename = "type")]585		typ: u32,586	},587	Stop {588		id: u64,589	},590	Result {591		id: u64,592		#[serde(rename = "type")]593		typ: u32,594		#[serde(default)]595		fields: Vec<LogField>,596	},597}