git.delta.rocks / jrsonnet / refs/commits / 03c441d805e8

difftreelog

source

cmds/fleet/src/command.rs10.1 KiBsourcehistory
1use std::{ffi::OsStr, process::Stdio, task::Poll};23use anyhow::{Context, Result};4use async_trait::async_trait;5use futures::StreamExt;6use serde::{7	de::{DeserializeOwned, Visitor},8	Deserialize,9};10use tokio::{io::AsyncRead, process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, warn};1314fn escape_bash(input: &str, out: &mut String) {15	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17		out.push_str(input);18		return;19	}20	out.push('\'');21	for (i, v) in input.split('\'').enumerate() {22		if i != 0 {23			out.push_str("'\"'\"'");24		}25		out.push_str(v);26	}27	out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30	os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34	command: String,35	args: Vec<String>,36	env: Vec<(String, String)>,37}38impl MyCommand {39	pub fn new(cmd: impl AsRef<OsStr>) -> Self {40		assert!(!cmd.as_ref().is_empty());41		Self {42			command: ostoutf8(cmd),43			args: vec![],44			env: vec![],45		}46	}47	fn into_args(self) -> Vec<String> {48		let mut out = Vec::new();49		if !self.env.is_empty() {50			out.push("env".to_owned());51			for (k, v) in self.env {52				assert!(!k.contains("="));53				out.push(format!("{k}={v}"));54			}55		}56		out.push(self.command);57		out.extend(self.args.into_iter());58		out59	}60	fn into_string(self) -> String {61		let mut out = String::new();62		if !self.env.is_empty() {63			out.push_str("env");64			for (k, v) in self.env {65				out.push(' ');66				assert!(!k.contains("="));67				escape_bash(&k, &mut out);68				out.push('=');69				escape_bash(&v, &mut out);70			}71		}72		if !out.is_empty() {73			out.push(' ');74		}75		escape_bash(&self.command, &mut out);76		for arg in self.args {77			out.push(' ');78			escape_bash(&arg, &mut out);79		}80		out81	}82	fn into_command(self) -> Command {83		let mut out = Command::new(self.command);84		out.args(self.args);85		for (k, v) in self.env {86			out.env(k, v);87		}88		out89	}90	pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {91		let arg = arg.as_ref();92		self.args.push(ostoutf8(arg));93		self94	}95	pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {96		let arg = arg.as_ref();97		let value = value.as_ref();98		let arg = ostoutf8(arg);99		let value = ostoutf8(value);100		self.arg(format!("{arg}={value}"));101		self102	}103	pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {104		self.arg(arg);105		self.arg(value);106		self107	}108	pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {109		for arg in args.into_iter() {110			let arg = arg.as_ref();111			self.args.push(ostoutf8(arg));112		}113		self114	}115	pub fn sudo(self) -> Self {116		let mut out = Self::new("sudo");117		out.args(self.into_args());118		out119	}120	pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {121		let mut out = Self::new("ssh");122		out.arg(on).arg("--");123		out.arg(self.into_string());124		out125	}126127	pub async fn run(self) -> Result<()> {128		let str = self.clone().into_string();129		info!("running {str}");130		let mut cmd = self.into_command();131		cmd.inherit_stdio();132		let out = cmd.spawn()?.wait_with_output().await?;133		if !out.status.success() {134			anyhow::bail!("command '{}' failed with status {}", str, out.status);135		}136		Ok(())137	}138	pub async fn run_string(self) -> Result<String> {139		let str = self.clone().into_string();140		info!("running {str}");141		let mut cmd = self.into_command();142		cmd.inherit_stdio();143		cmd.stdout(Stdio::piped());144		let out = cmd.spawn()?.wait_with_output().await?;145		if !out.status.success() {146			anyhow::bail!("command '{}' failed with status {}", str, out.status);147		}148		Ok(String::from_utf8(out.stdout)?)149	}150	pub async fn run_nix_json<T: DeserializeOwned>(self) -> Result<T> {151		let str = self.run_nix_string().await?;152		serde_json::from_str(&str).with_context(|| format!("{:?}", str))153	}154155	pub async fn run_nix_string(self) -> Result<String> {156		let str = self.clone().into_string();157		let mut cmd = self.into_command();158		cmd.stdout(Stdio::piped());159		run_nix_inner(str, cmd).await.map(|v| v.unwrap())160	}161	pub async fn run_nix(self) -> Result<()> {162		let str = self.clone().into_string();163		let mut cmd = self.into_command();164		cmd.stdout(Stdio::inherit());165		run_nix_inner(str, cmd).await.map(|v| {166			assert!(v.is_none());167		})168	}169}170171struct EmptyAsyncRead;172impl AsyncRead for EmptyAsyncRead {173	fn poll_read(174		self: std::pin::Pin<&mut Self>,175		_cx: &mut std::task::Context<'_>,176		_buf: &mut tokio::io::ReadBuf<'_>,177	) -> Poll<std::io::Result<()>> {178		Poll::Pending179	}180}181182async fn run_nix_inner(str: String, mut cmd: Command) -> Result<Option<String>> {183	info!("running {str}");184	cmd.arg("--log-format").arg("internal-json");185	cmd.stderr(Stdio::piped());186	let mut child = cmd.spawn()?;187	let mut stderr = child.stderr.take().unwrap();188	let stdout = child.stdout.take();189	let wants_stdout = stdout.is_some();190	let mut err = FramedRead::new(&mut stderr, LinesCodec::new());191	let mut out: Box<dyn AsyncRead + Unpin> = stdout192		.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin>)193		.unwrap_or_else(|| Box::new(EmptyAsyncRead));194	let mut out = FramedRead::new(&mut out, BytesCodec::new());195196	// while let Some(line) = read.next().await? {}197198	let mut out_buf = if wants_stdout { Some(vec![]) } else { None };199	loop {200		select! {201			e = err.next() => {202				if let Some(e) = e {203					let e = e?;204					if let Some(e) = e.strip_prefix("@nix ") {205206						let log: NixLog = match serde_json::from_str(e) {207							Ok(l) => l,208							Err(err) => {209								warn!("failed to parse nix log line {:?}: {}", e, err);210								continue;211							},212						};213						match log {214							NixLog::Msg { msg, raw_msg, .. } => {215								if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))216									&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")217									&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {218									if let Some(raw_msg) = raw_msg {219										info!(target: "nix", "{raw_msg}\n{msg}")220									}else {221										info!(target: "nix", "{msg}")222223									}224								}225							},226							NixLog::Start { ref fields, typ, .. } if typ == 105 && !fields.is_empty() => {227								if let [LogField::String(drv), ..] = &fields[..] {228									info!(target: "nix","building {}", drv)229								} else {230									warn!("bad build log: {:?}", log)231								}232							},233							NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {234								if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {235									info!(target: "nix","copying {} {} -> {}", drv, from, to)236								} else {237									warn!("bad copy log: {:?}", log)238								}239							},240							NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {241								if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {242									info!(target: "nix", "{}", text)243								}244							},245							NixLog::Start { text, level: 0, typ: 108, .. } if text.is_empty() => {246								// Cache lookup? Coupled with copy log247							},248							NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {249								// Cache lookup250							}251							NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {252								// NAR downloading, coupled with copy log253							}254							NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {255								// Useless repeating notification about build256							}257							NixLog::Start { text, level: 3, typ: 111, .. } if text.starts_with("resolved derivation: ") => {258								// CA resolved259							}260							NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for lock on ") => {261								// Concurrent build of the same message262							}263							NixLog::Stop { .. } => {},264							NixLog::Result { .. } => {},265							_ => warn!("unknown log: {:?}", log)266						};267					} else {268						warn!(target="nix","unknown: {}", e)269					}270				}271			},272			o = out.next() => {273				if let Some(o) = o {274					out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);275				}276			},277			code = child.wait() => {278				let code = code?;279				if !code.success() {280					anyhow::bail!("command '{str}' failed with status {}", code);281				}282				break;283			}284		}285	}286287	Ok(out_buf.map(String::from_utf8).transpose()?)288}289290#[async_trait]291pub trait CommandExt {292	// async fn run_nix(&mut self) -> Result<()>;293	// async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;294	// async fn run_nix_string(&mut self) -> Result<String>;295	// async fn run(&mut self) -> Result<()>;296	// async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;297	// async fn run_string(&mut self) -> Result<String>;298	fn inherit_stdio(&mut self) -> &mut Self;299}300301#[derive(Debug)]302enum LogField {303	String(String),304	Num(u64),305}306307impl<'de> Deserialize<'de> for LogField {308	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>309	where310		D: serde::Deserializer<'de>,311	{312		struct StringOrNum;313		impl<'de> Visitor<'de> for StringOrNum {314			type Value = LogField;315316			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {317				write!(f, "string or unsigned")318			}319320			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>321			where322				E: serde::de::Error,323			{324				Ok(LogField::String(v.to_owned()))325			}326327			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>328			where329				E: serde::de::Error,330			{331				Ok(LogField::Num(v))332			}333		}334335		deserializer.deserialize_any(StringOrNum)336	}337}338339#[derive(Deserialize, Debug)]340#[serde(rename_all = "camelCase", tag = "action")]341#[allow(dead_code)]342enum NixLog {343	Msg {344		level: u32,345		msg: String,346		raw_msg: Option<String>,347	},348	Start {349		id: u64,350		level: u32,351		#[serde(default)]352		fields: Vec<LogField>,353		text: String,354		#[serde(rename = "type")]355		typ: u32,356	},357	Stop {358		id: u64,359	},360	Result {361		id: u64,362		#[serde(rename = "type")]363		typ: u32,364	},365}366367#[async_trait]368impl CommandExt for Command {369	fn inherit_stdio(&mut self) -> &mut Self {370		self.stderr(Stdio::inherit());371		self.stdout(Stdio::inherit());372		self373	}374}