1use std::{ffi::OsStr, process::Stdio, task::Poll};23use anyhow::{Context, Result};4use async_trait::async_trait;5use futures::StreamExt;6use serde::{7 de::{DeserializeOwned, Visitor},8 Deserialize,9};10use tokio::{io::AsyncRead, process::Command, select};11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};12use tracing::{info, warn};1314fn escape_bash(input: &str, out: &mut String) {15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17 out.push_str(input);18 return;19 }20 out.push('\'');21 for (i, v) in input.split('\'').enumerate() {22 if i != 0 {23 out.push_str("'\"'\"'");24 }25 out.push_str(v);26 }27 out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30 os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34 command: String,35 args: Vec<String>,36 env: Vec<(String, String)>,37}38impl MyCommand {39 pub fn new(cmd: impl AsRef<OsStr>) -> Self {40 assert!(!cmd.as_ref().is_empty());41 Self {42 command: ostoutf8(cmd),43 args: vec![],44 env: vec![],45 }46 }47 fn into_args(self) -> Vec<String> {48 let mut out = Vec::new();49 if !self.env.is_empty() {50 out.push("env".to_owned());51 for (k, v) in self.env {52 assert!(!k.contains("="));53 out.push(format!("{k}={v}"));54 }55 }56 out.push(self.command);57 out.extend(self.args.into_iter());58 out59 }60 fn into_string(self) -> String {61 let mut out = String::new();62 if !self.env.is_empty() {63 out.push_str("env");64 for (k, v) in self.env {65 out.push(' ');66 assert!(!k.contains("="));67 escape_bash(&k, &mut out);68 out.push('=');69 escape_bash(&v, &mut out);70 }71 }72 if !out.is_empty() {73 out.push(' ');74 }75 escape_bash(&self.command, &mut out);76 for arg in self.args {77 out.push(' ');78 escape_bash(&arg, &mut out);79 }80 out81 }82 fn into_command(self) -> Command {83 let mut out = Command::new(self.command);84 out.args(self.args);85 for (k, v) in self.env {86 out.env(k, v);87 }88 out89 }90 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {91 let arg = arg.as_ref();92 self.args.push(ostoutf8(arg));93 self94 }95 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {96 let arg = arg.as_ref();97 let value = value.as_ref();98 let arg = ostoutf8(arg);99 let value = ostoutf8(value);100 self.arg(format!("{arg}={value}"));101 self102 }103 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {104 self.arg(arg);105 self.arg(value);106 self107 }108 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {109 for arg in args.into_iter() {110 let arg = arg.as_ref();111 self.args.push(ostoutf8(arg));112 }113 self114 }115 pub fn sudo(self) -> Self {116 let mut out = Self::new("sudo");117 out.args(self.into_args());118 out119 }120 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {121 let mut out = Self::new("ssh");122 out.arg(on).arg("--");123 out.arg(self.into_string());124 out125 }126127 pub async fn run(self) -> Result<()> {128 let str = self.clone().into_string();129 info!("running {str}");130 let mut cmd = self.into_command();131 cmd.inherit_stdio();132 let out = cmd.spawn()?.wait_with_output().await?;133 if !out.status.success() {134 anyhow::bail!("command '{}' failed with status {}", str, out.status);135 }136 Ok(())137 }138 pub async fn run_string(self) -> Result<String> {139 let str = self.clone().into_string();140 info!("running {str}");141 let mut cmd = self.into_command();142 cmd.inherit_stdio();143 cmd.stdout(Stdio::piped());144 let out = cmd.spawn()?.wait_with_output().await?;145 if !out.status.success() {146 anyhow::bail!("command '{}' failed with status {}", str, out.status);147 }148 Ok(String::from_utf8(out.stdout)?)149 }150 pub async fn run_nix_json<T: DeserializeOwned>(self) -> Result<T> {151 let str = self.run_nix_string().await?;152 serde_json::from_str(&str).with_context(|| format!("{:?}", str))153 }154155 pub async fn run_nix_string(self) -> Result<String> {156 let str = self.clone().into_string();157 let mut cmd = self.into_command();158 cmd.stdout(Stdio::piped());159 run_nix_inner(str, cmd).await.map(|v| v.unwrap())160 }161 pub async fn run_nix(self) -> Result<()> {162 let str = self.clone().into_string();163 let mut cmd = self.into_command();164 cmd.stdout(Stdio::inherit());165 run_nix_inner(str, cmd).await.map(|v| {166 assert!(v.is_none());167 })168 }169}170171struct EmptyAsyncRead;172impl AsyncRead for EmptyAsyncRead {173 fn poll_read(174 self: std::pin::Pin<&mut Self>,175 _cx: &mut std::task::Context<'_>,176 _buf: &mut tokio::io::ReadBuf<'_>,177 ) -> Poll<std::io::Result<()>> {178 Poll::Pending179 }180}181182async fn run_nix_inner(str: String, mut cmd: Command) -> Result<Option<String>> {183 info!("running {str}");184 cmd.arg("--log-format").arg("internal-json");185 cmd.stderr(Stdio::piped());186 let mut child = cmd.spawn()?;187 let mut stderr = child.stderr.take().unwrap();188 let stdout = child.stdout.take();189 let wants_stdout = stdout.is_some();190 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());191 let mut out: Box<dyn AsyncRead + Unpin> = stdout192 .map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin>)193 .unwrap_or_else(|| Box::new(EmptyAsyncRead));194 let mut out = FramedRead::new(&mut out, BytesCodec::new());195196 197198 let mut out_buf = if wants_stdout { Some(vec![]) } else { None };199 loop {200 select! {201 e = err.next() => {202 if let Some(e) = e {203 let e = e?;204 if let Some(e) = e.strip_prefix("@nix ") {205206 let log: NixLog = match serde_json::from_str(e) {207 Ok(l) => l,208 Err(err) => {209 warn!("failed to parse nix log line {:?}: {}", e, err);210 continue;211 },212 };213 match log {214 NixLog::Msg { msg, raw_msg, .. } => {215 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))216 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")217 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {218 if let Some(raw_msg) = raw_msg {219 info!(target: "nix", "{raw_msg}\n{msg}")220 }else {221 info!(target: "nix", "{msg}")222223 }224 }225 },226 NixLog::Start { ref fields, typ, .. } if typ == 105 && !fields.is_empty() => {227 if let [LogField::String(drv), ..] = &fields[..] {228 info!(target: "nix","building {}", drv)229 } else {230 warn!("bad build log: {:?}", log)231 }232 },233 NixLog::Start { ref fields, typ, .. } if typ == 100 && fields.len() >= 3 => {234 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] = &fields[..] {235 info!(target: "nix","copying {} {} -> {}", drv, from, to)236 } else {237 warn!("bad copy log: {:?}", log)238 }239 },240 NixLog::Start { text, typ, .. } if typ == 0 || typ == 102 || typ == 103 || typ == 104 => {241 if !text.is_empty() && text != "querying info about missing paths" && text != "copying 0 paths" {242 info!(target: "nix", "{}", text)243 }244 },245 NixLog::Start { text, level: 0, typ: 108, .. } if text.is_empty() => {246 247 },248 NixLog::Start { text, level: 4, typ: 109, .. } if text.starts_with("querying info about ") => {249 250 }251 NixLog::Start { text, level: 4, typ: 101, .. } if text.starts_with("downloading ") => {252 253 }254 NixLog::Start { text, level: 1, typ: 111, .. } if text.starts_with("waiting for a machine to build ") => {255 256 }257 NixLog::Start { text, level: 3, typ: 111, .. } if text.starts_with("resolved derivation: ") => {258 259 }260 NixLog::Stop { .. } => {},261 NixLog::Result { .. } => {},262 _ => warn!("unknown log: {:?}", log)263 };264 } else {265 warn!(target="nix","unknown: {}", e)266 }267 }268 },269 o = out.next() => {270 if let Some(o) = o {271 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);272 }273 },274 code = child.wait() => {275 let code = code?;276 if !code.success() {277 anyhow::bail!("command '{str}' failed with status {}", code);278 }279 break;280 }281 }282 }283284 Ok(out_buf.map(String::from_utf8).transpose()?)285}286287#[async_trait]288pub trait CommandExt {289 290 291 292 293 294 295 fn inherit_stdio(&mut self) -> &mut Self;296}297298#[derive(Debug)]299enum LogField {300 String(String),301 Num(u64),302}303304impl<'de> Deserialize<'de> for LogField {305 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>306 where307 D: serde::Deserializer<'de>,308 {309 struct StringOrNum;310 impl<'de> Visitor<'de> for StringOrNum {311 type Value = LogField;312313 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {314 write!(f, "string or unsigned")315 }316317 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>318 where319 E: serde::de::Error,320 {321 Ok(LogField::String(v.to_owned()))322 }323324 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>325 where326 E: serde::de::Error,327 {328 Ok(LogField::Num(v))329 }330 }331332 deserializer.deserialize_any(StringOrNum)333 }334}335336#[derive(Deserialize, Debug)]337#[serde(rename_all = "camelCase", tag = "action")]338#[allow(dead_code)]339enum NixLog {340 Msg {341 level: u32,342 msg: String,343 raw_msg: Option<String>,344 },345 Start {346 id: u64,347 level: u32,348 #[serde(default)]349 fields: Vec<LogField>,350 text: String,351 #[serde(rename = "type")]352 typ: u32,353 },354 Stop {355 id: u64,356 },357 Result {358 id: u64,359 #[serde(rename = "type")]360 typ: u32,361 },362}363364#[async_trait]365impl CommandExt for Command {366 fn inherit_stdio(&mut self) -> &mut Self {367 self.stderr(Stdio::inherit());368 self.stdout(Stdio::inherit());369 self370 }371}