1use std::{collections::HashMap, ffi::OsStr, process::Stdio, task::Poll};23use anyhow::{Context, Result};4use futures::StreamExt;5use serde::{6 de::{DeserializeOwned, Visitor},7 Deserialize,8};9use tokio::{io::AsyncRead, process::Command, select};10use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tracing::{info, info_span, warn, Span};12use tracing_indicatif::span_ext::IndicatifSpanExt;1314fn escape_bash(input: &str, out: &mut String) {15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17 out.push_str(input);18 return;19 }20 out.push('\'');21 for (i, v) in input.split('\'').enumerate() {22 if i != 0 {23 out.push_str("'\"'\"'");24 }25 out.push_str(v);26 }27 out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30 os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34 command: String,35 args: Vec<String>,36 env: Vec<(String, String)>,37}38impl MyCommand {39 pub fn new(cmd: impl AsRef<OsStr>) -> Self {40 assert!(!cmd.as_ref().is_empty());41 Self {42 command: ostoutf8(cmd),43 args: vec![],44 env: vec![],45 }46 }47 fn into_args(self) -> Vec<String> {48 let mut out = Vec::new();49 if !self.env.is_empty() {50 out.push("env".to_owned());51 for (k, v) in self.env {52 assert!(!k.contains("="));53 out.push(format!("{k}={v}"));54 }55 }56 out.push(self.command);57 out.extend(self.args.into_iter());58 out59 }60 fn into_string(self) -> String {61 let mut out = String::new();62 if !self.env.is_empty() {63 out.push_str("env");64 for (k, v) in self.env {65 out.push(' ');66 assert!(!k.contains("="));67 escape_bash(&k, &mut out);68 out.push('=');69 escape_bash(&v, &mut out);70 }71 }72 if !out.is_empty() {73 out.push(' ');74 }75 escape_bash(&self.command, &mut out);76 for arg in self.args {77 out.push(' ');78 escape_bash(&arg, &mut out);79 }80 out81 }82 fn into_command(self) -> Command {83 let mut out = Command::new(self.command);84 out.args(self.args);85 for (k, v) in self.env {86 out.env(k, v);87 }88 out89 }90 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {91 let arg = arg.as_ref();92 self.args.push(ostoutf8(arg));93 self94 }95 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {96 let arg = arg.as_ref();97 let value = value.as_ref();98 let arg = ostoutf8(arg);99 let value = ostoutf8(value);100 self.arg(format!("{arg}={value}"));101 self102 }103 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {104 self.arg(arg);105 self.arg(value);106 self107 }108 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {109 for arg in args.into_iter() {110 let arg = arg.as_ref();111 self.args.push(ostoutf8(arg));112 }113 self114 }115 pub fn sudo(self) -> Self {116 let mut out = Self::new("sudo");117 out.args(self.into_args());118 out119 }120 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {121 let mut out = Self::new("ssh");122 out.arg(on).arg("--");123 out.arg(self.into_string());124 out125 }126127 pub async fn run(self) -> Result<()> {128 let str = self.clone().into_string();129 let cmd = self.into_command();130 run_nix_inner(str, cmd, &mut PlainHandler).await?;131 Ok(())132 }133 pub async fn run_string(self) -> Result<String> {134 let str = self.clone().into_string();135 let cmd = self.into_command();136 let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;137 Ok(v)138 }139 pub async fn run_nix_json<T: DeserializeOwned>(self) -> Result<T> {140 let str = self.run_nix_string().await?;141 serde_json::from_str(&str).with_context(|| format!("{:?}", str))142 }143144 pub async fn run_nix_string(self) -> Result<String> {145 let str = self.clone().into_string();146 let cmd = self.into_command();147 run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await148 }149 pub async fn run_nix(self) -> Result<()> {150 let str = self.clone().into_string();151 let mut cmd = self.into_command();152 cmd.stdout(Stdio::inherit());153 run_nix_inner(str, cmd, &mut NixHandler::default()).await154 }155}156157struct EmptyAsyncRead;158impl AsyncRead for EmptyAsyncRead {159 fn poll_read(160 self: std::pin::Pin<&mut Self>,161 _cx: &mut std::task::Context<'_>,162 _buf: &mut tokio::io::ReadBuf<'_>,163 ) -> Poll<std::io::Result<()>> {164 Poll::Pending165 }166}167168async fn run_nix_inner_stdout(169 str: String,170 cmd: Command,171 handler: &mut dyn Handler,172) -> Result<String> {173 Ok(run_nix_inner_raw(str, cmd, true, handler)174 .await?175 .expect("has out"))176}177async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {178 let v = run_nix_inner_raw(str, cmd, false, handler).await?;179 assert!(v.is_none());180 Ok(())181}182183trait Handler {184 fn handle_err(&mut self, e: &str);185 fn handle_info(&mut self, e: &str);186}187188struct PlainHandler;189impl Handler for PlainHandler {190 fn handle_err(&mut self, e: &str) {191 info!(target: "log", "{e}");192 }193194 fn handle_info(&mut self, e: &str) {195 info!(target: "log", "{e}");196 }197}198199#[derive(Default)]200struct NixHandler {201 spans: HashMap<u64, Span>,202}203impl Handler for NixHandler {204 fn handle_err(&mut self, e: &str) {205 if let Some(e) = e.strip_prefix("@nix ") {206 let log: NixLog = match serde_json::from_str(e) {207 Ok(l) => l,208 Err(err) => {209 warn!("failed to parse nix log line {:?}: {}", e, err);210 return;211 }212 };213 match log {214 NixLog::Msg { msg, raw_msg, .. } => {215 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))216 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")217 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {218 if let Some(raw_msg) = raw_msg {219 if !msg.is_empty() {220 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())221 } else {222 info!(target: "nix", "{}", raw_msg.trim_end())223 }224 } else {225 info!(target: "nix", "{}", msg.trim_end())226 }227 }228 }229 NixLog::Start {230 ref fields,231 typ,232 id,233 ..234 } if typ == 105 && !fields.is_empty() => {235 if let [LogField::String(drv), ..] = &fields[..] {236 let mut drv = drv.as_str();237 if let Some(pkg) = drv.strip_prefix("/nix/store/") {238 let mut it = pkg.splitn(2, '-');239 it.next();240 if let Some(pkg) = it.next() {241 drv = pkg;242 }243 }244 info!(target: "nix","building {}", drv);245 let span = info_span!("build", drv);246 span.pb_start();247 self.spans.insert(id, span);248 } else {249 warn!("bad build log: {:?}", log)250 }251 }252 NixLog::Start {253 ref fields,254 typ,255 id,256 ..257 } if typ == 100 && fields.len() >= 3 => {258 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =259 &fields[..]260 {261 let mut drv = drv.as_str();262263 if let Some(pkg) = drv.strip_prefix("/nix/store/") {264 let mut it = pkg.splitn(2, '-');265 it.next();266 if let Some(pkg) = it.next() {267 drv = pkg;268 }269 }270 info!(target: "nix","copying {} {} -> {}", drv, from, to);271 let span = info_span!("copy", from, to, drv);272 span.pb_start();273 self.spans.insert(id, span);274 } else {275 warn!("bad copy log: {:?}", log)276 }277 }278 NixLog::Start { text, typ, id, .. }279 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>280 {281 if !text.is_empty()282 && text != "querying info about missing paths"283 && text != "copying 0 paths"284 {285 let span = info_span!("job");286 span.pb_start();287 span.pb_set_message(text.trim());288 self.spans.insert(id, span);289 info!(target: "nix", "{}", text);290 }291 }292 NixLog::Start {293 text,294 level: 0,295 typ: 108,296 ..297 } if text.is_empty() => {298 299 }300 NixLog::Start {301 text,302 level: 4,303 typ: 109,304 ..305 } if text.starts_with("querying info about ") => {306 307 }308 NixLog::Start {309 text,310 level: 4,311 typ: 101,312 ..313 } if text.starts_with("downloading ") => {314 315 }316 NixLog::Start {317 text,318 level: 1,319 typ: 111,320 ..321 } if text.starts_with("waiting for a machine to build ") => {322 323 }324 NixLog::Start {325 text,326 level: 3,327 typ: 111,328 ..329 } if text.starts_with("resolved derivation: ") => {330 331 }332 NixLog::Start {333 text,334 level: 1,335 typ: 111,336 id,337 ..338 } if text.starts_with("waiting for lock on ") => {339 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();340 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {341 drv = txt;342 }343 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {344 drv = txt;345 }346 if let Some(txt) = drv.split("', '").next() {347 drv = txt;348 }349 if let Some(pkg) = drv.strip_prefix("/nix/store/") {350 let mut it = pkg.splitn(2, '-');351 it.next();352 if let Some(pkg) = it.next() {353 drv = pkg;354 }355 }356 let span = info_span!("waiting on drv", drv);357 span.pb_start();358 self.spans.insert(id, span);359 360 }361 NixLog::Stop { id, .. } => {362 self.spans.remove(&id);363 }364 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {365 if let Some(span) = self.spans.get(&id) {366 if let LogField::String(s) = &fields[0] {367 span.pb_set_message(s.trim());368 } else {369 warn!("bad fields: {fields:?}");370 }371 } else {372 warn!("unknown result id: {id} {typ} {fields:?}");373 }374 375 }376 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {377 if let Some(span) = self.spans.get(&id) {378 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =379 &fields[..4]380 {381 span.pb_set_length(*expected);382 span.pb_set_position(*done);383 } else {384 warn!("bad fields: {fields:?}");385 }386 } else {387 388 389 }390 391 }392 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {393 394 }395 _ => warn!("unknown log: {:?}", log),396 };397 } else {398 warn!(target = "nix", "unknown: {}", e.trim())399 }400 }401 fn handle_info(&mut self, o: &str) {402 self.handle_err(o)403 }404}405406async fn run_nix_inner_raw(407 str: String,408 mut cmd: Command,409 want_stdout: bool,410 handler: &mut dyn Handler,411) -> Result<Option<String>> {412 info!("running {str}");413 cmd.arg("--log-format").arg("internal-json");414 cmd.stderr(Stdio::piped());415 cmd.stdout(Stdio::piped());416 let mut child = cmd.spawn()?;417 let mut stderr = child.stderr.take().unwrap();418 let stdout = child.stdout.take().unwrap();419 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());420 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));421 let mut ob = want_stdout422 .then(|| out.take().unwrap())423 .unwrap_or_else(|| Box::new(EmptyAsyncRead));424 let mut ol = (!want_stdout)425 .then(|| out.take().unwrap())426 .unwrap_or_else(|| Box::new(EmptyAsyncRead));427 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());428 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());429430 431432 let mut out_buf = if want_stdout { Some(vec![]) } else { None };433 loop {434 select! {435 e = err.next() => {436 if let Some(e) = e {437 let e = e?;438 handler.handle_err(&e);439 }440 },441 o = ob.next() => {442 if let Some(o) = o {443 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);444 }445 },446 o = ol.next() => {447 if let Some(o) = o {448 let o = o?;449 handler.handle_info(&o);450 }451 },452 code = child.wait() => {453 let code = code?;454 if !code.success() {455 anyhow::bail!("command '{str}' failed with status {}", code);456 }457 break;458 }459 }460 }461462 Ok(out_buf.map(String::from_utf8).transpose()?)463}464465#[derive(Debug)]466enum LogField {467 String(String),468 Num(u64),469}470471impl<'de> Deserialize<'de> for LogField {472 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>473 where474 D: serde::Deserializer<'de>,475 {476 struct StringOrNum;477 impl<'de> Visitor<'de> for StringOrNum {478 type Value = LogField;479480 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {481 write!(f, "string or unsigned")482 }483484 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>485 where486 E: serde::de::Error,487 {488 Ok(LogField::String(v.to_owned()))489 }490491 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>492 where493 E: serde::de::Error,494 {495 Ok(LogField::Num(v))496 }497 }498499 deserializer.deserialize_any(StringOrNum)500 }501}502503#[derive(Deserialize, Debug)]504#[serde(rename_all = "camelCase", tag = "action")]505#[allow(dead_code)]506enum NixLog {507 Msg {508 level: u32,509 msg: String,510 raw_msg: Option<String>,511 },512 Start {513 id: u64,514 level: u32,515 #[serde(default)]516 fields: Vec<LogField>,517 text: String,518 #[serde(rename = "type")]519 typ: u32,520 },521 Stop {522 id: u64,523 },524 Result {525 id: u64,526 #[serde(rename = "type")]527 typ: u32,528 #[serde(default)]529 fields: Vec<LogField>,530 },531}