git.delta.rocks / jrsonnet / refs/commits / 904d12180e52

difftreelog

source

cmds/fleet/src/command.rs18.8 KiBsourcehistory
1use std::{2	collections::HashMap,3	ffi::OsStr,4	pin,5	process::Stdio,6	sync::{Arc, Mutex},7	task::Poll,8};910use anyhow::{anyhow, Result};11use futures::StreamExt;12use itertools::Either;13use once_cell::sync::Lazy;14use openssh::{OverSsh, OwningCommand, Session};15use regex::Regex;16use serde::{de::Visitor, Deserialize};17use tokio::{io::AsyncRead, process::Command, select};18use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};19use tracing::{info, info_span, warn, Span};20use tracing_indicatif::span_ext::IndicatifSpanExt;2122fn escape_bash(input: &str, out: &mut String) {23	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";24	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {25		out.push_str(input);26		return;27	}28	out.push('\'');29	for (i, v) in input.split('\'').enumerate() {30		if i != 0 {31			out.push_str("'\"'\"'");32		}33		out.push_str(v);34	}35	out.push('\'');36}37fn ostoutf8(os: impl AsRef<OsStr>) -> String {38	os.as_ref().to_str().expect("non-utf8 data").to_owned()39}40#[derive(Clone)]41pub struct MyCommand {42	command: String,43	args: Vec<String>,44	env: Vec<(String, String)>,45	ssh_session: Option<Arc<Session>>,46}47impl MyCommand {48	pub fn new_on(cmd: impl AsRef<OsStr>, session: Arc<Session>) -> Self {49		assert!(!cmd.as_ref().is_empty());50		Self {51			command: ostoutf8(cmd),52			args: vec![],53			env: vec![],54			ssh_session: Some(session),55		}56	}57	pub fn new(cmd: impl AsRef<OsStr>) -> Self {58		assert!(!cmd.as_ref().is_empty());59		Self {60			command: ostoutf8(cmd),61			args: vec![],62			env: vec![],63			ssh_session: None,64		}65	}66	fn into_args(self) -> Vec<String> {67		let mut out = Vec::new();68		if !self.env.is_empty() {69			out.push("env".to_owned());70			for (k, v) in self.env {71				assert!(!k.contains('='));72				out.push(format!("{k}={v}"));73			}74		}75		out.push(self.command);76		out.extend(self.args);77		out78	}7980	/// Translates environment variables into env command execution.81	/// Required for ssh, as ssh don't allow to send environment variables (at least by default).82	///83	/// FIXME: Insecure, as arguments might be seen by other users on the same machine.84	/// Figure out some way to transfer environment using stdio?85	fn translate_env_into_env(self) -> Self {86		if self.env.is_empty() {87			return self;88		}89		let mut out = Self::new("env");90		if let Some(session) = self.ssh_session {91			out = out.ssh_session(session);92		}93		for (k, v) in self.env {94			assert!(!k.contains('='));95			out.arg(format!("{k}={v}"));96		}97		out.arg(self.command);98		out.args(self.args);99100		out101	}102	fn into_string(self) -> String {103		let mut out = String::new();104		if !self.env.is_empty() {105			out.push_str("env");106			for (k, v) in self.env {107				out.push(' ');108				assert!(!k.contains('='));109				escape_bash(&k, &mut out);110				out.push('=');111				escape_bash(&v, &mut out);112			}113		}114		if !out.is_empty() {115			out.push(' ');116		}117		escape_bash(&self.command, &mut out);118		for arg in self.args {119			out.push(' ');120			escape_bash(&arg, &mut out);121		}122		out123	}124	fn into_command(self) -> Command {125		let mut out = Command::new(self.command);126		out.args(self.args);127		for (k, v) in self.env {128			out.env(k, v);129		}130		out131	}132	fn into_command_new(self) -> Result<Either<Command, openssh::OwningCommand<Arc<Session>>>> {133		Ok(if let Some(session) = self.ssh_session.clone() {134			let cmd = self.translate_env_into_env().into_command();135			Either::Right(136				cmd.over_ssh(session)137					.map_err(|e| anyhow!("ssh error: {e}"))?,138			)139		} else {140			let cmd = self.into_command();141			Either::Left(cmd)142		})143	}144	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {145		let arg = arg.as_ref();146		self.args.push(ostoutf8(arg));147		self148	}149	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {150		let arg = arg.as_ref();151		let value = value.as_ref();152		let arg = ostoutf8(arg);153		let value = ostoutf8(value);154		self.arg(format!("{arg}={value}"));155		self156	}157	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {158		self.arg(arg);159		self.arg(value);160		self161	}162	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {163		self.env164			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));165		self166	}167	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {168		for arg in args.into_iter() {169			let arg = arg.as_ref();170			self.args.push(ostoutf8(arg));171		}172		self173	}174	pub fn sudo(mut self) -> Self {175		if std::env::var_os("NO_SUDO").is_some() {176			let mut out = Self::new("su");177			out.ssh_session = self.ssh_session.take();178			out.arg("-c").arg(self.into_string());179			out180		} else {181			let mut out = Self::new("sudo");182			out.args(self.into_args());183			out184		}185	}186	pub fn ssh_session(mut self, on: Arc<Session>) -> Self {187		self.ssh_session = Some(on);188		self189	}190	pub fn ssh(mut self, on: impl AsRef<OsStr>) -> Self {191		let mut out = Self::new("ssh");192		out.ssh_session = self.ssh_session.take();193		out.arg(on).arg("--");194		out.arg(self.into_string());195		out196	}197198	pub async fn run(self) -> Result<()> {199		let str = self.clone().into_string();200		let cmd = self.into_command_new()?;201		match cmd {202			Either::Left(cmd) => run_nix_inner(str, cmd, &mut PlainHandler).await?,203			Either::Right(cmd) => run_nix_inner_ssh(str, cmd, &mut PlainHandler).await?,204		};205		Ok(())206	}207	pub async fn run_string(self) -> Result<String> {208		let bytes = self.run_bytes().await?;209		Ok(String::from_utf8(bytes)?)210	}211	pub async fn run_bytes(self) -> Result<Vec<u8>> {212		let str = self.clone().into_string();213		let cmd = self.into_command_new()?;214		let v = match cmd {215			Either::Left(cmd) => run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?,216			Either::Right(cmd) => run_nix_inner_stdout_ssh(str, cmd, &mut PlainHandler).await?,217		};218		Ok(v)219	}220221	pub async fn run_nix_string(self) -> Result<String> {222		let str = self.clone().into_string();223		let mut cmd = self.into_command();224		cmd.arg("--log-format").arg("internal-json");225		let bytes = run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await?;226		Ok(String::from_utf8(bytes)?)227	}228	pub async fn run_nix(self) -> Result<()> {229		let str = self.clone().into_string();230		let mut cmd = self.into_command();231		cmd.arg("--log-format").arg("internal-json");232		cmd.stdout(Stdio::inherit());233		run_nix_inner(str, cmd, &mut NixHandler::default()).await234	}235}236237struct EmptyAsyncRead;238impl AsyncRead for EmptyAsyncRead {239	fn poll_read(240		self: std::pin::Pin<&mut Self>,241		_cx: &mut std::task::Context<'_>,242		_buf: &mut tokio::io::ReadBuf<'_>,243	) -> Poll<std::io::Result<()>> {244		Poll::Pending245	}246}247248async fn run_nix_inner_stdout(249	str: String,250	cmd: Command,251	handler: &mut dyn Handler,252) -> Result<Vec<u8>> {253	Ok(run_nix_inner_raw(str, cmd, true, handler, None)254		.await?255		.expect("has out"))256}257async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {258	let v = run_nix_inner_raw(str, cmd, false, handler, None).await?;259	assert!(v.is_none());260	Ok(())261}262async fn run_nix_inner_stdout_ssh(263	str: String,264	cmd: OwningCommand<Arc<Session>>,265	handler: &mut dyn Handler,266) -> Result<Vec<u8>> {267	Ok(run_nix_inner_raw_ssh(str, cmd, true, handler, None)268		.await?269		.expect("has out"))270}271async fn run_nix_inner_ssh(272	str: String,273	cmd: OwningCommand<Arc<Session>>,274	handler: &mut dyn Handler,275) -> Result<()> {276	let v = run_nix_inner_raw_ssh(str, cmd, false, handler, None).await?;277	assert!(v.is_none());278	Ok(())279}280281pub trait Handler: Send {282	fn handle_line(&mut self, e: &str);283}284285pub struct ClonableHandler<H>(Arc<Mutex<H>>);286impl<H> Clone for ClonableHandler<H> {287	fn clone(&self) -> Self {288		Self(self.0.clone())289	}290}291impl<H> ClonableHandler<H> {292	pub fn new(inner: H) -> Self {293		Self(Arc::new(Mutex::new(inner)))294	}295}296impl<H: Handler> Handler for ClonableHandler<H> {297	fn handle_line(&mut self, e: &str) {298		self.0.lock().unwrap().handle_line(e)299	}300}301302struct PlainHandler;303impl Handler for PlainHandler {304	fn handle_line(&mut self, e: &str) {305		info!(target: "log", "{e}");306	}307}308309pub struct NoopHandler;310impl Handler for NoopHandler {311	fn handle_line(&mut self, _e: &str) {}312}313314#[derive(Default)]315pub struct NixHandler {316	spans: HashMap<u64, Span>,317}318fn process_message(m: &str) -> String {319	static OSC_CLEANER: Lazy<Regex> =320		Lazy::new(|| Regex::new(r"\x1B\]([^\x07\x1C]*[\x07\x1C])?|\r").unwrap());321	static DETABBER: Lazy<Regex> = Lazy::new(|| Regex::new(r"\t").unwrap());322	let m = OSC_CLEANER.replace_all(m, "");323	// Indicatif can't format tabs. This is not the correct tab formatting, as correct one should be aligned,324	// and not just be replaced with the constant number of spaces, but it's ok for now, as statuses are single-line.325	DETABBER.replace_all(m.as_ref(), "  ").to_string()326}327impl Handler for NixHandler {328	fn handle_line(&mut self, e: &str) {329		if let Some(e) = e.strip_prefix("@nix ") {330			let log: NixLog = match serde_json::from_str(e) {331				Ok(l) => l,332				Err(err) => {333					warn!("failed to parse nix log line {:?}: {}", e, err);334					return;335				}336			};337			match log {338				NixLog::Msg { msg, raw_msg, .. } => {339					#[allow(clippy::nonminimal_bool)]340					if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))341					&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")342					&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {343						if let Some(raw_msg) = raw_msg {344							if !msg.is_empty() {345								info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())346							} else {347								info!(target: "nix", "{}", raw_msg.trim_end())348							}349						} else {350							info!(target: "nix", "{}", msg.trim_end())351						}352					}353				}354				NixLog::Start {355					ref fields,356					typ,357					id,358					..359				} if typ == 105 && !fields.is_empty() => {360					if let [LogField::String(drv), ..] = &fields[..] {361						let mut drv = drv.as_str();362						if let Some(pkg) = drv.strip_prefix("/nix/store/") {363							let mut it = pkg.splitn(2, '-');364							it.next();365							if let Some(pkg) = it.next() {366								drv = pkg;367							}368						}369						info!(target: "nix","building {}", drv);370						let span = info_span!("build", drv);371						span.pb_start();372						self.spans.insert(id, span);373					} else {374						warn!("bad build log: {:?}", log)375					}376				}377				NixLog::Start {378					ref fields,379					typ,380					id,381					..382				} if typ == 100 && fields.len() >= 3 => {383					if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =384						&fields[..]385					{386						let mut drv = drv.as_str();387388						if let Some(pkg) = drv.strip_prefix("/nix/store/") {389							let mut it = pkg.splitn(2, '-');390							it.next();391							if let Some(pkg) = it.next() {392								drv = pkg;393							}394						}395						// info!(target: "nix","copying {} {} -> {}", drv, from, to);396						let span = info_span!("copy", from, to, drv);397						span.pb_start();398						self.spans.insert(id, span);399					} else {400						warn!("bad copy log: {:?}", log)401					}402				}403				NixLog::Start { text, typ, id, .. }404					if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>405				{406					if !text.is_empty()407						&& text != "querying info about missing paths"408						&& text != "copying 0 paths"409						// Too much spam on lazy-trees branch410						&& !(text.starts_with("copying '") && text.ends_with("' to the store"))411					{412						let span = info_span!("job");413						span.pb_start();414						span.pb_set_message(&process_message(text.trim()));415						self.spans.insert(id, span);416						info!(target: "nix", "{}", text);417					}418				}419				NixLog::Start {420					text,421					level: 0,422					typ: 108,423					..424				} if text.is_empty() => {425					// Cache lookup? Coupled with copy log426				}427				NixLog::Start {428					text,429					level: 4,430					typ: 109,431					..432				} if text.starts_with("querying info about ") => {433					// Cache lookup434				}435				NixLog::Start {436					text,437					level: 4,438					typ: 101,439					..440				} if text.starts_with("downloading ") => {441					// NAR downloading, coupled with copy log442				}443				NixLog::Start {444					text,445					level: 1,446					typ: 111,447					..448				} if text.starts_with("waiting for a machine to build ") => {449					// Useless repeating notification about build450				}451				NixLog::Start {452					text,453					level: 3,454					typ: 111,455					..456				} if text.starts_with("resolved derivation: ") => {457					// CA resolved458				}459				NixLog::Start {460					text,461					level: 1,462					typ: 111,463					id,464					..465				} if text.starts_with("waiting for lock on ") => {466					let mut drv = text.strip_prefix("waiting for lock on ").unwrap();467					if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {468						drv = txt;469					}470					if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {471						drv = txt;472					}473					if let Some(txt) = drv.split("', '").next() {474						drv = txt;475					}476					if let Some(pkg) = drv.strip_prefix("/nix/store/") {477						let mut it = pkg.splitn(2, '-');478						it.next();479						if let Some(pkg) = it.next() {480							drv = pkg;481						}482					}483					let span = info_span!("waiting on drv", drv);484					span.pb_start();485					self.spans.insert(id, span);486					// Concurrent build of the same message487				}488				NixLog::Stop { id, .. } => {489					self.spans.remove(&id);490				}491				NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {492					if let Some(span) = self.spans.get(&id) {493						if let LogField::String(s) = &fields[0] {494							span.pb_set_message(&process_message(s.trim()));495						} else {496							warn!("bad fields: {fields:?}");497						}498					} else {499						warn!("unknown result id: {id} {typ} {fields:?}");500					}501					// dbg!(fields, id, typ);502				}503				NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {504					if let Some(span) = self.spans.get(&id) {505						if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =506							&fields[..4]507						{508							span.pb_set_length(*expected);509							span.pb_set_position(*done);510						} else {511							warn!("bad fields: {fields:?}");512						}513					} else {514						// warn!("unknown result id: {id} {typ} {fields:?}");515						// Unaccounted progress.516					}517					// dbg!(fields, id, typ);518				}519				NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {520					// Set phase, expected521				}522				_ => warn!("unknown log: {:?}", log),523			};524		} else {525			let e = e.trim();526			if e.starts_with("Failed tcsetattr(TCSADRAIN): ") {527				return;528			}529			info!("{e}")530		}531	}532}533534async fn run_nix_inner_raw(535	str: String,536	mut cmd: Command,537	want_stdout: bool,538	err_handler: &mut dyn Handler,539	mut out_handler: Option<&mut dyn Handler>,540) -> Result<Option<Vec<u8>>> {541	cmd.stderr(Stdio::piped());542	cmd.stdout(Stdio::piped());543	let mut child = cmd.spawn()?;544	let mut stderr = child.stderr.take().unwrap();545	let stdout = child.stdout.take().unwrap();546	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());547	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));548	let mut ob = want_stdout549		.then(|| out.take().unwrap())550		.unwrap_or_else(|| Box::new(EmptyAsyncRead));551	let mut ol = (!want_stdout)552		.then(|| out.take().unwrap())553		.unwrap_or_else(|| Box::new(EmptyAsyncRead));554	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());555	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());556557	// while let Some(line) = read.next().await? {}558559	let mut out_buf = if want_stdout { Some(vec![]) } else { None };560	loop {561		select! {562			e = err.next() => {563				if let Some(e) = e {564					let e = e?;565					err_handler.handle_line(&e);566				}567			},568			o = ob.next() => {569				if let Some(o) = o {570					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);571				}572			},573			o = ol.next() => {574				if let Some(o) = o {575					let o = o?;576					if let Some(out) = out_handler.as_mut() {577						out.handle_line(&o)578					} else {579						err_handler.handle_line(&o)580					}581					// out_handler.handle_info(&o);582				}583			},584			code = child.wait() => {585				let code = code?;586				if !code.success() {587					anyhow::bail!("command '{str}' failed with status {}", code);588				}589				break;590			}591		}592	}593594	Ok(out_buf)595}596async fn run_nix_inner_raw_ssh(597	str: String,598	mut cmd: OwningCommand<Arc<Session>>,599	want_stdout: bool,600	err_handler: &mut dyn Handler,601	mut out_handler: Option<&mut dyn Handler>,602) -> Result<Option<Vec<u8>>> {603	cmd.stderr(openssh::Stdio::piped());604	cmd.stdout(openssh::Stdio::piped());605	let mut child = cmd.spawn().await?;606	let mut stderr = child.stderr().take().unwrap();607	let stdout = child.stdout().take().unwrap();608	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());609	let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));610	let mut ob = want_stdout611		.then(|| out.take().unwrap())612		.unwrap_or_else(|| Box::new(EmptyAsyncRead));613	let mut ol = (!want_stdout)614		.then(|| out.take().unwrap())615		.unwrap_or_else(|| Box::new(EmptyAsyncRead));616	let mut ob = FramedRead::new(&mut ob, BytesCodec::new());617	let mut ol = FramedRead::new(&mut ol, LinesCodec::new());618619	// while let Some(line) = read.next().await? {}620621	let mut out_buf = if want_stdout { Some(vec![]) } else { None };622623	let mut wait_future = pin::pin!(child.wait());624	loop {625		select! {626			e = err.next() => {627				if let Some(e) = e {628					let e = e?;629					err_handler.handle_line(&e);630				}631			},632			o = ob.next() => {633				if let Some(o) = o {634					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);635				}636			},637			o = ol.next() => {638				if let Some(o) = o {639					let o = o?;640					if let Some(out) = out_handler.as_mut() {641						out.handle_line(&o)642					} else {643						err_handler.handle_line(&o)644					}645					// out_handler.handle_info(&o);646				}647			},648			code = &mut wait_future => {649				let code = code?;650				if !code.success() {651					anyhow::bail!("command '{str}' failed with status {}", code);652				}653				break;654			}655		}656	}657658	Ok(out_buf)659}660661pub trait ErrorRecorder: Send {662	/// Return true to discard message from logging663	fn push_message(&mut self, msg: &str) -> bool;664}665666#[derive(Debug)]667enum LogField {668	String(String),669	Num(u64),670}671672impl<'de> Deserialize<'de> for LogField {673	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>674	where675		D: serde::Deserializer<'de>,676	{677		struct StringOrNum;678		impl<'de> Visitor<'de> for StringOrNum {679			type Value = LogField;680681			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {682				write!(f, "string or unsigned")683			}684685			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>686			where687				E: serde::de::Error,688			{689				Ok(LogField::String(v.to_owned()))690			}691692			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>693			where694				E: serde::de::Error,695			{696				Ok(LogField::Num(v))697			}698		}699700		deserializer.deserialize_any(StringOrNum)701	}702}703704#[derive(Deserialize, Debug)]705#[serde(rename_all = "camelCase", tag = "action")]706#[allow(dead_code)]707enum NixLog {708	Msg {709		level: u32,710		msg: String,711		raw_msg: Option<String>,712	},713	Start {714		id: u64,715		level: u32,716		#[serde(default)]717		fields: Vec<LogField>,718		text: String,719		#[serde(rename = "type")]720		typ: u32,721	},722	Stop {723		id: u64,724	},725	Result {726		id: u64,727		#[serde(rename = "type")]728		typ: u32,729		#[serde(default)]730		fields: Vec<LogField>,731	},732}