1use std::{collections::HashMap, ffi::OsStr, process::Stdio, task::Poll};23use anyhow::{Context, Result};4use futures::StreamExt;5use serde::{6 de::{DeserializeOwned, Visitor},7 Deserialize,8};9use tokio::{io::AsyncRead, process::Command, select};10use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};11use tracing::{info, info_span, warn, Span};12use tracing_indicatif::span_ext::IndicatifSpanExt;1314fn escape_bash(input: &str, out: &mut String) {15 const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";16 if input.chars().all(|c| !TO_ESCAPE.contains(c)) {17 out.push_str(input);18 return;19 }20 out.push('\'');21 for (i, v) in input.split('\'').enumerate() {22 if i != 0 {23 out.push_str("'\"'\"'");24 }25 out.push_str(v);26 }27 out.push('\'');28}29fn ostoutf8(os: impl AsRef<OsStr>) -> String {30 os.as_ref().to_str().expect("non-utf8 data").to_owned()31}32#[derive(Clone)]33pub struct MyCommand {34 command: String,35 args: Vec<String>,36 env: Vec<(String, String)>,37}38impl MyCommand {39 pub fn new(cmd: impl AsRef<OsStr>) -> Self {40 assert!(!cmd.as_ref().is_empty());41 Self {42 command: ostoutf8(cmd),43 args: vec![],44 env: vec![],45 }46 }47 fn into_args(self) -> Vec<String> {48 let mut out = Vec::new();49 if !self.env.is_empty() {50 out.push("env".to_owned());51 for (k, v) in self.env {52 assert!(!k.contains("="));53 out.push(format!("{k}={v}"));54 }55 }56 out.push(self.command);57 out.extend(self.args.into_iter());58 out59 }60 fn into_string(self) -> String {61 let mut out = String::new();62 if !self.env.is_empty() {63 out.push_str("env");64 for (k, v) in self.env {65 out.push(' ');66 assert!(!k.contains("="));67 escape_bash(&k, &mut out);68 out.push('=');69 escape_bash(&v, &mut out);70 }71 }72 if !out.is_empty() {73 out.push(' ');74 }75 escape_bash(&self.command, &mut out);76 for arg in self.args {77 out.push(' ');78 escape_bash(&arg, &mut out);79 }80 out81 }82 fn into_command(self) -> Command {83 let mut out = Command::new(self.command);84 out.args(self.args);85 for (k, v) in self.env {86 out.env(k, v);87 }88 out89 }90 pub fn arg(&mut self, arg: impl AsRef<OsStr>) -> &mut Self {91 let arg = arg.as_ref();92 self.args.push(ostoutf8(arg));93 self94 }95 pub fn eqarg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {96 let arg = arg.as_ref();97 let value = value.as_ref();98 let arg = ostoutf8(arg);99 let value = ostoutf8(value);100 self.arg(format!("{arg}={value}"));101 self102 }103 pub fn comparg(&mut self, arg: impl AsRef<OsStr>, value: impl AsRef<OsStr>) -> &mut Self {104 self.arg(arg);105 self.arg(value);106 self107 }108 pub fn args<V: AsRef<OsStr>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {109 for arg in args.into_iter() {110 let arg = arg.as_ref();111 self.args.push(ostoutf8(arg));112 }113 self114 }115 pub fn sudo(self) -> Self {116 let mut out = Self::new("sudo");117 out.args(self.into_args());118 out119 }120 pub fn ssh(self, on: impl AsRef<OsStr>) -> Self {121 let mut out = Self::new("ssh");122 out.arg(on).arg("--");123 out.arg(self.into_string());124 out125 }126127 pub async fn run(self) -> Result<()> {128 let str = self.clone().into_string();129 let cmd = self.into_command();130 run_nix_inner(str, cmd, &mut PlainHandler).await?;131 Ok(())132 }133 pub async fn run_string(self) -> Result<String> {134 let str = self.clone().into_string();135 let cmd = self.into_command();136 let v = run_nix_inner_stdout(str, cmd, &mut PlainHandler).await?;137 Ok(v)138 }139 pub async fn run_nix_json<T: DeserializeOwned>(self) -> Result<T> {140 let str = self.run_nix_string().await?;141 serde_json::from_str(&str).with_context(|| format!("{:?}", str))142 }143144 pub async fn run_nix_string(self) -> Result<String> {145 let str = self.clone().into_string();146 let mut cmd = self.into_command();147 cmd.arg("--log-format").arg("internal-json");148 run_nix_inner_stdout(str, cmd, &mut NixHandler::default()).await149 }150 pub async fn run_nix(self) -> Result<()> {151 let str = self.clone().into_string();152 let mut cmd = self.into_command();153 cmd.arg("--log-format").arg("internal-json");154 cmd.stdout(Stdio::inherit());155 run_nix_inner(str, cmd, &mut NixHandler::default()).await156 }157}158159struct EmptyAsyncRead;160impl AsyncRead for EmptyAsyncRead {161 fn poll_read(162 self: std::pin::Pin<&mut Self>,163 _cx: &mut std::task::Context<'_>,164 _buf: &mut tokio::io::ReadBuf<'_>,165 ) -> Poll<std::io::Result<()>> {166 Poll::Pending167 }168}169170async fn run_nix_inner_stdout(171 str: String,172 cmd: Command,173 handler: &mut dyn Handler,174) -> Result<String> {175 Ok(run_nix_inner_raw(str, cmd, true, handler)176 .await?177 .expect("has out"))178}179async fn run_nix_inner(str: String, cmd: Command, handler: &mut dyn Handler) -> Result<()> {180 let v = run_nix_inner_raw(str, cmd, false, handler).await?;181 assert!(v.is_none());182 Ok(())183}184185trait Handler {186 fn handle_err(&mut self, e: &str);187 fn handle_info(&mut self, e: &str);188}189190struct PlainHandler;191impl Handler for PlainHandler {192 fn handle_err(&mut self, e: &str) {193 info!(target: "log", "{e}");194 }195196 fn handle_info(&mut self, e: &str) {197 info!(target: "log", "{e}");198 }199}200201#[derive(Default)]202struct NixHandler {203 spans: HashMap<u64, Span>,204}205impl Handler for NixHandler {206 fn handle_err(&mut self, e: &str) {207 if let Some(e) = e.strip_prefix("@nix ") {208 let log: NixLog = match serde_json::from_str(e) {209 Ok(l) => l,210 Err(err) => {211 warn!("failed to parse nix log line {:?}: {}", e, err);212 return;213 }214 };215 match log {216 NixLog::Msg { msg, raw_msg, .. } => {217 if !(msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m Git tree '") && msg.ends_with("' is dirty"))218 && !msg.starts_with("\u{1b}[35;1mwarning:\u{1b}[0m not writing modified lock file of flake")219 && msg != "\u{1b}[35;1mwarning:\u{1b}[0m \u{1b}[31;1merror:\u{1b}[0m SQLite database '\u{1b}[35;1m/nix/var/nix/db/db.sqlite\u{1b}[0m' is busy" {220 if let Some(raw_msg) = raw_msg {221 if !msg.is_empty() {222 info!(target: "nix", "{}\n{}", raw_msg.trim_end(), msg.trim_end())223 } else {224 info!(target: "nix", "{}", raw_msg.trim_end())225 }226 } else {227 info!(target: "nix", "{}", msg.trim_end())228 }229 }230 }231 NixLog::Start {232 ref fields,233 typ,234 id,235 ..236 } if typ == 105 && !fields.is_empty() => {237 if let [LogField::String(drv), ..] = &fields[..] {238 let mut drv = drv.as_str();239 if let Some(pkg) = drv.strip_prefix("/nix/store/") {240 let mut it = pkg.splitn(2, '-');241 it.next();242 if let Some(pkg) = it.next() {243 drv = pkg;244 }245 }246 info!(target: "nix","building {}", drv);247 let span = info_span!("build", drv);248 span.pb_start();249 self.spans.insert(id, span);250 } else {251 warn!("bad build log: {:?}", log)252 }253 }254 NixLog::Start {255 ref fields,256 typ,257 id,258 ..259 } if typ == 100 && fields.len() >= 3 => {260 if let [LogField::String(drv), LogField::String(from), LogField::String(to), ..] =261 &fields[..]262 {263 let mut drv = drv.as_str();264265 if let Some(pkg) = drv.strip_prefix("/nix/store/") {266 let mut it = pkg.splitn(2, '-');267 it.next();268 if let Some(pkg) = it.next() {269 drv = pkg;270 }271 }272 273 let span = info_span!("copy", from, to, drv);274 span.pb_start();275 self.spans.insert(id, span);276 } else {277 warn!("bad copy log: {:?}", log)278 }279 }280 NixLog::Start { text, typ, id, .. }281 if typ == 0 || typ == 102 || typ == 103 || typ == 104 =>282 {283 if !text.is_empty()284 && text != "querying info about missing paths"285 && text != "copying 0 paths"286 {287 let span = info_span!("job");288 span.pb_start();289 span.pb_set_message(text.trim());290 self.spans.insert(id, span);291 info!(target: "nix", "{}", text);292 }293 }294 NixLog::Start {295 text,296 level: 0,297 typ: 108,298 ..299 } if text.is_empty() => {300 301 }302 NixLog::Start {303 text,304 level: 4,305 typ: 109,306 ..307 } if text.starts_with("querying info about ") => {308 309 }310 NixLog::Start {311 text,312 level: 4,313 typ: 101,314 ..315 } if text.starts_with("downloading ") => {316 317 }318 NixLog::Start {319 text,320 level: 1,321 typ: 111,322 ..323 } if text.starts_with("waiting for a machine to build ") => {324 325 }326 NixLog::Start {327 text,328 level: 3,329 typ: 111,330 ..331 } if text.starts_with("resolved derivation: ") => {332 333 }334 NixLog::Start {335 text,336 level: 1,337 typ: 111,338 id,339 ..340 } if text.starts_with("waiting for lock on ") => {341 let mut drv = text.strip_prefix("waiting for lock on ").unwrap();342 if let Some(txt) = drv.strip_prefix("\u{1b}[35;1m'") {343 drv = txt;344 }345 if let Some(txt) = drv.strip_suffix("'\u{1b}[0m") {346 drv = txt;347 }348 if let Some(txt) = drv.split("', '").next() {349 drv = txt;350 }351 if let Some(pkg) = drv.strip_prefix("/nix/store/") {352 let mut it = pkg.splitn(2, '-');353 it.next();354 if let Some(pkg) = it.next() {355 drv = pkg;356 }357 }358 let span = info_span!("waiting on drv", drv);359 span.pb_start();360 self.spans.insert(id, span);361 362 }363 NixLog::Stop { id, .. } => {364 self.spans.remove(&id);365 }366 NixLog::Result { fields, id, typ } if typ == 101 && !fields.is_empty() => {367 if let Some(span) = self.spans.get(&id) {368 if let LogField::String(s) = &fields[0] {369 span.pb_set_message(s.trim());370 } else {371 warn!("bad fields: {fields:?}");372 }373 } else {374 warn!("unknown result id: {id} {typ} {fields:?}");375 }376 377 }378 NixLog::Result { fields, id, typ } if typ == 105 && fields.len() >= 4 => {379 if let Some(span) = self.spans.get(&id) {380 if let [LogField::Num(done), LogField::Num(expected), LogField::Num(_running), LogField::Num(_failed)] =381 &fields[..4]382 {383 span.pb_set_length(*expected);384 span.pb_set_position(*done);385 } else {386 warn!("bad fields: {fields:?}");387 }388 } else {389 390 391 }392 393 }394 NixLog::Result { typ, .. } if typ == 104 || typ == 106 => {395 396 }397 _ => warn!("unknown log: {:?}", log),398 };399 } else {400 warn!(target = "nix", "unknown: {}", e.trim())401 }402 }403 fn handle_info(&mut self, o: &str) {404 self.handle_err(o)405 }406}407408async fn run_nix_inner_raw(409 str: String,410 mut cmd: Command,411 want_stdout: bool,412 handler: &mut dyn Handler,413) -> Result<Option<String>> {414 info!("running {str}");415 cmd.stderr(Stdio::piped());416 cmd.stdout(Stdio::piped());417 let mut child = cmd.spawn()?;418 let mut stderr = child.stderr.take().unwrap();419 let stdout = child.stdout.take().unwrap();420 let mut err = FramedRead::new(&mut stderr, LinesCodec::new());421 let mut out: Option<Box<dyn AsyncRead + Unpin>> = Some(Box::new(stdout));422 let mut ob = want_stdout423 .then(|| out.take().unwrap())424 .unwrap_or_else(|| Box::new(EmptyAsyncRead));425 let mut ol = (!want_stdout)426 .then(|| out.take().unwrap())427 .unwrap_or_else(|| Box::new(EmptyAsyncRead));428 let mut ob = FramedRead::new(&mut ob, BytesCodec::new());429 let mut ol = FramedRead::new(&mut ol, LinesCodec::new());430431 432433 let mut out_buf = if want_stdout { Some(vec![]) } else { None };434 loop {435 select! {436 e = err.next() => {437 if let Some(e) = e {438 let e = e?;439 handler.handle_err(&e);440 }441 },442 o = ob.next() => {443 if let Some(o) = o {444 out_buf.as_mut().expect("stdout == wants_stdout").extend_from_slice(&o?);445 }446 },447 o = ol.next() => {448 if let Some(o) = o {449 let o = o?;450 handler.handle_info(&o);451 }452 },453 code = child.wait() => {454 let code = code?;455 if !code.success() {456 anyhow::bail!("command '{str}' failed with status {}", code);457 }458 break;459 }460 }461 }462463 Ok(out_buf.map(String::from_utf8).transpose()?)464}465466#[derive(Debug)]467enum LogField {468 String(String),469 Num(u64),470}471472impl<'de> Deserialize<'de> for LogField {473 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>474 where475 D: serde::Deserializer<'de>,476 {477 struct StringOrNum;478 impl<'de> Visitor<'de> for StringOrNum {479 type Value = LogField;480481 fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {482 write!(f, "string or unsigned")483 }484485 fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>486 where487 E: serde::de::Error,488 {489 Ok(LogField::String(v.to_owned()))490 }491492 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>493 where494 E: serde::de::Error,495 {496 Ok(LogField::Num(v))497 }498 }499500 deserializer.deserialize_any(StringOrNum)501 }502}503504#[derive(Deserialize, Debug)]505#[serde(rename_all = "camelCase", tag = "action")]506#[allow(dead_code)]507enum NixLog {508 Msg {509 level: u32,510 msg: String,511 raw_msg: Option<String>,512 },513 Start {514 id: u64,515 level: u32,516 #[serde(default)]517 fields: Vec<LogField>,518 text: String,519 #[serde(rename = "type")]520 typ: u32,521 },522 Stop {523 id: u64,524 },525 Result {526 id: u64,527 #[serde(rename = "type")]528 typ: u32,529 #[serde(default)]530 fields: Vec<LogField>,531 },532}