git.delta.rocks / jrsonnet / refs/commits / 4340a04aa508

difftreelog

source

cmds/fleet/src/command.rs13.0 KiBsourcehistory
1use std::{collections::HashMap, ffi::OsStr, process::Stdio, task::Poll};23use anyhow::{Context, Result};4use futures::StreamExt;5use serde::{6	de::{DeserializeOwned, Visitor},7	Deserialize,8};9use tokio::{io::AsyncRead, process::Command, select};10use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tracing::{info, info_span, warn, Span};12use tracing_indicatif::span_ext::IndicatifSpanExt;1314fn escape_bash(input: &str, out: &mut String) {15	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17		out.push_str(input);18		return;19	}20	out.push('\'');21	for (i, v) in input.split('\'').enumerate() {22		if i != 0 {23			out.push_str("'\"'\"'");24		}25		out.push_str(v);26	}27	out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30	os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34	command: String,35	args: Vec<String>,36	env: Vec<(String, String)>,37}38impl MyCommand {39	pub fn new(cmd: impl AsRef<OsStr>) -> Self {40		assert!(!cmd.as_ref().is_empty());41		Self {42			command: ostoutf8(cmd),43			args: vec![],44			env: vec![],45		}46	}47	fn into_args(self) -> Vec<String> {48		let mut out = Vec::new();49		if !self.env.is_empty() {50			out.push("env".to_owned());51			for (k, v) in self.env {52				assert!(!k.contains("="));53				out.push(format!("{k}={v}"));54			}55		}56		out.push(self.command);57		out.extend(self.args.into_iter());58		out59	}60	fn into_string(self) -> String {61		let mut out = String::new();62		if !self.env.is_empty() {63			out.push_str("env");64			for (k, v) in self.env {65				out.push(' ');66				assert!(!k.contains("="));67				escape_bash(&k, &mut out);68				out.push('=');69				escape_bash(&v, &mut out);70			}71		}72		if !out.is_empty() {73			out.push(' ');74		}75		escape_bash(&self.command, &mut out);76		for arg in self.args {77			out.push(' ');78			escape_bash(&arg, &mut out);79		}80		out81	}82	fn into_command(self) -> Command {83		let mut out = Command::new(self.command);84		out.args(self.args);85		for (k, v) in self.env {86			out.env(k, v);87		}88		out89	}90	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {91		let arg = arg.as_ref();92		self.args.push(ostoutf8(arg));93		self94	}95	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {96		let arg = arg.as_ref();97		let value = value.as_ref();98		let arg = ostoutf8(arg);99		let value = ostoutf8(value);100		self.arg(format!("{arg}={value}"));101		self102	}103	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {104		self.arg(arg);105		self.arg(value);106		self107	}108	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {109		for arg in args.into_iter() {110			let arg = arg.as_ref();111			self.args.push(ostoutf8(arg));112		}113		self114	}115	pub fn sudo(self) -> Self {116		let mut out = Self::new("sudo");117		out.args(self.into_args());118		out119	}120	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {121		let mut out = Self::new("ssh");122		out.arg(on).arg("--");123		out.arg(self.into_string());124		out125	}126127	pub async fn run(self) -> Result<()> {128		let str = self.clone().into_string();129		let cmd = self.into_command();130		run_nix_inner(str, cmd, &mut PlainHandler).await?;131		Ok(())132	}133	pub async fn run_string(self) -> Result<String> {134		let str = self.clone().into_string();135		let cmd = self.into_command();136		let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;137		Ok(v)138	}139	pub async fn run_nix_json<T: DeserializeOwned>(self) -> Result<T> {140		let str = self.run_nix_string().await?;141		serde_json::from_str(&str).with_context(|| format!("{:?}", str))142	}143144	pub async fn run_nix_string(self) -> Result<String> {145		let str = self.clone().into_string();146		let cmd = self.into_command();147		run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await148	}149	pub async fn run_nix(self) -> Result<()> {150		let str = self.clone().into_string();151		let mut cmd = self.into_command();152		cmd.stdout(Stdio::inherit());153		run_nix_inner(str, cmd, &mut NixHandler::default()).await154	}155}156157struct EmptyAsyncRead;158impl AsyncRead for EmptyAsyncRead {159	fn poll_read(160		self: std::pin::Pin<&mut Self>,161		_cx: &mut std::task::Context<'_>,162		_buf: &mut tokio::io::ReadBuf<'_>,163	) -> Poll<std::io::Result<()>> {164		Poll::Pending165	}166}167168async fn run_nix_inner_stdout(169	str: String,170	cmd: Command,171	handler: &mut dyn Handler,172) -> Result<String> {173	Ok(run_nix_inner_raw(str, cmd, true, handler)174		.await?175		.expect("has out"))176}177async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {178	let v = run_nix_inner_raw(str, cmd, false, handler).await?;179	assert!(v.is_none());180	Ok(())181}182183trait Handler {184	fn handle_err(&mut self, e: &str);185	fn handle_info(&mut self, e: &str);186}187188struct PlainHandler;189impl Handler for PlainHandler {190	fn handle_err(&mut self, e: &str) {191		info!(target: "log", "{e}");192	}193194	fn handle_info(&mut self, e: &str) {195		info!(target: "log", "{e}");196	}197}198199#[derive(Default)]200struct NixHandler {201	spans: HashMap<u64, Span>,202}203impl Handler for NixHandler {204	fn handle_err(&mut self, e: &str) {205		if let Some(e) = e.strip_prefix("@nix ") {206			let log: NixLog = match serde_json::from_str(e) {207				Ok(l) => l,208				Err(err) => {209					warn!("failed to parse nix log line {:?}: {}", e, err);210					return;211				}212			};213			match log {214				NixLog::Msg { msg, raw_msg, .. } => {215					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))216					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")217					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {218						if let Some(raw_msg) = raw_msg {219							if !msg.is_empty() {220								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())221							} else {222								info!(target: "nix", "{}", raw_msg.trim_end())223							}224						} else {225							info!(target: "nix", "{}", msg.trim_end())226						}227					}228				}229				NixLog::Start {230					ref fields,231					typ,232					id,233					..234				} if typ == 105 && !fields.is_empty() => {235					if let [LogField::String(drv), ..] = &fields[..] {236						let mut drv = drv.as_str();237						if let Some(pkg) = drv.strip_prefix("/nix/store/") {238							let mut it = pkg.splitn(2, '-');239							it.next();240							if let Some(pkg) = it.next() {241								drv = pkg;242							}243						}244						info!(target: "nix","building {}", drv);245						let span = info_span!("build", drv);246						span.pb_start();247						self.spans.insert(id, span);248					} else {249						warn!("bad build log: {:?}", log)250					}251				}252				NixLog::Start {253					ref fields,254					typ,255					id,256					..257				} if typ == 100 && fields.len() >= 3 => {258					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =259						&fields[..]260					{261						let mut drv = drv.as_str();262263						if let Some(pkg) = drv.strip_prefix("/nix/store/") {264							let mut it = pkg.splitn(2, '-');265							it.next();266							if let Some(pkg) = it.next() {267								drv = pkg;268							}269						}270						info!(target: "nix","copying {} {} -> {}", drv, from, to);271						let span = info_span!("copy", from, to, drv);272						span.pb_start();273						self.spans.insert(id, span);274					} else {275						warn!("bad copy log: {:?}", log)276					}277				}278				NixLog::Start { text, typ, id, .. }279					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>280				{281					if !text.is_empty()282						&& text != "querying info about missing paths"283						&& text != "copying 0 paths"284					{285						let span = info_span!("job");286						span.pb_start();287						span.pb_set_message(text.trim());288						self.spans.insert(id, span);289						info!(target: "nix", "{}", text);290					}291				}292				NixLog::Start {293					text,294					level: 0,295					typ: 108,296					..297				} if text.is_empty() => {298					// Cache lookup? Coupled with copy log299				}300				NixLog::Start {301					text,302					level: 4,303					typ: 109,304					..305				} if text.starts_with("querying info about ") => {306					// Cache lookup307				}308				NixLog::Start {309					text,310					level: 4,311					typ: 101,312					..313				} if text.starts_with("downloading ") => {314					// NAR downloading, coupled with copy log315				}316				NixLog::Start {317					text,318					level: 1,319					typ: 111,320					..321				} if text.starts_with("waiting for a machine to build ") => {322					// Useless repeating notification about build323				}324				NixLog::Start {325					text,326					level: 3,327					typ: 111,328					..329				} if text.starts_with("resolved derivation: ") => {330					// CA resolved331				}332				NixLog::Start {333					text,334					level: 1,335					typ: 111,336					id,337					..338				} if text.starts_with("waiting for lock on ") => {339					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();340					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {341						drv = txt;342					}343					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {344						drv = txt;345					}346					if let Some(txt) = drv.split("', '").next() {347						drv = txt;348					}349					if let Some(pkg) = drv.strip_prefix("/nix/store/") {350						let mut it = pkg.splitn(2, '-');351						it.next();352						if let Some(pkg) = it.next() {353							drv = pkg;354						}355					}356					let span = info_span!("waiting on drv", drv);357					span.pb_start();358					self.spans.insert(id, span);359					// Concurrent build of the same message360				}361				NixLog::Stop { id, .. } => {362					self.spans.remove(&id);363				}364				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {365					if let Some(span) = self.spans.get(&id) {366						if let LogField::String(s) = &fields[0] {367							span.pb_set_message(s.trim());368						} else {369							warn!("bad fields: {fields:?}");370						}371					} else {372						warn!("unknown result id: {id} {typ} {fields:?}");373					}374					// dbg!(fields, id, typ);375				}376				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {377					if let Some(span) = self.spans.get(&id) {378						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =379							&fields[..4]380						{381							span.pb_set_length(*expected);382							span.pb_set_position(*done);383						} else {384							warn!("bad fields: {fields:?}");385						}386					} else {387						// warn!("unknown result id: {id} {typ} {fields:?}");388						// Unaccounted progress.389					}390					// dbg!(fields, id, typ);391				}392				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {393					// Set phase, expected394				}395				_ => warn!("unknown log: {:?}", log),396			};397		} else {398			warn!(target = "nix", "unknown: {}", e.trim())399		}400	}401	fn handle_info(&mut self, o: &str) {402		self.handle_err(o)403	}404}405406async fn run_nix_inner_raw(407	str: String,408	mut cmd: Command,409	want_stdout: bool,410	handler: &mut dyn Handler,411) -> Result<Option<String>> {412	info!("running {str}");413	cmd.arg("--log-format").arg("internal-json");414	cmd.stderr(Stdio::piped());415	cmd.stdout(Stdio::piped());416	let mut child = cmd.spawn()?;417	let mut stderr = child.stderr.take().unwrap();418	let stdout = child.stdout.take().unwrap();419	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());420	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));421	let mut ob = want_stdout422		.then(|| out.take().unwrap())423		.unwrap_or_else(|| Box::new(EmptyAsyncRead));424	let mut ol = (!want_stdout)425		.then(|| out.take().unwrap())426		.unwrap_or_else(|| Box::new(EmptyAsyncRead));427	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());428	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());429430	// while let Some(line) = read.next().await? {}431432	let mut out_buf = if want_stdout { Some(vec![]) } else { None };433	loop {434		select! {435			e = err.next() => {436				if let Some(e) = e {437					let e = e?;438					handler.handle_err(&e);439				}440			},441			o = ob.next() => {442				if let Some(o) = o {443					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);444				}445			},446			o = ol.next() => {447				if let Some(o) = o {448					let o = o?;449					handler.handle_info(&o);450				}451			},452			code = child.wait() => {453				let code = code?;454				if !code.success() {455					anyhow::bail!("command '{str}' failed with status {}", code);456				}457				break;458			}459		}460	}461462	Ok(out_buf.map(String::from_utf8).transpose()?)463}464465#[derive(Debug)]466enum LogField {467	String(String),468	Num(u64),469}470471impl<'de> Deserialize<'de> for LogField {472	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>473	where474		D: serde::Deserializer<'de>,475	{476		struct StringOrNum;477		impl<'de> Visitor<'de> for StringOrNum {478			type Value = LogField;479480			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {481				write!(f, "string or unsigned")482			}483484			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>485			where486				E: serde::de::Error,487			{488				Ok(LogField::String(v.to_owned()))489			}490491			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>492			where493				E: serde::de::Error,494			{495				Ok(LogField::Num(v))496			}497		}498499		deserializer.deserialize_any(StringOrNum)500	}501}502503#[derive(Deserialize, Debug)]504#[serde(rename_all = "camelCase", tag = "action")]505#[allow(dead_code)]506enum NixLog {507	Msg {508		level: u32,509		msg: String,510		raw_msg: Option<String>,511	},512	Start {513		id: u64,514		level: u32,515		#[serde(default)]516		fields: Vec<LogField>,517		text: String,518		#[serde(rename = "type")]519		typ: u32,520	},521	Stop {522		id: u64,523	},524	Result {525		id: u64,526		#[serde(rename = "type")]527		typ: u32,528		#[serde(default)]529		fields: Vec<LogField>,530	},531}