git.delta.rocks / jrsonnet / refs/commits / 8d4a7a16f575

difftreelog

source

cmds/fleet/src/command.rs6.1 KiBsourcehistory
1use std::{ffi::OsStr, process::Stdio};23use anyhow::{Context, Result};4use async_trait::async_trait;5use futures::StreamExt;6use serde::{7	de::{DeserializeOwned, Visitor},8	Deserialize,9};10use tokio::{process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, warn};1314#[async_trait]15pub trait CommandExt {16	async fn run_nix(&mut self) -> Result<()>;17	async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;18	async fn run_nix_string(&mut self) -> Result<String>;19	async fn run(&mut self) -> Result<()>;20	async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;21	async fn run_string(&mut self) -> Result<String>;22	fn inherit_stdio(&mut self) -> &mut Self;23	fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self;24}2526#[derive(Debug)]27enum LogField {28	String(String),29	Num(u64),30}3132impl<'de> Deserialize<'de> for LogField {33	fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>34	where35		D: serde::Deserializer<'de>,36	{37		struct StringOrNum;38		impl<'de> Visitor<'de> for StringOrNum {39			type Value = LogField;4041			fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {42				write!(f, "string or unsigned")43			}4445			fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>46			where47				E: serde::de::Error,48			{49				Ok(LogField::String(v.to_owned()))50			}5152			fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>53			where54				E: serde::de::Error,55			{56				Ok(LogField::Num(v))57			}58		}5960		deserializer.deserialize_any(StringOrNum)61	}62}6364#[derive(Deserialize, Debug)]65#[serde(rename_all = "camelCase", tag = "action")]66enum NixLog {67	Msg {68		level: u32,69		msg: String,70		raw_msg: Option<String>,71	},72	Start {73		id: u64,74		level: u32,75		#[serde(default)]76		fields: Vec<LogField>,77		text: String,78		#[serde(rename = "type")]79		typ: u32,80	},81	Stop {82		id: u64,83	},84	Result {85		id: u64,86		#[serde(rename = "type")]87		typ: u32,88	},89}9091#[async_trait]92impl CommandExt for Command {93	async fn run_nix(&mut self) -> Result<()> {94		self.run_nix_string().await.map(|_| ())95	}96	async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T> {97		let str = self.run_nix_string().await?;98		serde_json::from_str(&str).with_context(|| format!("{:?}", str))99	}100101	async fn run_nix_string(&mut self) -> Result<String> {102		self.arg("--log-format").arg("internal-json");103		self.stderr(Stdio::piped());104		self.stdout(Stdio::piped());105		let mut child = self.spawn()?;106		let mut stderr = child.stderr.take().unwrap();107		let mut stdout = child.stdout.take().unwrap();108		let mut err = FramedRead::new(&mut stderr, LinesCodec::new());109		let mut out = FramedRead::new(&mut stdout, BytesCodec::new());110111		// while let Some(line) = read.next().await? {}112113		let mut out_buf = vec![];114		loop {115			select! {116				e = err.next() => {117					if let Some(e) = e {118						let e = e?;119						if let Some(e) = e.strip_prefix("@nix ") {120121							let log: NixLog = match serde_json::from_str(e) {122								Ok(l) => l,123								Err(err) => {124									warn!("failed to parse nix log line {:?}: {}", e, err);125									continue;126								},127							};128							match log {129								NixLog::Msg { msg, raw_msg, .. } => {130									if !(msg.ends_with(" is dirty") && msg.contains("warning:") && msg.contains(" Git tree ")) {131										info!(target: "nix", "{}", raw_msg.unwrap_or(msg))132									}133								},134								NixLog::Start { ref fields, typ, .. } if typ == 105 && fields.len() >= 1 => {135									if let [LogField::String(drv), ..] = &fields[..] {136										info!(target: "nix","building {}", drv)137									} else {138										warn!("bad build log: {:?}", log)139									}140								},141								NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {142									if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {143										info!(target: "nix","copying {} {} -> {}", drv, from, to)144									} else {145										warn!("bad copy log: {:?}", log)146									}147								},148								NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {149									if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {150										info!(target: "nix", "{}", text)151									}152								},153								NixLog::Start { text, level: 0, typ: 108, .. } if text == "" => {154									// Cache lookup? Coupled with copy log155								},156								NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {157									// Cache lookup158								}159								NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {160									// NAR downloading, coupled with copy log161								}162								NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {163									// Useless repeating notification about build164								}165								NixLog::Stop { .. } => {},166								NixLog::Result { .. } => {},167								_ => warn!("unknown log: {:?}", log)168							};169						} else {170							warn!(target="nix","unknown: {}", e)171						}172					}173				},174				o = out.next() => {175					if let Some(o) = o {176						out_buf.extend_from_slice(&o?);177					}178				},179				code = child.wait() => {180					let code = code?;181					if !code.success() {182						anyhow::bail!("command ({:?}) failed with status {}", self, code);183					}184					break;185				}186			}187		}188189		Ok(String::from_utf8(out_buf)?)190	}191192	fn inherit_stdio(&mut self) -> &mut Self {193		self.stderr(Stdio::inherit());194		self195	}196197	async fn run(&mut self) -> Result<()> {198		self.inherit_stdio();199		let out = self.output().await?;200		if !out.status.success() {201			anyhow::bail!("command ({:?}) failed with status {}", self, out.status);202		}203		Ok(())204	}205206	async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T> {207		let str = self.run_string().await?;208		serde_json::from_str(&str).with_context(|| format!("{:?}", str))209	}210211	async fn run_string(&mut self) -> Result<String> {212		self.inherit_stdio();213		let out = self.output().await?;214		if !out.status.success() {215			anyhow::bail!("command ({:?}) failed with status {}", self, out.status);216		}217		Ok(String::from_utf8(out.stdout)?)218	}219220	fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self {221		let mut cmd = Command::new("ssh");222		cmd.arg(host).arg("--").arg(command);223		cmd224	}225}