1use std::{ffi::OsStr, process::Stdio};23use anyhow::{Context, Result};4use async_trait::async_trait;5use futures::StreamExt;6use serde::{7 de::{DeserializeOwned, Visitor},8 Deserialize, 9};10use tokio::{process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, warn};1314#[async_trait]15pub trait CommandExt {16 async fn run_nix(&mut self) -> Result<()>;17 async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;18 async fn run_nix_string(&mut self) -> Result<String>;19 async fn run(&mut self) -> Result<()>;20 async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;21 async fn run_string(&mut self) -> Result<String>;22 fn inherit_stdio(&mut self) -> &mut Self;23 fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self;24}2526#[derive(Debug)]27enum LogField {28 String(String),29 Num(u64),30}3132impl<'de> Deserialize<'de> for LogField {33 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>34 where35 D: serde::Deserializer<'de>,36 {37 struct StringOrNum;38 impl<'de> Visitor<'de> for StringOrNum {39 type Value = LogField;4041 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {42 write!(f, "string or unsigned")43 }4445 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>46 where47 E: serde::de::Error,48 {49 Ok(LogField::String(v.to_owned()))50 }5152 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>53 where54 E: serde::de::Error,55 {56 Ok(LogField::Num(v))57 }58 }5960 deserializer.deserialize_any(StringOrNum)61 }62}6364#[derive(Deserialize, Debug)]65#[serde(rename_all = "camelCase", tag = "action")]66enum NixLog {67 Msg {68 level: u32,69 msg: String,70 raw_msg: Option<String>,71 },72 Start {73 id: u64,74 level: u32,75 #[serde(default)]76 fields: Vec<LogField>,77 text: String,78 #[serde(rename = "type")]79 typ: u32,80 },81 Stop {82 id: u64,83 },84 Result {85 id: u64,86 #[serde(rename = "type")]87 typ: u32,88 },89}9091#[async_trait]92impl CommandExt for Command {93 async fn run_nix(&mut self) -> Result<()> {94 self.run_nix_string().await.map(|_| ())95 }96 async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T> {97 let str = self.run_nix_string().await?;98 serde_json::from_str(&str).with_context(|| format!("{:?}", str))99 }100101 async fn run_nix_string(&mut self) -> Result<String> {102 self.arg("--log-format").arg("internal-json");103 self.stderr(Stdio::piped());104 self.stdout(Stdio::piped());105 let mut child = self.spawn()?;106 let mut stderr = child.stderr.take().unwrap();107 let mut stdout = child.stdout.take().unwrap();108 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());109 let mut out = FramedRead::new(&mut stdout, BytesCodec::new());110111 112113 let mut out_buf = vec![];114 loop {115 select! {116 e = err.next() => {117 if let Some(e) = e {118 let e = e?;119 if let Some(e) = e.strip_prefix("@nix ") {120121 let log: NixLog = match serde_json::from_str(e) {122 Ok(l) => l,123 Err(err) => {124 warn!("failed to parse nix log line {:?}: {}", e, err);125 continue;126 },127 };128 match log {129 NixLog::Msg { msg, raw_msg, .. } => {130 if !(msg.ends_with(" is dirty") && msg.contains("warning:") && msg.contains(" Git tree ")) {131 info!(target: "nix", "{}", raw_msg.unwrap_or(msg))132 }133 },134 NixLog::Start { ref fields, typ, .. } if typ == 105 && fields.len() >= 1 => {135 if let [LogField::String(drv), ..] = &fields[..] {136 info!(target: "nix","building {}", drv)137 } else {138 warn!("bad build log: {:?}", log)139 }140 },141 NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {142 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {143 info!(target: "nix","copying {} {} -> {}", drv, from, to)144 } else {145 warn!("bad copy log: {:?}", log)146 }147 },148 NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {149 if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {150 info!(target: "nix", "{}", text)151 }152 },153 NixLog::Stop { .. } => {},154 NixLog::Result { .. } => {},155 _ => warn!("unknown log: {:?}", log)156 };157 } else {158 warn!(target="nix","unknown: {}", e)159 }160 }161 },162 o = out.next() => {163 if let Some(o) = o {164 out_buf.extend_from_slice(&o?);165 }166 },167 code = child.wait() => {168 let code = code?;169 if !code.success() {170 anyhow::bail!("command ({:?}) failed with status {}", self, code);171 }172 break;173 }174 }175 }176177 Ok(String::from_utf8(out_buf)?)178 }179180 fn inherit_stdio(&mut self) -> &mut Self {181 self.stderr(Stdio::inherit());182 self183 }184185 async fn run(&mut self) -> Result<()> {186 self.inherit_stdio();187 let out = self.output().await?;188 if !out.status.success() {189 anyhow::bail!("command ({:?}) failed with status {}", self, out.status);190 }191 Ok(())192 }193194 async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T> {195 let str = self.run_string().await?;196 serde_json::from_str(&str).with_context(|| format!("{:?}", str))197 }198199 async fn run_string(&mut self) -> Result<String> {200 self.inherit_stdio();201 let out = self.output().await?;202 if !out.status.success() {203 anyhow::bail!("command ({:?}) failed with status {}", self, out.status);204 }205 Ok(String::from_utf8(out.stdout)?)206 }207208 fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self {209 let mut cmd = Command::new("ssh");210 cmd.arg(host).arg("--").arg(command);211 cmd212 }213}