git.delta.rocks / jrsonnet / refs/commits / 837e795f702e

difftreelog

source

cmds/fleet/src/command.rs7.2 KiBsourcehistory
1use std::{ffi::OsStr, process::Stdio};23use anyhow::{Context, Result};4use async_trait::async_trait;5use futures::StreamExt;6use serde::{7	de::{DeserializeOwned, Visitor},8	Deserialize,9};10use tokio::{process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, warn};1314#[async_trait]15pub trait CommandExt {16	async fn run_nix(&mut self) -> Result<()>;17	async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;18	async fn run_nix_string(&mut self) -> Result<String>;19	async fn run(&mut self) -> Result<()>;20	async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;21	async fn run_string(&mut self) -> Result<String>;22	fn inherit_stdio(&mut self) -> &mut Self;23	fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self;24}2526#[derive(Debug)]27enum LogField {28	String(String),29	Num(u64),30}3132impl<'de> Deserialize<'de> for LogField {33	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>34	where35		D: serde::Deserializer<'de>,36	{37		struct StringOrNum;38		impl<'de> Visitor<'de> for StringOrNum {39			type Value = LogField;4041			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {42				write!(f, "string or unsigned")43			}4445			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>46			where47				E: serde::de::Error,48			{49				Ok(LogField::String(v.to_owned()))50			}5152			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>53			where54				E: serde::de::Error,55			{56				Ok(LogField::Num(v))57			}58		}5960		deserializer.deserialize_any(StringOrNum)61	}62}6364#[derive(Deserialize, Debug)]65#[serde(rename_all = "camelCase", tag = "action")]66#[allow(dead_code)]67enum NixLog {68	Msg {69		level: u32,70		msg: String,71		raw_msg: Option<String>,72	},73	Start {74		id: u64,75		level: u32,76		#[serde(default)]77		fields: Vec<LogField>,78		text: String,79		#[serde(rename = "type")]80		typ: u32,81	},82	Stop {83		id: u64,84	},85	Result {86		id: u64,87		#[serde(rename = "type")]88		typ: u32,89	},90}9192#[async_trait]93impl CommandExt for Command {94	async fn run_nix(&mut self) -> Result<()> {95		self.run_nix_string().await.map(|_| ())96	}97	async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T> {98		let str = self.run_nix_string().await?;99		serde_json::from_str(&str).with_context(|| format!("{:?}", str))100	}101102	async fn run_nix_string(&mut self) -> Result<String> {103		self.arg("--log-format").arg("internal-json");104		self.stderr(Stdio::piped());105		self.stdout(Stdio::piped());106		let mut child = self.spawn()?;107		let mut stderr = child.stderr.take().unwrap();108		let mut stdout = child.stdout.take().unwrap();109		let mut err = FramedRead::new(&mut stderr, LinesCodec::new());110		let mut out = FramedRead::new(&mut stdout, BytesCodec::new());111112		// while let Some(line) = read.next().await? {}113114		let mut out_buf = vec![];115		loop {116			select! {117				e = err.next() => {118					if let Some(e) = e {119						let e = e?;120						if let Some(e) = e.strip_prefix("@nix ") {121122							let log: NixLog = match serde_json::from_str(e) {123								Ok(l) => l,124								Err(err) => {125									warn!("failed to parse nix log line {:?}: {}", e, err);126									continue;127								},128							};129							match log {130								NixLog::Msg { msg, raw_msg, .. } => {131									if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))132										&& !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")133										&& msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {134										if let Some(raw_msg) = raw_msg {135											info!(target: "nix", "{raw_msg}\n{msg}")136										}else {137											info!(target: "nix", "{msg}")138139										}140									}141								},142								NixLog::Start { ref fields, typ, .. } if typ == 105 && !fields.is_empty() => {143									if let [LogField::String(drv), ..] = &fields[..] {144										info!(target: "nix","building {}", drv)145									} else {146										warn!("bad build log: {:?}", log)147									}148								},149								NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {150									if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {151										info!(target: "nix","copying {} {} -> {}", drv, from, to)152									} else {153										warn!("bad copy log: {:?}", log)154									}155								},156								NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {157									if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {158										info!(target: "nix", "{}", text)159									}160								},161								NixLog::Start { text, level: 0, typ: 108, .. } if text.is_empty() => {162									// Cache lookup? Coupled with copy log163								},164								NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {165									// Cache lookup166								}167								NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {168									// NAR downloading, coupled with copy log169								}170								NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {171									// Useless repeating notification about build172								}173								NixLog::Start { text, level: 3, typ: 111, .. } if text.starts_with("resolved derivation:  ") => {174									// CA resolved175								}176								NixLog::Stop { .. } => {},177								NixLog::Result { .. } => {},178								_ => warn!("unknown log: {:?}", log)179							};180						} else {181							warn!(target="nix","unknown: {}", e)182						}183					}184				},185				o = out.next() => {186					if let Some(o) = o {187						out_buf.extend_from_slice(&o?);188					}189				},190				code = child.wait() => {191					let code = code?;192					if !code.success() {193						anyhow::bail!("command ({:?}) failed with status {}", self, code);194					}195					break;196				}197			}198		}199200		Ok(String::from_utf8(out_buf)?)201	}202203	fn inherit_stdio(&mut self) -> &mut Self {204		self.stderr(Stdio::inherit());205		self206	}207208	async fn run(&mut self) -> Result<()> {209		self.stderr(Stdio::piped());210		self.stdout(Stdio::piped());211		let mut child = self.spawn()?;212		let mut stderr = child.stderr.take().unwrap();213		let mut stdout = child.stdout.take().unwrap();214		let mut err = FramedRead::new(&mut stderr, LinesCodec::new());215		let mut out = FramedRead::new(&mut stdout, LinesCodec::new());216		loop {217			select! {218				e = err.next() => {219					if let Some(e) = e {220						warn!("{}", e?);221					}222				},223				o = out.next() => {224					if let Some(o) = o {225						info!("{}", o?);226					}227				},228				code = child.wait() => {229					let code = code?;230					if !code.success() {231						anyhow::bail!("command ({:?}) failed with status {}", self, code);232					}233					break;234				}235			}236		}237		Ok(())238	}239240	async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T> {241		let str = self.run_string().await?;242		serde_json::from_str(&str).with_context(|| format!("{:?}", str))243	}244245	async fn run_string(&mut self) -> Result<String> {246		self.inherit_stdio();247		self.stdout(Stdio::piped());248		let out = self.spawn()?.wait_with_output().await?;249		if !out.status.success() {250			anyhow::bail!("command ({:?}) failed with status {}", self, out.status);251		}252		Ok(String::from_utf8(out.stdout)?)253	}254255	fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self {256		let mut cmd = Command::new("ssh");257		cmd.arg(host).arg("--").arg(command);258		cmd259	}260}