difftreelog
fix nix log
in: trunk
1 file changed
cmds/fleet/src/command.rsdiffbeforeafterboth1use std::{ffi::OsStr, process::Stdio};23use anyhow::{Context, Result};4use async_trait::async_trait;5use futures::StreamExt;6use serde::{7 de::{DeserializeOwned, Visitor},8 Deserialize,9};10use tokio::{process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, warn};1314#[async_trait]15pub trait CommandExt {16 async fn run_nix(&mut self) -> Result<()>;17 async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T>;18 async fn run_nix_string(&mut self) -> Result<String>;19 async fn run(&mut self) -> Result<()>;20 async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T>;21 async fn run_string(&mut self) -> Result<String>;22 fn inherit_stdio(&mut self) -> &mut Self;23 fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self;24}2526#[derive(Debug)]27enum LogField {28 String(String),29 Num(u64),30}3132impl<'de> Deserialize<'de> for LogField {33 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>34 where35 D: serde::Deserializer<'de>,36 {37 struct StringOrNum;38 impl<'de> Visitor<'de> for StringOrNum {39 type Value = LogField;4041 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {42 write!(f, "string or unsigned")43 }4445 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>46 where47 E: serde::de::Error,48 {49 Ok(LogField::String(v.to_owned()))50 }5152 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>53 where54 E: serde::de::Error,55 {56 Ok(LogField::Num(v))57 }58 }5960 deserializer.deserialize_any(StringOrNum)61 }62}6364#[derive(Deserialize, Debug)]65#[serde(rename_all = "camelCase", tag = "action")]66#[allow(dead_code)]67enum NixLog {68 Msg {69 level: u32,70 msg: String,71 raw_msg: Option<String>,72 },73 Start {74 id: u64,75 level: u32,76 #[serde(default)]77 fields: Vec<LogField>,78 text: String,79 #[serde(rename = "type")]80 typ: u32,81 },82 Stop {83 id: u64,84 },85 Result {86 id: u64,87 #[serde(rename = "type")]88 typ: u32,89 },90}9192#[async_trait]93impl CommandExt for Command {94 async fn run_nix(&mut self) -> Result<()> {95 self.run_nix_string().await.map(|_| ())96 }97 async fn run_nix_json<T: DeserializeOwned>(&mut self) -> Result<T> {98 let str = self.run_nix_string().await?;99 serde_json::from_str(&str).with_context(|| format!("{:?}", str))100 }101102 async fn run_nix_string(&mut self) -> Result<String> {103 self.arg("--log-format").arg("internal-json");104 self.stderr(Stdio::piped());105 self.stdout(Stdio::piped());106 let mut child = self.spawn()?;107 let mut stderr = child.stderr.take().unwrap();108 let mut stdout = child.stdout.take().unwrap();109 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());110 let mut out = FramedRead::new(&mut stdout, BytesCodec::new());111112 // while let Some(line) = read.next().await? {}113114 let mut out_buf = vec![];115 loop {116 select! {117 e = err.next() => {118 if let Some(e) = e {119 let e = e?;120 if let Some(e) = e.strip_prefix("@nix ") {121122 let log: NixLog = match serde_json::from_str(e) {123 Ok(l) => l,124 Err(err) => {125 warn!("failed to parse nix log line {:?}: {}", e, err);126 continue;127 },128 };129 match log {130 NixLog::Msg { msg, raw_msg, .. } => {131 if !(msg.ends_with(" is dirty") && msg.contains("warning:") && msg.contains(" Git tree ")) {132 info!(target: "nix", "{}", raw_msg.unwrap_or(msg))133 }134 },135 NixLog::Start { ref fields, typ, .. } if typ == 105 && !fields.is_empty() => {136 if let [LogField::String(drv), ..] = &fields[..] {137 info!(target: "nix","building {}", drv)138 } else {139 warn!("bad build log: {:?}", log)140 }141 },142 NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {143 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {144 info!(target: "nix","copying {} {} -> {}", drv, from, to)145 } else {146 warn!("bad copy log: {:?}", log)147 }148 },149 NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {150 if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {151 info!(target: "nix", "{}", text)152 }153 },154 NixLog::Start { text, level: 0, typ: 108, .. } if text.is_empty() => {155 // Cache lookup? Coupled with copy log156 },157 NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {158 // Cache lookup159 }160 NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {161 // NAR downloading, coupled with copy log162 }163 NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {164 // Useless repeating notification about build165 }166 NixLog::Stop { .. } => {},167 NixLog::Result { .. } => {},168 _ => warn!("unknown log: {:?}", log)169 };170 } else {171 warn!(target="nix","unknown: {}", e)172 }173 }174 },175 o = out.next() => {176 if let Some(o) = o {177 out_buf.extend_from_slice(&o?);178 }179 },180 code = child.wait() => {181 let code = code?;182 if !code.success() {183 anyhow::bail!("command ({:?}) failed with status {}", self, code);184 }185 break;186 }187 }188 }189190 Ok(String::from_utf8(out_buf)?)191 }192193 fn inherit_stdio(&mut self) -> &mut Self {194 self.stderr(Stdio::inherit());195 self196 }197198 async fn run(&mut self) -> Result<()> {199 self.inherit_stdio();200 let out = self.spawn()?.wait_with_output().await?;201 if !out.status.success() {202 anyhow::bail!("command ({:?}) failed with status {}", self, out.status);203 }204 Ok(())205 }206207 async fn run_json<T: DeserializeOwned>(&mut self) -> Result<T> {208 let str = self.run_string().await?;209 serde_json::from_str(&str).with_context(|| format!("{:?}", str))210 }211212 async fn run_string(&mut self) -> Result<String> {213 self.inherit_stdio();214 self.stdout(Stdio::piped());215 let out = self.spawn()?.wait_with_output().await?;216 if !out.status.success() {217 anyhow::bail!("command ({:?}) failed with status {}", self, out.status);218 }219 Ok(String::from_utf8(out.stdout)?)220 }221222 fn ssh_on(host: impl AsRef<OsStr>, command: impl AsRef<OsStr>) -> Self {223 let mut cmd = Command::new("ssh");224 cmd.arg(host).arg("--").arg(command);225 cmd226 }227}