git.delta.rocks / jrsonnet / refs/commits / 3f73827e390b

difftreelog

source

cmds/fleet/src/command.rs13.6 KiBsourcehistory
1use std::{2	collections::HashMap,3	ffi::OsStr,4	process::Stdio,5	sync::{Arc, Mutex},6	task::Poll,7};89use anyhow::Result;10use futures::StreamExt;11use serde::{de::Visitor, Deserialize};12use tokio::{io::AsyncRead, process::Command, select};13use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};14use tracing::{info, info_span, warn, Span};15use tracing_indicatif::span_ext::IndicatifSpanExt;1617fn escape_bash(input: &str, out: &mut String) {18	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";19	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {20		out.push_str(input);21		return;22	}23	out.push('\'');24	for (i, v) in input.split('\'').enumerate() {25		if i != 0 {26			out.push_str("'\"'\"'");27		}28		out.push_str(v);29	}30	out.push('\'');31}32fn ostoutf8(os: impl AsRef<OsStr>) -> String {33	os.as_ref().to_str().expect("non-utf8 data").to_owned()34}35#[derive(Clone)]36pub struct MyCommand {37	command: String,38	args: Vec<String>,39	env: Vec<(String, String)>,40}41impl MyCommand {42	pub fn new(cmd: impl AsRef<OsStr>) -> Self {43		assert!(!cmd.as_ref().is_empty());44		Self {45			command: ostoutf8(cmd),46			args: vec![],47			env: vec![],48		}49	}50	fn into_args(self) -> Vec<String> {51		let mut out = Vec::new();52		if !self.env.is_empty() {53			out.push("env".to_owned());54			for (k, v) in self.env {55				assert!(!k.contains('='));56				out.push(format!("{k}={v}"));57			}58		}59		out.push(self.command);60		out.extend(self.args);61		out62	}63	fn into_string(self) -> String {64		let mut out = String::new();65		if !self.env.is_empty() {66			out.push_str("env");67			for (k, v) in self.env {68				out.push(' ');69				assert!(!k.contains('='));70				escape_bash(&k, &mut out);71				out.push('=');72				escape_bash(&v, &mut out);73			}74		}75		if !out.is_empty() {76			out.push(' ');77		}78		escape_bash(&self.command, &mut out);79		for arg in self.args {80			out.push(' ');81			escape_bash(&arg, &mut out);82		}83		out84	}85	fn into_command(self) -> Command {86		let mut out = Command::new(self.command);87		out.args(self.args);88		for (k, v) in self.env {89			out.env(k, v);90		}91		out92	}93	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {94		let arg = arg.as_ref();95		self.args.push(ostoutf8(arg));96		self97	}98	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {99		let arg = arg.as_ref();100		let value = value.as_ref();101		let arg = ostoutf8(arg);102		let value = ostoutf8(value);103		self.arg(format!("{arg}={value}"));104		self105	}106	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {107		self.arg(arg);108		self.arg(value);109		self110	}111	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {112		for arg in args.into_iter() {113			let arg = arg.as_ref();114			self.args.push(ostoutf8(arg));115		}116		self117	}118	pub fn sudo(self) -> Self {119		let mut out = Self::new("sudo");120		out.args(self.into_args());121		out122	}123	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {124		let mut out = Self::new("ssh");125		out.arg(on).arg("--");126		out.arg(self.into_string());127		out128	}129130	pub async fn run(self) -> Result<()> {131		let str = self.clone().into_string();132		let cmd = self.into_command();133		run_nix_inner(str, cmd, &mut PlainHandler).await?;134		Ok(())135	}136	pub async fn run_string(self) -> Result<String> {137		let str = self.clone().into_string();138		let cmd = self.into_command();139		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;140		Ok(v)141	}142143	pub async fn run_nix_string(self) -> Result<String> {144		let str = self.clone().into_string();145		let mut cmd = self.into_command();146		cmd.arg("--log-format").arg("internal-json");147		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await148	}149	pub async fn run_nix(self) -> Result<()> {150		let str = self.clone().into_string();151		let mut cmd = self.into_command();152		cmd.arg("--log-format").arg("internal-json");153		cmd.stdout(Stdio::inherit());154		run_nix_inner(str, cmd, &mut NixHandler::default()).await155	}156}157158struct EmptyAsyncRead;159impl AsyncRead for EmptyAsyncRead {160	fn poll_read(161		self: std::pin::Pin<&mut Self>,162		_cx: &mut std::task::Context<'_>,163		_buf: &mut tokio::io::ReadBuf<'_>,164	) -> Poll<std::io::Result<()>> {165		Poll::Pending166	}167}168169async fn run_nix_inner_stdout(170	str: String,171	cmd: Command,172	handler: &mut dyn Handler,173) -> Result<String> {174	Ok(run_nix_inner_raw(str, cmd, true, handler, None)175		.await?176		.expect("has out"))177}178async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {179	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;180	assert!(v.is_none());181	Ok(())182}183184pub trait Handler: Send {185	fn handle_line(&mut self, e: &str);186}187188pub struct ClonableHandler<H>(Arc<Mutex<H>>);189impl<H> Clone for ClonableHandler<H> {190	fn clone(&self) -> Self {191		Self(self.0.clone())192	}193}194impl<H> ClonableHandler<H> {195	pub fn new(inner: H) -> Self {196		Self(Arc::new(Mutex::new(inner)))197	}198}199impl<H: Handler> Handler for ClonableHandler<H> {200	fn handle_line(&mut self, e: &str) {201		self.0.lock().unwrap().handle_line(e)202	}203}204205struct PlainHandler;206impl Handler for PlainHandler {207	fn handle_line(&mut self, e: &str) {208		info!(target: "log", "{e}");209	}210}211212pub struct NoopHandler;213impl Handler for NoopHandler {214	fn handle_line(&mut self, _e: &str) {}215}216217#[derive(Default)]218pub struct NixHandler {219	spans: HashMap<u64, Span>,220}221impl Handler for NixHandler {222	fn handle_line(&mut self, e: &str) {223		if let Some(e) = e.strip_prefix("@nix ") {224			let log: NixLog = match serde_json::from_str(e) {225				Ok(l) => l,226				Err(err) => {227					warn!("failed to parse nix log line {:?}: {}", e, err);228					return;229				}230			};231			match log {232				NixLog::Msg { msg, raw_msg, .. } => {233					#[allow(clippy::nonminimal_bool)]234					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))235					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")236					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {237						if let Some(raw_msg) = raw_msg {238							if !msg.is_empty() {239								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())240							} else {241								info!(target: "nix", "{}", raw_msg.trim_end())242							}243						} else {244							info!(target: "nix", "{}", msg.trim_end())245						}246					}247				}248				NixLog::Start {249					ref fields,250					typ,251					id,252					..253				} if typ == 105 && !fields.is_empty() => {254					if let [LogField::String(drv), ..] = &fields[..] {255						let mut drv = drv.as_str();256						if let Some(pkg) = drv.strip_prefix("/nix/store/") {257							let mut it = pkg.splitn(2, '-');258							it.next();259							if let Some(pkg) = it.next() {260								drv = pkg;261							}262						}263						info!(target: "nix","building {}", drv);264						let span = info_span!("build", drv);265						span.pb_start();266						self.spans.insert(id, span);267					} else {268						warn!("bad build log: {:?}", log)269					}270				}271				NixLog::Start {272					ref fields,273					typ,274					id,275					..276				} if typ == 100 && fields.len() >= 3 => {277					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =278						&fields[..]279					{280						let mut drv = drv.as_str();281282						if let Some(pkg) = drv.strip_prefix("/nix/store/") {283							let mut it = pkg.splitn(2, '-');284							it.next();285							if let Some(pkg) = it.next() {286								drv = pkg;287							}288						}289						// info!(target: "nix","copying {} {} -> {}", drv, from, to);290						let span = info_span!("copy", from, to, drv);291						span.pb_start();292						self.spans.insert(id, span);293					} else {294						warn!("bad copy log: {:?}", log)295					}296				}297				NixLog::Start { text, typ, id, .. }298					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>299				{300					if !text.is_empty()301						&& text != "querying info about missing paths"302						&& text != "copying 0 paths"303					{304						let span = info_span!("job");305						span.pb_start();306						span.pb_set_message(text.trim());307						self.spans.insert(id, span);308						info!(target: "nix", "{}", text);309					}310				}311				NixLog::Start {312					text,313					level: 0,314					typ: 108,315					..316				} if text.is_empty() => {317					// Cache lookup? Coupled with copy log318				}319				NixLog::Start {320					text,321					level: 4,322					typ: 109,323					..324				} if text.starts_with("querying info about ") => {325					// Cache lookup326				}327				NixLog::Start {328					text,329					level: 4,330					typ: 101,331					..332				} if text.starts_with("downloading ") => {333					// NAR downloading, coupled with copy log334				}335				NixLog::Start {336					text,337					level: 1,338					typ: 111,339					..340				} if text.starts_with("waiting for a machine to build ") => {341					// Useless repeating notification about build342				}343				NixLog::Start {344					text,345					level: 3,346					typ: 111,347					..348				} if text.starts_with("resolved derivation: ") => {349					// CA resolved350				}351				NixLog::Start {352					text,353					level: 1,354					typ: 111,355					id,356					..357				} if text.starts_with("waiting for lock on ") => {358					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();359					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {360						drv = txt;361					}362					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {363						drv = txt;364					}365					if let Some(txt) = drv.split("', '").next() {366						drv = txt;367					}368					if let Some(pkg) = drv.strip_prefix("/nix/store/") {369						let mut it = pkg.splitn(2, '-');370						it.next();371						if let Some(pkg) = it.next() {372							drv = pkg;373						}374					}375					let span = info_span!("waiting on drv", drv);376					span.pb_start();377					self.spans.insert(id, span);378					// Concurrent build of the same message379				}380				NixLog::Stop { id, .. } => {381					self.spans.remove(&id);382				}383				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {384					if let Some(span) = self.spans.get(&id) {385						if let LogField::String(s) = &fields[0] {386							span.pb_set_message(s.trim());387						} else {388							warn!("bad fields: {fields:?}");389						}390					} else {391						warn!("unknown result id: {id} {typ} {fields:?}");392					}393					// dbg!(fields, id, typ);394				}395				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {396					if let Some(span) = self.spans.get(&id) {397						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =398							&fields[..4]399						{400							span.pb_set_length(*expected);401							span.pb_set_position(*done);402						} else {403							warn!("bad fields: {fields:?}");404						}405					} else {406						// warn!("unknown result id: {id} {typ} {fields:?}");407						// Unaccounted progress.408					}409					// dbg!(fields, id, typ);410				}411				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {412					// Set phase, expected413				}414				_ => warn!("unknown log: {:?}", log),415			};416		} else {417			let e = e.trim();418			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {419				return;420			}421			info!("{e}")422		}423	}424}425426async fn run_nix_inner_raw(427	str: String,428	mut cmd: Command,429	want_stdout: bool,430	err_handler: &mut dyn Handler,431	mut out_handler: Option<&mut dyn Handler>,432) -> Result<Option<String>> {433	cmd.stderr(Stdio::piped());434	cmd.stdout(Stdio::piped());435	let mut child = cmd.spawn()?;436	let mut stderr = child.stderr.take().unwrap();437	let stdout = child.stdout.take().unwrap();438	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());439	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));440	let mut ob = want_stdout441		.then(|| out.take().unwrap())442		.unwrap_or_else(|| Box::new(EmptyAsyncRead));443	let mut ol = (!want_stdout)444		.then(|| out.take().unwrap())445		.unwrap_or_else(|| Box::new(EmptyAsyncRead));446	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());447	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());448449	// while let Some(line) = read.next().await? {}450451	let mut out_buf = if want_stdout { Some(vec![]) } else { None };452	loop {453		select! {454			e = err.next() => {455				if let Some(e) = e {456					let e = e?;457					err_handler.handle_line(&e);458				}459			},460			o = ob.next() => {461				if let Some(o) = o {462					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);463				}464			},465			o = ol.next() => {466				if let Some(o) = o {467					let o = o?;468					if let Some(out) = out_handler.as_mut() {469						out.handle_line(&o)470					} else {471						err_handler.handle_line(&o)472					}473					// out_handler.handle_info(&o);474				}475			},476			code = child.wait() => {477				let code = code?;478				if !code.success() {479					anyhow::bail!("command '{str}' failed with status {}", code);480				}481				break;482			}483		}484	}485486	Ok(out_buf.map(String::from_utf8).transpose()?)487}488489pub trait ErrorRecorder: Send {490	/// Return true to discard message from logging491	fn push_message(&mut self, msg: &str) -> bool;492}493494#[derive(Debug)]495enum LogField {496	String(String),497	Num(u64),498}499500impl<'de> Deserialize<'de> for LogField {501	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>502	where503		D: serde::Deserializer<'de>,504	{505		struct StringOrNum;506		impl<'de> Visitor<'de> for StringOrNum {507			type Value = LogField;508509			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {510				write!(f, "string or unsigned")511			}512513			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>514			where515				E: serde::de::Error,516			{517				Ok(LogField::String(v.to_owned()))518			}519520			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>521			where522				E: serde::de::Error,523			{524				Ok(LogField::Num(v))525			}526		}527528		deserializer.deserialize_any(StringOrNum)529	}530}531532#[derive(Deserialize, Debug)]533#[serde(rename_all = "camelCase", tag = "action")]534#[allow(dead_code)]535enum NixLog {536	Msg {537		level: u32,538		msg: String,539		raw_msg: Option<String>,540	},541	Start {542		id: u64,543		level: u32,544		#[serde(default)]545		fields: Vec<LogField>,546		text: String,547		#[serde(rename = "type")]548		typ: u32,549	},550	Stop {551		id: u64,552	},553	Result {554		id: u64,555		#[serde(rename = "type")]556		typ: u32,557		#[serde(default)]558		fields: Vec<LogField>,559	},560}